gocelerypub

package module
v1.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 25, 2026 License: MIT Imports: 12 Imported by: 0

README

Go Celery Publisher

A lightweight, goroutine-safe Go package for publishing Celery tasks to AMQP brokers (RabbitMQ) with a fire-and-forget mechanism.

Features

  • Simple API - Easy-to-use interface for publishing Celery tasks
  • Dual Publishing Modes - Choose between DirectMode (simple) or ChannelMode (goroutine-safe)
  • Thread-Safe - ChannelMode handles concurrent publishing from multiple goroutines safely
  • Automatic Reconnection - Built-in connection recovery and retry logic
  • No Worker Required - Pure publisher implementation, no consumer/worker needed
  • Celery Compatible - Generates standard Celery protocol v1 messages

Why This Package?

AMQP connections and channels are not goroutine-safe. In high-performance applications using goroutines, concurrent access can cause frame mixing issues or require creating new connections for each publish operation, leading to poor performance.

This package solves the problem by offering two modes:

  • DirectMode: Simple, single-threaded publishing (default)
  • ChannelMode: Uses a dedicated goroutine with an internal task channel to handle concurrent publishing safely

Installation

go get github.com/nine2onetech/gocelerypub

Quick Start

DirectMode (Simple)
package main

import (
    "log"
    publisher "github.com/nine2onetech/gocelerypub"
)

func main() {
    // Create a publisher in DirectMode (default)
    pub, err := publisher.New(publisher.Config{
        BrokerType: publisher.AMQP,
        HostURL:    "amqp://guest:guest@localhost:5672/",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer pub.Close()

    // Publish a task
    err = pub.Publish(&publisher.PublishRequest{
        Queue:  "tasks",
        Task:   "myapp.tasks.add",
        Args:   []interface{}{1, 2},
        Kwargs: map[string]interface{}{"debug": true},
    })
    if err != nil {
        log.Fatal(err)
    }
}
ChannelMode (Goroutine-Safe)
package main

import (
    "log"
    "sync"
    publisher "github.com/nine2onetech/gocelerypub"
)

func main() {
    // Create a publisher in ChannelMode
    pub, err := publisher.New(publisher.Config{
        BrokerType:  publisher.AMQP,
        HostURL:     "amqp://guest:guest@localhost:5672/",
        PublishMode: publisher.ChannelMode,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer pub.Close()

    // Publish from multiple goroutines safely
    var wg sync.WaitGroup
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()

            err := pub.Send(&publisher.PublishRequest{
                Queue:  "tasks",
                Task:   "myapp.tasks.process",
                Args:   []interface{}{id},
                Kwargs: map[string]interface{}{"worker_id": id},
            })
            if err != nil {
                log.Printf("Failed to publish task %d: %v", id, err)
            }
        }(i)
    }
    wg.Wait()
}

Configuration

type Config struct {
    BrokerType  BrokerType  // Type of broker (currently only AMQP supported)
    HostURL     string      // Broker connection URL
    PublishMode PublishMode // DirectMode or ChannelMode (default: DirectMode)
}
Broker Types
Type Description
AMQP RabbitMQ/AMQP broker
Publishing Modes
Mode Description Use Case Goroutine-Safe
DirectMode Publishes messages directly Simple, single-threaded applications ❌ No
ChannelMode Uses internal task channel with dedicated goroutine High-performance concurrent applications ✅ Yes

API Reference

Creating a Publisher
pub, err := publisher.New(publisher.Config{
    BrokerType:  publisher.AMQP,
    HostURL:     "amqp://guest:guest@localhost:5672/",
    PublishMode: publisher.ChannelMode, // or publisher.DirectMode
})
Publishing Methods
Publish(req *PublishRequest) error

Publishes a task directly. Not goroutine-safe in DirectMode.

err := pub.Publish(&publisher.PublishRequest{
    Queue:  "my_queue",
    Task:   "myapp.tasks.process",
    Args:   []interface{}{1, 2, 3},
    Kwargs: map[string]interface{}{"key": "value"},
})
Send(req *PublishRequest) error

Publishes a task through internal channel. Only available in ChannelMode. Goroutine-safe.

err := pub.Send(&publisher.PublishRequest{
    Queue:  "my_queue",
    Task:   "myapp.tasks.process",
    Args:   []interface{}{1, 2, 3},
    Kwargs: map[string]interface{}{"key": "value"},
})
Close() error

Gracefully shuts down the publisher, closing channels and broker connections.

defer pub.Close()
PublishRequest
type PublishRequest struct {
    Queue  string                 // Target queue name
    Task   string                 // Task name (e.g., "tasks.add")
    Args   []interface{}          // Positional arguments
    Kwargs map[string]interface{} // Keyword arguments
}

Architecture

DirectMode
┌─────────┐
│ Your    │
│ Code    │──Publish()──► Publisher ──► AMQP Broker
└─────────┘

Simple and straightforward, but not safe for concurrent goroutines.

ChannelMode
┌─────────┐
│Goroutine│──┐
└─────────┘  │
             │
┌─────────┐  │    ┌──────────┐    ┌────────────┐    ┌────────────┐
│Goroutine│──┼───►│Task Chan │───►│ Dedicated  │───►│    AMQP    │
└─────────┘  │    └──────────┘    │ Goroutine  │    │   Broker   │
             │                     └────────────┘    └────────────┘
┌─────────┐  │
│Goroutine│──┘
└─────────┘

All goroutines send requests to an internal channel. A single dedicated goroutine processes all AMQP operations sequentially, avoiding concurrency issues.

Error Handling

The package includes built-in error handling:

  • Automatic Reconnection: If the broker connection is lost, it automatically attempts to reconnect
  • PRECONDITION_FAILED Retry: On queue mismatch errors, it reconnects and retries once
  • Error Propagation: All errors are properly wrapped and returned to the caller

Testing

Run the test suite:

# Run all tests
go test ./...

# Run with coverage
go test -cover ./...

# Run with verbose output
go test -v ./...

The package includes comprehensive unit tests using mocks for the broker interface.

Examples

Publishing with Custom Task Options
req := &publisher.PublishRequest{
    Queue:  "priority_tasks",
    Task:   "myapp.tasks.urgent_process",
    Args:   []interface{}{"data1", "data2"},
    Kwargs: map[string]interface{}{
        "priority": "high",
        "timeout":  300,
    },
}

err := pub.Send(req)
if err != nil {
    log.Printf("Failed to publish: %v", err)
}
Handling Errors Gracefully
err := pub.Send(req)
if err != nil {
    if strings.Contains(err.Error(), "connection") {
        // Handle connection errors
        log.Println("Connection lost, will retry...")
    } else {
        // Handle other errors
        log.Printf("Publish failed: %v", err)
    }
}
Bulk Publishing
tasks := []string{"task1", "task2", "task3", "task4", "task5"}

var wg sync.WaitGroup
for _, task := range tasks {
    wg.Add(1)
    go func(t string) {
        defer wg.Done()

        err := pub.Send(&publisher.PublishRequest{
            Queue: "bulk_queue",
            Task:  t,
            Args:  []interface{}{},
        })
        if err != nil {
            log.Printf("Failed to publish %s: %v", t, err)
        }
    }(task)
}
wg.Wait()

Performance Considerations

  • ChannelMode adds minimal overhead (single channel operation) while providing full goroutine safety
  • The dedicated goroutine in ChannelMode processes requests sequentially, so throughput is limited by AMQP network latency
  • For maximum throughput, consider running multiple Publisher instances (each with its own dedicated goroutine)
  • Connection pooling is intentionally avoided due to AMQP's lack of thread-safety

Requirements

  • Go 1.24.0 or higher
  • RabbitMQ or another AMQP 0.9.1 compatible broker
  • Dependencies:
    • github.com/rabbitmq/amqp091-go
    • github.com/google/uuid

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add some amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

License

MIT License - see the LICENSE file for details

Acknowledgments

  • Celery Project for the message protocol specification
  • RabbitMQ team for the excellent AMQP Go client

Support

If you encounter any issues or have questions, please file an issue on the GitHub issue tracker.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConsumeMessage

func ConsumeMessage(t *testing.T, amqpURL, queueName string, timeout time.Duration) *amqp091.Delivery

ConsumeMessage consumes a single message from the specified queue

func DecodeTaskMessage

func DecodeTaskMessage(t *testing.T, delivery *amqp091.Delivery) map[string]interface{}

DecodeTaskMessage decodes the task message directly from AMQP delivery body The delivery body contains plain JSON TaskMessage (Celery Protocol v1 with utf-8 encoding)

func NewAMQPConnection

func NewAMQPConnection(host string) (*amqp.Connection, *amqp.Channel)

NewAMQPConnection creates new AMQP channel

func VerifyTaskMessageArgs

func VerifyTaskMessageArgs(t *testing.T, taskMsg map[string]interface{}, expectedArgs []interface{})

VerifyTaskMessageArgs verifies that the task message contains expected args

func VerifyTaskMessageKwargs

func VerifyTaskMessageKwargs(t *testing.T, taskMsg map[string]interface{}, expectedKwargs map[string]interface{})

VerifyTaskMessageKwargs verifies that the task message contains expected kwargs Note: This function checks that all expectedKwargs are present, but allows additional kwargs

Types

type AMQPBroker

type AMQPBroker struct {
	Channel    *amqp.Channel
	Connection *amqp.Connection
}

func NewAMQPBroker

func NewAMQPBroker(host string) *AMQPBroker

NewAMQPBroker creates new AMQPBroker

func (*AMQPBroker) CanPublish

func (b *AMQPBroker) CanPublish() bool

func (*AMQPBroker) Reconnect

func (b *AMQPBroker) Reconnect(host string) error

func (*AMQPBroker) SendCeleryMessage

func (b *AMQPBroker) SendCeleryMessage(msg *CeleryMessage) error

type Broker

type Broker interface {
	CanPublish() bool
	Reconnect(host string) error
	SendCeleryMessage(msg *CeleryMessage) error
}

type BrokerType

type BrokerType string

BrokerType represents the type of message broker to use.

var (
	AMQP BrokerType = "amqp" // RabbitMQ/AMQP broker
)

Supported broker types

type CeleryDeliveryInfo

type CeleryDeliveryInfo struct {
	Priority   int    `json:"priority"`
	RoutingKey string `json:"routing_key"`
	Exchange   string `json:"exchange"`
}

CeleryDeliveryInfo represents deliveryinfo json

type CeleryMessage

type CeleryMessage struct {
	Body            string                 `json:"body"`
	Headers         map[string]interface{} `json:"headers,omitempty"`
	ContentType     string                 `json:"content-type"`
	Properties      CeleryProperties       `json:"properties"`
	ContentEncoding string                 `json:"content-encoding"`
}

CeleryMessage is actual message to be sent to the broker https://docs.celeryq.dev/projects/kombu/en/stable/_modules/kombu/message.html

type CeleryProperties

type CeleryProperties struct {
	BodyEncoding  string             `json:"body_encoding"`
	CorrelationID string             `json:"correlation_id"`
	ReplyTo       string             `json:"reply_to"`
	DeliveryInfo  CeleryDeliveryInfo `json:"delivery_info"`
	DeliveryMode  int                `json:"delivery_mode"`
	DeliveryTag   string             `json:"delivery_tag"`
}

CeleryProperties represents properties json

type Config

type Config struct {
	BrokerType  BrokerType  // Type of broker to use (e.g., AMQP)
	HostURL     string      // Broker connection URL (e.g., "amqp://guest:guest@localhost:5672/")
	PublishMode PublishMode // Publishing mode (DirectMode or ChannelMode)
	// contains filtered or unexported fields
}

Config holds the configuration for a Publisher.

type PublishMode

type PublishMode string

PublishMode represents the publishing mode for the Publisher.

var (
	// DirectMode publishes messages directly without goroutine safety.
	// Use this mode for simple, single-threaded applications.
	// WARNING: Not safe for concurrent use from multiple goroutines.
	DirectMode PublishMode = "direct"

	// ChannelMode publishes messages through an internal task channel.
	// This mode is goroutine-safe and suitable for high-performance concurrent applications.
	// Uses a single dedicated goroutine to handle all AMQP operations sequentially.
	ChannelMode PublishMode = "channel"
)

Supported publish modes

type PublishRequest

type PublishRequest struct {
	Queue  string                 // Target queue name
	Task   string                 // Task name (e.g., "tasks.add")
	Args   []interface{}          // Positional arguments for the task
	Kwargs map[string]interface{} // Keyword arguments for the task
}

PublishRequest represents a request to publish a Celery task.

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher publishes Celery tasks to a message broker. It maintains a connection to the broker and handles message serialization.

func New

func New(cfg Config) (*Publisher, error)

New creates a new Publisher with the given configuration. It initializes the appropriate broker based on the BrokerType specified in the config. If PublishMode is not specified, it defaults to DirectMode. For ChannelMode, it starts a dedicated goroutine to handle publishing operations. Returns an error if the broker type is not supported.

func (*Publisher) CanPublish added in v0.1.0

func (p *Publisher) CanPublish() bool

func (*Publisher) Close

func (p *Publisher) Close() error

Close gracefully shuts down the Publisher. For ChannelMode, it closes the task channel and waits for the run() goroutine to finish. For both modes, it closes the broker connection. This method should be called when the Publisher is no longer needed to prevent resource leaks.

func (*Publisher) Publish

func (p *Publisher) Publish(req *PublishRequest) error

Publish publishes a Celery task to the specified queue. It creates a Celery-compatible message and sends it via the configured broker. The message uses the default exchange ("") and routes directly to the queue by name.

func (*Publisher) Reconnect added in v0.1.0

func (p *Publisher) Reconnect() error

func (*Publisher) Send

func (p *Publisher) Send(req *PublishRequest) error

Send publishes a Celery task using channel-based mode. This method is goroutine-safe and should be used when the Publisher is configured with ChannelMode. It sends the publish request to an internal channel where a dedicated goroutine handles the actual publishing.

The error channel is buffered to prevent blocking in the run() goroutine. Even if this method returns before the caller reads from errCh, the run() goroutine won't block.

Returns an error if publishing fails or if the Publisher is not in ChannelMode.

type RabbitMQTestContainer

type RabbitMQTestContainer struct {
	Container *rabbitmq.RabbitMQContainer
	AmqpURL   string
}

RabbitMQTestContainer represents a RabbitMQ test container

func SetupRabbitMQContainer

func SetupRabbitMQContainer(t *testing.T) (*RabbitMQTestContainer, func())

SetupRabbitMQContainer starts a RabbitMQ container for testing

type TaskMessage

type TaskMessage interface {
	ToCeleryMessage(deliveryInfo CeleryDeliveryInfo) *CeleryMessage
	Encode() (string, error)
	GetID() string
}

TaskMessage is interface for celery task messages TaskMessage composes the body of CeleryMessage

type TaskMessageV1

type TaskMessageV1 struct {
	ID        string                 `json:"id"`
	Task      string                 `json:"task"`
	Args      []interface{}          `json:"args"`
	Kwargs    map[string]interface{} `json:"kwargs"`
	Retries   int                    `json:"retries,omitempty"`
	ETA       *string                `json:"eta,omitempty"`
	Expires   *string                `json:"expires,omitempty"`
	Taskset   string                 `json:"taskset,omitempty"`   // Group ID (also called group)
	UTC       bool                   `json:"utc,omitempty"`       // Whether to use UTC timezone
	TimeLimit []float64              `json:"timelimit,omitempty"` // [soft, hard] time limits in seconds
}

TaskMessageV1 is celery-compatible message (protocol v1) https://celery-safwan.readthedocs.io/en/latest/internals/protocol.html#version-1

func (*TaskMessageV1) Encode

func (tm *TaskMessageV1) Encode() (string, error)

Encode returns json encoded string (without base64)

func (*TaskMessageV1) GetID

func (tm *TaskMessageV1) GetID() string

func (*TaskMessageV1) ToCeleryMessage

func (tm *TaskMessageV1) ToCeleryMessage(deliveryInfo CeleryDeliveryInfo) *CeleryMessage

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL