Documentation
Constants ¶
const (
	AllServers    = "redisq:servers"    // ZSET
	AllWorkers    = "redisq:workers"    // ZSET
	AllSchedulers = "redisq:schedulers" // ZSET
	AllQueues     = "redisq:queues"     // SET
	CancelChannel = "redisq:cancel"     // PubSub channel
)
Global Redis keys.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Broker ¶
type Broker interface {
	Ping() error
	Close() error
	Enqueue(ctx context.Context, msg *TaskMessage) error
	EnqueueUnique(ctx context.Context, msg *TaskMessage, ttl time.Duration) error
	Dequeue(qnames ...string) (*TaskMessage, time.Time, error)
	Done(ctx context.Context, msg *TaskMessage) error
	MarkAsComplete(ctx context.Context, msg *TaskMessage) error
	Requeue(ctx context.Context, msg *TaskMessage) error
	Schedule(ctx context.Context, msg *TaskMessage, processAt time.Time) error
	ScheduleUnique(ctx context.Context, msg *TaskMessage, processAt time.Time, ttl time.Duration) error
	Retry(ctx context.Context, msg *TaskMessage, processAt time.Time, errMsg string, isFailure bool) error
	Archive(ctx context.Context, msg *TaskMessage, errMsg string) error
	ForwardIfReady(qnames ...string) error

	// Group aggregation related methods
	AddToGroup(ctx context.Context, msg *TaskMessage, gname string) error
	AddToGroupUnique(ctx context.Context, msg *TaskMessage, groupKey string, ttl time.Duration) error
	ListGroups(qname string) ([]string, error)
	AggregationCheck(qname, gname string, t time.Time, gracePeriod, maxDelay time.Duration, maxSize int) (aggregationSetID string, err error)
	ReadAggregationSet(qname, gname, aggregationSetID string) ([]*TaskMessage, time.Time, error)
	DeleteAggregationSet(ctx context.Context, qname, gname, aggregationSetID string) error
	ReclaimStaleAggregationSets(qname string) error

	// Task retention related method
	DeleteExpiredCompletedTasks(qname string) error

	// Lease related methods
	ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*TaskMessage, error)
	ExtendLease(qname string, ids ...string) (time.Time, error)

	// State snapshot related methods
	WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error
	ClearServerState(host string, pid int, serverID string) error

	// Cancelation related methods
	CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers
	PublishCancelation(id string) error

	WriteResult(qname, id string, data []byte) (n int, err error)
}
Broker is a message broker that supports operations to manage task queues.
See rdb.RDB as a reference implementation.
type Cancelations ¶
type Cancelations struct {
// contains filtered or unexported fields
}
Cancelations is a collection that holds cancel functions for all active tasks.
Cancelations are safe for concurrent use by multiple goroutines.
func NewCancelations ¶
func NewCancelations() *Cancelations
NewCancelations returns a Cancelations instance.
func (*Cancelations) Add ¶
func (c *Cancelations) Add(id string, fn context.CancelFunc)
Add adds a new cancel func to the collection.
func (*Cancelations) Delete ¶
func (c *Cancelations) Delete(id string)
Delete deletes a cancel func from the collection given an id.
func (*Cancelations) Get ¶
func (c *Cancelations) Get(id string) (fn context.CancelFunc, ok bool)
Get returns a cancel func given an id.
type ServerInfo ¶
type ServerInfo struct {
	Host              string
	PID               int
	ServerID          string
	Concurrency       int
	Queues            map[string]int
	StrictPriority    bool
	Status            string
	Started           time.Time
	ActiveWorkerCount int
}
ServerInfo holds information about a running server.
type TaskMessage ¶
type TaskMessage struct {
	// Type indicates the kind of the task to be performed.
	Type string

	// Payload holds data needed to process the task.
	Payload []byte

	// ID is a unique identifier for each task.
	ID string

	// Queue is a name this message should be enqueued to.
	Queue string

	// Retry is the max number of retry for this task.
	Retry int

	// Retried is the number of times we've retried this task so far.
	Retried int

	// ErrorMsg holds the error message from the last failure.
	ErrorMsg string

	// Time of last failure in Unix time,
	// the number of seconds elapsed since January 1, 1970 UTC.
	//
	// Use zero to indicate no last failure
	LastFailedAt int64

	// Timeout specifies timeout in seconds.
	// If task processing doesn't complete within the timeout, the task will be retried
	// if retry count is remaining. Otherwise it will be moved to the archive.
	//
	// Use zero to indicate no timeout.
	Timeout int64

	// Deadline specifies the deadline for the task in Unix time,
	// the number of seconds elapsed since January 1, 1970 UTC.
	// If task processing doesn't complete before the deadline, the task will be retried
	// if retry count is remaining. Otherwise it will be moved to the archive.
	//
	// Use zero to indicate no deadline.
	Deadline int64

	// UniqueKey holds the redis key used for uniqueness lock for this task.
	//
	// Empty string indicates that no uniqueness lock was used.
	UniqueKey string

	// GroupKey holds the group key used for task aggregation.
	//
	// Empty string indicates no aggregation is used for this task.
	GroupKey string

	// Retention specifies the number of seconds the task should be retained after completion.
	Retention int64

	// CompletedAt is the time the task was processed successfully in Unix time,
	// the number of seconds elapsed since January 1, 1970 UTC.
	//
	// Use zero to indicate no value.
	CompletedAt int64
}
TaskMessage is the internal representation of a task with additional metadata fields. Serialized data of this type is written to Redis.