kafkaadmin

package
v3.18.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 13, 2022 License: Apache-2.0 Imports: 7 Imported by: 0

README

GoDoc

Documentation

Overview

Package kafkaadmin provides Kafka administrative functionality.

Index

Constants

This section is empty.

Variables

View Source
var (

	// SecurityProtocolSet is the set of protocols supported to communicate with brokers
	SecurityProtocolSet = map[string]struct{}{"PLAINTEXT": empty, "SSL": empty, "SASL_PLAINTEXT": empty, "SASL_SSL": empty}
	// SASLMechanismSet is the set of mechanisms supported for client to broker authentication
	SASLMechanismSet = map[string]struct{}{"PLAIN": empty, "SCRAM-SHA-256": empty, "SCRAM-SHA-512": empty}
)
View Source
var (
	// ErrNoData is a generic error for no data available to be returned.
	ErrNoData = fmt.Errorf("no data returned")
)

Functions

func NewConsumer added in v3.5.0

func NewConsumer(cfg Config) (*kafka.Consumer, error)

Types

type BrokerThrottleConfig added in v3.16.0

type BrokerThrottleConfig struct {
	InboundLimitBytes  int
	OutboundLimitBytes int
}

BrokerThrottleConfig defines an inbound and outbound throttle rate in bytes to be applied to a broker.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client implements a KafkaAdmin.

func NewClientWithFactory

func NewClientWithFactory(cfg Config, factory FactoryFunc) (*Client, error)

NewClientWithFactory returns a new admin Client, using the provided FactoryFunc to construct the underlying Kafka admin client.

func (Client) Close

func (c Client) Close()

Close closes the Client.

func (Client) CreateTopic

func (c Client) CreateTopic(ctx context.Context, cfg CreateTopicConfig) error

CreateTopic creates a topic.

func (Client) DeleteTopic added in v3.6.0

func (c Client) DeleteTopic(ctx context.Context, name string) error

DeleteTopic deletes a topic.

func (Client) DescribeTopics added in v3.17.0

func (c Client) DescribeTopics(ctx context.Context, topics []string) (TopicStates, error)

DescribeTopics takes a []string of topic names. Each entry may be a literal topic name or, optionally, a regular expression. A TopicStates is returned for all matching topics.

func (Client) GetDynamicConfigs added in v3.16.0

func (c Client) GetDynamicConfigs(ctx context.Context, kind string, names []string) (ResourceConfigs, error)

GetDynamicConfigs takes a Kafka resource type (i.e., topic or broker) and a list of names, and returns a ResourceConfigs holding all dynamic configurations discovered for each named resource.

func (Client) RemoveThrottle added in v3.16.0

func (c Client) RemoveThrottle(ctx context.Context, cfg RemoveThrottleConfig) error

RemoveThrottle takes a RemoveThrottleConfig that includes an optionally specified list of brokers and topics to remove all throttle configurations from.

func (Client) SetThrottle added in v3.16.0

func (c Client) SetThrottle(ctx context.Context, cfg SetThrottleConfig) error

SetThrottle takes a SetThrottleConfig and sets the underlying throttle configs accordingly. A throttle is a combination of topic throttled replicas configs and broker inbound/outbound throttle configs.

type Config

type Config struct {
	// Required.
	BootstrapServers string
	// Misc.
	GroupId          string
	SSLCALocation    string
	SecurityProtocol string
	SASLMechanism    string
	SASLUsername     string
	SASLPassword     string
}

Config holds Client configuration parameters.

type CreateTopicConfig added in v3.7.0

type CreateTopicConfig struct {
	Name              string
	Partitions        int
	ReplicationFactor int
	Config            map[string]string
	ReplicaAssignment ReplicaAssignment
}

CreateTopicConfig holds CreateTopic parameters.

type ErrRemoveThrottle added in v3.16.0

type ErrRemoveThrottle struct{ Message string }

ErrRemoveThrottle is a generic error for RemoveThrottle.

func (ErrRemoveThrottle) Error added in v3.16.0

func (e ErrRemoveThrottle) Error() string

type ErrSetThrottle added in v3.16.0

type ErrSetThrottle struct{ Message string }

ErrSetThrottle is a generic error for SetThrottle.

func (ErrSetThrottle) Error added in v3.16.0

func (e ErrSetThrottle) Error() string

type ErrorFetchingMetadata added in v3.17.0

type ErrorFetchingMetadata struct{ Message string }

ErrorFetchingMetadata is an error encountered fetching Kafka cluster metadata.

func (ErrorFetchingMetadata) Error added in v3.17.0

func (e ErrorFetchingMetadata) Error() string

type FactoryFunc

type FactoryFunc func(conf *kafka.ConfigMap) (*kafka.AdminClient, error)

type KafkaAdmin added in v3.7.0

type KafkaAdmin interface {
	Close()
	// Topics.
	CreateTopic(context.Context, CreateTopicConfig) error
	DeleteTopic(context.Context, string) error
	DescribeTopics(context.Context, []string) (TopicStates, error)
	// Cluster.
	SetThrottle(context.Context, SetThrottleConfig) error
	RemoveThrottle(context.Context, RemoveThrottleConfig) error
	GetDynamicConfigs(context.Context, string, []string) (ResourceConfigs, error)
}

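KafkaAdmin is the interface for Kafka administrative operations: topic management (create, delete, describe) and cluster-level operations (setting and removing throttles, fetching dynamic configs).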

func NewClient

func NewClient(cfg Config) (KafkaAdmin, error)

NewClient returns a KafkaAdmin.

type PartitionState added in v3.17.0

type PartitionState struct {
	ID       int32
	Leader   int32
	Replicas []int32
	ISR      []int32
}

PartitionState describes the state of a partition.

type RemoveThrottleConfig added in v3.16.0

type RemoveThrottleConfig struct {
	Topics  []string
	Brokers []int
}

RemoveThrottleConfig holds lists of all topics and brokers to remove throttles from.

type ReplicaAssignment added in v3.7.0

type ReplicaAssignment [][]int32

ReplicaAssignment is a [][]int32 of partition assignments. The outer slice index maps to the partition ID (i.e., index position 3 describes partition 3 of the referenced topic); each inner slice is an []int32 of broker assignments.

type ResourceConfigs added in v3.16.0

type ResourceConfigs map[string]map[string]string

ResourceConfigs is a map of resource name to a map of configuration name to configuration value. Example: rc["my_topic"]["retention.ms"] = "4000000".

func (ResourceConfigs) AddConfig added in v3.16.0

func (rc ResourceConfigs) AddConfig(name, key, value string) error

AddConfig takes a resource name and populates the config key to the specified value.

func (ResourceConfigs) AddConfigEntry added in v3.16.0

func (rc ResourceConfigs) AddConfigEntry(name string, config kafka.ConfigEntryResult) error

AddConfigEntry takes a resource name (i.e., a broker ID or topic name) and a kafka.ConfigEntryResult. It stores the kafka.ConfigEntryResult in the ResourceConfigs under the provided resource name.

type SetThrottleConfig added in v3.16.0

type SetThrottleConfig struct {
	// Topics is a list of all topics that require throttled replica configs.
	Topics []string
	// Brokers is a mapping of broker ID to BrokerThrottleConfig.
	Brokers map[int]BrokerThrottleConfig
}

SetThrottleConfig holds SetThrottle configs.

type TopicState added in v3.17.0

type TopicState struct {
	Name              string
	Partitions        int32
	ReplicationFactor int32
	PartitionStates   map[int]PartitionState
}

TopicState describes the current state of a topic.

func NewTopicState added in v3.17.0

func NewTopicState(name string) TopicState

NewTopicState initializes a TopicState.

type TopicStates added in v3.17.0

type TopicStates map[string]TopicState

TopicStates is a map of topic names to TopicState.

func NewTopicStates added in v3.17.0

func NewTopicStates() TopicStates

NewTopicStates initializes a TopicStates.

func TopicStatesFromMetadata added in v3.18.0

func TopicStatesFromMetadata(md *kafka.Metadata) (TopicStates, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL