checkpoint

package
v0.0.0-...-05004e4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 25, 2026 License: Apache-2.0, Apache-2.0 Imports: 10 Imported by: 0

Documentation

Overview

Package checkpoint provides state persistence for streaming analysis.

Index

Constants

View Source
// Default retention values.
// NOTE(review): presumably the defaults for Manager.MaxAge and Manager.MaxSize — confirm against NewManager.
const (
	DefaultMaxAge  = 7 * 24 * time.Hour // 7 days.
	DefaultMaxSize = 1 << 30            // 1 GiB (2^30 bytes).
)

Default retention values.

View Source
// MetadataVersion is the current checkpoint metadata format version.
// It was bumped from 1 to 2 when aggregator spill state was added.
const MetadataVersion = 2

MetadataVersion is the current checkpoint metadata format version. Bumped from 1 to 2 when aggregator spill state was added.

Variables

View Source
// Sentinel errors for checkpoint validation.
// NOTE(review): presumably returned (possibly wrapped) by Manager.Validate, in which
// case callers should compare with errors.Is — confirm.
var (
	ErrRepoPathMismatch = errors.New("repo path mismatch")
	ErrAnalyzerMismatch = errors.New("analyzer mismatch")
	ErrVersionMismatch  = errors.New("checkpoint version mismatch")
)

Sentinel errors for checkpoint validation.

Functions

func DefaultDir

func DefaultDir() string

DefaultDir returns the default checkpoint directory (~/.codefang/checkpoints).

func LoadState

func LoadState(dir, basename string, codec Codec, state any) error

LoadState loads state from a file in the specified directory. The filename is constructed from the basename and the codec's extension. The state parameter must be a pointer to the target struct.

func RepoHash

func RepoHash(repoPath string) string

RepoHash computes a short hash of the repository path for use as a directory name.

func SaveState

func SaveState(dir, basename string, codec Codec, state any) error

SaveState saves the given state to a file in the specified directory. The filename is constructed from the basename and the codec's extension.

Types

type AggregatorSpillEntry

// AggregatorSpillEntry records on-disk spill state for a single aggregator.
// Both fields are tagged omitempty, so a zero-value entry serializes to an
// empty JSON object.
type AggregatorSpillEntry struct {
	// Dir is the directory containing gob-encoded spill files.
	Dir string `json:"dir,omitempty"`

	// Count is the number of spill files in Dir.
	Count int `json:"count,omitempty"`
}

AggregatorSpillEntry records on-disk spill state for a single aggregator.

type Checkpointable

// Checkpointable is an optional interface for analyzers that support
// checkpointing. Save and load both operate on a caller-supplied directory.
type Checkpointable interface {
	// SaveCheckpoint writes analyzer state to the given directory.
	SaveCheckpoint(dir string) error

	// LoadCheckpoint restores analyzer state from the given directory.
	LoadCheckpoint(dir string) error

	// CheckpointSize returns the estimated size of the checkpoint in bytes.
	CheckpointSize() int64
}

Checkpointable is an optional interface for analyzers that support checkpointing.

type Codec

// Codec defines how checkpoint state is serialized and deserialized.
// It works on io.Writer / io.Reader streams and also supplies the file
// extension used when building checkpoint filenames.
type Codec interface {
	// Encode writes the state to the writer.
	Encode(w io.Writer, state any) error
	// Decode reads the state from the reader.
	// NOTE(review): state presumably must be a pointer to the target value,
	// as LoadState requires — confirm.
	Decode(r io.Reader, state any) error
	// Extension returns the file extension for this codec (e.g., ".json", ".gob").
	Extension() string
}

Codec defines how checkpoint state is serialized and deserialized.

type GobCodec

// GobCodec implements Codec using gob encoding. It has no fields.
type GobCodec struct{}

GobCodec implements Codec using gob encoding.

func NewGobCodec

func NewGobCodec() *GobCodec

NewGobCodec creates a gob codec.

func (*GobCodec) Decode

func (c *GobCodec) Decode(r io.Reader, state any) error

Decode implements Codec.Decode using gob decoding.

func (*GobCodec) Encode

func (c *GobCodec) Encode(w io.Writer, state any) error

Encode implements Codec.Encode using gob encoding.

func (*GobCodec) Extension

func (c *GobCodec) Extension() string

Extension implements Codec.Extension for gob files.

type JSONCodec

// JSONCodec implements Codec using JSON encoding, with optional indentation
// controlled by Indent.
type JSONCodec struct {
	// Indent specifies the indentation string. Empty string means compact JSON.
	Indent string
}

JSONCodec implements Codec using JSON encoding, with optional indentation (an empty Indent produces compact JSON).

func NewCompactJSONCodec

func NewCompactJSONCodec() *JSONCodec

NewCompactJSONCodec creates a JSON codec without indentation.

func NewJSONCodec

func NewJSONCodec() *JSONCodec

NewJSONCodec creates a JSON codec with pretty-printing.

func (*JSONCodec) Decode

func (c *JSONCodec) Decode(r io.Reader, state any) error

Decode implements Codec.Decode using JSON decoding.

func (*JSONCodec) Encode

func (c *JSONCodec) Encode(w io.Writer, state any) error

Encode implements Codec.Encode using JSON encoding.

func (*JSONCodec) Extension

func (c *JSONCodec) Extension() string

Extension implements Codec.Extension for JSON files.

type Manager

// Manager coordinates checkpoints across analyzers.
type Manager struct {
	// NOTE(review): BaseDir and RepoHash presumably correspond to the two
	// arguments of NewManager and together determine CheckpointDir — confirm.
	BaseDir  string
	RepoHash string
	// NOTE(review): MaxAge and MaxSize look like retention limits, presumably
	// defaulting to DefaultMaxAge and DefaultMaxSize (MaxSize in bytes) — confirm.
	MaxAge   time.Duration
	MaxSize  int64
}

Manager coordinates checkpoints across analyzers.

func NewManager

func NewManager(baseDir, repoHash string) *Manager

NewManager creates a new checkpoint manager.

func (*Manager) CheckpointDir

func (m *Manager) CheckpointDir() string

CheckpointDir returns the directory for this repository's checkpoint.

func (*Manager) Clear

func (m *Manager) Clear() error

Clear removes the checkpoint for the current repository.

func (*Manager) Exists

func (m *Manager) Exists() bool

Exists reports whether a valid checkpoint exists.

func (*Manager) Load

func (m *Manager) Load(checkpointables []Checkpointable) (*StreamingState, error)

Load restores state for all checkpointable analyzers.

func (*Manager) LoadMetadata

func (m *Manager) LoadMetadata() (*Metadata, error)

LoadMetadata loads the checkpoint metadata.

func (*Manager) MetadataPath

func (m *Manager) MetadataPath() string

MetadataPath returns the path to the metadata file.

func (*Manager) Save

func (m *Manager) Save(
	checkpointables []Checkpointable,
	state StreamingState,
	repoPath string,
	analyzerNames []string,
) error

Save creates a checkpoint for all checkpointable analyzers.

func (*Manager) Validate

func (m *Manager) Validate(repoPath string, analyzerNames []string) error

Validate checks if the checkpoint is valid for the given parameters.

type Metadata

// Metadata holds checkpoint metadata for validation and resume.
// Every field is serialized to JSON under the snake_case key in its tag.
type Metadata struct {
	// NOTE(review): Version is presumably compared against MetadataVersion,
	// and RepoPath / Analyzers against the arguments of Manager.Validate — confirm.
	Version        int               `json:"version"`
	RepoPath       string            `json:"repo_path"`
	RepoHash       string            `json:"repo_hash"`
	// CreatedAt is stored as a string; the timestamp format is not visible here.
	CreatedAt      string            `json:"created_at"`
	Analyzers      []string          `json:"analyzers"`
	// StreamingState is embedded by value under the "streaming_state" key.
	StreamingState StreamingState    `json:"streaming_state"`
	// NOTE(review): Checksums presumably maps checkpoint file names to
	// content digests — confirm key and value semantics.
	Checksums      map[string]string `json:"checksums"`
}

Metadata holds checkpoint metadata for validation and resume.

type Persister

// Persister handles checkpoint I/O for a specific state type T.
// Construct one with NewPersister, which takes a basename and a Codec.
type Persister[T any] struct {
	// contains filtered or unexported fields
}

Persister handles checkpoint I/O for a specific state type.

func NewPersister

func NewPersister[T any](basename string, codec Codec) *Persister[T]

NewPersister creates a checkpoint persister with the given basename and codec.

func (*Persister[T]) Load

func (p *Persister[T]) Load(dir string, restoreState func(*T)) error

Load restores state from the given directory using the provided restore function.

func (*Persister[T]) Save

func (p *Persister[T]) Save(dir string, buildState func() *T) error

Save writes state to the given directory using the provided build function.

type StreamingState

// StreamingState tracks chunk orchestrator progress.
type StreamingState struct {
	TotalCommits     int    `json:"total_commits"`
	ProcessedCommits int    `json:"processed_commits"`
	CurrentChunk     int    `json:"current_chunk"`
	TotalChunks      int    `json:"total_chunks"`
	LastCommitHash   string `json:"last_commit_hash"`
	LastTick         int    `json:"last_tick"`

	// AggregatorSpills records the spill state of each aggregator at checkpoint time.
	// Indexed by analyzer position in the Runner.Analyzers slice.
	// The element type is a struct value, so entries cannot be nil.
	// NOTE(review): an analyzer with no aggregator (plumbing, file_history) is
	// presumably represented by a zero-value entry (empty Dir, zero Count) — confirm.
	AggregatorSpills []AggregatorSpillEntry `json:"aggregator_spills,omitempty"`
}

StreamingState tracks chunk orchestrator progress.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL