Documentation ¶
Index ¶
- Constants
- Variables
- func IsNodeActive(state NodeState) bool
- func NewReadIndexHeartbeatResponseClosure(done *ReadIndexResponseClosure, readIndexResp *raft.ReadIndexResponse, ...) *readIndexHeartbeatResponseClosure
- type AppendEntriesResponseClosure
- type ApplyTask
- type BallotBox
- func (bx *BallotBox) AppendPendingTask(conf, oldConf *entity.Configuration, done Closure) bool
- func (bx *BallotBox) ClearPendingTasks()
- func (bx *BallotBox) CommitAt(firstLogIndex, lastLogIndex int64, peer entity.PeerId) bool
- func (bx *BallotBox) GetLastCommittedIndex() int64
- func (bx *BallotBox) GetPendingIndex() int64
- func (bx *BallotBox) GetPendingMetaQueue() *utils.SegmentList
- func (bx *BallotBox) Init(opt *BallotBoxOptions)
- func (bx *BallotBox) RestPendingIndex(newPendingIndex int64) bool
- func (bx *BallotBox) SetLastCommittedIndex(lastCommittedIndex int64) (bool, error)
- func (bx *BallotBox) Shutdown()
- type BallotBoxOptions
- type CatchUpClosure
- type CliService
- func (cli *CliService) AddLearners(groupId string, learners []*entity.PeerId, conf *entity.Configuration) entity.Status
- func (cli *CliService) AddPeer(groupId string, peerId *entity.PeerId, conf *entity.Configuration) entity.Status
- func (cli *CliService) ChangePeer(groupId string, oldConf, newConf *entity.Configuration) entity.Status
- func (cli *CliService) GetAliveLearners(groupId string, peerId *entity.PeerId, conf *entity.Configuration) []*entity.PeerId
- func (cli *CliService) GetAlivePeers(groupId string, peerId *entity.PeerId, conf *entity.Configuration) []*entity.PeerId
- func (cli *CliService) GetLeader(groupId string, leaderId *entity.PeerId, conf *entity.Configuration) entity.Status
- func (cli *CliService) GetLearners(groupId string, peerId *entity.PeerId, conf *entity.Configuration) []*entity.PeerId
- func (cli *CliService) GetPeers(groupId string, peerId *entity.PeerId, conf *entity.Configuration) []*entity.PeerId
- func (cli *CliService) ReBalance(groupIds []string, balanceLeaderIds map[string]*entity.PeerId, ...) []*entity.PeerId
- func (cli *CliService) RemoveLearners(groupId string, learners []*entity.PeerId, conf *entity.Configuration) entity.Status
- func (cli *CliService) RemovePeer(groupId string, peerId *entity.PeerId, conf *entity.Configuration) entity.Status
- func (cli *CliService) ResetLearners(groupId string, learners []*entity.PeerId, conf *entity.Configuration) entity.Status
- func (cli *CliService) ResetPeer(groupId string, peerId *entity.PeerId, conf *entity.Configuration) entity.Status
- func (cli *CliService) Snapshot(groupId string, peerId *entity.PeerId) entity.Status
- func (cli *CliService) TransferLeader(groupId string, peerId *entity.PeerId, conf *entity.Configuration) entity.Status
- type Closure
- type ClosureQueue
- func (cq *ClosureQueue) AppendPendingClosure(closure Closure)
- func (cq *ClosureQueue) Clear()
- func (cq *ClosureQueue) GetFirstIndex() int64
- func (cq *ClosureQueue) GetQueue() list.List
- func (cq *ClosureQueue) IsEmpty() bool
- func (cq *ClosureQueue) PopClosureUntil(endIndex int64, closures []Closure, tasks []TaskClosure) int64
- func (cq *ClosureQueue) RestFirstIndex(firstIndex int64) bool
- type ConfigurationChangeClosure
- type ConfigurationCtx
- type CopySessionOption
- type CopySessionOptions
- type DownloadingSnapshot
- type ElectionJob
- type FSMCallerOptions
- type FileReader
- type FileService
- type FirstSnapshotLoadDone
- type GetFileResponseClosure
- type InFlight
- type InstallSnapshotResponseClosure
- type Iterator
- type IteratorImpl
- func (iti *IteratorImpl) Done() Closure
- func (iti *IteratorImpl) Entry() *entity.LogEntry
- func (iti *IteratorImpl) GetError() *entity.RaftError
- func (iti *IteratorImpl) GetIndex() int64
- func (iti *IteratorImpl) GetOrCreateError() *entity.RaftError
- func (iti *IteratorImpl) HasError() bool
- func (iti *IteratorImpl) IsGood() bool
- func (iti *IteratorImpl) Next()
- func (iti *IteratorImpl) RunTenRestClosureWithError()
- func (iti *IteratorImpl) SetErrorAndRollback(nTail int64, st *entity.Status) error
- type IteratorWrapper
- func (iw *IteratorWrapper) Done() Closure
- func (iw *IteratorWrapper) GetData() []byte
- func (iw *IteratorWrapper) GetIndex() int64
- func (iw *IteratorWrapper) GetTerm() int64
- func (iw *IteratorWrapper) HasNext() bool
- func (iw *IteratorWrapper) Next() []byte
- func (iw *IteratorWrapper) SetErrorAndRollback(nTail int64, st entity.Status) error
- type JobSwitch
- type JobType
- type LastAppliedLogIndexListener
- type LastLogIndexListener
- type LoadSnapshotClosure
- type LocalDirReader
- type LocalSnapshot
- type LocalSnapshotCopier
- func (copier *LocalSnapshotCopier) Cancel()
- func (copier *LocalSnapshotCopier) Close() error
- func (copier *LocalSnapshotCopier) GetReader() SnapshotReader
- func (copier *LocalSnapshotCopier) GetStatus() entity.Status
- func (copier *LocalSnapshotCopier) Init(uri string, opts SnapshotCopierOptions) error
- func (copier *LocalSnapshotCopier) Join()
- func (copier *LocalSnapshotCopier) SetError(code entity.RaftErrorCode, temp string, args ...interface{})
- func (copier *LocalSnapshotCopier) Start()
- type LocalSnapshotMetaTable
- func (metaT *LocalSnapshotMetaTable) AddFile(filename string, meta *raft.LocalFileMeta) bool
- func (metaT *LocalSnapshotMetaTable) GetFileMeta(filename string) *raft.LocalFileMeta
- func (metaT *LocalSnapshotMetaTable) GetMeta() *raft.SnapshotMeta
- func (metaT *LocalSnapshotMetaTable) ListFiles() []string
- type LocalSnapshotReader
- func (lsr *LocalSnapshotReader) Close() error
- func (lsr *LocalSnapshotReader) DestorySelf()
- func (lsr *LocalSnapshotReader) GenerateURIForCopy() string
- func (lsr *LocalSnapshotReader) GetFileMeta(fileName string) proto.Message
- func (lsr *LocalSnapshotReader) GetPath() string
- func (lsr *LocalSnapshotReader) GetSnapshotIndex() (int64, error)
- func (lsr *LocalSnapshotReader) GetStatus() entity.Status
- func (lsr *LocalSnapshotReader) Init(arg interface{}) error
- func (lsr *LocalSnapshotReader) ListFiles() []string
- func (lsr *LocalSnapshotReader) Load() *raft.SnapshotMeta
- func (lsr *LocalSnapshotReader) SetError(code entity.RaftErrorCode, temp string, args ...interface{})
- func (lsr *LocalSnapshotReader) Status() entity.Status
- type LocalSnapshotStorage
- func (storage *LocalSnapshotStorage) Close(writer *LocalSnapshotWriter, keepDataOnError bool)
- func (storage *LocalSnapshotStorage) CopyFrom(uri string, opts SnapshotCopierOptions) SnapshotReader
- func (storage *LocalSnapshotStorage) Create(fromEmpty bool) SnapshotWriter
- func (storage *LocalSnapshotStorage) CreateFromEmpty() SnapshotWriter
- func (storage *LocalSnapshotStorage) GetSnapshotPath(index int64) string
- func (storage *LocalSnapshotStorage) Init(arg interface{}) error
- func (storage *LocalSnapshotStorage) Open() SnapshotReader
- func (storage *LocalSnapshotStorage) SetFilterBeforeCopyRemote()
- func (storage *LocalSnapshotStorage) Shutdown()
- type LocalSnapshotWriter
- func (lsw *LocalSnapshotWriter) AddFile(fileName string, meta proto.Message) bool
- func (lsw *LocalSnapshotWriter) Close(keepDataOnError bool)
- func (lsw *LocalSnapshotWriter) GetPath() string
- func (lsw *LocalSnapshotWriter) GetSnapshotIndex() int64
- func (lsw *LocalSnapshotWriter) GetStatus() entity.Status
- func (lsw *LocalSnapshotWriter) Init(arg interface{}) error
- func (lsw *LocalSnapshotWriter) RemoveFile(fileName string)
- func (lsw *LocalSnapshotWriter) SaveMeta(meta raft.SnapshotMeta) bool
- func (lsw *LocalSnapshotWriter) SetError(code entity.RaftErrorCode, temp string, args ...interface{})
- func (lsw *LocalSnapshotWriter) Shutdown()
- func (copier *LocalSnapshotWriter) Sync() bool
- type LogEntryAndClosure
- type LogManager
- type LogManagerOption
- type LogManagerOptions
- type LogMgnEventType
- type LogStorage
- type LogStorageOption
- type LogStorageOptions
- type LogStorageType
- type NewLogCallback
- type Node
- type NodeOptions
- type NodeState
- type OnCaughtUp
- type OnErrorClosure
- type OnPreVoteRpcDone
- type OnRequestVoteRpcDone
- type RaftClientOperator
- func (rcop *RaftClientOperator) AppendEntries(endpoint entity.Endpoint, req *raft.AppendEntriesRequest, ...) mono.Mono
- func (rcop *RaftClientOperator) GetFile(endpoint entity.Endpoint, req *raft.GetFileRequest, done *RpcResponseClosure) mono.Mono
- func (rcop *RaftClientOperator) InstallSnapshot(endpoint entity.Endpoint, req *raft.InstallSnapshotRequest, ...) mono.Mono
- func (rcop *RaftClientOperator) PreVote(endpoint entity.Endpoint, req *raft.RequestVoteRequest, done *OnPreVoteRpcDone) mono.Mono
- func (rcop *RaftClientOperator) ReadIndex(endpoint entity.Endpoint, req *raft.ReadIndexRequest, ...) mono.Mono
- func (rcop *RaftClientOperator) RequestVote(endpoint entity.Endpoint, req *raft.RequestVoteRequest, ...) mono.Mono
- func (rcop *RaftClientOperator) TimeoutNow(endpoint entity.Endpoint, req *raft.TimeoutNowRequest, ...) mono.Mono
- type RaftMetaStorage
- type RaftMetaStorageOption
- type RaftMetaStorageOptions
- type RaftNodeJobManager
- type RaftOption
- type ReadIndexClosure
- type ReadIndexEvent
- type ReadIndexEventSubscriber
- type ReadIndexResponseClosure
- type ReadIndexState
- type ReadIndexStatus
- type ReadOnlyOperator
- type ReadOnlyOption
- type RemoteFileCopierOption
- type RemoteFileCopierOptions
- type Replicator
- type ReplicatorEvent
- type ReplicatorGroup
- type ReplicatorState
- type ReplicatorStateListener
- type ReplicatorType
- type RequestType
- type RequestVoteResponseClosure
- type RpcOptions
- type RpcRequestClosure
- type RpcResponseClosure
- type RunningState
- type SaveSnapshotClosure
- type Snapshot
- type SnapshotCopier
- type SnapshotCopierOptions
- type SnapshotExecutor
- func (se *SnapshotExecutor) DoSnapshot(done Closure)
- func (se *SnapshotExecutor) GetNode() *nodeImpl
- func (se *SnapshotExecutor) GetSnapshotStorage() *LocalSnapshotStorage
- func (se *SnapshotExecutor) Init(arg interface{}) (bool, error)
- func (se *SnapshotExecutor) InstallSnapshot(req *raft.InstallSnapshotRequest, done *RpcRequestClosure)
- func (se *SnapshotExecutor) IsInstallingSnapshot() bool
- func (se *SnapshotExecutor) Join()
- func (se *SnapshotExecutor) Shutdown()
- type SnapshotExecutorOption
- type SnapshotExecutorOptions
- type SnapshotFileReader
- func (sfr *SnapshotFileReader) GetLocalSnapshotMetaTable() *LocalSnapshotMetaTable
- func (sfr *SnapshotFileReader) GetPath() string
- func (sfr *SnapshotFileReader) Open() bool
- func (sfr *SnapshotFileReader) ReadFile(buf *bytes.Buffer, fileName string, offset, maxCount int64) (int64, error)
- func (sfr *SnapshotFileReader) SetLocalSnapshotMetaTable(memtable *LocalSnapshotMetaTable)
- type SnapshotJob
- type SnapshotReader
- type SnapshotStorageOption
- type SnapshotStorageOptions
- type SnapshotThrottle
- type SnapshotWriter
- type StableClosure
- type StableClosureEvent
- type Stage
- type Stat
- type StateMachine
- type StepDownJob
- type SynchronizedClosure
- type Task
- type TaskClosure
- type TaskType
- type ThroughputSnapshotThrottle
- type TimeoutNowResponseClosure
- type VoteJob
Constants ¶
const ( PENDING = iota COMPLETE TIMEOUT InvalidLogIndex = math.MinInt32 )
const ( RpcPending int32 = iota RpcRespond )
const ( RemoteSnapshotURISchema string = "remote://" SnapshotTmpPath string = "temp" InvalidSnapshotIndex int64 = math.MinInt32 )
const ( ErrSetLastCommittedIndex = "node changes to leader, pendingIndex=%d, param lastCommittedIndex=%d" ErrAppendPendingTask = "fail to appendingTask, pendingIndex=%d" )
const ( ConfPreFix string = "conf_" AppendLogRetryTimes = 50 StablePBMetaFileName = "stable_pb_meta" )
const (
ErrRollbackMsg = "StateMachine meet critical error when applying one or more tasks since index=%d, %s"
)
const (
InvalidReaderIndex int64 = math.MinInt16
)
Variables ¶
var (
DefalutCopySessionOption = CopySessionOption{}
)
var ( // FirstLogIdxKey 第一个LogIndex的名称 FirstLogIdxKey = []byte("meta/firstLogIndex") )
Functions ¶
func NewReadIndexHeartbeatResponseClosure ¶
func NewReadIndexHeartbeatResponseClosure(done *ReadIndexResponseClosure, readIndexResp *raft.ReadIndexResponse, quorum, peerSize int32) *readIndexHeartbeatResponseClosure
NewReadIndexHeartbeatResponseClosure readIndexResp 不涉及网络传输,根据从 Leader 返回的 AppendEntriesResponse 信息决定 readIndexResp 的内容是什么
Types ¶
type AppendEntriesResponseClosure ¶
type AppendEntriesResponseClosure struct {
RpcResponseClosure
}
type ApplyTask ¶
type BallotBox ¶
type BallotBox struct {
// contains filtered or unexported fields
}
func (*BallotBox) AppendPendingTask ¶
func (bx *BallotBox) AppendPendingTask(conf, oldConf *entity.Configuration, done Closure) bool
func (*BallotBox) ClearPendingTasks ¶
func (bx *BallotBox) ClearPendingTasks()
func (*BallotBox) GetLastCommittedIndex ¶
func (*BallotBox) GetPendingIndex ¶
func (*BallotBox) GetPendingMetaQueue ¶
func (bx *BallotBox) GetPendingMetaQueue() *utils.SegmentList
func (*BallotBox) Init ¶
func (bx *BallotBox) Init(opt *BallotBoxOptions)
func (*BallotBox) RestPendingIndex ¶
RestPendingIndex 重置待处理的 RaftLog 提案, 区间 [newPendingIndex, +∞) 内的提案都需要由新的 leader 继续发起提交
func (*BallotBox) SetLastCommittedIndex ¶
type BallotBoxOptions ¶
type BallotBoxOptions struct { Waiter fsmCaller ClosureQueue *ClosureQueue }
BallotBoxOptions 初始化投票器的配置信息
type CatchUpClosure ¶
type CatchUpClosure struct {
// contains filtered or unexported fields
}
func (*CatchUpClosure) GetFuture ¶
func (cuc *CatchUpClosure) GetFuture() common.Future
func (*CatchUpClosure) Run ¶
func (cuc *CatchUpClosure) Run(status entity.Status)
type CliService ¶
type CliService struct {
// contains filtered or unexported fields
}
func (*CliService) AddLearners ¶
func (cli *CliService) AddLearners(groupId string, learners []*entity.PeerId, conf *entity.Configuration) entity.Status
func (*CliService) AddPeer ¶
func (cli *CliService) AddPeer(groupId string, peerId *entity.PeerId, conf *entity.Configuration) entity.Status
func (*CliService) ChangePeer ¶
func (cli *CliService) ChangePeer(groupId string, oldConf, newConf *entity.Configuration) entity.Status
func (*CliService) GetAliveLearners ¶
func (cli *CliService) GetAliveLearners(groupId string, peerId *entity.PeerId, conf *entity.Configuration) []*entity.PeerId
func (*CliService) GetAlivePeers ¶
func (cli *CliService) GetAlivePeers(groupId string, peerId *entity.PeerId, conf *entity.Configuration) []*entity.PeerId
func (*CliService) GetLeader ¶
func (cli *CliService) GetLeader(groupId string, leaderId *entity.PeerId, conf *entity.Configuration) entity.Status
func (*CliService) GetLearners ¶
func (cli *CliService) GetLearners(groupId string, peerId *entity.PeerId, conf *entity.Configuration) []*entity.PeerId
func (*CliService) GetPeers ¶
func (cli *CliService) GetPeers(groupId string, peerId *entity.PeerId, conf *entity.Configuration) []*entity.PeerId
func (*CliService) ReBalance ¶
func (cli *CliService) ReBalance(groupIds []string, balanceLeaderIds map[string]*entity.PeerId, conf *entity.Configuration) []*entity.PeerId
func (*CliService) RemoveLearners ¶
func (cli *CliService) RemoveLearners(groupId string, learners []*entity.PeerId, conf *entity.Configuration) entity.Status
func (*CliService) RemovePeer ¶
func (cli *CliService) RemovePeer(groupId string, peerId *entity.PeerId, conf *entity.Configuration) entity.Status
func (*CliService) ResetLearners ¶
func (cli *CliService) ResetLearners(groupId string, learners []*entity.PeerId, conf *entity.Configuration) entity.Status
func (*CliService) ResetPeer ¶
func (cli *CliService) ResetPeer(groupId string, peerId *entity.PeerId, conf *entity.Configuration) entity.Status
func (*CliService) TransferLeader ¶
func (cli *CliService) TransferLeader(groupId string, peerId *entity.PeerId, conf *entity.Configuration) entity.Status
type ClosureQueue ¶
type ClosureQueue struct {
// contains filtered or unexported fields
}
func (*ClosureQueue) AppendPendingClosure ¶
func (cq *ClosureQueue) AppendPendingClosure(closure Closure)
func (*ClosureQueue) Clear ¶
func (cq *ClosureQueue) Clear()
func (*ClosureQueue) GetFirstIndex ¶
func (cq *ClosureQueue) GetFirstIndex() int64
func (*ClosureQueue) GetQueue ¶
func (cq *ClosureQueue) GetQueue() list.List
func (*ClosureQueue) IsEmpty ¶
func (cq *ClosureQueue) IsEmpty() bool
func (*ClosureQueue) PopClosureUntil ¶
func (cq *ClosureQueue) PopClosureUntil(endIndex int64, closures []Closure, tasks []TaskClosure) int64
PopClosureUntil 弹出 Closure 对象,直到 index == endIndex 时候停止继续弹出
func (*ClosureQueue) RestFirstIndex ¶
func (cq *ClosureQueue) RestFirstIndex(firstIndex int64) bool
RestFirstIndex
type ConfigurationChangeClosure ¶
type ConfigurationChangeClosure struct {
// contains filtered or unexported fields
}
ConfigurationChangeClosure 集群成员变更完成之后的回调
func (*ConfigurationChangeClosure) Run ¶
func (ccc *ConfigurationChangeClosure) Run(st entity.Status)
type ConfigurationCtx ¶
type ConfigurationCtx struct {
// contains filtered or unexported fields
}
func NewConfigurationCtx ¶
func NewConfigurationCtx(node *nodeImpl) *ConfigurationCtx
func (*ConfigurationCtx) IsBusy ¶
func (cc *ConfigurationCtx) IsBusy() bool
func (*ConfigurationCtx) Reset ¶
func (cc *ConfigurationCtx) Reset(st entity.Status)
func (*ConfigurationCtx) Start ¶
func (cc *ConfigurationCtx) Start(newConf, oldConf *entity.Configuration, done Closure)
Start 执行根据配置进行启动
type CopySessionOption ¶
type CopySessionOption struct { RpcService *RaftClientOperator SnapshotThrottle SnapshotThrottle RaftOpt RaftOption Endpoint entity.Endpoint // contains filtered or unexported fields }
type CopySessionOptions ¶
type CopySessionOptions func(opt *CopySessionOption)
type DownloadingSnapshot ¶
type DownloadingSnapshot struct { Request *raft.InstallSnapshotRequest Resp *raft.InstallSnapshotResponse RequestDone *RpcRequestClosure }
func NewDownloadingSnapshot ¶
func NewDownloadingSnapshot(req *raft.InstallSnapshotRequest, done *RpcRequestClosure) *DownloadingSnapshot
NewDownloadingSnapshot 创建一个快照下载的任务信息
type ElectionJob ¶
type ElectionJob struct {
// contains filtered or unexported fields
}
type FSMCallerOptions ¶
type FSMCallerOptions struct { LogManager LogManager // 用户的状态机对象 FSM StateMachine AfterShutdown Closure BootstrapID *entity.LogId ClosureQueue *ClosureQueue Node *nodeImpl }
FSMCallerOptions 初始化状态机的配置参数信息
type FileReader ¶
type FileService ¶
type FileService struct {
// contains filtered or unexported fields
}
FileService 负责管理 FileReader,每一个文件获取的请求,都对应着一个 FileReader 实例
var FileSvrInstance *FileService = &FileService{ fileReaderMap: &utils.ConcurrentMap{}, nextId: utils.NewAtomicInt64(), }
func (*FileService) AddReader ¶
func (fs *FileService) AddReader(reader FileReader) int64
AddReader 添加一个 FileReader 实例,并给它分配一个 ReaderId 编号用于查找
func (*FileService) HandleGetFile ¶
func (fs *FileService) HandleGetFile(req *protoM.GetFileRequest, closure *RpcRequestClosure) proto.Message
HandleGetFile 处理获取文件的请求操作
func (*FileService) RemoveReader ¶
func (fs *FileService) RemoveReader(readerId int64)
RemoveReader 移除一个 FileReader
type FirstSnapshotLoadDone ¶
type FirstSnapshotLoadDone struct {
// contains filtered or unexported fields
}
func (*FirstSnapshotLoadDone) Run ¶
func (fsl *FirstSnapshotLoadDone) Run(status entity.Status)
func (*FirstSnapshotLoadDone) Start ¶
func (fsl *FirstSnapshotLoadDone) Start() SnapshotReader
type GetFileResponseClosure ¶
type GetFileResponseClosure struct {
RpcResponseClosure
}
type InstallSnapshotResponseClosure ¶
type InstallSnapshotResponseClosure struct {
RpcResponseClosure
}
type IteratorImpl ¶
type IteratorImpl struct {
// contains filtered or unexported fields
}
func (*IteratorImpl) Done ¶
func (iti *IteratorImpl) Done() Closure
func (*IteratorImpl) Entry ¶
func (iti *IteratorImpl) Entry() *entity.LogEntry
func (*IteratorImpl) GetError ¶
func (iti *IteratorImpl) GetError() *entity.RaftError
func (*IteratorImpl) GetIndex ¶
func (iti *IteratorImpl) GetIndex() int64
func (*IteratorImpl) GetOrCreateError ¶
func (iti *IteratorImpl) GetOrCreateError() *entity.RaftError
func (*IteratorImpl) HasError ¶
func (iti *IteratorImpl) HasError() bool
func (*IteratorImpl) IsGood ¶
func (iti *IteratorImpl) IsGood() bool
IsGood 当前可以推进的位点小于需要到的 committed index 以及没有出现过错误
func (*IteratorImpl) Next ¶
func (iti *IteratorImpl) Next()
func (*IteratorImpl) RunTenRestClosureWithError ¶
func (iti *IteratorImpl) RunTenRestClosureWithError()
func (*IteratorImpl) SetErrorAndRollback ¶
func (iti *IteratorImpl) SetErrorAndRollback(nTail int64, st *entity.Status) error
type IteratorWrapper ¶
type IteratorWrapper struct {
// contains filtered or unexported fields
}
func NewIteratorWrapper ¶
func NewIteratorWrapper(impl *IteratorImpl) *IteratorWrapper
func (*IteratorWrapper) Done ¶
func (iw *IteratorWrapper) Done() Closure
func (*IteratorWrapper) GetData ¶
func (iw *IteratorWrapper) GetData() []byte
func (*IteratorWrapper) GetIndex ¶
func (iw *IteratorWrapper) GetIndex() int64
func (*IteratorWrapper) GetTerm ¶
func (iw *IteratorWrapper) GetTerm() int64
func (*IteratorWrapper) HasNext ¶
func (iw *IteratorWrapper) HasNext() bool
HasNext 只判断当前是否可以进行向前推进一个LogEntry以及当前是否是用户态的数据信息
func (*IteratorWrapper) Next ¶
func (iw *IteratorWrapper) Next() []byte
func (*IteratorWrapper) SetErrorAndRollback ¶
func (iw *IteratorWrapper) SetErrorAndRollback(nTail int64, st entity.Status) error
type LastAppliedLogIndexListener ¶
type LastAppliedLogIndexListener interface { //OnApplied 监听当前状态机已经将哪一些 core.LogEntry 给 apply 成功了, 这里传入了当前最新的, appliedLogIndex OnApplied(lastAppliedLogIndex int64) }
type LastLogIndexListener ¶
type LastLogIndexListener interface { //OnLastLogIndexChanged 当最新的LogIndex发生变化时的监听,这里不能panic error,所有的 error 必须自行 defer recover 处理 OnLastLogIndexChanged(lastLogIndex int64) }
LastLogIndexListener 监听最新的 RaftLog index
type LoadSnapshotClosure ¶
type LoadSnapshotClosure interface { Closure // Start 开始快照加载,返回一个快照文件的读取者 Start() SnapshotReader }
LoadSnapshotClosure 状态机的快照加载回调
type LocalDirReader ¶
type LocalDirReader struct {
// contains filtered or unexported fields
}
func (*LocalDirReader) GetPath ¶
func (ldr *LocalDirReader) GetPath() string
type LocalSnapshot ¶
type LocalSnapshot struct {
// contains filtered or unexported fields
}
LocalSnapshot 本地快照的描述对象
func (*LocalSnapshot) GetFileMeta ¶
func (ls *LocalSnapshot) GetFileMeta(fileName string) *raft.LocalFileMeta
func (*LocalSnapshot) GetPath ¶
func (ls *LocalSnapshot) GetPath() string
func (*LocalSnapshot) ListFiles ¶
func (ls *LocalSnapshot) ListFiles() []string
type LocalSnapshotCopier ¶
type LocalSnapshotCopier struct {
// contains filtered or unexported fields
}
LocalSnapshotCopier 本地快照文件的拷贝执行者
func (*LocalSnapshotCopier) Cancel ¶
func (copier *LocalSnapshotCopier) Cancel()
func (*LocalSnapshotCopier) Close ¶
func (copier *LocalSnapshotCopier) Close() error
func (*LocalSnapshotCopier) GetReader ¶
func (copier *LocalSnapshotCopier) GetReader() SnapshotReader
func (*LocalSnapshotCopier) GetStatus ¶
func (copier *LocalSnapshotCopier) GetStatus() entity.Status
func (*LocalSnapshotCopier) Init ¶
func (copier *LocalSnapshotCopier) Init(uri string, opts SnapshotCopierOptions) error
func (*LocalSnapshotCopier) Join ¶
func (copier *LocalSnapshotCopier) Join()
func (*LocalSnapshotCopier) SetError ¶
func (copier *LocalSnapshotCopier) SetError(code entity.RaftErrorCode, temp string, args ...interface{})
SetError 设置 entity.Status
func (*LocalSnapshotCopier) Start ¶
func (copier *LocalSnapshotCopier) Start()
type LocalSnapshotMetaTable ¶
type LocalSnapshotMetaTable struct {
// contains filtered or unexported fields
}
func NewLocalSnapshotMetaTable ¶
func NewLocalSnapshotMetaTable(raftOpt RaftOption) *LocalSnapshotMetaTable
NewLocalSnapshotMetaTable
func (*LocalSnapshotMetaTable) AddFile ¶
func (metaT *LocalSnapshotMetaTable) AddFile(filename string, meta *raft.LocalFileMeta) bool
AddFile 添加一个文件元数据信息描述
func (*LocalSnapshotMetaTable) GetFileMeta ¶
func (metaT *LocalSnapshotMetaTable) GetFileMeta(filename string) *raft.LocalFileMeta
GetFileMeta 根据文件名称获取本快照中的某一个文件的元信息数据
func (*LocalSnapshotMetaTable) GetMeta ¶
func (metaT *LocalSnapshotMetaTable) GetMeta() *raft.SnapshotMeta
func (*LocalSnapshotMetaTable) ListFiles ¶
func (metaT *LocalSnapshotMetaTable) ListFiles() []string
ListFiles 列出本快照的所有文件信息
type LocalSnapshotReader ¶
type LocalSnapshotReader struct {
// contains filtered or unexported fields
}
LocalSnapshotReader 本地快照读取者
func (*LocalSnapshotReader) Close ¶
func (lsr *LocalSnapshotReader) Close() error
func (*LocalSnapshotReader) DestorySelf ¶
func (lsr *LocalSnapshotReader) DestorySelf()
DestorySelf 销毁自己
func (*LocalSnapshotReader) GenerateURIForCopy ¶
func (lsr *LocalSnapshotReader) GenerateURIForCopy() string
GenerateURIForCopy 构建出一个 url 用于告诉 follower or learner 从哪里开始进行 snapshot 的数据拷贝
func (*LocalSnapshotReader) GetFileMeta ¶
func (lsr *LocalSnapshotReader) GetFileMeta(fileName string) proto.Message
func (*LocalSnapshotReader) GetPath ¶
func (lsr *LocalSnapshotReader) GetPath() string
func (*LocalSnapshotReader) GetSnapshotIndex ¶
func (lsr *LocalSnapshotReader) GetSnapshotIndex() (int64, error)
GetSnapshotIndex 获取快照的 Index 位点信息
func (*LocalSnapshotReader) GetStatus ¶
func (lsr *LocalSnapshotReader) GetStatus() entity.Status
func (*LocalSnapshotReader) Init ¶
func (lsr *LocalSnapshotReader) Init(arg interface{}) error
func (*LocalSnapshotReader) ListFiles ¶
func (lsr *LocalSnapshotReader) ListFiles() []string
ListFiles 列出当前的快照文件
func (*LocalSnapshotReader) Load ¶
func (lsr *LocalSnapshotReader) Load() *raft.SnapshotMeta
func (*LocalSnapshotReader) SetError ¶
func (lsr *LocalSnapshotReader) SetError(code entity.RaftErrorCode, temp string, args ...interface{})
func (*LocalSnapshotReader) Status ¶
func (lsr *LocalSnapshotReader) Status() entity.Status
type LocalSnapshotStorage ¶
type LocalSnapshotStorage struct {
// contains filtered or unexported fields
}
func (*LocalSnapshotStorage) Close ¶
func (storage *LocalSnapshotStorage) Close(writer *LocalSnapshotWriter, keepDataOnError bool)
func (*LocalSnapshotStorage) CopyFrom ¶
func (storage *LocalSnapshotStorage) CopyFrom(uri string, opts SnapshotCopierOptions) SnapshotReader
CopyFrom 从某一个地方开始拷贝 snapshot 数据
func (*LocalSnapshotStorage) Create ¶
func (storage *LocalSnapshotStorage) Create(fromEmpty bool) SnapshotWriter
Create 从本地快照管理者中创建出一个快照的写入执行者
func (*LocalSnapshotStorage) CreateFromEmpty ¶
func (storage *LocalSnapshotStorage) CreateFromEmpty() SnapshotWriter
func (*LocalSnapshotStorage) GetSnapshotPath ¶
func (storage *LocalSnapshotStorage) GetSnapshotPath(index int64) string
func (*LocalSnapshotStorage) Init ¶
func (storage *LocalSnapshotStorage) Init(arg interface{}) error
func (*LocalSnapshotStorage) Open ¶
func (storage *LocalSnapshotStorage) Open() SnapshotReader
Open 打开最近的一个快照文件
func (*LocalSnapshotStorage) SetFilterBeforeCopyRemote ¶
func (storage *LocalSnapshotStorage) SetFilterBeforeCopyRemote()
func (*LocalSnapshotStorage) Shutdown ¶
func (storage *LocalSnapshotStorage) Shutdown()
type LocalSnapshotWriter ¶
type LocalSnapshotWriter struct {
// contains filtered or unexported fields
}
LocalSnapshotWriter 本地快照的写入操作者
func (*LocalSnapshotWriter) AddFile ¶
func (lsw *LocalSnapshotWriter) AddFile(fileName string, meta proto.Message) bool
func (*LocalSnapshotWriter) Close ¶
func (lsw *LocalSnapshotWriter) Close(keepDataOnError bool)
func (*LocalSnapshotWriter) GetPath ¶
func (lsw *LocalSnapshotWriter) GetPath() string
func (*LocalSnapshotWriter) GetSnapshotIndex ¶
func (lsw *LocalSnapshotWriter) GetSnapshotIndex() int64
func (*LocalSnapshotWriter) GetStatus ¶
func (lsw *LocalSnapshotWriter) GetStatus() entity.Status
func (*LocalSnapshotWriter) Init ¶
func (lsw *LocalSnapshotWriter) Init(arg interface{}) error
func (*LocalSnapshotWriter) RemoveFile ¶
func (lsw *LocalSnapshotWriter) RemoveFile(fileName string)
func (*LocalSnapshotWriter) SaveMeta ¶
func (lsw *LocalSnapshotWriter) SaveMeta(meta raft.SnapshotMeta) bool
func (*LocalSnapshotWriter) SetError ¶
func (lsw *LocalSnapshotWriter) SetError(code entity.RaftErrorCode, temp string, args ...interface{})
func (*LocalSnapshotWriter) Shutdown ¶
func (lsw *LocalSnapshotWriter) Shutdown()
type LogEntryAndClosure ¶
type LogEntryAndClosure struct { Entry *entity.LogEntry Done Closure ExpectedTerm int64 Latch *sync.WaitGroup }
func (*LogEntryAndClosure) Name ¶
func (lac *LogEntryAndClosure) Name() string
func (*LogEntryAndClosure) Reset ¶
func (lac *LogEntryAndClosure) Reset()
func (*LogEntryAndClosure) Sequence ¶
func (lac *LogEntryAndClosure) Sequence() int64
type LogManager ¶
type LogManager interface { utils.LifeCycle // AddLastLogIndexListener AddLastLogIndexListener(listener LastLogIndexListener) // RemoveLogIndexListener RemoveLogIndexListener(listener LastLogIndexListener) // AppendEntries 追加多个 RaftLog 日志条目 AppendEntries(entries []*entity.LogEntry, done *StableClosure) error // SetSnapshot SetSnapshot(meta *raft.SnapshotMeta) // ClearBufferedLogs ClearBufferedLogs() // GetEntry GetEntry(index int64) *entity.LogEntry // GetTerm GetTerm(index int64) int64 // GetFirstLogIndex GetFirstLogIndex() int64 // GetLastLogIndex GetLastLogIndex() int64 // GetLastLogID GetLastLogID(isFlush bool) entity.LogId // GetConfiguration GetConfiguration(index int64) (*entity.ConfigurationEntry, error) // CheckAndSetConfiguration CheckAndSetConfiguration(current *entity.ConfigurationEntry) *entity.ConfigurationEntry // Wait Wait(expectedLastLogIndex int64, cb NewLogCallback, replicator *Replicator) int64 // RemoveWaiter RemoveWaiter(id int64) bool // SetAppliedID SetAppliedID(appliedID entity.LogId) // CheckConsistency CheckConsistency() entity.Status }
LogManager Raft Log 日志管理器
type LogManagerOption ¶
type LogManagerOption struct {
MaxEventQueueSize int64
}
type LogManagerOptions ¶
type LogManagerOptions func(opt *LogManagerOption)
type LogMgnEventType ¶
type LogMgnEventType int32
const ( LMgnEventForOther LogMgnEventType = iota LMgnEventForReset LMgnEventForTruncatePrefix LMgnEventForTruncateSuffix LMgnEventForShutdown LMgnEventForLastLogID )
type LogStorage ¶
type LogStorage interface { //GetFirstLogIndex GetFirstLogIndex() int64 //GetLastLogIndex GetLastLogIndex() int64 //GetEntry GetEntry(index int64) *entity.LogEntry //GetTerm GetTerm(index int64) int64 //AppendEntry AppendEntry(entry *entity.LogEntry) (bool, error) //AppendEntries 将多个 LogEntry 刷入到磁盘存储中去 AppendEntries(entries []*entity.LogEntry) (int, error) //TruncatePrefix 清理 firstIndexKept 之前的所有数据 TruncatePrefix(firstIndexKept int64) bool //TruncateSuffix 清理 lastIndexKept 之后的所有数据 TruncateSuffix(lastIndexKept int64) bool //Rest is always used when installing a snapshot from the leader; it clears all existing logs and sets nextLogIndex Rest(nextLogIndex int64) (bool, error) //Shutdown 关闭 Shutdown() error }
LogStorage 实际的日志存储者
func NewLogStorage ¶
func NewLogStorage(storageType LogStorageType, options ...LogStorageOptions) (LogStorage, error)
type LogStorageOption ¶
type LogStorageOptions ¶
type LogStorageOptions func(opt *LogStorageOption)
type LogStorageType ¶
type LogStorageType int16
const ( PebbleLogStorageType LogStorageType = iota SimpleStorageType )
type NewLogCallback ¶
type NewLogCallback interface { // OnNewLog OnNewLog(arg *Replicator, errCode entity.RaftErrorCode) }
NewLogCallback
type Node ¶
type Node interface { GetLeaderID() entity.PeerId GetNodeID() entity.NodeId GetGroupID() string GetOptions() NodeOptions GetRaftOptions() RaftOption IsLeader() bool Shutdown(done Closure) Join() Apply(task *Task) error ReadIndex(reqCtx []byte, done *ReadIndexClosure) error ListPeers() ([]entity.PeerId, error) ListAlivePeers() ([]entity.PeerId, error) ListLearners() ([]entity.PeerId, error) ListAliveLearners() ([]entity.PeerId, error) AddPeer(peer entity.PeerId, done Closure) RemovePeer(peer entity.PeerId, done Closure) ChangePeers(newConf *entity.Configuration, done Closure) ResetPeers(newConf *entity.Configuration) entity.Status AddLearners(learners []entity.PeerId, done Closure) RemoveLearners(learners []entity.PeerId, done Closure) ResetLearners(learners []entity.PeerId, done Closure) Snapshot(done Closure) ResetElectionTimeoutMs(electionTimeoutMs int32) TransferLeadershipTo(peer entity.PeerId) entity.Status AddReplicatorStateListener(replicatorStateListener ReplicatorStateListener) RemoveReplicatorStateListener(replicatorStateListener ReplicatorStateListener) ClearReplicatorStateListeners() GetReplicatorStatueListeners() []ReplicatorStateListener GetNodeTargetPriority() int32 }
type NodeOptions ¶
type NodeOptions struct { ElectionTimeoutMs int64 ElectionMaxDelayMs int64 ElectionPriority entity.ElectionPriority DecayPriorityGap int32 LeaderLeaseTimeRatio int32 SnapshotIntervalSecs int32 // 做 snapshot 时,判断上次的 snapshot 的 raft log index 与最新的 raft log applied index 的距离,在一定距离之内不做 snapshot SnapshotLogIndexMargin int32 // 只有当 SnapshotLogIndexMargin 存在正确的值时才能生效 MaxSkipSnapshotTimes int16 CatchupMargin int32 InitialConf *entity.Configuration Fsm StateMachine LogURI string RaftMetaURI string SnapshotURI string FilterBeforeCopyRemote bool DisableCli bool CliRpcGoroutinePoolSize int32 RaftRpcGoroutinePoolSize int32 EnableMetrics bool SnapshotThrottle SnapshotThrottle }
func NewDefaultNodeOptions ¶
func NewDefaultNodeOptions() NodeOptions
type NodeState ¶
type NodeState int
const ( // It's a leader StateLeader NodeState = iota // It's transferring leadership StateTransferring // It's a candidate StateCandidate // It's a follower StateFollower // It's in error StateError // It's uninitialized StateUninitialized // It's shutting down StateShutting // It's shutdown already StateShutdown // State end StateEnd )
type OnCaughtUp ¶
type OnCaughtUp struct {
// contains filtered or unexported fields
}
OnCaughtUp 当 Follower 成功追上数据或未追上数据时执行的操作
type OnErrorClosure ¶
func (*OnErrorClosure) Run ¶
func (oec *OnErrorClosure) Run(status entity.Status)
type OnPreVoteRpcDone ¶
type OnPreVoteRpcDone struct { RpcResponseClosure PeerId entity.PeerId Term int64 StartTime time.Time Req *raft.RequestVoteRequest }
type OnRequestVoteRpcDone ¶
type OnRequestVoteRpcDone struct { RpcResponseClosure PeerId entity.PeerId Term int64 StartTime time.Time Req *raft.RequestVoteRequest // contains filtered or unexported fields }
type RaftClientOperator ¶
type RaftClientOperator struct {
// contains filtered or unexported fields
}
RaftClient 的一些操作
func NewRaftClientOperator ¶
func NewRaftClientOperator(nodeOpt *NodeOptions, raftClient *rpc.RaftClient, replicateGroup *ReplicatorGroup) *RaftClientOperator
func (*RaftClientOperator) AppendEntries ¶
func (rcop *RaftClientOperator) AppendEntries(endpoint entity.Endpoint, req *raft.AppendEntriesRequest, done *AppendEntriesResponseClosure) mono.Mono
func (*RaftClientOperator) GetFile ¶
func (rcop *RaftClientOperator) GetFile(endpoint entity.Endpoint, req *raft.GetFileRequest, done *RpcResponseClosure) mono.Mono
func (*RaftClientOperator) InstallSnapshot ¶
func (rcop *RaftClientOperator) InstallSnapshot(endpoint entity.Endpoint, req *raft.InstallSnapshotRequest, done *InstallSnapshotResponseClosure) mono.Mono
func (*RaftClientOperator) PreVote ¶
func (rcop *RaftClientOperator) PreVote(endpoint entity.Endpoint, req *raft.RequestVoteRequest, done *OnPreVoteRpcDone) mono.Mono
func (*RaftClientOperator) ReadIndex ¶
func (rcop *RaftClientOperator) ReadIndex(endpoint entity.Endpoint, req *raft.ReadIndexRequest, done *ReadIndexResponseClosure) mono.Mono
func (*RaftClientOperator) RequestVote ¶
func (rcop *RaftClientOperator) RequestVote(endpoint entity.Endpoint, req *raft.RequestVoteRequest, done *OnRequestVoteRpcDone) mono.Mono
func (*RaftClientOperator) TimeoutNow ¶
func (rcop *RaftClientOperator) TimeoutNow(endpoint entity.Endpoint, req *raft.TimeoutNowRequest, done *TimeoutNowResponseClosure) mono.Mono
type RaftMetaStorage ¶
type RaftMetaStorage struct {
// contains filtered or unexported fields
}
type RaftMetaStorageOption ¶
type RaftMetaStorageOption struct { Path string RaftOpt RaftOption Node *nodeImpl }
type RaftMetaStorageOptions ¶
type RaftMetaStorageOptions func(opt *RaftMetaStorageOption)
type RaftNodeJobManager ¶
type RaftNodeJobManager struct {
// contains filtered or unexported fields
}
func NewRaftNodeJobManager ¶
func NewRaftNodeJobManager(node *nodeImpl) *RaftNodeJobManager
type RaftOption ¶
type RaftOption struct { // 是否开启每个 Log 都做一个 checksum 校验 EnableLogEntryChecksum bool // StepDownWhenVoteTimeout bool ReadOnlyOpt ReadOnlyOption MaxReplicatorInflightMsgs int64 MaxEntriesSize int32 MaxAppendBufferEntries int32 MaxBodySize int32 MaxByteCountPerRpc int32 }
RaftOption raft 的相关配置参数
type ReadIndexClosure ¶
type ReadIndexClosure struct {
// contains filtered or unexported fields
}
func NewReadIndexClosure ¶
func (*ReadIndexClosure) Run ¶
func (rc *ReadIndexClosure) Run(status entity.Status)
func (*ReadIndexClosure) SetResult ¶
func (rc *ReadIndexClosure) SetResult(index int64, reqCtx []byte)
type ReadIndexEvent ¶
type ReadIndexEvent struct {
// contains filtered or unexported fields
}
func (*ReadIndexEvent) Sequence ¶
func (re *ReadIndexEvent) Sequence() int64
The sequence number of the event
type ReadIndexEventSubscriber ¶
type ReadIndexEventSubscriber struct {
// contains filtered or unexported fields
}
func (*ReadIndexEventSubscriber) IgnoreExpireEvent ¶
func (res *ReadIndexEventSubscriber) IgnoreExpireEvent() bool
func (*ReadIndexEventSubscriber) OnEvent ¶
func (res *ReadIndexEventSubscriber) OnEvent(event utils.Event, endOfBatch bool)
func (*ReadIndexEventSubscriber) SubscribeType ¶
func (res *ReadIndexEventSubscriber) SubscribeType() utils.Event
type ReadIndexResponseClosure ¶
type ReadIndexResponseClosure struct { RpcResponseClosure // contains filtered or unexported fields }
func NewReadIndexResponseClosure ¶
func NewReadIndexResponseClosure(states []*ReadIndexState, req *raft.ReadIndexRequest) *ReadIndexResponseClosure
func (*ReadIndexResponseClosure) Run ¶
func (rrc *ReadIndexResponseClosure) Run(status entity.Status)
type ReadIndexState ¶
type ReadIndexState struct { Index int64 Done *ReadIndexClosure // contains filtered or unexported fields }
ReadIndexState 线性读的状态标识信息
func NewReadIndexState ¶
func NewReadIndexState(reqCtx []byte, done *ReadIndexClosure, startTime time.Time) *ReadIndexState
type ReadIndexStatus ¶
type ReadIndexStatus struct { States []*ReadIndexState Req *raft.ReadIndexRequest Index int64 }
func (*ReadIndexStatus) IsApplied ¶
func (ris *ReadIndexStatus) IsApplied(appliedIndex int64) bool
type ReadOnlyOperator ¶
type ReadOnlyOperator struct {
// contains filtered or unexported fields
}
func (*ReadOnlyOperator) OnApplied ¶
func (rop *ReadOnlyOperator) OnApplied(lastAppliedLogIndex int64)
OnApplied 监听当前状态机已经成功 apply 了哪些 core.LogEntry, 这里传入的是当前最新的 appliedLogIndex
type ReadOnlyOption ¶
type ReadOnlyOption string
const ( // ReadOnlyLeaseBased 根据本地时钟租期判断是否可以直接处理 ReadOnlyLeaseBased ReadOnlyOption = "ReadOnlyLeaseBased" // 安全的一致性线性读操作,leader 去跟每个 follower 去确认自己是否是真正的 leader ReadOnlySafe ReadOnlyOption = "ReadOnlySafe" )
type RemoteFileCopierOption ¶
type RemoteFileCopierOption struct { Uri string SnapshotThrottle SnapshotThrottle Opts SnapshotCopierOptions }
type RemoteFileCopierOptions ¶
type RemoteFileCopierOptions func(opt *RemoteFileCopierOption)
type Replicator ¶
type Replicator struct {
// contains filtered or unexported fields
}
Replicator 这个对象本身,是一个临界资源,log的发送要是竞争的
type ReplicatorEvent ¶
type ReplicatorEvent int
const ( ReplicatorCreatedEvent ReplicatorEvent = iota ReplicatorErrorEvent ReplicatorDestroyedEvent )
type ReplicatorGroup ¶
type ReplicatorGroup struct {
// contains filtered or unexported fields
}
type ReplicatorState ¶
type ReplicatorState int32
const ( ReplicatorProbe ReplicatorState = iota ReplicatorSnapshot ReplicatorReplicate ReplicatorDestroyed )
type ReplicatorStateListener ¶
type ReplicatorStateListener interface { // OnCreate 创建出一个复制者 OnCreate(peer entity.PeerId) // OnError 某个复制者出现错误 OnError(peer entity.PeerId, st entity.Status) // OnDestroyed 某个复制者被销毁 OnDestroyed(peer entity.PeerId) }
ReplicatorStateListener 复制者状态变化的监听器
type ReplicatorType ¶
type ReplicatorType string
const ( ReplicatorFollower ReplicatorType = "Follower" // 可以参与投票的 Follower ReplicatorLearner ReplicatorType = "Learner" // 仅仅参与日志复制的 Learner,可以用来当作容灾的节点或者只读节点 )
func (ReplicatorType) IsFollower ¶
func (rt ReplicatorType) IsFollower() bool
func (ReplicatorType) IsLearner ¶
func (rt ReplicatorType) IsLearner() bool
type RequestType ¶
type RequestType int
const ( RequestTypeForSnapshot RequestType = iota RequestTypeForAppendEntries )
type RequestVoteResponseClosure ¶
type RequestVoteResponseClosure struct {
RpcResponseClosure
}
type RpcOptions ¶
type RpcOptions struct { RpcConnectTimeoutMs int32 RpcDefaultTimeout int32 RpcInstallSnapshotTimeout int32 RpcProcessorThreadPoolSize int32 EnableRpcChecksum bool }
func NewDefaultRpcOptions ¶
func NewDefaultRpcOptions() RpcOptions
type RpcRequestClosure ¶
type RpcRequestClosure struct { F func(status entity.Status) // contains filtered or unexported fields }
func NewRpcRequestClosure ¶
func NewRpcRequestClosure(rpcCtx rpc.RpcContext) *RpcRequestClosure
func NewRpcRequestClosureWithDefaultResp ¶
func NewRpcRequestClosureWithDefaultResp(rpcCtx rpc.RpcContext, defaultResp *api.ServerResponse) *RpcRequestClosure
func (*RpcRequestClosure) GetRpcCtx ¶
func (rrc *RpcRequestClosure) GetRpcCtx() rpc.RpcContext
func (*RpcRequestClosure) Run ¶
func (rrc *RpcRequestClosure) Run(status entity.Status)
func (*RpcRequestClosure) SendResponse ¶
func (rrc *RpcRequestClosure) SendResponse(msg proto.Message)
type RpcResponseClosure ¶
type RpcResponseClosure struct { // Resp receive response Resp proto.Message // F callback function F func(resp proto.Message, status entity.Status) }
RpcResponseClosure is the closure that handles an RPC response
func (*RpcResponseClosure) Run ¶
func (rrc *RpcResponseClosure) Run(status entity.Status)
type RunningState ¶
type RunningState int
const ( RunningStateForIdle RunningState = iota RunningStateForBlocking RunningStateForAppendingEntries RunningStateForInstallingSnapshot )
type SaveSnapshotClosure ¶
type SaveSnapshotClosure interface { Closure // Start 开启快照保存操作,这里创建出一个 writer,用于持久化文件信息 Start(meta *raft.SnapshotMeta) SnapshotWriter }
SaveSnapshotClosure 保存快照时的回调
type Snapshot ¶
type Snapshot interface { GetPath() string ListFiles() []string GetFileMeta(fileName string) proto.Message SetError(code entity.RaftErrorCode, temp string, args ...interface{}) GetStatus() entity.Status }
Snapshot 快照的定义
type SnapshotCopier ¶
type SnapshotCopier interface { io.Closer Cancel() Join() Start() GetReader() SnapshotReader GetStatus() entity.Status }
SnapshotCopier 负责快照的拷贝工作
type SnapshotCopierOptions ¶
type SnapshotCopierOptions struct {
// contains filtered or unexported fields
}
type SnapshotExecutor ¶
type SnapshotExecutor struct {
// contains filtered or unexported fields
}
SnapshotExecutor 快照执行器
func (*SnapshotExecutor) DoSnapshot ¶
func (se *SnapshotExecutor) DoSnapshot(done Closure)
DoSnapshot 执行创建一次 RaftSnapshot 操作
func (*SnapshotExecutor) GetNode ¶
func (se *SnapshotExecutor) GetNode() *nodeImpl
func (*SnapshotExecutor) GetSnapshotStorage ¶
func (se *SnapshotExecutor) GetSnapshotStorage() *LocalSnapshotStorage
func (*SnapshotExecutor) Init ¶
func (se *SnapshotExecutor) Init(arg interface{}) (bool, error)
func (*SnapshotExecutor) InstallSnapshot ¶
func (se *SnapshotExecutor) InstallSnapshot(req *raft.InstallSnapshotRequest, done *RpcRequestClosure)
InstallSnapshot 安装一个快照
func (*SnapshotExecutor) IsInstallingSnapshot ¶
func (se *SnapshotExecutor) IsInstallingSnapshot() bool
func (*SnapshotExecutor) Shutdown ¶
func (se *SnapshotExecutor) Shutdown()
type SnapshotExecutorOption ¶
type SnapshotExecutorOption struct { Uri string FsmCaller fsmCaller Addr entity.Endpoint FilterBeforeCopyRemote bool SnapshotThrottle SnapshotThrottle // contains filtered or unexported fields }
快照执行器的相关配置信息
type SnapshotExecutorOptions ¶
type SnapshotExecutorOptions func(opt *SnapshotExecutorOption)
type SnapshotFileReader ¶
type SnapshotFileReader struct {
// contains filtered or unexported fields
}
func NewSnapshotFileReader ¶
func NewSnapshotFileReader(path string, throttle SnapshotThrottle) *SnapshotFileReader
func (*SnapshotFileReader) GetLocalSnapshotMetaTable ¶
func (sfr *SnapshotFileReader) GetLocalSnapshotMetaTable() *LocalSnapshotMetaTable
func (*SnapshotFileReader) GetPath ¶
func (sfr *SnapshotFileReader) GetPath() string
func (*SnapshotFileReader) Open ¶
func (sfr *SnapshotFileReader) Open() bool
func (*SnapshotFileReader) ReadFile ¶
func (sfr *SnapshotFileReader) ReadFile(buf *bytes.Buffer, fileName string, offset, maxCount int64) (int64, error)
ReadFile 根据要求读取文件
func (*SnapshotFileReader) SetLocalSnapshotMetaTable ¶
func (sfr *SnapshotFileReader) SetLocalSnapshotMetaTable(memtable *LocalSnapshotMetaTable)
type SnapshotJob ¶
type SnapshotJob struct {
// contains filtered or unexported fields
}
type SnapshotReader ¶
type SnapshotStorageOption ¶
type SnapshotStorageOption struct { Uri string RaftOpt RaftOption }
type SnapshotStorageOptions ¶
type SnapshotStorageOptions func(opt *SnapshotStorageOption)
type SnapshotThrottle ¶
type SnapshotThrottle interface { //ThrottledByThroughput 计算下次可以发送多少数据 ThrottledByThroughput(bytes int64) int64 }
SnapshotThrottle 快照传输的限流计算器
type SnapshotWriter ¶
type SnapshotWriter interface { SaveMeta(meta raft.SnapshotMeta) bool AddFile(fileName string, meta proto.Message) bool RemoveFile(fileName string) Close(keepDataOnError bool) SetError(code entity.RaftErrorCode, temp string, args ...interface{}) GetPath() string GetStatus() entity.Status }
SnapshotWriter 负责快照的写入操作
type StableClosure ¶
type StableClosure struct {
// contains filtered or unexported fields
}
StableClosure
func NewStableClosure ¶
func NewStableClosure(entries []*entity.LogEntry, f func(status entity.Status)) *StableClosure
func (*StableClosure) Run ¶
func (sc *StableClosure) Run(status entity.Status)
type StableClosureEvent ¶
type StableClosureEvent struct {
// contains filtered or unexported fields
}
func (*StableClosureEvent) Name ¶
func (sce *StableClosureEvent) Name() string
func (*StableClosureEvent) Sequence ¶
func (sce *StableClosureEvent) Sequence() int64
type StateMachine ¶
type StateMachine interface { //OnApply 用户状态机进行批量的 apply 所有已经处于 committed 状态的 RaftLog OnApply(iterator Iterator) // OnShutdown 当状态机关闭时的回调 OnShutdown() // OnSnapshotSave 用户需要在这个回调上实现快照的处理,这里的快照处理可以异步处理 OnSnapshotSave(writer SnapshotWriter, done Closure) // OnSnapshotLoad 用户需要在这个回调上实现快照的 load 操作,这里不要进行异步化操作 OnSnapshotLoad(reader SnapshotReader) bool // OnLeaderStart 当自己作为 leader 启动时的回调,告知当前最新的任期信息 OnLeaderStart(term int64) // OnLeaderStop 当自己不再作为 leader 时的回调 OnLeaderStop(status entity.Status) // OnError 状态机出现不可挽回的错误时的回调 OnError(e entity.RaftError) // OnConfigurationCommitted 当集群配置被确认之后的回调 OnConfigurationCommitted(conf *entity.Configuration) // OnStopFollowing OnStopFollowing(ctx entity.LeaderChangeContext) // OnStartFollowing OnStartFollowing(ctx entity.LeaderChangeContext) }
StateMachine
type StepDownJob ¶
type StepDownJob struct {
// contains filtered or unexported fields
}
StepDownJob 定时任务,只针对于 Leader 节点可以进行,主要是检查当前 RaftGroup 成员组内每个节点的健康情况,其主要还是通过 Replicator 针对每一个节点的 Request-Response 的响应时间来判断
type SynchronizedClosure ¶
type SynchronizedClosure struct {
// contains filtered or unexported fields
}
func NewSynchronizedClosure ¶
func NewSynchronizedClosure(cnt int) *SynchronizedClosure
func (*SynchronizedClosure) Await ¶
func (sc *SynchronizedClosure) Await() entity.Status
func (*SynchronizedClosure) GetStatus ¶
func (sc *SynchronizedClosure) GetStatus() entity.Status
func (*SynchronizedClosure) Rest ¶
func (sc *SynchronizedClosure) Rest()
func (*SynchronizedClosure) Run ¶
func (sc *SynchronizedClosure) Run(status entity.Status)
type TaskClosure ¶
type TaskClosure interface { Closure // OnCommitted after committed and before log applied OnCommitted() }
TaskClosure 具体状态机的回调钩子
type ThroughputSnapshotThrottle ¶
type ThroughputSnapshotThrottle struct {
// contains filtered or unexported fields
}
func (*ThroughputSnapshotThrottle) ThrottledByThroughput ¶
func (t *ThroughputSnapshotThrottle) ThrottledByThroughput(bytes int64) int64
ThrottledByThroughput 计算下次可以发送多少数据
type TimeoutNowResponseClosure ¶
type TimeoutNowResponseClosure struct {
RpcResponseClosure
}