Documentation ¶
Index ¶
- Variables
- func LoadMetrics(zk Handler, bm mapper.BrokerMetaMap) []error
- func PartitionMapFromZK(t []*regexp.Regexp, zk Handler) (*mapper.PartitionMap, error)
- type Config
- type ErrNoNode
- type Handler
- type KafkaConfig
- type KafkaConfigData
- type KafkaConfigKV
- type PartitionState
- type Reassignments
- type SimpleZooKeeperClient
- type Stub
- func (zk *Stub) AddBrokers(b map[int]mapper.BrokerMeta)
- func (zk *Stub) Children(p string) ([]string, error)
- func (zk *Stub) Close()
- func (zk *Stub) Create(p, d string) error
- func (zk *Stub) CreateSequential(a, b string) error
- func (zk *Stub) Delete(p string) error
- func (zk *Stub) Exists(p string) (bool, error)
- func (zk *Stub) Get(p string) ([]byte, error)
- func (zk *Stub) GetAllBrokerMeta(withMetrics bool) (mapper.BrokerMetaMap, []error)
- func (zk *Stub) GetAllPartitionMeta() (mapper.PartitionMetaMap, error)
- func (zk *Stub) GetBrokerMetrics() (mapper.BrokerMetricsMap, error)
- func (zk *Stub) GetPartitionMap(t string) (*mapper.PartitionMap, error)
- func (zk *Stub) GetPendingDeletion() ([]string, error)
- func (zk *Stub) GetReassignments() Reassignments
- func (zk *Stub) GetTopicConfig(t string) (*TopicConfig, error)
- func (zk *Stub) GetTopicMetadata(t string) (TopicMetadata, error)
- func (zk *Stub) GetTopicState(t string) (*mapper.TopicState, error)
- func (zk *Stub) GetTopicStateISR(t string) (TopicStateISR, error)
- func (zk *Stub) GetTopics(ts []*regexp.Regexp) ([]string, error)
- func (zk *Stub) GetUnderReplicated() ([]string, error)
- func (zk *Stub) InitRawClient() error
- func (zk *Stub) ListReassignments() (Reassignments, error)
- func (zk *Stub) MaxMetaAge() (time.Duration, error)
- func (zk *Stub) NextInt(p string) (int32, error)
- func (zk *Stub) Ready() bool
- func (zk *Stub) RemoveBrokers(ids []int)
- func (zk *Stub) Set(p, d string) error
- func (zk *Stub) UpdateKafkaConfig(c KafkaConfig) ([]bool, error)
- type StubZnode
- type TopicConfig
- type TopicMetadata
- type TopicStateISR
- type ZKHandler
- func (z *ZKHandler) Children(p string) ([]string, error)
- func (z *ZKHandler) Close()
- func (z *ZKHandler) Create(p string, d string) error
- func (z *ZKHandler) CreateSequential(p string, d string) error
- func (z *ZKHandler) Delete(p string) error
- func (z *ZKHandler) Exists(p string) (bool, error)
- func (z *ZKHandler) Get(p string) ([]byte, error)
- func (z *ZKHandler) GetAllBrokerMeta(withMetrics bool) (mapper.BrokerMetaMap, []error)
- func (z *ZKHandler) GetAllPartitionMeta() (mapper.PartitionMetaMap, error)
- func (z *ZKHandler) GetBrokerMetrics() (mapper.BrokerMetricsMap, error)
- func (z *ZKHandler) GetPartitionMap(t string) (*mapper.PartitionMap, error)
- func (z *ZKHandler) GetPendingDeletion() ([]string, error)
- func (z *ZKHandler) GetReassignments() Reassignments
- func (z *ZKHandler) GetTopicConfig(t string) (*TopicConfig, error)
- func (z *ZKHandler) GetTopicMetadata(t string) (TopicMetadata, error)
- func (z *ZKHandler) GetTopicState(t string) (*mapper.TopicState, error)
- func (z *ZKHandler) GetTopicStateISR(t string) (TopicStateISR, error)
- func (z *ZKHandler) GetTopics(ts []*regexp.Regexp) ([]string, error)
- func (z *ZKHandler) GetUnderReplicated() ([]string, error)
- func (z *ZKHandler) ListReassignments() (Reassignments, error)
- func (z *ZKHandler) MaxMetaAge() (time.Duration, error)
- func (z *ZKHandler) NextInt(p string) (int32, error)
- func (z *ZKHandler) Ready() bool
- func (z *ZKHandler) Set(p string, d string) error
- func (z *ZKHandler) UpdateKafkaConfig(c KafkaConfig) ([]bool, error)
Constants ¶
This section is empty.
Variables ¶
var ( // ErrInvalidKafkaConfigType error. ErrInvalidKafkaConfigType = errors.New("Invalid Kafka config type") )
Functions ¶
func LoadMetrics ¶ added in v4.2.0
func LoadMetrics(zk Handler, bm mapper.BrokerMetaMap) []error
LoadMetrics takes a Handler and fetches stored broker metrics, populating the BrokerMetaMap.
func PartitionMapFromZK ¶
PartitionMapFromZK takes a slice of regexp and finds all matching topics for each. A merged *PartitionMap of all matching topic maps is returned.
Types ¶
type Config ¶
Config holds initialization paramaters for a Handler. Connect is a ZooKeeper connect string. Prefix should reflect any prefix used for Kafka on the reference ZooKeeper cluster (excluding slashes). MetricsPrefix is the prefix used for broker metrics metadata persisted in ZooKeeper.
type ErrNoNode ¶
type ErrNoNode struct {
// contains filtered or unexported fields
}
ErrNoNode error type is specifically for Get method calls where the underlying error type is a zkclient.ErrNoNode.
type Handler ¶
type Handler interface { SimpleZooKeeperClient GetBrokerMetrics() (mapper.BrokerMetricsMap, error) GetTopicState(string) (*mapper.TopicState, error) GetTopicStateISR(string) (TopicStateISR, error) UpdateKafkaConfig(KafkaConfig) ([]bool, error) GetReassignments() Reassignments ListReassignments() (Reassignments, error) GetUnderReplicated() ([]string, error) GetPendingDeletion() ([]string, error) GetTopics([]*regexp.Regexp) ([]string, error) GetTopicConfig(string) (*TopicConfig, error) GetTopicMetadata(string) (TopicMetadata, error) GetAllBrokerMeta(bool) (mapper.BrokerMetaMap, []error) GetAllPartitionMeta() (mapper.PartitionMetaMap, error) MaxMetaAge() (time.Duration, error) GetPartitionMap(string) (*mapper.PartitionMap, error) }
Handler specifies an interface for common Kafka metadata retrieval and configuration methods.
func NewHandler ¶
NewHandler takes a *Config, performs any initialization and returns a Handler.
type KafkaConfig ¶
type KafkaConfig struct { Type string // Topic or broker. Name string // Entity name. Configs []KafkaConfigKV // Config KVs. }
KafkaConfig is used to issue configuration updates to either topics or brokers in ZooKeeper.
type KafkaConfigData ¶
type KafkaConfigData struct { Version int `json:"version"` Config map[string]string `json:"config"` }
KafkaConfigData is used for unmarshalling /config/<type>/<name> data from ZooKeeper.
func NewKafkaConfigData ¶
func NewKafkaConfigData() KafkaConfigData
NewKafkaConfigData creates a KafkaConfigData.
type KafkaConfigKV ¶
type KafkaConfigKV [2]string
KafkaConfigKV is a [2]string{key, value} representing a Kafka configuration.
type PartitionState ¶
type PartitionState struct { Version int `json:"version"` ControllerEpoch int `json:"controller_epoch"` Leader int `json:"leader"` LeaderEpoch int `json:"leader_epoch"` ISR []int `json:"isr"` }
PartitionState is used for unmarshalling json data from a partition state: e.g. /brokers/topics/some-topic/partitions/0/state
type Reassignments ¶
Reassignments is a map of topic:partition:brokers.
func (Reassignments) List ¶ added in v4.1.0
func (r Reassignments) List() []string
List returns a []string of topic names held in the Reassignments.
type SimpleZooKeeperClient ¶
type SimpleZooKeeperClient interface { Exists(string) (bool, error) Create(string, string) error CreateSequential(string, string) error Set(string, string) error Get(string) ([]byte, error) Delete(string) error Children(string) ([]string, error) NextInt(string) (int32, error) Close() Ready() bool }
SimpleZooKeeperClient is an interface that wraps a real ZooKeeper client, obscuring much of the API semantics that are unneeded for a ZooKeeper based Handler implementation.
type Stub ¶
type Stub struct {
// contains filtered or unexported fields
}
Stub stubs the Handler interface.
func (*Stub) AddBrokers ¶
func (zk *Stub) AddBrokers(b map[int]mapper.BrokerMeta)
AddBrokers takes a map of broker ID to BrokerMeta and adds it to the Stub mapper.BrokerMetaMap.
func (*Stub) CreateSequential ¶
CreateSequential stubs CreateSequential.
func (*Stub) GetAllBrokerMeta ¶
func (zk *Stub) GetAllBrokerMeta(withMetrics bool) (mapper.BrokerMetaMap, []error)
GetAllBrokerMeta stubs GetAllBrokerMeta.
func (*Stub) GetAllPartitionMeta ¶
func (zk *Stub) GetAllPartitionMeta() (mapper.PartitionMetaMap, error)
GetAllPartitionMeta stubs GetAllPartitionMeta.
func (*Stub) GetBrokerMetrics ¶
func (zk *Stub) GetBrokerMetrics() (mapper.BrokerMetricsMap, error)
GetBrokerMetrics stubs GetBrokerMetrics.
func (*Stub) GetPartitionMap ¶
func (zk *Stub) GetPartitionMap(t string) (*mapper.PartitionMap, error)
GetPartitionMap stubs Getmapper.PartitionMap.
func (*Stub) GetPendingDeletion ¶
func (*Stub) GetReassignments ¶
func (zk *Stub) GetReassignments() Reassignments
GetReassignments stubs GetReassignments.
func (*Stub) GetTopicConfig ¶
func (zk *Stub) GetTopicConfig(t string) (*TopicConfig, error)
GetTopicConfig stubs GetTopicConfig.
func (*Stub) GetTopicMetadata ¶
func (zk *Stub) GetTopicMetadata(t string) (TopicMetadata, error)
GetTopicMetadata stubs GetTopicMetadata.
func (*Stub) GetTopicState ¶
func (zk *Stub) GetTopicState(t string) (*mapper.TopicState, error)
GetTopicState stubs GetTopicState.
func (*Stub) GetTopicStateISR ¶
func (zk *Stub) GetTopicStateISR(t string) (TopicStateISR, error)
GetTopicStateISR stubs GetTopicStateISR.
func (*Stub) GetUnderReplicated ¶
func (*Stub) InitRawClient ¶
InitRawClient stubs InitRawClient.
func (*Stub) ListReassignments ¶
func (zk *Stub) ListReassignments() (Reassignments, error)
ListReassignments stubs ListReassignments.
func (*Stub) MaxMetaAge ¶
MaxMetaAge stubs MaxMetaAge.
func (*Stub) RemoveBrokers ¶
RemoveBrokers removes the specified IDs from the mapper.BrokerMetaMap. This can be used in testing to simulate brokers leaving the cluster.
func (*Stub) UpdateKafkaConfig ¶
func (zk *Stub) UpdateKafkaConfig(c KafkaConfig) ([]bool, error)
UpdateKafkaConfig stubs UpdateKafkaConfig.
type StubZnode ¶
type StubZnode struct {
// contains filtered or unexported fields
}
StubZnode stubs a ZooKeeper znode.
type TopicConfig ¶
TopicConfig is used for unmarshalling /config/topics/<topic> from ZooKeeper.
type TopicMetadata ¶
type TopicMetadata struct { Version int Name string TopicID string `json:"topic_id"` Partitions map[int][]int AddingReplicas map[int][]int `json:"adding_replicas"` RemovingReplicas map[int][]int `json:"removing_replicas"` }
TopicMetadata holds the topic data found in the /brokers/topics/<topic> znode. This is designed for the version 3 fields present in Kafka version ~2.4+.
func (TopicMetadata) Reassignments ¶
func (tm TopicMetadata) Reassignments() Reassignments
Reassignments returns a Reassignments from a given topics TopicMetadata.
type TopicStateISR ¶
type TopicStateISR map[string]PartitionState
TopicStateISR is a map of partition numbers to PartitionState.
type ZKHandler ¶
type ZKHandler struct { Connect string Prefix string MetricsPrefix string // contains filtered or unexported fields }
ZKHandler implements the Handler interface for real ZooKeeper clusters.
func (*ZKHandler) Children ¶
Children takes a path p and returns a list of child znodes and an error if encountered.
func (*ZKHandler) Close ¶
func (z *ZKHandler) Close()
Close calls close on the *ZKHandler. Any additional shutdown cleanup or other tasks should be performed here.
func (*ZKHandler) Create ¶
Create creates the provided path p with the data from the provided string d and returns an error if encountered.
func (*ZKHandler) CreateSequential ¶
CreateSequential takes a path p and data d and creates a sequential znode at p with data d. An error is returned if encountered.
func (*ZKHandler) Exists ¶
Exists takes a path p and returns a bool as to whether the path exists and an error if encountered.
func (*ZKHandler) GetAllBrokerMeta ¶
func (z *ZKHandler) GetAllBrokerMeta(withMetrics bool) (mapper.BrokerMetaMap, []error)
GetAllBrokerMeta looks up all registered Kafka brokers and returns their metadata as a mapper.BrokerMetaMap. A withMetrics bool param determines whether we additionally want to fetch stored broker metrics.
func (*ZKHandler) GetAllPartitionMeta ¶
func (z *ZKHandler) GetAllPartitionMeta() (mapper.PartitionMetaMap, error)
GetAllPartitionMeta fetches partition metadata stored in Zookeeper.
func (*ZKHandler) GetBrokerMetrics ¶ added in v4.2.0
func (z *ZKHandler) GetBrokerMetrics() (mapper.BrokerMetricsMap, error)
GetBrokerMetrics fetches broker metrics stored in ZooKeeper and returns a BrokerMetricsMap and an error if encountered.
func (*ZKHandler) GetPartitionMap ¶
func (z *ZKHandler) GetPartitionMap(t string) (*mapper.PartitionMap, error)
GetPartitionMap takes a topic name. If the topic exists, the state of the topic is fetched and returned as a *PartitionMap.
func (*ZKHandler) GetPendingDeletion ¶
GetPendingDeletion returns any topics pending deletion.
func (*ZKHandler) GetReassignments ¶
func (z *ZKHandler) GetReassignments() Reassignments
GetReassignments looks up any ongoing topic reassignments and returns the data as a Reassignments.
func (*ZKHandler) GetTopicConfig ¶
func (z *ZKHandler) GetTopicConfig(t string) (*TopicConfig, error)
GetTopicConfig takes a topic name. If the topic exists, the topic config is returned as a *TopicConfig.
func (*ZKHandler) GetTopicMetadata ¶
func (z *ZKHandler) GetTopicMetadata(t string) (TopicMetadata, error)
GetTopicMetadata takes a topic name. If the topic exists, the topic metadata is returned as a TopicMetadata.
func (*ZKHandler) GetTopicState ¶
func (z *ZKHandler) GetTopicState(t string) (*mapper.TopicState, error)
GetTopicState takes a topic name. If the topic exists, the topic state is returned as a *mapper.TopicState.
func (*ZKHandler) GetTopicStateISR ¶
func (z *ZKHandler) GetTopicStateISR(t string) (TopicStateISR, error)
GetTopicStateISR takes a topic name. If the topic exists, the topic state is returned as a TopicStateISR. GetTopicStateCurrentISR differs from GetTopicState in that the actual, current broker IDs in the ISR are returned for each partition. This method is more expensive due to the need for a call per partition to ZK.
func (*ZKHandler) GetTopics ¶
GetTopics takes a []*regexp.Regexp and returns a []string of all topic names that match any of the provided regex.
func (*ZKHandler) GetUnderReplicated ¶
GetUnderReplicated returns a []string of all under-replicated topics.
func (*ZKHandler) ListReassignments ¶
func (z *ZKHandler) ListReassignments() (Reassignments, error)
ListReassignments looks up any ongoing topic reassignments and returns the data as a Reassignments. ListReassignments is a KIP-455 compatible call for Kafka 2.4 and Kafka cli tools 2.6.
func (*ZKHandler) MaxMetaAge ¶
MaxMetaAge returns the greatest age between the partitionmeta and brokermetrics stuctures.
func (*ZKHandler) NextInt ¶
NextInt works as an atomic int generator. It does this by setting nil value to path p and returns the znode version.
func (*ZKHandler) Ready ¶
Ready returns true if the client is in either state StateConnected or StateHasSession. See https://godoc.org/github.com/go-zookeeper/zk#State.
func (*ZKHandler) UpdateKafkaConfig ¶
func (z *ZKHandler) UpdateKafkaConfig(c KafkaConfig) ([]bool, error)
UpdateKafkaConfig takes a KafkaConfig with key value pairs of entity config. If the config is changed, a persistent sequential znode is also written to propagate changes (via watches) to all Kafka brokers. This is a Kafka specific behavior; further references are available from the Kafka codebase. A []bool is returned indicating whether the config of the respective index was changed (if a config is updated to the existing value, 'false' is returned) along with any errors encountered. If a config value is set to an empty string (""), the entire config key itself is deleted. This was a convenient method to combine update/delete into a single func.