subscriber

package
v0.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 22, 2021 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package subscriber provides the functionalities to consume messages from an AWS SQS queue. For more information about to AWS SQS go to https://aws.amazon.com/sqs/

AWS SQS Subscriber

Subscriber is a high throughput golang AWS SQS client that can create multiple consumers that concurrently receive messages from AWS SQS and push them into a single channel for consumption.

Worker

Worker is the service implementation of a Subscriber.

Index

Constants

This section is empty.

Variables

View Source
var ErrWorkerClosed = errors.New("worker closed")

ErrWorkerClosed is returned by the Worker 'Start' method after a call to 'Stop'.

Functions

This section is empty.

Types

type Config

type Config struct {

	// AWS session
	AWSSession *session.Session

	// SQS queue from which the subscriber is going to consume from
	SqsQueueURL string

	// number of messages the subscriber will attempt to fetch on each receive.
	MaxMessagesPerBatch *int64

	// the duration (in seconds) for which the call waits for a message to arrive
	// in the queue before returning. If a message is available, the call returns
	// sooner than TimeSeconds. If no messages are available and the wait time
	// expires, the call returns successfully with an empty list of messages.
	TimeoutSeconds *int64

	// The duration (in seconds) that the received messages are hidden from subsequent
	// retrieve requests after being retrieved by a ReceiveMessage request.
	// VisibilityTimeout should be < time needed to process a message
	VisibilityTimeout *int64

	// number of consumers per subscriber
	NumConsumers int

	// subscriber logger
	Logger Logger
}

Config holds the info required to work with Amazon SQS

type Logger

type Logger interface {
	Printf(string, ...interface{})
}

Logger interface allows to use other loggers than standard log.Logger

type SQSMessage

type SQSMessage struct {
	// contains filtered or unexported fields
}

SQSMessage is the implementation of a SQS message

func (*SQSMessage) ChangeMessageVisibility

func (m *SQSMessage) ChangeMessageVisibility(newVisibilityTimeout *int64) error

ChangeMessageVisibility modifies current message visibility timeout to the one specified in the parameters. This is normally useful when the message processing is taking more time than the default visibility timeout

func (*SQSMessage) Done

func (m *SQSMessage) Done() error

Done deletes the message from SQS.

func (*SQSMessage) Message

func (m *SQSMessage) Message() []byte

Message returns the body of the SQS message in bytes

func (*SQSMessage) MessageAttributes

func (m *SQSMessage) MessageAttributes() map[string]*sqs.MessageAttributeValue

MessageAttributes returns the message attributes

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber is an SQS client that allows a user to consume messages from AWS SQS. Once Stop has been called on subscriber, it might not be reused; future calls to methods such as Consume or Stop will return an error.

func New

func New(cfg Config) *Subscriber

New creates a new AWS SQS subscriber

func (*Subscriber) Consume

func (s *Subscriber) Consume() (<-chan *SQSMessage, <-chan error, error)

Consume starts consuming messages from the SQS queue. Returns a channel of SubscriberMessage to consume them and a channel of errors

func (*Subscriber) Stop

func (s *Subscriber) Stop() error

Stop stop gracefully the Subscriber. Blocks until all consumers from the subscriber are gracefully stopped

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker represents a SQS worker service

func NewWorker

func NewWorker(conf WorkerConfig) *Worker

NewWorker creates a new Worker based on the given configuration that process messages from AWS SQS

func (*Worker) Config

func (w *Worker) Config() *WorkerConfig

Config returns current configuration

func (*Worker) Start

func (w *Worker) Start(ctx context.Context) error

Start triggers the process to start consuming messages from the SQS subscriber. Blocks until `lastErr` is set or `Stop()` is called

func (*Worker) Stop

func (w *Worker) Stop() error

Stop gracefully stops the subscriber.

type WorkerConfig

type WorkerConfig struct {

	// SQS subscriber
	Subscriber *Subscriber

	// SQS Message Handler
	MessageHandler func(context.Context, *Worker, *SQSMessage)

	// SQS Error Handler
	ErrorHandler func(context.Context, *Worker, error)
}

WorkerConfig is the worker startup config

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL