agent

package
v0.3.0 Latest
Published: May 10, 2026 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Overview

Package agent contains the contracts and implementations of background agents: Ingester, GC, Scrub, Snapshot, Sync, Ejector. It is a coreutils-style toolkit for storage that automates maintenance work so that the host application does not have to hand-roll its own logic.

Two lifecycle modalities:

  • BackgroundAgent — cyclic or continuous work. Implemented here.
  • MaintenanceAgent — one-shot operation. The contract lives in core (see core.MaintenanceAgent); the implementations live in maintenance/.

Two ownership modalities:

  • Curator-managed (Scrub, Snapshot) — automatically launched by Curator for every registered Target.
  • User-managed (Ingester, GC, Ejector) — created and started by the host application explicitly through the package constructors.

DAG: agent imports core, driver, event. It does not import curator, maintenance, or projection.

Implementations land in M3 (GC, Scrub, Snapshot, RebuildIndex) and M6 (Ingester, Ejector). M0 contains the contracts and configuration types.

Constants

const (
	// EventAgentStarted — Run entered its main loop. Emitted once
	// per Run call.
	EventAgentStarted = "agent.started"

	// EventAgentProgress — periodic progress snapshot. Emission
	// rate is agent-specific; defaults are documented at each
	// concrete agent.
	EventAgentProgress = "agent.progress"

	// EventAgentCycle — one unit of work completed (a full GC
	// pass, a single Scrub batch, one Ingester flush). Payload:
	// core.AgentResult with the cycle's stats. The same payload
	// shape is reused by EventAgentCompleted; the difference is
	// semantic — Cycle means "one unit done, agent continues",
	// Completed means "agent finished, Run returned".
	EventAgentCycle = "agent.cycle"

	// EventAgentCompleted — Run returned cleanly (only meaningful
	// for one-shot agents like Snapshot or RebuildIndex).
	// Background agents in steady state emit Cycle, not Completed.
	EventAgentCompleted = "agent.completed"

	// EventAgentFailed — Run returned with a fatal error.
	EventAgentFailed = "agent.failed"

	// EventAgentStopped — graceful shutdown via context
	// cancellation.
	EventAgentStopped = "agent.stopped"

	// EventAgentCancelled — Run aborted before completion (host
	// requested cancellation, lease lost, etc.).
	EventAgentCancelled = "agent.cancelled"

	// EventAgentStaleLease — agent took over a lease whose owner
	// stopped renewing. Payload: core.LeaseTakeoverPayload (same
	// shape as core.EventStaleLeaseTakeover; see core/events.go).
	// The struct lives in core because the stale-lease concept is
	// shared with the core-level Store lease takeover; declaring
	// it once and reusing it keeps the two events decoder-
	// compatible.
	EventAgentStaleLease = "agent.stale_lease"
)

Agent lifecycle events. Emitted by every BackgroundAgent (built-in or user-defined) at well-defined points in its run loop.

The host application filters by AgentType in the payload — the same constant set is reused for "gc", "scrub", "ingester", and any custom agent the host has registered.

All four event-type prefixes ("core.", "agent.", "curator.", "index.") are reserved per docs/2. Internals/01 §1.7. User agents must emit their own events under their own namespace (e.g. "acme.quota_monitor.threshold_exceeded").
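
To illustrate the namespace rule, here is a minimal sketch that checks whether an event type falls under one of the reserved prefixes. The helper isReservedEventType and the reservedPrefixes slice are hypothetical names introduced for this example; the package documentation does not describe such a function.

```go
package main

import (
	"fmt"
	"strings"
)

// reservedPrefixes mirrors the four prefixes listed in the text above.
var reservedPrefixes = []string{"core.", "agent.", "curator.", "index."}

// isReservedEventType reports whether eventType starts with a reserved
// prefix. Hypothetical helper, not part of package agent.
func isReservedEventType(eventType string) bool {
	for _, p := range reservedPrefixes {
		if strings.HasPrefix(eventType, p) {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(isReservedEventType("agent.started"))
	fmt.Println(isReservedEventType("acme.quota_monitor.threshold_exceeded"))
}
```

A user agent could run such a check in its own tests to make sure none of its event types collide with the built-in namespaces.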

Variables

This section is empty.

Functions

This section is empty.

Types

type AgentFailedPayload

type AgentFailedPayload struct {
	AgentType string
	StoreID   string
	Err       error
}

AgentFailedPayload is the payload of EventAgentFailed.

type AgentProgressPayload

type AgentProgressPayload struct {
	AgentType   string
	StoreID     string
	Processed   int64
	Total       int64
	CurrentItem string
}

AgentProgressPayload is the payload of EventAgentProgress. Total is 0 when the total amount of work is unknown up front (for example, in a continuous loop).
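
A subscriber has to treat Total == 0 as "unknown" rather than as a division target. The sketch below shows one way to render a progress line under that rule. The struct is a local copy of the fields shown above so that the example compiles on its own, and describeProgress is a hypothetical helper.

```go
package main

import "fmt"

// AgentProgressPayload is a local copy of the struct documented above,
// declared here only to keep the example self-contained.
type AgentProgressPayload struct {
	AgentType   string
	StoreID     string
	Processed   int64
	Total       int64
	CurrentItem string
}

// describeProgress renders a human-readable progress line.
// Hypothetical helper: Total <= 0 is treated as "total unknown".
func describeProgress(p AgentProgressPayload) string {
	if p.Total <= 0 {
		return fmt.Sprintf("%s: %d processed (total unknown)", p.AgentType, p.Processed)
	}
	pct := float64(p.Processed) / float64(p.Total) * 100
	return fmt.Sprintf("%s: %d/%d (%.1f%%)", p.AgentType, p.Processed, p.Total, pct)
}

func main() {
	fmt.Println(describeProgress(AgentProgressPayload{AgentType: "scrub", Processed: 50, Total: 200}))
	fmt.Println(describeProgress(AgentProgressPayload{AgentType: "ingester", Processed: 7}))
}
```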

type AgentStartedPayload

type AgentStartedPayload struct {
	AgentType string
	StoreID   string
	StartedAt time.Time
}

AgentStartedPayload is the payload of EventAgentStarted.

type AgentState

type AgentState uint8

AgentState is the state of a background agent reported by Status.

const (
	// StateIdle — Run has not been started yet, or it has finished
	// cleanly.
	StateIdle AgentState = iota

	// StateRunning — Run is active; the current unit of work is in
	// flight.
	StateRunning

	// StatePaused — reserved. Not used by any built-in agent in
	// v1; the slot is held for the future "auto-pause under
	// pressure" backlog item.
	StatePaused

	// StateFaulted — Run finished with an error; Status returns a
	// non-nil error explaining the cause.
	StateFaulted
)
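
The documentation above does not show a String method for AgentState. For logging, a host application could wrap the value in something like the following sketch. The type and constants are re-declared locally so that the example compiles on its own, and the label strings are an assumption.

```go
package main

import "fmt"

// AgentState and its constants are re-declared locally; they mirror the
// declarations documented above.
type AgentState uint8

const (
	StateIdle AgentState = iota
	StateRunning
	StatePaused
	StateFaulted
)

// String returns a lowercase label for the state. The labels are an
// assumption made for this sketch, not documented package behaviour.
func (s AgentState) String() string {
	switch s {
	case StateIdle:
		return "idle"
	case StateRunning:
		return "running"
	case StatePaused:
		return "paused"
	case StateFaulted:
		return "faulted"
	default:
		return fmt.Sprintf("AgentState(%d)", uint8(s))
	}
}

func main() {
	fmt.Println(StateRunning, StateFaulted, AgentState(9))
}
```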

type BackgroundAgent

type BackgroundAgent interface {
	// Run starts the main loop. Blocks until ctx is cancelled or a
	// fatal error occurs. Returns nil on a graceful shutdown via
	// ctx, an error on a fatal failure. The results of individual
	// units of work are published through EventAgentCycle.
	Run(ctx context.Context) error

	// Status returns the current state and the last error. Must be
	// safe for concurrent calls with Run.
	Status() (AgentState, error)
}

BackgroundAgent is the base lifecycle contract of a background agent. It is a public SPI: the host application can implement custom agents such as bespoke validators, metric exporters, or business-specific integrations.

Conventions for AgentType (used in EventAgent* payloads):

  • Built-in agents: short names without a prefix ("gc", "scrub", "snapshot", "ingester", "ejector", "sync").
  • User agents: <namespace>.<name> ("acme.quota_monitor").

The event.Event.Type prefixes "core.", "agent.", "curator.", and "index." are reserved. User agents must emit their own events under their own namespace.

type Ejector

type Ejector interface {
	BackgroundAgent

	// Eject schedules the materialisation of the given artifact
	// at targetPath. The method does not block on the physical
	// copy. It returns immediately with an error on a full queue or
	// a missing artifact. The execution result (success/failure)
	// is delivered through EventAgentCycle or EventAgentFailed.
	Eject(ctx context.Context, id domain.ArtifactID, targetPath string) error
}

Ejector materialises artifacts into the host OS environment on demand. User-managed: created by the host application explicitly. It uses a background worker pool for heavy I/O, which fits the BackgroundAgent model: Run drives the pool, and Eject submits tasks to it.

func NewEjector

func NewEjector(
	source core.DataStore,
	bus event.EventBus,
	cfg EjectorConfig,
) (Ejector, error)

NewEjector creates an Ejector instance. TODO(M6.3): host-driven artifact ejection.

type EjectorConfig

type EjectorConfig struct {
	// TempDir is the directory for ejected files. Should reside
	// on the same filesystem as the Location to allow efficient
	// Clone (CoW).
	TempDir string

	// Concurrency is the worker-pool size.
	Concurrency int

	// QueueSize is the depth of the task queue. Eject returns
	// ErrEjectorQueueFull when the queue is full.
	QueueSize int
}

EjectorConfig configures the Ejector.

type GCAgent

type GCAgent interface {
	BackgroundAgent

	// RunOnce executes one full Mark+Sweep cycle and returns. Used
	// for manual runs and tests; unlike Run it does not block on
	// ScanInterval.
	RunOnce(ctx context.Context) (GCStats, error)
}

GCAgent is the background reaper of orphan blobs governed by Two-Phase Deletion. User-managed: Curator does not start the GC automatically — the deletion policy is a deployment-specific decision.

func NewGCAgent

func NewGCAgent(
	store core.Store,
	bus event.EventBus,
	cfg GCConfig,
) (GCAgent, error)

NewGCAgent creates a GC Agent. It takes core.Store rather than DataStore because GC needs both halves of the contract: AdminStore for reading StoreConfig (GCLeasePolicy, TombstoneGracePeriod, DeletionPolicy) and DataStore for WalkSystem (lease coordination).

TODO(M3.2): two-phase GC with tombstone reaping.

type GCConfig

type GCConfig struct {
	// ScanInterval is the interval between Mark+Sweep cycles.
	ScanInterval time.Duration

	// BatchSize is the number of blobs in a single StoreIndex
	// transaction during a scan.
	BatchSize int

	// LeaseTTL is the hold time for gc/lease under
	// GCLeasePolicy: LeaderElection. Renew runs at half the TTL.
	LeaseTTL time.Duration

	// CompactionEnabled toggles compaction of partially dead .pack
	// volumes. The default in v1 is false (a deferred feature; see
	// docs/2. Internals/05 Asynchronous Engine §5.3.7).
	CompactionEnabled bool

	// CompactionThreshold — a dead_ratio at or above this value
	// triggers compaction. Default 0.5.
	CompactionThreshold float64

	// MinPackAge — packs younger than this are not compacted.
	// Fresh packs may collapse on their own without paying a
	// repacking cost.
	MinPackAge time.Duration

	// MaxCompactionsPerCycle bounds the I/O load per GC cycle.
	MaxCompactionsPerCycle int
}

GCConfig configures the GC Agent.

type GCStats

type GCStats struct {
	ScannedBlobs        int64
	MarkedBlobs         int64
	RemovedBlobs        int64
	FreedBytes          int64
	CompactedPacks      int64
	CompactedFreedBytes int64
}

GCStats are the statistics of a single GC cycle.

type IngestMode

type IngestMode string

IngestMode is the operating mode of the Ingester.

const (
	// IngestModeOneShot — a single sweep of SourcePath; the agent
	// finishes once every found file has been processed.
	IngestModeOneShot IngestMode = "one-shot"

	// IngestModeWatch — continuous observation of SourcePath via
	// native OS mechanisms (fsnotify/inotify/FSEvents) when the
	// driver reports CapWatch; otherwise polling. Requires a
	// StateFile for resumable semantics.
	IngestModeWatch IngestMode = "watch"
)

type Ingester

type Ingester interface {
	BackgroundAgent

	// ForceCommit immediately commits the accumulated batch
	// regardless of BatchSize/FlushTimeout. Used before an external
	// event (log rotation, snapshot, graceful shutdown).
	ForceCommit(ctx context.Context) error
}

Ingester is the background agent that captures data from an external source.

func NewIngester

func NewIngester(
	source driver.Driver,
	target core.DataStore,
	bus event.EventBus,
	cfg IngesterConfig,
) (Ingester, error)

NewIngester creates an Ingester instance. User-managed: started by the host application explicitly. TODO(M6.3): fsnotify-driven ingestion.

Returns errs.ErrIngesterNoState when cfg.Mode is Watch and no StateFile is set.

type IngesterConfig

type IngesterConfig struct {
	// SourcePath is the source's root directory or URI. Interpreted
	// by the driver.Driver passed to NewIngester.
	SourcePath string

	// Mode is OneShot or Watch.
	Mode IngestMode

	// BatchSize is the maximum number of files in a single flush.
	BatchSize int

	// FlushTimeout is the maximum waiting time for a batch in Watch
	// mode before a forced flush.
	FlushTimeout time.Duration

	// Concurrency is the number of parallel workers used for
	// hashing/transformation.
	Concurrency int

	// StateFile is the path to the cursor file. Required in Watch
	// mode.
	StateFile string
}

IngesterConfig is the configuration of a single Ingester. Each instance serves exactly one external source.

type ScrubAgent

type ScrubAgent interface {
	BackgroundAgent

	// RunOnce performs one full verification pass over every blob
	// whose last_verified_at is older than MaxAge and returns. Used
	// for ad hoc runs after media-corruption suspicions.
	RunOnce(ctx context.Context) (ScrubStats, error)
}

ScrubAgent is the background blob-integrity verifier. Curator-managed: Curator automatically launches a single Scrub Agent for every registered Target Store.

func NewScrubAgent

func NewScrubAgent(
	store core.Store,
	bus event.EventBus,
	cfg ScrubConfig,
) (ScrubAgent, error)

NewScrubAgent creates a Scrub Agent instance. The constructor is public — the host application can create a ScrubAgent manually for a one-shot run or a custom integration. It is not required for normal operation under Curator: Curator creates instances on its own.

TODO(M3.3): blob verification with high-water-mark cursor.

type ScrubConfig

type ScrubConfig struct {
	// Enabled toggles background verification.
	Enabled bool

	// ScanInterval is the interval between verification cycles.
	ScanInterval time.Duration

	// MaxAge — blobs whose last_verified_at is older than
	// now() - MaxAge are eligible for verification.
	MaxAge time.Duration

	// MaxAgeNativeChecksum is an extended MaxAge for blobs on
	// media that report CapNativeChecksum. Silent bit rot is
	// impossible there, so the verification rate can be lowered.
	MaxAgeNativeChecksum time.Duration

	// BatchSize is the number of blobs in a single StoreIndex
	// fetch.
	BatchSize int
}

ScrubConfig configures the Scrub Agent. The same type is also re-exported through curator.ScrubConfig for passing into curator.WithScrubConfig (Curator-managed launch).

type ScrubStats

type ScrubStats struct {
	ScannedBlobs  int64
	VerifiedBlobs int64
	FailedBlobs   int64
}

ScrubStats are the statistics of a single Scrub cycle.

type SnapshotAgent

type SnapshotAgent interface {
	BackgroundAgent

	// TakeSnapshot forces a snapshot regardless of Interval and
	// ArtifactThreshold. Used before critical maintenance
	// operations (RebuildIndex, MigrateIndex).
	TakeSnapshot(ctx context.Context) (SnapshotStats, error)
}

SnapshotAgent is the background creator of StoreIndex snapshots via VacuumInto + packing into the CAS. Curator-managed: launched for every Target Store with an available StoreIndex.

The Snapshot Agent only creates snapshots. StoreIndex recovery from a snapshot is the job of RebuildIndexAgent (maintenance), which uses a fresh snapshot as the starting point and reads in the new manifests through ListObjectsWithModTime.

func NewSnapshotAgent

func NewSnapshotAgent(
	store core.Store,
	bus event.EventBus,
	cfg SnapshotConfig,
) (SnapshotAgent, error)

NewSnapshotAgent creates a Snapshot Agent instance.

type SnapshotConfig

type SnapshotConfig struct {
	// Enabled toggles background snapshotting.
	Enabled bool

	// Interval is the periodic snapshot interval.
	Interval time.Duration

	// ArtifactThreshold also triggers a snapshot once this many
	// new artifacts have been added since the previous snapshot.
	ArtifactThreshold int

	// Retention is the number of snapshots to keep; older ones are
	// removed.
	Retention int

	// RecoveryOverlap is the safety margin applied when loading a
	// snapshot: RebuildIndexAgent re-reads objects that appeared
	// after snapshot_created_at - RecoveryOverlap. It guards
	// against the edge case "an object was written between the
	// snapshot and the crash".
	RecoveryOverlap time.Duration
}

SnapshotConfig configures the Snapshot Agent.

type SnapshotStats

type SnapshotStats struct {
	SnapshotID  string
	DBBytes     int64
	ArtifactsAt int64
	CreatedAt   time.Time
}

SnapshotStats are the statistics of a single snapshot.

type SyncAgent

type SyncAgent interface {
	BackgroundAgent

	// Trigger schedules an out-of-band synchronisation of the
	// given artifact between the Target and the Backup Store
	// outside the event queue.
	Trigger(ctx context.Context, artifactID domain.ArtifactID) error
}

SyncAgent is the background replicator of artifacts between a Target and a Backup Store.

Status: Reserved. The interface is fixed for Curator API stability; the implementation is deferred until a separate decision on the Reconciliation mechanism (event-driven, pull/push, quarantine) is made.
