sqs

package module
v0.0.0-...-c330695 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 27, 2020 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewRetriableError

func NewRetriableError(delay time.Duration) error

NewRetriableError creates a RetriableError with the given delay.

Types

type Configuration

type Configuration struct {
	QueueURL            string        `toml:"queueURL" default:"" comment:"URL of target SQS queue"`
	MaxNumberOfMessages int64         `toml:"maxNumberOfMessages" default:"10" comment:"Max number of messages to retrieve from SQS queue"`
	VisibilityTimeout   time.Duration `toml:"visibilityTimeout" default:"2m30s" comment:"Visibility timeout of messages retrieved from SQS queue"`
	HeartbeatInterval   time.Duration `toml:"heartbeatInterval" default:"1m" comment:"Interval at which visibility timeouts are renewed"`
	WaitTime            time.Duration `toml:"waitTime" default:"20s" comment:"Wait time for long polling"`
	Forever             bool          `toml:"forever" default:"true" comment:"Continue polling when the queue is empty"`
}

Configuration to consume an AWS SQS queue.

type MessageHandler

type MessageHandler func(ctx context.Context, message string) error

MessageHandler is responsible to consume a single message.

type MessageLock

type MessageLock struct {
	// contains filtered or unexported fields
}

MessageLock holds a message which is kept invisible to other consumers until Release is called.

The invibility is garanteed be a background routine which peridically resets the visibility timeout of the locked message.

func NewMessageLock

func NewMessageLock(ctx context.Context, svc sqsiface.SQSAPI, queue string, visibilityTimeout, heartbeatInterval time.Duration, message *sqs.Message) *MessageLock

NewMessageLock creates a lock for the given SQS message.

func (*MessageLock) Message

func (l *MessageLock) Message() *sqs.Message

Message returns the locked SQS message.

func (*MessageLock) Release

func (l *MessageLock) Release(retryDelay *time.Duration) error

Release the lock.

If retryDelay is nil, then the locked message is deleted. Otherwise, the message visibility timeout is set to the given duration.

Release must be called only once.

type QueueConsumer

type QueueConsumer struct {
	// contains filtered or unexported fields
}

QueueConsumer allows to consume an AWS SQS queue.

func NewQueueConsumer

func NewQueueConsumer(conf *Configuration, awsSession client.ConfigProvider) *QueueConsumer

NewQueueConsumer creates a QueueConsumer from the given configuration.

func NewQueueConsumerWithClient

func NewQueueConsumerWithClient(conf *Configuration, sqsClient sqsiface.SQSAPI) *QueueConsumer

NewQueueConsumerWithClient creates a QueueConsumer from the given configuration and using a preconfigured SQS client.

func (*QueueConsumer) ConsumeMessages

func (m *QueueConsumer) ConsumeMessages(ctx context.Context, handler MessageHandler) (int, error)

ConsumeMessages using the given handler.

Each message is kept invisible to other consumers until its handler returns. Returns the count of consumed messages along with the encountered error, if any.

type RetriableError

type RetriableError struct {
	Delay time.Duration
}

RetriableError allows to retry the handling of a SQS message after a delay.

func (RetriableError) Error

func (e RetriableError) Error() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL