Documentation
Overview
Package agent contains the contracts and implementations of the background agents: Ingester, GC, Scrub, Snapshot, Sync, and Ejector. It is a coreutils-style maintenance toolkit for the store: it automates routine maintenance work so that the host application does not have to hand-roll that logic itself.
Two lifecycle modalities:
- BackgroundAgent — cyclic or continuous work. Implemented here.
- MaintenanceAgent — one-shot operation. The contract lives in core (see core.MaintenanceAgent); the implementations live in maintenance/.
Two ownership modalities:
- Curator-managed (Scrub, Snapshot) — automatically launched by Curator for every registered Target.
- User-managed (Ingester, GC, Ejector) — created and started by the host application explicitly through the package constructors.
Import DAG: agent imports core, driver, and event. It does not import curator, maintenance, or projection.
Implementations land in M3 (GC, Scrub, Snapshot, RebuildIndex) and M6 (Ingester, Ejector). M0 provides only the contracts and configuration types.
Index
- Constants
- type AgentFailedPayload
- type AgentProgressPayload
- type AgentStartedPayload
- type AgentState
- type BackgroundAgent
- type Ejector
- type EjectorConfig
- type GCAgent
- type GCConfig
- type GCStats
- type IngestMode
- type Ingester
- type IngesterConfig
- type ScrubAgent
- type ScrubConfig
- type ScrubStats
- type SnapshotAgent
- type SnapshotConfig
- type SnapshotStats
- type SyncAgent
Constants
const (
	// EventAgentStarted: Run entered its main loop. Emitted once
	// per Run call.
	EventAgentStarted = "agent.started"
	// EventAgentProgress: periodic progress snapshot. The emission
	// rate is agent-specific; defaults are documented at each
	// concrete agent.
	EventAgentProgress = "agent.progress"
	// EventAgentCycle: one unit of work completed (a full GC
	// pass, a single Scrub batch, one Ingester flush). Payload:
	// core.AgentResult with the cycle's stats. EventAgentCompleted
	// reuses the same payload shape; the difference is semantic:
	// Cycle means "one unit done, agent continues", Completed
	// means "agent finished, Run returned".
	EventAgentCycle = "agent.cycle"
	// EventAgentCompleted: Run returned cleanly (only meaningful
	// for one-shot agents such as Snapshot or RebuildIndex).
	// Background agents in steady state emit Cycle, not Completed.
	EventAgentCompleted = "agent.completed"
	// EventAgentFailed: Run returned with a fatal error.
	EventAgentFailed = "agent.failed"
	// EventAgentStopped: graceful shutdown via context
	// cancellation.
	EventAgentStopped = "agent.stopped"
	// EventAgentCancelled: Run aborted before completion (host
	// requested cancellation, lease lost, etc.).
	EventAgentCancelled = "agent.cancelled"
	// EventAgentStaleLease: the agent took over a lease whose owner
	// stopped renewing it. Payload: core.LeaseTakeoverPayload, the
	// same payload type carried by core.EventStaleLeaseTakeover (see
	// core/events.go). The struct lives in core because the
	// stale-lease concept is shared with the core-level Store lease
	// takeover; declaring it once and reusing it keeps the two
	// events decoder-compatible.
	EventAgentStaleLease = "agent.stale_lease"
)
Agent lifecycle events. Emitted by every BackgroundAgent (built-in or user-defined) at well-defined points in its run loop.
The host application tells agents apart by the AgentType field in the payload: the same set of event-type constants is reused for "gc", "scrub", "ingester", and any custom agent the host has registered.
All four event-type prefixes ("core.", "agent.", "curator.", "index.") are reserved per docs/2. Internals/01 §1.7. User agents must emit their own events under their own namespace (e.g. "acme.quota_monitor.threshold_exceeded").
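As a rough illustration of filtering by AgentType, the sketch below keeps only the progress events that come from one agent. It assumes a simplified, hypothetical Event struct with Type and Payload fields (the real event.Event and EventBus APIs are not shown on this page) and uses AgentProgressPayload because it is the only payload whose fields are listed here; progressFor is an illustrative helper, not part of the package.

```go
package main

import "fmt"

// Event is a hypothetical, simplified stand-in for event.Event: only the
// Type and Payload fields are assumed.
type Event struct {
	Type    string
	Payload any
}

// AgentProgressPayload mirrors the struct documented in this package.
type AgentProgressPayload struct {
	AgentType   string
	StoreID     string
	Processed   int64
	Total       int64
	CurrentItem string
}

const (
	EventAgentStarted  = "agent.started"
	EventAgentProgress = "agent.progress"
)

// progressFor keeps only progress events whose payload names agentType.
// Every agent shares the same Type constant, so Type alone is not enough.
func progressFor(events []Event, agentType string) []AgentProgressPayload {
	var out []AgentProgressPayload
	for _, ev := range events {
		if ev.Type != EventAgentProgress {
			continue
		}
		if p, ok := ev.Payload.(AgentProgressPayload); ok && p.AgentType == agentType {
			out = append(out, p)
		}
	}
	return out
}

func main() {
	events := []Event{
		{Type: EventAgentStarted, Payload: nil},
		{Type: EventAgentProgress, Payload: AgentProgressPayload{AgentType: "gc", Processed: 10}},
		{Type: EventAgentProgress, Payload: AgentProgressPayload{AgentType: "scrub", Processed: 4}},
	}
	for _, p := range progressFor(events, "gc") {
		fmt.Printf("%s processed %d\n", p.AgentType, p.Processed)
	}
}
```

A real subscriber would apply the same check inside its EventBus handler instead of over a slice.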
Variables
This section is empty.
Functions
This section is empty.
Types
type AgentFailedPayload
AgentFailedPayload is the payload of EventAgentFailed.
type AgentProgressPayload
type AgentProgressPayload struct {
AgentType string
StoreID string
Processed int64
Total int64
CurrentItem string
}
AgentProgressPayload is the payload of EventAgentProgress. Total is 0 when the total amount of work is unknown up front (for example, in a continuous loop).
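A consumer that renders progress has to handle the unknown-total case explicitly. A minimal sketch, assuming the payload has already been decoded into the struct above; fraction and describe are hypothetical helpers, and clamping to 100% is an assumed choice rather than documented behaviour.

```go
package main

import "fmt"

// AgentProgressPayload mirrors the struct documented above.
type AgentProgressPayload struct {
	AgentType   string
	StoreID     string
	Processed   int64
	Total       int64
	CurrentItem string
}

// fraction returns the completed share in [0, 1]; ok is false when Total
// is 0, i.e. the amount of work is not known up front.
func fraction(p AgentProgressPayload) (f float64, ok bool) {
	if p.Total <= 0 {
		return 0, false
	}
	f = float64(p.Processed) / float64(p.Total)
	if f > 1 {
		f = 1 // assumed: clamp if Processed overtakes a stale Total
	}
	return f, true
}

// describe renders one progress line, falling back to a bare counter.
func describe(p AgentProgressPayload) string {
	if f, ok := fraction(p); ok {
		return fmt.Sprintf("%s: %.0f%% (%d/%d)", p.AgentType, f*100, p.Processed, p.Total)
	}
	return fmt.Sprintf("%s: %d processed", p.AgentType, p.Processed)
}

func main() {
	fmt.Println(describe(AgentProgressPayload{AgentType: "scrub", Processed: 25, Total: 100}))
	fmt.Println(describe(AgentProgressPayload{AgentType: "ingester", Processed: 7}))
}
```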
type AgentStartedPayload
AgentStartedPayload is the payload of EventAgentStarted.
type AgentState
type AgentState uint8
AgentState is the state of a background agent reported by Status.
const (
	// StateIdle: Run has not been started yet, or it has finished
	// cleanly.
	StateIdle AgentState = iota
	// StateRunning: Run is active; the current unit of work is in
	// flight.
	StateRunning
	// StatePaused: reserved. Not used by any built-in agent in
	// v1; the slot is held for the future "auto-pause under
	// pressure" backlog item.
	StatePaused
	// StateFaulted: Run finished with an error; Status returns a
	// non-nil error explaining the cause.
	StateFaulted
)
type BackgroundAgent
type BackgroundAgent interface {
// Run starts the main loop. Blocks until ctx is cancelled or a
// fatal error occurs. Returns nil on a graceful shutdown via
// ctx, an error on a fatal failure. The results of individual
// units of work are published through EventAgentCycle.
Run(ctx context.Context) error
// Status returns the current state and the last error. Must be
// safe for concurrent calls with Run.
Status() (AgentState, error)
}
BackgroundAgent is the base lifecycle contract of a background agent. It is a public SPI: the host application can implement custom agents such as bespoke validators, metric exporters, or business-specific integrations.
Conventions for AgentType (used in EventAgent* payloads):
- Built-in agents: short names without a prefix ("gc", "scrub", "snapshot", "ingester", "ejector", "sync").
- User agents: <namespace>.<name> ("acme.quota_monitor").
The event.Event.Type prefixes "core.", "agent.", "curator.", and "index." are reserved. User agents must emit their own events under their own namespace.
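The naming rules above are easy to check mechanically, for example in a host's agent-registration code. This is a sketch under the assumption that "own namespace" means at least one non-empty dot-separated segment before the name; validUserAgentType and validUserEventType are hypothetical helpers, not package API.

```go
package main

import (
	"fmt"
	"strings"
)

// reservedPrefixes lists the event.Event.Type prefixes reserved by the library.
var reservedPrefixes = []string{"core.", "agent.", "curator.", "index."}

// validUserAgentType reports whether t has the <namespace>.<name> form.
func validUserAgentType(t string) bool {
	ns, name, ok := strings.Cut(t, ".")
	return ok && ns != "" && name != ""
}

// validUserEventType reports whether a user agent's event type avoids the
// reserved prefixes and carries its own namespace.
func validUserEventType(t string) bool {
	for _, p := range reservedPrefixes {
		if strings.HasPrefix(t, p) {
			return false
		}
	}
	ns, rest, ok := strings.Cut(t, ".")
	return ok && ns != "" && rest != ""
}

func main() {
	for _, t := range []string{"acme.quota_monitor", "quota_monitor"} {
		fmt.Println(t, validUserAgentType(t))
	}
	fmt.Println(validUserEventType("agent.threshold_exceeded"))
}
```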
type Ejector
type Ejector interface {
BackgroundAgent
// Eject schedules the materialisation of the given artifact
// at targetPath. The method does not block on the physical
// copy. It returns immediately with an error on a full queue or
// a missing artifact. The execution result (success/failure)
// is delivered through EventAgentCycle or EventAgentFailed.
Eject(ctx context.Context, id domain.ArtifactID, targetPath string) error
}
Ejector materialises artifacts into the host OS environment on demand. User-managed: created by the host application explicitly. Because it uses a background worker pool for heavy I/O, it fits the BackgroundAgent model: Run drives the pool, and Eject submits tasks to it.
func NewEjector
NewEjector creates an Ejector instance. TODO(M6.3): host-driven artifact ejection.
type EjectorConfig
type EjectorConfig struct {
// TempDir is the directory for ejected files. It should reside
// on the same filesystem as the Location so that files can be
// cloned efficiently (CoW).
TempDir string
// Concurrency is the worker-pool size.
Concurrency int
// QueueSize is the depth of the task queue. Eject returns
// ErrEjectorQueueFull when the queue is full.
QueueSize int
}
EjectorConfig configures the Ejector.
type GCAgent
type GCAgent interface {
BackgroundAgent
// RunOnce executes one full Mark+Sweep cycle and returns. Used
// for manual runs and tests; unlike Run, it does not wait for
// ScanInterval.
RunOnce(ctx context.Context) (GCStats, error)
}
GCAgent is the background reaper of orphan blobs governed by Two-Phase Deletion. User-managed: Curator does not start the GC automatically — the deletion policy is a deployment-specific decision.
func NewGCAgent
NewGCAgent creates a GC Agent. It takes core.Store rather than DataStore because GC needs both halves of the contract: AdminStore to read StoreConfig (GCLeasePolicy, TombstoneGracePeriod, DeletionPolicy) and DataStore for WalkSystem (lease coordination).
TODO(M3.2): two-phase GC with tombstone reaping.
type GCConfig
type GCConfig struct {
// ScanInterval is the interval between Mark+Sweep cycles.
ScanInterval time.Duration
// BatchSize is the number of blobs in a single StoreIndex
// transaction during a scan.
BatchSize int
// LeaseTTL is how long gc/lease is held when GCLeasePolicy is
// LeaderElection. The lease is renewed at half the TTL.
LeaseTTL time.Duration
// CompactionEnabled toggles compaction of partially dead .pack
// volumes. The default in v1 is false (a deferred feature; see
// docs/2. Internals/05 Asynchronous Engine §5.3.7).
CompactionEnabled bool
// CompactionThreshold — a dead_ratio at or above this value
// triggers compaction. Default 0.5.
CompactionThreshold float64
// MinPackAge: packs younger than this are not compacted.
// A fresh pack may still become entirely dead on its own and
// then be removed without paying any repacking cost.
// MaxCompactionsPerCycle bounds the I/O load per GC cycle.
MaxCompactionsPerCycle int
}
GCConfig configures the GC Agent.
type GCStats
type GCStats struct {
ScannedBlobs int64
MarkedBlobs int64
RemovedBlobs int64
FreedBytes int64
CompactedPacks int64
CompactedFreedBytes int64
}
GCStats are the statistics of a single GC cycle.
type IngestMode
type IngestMode string
IngestMode is the operating mode of the Ingester.
const (
	// IngestModeOneShot: a single sweep of SourcePath; the agent
	// finishes once every found file has been processed.
	IngestModeOneShot IngestMode = "one-shot"
	// IngestModeWatch: continuous observation of SourcePath via
	// native OS mechanisms (fsnotify/inotify/FSEvents) when the
	// driver reports CapWatch; otherwise polling. Requires a
	// StateFile for resumable semantics.
	IngestModeWatch IngestMode = "watch"
)
type Ingester
type Ingester interface {
BackgroundAgent
// ForceCommit immediately commits the accumulated batch
// regardless of BatchSize/FlushTimeout. Used before an external
// event (log rotation, snapshot, graceful shutdown).
ForceCommit(ctx context.Context) error
}
Ingester is the background agent that captures data from an external source.
func NewIngester
func NewIngester(
	source driver.Driver,
	target core.DataStore,
	bus event.EventBus,
	cfg IngesterConfig,
) (Ingester, error)
NewIngester creates an Ingester instance. User-managed: started by the host application explicitly. TODO(M6.3): fsnotify-driven ingestion.
Returns errs.ErrIngesterNoState when cfg.Mode is Watch and no StateFile is set.
type IngesterConfig
type IngesterConfig struct {
// SourcePath is the source's root directory or URI. Interpreted
// by the driver.Driver passed to NewIngester.
SourcePath string
// Mode is OneShot or Watch.
Mode IngestMode
// BatchSize is the maximum number of files in a single flush.
BatchSize int
// FlushTimeout is the maximum waiting time for a batch in Watch
// mode before a forced flush.
FlushTimeout time.Duration
// Concurrency is the number of parallel workers used for
// hashing/transformation.
Concurrency int
// StateFile is the path to the cursor file. Required in Watch
// mode.
StateFile string
}
IngesterConfig is the configuration of a single Ingester: one instance per external source.
type ScrubAgent
type ScrubAgent interface {
BackgroundAgent
// RunOnce performs one full verification pass over every blob
// whose last_verified_at is older than MaxAge and returns. Used
// for ad hoc runs when media corruption is suspected.
RunOnce(ctx context.Context) (ScrubStats, error)
}
ScrubAgent is the background blob-integrity verifier. Curator-managed: Curator automatically launches a single Scrub Agent for every registered Target Store.
func NewScrubAgent
func NewScrubAgent(
	store core.Store,
	bus event.EventBus,
	cfg ScrubConfig,
) (ScrubAgent, error)
NewScrubAgent creates a Scrub Agent instance. The constructor is public — the host application can create a ScrubAgent manually for a one-shot run or a custom integration. It is not required for normal operation under Curator: Curator creates instances on its own.
TODO(M3.3): blob verification with high-water-mark cursor.
type ScrubConfig
type ScrubConfig struct {
// Enabled toggles background verification.
Enabled bool
// ScanInterval is the interval between verification cycles.
ScanInterval time.Duration
// MaxAge — blobs whose last_verified_at is older than
// now() - MaxAge are eligible for verification.
MaxAge time.Duration
// MaxAgeNativeChecksum is an extended MaxAge for blobs on
// media that report CapNativeChecksum. Such media detect silent
// bit rot themselves, so the verification rate can be lowered.
// BatchSize is the number of blobs in a single StoreIndex
// fetch.
BatchSize int
}
ScrubConfig configures the Scrub Agent. The same type is also re-exported through curator.ScrubConfig for passing into curator.WithScrubConfig (Curator-managed launch).
type ScrubStats
ScrubStats are the statistics of a single Scrub cycle.
type SnapshotAgent
type SnapshotAgent interface {
BackgroundAgent
// TakeSnapshot forces a snapshot regardless of Interval and
// ArtifactThreshold. Used before critical maintenance
// operations (RebuildIndex, MigrateIndex).
TakeSnapshot(ctx context.Context) (SnapshotStats, error)
}
SnapshotAgent is the background creator of StoreIndex snapshots: it takes them with VacuumInto and packs them into the CAS. Curator-managed: launched for every Target Store with an available StoreIndex.
Snapshot Agent only creates snapshots. Recovering StoreIndex from a snapshot is the job of RebuildIndexAgent (maintenance), which uses the most recent snapshot as its starting point and reads in newer manifests through ListObjectsWithModTime.
func NewSnapshotAgent
func NewSnapshotAgent(
	store core.Store,
	bus event.EventBus,
	cfg SnapshotConfig,
) (SnapshotAgent, error)
NewSnapshotAgent creates a Snapshot Agent instance.
type SnapshotConfig
type SnapshotConfig struct {
// Enabled toggles background snapshotting.
Enabled bool
// Interval is the periodic snapshot interval.
Interval time.Duration
// ArtifactThreshold also triggers a snapshot once this many
// new artifacts have been added since the previous snapshot.
ArtifactThreshold int
// Retention is the number of snapshots to keep; older ones are
// removed.
Retention int
// RecoveryOverlap widens the replay window: when loading a
// snapshot, RebuildIndexAgent re-reads objects that appeared
// after snapshot_created_at - RecoveryOverlap. It guards
// against the edge case "an object was written between the
// snapshot and the crash".
}
SnapshotConfig configures the Snapshot Agent.
type SnapshotStats
SnapshotStats are the statistics of a single snapshot.
type SyncAgent
type SyncAgent interface {
BackgroundAgent
// Trigger schedules an out-of-band synchronisation of the
// given artifact between the Target and the Backup Store,
// bypassing the event queue.
Trigger(ctx context.Context, artifactID domain.ArtifactID) error
}
SyncAgent is the background replicator of artifacts between a Target and a Backup Store.
Status: Reserved. The interface is fixed for Curator API stability; the implementation is deferred until a separate decision on the Reconciliation mechanism (event-driven, pull/push, quarantine) is made.