Documentation ¶
Index ¶
- Constants
- Variables
- func IsStringEqualAny(a, b []string) bool
- type GroupConfig
- type LogStreamWrapper
- type MockLogStreamWrapper
- func (l *MockLogStreamWrapper) ConsumeMessages(startOffset int64, maxBytes int32) ([]logstream.Entry, error, bool)
- func (l *MockLogStreamWrapper) GetHighWater() int64
- func (l *MockLogStreamWrapper) GetLeaderNodeID() uint64
- func (r *MockLogStreamWrapper) GetMaxSizePerMsg() int
- func (l *MockLogStreamWrapper) GetMessageArriveChan() <-chan struct{}
- func (l *MockLogStreamWrapper) GetVDLServerInfo() ([]*apicommon.VDLServerInfo, error)
- func (l *MockLogStreamWrapper) IsLeader() bool
- func (l *MockLogStreamWrapper) MinOffset() int64
- func (l *MockLogStreamWrapper) StoreMessage(b []byte) (<-chan interface{}, error)
- func (l *MockLogStreamWrapper) StoreMessageBatch(data [][]byte) ([]<-chan interface{}, error)
- type PersistentVote
- type RaftGroup
- func (r *RaftGroup) AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error)
- func (r *RaftGroup) ConsumeMessages(startOffset int64, maxBytes int32) ([]logstream.Entry, error, bool)
- func (r *RaftGroup) GetHighWater() int64
- func (r *RaftGroup) GetLeaderID() uint64
- func (r *RaftGroup) GetLeaderNodeID() uint64
- func (r *RaftGroup) GetLogStore() raftstore.LogStore
- func (r *RaftGroup) GetMaxSizePerMsg() int
- func (r *RaftGroup) GetMessageArriveChan() <-chan struct{}
- func (r *RaftGroup) GetMetaBatch(keys []string) (map[string][]byte, error)
- func (r *RaftGroup) GetVDLServerInfo() ([]*apicommon.VDLServerInfo, error)
- func (r *RaftGroup) IsLeader() bool
- func (r *RaftGroup) IsStarted() bool
- func (r *RaftGroup) LeaderTransfer(ctx context.Context, transferee uint64) error
- func (r *RaftGroup) MinOffset() int64
- func (r *RaftGroup) RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error)
- func (r *RaftGroup) Start() (err error)
- func (r *RaftGroup) Stop() (err error)
- func (r *RaftGroup) StoreMessage(data []byte) (<-chan interface{}, error)
- func (r *RaftGroup) StoreMessageBatch(data [][]byte) ([]<-chan interface{}, error)
- func (r *RaftGroup) UpdateMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error)
- type RaftGroupRunState
- type RaftGroupStableStore
- func (r *RaftGroupStableStore) MustGetApplyIndex(raftGroupName string) uint64
- func (r *RaftGroupStableStore) MustGetVote(raftGroupName string) (term uint64, voteNodeID uint64)
- func (r *RaftGroupStableStore) MustSaveApplyIndex(raftGroupName string, applyIndex uint64)
- func (r *RaftGroupStableStore) MustSaveVote(raftGroupName string, voteNodeID uint64, term uint64)
Constants ¶
View Source
const ( VoteKeyPrefix = "VoteKey_" ApplyIndexKeyPrefix = "ApplyIndexKey_" )
View Source
const ( // HealthInterval is the minimum time the cluster should be healthy // before accepting add member requests. HealthInterval = 5 * time.Second )
Variables ¶
View Source
var ( ErrStopped = errors.New("VDL: raftgroup stopped") ErrTimeoutLeaderTransfer = errors.New("VDL: Leader Transfer Timeout") )
View Source
var (
ErrExceedMaxSizePerMsg = errors.New("The message exceed the max size")
)
Functions ¶
func IsStringEqualAny ¶
Types ¶
type GroupConfig ¶
type GroupConfig struct { // raft listener configuration ListenerUrls types.URLs // raft group name GroupName string // vdl server name, one vdl server have only one server name VDLServerName string // use int32 match to kafka protocol VDLServerID int32 // Client listener Host VDLClientListenerHost string // Client listener Port VDLClientListenerPort int32 // raft group initial peers // be map[vdl server name] -> []Urls InitialPeerURLsMap types.URLsMap //log persistent data dir DataDir string // Log store segment size, use default when =0 LogStoreSegmentSize int64 // HeartbeatTick is 1, ElectionTicks is N times than HeartbeatTick ElectionTicks int // Millisecond for raft heartbeat HeartbeatMs time.Duration // true: initial-cluster-state=new , this use initial-cluster to init cluster // false: initial-cluster-state=existing, this use to add new member to a exists cluster IsInitialCluster bool // When join to a exists cluster, use this urls to fetch the exists cluster information ExistsClusterAdminUrls types.URLs // the max message size for StoreMessage method MaxSizePerMsg int //the logstore Memcache size MemCacheSizeByte uint64 //the logstore must reserve segment counts ReserveSegmentCount int // strict check for member changes StrictMemberChangesCheck bool // max size once raft send append entries (per request) MaxSizePerAppendEntries uint64 // the inflight number for append entries request MaxInflightAppendEntriesRequest int // If true, Raft runs an additional election phase // to check whether it would get enough votes to win // an election, thus minimizing disruptions. PreVote bool }
func (*GroupConfig) Validate ¶
func (conf *GroupConfig) Validate() error
basic validate for configuration
func (*GroupConfig) ValidateNewRaftGroup ¶
func (conf *GroupConfig) ValidateNewRaftGroup() error
ValidateNewRaftGroup sanity-checks the initial config for #NEW RAFT GROUP# case and returns an error for things that should never happen.
type LogStreamWrapper ¶
type LogStreamWrapper interface { // get wait message wait chan GetMessageArriveChan() <-chan struct{} // the enter interface for consume message, bool return whether read from cache ConsumeMessages(startOffset int64, maxBytes int32) ([]logstream.Entry, error, bool) GetHighWater() int64 MinOffset() int64 // get vdl server info relate to raft group GetVDLServerInfo() ([]*apicommon.VDLServerInfo, error) // get current leader node ID, 0 for no leader GetLeaderNodeID() uint64 // store message, it's the enter interface for storing log StoreMessage(b []byte) (<-chan interface{}, error) // batch store message StoreMessageBatch(b [][]byte) ([]<-chan interface{}, error) // is leader for current node and can serve IsLeader() bool // get the max size per msg in configuration GetMaxSizePerMsg() int }
LogStreamWrapper is the interface for log stream operations
type MockLogStreamWrapper ¶
func NewMockLogStreamWrapper ¶
func NewMockLogStreamWrapper() (*MockLogStreamWrapper, error)
func (*MockLogStreamWrapper) ConsumeMessages ¶
func (*MockLogStreamWrapper) GetHighWater ¶
func (l *MockLogStreamWrapper) GetHighWater() int64
func (*MockLogStreamWrapper) GetLeaderNodeID ¶
func (l *MockLogStreamWrapper) GetLeaderNodeID() uint64
func (*MockLogStreamWrapper) GetMaxSizePerMsg ¶
func (r *MockLogStreamWrapper) GetMaxSizePerMsg() int
func (*MockLogStreamWrapper) GetMessageArriveChan ¶
func (l *MockLogStreamWrapper) GetMessageArriveChan() <-chan struct{}
func (*MockLogStreamWrapper) GetVDLServerInfo ¶
func (l *MockLogStreamWrapper) GetVDLServerInfo() ([]*apicommon.VDLServerInfo, error)
func (*MockLogStreamWrapper) IsLeader ¶
func (l *MockLogStreamWrapper) IsLeader() bool
func (*MockLogStreamWrapper) MinOffset ¶
func (l *MockLogStreamWrapper) MinOffset() int64
func (*MockLogStreamWrapper) StoreMessage ¶
func (l *MockLogStreamWrapper) StoreMessage(b []byte) (<-chan interface{}, error)
func (*MockLogStreamWrapper) StoreMessageBatch ¶
func (l *MockLogStreamWrapper) StoreMessageBatch(data [][]byte) ([]<-chan interface{}, error)
type PersistentVote ¶
type RaftGroup ¶
type RaftGroup struct { GroupConfig *GroupConfig Membership *membership.Membership // contains filtered or unexported fields }
func NewRaftGroup ¶
func NewRaftGroup(conf *GroupConfig, stableStore stablestore.StableStore) (*RaftGroup, error)
func (*RaftGroup) AddMember ¶
func (r *RaftGroup) AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error)
func (*RaftGroup) ConsumeMessages ¶
func (r *RaftGroup) ConsumeMessages(startOffset int64, maxBytes int32) ([]logstream.Entry, error, bool)
bool return whether read from cache
func (*RaftGroup) GetHighWater ¶
func (*RaftGroup) GetLeaderID ¶
func (*RaftGroup) GetLeaderNodeID ¶
func (*RaftGroup) GetLogStore ¶
func (*RaftGroup) GetMaxSizePerMsg ¶
func (*RaftGroup) GetMessageArriveChan ¶
func (r *RaftGroup) GetMessageArriveChan() <-chan struct{}
func (*RaftGroup) GetMetaBatch ¶
get keys from stablestore by batch use for snapshot metadata
func (*RaftGroup) GetVDLServerInfo ¶
func (r *RaftGroup) GetVDLServerInfo() ([]*apicommon.VDLServerInfo, error)
func (*RaftGroup) LeaderTransfer ¶
LeaderTransfer transfers the leader to the given transferee.
func (*RaftGroup) RemoveMember ¶
func (*RaftGroup) StoreMessage ¶
return request id
func (*RaftGroup) StoreMessageBatch ¶
return err when the following cases: 1) raftGroup not start 2) raft node close 3) cannot put into raft process (ctx timeout) all cases haven't add to raft
func (*RaftGroup) UpdateMember ¶
func (r *RaftGroup) UpdateMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error)
type RaftGroupRunState ¶
type RaftGroupRunState int
func (RaftGroupRunState) String ¶
func (t RaftGroupRunState) String() string
type RaftGroupStableStore ¶
type RaftGroupStableStore struct {
// contains filtered or unexported fields
}
func NewRaftGroupStableStore ¶
func NewRaftGroupStableStore(stableStore stablestore.StableStore) *RaftGroupStableStore
func (*RaftGroupStableStore) MustGetApplyIndex ¶
func (r *RaftGroupStableStore) MustGetApplyIndex(raftGroupName string) uint64
func (*RaftGroupStableStore) MustGetVote ¶
func (r *RaftGroupStableStore) MustGetVote(raftGroupName string) (term uint64, voteNodeID uint64)
func (*RaftGroupStableStore) MustSaveApplyIndex ¶
func (r *RaftGroupStableStore) MustSaveApplyIndex(raftGroupName string, applyIndex uint64)
func (*RaftGroupStableStore) MustSaveVote ¶
func (r *RaftGroupStableStore) MustSaveVote(raftGroupName string, voteNodeID uint64, term uint64)
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
api
|
|
Package stats defines a standard interface for etcd cluster statistics.
|
Package stats defines a standard interface for etcd cluster statistics. |
Click to show internal directories.
Click to hide internal directories.