metadata

package
v1.4.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 19, 2026 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrInvalidTopic reports that a topic specification failed validation.
	ErrInvalidTopic = errors.New("invalid topic configuration")
	// ErrTopicExists reports that the topic is already present.
	ErrTopicExists = errors.New("topic already exists")
	// ErrUnknownTopic reports that the named topic does not exist.
	ErrUnknownTopic = errors.New("unknown topic")
)
View Source
// ErrStoreUnavailable signals that the metadata store could not be reached.
var ErrStoreUnavailable = errors.New("metadata store unavailable")

Functions

func BrokerRegistrationKey

func BrokerRegistrationKey(brokerID string) string

BrokerRegistrationKey returns the etcd key for broker liveness data.

func ConsumerGroupKey

func ConsumerGroupKey(groupID string) string

ConsumerGroupKey returns the etcd key for a consumer group metadata blob.

func ConsumerGroupPrefix

func ConsumerGroupPrefix() string

ConsumerGroupPrefix returns the etcd prefix for consumer groups.

func ConsumerOffsetKey

func ConsumerOffsetKey(groupID, topic string, partition int32) string

ConsumerOffsetKey returns the etcd key holding the committed offset for a partition.

func DecodeConsumerGroup

func DecodeConsumerGroup(data []byte) (*metadatapb.ConsumerGroup, error)

DecodeConsumerGroup parses bytes into a ConsumerGroup struct.

func DecodePartitionState

func DecodePartitionState(data []byte) (*metadatapb.PartitionState, error)

DecodePartitionState parses bytes into a PartitionState.

func DecodeTopicConfig

func DecodeTopicConfig(data []byte) (*metadatapb.TopicConfig, error)

DecodeTopicConfig parses bytes into a TopicConfig object.

func EncodeConsumerGroup

func EncodeConsumerGroup(group *metadatapb.ConsumerGroup) ([]byte, error)

EncodeConsumerGroup serializes a ConsumerGroup.

func EncodePartitionState

func EncodePartitionState(state *metadatapb.PartitionState) ([]byte, error)

EncodePartitionState serializes a PartitionState into bytes.

func EncodeTopicConfig

func EncodeTopicConfig(cfg *metadatapb.TopicConfig) ([]byte, error)

EncodeTopicConfig serializes a TopicConfig into etcd-ready bytes.

func ParseConsumerGroupID

func ParseConsumerGroupID(key string) (string, bool)

ParseConsumerGroupID extracts a group ID from a metadata key.

func ParseConsumerOffsetKey

func ParseConsumerOffsetKey(key string) (string, string, int32, bool)

ParseConsumerOffsetKey extracts group, topic, and partition from an offset key.

func PartitionAssignmentKey

func PartitionAssignmentKey(topic string, partition int32) string

PartitionAssignmentKey returns the etcd key for the current leader assignment.

func PartitionStateKey

func PartitionStateKey(topic string, partition int32) string

PartitionStateKey returns the etcd key for a partition state object.

func TopicConfigKey

func TopicConfigKey(topic string) string

TopicConfigKey returns the etcd key for a topic configuration object.

func TopicIDForName

func TopicIDForName(name string) [16]byte

TopicIDForName returns a stable 16-byte ID derived from a topic name.

Types

type ClusterMetadata

type ClusterMetadata struct {
	// Brokers lists the brokers reported for the cluster.
	Brokers      []protocol.MetadataBroker
	// ControllerID identifies the cluster controller.
	// NOTE(review): presumably the ID of one of the entries in Brokers — confirm.
	ControllerID int32
	// Topics lists the topics reported for the cluster.
	Topics       []protocol.MetadataTopic
	// ClusterName is an optional cluster name.
	// NOTE(review): nil presumably means "not set" — confirm.
	ClusterName  *string
	// ClusterID is an optional cluster identifier.
	// NOTE(review): nil presumably means "not set" — confirm.
	ClusterID    *string
}

ClusterMetadata describes the Kafka-visible cluster state.

type ConsumerOffset

type ConsumerOffset struct {
	// Group is the consumer group that committed the offset.
	Group     string
	// Topic is the topic the offset belongs to.
	Topic     string
	// Partition is the partition within Topic.
	Partition int32
	// Offset is the committed offset value.
	Offset    int64
}

ConsumerOffset captures a committed offset entry.

type EtcdStore

type EtcdStore struct {
	// contains filtered or unexported fields
}

EtcdStore persists offsets, consumer groups, and topic configuration in etcd while serving cluster metadata from an in-memory snapshot.

func NewEtcdStore

func NewEtcdStore(ctx context.Context, snapshot ClusterMetadata, cfg EtcdStoreConfig) (*EtcdStore, error)

NewEtcdStore initializes a store backed by etcd.

func (*EtcdStore) Available

func (s *EtcdStore) Available() bool

Available reports whether the most recent etcd operation succeeded.

func (*EtcdStore) CommitConsumerOffset

func (s *EtcdStore) CommitConsumerOffset(ctx context.Context, group, topic string, partition int32, offset int64, metadata string) error

CommitConsumerOffset implements Store.CommitConsumerOffset.

func (*EtcdStore) CreatePartitions

func (s *EtcdStore) CreatePartitions(ctx context.Context, topic string, partitionCount int32) error

CreatePartitions expands a topic and writes new partition state entries.

func (*EtcdStore) CreateTopic

func (s *EtcdStore) CreateTopic(ctx context.Context, spec TopicSpec) (*protocol.MetadataTopic, error)

CreateTopic currently updates only the in-memory snapshot; the operator is still responsible for reconciling durable topic configuration into etcd/S3.

func (*EtcdStore) DeleteConsumerGroup

func (s *EtcdStore) DeleteConsumerGroup(ctx context.Context, groupID string) error

DeleteConsumerGroup removes persisted consumer group metadata.

func (*EtcdStore) DeleteTopic

func (s *EtcdStore) DeleteTopic(ctx context.Context, name string) error

DeleteTopic updates the local snapshot so admin APIs behave consistently.

func (*EtcdStore) FetchConsumerGroup

func (s *EtcdStore) FetchConsumerGroup(ctx context.Context, groupID string) (*metadatapb.ConsumerGroup, error)

FetchConsumerGroup loads consumer group metadata from etcd.

func (*EtcdStore) FetchConsumerOffset

func (s *EtcdStore) FetchConsumerOffset(ctx context.Context, group, topic string, partition int32) (int64, string, error)

FetchConsumerOffset implements Store.FetchConsumerOffset.

func (*EtcdStore) FetchTopicConfig

func (s *EtcdStore) FetchTopicConfig(ctx context.Context, topic string) (*metadatapb.TopicConfig, error)

FetchTopicConfig loads topic configuration from etcd or falls back to defaults.

func (*EtcdStore) ListConsumerGroups

func (s *EtcdStore) ListConsumerGroups(ctx context.Context) ([]*metadatapb.ConsumerGroup, error)

ListConsumerGroups returns all persisted consumer groups.

func (*EtcdStore) ListConsumerOffsets

func (s *EtcdStore) ListConsumerOffsets(ctx context.Context) ([]ConsumerOffset, error)

ListConsumerOffsets returns all committed offsets stored in etcd.

func (*EtcdStore) Metadata

func (s *EtcdStore) Metadata(ctx context.Context, topics []string) (*ClusterMetadata, error)

Metadata delegates to the snapshot captured at startup (operator keeps it fresh).

func (*EtcdStore) NextOffset

func (s *EtcdStore) NextOffset(ctx context.Context, topic string, partition int32) (int64, error)

NextOffset reads the last committed offset from etcd and returns the next offset to assign.

func (*EtcdStore) PutConsumerGroup

func (s *EtcdStore) PutConsumerGroup(ctx context.Context, group *metadatapb.ConsumerGroup) error

PutConsumerGroup persists consumer group metadata in etcd.

func (*EtcdStore) UpdateOffsets

func (s *EtcdStore) UpdateOffsets(ctx context.Context, topic string, partition int32, lastOffset int64) error

UpdateOffsets stores the next offset (last + 1) so future producers pick up from there.

func (*EtcdStore) UpdateTopicConfig

func (s *EtcdStore) UpdateTopicConfig(ctx context.Context, cfg *metadatapb.TopicConfig) error

UpdateTopicConfig persists topic configuration into etcd.

type EtcdStoreConfig

type EtcdStoreConfig struct {
	// Endpoints lists the etcd endpoints to connect to.
	// NOTE(review): expected address format is not shown here — confirm against NewEtcdStore.
	Endpoints   []string
	// Username is the etcd user name.
	// NOTE(review): presumably optional when authentication is disabled — confirm.
	Username    string
	// Password is the password for Username.
	Password    string
	// DialTimeout bounds how long connecting to etcd may take.
	// NOTE(review): behavior for a zero value is not shown here — confirm.
	DialTimeout time.Duration
}

EtcdStoreConfig defines how we connect to etcd for metadata/offsets.

type InMemoryStore

type InMemoryStore struct {
	// contains filtered or unexported fields
}

InMemoryStore is a simple Store backed by in-process state. Useful for early development and tests.

func NewInMemoryStore

func NewInMemoryStore(state ClusterMetadata) *InMemoryStore

NewInMemoryStore builds an in-memory metadata store with the provided state.

func (*InMemoryStore) CommitConsumerOffset

func (s *InMemoryStore) CommitConsumerOffset(ctx context.Context, group, topic string, partition int32, offset int64, metadata string) error

CommitConsumerOffset implements Store.CommitConsumerOffset.

func (*InMemoryStore) CreatePartitions

func (s *InMemoryStore) CreatePartitions(ctx context.Context, topic string, partitionCount int32) error

CreatePartitions implements Store.CreatePartitions.

func (*InMemoryStore) CreateTopic

func (s *InMemoryStore) CreateTopic(ctx context.Context, spec TopicSpec) (*protocol.MetadataTopic, error)

CreateTopic implements Store.CreateTopic.

func (*InMemoryStore) DeleteConsumerGroup

func (s *InMemoryStore) DeleteConsumerGroup(ctx context.Context, groupID string) error

DeleteConsumerGroup implements Store.DeleteConsumerGroup.

func (*InMemoryStore) DeleteTopic

func (s *InMemoryStore) DeleteTopic(ctx context.Context, name string) error

DeleteTopic implements Store.DeleteTopic.

func (*InMemoryStore) FetchConsumerGroup

func (s *InMemoryStore) FetchConsumerGroup(ctx context.Context, groupID string) (*metadatapb.ConsumerGroup, error)

FetchConsumerGroup implements Store.FetchConsumerGroup.

func (*InMemoryStore) FetchConsumerOffset

func (s *InMemoryStore) FetchConsumerOffset(ctx context.Context, group, topic string, partition int32) (int64, string, error)

FetchConsumerOffset implements Store.FetchConsumerOffset.

func (*InMemoryStore) FetchTopicConfig

func (s *InMemoryStore) FetchTopicConfig(ctx context.Context, topic string) (*metadatapb.TopicConfig, error)

FetchTopicConfig implements Store.FetchTopicConfig.

func (*InMemoryStore) ListConsumerGroups

func (s *InMemoryStore) ListConsumerGroups(ctx context.Context) ([]*metadatapb.ConsumerGroup, error)

ListConsumerGroups implements Store.ListConsumerGroups.

func (*InMemoryStore) ListConsumerOffsets

func (s *InMemoryStore) ListConsumerOffsets(ctx context.Context) ([]ConsumerOffset, error)

ListConsumerOffsets implements Store.ListConsumerOffsets.

func (*InMemoryStore) Metadata

func (s *InMemoryStore) Metadata(ctx context.Context, topics []string) (*ClusterMetadata, error)

Metadata implements Store.Metadata.

func (*InMemoryStore) NextOffset

func (s *InMemoryStore) NextOffset(ctx context.Context, topic string, partition int32) (int64, error)

NextOffset implements Store.NextOffset.

func (*InMemoryStore) PutConsumerGroup

