Documentation ¶
Index ¶
- Variables
- func WithAWSBaseEndpoint(baseEndpoint string) func(*ClientOptions)
- func WithAWSDynamoDBClient(client *dynamodb.Client) func(*ClientOptions)
- func WithAWSRetryMaxAttempts(retryMaxAttempts int) func(*ClientOptions)
- func WithConcurrency(concurrency int) func(o *ConsumerOptions)
- func WithErrorLog(errorLog *log.Logger) func(o *ConsumerOptions)
- func WithIDGenerator(idGenerator func() string) func(o *ProducerOptions)
- func WithMaximumReceives(maximumReceives int) func(o *ConsumerOptions)
- func WithOnShutdown(onShutdown []func()) func(o *ConsumerOptions)
- func WithPollingInterval(pollingInterval time.Duration) func(o *ConsumerOptions)
- func WithQueueType(queueType QueueType) func(o *ConsumerOptions)
- func WithQueueingIndexName(queueingIndexName string) func(*ClientOptions)
- func WithRetryInterval(sec int) func(o *ConsumerOptions)
- func WithTableName(tableName string) func(*ClientOptions)
- func WithUseFIFO(useFIFO bool) func(*ClientOptions)
- func WithVisibilityTimeout(sec int) func(o *ConsumerOptions)
- type BuildingExpressionError
- type ChangeMessageVisibilityInput
- type ChangeMessageVisibilityOutput
- type Client
- type ClientImpl
- func (c *ClientImpl[T]) ChangeMessageVisibility(ctx context.Context, params *ChangeMessageVisibilityInput) (*ChangeMessageVisibilityOutput[T], error)
- func (c *ClientImpl[T]) DeleteMessage(ctx context.Context, params *DeleteMessageInput) (*DeleteMessageOutput, error)
- func (c *ClientImpl[T]) GetDLQStats(ctx context.Context, _ *GetDLQStatsInput) (*GetDLQStatsOutput, error)
- func (c *ClientImpl[T]) GetMessage(ctx context.Context, params *GetMessageInput) (*GetMessageOutput[T], error)
- func (c *ClientImpl[T]) GetQueueStats(ctx context.Context, _ *GetQueueStatsInput) (*GetQueueStatsOutput, error)
- func (c *ClientImpl[T]) ListMessages(ctx context.Context, params *ListMessagesInput) (*ListMessagesOutput[T], error)
- func (c *ClientImpl[T]) MoveMessageToDLQ(ctx context.Context, params *MoveMessageToDLQInput) (*MoveMessageToDLQOutput[T], error)
- func (c *ClientImpl[T]) ReceiveMessage(ctx context.Context, params *ReceiveMessageInput) (*ReceiveMessageOutput[T], error)
- func (c *ClientImpl[T]) RedriveMessage(ctx context.Context, params *RedriveMessageInput) (*RedriveMessageOutput[T], error)
- func (c *ClientImpl[T]) ReplaceMessage(ctx context.Context, params *ReplaceMessageInput[T]) (*ReplaceMessageOutput, error)
- func (c *ClientImpl[T]) SendMessage(ctx context.Context, params *SendMessageInput[T]) (*SendMessageOutput[T], error)
- type ClientOptions
- type ConditionalCheckFailedError
- type Consumer
- type ConsumerOptions
- type DeleteMessageInput
- type DeleteMessageOutput
- type DynamoDBAPIError
- type EmptyQueueError
- type GetDLQStatsInput
- type GetDLQStatsOutput
- type GetMessageInput
- type GetMessageOutput
- type GetQueueStatsInput
- type GetQueueStatsOutput
- type IDDuplicatedError
- type IDNotFoundError
- type IDNotProvidedError
- type InvalidStateTransitionError
- type ListMessagesInput
- type ListMessagesOutput
- type MarshalingAttributeError
- type Message
- type MessageProcessor
- type MessageProcessorFunc
- type MoveMessageToDLQInput
- type MoveMessageToDLQOutput
- type ProduceInput
- type ProduceOutput
- type Producer
- type ProducerOptions
- type QueueType
- type ReceiveMessageInput
- type ReceiveMessageOutput
- type RedriveMessageInput
- type RedriveMessageOutput
- type ReplaceMessageInput
- type ReplaceMessageOutput
- type SendMessageInput
- type SendMessageOutput
- type Status
- type UnmarshalingAttributeError
Constants ¶
This section is empty.
Variables ¶
var ErrConsumerClosed = errors.New("DynamoMQ: Consumer closed")
ErrConsumerClosed is an error that indicates the Consumer has been closed. This error is returned when operations are attempted on a Consumer that has already been shut down.
Functions ¶
func WithAWSBaseEndpoint ¶
func WithAWSBaseEndpoint(baseEndpoint string) func(*ClientOptions)
WithAWSBaseEndpoint is an option function to set a custom base endpoint for AWS services. This function is useful when you want the client to interact with a specific AWS service endpoint, such as a local or a different regional endpoint. If the DynamoDB client is set using the WithAWSDynamoDBClient function, this option function is ignored.
func WithAWSDynamoDBClient ¶
func WithAWSDynamoDBClient(client *dynamodb.Client) func(*ClientOptions)
WithAWSDynamoDBClient is an option function to set a custom AWS DynamoDB client for the DynamoMQ client. This function is used to provide a pre-configured DynamoDB client that the DynamoMQ client will use for all interactions with DynamoDB.
func WithAWSRetryMaxAttempts ¶
func WithAWSRetryMaxAttempts(retryMaxAttempts int) func(*ClientOptions)
WithAWSRetryMaxAttempts is an option function to set the maximum number of retry attempts for AWS service calls. Use this function to define how many times the client should retry a failed AWS service call. If the DynamoDB client is set using the WithAWSDynamoDBClient function, this option function is ignored.
func WithConcurrency ¶ added in v0.11.0
func WithConcurrency(concurrency int) func(o *ConsumerOptions)
WithConcurrency sets the number of concurrent workers for processing messages in the Consumer. This function determines how many messages can be processed at the same time.
func WithErrorLog ¶
func WithErrorLog(errorLog *log.Logger) func(o *ConsumerOptions)
WithErrorLog sets a custom logger for the Consumer. This function configures an optional logger for errors. If nil, the standard logger is used.
func WithIDGenerator ¶ added in v0.10.1
func WithIDGenerator(idGenerator func() string) func(o *ProducerOptions)
WithIDGenerator is an option function to set a custom ID generator for the Producer. Use this function to provide a custom function that generates unique identifiers for messages. The default ID generator is uuid.NewString.
func WithMaximumReceives ¶
func WithMaximumReceives(maximumReceives int) func(o *ConsumerOptions)
WithMaximumReceives sets the maximum number of times a message can be delivered to the Consumer. This function configures the limit on how many times a message will be attempted for delivery before being considered a failure or moved to a Dead Letter Queue, if applicable.
func WithOnShutdown ¶
func WithOnShutdown(onShutdown []func()) func(o *ConsumerOptions)
WithOnShutdown adds functions to be called during the Consumer's shutdown process. This function appends to the list of callbacks executed when the Consumer is shutting down.
func WithPollingInterval ¶
func WithPollingInterval(pollingInterval time.Duration) func(o *ConsumerOptions)
WithPollingInterval sets the polling interval for the Consumer. This function configures the time interval at which the Consumer polls the DynamoDB queue for new messages.
func WithQueueType ¶ added in v0.9.0
func WithQueueType(queueType QueueType) func(o *ConsumerOptions)
WithQueueType sets the type of queue (STANDARD or DLQ) for the Consumer. This function allows specification of the queue type the Consumer will operate on.
func WithQueueingIndexName ¶ added in v0.8.0
func WithQueueingIndexName(queueingIndexName string) func(*ClientOptions)
WithQueueingIndexName is an option function to set the queue index name for the DynamoMQ client. This function allows defining a custom index name that the client will use for queue operations, optimizing message handling. By default, the index name is set to "dynamo-mq-index-queue_type-sent_at".
func WithRetryInterval ¶ added in v0.11.0
func WithRetryInterval(sec int) func(o *ConsumerOptions)
WithRetryInterval sets the retry interval for failed messages in the Consumer. This function specifies the time interval (in seconds) before a failed message is retried.
func WithTableName ¶
func WithTableName(tableName string) func(*ClientOptions)
WithTableName is an option function to set the table name for the DynamoMQ client. Use this function to specify the name of the DynamoDB table that the client will use for storing and retrieving messages. By default, the table name is set to "dynamo-mq-table".
func WithUseFIFO ¶
func WithUseFIFO(useFIFO bool) func(*ClientOptions)
WithUseFIFO is an option function to enable FIFO (First-In-First-Out) behavior for the DynamoMQ client. Setting this option to true makes the client treat the queue as a FIFO queue; otherwise, it is treated as a standard queue. By default, this option is set to false.
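The option functions above all share one shape: each returns a closure that mutates an options struct. The following is a minimal sketch of that pattern, assuming a local stand-in type rather than the real ClientOptions. The names clientOptions, withTableName, withUseFIFO, and newOptions are hypothetical; only the defaults ("dynamo-mq-table" and false) come from the text above.

```go
package main

import "fmt"

// clientOptions is a local stand-in for ClientOptions; only two fields are mirrored.
type clientOptions struct {
	TableName string
	UseFIFO   bool
}

// withTableName mimics the shape of WithTableName.
func withTableName(name string) func(*clientOptions) {
	return func(o *clientOptions) { o.TableName = name }
}

// withUseFIFO mimics the shape of WithUseFIFO.
func withUseFIFO(useFIFO bool) func(*clientOptions) {
	return func(o *clientOptions) { o.UseFIFO = useFIFO }
}

// newOptions starts from the documented defaults and applies each option in order.
func newOptions(optFns ...func(*clientOptions)) clientOptions {
	o := clientOptions{TableName: "dynamo-mq-table", UseFIFO: false}
	for _, fn := range optFns {
		fn(&o)
	}
	return o
}

func main() {
	fmt.Printf("%+v\n", newOptions())
	fmt.Printf("%+v\n", newOptions(withTableName("orders"), withUseFIFO(true)))
}
```

Because options are applied in order, a later option overrides an earlier one that sets the same field.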
func WithVisibilityTimeout ¶ added in v0.11.0
func WithVisibilityTimeout(sec int) func(o *ConsumerOptions)
WithVisibilityTimeout sets the visibility timeout for messages in the Consumer. This function configures the duration (in seconds) a message remains invisible in the queue after being received.
Types ¶
type BuildingExpressionError ¶
type BuildingExpressionError struct {
Cause error
}
BuildingExpressionError represents an error during the building of a DynamoDB expression.
func (BuildingExpressionError) Error ¶
func (e BuildingExpressionError) Error() string
Error returns a detailed error message including the underlying cause for BuildingExpressionError.
type ChangeMessageVisibilityInput ¶ added in v0.11.0
type ChangeMessageVisibilityInput struct {
	// ID is the unique identifier of the message for which visibility is to be changed.
	ID string
	// VisibilityTimeout is the new timeout in seconds during which the message becomes invisible to other receivers.
	// After this time elapses, the message will become visible in the queue again.
	VisibilityTimeout int
}
ChangeMessageVisibilityInput represents the input parameters for changing the visibility timeout of a specific message in a DynamoDB-based queue.
type ChangeMessageVisibilityOutput ¶ added in v0.11.0
type ChangeMessageVisibilityOutput[T any] struct {
	// ChangedMessage is a pointer to the Message type containing information about the message with changed visibility.
	// The type T determines the format of the message content.
	ChangedMessage *Message[T]
}
ChangeMessageVisibilityOutput represents the result of the operation to change the visibility of a message. This struct uses the generic type T and contains information about the message whose visibility has been changed.
type Client ¶ added in v0.6.0
type Client[T any] interface {
	// SendMessage sends a message to the DynamoDB-based queue.
	SendMessage(ctx context.Context, params *SendMessageInput[T]) (*SendMessageOutput[T], error)
	// ReceiveMessage retrieves and processes a message from a DynamoDB-based queue.
	ReceiveMessage(ctx context.Context, params *ReceiveMessageInput) (*ReceiveMessageOutput[T], error)
	// ChangeMessageVisibility changes the visibility of a specific message in a DynamoDB-based queue.
	ChangeMessageVisibility(ctx context.Context, params *ChangeMessageVisibilityInput) (*ChangeMessageVisibilityOutput[T], error)
	// DeleteMessage deletes a specific message from a DynamoDB-based queue.
	DeleteMessage(ctx context.Context, params *DeleteMessageInput) (*DeleteMessageOutput, error)
	// MoveMessageToDLQ moves a specific message from a DynamoDB-based queue to a Dead Letter Queue (DLQ).
	MoveMessageToDLQ(ctx context.Context, params *MoveMessageToDLQInput) (*MoveMessageToDLQOutput[T], error)
	// RedriveMessage restores a specific message from a DynamoDB-based Dead Letter Queue (DLQ).
	RedriveMessage(ctx context.Context, params *RedriveMessageInput) (*RedriveMessageOutput[T], error)
	// GetMessage gets a specific message from a DynamoDB-based queue.
	GetMessage(ctx context.Context, params *GetMessageInput) (*GetMessageOutput[T], error)
	// GetQueueStats gets statistical information about a DynamoDB-based queue.
	GetQueueStats(ctx context.Context, params *GetQueueStatsInput) (*GetQueueStatsOutput, error)
	// GetDLQStats gets statistical information about a DynamoDB-based Dead Letter Queue (DLQ).
	GetDLQStats(ctx context.Context, params *GetDLQStatsInput) (*GetDLQStatsOutput, error)
	// ListMessages gets a list of messages from a DynamoDB-based queue.
	ListMessages(ctx context.Context, params *ListMessagesInput) (*ListMessagesOutput[T], error)
	// ReplaceMessage replaces a specific message within a DynamoDB-based queue.
	ReplaceMessage(ctx context.Context, params *ReplaceMessageInput[T]) (*ReplaceMessageOutput, error)
}
Client is an interface for interacting with a DynamoDB-based message queue system. It provides methods for various operations on messages within the queue. This interface is generic and works with any type T, which represents the structure of the message content.
func NewFromConfig ¶ added in v0.6.0
NewFromConfig creates a new DynamoMQ client using the provided AWS configuration and any additional client options. This function initializes a new client with default settings, which can be customized using option functions. It returns an error if the initialization of the DynamoDB client fails.
type ClientImpl ¶ added in v0.11.0
type ClientImpl[T any] struct {
	// contains filtered or unexported fields
}
ClientImpl is a concrete implementation of the dynamomq.Client interface. Note: ClientImpl cannot be used directly. Always use the dynamomq.NewFromConfig function to create an instance.
func (*ClientImpl[T]) ChangeMessageVisibility ¶ added in v0.11.0
func (c *ClientImpl[T]) ChangeMessageVisibility(ctx context.Context, params *ChangeMessageVisibilityInput) (*ChangeMessageVisibilityOutput[T], error)
ChangeMessageVisibility changes the visibility of a specific message in a DynamoDB-based queue. It retrieves the message based on the specified message ID and alters its visibility timeout. The visibility timeout specifies the duration during which the message, once retrieved from the queue, becomes invisible to other clients. Modifying this timeout value allows dynamic adjustment of the message processing time.
func (*ClientImpl[T]) DeleteMessage ¶ added in v0.11.0
func (c *ClientImpl[T]) DeleteMessage(ctx context.Context, params *DeleteMessageInput) (*DeleteMessageOutput, error)
DeleteMessage deletes a specific message from a DynamoDB-based queue. It directly deletes the message from DynamoDB based on the specified message ID.
func (*ClientImpl[T]) GetDLQStats ¶ added in v0.11.0
func (c *ClientImpl[T]) GetDLQStats(ctx context.Context, _ *GetDLQStatsInput) (*GetDLQStatsOutput, error)
GetDLQStats gets statistical information about a DynamoDB-based Dead Letter Queue (DLQ). It provides statistics on the messages within the DLQ: the IDs of the first 100 messages in the queue and the total number of records in the DLQ. This information is useful for monitoring the message queue system and understanding the status of the DLQ.
func (*ClientImpl[T]) GetMessage ¶ added in v0.11.0
func (c *ClientImpl[T]) GetMessage(ctx context.Context, params *GetMessageInput) (*GetMessageOutput[T], error)
GetMessage gets a specific message from a DynamoDB-based queue. It retrieves the message from DynamoDB based on the specified message ID. The retrieved message is then unmarshaled into the specified generic type T.
func (*ClientImpl[T]) GetQueueStats ¶ added in v0.11.0
func (c *ClientImpl[T]) GetQueueStats(ctx context.Context, _ *GetQueueStatsInput) (*GetQueueStatsOutput, error)
GetQueueStats gets statistical information about a DynamoDB-based queue. It provides statistics about the messages in the queue and their processing status: the IDs of the first 100 messages in the queue, the first 100 IDs of messages selected for processing, the total number of records in the queue, the number of records currently in processing, and the number of records awaiting processing. This information is useful for monitoring the message queue system and understanding the status of the queue.
func (*ClientImpl[T]) ListMessages ¶ added in v0.11.0
func (c *ClientImpl[T]) ListMessages(ctx context.Context, params *ListMessagesInput) (*ListMessagesOutput[T], error)
ListMessages gets a list of messages from a DynamoDB-based queue. It scans and retrieves messages from DynamoDB based on the specified size parameter. If the size is not specified, or is zero or less, a default maximum list size of 10 is used. The retrieved messages are unmarshaled into an array of the generic type T and are sorted based on the update time.
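The size fallback and the ordering described above can be sketched as follows. This is an illustration under stated assumptions, not the library's implementation: listedMessage, defaultMaxListSize, effectiveSize, and sortByUpdatedAt are hypothetical names, the sort direction (oldest first) is assumed because the text does not state it, and the timestamps are assumed to be UTC RFC 3339 strings so that string order matches time order.

```go
package main

import (
	"fmt"
	"sort"
)

// defaultMaxListSize mirrors the documented default of 10; the constant name is hypothetical.
const defaultMaxListSize = 10

// listedMessage is a local stand-in that keeps only the fields needed here.
type listedMessage struct {
	ID        string
	UpdatedAt string // assumed UTC RFC 3339, so string order matches time order
}

// effectiveSize falls back to the default when size is zero or negative.
func effectiveSize(size int32) int32 {
	if size <= 0 {
		return defaultMaxListSize
	}
	return size
}

// sortByUpdatedAt orders messages from oldest to newest update (direction is an assumption).
func sortByUpdatedAt(msgs []listedMessage) {
	sort.Slice(msgs, func(i, j int) bool { return msgs[i].UpdatedAt < msgs[j].UpdatedAt })
}

func main() {
	msgs := []listedMessage{
		{ID: "B", UpdatedAt: "2024-01-02T00:00:00Z"},
		{ID: "A", UpdatedAt: "2024-01-01T00:00:00Z"},
	}
	sortByUpdatedAt(msgs)
	fmt.Println(effectiveSize(0), effectiveSize(25), msgs[0].ID, msgs[1].ID)
}
```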
func (*ClientImpl[T]) MoveMessageToDLQ ¶ added in v0.11.0
func (c *ClientImpl[T]) MoveMessageToDLQ(ctx context.Context, params *MoveMessageToDLQInput) (*MoveMessageToDLQOutput[T], error)
MoveMessageToDLQ moves a specific message from a DynamoDB-based queue to a Dead Letter Queue (DLQ). It locates the message based on the specified message ID and marks it for the DLQ. Moving a message to the DLQ allows for the isolation of failed message processing, facilitating later analysis and reprocessing.
func (*ClientImpl[T]) ReceiveMessage ¶ added in v0.11.0
func (c *ClientImpl[T]) ReceiveMessage(ctx context.Context, params *ReceiveMessageInput) (*ReceiveMessageOutput[T], error)
ReceiveMessage retrieves and processes a message from a DynamoDB-based queue using the generic type T. The selection process involves constructing and executing a DynamoDB query based on the queue type and visibility timeout. After a message is selected, its status, including visibility and version, is updated to ensure the message remains invisible and in processing for a defined period. This process is crucial for maintaining queue integrity and preventing duplicate message delivery. If no messages are available for reception, an EmptyQueueError is returned. Additionally, when FIFO (First In, First Out) is enabled, the method guarantees that only one valid message is processed at a time.
func (*ClientImpl[T]) RedriveMessage ¶ added in v0.11.0
func (c *ClientImpl[T]) RedriveMessage(ctx context.Context, params *RedriveMessageInput) (*RedriveMessageOutput[T], error)
RedriveMessage restores a specific message from a DynamoDB-based Dead Letter Queue (DLQ). It locates the message based on the specified message ID and marks it as restored from the DLQ to the standard queue. This makes it possible to reprocess messages that previously failed, which is a key part of error handling in the message queue system.
func (*ClientImpl[T]) ReplaceMessage ¶ added in v0.11.0
func (c *ClientImpl[T]) ReplaceMessage(ctx context.Context, params *ReplaceMessageInput[T]) (*ReplaceMessageOutput, error)
ReplaceMessage replaces a specific message within a DynamoDB-based queue. It searches for an existing message based on the specified message ID and deletes it if found. Then, a new message is added to the queue. If a message with the specified ID does not exist, the new message is directly added to the queue.
func (*ClientImpl[T]) SendMessage ¶ added in v0.11.0
func (c *ClientImpl[T]) SendMessage(ctx context.Context, params *SendMessageInput[T]) (*SendMessageOutput[T], error)
SendMessage sends a message to the DynamoDB-based message queue. It checks for message ID duplication and handles message delays if specified. This function takes a context and a SendMessageInput parameter. SendMessageInput contains the message ID, data, and an optional delay in seconds. If the message ID already exists in the queue, it returns an IDDuplicatedError. Otherwise, it adds the message to the queue. The function also handles message delays. If DelaySeconds is greater than 0 in the input parameter, the message will be delayed accordingly before being sent.
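The duplicate check and the optional delay can be pictured with a small in-memory model. This sketch does not talk to DynamoDB, and every name in it (memQueue, entry, errIDDuplicated, AvailableAt) is hypothetical. In particular, recording the delay as a later availability time is an assumption about how a delay could be represented, not a statement about how the client stores it.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errIDDuplicated is a local stand-in for IDDuplicatedError.
var errIDDuplicated = errors.New("message ID is duplicated")

// entry is what the in-memory queue stores per message ID.
type entry struct {
	Data        string
	AvailableAt int64 // Unix seconds; how the real client records the delay is an assumption
}

// memQueue is a hypothetical in-memory queue keyed by message ID.
type memQueue struct {
	entries map[string]entry
	now     func() int64
}

func newMemQueue(now func() int64) *memQueue {
	return &memQueue{entries: map[string]entry{}, now: now}
}

// send rejects a duplicate ID, otherwise stores the message and applies the optional delay.
func (q *memQueue) send(id, data string, delaySeconds int) error {
	if _, exists := q.entries[id]; exists {
		return errIDDuplicated
	}
	availableAt := q.now()
	if delaySeconds > 0 {
		availableAt += int64(delaySeconds)
	}
	q.entries[id] = entry{Data: data, AvailableAt: availableAt}
	return nil
}

func main() {
	q := newMemQueue(func() int64 { return time.Now().Unix() })
	fmt.Println(q.send("A-101", "hello", 30)) // first send succeeds
	fmt.Println(q.send("A-101", "again", 0))  // same ID is rejected
}
```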
type ClientOptions ¶ added in v0.6.0
type ClientOptions struct {
	// DynamoDB is a pointer to the DynamoDB client used for database operations.
	DynamoDB *dynamodb.Client
	// TableName is the name of the DynamoDB table used for the queue.
	TableName string
	// QueueingIndexName is the name of the index used for queueing operations.
	QueueingIndexName string
	// MaximumReceives is the maximum number of times a message is delivered before being moved to the DLQ.
	MaximumReceives int
	// UseFIFO is a boolean indicating if the queue should behave as a First-In-First-Out (FIFO) queue.
	UseFIFO bool
	// BaseEndpoint is the base endpoint URL for DynamoDB requests.
	BaseEndpoint string
	// RetryMaxAttempts is the maximum number of attempts for retrying failed DynamoDB operations.
	RetryMaxAttempts int
	// Clock is an abstraction of time operations, allowing control over time during tests.
	Clock clock.Clock
	// MarshalMap is a function to marshal objects into a map of DynamoDB attribute values.
	MarshalMap func(in interface{}) (map[string]types.AttributeValue, error)
	// UnmarshalMap is a function to unmarshal a map of DynamoDB attribute values into objects.
	UnmarshalMap func(m map[string]types.AttributeValue, out interface{}) error
	// UnmarshalListOfMaps is a function to unmarshal a list of maps of DynamoDB attribute values into objects.
	UnmarshalListOfMaps func(l []map[string]types.AttributeValue, out interface{}) error
	// BuildExpression is a function to build DynamoDB expressions from a builder.
	BuildExpression func(b expression.Builder) (expression.Expression, error)
}
ClientOptions defines configuration options for the DynamoMQ client.
Note: The following fields are primarily used for testing purposes. They allow for stubbing of operations during tests, facilitating the mocking of behavior without relying on a real DynamoDB instance:
- Clock
- MarshalMap
- UnmarshalMap
- UnmarshalListOfMaps
- BuildExpression
In typical use, these testing fields should not be modified. They are provided to support advanced use cases, like unit testing, where control over these operations is necessary.
type ConditionalCheckFailedError ¶
type ConditionalCheckFailedError struct {
Cause error
}
ConditionalCheckFailedError represents an error when a condition check on the 'version' attribute fails.
func (ConditionalCheckFailedError) Error ¶
func (e ConditionalCheckFailedError) Error() string
Error returns a detailed error message including the underlying cause for ConditionalCheckFailedError.
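A condition check on a version attribute is the usual way to implement optimistic concurrency: a write succeeds only if the stored version still matches the version the writer read. The sketch below models that idea in memory. It is an illustration of the general technique, with hypothetical names (table, record, update, errConditionalCheckFailed), and makes no claim about the exact condition expression the client sends to DynamoDB.

```go
package main

import (
	"errors"
	"fmt"
)

// errConditionalCheckFailed is a local stand-in for ConditionalCheckFailedError.
var errConditionalCheckFailed = errors.New("conditional check failed")

// record is a stored item with a version counter.
type record struct {
	Data    string
	Version int
}

// table is a hypothetical in-memory replacement for a DynamoDB table.
type table struct {
	rows map[string]record
}

// update writes only if the stored version equals the version the caller read,
// then increments the version so that other stale writers are rejected.
func (t *table) update(id string, expectedVersion int, data string) error {
	cur, ok := t.rows[id]
	if !ok || cur.Version != expectedVersion {
		return errConditionalCheckFailed
	}
	t.rows[id] = record{Data: data, Version: cur.Version + 1}
	return nil
}

func main() {
	tbl := &table{rows: map[string]record{"A-101": {Data: "v1", Version: 1}}}
	// Two writers both read version 1; only the first write goes through.
	fmt.Println(tbl.update("A-101", 1, "from writer 1"))
	fmt.Println(tbl.update("A-101", 1, "from writer 2"))
}
```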
type Consumer ¶
type Consumer[T any] struct {
	// contains filtered or unexported fields
}
Consumer is a struct responsible for consuming messages from a DynamoDB-based queue. It supports generic message types and includes settings such as concurrency, polling intervals, and more. Note: To create a new instance of Consumer, it is necessary to use the NewConsumer function.
func NewConsumer ¶
func NewConsumer[T any](client Client[T], processor MessageProcessor[T], opts ...func(o *ConsumerOptions)) *Consumer[T]
NewConsumer creates a new Consumer instance with the specified client, message processor, and options. It configures the Consumer with default values which can be overridden by the provided option functions.
func (*Consumer[T]) Shutdown ¶
Shutdown gracefully shuts down the Consumer, stopping the message consumption and executing any registered shutdown callbacks.
func (*Consumer[T]) StartConsuming ¶ added in v0.9.0
StartConsuming starts the message consumption process, polling the queue for messages and processing them. The method handles message retrieval, processing, error handling, retries, and moving messages to the DLQ if necessary.
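The per-message outcome that StartConsuming has to choose between (finish, retry, or move to the DLQ) can be sketched as a small decision function. This is a guess at the shape of that logic for illustration only: the names action, decide, and errProcess are hypothetical, deleting a message after successful processing is an assumption, and so is the exact comparison between the receive count and MaximumReceives.

```go
package main

import (
	"errors"
	"fmt"
)

// action is a hypothetical label for what to do with a message after processing.
type action string

const (
	actionDelete    action = "delete"      // processing succeeded (deleting is an assumption)
	actionRetry     action = "retry"       // make the message visible again after the retry interval
	actionMoveToDLQ action = "move-to-dlq" // give up and isolate the message
)

// errProcess is a sample processing failure used by the demo below.
var errProcess = errors.New("processing failed")

// decide picks an action from the processing result and the delivery count.
// Retrying while receiveCount < maximumReceives is an assumed rule.
func decide(processErr error, receiveCount, maximumReceives int) action {
	if processErr == nil {
		return actionDelete
	}
	if receiveCount < maximumReceives {
		return actionRetry
	}
	return actionMoveToDLQ
}

func main() {
	fmt.Println(decide(nil, 1, 3))
	fmt.Println(decide(errProcess, 1, 3))
	fmt.Println(decide(errProcess, 3, 3))
}
```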
type ConsumerOptions ¶ added in v0.6.0
type ConsumerOptions struct {
	// PollingInterval specifies the time interval at which the Consumer polls the DynamoDB queue for new messages.
	PollingInterval time.Duration
	// Concurrency sets the number of concurrent message processing workers.
	Concurrency int
	// MaximumReceives defines the maximum number of times a message can be delivered.
	MaximumReceives int
	// VisibilityTimeout sets the duration (in seconds) a message remains invisible in the queue after being received.
	VisibilityTimeout int
	// RetryInterval defines the time interval (in seconds) before a failed message is retried.
	RetryInterval int
	// QueueType determines the type of queue (STANDARD or DLQ) the Consumer will operate on.
	QueueType QueueType
	// ErrorLog is an optional logger for errors. If nil, the standard logger is used.
	ErrorLog *log.Logger
	// OnShutdown is a slice of functions called when the Consumer is shutting down.
	OnShutdown []func()
}
ConsumerOptions contains configuration options for a Consumer instance. It allows customization of polling intervals, concurrency levels, visibility timeouts, and more.
The option functions that take a *ConsumerOptions, such as WithPollingInterval and WithConcurrency, set these fields.
type DeleteMessageInput ¶ added in v0.7.0
type DeleteMessageInput struct {
	// ID is the unique identifier of the message to be deleted from the queue.
	ID string
}
DeleteMessageInput represents the input parameters for deleting a specific message from a DynamoDB-based queue.
type DeleteMessageOutput ¶ added in v0.7.0
type DeleteMessageOutput struct{}
DeleteMessageOutput represents the result of the delete message operation. This struct is empty as the delete operation does not return any specific information.
type DynamoDBAPIError ¶
type DynamoDBAPIError struct {
Cause error
}
DynamoDBAPIError represents a generic error encountered when making a DynamoDB API call.
func (DynamoDBAPIError) Error ¶
func (e DynamoDBAPIError) Error() string
Error returns a detailed error message including the underlying cause for DynamoDBAPIError.
type EmptyQueueError ¶
type EmptyQueueError struct{}
EmptyQueueError represents an error when an operation cannot proceed due to an empty queue.
func (EmptyQueueError) Error ¶
func (e EmptyQueueError) Error() string
Error returns a standard error message for EmptyQueueError.
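Because EmptyQueueError is a plain struct type with a value receiver, callers can detect it with errors.As even when it has been wrapped. The sketch below uses a local stand-in type; emptyQueueError, receive, and isEmptyQueue are hypothetical names, and the message text is invented for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// emptyQueueError is a local stand-in for EmptyQueueError; the message text is hypothetical.
type emptyQueueError struct{}

func (e emptyQueueError) Error() string { return "queue is empty" }

// receive returns the first pending ID, or a wrapped emptyQueueError when none is left.
func receive(pending []string) (string, error) {
	if len(pending) == 0 {
		return "", fmt.Errorf("receive: %w", emptyQueueError{})
	}
	return pending[0], nil
}

// isEmptyQueue reports whether err is, or wraps, an emptyQueueError.
func isEmptyQueue(err error) bool {
	var target emptyQueueError
	return errors.As(err, &target)
}

func main() {
	_, err := receive(nil)
	if isEmptyQueue(err) {
		fmt.Println("nothing to do; wait for the next poll")
	}
}
```

Treating an empty queue as a normal condition rather than a failure keeps a polling loop from logging an error on every idle cycle.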
type GetDLQStatsInput ¶ added in v0.7.0
type GetDLQStatsInput struct{}
GetDLQStatsInput represents the input parameters for obtaining statistical information about a DynamoDB-based Dead Letter Queue (DLQ). This struct does not contain any fields as it's used to request general DLQ statistics without the need for specific parameters.
type GetDLQStatsOutput ¶ added in v0.7.0
type GetDLQStatsOutput struct {
	// First100IDsInQueue is an array of the first 100 message IDs currently in the DLQ.
	First100IDsInQueue []string `json:"first_100_IDs_in_queue"`
	// TotalMessagesInDLQ is the total number of messages present in the DLQ.
	TotalMessagesInDLQ int `json:"total_messages_in_DLQ"`
}
GetDLQStatsOutput represents the output containing statistical information about the Dead Letter Queue (DLQ).
type GetMessageInput ¶ added in v0.7.0
type GetMessageInput struct {
	// ID is the unique identifier of the message to be retrieved from the queue.
	ID string
}
GetMessageInput represents the input parameters for retrieving a specific message from a DynamoDB-based queue.
type GetMessageOutput ¶ added in v0.7.0
type GetMessageOutput[T any] struct {
	// Message is a pointer to the Message type containing information about the retrieved message.
	// The type T determines the format of the message content.
	Message *Message[T]
}
GetMessageOutput represents the result of the operation to retrieve a message. This struct uses the generic type T and contains information about the retrieved message.
type GetQueueStatsInput ¶ added in v0.7.0
type GetQueueStatsInput struct{}
GetQueueStatsInput represents the input parameters for obtaining statistical information about a DynamoDB-based queue. This struct does not contain any fields as it's used to request general queue statistics without the need for specific parameters.
type GetQueueStatsOutput ¶ added in v0.7.0
type GetQueueStatsOutput struct {
	// First100IDsInQueue is an array of the first 100 message IDs currently in the queue.
	First100IDsInQueue []string `json:"first_100_IDs_in_queue"`
	// First100IDsInQueueProcessing is an array of the first 100 message IDs that are currently being processed.
	First100IDsInQueueProcessing []string `json:"first_100_IDs_in_queue_processing"`
	// TotalMessagesInQueue is the total number of messages present in the queue.
	TotalMessagesInQueue int `json:"total_messages_in_queue"`
	// TotalMessagesInQueueProcessing is the total number of messages that are currently in the process of being handled.
	TotalMessagesInQueueProcessing int `json:"total_messages_in_queue_processing"`
	// TotalMessagesInQueueReady is the total number of messages in the queue that are ready to be processed and have not started processing yet.
	TotalMessagesInQueueReady int `json:"total_messages_in_queue_ready"`
}
GetQueueStatsOutput represents the output containing statistical information about a DynamoDB-based queue.
type IDDuplicatedError ¶
type IDDuplicatedError struct{}
IDDuplicatedError represents an error when a provided ID is duplicated in the system.
func (IDDuplicatedError) Error ¶
func (e IDDuplicatedError) Error() string
Error returns a standard error message for IDDuplicatedError.
type IDNotFoundError ¶
type IDNotFoundError struct{}
IDNotFoundError represents an error when a provided ID is not found in DynamoDB.
func (IDNotFoundError) Error ¶
func (e IDNotFoundError) Error() string
Error returns a standard error message for IDNotFoundError.
type IDNotProvidedError ¶
type IDNotProvidedError struct{}
IDNotProvidedError represents an error when an ID is not provided where it is required.
func (IDNotProvidedError) Error ¶
func (e IDNotProvidedError) Error() string
Error returns a standard error message for IDNotProvidedError.
type InvalidStateTransitionError ¶ added in v0.6.0
InvalidStateTransitionError represents an error for invalid state transitions during operations.
func (InvalidStateTransitionError) Error ¶ added in v0.6.0
func (e InvalidStateTransitionError) Error() string
Error returns a detailed error message explaining the invalid state transition.
type ListMessagesInput ¶ added in v0.7.0
type ListMessagesInput struct {
	// Size is the number of messages to be listed from the queue. It determines the maximum size of the returned message list.
	Size int32
}
ListMessagesInput represents the input parameters for listing messages from a DynamoDB-based queue.
type ListMessagesOutput ¶ added in v0.7.0
type ListMessagesOutput[T any] struct {
	// Messages is an array of pointers to Message types, containing information about each listed message.
	// The type T determines the format of the message content for each message in the array.
	Messages []*Message[T]
}
ListMessagesOutput represents the result of the operation to list messages from the queue. This struct uses the generic type T and contains an array of messages.
type MarshalingAttributeError ¶
type MarshalingAttributeError struct {
Cause error
}
MarshalingAttributeError represents an error during the marshaling of DynamoDB attributes.
func (MarshalingAttributeError) Error ¶
func (e MarshalingAttributeError) Error() string
Error returns a detailed error message including the underlying cause for MarshalingAttributeError.
type Message ¶
type Message[T any] struct {
	// ID is a unique identifier for the message.
	ID string `json:"id" dynamodbav:"id"`
	// Data is the content of the message. The type T defines the format of this data.
	Data T `json:"data" dynamodbav:"data"`
	// ReceiveCount is the number of times the message has been received from the queue.
	ReceiveCount int `json:"receive_count" dynamodbav:"receive_count"`
	// QueueType is the type of queue (standard or DLQ) to which the message belongs.
	QueueType QueueType `json:"queue_type" dynamodbav:"queue_type,omitempty"`
	// Version is the version number of the message, used for optimistic concurrency control.
	Version int `json:"version" dynamodbav:"version"`
	// CreatedAt is the timestamp when the message was created.
	CreatedAt string `json:"created_at" dynamodbav:"created_at"`
	// UpdatedAt is the timestamp when the message was last updated.
	UpdatedAt string `json:"updated_at" dynamodbav:"updated_at"`
	// SentAt is the timestamp when the message was sent to the queue.
	SentAt string `json:"sent_at" dynamodbav:"sent_at"`
	// ReceivedAt is the timestamp when the message was last received from the queue.
	ReceivedAt string `json:"received_at" dynamodbav:"received_at"`
	// InvisibleUntilAt: The deadline until which the message remains invisible in the queue.
	// Until this timestamp, the message will not be visible to other consumers.
	InvisibleUntilAt string `json:"invisible_until_at" dynamodbav:"invisible_until_at"`
}
Message represents a message structure in a DynamoDB-based queue system. It uses the generic type T for the message content, allowing for flexibility in the data type of the message payload. This struct includes tags for JSON serialization (`json:"..."`) and DynamoDB attribute value (`dynamodbav:"..."`) mappings.
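To illustrate what the json tags produce, the following sketch marshals a trimmed copy of the struct. Only a subset of the fields is reproduced, and the order payload type and the encode helper are hypothetical names introduced for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// message copies a subset of Message[T]'s fields and their json tags
// from the definition above. It is a local stand-in, not the package type.
type message[T any] struct {
	ID           string `json:"id"`
	Data         T      `json:"data"`
	ReceiveCount int    `json:"receive_count"`
	Version      int    `json:"version"`
	CreatedAt    string `json:"created_at"`
}

// order is a hypothetical payload type used for T.
type order struct {
	SKU string `json:"sku"`
	Qty int    `json:"qty"`
}

// encode marshals the message using the tag names declared on the struct.
func encode(m message[order]) (string, error) {
	b, err := json.Marshal(m)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	s, err := encode(message[order]{
		ID:        "A-101",
		Data:      order{SKU: "widget", Qty: 2},
		Version:   1,
		CreatedAt: "2024-01-01T12:00:00Z",
	})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(s)
}
```

The payload is nested under the data key, and its shape is determined entirely by the type chosen for T.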
func NewMessage ¶ added in v0.9.0
NewMessage creates a new instance of a Message with the provided data and initializes its timestamps. This function is a constructor for Message, setting initial values and preparing the message for use in the queue.
func (*Message[T]) GetStatus ¶ added in v0.11.0
GetStatus determines the current status of the message relative to the provided time. It returns either StatusReady or StatusProcessing.
It returns StatusReady if the message is ready to be processed, that is, if InvisibleUntilAt is empty or the provided time is after InvisibleUntilAt. It returns StatusProcessing if the provided time is before InvisibleUntilAt, which indicates that the message is currently being processed and is not yet available to other consumers.
type MessageProcessor ¶
type MessageProcessor[T any] interface {
	// Process handles the processing of a message.
	// It takes a pointer to a Message of type T and returns an error if the processing fails.
	Process(msg *Message[T]) error
}
MessageProcessor is an interface defining a method to process messages of a generic type T. It is used in the context of consuming messages from a DynamoDB-based queue.
type MessageProcessorFunc ¶ added in v0.10.1
MessageProcessorFunc is a functional type that implements the MessageProcessor interface. It allows using a function as a MessageProcessor.
func (MessageProcessorFunc[T]) Process ¶ added in v0.10.1
func (f MessageProcessorFunc[T]) Process(msg *Message[T]) error
Process calls the MessageProcessorFunc itself to process the message. It enables the function type to adhere to the MessageProcessor interface.
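This is the same adapter idea as http.HandlerFunc in the standard library. The sketch below redeclares trimmed versions of the types so that it runs on its own; the underlying function signature of MessageProcessorFunc is assumed to match Process, and handle is a hypothetical handler.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a trimmed, hypothetical stand-in for the package's Message[T].
type Message[T any] struct {
	ID   string
	Data T
}

// MessageProcessor mirrors the interface documented above.
type MessageProcessor[T any] interface {
	Process(msg *Message[T]) error
}

// MessageProcessorFunc is assumed to be a function type whose signature
// matches the Process method.
type MessageProcessorFunc[T any] func(msg *Message[T]) error

// Process calls f itself, which lets a plain function satisfy MessageProcessor.
func (f MessageProcessorFunc[T]) Process(msg *Message[T]) error {
	return f(msg)
}

// handle is an ordinary function with the right signature.
func handle(msg *Message[string]) error {
	if msg.Data == "" {
		return errors.New("empty payload")
	}
	fmt.Printf("processed %s: %s\n", msg.ID, msg.Data)
	return nil
}

func main() {
	// The conversion wraps handle so it can be used wherever the interface is expected.
	var p MessageProcessor[string] = MessageProcessorFunc[string](handle)
	if err := p.Process(&Message[string]{ID: "A-101", Data: "hello"}); err != nil {
		fmt.Println("error:", err)
	}
}
```

The adapter avoids declaring a dedicated struct type when the processing logic has no state of its own.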
type MoveMessageToDLQInput ¶ added in v0.7.0
type MoveMessageToDLQInput struct {
	// ID is the unique identifier of the message to be moved to the DLQ.
	ID string
}
MoveMessageToDLQInput represents the input parameters for moving a specific message from a DynamoDB-based queue to a Dead Letter Queue (DLQ).
type MoveMessageToDLQOutput ¶ added in v0.7.0
type MoveMessageToDLQOutput[T any] struct {
	// MovedMessage is a pointer to the Message type containing information about the moved message.
	// The type T determines the format of the message content.
	MovedMessage *Message[T]
}
MoveMessageToDLQOutput represents the result of the operation to move a message to the DLQ. This struct uses the generic type T and contains information about the message that has been moved.
type ProduceInput ¶ added in v0.9.0
type ProduceInput[T any] struct {
	// Data is the content of the message to be produced. The type T allows for flexibility in the data type of the message payload.
	Data T
	// DelaySeconds is the delay time (in seconds) before the message is sent to the queue.
	DelaySeconds int
}
ProduceInput represents the input parameters for producing a message.
type ProduceOutput ¶ added in v0.9.0
type ProduceOutput[T any] struct {
	// Message is a pointer to the Message type containing information about the produced message.
	Message *Message[T]
}
ProduceOutput represents the result of the produce operation.
type Producer ¶ added in v0.9.0
type Producer[T any] struct {
	// contains filtered or unexported fields
}
Producer is a generic struct responsible for producing messages of any type T to a DynamoDB-based queue.
func NewProducer ¶ added in v0.9.0
func NewProducer[T any](client Client[T], opts ...func(o *ProducerOptions)) *Producer[T]
NewProducer creates a new instance of a Producer, which is used to produce messages to a DynamoDB-based queue. The Producer can be configured with various options, such as a custom ID generator.
func (*Producer[T]) Produce ¶ added in v0.9.0
func (c *Producer[T]) Produce(ctx context.Context, params *ProduceInput[T]) (*ProduceOutput[T], error)
Produce sends a message to the queue using the provided input parameters. It generates a unique ID for the message using the Producer's ID generator and delegates to the Client's SendMessage method. An error is returned if the SendMessage operation fails.
type ProducerOptions ¶ added in v0.10.1
type ProducerOptions struct {
	// IDGenerator is a function that generates a unique identifier for each message produced by the Producer.
	// The default ID generator is uuid.NewString.
	IDGenerator func() string
}
ProducerOptions holds configuration options for a Producer.
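Options of this shape follow the functional options pattern: each option is a function that mutates the options struct, and the constructor applies them over a set of defaults. The sketch below shows the mechanics. The WithIDGenerator signature matches the package index, but its body, the resolveOptions helper, and the placeholder default generator are assumptions.

```go
package main

import "fmt"

// ProducerOptions mirrors the struct documented above.
type ProducerOptions struct {
	IDGenerator func() string
}

// WithIDGenerator has the signature listed in the package index.
// The body is an assumption about how such an option is typically written.
func WithIDGenerator(idGenerator func() string) func(o *ProducerOptions) {
	return func(o *ProducerOptions) {
		o.IDGenerator = idGenerator
	}
}

// resolveOptions is a hypothetical helper: start from defaults, then apply
// each option in the order given, so later options win.
func resolveOptions(opts ...func(o *ProducerOptions)) *ProducerOptions {
	o := &ProducerOptions{
		// Placeholder default for this sketch; the package documents uuid.NewString.
		IDGenerator: func() string { return "default-id" },
	}
	for _, opt := range opts {
		opt(o)
	}
	return o
}

func main() {
	defaults := resolveOptions()
	custom := resolveOptions(WithIDGenerator(func() string { return "custom-id" }))
	fmt.Println(defaults.IDGenerator(), custom.IDGenerator())
}
```

Swapping in a deterministic generator this way is mainly useful in tests, where predictable message IDs simplify assertions.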
type QueueType ¶
type QueueType string
QueueType represents the type of queue in a DynamoDB-based messaging system.
type ReceiveMessageInput ¶ added in v0.7.0
type ReceiveMessageInput struct {
	// QueueType is the type of queue from which the message is to be retrieved. QueueType specifies the kind of queue, such as STANDARD or DLQ.
	QueueType QueueType
	// VisibilityTimeout is the timeout in seconds during which the message becomes invisible to other receivers.
	VisibilityTimeout int
}
ReceiveMessageInput represents the input parameters for receiving a message from a DynamoDB-based queue.
type ReceiveMessageOutput ¶ added in v0.7.0
type ReceiveMessageOutput[T any] struct {
	// ReceivedMessage is a pointer to the Message type containing information about the received message.
	// The type T determines the format of the message content.
	ReceivedMessage *Message[T]
}
ReceiveMessageOutput represents the result of a message receiving operation. This struct uses the generic type T and contains information about the received message.
type RedriveMessageInput ¶ added in v0.7.0
type RedriveMessageInput struct {
	// ID is the unique identifier of the message to be redriven from the DLQ.
	ID string
}
RedriveMessageInput represents the input parameters for restoring a specific message from a DynamoDB-based Dead Letter Queue (DLQ) back to the STANDARD queue.
type RedriveMessageOutput ¶ added in v0.7.0
type RedriveMessageOutput[T any] struct {
	// RedroveMessage is a pointer to the Message type containing information about the redriven message.
	// The type T determines the format of the message content.
	RedroveMessage *Message[T]
}
RedriveMessageOutput represents the result of the operation to redrive a message from the DLQ. This struct uses the generic type T and contains information about the message that has been restored.
type ReplaceMessageInput ¶ added in v0.7.0
type ReplaceMessageInput[T any] struct {
	// Message is a pointer to the Message type containing the new message data that will replace the existing message in the queue.
	// The type T determines the format of the new message content.
	Message *Message[T]
}
ReplaceMessageInput represents the input parameters for replacing a specific message in a DynamoDB-based queue. This struct uses the generic type T for the message content.
type ReplaceMessageOutput ¶ added in v0.7.0
type ReplaceMessageOutput struct { }
ReplaceMessageOutput represents the result of the operation to replace a message in the queue. This struct is empty as the replace message operation does not return any specific information.
type SendMessageInput ¶ added in v0.7.0
type SendMessageInput[T any] struct {
	// ID is a unique identifier for the message.
	ID string
	// Data is the content of the message to be sent to the queue. The type T determines the format of the message.
	Data T
	// DelaySeconds is the delay time (in seconds) before the message is sent to the queue.
	DelaySeconds int
}
SendMessageInput represents the input parameters for sending a message to a DynamoDB-based queue. This struct uses the generic type T, supporting messages of various data types.
type SendMessageOutput ¶ added in v0.7.0
type SendMessageOutput[T any] struct {
	// SentMessage is a pointer to the Message type containing information about the sent message.
	SentMessage *Message[T]
}
SendMessageOutput represents the result of a message sending operation. This struct also uses the generic type T and contains information about the sent message.
type Status ¶
type Status string
Status represents the state of a message in a DynamoDB-based queue.
type UnmarshalingAttributeError ¶
type UnmarshalingAttributeError struct {
Cause error
}
UnmarshalingAttributeError represents an error during the unmarshaling of DynamoDB attributes.
func (UnmarshalingAttributeError) Error ¶
func (e UnmarshalingAttributeError) Error() string
Error returns a detailed error message including the underlying cause for UnmarshalingAttributeError.