sqserv

v2.0.3
Warning

This package is not in the latest version of its module.

Published: Nov 18, 2022 License: MIT Imports: 16 Imported by: 0

README

sqserv

Package sqserv provides a net/http style interface for handling AWS SQS messages. http.Request objects are generated from SQS messages then handled with standard http.Handler methods.

Why?

Using the net/http interface allows for handy reuse of http middleware and other related utilities.
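As a rough illustration of that reuse, the sketch below wraps a handler in a small middleware and exercises it with net/http/httptest. The names countRequests, ackHandler and serveOnce are hypothetical helpers invented for this example; none of them are part of sqserv.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// countRequests is a hypothetical middleware: it increments *n and then
// delegates to the wrapped handler. It is not safe for concurrent use;
// a real counter would use sync/atomic.
func countRequests(n *int, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		*n++
		next.ServeHTTP(w, r)
	})
}

// ackHandler acknowledges every request with 204 No Content.
func ackHandler() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusNoContent)
	})
}

// serveOnce sends one synthetic request through h and returns the status code.
func serveOnce(h http.Handler, path string) int {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, path, nil))
	return rec.Code
}

func main() {
	calls := 0
	h := countRequests(&calls, ackHandler())
	code := serveOnce(h, "/message-queue")
	fmt.Println(code, calls) // prints "204 1"
}
```

The middleware only sees an http.Handler, so the same wrapper could sit in front of an HTTP route or a queue handler.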

Basic Example
	queueHandler := func(w http.ResponseWriter, req *http.Request) {
		// Handle as you would an http request

		// Ack the message by returning a success code
		w.WriteHeader(http.StatusNoContent)
	}

	// For SQS queue name 'message-queue'
	// Use any http.Handler compatible router/middleware
	mux := http.NewServeMux()
	mux.Handle("/message-queue", http.HandlerFunc(queueHandler))

	conf := &aws.Config{}
	qsrv, err := sqserv.New(conf.WithRegion("us-west-2"), mux)
	if err != nil {
		log.Fatal(err)
	}

	// non-blocking call to start polling for SQS messages
	err = qsrv.ListenAndServe("message-queue")
	if err != nil {
		log.Fatal(err)
	}

Documentation

godoc contains more examples and implementation details.

Documentation

Overview

Package sqserv provides a net/http style interface for handling AWS SQS messages. http.Request objects are generated from SQS messages. Using the net/http interface allows for reuse of http middleware and other related utilities.

Path Matching

The URL of the generated http.Request is set to the name of the SQS queue. For example, for an SQS queue named 'worker' the URL will be '/worker', so your http.Handler needs to handle '/worker' requests as follows:

mux.Handle("/worker", workerHandler)

To allow more than one handler per queue, add the 'Path' Message Attribute to your SQS messages. The Path attribute is appended to the queue name to form the URL. For example, setting Path = 'highpri' or Path = 'lowpri' on your SQS messages allows multiple handlers as follows:

mux.Handle("/worker", genericHandler)
mux.Handle("/worker/highpri", highHandler)
mux.Handle("/worker/lowpri", lowHandler)
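The routing rule above can be sketched with the standard library alone. requestPath is a hypothetical helper that mimics how the queue name and the optional Path attribute are described as forming the URL; the package's actual implementation may differ. The handler names are also invented for the example.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"path"
)

// requestPath is a hypothetical helper that mirrors the rule described
// above: the queue name plus the optional Path attribute forms the URL path.
func requestPath(queue, pathAttr string) string {
	return path.Join("/", queue, pathAttr)
}

// named returns a handler that writes its own name, so the chosen route is visible.
func named(name string) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, name)
	})
}

// newMux registers the three routes from the example above.
func newMux() *http.ServeMux {
	mux := http.NewServeMux()
	mux.Handle("/worker", named("generic"))
	mux.Handle("/worker/highpri", named("high"))
	mux.Handle("/worker/lowpri", named("low"))
	return mux
}

// route dispatches a synthetic request for p and returns the name written
// by the handler that matched.
func route(mux *http.ServeMux, p string) string {
	rec := httptest.NewRecorder()
	mux.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, p, nil))
	return rec.Body.String()
}

func main() {
	mux := newMux()
	for _, attr := range []string{"", "highpri", "lowpri"} {
		p := requestPath("worker", attr)
		fmt.Printf("%-16s -> %s\n", p, route(mux, p))
	}
}
```

A message without a Path attribute lands on the generic handler, while 'highpri' and 'lowpri' messages reach their dedicated handlers.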

SQS and Message Attributes

SQS Attributes (MD5 digest, message id, etc) are converted into http headers prefixed with 'X-Amzn-'. For example 'ApproximateFirstReceiveTimestamp' is converted to the header 'X-Amzn-ApproximateFirstReceiveTimestamp'.

Additionally, the Content-MD5 header is set to the MD5 digest of the SQS message body.

All non-binary message attributes are converted directly into HTTP headers. Note that net/http canonicalizes header names. For example, an attribute named 'log-path' becomes the header 'Log-Path'. See the net/http documentation (http.CanonicalHeaderKey) for the canonicalization rules.
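The header name handling can be checked directly against net/http. The sketch below uses only the standard library; canonical is a hypothetical wrapper added for the example, and the header value is made up.

```go
package main

import (
	"fmt"
	"net/http"
)

// canonical is a hypothetical wrapper around http.CanonicalHeaderKey,
// which is the function net/http uses to normalize header names.
func canonical(name string) string {
	return http.CanonicalHeaderKey(name)
}

func main() {
	fmt.Println(canonical("log-path")) // prints "Log-Path"

	// Header.Set and Header.Get canonicalize the key as well, so a handler
	// can look the attribute up using either spelling.
	h := http.Header{}
	h.Set("log-path", "/var/log/app")
	fmt.Println(h.Get("Log-Path")) // prints "/var/log/app"
	fmt.Println(h.Get("log-path")) // prints "/var/log/app"
}
```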

Polling and goroutines

This package long polls SQS with a configurable read batch size. Each SQS message is handled in its own goroutine.
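The goroutine-per-message model can be pictured with a small standalone sketch. This is not the package's code: message, dispatch and bodyRequired are hypothetical names, and the batch is built by hand instead of coming from an SQS long poll.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
	"sync"
)

// message is a hypothetical stand-in for one received SQS message.
type message struct {
	Queue string
	Body  string
}

// dispatch handles every message of a batch in its own goroutine and
// returns the resulting status codes in batch order.
func dispatch(h http.Handler, batch []message) []int {
	codes := make([]int, len(batch))
	var wg sync.WaitGroup
	for i, m := range batch {
		wg.Add(1)
		go func(i int, m message) {
			defer wg.Done()
			req := httptest.NewRequest(http.MethodPost, "/"+m.Queue, strings.NewReader(m.Body))
			rec := httptest.NewRecorder()
			h.ServeHTTP(rec, req)
			codes[i] = rec.Code // each goroutine writes only its own slot
		}(i, m)
	}
	wg.Wait()
	return codes
}

// bodyRequired rejects empty messages and acknowledges everything else.
func bodyRequired() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		body, err := io.ReadAll(r.Body)
		if err != nil || len(body) == 0 {
			http.Error(w, "empty message", http.StatusBadRequest)
			return
		}
		w.WriteHeader(http.StatusNoContent)
	})
}

func main() {
	batch := []message{
		{Queue: "worker", Body: "job-1"},
		{Queue: "worker", Body: ""},
	}
	fmt.Println(dispatch(bodyRequired(), batch)) // prints "[204 400]"
}
```

Because handlers run concurrently, any state they share needs its own synchronization.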

Heartbeats

If message processing time starts to get close to the SQS visibility timeout, this package will automatically extend the visibility timeout (aka heartbeat) to allow work to continue. This allows SQS queues to have tight timeouts.
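A minimal sketch of the heartbeat idea follows, assuming a fixed interval rather than the "close to the timeout" trigger described above. heartbeat, process and demo are hypothetical names, and the extend callback only counts calls; with SQS it would stand in for a ChangeMessageVisibility request.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// heartbeat calls extend once per interval until ctx is cancelled.
func heartbeat(ctx context.Context, interval time.Duration, extend func()) {
	t := time.NewTicker(interval)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			extend()
		}
	}
}

// process runs work while a heartbeat goroutine keeps extending, and
// returns how many extensions were issued.
func process(work func(), interval time.Duration) int64 {
	ctx, cancel := context.WithCancel(context.Background())
	var beats int64
	done := make(chan struct{})
	go func() {
		defer close(done)
		// Assumption: a real extend would ask SQS for more visibility time.
		heartbeat(ctx, interval, func() { atomic.AddInt64(&beats, 1) })
	}()
	work()
	cancel() // work finished: stop extending
	<-done   // wait for the heartbeat goroutine to exit
	return atomic.LoadInt64(&beats)
}

// demo simulates 100ms of work with a 20ms heartbeat interval.
func demo() int64 {
	return process(func() { time.Sleep(100 * time.Millisecond) }, 20*time.Millisecond)
}

func main() {
	fmt.Println("extensions issued:", demo())
}
```

The exact count depends on scheduling, so the sketch prints it without promising a specific number.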

Leaky Abstraction

Forcing SQS messages into http.Request provides a lot of benefits but does have some caveats.

You must always provide an HTTP response code; otherwise the processing goroutine runs forever and the SQS message is never acked.

SQS doesn't support 'nacks', so when processing fails the SQS message isn't redelivered until its visibility timeout expires.
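One way to respect the first caveat is to make sure every branch of a handler writes a status code. The sketch below assumes that a 2xx code counts as an ack and anything else as a failure; the job type, handleJob and statusFor are hypothetical names used only for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// job is a hypothetical message payload.
type job struct {
	ID string `json:"id"`
}

// handleJob writes a status code on every path, so processing always ends
// with an explicit success or failure.
func handleJob(w http.ResponseWriter, r *http.Request) {
	var j job
	if err := json.NewDecoder(r.Body).Decode(&j); err != nil || j.ID == "" {
		// Failure: assumed to leave the message unacked until it times out.
		http.Error(w, "invalid job", http.StatusBadRequest)
		return
	}
	// Success: assumed to ack the message.
	w.WriteHeader(http.StatusNoContent)
}

// statusFor runs handleJob against a synthetic request carrying body.
func statusFor(body string) int {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodPost, "/worker", strings.NewReader(body))
	handleJob(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(statusFor(`{"id":"42"}`)) // prints "204"
	fmt.Println(statusFor("not json"))    // prints "400"
}
```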

Example (Basic)
// For SQS queue name 'message-queue'
queueHandler := func(w http.ResponseWriter, req *http.Request) {
	// Handle as you would an http request

	// Ack the message by returning a success code
	w.WriteHeader(http.StatusNoContent)
}

// Use any http.Handler compatible router/middleware
mux := http.NewServeMux()
mux.Handle("/message-queue", http.HandlerFunc(queueHandler))

conf := aws.Config{}
qsrv, err := sqserv.New(conf, mux)
if err != nil {
	log.Fatal(err)
}

// non-blocking call to start polling for SQS messages
err = qsrv.ListenAndServe("message-queue")
if err != nil {
	log.Fatal(err)
}

// Block until shutdown. signal.Notify does not block and never waits on
// the channel, so the channel must be buffered.
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
<-ch // blocks until a signal is received
log.Println("Shutting down queue listener.")
qsrv.Shutdown(2 * time.Second)

Example (Metrics)
metricHandler := func(mtype sqserv.MetricType, val float64, inflight int) {
	switch mtype {
	case sqserv.MetricAck:
		log.Printf("ACK: Inflight %d", inflight)
	case sqserv.MetricNack:
		log.Printf("NACK: Inflight %d", inflight)
	case sqserv.MetricHeartBeat:
		log.Printf("Heartbeat: Inflight %d", inflight)
	case sqserv.MetricPollFailure:
		log.Printf("Poll Failure: Inflight %d", inflight)
	case sqserv.MetricReceive:
		log.Printf("Received %f: Inflight %d", val, inflight)
	case sqserv.MetricShutdown:
		log.Printf("Shutdown: Inflight %d", inflight)
	}
}

// For SQS queue name 'workers'
queueHandler := func(w http.ResponseWriter, req *http.Request) {
	// Handle as you would an http request

	// Ack the message by returning a success code
	w.WriteHeader(http.StatusNoContent)
}

// Use any http.Handler compatible router/middleware.
// The route must match the queue name configured below.
mux := http.NewServeMux()
mux.Handle("/workers", http.HandlerFunc(queueHandler))

conf := aws.Config{}
qsrv, err := sqserv.New(conf, mux)
if err != nil {
	log.Fatal(err)
}

// Attach the metrics callback to the queue
qconf := sqserv.QueueConf{
	Name:    "workers",
	Metrics: metricHandler,
}

// non-blocking call to start polling for SQS messages
err = qsrv.ListenAndServeQueues(qconf)
if err != nil {
	log.Fatal(err)
}

// Block until shutdown. signal.Notify does not block and never waits on
// the channel, so the channel must be buffered.
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
<-ch // blocks until a signal is received
log.Println("Shutting down queue listener.")
qsrv.Shutdown(2 * time.Second)

Example (WithConf)
// Both queues share one handler
queueHandler := func(w http.ResponseWriter, req *http.Request) {
	// Handle as you would an http request

	// Ack the message by returning a success code
	w.WriteHeader(http.StatusNoContent)
}

// Use any http.Handler compatible router/middleware.
// Each route must match a queue name configured below.
mux := http.NewServeMux()
mux.Handle("/workers-virginia", http.HandlerFunc(queueHandler))
mux.Handle("/workers-oregon", http.HandlerFunc(queueHandler))

conf := aws.Config{}
qsrv, err := sqserv.New(conf, mux)
if err != nil {
	log.Fatal(err)
}

// Listen to queues from two regions
queues := []sqserv.QueueConf{
	{
		Name:   "workers-virginia",
		Region: "us-east-1",
	},
	{
		Name:   "workers-oregon",
		Region: "us-west-2",
	},
}

// non-blocking call to start polling for SQS messages
err = qsrv.ListenAndServeQueues(queues...)
if err != nil {
	log.Fatal(err)
}

// Block until shutdown. signal.Notify does not block and never waits on
// the channel, so the channel must be buffered.
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
<-ch // blocks until a signal is received
log.Println("Shutting down queue listener.")
qsrv.Shutdown(2 * time.Second)

Types

type MetricCallback

type MetricCallback func(m MetricType, val float64, inflight int)

MetricCallback provides telemetry for library users. 'val' is specific to the MetricType. 'inflight' is the current count of all messages received but not yet processed.

type MetricType

type MetricType int

MetricType lists each action that generates a MetricCallback

const (
	MetricAck         MetricType = iota // Called after message processing success response. val is always 1.
	MetricNack                          // Called after message processing failure response. val is always 1. Note that SQS does not have nacks.
	MetricHeartBeat                     // Called when a heartbeat/extend visibility timeout was triggered. val is always 1.
	MetricPollFailure                   // Called after failure of an SQS long poll request. val is always 1.
	MetricReceive                       // Called after a successful batch message receive. val is the number of messages received in the batch.
	MetricShutdown                      // Called when the Shutdown method is invoked. val is always 1.
)

type QueueConf

type QueueConf struct {
	Name      string
	Region    string // Region will override the region in the aws.Config passed to New()
	ReadBatch uint   // Size of read batch. Defaults to the maximum allowed by SQS.
	Metrics   MetricCallback
}

QueueConf allows for detailed queue configuration.

type SQSServer

type SQSServer struct {
	// ErrorLog specifies an optional logger for message related
	// errors. If nil, logging goes to os.Stderr
	ErrorLog *log.Logger
	Handler  http.Handler
	// contains filtered or unexported fields
}

SQSServer handles SQS messages in a similar fashion to http.Server

func New

func New(conf aws.Config, h http.Handler) (*SQSServer, error)

New creates a new SQSServer. If h is nil, http.DefaultServeMux is used. conf must specify a region, which becomes the default queue region.

Example
conf := aws.Config{}
qsrv, err := sqserv.New(conf, nil)
if err != nil {
	log.Fatal(err)
}
if err := qsrv.ListenAndServe("myqueue"); err != nil {
	log.Fatal(err)
}

func (*SQSServer) ListenAndServe

func (s *SQSServer) ListenAndServe(queues ...string) error

ListenAndServe begins polling SQS without blocking.

func (*SQSServer) ListenAndServeQueues

func (s *SQSServer) ListenAndServeQueues(queues ...QueueConf) error

ListenAndServeQueues begins polling SQS without blocking.

func (*SQSServer) Shutdown

func (s *SQSServer) Shutdown(maxWait time.Duration)

Shutdown gracefully stops queue listeners and gives running tasks a chance to finish. If any tasks are still running, Shutdown waits up to maxWait before returning.
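The bounded wait described here is a common pattern and can be sketched with a sync.WaitGroup and a timer. This illustrates the pattern only and is not the package's implementation; waitTimeout, startTask and the two demo functions are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// waitTimeout waits for wg for at most maxWait and reports whether all
// tasks finished in time.
func waitTimeout(wg *sync.WaitGroup, maxWait time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(maxWait):
		return false
	}
}

// startTask launches one simulated task that runs for d.
func startTask(wg *sync.WaitGroup, d time.Duration) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(d)
	}()
}

// demoFinished: a 10ms task easily completes within a 1s wait.
func demoFinished() bool {
	var wg sync.WaitGroup
	startTask(&wg, 10*time.Millisecond)
	return waitTimeout(&wg, time.Second)
}

// demoTimedOut: a 500ms task is still running when a 20ms wait expires.
func demoTimedOut() bool {
	var wg sync.WaitGroup
	startTask(&wg, 500*time.Millisecond)
	return waitTimeout(&wg, 20*time.Millisecond)
}

func main() {
	fmt.Println(demoFinished()) // prints "true"
	fmt.Println(demoTimedOut()) // prints "false"
}
```

When the wait expires, the tasks are not cancelled; the caller simply stops waiting for them.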

Example
conf := aws.Config{}
qsrv, err := sqserv.New(conf, nil)
if err != nil {
	log.Fatal(err)
}

// non-blocking call to start polling for SQS messages
err = qsrv.ListenAndServe("message-queue")
if err != nil {
	log.Fatal(err)
}

// Block until shutdown. signal.Notify does not block and never waits on
// the channel, so the channel must be buffered.
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
<-ch // blocks until a signal is received
log.Println("Shutting down queue listener.")
qsrv.Shutdown(2 * time.Second)
