Documentation
¶
Index ¶
- Constants
- Variables
- func NewBoundedPool(initialCap, maxCap int, timeout time.Duration, factory Factory) (pool.Pool, error)
- type BufferedWriteCloser
- type Config
- type Factory
- type IntoWriteRequest
- type MetaExecutor
- func (m *MetaExecutor) BackupShard(id uint64, since time.Time, w io.Writer) error
- func (m *MetaExecutor) CreateShard(db, policy string, shardID uint64, enabled bool) error
- func (m *MetaExecutor) DeleteDatabase(stmt influxql.Statement) error
- func (m *MetaExecutor) DeleteMeasurement(stmt influxql.Statement) error
- func (m *MetaExecutor) DeleteRetentionPolicy(stmt influxql.Statement) error
- func (m *MetaExecutor) DeleteSeries(stmt influxql.Statement) error
- func (m *MetaExecutor) DeleteShard(stmt influxql.Statement) error
- func (m *MetaExecutor) ExecuteStatement(stmt influxql.Statement, database string) error
- func (m *MetaExecutor) IteratorCreator(opt influxql.IteratorOptions) influxql.IteratorCreator
- func (m *MetaExecutor) Measurements() []string
- func (m *MetaExecutor) RestoreShard(id uint64, r io.Reader) error
- func (m *MetaExecutor) TagValues(database string, cond influxql.Expr) ([]tsdb.TagValues, error)
- func (m *MetaExecutor) WriteToShard(shardID, ownerID uint64, points []models.Point) error
- type NodeDialer
- type PointsWriter
- func (w *PointsWriter) Close() error
- func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)
- func (w *PointsWriter) Open() error
- func (w *PointsWriter) WithLogger(log zap.Logger)
- func (w *PointsWriter) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, ...) error
- func (w *PointsWriter) WritePointsInto(p *coordinator.IntoWriteRequest) error
- type Service
- type ShardDeleter
- type ShardMapping
- type ShardWriter
- type StatementExecutor
- type Tracker
- type WritePointsRequest
- type WriteStatistics
Constants ¶
const ( // DefaultDialTimeout is the default timeout for a complete dial to succeed. DefaultDialTimeout = 1 * time.Second // DefaultShardWriterTimeout is the default timeout set on shard writers. DefaultShardWriterTimeout = 5 * time.Second // DefaultShardReaderTimeout is the default timeout set on shard writers. DefaultShardReaderTimeout = 5 * time.Second // DefaultMaxRemoteWriteConnections is the maximum number of open connections // that will be available for remote writes to another host. DefaultMaxRemoteWriteConnections = 3 // DefaultClusterTracing enables traceing cluster info if it is true DefaultClusterTracing = false // DefaultWriteTimeout is the default timeout for a complete write to succeed. DefaultWriteTimeout = 5 * time.Second // DefaultMaxConcurrentQueries is the maximum number of running queries. // A value of zero will make the maximum query limit unlimited. DefaultMaxConcurrentQueries = 0 // DefaultMaxSelectPointN is the maximum number of points a SELECT can process. // A value of zero will make the maximum point count unlimited. DefaultMaxSelectPointN = 0 // DefaultMaxSelectSeriesN is the maximum number of series a SELECT can run. // A value of zero will make the maximum series count unlimited. DefaultMaxSelectSeriesN = 0 // DefaultMaxSelectSeriesN is the maximum number of series a SELECT can run. // A value of zero will make the maximum series count unlimited. DefaultMaxSelectBucketsN = 0 )
const MaxMessageSize = 1024 * 1024 * 1024 // 1GB
MaxMessageSize defines how large a message can be before we reject it
const MuxHeader = 2
MuxHeader is the header byte used in the TCP mux.
Variables ¶
var ( // ErrTimeout is returned when a write times out. ErrTimeout = errors.New("timeout") // ErrPartialWrite is returned when a write partially succeeds but does // not meet the requested consistency level. ErrPartialWrite = errors.New("partial write") // ErrWriteFailed is returned when no writes succeeded. ErrWriteFailed = errors.New("write failed") )
Functions ¶
func NewBoundedPool ¶
func NewBoundedPool(initialCap, maxCap int, timeout time.Duration, factory Factory) (pool.Pool, error)
NewBoundedPool returns a new pool based on buffered channels with an initial capacity, maximum capacity and timeout to wait for a connection from the pool. Factory is used when initial capacity is greater than zero to fill the pool. A zero initialCap doesn't fill the Pool until a new Get() is called. During a Get(), If there is no new connection available in the pool and total connections is less than the max, a new connection will be created via the Factory() method. Othewise, the call will block until a connection is available or the timeout is reached.
Types ¶
type BufferedWriteCloser ¶
type BufferedWriteCloser struct {
}
BufferedWriteCloser will
func (*BufferedWriteCloser) Close ¶
func (bfc *BufferedWriteCloser) Close()
Close is actually closing this bufferedwriter
type Config ¶
type Config struct {
DialTimeout toml.Duration `toml:"dial-timeout"`
ShardWriterTimeout toml.Duration `toml:"shard-writer-timeout"`
ShardReaderTimeout toml.Duration `toml:"shard-reader-timeout"`
MaxRemoteWriteConnections int `toml:"max-remote-write-connections"`
ClusterTracing bool `toml:"cluster-tracing`
WriteTimeout toml.Duration `toml:"write-timeout"`
MaxConcurrentQueries int `toml:"max-concurrent-queries"`
QueryTimeout toml.Duration `toml:"query-timeout"`
LogQueriesAfter toml.Duration `toml:"log-queries-after"`
MaxSelectPointN int `toml:"max-select-point"`
MaxSelectSeriesN int `toml:"max-select-series"`
MaxSelectBucketsN int `toml:"max-select-buckets"`
}
Config represents the configuration for the clustering service.
type IntoWriteRequest ¶
IntoWriteRequest is a partial copy of cluster.WriteRequest
type MetaExecutor ¶
type MetaExecutor struct {
Logger zap.Logger
Node *influxdb.Node
MetaClient interface {
DataNode(id uint64) (ni *meta.NodeInfo, err error)
DataNodes() ([]meta.NodeInfo, error)
}
TSDBStore interface {
CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error
BackupShard(id uint64, since time.Time, w io.Writer) error
RestoreShard(id uint64, r io.Reader) error
Measurements(database string, cond influxql.Expr) ([]string, error)
TagValues(database string, cond influxql.Expr) ([]tsdb.TagValues, error)
}
ShardWriter interface {
WriteShard(shardID, ownerID uint64, points []models.Point) error
}
// contains filtered or unexported fields
}
MetaExecutor executes meta queries on all data nodes.
func NewMetaExecutor ¶
func NewMetaExecutor() *MetaExecutor
NewMetaExecutor returns a new initialized *MetaExecutor.
func (*MetaExecutor) BackupShard ¶
BackupShard backup a shard in cluster
func (*MetaExecutor) CreateShard ¶
func (m *MetaExecutor) CreateShard(db, policy string, shardID uint64, enabled bool) error
CreateShard will create Shard on serveral data nodes
func (*MetaExecutor) DeleteDatabase ¶
func (m *MetaExecutor) DeleteDatabase(stmt influxql.Statement) error
DeleteDatabase will remove a database from cluster
func (*MetaExecutor) DeleteMeasurement ¶
func (m *MetaExecutor) DeleteMeasurement(stmt influxql.Statement) error
DeleteMeasurement removes measurement from cluster
func (*MetaExecutor) DeleteRetentionPolicy ¶
func (m *MetaExecutor) DeleteRetentionPolicy(stmt influxql.Statement) error
DeleteRetentionPolicy removes RetentionPolicy from cluster
func (*MetaExecutor) DeleteSeries ¶
func (m *MetaExecutor) DeleteSeries(stmt influxql.Statement) error
DeleteSeries removes series data from cluster
func (*MetaExecutor) DeleteShard ¶
func (m *MetaExecutor) DeleteShard(stmt influxql.Statement) error
DeleteShard removes a Shard from cluster
func (*MetaExecutor) ExecuteStatement ¶
func (m *MetaExecutor) ExecuteStatement(stmt influxql.Statement, database string) error
ExecuteStatement executes a single InfluxQL statement on all nodes in the cluster concurrently.
func (*MetaExecutor) IteratorCreator ¶
func (m *MetaExecutor) IteratorCreator(opt influxql.IteratorOptions) influxql.IteratorCreator
IteratorCreator return a IteratorCreator according IteratorOptions
func (*MetaExecutor) Measurements ¶
func (m *MetaExecutor) Measurements() []string
Measurements return a all measurements in cluster
func (*MetaExecutor) RestoreShard ¶
func (m *MetaExecutor) RestoreShard(id uint64, r io.Reader) error
RestoreShard restore a shard in cluster
func (*MetaExecutor) WriteToShard ¶
func (m *MetaExecutor) WriteToShard(shardID, ownerID uint64, points []models.Point) error
WriteToShard will write points into shard accoridng to shardID and ownerID
type NodeDialer ¶
type PointsWriter ¶
type PointsWriter struct {
WriteTimeout time.Duration
Logger zap.Logger
Node *influxcloud.Node
MetaClient interface {
Database(name string) (di *meta.DatabaseInfo)
RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error)
CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
}
TSDBStore interface {
CreateShard(database, retentionPolicy string, shardID uint64) error
WriteToShard(shardID uint64, points []models.Point) error
}
ShardWriter interface {
WriteShard(shardID, ownerID uint64, points []models.Point) error
}
HintedHandoff interface {
WriteShard(shardID, ownerID uint64, points []models.Point) error
}
// contains filtered or unexported fields
}
PointsWriter handles writes across multiple local and remote data nodes.
func NewPointsWriter ¶
func NewPointsWriter() *PointsWriter
NewPointsWriter returns a new instance of PointsWriter for a node.
func (*PointsWriter) Close ¶
func (w *PointsWriter) Close() error
Close closes the communication channel with the point writer
func (*PointsWriter) MapShards ¶
func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)
MapShards maps the points contained in wp to a ShardMapping. If a point maps to a shard group or shard that does not currently exist, it will be created before returning the mapping.
func (*PointsWriter) Open ¶
func (w *PointsWriter) Open() error
Open opens the communication channel with the point writer
func (*PointsWriter) WithLogger ¶
func (w *PointsWriter) WithLogger(log zap.Logger)
WithLogger sets the Logger on w.
func (*PointsWriter) WritePoints ¶
func (w *PointsWriter) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
WritePoints writes across multiple local and remote data nodes according the consistency level.
func (*PointsWriter) WritePointsInto ¶
func (w *PointsWriter) WritePointsInto(p *coordinator.IntoWriteRequest) error
WritePointsInto is a copy of WritePoints that uses a tsdb structure instead of a cluster structure for information. This is to avoid a circular dependency
type Service ¶
type Service struct {
Listener net.Listener
MetaClient interface {
ShardOwner(shardID uint64) (string, string, meta.ShardInfo)
}
TSDBStore coordinator.TSDBStore
ShardIteratorCreator coordinator.ShardIteratorCreator
Logger zap.Logger
ShardWriter ShardWriter
// contains filtered or unexported fields
}
Service reprsents a cluster service
func (*Service) WithLogger ¶
WithLogger sets the internal logger to the logger passed in
type ShardDeleter ¶
type ShardDeleter struct {
TSDBStore interface {
ShardIDs() []uint64
DeleteShard(shardID uint64) error
}
}
ShardDeleter is a wrapper of TSDBStore which can delete shard from disk
func NewShardDeleter ¶
func NewShardDeleter() *ShardDeleter
NewShardDeleter will return a ShardDeleter instance
func (ShardDeleter) DeleteShard ¶
func (d ShardDeleter) DeleteShard(shardID uint64) error
DeleteShard will delete a shard according to shardID if failed, then return error
func (ShardDeleter) ShardIDs ¶
func (d ShardDeleter) ShardIDs() []uint64
ShardIDs will return all shards' ID in this node
type ShardMapping ¶
type ShardMapping struct {
Points map[uint64][]models.Point // The points associated with a shard ID
Shards map[uint64]*meta.ShardInfo // The shards that have been mapped, keyed by shard ID
// contains filtered or unexported fields
}
ShardMapping contains a mapping of a shards to a points.
func NewShardMapping ¶
func NewShardMapping(n int) *ShardMapping
NewShardMapping creates an empty ShardMapping
type ShardWriter ¶
type ShardWriter struct {
MetaClient interface {
ShardOwner(shardID uint64) (database, policy string, owners meta.ShardInfo)
DataNode(id uint64) (ni *meta.NodeInfo, err error)
}
// contains filtered or unexported fields
}
ShardWriter writes a set of points to a shard.
func NewShardWriter ¶
func NewShardWriter(timeout time.Duration, maxConnections int) *ShardWriter
NewShardWriter returns a new instance of ShardWriter.
func (*ShardWriter) WriteShard ¶
func (w *ShardWriter) WriteShard(shardID, ownerID uint64, points []models.Point) error
WriteShard writes time series points to a shard
func (*ShardWriter) WriteShardBinary ¶
func (w *ShardWriter) WriteShardBinary(shardID, ownerID uint64, buf []byte) error
WriteShardBinary writes binary time series points to a shard
type StatementExecutor ¶
type StatementExecutor struct {
MetaClient interface {
DataNodes() (ni meta.NodeInfos, err error)
}
// This reprsents local StatementExecutor
StatementExecutor coordinator.StatementExecutor
// contains filtered or unexported fields
}
StatementExecutor executes a statement in the query.
func (*StatementExecutor) ExecuteStatement ¶
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx influxql.ExecutionContext) error
ExecuteStatement executes the given statement with the given execution context.
type WritePointsRequest ¶
WritePointsRequest represents a request to write point data to the cluster.
type WriteStatistics ¶
type WriteStatistics struct {
WriteReq int64
PointWriteReq int64
PointWriteReqLocal int64
PointWriteReqRemote int64
PointWriteReqHH int64
WriteOK int64
WriteDropped int64
WriteTimeout int64
WritePartial int64
WriteErr int64
SubWriteOK int64
SubWriteDrop int64
}
WriteStatistics keeps statistics related to the PointsWriter.