snssqs

package
v1.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 11, 2023 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Package snssqs provides a simple AWS SNS+SQS broker implementation.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewBroker

func NewBroker(ctx context.Context, option ...Option) (pubsub.Broker, error)

NewBroker returns a broker that allows to push to AWS SNS topics and subscribe messages brokered by SQS. This provider does not (yet) support automatic creation of SQS queues or SNS topics, so those will have to exist in your infrastructure before attempting to send/receive.

This broker will start running a background goroutine that will poll the SQS queue for new messages. Topics must be firstly created on AWS SNS before starting this broker, and each topic must be tagged with the "topic-name" key on AWS. This is used to hold the name of the topic as seen by the broker implementation (`pubsub.topic`). This allows to keep the topic name agnostic to their underlying implementation.

IMPORTANT: this broker must be used in conjunction with a Codec middleware in order to ensure that the messages are properly encoded and decoded. Otherwise, only binary messages will be accepted when publishing or delivering messages.

Types

type AWSSNSAPI

type AWSSNSAPI interface {
	Publish(ctx context.Context, params *sns.PublishInput, optFns ...func(*sns.Options)) (*sns.PublishOutput, error)
	Subscribe(ctx context.Context, params *sns.SubscribeInput, optFns ...func(*sns.Options)) (*sns.SubscribeOutput, error)
	Unsubscribe(ctx context.Context, params *sns.UnsubscribeInput, optFns ...func(*sns.Options)) (*sns.UnsubscribeOutput, error)
	ListTopics(ctx context.Context, params *sns.ListTopicsInput, optFns ...func(*sns.Options)) (*sns.ListTopicsOutput, error)
	ListTagsForResource(ctx context.Context, params *sns.ListTagsForResourceInput, optFns ...func(*sns.Options)) (*sns.ListTagsForResourceOutput, error)
}

AWSSNSAPI captures the subset of the AWS SNS API that we need.

type AWSSQSAPI

type AWSSQSAPI interface {
	ReceiveMessage(ctx context.Context, params *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error)
	DeleteMessage(ctx context.Context, params *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error)
	ChangeMessageVisibility(ctx context.Context, params *sqs.ChangeMessageVisibilityInput, optFns ...func(*sqs.Options)) (*sqs.ChangeMessageVisibilityOutput, error)
}

AWSSQSAPI captures the subset of the AWS SQS API that we need.

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option defines a function signature for configuration options.

func WithDeliveryTimeout

func WithDeliveryTimeout(t time.Duration) Option

WithDeliveryTimeout sets the max execution time a handler has to handle a message. Default: 5s

func WithMaxMessages

func WithMaxMessages(n int32) Option

WithMaxMessages sets the maximum number of messages to return. Amazon SQS never returns more messages than this value (however, fewer messages might be returned). Valid values: 1 to 10. Default: 5.

func WithSNSClient

func WithSNSClient(snsClient AWSSNSAPI) Option

WithSNSClient sets the SNS client to be used by broker.

func WithSQSClient

func WithSQSClient(sqsClient AWSSQSAPI) Option

WithSQSClient sets the SQS client to be used by broker.

func WithSQSQueueURL

func WithSQSQueueURL(sqsQueueURL string) Option

WithSQSQueueURL sets the SQS queue URL to be used by broker.

func WithTopicsReloadInterval

func WithTopicsReloadInterval(t time.Duration) Option

WithTopicsReloadInterval determines how often in-memory topics cache should be reloaded by connecting to AWS. A lower value means that the broker will be less responsive as it will have to connect to AWS more often. Default: 60s.

func WithVisibilityTimeout

func WithVisibilityTimeout(t int32) Option

WithVisibilityTimeout sets the duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request.

func WithWaitTimeSeconds

func WithWaitTimeSeconds(t int32) Option

WithWaitTimeSeconds sets the duration (in seconds) for which the call waits for a message to arrive in the queue before returning. If a message is available, the call returns sooner than WaitTimeSeconds. If no messages are available and the wait time expires, the call returns successfully with an empty list of messages.

type Subscription

type Subscription interface {
	pubsub.Subscription
	ARN() string
}

Subscription represents a subscription to SNS topic.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL