taskqueue

README

Taskqueue

This is just an experiment and not ready for production.

Taskqueue manages running and scheduling tasks (think Sidekiq or Resque).

Prerequisites

Redis is currently the only persistent storage backend, so you must have Redis installed.

Getting started

Get the repository with go get github.com/olivere/taskqueue.

Example:

// Create a Redis-based backend, namespaced with "taskqueue_example"
store := taskqueue.NewRedisStore("localhost:6379", "taskqueue_example", "", 0)

// Create a manager with the Redis store. See source for more options.
m := taskqueue.New(taskqueue.SetStore(store))

// Register one or more topics and their processor
m.Register("clicks", func(args ...interface{}) error {
	// Handle the "clicks" topic here.
	return nil
})

// Start the manager
err := m.Start()
if err != nil {
	panic(err)
}

// Enqueue a task: It'll be added to the input queue and processed eventually
err = m.Enqueue(&taskqueue.Task{Topic: "clicks", Args: []interface{}{640, 480}})
if err != nil {
	panic(err)
}

...

// Stop the manager, either via Close (which stops immediately)
// or CloseWithTimeout (which gracefully waits until either all working
// tasks are completed or a timeout is reached)
err = m.CloseWithTimeout(15 * time.Second)
if err != nil {
	panic(err)
}

See the tests for more details on using taskqueue.

Tests and Web UI

Ensure the tests succeed with go test. You may have to install dependencies.

Run an end-to-end test with go run e2e/main.go. It simulates a real worker. Play with the options: go run e2e/main.go -h.

While running the end-to-end tests, open up a second console and run cd ui && go run main.go. Then direct your web browser to 127.0.0.1:12345.

License

MIT License. See LICENSE file for details.

Documentation

Overview

Package taskqueue manages running and scheduling tasks.

Applications using taskqueue first create a Manager. One manager handles one or more topics. There is one processor per topic. Applications need to register topics and their processors before starting the manager.

After topics and processors are registered, applications can start the manager. The manager then initializes a list of workers that will work on the actual tasks. At the beginning, all workers are idle.

The manager uses a Store for persistent storage. By default, Redis is used as the backend store. Internally, the manager maintains three queues, and this structure is mirrored in Redis. The input queue contains all tasks that still need to be worked on. The work queue contains all tasks currently being worked on. Finally, the dead queue contains all tasks that could not be completed, even after retrying. The dead queue is not touched until a human moves its tasks back into the input queue.

After being started, the manager moves all tasks in the work queue back into the input queue. This covers the case where a previously started manager could not complete its tasks, e.g. because it crashed. After that, the manager periodically polls the input queue for new tasks. When a new task is found and a worker is available, the manager moves the task into the work queue, looks up the associated processor, and passes the task to a worker to run.

If the worker finishes successfully, the task is removed from the work queue and disappears. If the worker fails, the task is retried according to the NumRetries set when enqueuing it initially. Such tasks are simply put back into the input queue to be scheduled again at a later time. If a task fails even after all retries, it is moved to the dead queue for inspection by a human.
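
For illustration, here is a minimal sketch of the retry flow described above; the store choice, topic name, and retry count are made up, and the errors package is assumed to be imported:

// A processor that always fails. Each failure puts the task back into the
// input queue until NumRetries is exhausted; then it is moved to the dead queue.
store := taskqueue.NewInMemoryStore() // or a RedisStore in production
m := taskqueue.New(taskqueue.SetStore(store))

m.Register("flaky", func(args ...interface{}) error {
	return errors.New("boom")
})

if err := m.Start(); err != nil {
	panic(err)
}

// Retry up to 3 times before the task ends up in the dead queue.
if err := m.Enqueue(&taskqueue.Task{Topic: "flaky", NumRetries: 3}); err != nil {
	panic(err)
}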

Index

Constants

const (
	// ManagerStart event type is triggered on manager startup.
	ManagerStart = "MANAGER_START"
	// ManagerStop event type is triggered on manager shutdown.
	ManagerStop = "MANAGER_STOP"
	// ManagerStats event type returns global stats periodically.
	ManagerStats = "MANAGER_STATS"
	// TaskEnqueue event type is triggered when a new task is enqueued.
	TaskEnqueue = "TASK_ENQUEUE"
	// TaskStart event type is triggered when a new task is started.
	TaskStart = "TASK_START"
	// TaskRetry event type is triggered when a task is retried.
	TaskRetry = "TASK_RETRY"
	// TaskCompletion event type is triggered when a task completed successfully.
	TaskCompletion = "TASK_COMPLETION"
	// TaskFailure event type is triggered when a task has failed.
	TaskFailure = "TASK_FAILURE"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type BackoffFunc

type BackoffFunc func(attempts int) time.Duration

BackoffFunc is a callback that returns a backoff duration. It is configurable via the SetBackoffFunc option on the manager. The BackoffFunc is used to determine the timespan between retries of failed jobs.
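
A sketch of supplying a custom backoff, here a constant 5-second delay (the value is illustrative; the default is exponential backoff, see SetBackoffFunc):

// Wait a flat 5 seconds between retries, regardless of the attempt count.
backoff := taskqueue.BackoffFunc(func(attempts int) time.Duration {
	return 5 * time.Second
})

m := taskqueue.New(
	taskqueue.SetStore(store), // store created elsewhere
	taskqueue.SetBackoffFunc(backoff),
)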

type InMemoryStore

type InMemoryStore struct {
	// contains filtered or unexported fields
}

InMemoryStore is a simple in-memory storage backend. It is used in tests only.

func NewInMemoryStore

func NewInMemoryStore() *InMemoryStore
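
A sketch of using the in-memory store in a unit test so that no Redis server is required (test name and topic are illustrative):

func TestEnqueue(t *testing.T) {
	// Use the in-memory store so the test does not need Redis.
	m := taskqueue.New(taskqueue.SetStore(taskqueue.NewInMemoryStore()))

	// Register processors before starting the manager.
	m.Register("clicks", func(args ...interface{}) error {
		return nil
	})

	if err := m.Start(); err != nil {
		t.Fatal(err)
	}
	defer m.Close()

	if err := m.Enqueue(&taskqueue.Task{Topic: "clicks", Args: []interface{}{640, 480}}); err != nil {
		t.Fatal(err)
	}
}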

func (*InMemoryStore) Dequeue

func (r *InMemoryStore) Dequeue(id string) error

func (*InMemoryStore) Enqueue

func (r *InMemoryStore) Enqueue(spec *TaskSpec) error

func (*InMemoryStore) MoveToDeadQueue

func (r *InMemoryStore) MoveToDeadQueue(spec *TaskSpec) error

func (*InMemoryStore) MoveWorkQueueToInputQueue

func (r *InMemoryStore) MoveWorkQueueToInputQueue() error

MoveWorkQueueToInputQueue moves all items from the work queue to the input queue (used at startup).

func (*InMemoryStore) Next

func (r *InMemoryStore) Next() (*TaskSpec, error)

func (*InMemoryStore) Publish

func (r *InMemoryStore) Publish(e *WatchEvent) error

func (*InMemoryStore) Retry

func (r *InMemoryStore) Retry(spec *TaskSpec) error

func (*InMemoryStore) SizeOfDeadQueue

func (r *InMemoryStore) SizeOfDeadQueue() (int, error)

func (*InMemoryStore) SizeOfInputQueue

func (r *InMemoryStore) SizeOfInputQueue() (int, error)

func (*InMemoryStore) SizeOfWorkQueue

func (r *InMemoryStore) SizeOfWorkQueue() (int, error)

func (*InMemoryStore) StatsIncrement

func (r *InMemoryStore) StatsIncrement(f StatsField, delta int) error

func (*InMemoryStore) StatsSnapshot

func (r *InMemoryStore) StatsSnapshot() (*Stats, error)

func (*InMemoryStore) Subscribe

func (r *InMemoryStore) Subscribe(recv chan *WatchEvent)

type Logger

type Logger interface {
	Printf(format string, v ...interface{})
}

Logger defines an interface that implementers can use to redirect logging into their own application.
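
Because the standard library's *log.Logger already provides a matching Printf method, it can be passed in directly; a sketch (prefix and flags are illustrative):

logger := log.New(os.Stderr, "taskqueue ", log.LstdFlags)

m := taskqueue.New(
	taskqueue.SetStore(store), // store created elsewhere
	taskqueue.SetLogger(logger),
)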

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages tasks.

func New

func New(options ...ManagerOption) *Manager

New creates a new manager.

Configure the manager with Set methods. Example:

m := taskqueue.New(taskqueue.SetStore(...), taskqueue.SetPollInterval(...))

func (*Manager) Close

func (m *Manager) Close() error

Close stops the task manager immediately, canceling all active workers. If you want a graceful shutdown, use CloseWithTimeout.

func (*Manager) CloseWithTimeout

func (m *Manager) CloseWithTimeout(timeout time.Duration) (err error)

CloseWithTimeout is like Close but waits until all running tasks are completed. New tasks are no longer accepted. This ensures a graceful shutdown. Use a negative timeout to wait indefinitely.

func (*Manager) Enqueue

func (m *Manager) Enqueue(task *Task) error

Enqueue adds a task to the input queue. It will be picked up and run eventually. Tasks are processed in order of their enqueue time.

func (*Manager) Register

func (m *Manager) Register(topic string, p Processor) error

Register registers a topic and the associated processor.

func (*Manager) Start

func (m *Manager) Start() error

Start runs the task manager. Use Close to stop it. It is the caller's responsibility to ensure that only one worker per namespace is started at any point in time.

func (*Manager) Stats

func (m *Manager) Stats() (*Stats, error)

Stats returns a snapshot of the current statistics, e.g. the number of started and completed tasks.
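
A sketch of reading a snapshot and printing the queue sizes (the field selection is illustrative):

stats, err := m.Stats()
if err != nil {
	panic(err)
}
fmt.Printf("input=%d work=%d dead=%d completed=%d\n",
	stats.InputQueueSize, stats.WorkQueueSize, stats.DeadQueueSize, stats.Completed)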

func (*Manager) Watch

func (m *Manager) Watch(done chan struct{}) <-chan *WatchEvent

Watch enables consumers to watch events happening inside a manager. Watch returns a channel of WatchEvents that it will send on. The caller must pass a done channel that it needs to close if it is no longer interested in watching events.
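
A sketch of consuming watch events; it assumes the event channel is closed once the done channel is closed:

done := make(chan struct{})
events := m.Watch(done)

go func() {
	for ev := range events {
		switch ev.Type {
		case taskqueue.TaskCompletion, taskqueue.TaskFailure:
			if ev.Task != nil {
				fmt.Printf("%s: task %s\n", ev.Type, ev.Task.ID)
			}
		case taskqueue.ManagerStats:
			if ev.Stats != nil {
				fmt.Printf("completed so far: %d\n", ev.Stats.Completed)
			}
		}
	}
}()

// Close done when no longer interested in events.
close(done)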

type ManagerOption

type ManagerOption func(*Manager)

ManagerOption is an options provider to be used when creating a new task manager.

func SetBackoffFunc

func SetBackoffFunc(fn BackoffFunc) ManagerOption

SetBackoffFunc specifies the backoff function that returns the timespan between retries of failed jobs. Exponential backoff is used by default.

func SetConcurrency

func SetConcurrency(n int) ManagerOption

SetConcurrency specifies the number of workers working in parallel. Concurrency must be greater than or equal to 1; the default is 25.

func SetLogger

func SetLogger(logger Logger) ManagerOption

SetLogger specifies the logger to use when reporting.

func SetPollInterval

func SetPollInterval(interval time.Duration) ManagerOption

SetPollInterval specifies the interval at which the manager polls for jobs.
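
A sketch combining several options (the concrete values are illustrative, not recommendations):

m := taskqueue.New(
	taskqueue.SetStore(store),                       // store created elsewhere
	taskqueue.SetConcurrency(10),                    // 10 workers instead of the default 25
	taskqueue.SetPollInterval(250*time.Millisecond), // poll the input queue every 250ms
)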

func SetStore

func SetStore(store Store) ManagerOption

SetStore specifies the data store to use for storing task information. The default is RedisStore.

type Processor

type Processor func(args ...interface{}) error

Processor works on a task. It must be registered with the Manager.

type RedisStore

type RedisStore struct {
	// contains filtered or unexported fields
}

func NewRedisStore

func NewRedisStore(server, namespace, password string, db int) *RedisStore

func NewRedisStoreFromPool

func NewRedisStoreFromPool(namespace string, pool *redis.Pool) *RedisStore
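
A sketch of reusing an existing redigo connection pool; the redigo import path is an assumption (commonly github.com/garyburd/redigo/redis at the time), so adjust it to whatever this package actually uses:

// import "github.com/garyburd/redigo/redis" // assumed redigo import path

pool := &redis.Pool{
	MaxIdle:     3,
	IdleTimeout: 240 * time.Second,
	Dial: func() (redis.Conn, error) {
		return redis.Dial("tcp", "localhost:6379")
	},
}

store := taskqueue.NewRedisStoreFromPool("taskqueue_example", pool)
m := taskqueue.New(taskqueue.SetStore(store))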

func (*RedisStore) Dequeue

func (r *RedisStore) Dequeue(id string) error

Dequeue removes the task specification from the work queue.

func (*RedisStore) Enqueue

func (r *RedisStore) Enqueue(spec *TaskSpec) error

Enqueue adds the task specification to the input queue.

func (*RedisStore) MoveToDeadQueue

func (r *RedisStore) MoveToDeadQueue(spec *TaskSpec) error

MoveToDeadQueue moves the task from the work queue to the dead queue.

func (*RedisStore) MoveWorkQueueToInputQueue

func (r *RedisStore) MoveWorkQueueToInputQueue() error

MoveWorkQueueToInputQueue moves all items from the work queue to the input queue (used at startup).

func (*RedisStore) Next

func (r *RedisStore) Next() (*TaskSpec, error)

Next checks for tasks in the input queue and, if one is found, moves it into the work queue. If there are no tasks in the input queue, nil is returned.

func (*RedisStore) Publish

func (r *RedisStore) Publish(e *WatchEvent) error

func (*RedisStore) Retry

func (r *RedisStore) Retry(spec *TaskSpec) error

Retry takes a task specification and moves it from the work queue back into the input queue.

func (*RedisStore) SizeOfDeadQueue

func (r *RedisStore) SizeOfDeadQueue() (int, error)

SizeOfDeadQueue returns the current number of items in the dead queue.

func (*RedisStore) SizeOfInputQueue

func (r *RedisStore) SizeOfInputQueue() (int, error)

SizeOfInputQueue returns the current number of items in the input queue.

func (*RedisStore) SizeOfWorkQueue

func (r *RedisStore) SizeOfWorkQueue() (int, error)

SizeOfWorkQueue returns the current number of items in the work queue.

func (*RedisStore) StatsIncrement

func (r *RedisStore) StatsIncrement(f StatsField, delta int) error

StatsIncrement writes the updated statistics to the store.

func (*RedisStore) StatsSnapshot

func (r *RedisStore) StatsSnapshot() (*Stats, error)

StatsSnapshot reads the stored statistics.

func (*RedisStore) Subscribe

func (r *RedisStore) Subscribe(recv chan *WatchEvent)

type Stats

type Stats struct {
	Enqueued       int `json:"enqueued"`  // put into input queue
	Started        int `json:"started"`   // started processor
	Retried        int `json:"retried"`   // failed but still retrying
	Failed         int `json:"failed"`    // finally failed and moved to dead queue
	Completed      int `json:"completed"` // completed successfully
	InputQueueSize int `json:"input_queue_size"`
	WorkQueueSize  int `json:"work_queue_size"`
	DeadQueueSize  int `json:"dead_queue_size"`
}

Stats represents statistics.

type StatsField

type StatsField string

StatsField represents a metric.

const (
	EnqueuedField  StatsField = "enqueued"
	StartedField   StatsField = "started"
	RetriedField   StatsField = "retried"
	FailedField    StatsField = "failed"
	CompletedField StatsField = "completed"
)

type Store

type Store interface {
	// Enqueue adds the task specification to the input queue.
	Enqueue(*TaskSpec) error

	// Dequeue removes the task specification from the work queue.
	Dequeue(id string) error

	// Next checks for a task in the input queue and, if one is found,
	// moves it into the work queue. If there are no tasks in the input queue,
	// nil is returned.
	Next() (*TaskSpec, error)

	// Retry takes a task specification and moves it from the work queue
	// back into the input queue.
	Retry(*TaskSpec) error

	// MoveToDeadQueue moves the task from the work queue to the dead queue.
	MoveToDeadQueue(*TaskSpec) error

	// Move all items from work queue to input queue (used at startup).
	MoveWorkQueueToInputQueue() error

	// SizeOfInputQueue returns the current number of items in the input queue.
	SizeOfInputQueue() (int, error)

	// SizeOfWorkQueue returns the current number of items in the work queue.
	SizeOfWorkQueue() (int, error)

	// SizeOfDeadQueue returns the current number of items in the dead queue.
	SizeOfDeadQueue() (int, error)

	// StatsSnapshot returns a snapshot of the currently stored statistics.
	StatsSnapshot() (*Stats, error)

	// StatsIncrement increments a given statistic.
	StatsIncrement(field StatsField, delta int) error

	// Publish publishes an event.
	Publish(payload *WatchEvent) error

	// Subscribe subscribes to the list of changes.
	Subscribe(recv chan *WatchEvent)
}

type Task

type Task struct {
	// Topic for this task.
	Topic string
	// ExternalID is an application-specific identifier for a task.
	ExternalID string
	// Args is the list of arguments passed to the Processor.
	Args []interface{}
	// NumRetries specifies the number of retries in case of failures
	// in the processor.
	NumRetries int
}

Task specifies a task to execute.
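
For example, a sketch of a task that carries an application-specific ID and is retried up to three times (the values are made up):

task := &taskqueue.Task{
	Topic:      "clicks",
	ExternalID: "order-1234", // application-specific identifier
	Args:       []interface{}{640, 480},
	NumRetries: 3, // after 3 failed retries the task moves to the dead queue
}
if err := m.Enqueue(task); err != nil {
	panic(err)
}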

type TaskSpec

type TaskSpec struct {
	ID         string        `json:"id"`
	Topic      string        `json:"topic"`
	ExternalID string        `json:"xid,omitempty"`
	Args       []interface{} `json:"args"`
	Retry      int           `json:"retry"`       // current retry
	NumRetries int           `json:"num_retries"` // max. number of retries
	Enqueued   int64         `json:"enqueued"`    // time the task has been enqueued
	Priority   int64         `json:"priority"`    // lower means: execute earlier
}

TaskSpec is the internal representation of a Task's state.

type WatchEvent

type WatchEvent struct {
	Type  string    `json:"type"`            // event type
	Task  *TaskSpec `json:"task,omitempty"`  // task details
	Stats *Stats    `json:"stats,omitempty"` // statistics
}

WatchEvent is sent to consumers that watch the manager via Watch.

Directories

Path Synopsis
ui
