Documentation ¶
Index ¶
- type ConsumerClient
- type IConsumer
- type MessageProcessorFunc
- type Option
- func WithAwsClient(sqsClient *client.Client) Option
- func WithConcurrencyFactor(concurrencyFactor int) Option
- func WithInstrumentationClient(instrumentationClient *instrumentation.Client) Option
- func WithLogger(logger *zap.Logger) Option
- func WithMessageProcessTimeout(messageProcessTimeout time.Duration) Option
- func WithQueuePollingDuration(queuePollingDuration time.Duration) Option
- func WithQueueUrl(queueUrl *string) Option
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConsumerClient ¶
type ConsumerClient struct { // `Logger` is a pointer to a `zap.Logger` object, which is a logging library for Go. It is used to log // messages and errors during the execution of the consumer client. This allows for easy debugging and // monitoring of the client's behavior. Logger *zap.Logger // `NewRelicClient` is a pointer to a `newrelic.Application` object, which is used for monitoring and // tracing application performance. It allows for the collection of metrics and tracing of requests // across distributed systems. This suggests that the consumer client is being monitored and tracked // for performance using New Relic. InstrumentationClient *instrumentation.Client // `Client` is a pointer to an instance of the `client.Client` struct. This struct is likely used to // interact with some external service or API. It is not clear from the given code what specific // service or API the `client.Client` is interacting with. SqsClient *client.Client // The `QueueUrl` property is a pointer to a string that represents the URL of the queue from which the // consumer client will receive messages. It is likely used by the `client.Client` to connect to the // queue and retrieve messages. The use of a pointer to a string allows for the value of the URL to be // easily updated or changed if necessary. QueueUrl *string // `ConcurrencyFactor` is an integer property of the `ConsumerClient` struct that represents the number // of concurrent message processing operations that can be performed by the client. It determines how // many messages can be processed simultaneously from the queue. ConcurrencyFactor int // `QueuePollingDuration` is a property of the `ConsumerClient` struct that represents the duration for // which the client will poll the queue for new messages. It is of type `time.Duration`, which is a // built-in Go type that represents a duration of time. The value of this property is set by the user // and determines how long the client will wait for new messages before checking the queue again. QueuePollingDuration time.Duration // `MessageProcessTimeout` is a property of the `ConsumerClient` struct that represents the maximum // amount of time that a message can be processed before timing out. If the message processing takes // longer than this duration, it will be considered as failed and the message will be returned to the // queue for processing again. MessageProcessTimeout time.Duration }
The ConsumerClient type contains various fields related to a client that consumes messages from a queue. @property Logger - A pointer to a zap logger, which is a logging library for Go. It is used to log messages and errors during the execution of the consumer client. @property NewRelicClient - NewRelicClient is a pointer to a newrelic.Application object, which is used for monitoring and tracing application performance. It allows for the collection of metrics and tracing of requests across distributed systems. @property Client - The `Client` property is a pointer to an instance of the `client.Client` struct. This struct is likely used to interact with some external service or API. @property QueueUrl - The URL of the queue from which the consumer client will receive messages. @property {int} ConcurrencyFactor - ConcurrencyFactor is an integer property that represents the number of concurrent message processing operations that can be performed by the ConsumerClient. It determines how many messages can be processed simultaneously from the queue. @property QueuePollingDuration - QueuePollingDuration is a property of the ConsumerClient struct that represents the duration for which the client will poll the queue for new messages. It is of type time.Duration, which is a built-in Go type that represents a duration of time. The value of this property is set by the user and @property MessageProcessTimeout - MessageProcessTimeout is a property of the ConsumerClient struct and it represents the maximum amount of time that a message can be processed before timing out. If the message processing takes longer than this duration, it will be considered as failed and the message will be returned to the queue for processing again.
func New ¶
func New(options ...Option) (*ConsumerClient, error)
The New function creates a new ConsumerClient instance with optional configuration options.
func (*ConsumerClient) NaiveConsumer ¶
func (c *ConsumerClient) NaiveConsumer(f MessageProcessorFunc)
As standard aws sqs receive call gives us maximum of 10 messages, the naive approach will be to process them
in parallel, then call the next batch.
With approach like this we will be limited to the 1 minute / slowest message processing in batch * 10, for example having the slowest message being processed in 50ms it will give us (1000 ms / 50ms) * 10 = 200 messages per second of processing time minus network latency, that can eat up most of the projected capacity.
func (*ConsumerClient) StartConcurentConsumer ¶
func (c *ConsumerClient) StartConcurentConsumer(f MessageProcessorFunc)
ConcurrentConsumer creates a limited parallel queue, and continues to poll AWS until all the limit is reached. This is performed by implementing a token bucket” using a buffered channel hence this approach is only limited by aws throughput
Some scenarios will require a different set of resources consumed, depending on the message type (Lets say you want your handler to be able to process from 1 to N emails in 1 message).
To maintain our limitations, we could introduce the timely based token bucket algorithm , which will ensure we don’t process more than N emails over a period of time (like 1 minute), by grabbing the exact amount of “worker tokens” from the pool, depending on emails count in message. Also, if your code can be timed out, there is a good approach to impose timeout and cancellation, based on golang context.WithCancel function. Check out the golang semaphore library to build the nuclear-resistant solution. (the mechanics are the same as in our example, abstracted to library,
so instead of using channel for limiting our operation we will call semaphore.Acquire, which will also block our execution until “worker tokens” will be refilled).
LINK - Ref: https://docs.microsoft.com/en-us/azure/architecture/microservices/model/domain-analysis LINK - Ref: https://docs.microsoft.com/en-us/azure/architecture/microservices/design/interservice-communication
func (*ConsumerClient) Validate ¶
func (c *ConsumerClient) Validate() error
Validate validates whether all the required parameters for the consumer client have been set. It checks if the `SqsClient`, `Logger`, `NewRelicClient`, `QueueUrl`, `ConcurrencyFactor`, `MessageProcessTimeout`, and `QueuePollingDuration` fields are not nil or zero. If any of these fields are nil or zero, it returns an error indicating that the consumer client is invalid.
type IConsumer ¶
type IConsumer interface { // `StartConcurentConsumer(f MessageProcessorFunc)` is a method of the `IConsumer` interface that takes a // `MessageProcessorFunc` as an argument and processes messages concurrently. This means that multiple // messages can be processed at the same time, potentially improving performance. The implementation of // this method is not shown in the given code snippet. StartConcurentConsumer(f MessageProcessorFunc) // `NaiveConsumer(f MessageProcessorFunc)` is a method of the `IConsumer` interface that takes a // `MessageProcessorFunc` as an argument and processes messages in a single-threaded, sequential // manner. This means that messages will be processed one at a time, in the order in which they were // received. This method is not optimized for performance and may not be suitable for high-throughput // scenarios. NaiveConsumer(f MessageProcessorFunc) }
IConsumer defines an interface for a consumer that can process messages either concurrently or naively. @property ConcurrentConsumer - A method that takes a MessageProcessorFunc as an argument and processes messages concurrently. This means that multiple messages can be processed at the same time, potentially improving performance. @property NaiveConsumer - A method that takes a MessageProcessorFunc as an argument and processes messages in a single-threaded, sequential manner. This method is not optimized for performance and may not be suitable for high-throughput scenarios.
type MessageProcessorFunc ¶
MessageProcessorFunc serves as the logic used to process each incoming message from a msg queue
type Option ¶
type Option func(*ConsumerClient)
func WithAwsClient ¶
WithAwsClient sets the aws client for the consumer
func WithConcurrencyFactor ¶
WithConcurrencyFactor sets the concurrency factor for the consumer
func WithInstrumentationClient ¶ added in v1.0.5
func WithInstrumentationClient(instrumentationClient *instrumentation.Client) Option
WithNewRelicClient sets the new relic client for the consumer
func WithLogger ¶
WithLogger sets the logger for the consumer
func WithMessageProcessTimeout ¶
WithMessageProcessTimeout sets the message process timeout for the consumer
func WithQueuePollingDuration ¶
WithQueuePollingDuration sets the queue polling duration for the consumer
func WithQueueUrl ¶
WithQueueUrl sets the queue url for the consumer