Documentation ¶
Index ¶
- Constants
- func ActionFailed(err error) error
- func Convert[T ActionData](err error) (T, error)
- func DeserializeFailed(message string) error
- func InjectedError() error
- func LookupTyped[R any, T any, S SagaType[T]](ac ActionContext[T, S], nodeName NodeName) (R, bool)
- func NewDeserializeError(err error) error
- func NewSerializeError(err error) error
- func NewSubsagaError(err error) error
- func NoOpUndo[T any, S SagaType[T]](_ context.Context, _ ActionContext[T, S]) error
- func NodeIndexSliceToInt64Slice(in []NodeIndex) []int64
- func NotFoundError() error
- func PermanentFailure(err error) error
- func SerializeFailed(message string) error
- func SubsagaCreateFailed(message string) error
- type Action
- type ActionConstant
- type ActionContext
- type ActionData
- type ActionError
- type ActionFunc
- func (af *ActionFunc[T, S, R]) DoIt(ctx context.Context, sgctx ActionContext[T, S]) (ActionResult[ActionData], error)
- func (af *ActionFunc[T, S, R]) Name() ActionName
- func (af *ActionFunc[T, S, R]) String() string
- func (af *ActionFunc[T, S, R]) UndoIt(ctx context.Context, sgctx ActionContext[T, S]) error
- type ActionFuncResult
- type ActionInjectError
- type ActionName
- type ActionNodeInternal
- type ActionNodeKind
- type ActionRegistry
- type ActionRegistryError
- type ActionResult
- type ActionState
- type BTreeMap
- type CompletedAction
- type ConstantNodeInternal
- type ConstantNodeKind
- type Dag
- type DagBuilder
- type DoItFunc
- type EmptyActionContext
- type EmptyOutput
- type EndNode
- type ExecutionNode
- type ExecutionRecord
- type FileStore
- type InternalNode
- type MemoryStore
- type Node
- type NodeEntry
- type NodeIndex
- type NodeKind
- type NodeName
- type SagaDag
- type SagaDagIterator
- type SagaExecutor
- func (e *SagaExecutor[T, S]) Execute(ctx context.Context) error
- func (e *SagaExecutor[T, S]) GetCompletedNodes() []int64
- func (e *SagaExecutor[T, S]) GetExecutionOrder() []string
- func (e *SagaExecutor[T, S]) GetExecutionState() map[int64]*ExecutionNode
- func (e *SagaExecutor[T, S]) GetExecutionTrace() []ExecutionRecord
- func (e *SagaExecutor[T, S]) GetFailedNodes() []int64
- func (e *SagaExecutor[T, S]) Rollback(ctx context.Context) error
- type SagaID
- type SagaLog
- type SagaLogPretty
- type SagaName
- type SagaNodeEvent
- type SagaNodeEventType
- type SagaNodeID
- type SagaNodeLoadStatus
- type SagaParams
- type SagaType
- type StartNode
- type State
- type Store
- type SubsagaEndNode
- type SubsagaNodeKind
- type SubsagaStartNode
- type UndoActionError
- type UndoItFunc
- type UndoResult
Constants ¶
const (
SagaStatusRunning = "running"
SagaStatusCompleted = "completed"
SagaStatusFailed = "failed"
SagaStatusRollingBack = "rolling_back"
SagaStatusRolledBack = "rolled_back"
)
Saga status constants
Functions ¶
func ActionFailed ¶
ActionFailed wraps a user-provided error in an ActionError.
func Convert ¶
func Convert[T ActionData](err error) (T, error)
Convert tries to convert the error to a specific consumer error type.
func DeserializeFailed ¶
DeserializeFailed indicates a failure to deserialize saga data.
func InjectedError ¶
func InjectedError() error
InjectedError indicates an error was injected (for testing).
func LookupTyped ¶
LookupTyped retrieves the output from a previous node with type assertion. Returns the typed output and true if found and type matches, or zero value and false otherwise. If the value is stored as json.RawMessage (from persistence), it will be unmarshaled.
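The lookup behaviour described for LookupTyped can be sketched as follows. This is a minimal illustration, not the package's implementation: lookupTyped, instance, and sampleOutputs are hypothetical names, and a plain map stands in for the ActionContext so that the example is self-contained.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// instance is a hypothetical output type produced by an earlier node.
type instance struct {
	ID string `json:"id"`
}

// lookupTyped is a hypothetical stand-in for LookupTyped. It reads from a
// plain map instead of an ActionContext.
func lookupTyped[R any](outputs map[string]any, nodeName string) (R, bool) {
	var zero R
	raw, ok := outputs[nodeName]
	if !ok {
		return zero, false
	}
	// Fast path: the value is already held in memory with the requested type.
	if typed, ok := raw.(R); ok {
		return typed, true
	}
	// Fallback: a value reloaded from persistence is still raw JSON.
	if msg, ok := raw.(json.RawMessage); ok {
		var decoded R
		if err := json.Unmarshal(msg, &decoded); err != nil {
			return zero, false
		}
		return decoded, true
	}
	return zero, false
}

// sampleOutputs holds one in-memory value and one value as raw JSON.
func sampleOutputs() map[string]any {
	return map[string]any{
		"count":    42,
		"restored": json.RawMessage(`{"id":"vm-1"}`),
	}
}

func main() {
	outputs := sampleOutputs()
	n, ok := lookupTyped[int](outputs, "count")
	fmt.Println(n, ok)
	inst, ok := lookupTyped[instance](outputs, "restored")
	fmt.Println(inst.ID, ok)
	_, ok = lookupTyped[string](outputs, "count")
	fmt.Println(ok)
}
```

A type mismatch and a missing node are both reported as false in this sketch, which matches the documented return contract but hides the reason for the failure from the caller.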
func NewDeserializeError ¶
NewDeserializeError creates a new DeserializeFailed error.
func NewSerializeError ¶
NewSerializeError creates a new SerializeFailed error.
func NewSubsagaError ¶
NewSubsagaError creates a new SubsagaCreateFailed error.
func NotFoundError ¶
func NotFoundError() error
NotFoundError indicates that an action with the given name was not found.
func PermanentFailure ¶
PermanentFailure indicates a permanent failure of an undo action.
func SerializeFailed ¶
SerializeFailed indicates a failure to serialize saga data.
func SubsagaCreateFailed ¶
SubsagaCreateFailed indicates a failure to create a subsaga.
Types ¶
type Action ¶
type Action[T any, S SagaType[T]] interface {
DoIt(ctx context.Context, sgctx ActionContext[T, S]) (ActionResult[ActionData], error)
UndoIt(ctx context.Context, sgctx ActionContext[T, S]) error
Name() ActionName
}
Action represents the building blocks of sagas.
type ActionConstant ¶
type ActionConstant[T any] struct {
// contains filtered or unexported fields
}
ActionConstant is an Action implementation that emits a predefined value.
func NewActionConstant ¶
func NewActionConstant[T any](value T) *ActionConstant[T]
NewActionConstant creates a new ActionConstant.
func (*ActionConstant[T]) DoIt ¶
func (ac *ActionConstant[T]) DoIt(ctx context.Context, _ EmptyActionContext) (ActionResult[T], error)
DoIt implements the Action interface for ActionConstant.
func (*ActionConstant[T]) Name ¶
func (ac *ActionConstant[T]) Name() ActionName
Name implements the Action interface for ActionConstant.
func (*ActionConstant[T]) UndoIt ¶
func (ac *ActionConstant[T]) UndoIt(ctx context.Context, _ EmptyActionContext) error
UndoIt implements the Action interface for ActionConstant.
type ActionContext ¶
type ActionContext[T any, S SagaType[T]] struct {
AncestorTree *btree.Map[NodeName, any]
NodeID int
DAG *SagaDag
UserContext T
}
ActionContext provides context to individual actions.
func (*ActionContext[T, S]) Lookup ¶
func (ac *ActionContext[T, S]) Lookup(nodeName NodeName) (any, bool)
Lookup retrieves the output from a previous node by name. Returns the output and true if found, or zero value and false if not found.
func (*ActionContext[T, S]) LookupTypedResult ¶
func (ac *ActionContext[T, S]) LookupTypedResult(name string, result any) error
LookupTypedResult retrieves the output from a previous node by name and unmarshals it into result. It is intended as the ergonomic way to read a previous node's output.
type ActionData ¶
type ActionData interface{}
ActionData represents data that can be serialized in the saga context.
type ActionError ¶
type ActionError struct {
// contains filtered or unexported fields
}
ActionError represents an error produced by a saga action.
type ActionFunc ¶
type ActionFunc[T any, S SagaType[T], R ActionData] struct {
// contains filtered or unexported fields
}
ActionFunc is an implementation of Action that uses ordinary functions.
func NewActionFunc ¶
func NewActionFunc[T any, S SagaType[T], R ActionData](name ActionName, actionFunc DoItFunc[T, S, R], undoFunc UndoItFunc[T, S]) *ActionFunc[T, S, R]
NewActionFunc constructs a new ActionFunc from a pair of functions.
func NewActionFuncWithNoOpUndo ¶
func NewActionFuncWithNoOpUndo[T any, S SagaType[T], R ActionData](name ActionName, actionFunc DoItFunc[T, S, R]) *ActionFunc[T, S, R]
NewActionFuncWithNoOpUndo constructs a new ActionFunc with a no-op undo function.
func (*ActionFunc[T, S, R]) DoIt ¶
func (af *ActionFunc[T, S, R]) DoIt(ctx context.Context, sgctx ActionContext[T, S]) (ActionResult[ActionData], error)
DoIt implements the Action interface for ActionFunc.
func (*ActionFunc[T, S, R]) Name ¶
func (af *ActionFunc[T, S, R]) Name() ActionName
Name implements the Action interface for ActionFunc.
func (*ActionFunc[T, S, R]) String ¶
func (af *ActionFunc[T, S, R]) String() string
String implements the fmt.Stringer interface for ActionFunc.
func (*ActionFunc[T, S, R]) UndoIt ¶
func (af *ActionFunc[T, S, R]) UndoIt(ctx context.Context, sgctx ActionContext[T, S]) error
UndoIt implements the Action interface for ActionFunc.
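The idea of building an action from a pair of ordinary functions, with an optional no-op undo, can be sketched as follows. This is a simplified illustration under stated assumptions: doFunc, undoFunc, funcAction, and demo are hypothetical names, and the ActionContext argument of the real DoItFunc and UndoItFunc is omitted to keep the example self-contained.

```go
package main

import (
	"context"
	"fmt"
)

// doFunc and undoFunc are simplified, hypothetical stand-ins for DoItFunc
// and UndoItFunc; the real types also receive an ActionContext.
type doFunc[R any] func(ctx context.Context) (R, error)
type undoFunc func(ctx context.Context) error

// funcAction pairs a forward function with its compensating undo function.
type funcAction[R any] struct {
	name string
	do   doFunc[R]
	undo undoFunc
}

func newFuncAction[R any](name string, do doFunc[R], undo undoFunc) *funcAction[R] {
	return &funcAction[R]{name: name, do: do, undo: undo}
}

// noOpUndo is used for actions that have nothing to compensate.
func noOpUndo(context.Context) error { return nil }

func newFuncActionWithNoOpUndo[R any](name string, do doFunc[R]) *funcAction[R] {
	return newFuncAction[R](name, do, noOpUndo)
}

// demo runs an allocate/release pair and a read-only action with a no-op undo.
func demo() (allocated int, released bool, err error) {
	ctx := context.Background()
	alloc := newFuncAction[int]("allocate",
		func(context.Context) (int, error) { return 7, nil },
		func(context.Context) error { released = true; return nil },
	)
	read := newFuncActionWithNoOpUndo[string]("read",
		func(context.Context) (string, error) { return "ok", nil },
	)
	if allocated, err = alloc.do(ctx); err != nil {
		return 0, false, err
	}
	if _, err = read.do(ctx); err != nil {
		return 0, false, err
	}
	// Undo in reverse order: the read has nothing to compensate.
	if err = read.undo(ctx); err != nil {
		return 0, false, err
	}
	err = alloc.undo(ctx)
	return allocated, released, err
}

func main() {
	fmt.Println(demo())
}
```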
type ActionFuncResult ¶
type ActionFuncResult[T ActionData] struct { Output T }
ActionFuncResult represents the result of a function that implements a saga action.
type ActionInjectError ¶
type ActionInjectError struct{}
ActionInjectError is an Action implementation that simulates an error.
func (*ActionInjectError) DoIt ¶
func (aie *ActionInjectError) DoIt(ctx context.Context, _ EmptyActionContext) (ActionResult[EmptyOutput], error)
DoIt implements the Action interface for ActionInjectError.
func (*ActionInjectError) Name ¶
func (aie *ActionInjectError) Name() ActionName
Name implements the Action interface for ActionInjectError.
func (*ActionInjectError) UndoIt ¶
func (aie *ActionInjectError) UndoIt(ctx context.Context, _ EmptyActionContext) error
UndoIt implements the Action interface for ActionInjectError.
type ActionNodeInternal ¶
type ActionNodeInternal struct {
Name NodeName
LabelValue string
ActionName ActionName
}
ActionNodeInternal represents an action node in the DAG.
func (*ActionNodeInternal) Label ¶
func (n *ActionNodeInternal) Label() string
func (*ActionNodeInternal) NodeName ¶
func (n *ActionNodeInternal) NodeName() *NodeName
type ActionNodeKind ¶
type ActionRegistry ¶
ActionRegistry is a registry of saga actions that can be used across multiple sagas.
Actions are identified by their ActionName. Actions can exist at multiple nodes in each saga DAG. Since saga construction is dynamic and based upon user input, we need to allow a way to insert actions at runtime into the DAG. While this could be achieved by referencing the action during saga construction, this is not possible when reloading a saga from persistent storage. In this case, the concrete type of the Action is erased and the only mechanism we have to recover it is an `ActionName`. We therefore have all users register their actions for use across sagas so we can dynamically construct and restore sagas.
func NewActionRegistry ¶
func NewActionRegistry[T any, S SagaType[T]]() *ActionRegistry[T, S]
NewActionRegistry creates a new ActionRegistry.
func (*ActionRegistry[T, S]) Get ¶
func (r *ActionRegistry[T, S]) Get(name ActionName) (Action[T, S], error)
Get retrieves an action from the registry by its name.
func (*ActionRegistry[T, S]) Register ¶
func (r *ActionRegistry[T, S]) Register(action Action[T, S]) error
Register adds an action to the registry.
type ActionRegistryError ¶
type ActionRegistryError struct {
// contains filtered or unexported fields
}
ActionRegistryError represents an error returned from ActionRegistry.Get().
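The registration scheme described for ActionRegistry can be sketched as a map from action name to action. This is a minimal illustration rather than the package's implementation: action, registry, namedAction, and restore are hypothetical names, and rejecting duplicate registrations is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// action is a hypothetical, simplified stand-in for the Action interface.
type action interface {
	Name() string
}

var (
	errNotFound  = errors.New("action not found")
	errDuplicate = errors.New("action already registered")
)

// registry maps action names to concrete actions.
type registry struct {
	actions map[string]action
}

func newRegistry() *registry {
	return &registry{actions: make(map[string]action)}
}

// register adds an action. Rejecting duplicates is an assumption here.
func (r *registry) register(a action) error {
	if _, exists := r.actions[a.Name()]; exists {
		return fmt.Errorf("%w: %q", errDuplicate, a.Name())
	}
	r.actions[a.Name()] = a
	return nil
}

func (r *registry) get(name string) (action, error) {
	a, ok := r.actions[name]
	if !ok {
		return nil, fmt.Errorf("%w: %q", errNotFound, name)
	}
	return a, nil
}

// namedAction is a trivial action used only for the demonstration.
type namedAction string

func (n namedAction) Name() string { return string(n) }

// restore rebuilds the actions of a persisted saga from the stored names,
// which are all that survives once the concrete types have been erased.
func restore(r *registry, names []string) ([]action, error) {
	out := make([]action, 0, len(names))
	for _, name := range names {
		a, err := r.get(name)
		if err != nil {
			return nil, err
		}
		out = append(out, a)
	}
	return out, nil
}

func main() {
	r := newRegistry()
	_ = r.register(namedAction("create_vm"))
	actions, err := restore(r, []string{"create_vm"})
	fmt.Println(len(actions), err)
	_, err = restore(r, []string{"unknown"})
	fmt.Println(err)
}
```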
type ActionResult ¶
type ActionResult[T any] struct { Output T }
ActionResult represents the result of a saga action.
type ActionState ¶
type ActionState int
ActionState represents the execution state of an action
const (
ActionStatePending ActionState = iota
ActionStateRunning
ActionStateCompleted
ActionStateFailed
ActionStateUndoing
ActionStateUndone
)
func (ActionState) String ¶
func (s ActionState) String() string
type BTreeMap ¶
type BTreeMap[K comparable, V any] map[K]V
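BTreeMap is intended to be a sorted map (similar to Rust's BTreeMap). It is currently defined as a plain Go map, which does not keep its keys in sorted order, and needs to be replaced with an actual sorted implementation later.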
type CompletedAction ¶
type CompletedAction struct {
Name string `json:"name"`
Output json.RawMessage `json:"output,omitempty"`
}
CompletedAction records an action that has been successfully executed, along with its output for use by dependent actions during rollback.
type ConstantNodeInternal ¶
type ConstantNodeInternal struct {
Name NodeName
Value json.RawMessage
}
ConstantNodeInternal represents a constant node in the DAG.
func (*ConstantNodeInternal) Label ¶
func (n *ConstantNodeInternal) Label() string
func (*ConstantNodeInternal) NodeName ¶
func (n *ConstantNodeInternal) NodeName() *NodeName
type ConstantNodeKind ¶
type ConstantNodeKind struct {
NodeName NodeName
Label string
Constant *ActionData
}
type Dag ¶
Dag represents a directed acyclic graph (DAG).
func (*Dag) AddNode ¶
func (d *Dag) AddNode(node InternalNode) NodeIndex
AddNode adds an internal node to the DAG.
type DagBuilder ¶
DagBuilder builds a DAG that can be executed as a saga or subsaga.
func NewDagBuilder ¶
func NewDagBuilder[T any, S SagaType[T]](sagaName SagaName, registry *ActionRegistry[T, S]) *DagBuilder[T, S]
NewDagBuilder creates a new DagBuilder.
func (*DagBuilder[T, S]) Append ¶
func (b *DagBuilder[T, S]) Append(node Node) error
Append adds a single node sequentially to the DAG.
func (*DagBuilder[T, S]) AppendParallel ¶
func (b *DagBuilder[T, S]) AppendParallel(nodes ...Node) error
AppendParallel adds multiple nodes that can run concurrently (variadic API).
func (*DagBuilder[T, S]) Build ¶
func (b *DagBuilder[T, S]) Build() (*Dag, error)
Build finalizes the DAG construction and returns the DAG.
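One way to picture the difference between Append and AppendParallel is as stages of a graph, where every node in a new stage depends on all nodes of the previous stage. The sketch below assumes that staging model, which this page does not confirm. The names builder, appendNode, appendParallel, and depsOf are hypothetical, and nodes are reduced to their names.

```go
package main

import (
	"fmt"
	"strings"
)

// builder is a hypothetical, simplified stand-in for DagBuilder that tracks
// only node names and their direct dependencies.
type builder struct {
	deps map[string][]string // node name -> names of the nodes it depends on
	last []string            // nodes added by the most recent append call
}

func newBuilder() *builder {
	return &builder{deps: make(map[string][]string)}
}

// appendParallel adds nodes that may run concurrently. Every new node depends
// on all nodes of the previous stage, and none of them depend on each other.
func (b *builder) appendParallel(names ...string) error {
	if len(names) == 0 {
		return fmt.Errorf("no nodes given")
	}
	// Validate first so that a failed call leaves the builder unchanged.
	seen := make(map[string]bool, len(names))
	for _, name := range names {
		if _, exists := b.deps[name]; exists || seen[name] {
			return fmt.Errorf("duplicate node name %q", name)
		}
		seen[name] = true
	}
	for _, name := range names {
		b.deps[name] = append([]string(nil), b.last...)
	}
	b.last = append([]string(nil), names...)
	return nil
}

// appendNode adds a single node: a stage that contains exactly one node.
func (b *builder) appendNode(name string) error {
	return b.appendParallel(name)
}

// depsOf returns the direct dependencies of a node as a comma-separated list.
func (b *builder) depsOf(name string) string {
	return strings.Join(b.deps[name], ",")
}

func main() {
	b := newBuilder()
	_ = b.appendNode("a")
	_ = b.appendParallel("b", "c")
	_ = b.appendNode("d")
	fmt.Println(b.depsOf("b"), b.depsOf("c"), b.depsOf("d"))
}
```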
type DoItFunc ¶
type DoItFunc[T any, S SagaType[T], R ActionData] func(ctx context.Context, sgctx ActionContext[T, S]) (ActionFuncResult[R], error)
type EmptyActionContext ¶
type EmptyActionContext ActionContext[any, SagaType[any]]
type EmptyOutput ¶
type EmptyOutput struct{}
EmptyOutput is an empty struct type used for ActionInjectError.
type ExecutionNode ¶
type ExecutionNode struct {
NodeIndex int64
NodeName NodeName
State ActionState
Output ActionData
Error error
}
ExecutionNode represents a node in the execution context
type ExecutionRecord ¶
type ExecutionRecord struct {
ActionName string
NodeID int64
StartTime time.Time
EndTime time.Time
Status ActionState
Error error
}
ExecutionRecord tracks the execution of a single action
type FileStore ¶
type FileStore[T any] struct {
// contains filtered or unexported fields
}
FileStore provides a file-based implementation of Store that persists saga state as JSON files on disk.
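Persisting saga state as JSON files can be sketched as below. This is an illustration under stated assumptions, not FileStore's actual code: the one-file-per-saga naming (sagaID + ".json"), the write-to-temporary-file-then-rename step, and the names state, saveState, loadState, and demo are all hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// state is a hypothetical, reduced version of the State type.
type state struct {
	SagaID string `json:"saga_id"`
	Status string `json:"status"`
}

// statePath assumes one JSON file per saga. A real store should validate
// sagaID so that it cannot escape the directory.
func statePath(dir, sagaID string) string {
	return filepath.Join(dir, sagaID+".json")
}

// saveState writes to a temporary file and renames it into place, so that a
// reader does not observe a partially written file.
func saveState(dir string, s state) error {
	data, err := json.MarshalIndent(s, "", "  ")
	if err != nil {
		return fmt.Errorf("serialize saga %s: %w", s.SagaID, err)
	}
	tmp := statePath(dir, s.SagaID) + ".tmp"
	if err := os.WriteFile(tmp, data, 0o600); err != nil {
		return err
	}
	return os.Rename(tmp, statePath(dir, s.SagaID))
}

func loadState(dir, sagaID string) (*state, error) {
	data, err := os.ReadFile(statePath(dir, sagaID))
	if err != nil {
		return nil, err
	}
	var s state
	if err := json.Unmarshal(data, &s); err != nil {
		return nil, fmt.Errorf("deserialize saga %s: %w", sagaID, err)
	}
	return &s, nil
}

// demo saves a state into a temporary directory and reads it back.
func demo() (string, error) {
	dir, err := os.MkdirTemp("", "saga-store")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)
	if err := saveState(dir, state{SagaID: "saga-1", Status: "running"}); err != nil {
		return "", err
	}
	loaded, err := loadState(dir, "saga-1")
	if err != nil {
		return "", err
	}
	return loaded.Status, nil
}

func main() {
	fmt.Println(demo())
}
```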
type InternalNode ¶
InternalNode represents an internal node in the saga DAG.
type MemoryStore ¶
type MemoryStore[T any] struct {
// contains filtered or unexported fields
}
MemoryStore provides an in-memory implementation of Store for testing or scenarios where persistence is not required.
func (*MemoryStore[T]) Delete ¶
func (m *MemoryStore[T]) Delete(ctx context.Context, sagaID string) error
Delete removes the saga state from memory.
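An in-memory store of this shape can be sketched with a map guarded by a mutex. The sketch below is illustrative only: state, memStore, and background are hypothetical names, the state type is reduced to three fields, and treating Delete of an unknown ID as a no-op is an assumption.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// state is a hypothetical, reduced version of the generic State type.
type state[T any] struct {
	SagaID  string
	Status  string
	Context T
}

// memStore keeps saga states in a map guarded by a read/write mutex.
type memStore[T any] struct {
	mu     sync.RWMutex
	states map[string]state[T]
}

func newMemStore[T any]() *memStore[T] {
	return &memStore[T]{states: make(map[string]state[T])}
}

func (m *memStore[T]) Save(_ context.Context, sagaID string, s state[T]) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.states[sagaID] = s
	return nil
}

// Load returns a pointer to a copy, so callers cannot change the stored
// struct through it (the copy is shallow).
func (m *memStore[T]) Load(_ context.Context, sagaID string) (*state[T], error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	s, ok := m.states[sagaID]
	if !ok {
		return nil, fmt.Errorf("saga %q not found", sagaID)
	}
	return &s, nil
}

// Delete treats an unknown ID as a no-op, which is an assumption here.
func (m *memStore[T]) Delete(_ context.Context, sagaID string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.states, sagaID)
	return nil
}

// background is a shared context for the demonstration.
var background = context.Background()

func main() {
	store := newMemStore[string]()
	_ = store.Save(background, "saga-1", state[string]{SagaID: "saga-1", Status: "running"})
	got, err := store.Load(background, "saga-1")
	fmt.Println(got.Status, err)
	_ = store.Delete(background, "saga-1")
	_, err = store.Load(background, "saga-1")
	fmt.Println(err)
}
```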
type Node ¶
type Node interface {
// contains filtered or unexported methods
}
Node represents a node in the saga DAG.
There are three kinds of nodes you can add to a graph:
- an _action_ (see NewActionNode), which executes a particular Action with an associated undo action
- a _constant_ (see NewConstantNode), which is like an action that outputs a value that's known when the DAG is constructed
- a _subsaga_ (see NewSubsagaNode), which executes another DAG in the context of this saga
Each of these node types has a `NodeName` and produces an output. Other nodes that depend on this node (directly or indirectly) can access the output by looking it up by the node name using `ActionContext.Lookup`:
- The output of an action node is emitted by the action itself.
- The output of a constant node is the value provided when the node was created (see NewConstantNode).
- The output of a subsaga node is the output of the subsaga itself. Note that the output of individual nodes from the subsaga DAG is _not_ available to other nodes in this DAG. Only the final output is available.
type NodeEntry ¶
type NodeEntry struct {
Internal InternalNode
Index NodeIndex
}
NodeEntry represents a node in the SagaDag.
type NodeKind ¶
type NodeKind struct {
// Action holds the action details for an Action node.
Action *struct {
Label string
ActionName ActionName
}
// Constant holds the constant value for a Constant node.
Constant *ActionData
// Subsaga holds the subsaga details for a Subsaga node.
Subsaga *struct {
ParamsNodeName NodeName
Dag *Dag
}
}
NodeKind represents the type of a Node.
type SagaDag ¶
type SagaDag struct {
Graph *dag.Graph
SagaName SagaName
StartNode int64
EndNode int64
Nodes map[int64]InternalNode // Access nodes by gonumNode ID
}
SagaDag represents a saga DAG that is built on top of a regular Dag.
func NewSagaDag ¶
func NewSagaDag(dag *Dag, params json.RawMessage) *SagaDag
NewSagaDag creates a new SagaDag by wrapping the DAG with Start and End nodes.
func (*SagaDag) GetNode ¶
func (s *SagaDag) GetNode(nodeID int64) (InternalNode, error)
GetNode returns a node given its index.
func (*SagaDag) GetNodeIndex ¶
GetNodeIndex returns the index for a given node name.
func (*SagaDag) GetNodes ¶
func (s *SagaDag) GetNodes() *SagaDagIterator
GetNodes returns an iterator over all named nodes in the saga DAG.
func (*SagaDag) GetSagaName ¶
GetSagaName returns the saga name.
type SagaDagIterator ¶
type SagaDagIterator struct {
// contains filtered or unexported fields
}
SagaDagIterator iterates over the named nodes in a SagaDag.
func NewSagaDagIterator ¶
func NewSagaDagIterator(dag *SagaDag) *SagaDagIterator
NewSagaDagIterator creates a new iterator for a SagaDag.
func (*SagaDagIterator) Next ¶
func (it *SagaDagIterator) Next() (*NodeEntry, bool)
Next advances the iterator to the next named node in the SagaDag.
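The Next() (*NodeEntry, bool) shape is a pull-style iterator. The sketch below illustrates the pattern with hypothetical types (node, entry, iterator). It assumes that structural nodes such as the start and end nodes carry no name and are skipped, which is an interpretation of "named nodes" rather than something this page states.

```go
package main

import "fmt"

// node is a hypothetical DAG node; an empty name marks a structural node
// (for example a start or end node) that the iterator skips.
type node struct {
	name string
}

// entry pairs a named node with its index, loosely mirroring NodeEntry.
type entry struct {
	index int
	name  string
}

type iterator struct {
	nodes []node
	pos   int
}

// Next returns the next named node, or (nil, false) once all are consumed.
func (it *iterator) Next() (*entry, bool) {
	for it.pos < len(it.nodes) {
		i := it.pos
		it.pos++
		if it.nodes[i].name != "" {
			return &entry{index: i, name: it.nodes[i].name}, true
		}
	}
	return nil, false
}

// sampleNodes is a small graph: start, two named nodes, end.
func sampleNodes() []node {
	return []node{{""}, {"create_vm"}, {"attach_disk"}, {""}}
}

// collect drains an iterator into a slice of entries.
func collect(nodes []node) []entry {
	it := &iterator{nodes: nodes}
	var out []entry
	for e, ok := it.Next(); ok; e, ok = it.Next() {
		out = append(out, *e)
	}
	return out
}

func main() {
	for _, e := range collect(sampleNodes()) {
		fmt.Println(e.index, e.name)
	}
}
```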
type SagaExecutor ¶
SagaExecutor handles the sequential execution of saga actions
func NewExecutorFromState ¶
func NewExecutorFromState[T any, S SagaType[T]](
dag *SagaDag,
registry *ActionRegistry[T, S],
sagaContext S,
state *State[T],
store Store[T],
) *SagaExecutor[T, S]
NewExecutorFromState creates an executor from a previously saved State, so that the saga can be rolled back.
func NewSagaExecutor ¶
func NewSagaExecutor[T any, S SagaType[T]](
dag *SagaDag,
actionRegistry *ActionRegistry[T, S],
sagaContext S,
sagaID string,
store Store[T],
) *SagaExecutor[T, S]
NewSagaExecutor creates a new saga executor. A Store is required so that saga state can be persisted.
func (*SagaExecutor[T, S]) Execute ¶
func (e *SagaExecutor[T, S]) Execute(ctx context.Context) error
Execute runs the saga sequentially
func (*SagaExecutor[T, S]) GetCompletedNodes ¶
func (e *SagaExecutor[T, S]) GetCompletedNodes() []int64
GetCompletedNodes returns the list of completed node indices
func (*SagaExecutor[T, S]) GetExecutionOrder ¶
func (e *SagaExecutor[T, S]) GetExecutionOrder() []string
GetExecutionOrder returns only the action names, in execution order. It is mainly a convenience for tests.
func (*SagaExecutor[T, S]) GetExecutionState ¶
func (e *SagaExecutor[T, S]) GetExecutionState() map[int64]*ExecutionNode
GetExecutionState returns the current state of all nodes
func (*SagaExecutor[T, S]) GetExecutionTrace ¶
func (e *SagaExecutor[T, S]) GetExecutionTrace() []ExecutionRecord
GetExecutionTrace returns a copy of the execution trace, so that callers cannot modify the executor's internal record.
func (*SagaExecutor[T, S]) GetFailedNodes ¶
func (e *SagaExecutor[T, S]) GetFailedNodes() []int64
GetFailedNodes returns the list of failed node indices
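Sequential execution followed by a rollback can be sketched as running steps in order and, after a failure, undoing the completed steps in reverse order. This is a generic illustration of the saga pattern with hypothetical types (step, executor). Whether Execute triggers Rollback automatically is not stated on this page, so the two are kept as separate calls here.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// step is a hypothetical action with a forward and a compensating function.
type step struct {
	name string
	do   func() error
	undo func() error
}

type executor struct {
	steps     []step
	completed []int    // indices of steps whose do function succeeded
	trace     []string // record of everything that ran, in order
}

// execute runs the steps in order and stops at the first failure.
func (e *executor) execute() error {
	for i, s := range e.steps {
		if err := s.do(); err != nil {
			return fmt.Errorf("step %q failed: %w", s.name, err)
		}
		e.completed = append(e.completed, i)
		e.trace = append(e.trace, "do:"+s.name)
	}
	return nil
}

// rollback undoes the completed steps, most recent first.
func (e *executor) rollback() error {
	for len(e.completed) > 0 {
		last := len(e.completed) - 1
		s := e.steps[e.completed[last]]
		if err := s.undo(); err != nil {
			return fmt.Errorf("undo of %q failed: %w", s.name, err)
		}
		e.completed = e.completed[:last]
		e.trace = append(e.trace, "undo:"+s.name)
	}
	return nil
}

func (e *executor) traceString() string {
	return strings.Join(e.trace, " ")
}

// newDemoExecutor builds three steps, the last of which always fails.
func newDemoExecutor() *executor {
	ok := func() error { return nil }
	fail := func() error { return errors.New("quota exceeded") }
	return &executor{steps: []step{
		{name: "reserve", do: ok, undo: ok},
		{name: "charge", do: ok, undo: ok},
		{name: "ship", do: fail, undo: ok},
	}}
}

func main() {
	e := newDemoExecutor()
	if err := e.execute(); err != nil {
		fmt.Println("execute:", err)
		if err := e.rollback(); err != nil {
			fmt.Println("rollback:", err)
		}
	}
	fmt.Println(e.traceString())
}
```

The failed step itself is not undone in this sketch, on the assumption that a failed action leaves nothing to compensate.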
type SagaLog ¶
SagaLog represents the write log for a saga.
func NewEmptySagaLog ¶
NewEmptySagaLog creates a new, empty SagaLog.
func NewSagaLogRecover ¶
func NewSagaLogRecover(sagaID SagaID, events []*SagaNodeEvent) (*SagaLog, error)
NewSagaLogRecover creates a new SagaLog from a list of events (for recovery).
func (*SagaLog) Events ¶
func (l *SagaLog) Events() []*SagaNodeEvent
Events returns the slice of events in the SagaLog.
func (*SagaLog) Record ¶
func (l *SagaLog) Record(event *SagaNodeEvent) error
Record adds an event to the SagaLog.
type SagaLogPretty ¶
type SagaLogPretty struct {
Log *SagaLog
}
SagaLogPretty is a helper for pretty-printing a SagaLog.
func (*SagaLogPretty) String ¶
func (p *SagaLogPretty) String() string
String implements the fmt.Stringer interface for SagaLogPretty.
type SagaName ¶
type SagaName string
SagaName represents a human-readable name for a particular saga.
type SagaNodeEvent ¶
type SagaNodeEvent struct {
SagaID SagaID
NodeID SagaNodeID
EventType SagaNodeEventType
}
SagaNodeEvent represents an entry in the saga log.
func (*SagaNodeEvent) String ¶
func (e *SagaNodeEvent) String() string
String implements the fmt.Stringer interface for SagaNodeEvent.
type SagaNodeEventType ¶
type SagaNodeEventType int
SagaNodeEventType defines the types of events that can occur for a saga node.
const (
EventStarted SagaNodeEventType = iota
EventSucceeded
EventFailed
EventUndoStarted
EventUndoFinished
EventUndoFailed
)
func (SagaNodeEventType) String ¶
func (s SagaNodeEventType) String() string
String returns the string representation of the SagaNodeEventType.
type SagaNodeLoadStatus ¶
type SagaNodeLoadStatus int
SagaNodeLoadStatus represents the persistent status for a saga node.
const (
LoadNeverStarted SagaNodeLoadStatus = iota
LoadStarted
LoadSucceeded
LoadFailed
LoadUndoStarted
LoadUndoFinished
LoadUndoFailed
)
func (SagaNodeLoadStatus) MarshalJSON ¶
func (s SagaNodeLoadStatus) MarshalJSON() ([]byte, error)
MarshalJSON implements the json.Marshaler interface for SagaNodeLoadStatus.
func (SagaNodeLoadStatus) String ¶
func (s SagaNodeLoadStatus) String() string
String returns the string representation of the SagaNodeLoadStatus.
func (*SagaNodeLoadStatus) UnmarshalJSON ¶
func (s *SagaNodeLoadStatus) UnmarshalJSON(data []byte) error
UnmarshalJSON implements the json.Unmarshaler interface for SagaNodeLoadStatus.
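A common way to give an iota-based status type custom JSON handling is to encode it as its string name. The sketch below shows that approach with a hypothetical loadStatus type. The string names are invented for illustration, and the actual wire format of SagaNodeLoadStatus is not documented on this page.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// loadStatus is a hypothetical, shortened analogue of SagaNodeLoadStatus.
type loadStatus int

const (
	neverStarted loadStatus = iota
	started
	succeeded
	failed
)

// statusNames is indexed by loadStatus; the names are assumptions.
var statusNames = []string{"never_started", "started", "succeeded", "failed"}

func (s loadStatus) valid() bool {
	return s >= 0 && int(s) < len(statusNames)
}

func (s loadStatus) String() string {
	if !s.valid() {
		return fmt.Sprintf("loadStatus(%d)", int(s))
	}
	return statusNames[s]
}

// MarshalJSON encodes the status as its name rather than its integer value.
func (s loadStatus) MarshalJSON() ([]byte, error) {
	if !s.valid() {
		return nil, fmt.Errorf("unknown status %d", int(s))
	}
	return json.Marshal(s.String())
}

// UnmarshalJSON accepts only the known names.
func (s *loadStatus) UnmarshalJSON(data []byte) error {
	var name string
	if err := json.Unmarshal(data, &name); err != nil {
		return err
	}
	for i, n := range statusNames {
		if n == name {
			*s = loadStatus(i)
			return nil
		}
	}
	return fmt.Errorf("unknown status %q", name)
}

// roundTrip encodes a status to JSON and decodes it again.
func roundTrip(s loadStatus) (string, loadStatus, error) {
	data, err := json.Marshal(s)
	if err != nil {
		return "", 0, err
	}
	var out loadStatus
	if err := json.Unmarshal(data, &out); err != nil {
		return "", 0, err
	}
	return string(data), out, nil
}

func main() {
	fmt.Println(roundTrip(succeeded))
}
```

Encoding by name keeps persisted data readable and stable if constants are later reordered, at the cost of a lookup on decode.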
type SagaParams ¶
type SagaParams any
type SagaType ¶
type SagaType[T any] interface { ExecContext() T }
SagaType defines the type signature for a saga.
type StartNode ¶
type StartNode struct {
Params json.RawMessage
}
StartNode represents the start of the DAG.
type State ¶
type State[T any] struct {
SagaID string `json:"saga_id"`
SagaName string `json:"saga_name"`
Status string `json:"status"` // "running", "completed", "failed", "rolled_back"
Context T `json:"context"`
CompletedActions []CompletedAction `json:"completed_actions"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
State contains the minimal information needed to resume or roll back a saga. It is generic over T, the saga context type.
type Store ¶
type Store[T any] interface {
// Save persists the current saga state
Save(ctx context.Context, sagaID string, state State[T]) error
// Load retrieves a saga state by ID
Load(ctx context.Context, sagaID string) (*State[T], error)
// Delete removes a saga state after successful rollback
Delete(ctx context.Context, sagaID string) error
}
Store defines the interface for persisting saga state. It is generic over T, the saga context type, to provide type safety without reflection for the main saga state.
func NewFileStore ¶
NewFileStore creates a new file-based store that saves saga state to the specified directory.
func NewMemoryStore ¶
NewMemoryStore creates a new in-memory store.
type SubsagaEndNode ¶
type SubsagaEndNode struct {
Name NodeName
}
SubsagaEndNode represents the end of a subsaga.
func (*SubsagaEndNode) Label ¶
func (n *SubsagaEndNode) Label() string
func (*SubsagaEndNode) NodeName ¶
func (n *SubsagaEndNode) NodeName() *NodeName
type SubsagaNodeKind ¶
type SubsagaStartNode ¶
SubsagaStartNode represents the start of a subsaga.
func (*SubsagaStartNode) Label ¶
func (n *SubsagaStartNode) Label() string
func (*SubsagaStartNode) NodeName ¶
func (n *SubsagaStartNode) NodeName() *NodeName
type UndoActionError ¶
type UndoActionError struct {
// contains filtered or unexported fields
}
UndoActionError represents an error produced by a failed undo action.
type UndoItFunc ¶
type UndoResult ¶
type UndoResult struct {
Err error
}
UndoResult represents the result of a saga undo action.