curator

package
v0.3.0 Latest
Warning

This package is not in the latest version of its module.

Published: May 10, 2026 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Overview

Package curator is the orchestrator at layer L3. It unites several Stores under a single management surface: routing, transit buffering (HostStorage), cross-store deduplication, transparent decorators (bundler, chunker), and background services (Scrub, Snapshot).

Curator is optional: the minimal stack (Driver + Store + StoreIndex) works without it. Curator implements core.DataStore — the user-facing artifact API; administrative operations (Unlock, RotateKEK, SetMaintenanceMode) do not exist at the Curator level and are performed per-store via Curator.Store(id).

Subpackages:

  • curator/bundler — small-blob packing decorator for .pack volumes.
  • curator/chunker — CDC chunker decorator.
  • curator/host — internal HostStorage package. It has no public API; configuration goes through curator.WithHostStorage.

DAG: curator imports core, driver, event. It does not import agent, maintenance, or projection.

Constants

const (
	EventHostStoragePressure = "curator.host_storage_pressure"
	EventHostStorageFull     = "curator.host_storage_full"
	EventBackupUnavailable   = "curator.backup_unavailable"
	EventReplicationLag      = "curator.replication_lag"
	EventStoreUnreachable    = "curator.store_unreachable"
	EventDrainCompleted      = "curator.drain_completed"
	EventDrainNoTarget       = "curator.drain_no_target"
	EventDrainQuarantined    = "curator.drain_quarantined"
	EventDrainRequeued       = "curator.drain_requeued"
	EventDrainRetry          = "curator.drain_retry"
	EventCuratorWarning      = "curator.warning"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type AfterBackup

type AfterBackup string

AfterBackup controls what happens to the original in the Target after a successful backup.

const (
	AfterBackupKeep   AfterBackup = "Keep"
	AfterBackupDelete AfterBackup = "Delete"
)

type BackupConfig

type BackupConfig struct {
	PhysicalCopy  bool
	ReadPolicy    ReadPolicy
	AfterBackup   AfterBackup
	OnUnavailable OnUnavailable
	Priority      int
}

BackupConfig holds the parameters for registering a Backup Store via WithBackup.

type BackupUnavailablePayload

type BackupUnavailablePayload struct {
	StoreID    string
	TargetID   string
	ArtifactID domain.ArtifactID
}

type Curator

type Curator interface {
	core.DataStore

	// MultistoreIndex returns the global index when one has been
	// registered. It is usually nil with a single Target Store.
	MultistoreIndex() MultistoreIndex

	// Store returns a registered Target Store by ID with the full
	// core.Store interface, including administrative methods.
	// Returns ErrStoreNotRegistered for an unknown ID.
	Store(id string) (core.Store, error)

	// Close stops every background process in order: Flush bundler →
	// Drain HostStorage → stop agents → wait for active Get calls.
	Close(ctx context.Context) error

	// Stats returns a snapshot of the local transit-buffer state.
	Stats(ctx context.Context) (HostStorageStats, error)
}

Curator is the L3 facade. It implements core.DataStore and adds access to registered Stores, transit management, and graceful shutdown.

func New

func New(opts ...CuratorOption) (Curator, error)

New creates a Curator. It applies the options, validates the configuration (the Rules Engine for forbidden combinations), and starts the background services (Scrub, Snapshot) for the registered Targets.

Implementation lands in M4.

type CuratorOption

type CuratorOption func(*curatorOptions)

CuratorOption is an option for the Curator constructor (New).
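
The option type follows Go's functional-options pattern. The sketch below shows the apply-then-validate sequence that New is described as following, using stand-in types only. The fields of curatorOptions, the helpers withStoreID and withHostDir, and the "at least one store" rule are all assumptions made for illustration; they are not taken from the package.

```go
package main

import (
	"errors"
	"fmt"
)

// curatorOptions is a hypothetical stand-in for the package's unexported
// options struct; the real field set is not shown in the documentation.
type curatorOptions struct {
	storeIDs []string
	hostDir  string
}

// CuratorOption mirrors the documented shape: a function that mutates options.
type CuratorOption func(*curatorOptions)

// withStoreID and withHostDir are illustrative options, not the real API.
func withStoreID(id string) CuratorOption {
	return func(o *curatorOptions) { o.storeIDs = append(o.storeIDs, id) }
}

func withHostDir(dir string) CuratorOption {
	return func(o *curatorOptions) { o.hostDir = dir }
}

// build applies every option in order and then validates the result.
// The validation rule here is an assumption chosen for the example.
func build(opts ...CuratorOption) (*curatorOptions, error) {
	o := &curatorOptions{}
	for _, opt := range opts {
		opt(o)
	}
	if len(o.storeIDs) == 0 {
		return nil, errors.New("at least one store must be registered")
	}
	return o, nil
}

func main() {
	o, err := build(withStoreID("primary"), withHostDir("/var/lib/transit"))
	fmt.Println(o.storeIDs, o.hostDir, err)
}
```

Because every option is a plain function, callers can assemble the option list conditionally before handing it to the constructor.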

func WithBackup

func WithBackup(targetID string, store core.Store, cfg BackupConfig, wrappers ...WrapperFactory) CuratorOption

WithBackup registers a Backup Store for a specific Target. Decorators are applied as in WithStore. Note: chunker.Wrapper on a Backup is forbidden by the Rules Engine (see docs/4. API Reference/05 Configuration §5.5).

func WithEventBus

func WithEventBus(bus event.EventBus) CuratorOption

WithEventBus provides the event bus. Curator uses it to emit curator.* events and forwards it to registered Stores through Publisher.

func WithHostStorage

func WithHostStorage(localDrv driver.Driver, cfg HostStorageConfig) CuratorOption

WithHostStorage registers the local-disk driver for the transit buffer. Only one may be registered per Curator. Without HostStorage, the Local/Replicated/HostBuffered strategies and the bundler/chunker decorators are not available.

func WithMetadataRouter

func WithMetadataRouter(fn MetadataRouter) CuratorOption

WithMetadataRouter provides the function that reconstructs RoutingHints from Manifest.Metadata at deferred-Drain time.

func WithMultistoreIndex

func WithMultistoreIndex(idx MultistoreIndex) CuratorOption

WithMultistoreIndex provides the global-index implementation. Usually not required with a single Target Store.

func WithRoutingFunc

func WithRoutingFunc(fn RoutingFunc) CuratorOption

WithRoutingFunc provides the function that selects Target Stores at write time.

func WithScrubConfig

func WithScrubConfig(cfg agent.ScrubConfig) CuratorOption

WithScrubConfig configures the Curator-managed Scrub Agent. Curator automatically launches a Scrub for every registered Target Store.

func WithSnapshotConfig

func WithSnapshotConfig(cfg agent.SnapshotConfig) CuratorOption

WithSnapshotConfig configures the Curator-managed Snapshot Agent. Curator automatically launches a Snapshot for every registered Target Store with an available StoreIndex.

func WithStore

func WithStore(id string, store core.Store, cfg StoreRegistrationConfig, wrappers ...WrapperFactory) CuratorOption

WithStore registers a Target Store with Curator. Decorators are applied "outside in": the first wrapper is closest to the client, the last is closest to the underlying store.
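
The "outside in" ordering can be sketched with a toy decorator chain. Everything here is a stand-in: DataStore has a single invented Put method, and the wrapper type is a plain function rather than the package's WrapperFactory. The point is only the loop direction that makes the first wrapper the outermost one.

```go
package main

import "fmt"

// DataStore is a one-method stand-in for core.DataStore.
type DataStore interface {
	Put(key string) string
}

// base plays the role of the underlying store.
type base struct{}

func (base) Put(key string) string { return "store(" + key + ")" }

// tagged is a trivial decorator that prefixes its name to the inner result.
type tagged struct {
	name  string
	inner DataStore
}

func (t tagged) Put(key string) string { return t.name + ">" + t.inner.Put(key) }

// wrapper builds a decorator around an inner store (hypothetical type).
type wrapper func(inner DataStore) DataStore

func named(name string) wrapper {
	return func(inner DataStore) DataStore { return tagged{name: name, inner: inner} }
}

// applyOutsideIn wraps in reverse so that wrappers[0] ends up outermost
// (closest to the client) and the last wrapper sits next to the store.
func applyOutsideIn(store DataStore, wrappers ...wrapper) DataStore {
	for i := len(wrappers) - 1; i >= 0; i-- {
		store = wrappers[i](store)
	}
	return store
}

func main() {
	s := applyOutsideIn(base{}, named("first"), named("second"))
	fmt.Println(s.Put("k")) // prints "first>second>store(k)"
}
```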

type CuratorWarningPayload

type CuratorWarningPayload struct {
	Rule    string
	Message string
}

CuratorWarningPayload describes a non-blocking violation. Rule is a machine-readable rule identifier; Message is a human-readable description.

type DrainCompletedPayload

type DrainCompletedPayload struct {
	ArtifactID domain.ArtifactID
	StoreID    string
	BlobRef    string
}

type DrainNoTargetPayload

type DrainNoTargetPayload struct {
	ArtifactID domain.ArtifactID
	BlobRef    string
	Namespace  string
	Reason     string
}

type DrainQuarantinedPayload

type DrainQuarantinedPayload struct {
	ArtifactID domain.ArtifactID
	BlobRef    string
	Namespace  string
	Reason     string
}

type DrainRequeuedPayload

type DrainRequeuedPayload struct {
	ArtifactID    domain.ArtifactID
	BlobRef       string
	Namespace     string
	QuarantinedAt time.Time
}

type DrainRetryPayload

type DrainRetryPayload struct {
	ArtifactID   domain.ArtifactID
	BlobRef      string
	TotalTargets int
	FailedCount  int
	FirstError   string
}

type EvictionPolicy

type EvictionPolicy string

EvictionPolicy is the eviction policy of HostStorage.

const (
	EvictionPolicyLRU      EvictionPolicy = "LRU"
	EvictionPolicyTTL      EvictionPolicy = "TTL"
	EvictionPolicyPressure EvictionPolicy = "Pressure"
)

type HostAdmin

type HostAdmin interface {
	// Drain transfers files from system.transit to the Target
	// Stores. The route is computed by the Router at the moment of
	// transfer (DL-01).
	Drain(ctx context.Context) error

	// Stats returns a snapshot of the current transit state.
	// Distinct from Curator.Stats: HostAdmin.Stats is a synchronous
	// read on the in-memory state of the transit buffer; Curator's
	// own Stats wraps this with a context for the broader API.
	Stats() HostStorageStats

	// Recover restores HostStorage after a process crash: it
	// cleans up .tmp files, checks locks, and re-indexes the
	// transit area.
	Recover(ctx context.Context) error

	// Requeue moves a file (or files) from
	// system.transit/quarantine/ back into the active Drain queue.
	// The route will be recomputed during the next Drain (deferred
	// routing). When artifactID is nil, every file in quarantine
	// is requeued.
	//
	// Returns the number of files actually moved. Files missing
	// from quarantine are silently skipped — the operation is
	// idempotent.
	Requeue(ctx context.Context, artifactID *domain.ArtifactID) (int, error)

	// ListQuarantined returns a snapshot of the current quarantine
	// state. It does not block Drain. QuarantineFilter provides
	// pagination.
	ListQuarantined(ctx context.Context, filter QuarantineFilter) ([]QuarantinedItem, error)
}

HostAdmin holds the administrative operations of HostStorage. They are accessible to Curator. Decorators see only TransitStore.
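
The Requeue contract (nil means "everything", missing files are skipped, the count reflects files actually moved) can be illustrated with an in-memory fake. This is a sketch under simplifying assumptions: the context parameter and the error result are omitted, ArtifactID is a string stand-in for domain.ArtifactID, and fakeTransit is an invented type that models only the quarantine and the queue.

```go
package main

import "fmt"

// ArtifactID is a stand-in for domain.ArtifactID.
type ArtifactID string

// fakeTransit models only the quarantine set and the active Drain queue.
type fakeTransit struct {
	quarantine map[ArtifactID]bool
	queue      []ArtifactID
}

// Requeue follows the documented contract: nil moves everything, a
// non-nil ID moves that file only, and a file missing from quarantine
// is skipped without error, so repeated calls are harmless.
func (f *fakeTransit) Requeue(id *ArtifactID) int {
	if id == nil {
		moved := 0
		for a := range f.quarantine {
			f.queue = append(f.queue, a)
			delete(f.quarantine, a) // deleting during range is safe in Go
			moved++
		}
		return moved
	}
	if !f.quarantine[*id] {
		return 0
	}
	delete(f.quarantine, *id)
	f.queue = append(f.queue, *id)
	return 1
}

func main() {
	f := &fakeTransit{quarantine: map[ArtifactID]bool{"a": true, "b": true}}
	id := ArtifactID("a")
	fmt.Println(f.Requeue(&id)) // 1
	fmt.Println(f.Requeue(&id)) // 0: already moved, silently skipped
	fmt.Println(f.Requeue(nil)) // 1: moves the remaining file "b"
}
```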

type HostStorage

type HostStorage interface {
	TransitStore
	HostAdmin
}

HostStorage is the full HostStorage contract — the transit buffer on a fast local disk used by Curator for deferred writes to slow Target Stores, manifest caching with ManifestStorage: Local/Replicated, and buffering before bundler packing. Combines the per-blob surface (TransitStore, exposed to decorators via WrapperDeps) with the administrative surface (HostAdmin, used by Curator itself).

The implementation lives in curator/host (package host) and is constructed internally by Curator from a driver.Driver and a HostStorageConfig; host applications never instantiate this type directly.

type HostStorageConfig

type HostStorageConfig struct {
	EvictionPolicy    EvictionPolicy
	OnHostStorageFull OnHostStorageFull
	OnDrainNoTarget   OnDrainNoTarget
	SoftLimitBytes    int64
	HardLimitBytes    int64
	EventCooldown     time.Duration
	DrainInterval     time.Duration
	WorkspaceDir      string
}

HostStorageConfig is the configuration of the transit buffer. WorkspaceDir is required.
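
A caller may want to sanity-check a configuration before passing it to WithHostStorage. The sketch below copies the documented fields into a local struct (policy fields become plain strings so the example is self-contained). The validate helper is hypothetical: only the WorkspaceDir rule comes from the documentation, the soft/hard ordering rule is an assumption, and the paths and sizes are invented.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// HostStorageConfig copies the documented fields; the policy fields are
// plain strings here to keep the sketch self-contained.
type HostStorageConfig struct {
	EvictionPolicy    string
	OnHostStorageFull string
	OnDrainNoTarget   string
	SoftLimitBytes    int64
	HardLimitBytes    int64
	EventCooldown     time.Duration
	DrainInterval     time.Duration
	WorkspaceDir      string
}

// validate is a hypothetical pre-flight check. Only the WorkspaceDir rule
// is documented; the limit ordering rule is an assumption.
func validate(cfg HostStorageConfig) error {
	if cfg.WorkspaceDir == "" {
		return errors.New("WorkspaceDir is required")
	}
	if cfg.HardLimitBytes > 0 && cfg.SoftLimitBytes > cfg.HardLimitBytes {
		return errors.New("SoftLimitBytes must not exceed HardLimitBytes")
	}
	return nil
}

func main() {
	cfg := HostStorageConfig{
		EvictionPolicy:    "LRU",
		OnHostStorageFull: "Block",
		OnDrainNoTarget:   "Quarantine",
		SoftLimitBytes:    8 << 30,
		HardLimitBytes:    10 << 30,
		EventCooldown:     time.Minute,
		DrainInterval:     30 * time.Second,
		WorkspaceDir:      "/var/lib/curator/transit",
	}
	fmt.Println(validate(cfg))
}
```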

type HostStorageFullPayload

type HostStorageFullPayload struct {
	UsedBytes int64
	MaxBytes  int64
}

type HostStoragePressurePayload

type HostStoragePressurePayload struct {
	UsedPct        float64
	AvailableBytes int64
}

type HostStorageStats

type HostStorageStats struct {
	TransitBytes    int64 // bytes in transit (excluding quarantine)
	TransitFiles    int   // number of files awaiting Drain
	QuarantineBytes int64 // bytes in system.transit/quarantine/
	QuarantineFiles int   // number of files in quarantine
	MaxTransitBytes int64 // hard limit from configuration
}

HostStorageStats is the current physical state of the transit buffer. The values are a snapshot at the moment of the request.

type MetadataRouter

type MetadataRouter func(m domain.Manifest) domain.RoutingHints

MetadataRouter reconstructs RoutingHints from Manifest.Metadata. Used at deferred-Drain time (DL-01) when the original hints from PutOptions are no longer available.

type MultistoreIndex

type MultistoreIndex interface {
	// ResolveArtifact returns the list of Stores in which the
	// artifact is registered. Used when reading through Curator.
	ResolveArtifact(id domain.ArtifactID) ([]domain.StoreID, error)

	// ExistsAny is a batch presence check across every Store.
	// Used by the Ingester to aggregate requests before physical
	// writes. Without OriginalSize: an exact composite-key check is
	// excessive for a batch optimisation.
	ExistsAny(hashes []domain.ContentHash) (map[domain.ContentHash]bool, error)

	// RegisterArtifact records that an artifact is present in a
	// given Store. Called by Curator after a successful write or
	// Drain.
	RegisterArtifact(id domain.ArtifactID, storeID domain.StoreID, hash domain.ContentHash) error

	// MarkStale marks a record as stale (Read-Repair on a cache
	// miss: the index has a route but the artifact is physically
	// missing from the Location).
	MarkStale(id domain.ArtifactID) error

	// PruneStale periodically clears stale records. May be invoked
	// in the background or on demand.
	PruneStale(ctx context.Context) error
}

MultistoreIndex is the aggregating index at the Curator level. It wraps several domain.StoreIndex instances and is needed only when there are multiple Stores. With a single Store, Curator works with the StoreIndex directly.

The index is eventually consistent by nature: it is fully derivable from the physical state of the underlying StoreIndexes.
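
To make the register / resolve / mark-stale / prune cycle concrete, here is a toy in-memory index. It is a sketch, not the package's implementation: the context, error, and ContentHash parameters are dropped, ArtifactID and StoreID are string stand-ins for the domain types, and two behaviours are assumptions (a stale record is hidden from ResolveArtifact, and a fresh registration clears the stale flag).

```go
package main

import (
	"fmt"
	"sort"
)

// ArtifactID and StoreID are stand-ins for the domain types.
type ArtifactID string
type StoreID string

type record struct {
	stores map[StoreID]bool
	stale  bool
}

// memIndex is a hypothetical in-memory index used only for illustration.
type memIndex struct{ records map[ArtifactID]*record }

func newMemIndex() *memIndex { return &memIndex{records: map[ArtifactID]*record{}} }

// RegisterArtifact records that the artifact is present in the given store.
func (m *memIndex) RegisterArtifact(id ArtifactID, store StoreID) {
	r, ok := m.records[id]
	if !ok {
		r = &record{stores: map[StoreID]bool{}}
		m.records[id] = r
	}
	r.stores[store] = true
	r.stale = false // assumption: a fresh write makes the route valid again
}

// ResolveArtifact returns the stores holding the artifact, sorted by ID.
func (m *memIndex) ResolveArtifact(id ArtifactID) []StoreID {
	r, ok := m.records[id]
	if !ok || r.stale { // assumption: stale routes are not served
		return nil
	}
	out := make([]StoreID, 0, len(r.stores))
	for s := range r.stores {
		out = append(out, s)
	}
	sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
	return out
}

// MarkStale flags a record whose artifact was physically missing on read.
func (m *memIndex) MarkStale(id ArtifactID) {
	if r, ok := m.records[id]; ok {
		r.stale = true
	}
}

// PruneStale drops every stale record and reports how many were removed.
func (m *memIndex) PruneStale() int {
	n := 0
	for id, r := range m.records {
		if r.stale {
			delete(m.records, id)
			n++
		}
	}
	return n
}

func main() {
	idx := newMemIndex()
	idx.RegisterArtifact("art-1", "s3")
	idx.RegisterArtifact("art-1", "local")
	fmt.Println(idx.ResolveArtifact("art-1")) // [local s3]
	idx.MarkStale("art-1")
	fmt.Println(idx.ResolveArtifact("art-1"), idx.PruneStale()) // [] 1
}
```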

type OnDrainNoTarget

type OnDrainNoTarget string

OnDrainNoTarget controls behaviour when the Router returns 0 targets at Drain time.

const (
	OnDrainNoTargetRetain     OnDrainNoTarget = "Retain"
	OnDrainNoTargetQuarantine OnDrainNoTarget = "Quarantine"
)

type OnHostStorageFull

type OnHostStorageFull string

OnHostStorageFull controls behaviour when the HostStorage hard limit is hit.

const (
	OnHostStorageFullBlock        OnHostStorageFull = "Block"
	OnHostStorageFullDirectStream OnHostStorageFull = "DirectStream"
	OnHostStorageFullReject       OnHostStorageFull = "Reject"
)

type OnUnavailable

type OnUnavailable string

OnUnavailable controls behaviour when a Backup is unavailable on the write path.

const (
	OnUnavailableBestEffort OnUnavailable = "BestEffort"
	OnUnavailableRequired   OnUnavailable = "Required"
	OnUnavailableQueued     OnUnavailable = "Queued"
)

type QuarantineFilter

type QuarantineFilter struct {
	// Namespace filters by namespace; an empty string means all.
	Namespace string

	// OlderThan limits results to files quarantined before the
	// given moment. The zero value means no filter.
	OlderThan time.Time

	// Limit caps the number of returned records. 0 means no limit.
	Limit int
}

QuarantineFilter is the selection used by ListQuarantined.

type QuarantinedItem

type QuarantinedItem struct {
	ArtifactID    domain.ArtifactID
	BlobRef       string
	Namespace     string
	OriginalSize  int64
	QuarantinedAt time.Time
	Reason        string
}

QuarantinedItem describes a single file in quarantine.

type ReadCost

type ReadCost string

ReadCost labels the cost of a read. Used to keep cold Stores out of the regular Get flow.

const (
	ReadCostLow  ReadCost = "Low"
	ReadCostHigh ReadCost = "High"
)

type ReadPolicy

type ReadPolicy string

ReadPolicy controls when a Backup is read.

const (
	// ReadPolicyFallback — High Availability. Read automatically
	// when the Target is unavailable.
	ReadPolicyFallback ReadPolicy = "Fallback"

	// ReadPolicyNever — Compliance & Isolation. Fully excluded from
	// normal routing; reachable only through an explicit
	// Curator.Store(backupID).Get.
	ReadPolicyNever ReadPolicy = "Never"

	// ReadPolicyAuto — Storage Tiering. Excluded from regular Get,
	// but used when GetOptions.AllowColdRead is true.
	ReadPolicyAuto ReadPolicy = "Auto"
)
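
The three policies reduce to a small decision function. This is a sketch of the rules as stated in the comments above, not the package's routing code; backupEligible is an invented name, and its two boolean inputs are hypothetical stand-ins for the Target's availability and GetOptions.AllowColdRead.

```go
package main

import "fmt"

type ReadPolicy string

const (
	ReadPolicyFallback ReadPolicy = "Fallback"
	ReadPolicyNever    ReadPolicy = "Never"
	ReadPolicyAuto     ReadPolicy = "Auto"
)

// backupEligible reports whether a Backup may serve a Get routed through
// Curator. targetAvailable and allowColdRead are hypothetical inputs.
func backupEligible(p ReadPolicy, targetAvailable, allowColdRead bool) bool {
	switch p {
	case ReadPolicyFallback:
		return !targetAvailable // read only when the Target is down
	case ReadPolicyAuto:
		return allowColdRead // read only when the caller opts in
	default:
		return false // ReadPolicyNever and unknown values stay out of routing
	}
}

func main() {
	fmt.Println(backupEligible(ReadPolicyFallback, false, false)) // true
	fmt.Println(backupEligible(ReadPolicyAuto, true, false))      // false
	fmt.Println(backupEligible(ReadPolicyNever, false, true))     // false
}
```

A Backup registered with ReadPolicyNever remains reachable only through an explicit Curator.Store(backupID).Get, which bypasses this decision entirely.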

type ReplicationLagPayload

type ReplicationLagPayload struct {
	StoreID  string
	LagCount int
	LagBytes int64
}

type RoutingFunc

type RoutingFunc func(meta RoutingMetadata) []StoreTarget

RoutingFunc selects Target Stores for a write through Curator. It can be compiled from declarative configuration rules or supplied directly by the developer.

type RoutingMetadata

type RoutingMetadata struct {
	Namespace   string
	Size        int64
	ContentType string
	Source      string
	Attributes  map[string]string
}

RoutingMetadata is the input to RoutingFunc on the write path. Curator builds it from PutOptions.

type StoreRegistrationConfig

type StoreRegistrationConfig struct {
	Priority             int
	ReadCost             ReadCost
	WriteStrategy        WriteStrategy
	AllowCrossStoreDedup bool
}

StoreRegistrationConfig holds the parameters for registering a Target Store with Curator via WithStore.

type StoreTarget

type StoreTarget struct {
	StoreID  string
	Priority int
}

StoreTarget is one outcome of a RoutingFunc: where to write and at what priority.
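
A hand-written RoutingFunc might look like the sketch below. The types are local copies of the documented structs so the example compiles on its own. The store IDs ("archive", "hot", "cold"), the "retention" attribute, and the 64 MiB threshold are invented for illustration, and the ordering semantics of Priority are not specified in the documentation, so the values are simply passed through.

```go
package main

import "fmt"

// RoutingMetadata and StoreTarget copy the documented struct shapes.
type RoutingMetadata struct {
	Namespace   string
	Size        int64
	ContentType string
	Source      string
	Attributes  map[string]string
}

type StoreTarget struct {
	StoreID  string
	Priority int
}

type RoutingFunc func(meta RoutingMetadata) []StoreTarget

// route is an illustrative rule set; every store ID and threshold here
// is an assumption made for the example.
func route(meta RoutingMetadata) []StoreTarget {
	// Reading from a nil map is safe and yields the empty string.
	if meta.Attributes["retention"] == "legal" {
		return []StoreTarget{{StoreID: "archive", Priority: 0}}
	}
	if meta.Size >= 64<<20 {
		return []StoreTarget{{StoreID: "cold", Priority: 0}}
	}
	return []StoreTarget{{StoreID: "hot", Priority: 0}, {StoreID: "cold", Priority: 1}}
}

func main() {
	var fn RoutingFunc = route
	fmt.Println(fn(RoutingMetadata{Namespace: "docs", Size: 4096})) // [{hot 0} {cold 1}]
}
```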

type StoreUnreachablePayload

type StoreUnreachablePayload struct {
	StoreID string
	Reason  string
}

type TransitStore

type TransitStore interface {
	Write(ctx context.Context, blobRef string, r io.Reader) (path string, err error)
	Read(ctx context.Context, blobRef string) (io.ReadCloser, error)
	Has(ctx context.Context, blobRef string) bool
	Remove(ctx context.Context, blobRef string) error
}

TransitStore is the surface decorators see for working with HostStorage. Passed through WrapperDeps. The full HostStorage contract (TransitStore + HostAdmin) is internal to curator/.

type WrapperDeps

type WrapperDeps struct {
	HostStorage TransitStore
	Publisher   core.Publisher
}

WrapperDeps holds the dependencies that Curator provides to a decorator at registration time. HostStorage may be nil if no transit buffer has been registered with Curator; the decorator is responsible for checking this and returning an error if it requires HostStorage.

type WrapperFactory

type WrapperFactory interface {
	Wrap(store core.DataStore, deps WrapperDeps) (core.DataStore, error)
}

WrapperFactory creates a decorator on top of a Store while receiving its dependencies from Curator. It is applied during Target/Backup registration through WithStore/WithBackup. This resolves the dependency cycle: decorators get access to HostStorage and Publisher through a standard contract, not via public objects.
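
A factory for a decorator that needs the transit buffer would check WrapperDeps.HostStorage before wrapping. The sketch below uses empty stand-in interfaces for core.DataStore, TransitStore, and core.Publisher; bufferingFactory and bufferingStore are hypothetical names, not types from the package.

```go
package main

import (
	"errors"
	"fmt"
)

// DataStore, TransitStore and Publisher are empty stand-ins for
// core.DataStore, curator.TransitStore and core.Publisher.
type DataStore interface{}
type TransitStore interface{}
type Publisher interface{}

type WrapperDeps struct {
	HostStorage TransitStore
	Publisher   Publisher
}

type WrapperFactory interface {
	Wrap(store DataStore, deps WrapperDeps) (DataStore, error)
}

// bufferingFactory is a hypothetical factory whose decorator needs a
// transit buffer to function.
type bufferingFactory struct{}

// bufferingStore is the decorator it produces.
type bufferingStore struct {
	inner   DataStore
	transit TransitStore
}

// Wrap refuses to build the decorator when no HostStorage was registered.
func (bufferingFactory) Wrap(store DataStore, deps WrapperDeps) (DataStore, error) {
	if deps.HostStorage == nil {
		return nil, errors.New("buffering decorator requires HostStorage")
	}
	return &bufferingStore{inner: store, transit: deps.HostStorage}, nil
}

func main() {
	var f WrapperFactory = bufferingFactory{}
	_, err := f.Wrap(struct{}{}, WrapperDeps{})
	fmt.Println(err)
}
```

Failing inside Wrap surfaces the misconfiguration at registration time rather than on the first write.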

type WriteStrategy

type WriteStrategy string

WriteStrategy is the strategy for writing into a Target Store through Curator.

const (
	// WriteStrategyAuto — the engine decides based on the target
	// Store's capabilities. CapSlowRead becomes HostBuffered;
	// otherwise DirectStream.
	WriteStrategyAuto WriteStrategy = "Auto"

	// WriteStrategyHostBuffered — write into HostStorage with an
	// asynchronous Drain. The artifact is visible through Get
	// immediately, before the Drain completes.
	WriteStrategyHostBuffered WriteStrategy = "HostBuffered"

	// WriteStrategyDirectStream — write directly through the target
	// Store's Driver.
	WriteStrategyDirectStream WriteStrategy = "DirectStream"
)
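
The Auto rule described above can be sketched as a small resolver. This is not the engine's code; resolve is an invented name, and slowRead is a hypothetical boolean standing for "the target Store reports CapSlowRead".

```go
package main

import "fmt"

type WriteStrategy string

const (
	WriteStrategyAuto         WriteStrategy = "Auto"
	WriteStrategyHostBuffered WriteStrategy = "HostBuffered"
	WriteStrategyDirectStream WriteStrategy = "DirectStream"
)

// resolve turns Auto into a concrete strategy and leaves explicit
// choices untouched. slowRead is a hypothetical capability flag.
func resolve(s WriteStrategy, slowRead bool) WriteStrategy {
	if s != WriteStrategyAuto {
		return s
	}
	if slowRead {
		return WriteStrategyHostBuffered
	}
	return WriteStrategyDirectStream
}

func main() {
	fmt.Println(resolve(WriteStrategyAuto, true))          // HostBuffered
	fmt.Println(resolve(WriteStrategyAuto, false))         // DirectStream
	fmt.Println(resolve(WriteStrategyDirectStream, true))  // DirectStream
}
```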

Directories

  • bundler: Package bundler implements the decorator that transparently packs small blobs into .pack volumes through HostStorage.
  • chunker: Package chunker implements the decorator that transparently CDC-slices large streams into anonymous chunks and creates a TOC manifest.
  • host: Package host implements HostStorage, the transit buffer on a fast local disk used by Curator for deferred writes to slow Target Stores, manifest caching with ManifestStorage: Local/Replicated, and buffering before bundler packing.
