asyncsqs

v0.0.0-...-04ec6e0
Published: Nov 22, 2023 License: MIT Imports: 9 Imported by: 0

README

asyncsqs wraps the SQS client from aws-sdk-go-v2 to provide an async buffered client that batches send message and delete message requests to optimise AWS costs.

Messages can be scheduled to be sent and deleted. Requests are dispatched when

  • either the batch becomes full
  • or the waiting period elapses (if configured)
  • or the total body size of the batch becomes greater than or equal to 256 KB (addition)

...whichever occurs first.

Getting started

Add dependency

asyncsqs requires a Go version with modules support. If you're starting a new project, make sure to initialise a Go module:

$ mkdir ~/hellosqs
$ cd ~/hellosqs
$ go mod init github.com/my/hellosqs

And then add asyncsqs as a dependency to your existing or new project:

$ go get github.com/ngoyal16/asyncsqs

Write Code

Please follow the demo code in the example folder.

Documentation

Types

type BufferedClient

type BufferedClient struct {
	Config
	// contains filtered or unexported fields
}

BufferedClient wraps aws-sdk-go-v2's sqs.Client to provide an async buffered client.

func NewBufferedClient

func NewBufferedClient(config Config) (*BufferedClient, error)

NewBufferedClient creates and returns a new instance of BufferedClient. You will need one BufferedClient per SQS queue. Stop() must eventually be called to free the resources created by NewBufferedClient.

func (*BufferedClient) ChangeMessageVisibilityAsync

func (c *BufferedClient) ChangeMessageVisibilityAsync(entries ...types.ChangeMessageVisibilityBatchRequestEntry) error

ChangeMessageVisibilityAsync schedules message(s) whose visibility needs to be changed. It blocks if the change message visibility buffer is full.

func (*BufferedClient) DeleteMessageAsync

func (c *BufferedClient) DeleteMessageAsync(entries ...types.DeleteMessageBatchRequestEntry) error

DeleteMessageAsync schedules message(s) to be deleted. It blocks if the delete buffer is full.

func (*BufferedClient) ReceiveMessages

func (c *BufferedClient) ReceiveMessages()

func (*BufferedClient) SendMessageAsync

func (c *BufferedClient) SendMessageAsync(entries ...types.SendMessageBatchRequestEntry) error

SendMessageAsync schedules message(s) to be sent. It blocks if the send buffer is full.
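
The "blocks if the buffer is full" behaviour of the Async methods resembles a send on a bounded Go channel. The sketch below assumes a channel-backed buffer, which the documentation does not confirm. `tryEnqueue` is a hypothetical helper that uses a non-blocking send so the full-buffer case is visible without hanging the program.

```go
package main

import "fmt"

// tryEnqueue attempts a non-blocking send and reports whether the entry fit.
// A plain `buf <- entry` would block instead until a slot is free.
func tryEnqueue(buf chan string, entry string) bool {
	select {
	case buf <- entry:
		return true
	default:
		return false
	}
}

func main() {
	buf := make(chan string, 2) // hypothetical buffer size of 2

	// The first two entries fit; the third finds the buffer full.
	fmt.Println(tryEnqueue(buf, "a"), tryEnqueue(buf, "b"), tryEnqueue(buf, "c"))

	<-buf // a dispatcher draining one entry frees a slot
	fmt.Println(tryEnqueue(buf, "c"))
}
```

In practice this means a producer that outpaces SQS dispatch will stall in the Async call, so the buffer size bounds memory use at the cost of back-pressure on callers.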

func (*BufferedClient) Stats

func (c *BufferedClient) Stats() Stats

Stats returns client statistics.

func (*BufferedClient) Stop

func (c *BufferedClient) Stop()

Stop stops all the batcher and dispatcher goroutines. It blocks until all pending requests in buffer are gracefully drained. Stop should be called only after calls to SendMessageAsync() and DeleteMessageAsync() have stopped.
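
The ordering requirement (stop producers first, then call Stop) follows from how a graceful drain is usually built in Go. The sketch below is an assumption about the general pattern, not the package's code: `worker`, `enqueue`, and `stop` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// worker is a hypothetical stand-in for a buffered client with a single
// dispatcher goroutine.
type worker struct {
	buf       chan string
	wg        sync.WaitGroup
	mu        sync.Mutex
	processed []string
}

func newWorker(size int) *worker {
	w := &worker{buf: make(chan string, size)}
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		// range exits only after the channel is closed and fully drained.
		for m := range w.buf {
			w.mu.Lock()
			w.processed = append(w.processed, m)
			w.mu.Unlock()
		}
	}()
	return w
}

// enqueue must not be called after stop: sending on a closed channel panics.
func (w *worker) enqueue(m string) { w.buf <- m }

// stop closes the buffer and blocks until pending entries are processed.
func (w *worker) stop() {
	close(w.buf)
	w.wg.Wait()
}

func main() {
	w := newWorker(8)
	for _, m := range []string{"a", "b", "c"} {
		w.enqueue(m)
	}
	w.stop()
	fmt.Println(len(w.processed))
}
```

In this pattern an enqueue that races with stop would panic, which is one plausible reason for requiring that the Async calls have finished before Stop is called.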

type Config

type Config struct {
	// SQSClient abstracts *sqs.Client from aws-sdk-go-v2. You can bring your
	// own fully initialised SQS client (with required credentials, options
	// etc). This is a required field.
	SQSClient SQSClient

	// QueueURL specifies AWS SQS Queue URL for a queue.
	// This is a required field.
	QueueURL string

	// SendBatchEnabled specifies whether the send message dispatcher is
	// enabled. If not specified, defaults to false.
	SendBatchEnabled bool

	// SendWaitTime specifies a time limit for how long the client will
	// wait before it will dispatch accumulated send message requests
	// even if the batch isn't full. If not specified, send message
	// requests will be dispatched only when a batch is full.
	SendWaitTime time.Duration

	// SendBufferSize specifies a limit on the number of send message
	// requests that can be held in memory. If not specified, defaults
	// to 1000.
	SendBufferSize int

	// SendConcurrency limits the number of concurrent send message SQS
	// requests in progress. If not specified, defaults to SendBufferSize/10.
	SendConcurrency int

	// OnSendMessageBatch will be called with results returned by SQSClient
	// for a send message batch operation. If set, this callback function
	// needs to be goroutine safe.
	OnSendMessageBatch func(*sqs.SendMessageBatchOutput, error)

	// DeleteBatchEnabled specifies whether the delete message dispatcher is
	// enabled. If not specified, defaults to false.
	DeleteBatchEnabled bool

	// DeleteWaitTime specifies a time limit for how long the client will
	// wait before it will dispatch accumulated delete message requests
	// even if the batch isn't full. If not specified, delete message
	// requests will be dispatched only when a batch is full.
	DeleteWaitTime time.Duration

	// DeleteBufferSize specifies a limit on the number of delete message
	// requests that can be held in memory. If not specified, defaults
	// to 1000.
	DeleteBufferSize int

	// DeleteConcurrency limits the number of concurrent delete message SQS
	// requests in progress. If not specified, defaults to DeleteBufferSize/10.
	DeleteConcurrency int

	// OnDeleteMessageBatch will be called with results returned by SQSClient
	// for a delete message batch operation. If set, this callback function
	// needs to be goroutine safe.
	OnDeleteMessageBatch func(*sqs.DeleteMessageBatchOutput, error)

	// ReceiveBatchEnabled specifies whether the receive message dispatcher is
	// enabled. If not specified, defaults to false.
	ReceiveBatchEnabled bool

	// ReceiveWaitTime specifies a time limit for how long the client will
	// wait for a response to a receive message request even if the batch
	// isn't full. If not specified, the receive message request will wait
	// until the batch is full.
	ReceiveWaitTime int32

	// ReceiveVisibilityTimeout specifies a time limit for how long the message
	// will be invisible for other consumers. If not specified, defaults to
	// 0.
	ReceiveVisibilityTimeout int32

	// ReceiveBufferSize specifies a limit on the number of receive message
	// requests that can be held in memory. If not specified, defaults to
	// 1000.
	ReceiveBufferSize int

	// ReceiveConcurrency limits the number of concurrent receive message SQS
	// requests in progress. If not specified, defaults to ReceiveBufferSize/10.
	ReceiveConcurrency int

	// OnReceiveMessage will be called with results returned by SQSClient
	// for receive message operation. If set, this callback function
	// needs to be goroutine safe.
	OnReceiveMessage func(*sqs.ReceiveMessageOutput, error)

	// ChangeVisibilityBatchEnabled specifies whether the change message
	// visibility dispatcher is enabled. If not specified, defaults to false.
	ChangeVisibilityBatchEnabled bool

	// ChangeVisibilityWaitTime specifies a time limit for how long the
	// client will wait before it will dispatch accumulated change message visibility
	// requests even if the batch isn't full. If not specified, change message
	// visibility requests will be dispatched only when a batch is full.
	ChangeVisibilityWaitTime time.Duration

	// ChangeVisibilityBufferSize specifies a limit on the number of change
	// message visibility requests that can be held in memory. If not specified,
	// defaults to 1000.
	ChangeVisibilityBufferSize int

	// ChangeVisibilityConcurrency limits the number of concurrent change
	// message visibility SQS requests in progress. If not specified, defaults to
	// ChangeVisibilityBufferSize/10.
	ChangeVisibilityConcurrency int

	// OnChangeMessageVisibilityBatch will be called with results returned by
	// SQSClient for a change message visibility batch operation. If set, this
	// callback function needs to be goroutine safe.
	OnChangeMessageVisibilityBatch func(*sqs.ChangeMessageVisibilityBatchOutput, error)
}

Config is used to configure BufferedClient.
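
The field comments describe two defaulting rules: buffer sizes fall back to 1000 and concurrency falls back to the buffer size divided by 10. A minimal sketch of that arithmetic follows. `sendSettings` and `withDefaults` are hypothetical names that mirror only two Config fields, and the clamp to a minimum of 1 is this sketch's own assumption; the documentation does not say how buffer sizes below 10 are handled.

```go
package main

import "fmt"

// sendSettings is a hypothetical, trimmed-down mirror of two Config fields.
type sendSettings struct {
	BufferSize  int
	Concurrency int
}

// withDefaults fills unset values the way the field comments describe:
// the buffer size falls back to 1000 and concurrency to BufferSize/10.
func withDefaults(s sendSettings) sendSettings {
	if s.BufferSize <= 0 {
		s.BufferSize = 1000
	}
	if s.Concurrency <= 0 {
		s.Concurrency = s.BufferSize / 10
		// Assumption of this sketch only: integer division yields 0 for
		// buffer sizes below 10, so keep at least one worker.
		if s.Concurrency < 1 {
			s.Concurrency = 1
		}
	}
	return s
}

func main() {
	fmt.Println(withDefaults(sendSettings{}))                                 // both unset
	fmt.Println(withDefaults(sendSettings{BufferSize: 50}))                   // concurrency derived
	fmt.Println(withDefaults(sendSettings{BufferSize: 50, Concurrency: 20}))  // explicit values kept
}
```

Because the concurrency default derives from the buffer size, setting only the buffer size changes both values.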

type SQSClient

SQSClient wraps *sqs.Client from aws-sdk-go-v2

type Stats

type Stats struct {
	MessagesSent                      uint64
	MessagesDeleted                   uint64
	MessagesReceived                  uint64
	MessagesVisibilityChanged         uint64
	SendMessageBatchCalls             uint64
	DeleteMessageBatchCalls           uint64
	ReceiveMessageCalls               uint64
	ChangeMessageVisibilityBatchCalls uint64
}

Stats contains client statistics.
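
Comparing the message counters with the call counters gives the average batch size, which shows how effective batching is. The sketch below shows one way such counters could be kept safely across goroutines using sync/atomic. The `counters` type and its methods are hypothetical, and whether the package uses atomics internally is not stated.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// counters is a hypothetical mirror of two Stats fields.
type counters struct {
	messagesSent          uint64
	sendMessageBatchCalls uint64
}

// recordBatch notes one batch call that carried n messages.
func (c *counters) recordBatch(n int) {
	atomic.AddUint64(&c.sendMessageBatchCalls, 1)
	atomic.AddUint64(&c.messagesSent, uint64(n))
}

// snapshot reads both counters atomically (each value individually).
func (c *counters) snapshot() (sent, calls uint64) {
	return atomic.LoadUint64(&c.messagesSent), atomic.LoadUint64(&c.sendMessageBatchCalls)
}

func main() {
	var c counters
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.recordBatch(10)
		}()
	}
	wg.Wait()

	sent, calls := c.snapshot()
	// Average batch size: closer to 10 means fewer billed requests per message.
	fmt.Println(sent, calls, float64(sent)/float64(calls))
}
```

With the real client, the same ratio could be computed from Stats().MessagesSent and Stats().SendMessageBatchCalls.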
