Documentation
¶
Index ¶
- Constants
- Variables
- type ActivityTask
- type Backend
- type BackendOption
- func WithActivityLockTimeout(timeout time.Duration) BackendOption
- func WithContextPropagator(prop workflow.ContextPropagator) BackendOption
- func WithConverter(converter converter.Converter) BackendOption
- func WithLogger(logger *slog.Logger) BackendOption
- func WithMaxHistorySize(size int64) BackendOption
- func WithMetrics(client metrics.Client) BackendOption
- func WithRemoveContinuedAsNewInstances() BackendOption
- func WithStickyTimeout(timeout time.Duration) BackendOption
- func WithTracerProvider(tp trace.TracerProvider) BackendOption
- func WithWorkerName(workerName string) BackendOption
- func WithWorkflowLockTimeout(timeout time.Duration) BackendOption
- type ErrNotSupported
- type Feature
- type MockBackend
- func (_m *MockBackend) CancelWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance, ...) error
- func (_m *MockBackend) Close() error
- func (_m *MockBackend) CompleteActivityTask(ctx context.Context, task *ActivityTask, result *history.Event) error
- func (_m *MockBackend) CompleteWorkflowTask(ctx context.Context, task *WorkflowTask, state core.WorkflowInstanceState, ...) error
- func (_m *MockBackend) CreateWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance, event *history.Event) error
- func (_m *MockBackend) ExtendActivityTask(ctx context.Context, task *ActivityTask) error
- func (_m *MockBackend) ExtendWorkflowTask(ctx context.Context, task *WorkflowTask) error
- func (_m *MockBackend) FeatureSupported(feature Feature) bool
- func (_m *MockBackend) GetActivityTask(ctx context.Context, queues []core.Queue) (*ActivityTask, error)
- func (_m *MockBackend) GetStats(ctx context.Context) (*Stats, error)
- func (_m *MockBackend) GetWorkflowInstanceHistory(ctx context.Context, instance *core.WorkflowInstance, lastSequenceID *int64) ([]*history.Event, error)
- func (_m *MockBackend) GetWorkflowInstanceState(ctx context.Context, instance *core.WorkflowInstance) (core.WorkflowInstanceState, error)
- func (_m *MockBackend) GetWorkflowTask(ctx context.Context, queues []core.Queue) (*WorkflowTask, error)
- func (_m *MockBackend) Metrics() metrics.Client
- func (_m *MockBackend) Options() *Options
- func (_m *MockBackend) PrepareActivityQueues(ctx context.Context, queues []core.Queue) error
- func (_m *MockBackend) PrepareWorkflowQueues(ctx context.Context, queues []core.Queue) error
- func (_m *MockBackend) RemoveWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance) error
- func (_m *MockBackend) RemoveWorkflowInstances(ctx context.Context, options ...RemovalOption) error
- func (_m *MockBackend) SignalWorkflow(ctx context.Context, instanceID string, event *history.Event) error
- func (_m *MockBackend) Tracer() trace.Tracer
- type Options
- type RemovalOption
- type RemovalOptions
- type Stats
- type WorkflowTask
Constants ¶
const TracerName = "go-workflow"
Variables ¶
var DefaultRemovalOptions = RemovalOptions{
BatchSize: 100,
}
var ErrInstanceAlreadyExists = errors.New("workflow instance already exists")
var ErrInstanceNotFinished = errors.New("workflow instance is not finished")
var ErrInstanceNotFound = errors.New("workflow instance not found")
Functions ¶
This section is empty.
Types ¶
type ActivityTask ¶ added in v0.17.0
type ActivityTask struct { ID string // ActivityID is the ID of the activity event ActivityID string Queue workflow.Queue WorkflowInstance *core.WorkflowInstance Event *history.Event }
ActivityTask represents one activity execution.
type Backend ¶
type Backend interface { // CreateWorkflowInstance creates a new workflow instance CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error // CancelWorkflowInstance cancels a running workflow instance CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance, cancelEvent *history.Event) error // RemoveWorkflowInstance removes a workflow instance RemoveWorkflowInstance(ctx context.Context, instance *workflow.Instance) error // RemoveWorkflowInstances removes multiple workflow instances RemoveWorkflowInstances(ctx context.Context, options ...RemovalOption) error // GetWorkflowInstanceState returns the state of the given workflow instance GetWorkflowInstanceState(ctx context.Context, instance *workflow.Instance) (core.WorkflowInstanceState, error) // GetWorkflowInstanceHistory returns the workflow history for the given instance. When lastSequenceID // is given, only events after that event are returned. Otherwise the full history is returned. 
GetWorkflowInstanceHistory(ctx context.Context, instance *workflow.Instance, lastSequenceID *int64) ([]*history.Event, error) // SignalWorkflow signals a running workflow instance // // If the given instance does not exist, it will return an error SignalWorkflow(ctx context.Context, instanceID string, event *history.Event) error // PrepareWorkflowQueues prepares workflow queues for later consumption using this backend instance PrepareWorkflowQueues(ctx context.Context, queues []workflow.Queue) error // PrepareActivityQueues prepares activity queues for later consumption using this backend instance PrepareActivityQueues(ctx context.Context, queues []workflow.Queue) error // GetWorkflowTask returns a pending workflow task or nil if there are no pending workflow executions GetWorkflowTask(ctx context.Context, queues []workflow.Queue) (*WorkflowTask, error) // ExtendWorkflowTask extends the lock of a workflow task ExtendWorkflowTask(ctx context.Context, task *WorkflowTask) error // CompleteWorkflowTask checkpoints a workflow task retrieved using GetWorkflowTask // // This checkpoints the execution. events are new events from the last workflow execution // which will be added to the workflow instance history. workflowEvents are new events for the // completed or other workflow instances. 
CompleteWorkflowTask( ctx context.Context, task *WorkflowTask, state core.WorkflowInstanceState, executedEvents, activityEvents, timerEvents []*history.Event, workflowEvents []*history.WorkflowEvent) error // GetActivityTask returns a pending activity task or nil if there are no pending activities GetActivityTask(ctx context.Context, queues []workflow.Queue) (*ActivityTask, error) // ExtendActivityTask extends the lock of an activity task ExtendActivityTask(ctx context.Context, task *ActivityTask) error // CompleteActivityTask completes an activity task retrieved using GetActivityTask CompleteActivityTask(ctx context.Context, task *ActivityTask, result *history.Event) error // GetStats returns stats about the backend GetStats(ctx context.Context) (*Stats, error) // Tracer returns the configured tracer for the backend Tracer() trace.Tracer // Metrics returns the configured metrics client for the backend Metrics() metrics.Client // Options returns the configured options for the backend Options() *Options // Close closes any underlying resources Close() error // FeatureSupported returns true if the given feature is supported by the backend FeatureSupported(feature Feature) bool }
type BackendOption ¶
type BackendOption func(*Options)
func WithActivityLockTimeout ¶ added in v1.2.0
func WithActivityLockTimeout(timeout time.Duration) BackendOption
WithActivityLockTimeout sets the timeout for activity task locks. If an activity task is not completed within this timeframe, it's considered abandoned and another worker might pick it up.
func WithContextPropagator ¶ added in v0.14.0
func WithContextPropagator(prop workflow.ContextPropagator) BackendOption
WithContextPropagator adds a context propagator for passing context into workflows and activities. Multiple propagators can be added by calling this function multiple times.
func WithConverter ¶ added in v0.9.0
func WithConverter(converter converter.Converter) BackendOption
WithConverter sets a custom converter for serializing and deserializing workflow inputs and results. If not set, converter.DefaultConverter will be used.
func WithLogger ¶ added in v0.0.9
func WithLogger(logger *slog.Logger) BackendOption
WithLogger sets a custom logger for the backend. If not set, slog.Default() will be used.
func WithMaxHistorySize ¶ added in v1.0.0
func WithMaxHistorySize(size int64) BackendOption
WithMaxHistorySize sets the maximum size of a workflow history in bytes. If a workflow exceeds this size, it will be failed to prevent unbounded growth.
func WithMetrics ¶ added in v0.6.0
func WithMetrics(client metrics.Client) BackendOption
WithMetrics sets a custom metrics client for collecting workflow execution metrics. If not set, a no-op metrics client will be used.
func WithRemoveContinuedAsNewInstances ¶ added in v0.19.0
func WithRemoveContinuedAsNewInstances() BackendOption
WithRemoveContinuedAsNewInstances enables immediate removal of workflow instances that completed using ContinueAsNew, including their history. If not set, instances will be removed after the configured retention period or never.
func WithStickyTimeout ¶
func WithStickyTimeout(timeout time.Duration) BackendOption
WithStickyTimeout sets the sticky timeout duration for workflow executions. This determines how long a worker will hold onto a workflow execution context before releasing it back to the task queue.
func WithTracerProvider ¶ added in v0.4.0
func WithTracerProvider(tp trace.TracerProvider) BackendOption
WithTracerProvider sets a custom OpenTelemetry tracer provider for distributed tracing. If not set, a no-op tracer provider will be used.
func WithWorkerName ¶ added in v1.0.1
func WithWorkerName(workerName string) BackendOption
WithWorkerName sets a custom name for this worker instance. If not set, backends will generate a default name based on hostname and process ID.
func WithWorkflowLockTimeout ¶ added in v1.2.0
func WithWorkflowLockTimeout(timeout time.Duration) BackendOption
WithWorkflowLockTimeout sets the timeout for workflow task locks. If a workflow task is not completed within this timeframe, it's considered abandoned and another worker might pick it up.
type ErrNotSupported ¶ added in v0.19.0
type ErrNotSupported struct {
Message string
}
func (ErrNotSupported) Error ¶ added in v0.19.0
func (e ErrNotSupported) Error() string
type MockBackend ¶
MockBackend is an autogenerated mock type for the Backend type
func NewMockBackend ¶ added in v0.4.0
func NewMockBackend(t mockConstructorTestingTNewMockBackend) *MockBackend
NewMockBackend creates a new instance of MockBackend. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func (*MockBackend) CancelWorkflowInstance ¶
func (_m *MockBackend) CancelWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance, cancelEvent *history.Event) error
CancelWorkflowInstance provides a mock function with given fields: ctx, instance, cancelEvent
func (*MockBackend) Close ¶ added in v0.18.0
func (_m *MockBackend) Close() error
Close provides a mock function with given fields:
func (*MockBackend) CompleteActivityTask ¶
func (_m *MockBackend) CompleteActivityTask(ctx context.Context, task *ActivityTask, result *history.Event) error
CompleteActivityTask provides a mock function with given fields: ctx, task, result
func (*MockBackend) CompleteWorkflowTask ¶
func (_m *MockBackend) CompleteWorkflowTask(ctx context.Context, task *WorkflowTask, state core.WorkflowInstanceState, executedEvents []*history.Event, activityEvents []*history.Event, timerEvents []*history.Event, workflowEvents []*history.WorkflowEvent) error
CompleteWorkflowTask provides a mock function with given fields: ctx, task, state, executedEvents, activityEvents, timerEvents, workflowEvents
func (*MockBackend) CreateWorkflowInstance ¶
func (_m *MockBackend) CreateWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance, event *history.Event) error
CreateWorkflowInstance provides a mock function with given fields: ctx, instance, event
func (*MockBackend) ExtendActivityTask ¶
func (_m *MockBackend) ExtendActivityTask(ctx context.Context, task *ActivityTask) error
ExtendActivityTask provides a mock function with given fields: ctx, task
func (*MockBackend) ExtendWorkflowTask ¶
func (_m *MockBackend) ExtendWorkflowTask(ctx context.Context, task *WorkflowTask) error
ExtendWorkflowTask provides a mock function with given fields: ctx, task
func (*MockBackend) FeatureSupported ¶ added in v0.19.0
func (_m *MockBackend) FeatureSupported(feature Feature) bool
FeatureSupported provides a mock function with given fields: feature
func (*MockBackend) GetActivityTask ¶
func (_m *MockBackend) GetActivityTask(ctx context.Context, queues []core.Queue) (*ActivityTask, error)
GetActivityTask provides a mock function with given fields: ctx, queues
func (*MockBackend) GetStats ¶ added in v0.16.1
func (_m *MockBackend) GetStats(ctx context.Context) (*Stats, error)
GetStats provides a mock function with given fields: ctx
func (*MockBackend) GetWorkflowInstanceHistory ¶ added in v0.0.4
func (_m *MockBackend) GetWorkflowInstanceHistory(ctx context.Context, instance *core.WorkflowInstance, lastSequenceID *int64) ([]*history.Event, error)
GetWorkflowInstanceHistory provides a mock function with given fields: ctx, instance, lastSequenceID
func (*MockBackend) GetWorkflowInstanceState ¶ added in v0.0.4
func (_m *MockBackend) GetWorkflowInstanceState(ctx context.Context, instance *core.WorkflowInstance) (core.WorkflowInstanceState, error)
GetWorkflowInstanceState provides a mock function with given fields: ctx, instance
func (*MockBackend) GetWorkflowTask ¶
func (_m *MockBackend) GetWorkflowTask(ctx context.Context, queues []core.Queue) (*WorkflowTask, error)
GetWorkflowTask provides a mock function with given fields: ctx, queues
func (*MockBackend) Metrics ¶ added in v0.6.0
func (_m *MockBackend) Metrics() metrics.Client
Metrics provides a mock function with given fields:
func (*MockBackend) Options ¶ added in v0.19.0
func (_m *MockBackend) Options() *Options
Options provides a mock function with given fields:
func (*MockBackend) PrepareActivityQueues ¶ added in v0.19.0
PrepareActivityQueues provides a mock function with given fields: ctx, queues
func (*MockBackend) PrepareWorkflowQueues ¶ added in v0.19.0
PrepareWorkflowQueues provides a mock function with given fields: ctx, queues
func (*MockBackend) RemoveWorkflowInstance ¶ added in v0.12.0
func (_m *MockBackend) RemoveWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance) error
RemoveWorkflowInstance provides a mock function with given fields: ctx, instance
func (*MockBackend) RemoveWorkflowInstances ¶ added in v0.19.0
func (_m *MockBackend) RemoveWorkflowInstances(ctx context.Context, options ...RemovalOption) error
RemoveWorkflowInstances provides a mock function with given fields: ctx, options
func (*MockBackend) SignalWorkflow ¶
func (_m *MockBackend) SignalWorkflow(ctx context.Context, instanceID string, event *history.Event) error
SignalWorkflow provides a mock function with given fields: ctx, instanceID, event
func (*MockBackend) Tracer ¶ added in v0.4.0
func (_m *MockBackend) Tracer() trace.Tracer
Tracer provides a mock function with given fields:
type Options ¶
type Options struct { Logger *slog.Logger Metrics metrics.Client TracerProvider trace.TracerProvider // Converter is the converter to use for serializing and deserializing inputs and results. If not explicitly set // converter.DefaultConverter is used. Converter converter.Converter // ContextPropagators is a list of context propagators to use for passing context into workflows and activities. ContextPropagators []workflow.ContextPropagator StickyTimeout time.Duration // WorkflowLockTimeout determines how long a workflow task can be locked for. If the workflow task is not completed // by that timeframe, it's considered abandoned and another worker might pick it up. // // For long running workflow tasks, combine this with heartbeats. WorkflowLockTimeout time.Duration // ActivityLockTimeout determines how long an activity task can be locked for. If the activity task is not completed // by that timeframe, it's considered abandoned and another worker might pick it up. ActivityLockTimeout time.Duration // RemoveContinuedAsNewInstances determines whether instances that were completed using ContinueAsNew should be // removed immediately, including their history. If set to false, the instance will be removed after the configured // retention period or never. RemoveContinuedAsNewInstances bool // MaxHistorySize is the maximum size of a workflow history. If a workflow exceeds this size, it will be failed. MaxHistorySize int64 // WorkerName allows setting a custom worker name. If not set, backends will generate a default name. WorkerName string }
var DefaultOptions Options = Options{ StickyTimeout: 30 * time.Second, WorkflowLockTimeout: time.Minute, ActivityLockTimeout: time.Minute * 2, Logger: slog.Default(), Metrics: mi.NewNoopMetricsClient(), TracerProvider: noop.NewTracerProvider(), Converter: converter.DefaultConverter, ContextPropagators: []workflow.ContextPropagator{&propagators.TracingContextPropagator{}}, RemoveContinuedAsNewInstances: false, MaxHistorySize: 10_000, }
func ApplyOptions ¶
func ApplyOptions(opts ...BackendOption) *Options
type RemovalOption ¶ added in v0.19.0
type RemovalOption func(o *RemovalOptions)
func RemoveFinishedBatchSize ¶ added in v0.19.0
func RemoveFinishedBatchSize(size int) RemovalOption
func RemoveFinishedBefore ¶ added in v0.19.0
func RemoveFinishedBefore(t time.Time) RemovalOption
type RemovalOptions ¶ added in v0.19.0
type Stats ¶ added in v0.16.1
type Stats struct { ActiveWorkflowInstances int64 // PendingActivityTasks is the per-queue number of activities that are currently in the queue, // waiting to be processed by a worker PendingActivityTasks map[workflow.Queue]int64 // PendingWorkflowTasks is the per-queue number of workflow tasks that are currently in the queue, // waiting to be processed by a worker PendingWorkflowTasks map[workflow.Queue]int64 }
type WorkflowTask ¶ added in v0.17.0
type WorkflowTask struct { // ID is an identifier for this task. It's set by the backend ID string // Queue is the queue of the workflow instance Queue workflow.Queue // WorkflowInstance is the workflow instance that this task is for WorkflowInstance *core.WorkflowInstance // WorkflowInstanceState is the state of the workflow instance when the task was dequeued WorkflowInstanceState core.WorkflowInstanceState // Metadata is the metadata of the workflow instance Metadata *metadata.WorkflowMetadata // LastSequenceID is the sequence ID of the newest event in the workflow instance's history LastSequenceID int64 // NewEvents are new events since the last task execution NewEvents []*history.Event // Backend specific data, only the producer of the task should rely on this. CustomData any }
WorkflowTask represents work for one workflow execution slice.