Documentation
¶
Index ¶
- Constants
- Variables
- func WithConsumerCount(consumerCount int) func(*Config)
- func WithMaxHandleFailures(maxHandleFailures int) func(*Config)
- func WithMaxRetries(maxRetries int) func(*Config)
- func WithMaxSize(maxSize int) func(*Config)
- func WithMessageIDGenerator(generator message.MessageIDGenerator) func(*Config)
- func WithPollInterval(pollInterval time.Duration) func(*Config)
- type BaseDLQ
- type BaseQueue
- func (q *BaseQueue[T]) BDequeue(ctx context.Context) (Message[T], error)
- func (q *BaseQueue[T]) BEnqueue(ctx context.Context, data T) error
- func (q *BaseQueue[T]) Close()
- func (q *BaseQueue[T]) DLQ() (DLQ[T], error)
- func (q *BaseQueue[T]) Dequeue(ctx context.Context) (Message[T], error)
- func (q *BaseQueue[T]) Enqueue(ctx context.Context, data T) error
- func (q *BaseQueue[T]) GetConfig() *Config
- func (q *BaseQueue[T]) GetDeadletterQueueKey() string
- func (q *BaseQueue[T]) GetDeadletterQueueName() string
- func (q *BaseQueue[T]) GetLocker() locker.SyncLocker
- func (q *BaseQueue[T]) GetQueueKey() string
- func (q *BaseQueue[T]) Kind() Kind
- func (q *BaseQueue[T]) MaxHandleFailures() int
- func (q *BaseQueue[T]) MaxSize() int
- func (q *BaseQueue[T]) Name() string
- func (q *BaseQueue[T]) NewMessage(data T) (Message[T], error)
- func (q *BaseQueue[T]) Pack(data T) ([]byte, error)
- func (q *BaseQueue[T]) SetDLQ(dlq DLQ[T])
- func (q *BaseQueue[T]) Subscribe(cb Handler[T])
- func (q *BaseQueue[T]) TriggerCallbacks(msg Message[T])
- func (q *BaseQueue[T]) Unpack(data []byte) (Message[T], error)
- func (q *BaseQueue[T]) ValidateQueueClosed() error
- type Config
- type DLQ
- type DLQer
- type Factory
- type Handler
- type JsonMessage
- func (m *JsonMessage[T]) AddRetryCount()
- func (m *JsonMessage[T]) CreatedAt() time.Time
- func (m *JsonMessage[T]) RefreshRetryCount()
- func (m *JsonMessage[T]) RefreshUpdatedAt()
- func (m *JsonMessage[T]) RetryCount() int
- func (m *JsonMessage[T]) TotalRetryCount() int
- func (m *JsonMessage[T]) UpdatedAt() time.Time
- type Kind
- type MemoryFactory
- type MemoryQueue
- func (q *MemoryQueue[T]) BDequeue(ctx context.Context) (Message[T], error)
- func (q *MemoryQueue[T]) BEnqueue(ctx context.Context, data T) error
- func (q *MemoryQueue[T]) Close()
- func (q *MemoryQueue[T]) Dequeue(ctx context.Context) (Message[T], error)
- func (q *MemoryQueue[T]) Enqueue(ctx context.Context, data T) error
- func (q *MemoryQueue[T]) Kind() Kind
- func (q *MemoryQueue[T]) Name() string
- func (q *MemoryQueue[T]) Purge(ctx context.Context) error
- func (q *MemoryQueue[T]) Recover(ctx context.Context, msg Message[T]) error
- type Message
- type Option
- type Purgeable
- type Queue
- type Recoverable
- type RecoverableQueue
- type RedisQueue
- func (q *RedisQueue[T]) BDequeue(ctx context.Context) (Message[T], error)
- func (q *RedisQueue[T]) BEnqueue(ctx context.Context, data T) error
- func (q *RedisQueue[T]) Dequeue(ctx context.Context) (Message[T], error)
- func (q *RedisQueue[T]) Enqueue(ctx context.Context, data T) error
- func (q *RedisQueue[T]) Purge(ctx context.Context) error
- func (q *RedisQueue[T]) Recover(ctx context.Context, msg Message[T]) error
- type RedisQueueFactory
- type SafeQueue
- type SimpleQueue
- func (q *SimpleQueue[T]) BDequeue(ctx context.Context) (Message[T], error)
- func (q *SimpleQueue[T]) BEnqueue(ctx context.Context, data T) error
- func (q *SimpleQueue[T]) Close()
- func (q *SimpleQueue[T]) DLQ() (DLQ[T], error)
- func (q *SimpleQueue[T]) Dequeue(ctx context.Context) (Message[T], error)
- func (q *SimpleQueue[T]) Enqueue(ctx context.Context, data T) error
- func (q *SimpleQueue[T]) IsDLQSupported() bool
- func (q *SimpleQueue[T]) IsPurgeable() bool
- func (q *SimpleQueue[T]) IsRecoverable() bool
- func (q *SimpleQueue[T]) Kind() Kind
- func (q *SimpleQueue[T]) MaxHandleFailures() int
- func (q *SimpleQueue[T]) MaxSize() int
- func (q *SimpleQueue[T]) Name() string
- func (q *SimpleQueue[T]) Purge(ctx context.Context) error
- func (q *SimpleQueue[T]) Recover(ctx context.Context, msg Message[T]) error
- func (q *SimpleQueue[T]) Subscribe(cb Handler[T])
- func (q *SimpleQueue[T]) Unwrap() Queue[T]
Constants ¶
View Source
const ( UnlimitedSize = -1 Namespace = "container::queue::" )
Variables ¶
View Source
var ( DefaultPollInterval = 10 * time.Millisecond DefaultMaxRetries = 10 DefaultOptions = Config{ LockerGenerator: locker.NewMemoryLockerGenerator(), MaxSize: UnlimitedSize, MaxHandleFailures: 10, PollInterval: DefaultPollInterval, MaxRetries: DefaultMaxRetries, ConsumerCount: 1, MessageIDGenerator: message.GenerateRandomID, } )
View Source
var ( ErrQueueClosed = errors.New("queue is closed") ErrQueueFull = errors.New("queue is full") ErrQueueEmpty = errors.New("queue is empty") ErrQueueRecovered = errors.New("queue recovered") ErrInvalidData = errors.New("invalid data") )
Errors
Functions ¶
func WithConsumerCount ¶ added in v0.1.0
func WithMaxHandleFailures ¶ added in v0.1.0
func WithMaxRetries ¶ added in v0.1.0
func WithMaxSize ¶ added in v0.1.0
func WithMessageIDGenerator ¶ added in v0.1.0
func WithMessageIDGenerator(generator message.MessageIDGenerator) func(*Config)
func WithPollInterval ¶ added in v0.1.0
Types ¶
type BaseDLQ ¶ added in v0.9.1
func (*BaseDLQ[T]) AssociatedQueue ¶ added in v0.9.1
type BaseQueue ¶ added in v0.0.2
type BaseQueue[T any] struct { // contains filtered or unexported fields }
func NewBaseQueue ¶ added in v0.0.2
func (*BaseQueue[T]) GetDeadletterQueueKey ¶ added in v0.9.1
func (*BaseQueue[T]) GetDeadletterQueueName ¶ added in v0.9.1
func (*BaseQueue[T]) GetLocker ¶ added in v0.3.1
func (q *BaseQueue[T]) GetLocker() locker.SyncLocker
func (*BaseQueue[T]) GetQueueKey ¶ added in v0.9.0
func (*BaseQueue[T]) MaxHandleFailures ¶ added in v0.1.0
func (*BaseQueue[T]) NewMessage ¶ added in v0.1.0
func (*BaseQueue[T]) TriggerCallbacks ¶ added in v0.9.0
func (*BaseQueue[T]) ValidateQueueClosed ¶ added in v0.1.0
type Config ¶ added in v0.1.0
type Config struct { LockerGenerator locker.SyncLockerGenerator MaxSize int // Messages will be discarded after this many failures, or // pushed to DLQ if DLQ is supported MaxHandleFailures int PollInterval time.Duration // Used for internal retrying, not for message retrying MaxRetries int // Specify how many consumers are consuming the queue using `Subscribe`. // Be aware that too many consumers can cause order of messages to be changed. // If you want to ensure the order of messages, please use FIFO queue and set ConsumerCount to 1 ConsumerCount int MessageIDGenerator message.MessageIDGenerator }
Configuarable options here, but some implementations of queue may not support all options
type Factory ¶
type Factory[T any] interface { // Create a new queue if name does not exist // If name already exists, return the existing queue GetOrCreate(name string, options ...Option) (Queue[T], error) // Same as GetOrCreate but returns SafeQueue GetOrCreateSafe(name string, options ...Option) (SafeQueue[T], error) }
type JsonMessage ¶ added in v0.1.0
type JsonMessage[T any] struct { message.JsonMessage[T] }
func NewJsonMessage ¶ added in v0.1.0
func NewJsonMessage[T any](data T) *JsonMessage[T]
func (*JsonMessage[T]) AddRetryCount ¶ added in v0.1.0
func (m *JsonMessage[T]) AddRetryCount()
func (*JsonMessage[T]) CreatedAt ¶ added in v0.1.0
func (m *JsonMessage[T]) CreatedAt() time.Time
func (*JsonMessage[T]) RefreshRetryCount ¶ added in v0.1.0
func (m *JsonMessage[T]) RefreshRetryCount()
func (*JsonMessage[T]) RefreshUpdatedAt ¶ added in v0.1.0
func (m *JsonMessage[T]) RefreshUpdatedAt()
func (*JsonMessage[T]) RetryCount ¶ added in v0.1.0
func (m *JsonMessage[T]) RetryCount() int
func (*JsonMessage[T]) TotalRetryCount ¶ added in v0.1.0
func (m *JsonMessage[T]) TotalRetryCount() int
func (*JsonMessage[T]) UpdatedAt ¶ added in v0.1.0
func (m *JsonMessage[T]) UpdatedAt() time.Time
type MemoryFactory ¶
type MemoryFactory[T any] struct { // contains filtered or unexported fields }
func NewMemoryFactory ¶
func NewMemoryFactory[T any](defaultMsg Message[T]) *MemoryFactory[T]
func (*MemoryFactory[T]) GetOrCreate ¶
func (f *MemoryFactory[T]) GetOrCreate(name string, options ...Option) (Queue[T], error)
func (*MemoryFactory[T]) GetOrCreateSafe ¶ added in v0.2.0
func (f *MemoryFactory[T]) GetOrCreateSafe(name string, options ...Option) (SafeQueue[T], error)
type MemoryQueue ¶
func NewMemoryQueue ¶
func (*MemoryQueue[T]) BDequeue ¶ added in v0.9.3
func (q *MemoryQueue[T]) BDequeue(ctx context.Context) (Message[T], error)
func (*MemoryQueue[T]) BEnqueue ¶ added in v0.9.3
func (q *MemoryQueue[T]) BEnqueue(ctx context.Context, data T) error
func (*MemoryQueue[T]) Close ¶
func (q *MemoryQueue[T]) Close()
func (*MemoryQueue[T]) Dequeue ¶
func (q *MemoryQueue[T]) Dequeue(ctx context.Context) (Message[T], error)
func (*MemoryQueue[T]) Enqueue ¶
func (q *MemoryQueue[T]) Enqueue(ctx context.Context, data T) error
func (*MemoryQueue[T]) Kind ¶
func (q *MemoryQueue[T]) Kind() Kind
func (*MemoryQueue[T]) Name ¶ added in v0.0.2
func (q *MemoryQueue[T]) Name() string
type Queue ¶
type Queue[T any] interface { Kind() Kind Name() string // Reports max size of queue // -1 for unlimited MaxSize() int // Reports max handle failures // Messages will be discarded after this many failures, or // pushed to DLQ if DLQ is supported MaxHandleFailures() int // Push data to end of queue // Failed if queue is full or closed Enqueue(ctx context.Context, data T) error // Same as Enqueue but block thread until queue is not full and message is enqueued BEnqueue(ctx context.Context, data T) error // The implementation MUST set the retryCount of the message to 0 if its retryCount > MaxHandleFailures, // in the case, the message is from DLQ redriving. Dequeue(context.Context) (Message[T], error) // Same as Dequeue but block thread until message is available BDequeue(context.Context) (Message[T], error) // Subscribe queue with message confirmation. // Once handler returns error, it'll automatically put message back to queue using `Recover` mechanism internally. Subscribe(h Handler[T]) Close() }
The interface of queue The implementation of queue should be thread-safe
type Recoverable ¶ added in v0.0.2
type Recoverable[T any] interface { // If the queue supports `visibility window` like AWS SQS, the message will be put back to queue atomically without calling `Recover`. // It's useful if the panic is from outside of the queue handler. // But it's recommended to use `Recover` if the panic is from inside the queue handler for retrying the message fast. Recover(context.Context, Message[T]) error }
type RecoverableQueue ¶
type RecoverableQueue[T any] interface { Queue[T] Recoverable[T] }
type RedisQueue ¶ added in v0.0.2
func NewRedisQueue ¶ added in v0.0.2
func (*RedisQueue[T]) BDequeue ¶ added in v0.9.3
func (q *RedisQueue[T]) BDequeue(ctx context.Context) (Message[T], error)
func (*RedisQueue[T]) BEnqueue ¶ added in v0.9.3
func (q *RedisQueue[T]) BEnqueue(ctx context.Context, data T) error
func (*RedisQueue[T]) Dequeue ¶ added in v0.0.2
func (q *RedisQueue[T]) Dequeue(ctx context.Context) (Message[T], error)
func (*RedisQueue[T]) Enqueue ¶ added in v0.0.2
func (q *RedisQueue[T]) Enqueue(ctx context.Context, data T) error
type RedisQueueFactory ¶ added in v0.0.2
type RedisQueueFactory[T any] struct { // contains filtered or unexported fields }
func NewRedisQueueFactory ¶ added in v0.0.2
func NewRedisQueueFactory[T any](redisClient redis.Cmdable, defaultMsg Message[T]) *RedisQueueFactory[T]
func (*RedisQueueFactory[T]) GetOrCreate ¶ added in v0.0.2
func (f *RedisQueueFactory[T]) GetOrCreate(name string, options ...Option) (Queue[T], error)
func (*RedisQueueFactory[T]) GetOrCreateSafe ¶ added in v0.2.0
func (f *RedisQueueFactory[T]) GetOrCreateSafe(name string, options ...Option) (SafeQueue[T], error)
type SafeQueue ¶
type SafeQueue[T any] interface { Queue[T] Recoverable[T] IsRecoverable() bool Purgeable IsPurgeable() bool DLQer[T] IsDLQSupported() bool }
SafeQueue provides ability to put message back to queue when handler encounters panic and makes sure all function calls are safe. e.g, Returns ErrNotImplemented if calling Recover and it is not implemented
type SimpleQueue ¶ added in v0.1.0
type SimpleQueue[T any] struct { // contains filtered or unexported fields }
func NewSimpleQueue ¶ added in v0.2.0
func NewSimpleQueue[T any](queue Queue[T]) (*SimpleQueue[T], error)
func (*SimpleQueue[T]) BDequeue ¶ added in v0.9.3
func (q *SimpleQueue[T]) BDequeue(ctx context.Context) (Message[T], error)
func (*SimpleQueue[T]) BEnqueue ¶ added in v0.9.3
func (q *SimpleQueue[T]) BEnqueue(ctx context.Context, data T) error
func (*SimpleQueue[T]) Close ¶ added in v0.1.0
func (q *SimpleQueue[T]) Close()
func (*SimpleQueue[T]) DLQ ¶ added in v0.1.0
func (q *SimpleQueue[T]) DLQ() (DLQ[T], error)
func (*SimpleQueue[T]) Dequeue ¶ added in v0.1.0
func (q *SimpleQueue[T]) Dequeue(ctx context.Context) (Message[T], error)
func (*SimpleQueue[T]) Enqueue ¶ added in v0.1.0
func (q *SimpleQueue[T]) Enqueue(ctx context.Context, data T) error
func (*SimpleQueue[T]) IsDLQSupported ¶ added in v0.1.0
func (q *SimpleQueue[T]) IsDLQSupported() bool
func (*SimpleQueue[T]) IsPurgeable ¶ added in v0.1.0
func (q *SimpleQueue[T]) IsPurgeable() bool
func (*SimpleQueue[T]) IsRecoverable ¶ added in v0.1.0
func (q *SimpleQueue[T]) IsRecoverable() bool
func (*SimpleQueue[T]) Kind ¶ added in v0.1.0
func (q *SimpleQueue[T]) Kind() Kind
func (*SimpleQueue[T]) MaxHandleFailures ¶ added in v0.1.0
func (q *SimpleQueue[T]) MaxHandleFailures() int
func (*SimpleQueue[T]) MaxSize ¶ added in v0.1.0
func (q *SimpleQueue[T]) MaxSize() int
func (*SimpleQueue[T]) Name ¶ added in v0.1.0
func (q *SimpleQueue[T]) Name() string
func (*SimpleQueue[T]) Purge ¶ added in v0.1.0
func (q *SimpleQueue[T]) Purge(ctx context.Context) error
func (*SimpleQueue[T]) Recover ¶ added in v0.1.0
func (q *SimpleQueue[T]) Recover(ctx context.Context, msg Message[T]) error
func (*SimpleQueue[T]) Subscribe ¶ added in v0.1.0
func (q *SimpleQueue[T]) Subscribe(cb Handler[T])
func (*SimpleQueue[T]) Unwrap ¶ added in v0.1.0
func (q *SimpleQueue[T]) Unwrap() Queue[T]
Click to show internal directories.
Click to hide internal directories.