Documentation ¶
Index ¶
- Variables
- func AddGoQueueEncoding(contentType headerVal.ContentType, encoding *Encoding)
- type Consumer
- type DecoderFn
- type DelayFn
- type EncoderFn
- type Encoding
- type InboundMessage
- type InboundMessageHandler
- type InboundMessageHandlerFunc
- type InboundMessageHandlerMiddlewareFunc
- type Message
- type Option
- type OptionFunc
- type Publisher
- type PublisherFunc
- type PublisherHandler
- type PublisherMiddlewareFunc
- type QueueService
Constants ¶
This section is empty.
Variables ¶
var (
	// JSONEncoder is an EncoderFn implementation that encodes a Message into JSON.
	JSONEncoder EncoderFn = func(ctx context.Context, m Message) (data []byte, err error) {
		return json.Marshal(m)
	}
	// JSONDecoder is a DecoderFn implementation that decodes JSON data into a Message.
	JSONDecoder DecoderFn = func(ctx context.Context, data []byte) (m Message, err error) {
		err = json.Unmarshal(data, &m)
		return
	}
	DefaultEncoder EncoderFn = JSONEncoder
	DefaultDecoder DecoderFn = JSONDecoder
)
var (
	// JSONEncoding represents the encoding configuration for JSON.
	JSONEncoding = &Encoding{
		ContentType: headerVal.ContentTypeJSON,
		Encode:      JSONEncoder,
		Decode:      JSONDecoder,
	}
	DefaultEncoding = JSONEncoding
)
Functions ¶
func AddGoQueueEncoding ¶
func AddGoQueueEncoding(contentType headerVal.ContentType, encoding *Encoding)
AddGoQueueEncoding stores the given encoding for the specified content type in the goQueueEncodingMap. The goQueueEncodingMap is a concurrent-safe map that maps content types to encodings. The content type is specified by the `contentType` parameter, and the encoding is specified by the `encoding` parameter. This function is typically used to register custom encodings for specific content types in the GoQueue library.
Types ¶
type Consumer ¶
type Consumer interface {
// Consume consumes messages from the queue and passes them to the provided handler.
// It takes a context, an InboundMessageHandler, and a map of metadata as parameters.
// It returns an error if there was a problem consuming the messages.
Consume(ctx context.Context, handler InboundMessageHandler, meta map[string]interface{}) (err error)
// Stop stops the consumer from consuming messages.
// It takes a context as a parameter and returns an error if there was a problem stopping the consumer.
Stop(ctx context.Context) (err error)
}
Consumer represents an entity that consumes messages from a queue.
type DecoderFn ¶
DecoderFn is a function type that decodes a byte slice into a Message. It takes a context and a byte slice as input and returns a Message and an error.
type DelayFn ¶
var (
	// ExponentialBackoffDelayFn is a delay function that implements exponential backoff.
	// It takes the number of retries as input and returns the delay in milliseconds.
	ExponentialBackoffDelayFn DelayFn = func(retries int64) (delay int64) {
		return 2 << retries
	}
	// NoDelayFn is a DelayFn implementation that returns 0 delay for retries.
	NoDelayFn DelayFn = func(retries int64) (delay int64) {
		return 0
	}
	DefaultDelayFn DelayFn = NoDelayFn
)
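To make the growth of `2 << retries` concrete, here is a minimal sketch that reproduces the documented backoff body on a local `DelayFn` type and adds an upper bound. `cappedExponentialBackoff` is a hypothetical helper, not part of the package; it is only one possible way to keep delays bounded.

```go
package main

import "fmt"

// DelayFn mirrors the signature used above: retries in, delay out.
type DelayFn func(retries int64) (delay int64)

// exponentialBackoff reproduces the documented body, 2 << retries.
var exponentialBackoff DelayFn = func(retries int64) (delay int64) {
	return 2 << retries
}

// cappedExponentialBackoff is a hypothetical helper (not part of the package)
// that returns the same doubling sequence but never exceeds maxDelay.
func cappedExponentialBackoff(maxDelay int64) DelayFn {
	return func(retries int64) int64 {
		// A negative shift count panics and a shift of 62 or more overflows
		// int64, so both cases fall back to maxDelay.
		if retries < 0 || retries >= 62 {
			return maxDelay
		}
		if d := int64(2) << retries; d < maxDelay {
			return d
		}
		return maxDelay
	}
}

func main() {
	capped := cappedExponentialBackoff(60)
	for retries := int64(0); retries < 8; retries++ {
		fmt.Printf("retries=%d raw=%d capped=%d\n",
			retries, exponentialBackoff(retries), capped(retries))
	}
}
```

The raw sequence starts at 2, 4, 8, 16 and doubles on every retry, so some form of cap is worth considering for messages that may be retried many times.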
type EncoderFn ¶
EncoderFn is a function type that encodes a message into a byte slice. It takes a context and a message as input and returns the encoded data and an error (if any).
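As an illustration of how an EncoderFn and a DecoderFn pair up, the sketch below round-trips a message through JSON. The `Message` struct is a trimmed local stand-in for the package type, and `roundTrip` is a hypothetical helper introduced only for this example.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// Message is a trimmed stand-in for the package's Message type; only a few
// of the documented fields are reproduced.
type Message struct {
	ID      string                 `json:"id"`
	Topic   string                 `json:"topic"`
	Data    any                    `json:"data"`
	Headers map[string]interface{} `json:"-"`
}

type EncoderFn func(ctx context.Context, m Message) (data []byte, err error)
type DecoderFn func(ctx context.Context, data []byte) (m Message, err error)

var jsonEncoder EncoderFn = func(ctx context.Context, m Message) ([]byte, error) {
	return json.Marshal(m)
}

var jsonDecoder DecoderFn = func(ctx context.Context, data []byte) (m Message, err error) {
	err = json.Unmarshal(data, &m)
	return
}

// roundTrip (hypothetical helper) encodes m and immediately decodes the result.
func roundTrip(m Message) (Message, error) {
	ctx := context.Background()
	raw, err := jsonEncoder(ctx, m)
	if err != nil {
		return Message{}, err
	}
	return jsonDecoder(ctx, raw)
}

func main() {
	in := Message{
		ID:      "1",
		Topic:   "orders",
		Data:    map[string]any{"qty": 2},
		Headers: map[string]interface{}{"trace": "abc"},
	}
	out, err := roundTrip(in)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", out)
}
```

Two things are worth noticing in the result: fields tagged `json:"-"` (such as Headers) do not survive the round trip, and a numeric value stored in `Data` comes back as a `float64` because the target type is `any`.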
type Encoding ¶
type Encoding struct {
ContentType headerVal.ContentType // The content type associated with this encoding.
Encode EncoderFn // The encoding function used to encode data.
Decode DecoderFn // The decoding function used to decode data.
}
Encoding represents an encoding configuration for a specific content type.
func GetGoQueueEncoding ¶
func GetGoQueueEncoding(contentType headerVal.ContentType) (res *Encoding, ok bool)
GetGoQueueEncoding returns the encoding associated with the given content type. It looks up the encoding in the goQueueEncodingMap and returns it along with a boolean value indicating if the encoding was found. If the encoding is not found, it returns nil and false.
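The register-then-look-up flow described for AddGoQueueEncoding and GetGoQueueEncoding can be sketched with local stand-ins. Everything here is an assumption made for illustration: `ContentType` and `Encoding` are simplified, `addEncoding` and `getEncoding` are hypothetical names, and `sync.Map` is just one way to obtain a concurrent-safe map (the source does not say which one the package uses).

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// ContentType and Encoding are simplified stand-ins for headerVal.ContentType
// and the package's Encoding struct; the codec functions work on plain
// strings to keep the sketch short.
type ContentType string

type Encoding struct {
	ContentType ContentType
	Encode      func(s string) ([]byte, error)
	Decode      func(b []byte) (string, error)
}

// encodings plays the role of goQueueEncodingMap. Using sync.Map is an
// assumption; the source only states that the map is concurrent-safe.
var encodings sync.Map

// addEncoding stores e under ct, replacing any previous entry.
func addEncoding(ct ContentType, e *Encoding) { encodings.Store(ct, e) }

// getEncoding returns the encoding for ct, or nil and false if none exists.
func getEncoding(ct ContentType) (res *Encoding, ok bool) {
	v, ok := encodings.Load(ct)
	if !ok {
		return nil, false
	}
	res, ok = v.(*Encoding)
	return res, ok
}

func main() {
	// "text/upper" is a made-up content type used only for this demo.
	addEncoding("text/upper", &Encoding{
		ContentType: "text/upper",
		Encode:      func(s string) ([]byte, error) { return []byte(strings.ToUpper(s)), nil },
		Decode:      func(b []byte) (string, error) { return string(b), nil },
	})
	if enc, ok := getEncoding("text/upper"); ok {
		data, _ := enc.Encode("hello")
		fmt.Println(string(data))
	}
	_, ok := getEncoding("application/unknown")
	fmt.Println(ok)
}
```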
type InboundMessage ¶
type InboundMessage struct {
Message
RetryCount int64 `json:"retryCount"`
Metadata map[string]interface{} `json:"metadata"`
// Ack is used for confirming the message. It will drop the message from the queue.
Ack func(ctx context.Context) (err error) `json:"-"`
// Nack is used for rejecting the message. It will requeue the message so that it is delivered again.
Nack func(ctx context.Context) (err error) `json:"-"`
// MoveToDeadLetterQueue rejects the message like Nack, but instead of requeueing it,
// the message is moved to the dead letter queue.
// Read how to configure a dead letter queue for each queue provider,
// e.g. RabbitMQ: https://www.rabbitmq.com/docs/dlx
MoveToDeadLetterQueue func(ctx context.Context) (err error) `json:"-"`
// PutToBackOfQueueWithDelay puts the message back at the tail of the queue after a delay.
PutToBackOfQueueWithDelay func(ctx context.Context, delayFn DelayFn) (err error) `json:"-"`
}
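One possible way to combine the InboundMessage callbacks is sketched below: acknowledge on success, requeue with a delay on failure, and dead-letter once a retry limit is reached. The struct is a trimmed stand-in, and `maxRetries`, `settle`, `fakeMessage`, and `outcome` are hypothetical names introduced for this example, not part of the package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type DelayFn func(retries int64) (delay int64)

// InboundMessage is a trimmed stand-in that keeps only the fields used below.
type InboundMessage struct {
	RetryCount                int64
	Ack                       func(ctx context.Context) error
	MoveToDeadLetterQueue     func(ctx context.Context) error
	PutToBackOfQueueWithDelay func(ctx context.Context, delayFn DelayFn) error
}

// maxRetries is a hypothetical policy value, not something the package defines.
const maxRetries = 3

var errProcessing = errors.New("processing failed")

// settle decides what happens to m after processing finished with procErr.
func settle(ctx context.Context, m InboundMessage, procErr error) error {
	switch {
	case procErr == nil:
		return m.Ack(ctx)
	case m.RetryCount >= maxRetries:
		return m.MoveToDeadLetterQueue(ctx)
	default:
		return m.PutToBackOfQueueWithDelay(ctx, func(retries int64) int64 { return 2 << retries })
	}
}

// fakeMessage builds an InboundMessage whose callbacks record which one ran.
func fakeMessage(retryCount int64, called *string) InboundMessage {
	return InboundMessage{
		RetryCount:            retryCount,
		Ack:                   func(context.Context) error { *called = "ack"; return nil },
		MoveToDeadLetterQueue: func(context.Context) error { *called = "dead-letter"; return nil },
		PutToBackOfQueueWithDelay: func(context.Context, DelayFn) error {
			*called = "requeue"
			return nil
		},
	}
}

// outcome runs settle against a fake message and reports the callback used.
func outcome(retryCount int64, procErr error) string {
	var called string
	_ = settle(context.Background(), fakeMessage(retryCount, &called), procErr)
	return called
}

func main() {
	fmt.Println(outcome(0, nil))
	fmt.Println(outcome(1, errProcessing))
	fmt.Println(outcome(3, errProcessing))
}
```

Exactly one callback is invoked per message in this sketch, which keeps the broker-side state unambiguous.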
type InboundMessageHandler ¶
type InboundMessageHandler interface {
HandleMessage(ctx context.Context, m InboundMessage) (err error)
}
type InboundMessageHandlerFunc ¶
type InboundMessageHandlerFunc func(ctx context.Context, m InboundMessage) (err error)
func (InboundMessageHandlerFunc) HandleMessage ¶
func (mhf InboundMessageHandlerFunc) HandleMessage(ctx context.Context, m InboundMessage) (err error)
type InboundMessageHandlerMiddlewareFunc ¶
type InboundMessageHandlerMiddlewareFunc func(next InboundMessageHandlerFunc) InboundMessageHandlerFunc
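The middleware type above composes in the usual wrap-the-next-handler style. The following sketch assumes that `HandleMessage` simply calls the underlying function (the source does not show its body); `chain`, `trace`, and `runChain` are hypothetical helpers, and `InboundMessage` is reduced to a single field.

```go
package main

import (
	"context"
	"fmt"
)

// InboundMessage is a one-field stand-in for the package type.
type InboundMessage struct{ ID string }

type InboundMessageHandlerFunc func(ctx context.Context, m InboundMessage) (err error)

// HandleMessage lets the function type be used where a handler interface is
// expected. Delegating to the function itself is an assumption.
func (mhf InboundMessageHandlerFunc) HandleMessage(ctx context.Context, m InboundMessage) error {
	return mhf(ctx, m)
}

type InboundMessageHandlerMiddlewareFunc func(next InboundMessageHandlerFunc) InboundMessageHandlerFunc

// chain (hypothetical) wraps h so that the first middleware in mws runs outermost.
func chain(h InboundMessageHandlerFunc, mws ...InboundMessageHandlerMiddlewareFunc) InboundMessageHandlerFunc {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// trace returns a middleware that appends enter and exit markers to log.
func trace(name string, log *[]string) InboundMessageHandlerMiddlewareFunc {
	return func(next InboundMessageHandlerFunc) InboundMessageHandlerFunc {
		return func(ctx context.Context, m InboundMessage) error {
			*log = append(*log, name+":in")
			err := next(ctx, m)
			*log = append(*log, name+":out")
			return err
		}
	}
}

// runChain executes a two-middleware chain once and returns the call order.
func runChain() []string {
	var log []string
	h := chain(func(ctx context.Context, m InboundMessage) error {
		log = append(log, "handler:"+m.ID)
		return nil
	}, trace("outer", &log), trace("inner", &log))
	_ = h.HandleMessage(context.Background(), InboundMessage{ID: "42"})
	return log
}

func main() {
	fmt.Println(runChain())
}
```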
type Message ¶
type Message struct {
ID string `json:"id"`
Action string `json:"action"`
Topic string `json:"topic"`
Data any `json:"data"`
ContentType headerVal.ContentType `json:"-"`
Timestamp time.Time `json:"timestamp"`
Headers map[string]interface{} `json:"-"`
ServiceAgent headerVal.GoquServiceAgent `json:"-"`
// contains filtered or unexported fields
}
Message represents a message that will be published to the queue. It contains the message ID, action, topic, data, content type, timestamp, headers, and service agent. The schema version is set with the SetSchemaVersion method. For the concept of message publishing, see the documentation: TODO(bxcodec): Add link to the documentation
func (*Message) GetSchemaVersion ¶
func (*Message) SetSchemaVersion ¶
type Option ¶
type Option struct {
// Number of consumers (workers/goroutines) that will be spawned in one goqueue instance.
NumberOfConsumer int
Consumer Consumer
Publisher Publisher
MessageHandler InboundMessageHandler
}
type OptionFunc ¶
type OptionFunc func(opt *Option)
OptionFunc is used for option chaining.
func WithConsumer ¶
func WithConsumer(c Consumer) OptionFunc
func WithMessageHandler ¶
func WithMessageHandler(h InboundMessageHandler) OptionFunc
func WithNumberOfConsumer ¶
func WithNumberOfConsumer(n int) OptionFunc
func WithPublisher ¶
func WithPublisher(p Publisher) OptionFunc
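The With... constructors follow the functional-options pattern. Since the source lists only their signatures, the bodies below are assumptions: the sketch uses a one-field `Option`, a lowercase `withNumberOfConsumer`, and a hypothetical `buildOption` helper with a made-up default to show how such options are typically applied.

```go
package main

import "fmt"

// Option is a trimmed stand-in holding a single documented field.
type Option struct {
	NumberOfConsumer int
}

type OptionFunc func(opt *Option)

// withNumberOfConsumer mirrors the documented WithNumberOfConsumer signature;
// its body is an assumption, since the source does not show it.
func withNumberOfConsumer(n int) OptionFunc {
	return func(opt *Option) { opt.NumberOfConsumer = n }
}

// defaultNumberOfConsumer is a hypothetical default chosen for this sketch.
const defaultNumberOfConsumer = 1

// buildOption (hypothetical) applies each OptionFunc in order on top of the
// defaults, so later options override earlier ones.
func buildOption(opts ...OptionFunc) *Option {
	opt := &Option{NumberOfConsumer: defaultNumberOfConsumer}
	for _, apply := range opts {
		apply(opt)
	}
	return opt
}

func main() {
	fmt.Println(buildOption().NumberOfConsumer)
	fmt.Println(buildOption(withNumberOfConsumer(4)).NumberOfConsumer)
}
```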
type Publisher ¶
type Publisher interface {
PublisherHandler
Close(ctx context.Context) (err error)
}
Publisher represents an interface for publishing messages.
type PublisherFunc ¶
PublisherFunc is a function type that represents a publisher function. It takes a context and a message as input parameters and returns an error.
type PublisherHandler ¶
PublisherHandler is an interface that defines the behavior of a message publisher.
type PublisherMiddlewareFunc ¶
type PublisherMiddlewareFunc func(next PublisherFunc) PublisherFunc
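A publisher middleware can adjust a message before it reaches the underlying publisher. The sketch below assumes that PublisherFunc has the shape `func(ctx context.Context, m Message) error`, which matches its description but is not shown in the source. `withTimestamp` and `publishOnce` are hypothetical, and `Message` is trimmed to two fields.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Message is a two-field stand-in for the package type.
type Message struct {
	ID        string
	Timestamp time.Time
}

// PublisherFunc's exact signature is an assumption based on its description.
type PublisherFunc func(ctx context.Context, m Message) (err error)

type PublisherMiddlewareFunc func(next PublisherFunc) PublisherFunc

// withTimestamp (hypothetical) stamps messages that carry no timestamp yet.
// The clock is injected so the behaviour is easy to test.
func withTimestamp(now func() time.Time) PublisherMiddlewareFunc {
	return func(next PublisherFunc) PublisherFunc {
		return func(ctx context.Context, m Message) error {
			if m.Timestamp.IsZero() {
				m.Timestamp = now()
			}
			return next(ctx, m)
		}
	}
}

// publishOnce sends m through the middleware with a fixed clock and returns
// the message that the final publisher received.
func publishOnce(m Message) Message {
	var got Message
	sink := PublisherFunc(func(ctx context.Context, m Message) error {
		got = m
		return nil
	})
	fixed := func() time.Time { return time.Unix(1700000000, 0).UTC() }
	_ = withTimestamp(fixed)(sink)(context.Background(), m)
	return got
}

func main() {
	fmt.Println(publishOnce(Message{ID: "a"}).Timestamp)
}
```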
type QueueService ¶
type QueueService struct {
NumberOfConsumer int // The number of consumers to process messages concurrently.
// contains filtered or unexported fields
}
QueueService represents a queue service that handles incoming messages.
func NewQueueService ¶
func NewQueueService(opts ...OptionFunc) *QueueService
NewQueueService creates a new instance of QueueService with the provided options. It accepts zero or more OptionFunc functions to customize the behavior of the QueueService. Returns a pointer to the created QueueService.
func (*QueueService) Publish ¶
func (qs *QueueService) Publish(ctx context.Context, m Message) (err error)
Publish sends a message to the queue using the defined publisher. It returns an error if the publisher is not defined or if there was an error while publishing the message.
func (*QueueService) Start ¶
func (qs *QueueService) Start(ctx context.Context) (err error)
Start starts the queue service by spawning multiple consumers to process messages. It returns an error if the consumer or handler is not defined. The method uses the provided context to manage the lifecycle of the consumers. Each consumer is assigned a unique consumer ID, and the start time is recorded in the metadata. The method uses the errgroup package to manage the goroutines and waits for all consumers to finish.
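The fan-out that Start describes can be approximated with the standard library alone. This sketch swaps errgroup for `sync.WaitGroup` plus `sync.Once` so that it runs without extra modules; it is not the package's implementation. The `consumeFn` type, the `startConsumers` and `countConsumers` helpers, and the metadata keys `consumer_id` and `started_at` are all hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// consumeFn stands in for Consumer.Consume with the handler argument omitted.
type consumeFn func(ctx context.Context, meta map[string]interface{}) error

// startConsumers runs n copies of consume concurrently, waits for all of
// them, and returns the first error reported. The meta keys are made up.
func startConsumers(ctx context.Context, n int, consume consumeFn) error {
	if consume == nil {
		return errors.New("consumer is not defined")
	}
	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)
	for i := 0; i < n; i++ {
		// Each goroutine gets its own metadata map.
		meta := map[string]interface{}{
			"consumer_id": fmt.Sprintf("consumer-%d", i),
			"started_at":  time.Now(),
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := consume(ctx, meta); err != nil {
				once.Do(func() { firstErr = err })
			}
		}()
	}
	wg.Wait()
	return firstErr
}

// countConsumers starts n consumers that record their IDs and returns how
// many distinct IDs were seen.
func countConsumers(n int) int {
	var mu sync.Mutex
	seen := map[string]bool{}
	_ = startConsumers(context.Background(), n, func(ctx context.Context, meta map[string]interface{}) error {
		mu.Lock()
		defer mu.Unlock()
		seen[meta["consumer_id"].(string)] = true
		return nil
	})
	return len(seen)
}

func main() {
	fmt.Println(countConsumers(3))
}
```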