celery

Version: v0.0.0-...-e6c9a1c
Published: Nov 23, 2025 License: MIT Imports: 8 Imported by: 0

README

Celery Go Client

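A Go client package, based on the Celery 2.0 protocol, for sending tasks to a Celery distributed task queue.

Features

  • ✅ Fully compatible with Celery protocol version 1 (Protocol v1)
  • ✅ Supports Redis and RabbitMQ (AMQP) as message brokers
  • ✅ Supports positional and keyword arguments
  • ✅ Supports task scheduling (ETA) and expiration times
  • ✅ Supports custom queues and exchanges
  • ✅ Simple, easy-to-use API
  • ✅ Full unit test coverage

Installation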

go get github.com/alaikis/celery-go-client

Quick Start

Using Redis as the Broker
package main

import (
    "context"
    "log"
    
    celery "github.com/alaikis/celery-go-client"
)

func main() {
    // Create the Redis broker
    broker, err := celery.NewRedisBroker(celery.RedisBrokerConfig{
        Addr:     "localhost:6379",
        Password: "",
        DB:       0,
        Queue:    "celery",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer broker.Close()

    // Create the Celery client
    client := celery.NewClient(celery.ClientConfig{
        Broker: broker,
        Queue:  "celery",
    })
    defer client.Close()

    // Send a task
    ctx := context.Background()
    taskID, err := client.SendTaskWithArgs(ctx, "tasks.add", 10, 20)
    if err != nil {
        log.Fatal(err)
    }
    
    log.Printf("Task sent! ID: %s", taskID)
}
Using RabbitMQ (AMQP) as the Broker (Base64 encoding by default)
package main

import (
    "context"
    "log"
    
    celery "github.com/alaikis/celery-go-client"
)

func main() {
    // Create the AMQP broker
    broker, err := celery.NewAMQPBroker(celery.AMQPBrokerConfig{
        URL:      "amqp://guest:guest@localhost:5672/",
        Exchange: "celery",
        Queue:    "celery",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer broker.Close()

    // Create the Celery client
    client := celery.NewClient(celery.ClientConfig{
        Broker:   broker,
        Queue:    "celery",
        Exchange: "celery",
    })
    
    // To send a raw JSON message body (not Base64-encoded), use this configuration instead:
    /*
    client := celery.NewClient(celery.ClientConfig{
        Broker:   broker,
        Queue:    "celery",
        Exchange: "celery",
        UseRawJSONBody: true, // enable the raw JSON message body
    })
    */
    defer client.Close()

    // Send a task
    ctx := context.Background()
    taskID, err := client.SendTaskWithArgs(ctx, "tasks.multiply", 5, 8)
    if err != nil {
        log.Fatal(err)
    }
    
    log.Printf("Task sent! ID: %s", taskID)
}

Usage Examples

1. Send a task with positional arguments (Base64 encoding by default)
taskID, err := client.SendTaskWithArgs(ctx, "tasks.add", 10, 20)
2. Send a task with keyword arguments (Base64 encoding by default)
taskID, err := client.SendTaskWithKwargs(ctx, "tasks.process_data", map[string]interface{}{
    "name":   "John Doe",
    "age":    30,
    "active": true,
})
3. Send a task with both positional and keyword arguments (Base64 encoding by default)
taskID, err := client.SendTask(ctx, "tasks.complex_task", &celery.TaskOptions{
    Args: []interface{}{"arg1", "arg2"},
    Kwargs: map[string]interface{}{
        "key1": "value1",
        "key2": 42,
    },
})
4. Send a scheduled task (ETA) (Base64 encoding by default)
eta := time.Now().Add(5 * time.Minute)
taskID, err := client.SendTask(ctx, "tasks.scheduled_task", &celery.TaskOptions{
    Args: []interface{}{"scheduled"},
    ETA:  &eta,
})
5. Send a task with an expiration time (Base64 encoding by default)
expires := time.Now().Add(10 * time.Minute)
taskID, err := client.SendTask(ctx, "tasks.temporary_task", &celery.TaskOptions{
    Args:    []interface{}{"data"},
    Expires: &expires,
})
6. Send a task to a specific queue (Base64 encoding by default)
taskID, err := client.SendTaskToQueue(ctx, "tasks.priority_task", "high_priority",
    []interface{}{"urgent"},
    map[string]interface{}{"priority": "high"},
)
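
The ETA and expiration examples above pass time.Time values, while the message format section of this README shows the eta field as an ISO 8601 string in UTC. As a minimal sketch of that conversion, assuming RFC 3339 output is what the worker expects, the hypothetical helper formatCeleryTime below is not part of the package and the package's own formatting may differ:

```go
package main

import (
	"fmt"
	"time"
)

// formatCeleryTime renders t as an ISO 8601 timestamp in UTC.
// Assumption: this matches the "eta" string shown in the message format
// section; the package's own formatting may differ (for example, it may
// include fractional seconds).
func formatCeleryTime(t time.Time) string {
	return t.UTC().Format(time.RFC3339)
}

// exampleETA converts a fixed UTC+8 wall-clock time, so the result is
// deterministic regardless of the machine's local time zone.
func exampleETA() string {
	utc8 := time.FixedZone("UTC+8", 8*60*60)
	return formatCeleryTime(time.Date(2024, 1, 1, 20, 0, 0, 0, utc8))
}

func main() {
	fmt.Println(exampleETA()) // 2024-01-01T12:00:00Z
}
```

Converting to UTC before formatting lines up with the enable_utc=True setting in the Python worker configuration.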

Python Worker Configuration

To be compatible with this Go client, the Python Celery worker must be configured to use protocol version 1 and JSON serialization:

from celery import Celery

app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)

# Important settings
app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    enable_utc=True,
    task_protocol=1,  # must be set to protocol version 1
)

@app.task(name='tasks.add')
def add(x, y):
    return x + y

Run the worker:

celery -A tasks worker --loglevel=info

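Project Structure

celery-go-client/
├── broker.go           # Broker interface definition
├── client.go           # Celery client implementation
├── message.go          # Message structure definitions
├── redis_broker.go     # Redis broker implementation
├── amqp_broker.go      # AMQP broker implementation
├── message_test.go     # Unit tests
├── cmd/                # Example programs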
│   ├── redis_example/
│   │   └── main.go
│   ├── amqp_example/
│   │   └── main.go
│   └── python_worker.py
├── go.mod
├── go.sum
├── Makefile
├── LICENSE
└── README.md

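API Documentation

Client
NewClient(config ClientConfig) *Client

Creates a new Celery client.

Parameters:

  • config.Broker: Broker implementation (required)
  • config.Queue: default queue name (default: "celery")
  • config.Exchange: default exchange name (default: "celery")
  • config.UseRawJSONBody: send a raw JSON body instead of a Base64-encoded one (default: false)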
SendTask(ctx context.Context, taskName string, options *TaskOptions) (string, error)

Sends a task to a Celery worker.

Parameters:

  • ctx: context
  • taskName: task name
  • options: task options

Returns: the task ID and an error

SendTaskWithArgs(ctx context.Context, taskName string, args ...interface{}) (string, error)

Convenience method that sends a task with positional arguments.

SendTaskWithKwargs(ctx context.Context, taskName string, kwargs map[string]interface{}) (string, error)

Convenience method that sends a task with keyword arguments.

SendTaskToQueue(ctx context.Context, taskName, queue string, args []interface{}, kwargs map[string]interface{}) (string, error)

Sends a task to a specific queue.

Close() error

Closes the client and its broker connection.

TaskOptions
type TaskOptions struct {
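    Queue    string                 // Override the default queue
    Exchange string                 // Override the default exchange
    ETA      *time.Time             // Estimated time of arrival
    Expires  *time.Time             // Task expiration time
    Args     []interface{}          // Positional arguments
    Kwargs   map[string]interface{} // Keyword arguments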
}
Redis Broker
NewRedisBroker(config RedisBrokerConfig) (*RedisBroker, error)

Creates a Redis broker.

Configuration:

  • Addr: Redis server address (e.g., "localhost:6379")
  • Password: Redis password (optional)
  • DB: Redis database number (default: 0)
  • Queue: default queue name (default: "celery")
AMQP Broker
NewAMQPBroker(config AMQPBrokerConfig) (*AMQPBroker, error)

Creates an AMQP (RabbitMQ) broker.

Configuration:

  • URL: AMQP connection URL (e.g., "amqp://guest:guest@localhost:5672/")
  • Exchange: exchange name (default: "celery")
  • Queue: queue name (default: "celery")

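Protocol Notes

By default, the client uses a Base64-encoded JSON message body to ensure compatibility with standard Celery workers.

Raw JSON Message Body

When ClientConfig.UseRawJSONBody is set to true, the client sends the TaskMessage JSON directly as the message body. The envelope fields then become:

  • CeleryMessage.Body: the raw TaskMessage JSON object (unescaped)
  • CeleryMessage.ContentType: application/json
  • CeleryMessage.Properties.BodyEncoding: utf-8 (indicating that the body is raw UTF-8 encoded JSON)

This mode is typically used when the worker is configured to accept raw JSON message bodies.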
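
Because CeleryMessage.Body is declared as json.RawMessage, the same field can hold either a quoted Base64 string or an embedded JSON object. The following is a minimal sketch of that difference using only the standard library. The envelope type and the helpers base64Body, rawBody, and render are hypothetical stand-ins, not the package's API:

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
)

// envelope is a hypothetical stand-in that keeps only the Body field of
// the documented CeleryMessage type.
type envelope struct {
	Body json.RawMessage `json:"body"`
}

// base64Body returns the task JSON as a quoted Base64 JSON string.
func base64Body(taskJSON []byte) (json.RawMessage, error) {
	quoted, err := json.Marshal(base64.StdEncoding.EncodeToString(taskJSON))
	return quoted, err
}

// rawBody returns the task JSON unchanged after checking that it is valid.
func rawBody(taskJSON []byte) (json.RawMessage, error) {
	if !json.Valid(taskJSON) {
		return nil, errors.New("task message is not valid JSON")
	}
	return json.RawMessage(taskJSON), nil
}

// render marshals a sample envelope in either mode.
func render(useRawJSONBody bool) (string, error) {
	task := []byte(`{"id":"abc","task":"tasks.add","args":[10,20]}`)
	build := base64Body
	if useRawJSONBody {
		build = rawBody
	}
	body, err := build(task)
	if err != nil {
		return "", err
	}
	out, err := json.Marshal(envelope{Body: body})
	return string(out), err
}

func main() {
	for _, raw := range []bool{false, true} {
		s, err := render(raw)
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Printf("raw=%v %s\n", raw, s)
	}
}
```

In raw mode the body appears as a nested object in the envelope; in the default mode it appears as a single opaque string.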

Base64-Encoded Message Format

This client implements Celery protocol version 1. The message format is as follows:

Outer message (CeleryMessage, Base64-encoded)
{
  "body": "<base64-encoded-task-message>",
  "content-type": "application/json",
  "content-encoding": "utf-8",
  "properties": {
    "body_encoding": "base64",
    "correlation_id": "<uuid>",
    "reply_to": "<uuid>",
    "delivery_info": {
      "priority": 0,
      "routing_key": "celery",
      "exchange": "celery"
    },
    "delivery_mode": 2,
    "delivery_tag": "<uuid>"
  }
}
Inner message (TaskMessage, after Base64 decoding)
{
  "id": "<task-uuid>",
  "task": "tasks.add",
  "args": [10, 20],
  "kwargs": {},
  "retries": 0,
  "eta": "2024-01-01T12:00:00Z"
}
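
The two layers above can be sketched with the standard library alone: marshal the inner message, Base64-encode it, and place the result in the outer envelope. The types and buildEnvelope below are local stand-ins that follow the JSON shown here, not the package's own types, and reusing one id for correlation_id, reply_to, and delivery_tag is a simplification (the format above shows a UUID in each):

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// Local stand-ins that follow the JSON layout shown above.
type taskMessage struct {
	ID      string                 `json:"id"`
	Task    string                 `json:"task"`
	Args    []interface{}          `json:"args"`
	Kwargs  map[string]interface{} `json:"kwargs"`
	Retries int                    `json:"retries"`
}

type deliveryInfo struct {
	Priority   int    `json:"priority"`
	RoutingKey string `json:"routing_key"`
	Exchange   string `json:"exchange"`
}

type properties struct {
	BodyEncoding  string       `json:"body_encoding"`
	CorrelationID string       `json:"correlation_id"`
	ReplyTo       string       `json:"reply_to"`
	DeliveryInfo  deliveryInfo `json:"delivery_info"`
	DeliveryMode  int          `json:"delivery_mode"`
	DeliveryTag   string       `json:"delivery_tag"`
}

type envelope struct {
	Body            string     `json:"body"`
	ContentType     string     `json:"content-type"`
	ContentEncoding string     `json:"content-encoding"`
	Properties      properties `json:"properties"`
}

// buildEnvelope encodes the inner message, Base64-wraps it, and places it
// in the outer envelope. The id argument is a placeholder for real UUIDs.
func buildEnvelope(id, task, queue, exchange string, args ...interface{}) ([]byte, error) {
	inner, err := json.Marshal(taskMessage{
		ID: id, Task: task, Args: args, Kwargs: map[string]interface{}{},
	})
	if err != nil {
		return nil, err
	}
	return json.Marshal(envelope{
		Body:            base64.StdEncoding.EncodeToString(inner),
		ContentType:     "application/json",
		ContentEncoding: "utf-8",
		Properties: properties{
			BodyEncoding:  "base64",
			CorrelationID: id,
			ReplyTo:       id,
			DeliveryInfo:  deliveryInfo{RoutingKey: queue, Exchange: exchange},
			DeliveryMode:  2,
			DeliveryTag:   id,
		},
	})
}

// innerJSON builds a sample envelope and decodes its body again.
func innerJSON() (string, error) {
	raw, err := buildEnvelope("task-1", "tasks.add", "celery", "celery", 10, 20)
	if err != nil {
		return "", err
	}
	var env envelope
	if err := json.Unmarshal(raw, &env); err != nil {
		return "", err
	}
	inner, err := base64.StdEncoding.DecodeString(env.Body)
	return string(inner), err
}

func main() {
	s, err := innerJSON()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(s)
}
```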

Testing

Run the unit tests:

go test -v

Run the tests with coverage:

go test -cover

Dependencies

  • github.com/alaikis/celery-go-client - this project
  • github.com/google/uuid - UUID generation
  • github.com/redis/go-redis/v9 - Redis client
  • github.com/rabbitmq/amqp091-go - AMQP client

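Notes

  1. Protocol version: this client supports only Celery protocol version 1. Python Celery 4.0+ defaults to version 2, so task_protocol=1 must be set explicitly.

  2. Serialization format: only JSON serialization is supported; pickle is not.

  3. Result backend: this client only sends tasks and does not retrieve results. To get task results, use a Python client or read directly from the result backend.

  4. Connection management: reuse a single Client instance for the lifetime of the application rather than repeatedly creating and tearing down connections.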
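
One way to follow the connection management note is to create the client lazily, once per process, and hand out the same instance afterwards. The sketch below shows that pattern with sync.Once. The client type and newClient are hypothetical stand-ins for *celery.Client and celery.NewClient, so that the example runs without a broker. This README does not state whether Client is safe for concurrent use, so check that before sharing one instance across goroutines:

```go
package main

import (
	"fmt"
	"sync"
)

// client is a hypothetical stand-in for *celery.Client so that the sketch
// runs without a broker.
type client struct {
	name string
}

var (
	once        sync.Once
	shared      *client
	constructed int // how many times newClient ran
)

// newClient stands in for creating a broker and calling celery.NewClient.
func newClient() *client {
	constructed++
	return &client{name: "shared"}
}

// sharedClient returns the process-wide client, creating it on first use.
func sharedClient() *client {
	once.Do(func() { shared = newClient() })
	return shared
}

// constructedAfterConcurrentUse calls sharedClient from n goroutines and
// reports how many clients were actually constructed.
func constructedAfterConcurrentUse(n int) int {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = sharedClient()
		}()
	}
	wg.Wait()
	return constructed
}

func main() {
	fmt.Println(constructedAfterConcurrentUse(8)) // 1
}
```

With the real client, Close would then be called once, at application shutdown.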

License

MIT License

Contributing

Issues and pull requests are welcome!

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AMQPBroker

type AMQPBroker struct {
	// contains filtered or unexported fields
}

AMQPBroker implements Broker interface for RabbitMQ/AMQP

func NewAMQPBroker

func NewAMQPBroker(config AMQPBrokerConfig) (*AMQPBroker, error)

NewAMQPBroker creates a new AMQP broker instance

func (*AMQPBroker) Close

func (ab *AMQPBroker) Close() error

Close closes the AMQP connection

func (*AMQPBroker) SendTask

func (ab *AMQPBroker) SendTask(ctx context.Context, message *CeleryMessage) error

SendTask sends a task message to RabbitMQ

type AMQPBrokerConfig

type AMQPBrokerConfig struct {
	URL      string // AMQP connection URL (e.g., "amqp://guest:guest@localhost:5672/")
	Exchange string // Exchange name (default: "celery")
	Queue    string // Queue name (default: "celery")
}

AMQPBrokerConfig contains configuration for AMQP broker

type Broker

type Broker interface {
	// SendTask sends a task message to the broker
	SendTask(ctx context.Context, message *CeleryMessage) error

	// Close closes the broker connection
	Close() error
}

Broker defines the interface for message brokers
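
Since Broker has only two methods, an in-memory implementation is a convenient way to exercise sending code in tests without Redis or RabbitMQ. The sketch below illustrates the idea against local stand-in types: message stands in for CeleryMessage and broker mirrors the interface above, so none of these names come from the package itself:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// message is a hypothetical stand-in for the package's CeleryMessage.
type message struct {
	ContentType string
	Body        []byte
}

// broker mirrors the documented Broker interface against the stand-in type.
type broker interface {
	SendTask(ctx context.Context, m *message) error
	Close() error
}

// memoryBroker records messages instead of sending them anywhere.
type memoryBroker struct {
	mu     sync.Mutex
	sent   []*message
	closed bool
}

// Compile-time check that memoryBroker satisfies the interface.
var _ broker = (*memoryBroker)(nil)

// SendTask stores the message unless the context is done or the broker
// has been closed.
func (b *memoryBroker) SendTask(ctx context.Context, m *message) error {
	if err := ctx.Err(); err != nil {
		return err // honor cancellation and deadlines
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.closed {
		return errors.New("broker is closed")
	}
	b.sent = append(b.sent, m)
	return nil
}

// Close marks the broker as closed; later sends fail.
func (b *memoryBroker) Close() error {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.closed = true
	return nil
}

// demo sends one message, closes the broker, then tries to send again.
func demo() (recorded int, rejectedAfterClose bool) {
	b := &memoryBroker{}
	ctx := context.Background()
	msg := &message{ContentType: "application/json", Body: []byte(`{}`)}
	_ = b.SendTask(ctx, msg)
	_ = b.Close()
	err := b.SendTask(ctx, msg)
	return len(b.sent), err != nil
}

func main() {
	n, rejected := demo()
	fmt.Println(n, rejected) // 1 true
}
```

To use the same idea with the real package, the method signatures would take *celery.CeleryMessage, and the value could be passed as ClientConfig.Broker.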

type CeleryDeliveryInfo

type CeleryDeliveryInfo struct {
	Priority   int    `json:"priority"`
	RoutingKey string `json:"routing_key"`
	Exchange   string `json:"exchange"`
}

CeleryDeliveryInfo contains routing information

type CeleryMessage

type CeleryMessage struct {
	Body            json.RawMessage        `json:"body"`
	Headers         map[string]interface{} `json:"headers,omitempty"`
	ContentType     string                 `json:"content-type"`
	Properties      CeleryProperties       `json:"properties"`
	ContentEncoding string                 `json:"content-encoding"`
}

CeleryMessage is the outer message envelope sent to the broker

func NewCeleryMessage

func NewCeleryMessage(encodedBody []byte, queue, exchange string) *CeleryMessage

NewCeleryMessage creates a new Celery message envelope

func NewCeleryMessageWithEncoding

func NewCeleryMessageWithEncoding(body []byte, queue, exchange, contentType, bodyEncoding, contentEncoding string) *CeleryMessage

NewCeleryMessageWithEncoding creates a new Celery message envelope with custom encoding

func (*CeleryMessage) Encode

func (cm *CeleryMessage) Encode() ([]byte, error)

Encode serializes the Celery message to JSON

type CeleryProperties

type CeleryProperties struct {
	BodyEncoding  string             `json:"body_encoding"`
	CorrelationID string             `json:"correlation_id"`
	ReplyTo       string             `json:"reply_to"`
	DeliveryInfo  CeleryDeliveryInfo `json:"delivery_info"`
	DeliveryMode  int                `json:"delivery_mode"`
	DeliveryTag   string             `json:"delivery_tag"`
}

CeleryProperties contains message properties

type Client

type Client struct {

	// UseRawJSONBody is a flag to indicate whether to use raw JSON body instead of base64 encoded body
	// This is typically used for AMQP brokers where the worker is configured to accept raw JSON.
	UseRawJSONBody bool
	// contains filtered or unexported fields
}

Client is the main Celery client

func NewClient

func NewClient(config ClientConfig) *Client

NewClient creates a new Celery client

func (*Client) Close

func (c *Client) Close() error

Close closes the client and its broker connection

func (*Client) SendTask

func (c *Client) SendTask(ctx context.Context, taskName string, options *TaskOptions) (string, error)

SendTask sends a task to the Celery worker

func (*Client) SendTaskToQueue

func (c *Client) SendTaskToQueue(ctx context.Context, taskName, queue string, args []interface{}, kwargs map[string]interface{}) (string, error)

SendTaskToQueue sends a task to a specific queue

func (*Client) SendTaskWithArgs

func (c *Client) SendTaskWithArgs(ctx context.Context, taskName string, args ...interface{}) (string, error)

SendTaskWithArgs is a convenience method to send a task with positional arguments

func (*Client) SendTaskWithKwargs

func (c *Client) SendTaskWithKwargs(ctx context.Context, taskName string, kwargs map[string]interface{}) (string, error)

SendTaskWithKwargs is a convenience method to send a task with keyword arguments

type ClientConfig

type ClientConfig struct {
	Broker   Broker // The broker implementation to use
	Queue    string // Default queue name (default: "celery")
	Exchange string // Default exchange name (default: "celery")
	// UseRawJSONBody is a flag to indicate whether to use raw JSON body instead of base64 encoded body
	UseRawJSONBody bool
}

ClientConfig contains configuration for Celery client

type RedisBroker

type RedisBroker struct {
	// contains filtered or unexported fields
}

RedisBroker implements Broker interface for Redis

func NewRedisBroker

func NewRedisBroker(config RedisBrokerConfig) (*RedisBroker, error)

NewRedisBroker creates a new Redis broker instance

func (*RedisBroker) Close

func (rb *RedisBroker) Close() error

Close closes the Redis connection

func (*RedisBroker) SendTask

func (rb *RedisBroker) SendTask(ctx context.Context, message *CeleryMessage) error

SendTask sends a task message to Redis

type RedisBrokerConfig

type RedisBrokerConfig struct {
	Addr     string // Redis server address (e.g., "localhost:6379")
	Password string // Redis password (empty if no password)
	DB       int    // Redis database number
	Queue    string // Default queue name (default: "celery")
}

RedisBrokerConfig contains configuration for Redis broker

type TaskMessage

type TaskMessage struct {
	ID      string                 `json:"id"`
	Task    string                 `json:"task"`
	Args    []interface{}          `json:"args,omitempty"`
	Kwargs  map[string]interface{} `json:"kwargs,omitempty"`
	Retries int                    `json:"retries,omitempty"`
	ETA     *string                `json:"eta,omitempty"`
	Expires *time.Time             `json:"expires,omitempty"`
}

TaskMessage represents the Celery task message (protocol v1). This is the inner message that gets base64 encoded in the body.

func NewTaskMessage

func NewTaskMessage(taskName string, args []interface{}, kwargs map[string]interface{}) *TaskMessage

NewTaskMessage creates a new task message with default values

func (*TaskMessage) Encode

func (tm *TaskMessage) Encode() (string, error)

Encode serializes the task message to base64-encoded JSON

func (*TaskMessage) EncodeJSON

func (tm *TaskMessage) EncodeJSON() (string, error)

EncodeJSON serializes the task message to raw JSON string
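
When debugging, it can help to read an envelope back and see which task it carries, whichever of the two body encodings was used. The sketch below assumes the envelope layout described in the README (a body field plus properties.body_encoding). The queued and task types and the decodeTask helper are hypothetical and keep only the fields needed here:

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// queued keeps only the envelope fields needed to locate and decode the body.
type queued struct {
	Body       json.RawMessage `json:"body"`
	Properties struct {
		BodyEncoding string `json:"body_encoding"`
	} `json:"properties"`
}

// task keeps only the inner fields this sketch inspects.
type task struct {
	ID   string        `json:"id"`
	Task string        `json:"task"`
	Args []interface{} `json:"args"`
}

// decodeTask extracts the inner task message from an envelope, handling
// both a Base64 string body and a raw JSON object body.
func decodeTask(envelopeJSON []byte) (task, error) {
	var q queued
	var t task
	if err := json.Unmarshal(envelopeJSON, &q); err != nil {
		return t, fmt.Errorf("envelope: %w", err)
	}
	inner := []byte(q.Body)
	if q.Properties.BodyEncoding == "base64" {
		var encoded string
		if err := json.Unmarshal(q.Body, &encoded); err != nil {
			return t, fmt.Errorf("body string: %w", err)
		}
		decoded, err := base64.StdEncoding.DecodeString(encoded)
		if err != nil {
			return t, fmt.Errorf("base64: %w", err)
		}
		inner = decoded
	}
	if err := json.Unmarshal(inner, &t); err != nil {
		return t, fmt.Errorf("task: %w", err)
	}
	return t, nil
}

// sampleTaskName builds a sample envelope in either mode and decodes it.
func sampleTaskName(rawJSONBody bool) (string, error) {
	inner := `{"id":"task-1","task":"tasks.add","args":[10,20]}`
	env := `{"body":` + inner + `,"properties":{"body_encoding":"utf-8"}}`
	if !rawJSONBody {
		b64 := base64.StdEncoding.EncodeToString([]byte(inner))
		env = `{"body":"` + b64 + `","properties":{"body_encoding":"base64"}}`
	}
	t, err := decodeTask([]byte(env))
	return t.Task, err
}

func main() {
	for _, raw := range []bool{false, true} {
		name, err := sampleTaskName(raw)
		fmt.Println(raw, name, err)
	}
}
```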

func (*TaskMessage) SetETA

func (tm *TaskMessage) SetETA(eta time.Time)

SetETA sets the estimated time of arrival for the task

func (*TaskMessage) SetExpires

func (tm *TaskMessage) SetExpires(expires time.Time)

SetExpires sets the expiration time for the task

type TaskOptions

type TaskOptions struct {
	Queue    string                 // Override default queue
	Exchange string                 // Override default exchange
	ETA      *time.Time             // Estimated time of arrival
	Expires  *time.Time             // Task expiration time
	Args     []interface{}          // Positional arguments
	Kwargs   map[string]interface{} // Keyword arguments
}

TaskOptions contains options for task execution

Directories

Path Synopsis
cmd
amqp_example command
redis_example command
