Documentation ¶
Overview ¶
Package taskqueue manages running and scheduling tasks.
Applications using taskqueue first create a Manager. One manager handles one or more topics. There is one processor per topic. Applications need to register topics and their processors before starting the manager.
After topics and processors are registered, applications can start the manager. The manager then initializes a list of workers that will work on the actual tasks. At the beginning, all workers are idle.
The manager has a Store to implement persistent storage. By default, Redis is used as the backend store. Internally, a task is in one of three states, each represented by a queue that is mirrored in Redis. The input queue contains all tasks that need to be worked on. The work queue contains all tasks currently being worked on. The dead queue contains all tasks that couldn't be completed, even after retrying. The dead queue is not touched until a human moves tasks back into the input queue.
After being started, the manager moves all tasks in the work queue back into the input queue. Tasks can be left in the work queue when a previously started manager couldn't complete them, e.g. because it crashed. After that, the manager periodically polls the input queue for new tasks. When a new task is found and a worker is available, the manager finds the associated processor, moves the task into the work queue, and passes it to a worker to run.
If the worker finishes successfully, the task is removed from the work queue and disappears. If the worker fails, the task is retried according to the NumRetries set when initially enqueuing the task. Such tasks are simply put back into the input queue to be scheduled again at a later time. If a task still fails after all retries, it is moved to the dead queue for inspection by a human.
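The lifecycle described in this overview can be modelled in a few lines. The following is a minimal, single-threaded sketch and not the package's implementation: the `task` and `queues` types, `recoverWork`, and `runNext` are hypothetical names invented for illustration, and the real manager uses a Store and concurrent workers instead of slices.

```go
package main

import (
	"errors"
	"fmt"
)

var errBoom = errors.New("boom")

// task is a hypothetical stand-in for the package's internal task state.
type task struct {
	id         string
	retry      int // retries used so far
	numRetries int // maximum number of retries
}

// queues models the three states: input, work, and dead.
type queues struct {
	input, work, dead []*task
}

// recoverWork moves everything from the work queue back into the input
// queue, as the manager does on startup.
func (q *queues) recoverWork() {
	q.input = append(q.input, q.work...)
	q.work = nil
}

// runNext takes the first task from the input queue, moves it into the
// work queue, runs the processor, and files the task by outcome.
// It returns false if the input queue was empty.
func (q *queues) runNext(process func(*task) error) bool {
	if len(q.input) == 0 {
		return false
	}
	t := q.input[0]
	q.input = q.input[1:]
	q.work = append(q.work, t)

	err := process(t)

	// Remove t from the work queue; in this single-threaded sketch it is
	// always the last element.
	q.work = q.work[:len(q.work)-1]
	switch {
	case err == nil:
		// Success: the task simply disappears.
	case t.retry < t.numRetries:
		t.retry++
		q.input = append(q.input, t) // schedule again later
	default:
		q.dead = append(q.dead, t) // retries exhausted
	}
	return true
}

func main() {
	// Simulate a task left behind in the work queue by a crashed manager.
	q := &queues{work: []*task{{id: "crashed", numRetries: 1}}}
	q.recoverWork()
	q.input = append(q.input, &task{id: "ok"})

	process := func(t *task) error {
		if t.id == "crashed" {
			return errBoom
		}
		return nil
	}
	for q.runNext(process) {
	}
	fmt.Println(len(q.input), len(q.work), len(q.dead))
}
```

In this run the "ok" task completes and vanishes, while the "crashed" task fails once, is retried once, and then ends up in the dead queue.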
Index ¶
- Constants
- type BackoffFunc
- type InMemoryStore
- func (r *InMemoryStore) Dequeue(id string) error
- func (r *InMemoryStore) Enqueue(spec *TaskSpec) error
- func (r *InMemoryStore) MoveToDeadQueue(spec *TaskSpec) error
- func (r *InMemoryStore) MoveWorkQueueToInputQueue() error
- func (r *InMemoryStore) Next() (*TaskSpec, error)
- func (r *InMemoryStore) Publish(e *WatchEvent) error
- func (r *InMemoryStore) Retry(spec *TaskSpec) error
- func (r *InMemoryStore) SizeOfDeadQueue() (int, error)
- func (r *InMemoryStore) SizeOfInputQueue() (int, error)
- func (r *InMemoryStore) SizeOfWorkQueue() (int, error)
- func (r *InMemoryStore) StatsIncrement(f StatsField, delta int) error
- func (r *InMemoryStore) StatsSnapshot() (*Stats, error)
- func (r *InMemoryStore) Subscribe(recv chan *WatchEvent)
- type Logger
- type Manager
- func (m *Manager) Close() error
- func (m *Manager) CloseWithTimeout(timeout time.Duration) (err error)
- func (m *Manager) Enqueue(task *Task) error
- func (m *Manager) Register(topic string, p Processor) error
- func (m *Manager) Start() error
- func (m *Manager) Stats() (*Stats, error)
- func (m *Manager) Watch(done chan struct{}) <-chan *WatchEvent
- type ManagerOption
- type Processor
- type RedisStore
- func (r *RedisStore) Dequeue(id string) error
- func (r *RedisStore) Enqueue(spec *TaskSpec) error
- func (r *RedisStore) MoveToDeadQueue(spec *TaskSpec) error
- func (r *RedisStore) MoveWorkQueueToInputQueue() error
- func (r *RedisStore) Next() (*TaskSpec, error)
- func (r *RedisStore) Publish(e *WatchEvent) error
- func (r *RedisStore) Retry(spec *TaskSpec) error
- func (r *RedisStore) SizeOfDeadQueue() (int, error)
- func (r *RedisStore) SizeOfInputQueue() (int, error)
- func (r *RedisStore) SizeOfWorkQueue() (int, error)
- func (r *RedisStore) StatsIncrement(f StatsField, delta int) error
- func (r *RedisStore) StatsSnapshot() (*Stats, error)
- func (r *RedisStore) Subscribe(recv chan *WatchEvent)
- type Stats
- type StatsField
- type Store
- type Task
- type TaskSpec
- type WatchEvent
Constants ¶
const (
	// ManagerStart event type is triggered on manager startup.
	ManagerStart = "MANAGER_START"
	// ManagerStop event type is triggered on manager shutdown.
	ManagerStop = "MANAGER_STOP"
	// ManagerStats event type returns global stats periodically.
	ManagerStats = "MANAGER_STATS"
	// TaskEnqueue event type is triggered when a new task is enqueued.
	TaskEnqueue = "TASK_ENQUEUE"
	// TaskStart event type is triggered when a new task is started.
	TaskStart = "TASK_START"
	// TaskRetry event type is triggered when a task is retried.
	TaskRetry = "TASK_RETRY"
	// TaskCompletion event type is triggered when a task completed successfully.
	TaskCompletion = "TASK_COMPLETION"
	// TaskFailure event type is triggered when a task has failed.
	TaskFailure = "TASK_FAILURE"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BackoffFunc ¶
BackoffFunc is a callback that returns the timespan to wait between retries of failed jobs. It is configurable via the SetBackoffFunc option of the manager.
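The signature of BackoffFunc is not shown on this page. The sketch below assumes it takes the retry attempt and returns a `time.Duration`; that signature, the `exponentialBackoff` helper, and the chosen base and limit are illustrative assumptions and not the package's default implementation.

```go
package main

import (
	"fmt"
	"time"
)

// BackoffFunc uses an ASSUMED signature: retry attempt in, wait time out.
type BackoffFunc func(attempts int) time.Duration

// exponentialBackoff returns a BackoffFunc that doubles base on every
// attempt and never returns more than limit.
func exponentialBackoff(base, limit time.Duration) BackoffFunc {
	return func(attempts int) time.Duration {
		d := base
		for i := 0; i < attempts && d < limit; i++ {
			d *= 2
		}
		if d > limit {
			d = limit
		}
		return d
	}
}

func main() {
	backoff := exponentialBackoff(time.Second, 30*time.Second)
	for attempt := 0; attempt < 7; attempt++ {
		fmt.Println(attempt, backoff(attempt))
	}
}
```

Capping the result keeps a task with many retries from waiting arbitrarily long between attempts.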
type InMemoryStore ¶
type InMemoryStore struct {
// contains filtered or unexported fields
}
InMemoryStore is a simple in-memory storage backend. It is used in tests only.
func NewInMemoryStore ¶
func NewInMemoryStore() *InMemoryStore
func (*InMemoryStore) Dequeue ¶
func (r *InMemoryStore) Dequeue(id string) error
func (*InMemoryStore) Enqueue ¶
func (r *InMemoryStore) Enqueue(spec *TaskSpec) error
func (*InMemoryStore) MoveToDeadQueue ¶
func (r *InMemoryStore) MoveToDeadQueue(spec *TaskSpec) error
func (*InMemoryStore) MoveWorkQueueToInputQueue ¶
func (r *InMemoryStore) MoveWorkQueueToInputQueue() error
MoveWorkQueueToInputQueue moves all items from the work queue to the input queue (used at startup).
func (*InMemoryStore) Next ¶
func (r *InMemoryStore) Next() (*TaskSpec, error)
func (*InMemoryStore) Publish ¶
func (r *InMemoryStore) Publish(e *WatchEvent) error
func (*InMemoryStore) Retry ¶
func (r *InMemoryStore) Retry(spec *TaskSpec) error
func (*InMemoryStore) SizeOfDeadQueue ¶
func (r *InMemoryStore) SizeOfDeadQueue() (int, error)
func (*InMemoryStore) SizeOfInputQueue ¶
func (r *InMemoryStore) SizeOfInputQueue() (int, error)
func (*InMemoryStore) SizeOfWorkQueue ¶
func (r *InMemoryStore) SizeOfWorkQueue() (int, error)
func (*InMemoryStore) StatsIncrement ¶
func (r *InMemoryStore) StatsIncrement(f StatsField, delta int) error
func (*InMemoryStore) StatsSnapshot ¶
func (r *InMemoryStore) StatsSnapshot() (*Stats, error)
func (*InMemoryStore) Subscribe ¶
func (r *InMemoryStore) Subscribe(recv chan *WatchEvent)
type Logger ¶
type Logger interface {
Printf(format string, v ...interface{})
}
Logger defines an interface that implementers can use to redirect logging into their own application.
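Because the interface consists of a single Printf method, the standard library's `*log.Logger` already satisfies it. The sketch below copies the interface locally to show this, and adds a small adapter; `prefixLogger` and `render` are hypothetical names used only for illustration.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
)

// Logger is copied from the interface definition above.
type Logger interface {
	Printf(format string, v ...interface{})
}

// Compile-time check: *log.Logger satisfies Logger.
var _ Logger = (*log.Logger)(nil)

// prefixLogger is a hypothetical adapter that tags every message before
// forwarding it to another Logger.
type prefixLogger struct {
	prefix string
	next   Logger
}

func (p prefixLogger) Printf(format string, v ...interface{}) {
	p.next.Printf(p.prefix+format, v...)
}

// render formats one message through a prefixLogger into a string.
func render(prefix, format string, v ...interface{}) string {
	var buf bytes.Buffer
	std := log.New(&buf, "", 0) // no timestamp flags, so output is stable
	var l Logger = prefixLogger{prefix: prefix, next: std}
	l.Printf(format, v...)
	return buf.String()
}

func main() {
	fmt.Print(render("[taskqueue] ", "task %s failed: %v", "42", "timeout"))
}
```

A value such as `prefixLogger` or a plain `*log.Logger` could then be handed to the SetLogger option.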
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager manages tasks.
func New ¶
func New(options ...ManagerOption) *Manager
New creates a new manager.
Configure the manager with Set methods. Example:
m := taskqueue.New(taskqueue.SetStore(...), taskqueue.SetPollInterval(...))
func (*Manager) Close ¶
func (m *Manager) Close() error
Close stops the task manager immediately, canceling all active workers. If you want a graceful shutdown, use CloseWithTimeout.
func (*Manager) CloseWithTimeout ¶
func (m *Manager) CloseWithTimeout(timeout time.Duration) (err error)
CloseWithTimeout is like Close but waits until all running tasks are completed. New tasks are no longer accepted. This ensures a graceful shutdown. Use a negative timeout to wait indefinitely.
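One common way to build such a graceful shutdown is to wait on a `sync.WaitGroup` with a deadline. The following sketch illustrates that general technique only; `waitWithTimeout`, `runDemo`, and `errTimeout` are hypothetical, and what the real CloseWithTimeout returns when the timeout expires is not documented here.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var errTimeout = errors.New("timed out waiting for workers")

// waitWithTimeout waits for wg to finish. A negative timeout waits
// indefinitely, following the convention described above.
func waitWithTimeout(wg *sync.WaitGroup, timeout time.Duration) error {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	if timeout < 0 {
		<-done
		return nil
	}
	select {
	case <-done:
		return nil
	case <-time.After(timeout):
		return errTimeout
	}
}

// runDemo starts one simulated task that runs for work and then waits
// for it with the given timeout.
func runDemo(work, timeout time.Duration) error {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(work)
	}()
	return waitWithTimeout(&wg, timeout)
}

func main() {
	fmt.Println(runDemo(10*time.Millisecond, time.Second))
	fmt.Println(runDemo(10*time.Millisecond, -1))
	fmt.Println(runDemo(time.Second, 10*time.Millisecond))
}
```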
func (*Manager) Enqueue ¶
func (m *Manager) Enqueue(task *Task) error
Enqueue adds a task to the input queue. It will be picked up and run eventually. Tasks are processed ordered by time.
func (*Manager) Start ¶
func (m *Manager) Start() error
Start runs the task manager. Use Close to stop it. It is the caller's responsibility to ensure that only one worker per namespace is started at any point in time.
func (*Manager) Stats ¶
func (m *Manager) Stats() (*Stats, error)
Stats returns a snapshot of the current statistics, e.g. the number of started and completed tasks.
func (*Manager) Watch ¶
func (m *Manager) Watch(done chan struct{}) <-chan *WatchEvent
Watch enables consumers to watch events happening inside a manager. Watch returns a channel of WatchEvents that it will send on. The caller must pass a done channel that it needs to close if it is no longer interested in watching events.
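The done-channel contract can be illustrated without a running manager. In this sketch, `watch` is a hypothetical stand-in for Manager.Watch that emits a fixed list of event types, and WatchEvent is reduced to its Type field; whether the real channel is closed after done is closed is an assumption of the sketch.

```go
package main

import "fmt"

// WatchEvent is reduced to the Type field for this sketch.
type WatchEvent struct {
	Type string
}

// watch is a hypothetical stand-in for Manager.Watch. It sends one event
// per entry in types until they run out or done is closed, and then
// closes the returned channel.
func watch(done chan struct{}, types []string) <-chan *WatchEvent {
	out := make(chan *WatchEvent)
	go func() {
		defer close(out)
		for _, t := range types {
			select {
			case out <- &WatchEvent{Type: t}:
			case <-done:
				return // consumer is no longer interested
			}
		}
	}()
	return out
}

// firstN reads up to n events (n must be at least 1) and then closes
// done to tell the producer to stop.
func firstN(n int, types []string) []string {
	done := make(chan struct{})
	defer close(done)
	var seen []string
	for e := range watch(done, types) {
		seen = append(seen, e.Type)
		if len(seen) == n {
			break
		}
	}
	return seen
}

func main() {
	fmt.Println(firstN(2, []string{"TASK_ENQUEUE", "TASK_START", "TASK_COMPLETION"}))
}
```

Closing done is what lets the sending goroutine exit; without it, a producer blocked on an unread channel would leak.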
type ManagerOption ¶
type ManagerOption func(*Manager)
ManagerOption is an options provider to be used when creating a new task manager.
func SetBackoffFunc ¶
func SetBackoffFunc(fn BackoffFunc) ManagerOption
SetBackoffFunc specifies the backoff function that returns the timespan between retries of failed jobs. Exponential backoff is used by default.
func SetConcurrency ¶
func SetConcurrency(n int) ManagerOption
SetConcurrency specifies the number of workers working in parallel. Concurrency must be greater than or equal to 1 and is 25 by default.
func SetLogger ¶
func SetLogger(logger Logger) ManagerOption
SetLogger specifies the logger to use when reporting.
func SetPollInterval ¶
func SetPollInterval(interval time.Duration) ManagerOption
SetPollInterval specifies the interval at which the manager polls for jobs.
func SetStore ¶
func SetStore(store Store) ManagerOption
SetStore specifies the data store to use for storing task information. The default is RedisStore.
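ManagerOption follows the functional options pattern: each Set function returns a closure that New applies to the manager. The sketch below reproduces the pattern with hypothetical lowercase types; the default poll interval of one second and the handling of invalid concurrency values are assumptions, and only the default concurrency of 25 comes from the text above.

```go
package main

import (
	"fmt"
	"time"
)

// manager is a hypothetical stand-in for the package's Manager.
type manager struct {
	concurrency  int
	pollInterval time.Duration
}

// option has the same shape as ManagerOption.
type option func(*manager)

// setConcurrency mirrors SetConcurrency. Ignoring values below 1 is an
// assumption of this sketch.
func setConcurrency(n int) option {
	return func(m *manager) {
		if n >= 1 {
			m.concurrency = n
		}
	}
}

// setPollInterval mirrors SetPollInterval.
func setPollInterval(d time.Duration) option {
	return func(m *manager) { m.pollInterval = d }
}

// newManager sets defaults first and then applies the options in order.
func newManager(opts ...option) *manager {
	m := &manager{
		concurrency:  25,          // default stated in the documentation
		pollInterval: time.Second, // ASSUMED default, for illustration only
	}
	for _, opt := range opts {
		opt(m)
	}
	return m
}

func main() {
	m := newManager(setConcurrency(4), setPollInterval(500*time.Millisecond))
	fmt.Println(m.concurrency, m.pollInterval)
}
```

Applying defaults before the options means callers only have to mention the settings they want to change.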
type Processor ¶
type Processor func(args ...interface{}) error
Processor works on a task. It must be registered in the Manager.
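Since a Processor receives its arguments as untyped values, it usually needs to check their number and types before doing any work. The sketch below copies the Processor type locally; `sendEmail`, `errBadArgs`, and the `sent` slice are hypothetical and exist only to make the example observable.

```go
package main

import (
	"errors"
	"fmt"
)

// Processor is copied from the type definition above.
type Processor func(args ...interface{}) error

var errBadArgs = errors.New("expected (to string, subject string)")

// sent records deliveries so the example has a visible effect.
var sent []string

// sendEmail is a hypothetical processor that validates its arguments
// before using them. Returning an error marks the task as failed.
func sendEmail(args ...interface{}) error {
	if len(args) != 2 {
		return errBadArgs
	}
	to, ok := args[0].(string)
	if !ok {
		return errBadArgs
	}
	subject, ok := args[1].(string)
	if !ok {
		return errBadArgs
	}
	sent = append(sent, to+": "+subject)
	return nil
}

func main() {
	var p Processor = sendEmail
	fmt.Println(p("a@example.com", "Welcome"), sent)
	fmt.Println(p("a@example.com"))
}
```

A function like this could be passed to Register together with the topic it handles.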
type RedisStore ¶
type RedisStore struct {
// contains filtered or unexported fields
}
func NewRedisStore ¶
func NewRedisStore(server, namespace, password string, db int) *RedisStore
func NewRedisStoreFromPool ¶
func NewRedisStoreFromPool(namespace string, pool *redis.Pool) *RedisStore
func (*RedisStore) Dequeue ¶
func (r *RedisStore) Dequeue(id string) error
Dequeue removes the task specification from the work queue.
func (*RedisStore) Enqueue ¶
func (r *RedisStore) Enqueue(spec *TaskSpec) error
Enqueue adds the task specification to the input queue.
func (*RedisStore) MoveToDeadQueue ¶
func (r *RedisStore) MoveToDeadQueue(spec *TaskSpec) error
MoveToDeadQueue moves the task from the work queue to the dead queue.
func (*RedisStore) MoveWorkQueueToInputQueue ¶
func (r *RedisStore) MoveWorkQueueToInputQueue() error
MoveWorkQueueToInputQueue moves all items from the work queue to the input queue (used at startup).
func (*RedisStore) Next ¶
func (r *RedisStore) Next() (*TaskSpec, error)
Next checks for tasks in the input queue and, if one is found, moves it into the work queue. If there are no tasks in the input queue, nil is returned.
func (*RedisStore) Publish ¶
func (r *RedisStore) Publish(e *WatchEvent) error
func (*RedisStore) Retry ¶
func (r *RedisStore) Retry(spec *TaskSpec) error
Retry takes a task specification and moves it from the work queue back into the input queue.
func (*RedisStore) SizeOfDeadQueue ¶
func (r *RedisStore) SizeOfDeadQueue() (int, error)
SizeOfDeadQueue returns the current number of items in the dead queue.
func (*RedisStore) SizeOfInputQueue ¶
func (r *RedisStore) SizeOfInputQueue() (int, error)
SizeOfInputQueue returns the current number of items in the input queue.
func (*RedisStore) SizeOfWorkQueue ¶
func (r *RedisStore) SizeOfWorkQueue() (int, error)
SizeOfWorkQueue returns the current number of items in the work queue.
func (*RedisStore) StatsIncrement ¶
func (r *RedisStore) StatsIncrement(f StatsField, delta int) error
StatsIncrement writes the updated statistics to the store.
func (*RedisStore) StatsSnapshot ¶
func (r *RedisStore) StatsSnapshot() (*Stats, error)
StatsSnapshot reads the stored statistics.
func (*RedisStore) Subscribe ¶
func (r *RedisStore) Subscribe(recv chan *WatchEvent)
type Stats ¶
type Stats struct {
	Enqueued       int `json:"enqueued"`  // put into input queue
	Started        int `json:"started"`   // started processor
	Retried        int `json:"retried"`   // failed but still retrying
	Failed         int `json:"failed"`    // finally failed and moved to dead queue
	Completed      int `json:"completed"` // completed successfully
	InputQueueSize int `json:"input_queue_size"`
	WorkQueueSize  int `json:"work_queue_size"`
	DeadQueueSize  int `json:"dead_queue_size"`
}
Stats represents statistics.
type StatsField ¶
type StatsField string
StatsField represents a metric.
const (
	EnqueuedField  StatsField = "enqueued"
	StartedField   StatsField = "started"
	RetriedField   StatsField = "retried"
	FailedField    StatsField = "failed"
	CompletedField StatsField = "completed"
)
type Store ¶
type Store interface {
	// Enqueue adds the task specification to the input queue.
	Enqueue(*TaskSpec) error

	// Dequeue removes the task specification from the work queue.
	Dequeue(id string) error

	// Next checks for tasks in the input queue and moves it into the
	// work queue. If there are no tasks in the input queue, nil is returned.
	Next() (*TaskSpec, error)

	// Retry takes a task specification and moves it from the work queue
	// back into the input queue.
	Retry(*TaskSpec) error

	// MoveToDeadQueue moves the task from the work queue to the dead queue.
	MoveToDeadQueue(*TaskSpec) error

	// Move all items from work queue to input queue (used at startup).
	MoveWorkQueueToInputQueue() error

	// SizeOfInputQueue returns the current number of items in the input queue.
	SizeOfInputQueue() (int, error)

	// SizeOfWorkQueue returns the current number of items in the work queue.
	SizeOfWorkQueue() (int, error)

	// SizeOfDeadQueue returns the current number of items in the dead queue.
	SizeOfDeadQueue() (int, error)

	// Stats returns a snapshot of the currently stored statistics.
	StatsSnapshot() (*Stats, error)

	// StatsIncrement increments a given statistic.
	StatsIncrement(field StatsField, delta int) error

	// Publish publishes an event.
	Publish(payload *WatchEvent) error

	// Subscribe subscribes to the list of changes.
	Subscribe(recv chan *WatchEvent)
}
type Task ¶
type Task struct {
	// Topic for this task.
	Topic string
	// ExternalID is an application-specific identifier for a task.
	ExternalID string
	// Args is the list of arguments passed to the Processor.
	Args []interface{}
	// NumRetries specifies the number of retries in case of failures
	// in the processor.
	NumRetries int
}
Task specifies a task to execute.
type TaskSpec ¶
type TaskSpec struct {
	ID         string        `json:"id"`
	Topic      string        `json:"topic"`
	ExternalID string        `json:"xid,omitempty"`
	Args       []interface{} `json:"args"`
	Retry      int           `json:"retry"`       // current retry
	NumRetries int           `json:"num_retries"` // max. number of retries
	Enqueued   int64         `json:"enqueued"`    // time the task has been enqueued
	Priority   int64         `json:"priority"`    // lower means: execute earlier
}
TaskSpec is the internal representation of a Task state.
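The struct tags suggest that a TaskSpec is stored as JSON, although this page does not say so explicitly. Assuming it is, the sketch below shows what a round trip through `encoding/json` does to the struct; `roundTrip` is a hypothetical helper. Two effects are worth knowing: an empty ExternalID is omitted from the output, and numeric entries in Args come back as float64.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// TaskSpec is copied from the struct definition above.
type TaskSpec struct {
	ID         string        `json:"id"`
	Topic      string        `json:"topic"`
	ExternalID string        `json:"xid,omitempty"`
	Args       []interface{} `json:"args"`
	Retry      int           `json:"retry"`
	NumRetries int           `json:"num_retries"`
	Enqueued   int64         `json:"enqueued"`
	Priority   int64         `json:"priority"`
}

// roundTrip encodes a TaskSpec to JSON and decodes it again, returning
// both the encoded form and the decoded copy.
func roundTrip(in TaskSpec) (string, TaskSpec, error) {
	data, err := json.Marshal(in)
	if err != nil {
		return "", TaskSpec{}, err
	}
	var out TaskSpec
	err = json.Unmarshal(data, &out)
	return string(data), out, err
}

func main() {
	raw, out, err := roundTrip(TaskSpec{
		ID:         "1",
		Topic:      "email",
		Args:       []interface{}{"a@example.com", 7},
		NumRetries: 3,
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(raw)
	// The int 7 was decoded into an interface{} and is now a float64.
	fmt.Printf("%T\n", out.Args[1])
}
```

Under this assumption, a Processor that expects an integer argument would need to convert it from float64.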
type WatchEvent ¶
type WatchEvent struct {
	Type  string    `json:"type"`            // event type
	Task  *TaskSpec `json:"task,omitempty"`  // task details
	Stats *Stats    `json:"stats,omitempty"` // statistics
}
WatchEvent is sent to consumers watching the manager after calling Watch on the manager.