saga

package module
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 27, 2026 License: MIT Imports: 10 Imported by: 0

README

saga-engine-go

Crash-resilient, PostgreSQL-backed saga executor for Go

A battle-tested implementation of the saga pattern for distributed transactions.
Survives crashes. Compensates failures. Never loses state.

Go Reference Go Report Card CI codecov


Why This Exists

When a payment succeeds but shipping fails, you need to refund. When your process crashes mid-transaction, you need to resume. When compensations fail, you need visibility.

This library handles all of that with a single PostgreSQL table and zero external dependencies.


Table of Contents


Features

Crash Resilience

State persisted to PostgreSQL after every step. Process dies? Pick up exactly where you left off.

Type-Safe Generics

Step[T] function provides compile-time type safety. No interface{} casting at runtime.

Automatic Compensation

Failed workflows trigger reverse-order compensation. Step 3 fails? Steps 2 and 1 roll back automatically.

Dead Letter Queue

When compensation fails, workflows move to dead_letter for manual review. Nothing silently disappears.

Advisory Locking

PostgreSQL advisory locks prevent concurrent execution of the same workflow. No distributed lock service needed.

Hard Limits

15-minute execution cap. 10 retry maximum. No runaway workflows, no infinite loops.


Requirements

Before using this library, understand three hard requirements:

1. Idempotency keys are mandatory. Every transaction and every step must have an idempotency key. The library returns ErrIdempotencyRequired if any are missing. This is non-negotiable for crash safety.

2. Step results must be JSON-serializable. On crash recovery, step results are reconstructed via json.Unmarshal. Only exported struct fields with json tags survive this round-trip. Unexported fields will be silently zeroed.

// Correct: exported fields with json tags
type OrderResult struct {
    ID     string  `json:"id"`
    Amount float64 `json:"amount"`
}

// Broken on resume: unexported fields are lost
type badResult struct {
    id string  // json.Unmarshal cannot see this
}

3. Your functions must respect context.Context. The engine enforces timeouts by cancelling the context. If your Execute or Compensate functions ignore ctx.Done(), timeouts cannot be enforced and goroutines will leak.

// Correct: context-aware HTTP call
Execute: func(ctx context.Context) (string, error) {
    req, _ := http.NewRequestWithContext(ctx, "POST", url, body)
    resp, err := client.Do(req)
    // ...
}

// Broken: ignores context, blocks indefinitely
Execute: func(ctx context.Context) (string, error) {
    resp, err := http.Post(url, "application/json", body) // no context!
    // ...
}

Installation

go get github.com/grafikui/saga-engine-go

Go 1.22+, PostgreSQL 12+


Quick Start

// Setup (once per application)
db, _ := sql.Open("postgres", "postgres://localhost/mydb")
storage, _ := saga.NewPostgresStorage(db, "transactions")
lock := saga.NewPostgresLock(db)
// Define and run a saga
tx, err := saga.NewTransaction("order-123", storage, saga.TransactionOptions{
    IdempotencyKey: "order-123-v1",
    Lock:           lock,
    Input:          map[string]any{"orderId": "123", "amount": 99.99},
})
if err != nil {
    log.Fatal(err)
}

err = tx.Run(ctx, func(ctx context.Context, tx *saga.Transaction) error {
    // Step 1: Reserve inventory
    reservation, err := saga.Step(ctx, tx, "reserve-inventory", saga.StepDefinition[string]{
        IdempotencyKey: "reserve-123",
        Execute: func(ctx context.Context) (string, error) {
            return inventory.Reserve(ctx, "SKU-001", 1)
        },
        Compensate: func(ctx context.Context, id string) error {
            return inventory.Release(ctx, id)
        },
    })
    if err != nil {
        return err
    }

    // Step 2: Charge payment (with retry)
    _, err = saga.Step(ctx, tx, "charge-payment", saga.StepDefinition[string]{
        IdempotencyKey: "charge-123",
        Execute: func(ctx context.Context) (string, error) {
            return gateway.Charge(ctx, 99.99)
        },
        Compensate: func(ctx context.Context, chargeID string) error {
            return gateway.Refund(ctx, chargeID)
        },
        Retry: &saga.RetryPolicy{Attempts: 3, BackoffMs: 1000},
    })
    if err != nil {
        return err
    }

    // Step 3: Ship order
    _, err = saga.Step(ctx, tx, "ship-order", saga.StepDefinition[string]{
        IdempotencyKey: "ship-123",
        Execute: func(ctx context.Context) (string, error) {
            return shipping.Ship(ctx, reservation)
        },
        Compensate: func(ctx context.Context, trackingID string) error {
            return shipping.Cancel(ctx, trackingID)
        },
    })
    return err
})

Database Schema

CREATE TABLE transactions (
    id TEXT PRIMARY KEY,
    status TEXT NOT NULL DEFAULT 'pending',
    step_stack JSONB NOT NULL DEFAULT '[]',
    input JSONB,
    retry_count INTEGER NOT NULL DEFAULT 0,
    error JSONB,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_transactions_status ON transactions(status);
CREATE INDEX idx_transactions_created_at ON transactions(created_at);

CLI Tool

The saga-admin CLI provides operational visibility and recovery tools.

# Build
go build -o saga-admin ./cmd/saga-admin

# List workflows
saga-admin -db "postgres://localhost/mydb" list
saga-admin -db "postgres://localhost/mydb" list --status dead_letter

# Inspect a workflow
saga-admin -db "postgres://localhost/mydb" show tx-123

# Retry a failed workflow
saga-admin -db "postgres://localhost/mydb" retry tx-123

# View statistics
saga-admin -db "postgres://localhost/mydb" stats

Workflow States

Status Description
pending Running or awaiting retry
completed All steps succeeded
failed Steps failed, compensation succeeded
dead_letter Compensation failed, requires manual intervention
pending ──> completed
    │
    └──> failed (compensation succeeded)
    │
    └──> dead_letter (compensation failed)

Error Handling

All errors support errors.Is() and errors.As() for idiomatic Go error handling:

if errors.Is(err, saga.ErrExecutionTimeout) {
    // Workflow exceeded 15-minute limit
}

var compErr *saga.CompensationFailedError
if errors.As(err, &compErr) {
    log.Printf("Compensation failed at step: %s", compErr.FailedStep)
}
Error Type Sentinel When
ExecutionTimeoutError ErrExecutionTimeout Wall-clock exceeded 15 minutes
IdempotencyRequiredError ErrIdempotencyRequired Missing idempotency key
CompensationFailedError ErrCompensationFailed Rollback failed
StepTimeoutError ErrStepTimeout Step exceeded timeout
TransactionLockedError ErrTransactionLocked Another process holds lock

Configuration

Hard Limits

These values are intentionally non-configurable to prevent misuse:

Constant Value Purpose
MaxExecutionDuration 15 minutes Prevents runaway workflows
MaxRetryCount 10 Prevents infinite retry loops
MaxErrorLength 2048 chars Prevents unbounded storage
What We Don't Do
Scope Limitation Rationale
Distributed transactions Single-process, single-database design. No 2PC.
Long-running workflows 15-minute limit. Use Temporal for hours/days.
External consistency If Stripe charges before crash, it stays charged. Use their idempotency keys.
Auto-resume dead letters Terminal state by design. Manual intervention required.

Infrastructure

PostgreSQL Connection Compatibility

Saga Engine uses session-level advisory locks (pg_try_advisory_lock). This has implications for connection pooling:

Connection Setup Compatible Notes
*sql.DB (direct) Yes Standard Go database connection
PgBouncer (session mode) Yes Lock held for session lifetime
PgBouncer (transaction mode) No Lock ownership lost between queries

If you use PgBouncer in transaction mode, advisory locks will silently fail to provide mutual exclusion. Use session mode or connect directly.


Edge Cases

External System Idempotency

This library handles your idempotency. For external APIs, use their idempotency keys:

Execute: func(ctx context.Context) (string, error) {
    charge, err := stripe.Charges.New(&stripe.ChargeParams{
        Amount:         stripe.Int64(1000),
        IdempotencyKey: stripe.String("order-123-charge"),
    })
    return charge.ID, err
}

Without this, a crash after Stripe charges (but before persisting) causes double-charge on retry.

Dead Letter Recovery
# 1. Find dead letters
saga-admin -db "$DATABASE_URL" dead-letter

# 2. Investigate
saga-admin -db "$DATABASE_URL" show tx-failed-123

# 3. Fix root cause, then retry
saga-admin -db "$DATABASE_URL" retry tx-failed-123

Maximum 10 retries per workflow.


Testing

# Unit tests
go test ./...

# Integration tests (requires PostgreSQL)
DATABASE_URL="postgres://localhost/testdb" go test -tags=integration ./...

# Race detector
go test -race ./...

# Coverage
go test -cover ./...

License

MIT


Documentation

Overview

Package saga provides a crash-resilient, PostgreSQL-backed saga executor for Go.

The saga pattern coordinates distributed transactions by executing a series of steps, each with a compensating action. If any step fails, previously completed steps are compensated in reverse order.

Key features:

  • Crash resilience: State is persisted to PostgreSQL after each step
  • Type-safe steps: Generic Step[T] function provides compile-time type safety
  • Automatic compensation: Failed workflows trigger reverse-order compensation
  • Dead letter queue: Failed compensations move to dead_letter for manual review
  • Hard limits: 15-minute execution max, 10 retry cap to prevent runaway workflows
  • Required idempotency: Keys mandatory at transaction AND step level

Example:

tx, _ := saga.NewTransaction("order-123", storage, saga.TransactionOptions{
    IdempotencyKey: "order-123-v1",
    Lock:           lock,
})
err := tx.Run(ctx, func(ctx context.Context, tx *saga.Transaction) error {
    _, err := saga.Step(ctx, tx, "reserve", saga.StepDefinition[string]{
        IdempotencyKey: "reserve-123",
        Execute:        func(ctx context.Context) (string, error) { ... },
        Compensate:     func(ctx context.Context, id string) error { ... },
    })
    return err
})

Index

Constants

View Source
const (
	// MaxExecutionDuration is the maximum wall-clock time for a workflow (15 minutes).
	MaxExecutionDuration = 15 * time.Minute

	// MaxExecutionMs is MaxExecutionDuration in milliseconds.
	MaxExecutionMs = int64(MaxExecutionDuration / time.Millisecond)

	// MaxErrorLength is the maximum length of error messages stored in DB (2KB).
	MaxErrorLength = 2048

	// MaxRetryCount is the maximum number of CLI retries for dead_letter workflows.
	MaxRetryCount = 10
)

Hard limits - non-configurable by design

View Source
const (
	ErrCodeExecutionTimeout    = "EXECUTION_TIMEOUT"
	ErrCodeIdempotencyRequired = "IDEMPOTENCY_REQUIRED"
	ErrCodeCompensationFailed  = "COMPENSATION_FAILED"
	ErrCodeRetryCapExceeded    = "RETRY_CAP_EXCEEDED"
	ErrCodeDeadLetter          = "DEAD_LETTER"
	ErrCodeTransactionLocked   = "TRANSACTION_LOCKED"
	ErrCodeStepTimeout         = "STEP_TIMEOUT"
)

Error codes for saga errors

Variables

View Source
var (
	ErrExecutionTimeout    = errors.New("execution timeout")
	ErrIdempotencyRequired = errors.New("idempotency required")
	ErrCompensationFailed  = errors.New("compensation failed")
	ErrRetryCapExceeded    = errors.New("retry cap exceeded")
	ErrDeadLetter          = errors.New("dead letter")
	ErrTransactionLocked   = errors.New("transaction locked")
	ErrStepTimeout         = errors.New("step timeout")
)

Sentinel errors for errors.Is() support

Functions

func Step

func Step[T any](ctx context.Context, tx *Transaction, name string, def StepDefinition[T]) (T, error)

Step executes a saga step.

func TruncateError

func TruncateError(err error) string

TruncateError truncates an error message to MaxErrorLength.

Types

type CompensationFailedError

type CompensationFailedError struct {
	SagaError
	TransactionID     string
	FailedStep        string
	OriginalError     error
	CompensationError error
}

CompensationFailedError is thrown when compensation fails during rollback.

func NewCompensationFailedError

func NewCompensationFailedError(txID, stepName string, originalErr, compErr error) *CompensationFailedError

NewCompensationFailedError creates a new CompensationFailedError.

func (*CompensationFailedError) Is

func (e *CompensationFailedError) Is(target error) bool

type CompensationPolicy

type CompensationPolicy struct {
	Retry   *RetryPolicy
	Timeout time.Duration
}

CompensationPolicy configures compensation behavior.

type DeadLetterError

type DeadLetterError struct {
	SagaError
	TransactionID string
}

DeadLetterError is thrown when workflow is in terminal dead_letter state.

func NewDeadLetterError

func NewDeadLetterError(txID string) *DeadLetterError

NewDeadLetterError creates a new DeadLetterError.

func (*DeadLetterError) Is

func (e *DeadLetterError) Is(target error) bool

type ExecutionTimeoutError

type ExecutionTimeoutError struct {
	SagaError
	TransactionID string
	DeadlineMs    int64
	ElapsedMs     int64
}

ExecutionTimeoutError is thrown when wall-clock exceeds 15 minutes.

func NewExecutionTimeoutError

func NewExecutionTimeoutError(txID string, deadlineMs, elapsedMs int64) *ExecutionTimeoutError

NewExecutionTimeoutError creates a new ExecutionTimeoutError.

func (*ExecutionTimeoutError) Is

func (e *ExecutionTimeoutError) Is(target error) bool

type IdempotencyRequiredError

type IdempotencyRequiredError struct {
	SagaError
	Location      string // "transaction" or "step"
	TransactionID string
	StepName      string
}

IdempotencyRequiredError is thrown when idempotency key is missing.

func NewIdempotencyRequiredError

func NewIdempotencyRequiredError(location, txID, stepName string) *IdempotencyRequiredError

NewIdempotencyRequiredError creates a new IdempotencyRequiredError.

func (*IdempotencyRequiredError) Is

func (e *IdempotencyRequiredError) Is(target error) bool

type Lock

type Lock interface {
	// Acquire attempts to acquire a lock for the transaction.
	// Returns a token if successful, or an error if the lock is held.
	Acquire(ctx context.Context, txID string, ttl time.Duration) (string, error)

	// Release releases the lock for the transaction.
	Release(ctx context.Context, txID string, token string) error
}

Lock is the interface for distributed locking.

type MemoryStorage

type MemoryStorage struct {
	// contains filtered or unexported fields
}

MemoryStorage implements Storage using in-memory maps. WARNING: Not production safe - use only for testing.

func NewMemoryStorage

func NewMemoryStorage() *MemoryStorage

NewMemoryStorage creates a new MemoryStorage.

func (*MemoryStorage) AtomicRetry

func (s *MemoryStorage) AtomicRetry(ctx context.Context, txID string) (int, error)

AtomicRetry atomically transitions a dead_letter workflow to pending.

func (*MemoryStorage) Clear

func (s *MemoryStorage) Clear(ctx context.Context, txID string) error

Clear marks a transaction as completed.

func (*MemoryStorage) CountByStatus

func (s *MemoryStorage) CountByStatus(ctx context.Context, statuses ...WorkflowStatus) (int, error)

CountByStatus counts workflows by status.

func (*MemoryStorage) GetWorkflow

func (s *MemoryStorage) GetWorkflow(ctx context.Context, txID string) (*WorkflowRecord, error)

GetWorkflow retrieves the full workflow record.

func (*MemoryStorage) IsProductionSafe

func (s *MemoryStorage) IsProductionSafe() bool

IsProductionSafe returns false - MemoryStorage is not production safe.

func (*MemoryStorage) Load

func (s *MemoryStorage) Load(ctx context.Context, txID string) ([]StepContext, error)

Load retrieves the step stack for a transaction.

func (*MemoryStorage) Query

Query retrieves workflows matching the filter.

func (*MemoryStorage) Save

func (s *MemoryStorage) Save(ctx context.Context, txID string, steps []StepContext, input any) error

Save persists the current step stack for a transaction.

func (*MemoryStorage) UpdateStatus

func (s *MemoryStorage) UpdateStatus(ctx context.Context, txID string, status WorkflowStatus, workflowErr *WorkflowError) error

UpdateStatus updates the workflow status and optionally sets an error.

type NoOpLock

type NoOpLock struct{}

NoOpLock is a lock that does nothing (for single-process use).

func (*NoOpLock) Acquire

func (l *NoOpLock) Acquire(ctx context.Context, txID string, ttl time.Duration) (string, error)

Acquire always succeeds for NoOpLock.

func (*NoOpLock) Release

func (l *NoOpLock) Release(ctx context.Context, txID string, token string) error

Release does nothing for NoOpLock.

type PostgresLock

type PostgresLock struct {
	// contains filtered or unexported fields
}

PostgresLock implements Lock using PostgreSQL advisory locks.

func NewPostgresLock

func NewPostgresLock(db *sql.DB) *PostgresLock

NewPostgresLock creates a new PostgresLock.

func (*PostgresLock) Acquire

func (l *PostgresLock) Acquire(ctx context.Context, txID string, ttl time.Duration) (string, error)

Acquire attempts to acquire a PostgreSQL advisory lock.

func (*PostgresLock) Release

func (l *PostgresLock) Release(ctx context.Context, txID string, token string) error

Release releases a PostgreSQL advisory lock.

type PostgresStorage

type PostgresStorage struct {
	// contains filtered or unexported fields
}

PostgresStorage implements Storage using PostgreSQL.

func NewPostgresStorage

func NewPostgresStorage(db *sql.DB, tableName string) (*PostgresStorage, error)

NewPostgresStorage creates a new PostgresStorage. tableName defaults to "transactions" if empty.

func (*PostgresStorage) AtomicRetry

func (s *PostgresStorage) AtomicRetry(ctx context.Context, txID string) (int, error)

AtomicRetry atomically transitions a dead_letter workflow to pending.

func (*PostgresStorage) Clear

func (s *PostgresStorage) Clear(ctx context.Context, txID string) error

Clear marks a transaction as completed.

func (*PostgresStorage) CountByStatus

func (s *PostgresStorage) CountByStatus(ctx context.Context, statuses ...WorkflowStatus) (int, error)

CountByStatus counts workflows by status.

func (*PostgresStorage) GetWorkflow

func (s *PostgresStorage) GetWorkflow(ctx context.Context, txID string) (*WorkflowRecord, error)

GetWorkflow retrieves the full workflow record.

func (*PostgresStorage) IsProductionSafe

func (s *PostgresStorage) IsProductionSafe() bool

IsProductionSafe returns true - PostgresStorage is production safe.

func (*PostgresStorage) Load

func (s *PostgresStorage) Load(ctx context.Context, txID string) ([]StepContext, error)

Load retrieves the step stack for a transaction.

func (*PostgresStorage) Query

Query retrieves workflows matching the filter.

func (*PostgresStorage) Save

func (s *PostgresStorage) Save(ctx context.Context, txID string, steps []StepContext, input any) error

Save persists the current step stack for a transaction.

func (*PostgresStorage) UpdateStatus

func (s *PostgresStorage) UpdateStatus(ctx context.Context, txID string, status WorkflowStatus, workflowErr *WorkflowError) error

UpdateStatus updates the workflow status and optionally sets an error.

type RetryCapExceededError

type RetryCapExceededError struct {
	SagaError
	TransactionID string
	RetryCount    int
	MaxRetries    int
}

RetryCapExceededError is thrown when retry limit is exceeded.

func NewRetryCapExceededError

func NewRetryCapExceededError(txID string, retryCount, maxRetries int) *RetryCapExceededError

NewRetryCapExceededError creates a new RetryCapExceededError.

func (*RetryCapExceededError) Is

func (e *RetryCapExceededError) Is(target error) bool

type RetryPolicy

type RetryPolicy struct {
	Attempts  int
	BackoffMs int64
}

RetryPolicy configures retry behavior for a step.

type SagaError

type SagaError struct {
	Code    string
	Message string
	Cause   error
}

SagaError is the base error type for all saga errors.

func (*SagaError) Error

func (e *SagaError) Error() string

func (*SagaError) Unwrap

func (e *SagaError) Unwrap() error

type StepContext

type StepContext struct {
	Name           string     `json:"name"`
	IdempotencyKey string     `json:"idempotencyKey"`
	Result         any        `json:"result"`
	Status         StepStatus `json:"status"`
}

StepContext represents a completed step stored in the database.

type StepDefinition

type StepDefinition[T any] struct {
	IdempotencyKey     string
	Execute            func(ctx context.Context) (T, error)
	Compensate         func(ctx context.Context, result T) error
	Retry              *RetryPolicy
	Timeout            time.Duration
	CompensationPolicy *CompensationPolicy
}

StepDefinition defines a saga step.

type StepStatus

type StepStatus string

StepStatus represents the state of a step.

const (
	StepStatusCompleted StepStatus = "completed"
)

type StepTimeoutError

type StepTimeoutError struct {
	SagaError
	StepName  string
	TimeoutMs int64
}

StepTimeoutError is thrown when a step exceeds its timeout.

func NewStepTimeoutError

func NewStepTimeoutError(stepName string, timeoutMs int64) *StepTimeoutError

NewStepTimeoutError creates a new StepTimeoutError.

func (*StepTimeoutError) Is

func (e *StepTimeoutError) Is(target error) bool

type Storage

type Storage interface {
	// IsProductionSafe returns true if this storage is safe for production use.
	IsProductionSafe() bool

	// Save persists the current step stack for a transaction.
	Save(ctx context.Context, txID string, steps []StepContext, input any) error

	// Load retrieves the step stack for a transaction.
	Load(ctx context.Context, txID string) ([]StepContext, error)

	// Clear marks a transaction as completed.
	Clear(ctx context.Context, txID string) error

	// GetWorkflow retrieves the full workflow record.
	GetWorkflow(ctx context.Context, txID string) (*WorkflowRecord, error)

	// UpdateStatus updates the workflow status and optionally sets an error.
	UpdateStatus(ctx context.Context, txID string, status WorkflowStatus, workflowErr *WorkflowError) error

	// AtomicRetry atomically transitions a dead_letter workflow to pending.
	// Returns the new retry count, or -1 if the workflow is not in dead_letter state.
	AtomicRetry(ctx context.Context, txID string) (int, error)

	// Query retrieves workflows matching the filter.
	Query(ctx context.Context, filter WorkflowFilter) (*WorkflowQueryResult, error)

	// CountByStatus counts workflows by status.
	CountByStatus(ctx context.Context, statuses ...WorkflowStatus) (int, error)
}

Storage is the interface for workflow persistence.

type Transaction

type Transaction struct {
	// contains filtered or unexported fields
}

Transaction is the core saga executor.

func NewTransaction

func NewTransaction(id string, storage Storage, opts TransactionOptions) (*Transaction, error)

NewTransaction creates a new Transaction.

func (*Transaction) ID

func (tx *Transaction) ID() string

ID returns the transaction ID.

func (*Transaction) IdempotencyKey

func (tx *Transaction) IdempotencyKey() string

IdempotencyKey returns the transaction's idempotency key.

func (*Transaction) Run

func (tx *Transaction) Run(ctx context.Context, workflow func(ctx context.Context, t *Transaction) error) error

Run executes the saga workflow.

type TransactionEvents

type TransactionEvents struct {
	// Transaction lifecycle
	OnTransactionStart    func(id string, input any)
	OnTransactionComplete func(id string)
	OnTransactionFailed   func(id string, err error)

	// Step lifecycle
	OnStepStart    func(name string)
	OnStepComplete func(name string, result any, duration time.Duration)
	OnStepFailed   func(name string, err error, attempt int)
	OnStepRetry    func(name string, attempt int, err error)
	OnStepSkipped  func(name string, cachedResult any)
	OnStepTimeout  func(name string, timeout time.Duration)

	// Compensation lifecycle
	OnCompensationStart    func(name string)
	OnCompensationComplete func(name string)
	OnCompensationFailed   func(name string, err error)

	// Terminal state
	OnDeadLetter func(id string, err *WorkflowError)
}

TransactionEvents provides hooks for observability and monitoring. All callbacks are optional - only set the ones you need. Event handlers are called synchronously but wrapped in panic recovery, so a panicking handler won't break the transaction flow.

Example:

events := &saga.TransactionEvents{
    OnStepComplete: func(name string, result any, duration time.Duration) {
        log.Printf("Step %s completed in %v", name, duration)
    },
    OnDeadLetter: func(id string, err *saga.WorkflowError) {
        alerting.SendAlert("Workflow %s moved to dead letter: %s", id, err.Error)
    },
}

type TransactionLockedError

type TransactionLockedError struct {
	SagaError
	TransactionID string
}

TransactionLockedError is thrown when transaction is already locked.

func NewTransactionLockedError

func NewTransactionLockedError(txID string) *TransactionLockedError

NewTransactionLockedError creates a new TransactionLockedError.

func (*TransactionLockedError) Is

func (e *TransactionLockedError) Is(target error) bool

type TransactionOptions

type TransactionOptions struct {
	IdempotencyKey string
	Input          any
	Lock           Lock
	Events         *TransactionEvents
}

TransactionOptions configures a transaction.

type WorkflowError

type WorkflowError struct {
	StepName          string    `json:"stepName"`
	Error             string    `json:"error"`
	CompensationError string    `json:"compensationError,omitempty"`
	Timestamp         time.Time `json:"timestamp"`
}

WorkflowError represents a structured error stored in the database.

func CreateWorkflowError

func CreateWorkflowError(stepName string, originalErr, compensationErr error) *WorkflowError

CreateWorkflowError creates a WorkflowError from errors.

type WorkflowFilter

type WorkflowFilter struct {
	Status        []WorkflowStatus
	CreatedAfter  *time.Time
	CreatedBefore *time.Time
	UpdatedAfter  *time.Time
	Offset        int
	Limit         int
}

WorkflowFilter is used to query workflows.

type WorkflowQueryResult

type WorkflowQueryResult struct {
	Workflows []WorkflowRecord
	Total     int
}

WorkflowQueryResult is the result of a workflow query.

type WorkflowRecord

type WorkflowRecord struct {
	ID         string          `json:"id"`
	Status     WorkflowStatus  `json:"status"`
	StepStack  []StepContext   `json:"stepStack"`
	Input      json.RawMessage `json:"input"`
	RetryCount int             `json:"retryCount"`
	Error      *WorkflowError  `json:"error,omitempty"`
	CreatedAt  time.Time       `json:"createdAt"`
	UpdatedAt  time.Time       `json:"updatedAt"`
}

WorkflowRecord represents a workflow stored in the database.

type WorkflowStatus

type WorkflowStatus string

WorkflowStatus represents the state of a workflow.

const (
	StatusPending    WorkflowStatus = "pending"
	StatusCompleted  WorkflowStatus = "completed"
	StatusFailed     WorkflowStatus = "failed"
	StatusDeadLetter WorkflowStatus = "dead_letter"
)

Directories

Path Synopsis
cmd
saga-admin command
saga-admin is a CLI tool for managing saga workflows.
saga-admin is a CLI tool for managing saga workflows.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL