Documentation
¶
Index ¶
- Variables
- type Gorder
- type Logger
- type Options
- type Queue
- func (q *Queue[T]) Ack(queueKey T) error
- func (q *Queue[T]) AllQueues() []T
- func (q *Queue[T]) DeleteUnusedQueues(activeThreshold time.Duration) (deleted []T, struggling []T)
- func (q *Queue[T]) Next(queueKey T) (Task, error)
- func (q *Queue[T]) Push(queueKey T, task Task)
- func (q *Queue[T]) Stat() map[T]QueueStat
- func (q *Queue[T]) WaitingQueues() []T
- type QueueStat
- type Task
- type TaskFunc
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrWaitForAck is returned in Next() when the previous task is waiting for ack
	ErrWaitForAck = errors.New("previous task iswaiting for ack")
	// ErrQueueNotFound is returned in Next() when the queue is not found
	ErrQueueNotFound = errors.New("queue not found")
	// ErrQueueIsEmpty is returned in Next() when the queue is empty
	ErrQueueIsEmpty = errors.New("queue is empty")
	// ErrNothingToAck is returned in Ack() when there are no tasks to ack
	ErrNothingToAck = errors.New("nothing to ack")
)
Functions ¶
This section is empty.
Types ¶
type Gorder ¶
type Gorder[T comparable] struct {
	// contains filtered or unexported fields
}
Gorder is an in-memory task worker with strict ordering. It handles tasks in parallel using a worker pool. It is safe for concurrent use.
func New ¶
func New[T comparable](ctx context.Context, rawOpts ...Options) *Gorder[T]
New creates a new Gorder. It starts workers and flusher goroutines.
func (*Gorder[T]) BrokenQueues ¶
BrokenQueues returns the number of retries for each broken queue.
func (*Gorder[T]) Push ¶
Push adds a task to the end of the queue. It is non-blocking and safe for concurrent use. Tasks with different queue keys are executed in parallel, not sequentially. Tasks with the same queue key are executed in order. Don't use it after Shutdown.
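The ordering guarantee can be illustrated with a small self-contained model. This is not how Gorder is implemented (it uses a worker pool and a flusher, per the description above); the sketch simply starts one goroutine per key, and runOrdered and demo are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// runOrdered is a hypothetical helper: tasks that share a key run
// sequentially in slice order, while different keys run in parallel.
func runOrdered(tasks map[string][]func()) {
	var wg sync.WaitGroup
	for _, list := range tasks {
		wg.Add(1)
		go func(list []func()) {
			defer wg.Done()
			for _, task := range list {
				task()
			}
		}(list)
	}
	wg.Wait()
}

// demo queues three numbered tasks under each of two keys and records the
// order in which they actually ran, per key.
func demo() map[string][]int {
	var mu sync.Mutex
	seen := make(map[string][]int)
	tasks := make(map[string][]func())
	for _, key := range []string{"user-1", "user-2"} {
		for i := 1; i <= 3; i++ {
			key, i := key, i // capture per-iteration copies
			tasks[key] = append(tasks[key], func() {
				mu.Lock()
				defer mu.Unlock()
				seen[key] = append(seen[key], i)
			})
		}
	}
	runOrdered(tasks)
	return seen
}

func main() {
	seen := demo()
	// Each key keeps its own push order, regardless of interleaving between keys.
	fmt.Println(seen["user-1"], seen["user-2"]) // [1 2 3] [1 2 3]
}
```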
type Logger ¶
type Logger interface {
	Debug(string, ...any)
	Info(string, ...any)
	Warn(string, ...any)
	Error(string, ...any)
}
Logger is an interface for logging. You can use slog.
type Options ¶
type Options struct {
	// Workers is the number of workers that run task functions.
	// The default value is the number of CPUs.
	Workers int
	// FlushInterval is the max time of sleeping between flushing tasks to workers.
	// The default value is 100ms.
	FlushInterval time.Duration
	// UnusedThreshold is the max time between pushing tasks to a queue before it gets deleted.
	// The default value is 5 minutes.
	UnusedThreshold time.Duration
	// Retries is the max number of retries for a failed task before it is thrown away.
	// Use -1 to make retries unlimited.
	// The default value is 10.
	Retries int
	// RetryBackoffMinTimeout is the min timeout between retries.
	// The default value is 100ms.
	RetryBackoffMinTimeout time.Duration
	// RetryBackoffMaxTimeout is the max timeout between retries.
	// The default value is 10s.
	RetryBackoffMaxTimeout time.Duration
	// NoRetries is a flag that indicates that we should not retry failed tasks.
	// The default value is false.
	NoRetries bool
	// DoNotThrowOnShutdown is a flag that indicates that we should wait for all tasks to complete before shutting down.
	// The default value is false.
	DoNotThrowOnShutdown bool
	// Logger is used to log messages in case of errors.
	// The default value is nil, so there is no logging.
	Logger Logger
}
Options contains options for Gorder. Every field is optional.
type Queue ¶
type Queue[T comparable] struct {
	// contains filtered or unexported fields
}
Queue provides an in-memory task queue with the following guarantees:
- Strict task ordering;
- Tasks won't get lost, thanks to the Ack mechanism.
It is safe for concurrent use.
func (*Queue[T]) Ack ¶
func (q *Queue[T]) Ack(queueKey T) error
Ack removes the task from the queue. It can be called only after Next(); otherwise it returns an error.
func (*Queue[T]) AllQueues ¶
func (q *Queue[T]) AllQueues() []T
AllQueues returns list of all queue keys.
func (*Queue[T]) DeleteUnusedQueues ¶
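func (q *Queue[T]) DeleteUnusedQueues(activeThreshold time.Duration) (deleted []T, struggling []T)
DeleteUnusedQueues removes empty queues whose last activity is older than activeThreshold and returns their keys in deleted. In struggling, it also returns the keys of queues that still hold tasks but whose last activity is older than activeThreshold.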
func (*Queue[T]) Next ¶
func (q *Queue[T]) Next(queueKey T) (Task, error)
Next returns the next task from the queue identified by queueKey. Call Ack() after handling the task to remove it from the queue. Calling Next() twice in a row without an Ack() in between returns ErrWaitForAck.
func (*Queue[T]) WaitingQueues ¶
func (q *Queue[T]) WaitingQueues() []T
WaitingQueues returns list of all queue keys with waiting tasks.