listener

package
v0.6.2 Latest
Published: Mar 28, 2023 License: MIT Imports: 16 Imported by: 0

README

AWS SNS listener package

The listener package can be used for listening to an Amazon SNS topic. It supports both regular topics and FIFO topics. The original motivation for this project was to subscribe to an SNS topic and identify the shape and content of the messages being published, making it easier to build other software. A CLI that wraps this package is available in the root of this repository.

Using the package

Importing

listener uses semantic versioning and conventional commits for releases. The latest published version can be fetched with:

go get github.com/whatsfordinner/aws-sns-listener/pkg/listener

A specific version can be fetched with:

go get github.com/whatsfordinner/aws-sns-listener/pkg/listener@v0.5.0

Tracing

The package uses the OpenTelemetry library for distributed tracing. If the calling service defines an exporter, the package emits its spans using that exporter.

Setup

The package uses three AWS APIs for operation:

  • Simple Notification Service for subscribing and unsubscribing to the topic
  • Simple Queue Service for creating a queue, receiving messages from it and then destroying it
  • Systems Manager for (optionally) fetching the SNS Topic ARN from a Parameter Store parameter

The package defines three interfaces for this purpose: SNSAPI, SQSAPI and SSMAPI. All three are shims over the clients provided by v2 of the AWS SDK. The easiest way to use this package is to create those three clients and pass them to listener.ListenToTopic.

A listener.ListenerConfiguration object controls the behaviour of the package when it starts up. The package uses it to identify the topic to subscribe to and to pick up a few other configuration details.

The simplest possible implementation would be:

listenerCfg := listener.ListenerConfiguration{
    TopicArn: "arn:aws:sns:us-east-1:123456789012:my-sns-topic",
}

A more involved implementation that uses a parameter to derive the topic ARN, overrides the queue name, sets a custom polling interval and asks the package to log to stderr would be:

listenerCfg := listener.ListenerConfiguration{
    ParameterPath:   "/path/to/topic-arn",
    PollingInterval: 30 * time.Second,
    QueueName:       "my-listener-queue",
    Verbose:         true,
}

The final component is the listener.Consumer interface, which the package uses to notify the calling service of messages. It requires two methods: OnMessage(context.Context, listener.MessageContent) and OnError(context.Context, error). The context is supplied for use with tracing and to notify the consumer in the event of cancellation. An example implementation would be:

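type consumer struct{}

// OnMessage prints each message. Id and Body are *string, so they are read
// with aws.ToString (from github.com/aws/aws-sdk-go-v2/aws), which returns
// "" for a nil pointer instead of panicking.
func (c consumer) OnMessage(ctx context.Context, msg listener.MessageContent) {
    fmt.Printf("Message content:\n\tID: %s\n\tBody: %s\n", aws.ToString(msg.Id), aws.ToString(msg.Body))
}

// OnError prints receive and processing errors to stderr.
func (c consumer) OnError(ctx context.Context, err error) {
    fmt.Fprintln(os.Stderr, err)
}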

Runtime

Once the AWS clients, configuration object and consumer have been set up, the package is ready to use. The entrypoint is listener.ListenToTopic. Once running, the function blocks until the context provided to it is cancelled, after which it tears down any infrastructure it has created and reports any errors on the error channel.

ctx, cancel := context.WithCancel(context.Background())
defer cancel() // release the context on every return path

sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt) // stop on Ctrl+C

errCh := make(chan error)

go listener.ListenToTopic(
    ctx,
    sqsClient,
    snsClient,
    ssmClient,
    consumer{},  // our consumer from earlier
    listenerCfg, // one of the configuration objects from before
    errCh,
)

select {
case err := <-errCh: // catch any errors on startup
    for errs := range errCh {
        err = errors.Join(err, errs) // collect additional errors from the teardown process (errors.Join needs Go 1.20+)
    }
    if err != nil {
        panic(err)
    }
case <-sigCh: // catch SIGINT
    cancel() // notify the goroutine it's time to shut down
    var err error
    for errs := range errCh { // wait for teardown; the channel is closed when it finishes
        err = errors.Join(err, errs)
    }
    if err != nil {
        panic(err)
    }
}

Teardown

When the context has been cancelled, or in the event of an error, the package makes a best-effort attempt to destroy the SNS subscription and SQS queue so that it leaves things as it found them. This is not guaranteed to succeed (credentials may expire, the network may fail, the caller may be authorised to create a resource but not to delete it, etc.), so resources are sometimes left over. In that case, the package reports the errors that occurred while trying to shut down, which can be used to identify and delete those resources manually.

Documentation

Overview

Package listener provides a mechanism for listening to events published to an Amazon Simple Notification Service topic.

The listener package supports regular topics and first-in first-out (FIFO) topics as well as deriving the topic ARN from a Systems Manager (SSM) parameter. It works by creating an Amazon Simple Queue Service (SQS) queue, authorizing the SNS topic to publish messages to that queue, registering the queue as a subscriber to the topic and then receiving messages from the queue on a loop.

The package does not instantiate any AWS clients itself and doesn't have any opinions on where the SQS queue is created relative to the SNS topic.

Constants

This section is empty.

Variables

This section is empty.

Functions

func ListenToTopic

func ListenToTopic(ctx context.Context, sqsClient SQSAPI, snsClient SNSAPI, ssmClient SSMAPI, consumer Consumer, cfg ListenerConfiguration, errCh chan error)

ListenToTopic is the entrypoint to the package. In the course of normal operations, ListenToTopic creates a new SQS queue, subscribes it to the SNS topic and receives messages from the queue until the provided context is cancelled. When the context has been cancelled, it destroys the subscription and the queue. ListenToTopic always makes a best effort to clean up any infrastructure it has created, so it can emit one or more errors on the error channel as it shuts down. The error channel is closed when the function is finished.

Types

type Consumer

type Consumer interface {
	// OnMessage is called when a message is successfully processed from the SQS queue. If no messages are processed then OnMessage won't
	// be called.
	OnMessage(ctx context.Context, msg MessageContent)

	// OnError is called when there is an error attempting to receive messages from the SQS queue or processing a given message from the
	// queue.
	OnError(ctx context.Context, err error)
}

A Consumer is used by ListenToTopic to process messages and errors during the course of operations. Both of its methods are provided with the existing context used by the package so that any implementing type is able to propagate traces or be made aware of context cancellations.

type ListenerConfiguration

type ListenerConfiguration struct {
	// PollingInterval is the time between attempts to receive messages from the SQS queue. Defaults to 1 second.
	PollingInterval time.Duration
	// QueueName is the name of the SQS queue to create. If the SNS topic is FIFO then the listener package will automatically
	// add the ".fifo" suffix. Defaults to a v4 UUID prefixed with "sns-listener-".
	QueueName string
	// ParameterPath is the location of the SSM parameter that contains the topic ARN. If this is unset then the listener package
	// will use the value of TopicArn.
	ParameterPath string
	// TopicArn is the full ARN of the topic to subscribe to. If this is unset and ParameterPath is unset then startup will fail.
	TopicArn string
	// Verbose controls whether logs will be output to stderr or silenced. Defaults to false.
	Verbose bool
}

A ListenerConfiguration is used to set up the listener package when ListenToTopic is called.

type MessageContent

type MessageContent struct {
	Body *string
	Id   *string
}

A MessageContent maps the message body and message ID of an SQS message to a much more straightforward struct. For the purpose of listening to an SNS topic, the Body contains the full message that was published.
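
When an SQS subscription does not have raw message delivery enabled, SNS wraps each published message in a JSON envelope whose Message field holds the original payload. This page does not say which mode the package uses, so the following is a sketch under that assumption: payload is a hypothetical helper that unwraps the envelope when one is present and otherwise returns the body unchanged.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// snsEnvelope holds a subset of the fields SNS puts around a message when
// raw message delivery is disabled.
type snsEnvelope struct {
	Type      string `json:"Type"`
	MessageId string `json:"MessageId"`
	TopicArn  string `json:"TopicArn"`
	Subject   string `json:"Subject"`
	Message   string `json:"Message"`
	Timestamp string `json:"Timestamp"`
}

// payload returns the published message from body. If body is not an SNS
// notification envelope, it is returned unchanged; nil yields "".
func payload(body *string) string {
	if body == nil {
		return ""
	}
	var env snsEnvelope
	if err := json.Unmarshal([]byte(*body), &env); err != nil || env.Type != "Notification" {
		return *body
	}
	return env.Message
}

func main() {
	wrapped := `{"Type":"Notification","MessageId":"example-id","Message":"hello"}`
	plain := "hello, unwrapped"
	fmt.Println(payload(&wrapped))
	fmt.Println(payload(&plain))
}
```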

type SNSAPI

type SNSAPI interface {
	Subscribe(ctx context.Context,
		params *sns.SubscribeInput,
		optFns ...func(*sns.Options)) (*sns.SubscribeOutput, error)

	Unsubscribe(ctx context.Context,
		params *sns.UnsubscribeInput,
		optFns ...func(*sns.Options)) (*sns.UnsubscribeOutput, error)
}

SNSAPI is a shim over v2 of the AWS SDK's sns client. The sns client provided by github.com/aws/aws-sdk-go-v2/service/sns automatically satisfies this.

type SQSAPI

type SQSAPI interface {
	CreateQueue(ctx context.Context,
		params *sqs.CreateQueueInput,
		optFns ...func(*sqs.Options)) (*sqs.CreateQueueOutput, error)

	DeleteQueue(ctx context.Context,
		params *sqs.DeleteQueueInput,
		optFns ...func(*sqs.Options)) (*sqs.DeleteQueueOutput, error)

	GetQueueAttributes(ctx context.Context,
		params *sqs.GetQueueAttributesInput,
		optFns ...func(*sqs.Options)) (*sqs.GetQueueAttributesOutput, error)

	ReceiveMessage(ctx context.Context,
		params *sqs.ReceiveMessageInput,
		optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error)

	DeleteMessage(ctx context.Context,
		params *sqs.DeleteMessageInput,
		optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error)
}

SQSAPI is a shim over v2 of the AWS SDK's sqs client. The sqs client provided by github.com/aws/aws-sdk-go-v2/service/sqs automatically satisfies this.

type SSMAPI

type SSMAPI interface {
	GetParameter(ctx context.Context,
		params *ssm.GetParameterInput,
		optFns ...func(*ssm.Options)) (*ssm.GetParameterOutput, error)
}

SSMAPI is a shim over v2 of the AWS SDK's ssm client. The ssm client provided by github.com/aws/aws-sdk-go-v2/service/ssm automatically satisfies this.
