queue

v0.1.0
Published: Jul 26, 2019 License: MIT Imports: 6 Imported by: 0

README

A helper package for Amazon SQS

How to use

Setup

You need to set up AWS_SECRET_ACCESS_KEY and AWS_ACCESS_KEY_ID environment variables.
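
A minimal sketch of a startup check for these two variables, using only the standard library. The helper name missingEnv and the requiredEnv list are hypothetical and are not part of this package; the check only reports which names are unset or empty.

```go
package main

import (
	"fmt"
	"os"
)

// requiredEnv lists the two variables the README asks for.
var requiredEnv = []string{"AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"}

// missingEnv (hypothetical helper) returns the names in keys for which
// lookup reports no value or an empty value. lookup has the same shape
// as os.LookupEnv, so a fake can be substituted when testing.
func missingEnv(keys []string, lookup func(string) (string, bool)) []string {
	var missing []string
	for _, k := range keys {
		if v, ok := lookup(k); !ok || v == "" {
			missing = append(missing, k)
		}
	}
	return missing
}

func main() {
	if missing := missingEnv(requiredEnv, os.LookupEnv); len(missing) > 0 {
		fmt.Println("missing environment variables:", missing)
		return
	}
	fmt.Println("AWS credentials found in the environment")
}
```

Running a check like this before creating a queue turns a credentials problem into an explicit message instead of a later request failure.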

Create a new queue
func main() {
	// ...

	if _, err := NewQueue(); err != nil {
		// Do something with the error...
	}

	// ...
}

func NewQueue() (queue.Processor, error) {
	pq, err := queue.New("your-queue-name")
	if err != nil {
		return queue.Processor{}, err
	}

	processor := queue.Processor{
		Queue:             pq,
		HandleMessageBody: handleMessageBody,
	}

	// Process decodes each incoming message into this value.
	message := new(yourQueueMessage)
	go processor.Process(message)

	return processor, nil
}
Add a new message to the queue
func addMessageToQueue() error {
	// Named q so that it does not shadow the queue package.
	q := &queue.Queue{
		Name: "your-queue-name",
	}
	if err := q.Init(); err != nil {
		return err
	}

	message := yourQueueMessage{
		Foo: "baz",
	}
	if _, err := q.SendMessage(message); err != nil {
		return err
	}

	return nil
}
Handle messages
func handleMessageBody(processor queue.Processor, b *interface{}) (err error) {
	message := (*b).(*yourQueueMessage)

	// Do something with the message...
	_ = message // remove once message is used; Go rejects unused variables

	return
}
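
The type assertion in the handler panics if the body holds anything other than *yourQueueMessage. A hedged sketch of a checked variant follows; messageFrom is a hypothetical helper, and yourQueueMessage is redefined locally only so the example stands alone.

```go
package main

import (
	"errors"
	"fmt"
)

// yourQueueMessage mirrors the placeholder type used in the README.
type yourQueueMessage struct {
	Foo string
}

// messageFrom (hypothetical helper) extracts a *yourQueueMessage from the
// untyped body pointer that a handler receives. It returns an error
// instead of panicking when the body has a different dynamic type.
func messageFrom(b *interface{}) (*yourQueueMessage, error) {
	if b == nil {
		return nil, errors.New("nil message body")
	}
	msg, ok := (*b).(*yourQueueMessage)
	if !ok {
		return nil, fmt.Errorf("unexpected message body type %T", *b)
	}
	return msg, nil
}

func main() {
	var body interface{} = &yourQueueMessage{Foo: "baz"}
	msg, err := messageFrom(&body)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(msg.Foo)
}
```

A handler could call such a helper first and return its error, so that a malformed message does not crash the goroutine running Process.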

Documentation

Constants

const MaxReceiveCountBeforeDead = 5

MaxReceiveCountBeforeDead is the receive count before a message is sent to a dead letter queue.

Variables

This section is empty.

Functions

func UnmarshalMessageBody

func UnmarshalMessageBody(message *sqs.Message, v interface{}) (err error)

UnmarshalMessageBody unmarshals the body of the given sqs.Message into v.
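
The package's implementation is not shown here, so the following is only a sketch of what a function with this signature might do, assuming the message body is JSON. The message type is a hypothetical stand-in for the AWS SDK's sqs.Message that models only its Body field, and unmarshalBody is likewise a hypothetical name.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// message is a hypothetical stand-in for the AWS SDK's sqs.Message.
// Only the Body field is modeled.
type message struct {
	Body *string
}

// yourQueueMessage mirrors the placeholder type used in the README.
type yourQueueMessage struct {
	Foo string
}

// unmarshalBody (hypothetical) decodes the JSON body of m into v.
// It is an assumption about the behavior, not the package's code.
func unmarshalBody(m *message, v interface{}) error {
	if m == nil || m.Body == nil {
		return errors.New("message has no body")
	}
	return json.Unmarshal([]byte(*m.Body), v)
}

func main() {
	body := `{"Foo":"baz"}`
	var out yourQueueMessage
	if err := unmarshalBody(&message{Body: &body}, &out); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out.Foo)
}
```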

Types

type Processor

type Processor struct {
	Queue             *Queue
	HandleMessageBody func(Processor, *interface{}) error
}

Processor pairs a Queue with the function that handles its incoming SQS messages.

func (*Processor) Process

func (processor *Processor) Process(body interface{})

Process handles incoming SQS messages. The body parameter is untyped so that the caller can pass the structure into which each incoming message is decoded. If body is nil, the JSON decoder produces a map[string]interface{}.

Because the same structure is reused for every message, this method is not thread-safe. Multiple Processors, however, can process the same SQS queue in parallel without any problem.
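
The difference between passing nil and passing a typed pointer can be illustrated with encoding/json alone. This sketch does not use the package; decode is a hypothetical helper that shows how the standard decoder treats an untyped value.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// yourQueueMessage mirrors the placeholder type used in the README.
type yourQueueMessage struct {
	Foo string
}

// decode (hypothetical helper) unmarshals raw into body and returns the
// result. When body is nil, encoding/json stores a map[string]interface{}
// for a JSON object. When body holds a non-nil pointer, the decoder
// fills the value it points to.
func decode(raw string, body interface{}) (interface{}, error) {
	if err := json.Unmarshal([]byte(raw), &body); err != nil {
		return nil, err
	}
	return body, nil
}

func main() {
	raw := `{"Foo":"baz"}`

	untyped, err := decode(raw, nil)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("nil body decodes to %T\n", untyped)

	typed, err := decode(raw, new(yourQueueMessage))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("typed body decodes to %T\n", typed)
}
```

In the typed case every decode writes into the same pointed-to struct, which is why sharing one body value across goroutines is unsafe.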

type Queue

type Queue struct {
	Name               string
	URL                string
	DeadLetterQueueURL string
}

A Queue represents an SQS queue.

func New

func New(name string) (*Queue, error)

New returns a prepared SQS queue.

func (*Queue) DeleteMessage

func (queue *Queue) DeleteMessage(message *sqs.Message) (resp *sqs.DeleteMessageOutput, err error)

DeleteMessage removes a message from the Queue.

func (*Queue) DeleteMessageByReceiptHandle

func (queue *Queue) DeleteMessageByReceiptHandle(receiptHandle *string) (resp *sqs.DeleteMessageOutput, err error)

DeleteMessageByReceiptHandle removes a message from the Queue by its receiptHandle.

func (*Queue) GetAttributesByQueueURL

func (queue *Queue) GetAttributesByQueueURL(url string, attributeNames []*string) (resp *sqs.GetQueueAttributesOutput, err error)

GetAttributesByQueueURL returns queue attributes by its URL.

func (*Queue) GetClient

func (queue *Queue) GetClient() *sqs.SQS

GetClient returns an SQS client with a live session.

func (*Queue) Init

func (queue *Queue) Init() (err error)

Init will create the actual queue and set a Client with a live session to it.

func (*Queue) ReceiveMessage

func (queue *Queue) ReceiveMessage() (message *sqs.Message, err error)

ReceiveMessage will return one message and its body from the queue.

func (*Queue) SendMessage

func (queue *Queue) SendMessage(messageBody interface{}) (resp *sqs.SendMessageOutput, err error)

SendMessage sends the given message body to the queue.

type RedrivePolicy

type RedrivePolicy struct {
	MaxReceiveCount     int    `json:"maxReceiveCount"`
	DeadLetterTargetArn string `json:"deadLetterTargetArn"`
}

A RedrivePolicy is an sqs policy of a dead letter queue.

func (RedrivePolicy) GetAsAWSString

func (policy RedrivePolicy) GetAsAWSString() (policyString *string, err error)

GetAsAWSString returns the RedrivePolicy as a JSON string pointer for use as an SQS attribute.
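
A sketch of the JSON that the struct tags above produce, assuming the method simply marshals the policy with encoding/json. asJSONString is a hypothetical stand-in for GetAsAWSString, and the ARN is a made-up placeholder.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// RedrivePolicy mirrors the struct documented above.
type RedrivePolicy struct {
	MaxReceiveCount     int    `json:"maxReceiveCount"`
	DeadLetterTargetArn string `json:"deadLetterTargetArn"`
}

// asJSONString (hypothetical) marshals the policy and returns a pointer
// to the resulting string, the form in which the AWS SDK takes attribute
// values. The real method may differ.
func asJSONString(p RedrivePolicy) (*string, error) {
	b, err := json.Marshal(p)
	if err != nil {
		return nil, err
	}
	s := string(b)
	return &s, nil
}

func main() {
	policy := RedrivePolicy{
		MaxReceiveCount:     5, // same value as MaxReceiveCountBeforeDead
		DeadLetterTargetArn: "arn:aws:sqs:us-east-1:123456789012:your-queue-name-dead", // placeholder
	}
	s, err := asJSONString(policy)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(*s)
	// prints {"maxReceiveCount":5,"deadLetterTargetArn":"arn:aws:sqs:us-east-1:123456789012:your-queue-name-dead"}
}
```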
