Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
type Consumer struct { Logger *zap.Logger NewRelicClient *newrelic.Application MetricEngine *telemetry.MetricsEngine ServiceMetrics *telemetry.ServiceMetrics Client *core_message_queue.SqsQueueHandle QueueUrl *string ConcurrencyFactor int QueuePollingDuration time.Duration MessageProcessTimeout time.Duration }
func NewConsumer ¶
func NewConsumer(params *ConsumerParams) (*Consumer, error)
NewConsumer instantiates a new instance of the aws consumer object
func (*Consumer) ConcurrentConsumer ¶
func (c *Consumer) ConcurrentConsumer(f MessageProcessorFunc)
ConcurrentConsumer creates a limited parallel queue, and continues to poll AWS until all the limit is reached. This is performed by implementing a token bucket” using a buffered channel hence this approach is only limited by aws throughput
Some scenarios will require a different set of resources consumed, depending on the message type (Lets say you want your handler to be able to process from 1 to N emails in 1 message).
To maintain our limitations, we could introduce the timely based token bucket algorithm , which will ensure we don’t process more than N emails over a period of time (like 1 minute), by grabbing the exact amount of “worker tokens” from the pool, depending on emails count in message. Also, if your code can be timed out, there is a good approach to impose timeout and cancellation, based on golang context.WithCancel function. Check out the golang semaphore library to build the nuclear-resistant solution. (the mechanics are the same as in our example, abstracted to library,
so instead of using channel for limiting our operation we will call semaphore.Acquire, which will also block our execution until “worker tokens” will be refilled).
LINK - Ref: https://docs.microsoft.com/en-us/azure/architecture/microservices/model/domain-analysis LINK - Ref: https://docs.microsoft.com/en-us/azure/architecture/microservices/design/interservice-communication
func (*Consumer) NaiveConsumer ¶
func (c *Consumer) NaiveConsumer(f MessageProcessorFunc)
As standard aws sqs receive call gives us maximum of 10 messages, the naive approach will be to process them
in parallel, then call the next batch.
With approach like this we will be limited to the 1 minute / slowest message processing in batch * 10, for example having the slowest message being processed in 50ms it will give us (1000 ms / 50ms) * 10 = 200 messages per second of processing time minus network latency, that can eat up most of the projected capacity.
type ConsumerConfigs ¶
type ConsumerParams ¶
type ConsumerParams struct { QueueURl *string Logger *zap.Logger NrClient *newrelic.Application MetricsEngine *telemetry.MetricsEngine ServiceMetrics *telemetry.ServiceMetrics AwsClient *core_message_queue.SqsQueueHandle Config *ConsumerConfigs }
type IConsumer ¶
type IConsumer interface { ConcurrentConsumer(f MessageProcessorFunc) NaiveConsumer(f MessageProcessorFunc) }
type MessageProcessorFunc ¶
type MessageProcessorFunc = func(ctx context.Context, message *core_message_queue.Message) error
MessageProcessorFunc serves as the logic used to process each incoming message from a msg queue