workflow

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 7, 2026 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const (
	StatePending      = "PENDING"
	StateRunning      = "RUNNING"
	StateCompleted    = "COMPLETED"
	StateFailed       = "FAILED"
	StateWaitingHuman = "WAITING_HUMAN"
	StateRetrying     = "RETRYING"
	StateFallback     = "FALLBACK"
	StateCompensating = "COMPENSATING"
)

Workflow and Step statuses.

View Source
const (
	ActionRetry  = "retry"
	ActionCancel = "cancel"
)

Action types for human intervention.

View Source
const (
	BackoffExponential = "exponential"
	BackoffConstant    = "constant"
)

Backoff strategies.

View Source
const (
	EventWorkflowStarted      = "workflow.started"
	EventWorkflowCompleted    = "workflow.completed"
	EventWorkflowFailed       = "workflow.failed"
	EventWorkflowCompensating = "workflow.compensating"
	EventStepStarted          = "workflow.step.started"
	EventStepCompleted        = "workflow.step.completed"
	EventStepFailed           = "workflow.step.failed"
	EventStepRetrying         = "workflow.step.retrying"
	EventStepFallback         = "workflow.step.fallback"
	EventWaitingHuman         = "workflow.waiting_human"
)

Workflow Event types.

View Source
const (
	StepPending   = mdk.StepPending
	StepRunning   = mdk.StepRunning
	StepCompleted = mdk.StepCompleted
	StepFailed    = mdk.StepFailed
	StepRetrying  = mdk.StepRetrying
	StepSkipped   = mdk.StepSkipped
)
View Source
const (
	EscalationWaitHuman = "wait_human"
)

Escalation strategies.

Variables

This section is empty.

Functions

func AcquireLock

func AcquireLock(ctx context.Context, key string, ttl time.Duration, timeout time.Duration) (bool, error)

AcquireLock attempts to acquire a lock for the current workflow.

func Emit

func Emit(ctx context.Context, eventType string, payload any) error

Emit sends an event to the EventBus attached to the current workflow runner. It is idempotent: it tracks emitted events in the StateStore to prevent duplicates during resumption.

func GetEventBus

func GetEventBus(ctx context.Context) mdk.EventBus

GetEventBus retrieves the EventBus attached to the current workflow runner.

func GetWorkflowID

func GetWorkflowID(ctx context.Context) string

GetWorkflowID retrieves the current executing Workflow ID from the context.

func RegisterStore added in v0.2.0

func RegisterStore(name string, provider StoreProvider)

func ReleaseLock

func ReleaseLock(ctx context.Context, key string) error

ReleaseLock releases a lock for the current workflow.

func WithRunner

func WithRunner(ctx context.Context, r *Runner, id string) context.Context

WithRunner injects the Runner and Workflow ID into the context.

Types

type InMemStore

type InMemStore struct {
	// contains filtered or unexported fields
}

InMemStore is an in-memory implementation of StateStore for local testing.

func NewInMemStore

func NewInMemStore() *InMemStore

NewInMemStore creates a new InMemStore.

func (*InMemStore) Close

func (s *InMemStore) Close() error

func (*InMemStore) GetInput

func (s *InMemStore) GetInput(ctx context.Context, execID string) ([]byte, error)

func (*InMemStore) GetState

func (s *InMemStore) GetState(ctx context.Context, execID string) (map[string]string, error)

func (*InMemStore) GetStepOutput

func (s *InMemStore) GetStepOutput(ctx context.Context, execID string, stepID string) ([]byte, error)

func (*InMemStore) InitializeExecution

func (s *InMemStore) InitializeExecution(ctx context.Context, execID string, input []byte) error

func (*InMemStore) IsEventEmitted

func (s *InMemStore) IsEventEmitted(ctx context.Context, execID string, eventType string) (bool, error)

func (*InMemStore) ListExecutions

func (s *InMemStore) ListExecutions(ctx context.Context, state string) ([]string, error)

func (*InMemStore) RecordEventEmitted

func (s *InMemStore) RecordEventEmitted(ctx context.Context, execID string, eventType string) error

func (*InMemStore) SaveInput

func (s *InMemStore) SaveInput(ctx context.Context, execID string, input []byte) error

func (*InMemStore) SaveState

func (s *InMemStore) SaveState(ctx context.Context, execID string, stepID string, state string) error

func (*InMemStore) SaveStepOutput

func (s *InMemStore) SaveStepOutput(ctx context.Context, execID string, stepID string, output []byte) error

func (*InMemStore) SetTTL

func (s *InMemStore) SetTTL(ctx context.Context, execID string, ttl time.Duration) error

type Parser

type Parser struct{}

Parser parses and validates workflow definitions.

func NewParser

func NewParser() *Parser

NewParser creates a new Parser.

func (*Parser) Parse

func (p *Parser) Parse(data []byte) (*Workflow, error)

Parse parses YAML bytes into a Workflow and validates its DAG.

func (*Parser) Validate

func (p *Parser) Validate(wf *Workflow) error

Validate ensures the workflow is a valid DAG.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry manages the registration of workflow definitions.

func NewRegistry

func NewRegistry() *Registry

NewRegistry creates a new Registry.

func (*Registry) Get

func (r *Registry) Get(name string) (*Workflow, error)

Get retrieves a workflow definition by name.

func (*Registry) List

func (r *Registry) List() []*Workflow

List returns all registered workflows.

func (*Registry) Register

func (r *Registry) Register(wf *Workflow) error

Register adds a workflow definition to the registry.

type Runner

type Runner struct {
	// contains filtered or unexported fields
}

Runner executes workflow instances.

func NewRunner

func NewRunner(bus mdk.EventBus, store StateStore, locker locking.Locker) *Runner

NewRunner creates a new Runner.

func (*Runner) Cancel

func (r *Runner) Cancel(ctx context.Context, runID string) error

Cancel cancels a running workflow.

func (*Runner) Execute

func (r *Runner) Execute(ctx context.Context, workflowID string, input map[string]any) (string, error)

Execute starts an asynchronous workflow run and returns the run ID.

func (*Runner) ExecuteSync

func (r *Runner) ExecuteSync(ctx context.Context, id string, workflowID string, input map[string]any) (map[string]any, error)

ExecuteSync runs a workflow synchronously and returns the output results map.

func (*Runner) ExecuteSyncWorkflow

func (r *Runner) ExecuteSyncWorkflow(ctx context.Context, id string, wf *Workflow, input any) (map[string]any, error)

ExecuteSyncWorkflow runs a workflow synchronously and returns the results map.

func (*Runner) GetStateStore

func (r *Runner) GetStateStore() StateStore

GetStateStore returns the StateStore attached to the runner.

func (*Runner) Register

func (r *Runner) Register(w mdk.Workflow) error

Register registers a workflow definition.

func (*Runner) RegisterHandler

func (r *Runner) RegisterHandler(name string, handler mdk.StepHandler) error

RegisterHandler registers a named step handler.

func (*Runner) ResumeExecution

func (r *Runner) ResumeExecution(ctx context.Context, id string, wf *Workflow) (map[string]any, error)

ResumeExecution resumes a previously crashed or interrupted workflow.

func (*Runner) SetRuntime

func (r *Runner) SetRuntime(rt mdk.Runtime)

SetRuntime sets the active mdk.Runtime for execution context.

func (*Runner) Status

func (r *Runner) Status(ctx context.Context, runID string) (mdk.StepStatus, error)

Status returns the current execution status of a run.

type StateStore

type StateStore = mdk.StateStore

StateStore defines the interface for checkpointing workflow states.

func GetStateStore

func GetStateStore(ctx context.Context) StateStore

GetStateStore retrieves the StateStore attached to the current workflow runner.

type Step

type Step = mdk.Step

type StepContext

type StepContext = mdk.StepContext

type StepHandler

type StepHandler = mdk.StepHandler

type StepResult

type StepResult = mdk.StepResult

type StepStatus

type StepStatus = mdk.StepStatus

type StoreProvider added in v0.2.0

type StoreProvider = mdk.StoreProvider

func GetStore added in v0.2.0

func GetStore(name string) (StoreProvider, bool)

type Workflow

type Workflow = mdk.Workflow

Re-export/alias mdk workflow types

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL