kafkazk

package
v3.11.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jul 12, 2021 License: Apache-2.0 Imports: 15 Imported by: 1

Documentation

Constants

const (
	// StubBrokerID is the platform int max.
	StubBrokerID int = int(^uint(0) >> 1)
)
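
The expression used for StubBrokerID can be checked against the standard library. The following is a minimal sketch; `stubBrokerID` is a local copy of the documented expression, not the package constant itself.

```go
package main

import (
	"fmt"
	"math"
)

// stubBrokerID is a local copy of the documented expression: every bit of a
// uint is set, then the value is shifted right once, which yields the largest
// positive int on the current platform.
const stubBrokerID int = int(^uint(0) >> 1)

func main() {
	// math.MaxInt requires Go 1.17 or newer.
	fmt.Println(stubBrokerID == math.MaxInt) // prints "true"
}
```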

Variables

var (
	// ErrNoBrokers error.
	ErrNoBrokers = errors.New("No additional brokers that meet Constraints")
	// ErrInvalidSelectionMethod error.
	ErrInvalidSelectionMethod = errors.New("Invalid selection method")
)
var (
	// ErrInvalidKafkaConfigType error.
	ErrInvalidKafkaConfigType = errors.New("Invalid Kafka config type")
)

Functions

func WriteMap

func WriteMap(pm *PartitionMap, path string) error

WriteMap takes a *PartitionMap and writes a JSON text file to the provided path.

Types

type Broker

type Broker struct {
	ID          int
	Locality    string
	Used        int
	StorageFree float64
	Replace     bool
	Missing     bool
	New         bool
}

Broker associates metadata with a real broker by ID.

func (Broker) Copy

func (b Broker) Copy() Broker

Copy returns a copy of a Broker.

type BrokerFilterFn

type BrokerFilterFn func(*Broker) bool

BrokerFilterFn is a filter function for BrokerList and BrokerMap types.

var AllBrokersFn BrokerFilterFn = func(b *Broker) bool { return true }

AllBrokersFn returns all brokers.

type BrokerList

type BrokerList []*Broker

BrokerList is a slice of brokers for sorting by used count.

func (BrokerList) BestCandidate

func (b BrokerList) BestCandidate(c *Constraints, by string, p int64) (*Broker, error)

TODO deprecate. BestCandidate takes a *Constraints, selection method and pass / iteration number (for use as a seed value for pseudo-random number generation) and returns the most suitable broker.

func (BrokerList) Filter

func (b BrokerList) Filter(f BrokerFilterFn) BrokerList

Filter returns a BrokerList of brokers that return true as an input to function f.
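
As an illustration of how a BrokerFilterFn is applied, here is a small self-contained sketch. It does not import kafkazk: the types are local copies of the documented definitions (with only two Broker fields), and `filter` is a hypothetical stand-in for the Filter method.

```go
package main

import "fmt"

// Broker mirrors a subset of the documented Broker fields.
type Broker struct {
	ID      int
	Replace bool
}

// BrokerFilterFn and BrokerList mirror the documented type definitions.
type BrokerFilterFn func(*Broker) bool
type BrokerList []*Broker

// filter is a stand-in for BrokerList.Filter: it keeps every broker for
// which f returns true and leaves the input list untouched.
func filter(bl BrokerList, f BrokerFilterFn) BrokerList {
	out := BrokerList{}
	for _, b := range bl {
		if f(b) {
			out = append(out, b)
		}
	}
	return out
}

func main() {
	bl := BrokerList{{ID: 1001}, {ID: 1002, Replace: true}, {ID: 1003}}
	// Keep only brokers that are not marked for replacement.
	keep := filter(bl, func(b *Broker) bool { return !b.Replace })
	for _, b := range keep {
		fmt.Println(b.ID)
	}
}
```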

func (BrokerList) SortByCount

func (b BrokerList) SortByCount()

SortByCount sorts the BrokerList by Used values.

func (BrokerList) SortByID

func (b BrokerList) SortByID()

SortByID sorts the BrokerList by ID values.

func (BrokerList) SortByStorage

func (b BrokerList) SortByStorage()

SortByStorage sorts the BrokerList by StorageFree values.

func (BrokerList) SortPseudoShuffle

func (b BrokerList) SortPseudoShuffle(seed int64)

SortPseudoShuffle takes a BrokerList and performs a sort by count. For each sequence of brokers with equal counts, the sub-slice is pseudo-randomly shuffled using the provided seed value.
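
One way to picture the sort-then-shuffle behavior is the sketch below. It is not taken from the package source: `sortPseudoShuffle` and the two-field Broker are local stand-ins, and the exact shuffling routine the library uses may differ.

```go
package main

import (
	"fmt"
	"math/rand"
	"sort"
)

// Broker mirrors a subset of the documented Broker fields.
type Broker struct {
	ID   int
	Used int
}

// sortPseudoShuffle sorts by Used, then shuffles each run of equal Used
// values with a generator seeded from seed, so a given input and seed
// always produce the same order.
func sortPseudoShuffle(bl []*Broker, seed int64) {
	sort.SliceStable(bl, func(i, j int) bool { return bl[i].Used < bl[j].Used })
	rng := rand.New(rand.NewSource(seed))
	for start := 0; start < len(bl); {
		// Find the end of the run of brokers sharing the same count.
		end := start + 1
		for end < len(bl) && bl[end].Used == bl[start].Used {
			end++
		}
		run := bl[start:end]
		rng.Shuffle(len(run), func(i, j int) { run[i], run[j] = run[j], run[i] })
		start = end
	}
}

func main() {
	bl := []*Broker{{ID: 1001, Used: 2}, {ID: 1002, Used: 1}, {ID: 1003, Used: 2}, {ID: 1004, Used: 1}}
	sortPseudoShuffle(bl, 42)
	for _, b := range bl {
		fmt.Println(b.ID, b.Used)
	}
}
```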

type BrokerMap

type BrokerMap map[int]*Broker

BrokerMap holds a mapping of broker IDs to *Broker.

func BrokerMapFromPartitionMap

func BrokerMapFromPartitionMap(pm *PartitionMap, bm BrokerMetaMap, force bool) BrokerMap

BrokerMapFromPartitionMap creates a BrokerMap from a *PartitionMap.

func NewBrokerMap

func NewBrokerMap() BrokerMap

NewBrokerMap returns a new BrokerMap.

func (BrokerMap) AboveMean

func (b BrokerMap) AboveMean(d float64, f func() float64) []int

AboveMean returns a sorted []int of broker IDs that are above the mean by d percent (0.00 < d). The mean type is provided as a function f.

func (BrokerMap) BelowMean

func (b BrokerMap) BelowMean(d float64, f func() float64) []int

BelowMean returns a sorted []int of broker IDs that are below the mean by d percent (0.00 < d). The mean type is provided as a function f.
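
The threshold logic of AboveMean and BelowMean can be sketched as follows. This is an illustration under assumptions, not the package code: free storage is held in a plain map, d is read as a fraction (0.20 for 20%), and `aboveMean`, `belowMean` and `arithmeticMean` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"sort"
)

// aboveMean returns the sorted IDs whose value exceeds the mean returned by
// f by more than the fraction d. A non-positive d selects nothing.
func aboveMean(free map[int]float64, d float64, f func() float64) []int {
	ids := []int{}
	m := f()
	if d <= 0 {
		return ids
	}
	for id, v := range free {
		if (v-m)/m > d {
			ids = append(ids, id)
		}
	}
	sort.Ints(ids)
	return ids
}

// belowMean is the mirror image: IDs whose value falls short of the mean by
// more than the fraction d.
func belowMean(free map[int]float64, d float64, f func() float64) []int {
	ids := []int{}
	m := f()
	if d <= 0 {
		return ids
	}
	for id, v := range free {
		if (m-v)/m > d {
			ids = append(ids, id)
		}
	}
	sort.Ints(ids)
	return ids
}

// arithmeticMean returns a closure suitable for the f parameter.
func arithmeticMean(free map[int]float64) func() float64 {
	return func() float64 {
		var sum float64
		for _, v := range free {
			sum += v
		}
		return sum / float64(len(free))
	}
}

func main() {
	free := map[int]float64{1001: 100, 1002: 200, 1003: 300}
	f := arithmeticMean(free)
	fmt.Println(aboveMean(free, 0.20, f), belowMean(free, 0.20, f)) // prints "[1003] [1001]"
}
```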

func (BrokerMap) Copy

func (b BrokerMap) Copy() BrokerMap

Copy returns a copy of a BrokerMap.

func (BrokerMap) Filter

func (b BrokerMap) Filter(f BrokerFilterFn) BrokerMap

Filter returns a BrokerMap of brokers that return true as an input to function f.

func (BrokerMap) HMean

func (b BrokerMap) HMean() float64

HMean returns the harmonic mean of broker storage free.

func (BrokerMap) List

func (b BrokerMap) List() BrokerList

List takes a BrokerMap and returns a BrokerList.

func (BrokerMap) Mean

func (b BrokerMap) Mean() float64

Mean returns the arithmetic mean of broker storage free.
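
For reference, the difference between the arithmetic mean (Mean) and the harmonic mean (HMean) can be shown on a plain slice of free-storage values. The helpers below are hypothetical and only implement the textbook formulas; how the package treats special entries such as the stub broker is not covered.

```go
package main

import "fmt"

// mean returns the arithmetic mean: the sum divided by the count.
// An empty slice yields NaN because of the 0/0 division.
func mean(vals []float64) float64 {
	var sum float64
	for _, v := range vals {
		sum += v
	}
	return sum / float64(len(vals))
}

// hmean returns the harmonic mean: the count divided by the sum of
// reciprocals. It is pulled toward the smaller values more strongly than
// the arithmetic mean.
func hmean(vals []float64) float64 {
	var sum float64
	for _, v := range vals {
		sum += 1 / v
	}
	return float64(len(vals)) / sum
}

func main() {
	free := []float64{100, 200, 400}
	fmt.Printf("mean=%.2f hmean=%.2f\n", mean(free), hmean(free))
}
```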

func (BrokerMap) MinMax

func (b BrokerMap) MinMax() (float64, float64)

func (BrokerMap) StorageDiff

func (b BrokerMap) StorageDiff(b2 BrokerMap) map[int][2]float64

StorageDiff takes two BrokerMaps and returns a per broker ID diff in storage as a [2]float64: [absolute, percentage] diff.

func (BrokerMap) StorageRange

func (b BrokerMap) StorageRange() float64

StorageRange returns the range of free storage for all brokers in the BrokerMap.

func (BrokerMap) StorageRangeSpread

func (b BrokerMap) StorageRangeSpread() float64

StorageRangeSpread returns the range spread of free storage for all brokers in the BrokerMap.

func (BrokerMap) StorageStdDev

func (b BrokerMap) StorageStdDev() float64

StorageStdDev returns the standard deviation of free storage for all brokers in the BrokerMap.

func (BrokerMap) SubStorage

func (b BrokerMap) SubStorage(pm *PartitionMap, pmm PartitionMetaMap, f func(*Broker) bool) error

SubStorage takes a PartitionMap, PartitionMetaMap, and a function. For all brokers that return true as an input to function f, the size of all partitions held is added back to the broker StorageFree value.

func (BrokerMap) SubstitutionAffinities

func (b BrokerMap) SubstitutionAffinities(pm *PartitionMap) (SubstitutionAffinities, error)

SubstitutionAffinities finds all brokers marked for replacement and for each broker, it creates an exclusive association with a newly provided broker. In the rebuild stage, each to-be-replaced broker will be only replaced with the affinity it's associated with. A given new broker can only be an affinity for a single outgoing broker. An error is returned if a complete mapping of affinities cannot be constructed (e.g. two brokers are marked for replacement but only one new replacement was provided and substitution affinities is enabled).

func (BrokerMap) Update

func (b BrokerMap) Update(bl []int, bm BrokerMetaMap) (*BrokerStatus, <-chan string)

Update takes a []int of broker IDs and a BrokerMetaMap, then adds them to the BrokerMap, returning a *BrokerStatus with counts of brokers marked for replacement, newly included, and not found in ZooKeeper. Additionally, a channel of messages describing the changes is returned.

type BrokerMeta

type BrokerMeta struct {
	StorageFree       float64 // In bytes.
	MetricsIncomplete bool
	// Metadata from ZooKeeper.
	ListenerSecurityProtocolMap map[string]string `json:"listener_security_protocol_map"`
	Endpoints                   []string          `json:"endpoints"`
	Rack                        string            `json:"rack"`
	JMXPort                     int               `json:"jmx_port"`
	Host                        string            `json:"host"`
	Timestamp                   string            `json:"timestamp"`
	Port                        int               `json:"port"`
	Version                     int               `json:"version"`
}

BrokerMeta holds metadata that describes a broker, used in satisfying constraints.

func (BrokerMeta) Copy added in v3.11.0

func (bm BrokerMeta) Copy() BrokerMeta

Copy returns a copy of a BrokerMeta.

type BrokerMetaMap

type BrokerMetaMap map[int]*BrokerMeta

BrokerMetaMap is a map of broker IDs to BrokerMeta metadata fetched from ZooKeeper. Currently, just the rack field is retrieved.

func (BrokerMetaMap) Copy added in v3.11.0

func (bmm BrokerMetaMap) Copy() BrokerMetaMap

Copy returns a copy of a BrokerMetaMap.

type BrokerMetrics

type BrokerMetrics struct {
	StorageFree float64
}

BrokerMetrics holds broker metric data fetched from ZK.

type BrokerMetricsMap

type BrokerMetricsMap map[int]*BrokerMetrics

BrokerMetricsMap holds a mapping of broker ID to BrokerMetrics.

type BrokerStatus

type BrokerStatus struct {
	New         int
	Missing     int
	OldMissing  int
	Replace     int
	RackMissing int
}

BrokerStatus summarizes change counts from an input and output broker list.

func (BrokerStatus) Changes

func (bs BrokerStatus) Changes() bool

Changes returns a bool that indicates whether the BrokerStatus values represent a change in brokers.

type BrokerUseStats

type BrokerUseStats struct {
	ID       int
	Leader   int
	Follower int
}

BrokerUseStats holds counts of partition ownership.

type BrokerUseStatsList

type BrokerUseStatsList []*BrokerUseStats

BrokerUseStatsList is a slice of *BrokerUseStats.

func (BrokerUseStatsList) Len

func (b BrokerUseStatsList) Len() int

func (BrokerUseStatsList) Less

func (b BrokerUseStatsList) Less(i, j int) bool

func (BrokerUseStatsList) Swap

func (b BrokerUseStatsList) Swap(i, j int)

type BrokerUseStatsMap

type BrokerUseStatsMap map[int]*BrokerUseStats

BrokerUseStatsMap is a map of IDs to *BrokerUseStats.

func (BrokerUseStatsMap) List

List returns a BrokerUseStatsList from a BrokerUseStatsMap.

type Config

type Config struct {
	Connect       string
	Prefix        string
	MetricsPrefix string
}

Config holds initialization parameters for a Handler. Connect is a ZooKeeper connect string. Prefix should reflect any prefix used for Kafka on the reference ZooKeeper cluster (excluding slashes). MetricsPrefix is the prefix used for broker metrics metadata persisted in ZooKeeper.

type Constraints

type Constraints struct {
	// contains filtered or unexported fields
}

Constraints holds a map of IDs and locality key-values.

func MergeConstraints

func MergeConstraints(bl BrokerList) *Constraints

TODO deprecate. MergeConstraints takes a brokerlist and builds a *Constraints by merging the attributes of all brokers from the supplied list.

func NewConstraints

func NewConstraints() *Constraints

NewConstraints returns an empty *Constraints.

func (*Constraints) Add

func (c *Constraints) Add(b *Broker)

Add takes a *Broker and adds its attributes to the *Constraints. The requestSize is also subtracted from the *Broker.StorageFree.

func (*Constraints) MergeConstraints

func (c *Constraints) MergeConstraints(bl BrokerList)

MergeConstraints takes a brokerlist and updates the *Constraints by merging the attributes of all brokers from the supplied list.

func (*Constraints) SelectBroker

func (c *Constraints) SelectBroker(b BrokerList, p ConstraintsParams) (*Broker, error)

SelectBroker takes a BrokerList and a ConstraintsParams and selects the most suitable broker that passes all specified constraints.

type ConstraintsParams

type ConstraintsParams struct {
	SelectorMethod   string
	MinUniqueRackIDs int
	RequestSize      float64
	SeedVal          int64
}

ConstraintsParams holds parameters for the SelectBroker method.

type DegreeDistribution

type DegreeDistribution struct {
	// Relationships is an adjacency list
	// where an edge between brokers is defined as
	// a common occupancy in at least one replica set.
	// For instance, given the replica set [1001,1002,1003],
	// ID 1002 has a relationship with 1001 and 1003.
	Relationships map[int]map[int]struct{}
}

DegreeDistribution counts broker to broker relationships.

func NewDegreeDistribution

func NewDegreeDistribution() DegreeDistribution

NewDegreeDistribution returns a new DegreeDistribution.

func (DegreeDistribution) Add

func (dd DegreeDistribution) Add(nodes []int)

Add takes a []int of broker IDs representing a replica set and updates the adjacency lists for each broker in the set.

func (DegreeDistribution) Count

func (dd DegreeDistribution) Count(n int) int

Count takes a node ID and returns its degree: the number of other nodes it has a relationship with.
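
The adjacency-list idea behind DegreeDistribution can be sketched with a local copy of the struct. The lowercase `add` and `count` methods are stand-ins written for this example and may not match the package implementation in detail.

```go
package main

import "fmt"

// DegreeDistribution mirrors the documented struct.
type DegreeDistribution struct {
	Relationships map[int]map[int]struct{}
}

// add records an edge between every pair of distinct brokers in the
// replica set.
func (dd DegreeDistribution) add(nodes []int) {
	for _, a := range nodes {
		if _, ok := dd.Relationships[a]; !ok {
			dd.Relationships[a] = map[int]struct{}{}
		}
		for _, b := range nodes {
			if a != b {
				dd.Relationships[a][b] = struct{}{}
			}
		}
	}
}

// count returns the number of distinct brokers that n shares at least one
// replica set with. Unknown IDs yield 0.
func (dd DegreeDistribution) count(n int) int {
	return len(dd.Relationships[n])
}

func main() {
	dd := DegreeDistribution{Relationships: map[int]map[int]struct{}{}}
	dd.add([]int{1001, 1002, 1003})
	dd.add([]int{1002, 1004})
	// 1002 shares a replica set with 1001, 1003 and 1004.
	fmt.Println(dd.count(1002)) // prints "3"
}
```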

func (DegreeDistribution) Stats

Stats returns a DegreeDistributionStats.

type DegreeDistributionStats

type DegreeDistributionStats struct {
	Min float64
	Max float64
	Avg float64
}

DegreeDistributionStats holds general statistical information describing the DegreeDistribution counts.

type ErrNoNode

type ErrNoNode struct {
	// contains filtered or unexported fields
}

ErrNoNode error type is specifically for Get method calls where the underlying error type is a zkclient.ErrNoNode.

func (ErrNoNode) Error

func (e ErrNoNode) Error() string

type Handler

type Handler interface {
	Exists(string) (bool, error)
	Create(string, string) error
	CreateSequential(string, string) error
	Set(string, string) error
	Get(string) ([]byte, error)
	Delete(string) error
	Children(string) ([]string, error)
	NextInt(string) (int32, error)
	Close()
	Ready() bool
	// Kafka specific.
	GetTopicState(string) (*TopicState, error)
	GetTopicStateISR(string) (TopicStateISR, error)
	UpdateKafkaConfig(KafkaConfig) ([]bool, error)
	GetReassignments() Reassignments
	GetUnderReplicated() ([]string, error)
	GetPendingDeletion() ([]string, error)
	GetTopics([]*regexp.Regexp) ([]string, error)
	GetTopicConfig(string) (*TopicConfig, error)
	GetAllBrokerMeta(bool) (BrokerMetaMap, []error)
	GetAllPartitionMeta() (PartitionMetaMap, error)
	MaxMetaAge() (time.Duration, error)
	GetPartitionMap(string) (*PartitionMap, error)
}

Handler provides basic ZooKeeper operations along with calls that return kafkazk types describing Kafka states.

func NewHandler

func NewHandler(c *Config) (Handler, error)

NewHandler takes a *Config, performs any initialization and returns a Handler.

type KafkaConfig

type KafkaConfig struct {
	Type    string          // Topic or broker.
	Name    string          // Entity name.
	Configs []KafkaConfigKV // Config KVs.
}

KafkaConfig is used to issue configuration updates to either topics or brokers in ZooKeeper.

type KafkaConfigData

type KafkaConfigData struct {
	Version int               `json:"version"`
	Config  map[string]string `json:"config"`
}

KafkaConfigData is used for unmarshalling /config/<type>/<name> data from ZooKeeper.

func NewKafkaConfigData

func NewKafkaConfigData() KafkaConfigData

NewKafkaConfigData creates a KafkaConfigData.

type KafkaConfigKV

type KafkaConfigKV [2]string

KafkaConfigKV is a [2]string{key, value} representing a Kafka configuration.

type Mappings

type Mappings map[int]map[string]PartitionList

Mappings is a mapping of broker IDs to currently held partitions as a PartitionList.

func NewMappings

func NewMappings() Mappings

NewMappings returns a new Mappings.

func (Mappings) LargestPartitions

func (m Mappings) LargestPartitions(id int, k int, pm PartitionMetaMap) (PartitionList, error)

LargestPartitions takes a broker ID and PartitionMetaMap and returns a PartitionList with the top k partitions by size for the provided broker ID.

func (Mappings) Remove

func (m Mappings) Remove(id int, p Partition) error

Remove takes a broker ID and partition and removes the mapping association.

type NoMappingForBroker

type NoMappingForBroker struct {
	// contains filtered or unexported fields
}

NoMappingForBroker error.

func (NoMappingForBroker) Error

func (e NoMappingForBroker) Error() string

type NoMappingForTopic

type NoMappingForTopic struct {
	// contains filtered or unexported fields
}

NoMappingForTopic error.

func (NoMappingForTopic) Error

func (e NoMappingForTopic) Error() string

type Partition

type Partition struct {
	Topic     string `json:"topic"`
	Partition int    `json:"partition"`
	Replicas  []int  `json:"replicas"`
}

Partition represents the Kafka partition structure.

func (Partition) Equal

func (p Partition) Equal(p2 Partition) bool

Equal defines equality between two Partition objects as an equality of topic, partition and replicas.

type PartitionList

type PartitionList []Partition

PartitionList is a []Partition.

func (PartitionList) Len

func (p PartitionList) Len() int

func (PartitionList) Less

func (p PartitionList) Less(i, j int) bool

func (PartitionList) SortBySize

func (p PartitionList) SortBySize(m PartitionMetaMap)

SortBySize takes a PartitionMetaMap and sorts the PartitionList by partition size.

func (PartitionList) Swap

func (p PartitionList) Swap(i, j int)

type PartitionMap

type PartitionMap struct {
	Version    int           `json:"version"`
	Partitions PartitionList `json:"partitions"`
}

PartitionMap represents the Kafka partition map structure.

func NewPartitionMap

func NewPartitionMap(opts ...PartitionMapOpt) *PartitionMap

NewPartitionMap returns an empty *PartitionMap with optionally provided PartitionMapOpts.

func PartitionMapFromString

func PartitionMapFromString(s string) (*PartitionMap, error)

PartitionMapFromString takes a JSON-encoded string and returns a *PartitionMap.
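
The input format follows from the JSON tags on Partition and PartitionMap. The sketch below decodes such a string with encoding/json using local copies of the two structs; `partitionMapFromString` is a stand-in, and any extra validation the real function performs is not shown.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Partition and PartitionMap mirror the documented structs and JSON tags.
type Partition struct {
	Topic     string `json:"topic"`
	Partition int    `json:"partition"`
	Replicas  []int  `json:"replicas"`
}

type PartitionMap struct {
	Version    int         `json:"version"`
	Partitions []Partition `json:"partitions"`
}

// partitionMapFromString decodes a JSON partition map into a *PartitionMap.
func partitionMapFromString(s string) (*PartitionMap, error) {
	pm := &PartitionMap{}
	if err := json.Unmarshal([]byte(s), pm); err != nil {
		return nil, err
	}
	return pm, nil
}

func main() {
	in := `{"version":1,"partitions":[
		{"topic":"test","partition":0,"replicas":[1001,1002]},
		{"topic":"test","partition":1,"replicas":[1002,1003]}]}`
	pm, err := partitionMapFromString(in)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(pm.Partitions), pm.Partitions[1].Replicas) // prints "2 [1002 1003]"
}
```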

func PartitionMapFromZK

func PartitionMapFromZK(t []*regexp.Regexp, zk Handler) (*PartitionMap, error)

PartitionMapFromZK takes a slice of regexp and finds all matching topics for each. A merged *PartitionMap of all matching topic maps is returned.

func (*PartitionMap) Copy

func (pm *PartitionMap) Copy() *PartitionMap

Copy returns a copy of a *PartitionMap.

func (*PartitionMap) DegreeDistribution

func (pm *PartitionMap) DegreeDistribution() DegreeDistribution

DegreeDistribution returns the DegreeDistribution for the PartitionMap.

func (*PartitionMap) Equal

func (pm *PartitionMap) Equal(pm2 *PartitionMap) (bool, error)

Equal checks the equality between two partition maps. Equality requires that the total order is exactly the same.

func (*PartitionMap) LocalitiesAvailable

func (pm *PartitionMap) LocalitiesAvailable(bm BrokerMap, b *Broker) []string

LocalitiesAvailable takes a broker map and broker and returns a []string of localities that are unused by any of the brokers in any replica sets that the reference broker was found in. This is done by building a set of all localities observed across all replica sets and a set of all localities observed in replica sets containing the reference broker, then returning the diff.

func (*PartitionMap) Mappings

func (pm *PartitionMap) Mappings() Mappings

Mappings returns a Mappings from a *PartitionMap.

func (*PartitionMap) OptimizeLeaderFollower

func (pm *PartitionMap) OptimizeLeaderFollower()

OptimizeLeaderFollower is a simple leadership optimization algorithm that iterates over each partition's replica set and sorts brokers according to their leader/follower position ratio, ascending. The idea is that if a broker has a high leader/follower ratio, it should go further down the replica list. This ratio is recalculated at each replica set visited to avoid extreme skew.

func (*PartitionMap) Rebuild

func (pm *PartitionMap) Rebuild(params RebuildParams) (*PartitionMap, []error)

Rebuild takes a RebuildParams holding, among other fields, a BrokerMap and rebuild strategy. It then traverses the partition map, replacing brokers marked for removal with the best available candidate based on the selected rebuild strategy. A rebuilt *PartitionMap and a []error are returned.

func (*PartitionMap) ReplicaSets

func (pm *PartitionMap) ReplicaSets(t string) ReplicaSets

ReplicaSets takes a topic name and returns a ReplicaSets.

func (*PartitionMap) SetReplication

func (pm *PartitionMap) SetReplication(r int)

SetReplication ensures that all replica sets are set to the replication factor r. Sets exceeding r are truncated, sets below r are extended with stub brokers.
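
The truncate-or-extend rule can be illustrated on a single replica set. In this sketch `setReplication` is a hypothetical helper operating on one []int, and `stubBrokerID` is a local copy of the StubBrokerID expression.

```go
package main

import "fmt"

// stubBrokerID is a local copy of the documented StubBrokerID value.
const stubBrokerID int = int(^uint(0) >> 1)

// setReplication returns a replica set of exactly r entries: the first r
// replicas are kept, and any missing slots are filled with the stub ID.
// It assumes r >= 0.
func setReplication(replicas []int, r int) []int {
	out := make([]int, 0, r)
	for i := 0; i < r; i++ {
		if i < len(replicas) {
			out = append(out, replicas[i])
		} else {
			out = append(out, stubBrokerID)
		}
	}
	return out
}

func main() {
	fmt.Println(setReplication([]int{1001, 1002, 1003}, 2)) // truncated to [1001 1002]
	fmt.Println(setReplication([]int{1001}, 3))             // extended with two stub IDs
}
```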

func (*PartitionMap) Strip

func (pm *PartitionMap) Strip() *PartitionMap

Strip takes a PartitionMap and returns a copy where all broker ID references are replaced with the stub broker (ID == StubBrokerID) with the Replace field set to true. This ensures that the entire map is rebuilt, even if the provided broker list matches what's already in the map.

func (*PartitionMap) Topics

func (pm *PartitionMap) Topics() []string

Topics returns a []string of topic names held in the PartitionMap.

func (*PartitionMap) UseStats

func (pm *PartitionMap) UseStats() BrokerUseStatsMap

UseStats returns a map of broker IDs to BrokerUseStats; each contains a count of leader and follower partition assignments.
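
A rough sketch of leader and follower counting follows. It assumes the usual Kafka convention that the first ID in a replica set is the (preferred) leader and the rest are followers; `useStats` is a hypothetical helper working on bare replica sets rather than on a *PartitionMap.

```go
package main

import "fmt"

// BrokerUseStats mirrors the documented struct.
type BrokerUseStats struct {
	ID       int
	Leader   int
	Follower int
}

// useStats counts, per broker ID, how often the broker appears in the first
// (leader) position and how often in any later (follower) position.
func useStats(replicaSets [][]int) map[int]*BrokerUseStats {
	stats := map[int]*BrokerUseStats{}
	for _, rs := range replicaSets {
		for i, id := range rs {
			if _, ok := stats[id]; !ok {
				stats[id] = &BrokerUseStats{ID: id}
			}
			if i == 0 {
				stats[id].Leader++
			} else {
				stats[id].Follower++
			}
		}
	}
	return stats
}

func main() {
	stats := useStats([][]int{{1001, 1002}, {1002, 1001}, {1001, 1003}})
	s := stats[1001]
	fmt.Println(s.ID, s.Leader, s.Follower) // prints "1001 2 1"
}
```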

type PartitionMapOpt

type PartitionMapOpt func(*PartitionMap)

PartitionMapOpt is a function that configures a *PartitionMap at instantiation time.

func Populate

func Populate(s string, n, r int) PartitionMapOpt

Populate takes a name, partition count and replication factor and returns a PartitionMapOpt.
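
NewPartitionMap and Populate follow the functional options pattern. The sketch below reproduces that pattern with local types; the initial Version value and the zero-valued replica IDs written by `populate` are assumptions of this example, not documented behavior.

```go
package main

import "fmt"

// Local copies of the documented types.
type Partition struct {
	Topic     string
	Partition int
	Replicas  []int
}

type PartitionMap struct {
	Version    int
	Partitions []Partition
}

type PartitionMapOpt func(*PartitionMap)

// newPartitionMap builds an empty map and then applies each option in order.
func newPartitionMap(opts ...PartitionMapOpt) *PartitionMap {
	pm := &PartitionMap{Version: 1} // Version 1 is an assumption.
	for _, opt := range opts {
		opt(pm)
	}
	return pm
}

// populate returns an option that adds n partitions for topic s, each with
// r replica slots (left as zero values in this sketch).
func populate(s string, n, r int) PartitionMapOpt {
	return func(pm *PartitionMap) {
		for i := 0; i < n; i++ {
			pm.Partitions = append(pm.Partitions, Partition{
				Topic:     s,
				Partition: i,
				Replicas:  make([]int, r),
			})
		}
	}
}

func main() {
	pm := newPartitionMap(populate("test", 3, 2))
	fmt.Println(len(pm.Partitions), len(pm.Partitions[0].Replicas)) // prints "3 2"
}
```

The option is applied at construction time, so callers that want an empty map simply call the constructor with no arguments.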

type PartitionMeta

type PartitionMeta struct {
	Size float64 // In bytes.
}

PartitionMeta holds partition metadata.

type PartitionMetaMap

type PartitionMetaMap map[string]map[int]*PartitionMeta

PartitionMetaMap is a mapping of topic, partition number to PartitionMeta.

func NewPartitionMetaMap

func NewPartitionMetaMap() PartitionMetaMap

NewPartitionMetaMap returns an empty PartitionMetaMap.

func (PartitionMetaMap) Size

func (pmm PartitionMetaMap) Size(p Partition) (float64, error)

Size takes a Partition and returns the size. An error is returned if the partition isn't in the PartitionMetaMap.
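
Because PartitionMetaMap is a nested map, a size lookup needs two existence checks. A minimal sketch with local copies of the types, where `size` is a hypothetical stand-in that takes the topic and partition number directly and uses its own error messages:

```go
package main

import "fmt"

// Local copies of the documented types.
type PartitionMeta struct {
	Size float64 // In bytes.
}

type PartitionMetaMap map[string]map[int]*PartitionMeta

// size looks up the topic first, then the partition number, and returns an
// error if either level is missing.
func size(pmm PartitionMetaMap, topic string, partition int) (float64, error) {
	parts, ok := pmm[topic]
	if !ok {
		return 0, fmt.Errorf("topic %s not found", topic)
	}
	meta, ok := parts[partition]
	if !ok {
		return 0, fmt.Errorf("partition %d not found in topic %s", partition, topic)
	}
	return meta.Size, nil
}

func main() {
	pmm := PartitionMetaMap{"test": {0: {Size: 1024}}}
	fmt.Println(size(pmm, "test", 0))
	fmt.Println(size(pmm, "test", 1))
}
```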

type PartitionState

type PartitionState struct {
	Version         int   `json:"version"`
	ControllerEpoch int   `json:"controller_epoch"`
	Leader          int   `json:"leader"`
	LeaderEpoch     int   `json:"leader_epoch"`
	ISR             []int `json:"isr"`
}

PartitionState is used for unmarshalling JSON data from a partition state: e.g. /brokers/topics/some-topic/partitions/0/state

type Reassignments

type Reassignments map[string]map[int][]int

Reassignments is a map of topic:partition:brokers.

type RebuildParams

type RebuildParams struct {
	PMM              PartitionMetaMap
	BM               BrokerMap
	Strategy         string
	Optimization     string
	Affinities       SubstitutionAffinities
	PartnSzFactor    float64
	MinUniqueRackIDs int
	// contains filtered or unexported fields
}

RebuildParams holds required parameters to call the Rebuild method on a *PartitionMap.

func NewRebuildParams

func NewRebuildParams() RebuildParams

NewRebuildParams initializes a RebuildParams.

type ReplicaSets

type ReplicaSets map[int][]int

ReplicaSets is a mapping of partition number to Partition.Replicas. Take note that there is no topic identifier and that partition numbers from two different topics can overwrite one another.

type Stub added in v3.6.0

type Stub struct {
	// contains filtered or unexported fields
}

Stub stubs the Handler interface.

func NewZooKeeperStub added in v3.6.0

func NewZooKeeperStub() *Stub

NewZooKeeperStub returns a stub ZooKeeper.

func (*Stub) AddBrokers added in v3.11.0

func (zk *Stub) AddBrokers(b map[int]BrokerMeta)

AddBrokers takes a map of broker ID to BrokerMeta and adds it to the Stub BrokerMetaMap.

func (*Stub) Children added in v3.6.0

func (zk *Stub) Children(p string) ([]string, error)

Children stubs children.

func (*Stub) Close added in v3.6.0

func (zk *Stub) Close()

Close stubs Close.

func (*Stub) Create added in v3.6.0

func (zk *Stub) Create(p, d string) error

Create stubs Create.

func (*Stub) CreateSequential added in v3.6.0

func (zk *Stub) CreateSequential(a, b string) error

CreateSequential stubs CreateSequential.

func (*Stub) Delete added in v3.6.0

func (zk *Stub) Delete(p string) error

Delete stubs Delete.

func (*Stub) Exists added in v3.6.0

func (zk *Stub) Exists(p string) (bool, error)

Exists stubs Exists.

func (*Stub) Get added in v3.6.0

func (zk *Stub) Get(p string) ([]byte, error)

Get stubs Get.

func (*Stub) GetAllBrokerMeta added in v3.6.0

func (zk *Stub) GetAllBrokerMeta(withMetrics bool) (BrokerMetaMap, []error)

GetAllBrokerMeta stubs GetAllBrokerMeta.

func (*Stub) GetAllPartitionMeta added in v3.6.0

func (zk *Stub) GetAllPartitionMeta() (PartitionMetaMap, error)

GetAllPartitionMeta stubs GetAllPartitionMeta.

func (*Stub) GetBrokerMetrics added in v3.6.0

func (zk *Stub) GetBrokerMetrics() (BrokerMetricsMap, error)

GetBrokerMetrics stubs GetBrokerMetrics.

func (*Stub) GetPartitionMap added in v3.6.0

func (zk *Stub) GetPartitionMap(t string) (*PartitionMap, error)

GetPartitionMap stubs GetPartitionMap.

func (*Stub) GetPendingDeletion added in v3.6.0

func (zk *Stub) GetPendingDeletion() ([]string, error)

func (*Stub) GetReassignments added in v3.6.0

func (zk *Stub) GetReassignments() Reassignments

GetReassignments stubs GetReassignments.

func (*Stub) GetTopicConfig added in v3.6.0

func (zk *Stub) GetTopicConfig(t string) (*TopicConfig, error)

GetTopicConfig stubs GetTopicConfig.

func (*Stub) GetTopicState added in v3.6.0

func (zk *Stub) GetTopicState(t string) (*TopicState, error)

GetTopicState stubs GetTopicState.

func (*Stub) GetTopicStateISR added in v3.6.0

func (zk *Stub) GetTopicStateISR(t string) (TopicStateISR, error)

GetTopicStateISR stubs GetTopicStateISR.

func (*Stub) GetTopics added in v3.6.0

func (zk *Stub) GetTopics(ts []*regexp.Regexp) ([]string, error)

GetTopics stubs GetTopics.

func (*Stub) GetUnderReplicated added in v3.6.0

func (zk *Stub) GetUnderReplicated() ([]string, error)

func (*Stub) InitRawClient added in v3.6.0

func (zk *Stub) InitRawClient() error

InitRawClient stubs InitRawClient.

func (*Stub) MaxMetaAge added in v3.6.0

func (zk *Stub) MaxMetaAge() (time.Duration, error)

MaxMetaAge stubs MaxMetaAge.

func (*Stub) NextInt added in v3.6.0

func (zk *Stub) NextInt(p string) (int32, error)

func (*Stub) Ready added in v3.6.0

func (zk *Stub) Ready() bool

Ready stubs Ready.

func (*Stub) RemoveBrokers added in v3.11.0

func (zk *Stub) RemoveBrokers(ids []int)

RemoveBrokers removes the specified IDs from the BrokerMetaMap. This can be used in testing to simulate brokers leaving the cluster.

func (*Stub) Set added in v3.6.0

func (zk *Stub) Set(p, d string) error

Set stubs Set.

func (*Stub) UpdateKafkaConfig added in v3.6.0

func (zk *Stub) UpdateKafkaConfig(c KafkaConfig) ([]bool, error)

UpdateKafkaConfig stubs UpdateKafkaConfig.

type StubZnode added in v3.6.0

type StubZnode struct {
	// contains filtered or unexported fields
}

StubZnode stubs a ZooKeeper znode.

type SubstitutionAffinities

type SubstitutionAffinities map[int]*Broker

SubstitutionAffinities is a mapping of an ID belonging to a *Broker marked for replacement and a replacement *Broker that will fill all previously filled replica slots held by the *Broker being replaced.

func (SubstitutionAffinities) Get

func (sa SubstitutionAffinities) Get(id int) *Broker

Get takes a broker ID and returns a *Broker if one was set as a substitution affinity.

type TopicConfig

type TopicConfig struct {
	Version int               `json:"version"`
	Config  map[string]string `json:"config"`
}

TopicConfig is used for unmarshalling /config/topics/<topic> from ZooKeeper.

type TopicState

type TopicState struct {
	Partitions map[string][]int `json:"partitions"`
}

TopicState is used for unmarshalling ZooKeeper JSON data from a topic: e.g. /brokers/topics/some-topic

func (*TopicState) Brokers added in v3.11.0

func (ts *TopicState) Brokers() []int

Brokers returns a []int of broker IDs from all partitions in the TopicState.
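
To make the TopicState shape concrete, the sketch below decodes a sample topic znode payload and collects the broker IDs. The payload is invented for illustration, `brokers` is a stand-in for the Brokers method, and returning a de-duplicated, sorted list is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// TopicState mirrors the documented struct and JSON tag.
type TopicState struct {
	Partitions map[string][]int `json:"partitions"`
}

// brokers gathers every broker ID that appears in any partition's replica
// list, removing duplicates and sorting the result.
func brokers(ts *TopicState) []int {
	seen := map[int]struct{}{}
	ids := []int{}
	for _, replicas := range ts.Partitions {
		for _, id := range replicas {
			if _, ok := seen[id]; !ok {
				seen[id] = struct{}{}
				ids = append(ids, id)
			}
		}
	}
	sort.Ints(ids)
	return ids
}

func main() {
	raw := `{"partitions":{"0":[1001,1002],"1":[1002,1003]}}`
	ts := &TopicState{}
	if err := json.Unmarshal([]byte(raw), ts); err != nil {
		panic(err)
	}
	fmt.Println(brokers(ts)) // prints "[1001 1002 1003]"
}
```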

type TopicStateISR

type TopicStateISR map[string]PartitionState

TopicStateISR is a map of partition numbers to PartitionState.

type ZKHandler

type ZKHandler struct {
	Connect       string
	Prefix        string
	MetricsPrefix string
	// contains filtered or unexported fields
}

ZKHandler implements the Handler interface for real ZooKeeper clusters.

func (*ZKHandler) Children

func (z *ZKHandler) Children(p string) ([]string, error)

Children takes a path p and returns a list of child znodes and an error if encountered.

func (*ZKHandler) Close

func (z *ZKHandler) Close()

Close calls close on the *ZKHandler. Any additional shutdown cleanup or other tasks should be performed here.

func (*ZKHandler) Create

func (z *ZKHandler) Create(p string, d string) error

Create creates the provided path p with the data from the provided string d and returns an error if encountered.

func (*ZKHandler) CreateSequential

func (z *ZKHandler) CreateSequential(p string, d string) error

CreateSequential takes a path p and data d and creates a sequential znode at p with data d. An error is returned if encountered.

func (*ZKHandler) Delete

func (z *ZKHandler) Delete(p string) error

Delete deletes the znode at path p.

func (*ZKHandler) Exists

func (z *ZKHandler) Exists(p string) (bool, error)

Exists takes a path p and returns a bool as to whether the path exists and an error if encountered.

func (*ZKHandler) Get

func (z *ZKHandler) Get(p string) ([]byte, error)

Get returns the data from path p.

func (*ZKHandler) GetAllBrokerMeta

func (z *ZKHandler) GetAllBrokerMeta(withMetrics bool) (BrokerMetaMap, []error)

GetAllBrokerMeta looks up all registered Kafka brokers and returns their metadata as a BrokerMetaMap. A withMetrics bool param determines whether we additionally want to fetch stored broker metrics.

func (*ZKHandler) GetAllPartitionMeta

func (z *ZKHandler) GetAllPartitionMeta() (PartitionMetaMap, error)

GetAllPartitionMeta fetches partition metadata stored in ZooKeeper.

func (*ZKHandler) GetPartitionMap

func (z *ZKHandler) GetPartitionMap(t string) (*PartitionMap, error)

GetPartitionMap takes a topic name. If the topic exists, the state of the topic is fetched and returned as a *PartitionMap.

func (*ZKHandler) GetPendingDeletion

func (z *ZKHandler) GetPendingDeletion() ([]string, error)

GetPendingDeletion returns any topics pending deletion.

func (*ZKHandler) GetReassignments

func (z *ZKHandler) GetReassignments() Reassignments

GetReassignments looks up any ongoing topic reassignments and returns the data as a Reassignments.

func (*ZKHandler) GetTopicConfig

func (z *ZKHandler) GetTopicConfig(t string) (*TopicConfig, error)

GetTopicConfig takes a topic name. If the topic exists, the topic config is returned as a *TopicConfig.

func (*ZKHandler) GetTopicState

func (z *ZKHandler) GetTopicState(t string) (*TopicState, error)

GetTopicState takes a topic name. If the topic exists, the topic state is returned as a *TopicState.

func (*ZKHandler) GetTopicStateISR

func (z *ZKHandler) GetTopicStateISR(t string) (TopicStateISR, error)

GetTopicStateISR takes a topic name. If the topic exists, the topic state is returned as a TopicStateISR. GetTopicStateISR differs from GetTopicState in that the actual, current broker IDs in the ISR are returned for each partition. This method is more expensive because it requires one call to ZooKeeper per partition.

func (*ZKHandler) GetTopics

func (z *ZKHandler) GetTopics(ts []*regexp.Regexp) ([]string, error)

GetTopics takes a []*regexp.Regexp and returns a []string of all topic names that match any of the provided regular expressions.
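
The "match any" rule can be sketched without a ZooKeeper connection by filtering a fixed list of names. Both `matchTopics` and `compileAll` are hypothetical helpers for this example; the real method reads the topic names from ZooKeeper.

```go
package main

import (
	"fmt"
	"regexp"
)

// compileAll compiles each pattern and panics on an invalid one.
func compileAll(patterns ...string) []*regexp.Regexp {
	res := make([]*regexp.Regexp, 0, len(patterns))
	for _, p := range patterns {
		res = append(res, regexp.MustCompile(p))
	}
	return res
}

// matchTopics returns every name matched by at least one expression. Each
// name is added once, even if several expressions match it.
func matchTopics(names []string, res []*regexp.Regexp) []string {
	out := []string{}
	for _, n := range names {
		for _, re := range res {
			if re.MatchString(n) {
				out = append(out, n)
				break
			}
		}
	}
	return out
}

func main() {
	names := []string{"orders", "orders-dlq", "payments"}
	fmt.Println(matchTopics(names, compileAll("^orders$", "^pay"))) // prints "[orders payments]"
}
```

Anchoring patterns with ^ and $ matters here: an unanchored "orders" would also match "orders-dlq".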

func (*ZKHandler) GetUnderReplicated added in v3.5.0

func (z *ZKHandler) GetUnderReplicated() ([]string, error)

GetUnderReplicated returns a []string of all under-replicated topics.

func (*ZKHandler) MaxMetaAge

func (z *ZKHandler) MaxMetaAge() (time.Duration, error)

MaxMetaAge returns the greatest age between the partitionmeta and brokermetrics structures.

func (*ZKHandler) NextInt added in v3.3.1

func (z *ZKHandler) NextInt(p string) (int32, error)

NextInt works as an atomic int generator. It does this by setting a nil value at path p and returning the znode version.

func (*ZKHandler) Ready

func (z *ZKHandler) Ready() bool

Ready returns true if the client is in either state StateConnected or StateHasSession. See https://godoc.org/github.com/go-zookeeper/zk#State.

func (*ZKHandler) Set

func (z *ZKHandler) Set(p string, d string) error

Set sets the data at path p.

func (*ZKHandler) UpdateKafkaConfig

func (z *ZKHandler) UpdateKafkaConfig(c KafkaConfig) ([]bool, error)

UpdateKafkaConfig takes a KafkaConfig with key value pairs of entity config. If the config is changed, a persistent sequential znode is also written to propagate changes (via watches) to all Kafka brokers. This is a Kafka specific behavior; further references are available from the Kafka codebase. A []bool is returned indicating whether the config of the respective index was changed (if a config is updated to the existing value, 'false' is returned) along with any errors encountered. If a config value is set to an empty string (""), the entire config key itself is deleted. This was a convenient method to combine update/delete into a single func.
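
The update and delete semantics described above can be modelled on an in-memory map. This sketch only illustrates the []bool result and the empty-string deletion rule: `applyConfigs` is a hypothetical helper, the map stands in for the znode data, no change-notification znode is written, and reporting false when deleting an absent key is an assumption.

```go
package main

import "fmt"

// KafkaConfigKV mirrors the documented [2]string{key, value} type.
type KafkaConfigKV [2]string

// applyConfigs applies each key/value pair to current and reports, per
// index, whether that pair changed the stored config.
func applyConfigs(current map[string]string, kvs []KafkaConfigKV) []bool {
	changed := make([]bool, len(kvs))
	for i, kv := range kvs {
		key, val := kv[0], kv[1]
		old, exists := current[key]
		switch {
		case val == "":
			// An empty value deletes the key.
			if exists {
				delete(current, key)
				changed[i] = true
			}
		case !exists || old != val:
			current[key] = val
			changed[i] = true
		}
	}
	return changed
}

func main() {
	current := map[string]string{"retention.ms": "1000"}
	changed := applyConfigs(current, []KafkaConfigKV{
		{"retention.ms", "1000"},      // same value: no change
		{"retention.ms", "2000"},      // updated
		{"retention.ms", ""},          // deleted
		{"cleanup.policy", "compact"}, // added
	})
	fmt.Println(changed, current) // prints "[false true true true] map[cleanup.policy:compact]"
}
```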