func (s *InMemoryStore) PutConsumerGroup(ctx context.Context, group *metadatapb.ConsumerGroup) error

PutConsumerGroup implements Store.PutConsumerGroup.

func (*InMemoryStore) Update

func (s *InMemoryStore) Update(state ClusterMetadata)

Update swaps the cluster metadata atomically.

func (*InMemoryStore) UpdateOffsets

func (s *InMemoryStore) UpdateOffsets(ctx context.Context, topic string, partition int32, lastOffset int64) error

UpdateOffsets implements Store.UpdateOffsets.

func (*InMemoryStore) UpdateTopicConfig

func (s *InMemoryStore) UpdateTopicConfig(ctx context.Context, cfg *metadatapb.TopicConfig) error

UpdateTopicConfig implements Store.UpdateTopicConfig.

type Store

type Store interface {
	// Metadata returns brokers, controller ID, and topics. When topics is non-empty,
	// the implementation should filter to that subset and omit missing topics.
	Metadata(ctx context.Context, topics []string) (*ClusterMetadata, error)
	// NextOffset returns the next offset to assign for a topic/partition.
	NextOffset(ctx context.Context, topic string, partition int32) (int64, error)
	// UpdateOffsets records the last persisted offset so future appends continue from there.
	// Callers pass the last offset written, not the next one to assign.
	UpdateOffsets(ctx context.Context, topic string, partition int32, lastOffset int64) error
	// CommitConsumerOffset persists a consumer group offset, together with the
	// caller-supplied metadata string, for the given topic partition.
	CommitConsumerOffset(ctx context.Context, group, topic string, partition int32, offset int64, metadata string) error
	// FetchConsumerOffset retrieves the committed offset for a consumer group partition.
	// NOTE(review): the string result is presumably the metadata passed to
	// CommitConsumerOffset — confirm against the implementations.
	FetchConsumerOffset(ctx context.Context, group, topic string, partition int32) (int64, string, error)
	// ListConsumerOffsets returns all committed consumer offsets.
	ListConsumerOffsets(ctx context.Context) ([]ConsumerOffset, error)
	// PutConsumerGroup persists consumer group metadata.
	PutConsumerGroup(ctx context.Context, group *metadatapb.ConsumerGroup) error
	// FetchConsumerGroup retrieves consumer group metadata for groupID.
	// NOTE(review): the result for an unknown group is not specified here — confirm.
	FetchConsumerGroup(ctx context.Context, groupID string) (*metadatapb.ConsumerGroup, error)
	// ListConsumerGroups returns all stored consumer group metadata.
	ListConsumerGroups(ctx context.Context) ([]*metadatapb.ConsumerGroup, error)
	// DeleteConsumerGroup removes consumer group metadata for groupID.
	DeleteConsumerGroup(ctx context.Context, groupID string) error
	// FetchTopicConfig returns the stored topic configuration.
	FetchTopicConfig(ctx context.Context, topic string) (*metadatapb.TopicConfig, error)
	// UpdateTopicConfig persists topic configuration updates.
	UpdateTopicConfig(ctx context.Context, cfg *metadatapb.TopicConfig) error
	// CreatePartitions expands a topic's partition count.
	// NOTE(review): partitionCount presumably is the new total rather than a delta — confirm.
	CreatePartitions(ctx context.Context, topic string, partitionCount int32) error
	// CreateTopic creates a new topic with the provided specification and
	// returns the resulting topic metadata.
	CreateTopic(ctx context.Context, spec TopicSpec) (*protocol.MetadataTopic, error)
	// DeleteTopic removes a topic and associated offsets.
	DeleteTopic(ctx context.Context, name string) error
}

Store exposes read and write access to the cluster metadata, offsets, consumer groups, and topic configuration used by Kafka protocol handlers.

type TopicSpec

type TopicSpec struct {
	// Name is the name of the topic to create.
	Name              string
	// NumPartitions is the number of partitions requested for the topic.
	NumPartitions     int32
	// ReplicationFactor is the replication factor requested for the topic.
	ReplicationFactor int16
}

TopicSpec describes a topic creation request.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL