backend

package
v1.1.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 6, 2025 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const TracerName = "go-workflow"

Variables

View Source
var DefaultRemovalOptions = RemovalOptions{
	BatchSize: 100,
}
View Source
var ErrInstanceAlreadyExists = errors.New("workflow instance already exists")
View Source
var ErrInstanceNotFinished = errors.New("workflow instance is not finished")
View Source
var ErrInstanceNotFound = errors.New("workflow instance not found")

Functions

This section is empty.

Types

type ActivityTask

type ActivityTask struct {
	ID string

	// ActivityID is the ID of the activity event
	ActivityID string

	Queue workflow.Queue

	WorkflowInstance *core.WorkflowInstance

	Event *history.Event
}

ActivityTask represents one activity execution.

type Backend

type Backend interface {
	// CreateWorkflowInstance creates a new workflow instance
	CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error

	// CancelWorkflowInstance cancels a running workflow instance
	CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance, cancelEvent *history.Event) error

	// RemoveWorkflowInstance removes a workflow instance
	RemoveWorkflowInstance(ctx context.Context, instance *workflow.Instance) error

	// RemoveWorkflowInstances removes multiple workflow instances
	RemoveWorkflowInstances(ctx context.Context, options ...RemovalOption) error

	// GetWorkflowInstanceState returns the state of the given workflow instance
	GetWorkflowInstanceState(ctx context.Context, instance *workflow.Instance) (core.WorkflowInstanceState, error)

	// GetWorkflowInstanceHistory returns the workflow history for the given instance. When lastSequenceID
	// is given, only events after that event are returned. Otherwise the full history is returned.
	GetWorkflowInstanceHistory(ctx context.Context, instance *workflow.Instance, lastSequenceID *int64) ([]*history.Event, error)

	// SignalWorkflow signals a running workflow instance
	//
	// If the given instance does not exist, it will return an error
	SignalWorkflow(ctx context.Context, instanceID string, event *history.Event) error

	// PrepareWorkflowQueues prepares workflow queues for later consumption using this backend instane
	PrepareWorkflowQueues(ctx context.Context, queues []workflow.Queue) error

	// PrepareActivityQueues prepares activity queues for later consumption using this backend instance
	PrepareActivityQueues(ctx context.Context, queues []workflow.Queue) error

	// GetWorkflowTask returns a pending workflow task or nil if there are no pending workflow executions
	GetWorkflowTask(ctx context.Context, queues []workflow.Queue) (*WorkflowTask, error)

	// ExtendWorkflowTask extends the lock of a workflow task
	ExtendWorkflowTask(ctx context.Context, task *WorkflowTask) error

	// CompleteWorkflowTask checkpoints a workflow task retrieved using GetWorkflowTask
	//
	// This checkpoints the execution. events are new events from the last workflow execution
	// which will be added to the workflow instance history. workflowEvents are new events for the
	// completed or other workflow instances.
	CompleteWorkflowTask(
		ctx context.Context, task *WorkflowTask, state core.WorkflowInstanceState,
		executedEvents, activityEvents, timerEvents []*history.Event, workflowEvents []*history.WorkflowEvent) error

	// GetActivityTask returns a pending activity task or nil if there are no pending activities
	GetActivityTask(ctx context.Context, queues []workflow.Queue) (*ActivityTask, error)

	// ExtendActivityTask extends the lock of an activity task
	ExtendActivityTask(ctx context.Context, task *ActivityTask) error

	// CompleteActivityTask completes an activity task retrieved using GetActivityTask
	CompleteActivityTask(ctx context.Context, task *ActivityTask, result *history.Event) error

	// GetStats returns stats about the backend
	GetStats(ctx context.Context) (*Stats, error)

	// Tracer returns the configured trace provider for the backend
	Tracer() trace.Tracer

	// Metrics returns the configured metrics client for the backend
	Metrics() metrics.Client

	// Options returns the configured options for the backend
	Options() *Options

	// Close closes any underlying resources
	Close() error

	// FeatureSupported returns true if the given feature is supported by the backend
	FeatureSupported(feature Feature) bool
}

type BackendOption

type BackendOption func(*Options)

func WithContextPropagator

func WithContextPropagator(prop workflow.ContextPropagator) BackendOption

func WithConverter

func WithConverter(converter converter.Converter) BackendOption

func WithLogger

func WithLogger(logger *slog.Logger) BackendOption

func WithMaxHistorySize

func WithMaxHistorySize(size int64) BackendOption

func WithMetrics

func WithMetrics(client metrics.Client) BackendOption

func WithRemoveContinuedAsNewInstances

func WithRemoveContinuedAsNewInstances() BackendOption

func WithStickyTimeout

func WithStickyTimeout(timeout time.Duration) BackendOption

func WithTracerProvider

func WithTracerProvider(tp trace.TracerProvider) BackendOption

type ErrNotSupported

type ErrNotSupported struct {
	Message string
}

func (ErrNotSupported) Error

func (e ErrNotSupported) Error() string

type Feature

type Feature int
const (
	Feature_Expiration Feature = iota
)

type MockBackend

type MockBackend struct {
	mock.Mock
}

MockBackend is an autogenerated mock type for the Backend type

func NewMockBackend

func NewMockBackend(t mockConstructorTestingTNewMockBackend) *MockBackend

NewMockBackend creates a new instance of MockBackend. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.

func (*MockBackend) CancelWorkflowInstance

func (_m *MockBackend) CancelWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance, cancelEvent *history.Event) error

CancelWorkflowInstance provides a mock function with given fields: ctx, instance, cancelEvent

func (*MockBackend) Close

func (_m *MockBackend) Close() error

Close provides a mock function with given fields:

func (*MockBackend) CompleteActivityTask

func (_m *MockBackend) CompleteActivityTask(ctx context.Context, task *ActivityTask, result *history.Event) error

CompleteActivityTask provides a mock function with given fields: ctx, task, result

func (*MockBackend) CompleteWorkflowTask

func (_m *MockBackend) CompleteWorkflowTask(ctx context.Context, task *WorkflowTask, state core.WorkflowInstanceState, executedEvents []*history.Event, activityEvents []*history.Event, timerEvents []*history.Event, workflowEvents []*history.WorkflowEvent) error

CompleteWorkflowTask provides a mock function with given fields: ctx, task, state, executedEvents, activityEvents, timerEvents, workflowEvents

func (*MockBackend) CreateWorkflowInstance

func (_m *MockBackend) CreateWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance, event *history.Event) error

CreateWorkflowInstance provides a mock function with given fields: ctx, instance, event

func (*MockBackend) ExtendActivityTask

func (_m *MockBackend) ExtendActivityTask(ctx context.Context, task *ActivityTask) error

ExtendActivityTask provides a mock function with given fields: ctx, task

func (*MockBackend) ExtendWorkflowTask

func (_m *MockBackend) ExtendWorkflowTask(ctx context.Context, task *WorkflowTask) error

ExtendWorkflowTask provides a mock function with given fields: ctx, task

func (*MockBackend) FeatureSupported

func (_m *MockBackend) FeatureSupported(feature Feature) bool

FeatureSupported provides a mock function with given fields: feature

func (*MockBackend) GetActivityTask

func (_m *MockBackend) GetActivityTask(ctx context.Context, queues []core.Queue) (*ActivityTask, error)

GetActivityTask provides a mock function with given fields: ctx, queues

func (*MockBackend) GetStats

func (_m *MockBackend) GetStats(ctx context.Context) (*Stats, error)

GetStats provides a mock function with given fields: ctx

func (*MockBackend) GetWorkflowInstanceHistory

func (_m *MockBackend) GetWorkflowInstanceHistory(ctx context.Context, instance *core.WorkflowInstance, lastSequenceID *int64) ([]*history.Event, error)

GetWorkflowInstanceHistory provides a mock function with given fields: ctx, instance, lastSequenceID

func (*MockBackend) GetWorkflowInstanceState

func (_m *MockBackend) GetWorkflowInstanceState(ctx context.Context, instance *core.WorkflowInstance) (core.WorkflowInstanceState, error)

GetWorkflowInstanceState provides a mock function with given fields: ctx, instance

func (*MockBackend) GetWorkflowTask

func (_m *MockBackend) GetWorkflowTask(ctx context.Context, queues []core.Queue) (*WorkflowTask, error)

GetWorkflowTask provides a mock function with given fields: ctx, queues

func (*MockBackend) Metrics

func (_m *MockBackend) Metrics() metrics.Client

Metrics provides a mock function with given fields:

func (*MockBackend) Options

func (_m *MockBackend) Options() *Options

Options provides a mock function with given fields:

func (*MockBackend) PrepareActivityQueues

func (_m *MockBackend) PrepareActivityQueues(ctx context.Context, queues []core.Queue) error

PrepareActivityQueues provides a mock function with given fields: ctx, queues

func (*MockBackend) PrepareWorkflowQueues

func (_m *MockBackend) PrepareWorkflowQueues(ctx context.Context, queues []core.Queue) error

PrepareWorkflowQueues provides a mock function with given fields: ctx, queues

func (*MockBackend) RemoveWorkflowInstance

func (_m *MockBackend) RemoveWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance) error

RemoveWorkflowInstance provides a mock function with given fields: ctx, instance

func (*MockBackend) RemoveWorkflowInstances

func (_m *MockBackend) RemoveWorkflowInstances(ctx context.Context, options ...RemovalOption) error

RemoveWorkflowInstances provides a mock function with given fields: ctx, options

func (*MockBackend) SignalWorkflow

func (_m *MockBackend) SignalWorkflow(ctx context.Context, instanceID string, event *history.Event) error

SignalWorkflow provides a mock function with given fields: ctx, instanceID, event

func (*MockBackend) Tracer

func (_m *MockBackend) Tracer() trace.Tracer

Tracer provides a mock function with given fields:

type Options

type Options struct {
	Logger *slog.Logger

	Metrics metrics.Client

	TracerProvider trace.TracerProvider

	// Converter is the converter to use for serializing and deserializing inputs and results. If not explicitly set
	// converter.DefaultConverter is used.
	Converter converter.Converter

	// ContextPropagators is a list of context propagators to use for passing context into workflows and activities.
	ContextPropagators []workflow.ContextPropagator

	StickyTimeout time.Duration

	// WorkflowLockTimeout determines how long a workflow task can be locked for. If the workflow task is not completed
	// by that timeframe, it's considered abandoned and another worker might pick it up.
	//
	// For long running workflow tasks, combine this with heartbearts.
	WorkflowLockTimeout time.Duration

	// ActivityLockTimeout determines how long an activity task can be locked for. If the activity task is not completed
	// by that timeframe, it's considered abandoned and another worker might pick it up
	ActivityLockTimeout time.Duration

	// RemoveContinuedAsNewInstances determines whether instances that were completed using ContinueAsNew should be
	// removed immediately, including their history. If set to false, the instance will be removed after the configured
	// retention period or never.
	RemoveContinuedAsNewInstances bool

	// MaxHistorySize is the maximum size of a workflow history. If a workflow exceeds this size, it will be failed.
	MaxHistorySize int64
}
var DefaultOptions Options = Options{
	StickyTimeout:       30 * time.Second,
	WorkflowLockTimeout: time.Minute,
	ActivityLockTimeout: time.Minute * 2,

	Logger:         slog.Default(),
	Metrics:        mi.NewNoopMetricsClient(),
	TracerProvider: noop.NewTracerProvider(),
	Converter:      converter.DefaultConverter,

	ContextPropagators: []workflow.ContextPropagator{&propagators.TracingContextPropagator{}},

	RemoveContinuedAsNewInstances: false,

	MaxHistorySize: 10_000,
}

func ApplyOptions

func ApplyOptions(opts ...BackendOption) *Options

type RemovalOption

type RemovalOption func(o *RemovalOptions)

func RemoveFinishedBatchSize

func RemoveFinishedBatchSize(size int) RemovalOption

func RemoveFinishedBefore

func RemoveFinishedBefore(t time.Time) RemovalOption

type RemovalOptions

type RemovalOptions struct {
	FinishedBefore time.Time
	BatchSize      int
}

type Stats

type Stats struct {
	ActiveWorkflowInstances int64

	// PendingActivities are the number of activities that are currently in the queue,
	// waiting to be processed by a worker
	PendingActivityTasks map[workflow.Queue]int64

	// PendingWorkflowTasks are the number of workflow tasks that are currently in the queue,
	// waiting to be processed by a worker
	PendingWorkflowTasks map[workflow.Queue]int64
}

type WorkflowTask

type WorkflowTask struct {
	// ID is an identifier for this task. It's set by the backend
	ID string

	// Queue is the queue of the workflow instance
	Queue workflow.Queue

	// WorkflowInstance is the workflow instance that this task is for
	WorkflowInstance *core.WorkflowInstance

	// WorkflowInstanceState is the state of the workflow instance when the task was dequeued
	WorkflowInstanceState core.WorkflowInstanceState

	// Metadata is the metadata of the workflow instance
	Metadata *metadata.WorkflowMetadata

	// LastSequenceID is the sequence ID of the newest event in the workflow instances's history
	LastSequenceID int64

	// NewEvents are new events since the last task execution
	NewEvents []*history.Event

	// Backend specific data, only the producer of the task should rely on this.
	CustomData any
}

WorkflowTask represents work for one workflow execution slice.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL