poutbox

package module
v0.0.1
Published: Nov 25, 2025 License: MIT Imports: 8 Imported by: 0

README

Poutbox


A Go library that implements the outbox pattern using PostgreSQL. It supports two methods: polling and logical replication.

Status

This package is in an alpha state. It works, but features may change and edge cases need more testing; treat it as experimental before relying on it in production.

The Problem

When building web services, you often need to handle background tasks reliably. For example:

  1. User submits an order
  2. Service saves the order in the database
  3. Service sends a confirmation email in the background

Many teams use a separate message queue (RabbitMQ, Redis, Kafka, etc.) for background tasks.

The Challenge

The difficulty is keeping the database and message queue in sync. Consider this code:

tx := startTransaction()
order := createOrder(tx)
commit(tx)
publishToQueue(order)  // What if this fails?

If publishToQueue fails, the order is saved but the background task is lost.

You could publish inside the transaction instead, but then a failed commit leaves you with a published message for an order that was never saved.

Keeping a transaction open while calling external services also causes problems:

  • Long transactions hold database locks
  • This prevents maintenance operations like vacuum
  • Locks can cause performance issues
The Solution: Outbox Pattern

Instead of publishing directly, save a job record in the database as part of the same transaction:

tx := startTransaction()
order := createOrder(tx)
saveToOutbox(tx, orderJob)  // Same transaction
commit(tx)

Then a background process reads jobs from the outbox table and publishes them to your message queue. This ensures:

  • Jobs are only created when the database transaction succeeds
  • Jobs are never lost (they stay in the database until published)
  • No long-running transactions holding locks

How Poutbox Works

Poutbox manages four tables:

  1. immediate - stores jobs to be processed, partitioned by created_at
  2. scheduled - stores jobs scheduled for future execution
  3. failed - stores jobs from the immediate or scheduled tables that failed and will be retried
  4. dead_letter - stores jobs that failed after max retries

The immediate table is partitioned by created_at to prevent unbounded growth. You can use the Maintenance struct to manage partitions automatically, or implement your own partition management. If you skip partitioning, you must manage deletions manually.
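
For example, a minimal sketch of running automatic partition maintenance in the background, assuming db is an open *sql.DB and ctx is your application context (the option values simply restate the documented defaults, and the 30-minute cadence is an arbitrary example):

m := poutbox.NewMaintenance(db,
    poutbox.WithPartitionInterval(1*time.Hour), // each partition covers one hour
    poutbox.WithLookAhead(12*time.Hour),        // pre-create partitions 12 hours ahead
    poutbox.WithRetentionWindow(3*time.Hour),   // delete data older than 3 hours
)

// Run maintenance until the context is canceled; errors are logged, not returned.
go m.RunPeriodically(ctx, 30*time.Minute)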

The library provides two ways to detect new jobs:

  • Polling - efficiently tracks jobs using transaction IDs, prevents table bloat
  • Logical Replication - uses PostgreSQL's WAL to detect new jobs

Quick Start

Prerequisites
  • PostgreSQL 17 (may also work on 13, but this is untested)
  • Go 1.25 or newer
Setup
  1. Start PostgreSQL:
docker compose up
  2. Create the tables (run once):
go run examples/simple/main.go -mode=migrate
Using Polling
  1. Start the consumer (reads jobs continuously, sleeping for 100ms when no jobs are found):
go run examples/simple/main.go -mode=consumer
  2. In another terminal, create jobs:
go run examples/simple/main.go -mode=client

Press CTRL+C to stop each process.

Results are written to jobs.txt.

Using Logical Replication
  1. Start the consumer (real-time with logical replication):
go run examples/simple/main.go -mode=consumer-logical
  2. In another terminal, create jobs:
go run examples/simple/main.go -mode=client

Press CTRL+C to stop.

Results are written to jobs_logical_repl.txt.

Features

  • At-least-once delivery - every job runs at least once
  • Job retries - configurable max retries with exponential backoff
  • Scheduled jobs - run jobs at a specific time
  • Dead letter queue - failed jobs go here for manual review
  • Zero table bloat - immediate jobs are removed without creating dead tuples
  • Two consumption modes - polling or logical replication

Performance

Consumer performance on a laptop:

  • Polling: 13,699 jobs/second
  • Logical Replication: 52,632 jobs/second

Architecture: Polling vs Logical Replication

Polling

Polls the outbox table at regular intervals. Uses transaction IDs to avoid missing jobs and prevent table bloat.

Note: Long-running transactions can cause the poller to wait. Keep transaction times short.

Logical Replication

Uses PostgreSQL's write-ahead log (WAL) to detect new jobs. No polling needed.

Important: If your consumer stops or crashes, its replication slot remains and PostgreSQL keeps retaining WAL for it, which consumes disk space. Monitor slot size regularly.
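
One way to monitor retained WAL is to query pg_replication_slots. A minimal sketch, assuming db is an open *sql.DB on the same server, ctx is your context, and the standard library log package is imported (Poutbox's slot name is not shown here, so this reports all slots):

rows, err := db.QueryContext(ctx, `
    SELECT slot_name,
           COALESCE(pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)), 'unknown') AS retained_wal
    FROM pg_replication_slots`)
if err != nil {
    return err
}
defer rows.Close()

for rows.Next() {
    var name, retained string
    if err := rows.Scan(&name, &retained); err != nil {
        return err
    }
    log.Printf("replication slot %s retains %s of WAL", name, retained)
}
return rows.Err()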

Usage

Create a Consumer
config := poutbox.ConsumerConfig{
    BatchSize:    100,
    MaxRetries:   3,
    PollInterval: 100 * time.Millisecond,
}

handler := &MyHandler{} // implements the poutbox.Handler interface (see below)
consumer := poutbox.NewConsumer(db, handler, config)

err := consumer.Start(ctx)
Implement a Handler
type MyHandler struct {}

func (h *MyHandler) Handle(ctx context.Context, jobs []poutbox.HandlerJob) []int64 {
    var failed []int64
    
    for _, job := range jobs {
        err := processJob(ctx, job.Payload)
        if err != nil {
            failed = append(failed, job.ID)
        }
    }
    
    return failed // return IDs of jobs to retry
}

func (h *MyHandler) Close(ctx context.Context) error {
    return nil
}
Create a Client
client := poutbox.NewClient(db)

// Enqueue a job for immediate processing
jobID, err := client.Enqueue(ctx, map[string]any{
    "order_id": 123,
    "email": "user@example.com",
})
Enqueue with Transaction

Enqueue a job as part of your database transaction:

tx, err := db.BeginTx(ctx, nil)
if err != nil {
    return err
}
defer tx.Rollback()

// Create your business object
order, err := createOrder(ctx, tx, orderData)
if err != nil {
    return err
}

// Enqueue a background job in the same transaction
client := poutbox.NewClient(db)
_, err = client.Enqueue(ctx, map[string]any{
    "order_id": order.ID,
    "email": order.Email,
}, poutbox.WithTx(tx))
if err != nil {
    return err
}

// Both the order and the job are created only if commit succeeds
return tx.Commit()
Schedule a Job for Later
client := poutbox.NewClient(db)

// Schedule a job to run in 1 hour
jobID, err := client.Enqueue(ctx, map[string]any{
    "order_id": 123,
    "action": "send_reminder",
}, poutbox.WithScheduleAt(time.Now().Add(1*time.Hour)))

// Or schedule for a specific time
scheduledTime := time.Date(2025, 12, 25, 10, 0, 0, 0, time.UTC)
jobID, err = client.Enqueue(ctx, payload, poutbox.WithScheduleAt(scheduledTime))

Development

To work on this library:

# Start the test database
docker compose up

# Run migrations
go run examples/simple/main.go -mode=migrate

# Run the example
go run examples/simple/main.go -mode=consumer

License

See LICENSE file.

Documentation

Overview

Package poutbox provides a reliable outbox pattern implementation for asynchronous job processing. Jobs are stored in Postgres, processed by a consumer, and retried on failure.

Example usage with polling:

client := poutbox.NewClient(db)

// Enqueue a job for immediate processing
jobID, err := client.Enqueue(ctx, MyJob{Data: "example"})

// Enqueue a job to be processed at a specific time
jobID, err := client.Enqueue(ctx, MyJob{Data: "example"}, poutbox.WithScheduleAt(time.Now().Add(1*time.Hour)))

// Start processing jobs with polling
consumer := poutbox.NewConsumer(db, myHandler, poutbox.ConsumerConfig{
	BatchSize:             100,
	MaxRetries:            3,
	PollInterval:          100 * time.Millisecond,
	UseLogicalReplication: false,
})
consumer.Start(ctx)

Example usage with logical replication:

client := poutbox.NewClient(db)

jobID, err := client.Enqueue(ctx, MyJob{Data: "example"})

// Start processing jobs with logical replication. The cursor position is
// tracked automatically when UpdateCursorOnLogicalRepl is true.
consumer := poutbox.NewConsumer(db, myHandler, poutbox.ConsumerConfig{
	BatchSize:             100,
	MaxRetries:            3,
	UseLogicalReplication: true,
	UpdateCursorOnLogicalRepl: true,
})
consumer.SetReplicationConnString(replConnStr)
consumer.Start(ctx)


Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client enqueues jobs into the Postgres-backed outbox system.

func NewClient

func NewClient(db *sql.DB) *Client

NewClient creates a new Client with the given database connection.

func (*Client) Enqueue

func (c *Client) Enqueue(ctx context.Context, job any, options ...EnqueueOption) (int64, error)

Enqueue adds a job to the outbox for processing. The job is marshaled to JSON and stored, and the job ID is returned. Use WithScheduleAt to defer processing and WithTx to enqueue within a transaction.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer processes jobs from the outbox. It manages immediate, scheduled, and failed job processing.

func NewConsumer

func NewConsumer(db *sql.DB, handler Handler, config ConsumerConfig) *Consumer

NewConsumer creates a new Consumer with the given database, handler, and config. Sets default values for BatchSize (1000), MaxRetries (3), and PollInterval (100ms).

func (*Consumer) Close

func (c *Consumer) Close(ctx context.Context) error

Close closes the consumer and releases resources via the handler.

func (*Consumer) SetReplicationConnString

func (c *Consumer) SetReplicationConnString(connStr string)

SetReplicationConnString sets the connection string for logical replication. Required when UseLogicalReplication is enabled.

func (*Consumer) Start

func (c *Consumer) Start(ctx context.Context) error

Start begins processing jobs from immediate, scheduled, and failed queues. It runs three concurrent processors and returns the first error encountered.
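
As a usage sketch (assuming db, handler, and config as in the README examples; whether Start returns context.Canceled on shutdown is not documented, so the error check below is deliberately lenient), one way to run the consumer until an interrupt signal and then release its resources:

// Cancel the context on SIGINT/SIGTERM to stop the consumer.
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()

consumer := poutbox.NewConsumer(db, handler, config)

if err := consumer.Start(ctx); err != nil && !errors.Is(err, context.Canceled) {
    log.Printf("consumer stopped: %v", err)
}

// Release handler resources once processing has stopped.
if err := consumer.Close(context.Background()); err != nil {
    log.Printf("close consumer: %v", err)
}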

type ConsumerConfig

type ConsumerConfig struct {
	// BatchSize is the number of jobs to fetch and process in a single batch.
	BatchSize int32
	// MaxRetries is the maximum number of times a failed job will be retried.
	MaxRetries int32
	// PollInterval is the duration to wait between polling for new jobs.
	PollInterval time.Duration
	// UseLogicalReplication enables Postgres logical replication for immediate jobs.
	UseLogicalReplication bool
	// UpdateCursorOnLogicalRepl tracks cursor position during logical replication.
	UpdateCursorOnLogicalRepl bool
}

ConsumerConfig configures job processing behavior.

type EnqueueOption

type EnqueueOption func(*enqueueOptions)

EnqueueOption is a function that configures job enqueue behavior.

func WithScheduleAt

func WithScheduleAt(t time.Time) EnqueueOption

WithScheduleAt schedules a job to be processed at the specified time.

func WithTx

func WithTx(tx *sql.Tx) EnqueueOption

WithTx enqueues a job within an existing database transaction.

type Handler

type Handler interface {
	// Handle processes a batch of jobs and returns the IDs of failed jobs.
	Handle(ctx context.Context, jobs []HandlerJob) []int64
	// Close closes the handler and releases resources.
	Close(ctx context.Context) error
}

Handler processes jobs from the outbox. It returns the IDs of jobs that failed and should be retried.

type HandlerJob

type HandlerJob struct {
	// ID is the unique identifier of the job.
	ID int64
	// Payload is the job data in bytes (typically JSON).
	Payload []byte
}

HandlerJob represents a job to be processed by a handler.
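
For example, a handler will typically unmarshal Payload back into the value that was passed to Enqueue. A minimal sketch; OrderJob and sendConfirmation are illustrative names, not part of this package:

type OrderJob struct {
    OrderID int64  `json:"order_id"`
    Email   string `json:"email"`
}

func handleOne(ctx context.Context, job poutbox.HandlerJob) error {
    // Payload holds the JSON that Enqueue stored for this job.
    var oj OrderJob
    if err := json.Unmarshal(job.Payload, &oj); err != nil {
        return err
    }
    return sendConfirmation(ctx, oj.Email) // placeholder for real work
}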

type Maintenance

type Maintenance struct {
	// contains filtered or unexported fields
}

Maintenance manages database partitions and cleanup. It creates future partitions and removes old data based on retention settings.

func NewMaintenance

func NewMaintenance(db *sql.DB, opts ...MaintenanceOption) *Maintenance

NewMaintenance creates a new Maintenance instance with the given database. Sets defaults: PartitionInterval (1h), LookAhead (12h), RetentionWindow (3h).

func (*Maintenance) Run

func (m *Maintenance) Run(ctx context.Context) error

Run executes a single maintenance cycle: creates future partitions and removes old data.

func (*Maintenance) RunPeriodically

func (m *Maintenance) RunPeriodically(ctx context.Context, interval time.Duration)

RunPeriodically executes maintenance at regular intervals until context is canceled. Runs immediately on start, then at each interval. Logs errors without stopping.

type MaintenanceOption

type MaintenanceOption func(*Maintenance)

MaintenanceOption is a function that configures maintenance behavior.

func WithLookAhead

func WithLookAhead(d time.Duration) MaintenanceOption

WithLookAhead sets how far in the future to pre-create partitions.

func WithPartitionInterval

func WithPartitionInterval(d time.Duration) MaintenanceOption

WithPartitionInterval sets the time span of each partition.

func WithRetentionWindow

func WithRetentionWindow(d time.Duration) MaintenanceOption

WithRetentionWindow sets how long to keep old data before deletion.

type ProcessResult

type ProcessResult struct {
	// DeadLetter contains jobs that exceeded max retries.
	DeadLetter *jobBatch
	// ToRetry contains jobs that failed and should be retried.
	ToRetry *jobBatch
	// ToDelete contains job IDs that were processed successfully.
	ToDelete []int64
	// Cursor tracks the latest processed job ID, timestamp, and transaction ID for resumption.
	Cursor *postgres.UpdateCursorParams
}

ProcessResult holds the results of processing a batch of jobs. It tracks which jobs to retry, delete, or send to dead letter, and updates cursor position.

Directories

Path Synopsis
examples
simple command
Package postgres provides database operations for the poutbox outbox system.
