outbox

Version: v0.14.0
Published: Jun 10, 2025 License: MIT Imports: 9 Imported by: 0

README

Lightweight library for the transactional outbox pattern in Go, not tied to any specific relational database or broker.

Key Features

  • Minimal External Dependencies: The only dependency imposed on users of this library is google/uuid; no specific database driver or broker client (Kafka, MySQL, etc.) is pulled in.
  • Database Agnostic: Designed to work with PostgreSQL, MySQL, Oracle and other relational databases.
  • Message Broker Agnostic: Integrates seamlessly with popular brokers like Kafka, NATS, RabbitMQ, and others.
  • Optimistic Publishing: Optional immediate async message publishing after transaction commit for reduced latency, with guaranteed delivery fallback.
  • Configurable Retry & Backoff Policies: Fixed or exponential back-off with adjustable initial and maximum delay.
  • Max Attempts Safeguard: Automatically discards messages that exceed a configurable maxAttempts threshold, enabling dead-letter routing or alerting on poison messages.
  • Observability: Exposes channels for processing errors and discarded messages, making it easy to integrate with your metrics and alerting systems.
  • Simplicity: Minimal, easy-to-understand codebase focused on core outbox pattern concepts.
  • Extensible: Designed for easy customization and integration into your own projects.

Usage

The library consists of two main components:

  1. Writer: Stores your entity and corresponding message atomically within a transaction
  2. Reader: Publishes stored messages to your message broker in the background

The Writer

The Writer ensures your entity and outbox message are stored together atomically:

// Setup database connection
db, _ := sql.Open("pgx", "postgres://user:password@localhost:5432/outbox?sslmode=disable")

// Create a DBContext and Writer instance
dbCtx := outbox.NewDBContext(db, outbox.SQLDialectPostgres)
writer := outbox.NewWriter(dbCtx)

// In your business logic:
//
// Create your entity and outbox message
entity := Entity{
    ID:        uuid.New(),
    CreatedAt: time.Now().UTC(),
}

payload, _ := json.Marshal(entity)
metadata := json.RawMessage(`{"trace_id":"abc123","correlation_id":"xyz789"}`)
msg := outbox.NewMessage(payload,
    outbox.WithCreatedAt(entity.CreatedAt),
    outbox.WithMetadata(metadata))

// Write message and entity in a single transaction
err := writer.Write(ctx, msg,
    // This user-defined callback executes queries within the
    // same transaction that stores the outbox message
    func(ctx context.Context, txQueryer outbox.TxQueryer) error {
        _, err := txQueryer.ExecContext(ctx,
            "INSERT INTO entity (id, created_at) VALUES ($1, $2)",
            entity.ID, entity.CreatedAt,
        )
        return err
    })
🚀 Optimistic Publishing (Optional)

Optimistic publishing attempts to publish messages immediately after transaction commit, reducing latency while maintaining guaranteed delivery through the background reader as fallback.

How It Works
  1. Transaction commits (entity + outbox message stored)
  2. Immediate publish attempt to broker (asynchronously, will not block the incoming request)
  3. On success: message is removed from outbox
  4. On failure: background reader handles delivery later
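
The four steps above can be pictured with a small in-memory sketch. It does not use the library: `store`, `writeOptimistic`, and `errBrokerDown` are hypothetical stand-ins for the outbox table, the Writer, and a broker failure, and the real implementation may differ in its details.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// message is a local stand-in for an outbox message.
type message struct {
	ID string
}

// errBrokerDown simulates a failed publish attempt.
var errBrokerDown = errors.New("broker unavailable")

// store is an in-memory stand-in for the outbox table.
type store struct {
	mu   sync.Mutex
	rows map[string]message
}

func newStore() *store { return &store{rows: make(map[string]message)} }

func (s *store) insert(m message) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.rows[m.ID] = m
}

func (s *store) remove(id string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.rows, id)
}

func (s *store) pending() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.rows)
}

// writeOptimistic stores the message (step 1), then publishes it from a
// goroutine so the caller is not blocked (step 2). The returned WaitGroup
// exists only so this demo can wait for the attempt to finish.
func writeOptimistic(s *store, m message, publish func(message) error) *sync.WaitGroup {
	s.insert(m)
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := publish(m); err != nil {
			return // step 4: the row stays for the background reader
		}
		s.remove(m.ID) // step 3: published, so drop the row
	}()
	return wg
}

func main() {
	s := newStore()
	writeOptimistic(s, message{ID: "a"}, func(message) error { return nil }).Wait()
	writeOptimistic(s, message{ID: "b"}, func(message) error { return errBrokerDown }).Wait()
	fmt.Println("rows left for the reader:", s.pending())
}
```

Only the message whose publish attempt failed is still in the store afterwards, which is the row the background reader would pick up later.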
Configuration
// Create publisher (see Reader section below)
publisher := &messagePublisher{}

// Enable optimistic publishing in writer
writer := outbox.NewWriter(dbCtx, outbox.WithOptimisticPublisher(publisher))

Important considerations:

  • Publishing happens asynchronously after transaction commit
  • Message consumers must be idempotent as messages could be published twice - by the optimistic publisher and the reader (Note: consumer idempotency is a good practice regardless of optimistic publishing, though some brokers also provide deduplication features)
  • Publishing failures don't affect your transactions - they don't cause Write() to fail

The Reader

The Reader periodically checks for unsent messages and publishes them to your message broker:

// Create a message publisher implementation
type messagePublisher struct {
    // Your message broker client (e.g., Kafka, RabbitMQ)
}
func (p *messagePublisher) Publish(ctx context.Context, msg *outbox.Message) error {
    // Publish the message to your broker. See examples below for specific implementations
    return nil
}

// Create and start the reader
reader := outbox.NewReader(
    dbCtx,                              // Database context
    &messagePublisher{},                // Publisher implementation
    outbox.WithInterval(5*time.Second), // Polling interval (default: 10s)
    outbox.WithReadBatchSize(200),      // Read batch size (default: 100)
    outbox.WithDeleteBatchSize(50),     // Delete batch size (default: 20)
    outbox.WithMaxAttempts(300),        // Discard after 300 attempts (default: MaxInt32)
    outbox.WithExponentialDelay(        // Delay between attempts (default: Exponential; can also use Fixed or Custom)
        500*time.Millisecond,           // Initial delay (default: 200ms)
        30*time.Minute),                // Maximum delay (default: 1h)
)
reader.Start()
defer reader.Stop(context.Background()) // Stop during application shutdown

// Monitor standard processing errors (publish / update / delete / read).
go func() {
    for err := range reader.Errors() {
        switch e := err.(type) {
        case *outbox.PublishError:
            log.Printf("Failed to publish message | ID: %s | Error: %v",
                e.Message.ID, e.Err)

        case *outbox.UpdateError:
            log.Printf("Failed to update message | ID: %s | Error: %v",
                e.Message.ID, e.Err)

        case *outbox.DeleteError:
            log.Printf("Batch message deletion failed | Count: %d | Error: %v",
                len(e.Messages), e.Err)
            for _, msg := range e.Messages {
                log.Printf("Failed to delete message | ID: %s", msg.ID)
            }

        case *outbox.ReadError:
            log.Printf("Failed to read outbox messages | Error: %v", e.Err)

        default:
            log.Printf("Unexpected error occurred | Error: %v", e)
        }
    }
}()

// Monitor discarded messages (hit the max-attempts threshold).
go func() {
    for msg := range reader.DiscardedMessages() {
        log.Printf("outbox message %s discarded after %d attempts",
            msg.ID, msg.TimesAttempted)
        // Example next steps:
        //   • forward to a dead-letter topic
        //   • raise an alert / metric
        //   • persist for manual inspection
    }
}()

Database Setup

1. Choose Your Database Dialect

The library supports multiple relational databases. Configure the appropriate SQLDialect when creating the DBContext. Supported dialects are PostgreSQL, MySQL, MariaDB, SQLite, Oracle and SQL Server.

// Example creating a DBContext with MySQL dialect
dbCtx := outbox.NewDBContext(db, outbox.SQLDialectMySQL)
2. Create the Outbox Table

The outbox table stores messages that need to be published to your message broker. Choose your database below:

🐘 PostgreSQL
CREATE TABLE IF NOT EXISTS outbox (
    id UUID PRIMARY KEY,
    created_at TIMESTAMP WITH TIME ZONE NOT NULL,
    scheduled_at TIMESTAMP WITH TIME ZONE NOT NULL,
    metadata BYTEA,
    payload BYTEA NOT NULL,
    times_attempted INTEGER NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_outbox_created_at ON outbox (created_at);
CREATE INDEX IF NOT EXISTS idx_outbox_scheduled_at ON outbox (scheduled_at);
📊 MySQL
CREATE TABLE IF NOT EXISTS outbox (
    id BINARY(16) PRIMARY KEY,
    created_at TIMESTAMP(3) NOT NULL,
    scheduled_at TIMESTAMP(3) NOT NULL,
    metadata BLOB,
    payload BLOB NOT NULL,
    times_attempted INT NOT NULL
);

CREATE INDEX idx_outbox_created_at ON outbox (created_at);
CREATE INDEX idx_outbox_scheduled_at ON outbox (scheduled_at);
🐬 MariaDB
CREATE TABLE IF NOT EXISTS outbox (
    id UUID PRIMARY KEY,
    created_at TIMESTAMP(3) NOT NULL,
    scheduled_at TIMESTAMP(3) NOT NULL,
    metadata BLOB,
    payload BLOB NOT NULL,
    times_attempted INT NOT NULL
);

CREATE INDEX idx_outbox_created_at ON outbox (created_at);
CREATE INDEX idx_outbox_scheduled_at ON outbox (scheduled_at);
🗃️ SQLite
CREATE TABLE IF NOT EXISTS outbox (
    id TEXT PRIMARY KEY,
    created_at DATETIME NOT NULL,
    scheduled_at DATETIME NOT NULL,
    metadata BLOB,
    payload BLOB NOT NULL,
    times_attempted INTEGER NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_outbox_created_at ON outbox (created_at);
CREATE INDEX IF NOT EXISTS idx_outbox_scheduled_at ON outbox (scheduled_at);
🏛️ Oracle
CREATE TABLE outbox (
    id RAW(16) PRIMARY KEY,
    created_at TIMESTAMP WITH TIME ZONE NOT NULL,
    scheduled_at TIMESTAMP WITH TIME ZONE NOT NULL,
    metadata BLOB,
    payload BLOB NOT NULL,
    times_attempted NUMBER(10) NOT NULL
);

CREATE INDEX idx_outbox_created_at ON outbox (created_at);
CREATE INDEX idx_outbox_scheduled_at ON outbox (scheduled_at);
🪟 SQL Server
CREATE TABLE outbox (
    id UNIQUEIDENTIFIER PRIMARY KEY,
    created_at DATETIMEOFFSET(3) NOT NULL,
    scheduled_at DATETIMEOFFSET(3) NOT NULL,
    metadata VARBINARY(MAX),
    payload VARBINARY(MAX) NOT NULL,
    times_attempted INT NOT NULL
);

CREATE INDEX idx_outbox_created_at ON outbox (created_at);
CREATE INDEX idx_outbox_scheduled_at ON outbox (scheduled_at);

Examples

Complete working examples for different databases and message brokers are available in the examples directory.

To run an example:

cd examples/postgres-kafka # or examples/oracle-nats or examples/mysql-rabitmq
../../scripts/up-and-wait.sh
go run service.go

# In another terminal, send a POST request to trigger entity creation
curl -X POST http://localhost:8080/entity

FAQ

What happens when multiple instances of my service use the library?

When running multiple instances of your service, each with its own reader, be aware that:

  • Multiple readers will independently poll for messages
  • This can result in duplicate message publishing
  • To handle this, you can either:
    1. Ensure your consumers are idempotent and accept duplicates
    2. Use broker-side deduplication if available (e.g. NATS JetStream's Msg-Id)
    3. Run the reader in a single instance only (e.g. single replica deployment in k8s with reader)

The optimistic publisher feature can significantly reduce the number of duplicates. With an optimistic publisher, messages are published as soon as they are committed, so readers will usually find no messages in the outbox table.

Note that even in single instance deployments, message duplicates can still occur (e.g. if the service crashes right after successfully publishing to the broker). However, these duplicates are less frequent compared to multi instance deployments.
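
As a rough illustration of the first option (idempotent consumers), the sketch below skips deliveries whose message ID has already been processed. `dedupConsumer` is a hypothetical type, not part of the library, and it keeps its state in memory; a production consumer would more likely persist processed IDs alongside its own data.

```go
package main

import (
	"fmt"
	"sync"
)

// dedupConsumer is a hypothetical consumer that remembers processed message IDs.
type dedupConsumer struct {
	mu      sync.Mutex
	seen    map[string]struct{}
	applied int // number of times the side effect actually ran
}

func newDedupConsumer() *dedupConsumer {
	return &dedupConsumer{seen: make(map[string]struct{})}
}

// handle applies the side effect once per message ID and reports whether
// this delivery was processed (true) or skipped as a duplicate (false).
func (c *dedupConsumer) handle(id string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if _, dup := c.seen[id]; dup {
		return false
	}
	c.seen[id] = struct{}{}
	c.applied++ // a real consumer would perform its business logic here
	return true
}

func main() {
	c := newDedupConsumer()
	// "m1" arrives twice, e.g. once from each of two readers.
	for _, id := range []string{"m1", "m2", "m1"} {
		fmt.Printf("%s processed=%v\n", id, c.handle(id))
	}
	fmt.Println("side effects applied:", c.applied)
}
```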

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

Documentation

Overview

Package outbox implements the transactional outbox pattern, ensuring reliable message publishing by first persisting messages within a database transaction before they are sent to a message broker.

The core of the pattern involves two main operations:

  1. Writing: Messages are stored durably in an "outbox" table as part of the application's local database transaction. This ensures that the message is captured atomically with the business operation that produces it.

  2. Reading & Publishing: A background process reads unpublished messages from the outbox table and publishes them to the message broker. Once successfully published, messages are removed from the outbox table.

This package provides the following components to integrate this pattern:

  • A `Writer` to facilitate the atomic storage of messages into the outbox table alongside your application's domain changes within a single transaction.
  • A `Reader` background process to poll the outbox table for unpublished messages, attempt to publish them to a message broker, and remove them upon success.

The library is designed to be agnostic to specific database or message broker technologies, allowing integration with various systems. For detailed setup, features, and examples, please refer to the project README.

Types

type DB

type DB interface {
	BeginTx(ctx context.Context, opts *sql.TxOptions) (Tx, error)
	Queryer
}

DB represents a database connection. It is compatible with the standard sql.DB type.

type DBContext

type DBContext struct {
	// contains filtered or unexported fields
}

DBContext holds the database connection and the SQL dialect.

func NewDBContext

func NewDBContext(db *sql.DB, dialect SQLDialect) *DBContext

NewDBContext creates a new DBContext from a standard *sql.DB.

func NewDBContextWithDB

func NewDBContextWithDB(db DB, dialect SQLDialect) *DBContext

NewDBContextWithDB creates a new DBContext with a custom DB implementation. This is useful for users who want to provide their own database abstraction or for testing.
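
A custom implementation usually needs a thin adapter, because `(*sql.DB).BeginTx` returns `*sql.Tx` rather than the `Tx` interface. The sketch below redeclares the interfaces from this page locally so that it compiles on its own; `countingDB` is a hypothetical wrapper that counts started transactions. In real code the library's own `outbox.DB` and `outbox.Tx` types would be used instead of the local copies.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
	"sync/atomic"
)

// Local copies of the interfaces documented on this page.
type Queryer interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
}

type TxQueryer interface{ Queryer }

type Tx interface {
	Commit() error
	Rollback() error
	TxQueryer
}

type DB interface {
	BeginTx(ctx context.Context, opts *sql.TxOptions) (Tx, error)
	Queryer
}

// countingDB wraps *sql.DB and counts how many transactions were started.
// ExecContext and QueryContext are promoted from the embedded *sql.DB.
type countingDB struct {
	*sql.DB
	begun atomic.Int64
}

// BeginTx adapts the (*sql.Tx, error) result to the (Tx, error) signature.
func (c *countingDB) BeginTx(ctx context.Context, opts *sql.TxOptions) (Tx, error) {
	c.begun.Add(1)
	tx, err := c.DB.BeginTx(ctx, opts)
	if err != nil {
		return nil, err
	}
	return tx, nil
}

// Compile-time check that the wrapper satisfies the interface.
var _ DB = (*countingDB)(nil)

func main() {
	// A real program would set the embedded DB to the handle returned by sql.Open.
	var db DB = &countingDB{}
	fmt.Printf("%T satisfies DB\n", db)
}
```

The same shape works for test doubles: any type with these three methods can stand in for the database.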

type DelayFunc

type DelayFunc func(attempt int) time.Duration

DelayFunc is a function that returns the delay after a given attempt.
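
Since DelayFunc is a plain function type, a custom policy is just a function with that signature. The following sketch defines a hypothetical `linear` policy (not provided by the library) on a local copy of the type; with the real package, a function like this could presumably be passed to `WithDelay`.

```go
package main

import (
	"fmt"
	"time"
)

// DelayFunc is a local copy of the type documented above.
type DelayFunc func(attempt int) time.Duration

// linear returns a DelayFunc that grows by step after every attempt
// (step, 2*step, 3*step, ...) and never exceeds maxDelay.
func linear(step, maxDelay time.Duration) DelayFunc {
	return func(attempt int) time.Duration {
		if step <= 0 {
			return 0
		}
		if attempt < 0 {
			attempt = 0
		}
		n := time.Duration(attempt + 1)
		if n > maxDelay/step { // also guards against overflow in step*n
			return maxDelay
		}
		return step * n
	}
}

func main() {
	delay := linear(time.Second, 5*time.Second)
	for _, attempt := range []int{0, 1, 2, 10} {
		fmt.Printf("delay after attempt %d: %v\n", attempt, delay(attempt))
	}
}
```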

func Exponential

func Exponential(delay time.Duration, maxDelay time.Duration) DelayFunc

Exponential returns a DelayFunc whose delay doubles after every attempt: the delay after attempt n is delay × 2^n, capped at maxDelay.

For example, with a delay of 200 milliseconds and a maxDelay of 1 hour:

  Delay after attempt 0: 200ms
  Delay after attempt 1: 400ms
  Delay after attempt 2: 800ms
  Delay after attempt 3: 1.6s
  Delay after attempt 4: 3.2s
  Delay after attempt 5: 6.4s
  Delay after attempt 6: 12.8s
  Delay after attempt 7: 25.6s
  Delay after attempt 8: 51.2s
  Delay after attempt 9: 1m42.4s
  Delay after attempt 10: 3m24.8s
  Delay after attempt 11: 6m49.6s
  Delay after attempt 12: 13m39.2s
  Delay after attempt 13: 27m18.4s
  Delay after attempt 14: 54m36.8s
  Delay after attempt 15: 1h0m0s
  Delay after attempt 16: 1h0m0s
  ...
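
The schedule in this example can be reproduced with a few lines of Go. This is a sketch of the doubling-with-cap rule only, written against a local `DelayFunc` type; `exponential` is a hypothetical name and the library's own implementation is not shown here.

```go
package main

import (
	"fmt"
	"time"
)

// DelayFunc is a local copy of the type documented above.
type DelayFunc func(attempt int) time.Duration

// exponential doubles the delay once per attempt and caps it at maxDelay.
func exponential(delay, maxDelay time.Duration) DelayFunc {
	return func(attempt int) time.Duration {
		d := delay
		for i := 0; i < attempt; i++ {
			if d > maxDelay/2 {
				return maxDelay // doubling again would exceed the cap
			}
			d *= 2
		}
		if d > maxDelay {
			return maxDelay
		}
		return d
	}
}

func main() {
	delay := exponential(200*time.Millisecond, time.Hour)
	for _, attempt := range []int{0, 1, 3, 14, 15, 16} {
		fmt.Printf("Delay after attempt %d: %v\n", attempt, delay(attempt))
	}
}
```

Checking the cap before each doubling keeps the arithmetic from overflowing when the attempt number is very large.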

func Fixed

func Fixed(delay time.Duration) DelayFunc

Fixed returns a DelayFunc that returns a fixed delay for all attempts.

type DeleteError

type DeleteError struct {
	Messages []Message
	Err      error
}

DeleteError indicates an error during the batch deletion of messages. It includes the messages that failed to be deleted and the original error.

func (*DeleteError) Error

func (e *DeleteError) Error() string

func (*DeleteError) Unwrap

func (e *DeleteError) Unwrap() error

type Message

type Message struct {
	// ID is a unique identifier for the message
	ID uuid.UUID

	// CreatedAt is the timestamp when the message was created
	CreatedAt time.Time

	// ScheduledAt is the timestamp when the message should be published
	ScheduledAt time.Time

	// Metadata is an optional field containing additional information about the message,
	// such as correlation IDs, trace IDs, user context, or other custom attributes.
	// This data is typically JSON-serialized and can be used for tracing, debugging, or routing purposes.
	// Most message brokers support attaching such metadata as message headers, enabling richer message processing and observability.
	Metadata []byte

	// Payload contains the actual message data, typically JSON serialized
	Payload []byte

	// TimesAttempted is the number of times the message has been attempted to be published
	// Read only field
	TimesAttempted int32
}

Message represents a message to be published through the outbox pattern. It contains all the information needed to process and publish the message to an external system (like a message broker).

func NewMessage

func NewMessage(payload []byte, opts ...MessageOption) *Message

NewMessage creates a new Message with the given payload.

type MessageOption

type MessageOption func(*Message)

MessageOption is a function that can be used to configure a Message.
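
This is the functional options pattern. The sketch below shows how such options are typically applied, using simplified local types (`message`, `newMessage`, `withID`, `withScheduledAt` are stand-ins, and the ID is a plain string instead of a UUID); it is an assumption about the general shape, not the library's source.

```go
package main

import (
	"fmt"
	"time"
)

// message is a simplified local stand-in for the Message type.
type message struct {
	ID          string
	CreatedAt   time.Time
	ScheduledAt time.Time
	Payload     []byte
}

// messageOption mirrors the shape of MessageOption.
type messageOption func(*message)

func withID(id string) messageOption {
	return func(m *message) { m.ID = id }
}

func withScheduledAt(t time.Time) messageOption {
	return func(m *message) { m.ScheduledAt = t }
}

// newMessage fills in defaults first, then lets each option override them.
func newMessage(payload []byte, opts ...messageOption) *message {
	now := time.Now().UTC()
	m := &message{ID: "generated-id", CreatedAt: now, ScheduledAt: now, Payload: payload}
	for _, opt := range opts {
		opt(m)
	}
	return m
}

func main() {
	plain := newMessage([]byte("hello"))
	later := newMessage([]byte("hello"),
		withID("order-42"),
		withScheduledAt(plain.CreatedAt.Add(time.Hour)))

	fmt.Println(plain.ID, plain.ScheduledAt.Equal(plain.CreatedAt))
	fmt.Println(later.ID, later.ScheduledAt.Sub(plain.CreatedAt))
}
```

Because defaults are set before the options run, callers only pass the options they care about.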

func WithCreatedAt

func WithCreatedAt(createdAt time.Time) MessageOption

WithCreatedAt sets the time the message was created. If not provided, the current time will be used.

func WithID

func WithID(id uuid.UUID) MessageOption

WithID sets the unique identifier of the message. If not provided, a new UUID will be generated.

func WithMetadata

func WithMetadata(metadata []byte) MessageOption

WithMetadata attaches message metadata (e.g. correlation ID, trace ID, etc).

func WithScheduledAt

func WithScheduledAt(scheduledAt time.Time) MessageOption

WithScheduledAt sets the time the message should be published. If not provided, the current time will be used.

type MessagePublisher

type MessagePublisher interface {
	// Publish sends a message to an external system (e.g., a message broker).
	// This function may be called multiple times for the same message.
	// Consumers must be idempotent and handle duplicate messages,
	// though some brokers provide deduplication features.
	// Return nil on success.
	// Return error on failure. In this case:
	// - The message will be retried according to the configured retry and backoff settings
	// - or will be discarded if the maximum number of attempts is reached.
	Publish(ctx context.Context, msg *Message) error
}

MessagePublisher defines an interface for publishing messages to an external system.
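
A publisher implementation might look roughly like the sketch below, which decodes the metadata into broker-style headers and keys the record by message ID so a broker or consumer can deduplicate. Everything here is illustrative: `memoryPublisher` and `record` are hypothetical, `Message` is a trimmed local copy with a string ID, and the metadata is assumed to be a flat JSON object of strings as in the Writer example.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// Message is a trimmed local copy of the fields a publisher usually needs.
type Message struct {
	ID       string
	Metadata []byte
	Payload  []byte
}

// record is what a hypothetical broker client would send.
type record struct {
	Key     string
	Headers map[string]string
	Value   []byte
}

// memoryPublisher collects records in memory instead of calling a broker.
type memoryPublisher struct {
	sent []record
}

// Publish returns nil on success; any returned error means "retry later".
func (p *memoryPublisher) Publish(ctx context.Context, msg *Message) error {
	if err := ctx.Err(); err != nil {
		return err // honour cancellation and timeouts
	}
	headers := map[string]string{}
	if len(msg.Metadata) > 0 {
		if err := json.Unmarshal(msg.Metadata, &headers); err != nil {
			return fmt.Errorf("decode metadata for message %s: %w", msg.ID, err)
		}
	}
	p.sent = append(p.sent, record{Key: msg.ID, Headers: headers, Value: msg.Payload})
	return nil
}

// demoPublish publishes one message and returns the record that was sent.
func demoPublish() (record, error) {
	p := &memoryPublisher{}
	msg := &Message{
		ID:       "m1",
		Metadata: []byte(`{"trace_id":"abc123"}`),
		Payload:  []byte(`{"id":1}`),
	}
	if err := p.Publish(context.Background(), msg); err != nil {
		return record{}, err
	}
	return p.sent[0], nil
}

func main() {
	rec, err := demoPublish()
	fmt.Println(rec.Key, rec.Headers["trace_id"], err)
}
```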

type PublishError

type PublishError struct {
	Message Message
	Err     error
}

PublishError indicates an error during message publication. It includes the message that failed to be published and the original error.

func (*PublishError) Error

func (e *PublishError) Error() string

func (*PublishError) Unwrap

func (e *PublishError) Unwrap() error
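
Because the error types implement Unwrap, the standard `errors.As` and `errors.Is` helpers can be used as an alternative to a type switch. The sketch below uses a local `publishError` type with the same shape and a hypothetical sentinel `errBrokerDown`; with the library, `*outbox.PublishError` would take its place.

```go
package main

import (
	"errors"
	"fmt"
)

// publishError is a local stand-in shaped like PublishError.
type publishError struct {
	MessageID string
	Err       error
}

func (e *publishError) Error() string {
	return fmt.Sprintf("publish message %s: %v", e.MessageID, e.Err)
}

func (e *publishError) Unwrap() error { return e.Err }

// errBrokerDown is a hypothetical sentinel returned by a broker client.
var errBrokerDown = errors.New("broker unavailable")

// classify inspects the error chain instead of only the outermost type.
func classify(err error) string {
	var pe *publishError
	switch {
	case errors.As(err, &pe) && errors.Is(err, errBrokerDown):
		return "broker down while publishing " + pe.MessageID
	case errors.As(err, &pe):
		return "publish failure for " + pe.MessageID
	default:
		return "other error"
	}
}

func main() {
	wrapped := fmt.Errorf("reader cycle: %w", &publishError{MessageID: "m1", Err: errBrokerDown})
	fmt.Println(classify(wrapped))
	fmt.Println(classify(errors.New("something else")))
}
```

Unlike a type switch, this also matches when the error has been wrapped again further up the call stack.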

type Queryer

type Queryer interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
}

Queryer represents a query executor.

type ReadError

type ReadError struct {
	Err error
}

ReadError indicates an error when reading messages from the outbox.

func (*ReadError) Error

func (e *ReadError) Error() string

func (*ReadError) Unwrap

func (e *ReadError) Unwrap() error

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader periodically reads unpublished messages from the outbox table and attempts to publish them to an external system.

func NewReader

func NewReader(dbCtx *DBContext, msgPublisher MessagePublisher, opts ...ReaderOption) *Reader

NewReader creates a new outbox Reader with the given database context, message publisher, and options.

func (*Reader) DiscardedMessages

func (r *Reader) DiscardedMessages() <-chan Message

DiscardedMessages returns a channel that receives messages that were discarded because they reached the maximum number of attempts. The channel is closed when the reader is stopped.

Consumers should drain this channel promptly to avoid missing messages.

func (*Reader) Errors

func (r *Reader) Errors() <-chan error

Errors returns a channel that receives errors from the outbox reader. The channel is buffered to prevent blocking the reader. If the buffer becomes full, subsequent errors will be dropped to maintain reader throughput. The channel is closed when the reader is stopped.

The returned error will be one of the following types, which can be checked using a type switch:

  • *PublishError: Failed to publish a message. Contains the message.
  • *UpdateError: Failed to update a message after a failed publish attempt. Contains the message.
  • *DeleteError: Failed to delete a batch of messages. Contains the messages.
  • *ReadError: Failed to read messages from the outbox.

Example of error handling:

for err := range r.Errors() {
	switch e := err.(type) {
	case *outbox.PublishError:
		log.Printf("Failed to publish message | ID: %s | Error: %v",
			e.Message.ID, e.Err)

	case *outbox.UpdateError:
		log.Printf("Failed to update message | ID: %s | Error: %v",
			e.Message.ID, e.Err)

	case *outbox.DeleteError:
		log.Printf("Batch message deletion failed | Count: %d | Error: %v",
			len(e.Messages), e.Err)
		for _, msg := range e.Messages {
			log.Printf("Failed to delete message | ID: %s", msg.ID)
		}

	case *outbox.ReadError:
		log.Printf("Failed to read outbox messages | Error: %v", e.Err)

	default:
		log.Printf("Unexpected error occurred | Error: %v", e)
	}
}
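
The drop-when-full behaviour described for Errors corresponds to a non-blocking channel send. The following sketch shows that idiom in isolation; `trySend` is a hypothetical helper, not a library function.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether it succeeded.
// When the buffer is full, the default case runs and the error is dropped.
func trySend(ch chan<- error, err error) bool {
	select {
	case ch <- err:
		return true
	default:
		return false
	}
}

func main() {
	errs := make(chan error, 2) // small buffer to make the effect visible
	dropped := 0
	for i := 0; i < 5; i++ {
		if !trySend(errs, fmt.Errorf("error %d", i)) {
			dropped++
		}
	}
	fmt.Println("buffered:", len(errs), "dropped:", dropped)
}
```

If no goroutine drains the channel, only the first errors that fit in the buffer are kept, which is why a consumer loop like the one above should be started early.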

func (*Reader) Start

func (r *Reader) Start()

Start begins the background processing of outbox messages. It periodically reads unpublished messages and attempts to publish them. If Start is called multiple times, only the first call has an effect.

func (*Reader) Stop

func (r *Reader) Stop(ctx context.Context) error

Stop gracefully shuts down the outbox reader processing. It prevents new reader cycles from starting and waits for any ongoing message publishing to complete. The provided context controls how long to wait for graceful shutdown before giving up.

If the context expires before processing completes, Stop returns the context's error. If shutdown completes successfully, it returns nil. Calling Stop multiple times is safe and only the first call has an effect.
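
The Start and Stop semantics described here (idempotent calls, waiting for in-flight work, giving up when the context expires) can be sketched with `sync.Once` and two channels. `worker` is a hypothetical type written for illustration; it is not the Reader's implementation.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// worker runs cycle on every tick until stopped.
type worker struct {
	startOnce, stopOnce sync.Once
	quit, done          chan struct{}
	interval            time.Duration
	cycle               func()
}

func newWorker(interval time.Duration, cycle func()) *worker {
	return &worker{quit: make(chan struct{}), done: make(chan struct{}), interval: interval, cycle: cycle}
}

// Start launches the loop; repeated calls are no-ops.
func (w *worker) Start() {
	w.startOnce.Do(func() {
		go func() {
			defer close(w.done)
			ticker := time.NewTicker(w.interval)
			defer ticker.Stop()
			for {
				select {
				case <-w.quit:
					return
				case <-ticker.C:
					w.cycle()
				}
			}
		}()
	})
}

// Stop prevents new cycles and waits for the loop to exit or ctx to expire.
func (w *worker) Stop(ctx context.Context) error {
	w.stopOnce.Do(func() { close(w.quit) })
	select {
	case <-w.done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demoStop stops a worker whose cycle is either quick or stuck.
func demoStop(stuck bool) error {
	started := make(chan struct{}, 1)
	release := make(chan struct{})
	w := newWorker(time.Millisecond, func() {
		select {
		case started <- struct{}{}:
		default:
		}
		if stuck {
			<-release // simulate a publish that does not return in time
		}
	})
	w.Start()
	w.Start() // second call has no effect
	<-started
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	err := w.Stop(ctx)
	close(release)
	return err
}

func main() {
	fmt.Println("quick cycle:", demoStop(false))
	fmt.Println("stuck cycle:", demoStop(true))
}
```

In this sketch a stuck cycle makes Stop return the context's error, mirroring the documented behaviour when graceful shutdown does not finish in time.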

type ReaderOption

type ReaderOption func(*Reader)

ReaderOption is a function that configures a Reader instance.

func WithDelay

func WithDelay(delayFunc DelayFunc) ReaderOption

WithDelay sets the delay function to apply between attempts to publish a message. Default is Exponential(200ms, 1h).

func WithDeleteBatchSize

func WithDeleteBatchSize(size int) ReaderOption

WithDeleteBatchSize sets the number of successfully published messages to accumulate before executing a batch delete operation from the outbox table.

The reader processes messages sequentially: for each message, it attempts to publish and then accumulates successfully published messages for deletion. When the batch reaches the specified size, all messages in the batch are deleted in a single database operation.

Performance considerations:

  • Larger batch sizes reduce database round trips but increase memory usage
  • Smaller batch sizes provide faster cleanup but more frequent database operations
  • A batch size of 1 deletes each message immediately after successful publication

Behavior:

  • Only successfully published messages are added to the delete batch
  • Failed publications do not affect the batch; those messages remain in the outbox
  • At the end of each processing cycle, any remaining messages in the batch are deleted regardless of batch size to prevent reprocessing
  • If fewer messages exist than the batch size, they are still deleted in one operation

Default is 20. Size must be positive.
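
The accumulate-and-flush behaviour described above can be sketched as follows. `processCycle` and `batchSizes` are hypothetical helpers working on plain string IDs; they model one processing cycle and record how many messages each delete operation would cover.

```go
package main

import (
	"errors"
	"fmt"
)

// processCycle publishes each message in order, collects the successful ones,
// and calls deleteBatch whenever the batch is full, plus once at the end for
// any remainder. deleteBatch must not retain the slice, since it is reused.
func processCycle(ids []string, batchSize int, publish func(string) error, deleteBatch func([]string)) {
	if batchSize < 1 {
		batchSize = 1
	}
	batch := make([]string, 0, batchSize)
	for _, id := range ids {
		if err := publish(id); err != nil {
			continue // failed messages stay in the outbox for a later attempt
		}
		batch = append(batch, id)
		if len(batch) >= batchSize {
			deleteBatch(batch)
			batch = batch[:0]
		}
	}
	if len(batch) > 0 {
		deleteBatch(batch) // flush the remainder at the end of the cycle
	}
}

// batchSizes runs one cycle and returns the size of every delete operation.
func batchSizes(ids []string, batchSize int, failing map[string]bool) []int {
	var sizes []int
	publish := func(id string) error {
		if failing[id] {
			return errors.New("publish failed")
		}
		return nil
	}
	processCycle(ids, batchSize, publish, func(batch []string) {
		sizes = append(sizes, len(batch))
	})
	return sizes
}

func main() {
	ids := []string{"a", "b", "c", "d", "e"}
	fmt.Println(batchSizes(ids, 2, nil))                        // [2 2 1]
	fmt.Println(batchSizes(ids, 2, map[string]bool{"c": true})) // [2 2]
}
```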

func WithDeleteTimeout

func WithDeleteTimeout(timeout time.Duration) ReaderOption

WithDeleteTimeout sets the timeout for deleting messages from the outbox. Default is 5 seconds.

func WithDiscardedMessagesChannelSize

func WithDiscardedMessagesChannelSize(size int) ReaderOption

WithDiscardedMessagesChannelSize sets the size of the discarded messages channel. Default is 128. Size must be positive.

func WithErrorChannelSize

func WithErrorChannelSize(size int) ReaderOption

WithErrorChannelSize sets the size of the error channel. Default is 128. Size must be positive.

func WithExponentialDelay

func WithExponentialDelay(initialDelay time.Duration, maxDelay time.Duration) ReaderOption

WithExponentialDelay sets an exponential delay between attempts to publish a message: the delay after attempt n is initialDelay × 2^n, capped at maxDelay.

For example, with an initialDelay of 200 milliseconds and a maxDelay of 1 hour:

  Delay after attempt 0: 200ms
  Delay after attempt 1: 400ms
  Delay after attempt 2: 800ms
  Delay after attempt 3: 1.6s
  Delay after attempt 4: 3.2s
  Delay after attempt 5: 6.4s
  Delay after attempt 6: 12.8s
  Delay after attempt 7: 25.6s
  Delay after attempt 8: 51.2s
  Delay after attempt 9: 1m42.4s
  Delay after attempt 10: 3m24.8s
  Delay after attempt 11: 6m49.6s
  Delay after attempt 12: 13m39.2s
  Delay after attempt 13: 27m18.4s
  Delay after attempt 14: 54m36.8s
  Delay after attempt 15: 1h0m0s
  Delay after attempt 16: 1h0m0s
  ...

func WithFixedDelay

func WithFixedDelay(delay time.Duration) ReaderOption

WithFixedDelay sets the delay between attempts to publish a message to be fixed. The delay is the same for all attempts.

For example, with a delay of 200 milliseconds:

  Delay after attempt 0: 200ms
  Delay after attempt 1: 200ms
  ...

func WithInterval

func WithInterval(interval time.Duration) ReaderOption

WithInterval sets the time between outbox reader processing attempts. Default is 10 seconds.

func WithMaxAttempts

func WithMaxAttempts(maxAttempts int32) ReaderOption

WithMaxAttempts sets the maximum number of attempts to publish a message. A message is discarded once it reaches the maximum number of attempts. Use the channel returned by `DiscardedMessages` to be notified about discarded messages. Default is math.MaxInt32. Must be positive.

func WithPublishTimeout

func WithPublishTimeout(timeout time.Duration) ReaderOption

WithPublishTimeout sets the timeout for publishing messages to the external system. Default is 5 seconds.
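
A timeout like this is typically enforced through the context passed to Publish, so it only helps if the publisher honours that context. The sketch below illustrates the idea with hypothetical helpers (`publishWithTimeout`, `fastPublish`, `slowPublish`); it is not the Reader's code.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// publishWithTimeout derives a deadline-bound context for a single publish call.
func publishWithTimeout(parent context.Context, timeout time.Duration, publish func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(parent, timeout)
	defer cancel()
	return publish(ctx)
}

// fastPublish simulates a broker that answers immediately.
func fastPublish(ctx context.Context) error {
	return ctx.Err()
}

// slowPublish simulates a broker that never answers; it returns only
// because it watches the context.
func slowPublish(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// demoTimeout runs a publish function under a short timeout.
func demoTimeout(publish func(context.Context) error) error {
	return publishWithTimeout(context.Background(), 20*time.Millisecond, publish)
}

func main() {
	fmt.Println("fast:", demoTimeout(fastPublish))
	fmt.Println("slow:", demoTimeout(slowPublish))
}
```

A publisher that ignores its context would block past the deadline, so broker clients should be called with the context they are given.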

func WithReadBatchSize

func WithReadBatchSize(batchSize int) ReaderOption

WithReadBatchSize sets the maximum number of messages to process in a single batch. Default is 100 messages. Must be positive.

func WithReadTimeout

func WithReadTimeout(timeout time.Duration) ReaderOption

WithReadTimeout sets the timeout for reading messages from the outbox. Default is 5 seconds.

func WithUpdateTimeout

func WithUpdateTimeout(timeout time.Duration) ReaderOption

WithUpdateTimeout sets the timeout for updating a message in the outbox table. Default is 5 seconds.

type SQLDialect

type SQLDialect string

SQLDialect represents a SQL database dialect.

const (
	SQLDialectPostgres  SQLDialect = "postgres"
	SQLDialectMySQL     SQLDialect = "mysql"
	SQLDialectMariaDB   SQLDialect = "mariadb"
	SQLDialectSQLite    SQLDialect = "sqlite"
	SQLDialectOracle    SQLDialect = "oracle"
	SQLDialectSQLServer SQLDialect = "sqlserver"
)

Supported database dialects.
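
One reason a dialect setting is useful is that SQL details such as bind-parameter syntax vary between databases and their Go drivers. The sketch below is a hypothetical helper, not part of the library, showing the placeholder styles commonly used by drivers for these databases; how the library uses the dialect internally is not described here.

```go
package main

import "fmt"

// SQLDialect and its values are local copies of the constants listed above.
type SQLDialect string

const (
	SQLDialectPostgres  SQLDialect = "postgres"
	SQLDialectMySQL     SQLDialect = "mysql"
	SQLDialectMariaDB   SQLDialect = "mariadb"
	SQLDialectSQLite    SQLDialect = "sqlite"
	SQLDialectOracle    SQLDialect = "oracle"
	SQLDialectSQLServer SQLDialect = "sqlserver"
)

// placeholder returns the n-th (1-based) bind parameter in the style
// commonly used with each database.
func placeholder(d SQLDialect, n int) string {
	switch d {
	case SQLDialectPostgres:
		return fmt.Sprintf("$%d", n)
	case SQLDialectOracle:
		return fmt.Sprintf(":%d", n)
	case SQLDialectSQLServer:
		return fmt.Sprintf("@p%d", n)
	default: // MySQL, MariaDB and SQLite use positional question marks
		return "?"
	}
}

func main() {
	for _, d := range []SQLDialect{SQLDialectPostgres, SQLDialectMySQL, SQLDialectOracle, SQLDialectSQLServer} {
		fmt.Printf("%-9s DELETE FROM outbox WHERE id = %s\n", d, placeholder(d, 1))
	}
}
```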

type Tx

type Tx interface {
	Commit() error
	Rollback() error
	TxQueryer
}

Tx represents a database transaction. It is compatible with the standard sql.Tx type.

type TxQueryer

type TxQueryer interface {
	Queryer
}

TxQueryer represents a query executor inside a transaction.

type TxWorkFunc

type TxWorkFunc func(ctx context.Context, txQueryer TxQueryer) error

TxWorkFunc is a user-supplied callback that receives a TxQueryer and uses it to execute user-defined queries within the same transaction that stores the message in the outbox. The Writer itself commits or rolls back the transaction once the callback and the outbox insert complete.

type UpdateError

type UpdateError struct {
	Message Message
	Err     error
}

UpdateError indicates an error when updating a message in the outbox. It includes the message that failed to be updated and the original error.

func (*UpdateError) Error

func (e *UpdateError) Error() string

func (*UpdateError) Unwrap

func (e *UpdateError) Unwrap() error

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer handles storing messages in the outbox table as part of user-defined queries within a database transaction. It optionally supports optimistic publishing, which attempts to publish messages immediately after transaction commit.

func NewWriter

func NewWriter(dbCtx *DBContext, opts ...WriterOption) *Writer

NewWriter creates a new outbox Writer with the given database context and options.

func (*Writer) Write

func (w *Writer) Write(ctx context.Context, msg *Message, txWorkFunc TxWorkFunc) error

Write stores a message in the outbox table and executes the provided callback within the same transaction. If either the callback or the outbox insert fails, the entire transaction is rolled back, so the entity change and its message are persisted together or not at all.

If optimistic publishing is enabled, the message is also published to the external system asynchronously after the transaction is committed.
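
The commit-or-rollback flow can be illustrated with a fake transaction that records what happens to it. This is a sketch under assumptions: `tx`, `fakeTx`, `write`, and `lastOp` are hypothetical, queries are reduced to labels, and the order in which the library runs the callback and the outbox insert is not stated on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// tx is a minimal transaction interface for this sketch.
type tx interface {
	Exec(query string) error
	Commit() error
	Rollback() error
}

// fakeTx records operations and can be told to fail on one query.
type fakeTx struct {
	log    []string
	failOn string
}

func (t *fakeTx) Exec(q string) error {
	if q == t.failOn {
		return errors.New("exec failed: " + q)
	}
	t.log = append(t.log, q)
	return nil
}

func (t *fakeTx) Commit() error   { t.log = append(t.log, "COMMIT"); return nil }
func (t *fakeTx) Rollback() error { t.log = append(t.log, "ROLLBACK"); return nil }

// write runs the user callback and the outbox insert in one transaction and
// commits only if both succeed; any error triggers a rollback.
func write(t tx, work func(tx) error) (err error) {
	defer func() {
		if err != nil {
			_ = t.Rollback()
		}
	}()
	if err = work(t); err != nil {
		return err
	}
	if err = t.Exec("INSERT outbox"); err != nil {
		return err
	}
	return t.Commit()
}

// lastOp runs write with an optional failing query and returns the final
// operation recorded on the transaction.
func lastOp(failOn string) string {
	t := &fakeTx{failOn: failOn}
	_ = write(t, func(t tx) error { return t.Exec("INSERT entity") })
	return t.log[len(t.log)-1]
}

func main() {
	fmt.Println("both succeed:     ", lastOp(""))
	fmt.Println("callback fails:   ", lastOp("INSERT entity"))
	fmt.Println("outbox insert fails:", lastOp("INSERT outbox"))
}
```

Either failure ends in a rollback, so neither the entity row nor the outbox row is left behind on its own.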

type WriterOption

type WriterOption func(*Writer)

WriterOption is a function that configures a Writer instance.

func WithOptimisticPublisher

func WithOptimisticPublisher(msgPublisher MessagePublisher) WriterOption

WithOptimisticPublisher configures the Writer to attempt immediate publishing of messages after the transaction is committed. This can improve performance by reducing the delay between transaction commit and message publishing, while still ensuring consistency if publishing fails.

Note: the optimistic path is only an efficiency optimization, not something the system depends on for correctness. If the message is not published, the reader will retry it. Because of this retry mechanism, duplicate deliveries may occur (e.g. when the reader wakes up just after the message is committed).

func WithOptimisticTimeout

func WithOptimisticTimeout(timeout time.Duration) WriterOption

WithOptimisticTimeout sets the timeout for optimistic publishing and deleting messages. Default is 10 seconds.
