compensate

package module
v0.0.0-...-118695c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 1, 2025 License: Apache-2.0 Imports: 20 Imported by: 0

README

Compensate

A Go library implementing the Distributed Saga pattern for managing complex, multi-step operations with automatic rollback capabilities. Inspired by oxidecomputer/steno, which was in turn inspired by Caitie McCaffrey's saga pattern presentation.

Features

  • Type-safe generics: Full Go 1.18+ generics support for compile-time safety
  • DAG-based workflow: Actions organized in a Directed Acyclic Graph for optimal execution
  • Automatic rollback: Failed actions trigger automatic compensation of completed steps
  • Parallel execution: Independent actions can run concurrently
  • State persistence: Save and restore saga state with pluggable storage backends
  • Action results: Actions can produce typed outputs accessible by dependent actions
  • Visualization: Export saga DAGs as Graphviz DOT files

Installation

go get github.com/fortressi/compensate

Quick Start

Here's a simple "Hello World" saga that demonstrates the basic concepts:

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/fortressi/compensate"
)

// Define your state type
type HelloState struct {
    Message string
}

// Define your saga type that implements SagaType interface
type HelloSaga struct {
    State *HelloState
}

func (s *HelloSaga) ExecContext() *HelloState {
    return s.State
}

// Define a simple result type
type MessageResult struct {
    Output string
}

func main() {
    // Create an action with do and undo functions
    helloAction := compensate.NewActionFunc[*HelloState, *HelloSaga, *MessageResult](
        "hello_action",
        // Do function
        func(ctx context.Context, sagaCtx *HelloSaga) (compensate.ActionFuncResult[*MessageResult], error) {
            fmt.Println("Executing: Hello, World!")
            sagaCtx.State.Message = "Hello executed"
            
            return compensate.ActionFuncResult[*MessageResult]{
                Result: &MessageResult{Output: "Hello, World!"},
            }, nil
        },
        // Undo function
        func(ctx context.Context, sagaCtx *HelloSaga) error {
            fmt.Println("Rolling back: Goodbye, World!")
            sagaCtx.State.Message = "Hello rolled back"
            return nil
        },
    )

    // Create registry and builder
    registry := compensate.NewActionRegistry[*HelloState, *HelloSaga]()
    registry.RegisterAction(helloAction)
    
    builder := compensate.NewDagBuilder[*HelloState, *HelloSaga]("HelloSaga", registry)
    
    // Add the action to the saga
    builder.Append(&compensate.ActionNodeKind[*HelloState, *HelloSaga]{
        NodeName: "say_hello",
        Label:    "Say Hello",
        Action:   helloAction,
    })
    
    // Build the DAG
    dag, err := builder.Build()
    if err != nil {
        log.Fatal(err)
    }
    
    // Create saga context and executor
    sagaContext := &HelloSaga{State: &HelloState{}}
    sagaID := "hello-saga-001"
    store := compensate.NewMemoryStore[*HelloState]()
    
    sagaDag := compensate.NewSagaDag(dag, []byte(`{}`))
    executor := compensate.NewSagaExecutor(sagaDag, registry, sagaContext, sagaID, store)
    
    // Execute the saga
    ctx := context.Background()
    if err := executor.Execute(ctx); err != nil {
        log.Printf("Saga failed: %v", err)
        // Rollback will happen automatically
    }
    
    fmt.Printf("Final state: %s\n", sagaContext.State.Message)
}

Core Concepts

Actions

Actions are the building blocks of sagas. Each action has:

  • A Do function that performs the operation
  • An Undo function that compensates/reverses the operation
  • Optional typed result that can be passed to dependent actions
Saga DAG

Actions are organized in a Directed Acyclic Graph (DAG) where:

  • Nodes represent actions
  • Edges represent dependencies
  • Actions with no dependencies can execute in parallel
State Management

Sagas maintain two types of state:

  • Saga State: Shared context accessible to all actions
  • Action State: Individual results from each action execution

Examples

The repository includes several comprehensive examples:

1. Manual Rollback Example

Located in examples/manual_rollback/, demonstrates:

  • Basic saga construction
  • Manual rollback triggering
  • Simple resource creation/deletion pattern
2. AWS Infrastructure Example

Located in examples/aws-infrastructure/, demonstrates:

  • Complex multi-step AWS VPC provisioning
  • CLI interface with deploy/destroy commands
  • State persistence between runs
  • Parallel action execution
# Deploy infrastructure
cd examples/aws-infrastructure
go run . deploy --saga-id my-vpc-123

# Destroy using persisted state
go run . destroy --saga-id my-vpc-123
3. Persistent CLI Example

Located in examples/persistent_cli/, demonstrates:

  • File-based saga state persistence
  • Recovery from interruptions
  • Resource provisioning with cleanup

Advanced Usage

Building Complex Sagas
// Sequential actions
builder.Append(action1).Append(action2).Append(action3)

// Parallel actions
builder.AppendParallel(
    &compensate.ActionNodeKind[*State, *Saga]{
        NodeName: "parallel1",
        Action:   parallelAction1,
    },
    &compensate.ActionNodeKind[*State, *Saga]{
        NodeName: "parallel2", 
        Action:   parallelAction2,
    },
)

// Add dependencies manually
builder.AddNode("node1", "Node 1", action1)
builder.AddNode("node2", "Node 2", action2)
builder.AddDependency("node1", "node2") // node2 depends on node1
Accessing Action Results
// In a dependent action's Do function
func doDependent(ctx context.Context, sagaCtx *MySaga) (ActionFuncResult[*Result], error) {
    // Get result from a previous action
    prevResult, ok := compensate.GetActionResult[*PreviousResult](sagaCtx, "previous_action_name")
    if !ok {
        return ActionFuncResult[*Result]{}, fmt.Errorf("previous result not found")
    }
    
    // Use the previous result
    fmt.Printf("Previous output: %s\n", prevResult.Output)
    
    return ActionFuncResult[*Result]{
        Result: &Result{Data: processData(prevResult)},
    }, nil
}
Persistence
// File-based persistence
store := compensate.NewFileStore[*MyState]("/path/to/saga/states")

// Memory persistence (for testing)
store := compensate.NewMemoryStore[*MyState]()

// Create executor with persistence
executor := compensate.NewSagaExecutor(sagaDag, registry, sagaContext, sagaID, store)
Visualization

Export your saga DAG as a Graphviz DOT file:

dag, _ := builder.Build()
dot := dag.ToDot()
// Write to file and visualize with graphviz

Architecture

The library follows a modular architecture:

  • Core Types: Generic interfaces for actions, sagas, and state management
  • DAG Builder: Fluent API for constructing saga workflows
  • Executor: Manages saga execution with automatic rollback on failure
  • Registry: Type-safe action registration and retrieval
  • Storage: Pluggable persistence layer with file and memory implementations
  • Event Log: Comprehensive execution history tracking

Current Status

Implemented:

  • Sequential saga execution
  • Automatic rollback on failure
  • State persistence and recovery
  • Type-safe action composition
  • Comprehensive examples

🚧 In Progress:

  • Concurrent action execution
  • Additional storage backends
  • Production monitoring tools

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Acknowledgments

  • Inspired by Caitie McCaffrey's work on distributed sagas
  • Built with Go's powerful generics system for type safety

Documentation

Index

Constants

View Source
const (
	SagaStatusRunning     = "running"
	SagaStatusCompleted   = "completed"
	SagaStatusFailed      = "failed"
	SagaStatusRollingBack = "rolling_back"
	SagaStatusRolledBack  = "rolled_back"
)

Saga status constants

Variables

This section is empty.

Functions

func ActionFailed

func ActionFailed(err error) error

ActionFailed wraps a user-provided error in an ActionError.

func Convert

func Convert[T ActionData](err error) (T, error)

Convert tries to convert the error to a specific consumer error type.

func DeserializeFailed

func DeserializeFailed(message string) error

DeserializeFailed indicates a failure to deserialize saga data.

func InjectedError

func InjectedError() error

InjectedError indicates an error was injected (for testing).

func LookupTyped

func LookupTyped[R any, T any, S SagaType[T]](ac ActionContext[T, S], nodeName NodeName) (R, bool)

LookupTyped retrieves the output from a previous node with type assertion. Returns the typed output and true if found and type matches, or zero value and false otherwise. If the value is stored as json.RawMessage (from persistence), it will be unmarshaled.

func NewDeserializeError

func NewDeserializeError(err error) error

NewDeserializeError creates a new DeserializeFailed error.

func NewSerializeError

func NewSerializeError(err error) error

NewSerializeError creates a new SerializeFailed error.

func NewSubsagaError

func NewSubsagaError(err error) error

NewSubsagaError creates a new SubsagaCreateFailed error.

func NoOpUndo

func NoOpUndo[T any, S SagaType[T]](_ context.Context, _ ActionContext[T, S]) error

func NodeIndexSliceToInt64Slice

func NodeIndexSliceToInt64Slice(in []NodeIndex) []int64

func NotFoundError

func NotFoundError() error

NotFoundError indicates that an action with the given name was not found.

func PermanentFailure

func PermanentFailure(err error) error

PermanentFailure indicates a permanent failure of an undo action.

func SerializeFailed

func SerializeFailed(message string) error

SerializeFailed indicates a failure to serialize saga data.

func SubsagaCreateFailed

func SubsagaCreateFailed(message string) error

SubsagaCreateFailed indicates a failure to create a subsaga.

Types

type Action

type Action[T any, S SagaType[T]] interface {
	DoIt(ctx context.Context, sgctx ActionContext[T, S]) (ActionResult[ActionData], error)
	UndoIt(ctx context.Context, sgctx ActionContext[T, S]) error
	Name() ActionName
}

Action represents the building blocks of sagas.

type ActionConstant

type ActionConstant[T any] struct {
	// contains filtered or unexported fields
}

ActionConstant is an Action implementation that emits a predefined value.

func NewActionConstant

func NewActionConstant[T any](value T) *ActionConstant[T]

NewActionConstant creates a new ActionConstant.

func (*ActionConstant[T]) DoIt

DoIt implements the Action interface for ActionConstant.

func (*ActionConstant[T]) Name

func (ac *ActionConstant[T]) Name() ActionName

Name implements the Action interface for ActionConstant.

func (*ActionConstant[T]) UndoIt

func (ac *ActionConstant[T]) UndoIt(ctx context.Context, _ EmptyActionContext) error

UndoIt implements the Action interface for ActionConstant.

type ActionContext

type ActionContext[T any, S SagaType[T]] struct {
	AncestorTree *btree.Map[NodeName, any]
	NodeID       int
	DAG          *SagaDag
	UserContext  T
}

ActionContext provides context to individual actions.

func (*ActionContext[T, S]) Lookup

func (ac *ActionContext[T, S]) Lookup(nodeName NodeName) (any, bool)

Lookup retrieves the output from a previous node by name. Returns the output and true if found, or zero value and false if not found.

func (*ActionContext[T, S]) LookupTypedResult

func (ac *ActionContext[T, S]) LookupTypedResult(name string, result any) error

LookupTypedResult retrieves and unmarshals the output from a previous node. This method provides the ergonomic interface suggested in the plan.

type ActionData

type ActionData interface{}

ActionData represents data that can be serialized in the saga context.

type ActionError

type ActionError struct {
	// contains filtered or unexported fields
}

ActionError represents an error produced by a saga action.

type ActionFunc

type ActionFunc[T any, S SagaType[T], R ActionData] struct {
	// contains filtered or unexported fields
}

ActionFunc is an implementation of Action that uses ordinary functions.

func NewActionFunc

func NewActionFunc[T any, S SagaType[T], R ActionData](name ActionName, actionFunc DoItFunc[T, S, R], undoFunc UndoItFunc[T, S]) *ActionFunc[T, S, R]

NewActionFunc constructs a new ActionFunc from a pair of functions.

func NewActionFuncWithNoOpUndo

func NewActionFuncWithNoOpUndo[T any, S SagaType[T], R ActionData](name ActionName, actionFunc DoItFunc[T, S, R]) *ActionFunc[T, S, R]

NewActionFuncWithNoOpUndo constructs a new ActionFunc with a no-op undo function.

func (*ActionFunc[T, S, R]) DoIt

func (af *ActionFunc[T, S, R]) DoIt(ctx context.Context, sgctx ActionContext[T, S]) (ActionResult[ActionData], error)

DoIt implements the Action interface for ActionFunc.

func (*ActionFunc[T, S, R]) Name

func (af *ActionFunc[T, S, R]) Name() ActionName

Name implements the Action interface for ActionFunc.

func (*ActionFunc[T, S, R]) String

func (af *ActionFunc[T, S, R]) String() string

String implements the fmt.Stringer interface for ActionFunc.

func (*ActionFunc[T, S, R]) UndoIt

func (af *ActionFunc[T, S, R]) UndoIt(ctx context.Context, sgctx ActionContext[T, S]) error

UndoIt implements the Action interface for ActionFunc.

type ActionFuncResult

type ActionFuncResult[T ActionData] struct {
	Output T
}

ActionFuncResult represents the result of a function that implements a saga action.

type ActionInjectError

type ActionInjectError struct{}

ActionInjectError is an Action implementation that simulates an error.

func (*ActionInjectError) DoIt

DoIt implements the Action interface for ActionInjectError.

func (*ActionInjectError) Name

func (aie *ActionInjectError) Name() ActionName

Name implements the Action interface for ActionInjectError.

func (*ActionInjectError) UndoIt

UndoIt implements the Action interface for ActionInjectError.

type ActionName

type ActionName string

ActionName represents a unique name for a saga Action.

type ActionNodeInternal

type ActionNodeInternal struct {
	Name       NodeName
	LabelValue string
	ActionName ActionName
}

ActionNodeInternal represents an action node in the DAG.

func (*ActionNodeInternal) Label

func (n *ActionNodeInternal) Label() string

func (*ActionNodeInternal) NodeName

func (n *ActionNodeInternal) NodeName() *NodeName

type ActionNodeKind

type ActionNodeKind[T any, S SagaType[T]] struct {
	NodeName NodeName
	Action   Action[T, S]
	Label    string
}

type ActionRegistry

type ActionRegistry[T any, S SagaType[T]] struct {
	// contains filtered or unexported fields
}

ActionRegistry is a registry of saga actions that can be used across multiple sagas.

Actions are identified by their ActionName. Actions can exist at multiple nodes in each saga DAG. Since saga construction is dynamic and based upon user input, we need to allow a way to insert actions at runtime into the DAG. While this could be achieved by referencing the action during saga construction, this is not possible when reloading a saga from persistent storage. In this case, the concrete type of the Action is erased and the only mechanism we have to recover it is an `ActionName`. We therefore have all users register their actions for use across sagas so we can dynamically construct and restore sagas.

func NewActionRegistry

func NewActionRegistry[T any, S SagaType[T]]() *ActionRegistry[T, S]

NewActionRegistry creates a new ActionRegistry.

func (*ActionRegistry[T, S]) Get

func (r *ActionRegistry[T, S]) Get(name ActionName) (Action[T, S], error)

Get retrieves an action from the registry by its name.

func (*ActionRegistry[T, S]) Register

func (r *ActionRegistry[T, S]) Register(action Action[T, S]) error

Register adds an action to the registry.

type ActionRegistryError

type ActionRegistryError struct {
	// contains filtered or unexported fields
}

ActionRegistryError represents an error returned from ActionRegistry.Get().

type ActionResult

type ActionResult[T any] struct {
	Output T
}

ActionResult represents the result of a saga action.

type ActionState

type ActionState int

ActionState represents the execution state of an action

const (
	ActionStatePending ActionState = iota
	ActionStateRunning
	ActionStateCompleted
	ActionStateFailed
	ActionStateUndoing
	ActionStateUndone
)

func (ActionState) String

func (s ActionState) String() string

type BTreeMap

type BTreeMap[K comparable, V any] map[K]V

BTreeMap is a sorted map (similar to Rust's BTreeMap). We'll need to replace this with an actual implementation later.

type CompletedAction

type CompletedAction struct {
	Name   string          `json:"name"`
	Output json.RawMessage `json:"output,omitempty"`
}

CompletedAction records an action that has been successfully executed, along with its output for use by dependent actions during rollback.

type ConstantNodeInternal

type ConstantNodeInternal struct {
	Name  NodeName
	Value json.RawMessage
}

ConstantNodeInternal represents a constant node in the DAG.

func (*ConstantNodeInternal) Label

func (n *ConstantNodeInternal) Label() string

func (*ConstantNodeInternal) NodeName

func (n *ConstantNodeInternal) NodeName() *NodeName

type ConstantNodeKind

type ConstantNodeKind struct {
	NodeName NodeName
	Label    string
	Constant *ActionData
}

type Dag

type Dag struct {
	*dag.Graph
	SagaName SagaName
	// contains filtered or unexported fields
}

Dag represents a directed acyclic graph (DAG).

func NewDag

func NewDag(sagaName SagaName) *Dag

NewDag creates a new empty Dag.

func (*Dag) AddEdge

func (d *Dag) AddEdge(fromID, toID NodeIndex) error

AddEdge adds a directed edge between two nodes in the DAG.

func (*Dag) AddNode

func (d *Dag) AddNode(node InternalNode) NodeIndex

AddNode adds an internal node to the DAG.

func (*Dag) GetNode

func (d *Dag) GetNode(id int64) (InternalNode, error)

GetNode retrieves an internal node by its gonumNode ID.

type DagBuilder

type DagBuilder[T any, S SagaType[T]] struct {
	// contains filtered or unexported fields
}

DagBuilder builds a DAG that can be executed as a saga or subsaga.

func NewDagBuilder

func NewDagBuilder[T any, S SagaType[T]](sagaName SagaName, registry *ActionRegistry[T, S]) *DagBuilder[T, S]

NewDagBuilder creates a new DagBuilder.

func (*DagBuilder[T, S]) Append

func (b *DagBuilder[T, S]) Append(node Node) error

Append adds a single node sequentially to the DAG.

func (*DagBuilder[T, S]) AppendParallel

func (b *DagBuilder[T, S]) AppendParallel(nodes ...Node) error

AppendParallel adds multiple nodes that can run concurrently (variadic API).

func (*DagBuilder[T, S]) Build

func (b *DagBuilder[T, S]) Build() (*Dag, error)

Build finalizes the DAG construction and returns the DAG.

type DoItFunc

type DoItFunc[T any, S SagaType[T], R ActionData] func(ctx context.Context, sgctx ActionContext[T, S]) (ActionFuncResult[R], error)

type EmptyActionContext

type EmptyActionContext ActionContext[any, SagaType[any]]

type EmptyOutput

type EmptyOutput struct{}

EmptyOutput is an empty struct type used for ActionInjectError.

type EndNode

type EndNode struct{}

EndNode represents the end of the DAG.

func (*EndNode) Label

func (n *EndNode) Label() string

func (*EndNode) NodeName

func (n *EndNode) NodeName() *NodeName

type ExecutionNode

type ExecutionNode struct {
	NodeIndex int64
	NodeName  NodeName
	State     ActionState
	Output    ActionData
	Error     error
}

ExecutionNode represents a node in the execution context

type ExecutionRecord

type ExecutionRecord struct {
	ActionName string
	NodeID     int64
	StartTime  time.Time
	EndTime    time.Time
	Status     ActionState
	Error      error
}

ExecutionRecord tracks the execution of a single action

type FileStore

type FileStore[T any] struct {
	// contains filtered or unexported fields
}

FileStore provides a file-based implementation of Store that persists saga state as JSON files on disk.

func (*FileStore[T]) Delete

func (f *FileStore[T]) Delete(ctx context.Context, sagaID string) error

Delete removes the saga state file.

func (*FileStore[T]) Load

func (f *FileStore[T]) Load(ctx context.Context, sagaID string) (*State[T], error)

Load retrieves the saga state from a JSON file.

func (*FileStore[T]) Save

func (f *FileStore[T]) Save(ctx context.Context, sagaID string, state State[T]) error

Save persists the saga state to a JSON file.

type InternalNode

type InternalNode interface {
	NodeName() *NodeName
	Label() string
}

InternalNode represents an internal node in the saga DAG.

type MemoryStore

type MemoryStore[T any] struct {
	// contains filtered or unexported fields
}

MemoryStore provides an in-memory implementation of Store for testing or scenarios where persistence is not required.

func (*MemoryStore[T]) Delete

func (m *MemoryStore[T]) Delete(ctx context.Context, sagaID string) error

Delete removes the saga state from memory.

func (*MemoryStore[T]) Load

func (m *MemoryStore[T]) Load(ctx context.Context, sagaID string) (*State[T], error)

Load retrieves the saga state from memory.

func (*MemoryStore[T]) Save

func (m *MemoryStore[T]) Save(ctx context.Context, sagaID string, state State[T]) error

Save stores the saga state in memory.

type Node

type Node interface {
	// contains filtered or unexported methods
}

Node represents a node in the saga DAG.

There are three kinds of nodes you can add to a graph:

  • an _action_ (see NewActionNode), which executes a particular Action with an associated undo action
  • a _constant_ (see NewConstantNode), which is like an action that outputs a value that's known when the DAG is constructed
  • a _subsaga_ (see NewSubsagaNode), which executes another DAG in the context of this saga

Each of these node types has a `node_name` and produces an output. Other nodes that depend on this node (directly or indirectly) can access the output by looking it up by the node name using `ActionContext.Lookup`:

  • The output of an action node is emitted by the action itself.
  • The output of a constant node is the value provided when the node was created (see NewConstantNode).
  • The output of a subsaga node is the output of the subsaga itself. Note that the output of individual nodes from the subsaga DAG is _not_ available to other nodes in this DAG. Only the final output is available.

type NodeEntry

type NodeEntry struct {
	Internal InternalNode
	Index    NodeIndex
}

NodeEntry represents a node in the SagaDag.

type NodeIndex

type NodeIndex int64

func (NodeIndex) ToInt64

func (n NodeIndex) ToInt64() int64

type NodeKind

type NodeKind struct {
	// Action holds the action details for an Action node.
	Action *struct {
		Label      string
		ActionName ActionName
	}
	// Constant holds the constant value for a Constant node.
	Constant *ActionData
	// Subsaga holds the subsaga details for a Subsaga node.
	Subsaga *struct {
		ParamsNodeName NodeName
		Dag            *Dag
	}
}

NodeKind represents the type of a Node.

type NodeName

type NodeName string

NodeName represents a unique name for a saga Node.

type SagaDag

type SagaDag struct {
	Graph     *dag.Graph
	SagaName  SagaName
	StartNode int64
	EndNode   int64
	Nodes     map[int64]InternalNode // Access nodes by gonumNode ID
}

SagaDag represents a saga DAG that is built on top of a regular Dag.

func NewSagaDag

func NewSagaDag(dag *Dag, params json.RawMessage) *SagaDag

NewSagaDag creates a new SagaDag by wrapping the DAG with Start and End nodes.

func (*SagaDag) GetNode

func (s *SagaDag) GetNode(nodeID int64) (InternalNode, error)

GetNode returns a node given its index.

func (*SagaDag) GetNodeIndex

func (s *SagaDag) GetNodeIndex(name string) (int64, error)

GetNodeIndex returns the index for a given node name.

func (*SagaDag) GetNodes

func (s *SagaDag) GetNodes() *SagaDagIterator

GetNodes returns an iterator over all named nodes in the saga DAG.

func (*SagaDag) GetSagaName

func (s *SagaDag) GetSagaName() SagaName

GetSagaName returns the saga name.

type SagaDagIterator

type SagaDagIterator struct {
	// contains filtered or unexported fields
}

SagaDagIterator iterates over the named nodes in a SagaDag.

func NewSagaDagIterator

func NewSagaDagIterator(dag *SagaDag) *SagaDagIterator

NewSagaDagIterator creates a new iterator for a SagaDag.

func (*SagaDagIterator) Next

func (it *SagaDagIterator) Next() (*NodeEntry, bool)

Next advances the iterator to the next named node in the SagaDag.

type SagaExecutor

type SagaExecutor[T any, S SagaType[T]] struct {
	// contains filtered or unexported fields
}

SagaExecutor handles the sequential execution of saga actions

func NewExecutorFromState

func NewExecutorFromState[T any, S SagaType[T]](
	dag *SagaDag,
	registry *ActionRegistry[T, S],
	sagaContext S,
	state *State[T],
	store Store[T],
) *SagaExecutor[T, S]

NewExecutorFromState creates an executor from a saved state for rollback

func NewSagaExecutor

func NewSagaExecutor[T any, S SagaType[T]](
	dag *SagaDag,
	actionRegistry *ActionRegistry[T, S],
	sagaContext S,
	sagaID string,
	store Store[T],
) *SagaExecutor[T, S]

NewSagaExecutor creates a new saga executor with required persistence

func (*SagaExecutor[T, S]) Execute

func (e *SagaExecutor[T, S]) Execute(ctx context.Context) error

Execute runs the saga sequentially

func (*SagaExecutor[T, S]) GetCompletedNodes

func (e *SagaExecutor[T, S]) GetCompletedNodes() []int64

GetCompletedNodes returns the list of completed node indices

func (*SagaExecutor[T, S]) GetExecutionOrder

func (e *SagaExecutor[T, S]) GetExecutionOrder() []string

GetExecutionOrder returns just the action names in execution order (for easy testing)

func (*SagaExecutor[T, S]) GetExecutionState

func (e *SagaExecutor[T, S]) GetExecutionState() map[int64]*ExecutionNode

GetExecutionState returns the current state of all nodes

func (*SagaExecutor[T, S]) GetExecutionTrace

func (e *SagaExecutor[T, S]) GetExecutionTrace() []ExecutionRecord

GetExecutionTrace returns the execution trace (copy to avoid external modification)

func (*SagaExecutor[T, S]) GetFailedNodes

func (e *SagaExecutor[T, S]) GetFailedNodes() []int64

GetFailedNodes returns the list of failed node indices

func (*SagaExecutor[T, S]) Rollback

func (e *SagaExecutor[T, S]) Rollback(ctx context.Context) error

Rollback manually triggers compensation to undo all completed actions This can be called after a successful execution to deprovision resources

type SagaID

type SagaID struct {
	UUID uuid.UUID
}

SagaID represents a unique identifier for a Saga execution.

func (SagaID) String

func (s SagaID) String() string

String returns the string representation of the SagaID.

type SagaLog

type SagaLog struct {
	sync.Mutex // for thread-safety
	// contains filtered or unexported fields
}

SagaLog represents the write log for a saga.

func NewEmptySagaLog

func NewEmptySagaLog(sagaID SagaID) *SagaLog

NewEmptySagaLog creates a new, empty SagaLog.

func NewSagaLogRecover

func NewSagaLogRecover(sagaID SagaID, events []*SagaNodeEvent) (*SagaLog, error)

NewSagaLogRecover creates a new SagaLog from a list of events (for recovery).

func (*SagaLog) Events

func (l *SagaLog) Events() []*SagaNodeEvent

Events returns the slice of events in the SagaLog.

func (*SagaLog) Record

func (l *SagaLog) Record(event *SagaNodeEvent) error

Record adds an event to the SagaLog.

func (*SagaLog) Unwinding

func (l *SagaLog) Unwinding() bool

Unwinding returns true if the SagaLog is currently unwinding.

type SagaLogPretty

type SagaLogPretty struct {
	Log *SagaLog
}

SagaLogPretty is a helper for pretty-printing a SagaLog.

func (*SagaLogPretty) String

func (p *SagaLogPretty) String() string

String implements the fmt.Stringer interface for SagaLogPretty.

type SagaName

type SagaName string

SagaName represents a human-readable name for a particular saga.

func (SagaName) String

func (s SagaName) String() string

String returns the string representation of the SagaName.

type SagaNodeEvent

type SagaNodeEvent struct {
	SagaID    SagaID
	NodeID    SagaNodeID
	EventType SagaNodeEventType
}

SagaNodeEvent represents an entry in the saga log.

func (*SagaNodeEvent) String

func (e *SagaNodeEvent) String() string

String implements the fmt.Stringer interface for SagaNodeEvent.

type SagaNodeEventType

type SagaNodeEventType int

SagaNodeEventType defines the types of events that can occur for a saga node.

const (
	EventStarted SagaNodeEventType = iota
	EventSucceeded
	EventFailed
	EventUndoStarted
	EventUndoFinished
	EventUndoFailed
)

func (SagaNodeEventType) String

func (s SagaNodeEventType) String() string

String returns the string representation of the SagaNodeEventType.

type SagaNodeID

type SagaNodeID int

SagaNodeID represents a unique identifier for a saga node.

type SagaNodeLoadStatus

type SagaNodeLoadStatus int

SagaNodeLoadStatus represents the persistent status for a saga node.

const (
	LoadNeverStarted SagaNodeLoadStatus = iota
	LoadStarted
	LoadSucceeded
	LoadFailed
	LoadUndoStarted
	LoadUndoFinished
	LoadUndoFailed
)

func (SagaNodeLoadStatus) MarshalJSON

func (s SagaNodeLoadStatus) MarshalJSON() ([]byte, error)

MarshalJSON implements the json.Marshaler interface for SagaNodeLoadStatus.

func (SagaNodeLoadStatus) String

func (s SagaNodeLoadStatus) String() string

String returns the string representation of the SagaNodeLoadStatus.

func (*SagaNodeLoadStatus) UnmarshalJSON

func (s *SagaNodeLoadStatus) UnmarshalJSON(data []byte) error

UnmarshalJSON implements the json.Unmarshaler interface for SagaNodeLoadStatus.

type SagaParams

type SagaParams any

type SagaType

type SagaType[T any] interface {
	ExecContext() T
}

SagaType defines the type signature for a saga. type SagaType[T any] interface {

type StartNode

type StartNode struct {
	Params json.RawMessage
}

StartNode represents the start of the DAG.

func (*StartNode) Label

func (n *StartNode) Label() string

func (*StartNode) NodeName

func (n *StartNode) NodeName() *NodeName

type State

type State[T any] struct {
	SagaID           string            `json:"saga_id"`
	SagaName         string            `json:"saga_name"`
	Status           string            `json:"status"` // "running", "completed", "failed", "rolled_back"
	Context          T                 `json:"context"`
	CompletedActions []CompletedAction `json:"completed_actions"`
	CreatedAt        time.Time         `json:"created_at"`
	UpdatedAt        time.Time         `json:"updated_at"`
}

State contains the minimal information needed to resume or rollback a saga. It is generic over T, the saga context type.

type Store

type Store[T any] interface {
	// Save persists the current saga state
	Save(ctx context.Context, sagaID string, state State[T]) error

	// Load retrieves a saga state by ID
	Load(ctx context.Context, sagaID string) (*State[T], error)

	// Delete removes a saga state after successful rollback
	Delete(ctx context.Context, sagaID string) error
}

Store defines the interface for persisting saga state. It is generic over T, the saga context type, to provide type safety without reflection for the main saga state.

func NewFileStore

func NewFileStore[T any](basePath string) (Store[T], error)

NewFileStore creates a new file-based store that saves saga state to the specified directory.

func NewMemoryStore

func NewMemoryStore[T any]() Store[T]

NewMemoryStore creates a new in-memory store.

type SubsagaEndNode

type SubsagaEndNode struct {
	Name NodeName
}

SubsagaEndNode represents the end of a subsaga.

func (*SubsagaEndNode) Label

func (n *SubsagaEndNode) Label() string

func (*SubsagaEndNode) NodeName

func (n *SubsagaEndNode) NodeName() *NodeName

type SubsagaNodeKind

type SubsagaNodeKind struct {
	NodeName       NodeName
	ParamsNodeName NodeName
	Dag            *Dag
}

type SubsagaStartNode

type SubsagaStartNode struct {
	SagaName       SagaName
	ParamsNodeName NodeName
}

SubsagaStartNode represents the start of a subsaga.

func (*SubsagaStartNode) Label

func (n *SubsagaStartNode) Label() string

func (*SubsagaStartNode) NodeName

func (n *SubsagaStartNode) NodeName() *NodeName

type UndoActionError

type UndoActionError struct {
	// contains filtered or unexported fields
}

UndoActionError represents an error produced by a failed undo action.

type UndoItFunc

type UndoItFunc[T any, S SagaType[T]] func(ctx context.Context, sgctx ActionContext[T, S]) error

type UndoResult

type UndoResult struct {
	Err error
}

UndoResult represents the result of a saga undo action.

Directories

Path Synopsis
examples
manual_rollback command
persistent_cli command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL