Documentation ¶
Index ¶
- Constants
- func RandomInt(min, max int) int
- func RandomPostfix(name string) string
- func RandomString(n int) string
- func ToBatches[T any](items []T, batchSize int) [][]T
- func ToReceiptHandles(msgs []events.SQSMessage) []string
- type DynamoDBQueue
- func (dp *DynamoDBQueue) AsQueueType(queueType QueueType) *DynamoDBQueue
- func (dp *DynamoDBQueue) ClientID() string
- func (dq *DynamoDBQueue) Count(ctx context.Context) (int32, error)
- func (dq *DynamoDBQueue) CreateQueueTable(ctx context.Context) (bool, error)
- func (dq *DynamoDBQueue) DeleteMessages(ctx context.Context, receiptHandles ...string) ([]string, error)
- func (dq *DynamoDBQueue) DropQueueTable(ctx context.Context) error
- func (dq *DynamoDBQueue) List(ctx context.Context) ([]QueueAndClientId, error)
- func (dq *DynamoDBQueue) Logging() bool
- func (dq *DynamoDBQueue) PartitionKey() string
- func (dq *DynamoDBQueue) PollMessages(ctx context.Context, timeout, visibilityTimeout time.Duration, ...) ([]events.SQSMessage, error)
- func (dq *DynamoDBQueue) Purge(ctx context.Context) error
- func (dq *DynamoDBQueue) PurgeAll(ctx context.Context) error
- func (dq *DynamoDBQueue) PushMessages(ctx context.Context, ttl time.Duration, messages ...events.SQSMessage) ([]events.SQSMessage, error)
- func (dp *DynamoDBQueue) QueueName() string
- func (dq *DynamoDBQueue) RandomDigits() int
- func (dq *DynamoDBQueue) SetLogging(enabled bool)
- func (dp *DynamoDBQueue) SetRandomDigits(digits int) *DynamoDBQueue
- func (dp *DynamoDBQueue) Table() string
- func (dq *DynamoDBQueue) TableExists(ctx context.Context) bool
- func (dq *DynamoDBQueue) UseClientID(clientID string) *DynamoDBQueue
- func (dq *DynamoDBQueue) UseQueueName(queueName string) *DynamoDBQueue
- func (dq *DynamoDBQueue) UseTable(table string) *DynamoDBQueue
- type QueueAndClientId
- type QueueType
Constants ¶
const (
	// ColumnPK is the name of the partition key
	ColumnPK = "PK"
	// ColumnSK is the name of the sort key
	ColumnSK = "SK"
	// ColumnHiddenUntil is the name of the hidden_until attribute where the visibility timeout
	// while polling messages is stored and compared against.
	ColumnHiddenUntil = "hidden_until"
	// ColumnOwner is the name of the owner attribute where the clientID is stored.
	ColumnOwner = "owner"
	// ColumnTTL is the name of the TTL attribute where the time to live is stored.
	// This is used when the message is pushed to the queue and DynamoDB is configured to
	// automatically delete the message when the TTL is reached.
	ColumnTTL = "TTL"
	// ColumnEvent is the name of the event attribute where the `events.SQSMessage` is stored.
	ColumnEvent = "event"
)
const (
	TableNameNotSet = "table name not set"
	QueueNameNotSet = "queue name not set"
	ClientIDNotSet = "client ID not set"
)
const (
AttrSentTimestamp = "SentTimestamp"
)
const PartitionKeySeparator = "|"
const RecipientHandleSeparator = "&"
Variables ¶
This section is empty.
Functions ¶
func RandomPostfix ¶
RandomPostfix appends a random integer to the given name.
func RandomString ¶
func ToReceiptHandles ¶
func ToReceiptHandles(msgs []events.SQSMessage) []string
ToReceiptHandles will extract the receipt handles from the SQS messages.
Types ¶
type DynamoDBQueue ¶
type DynamoDBQueue struct {
// contains filtered or unexported fields
}
DynamoDBQueue is an implementation of a queue that uses a DynamoDB table as its backing store.
It implements a FIFO queue and allows multiple "queues" to share the same table by combining the queue name and client ID into the partition key.
Each client ID is therefore also treated as a separate queue.
Since multiple `DynamoDBQueue` instances or processes can read, write, and delete from the queue simultaneously, it uses a `visibilityTimeout` to ensure that only one consumer processes a message at a time.
The DynamoDB table has the following schema:
|===
|PK |SK |hidden_until |owner |TTL |event

|queueName-clientID |{unix-64-bit-timestamp + "_" + random_string} |{now()+visibilityTimeout} |owner |ttl |{events.SQSMessage}
|===
The queue name and client ID in the PK are joined with `PartitionKeySeparator`, which is declared as "|" in the constants above.
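As a rough illustration of how such a partition key could be composed and taken apart again, here is a minimal sketch. The helpers `buildPartitionKey` and `splitPartitionKey` are hypothetical names and not part of this package; the separator value is copied from the `PartitionKeySeparator` constant declared above.

```go
package main

import (
	"fmt"
	"strings"
)

// partitionKeySeparator mirrors the PartitionKeySeparator constant ("|").
const partitionKeySeparator = "|"

// buildPartitionKey is a hypothetical helper that joins the queue name
// and the client ID into a single partition key value.
func buildPartitionKey(queueName, clientID string) string {
	return queueName + partitionKeySeparator + clientID
}

// splitPartitionKey is the hypothetical inverse. It splits on the first
// separator; ok is false when the key contains no separator at all.
func splitPartitionKey(pk string) (queueName, clientID string, ok bool) {
	return strings.Cut(pk, partitionKeySeparator)
}

func main() {
	pk := buildPartitionKey("orders", "worker-1")
	fmt.Println(pk)

	queueName, clientID, ok := splitPartitionKey(pk)
	fmt.Println(queueName, clientID, ok)
}
```

Because the split happens on the first separator, a queue name that itself contains the separator would not round-trip in this sketch.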
A poll queries for all messages within the PK, oldest first, for which now() is greater than hidden_until.
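The selection rule can be modelled in memory to make the semantics concrete. This is only a sketch under assumptions: the `row` type and `visibleRows` function are hypothetical, hidden_until is treated as Unix milliseconds, and ordering by SK as a string stands in for "oldest first" (which holds as long as the timestamp prefixes have equal length).

```go
package main

import (
	"fmt"
	"sort"
)

// row is a hypothetical in-memory stand-in for one item in the table.
type row struct {
	PK          string
	SK          string
	HiddenUntil int64 // Unix time in milliseconds (assumption)
}

// visibleRows keeps the rows that belong to pk and whose hidden_until
// has passed, then orders them by sort key so the oldest comes first.
func visibleRows(rows []row, pk string, nowMillis int64) []row {
	var out []row
	for _, r := range rows {
		if r.PK == pk && nowMillis > r.HiddenUntil {
			out = append(out, r)
		}
	}
	sort.Slice(out, func(i, j int) bool { return out[i].SK < out[j].SK })
	return out
}

func main() {
	rows := []row{
		{PK: "orders|worker-1", SK: "200_b", HiddenUntil: 0},
		{PK: "orders|worker-1", SK: "100_a", HiddenUntil: 0},
		{PK: "orders|worker-1", SK: "150_c", HiddenUntil: 9000}, // still hidden at 5000
		{PK: "orders|worker-2", SK: "050_d", HiddenUntil: 0},    // belongs to another queue
	}
	for _, r := range visibleRows(rows, "orders|worker-1", 5000) {
		fmt.Println(r.SK)
	}
}
```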
NOTE: The DynamoDB table must have its TTL option configured on the TTL column for expired messages to be deleted automatically.
func New ¶ added in v0.4.0
func New(cfg aws.Config, ttl time.Duration) *DynamoDBQueue
New creates a new `DynamoDBQueue` instance.
If the ttl is set to zero, it will use the default of 14 days.
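The zero-means-default rule can be sketched as follows. The names `defaultTTL`, `effectiveTTL`, and `expiresAtUnix` are hypothetical and only illustrate the idea; the sketch also assumes the expiry is stored as Unix epoch seconds, which is the format DynamoDB's TTL feature expects.

```go
package main

import (
	"fmt"
	"time"
)

// defaultTTL is the 14-day default described above (hypothetical name).
const defaultTTL = 14 * 24 * time.Hour

// effectiveTTL returns the default when ttl is zero, otherwise ttl itself.
func effectiveTTL(ttl time.Duration) time.Duration {
	if ttl == 0 {
		return defaultTTL
	}
	return ttl
}

// expiresAtUnix computes the expiry in Unix epoch seconds, starting from
// nowUnix (also in seconds) and applying the effective TTL.
func expiresAtUnix(nowUnix int64, ttl time.Duration) int64 {
	return nowUnix + int64(effectiveTTL(ttl)/time.Second)
}

func main() {
	now := time.Now().Unix()
	fmt.Println(expiresAtUnix(now, 0))           // default of 14 days
	fmt.Println(expiresAtUnix(now, time.Minute)) // explicit TTL
}
```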
func (*DynamoDBQueue) AsQueueType ¶
func (dp *DynamoDBQueue) AsQueueType(queueType QueueType) *DynamoDBQueue
AsQueueType will set the queue type to use. This affects the sorting of messages. FIFO is the default.
func (*DynamoDBQueue) ClientID ¶
func (dp *DynamoDBQueue) ClientID() string
func (*DynamoDBQueue) Count ¶
func (dq *DynamoDBQueue) Count(ctx context.Context) (int32, error)
Count will return the number of messages in the queue. It will return an error if it fails.
This will not count the messages of every queue in the table, only those for the current _queueName_ and _clientID_.
func (*DynamoDBQueue) CreateQueueTable ¶
func (dq *DynamoDBQueue) CreateQueueTable(ctx context.Context) (bool, error)
CreateQueueTable will create the DynamoDB table for the queue in on-demand mode.
It will have the _TTL_ enabled and therefore messages are automatically deleted.
It will return `true` if created, `false` if it already exists and an error if it fails.
NOTE: It may return an error *and* `true`, meaning the table was created but some other issue occurred, and the table needs to be managed.
func (*DynamoDBQueue) DeleteMessages ¶
func (dq *DynamoDBQueue) DeleteMessages( ctx context.Context, receiptHandles ...string, ) ([]string, error)
DeleteMessages will delete one or more messages from the queue.
Use the `events.SQSMessage.ReceiptHandle` to delete the message.
It will return those not deleted and an error if any.
func (*DynamoDBQueue) DropQueueTable ¶
func (dq *DynamoDBQueue) DropQueueTable(ctx context.Context) error
DropQueueTable will drop the queue table.
func (*DynamoDBQueue) List ¶ added in v0.3.0
func (dq *DynamoDBQueue) List(ctx context.Context) ([]QueueAndClientId, error)
List will return all queues that exist in the system.
CAUTION: This is a really expensive operation since it performs a table scan to get all partition keys.
func (*DynamoDBQueue) Logging ¶ added in v0.9.0
func (dq *DynamoDBQueue) Logging() bool
Logging returns `true` if logging is enabled.
func (*DynamoDBQueue) PartitionKey ¶
func (dq *DynamoDBQueue) PartitionKey() string
PartitionKey is the _queueName_ and the _clientID_ joined by `PartitionKeySeparator`.
func (*DynamoDBQueue) PollMessages ¶
func (dq *DynamoDBQueue) PollMessages( ctx context.Context, timeout, visibilityTimeout time.Duration, minMessages, maxMessages int, ) ([]events.SQSMessage, error)
PollMessages will poll messages from the queue until either maxMessages have been collected or the timeout is reached. It queries for all messages within the PK, oldest first, for which now() is greater than hidden_until.
Once a message has been polled, it is invisible to other consumers for the duration of the visibilityTimeout. The visibilityTimeout is stored on the hidden_until attribute in milliseconds, so it cannot be shorter than one millisecond.
NOTE: The visibilityTimeout starts counting when a message is fetched, not when PollMessages returns. If a message is fetched early and the _timeout_ is long, the visibility timeout may be used up before the message is returned. Make sure to set the _visibilityTimeout_ much higher than the _timeout_ and calculate the worst case as `actual visibility timeout = visibilityTimeout - timeout`.
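The arithmetic behind this note can be checked with a small sketch. In the worst case a message is fetched at the very start of the poll and returned only when the _timeout_ expires, so the time the caller has left is the visibility timeout minus the poll timeout. The function `remainingVisibility` is a hypothetical helper, not part of this package.

```go
package main

import (
	"fmt"
	"time"
)

// remainingVisibility returns the worst-case time a caller has left to
// process a message after the poll returns: the visibility timeout minus
// the poll timeout, clamped at zero.
func remainingVisibility(timeout, visibilityTimeout time.Duration) time.Duration {
	remaining := visibilityTimeout - timeout
	if remaining < 0 {
		return 0
	}
	return remaining
}

func main() {
	// A 10s poll with a 60s visibility timeout leaves at least 50s.
	fmt.Println(remainingVisibility(10*time.Second, 60*time.Second))
	// A poll timeout longer than the visibility timeout can leave nothing.
	fmt.Println(remainingVisibility(60*time.Second, 10*time.Second))
}
```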
If the table is not set, it will return an error. If either of _queueName_ or _clientID_ is not set, it will return an error.
The receipt handle is a combination of the PK and SK. It also includes the hidden_until and owner to ensure that the message is not accidentally deleted by another consumer.
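To show how such a composite handle might be encoded and decoded, here is a sketch. The field order, the numeric encoding of hidden_until, and the names `receiptHandle` and `parseReceiptHandle` are all assumptions made for illustration; only the "&" separator is taken from the `RecipientHandleSeparator` constant, and the real layout may differ.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// receiptHandleSeparator mirrors the RecipientHandleSeparator constant ("&").
const receiptHandleSeparator = "&"

// receiptHandle is a hypothetical decoded form of a receipt handle.
type receiptHandle struct {
	PK          string
	SK          string
	HiddenUntil int64 // Unix milliseconds (assumption)
	Owner       string
}

// String encodes the handle as PK&SK&hidden_until&owner (assumed order).
func (h receiptHandle) String() string {
	return strings.Join([]string{
		h.PK, h.SK, strconv.FormatInt(h.HiddenUntil, 10), h.Owner,
	}, receiptHandleSeparator)
}

// parseReceiptHandle decodes a handle produced by String.
func parseReceiptHandle(s string) (receiptHandle, error) {
	parts := strings.Split(s, receiptHandleSeparator)
	if len(parts) != 4 {
		return receiptHandle{}, fmt.Errorf("expected 4 parts, got %d", len(parts))
	}
	hiddenUntil, err := strconv.ParseInt(parts[2], 10, 64)
	if err != nil {
		return receiptHandle{}, fmt.Errorf("invalid hidden_until: %w", err)
	}
	return receiptHandle{PK: parts[0], SK: parts[1], HiddenUntil: hiddenUntil, Owner: parts[3]}, nil
}

func main() {
	h := receiptHandle{PK: "orders|worker-1", SK: "1700000000000000000_000042", HiddenUntil: 1700000030000, Owner: "worker-1"}
	encoded := h.String()
	fmt.Println(encoded)

	decoded, err := parseReceiptHandle(encoded)
	fmt.Println(decoded == h, err)
}
```

Carrying hidden_until and owner in the handle lets a delete be conditioned on both values still matching, so a consumer whose visibility window has lapsed cannot remove a message that another consumer has since picked up.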
It will poll until _timeout_ for _minMessages_, and will return at most _maxMessages_.
If _timeout_ is set to zero, it will do one fetch and return even if _minMessages_ is not reached.
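The interplay of _timeout_, _minMessages_, and _maxMessages_ can be sketched as a generic polling loop. This is not the package's implementation: `fetchFunc`, `pollUntil`, the stub fetcher, and the 10ms pause between fetches are all hypothetical, and messages are plain strings to keep the example free of AWS dependencies.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// fetchFunc is a hypothetical single fetch returning up to limit messages.
type fetchFunc func(ctx context.Context, limit int) ([]string, error)

// pollUntil fetches repeatedly until minMessages are collected, maxMessages
// is reached, or the timeout expires. It always performs at least one fetch,
// so a zero timeout means "fetch once and return".
func pollUntil(ctx context.Context, fetch fetchFunc, timeout time.Duration, minMessages, maxMessages int) ([]string, error) {
	deadline := time.Now().Add(timeout)
	var msgs []string
	for {
		batch, err := fetch(ctx, maxMessages-len(msgs))
		if err != nil {
			return msgs, err
		}
		msgs = append(msgs, batch...)
		if len(msgs) >= minMessages || len(msgs) >= maxMessages || !time.Now().Before(deadline) {
			break
		}
		select {
		case <-ctx.Done():
			return msgs, ctx.Err()
		case <-time.After(10 * time.Millisecond): // pause before the next fetch
		}
	}
	if len(msgs) > maxMessages {
		msgs = msgs[:maxMessages]
	}
	return msgs, nil
}

// oneAtATime returns a stub fetcher that yields one message per call.
func oneAtATime(calls *int) fetchFunc {
	return func(ctx context.Context, limit int) ([]string, error) {
		*calls++
		return []string{fmt.Sprintf("msg-%d", *calls)}, nil
	}
}

// demoPoll runs pollUntil against the stub and reports messages and fetch count.
func demoPoll(timeout time.Duration, minMessages, maxMessages int) ([]string, int) {
	calls := 0
	msgs, _ := pollUntil(context.Background(), oneAtATime(&calls), timeout, minMessages, maxMessages)
	return msgs, calls
}

func main() {
	msgs, calls := demoPoll(time.Second, 3, 5)
	fmt.Println(len(msgs), calls) // keeps fetching until minMessages is reached
	msgs, calls = demoPoll(0, 3, 5)
	fmt.Println(len(msgs), calls) // zero timeout: a single fetch
}
```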
func (*DynamoDBQueue) Purge ¶
func (dq *DynamoDBQueue) Purge(ctx context.Context) error
Purge deletes all messages from the queue based on the queueName and clientID.
If it fails it will return an error.
func (*DynamoDBQueue) PurgeAll ¶
func (dq *DynamoDBQueue) PurgeAll(ctx context.Context) error
PurgeAll deletes all items from the DynamoDB table.
If it fails, it will return an error.
func (*DynamoDBQueue) PushMessages ¶
func (dq *DynamoDBQueue) PushMessages( ctx context.Context, ttl time.Duration, messages ...events.SQSMessage, ) ([]events.SQSMessage, error)
PushMessages pushes one or more messages onto the queue. It uses the unix nanosecond timestamp as SK. The partition key is the queueName{`PartitionKeySeparator`}clientID. If ttl is set to zero, it will use the default DynamoDBQueue.ttl.
This function strives to create unique SKs by using the unix nanosecond timestamp plus a random number with n digits. The n is by default 6 but can be altered to be higher by `SetRandomDigits()` function.
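A sort key of this shape could be produced roughly as shown below. The function `newSortKey` is hypothetical; the "_" separator follows the schema table in the type documentation, while zero-padding the random part to exactly n digits is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// newSortKey builds "<unix-nanoseconds>_<random digits>". The random source
// is injected as intn (same shape as rand.Intn) so the result can be made
// deterministic in tests. Zero-padding to `digits` width is an assumption.
func newSortKey(unixNano int64, digits int, intn func(n int) int) string {
	limit := 1
	for i := 0; i < digits; i++ {
		limit *= 10
	}
	return fmt.Sprintf("%d_%0*d", unixNano, digits, intn(limit))
}

func main() {
	// Six random digits is the documented default.
	fmt.Println(newSortKey(time.Now().UnixNano(), 6, rand.Intn))
}
```

Two messages pushed in the same nanosecond collide only if they also draw the same random number, which is why raising the digit count lowers the collision risk for very high write rates.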
Maximum of 25 messages or 16MB is allowed.
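Callers with more than 25 messages need to split them before pushing. The package index lists a `ToBatches` function with a matching signature; the sketch below is an independent, hypothetical `chunk` helper that shows one way such splitting can be done, and is not claimed to match the package's implementation.

```go
package main

import "fmt"

// chunk splits items into consecutive batches of at most size elements.
// It returns nil when size is not positive. The batches share the backing
// array of items rather than copying it.
func chunk[T any](items []T, size int) [][]T {
	if size <= 0 {
		return nil
	}
	var batches [][]T
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		batches = append(batches, items[start:end])
	}
	return batches
}

func main() {
	ids := make([]int, 60)
	for i := range ids {
		ids[i] = i
	}
	for _, batch := range chunk(ids, 25) {
		fmt.Println(len(batch))
	}
}
```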
The function returns any messages that were not sent, accompanied by an error.
func (*DynamoDBQueue) QueueName ¶
func (dp *DynamoDBQueue) QueueName() string
func (*DynamoDBQueue) RandomDigits ¶
func (dq *DynamoDBQueue) RandomDigits() int
func (*DynamoDBQueue) SetLogging ¶ added in v0.9.0
func (dq *DynamoDBQueue) SetLogging(enabled bool)
SetLogging will enable/disable logging.
func (*DynamoDBQueue) SetRandomDigits ¶
func (dp *DynamoDBQueue) SetRandomDigits(digits int) *DynamoDBQueue
SetRandomDigits will set a new number of random digits to append to the SK to ensure uniqueness. This is set to 6 by default.
func (*DynamoDBQueue) Table ¶
func (dp *DynamoDBQueue) Table() string
func (*DynamoDBQueue) TableExists ¶ added in v0.2.0
func (dq *DynamoDBQueue) TableExists(ctx context.Context) bool
TableExists checks whether the table exists. It returns `true` if it does, otherwise `false`.
func (*DynamoDBQueue) UseClientID ¶
func (dq *DynamoDBQueue) UseClientID(clientID string) *DynamoDBQueue
UseClientID will change the clientID for the queue.
CAUTION: The clientID *must* be set to a valid string otherwise it will panic!
func (*DynamoDBQueue) UseQueueName ¶
func (dq *DynamoDBQueue) UseQueueName(queueName string) *DynamoDBQueue
UseQueueName will change the queueName for the queue.
CAUTION: The queueName *must* be set to a valid string otherwise it will panic!
func (*DynamoDBQueue) UseTable ¶
func (dq *DynamoDBQueue) UseTable(table string) *DynamoDBQueue
type QueueAndClientId ¶ added in v0.3.0
QueueAndClientId is the element type returned by the `List` operation.