sqs

package
v0.0.23 Latest
Warning: This package is not in the latest version of its module.
Published: Jun 10, 2021 License: MIT Imports: 17 Imported by: 0

README

Experimenting with SQS queues on localhost

This repository doesn't have any tests, but it should. In the meantime, it's simple to get up and running with SQS locally using localstack.

First run the docker-compose.yml file that can be found in this directory:

docker-compose up

Then, in a Go file, create a new SQS instance (it implements MessageBroker):

awsconfig := aws.NewConfig().WithEndpoint("http://localhost:4566")
// New takes an aws.Config value, so dereference the pointer returned by NewConfig.
client := sqs.New(*awsconfig, <TIMEOUT>)

If these lines run without panicking (.Must will panic if the session is invalid), then you're ready to start experimenting!

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type MessageJSON

type MessageJSON struct {
	// contains filtered or unexported fields
}

MessageJSON implements the broker.ReceiveMessage interface.

func (*MessageJSON) Ack

func (m *MessageJSON) Ack() error

Ack acknowledges receipt of the message. This must be performed or the message will reappear on the queue after its invisibility timeout.

func (*MessageJSON) ID

func (m *MessageJSON) ID() string

ID returns the identifier created by SQS that represents this message on the broker.

func (*MessageJSON) SetVisibilityTimeout added in v0.0.4

func (m *MessageJSON) SetVisibilityTimeout(numSeconds int) error

SetVisibilityTimeout sets the number of seconds that a message should not be visible to other consumers.

func (*MessageJSON) Unmarshal

func (m *MessageJSON) Unmarshal(v interface{}) error

Unmarshal implements the broker.ReceiveMessage interface and decodes the contents received from the broker into v.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue implements the broker.Queue interface.

func (*Queue) Delete

func (q *Queue) Delete() error

Delete removes the queue from SQS.

func (*Queue) FetchAttributes

func (q *Queue) FetchAttributes(attrs []string) (map[string]string, error)

FetchAttributes makes a request to SQS to retrieve the attributes specified.

func (*Queue) GetApproximateNumberOfMessages added in v0.0.14

func (q *Queue) GetApproximateNumberOfMessages() (*int, error)

GetApproximateNumberOfMessages returns the approximate number of messages waiting to be consumed on this queue. We _only_ want to return an int if it's a value we received from AWS. Otherwise higher-level processes could break in really bad ways.
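Because the result is a pointer, callers likely need to tell "zero messages" apart from "no value available". The sketch below assumes the pointer can be nil when no count came back, which the documentation implies but does not state. `DepthReporter`, `QueueDepth`, `ErrDepthUnknown`, and `fakeDepth` are hypothetical names used only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// DepthReporter mirrors Queue.GetApproximateNumberOfMessages.
// The interface name itself is hypothetical.
type DepthReporter interface {
	GetApproximateNumberOfMessages() (*int, error)
}

// ErrDepthUnknown is a hypothetical error defined for this sketch only.
var ErrDepthUnknown = errors.New("queue depth unknown")

// QueueDepth returns a count only when the queue actually reported one,
// so a missing value is never mistaken for an empty queue.
func QueueDepth(q DepthReporter) (int, error) {
	n, err := q.GetApproximateNumberOfMessages()
	if err != nil {
		return 0, err
	}
	if n == nil {
		// Assumption: nil means no value was received.
		return 0, ErrDepthUnknown
	}
	return *n, nil
}

// fakeDepth is an in-memory stand-in for a real queue.
type fakeDepth struct{ n *int }

func (f fakeDepth) GetApproximateNumberOfMessages() (*int, error) { return f.n, nil }

func main() {
	seven := 7
	fmt.Println(QueueDepth(fakeDepth{n: &seven}))
	fmt.Println(QueueDepth(fakeDepth{}))
}
```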

func (*Queue) MoveMessages

func (q *Queue) MoveMessages(dest broker.Queue) (int, error)

MoveMessages moves all messages from this queue to the destination queue.

Probably only for the case where we're moving messages from a dead-letter queue back to its source queue.

func (*Queue) ReceiveOne

func (q *Queue) ReceiveOne() (broker.ReceiveMessage, error)

ReceiveOne fetches a message from the message broker if there is one on the queue; otherwise it returns broker.ErrQueueNoMessages.

After handling the message, make sure to Ack() to remove the message from the queue or it will be redelivered after its VisibilityTimeout.
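The receive, handle, Ack cycle described above can be sketched as a small drain loop. This is a minimal illustration, not the package's own code: the local `ReceiveMessage` and `Receiver` interfaces copy the documented method signatures, `ErrQueueNoMessages` stands in for broker.ErrQueueNoMessages, and `Drain`, `fakeQueue`, `fakeMessage`, and `order` are hypothetical names. The fakes keep the example runnable without SQS or localstack.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// ErrQueueNoMessages stands in for broker.ErrQueueNoMessages.
var ErrQueueNoMessages = errors.New("no messages on queue")

// ReceiveMessage mirrors the MessageJSON methods used in this sketch.
type ReceiveMessage interface {
	ID() string
	Unmarshal(v interface{}) error
	Ack() error
}

// Receiver mirrors Queue.ReceiveOne. The interface name is hypothetical.
type Receiver interface {
	ReceiveOne() (ReceiveMessage, error)
}

// fakeMessage and fakeQueue are in-memory stand-ins for SQS.
type fakeMessage struct {
	id    string
	body  []byte
	acked bool
}

func (m *fakeMessage) ID() string                    { return m.id }
func (m *fakeMessage) Unmarshal(v interface{}) error { return json.Unmarshal(m.body, v) }
func (m *fakeMessage) Ack() error                    { m.acked = true; return nil }

type fakeQueue struct{ pending []*fakeMessage }

func (q *fakeQueue) ReceiveOne() (ReceiveMessage, error) {
	if len(q.pending) == 0 {
		return nil, ErrQueueNoMessages
	}
	m := q.pending[0]
	q.pending = q.pending[1:]
	return m, nil
}

// Drain receives until the queue reports it is empty, acking each message
// only after handle succeeds. It returns the number of messages acked.
func Drain(q Receiver, handle func(ReceiveMessage) error) (int, error) {
	acked := 0
	for {
		m, err := q.ReceiveOne()
		if errors.Is(err, ErrQueueNoMessages) {
			return acked, nil
		}
		if err != nil {
			return acked, err
		}
		if err := handle(m); err != nil {
			// No Ack here: the message is redelivered after its timeout.
			return acked, err
		}
		if err := m.Ack(); err != nil {
			return acked, err
		}
		acked++
	}
}

// order is a hypothetical payload type.
type order struct {
	SKU string `json:"sku"`
}

func main() {
	q := &fakeQueue{pending: []*fakeMessage{
		{id: "1", body: []byte(`{"sku":"a"}`)},
		{id: "2", body: []byte(`{"sku":"b"}`)},
	}}
	n, err := Drain(q, func(m ReceiveMessage) error {
		var o order
		if err := m.Unmarshal(&o); err != nil {
			return err
		}
		fmt.Println(m.ID(), o.SKU)
		return nil
	})
	fmt.Println(n, err)
}
```

Acking only after the handler returns gives at-least-once handling: a crash mid-handler leaves the message unacked, so it becomes visible again instead of being lost.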

func (*Queue) Requeue added in v0.0.19

func (q *Queue) Requeue(m broker.ReceiveMessage) error

Requeue turns a ReceiveMessage into a SendMessage and sends it back to its original queue. Note that Requeue will Ack the current ReceiveMessage, as otherwise we'd end up with a duplicate.

func (*Queue) SendMany added in v0.0.8

func (q *Queue) SendMany(msgs []broker.SendMessage) error

SendMany sends up to 10 messages to the SQS queue. An error is returned if any message fails to send.

An error returned from this method does not indicate that all the messages failed to be sent to SQS, but that *at least* one failed to send.
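Given the 10-message limit, a caller with a longer slice has to split it into batches first. The helper below is a sketch of that chunking, not part of the package: `SendAll`, `BatchSender`, and `recordingSender` are hypothetical names, and `SendMessage` is a placeholder for broker.SendMessage, whose definition is not shown on this page.

```go
package main

import "fmt"

// SendMessage is a placeholder for broker.SendMessage.
type SendMessage interface{}

// BatchSender mirrors Queue.SendMany. The interface name is hypothetical.
type BatchSender interface {
	SendMany(msgs []SendMessage) error
}

// maxBatch is the per-call limit documented for SendMany.
const maxBatch = 10

// SendAll splits msgs into batches of at most maxBatch and sends each in
// turn. It stops at the first failing batch and returns how many messages
// belonged to batches that succeeded entirely. Some messages in the failing
// batch may still have been sent, as the SendMany documentation notes.
func SendAll(q BatchSender, msgs []SendMessage) (int, error) {
	for start := 0; start < len(msgs); start += maxBatch {
		end := start + maxBatch
		if end > len(msgs) {
			end = len(msgs)
		}
		if err := q.SendMany(msgs[start:end]); err != nil {
			return start, fmt.Errorf("batch starting at index %d: %w", start, err)
		}
	}
	return len(msgs), nil
}

// recordingSender is an in-memory stand-in that records batch sizes.
type recordingSender struct{ sizes []int }

func (r *recordingSender) SendMany(msgs []SendMessage) error {
	r.sizes = append(r.sizes, len(msgs))
	return nil
}

func main() {
	rec := &recordingSender{}
	sent, err := SendAll(rec, make([]SendMessage, 23))
	fmt.Println(sent, err, rec.sizes)
}
```

Since a failed batch may be partially delivered, retrying it blindly can produce duplicates; consumers should tolerate receiving the same message more than once.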

func (*Queue) SendOne

func (q *Queue) SendOne(m broker.SendMessage) error

SendOne sends a message to the queue.

func (*Queue) String

func (q *Queue) String() string

type SQS

type SQS struct {
	// contains filtered or unexported fields
}

SQS implements the broker.MessageBroker interface to provide message publish and retrieve functionality provided by AWS SQS.

func New

func New(config aws.Config, waitTimeSeconds int) *SQS

New creates an SQS client to interact with AWS SQS using the provided configuration.

func (*SQS) CreateQueue

func (s *SQS) CreateQueue(qd *broker.QueueDefinition) ([]broker.Queue, error)

CreateQueue creates the named queue if it does not exist, otherwise it creates the queue with the options.
