replica

package
v0.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 21, 2024 License: Apache-2.0 Imports: 30 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrFamilyChannelCanceled is the error returned when a family channel is canceled.
	ErrFamilyChannelCanceled = errors.New("family Channel is canceled")
	// ErrIngestTimeout is the sentinel error for an ingest timeout.
	// Callers should compare against it with errors.Is rather than by message text.
	ErrIngestTimeout = errors.New("ingest timeout")
)
View Source
// NewPartitionFn holds the NewPartition constructor in a package-level
// variable; the package documents it as existing for testing.
var NewPartitionFn = NewPartition

NewPartitionFn is a variable holding NewPartition, for testing.

Functions

This section is empty.

Types

type ChannelManager

// ChannelManager manages the construction, retrieval, and closing of all channels.
type ChannelManager interface {
	// Write writes the given broker batch rows for the named database;
	// the manager handles the database and sharding details.
	Write(ctx context.Context, database string, brokerBatchRows *metric.BrokerBatchRows) error

	// Close closes all the shard channels.
	Close()
}

ChannelManager manages the construction, retrieval, and closing of all channels.

func NewChannelManager

func NewChannelManager(
	ctx context.Context,
	fct rpc.ClientStreamFactory,
	stateMgr broker.StateManager,
) ChannelManager

NewChannelManager returns a ChannelManager built from the given context, rpc.ClientStreamFactory and broker.StateManager. Passing in the ClientStreamFactory makes it easy to mock the rpc stream client in tests.

type Chunk

// Chunk represents the write buffer chunk used for compressing the metric list.
type Chunk interface {
	// Compress marshals and compresses the data, then resets the context.
	Compress() (*compressedChunk, error)
	// IsFull reports whether the chunk is full.
	IsFull() bool
	// IsEmpty reports whether the chunk is empty.
	IsEmpty() bool
	// Size returns the size of the chunk.
	Size() ltoml.Size
	// Write writes the metric into the buffer.
	Write([]byte) (n int, err error)
}

Chunk represents the write buffer chunk used for compressing the metric list.

type DatabaseChannel

// DatabaseChannel represents the database-level replication channel.
type DatabaseChannel interface {
	// Write writes the metric data into the shard channel's buffer.
	Write(ctx context.Context, brokerBatchRows *metric.BrokerBatchRows) error
	// CreateChannel creates the shard-level replication channel for the given shard ID.
	CreateChannel(numOfShard int32, shardID models.ShardID) (ShardChannel, error)
	// Stop stops the current database write channel.
	Stop()
	// contains filtered or unexported methods
}

DatabaseChannel represents the database-level replication channel.

type FamilyChannel

// FamilyChannel represents the family-level write channel.
type FamilyChannel interface {
	// Write writes the data into the channel.
	// ErrCanceled is returned when the channel is canceled before the data is written successfully.
	// NOTE(review): "ErrCanceled" presumably refers to ErrFamilyChannelCanceled — confirm.
	// Concurrent safe.
	Write(ctx context.Context, rows []metric.BrokerRow) error

	// Stop stops the family channel.
	// NOTE(review): the unit of timeout is not visible here — confirm against the implementation.
	Stop(timeout int64)
	// FamilyTime returns the family time of the current channel.
	FamilyTime() int64
	// contains filtered or unexported methods
}

FamilyChannel represents the family-level write channel.

type Partition

// Partition represents a partition of the write ahead log.
type Partition interface {
	io.Closer
	// BuildReplicaForLeader builds the replica relation when handling a write connection.
	BuildReplicaForLeader(leader models.NodeID, replicas []models.NodeID) error
	// BuildReplicaForFollower builds the replica relation when handling a replica connection.
	BuildReplicaForFollower(leader models.NodeID, replica models.NodeID) error
	// ReplicaLog writes a replica msg sent by the leader.
	// It returns the appended index on success.
	ReplicaLog(replicaIdx int64, msg []byte) (int64, error)
	// WriteLog writes a msg when the leader handles a client write request.
	WriteLog(msg []byte) error
	// ReplicaAckIndex returns the index that the replica has appended.
	ReplicaAckIndex() int64
	// ResetReplicaIndex resets the replica index.
	ResetReplicaIndex(idx int64)
	// IsExpire reports whether the partition is expired.
	IsExpire() bool
	// Path returns the path of the partition.
	Path() string
	// Stop stops the replicator channel.
	Stop()
	// contains filtered or unexported methods
}

Partition represents a partition of the write ahead log.

func NewPartition

func NewPartition(
	ctx context.Context,
	shard tsdb.Shard,
	family tsdb.DataFamily,
	currentNodeID models.NodeID,
	log queue.FanOutQueue,
	cliFct rpc.ClientStreamFactory,
	stateMgr storage.StateManager,
) Partition

NewPartition creates a write ahead log partition (db + shard + family time + leader).

type Replicator

// Replicator represents a write ahead log replicator.
type Replicator interface {
	fmt.Stringer
	// ReplicaState returns the replica state.
	ReplicaState() *models.ReplicaState
	// State returns the state of the replicator.
	State() *state
	// Pause pauses replicating data.
	Pause()
	// Consume returns the index of the replica message.
	Consume() int64
	// GetMessage returns the message at the given replica index.
	GetMessage(replicaIdx int64) ([]byte, error)
	// Replica replicates the message for the given replica index.
	Replica(idx int64, msg []byte)
	// IsReady reports whether the replicator is ready.
	IsReady() bool
	// Connect connects to the follower for sending replica messages.
	// NOTE(review): presumably returns true when the connection succeeds — confirm.
	Connect() bool
	// ReplicaIndex returns the index of the replica message.
	ReplicaIndex() int64
	// AckIndex returns the acknowledged index of the replica message.
	AckIndex() int64
	// AppendIndex returns the next append index.
	AppendIndex() int64
	// ResetReplicaIndex resets the replica index.
	ResetReplicaIndex(idx int64)
	// ResetAppendIndex resets the append index.
	ResetAppendIndex(idx int64)
	// SetAckIndex sets the ack index.
	SetAckIndex(ackIdx int64)
	// Pending returns the lag of the queue.
	Pending() int64
	// IgnoreMessage ignores an invalid message.
	IgnoreMessage(replicaIdx int64)
	// Close closes the replicator and releases its resources.
	Close()
}

Replicator represents write ahead log replicator.

func NewLocalReplicator

func NewLocalReplicator(channel *ReplicatorChannel, shard tsdb.Shard, family tsdb.DataFamily) Replicator

func NewRemoteReplicator

func NewRemoteReplicator(
	ctx context.Context,
	channel *ReplicatorChannel,
	stateMgr storage.StateManager,
	cliFct rpc.ClientStreamFactory,
) Replicator

NewRemoteReplicator creates remote replicator.

type ReplicatorChannel

// ReplicatorChannel represents the channel between a peer pair [from, to]
// for a shard of a database.
type ReplicatorChannel struct {
	// State is the replica state of this channel.
	State *models.ReplicaState

	// ConsumerGroup is the underlying consumer group that records the replication progress.
	ConsumerGroup queue.ConsumerGroup
}

ReplicatorChannel represents the channel between a peer pair [from, to] for a shard of a database.

type ReplicatorPeer

// ReplicatorPeer represents a WAL replica peer.
// Local replicator: from == to. Remote replicator: from != to.
type ReplicatorPeer interface {
	// Startup starts the WAL replicator channel.
	Startup()
	// Shutdown shuts down gracefully.
	Shutdown()
	// ReplicatorState returns the state and type of the replicator.
	// NOTE(review): presumably the string result is the replicator type — confirm.
	ReplicatorState() (string, *state)
}

ReplicatorPeer represents a WAL replica peer. Local replicator: from == to. Remote replicator: from != to.

func NewReplicatorPeer

func NewReplicatorPeer(replicator Replicator) ReplicatorPeer

NewReplicatorPeer creates a ReplicatorPeer.

type ShardChannel

// ShardChannel represents a place to buffer the data for a specific cluster, database, shardID.
type ShardChannel interface {
	// SyncShardState syncs the shard state after a state event has changed.
	SyncShardState(shardState models.ShardState, liveNodes map[models.NodeID]models.StatefulNode)
	// GetOrCreateFamilyChannel picks the family channel for the given family time.
	GetOrCreateFamilyChannel(familyTime int64) FamilyChannel
	// Stop stops the shard channel.
	Stop()
	// contains filtered or unexported methods
}

ShardChannel represents a place to buffer the data for a specific cluster, database, shardID.

type WriteAheadLog

// WriteAheadLog represents a write ahead log with an underlying fan out queue.
type WriteAheadLog interface {
	io.Closer

	// Name returns the name of the write ahead log.
	Name() string
	// GetOrCreatePartition returns a partition of the write ahead log.
	// If the partition exists it is returned; otherwise a new partition is created.
	GetOrCreatePartition(shardID models.ShardID, familyTime int64, leader models.NodeID) (Partition, error)
	// Stop stops all replicator channels.
	Stop()
	// Drop drops the write ahead log.
	Drop() error
	// contains filtered or unexported methods
}

WriteAheadLog represents a write ahead log with an underlying fan out queue.

func NewWriteAheadLog

func NewWriteAheadLog(
	ctx context.Context,
	cfg config.WAL,
	currentNodeID models.NodeID,
	database string,
	engine tsdb.Engine,
	cliFct rpc.ClientStreamFactory,
	stateMgr storage.StateManager,
) WriteAheadLog

NewWriteAheadLog creates a WriteAheadLog instance.

type WriteAheadLogManager

// WriteAheadLogManager manages all write ahead logs.
type WriteAheadLogManager interface {
	io.Closer

	// GetOrCreateLog returns the write ahead log for the database.
	// If the log exists it is returned; otherwise a new log is created.
	GetOrCreateLog(database string) WriteAheadLog
	// GetReplicaState returns the replica state for the given database name.
	GetReplicaState(database string) []models.FamilyLogReplicaState
	// DropDatabases drops the write ahead logs of databases, keeping the active databases.
	DropDatabases(activeDatabases map[string]struct{})
	// StopDatabases stops the replicators for the write ahead logs of databases,
	// keeping the active databases.
	StopDatabases(activeDatabases map[string]struct{})
	// Recovery recovers the local historical WAL when the server starts.
	Recovery() error
	// Stop stops all replicator channels.
	Stop()
}

WriteAheadLogManager manages all write ahead logs.

func NewWriteAheadLogManager

func NewWriteAheadLogManager(
	ctx context.Context,
	cfg config.WAL,
	currentNodeID models.NodeID,
	engine tsdb.Engine,
	cliFct rpc.ClientStreamFactory,
	stateMgr storage.StateManager,
) WriteAheadLogManager

NewWriteAheadLogManager creates a WriteAheadLogManager instance.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL