Documentation
¶
Index ¶
- func Permanent(code string, err error) error
- func Require(handles ...StepHandle) []model.Dependency
- func Retryable(code string, err error) error
- func ToRunner(executor Executor) runner.Runner
- type ArtifactObject
- type ArtifactOption
- type ArtifactRef
- type ArtifactStore
- type Config
- type Entrypoint
- type EntrypointFunc
- type Error
- type Executor
- type ExecutorFunc
- type FileArtifactStore
- type OperatorService
- type Package
- type PackageBuilder
- type Projection
- type ProjectionStore
- type QueueConfig
- type Registry
- type RunBuilder
- type RunHandle
- type RunOption
- type Runtime
- func (rt *Runtime) CancelRun(ctx context.Context, runID model.WorkflowID) error
- func (rt *Runtime) Close() error
- func (rt *Runtime) Projection(ctx context.Context, name string) (Projection, error)
- func (rt *Runtime) RegisterExecutor(executor Executor) error
- func (rt *Runtime) RegisterPackage(pkg *Package) error
- func (rt *Runtime) Result(ctx context.Context, runID model.WorkflowID, stepID model.OpID) (*model.OpResult, error)
- func (rt *Runtime) RetryStep(ctx context.Context, runID model.WorkflowID, stepID model.OpID) error
- func (rt *Runtime) RunOnce(ctx context.Context) (*scheduler.CycleResult, error)
- func (rt *Runtime) StartRun(ctx context.Context, packageName string, input any, opts ...RunOption) (*RunHandle, error)
- func (rt *Runtime) StartWorkers(ctx context.Context, opts ...WorkerOption) error
- func (rt *Runtime) Workflow(ctx context.Context, runID model.WorkflowID) (*model.WorkflowRun, error)
- type SQLiteProjectionStore
- type StepContext
- func (s *StepContext) Artifact(name, contentType string, body []byte, opts ...ArtifactOption) (model.ArtifactID, error)
- func (s *StepContext) DependencyData(opID model.OpID, out any) error
- func (s *StepContext) DependencyResult(opID model.OpID) (*model.OpResult, error)
- func (s *StepContext) Emit(id string, input any, opts StepOpts) (model.OpID, error)
- func (s *StepContext) Input(out any) error
- func (s *StepContext) Lease() model.Lease
- func (s *StepContext) Now() time.Time
- func (s *StepContext) Projection(name string) (Projection, error)
- func (s *StepContext) RawInput() json.RawMessage
- func (s *StepContext) Record(collection, key string, data any) error
- func (s *StepContext) Result(data any) error
- func (s *StepContext) Step() model.OpSpec
- func (s *StepContext) StoreArtifact(name, contentType string, body []byte, opts ...ArtifactOption) (ArtifactRef, error)
- func (s *StepContext) Workflow() model.WorkflowRun
- type StepHandle
- type StepOpts
- type StoreConfig
- type TypedExecutor
- type TypedExecutorFunc
- type WorkerOption
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Require ¶
func Require(handles ...StepHandle) []model.Dependency
Types ¶
type ArtifactObject ¶
type ArtifactObject struct {
ID string
Name string
Kind string
ContentType string
Metadata map[string]string
Body []byte
}
ArtifactObject is a blob that should be stored outside the engine result row.
type ArtifactOption ¶
type ArtifactOption func(*model.ArtifactWrite)
ArtifactOption customizes an artifact emitted by a step.
func ArtifactID ¶
func ArtifactID(id string) ArtifactOption
func ArtifactKind ¶
func ArtifactKind(kind string) ArtifactOption
func ArtifactMetadata ¶
func ArtifactMetadata(metadata map[string]string) ArtifactOption
type ArtifactRef ¶
type ArtifactRef struct {
ID string `json:"id"`
URI string `json:"uri"`
Name string `json:"name"`
Kind string `json:"kind"`
ContentType string `json:"contentType"`
Metadata map[string]string `json:"metadata,omitempty"`
Size int `json:"size"`
}
ArtifactRef points to an artifact stored by an ArtifactStore.
type ArtifactStore ¶
type ArtifactStore interface {
Put(ctx context.Context, artifact ArtifactObject) (ArtifactRef, error)
Open(ctx context.Context, id string) (io.ReadCloser, ArtifactRef, error)
}
ArtifactStore stores large artifact bytes outside the engine DB.
type Config ¶
type Config struct {
Store StoreConfig
ArtifactStore ArtifactStore
ProjectionStore ProjectionStore
WorkerID string
MaxWorkers int
PollInterval time.Duration
LeaseDuration time.Duration
Queues map[model.QueueKey]QueueConfig
}
Config configures an embeddable workflow Runtime.
type Entrypoint ¶
type Entrypoint interface {
Start(ctx context.Context, run *RunBuilder, rawInput json.RawMessage) error
}
Entrypoint creates the initial durable step graph for a run.
type EntrypointFunc ¶
type EntrypointFunc[I any] func(context.Context, *RunBuilder, I) error
EntrypointFunc adapts a typed function to an Entrypoint.
func (EntrypointFunc[I]) Start ¶
func (f EntrypointFunc[I]) Start(ctx context.Context, run *RunBuilder, rawInput json.RawMessage) error
type Error ¶
Error marks a workflow step failure with stable operator-facing metadata. It implements the scheduler's OpError carrier interface, so returning it from an Executor causes the existing scheduler failure path to persist the embedded model.OpError.
type Executor ¶
type Executor interface {
Kind() string
Execute(ctx context.Context, step *StepContext) error
}
Executor executes one durable workflow step kind. It is the workflow-native facade over the lower-level engine/runner.Runner interface.
func NewExecutor ¶
NewExecutor creates an untyped executor from a function.
func NewTypedExecutor ¶
NewTypedExecutor creates an executor that decodes step input into I before invoking fn.
type ExecutorFunc ¶
type ExecutorFunc struct {
KindName string
Func func(context.Context, *StepContext) error
}
ExecutorFunc adapts a function to an Executor.
func (ExecutorFunc) Execute ¶
func (e ExecutorFunc) Execute(ctx context.Context, step *StepContext) error
func (ExecutorFunc) Kind ¶
func (e ExecutorFunc) Kind() string
type FileArtifactStore ¶
type FileArtifactStore struct {
// contains filtered or unexported fields
}
FileArtifactStore stores artifacts under a local filesystem root. It is the first external artifact backend for embedded/local workflow runtimes.
func NewFileArtifactStore ¶
func NewFileArtifactStore(root string) *FileArtifactStore
func (*FileArtifactStore) Open ¶
func (s *FileArtifactStore) Open(ctx context.Context, id string) (io.ReadCloser, ArtifactRef, error)
func (*FileArtifactStore) Put ¶
func (s *FileArtifactStore) Put(ctx context.Context, artifact ArtifactObject) (ArtifactRef, error)
type OperatorService ¶
type OperatorService interface {
RetryOp(ctx context.Context, workflowID model.WorkflowID, opID model.OpID) error
CancelWorkflow(ctx context.Context, workflowID model.WorkflowID) error
}
OperatorService is the minimal mutation surface needed by embedded operator controls. SQLiteStore provides this through the existing engineview service; future backends can provide their own implementation without changing the public Runtime methods.
type Package ¶
type Package struct {
Name string
DisplayName string
Entrypoint Entrypoint
}
Package describes a workflow package/domain that can start durable runs.
type PackageBuilder ¶
type PackageBuilder struct {
// contains filtered or unexported fields
}
func NewPackage ¶
func NewPackage(name string) *PackageBuilder
func (*PackageBuilder) Build ¶
func (b *PackageBuilder) Build() *Package
func (*PackageBuilder) DisplayName ¶
func (b *PackageBuilder) DisplayName(name string) *PackageBuilder
func (*PackageBuilder) Entrypoint ¶
func (b *PackageBuilder) Entrypoint(entrypoint Entrypoint) *PackageBuilder
type Projection ¶
type Projection interface {
Exec(ctx context.Context, query string, args ...any) (int64, error)
Query(ctx context.Context, query string, args ...any) ([]map[string]any, error)
}
Projection is a small database-like interface exposed to executors.
type ProjectionStore ¶
type ProjectionStore interface {
Projection(ctx context.Context, name string) (Projection, error)
}
ProjectionStore resolves domain/package projection databases. Projections are query-oriented read models owned by workflow packages, separate from engine scheduling state.
type QueueConfig ¶
type QueueConfig struct {
MaxWorkers int
RateLimit *model.RateLimitPolicy
}
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry is a small workflow-native executor registry that can produce the existing runner registry used by the scheduler.
func NewRegistry ¶
func NewRegistry() *Registry
func (*Registry) RunnerRegistry ¶
type RunBuilder ¶
type RunBuilder struct {
// contains filtered or unexported fields
}
RunBuilder constructs the initial step graph for a workflow run.
func (*RunBuilder) Metadata ¶
func (b *RunBuilder) Metadata(key, value string)
func (*RunBuilder) Name ¶
func (b *RunBuilder) Name(name string)
func (*RunBuilder) Step ¶
func (b *RunBuilder) Step(id string, input any, opts StepOpts) (StepHandle, error)
Step appends an initial step to the run graph.
type Runtime ¶
type Runtime struct {
// contains filtered or unexported fields
}
Runtime is the embeddable workflow engine facade. It wraps the existing store, runner registry, and scheduler behind workflow-native concepts.
func (*Runtime) CancelRun ¶
CancelRun cancels pending, ready, and running steps for a run. The current SQLite implementation marks running steps canceled and removes leases; future phases should add cooperative executor cancellation for in-flight subprocesses.
func (*Runtime) Projection ¶
func (*Runtime) RegisterExecutor ¶
func (*Runtime) RegisterPackage ¶
func (*Runtime) RetryStep ¶
RetryStep moves a failed step back to ready so workers can execute it again.
func (*Runtime) StartWorkers ¶
func (rt *Runtime) StartWorkers(ctx context.Context, opts ...WorkerOption) error
StartWorkers runs scheduler cycles until ctx is canceled. It is intentionally context-driven so embedded applications can use their own lifecycle manager.
func (*Runtime) Workflow ¶
func (rt *Runtime) Workflow(ctx context.Context, runID model.WorkflowID) (*model.WorkflowRun, error)
type SQLiteProjectionStore ¶
type SQLiteProjectionStore struct {
// contains filtered or unexported fields
}
SQLiteProjectionStore stores one SQLite projection database per projection name under a local directory.
func NewSQLiteProjectionStore ¶
func NewSQLiteProjectionStore(root string) *SQLiteProjectionStore
func (*SQLiteProjectionStore) Close ¶
func (s *SQLiteProjectionStore) Close() error
func (*SQLiteProjectionStore) Projection ¶
func (s *SQLiteProjectionStore) Projection(ctx context.Context, name string) (Projection, error)
type StepContext ¶
type StepContext struct {
// contains filtered or unexported fields
}
StepContext is the public executor-facing view of a durable workflow step. It wraps runner.RunContext and accumulates the result data, records, artifacts, and dynamically emitted child steps that will be persisted by the existing scheduler/store completion path.
func (*StepContext) Artifact ¶
func (s *StepContext) Artifact(name, contentType string, body []byte, opts ...ArtifactOption) (model.ArtifactID, error)
Artifact appends an artifact to the step result and returns its stable ID.
func (*StepContext) DependencyData ¶
func (s *StepContext) DependencyData(opID model.OpID, out any) error
DependencyData decodes a dependency result's Data field into out.
func (*StepContext) DependencyResult ¶
DependencyResult loads a completed dependency result by step/op ID.
func (*StepContext) Emit ¶
Emit appends a child step to this step's result. The child is persisted by the store when the current step completes successfully.
func (*StepContext) Input ¶
func (s *StepContext) Input(out any) error
Input decodes this step's JSON input into out.
func (*StepContext) Lease ¶
func (s *StepContext) Lease() model.Lease
Lease returns the current lease metadata.
func (*StepContext) Now ¶
func (s *StepContext) Now() time.Time
Now returns the scheduler-provided execution timestamp.
func (*StepContext) Projection ¶
func (s *StepContext) Projection(name string) (Projection, error)
Projection resolves a package/domain projection by name.
func (*StepContext) RawInput ¶
func (s *StepContext) RawInput() json.RawMessage
RawInput returns a copy of the raw JSON input for advanced executors.
func (*StepContext) Record ¶
func (s *StepContext) Record(collection, key string, data any) error
Record appends a projection-style record write to the step result.
func (*StepContext) Result ¶
func (s *StepContext) Result(data any) error
Result stores structured step result data. It is serialized into the engine result row when the step completes.
func (*StepContext) Step ¶
func (s *StepContext) Step() model.OpSpec
Step returns the current durable step/op spec.
func (*StepContext) StoreArtifact ¶
func (s *StepContext) StoreArtifact(name, contentType string, body []byte, opts ...ArtifactOption) (ArtifactRef, error)
StoreArtifact stores bytes in the configured external ArtifactStore and adds a small reference artifact to the step result so existing result/artifact APIs can still point operators to the external object.
func (*StepContext) Workflow ¶
func (s *StepContext) Workflow() model.WorkflowRun
Workflow returns the current durable workflow run.
type StepHandle ¶
type StepOpts ¶
type StepOpts struct {
Kind string
Queue model.QueueKey
DedupKey string
DependsOn []model.Dependency
Retry model.RetryPolicy
Metadata map[string]string
Site model.SiteName
ParentID *model.OpID
}
StepOpts customizes a dynamically emitted child step.
type StoreConfig ¶
type StoreConfig interface {
Open(context.Context) (storecontract.Store, func() error, error)
OperatorService() OperatorService
}
StoreConfig opens the durable runtime store used by Runtime.
func SQLiteStore ¶
func SQLiteStore(path string) StoreConfig
SQLiteStore configures Runtime to use the existing SQLite engine store.
type TypedExecutor ¶
type TypedExecutor[I any] interface { Kind() string ExecuteTyped(ctx context.Context, step *StepContext, input I) error }
TypedExecutor executes a step after StepContext input has been decoded into I.
type TypedExecutorFunc ¶
type TypedExecutorFunc[I any] struct { KindName string Func func(context.Context, *StepContext, I) error }
TypedExecutorFunc adapts a typed function to an Executor.
func (TypedExecutorFunc[I]) Execute ¶
func (e TypedExecutorFunc[I]) Execute(ctx context.Context, step *StepContext) error
func (TypedExecutorFunc[I]) ExecuteTyped ¶
func (e TypedExecutorFunc[I]) ExecuteTyped(ctx context.Context, step *StepContext, input I) error
func (TypedExecutorFunc[I]) Kind ¶
func (e TypedExecutorFunc[I]) Kind() string
type WorkerOption ¶
type WorkerOption func(*workerOptions)
func WithWorkerMaxCycles ¶
func WithWorkerMaxCycles(maxCycles int) WorkerOption
func WithWorkerPollInterval ¶
func WithWorkerPollInterval(interval time.Duration) WorkerOption