tsdb

package
v0.0.0-...-e951c9a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 6, 2022 License: AGPL-3.0 Imports: 54 Imported by: 0

Documentation

Overview

Copyright 2021 The Prometheus Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

View Source
const TenantLabel = "__loki_tenant__"

TenantLabel is part of the reserved label namespace (`__` prefix). It is used to create multi-tenant TSDBs (TSDBs themselves do not have a tenancy concept). These labels are stripped out during compaction to single-tenant TSDBs.

Variables

View Source
// Package-level pools shared by the index implementations in this package.
var (
	// ChunkMetasPool is a pointer to index.ChunkMetasPool, re-exported so it
	// can be reached through this package.
	ChunkMetasPool = &index.ChunkMetasPool // re-exporting
	// SeriesPool is a PoolSeries; its Get/Put methods hand out and take back
	// []Series slices.
	SeriesPool     PoolSeries
	// ChunkRefsPool is a PoolChunkRefs; its Get/Put methods hand out and take
	// back []ChunkRef slices.
	ChunkRefsPool  PoolChunkRefs
)
View Source
// NewStore returns the package-wide store, creating and initializing it on
// the first successful call. Later calls return the same instance and ignore
// their arguments.
//
// The returned values are the store (as an index.ReaderWriter), its Stop
// method, and an error. On error the first two values are nil.
//
// A nil backupIndexWriter is replaced with noopBackupIndexWriter{}.
// The p and f parameters are accepted for signature compatibility but are not
// read by this function.
//
// The package-level storeInstance is only assigned after init succeeds, so a
// failed initialization is never handed out by a subsequent call; the next
// call simply retries with its own arguments.
//
// NOTE(review): access to storeInstance is not synchronized here; this
// presumably relies on callers invoking NewStore sequentially — confirm.
var NewStore = func() newStoreFactoryFunc {
	return func(
		indexShipperCfg indexshipper.Config,
		p config.PeriodConfig,
		f *fetcher.Fetcher,
		objectClient client.ObjectClient,
		limits downloads.Limits,
		tableRanges config.TableRanges,
		backupIndexWriter index.Writer,
		reg prometheus.Registerer,
	) (
		index.ReaderWriter,
		func(),
		error,
	) {
		// Fast path: the singleton already exists.
		if storeInstance != nil {
			return storeInstance, storeInstance.Stop, nil
		}

		if backupIndexWriter == nil {
			backupIndexWriter = noopBackupIndexWriter{}
		}

		// Build into a local first so that a failing init does not leave a
		// half-initialized store behind in the package variable.
		s := &store{
			backupIndexWriter: backupIndexWriter,
		}
		if err := s.init(indexShipperCfg, objectClient, limits, tableRanges, reg); err != nil {
			return nil, nil, err
		}
		storeInstance = s

		return storeInstance, storeInstance.Stop, nil
	}
}()

NewStore creates a new store if not initialized already. Each call to NewStore will always build a new stores.ChunkWriter even if the store was already initialized since fetcher.Fetcher instances could be different due to periodic configs having different types of object storage configured for storing chunks. It also helps us make tsdb store a singleton because we do not need to build store for each schema config since we do not do any schema specific handling yet. If we do need to do schema specific handling, it would be a good idea to abstract away the handling since running multiple head managers would be complicated and wasteful.

Functions

func NewIndexCompactor

func NewIndexCompactor() compactor.IndexCompactor

func NewTSDBIndexFromFile

func NewTSDBIndexFromFile(location string) (*TSDBIndex, GetRawFileReaderFunc, error)

NewTSDBIndexFromFile returns the index as well as the underlying raw file reader, which isn't exposed as an index method but is helpful for building an io.Reader for the index shipper.

func OpenShippableTSDB

func OpenShippableTSDB(p string) (index_shipper.Index, error)

func Overlap

func Overlap(a, b Bounded) bool

func PostingsForMatchers

func PostingsForMatchers(ix IndexReader, shard *index.ShardAnnotation, ms ...*labels.Matcher) (index.Postings, error)

PostingsForMatchers assembles a single postings iterator against the index reader based on the given matchers. The resulting postings are not ordered by series.

func ResetStoreInstance

func ResetStoreInstance()

ResetStoreInstance must only be called in test cases where a new store instance cannot be explicitly created.

Types

type Bounded

type Bounded interface {
	Bounds() (model.Time, model.Time)
}

type Builder

type Builder struct {
	// contains filtered or unexported fields
}

Builder is a helper used to create tsdb indices. It can accept streams in any order and will create the tsdb index appropriately via `Build()`. It can even receive multiple writes for the same stream, with the caveat that chunks must be added in order and must not be duplicated.

func NewBuilder

func NewBuilder() *Builder

func (*Builder) AddSeries

func (b *Builder) AddSeries(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta)

func (*Builder) Build

func (b *Builder) Build(
	ctx context.Context,
	scratchDir string,

	createFn func(from, through model.Time, checksum uint32) Identifier,
) (id Identifier, err error)

func (*Builder) DropChunk

func (b *Builder) DropChunk(streamID string, chk index.ChunkMeta) (bool, error)

func (*Builder) FinalizeChunks

func (b *Builder) FinalizeChunks()

func (*Builder) InsertChunk

func (b *Builder) InsertChunk(streamID string, chk index.ChunkMeta) error

type ChunkMetasRecord

type ChunkMetasRecord struct {
	Chks index.ChunkMetas
	Ref  uint64
}

type ChunkRef

// ChunkRef references a single chunk by user, series fingerprint, time span
// and checksum. Its Less method orders values by (Start, End) and assumes
// User is the same on both sides.
type ChunkRef struct {
	// User is the owner of the chunk (presumably the tenant ID — confirm).
	User        string
	// Fingerprint is the fingerprint of the series the chunk belongs to.
	Fingerprint model.Fingerprint
	// Start and End bound the chunk in time; they are the sort keys used by Less.
	Start, End  model.Time
	// Checksum is a 32-bit checksum associated with the chunk.
	Checksum    uint32
}

func (ChunkRef) Less

func (r ChunkRef) Less(x ChunkRef) bool

Less compares by (Start, End). It assumes User is equivalent in both values.

type Compactor

type Compactor struct {
	// contains filtered or unexported fields
}

func NewCompactor

func NewCompactor(tenant, parentDir string) *Compactor

func (*Compactor) Compact

func (c *Compactor) Compact(ctx context.Context, indices ...*TSDBIndex) (res Identifier, err error)

type GetRawFileReaderFunc

type GetRawFileReaderFunc func() (io.ReadSeeker, error)

GetRawFileReaderFunc returns an io.ReadSeeker for reading raw tsdb file from disk

type Head struct {
	// contains filtered or unexported fields
}

func NewHead

func NewHead(tenant string, metrics *Metrics, logger log.Logger) *Head

func (*Head) Append

func (h *Head) Append(ls labels.Labels, chks index.ChunkMetas) (created bool, refID uint64)

Note: chks must not be nil or zero-length

func (*Head) Index

func (h *Head) Index() IndexReader

Index returns an IndexReader against the block.

func (*Head) MaxTime

func (h *Head) MaxTime() int64

MaxTime returns the highest timestamp seen in data of the head.

func (*Head) MinTime

func (h *Head) MinTime() int64

MinTime returns the lowest time bound on visible data in the head.

type HeadManager

type HeadManager struct {
	Index
	// contains filtered or unexported fields
}

func NewHeadManager

func NewHeadManager(logger log.Logger, dir string, metrics *Metrics, tsdbManager TSDBManager) *HeadManager

func (*HeadManager) Append

func (m *HeadManager) Append(userID string, ls labels.Labels, chks index.ChunkMetas) error

func (*HeadManager) Rotate

func (m *HeadManager) Rotate(t time.Time) (err error)

func (*HeadManager) Start

func (m *HeadManager) Start() error

func (*HeadManager) Stop

func (m *HeadManager) Stop() error

type Identifier

// Identifier can resolve an index to a name (in object storage) and a path
// (on disk).
type Identifier interface {
	// Name returns the name of the index in object storage.
	Name() string
	// Path returns the location of the index on disk.
	Path() string
}

Identifier can resolve an index to a name (in object storage) and a path (on disk)

type Index

// Index is the query interface over a tsdb index. Types in this package such
// as TSDBIndex, MultiIndex, MultiTenantIndex, LazyIndex and NoopIndex provide
// these methods.
type Index interface {
	Bounded
	// SetChunkFilterer installs the chunk filter to be used by the index.
	SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer)
	// Close releases the index.
	Close() error
	// GetChunkRefs accepts an optional []ChunkRef argument.
	// If not nil, it will use that slice to build the result,
	// allowing us to avoid unnecessary allocations at the caller's discretion.
	// If nil, the underlying index implementation is required
	// to build the resulting slice nonetheless (it should not panic),
	// ideally by requesting a slice from the pool.
	// Shard is also optional. If not nil, TSDB will limit the result to
	// the requested shard. If it is nil, TSDB will return all results,
	// regardless of shard.
	// Note: the number of shards (the `N` in `x_of_N`) must be a power of two,
	// meaning `0_of_2` and `3_of_4` are fine, but `0_of_3` is not.
	GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error)
	// Series follows the same semantics regarding the passed slice and shard as GetChunkRefs.
	Series(ctx context.Context, userID string, from, through model.Time, res []Series, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error)
	// LabelNames returns label names for the given user, time range and matchers.
	LabelNames(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]string, error)
	// LabelValues returns the values of the label `name` for the given user,
	// time range and matchers.
	LabelValues(ctx context.Context, userID string, from, through model.Time, name string, matchers ...*labels.Matcher) ([]string, error)
	// Stats feeds matching streams and chunks into acc. shard is optional as in
	// GetChunkRefs; shouldIncludeChunk is presumably a per-chunk predicate —
	// confirm against its definition.
	Stats(ctx context.Context, userID string, from, through model.Time, acc IndexStatsAccumulator, shard *index.ShardAnnotation, shouldIncludeChunk shouldIncludeChunk, matchers ...*labels.Matcher) error
}

type IndexClient

type IndexClient struct {
	// contains filtered or unexported fields
}

IndexClient implements stores.Index.

func NewIndexClient

func NewIndexClient(idx Index, opts IndexClientOptions) *IndexClient

func (*IndexClient) GetChunkRefs

func (c *IndexClient) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]logproto.ChunkRef, error)

TODO(owen-d): synchronize logproto.ChunkRef and tsdb.ChunkRef so we don't have to convert. They share almost the same fields, so we can add the missing `KB` field to the proto and then use that within the tsdb package.

func (*IndexClient) GetSeries

func (c *IndexClient) GetSeries(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error)

func (*IndexClient) LabelNamesForMetricName

func (c *IndexClient) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, _ string) ([]string, error)

tsdb no longer uses the __metric_name__="logs" hack, so we can ignore metric names!

func (*IndexClient) LabelValuesForMetricName

func (c *IndexClient) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, _ string, labelName string, matchers ...*labels.Matcher) ([]string, error)

tsdb no longer uses the __metric_name__="logs" hack, so we can ignore metric names!

func (*IndexClient) SetChunkFilterer

func (c *IndexClient) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer)

SetChunkFilterer sets a chunk filter to be used when retrieving chunks. This is only used for the GetSeries implementation. TODO: we might want to pass it as a parameter to GetSeries instead.

func (*IndexClient) Stats

func (c *IndexClient) Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error)

type IndexClientOptions

// IndexClientOptions configures an IndexClient; it is passed to NewIndexClient.
type IndexClientOptions struct {
	// UseBloomFilters controls whether bloom filters are used in the
	// Stats() method. They help probabilistically detect
	// duplicates when chunks are written to multiple
	// index buckets, which is of use in the (index-gateway|querier)
	// but not worth the memory costs in the ingesters.
	// NOTE(review): the previous wording said the filters "should be skipped",
	// which reads as the opposite of the field name — confirm the polarity.
	UseBloomFilters bool
}

func DefaultIndexClientOptions

func DefaultIndexClientOptions() IndexClientOptions

type IndexReader

// IndexReader provides reading access of serialized index data.
type IndexReader interface {
	// Bounds returns the timestamps of the earliest and latest samples in the index.
	Bounds() (int64, int64)

	// Checksum returns a 32-bit checksum for the index
	// (presumably computed over the index contents — confirm).
	Checksum() uint32

	// Symbols returns an iterator over sorted string symbols that may occur in
	// series' labels and indices. It is not safe to use the returned strings
	// beyond the lifetime of the index reader.
	Symbols() index.StringIter

	// SortedLabelValues returns sorted possible label values.
	SortedLabelValues(name string, matchers ...*labels.Matcher) ([]string, error)

	// LabelValues returns possible label values which may not be sorted.
	LabelValues(name string, matchers ...*labels.Matcher) ([]string, error)

	// Postings returns the postings list iterator for the label pairs.
	// The Postings here contain the offsets to the series inside the index.
	// Found IDs are not strictly required to point to a valid Series, e.g.
	// during background garbage collections. Input values must be sorted.
	Postings(name string, shard *index.ShardAnnotation, values ...string) (index.Postings, error)

	// Series populates the given labels and chunk metas for the series identified
	// by the reference.
	// Returns storage.ErrNotFound if the ref does not resolve to a known series.
	// NOTE(review): the meaning of the returned uint64 is not documented here
	// (possibly the series fingerprint) — confirm against an implementation.
	Series(ref storage.SeriesRef, lset *labels.Labels, chks *[]index.ChunkMeta) (uint64, error)

	// LabelNames returns all the unique label names present in the index in sorted order.
	LabelNames(matchers ...*labels.Matcher) ([]string, error)

	// LabelValueFor returns label value for the given label name in the series referred to by ID.
	// If the series couldn't be found or the series doesn't have the requested label a
	// storage.ErrNotFound is returned as error.
	LabelValueFor(id storage.SeriesRef, label string) (string, error)

	// LabelNamesFor returns all the label names for the series referred to by IDs.
	// The names returned are sorted.
	LabelNamesFor(ids ...storage.SeriesRef) ([]string, error)

	// Close releases the underlying resources of the reader.
	Close() error
}

IndexReader provides reading access of serialized index data.

type IndexStatsAccumulator

type IndexStatsAccumulator interface {
	AddStream(fp model.Fingerprint)
	AddChunk(fp model.Fingerprint, chk index.ChunkMeta)
	Stats() stats.Stats
}

type IndexWriter

type IndexWriter interface {
	Append(userID string, ls labels.Labels, chks tsdb_index.ChunkMetas) error
}

type LazyIndex

type LazyIndex func() (Index, error)

Index adapter for a function which returns an index when queried.

func (LazyIndex) Bounds

func (f LazyIndex) Bounds() (model.Time, model.Time)

func (LazyIndex) Close

func (f LazyIndex) Close() error

func (LazyIndex) GetChunkRefs

func (f LazyIndex) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error)

func (LazyIndex) LabelNames

func (f LazyIndex) LabelNames(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]string, error)

func (LazyIndex) LabelValues

func (f LazyIndex) LabelValues(ctx context.Context, userID string, from, through model.Time, name string, matchers ...*labels.Matcher) ([]string, error)

func (LazyIndex) Series

func (f LazyIndex) Series(ctx context.Context, userID string, from, through model.Time, res []Series, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error)

func (LazyIndex) SetChunkFilterer

func (f LazyIndex) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer)

func (LazyIndex) Stats

func (f LazyIndex) Stats(ctx context.Context, userID string, from, through model.Time, acc IndexStatsAccumulator, shard *index.ShardAnnotation, shouldIncludeChunk shouldIncludeChunk, matchers ...*labels.Matcher) error

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

TODO(owen-d)

func NewMetrics

func NewMetrics(r prometheus.Registerer) *Metrics

type MultiIndex

type MultiIndex struct {
	// contains filtered or unexported fields
}

func NewMultiIndex

func NewMultiIndex(indices ...Index) (*MultiIndex, error)

func (*MultiIndex) Bounds

func (i *MultiIndex) Bounds() (model.Time, model.Time)

func (*MultiIndex) Close

func (i *MultiIndex) Close() error

func (*MultiIndex) GetChunkRefs

func (i *MultiIndex) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error)

func (*MultiIndex) LabelNames

func (i *MultiIndex) LabelNames(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]string, error)

func (*MultiIndex) LabelValues

func (i *MultiIndex) LabelValues(ctx context.Context, userID string, from, through model.Time, name string, matchers ...*labels.Matcher) ([]string, error)

func (*MultiIndex) Series

func (i *MultiIndex) Series(ctx context.Context, userID string, from, through model.Time, res []Series, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error)

func (*MultiIndex) SetChunkFilterer

func (i *MultiIndex) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer)

func (*MultiIndex) Stats

func (i *MultiIndex) Stats(ctx context.Context, userID string, from, through model.Time, acc IndexStatsAccumulator, shard *index.ShardAnnotation, shouldIncludeChunk shouldIncludeChunk, matchers ...*labels.Matcher) error

type MultiTenantIndex

type MultiTenantIndex struct {
	// contains filtered or unexported fields
}

MultiTenantIndex will inject a tenant label into its queries. This works with pre-compacted TSDBs which aren't yet per tenant.

func NewMultiTenantIndex

func NewMultiTenantIndex(idx Index) *MultiTenantIndex

func (*MultiTenantIndex) Bounds

func (m *MultiTenantIndex) Bounds() (model.Time, model.Time)

func (*MultiTenantIndex) Close

func (m *MultiTenantIndex) Close() error

func (*MultiTenantIndex) GetChunkRefs

func (m *MultiTenantIndex) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error)

func (*MultiTenantIndex) LabelNames

func (m *MultiTenantIndex) LabelNames(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]string, error)

func (*MultiTenantIndex) LabelValues

func (m *MultiTenantIndex) LabelValues(ctx context.Context, userID string, from, through model.Time, name string, matchers ...*labels.Matcher) ([]string, error)

func (*MultiTenantIndex) Series

func (m *MultiTenantIndex) Series(ctx context.Context, userID string, from, through model.Time, res []Series, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error)

func (*MultiTenantIndex) SetChunkFilterer

func (m *MultiTenantIndex) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer)

func (*MultiTenantIndex) Stats

func (m *MultiTenantIndex) Stats(ctx context.Context, userID string, from, through model.Time, acc IndexStatsAccumulator, shard *index.ShardAnnotation, shouldIncludeChunk shouldIncludeChunk, matchers ...*labels.Matcher) error

type MultitenantTSDBIdentifier

type MultitenantTSDBIdentifier struct {
	// contains filtered or unexported fields
}

func (MultitenantTSDBIdentifier) Name

func (MultitenantTSDBIdentifier) Path

type NoopIndex

type NoopIndex struct{}

func (NoopIndex) Bounds

func (NoopIndex) Bounds() (from, through model.Time)

func (NoopIndex) Close

func (NoopIndex) Close() error

func (NoopIndex) GetChunkRefs

func (NoopIndex) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error)

func (NoopIndex) LabelNames

func (NoopIndex) LabelNames(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]string, error)

func (NoopIndex) LabelValues

func (NoopIndex) LabelValues(ctx context.Context, userID string, from, through model.Time, name string, matchers ...*labels.Matcher) ([]string, error)

func (NoopIndex) Series

func (NoopIndex) Series(ctx context.Context, userID string, from, through model.Time, res []Series, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error)

Series follows the same semantics regarding the passed slice and shard as GetChunkRefs.

func (NoopIndex) SetChunkFilterer

func (NoopIndex) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer)

func (NoopIndex) Stats

func (NoopIndex) Stats(ctx context.Context, userID string, from, through model.Time, acc IndexStatsAccumulator, shard *index.ShardAnnotation, shouldIncludeChunk shouldIncludeChunk, matchers ...*labels.Matcher) error

type PoolChunkRefs

type PoolChunkRefs struct {
	// contains filtered or unexported fields
}

func (*PoolChunkRefs) Get

func (p *PoolChunkRefs) Get() []ChunkRef

func (*PoolChunkRefs) Put

func (p *PoolChunkRefs) Put(xs []ChunkRef)

type PoolSeries

type PoolSeries struct {
	// contains filtered or unexported fields
}

func (*PoolSeries) Get

func (p *PoolSeries) Get() []Series

func (*PoolSeries) Put

func (p *PoolSeries) Put(xs []Series)

type RecordType

// RecordType is a byte-sized enumeration whose values are named after the
// kinds of WAL record (series and chunks).
type RecordType byte
const (
	// NOTE(review): the comment that previously sat here described a
	// `FirstWrite` record — a special record written once at the beginning of
	// every WAL, recording the system time when the WAL was created, used to
	// determine when to rotate WALs and persisting across restarts. No such
	// constant is declared in this block, so that description appears stale;
	// confirm against the WAL implementation.

	// WalRecordSeries is the first RecordType value (0).
	WalRecordSeries RecordType = iota
	// WalRecordChunks is the second RecordType value (1).
	WalRecordChunks
)

By prefixing records with versions, we can easily update our WAL schema.

type Series

type Series struct {
	Labels      labels.Labels
	Fingerprint model.Fingerprint
}

type SingleTenantTSDBIdentifier

type SingleTenantTSDBIdentifier struct {
	TS            time.Time
	From, Through model.Time
	Checksum      uint32
}

Identifier has all the information needed to resolve a TSDB index. Notably, this abstracts away OS path separators, etc.

func (SingleTenantTSDBIdentifier) Name

func (SingleTenantTSDBIdentifier) Path

type TSDBFile

type TSDBFile struct {
	// reuse Identifier for resolving locations
	Identifier

	// reuse TSDBIndex for reading
	Index
	// contains filtered or unexported fields
}

TSDBFile is backed by an actual file and implements the indexshipper/index.Index interface.

func NewShippableTSDBFile

func NewShippableTSDBFile(id Identifier) (*TSDBFile, error)

func (*TSDBFile) Close

func (f *TSDBFile) Close() error

func (*TSDBFile) Reader

func (f *TSDBFile) Reader() (io.ReadSeeker, error)

type TSDBIndex

type TSDBIndex struct {
	// contains filtered or unexported fields
}

TSDBIndex is backed by an IndexReader and translates the IndexReader to an Index implementation. It loads the file into memory and doesn't keep a file descriptor open.

func NewTSDBIndex

func NewTSDBIndex(reader IndexReader) *TSDBIndex

func (*TSDBIndex) Bounds

func (i *TSDBIndex) Bounds() (model.Time, model.Time)

func (*TSDBIndex) Checksum

func (i *TSDBIndex) Checksum() uint32

func (*TSDBIndex) Close

func (i *TSDBIndex) Close() error

func (*TSDBIndex) GetChunkRefs

func (i *TSDBIndex) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error)

func (*TSDBIndex) Identifier

func (*TSDBIndex) LabelNames

func (i *TSDBIndex) LabelNames(_ context.Context, _ string, _, _ model.Time, matchers ...*labels.Matcher) ([]string, error)

func (*TSDBIndex) LabelValues

func (i *TSDBIndex) LabelValues(_ context.Context, _ string, _, _ model.Time, name string, matchers ...*labels.Matcher) ([]string, error)

func (*TSDBIndex) Series

func (i *TSDBIndex) Series(ctx context.Context, _ string, from, through model.Time, res []Series, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error)

func (*TSDBIndex) SetChunkFilterer

func (i *TSDBIndex) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer)

func (*TSDBIndex) Stats

func (i *TSDBIndex) Stats(ctx context.Context, userID string, from, through model.Time, acc IndexStatsAccumulator, shard *index.ShardAnnotation, shouldIncludeChunk shouldIncludeChunk, matchers ...*labels.Matcher) error

type TSDBManager

// TSDBManager wraps the index shipper and writes/manages TSDB files on disk.
type TSDBManager interface {
	// Start starts the manager.
	Start() error
	// Builds a new TSDB file from a set of WALs
	BuildFromWALs(time.Time, []WALIdentifier) error
	// Builds a new TSDB file from tenantHeads
	BuildFromHead(*tenantHeads) error
}

TSDBManager wraps the index shipper and writes/manages TSDB files on disk.

func NewTSDBManager

func NewTSDBManager(
	nodeName,
	dir string,
	shipper indexshipper.IndexShipper,
	tableRanges config.TableRanges,
	logger log.Logger,
	metrics *Metrics,
) TSDBManager

type WAL

type WAL interface {
	Start(time.Time) error
	Log(*WALRecord) error
	Stop() error
}

type WALIdentifier

type WALIdentifier struct {
	// contains filtered or unexported fields
}

type WALRecord

type WALRecord struct {
	UserID string
	Series record.RefSeries
	Chks   ChunkMetasRecord
}

type WalGroup

type WalGroup struct {
	// contains filtered or unexported fields
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL