celery

Version v1.0.0 (latest)

Warning: This package is not in the latest version of its module.
Published: Mar 18, 2023 License: MIT Imports: 11 Imported by: 6

README

Gopher Celery 🥬


The objective of this project is to provide a very basic mechanism to efficiently produce and consume Celery tasks on the Go side. Therefore there are no plans to support all the rich features the Python version provides, such as task chains. Even a task result backend has no practical value in the context of Gopher Celery, so it wasn't taken into account. Note that Celery has no result backend enabled by default (it incurs overhead).

Typically one would want to use Gopher Celery when certain tasks on the Python side take too long to complete, or when a large volume of tasks requires lots of Python workers (expensive infrastructure).

This project offers a slightly more convenient API than https://github.com/gocelery/gocelery, including support for Celery protocol v2.

Usage

The Celery app can be used as either a producer or a consumer (worker). To send tasks to a queue for a worker to consume, use the Delay method. To process a task, register it using the Register method.

def mytask(a, b):
    print(a + b)

For example, whenever the task mytask is popped from the important queue, the Go function is executed with args and kwargs obtained from the task message. By default, the Redis broker (localhost) is used with JSON task message serialization.

app := celery.NewApp()
app.Register(
	"myproject.apps.myapp.tasks.mytask",
	"important",
	func(ctx context.Context, p *celery.TaskParam) error {
		p.NameArgs("a", "b")
		// Methods prefixed with Must panic if they can't find an argument name
		// or can't cast it to the corresponding type.
		// The panic doesn't affect the execution of other tasks; it's logged.
		fmt.Println(p.MustInt("a") + p.MustInt("b"))
		// Non-nil errors are logged.
		return nil
	},
)
if err := app.Run(context.Background()); err != nil {
	log.Printf("celery worker error: %v", err)
}

Here is an example of sending the mytask task to the important queue with arguments a=2, b=3. If the task is processed on the Python side, you don't need to register the task or run the app.

app := celery.NewApp()
err := app.Delay(
	"myproject.apps.myapp.tasks.mytask",
	"important",
	2,
	3,
)
if err != nil {
	log.Printf("failed to send mytask: %v", err)
}

More examples can be found in the examples directory. Note that you'll need a Redis server to run them.

$ redis-server
$ cd ./examples/
Sending tasks from Go and receiving them on the Python side.
$ go run ./producer/
{"err":null,"msg":"task was sent using protocol v2"}
{"err":null,"msg":"task was sent using protocol v1"}
$ celery --app myproject worker --queues important --loglevel=debug --without-heartbeat --without-mingle
...
[... WARNING/ForkPoolWorker-1] received a=fizz b=bazz
[... WARNING/ForkPoolWorker-8] received a=fizz b=bazz
Sending tasks from Python and receiving them on the Go side.
$ python producer.py
$ go run ./consumer/
{"msg":"waiting for tasks..."}
received a=fizz b=bazz
received a=fizz b=bazz

Most likely your Redis server won't be running on localhost when the service is deployed, so you would need to pass a connection pool to the broker.

Redis connection pool.
$ go run ./producer/
{"err":null,"msg":"task was sent using protocol v2"}
{"err":null,"msg":"task was sent using protocol v1"}
$ go run ./redis/
Prometheus task metrics.
$ go run ./producer/
$ go run ./metrics/
$ curl http://0.0.0.0:8080/metrics
# HELP task_duration_seconds How long it took in seconds to process a task.
# TYPE task_duration_seconds histogram
task_duration_seconds_bucket{task="myproject.mytask",le="0.016"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="0.032"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="0.064"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="0.128"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="0.256"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="0.512"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="1.024"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="2.048"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="4.096"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="8.192"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="16.384"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="32.768"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="60"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="+Inf"} 2
task_duration_seconds_sum{task="myproject.mytask"} 7.2802e-05
task_duration_seconds_count{task="myproject.mytask"} 2
# HELP tasks_total How many Celery tasks processed, partitioned by task name and error.
# TYPE tasks_total counter
tasks_total{error="false",task="myproject.mytask"} 2

Although there is no built-in support for task retries (publishing a task back to Redis), you can still retry the operation within the same goroutine.

Task retries.
$ go run ./retry/
...
{"attempt":1,"err":"uh oh","msg":"request failed","ts":"2022-08-07T23:42:23.401191Z"}
{"attempt":2,"err":"uh oh","msg":"request failed","ts":"2022-08-07T23:42:28.337204Z"}
{"attempt":3,"err":"uh oh","msg":"request failed","ts":"2022-08-07T23:42:37.279873Z"}

Testing

Tests require a Redis server running locally.

$ go test -v -count=1 ./...

Benchmarks help spot performance changes as the project evolves and compare the performance of serializers. For example, based on the results below, protocol v2 is faster than v1 when encoding args:

  • protocol v2: 350 nanoseconds mean time, 3 allocations (248 bytes) with 0% variation across the samples
  • protocol v1: 1.21 microseconds mean time, 4 allocations (672 bytes) with 0% variation across the samples
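To see what each serializer has to produce, here is a sketch of the two body layouts described in the Celery protocol documentation: in v2 the body is just `[args, kwargs, embed]` and the task name and id travel in the message headers, while in v1 the body is a single object that also carries them. The function names are hypothetical, the v1 body shows only a minimal subset of its fields, and this is not the library's serializer.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// embed is the third element of a protocol v2 body. It describes
// workflow links (callbacks, chains, chords); this sketch leaves them null.
type embed struct {
	Callbacks []interface{} `json:"callbacks"`
	Errbacks  []interface{} `json:"errbacks"`
	Chain     []interface{} `json:"chain"`
	Chord     interface{}   `json:"chord"`
}

// encodeV2Body returns the protocol v2 body [args, kwargs, embed].
func encodeV2Body(args []interface{}, kwargs map[string]interface{}) ([]byte, error) {
	if args == nil {
		args = []interface{}{} // encode as [] rather than null
	}
	if kwargs == nil {
		kwargs = map[string]interface{}{} // encode as {} rather than null
	}
	return json.Marshal([]interface{}{args, kwargs, embed{}})
}

// encodeV1Body returns a protocol v1 body with a minimal subset of fields.
func encodeV1Body(task, id string, args []interface{}, kwargs map[string]interface{}) ([]byte, error) {
	if args == nil {
		args = []interface{}{}
	}
	if kwargs == nil {
		kwargs = map[string]interface{}{}
	}
	return json.Marshal(map[string]interface{}{
		"task": task, "id": id, "args": args, "kwargs": kwargs,
	})
}

func main() {
	v2, _ := encodeV2Body([]interface{}{2, 3}, nil)
	fmt.Println(string(v2))
	// [[2,3],{},{"callbacks":null,"errbacks":null,"chain":null,"chord":null}]
	v1, _ := encodeV1Body("myproject.mytask", "some-uuid", []interface{}{2, 3}, nil)
	fmt.Println(string(v1))
	// {"args":[2,3],"id":"some-uuid","kwargs":{},"task":"myproject.mytask"}
}
```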

It is recommended to run benchmarks multiple times and check how stable they are using the benchstat tool.

$ go test -bench=. -benchmem -count=10 ./internal/... | tee bench-new.txt
$ benchstat bench-old.txt
name                                  time/op
JSONSerializerEncode_v2NoParams-12    2.97ns ± 1%
JSONSerializerEncode_v2Args-12         350ns ± 0%
JSONSerializerEncode_v2Kwargs-12       582ns ± 0%
JSONSerializerEncode_v2ArgsKwargs-12   788ns ± 1%
JSONSerializerEncode_v1NoParams-12    1.12µs ± 1%
JSONSerializerEncode_v1Args-12        1.21µs ± 0%
JSONSerializerEncode_v1Kwargs-12      1.68µs ± 0%
JSONSerializerEncode_v1ArgsKwargs-12  1.77µs ± 0%

name                                  alloc/op
JSONSerializerEncode_v2NoParams-12     0.00B
JSONSerializerEncode_v2Args-12          248B ± 0%
JSONSerializerEncode_v2Kwargs-12        472B ± 0%
JSONSerializerEncode_v2ArgsKwargs-12    528B ± 0%
JSONSerializerEncode_v1NoParams-12      672B ± 0%
JSONSerializerEncode_v1Args-12          672B ± 0%
JSONSerializerEncode_v1Kwargs-12      1.00kB ± 0%
JSONSerializerEncode_v1ArgsKwargs-12  1.00kB ± 0%

name                                  allocs/op
JSONSerializerEncode_v2NoParams-12      0.00
JSONSerializerEncode_v2Args-12          3.00 ± 0%
JSONSerializerEncode_v2Kwargs-12        7.00 ± 0%
JSONSerializerEncode_v2ArgsKwargs-12    8.00 ± 0%
JSONSerializerEncode_v1NoParams-12      4.00 ± 0%
JSONSerializerEncode_v1Args-12          4.00 ± 0%
JSONSerializerEncode_v1Kwargs-12        10.0 ± 0%
JSONSerializerEncode_v1ArgsKwargs-12    10.0 ± 0%

The old and new stats are compared as follows.

$ benchstat bench-old.txt bench-new.txt

Documentation

Overview

Package celery helps to work with Celery (place tasks in queues and execute them).

Constants

const (
	// ContextKeyTaskName is a context key to access task names.
	ContextKeyTaskName contextKey = iota
)
const DefaultMaxWorkers = 1000

DefaultMaxWorkers is the default upper limit of goroutines allowed to process Celery tasks. Note, the workers are launched only when there are tasks to process.

Let's say it takes ~5s to process a task on average. According to Little's law N = X * R (N tasks in progress, X throughput, R average processing time), 1000 goroutines should then be able to handle X = N / R = 1000 / 5 = 200 tasks per second.
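The same back-of-the-envelope estimate in Go, e.g. for picking a WithMaxWorkers value. `throughput` and `workersNeeded` are hypothetical helpers, and both assume a steady state in which every goroutine stays busy.

```go
package main

import (
	"fmt"
	"math"
)

// throughput applies Little's law N = X * R solved for X: the tasks per
// second that n busy goroutines complete when a task takes r seconds.
func throughput(n int, r float64) float64 {
	return float64(n) / r
}

// workersNeeded solves N = X * R for N, rounding up to whole goroutines.
func workersNeeded(x, r float64) int {
	return int(math.Ceil(x * r))
}

func main() {
	fmt.Println(throughput(1000, 5))   // 200
	fmt.Println(workersNeeded(200, 5)) // 1000
	fmt.Println(workersNeeded(300, 5)) // 1500
}
```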

Types

type App

type App struct {
	// contains filtered or unexported fields
}

App is a Celery app to produce or consume tasks asynchronously.

func NewApp

func NewApp(options ...Option) *App

NewApp creates a Celery app. The default broker is Redis, assumed to run on localhost. When producing tasks, the default message serializer is json and the default protocol is v2.

func (*App) ApplyAsync added in v0.0.4

func (a *App) ApplyAsync(path, queue string, p *AsyncParam) error

ApplyAsync sends a task message using the parameters in p (args, kwargs, and an optional expiration time).

func (*App) Delay

func (a *App) Delay(path, queue string, args ...interface{}) error

Delay is a shortcut to send a task message, i.e., it places the task associated with the given Python path into the queue, passing args as positional arguments.

func (*App) Register

func (a *App) Register(path, queue string, task TaskF)

Register associates the task with the given Python path and queue. For example, when "myproject.apps.myapp.tasks.mytask" is seen in the "important" queue, the TaskF task is executed.

Note, the method is not safe for concurrent use. Tasks mustn't be registered after the app starts processing tasks.

func (*App) Run

func (a *App) Run(ctx context.Context) error

Run launches the workers that process the tasks received from the broker. The call blocks until ctx is cancelled. The caller mustn't register any new tasks at this point.

type AsyncParam added in v0.0.4

type AsyncParam struct {
	// Args is a list of arguments.
	// It will be an empty list if not provided.
	Args []interface{}
	// Kwargs is a dictionary of keyword arguments.
	// It will be an empty dictionary if not provided.
	Kwargs map[string]interface{}
	// Expires is an expiration date.
	// If not provided the message will never expire.
	Expires time.Time
}

AsyncParam represents parameters for sending a task message.

type Broker

type Broker interface {
	// Send puts a message to a queue.
	// Note, the method is safe to call concurrently.
	Send(msg []byte, queue string) error
	// Observe sets the queues from which the tasks should be received.
	// Note, the method is not concurrency safe.
	Observe(queues []string)
	// Receive returns a raw message from one of the queues.
	// It blocks until there is a message available for consumption.
	// Note, the method is not concurrency safe.
	Receive() ([]byte, error)
}

Broker is responsible for receiving and sending task messages. For example, it knows how to read a message from a given queue in Redis. The messages can be in different formats depending on the Celery protocol version.
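The interface is small enough to fake in memory, e.g. for unit tests that shouldn't need a Redis server. `memBroker` is a hypothetical sketch: it keeps each queue as a FIFO slice guarded by a mutex, and Receive waits on a sync.Cond until an observed queue has a message. Unlike a Redis-backed broker it has no persistence and no receive timeout.

```go
package main

import (
	"fmt"
	"sync"
)

// Broker repeats the interface documented above.
type Broker interface {
	Send(msg []byte, queue string) error
	Observe(queues []string)
	Receive() ([]byte, error)
}

// memBroker is a hypothetical in-memory Broker.
type memBroker struct {
	mu     sync.Mutex
	cond   *sync.Cond
	queues map[string][][]byte
	watch  []string
}

func newMemBroker() *memBroker {
	b := &memBroker{queues: make(map[string][][]byte)}
	b.cond = sync.NewCond(&b.mu)
	return b
}

// Send appends msg to the queue and wakes up any blocked Receive.
func (b *memBroker) Send(msg []byte, queue string) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.queues[queue] = append(b.queues[queue], msg)
	b.cond.Broadcast()
	return nil
}

// Observe sets the queues that Receive reads from.
func (b *memBroker) Observe(queues []string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.watch = queues
}

// Receive blocks until an observed queue has a message and pops the oldest one.
func (b *memBroker) Receive() ([]byte, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for {
		for _, q := range b.watch {
			if msgs := b.queues[q]; len(msgs) > 0 {
				b.queues[q] = msgs[1:]
				return msgs[0], nil
			}
		}
		b.cond.Wait() // releases mu while waiting, re-acquires it on wake-up
	}
}

var _ Broker = (*memBroker)(nil) // compile-time check that memBroker is a Broker

func main() {
	b := newMemBroker()
	b.Observe([]string{"important"})
	go b.Send([]byte("hello"), "important")
	msg, _ := b.Receive()
	fmt.Println(string(msg)) // hello
}
```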

type Config

type Config struct {
	// contains filtered or unexported fields
}

Config represents Celery settings.

type Middleware added in v0.0.3

type Middleware func(next TaskF) TaskF

Middleware is a chainable behavior modifier for tasks. For example, a caller can collect task metrics.

type Option

type Option func(*Config)

Option sets up a Config.
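This is the functional options pattern: each With... function returns a closure that edits the config, and NewApp applies them over the defaults. The sketch below uses hypothetical lowercase names (`config`, `option`, `withMaxWorkers`, `withTaskProtocol`, `newConfig`) because the real Config fields are unexported; only the default values come from this page.

```go
package main

import "fmt"

// defaultMaxWorkers matches the documented DefaultMaxWorkers constant.
const defaultMaxWorkers = 1000

// config is a hypothetical stand-in for Config.
type config struct {
	maxWorkers int
	protocol   int
	mime       string
}

// option mirrors the documented Option type: a function that edits a config.
type option func(*config)

func withMaxWorkers(n int) option {
	return func(c *config) { c.maxWorkers = n }
}

func withTaskProtocol(version int) option {
	return func(c *config) { c.protocol = version }
}

// newConfig starts from the documented defaults (json, protocol v2,
// DefaultMaxWorkers) and applies the options in order, so later ones win.
func newConfig(options ...option) config {
	c := config{maxWorkers: defaultMaxWorkers, protocol: 2, mime: "application/json"}
	for _, opt := range options {
		opt(&c)
	}
	return c
}

func main() {
	fmt.Printf("%+v\n", newConfig())
	// {maxWorkers:1000 protocol:2 mime:application/json}
	fmt.Printf("%+v\n", newConfig(withMaxWorkers(10), withTaskProtocol(1)))
	// {maxWorkers:10 protocol:1 mime:application/json}
}
```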

func WithBroker

func WithBroker(broker Broker) Option

WithBroker allows a caller to replace the default broker.

func WithCustomTaskSerializer

func WithCustomTaskSerializer(serializer protocol.Serializer, mime, encoding string) Option

WithCustomTaskSerializer registers a custom serializer where mime is the mime-type describing the serialized structure, e.g., application/json, and encoding is the content encoding which is usually utf-8 or binary.

func WithLogger

func WithLogger(logger log.Logger) Option

WithLogger sets a structured logger.

func WithMaxWorkers

func WithMaxWorkers(n int) Option

WithMaxWorkers sets an upper limit of goroutines allowed to process Celery tasks.

func WithMiddlewares added in v0.0.3

func WithMiddlewares(chain ...Middleware) Option

WithMiddlewares sets a chain of task middlewares. The first middleware is treated as the outermost middleware.

func WithTaskProtocol

func WithTaskProtocol(version int) Option

WithTaskProtocol sets the default task message protocol version used to send tasks. It is equivalent to CELERY_TASK_PROTOCOL in Python.

func WithTaskSerializer

func WithTaskSerializer(mime string) Option

WithTaskSerializer sets the serializer by its MIME type; e.g., with application/json the message body is encoded as JSON when a task is sent to the broker. It is equivalent to CELERY_TASK_SERIALIZER in Python.

type TaskF

type TaskF func(ctx context.Context, p *TaskParam) error

TaskF represents a Celery task implemented by the client. A returned error doesn't affect anything else; it is only logged.

type TaskParam

type TaskParam struct {
	// contains filtered or unexported fields
}

TaskParam provides access to a task's positional and keyword arguments. A task function might not know upfront how the caller will supply parameters: they could be passed as positional arguments f(2, 3), keyword arguments f(a=2, b=3), or a mix of both f(2, b=3). In that case the arguments should be named and accessed by name; see the NameArgs and Get methods.

Methods prefixed with Must panic if they can't find an argument name or can't cast it to the corresponding type. The panic is logged by a worker and it doesn't affect other tasks.

Example
var (
	args   = []interface{}{2}
	kwargs = map[string]interface{}{"b": 3}
)
p := NewTaskParam(args, kwargs)
p.NameArgs("a", "b")

fmt.Println(p.Get("a"))
fmt.Println(p.Get("b"))
fmt.Println(p.Get("c"))
Output:

2 true
3 true
<nil> false

func NewTaskParam

func NewTaskParam(args []interface{}, kwargs map[string]interface{}) *TaskParam

NewTaskParam returns a task param which facilitates access to args and kwargs.

func (*TaskParam) Args

func (p *TaskParam) Args() []interface{}

Args returns task's positional arguments.

func (*TaskParam) Get

func (p *TaskParam) Get(name string) (v interface{}, ok bool)

Get returns a parameter by name. First it looks the name up in the keyword arguments (Kwargs), then among the positional arguments (Args) if names were assigned to them with NameArgs.
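The lookup order can be sketched as follows. `taskParam`, `nameArgs` and `get` are hypothetical, simplified stand-ins that reproduce the documented example output; the real type's internals may differ.

```go
package main

import "fmt"

// taskParam is a hypothetical, simplified model of TaskParam.
type taskParam struct {
	args     []interface{}
	kwargs   map[string]interface{}
	argNames []string
}

// nameArgs names positional arguments in order: names[i] refers to args[i].
func (p *taskParam) nameArgs(names ...string) {
	p.argNames = names
}

// get checks kwargs first, then the named positional arguments.
func (p *taskParam) get(name string) (interface{}, bool) {
	if v, ok := p.kwargs[name]; ok {
		return v, true
	}
	for i, n := range p.argNames {
		if n == name && i < len(p.args) {
			return p.args[i], true
		}
	}
	return nil, false
}

func main() {
	p := &taskParam{args: []interface{}{2}, kwargs: map[string]interface{}{"b": 3}}
	p.nameArgs("a", "b")
	fmt.Println(p.get("a")) // 2 true
	fmt.Println(p.get("b")) // 3 true
	fmt.Println(p.get("c")) // <nil> false
}
```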

func (*TaskParam) Kwargs

func (p *TaskParam) Kwargs() map[string]interface{}

Kwargs returns task's keyword arguments.

func (*TaskParam) MustBool

func (p *TaskParam) MustBool(name string) bool

MustBool looks up a parameter by name and casts it to boolean. It panics if a parameter is missing or of a wrong type.

func (*TaskParam) MustFloat

func (p *TaskParam) MustFloat(name string) float64

MustFloat looks up a parameter by name and casts it to float. It panics if a parameter is missing or of a wrong type.

func (*TaskParam) MustInt

func (p *TaskParam) MustInt(name string) int

MustInt looks up a parameter by name and casts it to integer. It panics if a parameter is missing or of a wrong type.
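One detail any such integer cast has to cope with: when a JSON message is decoded into `interface{}` values, Go's `encoding/json` turns every number into `float64`. The hypothetical `mustInt` below accepts both `int` and whole `float64` values and panics otherwise; whether the library's MustInt behaves exactly this way is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// mustInt is a hypothetical helper that casts v to int, accepting the
// float64 values produced by encoding/json, and panics on anything else.
func mustInt(v interface{}) int {
	switch n := v.(type) {
	case int:
		return n
	case float64:
		if n != float64(int(n)) {
			panic(fmt.Sprintf("%v is not a whole number", n))
		}
		return int(n)
	default:
		panic(fmt.Sprintf("%v (%T) is not a number", v, v))
	}
}

func main() {
	var args []interface{}
	_ = json.Unmarshal([]byte(`[2, 3]`), &args)
	fmt.Printf("%T\n", args[0])                       // float64
	fmt.Println(mustInt(args[0]) + mustInt(args[1])) // 5
}
```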

func (*TaskParam) MustString

func (p *TaskParam) MustString(name string) string

MustString looks up a parameter by name and casts it to string. It panics if a parameter is missing or of a wrong type.

func (*TaskParam) NameArgs

func (p *TaskParam) NameArgs(name ...string)

NameArgs assigns names to the task's positional arguments, in order.

Directories

Path Synopsis
examples module
Package goredis implements a Celery broker using Redis and https://github.com/redis/go-redis.
internal
Package protocol provides means to encode/decode task messages as described in https://github.com/celery/celery/blob/master/docs/internals/protocol.rst.
Package redis implements a Celery broker using Redis and github.com/gomodule/redigo.
