tsdb

package
v0.37.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 6, 2022 License: Apache-2.0 Imports: 43 Imported by: 0

README

TSDB

GoPkg

This directory contains the Prometheus TSDB (Time Series DataBase) library, which handles storage and querying of all Prometheus v2 data.

Documentation

External resources

A series of blog posts explaining different components of TSDB:

Documentation

Overview

Package tsdb implements a time series storage for float64 sample data.

Example
package main

import (
	"context"
	"fmt"
	"math"
	"os"
	"time"

	"github.com/BraeTroutman/prometheus/model/labels"
)

func main() {
	// Create a random dir to work in.  Open() doesn't require a pre-existing dir, but
	// we want to make sure not to make a mess where we shouldn't.
	dir, err := os.MkdirTemp("", "tsdb-test")
	noErr(err)

	// Open a TSDB for reading and/or writing.
	db, err := Open(dir, nil, nil, DefaultOptions(), nil)
	noErr(err)

	// Open an appender for writing.
	app := db.Appender(context.Background())

	series := labels.FromStrings("foo", "bar")

	// Ref is 0 for the first append since we don't know the reference for the series.
	ref, err := app.Append(0, series, time.Now().Unix(), 123)
	noErr(err)

	// Another append for a second later.
	// Re-using the ref from above since it's the same series, makes append faster.
	time.Sleep(time.Second)
	_, err = app.Append(ref, series, time.Now().Unix(), 124)
	noErr(err)

	// Commit to storage.
	err = app.Commit()
	noErr(err)

	// In case you want to do more appends after app.Commit(),
	// you need a new appender.
	app = db.Appender(context.Background())
	// ... adding more samples.

	// Open a querier for reading.
	querier, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64)
	noErr(err)
	ss := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))

	for ss.Next() {
		series := ss.At()
		fmt.Println("series:", series.Labels().String())

		it := series.Iterator()
		for it.Next() {
			_, v := it.At() // We ignore the timestamp here, only to have a predictable output we can test against (below)
			fmt.Println("sample", v)
		}

		fmt.Println("it.Err():", it.Err())
	}
	fmt.Println("ss.Err():", ss.Err())
	ws := ss.Warnings()
	if len(ws) > 0 {
		fmt.Println("warnings:", ws)
	}
	err = querier.Close()
	noErr(err)

	// Clean up any last resources when done.
	err = db.Close()
	noErr(err)
	err = os.RemoveAll(dir)
	noErr(err)

}

func noErr(err error) {
	if err != nil {
		panic(err)
	}
}
Output:

series: {foo="bar"}
sample 123
sample 124
it.Err(): <nil>
ss.Err(): <nil>

Index

Examples

Constants

View Source
const (
	// WALMagic is a 4 byte number every WAL segment file starts with.
	WALMagic = uint32(0x43AF00EF)

	// WALFormatDefault is the version flag for the default outer segment file format.
	WALFormatDefault = byte(1)
)
View Source
const (
	// Default duration of a block in milliseconds.
	DefaultBlockDuration = int64(2 * time.Hour / time.Millisecond)
)
View Source
const (
	// DefaultStripeSize is the default number of entries to allocate in the stripeSeries hash map.
	DefaultStripeSize = 1 << 14
)

Variables

View Source
var (
	// ErrInvalidSample is returned if an appended sample is not valid and can't
	// be ingested.
	ErrInvalidSample = errors.New("invalid sample")
	// ErrInvalidExemplar is returned if an appended exemplar is not valid and can't
	// be ingested.
	ErrInvalidExemplar = errors.New("invalid exemplar")
	// ErrAppenderClosed is returned if an appender has already be successfully
	// rolled back or committed.
	ErrAppenderClosed = errors.New("appender closed")
)
View Source
var ErrClosed = errors.New("db already closed")

ErrClosed is returned when the db is closed.

View Source
var ErrClosing = errors.New("block is closing")

ErrClosing is returned when a block is in the process of being closed.

View Source
var ErrInvalidTimes = fmt.Errorf("max time is lesser than min time")
View Source
var ErrNoSeriesAppended error = errors.New("no series appended, aborting")

ErrNoSeriesAppended is returned if the series count is zero while flushing blocks.

View Source
var ErrNotReady = errors.New("TSDB not ready")

ErrNotReady is returned if the underlying storage is not ready yet.

Functions

func BeyondSizeRetention

func BeyondSizeRetention(db *DB, blocks []*Block) (deletable map[ulid.ULID]struct{})

BeyondSizeRetention returns those blocks which are beyond the size retention set in the db options.

func BeyondTimeRetention

func BeyondTimeRetention(db *DB, blocks []*Block) (deletable map[ulid.ULID]struct{})

BeyondTimeRetention returns those blocks which are beyond the time retention set in the db options.

func CreateBlock

func CreateBlock(series []storage.Series, dir string, chunkRange int64, logger log.Logger) (string, error)

CreateBlock creates a chunkrange block from the samples passed to it, and writes it to disk.

func DeleteChunkSnapshots

func DeleteChunkSnapshots(dir string, maxIndex, maxOffset int) error

DeleteChunkSnapshots deletes all chunk snapshots in a directory below a given index.

func ExponentialBlockRanges

