Documentation
¶
Index ¶
- Variables
- func NewExtension(opts ...ConfigOption) forge.Extension
- func NewExtensionWithConfig(config Config) forge.Extension
- type Config
- type ConfigOption
- func WithAuth(username, password string) ConfigOption
- func WithConcurrency(concurrency int) ConfigOption
- func WithConfig(config Config) ConfigOption
- func WithDatabaseRedisConnection(name string) ConfigOption
- func WithDeadLetter(enable bool) ConfigOption
- func WithDelayed(enable bool) ConfigOption
- func WithDriver(driver string) ConfigOption
- func WithHosts(hosts ...string) ConfigOption
- func WithMaxConnections(max int) ConfigOption
- func WithMetrics(enable bool) ConfigOption
- func WithPersistence(enable bool) ConfigOption
- func WithPrefetch(prefetch int) ConfigOption
- func WithPriority(enable bool) ConfigOption
- func WithRequireConfig(require bool) ConfigOption
- func WithTLS(certFile, keyFile, caFile string) ConfigOption
- func WithTimeout(timeout time.Duration) ConfigOption
- func WithTracing(enable bool) ConfigOption
- func WithURL(url string) ConfigOption
- func WithVHost(vhost string) ConfigOption
- type ConsumeOptions
- type Extension
- type InMemoryQueue
- func (q *InMemoryQueue) Ack(ctx context.Context, messageID string) error
- func (q *InMemoryQueue) Connect(ctx context.Context) error
- func (q *InMemoryQueue) Consume(ctx context.Context, queueName string, handler MessageHandler, ...) error
- func (q *InMemoryQueue) DeclareQueue(ctx context.Context, name string, opts QueueOptions) error
- func (q *InMemoryQueue) DeleteQueue(ctx context.Context, name string) error
- func (q *InMemoryQueue) Disconnect(ctx context.Context) error
- func (q *InMemoryQueue) GetDeadLetterQueue(ctx context.Context, queueName string) ([]Message, error)
- func (q *InMemoryQueue) GetQueueInfo(ctx context.Context, name string) (*QueueInfo, error)
- func (q *InMemoryQueue) ListQueues(ctx context.Context) ([]string, error)
- func (q *InMemoryQueue) Nack(ctx context.Context, messageID string, requeue bool) error
- func (q *InMemoryQueue) Ping(ctx context.Context) error
- func (q *InMemoryQueue) Publish(ctx context.Context, queueName string, message Message) error
- func (q *InMemoryQueue) PublishBatch(ctx context.Context, queueName string, messages []Message) error
- func (q *InMemoryQueue) PublishDelayed(ctx context.Context, queueName string, message Message, delay time.Duration) error
- func (q *InMemoryQueue) PurgeQueue(ctx context.Context, name string) error
- func (q *InMemoryQueue) Reject(ctx context.Context, messageID string) error
- func (q *InMemoryQueue) RequeueDeadLetter(ctx context.Context, queueName string, messageID string) error
- func (q *InMemoryQueue) Stats(ctx context.Context) (*QueueStats, error)
- func (q *InMemoryQueue) StopConsuming(ctx context.Context, queueName string) error
- type Message
- type MessageHandler
- type NATSQueue
- func (q *NATSQueue) Ack(ctx context.Context, messageID string) error
- func (q *NATSQueue) Connect(ctx context.Context) error
- func (q *NATSQueue) Consume(ctx context.Context, queueName string, handler MessageHandler, ...) error
- func (q *NATSQueue) DeclareQueue(ctx context.Context, name string, opts QueueOptions) error
- func (q *NATSQueue) DeleteQueue(ctx context.Context, name string) error
- func (q *NATSQueue) Disconnect(ctx context.Context) error
- func (q *NATSQueue) GetDeadLetterQueue(ctx context.Context, queueName string) ([]Message, error)
- func (q *NATSQueue) GetQueueInfo(ctx context.Context, name string) (*QueueInfo, error)
- func (q *NATSQueue) ListQueues(ctx context.Context) ([]string, error)
- func (q *NATSQueue) Nack(ctx context.Context, messageID string, requeue bool) error
- func (q *NATSQueue) Ping(ctx context.Context) error
- func (q *NATSQueue) Publish(ctx context.Context, queueName string, message Message) error
- func (q *NATSQueue) PublishBatch(ctx context.Context, queueName string, messages []Message) error
- func (q *NATSQueue) PublishDelayed(ctx context.Context, queueName string, message Message, delay time.Duration) error
- func (q *NATSQueue) PurgeQueue(ctx context.Context, name string) error
- func (q *NATSQueue) Reject(ctx context.Context, messageID string) error
- func (q *NATSQueue) RequeueDeadLetter(ctx context.Context, queueName string, messageID string) error
- func (q *NATSQueue) Stats(ctx context.Context) (*QueueStats, error)
- func (q *NATSQueue) StopConsuming(ctx context.Context, queueName string) error
- type Queue
- type QueueInfo
- type QueueOptions
- type QueueStats
- type RabbitMQQueue
- func (q *RabbitMQQueue) Ack(ctx context.Context, messageID string) error
- func (q *RabbitMQQueue) Connect(ctx context.Context) error
- func (q *RabbitMQQueue) Consume(ctx context.Context, queueName string, handler MessageHandler, ...) error
- func (q *RabbitMQQueue) DeclareQueue(ctx context.Context, name string, opts QueueOptions) error
- func (q *RabbitMQQueue) DeleteQueue(ctx context.Context, name string) error
- func (q *RabbitMQQueue) Disconnect(ctx context.Context) error
- func (q *RabbitMQQueue) GetDeadLetterQueue(ctx context.Context, queueName string) ([]Message, error)
- func (q *RabbitMQQueue) GetQueueInfo(ctx context.Context, name string) (*QueueInfo, error)
- func (q *RabbitMQQueue) ListQueues(ctx context.Context) ([]string, error)
- func (q *RabbitMQQueue) Nack(ctx context.Context, messageID string, requeue bool) error
- func (q *RabbitMQQueue) Ping(ctx context.Context) error
- func (q *RabbitMQQueue) Publish(ctx context.Context, queueName string, message Message) error
- func (q *RabbitMQQueue) PublishBatch(ctx context.Context, queueName string, messages []Message) error
- func (q *RabbitMQQueue) PublishDelayed(ctx context.Context, queueName string, message Message, delay time.Duration) error
- func (q *RabbitMQQueue) PurgeQueue(ctx context.Context, name string) error
- func (q *RabbitMQQueue) Reject(ctx context.Context, messageID string) error
- func (q *RabbitMQQueue) RequeueDeadLetter(ctx context.Context, queueName string, messageID string) error
- func (q *RabbitMQQueue) Stats(ctx context.Context) (*QueueStats, error)
- func (q *RabbitMQQueue) StopConsuming(ctx context.Context, queueName string) error
- type RedisQueue
- func (q *RedisQueue) Ack(ctx context.Context, messageID string) error
- func (q *RedisQueue) Connect(ctx context.Context) error
- func (q *RedisQueue) Consume(ctx context.Context, queueName string, handler MessageHandler, ...) error
- func (q *RedisQueue) DeclareQueue(ctx context.Context, name string, opts QueueOptions) error
- func (q *RedisQueue) DeleteQueue(ctx context.Context, name string) error
- func (q *RedisQueue) Disconnect(ctx context.Context) error
- func (q *RedisQueue) GetDeadLetterQueue(ctx context.Context, queueName string) ([]Message, error)
- func (q *RedisQueue) GetQueueInfo(ctx context.Context, name string) (*QueueInfo, error)
- func (q *RedisQueue) ListQueues(ctx context.Context) ([]string, error)
- func (q *RedisQueue) Nack(ctx context.Context, messageID string, requeue bool) error
- func (q *RedisQueue) Ping(ctx context.Context) error
- func (q *RedisQueue) Publish(ctx context.Context, queueName string, message Message) error
- func (q *RedisQueue) PublishBatch(ctx context.Context, queueName string, messages []Message) error
- func (q *RedisQueue) PublishDelayed(ctx context.Context, queueName string, message Message, delay time.Duration) error
- func (q *RedisQueue) PurgeQueue(ctx context.Context, name string) error
- func (q *RedisQueue) Reject(ctx context.Context, messageID string) error
- func (q *RedisQueue) RequeueDeadLetter(ctx context.Context, queueName string, messageID string) error
- func (q *RedisQueue) Stats(ctx context.Context) (*QueueStats, error)
- func (q *RedisQueue) StopConsuming(ctx context.Context, queueName string) error
- type RetryStrategy
Constants ¶
This section is empty.
Variables ¶
var (
	ErrNotConnected       = errors.New("queue: not connected")
	ErrAlreadyConnected   = errors.New("queue: already connected")
	ErrConnectionFailed   = errors.New("queue: connection failed")
	ErrQueueNotFound      = errors.New("queue: queue not found")
	ErrQueueAlreadyExists = errors.New("queue: queue already exists")
	ErrMessageNotFound    = errors.New("queue: message not found")
	ErrInvalidMessage     = errors.New("queue: invalid message")
	ErrConsumerNotFound   = errors.New("queue: consumer not found")
	ErrPublishFailed      = errors.New("queue: publish failed")
	ErrConsumeFailed      = errors.New("queue: consume failed")
	ErrAckFailed          = errors.New("queue: acknowledgment failed")
	ErrNackFailed         = errors.New("queue: negative acknowledgment failed")
	ErrTimeout            = errors.New("queue: operation timeout")
	ErrInvalidConfig      = errors.New("queue: invalid configuration")
	ErrUnsupportedDriver  = errors.New("queue: unsupported driver")
	ErrMessageTooLarge    = errors.New("queue: message too large")
	ErrQueueFull          = errors.New("queue: queue is full")
)
Common queue errors.
Functions ¶
func NewExtension ¶
func NewExtension(opts ...ConfigOption) forge.Extension
NewExtension creates a new queue extension with functional options.
func NewExtensionWithConfig ¶
NewExtensionWithConfig creates a new queue extension with a complete config.
Types ¶
type Config ¶
type Config struct {
// Driver specifies the queue backend: "inmemory", "redis", "rabbitmq", "nats"
Driver string `json:"driver" mapstructure:"driver" yaml:"driver"`
// Connection settings
URL string `json:"url,omitempty" mapstructure:"url" yaml:"url,omitempty"`
Hosts []string `json:"hosts,omitempty" mapstructure:"hosts" yaml:"hosts,omitempty"`
Username string `json:"username,omitempty" mapstructure:"username" yaml:"username,omitempty"`
Password string `json:"password,omitempty" mapstructure:"password" yaml:"password,omitempty"`
VHost string `json:"vhost,omitempty" mapstructure:"vhost" yaml:"vhost,omitempty"` // RabbitMQ only
// Connection pool
MaxConnections int `json:"max_connections" mapstructure:"max_connections" yaml:"max_connections"`
MaxIdleConnections int `json:"max_idle_connections" mapstructure:"max_idle_connections" yaml:"max_idle_connections"`
ConnectTimeout time.Duration `json:"connect_timeout" mapstructure:"connect_timeout" yaml:"connect_timeout"`
ReadTimeout time.Duration `json:"read_timeout" mapstructure:"read_timeout" yaml:"read_timeout"`
WriteTimeout time.Duration `json:"write_timeout" mapstructure:"write_timeout" yaml:"write_timeout"`
KeepAlive time.Duration `json:"keep_alive" mapstructure:"keep_alive" yaml:"keep_alive"`
// Retry policy
MaxRetries int `json:"max_retries" mapstructure:"max_retries" yaml:"max_retries"`
RetryBackoff time.Duration `json:"retry_backoff" mapstructure:"retry_backoff" yaml:"retry_backoff"`
RetryMultiplier float64 `json:"retry_multiplier" mapstructure:"retry_multiplier" yaml:"retry_multiplier"`
MaxRetryBackoff time.Duration `json:"max_retry_backoff" mapstructure:"max_retry_backoff" yaml:"max_retry_backoff"`
// Default queue settings
DefaultPrefetch int `json:"default_prefetch" mapstructure:"default_prefetch" yaml:"default_prefetch"`
DefaultConcurrency int `json:"default_concurrency" mapstructure:"default_concurrency" yaml:"default_concurrency"`
DefaultTimeout time.Duration `json:"default_timeout" mapstructure:"default_timeout" yaml:"default_timeout"`
EnableDeadLetter bool `json:"enable_dead_letter" mapstructure:"enable_dead_letter" yaml:"enable_dead_letter"`
DeadLetterSuffix string `json:"dead_letter_suffix" mapstructure:"dead_letter_suffix" yaml:"dead_letter_suffix"`
// Performance
EnablePersistence bool `json:"enable_persistence" mapstructure:"enable_persistence" yaml:"enable_persistence"`
EnablePriority bool `json:"enable_priority" mapstructure:"enable_priority" yaml:"enable_priority"`
EnableDelayed bool `json:"enable_delayed" mapstructure:"enable_delayed" yaml:"enable_delayed"`
MaxMessageSize int64 `json:"max_message_size" mapstructure:"max_message_size" yaml:"max_message_size"`
// Security
EnableTLS bool `json:"enable_tls" mapstructure:"enable_tls" yaml:"enable_tls"`
TLSCertFile string `json:"tls_cert_file,omitempty" mapstructure:"tls_cert_file" yaml:"tls_cert_file,omitempty"`
TLSKeyFile string `json:"tls_key_file,omitempty" mapstructure:"tls_key_file" yaml:"tls_key_file,omitempty"`
TLSCAFile string `json:"tls_ca_file,omitempty" mapstructure:"tls_ca_file" yaml:"tls_ca_file,omitempty"`
InsecureSkipVerify bool `json:"insecure_skip_verify" mapstructure:"insecure_skip_verify" yaml:"insecure_skip_verify"`
// Monitoring
EnableMetrics bool `json:"enable_metrics" mapstructure:"enable_metrics" yaml:"enable_metrics"`
EnableTracing bool `json:"enable_tracing" mapstructure:"enable_tracing" yaml:"enable_tracing"`
// Database integration
// DatabaseRedisConnection specifies the name of a Redis connection
// from the database extension to reuse instead of creating a new one.
// Only valid when Driver is "redis".
// If specified, the queue will use this connection and ignore
// URL, Hosts, Username, Password, and other connection settings.
DatabaseRedisConnection string `json:"database_redis_connection,omitempty" mapstructure:"database_redis_connection" yaml:"database_redis_connection,omitempty"`
// Config loading flags (not serialized)
RequireConfig bool `json:"-" mapstructure:"-" yaml:"-"`
}
Config contains configuration for the queue extension.
type ConfigOption ¶
type ConfigOption func(*Config)
ConfigOption is a functional option for Config.
func WithAuth ¶
func WithAuth(username, password string) ConfigOption
WithAuth sets authentication credentials.
func WithConcurrency ¶
func WithConcurrency(concurrency int) ConfigOption
WithConcurrency sets default concurrency.
func WithDatabaseRedisConnection ¶ added in v0.7.4
func WithDatabaseRedisConnection(name string) ConfigOption
WithDatabaseRedisConnection configures the queue to use an existing Redis connection from the database extension.
func WithDeadLetter ¶
func WithDeadLetter(enable bool) ConfigOption
WithDeadLetter enables/disables dead letter queue.
func WithDelayed ¶
func WithDelayed(enable bool) ConfigOption
WithDelayed enables/disables delayed messages.
func WithMaxConnections ¶
func WithMaxConnections(max int) ConfigOption
WithMaxConnections sets max connections.
func WithPersistence ¶
func WithPersistence(enable bool) ConfigOption
WithPersistence enables/disables message persistence.
func WithPrefetch ¶
func WithPrefetch(prefetch int) ConfigOption
WithPrefetch sets default prefetch count.
func WithPriority ¶
func WithPriority(enable bool) ConfigOption
WithPriority enables/disables priority queues.
func WithRequireConfig ¶
func WithRequireConfig(require bool) ConfigOption
WithRequireConfig requires config from ConfigManager.
func WithTimeout ¶
func WithTimeout(timeout time.Duration) ConfigOption
WithTimeout sets default timeout.
func WithVHost ¶
func WithVHost(vhost string) ConfigOption
WithVHost sets the virtual host (RabbitMQ only).
type ConsumeOptions ¶
type ConsumeOptions struct {
ConsumerTag string `json:"consumer_tag,omitempty"`
AutoAck bool `json:"auto_ack"` // Auto-acknowledge messages
Exclusive bool `json:"exclusive"` // Exclusive consumer
PrefetchCount int `json:"prefetch_count"` // Number of messages to prefetch
Priority int `json:"priority,omitempty"` // Consumer priority
Concurrency int `json:"concurrency"` // Number of concurrent workers
RetryStrategy RetryStrategy `json:"retry_strategy"` // Retry configuration
Timeout time.Duration `json:"timeout,omitempty"` // Message processing timeout
Arguments map[string]interface{} `json:"arguments,omitempty"`
}
ConsumeOptions contains consumer configuration.
func DefaultConsumeOptions ¶
func DefaultConsumeOptions() ConsumeOptions
DefaultConsumeOptions returns default consume options.
type Extension ¶
type Extension struct {
*forge.BaseExtension
// contains filtered or unexported fields
}
Extension implements forge.Extension for queue functionality.
func (*Extension) Dependencies ¶ added in v0.8.0
Dependencies returns the names of extensions this extension depends on. When using a database Redis connection, the queue depends on the database extension.
type InMemoryQueue ¶
type InMemoryQueue struct {
// contains filtered or unexported fields
}
InMemoryQueue implements the Queue interface with an in-memory store.
func NewInMemoryQueue ¶
NewInMemoryQueue creates a new in-memory queue instance.
func (*InMemoryQueue) Ack ¶
func (q *InMemoryQueue) Ack(ctx context.Context, messageID string) error
func (*InMemoryQueue) Consume ¶
func (q *InMemoryQueue) Consume(ctx context.Context, queueName string, handler MessageHandler, opts ConsumeOptions) error
func (*InMemoryQueue) DeclareQueue ¶
func (q *InMemoryQueue) DeclareQueue(ctx context.Context, name string, opts QueueOptions) error
func (*InMemoryQueue) DeleteQueue ¶
func (q *InMemoryQueue) DeleteQueue(ctx context.Context, name string) error
func (*InMemoryQueue) Disconnect ¶
func (q *InMemoryQueue) Disconnect(ctx context.Context) error
func (*InMemoryQueue) GetDeadLetterQueue ¶
func (*InMemoryQueue) GetQueueInfo ¶
func (*InMemoryQueue) ListQueues ¶
func (q *InMemoryQueue) ListQueues(ctx context.Context) ([]string, error)
func (*InMemoryQueue) PublishBatch ¶
func (*InMemoryQueue) PublishDelayed ¶
func (*InMemoryQueue) PurgeQueue ¶
func (q *InMemoryQueue) PurgeQueue(ctx context.Context, name string) error
func (*InMemoryQueue) Reject ¶
func (q *InMemoryQueue) Reject(ctx context.Context, messageID string) error
func (*InMemoryQueue) RequeueDeadLetter ¶
func (*InMemoryQueue) Stats ¶
func (q *InMemoryQueue) Stats(ctx context.Context) (*QueueStats, error)
func (*InMemoryQueue) StopConsuming ¶
func (q *InMemoryQueue) StopConsuming(ctx context.Context, queueName string) error
type Message ¶
type Message struct {
ID string `json:"id"`
Queue string `json:"queue"`
Body []byte `json:"body"`
Headers map[string]string `json:"headers,omitempty"`
Priority int `json:"priority,omitempty"` // 0-9, higher = more priority
Delay time.Duration `json:"delay,omitempty"` // Delay before processing
Expiration time.Duration `json:"expiration,omitempty"` // Message TTL
Retries int `json:"retries,omitempty"` // Current retry count
MaxRetries int `json:"max_retries,omitempty"` // Maximum retry attempts
PublishedAt time.Time `json:"published_at"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
Message represents a queue message.
type MessageHandler ¶
MessageHandler is called for each received message.
type NATSQueue ¶
type NATSQueue struct {
// contains filtered or unexported fields
}
NATSQueue implements the Queue interface using NATS JetStream.
func NewNATSQueue ¶
NewNATSQueue creates a new NATS-backed queue instance.
func (*NATSQueue) Consume ¶
func (q *NATSQueue) Consume(ctx context.Context, queueName string, handler MessageHandler, opts ConsumeOptions) error
func (*NATSQueue) DeclareQueue ¶
func (*NATSQueue) DeleteQueue ¶
func (*NATSQueue) GetDeadLetterQueue ¶
func (*NATSQueue) GetQueueInfo ¶
func (*NATSQueue) ListQueues ¶
func (*NATSQueue) PublishBatch ¶
func (*NATSQueue) PublishDelayed ¶
func (*NATSQueue) PurgeQueue ¶
func (*NATSQueue) RequeueDeadLetter ¶
type Queue ¶
type Queue interface {
// Connection management
Connect(ctx context.Context) error
Disconnect(ctx context.Context) error
Ping(ctx context.Context) error
// Queue management
DeclareQueue(ctx context.Context, name string, opts QueueOptions) error
DeleteQueue(ctx context.Context, name string) error
ListQueues(ctx context.Context) ([]string, error)
GetQueueInfo(ctx context.Context, name string) (*QueueInfo, error)
PurgeQueue(ctx context.Context, name string) error
// Publishing
Publish(ctx context.Context, queue string, message Message) error
PublishBatch(ctx context.Context, queue string, messages []Message) error
PublishDelayed(ctx context.Context, queue string, message Message, delay time.Duration) error
// Consuming
Consume(ctx context.Context, queue string, handler MessageHandler, opts ConsumeOptions) error
StopConsuming(ctx context.Context, queue string) error
// Message operations
Ack(ctx context.Context, messageID string) error
Nack(ctx context.Context, messageID string, requeue bool) error
Reject(ctx context.Context, messageID string) error
// Dead letter queue
GetDeadLetterQueue(ctx context.Context, queue string) ([]Message, error)
RequeueDeadLetter(ctx context.Context, queue string, messageID string) error
// Stats
Stats(ctx context.Context) (*QueueStats, error)
}
Queue represents a unified message queue interface supporting multiple backends.
func Get ¶ added in v0.7.4
Get retrieves the queue service from the container. Returns an error if the queue is not registered.
func GetFromApp ¶ added in v0.7.4
GetFromApp is a convenience helper to get the queue from an App.
func MustGet ¶ added in v0.7.4
MustGet retrieves the queue service from the container. Panics if the queue is not registered.
func MustGetFromApp ¶ added in v0.7.4
MustGetFromApp is a convenience helper to get the queue from an App. Panics if the queue is not registered.
type QueueInfo ¶
type QueueInfo struct {
Name string `json:"name"`
Messages int64 `json:"messages"`
Consumers int `json:"consumers"`
MessageBytes int64 `json:"message_bytes"`
PublishRate float64 `json:"publish_rate"`
DeliverRate float64 `json:"deliver_rate"`
AckRate float64 `json:"ack_rate"`
Durable bool `json:"durable"`
AutoDelete bool `json:"auto_delete"`
CreatedAt time.Time `json:"created_at"`
LastMessageAt time.Time `json:"last_message_at,omitempty"`
}
QueueInfo contains queue metadata.
type QueueOptions ¶
type QueueOptions struct {
Durable bool `json:"durable"` // Survives broker restart
AutoDelete bool `json:"auto_delete"` // Deleted when no consumers
Exclusive bool `json:"exclusive"` // Used by only one connection
MaxLength int64 `json:"max_length"` // Maximum number of messages
MaxLengthBytes int64 `json:"max_length_bytes"` // Maximum total message bytes
MessageTTL time.Duration `json:"message_ttl"` // Message time-to-live
MaxPriority int `json:"max_priority"` // Maximum priority (0-255)
DeadLetterQueue string `json:"dead_letter_queue"` // DLQ for failed messages
Arguments map[string]interface{} `json:"arguments,omitempty"`
}
QueueOptions contains queue configuration.
func DefaultQueueOptions ¶
func DefaultQueueOptions() QueueOptions
DefaultQueueOptions returns default queue options.
type QueueStats ¶
type QueueStats struct {
QueueCount int64 `json:"queue_count"`
TotalMessages int64 `json:"total_messages"`
TotalConsumers int `json:"total_consumers"`
PublishRate float64 `json:"publish_rate"`
DeliverRate float64 `json:"deliver_rate"`
AckRate float64 `json:"ack_rate"`
UnackedMessages int64 `json:"unacked_messages"`
ReadyMessages int64 `json:"ready_messages"`
Uptime time.Duration `json:"uptime"`
MemoryUsed int64 `json:"memory_used"`
ConnectionCount int `json:"connection_count"`
Version string `json:"version"`
Extra map[string]interface{} `json:"extra,omitempty"`
}
QueueStats contains queue system statistics.
type RabbitMQQueue ¶
type RabbitMQQueue struct {
// contains filtered or unexported fields
}
RabbitMQQueue implements the Queue interface using RabbitMQ.
func NewRabbitMQQueue ¶
func NewRabbitMQQueue(config Config, logger forge.Logger, metrics forge.Metrics) (*RabbitMQQueue, error)
NewRabbitMQQueue creates a new RabbitMQ-backed queue instance.
func (*RabbitMQQueue) Ack ¶
func (q *RabbitMQQueue) Ack(ctx context.Context, messageID string) error
func (*RabbitMQQueue) Consume ¶
func (q *RabbitMQQueue) Consume(ctx context.Context, queueName string, handler MessageHandler, opts ConsumeOptions) error
func (*RabbitMQQueue) DeclareQueue ¶
func (q *RabbitMQQueue) DeclareQueue(ctx context.Context, name string, opts QueueOptions) error
func (*RabbitMQQueue) DeleteQueue ¶
func (q *RabbitMQQueue) DeleteQueue(ctx context.Context, name string) error
func (*RabbitMQQueue) Disconnect ¶
func (q *RabbitMQQueue) Disconnect(ctx context.Context) error
func (*RabbitMQQueue) GetDeadLetterQueue ¶
func (*RabbitMQQueue) GetQueueInfo ¶
func (*RabbitMQQueue) ListQueues ¶
func (q *RabbitMQQueue) ListQueues(ctx context.Context) ([]string, error)
func (*RabbitMQQueue) PublishBatch ¶
func (*RabbitMQQueue) PublishDelayed ¶
func (*RabbitMQQueue) PurgeQueue ¶
func (q *RabbitMQQueue) PurgeQueue(ctx context.Context, name string) error
func (*RabbitMQQueue) Reject ¶
func (q *RabbitMQQueue) Reject(ctx context.Context, messageID string) error
func (*RabbitMQQueue) RequeueDeadLetter ¶
func (*RabbitMQQueue) Stats ¶
func (q *RabbitMQQueue) Stats(ctx context.Context) (*QueueStats, error)
func (*RabbitMQQueue) StopConsuming ¶
func (q *RabbitMQQueue) StopConsuming(ctx context.Context, queueName string) error
type RedisQueue ¶
type RedisQueue struct {
// contains filtered or unexported fields
}
RedisQueue implements the Queue interface using Redis Streams.
func NewRedisQueue ¶
NewRedisQueue creates a new Redis-backed queue instance.
func NewRedisQueueWithClient ¶ added in v0.7.4
func NewRedisQueueWithClient(config Config, logger forge.Logger, metrics forge.Metrics, client redis.UniversalClient) (*RedisQueue, error)
NewRedisQueueWithClient creates a new Redis-backed queue instance with an external client.
func (*RedisQueue) Consume ¶
func (q *RedisQueue) Consume(ctx context.Context, queueName string, handler MessageHandler, opts ConsumeOptions) error
func (*RedisQueue) DeclareQueue ¶
func (q *RedisQueue) DeclareQueue(ctx context.Context, name string, opts QueueOptions) error
func (*RedisQueue) DeleteQueue ¶
func (q *RedisQueue) DeleteQueue(ctx context.Context, name string) error
func (*RedisQueue) Disconnect ¶
func (q *RedisQueue) Disconnect(ctx context.Context) error
func (*RedisQueue) GetDeadLetterQueue ¶
func (*RedisQueue) GetQueueInfo ¶
func (*RedisQueue) ListQueues ¶
func (q *RedisQueue) ListQueues(ctx context.Context) ([]string, error)
func (*RedisQueue) PublishBatch ¶
func (*RedisQueue) PublishDelayed ¶
func (*RedisQueue) PurgeQueue ¶
func (q *RedisQueue) PurgeQueue(ctx context.Context, name string) error
func (*RedisQueue) Reject ¶
func (q *RedisQueue) Reject(ctx context.Context, messageID string) error
func (*RedisQueue) RequeueDeadLetter ¶
func (*RedisQueue) Stats ¶
func (q *RedisQueue) Stats(ctx context.Context) (*QueueStats, error)
func (*RedisQueue) StopConsuming ¶
func (q *RedisQueue) StopConsuming(ctx context.Context, queueName string) error
type RetryStrategy ¶
type RetryStrategy struct {
MaxRetries int `json:"max_retries"`
InitialInterval time.Duration `json:"initial_interval"`
MaxInterval time.Duration `json:"max_interval"`
Multiplier float64 `json:"multiplier"` // Exponential backoff multiplier
}
RetryStrategy defines retry behavior.