README

SQS-Poller

SQS-Poller is a simple queue polling framework, designed specifically to work with AWS SQS.

Installation

  1. Install sqspoller:
$ go get -u github.com/kinluek/sqspoller
  2. Import code:
import "github.com/kinluek/sqspoller"

Features

  • Timeouts
  • Polling intervals
  • Polling back-off on empty responses
  • Graceful shutdowns
  • Middleware
  • Message removal from the queue through a simple Delete API

Quick Start

package main

import (
	"context"
	"fmt"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
	"github.com/kinluek/sqspoller"
	"log"
	"time"
)

func main() {
	// create SQS client.
	sess := session.Must(session.NewSession())
	sqsClient := sqs.New(sess)

	// use client to create default Poller instance.
	poller := sqspoller.Default(sqsClient)

	// supply polling parameters.
	poller.ReceiveMessageParams(&sqs.ReceiveMessageInput{
		MaxNumberOfMessages: aws.Int64(1),
		QueueUrl:            aws.String("https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"),
	})

	// configure idle poll interval and handler timeout
	poller.SetIdlePollInterval(30 * time.Second)
	poller.SetHandlerTimeout(120 * time.Second)

	// supply handler to handle new messages
	poller.OnMessage(func(ctx context.Context, client *sqs.SQS, msgOutput *sqspoller.MessageOutput) error {
		msg := msgOutput.Messages[0]
		// do work on message
		fmt.Println("GOT MESSAGE: ", msg)
		// delete message from queue
		if _, err := msg.Delete(); err != nil {
			return err
		}
		return nil
	})

	// supply handler to handle errors returned from poll requests to SQS.
	// Returning a non-nil error will cause the poller to exit.
	poller.OnError(func(ctx context.Context, err error) error {
		// log error and exit poller.
		log.Println(err)
		return err
	})

	// Run poller.
	if err := poller.Run(); err != nil {
		log.Fatal(err)
	}
}

Using Middleware

func main() {
	// sqsClient is an *sqs.SQS client, created as in the Quick Start.
	poller := sqspoller.New(sqsClient)

	// IgnoreEmptyResponses stops empty message outputs from reaching the core handler
	// and therefore the user can guarantee that there will be at least one message in
	// the message output.
	//
	// Note: Default poller comes with this middleware.
	poller.Use(sqspoller.IgnoreEmptyResponses())

	// supply handler to handle new messages
	poller.OnMessage(func(ctx context.Context, client *sqs.SQS, msgOutput *sqspoller.MessageOutput) error {
		// can guarantee messages will have length greater than or equal to one.
		msg := msgOutput.Messages[0]

		// delete message from queue
		if _, err := msg.Delete(); err != nil {
			return err
		}
		return nil
	})
}

Shutdown

When shutting down the poller, there are three different modes of shutdown to choose from.

ShutdownNow
 poller.ShutdownNow()

The ShutdownNow method cancels the context object immediately and exits the Run() function. It does not wait for any jobs to finish handling before exiting.

ShutdownGracefully
 poller.ShutdownGracefully()

The ShutdownGracefully method waits for the handler to finish handling the current message before cancelling the context object and exiting the Run() function. If the handler is blocked, ShutdownGracefully will not return.

ShutdownAfter
 poller.ShutdownAfter(30*time.Second)

The ShutdownAfter method attempts to shut down gracefully within the given time. If the handler cannot complete its current job within that time, the context object is cancelled at that point, allowing the Run() function to exit.

If the timeout happens before the poller can shut down gracefully, ShutdownAfter returns the error ErrShutdownGraceful.

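// In addition to the Quick Start imports, this example needs "os",
// "os/signal" and "syscall".
func main() {
	// sqsClient is an *sqs.SQS client, created as in the Quick Start.
	poller := sqspoller.Default(sqsClient)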

	poller.ReceiveMessageParams(&sqs.ReceiveMessageInput{
		MaxNumberOfMessages: aws.Int64(1),
		QueueUrl:            aws.String("https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"),
	})

	// messageHandler and errorHandler are assumed to be defined elsewhere.
	poller.OnMessage(messageHandler)
	poller.OnError(errorHandler)

	// run poller in a separate goroutine and wait for errors on channel
	pollerErrors := make(chan error, 1)
	go func() {
		pollerErrors <- poller.Run()
	}()

	// listen for shutdown signals
	shutdown := make(chan os.Signal, 1)
	signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)

	select {
	case err := <-pollerErrors:
		log.Fatal(err)
	case <-shutdown:
		if err := poller.ShutdownAfter(30 * time.Second); err != nil {
			log.Fatal(err)
		}
	}
}
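
The ShutdownAfter behaviour described above can be pictured with a small standard-library sketch. This is not the library's implementation: shutdownAfter, tryShutdown and errShutdownGraceful are hypothetical stand-ins, used only to show the idea of waiting for the current job while a deadline runs in parallel.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errShutdownGraceful stands in for sqspoller.ErrShutdownGraceful.
var errShutdownGraceful = errors.New("could not shut down gracefully in time")

// shutdownAfter is a hypothetical helper: it waits for the in-flight job to
// signal completion on done, but gives up once d has elapsed.
func shutdownAfter(done <-chan struct{}, cancel context.CancelFunc, d time.Duration) error {
	timer := time.NewTimer(d)
	defer timer.Stop()
	defer cancel() // the context is cancelled on both paths
	select {
	case <-done:
		return nil
	case <-timer.C:
		return errShutdownGraceful
	}
}

// tryShutdown simulates a job lasting jobTime and a shutdown deadline of
// limit, and reports whether the shutdown was graceful.
func tryShutdown(jobTime, limit time.Duration) bool {
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	go func() {
		defer close(done)
		select {
		case <-time.After(jobTime): // job finished on its own
		case <-ctx.Done(): // job abandoned after cancellation
		}
	}()
	return shutdownAfter(done, cancel, limit) == nil
}

func main() {
	fmt.Println(tryShutdown(10*time.Millisecond, time.Second)) // job finishes in time
	fmt.Println(tryShutdown(time.Second, 10*time.Millisecond)) // deadline hits first
}
```

In the sketch, a short job lets the shutdown complete cleanly, while a long job causes the deadline branch to fire and the stand-in error to be returned.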

The Playground

To see how you can experiment and play around with a local SQS and poller instance, take a look here.

Dependencies

  • Go Version 1.13+

In case you were worried about dependency bloat, the core package functions rely on only two third-party modules:

  • github.com/aws/aws-sdk-go v1.28.9 - What the framework is built for.
  • github.com/google/uuid v1.1.1 - To generate reliable UUIDs for tracing.

The rest of the dependencies found in go.mod are test dependencies. These modules provide functionality to test the framework effectively.

Testing

Tests in the sqspoller_test.go file require Docker to be installed and running on your machine, because the tests spin up local SQS containers to test the framework against.

When running the tests, the setup code checks whether the localstack/localstack:0.10.7 image exists on the machine. If it does not, the image is pulled from docker.io before the tests run. To avoid this stall, pull the image manually before running the tests, like so:

docker pull localstack/localstack:0.10.7

Documentation

Overview

    package sqspoller is a simple queue polling framework, designed specifically to work with AWS SQS.

    Constants

    const TrackingKey ctxKey = 1

      TrackingKey should be used to access the values on the context object of type *TrackingValue.

      Variables

      var (
      	// ErrNoMessageHandler occurs when the caller tries to run the poller before attaching a MessageHandler.
      	ErrNoMessageHandler = errors.New("ErrNoMessageHandler: no message handler set on poller instance")
      
      	// ErrNoErrorHandler occurs when the caller tries to run the poller before attaching an ErrorHandler.
      	ErrNoErrorHandler = errors.New("ErrNoErrorHandler: no error handler set on poller instance")
      
      	// ErrNoReceiveMessageParams occurs when the caller tries to run the poller before setting the ReceiveMessageParams.
      	ErrNoReceiveMessageParams = errors.New("ErrNoReceiveMessageParams: no ReceiveMessage parameters have been set")
      
      	// ErrHandlerTimeout occurs when the MessageHandler times out before processing the message.
      	ErrHandlerTimeout = errors.New("ErrHandlerTimeout: message handler took to long to process message")
      
      	// ErrRequestTimeout occurs when the poller times out while requesting for a message off the SQS queue.
      	ErrRequestTimeout = errors.New("ErrRequestTimeout: requesting message from queue timed out")
      
      	// ErrShutdownNow occurs when the poller is suddenly shutdown.
      	ErrShutdownNow = errors.New("ErrShutdownNow: poller was suddenly shutdown")
      
      	// ErrShutdownGraceful occurs when the poller fails to shutdown gracefully.
      	ErrShutdownGraceful = errors.New("ErrShutdownGraceful: poller could not shutdown gracefully in time")
      
      	// ErrNotCloseable occurs when the caller tries to shut down the poller while it is already stopped or in the process of shutting down.
      	ErrNotCloseable = errors.New("ErrNotCloseable: poller is either stopped or already shutting down")
      
      	// ErrNotRunnable occurs when the caller tries to run the poller while the poller is already running or shutting down.
      	ErrNotRunnable = errors.New("ErrNotRunnable: poller is either already running or shutting down")
      
      	// ErrIntegrityIssue occurs when there is an integrity issue in the system.
      	ErrIntegrityIssue = errors.New("ErrIntegrityIssue: unknown integrity issue")
      )

      Functions

      This section is empty.

      Types

      type ErrorHandler

      type ErrorHandler func(ctx context.Context, err error) error

        ErrorHandler is a function which handles errors returned from sqs.ReceiveMessageWithContext. It will only be invoked if the error is not nil. Returning nil from the ErrorHandler will allow the poller to continue; returning an error will cause the poller to exit.

        Errors should be of type awserr.Error, if the sqs.ReceiveMessageWithContext function returns the errors as expected.

        type Message

        type Message struct {
        	*sqs.Message
        	// contains filtered or unexported fields
        }

          Message is an individual message, contained within a MessageOutput, it provides methods to remove itself from the SQS queue.

          func (*Message) Delete

          func (m *Message) Delete() (*sqs.DeleteMessageOutput, error)

            Delete removes the message from the queue, permanently.

            type MessageHandler

            type MessageHandler func(ctx context.Context, client *sqs.SQS, msgOutput *MessageOutput) error

              MessageHandler is a function which handles the incoming SQS message.

              The sqs Client used to instantiate the poller will also be made available to allow the user to perform standard sqs operations.

              type MessageOutput

              type MessageOutput struct {
              	*sqs.ReceiveMessageOutput
              	Messages []*Message
              	// contains filtered or unexported fields
              }

                MessageOutput contains the SQS ReceiveMessageOutput and is passed down to the MessageHandler when the Poller is running.

                type Middleware

                type Middleware func(MessageHandler) MessageHandler

                  Middleware is a function that wraps a MessageHandler to add functionality before or after the MessageHandler code.
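
As a rough illustration of the wrapping idea, the sketch below uses simplified stand-in types so that it runs without the AWS SDK. The real MessageHandler also receives a *sqs.SQS client, and withTrace and runTraceDemo are hypothetical names, not part of the package.

```go
package main

import (
	"context"
	"fmt"
)

// Simplified stand-ins for the sqspoller types. The real MessageHandler
// also takes a *sqs.SQS client argument.
type MessageOutput struct{ Messages []string }

type MessageHandler func(ctx context.Context, out *MessageOutput) error

type Middleware func(MessageHandler) MessageHandler

// withTrace is a hypothetical middleware that records an entry before and
// after the wrapped handler runs.
func withTrace(trace *[]string) Middleware {
	return func(next MessageHandler) MessageHandler {
		return func(ctx context.Context, out *MessageOutput) error {
			*trace = append(*trace, "before")
			err := next(ctx, out)
			*trace = append(*trace, "after")
			return err
		}
	}
}

// runTraceDemo wraps a handler with withTrace and returns the call order.
func runTraceDemo() []string {
	var trace []string
	handler := func(ctx context.Context, out *MessageOutput) error {
		trace = append(trace, "handle "+out.Messages[0])
		return nil
	}
	wrapped := withTrace(&trace)(handler)
	_ = wrapped(context.Background(), &MessageOutput{Messages: []string{"hello"}})
	return trace
}

func main() {
	fmt.Println(runTraceDemo()) // prints [before handle hello after]
}
```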

                  func IgnoreEmptyResponses

                  func IgnoreEmptyResponses() Middleware

                    IgnoreEmptyResponses stops the data from being passed down to the inner message handler, if there is no message to be handled.
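
A middleware with this kind of behaviour could look roughly like the sketch below. It is an assumption about the general shape, not the package's source: the types are simplified stand-ins, and skipEmpty and countHandled are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// Simplified stand-ins for the sqspoller types.
type MessageOutput struct{ Messages []string }

type MessageHandler func(ctx context.Context, out *MessageOutput) error

type Middleware func(MessageHandler) MessageHandler

// skipEmpty is a hypothetical middleware that returns early, without
// calling the inner handler, when the output carries no messages.
func skipEmpty() Middleware {
	return func(next MessageHandler) MessageHandler {
		return func(ctx context.Context, out *MessageOutput) error {
			if out == nil || len(out.Messages) == 0 {
				return nil
			}
			return next(ctx, out)
		}
	}
}

// countHandled feeds each batch through a wrapped handler and reports how
// many batches actually reached the inner handler.
func countHandled(batches ...[]string) int {
	handled := 0
	handler := skipEmpty()(func(ctx context.Context, out *MessageOutput) error {
		handled++
		return nil
	})
	for _, b := range batches {
		_ = handler(context.Background(), &MessageOutput{Messages: b})
	}
	return handled
}

func main() {
	// Two of the four batches are empty, so only two reach the handler.
	fmt.Println(countHandled(nil, []string{"a"}, []string{}, []string{"b", "c"}))
}
```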

                    type Poller

                    type Poller struct {
                    	*sync.Mutex
                    
                    	// Holds the time of the last poll request that was made. This can be checked
                    	// periodically, to confirm the Poller is running as expected.
                    	LastPollTime time.Time
                    
                    	// Maximum time interval between each poll when poll requests are returning
                    	// empty responses.
                    	IdlePollInterval time.Duration
                    
                    	// Current poll interval, this interval will reach the IdlePollInterval
                    	// upon enough consecutive empty poll requests. Once a successful message
                    	// response is received, the CurrentInterval will drop back down to 0.
                    	CurrentInterval time.Duration
                    
                    	// Timeout on requesting for a new message from SQS. By default, this will
                    	// be 30 seconds, if it has not been set manually.
                    	RequestTimeout time.Duration
                    	// contains filtered or unexported fields
                    }

                      Poller is an instance of the polling framework, it contains the SQS client and provides a simple API for polling an SQS queue.

                      func Default

                      func Default(sqsSvc *sqs.SQS) *Poller

                        Default creates a new instance of the SQS Poller from an instance of sqs.SQS. It also comes set up with the recommended middleware plugged in.

                        func New

                        func New(sqsSvc *sqs.SQS) *Poller

                          New creates a new instance of the SQS Poller from an instance of sqs.SQS.

                          func (*Poller) OnError

                          func (p *Poller) OnError(handler ErrorHandler)

                            OnError attaches an ErrorHandler to the Poller instance. It is the first line of defence against message request errors from SQS.

                            func (*Poller) OnMessage

                            func (p *Poller) OnMessage(handler MessageHandler, middleware ...Middleware)

                              OnMessage attaches a MessageHandler to the Poller instance, if a MessageHandler already exists on the Poller instance, it will be replaced. The Middleware supplied to OnMessage will be applied first before any global middleware set by Use().
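
One reading of this ordering is that handler-specific middleware sits closest to the handler and global middleware wraps the result. The sketch below illustrates that reading with a simplified stand-in handler type; label, compose and runOrderDemo are hypothetical helpers, not package functions.

```go
package main

import (
	"context"
	"fmt"
)

// MessageHandler is a simplified stand-in for sqspoller.MessageHandler.
type MessageHandler func(ctx context.Context) error

type Middleware func(MessageHandler) MessageHandler

// label returns a middleware that records when it is entered and left.
func label(name string, trace *[]string) Middleware {
	return func(next MessageHandler) MessageHandler {
		return func(ctx context.Context) error {
			*trace = append(*trace, name+" in")
			err := next(ctx)
			*trace = append(*trace, name+" out")
			return err
		}
	}
}

// compose applies the handler-specific middleware first, then wraps the
// result with the global middleware.
func compose(h MessageHandler, specific, global Middleware) MessageHandler {
	return global(specific(h))
}

// runOrderDemo returns the order in which each layer runs.
func runOrderDemo() []string {
	var trace []string
	h := func(ctx context.Context) error {
		trace = append(trace, "handler")
		return nil
	}
	wrapped := compose(h, label("specific", &trace), label("global", &trace))
	_ = wrapped(context.Background())
	return trace
}

func main() {
	// prints [global in specific in handler specific out global out]
	fmt.Println(runOrderDemo())
}
```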

                              func (*Poller) ReceiveMessageParams

                              func (p *Poller) ReceiveMessageParams(input *sqs.ReceiveMessageInput, opts ...request.Option)

                                ReceiveMessageParams accepts the same parameters as the SQS ReceiveMessage method. It configures how the poller receives new messages, the parameters must be set before the Poller is run.

                                func (*Poller) Run

                                func (p *Poller) Run() error

                                  Run starts the poller, the poller will continuously poll SQS until an error is returned, or explicitly told to shutdown.

                                  func (*Poller) SetHandlerTimeout

                                  func (p *Poller) SetHandlerTimeout(t time.Duration)

                                    SetHandlerTimeout lets the user set the timeout for handling a message. If the MessageHandler function cannot finish execution within this time frame, the MessageHandler will return ErrHandlerTimeout. The error can be caught and handled by custom middleware, so the user can choose to move on to the next poll request if they so wish.
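
The sketch below shows, under stated assumptions, how a timeout error can be caught by custom middleware so that polling carries on. The handler type and errHandlerTimeout are simplified stand-ins for sqspoller.MessageHandler and sqspoller.ErrHandlerTimeout, and withTimeout only imitates the timeout so the example runs on its own; it is not the library's mechanism.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errHandlerTimeout stands in for sqspoller.ErrHandlerTimeout.
var errHandlerTimeout = errors.New("handler timed out")

// handler is a simplified stand-in for sqspoller.MessageHandler.
type handler func(ctx context.Context) error

// withTimeout imitates a handler timeout: it runs h and gives up with
// errHandlerTimeout if h takes longer than d.
func withTimeout(d time.Duration, h handler) handler {
	return func(ctx context.Context) error {
		ctx, cancel := context.WithTimeout(ctx, d)
		defer cancel()
		errc := make(chan error, 1) // buffered so a late handler never blocks
		go func() { errc <- h(ctx) }()
		select {
		case err := <-errc:
			return err
		case <-ctx.Done():
			return errHandlerTimeout
		}
	}
}

// continueOnTimeout is a hypothetical middleware that swallows the timeout
// error and passes every other error through.
func continueOnTimeout(next handler) handler {
	return func(ctx context.Context) error {
		if err := next(ctx); err != nil && !errors.Is(err, errHandlerTimeout) {
			return err
		}
		return nil
	}
}

// slow simulates long-running work that stops when the context is cancelled.
func slow(ctx context.Context) error {
	select {
	case <-time.After(300 * time.Millisecond):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// timedOut reports whether the slow handler hits the 20ms timeout.
func timedOut() bool {
	err := withTimeout(20*time.Millisecond, slow)(context.Background())
	return errors.Is(err, errHandlerTimeout)
}

// swallowed reports whether the middleware turns that timeout into nil.
func swallowed() bool {
	err := continueOnTimeout(withTimeout(20*time.Millisecond, slow))(context.Background())
	return err == nil
}

func main() {
	fmt.Println(timedOut(), swallowed())
}
```

With the real package, the equivalent check would compare against sqspoller.ErrHandlerTimeout inside a Middleware passed to Use or OnMessage.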

                                    func (*Poller) SetIdlePollInterval

                                    func (p *Poller) SetIdlePollInterval(t time.Duration)

                                      SetIdlePollInterval sets the polling interval for when the queue is empty and poll requests are returning empty responses, leaving the handler idle.

                                      This interval is reached through an exponential back-off, starting at 1 second when the first empty response is received after a non-empty response. Each consecutive empty response doubles the interval until the set interval is reached. Once a successful response is returned, the interval drops back down to 0.
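
The back-off rule described above can be written as a small pure function. This is a sketch of the arithmetic only, not the package's code; nextInterval and schedule are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// nextInterval returns the wait before the next poll, given the current
// interval, the configured idle poll interval, and whether the last
// response was empty.
func nextInterval(current, idle time.Duration, empty bool) time.Duration {
	if !empty {
		return 0 // a non-empty response resets the back-off
	}
	next := current * 2
	if current == 0 {
		next = time.Second // first empty response after a non-empty one
	}
	if next > idle {
		next = idle // never wait longer than the idle poll interval
	}
	return next
}

// schedule returns the interval after each of n consecutive empty responses.
func schedule(idle time.Duration, n int) []time.Duration {
	out := make([]time.Duration, 0, n)
	var cur time.Duration
	for i := 0; i < n; i++ {
		cur = nextInterval(cur, idle, true)
		out = append(out, cur)
	}
	return out
}

func main() {
	// With a 30 second idle interval:
	// prints [1s 2s 4s 8s 16s 30s 30s]
	fmt.Println(schedule(30*time.Second, 7))
}
```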

                                      func (*Poller) SetRequestTimeout

                                      func (p *Poller) SetRequestTimeout(t time.Duration)

                                        SetRequestTimeout lets the user set the timeout on requesting a new message from the SQS queue. If the timeout occurs, ErrRequestTimeout will be passed to the OnError handler. If the caller wishes to continue polling after the timeout, the ErrRequestTimeout error must be whitelisted in the error handler.
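
Whitelisting here means returning nil from the error handler for that particular error. A minimal sketch follows, with errRequestTimeout standing in for sqspoller.ErrRequestTimeout so that it runs without the package; onError, keepsPolling and errOther are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errRequestTimeout stands in for sqspoller.ErrRequestTimeout.
var errRequestTimeout = errors.New("request timed out")

// errOther is an arbitrary non-timeout error used by the demo.
var errOther = errors.New("access denied")

// onError has the same shape as sqspoller.ErrorHandler: returning nil lets
// the poller continue, returning an error makes it exit.
func onError(ctx context.Context, err error) error {
	if errors.Is(err, errRequestTimeout) {
		return nil // whitelisted: keep polling after a request timeout
	}
	return err
}

// keepsPolling reports whether the handler would let the poller continue.
func keepsPolling(err error) bool {
	return onError(context.Background(), err) == nil
}

func main() {
	fmt.Println(keepsPolling(errRequestTimeout)) // prints true
	fmt.Println(keepsPolling(errOther))          // prints false
}
```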

                                        func (*Poller) ShutdownAfter

                                        func (p *Poller) ShutdownAfter(t time.Duration) error

                                          ShutdownAfter will attempt to shutdown gracefully, if graceful shutdown cannot be achieved within the given time frame, the Poller will exit, potentially leaking unhandled resources.

                                          func (*Poller) ShutdownGracefully

                                          func (p *Poller) ShutdownGracefully() error

                                            ShutdownGracefully gracefully shuts down the poller.

                                            func (*Poller) ShutdownNow

                                            func (p *Poller) ShutdownNow() error

                                              ShutdownNow shuts down the Poller instantly, potentially leaking unhandled resources.

                                              func (*Poller) Use

                                              func (p *Poller) Use(middleware ...Middleware)

                                                Use attaches global middleware to the Poller instance, which will wrap any MessageHandler and any MessageHandler-specific middleware.

                                                type TrackingValue

                                                type TrackingValue struct {
                                                	TraceID string
                                                	Now     time.Time
                                                }

                                                  TrackingValue represents the values stored on the context object. For each poll, the context object will store the time the message was received and a trace ID.
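
Reading the tracking value inside a handler comes down to a context lookup with a type assertion. The sketch below declares local replicas of ctxKey, TrackingKey and TrackingValue so that it runs on its own; with the real package you would use sqspoller.TrackingKey and *sqspoller.TrackingValue instead. The helpers traceID, demoTraceID and missingTraceID are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Local replicas of the documented declarations.
type ctxKey int

const TrackingKey ctxKey = 1

type TrackingValue struct {
	TraceID string
	Now     time.Time
}

// traceID returns the trace ID stored on ctx, or "" when none is present.
func traceID(ctx context.Context) string {
	v, ok := ctx.Value(TrackingKey).(*TrackingValue)
	if !ok || v == nil {
		return ""
	}
	return v.TraceID
}

// demoTraceID stores a tracking value on a context and reads it back.
func demoTraceID(id string) string {
	ctx := context.WithValue(context.Background(), TrackingKey,
		&TrackingValue{TraceID: id, Now: time.Now()})
	return traceID(ctx)
}

// missingTraceID shows the lookup on a context without a tracking value.
func missingTraceID() string {
	return traceID(context.Background())
}

func main() {
	fmt.Println(demoTraceID("abc-123")) // prints abc-123
}
```

Using the two-value form of the type assertion avoids a panic if the handler is ever called with a context that lacks the value.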

                                                  Directories

                                                  Path Synopsis
                                                  cmd
                                                  playground
                                                  The Playground is where you can run the poller locally against a containerized SQS service.
                                                  examples
                                                  internal