Documentation
¶
Index ¶
- func ComputeBlobHash(data []byte) string
- type Engine
- type Event
- type EventType
- type PGWorkflowStore
- func (s *PGWorkflowStore) AppendEvent(ctx context.Context, event *Event) (int, error)
- func (s *PGWorkflowStore) CancelRun(ctx context.Context, runID string) error
- func (s *PGWorkflowStore) CreateRun(ctx context.Context, run *WorkflowRun) (*WorkflowRun, error)
- func (s *PGWorkflowStore) CreateWorkflow(ctx context.Context, w *Workflow) error
- func (s *PGWorkflowStore) DeleteBlob(ctx context.Context, hash string) error
- func (s *PGWorkflowStore) GetBlob(ctx context.Context, hash string) ([]byte, error)
- func (s *PGWorkflowStore) GetEvents(ctx context.Context, runID string) ([]*Event, error)
- func (s *PGWorkflowStore) GetEventsAfterSeq(ctx context.Context, runID string, afterSeq int) ([]*Event, error)
- func (s *PGWorkflowStore) GetLatestWorkflow(ctx context.Context, name string) (*Workflow, error)
- func (s *PGWorkflowStore) GetRun(ctx context.Context, id string) (*WorkflowRun, error)
- func (s *PGWorkflowStore) GetStepExecution(ctx context.Context, runID, stepName string) (*StepExecution, error)
- func (s *PGWorkflowStore) GetWorkflow(ctx context.Context, id string) (*Workflow, error)
- func (s *PGWorkflowStore) GetWorkflowByNameFingerprint(ctx context.Context, name, fingerprint string) (*Workflow, error)
- func (s *PGWorkflowStore) ListRuns(ctx context.Context, workflowName *string, status *string, pageSize int, ...) ([]*WorkflowRun, string, error)
- func (s *PGWorkflowStore) ListStepExecutions(ctx context.Context, runID string) ([]*StepExecution, error)
- func (s *PGWorkflowStore) PutBlob(ctx context.Context, hash string, data []byte) error
- func (s *PGWorkflowStore) UpdateRunOutput(ctx context.Context, runID string, output json.RawMessage) error
- func (s *PGWorkflowStore) UpdateRunStatus(ctx context.Context, runID, status string, errMsg *string, ...) error
- func (s *PGWorkflowStore) UpsertStepExecution(ctx context.Context, exec *StepExecution) error
- type Registry
- type RunCompletedPayload
- type RunFailedPayload
- type RunForkedPayload
- type RunStartedPayload
- type StepCompletedPayload
- type StepDef
- type StepExecution
- type StepFailedPayload
- type StepFunc
- type StepInputRecordedPayload
- type StepOutputRecordedPayload
- type StepRetriedPayload
- type StepScheduledPayload
- type StepStartedPayload
- type Workflow
- func (w *Workflow) AddStep(name string, handler StepFunc, dependencies []string) *Workflow
- func (w *Workflow) ComputeFingerprint() string
- func (w *Workflow) GetReadySteps(completed map[string]json.RawMessage) []string
- func (w *Workflow) GetStepDef(name string) *StepDef
- func (w *Workflow) Validate() error
- type WorkflowEvent
- type WorkflowRun
- type WorkflowStore
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ComputeBlobHash ¶
ComputeBlobHash computes a content hash for a blob.
Types ¶
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine orchestrates workflow execution. It runs on a single node (the leader) and dispatches steps as jobs to the worker pool via the existing job system.
func NewEngine ¶
func NewEngine(store WorkflowStore, registry *Registry, log zerolog.Logger) *Engine
NewEngine creates a new workflow engine.
func (*Engine) ExecuteStep ¶
ExecuteStep executes a single workflow step. This is called by the job handler (internal/worker) when a kronos.workflow_step job executes. The step input is constructed from upstream recorded outputs, and execution is recorded as events in the workflow_events table.
func (*Engine) ForkFromStep ¶
ForkFromStep creates a new run that reuses recorded outputs from all steps before fromStep, then schedules fromStep and all downstream steps for re-execution. Returns the new forked run ID.
type Event ¶
type Event struct {
ID string
RunID string
Seq int // monotonically increasing sequence number
Ts time.Time // timestamp when the event was recorded
EventType EventType
Payload json.RawMessage
}
Event is a single entry in a workflow run's event log.
type EventType ¶
type EventType string
EventType is the type of workflow event.
const (
EventRunStarted EventType = "run_started"
EventStepScheduled EventType = "step_scheduled"
EventStepStarted EventType = "step_started"
EventStepInputRecorded EventType = "step_input_recorded"
EventStepOutputRecorded EventType = "step_output_recorded"
EventStepCompleted EventType = "step_completed"
EventStepFailed EventType = "step_failed"
EventStepRetried EventType = "step_retried"
EventRunCompleted EventType = "run_completed"
EventRunFailed EventType = "run_failed"
EventRunForked EventType = "run_forked"
EventRunCancelled EventType = "run_cancelled"
)
type PGWorkflowStore ¶
type PGWorkflowStore struct {
// contains filtered or unexported fields
}
PGWorkflowStore implements WorkflowStore against PostgreSQL.
func NewPGWorkflowStore ¶
func NewPGWorkflowStore(pool *pgxpool.Pool) *PGWorkflowStore
NewPGWorkflowStore creates a PGWorkflowStore.
func (*PGWorkflowStore) AppendEvent ¶
AppendEvent appends an event to the event log and returns the assigned sequence number.
func (*PGWorkflowStore) CancelRun ¶
func (s *PGWorkflowStore) CancelRun(ctx context.Context, runID string) error
CancelRun transitions a pending run to cancelled.
func (*PGWorkflowStore) CreateRun ¶
func (s *PGWorkflowStore) CreateRun(ctx context.Context, run *WorkflowRun) (*WorkflowRun, error)
CreateRun inserts a new workflow run.
func (*PGWorkflowStore) CreateWorkflow ¶
func (s *PGWorkflowStore) CreateWorkflow(ctx context.Context, w *Workflow) error
CreateWorkflow inserts a workflow definition.
func (*PGWorkflowStore) DeleteBlob ¶
func (s *PGWorkflowStore) DeleteBlob(ctx context.Context, hash string) error
DeleteBlob decrements the blob's ref_count and deletes the blob if the count reaches 0.
func (*PGWorkflowStore) GetEventsAfterSeq ¶
func (s *PGWorkflowStore) GetEventsAfterSeq(ctx context.Context, runID string, afterSeq int) ([]*Event, error)
GetEventsAfterSeq retrieves the events for a run that come after the given sequence number.
func (*PGWorkflowStore) GetLatestWorkflow ¶
GetLatestWorkflow retrieves the most recent workflow by name.
func (*PGWorkflowStore) GetRun ¶
func (s *PGWorkflowStore) GetRun(ctx context.Context, id string) (*WorkflowRun, error)
GetRun retrieves a run by ID.
func (*PGWorkflowStore) GetStepExecution ¶
func (s *PGWorkflowStore) GetStepExecution(ctx context.Context, runID, stepName string) (*StepExecution, error)
GetStepExecution retrieves a step execution.
func (*PGWorkflowStore) GetWorkflow ¶
GetWorkflow retrieves a workflow by ID.
func (*PGWorkflowStore) GetWorkflowByNameFingerprint ¶
func (s *PGWorkflowStore) GetWorkflowByNameFingerprint(ctx context.Context, name, fingerprint string) (*Workflow, error)
GetWorkflowByNameFingerprint retrieves a workflow by name and fingerprint.
func (*PGWorkflowStore) ListRuns ¶
func (s *PGWorkflowStore) ListRuns(ctx context.Context, workflowName *string, status *string, pageSize int, pageToken string) ([]*WorkflowRun, string, error)
ListRuns retrieves runs, optionally filtered by workflow name and/or status.
func (*PGWorkflowStore) ListStepExecutions ¶
func (s *PGWorkflowStore) ListStepExecutions(ctx context.Context, runID string) ([]*StepExecution, error)
ListStepExecutions retrieves all step executions for a run.
func (*PGWorkflowStore) UpdateRunOutput ¶
func (s *PGWorkflowStore) UpdateRunOutput(ctx context.Context, runID string, output json.RawMessage) error
UpdateRunOutput sets the output on a completed run.
func (*PGWorkflowStore) UpdateRunStatus ¶
func (s *PGWorkflowStore) UpdateRunStatus(ctx context.Context, runID, status string, errMsg *string, finishedAt *time.Time) error
UpdateRunStatus sets the status, and optionally the error and finished_at, on a run.
func (*PGWorkflowStore) UpsertStepExecution ¶
func (s *PGWorkflowStore) UpsertStepExecution(ctx context.Context, exec *StepExecution) error
UpsertStepExecution inserts or updates a step execution.
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry maps workflow name to a list of versioned definitions.
func (*Registry) Get ¶
Get retrieves the latest registered workflow by name. Returns nil if not found.
func (*Registry) GetByFingerprint ¶
GetByFingerprint retrieves a workflow by name and fingerprint. Returns nil if not found.
type RunCompletedPayload ¶
type RunCompletedPayload struct {
Output json.RawMessage `json:"output,omitempty"`
}
RunCompletedPayload is the payload for a RunCompleted event.
type RunFailedPayload ¶
type RunFailedPayload struct {
Error string `json:"error"`
}
RunFailedPayload is the payload for a RunFailed event.
type RunForkedPayload ¶
type RunForkedPayload struct {
ForkedRunID string `json:"forked_run_id"`
FromStepName string `json:"from_step_name"`
}
RunForkedPayload is the payload for a RunForked event.
type RunStartedPayload ¶
type RunStartedPayload struct {
WorkflowName string `json:"workflow_name"`
WorkflowVersion string `json:"workflow_version"`
Input json.RawMessage `json:"input"`
}
RunStartedPayload is the payload for a RunStarted event.
type StepCompletedPayload ¶
type StepCompletedPayload struct {
StepName string `json:"step_name"`
}
StepCompletedPayload is the payload for a StepCompleted event.
type StepDef ¶
type StepDef struct {
Name string
Handler StepFunc
Dependencies []string // names of steps this depends on; empty = no dependencies
}
StepDef describes a single step in a workflow DAG.
type StepExecution ¶
type StepExecution struct {
ID string
RunID string
StepName string
Attempt int
Status string
Input json.RawMessage
Output json.RawMessage
Error string
StartedAt *time.Time
FinishedAt *time.Time
DurationMs *int
}
StepExecution represents a single step execution within a run.
type StepFailedPayload ¶
type StepFailedPayload struct {
StepName string `json:"step_name"`
Error string `json:"error"`
Attempt int `json:"attempt"`
}
StepFailedPayload is the payload for a StepFailed event.
type StepFunc ¶
type StepFunc func(ctx context.Context, input json.RawMessage) (json.RawMessage, error)
StepFunc is the signature for a workflow step handler. It receives a context (for cancellation, timeouts, tracing) and the step input (from upstream outputs or workflow input) and returns the step output. Implementations should be idempotent — they may be called more than once on retry.
type StepInputRecordedPayload ¶
type StepInputRecordedPayload struct {
StepName string `json:"step_name"`
Input json.RawMessage `json:"input"`
BlobID *string `json:"blob_id,omitempty"` // if input is large
}
StepInputRecordedPayload is the payload for a StepInputRecorded event.
type StepOutputRecordedPayload ¶
type StepOutputRecordedPayload struct {
StepName string `json:"step_name"`
Output json.RawMessage `json:"output"`
BlobID *string `json:"blob_id,omitempty"` // if output is large
DurationMs int `json:"duration_ms"`
}
StepOutputRecordedPayload is the payload for a StepOutputRecorded event.
type StepRetriedPayload ¶
StepRetriedPayload is the payload for a StepRetried event.
type StepScheduledPayload ¶
type StepScheduledPayload struct {
StepName string `json:"step_name"`
Dependencies []string `json:"dependencies"`
}
StepScheduledPayload is the payload for a StepScheduled event.
type StepStartedPayload ¶
StepStartedPayload is the payload for a StepStarted event.
type Workflow ¶
type Workflow struct {
Name string
Version string
CodeFingerprint string
Steps []*StepDef
DAGStructure map[string][]string // step name -> list of names that depend on it
// contains filtered or unexported fields
}
Workflow describes a repeatable multi-step process. Workflows are versioned and immutable: changes create a new Workflow with a new fingerprint.
func NewWorkflow ¶
NewWorkflow creates a new workflow builder.
func (*Workflow) AddStep ¶
AddStep adds a step to the workflow DAG. dependencies is a list of step names this step depends on (nil = no dependencies).
func (*Workflow) ComputeFingerprint ¶
ComputeFingerprint computes a content hash of the workflow definition. The fingerprint is used to detect changes to the definition and to create new workflow versions. The hash covers the workflow name, version, step names, and dependency structure. Handler function pointers are not hashed (they can't be serialized reliably), so a change made only inside a handler's body does not change the fingerprint.
func (*Workflow) GetReadySteps ¶
func (w *Workflow) GetReadySteps(completed map[string]json.RawMessage) []string
GetReadySteps returns the names of steps that are ready to execute (all dependencies have completed). completed is a map of step name -> output (json.RawMessage).
func (*Workflow) GetStepDef ¶
GetStepDef returns the StepDef for a given step name, or nil if not found.
type WorkflowEvent ¶
type WorkflowEvent struct {
ID string
RunID string
Seq int
Ts time.Time
EventType string // run_started, step_started, step_completed, etc.
Payload json.RawMessage
}
WorkflowEvent is an entry in the event log.
type WorkflowRun ¶
type WorkflowRun struct {
ID string
WorkflowID string // FK to workflows table
Status string // pending, running, completed, failed, cancelled, forked
Input json.RawMessage
Output json.RawMessage
Error string
ParentRunID *string // for forked runs
StartedAt *time.Time
FinishedAt *time.Time
CreatedAt time.Time
UpdatedAt time.Time
}
WorkflowRun represents a single execution of a workflow.
type WorkflowStore ¶
type WorkflowStore interface {
// CreateWorkflow inserts a new workflow definition.
// The method returns only an error. NOTE(review): any generated ID is presumably
// populated on w in place; confirm against the implementation.
CreateWorkflow(ctx context.Context, w *Workflow) error
// GetWorkflow retrieves a workflow definition by ID.
GetWorkflow(ctx context.Context, id string) (*Workflow, error)
// GetWorkflowByNameFingerprint retrieves a workflow by name and fingerprint.
// Returns nil if not found.
GetWorkflowByNameFingerprint(ctx context.Context, name, fingerprint string) (*Workflow, error)
// GetLatestWorkflow retrieves the most recent workflow definition by name.
GetLatestWorkflow(ctx context.Context, name string) (*Workflow, error)
// CreateRun inserts a new workflow run.
CreateRun(ctx context.Context, run *WorkflowRun) (*WorkflowRun, error)
// GetRun retrieves a run by ID.
GetRun(ctx context.Context, id string) (*WorkflowRun, error)
// ListRuns retrieves runs, optionally filtered by workflow name and/or status.
ListRuns(ctx context.Context, workflowName *string, status *string, pageSize int, pageToken string) ([]*WorkflowRun, string, error)
// UpdateRunStatus sets the status (and optionally error and finished_at) on a run.
UpdateRunStatus(ctx context.Context, runID, status string, errMsg *string, finishedAt *time.Time) error
// UpdateRunOutput sets the output on a completed run.
UpdateRunOutput(ctx context.Context, runID string, output json.RawMessage) error
// CancelRun transitions a pending run to cancelled.
CancelRun(ctx context.Context, runID string) error
// AppendEvent appends an event to the event log and returns the assigned sequence number.
AppendEvent(ctx context.Context, event *Event) (int, error)
// GetEvents retrieves all events for a run in order.
GetEvents(ctx context.Context, runID string) ([]*Event, error)
// GetEventsAfterSeq retrieves events for a run starting after a given sequence number.
GetEventsAfterSeq(ctx context.Context, runID string, afterSeq int) ([]*Event, error)
// PutBlob stores a large input/output in blob storage by content hash.
// If the blob already exists, only the ref_count is incremented.
PutBlob(ctx context.Context, hash string, data []byte) error
// GetBlob retrieves a blob by content hash.
GetBlob(ctx context.Context, hash string) ([]byte, error)
// DeleteBlob decrements the ref_count and deletes the blob if it reaches 0.
DeleteBlob(ctx context.Context, hash string) error
// UpsertStepExecution inserts or updates a step execution record.
UpsertStepExecution(ctx context.Context, exec *StepExecution) error
// GetStepExecution retrieves a step execution.
GetStepExecution(ctx context.Context, runID, stepName string) (*StepExecution, error)
// ListStepExecutions retrieves all step executions for a run.
ListStepExecutions(ctx context.Context, runID string) ([]*StepExecution, error)
}
WorkflowStore is the persistence interface for workflows and runs.