node

package
Version: v0.9.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 19, 2021 License: MIT Imports: 47 Imported by: 0

Documentation

Index

Constants

View Source
const (
	NoConflict = iota
	MaybeConflict
	Conflict
)
View Source
const (
	RADIUS_COORDS searchType = iota
	RADIUS_MEMBER
)
View Source
const (
	SORT_NONE sortType = iota
	SORT_ASC
	SORT_DESC
)
View Source
const (
	RedisReq        int8 = 0
	CustomReq       int8 = 1
	SchemaChangeReq int8 = 2
	RedisV2Req      int8 = 3
)
View Source
const (
	ProposeOp_Backup                 int = 1
	ProposeOp_TransferRemoteSnap     int = 2
	ProposeOp_ApplyRemoteSnap        int = 3
	ProposeOp_RemoteConfChange       int = 4
	ProposeOp_ApplySkippedRemoteSnap int = 5
	ProposeOp_DeleteTable            int = 6
)
View Source
const (
	ApplySnapUnknown int = iota
	ApplySnapBegin
	ApplySnapTransferring
	ApplySnapTransferred
	ApplySnapApplying
	ApplySnapDone
	ApplySnapFailed
)

Variables

View Source
var (
	ErrNamespaceAlreadyExist    = errors.New("namespace already exist")
	ErrNamespaceAlreadyStarting = errors.New("namespace is starting")
	ErrRaftIDMismatch           = errors.New("raft id mismatch")
	ErrRaftConfMismatch         = errors.New("raft config mismatch")

	ErrNamespaceNotFound          = errors.New("ERR_CLUSTER_CHANGED: namespace is not found")
	ErrNamespacePartitionNotFound = errors.New("ERR_CLUSTER_CHANGED: partition of the namespace is not found")
	ErrNamespaceNotLeader         = errors.New("ERR_CLUSTER_CHANGED: partition of the namespace is not leader on the node")
	ErrNodeNoLeader               = errors.New("ERR_CLUSTER_CHANGED: partition of the node has no leader")
	ErrRaftGroupNotReady          = errors.New("ERR_CLUSTER_CHANGED: raft group not ready")
	ErrProposalCanceled           = errors.New("ERR_CLUSTER_CHANGED: raft proposal " + context.Canceled.Error())

	ErrLocalMagicCodeConflict = errors.New("namespace magic code conflict on local")
)
View Source
var (
	// DefaultSnapCount is the count for trigger snapshot
	DefaultSnapCount = int(settings.Soft.DefaultSnapCount)

	// HealthInterval is the minimum time the cluster should be healthy
	// before accepting add member requests.
	HealthInterval = time.Duration(settings.Soft.HealthIntervalSec) * time.Second
)
View Source
var (
	ErrInvalidLengthRaftInternal = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowRaftInternal   = fmt.Errorf("proto: integer overflow")
)
View Source
var (
	ErrExpiredBatchedBuffFull = errors.New("the expired data batched buffer is full now")
)
View Source
var (
	ErrReadIndexTimeout = errors.New("wait read index timeout")
)
View Source
var ErrSlowLimiterRefused = errors.New("refused by slow limiter")

ErrSlowLimiterRefused indicates that the write request was slow while being applied, so it is refused to avoid slowing down other writes.

View Source
var ReqSourceType_name = map[int32]string{
	0: "FromAPI",
	1: "FromClusterSyncer",
}
View Source
var ReqSourceType_value = map[string]int32{
	"FromAPI":           0,
	"FromClusterSyncer": 1,
}
View Source
var SchemaChangeType_name = map[int32]string{
	0: "SchemaChangeAddHsetIndex",
	1: "SchemaChangeUpdateHsetIndex",
	2: "SchemaChangeDeleteHsetIndex",
}
View Source
var SchemaChangeType_value = map[string]int32{
	"SchemaChangeAddHsetIndex":    0,
	"SchemaChangeUpdateHsetIndex": 1,
	"SchemaChangeDeleteHsetIndex": 2,
}
View Source
var SlowHalfOpenSec = int64(10)
View Source
var SlowRefuseCostMs = int64(800)
View Source
var UseRedisV2 = false

Functions

func ChangeSlowRefuseCost added in v0.9.3

func ChangeSlowRefuseCost(v int64)

func EnableForTest

func EnableForTest()

func EnableSlowLimiterTest added in v0.8.2

func EnableSlowLimiterTest(t bool)

func EnableSnapForTest added in v0.6.3

func EnableSnapForTest(transfer bool, save bool, apply bool, restore bool)

func GetHashedPartitionID

func GetHashedPartitionID(pk []byte, pnum int) int

func GetLogLatencyStats added in v0.4.0

func GetLogLatencyStats() (*metric.WriteStats, *metric.WriteStats)

func GetSyncedOnlyChangedTs added in v0.4.3

func GetSyncedOnlyChangedTs() int64

func GetValidBackupInfo

func GetValidBackupInfo(machineConfig MachineConfig,
	clusterInfo common.IClusterInfo, fullNS string,
	localID uint64, stopChan chan struct{},
	raftSnapshot raftpb.Snapshot, retryIndex int, useRsyncForLocal bool) (string, string)

func HashedKey added in v0.9.3

func HashedKey(pk []byte) int

func IsMaybeSlowWriteCmd added in v0.9.0

func IsMaybeSlowWriteCmd(cmd string) bool

func IsSyncerNormalInit added in v0.9.0

func IsSyncerNormalInit() bool

func IsSyncerOnly

func IsSyncerOnly() bool

func LoadMagicCode added in v0.9.2

func LoadMagicCode(fileName string) (int64, error)

func MaybeConflictLogDisabled added in v0.9.1

func MaybeConflictLogDisabled() bool

func NewConflictRouter

func NewConflictRouter() *conflictRouter

func NewKVStoreSM

func NewKVStoreSM(opts *KVOptions, machineConfig MachineConfig, localID uint64, ns string,
	clusterInfo common.IClusterInfo, sl *SlowLimiter) (*kvStoreSM, error)

func NewLogSyncerSM

func NewLogSyncerSM(opts *KVOptions, machineConfig MachineConfig, localID uint64, fullNS string,
	clusterInfo common.IClusterInfo) (*logSyncerSM, error)

func RegisterSlowConfChanged added in v0.8.3

func RegisterSlowConfChanged()

func SetLogLevel

func SetLogLevel(level int)

func SetLogger

func SetLogger(level int32, logger common.Logger)

func SetSyncerNormalInit

func SetSyncerNormalInit(enable bool)

func SetSyncerOnly

func SetSyncerOnly(enable bool)

func SwitchDisableMaybeConflictLog added in v0.9.1

func SwitchDisableMaybeConflictLog(disable bool)

Types

type BatchInternalRaftRequest

type BatchInternalRaftRequest struct {
	ReqNum    int32                 `protobuf:"varint,1,opt,name=req_num,json=reqNum,proto3" json:"req_num,omitempty"`
	Reqs      []InternalRaftRequest `protobuf:"bytes,2,rep,name=reqs,proto3" json:"reqs"`
	Timestamp int64                 `protobuf:"varint,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
	Type      ReqSourceType         `protobuf:"varint,4,opt,name=type,proto3,enum=node.ReqSourceType" json:"type,omitempty"`
	ReqId     uint64                `protobuf:"varint,5,opt,name=req_id,json=reqId,proto3" json:"req_id,omitempty"`
	// used for cluster log syncer
	OrigTerm    uint64 `protobuf:"varint,6,opt,name=orig_term,json=origTerm,proto3" json:"orig_term,omitempty"`
	OrigIndex   uint64 `protobuf:"varint,7,opt,name=orig_index,json=origIndex,proto3" json:"orig_index,omitempty"`
	OrigCluster string `protobuf:"bytes,8,opt,name=orig_cluster,json=origCluster,proto3" json:"orig_cluster,omitempty"`
}

func (*BatchInternalRaftRequest) Descriptor

func (*BatchInternalRaftRequest) Descriptor() ([]byte, []int)

func (*BatchInternalRaftRequest) Marshal

func (m *BatchInternalRaftRequest) Marshal() (dAtA []byte, err error)

func (*BatchInternalRaftRequest) MarshalTo

func (m *BatchInternalRaftRequest) MarshalTo(dAtA []byte) (int, error)

func (*BatchInternalRaftRequest) ProtoMessage

func (*BatchInternalRaftRequest) ProtoMessage()

func (*BatchInternalRaftRequest) Reset

func (m *BatchInternalRaftRequest) Reset()

func (*BatchInternalRaftRequest) Size

func (m *BatchInternalRaftRequest) Size() (n int)

func (*BatchInternalRaftRequest) String

func (m *BatchInternalRaftRequest) String() string

func (*BatchInternalRaftRequest) Unmarshal

func (m *BatchInternalRaftRequest) Unmarshal(dAtA []byte) error

func (*BatchInternalRaftRequest) XXX_DiscardUnknown added in v0.7.1

func (m *BatchInternalRaftRequest) XXX_DiscardUnknown()

func (*BatchInternalRaftRequest) XXX_Marshal added in v0.7.1

func (m *BatchInternalRaftRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*BatchInternalRaftRequest) XXX_Merge added in v0.7.1

func (m *BatchInternalRaftRequest) XXX_Merge(src proto.Message)

func (*BatchInternalRaftRequest) XXX_Size added in v0.7.1

func (m *BatchInternalRaftRequest) XXX_Size() int

func (*BatchInternalRaftRequest) XXX_Unmarshal added in v0.7.1

func (m *BatchInternalRaftRequest) XXX_Unmarshal(b []byte) error

type CompactAPIRange added in v0.9.2

type CompactAPIRange struct {
	StartFrom []byte `json:"start_from,omitempty"`
	EndTo     []byte `json:"end_to,omitempty"`
	Dryrun    bool   `json:"dryrun,omitempty"`
}

type ConflictCheckFunc

type ConflictCheckFunc func(redcon.Command, int64) ConflictState

type ConflictState added in v0.5.2

type ConflictState int

type DataStorage

type DataStorage interface {
	CleanData() error
	RestoreFromSnapshot(raftpb.Snapshot) error
	PrepareSnapshot(raftpb.Snapshot) error
	GetSnapshot(term uint64, index uint64) (Snapshot, error)
	UpdateSnapshotState(term uint64, index uint64)
	Stop()
}

type DeleteTableRange added in v0.3.2

type DeleteTableRange struct {
	Table     string `json:"table,omitempty"`
	StartFrom []byte `json:"start_from,omitempty"`
	EndTo     []byte `json:"end_to,omitempty"`
	// to avoid drop all table data, this is needed to delete all data in table
	DeleteAll bool `json:"delete_all,omitempty"`
	Dryrun    bool `json:"dryrun,omitempty"`
	// flag to indicate should this be replicated to the remote cluster
	// to avoid delete too much by accident
	NoReplayToRemoteCluster bool `json:"noreplay_to_remote_cluster"`
}

func (DeleteTableRange) CheckValid added in v0.3.2

func (dtr DeleteTableRange) CheckValid() error

type FutureRsp added in v0.8.0

type FutureRsp struct {
	// contains filtered or unexported fields
}

func (*FutureRsp) WaitRsp added in v0.8.0

func (fr *FutureRsp) WaitRsp() (interface{}, error)

Note: WaitRsp should not be called twice on the same FutureRsp.

type HindexSearchResults

type HindexSearchResults struct {
	Table string
	Rets  []common.HIndexRespWithValues
}

type IBatchOperator added in v0.7.0

type IBatchOperator interface {
	SetBatched(bool)
	IsBatched() bool
	BeginBatch() error
	AddBatchKey(string)
	AddBatchRsp(uint64, interface{})
	IsBatchable(string, string, [][]byte) bool
	CommitBatch()
	AbortBatchForError(err error)
}

type IRaftPersistStorage

type IRaftPersistStorage interface {
	// Save function saves ents and state to the underlying stable storage.
	// Save MUST block until st and ents are on stable storage.
	Save(st raftpb.HardState, ents []raftpb.Entry) error
	// SaveSnap function saves snapshot to the underlying stable storage.
	SaveSnap(snap raftpb.Snapshot) error
	Load() (*raftpb.Snapshot, error)
	LoadNewestAvailable(walSnaps []walpb.Snapshot) (*raftpb.Snapshot, error)
	// Close closes the Storage and performs finalization.
	Close() error
	// Release releases the locked wal files older than the provided snapshot.
	Release(snap raftpb.Snapshot) error
	// Sync WAL
	Sync() error
}

func NewRaftPersistStorage

func NewRaftPersistStorage(w *wal.WAL, s *snap.Snapshotter) IRaftPersistStorage

type InternalRaftRequest

type InternalRaftRequest struct {
	Header RequestHeader `protobuf:"bytes,1,opt,name=header,proto3" json:"header"`
	Data   []byte        `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
}

