metadata

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 22, 2025 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func EncodeOperation

func EncodeOperation(op Operation) ([]byte, error)

EncodeOperation encodes an operation to bytes.

func PartitionID

func PartitionID(topic string, partition int) string

PartitionID returns the partition identifier string for the given topic and partition number, in "topic:partition" form (e.g., "events:0").

Types

type BatchCreatePartitionsOp

// BatchCreatePartitionsOp is the data for OpBatchCreatePartitions.
// It carries several partitions so they can be created in a single Raft proposal.
type BatchCreatePartitionsOp struct {
	// Partitions lists the partitions to create in this batch.
	Partitions []PartitionInfo
}

BatchCreatePartitionsOp is the data for OpBatchCreatePartitions. This allows creating multiple partitions in a single Raft proposal, which is more efficient and prevents leadership instability.

type BrokerInfo

// BrokerInfo represents metadata about a broker in the cluster.
type BrokerInfo struct {
	// ID is the unique broker identifier.
	ID uint64

	// Addr is the network address (host:port).
	Addr string

	// Status is the current broker status (a BrokerStatus value).
	Status BrokerStatus

	// Resources contains resource information (CPU, disk, memory, network).
	Resources BrokerResources

	// Rack is the rack/availability zone (for rack-aware placement).
	Rack string

	// RegisteredAt is when the broker joined the cluster.
	RegisteredAt time.Time

	// LastHeartbeat is the last time a heartbeat was received from the broker.
	LastHeartbeat time.Time
}

BrokerInfo represents metadata about a broker in the cluster.

func (*BrokerInfo) Clone

func (bi *BrokerInfo) Clone() *BrokerInfo

Clone creates a deep copy of the broker info.

func (*BrokerInfo) IsAlive

func (bi *BrokerInfo) IsAlive(timeout time.Duration) bool

IsAlive returns true if the broker is considered alive.

type BrokerResources

// BrokerResources represents resource information for a broker.
// Sizes are expressed in bytes and bandwidth in bytes per second.
type BrokerResources struct {
	// DiskTotal is total disk space in bytes.
	DiskTotal uint64

	// DiskUsed is used disk space in bytes.
	DiskUsed uint64

	// DiskAvailable is available disk space in bytes.
	DiskAvailable uint64

	// NetworkBandwidth is network bandwidth in bytes/sec.
	NetworkBandwidth uint64

	// CPUCores is the number of CPU cores.
	CPUCores int

	// MemoryTotal is total memory in bytes.
	MemoryTotal uint64

	// MemoryUsed is used memory in bytes.
	MemoryUsed uint64
}

BrokerResources represents resource information for a broker.

type BrokerStatus

// BrokerStatus represents the status of a broker, stored as a string.
type BrokerStatus string

BrokerStatus represents the status of a broker.

// Defined BrokerStatus values, as carried in BrokerInfo.Status.
// NOTE(review): the conditions under which each status is assigned are not
// visible here — confirm against the code that sets BrokerInfo.Status.
const (
	BrokerStatusAlive       BrokerStatus = "alive"
	BrokerStatusDead        BrokerStatus = "dead"
	BrokerStatusDraining    BrokerStatus = "draining"
	BrokerStatusUnreachable BrokerStatus = "unreachable"
)

type ClusterMetadata

// ClusterMetadata represents all metadata for the StreamBus cluster.
// This is the root state that is replicated via Raft consensus.
type ClusterMetadata struct {
	// Brokers maps broker ID to broker metadata.
	Brokers map[uint64]*BrokerInfo

	// Topics maps topic name to topic metadata.
	Topics map[string]*TopicInfo

	// Partitions maps partition ID to partition metadata.
	// Key format: "topic:partition" (e.g., "events:0").
	Partitions map[string]*PartitionInfo

	// Version is incremented on every change (for optimistic locking).
	Version uint64

	// LastModified is the timestamp of the last modification.
	LastModified time.Time
}

ClusterMetadata represents all metadata for the StreamBus cluster. This is the root state that is replicated via Raft consensus.

func NewClusterMetadata

func NewClusterMetadata() *ClusterMetadata

NewClusterMetadata creates a new empty cluster metadata.

func (*ClusterMetadata) Clone

func (cm *ClusterMetadata) Clone() *ClusterMetadata

Clone creates a deep copy of the cluster metadata.

type ClusterMetadataStore

type ClusterMetadataStore struct {
	// contains filtered or unexported fields
}

ClusterMetadataStore is an adapter that implements cluster.MetadataStore using the Raft-backed metadata.Store.

func NewClusterMetadataStore

func NewClusterMetadataStore(store *Store) *ClusterMetadataStore

NewClusterMetadataStore creates a new adapter for cluster metadata operations.

func (*ClusterMetadataStore) DeleteBroker

func (cms *ClusterMetadataStore) DeleteBroker(ctx context.Context, brokerID int32) error

DeleteBroker removes broker metadata.

func (*ClusterMetadataStore) GetBrokerMetadata

func (cms *ClusterMetadataStore) GetBrokerMetadata(ctx context.Context, brokerID int32) (*cluster.BrokerMetadata, error)

GetBrokerMetadata retrieves broker metadata.

func (*ClusterMetadataStore) GetPartitionAssignment

func (cms *ClusterMetadataStore) GetPartitionAssignment(ctx context.Context, topic string) (*cluster.Assignment, error)

GetPartitionAssignment retrieves the partition assignment for a topic.

func (*ClusterMetadataStore) ListBrokers

func (cms *ClusterMetadataStore) ListBrokers(ctx context.Context) ([]*cluster.BrokerMetadata, error)

ListBrokers lists all brokers.

func (*ClusterMetadataStore) StoreBrokerMetadata

func (cms *ClusterMetadataStore) StoreBrokerMetadata(ctx context.Context, broker *cluster.BrokerMetadata) error

StoreBrokerMetadata stores or updates broker metadata via Raft consensus.

func (*ClusterMetadataStore) StorePartitionAssignment

func (cms *ClusterMetadataStore) StorePartitionAssignment(ctx context.Context, assignment *cluster.Assignment) error

StorePartitionAssignment stores a partition assignment via Raft consensus.

type ConsensusNode

// ConsensusNode is the interface for proposing operations via consensus.
// It is the consensus dependency accepted by NewStore.
type ConsensusNode interface {
	// Propose submits data to the consensus layer and returns an error on failure.
	// NOTE(review): whether this blocks until the entry is committed is not
	// visible here — confirm against the consensus implementation.
	Propose(ctx context.Context, data []byte) error
	// IsLeader reports leadership as a bool; presumably true when the local
	// node is the leader (Store.IsLeader is documented that way) — confirm.
	IsLeader() bool
	// Leader returns a node ID; presumably the ID of the current leader
	// (Store.Leader is documented that way) — confirm.
	Leader() uint64
}

ConsensusNode is the interface for proposing operations via consensus. This matches the consensus.Node interface.

type CreatePartitionOp

// CreatePartitionOp is the data for OpCreatePartition.
type CreatePartitionOp struct {
	// Partition is the metadata of the partition to create.
	Partition PartitionInfo
}

CreatePartitionOp is the data for OpCreatePartition.

type CreateTopicOp

// CreateTopicOp is the data for OpCreateTopic.
// Its fields mirror the arguments of Store.CreateTopic.
type CreateTopicOp struct {
	Name              string      // Name is the topic name.
	NumPartitions     int         // NumPartitions is the number of partitions.
	ReplicationFactor int         // ReplicationFactor is the replication factor.
	Config            TopicConfig // Config is the topic-specific configuration.
}

CreateTopicOp is the data for OpCreateTopic.

type DeleteTopicOp

// DeleteTopicOp is the data for OpDeleteTopic.
type DeleteTopicOp struct {
	// Name is the name of the topic to delete.
	Name string
}

DeleteTopicOp is the data for OpDeleteTopic.

type FSM

type FSM struct {
	// contains filtered or unexported fields
}

FSM implements the Finite State Machine for cluster metadata. It applies operations to the metadata state and supports snapshots.

func NewFSM

func NewFSM() *FSM

NewFSM creates a new metadata FSM.

func (*FSM) Apply

func (f *FSM) Apply(data []byte) error

Apply applies a committed log entry to the state machine. This method is called by the consensus layer when entries are committed.

func (*FSM) GetBroker

func (f *FSM) GetBroker(id uint64) (*BrokerInfo, bool)

GetBroker returns a broker by ID.

func (*FSM) GetPartition

func (f *FSM) GetPartition(topic string, partition int) (*PartitionInfo, bool)

GetPartition returns a partition by topic and partition number.

func (*FSM) GetState

func (f *FSM) GetState() *ClusterMetadata

GetState returns a clone of the current state. This is safe to call concurrently with Apply.

func (*FSM) GetTopic

func (f *FSM) GetTopic(name string) (*TopicInfo, bool)

GetTopic returns a topic by name.

func (*FSM) ListBrokers

func (f *FSM) ListBrokers() []*BrokerInfo

ListBrokers returns all brokers.

func (*FSM) ListPartitions

func (f *FSM) ListPartitions(topic string) []*PartitionInfo

ListPartitions returns all partitions for a topic.

func (*FSM) ListTopics

func (f *FSM) ListTopics() []*TopicInfo

ListTopics returns all topics.

func (*FSM) Restore

func (f *FSM) Restore(snapshot []byte) error

Restore restores the state from a snapshot.

func (*FSM) SetLogger

func (f *FSM) SetLogger(l Logger)

SetLogger sets the logger for the FSM.

func (*FSM) Snapshot

func (f *FSM) Snapshot() ([]byte, error)

Snapshot returns the current state as a snapshot.

type Logger

// Logger defines the interface for FSM logging.
// It is the type accepted by FSM.SetLogger.
type Logger interface {
	// Printf takes a format string followed by its arguments.
	// NOTE(review): presumably fmt.Printf-style formatting — confirm.
	Printf(format string, v ...interface{})
}

Logger defines the interface for FSM logging.

type NoOpLogger

// NoOpLogger implements Logger but does nothing (for testing).
// It has no fields; its Printf method is declared on *NoOpLogger.
type NoOpLogger struct{}

NoOpLogger implements Logger but does nothing (for testing).

func (*NoOpLogger) Printf

func (l *NoOpLogger) Printf(format string, v ...interface{})

type Operation

// Operation represents a metadata operation to be replicated via Raft.
// EncodeOperation and DecodeOperation convert it to and from bytes.
type Operation struct {
	// Type is the operation type (an OperationType value).
	Type OperationType

	// Data is the operation-specific data (JSON encoded).
	// NOTE(review): presumably the JSON form of the Op struct matching Type
	// (e.g. CreateTopicOp for OpCreateTopic) — confirm.
	Data []byte

	// Timestamp is when the operation was created.
	Timestamp time.Time
}

Operation represents a metadata operation to be replicated via Raft.

func DecodeOperation

func DecodeOperation(data []byte) (*Operation, error)

DecodeOperation decodes an operation from bytes.

type OperationType

// OperationType represents the type of metadata operation, stored as a string.
type OperationType string

OperationType represents the type of metadata operation.

// Defined OperationType values, grouped by the kind of metadata they change.
// Each group comment names the payload types documented as the data for
// those operations.
const (
	// Broker operations.
	// Payloads: RegisterBrokerOp, UnregisterBrokerOp, UpdateBrokerOp.
	OpRegisterBroker   OperationType = "register_broker"
	OpUnregisterBroker OperationType = "unregister_broker"
	OpUpdateBroker     OperationType = "update_broker"

	// Topic operations.
	// Payloads: CreateTopicOp, DeleteTopicOp, UpdateTopicOp.
	OpCreateTopic OperationType = "create_topic"
	OpDeleteTopic OperationType = "delete_topic"
	OpUpdateTopic OperationType = "update_topic"

	// Partition operations.
	// Payloads: CreatePartitionOp, UpdatePartitionOp, UpdateLeaderOp,
	// UpdateISROp, UpdateReplicaListOp.
	// NOTE(review): no payload type for OpAddReplica or OpRemoveReplica is
	// visible here — confirm whether they are implemented.
	OpCreatePartition   OperationType = "create_partition"
	OpUpdatePartition   OperationType = "update_partition"
	OpUpdateLeader      OperationType = "update_leader"
	OpUpdateISR         OperationType = "update_isr"
	OpAddReplica        OperationType = "add_replica"
	OpRemoveReplica     OperationType = "remove_replica"
	OpUpdateReplicaList OperationType = "update_replica_list"

	// Batch operations.
	// Payload: BatchCreatePartitionsOp.
	OpBatchCreatePartitions OperationType = "batch_create_partitions"
)

type PartitionInfo

// PartitionInfo represents metadata about a partition.
// Broker references are broker IDs (uint64), the key type of ClusterMetadata.Brokers.
type PartitionInfo struct {
	// Topic is the topic name.
	Topic string

	// Partition is the partition number.
	Partition int

	// Leader is the broker ID of the leader (0 if no leader).
	Leader uint64

	// Replicas is the list of all replica broker IDs.
	Replicas []uint64

	// ISR is the list of in-sync replica broker IDs.
	ISR []uint64

	// OfflineReplicas is the list of offline replica broker IDs.
	OfflineReplicas []uint64

	// LeaderEpoch is incremented each time leadership changes.
	LeaderEpoch uint64

	// CreatedAt is when the partition was created.
	CreatedAt time.Time

	// ModifiedAt is when the partition was last modified.
	ModifiedAt time.Time
}

PartitionInfo represents metadata about a partition.

func (*PartitionInfo) Clone

func (pi *PartitionInfo) Clone() *PartitionInfo

Clone creates a deep copy of the partition info.

func (*PartitionInfo) IsInISR

func (pi *PartitionInfo) IsInISR(brokerID uint64) bool

IsInISR returns true if the given broker is in the ISR.

func (*PartitionInfo) IsLeader

func (pi *PartitionInfo) IsLeader(brokerID uint64) bool

IsLeader returns true if the given broker is the leader.

func (*PartitionInfo) IsReplica

func (pi *PartitionInfo) IsReplica(brokerID uint64) bool

IsReplica returns true if the given broker is a replica.

func (*PartitionInfo) PartitionID

func (pi *PartitionInfo) PartitionID() string

PartitionID returns the partition identifier (topic:partition).

type RegisterBrokerOp

// RegisterBrokerOp is the data for OpRegisterBroker.
type RegisterBrokerOp struct {
	// Broker is the metadata of the broker to register.
	Broker BrokerInfo
}

RegisterBrokerOp is the data for OpRegisterBroker.

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store provides a high-level API for managing cluster metadata. It wraps the FSM and integrates with the consensus layer.

func NewStore

func NewStore(fsm *FSM, consensus ConsensusNode) *Store

NewStore creates a new metadata store with the given FSM and consensus node.

func (*Store) AllocatePartitions

func (s *Store) AllocatePartitions(topic string, numPartitions, replicationFactor int) ([]*PartitionInfo, error)

AllocatePartitions assigns partitions to brokers based on the allocation strategy. This is a helper method that creates partition assignments.

func (*Store) BatchCreatePartitions

func (s *Store) BatchCreatePartitions(ctx context.Context, partitions []*PartitionInfo) error

BatchCreatePartitions creates multiple partitions atomically in a single Raft proposal. This is more efficient than calling CreatePartition multiple times and prevents leadership instability under rapid operations.

func (*Store) CreatePartition

func (s *Store) CreatePartition(ctx context.Context, partition *PartitionInfo) error

CreatePartition creates a new partition.

func (*Store) CreateTopic

func (s *Store) CreateTopic(ctx context.Context, name string, numPartitions, replicationFactor int, config TopicConfig) error

CreateTopic creates a new topic.

func (*Store) DeleteTopic

func (s *Store) DeleteTopic(ctx context.Context, name string) error

DeleteTopic deletes a topic.

func (*Store) GetBroker

func (s *Store) GetBroker(id uint64) (*BrokerInfo, bool)

GetBroker returns a broker by ID.

func (*Store) GetFSM

func (s *Store) GetFSM() *FSM

GetFSM returns the underlying FSM (for consensus integration).

func (*Store) GetPartition

func (s *Store) GetPartition(topic string, partition int) (*PartitionInfo, bool)

GetPartition returns a partition by topic and partition number.

func (*Store) GetState

func (s *Store) GetState() *ClusterMetadata

GetState returns a clone of the current metadata state.

func (*Store) GetTopic

func (s *Store) GetTopic(name string) (*TopicInfo, bool)

GetTopic returns a topic by name.

func (*Store) IsLeader

func (s *Store) IsLeader() bool

IsLeader returns true if the local node is the leader.

func (*Store) Leader

func (s *Store) Leader() uint64

Leader returns the ID of the current leader.

func (*Store) ListBrokers

func (s *Store) ListBrokers() []*BrokerInfo

ListBrokers returns all brokers.

func (*Store) ListPartitions

func (s *Store) ListPartitions(topic string) []*PartitionInfo

ListPartitions returns all partitions for a topic.

func (*Store) ListTopics

func (s *Store) ListTopics() []*TopicInfo

ListTopics returns all topics.

func (*Store) RegisterBroker

func (s *Store) RegisterBroker(ctx context.Context, broker *BrokerInfo) error

RegisterBroker registers a new broker in the cluster.

func (*Store) UnregisterBroker

func (s *Store) UnregisterBroker(ctx context.Context, brokerID uint64) error

UnregisterBroker unregisters a broker from the cluster.

func (*Store) UpdateBroker

func (s *Store) UpdateBroker(ctx context.Context, brokerID uint64, status BrokerStatus, resources BrokerResources) error

UpdateBroker updates broker information.

func (*Store) UpdateISR

func (s *Store) UpdateISR(ctx context.Context, topic string, partition int, isr []uint64) error

UpdateISR updates the ISR for a partition.

func (*Store) UpdateLeader

func (s *Store) UpdateLeader(ctx context.Context, topic string, partition int, leader, leaderEpoch uint64) error

UpdateLeader updates the leader for a partition.

func (*Store) UpdatePartition

func (s *Store) UpdatePartition(ctx context.Context, partition *PartitionInfo) error

UpdatePartition updates a partition.

func (*Store) UpdateReplicaList

func (s *Store) UpdateReplicaList(ctx context.Context, topic string, partition int, replicas []uint64) error

UpdateReplicaList updates the replica list for a partition.

func (*Store) UpdateTopic

func (s *Store) UpdateTopic(ctx context.Context, name string, config TopicConfig) error

UpdateTopic updates topic configuration.

type TopicConfig

// TopicConfig contains topic-specific configuration.
// DefaultTopicConfig returns the default values.
type TopicConfig struct {
	// RetentionMs is the retention time in milliseconds.
	RetentionMs int64

	// RetentionBytes is the retention size in bytes per partition.
	RetentionBytes int64

	// SegmentMs is the segment roll time in milliseconds.
	SegmentMs int64

	// SegmentBytes is the segment size in bytes.
	SegmentBytes int64

	// MinInsyncReplicas is the minimum number of in-sync replicas.
	MinInsyncReplicas int

	// CompressionType is the compression type (none, gzip, snappy, lz4, zstd).
	CompressionType string
}

TopicConfig contains topic-specific configuration.

func DefaultTopicConfig

func DefaultTopicConfig() TopicConfig

DefaultTopicConfig returns default topic configuration.

func (TopicConfig) Clone

func (tc TopicConfig) Clone() TopicConfig

Clone creates a copy of the topic config.

type TopicInfo

// TopicInfo represents metadata about a topic.
type TopicInfo struct {
	// Name is the topic name.
	Name string

	// NumPartitions is the number of partitions.
	NumPartitions int

	// ReplicationFactor is the replication factor.
	ReplicationFactor int

	// Config contains topic-specific configuration.
	Config TopicConfig

	// CreatedAt is when the topic was created.
	CreatedAt time.Time

	// ModifiedAt is when the topic was last modified.
	ModifiedAt time.Time
}

TopicInfo represents metadata about a topic.

func (*TopicInfo) Clone

func (ti *TopicInfo) Clone() *TopicInfo

Clone creates a deep copy of the topic info.

type UnregisterBrokerOp

// UnregisterBrokerOp is the data for OpUnregisterBroker.
type UnregisterBrokerOp struct {
	// BrokerID is the ID of the broker to unregister.
	BrokerID uint64
}

UnregisterBrokerOp is the data for OpUnregisterBroker.

type UpdateBrokerOp

// UpdateBrokerOp is the data for OpUpdateBroker.
// NOTE(review): Store.UpdateBroker takes no heartbeat argument, so Heartbeat is
// presumably filled in by the store and applied to BrokerInfo.LastHeartbeat — confirm.
type UpdateBrokerOp struct {
	BrokerID  uint64          // BrokerID is the ID of the broker being updated.
	Status    BrokerStatus    // Status is the broker status carried by the update.
	Resources BrokerResources // Resources is the resource information carried by the update.
	Heartbeat time.Time       // Heartbeat is a timestamp carried with the update.
}

UpdateBrokerOp is the data for OpUpdateBroker.

type UpdateISROp

// UpdateISROp is the data for OpUpdateISR.
// Its fields mirror the arguments of Store.UpdateISR.
type UpdateISROp struct {
	Topic     string   // Topic is the topic name.
	Partition int      // Partition is the partition number.
	ISR       []uint64 // ISR is the list of in-sync replica broker IDs.
}

UpdateISROp is the data for OpUpdateISR.

type UpdateLeaderOp

// UpdateLeaderOp is the data for OpUpdateLeader.
// Its fields mirror the arguments of Store.UpdateLeader.
type UpdateLeaderOp struct {
	Topic       string // Topic is the topic name.
	Partition   int    // Partition is the partition number.
	Leader      uint64 // Leader is the broker ID of the leader.
	LeaderEpoch uint64 // LeaderEpoch is the leader epoch carried by the update.
}

UpdateLeaderOp is the data for OpUpdateLeader.

type UpdatePartitionOp

// UpdatePartitionOp is the data for OpUpdatePartition.
type UpdatePartitionOp struct {
	// Partition is the partition metadata carried by the update.
	Partition PartitionInfo
}

UpdatePartitionOp is the data for OpUpdatePartition.

type UpdateReplicaListOp

// UpdateReplicaListOp is the data for OpUpdateReplicaList.
// Its fields mirror the arguments of Store.UpdateReplicaList.
type UpdateReplicaListOp struct {
	Topic     string   // Topic is the topic name.
	Partition int      // Partition is the partition number.
	Replicas  []uint64 // Replicas is the list of replica broker IDs.
}

UpdateReplicaListOp is the data for OpUpdateReplicaList.

type UpdateTopicOp

// UpdateTopicOp is the data for OpUpdateTopic.
// Its fields mirror the arguments of Store.UpdateTopic.
type UpdateTopicOp struct {
	Name   string      // Name is the topic name.
	Config TopicConfig // Config is the topic configuration carried by the update.
}

UpdateTopicOp is the data for OpUpdateTopic.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL