workflow

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 5, 2026 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
// Workflow and step states, expressed as untyped string constants.
// The StateStore comments use these same strings as examples
// ("COMPLETED", "FAILED", "RUNNING").
//
// NOTE(review): StateWaitingHuman presumably pairs with ActionRetry /
// ActionCancel and EscalationWaitHuman — confirm against the runner.
const (
	StatePending      = "PENDING"
	StateRunning      = "RUNNING"
	StateCompleted    = "COMPLETED"
	StateFailed       = "FAILED"
	StateWaitingHuman = "WAITING_HUMAN"
	StateRetrying     = "RETRYING"
	StateFallback     = "FALLBACK"
	StateCompensating = "COMPENSATING"
)

Workflow and Step statuses.

View Source
// Action types for human intervention, expressed as untyped string constants.
//
// NOTE(review): where these values are consumed is not visible here;
// presumably they resolve an execution in StateWaitingHuman — confirm.
const (
	ActionRetry  = "retry"
	ActionCancel = "cancel"
)

Action types for human intervention.

View Source
// Backoff strategy names, expressed as untyped string constants.
//
// NOTE(review): the delay calculation behind each strategy lives outside
// this view; presumably it governs the wait between step retries — confirm.
const (
	BackoffExponential = "exponential"
	BackoffConstant    = "constant"
)

Backoff strategies.

View Source
// Workflow event type names. Every value is prefixed with "workflow.";
// step-level events use the "workflow.step." prefix, while
// EventWaitingHuman ("workflow.waiting_human") does not.
//
// NOTE(review): presumably these are the eventType strings published on the
// mdk.EventBus — confirm against the runner implementation.
const (
	EventWorkflowStarted      = "workflow.started"
	EventWorkflowCompleted    = "workflow.completed"
	EventWorkflowFailed       = "workflow.failed"
	EventWorkflowCompensating = "workflow.compensating"
	EventStepStarted          = "workflow.step.started"
	EventStepCompleted        = "workflow.step.completed"
	EventStepFailed           = "workflow.step.failed"
	EventStepRetrying         = "workflow.step.retrying"
	EventStepFallback         = "workflow.step.fallback"
	EventWaitingHuman         = "workflow.waiting_human"
)

Workflow Event types.

View Source
// Step status constants re-exported from the mdk package, so callers can
// refer to them through this package instead of importing mdk directly.
// Each name is defined as the identically named mdk constant.
const (
	StepPending   = mdk.StepPending
	StepRunning   = mdk.StepRunning
	StepCompleted = mdk.StepCompleted
	StepFailed    = mdk.StepFailed
	StepRetrying  = mdk.StepRetrying
	StepSkipped   = mdk.StepSkipped
)
View Source
// Escalation strategy names. Only one strategy is declared in this group.
const (
	EscalationWaitHuman = "wait_human"
)

Escalation strategies.

Variables

This section is empty.

Functions

func AcquireLock

func AcquireLock(ctx context.Context, key string, ttl time.Duration, timeout time.Duration) (bool, error)

AcquireLock attempts to acquire a lock for the current workflow.

func Emit

func Emit(ctx context.Context, eventType string, payload any) error

Emit sends an event to the EventBus attached to the current workflow runner. It is idempotent: it tracks emitted events in the StateStore to prevent duplicates during resumption.

func GetEventBus

func GetEventBus(ctx context.Context) mdk.EventBus

GetEventBus retrieves the EventBus attached to the current workflow runner.

func GetWorkflowID

func GetWorkflowID(ctx context.Context) string

GetWorkflowID retrieves the current executing Workflow ID from the context.

func ReleaseLock

func ReleaseLock(ctx context.Context, key string) error

ReleaseLock releases a lock for the current workflow.

func WithRunner

func WithRunner(ctx context.Context, r *Runner, id string) context.Context

WithRunner injects the Runner and Workflow ID into the context.

Types

type InMemStore

type InMemStore struct {
	// contains filtered or unexported fields
}

InMemStore is an in-memory implementation of StateStore for local testing.

func NewInMemStore

func NewInMemStore() *InMemStore

NewInMemStore creates a new InMemStore.

func (*InMemStore) Close

func (s *InMemStore) Close() error

func (*InMemStore) GetInput

func (s *InMemStore) GetInput(ctx context.Context, execID string) ([]byte, error)

func (*InMemStore) GetState

func (s *InMemStore) GetState(ctx context.Context, execID string) (map[string]string, error)

func (*InMemStore) GetStepOutput

func (s *InMemStore) GetStepOutput(ctx context.Context, execID string, stepID string) ([]byte, error)

func (*InMemStore) InitializeExecution

func (s *InMemStore) InitializeExecution(ctx context.Context, execID string, input []byte) error

func (*InMemStore) IsEventEmitted

func (s *InMemStore) IsEventEmitted(ctx context.Context, execID string, eventType string) (bool, error)

func (*InMemStore) ListExecutions

func (s *InMemStore) ListExecutions(ctx context.Context, state string) ([]string, error)

func (*InMemStore) RecordEventEmitted

func (s *InMemStore) RecordEventEmitted(ctx context.Context, execID string, eventType string) error

func (*InMemStore) SaveInput

func (s *InMemStore) SaveInput(ctx context.Context, execID string, input []byte) error

func (*InMemStore) SaveState

func (s *InMemStore) SaveState(ctx context.Context, execID string, stepID string, state string) error

func (*InMemStore) SaveStepOutput

func (s *InMemStore) SaveStepOutput(ctx context.Context, execID string, stepID string, output []byte) error

func (*InMemStore) SetTTL

func (s *InMemStore) SetTTL(ctx context.Context, execID string, ttl time.Duration) error

type NATSStore

type NATSStore struct {
	// contains filtered or unexported fields
}

NATSStore implements StateStore using NATS JetStream KeyValue.

func NewNATSStore

func NewNATSStore(ctx context.Context, nc *nats.Conn, bucketName string) (*NATSStore, error)

NewNATSStore creates a new NATSStore.

func (*NATSStore) GetInput

func (s *NATSStore) GetInput(ctx context.Context, execID string) ([]byte, error)

func (*NATSStore) GetState

func (s *NATSStore) GetState(ctx context.Context, execID string) (map[string]string, error)

func (*NATSStore) GetStepOutput

func (s *NATSStore) GetStepOutput(ctx context.Context, execID string, stepID string) ([]byte, error)

func (*NATSStore) InitializeExecution

func (s *NATSStore) InitializeExecution(ctx context.Context, execID string, input []byte) error

func (*NATSStore) IsEventEmitted

func (s *NATSStore) IsEventEmitted(ctx context.Context, execID string, eventType string) (bool, error)

func (*NATSStore) ListExecutions

func (s *NATSStore) ListExecutions(ctx context.Context, state string) ([]string, error)

func (*NATSStore) RecordEventEmitted

func (s *NATSStore) RecordEventEmitted(ctx context.Context, execID string, eventType string) error

func (*NATSStore) SaveInput

func (s *NATSStore) SaveInput(ctx context.Context, execID string, input []byte) error

func (*NATSStore) SaveState

func (s *NATSStore) SaveState(ctx context.Context, execID string, stepID string, state string) error

func (*NATSStore) SaveStepOutput

func (s *NATSStore) SaveStepOutput(ctx context.Context, execID string, stepID string, output []byte) error

func (*NATSStore) SetTTL

func (s *NATSStore) SetTTL(ctx context.Context, execID string, ttl time.Duration) error

type Parser

// Parser parses and validates workflow definitions. It is an empty struct
// and therefore holds no state of its own.
type Parser struct{}

Parser parses and validates workflow definitions.

func NewParser

func NewParser() *Parser

NewParser creates a new Parser.

func (*Parser) Parse

func (p *Parser) Parse(data []byte) (*Workflow, error)

Parse parses YAML bytes into a Workflow and validates its DAG.

func (*Parser) Validate

func (p *Parser) Validate(wf *Workflow) error

Validate ensures the workflow is a valid DAG.

type RedisStore

type RedisStore struct {
	// contains filtered or unexported fields
}

RedisStore implements StateStore using Redis.

func NewRedisStore

func NewRedisStore(client *redis.Client) *RedisStore

NewRedisStore creates a new RedisStore.

func (*RedisStore) GetInput

func (s *RedisStore) GetInput(ctx context.Context, execID string) ([]byte, error)

func (*RedisStore) GetState

func (s *RedisStore) GetState(ctx context.Context, execID string) (map[string]string, error)

func (*RedisStore) GetStepOutput

func (s *RedisStore) GetStepOutput(ctx context.Context, execID string, stepID string) ([]byte, error)

func (*RedisStore) InitializeExecution

func (s *RedisStore) InitializeExecution(ctx context.Context, execID string, input []byte) error

func (*RedisStore) IsEventEmitted

func (s *RedisStore) IsEventEmitted(ctx context.Context, execID string, eventType string) (bool, error)

func (*RedisStore) ListExecutions

func (s *RedisStore) ListExecutions(ctx context.Context, state string) ([]string, error)

func (*RedisStore) RecordEventEmitted

func (s *RedisStore) RecordEventEmitted(ctx context.Context, execID string, eventType string) error

func (*RedisStore) SaveInput

func (s *RedisStore) SaveInput(ctx context.Context, execID string, input []byte) error

func (*RedisStore) SaveState

func (s *RedisStore) SaveState(ctx context.Context, execID string, stepID string, state string) error

func (*RedisStore) SaveStepOutput

func (s *RedisStore) SaveStepOutput(ctx context.Context, execID string, stepID string, output []byte) error

func (*RedisStore) SetTTL

func (s *RedisStore) SetTTL(ctx context.Context, execID string, ttl time.Duration) error

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry manages the registration of workflow definitions.

func NewRegistry

func NewRegistry() *Registry

NewRegistry creates a new Registry.

func (*Registry) Get

func (r *Registry) Get(name string) (*Workflow, error)

Get retrieves a workflow definition by name.

func (*Registry) List

func (r *Registry) List() []*Workflow

List returns all registered workflows.

func (*Registry) Register

func (r *Registry) Register(wf *Workflow) error

Register adds a workflow definition to the registry.

type Runner

type Runner struct {
	// contains filtered or unexported fields
}

Runner executes workflow instances.

func NewRunner

func NewRunner(bus mdk.EventBus, store StateStore, locker locking.Locker) *Runner

NewRunner creates a new Runner.

func (*Runner) Cancel

func (r *Runner) Cancel(ctx context.Context, runID string) error

Cancel cancels a running workflow.

func (*Runner) Execute

func (r *Runner) Execute(ctx context.Context, workflowID string, input map[string]any) (string, error)

Execute starts an asynchronous workflow run and returns the run ID.

func (*Runner) ExecuteSync

func (r *Runner) ExecuteSync(ctx context.Context, id string, workflowID string, input map[string]any) (map[string]any, error)

ExecuteSync runs a workflow synchronously and returns the output results map.

func (*Runner) ExecuteSyncWorkflow

func (r *Runner) ExecuteSyncWorkflow(ctx context.Context, id string, wf *Workflow, input any) (map[string]any, error)

ExecuteSyncWorkflow runs a workflow synchronously and returns the results map.

func (*Runner) GetStateStore

func (r *Runner) GetStateStore() StateStore

GetStateStore returns the StateStore attached to the runner.

func (*Runner) Register

func (r *Runner) Register(w mdk.Workflow) error

Register registers a workflow definition.

func (*Runner) RegisterHandler

func (r *Runner) RegisterHandler(name string, handler mdk.StepHandler) error

RegisterHandler registers a named step handler.

func (*Runner) ResumeExecution

func (r *Runner) ResumeExecution(ctx context.Context, id string, wf *Workflow) (map[string]any, error)

ResumeExecution resumes a previously crashed or interrupted workflow.

func (*Runner) SetRuntime

func (r *Runner) SetRuntime(rt mdk.Runtime)

SetRuntime sets the active mdk.Runtime for execution context.

func (*Runner) Status

func (r *Runner) Status(ctx context.Context, runID string) (mdk.StepStatus, error)

Status returns the current execution status of a run.

type StateStore

// StateStore defines the interface for checkpointing workflow states.
// All methods take a context and identify the workflow execution by execID,
// except ListExecutions, which filters executions by overall state.
// InMemStore, NATSStore and RedisStore are documented as implementations.
type StateStore interface {
	// SaveState records the state of a specific step in a workflow execution.
	SaveState(ctx context.Context, execID string, stepID string, state string) error

	// GetState retrieves the state of all steps for a workflow execution.
	// Returns a map of stepID -> state (e.g. "COMPLETED", "FAILED").
	GetState(ctx context.Context, execID string) (map[string]string, error)

	// InitializeExecution sets up the initial state and input for a workflow.
	InitializeExecution(ctx context.Context, execID string, input []byte) error

	// SaveInput stores the initial input payload of the workflow for resumption.
	SaveInput(ctx context.Context, execID string, input []byte) error

	// GetInput retrieves the initial input payload of the workflow.
	GetInput(ctx context.Context, execID string) ([]byte, error)

	// SetTTL sets an expiration on the workflow execution state to prevent
	// unbounded storage growth.
	SetTTL(ctx context.Context, execID string, ttl time.Duration) error

	// SaveStepOutput stores the successful result payload of a step for resumption.
	SaveStepOutput(ctx context.Context, execID string, stepID string, output []byte) error

	// GetStepOutput retrieves the successful result payload of a step.
	GetStepOutput(ctx context.Context, execID string, stepID string) ([]byte, error)

	// ListExecutions returns the IDs of executions whose overall workflow
	// state matches the given state (e.g. "RUNNING").
	ListExecutions(ctx context.Context, state string) ([]string, error)

	// RecordEventEmitted tracks that a specific domain event has been sent,
	// to prevent duplicates during resumption.
	RecordEventEmitted(ctx context.Context, execID string, eventType string) error

	// IsEventEmitted checks if a domain event has already been sent for this execution.
	IsEventEmitted(ctx context.Context, execID string, eventType string) (bool, error)
}

StateStore defines the interface for checkpointing workflow states.

func GetStateStore

func GetStateStore(ctx context.Context) StateStore

GetStateStore retrieves the StateStore attached to the current workflow runner.

type Step

// Step is an alias for mdk.Step.
type Step = mdk.Step

type StepContext

// StepContext is an alias for mdk.StepContext.
type StepContext = mdk.StepContext

type StepHandler

// StepHandler is an alias for mdk.StepHandler.
type StepHandler = mdk.StepHandler

type StepResult

// StepResult is an alias for mdk.StepResult.
type StepResult = mdk.StepResult

type StepStatus

// StepStatus is an alias for mdk.StepStatus.
type StepStatus = mdk.StepStatus

type Workflow

// Workflow is an alias for mdk.Workflow.
type Workflow = mdk.Workflow

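Workflow, together with Step, StepContext, StepHandler, StepResult, and StepStatus, re-exports the corresponding mdk workflow type as an alias.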

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL