pd

package
v0.0.0-...-e7d2c85
Published: Apr 2, 2026 License: MIT Imports: 31 Imported by: 0

Documentation

Overview

Package pd implements a simplified Placement Driver server for gookv. It provides TSO allocation, cluster metadata management, heartbeat processing, ID allocation, and GC safe point management.

Constants

This section is empty.

Variables

var ErrNotLeader = errors.New("pd: not leader")

ErrNotLeader is returned when a proposal is attempted on a non-leader peer.

Functions

func HasPersistedPDRaftState

func HasPersistedPDRaftState(engine traits.KvEngine, clusterID uint64) bool

HasPersistedPDRaftState checks whether the engine has persisted PD Raft state for the given cluster. Returns true if a hard state key exists.

func RegisterPDPeerService

func RegisterPDPeerService(srv *grpc.Server, peer *PDRaftPeer)

RegisterPDPeerService registers the PD peer-to-peer Raft message service on the given gRPC server. Uses a hand-coded service descriptor (no proto code generation needed).

Types

type GCSafePointManager

type GCSafePointManager struct {
	// contains filtered or unexported fields
}

GCSafePointManager manages the GC safe point.

func NewGCSafePointManager

func NewGCSafePointManager() *GCSafePointManager

func (*GCSafePointManager) GetSafePoint

func (g *GCSafePointManager) GetSafePoint() uint64

func (*GCSafePointManager) UpdateSafePoint

func (g *GCSafePointManager) UpdateSafePoint(newSP uint64) uint64
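
This page does not document what UpdateSafePoint returns or whether the safe point can move backwards. The sketch below shows one common convention for PD-style servers, in which the safe point only advances and the call returns the value in effect afterwards. The type safePoint and its methods are hypothetical stand-ins, and the monotonic rule is an assumption rather than confirmed behavior of GCSafePointManager.

```go
package main

import (
	"fmt"
	"sync"
)

// safePoint is a hypothetical stand-in for GCSafePointManager.
type safePoint struct {
	mu    sync.Mutex
	value uint64
}

// update advances the safe point only when newSP is larger, then returns
// the value in effect. The monotonic rule is an assumption.
func (s *safePoint) update(newSP uint64) uint64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	if newSP > s.value {
		s.value = newSP
	}
	return s.value
}

// get returns the current safe point.
func (s *safePoint) get() uint64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.value
}

func main() {
	var sp safePoint
	fmt.Println(sp.update(100)) // first update is accepted
	fmt.Println(sp.update(50))  // a stale update leaves the value unchanged
	fmt.Println(sp.get())
}
```

Under this convention a caller can compare the returned value with the one it sent to learn whether its update took effect.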

type IDAllocator

type IDAllocator struct {
	// contains filtered or unexported fields
}

IDAllocator provides monotonically increasing unique IDs.

func NewIDAllocator

func NewIDAllocator() *IDAllocator

func (*IDAllocator) Alloc

func (a *IDAllocator) Alloc() uint64

Alloc allocates the next ID.

type IDBuffer

type IDBuffer struct {
	// contains filtered or unexported fields
}

IDBuffer pre-allocates ID ranges via Raft and serves subsequent requests from a local buffer. This amortizes the cost of Raft consensus over many ID allocations. On leader change, the buffer must be reset so the new leader proposes a fresh batch.

func NewIDBuffer

func NewIDBuffer(raftPeer *PDRaftPeer) *IDBuffer

NewIDBuffer creates an IDBuffer backed by the given Raft peer.

func (*IDBuffer) Alloc

func (b *IDBuffer) Alloc(ctx context.Context) (uint64, error)

Alloc allocates a single unique ID from the buffer. If the buffer is depleted, it proposes a new CmdIDAlloc batch via Raft.

func (*IDBuffer) Reset

func (b *IDBuffer) Reset()

Reset clears the buffer. Must be called on leader change so the new leader proposes a fresh batch.
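
The batching idea behind IDBuffer can be sketched without Raft. In the example below, batchFunc is a hypothetical stand-in for a CmdIDAlloc proposal, and bufferedIDs, its fields, and the batch size of 100 are all illustrative assumptions, not the package's internals.

```go
package main

import (
	"fmt"
	"sync"
)

// batchFunc is a hypothetical stand-in for a Raft proposal that reserves
// n consecutive IDs and returns the first one.
type batchFunc func(n uint64) (first uint64, err error)

// bufferedIDs serves IDs from a locally cached range [next, end).
type bufferedIDs struct {
	mu        sync.Mutex
	next, end uint64
	batchSize uint64
	propose   batchFunc
}

// alloc returns one ID and refills the range only when it is exhausted.
func (b *bufferedIDs) alloc() (uint64, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.next >= b.end {
		first, err := b.propose(b.batchSize)
		if err != nil {
			return 0, err
		}
		b.next, b.end = first, first+b.batchSize
	}
	id := b.next
	b.next++
	return id, nil
}

// reset drops the cached range so the next alloc proposes a fresh batch.
func (b *bufferedIDs) reset() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.next, b.end = 0, 0
}

// newCountingProposer hands out ranges from a counter and records how
// many proposals were made.
func newCountingProposer() (batchFunc, *int) {
	var nextFree uint64 = 1
	calls := 0
	return func(n uint64) (uint64, error) {
		calls++
		first := nextFree
		nextFree += n
		return first, nil
	}, &calls
}

func main() {
	propose, calls := newCountingProposer()
	buf := &bufferedIDs{batchSize: 100, propose: propose}
	for i := 0; i < 250; i++ {
		if _, err := buf.alloc(); err != nil {
			panic(err)
		}
	}
	fmt.Println("proposals needed for 250 IDs:", *calls)
}
```

In this sketch a reset discards the unused tail of the current range. The next batch therefore starts past every ID handed out so far, which keeps IDs unique at the cost of gaps.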

type MetadataStore

type MetadataStore struct {
	// contains filtered or unexported fields
}

MetadataStore manages cluster metadata in memory.

func NewMetadataStore

func NewMetadataStore(clusterID uint64, disconnectDuration, downDuration time.Duration) *MetadataStore

func (*MetadataStore) GetAllRegions

func (m *MetadataStore) GetAllRegions() map[uint64]*metapb.Region

GetAllRegions returns all regions.

func (*MetadataStore) GetAllStores

func (m *MetadataStore) GetAllStores() []*metapb.Store

func (*MetadataStore) GetDeadStores

func (m *MetadataStore) GetDeadStores() []uint64

GetDeadStores returns store IDs that are in the Down state.

func (*MetadataStore) GetLeaderCountPerStore

func (m *MetadataStore) GetLeaderCountPerStore() map[uint64]int

GetLeaderCountPerStore returns the number of region leaders per store.

func (*MetadataStore) GetRegionByID

func (m *MetadataStore) GetRegionByID(id uint64) (*metapb.Region, *metapb.Peer)

func (*MetadataStore) GetRegionByKey

func (m *MetadataStore) GetRegionByKey(key []byte) (*metapb.Region, *metapb.Peer)

func (*MetadataStore) GetRegionCountPerStore

func (m *MetadataStore) GetRegionCountPerStore() map[uint64]int

GetRegionCountPerStore returns the number of region peers per store.

func (*MetadataStore) GetStore

func (m *MetadataStore) GetStore(id uint64) *metapb.Store

func (*MetadataStore) GetStoreState

func (m *MetadataStore) GetStoreState(storeID uint64) StoreState

GetStoreState returns the current state of a store.

func (*MetadataStore) IsBootstrapped

func (m *MetadataStore) IsBootstrapped() bool

func (*MetadataStore) IsStoreAlive

func (m *MetadataStore) IsStoreAlive(storeID uint64) bool

IsStoreAlive returns whether a store is considered alive (Up or Disconnected). Kept for backward compatibility; prefer GetStoreState or IsStoreSchedulable.

func (*MetadataStore) IsStoreSchedulable

func (m *MetadataStore) IsStoreSchedulable(storeID uint64) bool

IsStoreSchedulable returns true only if the store is in the Up state.

func (*MetadataStore) PutRegion

func (m *MetadataStore) PutRegion(region *metapb.Region, leader *metapb.Peer)

func (*MetadataStore) PutStore

func (m *MetadataStore) PutStore(store *metapb.Store)

func (*MetadataStore) SetBootstrapped

func (m *MetadataStore) SetBootstrapped(v bool)

func (*MetadataStore) SetStoreState

func (m *MetadataStore) SetStoreState(storeID uint64, state StoreState)

SetStoreState sets the state for a store (e.g. Tombstone).

func (*MetadataStore) UpdateStoreStats

func (m *MetadataStore) UpdateStoreStats(storeID uint64, stats *pdpb.StoreStats)

type MoveState

type MoveState int

MoveState tracks the current stage of a region move.

const (
	MoveStateAdding       MoveState = iota // Waiting for AddPeer to complete
	MoveStateStabilizing                   // Waiting for new peer to receive snapshot and stabilize
	MoveStateTransferring                  // Waiting for leader transfer
	MoveStateRemoving                      // Waiting for RemovePeer to complete
)

func (MoveState) String

func (s MoveState) String() string

String returns a human-readable name for the move state.
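
A String method on an iota-based enum is usually a switch with a fallback for out-of-range values. The sketch below mirrors the four documented states. The returned strings and the Unknown fallback are assumptions, since this page does not list the real names.

```go
package main

import "fmt"

// moveState mirrors the documented MoveState constants.
type moveState int

const (
	moveStateAdding moveState = iota
	moveStateStabilizing
	moveStateTransferring
	moveStateRemoving
)

// String names each state; the exact strings are assumptions.
func (s moveState) String() string {
	switch s {
	case moveStateAdding:
		return "Adding"
	case moveStateStabilizing:
		return "Stabilizing"
	case moveStateTransferring:
		return "Transferring"
	case moveStateRemoving:
		return "Removing"
	default:
		// Convert to int first so Sprintf does not call String recursively.
		return fmt.Sprintf("Unknown(%d)", int(s))
	}
}

func main() {
	// fmt uses the Stringer interface, so states print by name.
	fmt.Println(moveStateStabilizing, moveState(42))
}
```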

type MoveTracker

type MoveTracker struct {
	// contains filtered or unexported fields
}

MoveTracker tracks in-progress region moves across heartbeat cycles.

func NewMoveTracker

func NewMoveTracker() *MoveTracker

NewMoveTracker creates a new MoveTracker.

func (*MoveTracker) ActiveMoveCount

func (t *MoveTracker) ActiveMoveCount() int

ActiveMoveCount returns the number of in-progress moves plus regions in cooldown. This is used by the scheduler's rate limiter to prevent bursts of balance moves.

func (*MoveTracker) Advance

func (t *MoveTracker) Advance(regionID uint64, region *metapb.Region, leader *metapb.Peer) *ScheduleCommand

Advance progresses the move state machine for the given region based on the current region metadata and leader. Returns a ScheduleCommand if an action is needed, or nil if no action is required (either waiting or move is complete).

func (*MoveTracker) CleanupStale

func (t *MoveTracker) CleanupStale(timeout time.Duration)

CleanupStale removes moves that have been in progress longer than timeout.

func (*MoveTracker) HasPendingMove

func (t *MoveTracker) HasPendingMove(regionID uint64) bool

HasPendingMove returns true if the given region has an in-progress move or is in a post-move cooldown period.

func (*MoveTracker) StartMove

func (t *MoveTracker) StartMove(regionID uint64, sourcePeer *metapb.Peer, targetStoreID uint64)

StartMove begins tracking a new region move from sourcePeer's store to targetStoreID.
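
The following is a minimal sketch of the bookkeeping that HasPendingMove, ActiveMoveCount, and CleanupStale describe. The type miniTracker, the finishMove helper, the explicit now parameter, and the cooldown length are all hypothetical. The real MoveTracker reads the clock itself and advances moves through Advance.

```go
package main

import (
	"fmt"
	"time"
)

// epoch and sec keep the example deterministic; both are illustrative.
var epoch = time.Unix(0, 0)

const sec = time.Second

// miniTracker is a hypothetical stand-in for MoveTracker.
type miniTracker struct {
	started  map[uint64]time.Time // regionID -> move start time
	cooldown map[uint64]time.Time // regionID -> cooldown expiry
}

func newMiniTracker() *miniTracker {
	return &miniTracker{
		started:  map[uint64]time.Time{},
		cooldown: map[uint64]time.Time{},
	}
}

func (t *miniTracker) startMove(regionID uint64, now time.Time) {
	t.started[regionID] = now
}

// finishMove ends a move and begins an assumed post-move cooldown.
func (t *miniTracker) finishMove(regionID uint64, now time.Time, cool time.Duration) {
	delete(t.started, regionID)
	t.cooldown[regionID] = now.Add(cool)
}

// hasPendingMove is true for in-progress moves and regions in cooldown.
func (t *miniTracker) hasPendingMove(regionID uint64, now time.Time) bool {
	if _, ok := t.started[regionID]; ok {
		return true
	}
	until, ok := t.cooldown[regionID]
	return ok && now.Before(until)
}

// activeMoveCount counts in-progress moves plus regions still cooling down.
func (t *miniTracker) activeMoveCount(now time.Time) int {
	n := len(t.started)
	for _, until := range t.cooldown {
		if now.Before(until) {
			n++
		}
	}
	return n
}

// cleanupStale drops moves that have run longer than timeout.
func (t *miniTracker) cleanupStale(timeout time.Duration, now time.Time) {
	for id, at := range t.started {
		if now.Sub(at) > timeout {
			delete(t.started, id)
		}
	}
}

func main() {
	tr := newMiniTracker()
	tr.startMove(1, epoch)
	tr.startMove(2, epoch)
	tr.finishMove(2, epoch.Add(5*sec), 30*sec)

	now := epoch.Add(10 * sec)
	rateLimit := 2 // illustrative value
	fmt.Println("may start another move:", tr.activeMoveCount(now) < rateLimit)
}
```

Counting cooldown regions toward the limit means a burst of completed moves still holds back new ones for a while, which matches the stated purpose of preventing bursts.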

type MoveTrackerInterface

type MoveTrackerInterface interface {
	HasPendingMove(regionID uint64) bool
	ActiveMoveCount() int
	StartMove(regionID uint64, sourcePeer *metapb.Peer, targetStoreID uint64)
	Advance(regionID uint64, region *metapb.Region, leader *metapb.Peer) *ScheduleCommand
}

MoveTrackerInterface is the interface the scheduler uses to check and drive pending moves. A nil tracker is allowed: the scheduler checks that moveTracker is non-nil before calling it.

type PDCommand

type PDCommand struct {
	Type PDCommandType `json:"type"`

	// Payload fields (only the relevant field is non-nil for each type).
	Bootstrapped      *bool            `json:"bootstrapped,omitempty"`
	Store             *metapb.Store    `json:"store,omitempty"`
	Region            *metapb.Region   `json:"region,omitempty"`
	Leader            *metapb.Peer     `json:"leader,omitempty"`
	StoreStats        *pdpb.StoreStats `json:"store_stats,omitempty"`
	StoreID           uint64           `json:"store_id,omitempty"`
	StoreState        *StoreState      `json:"store_state,omitempty"`
	TSOBatchSize      int              `json:"tso_batch_size,omitempty"`
	IDBatchSize       int              `json:"id_batch_size,omitempty"`
	GCSafePoint       uint64           `json:"gc_safe_point,omitempty"`
	MoveRegionID      uint64           `json:"move_region_id,omitempty"`
	MoveSourcePeer    *metapb.Peer     `json:"move_source_peer,omitempty"`
	MoveTargetStoreID uint64           `json:"move_target_store_id,omitempty"`
	AdvanceRegion     *metapb.Region   `json:"advance_region,omitempty"`
	AdvanceLeader     *metapb.Peer     `json:"advance_leader,omitempty"`
	CleanupTimeout    time.Duration    `json:"cleanup_timeout,omitempty"`
	CompactIndex      uint64           `json:"compact_index,omitempty"`
	CompactTerm       uint64           `json:"compact_term,omitempty"`
}

PDCommand represents a single state-mutation that can be replicated via Raft. For each command type, only the relevant payload fields are populated.

func UnmarshalPDCommand

func UnmarshalPDCommand(data []byte) (PDCommand, error)

UnmarshalPDCommand decodes a PDCommand from the wire format produced by Marshal.

func (*PDCommand) Marshal

func (c *PDCommand) Marshal() ([]byte, error)

Marshal encodes a PDCommand as [1-byte type prefix] + [JSON payload].
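
The framing described for Marshal, a one-byte type prefix followed by a JSON payload, can be illustrated with a reduced stand-in. The type miniCommand and both helper functions are hypothetical. Rejecting a prefix that disagrees with the JSON type field is also an assumption about how a decoder might validate input.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// miniCommand is a hypothetical, trimmed-down stand-in for PDCommand.
type miniCommand struct {
	Type        uint8  `json:"type"`
	StoreID     uint64 `json:"store_id,omitempty"`
	GCSafePoint uint64 `json:"gc_safe_point,omitempty"`
}

// marshalCommand frames the command as [1-byte type prefix] + [JSON payload].
func marshalCommand(c miniCommand) ([]byte, error) {
	payload, err := json.Marshal(c)
	if err != nil {
		return nil, err
	}
	return append([]byte{c.Type}, payload...), nil
}

// unmarshalCommand reverses marshalCommand. The prefix/payload consistency
// check is an assumption, not documented behavior.
func unmarshalCommand(data []byte) (miniCommand, error) {
	var c miniCommand
	if len(data) < 1 {
		return c, errors.New("command too short")
	}
	if err := json.Unmarshal(data[1:], &c); err != nil {
		return c, err
	}
	if c.Type != data[0] {
		return c, errors.New("type prefix does not match payload")
	}
	return c, nil
}

func main() {
	raw, err := marshalCommand(miniCommand{Type: 8, GCSafePoint: 42})
	if err != nil {
		panic(err)
	}
	got, err := unmarshalCommand(raw)
	fmt.Println(raw[0], string(raw[1:]), got.GCSafePoint, err)
}
```

A leading type byte lets a reader dispatch on the command type without parsing the JSON first.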

type PDCommandType

type PDCommandType uint8

PDCommandType identifies the type of PD state-mutation command.

const (
	CmdSetBootstrapped   PDCommandType = iota + 1 // 1
	CmdPutStore                                   // 2
	CmdPutRegion                                  // 3
	CmdUpdateStoreStats                           // 4
	CmdSetStoreState                              // 5
	CmdTSOAllocate                                // 6
	CmdIDAlloc                                    // 7
	CmdUpdateGCSafePoint                          // 8
	CmdStartMove                                  // 9
	CmdAdvanceMove                                // 10
	CmdCleanupStaleMove                           // 11
	CmdCompactLog                                 // 12
)

type PDPeerService

type PDPeerService struct {
	// contains filtered or unexported fields
}

PDPeerService handles incoming PD Raft messages from other PD peers.

func (*PDPeerService) SendPDRaftMessage

SendPDRaftMessage receives a RaftMessage from a remote PD peer, converts it from eraftpb to raftpb format, and delivers it to the local peer's mailbox.

type PDPeerServiceServer

type PDPeerServiceServer interface {
	SendPDRaftMessage(ctx context.Context, req *raft_serverpb.RaftMessage) (*raft_serverpb.RaftMessage, error)
}

PDPeerServiceServer is the interface that the PD peer gRPC service must implement.

type PDProposal

type PDProposal struct {
	Command  PDCommand
	Callback func([]byte, error)
}

PDProposal wraps a PDCommand with a callback for notification when the proposal is committed and applied.

type PDRaftConfig

type PDRaftConfig struct {
	// RaftTickInterval is the base tick interval for Raft (default 100ms).
	RaftTickInterval time.Duration
	// ElectionTimeoutTicks is the election timeout in ticks (default 10).
	ElectionTimeoutTicks int
	// HeartbeatTicks is the heartbeat interval in ticks (default 2).
	HeartbeatTicks int
	// MaxInflightMsgs is the maximum number of in-flight messages (default 256).
	MaxInflightMsgs int
	// MaxSizePerMsg is the maximum size of a single Raft message (default 1 MiB).
	MaxSizePerMsg uint64
	// MailboxCapacity is the size of the peer's mailbox channel (default 256).
	MailboxCapacity int
	// RaftLogGCTickInterval is how often the log GC tick fires (default 60s).
	// Set to 0 to disable Raft log GC.
	RaftLogGCTickInterval time.Duration
	// RaftLogGCCountLimit triggers compaction when excess entry count exceeds this (default 10000).
	RaftLogGCCountLimit uint64
	// RaftLogGCThreshold is the minimum number of entries to keep after compaction (default 50).
	RaftLogGCThreshold uint64
}

PDRaftConfig holds configuration for a PDRaftPeer.

func DefaultPDRaftConfig

func DefaultPDRaftConfig() PDRaftConfig

DefaultPDRaftConfig returns a PDRaftConfig with sensible defaults.
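
Because the election and heartbeat settings are expressed in ticks, the wall-clock intervals follow from multiplying by RaftTickInterval. The sketch below copies three documented defaults into a local struct to show the arithmetic. The type raftTiming and its helper methods are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// raftTiming mirrors three PDRaftConfig fields.
type raftTiming struct {
	RaftTickInterval     time.Duration
	ElectionTimeoutTicks int
	HeartbeatTicks       int
}

// defaultTiming uses the defaults documented in the field comments.
func defaultTiming() raftTiming {
	return raftTiming{
		RaftTickInterval:     100 * time.Millisecond,
		ElectionTimeoutTicks: 10,
		HeartbeatTicks:       2,
	}
}

// electionTimeout is the nominal timeout: ticks times tick interval.
func (c raftTiming) electionTimeout() time.Duration {
	return c.RaftTickInterval * time.Duration(c.ElectionTimeoutTicks)
}

// heartbeatInterval is the leader heartbeat period.
func (c raftTiming) heartbeatInterval() time.Duration {
	return c.RaftTickInterval * time.Duration(c.HeartbeatTicks)
}

func main() {
	cfg := defaultTiming()
	fmt.Println("election timeout:", cfg.electionTimeout())
	fmt.Println("heartbeat interval:", cfg.heartbeatInterval())
}
```

With the defaults this gives a nominal election timeout of 1s and a heartbeat every 200ms, so a follower tolerates several missed heartbeats before it campaigns.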

type PDRaftMsg

type PDRaftMsg struct {
	Type PDRaftMsgType
	Data interface{}
}

PDRaftMsg is a message delivered to a PDRaftPeer's Mailbox channel.

type PDRaftMsgType

type PDRaftMsgType int

PDRaftMsgType identifies the type of message sent to a PDRaftPeer's mailbox.

const (
	// PDRaftMsgTypeRaftMessage carries a raftpb.Message from another peer.
	PDRaftMsgTypeRaftMessage PDRaftMsgType = iota
	// PDRaftMsgTypeProposal carries a PDProposal to be proposed to the Raft group.
	PDRaftMsgTypeProposal
)

type PDRaftPeer

type PDRaftPeer struct {

	// Mailbox receives PDRaftMsg from other peers and from ProposeAndWait.
	Mailbox chan PDRaftMsg
	// contains filtered or unexported fields
}

PDRaftPeer manages a single Raft node in the PD cluster. It is modeled on Peer in internal/raftstore/peer.go but simplified: no region management, no conf change handling, no split checks.

func NewPDRaftPeer

func NewPDRaftPeer(
	nodeID uint64,
	storage *PDRaftStorage,
	peers []raft.Peer,
	peerAddrs map[uint64]string,
	cfg PDRaftConfig,
) (*PDRaftPeer, error)

NewPDRaftPeer creates a new PDRaftPeer for the given node. If peers is non-nil, the Raft group is bootstrapped with the given peer list. If peers is nil, the storage must have been recovered via RecoverFromEngine().

func (*PDRaftPeer) IsLeader

func (p *PDRaftPeer) IsLeader() bool

IsLeader returns whether this peer believes it is the Raft leader.

func (*PDRaftPeer) IsStopped

func (p *PDRaftPeer) IsStopped() bool

IsStopped returns whether this peer has been stopped.

func (*PDRaftPeer) LeaderID

func (p *PDRaftPeer) LeaderID() uint64

LeaderID returns the current leader's node ID. Returns 0 if unknown.

func (*PDRaftPeer) ProposeAndWait

func (p *PDRaftPeer) ProposeAndWait(ctx context.Context, cmd PDCommand) ([]byte, error)

ProposeAndWait proposes a PDCommand and blocks until it is committed and applied, or the context is cancelled. Returns ErrNotLeader if this peer is not the leader.
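
A propose-and-wait call of this kind is commonly built from a mailbox channel, a completion callback, and a select on the context. The sketch below shows that pattern in isolation. The types miniPeer and proposal, and the run loop that applies proposals immediately, are hypothetical and skip Raft replication entirely.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errNotLeader = errors.New("not leader")

// proposal pairs a payload with a completion callback, like PDProposal.
type proposal struct {
	payload  string
	callback func([]byte, error)
}

// miniPeer is a hypothetical stand-in for PDRaftPeer.
type miniPeer struct {
	mailbox  chan proposal
	isLeader bool
}

// proposeAndWait enqueues a proposal and blocks until the callback fires
// or ctx is done.
func (p *miniPeer) proposeAndWait(ctx context.Context, payload string) ([]byte, error) {
	if !p.isLeader {
		return nil, errNotLeader
	}
	type result struct {
		data []byte
		err  error
	}
	done := make(chan result, 1) // buffered so a late callback never blocks
	prop := proposal{payload: payload, callback: func(d []byte, err error) {
		done <- result{d, err}
	}}
	select {
	case p.mailbox <- prop:
	case <-ctx.Done():
		return nil, ctx.Err()
	}
	select {
	case r := <-done:
		return r.data, r.err
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// run stands in for the event loop and applies each proposal at once.
func (p *miniPeer) run(ctx context.Context) {
	for {
		select {
		case prop := <-p.mailbox:
			prop.callback([]byte("applied:"+prop.payload), nil)
		case <-ctx.Done():
			return
		}
	}
}

// demo runs one proposal against a fresh peer.
func demo(leader bool) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	p := &miniPeer{mailbox: make(chan proposal, 4), isLeader: leader}
	go p.run(ctx)
	out, err := p.proposeAndWait(ctx, "put-store")
	return string(out), err
}

func main() {
	out, err := demo(true)
	fmt.Println(out, err)
	_, err = demo(false)
	fmt.Println(err)
}
```

The buffered result channel matters: if the caller gives up on the context first, the event loop can still invoke the callback without blocking.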

func (*PDRaftPeer) Run

func (p *PDRaftPeer) Run(ctx context.Context)

Run starts the peer's main event loop. Blocks until ctx is cancelled or the peer is stopped.

func (*PDRaftPeer) SetApplyFunc

func (p *PDRaftPeer) SetApplyFunc(f func(PDCommand) ([]byte, error))

SetApplyFunc sets the function used to apply committed PDCommands.

func (*PDRaftPeer) SetApplySnapshotFunc

func (p *PDRaftPeer) SetApplySnapshotFunc(f func([]byte) error)

SetApplySnapshotFunc sets the function used to apply a received snapshot.

func (*PDRaftPeer) SetLeaderChangeFunc

func (p *PDRaftPeer) SetLeaderChangeFunc(f func(isLeader bool))

SetLeaderChangeFunc sets a callback that fires when the local leader status changes. The callback receives true if this node became the leader, false otherwise. This is used to reset buffered allocators on leader change.
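
The sketch below shows how such a callback can be wired to reset buffered allocators. The names leaderWatcher, countingBuffer, and wireBuffers are hypothetical. Resetting on both gaining and losing leadership is a conservative assumption, not documented behavior.

```go
package main

import "fmt"

// resettable is satisfied by anything with a Reset method, such as the
// IDBuffer and TSOBuffer types described on this page.
type resettable interface{ Reset() }

// countingBuffer records how often it was reset.
type countingBuffer struct{ resets int }

func (b *countingBuffer) Reset() { b.resets++ }

// leaderWatcher is a hypothetical stand-in for the peer's leader tracking.
type leaderWatcher struct {
	isLeader bool
	onChange func(isLeader bool)
}

func (w *leaderWatcher) setLeaderChangeFunc(f func(bool)) { w.onChange = f }

// observe fires the callback only when the leader status actually changes.
func (w *leaderWatcher) observe(isLeader bool) {
	if isLeader == w.isLeader {
		return
	}
	w.isLeader = isLeader
	if w.onChange != nil {
		w.onChange(isLeader)
	}
}

// wireBuffers resets every buffer on any change of leadership.
func wireBuffers(w *leaderWatcher, buffers ...resettable) {
	w.setLeaderChangeFunc(func(bool) {
		for _, b := range buffers {
			b.Reset()
		}
	})
}

func main() {
	ids, tso := &countingBuffer{}, &countingBuffer{}
	w := &leaderWatcher{}
	wireBuffers(w, ids, tso)
	w.observe(true)  // became leader
	w.observe(true)  // no change, no callback
	w.observe(false) // lost leadership
	fmt.Println(ids.resets, tso.resets)
}
```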

func (*PDRaftPeer) SetSendFunc

func (p *PDRaftPeer) SetSendFunc(f func([]raftpb.Message))

SetSendFunc sets the function used to send Raft messages to other peers.

func (*PDRaftPeer) WireTransport

func (p *PDRaftPeer) WireTransport(transport *PDTransport)

WireTransport sets up p.sendFunc to route outbound Raft messages via the given PDTransport. Messages addressed to the local node are delivered directly to the peer's own mailbox (avoiding a network round-trip). Errors from transport.Send are logged and skipped; Raft handles retransmission automatically.

type PDRaftStorage

type PDRaftStorage struct {
	// contains filtered or unexported fields
}

PDRaftStorage implements raft.Storage for the PD cluster Raft group. It is modeled on PeerStorage in internal/raftstore/storage.go, using clusterID instead of regionID as the key namespace discriminator.

func NewPDRaftStorage

func NewPDRaftStorage(clusterID uint64, engine traits.KvEngine) *PDRaftStorage

NewPDRaftStorage creates a PDRaftStorage for the given cluster.

func (*PDRaftStorage) AppliedIndex

func (s *PDRaftStorage) AppliedIndex() uint64

AppliedIndex returns the current applied index.

func (*PDRaftStorage) CompactTo

func (s *PDRaftStorage) CompactTo(compactTo uint64)

CompactTo removes entries from the in-memory cache up to compactTo. Entries before compactTo will no longer be served from cache.

func (*PDRaftStorage) DeleteEntriesTo

func (s *PDRaftStorage) DeleteEntriesTo(endIdx uint64) error

DeleteEntriesTo deletes persisted Raft log entries in [1, endIdx) from the engine. This is the physical counterpart to CompactTo (which only trims the in-memory cache). It uses engine.DeleteRange for efficient bulk deletion, following the same pattern as RaftLogGCWorker.gcRaftLog in internal/raftstore/raftlog_gc.go.

func (*PDRaftStorage) Entries

func (s *PDRaftStorage) Entries(lo, hi, maxSize uint64) ([]raftpb.Entry, error)

Entries returns a slice of Raft log entries in [lo, hi), capped at maxSize bytes.
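
The half-open range and the size cap can be illustrated on an in-memory log. The type entry and the function entriesInRange are hypothetical. Returning at least one entry even when it alone exceeds maxSize follows the etcd/raft convention, and it is an assumption that PDRaftStorage does the same.

```go
package main

import (
	"errors"
	"fmt"
)

// entry is a simplified stand-in for raftpb.Entry.
type entry struct {
	Index uint64
	Data  []byte
}

// makeLog builds n consecutive entries starting at first, each size bytes.
func makeLog(first uint64, n, size int) []entry {
	log := make([]entry, n)
	for i := range log {
		log[i] = entry{Index: first + uint64(i), Data: make([]byte, size)}
	}
	return log
}

// entriesInRange returns entries in [lo, hi), cut off once the running
// payload size would exceed maxSize, but never below one entry.
func entriesInRange(log []entry, lo, hi, maxSize uint64) ([]entry, error) {
	if len(log) == 0 || lo > hi || lo < log[0].Index || hi > log[len(log)-1].Index+1 {
		return nil, errors.New("range out of bounds")
	}
	window := log[lo-log[0].Index : hi-log[0].Index]
	var size uint64
	for i, e := range window {
		size += uint64(len(e.Data))
		if size > maxSize && i > 0 {
			return window[:i], nil
		}
	}
	return window, nil
}

func main() {
	log := makeLog(5, 4, 10) // indexes 5..8, 10 bytes each
	got, err := entriesInRange(log, 5, 9, 25)
	fmt.Println(len(got), err)
}
```

Guaranteeing one entry keeps replication moving when a single entry is larger than the configured message size.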

func (*PDRaftStorage) FirstIndex

func (s *PDRaftStorage) FirstIndex() (uint64, error)

FirstIndex returns the index of the first available log entry.

func (*PDRaftStorage) GetApplyState

func (s *PDRaftStorage) GetApplyState() raftstore.ApplyState

GetApplyState returns the current apply state.

func (*PDRaftStorage) InitialState

func (s *PDRaftStorage) InitialState() (raftpb.HardState, raftpb.ConfState, error)

InitialState returns the initial HardState and ConfState from storage.

func (*PDRaftStorage) LastIndex

func (s *PDRaftStorage) LastIndex() (uint64, error)

LastIndex returns the index of the last log entry.

func (*PDRaftStorage) RecoverFromEngine

func (s *PDRaftStorage) RecoverFromEngine() error

RecoverFromEngine restores PDRaftStorage state from the engine. This should be called when restarting a PD node that already has persisted Raft state.

func (*PDRaftStorage) SaveReady

func (s *PDRaftStorage) SaveReady(rd raft.Ready) error

SaveReady persists the Raft state changes from a Ready batch. Entries and hard state are written atomically via a WriteBatch.

func (*PDRaftStorage) SetApplyState

func (s *PDRaftStorage) SetApplyState(state raftstore.ApplyState)

SetApplyState updates the apply state.

func (*PDRaftStorage) SetDummyEntry

func (s *PDRaftStorage) SetDummyEntry()

SetDummyEntry adds a dummy entry at index 0 with term 0, matching etcd/raft's MemoryStorage convention for empty storage.

func (*PDRaftStorage) SetPersistedLastIndex

func (s *PDRaftStorage) SetPersistedLastIndex(idx uint64)

SetPersistedLastIndex sets the persisted last index (for initialization).

func (*PDRaftStorage) SetSnapshotGenFunc

func (s *PDRaftStorage) SetSnapshotGenFunc(f func() ([]byte, error))

SetSnapshotGenFunc sets the function used to generate snapshot data.

func (*PDRaftStorage) Snapshot

func (s *PDRaftStorage) Snapshot() (raftpb.Snapshot, error)

Snapshot returns a snapshot of the PD state. When snapGenFunc is set, the snapshot includes the full serialized PD state for transfer to slow followers. Otherwise, only metadata is returned.

func (*PDRaftStorage) Term

func (s *PDRaftStorage) Term(i uint64) (uint64, error)

Term returns the term of the entry at the given index.

type PDServer

type PDServer struct {
	pdpb.UnimplementedPDServer
	// contains filtered or unexported fields
}

PDServer implements the pdpb.PDServer gRPC interface.

func NewPDServer

func NewPDServer(cfg PDServerConfig) (*PDServer, error)

NewPDServer creates a new PD server.

func (*PDServer) Addr

func (s *PDServer) Addr() string

Addr returns the listen address.

func (*PDServer) AllocID

func (*PDServer) ApplySnapshot

func (s *PDServer) ApplySnapshot(data []byte) error

ApplySnapshot replaces the PD server's in-memory state with the state from a JSON-encoded snapshot. It acquires write locks on all mutable sub-components.

func (*PDServer) AskBatchSplit

func (*PDServer) Bootstrap

func (*PDServer) DataDir

func (s *PDServer) DataDir() string

DataDir returns the data directory path configured for this server.

func (*PDServer) GenerateSnapshot

func (s *PDServer) GenerateSnapshot() ([]byte, error)

GenerateSnapshot captures the current PD server state as a JSON-encoded snapshot. It acquires read locks on all mutable sub-components and deep-copies their state.

func (*PDServer) GetAllStores

func (*PDServer) GetGCSafePoint

func (*PDServer) GetMembers

func (*PDServer) GetRegion

func (*PDServer) GetRegionByID

func (s *PDServer) GetRegionByID(ctx context.Context, req *pdpb.GetRegionByIDRequest) (*pdpb.GetRegionResponse, error)

func (*PDServer) GetStore

func (*PDServer) IsBootstrapped

func (*PDServer) IsRaftLeader

func (s *PDServer) IsRaftLeader() bool

IsRaftLeader returns whether this server's Raft peer believes it is the leader. Returns false if the server is running in single-node mode (no Raft).

func (*PDServer) PutStore

func (*PDServer) RegionHeartbeat

func (s *PDServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error

func (*PDServer) ReportBatchSplit

func (*PDServer) Start

func (s *PDServer) Start() error

Start starts the PD server.

func (*PDServer) Stop

func (s *PDServer) Stop()

Stop gracefully stops the PD server. It is safe to call multiple times.

func (*PDServer) StoreHeartbeat

func (*PDServer) Tso

func (s *PDServer) Tso(stream pdpb.PD_TsoServer) error

func (*PDServer) UpdateGCSafePoint

type PDServerConfig

type PDServerConfig struct {
	ListenAddr string
	StatusAddr string // HTTP status/pprof listen address
	DataDir    string
	ClusterID  uint64

	TSOSaveInterval           time.Duration
	TSOUpdatePhysicalInterval time.Duration

	MaxPeerCount int

	StoreDisconnectDuration time.Duration
	StoreDownDuration       time.Duration

	RegionBalanceThreshold float64
	RegionBalanceRateLimit int

	// RaftConfig enables Raft-based replication when non-nil.
	// When nil, the server operates in single-node mode (backward compatible).
	RaftConfig *PDServerRaftConfig
}

PDServerConfig holds configuration for the PD server.

func DefaultPDServerConfig

func DefaultPDServerConfig() PDServerConfig

DefaultPDServerConfig returns default PD server configuration.

type PDServerRaftConfig

type PDServerRaftConfig struct {
	PDNodeID             uint64            // this node's Raft ID
	InitialCluster       map[uint64]string // peerID -> peer gRPC address
	PeerAddr             string            // listen address for peer-to-peer gRPC
	ClientAddrs          map[uint64]string // peerID -> client gRPC address (for forwarding)
	RaftTickInterval     time.Duration
	ElectionTimeoutTicks int
	HeartbeatTicks       int
}

PDServerRaftConfig holds the Raft-cluster configuration for a PD server. If nil in PDServerConfig.RaftConfig, the server runs in single-node mode.

type PDSnapshot

type PDSnapshot struct {
	Bootstrapped       bool                        `json:"bootstrapped"`
	Stores             map[uint64]*metapb.Store    `json:"stores"`
	Regions            map[uint64]*metapb.Region   `json:"regions"`
	Leaders            map[uint64]*metapb.Peer     `json:"leaders"`
	StoreStats         map[uint64]*pdpb.StoreStats `json:"store_stats"`
	StoreStates        map[uint64]StoreState       `json:"store_states"`
	NextID             uint64                      `json:"next_id"`
	TSOState           TSOSnapshotState            `json:"tso_state"`
	GCSafePoint        uint64                      `json:"gc_safe_point"`
	PendingMoves       map[uint64]*PendingMove     `json:"pending_moves"`
	StoreLastHeartbeat map[uint64]int64            `json:"store_last_heartbeat"`
}

PDSnapshot captures all mutable PD state for Raft snapshot transfer.

type PDTransport

type PDTransport struct {
	// contains filtered or unexported fields
}

PDTransport manages gRPC connections to other PD peers for Raft message transport. It uses lazy, single-connection-per-peer semantics since PD Raft traffic is relatively infrequent compared to TiKV store Raft traffic.

func NewPDTransport

func NewPDTransport(peerAddrs map[uint64]string) *PDTransport

NewPDTransport creates a new PDTransport with the given peer address map. peerAddrs maps peerID to network address (e.g., "127.0.0.1:2380").

func (*PDTransport) Close

func (t *PDTransport) Close()

Close closes all connections managed by this transport.

func (*PDTransport) Send

func (t *PDTransport) Send(peerID uint64, msg raftpb.Message) error

Send sends a single raftpb.Message to the specified peer via gRPC. It converts the message from raftpb to eraftpb format, wraps it in a RaftMessage (with RegionId=0 as PD sentinel), and calls the remote SendPDRaftMessage unary RPC.

type PendingMove

type PendingMove struct {
	RegionID       uint64
	SourcePeer     *metapb.Peer
	TargetStoreID  uint64
	TargetPeerID   uint64
	State          MoveState
	StartedAt      time.Time
	StabilizeCount int // heartbeat cycles spent in MoveStateStabilizing
}

PendingMove tracks a single in-progress region move.

type ScheduleCommand

type ScheduleCommand struct {
	RegionID       uint64
	TransferLeader *pdpb.TransferLeader
	ChangePeer     *pdpb.ChangePeer
	Merge          *pdpb.Merge
}

ScheduleCommand represents a scheduling command to return in a heartbeat response.

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler produces scheduling commands based on cluster state.

func NewScheduler

func NewScheduler(meta *MetadataStore, idAlloc *IDAllocator, maxPeerCount int,
	threshold float64, rateLimit int, moveTracker MoveTrackerInterface) *Scheduler

NewScheduler creates a new Scheduler.

func (*Scheduler) Schedule

func (s *Scheduler) Schedule(regionID uint64, region *metapb.Region, leader *metapb.Peer) *ScheduleCommand

Schedule evaluates the cluster state for a given region and returns a command if needed. Returns nil if no scheduling action is required.

type StoreState

type StoreState int

StoreState represents the lifecycle state of a TiKV store.

const (
	StoreStateUp           StoreState = iota // Heartbeat within disconnect threshold
	StoreStateDisconnected                   // Heartbeat missed > DisconnectDuration
	StoreStateDown                           // Disconnected > DownDuration — replicas should be repaired
	StoreStateTombstone                      // Permanently removed
)
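
The state comments above suggest a classification driven by heartbeat age. The sketch below shows one way to derive a state and apply the documented IsStoreAlive and IsStoreSchedulable rules. The function classify, the choice to measure both thresholds from the last heartbeat, and the 30-second and 30-minute durations are assumptions.

```go
package main

import (
	"fmt"
	"time"
)

// sec is an illustrative helper for readable durations.
const sec = time.Second

// storeState mirrors the documented StoreState constants.
type storeState int

const (
	stateUp storeState = iota
	stateDisconnected
	stateDown
	stateTombstone
)

// classify derives a state from the time since the last heartbeat.
// Measuring both thresholds from the last heartbeat is an assumption.
func classify(sinceHeartbeat, disconnectAfter, downAfter time.Duration, tombstoned bool) storeState {
	switch {
	case tombstoned:
		return stateTombstone
	case sinceHeartbeat > downAfter:
		return stateDown
	case sinceHeartbeat > disconnectAfter:
		return stateDisconnected
	default:
		return stateUp
	}
}

// alive mirrors the documented IsStoreAlive rule: Up or Disconnected.
func alive(s storeState) bool { return s == stateUp || s == stateDisconnected }

// schedulable mirrors the documented IsStoreSchedulable rule: Up only.
func schedulable(s storeState) bool { return s == stateUp }

func main() {
	disconnect, down := 30*sec, 1800*sec // illustrative thresholds
	for _, since := range []time.Duration{5 * sec, 60 * sec, 3600 * sec} {
		s := classify(since, disconnect, down, false)
		fmt.Println(since, "alive:", alive(s), "schedulable:", schedulable(s))
	}
}
```

The gap between alive and schedulable is the Disconnected state: such a store is not yet treated as lost, but it should not receive new replicas.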

type TSOAllocator

type TSOAllocator struct {
	// contains filtered or unexported fields
}

TSOAllocator generates monotonically increasing timestamps.

func NewTSOAllocator

func NewTSOAllocator(saveInterval time.Duration) *TSOAllocator

func (*TSOAllocator) Allocate

func (t *TSOAllocator) Allocate(count int) (*pdpb.Timestamp, error)

Allocate allocates `count` timestamps and returns the last one.
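
Returning only the last timestamp is enough for the caller to reconstruct the whole range, because the logical counter advanced by exactly count. The sketch below illustrates this with a physical/logical pair like the one in TSOSnapshotState. The type miniTSO, the 18-bit logical width (the TiKV-style layout), and the overflow handling are assumptions, and the clock is passed in so the example is deterministic.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// logicalBits follows the TiKV-style layout; whether gookv uses the same
// width is an assumption.
const logicalBits = 18
const maxLogical = int64(1) << logicalBits

type timestamp struct{ Physical, Logical int64 }

// miniTSO is a hypothetical stand-in for TSOAllocator.
type miniTSO struct {
	mu       sync.Mutex
	physical int64
	logical  int64
}

// allocate reserves count timestamps and returns the last one, so the
// caller owns logical values in (last-count, last].
func (a *miniTSO) allocate(count int, nowMs int64) (timestamp, error) {
	if count <= 0 || int64(count) >= maxLogical {
		return timestamp{}, errors.New("count out of range")
	}
	a.mu.Lock()
	defer a.mu.Unlock()
	if nowMs > a.physical {
		// Clock moved forward: adopt it and restart the logical counter.
		a.physical, a.logical = nowMs, 0
	}
	if a.logical+int64(count) >= maxLogical {
		// Logical space exhausted: borrow the next millisecond.
		a.physical, a.logical = a.physical+1, 0
	}
	a.logical += int64(count)
	return timestamp{Physical: a.physical, Logical: a.logical}, nil
}

// compose packs a timestamp into one uint64 for ordering comparisons.
func compose(ts timestamp) uint64 {
	return uint64(ts.Physical)<<logicalBits | uint64(ts.Logical)
}

func main() {
	alloc := &miniTSO{}
	first, _ := alloc.allocate(3, 1000)
	second, _ := alloc.allocate(2, 1000)
	fmt.Println(first, second, compose(first) < compose(second))
}
```

A clock reading that is older than the stored physical time is ignored in this sketch, which keeps the sequence monotonic across small clock regressions.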

type TSOBuffer

type TSOBuffer struct {
	// contains filtered or unexported fields
}

TSOBuffer pre-allocates TSO ranges via Raft and serves subsequent requests from a local buffer. This amortizes the cost of Raft consensus over many TSO allocations. On leader change, the buffer must be reset so the new leader proposes a fresh batch.

func NewTSOBuffer

func NewTSOBuffer(raftPeer *PDRaftPeer) *TSOBuffer

NewTSOBuffer creates a TSOBuffer backed by the given Raft peer.

func (*TSOBuffer) GetTS

func (b *TSOBuffer) GetTS(ctx context.Context, count int) (*pdpb.Timestamp, error)

GetTS allocates count timestamps from the buffer. If the buffer is depleted, it proposes a new CmdTSOAllocate batch via Raft. Returns a *pdpb.Timestamp representing the last allocated timestamp.

func (*TSOBuffer) Reset

func (b *TSOBuffer) Reset()

Reset clears the buffer. Must be called on leader change so the new leader proposes a fresh batch.

type TSOSnapshotState

type TSOSnapshotState struct {
	Physical int64 `json:"physical"`
	Logical  int64 `json:"logical"`
}

TSOSnapshotState captures the TSOAllocator state for snapshotting.
