Documentation ¶
Index ¶
- Constants
- Variables
- func CancelRunTx(ctx context.Context, c Client, tx DBTX, runKey RunKey) error
- func CitusSchemaSQLFor(schema string) string
- func DeleteScheduleTx(ctx context.Context, c Client, tx DBTX, scheduleID string) error
- func DurableExecute[I any, O any](ctx context.Context, wf *Context, stepKey string, step Step[I, O], in *I, ...) (*O, error)
- func Execute[I any, O any](ctx context.Context, c *Context, stepKey string, step Step[I, O], in *I, ...) (*O, error)
- func GetRunOutputTx[O any](ctx context.Context, c Client, tx DBTX, runKey RunKey) (*O, error)
- func PauseScheduleTx(ctx context.Context, c Client, tx DBTX, scheduleID string) error
- func PublishEventTx[T any](ctx context.Context, c Client, tx DBTX, runKey RunKey, eventName string, ...) error
- func RandomUUIDv7(ctx context.Context, c *Context, key string) string
- func RandomUint64(ctx context.Context, c *Context, key string) uint64
- func Register[I any, O any](r *Registry, wf Workflow[I, O], opts ...WorkflowOption)
- func ResumeScheduleTx(ctx context.Context, c Client, tx DBTX, scheduleID string) error
- func ScheduleTx[I any, O any](ctx context.Context, c Client, tx DBTX, wf Workflow[I, O], input *I, ...) error
- func SchemaSQLFor(schema string) string
- func ShardValuesForWorkflow(workflowName string, shardCount int) []string
- func Sleep(ctx context.Context, c *Context, waitKey string, duration time.Duration)
- func WaitForEvent[T any](ctx context.Context, c *Context, waitKey string, eventName string) *T
- type Client
- type Codec
- type Context
- type CronSchedule
- type DBConfig
- type DBTX
- type JSONCodec
- type Registry
- type RetryPolicy
- type RunID
- type RunKey
- type RunStatus
- type Schedule
- type ScheduleOption
- type Step
- type StepPanicError
- type Worker
- type Workflow
- type WorkflowOption
- type WorkflowPanicError
Constants ¶
const DefaultSchema = "flows"
DefaultSchema is the schema used by this package when none is configured.
With unprefixed table names (runs, steps, waits, events, random), using a dedicated schema avoids collisions with application tables.
Variables ¶
var CitusSchemaSQL = CitusSchemaSQLFor(DefaultSchema)
CitusSchemaSQL is the Citus distributed table setup for the default schema (DefaultSchema).
var SchemaSQL = SchemaSQLFor(DefaultSchema)
SchemaSQL is the default schema (DefaultSchema) required by this package.
Notes:
- `run_id` is stored as Postgres `uuid`. UUIDv7 generation is done in Go.
- Payloads are stored as jsonb (default codec is JSON).
Functions ¶
func CancelRunTx ¶
CancelRunTx cancels a workflow run that is queued, sleeping, or waiting for an event. Runs that are currently running, already completed, already failed, or already cancelled cannot be cancelled and will return an error.
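The cancellation rule above can be read as a predicate on the run status. The following is a minimal sketch, assuming the status strings listed in the RunStatus comment further down; `canCancel` is a hypothetical helper for illustration and is not part of this package.

```go
package main

import "fmt"

// canCancel reports whether a run in the given status may be cancelled,
// following the rule stated for CancelRunTx. Hypothetical helper, not
// part of the package API.
func canCancel(status string) bool {
	switch status {
	case "queued", "sleeping", "waiting_event":
		return true
	default: // running, completed, failed, cancelled
		return false
	}
}

func main() {
	statuses := []string{"queued", "running", "sleeping", "waiting_event", "completed", "failed", "cancelled"}
	for _, s := range statuses {
		fmt.Printf("%-13s cancellable=%v\n", s, canCancel(s))
	}
}
```

A caller could use a check like this to decide whether to surface the CancelRunTx error to the user or treat it as expected.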
func CitusSchemaSQLFor ¶
CitusSchemaSQLFor returns the Citus distributed table setup SQL for a given Postgres schema name.
This should be run AFTER SchemaSQL has created the tables. It distributes all tables by the `workflow_name_shard` column and colocates child tables with the runs table.
The schema name is validated conservatively and will fall back to DefaultSchema if invalid.
func DeleteScheduleTx ¶ added in v0.0.7
DeleteScheduleTx permanently removes a cron schedule.
Any runs that are already in progress are not affected. Returns an error if the schedule is not found.
func DurableExecute ¶
func DurableExecute[I any, O any](ctx context.Context, wf *Context, stepKey string, step Step[I, O], in *I, retryPolicy RetryPolicy) (*O, error)
DurableExecute is an alias for Execute.
func Execute ¶
func Execute[I any, O any](ctx context.Context, c *Context, stepKey string, step Step[I, O], in *I, retry RetryPolicy) (*O, error)
Execute runs a step exactly-once per (run_id, step_key) by memoizing its successful output.
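To illustrate what memoizing by (run_id, step_key) means, here is a minimal in-memory sketch. It is not the package's implementation: the real outputs are stored in Postgres, and `memoStore`, `memoKey`, and `executeOnce` are hypothetical names used only for this example.

```go
package main

import "fmt"

// memoKey mirrors the (run_id, step_key) pair that Execute memoizes on.
type memoKey struct {
	runID   string
	stepKey string
}

// memoStore is a hypothetical in-memory stand-in for durable step storage.
type memoStore struct {
	outputs map[memoKey]string
}

func newMemoStore() *memoStore {
	return &memoStore{outputs: make(map[memoKey]string)}
}

// executeOnce runs step only if no successful output is stored for the key.
// Errors are not stored, so a later call with the same key runs step again.
func (m *memoStore) executeOnce(runID, stepKey string, step func() (string, error)) (string, error) {
	k := memoKey{runID: runID, stepKey: stepKey}
	if out, ok := m.outputs[k]; ok {
		return out, nil // replay: return the recorded output
	}
	out, err := step()
	if err != nil {
		return "", err
	}
	m.outputs[k] = out
	return out, nil
}

func main() {
	store := newMemoStore()
	calls := 0
	charge := func() (string, error) {
		calls++
		return "charged", nil
	}
	store.executeOnce("run-1", "charge-card", charge)
	out, _ := store.executeOnce("run-1", "charge-card", charge)
	fmt.Println(out, calls) // the step body ran once; the second call replayed
}
```

The practical consequence is that step keys should be stable and unique within a workflow, since the key decides whether a step runs or replays.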
func GetRunOutputTx ¶
GetRunOutputTx retrieves the output of a completed workflow run. Returns an error if the run is not found or not completed.
func PauseScheduleTx ¶ added in v0.0.6
PauseScheduleTx disables a cron schedule so it stops creating new runs. The schedule row is preserved; call ResumeScheduleTx to re-enable it. Returns an error if the schedule is not found.
func PublishEventTx ¶
func PublishEventTx[T any](ctx context.Context, c Client, tx DBTX, runKey RunKey, eventName string, payload *T) error
PublishEventTx publishes a run-scoped event payload and wakes the run if it is waiting.
Go does not support type parameters on methods, so this is a package-level generic.
func RandomUUIDv7 ¶
RandomUUIDv7 returns a deterministic UUIDv7 for this run and key.
func RandomUint64 ¶
RandomUint64 returns a deterministic random uint64 for this run and key.
func Register ¶
func Register[I any, O any](r *Registry, wf Workflow[I, O], opts ...WorkflowOption)
Register registers a workflow with optional configuration.
Go does not support type parameters on methods, so this is a package-level generic.
func ResumeScheduleTx ¶ added in v0.0.6
ResumeScheduleTx re-enables a previously paused cron schedule. Returns an error if the schedule is not found.
func ScheduleTx ¶ added in v0.0.7
func ScheduleTx[I any, O any](ctx context.Context, c Client, tx DBTX, wf Workflow[I, O], input *I, scheduleID string, schedule Schedule, opts ...ScheduleOption) error
ScheduleTx creates or updates a cron schedule in the database.
Call this at any time — from an HTTP handler, a migration, or application init code. The worker's cron loop recomputes its next wake time whenever a schedule changes, and still falls back to periodic polling when LISTEN/NOTIFY is disabled.
If a schedule with the same scheduleID already exists it is updated (the input and cron expression are replaced, and next_run_at is recalculated if the expression changed).
Pass WithRunNow to also fire an immediate run in the same transaction.
Go does not support type parameters on methods, so this is a package-level generic.
func SchemaSQLFor ¶
SchemaSQLFor returns the schema required by this package for a given Postgres schema name.
The schema name is validated conservatively and will fall back to DefaultSchema if invalid.
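The exact validation rule is not documented here. As a rough sketch of what conservative validation with a fallback could look like, the example below assumes a rule of lowercase letters, digits, and underscores, not starting with a digit, and at most 63 bytes (the default Postgres identifier limit). `schemaOrDefault` and the regular expression are assumptions for illustration, not the package's actual rule.

```go
package main

import (
	"fmt"
	"regexp"
)

// defaultSchema mirrors the DefaultSchema constant.
const defaultSchema = "flows"

// validSchemaName is an ASSUMED rule, not the package's documented one:
// lowercase letters, digits and underscores, no leading digit, max 63 bytes.
var validSchemaName = regexp.MustCompile(`^[a-z_][a-z0-9_]{0,62}$`)

// schemaOrDefault returns name if it passes the assumed rule,
// and falls back to defaultSchema otherwise.
func schemaOrDefault(name string) string {
	if validSchemaName.MatchString(name) {
		return name
	}
	return defaultSchema
}

func main() {
	fmt.Println(schemaOrDefault("tenant_a"))
	fmt.Println(schemaOrDefault(`x"; DROP SCHEMA public`))
}
```

Because an invalid name silently falls back to the default, it is worth checking that the SQL you run targets the schema you intended.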
func ShardValuesForWorkflow ¶
ShardValuesForWorkflow returns all possible shard values for a workflow name.
On Citus, SELECT ... FOR UPDATE SKIP LOCKED must be routed to a single shard, which requires an equality predicate on the distribution column (workflow_name_shard). The worker iterates through all shard values returned by this function to find work.
Example: ShardValuesForWorkflow("my_workflow", 4) returns ["my_workflow_0", "my_workflow_1", "my_workflow_2", "my_workflow_3"]
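The naming scheme in the example above can be sketched as follows. This is an illustrative reimplementation, not the package's source; `shardValues` is a hypothetical name, and treating a non-positive shard count as 1 is an assumption borrowed from the DBConfig.ShardCount documentation.

```go
package main

import "fmt"

// shardValues builds "<workflowName>_<n>" for n in [0, shardCount).
// Hypothetical reimplementation for illustration only.
func shardValues(workflowName string, shardCount int) []string {
	if shardCount <= 0 {
		shardCount = 1 // assumption: same default as DBConfig.ShardCount
	}
	out := make([]string, 0, shardCount)
	for i := 0; i < shardCount; i++ {
		out = append(out, fmt.Sprintf("%s_%d", workflowName, i))
	}
	return out
}

func main() {
	fmt.Println(shardValues("my_workflow", 4))
	// prints [my_workflow_0 my_workflow_1 my_workflow_2 my_workflow_3]
}
```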
Types ¶
type Client ¶
type Client struct {
Codec Codec
Now func() time.Time
DBConfig DBConfig
// NotifyChannel overrides the Postgres channel name used for pg_notify.
// If empty, a safe default is used.
NotifyChannel string
}
Client is used by API servers to start runs and publish events.
All methods have a `Tx` variant so you can call them inside your own transaction.
type Codec ¶
Codec controls serialization of inputs/outputs/events.
Default is JSONCodec (stored as jsonb).
Implementations should be deterministic: same value => same bytes.
type Context ¶
type Context struct {
// contains filtered or unexported fields
}
Context is passed to workflow code. It provides replay-safe primitives.
All reads/writes are performed inside the `pgx.Tx` associated with the current worker attempt.
type CronSchedule ¶ added in v0.0.5
type CronSchedule struct {
// contains filtered or unexported fields
}
CronSchedule is a parsed 5-field cron expression.
Fields: minute hour day-of-month month day-of-week
Supported syntax per field:
- `*`: any value
- `N`: specific value
- `N-M`: range (inclusive)
- `N,M,P`: list
- `*/N`: every N
- `N-M/S`: range with step
Day-of-week: 0=Sunday, 1=Monday, ..., 6=Saturday (7 is also Sunday).
Special rules (POSIX):
- If both day-of-month and day-of-week are restricted (not *), the schedule fires when EITHER matches.
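The day-matching rule is easy to get wrong, so here is a small sketch of it. The helpers `normalizeDOW` and `dayMatches` are hypothetical and do not reflect how CronSchedule is implemented internally; they only encode the two rules stated above.

```go
package main

import "fmt"

// normalizeDOW maps the day-of-week value 7 to 0, since both mean Sunday.
func normalizeDOW(d int) int {
	if d == 7 {
		return 0
	}
	return d
}

// dayMatches applies the POSIX rule: when both day fields are restricted,
// a match on EITHER is enough; otherwise only the restricted field counts.
func dayMatches(domRestricted, dowRestricted, domMatch, dowMatch bool) bool {
	switch {
	case domRestricted && dowRestricted:
		return domMatch || dowMatch
	case domRestricted:
		return domMatch
	case dowRestricted:
		return dowMatch
	default:
		return true // both fields are *, every day matches
	}
}

func main() {
	// "0 0 13 * 5": the 13th of the month OR any Friday.
	// A Friday that is not the 13th still fires.
	fmt.Println(dayMatches(true, true, false, true))
	fmt.Println(normalizeDOW(7))
}
```

In other words, an expression such as `0 0 13 * 5` fires on every 13th and on every Friday, not only on Friday the 13th.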
func (*CronSchedule) Next ¶ added in v0.0.5
func (s *CronSchedule) Next(after time.Time) time.Time
Next returns the earliest time strictly after `after` that matches the schedule.
func (*CronSchedule) String ¶ added in v0.0.7
func (s *CronSchedule) String() string
String returns the original 5-field cron expression.
type DBConfig ¶
type DBConfig struct {
// Schema is the Postgres schema containing the flows tables.
// If empty, DefaultSchema is used.
Schema string
// ShardCount controls how many shards a workflow can spread across.
//
// When using Citus (or any sharded Postgres setup), Flows routes writes/updates
// using a distribution column `workflow_name_shard`, derived as
// `workflow_name_<n>` where n is in [0, ShardCount).
//
// If ShardCount is <= 0, it defaults to 1.
ShardCount int
}
DBConfig configures where Flows stores its tables.
type DBTX ¶ added in v0.0.4
type DBTX interface {
Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error)
QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
}
DBTX is the minimal database interface required by Flows helper functions.
It is intentionally small so callers can pass either a transaction (pgx.Tx) or a direct connection/pool that supports Exec and QueryRow.
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry maps workflow names to registered handlers.
Registration is type-safe; execution is dynamic (by workflow name from DB).
func NewRegistry ¶
func NewRegistry() *Registry
type RetryPolicy ¶
type RetryPolicy struct {
// MaxRetries is the maximum number of retry attempts after the initial attempt.
// A value of 0 means no retries (only the initial attempt).
MaxRetries int
// Backoff returns the duration in milliseconds to wait before retry attempt n.
// If nil, no backoff is applied between retries.
Backoff func(attempt int) (durationMillis int)
// StepTimeout is the maximum duration a single step execution may take.
// If zero, no timeout is applied. When a step times out, it counts as a
// failed attempt and may be retried according to MaxRetries.
StepTimeout time.Duration
}
RetryPolicy controls in-process retries for a single step invocation.
Note: retries happen inside a single worker attempt; durable memoization stores only successful outputs.
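As an example of a function that fits the Backoff field's shape, `func(attempt int) (durationMillis int)`, here is a capped exponential backoff sketch. `expBackoffMillis` is a hypothetical helper, and the example assumes retry attempts are numbered from 1, which the documentation above does not state.

```go
package main

import "fmt"

// expBackoffMillis returns a function with the same shape as
// RetryPolicy.Backoff. The delay starts at baseMillis and doubles per
// attempt up to capMillis. ASSUMPTION: attempts are numbered from 1.
func expBackoffMillis(baseMillis, capMillis int) func(attempt int) int {
	return func(attempt int) int {
		d := baseMillis
		// Stop doubling once the cap is reached to avoid integer overflow.
		for i := 1; i < attempt && d < capMillis; i++ {
			d *= 2
		}
		if d > capMillis {
			d = capMillis
		}
		return d
	}
}

func main() {
	backoff := expBackoffMillis(100, 5000)
	for attempt := 1; attempt <= 8; attempt++ {
		fmt.Printf("attempt %d: wait %dms\n", attempt, backoff(attempt))
	}
}
```

The returned function could be assigned to the Backoff field alongside MaxRetries and StepTimeout. Keep in mind that the total wait across retries happens inside one worker attempt.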
type RunKey ¶
RunKey uniquely identifies a run within a sharded Postgres setup.
In sharded deployments (e.g., Citus), all writes and updates are routed using (workflow_name_shard, run_id) to avoid scatter queries.
type RunStatus ¶
type RunStatus struct {
Status string // queued, running, sleeping, waiting_event, completed, failed, cancelled
Error string // error message if failed
CreatedAt time.Time
UpdatedAt time.Time
NextWakeAt *time.Time // when the run will wake (for sleeping status)
}
RunStatus represents the current state of a workflow run.
type Schedule ¶ added in v0.0.5
type Schedule interface {
// Next returns the earliest time after `after` at which the schedule fires.
// Returns the zero time if no future fire time exists (shouldn't happen for valid schedules).
Next(after time.Time) time.Time
}
Schedule determines when the next workflow run should be created.
Implementations must be deterministic: the same (after) input must produce the same output.
func Every ¶ added in v0.0.5
Every returns a Schedule that fires at fixed intervals.
Example:
flows.Every(5 * time.Minute)
func MustParseCron ¶ added in v0.0.5
MustParseCron is like ParseCron but panics on error.
type ScheduleOption ¶ added in v0.0.7
type ScheduleOption func(*scheduleOptions)
ScheduleOption configures a ScheduleTx call.
func WithRunNow ¶ added in v0.0.7
func WithRunNow() ScheduleOption
WithRunNow causes ScheduleTx to also create an immediate run in the same transaction, in addition to setting up the recurring schedule.
type StepPanicError ¶
StepPanicError wraps a panic that occurred during step execution.
func (StepPanicError) Error ¶
func (e StepPanicError) Error() string
type Worker ¶
type Worker struct {
Pool *pgxpool.Pool
Registry *Registry
Codec Codec
PollInterval time.Duration
// MaxPollInterval caps adaptive backoff after consecutive empty polls.
// If zero, workers back off up to 30s when LISTEN/NOTIFY is enabled.
// When notifications are disabled, polling stays at PollInterval.
MaxPollInterval time.Duration
DBConfig DBConfig
// DisableNotify disables LISTEN/NOTIFY wakeups. Polling remains enabled.
DisableNotify bool
// NotifyChannel overrides the Postgres channel name used for LISTEN/NOTIFY.
// If empty, a safe default is used.
NotifyChannel string
// GracefulShutdownTimeout is the maximum time to wait for in-progress runs
// to complete when the worker is shutting down. If zero, the worker will
// wait indefinitely for all in-progress work to complete.
GracefulShutdownTimeout time.Duration
// contains filtered or unexported fields
}
Worker polls Postgres for runnable workflow runs and executes them.
A run is runnable when:
- status = 'queued', or
- status = 'sleeping' and next_wake_at <= now().
Each registered workflow type uses one dispatcher goroutine plus a bounded executor pool with configurable concurrency. This ensures busy workflows don't block other workflow types while avoiding duplicated DB polling.
Citus Compatibility ¶
The runs table is distributed by workflow_name_shard (the distribution column). Citus requires SELECT ... FOR UPDATE SKIP LOCKED to be routed to a single shard, which means the query must include an equality predicate on workflow_name_shard.
Each workflow's shards are derived from: workflow_name + "_" + shard_index (e.g., "my_workflow_0", "my_workflow_1", ...).
The worker iterates through all shards for each workflow, using round-robin rotation to prevent starvation when some shards have more work than others.
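One way to picture the round-robin rotation is to start each polling pass at a different shard, so that a busy first shard cannot monopolize the worker. The sketch below is an assumption about the general technique, not the worker's actual code; `rotated` is a hypothetical helper.

```go
package main

import "fmt"

// rotated returns the shards in round-robin order beginning at index start.
// Each shard value would then be used as the equality predicate on
// workflow_name_shard for one single-shard claim query.
func rotated(shards []string, start int) []string {
	n := len(shards)
	out := make([]string, 0, n)
	if n == 0 {
		return out
	}
	start = ((start % n) + n) % n // normalize, including negative values
	for i := 0; i < n; i++ {
		out = append(out, shards[(start+i)%n])
	}
	return out
}

func main() {
	shards := []string{"my_workflow_0", "my_workflow_1", "my_workflow_2"}
	for pass := 0; pass < 4; pass++ {
		fmt.Println(pass, rotated(shards, pass))
	}
}
```

Advancing the start index by one on every pass gives each shard a turn at being polled first.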
func (*Worker) ProcessOne ¶
ProcessOne claims and executes at most one runnable run from any registered workflow. Returns processed=false if no runnable runs exist. This is useful for testing or when fine-grained control over processing is needed.
func (*Worker) Run ¶
Run starts the worker and blocks until ctx is cancelled. Each registered workflow type gets one dispatcher goroutine that claims work from Postgres and a bounded in-process executor pool that runs claimed work. This preserves per-workflow concurrency without multiplying DB polling.
type Workflow ¶
type Workflow[I any, O any] interface {
Name() string
Run(ctx context.Context, wf *Context, in *I) (*O, error)
}
Workflow is a durable workflow definition.
The runtime re-executes the workflow function on resume. Use Context primitives (Execute/Sleep/WaitForEvent/Random*) to get replay-safe behavior.
type WorkflowOption ¶
type WorkflowOption func(*workflowOptions)
WorkflowOption configures workflow registration.
func WithCodec ¶
func WithCodec(codec Codec) WorkflowOption
WithCodec sets a custom codec for the workflow. If not set, JSONCodec is used.
func WithConcurrency ¶
func WithConcurrency(n int) WorkflowOption
WithConcurrency sets the number of concurrent goroutines for this workflow. Default is 1. Each workflow type runs in its own goroutine pool.
type WorkflowPanicError ¶
WorkflowPanicError wraps a panic that occurred during workflow execution.
This is distinct from StepPanicError: step panics are caught inside Execute, while this covers panics in the workflow function itself.
func (WorkflowPanicError) Error ¶
func (e WorkflowPanicError) Error() string