Documentation ¶
Index ¶
- Variables
- func Execute(ctx context.Context, r *Registry, env Environment, ref Ref, task Task) error
- func MachineData[T any](n *Node) (T, error)
- func MachineTransition[T any](n *Node, transitionFn func(T) (TransitionOutput, error)) (retErr error)
- func RegisterExecutor[T Task](r *Registry, t int32, exec Executor[T]) error
- type AccessType
- type Collection
- func (c Collection[T]) Add(stateMachineID string, data T) (*Node, error)
- func (c Collection[T]) Data(stateMachineID string) (T, error)
- func (c Collection[T]) List() []*Node
- func (c Collection[T]) Node(stateMachineID string) (*Node, error)
- func (c Collection[T]) Transition(stateMachineID string, transitionFn func(T) (TransitionOutput, error)) error
- type Environment
- type Executor
- type Key
- type MachineType
- type Node
- type PathAndOutputs
- type Ref
- type Registry
- type StateMachine
- type StateMachineDefinition
- type Task
- type TaskKind
- type TaskKindOutbound
- type TaskKindTimer
- type TaskRegenerator
- type TaskSerializer
- type TaskType
- type Transition
- type TransitionOutput
Constants ¶
This section is empty.
Variables ¶
var ErrDuplicateRegistration = errors.New("duplicate registration")
ErrDuplicateRegistration is returned by a Registry when it detects duplicate registration.
var ErrIncompatibleType = errors.New("state machine data was cast into an incompatible type")
ErrIncompatibleType is returned when trying to cast a state machine's data to a type that it is incompatible with.
var ErrInvalidTaskKind = errors.New("invalid task kind")
ErrInvalidTaskKind can be returned by a TaskSerializer if it received the wrong task kind.
var ErrInvalidTransition = errors.New("invalid transition")
ErrInvalidTransition is returned from Transition.Apply on an invalid state transition.
var ErrNotRegistered error = notRegisteredError{"not registered"}
ErrNotRegistered is returned by a Registry when trying to get a type that is not registered.
var ErrStateMachineAlreadyExists = errors.New("state machine already exists")
ErrStateMachineAlreadyExists is returned when trying to add a state machine with an ID that already exists in a Collection.
var ErrStateMachineNotFound = errors.New("state machine not found")
ErrStateMachineNotFound is returned when looking up a non-existing state machine in a Node or a Collection.
Functions ¶
func Execute ¶
Execute gets an Executor from the registry and invokes it. Returns ErrNotRegistered if an executor is not registered for the given task's type.
func MachineData ¶
MachineData deserializes the persistent state machine's data, casts it to type T, and returns it. Returns an error when deserialization or casting fails.
func MachineTransition ¶
func MachineTransition[T any](n *Node, transitionFn func(T) (TransitionOutput, error)) (retErr error)
MachineTransition runs the given transitionFn on a machine's data for the given key. It updates the state machine's metadata and marks the entry as dirty in the node's cache. If the transition fails, the changes are rolled back and no state is mutated.
func RegisterExecutor ¶
RegisterExecutor registers an Executor for the given task types. Returns an ErrDuplicateRegistration if an executor for any of the types has already been registered.
Types ¶
type AccessType ¶
type AccessType int
AccessType is a specifier for storage access.
const ( // AccessRead specifies read access. AccessRead AccessType = iota // AccessWrite specifies write access. AccessWrite AccessType = iota )
type Collection ¶
type Collection[T any] struct { // The type of machines stored in this collection. Type int32 // contains filtered or unexported fields }
A Collection of similarly typed sibling state machines.
func NewCollection ¶
func NewCollection[T any](node *Node, stateMachineType int32) Collection[T]
NewCollection creates a new Collection.
func (Collection[T]) Add ¶
func (c Collection[T]) Add(stateMachineID string, data T) (*Node, error)
Add adds a node to the collection as a child of the collection's underlying Node.
func (Collection[T]) Data ¶
func (c Collection[T]) Data(stateMachineID string) (T, error)
Data gets the data for a given state machine ID.
func (Collection[T]) List ¶
func (c Collection[T]) List() []*Node
List returns all nodes in this collection.
func (Collection[T]) Node ¶
func (c Collection[T]) Node(stateMachineID string) (*Node, error)
Node gets an Node for a given state machine ID.
func (Collection[T]) Transition ¶
func (c Collection[T]) Transition(stateMachineID string, transitionFn func(T) (TransitionOutput, error)) error
Transition transitions a machine by ID.
type Environment ¶
type Environment interface { // Wall clock. Backed by a the shard's time source. Now() time.Time // Access a state machine Node for the given ref. // // When using AccessRead, the accessor must guarantee not to mutate any state, accessor errors will not cause // mutable state unload. Access(ctx context.Context, ref Ref, accessType AccessType, accessor func(*Node) error) error }
Executor environment.
type Executor ¶
Executor is responsible for executing tasks. Implementations should be registered via RegisterExecutor to handle specific task types.
type Key ¶
type Key struct { // Type ID of the state machine. Type int32 // ID of the state machine. ID string }
Key is used for looking up a state machine in a Node.
type MachineType ¶
type MachineType struct { // Type ID that is used to minimize the persistence storage space and address a machine (see also [Key]). // Type IDs are expected to be immutable as they are used for looking up state machine definitions when loading data // from persistence. ID int32 // Human readable name for this type. Name string }
State machine type.
type Node ¶
type Node struct { // Key of this node in parent's map. Empty if node is the root. Key Key // Parent node. Nil if current node is the root. Parent *Node // contains filtered or unexported fields }
Node is a node in a heirarchical state machine tree.
It holds a persistent representation of itself and maintains an in-memory cache of deserialized data and child nodes. Node data should not be manipulated directly and should only be done using MachineTransition or [Collection.Transtion] to ensure the tree tracks dirty states and update transition counts.
func NewRoot ¶
func NewRoot(registry *Registry, t int32, data any, children map[int32]*persistencespb.StateMachineMap) (*Node, error)
NewRoot creates a new root Node. Children may be provided from persistence to rehydrate the tree. Returns ErrNotRegistered if the key's type is not registered in the given registry or serialization errors.
func (*Node) AddChild ¶
AddChild adds an immediate child to a node, serializing the given data. Returns ErrStateMachineAlreadyExists if a child with the given key already exists, ErrNotRegistered if the key's type is not found in the node's state machine registry and serialization errors.
func (*Node) ClearTransactionState ¶
func (n *Node) ClearTransactionState()
ClearTransactionState resets all transition outputs in the tree. This should be called at the end of every transaction where the transitions are performed to avoid emitting duplicate transition outputs.
func (*Node) Outputs ¶
func (n *Node) Outputs() []PathAndOutputs
Outputs returns all outputs produced by transitions on this tree.
func (*Node) TransitionCount ¶
TransitionCount returns the transition count for the state machine contained in this node.
type PathAndOutputs ¶
type PathAndOutputs struct { Path []Key Outputs []TransitionOutput }
type Ref ¶
type Ref struct { WorkflowKey definition.WorkflowKey StateMachineRef *persistencespb.StateMachineRef }
Ref is a reference to a statemachine on a specific workflow. It contains the workflow key and the key of the statemachine in the state machine [Store] as well as the namespace failover version and transition count that is expected to match on the referenced state machine.
func (Ref) StateMachinePath ¶
StateMachinePath gets the state machine path for from this reference.
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry maintains a mapping from state machine type to a StateMachineDefinition and task type to TaskSerializer. Registry methods are **not** protected by a lock and all registration is expected to happen in a single thread on startup for performance reasons.
func (*Registry) Machine ¶
func (r *Registry) Machine(t int32) (def StateMachineDefinition, ok bool)
Machine returns a StateMachineDefinition for a given type and a boolean indicating whether it was found.
func (*Registry) RegisterMachine ¶
func (r *Registry) RegisterMachine(sm StateMachineDefinition) error
RegisterMachine registers a StateMachineDefinition by its type. Returns an ErrDuplicateRegistration if the state machine type has already been registered.
func (*Registry) RegisterTaskSerializer ¶
func (r *Registry) RegisterTaskSerializer(t int32, def TaskSerializer) error
RegisterTaskSerializer registers a TaskSerializer for a given type. Returns an ErrDuplicateRegistration if a serializer for this task type has already been registered.
func (*Registry) TaskSerializer ¶
func (r *Registry) TaskSerializer(t int32) (d TaskSerializer, ok bool)
TaskSerializer returns a TaskSerializer for a given type and a boolean indicating whether it was found.
type StateMachine ¶
type StateMachine[S comparable] interface { TaskRegenerator State() S SetState(S) }
A StateMachine is anything that can get and set a comparable state S and re-generate tasks based on current state. It is meant to be used with Transition objects to safely transition their state on a given event.
type StateMachineDefinition ¶
type StateMachineDefinition interface { Type() MachineType // Serialize a state machine into bytes. Serialize(any) ([]byte, error) // Deserialize a state machine from bytes. Deserialize([]byte) (any, error) }
StateMachineDefinition provides type information and a serializer for a state machine.
type Task ¶
type Task interface { // Task type that must be unique per task definition. Type() TaskType // Kind of the task, see [TaskKind] for more info. Kind() TaskKind }
A Task is generated by a state machine in order to drive execution. For example, a callback state machine in the SCHEDULED state, would generate an invocation task to be eventually executed by the framework. State machine transitions and tasks are committed atomically to ensure that the system is in a consistent state.
Tasks are generated by calling the [StateMachine.Tasks] method on a state machine after it has transitioned. Tasks are executed by an executor that is registered to handle a specific task type. The framework converts this minimal task representation into [tasks.Task] instances, filling in the state machine reference, workflow key, and task ID. A TaskSerializer need to be registered in a Registry for a given type in order to process tasks of that type.
type TaskKind ¶
type TaskKind interface {
// contains filtered or unexported methods
}
TaskKind represents the possible set of kinds for a task. Each kind is mapped to a concrete [tasks.Task] implementation and is backed by specific protobuf message; for example, TaskKindTimer maps to TimerTaskInfo. Kind also determines which queue this task is scheduled on - it is mapped to a specific tasks.Category.
type TaskKindOutbound ¶
type TaskKindOutbound struct { // The destination of this task, used to group tasks into a per namespace-and-destination scheduler. Destination string // contains filtered or unexported fields }
TaskKindOutbound is a task that is scheduled on an outbound queue such as the callback queue.
type TaskKindTimer ¶
type TaskKindTimer struct { // A deadline for firing this task. // This represents a lower bound and actual execution may get delayed if the system is overloaded or for various // other reasons. Deadline time.Time // contains filtered or unexported fields }
TaskKindTimer is a task that is scheduled on the timer queue.
type TaskRegenerator ¶
A TaskRegenerator is invoked to regenerate tasks post state-based replication or when refreshing all tasks for a workflow.
type TaskSerializer ¶
type TaskSerializer interface { Serialize(Task) ([]byte, error) Deserialize(data []byte, kind TaskKind) (Task, error) }
TaskSerializer provides type information and a serializer for a state machine.
type TaskType ¶
type TaskType struct { // Type ID that is used to minimize the persistence storage space and look up the regisered serializer. // Type IDs are expected to be immutable as a serializer must be compatible with the task's persistent data. ID int32 // Human readable name for this type. Name string }
Task type.
type Transition ¶
type Transition[S comparable, SM StateMachine[S], E any] struct { // Source states that are valid for this transition. Sources []S // Destination state to transition to. Destination S // contains filtered or unexported fields }
Transition represents a state machine transition for a machine of type SM with state S and event E.
func NewTransition ¶
func NewTransition[S comparable, SM StateMachine[S], E any](src []S, dst S, apply func(SM, E) (TransitionOutput, error)) Transition[S, SM, E]
NewTransition creates a new Transition from the given source states to a destination state for a given event. The apply function is called after verifying the transition is possible and setting the destination state.
func (Transition[S, SM, E]) Apply ¶
func (t Transition[S, SM, E]) Apply(sm SM, event E) (TransitionOutput, error)
Apply applies a transition event to the given state machine changing the state machine's state to the transition's Destination on success.
func (Transition[S, SM, E]) Possible ¶
func (t Transition[S, SM, E]) Possible(sm SM) bool
Possible returns a boolean indicating whether the transition is possible for the current state.
type TransitionOutput ¶
type TransitionOutput struct {
Tasks []Task
}
TransitionOutput is output produced for a single transition.