func (*InternalRaftRequest) Descriptor

func (*InternalRaftRequest) Descriptor() ([]byte, []int)

func (*InternalRaftRequest) Marshal

func (m *InternalRaftRequest) Marshal() (dAtA []byte, err error)

func (*InternalRaftRequest) MarshalTo

func (m *InternalRaftRequest) MarshalTo(dAtA []byte) (int, error)

func (*InternalRaftRequest) ProtoMessage

func (*InternalRaftRequest) ProtoMessage()

func (*InternalRaftRequest) Reset

func (m *InternalRaftRequest) Reset()

func (*InternalRaftRequest) Size

func (m *InternalRaftRequest) Size() (n int)

func (*InternalRaftRequest) String

func (m *InternalRaftRequest) String() string

func (*InternalRaftRequest) Unmarshal

func (m *InternalRaftRequest) Unmarshal(dAtA []byte) error

func (*InternalRaftRequest) XXX_DiscardUnknown added in v0.7.1

func (m *InternalRaftRequest) XXX_DiscardUnknown()

func (*InternalRaftRequest) XXX_Marshal added in v0.7.1

func (m *InternalRaftRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*InternalRaftRequest) XXX_Merge added in v0.7.1

func (m *InternalRaftRequest) XXX_Merge(src proto.Message)

func (*InternalRaftRequest) XXX_Size added in v0.7.1

func (m *InternalRaftRequest) XXX_Size() int

func (*InternalRaftRequest) XXX_Unmarshal added in v0.7.1

func (m *InternalRaftRequest) XXX_Unmarshal(b []byte) error

type KVNode

type KVNode struct {
	// contains filtered or unexported fields
}

KVNode is a key-value node backed by raft.

func NewKVNode

func NewKVNode(kvopts *KVOptions, config *RaftConfig,
	transport *rafthttp.Transport, join bool, stopCb func(),
	clusterInfo common.IClusterInfo, newLeaderChan chan string) (*KVNode, error)

func (*KVNode) ApplyRemoteSnapshot

func (nd *KVNode) ApplyRemoteSnapshot(skip bool, name string, term uint64, index uint64) error

func (*KVNode) BackupDB added in v0.5.2

func (nd *KVNode) BackupDB(checkLast bool)

BackupDB makes a backup to avoid replaying logs after a restart; however, we can check whether the last backup is almost up to date to avoid doing the backup again. In raft, a restart will compact all logs before the snapshot, so if the backup is too new it may cause a snapshot transfer after a full restart of the raft cluster.

func (*KVNode) BeginTransferRemoteSnap

func (nd *KVNode) BeginTransferRemoteSnap(name string, term uint64, index uint64, syncAddr string, syncPath string) error

func (*KVNode) CanPass added in v0.8.2

func (nd *KVNode) CanPass(ts int64, cmd string, table string) bool

func (*KVNode) CheckLocalBackup

func (nd *KVNode) CheckLocalBackup(snapData []byte) (bool, error)

func (*KVNode) CleanData

func (nd *KVNode) CleanData() error

func (*KVNode) CustomPropose

func (nd *KVNode) CustomPropose(buf []byte) (interface{}, error)

func (*KVNode) DeleteRange added in v0.3.2

func (nd *KVNode) DeleteRange(drange DeleteTableRange) error

func (*KVNode) DisableOptimizeDB added in v0.9.2

func (nd *KVNode) DisableOptimizeDB(disable bool)

func (*KVNode) FillMyMemberInfo

func (nd *KVNode) FillMyMemberInfo(m *common.MemberInfo)

func (*KVNode) GetAppliedIndex added in v0.6.3

func (nd *KVNode) GetAppliedIndex() uint64

func (*KVNode) GetApplyRemoteSnapStatus

func (nd *KVNode) GetApplyRemoteSnapStatus(name string) (*SnapApplyStatus, bool)

func (*KVNode) GetDBInternalStats

func (nd *KVNode) GetDBInternalStats() string

func (*KVNode) GetFullName added in v0.8.2

func (nd *KVNode) GetFullName() string

func (*KVNode) GetHandler

func (nd *KVNode) GetHandler(cmd string) (common.CommandFunc, bool)

func (*KVNode) GetIndexSchema

func (nd *KVNode) GetIndexSchema(table string) (map[string]*common.IndexSchema, error)

func (*KVNode) GetLastLeaderChangedTime

func (nd *KVNode) GetLastLeaderChangedTime() int64

func (*KVNode) GetLastSnapIndex added in v0.8.1

func (nd *KVNode) GetLastSnapIndex() uint64

func (*KVNode) GetLeadMember

func (nd *KVNode) GetLeadMember() *common.MemberInfo

func (*KVNode) GetLearnerRole added in v0.9.0

func (nd *KVNode) GetLearnerRole() string

func (*KVNode) GetLearners

func (nd *KVNode) GetLearners() []*common.MemberInfo

func (*KVNode) GetLocalMemberInfo

func (nd *KVNode) GetLocalMemberInfo() *common.MemberInfo

func (*KVNode) GetLogSyncStatsInSyncLearner added in v0.4.0

func (nd *KVNode) GetLogSyncStatsInSyncLearner() (*metric.LogSyncStats, *metric.LogSyncStats)

func (*KVNode) GetMembers

func (nd *KVNode) GetMembers() []*common.MemberInfo

func (*KVNode) GetMergeHandler

func (nd *KVNode) GetMergeHandler(cmd string) (common.MergeCommandFunc, bool, bool)

func (*KVNode) GetRaftConfig added in v0.9.0

func (nd *KVNode) GetRaftConfig() *RaftConfig

func (*KVNode) GetRaftStatus

func (nd *KVNode) GetRaftStatus() raft.Status

func (*KVNode) GetRemoteClusterSyncedRaft

func (nd *KVNode) GetRemoteClusterSyncedRaft(name string) (uint64, uint64, int64)

func (*KVNode) GetSnapshot

func (nd *KVNode) GetSnapshot(term uint64, index uint64) (Snapshot, error)

func (*KVNode) GetStats

func (nd *KVNode) GetStats(table string, needTableDetail bool) metric.NamespaceStats

func (*KVNode) GetWALDBInternalStats added in v0.6.0

func (nd *KVNode) GetWALDBInternalStats() map[string]interface{}

func (*KVNode) GetWriteHandler added in v0.8.0

func (nd *KVNode) GetWriteHandler(cmd string) (common.WriteCommandFunc, bool)

func (*KVNode) IsLead

func (nd *KVNode) IsLead() bool

func (*KVNode) IsPeerRemoved

func (nd *KVNode) IsPeerRemoved(peerID uint64) bool

func (*KVNode) IsRaftSynced

func (nd *KVNode) IsRaftSynced(checkCommitIndex bool) bool

func (*KVNode) IsReplicaRaftReady

func (nd *KVNode) IsReplicaRaftReady(raftID uint64) bool

This is used by the leader to determine whether a follower is up to date.

func (*KVNode) IsStopping added in v0.6.2

func (nd *KVNode) IsStopping() bool

func (*KVNode) IsWriteReady

func (nd *KVNode) IsWriteReady() bool

func (*KVNode) Lookup

func (nd *KVNode) Lookup(key []byte) ([]byte, error)

func (*KVNode) MaybeAddSlow added in v0.8.2

func (nd *KVNode) MaybeAddSlow(ts int64, cost time.Duration, cmd, table string)

func (*KVNode) OnRaftLeaderChanged

func (nd *KVNode) OnRaftLeaderChanged()

OnRaftLeaderChanged should not block for long.

func (*KVNode) OptimizeDB

func (nd *KVNode) OptimizeDB(table string)

func (*KVNode) OptimizeDBAnyRange added in v0.9.2

func (nd *KVNode) OptimizeDBAnyRange(r CompactAPIRange)

func (*KVNode) OptimizeDBExpire added in v0.9.2

func (nd *KVNode) OptimizeDBExpire()

func (*KVNode) PreWaitQueue added in v0.9.0

func (nd *KVNode) PreWaitQueue(ctx context.Context, cmd string, table string) (*SlowWaitDone, error)

func (*KVNode) PrepareSnapshot added in v0.6.3

func (nd *KVNode) PrepareSnapshot(raftSnapshot raftpb.Snapshot) error

func (*KVNode) Process

func (nd *KVNode) Process(ctx context.Context, m raftpb.Message) error

func (*KVNode) ProposeAddLearner

func (nd *KVNode) ProposeAddLearner(m common.MemberInfo) error

func (*KVNode) ProposeAddMember

func (nd *KVNode) ProposeAddMember(m common.MemberInfo) error

func (*KVNode) ProposeChangeTableSchema

func (nd *KVNode) ProposeChangeTableSchema(table string, sc *SchemaChange) error

func (*KVNode) ProposeInternal added in v0.7.1

func (nd *KVNode) ProposeInternal(ctx context.Context, irr InternalRaftRequest, cancel context.CancelFunc, start time.Time) (*waitReqHeaders, error)

func (*KVNode) ProposeRawAndWaitFromSyncer added in v0.8.0

func (nd *KVNode) ProposeRawAndWaitFromSyncer(reqList *BatchInternalRaftRequest, term uint64, index uint64, raftTs int64) error

func (*KVNode) ProposeRawAsyncFromSyncer added in v0.8.0

func (nd *KVNode) ProposeRawAsyncFromSyncer(buffer []byte, reqList *BatchInternalRaftRequest, term uint64, index uint64, raftTs int64) (*FutureRsp, *BatchInternalRaftRequest, error)

func (*KVNode) ProposeRemoveMember

func (nd *KVNode) ProposeRemoveMember(m common.MemberInfo) error

func (*KVNode) ProposeUpdateMember

func (nd *KVNode) ProposeUpdateMember(m common.MemberInfo) error

func (*KVNode) RedisPropose added in v0.8.0

func (nd *KVNode) RedisPropose(buf []byte) (interface{}, error)

func (*KVNode) RedisProposeAsync added in v0.8.0

func (nd *KVNode) RedisProposeAsync(buf []byte) (*FutureRsp, error)

func (*KVNode) RedisV2ProposeAsync added in v0.8.0

func (nd *KVNode) RedisV2ProposeAsync(buf []byte) (*FutureRsp, error)

func (*KVNode) ReportMeLeaderToCluster

func (nd *KVNode) ReportMeLeaderToCluster()

func (*KVNode) ReportSnapshot

func (nd *KVNode) ReportSnapshot(id uint64, gp raftpb.Group, status raft.SnapshotStatus)

func (*KVNode) ReportUnreachable

func (nd *KVNode) ReportUnreachable(id uint64, group raftpb.Group)

func (*KVNode) RestoreFromSnapshot

func (nd *KVNode) RestoreFromSnapshot(raftSnapshot raftpb.Snapshot) error

func (*KVNode) SaveDBFrom

func (nd *KVNode) SaveDBFrom(r io.Reader, msg raftpb.Message) (int64, error)

func (*KVNode) SetAppliedIndex added in v0.6.3

func (nd *KVNode) SetAppliedIndex(ci uint64)

func (*KVNode) SetDynamicInfo added in v0.6.2

func (nd *KVNode) SetDynamicInfo(dync NamespaceDynamicConf)

func (*KVNode) SetLastSnapIndex added in v0.8.1

func (nd *KVNode) SetLastSnapIndex(ci uint64)

func (*KVNode) SetMaxBackgroundOptions added in v0.7.1

func (nd *KVNode) SetMaxBackgroundOptions(maxCompact int, maxBackJobs int) error

func (*KVNode) SetRemoteClusterSyncedRaft

func (nd *KVNode) SetRemoteClusterSyncedRaft(name string, term uint64, index uint64, ts int64)

func (*KVNode) Start

func (nd *KVNode) Start(standalone bool) error

func (*KVNode) Stop

func (nd *KVNode) Stop()

func (*KVNode) StopRaft

func (nd *KVNode) StopRaft()

func (*KVNode) Tick

func (nd *KVNode) Tick()

func (*KVNode) UpdateSnapshotState added in v0.7.1

func (nd *KVNode) UpdateSnapshotState(term uint64, index uint64)

func (*KVNode) UpdateWriteStats added in v0.8.0

func (nd *KVNode) UpdateWriteStats(vSize int64, latencyUs int64)

type KVOptions

type KVOptions struct {
	DataDir          string
	KeepBackup       int
	EngType          string
	ExpirationPolicy common.ExpirationPolicy
	DataVersion      common.DataVersionT
	RockOpts         engine.RockOptions
	SharedConfig     engine.SharedRockConfig
}

type KVSnapInfo

type KVSnapInfo struct {
	*rockredis.BackupInfo
	Ver                int                    `json:"version"`
	BackupMeta         []byte                 `json:"backup_meta"`
	LeaderInfo         *common.MemberInfo     `json:"leader_info"`
	Members            []*common.MemberInfo   `json:"members"`
	Learners           []*common.MemberInfo   `json:"learners"`
	RemoteSyncedStates map[string]SyncedState `json:"remote_synced_states"`
}

func (*KVSnapInfo) GetData

func (si *KVSnapInfo) GetData() ([]byte, error)

type KVStore

type KVStore struct {
	*rockredis.RockDB
	// contains filtered or unexported fields
}

KVStore is a key-value store.

func NewKVStore

func NewKVStore(kvopts *KVOptions) (*KVStore, error)

func (*KVStore) AbortBatch added in v0.8.1

func (s *KVStore) AbortBatch()

func (*KVStore) BeginBatchWrite

func (s *KVStore) BeginBatchWrite() error

func (*KVStore) CleanData

func (s *KVStore) CleanData() error

func (*KVStore) CommitBatchWrite

func (s *KVStore) CommitBatchWrite() error

func (*KVStore) Destroy

func (s *KVStore) Destroy() error

The caller should make sure to call this only after close/stop has finished, and that no further operations are performed on this store during or after the destroy.

func (*KVStore) IsBatchableWrite

func (s *KVStore) IsBatchableWrite(cmdName string) bool

func (*KVStore) LocalDelete

func (s *KVStore) LocalDelete(key []byte) (int64, error)

func (*KVStore) LocalLookup

func (s *KVStore) LocalLookup(key []byte) ([]byte, error)

func (*KVStore) LocalPut

func (s *KVStore) LocalPut(ts int64, key []byte, value []byte) error

func (*KVStore) LocalWriteBatch

func (s *KVStore) LocalWriteBatch(cmd ...common.WriteCmd) error

type MachineConfig

type MachineConfig struct {
	// server node id
	NodeID                 uint64             `json:"node_id"`
	BroadcastAddr          string             `json:"broadcast_addr"`
	HttpAPIPort            int                `json:"http_api_port"`
	LocalRaftAddr          string             `json:"local_raft_addr"`
	DataRootDir            string             `json:"data_root_dir"`
	ElectionTick           int                `json:"election_tick"`
	TickMs                 int                `json:"tick_ms"`
	KeepBackup             int                `json:"keep_backup"`
	KeepWAL                int                `json:"keep_wal"`
	UseRocksWAL            bool               `json:"use_rocks_wal"`
	SharedRocksWAL         bool               `json:"shared_rocks_wal"`
	LearnerRole            string             `json:"learner_role"`
	RemoteSyncCluster      string             `json:"remote_sync_cluster"`
	StateMachineType       string             `json:"state_machine_type"`
	RocksDBOpts            engine.RockOptions `json:"rocksdb_opts"`
	RocksDBSharedConfig    engine.SharedRockConfig
	WALRocksDBOpts         engine.RockOptions `json:"wal_rocksdb_opts"`
	WALRocksDBSharedConfig engine.SharedRockConfig
}

type NamespaceConfig

type NamespaceConfig struct {
	// namespace full name with partition
	Name string `json:"name"`
	// namespace name without partition
	BaseName         string          `json:"base_name"`
	EngType          string          `json:"eng_type"`
	PartitionNum     int             `json:"partition_num"`
	SnapCount        int             `json:"snap_count"`
	SnapCatchup      int             `json:"snap_catchup"`
	Replicator       int             `json:"replicator"`
	OptimizedFsync   bool            `json:"optimized_fsync"`
	RaftGroupConf    RaftGroupConfig `json:"raft_group_conf"`
	ExpirationPolicy string          `json:"expiration_policy"`
	DataVersion      string          `json:"data_version"`
}

func NewNSConfig

func NewNSConfig() *NamespaceConfig

type NamespaceDynamicConf

type NamespaceDynamicConf struct {
	Replicator int
}

type NamespaceMeta

type NamespaceMeta struct {
	PartitionNum int
	// contains filtered or unexported fields
}

type NamespaceMgr

type NamespaceMgr struct {
	// contains filtered or unexported fields
}

func NewNamespaceMgr

func NewNamespaceMgr(transport *rafthttp.Transport, conf *MachineConfig) *NamespaceMgr

func (*NamespaceMgr) BackupDB added in v0.5.2

func (nsm *NamespaceMgr) BackupDB(ns string, checkLast bool)

func (*NamespaceMgr) CheckLocalNamespaces added in v0.9.1

func (nsm *NamespaceMgr) CheckLocalNamespaces() map[string]int64

func (*NamespaceMgr) CleanSharedNsFiles added in v0.9.1

func (nsm *NamespaceMgr) CleanSharedNsFiles(baseNS string) error

This is used to clean namespace data that is shared between all partitions; only remove it once all the partitions are removed and the whole namespace is deleted from the cluster.

func (*NamespaceMgr) ClearTopn added in v0.8.4

func (nsm *NamespaceMgr) ClearTopn(ns string)

func (*NamespaceMgr) DeleteRange added in v0.3.2

func (nsm *NamespaceMgr) DeleteRange(ns string, dtr DeleteTableRange) error

func (*NamespaceMgr) DisableOptimizeDB added in v0.9.2

func (nsm *NamespaceMgr) DisableOptimizeDB(disable bool)

func (*NamespaceMgr) EnableTopn added in v0.8.4

func (nsm *NamespaceMgr) EnableTopn(ns string, on bool)

func (*NamespaceMgr) GetDBStats

func (nsm *NamespaceMgr) GetDBStats(leaderOnly bool) map[string]string

func (*NamespaceMgr) GetDataRoot added in v0.9.2

func (nsm *NamespaceMgr) GetDataRoot() string

func (*NamespaceMgr) GetLogSyncStats

func (nsm *NamespaceMgr) GetLogSyncStats(leaderOnly bool, srcClusterName string) []metric.LogSyncStats

func (*NamespaceMgr) GetLogSyncStatsInSyncer added in v0.4.0

func (nsm *NamespaceMgr) GetLogSyncStatsInSyncer() ([]metric.LogSyncStats, []metric.LogSyncStats)

func (*NamespaceMgr) GetNamespaceNode

func (nsm *NamespaceMgr) GetNamespaceNode(ns string) *NamespaceNode

func (*NamespaceMgr) GetNamespaceNodeFromGID

func (nsm *NamespaceMgr) GetNamespaceNodeFromGID(gid uint64) *NamespaceNode

func (*NamespaceMgr) GetNamespaceNodeWithPrimaryKey

func (nsm *NamespaceMgr) GetNamespaceNodeWithPrimaryKey(nsBaseName string, pk []byte) (*NamespaceNode, error)

func (*NamespaceMgr) GetNamespaceNodeWithPrimaryKeySum added in v0.8.0

func (nsm *NamespaceMgr) GetNamespaceNodeWithPrimaryKeySum(nsBaseName string, pk []byte, pkSum int) (*NamespaceNode, error)

func (*NamespaceMgr) GetNamespaceNodes

func (nsm *NamespaceMgr) GetNamespaceNodes(nsBaseName string, leaderOnly bool) (map[string]*NamespaceNode, error)

func (*NamespaceMgr) GetNamespaces

func (nsm *NamespaceMgr) GetNamespaces() map[string]*NamespaceNode

func (*NamespaceMgr) GetStats

func (nsm *NamespaceMgr) GetStats(leaderOnly bool, table string, needTableDetail bool) []metric.NamespaceStats

func (*NamespaceMgr) GetWALDBStats added in v0.6.0

func (nsm *NamespaceMgr) GetWALDBStats(leaderOnly bool) map[string]map[string]interface{}

func (*NamespaceMgr) HandleSlowLimiterSwitchChanged added in v0.8.2

func (nsm *NamespaceMgr) HandleSlowLimiterSwitchChanged(v interface{})

func (*NamespaceMgr) InitNamespaceNode

func (nsm *NamespaceMgr) InitNamespaceNode(conf *NamespaceConfig, raftID uint64, join bool) (*NamespaceNode, error)

func (*NamespaceMgr) IsAllRecoveryDone

func (nsm *NamespaceMgr) IsAllRecoveryDone() bool

func (*NamespaceMgr) LoadMachineRegID

func (nsm *NamespaceMgr) LoadMachineRegID() (uint64, error)

func (*NamespaceMgr) OptimizeDB

func (nsm *NamespaceMgr) OptimizeDB(ns string, table string)

func (*NamespaceMgr) OptimizeDBAnyRange added in v0.9.2

func (nsm *NamespaceMgr) OptimizeDBAnyRange(ns string, r CompactAPIRange)

func (*NamespaceMgr) OptimizeDBExpire added in v0.9.2

func (nsm *NamespaceMgr) OptimizeDBExpire(ns string)

func (*NamespaceMgr) SaveMachineRegID

func (nsm *NamespaceMgr) SaveMachineRegID(regID uint64) error

func (*NamespaceMgr) SetDBOptions added in v0.7.1

func (nsm *NamespaceMgr) SetDBOptions(key string, value string) error

func (*NamespaceMgr) SetIClusterInfo

func (nsm *NamespaceMgr) SetIClusterInfo(clusterInfo common.IClusterInfo)

func (*NamespaceMgr) SetMaxBackgroundOptions added in v0.7.1

func (nsm *NamespaceMgr) SetMaxBackgroundOptions(maxCompact int, maxBackJobs int) error

func (*NamespaceMgr) SetRateLimiterBytesPerSec added in v0.7.1

func (nsm *NamespaceMgr) SetRateLimiterBytesPerSec(bytesPerSec int64)

func (*NamespaceMgr) Start

func (nsm *NamespaceMgr) Start()

func (*NamespaceMgr) Stop

func (nsm *NamespaceMgr) Stop()

type NamespaceNode

type NamespaceNode struct {
	Node *KVNode
	// contains filtered or unexported fields
}

func (*NamespaceNode) CheckRaftConf

func (nn *NamespaceNode) CheckRaftConf(raftID uint64, conf *NamespaceConfig) error

func (*NamespaceNode) Close

func (nn *NamespaceNode) Close()

func (*NamespaceNode) Destroy

func (nn *NamespaceNode) Destroy() error

func (*NamespaceNode) FullName

func (nn *NamespaceNode) FullName() string

func (*NamespaceNode) GetLastLeaderChangedTime

func (nn *NamespaceNode) GetLastLeaderChangedTime() int64

func (*NamespaceNode) GetLearners

func (nn *NamespaceNode) GetLearners() []*common.MemberInfo

func (*NamespaceNode) GetMembers

func (nn *NamespaceNode) GetMembers() []*common.MemberInfo

func (*NamespaceNode) GetRaftID

func (nn *NamespaceNode) GetRaftID() uint64

func (*NamespaceNode) IsDataNeedFix

func (nn *NamespaceNode) IsDataNeedFix() bool

func (*NamespaceNode) IsNsNodeFullReady

func (nn *NamespaceNode) IsNsNodeFullReady(checkCommitIndex bool) bool

A fully ready node means: all local commit log replay is done, we are aware of the leader, and maybe we have applied all the newest commit logs in the state machine.

func (*NamespaceNode) IsReady

func (nn *NamespaceNode) IsReady() bool

func (*NamespaceNode) SetDataFixState

func (nn *NamespaceNode) SetDataFixState(needFix bool)

func (*NamespaceNode) SetDynamicInfo

func (nn *NamespaceNode) SetDynamicInfo(dync NamespaceDynamicConf)

func (*NamespaceNode) SetMagicCode

func (nn *NamespaceNode) SetMagicCode(magic int64) error

func (*NamespaceNode) Start

func (nn *NamespaceNode) Start(forceStandaloneCluster bool) error

func (*NamespaceNode) SwitchForLearnerLeader

func (nn *NamespaceNode) SwitchForLearnerLeader(isLearnerLeader bool)

func (*NamespaceNode) TransferMyLeader

func (nn *NamespaceNode) TransferMyLeader(to uint64, toRaftID uint64) error

type RaftConfig

type RaftConfig struct {
	GroupID uint64 `json:"group_id"`
	// group name is combined namespace-partition string
	GroupName string `json:"group_name"`
	// this is replica id
	ID uint64 `json:"id"`
	// local server transport address, it
	// can be used by several raft group
	RaftAddr       string                 `json:"raft_addr"`
	DataDir        string                 `json:"data_dir"`
	WALDir         string                 `json:"wal_dir"`
	SnapDir        string                 `json:"snap_dir"`
	RaftStorageDir string                 `json:"raft_storage_dir"`
	RaftPeers      map[uint64]ReplicaInfo `json:"raft_peers"`
	SnapCount      int                    `json:"snap_count"`
	SnapCatchup    int                    `json:"snap_catchup"`
	Replicator     int32                  `json:"replicator"`
	OptimizedFsync bool                   `json:"optimized_fsync"`
	// contains filtered or unexported fields
}

func (*RaftConfig) SetEng added in v0.6.0

func (rc *RaftConfig) SetEng(eng engine.KVEngine)

type RaftGroupConfig

type RaftGroupConfig struct {
	GroupID   uint64        `json:"group_id"`
	SeedNodes []ReplicaInfo `json:"seed_nodes"`
}

type RaftRpcFunc

type RaftRpcFunc func() error

type RemoteLogSender

type RemoteLogSender struct {
	// contains filtered or unexported fields
}

RemoteLogSender is the raft log sender. It sends all the raft logs to the remote cluster using the gRPC service.

func NewRemoteLogSender

func NewRemoteLogSender(localCluster string, fullName string, remoteCluster string) (*RemoteLogSender, error)

func (*RemoteLogSender) GetStats

func (s *RemoteLogSender) GetStats() interface{}

func (*RemoteLogSender) Stop

func (s *RemoteLogSender) Stop()

type ReplicaInfo

type ReplicaInfo struct {
	NodeID    uint64 `json:"node_id"`
	ReplicaID uint64 `json:"replica_id"`
	RaftAddr  string `json:"raft_addr"`
}

type ReqSourceType

type ReqSourceType int32
const (
	FromAPI           ReqSourceType = 0
	FromClusterSyncer ReqSourceType = 1
)

func (ReqSourceType) EnumDescriptor

func (ReqSourceType) EnumDescriptor() ([]byte, []int)

func (ReqSourceType) String

func (x ReqSourceType) String() string

type RequestHeader

type RequestHeader struct {
	ID        uint64 `protobuf:"varint,1,opt,name=ID,proto3" json:"ID,omitempty"`
	DataType  int32  `protobuf:"varint,2,opt,name=data_type,json=dataType,proto3" json:"data_type,omitempty"`
	Timestamp int64  `protobuf:"varint,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
}

func (*RequestHeader) Descriptor

func (*RequestHeader) Descriptor() ([]byte, []int)

func (*RequestHeader) Marshal

func (m *RequestHeader) Marshal() (dAtA []byte, err error)

func (*RequestHeader) MarshalTo

func (m *RequestHeader) MarshalTo(dAtA []byte) (int, error)

func (*RequestHeader) ProtoMessage

func (*RequestHeader) ProtoMessage()

func (*RequestHeader) Reset

func (m *RequestHeader) Reset()

func (*RequestHeader) Size

func (m *RequestHeader) Size() (n int)

func (*RequestHeader) String

func (m *RequestHeader) String() string

func (*RequestHeader) Unmarshal

func (m *RequestHeader) Unmarshal(dAtA []byte) error

func (*RequestHeader) XXX_DiscardUnknown added in v0.7.1

func (m *RequestHeader) XXX_DiscardUnknown()

func (*RequestHeader) XXX_Marshal added in v0.7.1

func (m *RequestHeader) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*RequestHeader) XXX_Merge added in v0.7.1

func (m *RequestHeader) XXX_Merge(src proto.Message)

func (*RequestHeader) XXX_Size added in v0.7.1

func (m *RequestHeader) XXX_Size() int

func (*RequestHeader) XXX_Unmarshal added in v0.7.1

func (m *RequestHeader) XXX_Unmarshal(b []byte) error

type RequestResultCode added in v0.7.0

type RequestResultCode int
const (
	ReqComplete RequestResultCode = iota
	ReqCancelled
	ReqTimeouted
)

type SchemaChange

type SchemaChange struct {
	Type       SchemaChangeType `protobuf:"varint,1,opt,name=Type,proto3,enum=node.SchemaChangeType" json:"Type,omitempty"`
	Table      string           `protobuf:"bytes,2,opt,name=Table,proto3" json:"Table,omitempty"`
	SchemaData []byte           `protobuf:"bytes,3,opt,name=SchemaData,proto3" json:"SchemaData,omitempty"`
}

func (*SchemaChange) Descriptor

func (*SchemaChange) Descriptor() ([]byte, []int)

func (*SchemaChange) Marshal

func (m *SchemaChange) Marshal() (dAtA []byte, err error)

func (*SchemaChange) MarshalTo

func (m *SchemaChange) MarshalTo(dAtA []byte) (int, error)

func (*SchemaChange) ProtoMessage

func (*SchemaChange) ProtoMessage()

func (*SchemaChange) Reset

func (m *SchemaChange) Reset()

func (*SchemaChange) Size

func (m *SchemaChange) Size() (n int)

func (*SchemaChange) String

func (m *SchemaChange) String() string

func (*SchemaChange) Unmarshal

func (m *SchemaChange) Unmarshal(dAtA []byte) error

func (*SchemaChange) XXX_DiscardUnknown added in v0.7.1

func (m *SchemaChange) XXX_DiscardUnknown()

func (*SchemaChange) XXX_Marshal added in v0.7.1

func (m *SchemaChange) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*SchemaChange) XXX_Merge added in v0.7.1

func (m *SchemaChange) XXX_Merge(src proto.Message)

func (*SchemaChange) XXX_Size added in v0.7.1

func (m *SchemaChange) XXX_Size() int

func (*SchemaChange) XXX_Unmarshal added in v0.7.1

func (m *SchemaChange) XXX_Unmarshal(b []byte) error

type SchemaChangeType

type SchemaChangeType int32
const (
	SchemaChangeAddHsetIndex    SchemaChangeType = 0
	SchemaChangeUpdateHsetIndex SchemaChangeType = 1
	SchemaChangeDeleteHsetIndex SchemaChangeType = 2
)

func (SchemaChangeType) EnumDescriptor

func (SchemaChangeType) EnumDescriptor() ([]byte, []int)

func (SchemaChangeType) String

func (x SchemaChangeType) String() string

type SlowLimiter added in v0.8.2

type SlowLimiter struct {
	// contains filtered or unexported fields
}

SlowLimiter is used to limit some slow write commands to avoid blocking raft.

func NewSlowLimiter added in v0.8.2

func NewSlowLimiter(ns string) *SlowLimiter

func (*SlowLimiter) AddSlow added in v0.8.2

func (sl *SlowLimiter) AddSlow(ts int64)

func (*SlowLimiter) CanPass added in v0.8.2

func (sl *SlowLimiter) CanPass(ts int64, cmd string, prefix string) bool

func (*SlowLimiter) MarkHeavySlow added in v0.8.2

func (sl *SlowLimiter) MarkHeavySlow()

func (*SlowLimiter) MaybeAddSlow added in v0.8.2

func (sl *SlowLimiter) MaybeAddSlow(ts int64, cost time.Duration, cmd string, prefix string)

func (*SlowLimiter) PreWaitQueue added in v0.9.0

func (sl *SlowLimiter) PreWaitQueue(ctx context.Context, cmd string, prefix string) (*SlowWaitDone, error)

func (*SlowLimiter) RecordSlowCmd added in v0.8.2

func (sl *SlowLimiter) RecordSlowCmd(cmd string, prefix string, cost time.Duration)

func (*SlowLimiter) Start added in v0.8.2

func (sl *SlowLimiter) Start()

func (*SlowLimiter) Stop added in v0.8.2

func (sl *SlowLimiter) Stop()

func (*SlowLimiter) TurnOff added in v0.8.2

func (sl *SlowLimiter) TurnOff()

func (*SlowLimiter) TurnOn added in v0.8.2

func (sl *SlowLimiter) TurnOn()

type SlowWaitDone added in v0.9.0

type SlowWaitDone struct {
	// contains filtered or unexported fields
}

func (*SlowWaitDone) Done added in v0.9.0

func (swd *SlowWaitDone) Done()

type SnapApplyStatus

type SnapApplyStatus struct {
	SS          SyncedState
	StatusCode  int
	Status      string
	UpdatedTime time.Time
}

type Snapshot

type Snapshot interface {
	GetData() ([]byte, error)
}

type StateMachine

type StateMachine interface {
	ApplyRaftRequest(isReplaying bool, b IBatchOperator, req BatchInternalRaftRequest, term uint64, index uint64, stop chan struct{}) (bool, error)
	ApplyRaftConfRequest(req raftpb.ConfChange, term uint64, index uint64, stop chan struct{}) error
	GetSnapshot(term uint64, index uint64) (*KVSnapInfo, error)
	UpdateSnapshotState(term uint64, index uint64)
	PrepareSnapshot(raftSnapshot raftpb.Snapshot, stop chan struct{}) error
	RestoreFromSnapshot(raftSnapshot raftpb.Snapshot, stop chan struct{}) error
	Destroy()
	CleanData() error
	Optimize(string)
	OptimizeExpire()
	OptimizeAnyRange(CompactAPIRange)
	DisableOptimize(bool)
	GetStats(table string, needDetail bool) metric.NamespaceStats
	EnableTopn(on bool)
	ClearTopn()
	Start() error
	Close()
	GetBatchOperator() IBatchOperator
}

func NewStateMachine

func NewStateMachine(opts *KVOptions, machineConfig MachineConfig, localID uint64,
	fullNS string, clusterInfo common.IClusterInfo, w wait.Wait, sl *SlowLimiter) (StateMachine, error)

type SyncedState

type SyncedState struct {
	SyncedTerm  uint64 `json:"synced_term,omitempty"`
	SyncedIndex uint64 `json:"synced_index,omitempty"`
	Timestamp   int64  `json:"timestamp,omitempty"`
	// contains filtered or unexported fields
}

func (*SyncedState) IsNewer

func (ss *SyncedState) IsNewer(other *SyncedState) bool

func (*SyncedState) IsNewer2

func (ss *SyncedState) IsNewer2(term uint64, index uint64) bool

func (*SyncedState) IsSame added in v0.4.0

func (ss *SyncedState) IsSame(other *SyncedState) bool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
t or T : Toggle theme light dark auto
y or Y : Canonical URL