Documentation
¶
Index ¶
- Variables
- type Backend
- type Config
- type Envelope
- type GoQueue
- type Handler
- type HandlerFunc
- type MemoryBackend
- func (b *MemoryBackend) Ack(ctx context.Context, messageID string) error
- func (b *MemoryBackend) Close() error
- func (b *MemoryBackend) Nack(ctx context.Context, messageID string) error
- func (b *MemoryBackend) Publish(ctx context.Context, queue string, envelope *Envelope) error
- func (b *MemoryBackend) Subscribe(ctx context.Context, queue string) (<-chan *Envelope, error)
- type MemoryOption
- type Option
- type QueueMessage
- type RabbitMQBackend
- func (b *RabbitMQBackend) Ack(ctx context.Context, messageID string) error
- func (b *RabbitMQBackend) Close() error
- func (b *RabbitMQBackend) Nack(ctx context.Context, messageID string) error
- func (b *RabbitMQBackend) Ping(ctx context.Context) error
- func (b *RabbitMQBackend) Publish(ctx context.Context, queue string, envelope *Envelope) error
- func (b *RabbitMQBackend) Subscribe(ctx context.Context, queue string) (<-chan *Envelope, error)
- type RabbitMQOption
- func WithRabbitMQAuth(username, password string) RabbitMQOption
- func WithRabbitMQAutoDelete(autoDelete bool) RabbitMQOption
- func WithRabbitMQDLX(dlxName, dlqName string) RabbitMQOption
- func WithRabbitMQDLXRoutingKey(key string) RabbitMQOption
- func WithRabbitMQDurableQueues(durable bool) RabbitMQOption
- func WithRabbitMQExclusive(exclusive bool) RabbitMQOption
- func WithRabbitMQHost(host string, port int) RabbitMQOption
- func WithRabbitMQInsecureSkipVerify() RabbitMQOption
- func WithRabbitMQMaxChannels(max int) RabbitMQOption
- func WithRabbitMQPersistence(persistent bool) RabbitMQOption
- func WithRabbitMQPrefetch(count, size int) RabbitMQOption
- func WithRabbitMQPriority(maxPriority uint8) RabbitMQOption
- func WithRabbitMQTLS(certFile, keyFile, caFile string) RabbitMQOption
- func WithRabbitMQTLSConfig(tlsConfig *tls.Config) RabbitMQOption
- func WithRabbitMQURL(url string) RabbitMQOption
- func WithRabbitMQVHost(vhost string) RabbitMQOption
- type RedisBackend
- func (b *RedisBackend) Ack(ctx context.Context, messageID string) error
- func (b *RedisBackend) Close() error
- func (b *RedisBackend) Nack(ctx context.Context, messageID string) error
- func (b *RedisBackend) Ping(ctx context.Context) error
- func (b *RedisBackend) Publish(ctx context.Context, queue string, envelope *Envelope) error
- func (b *RedisBackend) Subscribe(ctx context.Context, queue string) (<-chan *Envelope, error)
- type RedisOption
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrQueueStopped is returned when trying to use a stopped queue
	ErrQueueStopped = errors.New("queue is stopped")
	// ErrWorkersAlreadyRunning is returned when trying to start workers for a queue that already has workers running
	ErrWorkersAlreadyRunning = errors.New("workers already running for this queue")
	// ErrMessageNotFound is returned when a message with the given ID is not found
	ErrMessageNotFound = errors.New("message not found")
	// ErrHandlerAlreadyRegistered is returned when trying to register a handler for a queue that already has one
	ErrHandlerAlreadyRegistered = errors.New("handler already registered for this queue")
	// ErrNoHandlersRegistered is returned when trying to start without any registered handlers
	ErrNoHandlersRegistered = errors.New("no handlers registered")
)
Functions ¶
This section is empty.
Types ¶
type Backend ¶
type Backend interface {
// Publish sends a message envelope to the specified queue
Publish(ctx context.Context, queue string, envelope *Envelope) error
// Subscribe creates a subscription to the specified queue and returns
// a channel that will receive message envelopes
Subscribe(ctx context.Context, queue string) (<-chan *Envelope, error)
// Ack acknowledges successful processing of a message
Ack(ctx context.Context, messageID string) error
// Nack indicates that a message failed to process and should be retried
Nack(ctx context.Context, messageID string) error
// Close releases any resources held by the backend
Close() error
}
Backend defines the interface that all queue backends must implement. This allows easy swapping between different queue implementations (e.g., Redis, RabbitMQ, or in-memory).
type Config ¶
type Config struct {
// WorkerCount is the number of concurrent workers processing messages
WorkerCount int
// RetryCount is the maximum number of times a failed message will be retried
RetryCount int
// RetryDelay is the initial delay between retries (uses exponential backoff)
RetryDelay time.Duration
// DLQEnabled enables the dead letter queue for permanently failed messages
DLQEnabled bool
// DLQName is the name of the dead letter queue
DLQName string
}
Config holds the configuration for a GoQueue instance
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns a Config with sensible defaults
type Envelope ¶
type Envelope struct {
ID string `json:"id"`
Queue string `json:"queue"`
Data []byte `json:"data"`
RetryCount int `json:"retry_count"`
CreatedAt time.Time `json:"created_at"`
ProcessedAt *time.Time `json:"processed_at,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
}
Envelope represents the internal wrapper around queue messages with metadata.
func NewEnvelope ¶
NewEnvelope creates a new envelope with the given queue name and data
type GoQueue ¶
type GoQueue struct {
// contains filtered or unexported fields
}
GoQueue is the main queue manager that coordinates workers and the backend
func (*GoQueue) Publish ¶
func (gq *GoQueue) Publish(ctx context.Context, message QueueMessage) error
Publish publishes a message to its designated queue. The message declares which queue it belongs to via QueueName(). The message will be automatically serialized to JSON.
func (*GoQueue) PublishBytes ¶
PublishBytes publishes raw bytes to the specified queue.
func (*GoQueue) Register ¶
Register registers a handler for its designated queue. Must be called before Start().
type Handler ¶
type Handler interface {
// QueueName returns the name of the queue this handler processes
QueueName() string
// Handle processes a single message envelope.
// Return an error to trigger retry logic, or nil on success.
Handle(ctx context.Context, envelope *Envelope) error
}
Handler defines the interface that message handlers must implement. Handlers declare which queue they process and how to handle messages.
type HandlerFunc ¶
type HandlerFunc struct {
// contains filtered or unexported fields
}
HandlerFunc is a function adapter that allows using functions as Handlers. This provides backward compatibility and convenience for simple use cases.
func NewHandlerFunc ¶
func NewHandlerFunc(queueName string, fn func(ctx context.Context, envelope *Envelope) error) HandlerFunc
NewHandlerFunc creates a new HandlerFunc for the given queue
func (HandlerFunc) Handle ¶
func (f HandlerFunc) Handle(ctx context.Context, envelope *Envelope) error
Handle implements the Handler interface for HandlerFunc
func (HandlerFunc) QueueName ¶
func (f HandlerFunc) QueueName() string
QueueName implements the Handler interface
type MemoryBackend ¶
type MemoryBackend struct {
// contains filtered or unexported fields
}
MemoryBackend is an in-memory queue backend using Go channels. This is ideal for testing, development, and single-process applications.
func NewMemoryBackend ¶
func NewMemoryBackend(opts ...MemoryOption) *MemoryBackend
NewMemoryBackend creates a new in-memory backend
func (*MemoryBackend) Ack ¶
func (b *MemoryBackend) Ack(ctx context.Context, messageID string) error
Ack acknowledges successful processing of a message
func (*MemoryBackend) Close ¶
func (b *MemoryBackend) Close() error
Close releases resources held by the backend
func (*MemoryBackend) Nack ¶
func (b *MemoryBackend) Nack(ctx context.Context, messageID string) error
Nack indicates that a message failed to process
type MemoryOption ¶
type MemoryOption func(*MemoryBackend)
MemoryOption is a function that configures the memory backend
func WithBufferSize ¶
func WithBufferSize(size int) MemoryOption
WithBufferSize sets the channel buffer size for each queue
type Option ¶
type Option func(*Config)
Option is a function that modifies a Config
func WithRetryCount ¶
WithRetryCount sets the maximum number of retries for failed messages
func WithRetryDelay ¶
WithRetryDelay sets the initial retry delay (exponential backoff is applied)
func WithWorkerCount ¶
WithWorkerCount sets the number of concurrent workers
type QueueMessage ¶
type QueueMessage interface {
QueueName() string
}
QueueMessage is the interface that all queue messages must implement.
type RabbitMQBackend ¶ added in v1.1.0
type RabbitMQBackend struct {
// contains filtered or unexported fields
}
RabbitMQBackend implements the Backend interface using RabbitMQ
func NewRabbitMQBackend ¶ added in v1.1.0
func NewRabbitMQBackend(url string, opts ...RabbitMQOption) (*RabbitMQBackend, error)
NewRabbitMQBackend creates a new RabbitMQ backend
func NewRabbitMQBackendWithConnection ¶ added in v1.1.0
func NewRabbitMQBackendWithConnection(conn *amqp.Connection, opts ...RabbitMQOption) (*RabbitMQBackend, error)
NewRabbitMQBackendWithConnection creates a new RabbitMQ backend with an existing connection
func (*RabbitMQBackend) Ack ¶ added in v1.1.0
func (b *RabbitMQBackend) Ack(ctx context.Context, messageID string) error
Ack acknowledges successful processing of a message
func (*RabbitMQBackend) Close ¶ added in v1.1.0
func (b *RabbitMQBackend) Close() error
Close releases all resources held by the backend
func (*RabbitMQBackend) Nack ¶ added in v1.1.0
func (b *RabbitMQBackend) Nack(ctx context.Context, messageID string) error
Nack indicates that a message failed to process
func (*RabbitMQBackend) Ping ¶ added in v1.1.0
func (b *RabbitMQBackend) Ping(ctx context.Context) error
Ping checks the connection to RabbitMQ
type RabbitMQOption ¶ added in v1.1.0
type RabbitMQOption func(*rabbitmqConfig)
RabbitMQOption is a function that configures RabbitMQBackend
func WithRabbitMQAuth ¶ added in v1.1.0
func WithRabbitMQAuth(username, password string) RabbitMQOption
WithRabbitMQAuth sets the username and password
func WithRabbitMQAutoDelete ¶ added in v1.1.0
func WithRabbitMQAutoDelete(autoDelete bool) RabbitMQOption
WithRabbitMQAutoDelete sets whether queues should auto-delete
func WithRabbitMQDLX ¶ added in v1.1.0
func WithRabbitMQDLX(dlxName, dlqName string) RabbitMQOption
WithRabbitMQDLX configures Dead Letter Exchange
func WithRabbitMQDLXRoutingKey ¶ added in v1.1.0
func WithRabbitMQDLXRoutingKey(key string) RabbitMQOption
WithRabbitMQDLXRoutingKey sets the routing key for DLX
func WithRabbitMQDurableQueues ¶ added in v1.1.0
func WithRabbitMQDurableQueues(durable bool) RabbitMQOption
WithRabbitMQDurableQueues sets whether queues should be durable
func WithRabbitMQExclusive ¶ added in v1.1.0
func WithRabbitMQExclusive(exclusive bool) RabbitMQOption
WithRabbitMQExclusive sets whether queues should be exclusive
func WithRabbitMQHost ¶ added in v1.1.0
func WithRabbitMQHost(host string, port int) RabbitMQOption
WithRabbitMQHost sets the host and port
func WithRabbitMQInsecureSkipVerify ¶ added in v1.1.0
func WithRabbitMQInsecureSkipVerify() RabbitMQOption
WithRabbitMQInsecureSkipVerify skips TLS certificate verification (use only for development)
func WithRabbitMQMaxChannels ¶ added in v1.1.0
func WithRabbitMQMaxChannels(max int) RabbitMQOption
WithRabbitMQMaxChannels sets the maximum number of channels in the pool
func WithRabbitMQPersistence ¶ added in v1.1.0
func WithRabbitMQPersistence(persistent bool) RabbitMQOption
WithRabbitMQPersistence sets whether messages should be persistent
func WithRabbitMQPrefetch ¶ added in v1.1.0
func WithRabbitMQPrefetch(count, size int) RabbitMQOption
WithRabbitMQPrefetch sets QoS prefetch count and size
func WithRabbitMQPriority ¶ added in v1.1.0
func WithRabbitMQPriority(maxPriority uint8) RabbitMQOption
WithRabbitMQPriority enables priority queues with the given max priority
func WithRabbitMQTLS ¶ added in v1.1.0
func WithRabbitMQTLS(certFile, keyFile, caFile string) RabbitMQOption
WithRabbitMQTLS sets TLS configuration from certificate files
func WithRabbitMQTLSConfig ¶ added in v1.1.0
func WithRabbitMQTLSConfig(tlsConfig *tls.Config) RabbitMQOption
WithRabbitMQTLSConfig sets a custom TLS configuration
func WithRabbitMQURL ¶ added in v1.1.0
func WithRabbitMQURL(url string) RabbitMQOption
WithRabbitMQURL sets the full AMQP URL
func WithRabbitMQVHost ¶ added in v1.1.0
func WithRabbitMQVHost(vhost string) RabbitMQOption
WithRabbitMQVHost sets the virtual host
type RedisBackend ¶
type RedisBackend struct {
// contains filtered or unexported fields
}
RedisBackend is a Redis-based queue backend. This is suitable for production use with distributed systems.
func NewRedisBackend ¶
func NewRedisBackend(addr string, opts ...RedisOption) *RedisBackend
NewRedisBackend creates a new Redis backend. addr is the Redis server address (e.g., "localhost:6379").
func NewRedisBackendWithClient ¶
func NewRedisBackendWithClient(client *redis.Client, opts ...RedisOption) *RedisBackend
NewRedisBackendWithClient creates a new Redis backend with an existing Redis client
func (*RedisBackend) Ack ¶
func (b *RedisBackend) Ack(ctx context.Context, messageID string) error
Ack acknowledges successful processing of a message
func (*RedisBackend) Close ¶
func (b *RedisBackend) Close() error
Close releases resources held by the backend
func (*RedisBackend) Nack ¶
func (b *RedisBackend) Nack(ctx context.Context, messageID string) error
Nack indicates that a message failed to process
func (*RedisBackend) Ping ¶
func (b *RedisBackend) Ping(ctx context.Context) error
Ping checks if the Redis connection is alive
type RedisOption ¶
type RedisOption func(*RedisBackend)
RedisOption is a function that configures the Redis backend
func WithPollTimeout ¶
func WithPollTimeout(timeout time.Duration) RedisOption
WithPollTimeout sets the timeout for polling messages from Redis
func WithRedisDB ¶
func WithRedisDB(db int) RedisOption
WithRedisDB sets the Redis database number (0-15)
func WithRedisPassword ¶
func WithRedisPassword(password string) RedisOption
WithRedisPassword sets the password for Redis authentication
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
| examples | |
| advanced (command) | |
| basic (command) | |
| rabbitmq (command) | |
| redis (command) | |
| redis-distributed/publisher (command) | |
| redis-distributed/worker (command) | |