Documentation ¶
Overview ¶
Package curator is the orchestrator at layer L3. It unites several Stores under a single management surface: routing, transit buffering (HostStorage), cross-store deduplication, transparent decorators (bundler, chunker), and background services (Scrub, Snapshot).
Curator is optional: the minimal stack (Driver + Store + StoreIndex) works without it. Curator implements core.DataStore — the user-facing artifact API; administrative operations (Unlock, RotateKEK, SetMaintenanceMode) do not exist at the Curator level and are performed per-store via Curator.Store(id).
Subpackages:
- curator/bundler — small-blob packing decorator for .pack volumes.
- curator/chunker — CDC chunker decorator.
- curator/host — internal HostStorage package. It has no public API; configuration goes through curator.WithHostStorage.
DAG: curator imports core, driver, event. It does not import agent, maintenance, or projection.
Index ¶
- Constants
- type AfterBackup
- type BackupConfig
- type BackupUnavailablePayload
- type Curator
- type CuratorOption
- func WithBackup(targetID string, store core.Store, cfg BackupConfig, ...) CuratorOption
- func WithEventBus(bus event.EventBus) CuratorOption
- func WithHostStorage(localDrv driver.Driver, cfg HostStorageConfig) CuratorOption
- func WithMetadataRouter(fn MetadataRouter) CuratorOption
- func WithMultistoreIndex(idx MultistoreIndex) CuratorOption
- func WithRoutingFunc(fn RoutingFunc) CuratorOption
- func WithScrubConfig(cfg agent.ScrubConfig) CuratorOption
- func WithSnapshotConfig(cfg agent.SnapshotConfig) CuratorOption
- func WithStore(id string, store core.Store, cfg StoreRegistrationConfig, ...) CuratorOption
- type CuratorWarningPayload
- type DrainCompletedPayload
- type DrainNoTargetPayload
- type DrainQuarantinedPayload
- type DrainRequeuedPayload
- type DrainRetryPayload
- type EvictionPolicy
- type HostAdmin
- type HostStorage
- type HostStorageConfig
- type HostStorageFullPayload
- type HostStoragePressurePayload
- type HostStorageStats
- type MetadataRouter
- type MultistoreIndex
- type OnDrainNoTarget
- type OnHostStorageFull
- type OnUnavailable
- type QuarantineFilter
- type QuarantinedItem
- type ReadCost
- type ReadPolicy
- type ReplicationLagPayload
- type RoutingFunc
- type RoutingMetadata
- type StoreRegistrationConfig
- type StoreTarget
- type StoreUnreachablePayload
- type TransitStore
- type WrapperDeps
- type WrapperFactory
- type WriteStrategy
Constants ¶
const (
	EventHostStoragePressure = "curator.host_storage_pressure"
	EventHostStorageFull     = "curator.host_storage_full"
	EventReplicationLag      = "curator.replication_lag"
	EventStoreUnreachable    = "curator.store_unreachable"
	EventDrainCompleted      = "curator.drain_completed"
	EventDrainNoTarget       = "curator.drain_no_target"
	EventDrainQuarantined    = "curator.drain_quarantined"
	EventDrainRequeued       = "curator.drain_requeued"
	EventDrainRetry          = "curator.drain_retry"
	EventCuratorWarning      = "curator.warning"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AfterBackup ¶
type AfterBackup string
AfterBackup controls what happens to the original in the Target after a successful backup.
const (
	AfterBackupKeep   AfterBackup = "Keep"
	AfterBackupDelete AfterBackup = "Delete"
)
type BackupConfig ¶
type BackupConfig struct {
PhysicalCopy bool
ReadPolicy ReadPolicy
AfterBackup AfterBackup
Priority int
}
BackupConfig holds the parameters for registering a Backup Store via WithBackup.
type BackupUnavailablePayload ¶
type BackupUnavailablePayload struct {
}
type Curator ¶
type Curator interface {
core.DataStore
// MultistoreIndex returns the global index when one has been
// registered. It is usually nil with a single Target Store.
MultistoreIndex() MultistoreIndex
// Store returns a registered Target Store by ID with the full
// core.Store interface, including administrative methods.
// Returns ErrStoreNotRegistered for an unknown ID.
Store(id string) (core.Store, error)
// Close stops every background process in order: Flush bundler →
// Drain HostStorage → stop agents → wait for active Get calls.
Close(ctx context.Context) error
// Stats returns a snapshot of the local transit-buffer state.
Stats(ctx context.Context) (HostStorageStats, error)
}
Curator is the L3 facade. It implements core.DataStore, adds access to registered Stores, transit management, and graceful shutdown.
func New ¶
func New(opts ...CuratorOption) (Curator, error)
New creates a Curator. It applies the options, validates the configuration (the Rules Engine for forbidden combinations), and starts the background services (Scrub, Snapshot) for the registered Targets.
Implementation lands in M4.
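The option-then-validate flow of New can be sketched with the functional-options pattern. This is a minimal illustration, not the package's implementation: the fields of curatorOptions, the lowercase helper names, and the use of plain strings for decorators are all assumptions. The two rules checked are the ones documented for WithBackup (no chunker on a Backup) and WithHostStorage (bundler/chunker need a transit buffer).

```go
package main

import (
	"errors"
	"fmt"
)

// curatorOptions is a hypothetical stand-in for the unexported options struct.
type curatorOptions struct {
	hasHostStorage bool
	targetWrappers []string // decorator names attached to Target Stores
	backupWrappers []string // decorator names attached to Backup Stores
}

// CuratorOption mirrors the documented signature.
type CuratorOption func(*curatorOptions)

func withHostStorage() CuratorOption {
	return func(o *curatorOptions) { o.hasHostStorage = true }
}

func withStore(wrappers ...string) CuratorOption {
	return func(o *curatorOptions) { o.targetWrappers = append(o.targetWrappers, wrappers...) }
}

func withBackup(wrappers ...string) CuratorOption {
	return func(o *curatorOptions) { o.backupWrappers = append(o.backupWrappers, wrappers...) }
}

// build applies every option first and only then validates the combined result.
func build(opts ...CuratorOption) (*curatorOptions, error) {
	o := &curatorOptions{}
	for _, opt := range opts {
		opt(o)
	}
	for _, w := range o.backupWrappers {
		if w == "chunker" {
			return nil, errors.New("chunker is forbidden on a Backup")
		}
	}
	all := append(append([]string{}, o.targetWrappers...), o.backupWrappers...)
	for _, w := range all {
		if (w == "bundler" || w == "chunker") && !o.hasHostStorage {
			return nil, errors.New("bundler/chunker decorators require HostStorage")
		}
	}
	return o, nil
}

func main() {
	_, err := build(withStore("bundler"), withBackup("chunker"), withHostStorage())
	fmt.Println(err) // chunker is forbidden on a Backup
}
```

Validating after all options have run keeps the checks independent of the order in which the caller lists the options.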
type CuratorOption ¶
type CuratorOption func(*curatorOptions)
CuratorOption is an option for the Curator constructor (New).
func WithBackup ¶
func WithBackup(targetID string, store core.Store, cfg BackupConfig, wrappers ...WrapperFactory) CuratorOption
WithBackup registers a Backup Store for a specific Target. Decorators are applied as in WithStore. Note: chunker.Wrapper on a Backup is forbidden by the Rules Engine (see docs/4. API Reference/05 Configuration §5.5).
func WithEventBus ¶
func WithEventBus(bus event.EventBus) CuratorOption
WithEventBus provides the event bus. Curator uses it to emit curator.* events and forwards it to registered Stores through Publisher.
func WithHostStorage ¶
func WithHostStorage(localDrv driver.Driver, cfg HostStorageConfig) CuratorOption
WithHostStorage registers the local-disk driver for the transit buffer. Only one can be registered per Curator. Without HostStorage, the Local/Replicated/HostBuffered strategies and the bundler/chunker decorators are not available.
func WithMetadataRouter ¶
func WithMetadataRouter(fn MetadataRouter) CuratorOption
WithMetadataRouter provides the function that reconstructs RoutingHints from Manifest.Metadata at deferred-Drain time.
func WithMultistoreIndex ¶
func WithMultistoreIndex(idx MultistoreIndex) CuratorOption
WithMultistoreIndex provides the global-index implementation. Usually not required with a single Target Store.
func WithRoutingFunc ¶
func WithRoutingFunc(fn RoutingFunc) CuratorOption
WithRoutingFunc provides the function that selects Target Stores at write time.
func WithScrubConfig ¶
func WithScrubConfig(cfg agent.ScrubConfig) CuratorOption
WithScrubConfig configures the Curator-managed Scrub Agent. Curator automatically launches a Scrub for every registered Target Store.
func WithSnapshotConfig ¶
func WithSnapshotConfig(cfg agent.SnapshotConfig) CuratorOption
WithSnapshotConfig configures the Curator-managed Snapshot Agent. Curator automatically launches a Snapshot for every registered Target Store with an available StoreIndex.
func WithStore ¶
func WithStore(id string, store core.Store, cfg StoreRegistrationConfig, wrappers ...WrapperFactory) CuratorOption
WithStore registers a Target Store with Curator. Decorators are applied "outside in": the first wrapper is closest to the client, the last is closest to the underlying store.
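The "outside in" order can be illustrated with a small sketch. The DataStore interface here is a one-method stand-in for core.DataStore, Wrapper is a simplified stand-in for WrapperFactory, and the layer names "first" and "second" are placeholders. Wrapping in reverse order is one way to make the first wrapper end up outermost.

```go
package main

import "fmt"

// DataStore is a one-method stand-in for core.DataStore.
type DataStore interface {
	Path() []string // layers a call passes through, outermost first
}

type base struct{}

func (base) Path() []string { return []string{"store"} }

// layer is a decorator that records its own name in front of the inner path.
type layer struct {
	name  string
	inner DataStore
}

func (l layer) Path() []string { return append([]string{l.name}, l.inner.Path()...) }

// Wrapper is a simplified stand-in for WrapperFactory.
type Wrapper func(DataStore) DataStore

func named(name string) Wrapper {
	return func(s DataStore) DataStore { return layer{name: name, inner: s} }
}

// applyWrappers wraps in reverse so that wrappers[0] ends up closest to the
// client and the last wrapper ends up closest to the underlying store.
func applyWrappers(s DataStore, wrappers ...Wrapper) DataStore {
	for i := len(wrappers) - 1; i >= 0; i-- {
		s = wrappers[i](s)
	}
	return s
}

func main() {
	s := applyWrappers(base{}, named("first"), named("second"))
	fmt.Println(s.Path()) // [first second store]
}
```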
type CuratorWarningPayload ¶
CuratorWarningPayload describes a non-blocking violation. Rule is a machine-readable rule identifier; Message is a human-readable description.
type DrainCompletedPayload ¶
type DrainCompletedPayload struct {
ArtifactID domain.ArtifactID
StoreID string
BlobRef string
}
type DrainNoTargetPayload ¶
type DrainNoTargetPayload struct {
ArtifactID domain.ArtifactID
BlobRef string
Namespace string
Reason string
}
type DrainQuarantinedPayload ¶
type DrainQuarantinedPayload struct {
ArtifactID domain.ArtifactID
BlobRef string
Namespace string
Reason string
}
type DrainRequeuedPayload ¶
type DrainRetryPayload ¶
type EvictionPolicy ¶
type EvictionPolicy string
EvictionPolicy is the eviction policy of HostStorage.
const (
	EvictionPolicyLRU      EvictionPolicy = "LRU"
	EvictionPolicyTTL      EvictionPolicy = "TTL"
	EvictionPolicyPressure EvictionPolicy = "Pressure"
)
type HostAdmin ¶
type HostAdmin interface {
// Drain transfers files from system.transit to the Target
// Stores. The route is computed by the Router at the moment of
// transfer (DL-01).
Drain(ctx context.Context) error
// Stats returns a snapshot of the current transit state.
// Distinct from Curator.Stats: HostAdmin.Stats is a synchronous
// read on the in-memory state of the transit buffer; Curator's
// own Stats wraps this with a context for the broader API.
Stats() HostStorageStats
// Recover restores HostStorage after a process crash: it
// cleans up .tmp files, checks locks, and re-indexes the
// transit area.
Recover(ctx context.Context) error
// Requeue moves a file (or files) from
// system.transit/quarantine/ back into the active Drain queue.
// The route will be recomputed during the next Drain (deferred
// routing). When artifactID is nil, every file in quarantine
// is requeued.
//
// Returns the number of files actually moved. Files missing
// from quarantine are silently skipped — the operation is
// idempotent.
Requeue(ctx context.Context, artifactID *domain.ArtifactID) (int, error)
// ListQuarantined returns a snapshot of the current quarantine
// state. It does not block Drain. QuarantineFilter provides
// pagination.
ListQuarantined(ctx context.Context, filter QuarantineFilter) ([]QuarantinedItem, error)
}
HostAdmin holds the administrative operations of HostStorage. They are accessible to Curator. Decorators see only TransitStore.
type HostStorage ¶
type HostStorage interface {
TransitStore
HostAdmin
}
HostStorage is the full HostStorage contract — the transit buffer on a fast local disk used by Curator for deferred writes to slow Target Stores, manifest caching with ManifestStorage: Local/Replicated, and buffering before bundler packing. Combines the per-blob surface (TransitStore, exposed to decorators via WrapperDeps) with the administrative surface (HostAdmin, used by Curator itself).
The implementation lives in curator/host (package host) and is constructed internally by Curator from a driver.Driver and a HostStorageConfig. Host applications never instantiate this type directly.
type HostStorageConfig ¶
type HostStorageConfig struct {
EvictionPolicy EvictionPolicy
OnHostStorageFull OnHostStorageFull
OnDrainNoTarget OnDrainNoTarget
SoftLimitBytes int64
HardLimitBytes int64
EventCooldown time.Duration
DrainInterval time.Duration
WorkspaceDir string
}
HostStorageConfig is the configuration of the transit buffer. WorkspaceDir is required.
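A pre-flight check on the configuration might look like the sketch below. Only the WorkspaceDir requirement comes from the text above; the rule that the soft limit must not exceed the hard limit is an assumption, and the struct is trimmed to the fields the check needs.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// HostStorageConfig is trimmed to the fields used by this sketch.
type HostStorageConfig struct {
	SoftLimitBytes int64
	HardLimitBytes int64
	DrainInterval  time.Duration
	WorkspaceDir   string
}

// validate is a hypothetical helper, not part of the documented API.
func validate(cfg HostStorageConfig) error {
	if cfg.WorkspaceDir == "" {
		return errors.New("WorkspaceDir is required")
	}
	// Assumption: a soft limit above the hard limit is a configuration mistake.
	if cfg.HardLimitBytes > 0 && cfg.SoftLimitBytes > cfg.HardLimitBytes {
		return errors.New("SoftLimitBytes exceeds HardLimitBytes")
	}
	return nil
}

func main() {
	cfg := HostStorageConfig{
		SoftLimitBytes: 8 << 30,
		HardLimitBytes: 10 << 30,
		DrainInterval:  30 * time.Second,
		WorkspaceDir:   "/var/lib/curator",
	}
	fmt.Println(validate(cfg)) // <nil>
}
```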
type HostStorageFullPayload ¶
type HostStorageStats ¶
type HostStorageStats struct {
TransitBytes int64 // bytes in transit (excluding quarantine)
TransitFiles int // number of files awaiting Drain
QuarantineBytes int64 // bytes in system.transit/quarantine/
QuarantineFiles int // number of files in quarantine
MaxTransitBytes int64 // hard limit from configuration
}
HostStorageStats is the current physical state of the transit buffer. The values are a snapshot at the moment of the request.
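A caller could turn such a snapshot into a fill ratio for dashboards or alerts. The helper below is hypothetical, and it assumes that only TransitBytes counts against MaxTransitBytes; whether quarantined bytes also count is not stated here.

```go
package main

import "fmt"

// HostStorageStats mirrors the documented struct.
type HostStorageStats struct {
	TransitBytes    int64
	TransitFiles    int
	QuarantineBytes int64
	QuarantineFiles int
	MaxTransitBytes int64
}

// utilization returns the fraction of the hard limit used by files awaiting
// Drain. It returns 0 when no hard limit is configured.
func utilization(s HostStorageStats) float64 {
	if s.MaxTransitBytes <= 0 {
		return 0
	}
	return float64(s.TransitBytes) / float64(s.MaxTransitBytes)
}

func main() {
	s := HostStorageStats{TransitBytes: 50, TransitFiles: 3, MaxTransitBytes: 200}
	fmt.Println(utilization(s)) // 0.25
}
```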
type MetadataRouter ¶
type MetadataRouter func(m domain.Manifest) domain.RoutingHints
MetadataRouter reconstructs RoutingHints from Manifest.Metadata. Used at deferred-Drain time (DL-01) when the original hints from PutOptions are no longer available.
type MultistoreIndex ¶
type MultistoreIndex interface {
// ResolveArtifact returns the list of Stores in which the
// artifact is registered. Used when reading through Curator.
ResolveArtifact(id domain.ArtifactID) ([]domain.StoreID, error)
// ExistsAny is a batch presence check across every Store.
// Used by the Ingester to aggregate requests before physical
// writes. Without OriginalSize: an exact composite-key check is
// excessive for a batch optimisation.
ExistsAny(hashes []domain.ContentHash) (map[domain.ContentHash]bool, error)
// RegisterArtifact records that an artifact is present in a
// given Store. Called by Curator after a successful write or
// Drain.
RegisterArtifact(id domain.ArtifactID, storeID domain.StoreID, hash domain.ContentHash) error
// MarkStale marks a record as stale (Read-Repair on a cache
// miss: the index has a route but the artifact is physically
// missing from the Location).
MarkStale(id domain.ArtifactID) error
// PruneStale periodically clears stale records. May be invoked
// in the background or on demand.
PruneStale(ctx context.Context) error
}
MultistoreIndex is the aggregating index at the Curator level. A wrapper over several domain.StoreIndexes; needed only when there are multiple Stores. With a single Store, Curator works with the StoreIndex directly.
It is eventually consistent by nature: its contents are fully derivable from the physical state of the underlying StoreIndexes.
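To make the register, resolve, mark-stale, and prune lifecycle concrete, here is a minimal in-memory sketch. It is not the package's implementation: the string-based ArtifactID, StoreID, and ContentHash types stand in for the domain types, ExistsAny is omitted for brevity, and the choice to hide stale records from ResolveArtifact is an assumption.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"sync"
)

// Stand-ins for the domain types referenced by the interface.
type (
	ArtifactID  string
	StoreID     string
	ContentHash string
)

type record struct {
	hash   ContentHash
	stores map[StoreID]bool
	stale  bool
}

// memIndex is a hypothetical in-memory index covering part of MultistoreIndex.
type memIndex struct {
	mu      sync.Mutex
	records map[ArtifactID]*record
}

func newMemIndex() *memIndex { return &memIndex{records: map[ArtifactID]*record{}} }

func (m *memIndex) RegisterArtifact(id ArtifactID, storeID StoreID, hash ContentHash) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	r, ok := m.records[id]
	if !ok {
		r = &record{hash: hash, stores: map[StoreID]bool{}}
		m.records[id] = r
	}
	r.stores[storeID] = true
	r.stale = false // assumption: a fresh write revives a stale record
	return nil
}

// ResolveArtifact returns the Stores holding the artifact, sorted for determinism.
func (m *memIndex) ResolveArtifact(id ArtifactID) ([]StoreID, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	r, ok := m.records[id]
	if !ok || r.stale {
		return nil, nil
	}
	out := make([]StoreID, 0, len(r.stores))
	for s := range r.stores {
		out = append(out, s)
	}
	sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
	return out, nil
}

func (m *memIndex) MarkStale(id ArtifactID) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if r, ok := m.records[id]; ok {
		r.stale = true
	}
	return nil
}

// PruneStale deletes every record previously marked stale.
func (m *memIndex) PruneStale(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	for id, r := range m.records {
		if r.stale {
			delete(m.records, id)
		}
	}
	return nil
}

func main() {
	idx := newMemIndex()
	_ = idx.RegisterArtifact("a1", "s3", "h1")
	_ = idx.RegisterArtifact("a1", "local", "h1")
	stores, _ := idx.ResolveArtifact("a1")
	fmt.Println(stores) // [local s3]
	_ = idx.MarkStale("a1")
	_ = idx.PruneStale(context.Background())
	stores, _ = idx.ResolveArtifact("a1")
	fmt.Println(len(stores)) // 0
}
```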
type OnDrainNoTarget ¶
type OnDrainNoTarget string
OnDrainNoTarget controls behaviour when the Router returns 0 targets at Drain time.
const (
	OnDrainNoTargetRetain     OnDrainNoTarget = "Retain"
	OnDrainNoTargetQuarantine OnDrainNoTarget = "Quarantine"
)
type OnHostStorageFull ¶
type OnHostStorageFull string
OnHostStorageFull controls behaviour when the HostStorage hard limit is hit.
const (
	OnHostStorageFullBlock        OnHostStorageFull = "Block"
	OnHostStorageFullDirectStream OnHostStorageFull = "DirectStream"
	OnHostStorageFullReject       OnHostStorageFull = "Reject"
)
type OnUnavailable ¶
type OnUnavailable string
OnUnavailable controls behaviour when a Backup is unavailable on the write path.
const ( )
type QuarantineFilter ¶
type QuarantineFilter struct {
// Namespace filters by namespace; an empty string means all.
Namespace string
// OlderThan limits results to files quarantined before the
// given moment. The zero value means no filter.
OlderThan time.Time
// Limit caps the number of returned records. 0 means no limit.
Limit int
}
QuarantineFilter is the selection used by ListQuarantined.
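The three filter fields compose as independent conditions. The sketch below shows one plausible reading of them over an in-memory slice; applyFilter and sampleItems are hypothetical helpers, the item struct is trimmed, and the real ListQuarantined may evaluate the filter differently.

```go
package main

import (
	"fmt"
	"time"
)

// QuarantinedItem is trimmed to the fields the filter inspects.
type QuarantinedItem struct {
	ArtifactID    string
	Namespace     string
	QuarantinedAt time.Time
}

type QuarantineFilter struct {
	Namespace string
	OlderThan time.Time
	Limit     int
}

// applyFilter keeps items that match the namespace, were quarantined strictly
// before OlderThan, and stops once Limit items have been collected.
func applyFilter(items []QuarantinedItem, f QuarantineFilter) []QuarantinedItem {
	var out []QuarantinedItem
	for _, it := range items {
		if f.Namespace != "" && it.Namespace != f.Namespace {
			continue
		}
		if !f.OlderThan.IsZero() && !it.QuarantinedAt.Before(f.OlderThan) {
			continue
		}
		out = append(out, it)
		if f.Limit > 0 && len(out) == f.Limit {
			break
		}
	}
	return out
}

// sampleItems returns three items quarantined one hour apart.
func sampleItems() []QuarantinedItem {
	base := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	return []QuarantinedItem{
		{ArtifactID: "a1", Namespace: "logs", QuarantinedAt: base},
		{ArtifactID: "a2", Namespace: "media", QuarantinedAt: base.Add(time.Hour)},
		{ArtifactID: "a3", Namespace: "logs", QuarantinedAt: base.Add(2 * time.Hour)},
	}
}

func main() {
	got := applyFilter(sampleItems(), QuarantineFilter{Namespace: "logs", Limit: 1})
	fmt.Println(len(got), got[0].ArtifactID) // 1 a1
}
```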
type QuarantinedItem ¶
type QuarantinedItem struct {
ArtifactID domain.ArtifactID
BlobRef string
Namespace string
OriginalSize int64
QuarantinedAt time.Time
Reason string
}
QuarantinedItem describes a single file in quarantine.
type ReadCost ¶
type ReadCost string
ReadCost labels the cost of a read. Used to keep cold Stores out of the regular Get flow.
type ReadPolicy ¶
type ReadPolicy string
ReadPolicy controls when a Backup is read.
const (
	// ReadPolicyFallback: High Availability. Read automatically
	// when the Target is unavailable.
	ReadPolicyFallback ReadPolicy = "Fallback"
	// ReadPolicyNever: Compliance & Isolation. Fully excluded from
	// normal routing; reachable only through an explicit
	// Curator.Store(backupID).Get.
	ReadPolicyNever ReadPolicy = "Never"
	// ReadPolicyAuto: Storage Tiering. Excluded from regular Get,
	// but used when GetOptions.AllowColdRead is true.
	ReadPolicyAuto ReadPolicy = "Auto"
)
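The three policies reduce to a small decision function. This sketch is an interpretation of the constant comments, not the routing code itself; backupReadable and its boolean parameters are hypothetical names.

```go
package main

import "fmt"

type ReadPolicy string

const (
	ReadPolicyFallback ReadPolicy = "Fallback"
	ReadPolicyNever    ReadPolicy = "Never"
	ReadPolicyAuto     ReadPolicy = "Auto"
)

// backupReadable reports whether a Backup may serve a Get that is routed
// through Curator (as opposed to an explicit Curator.Store(backupID).Get).
func backupReadable(p ReadPolicy, targetAvailable, allowColdRead bool) bool {
	switch p {
	case ReadPolicyFallback:
		return !targetAvailable // only when the Target cannot serve the read
	case ReadPolicyAuto:
		return allowColdRead // only when the caller opted in to cold reads
	default:
		return false // Never, and any unknown value
	}
}

func main() {
	fmt.Println(backupReadable(ReadPolicyFallback, false, false)) // true
	fmt.Println(backupReadable(ReadPolicyAuto, true, false))      // false
	fmt.Println(backupReadable(ReadPolicyNever, false, true))     // false
}
```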
type ReplicationLagPayload ¶
type RoutingFunc ¶
type RoutingFunc func(meta RoutingMetadata) []StoreTarget
RoutingFunc selects Target Stores for a write through Curator. It can be compiled from declarative configuration rules or supplied directly by the developer.
type RoutingMetadata ¶
type RoutingMetadata struct {
Namespace string
Size int64
ContentType string
Source string
Attributes map[string]string
}
RoutingMetadata is the input to RoutingFunc on the write path. Curator builds it from PutOptions.
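A hand-written RoutingFunc might look like the sketch below. The StoreTarget fields are not shown in this documentation, so StoreID and Priority are assumed names; the store IDs, the "archive" namespace, and the 64 MiB threshold are illustrative values only.

```go
package main

import "fmt"

// RoutingMetadata mirrors the documented struct.
type RoutingMetadata struct {
	Namespace   string
	Size        int64
	ContentType string
	Source      string
	Attributes  map[string]string
}

// StoreTarget uses assumed field names: where to write and at what priority.
type StoreTarget struct {
	StoreID  string
	Priority int
}

type RoutingFunc func(meta RoutingMetadata) []StoreTarget

const largeBlob = 64 << 20 // illustrative threshold, 64 MiB

// routeBySize sends archive data to a cold store, large blobs to an object
// store with a cold secondary, and everything else to a fast store.
func routeBySize(meta RoutingMetadata) []StoreTarget {
	switch {
	case meta.Namespace == "archive":
		return []StoreTarget{{StoreID: "cold", Priority: 0}}
	case meta.Size >= largeBlob:
		return []StoreTarget{{StoreID: "object", Priority: 0}, {StoreID: "cold", Priority: 1}}
	default:
		return []StoreTarget{{StoreID: "fast", Priority: 0}}
	}
}

// Compile-time check that routeBySize has the RoutingFunc shape.
var _ RoutingFunc = routeBySize

func main() {
	fmt.Println(routeBySize(RoutingMetadata{Namespace: "media", Size: 128 << 20}))
	// [{object 0} {cold 1}]
}
```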
type StoreRegistrationConfig ¶
type StoreRegistrationConfig struct {
Priority int
ReadCost ReadCost
WriteStrategy WriteStrategy
AllowCrossStoreDedup bool
}
StoreRegistrationConfig holds the parameters for registering a Target Store with Curator via WithStore.
type StoreTarget ¶
StoreTarget is one outcome of a RoutingFunc: where to write and at what priority.
type StoreUnreachablePayload ¶
type TransitStore ¶
type TransitStore interface {
Write(ctx context.Context, blobRef string, r io.Reader) (path string, err error)
Read(ctx context.Context, blobRef string) (io.ReadCloser, error)
Has(ctx context.Context, blobRef string) bool
Remove(ctx context.Context, blobRef string) error
}
TransitStore is the surface decorators see for working with HostStorage. Passed through WrapperDeps. The full HostStorage contract (TransitStore + HostAdmin) is internal to curator/.
type WrapperDeps ¶
type WrapperDeps struct {
HostStorage TransitStore
Publisher core.Publisher
}
WrapperDeps are the dependencies provided by Curator to a decorator at registration time. HostStorage may be nil if no transit buffer has been registered with Curator; the decorator is responsible for checking this and returning an error if it requires HostStorage.
type WrapperFactory ¶
type WrapperFactory interface {
Wrap(store core.DataStore, deps WrapperDeps) (core.DataStore, error)
}
WrapperFactory creates a decorator on top of a Store while receiving its dependencies from Curator. It is applied during Target/Backup registration through WithStore/WithBackup. This resolves the dependency cycle: decorators get access to HostStorage and Publisher through a standard contract, not via public objects.
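The nil check that WrapperDeps asks of a decorator can be sketched as follows. The interfaces are reduced stand-ins (a one-method DataStore and a one-method TransitStore), and packingFactory is a hypothetical decorator rather than the real bundler.

```go
package main

import (
	"errors"
	"fmt"
)

// DataStore and TransitStore are reduced stand-ins for the real interfaces.
type DataStore interface{ Name() string }

type TransitStore interface{ Has(blobRef string) bool }

type WrapperDeps struct {
	HostStorage TransitStore // nil when no transit buffer is registered
}

type WrapperFactory interface {
	Wrap(store DataStore, deps WrapperDeps) (DataStore, error)
}

var errNoHostStorage = errors.New("packing decorator requires HostStorage")

// packingFactory is a hypothetical decorator that cannot work without a buffer.
type packingFactory struct{}

type packingStore struct {
	inner   DataStore
	transit TransitStore
}

func (p packingStore) Name() string { return "packing(" + p.inner.Name() + ")" }

// Wrap refuses to build the decorator when the required dependency is missing.
func (packingFactory) Wrap(store DataStore, deps WrapperDeps) (DataStore, error) {
	if deps.HostStorage == nil {
		return nil, errNoHostStorage
	}
	return packingStore{inner: store, transit: deps.HostStorage}, nil
}

type plainStore struct{}

func (plainStore) Name() string { return "plain" }

type memTransit struct{}

func (memTransit) Has(string) bool { return false }

func main() {
	var f WrapperFactory = packingFactory{}
	_, err := f.Wrap(plainStore{}, WrapperDeps{})
	fmt.Println(err) // packing decorator requires HostStorage
	s, _ := f.Wrap(plainStore{}, WrapperDeps{HostStorage: memTransit{}})
	fmt.Println(s.Name()) // packing(plain)
}
```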
type WriteStrategy ¶
type WriteStrategy string
WriteStrategy is the strategy for writing into a Target Store through Curator.
const (
	// WriteStrategyAuto: the engine decides based on the target
	// Store's capabilities. CapSlowRead becomes HostBuffered;
	// otherwise DirectStream.
	WriteStrategyAuto WriteStrategy = "Auto"
	// WriteStrategyHostBuffered: write into HostStorage with an
	// asynchronous Drain. The artifact is visible through Get
	// immediately, before the Drain completes.
	WriteStrategyHostBuffered WriteStrategy = "HostBuffered"
	// WriteStrategyDirectStream: write directly through the target
	// Store's Driver.
	WriteStrategyDirectStream WriteStrategy = "DirectStream"
)
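The Auto rule described in the constant comments can be written as a small resolver. resolveStrategy and the capSlowRead flag are hypothetical names; how the real engine reads a Store's capabilities is not shown in this documentation.

```go
package main

import "fmt"

type WriteStrategy string

const (
	WriteStrategyAuto         WriteStrategy = "Auto"
	WriteStrategyHostBuffered WriteStrategy = "HostBuffered"
	WriteStrategyDirectStream WriteStrategy = "DirectStream"
)

// resolveStrategy maps Auto to a concrete strategy and leaves explicit
// choices untouched. capSlowRead stands for the Store reporting CapSlowRead.
func resolveStrategy(s WriteStrategy, capSlowRead bool) WriteStrategy {
	if s != WriteStrategyAuto {
		return s
	}
	if capSlowRead {
		return WriteStrategyHostBuffered
	}
	return WriteStrategyDirectStream
}

func main() {
	fmt.Println(resolveStrategy(WriteStrategyAuto, true))          // HostBuffered
	fmt.Println(resolveStrategy(WriteStrategyAuto, false))         // DirectStream
	fmt.Println(resolveStrategy(WriteStrategyDirectStream, true)) // DirectStream
}
```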
Directories ¶
| Path | Synopsis |
|---|---|
| bundler | Package bundler implements the decorator that transparently packs small blobs into .pack volumes through HostStorage. |
| chunker | Package chunker implements the decorator that transparently CDC-slices large streams into anonymous chunks and creates a TOC manifest. |
| host | Package host implements HostStorage — the transit buffer on a fast local disk used by Curator for deferred writes to slow Target Stores, manifest caching with ManifestStorage: Local/Replicated, and buffering before bundler packing. |