flows

package module
v0.0.8
Published: Apr 3, 2026 License: Apache-2.0 Imports: 17 Imported by: 0

README

flows

Minimal, Postgres-backed durable workflow runner (Cadence/Temporal-style replay) in Go.

Guarantees

  • Type-safe: steps and events use Go generics; no user casting.
  • ACID-friendly: the worker executes workflow code inside a pgx.Tx, so you can couple step outputs + business writes atomically.
  • Replay-based durability: on resume, the worker re-runs the workflow function; completed steps/events/timers are memoized in Postgres and returned without re-executing.
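The replay model above can be illustrated with an in-memory sketch. The real memo store is the Postgres steps table keyed by (run_id, step_key); the map and key format here are stand-ins:

```go
package main

import "fmt"

// memo stands in for the Postgres-backed step store; flows keys memoized
// outputs by (run_id, step_key).
type memo map[string]string

// memoExecute sketches the replay behavior: the first successful execution
// of (runID, stepKey) records its output; on replay the stored value is
// returned without re-running the step.
func memoExecute(m memo, runID, stepKey string, step func() string) string {
	k := runID + "/" + stepKey
	if v, ok := m[k]; ok {
		return v // replay: memoized, step not re-executed
	}
	v := step()
	m[k] = v
	return v
}

func main() {
	m := memo{}
	calls := 0
	step := func() string { calls++; return "charged" }
	first := memoExecute(m, "run-1", "charge/v1", step)
	second := memoExecute(m, "run-1", "charge/v1", step) // replay: step not called again
	fmt.Println(first, second, calls)                    // charged charged 1
}
```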

Important note about Go generics

Go currently does not support type parameters on methods, so the type-safe APIs are package-level generic functions like flows.Execute(...) and flows.WaitForEvent[T](...).

Schema

Create the required tables:

  • Use the SQL schema in the sql folder, or flows.SchemaSQL (see schema.go).
  • Flows does not automatically create or migrate its schema at runtime.
  • By default, Flows uses the flows schema with unprefixed table names (runs, steps, ...).
  • To install into a different schema, use flows.SchemaSQLFor("my_schema").

For example:

if _, err := pool.Exec(ctx, flows.SchemaSQLFor("my_schema")); err != nil {
    return err
}

To point the worker/client at a custom schema:

cfg := flows.DBConfig{Schema: "my_schema"}

client := flows.Client{DBConfig: cfg}
worker := flows.Worker{Pool: pool, Registry: reg, DBConfig: cfg}

Sharding (Citus)

Flows supports sharded Postgres setups with Citus by routing all reads/writes using a distribution column workflow_name_shard.

  • Configure shard fan-out via flows.DBConfig{ShardCount: N}.
  • Each run is assigned to a shard deterministically as workflow_name_<k> where 0 <= k < N.
  • The primary key for a run is (workflow_name_shard, run_id); all child tables include the same shard key.

Because of this, run identifiers are represented as flows.RunKey. (If you don't use Citus, the same shard column can still back vanilla Postgres table partitioning.)

type RunKey struct {
	WorkflowNameShard string
	RunID             flows.RunID
}
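Deterministic shard assignment can be sketched as hashing something stable about the run and taking it modulo ShardCount. The hash choice here (FNV-1a over the run ID) is an assumption; flows only documents that the result has the form workflow_name_<k> with 0 <= k < N:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardFor sketches how a run could be pinned to a shard deterministically.
// The FNV-1a hash is illustrative, not flows' actual assignment.
func shardFor(workflowName, runID string, shardCount int) string {
	if shardCount <= 0 {
		shardCount = 1 // documented DBConfig default
	}
	h := fnv.New32a()
	h.Write([]byte(runID))
	k := h.Sum32() % uint32(shardCount)
	return fmt.Sprintf("%s_%d", workflowName, k)
}

func main() {
	// Same run ID always maps to the same shard value.
	fmt.Println(shardFor("my_workflow", "0190e0c2-aaaa-7bbb-8ccc-000000000001", 4))
}
```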

Worker concurrency

Each workflow type uses one dispatcher goroutine to claim work from Postgres and a bounded executor pool with configurable concurrency. This ensures busy workflows don't block other workflow types without multiplying shard scans.

reg := flows.NewRegistry()
flows.Register(reg, myFastWorkflow)  // default concurrency: 1
flows.Register(reg, mySlowWorkflow, flows.WithConcurrency(10))  // 10 goroutines

worker := flows.Worker{Pool: pool, Registry: reg, DBConfig: flows.DBConfig{ShardCount: 32}}
worker.Run(ctx)  // starts one dispatcher + executor pool per workflow type

When LISTEN/NOTIFY is enabled, PollInterval is the base idle/retry delay and workers back off empty polls up to MaxPollInterval (default 30s). When notifications are disabled, polling stays at PollInterval.

Citus Compatibility

On Citus, SELECT ... FOR UPDATE SKIP LOCKED must be routed to a single shard. The worker achieves this by including workflow_name_shard (the distribution column) in the WHERE clause.

The worker iterates through all shards for each workflow, using round-robin rotation to prevent starvation when some shards have more work than others. This happens automatically based on DBConfig.ShardCount.
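The rotation can be sketched like this; the pass counter as rotation state is illustrative, but the property it shows (every shard eventually comes first) is what prevents starvation:

```go
package main

import "fmt"

// scanOrder sketches round-robin shard rotation: each dispatch pass starts
// from a different shard offset, so a busy low-numbered shard cannot starve
// the others.
func scanOrder(shards []string, pass int) []string {
	out := make([]string, 0, len(shards))
	start := pass % len(shards)
	for i := 0; i < len(shards); i++ {
		out = append(out, shards[(start+i)%len(shards)])
	}
	return out
}

func main() {
	shards := []string{"wf_0", "wf_1", "wf_2", "wf_3"}
	fmt.Println(scanOrder(shards, 0)) // [wf_0 wf_1 wf_2 wf_3]
	fmt.Println(scanOrder(shards, 1)) // [wf_1 wf_2 wf_3 wf_0]
}
```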

Example

See examples/simple/example.go.

At a high level:

  • API-side enqueue inside a transaction:
runKey, err := flows.BeginTx(ctx, flows.Client{DBConfig: flows.DBConfig{ShardCount: 10}}, tx, myWorkflow, &MyInput{...})
  • Worker side:
reg := flows.NewRegistry()
flows.Register(reg, myWorkflow)

worker := flows.Worker{Pool: pool, Registry: reg}
_ = worker.Run(ctx)
  • Workflow code uses durable primitives:
out, err := flows.Execute(ctx, wf, "step/v1", stepFn, in, flows.RetryPolicy{MaxRetries: 3})
flows.Sleep(ctx, wf, "sleep/v1", 5*time.Second)
n := flows.WaitForEvent[int](ctx, wf, "customer_number/v1", "CustomerNumberEvent")
uuid := flows.RandomUUIDv7(ctx, wf, "uuid/v1")

Client APIs

The Client provides APIs for managing workflow runs:

client := flows.Client{}

// Get current status of a run
status, err := flows.GetRunStatusTx(ctx, client, tx, runKey)
// status.Status: "queued", "running", "sleeping", "waiting_event", "completed", "failed", "cancelled"

// Cancel a pending run (queued, sleeping, or waiting for event)
err = flows.CancelRunTx(ctx, client, tx, runKey)

// Get the output of a completed run
output, err := flows.GetRunOutputTx[MyOutput](ctx, client, tx, runKey)

// Pause a cron schedule (stops creating new runs)
err = flows.PauseScheduleTx(ctx, client, tx, "my-schedule")

// Resume a paused cron schedule
err = flows.ResumeScheduleTx(ctx, client, tx, "my-schedule")

Step Execution Options

The RetryPolicy supports configurable retries, backoff, and timeouts:

result, err := flows.Execute(ctx, wf, "step/v1", stepFn, in, flows.RetryPolicy{
    MaxRetries:  3,                          // Retry up to 3 times after initial attempt
    Backoff:     func(attempt int) int {     // Wait between retries (milliseconds)
        return 100 * (1 << attempt)          // Exponential: 100ms, 200ms, 400ms
    },
    StepTimeout: 30 * time.Second,           // Timeout per step execution
})

Step Panic Recovery: If a step panics, the panic is caught and converted to a StepPanicError. The step may be retried according to the retry policy.

Workflow Panic Recovery: If the workflow function itself panics (outside of Execute), the worker catches the panic, marks the run as failed, and persists a stack trace in runs.error_text.

Cron Schedules

Flows supports automatic, recurring workflow runs via cron schedules.

How it works
  1. Registration: At startup, register your workflow template with flows.Register(reg, wf) as usual.
  2. Scheduling: Call flows.ScheduleTx(...) — from an HTTP handler, a migration, init code, or anywhere you have a transaction — to create or update a schedule row in the schedules table.
  3. Wakeup model: The worker's cron goroutine sleeps until the next next_run_at, wakes early when schedule mutations send a notification, then claims due rows with FOR UPDATE SKIP LOCKED, inserts a new run, and advances next_run_at — all in one transaction.
  4. Execution: The new run is picked up by the normal workflow worker loop.

Multiple workers share the same schedules table. FOR UPDATE SKIP LOCKED guarantees exactly one worker fires each schedule tick, so no duplicate runs.

Because schedules live in the database, they can be created, updated, paused, resumed, and deleted at runtime without restarting workers. When LISTEN/NOTIFY is enabled, those mutations wake cron workers immediately; otherwise workers fall back to periodic polling.

Schedules

Two schedule types are built-in:

Type             Constructor                  Example
Fixed interval   flows.Every(d)               flows.Every(5 * time.Minute)
Cron expression  flows.MustParseCron(expr)    flows.MustParseCron("*/10 * * * *")

The cron parser supports standard 5-field syntax (minute hour day-of-month month day-of-week) with *, N, N-M, N,M, */N, N-M/S, and @every <duration>. Day-of-week uses 0=Sunday (7 also accepted).
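The per-field grammar can be sketched by expanding one field into its matching values. This mirrors the syntax listed above (flows' actual parser is not shown):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// expandField expands one cron field (*, N, N-M, N,M, */S, N-M/S) over the
// inclusive range [lo, hi] into the sorted set of matching values.
func expandField(expr string, lo, hi int) ([]int, error) {
	set := make(map[int]bool)
	for _, part := range strings.Split(expr, ",") {
		step := 1
		if i := strings.IndexByte(part, '/'); i >= 0 {
			s, err := strconv.Atoi(part[i+1:])
			if err != nil || s < 1 {
				return nil, fmt.Errorf("bad step %q", part)
			}
			step, part = s, part[:i]
		}
		from, to := lo, hi
		if part != "*" {
			if i := strings.IndexByte(part, '-'); i >= 0 {
				from, _ = strconv.Atoi(part[:i])
				to, _ = strconv.Atoi(part[i+1:])
			} else {
				n, err := strconv.Atoi(part)
				if err != nil {
					return nil, fmt.Errorf("bad value %q", part)
				}
				from, to = n, n
			}
		}
		for v := from; v <= to && v <= hi; v += step {
			set[v] = true
		}
	}
	out := make([]int, 0, len(set))
	for v := lo; v <= hi; v++ {
		if set[v] {
			out = append(out, v)
		}
	}
	return out, nil
}

func main() {
	minutes, _ := expandField("*/10", 0, 59)
	fmt.Println(minutes) // [0 10 20 30 40 50]
}
```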

Quick start
reg := flows.NewRegistry()
flows.Register(reg, myWorkflow)

client := flows.Client{}

// Create a schedule — can be called at any time (init, HTTP handler, migration).
tx, _ := pool.Begin(ctx)
flows.ScheduleTx(ctx, client, tx, myWorkflow, &MyInput{Value: "cron"}, "my-schedule", flows.Every(5*time.Minute))
tx.Commit(ctx)

// Or use a cron expression and fire an immediate run too:
tx, _ = pool.Begin(ctx)
flows.ScheduleTx(ctx, client, tx, myWorkflow, &MyInput{Value: "nightly"}, "nightly-job",
    flows.MustParseCron("30 2 * * *"),
    flows.WithRunNow(),  // also creates one run immediately
)
tx.Commit(ctx)

// Start the worker — it picks up schedules from the database automatically.
worker := flows.Worker{Pool: pool, Registry: reg}
_ = worker.Run(ctx)

The schedule ID ("my-schedule" above) uniquely identifies the schedule row. Use a stable string (e.g. the workflow name) so the row survives redeploys. If you omit the schedule ID (empty string), it defaults to wf.Name().

Pausing, resuming, and deleting
client := flows.Client{}

// Pause — stops creating new runs; existing runs are not affected.
flows.PauseScheduleTx(ctx, client, tx, "my-schedule")

// Resume — re-enables a paused schedule.
flows.ResumeScheduleTx(ctx, client, tx, "my-schedule")

// Delete — permanently removes the schedule row.
flows.DeleteScheduleTx(ctx, client, tx, "my-schedule")

Example

See examples/cron/example.go for a complete runnable example.

Stress test

See examples/stress/README.md for a repeatable stress harness that:

  • runs against citusdata/citus:13.2.0
  • scales worker containers (e.g. 5 containers)
  • enqueues A/B/C/D workflows at different volumes
  • computes DB-derived completion time stats per workflow (from runs.created_at / runs.updated_at)

Documentation

Index

Constants

const DefaultSchema = "flows"

DefaultSchema is the schema used by this package when none is configured.

With unprefixed table names (runs, steps, waits, events, random), using a dedicated schema avoids collisions with application tables.

Variables

CitusSchemaSQL is the Citus distributed table setup for the default schema (DefaultSchema).

SchemaSQL is the default schema (DefaultSchema) required by this package.

Notes:

  • `run_id` is stored as Postgres `uuid`; UUIDv7 generation is done in Go.
  • Payloads are stored as jsonb (the default codec is JSON).

Functions

func CancelRunTx

func CancelRunTx(ctx context.Context, c Client, tx DBTX, runKey RunKey) error

CancelRunTx cancels a workflow run that is queued, sleeping, or waiting for an event. Runs that are currently running, already completed, already failed, or already cancelled cannot be cancelled and will return an error.

func CitusSchemaSQLFor

func CitusSchemaSQLFor(schema string) string

CitusSchemaSQLFor returns the Citus distributed table setup SQL for a given Postgres schema name.

This should be run AFTER SchemaSQL has created the tables. It distributes all tables by the `workflow_name_shard` column and colocates child tables with the runs table.

The schema name is validated conservatively and will fall back to DefaultSchema if invalid.

func DeleteScheduleTx added in v0.0.7

func DeleteScheduleTx(ctx context.Context, c Client, tx DBTX, scheduleID string) error

DeleteScheduleTx permanently removes a cron schedule.

Any runs that are already in progress are not affected. Returns an error if the schedule is not found.

func DurableExecute

func DurableExecute[I any, O any](ctx context.Context, wf *Context, stepKey string, step Step[I, O], in *I, retryPolicy RetryPolicy) (*O, error)

DurableExecute is an alias for Execute.

func Execute

func Execute[I any, O any](ctx context.Context, c *Context, stepKey string, step Step[I, O], in *I, retry RetryPolicy) (*O, error)

Execute runs a step exactly-once per (run_id, step_key) by memoizing its successful output.

func GetRunOutputTx

func GetRunOutputTx[O any](ctx context.Context, c Client, tx DBTX, runKey RunKey) (*O, error)

GetRunOutputTx retrieves the output of a completed workflow run. Returns an error if the run is not found or not completed.

func PauseScheduleTx added in v0.0.6

func PauseScheduleTx(ctx context.Context, c Client, tx DBTX, scheduleID string) error

PauseScheduleTx disables a cron schedule so it stops creating new runs. The schedule row is preserved; call ResumeScheduleTx to re-enable it. Returns an error if the schedule is not found.

func PublishEventTx

func PublishEventTx[T any](ctx context.Context, c Client, tx DBTX, runKey RunKey, eventName string, payload *T) error

PublishEventTx publishes (run-scoped) event payload, and wakes the run if it is waiting.

Go does not support type parameters on methods, so this is a package-level generic.

func RandomUUIDv7

func RandomUUIDv7(ctx context.Context, c *Context, key string) string

RandomUUIDv7 returns a deterministic UUIDv7 for this run and key.

func RandomUint64

func RandomUint64(ctx context.Context, c *Context, key string) uint64

RandomUint64 returns a deterministic random uint64 for this run and key.

func Register

func Register[I any, O any](r *Registry, wf Workflow[I, O], opts ...WorkflowOption)

Register registers a workflow with optional configuration.

Go does not support type parameters on methods, so this is a package-level generic.

func ResumeScheduleTx added in v0.0.6

func ResumeScheduleTx(ctx context.Context, c Client, tx DBTX, scheduleID string) error

ResumeScheduleTx re-enables a previously paused cron schedule. Returns an error if the schedule is not found.

func ScheduleTx added in v0.0.7

func ScheduleTx[I any, O any](ctx context.Context, c Client, tx DBTX, wf Workflow[I, O], input *I, scheduleID string, schedule Schedule, opts ...ScheduleOption) error

ScheduleTx creates or updates a cron schedule in the database.

Call this at any time — from an HTTP handler, a migration, or application init code. The worker's cron loop recomputes its next wake time whenever a schedule changes, and still falls back to periodic polling when LISTEN/NOTIFY is disabled.

If a schedule with the same scheduleID already exists it is updated (the input and cron expression are replaced, and next_run_at is recalculated if the expression changed).

Pass WithRunNow to also fire an immediate run in the same transaction.

Go does not support type parameters on methods, so this is a package-level generic.

func SchemaSQLFor

func SchemaSQLFor(schema string) string

SchemaSQLFor returns the schema required by this package for a given Postgres schema name.

The schema name is validated conservatively and will fall back to DefaultSchema if invalid.

func ShardValuesForWorkflow

func ShardValuesForWorkflow(workflowName string, shardCount int) []string

ShardValuesForWorkflow returns all possible shard values for a workflow name.

On Citus, SELECT ... FOR UPDATE SKIP LOCKED must be routed to a single shard, which requires an equality predicate on the distribution column (workflow_name_shard). The worker iterates through all shard values returned by this function to find work.

Example: ShardValuesForWorkflow("my_workflow", 4) returns ["my_workflow_0", "my_workflow_1", "my_workflow_2", "my_workflow_3"]
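The documented behavior amounts to the following (a local reproduction for illustration, not the package's source):

```go
package main

import "fmt"

// shardValues reproduces the documented output of
// flows.ShardValuesForWorkflow: "<name>_<k>" for each k in [0, shardCount).
func shardValues(name string, shardCount int) []string {
	out := make([]string, shardCount)
	for k := range out {
		out[k] = fmt.Sprintf("%s_%d", name, k)
	}
	return out
}

func main() {
	fmt.Println(shardValues("my_workflow", 4))
	// [my_workflow_0 my_workflow_1 my_workflow_2 my_workflow_3]
}
```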

func Sleep

func Sleep(ctx context.Context, c *Context, waitKey string, duration time.Duration)

Sleep durably yields execution until now()+duration.

If the sleep already elapsed (on resume), it returns immediately.

func WaitForEvent

func WaitForEvent[T any](ctx context.Context, c *Context, waitKey string, eventName string) *T

WaitForEvent blocks until the event is published for this run.

The event is memoized by (run_id, wait_key). On replay, it returns the same payload.

Types

type Client

type Client struct {
	Codec    Codec
	Now      func() time.Time
	DBConfig DBConfig

	// NotifyChannel overrides the Postgres channel name used for pg_notify.
	// If empty, a safe default is used.
	NotifyChannel string
}

Client is used by API servers to start runs and publish events.

All methods have a `Tx` variant so you can call them inside your own transaction.

type Codec

type Codec interface {
	Marshal(v any) ([]byte, error)
	Unmarshal(data []byte, v any) error
}

Codec controls serialization of inputs/outputs/events.

Default is JSONCodec (stored as jsonb).

Implementations should be deterministic: same value => same bytes.

type Context

type Context struct {
	// contains filtered or unexported fields
}

Context is passed to workflow code. It provides replay-safe primitives.

All reads/writes are performed inside the `pgx.Tx` associated with the current worker attempt.

func (*Context) RunID

func (c *Context) RunID() RunID

func (*Context) RunKey

func (c *Context) RunKey() RunKey

func (*Context) Tx

func (c *Context) Tx() pgx.Tx

Tx exposes the underlying transaction, so steps can do ACID business writes.

type CronSchedule added in v0.0.5

type CronSchedule struct {
	// contains filtered or unexported fields
}

CronSchedule is a parsed 5-field cron expression.

Fields: minute hour day-of-month month day-of-week

Supported syntax per field:

  • *       any value
  • N       specific value
  • N-M     range (inclusive)
  • N,M,P   list
  • */N     every N
  • N-M/S   range with step

Day-of-week: 0=Sunday, 1=Monday, ..., 6=Saturday (7 is also Sunday).

Special rules (POSIX):

  • If both day-of-month and day-of-week are restricted (not *), the schedule fires when EITHER matches.

func (*CronSchedule) Next added in v0.0.5

func (s *CronSchedule) Next(after time.Time) time.Time

Next returns the earliest time strictly after `after` that matches the schedule.

func (*CronSchedule) String added in v0.0.7

func (s *CronSchedule) String() string

String returns the original 5-field cron expression.

type DBConfig

type DBConfig struct {
	// Schema is the Postgres schema containing the flows tables.
	// If empty, DefaultSchema is used.
	Schema string

	// ShardCount controls how many shards a workflow can spread across.
	//
	// When using Citus (or any sharded Postgres setup), Flows routes writes/updates
	// using a distribution column `workflow_name_shard`, derived as
	// `workflow_name_<n>` where n is in [0, ShardCount).
	//
	// If ShardCount is <= 0, it defaults to 1.
	ShardCount int
}

DBConfig configures where Flows stores its tables.

type DBTX added in v0.0.4

type DBTX interface {
	Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error)
	QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
}

DBTX is the minimal database interface required by Flows helper functions.

It is intentionally small so callers can pass either a transaction (pgx.Tx) or a direct connection/pool that supports Exec and QueryRow.

type JSONCodec

type JSONCodec struct{}

func (JSONCodec) Marshal

func (JSONCodec) Marshal(v any) ([]byte, error)

func (JSONCodec) Unmarshal

func (JSONCodec) Unmarshal(data []byte, v any) error

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry maps workflow names to registered handlers.

Registration is type-safe; execution is dynamic (by workflow name from DB).

func NewRegistry

func NewRegistry() *Registry

type RetryPolicy

type RetryPolicy struct {
	// MaxRetries is the maximum number of retry attempts after the initial attempt.
	// A value of 0 means no retries (only the initial attempt).
	MaxRetries int

	// Backoff returns the duration in milliseconds to wait before retry attempt n.
	// If nil, no backoff is applied between retries.
	Backoff func(attempt int) (durationMillis int)

	// StepTimeout is the maximum duration a single step execution may take.
	// If zero, no timeout is applied. When a step times out, it counts as a
	// failed attempt and may be retried according to MaxRetries.
	StepTimeout time.Duration
}

RetryPolicy controls in-process retries for a single step invocation.

Note: retries happen inside a worker attempt; durable memoization only stores successful outputs.

type RunID

type RunID string

RunID identifies a workflow run.

type RunKey

type RunKey struct {
	WorkflowNameShard string
	RunID             RunID
}

RunKey uniquely identifies a run within a sharded Postgres setup.

In sharded deployments (e.g., Citus), all writes and updates are routed using (workflow_name_shard, run_id) to avoid scatter queries.

func BeginTx

func BeginTx[I any, O any](ctx context.Context, c Client, tx DBTX, wf Workflow[I, O], in *I) (RunKey, error)

BeginTx enqueues a workflow run.

Go does not support type parameters on methods, so this is a package-level generic.

type RunStatus

type RunStatus struct {
	Status     string // queued, running, sleeping, waiting_event, completed, failed, cancelled
	Error      string // error message if failed
	CreatedAt  time.Time
	UpdatedAt  time.Time
	NextWakeAt *time.Time // when the run will wake (for sleeping status)
}

RunStatus represents the current state of a workflow run.

func GetRunStatusTx

func GetRunStatusTx(ctx context.Context, c Client, tx DBTX, runKey RunKey) (*RunStatus, error)

GetRunStatusTx retrieves the current status of a workflow run. Returns an error if the run is not found.

type Schedule added in v0.0.5

type Schedule interface {
	// Next returns the earliest time after `after` at which the schedule fires.
	// Returns the zero time if no future fire time exists (shouldn't happen for valid schedules).
	Next(after time.Time) time.Time
}

Schedule determines when the next workflow run should be created.

Implementations must be deterministic: the same (after) input must produce the same output.

func Every added in v0.0.5

func Every(interval time.Duration) Schedule

Every returns a Schedule that fires at fixed intervals.

Example:

flows.Every(5 * time.Minute)

func MustParseCron added in v0.0.5

func MustParseCron(expr string) Schedule

MustParseCron is like ParseCron but panics on error.

func ParseCron added in v0.0.5

func ParseCron(expr string) (Schedule, error)

ParseCron parses a standard 5-field cron expression.

Also accepts "@every <duration>" for fixed-interval schedules.

type ScheduleOption added in v0.0.7

type ScheduleOption func(*scheduleOptions)

ScheduleOption configures a ScheduleTx call.

func WithRunNow added in v0.0.7

func WithRunNow() ScheduleOption

WithRunNow causes ScheduleTx to also create an immediate run in the same transaction, in addition to setting up the recurring schedule.

type Step

type Step[I any, O any] func(ctx context.Context, in *I) (*O, error)

Step is the type-safe unit of work within a workflow.

type StepPanicError

type StepPanicError struct {
	Value any
	Stack string
}

StepPanicError wraps a panic that occurred during step execution.

func (StepPanicError) Error

func (e StepPanicError) Error() string

type Worker

type Worker struct {
	Pool         *pgxpool.Pool
	Registry     *Registry
	Codec        Codec
	PollInterval time.Duration
	// MaxPollInterval caps adaptive backoff after consecutive empty polls.
	// If zero, workers back off up to 30s when LISTEN/NOTIFY is enabled.
	// When notifications are disabled, polling stays at PollInterval.
	MaxPollInterval time.Duration
	DBConfig        DBConfig

	// DisableNotify disables LISTEN/NOTIFY wakeups. Polling remains enabled.
	DisableNotify bool

	// NotifyChannel overrides the Postgres channel name used for LISTEN/NOTIFY.
	// If empty, a safe default is used.
	NotifyChannel string

	// GracefulShutdownTimeout is the maximum time to wait for in-progress runs
	// to complete when the worker is shutting down. If zero, the worker will
	// wait indefinitely for all in-progress work to complete.
	GracefulShutdownTimeout time.Duration
	// contains filtered or unexported fields
}

Worker polls Postgres for runnable workflow runs and executes them.

A run is runnable when:

  • status = 'queued', or
  • status = 'sleeping' and next_wake_at <= now().

Each registered workflow type uses one dispatcher goroutine plus a bounded executor pool with configurable concurrency. This ensures busy workflows don't block other workflow types while avoiding duplicated DB polling.

Citus Compatibility

The runs table is distributed by workflow_name_shard (the distribution column). Citus requires SELECT ... FOR UPDATE SKIP LOCKED to be routed to a single shard, which means the query must include an equality predicate on workflow_name_shard.

Each workflow's shards are derived from: workflow_name + "_" + shard_index (e.g., "my_workflow_0", "my_workflow_1", ...).

The worker iterates through all shards for each workflow, using round-robin rotation to prevent starvation when some shards have more work than others.

func (*Worker) ProcessOne

func (w *Worker) ProcessOne(ctx context.Context) (processed bool, err error)

ProcessOne claims and executes at most one runnable run from any registered workflow. Returns processed=false if no runnable runs exist. This is useful for testing or when fine-grained control over processing is needed.

func (*Worker) Run

func (w *Worker) Run(ctx context.Context) error

Run starts the worker and blocks until ctx is cancelled. Each registered workflow type gets one dispatcher goroutine that claims work from Postgres and a bounded in-process executor pool that runs claimed work. This preserves per-workflow concurrency without multiplying DB polling.

type Workflow

type Workflow[I any, O any] interface {
	Name() string
	Run(ctx context.Context, wf *Context, in *I) (*O, error)
}

Workflow is a durable workflow definition.

The runtime re-executes the workflow function on resume. Use Context primitives (Execute/Sleep/WaitForEvent/Random*) to get replay-safe behavior.

type WorkflowOption

type WorkflowOption func(*workflowOptions)

WorkflowOption configures workflow registration.

func WithCodec

func WithCodec(codec Codec) WorkflowOption

WithCodec sets a custom codec for the workflow. If not set, JSONCodec is used.

func WithConcurrency

func WithConcurrency(n int) WorkflowOption

WithConcurrency sets the number of concurrent goroutines for this workflow. Default is 1. Each workflow type runs in its own goroutine pool.

type WorkflowPanicError

type WorkflowPanicError struct {
	Value any
	Stack string
}

WorkflowPanicError wraps a panic that occurred during workflow execution.

This is distinct from StepPanicError: step panics are caught inside Execute, while this covers panics in the workflow function itself.

func (WorkflowPanicError) Error

func (e WorkflowPanicError) Error() string

Directories

Path             Synopsis
examples
    cron         command
    simple       command
    stress       command
