Documentation ¶
Overview ¶
Package outbox implements the transactional outbox pattern, ensuring reliable message publishing by first persisting messages within a database transaction before they are sent to a message broker.
The core of the pattern involves two main operations:
- Writing: Messages are stored durably in an "outbox" table as part of the application's local database transaction. This ensures that the message is captured atomically with the business operation that produces it.
- Reading & Publishing: A background process reads unpublished messages from the outbox table and publishes them to the message broker. Once successfully published, messages are removed from the outbox table.
This package provides the following components to integrate this pattern:
- A `Writer` to facilitate the atomic storage of messages into the outbox table alongside your application's domain changes within a single transaction.
- A `Reader` background process to poll the outbox table for unpublished messages, attempt to publish them to a message broker, and remove them upon success.
The library is designed to be agnostic to specific database or message broker technologies, allowing integration with various systems. For detailed setup, features, and examples, please refer to the project README.
Index ¶
- type DB
- type DBContext
- type DelayFunc
- type DeleteError
- type Message
- type MessageOption
- type MessagePublisher
- type PublishError
- type Queryer
- type ReadError
- type Reader
- type ReaderOption
- func WithDelay(delayFunc DelayFunc) ReaderOption
- func WithDeleteBatchSize(size int) ReaderOption
- func WithDeleteTimeout(timeout time.Duration) ReaderOption
- func WithDiscardedMessagesChannelSize(size int) ReaderOption
- func WithErrorChannelSize(size int) ReaderOption
- func WithExponentialDelay(initialDelay time.Duration, maxDelay time.Duration) ReaderOption
- func WithFixedDelay(delay time.Duration) ReaderOption
- func WithInterval(interval time.Duration) ReaderOption
- func WithMaxAttempts(maxAttempts int32) ReaderOption
- func WithPublishTimeout(timeout time.Duration) ReaderOption
- func WithReadBatchSize(batchSize int) ReaderOption
- func WithReadTimeout(timeout time.Duration) ReaderOption
- func WithUpdateTimeout(timeout time.Duration) ReaderOption
- type SQLDialect
- type Tx
- type TxQueryer
- type TxWorkFunc
- type UpdateError
- type Writer
- type WriterOption
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DBContext ¶
type DBContext struct {
// contains filtered or unexported fields
}
DBContext holds the database connection and the SQL dialect.
func NewDBContext ¶
func NewDBContext(db *sql.DB, dialect SQLDialect) *DBContext
NewDBContext creates a new DBContext from a standard *sql.DB.
func NewDBContextWithDB ¶
func NewDBContextWithDB(db DB, dialect SQLDialect) *DBContext
NewDBContextWithDB creates a new DBContext with a custom DB implementation. This is useful for users who want to provide their own database abstraction or for testing.
type DelayFunc ¶
DelayFunc is a function that returns the delay after a given attempt.
func Exponential ¶
Exponential returns a DelayFunc whose delay doubles after every attempt. The delay after attempt n is initialDelay * 2^n, capped at maxDelay.
For example, with an initialDelay of 200 milliseconds and a maxDelay of 1 hour:
Delay after attempt 0: 200ms
Delay after attempt 1: 400ms
Delay after attempt 2: 800ms
Delay after attempt 3: 1.6s
Delay after attempt 4: 3.2s
Delay after attempt 5: 6.4s
Delay after attempt 6: 12.8s
Delay after attempt 7: 25.6s
Delay after attempt 8: 51.2s
Delay after attempt 9: 1m42.4s
Delay after attempt 10: 3m24.8s
Delay after attempt 11: 6m49.6s
Delay after attempt 12: 13m39.2s
Delay after attempt 13: 27m18.4s
Delay after attempt 14: 54m36.8s
Delay after attempt 15: 1h0m0s
Delay after attempt 16: 1h0m0s
...
type DeleteError ¶
DeleteError indicates an error during the batch deletion of messages. It includes the messages that failed to be deleted and the original error.
func (*DeleteError) Error ¶
func (e *DeleteError) Error() string
func (*DeleteError) Unwrap ¶
func (e *DeleteError) Unwrap() error
type Message ¶
type Message struct {
	// ID is a unique identifier for the message
	ID uuid.UUID
	// CreatedAt is the timestamp when the message was created
	CreatedAt time.Time
	// ScheduledAt is the timestamp when the message should be published
	ScheduledAt time.Time
	// Metadata is an optional field containing additional information about the message,
	// such as correlation IDs, trace IDs, user context, or other custom attributes.
	// This data is typically JSON-serialized and can be used for tracing, debugging, or routing purposes.
	// Most message brokers support attaching such metadata as message headers, enabling richer message processing and observability.
	Metadata []byte
	// Payload contains the actual message data, typically JSON serialized
	Payload []byte
	// TimesAttempted is the number of times the message has been attempted to be published
	// Read only field
	TimesAttempted int32
}
Message represents a message to be published through the outbox pattern. It contains all the information needed to process and publish the message to an external system (like a message broker).
func NewMessage ¶
func NewMessage(payload []byte, opts ...MessageOption) *Message
NewMessage creates a new Message with the given payload.
type MessageOption ¶
type MessageOption func(*Message)
MessageOption is a function that can be used to configure a Message.
func WithCreatedAt ¶
func WithCreatedAt(createdAt time.Time) MessageOption
WithCreatedAt sets the time the message was created. If not provided, the current time will be used.
func WithID ¶
func WithID(id uuid.UUID) MessageOption
WithID sets the unique identifier of the message. If not provided, a new UUID will be generated.
func WithMetadata ¶
func WithMetadata(metadata []byte) MessageOption
WithMetadata attaches message metadata (e.g. correlation ID, trace ID, etc).
func WithScheduledAt ¶
func WithScheduledAt(scheduledAt time.Time) MessageOption
WithScheduledAt sets the time the message should be published. If not provided, the current time will be used.
type MessagePublisher ¶
type MessagePublisher interface {
	// Publish sends a message to an external system (e.g., a message broker).
	// This function may be called multiple times for the same message.
	// Consumers must be idempotent and handle duplicate messages,
	// though some brokers provide deduplication features.
	// Return nil on success.
	// Return error on failure. In this case:
	// - The message will be retried according to the configured retry and backoff settings
	// - or will be discarded if the maximum number of attempts is reached.
	Publish(ctx context.Context, msg *Message) error
}
MessagePublisher defines an interface for publishing messages to an external system.
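Because Publish may be called more than once for the same message, the receiving side has to tolerate duplicates. One common way to do that is to deduplicate on the message ID. The sketch below is illustrative only: `dedupConsumer` and its string IDs are hypothetical, and the in-memory set stands in for whatever durable store a real consumer would use.

```go
package main

import (
	"fmt"
	"sync"
)

// dedupConsumer is a hypothetical consumer that handles each message ID at most once.
// The seen set lives in memory here; a real consumer would persist it.
type dedupConsumer struct {
	mu      sync.Mutex
	seen    map[string]struct{}
	handled []string
}

func newDedupConsumer() *dedupConsumer {
	return &dedupConsumer{seen: make(map[string]struct{})}
}

// Handle processes the payload unless the ID was already handled.
// It reports whether any work was done.
func (c *dedupConsumer) Handle(id string, payload []byte) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if _, duplicate := c.seen[id]; duplicate {
		return false
	}
	c.seen[id] = struct{}{}
	c.handled = append(c.handled, string(payload))
	return true
}

func main() {
	c := newDedupConsumer()
	fmt.Println(c.Handle("msg-1", []byte("order created"))) // first delivery
	fmt.Println(c.Handle("msg-1", []byte("order created"))) // redelivery of the same message
	fmt.Println(len(c.handled))
}
```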
type PublishError ¶
PublishError indicates an error during message publication. It includes the message that failed to be published and the original error.
func (*PublishError) Error ¶
func (e *PublishError) Error() string
func (*PublishError) Unwrap ¶
func (e *PublishError) Unwrap() error
type Queryer ¶
type Queryer interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
}
Queryer represents a query executor.
type ReadError ¶
type ReadError struct {
Err error
}
ReadError indicates an error when reading messages from the outbox.
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Reader periodically reads unpublished messages from the outbox table and attempts to publish them to an external system.
func NewReader ¶
func NewReader(dbCtx *DBContext, msgPublisher MessagePublisher, opts ...ReaderOption) *Reader
NewReader creates a new outbox Reader with the given database context, message publisher, and options.
func (*Reader) DiscardedMessages ¶
DiscardedMessages returns a channel that receives messages that were discarded because they reached the maximum number of attempts. The channel is closed when the reader is stopped.
Consumers should drain this channel promptly to avoid missing messages.
func (*Reader) Errors ¶
Errors returns a channel that receives errors from the outbox reader. The channel is buffered to prevent blocking the reader. If the buffer becomes full, subsequent errors will be dropped to maintain reader throughput. The channel is closed when the reader is stopped.
The returned error will be one of the following types, which can be checked using a type switch:
- *PublishError: Failed to publish a message. Contains the message.
- *UpdateError: Failed to update a message after a failed publish attempt. Contains the message.
- *DeleteError: Failed to delete a batch of messages. Contains the messages.
- *ReadError: Failed to read messages from the outbox.
Example of error handling:
for err := range r.Errors() {
	switch e := err.(type) {
	case *outbox.PublishError:
		log.Printf("Failed to publish message | ID: %s | Error: %v", e.Message.ID, e.Err)
	case *outbox.UpdateError:
		log.Printf("Failed to update message | ID: %s | Error: %v", e.Message.ID, e.Err)
	case *outbox.DeleteError:
		log.Printf("Batch message deletion failed | Count: %d | Error: %v", len(e.Messages), e.Err)
		for _, msg := range e.Messages {
			log.Printf("Failed to delete message | ID: %s", msg.ID)
		}
	case *outbox.ReadError:
		log.Printf("Failed to read outbox messages | Error: %v", e.Err)
	default:
		log.Printf("Unexpected error occurred | Error: %v", e)
	}
}
func (*Reader) Start ¶
func (r *Reader) Start()
Start begins the background processing of outbox messages. It periodically reads unpublished messages and attempts to publish them. If Start is called multiple times, only the first call has an effect.
func (*Reader) Stop ¶
Stop gracefully shuts down the outbox reader processing. It prevents new reader cycles from starting and waits for any ongoing message publishing to complete. The provided context controls how long to wait for graceful shutdown before giving up.
If the context expires before processing completes, Stop returns the context's error. If shutdown completes successfully, it returns nil. Calling Stop multiple times is safe and only the first call has an effect.
type ReaderOption ¶
type ReaderOption func(*Reader)
ReaderOption is a function that configures a Reader instance.
func WithDelay ¶
func WithDelay(delayFunc DelayFunc) ReaderOption
WithDelay sets the delay function to apply between attempts to publish a message. Default is an exponential delay with an initialDelay of 200 milliseconds and a maxDelay of 1 hour.
func WithDeleteBatchSize ¶
func WithDeleteBatchSize(size int) ReaderOption
WithDeleteBatchSize sets the number of successfully published messages to accumulate before executing a batch delete operation from the outbox table.
The reader processes messages sequentially: for each message, it attempts to publish and then accumulates successfully published messages for deletion. When the batch reaches the specified size, all messages in the batch are deleted in a single database operation.
Performance considerations:
- Larger batch sizes reduce database round trips but increase memory usage
- Smaller batch sizes provide faster cleanup but more frequent database operations
- A batch size of 1 deletes each message immediately after successful publication
Behavior:
- Only successfully published messages are added to the delete batch
- Failed publications do not affect the batch; those messages remain in the outbox
- At the end of each processing cycle, any remaining messages in the batch are deleted regardless of batch size to prevent reprocessing
- If fewer messages exist than the batch size, they are still deleted in one operation
Default is 20. Size must be positive.
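The batching behavior described above can be sketched as a single loop. This is an illustration under stated assumptions rather than the Reader's actual code: `processCycle`, the string IDs, and the `publish` and `deleteBatch` callbacks are hypothetical stand-ins for the publisher and the database delete.

```go
package main

import "fmt"

// processCycle is a hypothetical sketch of one processing cycle.
// publish reports whether a message was published; deleteBatch removes a batch of IDs.
func processCycle(ids []string, batchSize int, publish func(string) bool, deleteBatch func([]string)) {
	if batchSize < 1 {
		batchSize = 1 // the documented option requires a positive size
	}
	batch := make([]string, 0, batchSize)
	for _, id := range ids {
		if !publish(id) {
			continue // failed messages stay in the outbox and never enter the batch
		}
		batch = append(batch, id)
		if len(batch) == batchSize {
			deleteBatch(batch)
			batch = make([]string, 0, batchSize)
		}
	}
	if len(batch) > 0 {
		// End of cycle: flush the remainder even though the batch is not full.
		deleteBatch(batch)
	}
}

func main() {
	ids := []string{"a", "b", "c", "d", "e", "f"}
	publish := func(id string) bool { return id != "c" } // "c" fails to publish
	var deletes [][]string
	processCycle(ids, 2, publish, func(b []string) { deletes = append(deletes, b) })
	fmt.Println(deletes) // [[a b] [d e] [f]]
}
```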
func WithDeleteTimeout ¶
func WithDeleteTimeout(timeout time.Duration) ReaderOption
WithDeleteTimeout sets the timeout for deleting messages from the outbox. Default is 5 seconds.
func WithDiscardedMessagesChannelSize ¶
func WithDiscardedMessagesChannelSize(size int) ReaderOption
WithDiscardedMessagesChannelSize sets the size of the discarded messages channel. Default is 128. Size must be positive.
func WithErrorChannelSize ¶
func WithErrorChannelSize(size int) ReaderOption
WithErrorChannelSize sets the size of the error channel. Default is 128. Size must be positive.
func WithExponentialDelay ¶
func WithExponentialDelay(initialDelay time.Duration, maxDelay time.Duration) ReaderOption
WithExponentialDelay sets the delay between attempts to publish a message to be exponential. The delay after attempt n is initialDelay * 2^n, capped at maxDelay.
For example, with an initialDelay of 200 milliseconds and a maxDelay of 1 hour:
Delay after attempt 0: 200ms
Delay after attempt 1: 400ms
Delay after attempt 2: 800ms
Delay after attempt 3: 1.6s
Delay after attempt 4: 3.2s
Delay after attempt 5: 6.4s
Delay after attempt 6: 12.8s
Delay after attempt 7: 25.6s
Delay after attempt 8: 51.2s
Delay after attempt 9: 1m42.4s
Delay after attempt 10: 3m24.8s
Delay after attempt 11: 6m49.6s
Delay after attempt 12: 13m39.2s
Delay after attempt 13: 27m18.4s
Delay after attempt 14: 54m36.8s
Delay after attempt 15: 1h0m0s
Delay after attempt 16: 1h0m0s
...
func WithFixedDelay ¶
func WithFixedDelay(delay time.Duration) ReaderOption
WithFixedDelay sets the delay between attempts to publish a message to be fixed. The delay is the same for all attempts.
For example, with a delay of 200 milliseconds:
Delay after attempt 0: 200ms
Delay after attempt 1: 200ms
...
func WithInterval ¶
func WithInterval(interval time.Duration) ReaderOption
WithInterval sets the time between outbox reader processing attempts. Default is 10 seconds.
func WithMaxAttempts ¶
func WithMaxAttempts(maxAttempts int32) ReaderOption
WithMaxAttempts sets the maximum number of attempts to publish a message. A message is discarded once it reaches the maximum number of attempts. Use the Reader's `DiscardedMessages` method to obtain a channel that is notified of every discarded message. Default is math.MaxInt32. Must be positive.
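The interaction between the attempt counter and the discard channel can be pictured as follows. This is a rough sketch built on an assumption the documentation does not spell out: that a message is discarded as soon as its attempt count equals maxAttempts after a failed publish. The `message` type and `recordFailure` helper are hypothetical.

```go
package main

import "fmt"

// message is a local stand-in carrying only the fields this sketch needs.
type message struct {
	ID             string
	TimesAttempted int32
}

// recordFailure is a hypothetical helper: it counts a failed publish attempt and
// reports whether the message has exhausted its attempts.
// Assumption: discard happens when TimesAttempted reaches maxAttempts.
func recordFailure(m *message, maxAttempts int32) (discard bool) {
	m.TimesAttempted++
	return m.TimesAttempted >= maxAttempts
}

func main() {
	const maxAttempts = 3
	discarded := make(chan message, 1)

	m := &message{ID: "msg-1"}
	for {
		// Every publish attempt fails in this sketch.
		if recordFailure(m, maxAttempts) {
			discarded <- *m
			break
		}
	}
	close(discarded)

	for d := range discarded {
		fmt.Printf("discarded %s after %d attempts\n", d.ID, d.TimesAttempted)
	}
}
```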
func WithPublishTimeout ¶
func WithPublishTimeout(timeout time.Duration) ReaderOption
WithPublishTimeout sets the timeout for publishing messages to the external system. Default is 5 seconds.
func WithReadBatchSize ¶
func WithReadBatchSize(batchSize int) ReaderOption
WithReadBatchSize sets the maximum number of messages to process in a single batch. Default is 100 messages. Must be positive.
func WithReadTimeout ¶
func WithReadTimeout(timeout time.Duration) ReaderOption
WithReadTimeout sets the timeout for reading messages from the outbox. Default is 5 seconds.
func WithUpdateTimeout ¶
func WithUpdateTimeout(timeout time.Duration) ReaderOption
WithUpdateTimeout sets the timeout for updating a message in the outbox table. Default is 5 seconds.
type SQLDialect ¶
type SQLDialect string
SQLDialect represents a SQL database dialect.
const (
	SQLDialectPostgres  SQLDialect = "postgres"
	SQLDialectMySQL     SQLDialect = "mysql"
	SQLDialectMariaDB   SQLDialect = "mariadb"
	SQLDialectSQLite    SQLDialect = "sqlite"
	SQLDialectOracle    SQLDialect = "oracle"
	SQLDialectSQLServer SQLDialect = "sqlserver"
)
Supported database dialects.
type TxQueryer ¶
type TxQueryer interface { Queryer }
TxQueryer represents a query executor inside a transaction.
type TxWorkFunc ¶
TxWorkFunc is a user-supplied callback that receives a TxQueryer and uses it to execute user-defined queries within the same transaction that stores a message in the outbox. The Writer itself commits or rolls back the transaction once the callback and the outbox insert complete.
type UpdateError ¶
UpdateError indicates an error when updating a message in the outbox. It includes the message that failed to be updated and the original error.
func (*UpdateError) Error ¶
func (e *UpdateError) Error() string
func (*UpdateError) Unwrap ¶
func (e *UpdateError) Unwrap() error
type Writer ¶
type Writer struct {
// contains filtered or unexported fields
}
Writer handles storing messages in the outbox table as part of user-defined queries within a database transaction. It optionally supports optimistic publishing, which attempts to publish messages immediately after transaction commit.
func NewWriter ¶
func NewWriter(dbCtx *DBContext, opts ...WriterOption) *Writer
NewWriter creates a new outbox Writer with the given database context and options.
func (*Writer) Write ¶
Write stores a message in the outbox table as part of a transaction, and executes the provided callback within the same transaction. This ensures that if the callback succeeds but storing the message fails, the entire transaction is rolled back.
If optimistic publishing is enabled, the message will also be published to the external system asynchronously, after the transaction is committed.
type WriterOption ¶
type WriterOption func(*Writer)
WriterOption is a function that configures a Writer instance.
func WithOptimisticPublisher ¶
func WithOptimisticPublisher(msgPublisher MessagePublisher) WriterOption
WithOptimisticPublisher configures the Writer to attempt immediate publishing of messages after the transaction is committed. This can improve performance by reducing the delay between transaction commit and message publishing, while still ensuring consistency if publishing fails.
Note: the optimistic path is only an efficiency optimization, not something the system depends on for correctness. If the message is not published optimistically, the reader will publish it later. Because both paths can act on the same message, duplicate deliveries may occur (e.g. when the reader wakes up just after the message is committed).
func WithOptimisticTimeout ¶
func WithOptimisticTimeout(timeout time.Duration) WriterOption
WithOptimisticTimeout sets the timeout for optimistic publishing and deleting messages. Default is 10 seconds.