engine

package
v1.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 28, 2026 License: MIT Imports: 27 Imported by: 0

Documentation

Index

Constants

View Source
// Numeric IDs and string names for the block compression codecs.
// Each codec has one uint16 ID and one name; BlockCompressionCodec exposes
// the same pair through its ID and Name methods.
// NOTE(review): presumably these are the values accepted by
// BlockCompressionCodecByID and BlockCompressionCodecByName — confirm.
const (
	CompressionCodecS2ID          uint16 = 1
	CompressionCodecS2BetterID    uint16 = 2
	CompressionCodecZstdFastestID uint16 = 3
	CompressionCodecZstdDefaultID uint16 = 4

	CompressionCodecS2Name          = "s2"
	CompressionCodecS2BetterName    = "s2_better"
	CompressionCodecZstdFastestName = "zstd_fastest"
	CompressionCodecZstdDefaultName = "zstd_default"
)
View Source
// Value-type tags for metric values. Catalog.GetMetricType returns one of
// these, and the ValueType fields on MetricEntry, MetricInfo and Sample are
// declared with the same byte type.
const (
	Int32Sample   byte = 1
	Float32Sample byte = 2
)
View Source
// String constants naming retention actions.
// NOTE(review): presumably the valid values of the "retention_action"
// setting (DBInfo.RetentionAction / DBManifestRetention.RetentionAction) — confirm.
const (
	RetentionActionKeep    = "keep"
	RetentionActionDelete  = "delete"
	RetentionActionArchive = "archive"
)
View Source
// String constants naming durability profiles.
// NOTE(review): presumably the valid values of EngineConfigDurability.Profile
// and Engine.Durability — confirm.
const (
	DurabilityProfileStrict     = "strict"
	DurabilityProfileBalanced   = "balanced"
	DurabilityProfileThroughput = "throughput"
)
View Source
// String constants naming log levels.
// NOTE(review): presumably the valid values of EngineConfigLogger.Level,
// with "trace" corresponding to TraceSlogLevel — confirm.
const (
	LogLevelInfo  = "info"
	LogLevelDebug = "debug"
	LogLevelTrace = "trace"
)
View Source
// Metric partition kinds, numbered from 1 (iota + 1): Day=1, Month=2,
// Year=3, Forever=4. Zero is therefore not a named kind.
// NOTE(review): presumably the partitionKind argument of WriteMetricFileV1
// and WriteMetricFileV2, which has the same uint8 type — confirm.
const (
	MetricPartitionDay uint8 = iota + 1
	MetricPartitionMonth
	MetricPartitionYear
	MetricPartitionForever
)
View Source
// String constants naming raw-ingest actions.
// NOTE(review): presumably the valid values of
// EngineConfigMetrics.RawIngestAction and Engine.MetricRawIngestAction, i.e.
// what happens to the raw data file once a metric file has been written — confirm.
const (
	MetricRawIngestActionKeep   = "keep"
	MetricRawIngestActionDelete = "delete"
	MetricRawIngestActionRename = "rename"
)
View Source
// Page size and age limits, plus the size of an encoded page header.
// NOTE(review): presumably these are the limits NewPage applies when no
// explicit limits are passed (see NewPageWithLimits) — confirm.
const (
	PageMaxRecords = 12000
	PageMaxBytes   = 256 * 1024       // 256 KB estimated uncompressed payload
	PageMaxAge     = 10 * time.Second // max age before forced flush
	HeaderSize     = 18               // StartTime(8) + EndTime(8) + NumRecords(2)
)
View Source
// String constants naming WAL fsync policies.
// NOTE(review): presumably the valid values of the fsyncPolicy argument to
// NewWAL and of EngineConfigWAL.FsyncPolicy; the exact sync timing of each
// policy is not visible here — confirm.
const (
	WALFsyncPolicySegment = "segment"
	WALFsyncPolicyAlways  = "always"
)
View Source
const MaxMetricsPerDatabase = 65535

MaxMetricsPerDatabase is the hard limit on distinct metrics per database (uint16 address space).

View Source
// TraceSlogLevel is a custom slog level four steps below slog.LevelDebug,
// i.e. more verbose than debug.
const TraceSlogLevel slog.Level = slog.LevelDebug - 4

Variables

View Source
// ErrDataFileActive reports that a data file partition is currently open.
// NOTE(review): which operations return it is not visible here — confirm.
var ErrDataFileActive = errors.New("data file partition is currently open")
View Source
// ErrOutOfOrderTimestamp reports a timestamp that breaks the non-decreasing
// ordering requirement.
// NOTE(review): presumably returned by Page.AddSample — confirm. The message
// has no format verbs, so errors.New (as used by the other sentinel errors
// in this package) would be the more conventional constructor.
var ErrOutOfOrderTimestamp = fmt.Errorf("timestamp must be non-decreasing")
View Source
// ErrTooManyMetrics reports that a database holds too many metrics.
// NOTE(review): presumably returned by GetMetricID once MaxMetricsPerDatabase
// is reached — confirm. The message has no format verbs, so errors.New (as
// used by the other sentinel errors in this package) would be the more
// conventional constructor.
var ErrTooManyMetrics = fmt.Errorf("too many metrics in database")
View Source
// ErrWALMissingBaseline reports a WAL compact record that lacks its initial
// baseline.
// NOTE(review): the compact-record format is not visible here — confirm
// where this is raised.
var ErrWALMissingBaseline = errors.New("wal compact record missing initial baseline")

Functions

func Aggregators added in v1.4.0

func Aggregators() map[string]Aggregator

func AppendSample

func AppendSample[T SampleType](w *WAL, metricID MetricID, ts Timestamp, value T) (uint16, error)

AppendSample writes a typed sample to the current WAL segment. It satisfies LAW 1: the sample is written to the WAL before any memory mutation. It returns the current segment ID for tracking page origin (LAW 6).

func AppendSampleWithMetricName

func AppendSampleWithMetricName[T SampleType](w *WAL, metricID MetricID, metricName string, ts Timestamp, value T) (uint16, error)

AppendSampleWithMetricName writes a typed sample and embeds the metric name. This is used on first-seen metrics so catalog state can be rebuilt from WAL.

func DefaultStepAggregate added in v1.4.0

func DefaultStepAggregate() string

func FormatTimestamp

func FormatTimestamp(ts Timestamp) string

FormatTimestamp converts a nanosecond Unix timestamp to human-readable UTC format. Format: YYYY-MM-DD HH:MM:SS.nnnnnnnnn

func LoadEngineConfig added in v1.2.0

func LoadEngineConfig(rootDataDir string, fallbackWalMaxSegSize int64) (EngineConfig, time.Duration, DBInfo, error)

func NewLogger added in v1.2.0

func NewLogger(cfg EngineConfigLogging) (*slog.Logger, func() error, error)

func ReadMetricFileVersion added in v1.3.0

func ReadMetricFileVersion(path string) (uint16, error)

func ScanDataFileHeaders

func ScanDataFileHeaders(path string) (DataFileStats, []DataFrameHeader, error)

ScanDataFileHeaders walks page headers in a .dat file and returns per-frame headers plus aggregate stats. It does not decode payload pages.

func ScanWALFile

func ScanWALFile(path string) (WALFileStats, []WALWalkRecord, error)

ScanWALFile walks WAL records from the beginning and stops at the first invalid or truncated tail. It returns all successfully decoded records before the stop point.

func SupportedAggregates added in v1.4.0

func SupportedAggregates() []string

func WalkMetricFileFrames added in v1.3.0

func WalkMetricFileFrames(path string, fn func(MetricFileFrameInfo) error) error

func WalkMetricFilePageInfosV1 added in v1.3.0

func WalkMetricFilePageInfosV1(path string, fn func(MetricFilePageInfoV1) error) error

func WalkMetricFileV1 added in v1.3.0

func WalkMetricFileV1(path string, fn func(MetricFilePage) error) error

func WriteMetricFileV1 added in v1.3.0

func WriteMetricFileV1(path string, partitionKind uint8, codec BlockCompressionCodec, pages []MetricFilePageInput) error

func WriteMetricFileV2 added in v1.3.0

func WriteMetricFileV2(path string, partitionKind uint8, codec BlockCompressionCodec, pages []MetricFilePageInput) error

Types

type AggregateBucket added in v1.4.0

// AggregateBucket is one aggregate result for one window of one metric.
// It is the value handed to an AggregateBucketCallback.
// NOTE(review): whether EndTS is inclusive or exclusive is not visible
// here — confirm.
type AggregateBucket struct {
	Database  string
	Metric    string
	Aggregate string
	Window    time.Duration
	StartTS   Timestamp
	EndTS     Timestamp
	Value     float32
}

type AggregateBucketCallback added in v1.4.0

// AggregateBucketCallback receives one AggregateBucket at a time; it is the
// fn parameter of Engine.QueryAggregateRange.
// NOTE(review): presumably a non-nil return stops the query early, as
// documented for QueryRange callbacks — confirm.
type AggregateBucketCallback func(AggregateBucket) error

type Aggregator added in v1.4.0

type Aggregator interface {
	Name() string
	Compute(periodStart, periodEnd Timestamp, points []float32) (float32, error)
}

Aggregator computes one aggregate value for a time window. points are source metric values collected for that window.

type BlockCompressionCodec added in v1.3.0

// BlockCompressionCodec is the interface for a block compression codec.
// ID and Name identify the codec; Encode and Decode each take a byte slice
// and return a new byte slice or an error.
// NOTE(review): presumably Decode inverts Encode and ID/Name match the
// CompressionCodec*ID / CompressionCodec*Name constants — confirm.
type BlockCompressionCodec interface {
	ID() uint16
	Name() string
	Encode(src []byte) ([]byte, error)
	Decode(src []byte) ([]byte, error)
}

func BlockCompressionCodecByID added in v1.3.0

func BlockCompressionCodecByID(id uint16) (BlockCompressionCodec, error)

func BlockCompressionCodecByName added in v1.3.0

func BlockCompressionCodecByName(name string) (BlockCompressionCodec, error)

func DefaultMetricFileCompressionCodec added in v1.3.0

func DefaultMetricFileCompressionCodec() BlockCompressionCodec

type Catalog

type Catalog struct {
	// contains filtered or unexported fields
}

Catalog is the in-memory registry of metric names, IDs, and value types for one database. It is persisted as catalog.json next to the database data files.

func LoadCatalog

func LoadCatalog(filename string) (*Catalog, error)

func (*Catalog) EnsureMetricEntry

func (c *Catalog) EnsureMetricEntry(name string, mid MetricID, valueType byte) error

func (*Catalog) GetMetricByID

func (c *Catalog) GetMetricByID(mid MetricID) (string, MetricEntry, bool)

func (*Catalog) GetMetricEntry

func (c *Catalog) GetMetricEntry(name string) (MetricEntry, bool)

func (*Catalog) GetMetricType

func (c *Catalog) GetMetricType(mid MetricID) (byte, error)

GetMetricType returns the value type (Int32Sample or Float32Sample) for a given metric.

func (*Catalog) GetValueWidth

func (c *Catalog) GetValueWidth(mid MetricID) int

GetValueWidth returns the number of bytes per value for this metric. Currently all metrics are 4 bytes (int32 or float32).

func (*Catalog) IsDirty

func (c *Catalog) IsDirty() bool

func (*Catalog) ListMetrics

func (c *Catalog) ListMetrics() []MetricInfo

ListMetrics returns a stable, name-sorted snapshot of metrics in this catalog.

func (*Catalog) UpdateLastByMetricID

func (c *Catalog) UpdateLastByMetricID(mid MetricID, ts Timestamp, raw []byte) error

func (*Catalog) WriteCatalog

func (c *Catalog) WriteCatalog() error

type DBInfo

// DBInfo is a flat record of per-database settings, tagged for both JSON and
// TOML. Its fields correspond by name to those of DBManifestRetention,
// DBManifestWAL (WALEnabled, WALSkipBefore) and DBManifestPage (PageMax*),
// and it embeds the rollup settings as a DBManifestRollups value.
// It is returned by LoadEngineConfig, accepted by OpenEngineWithConfig as
// dbDefaults, and reported as DBRuntimeInspect.Manifest.
// NOTE(review): Grace, WALSkipBefore and PageMaxAge are strings; presumably
// duration/timestamp text parsed elsewhere — confirm formats.
type DBInfo struct {
	Grace           string            `json:"grace" toml:"grace"`
	RetentionDays   int               `json:"retention_days" toml:"retention_days"`
	RetentionAction string            `json:"retention_action" toml:"retention_action"`
	MaxActiveDays   int               `json:"max_active_days" toml:"max_active_days"`
	Partition       string            `json:"partition" toml:"partition"`
	WALEnabled      bool              `json:"wal_enabled" toml:"wal_enabled"`
	WALSkipBefore   string            `json:"wal_skip_before" toml:"wal_skip_before"`
	PageMaxRecords  int               `json:"page_max_records" toml:"page_max_records"`
	PageMaxBytes    int               `json:"page_max_bytes" toml:"page_max_bytes"`
	PageMaxAge      string            `json:"page_max_age" toml:"page_max_age"`
	Rollups         DBManifestRollups `json:"rollups" toml:"rollups"`
}

type DBManifestPage

// DBManifestPage holds the page limits of a manifest; it is the "page" table
// of DBManifestTOML and EngineConfigManifestDefaults.
// NOTE(review): MaxAge is a string, presumably a duration such as "10s" —
// confirm the accepted format.
type DBManifestPage struct {
	MaxRecords int    `toml:"max_records"`
	MaxBytes   int    `toml:"max_bytes"`
	MaxAge     string `toml:"max_age"`
}

type DBManifestRetention

// DBManifestRetention holds the retention settings of a manifest; it is the
// "retention" table of DBManifestTOML and EngineConfigManifestDefaults.
// NOTE(review): RetentionAction presumably takes one of the RetentionAction*
// constants; the formats of Grace and Partition are not visible here — confirm.
type DBManifestRetention struct {
	Grace           string `toml:"grace"`
	RetentionDays   int    `toml:"retention_days"`
	RetentionAction string `toml:"retention_action"`
	MaxActiveDays   int    `toml:"max_active_days"`
	Partition       string `toml:"partition"`
}

type DBManifestRollupJob

// DBManifestRollupJob describes one rollup job; jobs are listed in
// DBManifestRollups.Jobs.
// NOTE(review): how SourceMetric and SourcePattern interact, and whether
// empty Interval/Grace/DestinationDB/Aggregates fall back to the Default*
// fields of DBManifestRollups, is not visible here — confirm.
type DBManifestRollupJob struct {
	ID                      string   `toml:"id"`
	SourceMetric            string   `toml:"source_metric"`
	SourcePattern           string   `toml:"source_pattern"`
	ExcludePatterns         []string `toml:"exclude_patterns"`
	Interval                string   `toml:"interval"`
	Aggregates              []string `toml:"aggregates"`
	DestinationDB           string   `toml:"destination_db"`
	DestinationMetricPrefix string   `toml:"destination_metric_prefix"`
	Grace                   string   `toml:"grace"`
}

type DBManifestRollups

// DBManifestRollups holds the rollup settings of a manifest: an enable flag,
// a checkpoint file name, default values, global exclude patterns, and the
// list of jobs. It is the "rollups" table of DBManifestTOML and
// EngineConfigManifestDefaults, and is also carried by DBInfo.
// NOTE(review): only toml tags are declared, so when encoded as JSON via
// DBInfo the Go field names are used — confirm that is intended.
type DBManifestRollups struct {
	Enabled               bool                  `toml:"enabled"`
	CheckpointFile        string                `toml:"checkpoint_file"`
	DefaultGrace          string                `toml:"default_grace"`
	DefaultInterval       string                `toml:"default_interval"`
	DefaultDestinationDB  string                `toml:"default_destination_db"`
	DefaultAggregates     []string              `toml:"default_aggregates"`
	GlobalExcludePatterns []string              `toml:"global_exclude_patterns"`
	Jobs                  []DBManifestRollupJob `toml:"jobs"`
}

type DBManifestTOML

// DBManifestTOML groups the four manifest tables: "retention", "wal", "page"
// and "rollups".
// NOTE(review): presumably the on-disk TOML layout of a per-database
// manifest file — confirm.
type DBManifestTOML struct {
	Retention DBManifestRetention `toml:"retention"`
	WAL       DBManifestWAL       `toml:"wal"`
	Page      DBManifestPage      `toml:"page"`
	Rollups   DBManifestRollups   `toml:"rollups"`
}

type DBManifestWAL

// DBManifestWAL holds the WAL settings of a manifest; it is the "wal" table
// of DBManifestTOML and EngineConfigManifestDefaults.
// NOTE(review): the format and meaning of SkipBefore are not visible here —
// confirm.
type DBManifestWAL struct {
	Enabled    bool   `toml:"enabled"`
	SkipBefore string `toml:"skip_before"`
}

type DBRuntimeInspect added in v1.3.0

// DBRuntimeInspect is the JSON-serializable runtime snapshot returned by
// Engine.InspectDBRuntime: the database name, its metric count, its settings
// (DBInfo), its stats, and one OpenPageStats entry per open page.
type DBRuntimeInspect struct {
	Database    string          `json:"database"`
	MetricCount int             `json:"metric_count"`
	Manifest    DBInfo          `json:"manifest"`
	Stats       DBStats         `json:"stats"`
	OpenPages   []OpenPageStats `json:"open_pages"`
}

type DBStats

// DBStats pairs the data-file runtime stats with the WAL stats for one
// database. It is returned by Engine.DBStats.
type DBStats struct {
	DataFile DataRuntimeStats
	WAL      WALStats
}

type DataFileMetricCompactReport added in v1.4.0

// DataFileMetricCompactReport is the JSON-serializable result of
// Engine.CompactDataFileToMetricV2 for one database partition.
// NOTE(review): SavedBytes is presumably DataBytes - MetricBytes — confirm.
type DataFileMetricCompactReport struct {
	Database    string `json:"database"`
	Part        string `json:"part"`
	DataPath    string `json:"data_path"`
	MetricPath  string `json:"metric_path"`
	DataBytes   int64  `json:"data_bytes"`
	MetricBytes int64  `json:"metric_bytes"`
	SavedBytes  int64  `json:"saved_bytes"`
	DurationMS  int64  `json:"duration_ms"`
}

type DataFileRecompactReport added in v1.3.0

// DataFileRecompactReport is the JSON-serializable result of
// Engine.RecompactDataFile: frame, record and byte counts before (Old*) and
// after (New*) the operation, plus its duration in milliseconds.
type DataFileRecompactReport struct {
	Database   string `json:"database"`
	Part       string `json:"part"`
	Path       string `json:"path"`
	OldFrames  int    `json:"old_frames"`
	NewFrames  int    `json:"new_frames"`
	OldRecords int64  `json:"old_records"`
	NewRecords int64  `json:"new_records"`
	OldBytes   int64  `json:"old_bytes"`
	NewBytes   int64  `json:"new_bytes"`
	DurationMS int64  `json:"duration_ms"`
}

type DataFileRecoverReport added in v1.3.0

// DataFileRecoverReport is the JSON-serializable result of
// Engine.RecoverDataFile: source and output paths and sizes, counts of
// recovered and rejected frames, skipped bytes, and the first and last
// accepted offsets.
// NOTE(review): the offsets are presumably byte offsets into the source
// file — confirm.
type DataFileRecoverReport struct {
	Database            string `json:"database"`
	Part                string `json:"part"`
	SourcePath          string `json:"source_path"`
	OutputPath          string `json:"output_path"`
	SourceBytes         int64  `json:"source_bytes"`
	OutputBytes         int64  `json:"output_bytes"`
	RecoveredFrames     int    `json:"recovered_frames"`
	RecoveredRecords    int64  `json:"recovered_records"`
	SkippedBytes        int64  `json:"skipped_bytes"`
	RejectedFrames      int    `json:"rejected_frames"`
	FirstAcceptedOffset int64  `json:"first_accepted_offset"`
	LastAcceptedOffset  int64  `json:"last_accepted_offset"`
	DurationMS          int64  `json:"duration_ms"`
}

type DataFileStats

type DataFileStats struct {
	Path            string
	FileBytes       int64
	Frames          int
	TotalRecords    int64
	TotalCompressed int64
	TotalFrameBytes int64
	MinStart        Timestamp
	MaxEnd          Timestamp
}

DataFileStats summarizes one .dat file.

func ScanDataFileStats added in v1.3.0

func ScanDataFileStats(path string) (DataFileStats, error)

ScanDataFileStats reads only frame headers and compressed lengths for a .dat file. It does not decompress payloads or populate per-frame page details.

func WalkDataFileHeaders

func WalkDataFileHeaders(path string, fn DataFrameCallback) (DataFileStats, error)

WalkDataFileHeaders walks page headers in a .dat file and invokes fn for each decoded frame header. It does not decode payload pages.

type DataFrameCallback

// DataFrameCallback is invoked by WalkDataFileHeaders once for each decoded
// frame header.
// NOTE(review): presumably a non-nil return stops the walk — confirm.
type DataFrameCallback func(DataFrameHeader) error

type DataFrameHeader

type DataFrameHeader struct {
	Index           int
	Offset          int64
	StartTime       Timestamp
	EndTime         Timestamp
	NumRecords      uint16
	CompressedLen   uint64
	UncompressedLen uint64
	FrameBytes      int64
}

DataFrameHeader describes one page frame parsed from a .dat file without decompression.

type DataRuntimeStats

// DataRuntimeStats accumulates data-file flush and sync statistics: counts,
// totals, and min/max values for bytes, records, compressed size and
// duration. It is the DataFile member of DBStats.
// NOTE(review): whether the Min* fields are zero or a sentinel before the
// first flush/sync is not visible here — confirm.
type DataRuntimeStats struct {
	FlushCount           int64
	TotalFlushBytes      int64
	TotalFlushRecords    int64
	TotalFlushCompressed int64
	MinFlushBytes        int64
	MaxFlushBytes        int64
	MinFlushRecords      int64
	MaxFlushRecords      int64
	MinFlushCompressed   int64
	MaxFlushCompressed   int64
	FlushDurationTotal   time.Duration
	MinFlushDuration     time.Duration
	MaxFlushDuration     time.Duration
	SyncCount            int64
	SyncDurationTotal    time.Duration
	MinSyncDuration      time.Duration
	MaxSyncDuration      time.Duration
}

type Database

type Database struct {
	Name        string
	RootDataDir string
	// contains filtered or unexported fields
}

Database represents one named time-series database on disk. Each database has its own WAL, catalog, and set of daily data files. Use Engine.AddLine / Engine.QueryRange for typical access; Database is exposed for low-level tooling (nanocli inspect, export, etc.).

func NewDatabase

func NewDatabase(name string, walMaxSegSize int64) (*Database, error)

NewDatabase creates a new database at the given base path with the specified WAL segment size.

func NewDatabaseWithWALConfig

func NewDatabaseWithWALConfig(name string, walMaxSegSize int64, fsyncPolicy string) (*Database, error)

func OpenDatabase

func OpenDatabase(name string) (*Database, error)

OpenDatabase opens an existing database by base path (no WAL size limit). Used by read-only tooling that does not need to append samples.

func OpenDatabaseWithWALConfig

func OpenDatabaseWithWALConfig(name string, fsyncPolicy string) (*Database, error)

func (*Database) Close

func (db *Database) Close() error

type Engine

type Engine struct {
	RootDataDir           string
	WALMaxSegSize         int64
	WALFsyncPolicy        string
	Durability            string
	PreferMetricFiles     bool
	AutoCreateMetricFiles bool
	MetricFileCompression string
	MetricRawIngestAction string
	MetricTimeCacheSlots  int
	Logging               EngineConfigLogging

	SyncDataFile  bool
	SyncCatalog   bool
	StatsEnabled  bool
	StatsInterval time.Duration
	// contains filtered or unexported fields
}

Engine is the top-level coordinator for all databases in a root data directory. It is safe for concurrent use. Open with OpenEngine; always call Close when done.

func OpenEngine

func OpenEngine(rootDataDir string, walMaxSegSize int64) (*Engine, error)

OpenEngine opens or creates the engine rooted at rootDataDir. If engine.toml does not exist it is written from the embedded defaults. walMaxSegSize sets the per-database WAL segment size; pass 0 for the 64 MiB default.

func OpenEngineWithConfig added in v1.2.0

func OpenEngineWithConfig(rootDataDir string, cfg EngineConfig, statsInterval time.Duration, dbDefaults DBInfo, logger *slog.Logger) (*Engine, error)

func (*Engine) AddLine

func (e *Engine) AddLine(line string) error

AddLine ingests one sample in line-protocol format: "DB/metric value [ts]" where value is an integer or float literal and ts is optional. ts can be Unix nanoseconds or a human-readable timestamp accepted by ParseTimestamp. AddLine is safe for concurrent use.

func (*Engine) AddSample

func (e *Engine) AddSample(database, metric string, ts Timestamp, value any) error

AddSample ingests one typed sample directly. This is the canonical ingest API used by all write paths.

func (*Engine) BackfillRollups added in v1.1.0

func (e *Engine) BackfillRollups(sourceDBNames []string) (RollupBackfillReport, error)

func (*Engine) BuildMetricFile added in v1.3.0

func (e *Engine) BuildMetricFile(database, partition string) (string, error)

BuildMetricFile creates the default metric-<partition>.dat format for one database. V2 is the current default format.

func (*Engine) BuildMetricFileV1 added in v1.3.0

func (e *Engine) BuildMetricFileV1(database, partition string) (string, error)

BuildMetricFileV1 creates metric-<partition>.dat from data-<partition>.dat for one database. It does not delete or modify the source data file.

func (*Engine) BuildMetricFileV2 added in v1.3.0

func (e *Engine) BuildMetricFileV2(database, partition string) (string, error)

BuildMetricFileV2 creates metric-<partition>.dat from data-<partition>.dat for one database. It does not delete or modify the source data file until the configured raw-ingest action is applied after a successful write.

func (*Engine) Close

func (e *Engine) Close() error

Close flushes all open day-pages, resets WAL files, emits a final stats snapshot, and closes every open database. Always call Close before the process exits.

func (*Engine) CompactDataFileToMetricV2 added in v1.4.0

func (e *Engine) CompactDataFileToMetricV2(database, part string) (DataFileMetricCompactReport, error)

func (*Engine) CompareDataAndMetricPartition added in v1.3.0

func (e *Engine) CompareDataAndMetricPartition(database, partition string) error

CompareDataAndMetricPartition validates a metric file against its raw data partition, dispatching to the appropriate checker for the on-disk metric file version.

func (*Engine) CompareDataAndMetricPartitionV1 added in v1.3.0

func (e *Engine) CompareDataAndMetricPartitionV1(database, partition string) error

CompareDataAndMetricPartitionV1 validates that data-<partition>.dat and metric-<partition>.dat contain exactly the same per-metric sample stream.

func (*Engine) CompareDataAndMetricPartitionV2 added in v1.3.0

func (e *Engine) CompareDataAndMetricPartitionV2(database, partition string) error

CompareDataAndMetricPartitionV2 validates that data-<partition>.dat and a v2 metric-<partition>.dat contain exactly the same per-metric sample stream.

func (*Engine) DBStats

func (e *Engine) DBStats(database string) (DBStats, bool)

DBStats returns a snapshot of engine-level stats for the given database. Values for data flushes come from the engine stat store; WAL stats are read directly from the WAL so they are always current.

func (*Engine) ExportFile

func (e *Engine) ExportFile(database, outPath string) error

ExportFile exports one database to an LP file using: DB/metric value ts. Exported timestamps use FormatTimestamp (UTC, YYYY-MM-DD HH:MM:SS.nnnnnnnnn).

func (*Engine) ExportToWriter

func (e *Engine) ExportToWriter(database string, out io.Writer) error

ExportToWriter exports one database to an arbitrary writer using line protocol. Timestamps are written with FormatTimestamp (UTC, YYYY-MM-DD HH:MM:SS.nnnnnnnnn).

func (*Engine) GetAllDatabaseNames

func (e *Engine) GetAllDatabaseNames() []string

GetAllDatabaseNames returns all database names managed by this engine.

func (*Engine) GetMetricRollupDownstream added in v1.3.0

func (e *Engine) GetMetricRollupDownstream(database, metric string, maxHops int) ([]MetricRollupDownstream, bool, error)

GetMetricRollupDownstream returns bounded downstream rollup lineage for one metric. Lineage is derived from configured rollup jobs in loaded database manifests.

func (*Engine) ImportFile

func (e *Engine) ImportFile(path string) error

ImportFile imports LP lines in the format: DB/metric value [ts]. ts can be Unix nanoseconds or a human-readable value accepted by ParseTimestamp.

func (*Engine) InspectDBRuntime added in v1.3.0

func (e *Engine) InspectDBRuntime(database string) (DBRuntimeInspect, bool)

func (*Engine) InspectDBWAL added in v1.3.0

func (e *Engine) InspectDBWAL(database string) ([]WALRecord, bool, error)

func (*Engine) IsDatabaseActive added in v1.3.0

func (e *Engine) IsDatabaseActive(database string) bool

IsDatabaseActive reports whether a database is currently loaded in memory.

func (*Engine) ListMetrics

func (e *Engine) ListMetrics(database string) ([]MetricInfo, error)

ListMetrics returns all known metrics for a database in stable name order.

func (*Engine) QueryAggregateRange added in v1.4.0

func (e *Engine) QueryAggregateRange(database, metric string, fromTS, toTS Timestamp, window time.Duration, aggregates []string, fn AggregateBucketCallback) error

func (*Engine) QueryLast

func (e *Engine) QueryLast(database, metric string) (Sample, bool, error)

QueryLast returns the most recently written sample for a metric. Returns (sample, true, nil) if a sample exists, (zero, false, nil) if not.

func (*Engine) QueryRange

func (e *Engine) QueryRange(database, metric string, fromTS, toTS Timestamp, stride int, fn SampleCallback) error

QueryRange scans samples for a metric within a time range. Stride controls downsampling: stride=1 returns every sample, stride=N returns every Nth. Each matching sample is passed to the callback; callback errors terminate early.

func (*Engine) QueryRangeMany added in v1.4.0

func (e *Engine) QueryRangeMany(database string, metrics []string, fromTS, toTS Timestamp, stride int, fn SampleCallback) error

QueryRangeMany scans samples for multiple metrics within a time range. It reuses each persisted partition scan across all requested metrics.

func (*Engine) RecompactDataFile added in v1.3.0

func (e *Engine) RecompactDataFile(database, part string) (DataFileRecompactReport, error)

func (*Engine) RecoverDataFile added in v1.3.0

func (e *Engine) RecoverDataFile(database, part, outputPath string) (DataFileRecoverReport, error)

func (*Engine) SetAutoRollupTrigger

func (e *Engine) SetAutoRollupTrigger(enabled bool)

SetAutoRollupTrigger controls whether ingest-time flushes automatically trigger rollup computation for the source database.

func (*Engine) TriggerRollupsForSource

func (e *Engine) TriggerRollupsForSource(sourceDBName string)

TriggerRollupsForSource computes rollups for one source database using jobs configured in the source database manifest.

func (*Engine) TriggerRollupsForSources

func (e *Engine) TriggerRollupsForSources(sourceDBNames []string)

TriggerRollupsForSources computes rollups for each unique source database name.

type EngineConfig

// EngineConfig is the top-level engine configuration, one field per TOML
// table. It is returned by LoadEngineConfig and accepted by
// OpenEngineWithConfig.
// NOTE(review): presumably the layout of the engine.toml file mentioned by
// OpenEngine — confirm.
type EngineConfig struct {
	Engine           EngineConfigEngine           `toml:"engine"`
	WAL              EngineConfigWAL              `toml:"wal"`
	Durability       EngineConfigDurability       `toml:"durability"`
	Metrics          EngineConfigMetrics          `toml:"metrics"`
	Logging          EngineConfigLogging          `toml:"logging"`
	Stats            EngineConfigStats            `toml:"stats"`
	Defaults         EngineConfigDefaults         `toml:"defaults"`
	ManifestDefaults EngineConfigManifestDefaults `toml:"manifest_defaults"`
}

type EngineConfigDefaults

// EngineConfigDefaults is the "defaults" table of EngineConfig; it carries a
// list of database names.
// NOTE(review): presumably databases to create or open at startup — confirm.
type EngineConfigDefaults struct {
	Databases []string `toml:"databases"`
}

type EngineConfigDurability

// EngineConfigDurability is the "durability" table of EngineConfig.
// NOTE(review): Profile presumably takes one of the DurabilityProfile*
// constants — confirm.
type EngineConfigDurability struct {
	Profile string `toml:"profile"`
}

type EngineConfigEngine

// EngineConfigEngine is the "engine" table of EngineConfig.
// NOTE(review): Listen is presumably a network listen address; no listener
// is visible in this package's API — confirm where it is consumed.
type EngineConfigEngine struct {
	Listen string `toml:"listen"`
}

type EngineConfigLogger added in v1.2.0

// EngineConfigLogger configures one logger: where it writes and at what
// level. Entries are listed in EngineConfigLogging.Loggers.
// NOTE(review): Level presumably takes one of the LogLevel* constants; the
// accepted Output values are not visible here — confirm.
type EngineConfigLogger struct {
	Output string `toml:"output"`
	Level  string `toml:"level"`
}

type EngineConfigLogging added in v1.2.0

// EngineConfigLogging is the "logging" table of EngineConfig and the
// argument of NewLogger. Its loggers are read from the TOML key "logger"
// (singular), one entry per EngineConfigLogger.
type EngineConfigLogging struct {
	Loggers []EngineConfigLogger `toml:"logger"`
}

type EngineConfigManifestDefaults

// EngineConfigManifestDefaults is the "manifest_defaults" table of
// EngineConfig. It has the same four tables as DBManifestTOML.
// NOTE(review): presumably the values used for databases whose own manifest
// omits a setting — confirm.
type EngineConfigManifestDefaults struct {
	Retention DBManifestRetention `toml:"retention"`
	WAL       DBManifestWAL       `toml:"wal"`
	Page      DBManifestPage      `toml:"page"`
	Rollups   DBManifestRollups   `toml:"rollups"`
}

type EngineConfigMetrics added in v1.3.0

// EngineConfigMetrics is the "metrics" table of EngineConfig.
// NOTE(review): Compression presumably takes a CompressionCodec*Name value
// and RawIngestAction a MetricRawIngestAction* value; these fields appear to
// correspond to Engine.MetricFileCompression, Engine.MetricRawIngestAction
// and Engine.MetricTimeCacheSlots — confirm.
type EngineConfigMetrics struct {
	Enabled         bool   `toml:"enabled"`
	Compression     string `toml:"compression"`
	RawIngestAction string `toml:"raw_ingest_action"`
	TimeCacheSlots  int    `toml:"time_cache_slots"`
}

type EngineConfigStats

// EngineConfigStats is the "stats" table of EngineConfig.
// NOTE(review): Interval is a string, presumably a duration parsed into the
// time.Duration that LoadEngineConfig returns — confirm.
type EngineConfigStats struct {
	Enabled  bool   `toml:"enabled"`
	Interval string `toml:"interval"`
}

type EngineConfigWAL

// EngineConfigWAL is the "wal" table of EngineConfig.
// NOTE(review): MaxSegmentSize is presumably in bytes and FsyncPolicy one of
// the WALFsyncPolicy* constants — confirm.
type EngineConfigWAL struct {
	MaxSegmentSize int64  `toml:"max_segment_size"`
	FsyncPolicy    string `toml:"fsync_policy"`
}

type MetricEntry

type MetricEntry struct {
	MetricID  MetricID
	ValueType byte

	// In-memory only cache for QueryLast. Never persisted to catalog JSON.
	LastTS    Timestamp
	LastRaw   [4]byte
	LastValid bool
}

MetricEntry holds the catalog record for one metric: its assigned ID, value type, and an in-memory cache of the last written sample (for QueryLast).

type MetricFileFrameInfo added in v1.3.0

// MetricFileFrameInfo describes one metric frame of a metric file: its
// index, metric, value type, timestamp range, point count and sizes.
// It is passed to the WalkMetricFileFrames callback and listed in
// MetricFileSummary.MetricFrames.
// NOTE(review): PayloadLen is presumably the compressed size and
// UncompressedLen the decoded size, both in bytes — confirm.
type MetricFileFrameInfo struct {
	Index           int
	MetricID        MetricID
	ValueType       byte
	MetricMinTS     Timestamp
	MetricMaxTS     Timestamp
	PointCount      uint32
	UncompressedLen uint32
	PayloadLen      uint32
}

type MetricFilePage added in v1.3.0

// MetricFilePage is one page read from a metric file: the metric's identity
// and codec, its timestamps and values, and page metadata (offset, counts,
// sizes, timestamp range). It is returned by ReadMetricFileV1 and passed to
// the WalkMetricFileV1 callback.
// NOTE(review): presumably only one of Int32/Float32 is populated, selected
// by ValueType (Int32Sample or Float32Sample), and parallel to Times — confirm.
type MetricFilePage struct {
	MetricID        MetricID
	CodecID         uint16
	ValueType       byte
	Times           []Timestamp
	Int32           []int32
	Float32         []float32
	PageOffset      uint64
	PointCount      uint32
	PayloadLen      uint32
	UncompressedLen uint32
	MetricMinTS     Timestamp
	MetricMaxTS     Timestamp
}

func ReadMetricFileV1 added in v1.3.0

func ReadMetricFileV1(path string) ([]MetricFilePage, error)

type MetricFilePageInfoV1 added in v1.3.0

// MetricFilePageInfoV1 is the metadata of one page of a metric file, without
// the sample data. It is returned by ReadMetricFilePageInfosV1 and passed to
// the WalkMetricFilePageInfosV1 callback.
// NOTE(review): PageOffset is presumably a byte offset into the file — confirm.
type MetricFilePageInfoV1 struct {
	Index           int
	MetricID        MetricID
	ValueType       byte
	PageOffset      uint64
	MetricMinTS     Timestamp
	MetricMaxTS     Timestamp
	PointCount      uint32
	UncompressedLen uint32
	PayloadLen      uint32
}

func ReadMetricFilePageInfosV1 added in v1.3.0

func ReadMetricFilePageInfosV1(path string) ([]MetricFilePageInfoV1, error)

type MetricFilePageInput added in v1.3.0

// MetricFilePageInput is one page of samples for one metric, supplied to
// WriteMetricFileV1 and WriteMetricFileV2.
// NOTE(review): presumably Times is parallel to whichever of Int32/Float32
// matches ValueType — confirm.
type MetricFilePageInput struct {
	MetricID  MetricID
	ValueType byte
	Times     []Timestamp
	Int32     []int32
	Float32   []float32
}

type MetricFileSummary added in v1.3.0

// MetricFileSummary is the result of ReadMetricFileSummary: the file's
// format version, its time-frame count, and one MetricFileFrameInfo per
// metric frame.
// NOTE(review): what a "time frame" is in the file format is not visible
// here — confirm.
type MetricFileSummary struct {
	Version        uint16
	TimeFrameCount int
	MetricFrames   []MetricFileFrameInfo
}

func ReadMetricFileSummary added in v1.3.0

func ReadMetricFileSummary(path string) (MetricFileSummary, error)

type MetricID

type MetricID uint16

MetricID is a compact uint16 identifier assigned to each metric within a database.

func GetMetricID

func GetMetricID[T SampleType](c *Catalog, name string) (MetricID, error)

GetMetricID returns the MetricID for a named metric, registering it if it has not been seen before. The type parameter T determines whether an Int32Sample or Float32Sample entry is created. Returns an error if the metric already exists with a different value type, or if the per-database metric limit (65535) has been reached.

type MetricInfo

// MetricInfo is one entry of a metric listing: the metric's name, ID and
// value type. It is returned by Catalog.ListMetrics and Engine.ListMetrics.
type MetricInfo struct {
	Name      string
	MetricID  MetricID
	ValueType byte
}

type MetricRollupDownstream added in v1.3.0

// MetricRollupDownstream is one entry of the downstream rollup lineage
// returned by Engine.GetMetricRollupDownstream.
// NOTE(review): Hop is presumably the distance from the queried metric
// (bounded by maxHops), and Database/Metric the rollup destination — confirm.
type MetricRollupDownstream struct {
	Hop       int
	JobID     string
	Interval  string
	Aggregate string
	Database  string
	Metric    string
}

type OpenPageStats added in v1.3.0

// OpenPageStats is the JSON-serializable snapshot of one open page, listed
// in DBRuntimeInspect.OpenPages. The time.Duration fields (MaxAge, Age)
// encode as integer nanoseconds, matching their "_ns" JSON names.
// Note the JSON key for UniqueMetric is plural ("unique_metrics").
// NOTE(review): the difference between MetricSlots and UniqueMetric is not
// visible here — confirm.
type OpenPageStats struct {
	Day          string        `json:"day"`
	Records      int           `json:"records"`
	MetricSlots  int           `json:"metric_slots"`
	UniqueMetric int           `json:"unique_metrics"`
	ValueBytes   int           `json:"value_bytes"`
	StartTS      Timestamp     `json:"start_timestamp_ns"`
	EndTS        Timestamp     `json:"end_timestamp_ns"`
	MaxRecords   int           `json:"max_records"`
	MaxBytes     int           `json:"max_bytes"`
	MaxAge       time.Duration `json:"max_age_ns"`
	Age          time.Duration `json:"age_ns"`
	WALSegmentID uint16        `json:"wal_segment_id"`
	Full         bool          `json:"full"`
	Persisted    bool          `json:"persisted"`
}

type Page

type Page struct {
	Start        Timestamp
	End          Timestamp
	Metrics      []MetricID
	Times        []Timestamp
	Values       *bytes.Buffer // raw value bytes buffer; catalog knows per-metric width/encoding
	MaxRecords   int
	MaxBytes     int
	MaxAge       time.Duration
	WALSegmentID uint16
	// contains filtered or unexported fields
}

Page holds interleaved samples from multiple metrics. Value bytes are stored raw in a pre-allocated buffer; the catalog is responsible for interpreting them.

func NewPage

func NewPage(firstTS Timestamp) *Page

func NewPageWithLimits

func NewPageWithLimits(firstTS Timestamp, maxRecords, maxBytes int, maxAge time.Duration) *Page

func (*Page) AddSample

func (p *Page) AddSample(metricID MetricID, ts Timestamp, value []byte) error

AddSample appends a raw value for the given metric and timestamp. value must be exactly the bytes the catalog will use to decode this metric. Timestamps must be non-decreasing across all metrics in this page.

func (*Page) DecodeCompressedFrame added in v1.3.0

func (p *Page) DecodeCompressedFrame(h PageHeader, compressed []byte, expectedCRC uint32) error

func (*Page) DecodeFrom

func (p *Page) DecodeFrom(r *bytes.Reader) error

func (*Page) EncodeInto

func (p *Page) EncodeInto(bb *bytes.Buffer) error

func (*Page) EndTime

func (p *Page) EndTime() Timestamp

func (*Page) IsFull

func (p *Page) IsFull() bool

IsFull returns true when any flush threshold is exceeded.

func (*Page) SetWalSegmentID

func (p *Page) SetWalSegmentID(segmentID uint16)

func (*Page) StartTime

func (p *Page) StartTime() Timestamp
// PageHeader is the fixed header of a page: its start and end timestamps and
// its record count. The field widths (8 + 8 + 2 bytes) add up to HeaderSize.
// NumRecords is a uint16, so a single page header can describe at most
// 65535 records.
type PageHeader struct {
	StartTime  Timestamp
	EndTime    Timestamp
	NumRecords uint16
}

func (*PageHeader) Decode

func (h *PageHeader) Decode(r *bytes.Reader) (err error)

func (*PageHeader) EncodeInto

func (h *PageHeader) EncodeInto(w *bytes.Buffer) (err error)

type RollupBackfillReport added in v1.1.0

// RollupBackfillReport is the JSON-serializable result of
// Engine.BackfillRollups: the requested sources, the source and destination
// databases involved, the files cleared (checkpoint, data, WAL, catalog),
// and the number of replay passes.
// NOTE(review): how SourceDatabases differs from RequestedSources is not
// visible here — confirm.
type RollupBackfillReport struct {
	RequestedSources       []string `json:"requested_sources"`
	SourceDatabases        []string `json:"source_databases"`
	DestinationDatabases   []string `json:"destination_databases"`
	ClearedCheckpointFiles []string `json:"cleared_checkpoint_files"`
	ClearedDataFiles       []string `json:"cleared_data_files"`
	ClearedWALFiles        []string `json:"cleared_wal_files"`
	ClearedCatalogFiles    []string `json:"cleared_catalog_files"`
	ReplayPasses           int      `json:"replay_passes"`
}

type Sample

type Sample struct {
	Database  string
	Metric    string
	TS        Timestamp
	ValueType byte
	Int32     int32
	Float32   float32
}

Sample is one decoded data point returned by QueryRange or QueryLast.

type SampleCallback

type SampleCallback func(Sample) error

SampleCallback is invoked for each sample in a range query.

type SampleType

type SampleType = interface{ int32 | float32 }

SampleType constrains the numeric types supported as metric values.

type Timestamp

type Timestamp int64

Timestamp is a Unix nanosecond epoch value (int64). All sample timestamps must be monotonically non-decreasing within a metric.

func ParseTimestamp

func ParseTimestamp(input string) (Timestamp, error)

ParseTimestamp converts a human-readable timestamp or nanosecond value to Timestamp. Accepts formats:

  • Raw nanosecond integer: "1234567890123456789"
  • Date only: "2026-05-14" (time set to 00:00:00 UTC)
  • Date and time: "2026-05-14 12:34:56" (nanoseconds set to 0)
  • Date, time, and nanos: "2026-05-14 12:34:56.123456789"

type WAL

type WAL struct {
	// contains filtered or unexported fields
}

WAL manages a single reusable write-ahead log file.

func NewWAL

func NewWAL(path string, maxSegmentSize int64, fsyncPolicy string) (*WAL, error)

NewWAL creates a new WAL manager.

func OpenAndRecoverWAL

func OpenAndRecoverWAL(name string, fsyncPolicy string) (*WAL, error)

func (*WAL) Close

func (w *WAL) Close() error

Close closes the current segment and cleans up.

func (*WAL) DeleteSegment

func (w *WAL) DeleteSegment(segmentID uint16) error

DeleteSegment is retained for API compatibility.

func (*WAL) Flush

func (w *WAL) Flush() error

Flush writes buffered records to disk without rotating. It does not fsync (records remain in the OS page cache).

func (*WAL) Fsync

func (w *WAL) Fsync() error

Fsync calls fsync on the current segment. Called on page flush and segment rotation (Durability & fsync Policy).

func (*WAL) Records

func (w *WAL) Records() ([]WALRecord, error)

Records returns all WAL records currently present in the WAL file.

func (*WAL) RecordsWithCatalog

func (w *WAL) RecordsWithCatalog(cat *Catalog) ([]WALRecord, error)

RecordsWithCatalog returns all WAL records, using an optional catalog for ValueType lookups.

func (*WAL) Reset

func (w *WAL) Reset() error

Reset truncates the WAL to zero and reuses it from the beginning.

func (*WAL) RotateSegment

func (w *WAL) RotateSegment() (uint16, error)

RotateSegment is retained for API compatibility. WAL v1 uses a single file; rotate maps to reset-and-reuse.

func (*WAL) SegmentFull

func (w *WAL) SegmentFull() bool

SegmentFull checks whether the current segment should rotate. Returns true if bufferSize >= maxSegSize.

func (*WAL) ShouldFsyncAfterAppend

func (w *WAL) ShouldFsyncAfterAppend() bool

func (*WAL) Stats

func (w *WAL) Stats() WALStats

type WALFileStats

type WALFileStats struct {
	Path         string
	FileBytes    int64
	Records      int
	DecodedBytes int64
	MinTS        Timestamp
	MaxTS        Timestamp
	HasTail      bool
	TailBytes    int64
	StopOffset   int64
	StopReason   string
}

WALFileStats summarizes a WAL file walk.

func WalkWALFile

func WalkWALFile(path string, fn WALRecordCallback) (WALFileStats, error)

WalkWALFile walks WAL records and invokes fn for each successfully decoded record. It stops at the first invalid/truncated tail and returns tail metadata in stats.

type WALFlushEvent

type WALFlushEvent struct {
	At    time.Time
	Bytes int64
}

type WALRecord

type WALRecord struct {
	SegmentID  uint16 // ID of the segment this record came from (LAW 6 origin, wal-NNNN.log format)
	MetricID   MetricID
	MetricName string
	Timestamp  Timestamp
	ValueType  byte        // Int32Sample or Float32Sample
	Value      interface{} // int32 or float32
}

WALRecord represents a single entry in the WAL. Used for crash recovery (LAW 9 — deterministic replay).

type WALRecordCallback

type WALRecordCallback func(WALWalkRecord) error

type WALStats

type WALStats struct {
	AppendCount        int64
	AppendBytes        int64
	BufferBytes        int64
	FsyncCount         int64
	FsyncDurationTotal time.Duration
	MinFsyncDuration   time.Duration
	MaxFsyncDuration   time.Duration
	FlushCount         int64
	FlushedBytes       int64
	MinFlushBytes      int64
	MaxFlushBytes      int64
	ResetDurationTotal time.Duration
	MinResetDuration   time.Duration
	MaxResetDuration   time.Duration
	LastAppendAt       time.Time
	LastFsyncAt        time.Time
	LastFlushAt        time.Time
	FlushIntervalCount int64
	FlushIntervalTotal time.Duration
	MinFlushInterval   time.Duration
	MaxFlushInterval   time.Duration
	RecentFlushes      []WALFlushEvent
}

type WALWalkRecord

type WALWalkRecord struct {
	Index       int
	Offset      int64
	RecordBytes int64
	MetricID    MetricID
	Timestamp   Timestamp
	ValueType   byte
	Value       interface{}
	Raw         []byte
}

WALWalkRecord is one decoded WAL record plus its raw encoded bytes.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL