celerygo

package module
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 3, 2025 License: MIT Imports: 13 Imported by: 0

README

Celery-go

A minimal Celery client for Go that supports publishing tasks to Celery workers. This client follows the Celery 2.0 protocol.

Features

  • Supports task publishing (no task consumer yet)
  • Implements Celery 2.0 protocol
  • Built-in logging for better observability
  • Automatic connection re-establishment with exponential back-off
  • Thread-safety

Installation

go get github.com/Vipatra/celerygo

Usage

Import the package
import (
    "github.com/Vipatra/celerygo"
)
Example

package main

import (
"context"
"fmt"
"log"
"sync"
"time"

celerygo "github.com/Vipatra/celerygo"
)

func main() {
	client, err := celerygo.NewCeleryClient("amqp://<username>:<password>@localhost:<port>", "amqp", &celerygo.Options{
		MaxBackOffDuration: 5 * time.Second,
		LogLevel:           celerygo.ErrorLevel,
	})
	if err != nil {
		log.Fatal(err)
	}
	client.RegisterRoutes(map[string]string{
		"long_tasks.*": "long_tasks",
	})

	taskInfo, err := client.SendCeleryTask(context.Background(), "long_tasks.process_portfolio_jobs", nil, map[string]any{
		"org_id": 798798,
	}, &celerygo.AdditionalParameters{
		CountdownSeconds: celerygo.Ptr(30),
	})
	if err != nil {
		log.Println(fmt.Errorf("send task failed: %w", err))
		return
	}
	log.Println(fmt.Sprintf("task info %v",taskInfo))
}

Documentation

Index

Constants

View Source
const (
	ISO8601 = "2006-01-02T15:04:05.999999-07:00"

	AMQP = "amqp"

	CorrelationIdKey = "trace_id"
)
View Source
const (
	Debug      = LogLevel(logrus.InfoLevel)
	InfoLevel  = LogLevel(logrus.DebugLevel)
	WarnLevel  = LogLevel(logrus.WarnLevel)
	ErrorLevel = LogLevel(logrus.ErrorLevel)
	FatalLevel = LogLevel(logrus.FatalLevel)
	PanicLevel = LogLevel(logrus.PanicLevel)
	TraceLevel = LogLevel(logrus.TraceLevel)
)

Variables

This section is empty.

Functions

func GetBodyTuple

func GetBodyTuple(body Body) ([]interface{}, error)

func GetCtxValueString added in v1.0.2

func GetCtxValueString(ctx context.Context, key string) string

func GetOrigin

func GetOrigin() (string, error)

func GetRetryDelayWithExponentialBackOff

func GetRetryDelayWithExponentialBackOff(currentRetries int64) time.Duration

func Ptr

func Ptr[T any](v T) *T

Types

type AdditionalParameters

type AdditionalParameters struct {
	// Countdown expects int pointer which specifies after how many seconds
	// the task supposed to be delivered
	CountdownSeconds *int `json:"countdown_second"`
	// Eta expects time string to be in ISO8601 format
	Eta *string `json:"eta"`
	// TaskId is an unique identifier, generally UUID
	TaskId *string `json:"task_id"`
	// ExpiresSeconds task expiry in seconds
	ExpiresSeconds      *int          `json:"expires_seconds"`
	Method              *string       `json:"method"`
	GroupId             *string       `json:"group_id"`
	Group               *string       `json:"group"`
	GroupIndex          *int          `json:"group_index"`
	Retries             int           `json:"retries"`
	Chord               interface{}   `json:"chord"`
	ReplyTo             string        `json:"reply_to"`
	TimeLimit           []interface{} `json:"time_limit"`
	SoftTimeLimit       interface{}   `json:"soft_time_limit"`
	RootId              *string       `json:"root_id"`
	ParentId            *string       `json:"parent_id"`
	RouteName           *string       `json:"route_name"`
	Shadow              *string       `json:"shadow"`
	Chain               []interface{} `json:"chain"`
	Callbacks           []interface{} `json:"callbacks"`
	Errbacks            []interface{} `json:"errbacks"`
	TaskType            *string       `json:"task_type"`
	ReplacedTaskNesting int           `json:"replaced_task_nesting"`
	RoutingKey          *string       `json:"routing_key"`
	Exchange            string        `json:"exchange"`
	Mandatory           bool          `json:"mandatory"`
	Immediate           bool          `json:"immediate"`
	ContentType         *string       `json:"content_type"`
	ContentEncoding     *string       `json:"content_encoding"`
	Language            *string       `json:"language"`
	// CorrelationId - Added as an additional field to trace the flow of task. By default, celery-go extracts from context value (CorrelationIdKey). Capitalized to prevent clash with correlation_id of AMQP
	CorrelationId *string `json:"CORRELATION_ID"`
}

func (*AdditionalParameters) GetConsumerLanguage

func (i *AdditionalParameters) GetConsumerLanguage() string

func (*AdditionalParameters) GetContentEncoding

func (i *AdditionalParameters) GetContentEncoding() string

func (*AdditionalParameters) GetContentType

func (i *AdditionalParameters) GetContentType() string

type Body

type Body struct {
	Args   []interface{}          `json:"args"`
	Kwargs map[string]interface{} `json:"kwargs"`
	Embed  Embed                  `json:"embed"`
}

type CeleryAMQPClient

type CeleryAMQPClient struct {
	// contains filtered or unexported fields
}

func (*CeleryAMQPClient) Channel

func (c *CeleryAMQPClient) Channel() (*amqp.Channel, error)

Channel - returns the ampqChannel if already exists and is not closed otherwise returns error

func (*CeleryAMQPClient) Close

func (c *CeleryAMQPClient) Close() error

func (*CeleryAMQPClient) Connection

func (c *CeleryAMQPClient) Connection() interface{}

Connection - returns connection

func (*CeleryAMQPClient) RegisterRoutes

func (c *CeleryAMQPClient) RegisterRoutes(routes map[string]string)

func (*CeleryAMQPClient) SendCeleryTask

func (c *CeleryAMQPClient) SendCeleryTask(ctx context.Context, task string, args []interface{}, kwArgs map[string]any, additionalParams *AdditionalParameters) (*TaskInfo, error)

type CeleryClient

type CeleryClient interface {
	Close() error
	RegisterRoutes(routes map[string]string)
	SendCeleryTask(ctx context.Context, task string, args []interface{}, kwArgs map[string]any, additionalParams *AdditionalParameters) (*TaskInfo, error)
	Connection() interface{}
}

func NewCeleryClient

func NewCeleryClient(dsn string, broker string, options *Options) (CeleryClient, error)

NewCeleryClient - creates new celery publisher for given broker connects to broker and attaches connection to celeryClient object

type Embed

type Embed struct {
	Callbacks []interface{} `json:"callbacks"`
	Errbacks  []interface{} `json:"errbacks"`
	Chain     []interface{} `json:"chain"`
	Chord     interface{}   `json:"chord"`
}

type Headers

type Headers struct {
	Lang                string        `json:"lang"`
	Task                string        `json:"task"`
	ID                  string        `json:"id"`
	RootID              string        `json:"root_id"`
	ParentID            *string       `json:"parent_id"`
	Group               *string       `json:"group"`
	Method              *string       `json:"meth,"`
	Shadow              *string       `json:"shadow,"`
	ETA                 *string       `json:"eta,"`
	Expires             *string       `json:"expires,"`
	Retries             *int          `json:"retries,"`
	TimeLimit           []interface{} `json:"timelimit,"`
	ArgsRepr            string        `json:"argsrepr,"`
	KWArgsRepr          string        `json:"kwargsrepr,"`
	Origin              string        `json:"origin,"`
	ReplacedTaskNesting *int          `json:"replaced_task_nesting,"`
	// CorrelationId - Added as an additional field to trace the flow of request. By default, celery-go extracts from context value (CorrelationIdKey). Capitalized to prevent clash with correlation_id of AMQP
	CorrelationId string `json:"CORRELATION_ID"`
}

type LogLevel

type LogLevel logrus.Level

type Message

type Message struct {
	Headers   Headers
	BodyTuple []interface{}
}

type Options

type Options struct {
	// MaxBackOffDuration if set connection / channel retry count
	// will be reset to 0 once current retry duration breaches max backoff duration
	MaxBackOffDuration time.Duration
	LogLevel           LogLevel
}

func NewDefaultOptions

func NewDefaultOptions() *Options

type TaskInfo

type TaskInfo struct {
	Id      string  `json:"id"`
	Message Message `json:"message"`
}

Directories

Path Synopsis
example
publisher command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL