Documentation ¶
Overview ¶
Package tsdb implements a time series storage for float64 sample data.
Example ¶
package main import ( "context" "fmt" "math" "os" "time" "github.com/BraeTroutman/prometheus/model/labels" ) func main() { // Create a random dir to work in. Open() doesn't require a pre-existing dir, but // we want to make sure not to make a mess where we shouldn't. dir, err := os.MkdirTemp("", "tsdb-test") noErr(err) // Open a TSDB for reading and/or writing. db, err := Open(dir, nil, nil, DefaultOptions(), nil) noErr(err) // Open an appender for writing. app := db.Appender(context.Background()) series := labels.FromStrings("foo", "bar") // Ref is 0 for the first append since we don't know the reference for the series. ref, err := app.Append(0, series, time.Now().Unix(), 123) noErr(err) // Another append for a second later. // Re-using the ref from above since it's the same series, makes append faster. time.Sleep(time.Second) _, err = app.Append(ref, series, time.Now().Unix(), 124) noErr(err) // Commit to storage. err = app.Commit() noErr(err) // In case you want to do more appends after app.Commit(), // you need a new appender. app = db.Appender(context.Background()) // ... adding more samples. // Open a querier for reading. querier, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64) noErr(err) ss := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) for ss.Next() { series := ss.At() fmt.Println("series:", series.Labels().String()) it := series.Iterator() for it.Next() { _, v := it.At() // We ignore the timestamp here, only to have a predictable output we can test against (below) fmt.Println("sample", v) } fmt.Println("it.Err():", it.Err()) } fmt.Println("ss.Err():", ss.Err()) ws := ss.Warnings() if len(ws) > 0 { fmt.Println("warnings:", ws) } err = querier.Close() noErr(err) // Clean up any last resources when done. err = db.Close() noErr(err) err = os.RemoveAll(dir) noErr(err) } func noErr(err error) { if err != nil { panic(err) } }
Output: series: {foo="bar"} sample 123 sample 124 it.Err(): <nil> ss.Err(): <nil>
Index ¶
- Constants
- Variables
- func BeyondSizeRetention(db *DB, blocks []*Block) (deletable map[ulid.ULID]struct{})
- func BeyondTimeRetention(db *DB, blocks []*Block) (deletable map[ulid.ULID]struct{})
- func CreateBlock(series []storage.Series, dir string, chunkRange int64, logger log.Logger) (string, error)
- func DeleteChunkSnapshots(dir string, maxIndex, maxOffset int) error
- func ExponentialBlockRanges(minSize int64, steps, stepSize int) []int64
- func LastChunkSnapshot(dir string) (string, int, int, error)
- func MigrateWAL(logger log.Logger, dir string) (err error)
- func NewBlockChunkQuerier(b BlockReader, mint, maxt int64) (storage.ChunkQuerier, error)
- func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error)
- func NewMergedStringIter(a, b index.StringIter) index.StringIter
- func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings, error)
- type Block
- func (pb *Block) Chunks() (ChunkReader, error)
- func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, bool, error)
- func (pb *Block) Close() error
- func (pb *Block) Delete(mint, maxt int64, ms ...*labels.Matcher) error
- func (pb *Block) Dir() string
- func (pb *Block) GetSymbolTableSize() uint64
- func (pb *Block) Index() (IndexReader, error)
- func (pb *Block) LabelNames() ([]string, error)
- func (pb *Block) MaxTime() int64
- func (pb *Block) Meta() BlockMeta
- func (pb *Block) MinTime() int64
- func (pb *Block) OverlapsClosedInterval(mint, maxt int64) bool
- func (pb *Block) Size() int64
- func (pb *Block) Snapshot(dir string) error
- func (pb *Block) String() string
- func (pb *Block) Tombstones() (tombstones.Reader, error)
- type BlockDesc
- type BlockMeta
- type BlockMetaCompaction
- type BlockReader
- type BlockStats
- type BlockWriter
- type BlocksToDeleteFunc
- type ChunkReader
- type ChunkSnapshotStats
- type ChunkWriter
- type CircularExemplarStorage
- func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemplar) error
- func (ce *CircularExemplarStorage) Appender() *CircularExemplarStorage
- func (ce *CircularExemplarStorage) ApplyConfig(cfg *config.Config) error
- func (ce *CircularExemplarStorage) ExemplarQuerier(_ context.Context) (storage.ExemplarQuerier, error)
- func (ce *CircularExemplarStorage) IterateExemplars(f func(seriesLabels labels.Labels, e exemplar.Exemplar) error) error
- func (ce *CircularExemplarStorage) Querier(_ context.Context) (storage.ExemplarQuerier, error)
- func (ce *CircularExemplarStorage) Resize(l int64) int
- func (ce *CircularExemplarStorage) Select(start, end int64, matchers ...[]*labels.Matcher) ([]exemplar.QueryResult, error)
- func (ce *CircularExemplarStorage) ValidateExemplar(l labels.Labels, e exemplar.Exemplar) error
- type Compactor
- type DB
- func (db *DB) Appender(ctx context.Context) storage.Appender
- func (db *DB) ApplyConfig(conf *config.Config) error
- func (db *DB) Blocks() []*Block
- func (db *DB) ChunkQuerier(_ context.Context, mint, maxt int64) (storage.ChunkQuerier, error)
- func (db *DB) CleanTombstones() (err error)
- func (db *DB) Close() error
- func (db *DB) Compact() (returnErr error)
- func (db *DB) CompactHead(head *RangeHead) error
- func (db *DB) Delete(mint, maxt int64, ms ...*labels.Matcher) error
- func (db *DB) Dir() string
- func (db *DB) DisableCompactions()
- func (db *DB) EnableCompactions()
- func (db *DB) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error)
- func (db *DB) Head() *Head
- func (db *DB) Querier(_ context.Context, mint, maxt int64) (storage.Querier, error)
- func (db *DB) Snapshot(dir string, withHead bool) error
- func (db *DB) StartTime() (int64, error)
- func (db *DB) String() string
- type DBReadOnly
- func (db *DBReadOnly) Blocks() ([]BlockReader, error)
- func (db *DBReadOnly) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error)
- func (db *DBReadOnly) Close() error
- func (db *DBReadOnly) FlushWAL(dir string) (returnErr error)
- func (db *DBReadOnly) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error)
- type DBStats
- type DeletedIterator
- type ExemplarMetrics
- type ExemplarStorage
- type Head
- func (h *Head) AppendableMinValidTime() (int64, bool)
- func (h *Head) Appender(_ context.Context) storage.Appender
- func (h *Head) ApplyConfig(cfg *config.Config) error
- func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error)
- func (h *Head) Chunks() (ChunkReader, error)
- func (h *Head) Close() error
- func (h *Head) Delete(mint, maxt int64, ms ...*labels.Matcher) error
- func (h *Head) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error)
- func (h *Head) Index() (IndexReader, error)
- func (h *Head) Init(minValidTime int64) error
- func (h *Head) IsQuerierCollidingWithTruncation(querierMint, querierMaxt int64) (shouldClose, getNew bool, newMint int64)
- func (h *Head) MaxTime() int64
- func (h *Head) Meta() BlockMeta
- func (h *Head) MinTime() int64
- func (h *Head) NumSeries() uint64
- func (h *Head) OverlapsClosedInterval(mint, maxt int64) bool
- func (h *Head) PostingsCardinalityStats(statsByLabelName string) *index.PostingsStats
- func (h *Head) SetMinValidTime(minValidTime int64)
- func (h *Head) Size() int64
- func (h *Head) Stats(statsByLabelName string) *Stats
- func (h *Head) String() string
- func (h *Head) Tombstones() (tombstones.Reader, error)
- func (h *Head) Truncate(mint int64) (err error)
- func (h *Head) WaitForPendingReadersInTimeRange(mint, maxt int64)
- type HeadOptions
- type HeadStats
- type IndexReader
- type IndexWriter
- type LeveledCompactor
- type Options
- type Overlaps
- type RangeHead
- func (h *RangeHead) BlockMaxTime() int64
- func (h *RangeHead) Chunks() (ChunkReader, error)
- func (h *RangeHead) Index() (IndexReader, error)
- func (h *RangeHead) MaxTime() int64
- func (h *RangeHead) Meta() BlockMeta
- func (h *RangeHead) MinTime() int64
- func (h *RangeHead) NumSeries() uint64
- func (h *RangeHead) Size() int64
- func (h *RangeHead) String() string
- func (h *RangeHead) Tombstones() (tombstones.Reader, error)
- type SegmentWAL
- func (w *SegmentWAL) Close() error
- func (w *SegmentWAL) LogDeletes(stones []tombstones.Stone) error
- func (w *SegmentWAL) LogSamples(samples []record.RefSample) error
- func (w *SegmentWAL) LogSeries(series []record.RefSeries) error
- func (w *SegmentWAL) Reader() WALReader
- func (w *SegmentWAL) Sync() error
- func (w *SegmentWAL) Truncate(mint int64, keep func(chunks.HeadSeriesRef) bool) error
- type SeriesLifecycleCallback
- type Stats
- type TimeRange
- type WAL
- type WALEntryType
- type WALReader
- type WALReplayStatus
Examples ¶
Constants ¶
const ( // WALMagic is a 4 byte number every WAL segment file starts with. WALMagic = uint32(0x43AF00EF) // WALFormatDefault is the version flag for the default outer segment file format. WALFormatDefault = byte(1) )
const ( // Default duration of a block in milliseconds. DefaultBlockDuration = int64(2 * time.Hour / time.Millisecond) )
const (
// DefaultStripeSize is the default number of entries to allocate in the stripeSeries hash map.
DefaultStripeSize = 1 << 14
)
Variables ¶
var ( // ErrInvalidSample is returned if an appended sample is not valid and can't // be ingested. ErrInvalidSample = errors.New("invalid sample") // ErrInvalidExemplar is returned if an appended exemplar is not valid and can't // be ingested. ErrInvalidExemplar = errors.New("invalid exemplar") // ErrAppenderClosed is returned if an appender has already been successfully // rolled back or committed. ErrAppenderClosed = errors.New("appender closed") )
var ErrClosed = errors.New("db already closed")
ErrClosed is returned when the db is closed.
var ErrClosing = errors.New("block is closing")
ErrClosing is returned when a block is in the process of being closed.
var ErrInvalidTimes = fmt.Errorf("max time is lesser than min time")
var ErrNoSeriesAppended error = errors.New("no series appended, aborting")
ErrNoSeriesAppended is returned if the series count is zero while flushing blocks.
var ErrNotReady = errors.New("TSDB not ready")
ErrNotReady is returned if the underlying storage is not ready yet.
Functions ¶
func BeyondSizeRetention ¶
BeyondSizeRetention returns those blocks which are beyond the size retention set in the db options.
func BeyondTimeRetention ¶
BeyondTimeRetention returns those blocks which are beyond the time retention set in the db options.
func CreateBlock ¶
func CreateBlock(series []storage.Series, dir string, chunkRange int64, logger log.Logger) (string, error)
CreateBlock creates a chunkrange block from the samples passed to it, and writes it to disk.
func DeleteChunkSnapshots ¶
DeleteChunkSnapshots deletes all chunk snapshots in a directory below a given index.
func ExponentialBlockRanges ¶
ExponentialBlockRanges returns the time ranges based on the stepSize.
func LastChunkSnapshot ¶
LastChunkSnapshot returns the directory name and index of the most recent chunk snapshot. If dir does not contain any chunk snapshots, ErrNotFound is returned.
func MigrateWAL ¶
MigrateWAL rewrites the deprecated write ahead log into the new format.
func NewBlockChunkQuerier ¶
func NewBlockChunkQuerier(b BlockReader, mint, maxt int64) (storage.ChunkQuerier, error)
NewBlockChunkQuerier returns a chunk querier against the block reader and requested min and max time range.
func NewBlockQuerier ¶
func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error)
NewBlockQuerier returns a querier against the block reader and requested min and max time range.
func NewMergedStringIter ¶
func NewMergedStringIter(a, b index.StringIter) index.StringIter
NewMergedStringIter returns a string iterator that merges symbols on demand and streams the result.
func PostingsForMatchers ¶
PostingsForMatchers assembles a single postings iterator against the index reader based on the given matchers. The resulting postings are not ordered by series.
Types ¶
type Block ¶
type Block struct {
// contains filtered or unexported fields
}
Block represents a directory of time series data covering a continuous time range.
func OpenBlock ¶
OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used to instantiate chunk structs.
func (*Block) Chunks ¶
func (pb *Block) Chunks() (ChunkReader, error)
Chunks returns a new ChunkReader against the block data.
func (*Block) CleanTombstones ¶
CleanTombstones will remove the tombstones and rewrite the block (only if there are any tombstones). If there was a rewrite, then it returns the ULID of the new block written, else nil. If the resultant block is empty (tombstones covered the whole block), then it deletes the new block and returns a nil ULID. It returns a boolean indicating whether the parent block can be deleted safely or not.
func (*Block) Close ¶
Close closes the on-disk block. It blocks as long as there are readers reading from the block.
func (*Block) GetSymbolTableSize ¶
GetSymbolTableSize returns the Symbol Table Size in the index of this block.
func (*Block) Index ¶
func (pb *Block) Index() (IndexReader, error)
Index returns a new IndexReader against the block data.
func (*Block) LabelNames ¶
LabelNames returns all the unique label names present in the Block in sorted order.
func (*Block) OverlapsClosedInterval ¶
OverlapsClosedInterval returns true if the block overlaps [mint, maxt].
func (*Block) Tombstones ¶
func (pb *Block) Tombstones() (tombstones.Reader, error)
Tombstones returns a new TombstoneReader against the block data.
type BlockDesc ¶
type BlockDesc struct { ULID ulid.ULID `json:"ulid"` MinTime int64 `json:"minTime"` MaxTime int64 `json:"maxTime"` }
BlockDesc describes a block by ULID and time range.
type BlockMeta ¶
type BlockMeta struct { // Unique identifier for the block and its contents. Changes on compaction. ULID ulid.ULID `json:"ulid"` // MinTime and MaxTime specify the time range all samples // in the block are in. MinTime int64 `json:"minTime"` MaxTime int64 `json:"maxTime"` // Stats about the contents of the block. Stats BlockStats `json:"stats,omitempty"` // Information on compactions the block was created from. Compaction BlockMetaCompaction `json:"compaction"` // Version of the index format. Version int `json:"version"` }
BlockMeta provides meta information about a block.
type BlockMetaCompaction ¶
type BlockMetaCompaction struct { // Maximum number of compaction cycles any source block has // gone through. Level int `json:"level"` // ULIDs of all source head blocks that went into the block. Sources []ulid.ULID `json:"sources,omitempty"` // Indicates that during compaction it resulted in a block without any samples // so it should be deleted on the next reloadBlocks. Deletable bool `json:"deletable,omitempty"` // Short descriptions of the direct blocks that were used to create // this block. Parents []BlockDesc `json:"parents,omitempty"` Failed bool `json:"failed,omitempty"` }
BlockMetaCompaction holds information about compactions a block went through.
type BlockReader ¶
type BlockReader interface { // Index returns an IndexReader over the block's data. Index() (IndexReader, error) // Chunks returns a ChunkReader over the block's data. Chunks() (ChunkReader, error) // Tombstones returns a tombstones.Reader over the block's deleted data. Tombstones() (tombstones.Reader, error) // Meta provides meta information about the block reader. Meta() BlockMeta // Size returns the number of bytes that the block takes up on disk. Size() int64 }
BlockReader provides reading access to a data block.
type BlockStats ¶
type BlockStats struct { NumSamples uint64 `json:"numSamples,omitempty"` NumSeries uint64 `json:"numSeries,omitempty"` NumChunks uint64 `json:"numChunks,omitempty"` NumTombstones uint64 `json:"numTombstones,omitempty"` }
BlockStats contains stats about contents of a block.
type BlockWriter ¶
type BlockWriter struct {
// contains filtered or unexported fields
}
BlockWriter is a block writer that allows appending and flushing series to disk.
func NewBlockWriter ¶
NewBlockWriter creates a new block writer.
The returned writer accumulates all the series in the Head block until `Flush` is called.
Note that the writer will not check if the target directory exists or contains anything at all. It is the caller's responsibility to ensure that the resulting blocks do not overlap etc. Writer ensures the block flush is atomic (via rename).
func (*BlockWriter) Appender ¶
func (w *BlockWriter) Appender(ctx context.Context) storage.Appender
Appender returns a new appender on the database. Appender can't be called concurrently. However, the returned Appender can safely be used concurrently.
func (*BlockWriter) Close ¶
func (w *BlockWriter) Close() error
type BlocksToDeleteFunc ¶
func DefaultBlocksToDelete ¶
func DefaultBlocksToDelete(db *DB) BlocksToDeleteFunc
DefaultBlocksToDelete returns a filter which decides time based and size based retention from the options of the db.
type ChunkReader ¶
type ChunkReader interface { // Chunk returns the series data chunk with the given reference. Chunk(ref chunks.ChunkRef) (chunkenc.Chunk, error) // Close releases all underlying resources of the reader. Close() error }
ChunkReader provides reading access of serialized time series data.
type ChunkSnapshotStats ¶
ChunkSnapshotStats returns stats about a created chunk snapshot.
type ChunkWriter ¶
type ChunkWriter interface { // WriteChunks writes several chunks. The Chunk field of the ChunkMetas // must be populated. // After returning successfully, the Ref fields in the ChunkMetas // are set and can be used to retrieve the chunks from the written data. WriteChunks(chunks ...chunks.Meta) error // Close writes any required finalization and closes the resources // associated with the underlying writer. Close() error }
ChunkWriter serializes a time block of chunked series data.
type CircularExemplarStorage ¶
type CircularExemplarStorage struct {
// contains filtered or unexported fields
}
func (*CircularExemplarStorage) AddExemplar ¶
func (*CircularExemplarStorage) Appender ¶
func (ce *CircularExemplarStorage) Appender() *CircularExemplarStorage
func (*CircularExemplarStorage) ApplyConfig ¶
func (ce *CircularExemplarStorage) ApplyConfig(cfg *config.Config) error
func (*CircularExemplarStorage) ExemplarQuerier ¶
func (ce *CircularExemplarStorage) ExemplarQuerier(_ context.Context) (storage.ExemplarQuerier, error)
func (*CircularExemplarStorage) IterateExemplars ¶
func (ce *CircularExemplarStorage) IterateExemplars(f func(seriesLabels labels.Labels, e exemplar.Exemplar) error) error
IterateExemplars iterates through all the exemplars from oldest to newest appended and calls the given function on each of them, until the end or until the first function call that returns an error.
func (*CircularExemplarStorage) Querier ¶
func (ce *CircularExemplarStorage) Querier(_ context.Context) (storage.ExemplarQuerier, error)
func (*CircularExemplarStorage) Resize ¶
func (ce *CircularExemplarStorage) Resize(l int64) int
Resize changes the size of exemplar buffer by allocating a new buffer and migrating data to it. Exemplars are kept when possible. Shrinking will discard oldest data (in order of ingest) as needed.
func (*CircularExemplarStorage) Select ¶
func (ce *CircularExemplarStorage) Select(start, end int64, matchers ...[]*labels.Matcher) ([]exemplar.QueryResult, error)
Select returns exemplars for a given set of label matchers.
func (*CircularExemplarStorage) ValidateExemplar ¶
type Compactor ¶
type Compactor interface { // Plan returns a set of directories that can be compacted concurrently. // The directories can be overlapping. // Results returned when compactions are in progress are undefined. Plan(dir string) ([]string, error) // Write persists a Block into a directory. // No Block is written when resulting Block has 0 samples, and returns empty ulid.ULID{}. Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) // Compact runs compaction against the provided directories. Must // only be called concurrently with results of Plan(). // Can optionally pass a list of already open blocks, // to avoid having to reopen them. // When resulting Block has 0 samples // * No block is written. // * The source dirs are marked Deletable. // * Returns empty ulid.ULID{}. Compact(dest string, dirs []string, open []*Block) (ulid.ULID, error) }
Compactor provides compaction against an underlying storage of time series data.
type DB ¶
type DB struct {
// contains filtered or unexported fields
}
DB handles reads and writes of time series falling into a hashed partition of a series database.
func Open ¶
func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, stats *DBStats) (db *DB, err error)
Open returns a new DB in the given directory. If options are empty, DefaultOptions will be used.
func (*DB) ChunkQuerier ¶
ChunkQuerier returns a new chunk querier over the data partition for the given time range.
func (*DB) CleanTombstones ¶
CleanTombstones re-writes any blocks with tombstones.
func (*DB) Compact ¶
Compact data if possible. After successful compaction blocks are reloaded which will also delete the blocks that fall out of the retention window. Old blocks are only deleted on reloadBlocks based on the new block's parent information. See DB.reloadBlocks documentation for further information.
func (*DB) CompactHead ¶
CompactHead compacts the given RangeHead.
func (*DB) Delete ¶
Delete implements deletion of metrics. It only has atomicity guarantees on a per-block basis.
func (*DB) DisableCompactions ¶
func (db *DB) DisableCompactions()
DisableCompactions disables auto compactions.
func (*DB) EnableCompactions ¶
func (db *DB) EnableCompactions()
EnableCompactions enables auto compactions.
func (*DB) ExemplarQuerier ¶
func (*DB) Querier ¶
Querier returns a new querier over the data partition for the given time range.
func (*DB) Snapshot ¶
Snapshot writes the current data to the directory. If withHead is set to true it will create a new block containing all data that's currently in the memory buffer/WAL.
type DBReadOnly ¶
type DBReadOnly struct {
// contains filtered or unexported fields
}
DBReadOnly provides APIs for read only operations on a database. The current implementation doesn't support concurrency, so all API calls should happen in the same goroutine.
func OpenDBReadOnly ¶
func OpenDBReadOnly(dir string, l log.Logger) (*DBReadOnly, error)
OpenDBReadOnly opens DB in the given directory for read only operations.
func (*DBReadOnly) Blocks ¶
func (db *DBReadOnly) Blocks() ([]BlockReader, error)
Blocks returns a slice of block readers for persisted blocks.
func (*DBReadOnly) ChunkQuerier ¶
func (db *DBReadOnly) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error)
ChunkQuerier loads blocks and the wal and returns a new chunk querier over the data partition for the given time range. Current implementation doesn't support multiple ChunkQueriers.
func (*DBReadOnly) FlushWAL ¶
func (db *DBReadOnly) FlushWAL(dir string) (returnErr error)
FlushWAL creates a new block containing all data that's currently in the memory buffer/WAL. Samples that are in existing blocks will not be written to the new block. Note that if the read only database is running concurrently with a writable database then writing the WAL to the database directory can race.
type DBStats ¶
type DBStats struct {
Head *HeadStats
}
DBStats contains statistics about the DB separated by component (e.g. head). They are available before the DB has finished initializing.
func NewDBStats ¶
func NewDBStats() *DBStats
NewDBStats returns a new DBStats object initialized using the new function from each component.
type DeletedIterator ¶
type DeletedIterator struct { // Iter is an Iterator to be wrapped. Iter chunkenc.Iterator // Intervals are the deletion intervals. Intervals tombstones.Intervals }
DeletedIterator wraps chunk Iterator and makes sure any deleted metrics are not returned.
func (*DeletedIterator) At ¶
func (it *DeletedIterator) At() (int64, float64)
func (*DeletedIterator) Err ¶
func (it *DeletedIterator) Err() error
func (*DeletedIterator) Next ¶
func (it *DeletedIterator) Next() bool
func (*DeletedIterator) Seek ¶
func (it *DeletedIterator) Seek(t int64) bool
type ExemplarMetrics ¶
type ExemplarMetrics struct {
// contains filtered or unexported fields
}
func NewExemplarMetrics ¶
func NewExemplarMetrics(reg prometheus.Registerer) *ExemplarMetrics
type ExemplarStorage ¶
type ExemplarStorage interface { storage.ExemplarQueryable AddExemplar(labels.Labels, exemplar.Exemplar) error ValidateExemplar(labels.Labels, exemplar.Exemplar) error IterateExemplars(f func(seriesLabels labels.Labels, e exemplar.Exemplar) error) error }
func NewCircularExemplarStorage ¶
func NewCircularExemplarStorage(len int64, m *ExemplarMetrics) (ExemplarStorage, error)
NewCircularExemplarStorage creates a circular in-memory exemplar storage. If we assume the average case 95 bytes per exemplar we can fit 5651272 exemplars in 1GB of extra memory, accounting for the fact that this is heap allocated space. If len <= 0, then the exemplar storage is essentially a noop storage but can later be resized to store exemplars.
type Head ¶
type Head struct {
// contains filtered or unexported fields
}
Head handles reads and writes of time series data within a time window.
func NewHead ¶
func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOptions, stats *HeadStats) (*Head, error)
NewHead opens the head block in dir.
func (*Head) AppendableMinValidTime ¶
AppendableMinValidTime returns the minimum valid time for samples to be appended to the Head. Returns false if Head hasn't been initialized yet and the minimum time isn't known yet.
func (*Head) ChunkSnapshot ¶
func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error)
ChunkSnapshot creates a snapshot of all the series and tombstones in the head. It deletes the old chunk snapshots if the chunk snapshot creation is successful.
The chunk snapshot is stored in a directory named chunk_snapshot.N.M and is written using the WAL package. N is the last WAL segment present during snapshotting and M is the offset in segment N up to which data was written.
The snapshot first contains all series (each in individual records and not sorted), followed by tombstones (a single record), and finally exemplars (>= 1 record). Exemplars are in the order they were written to the circular buffer.
func (*Head) Chunks ¶
func (h *Head) Chunks() (ChunkReader, error)
Chunks returns a ChunkReader against the block.
func (*Head) Close ¶
Close flushes the WAL and closes the head. It also takes a snapshot of in-memory chunks if enabled.
func (*Head) Delete ¶
Delete all samples in the range of [mint, maxt] for series that satisfy the given label matchers.
func (*Head) ExemplarQuerier ¶
func (*Head) Index ¶
func (h *Head) Index() (IndexReader, error)
Index returns an IndexReader against the block.
func (*Head) Init ¶
Init loads data from the write ahead log and prepares the head for writes. It should be called before using an appender so that it limits the ingested samples to the head min valid time.
func (*Head) IsQuerierCollidingWithTruncation ¶
func (h *Head) IsQuerierCollidingWithTruncation(querierMint, querierMaxt int64) (shouldClose, getNew bool, newMint int64)
IsQuerierCollidingWithTruncation returns whether the current querier needs to be closed and whether a new querier has to be created. In the latter case, the method also returns the new mint to be used for creating the new range head and the new querier. This method helps prevent races with the truncation of in-memory data.
NOTE: The querier should already be taken before calling this.
func (*Head) Meta ¶
Meta returns meta information about the head. The head is dynamic so will return dynamic results.
func (*Head) OverlapsClosedInterval ¶
OverlapsClosedInterval returns true if the head overlaps [mint, maxt].
func (*Head) PostingsCardinalityStats ¶
func (h *Head) PostingsCardinalityStats(statsByLabelName string) *index.PostingsStats
PostingsCardinalityStats returns the top 10 highest cardinality stats by label and value names.
func (*Head) SetMinValidTime ¶
SetMinValidTime sets the minimum timestamp the head can ingest.
func (*Head) Stats ¶
Stats returns important current HEAD statistics. Note that it is expensive to calculate these.
func (*Head) String ¶
String returns a human-readable representation of the TSDB head. It's important to keep this function in order to avoid the struct dump when the head is stringified in errors or logs.
func (*Head) Tombstones ¶
func (h *Head) Tombstones() (tombstones.Reader, error)
Tombstones returns a new reader over the head's tombstones.
func (*Head) WaitForPendingReadersInTimeRange ¶
WaitForPendingReadersInTimeRange waits for queries overlapping with given range to finish querying. The query timeout limits the max wait time of this function implicitly. The mint is inclusive and maxt is the truncation time hence exclusive.
type HeadOptions ¶
type HeadOptions struct { // Runtime reloadable option. At the top of the struct for 32 bit OS: // https://pkg.go.dev/sync/atomic#pkg-note-BUG MaxExemplars atomic.Int64 ChunkRange int64 // ChunkDirRoot is the parent directory of the chunks directory. ChunkDirRoot string ChunkPool chunkenc.Pool ChunkWriteBufferSize int ChunkWriteQueueSize int // StripeSize sets the number of entries in the hash map, it must be a power of 2. // A larger StripeSize will allocate more memory up-front, but will increase performance when handling a large number of series. // A smaller StripeSize reduces the memory allocated, but can decrease performance with large number of series. StripeSize int SeriesCallback SeriesLifecycleCallback EnableExemplarStorage bool EnableMemorySnapshotOnShutdown bool IsolationDisabled bool }
HeadOptions are parameters for the Head block.
func DefaultHeadOptions ¶
func DefaultHeadOptions() *HeadOptions
type HeadStats ¶
type HeadStats struct {
WALReplayStatus *WALReplayStatus
}
HeadStats are the statistics for the head component of the DB.
type IndexReader ¶
type IndexReader interface { // Symbols return an iterator over sorted string symbols that may occur in // series' labels and indices. It is not safe to use the returned strings // beyond the lifetime of the index reader. Symbols() index.StringIter // SortedLabelValues returns sorted possible label values. SortedLabelValues(name string, matchers ...*labels.Matcher) ([]string, error) // LabelValues returns possible label values which may not be sorted. LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) // Postings returns the postings list iterator for the label pairs. // The Postings here contain the offsets to the series inside the index. // Found IDs are not strictly required to point to a valid Series, e.g. // during background garbage collections. Input values must be sorted. Postings(name string, values ...string) (index.Postings, error) // SortedPostings returns a postings list that is reordered to be sorted // by the label set of the underlying series. SortedPostings(index.Postings) index.Postings // Series populates the given labels and chunk metas for the series identified // by the reference. // Returns storage.ErrNotFound if the ref does not resolve to a known series. Series(ref storage.SeriesRef, lset *labels.Labels, chks *[]chunks.Meta) error // LabelNames returns all the unique label names present in the index in sorted order. LabelNames(matchers ...*labels.Matcher) ([]string, error) // LabelValueFor returns label value for the given label name in the series referred to by ID. // If the series couldn't be found or the series doesn't have the requested label a // storage.ErrNotFound is returned as error. LabelValueFor(id storage.SeriesRef, label string) (string, error) // LabelNamesFor returns all the label names for the series referred to by IDs. // The names returned are sorted. LabelNamesFor(ids ...storage.SeriesRef) ([]string, error) // Close releases the underlying resources of the reader. Close() error }
IndexReader provides reading access of serialized index data.
type IndexWriter ¶
type IndexWriter interface { // AddSymbol registers a single symbol. // Symbols must be registered in sorted order. AddSymbol(sym string) error // AddSeries populates the index writer with a series and its offsets // of chunks that the index can reference. // Implementations may require series to be inserted in strictly increasing order by // their labels. The reference numbers are used to resolve entries in postings lists // that are added later. AddSeries(ref storage.SeriesRef, l labels.Labels, chunks ...chunks.Meta) error // Close writes any finalization and closes the resources associated with // the underlying writer. Close() error }
IndexWriter serializes the index for a block of series data. The methods must be called in the order they are specified in.
type LeveledCompactor ¶
type LeveledCompactor struct {
// contains filtered or unexported fields
}
LeveledCompactor implements the Compactor interface.
func NewLeveledCompactor ¶
func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error)
NewLeveledCompactor returns a LeveledCompactor.
func NewLeveledCompactorWithChunkSize ¶
func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error)
func (*LeveledCompactor) Compact ¶
func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error)
Compact creates a new block in the compactor's directory from the blocks in the provided directories.
func (*LeveledCompactor) Plan ¶
func (c *LeveledCompactor) Plan(dir string) ([]string, error)
Plan returns a list of compactable blocks in the provided directory.
func (*LeveledCompactor) Write ¶
func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error)
type Options ¶
type Options struct { // Segments (wal files) max size. // WALSegmentSize = 0, segment size is default size. // WALSegmentSize > 0, segment size is WALSegmentSize. // WALSegmentSize < 0, wal is disabled. WALSegmentSize int // MaxBlockChunkSegmentSize is the max size of block chunk segment files. // MaxBlockChunkSegmentSize = 0, chunk segment size is default size. // MaxBlockChunkSegmentSize > 0, chunk segment size is MaxBlockChunkSegmentSize. MaxBlockChunkSegmentSize int64 // Duration of persisted data to keep. // Unit agnostic as long as unit is consistent with MinBlockDuration and MaxBlockDuration. // Typically it is in milliseconds. RetentionDuration int64 // Maximum number of bytes in blocks to be retained. // 0 or less means disabled. // NOTE: For proper storage calculations need to consider // the size of the WAL folder which is not added when calculating // the current size of the database. MaxBytes int64 // NoLockfile disables creation and consideration of a lock file. NoLockfile bool // Overlapping blocks are allowed if AllowOverlappingBlocks is true. // This in-turn enables vertical compaction and vertical query merge. AllowOverlappingBlocks bool // WALCompression will turn on Snappy compression for records on the WAL. WALCompression bool // StripeSize is the size in entries of the series hash map. Reducing the size will save memory but impact performance. StripeSize int // The timestamp range of head blocks after which they get persisted. // It's the minimum duration of any persisted block. // Unit agnostic as long as unit is consistent with RetentionDuration and MaxBlockDuration. // Typically it is in milliseconds. MinBlockDuration int64 // The maximum timestamp range of compacted blocks. // Unit agnostic as long as unit is consistent with MinBlockDuration and RetentionDuration. // Typically it is in milliseconds. MaxBlockDuration int64 // HeadChunksWriteBufferSize configures the write buffer size used by the head chunks mapper. 
HeadChunksWriteBufferSize int // HeadChunksWriteQueueSize configures the size of the chunk write queue used in the head chunks mapper. HeadChunksWriteQueueSize int // SeriesLifecycleCallback specifies a list of callbacks that will be called during a lifecycle of a series. // It is always a no-op in Prometheus and mainly meant for external users who import TSDB. SeriesLifecycleCallback SeriesLifecycleCallback // BlocksToDelete is a function which returns the blocks which can be deleted. // It is always the default time and size based retention in Prometheus and // mainly meant for external users who import TSDB. BlocksToDelete BlocksToDeleteFunc // Enables the in memory exemplar storage. EnableExemplarStorage bool // Enables the snapshot of in-memory chunks on shutdown. This makes restarts faster. EnableMemorySnapshotOnShutdown bool // MaxExemplars sets the size, in # of exemplars stored, of the single circular buffer used to store exemplars in memory. // See tsdb/exemplar.go, specifically the CircularExemplarStorage struct and its constructor NewCircularExemplarStorage. MaxExemplars int64 // Disables isolation between reads and in-flight appends. IsolationDisabled bool }
Options of the DB storage.
func DefaultOptions ¶
func DefaultOptions() *Options
DefaultOptions used for the DB. They are sane for setups using millisecond precision timestamps.
type Overlaps ¶
Overlaps contains overlapping blocks aggregated by overlapping range.
func OverlappingBlocks ¶
OverlappingBlocks returns all overlapping blocks from given meta files.
type RangeHead ¶
type RangeHead struct {
// contains filtered or unexported fields
}
RangeHead allows querying Head via an IndexReader, ChunkReader and tombstones.Reader but only within a restricted range. Used for queries and compactions.
func NewRangeHead ¶
NewRangeHead returns a *RangeHead. There are no restrictions on mint/maxt.
func (*RangeHead) BlockMaxTime ¶
BlockMaxTime returns the max time of the potential block created from this head. It differs from MaxTime because we need to add 1 millisecond to the block maxt: block intervals are half-open, [b.MinTime, b.MaxTime), so a block's max time is always 1 greater than the timestamp of the last sample it includes.
func (*RangeHead) Chunks ¶
func (h *RangeHead) Chunks() (ChunkReader, error)
func (*RangeHead) Index ¶
func (h *RangeHead) Index() (IndexReader, error)
func (*RangeHead) MaxTime ¶
MaxTime returns the max time of actual data fetch-able from the head. This controls the chunks time range which is closed [b.MinTime, b.MaxTime].
func (*RangeHead) String ¶
String returns a human-readable representation of the range head. It's important to keep this function in order to avoid the struct dump when the head is stringified in errors or logs.
func (*RangeHead) Tombstones ¶
func (h *RangeHead) Tombstones() (tombstones.Reader, error)
type SegmentWAL ¶
type SegmentWAL struct {
// contains filtered or unexported fields
}
SegmentWAL is a write ahead log for series data.
DEPRECATED: use wal pkg combined with the record coders instead.
func OpenSegmentWAL ¶
func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration, r prometheus.Registerer) (*SegmentWAL, error)
OpenSegmentWAL opens or creates a write ahead log in the given directory. The WAL must be read completely before new data is written.
func (*SegmentWAL) Close ¶
func (w *SegmentWAL) Close() error
Close syncs all data and closes the underlying resources.
func (*SegmentWAL) LogDeletes ¶
func (w *SegmentWAL) LogDeletes(stones []tombstones.Stone) error
LogDeletes writes a batch of new deletes to the log.
func (*SegmentWAL) LogSamples ¶
func (w *SegmentWAL) LogSamples(samples []record.RefSample) error
LogSamples writes a batch of new samples to the log.
func (*SegmentWAL) LogSeries ¶
func (w *SegmentWAL) LogSeries(series []record.RefSeries) error
LogSeries writes a batch of new series labels to the log. The series have to be ordered.
func (*SegmentWAL) Reader ¶
func (w *SegmentWAL) Reader() WALReader
Reader returns a new reader over the write ahead log data. It must be completely consumed before writing to the WAL.
func (*SegmentWAL) Truncate ¶
func (w *SegmentWAL) Truncate(mint int64, keep func(chunks.HeadSeriesRef) bool) error
Truncate deletes the values prior to mint and the series which the keep function does not indicate to preserve.
type SeriesLifecycleCallback ¶
type SeriesLifecycleCallback interface { // PreCreation is called before creating a series to indicate if the series can be created. // A non nil error means the series should not be created. PreCreation(labels.Labels) error // PostCreation is called after creating a series to indicate a creation of series. PostCreation(labels.Labels) // PostDeletion is called after deletion of series. PostDeletion(...labels.Labels) }
SeriesLifecycleCallback specifies a list of callbacks that will be called during a lifecycle of a series. It is always a no-op in Prometheus and mainly meant for external users who import TSDB. All the callbacks should be safe to be called concurrently. It is up to the user to implement soft or hard consistency by making the callbacks atomic or non-atomic. Atomic callbacks can cause performance degradation.
type Stats ¶
type Stats struct { NumSeries uint64 MinTime, MaxTime int64 IndexPostingStats *index.PostingsStats }
type TimeRange ¶
type TimeRange struct {
Min, Max int64
}
TimeRange specifies minTime and maxTime range.
type WAL ¶
type WAL interface { Reader() WALReader LogSeries([]record.RefSeries) error LogSamples([]record.RefSample) error LogDeletes([]tombstones.Stone) error Truncate(mint int64, keep func(uint64) bool) error Close() error }
WAL is a write ahead log that can log new series labels and samples. It must be completely read before new entries are logged.
DEPRECATED: use wal pkg combined with the record coders instead.
type WALEntryType ¶
type WALEntryType uint8
WALEntryType indicates what data a WAL entry contains.
const ( WALEntrySymbols WALEntryType = 1 WALEntrySeries WALEntryType = 2 WALEntrySamples WALEntryType = 3 WALEntryDeletes WALEntryType = 4 )
Entry types in a segment file.
type WALReader ¶
type WALReader interface { Read( seriesf func([]record.RefSeries), samplesf func([]record.RefSample), deletesf func([]tombstones.Stone), ) error }
WALReader reads entries from a WAL.
type WALReplayStatus ¶
WALReplayStatus contains status information about the WAL replay.
func (*WALReplayStatus) GetWALReplayStatus ¶
func (s *WALReplayStatus) GetWALReplayStatus() WALReplayStatus
GetWALReplayStatus returns the WAL replay status information.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package fileutil provides utility methods used when dealing with the filesystem in tsdb.
|
Package fileutil provides utility methods used when dealing with the filesystem in tsdb. |
Package goversion enforces the go version supported by the tsdb module.
|
Package goversion enforces the go version supported by the tsdb module. |
Package record contains the various record types used for encoding various Head block data in the WAL and in-memory snapshot.
|
Package record contains the various record types used for encoding various Head block data in the WAL and in-memory snapshot. |