sqshandler

v0.9.2
Warning

This package is not in the latest version of its module.

Published: Sep 22, 2023 License: MIT Imports: 14 Imported by: 0

README


go-lambda-sqs-handler

A lightweight wrapper for handling SQS Events inside Lambda functions, in Go.

Description

Dealing with single events inside Lambda functions is usually pretty straightforward, as the event is either handled successfully, or not.

Dealing with multiple messages inside a single event, however, usually proves harder. What if only one message couldn't be handled? Should I return an error? What if many of them couldn't, but not all? Do I delete them manually from the queue? If I set up a DLQ, does that mean that all messages that I couldn't handle will keep returning to the queue again and again until they finally go away? What if I only want some of them to go away immediately, while others are retried?

To alleviate these issues, which many developers face when building Lambda functions with SQS triggers, go-lambda-sqs-handler aims to reduce the complexity of dealing with infrastructure-related problems so you can focus on the logic that matters.

Features

Simplified handling of different scenarios using a work-based abstraction.

This package works by wrapping an events.SQSEvent and shifting the focus to individual messages instead, each of which will be handled by your customized Worker. The Status returned by Worker.Work() will then be processed individually for each message, which brings us to the second point:

Extended error handling to accommodate both transient and non-transient scenarios.

By returning different Status values, we can differentiate between messages that should be retried and messages that should not.

Server returned a 503 Unavailable? Return this message to the queue with an exponential backoff.

Malformed message payload? Send it straight to a pre-configured DLQ to avoid queue-poisoning.

Improved usage of Lambda execution times by handling multiple messages in parallel.

Make use of idle threads during I/O operations by handling other messages in the batch.

Requirements

Make sure you have:

  • A DLQ set on your SQS queue
  • A Lambda function connected to a SQS trigger
  • The property Report batch item failures set to Yes on that trigger

Installation

Run go get github.com/ram-sa/go-lambda-sqs-handler

Then add import "github.com/ram-sa/go-lambda-sqs-handler" to your function handler

Example

package main

import (
	"context"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	sqshandler "github.com/ram-sa/go-lambda-sqs-handler"
)

type MyWorker struct {
	// Set up your worker as needed
}

// Work is called by the Handler once per message, concurrently for all
// messages in the batch.
func (w MyWorker) Work(c context.Context, m events.SQSMessage) (sqshandler.Status, error) {
	body := parseTheMessageBody(m.Body)
	doSomeStuff(body)
	err := sendItToSomeone(body)
	// Did everything go well?
	if err == nil {
		// Everything went great, message acknowledged!
		return sqshandler.Success, nil
	}
	if isTransient(err) {
		// Message will return to the queue with an exponential backoff.
		return sqshandler.Retry, err
	}
	// Non-transient error: sent to the DLQ (if configured) and removed from the queue.
	return sqshandler.Failure, err
}

// Placeholder helpers: replace them with your own logic.
func parseTheMessageBody(body string) string { return body }
func doSomeStuff(body string) {}
func sendItToSomeone(body string) error { return nil }
func isTransient(err error) bool { return false }

func main() {
	lambda.Start(handleRequest)
}

func handleRequest(ctx context.Context, sqsEvent events.SQSEvent) (events.SQSEventResponse, error) {
	// Create a new handler with default configuration
	handler := sqshandler.New(ctx)
	// Initialize your worker as needed
	worker := MyWorker{}
	return handler.HandleEvent(&sqsEvent, worker)
}

For more information on how this package works and how it interacts with the Lambda framework, check the docs at https://pkg.go.dev/github.com/ram-sa/go-lambda-sqs-handler

Documentation

Overview

Package sqshandler implements a lightweight wrapper for handling SQS Events inside Lambda functions, reducing the complexity of dealing with different and sometimes unexpected conditions to a set of defined results in a work-based abstraction.


Constants

const (
	DefaultIniTimeout = 5
	DefaultMaxTimeout = 300
	DefaultMultiplier = 2.5
	DefaultRandFactor = 0.3
)

Default values for BackOff.

const SQSMaxVisibility = 43200

SQS maximum value for message visibility, in seconds (12 hours).

Variables

This section is empty.

Functions

This section is empty.

Types

type BackOff

type BackOff struct {
	InitTimeoutSec, MaxTimeoutSec uint16
	Multiplier, RandFactor        float64
}

BackOff defines the values used for calculating a message's exponential backoff in case of a transient failure, by altering its visibility timeout. Each retry attempt will take exponentially longer based on the number of delivery attempts (the message attribute ApproximateReceiveCount) until the message is either processed or sent to a DLQ, according to the following parameters:

InitTimeoutSec

Defines the initial timeout value for the message, in seconds, ranging from 0 to 43200 (12h, the maximum value accepted by AWS).

MaxTimeoutSec

Defines the maximum timeout value for the message, in seconds, ranging from 0 to 43200 (12h, the maximum value accepted by AWS). Note that jitter is applied on top of this cap, so the final value may exceed MaxTimeoutSec; after jitter, only the AWS limit of 43200 is enforced.

For example, if MaxTimeoutSec is set to 6 and RandFactor is set to 0.5, the final timeout value can be any integer from 3 to 9. However, if MaxTimeoutSec is set to 43200, the values will range from 21600 to 43200 (instead of 64800).

Multiplier

Defines the scaling factor of the exponential function. Note that the resulting values will be rounded down, as AWS only accepts non-negative integer values. Setting this value to 1 flattens the backoff curve, so every retry uses InitTimeoutSec.

RandFactor

Adds a jitter factor to the function by making it so that the final timeout value ranges in [interval * (1 ± RandFactor)], rounded down. Setting this value to 0 disables it.

Example

For InitTimeoutSec = 5, MaxTimeoutSec = 300, Multiplier = 2.5 and RandFactor = 0.2 (note that DefaultRandFactor is 0.3):

Delivery  Timeout     Timeout       Timeout
attempt   (raw)       (no jitter)   (with jitter)

1         5           5             [4, 6]
2         12.5        12            [9, 14]
3         31.25       31            [24, 37]
4         78.125      78            [62, 93]
5         195.3125    195           [156, 234]
6         488.28125   300           [240, 360]
7         1220.7031   300           [240, 360]

Based on `https://github.com/cenkalti/backoff/blob/v4/exponential.go`.

For more information about message visibility, see: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html.

func NewBackOff

func NewBackOff() BackOff

NewBackOff creates an instance of BackOff using default values.

type Handler

type Handler struct {
	BackOff       BackOff
	Context       context.Context
	FailureDlqURL string
	SQSClient     SQSClient
}

Handler functions as a configurable wrapper of an SQS Event, letting you control some of the behaviors related to a message's lifetime inside the queue.

BackOff

BackOff configures the exponential backoff values of retried messages.

Context

The Lambda's context.Context. Note that if this context carries no Deadline (as with context.TODO()), the Handler assumes 15 minutes, Lambda's maximum timeout, as the ceiling for execution.

FailureDlqURL

This property is optional.

The URL of a DLQ to which messages with a Status of FAILURE will be sent. This can be the original queue's own DLQ, or a separate one. Make sure that your Lambda has an execution role with enough permissions to write to that queue.

SQSClient

A properly configured sqs.Client, which will be used by the Handler to interface with the queue.

Configuring parallelism and number of retries

By design, a Handler will process all messages in a batch in parallel. Thus, the degree of parallelism can be controlled by altering the property Batch Size of your Lambda's trigger.

In much the same way, the number of retries is tied to your queue's Max Receive Count property, which defines how many delivery attempts will be made on a single message before it is moved to a DLQ. For more information, see:

func New

func New(c context.Context) *Handler

New creates an instance of Handler with default BackOff values on retries, no DLQ set for messages with a Status of FAILURE, and an sqs.Client created with the default configuration, as follows:

cfg, err := config.LoadDefaultConfig(c)
if err != nil {
	log.Fatalf("unable to load SDK config, %v", err)
}
return sqs.NewFromConfig(cfg)

func (*Handler) HandleEvent

func (b *Handler) HandleEvent(event *events.SQSEvent, worker Worker) (events.SQSEventResponse, error)

HandleEvent is, as the name implies, the method called for handling an events.SQSEvent. It works by separating the event into processable messages which will then be given to a user defined Worker. After all messages have been processed, a Report will be printed to stdout detailing their Status and any errors that may have occurred.

Handling Messages

The events.SQSMessage values inside the events.SQSEvent are delivered as-is to the Worker for processing, in parallel. The Handler waits for a Status from every message until 5 seconds before the Lambda's configured timeout. After that, any message still being processed is ignored and returned to the queue with its default VisibilityTimeout. If a Worker panics during execution, the Handler considers that it failed to process the message and assigns it a Status of FAILURE itself.

At this point, every message that reported back should be in one of the four states defined by Status. Any value outside those four is treated as [Status.FAILURE], regardless of whether the message was actually processed successfully, with an appended error describing the issue.

Reporting to the Lambda framework

While the Lambda framework accepts quite a few handler signatures, this Handler requires the use of

func (context.Context, TIn) (TOut, error)

as we will be reporting any messages that we want to retry in a events.SQSEventResponse in order to avoid unnecessary calls to the SQS API.

With this option, the framework behaves differently depending on which kind of response is given after execution. We will be focusing on 3 specific kinds (the complete list can be found here):

  1. Empty events.SQSEventResponse, no error
  2. Populated events.SQSEventResponse, no error
  3. Anything, error

These 3 responses will be utilized in different scenarios, which will vary depending on the Status reported by your Worker:

  • If all messages were reported as [Status.SUCCESS], [Status.SKIP] or [Status.FAILURE], response number 1 will be used for the Lambda, signaling the framework that all messages were processed successfully and can be deleted from the queue. Failed messages will be reported as such by the Handler and sent to a DLQ, if one was configured. This response was chosen for failures due to how the framework interprets errors, which will be explained later.
  • If any message was reported as [Status.RETRY], then response number 2 will be used, where events.SQSEventResponse will contain the IDs of all messages that need reprocessing.
  • If the Lambda reached a timeout, then response number 3 will be used, and the Handler will behave a bit differently, due to how the framework deals with errors.

Which brings us to the next point:

Reporting Errors

If any error reaches the Lambda framework, it will consider the execution as having completely failed and any and all messages that were delivered will be returned to the queue with their default VisibilityTimeout.

Thus, even if your Worker returns an error on a [Status.FAILURE], it will not be reported directly to the Lambda framework, as doing so would mean that all messages, including successfully processed ones, would be returned to the queue. These errors will still show up during logging, as they are printed to stdout in the Report.

On a timeout, however, the Handler doesn't know how to proceed with the messages that didn't report back. Thus, the best idea is to return them to the queue as they were, where they will follow the redelivery pattern until the problem is fixed or they are naturally sent to a DLQ. Note, however, that in order to ensure that only these messages will be returned, the Handler will have to MANUALLY DELETE every message with a Status of SUCCESS, SKIP and FAILURE from the queue.

Handler Errors

When dealing with messages, the Handler will perform a few checks and make calls to the SQS API (like SendMessage for sending failures to a DLQ and ChangeMessageVisibility for exponential backoff on retries). If any of these calls fail, the errors will be reported at the end of execution. The Handler tries to preserve each message's Status as much as possible: failures are still removed from the queue in order to avoid queue-poisoning, and retries are still sent back, only with their VisibilityTimeout unchanged. Note that these behaviors are open to debate, so feel free to reach out with your thoughts on the matter.

type Report added in v0.9.1

type Report struct {
	BatchSize     int                 `json:"batchSize"`
	Success       int                 `json:"success,omitempty"`
	Skip          int                 `json:"skip,omitempty"`
	Retry         *retryFailureReport `json:"retry,omitempty"`
	Failure       *retryFailureReport `json:"failure,omitempty"`
	HandlerErrors []errorReport       `json:"handlerErrors,omitempty"`
}

Report defines a struct used for serializing and printing an execution report of a batch of messages.

BatchSize

A count of how many events.SQSMessage were contained in the events.SQSEvent

Success, Skip

A count of how many messages had a processing Status of said values.

Retry, Failure

A count of how many messages had a processing Status of said values, including any individual errors reported by either the Handler or Worker.

HandlerErrors

A collection of errors that occurred during message handling (changing visibility, sending to a DLQ, etc.).

type SQSClient

type SQSClient interface {
	ChangeMessageVisibility(context.Context, *sqs.ChangeMessageVisibilityInput, ...func(*sqs.Options)) (*sqs.ChangeMessageVisibilityOutput, error)
	SendMessage(context.Context, *sqs.SendMessageInput, ...func(*sqs.Options)) (*sqs.SendMessageOutput, error)
	DeleteMessage(context.Context, *sqs.DeleteMessageInput, ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error)
}

SQSClient is the subset of the sqs.Client API used by the Handler, defined as an interface to enable mocking, usually for testing purposes.

type Status

type Status string

Status defines the four possible states a Worker can report to the Handler. These states are:

SUCCESS

Denotes that the message has been processed successfully and thus can be safely removed from the queue.

SKIP

Denotes that the message is not relevant and has thus been skipped. Functionally identical to SUCCESS, used for reporting purposes.

RETRY

Denotes that the message was not processed successfully due to a transient error. This signals the Handler to return the message to the queue with an updated VisibilityTimeout, following its BackOff configuration.

FAILURE

Denotes that the message was not processed successfully due to permanent reasons that cannot be solved by simply retrying the operation. This signals the Handler to send this message to a DLQ, if one was specified during configuration, and then to remove it from the queue.

const (
	Failure Status = "FAILURE"
	Retry   Status = "RETRY"
	Skip    Status = "SKIP"
	Success Status = "SUCCESS"
)

type Worker

type Worker interface {
	Work(context.Context, events.SQSMessage) (Status, error)
}

The Worker interface defines the main method which will be called by the Handler for message processing: [Worker.Work], and should be considered the entry point of your lambda function.

Work

This method will be called once per message by the Handler, which will pass along the Lambda context and the SQSMessage to be worked upon.

IMPORTANT: This means that this method will be called for all messages IN PARALLEL, and each call should be treated as running in its own goroutine. Pay special attention if your code makes use of shared memory structs or any other non-thread-safe components!

Note that the Lambda context carries its preconfigured timeout, which your Worker should respect. Also note that if a timeout is imminent, the Handler reserves 5 seconds of total runtime to clean up all previously handled messages. Any Work that does not return before then will be discarded, and its message will be returned to the queue with its default VisibilityTimeout.

After processing a message, the Worker should return the relevant Status and errors that may have occurred.

If a Worker panics during execution, the Handler will consider that processing has failed, and treat the message as if a [Status.FAILURE] was returned.
