Documentation
Overview
Package kafkaadmin provides Kafka administrative functionality.
Index
- Variables
- func NewConsumer(cfg Config) (*kafka.Consumer, error)
- type BrokerThrottleConfig
- type Client
- func (c Client) Close()
- func (c Client) CreateTopic(ctx context.Context, cfg CreateTopicConfig) error
- func (c Client) DeleteTopic(ctx context.Context, name string) error
- func (c Client) DescribeTopics(ctx context.Context, topics []string) (TopicStates, error)
- func (c Client) GetDynamicConfigs(ctx context.Context, kind string, names []string) (ResourceConfigs, error)
- func (c Client) RemoveThrottle(ctx context.Context, cfg RemoveThrottleConfig) error
- func (c Client) SetThrottle(ctx context.Context, cfg SetThrottleConfig) error
- type Config
- type CreateTopicConfig
- type ErrRemoveThrottle
- type ErrSetThrottle
- type ErrorFetchingMetadata
- type FactoryFunc
- type KafkaAdmin
- type PartitionState
- type RemoveThrottleConfig
- type ReplicaAssignment
- type ResourceConfigs
- type SetThrottleConfig
- type TopicState
- type TopicStates
Constants
This section is empty.
Variables
var (
	// SecurityProtocolSet is the set of protocols supported to communicate with brokers
	SecurityProtocolSet = map[string]struct{}{"PLAINTEXT": empty, "SSL": empty, "SASL_PLAINTEXT": empty, "SASL_SSL": empty}
	// SASLMechanismSet is the set of mechanisms supported for client to broker authentication
	SASLMechanismSet = map[string]struct{}{"PLAIN": empty, "SCRAM-SHA-256": empty, "SCRAM-SHA-512": empty}
)
var (
	// ErrNoData is a generic error for no data available to be returned.
	ErrNoData = fmt.Errorf("no data returned")
)
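Sets declared as map[string]struct{} are typically used for membership checks. The following is a minimal sketch of such a check, not the package's own validation code. The sets are copied locally so the example compiles on its own, and validateSecurity is a hypothetical helper that does not appear in this package.

```go
package main

import "fmt"

// empty mirrors the placeholder value referenced by the documented sets.
var empty = struct{}{}

// Local copies of the documented sets so this sketch is self-contained.
var (
	securityProtocolSet = map[string]struct{}{"PLAINTEXT": empty, "SSL": empty, "SASL_PLAINTEXT": empty, "SASL_SSL": empty}
	saslMechanismSet    = map[string]struct{}{"PLAIN": empty, "SCRAM-SHA-256": empty, "SCRAM-SHA-512": empty}
)

// validateSecurity is a hypothetical helper (an assumption, not package API).
// It returns an error if either value is missing from its set.
func validateSecurity(protocol, mechanism string) error {
	if _, ok := securityProtocolSet[protocol]; !ok {
		return fmt.Errorf("unsupported security protocol: %q", protocol)
	}
	if _, ok := saslMechanismSet[mechanism]; !ok {
		return fmt.Errorf("unsupported SASL mechanism: %q", mechanism)
	}
	return nil
}

func main() {
	fmt.Println(validateSecurity("SASL_SSL", "SCRAM-SHA-512"))
	fmt.Println(validateSecurity("SASL_SSL", "GSSAPI"))
}
```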
Functions
Types
type BrokerThrottleConfig (added in v3.16.0)
BrokerThrottleConfig defines an inbound and outbound throttle rate in bytes to be applied to a broker.
type Client
type Client struct {
// contains filtered or unexported fields
}
Client implements a KafkaAdmin.
func NewClientWithFactory
func NewClientWithFactory(cfg Config, factory FactoryFunc) (*Client, error)
NewClientWithFactory returns a new admin Client, using the provided factory func to construct the underlying Kafka admin client.
func (Client) CreateTopic
func (c Client) CreateTopic(ctx context.Context, cfg CreateTopicConfig) error
CreateTopic creates a topic.
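func (Client) DeleteTopic (added in v3.6.0)
func (c Client) DeleteTopic(ctx context.Context, name string) error
DeleteTopic deletes a topic.
func (Client) DescribeTopics (added in v3.17.0)
func (c Client) DescribeTopics(ctx context.Context, topics []string) (TopicStates, error)
DescribeTopics takes a []string of topic names. Each entry can be a literal topic name or, optionally, a regex. A TopicStates is returned for all matching topics.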
func (Client) GetDynamicConfigs (added in v3.16.0)
func (c Client) GetDynamicConfigs(ctx context.Context, kind string, names []string) (ResourceConfigs, error)
GetDynamicConfigs takes a Kafka resource type (i.e., topic or broker) and a list of names. It returns a ResourceConfigs holding all dynamic configurations discovered for each named resource.
func (Client) RemoveThrottle (added in v3.16.0)
func (c Client) RemoveThrottle(ctx context.Context, cfg RemoveThrottleConfig) error
RemoveThrottle takes a RemoveThrottleConfig, which optionally lists brokers and topics, and removes all throttle configurations from them.
func (Client) SetThrottle (added in v3.16.0)
func (c Client) SetThrottle(ctx context.Context, cfg SetThrottleConfig) error
SetThrottle takes a SetThrottleConfig and sets the underlying throttle configs accordingly. A throttle is a combination of topic throttled replicas configs and broker inbound/outbound throttle configs.
type Config
type Config struct {
	// Required.
	BootstrapServers string
	// Misc.
	GroupId          string
	SSLCALocation    string
	SecurityProtocol string
	SASLMechanism    string
	SASLUsername     string
	SASLPassword     string
}
Config holds Client configuration parameters.
type CreateTopicConfig (added in v3.7.0)
type CreateTopicConfig struct {
	Name              string
	Partitions        int
	ReplicationFactor int
	Config            map[string]string
	ReplicaAssignment ReplicaAssignment
}
CreateTopicConfig holds CreateTopic parameters.
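Because CreateTopicConfig carries both counts (Partitions, ReplicationFactor) and an explicit ReplicaAssignment, a caller may want to confirm that the two agree before submitting. The sketch below shows one way to do that. This document does not state whether CreateTopic requires or performs such a check, so checkAssignment is a hypothetical helper, and the types are local mirrors of the documented ones.

```go
package main

import "fmt"

// Local mirrors of the documented types so this sketch is self-contained.
type ReplicaAssignment [][]int32

type CreateTopicConfig struct {
	Name              string
	Partitions        int
	ReplicationFactor int
	Config            map[string]string
	ReplicaAssignment ReplicaAssignment
}

// checkAssignment is a hypothetical pre-flight check (an assumption, not
// package API). An empty assignment is treated as "none specified".
func checkAssignment(cfg CreateTopicConfig) error {
	if len(cfg.ReplicaAssignment) == 0 {
		return nil
	}
	if len(cfg.ReplicaAssignment) != cfg.Partitions {
		return fmt.Errorf("assignment covers %d partitions, config declares %d",
			len(cfg.ReplicaAssignment), cfg.Partitions)
	}
	for p, replicas := range cfg.ReplicaAssignment {
		if len(replicas) != cfg.ReplicationFactor {
			return fmt.Errorf("partition %d has %d replicas, config declares %d",
				p, len(replicas), cfg.ReplicationFactor)
		}
	}
	return nil
}

func main() {
	cfg := CreateTopicConfig{
		Name:              "my_topic",
		Partitions:        2,
		ReplicationFactor: 2,
		Config:            map[string]string{"retention.ms": "4000000"},
		ReplicaAssignment: ReplicaAssignment{{1001, 1002}, {1002, 1003}},
	}
	fmt.Println(checkAssignment(cfg))
}
```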
type ErrRemoveThrottle (added in v3.16.0)
type ErrRemoveThrottle struct{ Message string }
ErrRemoveThrottle is a generic error for RemoveThrottle.
func (ErrRemoveThrottle) Error (added in v3.16.0)
func (e ErrRemoveThrottle) Error() string
type ErrSetThrottle (added in v3.16.0)
type ErrSetThrottle struct{ Message string }
ErrSetThrottle is a generic error for SetThrottle.
func (ErrSetThrottle) Error (added in v3.16.0)
func (e ErrSetThrottle) Error() string
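Since the throttle errors are distinct struct types, a caller can tell them apart with errors.As from the standard library. The sketch below mirrors ErrSetThrottle locally. The body of Error (returning Message) is an assumption, and applyThrottle is a hypothetical stand-in for a failing SetThrottle call.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrSetThrottle mirrors the documented type.
type ErrSetThrottle struct{ Message string }

// Error returns the message. The real method body is not shown in this
// document, so this implementation is an assumption.
func (e ErrSetThrottle) Error() string { return e.Message }

// applyThrottle is a hypothetical stand-in that fails with a wrapped
// ErrSetThrottle.
func applyThrottle() error {
	return fmt.Errorf("applying throttle: %w", ErrSetThrottle{Message: "broker config update failed"})
}

// isSetThrottleErr reports whether err is, or wraps, an ErrSetThrottle.
func isSetThrottleErr(err error) bool {
	var target ErrSetThrottle
	return errors.As(err, &target)
}

func main() {
	err := applyThrottle()
	if isSetThrottleErr(err) {
		fmt.Println("throttle error:", err)
	}
}
```

The same pattern applies to ErrRemoveThrottle and ErrorFetchingMetadata, which have the same shape.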
type ErrorFetchingMetadata (added in v3.17.0)
type ErrorFetchingMetadata struct{ Message string }
ErrorFetchingMetadata is an error encountered fetching Kafka cluster metadata.
func (ErrorFetchingMetadata) Error (added in v3.17.0)
func (e ErrorFetchingMetadata) Error() string
type FactoryFunc
type FactoryFunc func(conf *kafka.ConfigMap) (*kafka.AdminClient, error)
type KafkaAdmin (added in v3.7.0)
type KafkaAdmin interface {
	Close()
	// Topics.
	CreateTopic(context.Context, CreateTopicConfig) error
	DeleteTopic(context.Context, string) error
	DescribeTopics(context.Context, []string) (TopicStates, error)
	// Cluster.
	SetThrottle(context.Context, SetThrottleConfig) error
	RemoveThrottle(context.Context, RemoveThrottleConfig) error
	GetDynamicConfigs(context.Context, string, []string) (ResourceConfigs, error)
}
KafkaAdmin is the interface for the administrative operations in this package. Client implements it.
type PartitionState (added in v3.17.0)
PartitionState describes the state of a partition.
type RemoveThrottleConfig (added in v3.16.0)
RemoveThrottleConfig holds lists of all topics and brokers to remove throttles from.
type ReplicaAssignment (added in v3.7.0)
type ReplicaAssignment [][]int32
ReplicaAssignment is a [][]int32 of partition assignments. The outer slice index maps to the partition ID (i.e., index position 3 describes partition 3 for the referenced topic). Each inner slice is an []int32 of broker assignments.
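The index-to-partition mapping described above can be illustrated with a short sketch. The type is mirrored locally, the broker IDs are made up for illustration, and replicasFor is a hypothetical helper rather than part of the package.

```go
package main

import "fmt"

// ReplicaAssignment mirrors the documented type.
type ReplicaAssignment [][]int32

// replicasFor is a hypothetical helper (not package API). It returns the
// broker IDs assigned to the given partition ID, or false if the partition
// is out of range.
func replicasFor(ra ReplicaAssignment, partition int) ([]int32, bool) {
	if partition < 0 || partition >= len(ra) {
		return nil, false
	}
	return ra[partition], true
}

func main() {
	// Illustrative broker IDs; the outer index is the partition ID.
	ra := ReplicaAssignment{
		{1001, 1002}, // partition 0
		{1002, 1003}, // partition 1
		{1003, 1001}, // partition 2
	}
	for p, brokers := range ra {
		fmt.Printf("partition %d -> brokers %v\n", p, brokers)
	}
}
```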
type ResourceConfigs (added in v3.16.0)
ResourceConfigs is a map of resource name to a map of configuration name to configuration value. Example: map["my_topic"]map["retention.ms"] = "4000000"
func (ResourceConfigs) AddConfig (added in v3.16.0)
func (rc ResourceConfigs) AddConfig(name, key, value string) error
AddConfig takes a resource name and populates the config key to the specified value.
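The nested-map shape can be sketched as follows. The type definition is not shown in this document, so the underlying type map[string]map[string]string is an assumption inferred from the description, and the AddConfig body, including its empty-argument error, is an illustrative guess rather than the package's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// ResourceConfigs is assumed to be a nested string map. The real type
// definition is not shown in this document.
type ResourceConfigs map[string]map[string]string

// AddConfig is a sketch of the documented method. Rejecting empty names and
// keys is an assumption made for illustration.
func (rc ResourceConfigs) AddConfig(name, key, value string) error {
	if name == "" || key == "" {
		return errors.New("resource name and config key must be non-empty")
	}
	// Create the inner map the first time a resource is seen.
	if _, ok := rc[name]; !ok {
		rc[name] = make(map[string]string)
	}
	rc[name][key] = value
	return nil
}

func main() {
	rc := ResourceConfigs{}
	if err := rc.AddConfig("my_topic", "retention.ms", "4000000"); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(rc["my_topic"]["retention.ms"])
}
```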
func (ResourceConfigs) AddConfigEntry (added in v3.16.0)
func (rc ResourceConfigs) AddConfigEntry(name string, config kafka.ConfigEntryResult) error
AddConfigEntry takes a resource name (i.e., a broker ID or topic name) and a kafka.ConfigEntryResult. It populates the kafka.ConfigEntryResult in the ResourceConfigs keyed by the provided resource name.
type SetThrottleConfig (added in v3.16.0)
type SetThrottleConfig struct {
	// Topics is a list of all topics that require throttled replica configs.
	Topics []string
	// Brokers is a mapping of broker ID to BrokerThrottleConfig.
	Brokers map[int]BrokerThrottleConfig
}
SetThrottleConfig holds SetThrottle configs.
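A SetThrottleConfig pairs a topic list with per-broker rates. The sketch below builds one with the same rate on every broker. The fields of BrokerThrottleConfig are not shown in this document, so InboundLimitBytes and OutboundLimitBytes are hypothetical names, and uniformThrottle is a hypothetical helper.

```go
package main

import "fmt"

// BrokerThrottleConfig is a local stand-in. Both field names are
// hypothetical; the documented type's fields are not shown here.
type BrokerThrottleConfig struct {
	InboundLimitBytes  int
	OutboundLimitBytes int
}

// SetThrottleConfig mirrors the documented type.
type SetThrottleConfig struct {
	Topics  []string
	Brokers map[int]BrokerThrottleConfig
}

// uniformThrottle is a hypothetical helper (not package API) that applies
// the same inbound and outbound rate to every listed broker.
func uniformThrottle(topics []string, brokerIDs []int, rateBytes int) SetThrottleConfig {
	cfg := SetThrottleConfig{
		Topics:  topics,
		Brokers: make(map[int]BrokerThrottleConfig, len(brokerIDs)),
	}
	for _, id := range brokerIDs {
		cfg.Brokers[id] = BrokerThrottleConfig{
			InboundLimitBytes:  rateBytes,
			OutboundLimitBytes: rateBytes,
		}
	}
	return cfg
}

func main() {
	cfg := uniformThrottle([]string{"my_topic"}, []int{1001, 1002}, 50_000_000)
	fmt.Printf("%+v\n", cfg)
}
```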
type TopicState (added in v3.17.0)
type TopicState struct {
	Name              string
	Partitions        int32
	ReplicationFactor int32
	PartitionStates   map[int]PartitionState
}
TopicState describes the current state of a topic.
func NewTopicState (added in v3.17.0)
func NewTopicState(name string) TopicState
NewTopicState initializes a TopicState.
type TopicStates (added in v3.17.0)
type TopicStates map[string]TopicState
TopicStates is a map of topic names to TopicState.
func NewTopicStates (added in v3.17.0)
func NewTopicStates() TopicStates
NewTopicStates initializes a TopicStates.
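The relationship between TopicStates and TopicState can be sketched with local mirrors of the documented types. The constructor bodies here are assumptions (the document only says they initialize their values), PartitionState is left empty because its fields are not shown, and sortedNames is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sort"
)

// PartitionState is a placeholder; its fields are not shown in this document.
type PartitionState struct{}

// TopicState and TopicStates mirror the documented types.
type TopicState struct {
	Name              string
	Partitions        int32
	ReplicationFactor int32
	PartitionStates   map[int]PartitionState
}

type TopicStates map[string]TopicState

// NewTopicState is assumed to set the name and allocate the partition map.
func NewTopicState(name string) TopicState {
	return TopicState{Name: name, PartitionStates: make(map[int]PartitionState)}
}

// NewTopicStates is assumed to return an empty, non-nil map.
func NewTopicStates() TopicStates { return make(TopicStates) }

// sortedNames is a hypothetical helper that returns topic names in a stable
// order, since Go map iteration order is unspecified.
func sortedNames(ts TopicStates) []string {
	names := make([]string, 0, len(ts))
	for name := range ts {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

func main() {
	states := NewTopicStates()
	for _, name := range []string{"orders", "events"} {
		states[name] = NewTopicState(name)
	}
	for _, name := range sortedNames(states) {
		fmt.Println(name, len(states[name].PartitionStates))
	}
}
```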
func TopicStatesFromMetadata (added in v3.18.0)
func TopicStatesFromMetadata(md *kafka.Metadata) (TopicStates, error)