node

package
v0.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 10, 2018 License: MIT Imports: 40 Imported by: 0

Documentation

Overview

Package node is a generated protocol buffer package.

It is generated from these files:

raft_internal.proto

It has these top-level messages:

RequestHeader
InternalRaftRequest
BatchInternalRaftRequest
SchemaChange

Index

Constants

View Source
const (
	RADIUS_COORDS searchType = iota
	RADIUS_MEMBER
)
View Source
const (
	SORT_NONE sortType = iota
	SORT_ASC
	SORT_DESC
)
View Source
const (
	RedisReq        int8 = 0
	CustomReq       int8 = 1
	SchemaChangeReq int8 = 2
)
View Source
const (
	ProposeOp_Backup                 int = 1
	ProposeOp_TransferRemoteSnap     int = 2
	ProposeOp_ApplyRemoteSnap        int = 3
	ProposeOp_RemoteConfChange       int = 4
	ProposeOp_ApplySkippedRemoteSnap int = 5
	ProposeOp_DeleteTable            int = 6
)
View Source
const (
	DefaultSnapCount = 160000

	// HealthInterval is the minimum time the cluster should be healthy
	// before accepting add member requests.
	HealthInterval = 5 * time.Second
)
View Source
const (
	ApplySnapUnknown int = iota
	ApplySnapBegin
	ApplySnapTransferring
	ApplySnapTransferred
	ApplySnapApplying
	ApplySnapDone
	ApplySnapFailed
)

Variables

View Source
var (
	ErrNamespaceAlreadyExist = errors.New("namespace already exist")
	ErrRaftIDMismatch        = errors.New("raft id mismatch")
	ErrRaftConfMismatch      = errors.New("raft config mismatch")

	ErrNamespaceNotFound          = errors.New("ERR_CLUSTER_CHANGED: namespace is not found")
	ErrNamespacePartitionNotFound = errors.New("ERR_CLUSTER_CHANGED: partition of the namespace is not found")
	ErrNamespaceNotLeader         = errors.New("ERR_CLUSTER_CHANGED: partition of the namespace is not leader on the node")
	ErrNodeNoLeader               = errors.New("ERR_CLUSTER_CHANGED: partition of the node has no leader")
	ErrRaftGroupNotReady          = errors.New("ERR_CLUSTER_CHANGED: raft group not ready")
	ErrProposalCanceled           = errors.New("ERR_CLUSTER_CHANGED: raft proposal " + context.Canceled.Error())
)
View Source
var (
	ErrInvalidLengthRaftInternal = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowRaftInternal   = fmt.Errorf("proto: integer overflow")
)
View Source
var (
	ErrExpiredBatchedBuffFull = errors.New("the expired data batched buffer is full now")
)
View Source
var ReqSourceType_name = map[int32]string{
	0: "FromAPI",
	1: "FromClusterSyncer",
}
View Source
var ReqSourceType_value = map[string]int32{
	"FromAPI":           0,
	"FromClusterSyncer": 1,
}
View Source
var SchemaChangeType_name = map[int32]string{
	0: "SchemaChangeAddHsetIndex",
	1: "SchemaChangeUpdateHsetIndex",
	2: "SchemaChangeDeleteHsetIndex",
}
View Source
var SchemaChangeType_value = map[string]int32{
	"SchemaChangeAddHsetIndex":    0,
	"SchemaChangeUpdateHsetIndex": 1,
	"SchemaChangeDeleteHsetIndex": 2,
}

Functions

func EnableForTest

func EnableForTest()

func GetHashedPartitionID

func GetHashedPartitionID(pk []byte, pnum int) int

func GetPerfLevel added in v0.3.2

func GetPerfLevel() int

func GetValidBackupInfo

func GetValidBackupInfo(machineConfig MachineConfig,
	clusterInfo common.IClusterInfo, fullNS string,
	localID uint64, stopChan chan struct{},
	raftSnapshot raftpb.Snapshot, retryIndex int, useRsyncForLocal bool) (string, string)

func IsPerfEnabled added in v0.3.2

func IsPerfEnabled() bool

func IsSyncerOnly

func IsSyncerOnly() bool

