gosqs

package module
v0.0.0-...-f0cceae Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 19, 2019 License: MIT Imports: 12 Imported by: 0

README

GOSQS

This is a thin wrapper library around the GO AWS SDK's SQS functions. This wrapper seeks to simplify the the usage and unit testing of SQS queues.

Usage

Setting an endpoint

This method will pass the configuration parameters necessary to connect to SQS. Please note that this method does not confirm these settings.

endPoint, err := gosqs.SetEndPoint("Endpoint", "Region").Wait(20, 3, 8)
if err != nil {
    return err
}

If you would like to attempt to connect to this endpoint and pause execution until it becomes available you can append .Wait this will re-try the connection for the selected number of times and jitter the back off time.

endPoint, err := gosqs.SetEndPoint("Endpoint", "Region").
    Wait(20, 3, 8) //num retries 20, min backoff 3, max backoff 8
Checking if an endpoint is available

The Ping method will return an error if the endpoint is not available. This is an alternative to the Wait method

err := endpoint.Ping()
Creating a queue

Before we can send messages to a queue we will need to create one

queueUrl, err := endPoint.CreateQueue("queue name", 1, 86400) // queue name, default polling delay in seconds, default message retention period in seconds
Delete a queue
err := endpoint.DeleteQueue("queue name")
Send message to a queue
err := endpoint.SendMessage("message string", "queue name")
Retrieve messages from a queue

Sqs needs to be polled for messages. This is done by setting a callback method which will be triggered each time a message is received. If the method returns true the message will be deleted from the queue.

callback := func(message string) bool {
    log.Println("received message from queue: " + message)
    err := processMessage(message)
    if err != nil {
        log.Println("Failed to process message")
        return false
    }
    return true
}

err := endpoint.PollQueue("queue name", callback, 1) // queue name, callback function, the time between polls in seconds

If you want to consume a queue which might not initially be created or on an endpoint which might not be available you can poll with a retry.

callback := func(message string) bool {
    log.Println("received message from queue: " + message)
    err := processMessage(message)
    if err != nil {
        log.Println("Failed to process message")
        return false
    }
    return true
}

err := endpoint.PollQueue("queue name", callback, 1, 20, 3, 8) // queue name, callback function, the time between polls in seconds, num retries, min backoof in seconds, max mackoff in seconds

If your callback needs to access pre-initialized structs you can use a wrapper function:

func createCallback(dependency dependencyType) func(string) bool {
    return func(message string) bool {
        log.Println("received message from queue: " + message)
        err := dependency.processMessage(message)
        if err != nil {
            log.Println("Failed to process message")
            return false
        }
        return true
    }
}

callback := createCallback(dependency)
err := endpoint.PollQueue("queue name", callback, 1)

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Mock

type Mock struct {
	// contains filtered or unexported fields
}

func (*Mock) CreateDefaultFifoQueue

func (sqs *Mock) CreateDefaultFifoQueue(queue string) (string, error)

func (*Mock) CreateDefaultQueue

func (sqs *Mock) CreateDefaultQueue(queue string) (string, error)

func (*Mock) CreateQueue

func (sqs *Mock) CreateQueue(queue string, retentionPeriod int, visibilityTimeout int, fifo bool, encrypted bool) (string, error)

func (*Mock) DeleteQueue

func (sqs *Mock) DeleteQueue(queue string) error

func (*Mock) GetQueue

func (sqs *Mock) GetQueue(queue string) *MockQueue

func (*Mock) ListQueues

func (sqs *Mock) ListQueues() ([]string, error)

func (Mock) Ping

func (Mock) Ping() error

func (*Mock) PollQueue

func (sqs *Mock) PollQueue(queue string, callback func(string) bool, pollWaitTime int, maxNumberOfMessagesPerPoll int) error

func (*Mock) PollQueueWithRetry

func (sqs *Mock) PollQueueWithRetry(queue string, callback func(string) bool, pollWaitTime int, maxNumberOfMessagesPerPoll int, numRetries int, minBackOff int, maxBackOff int) error

func (*Mock) SendMessage

func (sqs *Mock) SendMessage(message string, queue string) error

func (*Mock) Wait

func (sqs *Mock) Wait(numRetries int, minBackOff int, maxBackOff int) (Sqs, error)

type MockQueue

type MockQueue struct {
	Queue    string
	QueueUrl string
	Messages []string
}

type Service

type Service struct {
	// contains filtered or unexported fields
}

func (*Service) CreateDefaultFifoQueue

func (mq *Service) CreateDefaultFifoQueue(queue string) (string, error)

func (*Service) CreateDefaultQueue

func (mq *Service) CreateDefaultQueue(queue string) (string, error)

func (*Service) CreateQueue

func (mq *Service) CreateQueue(queue string, retentionPeriod int, visibilityTimeout int, fifo bool, encrypted bool) (string, error)
  • Retention period: time the messages will remain in the queue if not deleted
  • VisibilityTimeout: time the message will be hidden from other consumers after it is retried from the queue. If this time expires it will be assumed that the message was not processed successfully and will be available to other consumers for retry.
  • Fifo: whether the queue should be first in first out with deliver once guarantee
  • encrypted: whether the queue should use server side encryption using the default kms

func (*Service) DeleteQueue

func (mq *Service) DeleteQueue(queue string) error

func (*Service) ListQueues

func (mq *Service) ListQueues() ([]string, error)

func (*Service) Ping

func (mq *Service) Ping() error

func (*Service) PollQueue

func (mq *Service) PollQueue(queue string, callback func(string) bool, pollWaitTime int, maxNumberOfMessagesPerPoll int) error

Polls the queue for new messages

  • queue: the name of the queue to poll
  • callback: the function that will be called to process each message. If the function returns true the message will be deleted from the queue
  • pollWaitTime: the amount of time in seconds sqs will wait for a message on a queue. As soon as a message is received the function will return (regardless of the wait time) and a new request started.
  • maxNumberOfMessagesPerPoll: the max number of messages (between 1 and 10) that can be returned from a single poll.

func (*Service) PollQueueWithRetry

func (mq *Service) PollQueueWithRetry(queue string, callback func(string) bool, pollWaitTime int, maxNumberOfMessagesPerPoll int, numRetries int, minBackOff int, maxBackOff int) error

func (*Service) SendMessage

func (mq *Service) SendMessage(message string, queue string) error

func (*Service) Wait

func (mq *Service) Wait(numRetries int, minBackOff int, maxBackOff int) (Sqs, error)

type Sqs

type Sqs interface {
	Ping() error
	Wait(numRetries int, minBackOff int, maxBackOff int) (Sqs, error)
	SendMessage(message string, queue string) error
	PollQueue(queue string, callback func(string) bool, pollWaitTime int, maxNumberOfMessagesPerPoll int) error
	PollQueueWithRetry(queue string, callback func(string) bool, pollWaitTime int, maxNumberOfMessagesPerPoll int, numRetries int, minBackOff int, maxBackOff int) error
	CreateQueue(queue string, retentionPeriod int, visibilityTimeout int, fifo bool, encrypted bool) (string, error)
	CreateDefaultQueue(queue string) (string, error)
	CreateDefaultFifoQueue(queue string) (string, error)
	DeleteQueue(queue string) error
	ListQueues() ([]string, error)
}

func SetEndPoint

func SetEndPoint(endpoint string, region string) Sqs

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL