Documentation
¶
Index ¶
- Constants
- Variables
- func InjectWorkflowExecutionContext(ctx context.Context, workflowExecutionContext *WorkflowExecutionContext) context.Context
- func NewWorkflowTaskProcessor(be backend.Backend, executor WorkflowTaskExecutor, logger *zap.Logger) worker.TaskProcessor[*task.WorkflowTask, *task.WorkflowTaskResult]
- type ActivityPromise
- type TimerPromise
- type WorkflowExecutionContext
- type WorkflowRuntime
- func (w *WorkflowRuntime) CreateTimer(fireAt int64) *TimerPromise
- func (w *WorkflowRuntime) GetWorkflowTaskResult() *task.WorkflowTaskResult
- func (w *WorkflowRuntime) RunSimulation() (err error)
- func (w *WorkflowRuntime) ScheduleNewActivity(activity any, input any) *ActivityPromise
- func (w *WorkflowRuntime) Step() (bool, error)
- type WorkflowTaskExecutor
Constants ¶
View Source
const WorkflowExecutionContextKey = "workflowExecutionContext"
Variables ¶
View Source
var ErrControlledPanic = errors.New("controlled panic")
View Source
var ErrNonDeterministicError = errors.New("non-deterministic error")
Functions ¶
func InjectWorkflowExecutionContext ¶
func InjectWorkflowExecutionContext(ctx context.Context, workflowExecutionContext *WorkflowExecutionContext) context.Context
func NewWorkflowTaskProcessor ¶
func NewWorkflowTaskProcessor( be backend.Backend, executor WorkflowTaskExecutor, logger *zap.Logger, ) worker.TaskProcessor[*task.WorkflowTask, *task.WorkflowTaskResult]
Types ¶
type ActivityPromise ¶
type ActivityPromise struct { WorkflowRuntime *WorkflowRuntime Activity any Promise *promise.Promise[[]byte] }
func NewActivityPromise ¶
func NewActivityPromise(runtime *WorkflowRuntime, activity any) *ActivityPromise
func (*ActivityPromise) Await ¶
func (a *ActivityPromise) Await() (any, error)
type TimerPromise ¶
type TimerPromise struct { WorkflowRuntime *WorkflowRuntime Promise *promise.Promise[struct{}] }
func NewTimerPromise ¶
func NewTimerPromise(runtime *WorkflowRuntime) *TimerPromise
func (*TimerPromise) Await ¶
func (a *TimerPromise) Await() error
type WorkflowExecutionContext ¶
type WorkflowExecutionContext struct { WorkflowRuntime *WorkflowRuntime UserDefinedVars map[string]any EventCallbacks map[string][]func([]byte) }
func MustExtractWorkflowExecutionContext ¶
func MustExtractWorkflowExecutionContext(ctx context.Context) *WorkflowExecutionContext
func NewWorkflowExecutionContext ¶
func NewWorkflowExecutionContext(runtime *WorkflowRuntime) *WorkflowExecutionContext
type WorkflowRuntime ¶
type WorkflowRuntime struct { // init WorkflowRegistry *registry.WorkflowRegistry DataConverter dataconverter.DataConverter // workflow task Task *task.WorkflowTask // runtime state HistoryIndex int IsReplaying bool SequenceNo int64 CurrentTimestamp int64 ActivityScheduledEvents map[int64]*history.ActivityScheduled ActivityPromises map[int64]*ActivityPromise TimerCreatedEvents map[int64]*history.TimerCreated TimerPromises map[int64]*TimerPromise // start WorkflowExecutionStartedTimestamp int64 WorkflowExecutionStartedEvent *history.WorkflowExecutionStarted WorkflowExecutionContext *WorkflowExecutionContext Version string // final event, can only be execution completed event, and it's internally emit WorkflowExecutionCompleted *history.WorkflowExecutionCompleted }
func NewWorkflowRuntime ¶
func NewWorkflowRuntime( workflowRegistry *registry.WorkflowRegistry, dataConverter dataconverter.DataConverter, task *task.WorkflowTask, ) *WorkflowRuntime
func (*WorkflowRuntime) CreateTimer ¶
func (w *WorkflowRuntime) CreateTimer(fireAt int64) *TimerPromise
func (*WorkflowRuntime) GetWorkflowTaskResult ¶
func (w *WorkflowRuntime) GetWorkflowTaskResult() *task.WorkflowTaskResult
func (*WorkflowRuntime) RunSimulation ¶
func (w *WorkflowRuntime) RunSimulation() (err error)
func (*WorkflowRuntime) ScheduleNewActivity ¶
func (w *WorkflowRuntime) ScheduleNewActivity(activity any, input any) *ActivityPromise
func (*WorkflowRuntime) Step ¶
func (w *WorkflowRuntime) Step() (bool, error)
type WorkflowTaskExecutor ¶
type WorkflowTaskExecutor interface {
Execute(ctx context.Context, task *task.WorkflowTask) (*task.WorkflowTaskResult, error)
}
func NewWorkflowTaskExecutor ¶
func NewWorkflowTaskExecutor( workflowRegistry *registry.WorkflowRegistry, dataConverter dataconverter.DataConverter, logger *zap.Logger, ) WorkflowTaskExecutor
Click to show internal directories.
Click to hide internal directories.