workflow

package
v0.0.4 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 10, 2026 License: MIT Imports: 20 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Permanent

func Permanent(code string, err error) error

Permanent wraps err as a non-retryable workflow step error.

func Require

func Require(handles ...StepHandle) []model.Dependency

func Retryable

func Retryable(code string, err error) error

Retryable wraps err as a retryable workflow step error. The stable code is used for metrics, runtime events, and operator filtering.

func ToRunner

func ToRunner(executor Executor) runner.Runner

ToRunner adapts a workflow Executor to the existing engine runner interface.

Types

type ArtifactObject

// ArtifactObject describes one blob handed to ArtifactStore.Put.
// Body carries the raw bytes. ID, Name, Kind, ContentType, and Metadata are
// descriptive fields whose names match the corresponding fields on ArtifactRef.
type ArtifactObject struct {
	ID          string
	Name        string
	Kind        string
	ContentType string
	Metadata    map[string]string
	Body        []byte
}

ArtifactObject is a blob that should be stored outside the engine result row.

type ArtifactOption

// ArtifactOption is a functional option: it receives a pointer to the
// model.ArtifactWrite being emitted and may modify it. Options are passed as
// the variadic opts argument of StepContext.Artifact and
// StepContext.StoreArtifact.
type ArtifactOption func(*model.ArtifactWrite)

ArtifactOption customizes an artifact emitted by a step.

func ArtifactID

func ArtifactID(id string) ArtifactOption

func ArtifactKind

func ArtifactKind(kind string) ArtifactOption

func ArtifactMetadata

func ArtifactMetadata(metadata map[string]string) ArtifactOption

type ArtifactRef

// ArtifactRef is the reference value returned by ArtifactStore.Put and
// ArtifactStore.Open. It serializes to JSON with lower-camel-case keys, and
// Metadata is omitted from the JSON output when empty.
//
// NOTE(review): URI is presumably the store-specific location of the object
// and Size presumably the stored body length in bytes — confirm against the
// store implementation. Size is a plain int, so its width is platform
// dependent (32 bits on 32-bit targets).
type ArtifactRef struct {
	ID          string            `json:"id"`
	URI         string            `json:"uri"`
	Name        string            `json:"name"`
	Kind        string            `json:"kind"`
	ContentType string            `json:"contentType"`
	Metadata    map[string]string `json:"metadata,omitempty"`
	Size        int               `json:"size"`
}

ArtifactRef points to an artifact stored by an ArtifactStore.

type ArtifactStore

// ArtifactStore is the storage backend contract for external artifacts.
//
// Put takes one ArtifactObject and returns an ArtifactRef for it.
// Open takes an artifact id and returns a reader for the artifact together
// with its ArtifactRef.
//
// NOTE(review): callers are presumably responsible for closing the
// io.ReadCloser returned by Open — confirm.
type ArtifactStore interface {
	Put(ctx context.Context, artifact ArtifactObject) (ArtifactRef, error)
	Open(ctx context.Context, id string) (io.ReadCloser, ArtifactRef, error)
}

ArtifactStore stores large artifact bytes outside the engine DB.

type Config

// Config is the argument to NewRuntime.
//
// Store, ArtifactStore, and ProjectionStore select the pluggable backends.
// WorkerID, MaxWorkers, PollInterval, and LeaseDuration are worker-level
// settings. Queues holds per-queue settings keyed by model.QueueKey.
//
// NOTE(review): which fields are required, and what the zero values default
// to, is not visible here — confirm against NewRuntime.
type Config struct {
	Store           StoreConfig
	ArtifactStore   ArtifactStore
	ProjectionStore ProjectionStore

	WorkerID      string
	MaxWorkers    int
	PollInterval  time.Duration
	LeaseDuration time.Duration

	Queues map[model.QueueKey]QueueConfig
}

Config configures an embeddable workflow Runtime.

type Entrypoint

// Entrypoint is the hook a Package supplies through its Entrypoint field.
// Start receives the RunBuilder for the run and the run input as undecoded
// JSON, and reports failure through its error result.
type Entrypoint interface {
	Start(ctx context.Context, run *RunBuilder, rawInput json.RawMessage) error
}

Entrypoint creates the initial durable step graph for a run.

type EntrypointFunc

// EntrypointFunc is a function type parameterized by the input type I. Its
// Start method has the Entrypoint signature and accepts raw JSON input.
//
// NOTE(review): Start presumably decodes rawInput into an I before calling the
// function — confirm.
type EntrypointFunc[I any] func(context.Context, *RunBuilder, I) error

EntrypointFunc adapts a typed function to an Entrypoint.

func (EntrypointFunc[I]) Start

func (f EntrypointFunc[I]) Start(ctx context.Context, run *RunBuilder, rawInput json.RawMessage) error

type Error

// Error is the structured failure type for workflow steps. It has Error,
// OpError, and Unwrap methods.
//
// Code and Message describe the failure, Retryable records whether the
// failure is retryable, and Details holds optional raw JSON.
//
// NOTE(review): Cause is presumably the wrapped error that Unwrap returns —
// confirm.
type Error struct {
	Code      string
	Message   string
	Retryable bool
	Details   json.RawMessage
	Cause     error
}

Error marks a workflow step failure with stable operator-facing metadata. It implements the scheduler's OpError carrier interface, so returning it from an Executor causes the existing scheduler failure path to persist the model.OpError produced by its OpError method.

func (*Error) Error

func (e *Error) Error() string

func (*Error) OpError

func (e *Error) OpError() model.OpError

func (*Error) Unwrap

func (e *Error) Unwrap() error

type Executor

// Executor is the unit accepted by Registry.Register and
// Runtime.RegisterExecutor. Kind reports the step kind string for the
// executor; Execute runs one step against its StepContext and reports failure
// through its error result.
type Executor interface {
	Kind() string
	Execute(ctx context.Context, step *StepContext) error
}

Executor executes one durable workflow step kind. It is the workflow-native facade over the lower-level engine/runner.Runner interface.

func NewExecutor

func NewExecutor(kind string, fn func(context.Context, *StepContext) error) Executor

NewExecutor creates an untyped executor from a function.

func NewTypedExecutor

func NewTypedExecutor[I any](kind string, fn func(context.Context, *StepContext, I) error) Executor

NewTypedExecutor creates an executor that decodes step input into I before invoking fn.

type ExecutorFunc

// ExecutorFunc pairs a kind name with a plain function so that the pair can be
// used as an Executor.
//
// NOTE(review): Kind presumably returns KindName and Execute presumably calls
// Func — confirm.
type ExecutorFunc struct {
	KindName string
	Func     func(context.Context, *StepContext) error
}

ExecutorFunc adapts a function to an Executor.

func (ExecutorFunc) Execute

func (e ExecutorFunc) Execute(ctx context.Context, step *StepContext) error

func (ExecutorFunc) Kind

func (e ExecutorFunc) Kind() string

type FileArtifactStore

type FileArtifactStore struct {
	// contains filtered or unexported fields
}

FileArtifactStore stores artifacts under a local filesystem root. It is the first external artifact backend for embedded/local workflow runtimes.

func NewFileArtifactStore

func NewFileArtifactStore(root string) *FileArtifactStore

func (*FileArtifactStore) Open

func (*FileArtifactStore) Put

type OperatorService

// OperatorService exposes two operator mutations: RetryOp targets a single op
// within a workflow, and CancelWorkflow targets a whole workflow.
// Runtime.RetryStep and Runtime.CancelRun take the same identifier types.
type OperatorService interface {
	RetryOp(ctx context.Context, workflowID model.WorkflowID, opID model.OpID) error
	CancelWorkflow(ctx context.Context, workflowID model.WorkflowID) error
}

OperatorService is the minimal mutation surface needed by embedded operator controls. SQLiteStore provides this through the existing engineview service; future backends can provide their own implementation without changing the public Runtime methods.

type Package

// Package is the value accepted by Runtime.RegisterPackage and produced by
// PackageBuilder.Build. Entrypoint is the hook that builds the initial step
// graph for a run.
//
// NOTE(review): Name is presumably the key matched against the packageName
// argument of Runtime.StartRun, and DisplayName a human-facing label —
// confirm.
type Package struct {
	Name        string
	DisplayName string
	Entrypoint  Entrypoint
}

Package describes a workflow package/domain that can start durable runs.

type PackageBuilder

type PackageBuilder struct {
	// contains filtered or unexported fields
}

func NewPackage

func NewPackage(name string) *PackageBuilder

func (*PackageBuilder) Build

func (b *PackageBuilder) Build() *Package

func (*PackageBuilder) DisplayName

func (b *PackageBuilder) DisplayName(name string) *PackageBuilder

func (*PackageBuilder) Entrypoint

func (b *PackageBuilder) Entrypoint(entrypoint Entrypoint) *PackageBuilder

type Projection

// Projection offers two operations, each taking a query string plus variadic
// arguments: Exec returns an int64 and Query returns a slice of maps keyed by
// string.
//
// NOTE(review): the int64 is presumably the affected-row count and the map
// keys presumably column names — confirm against the implementation.
type Projection interface {
	Exec(ctx context.Context, query string, args ...any) (int64, error)
	Query(ctx context.Context, query string, args ...any) ([]map[string]any, error)
}

Projection is a small database-like interface exposed to executors.

type ProjectionStore

// ProjectionStore looks up a Projection by name. It is the type of
// Config.ProjectionStore, and Runtime.Projection has the same signature.
type ProjectionStore interface {
	Projection(ctx context.Context, name string) (Projection, error)
}

ProjectionStore resolves domain/package projection databases. Projections are query-oriented read models owned by workflow packages, separate from engine scheduling state.

type QueueConfig

// QueueConfig holds per-queue settings; Config.Queues maps each
// model.QueueKey to one of these.
//
// NOTE(review): MaxWorkers presumably caps concurrent workers for the queue,
// and a nil RateLimit presumably means no rate limiting — confirm.
type QueueConfig struct {
	MaxWorkers int
	RateLimit  *model.RateLimitPolicy
}

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry is a small workflow-native executor registry that can produce the existing runner registry used by the scheduler.

func NewRegistry

func NewRegistry() *Registry

func (*Registry) Kinds

func (r *Registry) Kinds() []string

func (*Registry) Register

func (r *Registry) Register(executor Executor) error

func (*Registry) RunnerRegistry

func (r *Registry) RunnerRegistry() *runner.Registry

type RunBuilder

type RunBuilder struct {
	// contains filtered or unexported fields
}

RunBuilder constructs the initial step graph for a workflow run.

func (*RunBuilder) Metadata

func (b *RunBuilder) Metadata(key, value string)

func (*RunBuilder) Name

func (b *RunBuilder) Name(name string)

func (*RunBuilder) Step

func (b *RunBuilder) Step(id string, input any, opts StepOpts) (StepHandle, error)

Step appends an initial step to the run graph.

type RunHandle

// RunHandle is returned by Runtime.StartRun. It carries the run's workflow ID
// together with Package and Name strings.
//
// NOTE(review): Package is presumably the package name the run was started
// under and Name the run's name — confirm.
type RunHandle struct {
	ID      model.WorkflowID
	Package string
	Name    string
}

type RunOption

// RunOption is a functional option over the unexported runOptions type. Values
// are produced by WithRunID, WithRunMetadata, and WithRunName and passed as
// the variadic opts argument of Runtime.StartRun.
type RunOption func(*runOptions)

func WithRunID

func WithRunID(id string) RunOption

func WithRunMetadata

func WithRunMetadata(metadata map[string]string) RunOption

func WithRunName

func WithRunName(name string) RunOption

type Runtime

type Runtime struct {
	// contains filtered or unexported fields
}

Runtime is the embeddable workflow engine facade. It wraps the existing store, runner registry, and scheduler behind workflow-native concepts.

func NewRuntime

func NewRuntime(ctx context.Context, cfg Config) (*Runtime, error)

func (*Runtime) CancelRun

func (rt *Runtime) CancelRun(ctx context.Context, runID model.WorkflowID) error

CancelRun cancels pending, ready, and running steps for a run. The current SQLite implementation marks running steps canceled and removes leases; future phases should add cooperative executor cancellation for in-flight subprocesses.

func (*Runtime) Close

func (rt *Runtime) Close() error

func (*Runtime) Projection

func (rt *Runtime) Projection(ctx context.Context, name string) (Projection, error)

func (*Runtime) RegisterExecutor

func (rt *Runtime) RegisterExecutor(executor Executor) error

func (*Runtime) RegisterPackage

func (rt *Runtime) RegisterPackage(pkg *Package) error

func (*Runtime) Result

func (rt *Runtime) Result(ctx context.Context, runID model.WorkflowID, stepID model.OpID) (*model.OpResult, error)

func (*Runtime) RetryStep

func (rt *Runtime) RetryStep(ctx context.Context, runID model.WorkflowID, stepID model.OpID) error

RetryStep moves a failed step back to ready so workers can execute it again.

func (*Runtime) RunOnce

func (rt *Runtime) RunOnce(ctx context.Context) (*scheduler.CycleResult, error)

func (*Runtime) StartRun

func (rt *Runtime) StartRun(ctx context.Context, packageName string, input any, opts ...RunOption) (*RunHandle, error)

func (*Runtime) StartWorkers

func (rt *Runtime) StartWorkers(ctx context.Context, opts ...WorkerOption) error

StartWorkers runs scheduler cycles until ctx is canceled. It is intentionally context-driven so embedded applications can use their own lifecycle manager.

func (*Runtime) Workflow

func (rt *Runtime) Workflow(ctx context.Context, runID model.WorkflowID) (*model.WorkflowRun, error)

type SQLiteProjectionStore

type SQLiteProjectionStore struct {
	// contains filtered or unexported fields
}

SQLiteProjectionStore stores one SQLite projection database per projection name under a local directory.

func NewSQLiteProjectionStore

func NewSQLiteProjectionStore(root string) *SQLiteProjectionStore

func (*SQLiteProjectionStore) Close

func (s *SQLiteProjectionStore) Close() error

func (*SQLiteProjectionStore) Projection

func (s *SQLiteProjectionStore) Projection(ctx context.Context, name string) (Projection, error)

type StepContext

type StepContext struct {
	// contains filtered or unexported fields
}

StepContext is the public executor-facing view of a durable workflow step. It wraps runner.RunContext and accumulates the result data, records, artifacts, and dynamically emitted child steps that will be persisted by the existing scheduler/store completion path.

func (*StepContext) Artifact

func (s *StepContext) Artifact(name, contentType string, body []byte, opts ...ArtifactOption) (model.ArtifactID, error)

Artifact appends an artifact to the step result and returns its stable ID.

func (*StepContext) DependencyData

func (s *StepContext) DependencyData(opID model.OpID, out any) error

DependencyData decodes a dependency result's Data field into out.

func (*StepContext) DependencyResult

func (s *StepContext) DependencyResult(opID model.OpID) (*model.OpResult, error)

DependencyResult loads a completed dependency result by step/op ID.

func (*StepContext) Emit

func (s *StepContext) Emit(id string, input any, opts StepOpts) (model.OpID, error)

Emit appends a child step to this step's result. The child is persisted by the store when the current step completes successfully.

func (*StepContext) Input

func (s *StepContext) Input(out any) error

Input decodes this step's JSON input into out.

func (*StepContext) Lease

func (s *StepContext) Lease() model.Lease

Lease returns the current lease metadata.

func (*StepContext) Now

func (s *StepContext) Now() time.Time

Now returns the scheduler-provided execution timestamp.

func (*StepContext) Projection

func (s *StepContext) Projection(name string) (Projection, error)

Projection resolves a package/domain projection by name.

func (*StepContext) RawInput

func (s *StepContext) RawInput() json.RawMessage

RawInput returns a copy of the raw JSON input for advanced executors.

func (*StepContext) Record

func (s *StepContext) Record(collection, key string, data any) error

Record appends a projection-style record write to the step result.

func (*StepContext) Result

func (s *StepContext) Result(data any) error

Result stores structured step result data. It is serialized into the engine result row when the step completes.

func (*StepContext) Step

func (s *StepContext) Step() model.OpSpec

Step returns the current durable step/op spec.

func (*StepContext) StoreArtifact

func (s *StepContext) StoreArtifact(name, contentType string, body []byte, opts ...ArtifactOption) (ArtifactRef, error)

StoreArtifact stores bytes in the configured external ArtifactStore and adds a small reference artifact to the step result so existing result/artifact APIs can still point operators to the external object.

func (*StepContext) Workflow

func (s *StepContext) Workflow() model.WorkflowRun

Workflow returns the current durable workflow run.

type StepHandle

// StepHandle identifies a step by its op ID. RunBuilder.Step returns one, and
// Require accepts handles and returns []model.Dependency.
type StepHandle struct {
	ID model.OpID
}

type StepOpts

// StepOpts carries the optional settings for a step. It is accepted by both
// RunBuilder.Step (initial steps) and StepContext.Emit (child steps).
// DependsOn has type []model.Dependency, which is what Require returns.
//
// NOTE(review): ParentID is a pointer, so nil presumably means "no explicit
// parent" — confirm.
type StepOpts struct {
	Kind      string
	Queue     model.QueueKey
	DedupKey  string
	DependsOn []model.Dependency
	Retry     model.RetryPolicy
	Metadata  map[string]string
	Site      model.SiteName
	ParentID  *model.OpID
}

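StepOpts customizes a step added to a run: either an initial step created with RunBuilder.Step or a child step emitted dynamically with StepContext.Emit.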

type StoreConfig

// StoreConfig is the pluggable store factory held in Config.Store; SQLiteStore
// returns one.
//
// Open returns the store together with a func() error.
// OperatorService returns the operator mutation surface for the backend.
//
// NOTE(review): the func() error returned by Open is presumably the
// close/cleanup hook for the store — confirm.
type StoreConfig interface {
	Open(context.Context) (storecontract.Store, func() error, error)
	OperatorService() OperatorService
}

StoreConfig opens the durable runtime store used by Runtime.

func SQLiteStore

func SQLiteStore(path string) StoreConfig

SQLiteStore configures Runtime to use the existing SQLite engine store.

type TypedExecutor

// TypedExecutor is the typed counterpart of Executor: Kind has the same
// signature, while ExecuteTyped takes the input of type I as an extra
// argument alongside the StepContext.
type TypedExecutor[I any] interface {
	Kind() string
	ExecuteTyped(ctx context.Context, step *StepContext, input I) error
}

TypedExecutor executes a step after StepContext input has been decoded into I.

type TypedExecutorFunc

// TypedExecutorFunc pairs a kind name with a typed function. It has Kind,
// Execute, and ExecuteTyped methods.
//
// NOTE(review): Execute presumably decodes the step input into an I and then
// delegates to ExecuteTyped, which presumably calls Func — confirm.
type TypedExecutorFunc[I any] struct {
	KindName string
	Func     func(context.Context, *StepContext, I) error
}

TypedExecutorFunc adapts a typed function to an Executor.

func (TypedExecutorFunc[I]) Execute

func (e TypedExecutorFunc[I]) Execute(ctx context.Context, step *StepContext) error

func (TypedExecutorFunc[I]) ExecuteTyped

func (e TypedExecutorFunc[I]) ExecuteTyped(ctx context.Context, step *StepContext, input I) error

func (TypedExecutorFunc[I]) Kind

func (e TypedExecutorFunc[I]) Kind() string

type WorkerOption

// WorkerOption is a functional option over the unexported workerOptions type.
// Values are produced by WithWorkerMaxCycles and WithWorkerPollInterval and
// passed as the variadic opts argument of Runtime.StartWorkers.
type WorkerOption func(*workerOptions)

func WithWorkerMaxCycles

func WithWorkerMaxCycles(maxCycles int) WorkerOption

func WithWorkerPollInterval

func WithWorkerPollInterval(interval time.Duration) WorkerOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL