Documentation
¶
Overview ¶
Package checkpoint provides interfaces and implementations for persisting graph execution state.
Checkpointing enables fault-tolerant workflows by saving execution progress at regular intervals. If a workflow fails or is interrupted, it can be resumed from the last checkpoint without starting over.
Overview ¶
The package defines a Checkpointer interface that can be implemented with various storage backends:
- MemoryCheckpointer: In-memory storage (testing/development)
- SQLCheckpointer: SQL database - SQLite, PostgreSQL, MySQL (production)
- DynamoDBCheckpointer: AWS DynamoDB (serverless/cloud production)
See subpackages for specific implementations:
- pkg/checkpoint/sql: SQL-based checkpointers (SQLite, PostgreSQL, MySQL)
- pkg/checkpoint/dynamodb: AWS DynamoDB checkpointer
Basic Usage ¶
// In-memory (testing/development)
checkpointer := checkpoint.NewInMemoryCheckpointer()
// SQL (production - see pkg/checkpoint/sql for details)
checkpointer := sql.NewSQLiteCheckpointer("checkpoints.db")
// or
checkpointer := sql.NewPostgreSQLCheckpointer(connString)
// DynamoDB (AWS production - see pkg/checkpoint/dynamodb for details)
checkpointer := dynamodb.NewCheckpointer(dynamoClient, "checkpoints-table")
// Configure graph with checkpointer
g := graph.New(keys...)
g.WithCheckpointer(checkpointer, "workflow-123")
compiled, _ := g.Build()
// Run with checkpointing
for _, err := range compiled.Run(ctx, nil,
graph.WithCheckpointInterval(1), // Save every superstep
) {
if err != nil {
log.Fatal(err)
}
}
Checkpoint Signing (Security) ¶
Enable HMAC-SHA256 signatures to detect tampering:
// Generate secure signing key (32+ bytes recommended)
signingKey := make([]byte, 32)
rand.Read(signingKey)
// Create checkpointer with signing enabled
checkpointer := checkpoint.NewInMemoryCheckpointer(
checkpoint.WithSigning(signingKey),
)
// Checkpoints are automatically signed on Save()
// and verified on Load() - fails if tampered
checkpoint, err := checkpointer.Load(ctx, "workflow-123")
if errors.Is(err, checkpoint.ErrInvalidSignature) {
// Tampering detected!
}
See examples/checkpoint_signing for comprehensive usage.
Resume from Checkpoint ¶
// Load and inspect checkpoint
cp, _ := checkpointer.Load(ctx, "workflow-123")
fmt.Printf("Last checkpoint at superstep %d\n", cp.Superstep)
// Resume execution using the Resume method
for _, err := range compiled.Resume(ctx, "workflow-123") {
if err != nil {
log.Fatal(err)
}
}
// Or resume with a specific checkpoint
for _, err := range compiled.Resume(ctx, "workflow-123",
graph.WithCheckpoint(cp),
) {
if err != nil {
log.Fatal(err)
}
}
Time-Travel Debugging ¶
// Load checkpoint
checkpoint, _ := checkpointer.Load(ctx, "workflow-123")
// Resume from that checkpoint
for _, err := range compiled.Resume(ctx, "workflow-123",
graph.WithCheckpoint(checkpoint),
) {
// Handle results
}
Checkpoint Structure ¶
Each checkpoint captures:
- RunID: Unique workflow identifier
- Superstep: BSP superstep number
- Timestamp: When checkpoint was created
- State: Full graph state including message history (via MessagesKey in state channels)
- CompletedNodes: Nodes that finished execution (for smart resume)
- PausedNodes: Nodes waiting for input (for human-in-the-loop workflows)
- Metadata: Custom annotations
Note: Message history is stored in State (not as a separate field) via the MessagesKey channel, enabling consistent state management and restoration.
Package checkpoint provides sentinel and structured errors for the checkpoint package.
Index ¶
- Variables
- func DeriveKeyFromPassword(password string, salt []byte) []byte
- func SignCheckpoint(cp *Checkpoint, key []byte) ([]byte, error)
- func VerifyCheckpoint(cp *Checkpoint, key []byte) error
- type AES256GCMEncryptor
- type ApprovalMetadata
- type ApprovalRecord
- type Checkpoint
- type Checkpointer
- type EncryptedCheckpointer
- func (ec *EncryptedCheckpointer) Delete(ctx context.Context, runID string) error
- func (ec *EncryptedCheckpointer) GetApprovalHistory(ctx context.Context, runID string) ([]ApprovalRecord, error)
- func (ec *EncryptedCheckpointer) List(ctx context.Context, runID string) ([]*Checkpoint, error)
- func (ec *EncryptedCheckpointer) ListPendingApprovals(ctx context.Context) ([]*Checkpoint, error)
- func (ec *EncryptedCheckpointer) Load(ctx context.Context, runID string) (*Checkpoint, error)
- func (ec *EncryptedCheckpointer) LoadAtSuperstep(ctx context.Context, runID string, superstep int64) (*Checkpoint, error)
- func (ec *EncryptedCheckpointer) Save(ctx context.Context, checkpoint *Checkpoint) error
- type Encryptor
- type InMemoryCheckpointer
- func (m *InMemoryCheckpointer) Clear()
- func (m *InMemoryCheckpointer) Delete(ctx context.Context, runID string) error
- func (m *InMemoryCheckpointer) GetApprovalHistory(ctx context.Context, runID string) ([]ApprovalRecord, error)
- func (m *InMemoryCheckpointer) List(ctx context.Context, runID string) ([]*Checkpoint, error)
- func (m *InMemoryCheckpointer) ListPendingApprovals(ctx context.Context) ([]*Checkpoint, error)
- func (m *InMemoryCheckpointer) Load(ctx context.Context, runID string) (*Checkpoint, error)
- func (m *InMemoryCheckpointer) LoadAtSuperstep(ctx context.Context, runID string, superstep int64) (*Checkpoint, error)
- func (m *InMemoryCheckpointer) Save(ctx context.Context, checkpoint *Checkpoint) error
- func (m *InMemoryCheckpointer) Stats() map[string]int
- type InMemoryCheckpointerOption
- type ManagedValueDescriptor
- type MultiKeyCheckpointer
- func (mkc *MultiKeyCheckpointer) Delete(ctx context.Context, runID string) error
- func (mkc *MultiKeyCheckpointer) GetApprovalHistory(ctx context.Context, runID string) ([]ApprovalRecord, error)
- func (mkc *MultiKeyCheckpointer) List(ctx context.Context, runID string) ([]*Checkpoint, error)
- func (mkc *MultiKeyCheckpointer) ListPendingApprovals(ctx context.Context) ([]*Checkpoint, error)
- func (mkc *MultiKeyCheckpointer) Load(ctx context.Context, runID string) (*Checkpoint, error)
- func (mkc *MultiKeyCheckpointer) LoadAtSuperstep(ctx context.Context, runID string, superstep int64) (*Checkpoint, error)
- func (mkc *MultiKeyCheckpointer) Save(ctx context.Context, checkpoint *Checkpoint) error
- type Option
- type PendingApproval
- type PendingWrite
- type RunNotFoundError
Constants ¶
This section is empty.
Variables ¶
var ( // ErrNilCheckpoint is returned when a checkpoint is nil. ErrNilCheckpoint = errors.New("checkpoint: checkpoint is nil") // ErrEmptyRunID is returned when a run ID is empty. ErrEmptyRunID = errors.New("checkpoint: runID is empty") // ErrCiphertextTooShort is returned when ciphertext is too short. ErrCiphertextTooShort = errors.New("checkpoint: ciphertext too short") // ErrEncryptorRequired is returned when an encryptor is required. ErrEncryptorRequired = errors.New("checkpoint: encryptor is required") // ErrMissingPayload is returned when encrypted checkpoint is missing payload. ErrMissingPayload = errors.New("checkpoint: encrypted checkpoint missing payload") // ErrInvalidKeySize is returned when the key size is invalid. ErrInvalidKeySize = errors.New("checkpoint: current key must be 32 bytes for AES-256") // ErrDatabaseRequired is returned when database connection is required. ErrDatabaseRequired = errors.New("checkpoint/sql: database connection is required") // ErrNotImplemented is returned when a feature is not implemented. ErrNotImplemented = errors.New("checkpoint: not yet implemented") // ErrKMSKeyIDRequired is returned when KMS key ID is required. ErrKMSKeyIDRequired = errors.New("checkpoint/awskms: key ID is required") // ErrKMSClientRequired is returned when KMS client is required. ErrKMSClientRequired = errors.New("checkpoint/awskms: client is required") // ErrVaultClientRequired is returned when Vault client is required. ErrVaultClientRequired = errors.New("checkpoint/vault: client is required") // ErrVaultKeyNameRequired is returned when Vault key name is required. ErrVaultKeyNameRequired = errors.New("checkpoint/vault: key name is required") // ErrVaultMissingCiphertext is returned when Vault response is missing ciphertext. ErrVaultMissingCiphertext = errors.New("checkpoint/vault: encrypted checkpoint missing ciphertext") // ErrVaultMissingPlaintext is returned when Vault response is missing plaintext. ErrVaultMissingPlaintext = errors.New("checkpoint/vault: decryption response missing plaintext") )
var ( // ErrInvalidSignature indicates the checkpoint signature verification failed. // This could mean the checkpoint was tampered with or signed with a different key. ErrInvalidSignature = errors.New("checkpoint: invalid signature") // ErrSigningKeyRequired indicates signing was expected but no key was provided. ErrSigningKeyRequired = errors.New("checkpoint: signing key required but not configured") // ErrEmptySigningKey indicates an empty signing key was provided. ErrEmptySigningKey = errors.New("checkpoint: signing key cannot be empty") )
Functions ¶
func DeriveKeyFromPassword ¶
DeriveKeyFromPassword derives a 32-byte key from a password Use this to generate keys from user-provided passwords
func SignCheckpoint ¶
func SignCheckpoint(cp *Checkpoint, key []byte) ([]byte, error)
SignCheckpoint generates an HMAC-SHA256 signature for a checkpoint using the provided key. The signature covers all critical checkpoint fields to detect any tampering.
Signed fields (in order):
- RunID
- Superstep
- Version
- Timestamp (Unix nanoseconds)
- State (keys and values, sorted by key)
- CompletedNodes (sorted)
- PausedNodes (sorted)
The Signature field itself is excluded from signing to avoid circular dependency. Messages are intentionally excluded as they can be very large and are typically rebuilt from state during recovery.
func VerifyCheckpoint ¶
func VerifyCheckpoint(cp *Checkpoint, key []byte) error
VerifyCheckpoint verifies a checkpoint's signature using the provided key. Returns nil if the signature is valid, ErrInvalidSignature if invalid, or another error if verification cannot be performed.
Types ¶
type AES256GCMEncryptor ¶
type AES256GCMEncryptor struct {
// contains filtered or unexported fields
}
AES256GCMEncryptor implements AES-256-GCM encryption
func NewAES256GCMEncryptor ¶
func NewAES256GCMEncryptor(key []byte) (*AES256GCMEncryptor, error)
NewAES256GCMEncryptor creates an AES-256-GCM encryptor
func (*AES256GCMEncryptor) Algorithm ¶
func (e *AES256GCMEncryptor) Algorithm() string
Algorithm returns the encryption algorithm identifier.
type ApprovalMetadata ¶
type ApprovalMetadata struct {
// PendingApprovals maps node name to approval request details.
// Nodes in this map are waiting for human approval before continuing.
PendingApprovals map[string]*PendingApproval `json:"pendingApprovals,omitempty"`
// ApprovalHistory is a chronological list of all approval decisions for this run.
// Used for audit trails and compliance reporting.
ApprovalHistory []ApprovalRecord `json:"approvalHistory,omitempty"`
// GuardReasons maps node name to the reason approval was required.
// Stored separately for quick access without parsing approval details.
GuardReasons map[string]string `json:"guardReasons,omitempty"`
}
ApprovalMetadata captures approval workflow information in a checkpoint. This enables approval queues, audit trails, and timeout enforcement.
type ApprovalRecord ¶
type ApprovalRecord struct {
// NodeName is the node that was approved/rejected
NodeName string `json:"nodeName"`
// Decision is the approval outcome (APPROVED, REJECTED, EDIT, SKIP)
Decision string `json:"decision"`
// Reason is the human's explanation for their decision
Reason string `json:"reason"`
// User identifies who made the approval decision
User string `json:"user"`
// Timestamp when the decision was made
Timestamp time.Time `json:"timestamp"`
// StateEdits contains state modifications if decision was EDIT
StateEdits map[string]any `json:"stateEdits,omitempty"`
// Annotations contains custom metadata about the approval
Annotations map[string]any `json:"annotations,omitempty"`
}
ApprovalRecord is an immutable record of an approval decision. These records form the audit trail for compliance and debugging.
type Checkpoint ¶
type Checkpoint struct {
// RunID uniquely identifies the execution run
RunID string `json:"runID"`
// Superstep is the BSP superstep number when this checkpoint was created
Superstep int64 `json:"superstep"`
// Version is a monotonically increasing counter for checkpoint validation.
// Each state mutation increments the version, enabling detection of checkpoint corruption,
// concurrent modifications, or incorrect restore sequences.
Version uint64 `json:"version"`
// Timestamp when the checkpoint was created
Timestamp time.Time `json:"timestamp"`
// Signature is an HMAC-SHA256 signature of the checkpoint data for integrity verification.
// When signing is enabled, this field is populated during Save() and verified during Load().
// An empty signature indicates the checkpoint was saved without signing enabled.
Signature []byte `json:"signature,omitempty"`
// State contains all channel values including message history (via MessagesKey),
// conversation state, and any custom state registered with the state manager.
// Message history is stored in state, not as a separate Messages field.
State map[string]any `json:"state"`
// CompletedNodes tracks which nodes have finished execution.
// Needed for smart resume: skip re-executing completed nodes.
// On resume, the BSP executor calculates which nodes to execute next based on
// CompletedNodes and graph topology (immediate successors of completed nodes).
CompletedNodes []string `json:"completedNodes"`
// PausedNodes tracks which nodes are paused (e.g., waiting for human input).
// Critical for human-in-the-loop workflows: resume from the exact pause point.
PausedNodes []string `json:"pausedNodes,omitempty"`
// PendingWrites are state updates produced by nodes but not yet applied.
// Used for two-phase commit: checkpoint after node execution but before
// state application. Enables fine-grained interrupts and human review.
// When resuming, these writes are applied first before continuing execution.
PendingWrites []PendingWrite `json:"pendingWrites,omitempty"`
// Committed indicates whether PendingWrites have been applied to state.
// Two-phase commit protocol:
// 1. Save checkpoint with PendingWrites (Committed=false)
// 2. Apply PendingWrites to state
// 3. Update checkpoint (Committed=true)
// On resume: only apply PendingWrites if Committed=false to prevent double-application.
Committed bool `json:"committed"`
// Metadata for custom checkpoint annotations
Metadata map[string]any `json:"metadata,omitempty"`
// ApprovalMetadata tracks approval workflow state for human-in-the-loop workflows.
// This field enables:
// - Tracking which nodes are awaiting approval
// - Recording approval decision history for audit/compliance
// - Querying checkpoints by approval status
// - Implementing approval timeouts and SLAs
ApprovalMetadata *ApprovalMetadata `json:"approvalMetadata,omitempty"`
// ManagedValues captures the managed value descriptors that were registered
// when the checkpoint was taken. These descriptors allow the runtime to
// verify that all required managed values are reattached and rehydrated before
// resuming execution.
ManagedValues []ManagedValueDescriptor `json:"managedValues,omitempty"`
}
Checkpoint represents a snapshot of graph execution state at a specific point in time. It captures all information needed to resume execution from that point.
type Checkpointer ¶
type Checkpointer interface {
// Save persists a checkpoint for the given run ID.
// Must be safe to call concurrently with other Checkpointer methods.
// Returns error if save fails.
Save(ctx context.Context, checkpoint *Checkpoint) error
// Load retrieves the most recent checkpoint for the given run ID.
// Must be safe to call concurrently with other Checkpointer methods.
// Returns nil checkpoint if no checkpoint exists (first run).
// Returns error if load fails.
Load(ctx context.Context, runID string) (*Checkpoint, error)
// List returns all checkpoints for a run ID, ordered by superstep (newest first).
// Returns empty slice if no checkpoints exist.
// Returns error if listing fails.
List(ctx context.Context, runID string) ([]*Checkpoint, error)
// Delete removes all checkpoints for a run ID.
// Returns error if deletion fails or no checkpoints found.
Delete(ctx context.Context, runID string) error
// LoadAtSuperstep retrieves a checkpoint at a specific superstep.
// Useful for time-travel debugging.
// Returns nil if no checkpoint exists at that superstep.
// Returns error if load fails.
LoadAtSuperstep(ctx context.Context, runID string, superstep int64) (*Checkpoint, error)
// ListPendingApprovals returns all checkpoints with pending approvals.
// Ordered by oldest approval request first (FIFO queue).
// Used by approval dashboards to show pending work.
// Returns empty slice if no checkpoints have pending approvals.
ListPendingApprovals(ctx context.Context) ([]*Checkpoint, error)
// GetApprovalHistory returns the full approval audit trail for a run.
// Ordered chronologically (oldest first).
// Used for compliance reporting and debugging.
// Returns empty slice if no approvals have been recorded.
GetApprovalHistory(ctx context.Context, runID string) ([]ApprovalRecord, error)
}
Checkpointer defines the interface for checkpoint persistence. Implementations can use any storage backend (in-memory, SQLite, PostgreSQL, Redis, etc.)
Thread-Safety Requirements:
All methods MUST be safe for concurrent access from multiple goroutines. This is critical because:
- Graph execution may checkpoint from multiple parallel nodes
- Background checkpoint savers may run concurrently with execution
- Human-in-the-loop workflows may query checkpoints while execution continues
Implementations MUST ensure:
- Save() can be called concurrently with Load(), List(), and other Save() calls
- Load() operations are safe during concurrent Save() operations
- All methods properly synchronize access to shared state (use mutexes, atomic operations, or rely on database transaction isolation)
Performance Considerations:
- Prefer RWMutex over Mutex when possible (allow concurrent reads)
- Database implementations inherit thread-safety from database/sql package (sql.DB is thread-safe)
- Deep copy mutable data returned to callers to prevent data races
func ApplyOptions ¶
func ApplyOptions(opts []Option) (Checkpointer, int, bool)
ApplyOptions applies checkpoint options to RunOptions (used by graph package)
type EncryptedCheckpointer ¶
type EncryptedCheckpointer struct {
// contains filtered or unexported fields
}
EncryptedCheckpointer wraps a base checkpointer with encryption
func NewEncryptedCheckpointer ¶
func NewEncryptedCheckpointer(base Checkpointer, encryptor Encryptor) (*EncryptedCheckpointer, error)
NewEncryptedCheckpointer creates an encrypted checkpointer with the given encryptor
func (*EncryptedCheckpointer) Delete ¶
func (ec *EncryptedCheckpointer) Delete(ctx context.Context, runID string) error
Delete removes all checkpoints for the given runID by delegating to the base checkpointer.
func (*EncryptedCheckpointer) GetApprovalHistory ¶
func (ec *EncryptedCheckpointer) GetApprovalHistory(ctx context.Context, runID string) ([]ApprovalRecord, error)
GetApprovalHistory returns the approval history for a specific run by delegating to the base checkpointer.
func (*EncryptedCheckpointer) List ¶
func (ec *EncryptedCheckpointer) List(ctx context.Context, runID string) ([]*Checkpoint, error)
List returns all checkpoints for the given runID by delegating to the base checkpointer. Note: The returned list contains metadata only; actual checkpoint data requires Load().
func (*EncryptedCheckpointer) ListPendingApprovals ¶
func (ec *EncryptedCheckpointer) ListPendingApprovals(ctx context.Context) ([]*Checkpoint, error)
ListPendingApprovals returns all checkpoints with pending approvals by delegating to the base checkpointer.
func (*EncryptedCheckpointer) Load ¶
func (ec *EncryptedCheckpointer) Load(ctx context.Context, runID string) (*Checkpoint, error)
Load retrieves and decrypts a checkpoint. It loads the encrypted checkpoint, verifies the algorithm matches, and decrypts the data using the configured encryptor.
func (*EncryptedCheckpointer) LoadAtSuperstep ¶
func (ec *EncryptedCheckpointer) LoadAtSuperstep(ctx context.Context, runID string, superstep int64) (*Checkpoint, error)
LoadAtSuperstep retrieves and decrypts a checkpoint at a specific superstep. It loads the encrypted checkpoint, verifies the algorithm matches, and decrypts the data using the configured encryptor.
func (*EncryptedCheckpointer) Save ¶
func (ec *EncryptedCheckpointer) Save(ctx context.Context, checkpoint *Checkpoint) error
Save encrypts and saves a checkpoint. The checkpoint data is serialized, encrypted using the configured encryptor, and stored with the encryption algorithm metadata.
type Encryptor ¶
type Encryptor interface {
// Encrypt encrypts plaintext and returns ciphertext
Encrypt(plaintext []byte) ([]byte, error)
// Decrypt decrypts ciphertext and returns plaintext
Decrypt(ciphertext []byte) ([]byte, error)
// Algorithm returns the algorithm identifier
Algorithm() string
}
Encryptor defines the interface for encryption/decryption operations
type InMemoryCheckpointer ¶
type InMemoryCheckpointer struct {
// contains filtered or unexported fields
}
InMemoryCheckpointer implements Checkpointer using an in-memory store. This is useful for testing and development. Data is lost when the process exits.
Thread-safe for concurrent access.
func NewInMemoryCheckpointer ¶
func NewInMemoryCheckpointer(opts ...InMemoryCheckpointerOption) *InMemoryCheckpointer
NewInMemoryCheckpointer creates a new in-memory checkpointer.
Example:
checkpointer := checkpoint.NewInMemoryCheckpointer()
result, _ := graph.Last(compiled.Run(ctx, messages,
graph.WithCheckpointer(checkpointer),
graph.WithRunID("test-run"),
))
With signing enabled:
signingKey := []byte("your-secure-signing-key")
checkpointer := checkpoint.NewInMemoryCheckpointer(checkpoint.WithSigning(signingKey))
func (*InMemoryCheckpointer) Clear ¶
func (m *InMemoryCheckpointer) Clear()
Clear removes all checkpoints from memory (useful for testing)
func (*InMemoryCheckpointer) Delete ¶
func (m *InMemoryCheckpointer) Delete(ctx context.Context, runID string) error
Delete removes all checkpoints for a run ID
func (*InMemoryCheckpointer) GetApprovalHistory ¶
func (m *InMemoryCheckpointer) GetApprovalHistory(ctx context.Context, runID string) ([]ApprovalRecord, error)
GetApprovalHistory returns the approval history for a specific run.
func (*InMemoryCheckpointer) List ¶
func (m *InMemoryCheckpointer) List(ctx context.Context, runID string) ([]*Checkpoint, error)
List returns all checkpoints for a run ID, ordered by superstep descending
func (*InMemoryCheckpointer) ListPendingApprovals ¶
func (m *InMemoryCheckpointer) ListPendingApprovals(ctx context.Context) ([]*Checkpoint, error)
ListPendingApprovals returns all checkpoints with pending approvals.
func (*InMemoryCheckpointer) Load ¶
func (m *InMemoryCheckpointer) Load(ctx context.Context, runID string) (*Checkpoint, error)
Load retrieves the most recent checkpoint for a run ID
func (*InMemoryCheckpointer) LoadAtSuperstep ¶
func (m *InMemoryCheckpointer) LoadAtSuperstep(ctx context.Context, runID string, superstep int64) (*Checkpoint, error)
LoadAtSuperstep retrieves a checkpoint at a specific superstep
func (*InMemoryCheckpointer) Save ¶
func (m *InMemoryCheckpointer) Save(ctx context.Context, checkpoint *Checkpoint) error
Save persists a checkpoint to memory
func (*InMemoryCheckpointer) Stats ¶
func (m *InMemoryCheckpointer) Stats() map[string]int
Stats returns statistics about stored checkpoints
type InMemoryCheckpointerOption ¶
type InMemoryCheckpointerOption func(*InMemoryCheckpointer)
InMemoryCheckpointerOption is a functional option for configuring InMemoryCheckpointer.
func WithSigning ¶
func WithSigning(key []byte) InMemoryCheckpointerOption
WithSigning configures the checkpointer to sign checkpoints on save and verify signatures on load. The signing key should be a secure random value (at least 32 bytes recommended).
Example:
signingKey := []byte("your-secure-signing-key-at-least-32-bytes-long")
checkpointer := checkpoint.NewInMemoryCheckpointer(checkpoint.WithSigning(signingKey))
type ManagedValueDescriptor ¶
type ManagedValueDescriptor struct {
// Name is the unique identifier for the managed value.
Name string `json:"name"`
// Required indicates whether the managed value must be provided when resuming
// from the checkpoint. Optional managed values can be omitted during resume.
Required bool `json:"required,omitempty"`
}
ManagedValueDescriptor captures metadata about managed values that must be rehydrated before a checkpoint can be resumed. These descriptors allow the graph runtime to verify that required managed values are provided again during restore and to detect mismatched factories/configurations.
type MultiKeyCheckpointer ¶
type MultiKeyCheckpointer struct {
// contains filtered or unexported fields
}
MultiKeyCheckpointer supports key rotation by trying multiple keys
func NewMultiKeyCheckpointer ¶
func NewMultiKeyCheckpointer(base Checkpointer, currentKey []byte, oldKeys ...[]byte) (*MultiKeyCheckpointer, error)
NewMultiKeyCheckpointer creates a checkpointer that supports key rotation
func (*MultiKeyCheckpointer) Delete ¶
func (mkc *MultiKeyCheckpointer) Delete(ctx context.Context, runID string) error
Delete removes all checkpoints for the given runID by delegating to the base checkpointer.
func (*MultiKeyCheckpointer) GetApprovalHistory ¶
func (mkc *MultiKeyCheckpointer) GetApprovalHistory(ctx context.Context, runID string) ([]ApprovalRecord, error)
GetApprovalHistory returns the approval history for a specific run by delegating to the base checkpointer.
func (*MultiKeyCheckpointer) List ¶
func (mkc *MultiKeyCheckpointer) List(ctx context.Context, runID string) ([]*Checkpoint, error)
List returns all checkpoints for the given runID by delegating to the base checkpointer. Note: The returned list contains metadata only; actual checkpoint data requires Load().
func (*MultiKeyCheckpointer) ListPendingApprovals ¶
func (mkc *MultiKeyCheckpointer) ListPendingApprovals(ctx context.Context) ([]*Checkpoint, error)
ListPendingApprovals returns all checkpoints with pending approvals by delegating to the base checkpointer.
func (*MultiKeyCheckpointer) Load ¶
func (mkc *MultiKeyCheckpointer) Load(ctx context.Context, runID string) (*Checkpoint, error)
Load attempts to decrypt a checkpoint by trying each key in order. It first tries the current key, then falls back to old keys. Returns the first successfully decrypted checkpoint or an error if all keys fail.
func (*MultiKeyCheckpointer) LoadAtSuperstep ¶
func (mkc *MultiKeyCheckpointer) LoadAtSuperstep(ctx context.Context, runID string, superstep int64) (*Checkpoint, error)
LoadAtSuperstep attempts to decrypt a checkpoint at a specific superstep by trying each key in order. Returns the first successfully decrypted checkpoint or an error if all keys fail.
func (*MultiKeyCheckpointer) Save ¶
func (mkc *MultiKeyCheckpointer) Save(ctx context.Context, checkpoint *Checkpoint) error
Save encrypts and saves a checkpoint using the primary (current) key. The checkpoint is always encrypted with the first key in the keys list.
type Option ¶
type Option func(*options)
Option is a functional option for configuring checkpoint behavior
func WithAutoRestore ¶
WithAutoRestore automatically loads the last checkpoint on Invoke/Stream if it exists
func WithCheckpointer ¶
func WithCheckpointer(checkpointer Checkpointer) Option
WithCheckpointer sets the storage backend for checkpoints
func WithSaveInterval ¶
WithSaveInterval controls checkpoint frequency:
0 = save after every superstep (default) 1 = save every superstep N = save every N supersteps
type PendingApproval ¶
type PendingApproval struct {
// NodeName is the node requiring approval
NodeName string `json:"nodeName"`
// Reason explains why approval is needed (from ApprovalGuard)
Reason string `json:"reason"`
// RequestedAt is when the approval was first requested
RequestedAt time.Time `json:"requestedAt"`
// TimeoutAt is when the approval request expires (nil if no timeout)
TimeoutAt *time.Time `json:"timeoutAt,omitempty"`
// RequiredState is a snapshot of relevant state for review.
// This allows approval UIs to show context without loading the full checkpoint.
RequiredState map[string]any `json:"requiredState,omitempty"`
}
PendingApproval represents an active approval request for a node. This information is used by approval dashboards and timeout enforcement.
type PendingWrite ¶
type PendingWrite struct {
// NodeName is the node that produced this write
NodeName string `json:"nodeName"`
// Channel is the state channel being updated
Channel string `json:"channel"`
// Value is the update value to be applied
Value any `json:"value"`
// Timestamp when this write was created
Timestamp time.Time `json:"timestamp"`
}
PendingWrite represents a state update that has been produced by a node but not yet applied to the graph state. This enables two-phase commit semantics for checkpointing: save pending writes before applying them, allowing for fine-grained interrupts and human review before state changes take effect.
Use cases:
- Interrupt after node execution, before state application
- Human review of pending changes before committing
- Transactional semantics (all-or-nothing updates)
- Audit trail of what was written vs what was applied
type RunNotFoundError ¶
type RunNotFoundError struct {
// RunID is the run ID that was not found.
RunID string
}
RunNotFoundError represents an error when a run ID is not found. Use errors.As to extract the RunID field for programmatic handling.
func (*RunNotFoundError) Error ¶
func (e *RunNotFoundError) Error() string
Error implements the error interface.
func (*RunNotFoundError) Is ¶
func (e *RunNotFoundError) Is(target error) bool
Is enables comparison with sentinel errors.
Directories
¶
| Path | Synopsis |
|---|---|
|
Package awskms provides AWS KMS encryption support for checkpoints.
|
Package awskms provides AWS KMS encryption support for checkpoints. |
|
Package dynamodb provides DynamoDB-based checkpoint persistence using AWS SDK for Go v2.
|
Package dynamodb provides DynamoDB-based checkpoint persistence using AWS SDK for Go v2. |
|
Package sql provides SQL-based checkpoint persistence using database/sql.
|
Package sql provides SQL-based checkpoint persistence using database/sql. |
|
Package vault provides HashiCorp Vault transit encryption support for checkpoints.
|
Package vault provides HashiCorp Vault transit encryption support for checkpoints. |