Documentation
¶
Overview ¶
Package pd implements a simplified Placement Driver server for gookv. It provides TSO allocation, cluster metadata management, heartbeat processing, ID allocation, and GC safe point management.
Index ¶
- Variables
- func HasPersistedPDRaftState(engine traits.KvEngine, clusterID uint64) bool
- func RegisterPDPeerService(srv *grpc.Server, peer *PDRaftPeer)
- type GCSafePointManager
- type IDAllocator
- type IDBuffer
- type MetadataStore
- func (m *MetadataStore) GetAllRegions() map[uint64]*metapb.Region
- func (m *MetadataStore) GetAllStores() []*metapb.Store
- func (m *MetadataStore) GetDeadStores() []uint64
- func (m *MetadataStore) GetLeaderCountPerStore() map[uint64]int
- func (m *MetadataStore) GetRegionByID(id uint64) (*metapb.Region, *metapb.Peer)
- func (m *MetadataStore) GetRegionByKey(key []byte) (*metapb.Region, *metapb.Peer)
- func (m *MetadataStore) GetRegionCountPerStore() map[uint64]int
- func (m *MetadataStore) GetStore(id uint64) *metapb.Store
- func (m *MetadataStore) GetStoreState(storeID uint64) StoreState
- func (m *MetadataStore) IsBootstrapped() bool
- func (m *MetadataStore) IsStoreAlive(storeID uint64) bool
- func (m *MetadataStore) IsStoreSchedulable(storeID uint64) bool
- func (m *MetadataStore) PutRegion(region *metapb.Region, leader *metapb.Peer)
- func (m *MetadataStore) PutStore(store *metapb.Store)
- func (m *MetadataStore) SetBootstrapped(v bool)
- func (m *MetadataStore) SetStoreState(storeID uint64, state StoreState)
- func (m *MetadataStore) UpdateStoreStats(storeID uint64, stats *pdpb.StoreStats)
- type MoveState
- type MoveTracker
- func (t *MoveTracker) ActiveMoveCount() int
- func (t *MoveTracker) Advance(regionID uint64, region *metapb.Region, leader *metapb.Peer) *ScheduleCommand
- func (t *MoveTracker) CleanupStale(timeout time.Duration)
- func (t *MoveTracker) HasPendingMove(regionID uint64) bool
- func (t *MoveTracker) StartMove(regionID uint64, sourcePeer *metapb.Peer, targetStoreID uint64)
- type MoveTrackerInterface
- type PDCommand
- type PDCommandType
- type PDPeerService
- type PDPeerServiceServer
- type PDProposal
- type PDRaftConfig
- type PDRaftMsg
- type PDRaftMsgType
- type PDRaftPeer
- func (p *PDRaftPeer) IsLeader() bool
- func (p *PDRaftPeer) IsStopped() bool
- func (p *PDRaftPeer) LeaderID() uint64
- func (p *PDRaftPeer) ProposeAndWait(ctx context.Context, cmd PDCommand) ([]byte, error)
- func (p *PDRaftPeer) Run(ctx context.Context)
- func (p *PDRaftPeer) SetApplyFunc(f func(PDCommand) ([]byte, error))
- func (p *PDRaftPeer) SetApplySnapshotFunc(f func([]byte) error)
- func (p *PDRaftPeer) SetLeaderChangeFunc(f func(isLeader bool))
- func (p *PDRaftPeer) SetSendFunc(f func([]raftpb.Message))
- func (p *PDRaftPeer) WireTransport(transport *PDTransport)
- type PDRaftStorage
- func (s *PDRaftStorage) AppliedIndex() uint64
- func (s *PDRaftStorage) CompactTo(compactTo uint64)
- func (s *PDRaftStorage) DeleteEntriesTo(endIdx uint64) error
- func (s *PDRaftStorage) Entries(lo, hi, maxSize uint64) ([]raftpb.Entry, error)
- func (s *PDRaftStorage) FirstIndex() (uint64, error)
- func (s *PDRaftStorage) GetApplyState() raftstore.ApplyState
- func (s *PDRaftStorage) InitialState() (raftpb.HardState, raftpb.ConfState, error)
- func (s *PDRaftStorage) LastIndex() (uint64, error)
- func (s *PDRaftStorage) RecoverFromEngine() error
- func (s *PDRaftStorage) SaveReady(rd raft.Ready) error
- func (s *PDRaftStorage) SetApplyState(state raftstore.ApplyState)
- func (s *PDRaftStorage) SetDummyEntry()
- func (s *PDRaftStorage) SetPersistedLastIndex(idx uint64)
- func (s *PDRaftStorage) SetSnapshotGenFunc(f func() ([]byte, error))
- func (s *PDRaftStorage) Snapshot() (raftpb.Snapshot, error)
- func (s *PDRaftStorage) Term(i uint64) (uint64, error)
- type PDServer
- func (s *PDServer) Addr() string
- func (s *PDServer) AllocID(ctx context.Context, req *pdpb.AllocIDRequest) (*pdpb.AllocIDResponse, error)
- func (s *PDServer) ApplySnapshot(data []byte) error
- func (s *PDServer) AskBatchSplit(ctx context.Context, req *pdpb.AskBatchSplitRequest) (*pdpb.AskBatchSplitResponse, error)
- func (s *PDServer) Bootstrap(ctx context.Context, req *pdpb.BootstrapRequest) (*pdpb.BootstrapResponse, error)
- func (s *PDServer) DataDir() string
- func (s *PDServer) GenerateSnapshot() ([]byte, error)
- func (s *PDServer) GetAllStores(ctx context.Context, req *pdpb.GetAllStoresRequest) (*pdpb.GetAllStoresResponse, error)
- func (s *PDServer) GetGCSafePoint(ctx context.Context, req *pdpb.GetGCSafePointRequest) (*pdpb.GetGCSafePointResponse, error)
- func (s *PDServer) GetMembers(ctx context.Context, req *pdpb.GetMembersRequest) (*pdpb.GetMembersResponse, error)
- func (s *PDServer) GetRegion(ctx context.Context, req *pdpb.GetRegionRequest) (*pdpb.GetRegionResponse, error)
- func (s *PDServer) GetRegionByID(ctx context.Context, req *pdpb.GetRegionByIDRequest) (*pdpb.GetRegionResponse, error)
- func (s *PDServer) GetStore(ctx context.Context, req *pdpb.GetStoreRequest) (*pdpb.GetStoreResponse, error)
- func (s *PDServer) IsBootstrapped(ctx context.Context, req *pdpb.IsBootstrappedRequest) (*pdpb.IsBootstrappedResponse, error)
- func (s *PDServer) IsRaftLeader() bool
- func (s *PDServer) PutStore(ctx context.Context, req *pdpb.PutStoreRequest) (*pdpb.PutStoreResponse, error)
- func (s *PDServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error
- func (s *PDServer) ReportBatchSplit(ctx context.Context, req *pdpb.ReportBatchSplitRequest) (*pdpb.ReportBatchSplitResponse, error)
- func (s *PDServer) Start() error
- func (s *PDServer) Stop()
- func (s *PDServer) StoreHeartbeat(ctx context.Context, req *pdpb.StoreHeartbeatRequest) (*pdpb.StoreHeartbeatResponse, error)
- func (s *PDServer) Tso(stream pdpb.PD_TsoServer) error
- func (s *PDServer) UpdateGCSafePoint(ctx context.Context, req *pdpb.UpdateGCSafePointRequest) (*pdpb.UpdateGCSafePointResponse, error)
- type PDServerConfig
- type PDServerRaftConfig
- type PDSnapshot
- type PDTransport
- type PendingMove
- type ScheduleCommand
- type Scheduler
- type StoreState
- type TSOAllocator
- type TSOBuffer
- type TSOSnapshotState
Constants ¶
This section is empty.
Variables ¶
var ErrNotLeader = errors.New("pd: not leader")
ErrNotLeader is returned when a proposal is attempted on a non-leader peer.
Functions ¶
func HasPersistedPDRaftState ¶
HasPersistedPDRaftState checks whether the engine has persisted PD Raft state for the given cluster. Returns true if a hard state key exists.
func RegisterPDPeerService ¶
func RegisterPDPeerService(srv *grpc.Server, peer *PDRaftPeer)
RegisterPDPeerService registers the PD peer-to-peer Raft message service on the given gRPC server. Uses a hand-coded service descriptor (no proto code generation needed).
Types ¶
type GCSafePointManager ¶
type GCSafePointManager struct {
// contains filtered or unexported fields
}
GCSafePointManager manages the GC safe point.
func NewGCSafePointManager ¶
func NewGCSafePointManager() *GCSafePointManager
func (*GCSafePointManager) GetSafePoint ¶
func (g *GCSafePointManager) GetSafePoint() uint64
func (*GCSafePointManager) UpdateSafePoint ¶
func (g *GCSafePointManager) UpdateSafePoint(newSP uint64) uint64
type IDAllocator ¶
type IDAllocator struct {
// contains filtered or unexported fields
}
IDAllocator provides monotonically increasing unique IDs.
func NewIDAllocator ¶
func NewIDAllocator() *IDAllocator
type IDBuffer ¶
type IDBuffer struct {
// contains filtered or unexported fields
}
IDBuffer pre-allocates ID ranges via Raft and serves subsequent requests from a local buffer. This amortizes the cost of Raft consensus over many ID allocations. On leader change, the buffer must be reset so the new leader proposes a fresh batch.
func NewIDBuffer ¶
func NewIDBuffer(raftPeer *PDRaftPeer) *IDBuffer
NewIDBuffer creates an IDBuffer backed by the given Raft peer.
type MetadataStore ¶
type MetadataStore struct {
// contains filtered or unexported fields
}
MetadataStore manages cluster metadata in memory.
func NewMetadataStore ¶
func NewMetadataStore(clusterID uint64, disconnectDuration, downDuration time.Duration) *MetadataStore
func (*MetadataStore) GetAllRegions ¶
func (m *MetadataStore) GetAllRegions() map[uint64]*metapb.Region
GetAllRegions returns all regions.
func (*MetadataStore) GetAllStores ¶
func (m *MetadataStore) GetAllStores() []*metapb.Store
func (*MetadataStore) GetDeadStores ¶
func (m *MetadataStore) GetDeadStores() []uint64
GetDeadStores returns store IDs that are in the Down state.
func (*MetadataStore) GetLeaderCountPerStore ¶
func (m *MetadataStore) GetLeaderCountPerStore() map[uint64]int
GetLeaderCountPerStore returns the number of region leaders per store.
func (*MetadataStore) GetRegionByID ¶
func (*MetadataStore) GetRegionByKey ¶
func (*MetadataStore) GetRegionCountPerStore ¶
func (m *MetadataStore) GetRegionCountPerStore() map[uint64]int
GetRegionCountPerStore returns the number of region peers per store.
func (*MetadataStore) GetStoreState ¶
func (m *MetadataStore) GetStoreState(storeID uint64) StoreState
GetStoreState returns the current state of a store.
func (*MetadataStore) IsBootstrapped ¶
func (m *MetadataStore) IsBootstrapped() bool
func (*MetadataStore) IsStoreAlive ¶
func (m *MetadataStore) IsStoreAlive(storeID uint64) bool
IsStoreAlive returns whether a store is considered alive (Up or Disconnected). Kept for backward compatibility; prefer GetStoreState or IsStoreSchedulable.
func (*MetadataStore) IsStoreSchedulable ¶
func (m *MetadataStore) IsStoreSchedulable(storeID uint64) bool
IsStoreSchedulable returns true only if the store is in the Up state.
func (*MetadataStore) PutRegion ¶
func (m *MetadataStore) PutRegion(region *metapb.Region, leader *metapb.Peer)
func (*MetadataStore) PutStore ¶
func (m *MetadataStore) PutStore(store *metapb.Store)
func (*MetadataStore) SetBootstrapped ¶
func (m *MetadataStore) SetBootstrapped(v bool)
func (*MetadataStore) SetStoreState ¶
func (m *MetadataStore) SetStoreState(storeID uint64, state StoreState)
SetStoreState sets the state for a store (e.g. Tombstone).
func (*MetadataStore) UpdateStoreStats ¶
func (m *MetadataStore) UpdateStoreStats(storeID uint64, stats *pdpb.StoreStats)
type MoveTracker ¶
type MoveTracker struct {
// contains filtered or unexported fields
}
MoveTracker tracks in-progress region moves across heartbeat cycles.
func (*MoveTracker) ActiveMoveCount ¶
func (t *MoveTracker) ActiveMoveCount() int
ActiveMoveCount returns the number of in-progress moves plus regions in cooldown. This is used by the scheduler's rate limiter to prevent bursts of balance moves.
func (*MoveTracker) Advance ¶
func (t *MoveTracker) Advance(regionID uint64, region *metapb.Region, leader *metapb.Peer) *ScheduleCommand
Advance progresses the move state machine for the given region based on the current region metadata and leader. It returns a ScheduleCommand if an action is needed, or nil if no action is required (either the move is still waiting or it is complete).
func (*MoveTracker) CleanupStale ¶
func (t *MoveTracker) CleanupStale(timeout time.Duration)
CleanupStale removes moves that have been in progress longer than timeout.
func (*MoveTracker) HasPendingMove ¶
func (t *MoveTracker) HasPendingMove(regionID uint64) bool
HasPendingMove returns true if the given region has an in-progress move or is in a post-move cooldown period.
type MoveTrackerInterface ¶
type MoveTrackerInterface interface {
HasPendingMove(regionID uint64) bool
ActiveMoveCount() int
StartMove(regionID uint64, sourcePeer *metapb.Peer, targetStoreID uint64)
Advance(regionID uint64, region *metapb.Region, leader *metapb.Peer) *ScheduleCommand
}
MoveTrackerInterface is used by the scheduler to check pending moves. It is nil-safe: the scheduler checks that moveTracker != nil before calling it.
type PDCommand ¶
type PDCommand struct {
Type PDCommandType `json:"type"`
// Payload fields (only the relevant field is non-nil for each type).
Bootstrapped *bool `json:"bootstrapped,omitempty"`
Store *metapb.Store `json:"store,omitempty"`
Region *metapb.Region `json:"region,omitempty"`
Leader *metapb.Peer `json:"leader,omitempty"`
StoreStats *pdpb.StoreStats `json:"store_stats,omitempty"`
StoreID uint64 `json:"store_id,omitempty"`
StoreState *StoreState `json:"store_state,omitempty"`
TSOBatchSize int `json:"tso_batch_size,omitempty"`
IDBatchSize int `json:"id_batch_size,omitempty"`
GCSafePoint uint64 `json:"gc_safe_point,omitempty"`
MoveRegionID uint64 `json:"move_region_id,omitempty"`
MoveSourcePeer *metapb.Peer `json:"move_source_peer,omitempty"`
MoveTargetStoreID uint64 `json:"move_target_store_id,omitempty"`
AdvanceRegion *metapb.Region `json:"advance_region,omitempty"`
AdvanceLeader *metapb.Peer `json:"advance_leader,omitempty"`
CleanupTimeout time.Duration `json:"cleanup_timeout,omitempty"`
CompactIndex uint64 `json:"compact_index,omitempty"`
CompactTerm uint64 `json:"compact_term,omitempty"`
}
PDCommand represents a single state-mutation that can be replicated via Raft. For each command type, only the relevant payload fields are populated.
func UnmarshalPDCommand ¶
UnmarshalPDCommand decodes a PDCommand from the wire format produced by Marshal.
type PDCommandType ¶
type PDCommandType uint8
PDCommandType identifies the type of PD state-mutation command.
const (
CmdSetBootstrapped PDCommandType = iota + 1 // 1
CmdPutStore // 2
CmdPutRegion // 3
CmdUpdateStoreStats // 4
CmdSetStoreState // 5
CmdTSOAllocate // 6
CmdIDAlloc // 7
CmdUpdateGCSafePoint // 8
CmdStartMove // 9
CmdAdvanceMove // 10
CmdCleanupStaleMove // 11
CmdCompactLog // 12
)
type PDPeerService ¶
type PDPeerService struct {
// contains filtered or unexported fields
}
PDPeerService handles incoming PD Raft messages from other PD peers.
func (*PDPeerService) SendPDRaftMessage ¶
func (s *PDPeerService) SendPDRaftMessage(ctx context.Context, req *raft_serverpb.RaftMessage) (*raft_serverpb.RaftMessage, error)
SendPDRaftMessage receives a RaftMessage from a remote PD peer, converts it from eraftpb to raftpb format, and delivers it to the local peer's mailbox.
type PDPeerServiceServer ¶
type PDPeerServiceServer interface {
SendPDRaftMessage(ctx context.Context, req *raft_serverpb.RaftMessage) (*raft_serverpb.RaftMessage, error)
}
PDPeerServiceServer is the interface that the PD peer gRPC service must implement.
type PDProposal ¶
PDProposal wraps a PDCommand with a callback for notification when the proposal is committed and applied.
type PDRaftConfig ¶
type PDRaftConfig struct {
// RaftTickInterval is the base tick interval for Raft (default 100ms).
RaftTickInterval time.Duration
// ElectionTimeoutTicks is the election timeout in ticks (default 10).
ElectionTimeoutTicks int
// HeartbeatTicks is the heartbeat interval in ticks (default 2).
HeartbeatTicks int
// MaxInflightMsgs is the maximum number of in-flight messages (default 256).
MaxInflightMsgs int
// MaxSizePerMsg is the maximum size of a single Raft message (default 1 MiB).
MaxSizePerMsg uint64
// MailboxCapacity is the size of the peer's mailbox channel (default 256).
MailboxCapacity int
// RaftLogGCTickInterval is how often the log GC tick fires (default 60s).
// Set to 0 to disable Raft log GC.
RaftLogGCTickInterval time.Duration
// RaftLogGCCountLimit triggers compaction when excess entry count exceeds this (default 10000).
RaftLogGCCountLimit uint64
// RaftLogGCThreshold is the minimum number of entries to keep after compaction (default 50).
RaftLogGCThreshold uint64
}
PDRaftConfig holds configuration for a PDRaftPeer.
func DefaultPDRaftConfig ¶
func DefaultPDRaftConfig() PDRaftConfig
DefaultPDRaftConfig returns a PDRaftConfig with sensible defaults.
type PDRaftMsg ¶
type PDRaftMsg struct {
Type PDRaftMsgType
Data interface{}
}
PDRaftMsg is a message delivered to a PDRaftPeer's Mailbox channel.
type PDRaftMsgType ¶
type PDRaftMsgType int
PDRaftMsgType identifies the type of message sent to a PDRaftPeer's mailbox.
const (
// PDRaftMsgTypeRaftMessage carries a raftpb.Message from another peer.
PDRaftMsgTypeRaftMessage PDRaftMsgType = iota
// PDRaftMsgTypeProposal carries a PDProposal to be proposed to the Raft group.
PDRaftMsgTypeProposal
)
type PDRaftPeer ¶
type PDRaftPeer struct {
// Mailbox receives PDRaftMsg from other peers and from ProposeAndWait.
Mailbox chan PDRaftMsg
// contains filtered or unexported fields
}
PDRaftPeer manages a single Raft node in the PD cluster. It is modeled on Peer in internal/raftstore/peer.go but simplified: no region management, no conf change handling, no split checks.
func NewPDRaftPeer ¶
func NewPDRaftPeer(nodeID uint64, storage *PDRaftStorage, peers []raft.Peer, peerAddrs map[uint64]string, cfg PDRaftConfig) (*PDRaftPeer, error)
NewPDRaftPeer creates a new PDRaftPeer for the given node. If peers is non-nil, the Raft group is bootstrapped with the given peer list. If peers is nil, the storage must have been recovered via RecoverFromEngine().
func (*PDRaftPeer) IsLeader ¶
func (p *PDRaftPeer) IsLeader() bool
IsLeader returns whether this peer believes it is the Raft leader.
func (*PDRaftPeer) IsStopped ¶
func (p *PDRaftPeer) IsStopped() bool
IsStopped returns whether this peer has been stopped.
func (*PDRaftPeer) LeaderID ¶
func (p *PDRaftPeer) LeaderID() uint64
LeaderID returns the current leader's node ID. Returns 0 if unknown.
func (*PDRaftPeer) ProposeAndWait ¶
ProposeAndWait proposes a PDCommand and blocks until it is committed and applied, or the context is cancelled. Returns ErrNotLeader if this peer is not the leader.
func (*PDRaftPeer) Run ¶
func (p *PDRaftPeer) Run(ctx context.Context)
Run starts the peer's main event loop. Blocks until ctx is cancelled or the peer is stopped.
func (*PDRaftPeer) SetApplyFunc ¶
func (p *PDRaftPeer) SetApplyFunc(f func(PDCommand) ([]byte, error))
SetApplyFunc sets the function used to apply committed PDCommands.
func (*PDRaftPeer) SetApplySnapshotFunc ¶
func (p *PDRaftPeer) SetApplySnapshotFunc(f func([]byte) error)
SetApplySnapshotFunc sets the function used to apply a received snapshot.
func (*PDRaftPeer) SetLeaderChangeFunc ¶
func (p *PDRaftPeer) SetLeaderChangeFunc(f func(isLeader bool))
SetLeaderChangeFunc sets a callback that fires when the local leader status changes. The callback receives true if this node became the leader, false otherwise. This is used to reset buffered allocators on leader change.
func (*PDRaftPeer) SetSendFunc ¶
func (p *PDRaftPeer) SetSendFunc(f func([]raftpb.Message))
SetSendFunc sets the function used to send Raft messages to other peers.
func (*PDRaftPeer) WireTransport ¶
func (p *PDRaftPeer) WireTransport(transport *PDTransport)
WireTransport sets up p.sendFunc to route outbound Raft messages via the given PDTransport. Messages addressed to the local node are delivered directly to the peer's own mailbox (avoiding a network round-trip). Errors from transport.Send are logged and skipped; Raft handles retransmission automatically.
type PDRaftStorage ¶
type PDRaftStorage struct {
// contains filtered or unexported fields
}
PDRaftStorage implements raft.Storage for the PD cluster Raft group. It is modeled on PeerStorage in internal/raftstore/storage.go, using clusterID instead of regionID as the key namespace discriminator.
func NewPDRaftStorage ¶
func NewPDRaftStorage(clusterID uint64, engine traits.KvEngine) *PDRaftStorage
NewPDRaftStorage creates a PDRaftStorage for the given cluster.
func (*PDRaftStorage) AppliedIndex ¶
func (s *PDRaftStorage) AppliedIndex() uint64
AppliedIndex returns the current applied index.
func (*PDRaftStorage) CompactTo ¶
func (s *PDRaftStorage) CompactTo(compactTo uint64)
CompactTo removes entries from the in-memory cache up to compactTo. Entries before compactTo will no longer be served from cache.
func (*PDRaftStorage) DeleteEntriesTo ¶
func (s *PDRaftStorage) DeleteEntriesTo(endIdx uint64) error
DeleteEntriesTo deletes persisted Raft log entries in [1, endIdx) from the engine. This is the physical counterpart to CompactTo (which only trims the in-memory cache). It uses engine.DeleteRange for efficient bulk deletion, following the same pattern as RaftLogGCWorker.gcRaftLog in internal/raftstore/raftlog_gc.go.
func (*PDRaftStorage) Entries ¶
func (s *PDRaftStorage) Entries(lo, hi, maxSize uint64) ([]raftpb.Entry, error)
Entries returns a slice of Raft log entries in [lo, hi), capped at maxSize bytes.
func (*PDRaftStorage) FirstIndex ¶
func (s *PDRaftStorage) FirstIndex() (uint64, error)
FirstIndex returns the index of the first available log entry.
func (*PDRaftStorage) GetApplyState ¶
func (s *PDRaftStorage) GetApplyState() raftstore.ApplyState
GetApplyState returns the current apply state.
func (*PDRaftStorage) InitialState ¶
InitialState returns the initial HardState and ConfState from storage.
func (*PDRaftStorage) LastIndex ¶
func (s *PDRaftStorage) LastIndex() (uint64, error)
LastIndex returns the index of the last log entry.
func (*PDRaftStorage) RecoverFromEngine ¶
func (s *PDRaftStorage) RecoverFromEngine() error
RecoverFromEngine restores PDRaftStorage state from the engine. This should be called when restarting a PD node that already has persisted Raft state.
func (*PDRaftStorage) SaveReady ¶
func (s *PDRaftStorage) SaveReady(rd raft.Ready) error
SaveReady persists the Raft state changes from a Ready batch. Entries and hard state are written atomically via a WriteBatch.
func (*PDRaftStorage) SetApplyState ¶
func (s *PDRaftStorage) SetApplyState(state raftstore.ApplyState)
SetApplyState updates the apply state.
func (*PDRaftStorage) SetDummyEntry ¶
func (s *PDRaftStorage) SetDummyEntry()
SetDummyEntry adds a dummy entry at index 0 with term 0, matching etcd/raft's MemoryStorage convention for empty storage.
func (*PDRaftStorage) SetPersistedLastIndex ¶
func (s *PDRaftStorage) SetPersistedLastIndex(idx uint64)
SetPersistedLastIndex sets the persisted last index (for initialization).
func (*PDRaftStorage) SetSnapshotGenFunc ¶
func (s *PDRaftStorage) SetSnapshotGenFunc(f func() ([]byte, error))
SetSnapshotGenFunc sets the function used to generate snapshot data.
type PDServer ¶
type PDServer struct {
pdpb.UnimplementedPDServer
// contains filtered or unexported fields
}
PDServer implements the pdpb.PDServer gRPC interface.
func NewPDServer ¶
func NewPDServer(cfg PDServerConfig) (*PDServer, error)
NewPDServer creates a new PD server.
func (*PDServer) AllocID ¶
func (s *PDServer) AllocID(ctx context.Context, req *pdpb.AllocIDRequest) (*pdpb.AllocIDResponse, error)
func (*PDServer) ApplySnapshot ¶
ApplySnapshot replaces the PD server's in-memory state with the state from a JSON-encoded snapshot. It acquires write locks on all mutable sub-components.
func (*PDServer) AskBatchSplit ¶
func (s *PDServer) AskBatchSplit(ctx context.Context, req *pdpb.AskBatchSplitRequest) (*pdpb.AskBatchSplitResponse, error)
func (*PDServer) Bootstrap ¶
func (s *PDServer) Bootstrap(ctx context.Context, req *pdpb.BootstrapRequest) (*pdpb.BootstrapResponse, error)
func (*PDServer) GenerateSnapshot ¶
GenerateSnapshot captures the current PD server state as a JSON-encoded snapshot. It acquires read locks on all mutable sub-components and deep-copies their state.
func (*PDServer) GetAllStores ¶
func (s *PDServer) GetAllStores(ctx context.Context, req *pdpb.GetAllStoresRequest) (*pdpb.GetAllStoresResponse, error)
func (*PDServer) GetGCSafePoint ¶
func (s *PDServer) GetGCSafePoint(ctx context.Context, req *pdpb.GetGCSafePointRequest) (*pdpb.GetGCSafePointResponse, error)
func (*PDServer) GetMembers ¶
func (s *PDServer) GetMembers(ctx context.Context, req *pdpb.GetMembersRequest) (*pdpb.GetMembersResponse, error)
func (*PDServer) GetRegion ¶
func (s *PDServer) GetRegion(ctx context.Context, req *pdpb.GetRegionRequest) (*pdpb.GetRegionResponse, error)
func (*PDServer) GetRegionByID ¶
func (s *PDServer) GetRegionByID(ctx context.Context, req *pdpb.GetRegionByIDRequest) (*pdpb.GetRegionResponse, error)
func (*PDServer) GetStore ¶
func (s *PDServer) GetStore(ctx context.Context, req *pdpb.GetStoreRequest) (*pdpb.GetStoreResponse, error)
func (*PDServer) IsBootstrapped ¶
func (s *PDServer) IsBootstrapped(ctx context.Context, req *pdpb.IsBootstrappedRequest) (*pdpb.IsBootstrappedResponse, error)
func (*PDServer) IsRaftLeader ¶
IsRaftLeader returns whether this server's Raft peer believes it is the leader. Returns false if the server is running in single-node mode (no Raft).
func (*PDServer) PutStore ¶
func (s *PDServer) PutStore(ctx context.Context, req *pdpb.PutStoreRequest) (*pdpb.PutStoreResponse, error)
func (*PDServer) RegionHeartbeat ¶
func (s *PDServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error
func (*PDServer) ReportBatchSplit ¶
func (s *PDServer) ReportBatchSplit(ctx context.Context, req *pdpb.ReportBatchSplitRequest) (*pdpb.ReportBatchSplitResponse, error)
func (*PDServer) Stop ¶
func (s *PDServer) Stop()
Stop gracefully stops the PD server. It is safe to call multiple times.
func (*PDServer) StoreHeartbeat ¶
func (s *PDServer) StoreHeartbeat(ctx context.Context, req *pdpb.StoreHeartbeatRequest) (*pdpb.StoreHeartbeatResponse, error)
func (*PDServer) UpdateGCSafePoint ¶
func (s *PDServer) UpdateGCSafePoint(ctx context.Context, req *pdpb.UpdateGCSafePointRequest) (*pdpb.UpdateGCSafePointResponse, error)
type PDServerConfig ¶
type PDServerConfig struct {
ListenAddr string
StatusAddr string // HTTP status/pprof listen address
DataDir string
ClusterID uint64
TSOSaveInterval time.Duration
TSOUpdatePhysicalInterval time.Duration
MaxPeerCount int
StoreDisconnectDuration time.Duration
StoreDownDuration time.Duration
RegionBalanceThreshold float64
RegionBalanceRateLimit int
// RaftConfig enables Raft-based replication when non-nil.
// When nil, the server operates in single-node mode (backward compatible).
RaftConfig *PDServerRaftConfig
}
PDServerConfig holds configuration for the PD server.
func DefaultPDServerConfig ¶
func DefaultPDServerConfig() PDServerConfig
DefaultPDServerConfig returns default PD server configuration.
type PDServerRaftConfig ¶
type PDServerRaftConfig struct {
PDNodeID uint64 // this node's Raft ID
InitialCluster map[uint64]string // peerID -> peer gRPC address
PeerAddr string // listen address for peer-to-peer gRPC
ClientAddrs map[uint64]string // peerID -> client gRPC address (for forwarding)
RaftTickInterval time.Duration
ElectionTimeoutTicks int
HeartbeatTicks int
}
PDServerRaftConfig holds the Raft-cluster configuration for a PD server. If PDServerConfig.RaftConfig is nil, the server runs in single-node mode.
type PDSnapshot ¶
type PDSnapshot struct {
Bootstrapped bool `json:"bootstrapped"`
Stores map[uint64]*metapb.Store `json:"stores"`
Regions map[uint64]*metapb.Region `json:"regions"`
Leaders map[uint64]*metapb.Peer `json:"leaders"`
StoreStats map[uint64]*pdpb.StoreStats `json:"store_stats"`
StoreStates map[uint64]StoreState `json:"store_states"`
NextID uint64 `json:"next_id"`
TSOState TSOSnapshotState `json:"tso_state"`
GCSafePoint uint64 `json:"gc_safe_point"`
PendingMoves map[uint64]*PendingMove `json:"pending_moves"`
StoreLastHeartbeat map[uint64]int64 `json:"store_last_heartbeat"`
}
PDSnapshot captures all mutable PD state for Raft snapshot transfer.
type PDTransport ¶
type PDTransport struct {
// contains filtered or unexported fields
}
PDTransport manages gRPC connections to other PD peers for Raft message transport. It uses lazy, single-connection-per-peer semantics since PD Raft traffic is relatively infrequent compared to TiKV store Raft traffic.
func NewPDTransport ¶
func NewPDTransport(peerAddrs map[uint64]string) *PDTransport
NewPDTransport creates a new PDTransport with the given peer address map. peerAddrs maps peerID to network address (e.g., "127.0.0.1:2380").
func (*PDTransport) Close ¶
func (t *PDTransport) Close()
Close closes all connections managed by this transport.
func (*PDTransport) Send ¶
func (t *PDTransport) Send(peerID uint64, msg raftpb.Message) error
Send sends a single raftpb.Message to the specified peer via gRPC. It converts the message from raftpb to eraftpb format, wraps it in a RaftMessage (with RegionId=0 as PD sentinel), and calls the remote SendPDRaftMessage unary RPC.
type PendingMove ¶
type PendingMove struct {
RegionID uint64
SourcePeer *metapb.Peer
TargetStoreID uint64
TargetPeerID uint64
State MoveState
StartedAt time.Time
StabilizeCount int // heartbeat cycles spent in MoveStateStabilizing
}
PendingMove tracks a single in-progress region move.
type ScheduleCommand ¶
type ScheduleCommand struct {
RegionID uint64
TransferLeader *pdpb.TransferLeader
ChangePeer *pdpb.ChangePeer
Merge *pdpb.Merge
}
ScheduleCommand represents a scheduling command to return in a heartbeat response.
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler produces scheduling commands based on cluster state.
func NewScheduler ¶
func NewScheduler(meta *MetadataStore, idAlloc *IDAllocator, maxPeerCount int, threshold float64, rateLimit int, moveTracker MoveTrackerInterface) *Scheduler
NewScheduler creates a new Scheduler.
type StoreState ¶
type StoreState int
StoreState represents the lifecycle state of a TiKV store.
const (
StoreStateUp StoreState = iota // Heartbeat within disconnect threshold
StoreStateDisconnected // Heartbeat missed > DisconnectDuration
StoreStateDown // Disconnected > DownDuration — replicas should be repaired
StoreStateTombstone // Permanently removed
)
type TSOAllocator ¶
type TSOAllocator struct {
// contains filtered or unexported fields
}
TSOAllocator generates monotonically increasing timestamps.
func NewTSOAllocator ¶
func NewTSOAllocator(saveInterval time.Duration) *TSOAllocator
type TSOBuffer ¶
type TSOBuffer struct {
// contains filtered or unexported fields
}
TSOBuffer pre-allocates TSO ranges via Raft and serves subsequent requests from a local buffer. This amortizes the cost of Raft consensus over many TSO allocations. On leader change, the buffer must be reset so the new leader proposes a fresh batch.
func NewTSOBuffer ¶
func NewTSOBuffer(raftPeer *PDRaftPeer) *TSOBuffer
NewTSOBuffer creates a TSOBuffer backed by the given Raft peer.
type TSOSnapshotState ¶
TSOSnapshotState captures the TSOAllocator state for snapshotting.