Documentation
¶
Index ¶
- Variables
- func BrokerRegistrationKey(brokerID string) string
- func ConsumerGroupKey(groupID string) string
- func ConsumerGroupPrefix() string
- func ConsumerOffsetKey(groupID, topic string, partition int32) string
- func DecodeConsumerGroup(data []byte) (*metadatapb.ConsumerGroup, error)
- func DecodePartitionState(data []byte) (*metadatapb.PartitionState, error)
- func DecodeTopicConfig(data []byte) (*metadatapb.TopicConfig, error)
- func EncodeConsumerGroup(group *metadatapb.ConsumerGroup) ([]byte, error)
- func EncodePartitionState(state *metadatapb.PartitionState) ([]byte, error)
- func EncodeTopicConfig(cfg *metadatapb.TopicConfig) ([]byte, error)
- func ParseConsumerGroupID(key string) (string, bool)
- func ParseConsumerOffsetKey(key string) (string, string, int32, bool)
- func PartitionAssignmentKey(topic string, partition int32) string
- func PartitionStateKey(topic string, partition int32) string
- func TopicConfigKey(topic string) string
- func TopicIDForName(name string) [16]byte
- type ClusterMetadata
- type ConsumerOffset
- type EtcdStore
- func (s *EtcdStore) Available() bool
- func (s *EtcdStore) CommitConsumerOffset(ctx context.Context, group, topic string, partition int32, offset int64, ...) error
- func (s *EtcdStore) CreatePartitions(ctx context.Context, topic string, partitionCount int32) error
- func (s *EtcdStore) CreateTopic(ctx context.Context, spec TopicSpec) (*protocol.MetadataTopic, error)
- func (s *EtcdStore) DeleteConsumerGroup(ctx context.Context, groupID string) error
- func (s *EtcdStore) DeleteTopic(ctx context.Context, name string) error
- func (s *EtcdStore) FetchConsumerGroup(ctx context.Context, groupID string) (*metadatapb.ConsumerGroup, error)
- func (s *EtcdStore) FetchConsumerOffset(ctx context.Context, group, topic string, partition int32) (int64, string, error)
- func (s *EtcdStore) FetchTopicConfig(ctx context.Context, topic string) (*metadatapb.TopicConfig, error)
- func (s *EtcdStore) ListConsumerGroups(ctx context.Context) ([]*metadatapb.ConsumerGroup, error)
- func (s *EtcdStore) ListConsumerOffsets(ctx context.Context) ([]ConsumerOffset, error)
- func (s *EtcdStore) Metadata(ctx context.Context, topics []string) (*ClusterMetadata, error)
- func (s *EtcdStore) NextOffset(ctx context.Context, topic string, partition int32) (int64, error)
- func (s *EtcdStore) PutConsumerGroup(ctx context.Context, group *metadatapb.ConsumerGroup) error
- func (s *EtcdStore) UpdateOffsets(ctx context.Context, topic string, partition int32, lastOffset int64) error
- func (s *EtcdStore) UpdateTopicConfig(ctx context.Context, cfg *metadatapb.TopicConfig) error
- type EtcdStoreConfig
- type InMemoryStore
- func (s *InMemoryStore) CommitConsumerOffset(ctx context.Context, group, topic string, partition int32, offset int64, ...) error
- func (s *InMemoryStore) CreatePartitions(ctx context.Context, topic string, partitionCount int32) error
- func (s *InMemoryStore) CreateTopic(ctx context.Context, spec TopicSpec) (*protocol.MetadataTopic, error)
- func (s *InMemoryStore) DeleteConsumerGroup(ctx context.Context, groupID string) error
- func (s *InMemoryStore) DeleteTopic(ctx context.Context, name string) error
- func (s *InMemoryStore) FetchConsumerGroup(ctx context.Context, groupID string) (*metadatapb.ConsumerGroup, error)
- func (s *InMemoryStore) FetchConsumerOffset(ctx context.Context, group, topic string, partition int32) (int64, string, error)
- func (s *InMemoryStore) FetchTopicConfig(ctx context.Context, topic string) (*metadatapb.TopicConfig, error)
- func (s *InMemoryStore) ListConsumerGroups(ctx context.Context) ([]*metadatapb.ConsumerGroup, error)
- func (s *InMemoryStore) ListConsumerOffsets(ctx context.Context) ([]ConsumerOffset, error)
- func (s *InMemoryStore) Metadata(ctx context.Context, topics []string) (*ClusterMetadata, error)
- func (s *InMemoryStore) NextOffset(ctx context.Context, topic string, partition int32) (int64, error)
- func (s *InMemoryStore) PutConsumerGroup(ctx context.Context, group *metadatapb.ConsumerGroup) error
- func (s *InMemoryStore) Update(state ClusterMetadata)
- func (s *InMemoryStore) UpdateOffsets(ctx context.Context, topic string, partition int32, lastOffset int64) error
- func (s *InMemoryStore) UpdateTopicConfig(ctx context.Context, cfg *metadatapb.TopicConfig) error
- type Store
- type TopicSpec
Constants ¶
This section is empty.
Variables ¶
var ( // ErrTopicExists indicates the topic is already present. ErrTopicExists = errors.New("topic already exists") // ErrInvalidTopic indicates the topic specification is invalid. ErrInvalidTopic = errors.New("invalid topic configuration") // ErrUnknownTopic indicates the topic does not exist. ErrUnknownTopic = errors.New("unknown topic") )
var ( ErrStoreUnavailable = errors.New("metadata store unavailable") )
Functions ¶
func BrokerRegistrationKey ¶
BrokerRegistrationKey returns the etcd key for broker liveness data.
func ConsumerGroupKey ¶
ConsumerGroupKey returns the etcd key for a consumer group metadata blob.
func ConsumerGroupPrefix ¶
func ConsumerGroupPrefix() string
ConsumerGroupPrefix returns the etcd prefix for consumer groups.
func ConsumerOffsetKey ¶
ConsumerOffsetKey returns the etcd key holding the committed offset for a partition.
func DecodeConsumerGroup ¶
func DecodeConsumerGroup(data []byte) (*metadatapb.ConsumerGroup, error)
DecodeConsumerGroup parses bytes into a ConsumerGroup struct.
func DecodePartitionState ¶
func DecodePartitionState(data []byte) (*metadatapb.PartitionState, error)
DecodePartitionState parses bytes into a PartitionState.
func DecodeTopicConfig ¶
func DecodeTopicConfig(data []byte) (*metadatapb.TopicConfig, error)
DecodeTopicConfig parses bytes into a TopicConfig object.
func EncodeConsumerGroup ¶
func EncodeConsumerGroup(group *metadatapb.ConsumerGroup) ([]byte, error)
EncodeConsumerGroup serializes a ConsumerGroup.
func EncodePartitionState ¶
func EncodePartitionState(state *metadatapb.PartitionState) ([]byte, error)
EncodePartitionState serializes a PartitionState into bytes.
func EncodeTopicConfig ¶
func EncodeTopicConfig(cfg *metadatapb.TopicConfig) ([]byte, error)
EncodeTopicConfig serializes a TopicConfig into etcd-ready bytes.
func ParseConsumerGroupID ¶
ParseConsumerGroupID extracts a group ID from a metadata key.
func ParseConsumerOffsetKey ¶
ParseConsumerOffsetKey extracts group, topic, and partition from an offset key.
func PartitionAssignmentKey ¶
PartitionAssignmentKey returns the etcd key for the current leader assignment.
func PartitionStateKey ¶
PartitionStateKey returns the etcd key for a partition state object.
func TopicConfigKey ¶
TopicConfigKey returns the etcd key for a topic configuration object.
func TopicIDForName ¶
TopicIDForName returns a stable 16-byte ID derived from a topic name.
Types ¶
type ClusterMetadata ¶
type ClusterMetadata struct {
Brokers []protocol.MetadataBroker
ControllerID int32
Topics []protocol.MetadataTopic
ClusterName *string
ClusterID *string
}
ClusterMetadata describes the Kafka-visible cluster state.
type ConsumerOffset ¶
ConsumerOffset captures a committed offset entry.
type EtcdStore ¶
type EtcdStore struct {
// contains filtered or unexported fields
}
EtcdStore uses etcd for offset persistence while delegating metadata to an in-memory snapshot.
func NewEtcdStore ¶
func NewEtcdStore(ctx context.Context, snapshot ClusterMetadata, cfg EtcdStoreConfig) (*EtcdStore, error)
NewEtcdStore initializes a store backed by etcd.
func (*EtcdStore) CommitConsumerOffset ¶
func (s *EtcdStore) CommitConsumerOffset(ctx context.Context, group, topic string, partition int32, offset int64, metadata string) error
CommitConsumerOffset implements Store.CommitConsumerOffset.
func (*EtcdStore) CreatePartitions ¶
CreatePartitions expands a topic and writes new partition state entries.
func (*EtcdStore) CreateTopic ¶
func (s *EtcdStore) CreateTopic(ctx context.Context, spec TopicSpec) (*protocol.MetadataTopic, error)
CreateTopic currently updates only the in-memory snapshot; the operator is still responsible for reconciling durable topic configuration into etcd/S3.
func (*EtcdStore) DeleteConsumerGroup ¶
DeleteConsumerGroup removes persisted consumer group metadata.
func (*EtcdStore) DeleteTopic ¶
DeleteTopic updates the local snapshot so admin APIs behave consistently.
func (*EtcdStore) FetchConsumerGroup ¶
func (s *EtcdStore) FetchConsumerGroup(ctx context.Context, groupID string) (*metadatapb.ConsumerGroup, error)
FetchConsumerGroup loads consumer group metadata from etcd.
func (*EtcdStore) FetchConsumerOffset ¶
func (s *EtcdStore) FetchConsumerOffset(ctx context.Context, group, topic string, partition int32) (int64, string, error)
FetchConsumerOffset implements Store.FetchConsumerOffset.
func (*EtcdStore) FetchTopicConfig ¶
func (s *EtcdStore) FetchTopicConfig(ctx context.Context, topic string) (*metadatapb.TopicConfig, error)
FetchTopicConfig loads topic configuration from etcd or falls back to defaults.
func (*EtcdStore) ListConsumerGroups ¶
func (s *EtcdStore) ListConsumerGroups(ctx context.Context) ([]*metadatapb.ConsumerGroup, error)
ListConsumerGroups returns all persisted consumer groups.
func (*EtcdStore) ListConsumerOffsets ¶
func (s *EtcdStore) ListConsumerOffsets(ctx context.Context) ([]ConsumerOffset, error)
ListConsumerOffsets returns all committed offsets stored in etcd.
func (*EtcdStore) Metadata ¶
Metadata delegates to the snapshot captured at startup (operator keeps it fresh).
func (*EtcdStore) NextOffset ¶
NextOffset reads the last committed offset from etcd and returns the next offset to assign.
func (*EtcdStore) PutConsumerGroup ¶
func (s *EtcdStore) PutConsumerGroup(ctx context.Context, group *metadatapb.ConsumerGroup) error
PutConsumerGroup persists consumer group metadata in etcd.
func (*EtcdStore) UpdateOffsets ¶
func (s *EtcdStore) UpdateOffsets(ctx context.Context, topic string, partition int32, lastOffset int64) error
UpdateOffsets stores the next offset (last + 1) so future producers pick up from there.
func (*EtcdStore) UpdateTopicConfig ¶
func (s *EtcdStore) UpdateTopicConfig(ctx context.Context, cfg *metadatapb.TopicConfig) error
UpdateTopicConfig persists topic configuration into etcd.
type EtcdStoreConfig ¶
type EtcdStoreConfig struct {
Endpoints []string
Username string
Password string
DialTimeout time.Duration
}
EtcdStoreConfig defines how we connect to etcd for metadata/offsets.
type InMemoryStore ¶
type InMemoryStore struct {
// contains filtered or unexported fields
}
InMemoryStore is a simple Store backed by in-process state. Useful for early development and tests.
func NewInMemoryStore ¶
func NewInMemoryStore(state ClusterMetadata) *InMemoryStore
NewInMemoryStore builds an in-memory metadata store with the provided state.
func (*InMemoryStore) CommitConsumerOffset ¶
func (s *InMemoryStore) CommitConsumerOffset(ctx context.Context, group, topic string, partition int32, offset int64, metadata string) error
CommitConsumerOffset implements Store.CommitConsumerOffset.
func (*InMemoryStore) CreatePartitions ¶
func (s *InMemoryStore) CreatePartitions(ctx context.Context, topic string, partitionCount int32) error
CreatePartitions implements Store.CreatePartitions.
func (*InMemoryStore) CreateTopic ¶
func (s *InMemoryStore) CreateTopic(ctx context.Context, spec TopicSpec) (*protocol.MetadataTopic, error)
CreateTopic implements Store.CreateTopic.
func (*InMemoryStore) DeleteConsumerGroup ¶
func (s *InMemoryStore) DeleteConsumerGroup(ctx context.Context, groupID string) error
DeleteConsumerGroup implements Store.DeleteConsumerGroup.
func (*InMemoryStore) DeleteTopic ¶
func (s *InMemoryStore) DeleteTopic(ctx context.Context, name string) error
DeleteTopic implements Store.DeleteTopic.
func (*InMemoryStore) FetchConsumerGroup ¶
func (s *InMemoryStore) FetchConsumerGroup(ctx context.Context, groupID string) (*metadatapb.ConsumerGroup, error)
FetchConsumerGroup implements Store.FetchConsumerGroup.
func (*InMemoryStore) FetchConsumerOffset ¶
func (s *InMemoryStore) FetchConsumerOffset(ctx context.Context, group, topic string, partition int32) (int64, string, error)
FetchConsumerOffset implements Store.FetchConsumerOffset.
func (*InMemoryStore) FetchTopicConfig ¶
func (s *InMemoryStore) FetchTopicConfig(ctx context.Context, topic string) (*metadatapb.TopicConfig, error)
FetchTopicConfig implements Store.FetchTopicConfig.
func (*InMemoryStore) ListConsumerGroups ¶
func (s *InMemoryStore) ListConsumerGroups(ctx context.Context) ([]*metadatapb.ConsumerGroup, error)
ListConsumerGroups implements Store.ListConsumerGroups.
func (*InMemoryStore) ListConsumerOffsets ¶
func (s *InMemoryStore) ListConsumerOffsets(ctx context.Context) ([]ConsumerOffset, error)
ListConsumerOffsets implements Store.ListConsumerOffsets.
func (*InMemoryStore) Metadata ¶
func (s *InMemoryStore) Metadata(ctx context.Context, topics []string) (*ClusterMetadata, error)
Metadata implements Store.
func (*InMemoryStore) NextOffset ¶
func (s *InMemoryStore) NextOffset(ctx context.Context, topic string, partition int32) (int64, error)
NextOffset implements Store.NextOffset.
func (*InMemoryStore) PutConsumerGroup ¶
func (s *InMemoryStore) PutConsumerGroup(ctx context.Context, group *metadatapb.ConsumerGroup) error
PutConsumerGroup implements Store.PutConsumerGroup.
func (*InMemoryStore) Update ¶
func (s *InMemoryStore) Update(state ClusterMetadata)
Update swaps the cluster metadata atomically.
func (*InMemoryStore) UpdateOffsets ¶
func (s *InMemoryStore) UpdateOffsets(ctx context.Context, topic string, partition int32, lastOffset int64) error
UpdateOffsets implements Store.UpdateOffsets.
func (*InMemoryStore) UpdateTopicConfig ¶
func (s *InMemoryStore) UpdateTopicConfig(ctx context.Context, cfg *metadatapb.TopicConfig) error
UpdateTopicConfig implements Store.UpdateTopicConfig.
type Store ¶
type Store interface {
// Metadata returns brokers, controller ID, and topics. When topics is non-empty,
// the implementation should filter to that subset and omit missing topics.
Metadata(ctx context.Context, topics []string) (*ClusterMetadata, error)
// NextOffset returns the next offset to assign for a topic/partition.
NextOffset(ctx context.Context, topic string, partition int32) (int64, error)
// UpdateOffsets records the last persisted offset so future appends continue from there.
UpdateOffsets(ctx context.Context, topic string, partition int32, lastOffset int64) error
// CommitConsumerOffset persists a consumer group offset.
CommitConsumerOffset(ctx context.Context, group, topic string, partition int32, offset int64, metadata string) error
// FetchConsumerOffset retrieves the committed offset for a consumer group partition.
FetchConsumerOffset(ctx context.Context, group, topic string, partition int32) (int64, string, error)
// ListConsumerOffsets returns all committed consumer offsets.
ListConsumerOffsets(ctx context.Context) ([]ConsumerOffset, error)
// PutConsumerGroup persists consumer group metadata.
PutConsumerGroup(ctx context.Context, group *metadatapb.ConsumerGroup) error
// FetchConsumerGroup retrieves consumer group metadata.
FetchConsumerGroup(ctx context.Context, groupID string) (*metadatapb.ConsumerGroup, error)
// ListConsumerGroups returns all stored consumer group metadata.
ListConsumerGroups(ctx context.Context) ([]*metadatapb.ConsumerGroup, error)
// DeleteConsumerGroup removes consumer group metadata.
DeleteConsumerGroup(ctx context.Context, groupID string) error
// FetchTopicConfig returns the stored topic configuration.
FetchTopicConfig(ctx context.Context, topic string) (*metadatapb.TopicConfig, error)
// UpdateTopicConfig persists topic configuration updates.
UpdateTopicConfig(ctx context.Context, cfg *metadatapb.TopicConfig) error
// CreatePartitions expands a topic's partition count.
CreatePartitions(ctx context.Context, topic string, partitionCount int32) error
// CreateTopic creates a new topic with the provided specification.
CreateTopic(ctx context.Context, spec TopicSpec) (*protocol.MetadataTopic, error)
// DeleteTopic removes a topic and associated offsets.
DeleteTopic(ctx context.Context, name string) error
}
Store exposes read-only access to cluster metadata used by Kafka protocol handlers.