Documentation
¶
Index ¶
- type Config
- type ErrorCode
- type FailoverCoordinator
- type FailoverMetrics
- type FailureDetector
- type FetchRequest
- type FetchResponse
- type Fetcher
- func (f *Fetcher) AddPartition(topic string, partitionID int, leaderID ReplicaID, leaderEpoch int64) error
- func (f *Fetcher) GetAllMetrics() map[string]ReplicationMetrics
- func (f *Fetcher) GetPartitionCount() int
- func (f *Fetcher) GetReplicationLag(topic string, partitionID int) (int64, error)
- func (f *Fetcher) RemovePartition(topic string, partitionID int) error
- func (f *Fetcher) Start() error
- func (f *Fetcher) Stop() error
- func (f *Fetcher) UpdateLeader(topic string, partitionID int, leaderID ReplicaID, leaderEpoch int64) error
- type FollowerReplicator
- func (fr *FollowerReplicator) GetFetchLag() int64
- func (fr *FollowerReplicator) GetHighWaterMark() Offset
- func (fr *FollowerReplicator) GetLogEndOffset() Offset
- func (fr *FollowerReplicator) GetMetrics() ReplicationMetrics
- func (fr *FollowerReplicator) Start() error
- func (fr *FollowerReplicator) Stop() error
- func (fr *FollowerReplicator) UpdateLeader(leaderID ReplicaID, leaderEpoch int64) error
- type GlobalMetricsSnapshot
- type GlobalReplicationMetrics
- type HeartbeatClient
- type ISRChangeNotification
- type LeaderClient
- type LeaderReplicator
- func (lr *LeaderReplicator) GetHighWaterMark() Offset
- func (lr *LeaderReplicator) GetISR() []ReplicaID
- func (lr *LeaderReplicator) GetMetrics() ReplicationMetrics
- func (lr *LeaderReplicator) GetPartitionState() *PartitionReplicationState
- func (lr *LeaderReplicator) HandleFetchRequest(req *FetchRequest) (*FetchResponse, error)
- func (lr *LeaderReplicator) ISRChangeChannel() <-chan *ISRChangeNotification
- func (lr *LeaderReplicator) Start() error
- func (lr *LeaderReplicator) Stop() error
- func (lr *LeaderReplicator) UpdateLogEndOffset(newLEO Offset)
- type LogReconciliation
- func (lr *LogReconciliation) GetDivergencePoint(ctx context.Context, localLEO Offset, leaderLEO Offset) (Offset, error)
- func (lr *LogReconciliation) ReconcileAsFollower(ctx context.Context, leaderEpoch int64) error
- func (lr *LogReconciliation) ReconcileAsNewLeader(ctx context.Context) error
- func (lr *LogReconciliation) ResetToOffset(ctx context.Context, offset Offset) error
- func (lr *LogReconciliation) ValidateConsistency(ctx context.Context) error
- type Message
- type MetadataClient
- type MetricsCollector
- func (mc *MetricsCollector) ComputeGlobalMetrics()
- func (mc *MetricsCollector) GetAllPartitionMetrics() map[string]*PartitionMetrics
- func (mc *MetricsCollector) GetGlobalMetrics() *GlobalReplicationMetrics
- func (mc *MetricsCollector) GetGlobalSnapshot() GlobalMetricsSnapshot
- func (mc *MetricsCollector) GetPartitionMetrics(topic string, partitionID int) *PartitionMetrics
- func (mc *MetricsCollector) RegisterPartition(topic string, partitionID int) *PartitionMetrics
- func (mc *MetricsCollector) UnregisterPartition(topic string, partitionID int)
- type Offset
- type PartitionFailoverState
- type PartitionMetrics
- func (pm *PartitionMetrics) GetSnapshot() PartitionMetricsSnapshot
- func (pm *PartitionMetrics) RecordFailover(newEpoch int64)
- func (pm *PartitionMetrics) RecordFetch(bytesRead int, latencyMs int64, err error)
- func (pm *PartitionMetrics) RecordISRChange(newSize int, isShrink bool)
- func (pm *PartitionMetrics) UpdateFromReplicationMetrics(rm ReplicationMetrics)
- func (pm *PartitionMetrics) UpdateOffsets(leo, hw Offset)
- type PartitionMetricsSnapshot
- type PartitionReplicationState
- type PartitionRole
- type ReplicaID
- type ReplicationManager
- func (rm *ReplicationManager) BecomeFollower(leaderID ReplicaID, leaderEpoch int64) error
- func (rm *ReplicationManager) BecomeLeader(leaderEpoch int64, replicas []ReplicaID) error
- func (rm *ReplicationManager) GetHighWaterMark() (Offset, error)
- func (rm *ReplicationManager) GetISR() ([]ReplicaID, error)
- func (rm *ReplicationManager) GetMetrics() ReplicationMetrics
- func (rm *ReplicationManager) HandleFetchRequest(req *FetchRequest) (*FetchResponse, error)
- func (rm *ReplicationManager) IsLeader() bool
- func (rm *ReplicationManager) NotifyReplication()
- func (rm *ReplicationManager) Start() error
- func (rm *ReplicationManager) Stop() error
- func (rm *ReplicationManager) UpdateLogEndOffset(newLEO Offset) error
- func (rm *ReplicationManager) WaitForISR(ctx context.Context, offset Offset, timeoutMs int32) error
- type ReplicationMetrics
- type ReplicationState
- type StorageAdapter
- type StorageMessage
- type StorageReader
- type StorageWriter
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
// FetchMinBytes is the minimum bytes to return in a fetch response
// Setting this higher reduces CPU usage but increases latency
FetchMinBytes int
// FetchMaxBytes is the maximum bytes to return in a fetch response
FetchMaxBytes int
// FetchMaxWaitMs is the maximum time to wait for FetchMinBytes
// If MinBytes not reached within this time, return what we have
FetchMaxWaitMs int
// FetcherInterval is how often followers fetch from leader
FetcherInterval time.Duration
// ReplicaLagMaxMessages is max messages a replica can lag before ISR removal
ReplicaLagMaxMessages int64
// ReplicaLagTimeMaxMs is max time a replica can lag before ISR removal
ReplicaLagTimeMaxMs int64
// HighWaterMarkCheckpointIntervalMs is how often to checkpoint HW
HighWaterMarkCheckpointIntervalMs int64
// NumFetcherThreads is the number of fetcher threads per follower
NumFetcherThreads int
// MaxInflightFetches is max concurrent fetch requests per follower
MaxInflightFetches int
// FetchBackoffMs is backoff time after fetch error
FetchBackoffMs int64
// ReplicaIDForFollower is the replica ID when running as follower
ReplicaIDForFollower ReplicaID
}
Config holds replication configuration
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig returns sensible defaults for replication
type FailoverCoordinator ¶
type FailoverCoordinator struct {
// contains filtered or unexported fields
}
FailoverCoordinator manages automatic leader failover for partitions
func NewFailoverCoordinator ¶
func NewFailoverCoordinator( config *Config, brokerID ReplicaID, metadataClient MetadataClient, heartbeatClient HeartbeatClient, ) *FailoverCoordinator
NewFailoverCoordinator creates a new failover coordinator
func (*FailoverCoordinator) GetMetrics ¶
func (fc *FailoverCoordinator) GetMetrics() FailoverMetrics
GetMetrics returns failover metrics
func (*FailoverCoordinator) RegisterPartition ¶
func (fc *FailoverCoordinator) RegisterPartition(topic string, partitionID int) error
RegisterPartition registers a partition for failover monitoring
func (*FailoverCoordinator) Start ¶
func (fc *FailoverCoordinator) Start() error
Start starts the failover coordinator
func (*FailoverCoordinator) Stop ¶
func (fc *FailoverCoordinator) Stop() error
Stop stops the failover coordinator
type FailoverMetrics ¶
type FailoverMetrics struct {
TotalFailovers int64
SuccessfulFailovers int64
FailedFailovers int64
AverageFailoverTime time.Duration
LastFailoverTime time.Time
}
FailoverMetrics tracks failover statistics
type FailureDetector ¶
type FailureDetector struct {
// contains filtered or unexported fields
}
FailureDetector detects broker failures
func (*FailureDetector) HasFailed ¶
func (fd *FailureDetector) HasFailed(brokerID ReplicaID) bool
HasFailed checks if a broker has failed
func (*FailureDetector) RecordHeartbeat ¶
func (fd *FailureDetector) RecordHeartbeat(brokerID ReplicaID)
RecordHeartbeat records a successful heartbeat from a broker
type FetchRequest ¶
type FetchRequest struct {
// ReplicaID of the follower making the request
ReplicaID ReplicaID
// Topic to fetch from
Topic string
// PartitionID to fetch from
PartitionID int
// FetchOffset is the offset to start fetching from
FetchOffset Offset
// MaxBytes is the maximum bytes to return
MaxBytes int
// LeaderEpoch is the epoch the follower believes the leader to be in
// Used to detect stale leaders
LeaderEpoch int64
// MinBytes is the minimum bytes to return (for long-polling)
MinBytes int
// MaxWaitMs is the maximum time to wait for MinBytes
MaxWaitMs int
}
FetchRequest is sent by a follower to fetch messages from the leader
type FetchResponse ¶
type FetchResponse struct {
// Topic being fetched
Topic string
// PartitionID being fetched
PartitionID int
// LeaderEpoch of the current leader
LeaderEpoch int64
// HighWaterMark of the partition
HighWaterMark Offset
// LogEndOffset (LEO) of the leader
LogEndOffset Offset
// Messages fetched
Messages []*Message
// Error indicates any error during fetch
Error error
// ErrorCode for specific error types
ErrorCode ErrorCode
}
FetchResponse is the leader's response to a FetchRequest
type Fetcher ¶
type Fetcher struct {
// contains filtered or unexported fields
}
Fetcher coordinates replication for multiple partitions on a follower broker
func NewFetcher ¶
func NewFetcher(config *Config, replicaID ReplicaID, storage StorageWriter, leaderClient LeaderClient) *Fetcher
NewFetcher creates a new fetcher coordinator
func (*Fetcher) AddPartition ¶
func (f *Fetcher) AddPartition( topic string, partitionID int, leaderID ReplicaID, leaderEpoch int64, ) error
AddPartition adds a partition to replicate
func (*Fetcher) GetAllMetrics ¶
func (f *Fetcher) GetAllMetrics() map[string]ReplicationMetrics
GetAllMetrics returns metrics for all partitions
func (*Fetcher) GetPartitionCount ¶
func (f *Fetcher) GetPartitionCount() int
GetPartitionCount returns the number of partitions being replicated
func (*Fetcher) GetReplicationLag ¶
func (f *Fetcher) GetReplicationLag(topic string, partitionID int) (int64, error)
GetReplicationLag returns the replication lag for a partition
func (*Fetcher) RemovePartition ¶
func (f *Fetcher) RemovePartition(topic string, partitionID int) error
RemovePartition stops replicating a partition
type FollowerReplicator ¶
type FollowerReplicator struct {
// contains filtered or unexported fields
}
FollowerReplicator manages replication on the follower side
func NewFollowerReplicator ¶
func NewFollowerReplicator( config *Config, topic string, partitionID int, replicaID ReplicaID, leaderID ReplicaID, leaderEpoch int64, storage StorageWriter, leaderClient LeaderClient, ) *FollowerReplicator
NewFollowerReplicator creates a new follower replicator
func (*FollowerReplicator) GetFetchLag ¶
func (fr *FollowerReplicator) GetFetchLag() int64
GetFetchLag returns the current lag behind leader
func (*FollowerReplicator) GetHighWaterMark ¶
func (fr *FollowerReplicator) GetHighWaterMark() Offset
GetHighWaterMark returns the current HW
func (*FollowerReplicator) GetLogEndOffset ¶
func (fr *FollowerReplicator) GetLogEndOffset() Offset
GetLogEndOffset returns the current LEO
func (*FollowerReplicator) GetMetrics ¶
func (fr *FollowerReplicator) GetMetrics() ReplicationMetrics
GetMetrics returns replication metrics
func (*FollowerReplicator) Start ¶
func (fr *FollowerReplicator) Start() error
Start starts the fetch loop
func (*FollowerReplicator) Stop ¶
func (fr *FollowerReplicator) Stop() error
Stop stops the replicator
func (*FollowerReplicator) UpdateLeader ¶
func (fr *FollowerReplicator) UpdateLeader(leaderID ReplicaID, leaderEpoch int64) error
UpdateLeader updates the leader information
type GlobalMetricsSnapshot ¶
type GlobalMetricsSnapshot struct {
TotalPartitions int32
LeaderPartitions int32
FollowerPartitions int32
MaxReplicationLagMessages int64
AvgReplicationLagMessages int64
TotalFetchRequests int64
TotalFetchBytes int64
TotalFetchErrors int64
TotalISRShrinks int64
TotalISRExpands int64
PartitionsUnderReplicated int32
TotalFailovers int64
FailedFailovers int64
AvgFailoverTimeMs int64
}
GlobalMetricsSnapshot is a point-in-time snapshot of global metrics
type GlobalReplicationMetrics ¶
type GlobalReplicationMetrics struct {
// Total partition count
TotalPartitions atomic.Int32
LeaderPartitions atomic.Int32
FollowerPartitions atomic.Int32
// Aggregate replication lag
MaxReplicationLagMessages atomic.Int64
AvgReplicationLagMessages atomic.Int64
// Aggregate fetch metrics
TotalFetchRequests atomic.Int64
TotalFetchBytes atomic.Int64
TotalFetchErrors atomic.Int64
// ISR health
TotalISRShrinks atomic.Int64
TotalISRExpands atomic.Int64
PartitionsUnderReplicated atomic.Int32 // Partitions with ISR < replication factor
// Failover metrics
TotalFailovers atomic.Int64
FailedFailovers atomic.Int64
AvgFailoverTimeMs atomic.Int64
}
GlobalReplicationMetrics tracks cluster-wide replication metrics
type HeartbeatClient ¶
type HeartbeatClient interface {
// SendHeartbeat sends a heartbeat to a broker
SendHeartbeat(ctx context.Context, brokerID ReplicaID) error
}
HeartbeatClient sends heartbeats to brokers
type ISRChangeNotification ¶
type ISRChangeNotification struct {
Topic string
PartitionID int
OldISR []ReplicaID
NewISR []ReplicaID
Reason string
Timestamp time.Time
}
ISRChangeNotification represents a change in ISR membership
type LeaderClient ¶
type LeaderClient interface {
// Fetch sends a fetch request to the leader
Fetch(ctx context.Context, req *FetchRequest) (*FetchResponse, error)
// GetLeaderEndpoint returns the leader's network endpoint
GetLeaderEndpoint(topic string, partitionID int) (string, error)
}
LeaderClient defines the interface for communicating with the leader
type LeaderReplicator ¶
type LeaderReplicator struct {
// contains filtered or unexported fields
}
LeaderReplicator manages replication on the leader side
func NewLeaderReplicator ¶
func NewLeaderReplicator( config *Config, topic string, partitionID int, leaderEpoch int64, replicaID ReplicaID, replicas []ReplicaID, storage StorageReader, ) *LeaderReplicator
NewLeaderReplicator creates a new leader replicator
func (*LeaderReplicator) GetHighWaterMark ¶
func (lr *LeaderReplicator) GetHighWaterMark() Offset
GetHighWaterMark returns the current high water mark
func (*LeaderReplicator) GetISR ¶
func (lr *LeaderReplicator) GetISR() []ReplicaID
GetISR returns the current ISR
func (*LeaderReplicator) GetMetrics ¶
func (lr *LeaderReplicator) GetMetrics() ReplicationMetrics
GetMetrics returns replication metrics
func (*LeaderReplicator) GetPartitionState ¶
func (lr *LeaderReplicator) GetPartitionState() *PartitionReplicationState
GetPartitionState returns the full partition replication state
func (*LeaderReplicator) HandleFetchRequest ¶
func (lr *LeaderReplicator) HandleFetchRequest(req *FetchRequest) (*FetchResponse, error)
HandleFetchRequest handles a fetch request from a follower
func (*LeaderReplicator) ISRChangeChannel ¶
func (lr *LeaderReplicator) ISRChangeChannel() <-chan *ISRChangeNotification
ISRChangeChannel returns the channel for ISR change notifications
func (*LeaderReplicator) Start ¶
func (lr *LeaderReplicator) Start() error
Start starts background tasks
func (*LeaderReplicator) UpdateLogEndOffset ¶
func (lr *LeaderReplicator) UpdateLogEndOffset(newLEO Offset)
UpdateLogEndOffset updates the leader's LEO (called after appending)
type LogReconciliation ¶
type LogReconciliation struct {
// contains filtered or unexported fields
}
LogReconciliation handles log reconciliation after leader changes. When a new leader is elected, it must ensure all followers have consistent logs.
func NewLogReconciliation ¶
func NewLogReconciliation( topic string, partitionID int, replicaID ReplicaID, leaderEpoch int64, storage StorageAdapter, ) *LogReconciliation
NewLogReconciliation creates a new log reconciliation handler
func (*LogReconciliation) GetDivergencePoint ¶
func (lr *LogReconciliation) GetDivergencePoint( ctx context.Context, localLEO Offset, leaderLEO Offset, ) (Offset, error)
GetDivergencePoint finds the offset where two logs diverge. Returns the offset of the first diverging entry, or -1 if logs are consistent.
func (*LogReconciliation) ReconcileAsFollower ¶
func (lr *LogReconciliation) ReconcileAsFollower(ctx context.Context, leaderEpoch int64) error
ReconcileAsFollower performs reconciliation when becoming follower. This ensures the log is consistent with the new leader's log.
func (*LogReconciliation) ReconcileAsNewLeader ¶
func (lr *LogReconciliation) ReconcileAsNewLeader(ctx context.Context) error
ReconcileAsNewLeader performs reconciliation when becoming leader. This truncates any uncommitted entries from the previous epoch.
func (*LogReconciliation) ResetToOffset ¶
func (lr *LogReconciliation) ResetToOffset(ctx context.Context, offset Offset) error
ResetToOffset resets the log to a specific offset. All entries at or after this offset are removed.
func (*LogReconciliation) ValidateConsistency ¶
func (lr *LogReconciliation) ValidateConsistency(ctx context.Context) error
ValidateConsistency checks if the log is consistent. This can be called periodically or after operations.
type Message ¶
type Message struct {
// Offset of the message in the log
Offset Offset
// Key (optional)
Key []byte
// Value is the message payload
Value []byte
// Timestamp in Unix nanoseconds
Timestamp int64
// Headers (optional metadata)
Headers map[string][]byte
}
Message represents a single message in the replication protocol
type MetadataClient ¶
type MetadataClient interface {
// GetPartitionLeader returns current leader and epoch
GetPartitionLeader(topic string, partitionID int) (ReplicaID, int64, error)
// GetPartitionISR returns current ISR
GetPartitionISR(topic string, partitionID int) ([]ReplicaID, error)
// UpdatePartitionLeader updates leader via Raft consensus
UpdatePartitionLeader(ctx context.Context, topic string, partitionID int, newLeader ReplicaID, newEpoch int64) error
// UpdatePartitionISR updates ISR via Raft consensus
UpdatePartitionISR(ctx context.Context, topic string, partitionID int, newISR []ReplicaID) error
// GetPartitionReplicas returns all replicas for partition
GetPartitionReplicas(topic string, partitionID int) ([]ReplicaID, error)
}
MetadataClient defines operations needed for failover
type MetricsCollector ¶
type MetricsCollector struct {
// contains filtered or unexported fields
}
MetricsCollector collects and tracks replication metrics
func NewMetricsCollector ¶
func NewMetricsCollector() *MetricsCollector
NewMetricsCollector creates a new metrics collector
func (*MetricsCollector) ComputeGlobalMetrics ¶
func (mc *MetricsCollector) ComputeGlobalMetrics()
ComputeGlobalMetrics computes global metrics from partition metrics
func (*MetricsCollector) GetAllPartitionMetrics ¶
func (mc *MetricsCollector) GetAllPartitionMetrics() map[string]*PartitionMetrics
GetAllPartitionMetrics returns metrics for all partitions
func (*MetricsCollector) GetGlobalMetrics ¶
func (mc *MetricsCollector) GetGlobalMetrics() *GlobalReplicationMetrics
GetGlobalMetrics returns global replication metrics
func (*MetricsCollector) GetGlobalSnapshot ¶
func (mc *MetricsCollector) GetGlobalSnapshot() GlobalMetricsSnapshot
GetGlobalSnapshot returns a snapshot of global metrics
func (*MetricsCollector) GetPartitionMetrics ¶
func (mc *MetricsCollector) GetPartitionMetrics(topic string, partitionID int) *PartitionMetrics
GetPartitionMetrics returns metrics for a specific partition
func (*MetricsCollector) RegisterPartition ¶
func (mc *MetricsCollector) RegisterPartition(topic string, partitionID int) *PartitionMetrics
RegisterPartition registers a partition for metrics tracking
func (*MetricsCollector) UnregisterPartition ¶
func (mc *MetricsCollector) UnregisterPartition(topic string, partitionID int)
UnregisterPartition removes a partition from metrics tracking
type PartitionFailoverState ¶
type PartitionFailoverState struct {
// contains filtered or unexported fields
}
PartitionFailoverState tracks failover state for a partition
type PartitionMetrics ¶
type PartitionMetrics struct {
// Partition identification
Topic string
PartitionID int
// Offset tracking
LogEndOffset atomic.Int64
HighWaterMark atomic.Int64
// Replication lag (for followers)
ReplicationLagMessages atomic.Int64 // Messages behind leader
ReplicationLagMs atomic.Int64 // Time lag in milliseconds
// Fetch metrics (for followers)
FetchRequestCount atomic.Int64
FetchBytesTotal atomic.Int64
FetchErrorCount atomic.Int64
LastFetchLatencyMs atomic.Int64
LastFetchTime atomic.Int64 // Unix timestamp in ms
// ISR metrics (for leaders)
ISRSize atomic.Int32
ISRShrinkCount atomic.Int64
ISRExpandCount atomic.Int64
LastISRChange atomic.Int64 // Unix timestamp in ms
// Failover metrics
FailoverCount atomic.Int64
LastFailoverTime atomic.Int64 // Unix timestamp in ms
LeaderEpoch atomic.Int64
}
PartitionMetrics tracks metrics for a single partition
func (*PartitionMetrics) GetSnapshot ¶
func (pm *PartitionMetrics) GetSnapshot() PartitionMetricsSnapshot
GetSnapshot returns a snapshot of partition metrics
func (*PartitionMetrics) RecordFailover ¶
func (pm *PartitionMetrics) RecordFailover(newEpoch int64)
RecordFailover records a leader failover
func (*PartitionMetrics) RecordFetch ¶
func (pm *PartitionMetrics) RecordFetch(bytesRead int, latencyMs int64, err error)
RecordFetch records a fetch operation
func (*PartitionMetrics) RecordISRChange ¶
func (pm *PartitionMetrics) RecordISRChange(newSize int, isShrink bool)
RecordISRChange records an ISR membership change
func (*PartitionMetrics) UpdateFromReplicationMetrics ¶
func (pm *PartitionMetrics) UpdateFromReplicationMetrics(rm ReplicationMetrics)
UpdateFromReplicationMetrics updates metrics from ReplicationMetrics
func (*PartitionMetrics) UpdateOffsets ¶
func (pm *PartitionMetrics) UpdateOffsets(leo, hw Offset)
UpdateOffsets updates LEO and HW
type PartitionMetricsSnapshot ¶
type PartitionMetricsSnapshot struct {
Topic string
PartitionID int
LogEndOffset int64
HighWaterMark int64
ReplicationLagMessages int64
ReplicationLagMs int64
FetchRequestCount int64
FetchBytesTotal int64
FetchErrorCount int64
LastFetchLatencyMs int64
LastFetchTime time.Time
ISRSize int32
ISRShrinkCount int64
ISRExpandCount int64
LastISRChange time.Time
FailoverCount int64
LastFailoverTime time.Time
LeaderEpoch int64
}
PartitionMetricsSnapshot is a point-in-time snapshot of partition metrics
type PartitionReplicationState ¶
type PartitionReplicationState struct {
// Topic name
Topic string
// PartitionID within the topic
PartitionID int
// Leader broker ID
Leader ReplicaID
// LeaderEpoch increments on each leader change to prevent stale reads
LeaderEpoch int64
// Replicas is the full list of replicas (leader + followers)
Replicas []ReplicaID
// ISR (In-Sync Replicas) are replicas within acceptable lag limits
ISR []ReplicaID
// ReplicaStates maps each replica to its replication state
ReplicaStates map[ReplicaID]*ReplicationState
// HighWaterMark is the partition-wide HW (minimum LEO of all ISR members)
HighWaterMark Offset
// LogEndOffset is the leader's LEO
LogEndOffset Offset
}
PartitionReplicationState tracks replication state for an entire partition
type PartitionRole ¶
type PartitionRole int
PartitionRole represents whether this replica is leader or follower
const (
RoleFollower PartitionRole = iota
RoleLeader
)
type ReplicaID ¶
type ReplicaID uint64
ReplicaID represents a unique identifier for a broker replica
type ReplicationManager ¶
type ReplicationManager struct {
// contains filtered or unexported fields
}
ReplicationManager coordinates replication for a partition. It manages both leader and follower roles.
func NewReplicationManager ¶
func NewReplicationManager( config *Config, topic string, partitionID int, replicaID ReplicaID, storage StorageAdapter, leaderClient LeaderClient, ) *ReplicationManager
NewReplicationManager creates a new replication manager
func (*ReplicationManager) BecomeFollower ¶
func (rm *ReplicationManager) BecomeFollower( leaderID ReplicaID, leaderEpoch int64, ) error
BecomeFollower transitions this replica to follower role
func (*ReplicationManager) BecomeLeader ¶
func (rm *ReplicationManager) BecomeLeader( leaderEpoch int64, replicas []ReplicaID, ) error
BecomeLeader transitions this replica to leader role
func (*ReplicationManager) GetHighWaterMark ¶
func (rm *ReplicationManager) GetHighWaterMark() (Offset, error)
GetHighWaterMark returns the current high water mark
func (*ReplicationManager) GetISR ¶
func (rm *ReplicationManager) GetISR() ([]ReplicaID, error)
GetISR returns the current ISR (only valid for leader)
func (*ReplicationManager) GetMetrics ¶
func (rm *ReplicationManager) GetMetrics() ReplicationMetrics
GetMetrics returns replication metrics
func (*ReplicationManager) HandleFetchRequest ¶
func (rm *ReplicationManager) HandleFetchRequest(req *FetchRequest) (*FetchResponse, error)
HandleFetchRequest handles a fetch request (only valid for leader)
func (*ReplicationManager) IsLeader ¶
func (rm *ReplicationManager) IsLeader() bool
IsLeader returns true if this replica is the leader
func (*ReplicationManager) NotifyReplication ¶
func (rm *ReplicationManager) NotifyReplication()
NotifyReplication notifies waiters that replication has progressed. Called periodically or when HW advances.
func (*ReplicationManager) Start ¶
func (rm *ReplicationManager) Start() error
Start starts the replication manager
func (*ReplicationManager) Stop ¶
func (rm *ReplicationManager) Stop() error
Stop stops the replication manager
func (*ReplicationManager) UpdateLogEndOffset ¶
func (rm *ReplicationManager) UpdateLogEndOffset(newLEO Offset) error
UpdateLogEndOffset updates the leader's LEO after appending
func (*ReplicationManager) WaitForISR ¶
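func (rm *ReplicationManager) WaitForISR(ctx context.Context, offset Offset, timeoutMs int32) error
WaitForISR waits for an offset to be replicated to all ISR members. Used for acks=all produce requests.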
type ReplicationMetrics ¶
type ReplicationMetrics struct {
// FetchRequestRate is fetches per second
FetchRequestRate float64
// FetchBytesRate is bytes fetched per second
FetchBytesRate float64
// ReplicationLagMessages is number of messages behind leader
ReplicationLagMessages int64
// ReplicationLagMs is time lag in milliseconds
ReplicationLagMs int64
// LastFetchLatencyMs is latency of last fetch in milliseconds
LastFetchLatencyMs int64
// ISRShrinkRate is ISR shrinks per minute
ISRShrinkRate float64
// ISRExpandRate is ISR expands per minute
ISRExpandRate float64
}
ReplicationMetrics tracks replication performance metrics
type ReplicationState ¶
type ReplicationState struct {
// ReplicaID is the broker ID of this replica
ReplicaID ReplicaID
// LogEndOffset (LEO) is the offset immediately after the last message in the replica's log,
// i.e. the next offset to be written
LogEndOffset Offset
// HighWaterMark (HW) is the offset up to which messages are guaranteed
// to be replicated to all ISR members. Only messages below HW are visible to consumers.
HighWaterMark Offset
// LastFetchTime is the timestamp of the last successful fetch from leader
LastFetchTime time.Time
// LastCaughtUpTime is the timestamp when this replica was last fully caught up
LastCaughtUpTime time.Time
// FetchLag is the number of messages this replica is behind the leader
FetchLag int64
// InSync indicates if this replica is in the ISR
InSync bool
}
ReplicationState tracks the replication state of a follower replica
type StorageAdapter ¶
type StorageAdapter interface {
StorageReader
StorageWriter
}
StorageAdapter combines reader and writer interfaces
type StorageMessage ¶
type StorageMessage struct {
Offset Offset
Key []byte
Value []byte
Timestamp int64
Headers map[string][]byte
}
StorageMessage represents a message from storage
type StorageReader ¶
type StorageReader interface {
// ReadRange reads messages between start and end offsets
ReadRange(ctx context.Context, partition int, startOffset, endOffset Offset) ([]*StorageMessage, error)
// GetLogEndOffset returns the next offset to be written
GetLogEndOffset(partition int) (Offset, error)
// GetHighWaterMark returns the current high water mark
GetHighWaterMark(partition int) (Offset, error)
}
StorageReader defines the storage operations needed by the replicator
type StorageWriter ¶
type StorageWriter interface {
// AppendMessages appends messages to the log
AppendMessages(ctx context.Context, partition int, messages []*StorageMessage) error
// GetLogEndOffset returns the next offset to be written
GetLogEndOffset(partition int) (Offset, error)
// SetHighWaterMark updates the high water mark
SetHighWaterMark(partition int, hw Offset) error
// Truncate truncates the log to the given offset (for leader changes)
Truncate(partition int, offset Offset) error
}
StorageWriter defines the storage operations needed by the follower