mapper

package
v4.2.1
Published: Feb 9, 2023 License: Apache-2.0 Imports: 8 Imported by: 0

Constants

const (
	// StubBrokerID is the platform int max.
	StubBrokerID int = int(^uint(0) >> 1)
)
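
The StubBrokerID expression shifts an all-ones uint right by one bit, which leaves the largest value a signed int can hold on the current platform. The sketch below illustrates this; the names stubID and isStub are hypothetical and not part of the package, and math.MaxInt assumes Go 1.17 or later.

```go
package main

import (
	"fmt"
	"math"
)

// stubID mirrors the StubBrokerID expression: a uint with every bit set,
// shifted right by one, is the largest value a signed int can hold.
const stubID int = int(^uint(0) >> 1)

// isStub is a hypothetical helper that reports whether id is the placeholder.
func isStub(id int) bool { return id == stubID }

func main() {
	fmt.Println(stubID == math.MaxInt) // true
	fmt.Println(isStub(1001))          // false
}
```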

Variables

var (
	// ErrNoBrokers error.
	ErrNoBrokers = errors.New("No additional brokers that meet Constraints")
	// ErrInvalidSelectionMethod error.
	ErrInvalidSelectionMethod = errors.New("Invalid selection method")
)
var NotReplacedBrokersFn = func(b *Broker) bool { return !b.Replace }
var ReplacedBrokersFn = func(b *Broker) bool { return b.Replace }

Functions

func WriteMap

func WriteMap(pm *PartitionMap, path string) error

WriteMap takes a *PartitionMap and writes a JSON text file to the provided path.

Types

type Broker

type Broker struct {
	ID          int
	Locality    string
	Used        int
	StorageFree float64
	Replace     bool
	Missing     bool
	New         bool
}

Broker associates metadata with a real broker by ID.

func (Broker) Copy

func (b Broker) Copy() Broker

Copy returns a copy of a Broker.

type BrokerFilterFn

type BrokerFilterFn func(*Broker) bool

BrokerFilterFn is a filter function for BrokerList and BrokerMap types.

var AllBrokersFn BrokerFilterFn = func(b *Broker) bool { return true }

AllBrokersFn returns all brokers.

func AboveMeanFn

func AboveMeanFn(d float64, f func() float64) BrokerFilterFn

AboveMeanFn returns a BrokerFilterFn that filters brokers that are above the mean by d percent (0.00 < d). The mean type is provided as a function f.

func BelowMeanFn

func BelowMeanFn(d float64, f func() float64) BrokerFilterFn

BelowMeanFn returns a BrokerFilterFn that filters brokers that are below the mean by d percent (0.00 < d). The mean type is provided as a function f.
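
As a rough illustration of how such mean-relative filters might be built, here is a self-contained sketch. It is not the package's source: the Broker type is trimmed down, aboveMeanFn, belowMeanFn and filterIDs are hypothetical names, and it assumes that d is a fraction (0.25 meaning 25%) compared against StorageFree.

```go
package main

import "fmt"

// Broker is a trimmed-down stand-in for the package's Broker type.
type Broker struct {
	ID          int
	StorageFree float64
}

// BrokerFilterFn matches the documented filter signature.
type BrokerFilterFn func(*Broker) bool

// aboveMeanFn (hypothetical) keeps brokers whose free storage exceeds the
// mean returned by f by more than the fraction d.
func aboveMeanFn(d float64, f func() float64) BrokerFilterFn {
	limit := f() * (1 + d)
	return func(b *Broker) bool { return b.StorageFree > limit }
}

// belowMeanFn (hypothetical) keeps brokers whose free storage falls short of
// the mean returned by f by more than the fraction d.
func belowMeanFn(d float64, f func() float64) BrokerFilterFn {
	limit := f() * (1 - d)
	return func(b *Broker) bool { return b.StorageFree < limit }
}

// filterIDs applies fn to each broker and returns the IDs that pass.
func filterIDs(brokers []*Broker, fn BrokerFilterFn) []int {
	var ids []int
	for _, b := range brokers {
		if fn(b) {
			ids = append(ids, b.ID)
		}
	}
	return ids
}

func main() {
	brokers := []*Broker{{1001, 100}, {1002, 200}, {1003, 300}}
	mean := func() float64 { return 200 }
	fmt.Println(filterIDs(brokers, aboveMeanFn(0.25, mean))) // [1003]
	fmt.Println(filterIDs(brokers, belowMeanFn(0.25, mean))) // [1001]
}
```

Passing the mean as a function lets the caller choose between the arithmetic and harmonic mean without changing the filter.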

type BrokerList

type BrokerList []*Broker

BrokerList is a slice of brokers for sorting by used count.

func (BrokerList) BestCandidate

func (b BrokerList) BestCandidate(c *Constraints, by string, p int64) (*Broker, error)

TODO deprecate. BestCandidate takes a *Constraints, selection method and pass / iteration number (for use as a seed value for pseudo-random number generation) and returns the most suitable broker.

func (BrokerList) Filter

func (b BrokerList) Filter(f BrokerFilterFn) BrokerList

Filter returns a BrokerList of brokers that return true as an input to function f.

func (BrokerList) SortByCount

func (b BrokerList) SortByCount()

SortByCount sorts the BrokerList by Used values.

func (BrokerList) SortByID

func (b BrokerList) SortByID()

SortByID sorts the BrokerList by ID values.

func (BrokerList) SortByIDDesc

func (b BrokerList) SortByIDDesc()

SortByIDDesc sorts the BrokerList by ID values in descending order.

func (BrokerList) SortByStorage

func (b BrokerList) SortByStorage()

SortByStorage sorts the BrokerList by StorageFree values.

func (BrokerList) SortPseudoShuffle

func (b BrokerList) SortPseudoShuffle(seed int64)

SortPseudoShuffle takes a BrokerList and performs a sort by count. For each sequence of brokers with equal counts, the sub-slice is pseudo-randomly shuffled using the provided seed value.
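
The sort-then-shuffle behaviour described above can be sketched as follows. This is an illustrative re-implementation, not the package's code: the Broker type is trimmed down, sortPseudoShuffle and ids are hypothetical names, and ascending order by Used is an assumption.

```go
package main

import (
	"fmt"
	"math/rand"
	"sort"
)

// Broker is a trimmed-down stand-in for the package's Broker type.
type Broker struct {
	ID   int
	Used int
}

// sortPseudoShuffle (hypothetical) sorts by Used ascending, then shuffles
// each run of brokers sharing the same Used value with a seeded generator.
func sortPseudoShuffle(bl []*Broker, seed int64) {
	sort.SliceStable(bl, func(i, j int) bool { return bl[i].Used < bl[j].Used })
	rng := rand.New(rand.NewSource(seed))
	for start := 0; start < len(bl); {
		end := start + 1
		for end < len(bl) && bl[end].Used == bl[start].Used {
			end++
		}
		run := bl[start:end] // brokers with equal Used counts
		rng.Shuffle(len(run), func(i, j int) { run[i], run[j] = run[j], run[i] })
		start = end
	}
}

// ids returns the broker IDs in list order.
func ids(bl []*Broker) []int {
	out := make([]int, len(bl))
	for i, b := range bl {
		out[i] = b.ID
	}
	return out
}

func main() {
	bl := []*Broker{{1004, 2}, {1001, 1}, {1003, 2}, {1002, 1}, {1005, 2}}
	sortPseudoShuffle(bl, 42)
	// The order within each equal-Used run depends on the seed.
	fmt.Println(ids(bl))
}
```

Seeding the generator makes the shuffle repeatable: the same input and seed give the same order.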

type BrokerMap

type BrokerMap map[int]*Broker

BrokerMap holds a mapping of broker IDs to *Broker.

func BrokerMapFromPartitionMap

func BrokerMapFromPartitionMap(pm *PartitionMap, bm BrokerMetaMap, force bool) BrokerMap

BrokerMapFromPartitionMap creates a BrokerMap from a *PartitionMap.

func NewBrokerMap

func NewBrokerMap() BrokerMap

NewBrokerMap returns a new BrokerMap.

func (BrokerMap) AboveMean

func (b BrokerMap) AboveMean(d float64, f func() float64) []int

AboveMean returns a sorted []int of broker IDs that are above the mean by d percent (0.00 < d). The mean type is provided as a function f.

func (BrokerMap) BelowMean

func (b BrokerMap) BelowMean(d float64, f func() float64) []int

BelowMean returns a sorted []int of broker IDs that are below the mean by d percent (0.00 < d). The mean type is provided as a function f.

func (BrokerMap) Copy

func (b BrokerMap) Copy() BrokerMap

Copy returns a copy of a BrokerMap.

func (BrokerMap) Filter

func (b BrokerMap) Filter(f BrokerFilterFn) BrokerMap

Filter returns a BrokerMap of brokers that return true as an input to function f.

func (BrokerMap) HMean

func (b BrokerMap) HMean() float64

HMean returns the harmonic mean of broker storage free.

func (BrokerMap) List

func (b BrokerMap) List() BrokerList

List takes a BrokerMap and returns a BrokerList.

func (BrokerMap) Mean

func (b BrokerMap) Mean() float64

Mean returns the arithmetic mean of broker storage free.
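
To make the difference between the two means concrete, the sketch below computes both over a plain slice of free-storage values. The function names mean and hmean are illustrative only; the package's methods operate on a BrokerMap, and the handling of empty input here is an assumption.

```go
package main

import "fmt"

// mean returns the arithmetic mean, or 0 for an empty slice.
func mean(free []float64) float64 {
	if len(free) == 0 {
		return 0
	}
	var sum float64
	for _, v := range free {
		sum += v
	}
	return sum / float64(len(free))
}

// hmean returns the harmonic mean, or 0 for an empty slice.
// All values are assumed to be greater than zero.
func hmean(free []float64) float64 {
	if len(free) == 0 {
		return 0
	}
	var reciprocals float64
	for _, v := range free {
		reciprocals += 1 / v
	}
	return float64(len(free)) / reciprocals
}

func main() {
	free := []float64{128, 512, 512}
	fmt.Println(mean(free))  // 384
	fmt.Println(hmean(free)) // 256
}
```

The harmonic mean is pulled toward the smallest values, so a single broker with little free storage lowers it more than it lowers the arithmetic mean.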

func (BrokerMap) MinMax

func (b BrokerMap) MinMax() (float64, float64)

func (BrokerMap) StorageDiff

func (b BrokerMap) StorageDiff(b2 BrokerMap) map[int][2]float64

StorageDiff takes two BrokerMaps and returns a per broker ID diff in storage as a [2]float64: [absolute, percentage] diff.
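
A per-broker [absolute, percentage] diff might be computed along these lines. This is only a sketch under stated assumptions: the types are trimmed down, storageDiff is a hypothetical name, the diff direction (second map minus first) and the percentage base (the first map's value) are guesses, and brokers missing from either map are skipped.

```go
package main

import "fmt"

// Broker is a trimmed-down stand-in for the package's Broker type.
type Broker struct {
	StorageFree float64
}

// BrokerMap maps broker IDs to brokers, as in the package.
type BrokerMap map[int]*Broker

// storageDiff (hypothetical) returns, per broker ID present in both maps,
// the absolute change in free storage and that change as a percentage of
// the starting value.
func storageDiff(before, after BrokerMap) map[int][2]float64 {
	out := make(map[int][2]float64)
	for id, b := range before {
		a, ok := after[id]
		if !ok {
			continue // no counterpart to compare against
		}
		change := a.StorageFree - b.StorageFree
		out[id] = [2]float64{change, change / b.StorageFree * 100}
	}
	return out
}

func main() {
	before := BrokerMap{1001: {StorageFree: 200}}
	after := BrokerMap{1001: {StorageFree: 150}}
	fmt.Println(storageDiff(before, after)) // map[1001:[-50 -25]]
}
```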

func (BrokerMap) StorageRange

func (b BrokerMap) StorageRange() float64

StorageRange returns the range of free storage for all brokers in the BrokerMap.

func (BrokerMap) StorageRangeSpread

func (b BrokerMap) StorageRangeSpread() float64

StorageRangeSpread returns the range spread of free storage for all brokers in the BrokerMap.

func (BrokerMap) StorageStdDev

func (b BrokerMap) StorageStdDev() float64

StorageStdDev returns the standard deviation of free storage for all brokers in the BrokerMap.

func (BrokerMap) SubStorage

func (b BrokerMap) SubStorage(pm *PartitionMap, pmm PartitionMetaMap, f BrokerFilterFn) error

SubStorage takes a PartitionMap, PartitionMetaMap, and a function. For all brokers that return true as an input to function f, the size of all partitions held is added back to the broker StorageFree value.

func (BrokerMap) SubstitutionAffinities

func (b BrokerMap) SubstitutionAffinities(pm *PartitionMap) (SubstitutionAffinities, error)

SubstitutionAffinities finds all brokers marked for replacement and, for each broker, creates an exclusive association with a newly provided broker. In the rebuild stage, each to-be-replaced broker will only be replaced with the affinity it's associated with. A given new broker can only be an affinity for a single outgoing broker. An error is returned if a complete mapping of affinities cannot be constructed (e.g. two brokers are marked for replacement but only one new replacement was provided and substitution affinities is enabled).
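
The one-to-one pairing described above can be sketched as follows. This is a simplified illustration, not the package's algorithm: buildAffinities is a hypothetical name, the types are trimmed down, pairing is done purely by sorted ID, and any locality matching the real method may perform is ignored.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// Broker is a trimmed-down stand-in for the package's Broker type.
type Broker struct {
	ID      int
	Replace bool
	New     bool
}

// BrokerMap and SubstitutionAffinities use the documented underlying types.
type BrokerMap map[int]*Broker
type SubstitutionAffinities map[int]*Broker

// buildAffinities (hypothetical) pairs each broker marked Replace with a
// distinct broker marked New, failing if there are too few new brokers.
func buildAffinities(bm BrokerMap) (SubstitutionAffinities, error) {
	var outgoing, incoming []int
	for id, b := range bm {
		switch {
		case b.Replace:
			outgoing = append(outgoing, id)
		case b.New:
			incoming = append(incoming, id)
		}
	}
	// Sort so the pairing does not depend on map iteration order.
	sort.Ints(outgoing)
	sort.Ints(incoming)
	if len(incoming) < len(outgoing) {
		return nil, errors.New("not enough new brokers for all replaced brokers")
	}
	sa := make(SubstitutionAffinities, len(outgoing))
	for i, id := range outgoing {
		sa[id] = bm[incoming[i]] // each new broker is used at most once
	}
	return sa, nil
}

func main() {
	bm := BrokerMap{
		1001: {ID: 1001, Replace: true},
		1002: {ID: 1002},
		1003: {ID: 1003, New: true},
	}
	sa, err := buildAffinities(bm)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(sa[1001].ID) // 1003
}
```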

func (BrokerMap) Update

func (b BrokerMap) Update(bl []int, bm BrokerMetaMap) (*BrokerStatus, <-chan string)

Update takes a []int of broker IDs and a BrokerMetaMap, then adds them to the BrokerMap, returning a *BrokerStatus with counts of brokers marked for replacement, newly included, and not found in ZooKeeper. Additionally, a channel of messages describing changes is returned.

type BrokerMeta

type BrokerMeta struct {
	StorageFree       float64 // In bytes.
	MetricsIncomplete bool
	// Metadata from the Kafka cluster state.
	Host                       string
	Port                       int
	Rack                       string
	LogMessageFormat           string
	InterBrokerProtocolVersion string
}

BrokerMeta holds metadata that describes a broker.

func (BrokerMeta) Copy

func (bm BrokerMeta) Copy() BrokerMeta

Copy returns a copy of a BrokerMeta.

type BrokerMetaMap

type BrokerMetaMap map[int]*BrokerMeta

BrokerMetaMap is a map of broker IDs to BrokerMeta.

func BrokerMetaMapFromStates added in v4.1.0

func BrokerMetaMapFromStates(states kafkaadmin.BrokerStates) (BrokerMetaMap, error)

BrokerMetaMapFromStates takes a kafkaadmin.BrokerStates and translates it to a BrokerMetaMap.

func (BrokerMetaMap) Copy

func (bmm BrokerMetaMap) Copy() BrokerMetaMap

Copy returns a copy of a BrokerMetaMap.

type BrokerMetrics

type BrokerMetrics struct {
	StorageFree float64
}

BrokerMetrics holds broker metric data fetched from ZK.

type BrokerMetricsMap

type BrokerMetricsMap map[int]*BrokerMetrics

BrokerMetricsMap holds a mapping of broker ID to BrokerMetrics.

type BrokerStatus

type BrokerStatus struct {
	New         int
	Missing     int
	OldMissing  int
	Replace     int
	RackMissing int
}

BrokerStatus summarizes change counts from an input and output broker list.

func (BrokerStatus) Changes

func (bs BrokerStatus) Changes() bool

Changes returns a bool that indicates whether a BrokerStatus values represent a change in brokers.

type BrokerUseStats

type BrokerUseStats struct {
	ID       int
	Leader   int
	Follower int
}

BrokerUseStats holds counts of partition ownership.

type BrokerUseStatsList

type BrokerUseStatsList []*BrokerUseStats

BrokerUseStatsList is a slice of *BrokerUseStats.

func (BrokerUseStatsList) Len

func (b BrokerUseStatsList) Len() int

func (BrokerUseStatsList) Less

func (b BrokerUseStatsList) Less(i, j int) bool

func (BrokerUseStatsList) Swap

func (b BrokerUseStatsList) Swap(i, j int)

type BrokerUseStatsMap

type BrokerUseStatsMap map[int]*BrokerUseStats

BrokerUseStatsMap is a map of IDs to *BrokerUseStats.

func (BrokerUseStatsMap) List

List returns a BrokerUseStatsList from a BrokerUseStatsMap.

type Constraints

type Constraints struct {
	// contains filtered or unexported fields
}

Constraints holds a map of IDs and locality key-values.

func MergeConstraints

func MergeConstraints(bl BrokerList) *Constraints

TODO deprecate. MergeConstraints takes a brokerlist and builds a *Constraints by merging the attributes of all brokers from the supplied list.

func NewConstraints

func NewConstraints() *Constraints

NewConstraints returns an empty *Constraints.

func (*Constraints) Add

func (c *Constraints) Add(b *Broker)

Add takes a *Broker and adds its attributes to the *Constraints. The requestSize is also subtracted from the *Broker.StorageFree.

func (*Constraints) MergeConstraints

func (c *Constraints) MergeConstraints(bl BrokerList)

MergeConstraints takes a brokerlist and updates the *Constraints by merging the attributes of all brokers from the supplied list.

func (*Constraints) SelectBroker

func (c *Constraints) SelectBroker(b BrokerList, p ConstraintsParams) (*Broker, error)

SelectBroker takes a BrokerList and a ConstraintsParams and selects the most suitable broker that passes all specified constraints.

type ConstraintsParams

type ConstraintsParams struct {
	SelectorMethod   string
	MinUniqueRackIDs int
	RequestSize      float64
	SeedVal          int64
}

ConstraintsParams holds parameters for the SelectBroker method.

type DegreeDistribution

type DegreeDistribution struct {
	// Relationships is an adjacency list where an edge between brokers is
	// defined as a common occupancy in at least one replica set. For instance,
	// given the replica set [1001,1002,1003], ID 1002 has a relationship with
	// 1001 and 1003.
	Relationships map[int]map[int]struct{}
}

DegreeDistribution counts broker to broker relationships.

func NewDegreeDistribution

func NewDegreeDistribution() DegreeDistribution

NewDegreeDistribution returns a new DegreeDistribution.

func (DegreeDistribution) Add

func (dd DegreeDistribution) Add(nodes []int)

Add takes a []int of broker IDs representing a replica set and updates the adjacency lists for each broker in the set.

func (DegreeDistribution) Count

func (dd DegreeDistribution) Count(n int) int

Count takes a node ID and returns the degree (the number of relationships) for that node.
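
The adjacency-list bookkeeping behind Add and Count might look roughly like the sketch below. It reuses the documented type and method names for readability, but the method bodies are an illustrative guess rather than the package's source.

```go
package main

import "fmt"

// DegreeDistribution mirrors the documented struct layout.
type DegreeDistribution struct {
	Relationships map[int]map[int]struct{}
}

// NewDegreeDistribution returns a DegreeDistribution with an initialized map.
func NewDegreeDistribution() DegreeDistribution {
	return DegreeDistribution{Relationships: make(map[int]map[int]struct{})}
}

// Add records an edge between every pair of distinct brokers in the replica set.
func (dd DegreeDistribution) Add(nodes []int) {
	for _, a := range nodes {
		if dd.Relationships[a] == nil {
			dd.Relationships[a] = make(map[int]struct{})
		}
		for _, b := range nodes {
			if a != b {
				dd.Relationships[a][b] = struct{}{}
			}
		}
	}
}

// Count returns how many distinct brokers share a replica set with n.
func (dd DegreeDistribution) Count(n int) int {
	return len(dd.Relationships[n])
}

func main() {
	dd := NewDegreeDistribution()
	dd.Add([]int{1001, 1002, 1003})
	dd.Add([]int{1002, 1004})
	fmt.Println(dd.Count(1002)) // 3
	fmt.Println(dd.Count(1001)) // 2
}
```

Storing neighbours in a set means a pair of brokers that share many replica sets is still counted once.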

func (DegreeDistribution) Stats

Stats returns a DegreeDistributionStats.

type DegreeDistributionStats

type DegreeDistributionStats struct {
	Min float64
	Max float64
	Avg float64
}

DegreeDistributionStats holds general statistical information describing the DegreeDistribution counts.

type Mappings

type Mappings map[int]map[string]PartitionList

Mappings is a mapping of broker IDs to currently held partitions as a PartitionList.

func NewMappings

func NewMappings() Mappings

NewMappings returns a new Mappings.

func (Mappings) LargestPartitions

func (m Mappings) LargestPartitions(id int, k int, pm PartitionMetaMap) (PartitionList, error)

LargestPartitions takes a broker ID and PartitionMetaMap and returns a PartitionList with the top k partitions by size for the provided broker ID.

func (Mappings) Remove

func (m Mappings) Remove(id int, p Partition) error

Remove takes a broker ID and partition and removes the mapping association.

type NoMappingForBroker

type NoMappingForBroker struct {
	// contains filtered or unexported fields
}

NoMappingForBroker error.

func (NoMappingForBroker) Error

func (e NoMappingForBroker) Error() string

type NoMappingForTopic

type NoMappingForTopic struct {
	// contains filtered or unexported fields
}

NoMappingForTopic error.

func (NoMappingForTopic) Error

func (e NoMappingForTopic) Error() string

type Partition

type Partition struct {
	Topic     string `json:"topic"`
	Partition int    `json:"partition"`
	Replicas  []int  `json:"replicas"`
}

Partition represents the Kafka partition structure.

func (Partition) Equal

func (p Partition) Equal(p2 Partition) bool

Equal defines equality between two Partition objects as an equality of topic, partition and replicas.

type PartitionList

type PartitionList []Partition

PartitionList is a []Partition.

func (PartitionList) Len

func (p PartitionList) Len() int

func (PartitionList) Less

func (p PartitionList) Less(i, j int) bool

func (PartitionList) SortBySize

func (p PartitionList) SortBySize(m PartitionMetaMap)

SortBySize takes a PartitionMetaMap and sorts the PartitionList by partition size.

func (PartitionList) Swap

func (p PartitionList) Swap(i, j int)

type PartitionMap

type PartitionMap struct {
	Version    int           `json:"version"`
	Partitions PartitionList `json:"partitions"`
}

PartitionMap represents the Kafka partition map structure.

func NewPartitionMap

func NewPartitionMap(opts ...PartitionMapOpt) *PartitionMap

NewPartitionMap returns an empty *PartitionMap with optionally provided PartitionMapOpts.

func PartitionMapFromString

func PartitionMapFromString(s string) (*PartitionMap, error)

PartitionMapFromString takes a JSON-encoded string and returns a *PartitionMap.
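
Given the JSON struct tags shown on Partition and PartitionMap, decoding a partition map string could be sketched with encoding/json as below. The lowercase function name partitionMapFromString marks this as a hypothetical stand-in, and the sample payload is illustrative; the package's own function may validate more than this.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Partition and PartitionMap mirror the documented fields and JSON tags.
type Partition struct {
	Topic     string `json:"topic"`
	Partition int    `json:"partition"`
	Replicas  []int  `json:"replicas"`
}

type PartitionMap struct {
	Version    int         `json:"version"`
	Partitions []Partition `json:"partitions"`
}

// partitionMapFromString (hypothetical) decodes a JSON document into a
// *PartitionMap and returns any decoding error.
func partitionMapFromString(s string) (*PartitionMap, error) {
	pm := &PartitionMap{}
	if err := json.Unmarshal([]byte(s), pm); err != nil {
		return nil, err
	}
	return pm, nil
}

func main() {
	in := `{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1001,1002]}]}`
	pm, err := partitionMapFromString(in)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Println(pm.Partitions[0].Topic, pm.Partitions[0].Replicas) // test [1001 1002]
}
```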

func PartitionMapFromTopicStates added in v4.1.0

func PartitionMapFromTopicStates(ts kafkaadmin.TopicStates) (*PartitionMap, error)

PartitionMapFromTopicStates translates a kafkaadmin.TopicStates to a *PartitionMap.

func (PartitionMap) BrokersIn

func (pm PartitionMap) BrokersIn() BrokerFilterFn

BrokersIn returns a BrokerFilterFn that filters for brokers in the PartitionMap.

func (*PartitionMap) Copy

func (pm *PartitionMap) Copy() *PartitionMap

Copy returns a copy of a *PartitionMap.

func (*PartitionMap) DegreeDistribution

func (pm *PartitionMap) DegreeDistribution() DegreeDistribution

DegreeDistribution returns the DegreeDistribution for the PartitionMap.

func (*PartitionMap) Equal

func (pm *PartitionMap) Equal(pm2 *PartitionMap) (bool, error)

Equal checks the equality between two partition maps. Equality requires that the total order is exactly the same.

func (*PartitionMap) LocalitiesAvailable

func (pm *PartitionMap) LocalitiesAvailable(bm BrokerMap, b *Broker) []string

LocalitiesAvailable takes a broker map and broker and returns a []string of localities that are unused by any of the brokers in any replica sets that the reference broker was found in. This is done by building a set of all localities observed across all replica sets and a set of all localities observed in replica sets containing the reference broker, then returning the diff.

func (*PartitionMap) Mappings

func (pm *PartitionMap) Mappings() Mappings

Mappings returns a Mappings from a *PartitionMap.

func (*PartitionMap) OptimizeLeaderFollower

func (pm *PartitionMap) OptimizeLeaderFollower()

OptimizeLeaderFollower is a simple leadership optimization algorithm that iterates over each partition's replica set and sorts brokers according to their leader/follower position ratio, ascending. The idea is that if a broker has a high leader/follower ratio, it should go further down the replica list. This ratio is recalculated at each replica set visited to avoid extreme skew.

func (*PartitionMap) Rebuild

func (pm *PartitionMap) Rebuild(params RebuildParams) (*PartitionMap, []error)

Rebuild takes a RebuildParams, which carries the BrokerMap and rebuild strategy. It then traverses the partition map, replacing brokers marked for removal with the best available candidate based on the selected rebuild strategy. A rebuilt *PartitionMap and a []error are returned.

func (*PartitionMap) ReplicaSets

func (pm *PartitionMap) ReplicaSets(t string) ReplicaSets

ReplicaSets takes a topic name and returns a ReplicaSets.

func (*PartitionMap) SetReplication

func (pm *PartitionMap) SetReplication(r int)

SetReplication ensures that every replica set is sized to the replication factor r. Sets exceeding r are truncated; sets below r are extended with stub brokers.
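
The truncate-or-extend rule can be illustrated on a single replica set. The helper below is a hypothetical sketch (setReplication and stubBrokerID are not package identifiers) and assumes r is non-negative; the package method applies the rule to every partition in the map.

```go
package main

import "fmt"

// stubBrokerID mirrors the documented StubBrokerID expression.
const stubBrokerID = int(^uint(0) >> 1)

// setReplication (hypothetical) returns a copy of replicas resized to r:
// extra entries are dropped, missing entries are filled with the stub ID.
// r is assumed to be non-negative.
func setReplication(replicas []int, r int) []int {
	out := make([]int, 0, r)
	for i := 0; i < r; i++ {
		if i < len(replicas) {
			out = append(out, replicas[i])
		} else {
			out = append(out, stubBrokerID)
		}
	}
	return out
}

func main() {
	fmt.Println(setReplication([]int{1001, 1002, 1003}, 2)) // [1001 1002]
	// Extending pads with the stub ID, whose value depends on the platform.
	fmt.Println(setReplication([]int{1001}, 3))
}
```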

func (*PartitionMap) Strip

func (pm *PartitionMap) Strip() *PartitionMap

Strip takes a PartitionMap and returns a copy where all broker ID references are replaced with the stub broker (ID == StubBrokerID) with the Replace field set to true. This ensures that the entire map is rebuilt, even if the provided broker list matches what's already in the map.

func (*PartitionMap) Topics

func (pm *PartitionMap) Topics() []string

Topics returns a []string of topic names held in the PartitionMap.

func (*PartitionMap) UseStats

func (pm *PartitionMap) UseStats() BrokerUseStatsMap

UseStats returns a map of broker IDs to BrokerUseStats; each contains a count of leader and follower partition assignments.

type PartitionMapOpt

type PartitionMapOpt func(*PartitionMap)

PartitionMapOpt is a function that configures a *PartitionMap at instantiation time.

func Populate

func Populate(s string, n, r int) PartitionMapOpt

Populate takes a name, partition count and replication factor and returns a PartitionMapOpt.
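
PartitionMapOpt follows Go's functional options pattern. The sketch below shows how such options might be applied at construction time. It is not the package's source: newPartitionMap and populate are hypothetical lowercase stand-ins, the default Version of 1 is an assumption, and what the real Populate stores in each replica slot is not stated here, so zero-valued placeholders are used.

```go
package main

import "fmt"

// Partition and PartitionMap are trimmed-down stand-ins for the package types.
type Partition struct {
	Topic     string
	Partition int
	Replicas  []int
}

type PartitionMap struct {
	Version    int
	Partitions []Partition
}

// PartitionMapOpt matches the documented option signature.
type PartitionMapOpt func(*PartitionMap)

// newPartitionMap (hypothetical) builds a map and applies each option in order.
func newPartitionMap(opts ...PartitionMapOpt) *PartitionMap {
	pm := &PartitionMap{Version: 1} // assumed default version
	for _, opt := range opts {
		opt(pm)
	}
	return pm
}

// populate (hypothetical) returns an option that adds n partitions for the
// named topic, each with r placeholder replica slots.
func populate(topic string, n, r int) PartitionMapOpt {
	return func(pm *PartitionMap) {
		for i := 0; i < n; i++ {
			pm.Partitions = append(pm.Partitions, Partition{
				Topic:     topic,
				Partition: i,
				Replicas:  make([]int, r), // placeholder values
			})
		}
	}
}

func main() {
	pm := newPartitionMap(populate("events", 3, 2))
	fmt.Println(len(pm.Partitions), len(pm.Partitions[0].Replicas)) // 3 2
}
```

Because options are plain functions, calling the constructor with no arguments still yields a usable empty map.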

type PartitionMeta

type PartitionMeta struct {
	Size float64 // In bytes.
}

PartitionMeta holds partition metadata.

type PartitionMetaMap

type PartitionMetaMap map[string]map[int]*PartitionMeta

PartitionMetaMap is a mapping of topic, partition number to PartitionMeta.

func NewPartitionMetaMap

func NewPartitionMetaMap() PartitionMetaMap

NewPartitionMetaMap returns an empty PartitionMetaMap.

func (PartitionMetaMap) Size

func (pmm PartitionMetaMap) Size(p Partition) (float64, error)

Size takes a Partition and returns the size. An error is returned if the partition isn't in the PartitionMetaMap.

type RebuildParams

type RebuildParams struct {
	PMM              PartitionMetaMap
	BM               BrokerMap
	Strategy         string
	Optimization     string
	Affinities       SubstitutionAffinities
	PartnSzFactor    float64
	MinUniqueRackIDs int
	// contains filtered or unexported fields
}

RebuildParams holds required parameters to call the Rebuild method on a *PartitionMap.

func NewRebuildParams

func NewRebuildParams() RebuildParams

NewRebuildParams initializes a RebuildParams.

type ReplicaSets

type ReplicaSets map[int][]int

ReplicaSets is a mapping of partition number to Partition.Replicas. Take note that there is no topic identifier and that partition numbers from two different topics can overwrite one another.

type Stub

type Stub struct {
	// contains filtered or unexported fields
}

Stub stubs the Handler interface.

func NewZooKeeperStub

func NewZooKeeperStub() *Stub

NewZooKeeperStub returns a stub ZooKeeper.

func (*Stub) GetAllBrokerMeta

func (zk *Stub) GetAllBrokerMeta(withMetrics bool) (BrokerMetaMap, []error)

GetAllBrokerMeta stubs GetAllBrokerMeta.

func (*Stub) GetAllPartitionMeta

func (zk *Stub) GetAllPartitionMeta() (PartitionMetaMap, error)

GetAllPartitionMeta stubs GetAllPartitionMeta.

func (*Stub) GetBrokerMetrics

func (zk *Stub) GetBrokerMetrics() (BrokerMetricsMap, error)

GetBrokerMetrics stubs GetBrokerMetrics.

func (*Stub) GetPartitionMap

func (zk *Stub) GetPartitionMap(t string) (*PartitionMap, error)

GetPartitionMap stubs GetPartitionMap.

func (*Stub) GetTopicState

func (zk *Stub) GetTopicState(t string) (*TopicState, error)

GetTopicState stubs GetTopicState.

type StubZnode

type StubZnode struct {
	// contains filtered or unexported fields
}

StubZnode stubs a ZooKeeper znode.

type SubstitutionAffinities

type SubstitutionAffinities map[int]*Broker

SubstitutionAffinities is a mapping of an ID belonging to a *Broker marked for replacement and a replacement *Broker that will fill all previously filled replica slots held by the *Broker being replaced.

func (SubstitutionAffinities) Get

func (sa SubstitutionAffinities) Get(id int) *Broker

Get takes a broker ID and returns a *Broker if one was set as a substitution affinity.

type TopicState

type TopicState struct {
	Partitions map[string][]int `json:"partitions"`
}

TopicState is used for unmarshalling ZooKeeper JSON data from a topic: e.g. /brokers/topics/some-topic

func (*TopicState) Brokers

func (ts *TopicState) Brokers() []int

Brokers returns a []int of broker IDs from all partitions in the TopicState.
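
Collecting the distinct broker IDs from a decoded TopicState could be sketched as below. The helpers parseTopicState and uniqueBrokers are hypothetical, the sample payload is illustrative, and sorting the result is an assumption made here for deterministic output.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// TopicState mirrors the documented struct and JSON tag.
type TopicState struct {
	Partitions map[string][]int `json:"partitions"`
}

// parseTopicState (hypothetical) decodes topic JSON into a *TopicState.
func parseTopicState(data string) (*TopicState, error) {
	ts := &TopicState{}
	if err := json.Unmarshal([]byte(data), ts); err != nil {
		return nil, err
	}
	return ts, nil
}

// uniqueBrokers (hypothetical) returns each broker ID found in any
// partition's replica list exactly once, in ascending order.
func uniqueBrokers(ts *TopicState) []int {
	seen := make(map[int]struct{})
	for _, replicas := range ts.Partitions {
		for _, id := range replicas {
			seen[id] = struct{}{}
		}
	}
	ids := make([]int, 0, len(seen))
	for id := range seen {
		ids = append(ids, id)
	}
	sort.Ints(ids)
	return ids
}

func main() {
	data := `{"version":1,"partitions":{"0":[1001,1002],"1":[1002,1003]}}`
	ts, err := parseTopicState(data)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println(uniqueBrokers(ts)) // [1001 1002 1003]
}
```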
