Documentation ¶
Overview ¶
Package gocelery is a Golang implemenation of celery task queue. It allows you to enqueue a task and execute it using go runtime which brings efficiency and concurrency features
gocelery is compatible with celery so that you can execute tasks enqueued by celery client or submit tasks to be executed by celery workers
gocelery requires a broker and results backend. currently only rabbitmq is supported (although any AMQP broker should work but not tested)
gocelery is shipped as a library so you can implement your own workers.
Index ¶
- Constants
- func IsWorkerRegistered(name string) bool
- func NewTaskFailedEvent(task *Task, taskResult *TaskResult, err error) map[string]interface{}
- func NewTaskReceivedEvent(task *Task) map[string]interface{}
- func NewTaskStartedEvent(task *Task) map[string]interface{}
- func NewTaskSucceedEvent(task *Task, taskResult *TaskResult, runtime time.Duration) map[string]interface{}
- func RegisterWorker(name string, worker Worker)
- func RegisteredWorkers() []string
- type Config
- type EventType
- type GoCelery
- func (gocelery *GoCelery) Close()
- func (gocelery *GoCelery) Enqueue(taskName string, args []interface{}, ignoreResult bool) (chan *TaskResult, error)
- func (gocelery *GoCelery) EnqueueInQueue(queueName string, taskName string, args []interface{}, ignoreResult bool) (chan *TaskResult, error)
- func (gocelery *GoCelery) EnqueueInQueueWithSchedule(spec string, queueName string, taskName string, args []interface{}) error
- func (gocelery *GoCelery) EnqueueWithSchedule(spec string, queueName string, taskName string, args []interface{}) error
- func (gocelery *GoCelery) StartWorkers()
- func (gocelery *GoCelery) StartWorkersWithQueues(queues []string)
- type ResultStatus
- type Task
- type TaskResult
- type Worker
- type WorkerEvent
Constants ¶
const (
// DefaultQueue is the default task queue name
DefaultQueue = "celery"
)
const (
JSON string = "application/json"
)
Constants
Variables ¶
This section is empty.
Functions ¶
func IsWorkerRegistered ¶
IsWorkerRegistered checks if worker exists for the task name
func NewTaskFailedEvent ¶
func NewTaskFailedEvent(task *Task, taskResult *TaskResult, err error) map[string]interface{}
NewTaskFailedEvent creates new event for task failed
func NewTaskReceivedEvent ¶
NewTaskReceivedEvent creates new event for task received
func NewTaskStartedEvent ¶
NewTaskStartedEvent creates new event for task started
func NewTaskSucceedEvent ¶
func NewTaskSucceedEvent(task *Task, taskResult *TaskResult, runtime time.Duration) map[string]interface{}
NewTaskSucceedEvent creates new event for task succeeded
func RegisterWorker ¶
RegisterWorker registers the worker with given task name
func RegisteredWorkers ¶
func RegisteredWorkers() []string
RegisteredWorkers List all registered workers
Types ¶
type Config ¶
type Config struct { // BrokerURL in the format amqp:user@password//<host>/<virtualhost> BrokerURL string // LogLevel: debug, info, warn, error, fatal LogLevel string }
Config stores the configuration information for gocelery
type EventType ¶
type EventType string
EventType is enum of valid event types in celery
const ( None EventType = "None" WorkerOffline EventType = "worker-offline" WorkerHeartbeat EventType = "worker-heartbeat" WorkerOnline EventType = "worker-online" TaskRetried EventType = "task-retried" TaskSucceeded EventType = "task-succeeded" TaskStarted EventType = "task-started" TaskReceived EventType = "task-received" TaskFailed EventType = "task-failed" TaskRevoked EventType = "task-revoked" )
Valid EventTypes
func (EventType) RoutingKey ¶
RoutingKey returns celery routing keys for events
type GoCelery ¶
type GoCelery struct {
// contains filtered or unexported fields
}
GoCelery creates an instance of entry
func (*GoCelery) Close ¶
func (gocelery *GoCelery) Close()
Close disconnects with broker and cleans up all resources used. Use a defer statement to make sure resources are closed
func (*GoCelery) Enqueue ¶
func (gocelery *GoCelery) Enqueue(taskName string, args []interface{}, ignoreResult bool) (chan *TaskResult, error)
Enqueue adds a task to queue to be executed immediately. If ignoreResult is true the function returns immediately with a nil channel returned. Otherwise, a result channel is returned so client can wait for the result.
func (*GoCelery) EnqueueInQueue ¶
func (gocelery *GoCelery) EnqueueInQueue(queueName string, taskName string, args []interface{}, ignoreResult bool) (chan *TaskResult, error)
EnqueueInQueue adds a task to queue to be executed immediately. If ignoreResult is true the function returns immediately with a nil channel returned. Otherwise, a result channel is returned so client can wait for the result.
func (*GoCelery) EnqueueInQueueWithSchedule ¶
func (gocelery *GoCelery) EnqueueInQueueWithSchedule(spec string, queueName string, taskName string, args []interface{}) error
EnqueueInQueueWithSchedule adds a task that is scheduled repeatedly. Schedule is specified in a string with cron format
func (*GoCelery) EnqueueWithSchedule ¶
func (gocelery *GoCelery) EnqueueWithSchedule(spec string, queueName string, taskName string, args []interface{}) error
EnqueueWithSchedule adds a task that is scheduled repeatedly. Schedule is specified in a string with cron format
func (*GoCelery) StartWorkers ¶
func (gocelery *GoCelery) StartWorkers()
StartWorkers start running the workers with default queue
func (*GoCelery) StartWorkersWithQueues ¶
StartWorkersWithQueues start running the workers
type ResultStatus ¶
type ResultStatus string
ResultStatus is the valid statuses for task executions
const ( Pending ResultStatus = "PENDING" Started ResultStatus = "STARTED" Success ResultStatus = "SUCCESS" Retry ResultStatus = "RETRY" Failure ResultStatus = "FAILURE" Revoked ResultStatus = "REVOKED" )
ResultStatus values
type Task ¶
type Task struct { Task string `json:"task"` ID string `json:"id"` Args []interface{} `json:"args,omitempty"` Kwargs map[string]interface{} `json:"kwargs,omitempty"` Retries int `json:"retries,omitempty"` Eta celeryTime `json:"eta,omitempty"` Expires celeryTime `json:"expires,omitempty"` ContentType string `json:"-"` }
Task represents the a single piece of work
type TaskResult ¶
type TaskResult struct { ID string `json:"task_id"` Result interface{} `json:"result"` Status ResultStatus `json:"status"` TraceBack string `json:"traceback"` }
TaskResult is the result wrapper for task
type WorkerEvent ¶
type WorkerEvent struct { Type EventType `json:"type"` Ident string `json:"sw_ident"` Ver string `json:"sw_ver"` Sys string `json:"sw_sys"` HostName string `json:"hostname"` Timestamp int64 `json:"timestamp"` }
WorkerEvent implements the structure for worker related events
func NewWorkerEvent ¶
func NewWorkerEvent(eventType EventType) *WorkerEvent
NewWorkerEvent creates new worker events
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Godeps
|
|
_workspace/src/github.com/apcera/nats
A Go client for the NATS messaging system (https://nats.io).
|
A Go client for the NATS messaging system (https://nats.io). |
_workspace/src/github.com/garyburd/redigo/internal/redistest
Package redistest contains utilities for writing Redigo tests.
|
Package redistest contains utilities for writing Redigo tests. |
_workspace/src/github.com/garyburd/redigo/redis
Package redis is a client for the Redis database.
|
Package redis is a client for the Redis database. |
_workspace/src/github.com/go-errors/errors
Package errors provides errors that have stack-traces.
|
Package errors provides errors that have stack-traces. |
_workspace/src/github.com/hydrogen18/stalecucumber
This package reads and writes pickled data.
|
This package reads and writes pickled data. |
_workspace/src/github.com/robfig/cron
This library implements a cron spec parser and runner.
|
This library implements a cron spec parser and runner. |
_workspace/src/github.com/twinj/uuid
This package provides RFC4122 UUID capabilities.
|
This package provides RFC4122 UUID capabilities. |