func ExponentialBlockRanges(minSize int64, steps, stepSize int) []int64

ExponentialBlockRanges returns the time ranges based on the stepSize.

func LastChunkSnapshot

func LastChunkSnapshot(dir string) (string, int, int, error)

LastChunkSnapshot returns the directory name and index of the most recent chunk snapshot. If dir does not contain any chunk snapshots, ErrNotFound is returned.

func MigrateWAL

func MigrateWAL(logger log.Logger, dir string) (err error)

MigrateWAL rewrites the deprecated write ahead log into the new format.

func NewBlockChunkQuerier

func NewBlockChunkQuerier(b BlockReader, mint, maxt int64) (storage.ChunkQuerier, error)

NewBlockChunkQuerier returns a chunk querier against the block reader and requested min and max time range.

func NewBlockQuerier

func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error)

NewBlockQuerier returns a querier against the block reader and requested min and max time range.

func NewMergedStringIter

func NewMergedStringIter(a, b index.StringIter) index.StringIter

NewMergedStringIter returns string iterator that allows to merge symbols on demand and stream result.

func PostingsForMatchers

func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings, error)

PostingsForMatchers assembles a single postings iterator against the index reader based on the given matchers. The resulting postings are not ordered by series.

Types

type Block

type Block struct {
	// contains filtered or unexported fields
}

Block represents a directory of time series data covering a continuous time range.

func OpenBlock

func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, err error)

OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used to instantiate chunk structs.

func (*Block) Chunks

func (pb *Block) Chunks() (ChunkReader, error)

Chunks returns a new ChunkReader against the block data.

func (*Block) CleanTombstones

func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, bool, error)

CleanTombstones will remove the tombstones and rewrite the block (only if there are any tombstones). If there was a rewrite, then it returns the ULID of the new block written, else nil. If the resultant block is empty (tombstones covered the whole block), then it deletes the new block and return nil UID. It returns a boolean indicating if the parent block can be deleted safely of not.

func (*Block) Close

func (pb *Block) Close() error

Close closes the on-disk block. It blocks as long as there are readers reading from the block.

func (*Block) Delete

func (pb *Block) Delete(mint, maxt int64, ms ...*labels.Matcher) error

Delete matching series between mint and maxt in the block.

func (*Block) Dir

func (pb *Block) Dir() string

Dir returns the directory of the block.

func (*Block) GetSymbolTableSize

func (pb *Block) GetSymbolTableSize() uint64

GetSymbolTableSize returns the Symbol Table Size in the index of this block.

func (*Block) Index

func (pb *Block) Index() (IndexReader, error)

Index returns a new IndexReader against the block data.

func (*Block) LabelNames

func (pb *Block) LabelNames() ([]string, error)

LabelNames returns all the unique label names present in the Block in sorted order.

func (*Block) MaxTime

func (pb *Block) MaxTime() int64

MaxTime returns the max time of the meta.

func (*Block) Meta

func (pb *Block) Meta() BlockMeta

Meta returns meta information about the block.

func (*Block) MinTime

func (pb *Block) MinTime() int64

MinTime returns the min time of the meta.

func (*Block) OverlapsClosedInterval

func (pb *Block) OverlapsClosedInterval(mint, maxt int64) bool

OverlapsClosedInterval returns true if the block overlaps [mint, maxt].

func (*Block) Size

func (pb *Block) Size() int64

Size returns the number of bytes that the block takes up.

func (*Block) Snapshot

func (pb *Block) Snapshot(dir string) error

Snapshot creates snapshot of the block into dir.

func (*Block) String

func (pb *Block) String() string

func (*Block) Tombstones

func (pb *Block) Tombstones() (tombstones.Reader, error)

Tombstones returns a new TombstoneReader against the block data.

type BlockDesc

type BlockDesc struct {
	ULID    ulid.ULID `json:"ulid"`
	MinTime int64     `json:"minTime"`
	MaxTime int64     `json:"maxTime"`
}

BlockDesc describes a block by ULID and time range.

type BlockMeta

type BlockMeta struct {
	// Unique identifier for the block and its contents. Changes on compaction.
	ULID ulid.ULID `json:"ulid"`

	// MinTime and MaxTime specify the time range all samples
	// in the block are in.
	MinTime int64 `json:"minTime"`
	MaxTime int64 `json:"maxTime"`

	// Stats about the contents of the block.
	Stats BlockStats `json:"stats,omitempty"`

	// Information on compactions the block was created from.
	Compaction BlockMetaCompaction `json:"compaction"`

	// Version of the index format.
	Version int `json:"version"`
}

BlockMeta provides meta information about a block.

func CompactBlockMetas

func CompactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta

CompactBlockMetas merges many block metas into one, combining it's source blocks together and adjusting compaction level. Min/Max time of result block meta covers all input blocks.

type BlockMetaCompaction

type BlockMetaCompaction struct {
	// Maximum number of compaction cycles any source block has
	// gone through.
	Level int `json:"level"`
	// ULIDs of all source head blocks that went into the block.
	Sources []ulid.ULID `json:"sources,omitempty"`
	// Indicates that during compaction it resulted in a block without any samples
	// so it should be deleted on the next reloadBlocks.
	Deletable bool `json:"deletable,omitempty"`
	// Short descriptions of the direct blocks that were used to create
	// this block.
	Parents []BlockDesc `json:"parents,omitempty"`
	Failed  bool        `json:"failed,omitempty"`
}

BlockMetaCompaction holds information about compactions a block went through.

type BlockReader

type BlockReader interface {
	// Index returns an IndexReader over the block's data.
	Index() (IndexReader, error)

	// Chunks returns a ChunkReader over the block's data.
	Chunks() (ChunkReader, error)

	// Tombstones returns a tombstones.Reader over the block's deleted data.
	Tombstones() (tombstones.Reader, error)

	// Meta provides meta information about the block reader.
	Meta() BlockMeta

	// Size returns the number of bytes that the block takes up on disk.
	Size() int64
}

BlockReader provides reading access to a data block.

type BlockStats

type BlockStats struct {
	NumSamples    uint64 `json:"numSamples,omitempty"`
	NumSeries     uint64 `json:"numSeries,omitempty"`
	NumChunks     uint64 `json:"numChunks,omitempty"`
	NumTombstones uint64 `json:"numTombstones,omitempty"`
}

BlockStats contains stats about contents of a block.

type BlockWriter

type BlockWriter struct {
	// contains filtered or unexported fields
}

BlockWriter is a block writer that allows appending and flushing series to disk.

func NewBlockWriter

func NewBlockWriter(logger log.Logger, dir string, blockSize int64) (*BlockWriter, error)

NewBlockWriter create a new block writer.

The returned writer accumulates all the series in the Head block until `Flush` is called.

Note that the writer will not check if the target directory exists or contains anything at all. It is the caller's responsibility to ensure that the resulting blocks do not overlap etc. Writer ensures the block flush is atomic (via rename).

func (*BlockWriter) Appender

func (w *BlockWriter) Appender(ctx context.Context) storage.Appender

Appender returns a new appender on the database. Appender can't be called concurrently. However, the returned Appender can safely be used concurrently.

func (*BlockWriter) Close

func (w *BlockWriter) Close() error

func (*BlockWriter) Flush

func (w *BlockWriter) Flush(ctx context.Context) (ulid.ULID, error)

Flush implements the Writer interface. This is where actual block writing happens. After flush completes, no writes can be done.

type BlocksToDeleteFunc

type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{}

func DefaultBlocksToDelete

func DefaultBlocksToDelete(db *DB) BlocksToDeleteFunc

DefaultBlocksToDelete returns a filter which decides time based and size based retention from the options of the db.

type ChunkReader

type ChunkReader interface {
	// Chunk returns the series data chunk with the given reference.
	Chunk(ref chunks.ChunkRef) (chunkenc.Chunk, error)

	// Close releases all underlying resources of the reader.
	Close() error
}

ChunkReader provides reading access of serialized time series data.

type ChunkSnapshotStats

type ChunkSnapshotStats struct {
	TotalSeries int
	Dir         string
}

ChunkSnapshotStats returns stats about a created chunk snapshot.

type ChunkWriter

type ChunkWriter interface {
	// WriteChunks writes several chunks. The Chunk field of the ChunkMetas
	// must be populated.
	// After returning successfully, the Ref fields in the ChunkMetas
	// are set and can be used to retrieve the chunks from the written data.
	WriteChunks(chunks ...chunks.Meta) error

	// Close writes any required finalization and closes the resources
	// associated with the underlying writer.
	Close() error
}

ChunkWriter serializes a time block of chunked series data.

type CircularExemplarStorage

type CircularExemplarStorage struct {
	// contains filtered or unexported fields
}

func (*CircularExemplarStorage) AddExemplar

func (*CircularExemplarStorage) Appender

func (*CircularExemplarStorage) ApplyConfig

func (ce *CircularExemplarStorage) ApplyConfig(cfg *config.Config) error

func (*CircularExemplarStorage) ExemplarQuerier

func (*CircularExemplarStorage) IterateExemplars

func (ce *CircularExemplarStorage) IterateExemplars(f func(seriesLabels labels.Labels, e exemplar.Exemplar) error) error

IterateExemplars iterates through all the exemplars from oldest to newest appended and calls the given function on all of them till the end (or) till the first function call that returns an error.

func (*CircularExemplarStorage) Querier

func (*CircularExemplarStorage) Resize

func (ce *CircularExemplarStorage) Resize(l int64) int

Resize changes the size of exemplar buffer by allocating a new buffer and migrating data to it. Exemplars are kept when possible. Shrinking will discard oldest data (in order of ingest) as needed.

func (*CircularExemplarStorage) Select

func (ce *CircularExemplarStorage) Select(start, end int64, matchers ...[]*labels.Matcher) ([]exemplar.QueryResult, error)

Select returns exemplars for a given set of label matchers.

func (*CircularExemplarStorage) ValidateExemplar

func (ce *CircularExemplarStorage) ValidateExemplar(l labels.Labels, e exemplar.Exemplar) error

type Compactor

type Compactor interface {
	// Plan returns a set of directories that can be compacted concurrently.
	// The directories can be overlapping.
	// Results returned when compactions are in progress are undefined.
	Plan(dir string) ([]string, error)

	// Write persists a Block into a directory.
	// No Block is written when resulting Block has 0 samples, and returns empty ulid.ULID{}.
	Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error)

	// Compact runs compaction against the provided directories. Must
	// only be called concurrently with results of Plan().
	// Can optionally pass a list of already open blocks,
	// to avoid having to reopen them.
	// When resulting Block has 0 samples
	//  * No block is written.
	//  * The source dirs are marked Deletable.
	//  * Returns empty ulid.ULID{}.
	Compact(dest string, dirs []string, open []*Block) (ulid.ULID, error)
}

Compactor provides compaction against an underlying storage of time series data.

type DB

type DB struct {
	// contains filtered or unexported fields
}

DB handles reads and writes of time series falling into a hashed partition of a seriedb.

func Open

func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, stats *DBStats) (db *DB, err error)

Open returns a new DB in the given directory. If options are empty, DefaultOptions will be used.

func (*DB) Appender

func (db *DB) Appender(ctx context.Context) storage.Appender

Appender opens a new appender against the database.

func (*DB) ApplyConfig

func (db *DB) ApplyConfig(conf *config.Config) error

func (*DB) Blocks

func (db *DB) Blocks() []*Block

Blocks returns the databases persisted blocks.

func (*DB) ChunkQuerier

func (db *DB) ChunkQuerier(_ context.Context, mint, maxt int64) (storage.ChunkQuerier, error)

ChunkQuerier returns a new chunk querier over the data partition for the given time range.

func (*DB) CleanTombstones

func (db *DB) CleanTombstones() (err error)

CleanTombstones re-writes any blocks with tombstones.

func (*DB) Close

func (db *DB) Close() error

Close the partition.

func (*DB) Compact

func (db *DB) Compact() (returnErr error)

Compact data if possible. After successful compaction blocks are reloaded which will also delete the blocks that fall out of the retention window. Old blocks are only deleted on reloadBlocks based on the new block's parent information. See DB.reloadBlocks documentation for further information.

func (*DB) CompactHead

func (db *DB) CompactHead(head *RangeHead) error

CompactHead compacts the given RangeHead.

func (*DB) Delete

func (db *DB) Delete(mint, maxt int64, ms ...*labels.Matcher) error

Delete implements deletion of metrics. It only has atomicity guarantees on a per-block basis.

func (*DB) Dir

func (db *DB) Dir() string

Dir returns the directory of the database.

func (*DB) DisableCompactions

func (db *DB) DisableCompactions()

DisableCompactions disables auto compactions.

func (*DB) EnableCompactions

func (db *DB) EnableCompactions()

EnableCompactions enables auto compactions.

func (*DB) ExemplarQuerier

func (db *DB) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error)

func (*DB) Head

func (db *DB) Head() *Head

Head returns the databases's head.

func (*DB) Querier

func (db *DB) Querier(_ context.Context, mint, maxt int64) (storage.Querier, error)

Querier returns a new querier over the data partition for the given time range.

func (*DB) Snapshot

func (db *DB) Snapshot(dir string, withHead bool) error

Snapshot writes the current data to the directory. If withHead is set to true it will create a new block containing all data that's currently in the memory buffer/WAL.

func (*DB) StartTime

func (db *DB) StartTime() (int64, error)

StartTime implements the Storage interface.

func (*DB) String

func (db *DB) String() string

type DBReadOnly

type DBReadOnly struct {
	// contains filtered or unexported fields
}

DBReadOnly provides APIs for read only operations on a database. Current implementation doesn't support concurrency so all API calls should happen in the same go routine.

func OpenDBReadOnly

func OpenDBReadOnly(dir string, l log.Logger) (*DBReadOnly, error)

OpenDBReadOnly opens DB in the given directory for read only operations.

func (*DBReadOnly) Blocks

func (db *DBReadOnly) Blocks() ([]BlockReader, error)

Blocks returns a slice of block readers for persisted blocks.

func (*DBReadOnly) ChunkQuerier

func (db *DBReadOnly) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error)

ChunkQuerier loads blocks and the wal and returns a new chunk querier over the data partition for the given time range. Current implementation doesn't support multiple ChunkQueriers.

func (*DBReadOnly) Close

func (db *DBReadOnly) Close() error

Close all block readers.

func (*DBReadOnly) FlushWAL

func (db *DBReadOnly) FlushWAL(dir string) (returnErr error)

FlushWAL creates a new block containing all data that's currently in the memory buffer/WAL. Samples that are in existing blocks will not be written to the new block. Note that if the read only database is running concurrently with a writable database then writing the WAL to the database directory can race.

func (*DBReadOnly) Querier

func (db *DBReadOnly) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error)

Querier loads the blocks and wal and returns a new querier over the data partition for the given time range. Current implementation doesn't support multiple Queriers.

type DBStats

type DBStats struct {
	Head *HeadStats
}

DBStats contains statistics about the DB separated by component (eg. head). They are available before the DB has finished initializing.

func NewDBStats

func NewDBStats() *DBStats

NewDBStats returns a new DBStats object initialized using the the new function from each component.

type DeletedIterator

type DeletedIterator struct {
	// Iter is an Iterator to be wrapped.
	Iter chunkenc.Iterator
	// Intervals are the deletion intervals.
	Intervals tombstones.Intervals
}

DeletedIterator wraps chunk Iterator and makes sure any deleted metrics are not returned.

func (*DeletedIterator) At

func (it *DeletedIterator) At() (int64, float64)

func (*DeletedIterator) Err

func (it *DeletedIterator) Err() error

func (*DeletedIterator) Next

func (it *DeletedIterator) Next() bool

func (*DeletedIterator) Seek

func (it *DeletedIterator) Seek(t int64) bool

type ExemplarMetrics

type ExemplarMetrics struct {
	// contains filtered or unexported fields
}

func NewExemplarMetrics

func NewExemplarMetrics(reg prometheus.Registerer) *ExemplarMetrics

type ExemplarStorage

type ExemplarStorage interface {
	storage.ExemplarQueryable
	AddExemplar(labels.Labels, exemplar.Exemplar) error
	ValidateExemplar(labels.Labels, exemplar.Exemplar) error
	IterateExemplars(f func(seriesLabels labels.Labels, e exemplar.Exemplar) error) error
}

func NewCircularExemplarStorage

func NewCircularExemplarStorage(len int64, m *ExemplarMetrics) (ExemplarStorage, error)

NewCircularExemplarStorage creates an circular in memory exemplar storage. If we assume the average case 95 bytes per exemplar we can fit 5651272 exemplars in 1GB of extra memory, accounting for the fact that this is heap allocated space. If len <= 0, then the exemplar storage is essentially a noop storage but can later be resized to store exemplars.

type Head struct {
	// contains filtered or unexported fields
}

Head handles reads and writes of time series data within a time window.

func NewHead

func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOptions, stats *HeadStats) (*Head, error)

NewHead opens the head block in dir.

func (*Head) AppendableMinValidTime

func (h *Head) AppendableMinValidTime() (int64, bool)

AppendableMinValidTime returns the minimum valid time for samples to be appended to the Head. Returns false if Head hasn't been initialized yet and the minimum time isn't known yet.

func (*Head) Appender

func (h *Head) Appender(_ context.Context) storage.Appender

Appender returns a new Appender on the database.

func (*Head) ApplyConfig

func (h *Head) ApplyConfig(cfg *config.Config) error

func (*Head) ChunkSnapshot

func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error)

ChunkSnapshot creates a snapshot of all the series and tombstones in the head. It deletes the old chunk snapshots if the chunk snapshot creation is successful.

The chunk snapshot is stored in a directory named chunk_snapshot.N.M and is written using the WAL package. N is the last WAL segment present during snapshotting and M is the offset in segment N upto which data was written.

The snapshot first contains all series (each in individual records and not sorted), followed by tombstones (a single record), and finally exemplars (>= 1 record). Exemplars are in the order they were written to the circular buffer.

func (*Head) Chunks

func (h *Head) Chunks() (ChunkReader, error)

Chunks returns a ChunkReader against the block.

func (*Head) Close

func (h *Head) Close() error

Close flushes the WAL and closes the head. It also takes a snapshot of in-memory chunks if enabled.

func (*Head) Delete

func (h *Head) Delete(mint, maxt int64, ms ...*labels.Matcher) error

Delete all samples in the range of [mint, maxt] for series that satisfy the given label matchers.

func (*Head) ExemplarQuerier

func (h *Head) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error)

func (*Head) Index

func (h *Head) Index() (IndexReader, error)

Index returns an IndexReader against the block.

func (*Head) Init

func (h *Head) Init(minValidTime int64) error

Init loads data from the write ahead log and prepares the head for writes. It should be called before using an appender so that it limits the ingested samples to the head min valid time.

func (*Head) IsQuerierCollidingWithTruncation

func (h *Head) IsQuerierCollidingWithTruncation(querierMint, querierMaxt int64) (shouldClose, getNew bool, newMint int64)

IsQuerierCollidingWithTruncation returns if the current querier needs to be closed and if a new querier has to be created. In the latter case, the method also returns the new mint to be used for creating the new range head and the new querier. This methods helps preventing races with the truncation of in-memory data.

NOTE: The querier should already be taken before calling this.

func (*Head) MaxTime

func (h *Head) MaxTime() int64

MaxTime returns the highest timestamp seen in data of the head.

func (*Head) Meta

func (h *Head) Meta() BlockMeta

Meta returns meta information about the head. The head is dynamic so will return dynamic results.

func (*Head) MinTime

func (h *Head) MinTime() int64

MinTime returns the lowest time bound on visible data in the head.

func (*Head) NumSeries

func (h *Head) NumSeries() uint64

NumSeries returns the number of active series in the head.

func (*Head) OverlapsClosedInterval

func (h *Head) OverlapsClosedInterval(mint, maxt int64) bool

OverlapsClosedInterval returns true if the head overlaps [mint, maxt].

func (*Head) PostingsCardinalityStats

func (h *Head) PostingsCardinalityStats(statsByLabelName string) *index.PostingsStats

PostingsCardinalityStats returns top 10 highest cardinality stats By label and value names.

func (*Head) SetMinValidTime

func (h *Head) SetMinValidTime(minValidTime int64)

SetMinValidTime sets the minimum timestamp the head can ingest.

func (*Head) Size

func (h *Head) Size() int64

func (*Head) Stats

func (h *Head) Stats(statsByLabelName string) *Stats

Stats returns important current HEAD statistics. Note that it is expensive to calculate these.

func (*Head) String

func (h *Head) String() string

String returns an human readable representation of the TSDB head. It's important to keep this function in order to avoid the struct dump when the head is stringified in errors or logs.

func (*Head) Tombstones

func (h *Head) Tombstones() (tombstones.Reader, error)

Tombstones returns a new reader over the head's tombstones

func (*Head) Truncate

func (h *Head) Truncate(mint int64) (err error)

Truncate removes old data before mint from the head and WAL.

func (*Head) WaitForPendingReadersInTimeRange

func (h *Head) WaitForPendingReadersInTimeRange(mint, maxt int64)

WaitForPendingReadersInTimeRange waits for queries overlapping with given range to finish querying. The query timeout limits the max wait time of this function implicitly. The mint is inclusive and maxt is the truncation time hence exclusive.

type HeadOptions

type HeadOptions struct {
	// Runtime reloadable option. At the top of the struct for 32 bit OS:
	// https://pkg.go.dev/sync/atomic#pkg-note-BUG
	MaxExemplars atomic.Int64

	ChunkRange int64
	// ChunkDirRoot is the parent directory of the chunks directory.
	ChunkDirRoot         string
	ChunkPool            chunkenc.Pool
	ChunkWriteBufferSize int
	ChunkWriteQueueSize  int

	// StripeSize sets the number of entries in the hash map, it must be a power of 2.
	// A larger StripeSize will allocate more memory up-front, but will increase performance when handling a large number of series.
	// A smaller StripeSize reduces the memory allocated, but can decrease performance with large number of series.
	StripeSize                     int
	SeriesCallback                 SeriesLifecycleCallback
	EnableExemplarStorage          bool
	EnableMemorySnapshotOnShutdown bool

	IsolationDisabled bool
}

HeadOptions are parameters for the Head block.

func DefaultHeadOptions

func DefaultHeadOptions() *HeadOptions

type HeadStats

type HeadStats struct {
	WALReplayStatus *WALReplayStatus
}

HeadStats are the statistics for the head component of the DB.

func NewHeadStats

func NewHeadStats() *HeadStats

NewHeadStats returns a new HeadStats object.

type IndexReader

type IndexReader interface {
	// Symbols return an iterator over sorted string symbols that may occur in
	// series' labels and indices. It is not safe to use the returned strings
	// beyond the lifetime of the index reader.
	Symbols() index.StringIter

	// SortedLabelValues returns sorted possible label values.
	SortedLabelValues(name string, matchers ...*labels.Matcher) ([]string, error)

	// LabelValues returns possible label values which may not be sorted.
	LabelValues(name string, matchers ...*labels.Matcher) ([]string, error)

	// Postings returns the postings list iterator for the label pairs.
	// The Postings here contain the offsets to the series inside the index.
	// Found IDs are not strictly required to point to a valid Series, e.g.
	// during background garbage collections. Input values must be sorted.
	Postings(name string, values ...string) (index.Postings, error)

	// SortedPostings returns a postings list that is reordered to be sorted
	// by the label set of the underlying series.
	SortedPostings(index.Postings) index.Postings

	// Series populates the given labels and chunk metas for the series identified
	// by the reference.
	// Returns storage.ErrNotFound if the ref does not resolve to a known series.
	Series(ref storage.SeriesRef, lset *labels.Labels, chks *[]chunks.Meta) error

	// LabelNames returns all the unique label names present in the index in sorted order.
	LabelNames(matchers ...*labels.Matcher) ([]string, error)

	// LabelValueFor returns label value for the given label name in the series referred to by ID.
	// If the series couldn't be found or the series doesn't have the requested label a
	// storage.ErrNotFound is returned as error.
	LabelValueFor(id storage.SeriesRef, label string) (string, error)

	// LabelNamesFor returns all the label names for the series referred to by IDs.
	// The names returned are sorted.
	LabelNamesFor(ids ...storage.SeriesRef) ([]string, error)

	// Close releases the underlying resources of the reader.
	Close() error
}

IndexReader provides reading access of serialized index data.

type IndexWriter

type IndexWriter interface {
	// AddSymbol registers a single symbol.
	// Symbols must be registered in sorted order.
	AddSymbol(sym string) error

	// AddSeries populates the index writer with a series and its offsets
	// of chunks that the index can reference.
	// Implementations may require series to be insert in strictly increasing order by
	// their labels. The reference numbers are used to resolve entries in postings lists
	// that are added later.
	AddSeries(ref storage.SeriesRef, l labels.Labels, chunks ...chunks.Meta) error

	// Close writes any finalization and closes the resources associated with
	// the underlying writer.
	Close() error
}

IndexWriter serializes the index for a block of series data. The methods must be called in the order they are specified in.

type LeveledCompactor

type LeveledCompactor struct {
	// contains filtered or unexported fields
}

LeveledCompactor implements the Compactor interface.

func NewLeveledCompactor

func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error)

NewLeveledCompactor returns a LeveledCompactor.

func NewLeveledCompactorWithChunkSize

func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error)

func (*LeveledCompactor) Compact

func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error)

Compact creates a new block in the compactor's directory from the blocks in the provided directories.

func (*LeveledCompactor) Plan

func (c *LeveledCompactor) Plan(dir string) ([]string, error)

Plan returns a list of compactable blocks in the provided directory.

func (*LeveledCompactor) Write

func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error)

type Options

type Options struct {
	// Segments (wal files) max size.
	// WALSegmentSize = 0, segment size is default size.
	// WALSegmentSize > 0, segment size is WALSegmentSize.
	// WALSegmentSize < 0, wal is disabled.
	WALSegmentSize int

	// MaxBlockChunkSegmentSize is the max size of block chunk segment files.
	// MaxBlockChunkSegmentSize = 0, chunk segment size is default size.
	// MaxBlockChunkSegmentSize > 0, chunk segment size is MaxBlockChunkSegmentSize.
	MaxBlockChunkSegmentSize int64

	// Duration of persisted data to keep.
	// Unit agnostic as long as unit is consistent with MinBlockDuration and MaxBlockDuration.
	// Typically it is in milliseconds.
	RetentionDuration int64

	// Maximum number of bytes in blocks to be retained.
	// 0 or less means disabled.
	// NOTE: For proper storage calculations need to consider
	// the size of the WAL folder which is not added when calculating
	// the current size of the database.
	MaxBytes int64

	// NoLockfile disables creation and consideration of a lock file.
	NoLockfile bool

	// Overlapping blocks are allowed if AllowOverlappingBlocks is true.
	// This in-turn enables vertical compaction and vertical query merge.
	AllowOverlappingBlocks bool

	// WALCompression will turn on Snappy compression for records on the WAL.
	WALCompression bool

	// StripeSize is the size in entries of the series hash map. Reducing the size will save memory but impact performance.
	StripeSize int

	// The timestamp range of head blocks after which they get persisted.
	// It's the minimum duration of any persisted block.
	// Unit agnostic as long as unit is consistent with RetentionDuration and MaxBlockDuration.
	// Typically it is in milliseconds.
	MinBlockDuration int64

	// The maximum timestamp range of compacted blocks.
	// Unit agnostic as long as unit is consistent with MinBlockDuration and RetentionDuration.
	// Typically it is in milliseconds.
	MaxBlockDuration int64

	// HeadChunksWriteBufferSize configures the write buffer size used by the head chunks mapper.
	HeadChunksWriteBufferSize int

	// HeadChunksWriteQueueSize configures the size of the chunk write queue used in the head chunks mapper.
	HeadChunksWriteQueueSize int

	// SeriesLifecycleCallback specifies a list of callbacks that will be called during a lifecycle of a series.
	// It is always a no-op in Prometheus and mainly meant for external users who import TSDB.
	SeriesLifecycleCallback SeriesLifecycleCallback

	// BlocksToDelete is a function which returns the blocks which can be deleted.
	// It is always the default time and size based retention in Prometheus and
	// mainly meant for external users who import TSDB.
	BlocksToDelete BlocksToDeleteFunc

	// Enables the in memory exemplar storage.
	EnableExemplarStorage bool

	// Enables the snapshot of in-memory chunks on shutdown. This makes restarts faster.
	EnableMemorySnapshotOnShutdown bool

	// MaxExemplars sets the size, in # of exemplars stored, of the single circular buffer used to store exemplars in memory.
	// See tsdb/exemplar.go, specifically the CircularExemplarStorage struct and it's constructor NewCircularExemplarStorage.
	MaxExemplars int64

	// Disables isolation between reads and in-flight appends.
	IsolationDisabled bool
}

Options of the DB storage.

func DefaultOptions

func DefaultOptions() *Options

DefaultOptions used for the DB. They are sane for setups using millisecond precision timestamps.

type Overlaps

type Overlaps map[TimeRange][]BlockMeta

Overlaps contains overlapping blocks aggregated by overlapping range.

func OverlappingBlocks

func OverlappingBlocks(bm []BlockMeta) Overlaps

OverlappingBlocks returns all overlapping blocks from given meta files.

func (Overlaps) String

func (o Overlaps) String() string

String returns human readable string form of overlapped blocks.

type RangeHead

type RangeHead struct {
	// contains filtered or unexported fields
}

RangeHead allows querying Head via an IndexReader, ChunkReader and tombstones.Reader but only within a restricted range. Used for queries and compactions.

func NewRangeHead

func NewRangeHead(head *Head, mint, maxt int64) *RangeHead

NewRangeHead returns a *RangeHead. There are no restrictions on mint/maxt.

func (*RangeHead) BlockMaxTime

func (h *RangeHead) BlockMaxTime() int64

BlockMaxTime returns the max time of the potential block created from this head. It's different to MaxTime as we need to add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime). Block intervals are always +1 than the total samples it includes.

func (*RangeHead) Chunks

func (h *RangeHead) Chunks() (ChunkReader, error)

func (*RangeHead) Index

func (h *RangeHead) Index() (IndexReader, error)

func (*RangeHead) MaxTime

func (h *RangeHead) MaxTime() int64

MaxTime returns the max time of actual data fetch-able from the head. This controls the chunks time range which is closed [b.MinTime, b.MaxTime].

func (*RangeHead) Meta

func (h *RangeHead) Meta() BlockMeta

func (*RangeHead) MinTime

func (h *RangeHead) MinTime() int64

func (*RangeHead) NumSeries

func (h *RangeHead) NumSeries() uint64

func (*RangeHead) Size

func (h *RangeHead) Size() int64

func (*RangeHead) String

func (h *RangeHead) String() string

String returns an human readable representation of the range head. It's important to keep this function in order to avoid the struct dump when the head is stringified in errors or logs.

func (*RangeHead) Tombstones

func (h *RangeHead) Tombstones() (tombstones.Reader, error)

type SegmentWAL

type SegmentWAL struct {
	// contains filtered or unexported fields
}

SegmentWAL is a write ahead log for series data.

DEPRECATED: use wal pkg combined with the record coders instead.

func OpenSegmentWAL

func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration, r prometheus.Registerer) (*SegmentWAL, error)

OpenSegmentWAL opens or creates a write ahead log in the given directory. The WAL must be read completely before new data is written.

func (*SegmentWAL) Close

func (w *SegmentWAL) Close() error

Close syncs all data and closes the underlying resources.

func (*SegmentWAL) LogDeletes

func (w *SegmentWAL) LogDeletes(stones []tombstones.Stone) error

LogDeletes write a batch of new deletes to the log.

func (*SegmentWAL) LogSamples

func (w *SegmentWAL) LogSamples(samples []record.RefSample) error

LogSamples writes a batch of new samples to the log.

func (*SegmentWAL) LogSeries

func (w *SegmentWAL) LogSeries(series []record.RefSeries) error

LogSeries writes a batch of new series labels to the log. The series have to be ordered.

func (*SegmentWAL) Reader

func (w *SegmentWAL) Reader() WALReader

Reader returns a new reader over the write ahead log data. It must be completely consumed before writing to the WAL.

func (*SegmentWAL) Sync

func (w *SegmentWAL) Sync() error

Sync flushes the changes to disk.

func (*SegmentWAL) Truncate

func (w *SegmentWAL) Truncate(mint int64, keep func(chunks.HeadSeriesRef) bool) error

Truncate deletes the values prior to mint and the series which the keep function does not indicate to preserve.

type SeriesLifecycleCallback

type SeriesLifecycleCallback interface {
	// PreCreation is called before creating a series to indicate if the series can be created.
	// A non nil error means the series should not be created.
	PreCreation(labels.Labels) error
	// PostCreation is called after creating a series to indicate a creation of series.
	PostCreation(labels.Labels)
	// PostDeletion is called after deletion of series.
	PostDeletion(...labels.Labels)
}

SeriesLifecycleCallback specifies a list of callbacks that will be called during a lifecycle of a series. It is always a no-op in Prometheus and mainly meant for external users who import TSDB. All the callbacks should be safe to be called concurrently. It is up to the user to implement soft or hard consistency by making the callbacks atomic or non-atomic. Atomic callbacks can cause degradation performance.

type Stats

type Stats struct {
	NumSeries         uint64
	MinTime, MaxTime  int64
	IndexPostingStats *index.PostingsStats
}

type TimeRange

type TimeRange struct {
	Min, Max int64
}

TimeRange specifies minTime and maxTime range.

type WAL

type WAL interface {
	Reader() WALReader
	LogSeries([]record.RefSeries) error
	LogSamples([]record.RefSample) error
	LogDeletes([]tombstones.Stone) error
	Truncate(mint int64, keep func(uint64) bool) error
	Close() error
}

WAL is a write ahead log that can log new series labels and samples. It must be completely read before new entries are logged.

DEPRECATED: use wal pkg combined with the record codex instead.

type WALEntryType

type WALEntryType uint8

WALEntryType indicates what data a WAL entry contains.

const (
	WALEntrySymbols WALEntryType = 1
	WALEntrySeries  WALEntryType = 2
	WALEntrySamples WALEntryType = 3
	WALEntryDeletes WALEntryType = 4
)

Entry types in a segment file.

type WALReader

type WALReader interface {
	Read(
		seriesf func([]record.RefSeries),
		samplesf func([]record.RefSample),
		deletesf func([]tombstones.Stone),
	) error
}

WALReader reads entries from a WAL.

type WALReplayStatus

type WALReplayStatus struct {
	sync.RWMutex
	Min     int
	Max     int
	Current int
}

WALReplayStatus contains status information about the WAL replay.

func (*WALReplayStatus) GetWALReplayStatus

func (s *WALReplayStatus) GetWALReplayStatus() WALReplayStatus

GetWALReplayStatus returns the WAL replay status information.

Directories

Path Synopsis
Package fileutil provides utility methods used when dealing with the filesystem in tsdb.
Package fileutil provides utility methods used when dealing with the filesystem in tsdb.
Package goversion enforces the go version supported by the tsdb module.
Package goversion enforces the go version supported by the tsdb module.
Package record contains the various record types used for encoding various Head block data in the WAL and in-memory snapshot.
Package record contains the various record types used for encoding various Head block data in the WAL and in-memory snapshot.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL