sqsdr

v1.0.2
Published: Dec 16, 2021 License: MIT Imports: 15 Imported by: 0

README

sqsdr

Simple Queue Service (SQS) doctor wants to be your one-stop shop for triaging and redriving messages in your SQS queues.

CLI Usage

$ sqsdr --help
NAME:
   sqsdr - AWS Simple Queue Service (SQS) Doctor

USAGE:
   sqsdr [global options] command [command options] [arguments...]

VERSION:
   0.1.0

DESCRIPTION:
   SQS tool for helping you investigate, debug, and redrive SQS messages

COMMANDS:
     redrive, r  redrive messages from source queue to a destination queue
     dump, d     dump messages from a source queue to disk
     help, h     Shows a list of commands or help for one command

GLOBAL OPTIONS:
   --loquacious, -l  log loquaciously (read: verbosely, loudly, a lot) (default: false)
   --help, -h        show help
   --version, -v     print the version

Redrive

redrive is a generic command for moving messages from one queue to another. It also exposes filtering through the --regex and --jmespath flags, so you can send only a subset of the messages in your source queue to the destination queue.

Help

$ sqsdr redrive --help
NAME:
   sqsdr redrive - redrive messages from source queue to a destination queue

USAGE:
   sqsdr redrive [command options] [arguments...]

OPTIONS:
   --source value, -s value       source queue name (required)
   --destination value, -d value  destination queue name (required)
   --regex value, -x value        only message bodies that match the regex will be sent to the destination queue (optional)
   --jmespath value, -j value     JMESPath expression applied to the message body. output is passed to the regular expression (optional)
   --region value, -r value       AWS region of the queues region (default: "us-east-1")

Example

Imagine you have the following queues:

Source Queue Name: my-queue-dlq
Destination Queue Name: my-queue

The messages in my-queue-dlq look like this, with some variations:

{
  "user_id": "2",
  "book_id": 1,
  "review": {
    "text": "noice",
    "lang": "en-US"
  }
}

If you want to redrive all the messages from my-queue-dlq to my-queue, you can run this command:

sqsdr redrive \
  --source my-queue-dlq \
  --destination my-queue \
  --region us-west-2

If you want to redrive only the messages that have reviews in English, use the JMESPath expression review.lang with the --jmespath flag to drill down into the "lang" attribute, together with the --regex flag to keep only the languages that match the pattern "en-US".

sqsdr redrive \
  --source my-queue-dlq \
  --destination my-queue \
  --region us-west-2 \
  --jmespath "review.lang" \
  --regex "en-US" 

Messages that match both the JMESPath expression and the regex are sent to the destination queue.

Dump Messages to Disk

Help
$ sqsdr dump --help
NAME:
   sqsdr dump - dump messages from a source queue to STDOUT

USAGE:
   sqsdr dump [command options] [arguments...]

OPTIONS:
   --source value, -s value  source queue name
   --region value, -r value  AWS region of the queues region (default: "us-east-1")

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateClientAndValidateQueue

func CreateClientAndValidateQueue(region, queueName string) (*sqs.SQS, string, error)

CreateClientAndValidateQueue takes an AWS region and a queue name and returns an initialized SQS client, the queue URL for the given queue name, and an error if one occurred.

Types

type Chooser

type Chooser interface {
	Choose([]*sqs.Message) ([]*sqs.Message, []*sqs.Message)
}

Chooser is an interface that given an array of SQS messages will choose if the SQS messages go to the left sink, the right sink, or both. It is assumed all messages that are passed in are passed back out.

By convention the right sink is always the fallthrough sink. Pass any messages that don't meet your criteria to the right sink.
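As an illustration of the contract, here is a minimal sketch of a custom Chooser. To stay compilable without the AWS SDK it uses a hypothetical Message type in place of *sqs.Message, and PrefixChooser is an invented example, not part of this package.

```go
package main

import (
	"fmt"
	"strings"
)

// Message is a hypothetical stand-in for *sqs.Message so the sketch compiles
// without the AWS SDK.
type Message struct {
	Body string
}

// Chooser mirrors the shape of the interface above, using the stand-in type.
type Chooser interface {
	Choose([]*Message) (left []*Message, right []*Message)
}

// PrefixChooser is a hypothetical Chooser: bodies starting with Prefix go
// left, everything else falls through to the right.
type PrefixChooser struct {
	Prefix string
}

// Choose returns every input message in exactly one of the two slices.
func (p PrefixChooser) Choose(msgs []*Message) (left, right []*Message) {
	for _, m := range msgs {
		if strings.HasPrefix(m.Body, p.Prefix) {
			left = append(left, m)
		} else {
			right = append(right, m)
		}
	}
	return left, right
}

func main() {
	var c Chooser = PrefixChooser{Prefix: "order:"}
	left, right := c.Choose([]*Message{{Body: "order:1"}, {Body: "refund:7"}, {Body: "order:2"}})
	fmt.Println(len(left), len(right))
}
```

Every message ends up in one of the two return values, which matches the assumption that all messages passed in are passed back out.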

type Dump

type Dump struct {
	SourceClient   sqsiface.SQSAPI
	SourceQueueURL string
	Out            io.Writer
}

Dump persists all messages in a queue to disk and then makes sure all of the messages end up back in the source queue.

func (*Dump) Dump

func (d *Dump) Dump() error

Dump uses a FallthroughPipeline to place all messages in a temporary queue after they've been written to disk. The messages are then placed back into the source queue using a pipeline in the reverse direction.

type FallthroughPipeline

type FallthroughPipeline struct {
	Chooser       Chooser
	LeftSink      Sinker
	RightSinkFunc func(queueURL string, client sqsiface.SQSAPI) Sinker

	SourceClient   sqsiface.SQSAPI
	SourceQueueURL string
}

FallthroughPipeline is a higher level pipeline that manages creating fallthrough queues, running the forward pipeline, redriving the messages in the fallthrough queues back into the source queues, and removing the fallthrough queue.

It's written to be as general as possible, so you may want to check out filtered redrive or Dump for examples of how it's used.
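The four phases described above can be sketched with in-memory queues. Everything in this example (the queue type, runFallthrough, and its callbacks) is hypothetical and only models the flow; the real type works against SQS queues through sqsiface.SQSAPI.

```go
package main

import (
	"fmt"
	"strings"
)

// queue is a hypothetical in-memory stand-in for an SQS queue.
type queue struct {
	msgs []string
}

func (q *queue) send(m string) { q.msgs = append(q.msgs, m) }

// drain removes and returns every message currently in the queue.
func (q *queue) drain() []string {
	out := q.msgs
	q.msgs = nil
	return out
}

// runFallthrough models the phases of a fallthrough pipeline: create a
// fallthrough queue, run the forward pipeline, redrive the fallthrough
// messages back into the source, and discard the fallthrough queue.
func runFallthrough(source *queue, goesLeft func(string) bool, left func(string)) {
	fallthroughQ := &queue{} // 1. create the temporary fallthrough queue

	// 2. forward pipeline: chosen messages go left, the rest fall through
	for _, m := range source.drain() {
		if goesLeft(m) {
			left(m)
		} else {
			fallthroughQ.send(m)
		}
	}

	// 3. reverse pipeline: put the fallthrough messages back in the source
	for _, m := range fallthroughQ.drain() {
		source.send(m)
	}

	// 4. the temporary queue is simply dropped here; a real pipeline would
	// delete the SQS queue it created
}

func main() {
	source := &queue{msgs: []string{"keep-1", "move-1", "keep-2"}}
	var moved []string
	runFallthrough(source,
		func(m string) bool { return strings.HasPrefix(m, "move") },
		func(m string) { moved = append(moved, m) })
	fmt.Println(moved, source.msgs)
}
```

The temporary queue exists so that messages which are not chosen are not received again while the source queue is still being drained.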

func (*FallthroughPipeline) Run

func (f *FallthroughPipeline) Run() error

Run is the entry point for running the FallthroughPipeline.

type FilterChooser

type FilterChooser struct {
	JMESPath string
	Regex    *regexp.Regexp
}

FilterChooser passes the body of an SQS message through a JMESPath expression, if one is present, and then through a regular expression. Messages that satisfy the regular expression go to the left sink and all others go to the right sink.

func NewFilterChooser

func NewFilterChooser(jmespath string, regex string) (*FilterChooser, error)

NewFilterChooser returns an initialized FilterChooser if the passed in regular expression can be compiled. It returns an error otherwise.

func (*FilterChooser) Choose

func (f *FilterChooser) Choose(msgs []*sqs.Message) ([]*sqs.Message, []*sqs.Message)

Choose passes the body of each SQS message through the JMESPath expression, if one is present, and then through the regular expression. If a message body fails anywhere along the way, an error message is logged and the SQS message is put in the right sink.

If a JMESPath expression is present and runs successfully against the SQS message body, its output replaces the message body in the filter. This allows you to run a JMESPath expression against a large object to narrow it down, and then run a regular expression against the output to choose between the left sink and the right sink.

NOTE: the JMESPath expression must return a valid JSON object. If the output is not valid the SQS message will be put in the right sink.

type Handler

type Handler interface {
	Handle(context.Context, []*sqs.Message) ([]*sqs.Message, error)
}

Handler represents any type that can process SQS messages. Messages returned by the handler will be removed from the source queue if there is one. In other words, the handler need not worry about the lifecycle of the SQS messages.

type MessageOutput

type MessageOutput struct {
	Body              *string                               `json:",omitempty"`
	MessageAttributes map[string]*sqs.MessageAttributeValue `json:",omitempty"`
	MessageId         *string
	ReceiptHandle     *string
}

MessageOutput is a simplified version of the SQS Message that's appropriate to write to disk or STDOUT.

TODO: will have to handle the case when putting messages from STDIN to a queue where the message body is JSON.
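To see how the struct tags shape the output, the following sketch encodes a local copy of the struct with encoding/json. The messageOutput and attributeValue types are hypothetical stand-ins that copy the field names and tags shown above so the example compiles without the AWS SDK.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// attributeValue is a hypothetical, trimmed-down stand-in for
// sqs.MessageAttributeValue.
type attributeValue struct {
	DataType    *string
	StringValue *string
}

// messageOutput copies the field names and JSON tags of MessageOutput.
type messageOutput struct {
	Body              *string                    `json:",omitempty"`
	MessageAttributes map[string]*attributeValue `json:",omitempty"`
	MessageId         *string
	ReceiptHandle     *string
}

// encode marshals a messageOutput to a JSON string.
func encode(m messageOutput) (string, error) {
	b, err := json.Marshal(m)
	return string(b), err
}

func main() {
	body, id := "noice", "msg-1"
	withBody, _ := encode(messageOutput{Body: &body, MessageId: &id})
	empty, _ := encode(messageOutput{})
	fmt.Println(withBody) // {"Body":"noice","MessageId":"msg-1","ReceiptHandle":null}
	fmt.Println(empty)    // {"MessageId":null,"ReceiptHandle":null}
}
```

Fields tagged omitempty disappear when they are nil or empty, while MessageId and ReceiptHandle are always present and encode as null when unset.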

type NoOpSink

type NoOpSink struct{}

NoOpSink drops the messages on the floor. Use it only as a signal to other developers that your other sink is doing all of the work.

func (NoOpSink) Sink

func (n NoOpSink) Sink(ctx context.Context, msgs []*sqs.Message) error

Sink does nothing with the messages and returns a nil error

type PassthroughChooser

type PassthroughChooser struct{}

PassthroughChooser always passes messages to the left

func (*PassthroughChooser) Choose

func (p *PassthroughChooser) Choose(msgs []*sqs.Message) (left []*sqs.Message, right []*sqs.Message)

Choose always passes messages through to the left

type Pipeline

type Pipeline struct {
	Chooser   Chooser
	LeftSink  Sinker
	RightSink Sinker
}

Pipeline is a simple struct to manage the interaction between a source, a chooser, and sinks. Pipeline satisfies the Handler interface and can be passed to a Poller.

func (*Pipeline) Handle

func (p *Pipeline) Handle(ctx context.Context, msgs []*sqs.Message) ([]*sqs.Message, error)

Handle is the entry point into the pipeline

type Poller

type Poller struct {
	QueueURL         string
	Handler          Handler
	Client           sqsClient
	MaxEmptyReceives int

	// SQS ReceiveMessage API pass through
	WaitTimeSeconds     int64
	MaxNumberOfMessages int64
}

Poller manages the business logic of polling a queue for messages, handing them off to a Handler, and deleting the successfully processed messages from the queue.

func NewPoller

func NewPoller(queueURL string, client sqsClient, handler Handler) *Poller

NewPoller returns a Poller that defaults to long polling and receiving at most 10 messages at a time

func (*Poller) Process

func (p *Poller) Process(ctx context.Context) error

Process is the entry point for the Poller. It is a blocking function. If you desire more concurrency, call Process() in a separate goroutine as many times as needed.

func (*Poller) ProcessOnce

func (p *Poller) ProcessOnce(ctx context.Context) (int, error)

ProcessOnce polls, handles, and deletes successfully processed messages from the queue one time. This could be handy if you're running Poller in an environment with a limited runtime like AWS Lambda.

type Redrive

type Redrive struct {
	SourceClient   *sqs.SQS
	SourceQueueURL string

	DestClient   *sqs.SQS
	DestQueueURL string

	JMESPath string
	Regex    string
	// contains filtered or unexported fields
}

Redrive is a simple strategy that moves messages from a source queue to a destination queue.

func (*Redrive) Redrive

func (r *Redrive) Redrive() error

Redrive is the entry point into the redriving strategy

type RightPassthroughChooser

type RightPassthroughChooser struct{}

RightPassthroughChooser passes all messages to the right

func (*RightPassthroughChooser) Choose

func (r *RightPassthroughChooser) Choose(msgs []*sqs.Message) (left []*sqs.Message, right []*sqs.Message)

Choose always passes messages to the right

type SQSSink

type SQSSink struct {
	QueueURL string
	Client   sqsiface.SQSAPI
}

SQSSink passes all messages to the QueueURL with the provided SQS Client

func (*SQSSink) Sink

func (s *SQSSink) Sink(ctx context.Context, msgs []*sqs.Message) error

Sink performs a BatchSend with the passed in messages

type Sinker

type Sinker interface {
	Sink(context.Context, []*sqs.Message) error
}

Sinker is an interface that accepts an array of SQS messages and puts them somewhere: another SQS queue, a file on disk, or anywhere else.

type WriterSink

type WriterSink struct {
	Writer      io.Writer
	Passthrough Sinker
}

WriterSink writes SQS messages in the MessageOutput format to the Writer, with a delimiter separating the messages.

func (*WriterSink) Sink

func (w *WriterSink) Sink(ctx context.Context, msgs []*sqs.Message) error

Sink converts each SQS message to a MessageOutput and writes it to the Writer.

Directories

Path Synopsis
cmd
