Documentation
¶
Index ¶
- type Client
- func (c *Client) Add(tasks ...Task) *TaskAddOp
- func (c *Client) Install() error
- func (c *Client) Notify()
- func (c *Client) Register(queue Queue)
- func (c *Client) Start(ctx context.Context)
- func (c *Client) Status(ctx context.Context, taskID string) (TaskStatus, error)
- func (c *Client) Stop(ctx context.Context) bool
- type ClientConfig
- type Dispatcher
- type Logger
- type Queue
- type QueueConfig
- type QueueProcessor
- type RetainData
- type Retention
- type Task
- type TaskAddOp
- type TaskStatus
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is a client used to register queues and add tasks to them for execution.
func FromContext ¶
FromContext returns a Client from a context which is set for queue processor callbacks, so they can access the client in order to create additional tasks.
func NewClient ¶
func NewClient(cfg ClientConfig) (*Client, error)
NewClient initializes a new Client
func (*Client) Install ¶
Install installs the provided schema in the database. TODO provide migrations
func (*Client) Notify ¶
func (c *Client) Notify()
Notify notifies the dispatcher that a new task has been added. This is only needed and required if you supply a database transaction when adding a task. See TaskAddOp.Tx().
func (*Client) Register ¶
Register registers a new Queue so tasks can be added to it. This will panic if the name of the queue provided has already been registered.
func (*Client) Start ¶
Start starts the dispatcher so queued tasks can automatically be executed in the background. To gracefully shut down the dispatcher, call Stop(), or to hard-stop, cancel the provided context.
type ClientConfig ¶
type ClientConfig struct {
// DB is the open database connection used for storing tasks.
DB *sql.DB
// Logger is the logger used to log task execution.
Logger Logger
// NumWorkers is the number of goroutines to open to use for executing queued tasks concurrently.
NumWorkers int
// ReleaseAfter is the duration after which a task is released back to a queue if it has not finished executing.
// This value should be much higher than the timeout setting used for each queue and exists as a fail-safe
// just in case tasks become stuck.
ReleaseAfter time.Duration
// CleanupInterval is how often to run cleanup operations on the database in order to remove expired completed
// tasks. If omitted, no cleanup operations will be performed and the task retention duration will be ignored.
CleanupInterval time.Duration
}
ClientConfig contains configuration for the Client.
type Dispatcher ¶
type Dispatcher interface {
// Start starts the dispatcher.
Start(context.Context)
// Stop stops the dispatcher.
Stop(context.Context) bool
// Notify notifies the dispatcher that a new task has been added.
Notify()
}
Dispatcher handles automatically pulling queued tasks and executing them via queue processors.
type Logger ¶
type Logger interface {
// Info logs info messages.
Info(message string, params ...any)
// Error logs error messages.
Error(message string, params ...any)
}
Logger is used to log operations.
type Queue ¶
type Queue interface {
// Config returns the configuration for the queue.
Config() *QueueConfig
// Process processes the Task.
Process(ctx context.Context, payload []byte) error
}
Queue represents a queue which contains tasks to be executed.
type QueueConfig ¶
type QueueConfig struct {
// Name is the name of the queue and must be unique.
Name string
// MaxAttempts are the maximum number of attempts to execute this task before it's marked as completed.
MaxAttempts int
// Timeout is the duration set on the context while executing a given task.
Timeout time.Duration
// Backoff is the duration a failed task will be held in the queue until being retried.
Backoff time.Duration
// Retention dictates if and how completed tasks will be retained in the database.
// If nil, no completed tasks will be retained.
Retention *Retention
}
QueueConfig is the configuration options for a queue.
type QueueProcessor ¶
QueueProcessor is a generic processor callback for a given queue to process Tasks
type RetainData ¶
type RetainData struct {
// OnlyFailed indicates if Task payload data should only be retained for failed tasks.
OnlyFailed bool
}
RetainData is the policy for how Task payload data will be retained in the database after the task is complete.
type Retention ¶
type Retention struct {
// Duration is the amount of time to retain a task for after completion.
// If omitted, the task will be retained forever.
Duration time.Duration
// OnlyFailed indicates if only failed tasks should be retained.
OnlyFailed bool
// Data provides options for retaining Task payload data.
// If nil, no task payload data will be retained.
Data *RetainData
}
Retention is the policy for how completed tasks will be retained in the database.
type Task ¶
type Task interface {
// Config returns the configuration options for the queue that this Task will be placed in.
Config() QueueConfig
}
Task represents a task that will be placed in to a queue for execution.
type TaskAddOp ¶
type TaskAddOp struct {
// contains filtered or unexported fields
}
TaskAddOp facilitates adding Tasks to the queue.
func (*TaskAddOp) Save ¶
Save saves the task, so it can be queued for execution, and returns the task IDs.
func (*TaskAddOp) Tx ¶
Tx will include the task as part of a given database transaction. When using this, it is critical that after you commit the transaction that you call Notify() on the client so the dispatcher is aware that a new task has been created, otherwise it may not be executed. This is necessary because there is, unfortunately, no way for outsiders to know if or when a transaction is committed and since the dispatcher avoids continuous polling, it needs to know when tasks are added.
type TaskStatus ¶ added in v0.6.0
type TaskStatus int
const ( // TaskStatusPending indicates the task is awaiting execution. TaskStatusPending TaskStatus = iota // TaskStatusRunning indicates the task is being executed. TaskStatusRunning // TaskStatusSuccess indicates the task completed successfully. TaskStatusSuccess // TaskStatusFailure indicates the task execution failed. TaskStatusFailure // TaskStatusNotFound indicates the task was not found in the database. TaskStatusNotFound )
