consumer

package
v1.0.11
Warning

This package is not in the latest version of its module.

Published: Jul 27, 2023 License: MIT Imports: 7 Imported by: 0

README

Consumer

To build a high-performance SQS consumer in Go, follow these recommendations:

  • Use long polling: Use long polling instead of short polling to receive messages from SQS. Long polling reduces the number of empty responses, which lowers request cost, and returns messages as soon as they become available. Set the WaitTimeSeconds parameter to a high value (the maximum is 20 seconds).

  • Use multiple goroutines: Consume messages from SQS using multiple goroutines to process messages concurrently. Create a worker pool and dispatch incoming messages to the workers. This approach can improve performance by taking advantage of multiple cores and minimizing idle time.

  • Reduce message processing time: Keep message processing time to a minimum to avoid blocking the worker pool. If possible, move the heavy processing to another service or to a batch processing job.

  • Use batch deletion: Use batch deletion to delete messages from SQS. This approach reduces the number of API calls and can significantly improve performance.

  • Enable buffering: Use buffering to reduce the number of API calls. Buffer messages in memory or on disk before processing them. This approach can be especially useful when processing large messages or when processing messages at high volumes.

package main

import (
	"log"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
)

const (
	maxNumberOfMessages = 10
	waitTimeSeconds     = 20
)

func main() {
	// Create an AWS session
	sess, err := session.NewSession(&aws.Config{})
	if err != nil {
		log.Fatalf("Failed to create AWS session: %v", err)
	}

	// Create an SQS client
	sqsClient := sqs.New(sess)

	// Create a WaitGroup to synchronize worker goroutines
	var wg sync.WaitGroup

	// Create a channel to receive messages
	msgChan := make(chan *sqs.Message)

	// Create a worker pool
	numWorkers := 10
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go worker(i, msgChan, sqsClient, &wg)
	}

	// Create a receive message input
	input := &sqs.ReceiveMessageInput{
		QueueUrl:            aws.String("<QUEUE_URL>"),
		MaxNumberOfMessages: aws.Int64(maxNumberOfMessages),
		WaitTimeSeconds:     aws.Int64(waitTimeSeconds),
	}

	// Continuously receive messages from SQS and dispatch to worker pool
	for {
		output, err := sqsClient.ReceiveMessage(input)
		if err != nil {
			log.Printf("Failed to receive messages: %v", err)
			time.Sleep(1 * time.Second)
			continue
		}

		if len(output.Messages) == 0 {
			continue
		}

		// Dispatch messages to worker pool
		for _, message := range output.Messages {
			msgChan <- message
		}
	}

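	// Wait for worker pool to complete.
	// NOTE: not reached as written, because the receive loop above never exits.
	// A real program would break out of that loop on a shutdown signal first.
	close(msgChan)
	wg.Wait()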
}

// Worker function to process messages
func worker(id int, msgChan <-chan *sqs.Message, sqsClient *sqs.SQS, wg *sync.WaitGroup) {
	defer wg.Done()

	// Process incoming messages
	for message := range msgChan {
		// aws.StringValue returns "" for a nil pointer instead of panicking
		log.Printf("Worker %d received message: %s", id, aws.StringValue(message.Body))

		// Process message here...

		// Delete the message from the queue
		if _, err := sqsClient.DeleteMessage(&sqs.DeleteMessageInput{
			QueueUrl:      aws.String("<QUEUE_URL>"),
			ReceiptHandle: message.ReceiptHandle,
		}); err != nil {
			log.Printf("Failed to delete message: %v", err)
		}
	}
}
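
The example above deletes each message with its own DeleteMessage call. To follow the batch-deletion recommendation, receipt handles first have to be grouped, because a single DeleteMessageBatch request accepts at most 10 entries. Below is a minimal sketch of that grouping step using only the standard library. The helper name chunkHandles and the sample handles are illustrative assumptions and are not part of this package.

```go
package main

import "fmt"

// maxBatchEntries is the SQS limit on entries per DeleteMessageBatch request.
const maxBatchEntries = 10

// chunkHandles (hypothetical helper) splits receipt handles into groups of at
// most size entries, one group per batch delete request.
func chunkHandles(handles []string, size int) [][]string {
	if size <= 0 {
		return nil
	}
	var batches [][]string
	for start := 0; start < len(handles); start += size {
		end := start + size
		if end > len(handles) {
			end = len(handles)
		}
		batches = append(batches, handles[start:end])
	}
	return batches
}

func main() {
	// Made-up receipt handles standing in for processed messages.
	handles := make([]string, 23)
	for i := range handles {
		handles[i] = fmt.Sprintf("receipt-%d", i)
	}
	for i, batch := range chunkHandles(handles, maxBatchEntries) {
		// A real consumer would build one DeleteMessageBatch request per group here.
		fmt.Printf("batch %d: %d handles\n", i, len(batch))
	}
}
```

With this grouping, 23 processed messages need 3 delete requests instead of 23.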

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConsumerClient

type ConsumerClient struct {
	// `Logger` is a pointer to a `zap.Logger` object, which is a logging library for Go. It is used to log
	// messages and errors during the execution of the consumer client. This allows for easy debugging and
	// monitoring of the client's behavior.
	Logger *zap.Logger
	// `InstrumentationClient` is a pointer to an `instrumentation.Client` object, which is used for
	// monitoring and tracing application performance. It allows for the collection of metrics and
	// tracing of requests across distributed systems.
	InstrumentationClient *instrumentation.Client
	// `SqsClient` is a pointer to an instance of the `client.Client` struct, which the consumer uses
	// to interact with the SQS queue.
	SqsClient *client.Client
	// The `QueueUrl` property is a pointer to a string that represents the URL of the queue from which the
	// consumer client will receive messages. It is likely used by the `client.Client` to connect to the
	// queue and retrieve messages. The use of a pointer to a string allows for the value of the URL to be
	// easily updated or changed if necessary.
	QueueUrl *string
	// `ConcurrencyFactor` is an integer property of the `ConsumerClient` struct that represents the number
	// of concurrent message processing operations that can be performed by the client. It determines how
	// many messages can be processed simultaneously from the queue.
	ConcurrencyFactor int
	// `QueuePollingDuration` is a property of the `ConsumerClient` struct that represents the duration for
	// which the client will poll the queue for new messages. It is of type `time.Duration`, which is a
	// built-in Go type that represents a duration of time. The value of this property is set by the user
	// and determines how long the client will wait for new messages before checking the queue again.
	QueuePollingDuration time.Duration
	// `MessageProcessTimeout` is a property of the `ConsumerClient` struct that represents the maximum
	// amount of time that a message can be processed before timing out. If the message processing takes
	// longer than this duration, it will be considered as failed and the message will be returned to the
	// queue for processing again.
	MessageProcessTimeout time.Duration
}

The ConsumerClient type contains the fields of a client that consumes messages from a queue:

  • Logger: a pointer to a zap logger, used to log messages and errors during the execution of the consumer client.

  • InstrumentationClient: a pointer to an instrumentation.Client, used for monitoring and tracing application performance. It allows for the collection of metrics and tracing of requests across distributed systems.

  • SqsClient: a pointer to an instance of the client.Client struct, which the consumer uses to interact with the SQS queue.

  • QueueUrl: the URL of the queue from which the consumer client will receive messages.

  • ConcurrencyFactor: an integer that determines how many messages can be processed simultaneously from the queue.

  • QueuePollingDuration: a time.Duration, set by the user, that determines how long the client will wait for new messages before checking the queue again.

  • MessageProcessTimeout: the maximum amount of time that a message can be processed before timing out. If processing takes longer than this duration, it is considered failed and the message is returned to the queue for processing again.

func New

func New(options ...Option) (*ConsumerClient, error)

The New function creates a new ConsumerClient instance with optional configuration options.
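
The signature of New follows the functional options pattern. Below is a minimal, self-contained sketch of how that pattern works. The type consumerConfig and the lowercase option helpers are hypothetical stand-ins for ConsumerClient and its With... functions, and calling the validation step inside the constructor is an assumption rather than something this documentation confirms.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// consumerConfig is a hypothetical stand-in for ConsumerClient with two fields.
type consumerConfig struct {
	concurrencyFactor     int
	messageProcessTimeout time.Duration
}

// option mutates the config, mirroring `type Option func(*ConsumerClient)`.
type option func(*consumerConfig)

func withConcurrencyFactor(n int) option {
	return func(c *consumerConfig) { c.concurrencyFactor = n }
}

func withMessageProcessTimeout(d time.Duration) option {
	return func(c *consumerConfig) { c.messageProcessTimeout = d }
}

// validate rejects a config whose required fields are still zero.
func (c *consumerConfig) validate() error {
	if c.concurrencyFactor == 0 || c.messageProcessTimeout == 0 {
		return errors.New("invalid consumer config: required field not set")
	}
	return nil
}

// newConsumerConfig applies each option in order, then validates the result.
// Validating here is an assumption made for this sketch.
func newConsumerConfig(options ...option) (*consumerConfig, error) {
	c := &consumerConfig{}
	for _, apply := range options {
		apply(c)
	}
	if err := c.validate(); err != nil {
		return nil, err
	}
	return c, nil
}

func main() {
	c, err := newConsumerConfig(
		withConcurrencyFactor(8),
		withMessageProcessTimeout(5*time.Second),
	)
	fmt.Println(c, err)

	// Omitting the options leaves required fields at zero and fails validation.
	_, err = newConsumerConfig()
	fmt.Println(err)
}
```

The pattern lets callers set only the fields they care about while the constructor keeps a stable signature as new options are added.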

func (*ConsumerClient) NaiveConsumer

func (c *ConsumerClient) NaiveConsumer(f MessageProcessorFunc)

Because a standard AWS SQS receive call returns at most 10 messages, the naive approach is to process them in parallel and then request the next batch.

With this approach, throughput is limited to (1 second / slowest message processing time in the batch) * 10. For example, if the slowest message is processed in 50 ms, that gives (1000 ms / 50 ms) * 10 = 200 messages per second, minus network latency, which can eat up most of the projected capacity.
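
The bound described above can be checked with a small simulation. This is a sketch using only the standard library, not the package's implementation: processBatch and naiveThroughput are hypothetical names, the messages are plain strings, and the 50 ms handler is simulated with a sleep.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// naiveThroughput returns the upper bound, in messages per second, for a
// consumer that waits for the slowest message of each batch before receiving
// the next batch. Network latency is ignored.
func naiveThroughput(batchSize int, slowest time.Duration) float64 {
	return float64(batchSize) * float64(time.Second) / float64(slowest)
}

// processBatch handles every message of one batch in parallel and returns
// only when the slowest handler has finished.
func processBatch(batch []string, handle func(string)) {
	var wg sync.WaitGroup
	for _, msg := range batch {
		wg.Add(1)
		go func(m string) {
			defer wg.Done()
			handle(m)
		}(msg)
	}
	wg.Wait()
}

func main() {
	batch := make([]string, 10) // stands in for the result of one receive call
	start := time.Now()
	processBatch(batch, func(string) { time.Sleep(50 * time.Millisecond) })
	fmt.Printf("one batch took about %v\n", time.Since(start).Round(10*time.Millisecond))
	fmt.Printf("upper bound: %.0f msg/s\n", naiveThroughput(10, 50*time.Millisecond))
}
```

Because the next receive call only starts after wg.Wait returns, a single slow message stalls the whole batch, which is the weakness the concurrent consumer addresses.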

func (*ConsumerClient) StartConcurentConsumer

func (c *ConsumerClient) StartConcurentConsumer(f MessageProcessorFunc)

StartConcurentConsumer creates a bounded pool of parallel workers and keeps polling AWS until the concurrency limit is reached. It does this by implementing a “token bucket” with a buffered channel, so this approach is limited only by AWS throughput.
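
The buffered-channel technique can be sketched as follows. This is an illustration under stated assumptions, not the package's code: runLimited is a hypothetical name, the slice of strings stands in for messages arriving from the receive loop, and the peak counter exists only to show that the limit holds.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runLimited processes every message with at most limit handlers in flight
// and returns the peak concurrency it observed.
func runLimited(messages []string, limit int, handle func(string)) int {
	if limit < 1 {
		limit = 1
	}
	tokens := make(chan struct{}, limit) // the buffered channel is the token bucket
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		inFlight int
		peak     int
	)
	for _, msg := range messages {
		tokens <- struct{}{} // blocks while limit handlers are already running
		wg.Add(1)
		go func(m string) {
			defer wg.Done()
			defer func() { <-tokens }() // give the token back when finished

			mu.Lock()
			inFlight++
			if inFlight > peak {
				peak = inFlight
			}
			mu.Unlock()

			handle(m)

			mu.Lock()
			inFlight--
			mu.Unlock()
		}(msg)
	}
	wg.Wait()
	return peak
}

func main() {
	messages := make([]string, 40)
	peak := runLimited(messages, 4, func(string) { time.Sleep(5 * time.Millisecond) })
	fmt.Println("peak concurrency:", peak)
}
```

Unlike the naive consumer, a slow message here occupies only one token, so the loop can keep dispatching new messages as soon as any other handler finishes.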

Some scenarios consume a different amount of resources depending on the message type (let’s say you want your handler to be able to process from 1 to N emails in one message).

To stay within our limits, we could introduce a time-based token bucket algorithm, which ensures we don’t process more than N emails over a period of time (such as 1 minute) by taking the exact number of “worker tokens” from the pool, depending on the email count in the message. Also, if your code can time out, a good approach is to impose a timeout and cancellation based on the context.WithCancel function. Check out the Go semaphore library to build a more robust solution (the mechanics are the same as in our example, abstracted into a library, so instead of using a channel to limit our operations we call semaphore.Acquire, which also blocks execution until the “worker tokens” are refilled).
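
The weighted variant can be sketched with the standard library alone. Everything here is an assumption made for illustration: tokenPool, acquire, release and acquireWithTimeout are hypothetical names, the sketch uses context.WithTimeout rather than context.WithCancel, and it does not refill tokens on a timer. The semaphore library mentioned above is presumably golang.org/x/sync/semaphore, whose Weighted type offers a comparable Acquire(ctx, n).

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// tokenPool hands out "worker tokens"; a message carrying n emails takes n tokens.
type tokenPool struct {
	tokens chan struct{} // holds one element per available token
	gate   chan struct{} // capacity 1: only one caller collects tokens at a time
}

func newTokenPool(size int) *tokenPool {
	p := &tokenPool{tokens: make(chan struct{}, size), gate: make(chan struct{}, 1)}
	for i := 0; i < size; i++ {
		p.tokens <- struct{}{}
	}
	return p
}

// acquire blocks until n tokens are held or ctx is done.
func (p *tokenPool) acquire(ctx context.Context, n int) error {
	if n > cap(p.tokens) {
		return errors.New("request exceeds pool size")
	}
	// The gate prevents two callers from each holding a partial set forever.
	select {
	case p.gate <- struct{}{}:
		defer func() { <-p.gate }()
	case <-ctx.Done():
		return ctx.Err()
	}
	for got := 0; got < n; got++ {
		select {
		case <-p.tokens:
		case <-ctx.Done():
			p.release(got) // hand back the partial grab
			return ctx.Err()
		}
	}
	return nil
}

// release returns n tokens to the pool.
func (p *tokenPool) release(n int) {
	for i := 0; i < n; i++ {
		p.tokens <- struct{}{}
	}
}

// acquireWithTimeout gives up if the tokens are not available in time.
func acquireWithTimeout(p *tokenPool, n int, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return p.acquire(ctx, n)
}

func main() {
	pool := newTokenPool(5)
	// A message carrying 3 emails takes 3 of the 5 tokens.
	fmt.Println("first acquire:", acquireWithTimeout(pool, 3, 50*time.Millisecond))
	// Only 2 tokens remain, so a second 3-email message times out.
	fmt.Println("second acquire:", acquireWithTimeout(pool, 3, 50*time.Millisecond))
	pool.release(3)
	fmt.Println("after release:", acquireWithTimeout(pool, 3, 50*time.Millisecond))
}
```

A time-based bucket would additionally call release from a ticker instead of from the handler, so that tokens come back at a fixed rate rather than when work completes.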

Ref: https://docs.microsoft.com/en-us/azure/architecture/microservices/model/domain-analysis
Ref: https://docs.microsoft.com/en-us/azure/architecture/microservices/design/interservice-communication

func (*ConsumerClient) Validate

func (c *ConsumerClient) Validate() error

Validate checks whether all the required parameters for the consumer client have been set. It checks that the `SqsClient`, `Logger`, `InstrumentationClient`, `QueueUrl`, `ConcurrencyFactor`, `MessageProcessTimeout`, and `QueuePollingDuration` fields are not nil or zero. If any of these fields is nil or zero, it returns an error indicating that the consumer client is invalid.

type IConsumer

type IConsumer interface {
	// `StartConcurentConsumer(f MessageProcessorFunc)` is a method of the `IConsumer` interface that takes a
	// `MessageProcessorFunc` as an argument and processes messages concurrently. This means that multiple
	// messages can be processed at the same time, potentially improving performance.
	StartConcurentConsumer(f MessageProcessorFunc)
	// `NaiveConsumer(f MessageProcessorFunc)` is a method of the `IConsumer` interface that takes a
	// `MessageProcessorFunc` as an argument and processes messages one received batch at a time: each
	// batch is processed before the next one is requested. This method is not optimized for performance
	// and may not be suitable for high-throughput scenarios.
	NaiveConsumer(f MessageProcessorFunc)
}

IConsumer defines an interface for a consumer that can process messages either concurrently or naively:

  • StartConcurentConsumer: takes a MessageProcessorFunc as an argument and processes messages concurrently, so multiple messages can be processed at the same time, potentially improving performance.

  • NaiveConsumer: takes a MessageProcessorFunc as an argument and processes messages one received batch at a time. This method is not optimized for performance and may not be suitable for high-throughput scenarios.

type MessageProcessorFunc

type MessageProcessorFunc = func(ctx context.Context, message *client.Message) error

MessageProcessorFunc serves as the logic used to process each incoming message from a message queue.

type Option

type Option func(*ConsumerClient)

func WithAwsClient

func WithAwsClient(sqsClient *client.Client) Option

WithAwsClient sets the aws client for the consumer

func WithConcurrencyFactor

func WithConcurrencyFactor(concurrencyFactor int) Option

WithConcurrencyFactor sets the concurrency factor for the consumer

func WithInstrumentationClient added in v1.0.5

func WithInstrumentationClient(instrumentationClient *instrumentation.Client) Option

WithInstrumentationClient sets the instrumentation client for the consumer

func WithLogger

func WithLogger(logger *zap.Logger) Option

WithLogger sets the logger for the consumer

func WithMessageProcessTimeout

func WithMessageProcessTimeout(messageProcessTimeout time.Duration) Option

WithMessageProcessTimeout sets the message process timeout for the consumer

func WithQueuePollingDuration

func WithQueuePollingDuration(queuePollingDuration time.Duration) Option

WithQueuePollingDuration sets the queue polling duration for the consumer

func WithQueueUrl

func WithQueueUrl(queueUrl *string) Option

WithQueueUrl sets the queue url for the consumer
