Back to

Package storage

v2.0.0 (beta.5)
Latest Go to latest
Published: Feb 13, 2020 | License: MIT | Module:


Package Files


const (
	DefaultRetentionInterval       = time.Hour
	DefaultSeriesFileDirectoryName = "_series"
	DefaultIndexDirectoryName      = "index"
	DefaultWALDirectoryName        = "wal"
	DefaultEngineDirectoryName     = "data"
)

Default configuration values.


var ErrEngineClosed = errors.New("engine is closed")

ErrEngineClosed is returned when a caller attempts to use the engine while it's closed.

var ErrServiceClosed = errors.New("service is currently closed")

ErrServiceClosed is returned when the service is unavailable.

func RetentionPrometheusCollectors

func RetentionPrometheusCollectors() []prometheus.Collector

RetentionPrometheusCollectors returns all prometheus metrics for retention.

type BucketDeleter

type BucketDeleter interface {
	DeleteBucket(context.Context, platform.ID, platform.ID) error
}

BucketDeleter defines the behaviour of deleting a bucket.

type BucketFinder

type BucketFinder interface {
	FindBuckets(context.Context, influxdb.BucketFilter, ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error)
}

A BucketFinder is responsible for providing access to buckets via a filter.

type BucketService

type BucketService struct {
	// contains filtered or unexported fields
}

BucketService wraps an existing platform.BucketService implementation.

BucketService ensures that when a bucket is deleted, all stored data associated with the bucket is either removed, or marked to be removed via a future compaction.

func NewBucketService

func NewBucketService(s platform.BucketService, engine BucketDeleter) *BucketService

NewBucketService returns a new BucketService for the provided BucketDeleter, which typically will be an Engine.

func (*BucketService) CreateBucket

func (s *BucketService) CreateBucket(ctx context.Context, b *platform.Bucket) error

CreateBucket creates a new bucket and sets b.ID with the new identifier.

func (*BucketService) DeleteBucket

func (s *BucketService) DeleteBucket(ctx context.Context, bucketID platform.ID) error

DeleteBucket removes a bucket by ID.

func (*BucketService) FindBucket

func (s *BucketService) FindBucket(ctx context.Context, filter platform.BucketFilter) (*platform.Bucket, error)

FindBucket returns the first bucket that matches filter.

func (*BucketService) FindBucketByID

func (s *BucketService) FindBucketByID(ctx context.Context, id platform.ID) (*platform.Bucket, error)

FindBucketByID returns a single bucket by ID.

func (*BucketService) FindBucketByName

func (s *BucketService) FindBucketByName(ctx context.Context, orgID platform.ID, name string) (*platform.Bucket, error)

FindBucketByName returns a single bucket by name.

func (*BucketService) FindBuckets

func (s *BucketService) FindBuckets(ctx context.Context, filter platform.BucketFilter, opt ...platform.FindOptions) ([]*platform.Bucket, int, error)

FindBuckets returns a list of buckets that match filter and the total count of matching buckets. Additional options provide pagination & sorting.

func (*BucketService) UpdateBucket

func (s *BucketService) UpdateBucket(ctx context.Context, id platform.ID, upd platform.BucketUpdate) (*platform.Bucket, error)

UpdateBucket updates a single bucket with changeset. Returns the new bucket state after update.

type BufferedPointsWriter

type BufferedPointsWriter struct {
	// contains filtered or unexported fields
}

func NewBufferedPointsWriter

func NewBufferedPointsWriter(size int, pointswriter PointsWriter) *BufferedPointsWriter

func (*BufferedPointsWriter) Available

func (b *BufferedPointsWriter) Available() int

Available returns how many models.Points are unused in the buffer.

func (*BufferedPointsWriter) Buffered

func (b *BufferedPointsWriter) Buffered() int

Buffered returns the number of models.Points that have been written into the current buffer.

func (*BufferedPointsWriter) Flush

func (b *BufferedPointsWriter) Flush(ctx context.Context) error

Flush writes any buffered data to the underlying PointsWriter.

func (*BufferedPointsWriter) WritePoints

func (b *BufferedPointsWriter) WritePoints(ctx context.Context, p []models.Point) error

WritePoints writes the points to the underlying PointsWriter.

type Config

type Config struct {
	// Frequency of retention enforcement, expressed as a duration.
	RetentionInterval toml.Duration `toml:"retention-interval"`

	// Series file config.
	SeriesFilePath string `toml:"series-file-path"` // Overrides the default path.

	// TSDB config.
	TSDB tsdb.Config `toml:"tsdb"`

	// WAL config.
	WAL     tsm1.WALConfig `toml:"wal"`
	WALPath string         `toml:"wal-path"` // Overrides the default path.

	// Engine config.
	Engine     tsm1.Config `toml:"engine"`
	EnginePath string      `toml:"engine-path"` // Overrides the default path.

	// Index config.
	Index     tsi1.Config `toml:"index"`
	IndexPath string      `toml:"index-path"` // Overrides the default path.
}

Config holds the configuration for an Engine.

func NewConfig

func NewConfig() Config

NewConfig initialises a new config for an Engine.

func (Config) GetEnginePath

func (c Config) GetEnginePath(base string) string

GetEnginePath returns the path to the engine.

func (Config) GetIndexPath

func (c Config) GetIndexPath(base string) string

GetIndexPath returns the path to the index.

func (Config) GetSeriesFilePath

func (c Config) GetSeriesFilePath(base string) string

GetSeriesFilePath returns the path to the series file.

func (Config) GetWALPath

func (c Config) GetWALPath(base string) string

GetWALPath returns the path to the WAL.

type Deleter

type Deleter interface {
	DeleteBucketRange(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64) error
}

A Deleter implementation is capable of deleting data from a storage engine.

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

func NewEngine

func NewEngine(path string, c Config, options ...Option) *Engine

NewEngine initialises a new storage engine, including a series file, index and TSM engine.

func (*Engine) AcquireSegments

func (e *Engine) AcquireSegments(ctx context.Context, fn func(segs []string) error) error

AcquireSegments closes the current WAL segment, gets the set of all the currently closed segments, and calls the callback. It does all of this under the lock on the engine.

func (*Engine) ApplyFnToSeriesIDSet

func (e *Engine) ApplyFnToSeriesIDSet(fn func(*tsdb.SeriesIDSet))

ApplyFnToSeriesIDSet allows the caller to apply fn to the SeriesIDSet held within the engine's index.

func (*Engine) Close

func (e *Engine) Close() error

Close closes the engine and all underlying resources. It returns an error if any of the underlying systems fail to close.

func (*Engine) CommitSegments

func (e *Engine) CommitSegments(ctx context.Context, segs []string, fn func() error) error

CommitSegments calls the callback and if that does not return an error, removes the segment files from the WAL. It does all of this under the lock on the engine.

func (*Engine) CreateBackup

func (e *Engine) CreateBackup(ctx context.Context) (int, []string, error)

CreateBackup creates a "snapshot" of all TSM data in the Engine.

1) Snapshot the cache to ensure the backup includes all data written before now.
2) Create hard links to all TSM files, in a new directory within the engine root directory.
3) Return a unique backup ID (invalid after the process terminates) and list of files.

func (*Engine) CreateCursorIterator

func (e *Engine) CreateCursorIterator(ctx context.Context) (tsdb.CursorIterator, error)

CreateCursorIterator creates a CursorIterator for usage with the read service.

func (*Engine) CreateSeriesCursor

func (e *Engine) CreateSeriesCursor(ctx context.Context, req SeriesCursorRequest, cond influxql.Expr) (SeriesCursor, error)

CreateSeriesCursor creates a SeriesCursor for usage with the read service.

func (*Engine) DeleteBucket

func (e *Engine) DeleteBucket(ctx context.Context, orgID, bucketID platform.ID) error

DeleteBucket deletes an entire bucket from the storage engine.

func (*Engine) DeleteBucketRange

func (e *Engine) DeleteBucketRange(ctx context.Context, orgID, bucketID platform.ID, min, max int64) error

DeleteBucketRange deletes data within a bucket from the storage engine. Any data deleted must be in [min, max].

func (*Engine) DeleteBucketRangePredicate

func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID platform.ID, min, max int64, pred platform.Predicate) error

DeleteBucketRangePredicate deletes data within a bucket from the storage engine. Any data deleted must be in [min, max], and the key must match the predicate if provided.

func (*Engine) FetchBackupFile

func (e *Engine) FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error

FetchBackupFile writes a given backup file to the provided writer. After a successful write, the internal copy is removed.

func (*Engine) InternalBackupPath

func (e *Engine) InternalBackupPath(backupID int) string

InternalBackupPath provides the internal, full path directory name of the backup. This should not be exposed via API.

func (*Engine) MeasurementCardinalityStats

func (e *Engine) MeasurementCardinalityStats() (tsi1.MeasurementCardinalityStats, error)

MeasurementCardinalityStats returns cardinality stats for all measurements.

func (*Engine) MeasurementStats

func (e *Engine) MeasurementStats() (tsm1.MeasurementStats, error)

MeasurementStats returns the current measurement stats for the engine.

func (*Engine) Open

func (e *Engine) Open(ctx context.Context) (err error)

Open opens the engine and all underlying resources. It returns an error if any of the underlying systems fail to open.

func (*Engine) Path

func (e *Engine) Path() string

Path returns the path of the engine's base directory.

func (*Engine) PrometheusCollectors

func (e *Engine) PrometheusCollectors() []prometheus.Collector

PrometheusCollectors returns all the prometheus collectors associated with the engine and its components.

func (*Engine) SeriesCardinality

func (e *Engine) SeriesCardinality() int64

SeriesCardinality returns the number of series in the engine.

func (*Engine) TagKeys

func (e *Engine) TagKeys(ctx context.Context, orgID, bucketID influxdb.ID, start, end int64, predicate influxql.Expr) (cursors.StringIterator, error)

TagKeys returns an iterator where the values are tag keys for the bucket matching the predicate within the time range (start, end].

TagKeys will always return a StringIterator if there is no error.

func (*Engine) TagValues

func (e *Engine) TagValues(ctx context.Context, orgID, bucketID influxdb.ID, tagKey string, start, end int64, predicate influxql.Expr) (cursors.StringIterator, error)

TagValues returns an iterator which enumerates the values for the specific tagKey in the given bucket matching the predicate within the time range (start, end].

TagValues will always return a StringIterator if there is no error.

func (*Engine) WithLogger

func (e *Engine) WithLogger(log *zap.Logger)

WithLogger sets the logger on the Engine. It must be called before Open.

func (*Engine) WritePoints

func (e *Engine) WritePoints(ctx context.Context, points []models.Point) error

WritePoints writes the provided points to the engine.

The Engine expects all points to have been correctly validated by the caller. However, WritePoints will determine if any tag key-pairs are missing, or if there are any field type conflicts.

Appropriate errors are returned in those cases.

type Option

type Option func(*Engine)

Option is a function that configures an Engine. Options are passed to NewEngine.

func WithCompactionLimiter

func WithCompactionLimiter(limiter limiter.Fixed) Option

WithCompactionLimiter allows the caller to set the limiter that a storage engine uses. A typical use-case for this would be if multiple engines should share the same limiter.

func WithCompactionPlanner

func WithCompactionPlanner(planner tsm1.CompactionPlanner) Option

WithCompactionPlanner makes the engine have the provided compaction planner.

func WithCompactionSemaphore

func WithCompactionSemaphore(s influxdb.Semaphore) Option

WithCompactionSemaphore sets the semaphore used to coordinate full compactions across multiple storage engines.

func WithCurrentGenerationFunc

func WithCurrentGenerationFunc(fn func() int) Option

WithCurrentGenerationFunc sets a function for obtaining the current generation.

func WithEngineID

func WithEngineID(id int) Option

WithEngineID sets an engine id, which can be useful for logging when multiple engines are in use.

func WithFileStoreObserver

func WithFileStoreObserver(obs tsm1.FileStoreObserver) Option

WithFileStoreObserver makes the engine have the provided file store observer.

func WithNodeID

func WithNodeID(id int) Option

WithNodeID sets a node id on the engine, which can be useful for logging when a system has engines running on multiple nodes.

func WithRetentionEnforcer

func WithRetentionEnforcer(finder BucketFinder) Option

WithRetentionEnforcer initialises a retention enforcer on the engine. WithRetentionEnforcer must be called after other options to ensure that all metrics are labelled correctly.

func WithRetentionEnforcerLimiter

func WithRetentionEnforcerLimiter(f runnable) Option

WithRetentionEnforcerLimiter sets a limiter used to control when the retention enforcer can proceed. If this option is not used then the default limiter (or the absence of one) is a no-op, and no limitations will be put on running the retention enforcer.

func WithTSMFilenameFormatter

func WithTSMFilenameFormatter(fn tsm1.FormatFileNameFunc) Option

WithTSMFilenameFormatter sets a function on the underlying tsm1.Engine to specify how TSM files are named.

type PointsWriter

type PointsWriter interface {
	WritePoints(context.Context, []models.Point) error
}

PointsWriter describes the ability to write points into a storage engine.

type SeriesCursor

type SeriesCursor interface {
	Close() error
	Next() (*SeriesCursorRow, error)
}

type SeriesCursorRequest

type SeriesCursorRequest struct {
	// Name contains the tsdb encoded org and bucket ID
	Name [influxdb.IDLength]byte
}

type SeriesCursorRow

type SeriesCursorRow struct {
	Name []byte
	Tags models.Tags
}

type Snapshotter

type Snapshotter interface {
	WriteSnapshot(ctx context.Context, status tsm1.CacheStatus) error
}

A Snapshotter implementation can take snapshots of the entire engine.

Documentation was rendered with GOOS=linux and GOARCH=amd64.

Jump to identifier

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to identifier