rdb

package
v0.17.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 6, 2021 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package rdb encapsulates the interactions with redis.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrNoProcessableTask indicates that there are no tasks ready to be processed.
	ErrNoProcessableTask = errors.New("no tasks are ready for processing")

	// ErrTaskNotFound indicates that a task that matches the given identifier was not found.
	ErrTaskNotFound = errors.New("could not find a task")

	// ErrDuplicateTask indicates that another task with the same unique key holds the uniqueness lock.
	ErrDuplicateTask = errors.New("task already exists")
)

Functions

This section is empty.

Types

type DailyStats

type DailyStats struct {
	// Name of the queue (e.g. "default", "critical").
	Queue string
	// Total number of tasks processed during the given day.
	// The number includes both succeeded and failed tasks.
	Processed int
	// Total number of tasks failed during the given day.
	Failed int
	// Date this stats was taken.
	Time time.Time
}

DailyStats holds aggregate data for a given day.

type ErrQueueNotEmpty

type ErrQueueNotEmpty struct {
	// contains filtered or unexported fields
}

ErrQueueNotEmpty indicates specified queue is not empty.

func (*ErrQueueNotEmpty) Error

func (e *ErrQueueNotEmpty) Error() string

type ErrQueueNotFound

type ErrQueueNotFound struct {
	// contains filtered or unexported fields
}

ErrQueueNotFound indicates specified queue does not exist.

func (*ErrQueueNotFound) Error

func (e *ErrQueueNotFound) Error() string

type Pagination

type Pagination struct {
	// Number of items in the page.
	Size int

	// Page number starting from zero.
	Page int
}

Pagination specifies the page size and page number for the list operation.

type RDB

type RDB struct {
	// contains filtered or unexported fields
}

RDB is a client interface to query and mutate task queues.

func NewRDB

func NewRDB(client redis.UniversalClient) *RDB

NewRDB returns a new instance of RDB.

func (*RDB) AllQueues

func (r *RDB) AllQueues() ([]string, error)

AllQueues returns a list of all queue names.

func (*RDB) Archive

func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error

Archive sends the given task to archive, attaching the error message to the task. It also trims the archive by timestamp and set size.

func (*RDB) ArchiveAllPendingTasks

func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error)

ArchiveAllPendingTasks archives all pending tasks from the given queue and returns the number of tasks that were moved.

func (*RDB) ArchiveAllRetryTasks

func (r *RDB) ArchiveAllRetryTasks(qname string) (int64, error)

ArchiveAllRetryTasks archives all retry tasks from the given queue and returns the number of tasks that were moved.

func (*RDB) ArchiveAllScheduledTasks

func (r *RDB) ArchiveAllScheduledTasks(qname string) (int64, error)

ArchiveAllScheduledTasks archives all scheduled tasks from the given queue and returns the number of tasks that were moved.

func (*RDB) ArchivePendingTask

func (r *RDB) ArchivePendingTask(qname string, id uuid.UUID) error

ArchivePendingTask finds a pending task that matches the given id from the given queue and archives it. If a task that matches the id does not exist, it returns ErrTaskNotFound.

func (*RDB) ArchiveRetryTask

func (r *RDB) ArchiveRetryTask(qname string, id uuid.UUID, score int64) error

ArchiveRetryTask finds a retry task that matches the given id and score from the given queue and archives it. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) ArchiveScheduledTask

func (r *RDB) ArchiveScheduledTask(qname string, id uuid.UUID, score int64) error

ArchiveScheduledTask finds a scheduled task that matches the given id and score from the given queue and archives it. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) CancelationPubSub

func (r *RDB) CancelationPubSub() (*redis.PubSub, error)

CancelationPubSub returns a pubsub for cancelation messages.

func (*RDB) CheckAndEnqueue

func (r *RDB) CheckAndEnqueue(qnames ...string) error

CheckAndEnqueue checks for scheduled/retry tasks for the given queues and enqueues any tasks that are ready to be processed.

func (*RDB) ClearSchedulerEntries

func (r *RDB) ClearSchedulerEntries(scheduelrID string) error

ClearSchedulerEntries deletes scheduler entries data from redis.

func (*RDB) ClearSchedulerHistory

func (r *RDB) ClearSchedulerHistory(entryID string) error

ClearSchedulerHistory deletes the enqueue event history for the given scheduler entry.

func (*RDB) ClearServerState

func (r *RDB) ClearServerState(host string, pid int, serverID string) error

ClearServerState deletes server state data from redis.

func (*RDB) Close

func (r *RDB) Close() error

Close closes the connection with redis server.

func (*RDB) ClusterKeySlot

func (r *RDB) ClusterKeySlot(qname string) (int64, error)

ClusterKeySlot returns an integer identifying the hash slot the given queue hashes to.

func (*RDB) ClusterNodes

func (r *RDB) ClusterNodes(qname string) ([]redis.ClusterNode, error)

ClusterNodes returns a list of nodes the given queue belongs to.

func (*RDB) CurrentStats

func (r *RDB) CurrentStats(qname string) (*Stats, error)

CurrentStats returns the current state of the given queue.

func (*RDB) DeleteAllArchivedTasks

func (r *RDB) DeleteAllArchivedTasks(qname string) (int64, error)

DeleteAllArchivedTasks deletes all archived tasks from the given queue and returns the number of tasks deleted.

func (*RDB) DeleteAllPendingTasks

func (r *RDB) DeleteAllPendingTasks(qname string) (int64, error)

DeleteAllPendingTasks deletes all pending tasks from the given queue and returns the number of tasks deleted.

func (*RDB) DeleteAllRetryTasks

func (r *RDB) DeleteAllRetryTasks(qname string) (int64, error)

DeleteAllRetryTasks deletes all retry tasks from the given queue and returns the number of tasks deleted.

func (*RDB) DeleteAllScheduledTasks

func (r *RDB) DeleteAllScheduledTasks(qname string) (int64, error)

DeleteAllScheduledTasks deletes all scheduled tasks from the given queue and returns the number of tasks deleted.

func (*RDB) DeleteArchivedTask

func (r *RDB) DeleteArchivedTask(qname string, id uuid.UUID, score int64) error

DeleteArchivedTask deletes an archived task that matches the given id and score from the given queue. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) DeletePendingTask

func (r *RDB) DeletePendingTask(qname string, id uuid.UUID) error

DeletePendingTask deletes a pending task that matches the given id from the given queue. If a task that matches the id does not exist, it returns ErrTaskNotFound.

func (*RDB) DeleteRetryTask

func (r *RDB) DeleteRetryTask(qname string, id uuid.UUID, score int64) error

DeleteRetryTask deletes a retry task that matches the given id and score from the given queue. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) DeleteScheduledTask

func (r *RDB) DeleteScheduledTask(qname string, id uuid.UUID, score int64) error

DeleteScheduledTask deletes a scheduled task that matches the given id and score from the given queue. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) Dequeue

func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Time, err error)

Dequeue queries given queues in order and pops a task message off a queue if one exists and returns the message and deadline. Dequeue skips a queue if the queue is paused. If all queues are empty, ErrNoProcessableTask error is returned.

func (*RDB) Done

func (r *RDB) Done(msg *base.TaskMessage) error

Done removes the task from active queue to mark the task as done. It removes a uniqueness lock acquired by the task, if any.

func (*RDB) Enqueue

func (r *RDB) Enqueue(msg *base.TaskMessage) error

Enqueue inserts the given task to the tail of the queue.

func (*RDB) EnqueueUnique

func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error

EnqueueUnique inserts the given task if the task's uniqueness lock can be acquired. It returns ErrDuplicateTask if the lock cannot be acquired.

func (*RDB) HistoricalStats

func (r *RDB) HistoricalStats(qname string, n int) ([]*DailyStats, error)

HistoricalStats returns a list of stats from the last n days for the given queue.

func (*RDB) ListActive

func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskMessage, error)

ListActive returns all tasks that are currently being processed for the given queue.

func (*RDB) ListArchived

func (r *RDB) ListArchived(qname string, pgn Pagination) ([]base.Z, error)

ListArchived returns all tasks from the given queue that have exhausted their retry limit.

func (*RDB) ListDeadlineExceeded

func (r *RDB) ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*base.TaskMessage, error)

ListDeadlineExceeded returns a list of task messages that have exceeded the deadline from the given queues.

func (*RDB) ListPending

func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskMessage, error)

ListPending returns pending tasks that are ready to be processed.

func (*RDB) ListRetry

func (r *RDB) ListRetry(qname string, pgn Pagination) ([]base.Z, error)

ListRetry returns all tasks from the given queue that have failed before and will be retried in the future.

func (*RDB) ListScheduled

func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]base.Z, error)

ListScheduled returns all tasks from the given queue that are scheduled to be processed in the future.

func (*RDB) ListSchedulerEnqueueEvents

func (r *RDB) ListSchedulerEnqueueEvents(entryID string, pgn Pagination) ([]*base.SchedulerEnqueueEvent, error)

ListSchedulerEnqueueEvents returns the list of scheduler enqueue events.

func (*RDB) ListSchedulerEntries

func (r *RDB) ListSchedulerEntries() ([]*base.SchedulerEntry, error)

ListSchedulerEntries returns the list of scheduler entries.

func (*RDB) ListServers

func (r *RDB) ListServers() ([]*base.ServerInfo, error)

ListServers returns the list of server info.

func (*RDB) ListWorkers

func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error)

ListWorkers returns the list of worker stats.

func (*RDB) Pause

func (r *RDB) Pause(qname string) error

Pause pauses processing of tasks from the given queue.

func (*RDB) Ping

func (r *RDB) Ping() error

Ping checks the connection with redis server.

func (*RDB) PublishCancelation

func (r *RDB) PublishCancelation(id string) error

PublishCancelation publishes a cancelation message to all subscribers. The message is the ID of the task to be canceled.

func (*RDB) RecordSchedulerEnqueueEvent

func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerEnqueueEvent) error

RecordSchedulerEnqueueEvent records the time when the given task was enqueued.

func (*RDB) RedisClusterInfo

func (r *RDB) RedisClusterInfo() (map[string]string, error)

RedisClusterInfo returns a map of redis cluster info.

func (*RDB) RedisInfo

func (r *RDB) RedisInfo() (map[string]string, error)

RedisInfo returns a map of redis info.

func (*RDB) RemoveQueue

func (r *RDB) RemoveQueue(qname string, force bool) error

RemoveQueue removes the specified queue.

If force is set to true, it will remove the queue even if it is not empty, as long as no tasks are active for the queue. If force is set to false, it will only remove the queue if the queue is empty.

func (*RDB) Requeue

func (r *RDB) Requeue(msg *base.TaskMessage) error

Requeue moves the task from active queue to the specified queue.

func (*RDB) Retry

func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error

Retry moves the task from active to retry queue, incrementing retry count and assigning error message to the task message.

func (*RDB) RunAllArchivedTasks

func (r *RDB) RunAllArchivedTasks(qname string) (int64, error)

RunAllArchivedTasks enqueues all archived tasks from the given queue and returns the number of tasks enqueued.

func (*RDB) RunAllRetryTasks

func (r *RDB) RunAllRetryTasks(qname string) (int64, error)

RunAllRetryTasks enqueues all retry tasks from the given queue and returns the number of tasks enqueued.

func (*RDB) RunAllScheduledTasks

func (r *RDB) RunAllScheduledTasks(qname string) (int64, error)

RunAllScheduledTasks enqueues all scheduled tasks from the given queue and returns the number of tasks enqueued.

func (*RDB) RunArchivedTask

func (r *RDB) RunArchivedTask(qname string, id uuid.UUID, score int64) error

RunArchivedTask finds an archived task that matches the given id and score from the given queue and enqueues it for processing. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) RunRetryTask

func (r *RDB) RunRetryTask(qname string, id uuid.UUID, score int64) error

RunRetryTask finds a retry task that matches the given id and score from the given queue and enqueues it for processing. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) RunScheduledTask

func (r *RDB) RunScheduledTask(qname string, id uuid.UUID, score int64) error

RunScheduledTask finds a scheduled task that matches the given id and score from the given queue and enqueues it for processing. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) Schedule

func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error

Schedule adds the task to the backlog queue to be processed in the future.

func (*RDB) ScheduleUnique

func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl time.Duration) error

ScheduleUnique adds the task to the backlog queue to be processed in the future if the uniqueness lock can be acquired. It returns ErrDuplicateTask if the lock cannot be acquired.

func (*RDB) Unpause

func (r *RDB) Unpause(qname string) error

Unpause resumes processing of tasks from the given queue.

func (*RDB) WriteSchedulerEntries

func (r *RDB) WriteSchedulerEntries(schedulerID string, entries []*base.SchedulerEntry, ttl time.Duration) error

WriteSchedulerEntries writes scheduler entries data to redis with expiration set to the value ttl.

func (*RDB) WriteServerState

func (r *RDB) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo, ttl time.Duration) error

WriteServerState writes server state data to redis with expiration set to the value ttl.

type Stats

type Stats struct {
	// Name of the queue (e.g. "default", "critical").
	Queue string
	// MemoryUsage is the total number of bytes the queue and its tasks require
	// to be stored in redis.
	MemoryUsage int64
	// Paused indicates whether the queue is paused.
	// If true, tasks in the queue should not be processed.
	Paused bool
	// Size is the total number of tasks in the queue.
	Size int
	// Number of tasks in each state.
	Pending   int
	Active    int
	Scheduled int
	Retry     int
	Archived  int
	// Total number of tasks processed during the current date.
	// The number includes both succeeded and failed tasks.
	Processed int
	// Total number of tasks failed during the current date.
	Failed int
	// Time this stats was taken.
	Timestamp time.Time
}

Stats represents a state of queues at a certain time.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL