queue

package
v0.0.10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 28, 2025 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNotConnected       = errors.New("queue: not connected")
	ErrAlreadyConnected   = errors.New("queue: already connected")
	ErrConnectionFailed   = errors.New("queue: connection failed")
	ErrQueueNotFound      = errors.New("queue: queue not found")
	ErrQueueAlreadyExists = errors.New("queue: queue already exists")
	ErrMessageNotFound    = errors.New("queue: message not found")
	ErrInvalidMessage     = errors.New("queue: invalid message")
	ErrConsumerNotFound   = errors.New("queue: consumer not found")
	ErrPublishFailed      = errors.New("queue: publish failed")
	ErrConsumeFailed      = errors.New("queue: consume failed")
	ErrAckFailed          = errors.New("queue: acknowledgment failed")
	ErrNackFailed         = errors.New("queue: negative acknowledgment failed")
	ErrTimeout            = errors.New("queue: operation timeout")
	ErrInvalidConfig      = errors.New("queue: invalid configuration")
	ErrUnsupportedDriver  = errors.New("queue: unsupported driver")
	ErrMessageTooLarge    = errors.New("queue: message too large")
	ErrQueueFull          = errors.New("queue: queue is full")
)

Common queue errors

Functions

func NewExtension

func NewExtension(opts ...ConfigOption) forge.Extension

NewExtension creates a new queue extension with functional options

func NewExtensionWithConfig

func NewExtensionWithConfig(config Config) forge.Extension

NewExtensionWithConfig creates a new queue extension with a complete config

Types

type Config

type Config struct {
	// Driver specifies the queue backend: "inmemory", "redis", "rabbitmq", "nats"
	Driver string `json:"driver" yaml:"driver" mapstructure:"driver"`

	// Connection settings
	URL      string   `json:"url,omitempty" yaml:"url,omitempty" mapstructure:"url"`
	Hosts    []string `json:"hosts,omitempty" yaml:"hosts,omitempty" mapstructure:"hosts"`
	Username string   `json:"username,omitempty" yaml:"username,omitempty" mapstructure:"username"`
	Password string   `json:"password,omitempty" yaml:"password,omitempty" mapstructure:"password"`
	VHost    string   `json:"vhost,omitempty" yaml:"vhost,omitempty" mapstructure:"vhost"` // RabbitMQ only

	// Connection pool
	MaxConnections     int           `json:"max_connections" yaml:"max_connections" mapstructure:"max_connections"`
	MaxIdleConnections int           `json:"max_idle_connections" yaml:"max_idle_connections" mapstructure:"max_idle_connections"`
	ConnectTimeout     time.Duration `json:"connect_timeout" yaml:"connect_timeout" mapstructure:"connect_timeout"`
	ReadTimeout        time.Duration `json:"read_timeout" yaml:"read_timeout" mapstructure:"read_timeout"`
	WriteTimeout       time.Duration `json:"write_timeout" yaml:"write_timeout" mapstructure:"write_timeout"`
	KeepAlive          time.Duration `json:"keep_alive" yaml:"keep_alive" mapstructure:"keep_alive"`

	// Retry policy
	MaxRetries      int           `json:"max_retries" yaml:"max_retries" mapstructure:"max_retries"`
	RetryBackoff    time.Duration `json:"retry_backoff" yaml:"retry_backoff" mapstructure:"retry_backoff"`
	RetryMultiplier float64       `json:"retry_multiplier" yaml:"retry_multiplier" mapstructure:"retry_multiplier"`
	MaxRetryBackoff time.Duration `json:"max_retry_backoff" yaml:"max_retry_backoff" mapstructure:"max_retry_backoff"`

	// Default queue settings
	DefaultPrefetch    int           `json:"default_prefetch" yaml:"default_prefetch" mapstructure:"default_prefetch"`
	DefaultConcurrency int           `json:"default_concurrency" yaml:"default_concurrency" mapstructure:"default_concurrency"`
	DefaultTimeout     time.Duration `json:"default_timeout" yaml:"default_timeout" mapstructure:"default_timeout"`
	EnableDeadLetter   bool          `json:"enable_dead_letter" yaml:"enable_dead_letter" mapstructure:"enable_dead_letter"`
	DeadLetterSuffix   string        `json:"dead_letter_suffix" yaml:"dead_letter_suffix" mapstructure:"dead_letter_suffix"`

	// Performance
	EnablePersistence bool  `json:"enable_persistence" yaml:"enable_persistence" mapstructure:"enable_persistence"`
	EnablePriority    bool  `json:"enable_priority" yaml:"enable_priority" mapstructure:"enable_priority"`
	EnableDelayed     bool  `json:"enable_delayed" yaml:"enable_delayed" mapstructure:"enable_delayed"`
	MaxMessageSize    int64 `json:"max_message_size" yaml:"max_message_size" mapstructure:"max_message_size"`

	// Security
	EnableTLS          bool   `json:"enable_tls" yaml:"enable_tls" mapstructure:"enable_tls"`
	TLSCertFile        string `json:"tls_cert_file,omitempty" yaml:"tls_cert_file,omitempty" mapstructure:"tls_cert_file"`
	TLSKeyFile         string `json:"tls_key_file,omitempty" yaml:"tls_key_file,omitempty" mapstructure:"tls_key_file"`
	TLSCAFile          string `json:"tls_ca_file,omitempty" yaml:"tls_ca_file,omitempty" mapstructure:"tls_ca_file"`
	InsecureSkipVerify bool   `json:"insecure_skip_verify" yaml:"insecure_skip_verify" mapstructure:"insecure_skip_verify"`

	// Monitoring
	EnableMetrics bool `json:"enable_metrics" yaml:"enable_metrics" mapstructure:"enable_metrics"`
	EnableTracing bool `json:"enable_tracing" yaml:"enable_tracing" mapstructure:"enable_tracing"`

	// Config loading flags (not serialized)
	RequireConfig bool `json:"-" yaml:"-" mapstructure:"-"`
}

Config contains configuration for the queue extension

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns default queue configuration

func (*Config) Validate

func (c *Config) Validate() error

Validate validates the configuration

type ConfigOption

type ConfigOption func(*Config)

ConfigOption is a functional option for Config

func WithAuth

func WithAuth(username, password string) ConfigOption

WithAuth sets authentication credentials

func WithConcurrency

func WithConcurrency(concurrency int) ConfigOption

WithConcurrency sets default concurrency

func WithConfig

func WithConfig(config Config) ConfigOption

WithConfig sets the complete config

func WithDeadLetter

func WithDeadLetter(enable bool) ConfigOption

WithDeadLetter enables/disables dead letter queue

func WithDelayed

func WithDelayed(enable bool) ConfigOption

WithDelayed enables/disables delayed messages

func WithDriver

func WithDriver(driver string) ConfigOption

WithDriver sets the driver

func WithHosts

func WithHosts(hosts ...string) ConfigOption

WithHosts sets the hosts

func WithMaxConnections

func WithMaxConnections(max int) ConfigOption

WithMaxConnections sets max connections

func WithMetrics

func WithMetrics(enable bool) ConfigOption

WithMetrics enables metrics

func WithPersistence

func WithPersistence(enable bool) ConfigOption

WithPersistence enables/disables message persistence

func WithPrefetch

func WithPrefetch(prefetch int) ConfigOption

WithPrefetch sets default prefetch count

func WithPriority

func WithPriority(enable bool) ConfigOption

WithPriority enables/disables priority queues

func WithRequireConfig

func WithRequireConfig(require bool) ConfigOption

WithRequireConfig requires config from ConfigManager

func WithTLS

func WithTLS(certFile, keyFile, caFile string) ConfigOption

WithTLS enables TLS

func WithTimeout

func WithTimeout(timeout time.Duration) ConfigOption

WithTimeout sets default timeout

func WithTracing

func WithTracing(enable bool) ConfigOption

WithTracing enables tracing

func WithURL

func WithURL(url string) ConfigOption

WithURL sets the URL

func WithVHost

func WithVHost(vhost string) ConfigOption

WithVHost sets the virtual host (RabbitMQ only)

type ConsumeOptions

type ConsumeOptions struct {
	ConsumerTag   string                 `json:"consumer_tag,omitempty"`
	AutoAck       bool                   `json:"auto_ack"`           // Auto-acknowledge messages
	Exclusive     bool                   `json:"exclusive"`          // Exclusive consumer
	PrefetchCount int                    `json:"prefetch_count"`     // Number of messages to prefetch
	Priority      int                    `json:"priority,omitempty"` // Consumer priority
	Concurrency   int                    `json:"concurrency"`        // Number of concurrent workers
	RetryStrategy RetryStrategy          `json:"retry_strategy"`     // Retry configuration
	Timeout       time.Duration          `json:"timeout,omitempty"`  // Message processing timeout
	Arguments     map[string]interface{} `json:"arguments,omitempty"`
}

ConsumeOptions contains consumer configuration

func DefaultConsumeOptions

func DefaultConsumeOptions() ConsumeOptions

DefaultConsumeOptions returns default consume options

type Extension

type Extension struct {
	*forge.BaseExtension
	// contains filtered or unexported fields
}

Extension implements forge.Extension for queue functionality

func (*Extension) Health

func (e *Extension) Health(ctx context.Context) error

Health checks if the queue is healthy

func (*Extension) Queue

func (e *Extension) Queue() Queue

Queue returns the queue instance

func (*Extension) Register

func (e *Extension) Register(app forge.App) error

Register registers the queue extension with the app

func (*Extension) Start

func (e *Extension) Start(ctx context.Context) error

Start starts the queue extension

func (*Extension) Stop

func (e *Extension) Stop(ctx context.Context) error

Stop stops the queue extension

type InMemoryQueue

type InMemoryQueue struct {
	// contains filtered or unexported fields
}

InMemoryQueue implements Queue interface with an in-memory store

func NewInMemoryQueue

func NewInMemoryQueue(config Config, logger forge.Logger, metrics forge.Metrics) *InMemoryQueue

NewInMemoryQueue creates a new in-memory queue instance

func (*InMemoryQueue) Ack

func (q *InMemoryQueue) Ack(ctx context.Context, messageID string) error

func (*InMemoryQueue) Connect

func (q *InMemoryQueue) Connect(ctx context.Context) error

func (*InMemoryQueue) Consume

func (q *InMemoryQueue) Consume(ctx context.Context, queueName string, handler MessageHandler, opts ConsumeOptions) error

func (*InMemoryQueue) DeclareQueue

func (q *InMemoryQueue) DeclareQueue(ctx context.Context, name string, opts QueueOptions) error

func (*InMemoryQueue) DeleteQueue

func (q *InMemoryQueue) DeleteQueue(ctx context.Context, name string) error

func (*InMemoryQueue) Disconnect

func (q *InMemoryQueue) Disconnect(ctx context.Context) error

func (*InMemoryQueue) GetDeadLetterQueue

func (q *InMemoryQueue) GetDeadLetterQueue(ctx context.Context, queueName string) ([]Message, error)

func (*InMemoryQueue) GetQueueInfo

func (q *InMemoryQueue) GetQueueInfo(ctx context.Context, name string) (*QueueInfo, error)

func (*InMemoryQueue) ListQueues

func (q *InMemoryQueue) ListQueues(ctx context.Context) ([]string, error)

func (*InMemoryQueue) Nack

func (q *InMemoryQueue) Nack(ctx context.Context, messageID string, requeue bool) error

func (*InMemoryQueue) Ping

func (q *InMemoryQueue) Ping(ctx context.Context) error

func (*InMemoryQueue) Publish

func (q *InMemoryQueue) Publish(ctx context.Context, queueName string, message Message) error

func (*InMemoryQueue) PublishBatch

func (q *InMemoryQueue) PublishBatch(ctx context.Context, queueName string, messages []Message) error

func (*InMemoryQueue) PublishDelayed

func (q *InMemoryQueue) PublishDelayed(ctx context.Context, queueName string, message Message, delay time.Duration) error

func (*InMemoryQueue) PurgeQueue

func (q *InMemoryQueue) PurgeQueue(ctx context.Context, name string) error

func (*InMemoryQueue) Reject

func (q *InMemoryQueue) Reject(ctx context.Context, messageID string) error

func (*InMemoryQueue) RequeueDeadLetter

func (q *InMemoryQueue) RequeueDeadLetter(ctx context.Context, queueName string, messageID string) error

func (*InMemoryQueue) Stats

func (q *InMemoryQueue) Stats(ctx context.Context) (*QueueStats, error)

func (*InMemoryQueue) StopConsuming

func (q *InMemoryQueue) StopConsuming(ctx context.Context, queueName string) error

type Message

type Message struct {
	ID          string                 `json:"id"`
	Queue       string                 `json:"queue"`
	Body        []byte                 `json:"body"`
	Headers     map[string]string      `json:"headers,omitempty"`
	Priority    int                    `json:"priority,omitempty"`    // 0-9, higher = more priority
	Delay       time.Duration          `json:"delay,omitempty"`       // Delay before processing
	Expiration  time.Duration          `json:"expiration,omitempty"`  // Message TTL
	Retries     int                    `json:"retries,omitempty"`     // Current retry count
	MaxRetries  int                    `json:"max_retries,omitempty"` // Maximum retry attempts
	PublishedAt time.Time              `json:"published_at"`
	Metadata    map[string]interface{} `json:"metadata,omitempty"`
}

Message represents a queue message

type MessageHandler

type MessageHandler func(ctx context.Context, msg Message) error

MessageHandler is called for each received message

type NATSQueue

type NATSQueue struct {
	// contains filtered or unexported fields
}

NATSQueue implements Queue interface using NATS JetStream

func NewNATSQueue

func NewNATSQueue(config Config, logger forge.Logger, metrics forge.Metrics) (*NATSQueue, error)

NewNATSQueue creates a new NATS-backed queue instance

func (*NATSQueue) Ack

func (q *NATSQueue) Ack(ctx context.Context, messageID string) error

func (*NATSQueue) Connect

func (q *NATSQueue) Connect(ctx context.Context) error

func (*NATSQueue) Consume

func (q *NATSQueue) Consume(ctx context.Context, queueName string, handler MessageHandler, opts ConsumeOptions) error

func (*NATSQueue) DeclareQueue

func (q *NATSQueue) DeclareQueue(ctx context.Context, name string, opts QueueOptions) error

func (*NATSQueue) DeleteQueue

func (q *NATSQueue) DeleteQueue(ctx context.Context, name string) error

func (*NATSQueue) Disconnect

func (q *NATSQueue) Disconnect(ctx context.Context) error

func (*NATSQueue) GetDeadLetterQueue

func (q *NATSQueue) GetDeadLetterQueue(ctx context.Context, queueName string) ([]Message, error)

func (*NATSQueue) GetQueueInfo

func (q *NATSQueue) GetQueueInfo(ctx context.Context, name string) (*QueueInfo, error)

func (*NATSQueue) ListQueues

func (q *NATSQueue) ListQueues(ctx context.Context) ([]string, error)

func (*NATSQueue) Nack

func (q *NATSQueue) Nack(ctx context.Context, messageID string, requeue bool) error

func (*NATSQueue) Ping

func (q *NATSQueue) Ping(ctx context.Context) error

func (*NATSQueue) Publish

func (q *NATSQueue) Publish(ctx context.Context, queueName string, message Message) error

func (*NATSQueue) PublishBatch

func (q *NATSQueue) PublishBatch(ctx context.Context, queueName string, messages []Message) error

func (*NATSQueue) PublishDelayed

func (q *NATSQueue) PublishDelayed(ctx context.Context, queueName string, message Message, delay time.Duration) error

func (*NATSQueue) PurgeQueue

func (q *NATSQueue) PurgeQueue(ctx context.Context, name string) error

func (*NATSQueue) Reject

func (q *NATSQueue) Reject(ctx context.Context, messageID string) error

func (*NATSQueue) RequeueDeadLetter

func (q *NATSQueue) RequeueDeadLetter(ctx context.Context, queueName string, messageID string) error

func (*NATSQueue) Stats

func (q *NATSQueue) Stats(ctx context.Context) (*QueueStats, error)

func (*NATSQueue) StopConsuming

func (q *NATSQueue) StopConsuming(ctx context.Context, queueName string) error

type Queue

type Queue interface {
	// Connection management
	Connect(ctx context.Context) error
	Disconnect(ctx context.Context) error
	Ping(ctx context.Context) error

	// Queue management
	DeclareQueue(ctx context.Context, name string, opts QueueOptions) error
	DeleteQueue(ctx context.Context, name string) error
	ListQueues(ctx context.Context) ([]string, error)
	GetQueueInfo(ctx context.Context, name string) (*QueueInfo, error)
	PurgeQueue(ctx context.Context, name string) error

	// Publishing
	Publish(ctx context.Context, queue string, message Message) error
	PublishBatch(ctx context.Context, queue string, messages []Message) error
	PublishDelayed(ctx context.Context, queue string, message Message, delay time.Duration) error

	// Consuming
	Consume(ctx context.Context, queue string, handler MessageHandler, opts ConsumeOptions) error
	StopConsuming(ctx context.Context, queue string) error

	// Message operations
	Ack(ctx context.Context, messageID string) error
	Nack(ctx context.Context, messageID string, requeue bool) error
	Reject(ctx context.Context, messageID string) error

	// Dead letter queue
	GetDeadLetterQueue(ctx context.Context, queue string) ([]Message, error)
	RequeueDeadLetter(ctx context.Context, queue string, messageID string) error

	// Stats
	Stats(ctx context.Context) (*QueueStats, error)
}

Queue represents a unified message queue interface supporting multiple backends

type QueueInfo

type QueueInfo struct {
	Name          string    `json:"name"`
	Messages      int64     `json:"messages"`
	Consumers     int       `json:"consumers"`
	MessageBytes  int64     `json:"message_bytes"`
	PublishRate   float64   `json:"publish_rate"`
	DeliverRate   float64   `json:"deliver_rate"`
	AckRate       float64   `json:"ack_rate"`
	Durable       bool      `json:"durable"`
	AutoDelete    bool      `json:"auto_delete"`
	CreatedAt     time.Time `json:"created_at"`
	LastMessageAt time.Time `json:"last_message_at,omitempty"`
}

QueueInfo contains queue metadata

type QueueOptions

type QueueOptions struct {
	Durable         bool                   `json:"durable"`           // Survives broker restart
	AutoDelete      bool                   `json:"auto_delete"`       // Deleted when no consumers
	Exclusive       bool                   `json:"exclusive"`         // Used by only one connection
	MaxLength       int64                  `json:"max_length"`        // Maximum number of messages
	MaxLengthBytes  int64                  `json:"max_length_bytes"`  // Maximum total message bytes
	MessageTTL      time.Duration          `json:"message_ttl"`       // Message time-to-live
	MaxPriority     int                    `json:"max_priority"`      // Maximum priority (0-255)
	DeadLetterQueue string                 `json:"dead_letter_queue"` // DLQ for failed messages
	Arguments       map[string]interface{} `json:"arguments,omitempty"`
}

QueueOptions contains queue configuration

func DefaultQueueOptions

func DefaultQueueOptions() QueueOptions

DefaultQueueOptions returns default queue options

type QueueStats

type QueueStats struct {
	QueueCount      int64                  `json:"queue_count"`
	TotalMessages   int64                  `json:"total_messages"`
	TotalConsumers  int                    `json:"total_consumers"`
	PublishRate     float64                `json:"publish_rate"`
	DeliverRate     float64                `json:"deliver_rate"`
	AckRate         float64                `json:"ack_rate"`
	UnackedMessages int64                  `json:"unacked_messages"`
	ReadyMessages   int64                  `json:"ready_messages"`
	Uptime          time.Duration          `json:"uptime"`
	MemoryUsed      int64                  `json:"memory_used"`
	ConnectionCount int                    `json:"connection_count"`
	Version         string                 `json:"version"`
	Extra           map[string]interface{} `json:"extra,omitempty"`
}

QueueStats contains queue system statistics

type RabbitMQQueue

type RabbitMQQueue struct {
	// contains filtered or unexported fields
}

RabbitMQQueue implements Queue interface using RabbitMQ

func NewRabbitMQQueue

func NewRabbitMQQueue(config Config, logger forge.Logger, metrics forge.Metrics) (*RabbitMQQueue, error)

NewRabbitMQQueue creates a new RabbitMQ-backed queue instance

func (*RabbitMQQueue) Ack

func (q *RabbitMQQueue) Ack(ctx context.Context, messageID string) error

func (*RabbitMQQueue) Connect

func (q *RabbitMQQueue) Connect(ctx context.Context) error

func (*RabbitMQQueue) Consume

func (q *RabbitMQQueue) Consume(ctx context.Context, queueName string, handler MessageHandler, opts ConsumeOptions) error

func (*RabbitMQQueue) DeclareQueue

func (q *RabbitMQQueue) DeclareQueue(ctx context.Context, name string, opts QueueOptions) error

func (*RabbitMQQueue) DeleteQueue

func (q *RabbitMQQueue) DeleteQueue(ctx context.Context, name string) error

func (*RabbitMQQueue) Disconnect

func (q *RabbitMQQueue) Disconnect(ctx context.Context) error

func (*RabbitMQQueue) GetDeadLetterQueue

func (q *RabbitMQQueue) GetDeadLetterQueue(ctx context.Context, queueName string) ([]Message, error)

func (*RabbitMQQueue) GetQueueInfo

func (q *RabbitMQQueue) GetQueueInfo(ctx context.Context, name string) (*QueueInfo, error)

func (*RabbitMQQueue) ListQueues

func (q *RabbitMQQueue) ListQueues(ctx context.Context) ([]string, error)

func (*RabbitMQQueue) Nack

func (q *RabbitMQQueue) Nack(ctx context.Context, messageID string, requeue bool) error

func (*RabbitMQQueue) Ping

func (q *RabbitMQQueue) Ping(ctx context.Context) error

func (*RabbitMQQueue) Publish

func (q *RabbitMQQueue) Publish(ctx context.Context, queueName string, message Message) error

func (*RabbitMQQueue) PublishBatch

func (q *RabbitMQQueue) PublishBatch(ctx context.Context, queueName string, messages []Message) error

func (*RabbitMQQueue) PublishDelayed

func (q *RabbitMQQueue) PublishDelayed(ctx context.Context, queueName string, message Message, delay time.Duration) error

func (*RabbitMQQueue) PurgeQueue

func (q *RabbitMQQueue) PurgeQueue(ctx context.Context, name string) error

func (*RabbitMQQueue) Reject

func (q *RabbitMQQueue) Reject(ctx context.Context, messageID string) error

func (*RabbitMQQueue) RequeueDeadLetter

func (q *RabbitMQQueue) RequeueDeadLetter(ctx context.Context, queueName string, messageID string) error

func (*RabbitMQQueue) Stats

func (q *RabbitMQQueue) Stats(ctx context.Context) (*QueueStats, error)

func (*RabbitMQQueue) StopConsuming

func (q *RabbitMQQueue) StopConsuming(ctx context.Context, queueName string) error

type RedisQueue

type RedisQueue struct {
	// contains filtered or unexported fields
}

RedisQueue implements Queue interface using Redis Streams

func NewRedisQueue

func NewRedisQueue(config Config, logger forge.Logger, metrics forge.Metrics) (*RedisQueue, error)

NewRedisQueue creates a new Redis-backed queue instance

func (*RedisQueue) Ack

func (q *RedisQueue) Ack(ctx context.Context, messageID string) error

func (*RedisQueue) Connect

func (q *RedisQueue) Connect(ctx context.Context) error

func (*RedisQueue) Consume

func (q *RedisQueue) Consume(ctx context.Context, queueName string, handler MessageHandler, opts ConsumeOptions) error

func (*RedisQueue) DeclareQueue

func (q *RedisQueue) DeclareQueue(ctx context.Context, name string, opts QueueOptions) error

func (*RedisQueue) DeleteQueue

func (q *RedisQueue) DeleteQueue(ctx context.Context, name string) error

func (*RedisQueue) Disconnect

func (q *RedisQueue) Disconnect(ctx context.Context) error

func (*RedisQueue) GetDeadLetterQueue

func (q *RedisQueue) GetDeadLetterQueue(ctx context.Context, queueName string) ([]Message, error)

func (*RedisQueue) GetQueueInfo

func (q *RedisQueue) GetQueueInfo(ctx context.Context, name string) (*QueueInfo, error)

func (*RedisQueue) ListQueues

func (q *RedisQueue) ListQueues(ctx context.Context) ([]string, error)

func (*RedisQueue) Nack

func (q *RedisQueue) Nack(ctx context.Context, messageID string, requeue bool) error

func (*RedisQueue) Ping

func (q *RedisQueue) Ping(ctx context.Context) error

func (*RedisQueue) Publish

func (q *RedisQueue) Publish(ctx context.Context, queueName string, message Message) error

func (*RedisQueue) PublishBatch

func (q *RedisQueue) PublishBatch(ctx context.Context, queueName string, messages []Message) error

func (*RedisQueue) PublishDelayed

func (q *RedisQueue) PublishDelayed(ctx context.Context, queueName string, message Message, delay time.Duration) error

func (*RedisQueue) PurgeQueue

func (q *RedisQueue) PurgeQueue(ctx context.Context, name string) error

func (*RedisQueue) Reject

func (q *RedisQueue) Reject(ctx context.Context, messageID string) error

func (*RedisQueue) RequeueDeadLetter

func (q *RedisQueue) RequeueDeadLetter(ctx context.Context, queueName string, messageID string) error

func (*RedisQueue) Stats

func (q *RedisQueue) Stats(ctx context.Context) (*QueueStats, error)

func (*RedisQueue) StopConsuming

func (q *RedisQueue) StopConsuming(ctx context.Context, queueName string) error

type RetryStrategy

type RetryStrategy struct {
	MaxRetries      int           `json:"max_retries"`
	InitialInterval time.Duration `json:"initial_interval"`
	MaxInterval     time.Duration `json:"max_interval"`
	Multiplier      float64       `json:"multiplier"` // Exponential backoff multiplier
}

RetryStrategy defines retry behavior

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL