Documentation
¶
Index ¶
- Constants
- Variables
- func Await[T any](ctx context.Context, wf *Workflow, dagID string, step *Step) (T, error)
- func AwaitByID[T any](ctx context.Context, wf *Workflow, dagID, stepID string) (T, error)
- func Cancel(ctx context.Context, wf *Workflow, dagID string) error
- func CurrentTarget(ctx context.Context) string
- func CurrentWorkerID(ctx context.Context) string
- func DAGInfo(ctx context.Context, wf *Workflow, dagID string) (DAGMeta, []StepRecord, error)
- func DebugPrint(ctx context.Context, wf *Workflow, dagID string, w io.Writer) error
- func DeleteDAG(ctx context.Context, wf *Workflow, dagID string) error
- func EventSubject(e Event) string
- func IsRef(raw json.RawMessage) bool
- func MarshalEvent(e Event) ([]byte, error)
- func ResolveArgs(rawArgs []json.RawMessage, results map[string]json.RawMessage, ...) (resolved []json.RawMessage, shouldSkip bool, err error)
- func WriteDebug(w io.Writer, dbg DAGDebug) error
- type ContextDAG
- type DAG
- type DAGDebug
- type DAGMeta
- type DAGOption
- type DAGState
- func (s *DAGState) AddStep(rec StepRecord) error
- func (s *DAGState) MarkDone(stepID string) ([]string, error)
- func (s *DAGState) MarkFailed(stepID, errorKind string) (newlyReady, newlySkipped []string, err error)
- func (s *DAGState) MarkSkipped(stepID string) ([]string, error)
- func (s *DAGState) ReadyToRun() []string
- func (s *DAGState) Terminal() (DAGStatus, bool)
- type DAGStatus
- type Enqueuer
- type Event
- type EventBus
- type EventKind
- type LeaderElector
- type MemBus
- type MemStore
- func (s *MemStore) DeleteMeta(_ context.Context, dagID string) error
- func (s *MemStore) DeleteResult(_ context.Context, dagID, stepID string) error
- func (s *MemStore) DeleteStep(_ context.Context, dagID, stepID string) error
- func (s *MemStore) GetMeta(_ context.Context, dagID string) (DAGMeta, uint64, error)
- func (s *MemStore) GetResult(_ context.Context, dagID, stepID string) ([]byte, error)
- func (s *MemStore) GetStep(_ context.Context, dagID, stepID string) (StepRecord, uint64, error)
- func (s *MemStore) ListDAGs(_ context.Context) ([]DAGMeta, error)
- func (s *MemStore) ListSteps(_ context.Context, dagID string) ([]StepRecord, error)
- func (s *MemStore) PutMeta(_ context.Context, dagID string, meta DAGMeta, expectedRev uint64) error
- func (s *MemStore) PutResult(_ context.Context, dagID, stepID string, data []byte) error
- func (s *MemStore) PutStep(_ context.Context, dagID, stepID string, rec StepRecord, expectedRev uint64) error
- func (s *MemStore) WatchResult(ctx context.Context, dagID, stepID string) (<-chan []byte, error)
- type NatsBus
- type NatsEnqueuer
- type NatsStore
- func (s *NatsStore) DeleteMeta(ctx context.Context, dagID string) error
- func (s *NatsStore) DeleteResult(ctx context.Context, dagID, stepID string) error
- func (s *NatsStore) DeleteStep(ctx context.Context, dagID, stepID string) error
- func (s *NatsStore) GetMeta(ctx context.Context, dagID string) (DAGMeta, uint64, error)
- func (s *NatsStore) GetResult(ctx context.Context, dagID, stepID string) ([]byte, error)
- func (s *NatsStore) GetStep(ctx context.Context, dagID, stepID string) (StepRecord, uint64, error)
- func (s *NatsStore) ListDAGs(ctx context.Context) ([]DAGMeta, error)
- func (s *NatsStore) ListSteps(ctx context.Context, dagID string) ([]StepRecord, error)
- func (s *NatsStore) PutMeta(ctx context.Context, dagID string, meta DAGMeta, expectedRev uint64) error
- func (s *NatsStore) PutResult(ctx context.Context, dagID, stepID string, data []byte) error
- func (s *NatsStore) PutStep(ctx context.Context, dagID, stepID string, rec StepRecord, expectedRev uint64) error
- func (s *NatsStore) WatchResult(ctx context.Context, dagID, stepID string) (<-chan []byte, error)
- type PlacementMode
- type PlacementSpec
- type Ref
- type RefMode
- type Scheduler
- type StateStore
- type Step
- type StepBlocker
- type StepDebug
- type StepHook
- type StepOption
- func After(steps ...*Step) StepOption
- func AfterAny(steps ...*Step) StepOption
- func ColocateHere() StepOption
- func ColocateWith(step *Step) StepOption
- func FollowTargetOf(step *Step) StepOption
- func OnTarget(target string) StepOption
- func Optional() StepOption
- func WithStepRetry(p task.RetryPolicy) StepOption
- type StepRecord
- type StepStatus
- type Subscription
- type Workflow
Constants ¶
const (
DAGEventsStream = "EBIND_DAG_EVENTS"
)
Variables ¶
var ErrCycle = errors.New("workflow: cycle detected")
ErrCycle is returned by DAG.Submit if the graph contains a cycle.
var ErrDAGCanceled = errors.New("workflow: DAG canceled")
ErrDAGCanceled is returned when a DAG no longer accepts new work because it was canceled.
var ErrDAGNotFound = errors.New("workflow: DAG not found")
ErrDAGNotFound is returned when a DAG ID has no meta record in the store.
var ErrDuplicateStep = errors.New("workflow: duplicate step ID")
ErrDuplicateStep is returned by DAG.Step when the step ID is already used.
var ErrStaleRevision = errors.New("workflow: stale revision")
ErrStaleRevision is returned by stores when a CAS operation finds a newer revision.
var ErrStepCanceled = errors.New("workflow: step canceled")
ErrStepCanceled is returned by Await when the target step was canceled before it started.
var ErrStepFailed = errors.New("workflow: step failed")
ErrStepFailed is returned by Await when the target step (or an upstream mandatory step whose failure cascaded) ended in a failed status.
var ErrStepNotFound = errors.New("workflow: step not found")
ErrStepNotFound is returned when a step ID is not registered in the DAG.
var ErrStepSkipped = errors.New("workflow: step skipped")
ErrStepSkipped is returned by Await when the target step was skipped because an upstream step this step depended on (via Ref, not RefOrDefault) failed or was skipped.
Functions ¶
func Await ¶
Await blocks until the target step's result is available in the store (or the step has terminally failed/been skipped). Decodes the result into out.
Usage: result, err := workflow.Await[Profile](ctx, wf, dagID, stepRef)
Returns:
- ErrStepFailed if the step ended in status=failed
- ErrStepSkipped if the step was skipped (upstream cascade)
- context error if ctx expires before resolution
Await is a thin wrapper over AwaitByID; prefer AwaitByID when the caller does not own the *Step handle (e.g., a different process resuming a DAG).
func AwaitByID ¶
AwaitByID is the stateless variant of Await: it takes the step ID as a string so a process that did not build the DAG can still wait on a specific step's result. Use this to resume from a different instance — persist the (dagID, stepID) pair from the submitter and call AwaitByID from the resumer.
Semantics are identical to Await.
func Cancel ¶
Cancel marks a DAG as canceled and prevents any new steps from being enqueued. Pending steps are transitioned to canceled immediately. Running steps are left untouched and may still finish, but their completion does not schedule follow-on work.
func CurrentTarget ¶
CurrentTarget returns the current task target from workflow handler context.
func CurrentWorkerID ¶
CurrentWorkerID returns the concrete worker handling the current workflow step.
func DAGInfo ¶
DAGInfo returns the current meta + step states for introspection. Useful in tests and for admin UIs.
func DebugPrint ¶
DebugPrint writes a human-readable report of the DAG to w.
func DeleteDAG ¶
DeleteDAG removes all KV records for a DAG: each step, each result, and the meta. Safe to call against a non-existent DAG (returns ErrDAGNotFound if meta is missing — but only after step/result cleanup has been attempted).
Callers that want to prevent accidental deletion of in-flight work should inspect DAGMeta.Status first and refuse unless it's terminal.
func EventSubject ¶
EventSubject builds the on-the-wire subject for an event.
func IsRef ¶
func IsRef(raw json.RawMessage) bool
IsRef inspects a raw JSON value to see if it's a Ref envelope. Fast path: checks for the marker string without a full decode.
func MarshalEvent ¶
MarshalEvent serializes an Event for on-wire delivery (without Ack/Nak).
func ResolveArgs ¶
func ResolveArgs( rawArgs []json.RawMessage, results map[string]json.RawMessage, statuses map[string]StepStatus, ) (resolved []json.RawMessage, shouldSkip bool, err error)
ResolveArgs substitutes each Ref in rawArgs with the actual upstream result bytes from results (keyed by step ID). Returns one of:
- resolved args (all Refs substituted; literal values unchanged)
- ShouldSkip=true if any RefModeRequired pointed at a failed/skipped upstream (caller should mark the step skipped and cascade)
- error if a required upstream result is missing or a Ref is malformed
ResolveArgs is PURE — no IO, deterministic. 100% unit-testable.
Types ¶
type ContextDAG ¶
type ContextDAG struct {
// contains filtered or unexported fields
}
ContextDAG is the in-handler handle for dynamically adding steps to the currently-running DAG. The newly added step depends on the current step (implicitly) plus any explicit Refs passed as args.
func FromContext ¶
func FromContext(ctx context.Context) *ContextDAG
FromContext retrieves the ContextDAG injected by ContextMiddleware. Returns nil if called outside a DAG-dispatched handler (so callers can no-op).
func (*ContextDAG) Step ¶
Step adds a new step to the current DAG. The new step implicitly depends on the currently-running step (so it won't run until the current handler returns successfully). Returns a Step ref usable for chaining further dynamic steps.
func (*ContextDAG) StepOpts ¶
func (c *ContextDAG) StepOpts(id string, fn any, opts []StepOption, args ...any) (*Step, error)
StepOpts adds a step with StepOption support (Optional, WithStepRetry, After, AfterAny). The new step implicitly depends on the currently-running step, plus any explicit Refs in args and any explicit After()/AfterAny() upstream steps.
type DAG ¶
type DAG struct {
// contains filtered or unexported fields
}
DAG is the user-facing workflow builder. Not safe for concurrent Step() calls. Submit() is one-shot.
func (*DAG) Step ¶
Step adds a node to the DAG. id must be unique within the DAG. fn must satisfy the same signature contract as task.Register (ctx first, (T, error) or just error return). args match fn's signature; upstream step outputs are referenced via Step.Ref() or Step.RefOrDefault(v).
type DAGDebug ¶
type DAGDebug struct {
Meta DAGMeta
Steps []StepDebug // ordered by AddedAt (tie-breaker: StepID)
Counts map[StepStatus]int // count of steps in each status
TotalDuration time.Duration // max(FinishedAt) - Meta.CreatedAt; 0 while running
Blockers []StepBlocker // Pending steps still waiting on non-terminal deps
}
DAGDebug is a structured snapshot of a DAG's state suitable for logs, dashboards, or debug endpoints. See Debug() for how to obtain one.
type DAGMeta ¶
type DAGMeta struct {
ID string `json:"id"`
Status DAGStatus `json:"status"`
CreatedAt time.Time `json:"created_at"`
DefaultPolicy *task.RetryPolicy `json:"default_policy,omitempty"`
TerminalSteps []string `json:"terminal_steps,omitempty"`
}
DAGMeta is the meta record stored in the state store (key: <dag_id>/meta).
type DAGOption ¶
type DAGOption func(*DAG)
DAGOption configures a DAG at construction time.
func WithRetry ¶
func WithRetry(p task.RetryPolicy) DAGOption
WithRetry sets the default retry policy for all steps in this DAG. A step can override via WithStepRetry.
type DAGState ¶
type DAGState struct {
Meta DAGMeta
Steps map[string]StepRecord // stepID -> record
}
DAGState is the in-memory view of a DAG — loaded from the store at scheduler evaluation time. All transition methods on DAGState are PURE (return data, no IO).
func (*DAGState) AddStep ¶
func (s *DAGState) AddStep(rec StepRecord) error
AddStep inserts a new step record into the state (used for dynamic DAGs). Returns error on duplicate ID.
func (*DAGState) MarkDone ¶
MarkDone transitions a step to `done` status (caller must also PutResult in the store). Returns step IDs that became ReadyToRun as a result.
func (*DAGState) MarkFailed ¶
func (s *DAGState) MarkFailed(stepID, errorKind string) (newlyReady, newlySkipped []string, err error)
MarkFailed transitions to `failed` (idempotent) and cascade-skips all downstream steps whose required deps are unsatisfied by the failure. Returns (newlyReady, newlySkipped).
func (*DAGState) MarkSkipped ¶
MarkSkipped transitions to `skipped` (idempotent) and cascade-skips downstream.
func (*DAGState) ReadyToRun ¶
ReadyToRun returns step IDs whose deps are all `done` (or satisfied-via-default) and whose status is `pending`. Used after MarkDone/MarkFailed to find next work.
type Enqueuer ¶
Enqueuer is the narrow interface used by the scheduler to place a ready step on the TASKS stream. Keeps scheduler logic decoupled from NATS specifics — tests pass a capture fake, production wires to JetStream via Workflow.NewFromNATS.
type Event ¶
type Event struct {
Kind EventKind `json:"kind"`
DAGID string `json:"dag_id"`
StepID string `json:"step_id"`
Status StepStatus `json:"status,omitempty"` // for EventCompleted
ErrorKind string `json:"error_kind,omitempty"`
// Ack acknowledges the event has been processed. Subscribers must call this
// on every delivered event. Nak redelivers after the implementation-specific delay.
Ack func() error `json:"-"`
Nak func() error `json:"-"`
}
Event is the payload delivered by EventBus to scheduler subscribers.
func UnmarshalEvent ¶
UnmarshalEvent deserializes a wire payload into an Event. Ack/Nak are not populated here; the EventBus implementation must attach them before invoking the handler.
type EventBus ¶
type EventBus interface {
// Publish sends a single event payload on the given subject.
// Subjects follow the convention DAG.<dag_id>.<kind>.<step_id>.
Publish(ctx context.Context, subject string, data []byte) error
// Subscribe delivers events matching the filter. The handler must call
// Event.Ack or Event.Nak exactly once per delivery.
Subscribe(ctx context.Context, filter string, handler func(Event)) (Subscription, error)
}
EventBus carries scheduler events between workers.
type LeaderElector ¶
type LeaderElector interface {
IsLeader() bool
}
LeaderElector is the user-provided contract for gating scheduler processing. Every worker starts a scheduler loop; only the current leader drives state mutations to avoid races during failover. nil means "always leader."
type MemBus ¶
type MemBus struct {
// contains filtered or unexported fields
}
MemBus is an in-memory EventBus for tests. It fans out each published event to all matching subscribers. Ack/Nak are no-ops since nothing's durable.
type MemStore ¶
type MemStore struct {
// contains filtered or unexported fields
}
MemStore is an in-memory StateStore suitable for tests and local dev. It emulates NATS KV's revision-based CAS: each Put* increments the stored revision; callers must pass the matching expectedRev or get ErrStaleRevision.
func NewMemStore ¶
func NewMemStore() *MemStore
func (*MemStore) DeleteResult ¶
func (*MemStore) DeleteStep ¶
type NatsBus ¶
type NatsBus struct {
// contains filtered or unexported fields
}
NatsBus is a JetStream-backed EventBus. Events durable in a dedicated stream; subscribers use a shared durable consumer so each event is handled once cluster-wide.
func NewNatsBus ¶
NewNatsBus creates (or opens) the DAG events stream.
type NatsEnqueuer ¶
type NatsEnqueuer struct {
// contains filtered or unexported fields
}
NatsEnqueuer publishes a task.Task envelope to the TASKS stream. The envelope is marshaled and sent with Nats-Msg-Id = task.ID for dedupe.
func NewNatsEnqueuer ¶
func NewNatsEnqueuer(js jetstream.JetStream) *NatsEnqueuer
type NatsStore ¶
type NatsStore struct {
// contains filtered or unexported fields
}
NatsStore is a JetStream KV-backed StateStore. Uses KV's built-in revision numbers for CAS; the revision returned by GetX matches what PutX expects in expectedRev.
func NewNatsStore ¶
NewNatsStore creates (or opens) the KV bucket and returns a NatsStore.
func (*NatsStore) DeleteMeta ¶
func (*NatsStore) DeleteResult ¶
func (*NatsStore) DeleteStep ¶
type PlacementMode ¶
type PlacementMode string
const ( PlacementDirect PlacementMode = "direct" PlacementColocate PlacementMode = "colocate_with" PlacementFollow PlacementMode = "follow_target_of" PlacementHere PlacementMode = "colocate_here" )
type PlacementSpec ¶
type PlacementSpec struct {
Mode PlacementMode `json:"mode,omitempty"`
Target string `json:"target,omitempty"`
StepID string `json:"step_id,omitempty"`
}
PlacementSpec describes how a step chooses its execution target.
type Ref ¶
type Ref struct {
StepID string `json:"step_id"`
Mode RefMode `json:"mode"`
Default json.RawMessage `json:"default,omitempty"` // used when Mode == RefOrDefault
}
Ref marks a position in a step's argument list that should be substituted with another step's result at scheduling time. Refs are JSON-serialized into the step record's ArgsJSON; the scheduler fetches the upstream result from the store and substitutes it in place before enqueueing.
The magic key "__ebind_ref__" makes Refs distinguishable from ordinary JSON values. The Mode controls what happens when the upstream step failed/skipped.
func DecodeRef ¶
func DecodeRef(raw json.RawMessage) (*Ref, bool)
DecodeRef parses a raw JSON value as a Ref if it is one; otherwise returns (nil, false).
func (Ref) MarshalJSON ¶
MarshalJSON serializes Ref using the refEnvelope shape so scheduler can find it.
func (*Ref) UnmarshalJSON ¶
UnmarshalJSON accepts the refEnvelope shape.
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler consumes completion + step-added events, advances DAG state, and enqueues newly-ready steps. Every worker starts one; leader gating via Workflow.Elector ensures at-most-one processes events at a time across workers; within a single scheduler, mu serializes event handling to avoid CAS races.
On a false→true edge of IsLeader(), the scheduler runs a full sweep of all in-flight DAGs to re-enqueue any ready-but-stranded steps.
type StateStore ¶
type StateStore interface {
GetStep(ctx context.Context, dagID, stepID string) (StepRecord, uint64, error)
PutStep(ctx context.Context, dagID, stepID string, rec StepRecord, expectedRev uint64) error
ListSteps(ctx context.Context, dagID string) ([]StepRecord, error)
GetResult(ctx context.Context, dagID, stepID string) ([]byte, error)
PutResult(ctx context.Context, dagID, stepID string, data []byte) error
GetMeta(ctx context.Context, dagID string) (DAGMeta, uint64, error)
PutMeta(ctx context.Context, dagID string, meta DAGMeta, expectedRev uint64) error
// ListDAGs enumerates all stored DAG meta records. Used by the scheduler's
// sweep on leader acquisition to find stranded ready steps across all running DAGs.
ListDAGs(ctx context.Context) ([]DAGMeta, error)
// DeleteMeta, DeleteStep, DeleteResult remove records. Missing keys are not errors
// (delete is idempotent). Callers coordinating a DAG-level delete should use
// DeleteDAG which composes these in the correct order.
DeleteMeta(ctx context.Context, dagID string) error
DeleteStep(ctx context.Context, dagID, stepID string) error
DeleteResult(ctx context.Context, dagID, stepID string) error
// WatchResult streams a single message once a result is written at the given key.
// Implementations should close the channel on cancel or immediately if the result
// already exists (sending the existing value first).
WatchResult(ctx context.Context, dagID, stepID string) (<-chan []byte, error)
}
StateStore persists DAG + step + result records with CAS semantics. All PutX(..., expectedRev) operations return ErrStaleRevision if the stored record has a different revision. expectedRev == 0 means "create if absent".
The interface is designed so the scheduler logic remains pure — all IO flows through StateStore, EventBus, and Enqueuer. Tests use store_mem.go fakes.
type Step ¶
type Step struct {
// contains filtered or unexported fields
}
Step represents a single node in a DAG. Returned by DAG.Step for the caller to chain into dependent steps via Ref() or RefOrDefault().
func (*Step) Ref ¶
Ref returns a Required-mode reference. Downstream steps using this Ref will be cascade-skipped if this step fails or is itself skipped.
func (*Step) RefOrDefault ¶
RefOrDefault returns an OrDefault-mode reference. If this step fails or is skipped, the downstream step runs with the provided default value substituted for this step's output.
type StepBlocker ¶
StepBlocker names a Pending step and lists its non-terminal deps (required and optional combined). Useful for "why isn't this step running yet?".
type StepDebug ¶
type StepDebug struct {
StepRecord
QueueDuration time.Duration // StartedAt - AddedAt (0 if step never started)
ExecDuration time.Duration // FinishedAt - StartedAt (0 if step never finished)
}
StepDebug enriches StepRecord with computed per-step durations.
type StepHook ¶
type StepHook struct {
// contains filtered or unexported fields
}
StepHook implements worker.StepHook by persisting the step outcome to the state store and publishing a completion event to the bus. The scheduler consumes these events to advance the DAG.
func (*StepHook) OnStepDone ¶
OnStepDone marks the step as done, writes the result, and publishes a `completed` event with Status=done.
type StepOption ¶
type StepOption func(*Step)
StepOption configures a Step at construction time.
func After ¶
func After(steps ...*Step) StepOption
After declares explicit temporal-only dependencies on the given upstream steps. This step waits until every upstream is terminal (done/failed/skipped) before it runs. If any upstream ended in failed/skipped, this step is cascade-skipped (same semantics as referencing via Ref()).
Use After when you need ordering but the current step's handler doesn't consume any upstream output. Equivalent to adding a Ref(upstream) arg that the handler ignores, but without contaminating the handler's signature.
func AfterAny ¶
func AfterAny(steps ...*Step) StepOption
AfterAny declares optional temporal-only dependencies. This step waits until every upstream is terminal (done/failed/skipped) but runs regardless of whether they succeeded. Upstream failure does NOT cascade-skip this step.
Use AfterAny for ordering a "best-effort" or cleanup step that should run after some other work, whether that work succeeded or not.
func ColocateHere ¶
func ColocateHere() StepOption
ColocateHere is only valid for dynamic steps created from a running handler via workflow.FromContext(ctx).StepOpts(...). It binds the new step to the concrete worker that is currently executing.
func ColocateWith ¶
func ColocateWith(step *Step) StepOption
ColocateWith binds a step to the concrete worker that executed another step. The dependency is also made explicit so the referenced step has already run by the time placement is resolved.
func FollowTargetOf ¶
func FollowTargetOf(step *Step) StepOption
FollowTargetOf binds a step to the same logical target expression as another step, but re-resolves that target at execution time.
func OnTarget ¶
func OnTarget(target string) StepOption
OnTarget binds a step to a logical or concrete target claim.
func Optional ¶
func Optional() StepOption
Optional marks a step as non-critical — its failure does not fail the DAG. Downstream steps choose whether to cascade-skip (via Ref) or substitute (via RefOrDefault).
func WithStepRetry ¶
func WithStepRetry(p task.RetryPolicy) StepOption
WithStepRetry overrides the DAG's default retry policy for this step.
type StepRecord ¶
type StepRecord struct {
DAGID string `json:"dag_id"`
StepID string `json:"step_id"`
FnName string `json:"fn_name"`
ArgsJSON json.RawMessage `json:"args_json"` // JSON array; may contain Refs
Deps []string `json:"deps,omitempty"`
OptionalDeps []string `json:"optional_deps,omitempty"`
Status StepStatus `json:"status"`
Attempt int `json:"attempt"`
ErrorKind string `json:"error_kind,omitempty"`
Optional bool `json:"optional,omitempty"`
Policy *task.RetryPolicy `json:"policy,omitempty"` // per-step override
Placement *PlacementSpec `json:"placement,omitempty"`
WorkerID string `json:"worker_id,omitempty"`
AddedAt time.Time `json:"added_at"`
StartedAt time.Time `json:"started_at,omitempty"`
FinishedAt time.Time `json:"finished_at,omitempty"`
}
StepRecord is stored per-step (key: <dag_id>/step/<step_id>).
Dependency model:
- Deps: required deps. Step waits until each is terminal. If any ends as failed/skipped AND the args don't contain a RefOrDefault on it, this step is cascade-skipped. Combines Ref-derived deps from args + After() options.
- OptionalDeps: "wait for completion but don't cascade on failure." Step waits until each is terminal; failure never causes cascade. Populated by AfterAny() options.
func (StepRecord) IsTerminal ¶
func (s StepRecord) IsTerminal() bool
IsTerminal returns true if this step's status cannot change anymore.
type StepStatus ¶
type StepStatus string
StepStatus is used by ResolveArgs to decide how to handle each Ref.
const ( StatusPending StepStatus = "pending" StatusRunning StepStatus = "running" StatusDone StepStatus = "done" StatusFailed StepStatus = "failed" StatusSkipped StepStatus = "skipped" StatusCanceled StepStatus = "canceled" )
type Subscription ¶
type Subscription interface {
Stop() error
}
Subscription is returned by EventBus.Subscribe and is stopped via Stop().
type Workflow ¶
type Workflow struct {
Store StateStore
Bus EventBus
Enq Enqueuer
Elector LeaderElector // nil ⇒ always leader
NakDelay time.Duration // default 1s; used by scheduler when non-leader sees an event
// SweepCheckInterval is the cadence at which the scheduler polls IsLeader()
// to detect leadership acquisition. On a false→true edge it runs a sweep of
// all in-flight DAGs to re-enqueue stranded ready steps. Default 5s.
SweepCheckInterval time.Duration
// SweepTimeout is the max wall-clock a single sweep may take. Default 60s.
SweepTimeout time.Duration
}
Workflow bundles the three IO dependencies (store, bus, enqueuer) plus an optional leader elector for the scheduler loop. DAG.Submit takes one; Scheduler.Run uses one.
func NewFromNATS ¶
NewFromNATS constructs a Workflow backed by the given NATS connection: - NatsStore for step/result persistence (KV bucket ebind-dags) - NatsBus for scheduler events (stream EBIND_DAG_EVENTS) - NatsEnqueuer for publishing tasks on the existing TASKS stream Replicas applies to the KV bucket and events stream; use 1 for single-node dev, 3 for HA cluster.
func NewWorkflow ¶
func NewWorkflow(store StateStore, bus EventBus, enq Enqueuer) *Workflow
NewWorkflow constructs a Workflow with the three dependencies. Defaults: Elector = always-leader, NakDelay = 1s.
func (*Workflow) ContextMiddleware ¶
func (wf *Workflow) ContextMiddleware() worker.Middleware
ContextMiddleware returns a worker.Middleware that injects a ContextDAG into the handler's context for DAG-dispatched tasks. Ad-hoc tasks (no DAGID) are passed through unchanged.
func (*Workflow) Hook ¶
Hook returns a worker.StepHook that persists step outcomes to the store and publishes completion events to the bus. Wire it into worker.Options.StepHook.
func (*Workflow) RunScheduler ¶
RunScheduler drives the scheduler loop — subscribes to completion events, processes ready steps, applies state transitions. Blocks until ctx is canceled.
func (*Workflow) WithElector ¶
func (wf *Workflow) WithElector(le LeaderElector) *Workflow
WithElector replaces the default always-leader elector.