storage

package v0.0.0-...-5161f6c

Published: Nov 17, 2025 License: MIT Imports: 19 Imported by: 0

Documentation

Index

Constants

const (
	// BlockVersion is the current block format version
	BlockVersion = 1

	// ChunksDir is the subdirectory for chunks
	ChunksDir = "chunks"

	// MetaFile is the metadata file name
	MetaFile = "meta.json"

	// IndexFile is the index file name (placeholder for Phase 4)
	IndexFile = "index"

	// DefaultBlockDuration is the default block time window (2 hours)
	DefaultBlockDuration = 2 * time.Hour
)
const (
	// ChunkHeaderSize is the size of the chunk header in bytes
	ChunkHeaderSize = 24

	// ChunkFooterSize is the size of the chunk footer in bytes
	ChunkFooterSize = 4

	// DefaultMaxSamplesPerChunk is the default maximum number of samples per chunk
	// 120 samples = 2 hours @ 1-minute intervals
	DefaultMaxSamplesPerChunk = 120

	// EncodingGorilla indicates Gorilla compression (delta-of-delta + XOR)
	EncodingGorilla uint16 = 1
)
const (
	// DefaultCompactionInterval is how often to run compaction
	DefaultCompactionInterval = 5 * time.Minute

	// Level0Duration is the duration of Level 0 blocks (2 hours)
	Level0Duration = 2 * time.Hour

	// Level1Duration is the duration of Level 1 blocks (12 hours)
	Level1Duration = 12 * time.Hour

	// Level2Duration is the duration of Level 2 blocks (7 days)
	Level2Duration = 7 * 24 * time.Hour

	// MinBlocksForCompaction is the minimum number of blocks to trigger compaction
	MinBlocksForCompaction = 3
)
const (
	// DefaultMaxSize is the default maximum size in bytes (256MB)
	DefaultMaxSize = 256 * 1024 * 1024

	// EstimatedBytesPerSample is an estimate of memory usage per sample
	EstimatedBytesPerSample = 24 // 8 bytes timestamp + 8 bytes value + ~8 bytes overhead
)
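These two constants imply the default MemTable capacity: DefaultMaxSize / EstimatedBytesPerSample ≈ 11.18 million samples before IsFull reports true. A standalone check (the constants are mirrored locally so the snippet runs on its own):

package main

import "fmt"

func main() {
	// 256 MiB of budget at ~24 bytes per sample.
	const defaultMaxSize = 256 * 1024 * 1024
	const estimatedBytesPerSample = 24
	fmt.Println(defaultMaxSize / estimatedBytesPerSample) // 11184810
}
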
const (
	// DefaultRetentionCheckInterval is how often to check retention policy
	DefaultRetentionCheckInterval = 1 * time.Hour

	// DefaultRetentionPeriod is the default data retention period (30 days)
	DefaultRetentionPeriod = 30 * 24 * time.Hour
)
const (
	// DefaultFlushInterval is how often to check if MemTable should be flushed
	DefaultFlushInterval = 30 * time.Second

	// DefaultWALDir is the default directory name for WAL files
	DefaultWALDir = "wal"
)

Variables

var (
	// ErrMemTableFull indicates the MemTable has reached its size limit
	ErrMemTableFull = errors.New("memtable is full")

	// ErrInvalidSample indicates the sample data is invalid
	ErrInvalidSample = errors.New("invalid sample")
)
var (
	// ErrClosed indicates the TSDB is closed
	ErrClosed = errors.New("tsdb: closed")

	// ErrReadOnly indicates the TSDB is in read-only mode
	ErrReadOnly = errors.New("tsdb: read-only mode")
)

Functions

This section is empty.

Types

type Block

type Block struct {
	// Metadata
	ULID    ulid.ULID // Unique, time-sortable identifier
	MinTime int64     // Minimum timestamp in block
	MaxTime int64     // Maximum timestamp in block

	// Statistics
	NumSamples int64 // Total number of samples
	NumSeries  int64 // Total number of unique series
	NumChunks  int64 // Total number of chunks
	// contains filtered or unexported fields
}

Block represents a time-partitioned immutable block of time-series data. Blocks organize data into 2-hour windows (configurable) and use ULID for sortable, time-based identification.

Directory structure:

data/
├── 01H8XABC00000000/    # Block ULID (sortable by time)
│   ├── meta.json         # Block metadata
│   ├── chunks/           # Compressed chunks directory
│   │   ├── 000001        # Chunk file for series 1
│   │   ├── 000002        # Chunk file for series 2
│   │   └── ...
│   └── index             # Series index (future: inverted index)
└── 01H8XDEF00000000/
    └── ...

func NewBlock

func NewBlock(minTime, maxTime int64) (*Block, error)

NewBlock creates a new empty block

func OpenBlock

func OpenBlock(dir string) (*Block, error)

OpenBlock opens an existing block from disk

func (*Block) AddSeries

func (b *Block) AddSeries(s *series.Series, samples []series.Sample) error

AddSeries adds a series with its samples to the block

func (*Block) Delete

func (b *Block) Delete() error

Delete removes the block from disk

func (*Block) Dir

func (b *Block) Dir() string

Dir returns the block directory path

func (*Block) GetSeries

func (b *Block) GetSeries(seriesHash uint64, minTime, maxTime int64) ([]series.Sample, error)

GetSeries retrieves samples for a series within a time range

func (*Block) LoadChunk

func (b *Block) LoadChunk(chunkFile string) (*Chunk, error)

LoadChunk loads a specific chunk from a block

func (*Block) Overlaps

func (b *Block) Overlaps(minTime, maxTime int64) bool

Overlaps checks if the block overlaps with the given time range

func (*Block) Persist

func (b *Block) Persist(dataDir string) error

Persist writes the block to disk

func (*Block) Size

func (b *Block) Size() int64

Size returns the approximate size of the block in bytes

func (*Block) String

func (b *Block) String() string

String returns a human-readable representation of the block

type BlockMeta

type BlockMeta struct {
	ULID         string            `json:"ulid"`
	MinTime      int64             `json:"minTime"`
	MaxTime      int64             `json:"maxTime"`
	Stats        BlockStats        `json:"stats"`
	Version      int               `json:"version"`
	Labels       map[string]string `json:"labels,omitempty"`
	SeriesChunks map[string]int    `json:"seriesChunks"` // seriesHash -> chunkFile number
}

BlockMeta contains block metadata stored in meta.json

type BlockReader

type BlockReader struct {
	// contains filtered or unexported fields
}

BlockReader helps read blocks from disk

func NewBlockReader

func NewBlockReader(dataDir string) *BlockReader

NewBlockReader creates a new block reader

func (*BlockReader) Blocks

func (br *BlockReader) Blocks() []*Block

Blocks returns all loaded blocks

func (*BlockReader) LoadBlocks

func (br *BlockReader) LoadBlocks() error

LoadBlocks loads all blocks from the data directory

func (*BlockReader) Query

func (br *BlockReader) Query(seriesHash uint64, minTime, maxTime int64) ([]series.Sample, error)

Query retrieves samples for a series across all blocks
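A minimal sketch of cross-block reads (imports as in the block example above):

// queryBlocks loads every block under dataDir and fetches one series
// across all of them within [minT, maxT].
func queryBlocks(dataDir string, seriesHash uint64, minT, maxT int64) ([]series.Sample, error) {
	br := storage.NewBlockReader(dataDir)
	if err := br.LoadBlocks(); err != nil {
		return nil, err
	}
	fmt.Printf("loaded %d blocks\n", len(br.Blocks()))
	return br.Query(seriesHash, minT, maxT)
}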

type BlockStats

type BlockStats struct {
	NumSamples int64 `json:"numSamples"`
	NumSeries  int64 `json:"numSeries"`
	NumChunks  int64 `json:"numChunks"`
}

BlockStats contains block statistics

type BlockWriter

type BlockWriter struct {
	// contains filtered or unexported fields
}

BlockWriter helps write MemTable data to blocks

func NewBlockWriter

func NewBlockWriter(dataDir string) *BlockWriter

NewBlockWriter creates a new block writer

func (*BlockWriter) WriteMemTable

func (bw *BlockWriter) WriteMemTable(mt *MemTable) (*Block, error)

WriteMemTable writes a MemTable to disk as a block
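A minimal sketch of the flush path, with one labeled assumption: clearing the MemTable only after WriteMemTable returns is the intended ordering, which the docs above do not state explicitly.

// flushIfFull writes a full MemTable out as an immutable block, then
// clears it for reuse.
func flushIfFull(bw *storage.BlockWriter, mt *storage.MemTable) (*storage.Block, error) {
	if !mt.IsFull() {
		return nil, nil
	}
	b, err := bw.WriteMemTable(mt)
	if err != nil {
		return nil, err
	}
	mt.Clear() // assumption: safe once the block is durably on disk
	return b, nil
}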

type Chunk

type Chunk struct {
	MinTime    int64  // Minimum timestamp in chunk
	MaxTime    int64  // Maximum timestamp in chunk
	NumSamples uint16 // Number of samples in chunk
	Encoding   uint16 // Encoding flags (reserved for future use)
	Data       []byte // Compressed data (timestamps + values)
	Checksum   uint32 // CRC32 checksum of data
}

Chunk represents a compressed time-series chunk containing multiple samples. It uses delta-of-delta encoding for timestamps and XOR compression for values as described in Facebook's Gorilla paper.

Format:

Header (24 bytes):
  [8 bytes: minTime]
  [8 bytes: maxTime]
  [2 bytes: numSamples]
  [4 bytes: dataLength]
  [2 bytes: encoding flags]

Data:
  [N bytes: compressed timestamps]
  [M bytes: compressed values]

Footer:
  [4 bytes: CRC32 checksum]
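A minimal sketch of the round trip through this format (samples supplied by the caller; imports as above):

// roundTrip compresses samples into a chunk, marshals it to the
// header+data+footer layout above, and decodes it into a fresh chunk.
func roundTrip(samples []series.Sample) error {
	c := storage.NewChunk()
	if err := c.Append(samples); err != nil {
		return err
	}
	fmt.Printf("chunk: %d bytes, %.1fx compression\n", c.Size(), c.CompressionRatio())

	buf, err := c.MarshalBinary()
	if err != nil {
		return err
	}
	var decoded storage.Chunk
	// Presumably validates the CRC32 footer against the data on decode.
	return decoded.UnmarshalBinary(buf)
}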

func NewChunk

func NewChunk() *Chunk

NewChunk creates a new empty chunk

func (*Chunk) Append

func (c *Chunk) Append(samples []series.Sample) error

Append compresses the provided samples into the chunk. Note that it populates the chunk from the given samples in one shot rather than appending to previously stored data.

func (*Chunk) CompressionRatio

func (c *Chunk) CompressionRatio() float64

CompressionRatio returns the compression ratio (uncompressed / compressed)

func (*Chunk) Iterator

func (c *Chunk) Iterator() (*ChunkIterator, error)

Iterator returns an iterator over the samples in the chunk

func (*Chunk) MarshalBinary

func (c *Chunk) MarshalBinary() ([]byte, error)

MarshalBinary serializes the chunk to bytes

func (*Chunk) ReadFrom

func (c *Chunk) ReadFrom(r io.Reader) (int64, error)

ReadFrom reads a chunk from a reader

func (*Chunk) Size

func (c *Chunk) Size() int

Size returns the total size of the chunk in bytes

func (*Chunk) UnmarshalBinary

func (c *Chunk) UnmarshalBinary(data []byte) error

UnmarshalBinary deserializes the chunk from bytes

func (*Chunk) WriteTo

func (c *Chunk) WriteTo(w io.Writer) (int64, error)

WriteTo writes the chunk to a writer

type ChunkBuilder

type ChunkBuilder struct {
	// contains filtered or unexported fields
}

ChunkBuilder helps build chunks incrementally

func NewChunkBuilder

func NewChunkBuilder(maxSamples int) *ChunkBuilder

NewChunkBuilder creates a new chunk builder

func (*ChunkBuilder) Add

func (cb *ChunkBuilder) Add(sample series.Sample) bool

Add adds a sample to the builder

func (*ChunkBuilder) Build

func (cb *ChunkBuilder) Build() (*Chunk, error)

Build creates a chunk from the accumulated samples

func (*ChunkBuilder) Count

func (cb *ChunkBuilder) Count() int

Count returns the number of samples in the builder

func (*ChunkBuilder) IsFull

func (cb *ChunkBuilder) IsFull() bool

IsFull returns true if the chunk is full

func (*ChunkBuilder) Reset

func (cb *ChunkBuilder) Reset()

Reset clears the builder for reuse
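A minimal sketch of incremental chunk construction. Add's boolean return is not documented above, so the sketch ignores it and checks IsFull explicitly:

// buildChunks feeds samples into the builder, cutting a chunk each
// time it fills and flushing any final partial chunk at the end.
func buildChunks(samples []series.Sample) ([]*storage.Chunk, error) {
	var chunks []*storage.Chunk
	cb := storage.NewChunkBuilder(storage.DefaultMaxSamplesPerChunk)
	for _, s := range samples {
		cb.Add(s)
		if cb.IsFull() {
			c, err := cb.Build()
			if err != nil {
				return nil, err
			}
			chunks = append(chunks, c)
			cb.Reset() // reuse the builder for the next chunk
		}
	}
	if cb.Count() > 0 {
		c, err := cb.Build()
		if err != nil {
			return nil, err
		}
		chunks = append(chunks, c)
	}
	return chunks, nil
}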

type ChunkIterator

type ChunkIterator struct {
	// contains filtered or unexported fields
}

ChunkIterator iterates over samples in a chunk

func (*ChunkIterator) At

func (it *ChunkIterator) At() (series.Sample, error)

At returns the current sample

func (*ChunkIterator) Err

func (it *ChunkIterator) Err() error

Err returns any error that occurred during iteration

func (*ChunkIterator) Next

func (it *ChunkIterator) Next() bool

Next advances the iterator to the next sample
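The standard Next/At/Err loop. The V field name on series.Sample is hypothetical, since the series package's fields are not shown in this doc:

// sumChunk decompresses a chunk and sums its sample values.
func sumChunk(c *storage.Chunk) (float64, error) {
	it, err := c.Iterator()
	if err != nil {
		return 0, err
	}
	var sum float64
	for it.Next() {
		s, err := it.At()
		if err != nil {
			return 0, err
		}
		sum += s.V // hypothetical field name
	}
	return sum, it.Err()
}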

type CompactionLevel

type CompactionLevel int

CompactionLevel represents the tiered compaction level

const (
	// Level0 represents raw 2-hour ingestion blocks
	Level0 CompactionLevel = 0
	// Level1 represents merged 12-hour blocks (6x L0 blocks)
	Level1 CompactionLevel = 1
	// Level2 represents merged 7-day blocks (14x L1 blocks)
	Level2 CompactionLevel = 2
)

type CompactionStats

type CompactionStats struct {
	TotalCompactions   atomic.Int64
	BlocksMerged       atomic.Int64
	BytesReclaimed     atomic.Int64
	LastCompactionTime atomic.Int64 // Unix milliseconds
	CompactionErrors   atomic.Int64
	Level0Compactions  atomic.Int64
	Level1Compactions  atomic.Int64
}

CompactionStats holds compaction metrics

type Compactor

type Compactor struct {
	// contains filtered or unexported fields
}

Compactor manages background compaction of time-series blocks. It implements a tiered compaction strategy similar to LSM trees:

  - Level 0: 2-hour blocks (raw ingestion)
  - Level 1: 12-hour blocks (merge 6x L0 blocks)
  - Level 2: 7-day blocks (merge 14x L1 blocks)
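A minimal sketch of running the compactor standalone (the TSDB presumably manages one itself when Options.EnableCompaction is set). Whether Run blocks until Stop is not stated above, so this sketch gives it its own goroutine (imports as above, plus log):

// startCompactor runs background compaction over dataDir with default
// options; callers should defer c.Stop() for a graceful shutdown.
func startCompactor(dataDir string) *storage.Compactor {
	opts := storage.DefaultCompactorOptions(dataDir)
	opts.Concurrency = 2 // hypothetical override; the default is not documented
	c := storage.NewCompactor(opts)
	go func() {
		if err := c.Run(); err != nil {
			log.Printf("compactor: %v", err)
		}
	}()
	return c
}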

func NewCompactor

func NewCompactor(opts *CompactorOptions) *Compactor

NewCompactor creates a new compactor instance

func (*Compactor) BlockCount

func (c *Compactor) BlockCount() (level0, level1, level2 int, err error)

BlockCount returns the number of blocks at each level

func (*Compactor) CleanupOldBlocks

func (c *Compactor) CleanupOldBlocks(cutoffTime int64) (int, error)

CleanupOldBlocks removes blocks older than the specified cutoff time. This is used by the retention policy.

func (*Compactor) CompactNow

func (c *Compactor) CompactNow() error

CompactNow triggers an immediate compaction (for testing/debugging)

func (*Compactor) GetStats

func (c *Compactor) GetStats() CompactionStats

GetStats returns a snapshot of compaction statistics

func (*Compactor) Run

func (c *Compactor) Run() error

Run starts the background compaction loop

func (*Compactor) SetDataDir

func (c *Compactor) SetDataDir(dir string)

SetDataDir updates the data directory (for testing)

func (*Compactor) Stop

func (c *Compactor) Stop() error

Stop stops the compactor gracefully

func (*Compactor) ValidateBlocks

func (c *Compactor) ValidateBlocks() error

ValidateBlocks checks all blocks for corruption

type CompactorOptions

type CompactorOptions struct {
	DataDir     string
	Interval    time.Duration
	Concurrency int // Number of concurrent compaction workers
}

CompactorOptions configures the compactor

func DefaultCompactorOptions

func DefaultCompactorOptions(dataDir string) *CompactorOptions

DefaultCompactorOptions returns default compactor options

type MemTable

type MemTable struct {
	// contains filtered or unexported fields
}

MemTable is an in-memory buffer for time-series samples. It provides thread-safe operations for inserting and querying samples. When the MemTable reaches its size threshold, it should be flushed to disk.
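A minimal sketch of this write path, assuming Unix-millisecond timestamps and caller-supplied series values; ErrMemTableFull is treated as the flush signal (imports as above, plus errors):

// bufferAndRead inserts samples and reads back the last hour.
func bufferAndRead(mt *storage.MemTable, s *series.Series, samples []series.Sample, seriesHash uint64) error {
	if err := mt.Insert(s, samples); err != nil {
		if errors.Is(err, storage.ErrMemTableFull) {
			// Flush via BlockWriter.WriteMemTable, Clear, then retry.
			return err
		}
		return err
	}

	end := time.Now().UnixMilli()
	got, err := mt.Query(seriesHash, end-time.Hour.Milliseconds(), end)
	if err != nil {
		return err
	}
	fmt.Printf("%d of %d samples in the last hour\n", len(got), mt.SampleCount())
	return nil
}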

func NewMemTable

func NewMemTable() *MemTable

NewMemTable creates a new MemTable with the default maximum size.

func NewMemTableWithSize

func NewMemTableWithSize(maxSize int64) *MemTable

NewMemTableWithSize creates a new MemTable with a custom maximum size.

func (*MemTable) AllSeries

func (m *MemTable) AllSeries() []uint64

AllSeries returns all series hashes in the MemTable.

func (*MemTable) Clear

func (m *MemTable) Clear()

Clear removes all data from the MemTable and resets its state. This is typically called after a successful flush to disk.

func (*MemTable) CreatedAt

func (m *MemTable) CreatedAt() time.Time

CreatedAt returns when this MemTable was created.

func (*MemTable) GetSeries

func (m *MemTable) GetSeries(seriesHash uint64) (*series.Series, bool)

GetSeries retrieves the series metadata for a given hash.

func (*MemTable) Insert

func (m *MemTable) Insert(s *series.Series, samples []series.Sample) error

Insert adds samples for a given series to the MemTable. Returns an error if the MemTable is full or if the input is invalid.

func (*MemTable) IsFull

func (m *MemTable) IsFull() bool

IsFull returns true if the MemTable has reached its size threshold.

func (*MemTable) MaxSize

func (m *MemTable) MaxSize() int64

MaxSize returns the maximum size threshold.

func (*MemTable) Query

func (m *MemTable) Query(seriesHash uint64, start, end int64) ([]series.Sample, error)

Query retrieves samples for a given series hash within a time range. Returns all samples if start and end are both 0.

func (*MemTable) SampleCount

func (m *MemTable) SampleCount() int64

SampleCount returns the total number of samples in the MemTable.

func (*MemTable) SeriesCount

func (m *MemTable) SeriesCount() int

SeriesCount returns the number of unique series in the MemTable.

func (*MemTable) Size

func (m *MemTable) Size() int64

Size returns the current size of the MemTable in bytes.

func (*MemTable) Stats

func (m *MemTable) Stats() string

Stats returns statistics about the MemTable.

func (*MemTable) TimeRange

func (m *MemTable) TimeRange() (int64, int64)

TimeRange returns the minimum and maximum timestamps in the MemTable.

type Options

type Options struct {
	DataDir            string
	FlushInterval      time.Duration
	WALOptions         *wal.Options
	MemTableSize       int64
	EnableCompaction   bool
	CompactionInterval time.Duration
	EnableRetention    bool
	RetentionPeriod    time.Duration
}

Options configures the TSDB

func DefaultOptions

func DefaultOptions(dataDir string) *Options

DefaultOptions returns default TSDB options

type RetentionManager

type RetentionManager struct {
	// contains filtered or unexported fields
}

RetentionManager manages data retention and garbage collection

func NewRetentionManager

func NewRetentionManager(compactor *Compactor, opts *RetentionManagerOptions) *RetentionManager

NewRetentionManager creates a new retention manager

func (*RetentionManager) CalculateRetentionStats

func (rm *RetentionManager) CalculateRetentionStats() (*RetentionStatsReport, error)

CalculateRetentionStats calculates statistics about data retention

func (*RetentionManager) CleanupNow

func (rm *RetentionManager) CleanupNow() error

CleanupNow triggers an immediate cleanup (for testing/debugging)

func (*RetentionManager) Disable

func (rm *RetentionManager) Disable()

Disable disables the retention policy

func (*RetentionManager) Enable

func (rm *RetentionManager) Enable()

Enable enables the retention policy

func (*RetentionManager) GetPolicy

func (rm *RetentionManager) GetPolicy() RetentionPolicy

GetPolicy returns the current retention policy

func (*RetentionManager) GetStats

func (rm *RetentionManager) GetStats() RetentionStats

GetStats returns a snapshot of retention statistics

func (*RetentionManager) IsEnabled

func (rm *RetentionManager) IsEnabled() bool

IsEnabled returns whether the retention policy is enabled

func (*RetentionManager) Run

func (rm *RetentionManager) Run() error

Run starts the background retention enforcement loop

func (*RetentionManager) SetPolicy

func (rm *RetentionManager) SetPolicy(policy RetentionPolicy)

SetPolicy updates the retention policy

func (*RetentionManager) Stop

func (rm *RetentionManager) Stop() error

Stop stops the retention manager gracefully

type RetentionManagerOptions

type RetentionManagerOptions struct {
	Policy   RetentionPolicy
	Interval time.Duration
}

RetentionManagerOptions configures the retention manager

func DefaultRetentionManagerOptions

func DefaultRetentionManagerOptions() *RetentionManagerOptions

DefaultRetentionManagerOptions returns default retention manager options

type RetentionPolicy

type RetentionPolicy struct {
	// MaxAge is the maximum age of data to keep
	MaxAge time.Duration

	// MinSamples is the minimum number of samples to keep per series
	// If set, series with fewer samples won't be deleted even if old
	MinSamples int64

	// Enabled indicates if retention policy is active
	Enabled bool
}

RetentionPolicy defines data retention rules
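A minimal sketch of updating the policy at runtime via TSDB.SetRetentionPolicy (documented below):

// keepTwoWeeks keeps 14 days of data, never trims a series below
// 1000 samples, and turns enforcement on.
func keepTwoWeeks(db *storage.TSDB) error {
	return db.SetRetentionPolicy(storage.RetentionPolicy{
		MaxAge:     14 * 24 * time.Hour,
		MinSamples: 1000,
		Enabled:    true,
	})
}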

type RetentionStats

type RetentionStats struct {
	BlocksDeleted          atomic.Int64
	BytesReclaimed         atomic.Int64
	LastCleanupTime        atomic.Int64 // Unix milliseconds
	CleanupErrors          atomic.Int64
	TotalCleanups          atomic.Int64
	SeriesGarbageCollected atomic.Int64
}

RetentionStats holds retention metrics

type RetentionStatsReport

type RetentionStatsReport struct {
	TotalBlocks               int
	BlocksEligibleForDeletion int
	TotalDataSize             int64
	ReclaimableSize           int64
	PolicyMaxAge              time.Duration
	CutoffTime                int64
	OldestBlockAge            int64 // milliseconds
	NewestBlockAge            int64 // milliseconds
}

RetentionStatsReport provides a detailed retention statistics report

func (*RetentionStatsReport) String

func (r *RetentionStatsReport) String() string

String returns a human-readable representation of the retention stats

type Stats

type Stats struct {
	TotalSamples       atomic.Int64
	TotalSeries        atomic.Int64
	FlushCount         atomic.Int64
	LastFlushTime      atomic.Int64 // Unix milliseconds
	WALSize            atomic.Int64
	ActiveMemTableSize atomic.Int64
}

Stats holds TSDB statistics

type StatsSnapshot

type StatsSnapshot struct {
	TotalSamples       int64
	TotalSeries        int64
	FlushCount         int64
	LastFlushTime      int64
	WALSize            int64
	ActiveMemTableSize int64
}

StatsSnapshot is a point-in-time snapshot of statistics

type TSDB

type TSDB struct {
	// contains filtered or unexported fields
}

TSDB is the main time-series database orchestrator. It coordinates WAL writes, MemTable operations, and background flushing.

func Open

func Open(opts *Options) (*TSDB, error)

Open opens or creates a TSDB instance
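A minimal end-to-end sketch under the same assumptions as the earlier snippets (placeholder import paths, Unix-millisecond timestamps, caller-supplied series values):

// openAndUse opens a TSDB with defaults, enables retention, inserts
// samples, queries the last hour, and closes cleanly.
func openAndUse(dataDir string, s *series.Series, samples []series.Sample, seriesHash uint64) error {
	opts := storage.DefaultOptions(dataDir)
	opts.EnableRetention = true
	opts.RetentionPeriod = 7 * 24 * time.Hour

	db, err := storage.Open(opts)
	if err != nil {
		return err
	}
	defer db.Close()

	if err := db.Insert(s, samples); err != nil {
		return err
	}

	end := time.Now().UnixMilli()
	got, err := db.Query(seriesHash, end-time.Hour.Milliseconds(), end)
	if err != nil {
		return err
	}
	fmt.Printf("%d samples in the last hour\n", len(got))
	return nil
}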

func (*TSDB) Close

func (db *TSDB) Close() error

Close closes the TSDB and all its components

func (*TSDB) GetAllLabels

func (db *TSDB) GetAllLabels() ([]string, error)

GetAllLabels returns all unique label names across all series (Phase 7)

func (*TSDB) GetCompactionStats

func (db *TSDB) GetCompactionStats() *CompactionStats

GetCompactionStats returns compaction statistics (Phase 6)

func (*TSDB) GetLabelValues

func (db *TSDB) GetLabelValues(labelName string) ([]string, error)

GetLabelValues returns all unique values for a specific label (Phase 7)

func (*TSDB) GetRetentionPolicy

func (db *TSDB) GetRetentionPolicy() *RetentionPolicy

GetRetentionPolicy returns the current retention policy (Phase 6)

func (*TSDB) GetRetentionStats

func (db *TSDB) GetRetentionStats() *RetentionStats

GetRetentionStats returns retention statistics (Phase 6)

func (*TSDB) GetSeries

func (db *TSDB) GetSeries(seriesHash uint64) (*series.Series, bool)

GetSeries retrieves series metadata

func (*TSDB) GetStats

func (db *TSDB) GetStats() Stats

GetStats returns a snapshot of current TSDB statistics

func (*TSDB) GetStatsSnapshot

func (db *TSDB) GetStatsSnapshot() StatsSnapshot

GetStatsSnapshot returns a simple snapshot of stats without atomic types

func (*TSDB) Insert

func (db *TSDB) Insert(s *series.Series, samples []series.Sample) error

Insert adds samples for a series to the TSDB

func (*TSDB) MemTableStats

func (db *TSDB) MemTableStats() (active, flushing string)

MemTableStats returns statistics about the current MemTables

func (*TSDB) Query

func (db *TSDB) Query(seriesHash uint64, start, end int64) ([]series.Sample, error)

Query retrieves samples for a series within a time range

func (*TSDB) SetRetentionPolicy

func (db *TSDB) SetRetentionPolicy(policy RetentionPolicy) error

SetRetentionPolicy updates the retention policy (Phase 6)

func (*TSDB) TriggerCompaction

func (db *TSDB) TriggerCompaction() error

TriggerCompaction manually triggers compaction (Phase 6)

func (*TSDB) TriggerFlush

func (db *TSDB) TriggerFlush() error

TriggerFlush manually triggers a flush operation
