mq

package module
v0.0.0-...-5922c95 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 10, 2018 License: BSD-2-Clause Imports: 15 Imported by: 0

README

MQ - A package for consuming SQS message queues

The goal of this project is to provide tooling to utilize SQS effectively in Go.

Features

  • Familiar net/http Handler interface.
  • Retry with exponential backoff via visibility timeouts and dead letter queues.
  • Router Handler for multiplexing messages over a single queue.
  • Server with configurable concurrency and graceful shutdown.
  • Automatic batch fetching and deletion.
  • Publisher for batch sending.
  • Opentracing support

Documentation

https://godoc.org/github.com/remind101/mq-go

QuickStart

func main() {
	const queueURL = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"

	// printBody is the message handler. Returning nil signifies the message
	// was processed successfully; the Server will then queue it for deletion.
	printBody := func(msg *mq.Message) error {
		fmt.Printf("Received message: %s", aws.StringValue(msg.SQSMessage.Body))
		return nil
	}

	// Receiving side: configure the Server and start the loop that receives
	// SQS messages and passes them to the handler.
	server := mq.NewServer(queueURL, mq.HandlerFunc(printBody))
	server.Start()
	defer server.Shutdown(context.Background())

	// Sending side: start a publisher for the same queue.
	publisher := mq.NewPublisher(queueURL)
	publisher.Start()
	defer publisher.Shutdown(context.Background())

	// Publish messages (will be batched).
	for _, body := range []string{"Hello", "World!"} {
		publisher.Publish(&sqs.SendMessageBatchRequestEntry{
			MessageBody: aws.String(body),
		})
	}
}

Documentation

Overview

Package mq provides primitives for consuming SQS queues.

Example
package main

import (
	"context"
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/sqs"

	mq "github.com/remind101/mq-go"
)

func main() {
	const queueURL = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"

	// printBody is the message handler. Returning nil signifies the message
	// was processed successfully; the Server will then queue it for deletion.
	printBody := func(msg *mq.Message) error {
		fmt.Printf("Received message: %s", aws.StringValue(msg.SQSMessage.Body))
		return nil
	}

	// Receiving side: configure the Server and start the loop that receives
	// SQS messages and passes them to the handler.
	server := mq.NewServer(queueURL, mq.HandlerFunc(printBody))
	server.Start()
	defer server.Shutdown(context.Background())

	// Sending side: start a publisher for the same queue.
	publisher := mq.NewPublisher(queueURL)
	publisher.Start()
	defer publisher.Shutdown(context.Background())

	// Publish messages (will be batched).
	for _, body := range []string{"Hello", "World!"} {
		publisher.Publish(&sqs.SendMessageBatchRequestEntry{
			MessageBody: aws.String(body),
		})
	}
}
Output:

Example (GracefulShutdown)
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	mq "github.com/remind101/mq-go"
)

func main() {
	const queueURL = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"

	// printBody is the message handler. Returning nil signifies the message
	// was processed successfully; the Server will then queue it for deletion.
	printBody := mq.HandlerFunc(func(msg *mq.Message) error {
		fmt.Printf("Received message: %s", aws.StringValue(msg.SQSMessage.Body))
		return nil
	})

	// Configure mq.Server
	server := mq.NewServer(queueURL, printBody)

	// shutdownOnSignal blocks until SIGINT or SIGTERM arrives and then shuts
	// the server down gracefully.
	shutdownOnSignal := func() {
		interrupts := make(chan os.Signal, 1)
		signal.Notify(interrupts, syscall.SIGINT, syscall.SIGTERM)
		<-interrupts

		// Bound the shutdown with a five second timeout.
		deadline, release := context.WithTimeout(context.Background(), 5*time.Second)
		defer release()

		err := server.Shutdown(deadline)
		if err != nil {
			fmt.Printf("SQS server shutdown: %v\n", err)
		}
	}
	go shutdownOnSignal()

	// Start a loop to receive SQS messages and pass them to the Handler.
	// NOTE(review): the signal handler above only gets a chance to run if
	// Start blocks (or something else keeps main alive) — confirm against
	// Server.Start.
	server.Start()
}
Output:

Example (Router)
package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/service/sqs"
	"github.com/aws/aws-sdk-go/service/sqs/sqsiface"

	"github.com/aws/aws-sdk-go/aws"
	mq "github.com/remind101/mq-go"
)

func main() {
	const queueURL = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"

	router := mq.NewRouter()

	// One handler per route key; each prints the body of the message it gets.
	handleFoo := func(msg *mq.Message) error {
		fmt.Printf("Received foo message: %s", aws.StringValue(msg.SQSMessage.Body))
		return nil
	}
	router.Handle("foo-jobs", mq.HandlerFunc(handleFoo))

	handleBar := func(msg *mq.Message) error {
		fmt.Printf("Received bar message: %s", aws.StringValue(msg.SQSMessage.Body))
		return nil
	}
	router.Handle("bar-jobs", mq.HandlerFunc(handleBar))

	// Configure mq.Server with the router as its Handler.
	server := mq.NewServer(queueURL, router)

	// Start a loop to receive SQS messages and pass them to the Handler.
	go server.Start()

	// Publish one message per route, foo first and then bar. The error
	// returned by publish is not checked here.
	outgoing := []struct{ route, body string }{
		{"foo-jobs", "this will route to foo-jobs handler func"},
		{"bar-jobs", "this will route to bar-jobs handler func"},
	}
	for _, job := range outgoing {
		publish(server.Client, queueURL, job.route, job.body)
	}
}

// publish sends message to queueURL through client, attaching a "String"
// message attribute keyed by mq.MessageAttributeNameRoute whose value is
// route. It returns the error reported by client.SendMessage.
func publish(client sqsiface.SQSAPI, queueURL, route, message string) error {
	// The routing key travels as a message attribute, not in the body.
	routeAttr := &sqs.MessageAttributeValue{
		DataType:    aws.String("String"),
		StringValue: aws.String(route),
	}

	var req sqs.SendMessageInput
	req.QueueUrl = aws.String(queueURL)
	req.MessageBody = aws.String(message)
	req.MessageAttributes = map[string]*sqs.MessageAttributeValue{
		mq.MessageAttributeNameRoute: routeAttr,
	}

	_, err := client.SendMessage(&req)
	return err
}
Output:

Index

Examples

Constants

View Source
const (
	// DefaultConcurrency is the default concurrency for the Server.
	DefaultConcurrency = 1

	// DefaultMaxNumberOfMessages defaults to the maximum number of messages the
	// Server can request when receiving messages.
	DefaultMaxNumberOfMessages = 10

	// DefaultWaitTimeSeconds is the default WaitTimeSeconds used when receiving
	// messages.
	DefaultWaitTimeSeconds = 1

	// DefaultVisibilityTimeout is the default VisibilityTimeout, in seconds,
	// used when receiving messages.
	DefaultVisibilityTimeout = 30

	// DefaultDeletionInterval is the default interval at which messages pending
	// deletion are batch deleted (if the number of pending messages has not
	// reached BatchDeleteMaxMessages).
	DefaultDeletionInterval = 10 * time.Second

	// DefaultBatchDeleteMaxMessages defaults to the maximum allowed number
	// of messages in a batch delete request.
	DefaultBatchDeleteMaxMessages = 10
)
View Source
const DefaultPublishInterval = 1 * time.Second

DefaultPublishInterval is the default interval the Publisher will send messages if the batch is not full.

View Source
const MaxVisibilityTimeout = 43200 // 12 hours

MaxVisibilityTimeout is the maximum allowed VisibilityTimeout by SQS.

View Source
const MessageAttributeNamePartitionKey = "partition_key"

MessageAttributeNamePartitionKey is the message attribute used to determine the partition to process the message in.

View Source
const MessageAttributeNameRoute = "route"

MessageAttributeNameRoute is a MessageAttribute name used as a routing key by the Router.

Variables

View Source
var DefaultRetryPolicy = &defaultRetryPolicy{}

DefaultRetryPolicy increases the Message VisibilityTimeout exponentially based on the received count up to MaxVisibilityTimeout and MaxReceives

View Source
// PublisherDefaults applies the default configuration to p: a new SQS client,
// DefaultPublishInterval, a batch size of DefaultMaxNumberOfMessages, an
// OutputHandler that prints errors and failed entries to standard output, and
// a discardLogger.
var PublisherDefaults = func(p *Publisher) {
	p.Client = sqs.New(session.New())
	p.PublishInterval = DefaultPublishInterval
	p.BatchMaxMessages = DefaultMaxNumberOfMessages
	p.OutputHandler = func(out *sqs.SendMessageBatchOutput, err error) {
		if err != nil {
			fmt.Println(err.Error())
		}
		// out can be nil when the client hands back only an error (for
		// example a test double implementing sqsiface.SQSAPI); reading
		// out.Failed in that case would panic.
		if out == nil {
			return
		}
		// Ranging over an empty or nil slice is a no-op, so no length check
		// is needed.
		for _, entry := range out.Failed {
			fmt.Printf("Failed message send: %+v\n", entry)
		}
	}
	p.Logger = &discardLogger{}
}

