gocelery

package module
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 18, 2019 License: MIT Imports: 14 Imported by: 0

README

gocelery

Go Client/Server for Celery Distributed Task Queue

With new features updated contributed by @Danceiny.

GoDoc License motivation

New features compared with gocelery/gocelery (original author)

  • [*] ApplyAsync call just like that in Python (currently supported in go client).
  • TODO: Support More options in go worker.

Notice

  • syscall.Exec task is not allowed (will executed only once and the worker will exit)

Having being involved in a number of projects migrating server from python to go, I have realized Go can help improve performance of existing python web applications. Celery distributed tasks are used heavily in many python web applications and this library allows you to implement celery workers in Go as well as being able to submit celery tasks in Go.

You can also use this library as pure go distributed task queue.

Go Celery Worker in Action

demo

Supported Brokers/Backends

Now supporting both Redis and AMQP!!

  • Redis (broker/backend)
  • AMQP (broker/backend) - does not allow concurrent use of channels

Celery Configuration

Celery must be configured to use json instead of default pickle encoding. This is because Go currently has no stable support for decoding pickle objects. Pass below configuration parameters to use json.

CELERY_TASK_SERIALIZER='json',
CELERY_ACCEPT_CONTENT=['json'],  # Ignore other content
CELERY_RESULT_SERIALIZER='json',
CELERY_ENABLE_UTC=True,

Celery Worker Example

Run Celery Worker implemented in Go

// example/worker/main.go

// Celery Task
func add(a int, b int) int {
	return a + b
}

func main() {
    // create broker and backend
	celeryBroker := gocelery.NewRedisCeleryBroker("localhost:6379", "")
    celeryBackend := gocelery.NewRedisCeleryBackend("localhost:6379", "")

    // use AMQP instead
    // celeryBroker := gocelery.NewAMQPCeleryBroker("amqp://")
    // celeryBackend := gocelery.NewAMQPCeleryBackend("amqp://")

	// Configure with 2 celery workers
	celeryClient, _ := gocelery.NewCeleryClient(celeryBroker, celeryBackend, 2)

	// worker.add name reflects "add" task method found in "worker.py"
	celeryClient.Register("worker.add", add)

    // Start Worker - blocking method
	go celeryClient.StartWorker()

    // Wait 30 seconds and stop all workers
	time.Sleep(30 * time.Second)
	celeryClient.StopWorker()
}
go run example/worker/main.go

You can use custom struct instead to hold shared structures.


type MyStruct struct {
	MyInt int
}

func (so *MyStruct) add(a int, b int) int {
	return a + b + so.MyInt
}

// code omitted ...

ms := &MyStruct{10}
celeryClient.Register("worker.add", ms.add)

// code omitted ...

Submit Task from Python Client

# example/test.py

from celery import Celery

app = Celery('tasks',
    broker='redis://localhost:6379',
    backend='redis://localhost:6379'
)

@app.task
def add(x, y):
    return x + y

if __name__ == '__main__':
    # submit celery task to be executed in Go workers
    ar = add.apply_async((5456, 2878), serializer='json')
    print(ar.get())
python example/test.py

Celery Client Example

Run Celery Worker implemented in Python

# example/worker.py

from celery import Celery

app = Celery('tasks',
    broker='redis://localhost:6379',
    backend='redis://localhost:6379'
)

@app.task
def add(x, y):
    return x + y
cd example
celery -A worker worker --loglevel=debug --without-heartbeat --without-mingle

Submit Task from Go Client

func main() {
    // create broker and backend
	celeryBroker := gocelery.NewRedisCeleryBroker("localhost:6379", "")
    celeryBackend := gocelery.NewRedisCeleryBackend("localhost:6379", "")

    // use AMQP instead
    // celeryBroker := gocelery.NewAMQPCeleryBroker("amqp://")
    // celeryBackend := gocelery.NewAMQPCeleryBackend("amqp://")

    // create client
	celeryClient, _ := gocelery.NewCeleryClient(celeryBroker, celeryBackend, 0)

    // send task
	asyncResult, err := celeryClient.Delay("worker.add", 3, 5)
	if err != nil {
		panic(err)
	}

    // check if result is ready
	isReady, _ := asyncResult.Ready()
	fmt.Printf("ready status %v\n", isReady)

    // get result with 5s timeout
	res, err = asyncResult.Get(5 * time.Second)
	if err != nil {
		fmt.Println(err)
	} else {
        fmt.Println(res)
    }
}
go run example/client/main.go

Sample Celery Task Message (Protocol 2)

// 参考:http://docs.celeryproject.org/en/latest/internals/protocol.html#definition
// example:
/*
{
	"body": "W1tdLCB7InkiOiAyODc4LCAieCI6IDU0NTZ9LCB7ImNob3JkIjogbnVsbCwgImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IG51bGx9XQ==",
	"headers": {
		"origin": "gen66194@DanceinydeMacBook-Pro.local",
		"root_id": "25abb5e6-d8c3-4b20-8dfb-7dc1be9ecf8f",
		"expires": null,
		"shadow": null,
		"id": "25abb5e6-d8c3-4b20-8dfb-7dc1be9ecf8f",
		"kwargsrepr": "{'y': 2878, 'x': 5456}",
		"lang": "py",
		"retries": 0,
		"task": "worker.add_reflect",
		"group": null,
		"timelimit": [null, null],
		"parent_id": null,
		"argsrepr": "()",
		"eta": null
	},
	"properties": {
		"priority": 0,
		"body_encoding": "base64",
		"correlation_id": "25abb5e6-d8c3-4b20-8dfb-7dc1be9ecf8f",
		"reply_to": "2f6f7ea8-dcc3-30a7-ae0c-4eb03ae4910c",
		"delivery_info": {
			"routing_key": "celery",
			"exchange": ""
		},
		"delivery_mode": 2,
		"delivery_tag": "a18604c0-5422-4592-877b-72e106744981"
	},
	"content-type": "application/json",
	"content-encoding": "utf-8"
}

Contributing

You are more than welcome to make any contributions. Please create Pull Request for any changes.

LICENSE

The gocelery is offered under MIT license.

Documentation

Overview

Package gocelery is Celery Distributed Itf_CeleryTask Queue in Go

Celery distributed tasks are used heavily in many python web applications and this library allows you to implement celery workers in Go as well as being able to submit celery tasks in Go.

This package can also be used as pure go distributed task queue.

Supported brokers/backends

  • Redis (broker/backend)
  • AMQP (broker/backend)

Celery must be configured to use json instead of default pickle encoding. This is because Go currently has no stable support for decoding pickle objects. Pass below configuration parameters to use json.

CELERY_TASK_SERIALIZER='json'
CELERY_ACCEPT_CONTENT=['json']  # Ignore other content
CELERY_RESULT_SERIALIZER='json'
CELERY_ENABLE_UTC=True

Actually, the newest version (Right now, 2018.11, is v4.2.1) of Celery is configured defaultly to use json serializer.

Index

Constants

This section is empty.

Variables

View Source
var (
	ISO8601 = "2006-01-02T15:04:05"
)

GLOBAL 替换掉 encoding/json

Functions

func GetRealValue

func GetRealValue(val *reflect.Value) interface{}

GetRealValue returns real value of reflect.Value Required for JSON Marshalling

func NewAMQPConnection

func NewAMQPConnection(host string) (*amqp.Connection, *amqp.Channel)

NewAMQPConnection creates new AMQP channel

func NewRedisPool

func NewRedisPool(host string, port int, db int, pass string) *redis.Pool

NewRedisPool creates pool of redis connections

Types

type AMQPCeleryBackend

type AMQPCeleryBackend struct {
	*amqp.Channel
	// contains filtered or unexported fields
}

AMQPCeleryBackend CeleryBackend for AMQP

func NewAMQPCeleryBackend

func NewAMQPCeleryBackend(host string) *AMQPCeleryBackend

NewAMQPCeleryBackend creates new AMQPCeleryBackend

func (*AMQPCeleryBackend) GetResult

func (b *AMQPCeleryBackend) GetResult(taskID string) (*ResultMessage, error)

GetResult retrieves result from AMQP queue

func (*AMQPCeleryBackend) Reconnect

func (b *AMQPCeleryBackend) Reconnect()

Reconnect reconnects to AMQP server

func (*AMQPCeleryBackend) SetResult

func (b *AMQPCeleryBackend) SetResult(taskID string, result *ResultMessage) error

SetResult sets result back to AMQP queue

type AMQPCeleryBroker

type AMQPCeleryBroker struct {
	*amqp.Channel
	// contains filtered or unexported fields
}

AMQPCeleryBroker is RedisBroker for AMQP

func NewAMQPCeleryBroker

func NewAMQPCeleryBroker(host string) *AMQPCeleryBroker

NewAMQPCeleryBroker creates new AMQPCeleryBroker

func (*AMQPCeleryBroker) CreateExchange

func (b *AMQPCeleryBroker) CreateExchange() error

CreateExchange declares AMQP exchange with stored configuration

func (*AMQPCeleryBroker) CreateQueue

func (b *AMQPCeleryBroker) CreateQueue() error

CreateQueue declares AMQP Queue with stored configuration

func (*AMQPCeleryBroker) GetTask

func (b *AMQPCeleryBroker) GetTask() (*CeleryTask, error)

GetTask retrieves task message from AMQP queue

func (*AMQPCeleryBroker) SendCeleryMessage

func (b *AMQPCeleryBroker) SendCeleryMessage(message *CeleryMessage) error

SendCeleryMessage sends CeleryMessage to broker

func (*AMQPCeleryBroker) StartConsumingChannel

func (b *AMQPCeleryBroker) StartConsumingChannel() error

StartConsumingChannel spawns receiving channel on AMQP queue

type AMQPExchange

type AMQPExchange struct {
	Name       string
	Type       string
	Durable    bool
	AutoDelete bool
}

AMQPExchange stores AMQP Exchange configuration

func NewAMQPExchange

func NewAMQPExchange(name string) *AMQPExchange

NewAMQPExchange creates new AMQPExchange

type AMQPQueue

type AMQPQueue struct {
	Name       string
	Durable    bool
	AutoDelete bool
}

AMQPQueue stores AMQP Queue configuration

func NewAMQPQueue

func NewAMQPQueue(name string) *AMQPQueue

NewAMQPQueue creates new AMQPQueue

type AsyncResult

type AsyncResult struct {
	// contains filtered or unexported fields
}

AsyncResult is pending result

func (*AsyncResult) AsyncGet

func (ar *AsyncResult) AsyncGet() (interface{}, error)

AsyncGet gets actual result from redis and returns nil if not available

func (*AsyncResult) Get

func (ar *AsyncResult) Get(timeout time.Duration) (interface{}, error)

Get gets actual result from redis It blocks for period of time set by timeout and return error if unavailable

func (*AsyncResult) GetTaskId

func (ar *AsyncResult) GetTaskId() string

func (*AsyncResult) Ready

func (ar *AsyncResult) Ready() (bool, error)

Ready checks if actual result is ready

type BrokerOptions

type BrokerOptions struct {
	// contains filtered or unexported fields
}

func BrokerQueueName

func BrokerQueueName(queueName string) BrokerOptions

type CeleryBackend

type CeleryBackend interface {
	GetResult(string) (*ResultMessage, error) // must be non-blocking
	SetResult(taskID string, result *ResultMessage) error
}

CeleryBackend is interface for celery backend database

type CeleryBroker

type CeleryBroker interface {
	SendCeleryMessage(*CeleryMessage) error
	GetTask() (*CeleryTask, error) // must be non-blocking
}

CeleryBroker is interface for celery broker database

type CeleryClient

type CeleryClient struct {
	// contains filtered or unexported fields
}

func NewCeleryClient

func NewCeleryClient(broker CeleryBroker, backend CeleryBackend) (*CeleryClient, error)

func (*CeleryClient) ApplyAsync

func (cc *CeleryClient) ApplyAsync(task string, args []interface{}, kwargs map[string]interface{},
	expires *time.Time, eta *time.Time, retry bool, queue string,
	priority int, routingKey string, exchange string) (*AsyncResult, error)

func (*CeleryClient) Delay

func (cc *CeleryClient) Delay(task string, args ...interface{}) (*AsyncResult, error)

Delay gets asynchronous result

func (*CeleryClient) DelayKwargs

func (cc *CeleryClient) DelayKwargs(task string, args map[string]interface{}) (*AsyncResult, error)

DelayKwargs gets asynchronous results with argument map

type CeleryDeliveryInfo

type CeleryDeliveryInfo struct {
	RoutingKey string `json:"routing_key"`
	Exchange   string `json:"exchange"`
}

CeleryDeliveryInfo represents deliveryinfo json

func NewCeleryDeliveryInfo

func NewCeleryDeliveryInfo(routingKey string, exchange string) *CeleryDeliveryInfo

type CeleryMessage

type CeleryMessage struct {
	// body是语言相关的,比如可以使用thrift
	//     object[] args,
	//    Mapping kwargs,
	//    Mapping embed {
	//        'callbacks': Signature[] callbacks,
	//        'errbacks': Signature[] errbacks,
	//        'chain': Signature[] chain,
	//        'chord': Signature chord_callback,
	//    }
	Body            string        `json:"body"`
	Headers         ST_Headers    `json:"headers"`
	Properties      ST_Properties `json:"properties"`
	ContentType     string        `json:"content-type"`
	ContentEncoding string        `json:"content-encoding"`
}

CeleryMessage is actual message to be sent to Redis 参考:http://docs.celeryproject.org/en/latest/internals/protocol.html#definition example:

{
	"body": "W1tdLCB7InkiOiAyODc4LCAieCI6IDU0NTZ9LCB7ImNob3JkIjogbnVsbCwgImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IG51bGx9XQ==",
	"headers": {
		"origin": "gen66194@DanceinydeMacBook-Pro.local",
		"root_id": "25abb5e6-d8c3-4b20-8dfb-7dc1be9ecf8f",
		"expires": null,
		"shadow": null,
		"id": "25abb5e6-d8c3-4b20-8dfb-7dc1be9ecf8f",
		"kwargsrepr": "{'y': 2878, 'x': 5456}",
		"lang": "py",
		"retries": 0,
		"task": "worker.add_reflect",
		"group": null,
		"timelimit": [null, null],
		"parent_id": null,
		"argsrepr": "()",
		"eta": null
	},
	"properties": {
		"priority": 0,
		"body_encoding": "base64",
		"correlation_id": "25abb5e6-d8c3-4b20-8dfb-7dc1be9ecf8f",
		"reply_to": "2f6f7ea8-dcc3-30a7-ae0c-4eb03ae4910c",
		"delivery_info": {
			"routing_key": "celery",
			"exchange": ""
		},
		"delivery_mode": 2,
		"delivery_tag": "a18604c0-5422-4592-877b-72e106744981"
	},
	"content-type": "application/json",
	"content-encoding": "utf-8"
}

func Task2Msg

func Task2Msg(task *CeleryTask) *CeleryMessage

* CeleryTask -> CeleryMessage

type CeleryServer

type CeleryServer struct {
	// contains filtered or unexported fields
}

CeleryClient provides API for sending celery tasks

func NewCeleryServer

func NewCeleryServer(broker CeleryBroker, backend CeleryBackend, numWorkers int) (*CeleryServer, error)

NewCeleryClient creates new celery client

func (*CeleryServer) Register

func (cc *CeleryServer) Register(name string, task interface{})

Register task

func (*CeleryServer) StartWorker

func (cc *CeleryServer) StartWorker()

StartWorker starts celery workers infinite loop

func (*CeleryServer) StopWorker

func (cc *CeleryServer) StopWorker()

StopWorker stops celery workers

type CeleryTask

type CeleryTask struct {
	Id      string                 `json:"id"`
	Task    string                 `json:"task"`
	Args    []interface{}          `json:"args"`   // argsrepr
	Kwargs  map[string]interface{} `json:"kwargs"` // kwargsrepr
	Retries int                    `json:"retries"`
	// Protocol 2: ISO8601,格式:"2006-01-02T15:04:05", 与RFC3339: "2006-01-02T15:04:05Z07:00"不同
	ETA      time.Time              `json:"eta" time_format:"2006-01-02T15:04:05"`
	Expires  time.Time              `json:"expires" time_format:"2006-01-02T15:04:05"`
	Priority int                    `json:"priority"`
	Embed    map[string]interface{} `json:"embed"`
}

按照Celery现有的python实现,不是将CeleryTask直接进行json序列化

func Msg2Task

func Msg2Task(msg *CeleryMessage) *CeleryTask

func (*CeleryTask) EncodeBody

func (tm *CeleryTask) EncodeBody() string

EncodeBody returns base64 json encoded string

type CeleryWorker

type CeleryWorker struct {
	// contains filtered or unexported fields
}

CeleryWorker represents distributed task worker

func NewCeleryWorker

func NewCeleryWorker(broker CeleryBroker, backend CeleryBackend, numWorkers int) *CeleryWorker

NewCeleryWorker returns new celery worker

func (*CeleryWorker) GetNumWorkers

func (w *CeleryWorker) GetNumWorkers() int

GetNumWorkers returns number of currently running workers

func (*CeleryWorker) GetTask

func (w *CeleryWorker) GetTask(name string) interface{}

GetTask retrieves registered task

func (*CeleryWorker) Register

func (w *CeleryWorker) Register(name string, task interface{})

Register registers tasks (functions)

func (*CeleryWorker) RunTask

func (w *CeleryWorker) RunTask(message *CeleryTask) (*ResultMessage, error)

RunTask runs celery task

func (*CeleryWorker) StartWorker

func (w *CeleryWorker) StartWorker()

StartWorker starts celery worker

func (*CeleryWorker) StopWorker

func (w *CeleryWorker) StopWorker()

StopWorker stops celery workers

type Itf_CeleryTask

type Itf_CeleryTask interface {
	// ParseKwargs - define a method to parse kwargs
	ParseKwargs(map[string]interface{}) error

	// RunTask - define a method to run
	RunTask() (interface{}, error)
}

Itf_CeleryTask is an interface that represents actual task Passing Itf_CeleryTask interface instead of function pointer avoids reflection and may have performance gain. ResultMessage must be obtained using GetResultMessage()

type PythonBody

type PythonBody struct {
	Args   []interface{}
	Kwargs map[string]interface{}
	Embed  map[string]interface{}
}

func DecodeBody

func DecodeBody(encodedBody string) *PythonBody

DecodeTaskMessage decodes base64 encrypted body

func (*PythonBody) UnmarshalJSON

func (body *PythonBody) UnmarshalJSON(buf []byte) error

http://eagain.net/articles/go-json-array-to-struct/

type RedisCeleryBackend

type RedisCeleryBackend struct {
	*redis.Pool
}

RedisCeleryBackend is CeleryBackend for Redis

func NewRedisCeleryBackend

func NewRedisCeleryBackend(host string, port int, db int, pass string) *RedisCeleryBackend

Support Broker Options: https://github.com/gocelery/gocelery/pull/31

func (*RedisCeleryBackend) GetResult

func (cb *RedisCeleryBackend) GetResult(taskID string) (*ResultMessage, error)

GetResult calls API to get asynchronous result Should be called by AsyncResult

func (*RedisCeleryBackend) SetResult

func (cb *RedisCeleryBackend) SetResult(taskID string, result *ResultMessage) error

SetResult pushes result back into backend

type RedisCeleryBroker

type RedisCeleryBroker struct {
	*redis.Pool
	QueueName string
	// contains filtered or unexported fields
}

RedisCeleryBroker is CeleryBroker for Redis

func NewRedisCeleryBroker

func NewRedisCeleryBroker(host string, port int, db int, pass string, options ...BrokerOptions) *RedisCeleryBroker

func (*RedisCeleryBroker) GetCeleryMessage

func (cb *RedisCeleryBroker) GetCeleryMessage() (*CeleryMessage, error)

GetCeleryMessage retrieves celery message from redis queue

func (*RedisCeleryBroker) GetTask

func (cb *RedisCeleryBroker) GetTask() (*CeleryTask, error)

GetTask retrieves task message from redis queue

func (*RedisCeleryBroker) SendCeleryMessage

func (cb *RedisCeleryBroker) SendCeleryMessage(message *CeleryMessage) error

SendCeleryMessage sends CeleryMessage to redis queue

type ResultMessage

type ResultMessage struct {
	ID        string        `json:"task_id"`
	Status    string        `json:"status"`
	Traceback interface{}   `json:"traceback"`
	Result    interface{}   `json:"result"`
	Children  []interface{} `json:"children"`
}

ResultMessage is return message received from broker

type ST_Headers

type ST_Headers struct {
	Lang       string    `json:"lang"`
	Task       string    `json:"task"`      // task name
	TaskId     string    `json:"id"`        // uuid
	RootId     string    `json:"root_id"`   // uuid
	ParentId   string    `json:"parent_id"` // uuid
	Group      string    `json:"group"`     // uuid group_id
	Retries    int       `json:"retries"`
	ETA        time.Time `json:"eta" time_format:"2006-01-02T15:04:05"`
	Expires    time.Time `json:"expires" time_format:"2006-01-02T15:04:05"`
	Origin     string    `json:"origin"` // optional
	Shadow     string    `json:"shadow"` // alias_name, optional
	ArgsRepr   string    `json:"argsrepr"`
	KwargsRepr string    `json:"kwargsrepr"`
	TimeLimit  [2]string `json:"timelimit"`
}

真实json示例:

"headers": {
	"origin": "gen66194@DanceinydeMacBook-Pro.local",
	"root_id": "25abb5e6-d8c3-4b20-8dfb-7dc1be9ecf8f",
	"expires": null,
	"shadow": null,
	"id": "25abb5e6-d8c3-4b20-8dfb-7dc1be9ecf8f",
	"kwargsrepr": "{'y': 2878, 'x': 5456}",
	"lang": "py",
	"retries": 0,
	"task": "worker.add_reflect",
	"group": null,
	"timelimit": [null, null],
	"parent_id": null,
	"argsrepr": "()",
	"eta": null
},

type ST_Properties

type ST_Properties struct {
	// ContentEncoding string `json:"content_encoding"` // 事实上该字段移动到与properties并列的层级了
	// ContentType     string `json:"content_type"`     // 事实上该字段移动到与properties并列的层级了
	CorrelationID string `json:"correlation_id"`
	ReplyTo       string `json:"replay_to"`
	// 下面的在Celery文档中未曾提及
	BodyEncoding  string             `json:"body_encoding"`
	Priority      int                `json:"priority"`
	DeliveryInfo  CeleryDeliveryInfo `json:"delivery_info"`
	DeliveryMode  int                `json:"delivery_mode"`
	DeliveryTag   string             `json:"delivery_tag"`
	CorrelationId string             `json:"correlation_id"`
}
"properties": {
	"priority": 0,
	"body_encoding": "base64",
	"correlation_id": "25abb5e6-d8c3-4b20-8dfb-7dc1be9ecf8f",
	"reply_to": "2f6f7ea8-dcc3-30a7-ae0c-4eb03ae4910c",
	"delivery_info": {
		"routing_key": "celery",
		"exchange": ""
	},
	"delivery_mode": 2,
	"delivery_tag": "a18604c0-5422-4592-877b-72e106744981"
},

Directories

Path Synopsis
example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL