queue

package
v0.9.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 30, 2025 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// UnlimitedSize is the MaxSize value that means "no size limit".
	// Queue.MaxSize documents -1 as unlimited, and DefaultOptions uses it.
	UnlimitedSize = -1
	// Namespace is a package-level string constant.
	// NOTE(review): the trailing "::" suggests it is used as a key prefix
	// (e.g. by GetQueueKey / GetDeadletterQueueKey) — confirm against the source.
	Namespace     = "container::queue::"
)

Variables

View Source
var (
	// DefaultPollInterval is the PollInterval that DefaultOptions uses (10ms).
	DefaultPollInterval = 10 * time.Millisecond
	// DefaultMaxRetries is the MaxRetries that DefaultOptions uses.
	DefaultMaxRetries   = 10
	// DefaultOptions is the package's baseline Config.
	// NOTE(review): presumably the starting configuration that Option
	// functions (WithMaxSize, WithPollInterval, ...) then modify — confirm.
	DefaultOptions      = Config{
		// Lockers come from the in-memory locker generator.
		LockerGenerator: locker.NewMemoryLockerGenerator(),

		// No size limit.
		MaxSize: UnlimitedSize,

		// Literal default; there is no named DefaultMaxHandleFailures variable.
		MaxHandleFailures: 10,

		PollInterval: DefaultPollInterval,
		MaxRetries:   DefaultMaxRetries,

		// A single consumer; the Config.ConsumerCount comment ties this to
		// preserving message order.
		ConsumerCount: 1,

		// Message IDs are produced by message.GenerateRandomID.
		MessageIDGenerator: message.GenerateRandomID,
	}
)
View Source
// Sentinel errors exposed by the package.
// NOTE(review): the call sites that return them are not visible here; the
// per-error notes below are inferred from names/messages — confirm.
var (
	// ErrQueueClosed carries the message "queue is closed".
	// Queue.Enqueue documents failing when the queue is closed.
	ErrQueueClosed    = errors.New("queue is closed")
	// ErrQueueFull carries the message "queue is full".
	// Queue.Enqueue documents failing when the queue is full.
	ErrQueueFull      = errors.New("queue is full")
	// ErrQueueEmpty carries the message "queue is empty".
	// NOTE(review): presumably returned by the non-blocking Dequeue — confirm.
	ErrQueueEmpty     = errors.New("queue is empty")
	// ErrQueueRecovered carries the message "queue recovered".
	// NOTE(review): exact trigger is not visible in this view — confirm.
	ErrQueueRecovered = errors.New("queue recovered")
	// ErrInvalidData carries the message "invalid data".
	// NOTE(review): presumably related to Pack/Unpack failures — confirm.
	ErrInvalidData    = errors.New("invalid data")
)

Errors

Functions

func WithConsumerCount added in v0.1.0

func WithConsumerCount(consumerCount int) func(*Config)

func WithMaxHandleFailures added in v0.1.0

func WithMaxHandleFailures(maxHandleFailures int) func(*Config)

func WithMaxRetries added in v0.1.0

func WithMaxRetries(maxRetries int) func(*Config)

func WithMaxSize added in v0.1.0

func WithMaxSize(maxSize int) func(*Config)

func WithMessageIDGenerator added in v0.1.0

func WithMessageIDGenerator(generator message.MessageIDGenerator) func(*Config)

func WithPollInterval added in v0.1.0

func WithPollInterval(pollInterval time.Duration) func(*Config)

Types

type BaseDLQ added in v0.9.1

type BaseDLQ[T any] struct {
	Queue[T]
	// contains filtered or unexported fields
}

func (*BaseDLQ[T]) AssociatedQueue added in v0.9.1

func (q *BaseDLQ[T]) AssociatedQueue() Queue[T]

func (*BaseDLQ[T]) Redrive added in v0.9.1

func (q *BaseDLQ[T]) Redrive(ctx context.Context, items int) error

type BaseQueue added in v0.0.2

type BaseQueue[T any] struct {
	// contains filtered or unexported fields
}

func NewBaseQueue added in v0.0.2

func NewBaseQueue[T any](name string, defaultMsg Message[T], options ...Option) (*BaseQueue[T], error)

func (*BaseQueue[T]) BDequeue added in v0.9.3

func (q *BaseQueue[T]) BDequeue(ctx context.Context) (Message[T], error)

func (*BaseQueue[T]) BEnqueue added in v0.9.3

func (q *BaseQueue[T]) BEnqueue(ctx context.Context, data T) error

func (*BaseQueue[T]) Close added in v0.0.2

func (q *BaseQueue[T]) Close()

func (*BaseQueue[T]) DLQ added in v0.9.1

func (q *BaseQueue[T]) DLQ() (DLQ[T], error)

func (*BaseQueue[T]) Dequeue added in v0.0.2

func (q *BaseQueue[T]) Dequeue(ctx context.Context) (Message[T], error)

func (*BaseQueue[T]) Enqueue added in v0.0.2

func (q *BaseQueue[T]) Enqueue(ctx context.Context, data T) error

func (*BaseQueue[T]) GetConfig added in v0.9.0

func (q *BaseQueue[T]) GetConfig() *Config

func (*BaseQueue[T]) GetDeadletterQueueKey added in v0.9.1

func (q *BaseQueue[T]) GetDeadletterQueueKey() string

func (*BaseQueue[T]) GetDeadletterQueueName added in v0.9.1

func (q *BaseQueue[T]) GetDeadletterQueueName() string

func (*BaseQueue[T]) GetLocker added in v0.3.1

func (q *BaseQueue[T]) GetLocker() locker.SyncLocker

func (*BaseQueue[T]) GetQueueKey added in v0.9.0

func (q *BaseQueue[T]) GetQueueKey() string

func (*BaseQueue[T]) Kind added in v0.0.2

func (q *BaseQueue[T]) Kind() Kind

func (*BaseQueue[T]) MaxHandleFailures added in v0.1.0

func (q *BaseQueue[T]) MaxHandleFailures() int

func (*BaseQueue[T]) MaxSize added in v0.0.2

func (q *BaseQueue[T]) MaxSize() int

func (*BaseQueue[T]) Name added in v0.0.2

func (q *BaseQueue[T]) Name() string

func (*BaseQueue[T]) NewMessage added in v0.1.0

func (q *BaseQueue[T]) NewMessage(data T) (Message[T], error)

func (*BaseQueue[T]) Pack added in v0.1.0

func (q *BaseQueue[T]) Pack(data T) ([]byte, error)

func (*BaseQueue[T]) SetDLQ added in v0.9.1

func (q *BaseQueue[T]) SetDLQ(dlq DLQ[T])

func (*BaseQueue[T]) Subscribe added in v0.0.2

func (q *BaseQueue[T]) Subscribe(cb Handler[T])

func (*BaseQueue[T]) TriggerCallbacks added in v0.9.0

func (q *BaseQueue[T]) TriggerCallbacks(msg Message[T])

func (*BaseQueue[T]) Unpack added in v0.1.0

func (q *BaseQueue[T]) Unpack(data []byte) (Message[T], error)

func (*BaseQueue[T]) ValidateQueueClosed added in v0.1.0

func (q *BaseQueue[T]) ValidateQueueClosed() error

type Config added in v0.1.0

// Config holds the configurable options of a queue. Each field has a
// matching With* Option constructor except LockerGenerator, for which none
// is listed.
type Config struct {
	// Generator for the queue's locker.
	// NOTE(review): presumably the source of the locker returned by
	// BaseQueue.GetLocker — confirm.
	LockerGenerator locker.SyncLockerGenerator

	// Maximum size of the queue; UnlimitedSize (-1) means unlimited.
	MaxSize int

	// Messages will be discarded after this many failures, or
	// pushed to DLQ if DLQ is supported
	MaxHandleFailures int

	// Polling interval; DefaultOptions sets it to DefaultPollInterval.
	// NOTE(review): which operations poll (blocking calls, Subscribe) is not
	// visible here — confirm.
	PollInterval time.Duration

	// Used for internal retrying, not for message retrying
	MaxRetries int

	// Specify how many consumers are consuming the queue using `Subscribe`.
	// Be aware that too many consumers can cause order of messages to be changed.
	// If you want to ensure the order of messages, please use FIFO queue and set ConsumerCount to 1
	ConsumerCount int

	// Generator for message IDs; DefaultOptions uses message.GenerateRandomID.
	MessageIDGenerator message.MessageIDGenerator
}

Config holds the configurable options of a queue; some queue implementations may not support every option.

type DLQ added in v0.1.0

// DLQ is a dead-letter queue: a Queue that additionally knows the queue it
// is associated with and can push messages to it.
type DLQ[T any] interface {
	Queue[T]

	// Redrive pushes `items` messages to the associated Queue.
	// NOTE(review): behavior when fewer than `items` messages are available
	// is not visible here — confirm.
	Redrive(ctx context.Context, items int) error

	// AssociatedQueue returns the Queue this DLQ is associated with
	// (the Queue that Redrive pushes to).
	AssociatedQueue() Queue[T]
}

type DLQer added in v0.1.0

// DLQer is implemented by queues that can expose a dead-letter queue.
type DLQer[T any] interface {
	// DLQ returns the dead-letter queue, or an error.
	// NOTE(review): presumably errors when no DLQ is supported or set
	// (see SafeQueue.IsDLQSupported) — confirm.
	DLQ() (DLQ[T], error)
}

type Factory

// Factory creates queues by name and hands back the existing queue when the
// name is already known.
type Factory[T any] interface {
	// GetOrCreate creates a new queue if `name` does not exist.
	// If `name` already exists, it returns the existing queue.
	GetOrCreate(name string, options ...Option) (Queue[T], error)

	// GetOrCreateSafe is the same as GetOrCreate but returns a SafeQueue.
	GetOrCreateSafe(name string, options ...Option) (SafeQueue[T], error)
}

type Handler

// Handler is the callback type accepted by Queue.Subscribe. It receives one
// message; per the Subscribe documentation, returning an error causes the
// message to be put back to the queue via the `Recover` mechanism.
type Handler[T any] func(msg Message[T]) error

type JsonMessage added in v0.1.0

type JsonMessage[T any] struct {
	message.JsonMessage[T]
}

func NewJsonMessage added in v0.1.0

func NewJsonMessage[T any](data T) *JsonMessage[T]

func (*JsonMessage[T]) AddRetryCount added in v0.1.0

func (m *JsonMessage[T]) AddRetryCount()

func (*JsonMessage[T]) CreatedAt added in v0.1.0

func (m *JsonMessage[T]) CreatedAt() time.Time

func (*JsonMessage[T]) RefreshRetryCount added in v0.1.0

func (m *JsonMessage[T]) RefreshRetryCount()

func (*JsonMessage[T]) RefreshUpdatedAt added in v0.1.0

func (m *JsonMessage[T]) RefreshUpdatedAt()

func (*JsonMessage[T]) RetryCount added in v0.1.0

func (m *JsonMessage[T]) RetryCount() int

func (*JsonMessage[T]) TotalRetryCount added in v0.1.0

func (m *JsonMessage[T]) TotalRetryCount() int

func (*JsonMessage[T]) UpdatedAt added in v0.1.0

func (m *JsonMessage[T]) UpdatedAt() time.Time

type Kind

// Kind identifies the kind of a queue, as reported by Queue.Kind.
type Kind uint8
const (
	// KindFIFO is 1 (iota + 1), so the zero value of Kind matches no constant.
	// The Config.ConsumerCount comment recommends a FIFO queue with a single
	// consumer when message order must be kept.
	KindFIFO Kind = iota + 1
	// KindStandard is 2.
	KindStandard
)

type MemoryFactory

type MemoryFactory[T any] struct {
	// contains filtered or unexported fields
}

func NewMemoryFactory

func NewMemoryFactory[T any](defaultMsg Message[T]) *MemoryFactory[T]

func (*MemoryFactory[T]) GetOrCreate

func (f *MemoryFactory[T]) GetOrCreate(name string, options ...Option) (Queue[T], error)

func (*MemoryFactory[T]) GetOrCreateSafe added in v0.2.0

func (f *MemoryFactory[T]) GetOrCreateSafe(name string, options ...Option) (SafeQueue[T], error)

type MemoryQueue

type MemoryQueue[T any] struct {
	*BaseQueue[T]
	// contains filtered or unexported fields
}

func NewMemoryQueue

func NewMemoryQueue[T any](name string, defaultMsg Message[T], options ...Option) (*MemoryQueue[T], error)

func (*MemoryQueue[T]) BDequeue added in v0.9.3

func (q *MemoryQueue[T]) BDequeue(ctx context.Context) (Message[T], error)

func (*MemoryQueue[T]) BEnqueue added in v0.9.3

func (q *MemoryQueue[T]) BEnqueue(ctx context.Context, data T) error

func (*MemoryQueue[T]) Close

func (q *MemoryQueue[T]) Close()

func (*MemoryQueue[T]) Dequeue

func (q *MemoryQueue[T]) Dequeue(ctx context.Context) (Message[T], error)

func (*MemoryQueue[T]) Enqueue

func (q *MemoryQueue[T]) Enqueue(ctx context.Context, data T) error

func (*MemoryQueue[T]) Kind

func (q *MemoryQueue[T]) Kind() Kind

func (*MemoryQueue[T]) Name added in v0.0.2

func (q *MemoryQueue[T]) Name() string

func (*MemoryQueue[T]) Purge added in v0.1.0

func (q *MemoryQueue[T]) Purge(ctx context.Context) error

func (*MemoryQueue[T]) Recover

func (q *MemoryQueue[T]) Recover(ctx context.Context, msg Message[T]) error

type Message added in v0.1.0

// Message is the queue-level message: a message.Message[T] extended with
// retry counters and creation/update timestamps.
type Message[T any] interface {
	message.Message[T]

	// Retry bookkeeping.
	// NOTE(review): the exact relationship between RetryCount and
	// TotalRetryCount is not visible here. Queue.Dequeue documents resetting
	// the retry count to 0 for messages redriven from a DLQ, so presumably
	// RetryCount is the resettable counter and TotalRetryCount the cumulative
	// one, with RefreshRetryCount performing the reset — confirm.
	RetryCount() int
	AddRetryCount()
	TotalRetryCount() int
	RefreshRetryCount()

	// Timestamps of the message.
	CreatedAt() time.Time
	UpdatedAt() time.Time

	// NOTE(review): presumably sets UpdatedAt to the current time — confirm.
	RefreshUpdatedAt()
}

type Option added in v0.1.0

// Option is a function that modifies a Config. The With* constructors in
// this package return functions of this shape, and queue constructors and
// factories accept them as variadic `options`.
type Option func(*Config)

type Purgeable added in v0.1.0

// Purgeable is implemented by queues that can be cleaned up on request.
type Purgeable interface {
	// Purge cleans up the queue.
	// NOTE(review): presumably removes all pending messages — confirm.
	Purge(context.Context) error
}

type Queue

// Queue is the core queue interface.
type Queue[T any] interface {
	// Kind reports the kind of the queue (KindFIFO or KindStandard).
	Kind() Kind

	// Name reports the name of the queue.
	Name() string

	// MaxSize reports the max size of the queue;
	// -1 (UnlimitedSize) means unlimited.
	MaxSize() int

	// MaxHandleFailures reports the max handle failures.
	// Messages will be discarded after this many failures, or
	// pushed to the DLQ if a DLQ is supported.
	MaxHandleFailures() int

	// Enqueue pushes data to the end of the queue.
	// It fails if the queue is full or closed.
	Enqueue(ctx context.Context, data T) error

	// BEnqueue is the same as Enqueue but blocks until the queue is not full
	// and the message is enqueued.
	BEnqueue(ctx context.Context, data T) error

	// Dequeue takes a message from the queue.
	// The implementation MUST set the retryCount of the message to 0 if its
	// retryCount > MaxHandleFailures; in that case, the message comes from
	// DLQ redriving.
	Dequeue(context.Context) (Message[T], error)

	// BDequeue is the same as Dequeue but blocks until a message is available.
	BDequeue(context.Context) (Message[T], error)

	// Subscribe subscribes to the queue with message confirmation.
	// Once the handler returns an error, the message is automatically put back
	// to the queue using the `Recover` mechanism internally.
	Subscribe(h Handler[T])

	// Close closes the queue. Enqueue is documented to fail on a closed queue.
	Close()
}

Queue is the interface of a queue. Implementations of Queue should be thread-safe.

type Recoverable added in v0.0.2

// Recoverable is implemented by queues that can take back a message whose
// handling failed.
type Recoverable[T any] interface {

	// Recover puts the given message back to the queue.
	//
	// If the queue supports a `visibility window` like AWS SQS, the message
	// will be put back to the queue atomically without calling `Recover`.
	// That is useful if the panic comes from outside of the queue handler.
	// It is still recommended to use `Recover` if the panic comes from inside
	// the queue handler, so the message is retried quickly.
	Recover(context.Context, Message[T]) error
}

type RecoverableQueue

// RecoverableQueue is a Queue that also implements Recoverable.
type RecoverableQueue[T any] interface {
	Queue[T]

	Recoverable[T]
}

type RedisQueue added in v0.0.2

type RedisQueue[T any] struct {
	*BaseQueue[T]
	// contains filtered or unexported fields
}

func NewRedisQueue added in v0.0.2

func NewRedisQueue[T any](redisClient redis.Cmdable, name string, defaultMsg Message[T], options ...Option) (*RedisQueue[T], error)

func (*RedisQueue[T]) BDequeue added in v0.9.3

func (q *RedisQueue[T]) BDequeue(ctx context.Context) (Message[T], error)

func (*RedisQueue[T]) BEnqueue added in v0.9.3

func (q *RedisQueue[T]) BEnqueue(ctx context.Context, data T) error

func (*RedisQueue[T]) Dequeue added in v0.0.2

func (q *RedisQueue[T]) Dequeue(ctx context.Context) (Message[T], error)

func (*RedisQueue[T]) Enqueue added in v0.0.2

func (q *RedisQueue[T]) Enqueue(ctx context.Context, data T) error

func (*RedisQueue[T]) Purge added in v0.9.1

func (q *RedisQueue[T]) Purge(ctx context.Context) error

func (*RedisQueue[T]) Recover added in v0.0.2

func (q *RedisQueue[T]) Recover(ctx context.Context, msg Message[T]) error

type RedisQueueFactory added in v0.0.2

type RedisQueueFactory[T any] struct {
	// contains filtered or unexported fields
}

func NewRedisQueueFactory added in v0.0.2

func NewRedisQueueFactory[T any](redisClient redis.Cmdable, defaultMsg Message[T]) *RedisQueueFactory[T]

func (*RedisQueueFactory[T]) GetOrCreate added in v0.0.2

func (f *RedisQueueFactory[T]) GetOrCreate(name string, options ...Option) (Queue[T], error)

func (*RedisQueueFactory[T]) GetOrCreateSafe added in v0.2.0

func (f *RedisQueueFactory[T]) GetOrCreateSafe(name string, options ...Option) (SafeQueue[T], error)

type SafeQueue

// SafeQueue is a Queue that always exposes the optional capabilities
// (Recoverable, Purgeable, DLQer), each paired with an Is* method.
// NOTE(review): the Is* methods presumably report whether the wrapped queue
// actually implements the paired capability — confirm.
type SafeQueue[T any] interface {
	Queue[T]

	// Recover support.
	Recoverable[T]
	IsRecoverable() bool

	// Purge support.
	Purgeable
	IsPurgeable() bool

	// Dead-letter queue support.
	DLQer[T]
	IsDLQSupported() bool
}

SafeQueue provides the ability to put a message back to the queue when the handler encounters a panic, and makes sure all function calls are safe; e.g., it returns ErrNotImplemented if Recover is called but not implemented.

type SimpleQueue added in v0.1.0

type SimpleQueue[T any] struct {
	// contains filtered or unexported fields
}

func NewSimpleQueue added in v0.2.0

func NewSimpleQueue[T any](queue Queue[T]) (*SimpleQueue[T], error)

func (*SimpleQueue[T]) BDequeue added in v0.9.3

func (q *SimpleQueue[T]) BDequeue(ctx context.Context) (Message[T], error)

func (*SimpleQueue[T]) BEnqueue added in v0.9.3

func (q *SimpleQueue[T]) BEnqueue(ctx context.Context, data T) error

func (*SimpleQueue[T]) Close added in v0.1.0

func (q *SimpleQueue[T]) Close()

func (*SimpleQueue[T]) DLQ added in v0.1.0

func (q *SimpleQueue[T]) DLQ() (DLQ[T], error)

func (*SimpleQueue[T]) Dequeue added in v0.1.0

func (q *SimpleQueue[T]) Dequeue(ctx context.Context) (Message[T], error)

func (*SimpleQueue[T]) Enqueue added in v0.1.0

func (q *SimpleQueue[T]) Enqueue(ctx context.Context, data T) error

func (*SimpleQueue[T]) IsDLQSupported added in v0.1.0

func (q *SimpleQueue[T]) IsDLQSupported() bool

func (*SimpleQueue[T]) IsPurgeable added in v0.1.0

func (q *SimpleQueue[T]) IsPurgeable() bool

func (*SimpleQueue[T]) IsRecoverable added in v0.1.0

func (q *SimpleQueue[T]) IsRecoverable() bool

func (*SimpleQueue[T]) Kind added in v0.1.0

func (q *SimpleQueue[T]) Kind() Kind

func (*SimpleQueue[T]) MaxHandleFailures added in v0.1.0

func (q *SimpleQueue[T]) MaxHandleFailures() int

func (*SimpleQueue[T]) MaxSize added in v0.1.0

func (q *SimpleQueue[T]) MaxSize() int

func (*SimpleQueue[T]) Name added in v0.1.0

func (q *SimpleQueue[T]) Name() string

func (*SimpleQueue[T]) Purge added in v0.1.0

func (q *SimpleQueue[T]) Purge(ctx context.Context) error

func (*SimpleQueue[T]) Recover added in v0.1.0

func (q *SimpleQueue[T]) Recover(ctx context.Context, msg Message[T]) error

func (*SimpleQueue[T]) Subscribe added in v0.1.0

func (q *SimpleQueue[T]) Subscribe(cb Handler[T])

func (*SimpleQueue[T]) Unwrap added in v0.1.0

func (q *SimpleQueue[T]) Unwrap() Queue[T]

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL