Documentation
¶
Index ¶
- func EncodeOperation(op Operation) ([]byte, error)
- func PartitionID(topic string, partition int) string
- type BatchCreatePartitionsOp
- type BrokerInfo
- type BrokerResources
- type BrokerStatus
- type ClusterMetadata
- type ClusterMetadataStore
- func (cms *ClusterMetadataStore) DeleteBroker(ctx context.Context, brokerID int32) error
- func (cms *ClusterMetadataStore) GetBrokerMetadata(ctx context.Context, brokerID int32) (*cluster.BrokerMetadata, error)
- func (cms *ClusterMetadataStore) GetPartitionAssignment(ctx context.Context, topic string) (*cluster.Assignment, error)
- func (cms *ClusterMetadataStore) ListBrokers(ctx context.Context) ([]*cluster.BrokerMetadata, error)
- func (cms *ClusterMetadataStore) StoreBrokerMetadata(ctx context.Context, broker *cluster.BrokerMetadata) error
- func (cms *ClusterMetadataStore) StorePartitionAssignment(ctx context.Context, assignment *cluster.Assignment) error
- type ConsensusNode
- type CreatePartitionOp
- type CreateTopicOp
- type DeleteTopicOp
- type FSM
- func (f *FSM) Apply(data []byte) error
- func (f *FSM) GetBroker(id uint64) (*BrokerInfo, bool)
- func (f *FSM) GetPartition(topic string, partition int) (*PartitionInfo, bool)
- func (f *FSM) GetState() *ClusterMetadata
- func (f *FSM) GetTopic(name string) (*TopicInfo, bool)
- func (f *FSM) ListBrokers() []*BrokerInfo
- func (f *FSM) ListPartitions(topic string) []*PartitionInfo
- func (f *FSM) ListTopics() []*TopicInfo
- func (f *FSM) Restore(snapshot []byte) error
- func (f *FSM) SetLogger(l Logger)
- func (f *FSM) Snapshot() ([]byte, error)
- type Logger
- type NoOpLogger
- type Operation
- type OperationType
- type PartitionInfo
- type RegisterBrokerOp
- type Store
- func (s *Store) AllocatePartitions(topic string, numPartitions, replicationFactor int) ([]*PartitionInfo, error)
- func (s *Store) BatchCreatePartitions(ctx context.Context, partitions []*PartitionInfo) error
- func (s *Store) CreatePartition(ctx context.Context, partition *PartitionInfo) error
- func (s *Store) CreateTopic(ctx context.Context, name string, numPartitions, replicationFactor int, ...) error
- func (s *Store) DeleteTopic(ctx context.Context, name string) error
- func (s *Store) GetBroker(id uint64) (*BrokerInfo, bool)
- func (s *Store) GetFSM() *FSM
- func (s *Store) GetPartition(topic string, partition int) (*PartitionInfo, bool)
- func (s *Store) GetState() *ClusterMetadata
- func (s *Store) GetTopic(name string) (*TopicInfo, bool)
- func (s *Store) IsLeader() bool
- func (s *Store) Leader() uint64
- func (s *Store) ListBrokers() []*BrokerInfo
- func (s *Store) ListPartitions(topic string) []*PartitionInfo
- func (s *Store) ListTopics() []*TopicInfo
- func (s *Store) RegisterBroker(ctx context.Context, broker *BrokerInfo) error
- func (s *Store) UnregisterBroker(ctx context.Context, brokerID uint64) error
- func (s *Store) UpdateBroker(ctx context.Context, brokerID uint64, status BrokerStatus, ...) error
- func (s *Store) UpdateISR(ctx context.Context, topic string, partition int, isr []uint64) error
- func (s *Store) UpdateLeader(ctx context.Context, topic string, partition int, leader, leaderEpoch uint64) error
- func (s *Store) UpdatePartition(ctx context.Context, partition *PartitionInfo) error
- func (s *Store) UpdateReplicaList(ctx context.Context, topic string, partition int, replicas []uint64) error
- func (s *Store) UpdateTopic(ctx context.Context, name string, config TopicConfig) error
- type TopicConfig
- type TopicInfo
- type UnregisterBrokerOp
- type UpdateBrokerOp
- type UpdateISROp
- type UpdateLeaderOp
- type UpdatePartitionOp
- type UpdateReplicaListOp
- type UpdateTopicOp
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func EncodeOperation ¶
func EncodeOperation(op Operation) ([]byte, error)
EncodeOperation encodes an operation to bytes.
func PartitionID ¶
func PartitionID(topic string, partition int) string
PartitionID returns the partition identifier string.
Types ¶
type BatchCreatePartitionsOp ¶
type BatchCreatePartitionsOp struct {
Partitions []PartitionInfo
}
BatchCreatePartitionsOp is the data for OpBatchCreatePartitions. It carries multiple partitions so they can be created in a single Raft proposal, which is more efficient than proposing each partition separately and prevents leadership instability under rapid operations.
type BrokerInfo ¶
type BrokerInfo struct {
// ID is the unique broker identifier
ID uint64
// Addr is the network address (host:port)
Addr string
// Status is the current broker status
Status BrokerStatus
// Resources contains resource information (CPU, disk, memory, network)
Resources BrokerResources
// Rack is the rack/availability zone (for rack-aware placement)
Rack string
// RegisteredAt is when the broker joined the cluster
RegisteredAt time.Time
// LastHeartbeat is the last time we received a heartbeat
LastHeartbeat time.Time
}
BrokerInfo represents metadata about a broker in the cluster.
func (*BrokerInfo) Clone ¶
func (bi *BrokerInfo) Clone() *BrokerInfo
Clone creates a deep copy of the broker info.
type BrokerResources ¶
type BrokerResources struct {
// DiskTotal is total disk space in bytes
DiskTotal uint64
// DiskUsed is used disk space in bytes
DiskUsed uint64
// DiskAvailable is available disk space in bytes
DiskAvailable uint64
// NetworkBandwidth is network bandwidth in bytes/sec
NetworkBandwidth uint64
// CPUCores is the number of CPU cores
CPUCores int
// MemoryTotal is total memory in bytes
MemoryTotal uint64
// MemoryUsed is used memory in bytes
MemoryUsed uint64
}
BrokerResources represents resource information for a broker.
type BrokerStatus ¶
type BrokerStatus string
BrokerStatus represents the status of a broker.
const (
BrokerStatusAlive BrokerStatus = "alive"
BrokerStatusDead BrokerStatus = "dead"
BrokerStatusDraining BrokerStatus = "draining"
BrokerStatusUnreachable BrokerStatus = "unreachable"
)
type ClusterMetadata ¶
type ClusterMetadata struct {
// Brokers maps broker ID to broker metadata
Brokers map[uint64]*BrokerInfo
// Topics maps topic name to topic metadata
Topics map[string]*TopicInfo
// Partitions maps partition ID to partition metadata
// Key format: "topic:partition" (e.g., "events:0")
Partitions map[string]*PartitionInfo
// Version is incremented on every change (for optimistic locking)
Version uint64
// LastModified is the timestamp of the last modification
LastModified time.Time
}
ClusterMetadata represents all metadata for the StreamBus cluster. This is the root state that is replicated via Raft consensus.
func NewClusterMetadata ¶
func NewClusterMetadata() *ClusterMetadata
NewClusterMetadata creates a new empty cluster metadata.
func (*ClusterMetadata) Clone ¶
func (cm *ClusterMetadata) Clone() *ClusterMetadata
Clone creates a deep copy of the cluster metadata.
type ClusterMetadataStore ¶
type ClusterMetadataStore struct {
// contains filtered or unexported fields
}
ClusterMetadataStore is an adapter that implements cluster.MetadataStore using the Raft-backed metadata.Store.
func NewClusterMetadataStore ¶
func NewClusterMetadataStore(store *Store) *ClusterMetadataStore
NewClusterMetadataStore creates a new adapter for cluster metadata operations.
func (*ClusterMetadataStore) DeleteBroker ¶
func (cms *ClusterMetadataStore) DeleteBroker(ctx context.Context, brokerID int32) error
DeleteBroker removes broker metadata.
func (*ClusterMetadataStore) GetBrokerMetadata ¶
func (cms *ClusterMetadataStore) GetBrokerMetadata(ctx context.Context, brokerID int32) (*cluster.BrokerMetadata, error)
GetBrokerMetadata retrieves broker metadata.
func (*ClusterMetadataStore) GetPartitionAssignment ¶
func (cms *ClusterMetadataStore) GetPartitionAssignment(ctx context.Context, topic string) (*cluster.Assignment, error)
GetPartitionAssignment retrieves the partition assignment for a topic.
func (*ClusterMetadataStore) ListBrokers ¶
func (cms *ClusterMetadataStore) ListBrokers(ctx context.Context) ([]*cluster.BrokerMetadata, error)
ListBrokers lists all brokers.
func (*ClusterMetadataStore) StoreBrokerMetadata ¶
func (cms *ClusterMetadataStore) StoreBrokerMetadata(ctx context.Context, broker *cluster.BrokerMetadata) error
StoreBrokerMetadata stores or updates broker metadata via Raft consensus.
func (*ClusterMetadataStore) StorePartitionAssignment ¶
func (cms *ClusterMetadataStore) StorePartitionAssignment(ctx context.Context, assignment *cluster.Assignment) error
StorePartitionAssignment stores a partition assignment via Raft consensus.
type ConsensusNode ¶
type ConsensusNode interface {
Propose(ctx context.Context, data []byte) error
IsLeader() bool
Leader() uint64
}
ConsensusNode is the interface for proposing operations via consensus. This matches the consensus.Node interface.
type CreatePartitionOp ¶
type CreatePartitionOp struct {
Partition PartitionInfo
}
CreatePartitionOp is the data for OpCreatePartition.
type CreateTopicOp ¶
type CreateTopicOp struct {
Name string
NumPartitions int
ReplicationFactor int
Config TopicConfig
}
CreateTopicOp is the data for OpCreateTopic.
type DeleteTopicOp ¶
type DeleteTopicOp struct {
Name string
}
DeleteTopicOp is the data for OpDeleteTopic.
type FSM ¶
type FSM struct {
// contains filtered or unexported fields
}
FSM implements the Finite State Machine for cluster metadata. It applies operations to the metadata state and supports snapshots.
func (*FSM) Apply ¶
func (f *FSM) Apply(data []byte) error
Apply applies a committed log entry to the state machine. This method is called by the consensus layer when entries are committed.
func (*FSM) GetBroker ¶
func (f *FSM) GetBroker(id uint64) (*BrokerInfo, bool)
GetBroker returns a broker by ID.
func (*FSM) GetPartition ¶
func (f *FSM) GetPartition(topic string, partition int) (*PartitionInfo, bool)
GetPartition returns a partition by topic and partition number.
func (*FSM) GetState ¶
func (f *FSM) GetState() *ClusterMetadata
GetState returns a clone of the current state. This is safe to call concurrently with Apply.
func (*FSM) ListBrokers ¶
func (f *FSM) ListBrokers() []*BrokerInfo
ListBrokers returns all brokers.
func (*FSM) ListPartitions ¶
func (f *FSM) ListPartitions(topic string) []*PartitionInfo
ListPartitions returns all partitions for a topic.
type Logger ¶
type Logger interface {
Printf(format string, v ...interface{})
}
Logger defines the interface for FSM logging.
type NoOpLogger ¶
type NoOpLogger struct{}
NoOpLogger implements Logger but does nothing (for testing).
func (*NoOpLogger) Printf ¶
func (l *NoOpLogger) Printf(format string, v ...interface{})
type Operation ¶
type Operation struct {
// Type is the operation type
Type OperationType
// Data is the operation-specific data (JSON encoded)
Data []byte
// Timestamp is when the operation was created
Timestamp time.Time
}
Operation represents a metadata operation to be replicated via Raft.
func DecodeOperation ¶
DecodeOperation decodes an operation from bytes.
type OperationType ¶
type OperationType string
OperationType represents the type of metadata operation.
const (
// Broker operations
OpRegisterBroker OperationType = "register_broker"
OpUnregisterBroker OperationType = "unregister_broker"
OpUpdateBroker OperationType = "update_broker"
// Topic operations
OpCreateTopic OperationType = "create_topic"
OpDeleteTopic OperationType = "delete_topic"
OpUpdateTopic OperationType = "update_topic"
// Partition operations
OpCreatePartition OperationType = "create_partition"
OpUpdatePartition OperationType = "update_partition"
OpUpdateLeader OperationType = "update_leader"
OpUpdateISR OperationType = "update_isr"
OpAddReplica OperationType = "add_replica"
OpRemoveReplica OperationType = "remove_replica"
OpUpdateReplicaList OperationType = "update_replica_list"
// Batch operations
OpBatchCreatePartitions OperationType = "batch_create_partitions"
)
type PartitionInfo ¶
type PartitionInfo struct {
// Topic is the topic name
Topic string
// Partition is the partition number
Partition int
// Leader is the broker ID of the leader (0 if no leader)
Leader uint64
// Replicas is the list of all replica broker IDs
Replicas []uint64
// ISR is the list of in-sync replica broker IDs
ISR []uint64
// OfflineReplicas is the list of offline replica broker IDs
OfflineReplicas []uint64
// LeaderEpoch is incremented each time leadership changes
LeaderEpoch uint64
// CreatedAt is when the partition was created
CreatedAt time.Time
// ModifiedAt is when the partition was last modified
ModifiedAt time.Time
}
PartitionInfo represents metadata about a partition.
func (*PartitionInfo) Clone ¶
func (pi *PartitionInfo) Clone() *PartitionInfo
Clone creates a deep copy of the partition info.
func (*PartitionInfo) IsInISR ¶
func (pi *PartitionInfo) IsInISR(brokerID uint64) bool
IsInISR returns true if the given broker is in the ISR.
func (*PartitionInfo) IsLeader ¶
func (pi *PartitionInfo) IsLeader(brokerID uint64) bool
IsLeader returns true if the given broker is the leader.
func (*PartitionInfo) IsReplica ¶
func (pi *PartitionInfo) IsReplica(brokerID uint64) bool
IsReplica returns true if the given broker is a replica.
func (*PartitionInfo) PartitionID ¶
func (pi *PartitionInfo) PartitionID() string
PartitionID returns the partition identifier (topic:partition).
type RegisterBrokerOp ¶
type RegisterBrokerOp struct {
Broker BrokerInfo
}
RegisterBrokerOp is the data for OpRegisterBroker.
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store provides a high-level API for managing cluster metadata. It wraps the FSM and integrates with the consensus layer.
func NewStore ¶
func NewStore(fsm *FSM, consensus ConsensusNode) *Store
NewStore creates a new metadata store with the given FSM and consensus node.
func (*Store) AllocatePartitions ¶
func (s *Store) AllocatePartitions(topic string, numPartitions, replicationFactor int) ([]*PartitionInfo, error)
AllocatePartitions assigns partitions to brokers based on the allocation strategy. This is a helper method that creates partition assignments.
func (*Store) BatchCreatePartitions ¶
func (s *Store) BatchCreatePartitions(ctx context.Context, partitions []*PartitionInfo) error
BatchCreatePartitions creates multiple partitions atomically in a single Raft proposal. This is more efficient than calling CreatePartition multiple times and prevents leadership instability under rapid operations.
func (*Store) CreatePartition ¶
func (s *Store) CreatePartition(ctx context.Context, partition *PartitionInfo) error
CreatePartition creates a new partition.
func (*Store) CreateTopic ¶
func (s *Store) CreateTopic(ctx context.Context, name string, numPartitions, replicationFactor int, config TopicConfig) error
CreateTopic creates a new topic.
func (*Store) DeleteTopic ¶
func (s *Store) DeleteTopic(ctx context.Context, name string) error
DeleteTopic deletes a topic.
func (*Store) GetBroker ¶
func (s *Store) GetBroker(id uint64) (*BrokerInfo, bool)
GetBroker returns a broker by ID.
func (*Store) GetPartition ¶
func (s *Store) GetPartition(topic string, partition int) (*PartitionInfo, bool)
GetPartition returns a partition by topic and partition number.
func (*Store) GetState ¶
func (s *Store) GetState() *ClusterMetadata
GetState returns a clone of the current metadata state.
func (*Store) ListBrokers ¶
func (s *Store) ListBrokers() []*BrokerInfo
ListBrokers returns all brokers.
func (*Store) ListPartitions ¶
func (s *Store) ListPartitions(topic string) []*PartitionInfo
ListPartitions returns all partitions for a topic.
func (*Store) RegisterBroker ¶
func (s *Store) RegisterBroker(ctx context.Context, broker *BrokerInfo) error
RegisterBroker registers a new broker in the cluster.
func (*Store) UnregisterBroker ¶
func (s *Store) UnregisterBroker(ctx context.Context, brokerID uint64) error
UnregisterBroker unregisters a broker from the cluster.
func (*Store) UpdateBroker ¶
func (s *Store) UpdateBroker(ctx context.Context, brokerID uint64, status BrokerStatus, resources BrokerResources) error
UpdateBroker updates broker information.
func (*Store) UpdateLeader ¶
func (s *Store) UpdateLeader(ctx context.Context, topic string, partition int, leader, leaderEpoch uint64) error
UpdateLeader updates the leader for a partition.
func (*Store) UpdatePartition ¶
func (s *Store) UpdatePartition(ctx context.Context, partition *PartitionInfo) error
UpdatePartition updates a partition.
func (*Store) UpdateReplicaList ¶
func (s *Store) UpdateReplicaList(ctx context.Context, topic string, partition int, replicas []uint64) error
UpdateReplicaList updates the replica list for a partition.
func (*Store) UpdateTopic ¶
func (s *Store) UpdateTopic(ctx context.Context, name string, config TopicConfig) error
UpdateTopic updates topic configuration.
type TopicConfig ¶
type TopicConfig struct {
// RetentionMs is the retention time in milliseconds
RetentionMs int64
// RetentionBytes is the retention size in bytes per partition
RetentionBytes int64
// SegmentMs is the segment roll time in milliseconds
SegmentMs int64
// SegmentBytes is the segment size in bytes
SegmentBytes int64
// MinInsyncReplicas is the minimum number of in-sync replicas
MinInsyncReplicas int
// CompressionType is the compression type (none, gzip, snappy, lz4, zstd)
CompressionType string
}
TopicConfig contains topic-specific configuration.
func DefaultTopicConfig ¶
func DefaultTopicConfig() TopicConfig
DefaultTopicConfig returns default topic configuration.
func (TopicConfig) Clone ¶
func (tc TopicConfig) Clone() TopicConfig
Clone creates a copy of the topic config.
type TopicInfo ¶
type TopicInfo struct {
// Name is the topic name
Name string
// NumPartitions is the number of partitions
NumPartitions int
// ReplicationFactor is the replication factor
ReplicationFactor int
// Config contains topic-specific configuration
Config TopicConfig
// CreatedAt is when the topic was created
CreatedAt time.Time
// ModifiedAt is when the topic was last modified
ModifiedAt time.Time
}
TopicInfo represents metadata about a topic.
type UnregisterBrokerOp ¶
type UnregisterBrokerOp struct {
BrokerID uint64
}
UnregisterBrokerOp is the data for OpUnregisterBroker.
type UpdateBrokerOp ¶
type UpdateBrokerOp struct {
BrokerID uint64
Status BrokerStatus
Resources BrokerResources
Heartbeat time.Time
}
UpdateBrokerOp is the data for OpUpdateBroker.
type UpdateISROp ¶
UpdateISROp is the data for OpUpdateISR.
type UpdateLeaderOp ¶
UpdateLeaderOp is the data for OpUpdateLeader.
type UpdatePartitionOp ¶
type UpdatePartitionOp struct {
Partition PartitionInfo
}
UpdatePartitionOp is the data for OpUpdatePartition.
type UpdateReplicaListOp ¶
UpdateReplicaListOp is the data for OpUpdateReplicaList.
type UpdateTopicOp ¶
type UpdateTopicOp struct {
Name string
Config TopicConfig
}
UpdateTopicOp is the data for OpUpdateTopic.