func NewConflictRouter

func NewConflictRouter() *conflictRouter

func NewKVStoreSM

func NewKVStoreSM(opts *KVOptions, machineConfig MachineConfig, localID uint64, ns string,
	clusterInfo common.IClusterInfo) (*kvStoreSM, error)

func NewLogSyncerSM

func NewLogSyncerSM(opts *KVOptions, machineConfig MachineConfig, localID uint64, fullNS string,
	clusterInfo common.IClusterInfo) (*logSyncerSM, error)

func SetLogLevel

func SetLogLevel(level int)

func SetLogger

func SetLogger(level int32, logger common.Logger)

func SetPerfLevel added in v0.3.2

func SetPerfLevel(level int)

func SetSyncerNormalInit

func SetSyncerNormalInit()

func SetSyncerOnly

func SetSyncerOnly(enable bool)

Types

type BatchInternalRaftRequest

type BatchInternalRaftRequest struct {
	ReqNum    int32                  `protobuf:"varint,1,opt,name=req_num,json=reqNum,proto3" json:"req_num,omitempty"`
	Reqs      []*InternalRaftRequest `protobuf:"bytes,2,rep,name=reqs" json:"reqs,omitempty"`
	Timestamp int64                  `protobuf:"varint,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
	Type      ReqSourceType          `protobuf:"varint,4,opt,name=type,proto3,enum=node.ReqSourceType" json:"type,omitempty"`
	ReqId     uint64                 `protobuf:"varint,5,opt,name=req_id,json=reqId,proto3" json:"req_id,omitempty"`
	// used for cluster log syncer
	OrigTerm    uint64 `protobuf:"varint,6,opt,name=orig_term,json=origTerm,proto3" json:"orig_term,omitempty"`
	OrigIndex   uint64 `protobuf:"varint,7,opt,name=orig_index,json=origIndex,proto3" json:"orig_index,omitempty"`
	OrigCluster string `protobuf:"bytes,8,opt,name=orig_cluster,json=origCluster,proto3" json:"orig_cluster,omitempty"`
}

func (*BatchInternalRaftRequest) Descriptor

func (*BatchInternalRaftRequest) Descriptor() ([]byte, []int)

func (*BatchInternalRaftRequest) Marshal

func (m *BatchInternalRaftRequest) Marshal() (dAtA []byte, err error)

func (*BatchInternalRaftRequest) MarshalTo

func (m *BatchInternalRaftRequest) MarshalTo(dAtA []byte) (int, error)

func (*BatchInternalRaftRequest) ProtoMessage

func (*BatchInternalRaftRequest) ProtoMessage()

func (*BatchInternalRaftRequest) Reset

func (m *BatchInternalRaftRequest) Reset()

func (*BatchInternalRaftRequest) Size

func (m *BatchInternalRaftRequest) Size() (n int)

func (*BatchInternalRaftRequest) String

func (m *BatchInternalRaftRequest) String() string

func (*BatchInternalRaftRequest) Unmarshal

func (m *BatchInternalRaftRequest) Unmarshal(dAtA []byte) error

type ConflictCheckFunc

type ConflictCheckFunc func(redcon.Command, int64) bool

type DataStorage

type DataStorage interface {
	CleanData() error
	RestoreFromSnapshot(bool, raftpb.Snapshot) error
	GetSnapshot(term uint64, index uint64) (Snapshot, error)
	Stop()
}

type DeleteTableRange added in v0.3.2

type DeleteTableRange struct {
	Table     string `json:"table,omitempty"`
	StartFrom []byte `json:"start_from,omitempty"`
	EndTo     []byte `json:"end_to,omitempty"`
	// to avoid drop all table data, this is needed to delete all data in table
	DeleteAll bool `json:"delete_all,omitempty"`
	Dryrun    bool `json:"dryrun,omitempty"`
}

func (DeleteTableRange) CheckValid added in v0.3.2

func (dtr DeleteTableRange) CheckValid() error

type ExpireHandler

type ExpireHandler struct {
	// contains filtered or unexported fields
}

func NewExpireHandler

func NewExpireHandler(node *KVNode) *ExpireHandler

func (*ExpireHandler) LeaderChanged

func (exp *ExpireHandler) LeaderChanged()

func (*ExpireHandler) Start

func (exp *ExpireHandler) Start()

func (*ExpireHandler) Stop

func (exp *ExpireHandler) Stop()

type HindexSearchResults

type HindexSearchResults struct {
	Table string
	Rets  []common.HIndexRespWithValues
}

type IRaftPersistStorage

type IRaftPersistStorage interface {
	// Save function saves ents and state to the underlying stable storage.
	// Save MUST block until st and ents are on stable storage.
	Save(st raftpb.HardState, ents []raftpb.Entry) error
	// SaveSnap function saves snapshot to the underlying stable storage.
	SaveSnap(snap raftpb.Snapshot) error
	Load() (*raftpb.Snapshot, string, error)
	// Close closes the Storage and performs finalization.
	Close() error
}

func NewRaftPersistStorage

func NewRaftPersistStorage(w *wal.WAL, s *snap.Snapshotter) IRaftPersistStorage

type InternalRaftRequest

type InternalRaftRequest struct {
	Header *RequestHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
	Data   []byte         `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
}

func (*InternalRaftRequest) Descriptor

func (*InternalRaftRequest) Descriptor() ([]byte, []int)

func (*InternalRaftRequest) Marshal

func (m *InternalRaftRequest) Marshal() (dAtA []byte, err error)

func (*InternalRaftRequest) MarshalTo

func (m *InternalRaftRequest) MarshalTo(dAtA []byte) (int, error)

func (*InternalRaftRequest) ProtoMessage

func (*InternalRaftRequest) ProtoMessage()

func (*InternalRaftRequest) Reset

func (m *InternalRaftRequest) Reset()

func (*InternalRaftRequest) Size

func (m *InternalRaftRequest) Size() (n int)

func (*InternalRaftRequest) String

func (m *InternalRaftRequest) String() string

func (*InternalRaftRequest) Unmarshal

func (m *InternalRaftRequest) Unmarshal(dAtA []byte) error

type KVNode

type KVNode struct {
	// contains filtered or unexported fields
}

a key-value node backed by raft

func NewKVNode

func NewKVNode(kvopts *KVOptions, machineConfig *MachineConfig, config *RaftConfig,
	transport *rafthttp.Transport, join bool, deleteCb func(),
	clusterInfo common.IClusterInfo, newLeaderChan chan string) (*KVNode, error)

func (*KVNode) ApplyRemoteSnapshot

func (nd *KVNode) ApplyRemoteSnapshot(skip bool, name string, term uint64, index uint64) error

func (*KVNode) BeginTransferRemoteSnap

func (nd *KVNode) BeginTransferRemoteSnap(name string, term uint64, index uint64, syncAddr string, syncPath string) error

func (*KVNode) CheckLocalBackup

func (nd *KVNode) CheckLocalBackup(snapData []byte) (bool, error)

func (*KVNode) CleanData

func (nd *KVNode) CleanData() error

func (*KVNode) CustomPropose

func (nd *KVNode) CustomPropose(buf []byte) (interface{}, error)

func (*KVNode) DeleteRange added in v0.3.2

func (nd *KVNode) DeleteRange(drange DeleteTableRange) error

func (*KVNode) FillMyMemberInfo

func (nd *KVNode) FillMyMemberInfo(m *common.MemberInfo)

func (*KVNode) GetApplyRemoteSnapStatus

func (nd *KVNode) GetApplyRemoteSnapStatus(name string) (*SnapApplyStatus, bool)

func (*KVNode) GetCommittedIndex

func (nd *KVNode) GetCommittedIndex() uint64

func (*KVNode) GetDBInternalStats

func (nd *KVNode) GetDBInternalStats() string

func (*KVNode) GetHandler

func (nd *KVNode) GetHandler(cmd string) (common.CommandFunc, bool, bool)

func (*KVNode) GetIndexSchema

func (nd *KVNode) GetIndexSchema(table string) (map[string]*common.IndexSchema, error)

func (*KVNode) GetLastLeaderChangedTime

func (nd *KVNode) GetLastLeaderChangedTime() int64

func (*KVNode) GetLeadMember

func (nd *KVNode) GetLeadMember() *common.MemberInfo

func (*KVNode) GetLearners

func (nd *KVNode) GetLearners() []*common.MemberInfo

func (*KVNode) GetLocalMemberInfo

func (nd *KVNode) GetLocalMemberInfo() *common.MemberInfo

func (*KVNode) GetMembers

func (nd *KVNode) GetMembers() []*common.MemberInfo

func (*KVNode) GetMergeHandler

func (nd *KVNode) GetMergeHandler(cmd string) (common.MergeCommandFunc, bool, bool)

func (*KVNode) GetRaftStatus

func (nd *KVNode) GetRaftStatus() raft.Status

func (*KVNode) GetRemoteClusterSyncedRaft

func (nd *KVNode) GetRemoteClusterSyncedRaft(name string) (uint64, uint64)

func (*KVNode) GetSnapshot

func (nd *KVNode) GetSnapshot(term uint64, index uint64) (Snapshot, error)

func (*KVNode) GetStats

func (nd *KVNode) GetStats() common.NamespaceStats

func (*KVNode) IsLead

func (nd *KVNode) IsLead() bool

func (*KVNode) IsPeerRemoved

func (nd *KVNode) IsPeerRemoved(peerID uint64) bool

func (*KVNode) IsRaftSynced

func (nd *KVNode) IsRaftSynced(checkCommitIndex bool) bool

func (*KVNode) IsReplicaRaftReady

func (nd *KVNode) IsReplicaRaftReady(raftID uint64) bool

this is used for leader to determine whether a follower is up to date.

func (*KVNode) IsWriteReady

func (nd *KVNode) IsWriteReady() bool

func (*KVNode) Lookup

func (nd *KVNode) Lookup(key []byte) ([]byte, error)

func (*KVNode) OnRaftLeaderChanged

func (nd *KVNode) OnRaftLeaderChanged()

should not block long in this

func (*KVNode) OptimizeDB

func (nd *KVNode) OptimizeDB(table string)

func (*KVNode) Process

func (nd *KVNode) Process(ctx context.Context, m raftpb.Message) error

func (*KVNode) Propose

func (nd *KVNode) Propose(buf []byte) (interface{}, error)

func (*KVNode) ProposeAddLearner

func (nd *KVNode) ProposeAddLearner(m common.MemberInfo) error

func (*KVNode) ProposeAddMember

func (nd *KVNode) ProposeAddMember(m common.MemberInfo) error

func (*KVNode) ProposeChangeTableSchema

func (nd *KVNode) ProposeChangeTableSchema(table string, sc *SchemaChange) error

func (*KVNode) ProposeRawAndWait

func (nd *KVNode) ProposeRawAndWait(buffer []byte, term uint64, index uint64, raftTs int64) error

func (*KVNode) ProposeRemoveMember

func (nd *KVNode) ProposeRemoveMember(m common.MemberInfo) error

func (*KVNode) ProposeUpdateMember

func (nd *KVNode) ProposeUpdateMember(m common.MemberInfo) error

func (*KVNode) ReportMeLeaderToCluster

func (nd *KVNode) ReportMeLeaderToCluster()

func (*KVNode) ReportSnapshot

func (nd *KVNode) ReportSnapshot(id uint64, gp raftpb.Group, status raft.SnapshotStatus)

func (*KVNode) ReportUnreachable

func (nd *KVNode) ReportUnreachable(id uint64, group raftpb.Group)

func (*KVNode) RestoreFromSnapshot

func (nd *KVNode) RestoreFromSnapshot(startup bool, raftSnapshot raftpb.Snapshot) error

func (*KVNode) SaveDBFrom

func (nd *KVNode) SaveDBFrom(r io.Reader, msg raftpb.Message) (int64, error)

func (*KVNode) SetCommittedIndex

func (nd *KVNode) SetCommittedIndex(ci uint64)

func (*KVNode) SetRemoteClusterSyncedRaft

func (nd *KVNode) SetRemoteClusterSyncedRaft(name string, term uint64, index uint64)

func (*KVNode) Start

func (nd *KVNode) Start(standalone bool) error

func (*KVNode) Stop

func (nd *KVNode) Stop()

func (*KVNode) StopRaft

func (nd *KVNode) StopRaft()

func (*KVNode) Tick

func (nd *KVNode) Tick()

type KVOptions

type KVOptions struct {
	DataDir          string
	EngType          string
	ExpirationPolicy common.ExpirationPolicy
	RockOpts         rockredis.RockOptions
	SharedConfig     *rockredis.SharedRockConfig
}

type KVSnapInfo

type KVSnapInfo struct {
	*rockredis.BackupInfo
	Ver                int                    `json:"version"`
	BackupMeta         []byte                 `json:"backup_meta"`
	LeaderInfo         *common.MemberInfo     `json:"leader_info"`
	Members            []*common.MemberInfo   `json:"members"`
	Learners           []*common.MemberInfo   `json:"learners"`
	RemoteSyncedStates map[string]SyncedState `json:"remote_synced_states"`
}

func (*KVSnapInfo) GetData

func (si *KVSnapInfo) GetData() ([]byte, error)

type KVStore

type KVStore struct {
	*rockredis.RockDB
	// contains filtered or unexported fields
}

a key-value store

func NewKVStore

func NewKVStore(kvopts *KVOptions) (*KVStore, error)

func (*KVStore) BeginBatchWrite

func (s *KVStore) BeginBatchWrite() error

func (*KVStore) CheckExpiredData

func (s *KVStore) CheckExpiredData(buffer common.ExpiredDataBuffer, stop chan struct{}) error

func (*KVStore) CleanData

func (s *KVStore) CleanData() error

func (*KVStore) CommitBatchWrite

func (s *KVStore) CommitBatchWrite() error

func (*KVStore) Destroy

func (s *KVStore) Destroy() error

the caller should make sure call this after close/stop finished. and no any more operation on this store during and after the destroy

func (*KVStore) IsBatchableWrite

func (s *KVStore) IsBatchableWrite(cmdName string) bool

func (*KVStore) LocalDelete

func (s *KVStore) LocalDelete(key []byte) (int64, error)

func (*KVStore) LocalLookup

func (s *KVStore) LocalLookup(key []byte) ([]byte, error)

func (*KVStore) LocalPut

func (s *KVStore) LocalPut(ts int64, key []byte, value []byte) error

func (*KVStore) LocalWriteBatch

func (s *KVStore) LocalWriteBatch(cmd ...common.WriteCmd) error

type MachineConfig

type MachineConfig struct {
	// server node id
	NodeID              uint64                `json:"node_id"`
	BroadcastAddr       string                `json:"broadcast_addr"`
	HttpAPIPort         int                   `json:"http_api_port"`
	LocalRaftAddr       string                `json:"local_raft_addr"`
	DataRootDir         string                `json:"data_root_dir"`
	ElectionTick        int                   `json:"election_tick"`
	TickMs              int                   `json:"tick_ms"`
	KeepWAL             int                   `json:"keep_wal"`
	LearnerRole         string                `json:"learner_role"`
	RemoteSyncCluster   string                `json:"remote_sync_cluster"`
	StateMachineType    string                `json:"state_machine_type"`
	RocksDBOpts         rockredis.RockOptions `json:"rocksdb_opts"`
	RocksDBSharedConfig *rockredis.SharedRockConfig
}

type NamespaceConfig

type NamespaceConfig struct {
	// namespace full name with partition
	Name string `json:"name"`
	// namespace name without partition
	BaseName         string          `json:"base_name"`
	EngType          string          `json:"eng_type"`
	PartitionNum     int             `json:"partition_num"`
	SnapCount        int             `json:"snap_count"`
	SnapCatchup      int             `json:"snap_catchup"`
	Replicator       int             `json:"replicator"`
	OptimizedFsync   bool            `json:"optimized_fsync"`
	RaftGroupConf    RaftGroupConfig `json:"raft_group_conf"`
	ExpirationPolicy string          `json:"expiration_policy"`
}

func NewNSConfig

func NewNSConfig() *NamespaceConfig

type NamespaceDynamicConf

type NamespaceDynamicConf struct {
}

type NamespaceMeta

type NamespaceMeta struct {
	PartitionNum int
}

type NamespaceMgr

type NamespaceMgr struct {
	// contains filtered or unexported fields
}

func NewNamespaceMgr

func NewNamespaceMgr(transport *rafthttp.Transport, conf *MachineConfig) *NamespaceMgr

func (*NamespaceMgr) CheckMagicCode

func (nsm *NamespaceMgr) CheckMagicCode(ns string, magic int64, fix bool) error

func (*NamespaceMgr) DeleteRange added in v0.3.2

func (nsm *NamespaceMgr) DeleteRange(ns string, dtr DeleteTableRange) error

func (*NamespaceMgr) GetDBStats

func (nsm *NamespaceMgr) GetDBStats(leaderOnly bool) map[string]string

func (*NamespaceMgr) GetLogSyncStats

func (nsm *NamespaceMgr) GetLogSyncStats(leaderOnly bool, srcClusterName string) []common.LogSyncStats

func (*NamespaceMgr) GetNamespaceNode

func (nsm *NamespaceMgr) GetNamespaceNode(ns string) *NamespaceNode

func (*NamespaceMgr) GetNamespaceNodeFromGID

func (nsm *NamespaceMgr) GetNamespaceNodeFromGID(gid uint64) *NamespaceNode

func (*NamespaceMgr) GetNamespaceNodeWithPrimaryKey

func (nsm *NamespaceMgr) GetNamespaceNodeWithPrimaryKey(nsBaseName string, pk []byte) (*NamespaceNode, error)

func (*NamespaceMgr) GetNamespaceNodes

func (nsm *NamespaceMgr) GetNamespaceNodes(nsBaseName string, leaderOnly bool) (map[string]*NamespaceNode, error)

func (*NamespaceMgr) GetNamespaces

func (nsm *NamespaceMgr) GetNamespaces() map[string]*NamespaceNode

func (*NamespaceMgr) GetStats

func (nsm *NamespaceMgr) GetStats(leaderOnly bool) []common.NamespaceStats

func (*NamespaceMgr) InitNamespaceNode

func (nsm *NamespaceMgr) InitNamespaceNode(conf *NamespaceConfig, raftID uint64, join bool) (*NamespaceNode, error)

func (*NamespaceMgr) IsAllRecoveryDone

func (nsm *NamespaceMgr) IsAllRecoveryDone() bool

func (*NamespaceMgr) LoadMachineRegID

func (nsm *NamespaceMgr) LoadMachineRegID() (uint64, error)

func (*NamespaceMgr) OptimizeDB

func (nsm *NamespaceMgr) OptimizeDB(ns string, table string)

func (*NamespaceMgr) SaveMachineRegID

func (nsm *NamespaceMgr) SaveMachineRegID(regID uint64) error

func (*NamespaceMgr) SetIClusterInfo

func (nsm *NamespaceMgr) SetIClusterInfo(clusterInfo common.IClusterInfo)

func (*NamespaceMgr) SetNamespaceMagicCode

func (nsm *NamespaceMgr) SetNamespaceMagicCode(node *NamespaceNode, magic int64) error

TODO:

func (*NamespaceMgr) Start

func (nsm *NamespaceMgr) Start()

func (*NamespaceMgr) Stop

func (nsm *NamespaceMgr) Stop()

type NamespaceNode

type NamespaceNode struct {
	Node *KVNode
	// contains filtered or unexported fields
}

func (*NamespaceNode) CheckRaftConf

func (nn *NamespaceNode) CheckRaftConf(raftID uint64, conf *NamespaceConfig) error

func (*NamespaceNode) Close

func (nn *NamespaceNode) Close()

func (*NamespaceNode) Destroy

func (nn *NamespaceNode) Destroy() error

func (*NamespaceNode) FullName

func (nn *NamespaceNode) FullName() string

func (*NamespaceNode) GetLastLeaderChangedTime

func (nn *NamespaceNode) GetLastLeaderChangedTime() int64

func (*NamespaceNode) GetLearners

func (nn *NamespaceNode) GetLearners() []*common.MemberInfo

func (*NamespaceNode) GetMembers

func (nn *NamespaceNode) GetMembers() []*common.MemberInfo

func (*NamespaceNode) GetRaftID

func (nn *NamespaceNode) GetRaftID() uint64

func (*NamespaceNode) IsDataNeedFix

func (nn *NamespaceNode) IsDataNeedFix() bool

func (*NamespaceNode) IsNsNodeFullReady

func (nn *NamespaceNode) IsNsNodeFullReady(checkCommitIndex bool) bool

full ready node means: all local commit log replay done and we are aware of leader and maybe we have done all the newest commit log in state machine.

func (*NamespaceNode) IsReady

func (nn *NamespaceNode) IsReady() bool

func (*NamespaceNode) SetDataFixState

func (nn *NamespaceNode) SetDataFixState(needFix bool)

func (*NamespaceNode) SetDynamicInfo

func (nn *NamespaceNode) SetDynamicInfo(dync NamespaceDynamicConf)

func (*NamespaceNode) SetMagicCode

func (nn *NamespaceNode) SetMagicCode(magic int64) error

func (*NamespaceNode) Start

func (nn *NamespaceNode) Start(forceStandaloneCluster bool) error

func (*NamespaceNode) SwitchForLearnerLeader

func (nn *NamespaceNode) SwitchForLearnerLeader(isLearnerLeader bool)

func (*NamespaceNode) TransferMyLeader

func (nn *NamespaceNode) TransferMyLeader(to uint64, toRaftID uint64) error

type RaftConfig

type RaftConfig struct {
	GroupID uint64 `json:"group_id"`
	// group name is combined namespace-partition string
	GroupName string `json:"group_name"`
	// this is replica id
	ID uint64 `json:"id"`
	// local server transport address, it
	// can be used by several raft group
	RaftAddr       string                 `json:"raft_addr"`
	DataDir        string                 `json:"data_dir"`
	WALDir         string                 `json:"wal_dir"`
	KeepWAL        int                    `json:"keep_wal"`
	SnapDir        string                 `json:"snap_dir"`
	RaftPeers      map[uint64]ReplicaInfo `json:"raft_peers"`
	SnapCount      int                    `json:"snap_count"`
	SnapCatchup    int                    `json:"snap_catchup"`
	Replicator     int                    `json:"replicator"`
	OptimizedFsync bool                   `json:"optimized_fsync"`
	// contains filtered or unexported fields
}

type RaftGroupConfig

type RaftGroupConfig struct {
	GroupID   uint64        `json:"group_id"`
	SeedNodes []ReplicaInfo `json:"seed_nodes"`
}

type RaftRpcFunc

type RaftRpcFunc func() error

type RemoteLogSender

type RemoteLogSender struct {
	// contains filtered or unexported fields
}

RemoteLogSender is the raft log sender. It will send all the raft logs to the remote cluster using grpc service.

func NewRemoteLogSender

func NewRemoteLogSender(localCluster string, fullName string, remoteCluster string) (*RemoteLogSender, error)

func (*RemoteLogSender) GetStats

func (s *RemoteLogSender) GetStats() interface{}

func (*RemoteLogSender) Stop

func (s *RemoteLogSender) Stop()

type ReplicaInfo

type ReplicaInfo struct {
	NodeID    uint64 `json:"node_id"`
	ReplicaID uint64 `json:"replica_id"`
	RaftAddr  string `json:"raft_addr"`
}

type ReqSourceType

type ReqSourceType int32
const (
	FromAPI           ReqSourceType = 0
	FromClusterSyncer ReqSourceType = 1
)

func (ReqSourceType) EnumDescriptor

func (ReqSourceType) EnumDescriptor() ([]byte, []int)

func (ReqSourceType) String

func (x ReqSourceType) String() string

type RequestHeader

type RequestHeader struct {
	ID        uint64 `protobuf:"varint,1,opt,name=ID,proto3" json:"ID,omitempty"`
	DataType  int32  `protobuf:"varint,2,opt,name=data_type,json=dataType,proto3" json:"data_type,omitempty"`
	Timestamp int64  `protobuf:"varint,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
}

func (*RequestHeader) Descriptor

func (*RequestHeader) Descriptor() ([]byte, []int)

func (*RequestHeader) Marshal

func (m *RequestHeader) Marshal() (dAtA []byte, err error)

func (*RequestHeader) MarshalTo

func (m *RequestHeader) MarshalTo(dAtA []byte) (int, error)

func (*RequestHeader) ProtoMessage

func (*RequestHeader) ProtoMessage()

func (*RequestHeader) Reset

func (m *RequestHeader) Reset()

func (*RequestHeader) Size

func (m *RequestHeader) Size() (n int)

func (*RequestHeader) String

func (m *RequestHeader) String() string

func (*RequestHeader) Unmarshal

func (m *RequestHeader) Unmarshal(dAtA []byte) error

type SchemaChange

type SchemaChange struct {
	Type       SchemaChangeType `protobuf:"varint,1,opt,name=Type,proto3,enum=node.SchemaChangeType" json:"Type,omitempty"`
	Table      string           `protobuf:"bytes,2,opt,name=Table,proto3" json:"Table,omitempty"`
	SchemaData []byte           `protobuf:"bytes,3,opt,name=SchemaData,proto3" json:"SchemaData,omitempty"`
}

func (*SchemaChange) Descriptor

func (*SchemaChange) Descriptor() ([]byte, []int)

func (*SchemaChange) Marshal

func (m *SchemaChange) Marshal() (dAtA []byte, err error)

func (*SchemaChange) MarshalTo

func (m *SchemaChange) MarshalTo(dAtA []byte) (int, error)

func (*SchemaChange) ProtoMessage

func (*SchemaChange) ProtoMessage()

func (*SchemaChange) Reset

func (m *SchemaChange) Reset()

func (*SchemaChange) Size

func (m *SchemaChange) Size() (n int)

func (*SchemaChange) String

func (m *SchemaChange) String() string

func (*SchemaChange) Unmarshal

func (m *SchemaChange) Unmarshal(dAtA []byte) error

type SchemaChangeType

type SchemaChangeType int32
const (
	SchemaChangeAddHsetIndex    SchemaChangeType = 0
	SchemaChangeUpdateHsetIndex SchemaChangeType = 1
	SchemaChangeDeleteHsetIndex SchemaChangeType = 2
)

func (SchemaChangeType) EnumDescriptor

func (SchemaChangeType) EnumDescriptor() ([]byte, []int)

func (SchemaChangeType) String

func (x SchemaChangeType) String() string

type SnapApplyStatus

type SnapApplyStatus struct {
	SS          SyncedState
	StatusCode  int
	Status      string
	UpdatedTime time.Time
}

type Snapshot

type Snapshot interface {
	GetData() ([]byte, error)
}

type StateMachine

type StateMachine interface {
	ApplyRaftRequest(isReplaying bool, req BatchInternalRaftRequest, term uint64, index uint64, stop chan struct{}) (bool, error)
	ApplyRaftConfRequest(req raftpb.ConfChange, term uint64, index uint64, stop chan struct{}) error
	GetSnapshot(term uint64, index uint64) (*KVSnapInfo, error)
	RestoreFromSnapshot(startup bool, raftSnapshot raftpb.Snapshot, stop chan struct{}) error
	Destroy()
	CleanData() error
	Optimize(string)
	GetStats() common.NamespaceStats
	Start() error
	Close()
	CheckExpiredData(buffer common.ExpiredDataBuffer, stop chan struct{}) error
}

func NewStateMachine

func NewStateMachine(opts *KVOptions, machineConfig MachineConfig, localID uint64,
	fullNS string, clusterInfo common.IClusterInfo, w wait.Wait) (StateMachine, error)

type SyncedState

type SyncedState struct {
	SyncedTerm  uint64 `json:"synced_term,omitempty"`
	SyncedIndex uint64 `json:"synced_index,omitempty"`
}

func (*SyncedState) IsNewer

func (ss *SyncedState) IsNewer(other *SyncedState) bool

func (*SyncedState) IsNewer2

func (ss *SyncedState) IsNewer2(term uint64, index uint64) bool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL