replication

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 22, 2025 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

// Config holds replication configuration.
//
// Time-related settings are expressed in two ways: FetcherInterval is a
// time.Duration, while the fields with an Ms suffix are bare integers
// (presumably milliseconds, going by the suffix; confirm against
// DefaultConfig). FetchMaxWaitMs is an int whereas the other Ms fields are
// int64.
type Config struct {
	// FetchMinBytes is the minimum bytes to return in a fetch response.
	// Setting this higher reduces CPU usage but increases latency.
	FetchMinBytes int

	// FetchMaxBytes is the maximum bytes to return in a fetch response.
	FetchMaxBytes int

	// FetchMaxWaitMs is the maximum time to wait for FetchMinBytes.
	// If FetchMinBytes is not reached within this time, the fetch returns
	// whatever is available.
	FetchMaxWaitMs int

	// FetcherInterval is how often followers fetch from the leader.
	FetcherInterval time.Duration

	// ReplicaLagMaxMessages is the maximum number of messages a replica can
	// lag before it is removed from the ISR.
	ReplicaLagMaxMessages int64

	// ReplicaLagTimeMaxMs is the maximum time a replica can lag before it is
	// removed from the ISR.
	ReplicaLagTimeMaxMs int64

	// HighWaterMarkCheckpointIntervalMs is how often to checkpoint the high
	// water mark (HW).
	HighWaterMarkCheckpointIntervalMs int64

	// NumFetcherThreads is the number of fetcher threads per follower.
	NumFetcherThreads int

	// MaxInflightFetches is the maximum number of concurrent fetch requests
	// per follower.
	MaxInflightFetches int

	// FetchBackoffMs is the backoff time after a fetch error.
	FetchBackoffMs int64

	// ReplicaIDForFollower is the replica ID when running as follower.
	ReplicaIDForFollower ReplicaID
}

Config holds replication configuration

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns sensible defaults for replication

func (*Config) Clone

func (c *Config) Clone() *Config

Clone returns a deep copy of the config

func (*Config) Validate

func (c *Config) Validate() error

Validate checks if the configuration is valid

type ErrorCode

// ErrorCode represents replication-specific errors as an integer code.
// Its String method returns the string representation of a code.
type ErrorCode int

ErrorCode represents replication-specific errors

// Replication error codes. Values are assigned by iota in declaration order,
// so ErrorNone is 0 (the zero value of ErrorCode) and inserting or reordering
// entries changes the numeric value of every code after that point.
const (
	ErrorNone ErrorCode = iota
	ErrorNotLeader
	ErrorOffsetOutOfRange
	ErrorStaleEpoch
	ErrorPartitionNotFound
	ErrorReplicaNotInReplicas
	ErrorUnknown
)

func (ErrorCode) String

func (e ErrorCode) String() string

String returns the string representation of ErrorCode

type FailoverCoordinator

// FailoverCoordinator manages automatic leader failover for partitions.
// It exposes no exported fields; create one with NewFailoverCoordinator.
type FailoverCoordinator struct {
	// contains filtered or unexported fields
}

FailoverCoordinator manages automatic leader failover for partitions

func NewFailoverCoordinator

func NewFailoverCoordinator(
	config *Config,
	brokerID ReplicaID,
	metadataClient MetadataClient,
	heartbeatClient HeartbeatClient,
) *FailoverCoordinator

NewFailoverCoordinator creates a new failover coordinator

func (*FailoverCoordinator) GetMetrics

func (fc *FailoverCoordinator) GetMetrics() FailoverMetrics

GetMetrics returns failover metrics

func (*FailoverCoordinator) RegisterPartition

func (fc *FailoverCoordinator) RegisterPartition(topic string, partitionID int) error

RegisterPartition registers a partition for failover monitoring

func (*FailoverCoordinator) Start

func (fc *FailoverCoordinator) Start() error

Start starts the failover coordinator

func (*FailoverCoordinator) Stop

func (fc *FailoverCoordinator) Stop() error

Stop stops the failover coordinator

type FailoverMetrics

// FailoverMetrics tracks failover statistics. It is the value returned by
// FailoverCoordinator.GetMetrics.
//
// The average failover time is a time.Duration here, whereas
// GlobalMetricsSnapshot carries it as an int64 named AvgFailoverTimeMs.
// NOTE(review): presumably TotalFailovers counts both successful and failed
// failovers; confirm against the coordinator implementation.
type FailoverMetrics struct {
	TotalFailovers      int64
	SuccessfulFailovers int64
	FailedFailovers     int64
	AverageFailoverTime time.Duration
	LastFailoverTime    time.Time
}

FailoverMetrics tracks failover statistics

type FailureDetector

// FailureDetector detects broker failures. Heartbeats are fed in through
// RecordHeartbeat and the verdict for a broker is read back with HasFailed.
// It exposes no exported fields.
type FailureDetector struct {
	// contains filtered or unexported fields
}

FailureDetector detects broker failures

func (*FailureDetector) HasFailed

func (fd *FailureDetector) HasFailed(brokerID ReplicaID) bool

HasFailed checks if a broker has failed

func (*FailureDetector) RecordHeartbeat

func (fd *FailureDetector) RecordHeartbeat(brokerID ReplicaID)

RecordHeartbeat records a successful heartbeat from a broker

type FetchRequest

// FetchRequest is sent by a follower to fetch messages from the leader.
// It is the argument of LeaderClient.Fetch and of the HandleFetchRequest
// methods on LeaderReplicator and ReplicationManager.
type FetchRequest struct {
	// ReplicaID is the ID of the follower making the request.
	ReplicaID ReplicaID

	// Topic is the topic to fetch from.
	Topic string

	// PartitionID is the partition to fetch from.
	PartitionID int

	// FetchOffset is the offset to start fetching from.
	FetchOffset Offset

	// MaxBytes is the maximum bytes to return.
	MaxBytes int

	// LeaderEpoch is the epoch the follower believes the leader to be in.
	// It is used to detect stale leaders.
	LeaderEpoch int64

	// MinBytes is the minimum bytes to return (for long-polling).
	MinBytes int

	// MaxWaitMs is the maximum time to wait for MinBytes.
	MaxWaitMs int
}

FetchRequest is sent by a follower to fetch messages from the leader

type FetchResponse

// FetchResponse is the leader's response to a FetchRequest.
//
// A failure can be reported in two fields, Error and ErrorCode.
// NOTE(review): how the two relate (for example whether Error is nil exactly
// when ErrorCode is ErrorNone) is not visible from this declaration; confirm
// against HandleFetchRequest.
type FetchResponse struct {
	// Topic is the topic being fetched.
	Topic string

	// PartitionID is the partition being fetched.
	PartitionID int

	// LeaderEpoch is the epoch of the current leader.
	LeaderEpoch int64

	// HighWaterMark is the high water mark of the partition.
	HighWaterMark Offset

	// LogEndOffset is the log end offset (LEO) of the leader.
	LogEndOffset Offset

	// Messages holds the messages fetched.
	Messages []*Message

	// Error indicates any error during fetch.
	Error error

	// ErrorCode identifies the specific error type.
	ErrorCode ErrorCode
}

FetchResponse is the leader's response to a FetchRequest

type Fetcher

// Fetcher coordinates replication for multiple partitions on a follower
// broker. It exposes no exported fields; create one with NewFetcher and
// manage its partitions with AddPartition, RemovePartition and UpdateLeader.
type Fetcher struct {
	// contains filtered or unexported fields
}

Fetcher coordinates replication for multiple partitions on a follower broker

func NewFetcher

func NewFetcher(config *Config, replicaID ReplicaID, storage StorageWriter, leaderClient LeaderClient) *Fetcher

NewFetcher creates a new fetcher coordinator

func (*Fetcher) AddPartition

func (f *Fetcher) AddPartition(
	topic string,
	partitionID int,
	leaderID ReplicaID,
	leaderEpoch int64,
) error

AddPartition adds a partition to replicate

func (*Fetcher) GetAllMetrics

func (f *Fetcher) GetAllMetrics() map[string]ReplicationMetrics

GetAllMetrics returns metrics for all partitions

func (*Fetcher) GetPartitionCount

func (f *Fetcher) GetPartitionCount() int

GetPartitionCount returns the number of partitions being replicated

func (*Fetcher) GetReplicationLag

func (f *Fetcher) GetReplicationLag(topic string, partitionID int) (int64, error)

GetReplicationLag returns the replication lag for a partition

func (*Fetcher) RemovePartition

func (f *Fetcher) RemovePartition(topic string, partitionID int) error

RemovePartition stops replicating a partition

func (*Fetcher) Start

func (f *Fetcher) Start() error

Start starts the fetcher

func (*Fetcher) Stop

func (f *Fetcher) Stop() error

Stop stops all replicators

func (*Fetcher) UpdateLeader

func (f *Fetcher) UpdateLeader(
	topic string,
	partitionID int,
	leaderID ReplicaID,
	leaderEpoch int64,
) error

UpdateLeader updates the leader for a partition

type FollowerReplicator

// FollowerReplicator manages replication on the follower side.
// It exposes no exported fields; create one with NewFollowerReplicator.
type FollowerReplicator struct {
	// contains filtered or unexported fields
}

FollowerReplicator manages replication on the follower side

func NewFollowerReplicator

func NewFollowerReplicator(
	config *Config,
	topic string,
	partitionID int,
	replicaID ReplicaID,
	leaderID ReplicaID,
	leaderEpoch int64,
	storage StorageWriter,
	leaderClient LeaderClient,
) *FollowerReplicator

NewFollowerReplicator creates a new follower replicator

func (*FollowerReplicator) GetFetchLag

func (fr *FollowerReplicator) GetFetchLag() int64

GetFetchLag returns the current lag behind leader

func (*FollowerReplicator) GetHighWaterMark

func (fr *FollowerReplicator) GetHighWaterMark() Offset

GetHighWaterMark returns the current HW

func (*FollowerReplicator) GetLogEndOffset

func (fr *FollowerReplicator) GetLogEndOffset() Offset

GetLogEndOffset returns the current LEO

func (*FollowerReplicator) GetMetrics

func (fr *FollowerReplicator) GetMetrics() ReplicationMetrics

GetMetrics returns replication metrics

func (*FollowerReplicator) Start

func (fr *FollowerReplicator) Start() error

Start starts the fetch loop

func (*FollowerReplicator) Stop

func (fr *FollowerReplicator) Stop() error

Stop stops the replicator

func (*FollowerReplicator) UpdateLeader

func (fr *FollowerReplicator) UpdateLeader(leaderID ReplicaID, leaderEpoch int64) error

UpdateLeader updates the leader information

type GlobalMetricsSnapshot

// GlobalMetricsSnapshot is a point-in-time snapshot of global metrics.
//
// It has the same field names as GlobalReplicationMetrics, with each
// atomic.Int32/atomic.Int64 replaced by a plain int32/int64, and is the type
// returned by MetricsCollector.GetGlobalSnapshot.
type GlobalMetricsSnapshot struct {
	// Partition counts.
	TotalPartitions    int32
	LeaderPartitions   int32
	FollowerPartitions int32

	// Aggregate replication lag, in messages.
	MaxReplicationLagMessages int64
	AvgReplicationLagMessages int64

	// Aggregate fetch metrics.
	TotalFetchRequests int64
	TotalFetchBytes    int64
	TotalFetchErrors   int64

	// ISR health.
	TotalISRShrinks           int64
	TotalISRExpands           int64
	PartitionsUnderReplicated int32

	// Failover metrics.
	TotalFailovers    int64
	FailedFailovers   int64
	AvgFailoverTimeMs int64
}

GlobalMetricsSnapshot is a point-in-time snapshot of global metrics

type GlobalReplicationMetrics

// GlobalReplicationMetrics tracks cluster-wide replication metrics.
//
// Every field is a sync/atomic typed value, so a GlobalReplicationMetrics
// should be shared by pointer (MetricsCollector.GetGlobalMetrics returns
// *GlobalReplicationMetrics) rather than copied. GlobalMetricsSnapshot is the
// plain-value counterpart with the same field names.
type GlobalReplicationMetrics struct {
	// Total partition count
	TotalPartitions    atomic.Int32
	LeaderPartitions   atomic.Int32
	FollowerPartitions atomic.Int32

	// Aggregate replication lag
	MaxReplicationLagMessages atomic.Int64
	AvgReplicationLagMessages atomic.Int64

	// Aggregate fetch metrics
	TotalFetchRequests atomic.Int64
	TotalFetchBytes    atomic.Int64
	TotalFetchErrors   atomic.Int64

	// ISR health
	TotalISRShrinks           atomic.Int64
	TotalISRExpands           atomic.Int64
	PartitionsUnderReplicated atomic.Int32 // Partitions with ISR < replication factor

	// Failover metrics
	TotalFailovers    atomic.Int64
	FailedFailovers   atomic.Int64
	AvgFailoverTimeMs atomic.Int64
}

GlobalReplicationMetrics tracks cluster-wide replication metrics

type HeartbeatClient

// HeartbeatClient sends heartbeats to brokers. An implementation is passed
// to NewFailoverCoordinator.
type HeartbeatClient interface {
	// SendHeartbeat sends a heartbeat to the broker identified by brokerID
	// and returns an error if it could not be sent.
	SendHeartbeat(ctx context.Context, brokerID ReplicaID) error
}

HeartbeatClient sends heartbeats to brokers

type ISRChangeNotification

// ISRChangeNotification represents a change in ISR membership for one
// partition. Pointers to it are delivered on the channel returned by
// LeaderReplicator.ISRChangeChannel.
type ISRChangeNotification struct {
	Topic       string      // topic of the affected partition
	PartitionID int         // partition within Topic
	OldISR      []ReplicaID // ISR before the change
	NewISR      []ReplicaID // ISR after the change
	Reason      string      // free-form reason; format not specified here
	Timestamp   time.Time   // NOTE(review): presumably when the change occurred; confirm
}

ISRChangeNotification represents a change in ISR membership

type LeaderClient

// LeaderClient defines the interface for communicating with the leader.
// An implementation is passed to NewFetcher, NewFollowerReplicator and
// NewReplicationManager.
type LeaderClient interface {
	// Fetch sends a fetch request to the leader and returns its response.
	Fetch(ctx context.Context, req *FetchRequest) (*FetchResponse, error)

	// GetLeaderEndpoint returns the leader's network endpoint for the given
	// topic and partition.
	GetLeaderEndpoint(topic string, partitionID int) (string, error)
}

LeaderClient defines the interface for communicating with the leader

type LeaderReplicator

// LeaderReplicator manages replication on the leader side.
// It exposes no exported fields; create one with NewLeaderReplicator.
type LeaderReplicator struct {
	// contains filtered or unexported fields
}

LeaderReplicator manages replication on the leader side

func NewLeaderReplicator

func NewLeaderReplicator(
	config *Config,
	topic string,
	partitionID int,
	leaderEpoch int64,
	replicaID ReplicaID,
	replicas []ReplicaID,
	storage StorageReader,
) *LeaderReplicator

NewLeaderReplicator creates a new leader replicator

func (*LeaderReplicator) GetHighWaterMark

func (lr *LeaderReplicator) GetHighWaterMark() Offset

GetHighWaterMark returns the current high water mark

func (*LeaderReplicator) GetISR

func (lr *LeaderReplicator) GetISR() []ReplicaID

GetISR returns the current ISR

func (*LeaderReplicator) GetMetrics

func (lr *LeaderReplicator) GetMetrics() ReplicationMetrics

GetMetrics returns replication metrics

func (*LeaderReplicator) GetPartitionState

func (lr *LeaderReplicator) GetPartitionState() *PartitionReplicationState

GetPartitionState returns the full partition replication state

func (*LeaderReplicator) HandleFetchRequest

func (lr *LeaderReplicator) HandleFetchRequest(req *FetchRequest) (*FetchResponse, error)

HandleFetchRequest handles a fetch request from a follower

func (*LeaderReplicator) ISRChangeChannel

func (lr *LeaderReplicator) ISRChangeChannel() <-chan *ISRChangeNotification

ISRChangeChannel returns the channel for ISR change notifications

func (*LeaderReplicator) Start

func (lr *LeaderReplicator) Start() error

Start starts background tasks

func (*LeaderReplicator) Stop

func (lr *LeaderReplicator) Stop() error

Stop stops the replicator

func (*LeaderReplicator) UpdateLogEndOffset

func (lr *LeaderReplicator) UpdateLogEndOffset(newLEO Offset)

UpdateLogEndOffset updates the leader's LEO (called after appending)

type LogReconciliation

// LogReconciliation handles log reconciliation after leader changes.
// When a new leader is elected, it must ensure all followers have consistent
// logs. It exposes no exported fields; create one with NewLogReconciliation.
type LogReconciliation struct {
	// contains filtered or unexported fields
}

LogReconciliation handles log reconciliation after leader changes. When a new leader is elected, it must ensure all followers have consistent logs.

func NewLogReconciliation

func NewLogReconciliation(
	topic string,
	partitionID int,
	replicaID ReplicaID,
	leaderEpoch int64,
	storage StorageAdapter,
) *LogReconciliation

NewLogReconciliation creates a new log reconciliation handler

func (*LogReconciliation) GetDivergencePoint

func (lr *LogReconciliation) GetDivergencePoint(
	ctx context.Context,
	localLEO Offset,
	leaderLEO Offset,
) (Offset, error)

GetDivergencePoint finds the offset where two logs diverge. It returns the offset of the first diverging entry, or -1 if the logs are consistent.

func (*LogReconciliation) ReconcileAsFollower

func (lr *LogReconciliation) ReconcileAsFollower(ctx context.Context, leaderEpoch int64) error

ReconcileAsFollower performs reconciliation when becoming follower. This ensures the log is consistent with the new leader's log.

func (*LogReconciliation) ReconcileAsNewLeader

func (lr *LogReconciliation) ReconcileAsNewLeader(ctx context.Context) error

ReconcileAsNewLeader performs reconciliation when becoming leader. This truncates any uncommitted entries from the previous epoch.

func (*LogReconciliation) ResetToOffset

func (lr *LogReconciliation) ResetToOffset(ctx context.Context, offset Offset) error

ResetToOffset resets the log to a specific offset. All entries at or after this offset are removed.

func (*LogReconciliation) ValidateConsistency

func (lr *LogReconciliation) ValidateConsistency(ctx context.Context) error

ValidateConsistency checks if the log is consistent. This can be called periodically or after operations.

type Message

// Message represents a single message in the replication protocol.
// FetchResponse carries fetched messages as a slice of *Message.
type Message struct {
	// Offset is the offset of the message in the log.
	Offset Offset

	// Key is the optional message key.
	Key []byte

	// Value is the message payload.
	Value []byte

	// Timestamp is the message timestamp in Unix nanoseconds.
	Timestamp int64

	// Headers holds optional metadata, keyed by header name.
	Headers map[string][]byte
}

Message represents a single message in the replication protocol

type MetadataClient

// MetadataClient defines the metadata operations needed for failover.
// An implementation is passed to NewFailoverCoordinator. The two update
// methods take a context; the read methods do not.
type MetadataClient interface {
	// GetPartitionLeader returns the current leader and leader epoch.
	GetPartitionLeader(topic string, partitionID int) (ReplicaID, int64, error)

	// GetPartitionISR returns the current ISR.
	GetPartitionISR(topic string, partitionID int) ([]ReplicaID, error)

	// UpdatePartitionLeader updates the leader and epoch via Raft consensus.
	UpdatePartitionLeader(ctx context.Context, topic string, partitionID int, newLeader ReplicaID, newEpoch int64) error

	// UpdatePartitionISR updates the ISR via Raft consensus.
	UpdatePartitionISR(ctx context.Context, topic string, partitionID int, newISR []ReplicaID) error

	// GetPartitionReplicas returns all replicas for the partition.
	GetPartitionReplicas(topic string, partitionID int) ([]ReplicaID, error)
}

MetadataClient defines operations needed for failover

type MetricsCollector

// MetricsCollector collects and tracks replication metrics, both per
// partition (PartitionMetrics) and globally (GlobalReplicationMetrics).
// It exposes no exported fields; create one with NewMetricsCollector.
type MetricsCollector struct {
	// contains filtered or unexported fields
}

MetricsCollector collects and tracks replication metrics

func NewMetricsCollector

func NewMetricsCollector() *MetricsCollector

NewMetricsCollector creates a new metrics collector

func (*MetricsCollector) ComputeGlobalMetrics

func (mc *MetricsCollector) ComputeGlobalMetrics()

ComputeGlobalMetrics computes global metrics from partition metrics

func (*MetricsCollector) GetAllPartitionMetrics

func (mc *MetricsCollector) GetAllPartitionMetrics() map[string]*PartitionMetrics

GetAllPartitionMetrics returns metrics for all partitions

func (*MetricsCollector) GetGlobalMetrics

func (mc *MetricsCollector) GetGlobalMetrics() *GlobalReplicationMetrics

GetGlobalMetrics returns global replication metrics

func (*MetricsCollector) GetGlobalSnapshot

func (mc *MetricsCollector) GetGlobalSnapshot() GlobalMetricsSnapshot

GetGlobalSnapshot returns a snapshot of global metrics

func (*MetricsCollector) GetPartitionMetrics

func (mc *MetricsCollector) GetPartitionMetrics(topic string, partitionID int) *PartitionMetrics

GetPartitionMetrics returns metrics for a specific partition

func (*MetricsCollector) RegisterPartition

func (mc *MetricsCollector) RegisterPartition(topic string, partitionID int) *PartitionMetrics

RegisterPartition registers a partition for metrics tracking

func (*MetricsCollector) UnregisterPartition

func (mc *MetricsCollector) UnregisterPartition(topic string, partitionID int)

UnregisterPartition removes a partition from metrics tracking

type Offset

// Offset represents a message offset in a partition log. It is a signed
// 64-bit integer, which lets LogReconciliation.GetDivergencePoint use -1 to
// report that logs are consistent.
type Offset int64

Offset represents a message offset in a partition log

type PartitionFailoverState

// PartitionFailoverState tracks failover state for a partition.
// It exposes no exported fields.
type PartitionFailoverState struct {
	// contains filtered or unexported fields
}

PartitionFailoverState tracks failover state for a partition

type PartitionMetrics

// PartitionMetrics tracks metrics for a single partition.
//
// Topic and PartitionID are plain fields; every other field is a sync/atomic
// typed value. Because of those atomic fields a PartitionMetrics should be
// shared by pointer rather than copied (MetricsCollector hands out
// *PartitionMetrics). GetSnapshot returns the plain-value form,
// PartitionMetricsSnapshot.
type PartitionMetrics struct {
	// Partition identification
	Topic       string
	PartitionID int

	// Offset tracking
	LogEndOffset  atomic.Int64
	HighWaterMark atomic.Int64

	// Replication lag (for followers)
	ReplicationLagMessages atomic.Int64 // Messages behind leader
	ReplicationLagMs       atomic.Int64 // Time lag in milliseconds

	// Fetch metrics (for followers)
	FetchRequestCount  atomic.Int64
	FetchBytesTotal    atomic.Int64
	FetchErrorCount    atomic.Int64
	LastFetchLatencyMs atomic.Int64
	LastFetchTime      atomic.Int64 // Unix timestamp in ms

	// ISR metrics (for leaders)
	ISRSize        atomic.Int32
	ISRShrinkCount atomic.Int64
	ISRExpandCount atomic.Int64
	LastISRChange  atomic.Int64 // Unix timestamp in ms

	// Failover metrics
	FailoverCount    atomic.Int64
	LastFailoverTime atomic.Int64 // Unix timestamp in ms
	LeaderEpoch      atomic.Int64
}

PartitionMetrics tracks metrics for a single partition

func (*PartitionMetrics) GetSnapshot

func (pm *PartitionMetrics) GetSnapshot() PartitionMetricsSnapshot

GetSnapshot returns a snapshot of partition metrics

func (*PartitionMetrics) RecordFailover

func (pm *PartitionMetrics) RecordFailover(newEpoch int64)

RecordFailover records a leader failover

func (*PartitionMetrics) RecordFetch

func (pm *PartitionMetrics) RecordFetch(bytesRead int, latencyMs int64, err error)

RecordFetch records a fetch operation

func (*PartitionMetrics) RecordISRChange

func (pm *PartitionMetrics) RecordISRChange(newSize int, isShrink bool)

RecordISRChange records an ISR membership change

func (*PartitionMetrics) UpdateFromReplicationMetrics

func (pm *PartitionMetrics) UpdateFromReplicationMetrics(rm ReplicationMetrics)

UpdateFromReplicationMetrics updates metrics from ReplicationMetrics

func (*PartitionMetrics) UpdateOffsets

func (pm *PartitionMetrics) UpdateOffsets(leo, hw Offset)

UpdateOffsets updates LEO and HW

type PartitionMetricsSnapshot

// PartitionMetricsSnapshot is a point-in-time snapshot of partition metrics,
// as returned by PartitionMetrics.GetSnapshot.
//
// It has the same field names as PartitionMetrics with plain value types.
// The three timestamp fields (LastFetchTime, LastISRChange and
// LastFailoverTime) are time.Time here, whereas PartitionMetrics stores them
// as Unix milliseconds in atomic.Int64 fields.
type PartitionMetricsSnapshot struct {
	// Partition identification.
	Topic       string
	PartitionID int

	// Offset tracking.
	LogEndOffset  int64
	HighWaterMark int64

	// Replication lag.
	ReplicationLagMessages int64
	ReplicationLagMs       int64

	// Fetch metrics.
	FetchRequestCount  int64
	FetchBytesTotal    int64
	FetchErrorCount    int64
	LastFetchLatencyMs int64
	LastFetchTime      time.Time

	// ISR metrics.
	ISRSize        int32
	ISRShrinkCount int64
	ISRExpandCount int64
	LastISRChange  time.Time

	// Failover metrics.
	FailoverCount    int64
	LastFailoverTime time.Time
	LeaderEpoch      int64
}

PartitionMetricsSnapshot is a point-in-time snapshot of partition metrics

type PartitionReplicationState

// PartitionReplicationState tracks replication state for an entire
// partition. LeaderReplicator.GetPartitionState returns a pointer to one.
type PartitionReplicationState struct {
	// Topic is the topic name.
	Topic string

	// PartitionID is the partition within the topic.
	PartitionID int

	// Leader is the leader broker ID.
	Leader ReplicaID

	// LeaderEpoch increments on each leader change to prevent stale reads.
	LeaderEpoch int64

	// Replicas is the full list of replicas (leader + followers).
	Replicas []ReplicaID

	// ISR (In-Sync Replicas) are the replicas within acceptable lag limits.
	ISR []ReplicaID

	// ReplicaStates maps each replica to its replication state.
	ReplicaStates map[ReplicaID]*ReplicationState

	// HighWaterMark is the partition-wide HW (minimum LEO of all ISR members).
	HighWaterMark Offset

	// LogEndOffset is the leader's LEO.
	LogEndOffset Offset
}

PartitionReplicationState tracks replication state for an entire partition

type PartitionRole

// PartitionRole represents whether this replica is leader or follower.
type PartitionRole int

PartitionRole represents whether this replica is leader or follower

// Partition roles. Values are assigned by iota, so RoleFollower is 0 and is
// therefore the zero value of PartitionRole; RoleLeader is 1.
const (
	RoleFollower PartitionRole = iota
	RoleLeader
)

type ReplicaID

// ReplicaID represents a unique identifier for a broker replica.
// It is an unsigned 64-bit integer and is also used wherever this package
// refers to a broker ID (for example in FailureDetector and HeartbeatClient).
type ReplicaID uint64

ReplicaID represents a unique identifier for a broker replica

type ReplicationManager

// ReplicationManager coordinates replication for a partition.
// It manages both leader and follower roles; BecomeLeader and BecomeFollower
// switch between them. It exposes no exported fields; create one with
// NewReplicationManager.
type ReplicationManager struct {
	// contains filtered or unexported fields
}

ReplicationManager coordinates replication for a partition. It manages both leader and follower roles.

func NewReplicationManager

func NewReplicationManager(
	config *Config,
	topic string,
	partitionID int,
	replicaID ReplicaID,
	storage StorageAdapter,
	leaderClient LeaderClient,
) *ReplicationManager

NewReplicationManager creates a new replication manager

func (*ReplicationManager) BecomeFollower

func (rm *ReplicationManager) BecomeFollower(
	leaderID ReplicaID,
	leaderEpoch int64,
) error

BecomeFollower transitions this replica to follower role

func (*ReplicationManager) BecomeLeader

func (rm *ReplicationManager) BecomeLeader(
	leaderEpoch int64,
	replicas []ReplicaID,
) error

BecomeLeader transitions this replica to leader role

func (*ReplicationManager) GetHighWaterMark

func (rm *ReplicationManager) GetHighWaterMark() (Offset, error)

GetHighWaterMark returns the current high water mark

func (*ReplicationManager) GetISR

func (rm *ReplicationManager) GetISR() ([]ReplicaID, error)

GetISR returns the current ISR (only valid for leader)

func (*ReplicationManager) GetMetrics

func (rm *ReplicationManager) GetMetrics() ReplicationMetrics

GetMetrics returns replication metrics

func (*ReplicationManager) HandleFetchRequest

func (rm *ReplicationManager) HandleFetchRequest(req *FetchRequest) (*FetchResponse, error)

HandleFetchRequest handles a fetch request (only valid for leader)

func (*ReplicationManager) IsLeader

func (rm *ReplicationManager) IsLeader() bool

IsLeader returns true if this replica is the leader

func (*ReplicationManager) NotifyReplication

func (rm *ReplicationManager) NotifyReplication()

NotifyReplication notifies waiters that replication has progressed. It is called periodically or when the HW advances.

func (*ReplicationManager) Start

func (rm *ReplicationManager) Start() error

Start starts the replication manager

func (*ReplicationManager) Stop

func (rm *ReplicationManager) Stop() error

Stop stops the replication manager

func (*ReplicationManager) UpdateLogEndOffset

func (rm *ReplicationManager) UpdateLogEndOffset(newLEO Offset) error

UpdateLogEndOffset updates the leader's LEO after appending

func (*ReplicationManager) WaitForISR

func (rm *ReplicationManager) WaitForISR(ctx context.Context, offset Offset, timeoutMs int32) error

WaitForISR waits for an offset to be replicated to all ISR members. It is used for acks=all produce requests.

type ReplicationMetrics

// ReplicationMetrics tracks replication performance metrics.
// It is a plain value type returned by the GetMetrics methods of
// FollowerReplicator, LeaderReplicator and ReplicationManager.
// Fetch rates are per second, while ISR shrink/expand rates are per minute.
type ReplicationMetrics struct {
	// FetchRequestRate is the number of fetches per second.
	FetchRequestRate float64

	// FetchBytesRate is the number of bytes fetched per second.
	FetchBytesRate float64

	// ReplicationLagMessages is the number of messages behind the leader.
	ReplicationLagMessages int64

	// ReplicationLagMs is the time lag in milliseconds.
	ReplicationLagMs int64

	// LastFetchLatencyMs is the latency of the last fetch in milliseconds.
	LastFetchLatencyMs int64

	// ISRShrinkRate is the number of ISR shrinks per minute.
	ISRShrinkRate float64

	// ISRExpandRate is the number of ISR expands per minute.
	ISRExpandRate float64
}

ReplicationMetrics tracks replication performance metrics

type ReplicationState

// ReplicationState tracks the replication state of a follower replica.
// PartitionReplicationState.ReplicaStates holds one per replica.
type ReplicationState struct {
	// ReplicaID is the broker ID of this replica.
	ReplicaID ReplicaID

	// LogEndOffset (LEO) is the next offset to be written to the replica's
	// log.
	// NOTE(review): the earlier wording also called this "the offset of the
	// last message", which differs by one. "Next offset to be written"
	// matches StorageReader.GetLogEndOffset; confirm which is intended.
	LogEndOffset Offset

	// HighWaterMark (HW) is the offset up to which messages are guaranteed
	// to be replicated to all ISR members. Only messages below HW are visible
	// to consumers.
	HighWaterMark Offset

	// LastFetchTime is the timestamp of the last successful fetch from the
	// leader.
	LastFetchTime time.Time

	// LastCaughtUpTime is the timestamp when this replica was last fully
	// caught up.
	LastCaughtUpTime time.Time

	// FetchLag is the number of messages this replica is behind the leader.
	FetchLag int64

	// InSync indicates if this replica is in the ISR.
	InSync bool
}

ReplicationState tracks the replication state of a follower replica

type StorageAdapter

// StorageAdapter combines the reader and writer interfaces.
//
// Both embedded interfaces declare GetLogEndOffset with an identical
// signature. Embedding interfaces with overlapping method sets like this is
// permitted as of Go 1.14; the method appears once in StorageAdapter.
type StorageAdapter interface {
	StorageReader
	StorageWriter
}

StorageAdapter combines reader and writer interfaces

type StorageMessage

// StorageMessage represents a message from storage. It has the same fields,
// with the same types, as Message; it is the element type used by
// StorageReader.ReadRange and StorageWriter.AppendMessages.
type StorageMessage struct {
	Offset    Offset            // offset of the message in the log
	Key       []byte            // message key
	Value     []byte            // message payload
	Timestamp int64             // NOTE(review): presumably Unix nanoseconds as in Message; confirm
	Headers   map[string][]byte // metadata, keyed by header name
}

StorageMessage represents a message from storage

type StorageReader

// StorageReader defines the storage operations needed by the replicator.
// Methods identify a partition by number only; no topic is passed.
type StorageReader interface {
	// ReadRange reads messages between the start and end offsets.
	// NOTE(review): whether endOffset is inclusive is not stated here;
	// confirm against the implementation.
	ReadRange(ctx context.Context, partition int, startOffset, endOffset Offset) ([]*StorageMessage, error)

	// GetLogEndOffset returns the next offset to be written.
	GetLogEndOffset(partition int) (Offset, error)

	// GetHighWaterMark returns the current high water mark.
	GetHighWaterMark(partition int) (Offset, error)
}

StorageReader defines the storage operations needed by the replicator

type StorageWriter

// StorageWriter defines the storage operations needed by the follower.
// Methods identify a partition by number only; no topic is passed.
type StorageWriter interface {
	// AppendMessages appends messages to the log.
	AppendMessages(ctx context.Context, partition int, messages []*StorageMessage) error

	// GetLogEndOffset returns the next offset to be written.
	GetLogEndOffset(partition int) (Offset, error)

	// SetHighWaterMark updates the high water mark.
	SetHighWaterMark(partition int, hw Offset) error

	// Truncate truncates the log to the given offset (for leader changes).
	// NOTE(review): whether the entry at offset itself is kept or removed is
	// not stated here; confirm against the implementation.
	Truncate(partition int, offset Offset) error
}

StorageWriter defines the storage operations needed by the follower

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL