raftgroup

package
v0.0.0-...-16dfdc2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 3, 2018 License: Apache-2.0 Imports: 49 Imported by: 0

Documentation

Index

Constants

View Source
// Key prefixes for the vote and apply-index records.
// NOTE(review): presumably combined with the raft group name to build the
// keys used by RaftGroupStableStore — confirm against its implementation.
const (
	// VoteKeyPrefix is the key prefix for vote records.
	VoteKeyPrefix       = "VoteKey_"
	// ApplyIndexKeyPrefix is the key prefix for apply-index records.
	ApplyIndexKeyPrefix = "ApplyIndexKey_"
)
View Source
const (

	// HealthInterval is the minimum time the cluster should have been
	// healthy before an add-member request is accepted.
	HealthInterval = 5 * time.Second
)

Variables

View Source
// Sentinel errors describing raft group state and leader transfer.
var (
	// ErrStopped indicates that the raftgroup is stopped.
	ErrStopped               = errors.New("VDL: raftgroup stopped")
	// ErrTimeoutLeaderTransfer indicates that a leader transfer timed out.
	ErrTimeoutLeaderTransfer = errors.New("VDL: Leader Transfer Timeout")
)
View Source
var (
	// ErrExceedMaxSizePerMsg indicates that a message exceeds the maximum
	// allowed size.
	// NOTE(review): presumably the limit is GroupConfig.MaxSizePerMsg — confirm.
	// NOTE(review): the message text does not follow the Go convention of
	// lowercase error strings; it is left unchanged here because callers may
	// match on it.
	ErrExceedMaxSizePerMsg = errors.New("The message exceed the max size")
)

Functions

func IsStringEqualAny

func IsStringEqualAny(a, b []string) bool

Types

type GroupConfig

// GroupConfig holds the configuration of a raft group.
type GroupConfig struct {

	// ListenerUrls is the raft listener configuration.
	ListenerUrls types.URLs

	// GroupName is the raft group name.
	GroupName string

	// VDLServerName is the VDL server name; each VDL server has exactly one
	// server name.
	VDLServerName string

	// VDLServerID is the VDL server ID. It is an int32 to match the Kafka
	// protocol.
	VDLServerID int32

	// VDLClientListenerHost is the client listener host.
	VDLClientListenerHost string

	// VDLClientListenerPort is the client listener port.
	VDLClientListenerPort int32

	// InitialPeerURLsMap holds the initial peers of the raft group, as a map
	// from VDL server name to that server's URLs.
	InitialPeerURLsMap types.URLsMap

	// DataDir is the directory for persistent log data.
	DataDir string

	// LogStoreSegmentSize is the log store segment size; the default is used
	// when it is 0.
	LogStoreSegmentSize int64

	// ElectionTicks is expressed relative to the heartbeat tick: HeartbeatTick
	// is 1, and ElectionTicks is N times HeartbeatTick.
	ElectionTicks int

	// HeartbeatMs is the raft heartbeat interval.
	// NOTE(review): the name and the original comment say milliseconds, but
	// the type is time.Duration — confirm which unit callers are expected to
	// pass.
	HeartbeatMs time.Duration

	// IsInitialCluster selects the initial cluster state.
	// true: initial-cluster-state=new; the initial cluster settings are used
	// to initialize the cluster.
	// false: initial-cluster-state=existing; used to add a new member to an
	// existing cluster.
	IsInitialCluster bool

	// ExistsClusterAdminUrls are the URLs used to fetch the existing cluster's
	// information when joining an existing cluster.
	ExistsClusterAdminUrls types.URLs

	// MaxSizePerMsg is the maximum message size for the StoreMessage method.
	MaxSizePerMsg int

	// MemCacheSizeByte is the logstore memcache size (in bytes, going by the
	// field name).
	MemCacheSizeByte uint64

	// ReserveSegmentCount is the number of segments the logstore must reserve.
	ReserveSegmentCount int

	// StrictMemberChangesCheck enables strict checking of member changes.
	StrictMemberChangesCheck bool

	// MaxSizePerAppendEntries is the maximum size raft sends in one append
	// entries request.
	MaxSizePerAppendEntries uint64

	// MaxInflightAppendEntriesRequest is the number of in-flight append
	// entries requests.
	MaxInflightAppendEntriesRequest int

	// PreVote, if true, makes Raft run an additional election phase to check
	// whether it would get enough votes to win an election, thus minimizing
	// disruptions.
	PreVote bool
}

func (*GroupConfig) Validate

func (conf *GroupConfig) Validate() error

Validate performs basic validation of the configuration.

func (*GroupConfig) ValidateNewRaftGroup

func (conf *GroupConfig) ValidateNewRaftGroup() error

ValidateNewRaftGroup sanity-checks the initial config for the new-raft-group case and returns an error for things that should never happen.

type LogStreamWrapper

// LogStreamWrapper is the interface for log stream operations.
type LogStreamWrapper interface {

	// GetMessageArriveChan returns the channel to wait on for message arrival.
	GetMessageArriveChan() <-chan struct{}

	// ConsumeMessages is the entry point for consuming messages. The returned
	// bool reports whether the messages were read from the cache.
	ConsumeMessages(startOffset int64, maxBytes int32) ([]logstream.Entry, error, bool)

	GetHighWater() int64

	MinOffset() int64

	// GetVDLServerInfo returns the VDL server info related to the raft group.
	GetVDLServerInfo() ([]*apicommon.VDLServerInfo, error)

	// GetLeaderNodeID returns the current leader node ID, or 0 when there is
	// no leader.
	GetLeaderNodeID() uint64

	// StoreMessage stores a message; it is the entry point for storing log.
	StoreMessage(b []byte) (<-chan interface{}, error)

	// StoreMessageBatch stores a batch of messages.
	StoreMessageBatch(b [][]byte) ([]<-chan interface{}, error)

	// IsLeader reports whether the current node is the leader and can serve.
	IsLeader() bool

	// GetMaxSizePerMsg returns the maximum size per message from the
	// configuration.
	GetMaxSizePerMsg() int
}

LogStreamWrapper is the interface for log stream operations

type MockLogStreamWrapper

// MockLogStreamWrapper is a mock whose method set matches LogStreamWrapper.
// It embeds wait.Wait; its remaining fields are unexported.
type MockLogStreamWrapper struct {
	wait.Wait
	// contains filtered or unexported fields
}

func NewMockLogStreamWrapper

func NewMockLogStreamWrapper() (*MockLogStreamWrapper, error)

func (*MockLogStreamWrapper) ConsumeMessages

func (l *MockLogStreamWrapper) ConsumeMessages(startOffset int64, maxBytes int32) ([]logstream.Entry, error, bool)

func (*MockLogStreamWrapper) GetHighWater

func (l *MockLogStreamWrapper) GetHighWater() int64

func (*MockLogStreamWrapper) GetLeaderNodeID

func (l *MockLogStreamWrapper) GetLeaderNodeID() uint64

func (*MockLogStreamWrapper) GetMaxSizePerMsg

func (r *MockLogStreamWrapper) GetMaxSizePerMsg() int

func (*MockLogStreamWrapper) GetMessageArriveChan

func (l *MockLogStreamWrapper) GetMessageArriveChan() <-chan struct{}

func (*MockLogStreamWrapper) GetVDLServerInfo

func (l *MockLogStreamWrapper) GetVDLServerInfo() ([]*apicommon.VDLServerInfo, error)

func (*MockLogStreamWrapper) IsLeader

func (l *MockLogStreamWrapper) IsLeader() bool

func (*MockLogStreamWrapper) MinOffset

func (l *MockLogStreamWrapper) MinOffset() int64

func (*MockLogStreamWrapper) StoreMessage

func (l *MockLogStreamWrapper) StoreMessage(b []byte) (<-chan interface{}, error)

func (*MockLogStreamWrapper) StoreMessageBatch

func (l *MockLogStreamWrapper) StoreMessageBatch(data [][]byte) ([]<-chan interface{}, error)

type PersistentVote

// PersistentVote is the JSON-serializable record of a vote and its term.
// NOTE(review): presumably the payload written by
// RaftGroupStableStore.MustSaveVote — confirm.
type PersistentVote struct {
	// Vote is serialized under the JSON key "vote".
	Vote uint64 `json:"vote"`
	// Term is serialized under the JSON key "term".
	Term uint64 `json:"term"`
}

type RaftGroup

// RaftGroup represents a raft group. It exposes the group's configuration and
// membership; its remaining state is unexported.
type RaftGroup struct {
	// GroupConfig is the configuration of this raft group.
	GroupConfig *GroupConfig

	// Membership is the membership of this raft group.
	Membership *membership.Membership
	// contains filtered or unexported fields
}

func NewRaftGroup

func NewRaftGroup(conf *GroupConfig, stableStore stablestore.StableStore) (*RaftGroup, error)

func (*RaftGroup) AddMember

func (r *RaftGroup) AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error)

func (*RaftGroup) ConsumeMessages

func (r *RaftGroup) ConsumeMessages(startOffset int64, maxBytes int32) ([]logstream.Entry, error, bool)

The returned bool reports whether the messages were read from the cache.

func (*RaftGroup) GetHighWater

func (r *RaftGroup) GetHighWater() int64

func (*RaftGroup) GetLeaderID

func (r *RaftGroup) GetLeaderID() uint64

func (*RaftGroup) GetLeaderNodeID

func (r *RaftGroup) GetLeaderNodeID() uint64

func (*RaftGroup) GetLogStore

func (r *RaftGroup) GetLogStore() raftstore.LogStore

func (*RaftGroup) GetMaxSizePerMsg

func (r *RaftGroup) GetMaxSizePerMsg() int

func (*RaftGroup) GetMessageArriveChan

func (r *RaftGroup) GetMessageArriveChan() <-chan struct{}

func (*RaftGroup) GetMetaBatch

func (r *RaftGroup) GetMetaBatch(keys []string) (map[string][]byte, error)

GetMetaBatch gets the given keys from the stable store in one batch; it is used for snapshot metadata.

func (*RaftGroup) GetVDLServerInfo

func (r *RaftGroup) GetVDLServerInfo() ([]*apicommon.VDLServerInfo, error)

func (*RaftGroup) IsLeader

func (r *RaftGroup) IsLeader() bool

func (*RaftGroup) IsStarted

func (r *RaftGroup) IsStarted() bool

func (*RaftGroup) LeaderTransfer

func (r *RaftGroup) LeaderTransfer(ctx context.Context, transferee uint64) error

LeaderTransfer transfers the leader to the given transferee.

func (*RaftGroup) MinOffset

func (r *RaftGroup) MinOffset() int64

func (*RaftGroup) RemoveMember

func (r *RaftGroup) RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error)

func (*RaftGroup) Start

func (r *RaftGroup) Start() (err error)

func (*RaftGroup) Stop

func (r *RaftGroup) Stop() (err error)

func (*RaftGroup) StoreMessage

func (r *RaftGroup) StoreMessage(data []byte) (<-chan interface{}, error)

return request id

func (*RaftGroup) StoreMessageBatch

func (r *RaftGroup) StoreMessageBatch(data [][]byte) ([]<-chan interface{}, error)

StoreMessageBatch returns an error in the following cases: 1) the raft group has not started; 2) the raft node is closed; 3) the messages cannot be put into the raft process (ctx timeout). In all of these cases the messages have not been added to raft.

func (*RaftGroup) UpdateMember

func (r *RaftGroup) UpdateMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error)

type RaftGroupRunState

// RaftGroupRunState is the integer-typed run state of a raft group. Its String
// method returns a textual form.
type RaftGroupRunState int

func (RaftGroupRunState) String

func (t RaftGroupRunState) String() string

type RaftGroupStableStore

// RaftGroupStableStore provides the MustGet/MustSave accessors for the vote
// and apply index of a raft group, identified by raft group name. It is
// constructed from a stablestore.StableStore via NewRaftGroupStableStore.
type RaftGroupStableStore struct {
	// contains filtered or unexported fields
}

func NewRaftGroupStableStore

func NewRaftGroupStableStore(stableStore stablestore.StableStore) *RaftGroupStableStore

func (*RaftGroupStableStore) MustGetApplyIndex

func (r *RaftGroupStableStore) MustGetApplyIndex(raftGroupName string) uint64

func (*RaftGroupStableStore) MustGetVote

func (r *RaftGroupStableStore) MustGetVote(raftGroupName string) (term uint64, voteNodeID uint64)

func (*RaftGroupStableStore) MustSaveApplyIndex

func (r *RaftGroupStableStore) MustSaveApplyIndex(raftGroupName string, applyIndex uint64)

func (*RaftGroupStableStore) MustSaveVote

func (r *RaftGroupStableStore) MustSaveVote(raftGroupName string, voteNodeID uint64, term uint64)

Directories

Path Synopsis
api
Package stats defines a standard interface for etcd cluster statistics.
Package stats defines a standard interface for etcd cluster statistics.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL