dynamodbqueue

package
v0.9.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 5, 2023 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// ColumnPK is the name of the partition key
	ColumnPK = "PK"
	// ColumnSK is the name of the sort key
	ColumnSK = "SK"
	// ColumnHiddenUntil is the name of the hidden_until attribute where the visibility timeout
	// while polling messages is stored and compared against.
	ColumnHiddenUntil = "hidden_until"
	// ColumnOwner is the name of the owner attribute where the clientID is stored.
	ColumnOwner = "owner"
	// ColumnTTL is the name of the TTL attribute where the time to live is stored.
	// This is used when the message is pushed to the queue and DynamoDB is configured to
	// automatically delete the message when the TTL is reached.
	ColumnTTL = "TTL"
	// ColumnEvent is the name of the event attribute where the `events.SQSMessage` is stored.
	ColumnEvent = "event"
)
View Source
const (
	TableNameNotSet = "table name not set"
	QueueNameNotSet = "queue name not set"
	ClientIDNotSet  = "client ID not set"
)
View Source
const (
	AttrSentTimestamp = "SentTimestamp"
)
View Source
const PartitionKeySeparator = "|"
View Source
const RecipientHandleSeparator = "&"

Variables

This section is empty.

Functions

func RandomInt

func RandomInt(min, max int) int

func RandomPostfix

func RandomPostfix(name string) string

RandomPostfix postfixes the in param name with an random integer

func RandomString

func RandomString(n int) string

func ToBatches

func ToBatches[T any](items []T, batchSize int) [][]T

func ToReceiptHandles

func ToReceiptHandles(msgs []events.SQSMessage) []string

ToReceiptHandles will extract the receipt handles from the SQS messages.

Types

type DynamoDBQueue

type DynamoDBQueue struct {
	// contains filtered or unexported fields
}

DynamoDBQueue is a implementation of a queue but have DynamoDB table as the backing store.

This implements a FIFO queue and allows for multiple "queues" to be present in the same table by concatenating the queue-name and consumer-id to the partition key.

Each consumer Id is therefore also considered as a separate queue.

Since multiple `DynamoDbQueue`/processes can simultaneously read/write/delete from the queue it will use a `visibilityTimeout` to ensure that only one consumer can process a message at a time.

The DynamoDB Table have the following schema:

|=== |PK |SK |hidden_until |owner |TTL |event |queueName-clientID |{unix-64-bit-timestamp + "_" + random_string} |{now()+visibilityTimeout} |owner |ttl |{events.SQSMessage} |===

The PK is separated using the `PartitionKeySeparator` of which is "+-+" by default.

When the query is commenced, it will query for all messages within PK (oldest first) and that now() is greater than hidden_until.

NOTE: The DynamoDB table must be configured with the TTL option on the TTL column in order for it to be deleted automatically.

func New added in v0.4.0

func New(cfg aws.Config, ttl time.Duration) *DynamoDBQueue

New creates a new `DynamoDBQueue` instance.

If the ttl is set to zero, it will use the default of 14 days.

func (*DynamoDBQueue) AsQueueType

func (dp *DynamoDBQueue) AsQueueType(queueType QueueType) *DynamoDBQueue

AsQueueType will set the queue type to use. This affects the sorting of messages. FIFO is the default.

func (*DynamoDBQueue) ClientID

func (dp *DynamoDBQueue) ClientID() string

func (*DynamoDBQueue) Count

func (dq *DynamoDBQueue) Count(ctx context.Context) (int32, error)

Count will return the number of messages in the queue. It will return an error if it fails.

This will not count all messages in all queues in the table, just the one for current _queueName_ and _clientID_.

func (*DynamoDBQueue) CreateQueueTable

func (dq *DynamoDBQueue) CreateQueueTable(ctx context.Context) (bool, error)

CreateQueueTable will create the DynamoDB table for the queue in on-demand mode.

It will have the _TTL_ enabled and therefore messages are automatically deleted.

It will return `true` if created, `false` if it already exists and an error if it fails.

NOTE: It may return an error *and* `true`, hence the table was created but there where some other issues, and the table needs to be managed.

func (*DynamoDBQueue) DeleteMessages

func (dq *DynamoDBQueue) DeleteMessages(
	ctx context.Context,
	receiptHandles ...string,
) ([]string, error)

DeleteMessages will delete one or more messages from the queue.

Use the `events.SQSMessage.ReceiptHandle` to delete the message.

It will return those not deleted and an error if any.

func (*DynamoDBQueue) DropQueueTable

func (dq *DynamoDBQueue) DropQueueTable(ctx context.Context) error

DropQueueTable will drop the queue table.

func (*DynamoDBQueue) List added in v0.3.0

func (dq *DynamoDBQueue) List(ctx context.Context) ([]QueueAndClientId, error)

List will return all queues that exists in the system.

CAUTION: This is a really expensive operation since it will perform a table scan. to get all partition keys.

func (*DynamoDBQueue) Logging added in v0.9.0

func (dq *DynamoDBQueue) Logging() bool

Logging returns `true` if logging is enabled.

func (*DynamoDBQueue) PartitionKey

func (dq *DynamoDBQueue) PartitionKey() string

PartitionKey is the _queueName_ + '_' + _clientID_.

func (*DynamoDBQueue) PollMessages

func (dq *DynamoDBQueue) PollMessages(
	ctx context.Context,
	timeout, visibilityTimeout time.Duration,
	minMessages, maxMessages int,
) ([]events.SQSMessage, error)

PollMessages will poll messages from the queue up to either maxMessages or until the timeout is reached. It will query for all messages within PK (oldest first) and that now() is greater than hidden_until.

If the message has been polled, it will be invisible to other consumers for the duration of the visibilityTimeout. The visibilityTimeout is set in milliseconds on the hidden_until attribute and hence cannot be any lower than that.

NOTE: The visibilityTimeout is when a message been fetched. If the message was fetched early and the _timeout_ is great, it may have consumed all the visibility timeout before the message is returned. Make sure to set the _visibilityTimeout_ much higher than the _timeout_ and calculate the `actual visibility timeout = timeout - visibilityTimeout`.

If the table is not set, it will return an error. If either of _queueName_ or _clientID_ is not set, it will return an error.

The recipient handle is a combination of the PK and SK. It also adds the hidden_until and owner to the receipt handle to ensure that the message is not accidentally deleted by another consumer.

It will poll until _timeout_ for _minMessages_, and will return at most _maxMessages_.

If _timeout_ is set to zero, it will do one fetch and return even if _minMessages_ is not reached.

func (*DynamoDBQueue) Purge

func (dq *DynamoDBQueue) Purge(ctx context.Context) error

Purge deletes all messages from the queue based on the queueName and clientID.

If it fails it will return an error.

func (*DynamoDBQueue) PurgeAll

func (dq *DynamoDBQueue) PurgeAll(ctx context.Context) error

PurgeAll deletes all items from the DynamoDB table.

If it fails, it will return an error.

func (*DynamoDBQueue) PushMessages

func (dq *DynamoDBQueue) PushMessages(
	ctx context.Context,
	ttl time.Duration,
	messages ...events.SQSMessage,
) ([]events.SQSMessage, error)

PushMessages pushes one or more messages onto the database. It uses the unix nanosecond timestamp as SK. The primary key is the queueName{`PartitionKeySeparator`}clientID. If ttl is set to zero, it will use the default DynamoDBQueue.ttl.

This function strives to create unique SKs by using the unix nanosecond timestamp plus a random number with n digits. The n is by default 6 but can be altered to be higher by `SetRandomDigits()` function.

Maximum of 25 messages or 16MB is allowed.

The function returns any message that was not sent it is accompanied by an error.

func (*DynamoDBQueue) QueueName

func (dp *DynamoDBQueue) QueueName() string

func (*DynamoDBQueue) RandomDigits

func (dq *DynamoDBQueue) RandomDigits() int

func (*DynamoDBQueue) SetLogging added in v0.9.0

func (dq *DynamoDBQueue) SetLogging(enabled bool)

SetLogging will enable/disable logging.

func (*DynamoDBQueue) SetRandomDigits

func (dp *DynamoDBQueue) SetRandomDigits(digits int) *DynamoDBQueue

SetRandomDigits will set a new number of random digits to append to the SK to ensure uniqueness. This is set to 6 by default.

func (*DynamoDBQueue) Table

func (dp *DynamoDBQueue) Table() string

func (*DynamoDBQueue) TableExists added in v0.2.0

func (dq *DynamoDBQueue) TableExists(ctx context.Context) bool

TableExists will check if the table exist, if not, it will return `false`, otherwise it will return `true`.

func (*DynamoDBQueue) UseClientID

func (dq *DynamoDBQueue) UseClientID(clientID string) *DynamoDBQueue

UseClientID will change the clientID for the queue.

CAUTION: The clientID *must* be set to a valid string otherwise it will panic!

func (*DynamoDBQueue) UseQueueName

func (dq *DynamoDBQueue) UseQueueName(queueName string) *DynamoDBQueue

UseQueueName will change the queueName for the queue.

CAUTION: The queueName *must* be set to a valid string otherwise it will panic!

func (*DynamoDBQueue) UseTable

func (dq *DynamoDBQueue) UseTable(table string) *DynamoDBQueue

type QueueAndClientId added in v0.3.0

type QueueAndClientId struct {
	QueueName string `json:"qn"`
	ClientID  string `json:"cid"`
}

QueueAndClientId is used when `List` operation.

type QueueType

type QueueType int

QueueType specifies how polling works, this may be dynamically changed for each poll. Default is _FIFO_.

const (
	// QueueTypeFIFO is a FIFO queue. This affect sorting of messages.
	QueueTypeFIFO QueueType = 0
	// QueueTypeLIFO is a LIFO queue. This affect sorting of messages.
	QueueTypeLIFO QueueType = 1
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL