Documentation ¶
Overview ¶
Package listener provides a mechanism for listening to events published to an Amazon Simple Notification Service topic.
The listener package supports regular topics and first-in first-out (FIFO) topics as well as deriving the topic ARN from a Systems Manager (SSM) parameter. It works by creating an Amazon Simple Queue Service (SQS) queue, authorizing the SNS topic to publish messages to that queue, registering the queue as a subscriber to the topic and then receiving messages from the queue on a loop.
The package does not instantiate any AWS clients itself and doesn't have any opinions on where the SQS queue is created relative to the SNS topic.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ListenToTopic ¶
func ListenToTopic(ctx context.Context, sqsClient SQSAPI, snsClient SNSAPI, ssmClient SSMAPI, consumer Consumer, cfg ListenerConfiguration, errCh chan error)
ListenToTopic is the entrypoint to the package. In the course of normal operations, ListenToTopic will create a new SQS queue, subscribe it to the SNS topic and receive messages from the queue until the provided context has been cancelled. When the context has been cancelled it will destroy the subscription and queue. ListenToTopic will always make a best effort to clean up any infrastructure that it's created and so can emit one or more errors on the error channel as it tries to shutdown. The error channel will be closed when the function is finished.
Types ¶
type Consumer ¶
type Consumer interface { // OnMessage is called when a message is successfully processed from the SQS queue. If no messages are processed then OnMessage won't // be called. OnMessage(ctx context.Context, msg MessageContent) // OnError is called when there is an error attempting to receive messages from the SQS queue or processing a given message from the // queue. OnError(ctx context.Context, err error) }
A Consumer is used by ListenToTopic to process messages and errors during the course of oeprations. Both of its methods are provided with the existing context used by the package so that any implementing type is able to be propagate traces or be made aware of context cancellations.
type ListenerConfiguration ¶
type ListenerConfiguration struct { // Polling Interval is the time between attempts to receive messages from the SQS queue. Defaults to 1 second. PollingInterval time.Duration // QueueName is the name of the SQS queue to create. If the SNS topic is FIFO then the listener package will automatically // add the ".fifo" suffix. Defaults to a v4 UUID prefixed with "sns-listener-". QueueName string // ParameterPath is the location of the SSM parameter that contains the topic ARN. If this is unset then the listener package // will use the value of TopicArn. ParameterPath string // TopicArn is the full ARN of the topic to subscribe to. If this is unset and ParameterPath is unset then startup will fail. TopicArn string // Verbose controls whether logs will be output to stderr or silenced. Defaults to false. Verbose bool }
A ListenerConfiguration is used to set up the listener package when ListenToTopic is called.
type MessageContent ¶
A MessageContent maps the message body and message ID of a SQS message to a much more straightforward struct. For the purpose of listening to an SNS topic, the Body contains the full message that was published
type SNSAPI ¶
type SNSAPI interface { Subscribe(ctx context.Context, params *sns.SubscribeInput, optFns ...func(*sns.Options)) (*sns.SubscribeOutput, error) Unsubscribe(ctx context.Context, params *sns.UnsubscribeInput, optFns ...func(*sns.Options)) (*sns.UnsubscribeOutput, error) }
SNSAPI is a shim over v2 of the AWS SDK's sns client. The sns client provided by github.com/aws/aws-sdk-go-v2/service/sns automatically satisfies this.
type SQSAPI ¶
type SQSAPI interface { CreateQueue(ctx context.Context, params *sqs.CreateQueueInput, optFns ...func(*sqs.Options)) (*sqs.CreateQueueOutput, error) DeleteQueue(ctx context.Context, params *sqs.DeleteQueueInput, optFns ...func(*sqs.Options)) (*sqs.DeleteQueueOutput, error) GetQueueAttributes(ctx context.Context, params *sqs.GetQueueAttributesInput, optFns ...func(*sqs.Options)) (*sqs.GetQueueAttributesOutput, error) ReceiveMessage(ctx context.Context, params *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) DeleteMessage(ctx context.Context, params *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error) }
SQSAPI is a shim over v2 of the AWS SDK's sqs client. The sqs client provided by github.com/aws/aws-sdk-go-v2/service/sqs automatically satisfies this.
type SSMAPI ¶
type SSMAPI interface { GetParameter(ctx context.Context, params *ssm.GetParameterInput, optFns ...func(*ssm.Options)) (*ssm.GetParameterOutput, error) }
SSMAPI is a shim over v2 of the AWS SDK's ssm client. The ssm client provided by github.com/aws/aws-sdk-go-v2/service/ssm automatically satisfies this.