Documentation ¶
Overview ¶
Package rdb encapsulates the interactions with redis.
Index ¶
- Variables
- type DailyStats
- type ErrQueueNotEmpty
- type ErrQueueNotFound
- type Pagination
- type RDB
- func (r *RDB) AllQueues() ([]string, error)
- func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error
- func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error)
- func (r *RDB) ArchiveAllRetryTasks(qname string) (int64, error)
- func (r *RDB) ArchiveAllScheduledTasks(qname string) (int64, error)
- func (r *RDB) ArchivePendingTask(qname string, id uuid.UUID) error
- func (r *RDB) ArchiveRetryTask(qname string, id uuid.UUID, score int64) error
- func (r *RDB) ArchiveScheduledTask(qname string, id uuid.UUID, score int64) error
- func (r *RDB) CancelationPubSub() (*redis.PubSub, error)
- func (r *RDB) CheckAndEnqueue(qnames ...string) error
- func (r *RDB) ClearSchedulerEntries(scheduelrID string) error
- func (r *RDB) ClearSchedulerHistory(entryID string) error
- func (r *RDB) ClearServerState(host string, pid int, serverID string) error
- func (r *RDB) Close() error
- func (r *RDB) ClusterKeySlot(qname string) (int64, error)
- func (r *RDB) ClusterNodes(qname string) ([]redis.ClusterNode, error)
- func (r *RDB) CurrentStats(qname string) (*Stats, error)
- func (r *RDB) DeleteAllArchivedTasks(qname string) (int64, error)
- func (r *RDB) DeleteAllPendingTasks(qname string) (int64, error)
- func (r *RDB) DeleteAllRetryTasks(qname string) (int64, error)
- func (r *RDB) DeleteAllScheduledTasks(qname string) (int64, error)
- func (r *RDB) DeleteArchivedTask(qname string, id uuid.UUID, score int64) error
- func (r *RDB) DeletePendingTask(qname string, id uuid.UUID) error
- func (r *RDB) DeleteRetryTask(qname string, id uuid.UUID, score int64) error
- func (r *RDB) DeleteScheduledTask(qname string, id uuid.UUID, score int64) error
- func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Time, err error)
- func (r *RDB) Done(msg *base.TaskMessage) error
- func (r *RDB) Enqueue(msg *base.TaskMessage) error
- func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error
- func (r *RDB) HistoricalStats(qname string, n int) ([]*DailyStats, error)
- func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskMessage, error)
- func (r *RDB) ListArchived(qname string, pgn Pagination) ([]base.Z, error)
- func (r *RDB) ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*base.TaskMessage, error)
- func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskMessage, error)
- func (r *RDB) ListRetry(qname string, pgn Pagination) ([]base.Z, error)
- func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]base.Z, error)
- func (r *RDB) ListSchedulerEnqueueEvents(entryID string, pgn Pagination) ([]*base.SchedulerEnqueueEvent, error)
- func (r *RDB) ListSchedulerEntries() ([]*base.SchedulerEntry, error)
- func (r *RDB) ListServers() ([]*base.ServerInfo, error)
- func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error)
- func (r *RDB) Pause(qname string) error
- func (r *RDB) Ping() error
- func (r *RDB) PublishCancelation(id string) error
- func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerEnqueueEvent) error
- func (r *RDB) RedisClusterInfo() (map[string]string, error)
- func (r *RDB) RedisInfo() (map[string]string, error)
- func (r *RDB) RemoveQueue(qname string, force bool) error
- func (r *RDB) Requeue(msg *base.TaskMessage) error
- func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error
- func (r *RDB) RunAllArchivedTasks(qname string) (int64, error)
- func (r *RDB) RunAllRetryTasks(qname string) (int64, error)
- func (r *RDB) RunAllScheduledTasks(qname string) (int64, error)
- func (r *RDB) RunArchivedTask(qname string, id uuid.UUID, score int64) error
- func (r *RDB) RunRetryTask(qname string, id uuid.UUID, score int64) error
- func (r *RDB) RunScheduledTask(qname string, id uuid.UUID, score int64) error
- func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error
- func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl time.Duration) error
- func (r *RDB) Unpause(qname string) error
- func (r *RDB) WriteSchedulerEntries(schedulerID string, entries []*base.SchedulerEntry, ttl time.Duration) error
- func (r *RDB) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo, ttl time.Duration) error
- type Stats
Constants ¶
This section is empty.
Variables ¶
var ( // ErrNoProcessableTask indicates that there are no tasks ready to be processed. ErrNoProcessableTask = errors.New("no tasks are ready for processing") // ErrTaskNotFound indicates that a task that matches the given identifier was not found. ErrTaskNotFound = errors.New("could not find a task") // ErrDuplicateTask indicates that another task with the same unique key holds the uniqueness lock. ErrDuplicateTask = errors.New("task already exists") )
Functions ¶
This section is empty.
Types ¶
type DailyStats ¶
type DailyStats struct { // Name of the queue (e.g. "default", "critical"). Queue string // Total number of tasks processed during the given day. // The number includes both succeeded and failed tasks. Processed int // Total number of tasks failed during the given day. Failed int // Date this stats was taken. Time time.Time }
DailyStats holds aggregate data for a given day.
type ErrQueueNotEmpty ¶
type ErrQueueNotEmpty struct {
// contains filtered or unexported fields
}
ErrQueueNotEmpty indicates specified queue is not empty.
func (*ErrQueueNotEmpty) Error ¶
func (e *ErrQueueNotEmpty) Error() string
type ErrQueueNotFound ¶
type ErrQueueNotFound struct {
// contains filtered or unexported fields
}
ErrQueueNotFound indicates specified queue does not exist.
func (*ErrQueueNotFound) Error ¶
func (e *ErrQueueNotFound) Error() string
type Pagination ¶
type Pagination struct { // Number of items in the page. Size int // Page number starting from zero. Page int }
Pagination specifies the page size and page number for the list operation.
type RDB ¶
type RDB struct {
// contains filtered or unexported fields
}
RDB is a client interface to query and mutate task queues.
func (*RDB) Archive ¶
func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error
Archive sends the given task to archive, attaching the error message to the task. It also trims the archive by timestamp and set size.
func (*RDB) ArchiveAllPendingTasks ¶
ArchiveAllPendingTasks archives all pending tasks from the given queue and returns the number of tasks that were moved.
func (*RDB) ArchiveAllRetryTasks ¶
ArchiveAllRetryTasks archives all retry tasks from the given queue and returns the number of tasks that were moved.
func (*RDB) ArchiveAllScheduledTasks ¶
ArchiveAllScheduledTasks archives all scheduled tasks from the given queue and returns the number of tasks that were moved.
func (*RDB) ArchivePendingTask ¶
ArchivePendingTask finds a pending task that matches the given id from the given queue and archives it. If a task that matches the id does not exist, it returns ErrTaskNotFound.
func (*RDB) ArchiveRetryTask ¶
ArchiveRetryTask finds a retry task that matches the given id and score from the given queue and archives it. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) ArchiveScheduledTask ¶
ArchiveScheduledTask finds a scheduled task that matches the given id and score from the given queue and archives it. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) CancelationPubSub ¶
CancelationPubSub returns a pubsub for cancelation messages.
func (*RDB) CheckAndEnqueue ¶
CheckAndEnqueue checks for scheduled/retry tasks for the given queues and enqueues any tasks that are ready to be processed.
func (*RDB) ClearSchedulerEntries ¶
ClearSchedulerEntries deletes scheduler entries data from redis.
func (*RDB) ClearSchedulerHistory ¶
ClearSchedulerHistory deletes the enqueue event history for the given scheduler entry.
func (*RDB) ClearServerState ¶
ClearServerState deletes server state data from redis.
func (*RDB) ClusterKeySlot ¶
ClusterKeySlot returns an integer identifying the hash slot the given queue hashes to.
func (*RDB) ClusterNodes ¶
ClusterNodes returns a list of nodes the given queue belongs to.
func (*RDB) CurrentStats ¶
CurrentStats returns the current state of the given queue.
func (*RDB) DeleteAllArchivedTasks ¶
DeleteAllArchivedTasks deletes all archived tasks from the given queue and returns the number of tasks deleted.
func (*RDB) DeleteAllPendingTasks ¶
DeleteAllPendingTasks deletes all pending tasks from the given queue and returns the number of tasks deleted.
func (*RDB) DeleteAllRetryTasks ¶
DeleteAllRetryTasks deletes all retry tasks from the given queue and returns the number of tasks deleted.
func (*RDB) DeleteAllScheduledTasks ¶
DeleteAllScheduledTasks deletes all scheduled tasks from the given queue and returns the number of tasks deleted.
func (*RDB) DeleteArchivedTask ¶
DeleteArchivedTask deletes an archived task that matches the given id and score from the given queue. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) DeletePendingTask ¶
DeletePendingTask deletes a pending task that matches the given id from the given queue. If a task that matches the id does not exist, it returns ErrTaskNotFound.
func (*RDB) DeleteRetryTask ¶
DeleteRetryTask deletes a retry task that matches the given id and score from the given queue. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) DeleteScheduledTask ¶
DeleteScheduledTask deletes a scheduled task that matches the given id and score from the given queue. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) Dequeue ¶
Dequeue queries given queues in order and pops a task message off a queue if one exists and returns the message and deadline. Dequeue skips a queue if the queue is paused. If all queues are empty, ErrNoProcessableTask error is returned.
func (*RDB) Done ¶
func (r *RDB) Done(msg *base.TaskMessage) error
Done removes the task from active queue to mark the task as done. It removes a uniqueness lock acquired by the task, if any.
func (*RDB) Enqueue ¶
func (r *RDB) Enqueue(msg *base.TaskMessage) error
Enqueue inserts the given task to the tail of the queue.
func (*RDB) EnqueueUnique ¶
EnqueueUnique inserts the given task if the task's uniqueness lock can be acquired. It returns ErrDuplicateTask if the lock cannot be acquired.
func (*RDB) HistoricalStats ¶
func (r *RDB) HistoricalStats(qname string, n int) ([]*DailyStats, error)
HistoricalStats returns a list of stats from the last n days for the given queue.
func (*RDB) ListActive ¶
func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskMessage, error)
ListActive returns all tasks that are currently being processed for the given queue.
func (*RDB) ListArchived ¶
ListArchived returns all tasks from the given queue that have exhausted their retry limit.
func (*RDB) ListDeadlineExceeded ¶
func (r *RDB) ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*base.TaskMessage, error)
ListDeadlineExceeded returns a list of task messages that have exceeded the deadline from the given queues.
func (*RDB) ListPending ¶
func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskMessage, error)
ListPending returns pending tasks that are ready to be processed.
func (*RDB) ListRetry ¶
ListRetry returns all tasks from the given queue that have failed before and will be retried in the future.
func (*RDB) ListScheduled ¶
ListScheduled returns all tasks from the given queue that are scheduled to be processed in the future.
func (*RDB) ListSchedulerEnqueueEvents ¶
func (r *RDB) ListSchedulerEnqueueEvents(entryID string, pgn Pagination) ([]*base.SchedulerEnqueueEvent, error)
ListSchedulerEnqueueEvents returns the list of scheduler enqueue events.
func (*RDB) ListSchedulerEntries ¶
func (r *RDB) ListSchedulerEntries() ([]*base.SchedulerEntry, error)
ListSchedulerEntries returns the list of scheduler entries.
func (*RDB) ListServers ¶
func (r *RDB) ListServers() ([]*base.ServerInfo, error)
ListServers returns the list of server info.
func (*RDB) ListWorkers ¶
func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error)
ListWorkers returns the list of worker stats.
func (*RDB) PublishCancelation ¶
PublishCancelation publishes a cancelation message to all subscribers. The message is the ID for the task to be canceled.
func (*RDB) RecordSchedulerEnqueueEvent ¶
func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerEnqueueEvent) error
RecordSchedulerEnqueueEvent records the time when the given task was enqueued.
func (*RDB) RedisClusterInfo ¶
RedisClusterInfo returns a map of redis cluster info.
func (*RDB) RemoveQueue ¶
RemoveQueue removes the specified queue.
If force is set to true, it will remove the queue regardless of whether it is empty, as long as no tasks are active for the queue. If force is set to false, it will only remove the queue if the queue is empty.
func (*RDB) Requeue ¶
func (r *RDB) Requeue(msg *base.TaskMessage) error
Requeue moves the task from active queue to the specified queue.
func (*RDB) Retry ¶
Retry moves the task from active to retry queue, incrementing retry count and assigning error message to the task message.
func (*RDB) RunAllArchivedTasks ¶
RunAllArchivedTasks enqueues all archived tasks from the given queue and returns the number of tasks enqueued.
func (*RDB) RunAllRetryTasks ¶
RunAllRetryTasks enqueues all retry tasks from the given queue and returns the number of tasks enqueued.
func (*RDB) RunAllScheduledTasks ¶
RunAllScheduledTasks enqueues all scheduled tasks from the given queue and returns the number of tasks enqueued.
func (*RDB) RunArchivedTask ¶
RunArchivedTask finds an archived task that matches the given id and score from the given queue and enqueues it for processing. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) RunRetryTask ¶
RunRetryTask finds a retry task that matches the given id and score from the given queue and enqueues it for processing. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) RunScheduledTask ¶
RunScheduledTask finds a scheduled task that matches the given id and score from the given queue and enqueues it for processing. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) ScheduleUnique ¶
ScheduleUnique adds the task to the backlog queue to be processed in the future if the uniqueness lock can be acquired. It returns ErrDuplicateTask if the lock cannot be acquired.
func (*RDB) WriteSchedulerEntries ¶
func (r *RDB) WriteSchedulerEntries(schedulerID string, entries []*base.SchedulerEntry, ttl time.Duration) error
WriteSchedulerEntries writes scheduler entries data to redis with expiration set to the value ttl.
func (*RDB) WriteServerState ¶
func (r *RDB) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo, ttl time.Duration) error
WriteServerState writes server state data to redis with expiration set to the value ttl.
type Stats ¶
type Stats struct { // Name of the queue (e.g. "default", "critical"). Queue string // MemoryUsage is the total number of bytes the queue and its tasks require // to be stored in redis. MemoryUsage int64 // Paused indicates whether the queue is paused. // If true, tasks in the queue should not be processed. Paused bool // Size is the total number of tasks in the queue. Size int // Number of tasks in each state. Pending int Active int Scheduled int Retry int Archived int // Total number of tasks processed during the current date. // The number includes both succeeded and failed tasks. Processed int // Total number of tasks failed during the current date. Failed int // Time this stats was taken. Timestamp time.Time }
Stats represents a state of queues at a certain time.