workflow

package
v1.0.1
Warning

This package is not in the latest version of its module.

Published: Jun 8, 2026 License: MIT Imports: 18 Imported by: 0

Documentation ¶

Constants ¶

const (
	DAGEventsStream = "EBIND_DAG_EVENTS"
)
const DefaultMaxStepErrorBytes = 4096

DefaultMaxStepErrorBytes bounds how many bytes of a failed step's error message are persisted into the step record and carried on completion events. Handler errors can be large (wrapped chains, panic text), and the step record is CAS-updated and re-read by the scheduler on every readiness evaluation, so the message is capped. Override via Workflow.MaxStepErrorBytes.
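A minimal sketch of the capping rule, assuming the semantics documented on Workflow.MaxStepErrorBytes (0 selects this default, a negative value drops the message). The helper name capStepError is hypothetical, and backing off to a UTF-8 rune boundary is an assumption of this sketch rather than documented behavior.

```go
package main

import (
	"fmt"
	"unicode/utf8"
)

// defaultMaxStepErrorBytes mirrors the documented DefaultMaxStepErrorBytes value.
const defaultMaxStepErrorBytes = 4096

// capStepError is a hypothetical helper that bounds msg to limit bytes.
// limit == 0 selects the default; limit < 0 drops the message entirely.
// It backs off to a rune boundary so the stored text stays valid UTF-8.
func capStepError(msg string, limit int) string {
	if limit == 0 {
		limit = defaultMaxStepErrorBytes
	}
	if limit < 0 {
		return ""
	}
	if len(msg) <= limit {
		return msg
	}
	cut := limit
	// Step back while msg[cut] is a continuation byte of a multi-byte rune.
	for cut > 0 && !utf8.RuneStart(msg[cut]) {
		cut--
	}
	return msg[:cut]
}

func main() {
	fmt.Println(capStepError("boom: connection reset", 4)) // boom
	fmt.Println(capStepError("héllo", 2))                  // h
	fmt.Println(len(capStepError(string(make([]byte, 5000)), 0)))
}
```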

Variables ¶

var ErrCycle = errors.New("workflow: cycle detected")

ErrCycle is returned by DAG.Submit if the graph contains a cycle.

var ErrDAGCanceled = errors.New("workflow: DAG canceled")

ErrDAGCanceled is returned when a DAG no longer accepts new work because it was canceled.

var ErrDAGNotFound = errors.New("workflow: DAG not found")

ErrDAGNotFound is returned when a DAG ID has no meta record in the store.

var ErrDAGNotPaused = errors.New("workflow: DAG not paused")

ErrDAGNotPaused is returned by Resume when the DAG is not in paused status.

var ErrDAGNotRunning = errors.New("workflow: DAG not running")

ErrDAGNotRunning is returned by Pause when the DAG is not in running status.

var ErrDuplicateStep = errors.New("workflow: duplicate step ID")

ErrDuplicateStep is returned by DAG.Step when the step ID is already used.

var ErrStaleRevision = errors.New("workflow: stale revision")

ErrStaleRevision is returned by stores when a CAS operation finds a newer revision.

var ErrStepCanceled = errors.New("workflow: step canceled")

ErrStepCanceled is returned by Await when the target step was canceled before it started.

var ErrStepFailed = errors.New("workflow: step failed")

ErrStepFailed is returned by Await when the target step (or an upstream mandatory step whose failure cascaded) ended in a failed status.

var ErrStepNotFound = errors.New("workflow: step not found")

ErrStepNotFound is returned when a step ID is not registered in the DAG.

var ErrStepSkipped = errors.New("workflow: step skipped")

ErrStepSkipped is returned by Await when the target step was skipped because an upstream step it depended on via Ref or After (not RefOrDefault or AfterAny) failed or was skipped.

Functions ¶

func Await ¶

func Await[T any](ctx context.Context, wf *Workflow, dagID string, step *Step) (T, error)

Await blocks until the target step's result is available in the store (or the step has terminally failed or been skipped), then decodes the result into a value of type T and returns it.

Usage: result, err := workflow.Await[Profile](ctx, wf, dagID, stepRef)

Returns:

  • ErrStepFailed if the step ended in status=failed
  • ErrStepSkipped if the step was skipped (upstream cascade)
  • ErrStepCanceled if the step was canceled before it started
  • context error if ctx expires before resolution

Await is a thin wrapper over AwaitByID; prefer AwaitByID when the caller does not own the *Step handle (e.g., a different process resuming a DAG).

func AwaitByID ¶

func AwaitByID[T any](ctx context.Context, wf *Workflow, dagID, stepID string) (T, error)

AwaitByID is the stateless variant of Await: it takes the step ID as a string so a process that did not build the DAG can still wait on a specific step's result. Use this to resume from a different instance — persist the (dagID, stepID) pair from the submitter and call AwaitByID from the resumer.

Semantics are identical to Await.

func Cancel ¶

func Cancel(ctx context.Context, wf *Workflow, dagID string) error

Cancel marks a DAG as canceled and prevents any new steps from being enqueued. Pending steps are transitioned to canceled immediately. Running steps are left untouched and may still finish, but their completion does not schedule follow-on work.

func CurrentTarget ¶

func CurrentTarget(ctx context.Context) string

CurrentTarget returns the current task target from workflow handler context.

func CurrentWorkerID ¶

func CurrentWorkerID(ctx context.Context) string

CurrentWorkerID returns the concrete worker handling the current workflow step.

func DAGInfo ¶

func DAGInfo(ctx context.Context, wf *Workflow, dagID string) (DAGMeta, []StepRecord, error)

DAGInfo returns the current meta + step states for introspection. Useful in tests and for admin UIs.

func DebugPrint ¶

func DebugPrint(ctx context.Context, wf *Workflow, dagID string, w io.Writer) error

DebugPrint writes a human-readable report of the DAG to w.

func DeleteDAG ¶

func DeleteDAG(ctx context.Context, wf *Workflow, dagID string) error

DeleteDAG removes all KV records for a DAG: each step, each result, and the meta. It is safe to call against a non-existent DAG: step and result cleanup is attempted first, and ErrDAGNotFound is returned afterwards if the meta record is missing.

Callers that want to prevent accidental deletion of in-flight work should inspect DAGMeta.Status first and refuse unless it's terminal.

func EventSubject ¶

func EventSubject(e Event) string

EventSubject builds the on-the-wire subject for an event.

func IsRef ¶

func IsRef(raw json.RawMessage) bool

IsRef inspects a raw JSON value to see if it's a Ref envelope. Fast path: checks for the marker string without a full decode.

func MarshalEvent ¶

func MarshalEvent(e Event) ([]byte, error)

MarshalEvent serializes an Event for on-wire delivery (without Ack/Nak).

func Pause ¶

func Pause(ctx context.Context, wf *Workflow, dagID string) error

Pause requests a graceful pause of a running DAG. If the DAG has no in-flight steps, it transitions directly to paused. Otherwise it transitions to pausing and the scheduler auto-transitions to paused when the last in-flight step completes. Uses KV CAS with up to 5 retries on stale revision, consistent with Cancel().

Returns ErrDAGNotRunning if the DAG is not in a running state (including already pausing/paused, done, failed, or canceled).

func ResolveArgs ¶

func ResolveArgs(
	rawArgs []json.RawMessage,
	results map[string]json.RawMessage,
	statuses map[string]StepStatus,
) (resolved []json.RawMessage, shouldSkip bool, err error)

ResolveArgs substitutes each Ref in rawArgs with the actual upstream result bytes from results (keyed by step ID). Returns one of:

  • resolved args (all Refs substituted; literal values unchanged)
  • shouldSkip=true if any RefModeRequired Ref pointed at a failed/skipped upstream (caller should mark the step skipped and cascade)
  • error if a required upstream result is missing or a Ref is malformed

ResolveArgs is PURE — no IO, deterministic. 100% unit-testable.

func Resume ¶

func Resume(ctx context.Context, wf *Workflow, dagID string) error

Resume resumes a paused DAG. It transitions the DAG from paused to running, then publishes a synthetic EventResumed on the event bus so the scheduler re-evaluates ready steps through its normal serialized event path. Uses KV CAS with up to 5 retries on stale revision.

Returns ErrDAGNotPaused if the DAG is not in a paused state (including running, done, failed, canceled, or pausing).

func WriteDebug ¶

func WriteDebug(w io.Writer, dbg DAGDebug) error

WriteDebug renders an already-loaded DAGDebug to w. Separated from DebugPrint so callers that have the snapshot can reuse the renderer without re-reading.

Types ¶

type ContextDAG ¶

type ContextDAG struct {
	// contains filtered or unexported fields
}

ContextDAG is the in-handler handle for dynamically adding steps to the currently-running DAG. The newly added step depends on the current step (implicitly) plus any explicit Refs passed as args.

func FromContext ¶

func FromContext(ctx context.Context) *ContextDAG

FromContext retrieves the ContextDAG injected by ContextMiddleware. Returns nil if called outside a DAG-dispatched handler (so callers can no-op).

func (*ContextDAG) Step ¶

func (c *ContextDAG) Step(id string, fn any, args ...any) (*Step, error)

Step adds a new step to the current DAG. The new step implicitly depends on the currently-running step (so it won't run until the current handler returns successfully). Returns a *Step handle usable for chaining further dynamic steps.

func (*ContextDAG) StepOpts ¶

func (c *ContextDAG) StepOpts(id string, fn any, opts []StepOption, args ...any) (*Step, error)

StepOpts adds a step with StepOption support (Optional, WithStepRetry, After, AfterAny). The new step implicitly depends on the currently-running step, plus any explicit Refs in args and any explicit After()/AfterAny() upstream steps.

type DAG ¶

type DAG struct {
	// contains filtered or unexported fields
}

DAG is the user-facing workflow builder. Not safe for concurrent Step() calls. Submit() is one-shot.

func New ¶

func New(opts ...DAGOption) *DAG

New constructs an empty DAG. Build it with Step() then Submit().

func (*DAG) ID ¶

func (d *DAG) ID() string

ID returns the DAG identifier (generated uuid unless WithDAGID was passed).

func (*DAG) Step ¶

func (d *DAG) Step(id string, fn any, args ...any) *Step

Step adds a node to the DAG. id must be unique within the DAG. fn must satisfy the same signature contract as task.Register (ctx first, (T, error) or just error return). args match fn's signature; upstream step outputs are referenced via Step.Ref() or Step.RefOrDefault(v).

func (*DAG) StepOpts ¶

func (d *DAG) StepOpts(id string, fn any, opts []StepOption, args ...any) *Step

StepOpts is the options-flavored variant — same as Step but accepts StepOptions.

func (*DAG) Submit ¶

func (d *DAG) Submit(ctx context.Context, wf *Workflow) error

Submit validates the DAG, persists meta + step records to the store, and enqueues root steps (those with no deps). Submit never overwrites an existing DAG: re-submitting the same DAG ID fails with ErrStaleRevision on the meta write.

type DAGDebug ¶

type DAGDebug struct {
	Meta          DAGMeta
	Steps         []StepDebug        // ordered by AddedAt (tie-breaker: StepID)
	Counts        map[StepStatus]int // count of steps in each status
	TotalDuration time.Duration      // max(FinishedAt) - Meta.CreatedAt; 0 while running
	Blockers      []StepBlocker      // Pending steps still waiting on non-terminal deps
}

DAGDebug is a structured snapshot of a DAG's state suitable for logs, dashboards, or debug endpoints. See Debug() for how to obtain one.

func Debug ¶

func Debug(ctx context.Context, wf *Workflow, dagID string) (DAGDebug, error)

Debug returns a structured snapshot of the given DAG. Safe to call from any process that can reach the StateStore — it's a read-only operation. Returns ErrDAGNotFound if the DAG's meta key is missing.

type DAGMeta ¶

type DAGMeta struct {
	ID            string            `json:"id"`
	Status        DAGStatus         `json:"status"`
	CreatedAt     time.Time         `json:"created_at"`
	DefaultPolicy *task.RetryPolicy `json:"default_policy,omitempty"`
	PausedAt      time.Time         `json:"paused_at,omitempty"`
	TerminalSteps []string          `json:"terminal_steps,omitempty"`
}

DAGMeta is the meta record stored in the state store (key: <dag_id>/meta).

type DAGOption ¶

type DAGOption func(*DAG)

DAGOption configures a DAG at construction time.

func WithDAGID ¶

func WithDAGID(id string) DAGOption

WithDAGID pins an explicit DAG ID (otherwise a uuid is generated).

func WithRetry ¶

func WithRetry(p task.RetryPolicy) DAGOption

WithRetry sets the default retry policy for all steps in this DAG. A step can override via WithStepRetry.

type DAGState ¶

type DAGState struct {
	Meta  DAGMeta
	Steps map[string]StepRecord // stepID -> record
}

DAGState is the in-memory view of a DAG — loaded from the store at scheduler evaluation time. All transition methods on DAGState are PURE (return data, no IO).

func (*DAGState) AddStep ¶

func (s *DAGState) AddStep(rec StepRecord) error

AddStep inserts a new step record into the state (used for dynamic DAGs). Returns error on duplicate ID.

func (*DAGState) CanPause ¶

func (s *DAGState) CanPause() bool

CanPause returns true if the DAG can accept a pause request. Must be running (not already pausing/paused/terminal).

func (*DAGState) CanResume ¶

func (s *DAGState) CanResume() bool

CanResume returns true if the DAG can accept a resume request. Must be paused.

func (*DAGState) HasInFlightSteps ¶

func (s *DAGState) HasInFlightSteps() bool

HasInFlightSteps returns true if any step in the DAG is currently running. Used to determine whether pausing can skip directly to paused.

func (*DAGState) MarkDone ¶

func (s *DAGState) MarkDone(stepID string) ([]string, error)

MarkDone transitions a step to `done` status (caller must also PutResult in the store). Returns step IDs that became ReadyToRun as a result.

func (*DAGState) MarkFailed ¶

func (s *DAGState) MarkFailed(stepID, errorKind, errorMessage string) (newlyReady, newlySkipped []string, err error)

MarkFailed transitions to `failed` (idempotent) and cascade-skips all downstream steps whose required deps are unsatisfied by the failure. Returns (newlyReady, newlySkipped).

func (*DAGState) MarkSkipped ¶

func (s *DAGState) MarkSkipped(stepID string) ([]string, error)

MarkSkipped transitions to `skipped` (idempotent) and cascade-skips downstream.
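A sketch of the transitive cascade behind MarkSkipped and MarkFailed: starting from a failed or skipped step, every pending step that requires it (and does not consume it via RefOrDefault) is skipped, and so on downstream. The rec type and cascadeSkip are hypothetical; OptionalDeps are omitted because they never cascade.

```go
package main

import (
	"fmt"
	"sort"
)

type rec struct {
	Status    string
	Deps      []string        // required deps (Ref-derived plus After)
	OrDefault map[string]bool // deps consumed via RefOrDefault: never cascade
}

func contains(xs []string, x string) bool {
	for _, v := range xs {
		if v == x {
			return true
		}
	}
	return false
}

// cascadeSkip marks origin's pending downstream steps skipped, transitively,
// and returns the newly skipped IDs in sorted order. Re-running it is a no-op
// because already-skipped steps are no longer pending.
func cascadeSkip(steps map[string]*rec, origin string) []string {
	var skipped []string
	queue := []string{origin}
	for len(queue) > 0 {
		up := queue[0]
		queue = queue[1:]
		for id, r := range steps {
			if r.Status != "pending" || r.OrDefault[up] || !contains(r.Deps, up) {
				continue
			}
			r.Status = "skipped"
			skipped = append(skipped, id)
			queue = append(queue, id) // its dependents may cascade too
		}
	}
	sort.Strings(skipped)
	return skipped
}

func main() {
	steps := map[string]*rec{
		"fetch":  {Status: "failed"},
		"parse":  {Status: "pending", Deps: []string{"fetch"}},
		"store":  {Status: "pending", Deps: []string{"parse"}},
		"report": {Status: "pending", Deps: []string{"fetch"}, OrDefault: map[string]bool{"fetch": true}},
	}
	fmt.Println(cascadeSkip(steps, "fetch")) // [parse store]
}
```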

func (*DAGState) ReadyToRun ¶

func (s *DAGState) ReadyToRun() []string

ReadyToRun returns step IDs whose deps are all `done` (or satisfied-via-default) and whose status is `pending`. Used after MarkDone/MarkFailed to find next work.

func (*DAGState) Terminal ¶

func (s *DAGState) Terminal() (DAGStatus, bool)

Terminal returns (status, done). done is true if all steps are in terminal states.

type DAGStatus ¶

type DAGStatus string

DAGStatus is the overall workflow status.

const (
	DAGStatusRunning  DAGStatus = "running"
	DAGStatusDone     DAGStatus = "done"
	DAGStatusFailed   DAGStatus = "failed"
	DAGStatusCanceled DAGStatus = "canceled"
	DAGStatusPausing  DAGStatus = "pausing"
	DAGStatusPaused   DAGStatus = "paused"
)

type Enqueuer ¶

type Enqueuer interface {
	Enqueue(ctx context.Context, envelope task.Task) error
}

Enqueuer is the narrow interface used by the scheduler to place a ready step on the TASKS stream. It keeps scheduler logic decoupled from NATS specifics: tests pass a capture fake, and production wires it to JetStream via NewFromNATS.

type Event ¶

type Event struct {
	Kind         EventKind  `json:"kind"`
	DAGID        string     `json:"dag_id"`
	StepID       string     `json:"step_id"`
	Status       StepStatus `json:"status,omitempty"` // for EventCompleted
	ErrorKind    string     `json:"error_kind,omitempty"`
	ErrorMessage string     `json:"error_message,omitempty"`
	// Ack acknowledges the event has been processed. Subscribers must call this
	// on every delivered event. Nak redelivers after the implementation-specific delay.
	Ack func() error `json:"-"`
	Nak func() error `json:"-"`
}

Event is the payload delivered by EventBus to scheduler subscribers.

func UnmarshalEvent ¶

func UnmarshalEvent(data []byte) (Event, error)

UnmarshalEvent deserializes a wire payload into an Event. Ack/Nak are not populated here; the EventBus implementation must attach them before invoking the handler.

type EventBus ¶

type EventBus interface {
	// Publish sends a single event payload on the given subject.
	// Subjects follow the convention DAG.<dag_id>.<kind>.<step_id>.
	Publish(ctx context.Context, subject string, data []byte) error
	// Subscribe delivers events matching the filter. The handler must call
	// Event.Ack or Event.Nak exactly once per delivery.
	Subscribe(ctx context.Context, filter string, handler func(Event)) (Subscription, error)
}

EventBus carries scheduler events between workers.
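Subject construction and filter matching can be sketched with NATS wildcard rules (* matches exactly one token; > matches one or more trailing tokens and must be last). Using the EventKind string verbatim as the <kind> token is an assumption, and both helpers are hypothetical. Step IDs containing dots would add tokens and break fixed-position filters.

```go
package main

import (
	"fmt"
	"strings"
)

// eventSubject follows the DAG.<dag_id>.<kind>.<step_id> convention.
func eventSubject(dagID, kind, stepID string) string {
	return fmt.Sprintf("DAG.%s.%s.%s", dagID, kind, stepID)
}

// subjectMatches applies NATS-style wildcards to a dot-separated subject.
func subjectMatches(filter, subject string) bool {
	f := strings.Split(filter, ".")
	s := strings.Split(subject, ".")
	for i, tok := range f {
		if tok == ">" {
			// Valid only as the final filter token, and it needs at least one token.
			return i == len(f)-1 && len(s) > i
		}
		if i >= len(s) || (tok != "*" && tok != s[i]) {
			return false
		}
	}
	return len(f) == len(s)
}

func main() {
	subj := eventSubject("9f2c", "completed", "fetch")
	fmt.Println(subj)                                       // DAG.9f2c.completed.fetch
	fmt.Println(subjectMatches("DAG.*.completed.>", subj))  // true
	fmt.Println(subjectMatches("DAG.*.step_added.>", subj)) // false
}
```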

type EventKind ¶

type EventKind string

EventKind distinguishes the scheduler-relevant event types.

const (
	EventCompleted EventKind = "completed"
	EventStepAdded EventKind = "step_added"
	// EventResumed is published by Resume() to trigger step re-evaluation in the scheduler.
	EventResumed EventKind = "resumed"
)

type LeaderElector ¶

type LeaderElector interface {
	IsLeader() bool
}

LeaderElector is the user-provided contract for gating scheduler processing. Every worker starts a scheduler loop; only the current leader drives state mutations to avoid races during failover. nil means "always leader."

type MemBus ¶

type MemBus struct {
	// contains filtered or unexported fields
}

MemBus is an in-memory EventBus for tests. It fans out each published event to all matching subscribers. Ack/Nak are no-ops since nothing's durable.

func NewMemBus ¶

func NewMemBus() *MemBus

func (*MemBus) Publish ¶

func (b *MemBus) Publish(_ context.Context, subject string, data []byte) error

func (*MemBus) Subscribe ¶

func (b *MemBus) Subscribe(_ context.Context, filter string, handler func(Event)) (Subscription, error)

type MemStore ¶

type MemStore struct {
	// contains filtered or unexported fields
}

MemStore is an in-memory StateStore suitable for tests and local dev. It emulates NATS KV's revision-based CAS: each Put* increments the stored revision; callers must pass the matching expectedRev or get ErrStaleRevision.

func NewMemStore ¶

func NewMemStore() *MemStore

func (*MemStore) DeleteMeta ¶

func (s *MemStore) DeleteMeta(_ context.Context, dagID string) error

func (*MemStore) DeleteResult ¶

func (s *MemStore) DeleteResult(_ context.Context, dagID, stepID string) error

func (*MemStore) DeleteStep ¶

func (s *MemStore) DeleteStep(_ context.Context, dagID, stepID string) error

func (*MemStore) GetMeta ¶

func (s *MemStore) GetMeta(_ context.Context, dagID string) (DAGMeta, uint64, error)

func (*MemStore) GetResult ¶

func (s *MemStore) GetResult(_ context.Context, dagID, stepID string) ([]byte, error)

func (*MemStore) GetStep ¶

func (s *MemStore) GetStep(_ context.Context, dagID, stepID string) (StepRecord, uint64, error)

func (*MemStore) ListDAGs ¶

func (s *MemStore) ListDAGs(_ context.Context) ([]DAGMeta, error)

func (*MemStore) ListSteps ¶

func (s *MemStore) ListSteps(_ context.Context, dagID string) ([]StepRecord, error)

func (*MemStore) PutMeta ¶

func (s *MemStore) PutMeta(_ context.Context, dagID string, meta DAGMeta, expectedRev uint64) error

func (*MemStore) PutResult ¶

func (s *MemStore) PutResult(_ context.Context, dagID, stepID string, data []byte) error

func (*MemStore) PutStep ¶

func (s *MemStore) PutStep(_ context.Context, dagID, stepID string, rec StepRecord, expectedRev uint64) error

func (*MemStore) WatchResult ¶

func (s *MemStore) WatchResult(ctx context.Context, dagID, stepID string) (<-chan []byte, error)

type NatsBus ¶

type NatsBus struct {
	// contains filtered or unexported fields
}

NatsBus is a JetStream-backed EventBus. Events are stored durably in a dedicated stream; subscribers use a shared durable consumer so each event is handled once cluster-wide.

func NewNatsBus ¶

func NewNatsBus(ctx context.Context, js jetstream.JetStream, replicas int) (*NatsBus, error)

NewNatsBus creates (or opens) the DAG events stream.

func (*NatsBus) Publish ¶

func (b *NatsBus) Publish(ctx context.Context, subject string, data []byte) error

func (*NatsBus) Subscribe ¶

func (b *NatsBus) Subscribe(ctx context.Context, filter string, handler func(Event)) (Subscription, error)

type NatsEnqueuer ¶

type NatsEnqueuer struct {
	// contains filtered or unexported fields
}

NatsEnqueuer publishes a task.Task envelope to the TASKS stream. The envelope is marshaled and sent with Nats-Msg-Id = task.ID for dedupe.

func NewNatsEnqueuer ¶

func NewNatsEnqueuer(js jetstream.JetStream) *NatsEnqueuer

func (*NatsEnqueuer) Enqueue ¶

func (e *NatsEnqueuer) Enqueue(ctx context.Context, t task.Task) error

type NatsStore ¶

type NatsStore struct {
	// contains filtered or unexported fields
}

NatsStore is a JetStream KV-backed StateStore. Uses KV's built-in revision numbers for CAS; the revision returned by GetX matches what PutX expects in expectedRev.

func NewNatsStore ¶

func NewNatsStore(ctx context.Context, js jetstream.JetStream, replicas int) (*NatsStore, error)

NewNatsStore creates (or opens) the KV bucket and returns a NatsStore.

func (*NatsStore) DeleteMeta ¶

func (s *NatsStore) DeleteMeta(ctx context.Context, dagID string) error

func (*NatsStore) DeleteResult ¶

func (s *NatsStore) DeleteResult(ctx context.Context, dagID, stepID string) error

func (*NatsStore) DeleteStep ¶

func (s *NatsStore) DeleteStep(ctx context.Context, dagID, stepID string) error

func (*NatsStore) GetMeta ¶

func (s *NatsStore) GetMeta(ctx context.Context, dagID string) (DAGMeta, uint64, error)

func (*NatsStore) GetResult ¶

func (s *NatsStore) GetResult(ctx context.Context, dagID, stepID string) ([]byte, error)

func (*NatsStore) GetStep ¶

func (s *NatsStore) GetStep(ctx context.Context, dagID, stepID string) (StepRecord, uint64, error)

func (*NatsStore) ListDAGs ¶

func (s *NatsStore) ListDAGs(ctx context.Context) ([]DAGMeta, error)

func (*NatsStore) ListSteps ¶

func (s *NatsStore) ListSteps(ctx context.Context, dagID string) ([]StepRecord, error)

func (*NatsStore) PutMeta ¶

func (s *NatsStore) PutMeta(ctx context.Context, dagID string, meta DAGMeta, expectedRev uint64) error

func (*NatsStore) PutResult ¶

func (s *NatsStore) PutResult(ctx context.Context, dagID, stepID string, data []byte) error

func (*NatsStore) PutStep ¶

func (s *NatsStore) PutStep(ctx context.Context, dagID, stepID string, rec StepRecord, expectedRev uint64) error

func (*NatsStore) WatchResult ¶

func (s *NatsStore) WatchResult(ctx context.Context, dagID, stepID string) (<-chan []byte, error)

type PlacementMode ¶

type PlacementMode string
const (
	PlacementDirect   PlacementMode = "direct"
	PlacementColocate PlacementMode = "colocate_with"
	PlacementFollow   PlacementMode = "follow_target_of"
	PlacementHere     PlacementMode = "colocate_here"
)

type PlacementSpec ¶

type PlacementSpec struct {
	Mode   PlacementMode `json:"mode,omitempty"`
	Target string        `json:"target,omitempty"`
	StepID string        `json:"step_id,omitempty"`
}

PlacementSpec describes how a step chooses its execution target.

type Ref ¶

type Ref struct {
	StepID  string          `json:"step_id"`
	Mode    RefMode         `json:"mode"`
	Default json.RawMessage `json:"default,omitempty"` // used when Mode == RefModeOrDefault
}

Ref marks a position in a step's argument list that should be substituted with another step's result at scheduling time. Refs are JSON-serialized into the step record's ArgsJSON; the scheduler fetches the upstream result from the store and substitutes it in place before enqueueing.

The magic key "__ebind_ref__" makes Refs distinguishable from ordinary JSON values. The Mode controls what happens when the upstream step failed/skipped.

func DecodeRef ¶

func DecodeRef(raw json.RawMessage) (*Ref, bool)

DecodeRef parses a raw JSON value as a Ref if it is one; otherwise returns (nil, false).

func (Ref) MarshalJSON ¶

func (r Ref) MarshalJSON() ([]byte, error)

MarshalJSON serializes Ref using the refEnvelope shape so the scheduler can find it.

func (*Ref) UnmarshalJSON ¶

func (r *Ref) UnmarshalJSON(data []byte) error

UnmarshalJSON accepts the refEnvelope shape.

type RefMode ¶

type RefMode string
const (
	// RefModeRequired: upstream must be `done`. If `failed`/`skipped`, this step is skipped (cascade).
	RefModeRequired RefMode = "required"
	// RefModeOrDefault: if upstream `failed`/`skipped`, substitute the Default value and run anyway.
	RefModeOrDefault RefMode = "or_default"
)

type Scheduler ¶

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler consumes completion and step-added events, advances DAG state, and enqueues newly-ready steps. Every worker starts one; leader gating via Workflow.Elector ensures that at most one scheduler processes events at a time across workers, and within a single scheduler an internal mutex serializes event handling to avoid CAS races.

On a false→true edge of IsLeader(), the scheduler runs a full sweep of all in-flight DAGs to re-enqueue any ready-but-stranded steps.

func (*Scheduler) Run ¶

func (s *Scheduler) Run(ctx context.Context) error

Run subscribes to completed and step-added events for every DAG (filters of the form DAG.*.completed.> and DAG.*.step-added.>) and dispatches them. It also spawns a leadership watcher that triggers a sweep on every false→true edge of IsLeader(). Blocks until ctx is done.

type StateStore ¶

type StateStore interface {
	GetStep(ctx context.Context, dagID, stepID string) (StepRecord, uint64, error)
	PutStep(ctx context.Context, dagID, stepID string, rec StepRecord, expectedRev uint64) error
	ListSteps(ctx context.Context, dagID string) ([]StepRecord, error)

	GetResult(ctx context.Context, dagID, stepID string) ([]byte, error)
	PutResult(ctx context.Context, dagID, stepID string, data []byte) error

	GetMeta(ctx context.Context, dagID string) (DAGMeta, uint64, error)
	PutMeta(ctx context.Context, dagID string, meta DAGMeta, expectedRev uint64) error
	// ListDAGs enumerates all stored DAG meta records. Used by the scheduler's
	// sweep on leader acquisition to find stranded ready steps across all running DAGs.
	ListDAGs(ctx context.Context) ([]DAGMeta, error)

	// DeleteMeta, DeleteStep, DeleteResult remove records. Missing keys are not errors
	// (delete is idempotent). Callers coordinating a DAG-level delete should use
	// DeleteDAG which composes these in the correct order.
	DeleteMeta(ctx context.Context, dagID string) error
	DeleteStep(ctx context.Context, dagID, stepID string) error
	DeleteResult(ctx context.Context, dagID, stepID string) error

	// WatchResult streams a single message once a result is written at the given key.
	// Implementations should close the channel on cancel or immediately if the result
	// already exists (sending the existing value first).
	WatchResult(ctx context.Context, dagID, stepID string) (<-chan []byte, error)
}

StateStore persists DAG + step + result records with CAS semantics. All PutX(..., expectedRev) operations return ErrStaleRevision if the stored record has a different revision. expectedRev == 0 means "create if absent".

The interface is designed so the scheduler logic remains pure — all IO flows through StateStore, EventBus, and Enqueuer. Tests use store_mem.go fakes.

type Step ¶

type Step struct {
	// contains filtered or unexported fields
}

Step represents a single node in a DAG. Returned by DAG.Step for the caller to chain into dependent steps via Ref() or RefOrDefault().

func (*Step) ID ¶

func (s *Step) ID() string

ID returns the stable step ID within its DAG.

func (*Step) Ref ¶

func (s *Step) Ref() Ref

Ref returns a Required-mode reference. Downstream steps using this Ref will be cascade-skipped if this step fails or is itself skipped.

func (*Step) RefOrDefault ¶

func (s *Step) RefOrDefault(defaultValue any) Ref

RefOrDefault returns an OrDefault-mode reference. If this step fails or is skipped, the downstream step runs with the provided default value substituted for this step's output.

type StepBlocker ¶

type StepBlocker struct {
	StepID    string
	WaitingOn []string
}

StepBlocker names a Pending step and lists its non-terminal deps (required and optional combined). Useful for "why isn't this step running yet?".

type StepDebug ¶

type StepDebug struct {
	StepRecord
	QueueDuration time.Duration // StartedAt - AddedAt (0 if step never started)
	ExecDuration  time.Duration // FinishedAt - StartedAt (0 if step never finished)
}

StepDebug enriches StepRecord with computed per-step durations.

type StepHook ¶

type StepHook struct {
	// contains filtered or unexported fields
}

StepHook implements worker.StepHook by persisting the step outcome to the state store and publishing a completion event to the bus. The scheduler consumes these events to advance the DAG.

func (*StepHook) OnStepDone ¶

func (h *StepHook) OnStepDone(ctx context.Context, t *task.Task, result []byte) error

OnStepDone marks the step as done, writes the result, and publishes a `completed` event with Status=done.

func (*StepHook) OnStepFailed ¶

func (h *StepHook) OnStepFailed(ctx context.Context, t *task.Task, taskErr *task.TaskError) error

OnStepFailed marks the step as failed and publishes a `completed` event with Status=failed so the scheduler can cascade skips and re-evaluate readiness.

type StepOption ¶

type StepOption func(*Step)

StepOption configures a Step at construction time.

func After ¶

func After(steps ...*Step) StepOption

After declares explicit temporal-only dependencies on the given upstream steps. This step waits until every upstream is terminal (done/failed/skipped) before it runs. If any upstream ended in failed/skipped, this step is cascade-skipped (same semantics as referencing via Ref()).

Use After when you need ordering but the current step's handler doesn't consume any upstream output. Equivalent to adding a Ref(upstream) arg that the handler ignores, but without contaminating the handler's signature.

func AfterAny ¶

func AfterAny(steps ...*Step) StepOption

AfterAny declares optional temporal-only dependencies. This step waits until every upstream is terminal (done/failed/skipped) but runs regardless of whether they succeeded. Upstream failure does NOT cascade-skip this step.

Use AfterAny for ordering a "best-effort" or cleanup step that should run after some other work, whether that work succeeded or not.

func ColocateHere ¶

func ColocateHere() StepOption

ColocateHere is only valid for dynamic steps created from a running handler via workflow.FromContext(ctx).StepOpts(...). It binds the new step to the concrete worker that is currently executing.

func ColocateWith ¶

func ColocateWith(step *Step) StepOption

ColocateWith binds a step to the concrete worker that executed another step. The dependency is also made explicit so the referenced step has already run by the time placement is resolved.

func FollowTargetOf ¶

func FollowTargetOf(step *Step) StepOption

FollowTargetOf binds a step to the same logical target expression as another step, but re-resolves that target at execution time.

func OnTarget ¶

func OnTarget(target string) StepOption

OnTarget binds a step to a logical or concrete target claim.

func Optional ¶

func Optional() StepOption

Optional marks a step as non-critical — its failure does not fail the DAG. Downstream steps choose whether to cascade-skip (via Ref) or substitute (via RefOrDefault).

func WithStepRetry ¶

func WithStepRetry(p task.RetryPolicy) StepOption

WithStepRetry overrides the DAG's default retry policy for this step.

type StepRecord ¶

type StepRecord struct {
	DAGID        string            `json:"dag_id"`
	StepID       string            `json:"step_id"`
	FnName       string            `json:"fn_name"`
	ArgsJSON     json.RawMessage   `json:"args_json"` // JSON array; may contain Refs
	Deps         []string          `json:"deps,omitempty"`
	OptionalDeps []string          `json:"optional_deps,omitempty"`
	Status       StepStatus        `json:"status"`
	Attempt      int               `json:"attempt"`
	ErrorKind    string            `json:"error_kind,omitempty"`
	ErrorMessage string            `json:"error_message,omitempty"`
	Optional     bool              `json:"optional,omitempty"`
	Policy       *task.RetryPolicy `json:"policy,omitempty"` // per-step override
	Placement    *PlacementSpec    `json:"placement,omitempty"`
	WorkerID     string            `json:"worker_id,omitempty"`
	AddedAt      time.Time         `json:"added_at"`
	StartedAt    time.Time         `json:"started_at,omitempty"`
	FinishedAt   time.Time         `json:"finished_at,omitempty"`
}

StepRecord is stored per-step (key: <dag_id>/step/<step_id>).

Dependency model:

  • Deps: required deps. Step waits until each is terminal. If any ends as failed/skipped AND the args don't contain a RefOrDefault on it, this step is cascade-skipped. Combines Ref-derived deps from args + After() options.
  • OptionalDeps: "wait for completion but don't cascade on failure." Step waits until each is terminal; failure never causes cascade. Populated by AfterAny() options.
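The dependency model reduces to a three-way decision per pending step. The sketch below is an interpretation, not the package's code: it skips as soon as a required dep has failed or been skipped (matching MarkFailed's immediate cascade), waits while any required or optional dep is non-terminal, and otherwise runs. stepDeps, OrDefault, and decide are hypothetical names.

```go
package main

import "fmt"

type status string

const (
	pending status = "pending"
	running status = "running"
	done    status = "done"
	failed  status = "failed"
	skipped status = "skipped"
)

func terminal(s status) bool { return s == done || s == failed || s == skipped }

type stepDeps struct {
	Deps         []string        // required: Ref-derived plus After()
	OptionalDeps []string        // AfterAny(): wait, never cascade
	OrDefault    map[string]bool // required deps referenced via RefOrDefault
}

// decide returns "skip", "wait", or "run" for a pending step.
func decide(d stepDeps, st map[string]status) string {
	for _, id := range d.Deps {
		if s := st[id]; (s == failed || s == skipped) && !d.OrDefault[id] {
			return "skip" // a required dep can no longer succeed: cascade now
		}
	}
	for _, ids := range [][]string{d.Deps, d.OptionalDeps} {
		for _, id := range ids {
			if !terminal(st[id]) {
				return "wait"
			}
		}
	}
	return "run"
}

func main() {
	st := map[string]status{"fetch": done, "enrich": failed, "audit": running}
	fmt.Println(decide(stepDeps{Deps: []string{"fetch", "enrich"}}, st)) // skip
	fmt.Println(decide(stepDeps{Deps: []string{"enrich"},
		OrDefault: map[string]bool{"enrich": true}}, st)) // run
	fmt.Println(decide(stepDeps{OptionalDeps: []string{"audit"}}, st)) // wait
}
```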

func (StepRecord) IsTerminal ¶

func (s StepRecord) IsTerminal() bool

IsTerminal returns true if this step's status cannot change anymore.

type StepStatus ¶

type StepStatus string

StepStatus is the lifecycle status of a single step. ResolveArgs uses it to decide how to handle each Ref.

const (
	StatusPending  StepStatus = "pending"
	StatusRunning  StepStatus = "running"
	StatusDone     StepStatus = "done"
	StatusFailed   StepStatus = "failed"
	StatusSkipped  StepStatus = "skipped"
	StatusCanceled StepStatus = "canceled"
)

type Subscription ¶

type Subscription interface {
	Stop() error
}

Subscription is returned by EventBus.Subscribe and is stopped via Stop().

type Workflow ¶

type Workflow struct {
	Store    StateStore
	Bus      EventBus
	Enq      Enqueuer
	Elector  LeaderElector // nil ⇒ always leader
	NakDelay time.Duration // default 1s; used by scheduler when non-leader sees an event

	// SweepCheckInterval is the cadence at which the scheduler polls IsLeader()
	// to detect leadership acquisition. On a false→true edge it runs a sweep of
	// all in-flight DAGs to re-enqueue stranded ready steps. Default 5s.
	SweepCheckInterval time.Duration
	// SweepTimeout is the max wall-clock a single sweep may take. Default 60s.
	SweepTimeout time.Duration

	// MaxStepErrorBytes bounds the failed-step error message persisted into the
	// step record (and carried on completion events). 0 ⇒ DefaultMaxStepErrorBytes;
	// negative ⇒ persist no message, keeping only the error kind. The full
	// message always remains available in the DLQ entry and the response stream.
	MaxStepErrorBytes int
}

Workflow bundles the three IO dependencies (store, bus, enqueuer) plus an optional leader elector for the scheduler loop. DAG.Submit takes one; Scheduler.Run uses one.

func NewFromNATS ¶

func NewFromNATS(ctx context.Context, nc *nats.Conn, replicas int) (*Workflow, error)

NewFromNATS constructs a Workflow backed by the given NATS connection:

  • NatsStore for step/result persistence (KV bucket ebind-dags)
  • NatsBus for scheduler events (stream EBIND_DAG_EVENTS)
  • NatsEnqueuer for publishing tasks on the existing TASKS stream

replicas applies to the KV bucket and the events stream; use 1 for single-node dev and 3 for an HA cluster.

func NewWorkflow ¶

func NewWorkflow(store StateStore, bus EventBus, enq Enqueuer) *Workflow

NewWorkflow constructs a Workflow with the three dependencies. Defaults: Elector = always-leader, NakDelay = 1s.

func (*Workflow) ContextMiddleware ¶

func (wf *Workflow) ContextMiddleware() worker.Middleware

ContextMiddleware returns a worker.Middleware that injects a ContextDAG into the handler's context for DAG-dispatched tasks. Ad-hoc tasks (no DAGID) are passed through unchanged.

func (*Workflow) Hook ¶

func (wf *Workflow) Hook() *StepHook

Hook returns a worker.StepHook that persists step outcomes to the store and publishes completion events to the bus. Wire it into worker.Options.StepHook.

func (*Workflow) RunScheduler ¶

func (wf *Workflow) RunScheduler(ctx context.Context) error

RunScheduler drives the scheduler loop: it subscribes to completion events, processes ready steps, and applies state transitions. Blocks until ctx is canceled.

func (*Workflow) WithElector ¶

func (wf *Workflow) WithElector(le LeaderElector) *Workflow

WithElector replaces the default always-leader elector.
