admin

package
v1.16.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 15, 2024 License: MIT Imports: 32 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// RetentionKey is the config key used for topic time retention.
	RetentionKey = "retention.ms"

	// LeaderThrottledKey is the config key for the leader throttle rate.
	LeaderThrottledKey = "leader.replication.throttled.rate"

	// FollowerThrottledKey is the config key for the follower throttle rate.
	FollowerThrottledKey = "follower.replication.throttled.rate"

	// LeaderReplicasThrottledKey is the config key for the list of leader replicas
	// that should be throttled.
	LeaderReplicasThrottledKey = "leader.replication.throttled.replicas"

	// FollowerReplicasThrottledKey is the config key for the list of follower replicas
	// that should be throttled.
	FollowerReplicasThrottledKey = "follower.replication.throttled.replicas"
)
View Source
const (
	ListenerNotFoundError kafka.Error = 72
)

Variables

View Source
var (
	// ErrTopicDoesNotExist is returned by admin functions when a topic that should exist
	// does not.
	ErrTopicDoesNotExist = errors.New("Topic does not exist")
)

Functions

func AssignmentsToReplicas

func AssignmentsToReplicas(assignments []PartitionAssignment) ([][]int, error)

AssignmentsToReplicas is the inverse of ReplicasToAssignments. Used for unit tests.

func BrokerCountsPerRack

func BrokerCountsPerRack(brokers []BrokerInfo) map[string]int

BrokerCountsPerRack returns a mapping of rack -> number of brokers.

func BrokerIDs

func BrokerIDs(brokers []BrokerInfo) []int

BrokerIDs returns a slice of the IDs of the argument brokers.

func BrokerRacks

func BrokerRacks(brokers []BrokerInfo) map[int]string

BrokerRacks returns a mapping of broker ID -> rack.

func BrokersPerRack

func BrokersPerRack(brokers []BrokerInfo) map[string][]int

BrokersPerRack returns a mapping of rack -> broker IDs.

func CheckAssignments

func CheckAssignments(assignments []PartitionAssignment) error

CheckAssignments does some basic sanity checks on the assignments that are passed into an Assigner or extender so that we can fail early if something is obviously wrong.

func DistinctRacks

func DistinctRacks(brokers []BrokerInfo) []string

DistinctRacks returns a sorted slice of all the distinct racks in the cluster.

func FormatACLInfo added in v1.12.0

func FormatACLInfo(a ACLInfo) string

FormatACLInfo formats an ACLInfo struct as a string, using the string version of all the fields.

func FormatACLs added in v1.10.3

func FormatACLs(acls []ACLInfo) string

FormatACLs creates a pretty table that lists the details of the argument acls.

func FormatAssignentDiffs

func FormatAssignentDiffs(
	curr []PartitionAssignment,
	desired []PartitionAssignment,
	brokers []BrokerInfo,
) string

FormatAssignentDiffs generates a pretty table that shows the before and after states of a partition replica and/or leader update.

func FormatBrokerMaxPartitions

func FormatBrokerMaxPartitions(
	curr []PartitionAssignment,
	desired []PartitionAssignment,
	brokers []BrokerInfo,
) string

FormatBrokerMaxPartitions generates a pretty table that shows the total number of partitions that each broker is involved in for a diff. It's used to evaluate the potential extra load that could occur on brokers during a migration.

func FormatBrokerRackReplicas

func FormatBrokerRackReplicas(brokers []BrokerInfo, topics []TopicInfo) string

FormatBrokerRackReplicas creates a pretty table that shows how many replicas are in each position (i.e., leader, second, third) by rack across all topics. Useful for showing total-topic balance.

func FormatBrokerReplicas

func FormatBrokerReplicas(brokers []BrokerInfo, topics []TopicInfo) string

FormatBrokerReplicas creates a pretty table that shows how many replicas are in each position (i.e., leader, second, third) by broker across all topics. Useful for showing total-topic balance.

func FormatBrokers

func FormatBrokers(brokers []BrokerInfo, full bool) string

FormatBrokers creates a pretty table from a list of brokers.

func FormatBrokersPerRack

func FormatBrokersPerRack(brokers []BrokerInfo) string

FormatBrokersPerRack creates a pretty table that shows the number of brokers per rack.

func FormatClusterID added in v1.14.0

func FormatClusterID(clusterID string) string

FormatClusterID creates a pretty table for cluster ID.

func FormatConfig

func FormatConfig(configMap map[string]string) string

FormatConfig creates a pretty table with all of the keys and values in a topic or broker config.

func FormatControllerID added in v1.14.0

func FormatControllerID(brokerID int) string

FormatControllerID creates a pretty table for controller broker.

func FormatTopicLeadersPerRack

func FormatTopicLeadersPerRack(topic TopicInfo, brokers []BrokerInfo) string

FormatTopicLeadersPerRack creates a pretty table that shows the number of partitions with a leader in each rack.

func FormatTopicPartitions

func FormatTopicPartitions(partitions []PartitionInfo, brokers []BrokerInfo) string

FormatTopicPartitions creates a pretty table with information on all of the partitions for a topic.

func FormatTopics

func FormatTopics(topics []TopicInfo, brokers []BrokerInfo, full bool) string

FormatTopics creates a pretty table that lists the details of the argument topics.

func FormatTopicsPartitions added in v1.10.3

func FormatTopicsPartitions(
	topicsPartitionsStatusInfo map[string][]PartitionStatusInfo,
	brokers []BrokerInfo,
) string

FormatTopicsPartitions creates a pretty table with information on all of the partitions for topics.

func FormatTopicsPartitionsSummary added in v1.10.3

func FormatTopicsPartitionsSummary(
	topicsPartitionsStatusSummary map[string]map[PartitionStatus][]int,
) string

FormatTopicsPartitionsSummary creates a pretty table with summary of the partitions for topics.

func FormatUsers added in v1.10.3

func FormatUsers(users []UserInfo) string

FormatUsers creates a pretty table that lists the details of the argument users.

func GetAllTopicNamesFromMetadata added in v1.10.3

func GetAllTopicNamesFromMetadata(
	metadata *kafka.MetadataResponse,
) map[string]bool

func GetKafkaCredentials added in v1.13.0

func GetKafkaCredentials(svc secretsmanageriface.SecretsManagerAPI, secretArn string) (credentials, error)

func GetTopicsPartitionsStatusInfo added in v1.10.3

func GetTopicsPartitionsStatusInfo(
	metadata *kafka.MetadataResponse,
	topics []string,
	status PartitionStatus,
) map[string][]PartitionStatusInfo

GetTopicsPartitionsStatusInfo gets the partition status info for the specified topics.

func GetTopicsPartitionsStatusSummary added in v1.10.3

func GetTopicsPartitionsStatusSummary(
	metadata *kafka.MetadataResponse,
	topics []string,
	status PartitionStatus,
) (map[string]map[PartitionStatus][]int, int, int, int)

GetTopicsPartitionsStatusSummary gets the partition status summary.

func GetValidTopicNamesFromMetadata added in v1.10.3

func GetValidTopicNamesFromMetadata(
	topics []string,
	metadata *kafka.MetadataResponse,
) map[string]bool

GetValidTopicNamesFromMetadata, given an input of topics, returns the topics that exist in the cluster.

func HasLeaders

func HasLeaders(topics []TopicInfo) bool

HasLeaders returns whether at least one partition in the argument topics has a non-zero leader set. Used for formatting purposes.

func LeadersPerRack

func LeadersPerRack(brokers []BrokerInfo, topic TopicInfo) map[string]int

LeadersPerRack returns a mapping of rack -> number of partitions with a leader in that rack.

func MaxPartitionsPerBroker

func MaxPartitionsPerBroker(
	allAssignments ...[]PartitionAssignment,
) map[int]int

MaxPartitionsPerBroker calculates the number of partitions that each broker may need to handle during a migration.

func MaxReplication

func MaxReplication(topics []TopicInfo) int

MaxReplication returns the maximum amount of replication across all partitions in the argument topics.

func NewLeaderPartitions

func NewLeaderPartitions(
	current []PartitionAssignment,
	desired []PartitionAssignment) []int

NewLeaderPartitions returns the partition IDs which will have new leaders given the current and desired assignments.

func ParseBrokerThrottles

func ParseBrokerThrottles(brokers []BrokerInfo) (
	[]BrokerThrottle,
	[]BrokerThrottle,
	error,
)

ParseBrokerThrottles returns slices of the leader and follower throttles for the argument brokers.

func ParsePartitionThrottles

func ParsePartitionThrottles(topic TopicInfo) (
	[]PartitionThrottle,
	[]PartitionThrottle,
	error,
)

ParsePartitionThrottles returns slices of the leader and follower partition throttles for the argument topic.

func PartitionIDs

func PartitionIDs(partitions []PartitionInfo) []int

PartitionIDs returns the IDs from the argument partitions.

func PartitionThrottleConfigEntries

func PartitionThrottleConfigEntries(
	leaderThrottles []PartitionThrottle,
	followerThrottles []PartitionThrottle,
) []kafka.ConfigEntry

PartitionThrottleConfigEntries generates the topic config entries for the provided leader and follower throttles.

func SameBrokers

func SameBrokers(
	a PartitionAssignment,
	b PartitionAssignment,
) bool

SameBrokers returns whether two PartitionAssignments have the same brokers.

func ThrottledBrokerIDs

func ThrottledBrokerIDs(brokers []BrokerInfo) []int

ThrottledBrokerIDs returns a slice of the IDs of the subset of argument brokers that have throttles on them.

func ThrottledTopicNames

func ThrottledTopicNames(topics []TopicInfo) []string

ThrottledTopicNames returns the names of topics in the argument slice that have throttles on them.

Types

type ACLInfo added in v1.10.3

type ACLInfo struct {
	ResourceType   ResourceType      `json:"resourceType"`
	ResourceName   string            `json:"resourceName"`
	PatternType    PatternType       `json:"patternType"`
	Principal      string            `json:"principal"`
	Host           string            `json:"host"`
	Operation      ACLOperationType  `json:"operation"`
	PermissionType ACLPermissionType `json:"permissionType"`
}

ACLInfo represents the information stored about an ACL in zookeeper.

type ACLOperationType added in v1.10.3

type ACLOperationType kafka.ACLOperationType

ACLOperationType represents the Kafka operation type. We need to subtype this to be able to define methods to satisfy the Value interface from Cobra so we can use it as a Cobra flag.

func (*ACLOperationType) Set added in v1.10.3

func (o *ACLOperationType) Set(v string) error

Set is used by Cobra to set the value of a variable from a Cobra flag.

func (*ACLOperationType) String added in v1.10.3

func (o *ACLOperationType) String() string

String is used both by fmt.Print and by Cobra in help text.

func (*ACLOperationType) Type added in v1.10.3

func (o *ACLOperationType) Type() string

Type is used by Cobra in help text.

type ACLPermissionType added in v1.10.3

type ACLPermissionType kafka.ACLPermissionType

ACLPermissionType represents the Kafka permission type. We need to subtype this to be able to define methods to satisfy the Value interface from Cobra so we can use it as a Cobra flag.

func (*ACLPermissionType) Set added in v1.10.3

func (p *ACLPermissionType) Set(v string) error

Set is used by Cobra to set the value of a variable from a Cobra flag.

func (*ACLPermissionType) String added in v1.10.3

func (p *ACLPermissionType) String() string

String is used both by fmt.Print and by Cobra in help text.

func (*ACLPermissionType) Type added in v1.10.3

func (p *ACLPermissionType) Type() string

Type is used by Cobra in help text.

type AssignmentDiff

type AssignmentDiff struct {
	PartitionID int
	Old         PartitionAssignment
	New         PartitionAssignment
}

AssignmentDiff represents the diff in a single partition reassignment.

func AssignmentDiffs

func AssignmentDiffs(
	current []PartitionAssignment,
	desired []PartitionAssignment,
) []AssignmentDiff

AssignmentDiffs returns the diffs implied by the argument current and desired PartitionAssignments. Used for displaying diffs to user.

type BrokerAdminClient added in v1.0.0

type BrokerAdminClient struct {
	// contains filtered or unexported fields
}

BrokerAdminClient is a Client implementation that only uses broker APIs, without any zookeeper access.

func NewBrokerAdminClient added in v1.0.0

func NewBrokerAdminClient(
	ctx context.Context,
	config BrokerAdminClientConfig,
) (*BrokerAdminClient, error)

NewBrokerAdminClient constructs a new BrokerAdminClient instance.

func (*BrokerAdminClient) AcquireLock added in v1.0.0

func (c *BrokerAdminClient) AcquireLock(ctx context.Context, path string) (
	zk.Lock,
	error,
)

AcquireLock acquires a lock that can be used to prevent simultaneous changes to a topic. NOTE: Not implemented for broker-based clients.

func (*BrokerAdminClient) AddPartitions added in v1.0.0

func (c *BrokerAdminClient) AddPartitions(
	ctx context.Context,
	topic string,
	newAssignments []PartitionAssignment,
) error

AddPartitions extends a topic by adding one or more new partitions to it.

func (*BrokerAdminClient) AssignPartitions added in v1.0.0

func (c *BrokerAdminClient) AssignPartitions(
	ctx context.Context,
	topic string,
	assignments []PartitionAssignment,
) error

AssignPartitions sets the replica broker IDs for one or more partitions in a topic.

func (*BrokerAdminClient) Close added in v1.0.0

func (c *BrokerAdminClient) Close() error

Close closes the client.

func (*BrokerAdminClient) CreateACLs added in v1.10.3

func (c *BrokerAdminClient) CreateACLs(
	ctx context.Context,
	acls []kafka.ACLEntry,
) error

CreateACLs creates ACLs in the cluster.

func (*BrokerAdminClient) CreateTopic added in v1.0.0

func (c *BrokerAdminClient) CreateTopic(
	ctx context.Context,
	config kafka.TopicConfig,
) error

CreateTopic creates a topic in the cluster.

func (*BrokerAdminClient) DeleteACLs added in v1.12.0

func (c *BrokerAdminClient) DeleteACLs(
	ctx context.Context,
	filters []kafka.DeleteACLsFilter,
) (*kafka.DeleteACLsResponse, error)

DeleteACLs deletes ACLs in the cluster.

func (*BrokerAdminClient) GetACLs added in v1.10.3

func (c *BrokerAdminClient) GetACLs(
	ctx context.Context,
	filter kafka.ACLFilter,
) ([]ACLInfo, error)

GetACLs gets full information about each ACL in the cluster.

func (*BrokerAdminClient) GetAllTopicsMetadata added in v1.10.3

func (c *BrokerAdminClient) GetAllTopicsMetadata(
	ctx context.Context,
) (*kafka.MetadataResponse, error)

func (*BrokerAdminClient) GetBrokerIDs added in v1.0.0

func (c *BrokerAdminClient) GetBrokerIDs(ctx context.Context) ([]int, error)

GetBrokerIDs gets the IDs of all brokers in the cluster.

func (*BrokerAdminClient) GetBrokers added in v1.0.0

func (c *BrokerAdminClient) GetBrokers(ctx context.Context, ids []int) (
	[]BrokerInfo,
	error,
)

GetBrokers gets information about all brokers in the cluster.

func (*BrokerAdminClient) GetClusterID added in v1.0.0

func (c *BrokerAdminClient) GetClusterID(ctx context.Context) (string, error)

GetClusterID gets the ID of the cluster.

func (*BrokerAdminClient) GetConnector added in v1.0.0

func (c *BrokerAdminClient) GetConnector() *Connector

GetConnector gets the Connector instance for this cluster.

func (*BrokerAdminClient) GetControllerID added in v1.14.0

func (c *BrokerAdminClient) GetControllerID(ctx context.Context) (
	int,
	error,
)

GetControllerID gets the ID of the active controller broker.

func (*BrokerAdminClient) GetSupportedFeatures added in v1.0.0

func (c *BrokerAdminClient) GetSupportedFeatures() SupportedFeatures

GetSupportedFeatures gets the features supported by the cluster for this client.

func (*BrokerAdminClient) GetTopic added in v1.0.0

func (c *BrokerAdminClient) GetTopic(
	ctx context.Context,
	name string,
	detailed bool,
) (TopicInfo, error)

GetTopic gets the details of a single topic in the cluster.

func (*BrokerAdminClient) GetTopicNames added in v1.0.0

func (c *BrokerAdminClient) GetTopicNames(ctx context.Context) ([]string, error)

GetTopicNames gets just the names of each topic in the cluster.

func (*BrokerAdminClient) GetTopics added in v1.0.0

func (c *BrokerAdminClient) GetTopics(
	ctx context.Context,
	names []string,
	detailed bool,
) ([]TopicInfo, error)

GetTopics gets full information about each topic in the cluster.

func (*BrokerAdminClient) GetUsers added in v1.10.3

func (c *BrokerAdminClient) GetUsers(
	ctx context.Context,
	names []string,
) ([]UserInfo, error)

func (*BrokerAdminClient) LockHeld added in v1.0.0

func (c *BrokerAdminClient) LockHeld(ctx context.Context, path string) (bool, error)

LockHeld returns whether a lock is currently held for the given path. NOTE: Not implemented for broker-based clients.

func (*BrokerAdminClient) RunLeaderElection added in v1.0.0

func (c *BrokerAdminClient) RunLeaderElection(
	ctx context.Context,
	topic string,
	partitions []int,
) error

RunLeaderElection triggers a leader election for one or more partitions in a topic.

func (*BrokerAdminClient) UpdateBrokerConfig added in v1.0.0

func (c *BrokerAdminClient) UpdateBrokerConfig(
	ctx context.Context,
	id int,
	configEntries []kafka.ConfigEntry,
	overwrite bool,
) ([]string, error)

UpdateBrokerConfig updates the configuration for the argument broker. It returns the config keys that were updated.

func (*BrokerAdminClient) UpdateTopicConfig added in v1.0.0

func (c *BrokerAdminClient) UpdateTopicConfig(
	ctx context.Context,
	name string,
	configEntries []kafka.ConfigEntry,
	overwrite bool,
) ([]string, error)

UpdateTopicConfig updates the configuration for the argument topic. It returns the config keys that were updated.

func (*BrokerAdminClient) UpsertUser added in v1.10.3

func (c *BrokerAdminClient) UpsertUser(
	ctx context.Context,
	user kafka.UserScramCredentialsUpsertion,
) error

type BrokerAdminClientConfig added in v1.0.0

type BrokerAdminClientConfig struct {
	ConnectorConfig
	ReadOnly          bool
	ExpectedClusterID string
}

BrokerAdminClientConfig contains the configuration settings to construct a BrokerAdminClient instance.

type BrokerInfo

type BrokerInfo struct {
	ID               int               `json:"id"`
	Endpoints        []string          `json:"endpoints"`
	Host             string            `json:"host"`
	Port             int32             `json:"port"`
	InstanceID       string            `json:"instanceID"`
	AvailabilityZone string            `json:"availabilityZone"`
	Rack             string            `json:"rack"`
	InstanceType     string            `json:"instanceType"`
	Version          int               `json:"version"`
	Timestamp        time.Time         `json:"timestamp"`
	Config           map[string]string `json:"config"`
}

BrokerInfo represents the information stored about a broker in zookeeper.

func (BrokerInfo) Addr

func (b BrokerInfo) Addr() string

Addr returns the address of the current BrokerInfo.

func (BrokerInfo) IsThrottled

func (b BrokerInfo) IsThrottled() bool

IsThrottled determines whether the broker has any throttles in its config.

type BrokerThrottle

type BrokerThrottle struct {
	Broker        int
	ThrottleBytes int64
}

BrokerThrottle represents a throttle being applied to a single broker.

func BrokerThrottles

func BrokerThrottles(
	leaderThrottles []PartitionThrottle,
	followerThrottles []PartitionThrottle,
	throttleBytes int64,
) []BrokerThrottle

BrokerThrottles returns a slice of BrokerThrottles that we should apply. It's currently just set from the union of the leader and follower brokers (matching the behavior of bin/kafka-reassign-partitions.sh).

func (BrokerThrottle) ConfigEntries

func (b BrokerThrottle) ConfigEntries() []kafka.ConfigEntry

ConfigEntries returns the kafka config entries associated with this broker throttle.

type Client

type Client interface {
	// GetClusterID gets the ID of the cluster.
	GetClusterID(ctx context.Context) (string, error)

	// GetBrokers gets information about all brokers in the cluster.
	GetBrokers(ctx context.Context, ids []int) ([]BrokerInfo, error)

	// GetControllerID gets the active controller broker ID in the cluster.
	GetControllerID(ctx context.Context) (int, error)

	// GetBrokerIDs gets the IDs of all brokers in the cluster.
	GetBrokerIDs(ctx context.Context) ([]int, error)

	// GetConnector gets the Connector instance for this cluster.
	GetConnector() *Connector

	// GetTopics gets full information about each topic in the cluster.
	GetTopics(
		ctx context.Context,
		names []string,
		detailed bool,
	) ([]TopicInfo, error)

	// GetTopicNames gets just the names of each topic in the cluster.
	GetTopicNames(ctx context.Context) ([]string, error)

	// GetTopic gets the details of a single topic in the cluster.
	GetTopic(
		ctx context.Context,
		name string,
		detailed bool,
	) (TopicInfo, error)

	// GetACLs gets full information about each ACL in the cluster.
	GetACLs(
		ctx context.Context,
		filter kafka.ACLFilter,
	) ([]ACLInfo, error)

	// GetAllTopicsMetadata performs kafka-go metadata call to get topic information
	GetAllTopicsMetadata(ctx context.Context) (*kafka.MetadataResponse, error)

	// GetUsers gets information about users in the cluster.
	GetUsers(
		ctx context.Context,
		names []string,
	) ([]UserInfo, error)

	// UpdateTopicConfig updates the configuration for the argument topic. It returns the config
	// keys that were updated.
	UpdateTopicConfig(
		ctx context.Context,
		name string,
		configEntries []kafka.ConfigEntry,
		overwrite bool,
	) ([]string, error)

	// UpdateBrokerConfig updates the configuration for the argument broker. It returns the config
	// keys that were updated.
	UpdateBrokerConfig(
		ctx context.Context,
		id int,
		configEntries []kafka.ConfigEntry,
		overwrite bool,
	) ([]string, error)

	// CreateTopic creates a topic in the cluster.
	CreateTopic(
		ctx context.Context,
		config kafka.TopicConfig,
	) error

	// CreateACLs creates ACLs in the cluster.
	CreateACLs(
		ctx context.Context,
		acls []kafka.ACLEntry,
	) error

	// DeleteACLs deletes ACLs in the cluster.
	DeleteACLs(
		ctx context.Context,
		filters []kafka.DeleteACLsFilter,
	) (*kafka.DeleteACLsResponse, error)

	// UpsertUser creates or updates a user in zookeeper.
	UpsertUser(
		ctx context.Context,
		user kafka.UserScramCredentialsUpsertion,
	) error

	// AssignPartitions sets the replica broker IDs for one or more partitions in a topic.
	AssignPartitions(
		ctx context.Context,
		topic string,
		assignments []PartitionAssignment,
	) error

	// AddPartitions extends a topic by adding one or more new partitions to it.
	AddPartitions(
		ctx context.Context,
		topic string,
		newAssignments []PartitionAssignment,
	) error

	// RunLeaderElection triggers a leader election for one or more partitions in a topic.
	RunLeaderElection(
		ctx context.Context,
		topic string,
		partitions []int,
	) error

	// AcquireLock acquires a lock that can be used to prevent simultaneous changes to a topic.
	AcquireLock(ctx context.Context, path string) (zk.Lock, error)

	// LockHeld returns whether a lock is currently held for the given path.
	LockHeld(ctx context.Context, path string) (bool, error)

	// GetSupportedFeatures gets the features supported by the cluster for this client.
	GetSupportedFeatures() SupportedFeatures

	// Close closes the client.
	Close() error
}

Client is an interface for interacting with a cluster for administrative tasks.

type Connector added in v1.0.0

type Connector struct {
	Config      ConnectorConfig
	Dialer      *kafka.Dialer
	KafkaClient *kafka.Client
}

Connector is a wrapper around the low-level, kafka-go dialer and client.

func NewConnector added in v1.0.0

func NewConnector(config ConnectorConfig) (*Connector, error)

NewConnector constructs a new Connector instance given the argument config.

type ConnectorConfig added in v1.0.0

type ConnectorConfig struct {
	BrokerAddr string
	TLS        TLSConfig
	SASL       SASLConfig
}

ConnectorConfig contains the configuration used to construct a connector.

type CredentialInfo added in v1.10.3

type CredentialInfo struct {
	ScramMechanism ScramMechanism `json:"scramMechanism"`
	Iterations     int            `json:"iterations"`
}

CredentialInfo represents read-only information about a user's credentials in zookeeper.

type PartitionAssignment

type PartitionAssignment struct {
	ID       int   `json:"id"`
	Replicas []int `json:"replicas"`
}

PartitionAssignment contains the actual or desired assignment of replicas in a topic partition.

func AssignmentsToUpdate

func AssignmentsToUpdate(
	current []PartitionAssignment,
	desired []PartitionAssignment,
) []PartitionAssignment

AssignmentsToUpdate returns the subset of assignments that need to be updated given the current and desired states.

func CopyAssignments

func CopyAssignments(
	curr []PartitionAssignment,
) []PartitionAssignment

CopyAssignments returns a deep copy of the argument PartitionAssignment slice.

func ReplicasToAssignments

func ReplicasToAssignments(
	replicaSlices [][]int,
) []PartitionAssignment

ReplicasToAssignments converts a slice of slices to a slice of PartitionAssignments, assuming that the argument slices are in partition order. Used for unit tests.

func (PartitionAssignment) Copy

Copy returns a deep copy of this PartitionAssignment.

func (PartitionAssignment) DistinctRacks

func (a PartitionAssignment) DistinctRacks(
	brokerRacks map[int]string,
) map[string]struct{}

DistinctRacks returns a map of the distinct racks in this PartitionAssignment.

func (PartitionAssignment) Index

func (a PartitionAssignment) Index(replica int) int

Index returns the index of the argument replica, or -1 if it can't be found.

type PartitionInfo

type PartitionInfo struct {
	Topic           string `json:"topic"`
	ID              int    `json:"ID"`
	Leader          int    `json:"leader"`
	Version         int    `json:"version"`
	Replicas        []int  `json:"replicas"`
	ISR             []int  `json:"isr"`
	ControllerEpoch int    `json:"controllerEpoch"`
	LeaderEpoch     int    `json:"leaderEpoch"`
}

PartitionInfo represents the information stored about a topic partition in zookeeper.

func (PartitionInfo) NumRacks

func (p PartitionInfo) NumRacks(brokerRacks map[int]string) (int, error)

NumRacks returns the number of distinct racks in the partition.

func (PartitionInfo) Racks

func (p PartitionInfo) Racks(brokerRacks map[int]string) ([]string, error)

Racks returns a slice of all racks for the partition replicas.

type PartitionLeaderState added in v1.10.3

type PartitionLeaderState string
const (
	CorrectLeader PartitionLeaderState = "OK"
	WrongLeader   PartitionLeaderState = "Wrong"
)

type PartitionStatus added in v1.10.3

type PartitionStatus string
const (
	Ok              PartitionStatus = "OK"
	Offline         PartitionStatus = "Offline"
	UnderReplicated PartitionStatus = "Under-replicated"
)

func GetPartitionStatus added in v1.10.3

func GetPartitionStatus(partition kafka.Partition) PartitionStatus

GetPartitionStatus gets the status of a partition, which is one of: ok, offline, or under-replicated.

NOTE: A partition is considered:
1. offline, if a ListenerNotFound error is observed for the partition leader;
2. under-replicated, if the number of ISRs is less than the number of replicas.

func StringToPartitionStatus added in v1.10.3

func StringToPartitionStatus(status string) (PartitionStatus, bool)

StringToPartitionStatus checks whether a string is a valid PartitionStatus.

func (*PartitionStatus) Set added in v1.10.3

func (p *PartitionStatus) Set(v string) error

Set is used by Cobra to set the value of a variable from a Cobra flag.

func (*PartitionStatus) String added in v1.10.3

func (p *PartitionStatus) String() string

String is used by Cobra in help text.

func (*PartitionStatus) Type added in v1.10.3

func (p *PartitionStatus) Type() string

Type is used by Cobra in help text.

type PartitionStatusInfo added in v1.10.3

type PartitionStatusInfo struct {
	Topic       string
	Partition   kafka.Partition
	Status      PartitionStatus
	LeaderState PartitionLeaderState
}

func (PartitionStatusInfo) Racks added in v1.10.3

func (p PartitionStatusInfo) Racks(brokerRacks map[int]string) []string

Racks returns a slice of all racks for the partition replicas.

type PartitionThrottle

type PartitionThrottle struct {
	Partition int
	Broker    int
}

PartitionThrottle represents a throttle being applied to a single partition, broker combination.

func FollowerPartitionThrottles

func FollowerPartitionThrottles(
	curr []PartitionAssignment,
	desired []PartitionAssignment,
) []PartitionThrottle

FollowerPartitionThrottles returns a slice of PartitionThrottles that we should apply on the follower side.

See https://kafka.apache.org/0101/documentation.html for discussion on how these should be applied.

func LeaderPartitionThrottles

func LeaderPartitionThrottles(
	curr []PartitionAssignment,
	desired []PartitionAssignment,
) []PartitionThrottle

LeaderPartitionThrottles returns a slice of PartitionThrottles that we should apply on the leader side.

See https://kafka.apache.org/0101/documentation.html for discussion on how these should be applied.

func ParsePartitionThrottleStr

func ParsePartitionThrottleStr(valuesStr string) ([]PartitionThrottle, error)

ParsePartitionThrottleStr converts a throttle config string from zk into a slice of PartitionThrottle structs.

func (PartitionThrottle) String

func (p PartitionThrottle) String() string

type PatternType added in v1.10.3

type PatternType kafka.PatternType

PatternType represents the Kafka pattern type. We need to subtype this to be able to define methods to satisfy the Value interface from Cobra so we can use it as a Cobra flag.

func (*PatternType) Set added in v1.10.3

func (p *PatternType) Set(v string) error

Set is used by Cobra to set the value of a variable from a Cobra flag.

func (*PatternType) String added in v1.10.3

func (p *PatternType) String() string

String is used both by fmt.Print and by Cobra in help text.

func (*PatternType) Type added in v1.10.3

func (r *PatternType) Type() string

Type is used by Cobra in help text.

type ResourceType added in v1.10.3

type ResourceType kafka.ResourceType

ResourceType represents the Kafka resource type. We need to subtype this to be able to define methods to satisfy the Value interface from Cobra so we can use it as a Cobra flag.

func (*ResourceType) Set added in v1.10.3

func (r *ResourceType) Set(v string) error

Set is used by Cobra to set the value of a variable from a Cobra flag.

func (*ResourceType) String added in v1.10.3

func (r *ResourceType) String() string

String is used both by fmt.Print and by Cobra in help text.

func (*ResourceType) Type added in v1.10.3

func (r *ResourceType) Type() string

Type is used by Cobra in help text.

type SASLConfig added in v1.0.0

type SASLConfig struct {
	Enabled           bool
	Mechanism         SASLMechanism
	Username          string
	Password          string
	SecretsManagerArn string
}

SASLConfig stores the SASL-related configuration for a connection.

type SASLMechanism added in v1.3.0

type SASLMechanism string

SASLMechanism is the name of a SASL mechanism that will be used for client authentication.

const (
	SASLMechanismAWSMSKIAM   SASLMechanism = "aws-msk-iam"
	SASLMechanismPlain       SASLMechanism = "plain"
	SASLMechanismScramSHA256 SASLMechanism = "scram-sha-256"
	SASLMechanismScramSHA512 SASLMechanism = "scram-sha-512"
)

func SASLNameToMechanism added in v1.3.0

func SASLNameToMechanism(name string) (SASLMechanism, error)

SASLNameToMechanism converts the argument SASL mechanism name string to a valid instance of the SASLMechanism enum.

type ScramMechanism added in v1.10.3

type ScramMechanism kafka.ScramMechanism

ScramMechanism represents the ScramMechanism used for a user's credential in zookeeper.

func (*ScramMechanism) String added in v1.10.3

func (s *ScramMechanism) String() string

type SupportedFeatures added in v1.0.0

type SupportedFeatures struct {
	// Reads indicates whether the client supports reading basic cluster information
	// (metadata, configs, etc.).
	Reads bool

	// Applies indicates whether the client supports the functionality required for applying
	// (e.g., changing configs, electing leaders, etc.).
	Applies bool

	// Locks indicates whether the client supports locking.
	Locks bool

	// DynamicBrokerConfigs indicates whether the client can return dynamic broker configs
	// like leader.replication.throttled.rate.
	DynamicBrokerConfigs bool

	// ACLs indicates whether the client supports access control lists.
	ACLs bool

	// Users indicates whether the client supports SASL Users.
	Users bool
}

SupportedFeatures provides a summary of what an admin client supports.

type TLSConfig added in v1.0.0

type TLSConfig struct {
	Enabled    bool
	CertPath   string
	KeyPath    string
	CACertPath string
	ServerName string
	SkipVerify bool
}

TLSConfig stores the TLS-related configuration for a connection.

type TopicInfo

type TopicInfo struct {
	Name       string            `json:"name"`
	Config     map[string]string `json:"config"`
	Partitions []PartitionInfo   `json:"partitions"`
	Version    int               `json:"version"`
}

TopicInfo represents the information stored about a topic in zookeeper.

func (TopicInfo) AllLeadersCorrect

func (t TopicInfo) AllLeadersCorrect() bool

AllLeadersCorrect returns whether leader == replicas[0] for all partitions.

func (TopicInfo) AllReplicasInSync

func (t TopicInfo) AllReplicasInSync() bool

AllReplicasInSync returns whether all partitions have ISR == replicas (ignoring order).

func (TopicInfo) IsThrottled

func (t TopicInfo) IsThrottled() bool

IsThrottled determines whether the topic has any throttles in its config.

func (TopicInfo) MaxISR

func (t TopicInfo) MaxISR() int

MaxISR returns the maximum number of in-sync replicas across all partitions in a topic.

func (TopicInfo) MaxReplication

func (t TopicInfo) MaxReplication() int

MaxReplication returns the maximum number of replicas across all partitions in a topic.

func (TopicInfo) OutOfSyncPartitions

func (t TopicInfo) OutOfSyncPartitions(subset []int) []PartitionInfo

OutOfSyncPartitions returns the partitions for which ISR != replicas (ignoring order).

func (TopicInfo) PartitionIDs

func (t TopicInfo) PartitionIDs() []int

PartitionIDs returns an ordered slice of partition IDs for a topic.

func (TopicInfo) RackCounts

func (t TopicInfo) RackCounts(brokerRacks map[int]string) (int, int, error)

RackCounts returns the minimum and maximum distinct rack counts across all partitions in a topic.

func (TopicInfo) Retention

func (t TopicInfo) Retention() time.Duration

Retention returns the retention duration implied by a topic config. If unset, it returns 0.

func (TopicInfo) ToAssignments

func (t TopicInfo) ToAssignments() []PartitionAssignment

ToAssignments converts a topic to a slice of partition assignments.

func (TopicInfo) WrongLeaderPartitions

func (t TopicInfo) WrongLeaderPartitions(subset []int) []PartitionInfo

WrongLeaderPartitions returns the partitions where leader != replicas[0].

type UserInfo added in v1.10.3

type UserInfo struct {
	Name            string           `json:"name"`
	CredentialInfos []CredentialInfo `json:"credentialInfos"`
}

UserInfo represents the information stored about a user in zookeeper.

type ZKAdminClient added in v1.0.0

type ZKAdminClient struct {
	Connector *Connector
	// contains filtered or unexported fields
}

ZKAdminClient is a general client for interacting with a kafka cluster that assumes zookeeper access. Most interactions are done via zookeeper, but a few (e.g., creating topics or getting the controller address) are done via the broker API instead.

func NewZKAdminClient added in v1.0.0

func NewZKAdminClient(
	ctx context.Context,
	config ZKAdminClientConfig,
) (*ZKAdminClient, error)

NewZKAdminClient creates and returns a new ZKAdminClient instance.

func (*ZKAdminClient) AcquireLock added in v1.0.0

func (c *ZKAdminClient) AcquireLock(
	ctx context.Context,
	path string,
) (zk.Lock, error)

AcquireLock acquires and returns a lock from the underlying zookeeper client. The Unlock method should be called on the lock when it's safe to release.

func (*ZKAdminClient) AddPartitions added in v1.0.0

func (c *ZKAdminClient) AddPartitions(
	ctx context.Context,
	topic string,
	newAssignments []PartitionAssignment,
) error

AddPartitions adds one or more partitions to an existing topic. Unlike AssignPartitions, this directly updates the topic's partition config in zookeeper.

func (*ZKAdminClient) AssignPartitions added in v1.0.0

func (c *ZKAdminClient) AssignPartitions(
	ctx context.Context,
	topic string,
	assignments []PartitionAssignment,
) error

AssignPartitions notifies the cluster to begin a partition reassignment. This should only be used for existing partitions; to create new partitions, use the AddPartitions method.

func (*ZKAdminClient) Close added in v1.0.0

func (c *ZKAdminClient) Close() error

Close closes the connections in the underlying zookeeper client.

func (*ZKAdminClient) CreateACLs added in v1.10.3

func (c *ZKAdminClient) CreateACLs(
	ctx context.Context,
	acls []kafka.ACLEntry,
) error

func (*ZKAdminClient) CreateTopic added in v1.0.0

func (c *ZKAdminClient) CreateTopic(
	ctx context.Context,
	config kafka.TopicConfig,
) error

CreateTopic creates a new topic with the argument config. It uses the topic creation API exposed on the controller broker.

func (*ZKAdminClient) DeleteACLs added in v1.12.0

func (c *ZKAdminClient) DeleteACLs(
	ctx context.Context,
	filters []kafka.DeleteACLsFilter,
) (*kafka.DeleteACLsResponse, error)

func (*ZKAdminClient) GetACLs added in v1.10.3

func (c *ZKAdminClient) GetACLs(
	ctx context.Context,
	filter kafka.ACLFilter,
) ([]ACLInfo, error)

func (*ZKAdminClient) GetAllTopicsMetadata added in v1.10.3

func (c *ZKAdminClient) GetAllTopicsMetadata(
	ctx context.Context,
) (*kafka.MetadataResponse, error)

func (*ZKAdminClient) GetBrokerIDs added in v1.0.0

func (c *ZKAdminClient) GetBrokerIDs(ctx context.Context) ([]int, error)

GetBrokerIDs returns a slice of all broker IDs.

func (*ZKAdminClient) GetBrokers added in v1.0.0

func (c *ZKAdminClient) GetBrokers(
	ctx context.Context,
	ids []int,
) ([]BrokerInfo, error)

GetBrokers gets information on one or more cluster brokers from zookeeper. If the argument ids is unset, then it fetches all brokers.

func (*ZKAdminClient) GetClusterID added in v1.0.0

func (c *ZKAdminClient) GetClusterID(
	ctx context.Context,
) (string, error)

GetClusterID gets the cluster ID from zookeeper. This ID is generated when the cluster is created and should be stable over the life of the cluster.

func (*ZKAdminClient) GetConnector added in v1.0.0

func (c *ZKAdminClient) GetConnector() *Connector

GetConnector returns the Connector instance associated with this client.

func (*ZKAdminClient) GetControllerID added in v1.14.0

func (c *ZKAdminClient) GetControllerID(
	ctx context.Context,
) (int, error)

GetControllerID gets the ID of the active controller broker.

func (*ZKAdminClient) GetSupportedFeatures added in v1.0.0

func (c *ZKAdminClient) GetSupportedFeatures() SupportedFeatures

GetSupportedFeatures returns the features that are supported by this client.

func (*ZKAdminClient) GetTopic added in v1.0.0

func (c *ZKAdminClient) GetTopic(
	ctx context.Context,
	name string,
	detailed bool,
) (TopicInfo, error)

GetTopic is a wrapper around GetTopics(...) for getting information about a single topic.

func (*ZKAdminClient) GetTopicNames added in v1.0.0

func (c *ZKAdminClient) GetTopicNames(ctx context.Context) ([]string, error)

GetTopicNames gets all topic names from zookeeper.

func (*ZKAdminClient) GetTopics added in v1.0.0

func (c *ZKAdminClient) GetTopics(
	ctx context.Context,
	names []string,
	detailed bool,
) ([]TopicInfo, error)

GetTopics gets information about one or more cluster topics from zookeeper. If the argument names is unset, then it fetches all topics. The detailed parameter determines whether the ISRs and leaders are fetched for each partition.

func (*ZKAdminClient) GetUsers added in v1.10.3

func (c *ZKAdminClient) GetUsers(
	ctx context.Context,
	names []string,
) ([]UserInfo, error)

func (*ZKAdminClient) LockHeld added in v1.0.0

func (c *ZKAdminClient) LockHeld(
	ctx context.Context,
	path string,
) (bool, error)

LockHeld determines whether the lock with the provided path is held (i.e., has children).

func (*ZKAdminClient) RunLeaderElection added in v1.0.0

func (c *ZKAdminClient) RunLeaderElection(
	ctx context.Context,
	topic string,
	partitions []int,
) error

RunLeaderElection triggers a leader election for the argument topic and partitions.

func (*ZKAdminClient) UpdateBrokerConfig added in v1.0.0

func (c *ZKAdminClient) UpdateBrokerConfig(
	ctx context.Context,
	id int,
	configEntries []kafka.ConfigEntry,
	overwrite bool,
) ([]string, error)

UpdateBrokerConfig updates the config JSON for a cluster broker and sets a change notification so the cluster brokers are notified. If overwrite is true, then it will overwrite existing config entries.

The function returns the list of keys that were modified. If overwrite is set to false, this can be used to determine the subset of entries that were already set.

func (*ZKAdminClient) UpdateTopicConfig added in v1.0.0

func (c *ZKAdminClient) UpdateTopicConfig(
	ctx context.Context,
	name string,
	configEntries []kafka.ConfigEntry,
	overwrite bool,
) ([]string, error)

UpdateTopicConfig updates the config JSON for a topic and sets a change notification so that the brokers are notified. If overwrite is true, then it will overwrite existing config entries.

The function returns the list of keys that were modified. If overwrite is set to false, this can be used to determine the subset of entries that were already set.

func (*ZKAdminClient) UpsertUser added in v1.10.3

func (c *ZKAdminClient) UpsertUser(
	ctx context.Context,
	user kafka.UserScramCredentialsUpsertion,
) error

type ZKAdminClientConfig added in v1.0.0

type ZKAdminClientConfig struct {
	ZKAddrs           []string
	ZKPrefix          string
	BootstrapAddrs    []string
	ExpectedClusterID string
	Sess              *session.Session
	ReadOnly          bool
}

ZKAdminClientConfig contains all of the parameters necessary to create a kafka admin client.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL