Documentation
¶
Index ¶
- Constants
- Variables
- func Aggregators() map[string]Aggregator
- func AppendSample[T SampleType](w *WAL, metricID MetricID, ts Timestamp, value T) (uint16, error)
- func AppendSampleWithMetricName[T SampleType](w *WAL, metricID MetricID, metricName string, ts Timestamp, value T) (uint16, error)
- func DefaultStepAggregate() string
- func FormatTimestamp(ts Timestamp) string
- func LoadEngineConfig(rootDataDir string, fallbackWalMaxSegSize int64) (EngineConfig, time.Duration, DBInfo, error)
- func NewLogger(cfg EngineConfigLogging) (*slog.Logger, func() error, error)
- func ReadMetricFileVersion(path string) (uint16, error)
- func ScanDataFileHeaders(path string) (DataFileStats, []DataFrameHeader, error)
- func ScanWALFile(path string) (WALFileStats, []WALWalkRecord, error)
- func SupportedAggregates() []string
- func WalkMetricFileFrames(path string, fn func(MetricFileFrameInfo) error) error
- func WalkMetricFilePageInfosV1(path string, fn func(MetricFilePageInfoV1) error) error
- func WalkMetricFileV1(path string, fn func(MetricFilePage) error) error
- func WriteMetricFileV1(path string, partitionKind uint8, codec BlockCompressionCodec, ...) error
- func WriteMetricFileV2(path string, partitionKind uint8, codec BlockCompressionCodec, ...) error
- type AggregateBucket
- type AggregateBucketCallback
- type Aggregator
- type BlockCompressionCodec
- type Catalog
- func (c *Catalog) EnsureMetricEntry(name string, mid MetricID, valueType byte) error
- func (c *Catalog) GetMetricByID(mid MetricID) (string, MetricEntry, bool)
- func (c *Catalog) GetMetricEntry(name string) (MetricEntry, bool)
- func (c *Catalog) GetMetricType(mid MetricID) (byte, error)
- func (c *Catalog) GetValueWidth(mid MetricID) int
- func (c *Catalog) IsDirty() bool
- func (c *Catalog) ListMetrics() []MetricInfo
- func (c *Catalog) UpdateLastByMetricID(mid MetricID, ts Timestamp, raw []byte) error
- func (c *Catalog) WriteCatalog() error
- type DBInfo
- type DBManifestPage
- type DBManifestRetention
- type DBManifestRollupJob
- type DBManifestRollups
- type DBManifestTOML
- type DBManifestWAL
- type DBRuntimeInspect
- type DBStats
- type DataFileMetricCompactReport
- type DataFileRecompactReport
- type DataFileRecoverReport
- type DataFileStats
- type DataFrameCallback
- type DataFrameHeader
- type DataRuntimeStats
- type Database
- func NewDatabase(name string, walMaxSegSize int64) (*Database, error)
- func NewDatabaseWithWALConfig(name string, walMaxSegSize int64, fsyncPolicy string) (*Database, error)
- func OpenDatabase(name string) (*Database, error)
- func OpenDatabaseWithWALConfig(name string, fsyncPolicy string) (*Database, error)
- type Engine
- func (e *Engine) AddLine(line string) error
- func (e *Engine) AddSample(database, metric string, ts Timestamp, value any) error
- func (e *Engine) BackfillRollups(sourceDBNames []string) (RollupBackfillReport, error)
- func (e *Engine) BuildMetricFile(database, partition string) (string, error)
- func (e *Engine) BuildMetricFileV1(database, partition string) (string, error)
- func (e *Engine) BuildMetricFileV2(database, partition string) (string, error)
- func (e *Engine) Close() error
- func (e *Engine) CompactDataFileToMetricV2(database, part string) (DataFileMetricCompactReport, error)
- func (e *Engine) CompareDataAndMetricPartition(database, partition string) error
- func (e *Engine) CompareDataAndMetricPartitionV1(database, partition string) error
- func (e *Engine) CompareDataAndMetricPartitionV2(database, partition string) error
- func (e *Engine) DBStats(database string) (DBStats, bool)
- func (e *Engine) ExportFile(database, outPath string) error
- func (e *Engine) ExportToWriter(database string, out io.Writer) error
- func (e *Engine) GetAllDatabaseNames() []string
- func (e *Engine) GetMetricRollupDownstream(database, metric string, maxHops int) ([]MetricRollupDownstream, bool, error)
- func (e *Engine) ImportFile(path string) error
- func (e *Engine) InspectDBRuntime(database string) (DBRuntimeInspect, bool)
- func (e *Engine) InspectDBWAL(database string) ([]WALRecord, bool, error)
- func (e *Engine) IsDatabaseActive(database string) bool
- func (e *Engine) ListMetrics(database string) ([]MetricInfo, error)
- func (e *Engine) QueryAggregateRange(database, metric string, fromTS, toTS Timestamp, window time.Duration, ...) error
- func (e *Engine) QueryLast(database, metric string) (Sample, bool, error)
- func (e *Engine) QueryRange(database, metric string, fromTS, toTS Timestamp, stride int, fn SampleCallback) error
- func (e *Engine) QueryRangeMany(database string, metrics []string, fromTS, toTS Timestamp, stride int, ...) error
- func (e *Engine) RecompactDataFile(database, part string) (DataFileRecompactReport, error)
- func (e *Engine) RecoverDataFile(database, part, outputPath string) (DataFileRecoverReport, error)
- func (e *Engine) SetAutoRollupTrigger(enabled bool)
- func (e *Engine) TriggerRollupsForSource(sourceDBName string)
- func (e *Engine) TriggerRollupsForSources(sourceDBNames []string)
- type EngineConfig
- type EngineConfigDefaults
- type EngineConfigDurability
- type EngineConfigEngine
- type EngineConfigLogger
- type EngineConfigLogging
- type EngineConfigManifestDefaults
- type EngineConfigMetrics
- type EngineConfigStats
- type EngineConfigWAL
- type MetricEntry
- type MetricFileFrameInfo
- type MetricFilePage
- type MetricFilePageInfoV1
- type MetricFilePageInput
- type MetricFileSummary
- type MetricID
- type MetricInfo
- type MetricRollupDownstream
- type OpenPageStats
- type Page
- func (p *Page) AddSample(metricID MetricID, ts Timestamp, value []byte) error
- func (p *Page) DecodeCompressedFrame(h PageHeader, compressed []byte, expectedCRC uint32) error
- func (p *Page) DecodeFrom(r *bytes.Reader) error
- func (p *Page) EncodeInto(bb *bytes.Buffer) error
- func (p *Page) EndTime() Timestamp
- func (p *Page) IsFull() bool
- func (p *Page) SetWalSegmentID(segmentID uint16)
- func (p *Page) StartTime() Timestamp
- type PageHeader
- type RollupBackfillReport
- type Sample
- type SampleCallback
- type SampleType
- type Timestamp
- type WAL
- func (w *WAL) Close() error
- func (w *WAL) DeleteSegment(segmentID uint16) error
- func (w *WAL) Flush() error
- func (w *WAL) Fsync() error
- func (w *WAL) Records() ([]WALRecord, error)
- func (w *WAL) RecordsWithCatalog(cat *Catalog) ([]WALRecord, error)
- func (w *WAL) Reset() error
- func (w *WAL) RotateSegment() (uint16, error)
- func (w *WAL) SegmentFull() bool
- func (w *WAL) ShouldFsyncAfterAppend() bool
- func (w *WAL) Stats() WALStats
- type WALFileStats
- type WALFlushEvent
- type WALRecord
- type WALRecordCallback
- type WALStats
- type WALWalkRecord
Constants ¶
const ( CompressionCodecS2ID uint16 = 1 CompressionCodecS2BetterID uint16 = 2 CompressionCodecZstdFastestID uint16 = 3 CompressionCodecZstdDefaultID uint16 = 4 CompressionCodecS2Name = "s2" CompressionCodecS2BetterName = "s2_better" CompressionCodecZstdFastestName = "zstd_fastest" CompressionCodecZstdDefaultName = "zstd_default" )
const ( Int32Sample byte = 1 Float32Sample byte = 2 )
const ( RetentionActionKeep = "keep" RetentionActionDelete = "delete" RetentionActionArchive = "archive" )
const ( DurabilityProfileStrict = "strict" DurabilityProfileBalanced = "balanced" DurabilityProfileThroughput = "throughput" )
const ( LogLevelInfo = "info" LogLevelDebug = "debug" LogLevelTrace = "trace" )
const ( MetricPartitionDay uint8 = iota + 1 MetricPartitionMonth MetricPartitionYear MetricPartitionForever )
const ( MetricRawIngestActionKeep = "keep" MetricRawIngestActionDelete = "delete" MetricRawIngestActionRename = "rename" )
const ( PageMaxRecords = 12000 PageMaxBytes = 256 * 1024 // 256 KB estimated uncompressed payload PageMaxAge = 10 * time.Second // max age before forced flush HeaderSize = 18 // StartTime(8) + EndTime(8) + NumRecords(2) )
const ( WALFsyncPolicySegment = "segment" WALFsyncPolicyAlways = "always" )
const MaxMetricsPerDatabase = 65535
MaxMetricsPerDatabase is the hard limit on distinct metrics per database (uint16 address space).
const TraceSlogLevel slog.Level = slog.LevelDebug - 4
Variables ¶
var ErrDataFileActive = errors.New("data file partition is currently open")
var ErrOutOfOrderTimestamp = fmt.Errorf("timestamp must be non-decreasing")
var ErrTooManyMetrics = fmt.Errorf("too many metrics in database")
var ErrWALMissingBaseline = errors.New("wal compact record missing initial baseline")
Functions ¶
func Aggregators ¶ added in v1.4.0
func Aggregators() map[string]Aggregator
func AppendSample ¶
AppendSample writes a typed sample to the current WAL segment. Satisfies LAW 1: the sample is written to the WAL before any memory mutation. Returns the current segment ID for tracking page origin (LAW 6).
func AppendSampleWithMetricName ¶
func AppendSampleWithMetricName[T SampleType](w *WAL, metricID MetricID, metricName string, ts Timestamp, value T) (uint16, error)
AppendSampleWithMetricName writes a typed sample and embeds the metric name. This is used on first-seen metrics so catalog state can be rebuilt from WAL.
func DefaultStepAggregate ¶ added in v1.4.0
func DefaultStepAggregate() string
func FormatTimestamp ¶
FormatTimestamp converts a nanosecond Unix timestamp to human-readable UTC format. Format: YYYY-MM-DD HH:MM:SS.nnnnnnnnn
func LoadEngineConfig ¶ added in v1.2.0
func NewLogger ¶ added in v1.2.0
func NewLogger(cfg EngineConfigLogging) (*slog.Logger, func() error, error)
func ReadMetricFileVersion ¶ added in v1.3.0
func ScanDataFileHeaders ¶
func ScanDataFileHeaders(path string) (DataFileStats, []DataFrameHeader, error)
ScanDataFileHeaders walks page headers in a .dat file and returns per-frame headers plus aggregate stats. It does not decode payload pages.
func ScanWALFile ¶
func ScanWALFile(path string) (WALFileStats, []WALWalkRecord, error)
ScanWALFile walks WAL records from the beginning and stops at the first invalid or truncated tail. It returns all successfully decoded records before the stop point.
func SupportedAggregates ¶ added in v1.4.0
func SupportedAggregates() []string
func WalkMetricFileFrames ¶ added in v1.3.0
func WalkMetricFileFrames(path string, fn func(MetricFileFrameInfo) error) error
func WalkMetricFilePageInfosV1 ¶ added in v1.3.0
func WalkMetricFilePageInfosV1(path string, fn func(MetricFilePageInfoV1) error) error
func WalkMetricFileV1 ¶ added in v1.3.0
func WalkMetricFileV1(path string, fn func(MetricFilePage) error) error
func WriteMetricFileV1 ¶ added in v1.3.0
func WriteMetricFileV1(path string, partitionKind uint8, codec BlockCompressionCodec, pages []MetricFilePageInput) error
func WriteMetricFileV2 ¶ added in v1.3.0
func WriteMetricFileV2(path string, partitionKind uint8, codec BlockCompressionCodec, pages []MetricFilePageInput) error
Types ¶
type AggregateBucket ¶ added in v1.4.0
type AggregateBucketCallback ¶ added in v1.4.0
type AggregateBucketCallback func(AggregateBucket) error
type Aggregator ¶ added in v1.4.0
type Aggregator interface {
Name() string
Compute(periodStart, periodEnd Timestamp, points []float32) (float32, error)
}
Aggregator computes one aggregate value for a time window. points are source metric values collected for that window.
type BlockCompressionCodec ¶ added in v1.3.0
type BlockCompressionCodec interface {
ID() uint16
Name() string
Encode(src []byte) ([]byte, error)
Decode(src []byte) ([]byte, error)
}
func BlockCompressionCodecByID ¶ added in v1.3.0
func BlockCompressionCodecByID(id uint16) (BlockCompressionCodec, error)
func BlockCompressionCodecByName ¶ added in v1.3.0
func BlockCompressionCodecByName(name string) (BlockCompressionCodec, error)
func DefaultMetricFileCompressionCodec ¶ added in v1.3.0
func DefaultMetricFileCompressionCodec() BlockCompressionCodec
type Catalog ¶
type Catalog struct {
// contains filtered or unexported fields
}
Catalog is the in-memory registry of metric names, IDs, and value types for one database. It is persisted as catalog.json next to the database data files.
func LoadCatalog ¶
func (*Catalog) EnsureMetricEntry ¶
func (*Catalog) GetMetricByID ¶
func (c *Catalog) GetMetricByID(mid MetricID) (string, MetricEntry, bool)
func (*Catalog) GetMetricEntry ¶
func (c *Catalog) GetMetricEntry(name string) (MetricEntry, bool)
func (*Catalog) GetMetricType ¶
GetMetricType returns the value type (Int32Sample or Float32Sample) for a given metric.
func (*Catalog) GetValueWidth ¶
GetValueWidth returns the number of bytes per value for this metric. Currently all metrics are 4 bytes (int32 or float32).
func (*Catalog) ListMetrics ¶
func (c *Catalog) ListMetrics() []MetricInfo
ListMetrics returns a stable, name-sorted snapshot of metrics in this catalog.
func (*Catalog) UpdateLastByMetricID ¶
func (*Catalog) WriteCatalog ¶
type DBInfo ¶
type DBInfo struct {
Grace string `json:"grace" toml:"grace"`
RetentionDays int `json:"retention_days" toml:"retention_days"`
RetentionAction string `json:"retention_action" toml:"retention_action"`
MaxActiveDays int `json:"max_active_days" toml:"max_active_days"`
Partition string `json:"partition" toml:"partition"`
WALEnabled bool `json:"wal_enabled" toml:"wal_enabled"`
WALSkipBefore string `json:"wal_skip_before" toml:"wal_skip_before"`
PageMaxRecords int `json:"page_max_records" toml:"page_max_records"`
PageMaxBytes int `json:"page_max_bytes" toml:"page_max_bytes"`
PageMaxAge string `json:"page_max_age" toml:"page_max_age"`
Rollups DBManifestRollups `json:"rollups" toml:"rollups"`
}
type DBManifestPage ¶
type DBManifestRetention ¶
type DBManifestRollupJob ¶
type DBManifestRollupJob struct {
ID string `toml:"id"`
SourceMetric string `toml:"source_metric"`
SourcePattern string `toml:"source_pattern"`
ExcludePatterns []string `toml:"exclude_patterns"`
Interval string `toml:"interval"`
Aggregates []string `toml:"aggregates"`
DestinationDB string `toml:"destination_db"`
DestinationMetricPrefix string `toml:"destination_metric_prefix"`
Grace string `toml:"grace"`
}
type DBManifestRollups ¶
type DBManifestRollups struct {
Enabled bool `toml:"enabled"`
CheckpointFile string `toml:"checkpoint_file"`
DefaultGrace string `toml:"default_grace"`
DefaultInterval string `toml:"default_interval"`
DefaultDestinationDB string `toml:"default_destination_db"`
DefaultAggregates []string `toml:"default_aggregates"`
GlobalExcludePatterns []string `toml:"global_exclude_patterns"`
Jobs []DBManifestRollupJob `toml:"jobs"`
}
type DBManifestTOML ¶
type DBManifestTOML struct {
Retention DBManifestRetention `toml:"retention"`
WAL DBManifestWAL `toml:"wal"`
Page DBManifestPage `toml:"page"`
Rollups DBManifestRollups `toml:"rollups"`
}
type DBManifestWAL ¶
type DBRuntimeInspect ¶ added in v1.3.0
type DBRuntimeInspect struct {
Database string `json:"database"`
MetricCount int `json:"metric_count"`
Manifest DBInfo `json:"manifest"`
Stats DBStats `json:"stats"`
OpenPages []OpenPageStats `json:"open_pages"`
}
type DBStats ¶
type DBStats struct {
DataFile DataRuntimeStats
WAL WALStats
}
type DataFileMetricCompactReport ¶ added in v1.4.0
type DataFileMetricCompactReport struct {
Database string `json:"database"`
Part string `json:"part"`
DataPath string `json:"data_path"`
MetricPath string `json:"metric_path"`
DataBytes int64 `json:"data_bytes"`
MetricBytes int64 `json:"metric_bytes"`
SavedBytes int64 `json:"saved_bytes"`
DurationMS int64 `json:"duration_ms"`
}
type DataFileRecompactReport ¶ added in v1.3.0
type DataFileRecompactReport struct {
Database string `json:"database"`
Part string `json:"part"`
Path string `json:"path"`
OldFrames int `json:"old_frames"`
NewFrames int `json:"new_frames"`
OldRecords int64 `json:"old_records"`
NewRecords int64 `json:"new_records"`
OldBytes int64 `json:"old_bytes"`
NewBytes int64 `json:"new_bytes"`
DurationMS int64 `json:"duration_ms"`
}
type DataFileRecoverReport ¶ added in v1.3.0
type DataFileRecoverReport struct {
Database string `json:"database"`
Part string `json:"part"`
SourcePath string `json:"source_path"`
OutputPath string `json:"output_path"`
SourceBytes int64 `json:"source_bytes"`
OutputBytes int64 `json:"output_bytes"`
RecoveredFrames int `json:"recovered_frames"`
RecoveredRecords int64 `json:"recovered_records"`
SkippedBytes int64 `json:"skipped_bytes"`
RejectedFrames int `json:"rejected_frames"`
FirstAcceptedOffset int64 `json:"first_accepted_offset"`
LastAcceptedOffset int64 `json:"last_accepted_offset"`
DurationMS int64 `json:"duration_ms"`
}
type DataFileStats ¶
type DataFileStats struct {
Path string
FileBytes int64
Frames int
TotalRecords int64
TotalCompressed int64
TotalFrameBytes int64
MinStart Timestamp
MaxEnd Timestamp
}
DataFileStats summarizes one .dat file.
func ScanDataFileStats ¶ added in v1.3.0
func ScanDataFileStats(path string) (DataFileStats, error)
ScanDataFileStats reads only frame headers and compressed lengths for a .dat file. It does not decompress payloads or populate per-frame page details.
func WalkDataFileHeaders ¶
func WalkDataFileHeaders(path string, fn DataFrameCallback) (DataFileStats, error)
WalkDataFileHeaders walks page headers in a .dat file and invokes fn for each decoded frame header. It does not decode payload pages.
type DataFrameCallback ¶
type DataFrameCallback func(DataFrameHeader) error
type DataFrameHeader ¶
type DataFrameHeader struct {
Index int
Offset int64
StartTime Timestamp
EndTime Timestamp
NumRecords uint16
CompressedLen uint64
UncompressedLen uint64
FrameBytes int64
}
DataFrameHeader describes one page frame parsed from a .dat file without decompression.
type DataRuntimeStats ¶
type DataRuntimeStats struct {
FlushCount int64
TotalFlushBytes int64
TotalFlushRecords int64
TotalFlushCompressed int64
MinFlushBytes int64
MaxFlushBytes int64
MinFlushRecords int64
MaxFlushRecords int64
MinFlushCompressed int64
MaxFlushCompressed int64
FlushDurationTotal time.Duration
MinFlushDuration time.Duration
MaxFlushDuration time.Duration
SyncCount int64
SyncDurationTotal time.Duration
MinSyncDuration time.Duration
MaxSyncDuration time.Duration
}
type Database ¶
Database represents one named time-series database on disk. Each database has its own WAL, catalog, and set of daily data files. Use Engine.AddLine / Engine.QueryRange for typical access; Database is exposed for low-level tooling (nanocli inspect, export, etc.).
func NewDatabase ¶
NewDatabase creates a new database at the given base path with the specified WAL segment size.
func OpenDatabase ¶
OpenDatabase opens an existing database by base path (no WAL size limit). Used by read-only tooling that does not need to append samples.
type Engine ¶
type Engine struct {
RootDataDir string
WALMaxSegSize int64
WALFsyncPolicy string
Durability string
PreferMetricFiles bool
AutoCreateMetricFiles bool
MetricFileCompression string
MetricRawIngestAction string
MetricTimeCacheSlots int
Logging EngineConfigLogging
SyncDataFile bool
SyncCatalog bool
StatsEnabled bool
StatsInterval time.Duration
// contains filtered or unexported fields
}
Engine is the top-level coordinator for all databases in a root data directory. It is safe for concurrent use. Open with OpenEngine; always call Close when done.
func OpenEngine ¶
OpenEngine opens or creates the engine rooted at rootDataDir. If engine.toml does not exist it is written from the embedded defaults. walMaxSegSize sets the per-database WAL segment size; pass 0 for the 64 MiB default.
func OpenEngineWithConfig ¶ added in v1.2.0
func (*Engine) AddLine ¶
AddLine ingests one sample in line-protocol format: "DB/metric value [ts]" where value is an integer or float literal and ts is optional. ts can be Unix nanoseconds or a human-readable timestamp accepted by ParseTimestamp. AddLine is safe for concurrent use.
func (*Engine) AddSample ¶
AddSample ingests one typed sample directly. This is the canonical ingest API used by all write paths.
func (*Engine) BackfillRollups ¶ added in v1.1.0
func (e *Engine) BackfillRollups(sourceDBNames []string) (RollupBackfillReport, error)
func (*Engine) BuildMetricFile ¶ added in v1.3.0
BuildMetricFile creates metric-<partition>.dat for one database using the default metric file format. V2 is the current default format.
func (*Engine) BuildMetricFileV1 ¶ added in v1.3.0
BuildMetricFileV1 creates metric-<partition>.dat from data-<partition>.dat for one database. It does not delete or modify the source data file.
func (*Engine) BuildMetricFileV2 ¶ added in v1.3.0
BuildMetricFileV2 creates metric-<partition>.dat from data-<partition>.dat for one database. It does not delete or modify the source data file until the configured raw-ingest action is applied after a successful write.
func (*Engine) Close ¶
Close flushes all open day-pages, resets WAL files, emits a final stats snapshot, and closes every open database. Always call Close before the process exits.
func (*Engine) CompactDataFileToMetricV2 ¶ added in v1.4.0
func (e *Engine) CompactDataFileToMetricV2(database, part string) (DataFileMetricCompactReport, error)
func (*Engine) CompareDataAndMetricPartition ¶ added in v1.3.0
CompareDataAndMetricPartition validates a metric file against its raw data partition, dispatching to the appropriate checker for the on-disk metric file version.
func (*Engine) CompareDataAndMetricPartitionV1 ¶ added in v1.3.0
CompareDataAndMetricPartitionV1 validates that data-<partition>.dat and metric-<partition>.dat contain exactly the same per-metric sample stream.
func (*Engine) CompareDataAndMetricPartitionV2 ¶ added in v1.3.0
CompareDataAndMetricPartitionV2 validates that data-<partition>.dat and a v2 metric-<partition>.dat contain exactly the same per-metric sample stream.
func (*Engine) DBStats ¶
DBStats returns a snapshot of engine-level stats for the given database. Values for data flushes come from the engine stat store; WAL stats are read directly from the WAL so they are always current.
func (*Engine) ExportFile ¶
ExportFile exports one database to an LP file using: DB/metric value ts. Exported timestamps use FormatTimestamp (UTC, YYYY-MM-DD HH:MM:SS.nnnnnnnnn).
func (*Engine) ExportToWriter ¶
ExportToWriter exports one database to an arbitrary writer using line protocol. Timestamps are written with FormatTimestamp (UTC, YYYY-MM-DD HH:MM:SS.nnnnnnnnn).
func (*Engine) GetAllDatabaseNames ¶
GetAllDatabaseNames returns all database names managed by this engine.
func (*Engine) GetMetricRollupDownstream ¶ added in v1.3.0
func (e *Engine) GetMetricRollupDownstream(database, metric string, maxHops int) ([]MetricRollupDownstream, bool, error)
GetMetricRollupDownstream returns bounded downstream rollup lineage for one metric. Lineage is derived from configured rollup jobs in loaded database manifests.
func (*Engine) ImportFile ¶
ImportFile imports LP lines in the format: DB/metric value [ts]. ts can be Unix nanoseconds or a human-readable value accepted by ParseTimestamp.
func (*Engine) InspectDBRuntime ¶ added in v1.3.0
func (e *Engine) InspectDBRuntime(database string) (DBRuntimeInspect, bool)
func (*Engine) InspectDBWAL ¶ added in v1.3.0
func (*Engine) IsDatabaseActive ¶ added in v1.3.0
IsDatabaseActive reports whether a database is currently loaded in memory.
func (*Engine) ListMetrics ¶
func (e *Engine) ListMetrics(database string) ([]MetricInfo, error)
ListMetrics returns all known metrics for a database in stable name order.
func (*Engine) QueryAggregateRange ¶ added in v1.4.0
func (*Engine) QueryLast ¶
QueryLast returns the most recently written sample for a metric. Returns (sample, true, nil) if a sample exists, (zero, false, nil) if not.
func (*Engine) QueryRange ¶
func (e *Engine) QueryRange(database, metric string, fromTS, toTS Timestamp, stride int, fn SampleCallback) error
QueryRange scans samples for a metric within a time range. Stride controls downsampling: stride=1 returns every sample, stride=N returns every Nth. Each matching sample is passed to the callback; callback errors terminate early.
func (*Engine) QueryRangeMany ¶ added in v1.4.0
func (e *Engine) QueryRangeMany(database string, metrics []string, fromTS, toTS Timestamp, stride int, fn SampleCallback) error
QueryRangeMany scans samples for multiple metrics within a time range. It reuses each persisted partition scan across all requested metrics.
func (*Engine) RecompactDataFile ¶ added in v1.3.0
func (e *Engine) RecompactDataFile(database, part string) (DataFileRecompactReport, error)
func (*Engine) RecoverDataFile ¶ added in v1.3.0
func (e *Engine) RecoverDataFile(database, part, outputPath string) (DataFileRecoverReport, error)
func (*Engine) SetAutoRollupTrigger ¶
SetAutoRollupTrigger controls whether ingest-time flushes automatically trigger rollup computation for the source database.
func (*Engine) TriggerRollupsForSource ¶
TriggerRollupsForSource computes rollups for one source database using jobs configured in the source database manifest.
func (*Engine) TriggerRollupsForSources ¶
TriggerRollupsForSources computes rollups for each unique source database name.
type EngineConfig ¶
type EngineConfig struct {
Engine EngineConfigEngine `toml:"engine"`
WAL EngineConfigWAL `toml:"wal"`
Durability EngineConfigDurability `toml:"durability"`
Metrics EngineConfigMetrics `toml:"metrics"`
Logging EngineConfigLogging `toml:"logging"`
Stats EngineConfigStats `toml:"stats"`
Defaults EngineConfigDefaults `toml:"defaults"`
ManifestDefaults EngineConfigManifestDefaults `toml:"manifest_defaults"`
}
type EngineConfigDefaults ¶
type EngineConfigDefaults struct {
Databases []string `toml:"databases"`
}
type EngineConfigDurability ¶
type EngineConfigDurability struct {
Profile string `toml:"profile"`
}
type EngineConfigEngine ¶
type EngineConfigEngine struct {
Listen string `toml:"listen"`
}
type EngineConfigLogger ¶ added in v1.2.0
type EngineConfigLogging ¶ added in v1.2.0
type EngineConfigLogging struct {
Loggers []EngineConfigLogger `toml:"logger"`
}
type EngineConfigManifestDefaults ¶
type EngineConfigManifestDefaults struct {
Retention DBManifestRetention `toml:"retention"`
WAL DBManifestWAL `toml:"wal"`
Page DBManifestPage `toml:"page"`
Rollups DBManifestRollups `toml:"rollups"`
}
type EngineConfigMetrics ¶ added in v1.3.0
type EngineConfigStats ¶
type EngineConfigWAL ¶
type MetricEntry ¶
type MetricEntry struct {
MetricID MetricID
ValueType byte
// In-memory only cache for QueryLast. Never persisted to catalog JSON.
LastTS Timestamp
LastRaw [4]byte
LastValid bool
}
MetricEntry holds the catalog record for one metric: its assigned ID, value type, and an in-memory cache of the last written sample (for QueryLast).
type MetricFileFrameInfo ¶ added in v1.3.0
type MetricFilePage ¶ added in v1.3.0
type MetricFilePage struct {
MetricID MetricID
CodecID uint16
ValueType byte
Times []Timestamp
Int32 []int32
Float32 []float32
PageOffset uint64
PointCount uint32
PayloadLen uint32
UncompressedLen uint32
MetricMinTS Timestamp
MetricMaxTS Timestamp
}
func ReadMetricFileV1 ¶ added in v1.3.0
func ReadMetricFileV1(path string) ([]MetricFilePage, error)
type MetricFilePageInfoV1 ¶ added in v1.3.0
type MetricFilePageInfoV1 struct {
Index int
MetricID MetricID
ValueType byte
PageOffset uint64
MetricMinTS Timestamp
MetricMaxTS Timestamp
PointCount uint32
UncompressedLen uint32
PayloadLen uint32
}
func ReadMetricFilePageInfosV1 ¶ added in v1.3.0
func ReadMetricFilePageInfosV1(path string) ([]MetricFilePageInfoV1, error)
type MetricFilePageInput ¶ added in v1.3.0
type MetricFileSummary ¶ added in v1.3.0
type MetricFileSummary struct {
Version uint16
TimeFrameCount int
MetricFrames []MetricFileFrameInfo
}
func ReadMetricFileSummary ¶ added in v1.3.0
func ReadMetricFileSummary(path string) (MetricFileSummary, error)
type MetricID ¶
type MetricID uint16
MetricID is a compact uint16 identifier assigned to each metric within a database.
func GetMetricID ¶
func GetMetricID[T SampleType](c *Catalog, name string) (MetricID, error)
GetMetricID returns the MetricID for a named metric, registering it if it has not been seen before. The type parameter T determines whether an Int32Sample or Float32Sample entry is created. Returns an error if the metric already exists with a different value type, or if the per-database metric limit (65535) has been reached.
type MetricInfo ¶
type MetricRollupDownstream ¶ added in v1.3.0
type OpenPageStats ¶ added in v1.3.0
type OpenPageStats struct {
Day string `json:"day"`
Records int `json:"records"`
MetricSlots int `json:"metric_slots"`
UniqueMetric int `json:"unique_metrics"`
ValueBytes int `json:"value_bytes"`
StartTS Timestamp `json:"start_timestamp_ns"`
EndTS Timestamp `json:"end_timestamp_ns"`
MaxRecords int `json:"max_records"`
MaxBytes int `json:"max_bytes"`
MaxAge time.Duration `json:"max_age_ns"`
Age time.Duration `json:"age_ns"`
WALSegmentID uint16 `json:"wal_segment_id"`
Full bool `json:"full"`
Persisted bool `json:"persisted"`
}
type Page ¶
type Page struct {
Start Timestamp
End Timestamp
Metrics []MetricID
Times []Timestamp
Values *bytes.Buffer // raw value bytes buffer; catalog knows per-metric width/encoding
MaxRecords int
MaxBytes int
MaxAge time.Duration
WALSegmentID uint16
// contains filtered or unexported fields
}
Page holds interleaved samples from multiple metrics. Value bytes are stored raw in a pre-allocated buffer; the catalog is responsible for interpreting them.
func NewPageWithLimits ¶
func (*Page) AddSample ¶
AddSample appends a raw value for the given metric and timestamp. value must be exactly the bytes the catalog will use to decode this metric. Timestamps must be non-decreasing across all metrics in this page.
func (*Page) DecodeCompressedFrame ¶ added in v1.3.0
func (p *Page) DecodeCompressedFrame(h PageHeader, compressed []byte, expectedCRC uint32) error
func (*Page) SetWalSegmentID ¶
type PageHeader ¶
func (*PageHeader) EncodeInto ¶
func (h *PageHeader) EncodeInto(w *bytes.Buffer) (err error)
type RollupBackfillReport ¶ added in v1.1.0
type RollupBackfillReport struct {
RequestedSources []string `json:"requested_sources"`
SourceDatabases []string `json:"source_databases"`
DestinationDatabases []string `json:"destination_databases"`
ClearedCheckpointFiles []string `json:"cleared_checkpoint_files"`
ClearedDataFiles []string `json:"cleared_data_files"`
ClearedWALFiles []string `json:"cleared_wal_files"`
ClearedCatalogFiles []string `json:"cleared_catalog_files"`
ReplayPasses int `json:"replay_passes"`
}
type Sample ¶
type Sample struct {
Database string
Metric string
TS Timestamp
ValueType byte
Int32 int32
Float32 float32
}
Sample is one decoded data point returned by QueryRange or QueryLast.
type SampleCallback ¶
SampleCallback is invoked for each sample in a range query.
type SampleType ¶
SampleType constrains the numeric types supported as metric values.
type Timestamp ¶
type Timestamp int64
Timestamp is a Unix nanosecond epoch value (int64). All sample timestamps must be monotonically non-decreasing within a metric.
func ParseTimestamp ¶
ParseTimestamp converts a human-readable timestamp or nanosecond value to Timestamp. Accepts formats:
- Raw nanosecond integer: "1234567890123456789"
- Date only: "2026-05-14" (time set to 00:00:00 UTC)
- Date and time: "2026-05-14 12:34:56" (nanoseconds set to 0)
- Date, time, and nanos: "2026-05-14 12:34:56.123456789"
type WAL ¶
type WAL struct {
// contains filtered or unexported fields
}
WAL manages a single reusable write-ahead log file.
func (*WAL) DeleteSegment ¶
DeleteSegment is retained for API compatibility.
func (*WAL) Flush ¶
Flush writes buffered records to disk without rotating. It does not fsync (records remain in the OS page cache).
func (*WAL) Fsync ¶
Fsync calls fsync on the current segment. Called on page flush and segment rotation (Durability & fsync Policy).
func (*WAL) RecordsWithCatalog ¶
RecordsWithCatalog returns all WAL records with optional catalog for ValueType lookups.
func (*WAL) RotateSegment ¶
RotateSegment is retained for API compatibility. WAL v1 uses a single file; rotate maps to reset-and-reuse.
func (*WAL) SegmentFull ¶
SegmentFull checks whether the current segment should rotate. Returns true if bufferSize >= maxSegSize.
func (*WAL) ShouldFsyncAfterAppend ¶
type WALFileStats ¶
type WALFileStats struct {
Path string
FileBytes int64
Records int
DecodedBytes int64
MinTS Timestamp
MaxTS Timestamp
HasTail bool
TailBytes int64
StopOffset int64
StopReason string
}
WALFileStats summarizes a WAL file walk.
func WalkWALFile ¶
func WalkWALFile(path string, fn WALRecordCallback) (WALFileStats, error)
WalkWALFile walks WAL records and invokes fn for each successfully decoded record. It stops at the first invalid or truncated tail and returns tail metadata in stats.
type WALFlushEvent ¶
type WALRecord ¶
type WALRecord struct {
SegmentID uint16 // ID of the segment this record came from (LAW 6 origin, wal-NNNN.log format)
MetricID MetricID
MetricName string
Timestamp Timestamp
ValueType byte // Int32Sample or Float32Sample
Value interface{} // int32 or float32
}
WALRecord represents a single entry in the WAL. Used for crash recovery (LAW 9 — deterministic replay).
type WALRecordCallback ¶
type WALRecordCallback func(WALWalkRecord) error
type WALStats ¶
type WALStats struct {
AppendCount int64
AppendBytes int64
BufferBytes int64
FsyncCount int64
FsyncDurationTotal time.Duration
MinFsyncDuration time.Duration
MaxFsyncDuration time.Duration
FlushCount int64
FlushedBytes int64
MinFlushBytes int64
MaxFlushBytes int64
ResetDurationTotal time.Duration
MinResetDuration time.Duration
MaxResetDuration time.Duration
LastAppendAt time.Time
LastFsyncAt time.Time
LastFlushAt time.Time
FlushIntervalCount int64
FlushIntervalTotal time.Duration
MinFlushInterval time.Duration
MaxFlushInterval time.Duration
RecentFlushes []WALFlushEvent
}