dynamomq

Version: v0.11.0
Published: Dec 15, 2023 | License: MIT | Imports: 16 | Imported by: 0

README

Implementing message queueing with Amazon DynamoDB in Go.

Current Status

This project is actively under development, but it is currently in version 0. Please be aware that the public API and exported methods may undergo changes.

Motivation

DynamoDB is a key-value and document database that delivers single-digit millisecond performance at any scale. It’s a serverless and fully managed service that you can use for mobile, web, gaming, ad tech, IoT, and other applications that need low-latency data access at a large scale.

There are many queuing implementations that offer persistence, single-message processing, and distributed computing. Some popular queuing solutions are Amazon SQS, Amazon MQ, Apache ActiveMQ, RabbitMQ, and Kafka. Those services handle various queuing features and functions with several different characteristics, such as methods of implementation, scaling, and performance.

However, most of those queuing systems cannot easily change the order of the items after they arrive in the queue. Discussed implementation with DynamoDB can change the order in the queue or cancel items before processing.

Quoted from AWS official blog: Implementing Priority Queueing with Amazon DynamoDB

Features

  • Redelivery: Redeliver messages that were not processed successfully, up to a specified number of times.
  • Concurrent Execution: Process concurrently using multiple goroutines.
  • Dead Letter Queue: Move messages that exceed the maximum number of redeliveries to the dead letter queue.
  • Graceful Shutdown: Complete processing of messages before shutting down the consumer process.
  • FIFO (First In, First Out): Retrieve messages from the message queue on a first-in, first-out basis.
  • Consumer Process Scaling: Scale out by running multiple consumer processes without duplicating message retrieval from the same message queue.
  • Visibility Timeout: DynamoMQ sets a visibility timeout, a period of time during which DynamoMQ prevents all consumers from receiving and processing the message.
  • Delay queues: Delay queues allow you to delay the delivery of new messages to consumers for a set number of seconds.
  • Deduplication: Deduplicate messages within the message queue.
  • Randomized Exponential Backoff: Prevent overlapping redelivery timing.
  • Batch Message Processing: Send and delete multiple messages in bulk to/from the message queue.
  • Message Compression
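
As an illustration of the Randomized Exponential Backoff feature listed above, here is a minimal sketch of one common way to compute such a delay ("full jitter"). It is not DynamoMQ's implementation: the function names, the base delay, and the cap are assumptions chosen for the example, and a positive base delay is assumed.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// backoffCeiling returns the upper bound for the delay before the given
// retry attempt (1-based): base * 2^(attempt-1), capped at max.
// Hypothetical helper; assumes base > 0.
func backoffCeiling(attempt int, base, max time.Duration) time.Duration {
	d := base
	for i := 1; i < attempt; i++ {
		d *= 2
		if d >= max {
			return max
		}
	}
	if d > max {
		return max
	}
	return d
}

// jitteredBackoff picks a random delay in [0, ceiling], so that messages
// that failed at the same moment are unlikely to be retried at the same moment.
func jitteredBackoff(rng *rand.Rand, attempt int, base, max time.Duration) time.Duration {
	ceiling := backoffCeiling(attempt, base, max)
	return time.Duration(rng.Int63n(int64(ceiling) + 1))
}

func main() {
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	for attempt := 1; attempt <= 5; attempt++ {
		fmt.Printf("attempt %d: wait %v (ceiling %v)\n", attempt,
			jitteredBackoff(rng, attempt, time.Second, 30*time.Second),
			backoffCeiling(attempt, time.Second, 30*time.Second))
	}
}
```

The random component is what keeps redelivery times from overlapping; the exponential ceiling only bounds how far apart retries can drift.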

Installation

Requires Go version 1.21 or greater.

DynamoMQ CLI

This package can be installed as a CLI with the go install command:

$ go install github.com/vvatanabe/dynamomq/cmd/dynamomq@latest

DynamoMQ Library

This package can be installed as a library with the go get command:

$ go get -u github.com/vvatanabe/dynamomq@latest

Setup DynamoMQ

Required IAM Policy

Please refer to dynamomq-iam-policy.json or dynamomq-iam-policy.tf

Create Table with AWS CLI

aws dynamodb create-table --cli-input-json file://dynamomq-table.json 

Please refer to dynamomq-table.json.

Create Table with Terraform

Please refer to dynamomq-table.tf.

Authentication and access credentials

DynamoMQ's CLI and library configure AWS Config with credentials obtained from external configuration sources. This setup allows for flexible and secure management of access credentials. The following are the default sources for configuration:

Environment Variables

  • AWS_REGION - Specifies the AWS region.
  • AWS_PROFILE - Identifies the AWS profile to be used.
  • AWS_ACCESS_KEY_ID - Your AWS access key.
  • AWS_SECRET_ACCESS_KEY - Your AWS secret key.
  • AWS_SESSION_TOKEN - Session token for temporary credentials.

Shared Configuration and Credentials Files

These files provide a common location for storing AWS credentials and configuration settings, enabling consistent credential management across different AWS tools and applications.

Usage for DynamoMQ CLI

The dynamomq command-line interface provides a range of commands to interact with your DynamoDB-based message queue. Below are the available commands and global flags that can be used with dynamomq.

Available Commands

  • completion: Generate the autocompletion script for the specified shell to ease command usage.
  • delete: Delete a message from the queue using its ID.
  • dlq: Retrieve the statistics for the Dead Letter Queue (DLQ), providing insights into failed message processing.
  • enqueue-test: Send test messages to the DynamoDB table with IDs A-101, A-202, A-303, and A-404; existing messages with these IDs will be overwritten.
  • fail: Simulate the failure of message processing, which will return the message to the queue for reprocessing.
  • get: Fetch a specific message from the DynamoDB table using the application domain ID.
  • help: Display help information about any command.
  • invalid: Move a message from the standard queue to the DLQ for manual review and correction.
  • ls: List all message IDs in the queue, limited to a maximum of 10 elements.
  • purge: Remove all messages from the DynamoMQ table, effectively clearing the queue.
  • qstat: Retrieve statistics for the queue, offering an overview of its current state.
  • receive: Receive a message from the queue; this operation will replace the current message ID with the retrieved one.
  • redrive: Move a message from the DLQ back to the standard queue for reprocessing.
  • reset: Reset the system information of a message, typically used in message recovery scenarios.

Global Flags

  • --endpoint-url: Override the default URL for commands with a specified endpoint URL.
  • -h, --help: Display help information for dynamomq.
  • --queueing-index-name: Specify the name of the queueing index to use (default is "dynamo-mq-index-queue_type-sent_at").
  • --table-name: Define the name of the DynamoDB table to contain the items (default is "dynamo-mq-table").

To get more detailed information about a specific command, use dynamomq [command] --help.

Example Usage

Here are a few examples of how to use the dynamomq commands:

# Generate autocompletion script for bash
dynamomq completion bash

# Delete a message with ID 'A-123'
dynamomq delete --id A-123

# Retrieve DLQ statistics
dynamomq dlq

# Enqueue test messages
dynamomq enqueue-test

# Get a message by ID
dynamomq get --id A-123

# List the first 10 message IDs in the queue
dynamomq ls

# Receive a message from the queue
dynamomq receive

# Reset system information of a message with ID
dynamomq reset --id A-123

Interactive Mode

The DynamoMQ CLI supports an Interactive Mode for an enhanced user experience. To enter the Interactive Mode, simply run the dynamomq command without specifying any subcommands.

Interactive Mode Commands

Once in Interactive Mode, you will have access to a suite of commands to manage and inspect your message queue:

  • qstat or qstats: Retrieves the queue statistics.
  • dlq: Retrieves the Dead Letter Queue (DLQ) statistics.
  • enqueue-test or et: Sends test messages to the DynamoDB table with IDs: A-101, A-202, A-303, and A-404; if a message with the same ID already exists, it will be overwritten.
  • purge: Removes all messages from the DynamoMQ table.
  • ls: Lists all message IDs, displaying a maximum of 10 elements.
  • receive: Receives a message from the queue and replaces the current ID with the received one.
  • id <id>: Switches the Interactive Mode to app mode, allowing you to perform various operations on a message identified by the provided app domain ID:
    • sys: Displays the system info data in a JSON format.
    • data: Prints the data as JSON for the current message record.
    • info: Prints all information regarding the Message record, including system_info and data in JSON format.
    • reset: Resets the system info of the message.
    • redrive: Drives a message from the DLQ back to the STANDARD queue.
    • delete: Deletes a message by its ID.
    • fail: Simulates the failed processing of a message by putting it back into the queue; the message will need to be received again.
    • invalid: Moves a message from the standard queue to the DLQ for manual fixing.

Usage for DynamoMQ Library

DynamoMQ Client

To begin using DynamoMQ, first import the necessary packages from the AWS SDK for Go v2 and the DynamoMQ library. These imports are required to interact with AWS services and to utilize the DynamoMQ functionalities.

import (
	"context"

	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/vvatanabe/dynamomq"
)

The following code block initializes the DynamoMQ client. It loads the AWS configuration and creates a new DynamoMQ client with that configuration. Replace 'ExampleData' with your own data structure as needed.

ctx := context.Background()
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
  panic("failed to load aws config")
}
client, err := dynamomq.NewFromConfig[ExampleData](cfg)
if err != nil {
  panic("AWS session could not be established!")
}

Define the data structure that will be used with DynamoMQ. Here, 'ExampleData' is a struct that represents the message data stored in DynamoDB.

type ExampleData struct {
	Data1 string `dynamodbav:"data_1"`
	Data2 string `dynamodbav:"data_2"`
	Data3 string `dynamodbav:"data_3"`
}

DynamoMQ Producer

The following snippet creates a DynamoMQ producer for the 'ExampleData' type. It then sends a message with predefined data to the queue.

producer := dynamomq.NewProducer[ExampleData](client)
_, err = producer.Produce(ctx, &dynamomq.ProduceInput[ExampleData]{
  Data: ExampleData{
    Data1: "foo",
    Data2: "bar",
    Data3: "baz",
  },
})
if err != nil {
  panic("failed to produce message")
}

DynamoMQ Consumer

To consume messages, instantiate a DynamoMQ consumer for 'ExampleData' and start it in a new goroutine. The consumer will process messages until an interrupt signal is received. The example includes graceful shutdown logic for the consumer.

consumer := dynamomq.NewConsumer[ExampleData](client, &Counter[ExampleData]{})
go func() {
  // Use a goroutine-local err so the outer err variable is not written concurrently.
  if err := consumer.StartConsuming(); err != nil {
    fmt.Println(err)
  }
}()

done := make(chan os.Signal, 1)
signal.Notify(done, os.Interrupt, syscall.SIGTERM)

<-done

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

if err := consumer.Shutdown(ctx); err != nil {
  fmt.Println("failed to shut down consumer:", err)
}

Here we define a 'Counter' type that implements the processing logic for consumed messages. Each time a message is processed, the counter is incremented, and the message details are printed.

type Counter[T any] struct {
	Value int
}

func (c *Counter[T]) Process(msg *dynamomq.Message[T]) error {
	c.Value++
	fmt.Printf("value: %d, message: %v\n", c.Value, msg)
	return nil
}
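
If Process can be invoked from several goroutines at once (for example when the consumer runs with more than one concurrent worker), the plain `c.Value++` above would be a data race. The following sketch shows one way to make such a counter safe. It is self-contained, so `Message` here is a hypothetical stand-in for `dynamomq.Message[T]`, and `AtomicCounter` and `processConcurrently` are names invented for this example.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Message is a hypothetical stand-in for dynamomq.Message[T]; only the
// fields needed for this sketch are included.
type Message[T any] struct {
	ID   string
	Data T
}

// AtomicCounter counts processed messages with an atomic integer, so
// concurrent calls to Process do not race on the counter.
type AtomicCounter[T any] struct {
	value atomic.Int64
}

// Process increments the counter and reports success.
func (c *AtomicCounter[T]) Process(msg *Message[T]) error {
	c.value.Add(1)
	return nil
}

// Value returns the number of messages processed so far.
func (c *AtomicCounter[T]) Value() int64 { return c.value.Load() }

// processConcurrently calls Process n times from separate goroutines
// and returns the final count.
func processConcurrently(n int) int64 {
	c := &AtomicCounter[string]{}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = c.Process(&Message[string]{ID: fmt.Sprintf("A-%d", i), Data: "payload"})
		}(i)
	}
	wg.Wait()
	return c.Value()
}

func main() {
	fmt.Println("processed:", processConcurrently(100))
}
```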

Software Design

State Machine

The state machine diagram below illustrates the key steps a message goes through as it traverses the system.

[Figure: State Machine diagram]

Basic Flow
  1. SendMessage(): A user sends a message that is placed in the READY state in the queue.

  2. ReceiveMessage(): The message moves from READY to PROCESSING status as it is picked up for processing.

  3. DeleteMessage(): If processing is successful, the message is deleted from the queue.

Error Handling
  1. ChangeMessageVisibility(): If processing fails, the message is made visible again in the READY state for retry, and its visibility timeout is updated.

  2. MoveMessageToDLQ(): If the message exceeds the retry limit, it is moved to the Dead Letter Queue (DLQ). The DLQ is used to isolate problematic messages for later analysis.

Dead Letter Queue (DLQ)
  1. RedriveMessage(): The system may choose to return a message to the standard queue if it determines that the issues have been resolved. This is achieved through the Redrive operation.

  2. ReceiveMessage(): Messages in the DLQ are also moved from READY to PROCESSING status, similar to regular queue messages.

  3. DeleteMessage(): Once a message in the DLQ is successfully processed, it is deleted from the queue.

This design ensures that DynamoMQ maintains message reliability while enabling tracking and analysis of messages in the event of errors. The use of a DLQ minimizes the impact of failures while maintaining system resiliency.
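
The transitions described above can be sketched as a small table-driven state machine. This is an illustrative model only, not the library's code: the `State`, `Apply`, and `Run` names are assumptions, SendMessage and DeleteMessage are left out because they create and remove the message rather than move it between states, and the assumption that a message arrives in the DLQ in the READY state is inferred from the description of the DLQ flow.

```go
package main

import (
	"errors"
	"fmt"
)

type (
	QueueType string
	Status    string
)

const (
	Standard   QueueType = "STANDARD"
	DLQ        QueueType = "DLQ"
	Ready      Status    = "READY"
	Processing Status    = "PROCESSING"
)

// State is the (queue, status) pair a message is in at a given moment.
type State struct {
	Queue  QueueType
	Status Status
}

// ErrInvalidTransition is returned when an operation does not apply to a state.
var ErrInvalidTransition = errors.New("invalid transition")

// Apply returns the state that results from applying op to s.
func Apply(s State, op string) (State, error) {
	switch {
	case op == "ReceiveMessage" && s.Status == Ready:
		return State{Queue: s.Queue, Status: Processing}, nil
	case op == "ChangeMessageVisibility" && s.Status == Processing:
		return State{Queue: s.Queue, Status: Ready}, nil
	case op == "MoveMessageToDLQ" && s.Queue == Standard:
		return State{Queue: DLQ, Status: Ready}, nil
	case op == "RedriveMessage" && s.Queue == DLQ:
		return State{Queue: Standard, Status: Ready}, nil
	}
	return s, fmt.Errorf("%w: %s from %s/%s", ErrInvalidTransition, op, s.Queue, s.Status)
}

// Run applies ops in order to a freshly sent message (STANDARD/READY).
func Run(ops ...string) (State, error) {
	s := State{Queue: Standard, Status: Ready}
	for _, op := range ops {
		next, err := Apply(s, op)
		if err != nil {
			return s, err
		}
		s = next
	}
	return s, nil
}

func main() {
	s, err := Run("ReceiveMessage", "ChangeMessageVisibility", "ReceiveMessage", "MoveMessageToDLQ", "RedriveMessage")
	fmt.Println(s, err)
	_, err = Run("RedriveMessage") // a message in the standard queue cannot be redriven
	fmt.Println(errors.Is(err, ErrInvalidTransition))
}
```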

Table Definition

The DynamoDB table for the DynamoMQ message queue system is designed to efficiently manage and track the status of messages. Here’s a breakdown of the table schema:

Key    Attribute           Type    Example Value
PK     id                  string  A-101
-      data                any     any
-      receive_count       number  1
GSIPK  queue_type          string  STANDARD or DLQ
-      version             number  1
-      created_at          string  2006-01-02T15:04:05.999999999Z07:00
-      updated_at          string  2006-01-02T15:04:05.999999999Z07:00
GSISK  sent_at             string  2006-01-02T15:04:05.999999999Z07:00
-      received_at         string  2006-01-02T15:04:05.999999999Z07:00
-      invisible_until_at  string  2006-01-02T15:04:05.999999999Z07:00

PK (Primary Key) ID: A unique identifier for each message, such as 'A-101'. This is a string value that facilitates the retrieval and management of messages.

GSIPK (Global Secondary Index - Partition Key) queue_type: Used to categorize messages by queue_type, such as 'STANDARD' or 'DLQ' (Dead Letter Queue), allowing for quick access and operations on subsets of the queue.

GSISK (Global Secondary Index - Sort Key) sent_at: The timestamp when the message was sent to the queue. Facilitates the ordering of messages based on the time they were added to the queue, which is useful for implementing FIFO (First-In-First-Out) or other ordering mechanisms.

Attributes: These are the various properties associated with each message:

  • data: This attribute holds the content of the message and can be of any type.
  • receive_count: A numerical count of how many times the message has been retrieved from the queue.
  • version: A number that can be used for optimistic locking and to ensure that the message is not being concurrently modified.
  • created_at: The date and time when the message was created. ISO 8601 format.
  • updated_at: The date and time when the message was last updated. ISO 8601 format.
  • received_at: The timestamp when the message was last viewed without being altered. ISO 8601 format.
  • invisible_until_at: The timestamp indicating when the message becomes visible in the queue for processing. ISO 8601 format.
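
To make the schema concrete, the sketch below models one table item as a Go struct and shows how the example timestamp value relates to Go's `time.RFC3339Nano` layout, which is the same string as the example value in the table. The `Item` struct, `newItem`, and `exampleItem` are hypothetical and written for this illustration; the library's own message type may differ. Using UTC and an initial version of 1 are assumptions.

```go
package main

import (
	"fmt"
	"time"
)

// Item mirrors the attribute names from the table definition.
// It is a hypothetical model, not the library's message type.
type Item struct {
	ID               string `dynamodbav:"id"`
	Data             any    `dynamodbav:"data"`
	ReceiveCount     int    `dynamodbav:"receive_count"`
	QueueType        string `dynamodbav:"queue_type"`
	Version          int    `dynamodbav:"version"`
	CreatedAt        string `dynamodbav:"created_at"`
	UpdatedAt        string `dynamodbav:"updated_at"`
	SentAt           string `dynamodbav:"sent_at"`
	ReceivedAt       string `dynamodbav:"received_at"`
	InvisibleUntilAt string `dynamodbav:"invisible_until_at"`
}

// formatTimestamp renders t with the time.RFC3339Nano layout.
// Converting to UTC first is an assumption made for this sketch.
func formatTimestamp(t time.Time) string {
	return t.UTC().Format(time.RFC3339Nano)
}

// newItem builds an item as it might look right after being sent:
// standard queue, never received, all "sent" timestamps equal to now.
func newItem(id string, data any, now time.Time) Item {
	ts := formatTimestamp(now)
	return Item{
		ID:           id,
		Data:         data,
		ReceiveCount: 0,
		QueueType:    "STANDARD",
		Version:      1, // assumed starting value
		CreatedAt:    ts,
		UpdatedAt:    ts,
		SentAt:       ts,
	}
}

// exampleItem returns an item created at a fixed instant so the output is reproducible.
func exampleItem() Item {
	return newItem("A-101", map[string]string{"data_1": "foo"},
		time.Date(2023, 12, 15, 9, 30, 0, 123000000, time.UTC))
}

func main() {
	fmt.Printf("%+v\n", exampleItem())
}
```

Note that this layout drops trailing zeros in the fractional seconds, so formatted timestamps are not all the same width.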

Data Transition

This data transition diagram serves as a map for developers and operators to understand how messages flow through the DynamoMQ system, providing insight into the mechanisms of message processing, failure handling, and retries within a DynamoDB-backed queue.

[Figure: Data Transition diagram]

Initial State
  • SendMessage(): A message is created with an initial status of 'READY'. It includes a unique id, arbitrary data, and a receive_count set to 0, indicating it has not yet been processed. The queue_type is 'STANDARD', and timestamps are recorded for creation, last update, and when added to the queue.
Processing
  • ReceiveMessage(): The message status changes to 'PROCESSING', the receive_count increments to reflect the number of times it's been retrieved, and the version number increases to facilitate optimistic locking. Timestamps are updated accordingly.
Retry Logic
  • ChangeMessageVisibility(): If processing fails, the message's visibility is updated to make it available for retry, and the receive_count is incremented. Timestamps are refreshed to reflect the most recent update.
Dead Letter Queue
  • MoveMessageToDLQ(): After the maximum number of retries is reached without successful processing, the message is moved to the DLQ. Its queue_type changes to 'DLQ', and receive_count is reset, indicating that it's ready for a fresh attempt or investigation.
Redrive Policy
  • RedriveMessage(): If issues are resolved, messages in the DLQ can be sent back to the standard queue for processing. This is depicted by the RedriveMessage() operation, which resets the receive_count and alters the queue_type back to 'STANDARD', along with updating the timestamps.
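
The role of the version attribute in these transitions can be illustrated with an in-memory model of optimistic locking: an update is accepted only if the stored version still equals the version that was read. This is a sketch under stated assumptions, not the library's code. `Record`, `Store`, `UpdateIfVersion`, and `Receive` are hypothetical names, and the map-based store stands in for a DynamoDB conditional write. The store is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
)

// Record holds the fields that change during ReceiveMessage in this model.
type Record struct {
	ID           string
	Status       string
	ReceiveCount int
	Version      int
}

var (
	ErrNotFound        = errors.New("record not found")
	ErrVersionConflict = errors.New("version conflict")
)

// Store is a tiny in-memory stand-in for the table.
type Store struct {
	records map[string]Record
}

// NewStore creates a store pre-populated with the given records.
func NewStore(recs ...Record) *Store {
	s := &Store{records: make(map[string]Record)}
	for _, r := range recs {
		s.records[r.ID] = r
	}
	return s
}

// UpdateIfVersion writes next only if the stored version equals expected,
// which mimics a conditional write.
func (s *Store) UpdateIfVersion(next Record, expected int) error {
	cur, ok := s.records[next.ID]
	if !ok {
		return ErrNotFound
	}
	if cur.Version != expected {
		return ErrVersionConflict
	}
	s.records[next.ID] = next
	return nil
}

// Receive marks a record as PROCESSING and bumps receive_count and version.
func Receive(s *Store, id string) (Record, error) {
	cur, ok := s.records[id]
	if !ok {
		return Record{}, ErrNotFound
	}
	next := cur
	next.Status = "PROCESSING"
	next.ReceiveCount++
	next.Version++
	if err := s.UpdateIfVersion(next, cur.Version); err != nil {
		return Record{}, err
	}
	return next, nil
}

func main() {
	s := NewStore(Record{ID: "A-101", Status: "READY", Version: 1})
	r, err := Receive(s, "A-101")
	fmt.Println(r, err)
	// A writer that still holds version 1 is rejected after the update above.
	fmt.Println(s.UpdateIfVersion(Record{ID: "A-101", Status: "READY", Version: 2}, 1))
}
```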

Authors

  • vvatanabe - Main contributor
  • Currently, there are no other contributors

License

This project is licensed under the MIT License. For detailed licensing information, refer to the LICENSE file included in the repository.

Documentation

Constants

This section is empty.

Variables

var ErrConsumerClosed = errors.New("DynamoMQ: Consumer closed")

ErrConsumerClosed is an error that indicates the Consumer has been closed. This error is returned when operations are attempted on a Consumer that has already been shut down.
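
A sentinel error like this is usually checked with `errors.Is`, which also matches when the error has been wrapped. The sketch below demonstrates the pattern with local stand-ins: `errConsumerClosed` substitutes for `dynamomq.ErrConsumerClosed`, and `fakeConsumer` is a hypothetical type whose behavior is assumed for the example, not the library's Consumer.

```go
package main

import (
	"errors"
	"fmt"
)

// errConsumerClosed is a local stand-in for dynamomq.ErrConsumerClosed.
var errConsumerClosed = errors.New("consumer closed")

// fakeConsumer is a hypothetical consumer used only to produce the error.
type fakeConsumer struct {
	closed bool
}

// Shutdown marks the consumer as closed.
func (c *fakeConsumer) Shutdown() { c.closed = true }

// Start returns the sentinel, wrapped with context, once the consumer is closed.
func (c *fakeConsumer) Start() error {
	if c.closed {
		return fmt.Errorf("start consuming: %w", errConsumerClosed)
	}
	return nil
}

// isExpectedShutdown reports whether err only signals that the consumer was closed.
func isExpectedShutdown(err error) bool {
	return errors.Is(err, errConsumerClosed)
}

func main() {
	c := &fakeConsumer{}
	c.Shutdown()
	if err := c.Start(); isExpectedShutdown(err) {
		fmt.Println("consumer already closed, nothing to do")
	} else if err != nil {
		fmt.Println("unexpected error:", err)
	}
}
```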

Functions

func WithAWSBaseEndpoint

func WithAWSBaseEndpoint(baseEndpoint string) func(*ClientOptions)

WithAWSBaseEndpoint is an option function to set a custom base endpoint for AWS services. This function is useful when you want the client to interact with a specific AWS service endpoint, such as a local or a different regional endpoint. If the DynamoDB client is set using the WithAWSDynamoDBClient function, this option function is ignored.

func WithAWSDynamoDBClient

func WithAWSDynamoDBClient(client *dynamodb.Client) func(*ClientOptions)

WithAWSDynamoDBClient is an option function to set a custom AWS DynamoDB client for the DynamoMQ client. This function is used to provide a pre-configured DynamoDB client that the DynamoMQ client will use for all interactions with DynamoDB.

func WithAWSRetryMaxAttempts

func WithAWSRetryMaxAttempts(retryMaxAttempts int) func(*ClientOptions)

WithAWSRetryMaxAttempts is an option function to set the maximum number of retry attempts for AWS service calls. Use this function to define how many times the client should retry a failed AWS service call. If the DynamoDB client is set using the WithAWSDynamoDBClient function, this option function is ignored.

func WithConcurrency added in v0.11.0

func WithConcurrency(concurrency int) func(o *ConsumerOptions)

WithConcurrency sets the number of concurrent workers for processing messages in the Consumer. This function determines how many messages can be processed at the same time.

func WithErrorLog

func WithErrorLog(errorLog *log.Logger) func(o *ConsumerOptions)

WithErrorLog sets a custom logger for the Consumer. This function configures an optional logger for errors. If nil, the standard logger is used.

func WithIDGenerator added in v0.10.1

func WithIDGenerator(idGenerator func() string) func(o *ProducerOptions)

WithIDGenerator is an option function to set a custom ID generator for the Producer. Use this function to provide a custom function that generates unique identifiers for messages. The default ID generator is uuid.NewString.

func WithMaximumReceives

func WithMaximumReceives(maximumReceives int) func(o *ConsumerOptions)

WithMaximumReceives sets the maximum number of times a message can be delivered to the Consumer. This function configures the limit on how many times a message will be attempted for delivery before being considered a failure or moved to a Dead Letter Queue, if applicable.

func WithOnShutdown

func WithOnShutdown(onShutdown []func()) func(o *ConsumerOptions)

WithOnShutdown adds functions to be called during the Consumer's shutdown process. This function appends to the list of callbacks executed when the Consumer is shutting down.

func WithPollingInterval

func WithPollingInterval(pollingInterval time.Duration) func(o *ConsumerOptions)

WithPollingInterval sets the polling interval for the Consumer. This function configures the time interval at which the Consumer polls the DynamoDB queue for new messages.

func WithQueueType added in v0.9.0

func WithQueueType(queueType QueueType) func(o *ConsumerOptions)

WithQueueType sets the type of queue (STANDARD or DLQ) for the Consumer. This function allows specification of the queue type the Consumer will operate on.

func WithQueueingIndexName added in v0.8.0

func WithQueueingIndexName(queueingIndexName string) func(*ClientOptions)

WithQueueingIndexName is an option function to set the queue index name for the DynamoMQ client. This function allows defining a custom index name that the client will use for queue operations, optimizing message handling. By default, the index name is set to "dynamo-mq-index-queue_type-sent_at".

func WithRetryInterval added in v0.11.0

func WithRetryInterval(sec int) func(o *ConsumerOptions)

WithRetryInterval sets the retry interval for failed messages in the Consumer. This function specifies the time interval (in seconds) before a failed message is retried.

func WithTableName

func WithTableName(tableName string) func(*ClientOptions)

WithTableName is an option function to set the table name for the DynamoMQ client. Use this function to specify the name of the DynamoDB table that the client will use for storing and retrieving messages. By default, the table name is set to "dynamo-mq-table".

func WithUseFIFO

func WithUseFIFO(useFIFO bool) func(*ClientOptions)

WithUseFIFO is an option function to enable FIFO (First-In-First-Out) behavior for the DynamoMQ client. Setting this option to true makes the client treat the queue as a FIFO queue; otherwise, it is treated as a standard queue. By default, this option is set to false.

func WithVisibilityTimeout added in v0.11.0

func WithVisibilityTimeout(sec int) func(o *ConsumerOptions)

WithVisibilityTimeout sets the visibility timeout for messages in the Consumer. This function configures the duration (in seconds) a message remains invisible in the queue after being received.
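
The functions in this section follow the functional options pattern: each returns a closure that mutates an options struct. The sketch below shows how such options are typically applied. It uses a local `ConsumerOptions` stand-in with assumed field names and placeholder defaults (they are not the library's defaults), and `newOptions` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// ConsumerOptions is a hypothetical stand-in for the library's options struct.
type ConsumerOptions struct {
	PollingInterval   time.Duration
	Concurrency       int
	MaximumReceives   int
	VisibilityTimeout int // seconds
}

// WithConcurrency sets the number of concurrent workers.
func WithConcurrency(concurrency int) func(o *ConsumerOptions) {
	return func(o *ConsumerOptions) { o.Concurrency = concurrency }
}

// WithVisibilityTimeout sets the visibility timeout in seconds.
func WithVisibilityTimeout(sec int) func(o *ConsumerOptions) {
	return func(o *ConsumerOptions) { o.VisibilityTimeout = sec }
}

// WithPollingInterval sets how often the queue is polled.
func WithPollingInterval(d time.Duration) func(o *ConsumerOptions) {
	return func(o *ConsumerOptions) { o.PollingInterval = d }
}

// newOptions starts from placeholder defaults and applies each option in
// order, so a later option overrides an earlier one for the same field.
func newOptions(optFns ...func(o *ConsumerOptions)) ConsumerOptions {
	o := ConsumerOptions{
		PollingInterval:   time.Second,
		Concurrency:       1,
		MaximumReceives:   3,
		VisibilityTimeout: 30,
	}
	for _, fn := range optFns {
		fn(&o)
	}
	return o
}

func main() {
	o := newOptions(
		WithConcurrency(4),
		WithVisibilityTimeout(60),
		WithPollingInterval(500*time.Millisecond),
	)
	fmt.Printf("%+v\n", o)
}
```

One benefit of this design is that new settings can be added without changing the constructor signature, and callers only mention the settings they want to change.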

Types

type BuildingExpressionError

type BuildingExpressionError struct {
	Cause error
}

BuildingExpressionError represents an error during the building of a DynamoDB expression.

func (BuildingExpressionError) Error

func (e BuildingExpressionError) Error() string

Error returns a detailed error message including the underlying cause for BuildingExpressionError.

type ChangeMessageVisibilityInput added in v0.11.0

type ChangeMessageVisibilityInput struct {
	// ID is the unique identifier of the message for which visibility is to be changed.
	ID string
	// VisibilityTimeout is the new timeout in seconds during which the message becomes invisible to other receivers.
	// After this time elapses, the message will become visible in the queue again.
	VisibilityTimeout int
}

ChangeMessageVisibilityInput represents the input parameters for changing the visibility timeout of a specific message in a DynamoDB-based queue.

type ChangeMessageVisibilityOutput added in v0.11.0

type ChangeMessageVisibilityOutput[T any] struct {
	// ChangedMessage is a pointer to the Message type containing information about the message with changed visibility.
	// The type T determines the format of the message content.
	ChangedMessage *Message[T]
}

ChangeMessageVisibilityOutput represents the result of the operation to change the visibility of a message. This struct uses the generic type T and contains information about the message whose visibility has been changed.

type Client added in v0.6.0

type Client[T any] interface {
	// SendMessage sends a message to the DynamoDB-based queue.
	SendMessage(ctx context.Context, params *SendMessageInput[T]) (*SendMessageOutput[T], error)
	// ReceiveMessage retrieves and processes a message from a DynamoDB-based queue.
	ReceiveMessage(ctx context.Context, params *ReceiveMessageInput) (*ReceiveMessageOutput[T], error)
	// ChangeMessageVisibility changes the visibility of a specific message in a DynamoDB-based queue.
	ChangeMessageVisibility(ctx context.Context, params *ChangeMessageVisibilityInput) (*ChangeMessageVisibilityOutput[T], error)
	// DeleteMessage deletes a specific message from a DynamoDB-based queue.
	DeleteMessage(ctx context.Context, params *DeleteMessageInput) (*DeleteMessageOutput, error)
	// MoveMessageToDLQ moves a specific message from a DynamoDB-based queue to a Dead Letter Queue (DLQ).
	MoveMessageToDLQ(ctx context.Context, params *MoveMessageToDLQInput) (*MoveMessageToDLQOutput[T], error)
	// RedriveMessage restores a specific message from a DynamoDB-based Dead Letter Queue (DLQ).
	RedriveMessage(ctx context.Context, params *RedriveMessageInput) (*RedriveMessageOutput[T], error)
	// GetMessage gets a specific message from a DynamoDB-based queue.
	GetMessage(ctx context.Context, params *GetMessageInput) (*GetMessageOutput[T], error)
	// GetQueueStats is a method for obtaining statistical information about a DynamoDB-based queue.
	GetQueueStats(ctx context.Context, params *GetQueueStatsInput) (*GetQueueStatsOutput, error)
	// GetDLQStats gets statistical information about a DynamoDB-based Dead Letter Queue (DLQ).
	GetDLQStats(ctx context.Context, params *GetDLQStatsInput) (*GetDLQStatsOutput, error)
	// ListMessages gets a list of messages from a DynamoDB-based queue.
	ListMessages(ctx context.Context, params *ListMessagesInput) (*ListMessagesOutput[T], error)
	// ReplaceMessage replaces a specific message within a DynamoDB-based queue.
	ReplaceMessage(ctx context.Context, params *ReplaceMessageInput[T]) (*ReplaceMessageOutput, error)
}

Client is an interface for interacting with a DynamoDB-based message queue system. It provides methods for various operations on messages within the queue. This interface is generic and works with any type T, which represents the structure of the message content.

func NewFromConfig added in v0.6.0

func NewFromConfig[T any](cfg aws.Config, optFns ...func(*ClientOptions)) (Client[T], error)

NewFromConfig creates a new DynamoMQ client using the provided AWS configuration and any additional client options. This function initializes a new client with default settings, which can be customized using option functions. It returns an error if the initialization of the DynamoDB client fails.

type ClientImpl added in v0.11.0

type ClientImpl[T any] struct {
	// contains filtered or unexported fields
}

ClientImpl is a concrete implementation of the dynamomq.Client interface. Note: ClientImpl cannot be used directly. Always use the dynamomq.NewFromConfig function to create an instance.

func (*ClientImpl[T]) ChangeMessageVisibility added in v0.11.0

func (c *ClientImpl[T]) ChangeMessageVisibility(ctx context.Context, params *ChangeMessageVisibilityInput) (*ChangeMessageVisibilityOutput[T], error)

ChangeMessageVisibility changes the visibility of a specific message in a DynamoDB-based queue. It retrieves the message based on the specified message ID and alters its visibility timeout. The visibility timeout specifies the duration during which the message, once retrieved from the queue, becomes invisible to other clients. Modifying this timeout value allows dynamic adjustment of the message processing time.
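
The arithmetic behind a visibility timeout is simple: the message stays hidden until the receive time plus the timeout. A minimal sketch, assuming the timeout is added to the current time and that a message counts as visible again exactly at that instant (the boundary behavior is an assumption, and all function names are hypothetical):

```go
package main

import (
	"fmt"
	"time"
)

// invisibleUntil returns the instant until which a message stays hidden
// when its visibility timeout (in seconds) is set at time from.
func invisibleUntil(from time.Time, timeoutSec int) time.Time {
	return from.Add(time.Duration(timeoutSec) * time.Second)
}

// isVisible reports whether a message hidden until the given instant can
// be received at time now.
func isVisible(now, until time.Time) bool {
	return !now.Before(until)
}

// visibleAfter reports whether a message whose timeout was set to
// timeoutSec is visible again elapsedSec seconds later.
func visibleAfter(elapsedSec, timeoutSec int) bool {
	setAt := time.Date(2023, 12, 15, 9, 30, 0, 0, time.UTC)
	now := setAt.Add(time.Duration(elapsedSec) * time.Second)
	return isVisible(now, invisibleUntil(setAt, timeoutSec))
}

func main() {
	for _, elapsed := range []int{10, 30, 45} {
		fmt.Printf("after %2ds with a 30s timeout: visible=%v\n", elapsed, visibleAfter(elapsed, 30))
	}
}
```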

func (*ClientImpl[T]) DeleteMessage added in v0.11.0

func (c *ClientImpl[T]) DeleteMessage(ctx context.Context, params *DeleteMessageInput) (*DeleteMessageOutput, error)

DeleteMessage deletes a specific message from a DynamoDB-based queue. It directly deletes the message from DynamoDB based on the specified message ID.

func (*ClientImpl[T]) GetDLQStats added in v0.11.0

func (c *ClientImpl[T]) GetDLQStats(ctx context.Context, _ *GetDLQStatsInput) (*GetDLQStatsOutput, error)

GetDLQStats gets statistical information about a DynamoDB-based Dead Letter Queue (DLQ). It provides statistics on the messages within the DLQ. This includes the IDs of the first 100 messages in the queue and the total number of records in the DLQ. This function offers vital information for monitoring and analyzing the message queue system, aiding in understanding the status of the DLQ.

func (*ClientImpl[T]) GetMessage added in v0.11.0

func (c *ClientImpl[T]) GetMessage(ctx context.Context, params *GetMessageInput) (*GetMessageOutput[T], error)

GetMessage gets a specific message from a DynamoDB-based queue. It retrieves the message from DynamoDB based on the specified message ID. The retrieved message is then unmarshaled into the specified generic type T.

func (*ClientImpl[T]) GetQueueStats added in v0.11.0

func (c *ClientImpl[T]) GetQueueStats(ctx context.Context, _ *GetQueueStatsInput) (*GetQueueStatsOutput, error)

GetQueueStats gets statistical information about a DynamoDB-based queue. It provides statistics about the messages in the queue and their processing status. This includes the IDs of the first 100 messages in the queue, the first 100 IDs of messages selected for processing, the total number of records in the queue, the number of records currently in processing, and the number of records awaiting processing. This function provides essential information for monitoring and analyzing the message queue system, aiding in understanding the status of the queue.

func (*ClientImpl[T]) ListMessages added in v0.11.0

func (c *ClientImpl[T]) ListMessages(ctx context.Context, params *ListMessagesInput) (*ListMessagesOutput[T], error)

ListMessages gets a list of messages from a DynamoDB-based queue. It scans and retrieves messages from DynamoDB based on the specified size parameter. If the size is not specified or is zero or less, a default maximum list size of 10 is used. The retrieved messages are unmarshaled into an array of the generic type T and are sorted based on the update time.
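
The size handling and ordering described above can be modeled in a few lines. This is a sketch, not the library's code: the `message` type, `effectiveSize`, `list`, and `sample` are hypothetical, and sorting in ascending order of update time is an assumption, since the direction is not stated.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// defaultListSize is the documented default maximum list size.
const defaultListSize = 10

// message is a hypothetical stand-in carrying only what the sketch needs.
type message struct {
	ID        string
	UpdatedAt time.Time
}

// effectiveSize applies the default when size is zero or negative.
func effectiveSize(size int) int {
	if size <= 0 {
		return defaultListSize
	}
	return size
}

// list takes up to size messages in scan order, then sorts that subset
// by update time (ascending, by assumption).
func list(scanned []message, size int) []message {
	n := effectiveSize(size)
	if n > len(scanned) {
		n = len(scanned)
	}
	out := make([]message, n)
	copy(out, scanned[:n])
	sort.SliceStable(out, func(i, j int) bool {
		return out[i].UpdatedAt.Before(out[j].UpdatedAt)
	})
	return out
}

// sample returns three messages in an arbitrary scan order.
func sample() []message {
	base := time.Date(2023, 12, 15, 9, 30, 0, 0, time.UTC)
	return []message{
		{ID: "A-303", UpdatedAt: base.Add(2 * time.Minute)},
		{ID: "A-101", UpdatedAt: base},
		{ID: "A-202", UpdatedAt: base.Add(time.Minute)},
	}
}

func main() {
	for _, m := range list(sample(), 0) {
		fmt.Println(m.ID, m.UpdatedAt.Format(time.RFC3339))
	}
}
```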

func (*ClientImpl[T]) MoveMessageToDLQ added in v0.11.0

func (c *ClientImpl[T]) MoveMessageToDLQ(ctx context.Context, params *MoveMessageToDLQInput) (*MoveMessageToDLQOutput[T], error)

MoveMessageToDLQ moves a specific message from a DynamoDB-based queue to a Dead Letter Queue (DLQ). It locates the message based on the specified message ID and marks it for the DLQ. Moving a message to the DLQ allows for the isolation of failed message processing, facilitating later analysis and reprocessing.

func (*ClientImpl[T]) ReceiveMessage added in v0.11.0

func (c *ClientImpl[T]) ReceiveMessage(ctx context.Context, params *ReceiveMessageInput) (*ReceiveMessageOutput[T], error)

ReceiveMessage retrieves and processes a message from a DynamoDB-based queue using the generic type T. The selection process involves constructing and executing a DynamoDB query based on the queue type and visibility timeout. After a message is selected, its status, including visibility and version, is updated to ensure the message remains invisible and in processing for a defined period. This process is crucial for maintaining queue integrity and preventing duplicate message delivery. If no messages are available for reception, an EmptyQueueError is returned. Additionally, when FIFO (First In, First Out) is enabled, the method guarantees that only one valid message is processed at a time.
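
One possible reading of the selection rule, including the FIFO guarantee, is sketched below: messages are examined in sent order, the first visible one is chosen, and in FIFO mode the scan stops at the head of the queue instead of skipping past a message that is still being processed. This interpretation is an assumption made for illustration; the `message` type, `selectMessage`, and `errEmptyQueue` are hypothetical, and integer clock ticks replace real timestamps for brevity.

```go
package main

import (
	"errors"
	"fmt"
)

// errEmptyQueue is a local stand-in for the library's empty-queue error.
var errEmptyQueue = errors.New("empty queue")

// message uses integer clock ticks instead of timestamps.
type message struct {
	ID             string
	SentAt         int
	InvisibleUntil int
}

// selectMessage picks the next receivable message. msgs must already be
// sorted by SentAt, as a query on an index keyed by sent_at would return them.
func selectMessage(msgs []message, now int, fifo bool) (message, error) {
	for _, m := range msgs {
		if m.InvisibleUntil <= now {
			return m, nil
		}
		if fifo {
			// The head of the queue is still hidden; do not skip ahead.
			break
		}
	}
	return message{}, errEmptyQueue
}

func main() {
	msgs := []message{
		{ID: "A-101", SentAt: 1, InvisibleUntil: 50}, // currently being processed
		{ID: "A-202", SentAt: 2, InvisibleUntil: 0},
	}
	m, err := selectMessage(msgs, 10, false)
	fmt.Println("standard:", m.ID, err)
	m, err = selectMessage(msgs, 10, true)
	fmt.Println("fifo:", m.ID, err)
}
```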

func (*ClientImpl[T]) RedriveMessage added in v0.11.0

func (c *ClientImpl[T]) RedriveMessage(ctx context.Context, params *RedriveMessageInput) (*RedriveMessageOutput[T], error)

RedriveMessage restores a specific message from a DynamoDB-based Dead Letter Queue (DLQ). It locates the message based on the specified message ID and marks it as restored from the DLQ to the standard queue. This process is essential for reprocessing messages that have failed to be processed and is a crucial function in error handling within the message queue system.

func (*ClientImpl[T]) ReplaceMessage added in v0.11.0

func (c *ClientImpl[T]) ReplaceMessage(ctx context.Context, params *ReplaceMessageInput[T]) (*ReplaceMessageOutput, error)

ReplaceMessage replaces a specific message within a DynamoDB-based queue. It searches for an existing message based on the specified message ID and deletes it if found. Then, a new message is added to the queue. If a message with the specified ID does not exist, the new message is directly added to the queue.

func (*ClientImpl[T]) SendMessage added in v0.11.0

func (c *ClientImpl[T]) SendMessage(ctx context.Context, params *SendMessageInput[T]) (*SendMessageOutput[T], error)

SendMessage sends a message to the DynamoDB-based message queue. It checks for message ID duplication and handles message delays if specified. This function takes a context and a SendMessageInput parameter. SendMessageInput contains the message ID, data, and an optional delay in seconds. If the message ID already exists in the queue, it returns an IDDuplicatedError. Otherwise, it adds the message to the queue. The function also handles message delays. If DelaySeconds is greater than 0 in the input parameter, the message will be delayed accordingly before being sent.
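
The duplicate check and the delay can be modeled with an in-memory queue. This sketch is not how the library stores messages: `memQueue`, `Send`, `Receivable`, and `errIDDuplicated` are hypothetical, integer seconds stand in for timestamps, and representing the delay as an "available at" time is an assumption about one way to implement it.

```go
package main

import (
	"errors"
	"fmt"
)

// errIDDuplicated is a local stand-in for the library's duplicate-ID error.
var errIDDuplicated = errors.New("message ID already exists")

// queued is a stored message with the time from which it may be received.
type queued struct {
	Data        string
	AvailableAt int // seconds on a logical clock
}

// memQueue is a minimal in-memory queue keyed by message ID.
type memQueue struct {
	items map[string]queued
}

func newMemQueue() *memQueue {
	return &memQueue{items: make(map[string]queued)}
}

// Send stores a message unless its ID is already present. A positive
// delaySeconds postpones the time at which the message becomes receivable.
func (q *memQueue) Send(id, data string, now, delaySeconds int) error {
	if _, exists := q.items[id]; exists {
		return errIDDuplicated
	}
	availableAt := now
	if delaySeconds > 0 {
		availableAt = now + delaySeconds
	}
	q.items[id] = queued{Data: data, AvailableAt: availableAt}
	return nil
}

// Receivable reports whether the message exists and its delay has elapsed.
func (q *memQueue) Receivable(id string, now int) bool {
	m, ok := q.items[id]
	return ok && m.AvailableAt <= now
}

func main() {
	q := newMemQueue()
	fmt.Println(q.Send("A-101", "foo", 0, 5))
	fmt.Println(q.Send("A-101", "bar", 1, 0)) // same ID is rejected
	fmt.Println(q.Receivable("A-101", 3), q.Receivable("A-101", 5))
}
```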

type ClientOptions added in v0.6.0

type ClientOptions struct {
	// DynamoDB is a pointer to the DynamoDB client used for database operations.
	DynamoDB *dynamodb.Client
	// TableName is the name of the DynamoDB table used for the queue.
	TableName string
	// QueueingIndexName is the name of the index used for queueing operations.
	QueueingIndexName string
	// MaximumReceives is the maximum number of times a message is delivered before being moved to the DLQ.
	MaximumReceives int
	// UseFIFO is a boolean indicating if the queue should behave as a First-In-First-Out (FIFO) queue.
	UseFIFO bool
	// BaseEndpoint is the base endpoint URL for DynamoDB requests.
	BaseEndpoint string
	// RetryMaxAttempts is the maximum number of attempts for retrying failed DynamoDB operations.
	RetryMaxAttempts int

	// Clock is an abstraction of time operations, allowing control over time during tests.
	Clock clock.Clock
	// MarshalMap is a function to marshal objects into a map of DynamoDB attribute values.
	MarshalMap func(in interface{}) (map[string]types.AttributeValue, error)
	// UnmarshalMap is a function to unmarshal a map of DynamoDB attribute values into objects.
	UnmarshalMap func(m map[string]types.AttributeValue, out interface{}) error
	// UnmarshalListOfMaps is a function to unmarshal a list of maps of DynamoDB attribute values into objects.
	UnmarshalListOfMaps func(l []map[string]types.AttributeValue, out interface{}) error
	// BuildExpression is a function to build DynamoDB expressions from a builder.
	BuildExpression func(b expression.Builder) (expression.Expression, error)
}

ClientOptions defines configuration options for the DynamoMQ client.

Note: The following fields are primarily used for testing purposes. They allow for stubbing of operations during tests, facilitating the mocking of behavior without relying on a real DynamoDB instance:

  • Clock
  • MarshalMap
  • UnmarshalMap
  • UnmarshalListOfMaps
  • BuildExpression

In typical use, these testing fields should not be modified. They are provided to support advanced use cases, like unit testing, where control over these operations is necessary.
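As an illustration of why a replaceable Clock helps in tests, the sketch below injects a fixed clock into code that would otherwise read the wall clock. The one-method `Clock` interface shown here is an assumption for the example; the actual `clock.Clock` type used by ClientOptions may be defined differently, and `options`, `fixedClock`, `newFixedClock`, and `createdAt` are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// Clock is a hypothetical one-method interface used only in this sketch.
type Clock interface {
	Now() time.Time
}

// fixedClock always reports the same instant, which keeps tests deterministic.
type fixedClock struct{ t time.Time }

func (c fixedClock) Now() time.Time { return c.t }

// newFixedClock builds a fixedClock from Unix seconds.
func newFixedClock(unixSeconds int64) fixedClock {
	return fixedClock{t: time.Unix(unixSeconds, 0).UTC()}
}

// options mirrors only the Clock field of ClientOptions.
type options struct {
	Clock Clock
}

// createdAt shows code under test reading time through the injected clock.
func createdAt(o options) string {
	return o.Clock.Now().Format(time.RFC3339)
}

func main() {
	o := options{Clock: newFixedClock(0)}
	fmt.Println(createdAt(o)) // prints "1970-01-01T00:00:00Z"
}
```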

type ConditionalCheckFailedError

type ConditionalCheckFailedError struct {
	Cause error
}

ConditionalCheckFailedError represents an error when a condition check on the 'version' attribute fails.

func (ConditionalCheckFailedError) Error

Error returns a detailed error message including the underlying cause for ConditionalCheckFailedError.
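A version condition of this kind is the usual basis for optimistic concurrency control. The following sketch shows the idea with an in-memory `row`; the `conditionalCheckFailed` type copies only the shape of ConditionalCheckFailedError, and `row`, `update`, `isConditionalCheckFailed`, and the error text are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// conditionalCheckFailed mirrors the shape of ConditionalCheckFailedError.
// The message text below is made up for this sketch.
type conditionalCheckFailed struct {
	Cause error
}

func (e conditionalCheckFailed) Error() string {
	return fmt.Sprintf("condition on 'version' attribute failed: %v", e.Cause)
}

// row is a hypothetical stored item guarded by a version number.
type row struct {
	Version int
	Data    string
}

// update writes data only when the caller saw the latest version.
func update(r *row, expectedVersion int, data string) error {
	if r.Version != expectedVersion {
		cause := fmt.Errorf("expected version %d, found %d", expectedVersion, r.Version)
		return conditionalCheckFailed{Cause: cause}
	}
	r.Data = data
	r.Version++
	return nil
}

// isConditionalCheckFailed reports whether err is, or wraps, the error type.
func isConditionalCheckFailed(err error) bool {
	var target conditionalCheckFailed
	return errors.As(err, &target)
}

func main() {
	r := &row{Version: 1}
	fmt.Println(update(r, 1, "first writer"))

	err := update(r, 1, "stale writer") // the version is now 2, so this fails
	fmt.Println(isConditionalCheckFailed(err), r.Data)
}
```

The second writer still holds version 1, so its update is rejected and the stored data stays unchanged.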

type Consumer

type Consumer[T any] struct {
	// contains filtered or unexported fields
}

Consumer is a struct responsible for consuming messages from a DynamoDB-based queue. It supports generic message types and includes settings such as concurrency and polling interval. Note: New instances of Consumer must be created with the NewConsumer function.

func NewConsumer

func NewConsumer[T any](client Client[T], processor MessageProcessor[T], opts ...func(o *ConsumerOptions)) *Consumer[T]

NewConsumer creates a new Consumer instance with the specified client, message processor, and options. It configures the Consumer with default values which can be overridden by the provided option functions.
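The `opts ...func(o *ConsumerOptions)` parameter follows the functional options pattern: defaults are set first, then each option function may override them. A minimal sketch of that pattern, using a local `consumerOptions` copy of a few fields, placeholder default values, and a hypothetical `withConcurrency` helper:

```go
package main

import (
	"fmt"
	"time"
)

// consumerOptions copies a few fields of ConsumerOptions for illustration.
type consumerOptions struct {
	PollingInterval time.Duration
	Concurrency     int
	MaximumReceives int
}

// newOptions starts from defaults and lets each option function override them.
// The default values here are placeholders, not the package's defaults.
func newOptions(opts ...func(o *consumerOptions)) *consumerOptions {
	o := &consumerOptions{
		PollingInterval: time.Second,
		Concurrency:     1,
		MaximumReceives: 1,
	}
	for _, opt := range opts {
		opt(o)
	}
	return o
}

// withConcurrency is a hypothetical helper that returns an option function.
func withConcurrency(n int) func(o *consumerOptions) {
	return func(o *consumerOptions) { o.Concurrency = n }
}

func main() {
	o := newOptions(withConcurrency(4))
	fmt.Println(o.PollingInterval, o.Concurrency, o.MaximumReceives)
}
```

Fields that no option touches keep their defaults, so callers only state what they want to change.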

func (*Consumer[T]) Shutdown

func (c *Consumer[T]) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the Consumer, stopping the message consumption and executing any registered shutdown callbacks.

func (*Consumer[T]) StartConsuming added in v0.9.0

func (c *Consumer[T]) StartConsuming() error

StartConsuming starts the message consumption process, polling the queue for messages and processing them. The method handles message retrieval, processing, error handling, retries, and moving messages to the DLQ if necessary.

type ConsumerOptions added in v0.6.0

type ConsumerOptions struct {
	// PollingInterval specifies the time interval at which the Consumer polls the DynamoDB queue for new messages.
	PollingInterval time.Duration
	// Concurrency sets the number of concurrent message processing workers.
	Concurrency int
	// MaximumReceives defines the maximum number of times a message can be delivered.
	MaximumReceives int
	// VisibilityTimeout sets the duration (in seconds) a message remains invisible in the queue after being received.
	VisibilityTimeout int
	// RetryInterval defines the time interval (in seconds) before a failed message is retried.
	RetryInterval int
	// QueueType determines the type of queue (STANDARD or DLQ) the Consumer will operate on.
	QueueType QueueType
	// ErrorLog is an optional logger for errors. If nil, the standard logger is used.
	ErrorLog *log.Logger
	// OnShutdown is a slice of functions called when the Consumer is shutting down.
	OnShutdown []func()
}

ConsumerOptions contains configuration options for a Consumer instance. It allows customization of polling intervals, concurrency levels, visibility timeouts, and more.

Consumer functions for setting various ConsumerOptions.

type DeleteMessageInput added in v0.7.0

type DeleteMessageInput struct {
	// ID is the unique identifier of the message to be deleted from the queue.
	ID string
}

DeleteMessageInput represents the input parameters for deleting a specific message from a DynamoDB-based queue.

type DeleteMessageOutput added in v0.7.0

type DeleteMessageOutput struct{}

DeleteMessageOutput represents the result of the delete message operation. This struct is empty as the delete operation does not return any specific information.

type DynamoDBAPIError

type DynamoDBAPIError struct {
	Cause error
}

DynamoDBAPIError represents a generic error encountered when making a DynamoDB API call.

func (DynamoDBAPIError) Error

func (e DynamoDBAPIError) Error() string

Error returns a detailed error message including the underlying cause for DynamoDBAPIError.

type EmptyQueueError

type EmptyQueueError struct{}

EmptyQueueError represents an error when an operation cannot proceed due to an empty queue.

func (EmptyQueueError) Error

func (e EmptyQueueError) Error() string

Error returns a standard error message for EmptyQueueError.

type GetDLQStatsInput added in v0.7.0

type GetDLQStatsInput struct{}

GetDLQStatsInput represents the input parameters for obtaining statistical information about a DynamoDB-based Dead Letter Queue (DLQ). This struct does not contain any fields as it's used to request general DLQ statistics without the need for specific parameters.

type GetDLQStatsOutput added in v0.7.0

type GetDLQStatsOutput struct {
	// First100IDsInQueue is an array of the first 100 message IDs currently in the DLQ.
	First100IDsInQueue []string `json:"first_100_IDs_in_queue"`
	// TotalMessagesInDLQ is the total number of messages present in the DLQ.
	TotalMessagesInDLQ int `json:"total_messages_in_DLQ"`
}

GetDLQStatsOutput represents the output containing statistical information about the Dead Letter Queue (DLQ).

type GetMessageInput added in v0.7.0

type GetMessageInput struct {
	// ID is the unique identifier of the message to be retrieved from the queue.
	ID string
}

GetMessageInput represents the input parameters for retrieving a specific message from a DynamoDB-based queue.

type GetMessageOutput added in v0.7.0

type GetMessageOutput[T any] struct {
	// Message is a pointer to the Message type containing information about the retrieved message.
	// The type T determines the format of the message content.
	Message *Message[T]
}

GetMessageOutput represents the result of the operation to retrieve a message. This struct uses the generic type T and contains information about the retrieved message.

type GetQueueStatsInput added in v0.7.0

type GetQueueStatsInput struct{}

GetQueueStatsInput represents the input parameters for obtaining statistical information about a DynamoDB-based queue. This struct does not contain any fields as it's used to request general queue statistics without the need for specific parameters.

type GetQueueStatsOutput added in v0.7.0

type GetQueueStatsOutput struct {
	// First100IDsInQueue is an array of the first 100 message IDs currently in the queue.
	First100IDsInQueue []string `json:"first_100_IDs_in_queue"`
	// First100IDsInQueueProcessing is an array of the first 100 message IDs that are currently being processed.
	First100IDsInQueueProcessing []string `json:"first_100_IDs_in_queue_processing"`
	// TotalMessagesInQueue is the total number of messages present in the queue.
	TotalMessagesInQueue int `json:"total_messages_in_queue"`
	// TotalMessagesInQueueProcessing is the total number of messages that are currently in the process of being handled.
	TotalMessagesInQueueProcessing int `json:"total_messages_in_queue_processing"`
	// TotalMessagesInQueueReady is the total number of messages in the queue that are ready to be processed and have not started processing yet.
	TotalMessagesInQueueReady int `json:"total_messages_in_queue_ready"`
}

GetQueueStatsOutput represents the output containing statistical information about a DynamoDB-based queue.
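Because the fields carry `json` tags, the statistics serialize with snake_case keys. The sketch below uses a local `queueStats` copy of the struct above so that it runs without the package; the `encode` helper and the sample values are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// queueStats is a local copy of the GetQueueStatsOutput fields and tags.
type queueStats struct {
	First100IDsInQueue             []string `json:"first_100_IDs_in_queue"`
	First100IDsInQueueProcessing   []string `json:"first_100_IDs_in_queue_processing"`
	TotalMessagesInQueue           int      `json:"total_messages_in_queue"`
	TotalMessagesInQueueProcessing int      `json:"total_messages_in_queue_processing"`
	TotalMessagesInQueueReady      int      `json:"total_messages_in_queue_ready"`
}

// encode renders the stats as compact JSON using the struct tags.
func encode(s queueStats) (string, error) {
	b, err := json.Marshal(s)
	return string(b), err
}

func main() {
	out, err := encode(queueStats{
		First100IDsInQueue:           []string{"A-101"},
		First100IDsInQueueProcessing: []string{},
		TotalMessagesInQueue:         1,
		TotalMessagesInQueueReady:    1,
	})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(out)
}
```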

type IDDuplicatedError

type IDDuplicatedError struct{}

IDDuplicatedError represents an error when a provided ID is duplicated in the system.

func (IDDuplicatedError) Error

func (e IDDuplicatedError) Error() string

Error returns a standard error message for IDDuplicatedError.

type IDNotFoundError

type IDNotFoundError struct{}

IDNotFoundError represents an error when a provided ID is not found in DynamoDB.

func (IDNotFoundError) Error

func (e IDNotFoundError) Error() string

Error returns a standard error message for IDNotFoundError.

type IDNotProvidedError

type IDNotProvidedError struct{}

IDNotProvidedError represents an error when an ID is not provided where it is required.

func (IDNotProvidedError) Error

func (e IDNotProvidedError) Error() string

Error returns a standard error message for IDNotProvidedError.

type InvalidStateTransitionError added in v0.6.0

type InvalidStateTransitionError struct {
	Msg       string
	Operation string
	Current   Status
}

InvalidStateTransitionError represents an error for invalid state transitions during operations.

func (InvalidStateTransitionError) Error added in v0.6.0

Error returns a detailed error message explaining the invalid state transition.

type ListMessagesInput added in v0.7.0

type ListMessagesInput struct {
	// Size is the number of messages to be listed from the queue. It determines the maximum size of the returned message list.
	Size int32
}

ListMessagesInput represents the input parameters for listing messages from a DynamoDB-based queue.

type ListMessagesOutput added in v0.7.0

type ListMessagesOutput[T any] struct {
	// Messages is an array of pointers to Message types, containing information about each listed message.
	// The type T determines the format of the message content for each message in the array.
	Messages []*Message[T]
}

ListMessagesOutput represents the result of the operation to list messages from the queue. This struct uses the generic type T and contains an array of messages.

type MarshalingAttributeError

type MarshalingAttributeError struct {
	Cause error
}

MarshalingAttributeError represents an error during the marshaling of DynamoDB attributes.

func (MarshalingAttributeError) Error

func (e MarshalingAttributeError) Error() string

Error returns a detailed error message including the underlying cause for MarshalingAttributeError.

type Message

type Message[T any] struct {
	// ID is a unique identifier for the message.
	ID string `json:"id" dynamodbav:"id"`
	// Data is the content of the message. The type T defines the format of this data.
	Data T `json:"data" dynamodbav:"data"`
	// ReceiveCount is the number of times the message has been received from the queue.
	ReceiveCount int `json:"receive_count" dynamodbav:"receive_count"`
	// QueueType is the type of queue (standard or DLQ) to which the message belongs.
	QueueType QueueType `json:"queue_type" dynamodbav:"queue_type,omitempty"`
	// Version is the version number of the message, used for optimistic concurrency control.
	Version int `json:"version" dynamodbav:"version"`
	// CreatedAt is the timestamp when the message was created.
	CreatedAt string `json:"created_at" dynamodbav:"created_at"`
	// UpdatedAt is the timestamp when the message was last updated.
	UpdatedAt string `json:"updated_at" dynamodbav:"updated_at"`
	// SentAt is the timestamp when the message was sent to the queue.
	SentAt string `json:"sent_at" dynamodbav:"sent_at"`
	// ReceivedAt is the timestamp when the message was last received from the queue.
	ReceivedAt string `json:"received_at" dynamodbav:"received_at"`
	// InvisibleUntilAt: The deadline until which the message remains invisible in the queue.
	// Until this timestamp, the message will not be visible to other consumers.
	InvisibleUntilAt string `json:"invisible_until_at" dynamodbav:"invisible_until_at"`
}

Message represents a message structure in a DynamoDB-based queue system. It uses the generic type T for the message content, allowing for flexibility in the data type of the message payload. This struct includes tags for JSON serialization (`json:"..."`) and DynamoDB attribute value (`dynamodbav:"..."`) mappings.

func NewMessage added in v0.9.0

func NewMessage[T any](id string, data T, now time.Time) *Message[T]

NewMessage creates a new instance of a Message with the provided data and initializes its timestamps. This function is a constructor for Message, setting initial values and preparing the message for use in the queue.

func (*Message[T]) GetStatus added in v0.11.0

func (m *Message[T]) GetStatus(now time.Time) Status

GetStatus determines the current status of the message based on the provided time. It returns the status as either 'StatusReady' or 'StatusProcessing'.

It returns StatusReady if the message is ready to be processed, that is, if 'InvisibleUntilAt' is empty or the current time is after the 'InvisibleUntilAt' time. It returns StatusProcessing if the current time is before the 'InvisibleUntilAt' time, which indicates that the message is currently being processed and is not yet ready for further processing.
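The rule can be sketched as a standalone function. Two assumptions are made here and are not confirmed by the documentation above: the timestamp string is parsed as RFC 3339, and a current time exactly equal to the deadline counts as ready. The `status` type, `getStatus`, and `mustParse` are local stand-ins, and the error return exists only because this sketch parses a string.

```go
package main

import (
	"fmt"
	"time"
)

type status string

const (
	statusReady      status = "READY"
	statusProcessing status = "PROCESSING"
)

// getStatus applies the documented rule to an InvisibleUntilAt string.
// Assumption: timestamps are RFC 3339 strings.
func getStatus(invisibleUntilAt string, now time.Time) (status, error) {
	if invisibleUntilAt == "" {
		return statusReady, nil
	}
	until, err := time.Parse(time.RFC3339Nano, invisibleUntilAt)
	if err != nil {
		return "", err
	}
	if now.Before(until) {
		return statusProcessing, nil
	}
	return statusReady, nil // deadline reached or passed
}

// mustParse is a small helper for the example; it panics on malformed input.
func mustParse(s string) time.Time {
	t, err := time.Parse(time.RFC3339Nano, s)
	if err != nil {
		panic(err)
	}
	return t
}

func main() {
	now := mustParse("2023-12-15T00:00:00Z")
	for _, until := range []string{"", "2023-12-15T00:00:30Z", "2023-12-14T23:59:30Z"} {
		s, err := getStatus(until, now)
		fmt.Println(s, err)
	}
}
```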

type MessageProcessor

type MessageProcessor[T any] interface {
	// Process handles the processing of a message.
	// It takes a pointer to a Message of type T and returns an error if the processing fails.
	Process(msg *Message[T]) error
}

MessageProcessor is an interface defining a method to process messages of a generic type T. It is used in the context of consuming messages from a DynamoDB-based queue.

type MessageProcessorFunc added in v0.10.1

type MessageProcessorFunc[T any] func(msg *Message[T]) error

MessageProcessorFunc is a functional type that implements the MessageProcessor interface. It allows using a function as a MessageProcessor.

func (MessageProcessorFunc[T]) Process added in v0.10.1

func (f MessageProcessorFunc[T]) Process(msg *Message[T]) error

Process calls the MessageProcessorFunc itself to process the message. It enables the function type to adhere to the MessageProcessor interface.
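This is the same adapter idea as `http.HandlerFunc` in the standard library: a function type gains a method so that a plain function satisfies an interface. The sketch below re-declares simplified local versions of the types (`message`, `messageProcessor`, `messageProcessorFunc`) and a hypothetical `run` helper so that it is self-contained.

```go
package main

import "fmt"

// message is a trimmed local stand-in for Message[T].
type message[T any] struct {
	ID   string
	Data T
}

// messageProcessor mirrors the MessageProcessor interface.
type messageProcessor[T any] interface {
	Process(msg *message[T]) error
}

// messageProcessorFunc mirrors MessageProcessorFunc: a function type
// whose Process method simply calls the function itself.
type messageProcessorFunc[T any] func(msg *message[T]) error

func (f messageProcessorFunc[T]) Process(msg *message[T]) error {
	return f(msg)
}

// run is a hypothetical caller that only knows about the interface.
func run[T any](p messageProcessor[T], msg *message[T]) error {
	return p.Process(msg)
}

func main() {
	var seen []string
	p := messageProcessorFunc[string](func(msg *message[string]) error {
		seen = append(seen, msg.Data)
		return nil
	})

	err := run[string](p, &message[string]{ID: "A-101", Data: "hello"})
	fmt.Println(seen, err)
}
```

The conversion `messageProcessorFunc[string](...)` is all that is needed to pass a closure where the interface is expected.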

type MoveMessageToDLQInput added in v0.7.0

type MoveMessageToDLQInput struct {
	// ID is the unique identifier of the message to be moved to the DLQ.
	ID string
}

MoveMessageToDLQInput represents the input parameters for moving a specific message from a DynamoDB-based queue to a Dead Letter Queue (DLQ).

type MoveMessageToDLQOutput added in v0.7.0

type MoveMessageToDLQOutput[T any] struct {
	// MovedMessage is a pointer to the Message type containing information about the moved message.
	// The type T determines the format of the message content.
	MovedMessage *Message[T]
}

MoveMessageToDLQOutput represents the result of the operation to move a message to the DLQ. This struct uses the generic type T and contains information about the message that has been moved.

type ProduceInput added in v0.9.0

type ProduceInput[T any] struct {
	// Data is the content of the message to be produced. The type T allows for flexibility in the data type of the message payload.
	Data T
	// DelaySeconds is the delay time (in seconds) before the message is sent to the queue.
	DelaySeconds int
}

ProduceInput represents the input parameters for producing a message.

type ProduceOutput added in v0.9.0

type ProduceOutput[T any] struct {
	// Message is a pointer to the Message type containing information about the produced message.
	Message *Message[T]
}

ProduceOutput represents the result of the produce operation.

type Producer added in v0.9.0

type Producer[T any] struct {
	// contains filtered or unexported fields
}

Producer is a generic struct responsible for producing messages of any type T to a DynamoDB-based queue.

func NewProducer added in v0.9.0

func NewProducer[T any](client Client[T], opts ...func(o *ProducerOptions)) *Producer[T]

NewProducer creates a new instance of a Producer, which is used to produce messages to a DynamoDB-based queue. The Producer can be configured with various options, such as a custom ID generator.

func (*Producer[T]) Produce added in v0.9.0

func (c *Producer[T]) Produce(ctx context.Context, params *ProduceInput[T]) (*ProduceOutput[T], error)

Produce sends a message to the queue using the provided input parameters. It generates a unique ID for the message using the Producer's ID generator and delegates to the Client's SendMessage method. An error is returned if the SendMessage operation fails.
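The delegation described above, generating an ID and then handing off to a send operation, can be sketched without DynamoDB. In this example `sendFunc` stands in for the Client's SendMessage method, `counterIDs` is a placeholder default generator (the package documents `uuid.NewString` as its default), and `producer`, `newProducer`, and `produce` are hypothetical simplified names.

```go
package main

import (
	"fmt"
	"strconv"
)

// producerOptions mirrors the documented ProducerOptions field.
type producerOptions struct {
	IDGenerator func() string
}

// sendFunc is a hypothetical stand-in for the client's SendMessage call.
type sendFunc func(id, data string) error

type producer struct {
	newID func() string
	send  sendFunc
}

// counterIDs returns a generator yielding "msg-1", "msg-2", and so on.
// It is a placeholder default for this sketch only.
func counterIDs() func() string {
	n := 0
	return func() string {
		n++
		return "msg-" + strconv.Itoa(n)
	}
}

// newProducer applies option functions over the default ID generator.
func newProducer(send sendFunc, opts ...func(o *producerOptions)) *producer {
	o := &producerOptions{IDGenerator: counterIDs()}
	for _, opt := range opts {
		opt(o)
	}
	return &producer{newID: o.IDGenerator, send: send}
}

// produce generates an ID, delegates to send, and returns the ID on success.
func (p *producer) produce(data string) (string, error) {
	id := p.newID()
	if err := p.send(id, data); err != nil {
		return "", err
	}
	return id, nil
}

func main() {
	sent := map[string]string{}
	p := newProducer(func(id, data string) error {
		sent[id] = data
		return nil
	})

	id, err := p.produce("hello")
	fmt.Println(id, sent[id], err)
}
```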

type ProducerOptions added in v0.10.1

type ProducerOptions struct {
	// IDGenerator is a function that generates a unique identifier for each message produced by the Producer.
	// The default ID generator is uuid.NewString.
	IDGenerator func() string
}

ProducerOptions holds configuration options for a Producer.

type QueueType

type QueueType string

QueueType represents the type of queue in a DynamoDB-based messaging system.

const (
	// QueueTypeStandard represents a standard queue.
	QueueTypeStandard QueueType = "STANDARD"
	// QueueTypeDLQ represents a Dead Letter Queue, used for holding messages that failed to process.
	QueueTypeDLQ QueueType = "DLQ"
)

Constants defining the types of queues available.

type ReceiveMessageInput added in v0.7.0

type ReceiveMessageInput struct {
	// QueueType is the type of queue from which the message is to be retrieved, such as STANDARD or DLQ.
	QueueType QueueType
	// VisibilityTimeout is the timeout in seconds during which the message becomes invisible to other receivers.
	VisibilityTimeout int
}

ReceiveMessageInput represents the input parameters for receiving a message from a DynamoDB-based queue.

type ReceiveMessageOutput added in v0.7.0

type ReceiveMessageOutput[T any] struct {
	// ReceivedMessage is a pointer to the Message type containing information about the received message.
	// The type T determines the format of the message content.
	ReceivedMessage *Message[T]
}

ReceiveMessageOutput represents the result of a message receiving operation. This struct uses the generic type T and contains information about the received message.

type RedriveMessageInput added in v0.7.0

type RedriveMessageInput struct {
	// ID is the unique identifier of the message to be redriven from the DLQ.
	ID string
}

RedriveMessageInput represents the input parameters for restoring a specific message from a DynamoDB-based Dead Letter Queue (DLQ) back to the STANDARD queue.

type RedriveMessageOutput added in v0.7.0

type RedriveMessageOutput[T any] struct {
	// RedroveMessage is a pointer to the Message type containing information about the redriven message.
	// The type T determines the format of the message content.
	RedroveMessage *Message[T]
}

RedriveMessageOutput represents the result of the operation to redrive a message from the DLQ. This struct uses the generic type T and contains information about the message that has been restored.

type ReplaceMessageInput added in v0.7.0

type ReplaceMessageInput[T any] struct {
	// Message is a pointer to the Message type containing the new message data that will replace the existing message in the queue.
	// The type T determines the format of the new message content.
	Message *Message[T]
}

ReplaceMessageInput represents the input parameters for replacing a specific message in a DynamoDB-based queue. This struct uses the generic type T for the message content.

type ReplaceMessageOutput added in v0.7.0

type ReplaceMessageOutput struct {
}

ReplaceMessageOutput represents the result of the operation to replace a message in the queue. This struct is empty as the replace message operation does not return any specific information.

type SendMessageInput added in v0.7.0

type SendMessageInput[T any] struct {
	// ID is a unique identifier for the message.
	ID string
	// Data is the content of the message to be sent to the queue. The type T determines the format of the message.
	Data T
	// DelaySeconds is the delay time (in seconds) before the message is sent to the queue.
	DelaySeconds int
}

SendMessageInput represents the input parameters for sending a message to a DynamoDB-based queue. This struct uses the generic type T, supporting messages of various data types.

type SendMessageOutput added in v0.7.0

type SendMessageOutput[T any] struct {
	// SentMessage is a pointer to the Message type containing information about the sent message.
	SentMessage *Message[T]
}

SendMessageOutput represents the result of a message sending operation. This struct uses the generic type T and contains information about the sent message.

type Status

type Status string

Status represents the state of a message in a DynamoDB-based queue.

const (
	// StatusReady indicates that a message is ready to be processed.
	StatusReady Status = "READY"
	// StatusProcessing indicates that a message is currently being processed.
	StatusProcessing Status = "PROCESSING"
)

Constants defining various states of a message in the queue.

type UnmarshalingAttributeError

type UnmarshalingAttributeError struct {
	Cause error
}

UnmarshalingAttributeError represents an error during the unmarshaling of DynamoDB attributes.

func (UnmarshalingAttributeError) Error

Error returns a detailed error message including the underlying cause for UnmarshalingAttributeError.

Directories

Path Synopsis
cmd
internal
cmd
