Documentation
¶
Overview ¶
Package sqserv provides a net/http style interface for handling AWS SQS messages. http.Request objects are generated from SQS messages. Using the net/http interface allows for reuse of http middleware and other related utilities.
Path Matching ¶
The url of the generated http.Request is set to the name of the SQS queue. For example, with a SQS queue named 'worker' the url will be '/worker'. Therefore your http.Handler needs to handle '/worker' requests as follows.
mux.Handle("/worker", workerHandler)
To allow more than one handler per queue you can add the 'Path' Message Attribute to your SQS messages. The Path attribute is appended to the queue name to form the url. For example, by appending Path = 'highpri' or Path = 'lowpri' to your SQS messages allows multiple handlers as follows:
mux.Handle("/worker", genericHandler) mux.Handle("/worker/highpri", highHandler) mux.Handle("/worker/lowpri", lowHandler)
SQS and Message Attributes ¶
SQS Attributes (MD5 digest, message id, etc) are converted into http headers prefixed with 'X-Amzn-'. For example 'ApproximateFirstReceiveTimestamp' is converted to the header 'X-Amzn-ApproximateFirstReceiveTimestamp'.
Additionally, the Content-MD5 header is set as the SQS body MD5 ¶
All non-binary message attributes are converted directly into HTTP Headers. Note that the net/http header standardizes header names. For example an attribute named 'log-path' will be standardized to 'Log-Path'. See net/http docuement for standardization details.
Polling and goroutines ¶
This package will long poll SQS with a configurable read batch size. Each SQS message is invoked in its own goroutine.
Heartbeats ¶
If message processing time starts to get close to the SQS visibility timeout, this package will automatically extend the visibility timeout (aka heartbeat) to allow work to continue. This allows SQS queues to have tight timeouts.
Leaky Abstraction ¶
Forcing SQS messages into http.Request provides a lot of benefits but does have some caveats.
You must always provide an http response code or else the processing goroutine runs forever and the SQS message is never acked.
SQS doesn't support 'nacks'. So when processing results in a failure, the SQS message doesn't get redriven until it times out.
Example (Basic) ¶
// For SQS queue name 'message-queue' queueHandler := func(w http.ResponseWriter, req *http.Request) { // Handle as a you would an http request // Ack the message by returning a success code w.WriteHeader(http.StatusNoContent) } // Use any http.Handler compatible router/middleware mux := http.NewServeMux() mux.Handle("/message-queue", http.HandlerFunc(queueHandler)) conf := aws.Config{} qsrv, err := sqserv.New(conf, mux) if err != nil { log.Fatal(err) } // non-blocking call to start polling for SQS messages err = qsrv.ListenAndServe("message-queue") if err == nil { log.Fatal(err) } // Block until shutdown ch := make(chan os.Signal) // Blocks until a signal is received signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) log.Println("Shutting down queue listener.") qsrv.Shutdown(2 * time.Second)
Output:
Example (Metrics) ¶
metricHandler := func(mtype sqserv.MetricType, val float64, inflight int) { switch mtype { case sqserv.MetricAck: log.Printf("ACK: Inflight %d", inflight) case sqserv.MetricNack: log.Printf("NACK: Inflight %d", inflight) case sqserv.MetricHeartBeat: log.Printf("Heartbeat: Inflight %d", inflight) case sqserv.MetricPollFailure: log.Printf("Poll Failure: Inflight %d", inflight) case sqserv.MetricReceive: log.Printf("Received %f: Inflight %d", val, inflight) case sqserv.MetricShutdown: log.Printf("Shutdown: Inflight %d", inflight) } } // For SQS queue name 'message-queue' queueHandler := func(w http.ResponseWriter, req *http.Request) { // Handle as a you would an http request // Ack the message by returning a success code w.WriteHeader(http.StatusNoContent) } // Use any http.Handler compatible router/middleware mux := http.NewServeMux() mux.Handle("/worker", http.HandlerFunc(queueHandler)) conf := aws.Config{} qsrv, err := sqserv.New(conf, mux) if err != nil { log.Fatal(err) } // Listen to queues from two region qconf := sqserv.QueueConf{ Name: "workers", Metrics: metricHandler, } // non-blocking call to start polling for SQS messages err = qsrv.ListenAndServeQueues(qconf) if err == nil { log.Fatal(err) } // Block until shutdown ch := make(chan os.Signal) // Blocks until a signal is received signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) log.Println("Shutting down queue listener.") qsrv.Shutdown(2 * time.Second)
Output:
Example (WithConf) ¶
// For SQS queue name 'message-queue' queueHandler := func(w http.ResponseWriter, req *http.Request) { // Handle as a you would an http request // Ack the message by returning a success code w.WriteHeader(http.StatusNoContent) } // Use any http.Handler compatible router/middleware mux := http.NewServeMux() mux.Handle("/worker-virgina", http.HandlerFunc(queueHandler)) mux.Handle("/worker-oregon", http.HandlerFunc(queueHandler)) conf := aws.Config{} qsrv, err := sqserv.New(conf, mux) if err != nil { log.Fatal(err) } // Listen to queues from two region queues := []sqserv.QueueConf{ { Name: "workers-virginia", Region: "us-east-1", }, { Name: "workers-oregon", Region: "us-west-2", }, } // non-blocking call to start polling for SQS messages err = qsrv.ListenAndServeQueues(queues...) if err == nil { log.Fatal(err) } // Block until shutdown ch := make(chan os.Signal) // Blocks until a signal is received signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) log.Println("Shutting down queue listener.") qsrv.Shutdown(2 * time.Second)
Output:
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type MetricCallback ¶
type MetricCallback func(m MetricType, val float64, inflight int)
MetricCallback provides telemetry for library users. 'val' is specific to the MetricType. 'inflight' is the current count of all messages received but not yet processed.
type MetricType ¶
type MetricType int
MetricType lists each action that generates a MetricCallback
const ( MetricAck MetricType = iota // Called after message processing success response. val is always 1. MetricNack // Called after message processing failure response. val is always 1. Note that SQS does not have nacks. MetricHeartBeat // Called when a heartbeat/extend vibisility timeout was triggered. val is always 1. MetricPollFailure // Called after failure of an SQS long poll request. val is always 1. MetricReceive // Called after a successful batch message receive. val is the number of messages received in the batch. MetricShutdown // Called when the the Shutdown method is invoked. val is always 1. )
type QueueConf ¶
type QueueConf struct { Name string Region string // Region will override the region in the aws.Config passed to New() ReadBatch uint // Size of read batch. Defaults to maximum allows by SQS. Metrics MetricCallback }
QueueConf allows for details queue configuration
type SQSServer ¶
type SQSServer struct { // ErrorLog specifies an optional logger for message related // errors. If nil, logging goes to os.Stderr ErrorLog *log.Logger Handler http.Handler // contains filtered or unexported fields }
SQSServer handles SQS messages in a similar fashion to http.Server
func New ¶
New creates a new SQSServer. If Handler is nil http.DefaultServeMux is used. Must specify a region in conf that will be the default queue region.
Example ¶
conf := aws.Config{} qsrv, err := sqserv.New(conf, nil) if err != nil { log.Fatal(err) } qsrv.ListenAndServe("myqueue")
Output:
func (*SQSServer) ListenAndServe ¶
ListenAndServe begins polling SQS without blocking.
func (*SQSServer) ListenAndServeQueues ¶
ListenAndServeQueues begins polling SQS without blocking.
func (*SQSServer) Shutdown ¶
Shutdown gracefully stops queue listeners and gives running tasks a change to finish. Shutdown will wait up maxWait duration before returning if any tasks are still running
Example ¶
conf := aws.Config{} qsrv, err := sqserv.New(conf, nil) if err != nil { log.Fatal(err) } // non-blocking call to start polling for SQS messages err = qsrv.ListenAndServe("message-queue") if err == nil { log.Fatal(err) } // Block until shutdown ch := make(chan os.Signal) // Blocks until a signal is received signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) log.Println("Shutting down queue listener.") qsrv.Shutdown(2 * time.Second)
Output: