kafka

package
v0.1.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 24, 2023 License: AGPL-3.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
const ControllerBrokerLabel = "*"

Variables

Functions

func ConsumeMessage

func ConsumeMessage(conn *KafkaConnection, topic string, group string, verbose bool) error

func ConsumerLag

func ConsumerLag(conn *KafkaConnection, groups []string) map[string]*ConsumerGroupOffset

func CreateAcl added in v0.1.4

func CreateAcl(conn *KafkaConnection, resource_name string, resource_type sarama.AclResourceType, principal string, host string, operation sarama.AclOperation, permission_type sarama.AclPermissionType) error

func CreateTopic

func CreateTopic(conn *KafkaConnection, topic string, numPartitions int32, replicationFactor int16, configs map[string]*string) error

func DeleteAcl added in v0.1.4

func DeleteAcl(conn *KafkaConnection, resource_name string, resource_type sarama.AclResourceType, principal string, host string, operation sarama.AclOperation, permission_type sarama.AclPermissionType) error

func DeleteConsumerGroup

func DeleteConsumerGroup(conn *KafkaConnection, group string) error

func DeleteTopic

func DeleteTopic(conn *KafkaConnection, topic string) error

func DescribeBrokerConfig

func DescribeBrokerConfig(conn *KafkaConnection, brokerId string) (configs []sarama.ConfigEntry)

func DescribeConsumerGroups

func DescribeConsumerGroups(conn *KafkaConnection, groups []string) []*sarama.GroupDescription

func DescribeTopicConfig

func DescribeTopicConfig(conn *KafkaConnection, topic string) (configs []sarama.ConfigEntry)

func DescribeTopics

func DescribeTopics(conn *KafkaConnection, topics []string) []*sarama.TopicMetadata

func GetBrokerProp

func GetBrokerProp(conn *KafkaConnection, brokerId, key string) (configs []sarama.ConfigEntry)

func GetConfigBroker

func GetConfigBroker(conn *KafkaConnection, broker string)

func GetTopicOffsets

func GetTopicOffsets(conn *KafkaConnection, topic string) map[int32]int64

func GetTopicProp

func GetTopicProp(conn *KafkaConnection, topic, key string) (configs []sarama.ConfigEntry)

func ListAllAcls added in v0.1.4

func ListAllAcls(conn *KafkaConnection) []sarama.ResourceAcls

func ListAllTopics

func ListAllTopics(conn *KafkaConnection) map[string]sarama.TopicDetail

func ListConsumerGroupDescriptions

func ListConsumerGroupDescriptions(conn *KafkaConnection) map[string]*sarama.GroupDescription

func ListConsumerGroups

func ListConsumerGroups(conn *KafkaConnection) map[string]string

func ProduceMessage

func ProduceMessage(conn *KafkaConnection, topic string) error

func ResetProp

func ResetProp(conn *KafkaConnection, topic string, props []string) error

func SetProp

func SetProp(conn *KafkaConnection, topic string, props map[string]string) error

Types

type Broker

type Broker struct {
	Address      string
	Id           int32
	Host         string
	IsController bool
	*sarama.Broker
}

func NewBroker

func NewBroker(broker *sarama.Broker, controllerId int32) *Broker

func (*Broker) MarkedHostName

func (b *Broker) MarkedHostName() string

func (*Broker) String

func (b *Broker) String() string

type BrokerMetadata

type BrokerMetadata struct {
	Details        *Broker
	ConsumerGroups []string
	Logs           []*LogFile
}

func DescribeBroker

func DescribeBroker(conn *KafkaConnection, idOrAddr string) *BrokerMetadata

type BrokersById

type BrokersById []*Broker

func GetBrokers

func GetBrokers(conn *KafkaConnection) BrokersById

func (BrokersById) Len

func (b BrokersById) Len() int

func (BrokersById) Less

func (b BrokersById) Less(i, j int) bool

func (BrokersById) Swap

func (b BrokersById) Swap(i, j int)

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func (*Consumer) Cleanup

func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error

func (*Consumer) ConsumeClaim

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

func (*Consumer) Setup

func (consumer *Consumer) Setup(sess sarama.ConsumerGroupSession) error

type ConsumerGroupOffset

type ConsumerGroupOffset struct {
	Id     string
	Topics map[string]*TopicAssignment
}

func MakeConsumerGroupOffset

func MakeConsumerGroupOffset(groupId string) *ConsumerGroupOffset

func (*ConsumerGroupOffset) AddTopic

func (c *ConsumerGroupOffset) AddTopic(name string) *TopicAssignment

type KafkaConnection

type KafkaConnection struct {
	Client               sarama.Client
	Admin                sarama.ClusterAdmin
	Config               *configuration.Configuration
	Context              *configuration.Context
	SchemaRegistryClient *srclient.SchemaRegistryClient
}

func MakeConnection

func MakeConnection(config *configuration.Configuration) *KafkaConnection

func MakeConnectionContext

func MakeConnectionContext(config *configuration.Configuration, context *configuration.Context) (*KafkaConnection, error)

func (*KafkaConnection) Close

func (k *KafkaConnection) Close()

func (*KafkaConnection) Connect

func (k *KafkaConnection) Connect() error

type LogEntry

type LogEntry struct {
	Topic     string
	Permanent uint64
	Temporary uint64
}

type LogFile

type LogFile struct {
	Path    string
	Entries aggregatedTopicSize
}

func (*LogFile) SortByPermanentSize

func (l *LogFile) SortByPermanentSize() []*LogEntry

type PartitionOffsets

type PartitionOffsets struct {
	Current int64
	Max     int64
}

type TopicAssignment

type TopicAssignment struct {
	Partitions map[int32]PartitionOffsets
}

func (*TopicAssignment) AddOffset

func (m *TopicAssignment) AddOffset(partition int32, consumerOffset int64, partitionOffset int64) *PartitionOffsets

func (*TopicAssignment) GetLagPartition

func (m *TopicAssignment) GetLagPartition(partitionNo int32) int64

func (*TopicAssignment) GetLagTopicLag

func (m *TopicAssignment) GetLagTopicLag() int64

type XDGSCRAMClient

type XDGSCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}

func (*XDGSCRAMClient) Begin

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)

func (*XDGSCRAMClient) Done

func (x *XDGSCRAMClient) Done() bool

func (*XDGSCRAMClient) Step

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL