outbox

v1.0.0
Published: Jun 5, 2026 License: MIT Imports: 11 Imported by: 0

README

pg-outbox

Transactional outbox for PostgreSQL. Transport-agnostic, horizontally scalable.

The database surface is the Executor interface (Exec / Query / QueryRow). *pgxpool.Pool, pgx.Tx, *pgx.Conn and pgtx's tx.DB all satisfy it, so you can plug in your own pooling / routing / instrumentation layer.

Install

go get github.com/gopherex/pg-outbox

Migrate

The SQL is schema-unqualified. Apply it with your tool of choice via outbox.MigrationsFS() (an embed.FS), or apply the raw statements from outbox.Migrations():

for _, stmt := range outbox.Migrations() {
    if _, err := pool.Exec(ctx, stmt); err != nil { log.Fatal(err) }
}

To install into a non-default schema, set search_path on the migration connection; then pass the same schema to WithSchema.

Use

// exec drives the background relay (must NOT be tx-bound);
// enq is pgtx's tx.DB (so Enqueue joins your business transaction).
ob, err := outbox.New(pool, enq, myPublisher,
    outbox.WithInstanceID(os.Getenv("POD_NAME")), // unique, stable per process
    outbox.WithSchema("public"),
    outbox.WithConcurrency(4),
    outbox.WithRetention(72*time.Hour),
    outbox.WithCleanupInterval(time.Hour),
)
if err != nil { log.Fatal(err) }

// Inside a pgtx transaction, atomically with your business writes:
err = tx.DoSerializable(ctx, mgr, func(ctx context.Context) error {
    if err := repo.Save(ctx, order); err != nil { return err }
    return ob.Enqueue(ctx, outbox.Message{
        Topic:        "orders",
        PartitionKey: order.ID,
        Payload:      data,
        Headers:      map[string]string{"trace-id": traceID},
    })
})

// In a background goroutine:
go ob.Run(ctx) // blocks until ctx is cancelled, then drains and returns

Implement outbox.Publisher for your transport:

type Publisher interface {
    Publish(ctx context.Context, msgs []outbox.Message) error
}
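
As a rough illustration of what a transport adapter might look like, the sketch below implements a type with the same method shape as Publisher. It is self-contained, so Message is a local stand-in limited to the fields shown above (Payload is assumed to be []byte), and the sender interface and memorySender are hypothetical names invented for the example, not part of this module.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Message is a local stand-in for outbox.Message, limited to the fields
// shown in the README. Payload being []byte is an assumption.
type Message struct {
	Topic        string
	PartitionKey string
	Payload      []byte
	Headers      map[string]string
}

// sender is a hypothetical transport client (Kafka, NATS, HTTP, ...).
type sender interface {
	Send(ctx context.Context, topic, key string, payload []byte) error
}

// transportPublisher has the same method shape as outbox.Publisher.
type transportPublisher struct{ client sender }

// Publish sends the batch in order and stops at the first failure.
// Returning the error signals the relay to retry; the README says a failed
// batch is retried in full, so already-sent messages may be sent again.
func (p transportPublisher) Publish(ctx context.Context, msgs []Message) error {
	for i, m := range msgs {
		if err := ctx.Err(); err != nil {
			return err
		}
		if err := p.client.Send(ctx, m.Topic, m.PartitionKey, m.Payload); err != nil {
			return fmt.Errorf("publish message %d of %d (topic %q): %w", i+1, len(msgs), m.Topic, err)
		}
	}
	return nil
}

// memorySender records sends and fails on one chosen payload (demo only).
type memorySender struct {
	sent   []string
	failOn string
}

func (s *memorySender) Send(_ context.Context, topic, key string, payload []byte) error {
	if string(payload) == s.failOn {
		return errors.New("transport unavailable")
	}
	s.sent = append(s.sent, topic+"/"+key+"/"+string(payload))
	return nil
}

// publishDemo pushes a two-message batch through transportPublisher and
// reports how many messages reached the sender.
func publishDemo(failOn string) (int, error) {
	s := &memorySender{failOn: failOn}
	err := transportPublisher{client: s}.Publish(context.Background(), []Message{
		{Topic: "orders", PartitionKey: "1", Payload: []byte("a")},
		{Topic: "orders", PartitionKey: "1", Payload: []byte("b")},
	})
	return len(s.sent), err
}

func main() {
	n, err := publishDemo("b")
	fmt.Println("sent:", n, "error:", err)
}
```

Stopping at the first error keeps the adapter simple; because the whole batch may be redelivered, the first message in this demo could be published twice, which is why consumers need to deduplicate.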

Plug your own database layer by implementing outbox.Executor:

type Executor interface {
    Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error)
    Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
    QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
}

LISTEN/NOTIFY wake-ups need a *pgxpool.Pool: supply one with WithListenPool, or pass a *pgxpool.Pool as the relay executor and it is detected automatically. Without one the relay falls back to polling.

Configuration

Two interchangeable styles, usable together (functional options win):

// declarative — mapstructure/validate/default tags, load from file or env:
var cfg outbox.Config // see config.Config fields
ob, _ := outbox.New(pool, enq, pub,
    outbox.WithConfig(cfg),
    outbox.WithLogger(log), // logger injected separately, not config data
)

// or pure options:
ob, _ := outbox.New(pool, enq, pub, outbox.WithInstanceID("pod-1"), ...)
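
To make the "functional options win" rule concrete, here is a minimal sketch of one way such precedence can be resolved: defaults first, then the declarative config, then options. Every name in it (fileConfig, settings, resolve, the lowercase option constructors, the default values) is hypothetical and chosen for illustration; it is not the package's actual implementation.

```go
package main

import "fmt"

// fileConfig stands in for a declarative config loaded from file or env.
type fileConfig struct {
	InstanceID  string
	Concurrency int
}

// settings is the resolved runtime configuration in this sketch.
type settings struct {
	instanceID  string
	concurrency int
}

// option mutates settings; the names below are illustrative only.
type option func(*settings)

func withInstanceID(id string) option { return func(s *settings) { s.instanceID = id } }

func withConcurrency(n int) option { return func(s *settings) { s.concurrency = n } }

// resolve applies defaults, then the declarative config, then the options,
// so an option always overrides the value that came from the config.
func resolve(cfg fileConfig, opts ...option) settings {
	s := settings{instanceID: "auto", concurrency: 1} // assumed defaults
	if cfg.InstanceID != "" {
		s.instanceID = cfg.InstanceID
	}
	if cfg.Concurrency > 0 {
		s.concurrency = cfg.Concurrency
	}
	for _, o := range opts {
		o(&s)
	}
	return s
}

func main() {
	cfg := fileConfig{InstanceID: "from-file", Concurrency: 2}
	fmt.Printf("%+v\n", resolve(cfg, withConcurrency(4)))
	fmt.Printf("%+v\n", resolve(cfg, withInstanceID("pod-1")))
}
```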

Logging

The logger is the standard library's *slog.Logger, injected via WithLogger. Default is a no-op (slog.DiscardHandler), so nothing is logged until you supply one — no third-party logging dependency:

ob, _ := outbox.New(pool, enq, pub, outbox.WithLogger(slog.Default()))

Retry backoff

Default is exponential 100ms→30s with jitter. Override with WithRetryBackoff. The Backoff type is func(attempt int) time.Duration (stateless, indexed by the message's persisted attempt count — survives restarts). Adapters bridge popular libraries without adding them as dependencies:

// github.com/cenkalti/backoff
outbox.WithRetryBackoff(backoff.FromNextBackOff(func() backoff.NextBackOffer {
    return cbackoff.NewExponentialBackOff()
}))

// github.com/sethvargo/go-retry
outbox.WithRetryBackoff(backoff.FromNexter(func() backoff.Nexter { ... }))
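
For readers who want to write their own function rather than use an adapter, the following sketch shows a stateless backoff with the documented shape func(attempt int) time.Duration. It assumes attempts start at 1 and interprets "full jitter" as a uniform delay between zero and the capped exponential value; both are assumptions, and capFor and fullJitter are names made up for this example. The local Backoff type only mirrors the documented signature.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// Backoff mirrors the shape documented for outbox.Backoff.
type Backoff func(attempt int) time.Duration

const (
	base    = 100 * time.Millisecond
	ceiling = 30 * time.Second
)

// capFor returns the un-jittered delay: base doubled per attempt, capped at
// ceiling. Attempts are assumed to start at 1; values below 1 yield base.
func capFor(attempt int) time.Duration {
	d := base
	for i := 1; i < attempt && d < ceiling; i++ {
		d *= 2
	}
	if d > ceiling {
		d = ceiling
	}
	return d
}

// fullJitter picks a uniform delay in [0, capFor(attempt)]. It keeps no
// state, so the same persisted attempt count gives the same range after a
// process restart.
func fullJitter(attempt int) time.Duration {
	return time.Duration(rand.Int63n(int64(capFor(attempt)) + 1))
}

func main() {
	var b Backoff = fullJitter
	for attempt := 1; attempt <= 10; attempt++ {
		fmt.Printf("attempt %d: cap %v, delay %v\n", attempt, capFor(attempt), b(attempt))
	}
}
```

Since the function depends only on the attempt number, it could in principle be passed straight to WithRetryBackoff, provided the real Backoff type has the signature described above.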

Package layout

The root package outbox is the only public surface (facade + re-exports). The implementation lives in subpackages: message, port (Publisher/Codec/Hooks/Executor), config, backoff, migrations, engine (store/relay/cleaner). The published module depends only on pgx; the standard library covers logging.

Semantics

  • At-least-once. A failed batch is retried in full; consumers must deduplicate (e.g. on Message.ID).
  • Scaling. Run one relay per instance with a unique WithInstanceID. Claims use FOR UPDATE SKIP LOCKED + a lease (locked_until); a crashed instance's rows are re-claimed once its lease expires.
  • Retries / dead-letter. On failure, attempts increments and the row is rescheduled with backoff; once WithMaxAttempts is reached the row becomes dead and stays in the table for inspection.
  • Ordering. Best-effort by created_at. WithOrdered(true) enforces strict per-PartitionKey ordering; note a dead row then blocks its key until an operator resolves it.
  • Latency. A trigger fires NOTIFY; the relay wakes near-instantly and falls back to polling every WithPollInterval.
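
The at-least-once bullet above implies consumer-side deduplication. A minimal in-memory sketch follows; it assumes the consumer sees Message.ID as a string, and the delivery and deduper types are hypothetical. A production consumer would more likely persist processed IDs, ideally in the same transaction as the side effect.

```go
package main

import (
	"fmt"
	"sync"
)

// delivery is a consumer-side view of a message. Treating the ID as a
// string is an assumption about how Message.ID reaches the consumer.
type delivery struct {
	ID      string
	Payload string
}

// deduper remembers IDs that were handled successfully.
type deduper struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func newDeduper() *deduper { return &deduper{seen: make(map[string]struct{})} }

// handle runs fn at most once per ID and reports whether fn ran. The ID is
// recorded only after fn succeeds, so a failed delivery can be retried.
func (d *deduper) handle(m delivery, fn func(delivery) error) (bool, error) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, dup := d.seen[m.ID]; dup {
		return false, nil
	}
	if err := fn(m); err != nil {
		return false, err
	}
	d.seen[m.ID] = struct{}{}
	return true, nil
}

// applyAll feeds deliveries through a fresh deduper and counts how many
// times the handler actually ran.
func applyAll(ms []delivery) int {
	d := newDeduper()
	applied := 0
	for _, m := range ms {
		ran, _ := d.handle(m, func(delivery) error { return nil })
		if ran {
			applied++
		}
	}
	return applied
}

func main() {
	batch := []delivery{
		{ID: "m1", Payload: "a"},
		{ID: "m2", Payload: "b"},
		{ID: "m1", Payload: "a"}, // redelivery after a retried batch
	}
	fmt.Println("handler ran", applyAll(batch), "times for", len(batch), "deliveries")
}
```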

Notes

  • attempts increments at claim time, so a crash mid-publish counts as an attempt (poison-message protection).
  • The table name is fixed (outbox_messages); only the schema is configurable.
  • Codecs (JSON/proto) and concrete publishers (Kafka/NATS/…) are out of scope — bring your own.

Tests

go test -race ./...                       # pure-logic unit tests, no DB
cd test/integration && go test ./...      # black-box integration (testcontainers Postgres, Docker required)

Integration tests live in a separate nested module (test/integration) so the heavy testcontainers / docker dependency tree never enters this module's go.mod. The published library keeps pgx as its only dependency.

Documentation

Overview

Package outbox implements a transactional outbox for PostgreSQL.

Messages are written to the outbox table inside the caller's business transaction; a background relay later claims them with FOR UPDATE SKIP LOCKED (leased to an externally supplied instance id) and hands them to a transport-agnostic Publisher. Delivery is at-least-once: consumers must deduplicate.
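
The lease behaviour described above can be pictured with a small in-memory simulation. This is only a sketch of the idea: the real claim happens in SQL with FOR UPDATE SKIP LOCKED, and the row struct, the claim function, and the 30-second lease here are hypothetical stand-ins rather than the package's schema or defaults.

```go
package main

import (
	"fmt"
	"time"
)

// row is a simplified stand-in for an outbox table row.
type row struct {
	id          int
	owner       string
	lockedUntil time.Time
	attempts    int
	done        bool
}

// claim leases up to limit unfinished rows whose lease is free or expired.
// A single-threaded loop stands in for the SQL row locking.
func claim(rows []*row, owner string, now time.Time, lease time.Duration, limit int) []int {
	var ids []int
	for _, r := range rows {
		if len(ids) == limit {
			break
		}
		if r.done || r.lockedUntil.After(now) {
			continue // finished, or still leased by another instance
		}
		r.owner = owner
		r.lockedUntil = now.Add(lease)
		r.attempts++ // the README notes that attempts increments at claim time
		ids = append(ids, r.id)
	}
	return ids
}

// leaseDemo lets "pod-a" claim two rows and then crash without finishing
// them. "pod-b" tries to claim during the lease and again after it expires.
func leaseDemo() (during, after int) {
	rows := []*row{{id: 1}, {id: 2}}
	t0 := time.Unix(0, 0)
	lease := 30 * time.Second

	claim(rows, "pod-a", t0, lease, 10)
	during = len(claim(rows, "pod-b", t0.Add(10*time.Second), lease, 10))
	after = len(claim(rows, "pod-b", t0.Add(31*time.Second), lease, 10))
	return during, after
}

func main() {
	during, after := leaseDemo()
	fmt.Println("claimed during lease:", during, "claimed after expiry:", after)
}
```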

This file is the package's single public surface: it re-exports the types and constructors from the implementation subpackages (message, port, config, backoff, engine, migrations) and provides the Outbox facade. The database surface is the Executor interface (Exec, Query, QueryRow); plug your own. The logger is the standard library's *slog.Logger, injected via WithLogger.

Index

Constants

const (
	StatusPending    = message.StatusPending
	StatusProcessing = message.StatusProcessing
	StatusPublished  = message.StatusPublished
	StatusDead       = message.StatusDead
)

Status values.

const TableName = message.TableName

TableName is the fixed outbox table name (only the schema is configurable).

Variables

var (
	WithConfig          = config.WithConfig
	WithInstanceID      = config.WithInstanceID
	WithSchema          = config.WithSchema
	WithPollInterval    = config.WithPollInterval
	WithBatchSize       = config.WithBatchSize
	WithLeaseDuration   = config.WithLeaseDuration
	WithConcurrency     = config.WithConcurrency
	WithMaxAttempts     = config.WithMaxAttempts
	WithRetryBackoff    = config.WithRetryBackoff
	WithRetention       = config.WithRetention
	WithCleanupInterval = config.WithCleanupInterval
	WithOrdered         = config.WithOrdered
	WithoutNotify       = config.WithoutNotify
	WithListenPool      = config.WithListenPool
	WithHooks           = config.WithHooks
	WithCodec           = config.WithCodec
	WithLogger          = config.WithLogger
)
var (
	// ErrEmptyTopic is returned when a message has no topic.
	ErrEmptyTopic = message.ErrEmptyTopic
	// ErrNilPayload is returned when a message payload is nil.
	ErrNilPayload = message.ErrNilPayload
	// ErrInvalidSchema is returned by New when the schema is not a valid identifier.
	ErrInvalidSchema = config.ErrInvalidSchema
	// ErrNoCodec is returned by EnqueueValue when no codec was configured.
	ErrNoCodec = errors.New("outbox: no codec configured (use WithCodec)")
	// ErrNoExecutor is returned by New when no relay Executor was supplied.
	ErrNoExecutor = errors.New("outbox: a relay executor is required")
)
var DefaultBackoff = backoff.Default

DefaultBackoff is exponential 100ms -> 30s with full jitter.

var ExpBackoff = backoff.Exp

ExpBackoff returns an exponential backoff (base doubled per attempt, capped).

Functions

func Migrations

func Migrations() []string

Migrations returns the contents of every *.up.sql file, ordered by filename, ready to pass to Exec for callers applying migrations without a tool.

func MigrationsFS

func MigrationsFS() embed.FS

MigrationsFS returns the embedded migration files for golang-migrate (iofs source), goose, atlas, etc. The SQL is schema-unqualified: set search_path on the migration connection to install into a non-default schema.

Types

type Backoff

type Backoff = backoff.Backoff

Backoff returns the delay before the retry after a given attempt.

type Codec

type Codec = port.Codec

Codec marshals a value to bytes for storage and back.
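
Since codecs are left to the caller, a JSON-backed one is a likely first choice. The sketch below is written against a hypothetical interface: the method names Marshal and Unmarshal are assumptions made for illustration, so check port.Codec for the real method set before adapting it. The orderCreated event type is also invented for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// codec is a HYPOTHETICAL shape for a value codec; the real port.Codec
// interface may use different method names or extra parameters.
type codec interface {
	Marshal(v any) ([]byte, error)
	Unmarshal(data []byte, v any) error
}

// jsonCodec delegates to encoding/json.
type jsonCodec struct{}

func (jsonCodec) Marshal(v any) ([]byte, error) { return json.Marshal(v) }

func (jsonCodec) Unmarshal(data []byte, v any) error { return json.Unmarshal(data, v) }

// orderCreated is an example event payload.
type orderCreated struct {
	ID    string `json:"id"`
	Total int    `json:"total"`
}

// roundTrip encodes an event to bytes and decodes it back through c.
func roundTrip(c codec, in orderCreated) (orderCreated, error) {
	var out orderCreated
	data, err := c.Marshal(in)
	if err != nil {
		return out, err
	}
	err = c.Unmarshal(data, &out)
	return out, err
}

func main() {
	out, err := roundTrip(jsonCodec{}, orderCreated{ID: "o-1", Total: 42})
	fmt.Printf("%+v %v\n", out, err)
}
```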

type Config

type Config = config.Config

Config is the declarative configuration (mapstructure/validate/default).

type Executor

type Executor = port.Executor

Executor is the pluggable pgx query surface (Exec, Query, QueryRow).

type Hooks

type Hooks = port.Hooks

Hooks observes relay activity.

type Message

type Message = message.Message

Message is a single outbox row.

type NoopHooks

type NoopHooks = port.NoopHooks

NoopHooks is the default no-op Hooks implementation.

type Option

type Option = config.Option

Option configures the outbox.

type Outbox

type Outbox struct {
	// contains filtered or unexported fields
}

Outbox is the public facade: enqueue messages (transactionally) and run the background relay + cleaner.

func New

func New(exec Executor, enq Executor, pub Publisher, opts ...Option) (*Outbox, error)

New builds an Outbox.

  • exec is the relay/cleaner Executor: it runs the background claim, mark and cleanup queries and must NOT be bound to a business transaction (pass a *pgxpool.Pool, or your own Executor implementation).
  • enq is the enqueue Executor: pass one that resolves the caller's transaction from the context (e.g. pgtx's tx.DB) so inserts join the business transaction.
  • pub is the transport publisher.

LISTEN/NOTIFY wake-ups require a *pgxpool.Pool: supply one with WithListenPool, or pass a *pgxpool.Pool as exec and it is detected automatically. Without one the relay falls back to polling.

func (*Outbox) Enqueue

func (o *Outbox) Enqueue(ctx context.Context, m Message) error

Enqueue writes one message inside the caller's current transaction.

func (*Outbox) EnqueueBatch

func (o *Outbox) EnqueueBatch(ctx context.Context, ms []Message) error

EnqueueBatch writes multiple messages inside the caller's current transaction.

func (*Outbox) EnqueueValue

func (o *Outbox) EnqueueValue(ctx context.Context, topic, partitionKey, msgType string, v any) error

EnqueueValue marshals v with the configured codec and enqueues it. Requires WithCodec.

func (*Outbox) Run

func (o *Outbox) Run(ctx context.Context) error

Run starts the relay and cleaner and blocks until ctx is cancelled. The lease owner id was resolved at New time (WithInstanceID, or auto-generated).
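
Because Run blocks and returns an error, starting it with a bare go statement discards that result. One possible pattern, sketched below, wraps the call so the caller can wait for shutdown and inspect the error. The runner interface, start helper, and fakeRelay are hypothetical; fakeRelay returns nil on cancellation, which is an assumption about what Run does.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runner is the single method of *outbox.Outbox that this sketch relies on.
type runner interface {
	Run(ctx context.Context) error
}

// start runs r in a goroutine and returns a channel that receives Run's
// result exactly once. The buffer lets the goroutine exit even if nobody
// reads the result.
func start(ctx context.Context, r runner) <-chan error {
	done := make(chan error, 1)
	go func() { done <- r.Run(ctx) }()
	return done
}

// fakeRelay blocks until ctx is cancelled, as Run is documented to do.
// Returning nil afterwards is an assumption made for this demo.
type fakeRelay struct{}

func (fakeRelay) Run(ctx context.Context) error {
	<-ctx.Done()
	return nil
}

// shutdownDemo cancels the context and waits, with a bound, for Run to
// return so that shutdown cannot hang forever.
func shutdownDemo() error {
	ctx, cancel := context.WithCancel(context.Background())
	done := start(ctx, fakeRelay{})
	cancel()
	select {
	case err := <-done:
		return err
	case <-time.After(5 * time.Second):
		return errors.New("relay did not stop in time")
	}
}

func main() {
	fmt.Println("relay stopped, err =", shutdownDemo())
}
```

In a real service the context would typically come from signal.NotifyContext, so that SIGINT or SIGTERM triggers the cancellation.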

type Publisher

type Publisher = port.Publisher

Publisher delivers a batch of claimed messages to a transport.

type Settings

type Settings = config.Settings

Settings is the resolved runtime configuration.

type Status

type Status = message.Status

Status is the lifecycle state of an outbox message.

Directories

Path        Synopsis
backoff     Package backoff provides retry backoff strategies for the outbox relay.
config      Package config holds the outbox runtime settings, the functional options that build them, and the declarative Config struct (mapstructure/validate/default tags) for loading from files or environment.
engine      Package engine implements the outbox background machinery: the store (enqueue/claim/mark/cleanup queries) and the relay and cleaner workers.
message     Package message defines the outbox data model.
migrations  Package migrations exposes the embedded outbox SQL migrations.
port        Package port defines the pluggable integration interfaces of the outbox: the transport Publisher, the value Codec, observability Hooks, and the database Executor.
