asyncsqs

package module
v0.0.0-...-1f30825 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 19, 2021 License: MIT Imports: 9 Imported by: 1

README

asyncsqs

Go.Dev reference Build, Unit Tests, Linters Status codecov Go Report Card MIT license

asyncsqs wraps around SQS client from aws-sdk-go-v2 to provide an async buffered client which batches send message and delete message requests to optimise AWS costs.

Messages can be scheduled to be sent and deleted. Requests will be dispatched when

  • either batch becomes full
  • or waiting period exhausts (if configured)

...whichever occurs earlier.

Getting started

Add dependency

asyncsqs requires a Go version with modules support. If you're starting a new project, make sure to initialise a Go module:

$ mkdir ~/hellosqs
$ cd ~/hellosqs
$ go mod init github.com/my/hellosqs

And then add asyncsqs as a dependency to your existing or new project:

$ go get github.com/prashanthpai/asyncsqs
Write Code
package main

import (
	"context"
	"log"
	"strconv"

	"github.com/prashanthpai/asyncsqs"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/aws/aws-sdk-go-v2/service/sqs/types"
)

func main() {
	// Create a SQS client with appropriate credentials/IAM role, region etc.
	awsCfg, err := config.LoadDefaultConfig(context.Background())
	if err != nil {
		log.Fatalf("config.LoadDefaultConfig() failed: %v", err)
	}
	sqsClient := sqs.NewFromConfig(awsCfg)

	// Create a asyncsqs buffered client; you'd have one per SQS queue
	client, err := asyncsqs.NewBufferedClient(asyncsqs.Config{
		SQSClient:          sqsClient,
		QueueURL:           "https://sqs.us-east-1.amazonaws.com/xxxxxxxxxxxx/qqqqqqqqqqqq",
		OnSendMessageBatch: sendResponseHandler, // register callback function (recommended)
	})
	if err != nil {
		log.Fatalf("asyncsqs.NewBufferedClient() failed: %v", err)
	}
	// important! Stop() ensures that requests in memory are gracefully
	// flushed/dispatched and resources like goroutines are cleaned-up
	defer client.Stop()

	for i := 0; i < 100; i++ {
		_ = client.SendMessageAsync(types.SendMessageBatchRequestEntry{
			Id:          aws.String(strconv.Itoa(i)),
			MessageBody: aws.String("hello world"),
		})
	}
}

func sendResponseHandler(output *sqs.SendMessageBatchOutput, err error) {
	if err != nil {
		log.Printf("send returned error: %v", err)
	}
	for _, s := range output.Successful {
		log.Printf("message send successful: msg id = %s", *s.Id)
	}
	for _, f := range output.Failed {
		log.Printf("message send failed: msg id = %s", *f.Id)
	}
}
Limitation

While asyncsqs ensures batch size doesn't exceed SQS's limit of 10 messages, it does not validate size of the payload yet. SQS places following limits on batch request payload:

The maximum allowed individual message size and the maximum total payload size
(the sum of the individual lengths of all of the batched messages) are both
256 KB (262,144 bytes).

This translates to an average payload limit of around 25KB per individual message.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BufferedClient

type BufferedClient struct {
	Config
	// contains filtered or unexported fields
}

BufferedClient wraps aws-sdk-go-v2's sqs.Client to provide a async buffered client.

func NewBufferedClient

func NewBufferedClient(config Config) (*BufferedClient, error)

NewBufferedClient creates and returns a new instance of BufferedClient. You will need one BufferedClient client per SQS queue. Stop() must be eventually called to free resources created by NewBufferedClient.

func (*BufferedClient) DeleteMessageAsync

func (c *BufferedClient) DeleteMessageAsync(entries ...types.DeleteMessageBatchRequestEntry) error

DeleteMessageAsync schedules message(s) to be deleted. It blocks if the delete buffer is full.

func (*BufferedClient) SendMessageAsync

func (c *BufferedClient) SendMessageAsync(entries ...types.SendMessageBatchRequestEntry) error

SendMessageAsync schedules message(s) to be sent. It blocks if the send buffer is full.

func (*BufferedClient) Stats

func (c *BufferedClient) Stats() Stats

Stats returns client statistics.

func (*BufferedClient) Stop

func (c *BufferedClient) Stop()

Stop stops all the batcher and dispatcher goroutines. It blocks until all pending requests in buffer are gracefully drained. Stop should be called only after calls to SendMessageAsync() and DeleteMessageAsync() have stopped.

type Config

type Config struct {
	// SQSClient abstracts *sqs.Client from aws-sdk-go-v2. You can bring your
	// own fully initialised SQS client (with required credentials, options
	// etc). This is a required field.
	SQSClient SQSClient

	// QueueURL specifies AWS SQS Queue URL for a queue.
	// This is a required field.
	QueueURL string

	// SendWaitTime specifies a time limit for how long the client will
	// wait before it will dispatch accumulated send message requests
	// even if the batch isn't full. If not specified, send message
	// requests will be dispatched only when a batch is full.
	SendWaitTime time.Duration

	// SendBufferSize specifies a limit on the number of send message
	// requests that can be held in memory. If not specified, defaults
	// to 1000.
	SendBufferSize int

	// SendConcurrency limits the number of concurrent send message SQS
	// requests in progress. If not specified, defaults to SendBufferSize/10.
	SendConcurrency int

	// OnSendMessageBatch will be called with results returned by SQSClient
	// for a send message batch operation. If set, this callback function
	// needs to be goroutine safe.
	OnSendMessageBatch func(*sqs.SendMessageBatchOutput, error)

	// DeleteWaitTime specifies a time limit for how long the client will
	// wait before it will dispatch accumulated delete message requests
	// even if the batch isn't full. If not specified, delete message
	// requests will be dispatched only when a batch is full.
	DeleteWaitTime time.Duration

	// DeleteBufferSize specifies a limit on the number of delete message
	// requests that can be held in memory. If not specified, defaults
	// to 1000.
	DeleteBufferSize int

	// DeleteConcurrency limits the number of concurrent delete message SQS
	// requests in progress. If not specified, defaults to DeleteBufferSize/10.
	DeleteConcurrency int

	// OnDeleteMessageBatch will be called with results returned by SQSClient
	// for a delete message batch operation. If set, this callback function
	// needs to be goroutine safe.
	OnDeleteMessageBatch func(*sqs.DeleteMessageBatchOutput, error)
}

Config is used to configure BufferedClient.

type SQSClient

type SQSClient interface {
	SendMessageBatch(context.Context, *sqs.SendMessageBatchInput, ...func(*sqs.Options)) (*sqs.SendMessageBatchOutput, error)
	DeleteMessageBatch(context.Context, *sqs.DeleteMessageBatchInput, ...func(*sqs.Options)) (*sqs.DeleteMessageBatchOutput, error)
}

SQSClient wraps *sqs.Client from aws-sdk-go-v2

type Stats

type Stats struct {
	MessagesSent            uint64
	MessagesDeleted         uint64
	SendMessageBatchCalls   uint64
	DeleteMessageBatchCalls uint64
}

Stats contains client statistics.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL