workflow

package
v0.2.0
Warning

This package is not in the latest version of its module.

Published: Apr 25, 2026 License: MIT Imports: 17 Imported by: 0

Documentation

Constants

const (
	DAGEventsStream = "EBIND_DAG_EVENTS"
)

Variables

var ErrCycle = errors.New("workflow: cycle detected")

ErrCycle is returned by DAG.Submit if the graph contains a cycle.

var ErrDAGCanceled = errors.New("workflow: DAG canceled")

ErrDAGCanceled is returned when a DAG no longer accepts new work because it was canceled.

var ErrDAGNotFound = errors.New("workflow: DAG not found")

ErrDAGNotFound is returned when a DAG ID has no meta record in the store.

var ErrDuplicateStep = errors.New("workflow: duplicate step ID")

ErrDuplicateStep is returned by DAG.Step when the step ID is already used.

var ErrStaleRevision = errors.New("workflow: stale revision")

ErrStaleRevision is returned by stores when a CAS operation finds a newer revision.

var ErrStepCanceled = errors.New("workflow: step canceled")

ErrStepCanceled is returned by Await when the target step was canceled before it started.

var ErrStepFailed = errors.New("workflow: step failed")

ErrStepFailed is returned by Await when the target step (or an upstream mandatory step whose failure cascaded) ended in a failed status.

var ErrStepNotFound = errors.New("workflow: step not found")

ErrStepNotFound is returned when a step ID is not registered in the DAG.

var ErrStepSkipped = errors.New("workflow: step skipped")

ErrStepSkipped is returned by Await when the target step was skipped because an upstream step this step depended on (via Ref, not RefOrDefault) failed or was skipped.

Functions

func Await

func Await[T any](ctx context.Context, wf *Workflow, dagID string, step *Step) (T, error)

Await blocks until the target step's result is available in the store (or the step has terminally failed, been skipped, or been canceled). It decodes the result into a value of type T and returns it.

Usage: result, err := workflow.Await[Profile](ctx, wf, dagID, stepRef)

Returns:

  • ErrStepFailed if the step ended in status=failed
  • ErrStepSkipped if the step was skipped (upstream cascade)
  • ErrStepCanceled if the step was canceled before it started
  • context error if ctx expires before resolution

Await is a thin wrapper over AwaitByID; prefer AwaitByID when the caller does not own the *Step handle (e.g., a different process resuming a DAG).

func AwaitByID

func AwaitByID[T any](ctx context.Context, wf *Workflow, dagID, stepID string) (T, error)

AwaitByID is the stateless variant of Await: it takes the step ID as a string, so a process that did not build the DAG can still wait on a specific step's result. To resume from a different instance, persist the (dagID, stepID) pair in the submitter and call AwaitByID from the resumer.

Semantics are identical to Await.

func Cancel

func Cancel(ctx context.Context, wf *Workflow, dagID string) error

Cancel marks a DAG as canceled and prevents any new steps from being enqueued. Pending steps are transitioned to canceled immediately. Running steps are left untouched and may still finish, but their completion does not schedule follow-on work.

func CurrentTarget

func CurrentTarget(ctx context.Context) string

CurrentTarget returns the target of the task currently being handled, read from the workflow handler's context.

func CurrentWorkerID

func CurrentWorkerID(ctx context.Context) string

CurrentWorkerID returns the concrete worker handling the current workflow step.

func DAGInfo

func DAGInfo(ctx context.Context, wf *Workflow, dagID string) (DAGMeta, []StepRecord, error)

DAGInfo returns the current meta + step states for introspection. Useful in tests and for admin UIs.

func DebugPrint

func DebugPrint(ctx context.Context, wf *Workflow, dagID string, w io.Writer) error

DebugPrint writes a human-readable report of the DAG to w.

func DeleteDAG

func DeleteDAG(ctx context.Context, wf *Workflow, dagID string) error

DeleteDAG removes all KV records for a DAG: each step, each result, and the meta. It is safe to call against a non-existent DAG: it returns ErrDAGNotFound if the meta record is missing, but only after step/result cleanup has been attempted.

Callers that want to prevent accidental deletion of in-flight work should inspect DAGMeta.Status first and refuse unless it's terminal.

func EventSubject

func EventSubject(e Event) string

EventSubject builds the on-the-wire subject for an event.

func IsRef

func IsRef(raw json.RawMessage) bool

IsRef inspects a raw JSON value to see if it's a Ref envelope. Fast path: checks for the marker string without a full decode.

func MarshalEvent

func MarshalEvent(e Event) ([]byte, error)

MarshalEvent serializes an Event for on-wire delivery (without Ack/Nak).

func ResolveArgs

func ResolveArgs(
	rawArgs []json.RawMessage,
	results map[string]json.RawMessage,
	statuses map[string]StepStatus,
) (resolved []json.RawMessage, shouldSkip bool, err error)

ResolveArgs substitutes each Ref in rawArgs with the actual upstream result bytes from results (keyed by step ID). It returns one of:

  • resolved args (all Refs substituted; literal values unchanged)
  • shouldSkip=true if any RefModeRequired Ref points at a failed or skipped upstream (the caller should mark the step skipped and cascade)
  • an error if a required upstream result is missing or a Ref is malformed

ResolveArgs is pure: it performs no IO and is deterministic, so it is fully unit-testable.

func WriteDebug

func WriteDebug(w io.Writer, dbg DAGDebug) error

WriteDebug renders an already-loaded DAGDebug to w. Separated from DebugPrint so callers that have the snapshot can reuse the renderer without re-reading.

Types

type ContextDAG

type ContextDAG struct {
	// contains filtered or unexported fields
}

ContextDAG is the in-handler handle for dynamically adding steps to the currently-running DAG. The newly added step depends on the current step (implicitly) plus any explicit Refs passed as args.

func FromContext

func FromContext(ctx context.Context) *ContextDAG

FromContext retrieves the ContextDAG injected by ContextMiddleware. Returns nil if called outside a DAG-dispatched handler (so callers can no-op).

func (*ContextDAG) Step

func (c *ContextDAG) Step(id string, fn any, args ...any) (*Step, error)

Step adds a new step to the current DAG. The new step implicitly depends on the currently-running step, so it won't run until the current handler returns successfully. It returns a *Step handle usable for chaining further dynamic steps.

func (*ContextDAG) StepOpts

func (c *ContextDAG) StepOpts(id string, fn any, opts []StepOption, args ...any) (*Step, error)

StepOpts adds a step with StepOption support (Optional, WithStepRetry, After, AfterAny). The new step implicitly depends on the currently-running step, plus any explicit Refs in args and any explicit After()/AfterAny() upstream steps.

type DAG

type DAG struct {
	// contains filtered or unexported fields
}

DAG is the user-facing workflow builder. Not safe for concurrent Step() calls. Submit() is one-shot.

func New

func New(opts ...DAGOption) *DAG

New constructs an empty DAG. Build it with Step() then Submit().

func (*DAG) ID

func (d *DAG) ID() string

ID returns the DAG identifier (generated uuid unless WithDAGID was passed).

func (*DAG) Step

func (d *DAG) Step(id string, fn any, args ...any) *Step

Step adds a node to the DAG. id must be unique within the DAG. fn must satisfy the same signature contract as task.Register: context.Context as the first parameter, returning either (T, error) or just error. args match fn's remaining parameters; upstream step outputs are referenced via Step.Ref() or Step.RefOrDefault(v).

func (*DAG) StepOpts

func (d *DAG) StepOpts(id string, fn any, opts []StepOption, args ...any) *Step

StepOpts is the options-flavored variant — same as Step but accepts StepOptions.

func (*DAG) Submit

func (d *DAG) Submit(ctx context.Context, wf *Workflow) error

Submit validates the DAG (returning ErrCycle if the graph contains a cycle), persists meta + step records to the store, and enqueues root steps (those with no deps). It is safe to retry: re-submitting the same DAG ID fails with ErrStaleRevision on the meta write instead of persisting the DAG twice.

type DAGDebug

type DAGDebug struct {
	Meta          DAGMeta
	Steps         []StepDebug        // ordered by AddedAt (tie-breaker: StepID)
	Counts        map[StepStatus]int // count of steps in each status
	TotalDuration time.Duration      // max(FinishedAt) - Meta.CreatedAt; 0 while running
	Blockers      []StepBlocker      // Pending steps still waiting on non-terminal deps
}

DAGDebug is a structured snapshot of a DAG's state suitable for logs, dashboards, or debug endpoints. See Debug() for how to obtain one.

func Debug

func Debug(ctx context.Context, wf *Workflow, dagID string) (DAGDebug, error)

Debug returns a structured snapshot of the given DAG. Safe to call from any process that can reach the StateStore — it's a read-only operation. Returns ErrDAGNotFound if the DAG's meta key is missing.

type DAGMeta

type DAGMeta struct {
	ID            string            `json:"id"`
	Status        DAGStatus         `json:"status"`
	CreatedAt     time.Time         `json:"created_at"`
	DefaultPolicy *task.RetryPolicy `json:"default_policy,omitempty"`
	TerminalSteps []string          `json:"terminal_steps,omitempty"`
}

DAGMeta is the meta record stored in the state store (key: <dag_id>/meta).

type DAGOption

type DAGOption func(*DAG)

DAGOption configures a DAG at construction time.

func WithDAGID

func WithDAGID(id string) DAGOption

WithDAGID pins an explicit DAG ID (otherwise a uuid is generated).

func WithRetry

func WithRetry(p task.RetryPolicy) DAGOption

WithRetry sets the default retry policy for all steps in this DAG. A step can override via WithStepRetry.

type DAGState

type DAGState struct {
	Meta  DAGMeta
	Steps map[string]StepRecord // stepID -> record
}

DAGState is the in-memory view of a DAG, loaded from the store at scheduler evaluation time. All transition methods on DAGState are pure: they return data and perform no IO.

func (*DAGState) AddStep

func (s *DAGState) AddStep(rec StepRecord) error

AddStep inserts a new step record into the state (used for dynamic DAGs). Returns error on duplicate ID.

func (*DAGState) MarkDone

func (s *DAGState) MarkDone(stepID string) ([]string, error)

MarkDone transitions a step to `done` status (caller must also PutResult in the store). Returns step IDs that became ReadyToRun as a result.

func (*DAGState) MarkFailed

func (s *DAGState) MarkFailed(stepID, errorKind string) (newlyReady, newlySkipped []string, err error)

MarkFailed transitions to `failed` (idempotent) and cascade-skips all downstream steps whose required deps are unsatisfied by the failure. Returns (newlyReady, newlySkipped).

func (*DAGState) MarkSkipped

func (s *DAGState) MarkSkipped(stepID string) ([]string, error)

MarkSkipped transitions to `skipped` (idempotent) and cascade-skips downstream.

func (*DAGState) ReadyToRun

func (s *DAGState) ReadyToRun() []string

ReadyToRun returns step IDs whose deps are all `done` (or satisfied-via-default) and whose status is `pending`. Used after MarkDone/MarkFailed to find next work.

func (*DAGState) Terminal

func (s *DAGState) Terminal() (DAGStatus, bool)

Terminal returns (status, done). done is true if all steps are in terminal states.

type DAGStatus

type DAGStatus string

DAGStatus is the overall workflow status.

const (
	DAGStatusRunning  DAGStatus = "running"
	DAGStatusDone     DAGStatus = "done"
	DAGStatusFailed   DAGStatus = "failed"
	DAGStatusCanceled DAGStatus = "canceled"
)

type Enqueuer

type Enqueuer interface {
	Enqueue(ctx context.Context, envelope task.Task) error
}

Enqueuer is the narrow interface used by the scheduler to place a ready step on the TASKS stream. It keeps scheduler logic decoupled from NATS specifics: tests pass a capturing fake, and production wires it to JetStream via NewFromNATS.

type Event

type Event struct {
	Kind      EventKind  `json:"kind"`
	DAGID     string     `json:"dag_id"`
	StepID    string     `json:"step_id"`
	Status    StepStatus `json:"status,omitempty"` // for EventCompleted
	ErrorKind string     `json:"error_kind,omitempty"`
	// Ack acknowledges that the event has been processed. Subscribers must call
	// exactly one of Ack or Nak per delivery; Nak requests redelivery after an
	// implementation-specific delay.
	Ack func() error `json:"-"`
	Nak func() error `json:"-"`
}

Event is the payload delivered by EventBus to scheduler subscribers.

func UnmarshalEvent

func UnmarshalEvent(data []byte) (Event, error)

UnmarshalEvent deserializes a wire payload into an Event. Ack/Nak are not populated here; the EventBus implementation must attach them before invoking the handler.

type EventBus

type EventBus interface {
	// Publish sends a single event payload on the given subject.
	// Subjects follow the convention DAG.<dag_id>.<kind>.<step_id>.
	Publish(ctx context.Context, subject string, data []byte) error
	// Subscribe delivers events matching the filter. The handler must call
	// Event.Ack or Event.Nak exactly once per delivery.
	Subscribe(ctx context.Context, filter string, handler func(Event)) (Subscription, error)
}

EventBus carries scheduler events between workers.

type EventKind

type EventKind string

EventKind distinguishes the two scheduler-relevant events.

const (
	EventCompleted EventKind = "completed"
	EventStepAdded EventKind = "step_added"
)

type LeaderElector

type LeaderElector interface {
	IsLeader() bool
}

LeaderElector is the user-provided contract for gating scheduler processing. Every worker starts a scheduler loop; only the current leader drives state mutations to avoid races during failover. nil means "always leader."

type MemBus

type MemBus struct {
	// contains filtered or unexported fields
}

MemBus is an in-memory EventBus for tests. It fans out each published event to all matching subscribers. Ack/Nak are no-ops since nothing's durable.
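When fanning out, an in-memory bus has to decide which subscriptions match a subject. The sketch below assumes MemBus follows NATS wildcard rules (a natural fit, since NatsBus uses real NATS subjects): "*" matches exactly one token, and ">" matches one or more trailing tokens and is only valid as the last token. eventSubject is a hypothetical helper following the DAG.<dag_id>.<kind>.<step_id> convention documented on EventBus.Publish.

```go
package main

import (
	"fmt"
	"strings"
)

// eventSubject (hypothetical) builds DAG.<dag_id>.<kind>.<step_id>.
func eventSubject(dagID, kind, stepID string) string {
	return "DAG." + dagID + "." + kind + "." + stepID
}

// subjectMatches applies NATS-style wildcards token by token.
func subjectMatches(filter, subject string) bool {
	f := strings.Split(filter, ".")
	s := strings.Split(subject, ".")
	for i, tok := range f {
		if tok == ">" {
			// ">" must be the final filter token and needs at least one token.
			return i == len(f)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(f) == len(s)
}

func main() {
	subj := eventSubject("d1", "completed", "fetch")
	fmt.Println(subj)                                       // DAG.d1.completed.fetch
	fmt.Println(subjectMatches("DAG.*.completed.>", subj))  // true
	fmt.Println(subjectMatches("DAG.*.step_added.>", subj)) // false
}
```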

func NewMemBus

func NewMemBus() *MemBus

func (*MemBus) Publish

func (b *MemBus) Publish(_ context.Context, subject string, data []byte) error

func (*MemBus) Subscribe

func (b *MemBus) Subscribe(_ context.Context, filter string, handler func(Event)) (Subscription, error)

type MemStore

type MemStore struct {
	// contains filtered or unexported fields
}

MemStore is an in-memory StateStore suitable for tests and local dev. It emulates NATS KV's revision-based CAS: each Put* increments the stored revision, and callers of PutStep/PutMeta must pass the matching expectedRev or get ErrStaleRevision.

func NewMemStore

func NewMemStore() *MemStore

func (*MemStore) DeleteMeta

func (s *MemStore) DeleteMeta(_ context.Context, dagID string) error

func (*MemStore) DeleteResult

func (s *MemStore) DeleteResult(_ context.Context, dagID, stepID string) error

func (*MemStore) DeleteStep

func (s *MemStore) DeleteStep(_ context.Context, dagID, stepID string) error

func (*MemStore) GetMeta

func (s *MemStore) GetMeta(_ context.Context, dagID string) (DAGMeta, uint64, error)

func (*MemStore) GetResult

func (s *MemStore) GetResult(_ context.Context, dagID, stepID string) ([]byte, error)

func (*MemStore) GetStep

func (s *MemStore) GetStep(_ context.Context, dagID, stepID string) (StepRecord, uint64, error)

func (*MemStore) ListDAGs

func (s *MemStore) ListDAGs(_ context.Context) ([]DAGMeta, error)

func (*MemStore) ListSteps

func (s *MemStore) ListSteps(_ context.Context, dagID string) ([]StepRecord, error)

func (*MemStore) PutMeta

func (s *MemStore) PutMeta(_ context.Context, dagID string, meta DAGMeta, expectedRev uint64) error

func (*MemStore) PutResult

func (s *MemStore) PutResult(_ context.Context, dagID, stepID string, data []byte) error

func (*MemStore) PutStep

func (s *MemStore) PutStep(_ context.Context, dagID, stepID string, rec StepRecord, expectedRev uint64) error

func (*MemStore) WatchResult

func (s *MemStore) WatchResult(ctx context.Context, dagID, stepID string) (<-chan []byte, error)

type NatsBus

type NatsBus struct {
	// contains filtered or unexported fields
}

NatsBus is a JetStream-backed EventBus. Events are stored durably in a dedicated stream (DAGEventsStream, "EBIND_DAG_EVENTS"); subscribers use a shared durable consumer so each event is handled once cluster-wide.

func NewNatsBus

func NewNatsBus(ctx context.Context, js jetstream.JetStream, replicas int) (*NatsBus, error)

NewNatsBus creates (or opens) the DAG events stream.

func (*NatsBus) Publish

func (b *NatsBus) Publish(ctx context.Context, subject string, data []byte) error

func (*NatsBus) Subscribe

func (b *NatsBus) Subscribe(ctx context.Context, filter string, handler func(Event)) (Subscription, error)

type NatsEnqueuer

type NatsEnqueuer struct {
	// contains filtered or unexported fields
}

NatsEnqueuer publishes a task.Task envelope to the TASKS stream. The envelope is marshaled and sent with Nats-Msg-Id = task.ID so JetStream can deduplicate repeated publishes.

func NewNatsEnqueuer

func NewNatsEnqueuer(js jetstream.JetStream) *NatsEnqueuer

func (*NatsEnqueuer) Enqueue

func (e *NatsEnqueuer) Enqueue(ctx context.Context, t task.Task) error

type NatsStore

type NatsStore struct {
	// contains filtered or unexported fields
}

NatsStore is a JetStream KV-backed StateStore. Uses KV's built-in revision numbers for CAS; the revision returned by GetX matches what PutX expects in expectedRev.

func NewNatsStore

func NewNatsStore(ctx context.Context, js jetstream.JetStream, replicas int) (*NatsStore, error)

NewNatsStore creates (or opens) the KV bucket and returns a NatsStore.

func (*NatsStore) DeleteMeta

func (s *NatsStore) DeleteMeta(ctx context.Context, dagID string) error

func (*NatsStore) DeleteResult

func (s *NatsStore) DeleteResult(ctx context.Context, dagID, stepID string) error

func (*NatsStore) DeleteStep

func (s *NatsStore) DeleteStep(ctx context.Context, dagID, stepID string) error

func (*NatsStore) GetMeta

func (s *NatsStore) GetMeta(ctx context.Context, dagID string) (DAGMeta, uint64, error)

func (*NatsStore) GetResult

func (s *NatsStore) GetResult(ctx context.Context, dagID, stepID string) ([]byte, error)

func (*NatsStore) GetStep

func (s *NatsStore) GetStep(ctx context.Context, dagID, stepID string) (StepRecord, uint64, error)

func (*NatsStore) ListDAGs

func (s *NatsStore) ListDAGs(ctx context.Context) ([]DAGMeta, error)

func (*NatsStore) ListSteps

func (s *NatsStore) ListSteps(ctx context.Context, dagID string) ([]StepRecord, error)

func (*NatsStore) PutMeta

func (s *NatsStore) PutMeta(ctx context.Context, dagID string, meta DAGMeta, expectedRev uint64) error

func (*NatsStore) PutResult

func (s *NatsStore) PutResult(ctx context.Context, dagID, stepID string, data []byte) error

func (*NatsStore) PutStep

func (s *NatsStore) PutStep(ctx context.Context, dagID, stepID string, rec StepRecord, expectedRev uint64) error

func (*NatsStore) WatchResult

func (s *NatsStore) WatchResult(ctx context.Context, dagID, stepID string) (<-chan []byte, error)

type PlacementMode

type PlacementMode string
const (
	PlacementDirect   PlacementMode = "direct"
	PlacementColocate PlacementMode = "colocate_with"
	PlacementFollow   PlacementMode = "follow_target_of"
	PlacementHere     PlacementMode = "colocate_here"
)

type PlacementSpec

type PlacementSpec struct {
	Mode   PlacementMode `json:"mode,omitempty"`
	Target string        `json:"target,omitempty"`
	StepID string        `json:"step_id,omitempty"`
}

PlacementSpec describes how a step chooses its execution target.

type Ref

type Ref struct {
	StepID  string          `json:"step_id"`
	Mode    RefMode         `json:"mode"`
	Default json.RawMessage `json:"default,omitempty"` // used when Mode == RefModeOrDefault
}

Ref marks a position in a step's argument list that should be substituted with another step's result at scheduling time. Refs are JSON-serialized into the step record's ArgsJSON; the scheduler fetches the upstream result from the store and substitutes it in place before enqueueing.

The magic key "__ebind_ref__" makes Refs distinguishable from ordinary JSON values. The Mode controls what happens when the upstream step failed or was skipped.

func DecodeRef

func DecodeRef(raw json.RawMessage) (*Ref, bool)

DecodeRef parses a raw JSON value as a Ref if it is one; otherwise returns (nil, false).

func (Ref) MarshalJSON

func (r Ref) MarshalJSON() ([]byte, error)

MarshalJSON serializes the Ref using the refEnvelope shape so the scheduler can find it.

func (*Ref) UnmarshalJSON

func (r *Ref) UnmarshalJSON(data []byte) error

UnmarshalJSON accepts the refEnvelope shape.

type RefMode

type RefMode string
const (
	// RefModeRequired: upstream must be `done`. If `failed`/`skipped`, this step is skipped (cascade).
	RefModeRequired RefMode = "required"
	// RefModeOrDefault: if upstream `failed`/`skipped`, substitute the Default value and run anyway.
	RefModeOrDefault RefMode = "or_default"
)

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler consumes completion and step-added events, advances DAG state, and enqueues newly ready steps. Every worker starts one. Leader gating via Workflow.Elector ensures that at most one scheduler processes events at a time across workers; within a single scheduler, an internal mutex serializes event handling to avoid CAS races.

On a false→true edge of IsLeader(), the scheduler runs a full sweep of all in-flight DAGs to re-enqueue any ready-but-stranded steps.
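The false→true rule can be sketched as a small edge detector that the scheduler would feed one IsLeader() sample per SweepCheckInterval. edgeDetector is a hypothetical name; it starts as not-leader, so (as an assumption) a worker that is already leader on its first poll also sweeps.

```go
package main

import "fmt"

// edgeDetector remembers the previous IsLeader() sample and reports true
// only when leadership has just been acquired.
type edgeDetector struct{ wasLeader bool }

// Observe records a sample and reports whether a sweep should run now.
func (d *edgeDetector) Observe(isLeader bool) (sweep bool) {
	sweep = isLeader && !d.wasLeader
	d.wasLeader = isLeader
	return sweep
}

func main() {
	var d edgeDetector
	sweeps := 0
	for _, s := range []bool{false, true, true, false, true} {
		if d.Observe(s) {
			sweeps++
		}
	}
	fmt.Println(sweeps) // 2
}
```

In a real loop the samples would come from a time.Ticker, and each sweep would run under a context bounded by SweepTimeout.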

func (*Scheduler) Run

func (s *Scheduler) Run(ctx context.Context) error

Run subscribes to the completed and step-added events of all DAGs (subjects of the form DAG.<dag_id>.<kind>.<step_id>) and dispatches them. It also spawns a leadership watcher that triggers a sweep on every false→true edge of IsLeader(). Blocks until ctx is done.

type StateStore

type StateStore interface {
	GetStep(ctx context.Context, dagID, stepID string) (StepRecord, uint64, error)
	PutStep(ctx context.Context, dagID, stepID string, rec StepRecord, expectedRev uint64) error
	ListSteps(ctx context.Context, dagID string) ([]StepRecord, error)

	GetResult(ctx context.Context, dagID, stepID string) ([]byte, error)
	PutResult(ctx context.Context, dagID, stepID string, data []byte) error

	GetMeta(ctx context.Context, dagID string) (DAGMeta, uint64, error)
	PutMeta(ctx context.Context, dagID string, meta DAGMeta, expectedRev uint64) error
	// ListDAGs enumerates all stored DAG meta records. Used by the scheduler's
	// sweep on leader acquisition to find stranded ready steps across all running DAGs.
	ListDAGs(ctx context.Context) ([]DAGMeta, error)

	// DeleteMeta, DeleteStep, DeleteResult remove records. Missing keys are not errors
	// (delete is idempotent). Callers coordinating a DAG-level delete should use
	// DeleteDAG which composes these in the correct order.
	DeleteMeta(ctx context.Context, dagID string) error
	DeleteStep(ctx context.Context, dagID, stepID string) error
	DeleteResult(ctx context.Context, dagID, stepID string) error

	// WatchResult streams a single message once a result is written at the given key.
	// If the result already exists, implementations should send it immediately and
	// then close the channel; they should also close the channel when ctx is canceled.
	WatchResult(ctx context.Context, dagID, stepID string) (<-chan []byte, error)
}

StateStore persists DAG + step + result records with CAS semantics. All PutX(..., expectedRev) operations return ErrStaleRevision if the stored record has a different revision. expectedRev == 0 means "create if absent".

The interface is designed so the scheduler logic remains pure: all IO flows through StateStore, EventBus, and Enqueuer. Tests use in-memory fakes such as MemStore and MemBus.

type Step

type Step struct {
	// contains filtered or unexported fields
}

Step represents a single node in a DAG. Returned by DAG.Step for the caller to chain into dependent steps via Ref() or RefOrDefault().

func (*Step) ID

func (s *Step) ID() string

ID returns the stable step ID within its DAG.

func (*Step) Ref

func (s *Step) Ref() Ref

Ref returns a Required-mode reference. Downstream steps using this Ref will be cascade-skipped if this step fails or is itself skipped.

func (*Step) RefOrDefault

func (s *Step) RefOrDefault(defaultValue any) Ref

RefOrDefault returns an OrDefault-mode reference. If this step fails or is skipped, the downstream step runs with the provided default value substituted for this step's output.

type StepBlocker

type StepBlocker struct {
	StepID    string
	WaitingOn []string
}

StepBlocker names a Pending step and lists its non-terminal deps (required and optional combined). Useful for "why isn't this step running yet?".

type StepDebug

type StepDebug struct {
	StepRecord
	QueueDuration time.Duration // StartedAt - AddedAt (0 if step never started)
	ExecDuration  time.Duration // FinishedAt - StartedAt (0 if step never finished)
}

StepDebug enriches StepRecord with computed per-step durations.

type StepHook

type StepHook struct {
	// contains filtered or unexported fields
}

StepHook implements worker.StepHook by persisting the step outcome to the state store and publishing a completion event to the bus. The scheduler consumes these events to advance the DAG.

func (*StepHook) OnStepDone

func (h *StepHook) OnStepDone(ctx context.Context, t *task.Task, result []byte) error

OnStepDone marks the step as done, writes the result, and publishes a `completed` event with Status=done.

func (*StepHook) OnStepFailed

func (h *StepHook) OnStepFailed(ctx context.Context, t *task.Task, taskErr *task.TaskError) error

OnStepFailed marks the step as failed and publishes a `completed` event with Status=failed so the scheduler can cascade skips and re-evaluate readiness.

type StepOption

type StepOption func(*Step)

StepOption configures a Step at construction time.

func After

func After(steps ...*Step) StepOption

After declares explicit temporal-only dependencies on the given upstream steps. This step waits until every upstream is terminal (done/failed/skipped) before it runs. If any upstream ended in failed/skipped, this step is cascade-skipped (same semantics as referencing via Ref()).

Use After when you need ordering but the current step's handler doesn't consume any upstream output. Equivalent to adding a Ref(upstream) arg that the handler ignores, but without contaminating the handler's signature.

func AfterAny

func AfterAny(steps ...*Step) StepOption

AfterAny declares optional temporal-only dependencies. This step waits until every upstream is terminal (done/failed/skipped) but runs regardless of whether they succeeded. Upstream failure does NOT cascade-skip this step.

Use AfterAny for ordering a "best-effort" or cleanup step that should run after some other work, whether that work succeeded or not.

func ColocateHere

func ColocateHere() StepOption

ColocateHere is only valid for dynamic steps created from a running handler via workflow.FromContext(ctx).StepOpts(...). It binds the new step to the concrete worker that is currently executing.

func ColocateWith

func ColocateWith(step *Step) StepOption

ColocateWith binds a step to the concrete worker that executed another step. The dependency is also made explicit so the referenced step has already run by the time placement is resolved.

func FollowTargetOf

func FollowTargetOf(step *Step) StepOption

FollowTargetOf binds a step to the same logical target expression as another step, but re-resolves that target at execution time.

func OnTarget

func OnTarget(target string) StepOption

OnTarget binds a step to a logical or concrete target claim.

func Optional

func Optional() StepOption

Optional marks a step as non-critical — its failure does not fail the DAG. Downstream steps choose whether to cascade-skip (via Ref) or substitute (via RefOrDefault).

func WithStepRetry

func WithStepRetry(p task.RetryPolicy) StepOption

WithStepRetry overrides the DAG's default retry policy for this step.

type StepRecord

type StepRecord struct {
	DAGID        string            `json:"dag_id"`
	StepID       string            `json:"step_id"`
	FnName       string            `json:"fn_name"`
	ArgsJSON     json.RawMessage   `json:"args_json"` // JSON array; may contain Refs
	Deps         []string          `json:"deps,omitempty"`
	OptionalDeps []string          `json:"optional_deps,omitempty"`
	Status       StepStatus        `json:"status"`
	Attempt      int               `json:"attempt"`
	ErrorKind    string            `json:"error_kind,omitempty"`
	Optional     bool              `json:"optional,omitempty"`
	Policy       *task.RetryPolicy `json:"policy,omitempty"` // per-step override
	Placement    *PlacementSpec    `json:"placement,omitempty"`
	WorkerID     string            `json:"worker_id,omitempty"`
	AddedAt      time.Time         `json:"added_at"`
	StartedAt    time.Time         `json:"started_at,omitempty"`
	FinishedAt   time.Time         `json:"finished_at,omitempty"`
}

StepRecord is stored per-step (key: <dag_id>/step/<step_id>).

Dependency model:

  • Deps: required deps. Step waits until each is terminal. If any ends as failed/skipped AND the args don't contain a RefOrDefault on it, this step is cascade-skipped. Combines Ref-derived deps from args + After() options.
  • OptionalDeps: "wait for completion but don't cascade on failure." Step waits until each is terminal; failure never causes cascade. Populated by AfterAny() options.
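The dependency model can be sketched as a per-step decision plus a fixpoint loop that lets skips cascade. The step type, its hypothetical DefaultDeps field (the real scheduler would find RefOrDefault uses by inspecting ArgsJSON), and the choice to wait for every dep before deciding to skip are assumptions read from the prose above, not the package's code.

```go
package main

import (
	"fmt"
	"sort"
)

// step is a simplified stand-in for StepRecord.
type step struct {
	Status       string
	Deps         []string        // required: cascade on failed/skipped
	OptionalDeps []string        // wait only, never cascade
	DefaultDeps  map[string]bool // hypothetical: required deps used via RefOrDefault
}

// settled reports whether a dep lets dependents move on. Canceled is left
// out on purpose: after Cancel, no follow-on work is scheduled anyway.
func settled(status string) bool {
	return status == "done" || status == "failed" || status == "skipped"
}

// decide returns "wait", "skip", or "ready". Deps must name existing steps.
func decide(s *step, all map[string]*step) string {
	for _, d := range s.OptionalDeps {
		if !settled(all[d].Status) {
			return "wait"
		}
	}
	skip := false
	for _, d := range s.Deps {
		st := all[d].Status
		if !settled(st) {
			return "wait"
		}
		if (st == "failed" || st == "skipped") && !s.DefaultDeps[d] {
			skip = true
		}
	}
	if skip {
		return "skip"
	}
	return "ready"
}

// advance applies skips until nothing changes (so skips cascade
// transitively), then returns the ready and newly skipped IDs, sorted.
func advance(all map[string]*step) (ready, skipped []string) {
	for changed := true; changed; {
		changed = false
		for id, s := range all {
			if s.Status == "pending" && decide(s, all) == "skip" {
				s.Status = "skipped"
				skipped = append(skipped, id)
				changed = true
			}
		}
	}
	for id, s := range all {
		if s.Status == "pending" && decide(s, all) == "ready" {
			ready = append(ready, id)
		}
	}
	sort.Strings(ready)
	sort.Strings(skipped)
	return ready, skipped
}

func main() {
	all := map[string]*step{
		"fetch":   {Status: "failed"},
		"parse":   {Status: "pending", Deps: []string{"fetch"}},
		"report":  {Status: "pending", Deps: []string{"fetch"}, DefaultDeps: map[string]bool{"fetch": true}},
		"notify":  {Status: "pending", Deps: []string{"parse"}},
		"cleanup": {Status: "pending", OptionalDeps: []string{"parse"}},
	}
	ready, skipped := advance(all)
	fmt.Println(ready, skipped) // [cleanup report] [notify parse]
}
```

Here "report" behaves like a RefOrDefault consumer and "cleanup" like an AfterAny step: both still run even though "fetch" failed.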

func (StepRecord) IsTerminal

func (s StepRecord) IsTerminal() bool

IsTerminal returns true if this step's status cannot change anymore.

type StepStatus

type StepStatus string

StepStatus is the lifecycle status of a single step. ResolveArgs also uses it to decide how to handle each Ref.

const (
	StatusPending  StepStatus = "pending"
	StatusRunning  StepStatus = "running"
	StatusDone     StepStatus = "done"
	StatusFailed   StepStatus = "failed"
	StatusSkipped  StepStatus = "skipped"
	StatusCanceled StepStatus = "canceled"
)

type Subscription

type Subscription interface {
	Stop() error
}

Subscription is returned by EventBus.Subscribe and is stopped via Stop().

type Workflow

type Workflow struct {
	Store    StateStore
	Bus      EventBus
	Enq      Enqueuer
	Elector  LeaderElector // nil ⇒ always leader
	NakDelay time.Duration // default 1s; used by scheduler when non-leader sees an event

	// SweepCheckInterval is the cadence at which the scheduler polls IsLeader()
	// to detect leadership acquisition. On a false→true edge it runs a sweep of
	// all in-flight DAGs to re-enqueue stranded ready steps. Default 5s.
	SweepCheckInterval time.Duration
	// SweepTimeout is the max wall-clock a single sweep may take. Default 60s.
	SweepTimeout time.Duration
}

Workflow bundles the three IO dependencies (store, bus, enqueuer) plus an optional leader elector for the scheduler loop. DAG.Submit takes one; Scheduler.Run uses one.

func NewFromNATS

func NewFromNATS(ctx context.Context, nc *nats.Conn, replicas int) (*Workflow, error)

NewFromNATS constructs a Workflow backed by the given NATS connection:

  • NatsStore for step/result persistence (KV bucket ebind-dags)
  • NatsBus for scheduler events (stream EBIND_DAG_EVENTS)
  • NatsEnqueuer for publishing tasks on the existing TASKS stream

replicas applies to the KV bucket and the events stream; use 1 for single-node dev and 3 for an HA cluster.

func NewWorkflow

func NewWorkflow(store StateStore, bus EventBus, enq Enqueuer) *Workflow

NewWorkflow constructs a Workflow with the three dependencies. Defaults: Elector = always-leader, NakDelay = 1s.

func (*Workflow) ContextMiddleware

func (wf *Workflow) ContextMiddleware() worker.Middleware

ContextMiddleware returns a worker.Middleware that injects a ContextDAG into the handler's context for DAG-dispatched tasks. Ad-hoc tasks (no DAGID) are passed through unchanged.

func (*Workflow) Hook

func (wf *Workflow) Hook() *StepHook

Hook returns a worker.StepHook that persists step outcomes to the store and publishes completion events to the bus. Wire it into worker.Options.StepHook.

func (*Workflow) RunScheduler

func (wf *Workflow) RunScheduler(ctx context.Context) error

RunScheduler drives the scheduler loop — subscribes to completion events, processes ready steps, applies state transitions. Blocks until ctx is canceled.

func (*Workflow) WithElector

func (wf *Workflow) WithElector(le LeaderElector) *Workflow

WithElector replaces the default always-leader elector.
