core

package
v0.0.0-...-0e96e3e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 2, 2021 License: BSD-3-Clause Imports: 35 Imported by: 0

Documentation

Index

Constants

View Source
// Untyped integer constants. PENDING, COMPLETE and TIMEOUT take the
// consecutive iota values 0, 1 and 2.
const (
	PENDING = iota
	COMPLETE
	TIMEOUT

	// InvalidLogIndex equals math.MinInt32.
	// NOTE(review): presumably a sentinel meaning "no valid log index" — confirm against callers.
	InvalidLogIndex = math.MinInt32
)
View Source
// RpcPending and RpcRespond are int32 constants with the iota values 0 and 1.
// NOTE(review): presumably the two states of an outstanding RPC — confirm against callers.
const (
	RpcPending int32 = iota
	RpcRespond
)
View Source
// Snapshot-related constants.
// NOTE(review): RemoteSnapshotURISchema looks like the URI prefix of a snapshot
// served by another node and SnapshotTmpPath like a temporary directory name —
// confirm against the snapshot storage code.
const (
	RemoteSnapshotURISchema string = "remote://"
	SnapshotTmpPath         string = "temp"

	// InvalidSnapshotIndex is an int64 holding math.MinInt32.
	// NOTE(review): presumably a sentinel meaning "no valid snapshot index" — confirm.
	InvalidSnapshotIndex int64 = math.MinInt32
)
View Source
// Format strings for error messages; the %d verbs are filled in by the caller.
const (
	// ErrSetLastCommittedIndex takes two integers: pendingIndex and lastCommittedIndex.
	ErrSetLastCommittedIndex = "node changes to leader, pendingIndex=%d, param lastCommittedIndex=%d"
	// ErrAppendPendingTask takes one integer: pendingIndex.
	ErrAppendPendingTask     = "fail to appendingTask, pendingIndex=%d"
)
View Source
// ConfPreFix is a typed string constant; AppendLogRetryTimes and
// StablePBMetaFileName are untyped constants.
// NOTE(review): judging by the names, these are a key prefix for configuration
// entries, a retry limit for appending logs, and the file name of the stable
// meta file — confirm against the code that uses them.
const (
	ConfPreFix           string = "conf_"
	AppendLogRetryTimes         = 50
	StablePBMetaFileName        = "stable_pb_meta"
)
View Source
const (
	// ErrRollbackMsg is a format string that takes a log index (%d) followed by
	// a descriptive string (%s).
	ErrRollbackMsg = "StateMachine meet critical error when applying one or more tasks since index=%d, %s"
)
View Source
const (
	// InvalidReaderIndex is an int64 holding math.MinInt16.
	// NOTE(review): InvalidLogIndex and InvalidSnapshotIndex in this package use
	// math.MinInt32; confirm whether MinInt16 is intentional here.
	InvalidReaderIndex int64 = math.MinInt16
)

Variables

View Source
var (
	// DefalutCopySessionOption is the zero value of CopySessionOption.
	// NOTE(review): "Defalut" is a misspelling of "Default"; the name is exported,
	// so renaming it would break callers.
	DefalutCopySessionOption = CopySessionOption{}
)
View Source
var (
	// FirstLogIdxKey is the key name under which the first log index is stored.
	FirstLogIdxKey = []byte("meta/firstLogIndex")
)

Functions

func IsNodeActive

func IsNodeActive(state NodeState) bool

IsNodeActive node 是否处于存活状态

func NewReadIndexHeartbeatResponseClosure

func NewReadIndexHeartbeatResponseClosure(done *ReadIndexResponseClosure, readIndexResp *raft.ReadIndexResponse, quorum, peerSize int32) *readIndexHeartbeatResponseClosure

NewReadIndexHeartbeatResponseClosure readIndexResp 不涉及网络传输,根据从 Leader 返回的 AppendEntriesResponse 信息决定 readIndexResp 的内容是什么

Types

type AppendEntriesResponseClosure

type AppendEntriesResponseClosure struct {
	RpcResponseClosure
}

type ApplyTask

type ApplyTask struct {
	TType               TaskType
	CommittedIndex      int64
	Term                int64
	Status              *entity.Status
	LeaderChangeContext *entity.LeaderChangeContext
	Done                Closure
	Latch               *sync.WaitGroup
}

func (*ApplyTask) Name

func (at *ApplyTask) Name() string

func (*ApplyTask) Reset

func (at *ApplyTask) Reset()

func (*ApplyTask) Sequence

func (at *ApplyTask) Sequence() int64

type BallotBox

type BallotBox struct {
	// contains filtered or unexported fields
}

func (*BallotBox) AppendPendingTask

func (bx *BallotBox) AppendPendingTask(conf, oldConf *entity.Configuration, done Closure) bool

func (*BallotBox) ClearPendingTasks

func (bx *BallotBox) ClearPendingTasks()

func (*BallotBox) CommitAt

func (bx *BallotBox) CommitAt(firstLogIndex, lastLogIndex int64, peer entity.PeerId) bool

CommitAt records that the log entries in [firstLogIndex, lastLogIndex] have been committed to stable storage at peer.

func (*BallotBox) GetLastCommittedIndex

func (bx *BallotBox) GetLastCommittedIndex() int64

func (*BallotBox) GetPendingIndex

func (bx *BallotBox) GetPendingIndex() int64

func (*BallotBox) GetPendingMetaQueue

func (bx *BallotBox) GetPendingMetaQueue() *utils.SegmentList

func (*BallotBox) Init

func (bx *BallotBox) Init(opt *BallotBoxOptions)

func (*BallotBox) RestPendingIndex

func (bx *BallotBox) RestPendingIndex(newPendingIndex int64) bool

RestPendingIndex 重置待处理的 RaftLog 提案,区间 [newPendingIndex, +∞) 内的日志都需要由新的 leader 继续发起提交

func (*BallotBox) SetLastCommittedIndex

func (bx *BallotBox) SetLastCommittedIndex(lastCommittedIndex int64) (bool, error)

func (*BallotBox) Shutdown

func (bx *BallotBox) Shutdown()

type BallotBoxOptions

type BallotBoxOptions struct {
	Waiter       fsmCaller
	ClosureQueue *ClosureQueue
}

BallotBoxOptions 初始化投票器的配置信息

type CatchUpClosure

type CatchUpClosure struct {
	// contains filtered or unexported fields
}

func (*CatchUpClosure) GetFuture

func (cuc *CatchUpClosure) GetFuture() common.Future

func (*CatchUpClosure) Run

func (cuc *CatchUpClosure) Run(status entity.Status)

type CliService

type CliService struct {
	// contains filtered or unexported fields
}

func (*CliService) AddLearners

func (cli *CliService) AddLearners(groupId string, learners []*entity.PeerId, conf *entity.Configuration) entity.Status

func (*CliService) AddPeer

