flow

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 2, 2026 License: AGPL-3.0 Imports: 8 Imported by: 0

Documentation

Overview

Package flow is the Isopace transaction manager: it runs a transaction through an ordered pipeline of Stage values in two phases — a prepare pass that may vote to continue, route, or abort, followed by a commit pass over the stages that prepared successfully (or an abort pass over them if any stage failed). Per-transaction state travels in an Exchange.

The two-phase model gives a transaction all-or-nothing semantics across participants: a stage does its reversible work and validation in Prepare, then makes effects durable in Commit (implementing Committer) or undoes reserved work in Abort (implementing Aborter). Stages route between named groups for conditional processing, and the Flow adds optional journaling, idempotent replay, per-stage retry, and a profiler — all built on the immutable copy-on-write iso8583.Message so a request is shared across stages without locking or defensive copies.

An Exchange is processed by one goroutine at a time and is not safe for concurrent use; a Flow may run many exchanges concurrently.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNoEntry       = errors.New("flow: no entry group configured")
	ErrUnknownGroup  = errors.New("flow: unknown group")
	ErrTooManyStages = errors.New("flow: stage limit exceeded (routing loop?)")
	ErrAborted       = errors.New("flow: transaction aborted")
)

Flow errors.

Functions

This section is empty.

Types

type Aborter

type Aborter interface {
	Abort(ctx context.Context, ex *Exchange) error
}

Aborter is the rollback half: it runs when the transaction aborts, in reverse prepare order, to undo work reserved during Prepare.

type Committer

type Committer interface {
	Commit(ctx context.Context, ex *Exchange) error
}

Committer is the commit half of two-phase processing: it runs after every stage has prepared successfully, in prepare order, to make effects durable.

type Event

type Event struct {
	Kind EventKind
	// Err is the abort cause for EventAborted, nil otherwise.
	Err error
}

Event is a single journalled transaction lifecycle record.

type EventKind

type EventKind int

EventKind classifies a transaction lifecycle event recorded to a Journal.

const (
	// EventBegin marks the start of an exchange.
	EventBegin EventKind = iota
	// EventCommitted marks a successful commit pass.
	EventCommitted
	// EventAborted marks a rolled-back transaction.
	EventAborted
	// EventIdempotentHit marks a replayed response for a duplicate request.
	EventIdempotentHit
)

func (EventKind) String

func (k EventKind) String() string

String renders the event kind.

type Exchange

type Exchange struct {
	// ID identifies the transaction in logs and journals (e.g. a STAN+terminal
	// key or a generated UUID). It is supplied by the caller.
	ID string
	// Request is the decoded inbound message. Stages treat it as immutable.
	Request *iso8583.Message
	// Response is the message being assembled for the caller; stages set and
	// refine it. It may be nil until a stage builds it.
	Response *iso8583.Message
	// contains filtered or unexported fields
}

Exchange is the per-transaction state carried through a Flow. It holds the inbound Request and the Response being built, a free-form property bag for inter-stage communication, the abort state, and (when profiling is enabled) per-stage timings.

An Exchange is owned by the single goroutine running its flow and is not safe for concurrent use.

func NewExchange

func NewExchange(id string, req *iso8583.Message) *Exchange

NewExchange builds an exchange for request req identified by id.

func (*Exchange) Abort

func (e *Exchange) Abort(cause error)

Abort marks the transaction for rollback with the given cause. A stage may call this instead of (or before) returning an error — useful when it produces a decline response yet still wants the joined stages rolled back. A nil cause is recorded as ErrAborted.

func (*Exchange) Aborted

func (e *Exchange) Aborted() bool

Aborted reports whether the exchange has been marked for rollback.

func (*Exchange) Err

func (e *Exchange) Err() error

Err returns the abort cause, or nil if the transaction has not aborted.

func (*Exchange) Get

func (e *Exchange) Get(key string) (any, bool)

Get returns the value stored under key and whether it was present.

func (*Exchange) Profile

func (e *Exchange) Profile() string

Profile renders the recorded timings as a single line, in order, with the total — handy for a per-transaction log field. Empty when profiling is off.

func (*Exchange) Set

func (e *Exchange) Set(key string, v any)

Set stores a value under key for later stages.

func (*Exchange) Timings

func (e *Exchange) Timings() []StageTiming

Timings returns the recorded per-stage timings (empty unless profiling is on).

type Flow

type Flow struct {
	// contains filtered or unexported fields
}

Flow runs transactions through named groups of Stage values in two phases. Build it with New and Flow.Group, then call Flow.Run per transaction; a single Flow is safe to run from many goroutines concurrently, provided its groups are not modified after the first Run.

func New

func New(opts ...Option) *Flow

New builds a Flow.

func (*Flow) Group

func (f *Flow) Group(name string, stages ...Stage) *Flow

Group appends stages to the named group and returns the flow for chaining. The first group added becomes the entry group unless WithEntry set one.

func (*Flow) Run

func (f *Flow) Run(ctx context.Context, ex *Exchange) error

Run drives ex through the pipeline. It returns nil on commit, or the abort cause if a stage failed or aborted (after running Abort on the joined stages), or a joined commit error if the commit phase failed.

type IdempotencyStore

type IdempotencyStore interface {
	// Lookup returns the stored response for key, if present.
	Lookup(key string) (resp *iso8583.Message, found bool)
	// Store records the response for key.
	Store(key string, resp *iso8583.Message)
}

IdempotencyStore remembers the response produced for an idempotency key so a duplicate request can be answered with the original response instead of being reprocessed. Implementations must be safe for concurrent use.

type Journal

type Journal interface {
	Record(ctx context.Context, ex *Exchange, ev Event) error
}

Journal records transaction lifecycle events. A durable implementation is the basis for store-and-forward recovery and audit: write before the effect so it can be replayed. Record runs inline on the flow goroutine; failures are logged by the flow but do not by themselves abort the transaction.

Implementations must be safe for concurrent use across exchanges.

type JournalEntry

type JournalEntry struct {
	ID    string
	Event Event
}

JournalEntry is one stored record: the exchange ID plus the event.

type KeyFunc

type KeyFunc func(ex *Exchange) (key string, ok bool)

KeyFunc derives an idempotency key from an exchange and reports whether the transaction is idempotent at all (false skips idempotency for that exchange). A typical key joins the retrieval-reference / STAN with the terminal so a retransmitted request is recognised as a duplicate.

type MemoryJournal

type MemoryJournal struct {
	// contains filtered or unexported fields
}

MemoryJournal is an in-memory audit trail of events, useful for tests and inspection. It retains every entry, so it is not for unbounded production use.

func (*MemoryJournal) Entries

func (j *MemoryJournal) Entries() []JournalEntry

Entries returns a snapshot of all recorded entries.

func (*MemoryJournal) Record

func (j *MemoryJournal) Record(_ context.Context, ex *Exchange, ev Event) error

Record appends the event.

type MemoryStore

type MemoryStore struct {
	// contains filtered or unexported fields
}

MemoryStore is an in-memory IdempotencyStore. It clones messages on store and lookup so cached responses cannot be mutated through the store. It retains every key, so production deployments should bound it (TTL/eviction) or back it with a shared store; this implementation is for single-process use and tests.

func NewMemoryStore

func NewMemoryStore() *MemoryStore

NewMemoryStore builds an empty in-memory idempotency store.

func (*MemoryStore) Lookup

func (s *MemoryStore) Lookup(key string) (*iso8583.Message, bool)

Lookup returns a clone of the stored response for key.

func (*MemoryStore) Store

func (s *MemoryStore) Store(key string, resp *iso8583.Message)

Store records a clone of resp under key. A nil resp records the key with no payload, so a duplicate is still recognised (and answered with a nil response).

type Option

type Option func(*Flow)

Option configures a Flow.

func WithEntry

func WithEntry(group string) Option

WithEntry names the group the flow starts from (default: the first group added with Flow.Group).

func WithIdempotency

func WithIdempotency(store IdempotencyStore, key KeyFunc) Option

WithIdempotency makes Run replay the stored response for a duplicate request (as decided by key) instead of reprocessing it, and store the response of each committed transaction.

func WithJournal

func WithJournal(j Journal) Option

WithJournal records transaction lifecycle events to j.

func WithLogger

func WithLogger(l *slog.Logger) Option

WithLogger sets the flow logger (default slog.Default).

func WithMaxStages

func WithMaxStages(n int) Option

WithMaxStages caps how many stages a single Run may execute, guarding against routing loops (default 1024).

func WithProfiler

func WithProfiler(on bool) Option

WithProfiler enables per-stage timing, recorded into each Exchange.

type Result

type Result struct {
	// contains filtered or unexported fields
}

Result is a stage's prepare-phase disposition. The zero Result is Continue. Use the constructors below rather than building it directly.

func Continue

func Continue() Result

Continue proceeds to the next stage and joins the transaction, so the stage's Commit (or Abort) runs in the second phase.

func Done

func Done() Result

Done stops the prepare pass and commits the stages that have joined so far — for a stage that has produced the final response and wants to short-circuit the remaining pipeline. The stage returning Done does not itself join the transaction, so its own Commit/Abort (if it implements them) will not run; return Continue instead if the terminal stage must participate in the commit phase.

func Route

func Route(groups ...string) Result

Route proceeds and appends the named groups' stages to the pipeline, so a stage can branch processing based on the exchange (e.g. select an issuer). The routing stage itself joins the transaction.

func Skip

func Skip() Result

Skip proceeds to the next stage without joining the transaction — for a read-only stage that has no commit or rollback.

type RetryPolicy

type RetryPolicy struct {
	MaxAttempts int
	Backoff     func(attempt int) time.Duration
	Retryable   func(err error) bool
}

RetryPolicy governs per-stage prepare retries. Retryable decides whether an error warrants another attempt (default: any non-nil error). Backoff returns the delay before attempt n (1-based; default: a fixed 50ms). MaxAttempts caps total prepare attempts including the first (default 3).

func DefaultRetryPolicy

func DefaultRetryPolicy() RetryPolicy

DefaultRetryPolicy returns a sensible policy: up to 3 attempts, fixed 50ms backoff, all errors retryable.

type SlogJournal

type SlogJournal struct {
	Log *slog.Logger
}

SlogJournal writes lifecycle events to a structured logger.

func (SlogJournal) Record

func (j SlogJournal) Record(ctx context.Context, ex *Exchange, ev Event) error

Record logs the event.

type Stage

type Stage interface {
	Name() string
	Prepare(ctx context.Context, ex *Exchange) (Result, error)
}

Stage is one participant in a Flow. Prepare does the stage's reversible work — validation, lookups, reservations — and votes on what happens next via the returned Result. Returning a non-nil error (or calling Exchange.Abort) aborts the whole transaction.

A Stage that needs to make durable effects or undo reserved work additionally implements Committer and/or Aborter; those run in the second phase only for stages that voted to continue (joined the transaction).

func Func

func Func(name string, fn func(ctx context.Context, ex *Exchange) (Result, error)) Stage

Func builds a prepare-only Stage from a function — convenient for simple, read-only or response-building steps that need no commit/abort hooks.

func Retry

func Retry(inner Stage, policy RetryPolicy) Stage

Retry wraps inner so its Prepare is retried per policy. Commit/Abort (if inner implements them) are forwarded and run once. Retrying Prepare assumes its work is safe to repeat — keep reservations with side effects in Commit.

type StageTiming

type StageTiming struct {
	Stage string
	Phase string // "prepare", "commit", or "abort"
	Dur   time.Duration
}

StageTiming records how long one stage spent in a phase.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL