workflow

package
v0.0.0-...-eaf5e47 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 16, 2026 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ComputeBlobHash

func ComputeBlobHash(data []byte) string

ComputeBlobHash computes a content hash for a blob.

Types

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine orchestrates workflow execution. It runs on a single node (the leader) and dispatches steps as jobs to the worker pool via the existing job system.

func NewEngine

func NewEngine(store WorkflowStore, registry *Registry, log zerolog.Logger) *Engine

NewEngine creates a new workflow engine.

func (*Engine) ExecuteStep

func (e *Engine) ExecuteStep(ctx context.Context, runID, stepName string) error

ExecuteStep executes a single workflow step. This is called by the job handler (internal/worker) when a kronos.workflow_step job executes. The step input is constructed from upstream recorded outputs, and execution is recorded as events in the workflow_events table.

func (*Engine) ForkFromStep

func (e *Engine) ForkFromStep(ctx context.Context, runID, fromStep string) (string, error)

ForkFromStep creates a new run that reuses recorded outputs from all steps before fromStep, then schedules fromStep and all downstream steps for re-execution. Returns the new forked run ID.

type Event

type Event struct {
	ID        string
	RunID     string
	Seq       int       // monotonically increasing sequence number
	Ts        time.Time // timestamp when the event was recorded
	EventType EventType
	Payload   json.RawMessage
}

Event is a single entry in a workflow run's event log.

func NewEvent

func NewEvent(runID string, seq int, eventType EventType, payload interface{}) (*Event, error)

NewEvent creates a new event with a payload.

type EventType

type EventType string

EventType is the type of workflow event.

const (
	EventRunStarted         EventType = "run_started"
	EventStepScheduled      EventType = "step_scheduled"
	EventStepStarted        EventType = "step_started"
	EventStepInputRecorded  EventType = "step_input_recorded"
	EventStepOutputRecorded EventType = "step_output_recorded"
	EventStepCompleted      EventType = "step_completed"
	EventStepFailed         EventType = "step_failed"
	EventStepRetried        EventType = "step_retried"
	EventRunCompleted       EventType = "run_completed"
	EventRunFailed          EventType = "run_failed"
	EventRunForked          EventType = "run_forked"
	EventRunCancelled       EventType = "run_cancelled"
)

type PGWorkflowStore

type PGWorkflowStore struct {
	// contains filtered or unexported fields
}

PGWorkflowStore implements WorkflowStore against PostgreSQL.

func NewPGWorkflowStore

func NewPGWorkflowStore(pool *pgxpool.Pool) *PGWorkflowStore

NewPGWorkflowStore creates a PGWorkflowStore.

func (*PGWorkflowStore) AppendEvent

func (s *PGWorkflowStore) AppendEvent(ctx context.Context, event *Event) (int, error)

AppendEvent appends an event to the log.

func (*PGWorkflowStore) CancelRun

func (s *PGWorkflowStore) CancelRun(ctx context.Context, runID string) error

CancelRun transitions a pending run to cancelled.

func (*PGWorkflowStore) CreateRun

func (s *PGWorkflowStore) CreateRun(ctx context.Context, run *WorkflowRun) (*WorkflowRun, error)

CreateRun inserts a new workflow run.

func (*PGWorkflowStore) CreateWorkflow

func (s *PGWorkflowStore) CreateWorkflow(ctx context.Context, w *Workflow) error

CreateWorkflow inserts a workflow definition.

func (*PGWorkflowStore) DeleteBlob

func (s *PGWorkflowStore) DeleteBlob(ctx context.Context, hash string) error

DeleteBlob decrements ref_count and deletes if needed.

func (*PGWorkflowStore) GetBlob

func (s *PGWorkflowStore) GetBlob(ctx context.Context, hash string) ([]byte, error)

GetBlob retrieves a blob.

func (*PGWorkflowStore) GetEvents

func (s *PGWorkflowStore) GetEvents(ctx context.Context, runID string) ([]*Event, error)

GetEvents retrieves all events for a run.

func (*PGWorkflowStore) GetEventsAfterSeq

func (s *PGWorkflowStore) GetEventsAfterSeq(ctx context.Context, runID string, afterSeq int) ([]*Event, error)

GetEventsAfterSeq retrieves events after a sequence number.

func (*PGWorkflowStore) GetLatestWorkflow

func (s *PGWorkflowStore) GetLatestWorkflow(ctx context.Context, name string) (*Workflow, error)

GetLatestWorkflow retrieves the most recent workflow by name.

func (*PGWorkflowStore) GetRun

func (s *PGWorkflowStore) GetRun(ctx context.Context, id string) (*WorkflowRun, error)

GetRun retrieves a run by ID.

func (*PGWorkflowStore) GetStepExecution

func (s *PGWorkflowStore) GetStepExecution(ctx context.Context, runID, stepName string) (*StepExecution, error)

GetStepExecution retrieves a step execution.

func (*PGWorkflowStore) GetWorkflow

func (s *PGWorkflowStore) GetWorkflow(ctx context.Context, id string) (*Workflow, error)

GetWorkflow retrieves a workflow by ID.

func (*PGWorkflowStore) GetWorkflowByNameFingerprint

func (s *PGWorkflowStore) GetWorkflowByNameFingerprint(ctx context.Context, name, fingerprint string) (*Workflow, error)

GetWorkflowByNameFingerprint retrieves a workflow by name and fingerprint.

func (*PGWorkflowStore) ListRuns

func (s *PGWorkflowStore) ListRuns(ctx context.Context, workflowName *string, status *string, pageSize int, pageToken string) ([]*WorkflowRun, string, error)

ListRuns retrieves runs with optional filtering.

func (*PGWorkflowStore) ListStepExecutions

func (s *PGWorkflowStore) ListStepExecutions(ctx context.Context, runID string) ([]*StepExecution, error)

ListStepExecutions retrieves all step executions for a run.

func (*PGWorkflowStore) PutBlob

func (s *PGWorkflowStore) PutBlob(ctx context.Context, hash string, data []byte) error

PutBlob stores a blob (large input/output).

func (*PGWorkflowStore) UpdateRunOutput

func (s *PGWorkflowStore) UpdateRunOutput(ctx context.Context, runID string, output json.RawMessage) error

UpdateRunOutput sets the output on a completed run.

func (*PGWorkflowStore) UpdateRunStatus

func (s *PGWorkflowStore) UpdateRunStatus(ctx context.Context, runID, status string, errMsg *string, finishedAt *time.Time) error

UpdateRunStatus sets status, error, and finished_at.

func (*PGWorkflowStore) UpsertStepExecution

func (s *PGWorkflowStore) UpsertStepExecution(ctx context.Context, exec *StepExecution) error

UpsertStepExecution inserts or updates a step execution.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry maps workflow name to a list of versioned definitions.

func NewRegistry

func NewRegistry() *Registry

NewRegistry creates a new workflow registry.

func (*Registry) Get

func (r *Registry) Get(name string) *Workflow

Get retrieves the latest registered workflow by name. Returns nil if not found.

func (*Registry) GetByFingerprint

func (r *Registry) GetByFingerprint(name, fingerprint string) *Workflow

GetByFingerprint retrieves a workflow by name and fingerprint. Returns nil if not found.

func (*Registry) ListAll

func (r *Registry) ListAll() []*Workflow

ListAll returns all registered workflows.

func (*Registry) Register

func (r *Registry) Register(w *Workflow) error

Register registers a workflow definition. If a workflow with the same name and fingerprint is already registered, it's a no-op. Panics if the workflow is invalid.

type RunCompletedPayload

type RunCompletedPayload struct {
	Output json.RawMessage `json:"output,omitempty"`
}

RunCompletedPayload is the payload for a RunCompleted event.

type RunFailedPayload

type RunFailedPayload struct {
	Error string `json:"error"`
}

RunFailedPayload is the payload for a RunFailed event.

type RunForkedPayload

type RunForkedPayload struct {
	ForkedRunID  string `json:"forked_run_id"`
	FromStepName string `json:"from_step_name"`
}

RunForkedPayload is the payload for a RunForked event.

type RunStartedPayload

type RunStartedPayload struct {
	WorkflowName    string          `json:"workflow_name"`
	WorkflowVersion string          `json:"workflow_version"`
	Input           json.RawMessage `json:"input"`
}

RunStartedPayload is the payload for a RunStarted event.

type StepCompletedPayload

type StepCompletedPayload struct {
	StepName string `json:"step_name"`
}

StepCompletedPayload is the payload for a StepCompleted event.

type StepDef

type StepDef struct {
	Name         string
	Handler      StepFunc
	Dependencies []string // names of steps this depends on; empty = no dependencies
}

StepDef describes a single step in a workflow DAG.

type StepExecution

type StepExecution struct {
	ID         string
	RunID      string
	StepName   string
	Attempt    int
	Status     string
	Input      json.RawMessage
	Output     json.RawMessage
	Error      string
	StartedAt  *time.Time
	FinishedAt *time.Time
	DurationMs *int
}

StepExecution represents a single step execution within a run.

type StepFailedPayload

type StepFailedPayload struct {
	StepName string `json:"step_name"`
	Error    string `json:"error"`
	Attempt  int    `json:"attempt"`
}

StepFailedPayload is the payload for a StepFailed event.

type StepFunc

type StepFunc func(ctx context.Context, input json.RawMessage) (json.RawMessage, error)

StepFunc is the signature for a workflow step handler. It receives a context (for cancellation, timeouts, tracing) and the step input (from upstream outputs or workflow input) and returns the step output. Implementations should be idempotent — they may be called more than once on retry.

type StepInputRecordedPayload

type StepInputRecordedPayload struct {
	StepName string          `json:"step_name"`
	Input    json.RawMessage `json:"input"`
	BlobID   *string         `json:"blob_id,omitempty"` // if input is large
}

StepInputRecordedPayload is the payload for a StepInputRecorded event.

type StepOutputRecordedPayload

type StepOutputRecordedPayload struct {
	StepName   string          `json:"step_name"`
	Output     json.RawMessage `json:"output"`
	BlobID     *string         `json:"blob_id,omitempty"` // if output is large
	DurationMs int             `json:"duration_ms"`
}

StepOutputRecordedPayload is the payload for a StepOutputRecorded event.

type StepRetriedPayload

type StepRetriedPayload struct {
	StepName string `json:"step_name"`
	Attempt  int    `json:"attempt"`
}

StepRetriedPayload is the payload for a StepRetried event.

type StepScheduledPayload

type StepScheduledPayload struct {
	StepName     string   `json:"step_name"`
	Dependencies []string `json:"dependencies"`
}

StepScheduledPayload is the payload for a StepScheduled event.

type StepStartedPayload

type StepStartedPayload struct {
	StepName string `json:"step_name"`
	Attempt  int    `json:"attempt"`
}

StepStartedPayload is the payload for a StepStarted event.

type Workflow

type Workflow struct {
	Name            string
	Version         string
	CodeFingerprint string
	Steps           []*StepDef
	DAGStructure    map[string][]string // step name -> list of names that depend on it
	// contains filtered or unexported fields
}

Workflow describes a repeatable multi-step process. Workflows are versioned and immutable: changes create a new Workflow with a new fingerprint.

func NewWorkflow

func NewWorkflow(name, version string) *Workflow

NewWorkflow creates a new workflow builder.

func (*Workflow) AddStep

func (w *Workflow) AddStep(name string, handler StepFunc, dependencies []string) *Workflow

AddStep adds a step to the workflow DAG. dependencies is a list of step names this step depends on (nil = no dependencies).

func (*Workflow) ComputeFingerprint

func (w *Workflow) ComputeFingerprint() string

ComputeFingerprint computes a content hash of the workflow definition. This is used to detect changes in workflow logic and create new workflow versions. We hash: workflow name + version + step names + dependency structure. Handler function pointers are not hashed (we can't serialize them reliably).

func (*Workflow) GetReadySteps

func (w *Workflow) GetReadySteps(completed map[string]json.RawMessage) []string

GetReadySteps returns the names of steps that are ready to execute (all dependencies have completed). completed is a map of step name -> output (json.RawMessage).

func (*Workflow) GetStepDef

func (w *Workflow) GetStepDef(name string) *StepDef

GetStepDef returns the StepDef for a given step name, or nil if not found.

func (*Workflow) Validate

func (w *Workflow) Validate() error

Validate checks that the DAG is acyclic and all dependencies are satisfied.

type WorkflowEvent

type WorkflowEvent struct {
	ID        string
	RunID     string
	Seq       int
	Ts        time.Time
	EventType string // run_started, step_started, step_completed, etc.
	Payload   json.RawMessage
}

WorkflowEvent is an entry in the event log.

type WorkflowRun

type WorkflowRun struct {
	ID          string
	WorkflowID  string // FK to workflows table
	Status      string // pending, running, completed, failed, cancelled, forked
	Input       json.RawMessage
	Output      json.RawMessage
	Error       string
	ParentRunID *string // for forked runs
	StartedAt   *time.Time
	FinishedAt  *time.Time
	CreatedAt   time.Time
	UpdatedAt   time.Time
}

WorkflowRun represents a single execution of a workflow.

type WorkflowStore

type WorkflowStore interface {
	// CreateWorkflow inserts a new workflow definition.
	// Returns the created workflow with ID populated.
	CreateWorkflow(ctx context.Context, w *Workflow) error

	// GetWorkflow retrieves a workflow definition by ID.
	GetWorkflow(ctx context.Context, id string) (*Workflow, error)

	// GetWorkflowByNameFingerprint retrieves a workflow by name and fingerprint.
	// Returns nil if not found.
	GetWorkflowByNameFingerprint(ctx context.Context, name, fingerprint string) (*Workflow, error)

	// GetLatestWorkflow retrieves the most recent workflow definition by name.
	GetLatestWorkflow(ctx context.Context, name string) (*Workflow, error)

	// CreateRun inserts a new workflow run.
	CreateRun(ctx context.Context, run *WorkflowRun) (*WorkflowRun, error)

	// GetRun retrieves a run by ID.
	GetRun(ctx context.Context, id string) (*WorkflowRun, error)

	// ListRuns retrieves runs, optionally filtered by workflow name and/or status.
	ListRuns(ctx context.Context, workflowName *string, status *string, pageSize int, pageToken string) ([]*WorkflowRun, string, error)

	// UpdateRunStatus sets the status (and optionally error and finished_at) on a run.
	UpdateRunStatus(ctx context.Context, runID, status string, errMsg *string, finishedAt *time.Time) error

	// UpdateRunOutput sets the output on a completed run.
	UpdateRunOutput(ctx context.Context, runID string, output json.RawMessage) error

	// CancelRun transitions a pending run to cancelled.
	CancelRun(ctx context.Context, runID string) error

	// AppendEvent appends an event to the event log and returns the assigned sequence number.
	AppendEvent(ctx context.Context, event *Event) (int, error)

	// GetEvents retrieves all events for a run in order.
	GetEvents(ctx context.Context, runID string) ([]*Event, error)

	// GetEventsAfterSeq retrieves events for a run starting after a given sequence number.
	GetEventsAfterSeq(ctx context.Context, runID string, afterSeq int) ([]*Event, error)

	// PutBlob stores a large input/output in blob storage by content hash.
	// If the blob already exists, only the ref_count is incremented.
	PutBlob(ctx context.Context, hash string, data []byte) error

	// GetBlob retrieves a blob by content hash.
	GetBlob(ctx context.Context, hash string) ([]byte, error)

	// DeleteBlob decrements the ref_count and deletes the blob if it reaches 0.
	DeleteBlob(ctx context.Context, hash string) error

	// UpsertStepExecution inserts or updates a step execution record.
	UpsertStepExecution(ctx context.Context, exec *StepExecution) error

	// GetStepExecution retrieves a step execution.
	GetStepExecution(ctx context.Context, runID, stepName string) (*StepExecution, error)

	// ListStepExecutions retrieves all step executions for a run.
	ListStepExecutions(ctx context.Context, runID string) ([]*StepExecution, error)
}

WorkflowStore is the persistence interface for workflows and runs.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL