Documentation
¶
Overview ¶
Package flow is the Isopace transaction manager: it runs a transaction through an ordered pipeline of Stage values in two phases — a prepare pass that may vote to continue, route, or abort, followed by a commit pass over the stages that prepared successfully (or an abort pass over them if any stage failed). Per-transaction state travels in an Exchange.
The two-phase model gives a transaction all-or-nothing semantics across participants: a stage does its reversible work and validation in Prepare, then makes effects durable in Commit (implementing Committer) or undoes reserved work in Abort (implementing Aborter). Stages route between named groups for conditional processing, and the Flow adds optional journaling, idempotent replay, per-stage retry, and a profiler — all built on the immutable copy-on-write iso8583.Message so a request is shared across stages without locking or defensive copies.
An Exchange is processed by one goroutine at a time and is not safe for concurrent use; a Flow may run many exchanges concurrently.
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ( ErrNoEntry = errors.New("flow: no entry group configured") ErrUnknownGroup = errors.New("flow: unknown group") ErrTooManyStages = errors.New("flow: stage limit exceeded (routing loop?)") ErrAborted = errors.New("flow: transaction aborted") )
Flow errors.
Functions ¶
This section is empty.
Types ¶
type Aborter ¶
Aborter is the rollback half: it runs when the transaction aborts, in reverse prepare order, to undo work reserved during Prepare.
type Committer ¶
Committer is the commit half of two-phase processing: it runs after every stage has prepared successfully, in prepare order, to make effects durable.
type Event ¶
type Event struct {
Kind EventKind
// Err is the abort cause for EventAborted, nil otherwise.
Err error
}
Event is a single journalled transaction lifecycle record.
type EventKind ¶
type EventKind int
EventKind classifies a transaction lifecycle event recorded to a Journal.
const ( // EventBegin marks the start of an exchange. EventBegin EventKind = iota // EventCommitted marks a successful commit pass. EventCommitted // EventAborted marks a rolled-back transaction. EventAborted // EventIdempotentHit marks a replayed response for a duplicate request. EventIdempotentHit )
type Exchange ¶
type Exchange struct {
// ID identifies the transaction in logs and journals (e.g. a STAN+terminal
// key or a generated UUID). It is supplied by the caller.
ID string
// Request is the decoded inbound message. Stages treat it as immutable.
Request *iso8583.Message
// Response is the message being assembled for the caller; stages set and
// refine it. It may be nil until a stage builds it.
Response *iso8583.Message
// contains filtered or unexported fields
}
Exchange is the per-transaction state carried through a Flow. It holds the inbound Request and the Response being built, a free-form property bag for inter-stage communication, the abort state, and (when profiling is enabled) per-stage timings.
An Exchange is owned by the single goroutine running its flow and is not safe for concurrent use.
func NewExchange ¶
NewExchange builds an exchange for request req identified by id.
func (*Exchange) Abort ¶
Abort marks the transaction for rollback with the given cause. A stage may call this instead of (or before) returning an error — useful when it produces a decline response yet still wants the joined stages rolled back. A nil cause is recorded as ErrAborted.
func (*Exchange) Profile ¶
Profile renders the recorded timings as a single line, in order, with the total — handy for a per-transaction log field. Empty when profiling is off.
func (*Exchange) Timings ¶
func (e *Exchange) Timings() []StageTiming
Timings returns the recorded per-stage timings (empty unless profiling is on).
type Flow ¶
type Flow struct {
// contains filtered or unexported fields
}
Flow runs transactions through named groups of Stage values in two phases. Build it with New and Flow.Group, then call Flow.Run per transaction; a single Flow is safe to run from many goroutines concurrently, provided its groups are not modified after the first Run.
type IdempotencyStore ¶
type IdempotencyStore interface {
// Lookup returns the stored response for key, if present.
Lookup(key string) (resp *iso8583.Message, found bool)
// Store records the response for key.
Store(key string, resp *iso8583.Message)
}
IdempotencyStore remembers the response produced for an idempotency key so a duplicate request can be answered with the original response instead of being reprocessed. Implementations must be safe for concurrent use.
type Journal ¶
Journal records transaction lifecycle events. A durable implementation is the basis for store-and-forward recovery and audit: write before the effect so it can be replayed. Record runs inline on the flow goroutine; failures are logged by the flow but do not by themselves abort the transaction.
Implementations must be safe for concurrent use across exchanges.
type JournalEntry ¶
JournalEntry is one stored record: the exchange ID plus the event.
type KeyFunc ¶
KeyFunc derives an idempotency key from an exchange and reports whether the transaction is idempotent at all (false skips idempotency for that exchange). A typical key joins the retrieval-reference / STAN with the terminal so a retransmitted request is recognised as a duplicate.
type MemoryJournal ¶
type MemoryJournal struct {
// contains filtered or unexported fields
}
MemoryJournal is an in-memory audit trail of events, useful for tests and inspection. It retains every entry, so it is not for unbounded production use.
func (*MemoryJournal) Entries ¶
func (j *MemoryJournal) Entries() []JournalEntry
Entries returns a snapshot of all recorded entries.
type MemoryStore ¶
type MemoryStore struct {
// contains filtered or unexported fields
}
MemoryStore is an in-memory IdempotencyStore. It clones messages on store and lookup so cached responses cannot be mutated through the store. It retains every key, so production deployments should bound it (TTL/eviction) or back it with a shared store; this implementation is for single-process use and tests.
func NewMemoryStore ¶
func NewMemoryStore() *MemoryStore
NewMemoryStore builds an empty in-memory idempotency store.
type Option ¶
type Option func(*Flow)
Option configures a Flow.
func WithEntry ¶
WithEntry names the group the flow starts from (default: the first group added with Flow.Group).
func WithIdempotency ¶
func WithIdempotency(store IdempotencyStore, key KeyFunc) Option
WithIdempotency makes Run replay the stored response for a duplicate request (as decided by key) instead of reprocessing it, and store the response of each committed transaction.
func WithJournal ¶
WithJournal records transaction lifecycle events to j.
func WithLogger ¶
WithLogger sets the flow logger (default slog.Default).
func WithMaxStages ¶
WithMaxStages caps how many stages a single Run may execute, guarding against routing loops (default 1024).
func WithProfiler ¶
WithProfiler enables per-stage timing, recorded into each Exchange.
type Result ¶
type Result struct {
// contains filtered or unexported fields
}
Result is a stage's prepare-phase disposition. The zero Result is Continue. Use the constructors below rather than building it directly.
func Continue ¶
func Continue() Result
Continue proceeds to the next stage and joins the transaction, so the stage's Commit (or Abort) runs in the second phase.
func Done ¶
func Done() Result
Done stops the prepare pass and commits the stages that have joined so far — for a stage that has produced the final response and wants to short-circuit the remaining pipeline. The stage returning Done does not itself join the transaction, so its own Commit/Abort (if it implements them) will not run; return Continue instead if the terminal stage must participate in the commit phase.
type RetryPolicy ¶
type RetryPolicy struct {
MaxAttempts int
Backoff func(attempt int) time.Duration
Retryable func(err error) bool
}
RetryPolicy governs per-stage prepare retries. Retryable decides whether an error warrants another attempt (default: any non-nil error). Backoff returns the delay before attempt n (1-based; default: a fixed 50ms). MaxAttempts caps total prepare attempts including the first (default 3).
func DefaultRetryPolicy ¶
func DefaultRetryPolicy() RetryPolicy
DefaultRetryPolicy returns a sensible policy: up to 3 attempts, fixed 50ms backoff, all errors retryable.
type SlogJournal ¶
SlogJournal writes lifecycle events to a structured logger.
type Stage ¶
Stage is one participant in a Flow. Prepare does the stage's reversible work — validation, lookups, reservations — and votes on what happens next via the returned Result. Returning a non-nil error (or calling Exchange.Abort) aborts the whole transaction.
A Stage that needs to make durable effects or undo reserved work additionally implements Committer and/or Aborter; those run in the second phase only for stages that voted to continue (joined the transaction).
func Func ¶
Func builds a prepare-only Stage from a function — convenient for simple, read-only or response-building steps that need no commit/abort hooks.
func Retry ¶
func Retry(inner Stage, policy RetryPolicy) Stage
Retry wraps inner so its Prepare is retried per policy. Commit/Abort (if inner implements them) are forwarded and run once. Retrying Prepare assumes its work is safe to repeat — keep reservations with side effects in Commit.