aws

v0.9.1
Published: Feb 7, 2022 License: MIT Imports: 20 Imported by: 0

README

AWS SNS+SQS

Implementation for publishing and consuming messages with SNS and SQS using v2 of the AWS SDK for Go (the functions below take *sns.Client and *sqs.Client).

Publisher

There are three publishers available for AWS:

  • SNS Publisher: publishes to an SNS topic.
  • SQS Publisher: publishes directly to an SQS queue.
  • Publisher: combines the previous two, allowing you to publish to both SNS topics and SQS queues.
snsPublisher := aws.NewPublisher(
    snsClient,
    sqsClient,
    nil,
)

With this setup you need to provide either a topic ARN or a queue URL when publishing. You can also provide a map from a unique resource ID to either a topic ARN or a queue URL; you will still be able to publish directly to real AWS resources.

snsPublisher := aws.NewPublisher(
    snsClient,
    sqsClient,
    map[string]string{
        "topic-one": "arn:aws:sns:us-east-2:444455556666:topic-one",
        "topic-two": "arn:aws:sns:us-east-2:444455556666:topic-two",
        "queue-one": "https://sqs.eu-west-3.amazonaws.com/444455556666/queue-one",
        // ... more mappings
    },
)
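
The lookup described above can be pictured with a small, self-contained sketch. It is not the package's implementation: the `resolve` function, the `errNotFound` error and the prefix checks used to recognise ARNs and URLs are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// errNotFound is a hypothetical error used only by this sketch.
var errNotFound = errors.New("could not find topic ARN or queue URL")

// resolve maps a resource ID to a topic ARN or a queue URL.
// Aliases take precedence; otherwise the ID is accepted only if it
// already looks like an SNS topic ARN or an HTTP(S) queue URL.
func resolve(resources map[string]string, id string) (string, error) {
	if r, ok := resources[id]; ok {
		return r, nil
	}
	if strings.HasPrefix(id, "arn:aws:sns:") ||
		strings.HasPrefix(id, "https://") ||
		strings.HasPrefix(id, "http://") {
		return id, nil
	}
	return "", fmt.Errorf("%w: %q", errNotFound, id)
}

func main() {
	resources := map[string]string{
		"topic-one": "arn:aws:sns:us-east-2:444455556666:topic-one",
	}
	ids := []string{
		"topic-one",
		"https://sqs.eu-west-3.amazonaws.com/444455556666/queue-one",
		"unknown",
	}
	for _, id := range ids {
		r, err := resolve(resources, id)
		fmt.Printf("%q -> %q (err: %v)\n", id, r, err)
	}
}
```

Passing a nil map, as in the first constructor call, simply means every ID has to be a real ARN or URL.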
SNS Publisher

We use SNS to publish messages to a topic ARN.

The publisher accepts a map of topic names (or aliases) to their AWS topic ARN counterparts. This decouples the application's topic names from AWS, although publishing directly to an AWS topic ARN is also supported.

snsPublisher := aws.NewSNSPublisher(
    snsClient,
    map[string]string{
        "topic-one": "arn:aws:sns:us-east-2:444455556666:topic-one",
        "topic-two": "arn:aws:sns:us-east-2:444455556666:topic-two",
    },
)

// These are equivalent
snsPublisher.Publish(ctx, "topic-one", msg)
snsPublisher.Publish(ctx, "arn:aws:sns:us-east-2:444455556666:topic-one", msg)
SQS Publisher

Publishes directly to a single SQS queue.

The publisher accepts a map of queue names (or aliases) to their AWS queue URL counterparts. This decouples the application's queue names from AWS, although publishing directly to a queue URL is also supported if a valid URL is given.

sqsPublisher := aws.NewSQSPublisher(
    sqsClient,
    map[string]string{
        "queue-one": "https://sqs.eu-west-3.amazonaws.com/444455556666/queue-one",
        "queue-two": "https://sqs.eu-west-3.amazonaws.com/444455556666/queue-two",
    },
)

// These are equivalent
sqsPublisher.Publish(ctx, "queue-one", msg)
sqsPublisher.Publish(ctx, "https://sqs.eu-west-3.amazonaws.com/444455556666/queue-one", msg)

There is a helper constructor, NewSQSDirectPublisher, that can be used if no mapping is necessary:

// These initializations are equivalent
aws.NewSQSDirectPublisher(sqsClient)
aws.NewSQSPublisher(sqsClient, map[string]string{})
Scheduling

When using a pubsub.Scheduler in combination with the SQS publisher, you can leverage the native message delay that SQS allows. This way, only scheduled messages due in more than 15 minutes are sent to the scheduler storage; the rest are published directly to SQS with the corresponding delay.

You can use the SQSSchedulerStorage as both EnvelopePublisher and SchedulerStorage:

dbStorage := storage.NewPostgres("instanceID", "table", dbConn)
sqsPub := aws.NewSQSDirectPublisher(sqsClient)

// With this storage, messages with delay < 15 min
// will be directly published to SQS with the proper delay.
// Otherwise, they will be stored in the database.
sqsStor := aws.NewSQSSchedulerStorage(sqsPub, dbStorage)

// build the final scheduler/publisher
scheduler := pubsub.NewSchedulerPublisher(pubsub.NewPublisher(sqsPub, marshaller), sqsStor)

// publish message with 5 minutes delay.
scheduler.Delay(ctx, 5*time.Minute, queueURL, message)
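
As a rough illustration of the routing decision described in this section, the sketch below decides whether a delay fits within the 15-minute SQS limit. The `useSQSDelay` name is hypothetical, and treating a delay of exactly 15 minutes as publishable to SQS is an assumption; the package may draw the boundary differently.

```go
package main

import (
	"fmt"
	"time"
)

// maxSQSDelay is the longest delivery delay SQS accepts for a message.
const maxSQSDelay = 15 * time.Minute

// useSQSDelay reports whether a message with the given delay can be
// published straight to SQS, and with how many seconds of delay.
// When ok is false the message belongs in the scheduler storage.
func useSQSDelay(delay time.Duration) (delaySeconds int32, ok bool) {
	if delay < 0 {
		delay = 0 // already due: publish without delay
	}
	if delay > maxSQSDelay {
		return 0, false
	}
	return int32(delay / time.Second), true
}

func main() {
	for _, d := range []time.Duration{5 * time.Minute, 15 * time.Minute, 2 * time.Hour} {
		seconds, ok := useSQSDelay(d)
		if ok {
			fmt.Printf("%v: publish to SQS with a delay of %d seconds\n", d, seconds)
		} else {
			fmt.Printf("%v: keep in the scheduler storage\n", d)
		}
	}
}
```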

Queue Subscriber

We use SQS to consume messages from a queue, using its queue URL:

sqsSubscriber := aws.NewSQSSubscriber(
    sqsClient,
    "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue",
)

The subscriber will use long polling for 20s max, and will get a batch of 10 messages on each consumption, although it will feed the messages one by one to the subscription channel.

These parameters can be tweaked when initializing the subscriber.

Maximum number of messages

You can use the optional parameter function WithMaxMessages to change the maximum number of messages for every receive request. Valid values are from 1 to 10.

Message Visibility Timeout

By default, the queue message visibility will be used. You can use the optional parameter function WithVisibilityTimeout to change it for a single subscriber. Valid values are from 1s to 43200s (12 hours).

Receiving Messages Wait Time

You can tweak the long-polling mechanism or disable it completely using WithWaitTime. Valid values are from 0 to 20s.
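
The three receive options follow the functional options pattern. The sketch below mirrors the documented limits on a hypothetical `settings` struct; it is not the package's Subscriber. The handling of out-of-range values for maximum messages and visibility timeout follows the function documentation further down, while clamping the wait time to the 0 to 20 range is an assumption.

```go
package main

import "fmt"

// settings is a hypothetical stand-in for the subscriber's receive settings.
type settings struct {
	maxMessages       int // messages per receive request
	visibilityTimeout int // seconds; 0 means "use the queue default"
	waitTime          int // seconds of long polling; 0 disables it
}

type option func(*settings)

// withMaxMessages falls back to 10 when n is outside 1..10.
func withMaxMessages(n int) option {
	return func(s *settings) {
		if n <= 0 || n > 10 {
			n = 10
		}
		s.maxMessages = n
	}
}

// withVisibilityTimeout keeps the queue default for values <= 0
// and caps the timeout at 43200 seconds (12 hours).
func withVisibilityTimeout(seconds int) option {
	return func(s *settings) {
		switch {
		case seconds <= 0:
			seconds = 0
		case seconds > 43200:
			seconds = 43200
		}
		s.visibilityTimeout = seconds
	}
}

// withWaitTime clamps the long-polling wait to 0..20 seconds (assumed).
func withWaitTime(seconds int) option {
	return func(s *settings) {
		if seconds < 0 {
			seconds = 0
		}
		if seconds > 20 {
			seconds = 20
		}
		s.waitTime = seconds
	}
}

// newSettings starts from the documented defaults and applies the options.
func newSettings(opts ...option) settings {
	s := settings{maxMessages: 10, waitTime: 20}
	for _, o := range opts {
		o(&s)
	}
	return s
}

func main() {
	fmt.Printf("%+v\n", newSettings())
	fmt.Printf("%+v\n", newSettings(withMaxMessages(5), withVisibilityTimeout(100000), withWaitTime(0)))
}
```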

Acknowledgement Wait Time

After receiving a new batch of messages, the subscriber pauses until all the messages are acknowledged (either with ack or nack). This ensures that the new batch has the full visibility window available. It could happen, though, that some message is never acknowledged, which would block the subscriber indefinitely. To prevent this, the subscriber requests a new batch once the "acknowledgement wait time" expires. The default is 30s, but ideally this value should be greater than the queue or subscriber visibility timeout. You can tweak the value using WithAckWaitTime.
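
The pause-with-timeout behaviour can be sketched with a channel and a timer. This is only an illustration of the idea; `waitForAcks` and the way acknowledgements are signalled are assumptions, not the subscriber's internals.

```go
package main

import (
	"fmt"
	"time"
)

// defaultAckWaitTime mirrors the documented 30s default.
const defaultAckWaitTime = 30 * time.Second

// waitForAcks blocks until `pending` acknowledgements arrive on acks
// or until ackWaitTime elapses, whichever happens first.
// It returns how many acknowledgements were still missing.
func waitForAcks(acks <-chan struct{}, pending int, ackWaitTime time.Duration) int {
	timeout := time.NewTimer(ackWaitTime)
	defer timeout.Stop()
	for pending > 0 {
		select {
		case <-acks:
			pending--
		case <-timeout.C:
			return pending
		}
	}
	return 0
}

func main() {
	acks := make(chan struct{}, 3)
	acks <- struct{}{}
	acks <- struct{}{}
	// Only two of the three messages in the batch are acknowledged,
	// so the wait ends when the (shortened) timeout expires.
	missing := waitForAcks(acks, 3, 50*time.Millisecond)
	fmt.Println("unacknowledged messages:", missing)
}
```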

Re-scheduling messages with delay

The strategy used to re-schedule a message with some delay depends on the delay time:

  • up to 12 hours: re-scheduling a message is done by changing the message visibility.
  • more than 12 hours: as long as a scheduler storage has been provided using WithStorage, the message is scheduled through it.
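
A minimal sketch of this decision, assuming the 12-hour limit given by MaxChangeVisibilityDelay. The `pickStrategy` function, the strategy names and the error are hypothetical, and the sketch ignores the custom threshold that WithStorageThreshold can set.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// maxChangeVisibilityDelay mirrors the package constant MaxChangeVisibilityDelay.
const maxChangeVisibilityDelay = 12 * time.Hour

type strategy string

const (
	changeVisibility strategy = "change-visibility"
	schedulerStorage strategy = "scheduler-storage"
)

// errNoStorage is a hypothetical error used only by this sketch.
var errNoStorage = errors.New("delay exceeds 12 hours and no scheduler storage is configured")

// pickStrategy chooses how to re-schedule a message for the given delay.
func pickStrategy(delay time.Duration, hasStorage bool) (strategy, error) {
	if delay <= maxChangeVisibilityDelay {
		return changeVisibility, nil
	}
	if hasStorage {
		return schedulerStorage, nil
	}
	return "", errNoStorage
}

func main() {
	for _, d := range []time.Duration{30 * time.Minute, 24 * time.Hour} {
		for _, hasStorage := range []bool{true, false} {
			s, err := pickStrategy(d, hasStorage)
			fmt.Printf("delay=%v storage=%v -> %q (err: %v)\n", d, hasStorage, s, err)
		}
	}
}
```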
Batch & Async acknowledgement

We can enable batching of acknowledgements. This reduces the number of requests to SQS and speeds up the consumption of the next message.

To do so, tweak the AckConfig in the subscriber:

  • Async: if set true it will acknowledge the messages asynchronously.
  • BatchSize: set a value greater than 0 to use batch acknowledgements, enabling batching will enable asynchronous acknowledgements automatically.
  • FlushEvery: use it to force a flush of the pending acknowledgements after a certain time since the last batch. This is key for very low frequency topics, given that SQS re-delivers messages that have not been acknowledged after a certain amount of time (30s by default).
  • ChangeVisibilityOnNack: by default nothing is done when a message is negatively acknowledged. You can set this flag to true to force setting the message visibility to zero, which tells AWS that the message can be delivered again. Please note that this mode is not compatible with asynchronous acknowledgements, and the subscriber initialization will return an error.

In asynchronous mode, any error that happens while acknowledging a message is delivered on the next call to acknowledge a message, but that message is still added to the next batch. Also, when the subscriber stops, it waits until all the pending acknowledgements are flushed, and any resulting errors are returned by the Stop method.
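
To make the batching behaviour concrete, here is a minimal sketch of a buffer that releases acknowledgements in groups. The `ackBatcher` type is hypothetical and deliberately simplified: it is not safe for concurrent use and has no timer. A periodic flush in the spirit of FlushEvery would call `flush` from a ticker, and stopping would call it one last time.

```go
package main

import "fmt"

// ackBatcher buffers receipt handles until batchSize of them are pending.
type ackBatcher struct {
	batchSize int
	pending   []string
}

// add buffers a receipt handle. It returns a full batch when one is
// ready to be sent, and nil otherwise.
func (b *ackBatcher) add(receipt string) []string {
	b.pending = append(b.pending, receipt)
	if len(b.pending) < b.batchSize {
		return nil
	}
	return b.flush()
}

// flush returns whatever is pending and resets the buffer.
func (b *ackBatcher) flush() []string {
	batch := b.pending
	b.pending = nil
	return batch
}

func main() {
	b := &ackBatcher{batchSize: 3}
	for _, receipt := range []string{"r1", "r2", "r3", "r4"} {
		if batch := b.add(receipt); batch != nil {
			fmt.Println("sending batch:", batch)
		}
	}
	// On stop (or on a periodic flush) the remainder is sent too.
	fmt.Println("flushing remainder:", b.flush())
}
```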

SNS+SQS integration

It's worth noting some gotchas while working with SNS+SQS.

Message constraints

There are certain constraints imposed on messages that can lead to publishing errors. The most important one is that the message body is sent as a string in an HTTP request, so the supported character set is limited.

It is advised to use a marshaller that encodes the binary payload within the supported set. For example the JSONMarshaller or the ProtoTextMarshaller. Please note that this limitation also applies to the message attributes.
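
As an illustration of the constraint, the sketch below checks a body against the character ranges that the SQS SendMessage documentation lists as valid, and shows that base64-encoding a binary payload keeps it inside that set. The function names are hypothetical, and base64 is used here only as an example of a text-safe encoding; it is not one of the package's marshallers.

```go
package main

import (
	"encoding/base64"
	"fmt"
	"unicode/utf8"
)

// isAllowedRune reports whether r falls in the ranges documented by SQS:
// #x9 | #xA | #xD | #x20-#xD7FF | #xE000-#xFFFD | #x10000-#x10FFFF.
func isAllowedRune(r rune) bool {
	switch {
	case r == 0x9, r == 0xA, r == 0xD:
		return true
	case r >= 0x20 && r <= 0xD7FF:
		return true
	case r >= 0xE000 && r <= 0xFFFD:
		return true
	case r >= 0x10000 && r <= 0x10FFFF:
		return true
	}
	return false
}

// isValidBody reports whether body is valid UTF-8 made only of allowed runes.
func isValidBody(body string) bool {
	if !utf8.ValidString(body) {
		return false
	}
	for _, r := range body {
		if !isAllowedRune(r) {
			return false
		}
	}
	return true
}

func main() {
	raw := []byte{0x00, 0x01, 0xFF} // arbitrary binary payload
	fmt.Println("raw payload valid:", isValidBody(string(raw)))

	encoded := base64.StdEncoding.EncodeToString(raw)
	fmt.Println("base64 payload valid:", isValidBody(encoded))
}
```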

Allow SNS to fan out to SQS queues

After subscribing an SQS queue to an SNS topic, you still need to set the correct queue policy to allow the topic to send messages to the queue.

This package provides a helper that allows a single topic to publish to a single queue:

subscriptionARN, err := aws.Subscribe(ctx, snsClient, topicARN, queueARN)
err = aws.AttachQueueForwardingPolicy(ctx, sqsClient, queueURL, queueARN, topicARN)
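
For reference, a queue policy of this kind usually has the shape sketched below: it lets the SNS service send messages to the queue when the source is one of the given topics. This is a generic sketch of the policy AWS documents for SNS to SQS fan-out; the exact document generated by this package may differ, and `forwardingPolicy` here is a local, hypothetical function.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type statement struct {
	Effect    string                         `json:"Effect"`
	Principal map[string]string              `json:"Principal"`
	Action    string                         `json:"Action"`
	Resource  string                         `json:"Resource"`
	Condition map[string]map[string][]string `json:"Condition"`
}

type policy struct {
	Version   string      `json:"Version"`
	Statement []statement `json:"Statement"`
}

// forwardingPolicy builds a policy document that allows the given
// topics to send messages to the queue.
func forwardingPolicy(queueARN string, topicARNs ...string) (string, error) {
	p := policy{
		Version: "2012-10-17",
		Statement: []statement{{
			Effect:    "Allow",
			Principal: map[string]string{"Service": "sns.amazonaws.com"},
			Action:    "sqs:SendMessage",
			Resource:  queueARN,
			Condition: map[string]map[string][]string{
				"ArnEquals": {"aws:SourceArn": topicARNs},
			},
		}},
	}
	b, err := json.Marshal(p)
	return string(b), err
}

func main() {
	doc, err := forwardingPolicy(
		"arn:aws:sqs:eu-west-3:444455556666:queue-one",
		"arn:aws:sns:us-east-2:444455556666:topic-one",
	)
	if err != nil {
		panic(err)
	}
	fmt.Println(doc)
}
```

The resulting string is what would be stored under the queue's "Policy" attribute.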
Raw Delivery

In general, this package provides some helpers to bootstrap the creation of new topics and queues and to subscribe them with the options needed to work with the parent pubsub package. In particular, Subscribe enables raw message delivery on the subscription.

Documentation

Constants

const (
	QueueForwardingPolicyAttribute = "Policy"
	QueueRedrivePolicyAttribute    = "RedrivePolicy"
)
const (
	MaxChangeVisibilityDelay = 12 * time.Hour
)

Variables

var (
	ErrSubscriberStopped = errors.New("subscriber stopped")
	ErrAcknowledgement   = errors.New("cannot ack message")
	ErrNAcknowledgement  = errors.New("cannot nack message")
	ErrChangeVisibility  = errors.New("cannot change message visibility")
	ErrReSchedule        = errors.New("cannot re-schedule message")
	ErrAlreadyStarted    = errors.New("already started")
	ErrAlreadyStopped    = errors.New("already stopped")
	ErrMissingConfig     = errors.New("missing configuration")
)
var ErrAsyncNAckNotSupported = errors.New("NAck on async strategy is not supported")
var ErrQueueNotFound = errors.New("could not find queue URL")
var ErrTopicNotFound = errors.New("could not find topic ARN")
var ErrTopicOrQueueNotFound = errors.New("could not find neither topic ARN nor queue URL")

Functions

func AttachQueueForwardingPolicy

func AttachQueueForwardingPolicy(ctx context.Context, svc *sqs.Client, queueURL, queueARN string, topicARNs ...string) error

AttachQueueForwardingPolicy attaches a queue policy that enables a topic to send messages to it.

func CreateQueue

func CreateQueue(ctx context.Context, svc *sqs.Client, queueName string) (string, error)

CreateQueue creates a SQS queue. Returns the QueueURL.

func CreateTopic

func CreateTopic(ctx context.Context, svc *sns.Client, topicName string) (string, error)

CreateTopic creates a SNS topic.

func DeleteQueue

func DeleteQueue(ctx context.Context, svc *sqs.Client, queueURL string) error

DeleteQueue deletes a queue.

func DeleteTopic

func DeleteTopic(ctx context.Context, svc *sns.Client, topicARN string) error

DeleteTopic deletes a topic.

func ForwardingPolicy added in v0.4.0

func ForwardingPolicy(queueARN string, topicARNs ...string) string

ForwardingPolicy generates the forwarding policy for a queue to be able to receive messages from the given topics.

func GetQueueARN

func GetQueueARN(ctx context.Context, svc *sqs.Client, queueURL string) (string, error)

GetQueueARN gets the queue ARN.

func Must

func Must(err error)

Must will panic if wrapped operation has failed.

func MustGetResource

func MustGetResource(s string, err error) string

MustGetResource will panic if the creation of a AWS resource has failed.
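
The two helpers are convenient in bootstrap or test code where a failure should abort immediately. The sketch below re-implements the pattern locally to show how a (string, error) result can be forwarded straight into the helper. The lowercase functions and `createQueue` are hypothetical stand-ins, not the package's code.

```go
package main

import (
	"errors"
	"fmt"
)

// errExample is a hypothetical failure used by this sketch.
var errExample = errors.New("queue creation failed")

// must panics if the wrapped operation returned an error.
func must(err error) {
	if err != nil {
		panic(err)
	}
}

// mustGetResource returns the resource or panics if its creation failed.
func mustGetResource(s string, err error) string {
	must(err)
	return s
}

// createQueue is a hypothetical stand-in for a call such as CreateQueue.
func createQueue(name string) (string, error) {
	if name == "" {
		return "", errExample
	}
	return "https://sqs.eu-west-3.amazonaws.com/444455556666/" + name, nil
}

func main() {
	// The (string, error) pair is forwarded directly to the helper.
	queueURL := mustGetResource(createQueue("queue-one"))
	fmt.Println(queueURL)

	defer func() { fmt.Println("recovered:", recover()) }()
	mustGetResource(createQueue("")) // panics with errExample
}
```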

func RedrivePolicy added in v0.4.0

func RedrivePolicy(deadLetterQueueARN string, maxReceiveCount int) string

RedrivePolicy returns the string to use for the redrive policy attribute of a queue.
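
For context, the redrive policy attribute in SQS is a small JSON document naming the dead-letter queue and the maximum receive count. The sketch below builds such a document with the standard library; `redrivePolicy` is a local, hypothetical function and the package's exact output format is not confirmed here.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// redrivePolicy builds the JSON document used for the "RedrivePolicy"
// queue attribute. The count is encoded as a string, as in AWS examples.
func redrivePolicy(deadLetterQueueARN string, maxReceiveCount int) (string, error) {
	b, err := json.Marshal(map[string]string{
		"deadLetterTargetArn": deadLetterQueueARN,
		"maxReceiveCount":     strconv.Itoa(maxReceiveCount),
	})
	return string(b), err
}

func main() {
	doc, err := redrivePolicy("arn:aws:sqs:eu-west-3:444455556666:queue-one-dlq", 5)
	if err != nil {
		panic(err)
	}
	fmt.Println(doc)
}
```

A string like this could then be passed to SetQueueAttributes under the QueueRedrivePolicyAttribute key.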

func SetQueueAttributes added in v0.4.0

func SetQueueAttributes(ctx context.Context, svc *sqs.Client, queueURL string, attributes map[string]string) error

SetQueueAttributes sets the queue attributes.

func Subscribe

func Subscribe(ctx context.Context, svc *sns.Client, topicARN, queueARN string) (string, error)

Subscribe subscribes a queue to a topic with raw delivery enabled.

func Unsubscribe

func Unsubscribe(ctx context.Context, svc *sns.Client, subscriptionARN string) error

Unsubscribe removes the subscription of the topic.

func WithAck added in v0.2.1

func WithAck(cfg AckConfig) func(s *Subscriber)

WithAck configures the acknowledgements behaviour.

func WithAckWaitTime added in v0.4.1

func WithAckWaitTime(ackWaitTime time.Duration) func(s *Subscriber)

WithAckWaitTime indicates how much time the subscriber should wait for all the messages in the batch to be acknowledged before requesting a new batch. Ideally this time should be greater than the message visibility, whether the one specific to this subscriber or the queue default.

func WithMaxMessages added in v0.4.0

func WithMaxMessages(maxMessages int) func(s *Subscriber)

WithMaxMessages configures the number of messages to retrieve per request. If max messages <= 0 or > 10 the default will be used (10 messages).

func WithStorage added in v0.6.0

func WithStorage(storage pubsub.SchedulerStorage) func(s *Subscriber)

WithStorage sets an optional storage that can be used to re-schedule the message beyond the maximum message visibility in SQS (12 hours).

func WithStorageThreshold added in v0.6.0

func WithStorageThreshold(threshold time.Duration) func(s *Subscriber)

WithStorageThreshold sets the threshold above which the storage will be used when changing the message visibility.

func WithVisibilityTimeout added in v0.4.0

func WithVisibilityTimeout(visibilityTimeout int) func(s *Subscriber)

WithVisibilityTimeout configures the time that the retrieved messages will be hidden from subsequent retrieve requests. If visibilityTimeout <= 0 the queue's default will be used. If it's greater than the 12 hours maximum, the maximum will be used: 43200s.

func WithWaitTime added in v0.4.0

func WithWaitTime(waitTime int) func(s *Subscriber)

WithWaitTime configures the time to wait during long polling for new messages in the queue until the request is cancelled.

Types

type AckConfig

type AckConfig struct {
	// Async will ack on the message asynchronously returning
	// immediately with success.
	//
	// Errors will be reported in the next consuming cycle.
	//
	// When the subscriber closes, it will wait until all
	// acknowledge operations finish, reporting any errors.
	Async bool

	// BatchSize will indicate to buffer acknowledgements
	// until a certain amount of messages are pending.
	//
	// Batching acknowledgements creates
	//
	// Calling Ack on the message will return success, and
	// the errors will be reported when consuming new messages
	//
	// When the subscriber closes, it will wait until all
	// acknowledge operations finish.
	BatchSize int

	// ChangeVisibilityOnNack when true, the message visibility
	// will be reset to zero so the message is redelivered again
	// immediately. It doesn't support batching.
	ChangeVisibilityOnNack bool

	// FlushEvery indicates how often the messages should be
	// acknowledged even if the batch is not full yet.
	//
	// This value has no effect if BatchSize is not greater than 0.
	FlushEvery time.Duration
}

AckConfig configures the acknowledgements behaviour.

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher SNS+SQS publisher.

func NewPublisher added in v0.6.0

func NewPublisher(sns *sns.Client, sqs *sqs.Client, resources map[string]string) *Publisher

NewPublisher creates a new SNS+SQS publisher.

func (*Publisher) AddResource added in v0.9.1

func (p *Publisher) AddResource(resourceID, resource string)

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, resourceID string, envelopes ...*pubsub.Envelope) error

Publish a message through SNS or SQS, depending on the resource.

type SNSPublisher added in v0.4.2

type SNSPublisher struct {
	// contains filtered or unexported fields
}

SNSPublisher SNS publisher.

func NewSNSPublisher added in v0.2.1

func NewSNSPublisher(sns *sns.Client, topicARNs map[string]string) *SNSPublisher

NewSNSPublisher creates a new SNS publisher.

func (*SNSPublisher) Publish added in v0.4.2

func (p *SNSPublisher) Publish(ctx context.Context, topic string, envelopes ...*pubsub.Envelope) error

Publish a message through SNS.

type SQSPublisher added in v0.4.2

type SQSPublisher struct {
	// contains filtered or unexported fields
}

SQSPublisher a publisher that publishes directly to queues.

func NewSQSDirectPublisher added in v0.4.2

func NewSQSDirectPublisher(sqs *sqs.Client) *SQSPublisher

NewSQSDirectPublisher creates a new SQS publisher without any queue alias.

func NewSQSPublisher added in v0.4.2

func NewSQSPublisher(sqs *sqs.Client, queueURLs map[string]string) *SQSPublisher

NewSQSPublisher creates a new SQS publisher with a custom map for queue URLs.

func (*SQSPublisher) Publish added in v0.4.2

func (p *SQSPublisher) Publish(ctx context.Context, queue string, envelopes ...*pubsub.Envelope) error

Publish a message to a SQS queue.

type SQSSchedulerStorage added in v0.5.0

type SQSSchedulerStorage struct {
	*SQSPublisher
	pubsub.SchedulerStorage
}

SQSSchedulerStorage a publisher that can publish directly to queues.

func NewSQSSchedulerStorage added in v0.5.0

func NewSQSSchedulerStorage(pub *SQSPublisher, storage pubsub.SchedulerStorage) *SQSSchedulerStorage

NewSQSSchedulerStorage creates a new hybrid SQS publisher + scheduler storage.

func (*SQSSchedulerStorage) Schedule added in v0.5.0

func (s *SQSSchedulerStorage) Schedule(ctx context.Context, dueDate time.Time, queue string, messages ...*pubsub.Envelope) error

Schedule schedules a message to be published in the future.

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber for AWS SQS.

func NewSQSSubscriber added in v0.2.1

func NewSQSSubscriber(sqs sqsSvc, queueURL string, opts ...func(s *Subscriber)) *Subscriber

NewSQSSubscriber creates a new SQS subscriber.

func (*Subscriber) Stop

func (s *Subscriber) Stop(ctx context.Context) (err error)

Stop stops consuming messages.

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe() (<-chan pubsub.Next, error)

Subscribe subscribes to a SQS queue.
