workflow

package
v0.0.6 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 21, 2024 | License: MIT | Imports: 15 | Imported by: 0

Documentation

Index

Constants

View Source
// WorkflowExecutionContextKey is the untyped string constant
// "workflowExecutionContext".
//
// NOTE(review): presumably the key under which InjectWorkflowExecutionContext
// stores, and MustExtractWorkflowExecutionContext looks up, the
// *WorkflowExecutionContext in a context.Context — confirm against the
// implementations. If it is passed directly to context.WithValue, a plain
// string key can collide with keys from other packages.
const WorkflowExecutionContextKey = "workflowExecutionContext"

Variables

View Source
// ErrControlledPanic is a sentinel error with the message "controlled panic".
// Compare against it with errors.Is.
//
// NOTE(review): where and why it is raised is not visible in this listing.
var ErrControlledPanic = errors.New("controlled panic")
View Source
// ErrNonDeterministicError is a sentinel error with the message
// "non-deterministic error". Compare against it with errors.Is.
//
// NOTE(review): presumably reported when workflow code diverges from recorded
// history during replay — confirm against the runtime implementation.
var ErrNonDeterministicError = errors.New("non-deterministic error")

Functions

func InjectWorkflowExecutionContext

func InjectWorkflowExecutionContext(ctx context.Context, workflowExecutionContext *WorkflowExecutionContext) context.Context

Types

type ActivityPromise

// ActivityPromise bundles a workflow runtime, an activity value, and a promise
// that yields a byte slice.
type ActivityPromise struct {
	// WorkflowRuntime is the runtime this promise is associated with.
	WorkflowRuntime *WorkflowRuntime
	// Activity is the activity value; its concrete type is not constrained
	// here (any).
	Activity        any
	// Promise yields raw bytes.
	// NOTE(review): presumably the encoded activity result — TODO confirm.
	Promise         *promise.Promise[[]byte]
}

func NewActivityPromise

func NewActivityPromise(runtime *WorkflowRuntime, activity any) *ActivityPromise

func (*ActivityPromise) Await

func (a *ActivityPromise) Await() (any, error)

type TimerPromise

// TimerPromise bundles a workflow runtime with a promise that carries no
// payload (struct{}).
type TimerPromise struct {
	// WorkflowRuntime is the runtime this promise is associated with.
	WorkflowRuntime *WorkflowRuntime
	// Promise carries an empty struct value.
	// NOTE(review): presumably resolved when the timer fires — TODO confirm.
	Promise         *promise.Promise[struct{}]
}

func NewTimerPromise

func NewTimerPromise(runtime *WorkflowRuntime) *TimerPromise

func (*TimerPromise) Await

func (a *TimerPromise) Await() error

type WorkflowExecutionContext

// WorkflowExecutionContext holds a workflow runtime together with two
// string-keyed maps: arbitrary values and lists of byte-slice callbacks.
type WorkflowExecutionContext struct {
	// WorkflowRuntime is the runtime this context belongs to.
	WorkflowRuntime *WorkflowRuntime
	// UserDefinedVars maps string keys to arbitrary values.
	UserDefinedVars map[string]any
	// EventCallbacks maps a string key to a list of callbacks, each taking a
	// byte slice.
	// NOTE(review): presumably keyed by event name, with the callback argument
	// being the event payload — TODO confirm.
	EventCallbacks  map[string][]func([]byte)
}

func MustExtractWorkflowExecutionContext

func MustExtractWorkflowExecutionContext(ctx context.Context) *WorkflowExecutionContext

func NewWorkflowExecutionContext

func NewWorkflowExecutionContext(runtime *WorkflowRuntime) *WorkflowExecutionContext

type WorkflowRuntime

// WorkflowRuntime holds the state used while processing one workflow task:
// the dependencies supplied at construction, the task itself, mutable runtime
// state, start information, and the final completion event.
type WorkflowRuntime struct {
	// Init: dependencies (NewWorkflowRuntime takes parameters of these types).
	WorkflowRegistry *registry.WorkflowRegistry
	DataConverter    dataconverter.DataConverter
	// Workflow task being processed.
	Task *task.WorkflowTask
	// Runtime state.
	// NOTE(review): the int64 map keys below are presumably sequence numbers
	// (compare SequenceNo), and HistoryIndex presumably indexes the task's
	// history — TODO confirm against the implementation.
	HistoryIndex            int
	IsReplaying             bool
	SequenceNo              int64
	CurrentTimestamp        int64
	ActivityScheduledEvents map[int64]*history.ActivityScheduled
	ActivityPromises        map[int64]*ActivityPromise
	TimerCreatedEvents      map[int64]*history.TimerCreated
	TimerPromises           map[int64]*TimerPromise
	// Start: information about the workflow execution's start.
	WorkflowExecutionStartedTimestamp int64
	WorkflowExecutionStartedEvent     *history.WorkflowExecutionStarted
	WorkflowExecutionContext          *WorkflowExecutionContext
	Version                           string
	// Final event: it can only be the execution-completed event, and it is
	// emitted internally.
	WorkflowExecutionCompleted *history.WorkflowExecutionCompleted
}

func NewWorkflowRuntime

func NewWorkflowRuntime(
	workflowRegistry *registry.WorkflowRegistry,
	dataConverter dataconverter.DataConverter,
	task *task.WorkflowTask,
) *WorkflowRuntime

func (*WorkflowRuntime) CreateTimer

func (w *WorkflowRuntime) CreateTimer(fireAt int64) *TimerPromise

func (*WorkflowRuntime) GetWorkflowTaskResult

func (w *WorkflowRuntime) GetWorkflowTaskResult() *task.WorkflowTaskResult

func (*WorkflowRuntime) RunSimulation

func (w *WorkflowRuntime) RunSimulation() (err error)

func (*WorkflowRuntime) ScheduleNewActivity

func (w *WorkflowRuntime) ScheduleNewActivity(activity any, input any) *ActivityPromise

func (*WorkflowRuntime) Step

func (w *WorkflowRuntime) Step() (bool, error)

type WorkflowTaskExecutor

// WorkflowTaskExecutor processes a single workflow task.
type WorkflowTaskExecutor interface {
	// Execute takes a context and a workflow task and returns the task's
	// result, or an error.
	Execute(ctx context.Context, task *task.WorkflowTask) (*task.WorkflowTaskResult, error)
}

func NewWorkflowTaskExecutor

func NewWorkflowTaskExecutor(
	workflowRegistry *registry.WorkflowRegistry,
	dataConverter dataconverter.DataConverter,
	logger *zap.Logger,
) WorkflowTaskExecutor

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL