Documentation
¶
Overview ¶
Package saga provides a crash-resilient, PostgreSQL-backed saga executor for Go.
The saga pattern coordinates distributed transactions by executing a series of steps, each with a compensating action. If any step fails, previously completed steps are compensated in reverse order.
Key features:
- Crash resilience: State is persisted to PostgreSQL after each step
- Type-safe steps: Generic Step[T] function provides compile-time type safety
- Automatic compensation: Failed workflows trigger reverse-order compensation
- Dead letter queue: Failed compensations move to dead_letter for manual review
- Hard limits: a 15-minute maximum execution time and a 10-retry cap prevent runaway workflows
- Required idempotency: keys are mandatory at both the transaction AND step level
Example:
	tx, _ := saga.NewTransaction("order-123", storage, saga.TransactionOptions{
		IdempotencyKey: "order-123-v1",
		Lock:           lock,
	})
	err := tx.Run(ctx, func(ctx context.Context, tx *saga.Transaction) error {
		_, err := saga.Step(ctx, tx, "reserve", saga.StepDefinition[string]{
			IdempotencyKey: "reserve-123",
			Execute:        func(ctx context.Context) (string, error) { ... },
			Compensate:     func(ctx context.Context, id string) error { ... },
		})
		return err
	})
Index ¶
- Constants
- Variables
- func Step[T any](ctx context.Context, tx *Transaction, name string, def StepDefinition[T]) (T, error)
- func TruncateError(err error) string
- type CompensationFailedError
- type CompensationPolicy
- type DeadLetterError
- type ExecutionTimeoutError
- type IdempotencyRequiredError
- type Lock
- type MemoryStorage
- func (s *MemoryStorage) AtomicRetry(ctx context.Context, txID string) (int, error)
- func (s *MemoryStorage) Clear(ctx context.Context, txID string) error
- func (s *MemoryStorage) CountByStatus(ctx context.Context, statuses ...WorkflowStatus) (int, error)
- func (s *MemoryStorage) GetWorkflow(ctx context.Context, txID string) (*WorkflowRecord, error)
- func (s *MemoryStorage) IsProductionSafe() bool
- func (s *MemoryStorage) Load(ctx context.Context, txID string) ([]StepContext, error)
- func (s *MemoryStorage) Query(ctx context.Context, filter WorkflowFilter) (*WorkflowQueryResult, error)
- func (s *MemoryStorage) Save(ctx context.Context, txID string, steps []StepContext, input any) error
- func (s *MemoryStorage) UpdateStatus(ctx context.Context, txID string, status WorkflowStatus, ...) error
- type NoOpLock
- type PostgresLock
- type PostgresStorage
- func (s *PostgresStorage) AtomicRetry(ctx context.Context, txID string) (int, error)
- func (s *PostgresStorage) Clear(ctx context.Context, txID string) error
- func (s *PostgresStorage) CountByStatus(ctx context.Context, statuses ...WorkflowStatus) (int, error)
- func (s *PostgresStorage) GetWorkflow(ctx context.Context, txID string) (*WorkflowRecord, error)
- func (s *PostgresStorage) IsProductionSafe() bool
- func (s *PostgresStorage) Load(ctx context.Context, txID string) ([]StepContext, error)
- func (s *PostgresStorage) Query(ctx context.Context, filter WorkflowFilter) (*WorkflowQueryResult, error)
- func (s *PostgresStorage) Save(ctx context.Context, txID string, steps []StepContext, input any) error
- func (s *PostgresStorage) UpdateStatus(ctx context.Context, txID string, status WorkflowStatus, ...) error
- type RetryCapExceededError
- type RetryPolicy
- type SagaError
- type StepContext
- type StepDefinition
- type StepStatus
- type StepTimeoutError
- type Storage
- type Transaction
- type TransactionEvents
- type TransactionLockedError
- type TransactionOptions
- type WorkflowError
- type WorkflowFilter
- type WorkflowQueryResult
- type WorkflowRecord
- type WorkflowStatus
Constants ¶
const (
	// MaxExecutionDuration is the maximum wall-clock time for a workflow (15 minutes).
	MaxExecutionDuration = 15 * time.Minute
	// MaxExecutionMs is MaxExecutionDuration in milliseconds.
	MaxExecutionMs = int64(MaxExecutionDuration / time.Millisecond)
	// MaxErrorLength is the maximum length of error messages stored in DB (2KB).
	MaxErrorLength = 2048
	// MaxRetryCount is the maximum number of CLI retries for dead_letter workflows.
	MaxRetryCount = 10
)
Hard limits; these are deliberately not configurable.
const (
	ErrCodeExecutionTimeout    = "EXECUTION_TIMEOUT"
	ErrCodeIdempotencyRequired = "IDEMPOTENCY_REQUIRED"
	ErrCodeCompensationFailed  = "COMPENSATION_FAILED"
	ErrCodeRetryCapExceeded    = "RETRY_CAP_EXCEEDED"
	ErrCodeDeadLetter          = "DEAD_LETTER"
	ErrCodeTransactionLocked   = "TRANSACTION_LOCKED"
	ErrCodeStepTimeout         = "STEP_TIMEOUT"
)
Error codes for saga errors.
Variables ¶
var (
	ErrExecutionTimeout    = errors.New("execution timeout")
	ErrIdempotencyRequired = errors.New("idempotency required")
	ErrCompensationFailed  = errors.New("compensation failed")
	ErrRetryCapExceeded    = errors.New("retry cap exceeded")
	ErrDeadLetter          = errors.New("dead letter")
	ErrTransactionLocked   = errors.New("transaction locked")
	ErrStepTimeout         = errors.New("step timeout")
)
Sentinel errors for use with errors.Is.
Functions ¶
func Step ¶
func Step[T any](ctx context.Context, tx *Transaction, name string, def StepDefinition[T]) (T, error)
Step executes a saga step.
func TruncateError ¶
func TruncateError(err error) string
TruncateError truncates an error message to MaxErrorLength.
Types ¶
type CompensationFailedError ¶
type CompensationFailedError struct {
SagaError
TransactionID string
FailedStep string
OriginalError error
CompensationError error
}
CompensationFailedError is returned when a compensating action fails during rollback.
func NewCompensationFailedError ¶
func NewCompensationFailedError(txID, stepName string, originalErr, compErr error) *CompensationFailedError
NewCompensationFailedError creates a new CompensationFailedError.
func (*CompensationFailedError) Is ¶
func (e *CompensationFailedError) Is(target error) bool
type CompensationPolicy ¶
type CompensationPolicy struct {
Retry *RetryPolicy
Timeout time.Duration
}
CompensationPolicy configures compensation behavior.
type DeadLetterError ¶
DeadLetterError is returned when a workflow is in the terminal dead_letter state.
func NewDeadLetterError ¶
func NewDeadLetterError(txID string) *DeadLetterError
NewDeadLetterError creates a new DeadLetterError.
func (*DeadLetterError) Is ¶
func (e *DeadLetterError) Is(target error) bool
type ExecutionTimeoutError ¶
type ExecutionTimeoutError struct {
SagaError
TransactionID string
DeadlineMs int64
ElapsedMs int64
}
ExecutionTimeoutError is returned when a workflow's wall-clock execution time exceeds MaxExecutionDuration (15 minutes).
func NewExecutionTimeoutError ¶
func NewExecutionTimeoutError(txID string, deadlineMs, elapsedMs int64) *ExecutionTimeoutError
NewExecutionTimeoutError creates a new ExecutionTimeoutError.
func (*ExecutionTimeoutError) Is ¶
func (e *ExecutionTimeoutError) Is(target error) bool
type IdempotencyRequiredError ¶
type IdempotencyRequiredError struct {
SagaError
Location string // "transaction" or "step"
TransactionID string
StepName string
}
IdempotencyRequiredError is returned when a required idempotency key is missing at the transaction or step level.
func NewIdempotencyRequiredError ¶
func NewIdempotencyRequiredError(location, txID, stepName string) *IdempotencyRequiredError
NewIdempotencyRequiredError creates a new IdempotencyRequiredError.
func (*IdempotencyRequiredError) Is ¶
func (e *IdempotencyRequiredError) Is(target error) bool
type Lock ¶
type Lock interface {
// Acquire attempts to acquire a lock for the transaction.
// Returns a token if successful, or an error if the lock is held.
Acquire(ctx context.Context, txID string, ttl time.Duration) (string, error)
// Release releases the lock for the transaction.
Release(ctx context.Context, txID string, token string) error
}
Lock is the interface for distributed locking.
type MemoryStorage ¶
type MemoryStorage struct {
// contains filtered or unexported fields
}
MemoryStorage implements Storage using in-memory maps. WARNING: Not production safe - use only for testing.
func NewMemoryStorage ¶
func NewMemoryStorage() *MemoryStorage
NewMemoryStorage creates a new MemoryStorage.
func (*MemoryStorage) AtomicRetry ¶
func (s *MemoryStorage) AtomicRetry(ctx context.Context, txID string) (int, error)
AtomicRetry atomically transitions a dead_letter workflow to pending.
func (*MemoryStorage) Clear ¶
func (s *MemoryStorage) Clear(ctx context.Context, txID string) error
Clear marks a transaction as completed.
func (*MemoryStorage) CountByStatus ¶
func (s *MemoryStorage) CountByStatus(ctx context.Context, statuses ...WorkflowStatus) (int, error)
CountByStatus counts workflows by status.
func (*MemoryStorage) GetWorkflow ¶
func (s *MemoryStorage) GetWorkflow(ctx context.Context, txID string) (*WorkflowRecord, error)
GetWorkflow retrieves the full workflow record.
func (*MemoryStorage) IsProductionSafe ¶
func (s *MemoryStorage) IsProductionSafe() bool
IsProductionSafe returns false - MemoryStorage is not production safe.
func (*MemoryStorage) Load ¶
func (s *MemoryStorage) Load(ctx context.Context, txID string) ([]StepContext, error)
Load retrieves the step stack for a transaction.
func (*MemoryStorage) Query ¶
func (s *MemoryStorage) Query(ctx context.Context, filter WorkflowFilter) (*WorkflowQueryResult, error)
Query retrieves workflows matching the filter.
func (*MemoryStorage) Save ¶
func (s *MemoryStorage) Save(ctx context.Context, txID string, steps []StepContext, input any) error
Save persists the current step stack for a transaction.
func (*MemoryStorage) UpdateStatus ¶
func (s *MemoryStorage) UpdateStatus(ctx context.Context, txID string, status WorkflowStatus, workflowErr *WorkflowError) error
UpdateStatus updates the workflow status and optionally sets an error.
type NoOpLock ¶
type NoOpLock struct{}
NoOpLock is a lock that does nothing (for single-process use).
type PostgresLock ¶
type PostgresLock struct {
// contains filtered or unexported fields
}
PostgresLock implements Lock using PostgreSQL advisory locks.
func NewPostgresLock ¶
func NewPostgresLock(db *sql.DB) *PostgresLock
NewPostgresLock creates a new PostgresLock.
type PostgresStorage ¶
type PostgresStorage struct {
// contains filtered or unexported fields
}
PostgresStorage implements Storage using PostgreSQL.
func NewPostgresStorage ¶
func NewPostgresStorage(db *sql.DB, tableName string) (*PostgresStorage, error)
NewPostgresStorage creates a new PostgresStorage. tableName defaults to "transactions" if empty.
func (*PostgresStorage) AtomicRetry ¶
func (s *PostgresStorage) AtomicRetry(ctx context.Context, txID string) (int, error)
AtomicRetry atomically transitions a dead_letter workflow to pending.
func (*PostgresStorage) Clear ¶
func (s *PostgresStorage) Clear(ctx context.Context, txID string) error
Clear marks a transaction as completed.
func (*PostgresStorage) CountByStatus ¶
func (s *PostgresStorage) CountByStatus(ctx context.Context, statuses ...WorkflowStatus) (int, error)
CountByStatus counts workflows by status.
func (*PostgresStorage) GetWorkflow ¶
func (s *PostgresStorage) GetWorkflow(ctx context.Context, txID string) (*WorkflowRecord, error)
GetWorkflow retrieves the full workflow record.
func (*PostgresStorage) IsProductionSafe ¶
func (s *PostgresStorage) IsProductionSafe() bool
IsProductionSafe returns true - PostgresStorage is production safe.
func (*PostgresStorage) Load ¶
func (s *PostgresStorage) Load(ctx context.Context, txID string) ([]StepContext, error)
Load retrieves the step stack for a transaction.
func (*PostgresStorage) Query ¶
func (s *PostgresStorage) Query(ctx context.Context, filter WorkflowFilter) (*WorkflowQueryResult, error)
Query retrieves workflows matching the filter.
func (*PostgresStorage) Save ¶
func (s *PostgresStorage) Save(ctx context.Context, txID string, steps []StepContext, input any) error
Save persists the current step stack for a transaction.
func (*PostgresStorage) UpdateStatus ¶
func (s *PostgresStorage) UpdateStatus(ctx context.Context, txID string, status WorkflowStatus, workflowErr *WorkflowError) error
UpdateStatus updates the workflow status and optionally sets an error.
type RetryCapExceededError ¶
RetryCapExceededError is returned when the retry limit is exceeded.
func NewRetryCapExceededError ¶
func NewRetryCapExceededError(txID string, retryCount, maxRetries int) *RetryCapExceededError
NewRetryCapExceededError creates a new RetryCapExceededError.
func (*RetryCapExceededError) Is ¶
func (e *RetryCapExceededError) Is(target error) bool
type RetryPolicy ¶
RetryPolicy configures retry behavior for a step.
type StepContext ¶
type StepContext struct {
Name string `json:"name"`
IdempotencyKey string `json:"idempotencyKey"`
Result any `json:"result"`
Status StepStatus `json:"status"`
}
StepContext represents a completed step stored in the database.
type StepDefinition ¶
type StepDefinition[T any] struct {
	IdempotencyKey     string
	Execute            func(ctx context.Context) (T, error)
	Compensate         func(ctx context.Context, result T) error
	Retry              *RetryPolicy
	Timeout            time.Duration
	CompensationPolicy *CompensationPolicy
}
StepDefinition defines a saga step.
type StepStatus ¶
type StepStatus string
StepStatus represents the state of a step.
const (
StepStatusCompleted StepStatus = "completed"
)
type StepTimeoutError ¶
StepTimeoutError is returned when a step exceeds its timeout.
func NewStepTimeoutError ¶
func NewStepTimeoutError(stepName string, timeoutMs int64) *StepTimeoutError
NewStepTimeoutError creates a new StepTimeoutError.
func (*StepTimeoutError) Is ¶
func (e *StepTimeoutError) Is(target error) bool
type Storage ¶
type Storage interface {
// IsProductionSafe returns true if this storage is safe for production use.
IsProductionSafe() bool
// Save persists the current step stack for a transaction.
Save(ctx context.Context, txID string, steps []StepContext, input any) error
// Load retrieves the step stack for a transaction.
Load(ctx context.Context, txID string) ([]StepContext, error)
// Clear marks a transaction as completed.
Clear(ctx context.Context, txID string) error
// GetWorkflow retrieves the full workflow record.
GetWorkflow(ctx context.Context, txID string) (*WorkflowRecord, error)
// UpdateStatus updates the workflow status and optionally sets an error.
UpdateStatus(ctx context.Context, txID string, status WorkflowStatus, workflowErr *WorkflowError) error
// AtomicRetry atomically transitions a dead_letter workflow to pending.
// Returns the new retry count, or -1 if the workflow is not in dead_letter state.
AtomicRetry(ctx context.Context, txID string) (int, error)
// Query retrieves workflows matching the filter.
Query(ctx context.Context, filter WorkflowFilter) (*WorkflowQueryResult, error)
// CountByStatus counts workflows by status.
CountByStatus(ctx context.Context, statuses ...WorkflowStatus) (int, error)
}
Storage is the interface for workflow persistence.
type Transaction ¶
type Transaction struct {
// contains filtered or unexported fields
}
Transaction is the core saga executor.
func NewTransaction ¶
func NewTransaction(id string, storage Storage, opts TransactionOptions) (*Transaction, error)
NewTransaction creates a new Transaction.
func (*Transaction) IdempotencyKey ¶
func (tx *Transaction) IdempotencyKey() string
IdempotencyKey returns the transaction's idempotency key.
func (*Transaction) Run ¶
func (tx *Transaction) Run(ctx context.Context, workflow func(ctx context.Context, t *Transaction) error) error
Run executes the saga workflow.
type TransactionEvents ¶
type TransactionEvents struct {
// Transaction lifecycle
OnTransactionStart func(id string, input any)
OnTransactionComplete func(id string)
OnTransactionFailed func(id string, err error)
// Step lifecycle
OnStepStart func(name string)
OnStepComplete func(name string, result any, duration time.Duration)
OnStepFailed func(name string, err error, attempt int)
OnStepRetry func(name string, attempt int, err error)
OnStepSkipped func(name string, cachedResult any)
OnStepTimeout func(name string, timeout time.Duration)
// Compensation lifecycle
OnCompensationStart func(name string)
OnCompensationComplete func(name string)
OnCompensationFailed func(name string, err error)
// Terminal state
OnDeadLetter func(id string, err *WorkflowError)
}
TransactionEvents provides hooks for observability and monitoring. All callbacks are optional - only set the ones you need. Event handlers are called synchronously but wrapped in panic recovery, so a panicking handler won't break the transaction flow.
Example:
	events := &saga.TransactionEvents{
		OnStepComplete: func(name string, result any, duration time.Duration) {
			log.Printf("Step %s completed in %v", name, duration)
		},
		OnDeadLetter: func(id string, err *saga.WorkflowError) {
			alerting.SendAlert("Workflow %s moved to dead letter: %s", id, err.Error)
		},
	}
type TransactionLockedError ¶
TransactionLockedError is returned when the transaction is already locked.
func NewTransactionLockedError ¶
func NewTransactionLockedError(txID string) *TransactionLockedError
NewTransactionLockedError creates a new TransactionLockedError.
func (*TransactionLockedError) Is ¶
func (e *TransactionLockedError) Is(target error) bool
type TransactionOptions ¶
type TransactionOptions struct {
IdempotencyKey string
Input any
Lock Lock
Events *TransactionEvents
}
TransactionOptions configures a transaction.
type WorkflowError ¶
type WorkflowError struct {
StepName string `json:"stepName"`
Error string `json:"error"`
CompensationError string `json:"compensationError,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
WorkflowError represents a structured error stored in the database.
func CreateWorkflowError ¶
func CreateWorkflowError(stepName string, originalErr, compensationErr error) *WorkflowError
CreateWorkflowError creates a WorkflowError from errors.
type WorkflowFilter ¶
type WorkflowFilter struct {
Status []WorkflowStatus
CreatedAfter *time.Time
CreatedBefore *time.Time
UpdatedAfter *time.Time
Offset int
Limit int
}
WorkflowFilter is used to query workflows.
type WorkflowQueryResult ¶
type WorkflowQueryResult struct {
Workflows []WorkflowRecord
Total int
}
WorkflowQueryResult is the result of a workflow query.
type WorkflowRecord ¶
type WorkflowRecord struct {
ID string `json:"id"`
Status WorkflowStatus `json:"status"`
StepStack []StepContext `json:"stepStack"`
Input json.RawMessage `json:"input"`
RetryCount int `json:"retryCount"`
Error *WorkflowError `json:"error,omitempty"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
WorkflowRecord represents a workflow stored in the database.
type WorkflowStatus ¶
type WorkflowStatus string
WorkflowStatus represents the state of a workflow.
const (
	StatusPending    WorkflowStatus = "pending"
	StatusCompleted  WorkflowStatus = "completed"
	StatusFailed     WorkflowStatus = "failed"
	StatusDeadLetter WorkflowStatus = "dead_letter"
)
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
| cmd/saga-admin (command) | saga-admin is a CLI tool for managing saga workflows. |