PublisherDefaults contains the default configuration for a new Publisher.

View Source
// WithPartitionedProcessor is a Server option that installs a
// PartitionedProcessor bound to the Server being configured.
var WithPartitionedProcessor = func(s *Server) {
	partitioned := &PartitionedProcessor{Server: s}
	s.Processor = partitioned
}

WithPartitionedProcessor configures a Server with a partitioned Processor.

Functions

func ChangeVisibilityWithRetryPolicy

func ChangeVisibilityWithRetryPolicy(m *Message) error

ChangeVisibilityWithRetryPolicy will change the visibility of a message based on the message's retry policy.

If the delay is equal to zero, this is a no-op.

func ServerDefaults

func ServerDefaults(s *Server)

ServerDefaults is used by NewServer to initialize a Server with defaults.

func WithClient

func WithClient(c sqsiface.SQSAPI) func(s *Server)

WithClient configures a Server with a custom sqs Client.

func WithConcurrency

func WithConcurrency(c int) func(s *Server)

WithConcurrency configures a Server with c Concurrency.

Types

type BoundedProcessor

type BoundedProcessor struct {
	Server *Server
}

BoundedProcessor is the default message processor. It creates Server.Concurrency goroutines that all consume from the messages channel.

func (*BoundedProcessor) Process

func (p *BoundedProcessor) Process(messagesCh <-chan *Message, deletionsCh chan<- *Message, errorsCh chan<- error, done chan struct{})

Process satisfies the Processor interface.

type Handler

type Handler interface {
	HandleMessage(*Message) error
}

A Handler processes a Message.

func RootHandler

func RootHandler(h Handler) Handler

RootHandler is a root handler responsible for adding a delay to messages that have errored.

Queues MUST have a dead letter queue or else messages that cannot succeed will never be removed from the queue.

type HandlerFunc

type HandlerFunc func(*Message) error

HandlerFunc is an adaptor to allow the use of ordinary functions as message Handlers.

func (HandlerFunc) HandleMessage

func (h HandlerFunc) HandleMessage(m *Message) error

HandleMessage satisfies the Handler interface.

type Logger

type Logger interface {
	Println(...interface{})
}

Logger defines a simple interface to support debug logging in the Server.

type Message

type Message struct {
	QueueURL    string
	SQSMessage  *sqs.Message
	RetryPolicy RetryPolicy
	// contains filtered or unexported fields
}

Message wraps an sqs.Message.

func NewMessage

func NewMessage(queueURL string, sqsMessage *sqs.Message, client sqsiface.SQSAPI) *Message

NewMessage returns a fully initialized Message.

func (*Message) ChangeVisibility

func (m *Message) ChangeVisibility(timeout *int64) error

ChangeVisibility changes the VisibilityTimeout to timeout seconds.

func (*Message) Context

func (m *Message) Context() context.Context

Context returns the message context.

func (*Message) Delete

func (m *Message) Delete() error

Delete removes the message from the queue. Use is discouraged however, since the Server will handle message deletion more efficiently.

func (*Message) WithContext

func (m *Message) WithContext(ctx context.Context) *Message

WithContext returns a shallow copy of the message with its context changed to ctx.

type PartitionedProcessor

type PartitionedProcessor struct {
	Server *Server
}

PartitionedProcessor is a processor that creates Server.Concurrency goroutines to process messages, except each message is partitioned to the same goroutine based on a consistent hash of the message's partition key. Messages with the same partition key are guaranteed to be processed by the same goroutine.

func (*PartitionedProcessor) Process

func (p *PartitionedProcessor) Process(messagesCh <-chan *Message, deletionsCh chan<- *Message, errorsCh chan<- error, done chan struct{})

Process satisfies the Processor interface.

type Processor

type Processor interface {
	// Process processes messages. A well-behaved processor will:
	//   - Receive messages from messagesCh in a loop until that channel is
	//     closed.
	//   - Send a message to deletionsCh if it was successfully processed.
	//   - Send errors to errorsCh.
	//   - Close the done channel when finished processing.
	Process(messagesCh <-chan *Message, deletionsCh chan<- *Message, errorsCh chan<- error, done chan struct{})
}

Processor defines an interface for processing messages.

type Publisher

type Publisher struct {
	QueueURL string
	Client   sqsiface.SQSAPI

	PublishInterval  time.Duration
	BatchMaxMessages int

	OutputHandler func(*sqs.SendMessageBatchOutput, error)
	Logger        Logger
	// contains filtered or unexported fields
}

Publisher is a publisher that efficiently sends messages to a single SQS Queue. It maintains a buffer of messages that is sent to SQS when it is full or when the publish interval is reached.

func NewPublisher

func NewPublisher(queueURL string, opts ...PublisherOpt) *Publisher

NewPublisher returns a new Publisher with sensible defaults.

func (*Publisher) Publish

func (p *Publisher) Publish(entry *sqs.SendMessageBatchRequestEntry)

Publish adds entry to the internal messages buffer.

func (*Publisher) Shutdown

func (p *Publisher) Shutdown(ctx context.Context) error

Shutdown shuts the Publisher message batching routine down cleanly.

func (*Publisher) Start

func (p *Publisher) Start()

Start starts the message batching routine.

type PublisherOpt

type PublisherOpt func(*Publisher)

PublisherOpt defines a function that configures a Publisher.

type RetryPolicy

type RetryPolicy interface {
	// Delay returns the amount, in seconds, to delay the message visibility
	// from the time the message was first received, based on the number of
	// times it has been received so far (receiveCount).
	Delay(receiveCount int) *int64 // Seconds
}

RetryPolicy defines an interface to determine when to retry a Message.

type Router

type Router struct {
	sync.Mutex

	// Resolver maps a Message to a string identifier used to match to a registered Handler. The
	// default implementation returns a MessageAttribute named "route".
	Resolver func(*Message) (string, bool)
	// contains filtered or unexported fields
}

Router will route a message based on MessageAttributes to other registered Handlers.

func NewRouter

func NewRouter() *Router

NewRouter returns a new Router.

func (*Router) Handle

func (r *Router) Handle(route string, h Handler)

Handle registers a Handler under a route key.

func (*Router) HandleMessage

func (r *Router) HandleMessage(m *Message) error

HandleMessage satisfies the Handler interface.

type Server

type Server struct {
	QueueURL     string
	Client       sqsiface.SQSAPI
	Handler      Handler
	ErrorHandler func(error)
	Concurrency  int

	AttributeNames        []*string
	MessageAttributeNames []*string
	MaxNumberOfMessages   *int64
	WaitTimeSeconds       *int64
	VisibilityTimeout     *int64

	BatchDeleteMaxMessages int
	DeletionInterval       time.Duration

	Logger    Logger
	Processor Processor
	// contains filtered or unexported fields
}

Server is responsible for running the request loop to receive messages from a single SQS Queue. It manages the message processing pipeline.

There are three sections of the processing pipeline:

1. Receiving messages The Server starts a single goroutine to batch request messages from QueueURL. The frequency of this call is controlled with WaitTimeSeconds. Messages are sent to an unbuffered channel. This ensures that the Server does not continue requesting messages if the processing goroutines are unable to keep up.

2. Processing messages The Server starts one or more goroutines for processing messages from the messages channel. Concurrency is controlled by Server.Processor and Server.Concurrency. These goroutines simply pass messages to the Handler. If the Handler returns no error, the message will be sent to the deletions channel.

3. Deleting messages The Server starts a single goroutine to batch delete processed messages. It will delete messages when the batch size is reached or if no deletions have occurred within an interval. This interval must be smaller than the VisibilityTimeout or messages could be received again before deletion occurs.

On shutdown, the receiving loop ends, and the messages channel used by processing loops is closed.

Once processing loops have drained the messages channel and finished processing, they will signal to the deletion goroutine to finish.

When the deletion goroutine receives the signal, the deletion loop will drain the deletions channel and after finishing, close the done channel, signaling that the Server has shutdown gracefully.

func NewServer

func NewServer(queueURL string, h Handler, opts ...func(*Server)) *Server

NewServer creates a new Server.

func (*Server) Shutdown

func (c *Server) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the Server.

func (*Server) Start

func (c *Server) Start()

Start starts the request loop for receiving messages and a configurable number of Handler routines for message processing.

type ServerGroup

type ServerGroup struct {
	Servers []*Server
}

ServerGroup represents a list of Servers.

func (*ServerGroup) Shutdown

func (sg *ServerGroup) Shutdown(ctx context.Context) <-chan error

Shutdown gracefully shuts down all servers.

func (*ServerGroup) Start

func (sg *ServerGroup) Start()

Start starts all servers in the group.

type UnBoundedProcessor

type UnBoundedProcessor struct {
	Server *Server
}

UnBoundedProcessor is a message processor that creates a new goroutine to process each message. It ignores the Server.Concurrency value.

func (*UnBoundedProcessor) Process

func (p *UnBoundedProcessor) Process(messagesCh <-chan *Message, deletionsCh chan<- *Message, errorsCh chan<- error, done chan struct{})

Process satisfies the Processor interface.

Directories

Path Synopsis
pkg

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL