formigo

package module
v0.0.6
Warning

This package is not in the latest version of its module.

Published: Feb 16, 2024 License: MIT Imports: 11 Imported by: 0

README

Formigo - A Golang Library for Efficient Queue Processing.

Formigo is a powerful and flexible Golang library designed to simplify the processing of messages from queues. It currently supports AWS SQS, with the capability to extend its functionality to accommodate multiple types of queues. With this library, you can effortlessly manage and scale the concurrent processing of messages, ensuring efficient utilization of resources and increased throughput.

Key Features

  • Efficient Throughput Management: Tune the number of goroutines that poll messages from the queue and the number that process them, so you can match throughput to your application's workload.

  • Configurable Batch Processing: With the multi-message consumer, the handler receives messages in batches of a size you define, giving you full control over the processing logic. You can also adjust the batch buffer size and timeout to suit different workloads.

  • Context Cancellation: Stop the worker by canceling the context passed to it. This gives you a controlled way to terminate the worker whenever required.

  • Custom Error Reporting: Define a custom reporting function to receive and manage any errors that occur during message processing. This flexibility enables seamless integration with your existing error reporting mechanisms.

  • Error Threshold Management: Set the worker to stop automatically if a certain number of errors (X) occur within a specific interval (T). This proactive approach ensures the system's stability and helps prevent cascading failures.

  • Message context timeout: Each message or batch is associated with a context that expires within its visibility timeout. The handler must process the message or batch within this visibility timeout to prevent re-processing by other workers, ensuring reliable message handling.
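
As a rough illustration of the context-cancellation feature above, the following standard-library sketch shows a polling loop that exits once its context is canceled. The run and stopAfter functions are hypothetical stand-ins written for this example and are not part of formigo; with the library itself, the same idea applies to the context passed to the worker's Run method.

```go
package main

import (
	"context"
	"fmt"
)

// run is a hypothetical stand-in for a worker loop: it keeps calling poll
// until ctx is canceled, then returns the iteration count and ctx.Err().
func run(ctx context.Context, poll func()) (int, error) {
	iterations := 0
	for {
		select {
		case <-ctx.Done():
			return iterations, ctx.Err()
		default:
			poll()
			iterations++
		}
	}
}

// stopAfter cancels the context from inside the k-th poll (k >= 1), which is
// enough to show that the loop stops at its next cancellation check.
func stopAfter(k int) (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	calls := 0
	return run(ctx, func() {
		calls++
		if calls == k {
			cancel()
		}
	})
}

func main() {
	n, err := stopAfter(3)
	fmt.Println(n, err) // prints "3 context canceled"
}
```

In a real service the cancel function would more likely be triggered by a shutdown signal than from inside the loop; the in-loop cancel only keeps the sketch deterministic.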

Installation

Make sure you have Go installed.

Initialize your project by creating a folder and then running go mod init github.com/your/repo inside the folder. Then install the library with the go get command:

go get -u github.com/francescopepe/formigo

Examples

Let's create some simple examples to demonstrate how to use this library to process messages from an AWS SQS queue.

Basic example
package main

import (
    "context"
    "log"

    "github.com/francescopepe/formigo"

    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/sqs"
    "github.com/aws/aws-sdk-go-v2/service/sqs/types"
)

func main() {
    ctx := context.Background()

    // Placeholder: replace with the URL of your own SQS queue.
    queueUrl := "https://sqs.<region>.amazonaws.com/<account-id>/<queue-name>"

    awsCfg, err := config.LoadDefaultConfig(ctx)
    if err != nil {
        log.Fatalln("Unable to create AWS config", err)
    }

    sqsSvc := sqs.NewFromConfig(awsCfg)
    sqsClient, err := formigo.NewSqsClient(ctx, formigo.SqsClientConfiguration{
        Svc: sqsSvc,
        ReceiveMessageInput: &sqs.ReceiveMessageInput{
            QueueUrl:            &queueUrl,
            MaxNumberOfMessages: 1,
            VisibilityTimeout:   30,
            WaitTimeSeconds:     20,
        },
    })
    if err != nil {
        // main has no return value, so log the error and exit.
        log.Fatalln("Unable to create SQS client", err)
    }

    wkr := formigo.NewWorker(formigo.Configuration{
        Client:      sqsClient,
        Concurrency: 100,
        Consumer: formigo.NewSingleMessageConsumer(formigo.SingleMessageConsumerConfiguration{
            Handler: func(ctx context.Context, msg interface{}) error {
                log.Println("Got Message", msg)

                // Assert the type of message to get the body or any other attributes
                log.Println("Message body", *msg.(types.Message).Body)

                return nil
            },
        }),
    })

    err = wkr.Run(ctx)
    if err != nil {
        log.Fatalln("Worker stopped with error", err)
    }
}

In this example, we have created a worker that consumes messages one at a time from an AWS SQS queue. Each poll requests up to MaxNumberOfMessages messages (1 in this example; SQS allows at most 10 per request), and the handler always processes them individually.

By default, the worker's concurrency is set to 100, meaning it can process up to 100 messages concurrently, optimizing throughput and efficiency.

If any errors occur during message handling, the worker will log them using log.Println by default. Additionally, the worker is configured to stop if it encounters more than 3 errors within any 120-second interval.

Please note that these are the default settings, and you can customize the concurrency level, error handling, and other parameters to suit your specific requirements.
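
To make the concurrency setting more concrete, here is a minimal sketch of a fixed pool of goroutines draining a channel, written with the standard library only. It is not formigo's implementation: processAll, errRejected, and the sample handler are hypothetical names chosen for this example.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errRejected is a sample error returned by the demo handler.
var errRejected = errors.New("message rejected")

// processAll fans messages out to `concurrency` goroutines and returns how
// many messages the handler processed without error.
func processAll(messages []string, concurrency int, handler func(string) error) int {
	if concurrency < 1 {
		concurrency = 1 // at least one goroutine must drain the channel
	}

	queue := make(chan string)
	var wg sync.WaitGroup
	var mu sync.Mutex
	handled := 0

	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for msg := range queue {
				if err := handler(msg); err != nil {
					continue // a real worker would report the error here
				}
				mu.Lock()
				handled++
				mu.Unlock()
			}
		}()
	}

	for _, m := range messages {
		queue <- m
	}
	close(queue)
	wg.Wait()
	return handled
}

func main() {
	handler := func(msg string) error {
		if msg == "bad" {
			return errRejected
		}
		return nil
	}
	fmt.Println(processAll([]string{"a", "b", "bad", "c"}, 2, handler)) // prints 3
}
```

A larger pool mainly helps when the handler spends most of its time waiting on I/O, which matches the guidance given for the Concurrency setting.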

Batching
package main

import (
    "context"
    "log"
    "time"

    "github.com/francescopepe/formigo"

    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/sqs"
    "github.com/aws/aws-sdk-go-v2/service/sqs/types"
)

func main() {
    ctx := context.Background()

    // Placeholder: replace with the URL of your own SQS queue.
    queueUrl := "https://sqs.<region>.amazonaws.com/<account-id>/<queue-name>"

    awsCfg, err := config.LoadDefaultConfig(ctx)
    if err != nil {
        log.Fatalln("Unable to create AWS config", err)
    }

    sqsSvc := sqs.NewFromConfig(awsCfg)
    sqsClient, err := formigo.NewSqsClient(ctx, formigo.SqsClientConfiguration{
        Svc: sqsSvc,
        ReceiveMessageInput: &sqs.ReceiveMessageInput{
            QueueUrl:            &queueUrl,
            MaxNumberOfMessages: 1,
            VisibilityTimeout:   30,
            WaitTimeSeconds:     20,
        },
    })
    if err != nil {
        // main has no return value, so log the error and exit.
        log.Fatalln("Unable to create SQS client", err)
    }

    wkr := formigo.NewWorker(formigo.Configuration{
        Client:      sqsClient,
        Concurrency: 100,
        Consumer: formigo.NewMultiMessageConsumer(formigo.MultiMessageConsumerConfiguration{
            BufferConfig: formigo.MultiMessageBufferConfiguration{
                Size:    100,
                Timeout: time.Second * 5,
            },
            Handler: func(ctx context.Context, msgs []interface{}) error {
                log.Printf("Got %d messages to process\n", len(msgs))

                // Assert the type of message to get the body or any other attributes
                for i, msg := range msgs {
                    log.Printf("Message %d body: %s", i, *msg.(types.Message).Body)
                }

                return nil
            },
        }),
    })

    err = wkr.Run(ctx)
    if err != nil {
        log.Fatalln("Worker stopped with error", err)
    }
}

In this example, we have created a worker that efficiently consumes batches of messages from an AWS SQS queue. The handler will be invoked either when the buffer is full or when a specified timeout expires.

It's essential to note that the timer starts as soon as the first message is added to the buffer.

By processing messages in batches, the worker can significantly enhance throughput for specific use cases or reduce resource consumption. For instance, it can be leveraged for batch insertions or deletions.
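
The flush rule described above (buffer full, or timeout elapsed since the first message was buffered) can be sketched with the standard library alone. This is an illustration under assumptions, not formigo's code: batch and collectBatches are hypothetical names, and flushing the remainder when the input closes is a choice made for this example.

```go
package main

import (
	"fmt"
	"time"
)

// batch reads from in and calls flush when the buffer holds `size` messages
// or when `timeout` has elapsed since the first message entered the buffer.
func batch(in <-chan string, size int, timeout time.Duration, flush func([]string)) {
	var buf []string
	var timer *time.Timer
	var expired <-chan time.Time // nil while the buffer is empty, so it never fires

	emit := func() {
		if timer != nil {
			timer.Stop()
		}
		timer, expired = nil, nil
		if len(buf) > 0 {
			flush(buf)
			buf = nil // start a fresh buffer; the flushed slice is not reused
		}
	}

	for {
		select {
		case msg, ok := <-in:
			if !ok {
				emit() // input closed: flush whatever is left
				return
			}
			if len(buf) == 0 {
				// The timer starts when the first message is buffered.
				timer = time.NewTimer(timeout)
				expired = timer.C
			}
			buf = append(buf, msg)
			if len(buf) >= size {
				emit()
			}
		case <-expired:
			emit()
		}
	}
}

// collectBatches sends msgs back to back and returns the batches produced.
func collectBatches(msgs []string, size int, timeout time.Duration) [][]string {
	in := make(chan string)
	go func() {
		defer close(in)
		for _, m := range msgs {
			in <- m
		}
	}()

	var out [][]string
	batch(in, size, timeout, func(b []string) { out = append(out, b) })
	return out
}

func main() {
	// Five messages with a buffer of two: two full batches plus a remainder.
	fmt.Println(collectBatches([]string{"a", "b", "c", "d", "e"}, 2, time.Second))
}
```

Because the messages in this demo arrive back to back, only the size-based flush and the final flush are exercised; the timeout branch matters when messages trickle in more slowly than the buffer fills.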

Configuration

| Configuration | Explanation | Default Value |
|---|---|---|
| Client | The client is used for receiving messages from the queue and deleting them once they are processed correctly. This is a required configuration. | None |
| Concurrency | Number of Go routines that process the messages from the Queue. Higher values are useful for slow I/O operations in the consumer's handler. | 100 |
| Retrievers | Number of Go routines that retrieve messages from the Queue. Higher values are helpful for slow networks or when consumers are quicker. | 1 |
| ErrorConfig | Defines the error threshold and interval for worker termination and error reporting function. | None |
| Consumer | The message consumer, created with either NewSingleMessageConsumer or NewMultiMessageConsumer. | None |

License

This library is distributed under the MIT License.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewMultiMessageConsumer

func NewMultiMessageConsumer(config MultiMessageConsumerConfiguration) *multiMessageConsumer

func NewSingleMessageConsumer

func NewSingleMessageConsumer(config SingleMessageConsumerConfiguration) *singleMessageConsumer

func NewSqsClient added in v0.0.6

func NewSqsClient(ctx context.Context, config SqsClientConfiguration) (sqsClient, error)

func NewWorker

func NewWorker(config Configuration) worker

Types

type Configuration

type Configuration struct {
	// A queue client
	Client client.Client

	// Number of Go routines that process the messages from the Queue.
	// The higher this value, the more Go routines are spawned to process the messages.
	// Using a high value can be useful when the Handler of the consumer performs slow I/O operations.
	// Default: 100.
	Concurrency int

	// Number of Go routines that retrieve messages from the Queue.
	// The higher this value, the more Go routines are spawned to read the messages from the
	// queue and provide them to the worker's consumers.
	// Using a high value can be useful when the network is slow or when consumers are quicker
	// than retrievers.
	// Default: 1.
	Retrievers int

	// The ErrorConfiguration.
	ErrorConfig ErrorConfiguration

	// The messages Consumer.
	Consumer consumer

	// Configuration for the deleter
	DeleterConfig DeleterConfiguration
}

type DeleterConfiguration

type DeleterConfiguration struct {
	BufferSize    int
	BufferTimeout time.Duration
}

type ErrorConfiguration

type ErrorConfiguration struct {
	// Number of errors that must occur in the Period before the worker stops.
	// Default: 3.
	Threshold int

	// Duration of the period for which, if the number of errors passes the Threshold, the worker stops.
	// Default: 120s.
	Period time.Duration

	// The error report function
	ReportFunc func(err error)
}

The ErrorConfiguration defines the threshold at which the worker stops: if the number of errors that occur while the worker is running exceeds the given Threshold within the specified Period, the worker stops.
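
One way to picture the Threshold and Period rule is a sliding window over error timestamps. The sketch below is a standard-library illustration, not the library's implementation: errorWindow and simulate are hypothetical names, and it assumes "stop" means the count strictly exceeds the threshold.

```go
package main

import (
	"fmt"
	"time"
)

// errorWindow is a hypothetical helper that tracks recent error timestamps.
type errorWindow struct {
	threshold int
	period    time.Duration
	stamps    []time.Time
}

// record registers an error at time `at` and reports whether the number of
// errors within the last `period` now exceeds the threshold.
func (w *errorWindow) record(at time.Time) bool {
	cutoff := at.Add(-w.period)
	kept := w.stamps[:0]
	for _, t := range w.stamps {
		if t.After(cutoff) { // drop errors that fell out of the window
			kept = append(kept, t)
		}
	}
	w.stamps = append(kept, at)
	return len(w.stamps) > w.threshold
}

// simulate feeds n errors spaced `gap` apart into a fresh window and returns
// the stop decision made after each one.
func simulate(threshold int, period, gap time.Duration, n int) []bool {
	w := &errorWindow{threshold: threshold, period: period}
	start := time.Unix(0, 0)
	decisions := make([]bool, 0, n)
	for i := 0; i < n; i++ {
		at := start.Add(time.Duration(i) * gap)
		decisions = append(decisions, w.record(at))
	}
	return decisions
}

func main() {
	// Errors every 10s: the fourth one exceeds a threshold of 3 within 120s.
	fmt.Println(simulate(3, 120*time.Second, 10*time.Second, 4)) // prints [false false false true]
	// Errors every 60s: at most two fall in any 120s window, so no stop.
	fmt.Println(simulate(3, 120*time.Second, 60*time.Second, 6)) // prints [false false false false false false]
}
```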

type MultiMessageBufferConfiguration

type MultiMessageBufferConfiguration struct {
	// Max number of messages that the buffer can contain.
	// Default: 10.
	Size int

	// Time after which the buffer gets processed, no matter whether it is full or not.
	// This value MUST be smaller than VisibilityTimeout in the
	// RetrieveMessageConfiguration + the maximum processing time of the handler.
	// If this is not set correctly, the same message could be processed multiple times.
	// Default: 1s.
	Timeout time.Duration
}

The MultiMessageBufferConfiguration defines a buffer which is consumed by the worker when either the buffer is full or the timeout has passed since the first message got added.

type MultiMessageConsumerConfiguration

type MultiMessageConsumerConfiguration struct {
	Handler      multiMessageHandler
	BufferConfig MultiMessageBufferConfiguration
}

type SingleMessageConsumerConfiguration

type SingleMessageConsumerConfiguration struct {
	Handler singleMessageHandler
}

type SqsClientConfiguration added in v0.0.6

type SqsClientConfiguration struct {
	// The AWS Sqs Service Client
	Svc *awsSqs.Client

	// The AWS ReceiveMessageInput
	ReceiveMessageInput *awsSqs.ReceiveMessageInput

	// Defines the interval within which the message must be processed.
	// If empty, it tries to set the value from the ReceiveMessageInput's
	// VisibilityTimeout.
	// If ReceiveMessageInput's VisibilityTimeout is empty it retrieves the
	// default value set on the queue. This action will fail if the client
	// does not have the permission to retrieve the SQS queue's attributes.
	MessageCtxTimeout time.Duration
}
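
To illustrate what a per-message context deadline means for a handler, here is a small standard-library sketch. It does not use formigo: handle and processWithTimeout are hypothetical names, and the durations are arbitrary values picked for the demo. The point is only that a handler which watches ctx.Done() can give up once the deadline passes instead of continuing to work on a message that may already be visible to other consumers.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// handle simulates a handler that needs `work` to finish but gives up as
// soon as its context expires.
func handle(ctx context.Context, work time.Duration) error {
	select {
	case <-time.After(work):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// processWithTimeout runs the handler with a context that expires after
// `timeout`, similar in spirit to a message context timeout.
func processWithTimeout(timeout, work time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return handle(ctx, work)
}

func main() {
	// Work finishes well inside the deadline.
	fmt.Println(processWithTimeout(50*time.Millisecond, time.Millisecond))
	// Work would take far longer than the deadline allows.
	fmt.Println(processWithTimeout(10*time.Millisecond, 500*time.Millisecond))
}
```

The first call returns nil and the second returns the context's deadline error, which a handler can pass back to its caller as a processing failure.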

Directories

Path Synopsis
internal
