Version: v1.11.2 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Jun 8, 2023 License: MIT Imports: 23 Imported by: 10



Package coordinator contains abstractions for writing points, executing statements, and accessing meta data.



View Source
const (
	// DefaultWriteTimeout is the default timeout for a complete write to succeed.
	DefaultWriteTimeout = 10 * time.Second

	// DefaultMaxConcurrentQueries is the maximum number of running queries.
	// A value of zero will make the maximum query limit unlimited.
	DefaultMaxConcurrentQueries = 0

	// DefaultMaxSelectPointN is the maximum number of points a SELECT can process.
	// A value of zero will make the maximum point count unlimited.
	DefaultMaxSelectPointN = 0

	// DefaultMaxSelectSeriesN is the maximum number of series a SELECT can run.
	// A value of zero will make the maximum series count unlimited.
	DefaultMaxSelectSeriesN = 0


View Source
var (
	// ErrTimeout is returned when a write times out.
	ErrTimeout = errors.New("timeout")

	// ErrPartialWrite is returned when a write partially succeeds but does
	// not meet the requested consistency level.
	ErrPartialWrite = errors.New("partial write")

	// ErrWriteFailed is returned when no writes succeeded.
	ErrWriteFailed = errors.New("write failed")
View Source
var ErrDatabaseNameRequired = errors.New("database name required")

ErrDatabaseNameRequired is returned when executing statements that require a database, when a database has not been provided.


This section is empty.


type BufferedPointsWriter added in v1.0.0

type BufferedPointsWriter struct {
	// contains filtered or unexported fields

BufferedPointsWriter adds buffering to a pointsWriter so that SELECT INTO queries write their points to the destination in batches.

func NewBufferedPointsWriter added in v1.0.0

func NewBufferedPointsWriter(w pointsWriter, database, retentionPolicy string, capacity int) *BufferedPointsWriter

NewBufferedPointsWriter returns a new BufferedPointsWriter.

func (*BufferedPointsWriter) Cap added in v1.0.0

func (w *BufferedPointsWriter) Cap() int

Cap returns the capacity (in points) of the buffer.

func (*BufferedPointsWriter) Flush added in v1.0.0

func (w *BufferedPointsWriter) Flush() error

Flush writes all buffered points to the underlying writer.

func (*BufferedPointsWriter) Len added in v1.0.0

func (w *BufferedPointsWriter) Len() int

Len returns the number of points buffered.

func (*BufferedPointsWriter) WritePointsInto added in v1.0.0

func (w *BufferedPointsWriter) WritePointsInto(req *IntoWriteRequest) error

WritePointsInto implements pointsWriter for BufferedPointsWriter.

type Config added in v1.0.0

type Config struct {
	WriteTimeout         toml.Duration `toml:"write-timeout"`
	MaxConcurrentQueries int           `toml:"max-concurrent-queries"`
	QueryTimeout         toml.Duration `toml:"query-timeout"`
	LogQueriesAfter      toml.Duration `toml:"log-queries-after"`
	LogTimedOutQueries   bool          `toml:"log-timedout-queries"`
	MaxSelectPointN      int           `toml:"max-select-point"`
	MaxSelectSeriesN     int           `toml:"max-select-series"`
	MaxSelectBucketsN    int           `toml:"max-select-buckets"`
	TerminationQueryLog  bool          `toml:"termination-query-log"`

Config represents the configuration for the coordinator service.

func NewConfig added in v1.0.0

func NewConfig() Config

NewConfig returns an instance of Config with defaults.

func (Config) Diagnostics added in v1.3.0

func (c Config) Diagnostics() (*diagnostics.Diagnostics, error)

Diagnostics returns a diagnostics representation of a subset of the Config.

type IntoWriteRequest added in v1.0.0

type IntoWriteRequest struct {
	Database        string
	RetentionPolicy string
	Points          []models.Point

IntoWriteRequest is a partial copy of cluster.WriteRequest

type IteratorCreator added in v1.2.0

type IteratorCreator interface {

IteratorCreator is an interface that combines mapping fields and creating iterators.

type LocalShardMapper added in v1.2.0

type LocalShardMapper struct {
	MetaClient interface {
		ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error)

	TSDBStore interface {
		ShardGroup(ids []uint64) tsdb.ShardGroup

LocalShardMapper implements a ShardMapper for local shards.

func (*LocalShardMapper) MapShards added in v1.2.0

MapShards maps the sources to the appropriate shards into an IteratorCreator.

type LocalShardMapping added in v1.2.0

type LocalShardMapping struct {
	ShardMap map[Source]tsdb.ShardGroup

	// MinTime is the minimum time that this shard mapper will allow.
	// Any attempt to use a time before this one will automatically result in using
	// this time instead.
	MinTime time.Time

	// MaxTime is the maximum time that this shard mapper will allow.
	// Any attempt to use a time after this one will automatically result in using
	// this time instead.
	MaxTime time.Time

ShardMapper maps data sources to a list of shard information.

func (*LocalShardMapping) Close added in v1.2.0

func (a *LocalShardMapping) Close() error

Close clears out the list of mapped shards.

func (*LocalShardMapping) CreateIterator added in v1.2.0

func (*LocalShardMapping) FieldDimensions added in v1.2.0

func (a *LocalShardMapping) FieldDimensions(m *influxql.Measurement) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error)

func (*LocalShardMapping) IteratorCost added in v1.4.0

func (*LocalShardMapping) MapType added in v1.2.0

type LocalTSDBStore added in v1.0.0

type LocalTSDBStore struct {

LocalTSDBStore embeds a tsdb.Store and implements IteratorCreator to satisfy the TSDBStore interface.

type MetaClient added in v1.0.0

type MetaClient interface {
	CreateContinuousQuery(database, name, query string) error
	CreateDatabase(name string) (*meta.DatabaseInfo, error)
	CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error)
	CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error)
	CreateSubscription(database, rp, name, mode string, destinations []string) error
	CreateUser(name, password string, admin bool) (meta.User, error)
	Database(name string) *meta.DatabaseInfo
	Databases() []meta.DatabaseInfo
	DropShard(id uint64) error
	DropContinuousQuery(database, name string) error
	DropDatabase(name string) error
	DropRetentionPolicy(database, name string) error
	DropSubscription(database, rp, name string) error
	DropUser(name string) error
	RetentionPolicy(database, name string) (rpi *meta.RetentionPolicyInfo, err error)
	SetAdminPrivilege(username string, admin bool) error
	SetPrivilege(username, database string, p influxql.Privilege) error
	ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error)
	TruncateShardGroups(t time.Time) error
	UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate, makeDefault bool) error
	UpdateUser(name, password string) error
	UserPrivilege(username, database string) (*influxql.Privilege, error)
	UserPrivileges(username string) (map[string]influxql.Privilege, error)
	Users() []meta.UserInfo

MetaClient is an interface for accessing meta data.

type PointsWriter added in v1.0.0

type PointsWriter struct {
	WriteTimeout time.Duration
	Logger       *zap.Logger

	Node *influxdb.Node

	MetaClient interface {
		Database(name string) (di *meta.DatabaseInfo)
		RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error)
		CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)

	TSDBStore interface {
		CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error
		WriteToShard(ctx tsdb.WriteContext, shardID uint64, points []models.Point) error

	Subscriber interface {
	// contains filtered or unexported fields

PointsWriter handles writes across multiple local and remote data nodes.

func NewPointsWriter added in v1.0.0

func NewPointsWriter() *PointsWriter

NewPointsWriter returns a new instance of PointsWriter for a node.

func (*PointsWriter) Close added in v1.0.0

func (w *PointsWriter) Close() error

Close closes the communication channel with the point writer.

func (*PointsWriter) MapShards added in v1.0.0

func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)

MapShards maps the points contained in wp to a ShardMapping. If a point maps to a shard group or shard that does not currently exist, it will be created before returning the mapping.

func (*PointsWriter) Open added in v1.0.0

func (w *PointsWriter) Open() error

Open opens the communication channel with the point writer.

func (*PointsWriter) Statistics added in v1.0.0

func (w *PointsWriter) Statistics(tags map[string]string) []models.Statistic

Statistics returns statistics for periodic monitoring.

func (*PointsWriter) WithLogger added in v1.2.0

func (w *PointsWriter) WithLogger(log *zap.Logger)

WithLogger sets the Logger on w.

func (*PointsWriter) WritePoints added in v1.0.0

func (w *PointsWriter) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error

A wrapper for WritePointsPrivileged() - user is only required for clustering

func (*PointsWriter) WritePointsInto added in v1.0.0

func (w *PointsWriter) WritePointsInto(p *IntoWriteRequest) error

WritePointsInto is a copy of WritePoints that uses a tsdb structure instead of a cluster structure for information. This is to avoid a circular dependency. It is used for 'SELECT INTO' statements

func (*PointsWriter) WritePointsPrivileged added in v1.3.0

func (w *PointsWriter) WritePointsPrivileged(writeCtx tsdb.WriteContext, database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error

type ShardIteratorCreator added in v1.0.0

type ShardIteratorCreator interface {
	ShardIteratorCreator(id uint64) query.IteratorCreator

ShardIteratorCreator is an interface for creating an IteratorCreator to access a specific shard.

type ShardMapping added in v1.0.0

type ShardMapping struct {
	Points  map[uint64][]models.Point  // The points associated with a shard ID
	Shards  map[uint64]*meta.ShardInfo // The shards that have been mapped, keyed by shard ID
	Dropped []models.Point             // Points that were dropped
	// contains filtered or unexported fields

ShardMapping contains a mapping of shards to points.

func NewShardMapping added in v1.0.0

func NewShardMapping(n int) *ShardMapping

NewShardMapping creates an empty ShardMapping.

func (*ShardMapping) MapPoint added in v1.0.0

func (s *ShardMapping) MapPoint(shardInfo *meta.ShardInfo, p models.Point)

MapPoint adds the point to the ShardMapping, associated with the given shardInfo.

type Source added in v1.2.0

type Source struct {
	Database        string
	RetentionPolicy string

Source contains the database and retention policy source for data.

type StatementExecutor added in v1.0.0

type StatementExecutor struct {
	MetaClient MetaClient

	// TaskManager holds the StatementExecutor that handles task-related commands.
	TaskManager query.StatementExecutor

	// TSDB storage for local node.
	TSDBStore TSDBStore

	// ShardMapper for mapping shards when executing a SELECT statement.
	ShardMapper query.ShardMapper

	// Holds monitoring data for SHOW STATS and SHOW DIAGNOSTICS.
	Monitor *monitor.Monitor

	// Used for rewriting points back into system for SELECT INTO statements.
	PointsWriter interface {
		WritePointsInto(*IntoWriteRequest) error

	// Disallow INF values in SELECT INTO and other previously ignored errors
	StrictErrorHandling bool

	// Select statement limits
	MaxSelectPointN   int
	MaxSelectSeriesN  int
	MaxSelectBucketsN int

StatementExecutor executes a statement in the query.

func (*StatementExecutor) ExecuteStatement added in v1.0.0

func (e *StatementExecutor) ExecuteStatement(ctx *query.ExecutionContext, stmt influxql.Statement) error

ExecuteStatement executes the given statement with the given execution context.

func (*StatementExecutor) NormalizeStatement added in v1.0.0

func (e *StatementExecutor) NormalizeStatement(stmt influxql.Statement, defaultDatabase, defaultRetentionPolicy string) (err error)

NormalizeStatement adds a default database and policy to the measurements in statement. Parameter defaultRetentionPolicy can be "".

type TSDBStore added in v1.0.0

type TSDBStore interface {
	CreateShard(database, policy string, shardID uint64, enabled bool) error
	WriteToShard(writeCtx tsdb.WriteContext, shardID uint64, points []models.Point) error

	RestoreShard(id uint64, r io.Reader) error
	BackupShard(id uint64, since time.Time, w io.Writer) error

	DeleteDatabase(name string) error
	DeleteMeasurement(database, name string) error
	DeleteRetentionPolicy(database, name string) error
	DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
	DeleteShard(id uint64) error

	MeasurementNames(ctx context.Context, auth query.FineAuthorizer, database string, retentionPolicy string, cond influxql.Expr) ([][]byte, error)
	TagKeys(ctx context.Context, auth query.FineAuthorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
	TagValues(ctx context.Context, auth query.FineAuthorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)

	SeriesCardinality(ctx context.Context, database string) (int64, error)
	MeasurementsCardinality(ctx context.Context, database string) (int64, error)

TSDBStore is an interface for accessing the time series data store.

type WritePointsRequest added in v1.0.0

type WritePointsRequest struct {
	Database        string
	RetentionPolicy string
	Points          []models.Point

WritePointsRequest represents a request to write point data to the cluster.

func (*WritePointsRequest) AddPoint added in v1.0.0

func (w *WritePointsRequest) AddPoint(name string, value interface{}, timestamp time.Time, tags map[string]string)

AddPoint adds a point to the WritePointRequest with field key 'value'

type WriteStatistics added in v1.0.0

type WriteStatistics struct {
	WriteReq           int64
	PointWriteReq      int64
	PointWriteReqLocal int64
	WriteOK            int64
	WriteDropped       int64
	WriteTimeout       int64
	WriteErr           int64
	SubWriteOK         int64

WriteStatistics keeps statistics related to the PointsWriter.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL