Documentation ¶
Overview ¶
Package gocelery is Celery Distributed Itf_CeleryTask Queue in Go
Celery distributed tasks are used heavily in many python web applications and this library allows you to implement celery workers in Go as well as being able to submit celery tasks in Go.
This package can also be used as pure go distributed task queue.
Supported brokers/backends
- Redis (broker/backend)
- AMQP (broker/backend)
Celery must be configured to use json instead of default pickle encoding. This is because Go currently has no stable support for decoding pickle objects. Pass below configuration parameters to use json.
CELERY_TASK_SERIALIZER='json' CELERY_ACCEPT_CONTENT=['json'] # Ignore other content CELERY_RESULT_SERIALIZER='json' CELERY_ENABLE_UTC=True
Actually, the newest version (Right now, 2018.11, is v4.2.1) of Celery is configured defaultly to use json serializer.
Index ¶
- Variables
- func GetRealValue(val *reflect.Value) interface{}
- func NewAMQPConnection(host string) (*amqp.Connection, *amqp.Channel)
- func NewRedisPool(host string, port int, db int, pass string) *redis.Pool
- type AMQPCeleryBackend
- type AMQPCeleryBroker
- type AMQPExchange
- type AMQPQueue
- type AsyncResult
- type BrokerOptions
- type CeleryBackend
- type CeleryBroker
- type CeleryClient
- func (cc *CeleryClient) ApplyAsync(task string, args []interface{}, kwargs map[string]interface{}, ...) (*AsyncResult, error)
- func (cc *CeleryClient) Delay(task string, args ...interface{}) (*AsyncResult, error)
- func (cc *CeleryClient) DelayKwargs(task string, args map[string]interface{}) (*AsyncResult, error)
- type CeleryDeliveryInfo
- type CeleryMessage
- type CeleryServer
- type CeleryTask
- type CeleryWorker
- func (w *CeleryWorker) GetNumWorkers() int
- func (w *CeleryWorker) GetTask(name string) interface{}
- func (w *CeleryWorker) Register(name string, task interface{})
- func (w *CeleryWorker) RunTask(message *CeleryTask) (*ResultMessage, error)
- func (w *CeleryWorker) StartWorker()
- func (w *CeleryWorker) StopWorker()
- type Itf_CeleryTask
- type PythonBody
- type RedisCeleryBackend
- type RedisCeleryBroker
- type ResultMessage
- type ST_Headers
- type ST_Properties
Constants ¶
This section is empty.
Variables ¶
var (
ISO8601 = "2006-01-02T15:04:05"
)
GLOBAL 替换掉 encoding/json
Functions ¶
func GetRealValue ¶
GetRealValue returns real value of reflect.Value Required for JSON Marshalling
func NewAMQPConnection ¶
func NewAMQPConnection(host string) (*amqp.Connection, *amqp.Channel)
NewAMQPConnection creates new AMQP channel
Types ¶
type AMQPCeleryBackend ¶
AMQPCeleryBackend CeleryBackend for AMQP
func NewAMQPCeleryBackend ¶
func NewAMQPCeleryBackend(host string) *AMQPCeleryBackend
NewAMQPCeleryBackend creates new AMQPCeleryBackend
func (*AMQPCeleryBackend) GetResult ¶
func (b *AMQPCeleryBackend) GetResult(taskID string) (*ResultMessage, error)
GetResult retrieves result from AMQP queue
func (*AMQPCeleryBackend) Reconnect ¶
func (b *AMQPCeleryBackend) Reconnect()
Reconnect reconnects to AMQP server
func (*AMQPCeleryBackend) SetResult ¶
func (b *AMQPCeleryBackend) SetResult(taskID string, result *ResultMessage) error
SetResult sets result back to AMQP queue
type AMQPCeleryBroker ¶
AMQPCeleryBroker is RedisBroker for AMQP
func NewAMQPCeleryBroker ¶
func NewAMQPCeleryBroker(host string) *AMQPCeleryBroker
NewAMQPCeleryBroker creates new AMQPCeleryBroker
func (*AMQPCeleryBroker) CreateExchange ¶
func (b *AMQPCeleryBroker) CreateExchange() error
CreateExchange declares AMQP exchange with stored configuration
func (*AMQPCeleryBroker) CreateQueue ¶
func (b *AMQPCeleryBroker) CreateQueue() error
CreateQueue declares AMQP Queue with stored configuration
func (*AMQPCeleryBroker) GetTask ¶
func (b *AMQPCeleryBroker) GetTask() (*CeleryTask, error)
GetTask retrieves task message from AMQP queue
func (*AMQPCeleryBroker) SendCeleryMessage ¶
func (b *AMQPCeleryBroker) SendCeleryMessage(message *CeleryMessage) error
SendCeleryMessage sends CeleryMessage to broker
func (*AMQPCeleryBroker) StartConsumingChannel ¶
func (b *AMQPCeleryBroker) StartConsumingChannel() error
StartConsumingChannel spawns receiving channel on AMQP queue
type AMQPExchange ¶
AMQPExchange stores AMQP Exchange configuration
func NewAMQPExchange ¶
func NewAMQPExchange(name string) *AMQPExchange
NewAMQPExchange creates new AMQPExchange
type AsyncResult ¶
type AsyncResult struct {
// contains filtered or unexported fields
}
AsyncResult is pending result
func (*AsyncResult) AsyncGet ¶
func (ar *AsyncResult) AsyncGet() (interface{}, error)
AsyncGet gets actual result from redis and returns nil if not available
func (*AsyncResult) Get ¶
func (ar *AsyncResult) Get(timeout time.Duration) (interface{}, error)
Get gets actual result from redis It blocks for period of time set by timeout and return error if unavailable
func (*AsyncResult) GetTaskId ¶
func (ar *AsyncResult) GetTaskId() string
func (*AsyncResult) Ready ¶
func (ar *AsyncResult) Ready() (bool, error)
Ready checks if actual result is ready
type BrokerOptions ¶
type BrokerOptions struct {
// contains filtered or unexported fields
}
func BrokerQueueName ¶
func BrokerQueueName(queueName string) BrokerOptions
type CeleryBackend ¶
type CeleryBackend interface { GetResult(string) (*ResultMessage, error) // must be non-blocking SetResult(taskID string, result *ResultMessage) error }
CeleryBackend is interface for celery backend database
type CeleryBroker ¶
type CeleryBroker interface { SendCeleryMessage(*CeleryMessage) error GetTask() (*CeleryTask, error) // must be non-blocking }
CeleryBroker is interface for celery broker database
type CeleryClient ¶
type CeleryClient struct {
// contains filtered or unexported fields
}
func NewCeleryClient ¶
func NewCeleryClient(broker CeleryBroker, backend CeleryBackend) (*CeleryClient, error)
func (*CeleryClient) ApplyAsync ¶
func (*CeleryClient) Delay ¶
func (cc *CeleryClient) Delay(task string, args ...interface{}) (*AsyncResult, error)
Delay gets asynchronous result
func (*CeleryClient) DelayKwargs ¶
func (cc *CeleryClient) DelayKwargs(task string, args map[string]interface{}) (*AsyncResult, error)
DelayKwargs gets asynchronous results with argument map
type CeleryDeliveryInfo ¶
type CeleryDeliveryInfo struct { RoutingKey string `json:"routing_key"` Exchange string `json:"exchange"` }
CeleryDeliveryInfo represents deliveryinfo json
func NewCeleryDeliveryInfo ¶
func NewCeleryDeliveryInfo(routingKey string, exchange string) *CeleryDeliveryInfo
type CeleryMessage ¶
type CeleryMessage struct { // body是语言相关的,比如可以使用thrift // object[] args, // Mapping kwargs, // Mapping embed { // 'callbacks': Signature[] callbacks, // 'errbacks': Signature[] errbacks, // 'chain': Signature[] chain, // 'chord': Signature chord_callback, // } Body string `json:"body"` Headers ST_Headers `json:"headers"` Properties ST_Properties `json:"properties"` ContentType string `json:"content-type"` ContentEncoding string `json:"content-encoding"` }
CeleryMessage is actual message to be sent to Redis 参考:http://docs.celeryproject.org/en/latest/internals/protocol.html#definition example:
{ "body": "W1tdLCB7InkiOiAyODc4LCAieCI6IDU0NTZ9LCB7ImNob3JkIjogbnVsbCwgImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IG51bGx9XQ==", "headers": { "origin": "gen66194@DanceinydeMacBook-Pro.local", "root_id": "25abb5e6-d8c3-4b20-8dfb-7dc1be9ecf8f", "expires": null, "shadow": null, "id": "25abb5e6-d8c3-4b20-8dfb-7dc1be9ecf8f", "kwargsrepr": "{'y': 2878, 'x': 5456}", "lang": "py", "retries": 0, "task": "worker.add_reflect", "group": null, "timelimit": [null, null], "parent_id": null, "argsrepr": "()", "eta": null }, "properties": { "priority": 0, "body_encoding": "base64", "correlation_id": "25abb5e6-d8c3-4b20-8dfb-7dc1be9ecf8f", "reply_to": "2f6f7ea8-dcc3-30a7-ae0c-4eb03ae4910c", "delivery_info": { "routing_key": "celery", "exchange": "" }, "delivery_mode": 2, "delivery_tag": "a18604c0-5422-4592-877b-72e106744981" }, "content-type": "application/json", "content-encoding": "utf-8" }
type CeleryServer ¶
type CeleryServer struct {
// contains filtered or unexported fields
}
CeleryClient provides API for sending celery tasks
func NewCeleryServer ¶
func NewCeleryServer(broker CeleryBroker, backend CeleryBackend, numWorkers int) (*CeleryServer, error)
NewCeleryClient creates new celery client
func (*CeleryServer) Register ¶
func (cc *CeleryServer) Register(name string, task interface{})
Register task
func (*CeleryServer) StartWorker ¶
func (cc *CeleryServer) StartWorker()
StartWorker starts celery workers infinite loop
func (*CeleryServer) StopWorker ¶
func (cc *CeleryServer) StopWorker()
StopWorker stops celery workers
type CeleryTask ¶
type CeleryTask struct { Id string `json:"id"` Task string `json:"task"` Args []interface{} `json:"args"` // argsrepr Kwargs map[string]interface{} `json:"kwargs"` // kwargsrepr Retries int `json:"retries"` // Protocol 2: ISO8601,格式:"2006-01-02T15:04:05", 与RFC3339: "2006-01-02T15:04:05Z07:00"不同 ETA time.Time `json:"eta" time_format:"2006-01-02T15:04:05"` Expires time.Time `json:"expires" time_format:"2006-01-02T15:04:05"` Priority int `json:"priority"` Embed map[string]interface{} `json:"embed"` }
按照Celery现有的python实现,不是将CeleryTask直接进行json序列化
func Msg2Task ¶
func Msg2Task(msg *CeleryMessage) *CeleryTask
func (*CeleryTask) EncodeBody ¶
func (tm *CeleryTask) EncodeBody() string
EncodeBody returns base64 json encoded string
type CeleryWorker ¶
type CeleryWorker struct {
// contains filtered or unexported fields
}
CeleryWorker represents distributed task worker
func NewCeleryWorker ¶
func NewCeleryWorker(broker CeleryBroker, backend CeleryBackend, numWorkers int) *CeleryWorker
NewCeleryWorker returns new celery worker
func (*CeleryWorker) GetNumWorkers ¶
func (w *CeleryWorker) GetNumWorkers() int
GetNumWorkers returns number of currently running workers
func (*CeleryWorker) GetTask ¶
func (w *CeleryWorker) GetTask(name string) interface{}
GetTask retrieves registered task
func (*CeleryWorker) Register ¶
func (w *CeleryWorker) Register(name string, task interface{})
Register registers tasks (functions)
func (*CeleryWorker) RunTask ¶
func (w *CeleryWorker) RunTask(message *CeleryTask) (*ResultMessage, error)
RunTask runs celery task
func (*CeleryWorker) StartWorker ¶
func (w *CeleryWorker) StartWorker()
StartWorker starts celery worker
func (*CeleryWorker) StopWorker ¶
func (w *CeleryWorker) StopWorker()
StopWorker stops celery workers
type Itf_CeleryTask ¶
type Itf_CeleryTask interface { // ParseKwargs - define a method to parse kwargs ParseKwargs(map[string]interface{}) error // RunTask - define a method to run RunTask() (interface{}, error) }
Itf_CeleryTask is an interface that represents actual task Passing Itf_CeleryTask interface instead of function pointer avoids reflection and may have performance gain. ResultMessage must be obtained using GetResultMessage()
type PythonBody ¶
type PythonBody struct { Args []interface{} Kwargs map[string]interface{} Embed map[string]interface{} }
func DecodeBody ¶
func DecodeBody(encodedBody string) *PythonBody
DecodeTaskMessage decodes base64 encrypted body
func (*PythonBody) UnmarshalJSON ¶
func (body *PythonBody) UnmarshalJSON(buf []byte) error
type RedisCeleryBackend ¶
RedisCeleryBackend is CeleryBackend for Redis
func NewRedisCeleryBackend ¶
func NewRedisCeleryBackend(host string, port int, db int, pass string) *RedisCeleryBackend
Support Broker Options: https://github.com/gocelery/gocelery/pull/31
func (*RedisCeleryBackend) GetResult ¶
func (cb *RedisCeleryBackend) GetResult(taskID string) (*ResultMessage, error)
GetResult calls API to get asynchronous result Should be called by AsyncResult
func (*RedisCeleryBackend) SetResult ¶
func (cb *RedisCeleryBackend) SetResult(taskID string, result *ResultMessage) error
SetResult pushes result back into backend
type RedisCeleryBroker ¶
type RedisCeleryBroker struct { *redis.Pool QueueName string // contains filtered or unexported fields }
RedisCeleryBroker is CeleryBroker for Redis
func NewRedisCeleryBroker ¶
func NewRedisCeleryBroker(host string, port int, db int, pass string, options ...BrokerOptions) *RedisCeleryBroker
func (*RedisCeleryBroker) GetCeleryMessage ¶
func (cb *RedisCeleryBroker) GetCeleryMessage() (*CeleryMessage, error)
GetCeleryMessage retrieves celery message from redis queue
func (*RedisCeleryBroker) GetTask ¶
func (cb *RedisCeleryBroker) GetTask() (*CeleryTask, error)
GetTask retrieves task message from redis queue
func (*RedisCeleryBroker) SendCeleryMessage ¶
func (cb *RedisCeleryBroker) SendCeleryMessage(message *CeleryMessage) error
SendCeleryMessage sends CeleryMessage to redis queue
type ResultMessage ¶
type ResultMessage struct { ID string `json:"task_id"` Status string `json:"status"` Traceback interface{} `json:"traceback"` Result interface{} `json:"result"` Children []interface{} `json:"children"` }
ResultMessage is return message received from broker
type ST_Headers ¶
type ST_Headers struct { Lang string `json:"lang"` Task string `json:"task"` // task name TaskId string `json:"id"` // uuid RootId string `json:"root_id"` // uuid ParentId string `json:"parent_id"` // uuid Group string `json:"group"` // uuid group_id Retries int `json:"retries"` ETA time.Time `json:"eta" time_format:"2006-01-02T15:04:05"` Expires time.Time `json:"expires" time_format:"2006-01-02T15:04:05"` Origin string `json:"origin"` // optional Shadow string `json:"shadow"` // alias_name, optional ArgsRepr string `json:"argsrepr"` KwargsRepr string `json:"kwargsrepr"` TimeLimit [2]string `json:"timelimit"` }
真实json示例:
"headers": { "origin": "gen66194@DanceinydeMacBook-Pro.local", "root_id": "25abb5e6-d8c3-4b20-8dfb-7dc1be9ecf8f", "expires": null, "shadow": null, "id": "25abb5e6-d8c3-4b20-8dfb-7dc1be9ecf8f", "kwargsrepr": "{'y': 2878, 'x': 5456}", "lang": "py", "retries": 0, "task": "worker.add_reflect", "group": null, "timelimit": [null, null], "parent_id": null, "argsrepr": "()", "eta": null },
type ST_Properties ¶
type ST_Properties struct { // ContentEncoding string `json:"content_encoding"` // 事实上该字段移动到与properties并列的层级了 // ContentType string `json:"content_type"` // 事实上该字段移动到与properties并列的层级了 CorrelationID string `json:"correlation_id"` ReplyTo string `json:"replay_to"` // 下面的在Celery文档中未曾提及 BodyEncoding string `json:"body_encoding"` Priority int `json:"priority"` DeliveryInfo CeleryDeliveryInfo `json:"delivery_info"` DeliveryMode int `json:"delivery_mode"` DeliveryTag string `json:"delivery_tag"` CorrelationId string `json:"correlation_id"` }
"properties": { "priority": 0, "body_encoding": "base64", "correlation_id": "25abb5e6-d8c3-4b20-8dfb-7dc1be9ecf8f", "reply_to": "2f6f7ea8-dcc3-30a7-ae0c-4eb03ae4910c", "delivery_info": { "routing_key": "celery", "exchange": "" }, "delivery_mode": 2, "delivery_tag": "a18604c0-5422-4592-877b-72e106744981" },