Documentation ¶
Overview ¶
Package poutbox provides a reliable outbox pattern implementation for asynchronous job processing. Jobs are stored in Postgres, processed by a consumer, and retried on failure.
Example usage with polling:
client := poutbox.NewClient(db)
// Enqueue a job for immediate processing
jobID, err := client.Enqueue(ctx, MyJob{Data: "example"})
// Enqueue a job to be processed at a specific time
// (plain assignment: jobID and err are already declared above)
jobID, err = client.Enqueue(ctx, MyJob{Data: "example"}, poutbox.WithScheduleAt(time.Now().Add(1*time.Hour)))
// Start processing jobs with polling
consumer := poutbox.NewConsumer(db, myHandler, poutbox.ConsumerConfig{
	BatchSize:             100,
	MaxRetries:            3,
	PollInterval:          100 * time.Millisecond,
	UseLogicalReplication: false,
})
consumer.Start(ctx)
Example usage with logical replication:
client := poutbox.NewClient(db)
jobID, err := client.Enqueue(ctx, MyJob{Data: "example"})
// Start processing jobs with logical replication. The cursor position is
// tracked automatically when UpdateCursorOnLogicalRepl is true.
consumer := poutbox.NewConsumer(db, myHandler, poutbox.ConsumerConfig{
	BatchSize:                 100,
	MaxRetries:                3,
	UseLogicalReplication:     true,
	UpdateCursorOnLogicalRepl: true,
})
consumer.SetReplicationConnString(replConnStr)
consumer.Start(ctx)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client enqueues jobs into the Postgres-backed outbox system.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer processes jobs from the outbox. It manages immediate, scheduled, and failed job processing.
func NewConsumer ¶
func NewConsumer(db *sql.DB, handler Handler, config ConsumerConfig) *Consumer
NewConsumer creates a new Consumer with the given database, handler, and config. Sets default values for BatchSize (1000), MaxRetries (3), and PollInterval (100ms).
func (*Consumer) SetReplicationConnString ¶
SetReplicationConnString sets the connection string for logical replication. Required when UseLogicalReplication is enabled.
type ConsumerConfig ¶
type ConsumerConfig struct {
	// BatchSize is the number of jobs to fetch and process in a single batch.
	BatchSize int32
	// MaxRetries is the maximum number of times a failed job will be retried.
	MaxRetries int32
	// PollInterval is the duration to wait between polling for new jobs.
	PollInterval time.Duration
	// UseLogicalReplication enables Postgres logical replication for immediate jobs.
	UseLogicalReplication bool
	// UpdateCursorOnLogicalRepl tracks cursor position during logical replication.
	UpdateCursorOnLogicalRepl bool
}
ConsumerConfig configures job processing behavior.
type EnqueueOption ¶
type EnqueueOption func(*enqueueOptions)
EnqueueOption is a function that configures job enqueue behavior.
func WithScheduleAt ¶
func WithScheduleAt(t time.Time) EnqueueOption
WithScheduleAt schedules a job to be processed at the specified time.
func WithTx ¶
func WithTx(tx *sql.Tx) EnqueueOption
WithTx enqueues a job within an existing database transaction.
type Handler ¶
type Handler interface {
	// Handle processes a batch of jobs and returns the IDs of failed jobs.
	Handle(ctx context.Context, jobs []HandlerJob) []int64
	// Close closes the handler and releases resources.
	Close(ctx context.Context) error
}
Handler processes jobs from the outbox. It returns the IDs of jobs that failed and should be retried.
type HandlerJob ¶
type HandlerJob struct {
	// ID is the unique identifier of the job.
	ID int64
	// Payload is the job data in bytes (typically JSON).
	Payload []byte
}
HandlerJob represents a job to be processed by a handler.
type Maintenance ¶
type Maintenance struct {
// contains filtered or unexported fields
}
Maintenance manages database partitions and cleanup. It creates future partitions and removes old data based on retention settings.
func NewMaintenance ¶
func NewMaintenance(db *sql.DB, opts ...MaintenanceOption) *Maintenance
NewMaintenance creates a new Maintenance instance with the given database. Sets defaults: PartitionInterval (1h), LookAhead (12h), RetentionWindow (3h).
func (*Maintenance) Run ¶
func (m *Maintenance) Run(ctx context.Context) error
Run executes a single maintenance cycle: creates future partitions and removes old data.
func (*Maintenance) RunPeriodically ¶
func (m *Maintenance) RunPeriodically(ctx context.Context, interval time.Duration)
RunPeriodically executes maintenance at regular intervals until context is canceled. Runs immediately on start, then at each interval. Logs errors without stopping.
type MaintenanceOption ¶
type MaintenanceOption func(*Maintenance)
MaintenanceOption is a function that configures maintenance behavior.
func WithLookAhead ¶
func WithLookAhead(d time.Duration) MaintenanceOption
WithLookAhead sets how far in the future to pre-create partitions.
func WithPartitionInterval ¶
func WithPartitionInterval(d time.Duration) MaintenanceOption
WithPartitionInterval sets the time span of each partition.
func WithRetentionWindow ¶
func WithRetentionWindow(d time.Duration) MaintenanceOption
WithRetentionWindow sets how long to keep old data before deletion.
type ProcessResult ¶
type ProcessResult struct {
	// DeadLetter contains jobs that exceeded max retries.
	DeadLetter *jobBatch
	// ToRetry contains jobs that failed and should be retried.
	ToRetry *jobBatch
	// ToDelete contains job IDs that were processed successfully.
	ToDelete []int64
	// Cursor tracks the latest processed job ID, timestamp, and transaction ID for resumption.
	Cursor *postgres.UpdateCursorParams
}
ProcessResult holds the results of processing a batch of jobs. It tracks which jobs to retry, delete, or send to dead letter, and updates cursor position.
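The split into retry, delete, and dead letter can be sketched as a classification over the IDs a handler reports as failed. Everything here is a simplified assumption: `job`, `outcome`, and `classify` are hypothetical, plain ID slices replace the unexported `jobBatch`, and the exact retry threshold used by the package is not stated in this documentation.

```go
package main

// job is a hypothetical record; Retries counts earlier failed attempts.
type job struct {
	ID      int64
	Retries int32
}

// outcome is a simplified stand-in for ProcessResult.
type outcome struct {
	ToDelete   []int64
	ToRetry    []int64
	DeadLetter []int64
}

// classify sorts a batch by what should happen next. It assumes a failed
// job moves to the dead letter set once it has already been retried
// maxRetries times.
func classify(jobs []job, failedIDs []int64, maxRetries int32) outcome {
	failed := make(map[int64]struct{}, len(failedIDs))
	for _, id := range failedIDs {
		failed[id] = struct{}{}
	}
	var out outcome
	for _, j := range jobs {
		_, isFailed := failed[j.ID]
		switch {
		case !isFailed:
			out.ToDelete = append(out.ToDelete, j.ID)
		case j.Retries >= maxRetries:
			out.DeadLetter = append(out.DeadLetter, j.ID)
		default:
			out.ToRetry = append(out.ToRetry, j.ID)
		}
	}
	return out
}
```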