sqsconsumer

package module
v0.0.0-...-d2be49e
Published: Sep 17, 2021 License: MIT Imports: 8 Imported by: 4

README

sqsconsumer

sqsconsumer helps write programs that should respond to messages on an AWS SQS queue. Users of the package only write a processor implementation and then start a consumer bound to a specific queue and processor. The consumer will take care of extending message visibility if the processor takes a long time to run and of only deleting messages from the queue which were successfully processed. Context is passed into the processor so that complex/long workers can gracefully exit when the consumer is interrupted/killed.

See example/main.go for a simple demonstration.

AWS IAM

sqsconsumer requires IAM permissions for the following SQS API actions:

  • sqs:ChangeMessageVisibility
  • sqs:ChangeMessageVisibilityBatch
  • sqs:DeleteMessage
  • sqs:DeleteMessageBatch
  • sqs:GetQueueAttributes
  • sqs:GetQueueUrl
  • sqs:ReceiveMessage
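
As an illustration, the actions listed above can be collected into a single IAM policy statement. The sketch below builds such a policy document in Go and prints it as JSON. The queue ARN, account ID, and the helper names (consumerPolicy, policyJSON) are hypothetical placeholders and not part of sqsconsumer; the Version/Statement layout is the standard IAM JSON policy format.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// statement and policy mirror the standard IAM JSON policy layout.
type statement struct {
	Effect   string   `json:"Effect"`
	Action   []string `json:"Action"`
	Resource string   `json:"Resource"`
}

type policy struct {
	Version   string      `json:"Version"`
	Statement []statement `json:"Statement"`
}

// consumerPolicy returns a policy that allows the seven SQS actions
// listed in the README on a single queue ARN.
func consumerPolicy(queueARN string) policy {
	return policy{
		Version: "2012-10-17",
		Statement: []statement{{
			Effect: "Allow",
			Action: []string{
				"sqs:ChangeMessageVisibility",
				"sqs:ChangeMessageVisibilityBatch",
				"sqs:DeleteMessage",
				"sqs:DeleteMessageBatch",
				"sqs:GetQueueAttributes",
				"sqs:GetQueueUrl",
				"sqs:ReceiveMessage",
			},
			Resource: queueARN,
		}},
	}
}

// policyJSON renders the policy as indented JSON.
func policyJSON(queueARN string) string {
	b, err := json.MarshalIndent(consumerPolicy(queueARN), "", "  ")
	if err != nil {
		panic(err) // not expected for this plain struct
	}
	return string(b)
}

func main() {
	// Hypothetical ARN: substitute your own region, account, and queue name.
	fmt.Println(policyJSON("arn:aws:sqs:us-east-1:123456789012:example-queue"))
}
```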

Shutting down gracefully

Canceling the context passed to the Run method will propagate cancelation to all running handlers, likely preventing them from completing their task. To have sqsconsumer stop consuming messages while still allowing the in-flight handlers to complete their work, you may provide an optional shutDown channel.

Example:

  ctx, cancel := context.WithCancel(context.Background())
  defer cancel()

  shutDown := make(chan struct{})
  // ... Set up shutdown signaling, service, and handler ...
  // shutDown will be closed when it's time to shut down

  c := sqsconsumer.NewConsumer(service, handler)

  wg := &sync.WaitGroup{}
  for i := 0; i < numFetchers; i++ {
    wg.Add(1)
    go func() {
      defer wg.Done()
      // Consumer will stop when shutDown is closed
      if err := c.Run(ctx, sqsconsumer.WithShutdownChan(shutDown)); err != nil {
        // Handle error (logged here for illustration)
        log.Printf("consumer stopped: %v", err)
      }
    }()
  }

  <-shutDown
  // Force shutdown after deadline
  time.AfterFunc(30*time.Second, cancel)

  // Wait for all the consumers to exit cleanly
  wg.Wait()
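
The example leaves the shutdown signaling itself open. One possible way to wire it up, using only the standard library, is to close the shutDown channel when the process receives an interrupt or termination signal. The helper names closeOnSignal and simulateSignal are hypothetical and not part of sqsconsumer; the choice of SIGINT and SIGTERM is an assumption about the deployment.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// closeOnSignal returns a channel that is closed once the first value
// arrives on sigs. The returned channel can be handed to
// sqsconsumer.WithShutdownChan.
func closeOnSignal(sigs <-chan os.Signal) <-chan struct{} {
	shutDown := make(chan struct{})
	go func() {
		<-sigs
		close(shutDown)
	}()
	return shutDown
}

// simulateSignal feeds a fake SIGTERM through closeOnSignal and reports
// whether the returned channel ended up closed.
func simulateSignal() bool {
	sigs := make(chan os.Signal, 1)
	shutDown := closeOnSignal(sigs)
	sigs <- syscall.SIGTERM
	_, open := <-shutDown // blocks until shutDown is closed
	return !open
}

func main() {
	// Real wiring: deliver SIGINT and SIGTERM to a buffered channel.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
	defer signal.Stop(sigs)

	shutDown := closeOnSignal(sigs)
	_ = shutDown // pass to c.Run via sqsconsumer.WithShutdownChan(shutDown)

	fmt.Println("closed after simulated signal:", simulateSignal())
}
```

Because the channel is closed rather than sent to, every fetcher goroutine that shares it observes the shutdown.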

TODO

Documentation

Overview

Package sqsconsumer enables easy and efficient message processing from an SQS queue.

Overview

Consumers will read from queues in batches and run a handler func for each message. Note that no retry limit is managed by this package, so use the SQS Dead Letter Queue facility. Of course, you can use another consumer to handle messages that end up in the Dead Letter Queue.

SQS

Standard SQS queues provide at-least-once delivery with no guarantee of message ordering. When a message is received, a visibility timeout starts; if the message has not been deleted by the time the timeout expires, it becomes visible again and will be redelivered. Long-running message handlers must extend the timeout periodically to ensure that they retain exclusivity on the message, and they must explicitly delete messages that were successfully consumed to avoid redelivery.
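
The periodic-extension idea can be sketched without any AWS dependency. In the hypothetical helper below (runWithExtension is not part of sqsconsumer), the extend callback stands in for a ChangeMessageVisibility call, and ticks are injected as a channel so the demonstration is deterministic; a real program would typically pass a time.Ticker's channel.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runWithExtension runs work in a goroutine and calls extend once per tick
// until work returns. It reports how many times extend was called together
// with the error returned by work.
func runWithExtension(ctx context.Context, ticks <-chan time.Time, extend func(), work func(context.Context) error) (int, error) {
	done := make(chan error, 1)
	go func() { done <- work(ctx) }()

	extensions := 0
	for {
		select {
		case err := <-done:
			return extensions, err
		case <-ticks:
			extend() // stand-in for extending the visibility timeout
			extensions++
		}
	}
}

// demoExtensions simulates a handler that stays busy for three ticks.
func demoExtensions() int {
	ticks := make(chan time.Time)
	release := make(chan struct{})
	go func() {
		for i := 0; i < 3; i++ {
			ticks <- time.Now() // blocks until the loop consumes the tick
		}
		close(release) // let the simulated handler finish
	}()
	n, _ := runWithExtension(context.Background(), ticks, func() {},
		func(ctx context.Context) error {
			<-release
			return nil
		})
	return n
}

func main() {
	fmt.Println("extensions while handler ran:", demoExtensions())
}
```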

To read more about how SQS works, check the SQS documentation at https://aws.amazon.com/documentation/sqs/

Middleware

Visibility timeout extension and deleting messages after successful handling are implemented as handler middleware. See github.com/Wattpad/sqsconsumer/middleware for details on these and other middleware layers available.
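
To illustrate what handler middleware means here, the following self-contained sketch wraps a handler so that a delete callback runs only when handling succeeds. The types handlerFunc and decorator and the helpers deleteOnSuccess and chain are hypothetical and only mirror the shape of MessageHandlerFunc; they are not the API of the middleware package, which should be consulted for the real layers.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// handlerFunc has the same shape as sqsconsumer.MessageHandlerFunc.
type handlerFunc func(ctx context.Context, msg string) error

// decorator wraps a handler with additional behaviour (hypothetical name).
type decorator func(handlerFunc) handlerFunc

// deleteOnSuccess calls del only when the wrapped handler returns nil,
// so failed messages stay on the queue and are redelivered.
func deleteOnSuccess(del func(msg string)) decorator {
	return func(next handlerFunc) handlerFunc {
		return func(ctx context.Context, msg string) error {
			err := next(ctx, msg)
			if err == nil {
				del(msg)
			}
			return err
		}
	}
}

// chain applies decorators so that the first one listed is outermost.
func chain(h handlerFunc, ds ...decorator) handlerFunc {
	for i := len(ds) - 1; i >= 0; i-- {
		h = ds[i](h)
	}
	return h
}

// demoDeleted handles three messages and returns the ones that were deleted.
func demoDeleted() []string {
	var deleted []string
	h := chain(
		func(ctx context.Context, msg string) error {
			if msg == "bad" {
				return errors.New("cannot process")
			}
			return nil
		},
		deleteOnSuccess(func(msg string) { deleted = append(deleted, msg) }),
	)
	for _, m := range []string{"ok-1", "bad", "ok-2"} {
		_ = h(context.Background(), m)
	}
	return deleted
}

func main() {
	fmt.Println(demoDeleted())
}
```

This also shows why the handler's returned error matters even though Consumer itself ignores it: the wrapping layer needs it to decide whether to delete.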

Use

See the example directory for a demonstration of use.

Index

Variables

var (
	ErrShutdownChannelClosed = errors.New("shutDown channel is already closed")
)

Functions

func NewBatchDeleter

func NewBatchDeleter(ctx context.Context, wg *sync.WaitGroup, s *SQSService, every, drainTimeout time.Duration) chan<- *sqs.Message

NewBatchDeleter starts a batch deleter routine that deletes messages after they are sent to the returned channel
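
The general pattern of deleting in batches from a channel can be sketched as follows. This is not the package's implementation: batchLoop is a hypothetical helper that works on plain receipt-handle strings, flushes when ten handles have accumulated (the SQS limit for one DeleteMessageBatch request) or when a tick arrives, and drains what is left when the input channel is closed.

```go
package main

import (
	"fmt"
	"time"
)

// maxBatch is the most entries SQS accepts in one DeleteMessageBatch request.
const maxBatch = 10

// batchLoop reads receipt handles from in and passes them to flush in
// groups of at most maxBatch. A tick flushes a partial batch, and closing
// in flushes whatever remains before returning.
func batchLoop(in <-chan string, ticks <-chan time.Time, flush func([]string)) {
	batch := make([]string, 0, maxBatch)
	send := func() {
		if len(batch) > 0 {
			flush(append([]string(nil), batch...)) // hand over a copy
			batch = batch[:0]
		}
	}
	for {
		select {
		case h, ok := <-in:
			if !ok {
				send() // drain the remainder
				return
			}
			batch = append(batch, h)
			if len(batch) == maxBatch {
				send()
			}
		case <-ticks:
			send()
		}
	}
}

// demoBatchSizes pushes 23 handles through batchLoop without any ticks
// and returns the size of each flushed batch.
func demoBatchSizes() []int {
	in := make(chan string)
	done := make(chan struct{})
	var sizes []int
	go func() {
		// A nil ticks channel never fires, so only size and close trigger flushes.
		batchLoop(in, nil, func(b []string) { sizes = append(sizes, len(b)) })
		close(done)
	}()
	for i := 0; i < 23; i++ {
		in <- fmt.Sprintf("handle-%d", i)
	}
	close(in)
	<-done
	return sizes
}

func main() {
	fmt.Println(demoBatchSizes())
}
```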

func NewBatchVisibilityExtender

func NewBatchVisibilityExtender(ctx context.Context, s *SQSService, ticker <-chan time.Time, extensionSecs int64, pending []*sqs.Message) chan<- *sqs.Message

NewBatchVisibilityExtender starts a batch visibility extender routine that extends visibility on messages until they are sent to the returned channel

func NoopLogger

func NoopLogger(_ string, _ ...interface{})

func SetupQueue

func SetupQueue(svc SQSAPI, name string) (*string, error)

SetupQueue creates the queue to listen on and returns the URL.

Types

type AWSConfigOption

type AWSConfigOption func(*aws.Config)

func OptAWSRegion

func OptAWSRegion(region string) AWSConfigOption

type Consumer

type Consumer struct {
	Logger                         func(string, ...interface{})
	WaitSeconds                    int64
	ReceiveVisibilityTimoutSeconds int64

	ExtendVisibilityTimeoutBySeconds int64
	ExtendVisibilityTimeoutEvery     time.Duration
	DeleteMessageAccumulatorTimeout  time.Duration
	DeleteMessageDrainTimeout        time.Duration
	// contains filtered or unexported fields
}

Consumer is an SQS queue consumer

func NewConsumer

func NewConsumer(s *SQSService, handler MessageHandlerFunc) *Consumer

NewConsumer creates a Consumer that uses the given SQSService to connect and invokes the handler for each message received.

func (*Consumer) Run

func (mf *Consumer) Run(ctx context.Context, opts ...RunOption) error

Run starts the Consumer, stopping it when the given context is canceled. To shut down without canceling the Context, and allow in-flight messages to drain, use the WithShutdownChan RunOption.

If the context is canceled, the returned error is the context's error. If the optional shutDown channel is closed before Run is called, the returned error is ErrShutdownChannelClosed. If in-flight messages drain to completion after shutdown, the returned error is nil.
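
A caller may want to tell these three outcomes apart. The sketch below shows one way to do so with errors.Is. To keep it self-contained, errShutdownChannelClosed is a local stand-in for sqsconsumer.ErrShutdownChannelClosed, and describeRunResult is a hypothetical helper; in real code the value returned by Run would be compared against the package's exported variable.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errShutdownChannelClosed stands in for sqsconsumer.ErrShutdownChannelClosed
// so that this sketch compiles without the package.
var errShutdownChannelClosed = errors.New("shutDown channel is already closed")

// describeRunResult maps the error returned by Run to the outcomes
// described in the documentation above.
func describeRunResult(err error) string {
	switch {
	case err == nil:
		return "in-flight messages drained after shutdown"
	case errors.Is(err, errShutdownChannelClosed):
		return "shutDown channel was closed before Run was called"
	case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
		return "context ended before Run returned"
	default:
		return "unexpected error: " + err.Error()
	}
}

// demoResults classifies one example of each documented outcome.
func demoResults() []string {
	return []string{
		describeRunResult(nil),
		describeRunResult(errShutdownChannelClosed),
		describeRunResult(context.Canceled),
	}
}

func main() {
	for _, r := range demoResults() {
		fmt.Println(r)
	}
}
```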

func (*Consumer) SetLogger

func (mf *Consumer) SetLogger(fn func(format string, args ...interface{}))

SetLogger sets the consumer and service loggers to a function similar to fmt.Printf

type MessageHandlerFunc

type MessageHandlerFunc func(ctx context.Context, msg string) error

MessageHandlerFunc is the function type that users of this library should implement. It is called once per message and should return an error if there was a problem processing the message. Note that Consumer itself ignores the error, but some middleware needs it to know whether handling was successful.
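
As a sketch of a handler with this signature that cooperates with cancelation, the example below splits its work into steps and checks the context between them. The type handlerFunc mirrors MessageHandlerFunc locally, and stepHandler and demoCancelAfter are hypothetical helpers used only for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// handlerFunc has the same shape as sqsconsumer.MessageHandlerFunc.
type handlerFunc func(ctx context.Context, msg string) error

// stepHandler returns a handler that performs steps units of work and
// stops early, returning the context's error, once ctx is done.
func stepHandler(steps int, onStep func(i int)) handlerFunc {
	return func(ctx context.Context, msg string) error {
		for i := 0; i < steps; i++ {
			if err := ctx.Err(); err != nil {
				return fmt.Errorf("stopped %q after %d of %d steps: %w", msg, i, steps, err)
			}
			onStep(i)
		}
		return nil
	}
}

// demoCancelAfter runs a five-step handler and cancels the context once n
// steps have completed. It returns the number of completed steps and
// whether the handler reported context.Canceled.
func demoCancelAfter(n int) (int, bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	completed := 0
	h := stepHandler(5, func(i int) {
		completed++
		if completed == n {
			cancel()
		}
	})
	err := h(ctx, "job")
	return completed, errors.Is(err, context.Canceled)
}

func main() {
	steps, canceled := demoCancelAfter(2)
	fmt.Println("completed steps:", steps, "canceled:", canceled)
}
```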

type RunOption

type RunOption func(o *runOpts)

func WithShutdownChan

func WithShutdownChan(shutDown <-chan struct{}) RunOption

WithShutdownChan accepts a channel that will gracefully shut down the consumer when it is closed. The consumer will stop receiving messages from SQS and will return from the Run method once in-flight handlers have completed.

The channel must be closed for shutdown to occur. Sending a value down the channel will not shut down the consumer.

The Run method's context.Context may be canceled during this time to abort pending operations early. This is done co-operatively and requires the consumer's handler func to honour the context cancelation.
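
RunOption follows the functional options pattern. The sketch below shows how such an option might record a shutdown channel and how a non-blocking receive can detect a channel that is already closed. The field name shutDown and the helpers applyOptions and alreadyClosed are assumptions made for illustration; the package's actual runOpts fields are unexported and not documented here.

```go
package main

import "fmt"

// runOpts and runOption mirror the shape of the package's RunOption type.
// The shutDown field is an assumed name.
type runOpts struct {
	shutDown <-chan struct{}
}

type runOption func(o *runOpts)

// withShutdownChan records the channel that signals graceful shutdown.
func withShutdownChan(ch <-chan struct{}) runOption {
	return func(o *runOpts) { o.shutDown = ch }
}

// applyOptions builds the effective options from zero or more runOptions.
func applyOptions(opts ...runOption) runOpts {
	var o runOpts
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

// alreadyClosed reports whether a receive on ch would succeed right now.
// For a channel that is never sent on, that means it has been closed.
// A nil channel (no option given) is never ready, so the result is false.
func alreadyClosed(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

// demoClosedChecks evaluates alreadyClosed for: no option, an open
// channel, and a closed channel.
func demoClosedChecks() [3]bool {
	open := make(chan struct{})
	closed := make(chan struct{})
	close(closed)
	return [3]bool{
		alreadyClosed(applyOptions().shutDown),
		alreadyClosed(applyOptions(withShutdownChan(open)).shutDown),
		alreadyClosed(applyOptions(withShutdownChan(closed)).shutDown),
	}
}

func main() {
	fmt.Println(demoClosedChecks())
}
```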

type SQSAPI

SQSAPI is the part of the AWS SQS API which is used by the sqsconsumer package.

type SQSService

type SQSService struct {
	Svc    SQSAPI
	URL    *string
	Logger func(format string, args ...interface{})
}

SQSService links an SQS client with a queue URL.

func NewSQSService

func NewSQSService(queueName string, svc SQSAPI) (*SQSService, error)

NewSQSService takes an SQSAPI implementation as an argument so that the SQS client can be mocked and the library tested locally.

func SQSServiceForQueue

func SQSServiceForQueue(queueName string, opts ...AWSConfigOption) (*SQSService, error)

SQSServiceForQueue creates an AWS SQS client configured for the given region and gets or creates a queue with the given name.

Directories

Path            Synopsis
cmd/sqsdrain    sqsdrain reads messages until interrupted off of an SQS queue and writes them to a file.