Documentation
¶
Index ¶
- Constants
- Variables
- type Block
- func (b *Block) AddSeries(s *series.Series, samples []series.Sample) error
- func (b *Block) Delete() error
- func (b *Block) Dir() string
- func (b *Block) GetSeries(seriesHash uint64, minTime, maxTime int64) ([]series.Sample, error)
- func (b *Block) LoadChunk(chunkFile string) (*Chunk, error)
- func (b *Block) Overlaps(minTime, maxTime int64) bool
- func (b *Block) Persist(dataDir string) error
- func (b *Block) Size() int64
- func (b *Block) String() string
- type BlockMeta
- type BlockReader
- type BlockStats
- type BlockWriter
- type Chunk
- func (c *Chunk) Append(samples []series.Sample) error
- func (c *Chunk) CompressionRatio() float64
- func (c *Chunk) Iterator() (*ChunkIterator, error)
- func (c *Chunk) MarshalBinary() ([]byte, error)
- func (c *Chunk) ReadFrom(r io.Reader) (int64, error)
- func (c *Chunk) Size() int
- func (c *Chunk) UnmarshalBinary(data []byte) error
- func (c *Chunk) WriteTo(w io.Writer) (int64, error)
- type ChunkBuilder
- type ChunkIterator
- type CompactionLevel
- type CompactionStats
- type Compactor
- func (c *Compactor) BlockCount() (level0, level1, level2 int, err error)
- func (c *Compactor) CleanupOldBlocks(cutoffTime int64) (int, error)
- func (c *Compactor) CompactNow() error
- func (c *Compactor) GetStats() CompactionStats
- func (c *Compactor) Run() error
- func (c *Compactor) SetDataDir(dir string)
- func (c *Compactor) Stop() error
- func (c *Compactor) ValidateBlocks() error
- type CompactorOptions
- type MemTable
- func (m *MemTable) AllSeries() []uint64
- func (m *MemTable) Clear()
- func (m *MemTable) CreatedAt() time.Time
- func (m *MemTable) GetSeries(seriesHash uint64) (*series.Series, bool)
- func (m *MemTable) Insert(s *series.Series, samples []series.Sample) error
- func (m *MemTable) IsFull() bool
- func (m *MemTable) MaxSize() int64
- func (m *MemTable) Query(seriesHash uint64, start, end int64) ([]series.Sample, error)
- func (m *MemTable) SampleCount() int64
- func (m *MemTable) SeriesCount() int
- func (m *MemTable) Size() int64
- func (m *MemTable) Stats() string
- func (m *MemTable) TimeRange() (int64, int64)
- type Options
- type RetentionManager
- func (rm *RetentionManager) CalculateRetentionStats() (*RetentionStatsReport, error)
- func (rm *RetentionManager) CleanupNow() error
- func (rm *RetentionManager) Disable()
- func (rm *RetentionManager) Enable()
- func (rm *RetentionManager) GetPolicy() RetentionPolicy
- func (rm *RetentionManager) GetStats() RetentionStats
- func (rm *RetentionManager) IsEnabled() bool
- func (rm *RetentionManager) Run() error
- func (rm *RetentionManager) SetPolicy(policy RetentionPolicy)
- func (rm *RetentionManager) Stop() error
- type RetentionManagerOptions
- type RetentionPolicy
- type RetentionStats
- type RetentionStatsReport
- type Stats
- type StatsSnapshot
- type TSDB
- func (db *TSDB) Close() error
- func (db *TSDB) GetAllLabels() ([]string, error)
- func (db *TSDB) GetCompactionStats() *CompactionStats
- func (db *TSDB) GetLabelValues(labelName string) ([]string, error)
- func (db *TSDB) GetRetentionPolicy() *RetentionPolicy
- func (db *TSDB) GetRetentionStats() *RetentionStats
- func (db *TSDB) GetSeries(seriesHash uint64) (*series.Series, bool)
- func (db *TSDB) GetStats() Stats
- func (db *TSDB) GetStatsSnapshot() StatsSnapshot
- func (db *TSDB) Insert(s *series.Series, samples []series.Sample) error
- func (db *TSDB) MemTableStats() (active, flushing string)
- func (db *TSDB) Query(seriesHash uint64, start, end int64) ([]series.Sample, error)
- func (db *TSDB) SetRetentionPolicy(policy RetentionPolicy) error
- func (db *TSDB) TriggerCompaction() error
- func (db *TSDB) TriggerFlush() error
Constants ¶
const (
	// BlockVersion is the current block format version
	BlockVersion = 1

	// ChunksDir is the subdirectory for chunks
	ChunksDir = "chunks"

	// MetaFile is the metadata file name
	MetaFile = "meta.json"

	// IndexFile is the index file name (placeholder for Phase 4)
	IndexFile = "index"

	// DefaultBlockDuration is the default block time window (2 hours)
	DefaultBlockDuration = 2 * time.Hour
)
const (
	// ChunkHeaderSize is the size of the chunk header in bytes
	ChunkHeaderSize = 24

	// ChunkFooterSize is the size of the chunk footer (CRC32 checksum) in bytes
	ChunkFooterSize = 4

	// DefaultMaxSamplesPerChunk is the default maximum number of samples per chunk
	// 120 samples = 2 hours @ 1-minute intervals
	DefaultMaxSamplesPerChunk = 120

	// EncodingGorilla indicates Gorilla compression (delta-of-delta + XOR)
	EncodingGorilla uint16 = 1
)
const (
	// DefaultCompactionInterval is how often to run compaction
	DefaultCompactionInterval = 5 * time.Minute

	// Level0Duration is the duration of Level 0 blocks (2 hours)
	Level0Duration = 2 * time.Hour

	// Level1Duration is the duration of Level 1 blocks (12 hours)
	Level1Duration = 12 * time.Hour

	// Level2Duration is the duration of Level 2 blocks (7 days)
	Level2Duration = 7 * 24 * time.Hour

	// MinBlocksForCompaction is the minimum number of blocks to trigger compaction
	MinBlocksForCompaction = 3
)
const (
	// DefaultMaxSize is the default maximum size in bytes (256MB)
	DefaultMaxSize = 256 * 1024 * 1024

	// EstimatedBytesPerSample is an estimate of memory usage per sample
	EstimatedBytesPerSample = 24 // 8 bytes timestamp + 8 bytes value + ~8 bytes overhead
)
const (
	// DefaultRetentionCheckInterval is how often to check retention policy
	DefaultRetentionCheckInterval = 1 * time.Hour

	// DefaultRetentionPeriod is the default data retention period (30 days)
	DefaultRetentionPeriod = 30 * 24 * time.Hour
)
const (
	// DefaultFlushInterval is how often to check if MemTable should be flushed
	DefaultFlushInterval = 30 * time.Second

	// DefaultWALDir is the default directory name for WAL files
	DefaultWALDir = "wal"
)
Variables ¶
var (
	// ErrMemTableFull indicates the MemTable has reached its size limit
	ErrMemTableFull = errors.New("memtable is full")

	// ErrInvalidSample indicates the sample data is invalid
	ErrInvalidSample = errors.New("invalid sample")
)
var (
	// ErrClosed indicates the TSDB is closed
	ErrClosed = errors.New("tsdb: closed")

	// ErrReadOnly indicates the TSDB is in read-only mode
	ErrReadOnly = errors.New("tsdb: read-only mode")
)
Functions ¶
This section is empty.
Types ¶
type Block ¶
type Block struct {
// Metadata
ULID ulid.ULID // Unique, time-sortable identifier
MinTime int64 // Minimum timestamp in block
MaxTime int64 // Maximum timestamp in block
// Statistics
NumSamples int64 // Total number of samples
NumSeries int64 // Total number of unique series
NumChunks int64 // Total number of chunks
// contains filtered or unexported fields
}
Block represents a time-partitioned immutable block of time-series data. Blocks organize data into 2-hour windows (configurable) and use ULID for sortable, time-based identification.
Directory structure:
data/
├── 01H8XABC00000000/ # Block ULID (sortable by time)
│ ├── meta.json # Block metadata
│ ├── chunks/ # Compressed chunks directory
│ │ ├── 000001 # Chunk file for series 1
│ │ ├── 000002 # Chunk file for series 2
│ │ └── ...
│ └── index # Series index (future: inverted index)
└── 01H8XDEF00000000/
└── ...
type BlockMeta ¶
type BlockMeta struct {
ULID string `json:"ulid"`
MinTime int64 `json:"minTime"`
MaxTime int64 `json:"maxTime"`
Stats BlockStats `json:"stats"`
Version int `json:"version"`
Labels map[string]string `json:"labels,omitempty"`
SeriesChunks map[string]int `json:"seriesChunks"` // seriesHash -> chunkFile number
}
BlockMeta contains block metadata stored in meta.json
type BlockReader ¶
type BlockReader struct {
// contains filtered or unexported fields
}
BlockReader helps read blocks from disk
func NewBlockReader ¶
func NewBlockReader(dataDir string) *BlockReader
NewBlockReader creates a new block reader
func (*BlockReader) Blocks ¶
func (br *BlockReader) Blocks() []*Block
Blocks returns all loaded blocks
func (*BlockReader) LoadBlocks ¶
func (br *BlockReader) LoadBlocks() error
LoadBlocks loads all blocks from the data directory
type BlockStats ¶
type BlockStats struct {
NumSamples int64 `json:"numSamples"`
NumSeries int64 `json:"numSeries"`
NumChunks int64 `json:"numChunks"`
}
BlockStats contains block statistics
type BlockWriter ¶
type BlockWriter struct {
// contains filtered or unexported fields
}
BlockWriter helps write MemTable data to blocks
func NewBlockWriter ¶
func NewBlockWriter(dataDir string) *BlockWriter
NewBlockWriter creates a new block writer
func (*BlockWriter) WriteMemTable ¶
func (bw *BlockWriter) WriteMemTable(mt *MemTable) (*Block, error)
WriteMemTable writes a MemTable to disk as a block
type Chunk ¶
type Chunk struct {
MinTime int64 // Minimum timestamp in chunk
MaxTime int64 // Maximum timestamp in chunk
NumSamples uint16 // Number of samples in chunk
Encoding uint16 // Encoding flags (reserved for future use)
Data []byte // Compressed data (timestamps + values)
Checksum uint32 // CRC32 checksum of data
}
Chunk represents a compressed time-series chunk containing multiple samples. It uses delta-of-delta encoding for timestamps and XOR compression for values as described in Facebook's Gorilla paper.
Format:
Header (24 bytes):
	[8 bytes: minTime] [8 bytes: maxTime] [2 bytes: numSamples] [4 bytes: dataLength] [2 bytes: encoding flags]
Data:
	[N bytes: compressed timestamps] [M bytes: compressed values]
Footer:
	[4 bytes: CRC32 checksum]
func (*Chunk) Append ¶
Append compresses and appends samples to the chunk. This creates a new chunk with the provided samples.
func (*Chunk) CompressionRatio ¶
CompressionRatio returns the compression ratio (uncompressed / compressed)
func (*Chunk) Iterator ¶
func (c *Chunk) Iterator() (*ChunkIterator, error)
Iterator returns an iterator over the samples in the chunk
func (*Chunk) MarshalBinary ¶
MarshalBinary serializes the chunk to bytes
func (*Chunk) UnmarshalBinary ¶
UnmarshalBinary deserializes the chunk from bytes
type ChunkBuilder ¶
type ChunkBuilder struct {
// contains filtered or unexported fields
}
ChunkBuilder helps build chunks incrementally
func NewChunkBuilder ¶
func NewChunkBuilder(maxSamples int) *ChunkBuilder
NewChunkBuilder creates a new chunk builder
func (*ChunkBuilder) Add ¶
func (cb *ChunkBuilder) Add(sample series.Sample) bool
Add adds a sample to the builder
func (*ChunkBuilder) Build ¶
func (cb *ChunkBuilder) Build() (*Chunk, error)
Build creates a chunk from the accumulated samples
func (*ChunkBuilder) Count ¶
func (cb *ChunkBuilder) Count() int
Count returns the number of samples in the builder
func (*ChunkBuilder) IsFull ¶
func (cb *ChunkBuilder) IsFull() bool
IsFull returns true if the chunk is full
type ChunkIterator ¶
type ChunkIterator struct {
// contains filtered or unexported fields
}
ChunkIterator iterates over samples in a chunk
func (*ChunkIterator) At ¶
func (it *ChunkIterator) At() (series.Sample, error)
At returns the current sample
func (*ChunkIterator) Err ¶
func (it *ChunkIterator) Err() error
Err returns any error that occurred during iteration
func (*ChunkIterator) Next ¶
func (it *ChunkIterator) Next() bool
Next advances the iterator to the next sample
type CompactionLevel ¶
type CompactionLevel int
CompactionLevel represents the tiered compaction level
const (
	// Level0 represents raw 2-hour ingestion blocks
	Level0 CompactionLevel = 0

	// Level1 represents merged 12-hour blocks (6x L0 blocks)
	Level1 CompactionLevel = 1

	// Level2 represents merged 7-day blocks (14x L1 blocks)
	Level2 CompactionLevel = 2
)
type CompactionStats ¶
type CompactionStats struct {
TotalCompactions atomic.Int64
BlocksMerged atomic.Int64
BytesReclaimed atomic.Int64
LastCompactionTime atomic.Int64 // Unix milliseconds
CompactionErrors atomic.Int64
Level0Compactions atomic.Int64
Level1Compactions atomic.Int64
}
CompactionStats holds compaction metrics
type Compactor ¶
type Compactor struct {
// contains filtered or unexported fields
}
Compactor manages background compaction of time-series blocks. It implements a tiered compaction strategy similar to LSM trees:

  - Level 0: 2-hour blocks (raw ingestion)
  - Level 1: 12-hour blocks (merge 6x L0 blocks)
  - Level 2: 7-day blocks (merge 14x L1 blocks)
func NewCompactor ¶
func NewCompactor(opts *CompactorOptions) *Compactor
NewCompactor creates a new compactor instance
func (*Compactor) BlockCount ¶
BlockCount returns the number of blocks at each level
func (*Compactor) CleanupOldBlocks ¶
CleanupOldBlocks removes blocks older than the specified cutoff time This is used by the retention policy
func (*Compactor) CompactNow ¶
CompactNow triggers an immediate compaction (for testing/debugging)
func (*Compactor) GetStats ¶
func (c *Compactor) GetStats() CompactionStats
GetStats returns a snapshot of compaction statistics
func (*Compactor) SetDataDir ¶
SetDataDir updates the data directory (for testing)
func (*Compactor) ValidateBlocks ¶
ValidateBlocks checks all blocks for corruption
type CompactorOptions ¶
type CompactorOptions struct {
DataDir string
Interval time.Duration
Concurrency int // Number of concurrent compaction workers
}
CompactorOptions configures the compactor
func DefaultCompactorOptions ¶
func DefaultCompactorOptions(dataDir string) *CompactorOptions
DefaultCompactorOptions returns default compactor options
type MemTable ¶
type MemTable struct {
// contains filtered or unexported fields
}
MemTable is an in-memory buffer for time-series samples. It provides thread-safe operations for inserting and querying samples. When the MemTable reaches its size threshold, it should be flushed to disk.
func NewMemTable ¶
func NewMemTable() *MemTable
NewMemTable creates a new MemTable with the default maximum size.
func NewMemTableWithSize ¶
NewMemTableWithSize creates a new MemTable with a custom maximum size.
func (*MemTable) Clear ¶
func (m *MemTable) Clear()
Clear removes all data from the MemTable and resets its state. This is typically called after a successful flush to disk.
func (*MemTable) Insert ¶
Insert adds samples for a given series to the MemTable. Returns an error if the MemTable is full or if the input is invalid.
func (*MemTable) Query ¶
Query retrieves samples for a given series hash within a time range. Returns all samples if start and end are both 0.
func (*MemTable) SampleCount ¶
SampleCount returns the total number of samples in the MemTable.
func (*MemTable) SeriesCount ¶
SeriesCount returns the number of unique series in the MemTable.
type Options ¶
type Options struct {
DataDir string
FlushInterval time.Duration
WALOptions *wal.Options
MemTableSize int64
EnableCompaction bool
CompactionInterval time.Duration
EnableRetention bool
RetentionPeriod time.Duration
}
Options configures the TSDB
func DefaultOptions ¶
DefaultOptions returns default TSDB options
type RetentionManager ¶
type RetentionManager struct {
// contains filtered or unexported fields
}
RetentionManager manages data retention and garbage collection
func NewRetentionManager ¶
func NewRetentionManager(compactor *Compactor, opts *RetentionManagerOptions) *RetentionManager
NewRetentionManager creates a new retention manager
func (*RetentionManager) CalculateRetentionStats ¶
func (rm *RetentionManager) CalculateRetentionStats() (*RetentionStatsReport, error)
CalculateRetentionStats calculates statistics about data retention
func (*RetentionManager) CleanupNow ¶
func (rm *RetentionManager) CleanupNow() error
CleanupNow triggers an immediate cleanup (for testing/debugging)
func (*RetentionManager) Disable ¶
func (rm *RetentionManager) Disable()
Disable disables the retention policy
func (*RetentionManager) Enable ¶
func (rm *RetentionManager) Enable()
Enable enables the retention policy
func (*RetentionManager) GetPolicy ¶
func (rm *RetentionManager) GetPolicy() RetentionPolicy
GetPolicy returns the current retention policy
func (*RetentionManager) GetStats ¶
func (rm *RetentionManager) GetStats() RetentionStats
GetStats returns a snapshot of retention statistics
func (*RetentionManager) IsEnabled ¶
func (rm *RetentionManager) IsEnabled() bool
IsEnabled returns whether the retention policy is enabled
func (*RetentionManager) Run ¶
func (rm *RetentionManager) Run() error
Run starts the background retention enforcement loop
func (*RetentionManager) SetPolicy ¶
func (rm *RetentionManager) SetPolicy(policy RetentionPolicy)
SetPolicy updates the retention policy
func (*RetentionManager) Stop ¶
func (rm *RetentionManager) Stop() error
Stop stops the retention manager gracefully
type RetentionManagerOptions ¶
type RetentionManagerOptions struct {
Policy RetentionPolicy
Interval time.Duration
}
RetentionManagerOptions configures the retention manager
func DefaultRetentionManagerOptions ¶
func DefaultRetentionManagerOptions() *RetentionManagerOptions
DefaultRetentionManagerOptions returns default retention manager options
type RetentionPolicy ¶
type RetentionPolicy struct {
// MaxAge is the maximum age of data to keep
MaxAge time.Duration
// MinSamples is the minimum number of samples to keep per series
// If set, series with fewer samples won't be deleted even if old
MinSamples int64
// Enabled indicates if retention policy is active
Enabled bool
}
RetentionPolicy defines data retention rules
type RetentionStats ¶
type RetentionStats struct {
BlocksDeleted atomic.Int64
BytesReclaimed atomic.Int64
LastCleanupTime atomic.Int64 // Unix milliseconds
CleanupErrors atomic.Int64
TotalCleanups atomic.Int64
SeriesGarbageCollected atomic.Int64
}
RetentionStats holds retention metrics
type RetentionStatsReport ¶
type RetentionStatsReport struct {
TotalBlocks int
BlocksEligibleForDeletion int
TotalDataSize int64
ReclaimableSize int64
PolicyMaxAge time.Duration
CutoffTime int64
OldestBlockAge int64 // milliseconds
NewestBlockAge int64 // milliseconds
}
RetentionStatsReport provides a detailed retention statistics report
func (*RetentionStatsReport) String ¶
func (r *RetentionStatsReport) String() string
String returns a human-readable representation of the retention stats
type Stats ¶
type Stats struct {
TotalSamples atomic.Int64
TotalSeries atomic.Int64
FlushCount atomic.Int64
LastFlushTime atomic.Int64 // Unix milliseconds
WALSize atomic.Int64
ActiveMemTableSize atomic.Int64
}
Stats holds TSDB statistics
type StatsSnapshot ¶
type StatsSnapshot struct {
TotalSamples int64
TotalSeries int64
FlushCount int64
LastFlushTime int64
WALSize int64
ActiveMemTableSize int64
}
StatsSnapshot is a point-in-time snapshot of statistics
type TSDB ¶
type TSDB struct {
// contains filtered or unexported fields
}
TSDB is the main time-series database orchestrator. It coordinates WAL writes, MemTable operations, and background flushing.
func (*TSDB) GetAllLabels ¶
GetAllLabels returns all unique label names across all series (Phase 7)
func (*TSDB) GetCompactionStats ¶
func (db *TSDB) GetCompactionStats() *CompactionStats
GetCompactionStats returns compaction statistics (Phase 6)
func (*TSDB) GetLabelValues ¶
GetLabelValues returns all unique values for a specific label (Phase 7)
func (*TSDB) GetRetentionPolicy ¶
func (db *TSDB) GetRetentionPolicy() *RetentionPolicy
GetRetentionPolicy returns the current retention policy (Phase 6)
func (*TSDB) GetRetentionStats ¶
func (db *TSDB) GetRetentionStats() *RetentionStats
GetRetentionStats returns retention statistics (Phase 6)
func (*TSDB) GetStatsSnapshot ¶
func (db *TSDB) GetStatsSnapshot() StatsSnapshot
GetStatsSnapshot returns a simple snapshot of stats without atomic types
func (*TSDB) MemTableStats ¶
MemTableStats returns statistics about the current MemTables
func (*TSDB) SetRetentionPolicy ¶
func (db *TSDB) SetRetentionPolicy(policy RetentionPolicy) error
SetRetentionPolicy updates the retention policy (Phase 6)
func (*TSDB) TriggerCompaction ¶
TriggerCompaction manually triggers compaction (Phase 6)
func (*TSDB) TriggerFlush ¶
TriggerFlush manually triggers a flush operation