func (cli *CliService) AddPeer(groupId string, peerId *entity.PeerId, conf *entity.Configuration) entity.Status

func (*CliService) ChangePeer

func (cli *CliService) ChangePeer(groupId string, oldConf, newConf *entity.Configuration) entity.Status

func (*CliService) GetAliveLearners

func (cli *CliService) GetAliveLearners(groupId string, peerId *entity.PeerId, conf *entity.Configuration) []*entity.PeerId

func (*CliService) GetAlivePeers

func (cli *CliService) GetAlivePeers(groupId string, peerId *entity.PeerId, conf *entity.Configuration) []*entity.PeerId

func (*CliService) GetLeader

func (cli *CliService) GetLeader(groupId string, leaderId *entity.PeerId, conf *entity.Configuration) entity.Status

func (*CliService) GetLearners

func (cli *CliService) GetLearners(groupId string, peerId *entity.PeerId, conf *entity.Configuration) []*entity.PeerId

func (*CliService) GetPeers

func (cli *CliService) GetPeers(groupId string, peerId *entity.PeerId, conf *entity.Configuration) []*entity.PeerId

func (*CliService) ReBalance

func (cli *CliService) ReBalance(groupIds []string, balanceLeaderIds map[string]*entity.PeerId, conf *entity.Configuration) []*entity.PeerId

func (*CliService) RemoveLearners

func (cli *CliService) RemoveLearners(groupId string, learners []*entity.PeerId, conf *entity.Configuration) entity.Status

func (*CliService) RemovePeer

func (cli *CliService) RemovePeer(groupId string, peerId *entity.PeerId, conf *entity.Configuration) entity.Status

func (*CliService) ResetLearners

func (cli *CliService) ResetLearners(groupId string, learners []*entity.PeerId,
	conf *entity.Configuration) entity.Status

func (*CliService) ResetPeer

func (cli *CliService) ResetPeer(groupId string, peerId *entity.PeerId, conf *entity.Configuration) entity.Status

func (*CliService) Snapshot

func (cli *CliService) Snapshot(groupId string, peerId *entity.PeerId) entity.Status

func (*CliService) TransferLeader

func (cli *CliService) TransferLeader(groupId string, peerId *entity.PeerId, conf *entity.Configuration) entity.Status

type Closure

// Closure is a general-purpose callback interface.
type Closure interface {
	// Run is invoked with the entity.Status describing the outcome.
	Run(status entity.Status)
}

Closure 通用的回调接口

type ClosureQueue

type ClosureQueue struct {
	// contains filtered or unexported fields
}

func (*ClosureQueue) AppendPendingClosure

func (cq *ClosureQueue) AppendPendingClosure(closure Closure)

func (*ClosureQueue) Clear

func (cq *ClosureQueue) Clear()

func (*ClosureQueue) GetFirstIndex

func (cq *ClosureQueue) GetFirstIndex() int64

func (*ClosureQueue) GetQueue

func (cq *ClosureQueue) GetQueue() list.List

func (*ClosureQueue) IsEmpty

func (cq *ClosureQueue) IsEmpty() bool

func (*ClosureQueue) PopClosureUntil

func (cq *ClosureQueue) PopClosureUntil(endIndex int64, closures []Closure, tasks []TaskClosure) int64

PopClosureUntil 弹出 Closure 对象,直到 index == endIndex 时候停止继续弹出

func (*ClosureQueue) RestFirstIndex

func (cq *ClosureQueue) RestFirstIndex(firstIndex int64) bool

RestFirstIndex

type ConfigurationChangeClosure

type ConfigurationChangeClosure struct {
	// contains filtered or unexported fields
}

ConfigurationChangeClosure 集群成员变更完成之后的回调

func (*ConfigurationChangeClosure) Run

type ConfigurationCtx

type ConfigurationCtx struct {
	// contains filtered or unexported fields
}

func NewConfigurationCtx

func NewConfigurationCtx(node *nodeImpl) *ConfigurationCtx

func (*ConfigurationCtx) IsBusy

func (cc *ConfigurationCtx) IsBusy() bool

func (*ConfigurationCtx) Reset

func (cc *ConfigurationCtx) Reset(st entity.Status)

func (*ConfigurationCtx) Start

func (cc *ConfigurationCtx) Start(newConf, oldConf *entity.Configuration, done Closure)

Start 执行根据配置进行启动

type CopySessionOption

type CopySessionOption struct {
	RpcService       *RaftClientOperator
	SnapshotThrottle SnapshotThrottle
	RaftOpt          RaftOption
	Endpoint         entity.Endpoint
	// contains filtered or unexported fields
}

type CopySessionOptions

type CopySessionOptions func(opt *CopySessionOption)

type DownloadingSnapshot

type DownloadingSnapshot struct {
	Request     *raft.InstallSnapshotRequest
	Resp        *raft.InstallSnapshotResponse
	RequestDone *RpcRequestClosure
}

func NewDownloadingSnapshot

func NewDownloadingSnapshot(req *raft.InstallSnapshotRequest, done *RpcRequestClosure) *DownloadingSnapshot

创建一个快照下载的任务信息

type ElectionJob

type ElectionJob struct {
	// contains filtered or unexported fields
}

type FSMCallerOptions

// FSMCallerOptions holds the configuration parameters used to initialize the
// state machine caller.
type FSMCallerOptions struct {
	LogManager LogManager
	// FSM is the user's state machine object.
	FSM           StateMachine
	AfterShutdown Closure
	BootstrapID   *entity.LogId
	ClosureQueue  *ClosureQueue
	Node          *nodeImpl
}

FSMCallerOptions 初始化状态机的配置参数信息

type FileReader

type FileReader interface {
	GetPath() string

	ReadFile(buf *bytes.Buffer, fileName string, offset, maxCount int64) (int64, error)
}

type FileService

type FileService struct {
	// contains filtered or unexported fields
}

FileService 负责管理 FileReader,每一个获取文件的请求都对应着一个 FileReader 实例

var FileSvrInstance *FileService = &FileService{
	fileReaderMap: &utils.ConcurrentMap{},
	nextId:        utils.NewAtomicInt64(),
}

func (*FileService) AddReader

func (fs *FileService) AddReader(reader FileReader) int64

AddReader 添加一个 FileReader 实例,并给他一个 ReaderId 编号用于查找

func (*FileService) HandleGetFile

func (fs *FileService) HandleGetFile(req *protoM.GetFileRequest, closure *RpcRequestClosure) proto.Message

HandleGetFile 处理获取文件的请求操作

func (*FileService) RemoveReader

func (fs *FileService) RemoveReader(readerId int64)

RemoveReader 移除一个 FileReader

type FirstSnapshotLoadDone

type FirstSnapshotLoadDone struct {
	// contains filtered or unexported fields
}

func (*FirstSnapshotLoadDone) Run

func (fsl *FirstSnapshotLoadDone) Run(status entity.Status)

func (*FirstSnapshotLoadDone) Start

type GetFileResponseClosure

type GetFileResponseClosure struct {
	RpcResponseClosure
}

type InFlight

type InFlight struct {
	// contains filtered or unexported fields
}

type InstallSnapshotResponseClosure

type InstallSnapshotResponseClosure struct {
	RpcResponseClosure
}

type Iterator

type Iterator interface {
	GetData() []byte

	GetIndex() int64

	GetTerm() int64

	Done() Closure

	SetErrorAndRollback(nTail int64, st entity.Status) error
}

type IteratorImpl

type IteratorImpl struct {
	// contains filtered or unexported fields
}

func (*IteratorImpl) Done

func (iti *IteratorImpl) Done() Closure

func (*IteratorImpl) Entry

func (iti *IteratorImpl) Entry() *entity.LogEntry

func (*IteratorImpl) GetError

func (iti *IteratorImpl) GetError() *entity.RaftError

func (*IteratorImpl) GetIndex

func (iti *IteratorImpl) GetIndex() int64

func (*IteratorImpl) GetOrCreateError

func (iti *IteratorImpl) GetOrCreateError() *entity.RaftError

func (*IteratorImpl) HasError

func (iti *IteratorImpl) HasError() bool

func (*IteratorImpl) IsGood

func (iti *IteratorImpl) IsGood() bool

IsGood 当前可以推进的位点小于需要到的 committed index 以及没有出现过错误

func (*IteratorImpl) Next

func (iti *IteratorImpl) Next()

func (*IteratorImpl) RunTenRestClosureWithError

func (iti *IteratorImpl) RunTenRestClosureWithError()

func (*IteratorImpl) SetErrorAndRollback

func (iti *IteratorImpl) SetErrorAndRollback(nTail int64, st *entity.Status) error

type IteratorWrapper

type IteratorWrapper struct {
	// contains filtered or unexported fields
}

func NewIteratorWrapper

func NewIteratorWrapper(impl *IteratorImpl) *IteratorWrapper

func (*IteratorWrapper) Done

func (iw *IteratorWrapper) Done() Closure

func (*IteratorWrapper) GetData

func (iw *IteratorWrapper) GetData() []byte

func (*IteratorWrapper) GetIndex

func (iw *IteratorWrapper) GetIndex() int64

func (*IteratorWrapper) GetTerm

func (iw *IteratorWrapper) GetTerm() int64

func (*IteratorWrapper) HasNext

func (iw *IteratorWrapper) HasNext() bool

HasNext 只判断当前是否可以进行向前推进一个LogEntry以及当前是否是用户态的数据信息

func (*IteratorWrapper) Next

func (iw *IteratorWrapper) Next() []byte

func (*IteratorWrapper) SetErrorAndRollback

func (iw *IteratorWrapper) SetErrorAndRollback(nTail int64, st entity.Status) error

type JobSwitch

type JobSwitch int32

JobSwitch 任务的切换状态标识

const (
	OpenJob JobSwitch = iota
	Suspend
)

type JobType

type JobType int32

任务类型

// JobType values, numbered consecutively from 0 via iota.
const (
	// JobForVote is the vote job.
	JobForVote JobType = iota
	// JobForElection is the election job.
	JobForElection
	// JobForSnapshot is the job that takes a snapshot.
	JobForSnapshot
	// JobForStepDown is the stop (step-down) job.
	JobForStepDown
)

type LastAppliedLogIndexListener

// LastAppliedLogIndexListener is notified as the state machine applies log entries.
type LastAppliedLogIndexListener interface {
	// OnApplied reports which core.LogEntry items the state machine has applied
	// successfully; lastAppliedLogIndex is the latest applied log index.
	OnApplied(lastAppliedLogIndex int64)
}

type LastLogIndexListener

// LastLogIndexListener listens for changes of the latest Raft log index.
type LastLogIndexListener interface {
	// OnLastLogIndexChanged is called when the latest log index changes.
	// Implementations must not panic: every error has to be handled by the
	// implementation itself with defer/recover.
	OnLastLogIndexChanged(lastLogIndex int64)
}

LastLogIndexListener 监听最新的 RaftLog index

type LoadSnapshotClosure

// LoadSnapshotClosure is the state machine's snapshot-loading callback.
type LoadSnapshotClosure interface {
	Closure
	// Start begins loading the snapshot and returns a reader for the snapshot files.
	Start() SnapshotReader
}

LoadSnapshotClosure 状态机的快照加载回调

type LocalDirReader

type LocalDirReader struct {
	// contains filtered or unexported fields
}

func (*LocalDirReader) GetPath

func (ldr *LocalDirReader) GetPath() string

func (*LocalDirReader) ReadFile

func (ldr *LocalDirReader) ReadFile(buf *bytes.Buffer, fileName string, offset, maxCount int64) (int64, error)

ReadFile 根据要求读取文件

func (*LocalDirReader) ReadFileWithMeta

func (ldr *LocalDirReader) ReadFileWithMeta(buf *bytes.Buffer, fileName string, fileMeta proto.Message, offset, maxCount int64) (int64, error)

ReadFileWithMeta 根据元数据,读取文件的一部分内容(用于大文件进行多次分批传输)。TODO: 如果 maxCount == -1,可以优化为直接读取剩余的所有文件内容。

type LocalSnapshot

type LocalSnapshot struct {
	// contains filtered or unexported fields
}

LocalSnapshot 本地快照的描述对象

func (*LocalSnapshot) GetFileMeta

func (ls *LocalSnapshot) GetFileMeta(fileName string) *raft.LocalFileMeta

func (*LocalSnapshot) GetPath

func (ls *LocalSnapshot) GetPath() string

func (*LocalSnapshot) ListFiles

func (ls *LocalSnapshot) ListFiles() []string

type LocalSnapshotCopier

type LocalSnapshotCopier struct {
	// contains filtered or unexported fields
}

LocalSnapshotCopier 本地快照文件的拷贝执行者

func (*LocalSnapshotCopier) Cancel

func (copier *LocalSnapshotCopier) Cancel()

func (*LocalSnapshotCopier) Close

func (copier *LocalSnapshotCopier) Close() error

func (*LocalSnapshotCopier) GetReader

func (copier *LocalSnapshotCopier) GetReader() SnapshotReader

func (*LocalSnapshotCopier) GetStatus

func (copier *LocalSnapshotCopier) GetStatus() entity.Status

func (*LocalSnapshotCopier) Init

func (copier *LocalSnapshotCopier) Init(uri string, opts SnapshotCopierOptions) error

func (*LocalSnapshotCopier) Join

func (copier *LocalSnapshotCopier) Join()

func (*LocalSnapshotCopier) SetError

func (copier *LocalSnapshotCopier) SetError(code entity.RaftErrorCode, temp string, args ...interface{})

SetError 设置 entity.Status

func (*LocalSnapshotCopier) Start

func (copier *LocalSnapshotCopier) Start()

type LocalSnapshotMetaTable

type LocalSnapshotMetaTable struct {
	// contains filtered or unexported fields
}

func NewLocalSnapshotMetaTable

func NewLocalSnapshotMetaTable(raftOpt RaftOption) *LocalSnapshotMetaTable

NewLocalSnapshotMetaTable

func (*LocalSnapshotMetaTable) AddFile

func (metaT *LocalSnapshotMetaTable) AddFile(filename string, meta *raft.LocalFileMeta) bool

AddFile 添加一个文件元数据信息描述

func (*LocalSnapshotMetaTable) GetFileMeta

func (metaT *LocalSnapshotMetaTable) GetFileMeta(filename string) *raft.LocalFileMeta

GetFileMeta 根据文件名称获取本快照中的某一个文件的元信息数据

func (*LocalSnapshotMetaTable) GetMeta

func (metaT *LocalSnapshotMetaTable) GetMeta() *raft.SnapshotMeta

func (*LocalSnapshotMetaTable) ListFiles

func (metaT *LocalSnapshotMetaTable) ListFiles() []string

ListFiles 列出本快照的所有文件信息

type LocalSnapshotReader

type LocalSnapshotReader struct {
	// contains filtered or unexported fields
}

LocalSnapshotReader 本地快照读取者

func (*LocalSnapshotReader) Close

func (lsr *LocalSnapshotReader) Close() error

func (*LocalSnapshotReader) DestorySelf

func (lsr *LocalSnapshotReader) DestorySelf()

DestorySelf 销毁自己

func (*LocalSnapshotReader) GenerateURIForCopy

func (lsr *LocalSnapshotReader) GenerateURIForCopy() string

GenerateURIForCopy 构建出一个 url 用于告诉 follower or learner 从哪里开始进行 snapshot 的数据拷贝

func (*LocalSnapshotReader) GetFileMeta

func (lsr *LocalSnapshotReader) GetFileMeta(fileName string) proto.Message

func (*LocalSnapshotReader) GetPath

func (lsr *LocalSnapshotReader) GetPath() string

func (*LocalSnapshotReader) GetSnapshotIndex

func (lsr *LocalSnapshotReader) GetSnapshotIndex() (int64, error)

GetSnapshotIndex 获取快照的 Index 位点信息

func (*LocalSnapshotReader) GetStatus

func (lsr *LocalSnapshotReader) GetStatus() entity.Status

func (*LocalSnapshotReader) Init

func (lsr *LocalSnapshotReader) Init(arg interface{}) error

func (*LocalSnapshotReader) ListFiles

func (lsr *LocalSnapshotReader) ListFiles() []string

ListFiles 列出当前的快照文件

func (*LocalSnapshotReader) Load

func (lsr *LocalSnapshotReader) Load() *raft.SnapshotMeta

func (*LocalSnapshotReader) SetError

func (lsr *LocalSnapshotReader) SetError(code entity.RaftErrorCode, temp string, args ...interface{})

func (*LocalSnapshotReader) Status

func (lsr *LocalSnapshotReader) Status() entity.Status

type LocalSnapshotStorage

type LocalSnapshotStorage struct {
	// contains filtered or unexported fields
}

func (*LocalSnapshotStorage) Close

func (storage *LocalSnapshotStorage) Close(writer *LocalSnapshotWriter, keepDataOnError bool)

func (*LocalSnapshotStorage) CopyFrom

func (storage *LocalSnapshotStorage) CopyFrom(uri string, opts SnapshotCopierOptions) SnapshotReader

CopyFrom 从某一个地方开始拷贝 snapshot 数据

func (*LocalSnapshotStorage) Create

func (storage *LocalSnapshotStorage) Create(fromEmpty bool) SnapshotWriter

Create 从本地快照管理者中创建出一个快照的写入执行者

func (*LocalSnapshotStorage) CreateFromEmpty

func (storage *LocalSnapshotStorage) CreateFromEmpty() SnapshotWriter

func (*LocalSnapshotStorage) GetSnapshotPath

func (storage *LocalSnapshotStorage) GetSnapshotPath(index int64) string

func (*LocalSnapshotStorage) Init

func (storage *LocalSnapshotStorage) Init(arg interface{}) error

func (*LocalSnapshotStorage) Open

func (storage *LocalSnapshotStorage) Open() SnapshotReader

Open 打开最近的一个快照文件

func (*LocalSnapshotStorage) SetFilterBeforeCopyRemote

func (storage *LocalSnapshotStorage) SetFilterBeforeCopyRemote()

func (*LocalSnapshotStorage) Shutdown

func (storage *LocalSnapshotStorage) Shutdown()

type LocalSnapshotWriter

type LocalSnapshotWriter struct {
	// contains filtered or unexported fields
}

LocalSnapshotWriter 本地快照的写入操作者

func (*LocalSnapshotWriter) AddFile

func (lsw *LocalSnapshotWriter) AddFile(fileName string, meta proto.Message) bool

func (*LocalSnapshotWriter) Close

func (lsw *LocalSnapshotWriter) Close(keepDataOnError bool)

func (*LocalSnapshotWriter) GetPath

func (lsw *LocalSnapshotWriter) GetPath() string

func (*LocalSnapshotWriter) GetSnapshotIndex

func (lsw *LocalSnapshotWriter) GetSnapshotIndex() int64

func (*LocalSnapshotWriter) GetStatus

func (lsw *LocalSnapshotWriter) GetStatus() entity.Status

func (*LocalSnapshotWriter) Init

func (lsw *LocalSnapshotWriter) Init(arg interface{}) error

func (*LocalSnapshotWriter) RemoveFile

func (lsw *LocalSnapshotWriter) RemoveFile(fileName string)

func (*LocalSnapshotWriter) SaveMeta

func (lsw *LocalSnapshotWriter) SaveMeta(meta raft.SnapshotMeta) bool

func (*LocalSnapshotWriter) SetError

func (lsw *LocalSnapshotWriter) SetError(code entity.RaftErrorCode, temp string, args ...interface{})

func (*LocalSnapshotWriter) Shutdown

func (lsw *LocalSnapshotWriter) Shutdown()

func (*LocalSnapshotWriter) Sync

func (copier *LocalSnapshotWriter) Sync() bool

Sync 进行文件保存

type LogEntryAndClosure

type LogEntryAndClosure struct {
	Entry        *entity.LogEntry
	Done         Closure
	ExpectedTerm int64
	Latch        *sync.WaitGroup
}

func (*LogEntryAndClosure) Name

func (lac *LogEntryAndClosure) Name() string

func (*LogEntryAndClosure) Reset

func (lac *LogEntryAndClosure) Reset()

func (*LogEntryAndClosure) Sequence

func (lac *LogEntryAndClosure) Sequence() int64

type LogManager

// LogManager is the manager of the Raft log. It embeds utils.LifeCycle.
type LogManager interface {
	utils.LifeCycle
	// AddLastLogIndexListener registers listener.
	AddLastLogIndexListener(listener LastLogIndexListener)
	// RemoveLogIndexListener unregisters listener.
	RemoveLogIndexListener(listener LastLogIndexListener)
	// AppendEntries appends multiple Raft log entries.
	AppendEntries(entries []*entity.LogEntry, done *StableClosure) error
	// SetSnapshot
	SetSnapshot(meta *raft.SnapshotMeta)
	// ClearBufferedLogs
	ClearBufferedLogs()
	// GetEntry
	GetEntry(index int64) *entity.LogEntry
	// GetTerm
	GetTerm(index int64) int64
	// GetFirstLogIndex
	GetFirstLogIndex() int64
	// GetLastLogIndex
	GetLastLogIndex() int64
	// GetLastLogID
	GetLastLogID(isFlush bool) entity.LogId
	// GetConfiguration
	GetConfiguration(index int64) (*entity.ConfigurationEntry, error)
	// CheckAndSetConfiguration
	CheckAndSetConfiguration(current *entity.ConfigurationEntry) *entity.ConfigurationEntry
	// Wait
	// NOTE(review): the returned int64 looks like the waiter id accepted by
	// RemoveWaiter — confirm against the implementation.
	Wait(expectedLastLogIndex int64, cb NewLogCallback, replicator *Replicator) int64
	// RemoveWaiter
	RemoveWaiter(id int64) bool
	// SetAppliedID
	SetAppliedID(appliedID entity.LogId)
	// CheckConsistency
	CheckConsistency() entity.Status
}

LogManager Raft Log 日志管理器

type LogManagerOption

type LogManagerOption struct {
	MaxEventQueueSize int64
}

type LogManagerOptions

type LogManagerOptions func(opt *LogManagerOption)

type LogMgnEventType

type LogMgnEventType int32
const (
	LMgnEventForOther LogMgnEventType = iota
	LMgnEventForReset
	LMgnEventForTruncatePrefix
	LMgnEventForTruncateSuffix
	LMgnEventForShutdown
	LMgnEventForLastLogID
)

type LogStorage

// LogStorage is the component that actually stores the log.
type LogStorage interface {
	// GetFirstLogIndex
	GetFirstLogIndex() int64

	// GetLastLogIndex
	GetLastLogIndex() int64

	// GetEntry
	GetEntry(index int64) *entity.LogEntry

	// GetTerm
	GetTerm(index int64) int64

	// AppendEntry
	AppendEntry(entry *entity.LogEntry) (bool, error)

	// AppendEntries flushes multiple LogEntry values to disk storage.
	AppendEntries(entries []*entity.LogEntry) (int, error)

	// TruncatePrefix removes all data before firstIndexKept.
	TruncatePrefix(firstIndexKept int64) bool

	// TruncateSuffix removes all data after lastIndexKept.
	TruncateSuffix(lastIndexKept int64) bool

	// Rest is always used when installing a snapshot from the leader; it clears
	// all existing logs and sets nextLogIndex.
	Rest(nextLogIndex int64) (bool, error)

	// Shutdown closes the storage.
	Shutdown() error
}

LogStorage 实际的日志存储者

func NewLogStorage

func NewLogStorage(storageType LogStorageType, options ...LogStorageOptions) (LogStorage, error)

type LogStorageOption

// LogStorageOption carries the settings passed to a LogStorage implementation.
type LogStorageOption struct {
	KvDir    string // data directory
	WALDir   string // WAL directory
	WriteOpt struct {
		Sync bool
	}
	ConfMgn     *entity.ConfigurationManager // entity.ConfigurationManager
	ExtendParam map[string]string            // extra configuration, specific to the concrete storage implementation
}

type LogStorageOptions

type LogStorageOptions func(opt *LogStorageOption)

type LogStorageType

type LogStorageType int16
const (
	PebbleLogStorageType LogStorageType = iota
	SimpleStorageType
)

type NewLogCallback

type NewLogCallback interface {
	// OnNewLog
	OnNewLog(arg *Replicator, errCode entity.RaftErrorCode)
}

NewLogCallback

type Node

type Node interface {
	GetLeaderID() entity.PeerId

	GetNodeID() entity.NodeId

	GetGroupID() string

	GetOptions() NodeOptions

	GetRaftOptions() RaftOption

	IsLeader() bool

	Shutdown(done Closure)

	Join()

	Apply(task *Task) error

	ReadIndex(reqCtx []byte, done *ReadIndexClosure) error

	ListPeers() ([]entity.PeerId, error)

	ListAlivePeers() ([]entity.PeerId, error)

	ListLearners() ([]entity.PeerId, error)

	ListAliveLearners() ([]entity.PeerId, error)

	AddPeer(peer entity.PeerId, done Closure)

	RemovePeer(peer entity.PeerId, done Closure)

	ChangePeers(newConf *entity.Configuration, done Closure)

	ResetPeers(newConf *entity.Configuration) entity.Status

	AddLearners(learners []entity.PeerId, done Closure)

	RemoveLearners(learners []entity.PeerId, done Closure)

	ResetLearners(learners []entity.PeerId, done Closure)

	Snapshot(done Closure)

	ResetElectionTimeoutMs(electionTimeoutMs int32)

	TransferLeadershipTo(peer entity.PeerId) entity.Status

	AddReplicatorStateListener(replicatorStateListener ReplicatorStateListener)

	RemoveReplicatorStateListener(replicatorStateListener ReplicatorStateListener)

	ClearReplicatorStateListeners()

	GetReplicatorStatueListeners() []ReplicatorStateListener

	GetNodeTargetPriority() int32
}

type NodeOptions

// NodeOptions groups the configuration fields of a node.
type NodeOptions struct {
	ElectionTimeoutMs    int64
	ElectionMaxDelayMs   int64
	ElectionPriority     entity.ElectionPriority
	DecayPriorityGap     int32
	LeaderLeaseTimeRatio int32
	SnapshotIntervalSecs int32
	// When taking a snapshot, the distance between the raft log index of the
	// previous snapshot and the latest applied raft log index is checked; no
	// snapshot is taken while that distance is within this margin.
	SnapshotLogIndexMargin int32
	// Only takes effect when SnapshotLogIndexMargin holds a valid value.
	MaxSkipSnapshotTimes     int16
	CatchupMargin            int32
	InitialConf              *entity.Configuration
	Fsm                      StateMachine
	LogURI                   string
	RaftMetaURI              string
	SnapshotURI              string
	FilterBeforeCopyRemote   bool
	DisableCli               bool
	SharedTimerPool          bool
	CliRpcGoroutinePoolSize  int32
	RaftRpcGoroutinePoolSize int32
	EnableMetrics            bool
	SnapshotThrottle         SnapshotThrottle
}

func NewDefaultNodeOptions

func NewDefaultNodeOptions() NodeOptions

type NodeState

type NodeState int
const (
	// It's a leader
	StateLeader NodeState = iota
	// It's transferring leadership
	StateTransferring
	// It's a candidate
	StateCandidate
	// It's a follower
	StateFollower
	// It's in error
	StateError
	// It's uninitialized
	StateUninitialized
	// It's shutting down
	StateShutting
	// It's shutdown already
	StateShutdown
	// State end
	StateEnd
)

func (NodeState) GetName

func (ns NodeState) GetName() string

func (NodeState) String

func (i NodeState) String() string

type OnCaughtUp

type OnCaughtUp struct {
	// contains filtered or unexported fields
}

OnCaughtUp 当 Follower 成功追上数据 or 未追上数据的操作

type OnErrorClosure

type OnErrorClosure struct {
	Err entity.RaftError
	F   func(status entity.Status)
}

func (*OnErrorClosure) Run

func (oec *OnErrorClosure) Run(status entity.Status)

type OnPreVoteRpcDone

type OnPreVoteRpcDone struct {
	RpcResponseClosure
	PeerId    entity.PeerId
	Term      int64
	StartTime time.Time
	Req       *raft.RequestVoteRequest
}

type OnRequestVoteRpcDone

type OnRequestVoteRpcDone struct {
	RpcResponseClosure
	PeerId    entity.PeerId
	Term      int64
	StartTime time.Time

	Req *raft.RequestVoteRequest
	// contains filtered or unexported fields
}

type RaftClientOperator

type RaftClientOperator struct {
	// contains filtered or unexported fields
}

RaftClient 的一些操作

func NewRaftClientOperator

func NewRaftClientOperator(nodeOpt *NodeOptions, raftClient *rpc.RaftClient, replicateGroup *ReplicatorGroup) *RaftClientOperator

func (*RaftClientOperator) AppendEntries

func (*RaftClientOperator) GetFile

func (rcop *RaftClientOperator) GetFile(endpoint entity.Endpoint, req *raft.GetFileRequest,
	done *RpcResponseClosure) mono.Mono

func (*RaftClientOperator) InstallSnapshot

func (*RaftClientOperator) PreVote

func (rcop *RaftClientOperator) PreVote(endpoint entity.Endpoint, req *raft.RequestVoteRequest,
	done *OnPreVoteRpcDone) mono.Mono

func (*RaftClientOperator) ReadIndex

func (*RaftClientOperator) RequestVote

func (rcop *RaftClientOperator) RequestVote(endpoint entity.Endpoint, req *raft.RequestVoteRequest,
	done *OnRequestVoteRpcDone) mono.Mono

func (*RaftClientOperator) TimeoutNow

type RaftMetaStorage

type RaftMetaStorage struct {
	// contains filtered or unexported fields
}

type RaftMetaStorageOption

type RaftMetaStorageOption struct {
	Path    string
	RaftOpt RaftOption
	Node    *nodeImpl
}

type RaftMetaStorageOptions

type RaftMetaStorageOptions func(opt *RaftMetaStorageOption)

type RaftNodeJobManager

type RaftNodeJobManager struct {
	// contains filtered or unexported fields
}

func NewRaftNodeJobManager

func NewRaftNodeJobManager(node *nodeImpl) *RaftNodeJobManager

type RaftOption

// RaftOption holds the raft-related configuration parameters.
type RaftOption struct {
	// Whether to enable a checksum verification for every log entry.
	EnableLogEntryChecksum bool
	//
	StepDownWhenVoteTimeout   bool
	ReadOnlyOpt               ReadOnlyOption
	MaxReplicatorInflightMsgs int64
	MaxEntriesSize            int32
	MaxAppendBufferEntries    int32
	MaxBodySize               int32
	MaxByteCountPerRpc        int32
}

RaftOption raft 的相关配置参数

type ReadIndexClosure

type ReadIndexClosure struct {
	// contains filtered or unexported fields
}

func NewReadIndexClosure

func NewReadIndexClosure(f func(status entity.Status, index int64, reqCtx []byte), timeout time.Duration) *ReadIndexClosure

func (*ReadIndexClosure) Run

func (rc *ReadIndexClosure) Run(status entity.Status)

func (*ReadIndexClosure) SetResult

func (rc *ReadIndexClosure) SetResult(index int64, reqCtx []byte)

type ReadIndexEvent

type ReadIndexEvent struct {
	// contains filtered or unexported fields
}

func (*ReadIndexEvent) Name

func (re *ReadIndexEvent) Name() string

Topic of the event

func (*ReadIndexEvent) Sequence

func (re *ReadIndexEvent) Sequence() int64

The sequence number of the event

type ReadIndexEventSubscriber

type ReadIndexEventSubscriber struct {
	// contains filtered or unexported fields
}

func (*ReadIndexEventSubscriber) IgnoreExpireEvent

func (res *ReadIndexEventSubscriber) IgnoreExpireEvent() bool

func (*ReadIndexEventSubscriber) OnEvent

func (res *ReadIndexEventSubscriber) OnEvent(event utils.Event, endOfBatch bool)

func (*ReadIndexEventSubscriber) SubscribeType

func (res *ReadIndexEventSubscriber) SubscribeType() utils.Event

type ReadIndexResponseClosure

type ReadIndexResponseClosure struct {
	RpcResponseClosure
	// contains filtered or unexported fields
}

func NewReadIndexResponseClosure

func NewReadIndexResponseClosure(states []*ReadIndexState, req *raft.ReadIndexRequest) *ReadIndexResponseClosure

func (*ReadIndexResponseClosure) Run

func (rrc *ReadIndexResponseClosure) Run(status entity.Status)

type ReadIndexState

type ReadIndexState struct {
	Index int64

	Done *ReadIndexClosure
	// contains filtered or unexported fields
}

ReadIndexState 线性读的状态标识信息

func NewReadIndexState

func NewReadIndexState(reqCtx []byte, done *ReadIndexClosure, startTime time.Time) *ReadIndexState

type ReadIndexStatus

type ReadIndexStatus struct {
	States []*ReadIndexState
	Req    *raft.ReadIndexRequest
	Index  int64
}

func (*ReadIndexStatus) IsApplied

func (ris *ReadIndexStatus) IsApplied(appliedIndex int64) bool

type ReadOnlyOperator

type ReadOnlyOperator struct {
	// contains filtered or unexported fields
}

func (*ReadOnlyOperator) OnApplied

func (rop *ReadOnlyOperator) OnApplied(lastAppliedLogIndex int64)

OnApplied 监听当前状态机已经将哪一些 core.LogEntry 给 apply 成功了, 这里传入了当前最新的, appliedLogIndex

type ReadOnlyOption

// ReadOnlyOption is a string-typed option with the two values declared below.
type ReadOnlyOption string
const (
	// ReadOnlyLeaseBased decides, based on the local-clock lease, whether the
	// request can be handled directly.
	ReadOnlyLeaseBased ReadOnlyOption = "ReadOnlyLeaseBased"
	// ReadOnlySafe is the safe, consistent linearizable read: the leader confirms
	// with every follower that it really is the leader.
	ReadOnlySafe ReadOnlyOption = "ReadOnlySafe"
)

type RemoteFileCopierOption

type RemoteFileCopierOption struct {
	Uri              string
	SnapshotThrottle SnapshotThrottle
	Opts             SnapshotCopierOptions
}

type RemoteFileCopierOptions

type RemoteFileCopierOptions func(opt *RemoteFileCopierOption)

type Replicator

type Replicator struct {
	// contains filtered or unexported fields
}

Replicator 这个对象本身,是一个临界资源,log的发送要是竞争的

type ReplicatorEvent

type ReplicatorEvent int
const (
	ReplicatorCreatedEvent ReplicatorEvent = iota
	ReplicatorErrorEvent
	ReplicatorDestroyedEvent
)

type ReplicatorGroup

type ReplicatorGroup struct {
	// contains filtered or unexported fields
}

type ReplicatorState

type ReplicatorState int32
const (
	ReplicatorProbe ReplicatorState = iota
	ReplicatorSnapshot
	ReplicatorReplicate
	ReplicatorDestroyed
)

type ReplicatorStateListener

type ReplicatorStateListener interface {
	// OnCreate 创建出一个复制者
	OnCreate(peer entity.PeerId)
	// OnError 某个复制者出现错误
	OnError(peer entity.PeerId, st entity.Status)
	// OnDestroyed 某个复制者被销毁
	OnDestroyed(peer entity.PeerId)
}

ReplicatorStateListener 复制者状态变化的监听器

type ReplicatorType

type ReplicatorType string
const (
	ReplicatorFollower ReplicatorType = "Follower" // 可以参与投票的 Follower
	ReplicatorLearner  ReplicatorType = "Learner"  // 仅仅参与日志复制的 Learner,可以用来当作容灾的节点或者只读节点
)

func (ReplicatorType) IsFollower

func (rt ReplicatorType) IsFollower() bool

func (ReplicatorType) IsLearner

func (rt ReplicatorType) IsLearner() bool

type RequestType

type RequestType int
const (
	RequestTypeForSnapshot RequestType = iota
	RequestTypeForAppendEntries
)

type RequestVoteResponseClosure

type RequestVoteResponseClosure struct {
	RpcResponseClosure
}

type RpcOptions

type RpcOptions struct {
	RpcConnectTimeoutMs        int32
	RpcDefaultTimeout          int32
	RpcInstallSnapshotTimeout  int32
	RpcProcessorThreadPoolSize int32
	EnableRpcChecksum          bool
}

func NewDefaultRpcOptions

func NewDefaultRpcOptions() RpcOptions

type RpcRequestClosure

type RpcRequestClosure struct {
	F func(status entity.Status)
	// contains filtered or unexported fields
}

func NewRpcRequestClosure

func NewRpcRequestClosure(rpcCtx rpc.RpcContext) *RpcRequestClosure

func NewRpcRequestClosureWithDefaultResp

func NewRpcRequestClosureWithDefaultResp(rpcCtx rpc.RpcContext, defaultResp *api.ServerResponse) *RpcRequestClosure

func (*RpcRequestClosure) GetRpcCtx

func (rrc *RpcRequestClosure) GetRpcCtx() rpc.RpcContext

func (*RpcRequestClosure) Run

func (rrc *RpcRequestClosure) Run(status entity.Status)

func (*RpcRequestClosure) SendResponse

func (rrc *RpcRequestClosure) SendResponse(msg proto.Message)

type RpcResponseClosure

type RpcResponseClosure struct {
	// Resp receive response
	Resp proto.Message
	// F callback function
	F func(resp proto.Message, status entity.Status)
}

RpcResponseClosure deal rpc response closure

func (*RpcResponseClosure) Run

func (rrc *RpcResponseClosure) Run(status entity.Status)

type RunningState

type RunningState int
const (
	RunningStateForIdle RunningState = iota
	RunningStateForBlocking
	RunningStateForAppendingEntries
	RunningStateForInstallingSnapshot
)

type SaveSnapshotClosure

type SaveSnapshotClosure interface {
	Closure
	// Start 开启快照保存操作,这里创建出一个 writer,用于持久化文件信息
	Start(meta *raft.SnapshotMeta) SnapshotWriter
}

SaveSnapshotClosure 保存快照时的回调

type Snapshot

type Snapshot interface {
	GetPath() string

	ListFiles() []string

	GetFileMeta(fileName string) proto.Message

	SetError(code entity.RaftErrorCode, temp string, args ...interface{})

	GetStatus() entity.Status
}

Snapshot 快照的定义

type SnapshotCopier

type SnapshotCopier interface {
	io.Closer
	Cancel()

	Join()

	Start()

	GetReader() SnapshotReader

	GetStatus() entity.Status
}

SnapshotCopier 负责快照的拷贝工作

type SnapshotCopierOptions

type SnapshotCopierOptions struct {
	// contains filtered or unexported fields
}

type SnapshotExecutor

type SnapshotExecutor struct {
	// contains filtered or unexported fields
}

SnapshotExecutor 快照执行器

func (*SnapshotExecutor) DoSnapshot

func (se *SnapshotExecutor) DoSnapshot(done Closure)

DoSnapshot 执行一次创建 RaftSnapshot 的操作

func (*SnapshotExecutor) GetNode

func (se *SnapshotExecutor) GetNode() *nodeImpl

func (*SnapshotExecutor) GetSnapshotStorage

func (se *SnapshotExecutor) GetSnapshotStorage() *LocalSnapshotStorage

func (*SnapshotExecutor) Init

func (se *SnapshotExecutor) Init(arg interface{}) (bool, error)

func (*SnapshotExecutor) InstallSnapshot

func (se *SnapshotExecutor) InstallSnapshot(req *raft.InstallSnapshotRequest, done *RpcRequestClosure)

InstallSnapshot 安装一个快照

func (*SnapshotExecutor) IsInstallingSnapshot

func (se *SnapshotExecutor) IsInstallingSnapshot() bool

func (*SnapshotExecutor) Join

func (se *SnapshotExecutor) Join()

Join 等待所有的任务执行完成

func (*SnapshotExecutor) Shutdown

func (se *SnapshotExecutor) Shutdown()

type SnapshotExecutorOption

type SnapshotExecutorOption struct {
	Uri       string
	FsmCaller fsmCaller

	Addr                   entity.Endpoint
	FilterBeforeCopyRemote bool
	SnapshotThrottle       SnapshotThrottle
	// contains filtered or unexported fields
}

SnapshotExecutorOption 快照执行器的相关配置信息

type SnapshotExecutorOptions

type SnapshotExecutorOptions func(opt *SnapshotExecutorOption)

type SnapshotFileReader

type SnapshotFileReader struct {
	// contains filtered or unexported fields
}

func NewSnapshotFileReader

func NewSnapshotFileReader(path string, throttle SnapshotThrottle) *SnapshotFileReader

func (*SnapshotFileReader) GetLocalSnapshotMetaTable

func (sfr *SnapshotFileReader) GetLocalSnapshotMetaTable() *LocalSnapshotMetaTable

func (*SnapshotFileReader) GetPath

func (sfr *SnapshotFileReader) GetPath() string

func (*SnapshotFileReader) Open

func (sfr *SnapshotFileReader) Open() bool

func (*SnapshotFileReader) ReadFile

func (sfr *SnapshotFileReader) ReadFile(buf *bytes.Buffer, fileName string, offset, maxCount int64) (int64, error)

ReadFile 根据要求读取文件

func (*SnapshotFileReader) SetLocalSnapshotMetaTable

func (sfr *SnapshotFileReader) SetLocalSnapshotMetaTable(memtable *LocalSnapshotMetaTable)

type SnapshotJob

type SnapshotJob struct {
	// contains filtered or unexported fields
}

type SnapshotReader

type SnapshotReader interface {
	Snapshot

	Status() entity.Status

	Load() *raft.SnapshotMeta

	GenerateURIForCopy() string

	io.Closer
}

type SnapshotStorageOption

type SnapshotStorageOption struct {
	Uri     string
	RaftOpt RaftOption
}

type SnapshotStorageOptions

type SnapshotStorageOptions func(opt *SnapshotStorageOption)

type SnapshotThrottle

type SnapshotThrottle interface {
	//ThrottledByThroughput 计算下次可以发送多少数据
	ThrottledByThroughput(bytes int64) int64
}

SnapshotThrottle 快照传输的限流计算器

type SnapshotWriter

type SnapshotWriter interface {
	SaveMeta(meta raft.SnapshotMeta) bool

	AddFile(fileName string, meta proto.Message) bool

	RemoveFile(fileName string)

	Close(keepDataOnError bool)

	SetError(code entity.RaftErrorCode, temp string, args ...interface{})

	GetPath() string

	GetStatus() entity.Status
}

SnapshotWriter 负责快照的写入操作

type StableClosure

type StableClosure struct {
	// contains filtered or unexported fields
}

StableClosure

func NewStableClosure

func NewStableClosure(entries []*entity.LogEntry, f func(status entity.Status)) *StableClosure

func (*StableClosure) Run

func (sc *StableClosure) Run(status entity.Status)

type StableClosureEvent

type StableClosureEvent struct {
	// contains filtered or unexported fields
}

func (*StableClosureEvent) Name

func (sce *StableClosureEvent) Name() string

func (*StableClosureEvent) Sequence

func (sce *StableClosureEvent) Sequence() int64

type Stage

type Stage int16
const (
	StageNone Stage = iota
	StageCatchingUp
	StageJoint
	StageStable
)

type Stat

type Stat struct {
	// contains filtered or unexported fields
}

type StateMachine

type StateMachine interface {
	//OnApply 用户状态机进行批量的 apply 所有已经处于 committed 状态的 RaftLog
	OnApply(iterator Iterator)
	// OnShutdown 当状态机关闭时的回调
	OnShutdown()
	// OnSnapshotSave 用户需要在这个回调上实现快照的处理,这里的快照处理可以异步处理
	OnSnapshotSave(writer SnapshotWriter, done Closure)
	// OnSnapshotLoad 用户需要在这个回调上实现快照的 load 操作,这里不要进行异步化操作
	OnSnapshotLoad(reader SnapshotReader) bool
	// OnLeaderStart 当自己作为 leader 启动时的回调,告知当前最新的任期信息
	OnLeaderStart(term int64)
	// OnLeaderStop 当自己不再作为 leader 时的回调
	OnLeaderStop(status entity.Status)
	// OnError 状态机出现不可挽回的错误时的回调
	OnError(e entity.RaftError)
	// OnConfigurationCommitted 当集群配置被确认之后的回调
	OnConfigurationCommitted(conf *entity.Configuration)
	// OnStopFollowing
	OnStopFollowing(ctx entity.LeaderChangeContext)
	// OnStartFollowing
	OnStartFollowing(ctx entity.LeaderChangeContext)
}

StateMachine 用户状态机需要实现的回调接口,包括日志 apply、快照保存与加载、Leader/Follower 角色变化以及错误通知等回调

type StepDownJob

type StepDownJob struct {
	// contains filtered or unexported fields
}

StepDownJob 定时任务,仅在 Leader 节点上执行,主要是检查当前 RaftGroup 成员组内每个节点的健康情况,其判断依据主要是 Replicator 针对每一个节点的 Request-Response 的响应时间

type SynchronizedClosure

type SynchronizedClosure struct {
	// contains filtered or unexported fields
}

func NewSynchronizedClosure

func NewSynchronizedClosure(cnt int) *SynchronizedClosure

func (*SynchronizedClosure) Await

func (sc *SynchronizedClosure) Await() entity.Status

func (*SynchronizedClosure) GetStatus

func (sc *SynchronizedClosure) GetStatus() entity.Status

func (*SynchronizedClosure) Rest

func (sc *SynchronizedClosure) Rest()

func (*SynchronizedClosure) Run

func (sc *SynchronizedClosure) Run(status entity.Status)

type Task

type Task struct {
	Done       Closure
	ExpectTerm int64
	Data       []byte
}

type TaskClosure

type TaskClosure interface {
	Closure
	// OnCommitted after committed and before log applied
	OnCommitted()
}

TaskClosure 具体状态机的回调钩子

type TaskType

type TaskType int
const (
	TaskIdle TaskType = iota
	TaskCommitted
	TaskSnapshotSave
	TaskSnapshotLoad
	TaskLeaderStop
	TaskLeaderStart
	TaskStartFollowing
	TaskStopFollowing
	TaskShutdown
	TaskFlush
	TaskError
)

type ThroughputSnapshotThrottle

type ThroughputSnapshotThrottle struct {
	// contains filtered or unexported fields
}

func (*ThroughputSnapshotThrottle) ThrottledByThroughput

func (t *ThroughputSnapshotThrottle) ThrottledByThroughput(bytes int64) int64

ThrottledByThroughput 计算下次可以发送多少数据

type TimeoutNowResponseClosure

type TimeoutNowResponseClosure struct {
	RpcResponseClosure
}

type VoteJob

type VoteJob struct {
	// contains filtered or unexported fields
}

VoteJob 发起投票将自己提升为 Leader

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL