rdb

package
v0.23.0

This package is not in the latest version of its module.
Published: Apr 11, 2022 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package rdb encapsulates the interactions with redis.

Constants

const LeaseDuration = 30 * time.Second

LeaseDuration is the duration used to initially create a lease and to extend it thereafter.
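
As a rough illustration of how a fixed lease duration can be applied, the sketch below computes a lease expiry when a lease is created and again when it is extended. The helper `leaseExpiry` and the variable `epoch` are hypothetical names introduced for this example, and measuring an extension from the current time (rather than from the old expiry) is an assumption; only the 30-second value comes from the constant above.

```go
package main

import (
	"fmt"
	"time"
)

// LeaseDuration mirrors the documented constant.
const LeaseDuration = 30 * time.Second

// epoch is an arbitrary fixed instant that keeps the example deterministic.
var epoch = time.Date(2022, 4, 11, 12, 0, 0, 0, time.UTC)

// leaseExpiry returns the expiry of a lease created or extended at now.
// Hypothetical helper: it assumes extensions are measured from now.
func leaseExpiry(now time.Time) time.Time {
	return now.Add(LeaseDuration)
}

func main() {
	// Create the lease.
	expiry := leaseExpiry(epoch)
	fmt.Println(expiry.Sub(epoch)) // 30s

	// Extend it 15 seconds later, before it expires.
	later := epoch.Add(LeaseDuration / 2)
	expiry = leaseExpiry(later)
	fmt.Println(expiry.Sub(epoch)) // 45s
}
```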

Variables

This section is empty.

Functions

This section is empty.

Types

type DailyStats

type DailyStats struct {
	// Name of the queue (e.g. "default", "critical").
	Queue string
	// Total number of tasks processed during the given day.
	// The number includes both succeeded and failed tasks.
	Processed int
	// Total number of tasks failed during the given day.
	Failed int
	// Date these stats were taken.
	Time time.Time
}

DailyStats holds aggregate data for a given day.

type GroupStat added in v0.23.0

type GroupStat struct {
	// Name of the group.
	Group string

	// Size of the group.
	Size int
}

type Pagination added in v0.2.2

type Pagination struct {
	// Number of items in the page.
	Size int

	// Page number starting from zero.
	Page int
}

Pagination specifies the page size and page number for the list operation.
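
To make the zero-based page numbering concrete, here is a minimal sketch that turns a Pagination value into inclusive start and stop indices and applies them to an in-memory slice. The helpers `bounds` and `page` are hypothetical and are not part of this package; how the package itself maps pages onto Redis ranges is not shown in this documentation.

```go
package main

import "fmt"

// Pagination mirrors the documented struct.
type Pagination struct {
	Size int // number of items in the page
	Page int // page number, starting from zero
}

// bounds returns the inclusive start and stop indices of the page.
// Hypothetical helper, not part of the documented API.
func bounds(p Pagination) (start, stop int) {
	start = p.Size * p.Page
	stop = start + p.Size - 1
	return start, stop
}

// page returns the items on the requested page, or nil if the page
// is empty or out of range.
func page(items []string, p Pagination) []string {
	start, stop := bounds(p)
	if p.Size <= 0 || p.Page < 0 || start >= len(items) {
		return nil
	}
	if stop >= len(items) {
		stop = len(items) - 1 // last page may be shorter than Size
	}
	return items[start : stop+1]
}

func main() {
	tasks := []string{"t0", "t1", "t2", "t3", "t4"}
	fmt.Println(page(tasks, Pagination{Size: 2, Page: 0})) // [t0 t1]
	fmt.Println(page(tasks, Pagination{Size: 2, Page: 2})) // [t4]
	fmt.Println(page(tasks, Pagination{Size: 2, Page: 3})) // []
}
```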

type RDB

type RDB struct {
	// contains filtered or unexported fields
}

RDB is a client interface to query and mutate task queues.

func NewRDB

func NewRDB(client redis.UniversalClient) *RDB

NewRDB returns a new instance of RDB.

func (*RDB) AddToGroup added in v0.23.0

func (r *RDB) AddToGroup(ctx context.Context, msg *base.TaskMessage, groupKey string) error

func (*RDB) AddToGroupUnique added in v0.23.0

func (r *RDB) AddToGroupUnique(ctx context.Context, msg *base.TaskMessage, groupKey string, ttl time.Duration) error

func (*RDB) AggregationCheck added in v0.23.0

func (r *RDB) AggregationCheck(qname, gname string, t time.Time, gracePeriod, maxDelay time.Duration, maxSize int) (string, error)

AggregationCheck checks the group identified by the given queue and group name to see if the tasks in the group are ready to be aggregated. If so, it moves the tasks to be aggregated to an aggregation set and returns the set ID. If not, it returns an empty string for the set ID. The time for gracePeriod and maxDelay is computed relative to the time t.

Note: It assumes that this function is called at an interval less than or equal to gracePeriod. In other words, the function only checks the most recently added task against the given gracePeriod.
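
The readiness decision described above can be sketched in memory as follows. This is not the package's implementation: the function `ready`, the variables `epoch` and `sample`, and the constants `grace` and `delay` are hypothetical, and treating a zero maxDelay or maxSize as "no limit" is an assumption of the sketch. The one rule taken directly from the note is that only the most recently added task is compared against gracePeriod.

```go
package main

import (
	"fmt"
	"time"
)

const (
	grace = 30 * time.Second // example gracePeriod
	delay = 10 * time.Minute // example maxDelay
)

// epoch is an arbitrary fixed instant that keeps the example deterministic.
var epoch = time.Date(2022, 4, 11, 12, 0, 0, 0, time.UTC)

// sample holds the times two tasks joined a group, oldest first.
var sample = []time.Time{epoch, epoch.Add(50 * time.Second)}

// ready reports whether a group should be aggregated at time t.
// added lists the time each task joined the group, oldest first.
func ready(added []time.Time, t time.Time, gracePeriod, maxDelay time.Duration, maxSize int) bool {
	if len(added) == 0 {
		return false
	}
	// Assumption: zero means "no size limit".
	if maxSize != 0 && len(added) >= maxSize {
		return true
	}
	oldest, newest := added[0], added[len(added)-1]
	// Assumption: zero means "no maximum delay".
	if maxDelay != 0 && !oldest.After(t.Add(-maxDelay)) {
		return true
	}
	// Only the most recently added task is checked against gracePeriod.
	return !newest.After(t.Add(-gracePeriod))
}

func main() {
	now := epoch.Add(60 * time.Second)
	fmt.Println(ready(sample, now, grace, delay, 100))            // false
	fmt.Println(ready(sample, now.Add(grace), grace, delay, 100)) // true
	fmt.Println(ready(sample, now, grace, delay, 2))              // true
}
```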

func (*RDB) AllQueues added in v0.12.0

func (r *RDB) AllQueues() ([]string, error)

AllQueues returns a list of all queue names.

func (*RDB) Archive added in v0.14.0

func (r *RDB) Archive(ctx context.Context, msg *base.TaskMessage, errMsg string) error

Archive sends the given task to archive, attaching the error message to the task. It also trims the archive by timestamp and set size.

func (*RDB) ArchiveAllAggregatingTasks added in v0.23.0

func (r *RDB) ArchiveAllAggregatingTasks(qname, gname string) (int64, error)

ArchiveAllAggregatingTasks archives all aggregating tasks from the given group and returns the number of tasks archived. If a queue with the given name doesn't exist, it returns QueueNotFoundError.

func (*RDB) ArchiveAllPendingTasks added in v0.15.0

func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error)

ArchiveAllPendingTasks archives all pending tasks from the given queue and returns the number of tasks moved. If a queue with the given name doesn't exist, it returns QueueNotFoundError.

func (*RDB) ArchiveAllRetryTasks added in v0.14.0

func (r *RDB) ArchiveAllRetryTasks(qname string) (int64, error)

ArchiveAllRetryTasks archives all retry tasks from the given queue and returns the number of tasks that were moved. If a queue with the given name doesn't exist, it returns QueueNotFoundError.

func (*RDB) ArchiveAllScheduledTasks added in v0.14.0

func (r *RDB) ArchiveAllScheduledTasks(qname string) (int64, error)

ArchiveAllScheduledTasks archives all scheduled tasks from the given queue and returns the number of tasks that were moved. If a queue with the given name doesn't exist, it returns QueueNotFoundError.

func (*RDB) ArchiveTask added in v0.18.0

func (r *RDB) ArchiveTask(qname, id string) error

ArchiveTask finds a task that matches the id from the given queue and archives it. It returns nil if it successfully archived the task.

If a queue with the given name doesn't exist, it returns QueueNotFoundError. If a task with the given id doesn't exist in the queue, it returns TaskNotFoundError. If a task is already archived, it returns TaskAlreadyArchivedError. If a task is in active state, it returns a non-nil error with FailedPrecondition code.

func (*RDB) CancelationPubSub added in v0.4.0

func (r *RDB) CancelationPubSub() (*redis.PubSub, error)

CancelationPubSub returns a pubsub for cancelation messages.

func (*RDB) ClearSchedulerEntries added in v0.13.0

func (r *RDB) ClearSchedulerEntries(schedulerID string) error

ClearSchedulerEntries deletes scheduler entries data from redis.

func (*RDB) ClearSchedulerHistory added in v0.15.0

func (r *RDB) ClearSchedulerHistory(entryID string) error

ClearSchedulerHistory deletes the enqueue event history for the given scheduler entry.

func (*RDB) ClearServerState added in v0.8.0

func (r *RDB) ClearServerState(host string, pid int, serverID string) error

ClearServerState deletes server state data from redis.

func (*RDB) Client added in v0.18.0

func (r *RDB) Client() redis.UniversalClient

Client returns a reference to the underlying redis client.

func (*RDB) Close

func (r *RDB) Close() error

Close closes the connection with redis server.

func (*RDB) ClusterKeySlot added in v0.12.0

func (r *RDB) ClusterKeySlot(qname string) (int64, error)

ClusterKeySlot returns an integer identifying the hash slot the given queue hashes to.

func (*RDB) ClusterNodes added in v0.12.0

func (r *RDB) ClusterNodes(qname string) ([]redis.ClusterNode, error)

ClusterNodes returns a list of nodes the given queue belongs to.

func (*RDB) CurrentStats

func (r *RDB) CurrentStats(qname string) (*Stats, error)

CurrentStats returns the current state of the given queue.

func (*RDB) DeleteAggregationSet added in v0.23.0

func (r *RDB) DeleteAggregationSet(ctx context.Context, qname, gname, setID string) error

DeleteAggregationSet deletes the aggregation set and its members identified by the parameters.

func (*RDB) DeleteAllAggregatingTasks added in v0.23.0

func (r *RDB) DeleteAllAggregatingTasks(qname, gname string) (int64, error)

DeleteAllAggregatingTasks deletes all aggregating tasks from the given group and returns the number of tasks deleted.

func (*RDB) DeleteAllArchivedTasks added in v0.14.0

func (r *RDB) DeleteAllArchivedTasks(qname string) (int64, error)

DeleteAllArchivedTasks deletes all archived tasks from the given queue and returns the number of tasks deleted.

func (*RDB) DeleteAllCompletedTasks added in v0.19.0

func (r *RDB) DeleteAllCompletedTasks(qname string) (int64, error)

DeleteAllCompletedTasks deletes all completed tasks from the given queue and returns the number of tasks deleted.

func (*RDB) DeleteAllPendingTasks added in v0.15.0

func (r *RDB) DeleteAllPendingTasks(qname string) (int64, error)

DeleteAllPendingTasks deletes all pending tasks from the given queue and returns the number of tasks deleted.

func (*RDB) DeleteAllRetryTasks

func (r *RDB) DeleteAllRetryTasks(qname string) (int64, error)

DeleteAllRetryTasks deletes all retry tasks from the given queue and returns the number of tasks deleted.

func (*RDB) DeleteAllScheduledTasks

func (r *RDB) DeleteAllScheduledTasks(qname string) (int64, error)

DeleteAllScheduledTasks deletes all scheduled tasks from the given queue and returns the number of tasks deleted.

func (*RDB) DeleteExpiredCompletedTasks added in v0.19.0

func (r *RDB) DeleteExpiredCompletedTasks(qname string) error

DeleteExpiredCompletedTasks checks for any expired tasks in the given queue's completed set and deletes them.

func (*RDB) DeleteTask added in v0.18.0

func (r *RDB) DeleteTask(qname, id string) error

DeleteTask finds a task that matches the id from the given queue and deletes it. It returns nil if it successfully deleted the task.

If a queue with the given name doesn't exist, it returns QueueNotFoundError. If a task with the given id doesn't exist in the queue, it returns TaskNotFoundError. If a task is in active state, it returns a non-nil error with Code FailedPrecondition.

func (*RDB) Dequeue

func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, leaseExpirationTime time.Time, err error)

Dequeue queries given queues in order and pops a task message off a queue if one exists and returns the message and its lease expiration time. Dequeue skips a queue if the queue is paused. If all queues are empty, ErrNoProcessableTask error is returned.
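
The queue-selection order described above can be sketched with an in-memory stand-in. The `queue` type and `dequeue` function are hypothetical, the error variable is a local stand-in for the package's sentinel, and lease handling is omitted. The sketch also returns the error when the only non-empty queues are paused, which is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNoProcessableTask is a local stand-in for the package's sentinel error.
var ErrNoProcessableTask = errors.New("no processable task")

// queue is a hypothetical in-memory stand-in for a Redis-backed queue.
type queue struct {
	paused  bool
	pending []string
}

// dequeue checks the named queues in order, skips paused or empty ones,
// and pops the first pending task it finds.
func dequeue(queues map[string]*queue, qnames ...string) (string, error) {
	for _, name := range qnames {
		q, ok := queues[name]
		if !ok || q.paused || len(q.pending) == 0 {
			continue
		}
		task := q.pending[0]
		q.pending = q.pending[1:]
		return task, nil
	}
	return "", ErrNoProcessableTask
}

func main() {
	queues := map[string]*queue{
		"critical": {paused: true, pending: []string{"c1"}},
		"default":  {pending: []string{"d1"}},
	}

	task, err := dequeue(queues, "critical", "default")
	fmt.Println(task, err) // d1 <nil>

	// "critical" is paused and "default" is now empty.
	_, err = dequeue(queues, "critical", "default")
	fmt.Println(errors.Is(err, ErrNoProcessableTask)) // true
}
```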

func (*RDB) Done

func (r *RDB) Done(ctx context.Context, msg *base.TaskMessage) error

Done removes the task from active queue and deletes the task. It removes a uniqueness lock acquired by the task, if any.

func (*RDB) Enqueue

func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error

Enqueue adds the given task to the pending list of the queue.

func (*RDB) EnqueueUnique added in v0.7.0

func (r *RDB) EnqueueUnique(ctx context.Context, msg *base.TaskMessage, ttl time.Duration) error

EnqueueUnique inserts the given task if the task's uniqueness lock can be acquired. It returns ErrDuplicateTask if the lock cannot be acquired.
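
A uniqueness lock with a TTL can be modeled in memory as shown below. The `store` type, `newStore`, `enqueueUnique`, `lockTTL`, and `epoch` are hypothetical names for this sketch, and the error variable is a local stand-in for the package's ErrDuplicateTask. How the package derives the lock key from a task is not covered here.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// ErrDuplicateTask is a local stand-in for the package's sentinel error.
var ErrDuplicateTask = errors.New("task already exists")

// lockTTL is an example lock lifetime.
const lockTTL = time.Minute

// epoch is an arbitrary fixed instant that keeps the example deterministic.
var epoch = time.Date(2022, 4, 11, 12, 0, 0, 0, time.UTC)

// store is a hypothetical in-memory stand-in for Redis.
// locks maps a uniqueness key to the time its lock expires.
type store struct {
	locks   map[string]time.Time
	pending []string
}

func newStore() *store {
	return &store{locks: make(map[string]time.Time)}
}

// enqueueUnique adds id to pending only if no unexpired lock exists for key.
func (s *store) enqueueUnique(now time.Time, key, id string, ttl time.Duration) error {
	if exp, held := s.locks[key]; held && now.Before(exp) {
		return ErrDuplicateTask
	}
	s.locks[key] = now.Add(ttl)
	s.pending = append(s.pending, id)
	return nil
}

func main() {
	s := newStore()
	fmt.Println(s.enqueueUnique(epoch, "email:42", "t1", lockTTL))                // <nil>
	fmt.Println(s.enqueueUnique(epoch.Add(lockTTL/2), "email:42", "t2", lockTTL)) // task already exists
	fmt.Println(s.enqueueUnique(epoch.Add(2*lockTTL), "email:42", "t3", lockTTL)) // <nil>
}
```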

func (*RDB) ExtendLease added in v0.22.0

func (r *RDB) ExtendLease(qname string, ids ...string) (expirationTime time.Time, err error)

ExtendLease extends the lease for the given tasks by LeaseDuration (30s). It returns a new expiration time if the operation was successful.

func (*RDB) ForwardIfReady added in v0.18.0

func (r *RDB) ForwardIfReady(qnames ...string) error

ForwardIfReady checks the scheduled and retry sets of the given queues and moves any tasks that are ready to be processed to the pending set.
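
The forwarding step can be pictured with the in-memory sketch below, which moves every entry whose process time has arrived into a pending list. The `entry` type, the `forward` function, and `epoch` are hypothetical; the sketch treats the scheduled and retry sets as one list of waiting entries for brevity.

```go
package main

import (
	"fmt"
	"time"
)

// epoch is an arbitrary fixed instant that keeps the example deterministic.
var epoch = time.Date(2022, 4, 11, 12, 0, 0, 0, time.UTC)

// entry pairs a task ID with the time it becomes ready to process.
type entry struct {
	id        string
	processAt time.Time
}

// forward appends to pending every entry whose processAt is not after now.
// It returns the entries still waiting and the updated pending list.
func forward(waiting []entry, pending []string, now time.Time) ([]entry, []string) {
	var rest []entry
	for _, e := range waiting {
		if e.processAt.After(now) {
			rest = append(rest, e) // not ready yet
			continue
		}
		pending = append(pending, e.id)
	}
	return rest, pending
}

func main() {
	waiting := []entry{
		{"t1", epoch.Add(-time.Minute)},
		{"t2", epoch.Add(time.Hour)},
		{"t3", epoch},
	}
	rest, pending := forward(waiting, nil, epoch)
	fmt.Println(pending)   // [t1 t3]
	fmt.Println(len(rest)) // 1
}
```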

func (*RDB) GetTaskInfo added in v0.18.0

func (r *RDB) GetTaskInfo(qname, id string) (*base.TaskInfo, error)

GetTaskInfo returns a TaskInfo describing the task from the given queue.

func (*RDB) GroupStats added in v0.23.0

func (r *RDB) GroupStats(qname string) ([]*GroupStat, error)

func (*RDB) HistoricalStats

func (r *RDB) HistoricalStats(qname string, n int) ([]*DailyStats, error)

HistoricalStats returns a list of stats from the last n days for the given queue.

func (*RDB) ListActive added in v0.12.0

func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskInfo, error)

ListActive returns all tasks that are currently being processed for the given queue.

func (*RDB) ListAggregating added in v0.23.0

func (r *RDB) ListAggregating(qname, gname string, pgn Pagination) ([]*base.TaskInfo, error)

ListAggregating returns all tasks from the given group.

func (*RDB) ListArchived added in v0.14.0

func (r *RDB) ListArchived(qname string, pgn Pagination) ([]*base.TaskInfo, error)

ListArchived returns all tasks from the given queue that have exhausted their retry limit.

func (*RDB) ListCompleted added in v0.19.0

func (r *RDB) ListCompleted(qname string, pgn Pagination) ([]*base.TaskInfo, error)

ListCompleted returns all tasks from the given queue that have completed successfully.

func (*RDB) ListGroups added in v0.23.0

func (r *RDB) ListGroups(qname string) ([]string, error)

ListGroups returns a list of all known groups in the given queue.

func (*RDB) ListLeaseExpired added in v0.22.0

func (r *RDB) ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*base.TaskMessage, error)

ListLeaseExpired returns a list of task messages with an expired lease from the given queues.

func (*RDB) ListPending added in v0.12.0

func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskInfo, error)

ListPending returns pending tasks that are ready to be processed.

func (*RDB) ListRetry

func (r *RDB) ListRetry(qname string, pgn Pagination) ([]*base.TaskInfo, error)

ListRetry returns all tasks from the given queue that have failed before and will be retried in the future.

func (*RDB) ListScheduled

func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]*base.TaskInfo, error)

ListScheduled returns all tasks from the given queue that are scheduled to be processed in the future.

func (*RDB) ListSchedulerEnqueueEvents added in v0.13.0

func (r *RDB) ListSchedulerEnqueueEvents(entryID string, pgn Pagination) ([]*base.SchedulerEnqueueEvent, error)

ListSchedulerEnqueueEvents returns the list of scheduler enqueue events.

func (*RDB) ListSchedulerEntries added in v0.13.0

func (r *RDB) ListSchedulerEntries() ([]*base.SchedulerEntry, error)

ListSchedulerEntries returns the list of scheduler entries.

func (*RDB) ListServers added in v0.8.0

func (r *RDB) ListServers() ([]*base.ServerInfo, error)

ListServers returns the list of server info.

func (*RDB) ListWorkers added in v0.5.0

func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error)

ListWorkers returns the list of worker stats.

func (*RDB) MarkAsComplete added in v0.19.0

func (r *RDB) MarkAsComplete(ctx context.Context, msg *base.TaskMessage) error

MarkAsComplete removes the task from active queue to mark the task as completed. It removes a uniqueness lock acquired by the task, if any.

func (*RDB) Pause added in v0.9.2

func (r *RDB) Pause(qname string) error

Pause pauses processing of tasks from the given queue.

func (*RDB) Ping added in v0.11.0

func (r *RDB) Ping() error

Ping checks the connection with redis server.

func (*RDB) PublishCancelation added in v0.4.0

func (r *RDB) PublishCancelation(id string) error

PublishCancelation publishes a cancelation message to all subscribers. The message is the ID of the task to be canceled.

func (*RDB) ReadAggregationSet added in v0.23.0

func (r *RDB) ReadAggregationSet(qname, gname, setID string) ([]*base.TaskMessage, time.Time, error)

ReadAggregationSet retrieves members of an aggregation set and returns a list of tasks in the set and the deadline for aggregating those tasks.

func (*RDB) ReclaimStaleAggregationSets added in v0.23.0

func (r *RDB) ReclaimStaleAggregationSets(qname string) error

ReclaimStaleAggregationSets checks for any stale aggregation sets in the given queue and reclaims the tasks in each stale aggregation set by putting them back in the group.

func (*RDB) RecordSchedulerEnqueueEvent added in v0.13.0

func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerEnqueueEvent) error

RecordSchedulerEnqueueEvent records the time when the given task was enqueued.

func (*RDB) RedisClusterInfo added in v0.12.0

func (r *RDB) RedisClusterInfo() (map[string]string, error)

RedisClusterInfo returns a map of redis cluster info.

func (*RDB) RedisInfo

func (r *RDB) RedisInfo() (map[string]string, error)

RedisInfo returns a map of redis info.

func (*RDB) RemoveQueue added in v0.2.0

func (r *RDB) RemoveQueue(qname string, force bool) error

RemoveQueue removes the specified queue.

If force is set to true, it removes the queue even if the queue is not empty, as long as no tasks are active for the queue. If force is set to false, it removes the queue only if the queue is empty.
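
The force rule can be written out as a small decision function. Everything in this sketch is hypothetical, including the `counts` type, `canRemove`, and both error values; it only restates the rule described above and does not show which errors the package actually returns.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical errors used only in this sketch.
var (
	errQueueNotEmpty = errors.New("queue is not empty")
	errActiveTasks   = errors.New("queue has active tasks")
)

// counts is a hypothetical summary of a queue's contents.
type counts struct {
	active int // tasks currently being processed
	other  int // tasks in any other state
}

// canRemove returns nil if the queue may be removed under the given mode.
func canRemove(c counts, force bool) error {
	if force {
		// Forced removal is blocked only by active tasks.
		if c.active > 0 {
			return errActiveTasks
		}
		return nil
	}
	// Unforced removal requires a completely empty queue.
	if c.active+c.other > 0 {
		return errQueueNotEmpty
	}
	return nil
}

func main() {
	fmt.Println(canRemove(counts{active: 0, other: 3}, false)) // queue is not empty
	fmt.Println(canRemove(counts{active: 0, other: 3}, true))  // <nil>
	fmt.Println(canRemove(counts{active: 1, other: 3}, true))  // queue has active tasks
}
```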

func (*RDB) Requeue

func (r *RDB) Requeue(ctx context.Context, msg *base.TaskMessage) error

Requeue moves the task from active queue to the specified queue.

func (*RDB) Retry

func (r *RDB) Retry(ctx context.Context, msg *base.TaskMessage, processAt time.Time, errMsg string, isFailure bool) error

Retry moves the task from the active queue to the retry queue. It also annotates the message with the given error message and, if isFailure is true, increments the retried counter.
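
The effect of isFailure on the message can be sketched as follows. The `message` type and `retry` function are hypothetical simplifications; the field names are assumptions and may not match base.TaskMessage, and moving the task between queues is left out.

```go
package main

import "fmt"

// message is a hypothetical, reduced stand-in for a task message.
type message struct {
	ID       string
	Retried  int    // number of retries counted so far
	ErrorMsg string // error from the most recent attempt
}

// retry returns a copy of msg annotated with errMsg. The retried counter
// is incremented only when isFailure is true.
func retry(msg message, errMsg string, isFailure bool) message {
	msg.ErrorMsg = errMsg
	if isFailure {
		msg.Retried++
	}
	return msg
}

func main() {
	m := message{ID: "t1"}

	m = retry(m, "connection timeout", true)
	fmt.Println(m.Retried, m.ErrorMsg) // 1 connection timeout

	// A retry that is not counted as a failure leaves the counter unchanged.
	m = retry(m, "server shutting down", false)
	fmt.Println(m.Retried, m.ErrorMsg) // 1 server shutting down
}
```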

func (*RDB) RunAllAggregatingTasks added in v0.23.0

func (r *RDB) RunAllAggregatingTasks(qname, gname string) (int64, error)

RunAllAggregatingTasks schedules all tasks from the given group in the given queue to run and returns the number of tasks scheduled to run. If a queue with the given name doesn't exist, it returns QueueNotFoundError.

func (*RDB) RunAllArchivedTasks added in v0.14.0

func (r *RDB) RunAllArchivedTasks(qname string) (int64, error)

RunAllArchivedTasks enqueues all archived tasks from the given queue and returns the number of tasks enqueued. If a queue with the given name doesn't exist, it returns QueueNotFoundError.

func (*RDB) RunAllRetryTasks added in v0.12.0

func (r *RDB) RunAllRetryTasks(qname string) (int64, error)

RunAllRetryTasks enqueues all retry tasks from the given queue and returns the number of tasks enqueued. If a queue with the given name doesn't exist, it returns QueueNotFoundError.

func (*RDB) RunAllScheduledTasks added in v0.12.0

func (r *RDB) RunAllScheduledTasks(qname string) (int64, error)

RunAllScheduledTasks enqueues all scheduled tasks from the given queue and returns the number of tasks enqueued. If a queue with the given name doesn't exist, it returns QueueNotFoundError.

func (*RDB) RunTask added in v0.18.0

func (r *RDB) RunTask(qname, id string) error

RunTask finds a task that matches the id from the given queue and updates it to pending state. It returns nil if it successfully updated the task.

If a queue with the given name doesn't exist, it returns QueueNotFoundError. If a task with the given id doesn't exist in the queue, it returns TaskNotFoundError. If a task is in active or pending state, it returns a non-nil error with Code FailedPrecondition.

func (*RDB) Schedule

func (r *RDB) Schedule(ctx context.Context, msg *base.TaskMessage, processAt time.Time) error

Schedule adds the task to the scheduled set to be processed in the future.

func (*RDB) ScheduleUnique added in v0.7.0

func (r *RDB) ScheduleUnique(ctx context.Context, msg *base.TaskMessage, processAt time.Time, ttl time.Duration) error

ScheduleUnique adds the task to the backlog queue to be processed in the future if the uniqueness lock can be acquired. It returns ErrDuplicateTask if the lock cannot be acquired.

func (*RDB) SetClock added in v0.19.1

func (r *RDB) SetClock(c timeutil.Clock)

SetClock sets the clock used by RDB to the given clock.

Use this function to set the clock to SimulatedClock in tests.

func (*RDB) Unpause added in v0.9.2

func (r *RDB) Unpause(qname string) error

Unpause resumes processing of tasks from the given queue.

func (*RDB) WriteResult added in v0.19.0

func (r *RDB) WriteResult(qname, taskID string, data []byte) (int, error)

WriteResult writes the given result data for the specified task.

func (*RDB) WriteSchedulerEntries added in v0.13.0

func (r *RDB) WriteSchedulerEntries(schedulerID string, entries []*base.SchedulerEntry, ttl time.Duration) error

WriteSchedulerEntries writes scheduler entries data to redis with expiration set to the value ttl.

func (*RDB) WriteServerState added in v0.8.0

func (r *RDB) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo, ttl time.Duration) error

WriteServerState writes server state data to redis with expiration set to the value ttl.

type Stats

type Stats struct {
	// Name of the queue (e.g. "default", "critical").
	Queue string
	// MemoryUsage is the total number of bytes the queue and its tasks require
	// to be stored in redis. It is an approximate memory usage value in bytes
	// since the value is computed by sampling.
	MemoryUsage int64
	// Paused indicates whether the queue is paused.
	// If true, tasks in the queue should not be processed.
	Paused bool
	// Size is the total number of tasks in the queue.
	Size int

	// Groups is the total number of groups in the queue.
	Groups int

	// Number of tasks in each state.
	Pending     int
	Active      int
	Scheduled   int
	Retry       int
	Archived    int
	Completed   int
	Aggregating int

	// Number of tasks processed within the current date.
	// The number includes both succeeded and failed tasks.
	Processed int
	// Number of tasks failed within the current date.
	Failed int

	// Total number of tasks processed (both succeeded and failed) from this queue.
	ProcessedTotal int
	// Total number of tasks failed.
	FailedTotal int

	// Latency of the queue, measured by the oldest pending task in the queue.
	Latency time.Duration
	// Time these stats were taken.
	Timestamp time.Time
}

Stats represents the state of a queue at a certain time.
