Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewRetriableError ¶
NewRetriableError creates a RetriableError with the given delay.
Types ¶
type Configuration ¶
type Configuration struct { QueueURL string `toml:"queueURL" default:"" comment:"URL of target SQS queue"` MaxNumberOfMessages int64 `toml:"maxNumberOfMessages" default:"10" comment:"Max number of messages to retrieve from SQS queue"` VisibilityTimeout time.Duration `toml:"visibilityTimeout" default:"2m30s" comment:"Visibility timeout of messages retrieved from SQS queue"` HeartbeatInterval time.Duration `toml:"heartbeatInterval" default:"1m" comment:"Interval at which visibility timeouts are renewed"` WaitTime time.Duration `toml:"waitTime" default:"20s" comment:"Wait time for long polling"` Forever bool `toml:"forever" default:"true" comment:"Continue polling when the queue is empty"` }
Configuration to consume an AWS SQS queue.
type MessageHandler ¶
MessageHandler is responsible to consume a single message.
type MessageLock ¶
type MessageLock struct {
// contains filtered or unexported fields
}
MessageLock holds a message which is kept invisible to other consumers until Release is called.
The invibility is garanteed be a background routine which peridically resets the visibility timeout of the locked message.
func NewMessageLock ¶
func NewMessageLock(ctx context.Context, svc sqsiface.SQSAPI, queue string, visibilityTimeout, heartbeatInterval time.Duration, message *sqs.Message) *MessageLock
NewMessageLock creates a lock for the given SQS message.
func (*MessageLock) Message ¶
func (l *MessageLock) Message() *sqs.Message
Message returns the locked SQS message.
type QueueConsumer ¶
type QueueConsumer struct {
// contains filtered or unexported fields
}
QueueConsumer allows to consume an AWS SQS queue.
func NewQueueConsumer ¶
func NewQueueConsumer(conf *Configuration, awsSession client.ConfigProvider) *QueueConsumer
NewQueueConsumer creates a QueueConsumer from the given configuration.
func NewQueueConsumerWithClient ¶
func NewQueueConsumerWithClient(conf *Configuration, sqsClient sqsiface.SQSAPI) *QueueConsumer
NewQueueConsumerWithClient creates a QueueConsumer from the given configuration and using a preconfigured SQS client.
func (*QueueConsumer) ConsumeMessages ¶
func (m *QueueConsumer) ConsumeMessages(ctx context.Context, handler MessageHandler) (int, error)
ConsumeMessages using the given handler.
Each message is kept invisible to other consumers until its handler returns. Returns the count of consumed messages along with the encountered error, if any.
type RetriableError ¶
RetriableError allows to retry the handling of a SQS message after a delay.
func (RetriableError) Error ¶
func (e RetriableError) Error() string