Documentation
¶
Overview ¶
Package queue provides a simple interface to interact with a queue.
Index ¶
- Constants
- Variables
- func Flatten2D[T any](data [][]T) []T
- func NewMap[P, S any]() map[string]IQueue[P, S]
- func ParseToStruct(from, to any) error
- func Publish[P any, S any](ctx context.Context, s IQueue[P, S], queueName string, msg *Message, prm *P, ...) error
- func PublishMany[P, S any](ctx context.Context, q IQueue[P, S], queueName string, items []*Message, ...) error
- func PublishToMany[P, S any](ctx context.Context, m map[string]IQueue[P, S], queueName string, msg *Message, ...) error
- func String[P, S any](m map[string]IQueue[P, S]) string
- func Subscribe[P any, S any](ctx context.Context, s IQueue[P, S], queueName string, cb CallbackFunc, prm *S, ...) error
- type CallbackFunc
- type HookFunc
- type IQueue
- type Message
- type Mock
- func (m *Mock[PublishParams, SubscribeParams]) GetClient() any
- func (m *Mock[PublishParams, SubscribeParams]) GetCounterPublished() *expvar.Int
- func (m *Mock[PublishParams, SubscribeParams]) GetCounterPublishedFailed() *expvar.Int
- func (m *Mock[PublishParams, SubscribeParams]) GetCounterReceived() *expvar.Int
- func (m *Mock[PublishParams, SubscribeParams]) GetCounterReceivedFailed() *expvar.Int
- func (m *Mock[PublishParams, SubscribeParams]) GetCounterSubscribed() *expvar.Int
- func (m *Mock[PublishParams, SubscribeParams]) GetCounterSubscribedFailed() *expvar.Int
- func (m *Mock[PublishParams, SubscribeParams]) GetLogger() sypl.ISypl
- func (m *Mock[PublishParams, SubscribeParams]) GetName() string
- func (m *Mock[PublishParams, SubscribeParams]) GetType() string
- func (m *Mock[PublishParams, SubscribeParams]) Publish(ctx context.Context, queueName string, msg *Message, prm *PublishParams, ...) error
- func (m *Mock[PublishParams, SubscribeParams]) Subscribe(ctx context.Context, queueName string, cb CallbackFunc, prm *SubscribeParams, ...) error
- type Operation
- type Options
- type OptionsFunc
- type PublishParams
- type Queue
- func (s *Queue) GetCounterPublished() *expvar.Int
- func (s *Queue) GetCounterPublishedFailed() *expvar.Int
- func (s *Queue) GetCounterReceived() *expvar.Int
- func (s *Queue) GetCounterReceivedFailed() *expvar.Int
- func (s *Queue) GetCounterSubscribed() *expvar.Int
- func (s *Queue) GetCounterSubscribedFailed() *expvar.Int
- func (s *Queue) GetLogger() sypl.ISypl
- func (s *Queue) GetName() string
- func (s *Queue) GetType() string
- type SubscribeParams
Constants ¶
const ( DefaultMetricCounterLabel = "counter" Type = "queue" )
Type is the type of the entity regarding the framework. It is used to for example, to identify the entity in the logs, metrics, and for tracing.
Variables ¶
var ( // ErrRequiredPostHook is the error returned when the post-hook function is // missing. ErrRequiredPostHook = customerror.NewRequiredError("post-hook function", customerror.WithErrorCode("ERR_REQUIRED_POST_HOOK")) // ErrRequiredPreHook is the error returned when the pre-hook function is // missing. ErrRequiredPreHook = customerror.NewRequiredError("pre-hook function", customerror.WithErrorCode("ERR_REQUIRED_PRE_HOOK")) )
Functions ¶
func Flatten2D ¶
func Flatten2D[T any](data [][]T) []T
Flatten2D takes a 2D slice and returns a 1D slice containing all the elements.
func ParseToStruct ¶
ParseToStruct parses the given JSON (`from`) to struct (`to`).
func Publish ¶
func Publish[P any, S any](ctx context.Context, s IQueue[P, S], queueName string, msg *Message, prm *P, options ...OptionsFunc[P, S]) error
Publish data.
func PublishMany ¶
func PublishMany[P, S any]( ctx context.Context, q IQueue[P, S], queueName string, items []*Message, prm *P, options ...OptionsFunc[P, S], ) error
PublishMany publish `items` concurrently, against the specified Queue.
func PublishToMany ¶
func PublishToMany[P, S any]( ctx context.Context, m map[string]IQueue[P, S], queueName string, msg *Message, prm *P, options ...OptionsFunc[P, S], ) error
PublishToMany publishes `msg` concurrently, against all Queues in `m`.
Types ¶
type CallbackFunc ¶
CallbackFunc is the function that will be once a message is received on a subscribed channel.
type HookFunc ¶
type HookFunc[P, S any] func(ctx context.Context, q IQueue[P, S], queueName string, m *Message) error
HookFunc specifies the function that will be called before and after the operation.
type IQueue ¶
type IQueue[P, S any] interface { // Publish data. Publish(ctx context.Context, queueName string, msg *Message, prm *P, options ...OptionsFunc[P, S]) error // Subscribe to channel. Subscribe(ctx context.Context, queueName string, cb CallbackFunc, prm *S, options ...OptionsFunc[P, S]) error // GetType returns its type. GetType() string // GetClient returns the queue client. Use that to interact with the // underlying queue client. GetClient() any // GetLogger returns the logger. GetLogger() sypl.ISypl // GetName returns the queue name. GetName() string // GetCounterPublished returns the metric. GetCounterPublished() *expvar.Int // GetCounterPublishedFailed returns the metric. GetCounterPublishedFailed() *expvar.Int // GetCounterReceived returns the metric. GetCounterReceived() *expvar.Int // GetCounterReceivedFailed returns the metric. GetCounterReceivedFailed() *expvar.Int // GetCounterSubscribed returns the metric. GetCounterSubscribed() *expvar.Int // GetCounterSubscribedFailed returns the metric. GetCounterSubscribedFailed() *expvar.Int }
IQueue defines the queue abstraction layer - interface.
type Message ¶
type Message struct {
// Body is the raw message payload in bytes.
Body []byte
// MessageID uniquely identifies the message across a queue system.
// Example: Message ID in SQS, Delivery Tag in RabbitMQ, Offset in Kafka, etc.
MessageID string
// Metadata holds specific attributes that don't fit into standard fields.
Metadata map[string]interface{}
// Timestamp indicates when the message was published to the queue.
Timestamp time.Time
}
Message represents a generic queue message
func NewMessage ¶
NewMessage returns a new Message with a unique ID and current timestamp.
func NewMessageFromStruct ¶
NewMessageFromStruct creates a new Message from a struct.
func NewMustMessageFromStruct ¶
NewMustMessageFromStruct creates a new Message from a struct, panicking on error.
type Mock ¶
type Mock[P, S any] struct { // Publish data. MockPublish func(ctx context.Context, queueName string, msg *Message, prm *P, options ...OptionsFunc[P, S]) error // Subscribe to channel. MockSubscribe func(ctx context.Context, queueName string, cb CallbackFunc, prm *S, options ...OptionsFunc[P, S]) error // GetType returns its type. MockGetType func() string // GetClient returns the queue client. Use that to interact with the underlying queue client. MockGetClient func() any // GetLogger returns the logger. MockGetLogger func() sypl.ISypl // GetName returns the queue name. MockGetName func() string // GetCounterPublished returns the metric. MockGetCounterPublished func() *expvar.Int // GetCounterPublishedFailed returns the metric. MockGetCounterPublishedFailed func() *expvar.Int // GetCounterReceived returns the metric. MockGetCounterReceived func() *expvar.Int // GetCounterReceivedFailed returns the metric. MockGetCounterReceivedFailed func() *expvar.Int // GetCounterSubscribed returns the metric. MockGetCounterSubscribed func() *expvar.Int // GetCounterSubscribedFailed returns the metric. MockGetCounterSubscribedFailed func() *expvar.Int }
Mock is a struct which satisfies the queue.IQueue interface.
func (*Mock[PublishParams, SubscribeParams]) GetClient ¶
GetClient returns the queue client. Use that to interact with the underlying queue client.
func (*Mock[PublishParams, SubscribeParams]) GetCounterPublished ¶
GetCounterPublished returns the metric.
func (*Mock[PublishParams, SubscribeParams]) GetCounterPublishedFailed ¶
GetCounterPublishedFailed returns the metric.
func (*Mock[PublishParams, SubscribeParams]) GetCounterReceived ¶
GetCounterReceived returns the metric.
func (*Mock[PublishParams, SubscribeParams]) GetCounterReceivedFailed ¶
GetCounterReceivedFailed returns the metric.
func (*Mock[PublishParams, SubscribeParams]) GetCounterSubscribed ¶
GetCounterSubscribed returns the metric.
func (*Mock[PublishParams, SubscribeParams]) GetCounterSubscribedFailed ¶
GetCounterSubscribedFailed returns the metric.
func (*Mock[PublishParams, SubscribeParams]) Publish ¶
func (m *Mock[PublishParams, SubscribeParams]) Publish(ctx context.Context, queueName string, msg *Message, prm *PublishParams, options ...OptionsFunc[PublishParams, SubscribeParams]) error
Publish data.
func (*Mock[PublishParams, SubscribeParams]) Subscribe ¶
func (m *Mock[PublishParams, SubscribeParams]) Subscribe(ctx context.Context, queueName string, cb CallbackFunc, prm *SubscribeParams, options ...OptionsFunc[PublishParams, SubscribeParams]) error
Subscribe to channel.
type Options ¶
type Options[P, S any] struct { // QueueName name. QueueName string `json:"queueName"` // PreHookFunc is the function which runs before the operation. PreHookFunc HookFunc[P, S] `json:"-"` // PostHookFunc is the function which runs after the operation. PostHookFunc HookFunc[P, S] `json:"-"` }
Options for operations.
type OptionsFunc ¶
OptionsFunc allows to set options.
func WithPostHook ¶
func WithPostHook[P, S any](fn HookFunc[P, S]) OptionsFunc[P, S]
WithPostHook set the post-hook function.
func WithPreHook ¶
func WithPreHook[P, S any](fn HookFunc[P, S]) OptionsFunc[P, S]
WithPreHook set the pre-hook function.
func WithQueueName ¶
func WithQueueName[P, S any](queueName string) OptionsFunc[P, S]
WithQueueName sets the queue name.
type PublishParams ¶
type PublishParams struct {
// Tags attach categorization metadata to the message.
// Example: Message Tags in SQS, Message Properties in RabbitMQ, Record Headers in Kafka, etc.
Tags []string
// Route specifies where to publish the message.
// Example: Routing Key in RabbitMQ, Topic in Kafka, unused in standard SQS, etc.
Route string
// DelaySeconds postpones message delivery by specified seconds.
// Example: Message Timer in SQS, Delayed Exchange in RabbitMQ, etc.
DelaySeconds int
// MessageGroupID ensures ordering for related messages.
// Example: Group ID in FIFO queues (SQS), Partition Key in Kafka, etc.
MessageGroupID string
// Priority influences message delivery order where supported.
// Example: Message Priority in RabbitMQ, custom implementation in other queues, etc.
Priority int
// Metadata holds queue-specific attributes for publishing.
// Example: System Attributes in SQS, Headers in RabbitMQ/Kafka, etc.
Metadata map[string]interface{}
}
PublishParams defines the parameters for publishing a message to a queue.
type Queue ¶
type Queue struct {
// Logger.
Logger sypl.ISypl `json:"-" validate:"required"`
// Name of the queue type.
Name string `json:"name" validate:"required,lowercase,gte=1"`
// contains filtered or unexported fields
}
Queue definition.
func (*Queue) GetCounterPublished ¶
GetCounterPublished returns the counterPublished.
func (*Queue) GetCounterPublishedFailed ¶
GetCounterPublishedFailed returns the counterPublishedFailed.
func (*Queue) GetCounterReceived ¶
GetCounterReceived returns the counterReceived.
func (*Queue) GetCounterReceivedFailed ¶
GetCounterReceivedFailed returns the counterReceivedFailed.
func (*Queue) GetCounterSubscribed ¶
GetCounterSubscribed returns the counterSubscribed.
func (*Queue) GetCounterSubscribedFailed ¶
GetCounterSubscribedFailed returns the counterSubscribedFailed.
type SubscribeParams ¶
type SubscribeParams struct {
// GroupID identifies a group of consumers that work as one unit.
// Example: Consumer Group in Kafka/SQS, Consumer Tag prefix in RabbitMQ, etc.
GroupID string
// Tags filter which messages this subscriber receives.
// Example: Message Filtering in SQS, Binding Keys in RabbitMQ, Topic Subscriptions in Kafka, etc.
Tags []string
// Route specifies which route/topic to subscribe to.
// Example: Binding Pattern in RabbitMQ, Topic in Kafka, unused in standard SQS, etc.
Route string
// BatchSize defines how many messages to process in a single batch.
// This helps optimize throughput vs. latency across all queue systems.
BatchSize int
// MaxMessages sets the maximum number of messages to receive in total.
// Useful for controlling message flow and preventing overwhelming consumers.
MaxMessages int
// WaitTimeout specifies how long to wait for messages when none are immediately available.
// Example: Long Polling in SQS, Consumer Timeout in Kafka, etc.
WaitTimeout time.Duration
// Metadata holds queue-specific attributes that don't fit into standard fields.
// Example: Message Attributes in SQS, Headers in RabbitMQ/Kafka, etc.
Metadata map[string]interface{}
// ContextTimeout specifies how long to wait for the callback to finish.
ContextTimeout time.Duration
}
SubscribeParams defines the parameters for subscribing to a queue.