Documentation ¶
Overview ¶
Package sqsworker implements a SQS consumer that can process sqs messages from a SQS queue and optionally send the results to a result topic.
Overview ¶
The inenteded use of this package is for multiple consumers reading from the same queue. Consumers are represented by structs that implement the Processor interface, which are managed by the Worker type. This package *only* does long-polling based sqs receives.
To use this package, you must implement the following interface:
type Processor interface { Process(context.Context, *sqs.Message, *sns.PublishInput) error }
For example:
import ( "context" "github.com/ajbeach2/sqsworker" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sns" "github.com/aws/aws-sdk-go/service/sqs" "strings" ) type LowerCaseWorker struct { } func (l *LowerCaseWorker) Process(ctx context.Context, m *sqs.Message, w *sns.PublishInput) error { *w.Message = strings.ToLower(*m.Body) return nil } func ExampleWorker() { lowerCaseWorker := &LowerCaseWorker{} sess := session.New(&aws.Config{Region: aws.String("us-east-1")}) queueURL, _ := sqsworker.GetOrCreateQueue("In", sqs.New(sess)) topicArn, _ := sqsworker.GetOrCreateTopic("Out", sns.New(sess)) w := sqsworker.NewWorker(sess, sqsworker.WorkerConfig{ QueueURL: queueURL, TopicArn: topicArn, Workers: 1, Processor: lowerCaseWorker, Name: "TestApp", }) w.Run() }
A Worker Struct can be initialized with the NewWorker method, and you may optionally define an outbound topic, and number of concurrent workers. If the number of workers is not set, the number of workers defaults to runtime.NumCPU(). There are helper functions provided for getting or creating topcis and queues. The worker will send messages to the TopicArn on successful runs.
Concurrency ¶
The Process function defined by the Processor interface will be called concurrently by multiple workers depending on the configuration. It is best to ensure that Process functions can be executed concurrently.
Index ¶
Examples ¶
Constants ¶
const DefaultMaxNumberOfMessages = 10
DefaultMaxNumberOfMessages amount of messages received by each SQS request
const DefaultVisibilityTimeout = 60
DefaultVisibilityTimeout SQS visibility Timeout
const DefaultWaitTimeSeconds = 20
DefaultWaitTimeSeconds Long-polling interval for SQS
const DefaultWorkers = 1
DefaultWorkers Number of worker goroutines to spawn, each runs the handler function
Variables ¶
This section is empty.
Functions ¶
func CreateQueue ¶ added in v1.0.3
CreateQueue Create queue by name.
func GetOrCreateQueue ¶ added in v1.0.3
GetOrCreateQueue an SQS Queue by name.
Types ¶
type Worker ¶
type Worker struct { QueueURL string TopicArn string Queue sqsiface.SQSAPI Topic snsiface.SNSAPI Session *session.Session Consumers int Logger *zap.Logger Processor Processor Callback Callback Name string // contains filtered or unexported fields }
Worker encapsulates the SQS consumer
Example ¶
package main import ( "context" "github.com/ajbeach2/sqsworker" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sns" "github.com/aws/aws-sdk-go/service/sqs" "strings" ) type LowerCaseWorker struct { } func (l *LowerCaseWorker) Process(ctx context.Context, m *sqs.Message, w *sns.PublishInput) error { *w.Message = strings.ToLower(*m.Body) return nil } func main() { lowerCaseWorker := &LowerCaseWorker{} sess := session.New(&aws.Config{Region: aws.String("us-east-1")}) queueURL, _ := sqsworker.GetOrCreateQueue("In", sqs.New(sess)) topicArn, _ := sqsworker.GetOrCreateTopic("Out", sns.New(sess)) w := sqsworker.NewWorker(sess, sqsworker.WorkerConfig{ QueueURL: queueURL, TopicArn: topicArn, Workers: 1, Processor: lowerCaseWorker, Name: "TestApp", }) w.Run() }
Output:
func NewWorker ¶
func NewWorker(sess *session.Session, wc WorkerConfig) *Worker
NewWorker constructor for SQS Worker
type WorkerConfig ¶
type WorkerConfig struct { QueueURL string TopicArn string // If the number of workers is 0, the number of workers defaults to runtime.NumCPU() Workers int Processor Processor Callback Callback Name string Logger *zap.Logger }
WorkerConfig settings for Worker to be passed in NewWorker Contstuctor