sqsworker

Version: v1.2.0
Published: Dec 20, 2020 License: MIT Imports: 13 Imported by: 0

README

sqsworker

Package sqsworker implements an SQS consumer that processes messages from an SQS queue and optionally sends the results to a result topic.

⚠️WARNING⚠️: This repo is under active development, and there may be rapid and incompatible changes.

Overview

The intended use of this package is for multiple consumers reading from the same queue. Consumers are represented by structs that implement the Processor interface and are managed by the Worker type. This package only does long-polling SQS receives.

To use this package, you must implement the following interface:

type Processor interface {
	Process(context.Context, *sqs.Message, *sns.PublishInput) error
}

For example:

import (
	"context"
	"github.com/ajbeach2/sqsworker"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sns"
	"github.com/aws/aws-sdk-go/service/sqs"
	"strings"
)

type LowerCaseWorker struct {
}

func (l *LowerCaseWorker) Process(ctx context.Context, m *sqs.Message, w *sns.PublishInput) error {
	*w.Message = strings.ToLower(*m.Body)
	return nil
}

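func ExampleWorker() {
	lowerCaseWorker := &LowerCaseWorker{}
	// session.New is deprecated; session.Must panics if the session cannot be created.
	sess := session.Must(session.NewSession(&aws.Config{Region: aws.String("us-east-1")}))

	// Look up (or create) the input queue and the result topic.
	queueURL, err := sqsworker.GetOrCreateQueue("In", sqs.New(sess))
	if err != nil {
		panic(err)
	}
	topicArn, err := sqsworker.GetOrCreateTopic("Out", sns.New(sess))
	if err != nil {
		panic(err)
	}

	w := sqsworker.NewWorker(sess, sqsworker.WorkerConfig{
		QueueURL:  queueURL,
		TopicArn:  topicArn,
		Workers:   1,
		Processor: lowerCaseWorker,
		Name:      "TestApp",
	})

	w.Run()
}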

A Worker struct is initialized with the NewWorker function. You may optionally define an outbound topic and the number of concurrent workers. If the number of workers is not set, it defaults to runtime.NumCPU(). Helper functions are provided for getting or creating topics and queues. The worker sends messages to the TopicArn on successful runs.

Concurrency

The Process function defined by the Processor interface is called concurrently by multiple workers, depending on the configuration. Make sure your Process implementation is safe for concurrent use, for example by guarding any shared state it touches.
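As a minimal sketch of what "safe for concurrent use" can look like, the following guards shared state with a mutex. The `message` type and `countingProcessor` are hypothetical stand-ins (not part of this package) so the example runs without the AWS SDK; a real implementation would use the Process signature shown above.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a stand-in for *sqs.Message so the sketch runs without the AWS SDK.
type message struct{ Body string }

// countingProcessor is a hypothetical processor that keeps shared state.
// The mutex makes process safe to call from several worker goroutines.
type countingProcessor struct {
	mu   sync.Mutex
	seen map[string]int
}

func (p *countingProcessor) process(m message) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.seen[m.Body]++
	return nil
}

// runConcurrently calls process from `workers` goroutines, `calls` times each,
// and returns how many times the body "hello" was counted.
func runConcurrently(workers, calls int) int {
	p := &countingProcessor{seen: make(map[string]int)}
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < calls; j++ {
				_ = p.process(message{Body: "hello"})
			}
		}()
	}
	wg.Wait()
	return p.seen["hello"]
}

func main() {
	// 4 workers x 1000 calls each; no updates are lost thanks to the mutex.
	fmt.Println(runConcurrently(4, 1000))
}
```

Without the mutex, the concurrent map writes would be a data race; running tests with `go test -race` is one way to catch this kind of problem.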

Performance

Real-world performance will be dictated by latency to SQS. The benchmarks mock SQS and SNS calls to illustrate that the package adds very little overhead to consuming messages, and to ensure that memory is managed so that it does not create more garbage than needed.

From the SQS documentation in AWS: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-throughput-horizontal-scaling-and-batching.html

Because you access Amazon SQS through an HTTP request-response protocol, the request latency (the interval between initiating a request and receiving a response) limits the throughput that you can achieve from a single thread using a single connection. For example, if the latency from an Amazon EC2-based client to Amazon SQS in the same region averages 20 ms, the maximum throughput from a single thread over a single connection averages 50 TPS.

->cat /proc/cpuinfo | grep "model name" | head -1
model name	: Intel(R) Core(TM) i5-6600K CPU @ 3.50GHz

->go test -bench .
goos: linux
goarch: amd64
pkg: github.com/ajbeach2/sqsworker
BenchmarkWorkerSQSSNS-4   	 1476417	       862 ns/op	      64 B/op	       1 allocs/op
BenchmarkWorkerSQS-4      	 2012392	       561 ns/op	       0 B/op	       0 allocs/op
PASS
ok  	github.com/ajbeach2/sqsworker	3.835s

Documentation

Constants

const DefaultMaxNumberOfMessages = 10

DefaultMaxNumberOfMessages is the number of messages received by each SQS request.

const DefaultVisibilityTimeout = 60

DefaultVisibilityTimeout is the SQS visibility timeout, in seconds.

const DefaultWaitTimeSeconds = 20

DefaultWaitTimeSeconds is the long-polling wait time for SQS, in seconds.

const DefaultWorkers = 1

DefaultWorkers is the number of worker goroutines to spawn; each runs the handler function.

Variables

This section is empty.

Functions

func CreateQueue added in v1.0.3

func CreateQueue(name string, sqsc sqsiface.SQSAPI) (string, error)

CreateQueue creates an SQS queue by name.

func GetOrCreateQueue added in v1.0.3

func GetOrCreateQueue(name string, sqsc sqsiface.SQSAPI) (string, error)

GetOrCreateQueue gets or creates an SQS queue by name.

func GetOrCreateTopic added in v1.0.3

func GetOrCreateTopic(name string, snsc snsiface.SNSAPI) (string, error)

GetOrCreateTopic gets or creates an SNS topic by name.

Types

type Callback

type Callback func(*string, error)

Callback is the function that is passed the result from the handler on success.

type Processor added in v1.1.0

type Processor interface {
	Process(context.Context, *sqs.Message, *sns.PublishInput) error
}

Processor is the handler interface for SQS consumers.

type Worker

type Worker struct {
	QueueURL  string
	TopicArn  string
	Queue     sqsiface.SQSAPI
	Topic     snsiface.SNSAPI
	Session   *session.Session
	Consumers int
	Logger    *zap.Logger
	Processor Processor
	Callback  Callback
	Name      string
	// contains filtered or unexported fields
}

Worker encapsulates the SQS consumer

Example
package main

import (
	"context"
	"github.com/ajbeach2/sqsworker"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sns"
	"github.com/aws/aws-sdk-go/service/sqs"
	"strings"
)

type LowerCaseWorker struct {
}

func (l *LowerCaseWorker) Process(ctx context.Context, m *sqs.Message, w *sns.PublishInput) error {
	*w.Message = strings.ToLower(*m.Body)
	return nil
}

func main() {

	lowerCaseWorker := &LowerCaseWorker{}
	sess := session.New(&aws.Config{Region: aws.String("us-east-1")})

	queueURL, _ := sqsworker.GetOrCreateQueue("In", sqs.New(sess))
	topicArn, _ := sqsworker.GetOrCreateTopic("Out", sns.New(sess))

	w := sqsworker.NewWorker(sess, sqsworker.WorkerConfig{
		QueueURL:  queueURL,
		TopicArn:  topicArn,
		Workers:   1,
		Processor: lowerCaseWorker,
		Name:      "TestApp",
	})

	w.Run()
}

func NewWorker

func NewWorker(sess *session.Session, wc WorkerConfig) *Worker

NewWorker constructs an SQS Worker.

func (*Worker) Close

func (w *Worker) Close()

Close sends a signal to all workers to exit.

func (*Worker) Run

func (w *Worker) Run()

Run executes the main consumer/producer loop.
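This documentation does not show how Run and Close are implemented. As a generic illustration of the pattern the descriptions suggest (a set of worker goroutines that loop until they receive a close signal), here is a minimal sketch. The `pool` type and all of its methods are hypothetical and are not the package's code; in particular, whether the real Run blocks is an assumption here.

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a hypothetical stand-in for a worker type; it is not the package's implementation.
type pool struct {
	jobs    chan string
	done    chan struct{}
	wg      sync.WaitGroup
	mu      sync.Mutex
	handled int
}

func newPool() *pool {
	return &pool{jobs: make(chan string), done: make(chan struct{})}
}

// run starts n consumer goroutines and blocks until all of them exit.
func (p *pool) run(n int) {
	for i := 0; i < n; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for {
				select {
				case <-p.done: // close signal received
					return
				case <-p.jobs: // a job arrived; count it as handled
					p.mu.Lock()
					p.handled++
					p.mu.Unlock()
				}
			}
		}()
	}
	p.wg.Wait()
}

// close signals every consumer to exit.
func (p *pool) close() { close(p.done) }

// demo feeds count jobs, then closes the pool and returns how many were handled.
func demo(workers, count int) int {
	p := newPool()
	finished := make(chan struct{})
	go func() {
		p.run(workers)
		close(finished)
	}()
	for i := 0; i < count; i++ {
		p.jobs <- fmt.Sprintf("job-%d", i) // blocks until a worker receives it
	}
	p.close()
	<-finished // all workers have exited, so reading handled is safe
	return p.handled
}

func main() {
	fmt.Println(demo(3, 10))
}
```

Closing a channel rather than sending on it lets a single call wake every worker at once, which is a common way to broadcast a shutdown signal in Go.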

type WorkerConfig

type WorkerConfig struct {
	QueueURL string
	TopicArn string
	// If the number of workers is 0, the number of workers defaults to runtime.NumCPU()
	Workers   int
	Processor Processor
	Callback  Callback
	Name      string
	Logger    *zap.Logger
}

WorkerConfig holds the settings for a Worker, to be passed to the NewWorker constructor.
