kafkaclient

package
v0.25.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 12, 2023 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AclPatternTypeMapping

func AclPatternTypeMapping(patternType v1alpha1.KafkaPatternType) sarama.AclResourcePatternType

AclPatternTypeMapping maps patternType from v1alpha1.KafkaPatternType to sarama.AclResourcePatternType

Types

type CreateTopicOptions

type CreateTopicOptions struct {
	Name              string
	Partitions        int32
	ReplicationFactor int16
	Config            map[string]*string
}

CreateTopicOptions holds info about topic configuration

type KafkaClient

type KafkaClient interface {
	NumBrokers() int
	ListTopics() (map[string]sarama.TopicDetail, error)
	CreateTopic(*CreateTopicOptions) error
	EnsurePartitionCount(string, int32) (bool, error)
	EnsureTopicConfig(string, map[string]*string) error
	DeleteTopic(string, bool) error
	GetTopic(string) (*sarama.TopicDetail, error)
	DescribeTopic(string) (*sarama.TopicMetadata, error)
	CreateUserACLs(v1alpha1.KafkaAccessType, v1alpha1.KafkaPatternType, string, string) error
	ListUserACLs() ([]sarama.ResourceAcls, error)
	DeleteUserACLs(string, v1alpha1.KafkaPatternType) error

	Brokers() map[int32]string
	DescribeCluster() ([]*sarama.Broker, int32, error)

	// AllOfflineReplicas returns the list of unique offline replica (broker) ids
	AllOfflineReplicas() ([]int32, error)

	// OutOfSyncReplicas returns the list of unique out of sync replica (broker) ids
	OutOfSyncReplicas() ([]int32, error)

	AlterPerBrokerConfig(int32, map[string]*string, bool) error
	DescribePerBrokerConfig(int32, []string) ([]*sarama.ConfigEntry, error)

	AlterClusterWideConfig(map[string]*string, bool) error
	DescribeClusterWideConfig() ([]sarama.ConfigEntry, error)

	TopicMetaToStatus(meta *sarama.TopicMetadata) *v1alpha1.KafkaTopicStatus

	Open() error
	Close() error
}

KafkaClient is the exported interface for kafka operations

func New

func New(opts *KafkaConfig) KafkaClient

func NewFromCluster

func NewFromCluster(k8sclient client.Client, cluster *v1beta1.KafkaCluster) (KafkaClient, func(), error)

NewFromCluster is a convenience wrapper around New() and ClusterConfig()

func NewMockFromCluster

func NewMockFromCluster(client client.Client, cluster *v1beta1.KafkaCluster) (KafkaClient, func(), error)

type KafkaConfig

type KafkaConfig struct {
	BrokerURI string
	UseSSL    bool
	TLSConfig *tls.Config

	OperationTimeout int64
}

KafkaConfig are the options to creating a new ClusterAdmin client

func ClusterConfig

func ClusterConfig(client client.Client, cluster *v1beta1.KafkaCluster) (*KafkaConfig, error)

ClusterConfig creates connection options from a KafkaCluster CR

type Provider

type Provider interface {
	NewFromCluster(client client.Client, cluster *v1beta1.KafkaCluster) (KafkaClient, func(), error)
}

func NewDefaultProvider

func NewDefaultProvider() Provider

func NewMockProvider

func NewMockProvider() Provider

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL