engine

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 5, 2026 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package engine implements the outbox background machinery: the store (enqueue/claim/mark/cleanup queries) and the relay and cleaner workers.

Index

Constants

View Source
const NotifyChannel = "outbox_messages"

NotifyChannel is the fixed LISTEN/NOTIFY channel (matches the migration trigger). It cannot be parameterized; cross-schema wake-ups are harmless because SKIP LOCKED deduplicates claims.

Variables

This section is empty.

Functions

This section is empty.

Types

type Cleaner

type Cleaner struct {
	// contains filtered or unexported fields
}

Cleaner periodically deletes published rows older than the retention window.

func NewCleaner

func NewCleaner(s Store, set config.Settings) *Cleaner

NewCleaner builds a Cleaner.

func (*Cleaner) Run

func (c *Cleaner) Run(ctx context.Context) error

Run is a no-op (returns immediately) unless both retention and cleanup interval are set. Otherwise it blocks until ctx is cancelled.

type Relay

type Relay struct {
	// contains filtered or unexported fields
}

Relay claims ready rows, publishes them, and records the outcome.

func NewRelay

func NewRelay(s Store, pub port.Publisher, set config.Settings) *Relay

NewRelay builds a Relay.

func (*Relay) Run

func (r *Relay) Run(ctx context.Context) error

Run starts the relay: set.Concurrency worker goroutines plus an optional LISTEN/NOTIFY listener. It blocks until ctx is cancelled, then drains in flight work and returns.

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store runs the outbox SQL. The relay/cleaner queries use exec (must not be tx-bound); enqueue uses enq (joins the caller's business tx). pool is optional and only LISTEN/NOTIFY needs it.

func NewStore

func NewStore(exec, enq port.Executor, pool *pgxpool.Pool, schema string) Store

NewStore builds a Store. schema is assumed already validated.

func (Store) Enqueue

func (s Store) Enqueue(ctx context.Context, msgs []message.Message) error

Enqueue inserts messages using the transactional Executor, so the rows land in the caller's business transaction.

func (Store) Table

func (s Store) Table() string

Table returns the schema-qualified table identifier.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL