base

package
v0.23.0
Published: Apr 11, 2022 License: MIT Imports: 13 Imported by: 0

Documentation

Overview

Package base defines foundational types and constants used in the asynq package.

Constants

const (
	AllServers    = "asynq:servers"    // ZSET
	AllWorkers    = "asynq:workers"    // ZSET
	AllSchedulers = "asynq:schedulers" // ZSET
	AllQueues     = "asynq:queues"     // SET
	CancelChannel = "asynq:cancel"     // PubSub channel
)

Global Redis keys.

const DefaultQueueName = "default"

DefaultQueueName is the queue name used when the user does not specify one.

const Version = "0.23.0"

Version is the version of the asynq library and CLI.

Variables

DefaultQueue is the redis key for the default queue.

Functions

func ActiveKey added in v0.12.0

func ActiveKey(qname string) string

ActiveKey returns a redis key for the active tasks.

func AggregationSetKey added in v0.23.0

func AggregationSetKey(qname, gname, setID string) string

AggregationSetKey returns a redis key used for an aggregation set.

func AllAggregationSets added in v0.23.0

func AllAggregationSets(qname string) string

AllAggregationSets returns a redis key used to store all aggregation sets (set of tasks staged to be aggregated) in a given queue.

func AllGroups added in v0.23.0

func AllGroups(qname string) string

AllGroups returns a redis key used to store all group keys used in a given queue.

func ArchivedKey added in v0.14.0

func ArchivedKey(qname string) string

ArchivedKey returns a redis key for the archived tasks.

func CompletedKey added in v0.19.0

func CompletedKey(qname string) string

CompletedKey returns a redis key for the completed tasks.

func EncodeMessage added in v0.9.3

func EncodeMessage(msg *TaskMessage) ([]byte, error)

EncodeMessage marshals the given task message and returns the encoded bytes.

func EncodeSchedulerEnqueueEvent added in v0.18.0

func EncodeSchedulerEnqueueEvent(event *SchedulerEnqueueEvent) ([]byte, error)

EncodeSchedulerEnqueueEvent marshals the given event and returns the encoded bytes.

func EncodeSchedulerEntry added in v0.18.0

func EncodeSchedulerEntry(entry *SchedulerEntry) ([]byte, error)

EncodeSchedulerEntry marshals the given entry and returns the encoded bytes.

func EncodeServerInfo added in v0.18.0

func EncodeServerInfo(info *ServerInfo) ([]byte, error)

EncodeServerInfo marshals the given ServerInfo and returns the encoded bytes.

func EncodeWorkerInfo added in v0.18.0

func EncodeWorkerInfo(info *WorkerInfo) ([]byte, error)

EncodeWorkerInfo marshals the given WorkerInfo and returns the encoded bytes.

func FailedKey added in v0.12.0

func FailedKey(qname string, t time.Time) string

FailedKey returns a redis key for the failure count of the given queue on the given day.
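
A per-day counter key can be pictured as the queue prefix plus a date suffix. The sketch below is only an illustration: the `asynq:{<qname>}:failed:` prefix, the UTC `YYYY-MM-DD` suffix, and the helper names `failedKey` and `failedKeyAtUnix` are assumptions made for this example and are not shown on this page.

```go
package main

import (
	"fmt"
	"time"
)

// failedKey is a hypothetical stand-in for FailedKey.
// Assumptions: the "asynq:{<qname>}:failed:" prefix and the UTC
// "YYYY-MM-DD" day suffix are illustrative, not taken from this page.
func failedKey(qname string, t time.Time) string {
	return fmt.Sprintf("asynq:{%s}:failed:%s", qname, t.UTC().Format("2006-01-02"))
}

// failedKeyAtUnix is a small convenience wrapper for callers that hold
// a Unix timestamp in seconds.
func failedKeyAtUnix(qname string, sec int64) string {
	return failedKey(qname, time.Unix(sec, 0))
}

func main() {
	// Two timestamps on the same UTC day map to the same counter key.
	morning := time.Date(2022, time.April, 11, 8, 0, 0, 0, time.UTC)
	evening := time.Date(2022, time.April, 11, 22, 30, 0, 0, time.UTC)
	fmt.Println(failedKey("default", morning) == failedKey("default", evening))
	fmt.Println(failedKey("default", morning))
}
```

Normalizing to UTC before formatting keeps servers in different time zones writing to the same daily counter.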

func FailedTotalKey added in v0.20.0

func FailedTotalKey(qname string) string

FailedTotalKey returns a redis key for total failure count for the given queue.

func GroupKey added in v0.23.0

func GroupKey(qname, gkey string) string

GroupKey returns a redis key used to group tasks belonging to the same group.

func GroupKeyPrefix added in v0.23.0

func GroupKeyPrefix(qname string) string

GroupKeyPrefix returns a prefix for group key.

func LeaseKey added in v0.22.0

func LeaseKey(qname string) string

LeaseKey returns a redis key for the lease.

func PausedKey added in v0.12.0

func PausedKey(qname string) string

PausedKey returns a redis key to indicate that the given queue is paused.

func PendingKey added in v0.18.0

func PendingKey(qname string) string

PendingKey returns a redis key for the pending tasks in the given queue.

func ProcessedKey

func ProcessedKey(qname string, t time.Time) string

ProcessedKey returns a redis key for the processed count of the given queue on the given day.

func ProcessedTotalKey added in v0.20.0

func ProcessedTotalKey(qname string) string

ProcessedTotalKey returns a redis key for total processed count for the given queue.

func QueueKeyPrefix added in v0.18.0

func QueueKeyPrefix(qname string) string

QueueKeyPrefix returns a prefix for all keys in the given queue.
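
As a rough illustration of how a per-queue prefix composes with the other key helpers, the sketch below assumes a layout of `asynq:{<qname>}:<suffix>`. That layout and the lower-case helper names are assumptions made for this example; the page itself does not show the key format.

```go
package main

import "fmt"

// queueKeyPrefix is a hypothetical stand-in for QueueKeyPrefix.
// Assumption: keys look like "asynq:{<qname>}:<suffix>".
func queueKeyPrefix(qname string) string {
	return fmt.Sprintf("asynq:{%s}:", qname)
}

// pendingKey and activeKey build on the shared prefix, so every key for
// a queue starts with the same string.
func pendingKey(qname string) string { return queueKeyPrefix(qname) + "pending" }
func activeKey(qname string) string  { return queueKeyPrefix(qname) + "active" }

func main() {
	fmt.Println(pendingKey("default"))
	fmt.Println(activeKey("default"))
}
```

Deriving every key from one prefix function means a change to the layout only has to be made in one place.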

func RetryKey added in v0.12.0

func RetryKey(qname string) string

RetryKey returns a redis key for the retry tasks.

func ScheduledKey added in v0.12.0

func ScheduledKey(qname string) string

ScheduledKey returns a redis key for the scheduled tasks.

func SchedulerEntriesKey added in v0.13.0

func SchedulerEntriesKey(schedulerID string) string

SchedulerEntriesKey returns a redis key for the scheduler entries of the given scheduler ID.

func SchedulerHistoryKey added in v0.13.0

func SchedulerHistoryKey(entryID string) string

SchedulerHistoryKey returns a redis key for the scheduler's history for the given entry.

func ServerInfoKey added in v0.8.0

func ServerInfoKey(hostname string, pid int, serverID string) string

ServerInfoKey returns a redis key for process info.

func TaskKey added in v0.18.0

func TaskKey(qname, id string) string

TaskKey returns a redis key for the given task message.

func TaskKeyPrefix added in v0.18.0

func TaskKeyPrefix(qname string) string

TaskKeyPrefix returns a prefix for task key.

func UniqueKey added in v0.12.0

func UniqueKey(qname, tasktype string, payload []byte) string

UniqueKey returns a redis key with the given type, payload, and queue name.
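
One way to derive a key from a queue name, task type, and payload is to hash the payload so the key stays short and fixed in length. The sketch below is a guess at the general shape only: the key prefix, the choice of an MD5 digest, and the helper name `uniqueKey` are assumptions for illustration, not details taken from this page.

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
)

// uniqueKey is a hypothetical stand-in for UniqueKey.
// Assumptions: the key prefix and the use of an MD5 digest of the
// payload are illustrative choices, not taken from this page.
func uniqueKey(qname, tasktype string, payload []byte) string {
	prefix := fmt.Sprintf("asynq:{%s}:unique:%s:", qname, tasktype)
	if payload == nil {
		return prefix
	}
	sum := md5.Sum(payload)
	return prefix + hex.EncodeToString(sum[:])
}

func main() {
	a := uniqueKey("default", "email:send", []byte(`{"user":1}`))
	b := uniqueKey("default", "email:send", []byte(`{"user":1}`))
	c := uniqueKey("default", "email:send", []byte(`{"user":2}`))
	fmt.Println(a == b) // same queue, type, and payload
	fmt.Println(a == c) // different payload
}
```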

func ValidateQueueName added in v0.15.0

func ValidateQueueName(qname string) error

ValidateQueueName validates a given qname to be used as a queue name. Returns nil if valid, otherwise returns non-nil error.
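
The nil-or-error contract can be sketched as follows. The single rule checked here (the name must not be empty or blank), the error text, and the helper name `validateQueueName` are assumptions for this example; the page does not list the actual validation rules.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// validateQueueName is a hypothetical stand-in for ValidateQueueName.
// Assumption: the only rule checked here is "not empty or blank".
func validateQueueName(qname string) error {
	if strings.TrimSpace(qname) == "" {
		return errors.New("queue name must contain one or more characters")
	}
	return nil
}

func main() {
	for _, name := range []string{"default", "", "   "} {
		if err := validateQueueName(name); err != nil {
			fmt.Printf("%q rejected: %v\n", name, err)
			continue
		}
		fmt.Printf("%q accepted\n", name)
	}
}
```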

func WorkersKey added in v0.5.0

func WorkersKey(hostname string, pid int, serverID string) string

WorkersKey returns a redis key for the workers given hostname, pid, and server ID.

Types

type Broker added in v0.8.0

type Broker interface {
	Ping() error
	Close() error
	Enqueue(ctx context.Context, msg *TaskMessage) error
	EnqueueUnique(ctx context.Context, msg *TaskMessage, ttl time.Duration) error
	Dequeue(qnames ...string) (*TaskMessage, time.Time, error)
	Done(ctx context.Context, msg *TaskMessage) error
	MarkAsComplete(ctx context.Context, msg *TaskMessage) error
	Requeue(ctx context.Context, msg *TaskMessage) error
	Schedule(ctx context.Context, msg *TaskMessage, processAt time.Time) error
	ScheduleUnique(ctx context.Context, msg *TaskMessage, processAt time.Time, ttl time.Duration) error
	Retry(ctx context.Context, msg *TaskMessage, processAt time.Time, errMsg string, isFailure bool) error
	Archive(ctx context.Context, msg *TaskMessage, errMsg string) error
	ForwardIfReady(qnames ...string) error

	// Group aggregation related methods
	AddToGroup(ctx context.Context, msg *TaskMessage, gname string) error
	AddToGroupUnique(ctx context.Context, msg *TaskMessage, groupKey string, ttl time.Duration) error
	ListGroups(qname string) ([]string, error)
	AggregationCheck(qname, gname string, t time.Time, gracePeriod, maxDelay time.Duration, maxSize int) (aggregationSetID string, err error)
	ReadAggregationSet(qname, gname, aggregationSetID string) ([]*TaskMessage, time.Time, error)
	DeleteAggregationSet(ctx context.Context, qname, gname, aggregationSetID string) error
	ReclaimStaleAggregationSets(qname string) error

	// Task retention related method
	DeleteExpiredCompletedTasks(qname string) error

	// Lease related methods
	ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*TaskMessage, error)
	ExtendLease(qname string, ids ...string) (time.Time, error)

	// State snapshot related methods
	WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error
	ClearServerState(host string, pid int, serverID string) error

	// Cancelation related methods
	CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers
	PublishCancelation(id string) error

	WriteResult(qname, id string, data []byte) (n int, err error)
}

Broker is a message broker that supports operations to manage task queues.

See rdb.RDB as a reference implementation.

type Cancelations added in v0.4.0

type Cancelations struct {
	// contains filtered or unexported fields
}

Cancelations is a collection that holds cancel functions for all active tasks.

Cancelations is safe for concurrent use by multiple goroutines.

func NewCancelations added in v0.4.0

func NewCancelations() *Cancelations

NewCancelations returns a Cancelations instance.

func (*Cancelations) Add added in v0.4.0

func (c *Cancelations) Add(id string, fn context.CancelFunc)

Add adds a new cancel func to the collection.

func (*Cancelations) Delete added in v0.4.0

func (c *Cancelations) Delete(id string)

Delete deletes a cancel func from the collection given an id.

func (*Cancelations) Get added in v0.4.0

func (c *Cancelations) Get(id string) (fn context.CancelFunc, ok bool)

Get returns a cancel func given an id.
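
A collection like this is commonly built as a mutex-guarded map from task ID to cancel func. The sketch below shows that pattern under the assumption that this is roughly how the type works; the unexported fields are not visible on this page, so the `cancelations` type here is a hypothetical re-creation, not the package's code.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// cancelations is a hypothetical re-creation of the collection described
// above: a mutex-guarded map from task ID to cancel func.
type cancelations struct {
	mu  sync.Mutex
	fns map[string]context.CancelFunc
}

func newCancelations() *cancelations {
	return &cancelations{fns: make(map[string]context.CancelFunc)}
}

// Add registers fn under id, replacing any previous entry.
func (c *cancelations) Add(id string, fn context.CancelFunc) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.fns[id] = fn
}

// Delete removes the entry for id, if any.
func (c *cancelations) Delete(id string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.fns, id)
}

// Get returns the cancel func for id and whether it was found.
func (c *cancelations) Get(id string) (context.CancelFunc, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	fn, ok := c.fns[id]
	return fn, ok
}

func main() {
	c := newCancelations()
	ctx, cancel := context.WithCancel(context.Background())
	c.Add("task-1", cancel)

	// Look up the cancel func by task ID and invoke it.
	if fn, ok := c.Get("task-1"); ok {
		fn()
	}
	<-ctx.Done()
	fmt.Println(ctx.Err())
	c.Delete("task-1")
}
```

In a worker, the entry would typically be added when a task starts and deleted when it finishes, so a cancelation message only reaches tasks that are still running.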

type Lease added in v0.22.0

type Lease struct {
	Clock timeutil.Clock
	// contains filtered or unexported fields
}

Lease is a time-bound lease for a worker to process a task. It provides a communication channel between the lessor and the lessee about lease expiration.

func NewLease added in v0.22.0

func NewLease(expirationTime time.Time) *Lease

func (*Lease) Deadline added in v0.22.0

func (l *Lease) Deadline() time.Time

Deadline returns the expiration time of the lease.

func (*Lease) Done added in v0.22.0

func (l *Lease) Done() <-chan struct{}

Done returns a channel from which the lessee can read to be notified when the lessor signals that the lease has expired.

func (*Lease) IsValid added in v0.22.0

func (l *Lease) IsValid() bool

IsValid returns true if the lease's expiration time is in the future or equal to the current time, and false otherwise.

func (*Lease) NotifyExpiration added in v0.22.0

func (l *Lease) NotifyExpiration() bool

NotifyExpiration sends a notification to the lessee about the expired lease. It returns true if the notification was sent, and false if the lease is still valid and no notification was sent.

func (*Lease) Reset added in v0.22.0

func (l *Lease) Reset(expirationTime time.Time) bool

Reset changes the lease to expire at the given time. It returns true if the lease is still valid and the reset succeeded, and false if the lease has already expired.
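
The lease methods above fit together as a small state machine: a lease is valid until its deadline, can be extended only while valid, and signals expiration by closing a channel. The sketch below re-creates that behavior with the standard library only. The `lease` type, its fields, the use of `time.Now` in place of the `Clock` field, and the `newLeaseFor` helper are all assumptions for illustration, since the unexported fields are not shown on this page.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// lease is a hypothetical re-creation of the Lease type described above.
type lease struct {
	mu       sync.Mutex
	expireAt time.Time
	once     sync.Once
	done     chan struct{}
}

func newLease(expirationTime time.Time) *lease {
	return &lease{expireAt: expirationTime, done: make(chan struct{})}
}

// newLeaseFor is a convenience helper (not part of the documented API)
// that creates a lease expiring d from now.
func newLeaseFor(d time.Duration) *lease { return newLease(time.Now().Add(d)) }

func (l *lease) Deadline() time.Time {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.expireAt
}

func (l *lease) Done() <-chan struct{} { return l.done }

// IsValid reports whether the expiration time is now or in the future.
func (l *lease) IsValid() bool {
	now := time.Now()
	l.mu.Lock()
	defer l.mu.Unlock()
	return !l.expireAt.Before(now)
}

// NotifyExpiration closes the done channel once if the lease has expired.
func (l *lease) NotifyExpiration() bool {
	if l.IsValid() {
		return false
	}
	l.once.Do(func() { close(l.done) })
	return true
}

// Reset moves the expiration time, but only while the lease is still valid.
func (l *lease) Reset(expirationTime time.Time) bool {
	if !l.IsValid() {
		return false
	}
	l.mu.Lock()
	defer l.mu.Unlock()
	l.expireAt = expirationTime
	return true
}

func main() {
	l := newLeaseFor(time.Minute)
	fmt.Println(l.IsValid(), l.NotifyExpiration())

	expired := newLeaseFor(-time.Second)
	fmt.Println(expired.IsValid(), expired.NotifyExpiration())
	<-expired.Done() // returns immediately: the channel was closed above
	fmt.Println(expired.Reset(time.Now().Add(time.Minute)))
}
```

Closing the channel inside a `sync.Once` lets the lessor call `NotifyExpiration` repeatedly without panicking on a double close.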

type SchedulerEnqueueEvent added in v0.13.0

type SchedulerEnqueueEvent struct {
	// ID of the task that was enqueued.
	TaskID string

	// Time the task was enqueued.
	EnqueuedAt time.Time
}

SchedulerEnqueueEvent holds information about an enqueue event by a scheduler.

func DecodeSchedulerEnqueueEvent added in v0.18.0

func DecodeSchedulerEnqueueEvent(b []byte) (*SchedulerEnqueueEvent, error)

DecodeSchedulerEnqueueEvent unmarshals the given bytes and returns a decoded SchedulerEnqueueEvent.

type SchedulerEntry added in v0.13.0

type SchedulerEntry struct {
	// Identifier of this entry.
	ID string

	// Spec describes the schedule of this entry.
	Spec string

	// Type is the task type of the periodic task.
	Type string

	// Payload is the payload of the periodic task.
	Payload []byte

	// Opts is the options for the periodic task.
	Opts []string

	// Next shows the next time the task will be enqueued.
	Next time.Time

	// Prev shows the last time the task was enqueued.
	// Zero time if task was never enqueued.
	Prev time.Time
}

SchedulerEntry holds information about a periodic task registered with a scheduler.

func DecodeSchedulerEntry added in v0.18.0

func DecodeSchedulerEntry(b []byte) (*SchedulerEntry, error)

DecodeSchedulerEntry unmarshals the given bytes and returns a decoded SchedulerEntry.

type ServerInfo added in v0.8.0

type ServerInfo struct {
	Host              string
	PID               int
	ServerID          string
	Concurrency       int
	Queues            map[string]int
	StrictPriority    bool
	Status            string
	Started           time.Time
	ActiveWorkerCount int
}

ServerInfo holds information about a running server.

func DecodeServerInfo added in v0.18.0

func DecodeServerInfo(b []byte) (*ServerInfo, error)

DecodeServerInfo decodes the given bytes into ServerInfo.

type TaskInfo added in v0.18.0

type TaskInfo struct {
	Message       *TaskMessage
	State         TaskState
	NextProcessAt time.Time
	Result        []byte
}

TaskInfo describes a task message and its metadata.

type TaskMessage

type TaskMessage struct {
	// Type indicates the kind of the task to be performed.
	Type string

	// Payload holds data needed to process the task.
	Payload []byte

	// ID is a unique identifier for each task.
	ID string

	// Queue is the name of the queue this message should be enqueued to.
	Queue string

	// Retry is the maximum number of retries for this task.
	Retry int

	// Retried is the number of times we've retried this task so far.
	Retried int

	// ErrorMsg holds the error message from the last failure.
	ErrorMsg string

	// Time of last failure in Unix time,
	// the number of seconds elapsed since January 1, 1970 UTC.
	//
	// Use zero to indicate no last failure
	LastFailedAt int64

	// Timeout specifies timeout in seconds.
	// If task processing doesn't complete within the timeout, the task will be retried
	// if retry count is remaining. Otherwise it will be moved to the archive.
	//
	// Use zero to indicate no timeout.
	Timeout int64

	// Deadline specifies the deadline for the task in Unix time,
	// the number of seconds elapsed since January 1, 1970 UTC.
	// If task processing doesn't complete before the deadline, the task will be retried
	// if retry count is remaining. Otherwise it will be moved to the archive.
	//
	// Use zero to indicate no deadline.
	Deadline int64

	// UniqueKey holds the redis key used for uniqueness lock for this task.
	//
	// Empty string indicates that no uniqueness lock was used.
	UniqueKey string

	// GroupKey holds the group key used for task aggregation.
	//
	// Empty string indicates no aggregation is used for this task.
	GroupKey string

	// Retention specifies the number of seconds the task should be retained after completion.
	Retention int64

	// CompletedAt is the time the task was processed successfully in Unix time,
	// the number of seconds elapsed since January 1, 1970 UTC.
	//
	// Use zero to indicate no value.
	CompletedAt int64
}

TaskMessage is the internal representation of a task with additional metadata fields. Serialized data of this type gets written to redis.
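
Timeout is a relative duration in seconds while Deadline is an absolute Unix time, and zero means "not set" for both. The sketch below shows one plausible way a consumer could combine the two into a single effective deadline by taking whichever comes first. This rule and the helper name `effectiveDeadline` are assumptions for illustration; this page does not specify how the two fields interact.

```go
package main

import "fmt"

// effectiveDeadline combines a relative timeout (seconds) and an absolute
// deadline (Unix seconds) into one Unix timestamp. The second result is
// false when neither value is set. Hypothetical helper: taking the earlier
// of the two limits is an assumption, not documented behavior.
func effectiveDeadline(nowUnix, timeout, deadline int64) (int64, bool) {
	switch {
	case timeout == 0 && deadline == 0:
		return 0, false
	case timeout != 0 && deadline != 0:
		if byTimeout := nowUnix + timeout; byTimeout < deadline {
			return byTimeout, true
		}
		return deadline, true
	case timeout != 0:
		return nowUnix + timeout, true
	default:
		return deadline, true
	}
}

func main() {
	now := int64(1_000)
	fmt.Println(effectiveDeadline(now, 30, 0))    // timeout only
	fmt.Println(effectiveDeadline(now, 0, 2_000)) // deadline only
	fmt.Println(effectiveDeadline(now, 30, 1_010)) // both set
	fmt.Println(effectiveDeadline(now, 0, 0))     // neither set
}
```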

func DecodeMessage added in v0.9.3

func DecodeMessage(data []byte) (*TaskMessage, error)

DecodeMessage unmarshals the given bytes and returns a decoded task message.
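
The encode and decode helpers are meant to round-trip: decoding the output of the encoder should give back an equivalent message. The sketch below demonstrates that property using `encoding/json` purely as a stand-in, because the actual wire format is not described on this page. The trimmed-down `taskMessage` struct and the lower-case function names are hypothetical.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// taskMessage mirrors a few TaskMessage fields for illustration only.
type taskMessage struct {
	Type    string
	Payload []byte
	ID      string
	Queue   string
	Retry   int
}

// encodeMessage uses encoding/json as a stand-in serializer; the real
// wire format is an unknown here.
func encodeMessage(msg *taskMessage) ([]byte, error) {
	if msg == nil {
		return nil, fmt.Errorf("cannot encode nil message")
	}
	return json.Marshal(msg)
}

// decodeMessage reverses encodeMessage.
func decodeMessage(data []byte) (*taskMessage, error) {
	var msg taskMessage
	if err := json.Unmarshal(data, &msg); err != nil {
		return nil, err
	}
	return &msg, nil
}

func main() {
	in := &taskMessage{Type: "email:send", Payload: []byte(`{"user":1}`), ID: "t1", Queue: "default", Retry: 3}
	data, err := encodeMessage(in)
	if err != nil {
		panic(err)
	}
	out, err := decodeMessage(data)
	if err != nil {
		panic(err)
	}
	fmt.Println(out.Type == in.Type, bytes.Equal(out.Payload, in.Payload))
}
```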

type TaskState added in v0.18.0

type TaskState int

TaskState denotes the state of a task.

const (
	TaskStateActive TaskState = iota + 1
	TaskStatePending
	TaskStateScheduled
	TaskStateRetry
	TaskStateArchived
	TaskStateCompleted
	TaskStateAggregating // describes a state where task is waiting in a group to be aggregated
)

func TaskStateFromString added in v0.18.0

func TaskStateFromString(s string) (TaskState, error)

func (TaskState) String added in v0.18.0

func (s TaskState) String() string
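
The two functions above convert between a state value and its textual form. The sketch below shows that pairing with a lookup table. The lower-case state names, the fallback for unknown values, and the error message are assumptions for illustration, since this page does not document the strings.

```go
package main

import "fmt"

type taskState int

const (
	taskStateActive taskState = iota + 1
	taskStatePending
	taskStateScheduled
	taskStateRetry
	taskStateArchived
	taskStateCompleted
	taskStateAggregating
)

// stateNames maps each state to its textual form.
// Assumption: the lower-case names are illustrative.
var stateNames = map[taskState]string{
	taskStateActive:      "active",
	taskStatePending:     "pending",
	taskStateScheduled:   "scheduled",
	taskStateRetry:       "retry",
	taskStateArchived:    "archived",
	taskStateCompleted:   "completed",
	taskStateAggregating: "aggregating",
}

// String returns the name of the state, or a placeholder for unknown values.
func (s taskState) String() string {
	if name, ok := stateNames[s]; ok {
		return name
	}
	return fmt.Sprintf("unknown(%d)", int(s))
}

// taskStateFromString is the inverse of String.
func taskStateFromString(s string) (taskState, error) {
	for state, name := range stateNames {
		if name == s {
			return state, nil
		}
	}
	return 0, fmt.Errorf("%q is not a valid task state", s)
}

func main() {
	fmt.Println(taskStateRetry)
	state, err := taskStateFromString("archived")
	fmt.Println(state, err)
	_, err = taskStateFromString("bogus")
	fmt.Println(err)
}
```

Starting the constants at `iota + 1` keeps the zero value of the type from matching any real state, which makes an uninitialized field easy to spot.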

type WorkerInfo added in v0.5.0

type WorkerInfo struct {
	Host     string
	PID      int
	ServerID string
	ID       string
	Type     string
	Payload  []byte
	Queue    string
	Started  time.Time
	Deadline time.Time
}

WorkerInfo holds information about a running worker.

func DecodeWorkerInfo added in v0.18.0

func DecodeWorkerInfo(b []byte) (*WorkerInfo, error)

DecodeWorkerInfo decodes the given bytes into WorkerInfo.

type Z added in v0.11.0

type Z struct {
	Message *TaskMessage
	Score   int64
}

Z represents a sorted set member.
