Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrFamilyChannelCanceled is the error returned when a family channel is closed. ErrFamilyChannelCanceled = errors.New("family Channel is canceled") ErrIngestTimeout = errors.New("ingest timout") )
var (
NewPartitionFn = NewPartition
)
NewPartitionFn is a variable set to NewPartition; it exists for testing.
Functions ¶
This section is empty.
Types ¶
type ChannelManager ¶
type ChannelManager interface { // Write writes a MetricList, the manager handler the database, sharding things. Write(ctx context.Context, database string, brokerBatchRows *metric.BrokerBatchRows) error // Close closes all the shardChannel. Close() }
ChannelManager manages the construction, retrieval, and closing of all channels.
func NewChannelManager ¶
func NewChannelManager( ctx context.Context, fct rpc.ClientStreamFactory, stateMgr broker.StateManager, ) ChannelManager
NewChannelManager returns a ChannelManager built from the given context, rpc.ClientStreamFactory, and broker.StateManager. The ClientStreamFactory makes it easy to mock the rpc stream client in tests.
type Chunk ¶
type Chunk interface { // Compress marshals and compresses the data, then resets the context, Compress() (*compressedChunk, error) // IsFull checks the chunk if is full IsFull() bool // IsEmpty checks the chunk if is empty IsEmpty() bool // Size returns the size of chunk Size() ltoml.Size // Write writes the metric into buffer Write([]byte) (n int, err error) }
Chunk represents the writeTask buffer chunk used for compressing the metric list.
type DatabaseChannel ¶
type DatabaseChannel interface { // Write writes the metric data into shardChannel's buffer Write(ctx context.Context, brokerBatchRows *metric.BrokerBatchRows) error // CreateChannel creates the shard level replication shardChannel by given shard id CreateChannel(numOfShard int32, shardID models.ShardID) (ShardChannel, error) // Stop stops current database write shardChannel. Stop() // contains filtered or unexported methods }
DatabaseChannel represents the database level replication shardChannel.
type FamilyChannel ¶
type FamilyChannel interface { // Write writes the data into the shardChannel, // ErrCanceled is returned when the shardChannel is canceled before data is written successfully. // Concurrent safe. Write(ctx context.Context, rows []metric.BrokerRow) error // Stop stops the family shardChannel. Stop(timeout int64) // FamilyTime returns the family time of current shardChannel. FamilyTime() int64 // contains filtered or unexported methods }
FamilyChannel represents family write shardChannel.
type Partition ¶
type Partition interface { io.Closer // BuildReplicaForLeader builds replica relation when handle writeTask connection. BuildReplicaForLeader(leader models.NodeID, replicas []models.NodeID) error // BuildReplicaForFollower builds replica relation when handle replica connection. BuildReplicaForFollower(leader models.NodeID, replica models.NodeID) error // ReplicaLog writes msg that leader sends replica msg. // return appended index, if success. ReplicaLog(replicaIdx int64, msg []byte) (int64, error) // WriteLog writes msg that leader handle client writeTask request. WriteLog(msg []byte) error // ReplicaAckIndex returns the index which replica appended index. ReplicaAckIndex() int64 // ResetReplicaIndex resets replica index. ResetReplicaIndex(idx int64) // IsExpire returns partition if it is expired. IsExpire() bool // Path returns the path of partition. Path() string // Stop stops replicator channel. Stop() // contains filtered or unexported methods }
Partition represents a partition of the write ahead log.
func NewPartition ¶
func NewPartition( ctx context.Context, shard tsdb.Shard, family tsdb.DataFamily, currentNodeID models.NodeID, log queue.FanOutQueue, cliFct rpc.ClientStreamFactory, stateMgr storage.StateManager, ) Partition
NewPartition creates a write ahead log partition (db+shard+family time+leader).
type Replicator ¶
type Replicator interface { fmt.Stringer // ReplicaState returns the replica state. ReplicaState() *models.ReplicaState // State returns the state of replicator. State() *state // Pause paused replica data. Pause() // Consume returns the index of message replica. Consume() int64 // GetMessage returns message by replica index. GetMessage(replicaIdx int64) ([]byte, error) // Replica replicas message by replica index. Replica(idx int64, msg []byte) // IsReady returns if replicator is ready. IsReady() bool // Connect connects follower for sending replica message. Connect() bool // ReplicaIndex returns the index of message replica ReplicaIndex() int64 // AckIndex returns the index of message replica ack AckIndex() int64 // AppendIndex returns next append index. AppendIndex() int64 // ResetReplicaIndex resets replica index. ResetReplicaIndex(idx int64) // ResetAppendIndex resets append index. ResetAppendIndex(idx int64) // SetAckIndex sets ack index. SetAckIndex(ackIdx int64) // Pending returns lag of queue. Pending() int64 // IgnoreMessage ignores invalid message. IgnoreMessage(replicaIdx int64) // Close closes replicator, releases resource. Close() }
Replicator represents write ahead log replicator.
func NewLocalReplicator ¶
func NewLocalReplicator(channel *ReplicatorChannel, shard tsdb.Shard, family tsdb.DataFamily) Replicator
func NewRemoteReplicator ¶
func NewRemoteReplicator( ctx context.Context, channel *ReplicatorChannel, stateMgr storage.StateManager, cliFct rpc.ClientStreamFactory, ) Replicator
NewRemoteReplicator creates remote replicator.
type ReplicatorChannel ¶
type ReplicatorChannel struct { State *models.ReplicaState // underlying ConsumerGroup records the replication process. ConsumerGroup queue.ConsumerGroup }
ReplicatorChannel represents the channel peer [from, to] for a shard of a database.
type ReplicatorPeer ¶
type ReplicatorPeer interface { // Startup starts wal replicator channel, Startup() // Shutdown shutdowns gracefully. Shutdown() // ReplicatorState returns the state and type of the replicator. ReplicatorState() (string, *state) }
ReplicatorPeer represents a WAL replica peer. Local replicator: from == to. Remote replicator: from != to.
func NewReplicatorPeer ¶
func NewReplicatorPeer(replicator Replicator) ReplicatorPeer
NewReplicatorPeer creates a ReplicatorPeer.
type ShardChannel ¶
type ShardChannel interface { // SyncShardState syncs shard state after state event changed. SyncShardState(shardState models.ShardState, liveNodes map[models.NodeID]models.StatefulNode) // GetOrCreateFamilyChannel musts picks the family shardChannel by given family time. GetOrCreateFamilyChannel(familyTime int64) FamilyChannel // Stop stops shard shardChannel. Stop() // contains filtered or unexported methods }
ShardChannel represents a place to buffer the data for a specific cluster, database, shardID.
type WriteAheadLog ¶
type WriteAheadLog interface { io.Closer // Name returns the name of write ahead log. Name() string // GetOrCreatePartition returns a partition of write ahead log. // if exist returns it, else create a new partition. GetOrCreatePartition(shardID models.ShardID, familyTime int64, leader models.NodeID) (Partition, error) // Stop stops all replicator channels. Stop() // Drop drops write ahead log. Drop() error // contains filtered or unexported methods }
WriteAheadLog represents write ahead log underlying fan out queue.
func NewWriteAheadLog ¶
func NewWriteAheadLog( ctx context.Context, cfg config.WAL, currentNodeID models.NodeID, database string, engine tsdb.Engine, cliFct rpc.ClientStreamFactory, stateMgr storage.StateManager, ) WriteAheadLog
NewWriteAheadLog creates a WriteAheadLog instance.
type WriteAheadLogManager ¶
type WriteAheadLogManager interface { io.Closer // GetOrCreateLog returns write ahead log for database, // if exist returns it, else creates a new log. GetOrCreateLog(database string) WriteAheadLog // GetReplicaState returns replica state for given database's name. GetReplicaState(database string) []models.FamilyLogReplicaState // DropDatabases drops write ahead log of databases, keep active databases. DropDatabases(activeDatabases map[string]struct{}) // StopDatabases stop the replicator for write ahead log of databases, keep active databases. StopDatabases(activeDatabases map[string]struct{}) // Recovery recoveries local history wal when server start. Recovery() error // Stop stops all replicator channel. Stop() }
WriteAheadLogManager manages all write ahead logs.
func NewWriteAheadLogManager ¶
func NewWriteAheadLogManager( ctx context.Context, cfg config.WAL, currentNodeID models.NodeID, engine tsdb.Engine, cliFct rpc.ClientStreamFactory, stateMgr storage.StateManager, ) WriteAheadLogManager
NewWriteAheadLogManager creates a WriteAheadLogManager instance.