Documentation
¶
Index ¶
- type AMQPBroker
- type AMQPBrokerConfig
- type Broker
- type CeleryDeliveryInfo
- type CeleryMessage
- type CeleryProperties
- type Client
- func (c *Client) Close() error
- func (c *Client) SendTask(ctx context.Context, taskName string, options *TaskOptions) (string, error)
- func (c *Client) SendTaskToQueue(ctx context.Context, taskName, queue string, args []interface{}, ...) (string, error)
- func (c *Client) SendTaskWithArgs(ctx context.Context, taskName string, args ...interface{}) (string, error)
- func (c *Client) SendTaskWithKwargs(ctx context.Context, taskName string, kwargs map[string]interface{}) (string, error)
- type ClientConfig
- type RedisBroker
- type RedisBrokerConfig
- type TaskMessage
- type TaskOptions
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AMQPBroker ¶
type AMQPBroker struct {
// contains filtered or unexported fields
}
AMQPBroker implements the Broker interface for RabbitMQ/AMQP.
func NewAMQPBroker ¶
func NewAMQPBroker(config AMQPBrokerConfig) (*AMQPBroker, error)
NewAMQPBroker creates a new AMQP broker instance
func (*AMQPBroker) SendTask ¶
func (ab *AMQPBroker) SendTask(ctx context.Context, message *CeleryMessage) error
SendTask sends a task message to RabbitMQ
type AMQPBrokerConfig ¶
type AMQPBrokerConfig struct {
URL string // AMQP connection URL (e.g., "amqp://guest:guest@localhost:5672/")
Exchange string // Exchange name (default: "celery")
Queue string // Queue name (default: "celery")
}
AMQPBrokerConfig contains configuration for AMQP broker
type Broker ¶
type Broker interface {
// SendTask sends a task message to the broker
SendTask(ctx context.Context, message *CeleryMessage) error
// Close closes the broker connection
Close() error
}
Broker defines the interface for message brokers
type CeleryDeliveryInfo ¶
type CeleryDeliveryInfo struct {
Priority int `json:"priority"`
RoutingKey string `json:"routing_key"`
Exchange string `json:"exchange"`
}
CeleryDeliveryInfo contains routing information
type CeleryMessage ¶
type CeleryMessage struct {
Body json.RawMessage `json:"body"`
Headers map[string]interface{} `json:"headers,omitempty"`
ContentType string `json:"content-type"`
Properties CeleryProperties `json:"properties"`
ContentEncoding string `json:"content-encoding"`
}
CeleryMessage is the outer message envelope sent to the broker
func NewCeleryMessage ¶
func NewCeleryMessage(encodedBody []byte, queue, exchange string) *CeleryMessage
NewCeleryMessage creates a new Celery message envelope
func NewCeleryMessageWithEncoding ¶
func NewCeleryMessageWithEncoding(body []byte, queue, exchange, contentType, bodyEncoding, contentEncoding string) *CeleryMessage
NewCeleryMessageWithEncoding creates a new Celery message envelope with custom encoding
func (*CeleryMessage) Encode ¶
func (cm *CeleryMessage) Encode() ([]byte, error)
Encode serializes the Celery message to JSON
type CeleryProperties ¶
type CeleryProperties struct {
BodyEncoding string `json:"body_encoding"`
CorrelationID string `json:"correlation_id"`
ReplyTo string `json:"reply_to"`
DeliveryInfo CeleryDeliveryInfo `json:"delivery_info"`
DeliveryMode int `json:"delivery_mode"`
DeliveryTag string `json:"delivery_tag"`
}
CeleryProperties contains message properties
type Client ¶
type Client struct {
// UseRawJSONBody indicates whether to use a raw JSON body instead of a base64-encoded body.
// This is typically used for AMQP brokers where the worker is configured to accept raw JSON.
UseRawJSONBody bool
// contains filtered or unexported fields
}
Client is the main Celery client
func (*Client) SendTask ¶
func (c *Client) SendTask(ctx context.Context, taskName string, options *TaskOptions) (string, error)
SendTask sends a task to the Celery worker
func (*Client) SendTaskToQueue ¶
func (c *Client) SendTaskToQueue(ctx context.Context, taskName, queue string, args []interface{}, kwargs map[string]interface{}) (string, error)
SendTaskToQueue sends a task to a specific queue
type ClientConfig ¶
type ClientConfig struct {
Broker Broker // The broker implementation to use
Queue string // Default queue name (default: "celery")
Exchange string // Default exchange name (default: "celery")
// UseRawJSONBody indicates whether to use a raw JSON body instead of a base64-encoded body
UseRawJSONBody bool
}
ClientConfig contains configuration for Celery client
type RedisBroker ¶
type RedisBroker struct {
// contains filtered or unexported fields
}
RedisBroker implements the Broker interface for Redis.
func NewRedisBroker ¶
func NewRedisBroker(config RedisBrokerConfig) (*RedisBroker, error)
NewRedisBroker creates a new Redis broker instance
func (*RedisBroker) SendTask ¶
func (rb *RedisBroker) SendTask(ctx context.Context, message *CeleryMessage) error
SendTask sends a task message to Redis
type RedisBrokerConfig ¶
type RedisBrokerConfig struct {
Addr string // Redis server address (e.g., "localhost:6379")
Password string // Redis password (empty if no password)
DB int // Redis database number
Queue string // Default queue name (default: "celery")
}
RedisBrokerConfig contains configuration for Redis broker
type TaskMessage ¶
type TaskMessage struct {
ID string `json:"id"`
Task string `json:"task"`
Args []interface{} `json:"args,omitempty"`
Kwargs map[string]interface{} `json:"kwargs,omitempty"`
Retries int `json:"retries,omitempty"`
ETA *string `json:"eta,omitempty"`
Expires *time.Time `json:"expires,omitempty"`
}
TaskMessage represents the Celery task message (protocol v1). This is the inner message that gets base64-encoded in the body.
func NewTaskMessage ¶
func NewTaskMessage(taskName string, args []interface{}, kwargs map[string]interface{}) *TaskMessage
NewTaskMessage creates a new task message with default values
func (*TaskMessage) Encode ¶
func (tm *TaskMessage) Encode() (string, error)
Encode serializes the task message to base64-encoded JSON
func (*TaskMessage) EncodeJSON ¶
func (tm *TaskMessage) EncodeJSON() (string, error)
EncodeJSON serializes the task message to raw JSON string
func (*TaskMessage) SetETA ¶
func (tm *TaskMessage) SetETA(eta time.Time)
SetETA sets the estimated time of arrival for the task
func (*TaskMessage) SetExpires ¶
func (tm *TaskMessage) SetExpires(expires time.Time)
SetExpires sets the expiration time for the task
type TaskOptions ¶
type TaskOptions struct {
Queue string // Override default queue
Exchange string // Override default exchange
ETA *time.Time // Estimated time of arrival
Expires *time.Time // Task expiration time
Args []interface{} // Positional arguments
Kwargs map[string]interface{} // Keyword arguments
}
TaskOptions contains options for task execution