checkpoint

package
v0.0.0-...-7871f83
Warning

This package is not in the latest version of its module.

Published: Dec 23, 2025 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Overview

Package checkpoint provides interfaces and implementations for persisting graph execution state.

Checkpointing enables fault-tolerant workflows by saving execution progress at regular intervals. If a workflow fails or is interrupted, it can be resumed from the last checkpoint without starting over.

The package defines a Checkpointer interface that can be implemented with various storage backends:

  • InMemoryCheckpointer: In-memory storage (testing/development)
  • SQLCheckpointer: SQL database - SQLite, PostgreSQL, MySQL (production)
  • DynamoDBCheckpointer: AWS DynamoDB (serverless/cloud production)

See subpackages for specific implementations:

  • pkg/checkpoint/sql: SQL-based checkpointers (SQLite, PostgreSQL, MySQL)
  • pkg/checkpoint/dynamodb: AWS DynamoDB checkpointer

Basic Usage

// Pick one backend.

// In-memory (testing/development)
checkpointer := checkpoint.NewInMemoryCheckpointer()

// SQL (production; see pkg/checkpoint/sql for details):
//   checkpointer := sql.NewSQLiteCheckpointer("checkpoints.db")
// or
//   checkpointer := sql.NewPostgreSQLCheckpointer(connString)

// DynamoDB (AWS production; see pkg/checkpoint/dynamodb for details):
//   checkpointer := dynamodb.NewCheckpointer(dynamoClient, "checkpoints-table")

// Configure the graph with the checkpointer and a run ID
g := graph.New(keys...)
g.WithCheckpointer(checkpointer, "workflow-123")
compiled, err := g.Build()
if err != nil {
    log.Fatal(err)
}

// Run with checkpointing
for _, err := range compiled.Run(ctx, nil,
    graph.WithCheckpointInterval(1), // save every superstep
) {
    if err != nil {
        log.Fatal(err)
    }
}

Checkpoint Signing (Security)

Enable HMAC-SHA256 signatures to detect tampering:

// Generate a secure signing key (32+ bytes recommended) using crypto/rand
signingKey := make([]byte, 32)
if _, err := rand.Read(signingKey); err != nil {
    log.Fatal(err)
}

// Create a checkpointer with signing enabled
checkpointer := checkpoint.NewInMemoryCheckpointer(
    checkpoint.WithSigning(signingKey),
)

// Checkpoints are automatically signed on Save()
// and verified on Load(), which fails if the data was tampered with.
// (Use cp, not checkpoint, as the variable name so the package stays accessible.)
cp, err := checkpointer.Load(ctx, "workflow-123")
if errors.Is(err, checkpoint.ErrInvalidSignature) {
    // Tampering detected, or the checkpoint was signed with a different key
}

See examples/checkpoint_signing for comprehensive usage.

Resume from Checkpoint

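// Load and inspect the latest checkpoint (Load returns nil if none exists yet)
cp, err := checkpointer.Load(ctx, "workflow-123")
if err != nil {
    log.Fatal(err)
}
if cp != nil {
    fmt.Printf("Last checkpoint at superstep %d\n", cp.Superstep)
}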

// Resume execution using the Resume method
for _, err := range compiled.Resume(ctx, "workflow-123") {
    if err != nil {
        log.Fatal(err)
    }
}

// Or resume with a specific checkpoint
for _, err := range compiled.Resume(ctx, "workflow-123",
    graph.WithCheckpoint(cp),
) {
    if err != nil {
        log.Fatal(err)
    }
}

Time-Travel Debugging

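// Load the checkpoint saved at an earlier superstep (3 is an arbitrary example)
cp, err := checkpointer.LoadAtSuperstep(ctx, "workflow-123", 3)
if err != nil || cp == nil {
    log.Fatalf("no usable checkpoint at superstep 3: %v", err)
}

// Resume from that checkpoint
for _, err := range compiled.Resume(ctx, "workflow-123",
    graph.WithCheckpoint(cp),
) {
    if err != nil {
        log.Fatal(err)
    }
    // Handle results
}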

Checkpoint Structure

Each checkpoint captures:

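  • RunID: Unique workflow identifier
  • Superstep: BSP superstep number
  • Version: Monotonically increasing counter used to detect corruption, concurrent modifications, or incorrect restore sequences
  • Timestamp: When the checkpoint was created
  • Signature: HMAC-SHA256 signature (empty when signing is not enabled)
  • State: Full graph state, including message history (via MessagesKey in state channels)
  • CompletedNodes: Nodes that finished execution (for smart resume)
  • PausedNodes: Nodes waiting for input (for human-in-the-loop workflows)
  • PendingWrites and Committed: State updates produced but not yet applied, and whether they have been applied (two-phase commit)
  • Metadata: Custom annotations
  • ApprovalMetadata: Pending approvals, approval history, and guard reasons
  • ManagedValues: Descriptors of managed values that must be rehydrated before resuming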

Note: Message history is stored in State (not as a separate field) via the MessagesKey channel, enabling consistent state management and restoration.

The package also defines sentinel errors (see Variables) and the structured error type RunNotFoundError.

Variables

var (
	// ErrNilCheckpoint is returned when a checkpoint is nil.
	ErrNilCheckpoint = errors.New("checkpoint: checkpoint is nil")

	// ErrEmptyRunID is returned when a run ID is empty.
	ErrEmptyRunID = errors.New("checkpoint: runID is empty")

	// ErrCiphertextTooShort is returned when ciphertext is too short.
	ErrCiphertextTooShort = errors.New("checkpoint: ciphertext too short")

	// ErrEncryptorRequired is returned when an encryptor is required.
	ErrEncryptorRequired = errors.New("checkpoint: encryptor is required")

	// ErrMissingPayload is returned when encrypted checkpoint is missing payload.
	ErrMissingPayload = errors.New("checkpoint: encrypted checkpoint missing payload")

	// ErrInvalidKeySize is returned when the key size is invalid.
	ErrInvalidKeySize = errors.New("checkpoint: current key must be 32 bytes for AES-256")

	// ErrDatabaseRequired is returned when database connection is required.
	ErrDatabaseRequired = errors.New("checkpoint/sql: database connection is required")

	// ErrNotImplemented is returned when a feature is not implemented.
	ErrNotImplemented = errors.New("checkpoint: not yet implemented")

	// ErrKMSKeyIDRequired is returned when KMS key ID is required.
	ErrKMSKeyIDRequired = errors.New("checkpoint/awskms: key ID is required")

	// ErrKMSClientRequired is returned when KMS client is required.
	ErrKMSClientRequired = errors.New("checkpoint/awskms: client is required")

	// ErrVaultClientRequired is returned when Vault client is required.
	ErrVaultClientRequired = errors.New("checkpoint/vault: client is required")

	// ErrVaultKeyNameRequired is returned when Vault key name is required.
	ErrVaultKeyNameRequired = errors.New("checkpoint/vault: key name is required")

	// ErrVaultMissingCiphertext is returned when Vault response is missing ciphertext.
	ErrVaultMissingCiphertext = errors.New("checkpoint/vault: encrypted checkpoint missing ciphertext")

	// ErrVaultMissingPlaintext is returned when Vault response is missing plaintext.
	ErrVaultMissingPlaintext = errors.New("checkpoint/vault: decryption response missing plaintext")
)
var (
	// ErrInvalidSignature indicates the checkpoint signature verification failed.
	// This could mean the checkpoint was tampered with or signed with a different key.
	ErrInvalidSignature = errors.New("checkpoint: invalid signature")

	// ErrSigningKeyRequired indicates signing was expected but no key was provided.
	ErrSigningKeyRequired = errors.New("checkpoint: signing key required but not configured")

	// ErrEmptySigningKey indicates an empty signing key was provided.
	ErrEmptySigningKey = errors.New("checkpoint: signing key cannot be empty")
)

Functions

func DeriveKeyFromPassword

func DeriveKeyFromPassword(password string, salt []byte) []byte

DeriveKeyFromPassword derives a 32-byte key from a password and salt. Use it to generate keys from user-provided passwords.

func SignCheckpoint

func SignCheckpoint(cp *Checkpoint, key []byte) ([]byte, error)

SignCheckpoint generates an HMAC-SHA256 signature for a checkpoint using the provided key. The signature covers all critical checkpoint fields to detect any tampering.

Signed fields (in order):

  • RunID
  • Superstep
  • Version
  • Timestamp (Unix nanoseconds)
  • State (keys and values, sorted by key)
  • CompletedNodes (sorted)
  • PausedNodes (sorted)

The Signature field itself is excluded from signing to avoid a circular dependency. Messages are intentionally excluded, as they can be very large and are typically rebuilt from state during recovery.

func VerifyCheckpoint

func VerifyCheckpoint(cp *Checkpoint, key []byte) error

VerifyCheckpoint verifies a checkpoint's signature using the provided key. Returns nil if the signature is valid, ErrInvalidSignature if invalid, or another error if verification cannot be performed.

Types

type AES256GCMEncryptor

type AES256GCMEncryptor struct {
	// contains filtered or unexported fields
}

AES256GCMEncryptor implements Encryptor using AES-256-GCM authenticated encryption.

func NewAES256GCMEncryptor

func NewAES256GCMEncryptor(key []byte) (*AES256GCMEncryptor, error)

NewAES256GCMEncryptor creates an AES-256-GCM encryptor from a 32-byte key.

func (*AES256GCMEncryptor) Algorithm

func (e *AES256GCMEncryptor) Algorithm() string

Algorithm returns the encryption algorithm identifier.

func (*AES256GCMEncryptor) Decrypt

func (e *AES256GCMEncryptor) Decrypt(ciphertext []byte) ([]byte, error)

Decrypt decrypts ciphertext using AES-256-GCM. Expects the nonce to be prepended to the ciphertext.

func (*AES256GCMEncryptor) Encrypt

func (e *AES256GCMEncryptor) Encrypt(plaintext []byte) ([]byte, error)

Encrypt encrypts plaintext using AES-256-GCM. Returns the ciphertext with prepended nonce.

type ApprovalMetadata

type ApprovalMetadata struct {
	// PendingApprovals maps node name to approval request details.
	// Nodes in this map are waiting for human approval before continuing.
	PendingApprovals map[string]*PendingApproval `json:"pendingApprovals,omitempty"`

	// ApprovalHistory is a chronological list of all approval decisions for this run.
	// Used for audit trails and compliance reporting.
	ApprovalHistory []ApprovalRecord `json:"approvalHistory,omitempty"`

	// GuardReasons maps node name to the reason approval was required.
	// Stored separately for quick access without parsing approval details.
	GuardReasons map[string]string `json:"guardReasons,omitempty"`
}

ApprovalMetadata captures approval workflow information in a checkpoint. This enables approval queues, audit trails, and timeout enforcement.

type ApprovalRecord

type ApprovalRecord struct {
	// NodeName is the node that was approved/rejected
	NodeName string `json:"nodeName"`

	// Decision is the approval outcome (APPROVED, REJECTED, EDIT, SKIP)
	Decision string `json:"decision"`

	// Reason is the human's explanation for their decision
	Reason string `json:"reason"`

	// User identifies who made the approval decision
	User string `json:"user"`

	// Timestamp when the decision was made
	Timestamp time.Time `json:"timestamp"`

	// StateEdits contains state modifications if decision was EDIT
	StateEdits map[string]any `json:"stateEdits,omitempty"`

	// Annotations contains custom metadata about the approval
	Annotations map[string]any `json:"annotations,omitempty"`
}

ApprovalRecord is an immutable record of an approval decision. These records form the audit trail for compliance and debugging.

type Checkpoint

type Checkpoint struct {
	// RunID uniquely identifies the execution run
	RunID string `json:"runID"`

	// Superstep is the BSP superstep number when this checkpoint was created
	Superstep int64 `json:"superstep"`

	// Version is a monotonically increasing counter for checkpoint validation.
	// Each state mutation increments the version, enabling detection of checkpoint corruption,
	// concurrent modifications, or incorrect restore sequences.
	Version uint64 `json:"version"`

	// Timestamp when the checkpoint was created
	Timestamp time.Time `json:"timestamp"`

	// Signature is an HMAC-SHA256 signature of the checkpoint data for integrity verification.
	// When signing is enabled, this field is populated during Save() and verified during Load().
	// An empty signature indicates the checkpoint was saved without signing enabled.
	Signature []byte `json:"signature,omitempty"`

	// State contains all channel values including message history (via MessagesKey),
	// conversation state, and any custom state registered with the state manager.
	// Message history is stored in state, not as a separate Messages field.
	State map[string]any `json:"state"`

	// CompletedNodes tracks which nodes have finished execution.
	// Needed for smart resume: skip re-executing completed nodes.
	// On resume, the BSP executor calculates which nodes to execute next based on
	// CompletedNodes and graph topology (immediate successors of completed nodes).
	CompletedNodes []string `json:"completedNodes"`

	// PausedNodes tracks which nodes are paused (e.g., waiting for human input).
	// Critical for human-in-the-loop workflows: resume from the exact pause point.
	PausedNodes []string `json:"pausedNodes,omitempty"`

	// PendingWrites are state updates produced by nodes but not yet applied.
	// Used for two-phase commit: checkpoint after node execution but before
	// state application. Enables fine-grained interrupts and human review.
	// When resuming, these writes are applied first before continuing execution.
	PendingWrites []PendingWrite `json:"pendingWrites,omitempty"`

	// Committed indicates whether PendingWrites have been applied to state.
	// Two-phase commit protocol:
	//   1. Save checkpoint with PendingWrites (Committed=false)
	//   2. Apply PendingWrites to state
	//   3. Update checkpoint (Committed=true)
	// On resume: only apply PendingWrites if Committed=false to prevent double-application.
	Committed bool `json:"committed"`

	// Metadata for custom checkpoint annotations
	Metadata map[string]any `json:"metadata,omitempty"`

	// ApprovalMetadata tracks approval workflow state for human-in-the-loop workflows.
	// This field enables:
	//   - Tracking which nodes are awaiting approval
	//   - Recording approval decision history for audit/compliance
	//   - Querying checkpoints by approval status
	//   - Implementing approval timeouts and SLAs
	ApprovalMetadata *ApprovalMetadata `json:"approvalMetadata,omitempty"`

	// ManagedValues captures the managed value descriptors that were registered
	// when the checkpoint was taken. These descriptors allow the runtime to
	// verify that all required managed values are reattached and rehydrated before
	// resuming execution.
	ManagedValues []ManagedValueDescriptor `json:"managedValues,omitempty"`
}

Checkpoint represents a snapshot of graph execution state at a specific point in time. It captures all information needed to resume execution from that point.

type Checkpointer

type Checkpointer interface {
	// Save persists a checkpoint for the given run ID.
	// Must be safe to call concurrently with other Checkpointer methods.
	// Returns error if save fails.
	Save(ctx context.Context, checkpoint *Checkpoint) error

	// Load retrieves the most recent checkpoint for the given run ID.
	// Must be safe to call concurrently with other Checkpointer methods.
	// Returns nil checkpoint if no checkpoint exists (first run).
	// Returns error if load fails.
	Load(ctx context.Context, runID string) (*Checkpoint, error)

	// List returns all checkpoints for a run ID, ordered by superstep (newest first).
	// Returns empty slice if no checkpoints exist.
	// Returns error if listing fails.
	List(ctx context.Context, runID string) ([]*Checkpoint, error)

	// Delete removes all checkpoints for a run ID.
	// Returns error if deletion fails or no checkpoints found.
	Delete(ctx context.Context, runID string) error

	// LoadAtSuperstep retrieves a checkpoint at a specific superstep.
	// Useful for time-travel debugging.
	// Returns nil if no checkpoint exists at that superstep.
	// Returns error if load fails.
	LoadAtSuperstep(ctx context.Context, runID string, superstep int64) (*Checkpoint, error)

	// ListPendingApprovals returns all checkpoints with pending approvals.
	// Ordered by oldest approval request first (FIFO queue).
	// Used by approval dashboards to show pending work.
	// Returns empty slice if no checkpoints have pending approvals.
	ListPendingApprovals(ctx context.Context) ([]*Checkpoint, error)

	// GetApprovalHistory returns the full approval audit trail for a run.
	// Ordered chronologically (oldest first).
	// Used for compliance reporting and debugging.
	// Returns empty slice if no approvals have been recorded.
	GetApprovalHistory(ctx context.Context, runID string) ([]ApprovalRecord, error)
}

Checkpointer defines the interface for checkpoint persistence. Implementations can use any storage backend (in-memory, SQLite, PostgreSQL, Redis, etc.).

Thread-Safety Requirements:

All methods MUST be safe for concurrent access from multiple goroutines. This is critical because:

  • Graph execution may checkpoint from multiple parallel nodes
  • Background checkpoint savers may run concurrently with execution
  • Human-in-the-loop workflows may query checkpoints while execution continues

Implementations MUST ensure:

  • Save() can be called concurrently with Load(), List(), and other Save() calls
  • Load() operations are safe during concurrent Save() operations
  • All methods properly synchronize access to shared state (use mutexes, atomic operations, or rely on database transaction isolation)

Performance Considerations:

  • Prefer RWMutex over Mutex when possible (allow concurrent reads)
  • Database implementations inherit thread-safety from the database/sql package (sql.DB is safe for concurrent use by multiple goroutines)
  • Deep copy mutable data returned to callers to prevent data races

func ApplyOptions

func ApplyOptions(opts []Option) (Checkpointer, int, bool)

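ApplyOptions applies the given checkpoint options and returns the configured Checkpointer, save interval, and auto-restore flag. It is used by the graph package to populate RunOptions.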

type EncryptedCheckpointer

type EncryptedCheckpointer struct {
	// contains filtered or unexported fields
}

EncryptedCheckpointer wraps a base Checkpointer and encrypts checkpoint data with an Encryptor before storing it.

func NewEncryptedCheckpointer

func NewEncryptedCheckpointer(base Checkpointer, encryptor Encryptor) (*EncryptedCheckpointer, error)

NewEncryptedCheckpointer creates an encrypted checkpointer that wraps base and uses the given encryptor.

func (*EncryptedCheckpointer) Delete

func (ec *EncryptedCheckpointer) Delete(ctx context.Context, runID string) error

Delete removes all checkpoints for the given runID by delegating to the base checkpointer.

func (*EncryptedCheckpointer) GetApprovalHistory

func (ec *EncryptedCheckpointer) GetApprovalHistory(ctx context.Context, runID string) ([]ApprovalRecord, error)

GetApprovalHistory returns the approval history for a specific run by delegating to the base checkpointer.

func (*EncryptedCheckpointer) List

func (ec *EncryptedCheckpointer) List(ctx context.Context, runID string) ([]*Checkpoint, error)

List returns all checkpoints for the given runID by delegating to the base checkpointer. Note: The returned list contains metadata only; actual checkpoint data requires Load().

func (*EncryptedCheckpointer) ListPendingApprovals

func (ec *EncryptedCheckpointer) ListPendingApprovals(ctx context.Context) ([]*Checkpoint, error)

ListPendingApprovals returns all checkpoints with pending approvals by delegating to the base checkpointer.

func (*EncryptedCheckpointer) Load

func (ec *EncryptedCheckpointer) Load(ctx context.Context, runID string) (*Checkpoint, error)

Load retrieves and decrypts a checkpoint. It loads the encrypted checkpoint, verifies the algorithm matches, and decrypts the data using the configured encryptor.

func (*EncryptedCheckpointer) LoadAtSuperstep

func (ec *EncryptedCheckpointer) LoadAtSuperstep(ctx context.Context, runID string, superstep int64) (*Checkpoint, error)

LoadAtSuperstep retrieves and decrypts a checkpoint at a specific superstep. It loads the encrypted checkpoint, verifies the algorithm matches, and decrypts the data using the configured encryptor.

func (*EncryptedCheckpointer) Save

func (ec *EncryptedCheckpointer) Save(ctx context.Context, checkpoint *Checkpoint) error

Save encrypts and saves a checkpoint. The checkpoint data is serialized, encrypted using the configured encryptor, and stored with the encryption algorithm metadata.

type Encryptor

type Encryptor interface {
	// Encrypt encrypts plaintext and returns ciphertext
	Encrypt(plaintext []byte) ([]byte, error)

	// Decrypt decrypts ciphertext and returns plaintext
	Decrypt(ciphertext []byte) ([]byte, error)

	// Algorithm returns the algorithm identifier
	Algorithm() string
}

Encryptor defines the interface for encryption and decryption operations.

type InMemoryCheckpointer

type InMemoryCheckpointer struct {
	// contains filtered or unexported fields
}

InMemoryCheckpointer implements Checkpointer using an in-memory store. This is useful for testing and development. Data is lost when the process exits.

Thread-safe for concurrent access.

func NewInMemoryCheckpointer

func NewInMemoryCheckpointer(opts ...InMemoryCheckpointerOption) *InMemoryCheckpointer

NewInMemoryCheckpointer creates a new in-memory checkpointer.

Example:

checkpointer := checkpoint.NewInMemoryCheckpointer()
result, err := graph.Last(compiled.Run(ctx, messages,
    graph.WithCheckpointer(checkpointer),
    graph.WithRunID("test-run"),
))
if err != nil {
    log.Fatal(err)
}

With signing enabled:

signingKey := []byte("your-secure-signing-key-at-least-32-bytes-long")
checkpointer := checkpoint.NewInMemoryCheckpointer(checkpoint.WithSigning(signingKey))

func (*InMemoryCheckpointer) Clear

func (m *InMemoryCheckpointer) Clear()

Clear removes all checkpoints from memory (useful for testing).

func (*InMemoryCheckpointer) Delete

func (m *InMemoryCheckpointer) Delete(ctx context.Context, runID string) error

Delete removes all checkpoints for a run ID.

func (*InMemoryCheckpointer) GetApprovalHistory

func (m *InMemoryCheckpointer) GetApprovalHistory(ctx context.Context, runID string) ([]ApprovalRecord, error)

GetApprovalHistory returns the approval history for a specific run.

func (*InMemoryCheckpointer) List

func (m *InMemoryCheckpointer) List(ctx context.Context, runID string) ([]*Checkpoint, error)

List returns all checkpoints for a run ID, ordered by superstep descending (newest first).

func (*InMemoryCheckpointer) ListPendingApprovals

func (m *InMemoryCheckpointer) ListPendingApprovals(ctx context.Context) ([]*Checkpoint, error)

ListPendingApprovals returns all checkpoints with pending approvals.

func (*InMemoryCheckpointer) Load

func (m *InMemoryCheckpointer) Load(ctx context.Context, runID string) (*Checkpoint, error)

Load retrieves the most recent checkpoint for a run ID.

func (*InMemoryCheckpointer) LoadAtSuperstep

func (m *InMemoryCheckpointer) LoadAtSuperstep(ctx context.Context, runID string, superstep int64) (*Checkpoint, error)

LoadAtSuperstep retrieves a checkpoint at a specific superstep.

func (*InMemoryCheckpointer) Save

func (m *InMemoryCheckpointer) Save(ctx context.Context, checkpoint *Checkpoint) error

Save persists a checkpoint to memory.

func (*InMemoryCheckpointer) Stats

func (m *InMemoryCheckpointer) Stats() map[string]int

Stats returns statistics about stored checkpoints.

type InMemoryCheckpointerOption

type InMemoryCheckpointerOption func(*InMemoryCheckpointer)

InMemoryCheckpointerOption is a functional option for configuring InMemoryCheckpointer.

func WithSigning

func WithSigning(key []byte) InMemoryCheckpointerOption

WithSigning configures the checkpointer to sign checkpoints on save and verify signatures on load. The signing key should be a secure random value (at least 32 bytes recommended).

Example:

signingKey := []byte("your-secure-signing-key-at-least-32-bytes-long")
checkpointer := checkpoint.NewInMemoryCheckpointer(checkpoint.WithSigning(signingKey))

type ManagedValueDescriptor

type ManagedValueDescriptor struct {
	// Name is the unique identifier for the managed value.
	Name string `json:"name"`

	// Required indicates whether the managed value must be provided when resuming
	// from the checkpoint. Optional managed values can be omitted during resume.
	Required bool `json:"required,omitempty"`
}

ManagedValueDescriptor captures metadata about managed values that must be rehydrated before a checkpoint can be resumed. These descriptors allow the graph runtime to verify that required managed values are provided again during restore and to detect mismatched factories/configurations.

type MultiKeyCheckpointer

type MultiKeyCheckpointer struct {
	// contains filtered or unexported fields
}

MultiKeyCheckpointer supports key rotation: it encrypts new checkpoints with the current key and, when loading, tries the current key first and then each older key.

func NewMultiKeyCheckpointer

func NewMultiKeyCheckpointer(base Checkpointer, currentKey []byte, oldKeys ...[]byte) (*MultiKeyCheckpointer, error)

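NewMultiKeyCheckpointer creates a checkpointer that supports key rotation. currentKey is used for all new saves; oldKeys are tried in order when a checkpoint cannot be decrypted with currentKey.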

func (*MultiKeyCheckpointer) Delete

func (mkc *MultiKeyCheckpointer) Delete(ctx context.Context, runID string) error

Delete removes all checkpoints for the given runID by delegating to the base checkpointer.

func (*MultiKeyCheckpointer) GetApprovalHistory

func (mkc *MultiKeyCheckpointer) GetApprovalHistory(ctx context.Context, runID string) ([]ApprovalRecord, error)

GetApprovalHistory returns the approval history for a specific run by delegating to the base checkpointer.

func (*MultiKeyCheckpointer) List

func (mkc *MultiKeyCheckpointer) List(ctx context.Context, runID string) ([]*Checkpoint, error)

List returns all checkpoints for the given runID by delegating to the base checkpointer. Note: The returned list contains metadata only; actual checkpoint data requires Load().

func (*MultiKeyCheckpointer) ListPendingApprovals

func (mkc *MultiKeyCheckpointer) ListPendingApprovals(ctx context.Context) ([]*Checkpoint, error)

ListPendingApprovals returns all checkpoints with pending approvals by delegating to the base checkpointer.

func (*MultiKeyCheckpointer) Load

func (mkc *MultiKeyCheckpointer) Load(ctx context.Context, runID string) (*Checkpoint, error)

Load attempts to decrypt a checkpoint by trying each key in order. It first tries the current key, then falls back to old keys. Returns the first successfully decrypted checkpoint or an error if all keys fail.

func (*MultiKeyCheckpointer) LoadAtSuperstep

func (mkc *MultiKeyCheckpointer) LoadAtSuperstep(ctx context.Context, runID string, superstep int64) (*Checkpoint, error)

LoadAtSuperstep attempts to decrypt a checkpoint at a specific superstep by trying each key in order. Returns the first successfully decrypted checkpoint or an error if all keys fail.

func (*MultiKeyCheckpointer) Save

func (mkc *MultiKeyCheckpointer) Save(ctx context.Context, checkpoint *Checkpoint) error

Save encrypts and saves a checkpoint using the primary (current) key. The checkpoint is always encrypted with the first key in the keys list.

type Option

type Option func(*options)

Option is a functional option for configuring checkpoint behavior.

func WithAutoRestore

func WithAutoRestore(enabled bool) Option

WithAutoRestore, when enabled, automatically loads the last checkpoint (if one exists) on Invoke/Stream.

func WithCheckpointer

func WithCheckpointer(checkpointer Checkpointer) Option

WithCheckpointer sets the storage backend for checkpoints.

func WithSaveInterval

func WithSaveInterval(interval int) Option

WithSaveInterval controls checkpoint frequency:

  • 0: save after every superstep (default)
  • 1: save after every superstep
  • N: save every N supersteps
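The functional-option pattern behind Option, together with the interval rule above, can be sketched as follows. The lowercase names and the options fields are hypothetical stand-ins for the package's unexported ones, checkpointer is a placeholder type, and the sketch assumes supersteps are counted from 1 and that any interval of 1 or less (including the default 0) means "save after every superstep".

```go
package main

// checkpointer is a placeholder for checkpoint.Checkpointer in this sketch.
type checkpointer interface{}

// options mirrors the kind of unexported struct an Option mutates (hypothetical fields).
type options struct {
	store        checkpointer
	saveInterval int
	autoRestore  bool
}

type option func(*options)

func withCheckpointer(c checkpointer) option {
	return func(o *options) { o.store = c }
}

func withSaveInterval(n int) option {
	return func(o *options) { o.saveInterval = n }
}

func withAutoRestore(enabled bool) option {
	return func(o *options) { o.autoRestore = enabled }
}

// applyOptions starts from zero values (interval 0, auto-restore off) and
// applies each option in order, so later options override earlier ones.
func applyOptions(opts []option) (checkpointer, int, bool) {
	var o options
	for _, opt := range opts {
		opt(&o)
	}
	return o.store, o.saveInterval, o.autoRestore
}

// shouldSave reports whether to checkpoint after a superstep. It assumes
// supersteps are numbered from 1 and treats intervals of 0 or 1 as "every superstep".
func shouldSave(superstep int64, interval int) bool {
	if interval <= 1 {
		return true
	}
	return superstep%int64(interval) == 0
}
```

With an interval of 3, this sketch saves after supersteps 3, 6, 9, and so on; whether the real runtime counts from 0 or 1 is not stated here.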

type PendingApproval

type PendingApproval struct {
	// NodeName is the node requiring approval
	NodeName string `json:"nodeName"`

	// Reason explains why approval is needed (from ApprovalGuard)
	Reason string `json:"reason"`

	// RequestedAt is when the approval was first requested
	RequestedAt time.Time `json:"requestedAt"`

	// TimeoutAt is when the approval request expires (nil if no timeout)
	TimeoutAt *time.Time `json:"timeoutAt,omitempty"`

	// RequiredState is a snapshot of relevant state for review.
	// This allows approval UIs to show context without loading the full checkpoint.
	RequiredState map[string]any `json:"requiredState,omitempty"`
}

PendingApproval represents an active approval request for a node. This information is used by approval dashboards and timeout enforcement.

type PendingWrite

type PendingWrite struct {
	// NodeName is the node that produced this write
	NodeName string `json:"nodeName"`

	// Channel is the state channel being updated
	Channel string `json:"channel"`

	// Value is the update value to be applied
	Value any `json:"value"`

	// Timestamp when this write was created
	Timestamp time.Time `json:"timestamp"`
}

PendingWrite represents a state update that has been produced by a node but not yet applied to the graph state. This enables two-phase commit semantics for checkpointing: save pending writes before applying them, allowing for fine-grained interrupts and human review before state changes take effect.

Use cases:

  • Interrupt after node execution, before state application
  • Human review of pending changes before committing
  • Transactional semantics (all-or-nothing updates)
  • Audit trail of what was written vs what was applied
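The Committed guard described on Checkpoint can be sketched as follows. The trimmed types are hypothetical, and the append-to-a-list reducer is an assumption chosen because it makes double application visible: without the guard, resuming twice from the same checkpoint would append the same write twice.

```go
package main

// pendingWrite and cpState are hypothetical, trimmed-down versions of
// PendingWrite and Checkpoint.
type pendingWrite struct {
	NodeName string
	Channel  string
	Value    any
}

type cpState struct {
	State         map[string]any
	PendingWrites []pendingWrite
	Committed     bool
}

// applyPending performs step 2 of the protocol (apply the writes) and marks
// step 3 by setting Committed; a real implementation would then save the
// checkpoint. It is a no-op when Committed is already true.
func applyPending(cp *cpState) {
	if cp.Committed {
		return
	}
	if cp.State == nil {
		cp.State = make(map[string]any)
	}
	for _, w := range cp.PendingWrites {
		prev, _ := cp.State[w.Channel].([]any) // assumed list-append reducer
		cp.State[w.Channel] = append(prev, w.Value)
	}
	cp.Committed = true
}
```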

type RunNotFoundError

type RunNotFoundError struct {
	// RunID is the run ID that was not found.
	RunID string
}

RunNotFoundError represents an error when a run ID is not found. Use errors.As to extract the RunID field for programmatic handling.

func (*RunNotFoundError) Error

func (e *RunNotFoundError) Error() string

Error implements the error interface.

func (*RunNotFoundError) Is

func (e *RunNotFoundError) Is(target error) bool

Is enables comparison with sentinel errors.

Directories

  • awskms: Package awskms provides AWS KMS encryption support for checkpoints.
  • dynamodb: Package dynamodb provides DynamoDB-based checkpoint persistence using AWS SDK for Go v2.
  • sql: Package sql provides SQL-based checkpoint persistence using database/sql.
  • vault: Package vault provides HashiCorp Vault transit encryption support for checkpoints.
