Documentation ¶
Index ¶
- Constants
- Variables
- func AssignmentsToReplicas(assignments []PartitionAssignment) ([][]int, error)
- func BrokerCountsPerRack(brokers []BrokerInfo) map[string]int
- func BrokerIDs(brokers []BrokerInfo) []int
- func BrokerRacks(brokers []BrokerInfo) map[int]string
- func BrokersPerRack(brokers []BrokerInfo) map[string][]int
- func CheckAssignments(assignments []PartitionAssignment) error
- func DistinctRacks(brokers []BrokerInfo) []string
- func FormatAssignentDiffs(curr []PartitionAssignment, desired []PartitionAssignment, ...) string
- func FormatBrokerMaxPartitions(curr []PartitionAssignment, desired []PartitionAssignment, ...) string
- func FormatBrokerRackReplicas(brokers []BrokerInfo, topics []TopicInfo) string
- func FormatBrokerReplicas(brokers []BrokerInfo, topics []TopicInfo) string
- func FormatBrokers(brokers []BrokerInfo, full bool) string
- func FormatBrokersPerRack(brokers []BrokerInfo) string
- func FormatConfig(configMap map[string]string) string
- func FormatTopicLeadersPerRack(topic TopicInfo, brokers []BrokerInfo) string
- func FormatTopicPartitions(partitions []PartitionInfo, brokers []BrokerInfo) string
- func FormatTopics(topics []TopicInfo, brokers []BrokerInfo, full bool) string
- func HasLeaders(topics []TopicInfo) bool
- func LeadersPerRack(brokers []BrokerInfo, topic TopicInfo) map[string]int
- func MaxPartitionsPerBroker(allAssignments ...[]PartitionAssignment) map[int]int
- func MaxReplication(topics []TopicInfo) int
- func NewLeaderPartitions(current []PartitionAssignment, desired []PartitionAssignment) []int
- func ParseBrokerThrottles(brokers []BrokerInfo) ([]BrokerThrottle, []BrokerThrottle, error)
- func ParsePartitionThrottles(topic TopicInfo) ([]PartitionThrottle, []PartitionThrottle, error)
- func PartitionIDs(partitions []PartitionInfo) []int
- func PartitionThrottleConfigEntries(leaderThrottles []PartitionThrottle, followerThrottles []PartitionThrottle) []kafka.ConfigEntry
- func SameBrokers(a PartitionAssignment, b PartitionAssignment) bool
- func ThrottledBrokerIDs(brokers []BrokerInfo) []int
- func ThrottledTopicNames(topics []TopicInfo) []string
- type AssignmentDiff
- type BrokerInfo
- type BrokerThrottle
- type Client
- func (c *Client) AcquireLock(ctx context.Context, path string) (zk.Lock, error)
- func (c *Client) AddPartitions(ctx context.Context, topic string, newAssignments []PartitionAssignment) error
- func (c *Client) AssignPartitions(ctx context.Context, topic string, assignments []PartitionAssignment) error
- func (c *Client) AssignmentInProgress(ctx context.Context) (bool, error)
- func (c *Client) Close() error
- func (c *Client) CreateTopic(ctx context.Context, config kafka.TopicConfig) error
- func (c *Client) ElectionInProgress(ctx context.Context) (bool, error)
- func (c *Client) GetBootstrapAddrs() []string
- func (c *Client) GetBrokerIDs(ctx context.Context) ([]int, error)
- func (c *Client) GetBrokerPartitions(ctx context.Context, names []string) ([]PartitionInfo, error)
- func (c *Client) GetBrokers(ctx context.Context, ids []int) ([]BrokerInfo, error)
- func (c *Client) GetClusterID(ctx context.Context) (string, error)
- func (c *Client) GetControllerAddr(ctx context.Context) (string, error)
- func (c *Client) GetTopic(ctx context.Context, name string, detailed bool) (TopicInfo, error)
- func (c *Client) GetTopicNames(ctx context.Context) ([]string, error)
- func (c *Client) GetTopics(ctx context.Context, names []string, detailed bool) ([]TopicInfo, error)
- func (c *Client) LockHeld(ctx context.Context, path string) (bool, error)
- func (c *Client) RunLeaderElection(ctx context.Context, topic string, partitions []int) error
- func (c *Client) UpdateBrokerConfig(ctx context.Context, id int, configEntries []kafka.ConfigEntry, overwrite bool) ([]string, error)
- func (c *Client) UpdateTopicConfig(ctx context.Context, name string, configEntries []kafka.ConfigEntry, ...) ([]string, error)
- type ClientConfig
- type PartitionAssignment
- type PartitionInfo
- type PartitionThrottle
- type TopicInfo
- func (t TopicInfo) AllLeadersCorrect() bool
- func (t TopicInfo) AllReplicasInSync() bool
- func (t TopicInfo) IsThrottled() bool
- func (t TopicInfo) MaxISR() int
- func (t TopicInfo) MaxReplication() int
- func (t TopicInfo) OutOfSyncPartitions(subset []int) []PartitionInfo
- func (t TopicInfo) PartitionIDs() []int
- func (t TopicInfo) RackCounts(brokerRacks map[int]string) (int, int, error)
- func (t TopicInfo) Retention() time.Duration
- func (t TopicInfo) ToAssignments() []PartitionAssignment
- func (t TopicInfo) WrongLeaderPartitions(subset []int) []PartitionInfo
Constants ¶
const ( // RetentionKey is the config key used for topic time retention. RetentionKey = "retention.ms" // LeaderThrottledKey is the config key for the leader throttle rate. LeaderThrottledKey = "leader.replication.throttled.rate" // FollowerThrottledKey is the config key for the follower throttle rate. FollowerThrottledKey = "follower.replication.throttled.rate" // LeaderReplicasThrottledKey is the config key for the list of leader replicas // that should be throttled. LeaderReplicasThrottledKey = "leader.replication.throttled.replicas" // FollowerReplicasThrottledKey is the config key for the list of follower replicas // that should be throttled. FollowerReplicasThrottledKey = "follower.replication.throttled.replicas" )
Variables ¶
var ( // ErrTopicDoesNotExist is returned by admin functions when a topic that should exist // does not. ErrTopicDoesNotExist = errors.New("Topic does not exist") )
Functions ¶
func AssignmentsToReplicas ¶
func AssignmentsToReplicas(assignments []PartitionAssignment) ([][]int, error)
AssignmentsToReplicas is the inverse of ReplicasToAssignments. Used for unit tests.
func BrokerCountsPerRack ¶
func BrokerCountsPerRack(brokers []BrokerInfo) map[string]int
BrokerCountsPerRack returns a mapping of rack -> number of brokers.
func BrokerIDs ¶
func BrokerIDs(brokers []BrokerInfo) []int
BrokerIDs returns a slice of the IDs of the argument brokers.
func BrokerRacks ¶
func BrokerRacks(brokers []BrokerInfo) map[int]string
BrokerRacks returns a mapping of broker ID -> rack.
func BrokersPerRack ¶
func BrokersPerRack(brokers []BrokerInfo) map[string][]int
BrokersPerRack returns a mapping of rack -> broker IDs.
func CheckAssignments ¶
func CheckAssignments(assignments []PartitionAssignment) error
CheckAssignments does some basic sanity checks on the assignments that are passed into an Assigner or extender so that we can fail early if something is obviously wrong.
func DistinctRacks ¶
func DistinctRacks(brokers []BrokerInfo) []string
DistinctRacks returns a sorted slice of all the distinct racks in the cluster.
func FormatAssignentDiffs ¶
func FormatAssignentDiffs( curr []PartitionAssignment, desired []PartitionAssignment, brokers []BrokerInfo, ) string
FormatAssignentDiffs generates a pretty table that shows the before and after states of a partition replica and/or leader update.
func FormatBrokerMaxPartitions ¶
func FormatBrokerMaxPartitions( curr []PartitionAssignment, desired []PartitionAssignment, brokers []BrokerInfo, ) string
FormatBrokerMaxPartitions generates a pretty table that shows the total number of partitions that each broker is involved in for a diff. It's used to evaluate the potential extra load that could occur on brokers during a migration.
func FormatBrokerRackReplicas ¶
func FormatBrokerRackReplicas(brokers []BrokerInfo, topics []TopicInfo) string
FormatBrokerRackReplicas creates a pretty table that shows how many replicas are in each position (i.e., leader, second, third) by rack across all topics. Useful for showing total-topic balance.
func FormatBrokerReplicas ¶
func FormatBrokerReplicas(brokers []BrokerInfo, topics []TopicInfo) string
FormatBrokerReplicas creates a pretty table that shows how many replicas are in each position (i.e., leader, second, third) by broker across all topics. Useful for showing total-topic balance.
func FormatBrokers ¶
func FormatBrokers(brokers []BrokerInfo, full bool) string
FormatBrokers creates a pretty table from a list of brokers.
func FormatBrokersPerRack ¶
func FormatBrokersPerRack(brokers []BrokerInfo) string
FormatBrokersPerRack creates a pretty table that shows the number of brokers per rack.
func FormatConfig ¶
FormatConfig creates a pretty table with all of the keys and values in a topic or broker config.
func FormatTopicLeadersPerRack ¶
func FormatTopicLeadersPerRack(topic TopicInfo, brokers []BrokerInfo) string
FormatTopicLeadersPerRack creates a pretty table that shows the number of partitions with a leader in each rack.
func FormatTopicPartitions ¶
func FormatTopicPartitions(partitions []PartitionInfo, brokers []BrokerInfo) string
FormatTopicPartitions creates a pretty table with information on all of the partitions for a topic.
func FormatTopics ¶
func FormatTopics(topics []TopicInfo, brokers []BrokerInfo, full bool) string
FormatTopics creates a pretty table that lists the details of the argument topics.
func HasLeaders ¶
HasLeaders returns whether at least one partition in the argument topics has a non-zero leader set. Used for formatting purposes.
func LeadersPerRack ¶
func LeadersPerRack(brokers []BrokerInfo, topic TopicInfo) map[string]int
LeadersPerRack returns a mapping of rack -> number of partitions with a leader in that rack.
func MaxPartitionsPerBroker ¶
func MaxPartitionsPerBroker( allAssignments ...[]PartitionAssignment, ) map[int]int
MaxPartitionsPerBroker calculates the number of partitions that each broker may need to handle during a migration.
func MaxReplication ¶
MaxReplication returns the maximum amount of replication across all partitions in the argument topics.
func NewLeaderPartitions ¶
func NewLeaderPartitions( current []PartitionAssignment, desired []PartitionAssignment) []int
NewLeaderPartitions returns the partition IDs which will have new leaders given the current and desired assignments.
func ParseBrokerThrottles ¶
func ParseBrokerThrottles(brokers []BrokerInfo) ( []BrokerThrottle, []BrokerThrottle, error, )
ParseBrokerThrottles returns slices of the leader and follower throttles for the argument brokers.
func ParsePartitionThrottles ¶
func ParsePartitionThrottles(topic TopicInfo) ( []PartitionThrottle, []PartitionThrottle, error, )
ParsePartitionThrottles returns slices of the leader and follower partition throttles for the argument topic.
func PartitionIDs ¶
func PartitionIDs(partitions []PartitionInfo) []int
PartitionIDs returns the IDs from the argument partitions.
func PartitionThrottleConfigEntries ¶
func PartitionThrottleConfigEntries( leaderThrottles []PartitionThrottle, followerThrottles []PartitionThrottle, ) []kafka.ConfigEntry
PartitionThrottleConfigEntries generates the topic config entries for the provided leader and follower throttles.
func SameBrokers ¶
func SameBrokers( a PartitionAssignment, b PartitionAssignment, ) bool
SameBrokers returns whether two PartitionAssignments have the same brokers.
func ThrottledBrokerIDs ¶
func ThrottledBrokerIDs(brokers []BrokerInfo) []int
ThrottledBrokerIDs returns a slice of the IDs of the subset of argument brokers that have throttles on them.
func ThrottledTopicNames ¶
ThrottledTopicNames returns the names of topics in the argument slice that have throttles on them.
Types ¶
type AssignmentDiff ¶
type AssignmentDiff struct { PartitionID int Old PartitionAssignment New PartitionAssignment }
AssignmentDiff represents the diff in a single partition reassignment.
func AssignmentDiffs ¶
func AssignmentDiffs( current []PartitionAssignment, desired []PartitionAssignment, ) []AssignmentDiff
AssignmentDiffs returns the diffs implied by the argument current and desired PartitionAssignments. Used for displaying diffs to user.
type BrokerInfo ¶
type BrokerInfo struct { ID int `json:"id"` Endpoints []string `json:"endpoints"` Host string `json:"host"` Port int32 `json:"port"` InstanceID string `json:"instanceID"` AvailabilityZone string `json:"availabilityZone"` Rack string `json:"rack"` InstanceType string `json:"instanceType"` Version int `json:"version"` Timestamp time.Time `json:"timestamp"` Config map[string]string `json:"config"` }
BrokerInfo represents the information stored about a broker in zookeeper.
func (BrokerInfo) Addr ¶
func (b BrokerInfo) Addr() string
Addr returns the address of the current BrokerInfo.
func (BrokerInfo) IsThrottled ¶
func (b BrokerInfo) IsThrottled() bool
IsThrottled determines whether the broker has any throttles in its config.
type BrokerThrottle ¶
BrokerThrottle represents a throttle being applied to a single broker.
func BrokerThrottles ¶
func BrokerThrottles( leaderThrottles []PartitionThrottle, followerThrottles []PartitionThrottle, throttleBytes int64, ) []BrokerThrottle
BrokerThrottles returns a slice of BrokerThrottles that we should apply. It's currently just set from the union of the leader and follower brokers (matching the behavior of bin/kafka-reassign-partitions.sh).
func (BrokerThrottle) ConfigEntries ¶
func (b BrokerThrottle) ConfigEntries() []kafka.ConfigEntry
ConfigEntries returns the kafka config entries associated with this broker throttle.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is a general client for interacting with a kafka cluster. Most interactions are done via zookeeper, but a few (e.g., creating topics or getting the controller address) are done via the broker API instead.
func NewClient ¶
func NewClient( ctx context.Context, config ClientConfig, ) (*Client, error)
NewClient creates and returns a new Client instance.
func (*Client) AcquireLock ¶
AcquireLock acquires and returns a lock from the underlying zookeeper client. The Unlock method should be called on the lock when it's safe to release.
func (*Client) AddPartitions ¶
func (c *Client) AddPartitions( ctx context.Context, topic string, newAssignments []PartitionAssignment, ) error
AddPartitions adds one or more partitions to an existing topic. Unlike AssignPartitions, this directly updates the topic's partition config in zookeeper.
func (*Client) AssignPartitions ¶
func (c *Client) AssignPartitions( ctx context.Context, topic string, assignments []PartitionAssignment, ) error
AssignPartitions notifies the cluster to begin a partition reassignment. This should only be used for existing partitions; to create new partitions, use the AddPartitions method.
func (*Client) AssignmentInProgress ¶
AssignmentInProgress returns whether the zk assignment node exists.
func (*Client) CreateTopic ¶
CreateTopic creates a new topic with the argument config. It uses the topic creation API exposed on the controller broker.
func (*Client) ElectionInProgress ¶
ElectionInProgress returns whether the election zk node is set.
func (*Client) GetBootstrapAddrs ¶
GetBootstrapAddrs returns the stored value of the bootstrapAddrs parameter so it can be used by the messages package.
func (*Client) GetBrokerIDs ¶
GetBrokerIDs returns a slice of all broker IDs.
func (*Client) GetBrokerPartitions ¶
func (c *Client) GetBrokerPartitions( ctx context.Context, names []string, ) ([]PartitionInfo, error)
GetBrokerPartitions gets partition information directly from a broker for one or more topics. This is faster than using ZK to get this information (i.e., by calling GetTopics with detailed=true), but it doesn't include higher-level topic information.
func (*Client) GetBrokers ¶
GetBrokers gets information on one or more cluster brokers from zookeeper. If the argument ids is unset, then it fetches all brokers.
func (*Client) GetClusterID ¶
GetClusterID gets the cluster ID from zookeeper. This ID is generated when the cluster is created and should be stable over the life of the cluster.
func (*Client) GetControllerAddr ¶
GetControllerAddr gets the address of the cluster controller. This is needed for creating new topics and other operations.
func (*Client) GetTopic ¶
GetTopic is a wrapper around GetTopics(...) for getting information about a single topic.
func (*Client) GetTopicNames ¶
GetTopicNames gets all topic names from zookeeper.
func (*Client) GetTopics ¶
func (c *Client) GetTopics( ctx context.Context, names []string, detailed bool, ) ([]TopicInfo, error)
GetTopics gets information about one or more cluster topics from zookeeper. If the argument names is unset, then it fetches all topics. The detailed parameter determines whether the ISRs and leaders are fetched for each partition.
func (*Client) LockHeld ¶
LockHeld determines whether the lock with the provided path is held (i.e., has children).
func (*Client) RunLeaderElection ¶
RunLeaderElection triggers a leader election for the argument topic and partitions.
func (*Client) UpdateBrokerConfig ¶
func (c *Client) UpdateBrokerConfig( ctx context.Context, id int, configEntries []kafka.ConfigEntry, overwrite bool, ) ([]string, error)
UpdateBrokerConfig updates the config JSON for a cluster broker and sets a change notification so the cluster brokers are notified. If overwrite is true, then it will overwrite existing config entries.
The function returns the list of keys that were modified. If overwrite is set to false, this can be used to determine the subset of entries
func (*Client) UpdateTopicConfig ¶
func (c *Client) UpdateTopicConfig( ctx context.Context, name string, configEntries []kafka.ConfigEntry, overwrite bool, ) ([]string, error)
UpdateTopicConfig updates the config JSON for a topic and sets a change notification so that the brokers are notified. If overwrite is true, then it will overwrite existing config entries.
The function returns the list of keys that were modified. If overwrite is set to false, this can be used to determine the subset of entries that were already set.
type ClientConfig ¶
type ClientConfig struct { ZKAddrs []string ZKPrefix string BootstrapAddrs []string ExpectedClusterID string Sess *session.Session ReadOnly bool }
ClientConfig contains all of the parameters necessary to create a kafka admin client.
type PartitionAssignment ¶
PartitionAssignment contains the actual or desired assignment of replicas in a topic partition.
func AssignmentsToUpdate ¶
func AssignmentsToUpdate( current []PartitionAssignment, desired []PartitionAssignment, ) []PartitionAssignment
AssignmentsToUpdate returns the subset of assignments that need to be updated given the current and desired states.
func CopyAssignments ¶
func CopyAssignments( curr []PartitionAssignment, ) []PartitionAssignment
CopyAssignments returns a deep copy of the argument PartitionAssignment slice.
func ReplicasToAssignments ¶
func ReplicasToAssignments( replicaSlices [][]int, ) []PartitionAssignment
ReplicasToAssignments converts a slice of slices to a slice of PartitionAssignments, assuming that the argument slices are in partition order. Used for unit tests.
func (PartitionAssignment) Copy ¶
func (a PartitionAssignment) Copy() PartitionAssignment
Copy returns a deep copy of this PartitionAssignment.
func (PartitionAssignment) DistinctRacks ¶
func (a PartitionAssignment) DistinctRacks( brokerRacks map[int]string, ) map[string]struct{}
DistinctRacks returns a map of the distinct racks in this PartitionAssignment.
func (PartitionAssignment) Index ¶
func (a PartitionAssignment) Index(replica int) int
Index returns the index of the argument replica, or -1 if it can't be found.
type PartitionInfo ¶
type PartitionInfo struct { Topic string `json:"topic"` ID int `json:"ID"` Leader int `json:"leader"` Version int `json:"version"` Replicas []int `json:"replicas"` ISR []int `json:"isr"` ControllerEpoch int `json:"controllerEpoch"` LeaderEpoch int `json:"leaderEpoch"` }
PartitionInfo represents the information stored about a topic partition in zookeeper.
type PartitionThrottle ¶
PartitionThrottle represents a throttle being applied to a single partition, broker combination.
func FollowerPartitionThrottles ¶
func FollowerPartitionThrottles( curr []PartitionAssignment, desired []PartitionAssignment, ) []PartitionThrottle
FollowerPartitionThrottles returns a slice of PartitionThrottles that we should apply on the follower side.
See https://kafka.apache.org/0101/documentation.html for discussion on how these should be applied.
func LeaderPartitionThrottles ¶
func LeaderPartitionThrottles( curr []PartitionAssignment, desired []PartitionAssignment, ) []PartitionThrottle
LeaderPartitionThrottles returns a slice of PartitionThrottles that we should apply on the leader side.
See https://kafka.apache.org/0101/documentation.html for discussion on how these should be applied.
func ParsePartitionThrottleStr ¶
func ParsePartitionThrottleStr(valuesStr string) ([]PartitionThrottle, error)
ParsePartitionThrottleStr converts a throttle config string from zk into a slice of PartitionThrottle structs.
func (PartitionThrottle) String ¶
func (p PartitionThrottle) String() string
type TopicInfo ¶
type TopicInfo struct { Name string `json:"name"` Config map[string]string `json:"config"` Partitions []PartitionInfo `json:"partitions"` Version int `json:"version"` }
TopicInfo represents the information stored about a topic in zookeeper.
func (TopicInfo) AllLeadersCorrect ¶
AllLeadersCorrect returns whether leader == replicas[0] for all partitions.
func (TopicInfo) AllReplicasInSync ¶
AllReplicasInSync returns whether all partitions have ISR == replicas (ignoring order).
func (TopicInfo) IsThrottled ¶
IsThrottled determines whether the topic has any throttles in its config.
func (TopicInfo) MaxISR ¶
MaxISR returns the maximum number of in-sync replicas across all partitions in a topic.
func (TopicInfo) MaxReplication ¶
MaxReplication returns the maximum number of replicas across all partitions in a topic.
func (TopicInfo) OutOfSyncPartitions ¶
func (t TopicInfo) OutOfSyncPartitions(subset []int) []PartitionInfo
OutOfSyncPartitions returns the partitions for which ISR != replicas (ignoring order).
func (TopicInfo) PartitionIDs ¶
PartitionIDs returns an ordered slice of partition IDs for a topic.
func (TopicInfo) RackCounts ¶
RackCounts returns the minimum and maximum distinct rack counts across all partitions in a topic.
func (TopicInfo) Retention ¶
Retention returns the retention duration implied by a topic config. If unset, it returns 0.
func (TopicInfo) ToAssignments ¶
func (t TopicInfo) ToAssignments() []PartitionAssignment
ToAssignments converts a topic to a slice of partition assignments.
func (TopicInfo) WrongLeaderPartitions ¶
func (t TopicInfo) WrongLeaderPartitions(subset []int) []PartitionInfo
WrongLeaderPartitions returns the partitions where leader != replicas[0].