Documentation ¶
Index ¶
- Constants
- Variables
- func ConsumeMessage(conn *KafkaConnection, topic string, group string, verbose bool) error
- func ConsumerLag(conn *KafkaConnection, groups []string) map[string]*ConsumerGroupOffset
- func CreateAcl(conn *KafkaConnection, resource_name string, ...) error
- func CreateTopic(conn *KafkaConnection, topic string, numPartitions int32, ...) error
- func DeleteAcl(conn *KafkaConnection, resource_name string, ...) error
- func DeleteConsumerGroup(conn *KafkaConnection, group string) error
- func DeleteTopic(conn *KafkaConnection, topic string) error
- func DescribeBrokerConfig(conn *KafkaConnection, brokerId string) (configs []sarama.ConfigEntry)
- func DescribeConsumerGroups(conn *KafkaConnection, groups []string) []*sarama.GroupDescription
- func DescribeTopicConfig(conn *KafkaConnection, topic string) (configs []sarama.ConfigEntry)
- func DescribeTopics(conn *KafkaConnection, topics []string) []*sarama.TopicMetadata
- func GetBrokerProp(conn *KafkaConnection, brokerId, key string) (configs []sarama.ConfigEntry)
- func GetConfigBroker(conn *KafkaConnection, broker string)
- func GetTopicOffsets(conn *KafkaConnection, topic string) map[int32]int64
- func GetTopicProp(conn *KafkaConnection, topic, key string) (configs []sarama.ConfigEntry)
- func ListAllAcls(conn *KafkaConnection) []sarama.ResourceAcls
- func ListAllTopics(conn *KafkaConnection) map[string]sarama.TopicDetail
- func ListConsumerGroupDescriptions(conn *KafkaConnection) map[string]*sarama.GroupDescription
- func ListConsumerGroups(conn *KafkaConnection) map[string]string
- func ProduceMessage(conn *KafkaConnection, topic string) error
- func ResetProp(conn *KafkaConnection, topic string, props []string) error
- func SetProp(conn *KafkaConnection, topic string, props map[string]string) error
- type Broker
- type BrokerMetadata
- type BrokersById
- type Consumer
- type ConsumerGroupOffset
- type KafkaConnection
- type LogEntry
- type LogFile
- type PartitionOffsets
- type TopicAssignment
- type XDGSCRAMClient
Constants ¶
View Source
const ControllerBrokerLabel = "*"
Variables ¶
View Source
var SHA256 scram.HashGeneratorFcn = sha256.New
View Source
var SHA512 scram.HashGeneratorFcn = sha512.New
Functions ¶
func ConsumeMessage ¶
func ConsumeMessage(conn *KafkaConnection, topic string, group string, verbose bool) error
func ConsumerLag ¶
func ConsumerLag(conn *KafkaConnection, groups []string) map[string]*ConsumerGroupOffset
func CreateAcl ¶ added in v0.1.4
func CreateAcl(conn *KafkaConnection, resource_name string, resource_type sarama.AclResourceType, principal string, host string, operation sarama.AclOperation, permission_type sarama.AclPermissionType) error
func CreateTopic ¶
func DeleteAcl ¶ added in v0.1.4
func DeleteAcl(conn *KafkaConnection, resource_name string, resource_type sarama.AclResourceType, principal string, host string, operation sarama.AclOperation, permission_type sarama.AclPermissionType) error
func DeleteConsumerGroup ¶
func DeleteConsumerGroup(conn *KafkaConnection, group string) error
func DeleteTopic ¶
func DeleteTopic(conn *KafkaConnection, topic string) error
func DescribeBrokerConfig ¶
func DescribeBrokerConfig(conn *KafkaConnection, brokerId string) (configs []sarama.ConfigEntry)
func DescribeConsumerGroups ¶
func DescribeConsumerGroups(conn *KafkaConnection, groups []string) []*sarama.GroupDescription
func DescribeTopicConfig ¶
func DescribeTopicConfig(conn *KafkaConnection, topic string) (configs []sarama.ConfigEntry)
func DescribeTopics ¶
func DescribeTopics(conn *KafkaConnection, topics []string) []*sarama.TopicMetadata
func GetBrokerProp ¶
func GetBrokerProp(conn *KafkaConnection, brokerId, key string) (configs []sarama.ConfigEntry)
func GetConfigBroker ¶
func GetConfigBroker(conn *KafkaConnection, broker string)
func GetTopicOffsets ¶
func GetTopicOffsets(conn *KafkaConnection, topic string) map[int32]int64
func GetTopicProp ¶
func GetTopicProp(conn *KafkaConnection, topic, key string) (configs []sarama.ConfigEntry)
func ListAllAcls ¶ added in v0.1.4
func ListAllAcls(conn *KafkaConnection) []sarama.ResourceAcls
func ListAllTopics ¶
func ListAllTopics(conn *KafkaConnection) map[string]sarama.TopicDetail
func ListConsumerGroupDescriptions ¶
func ListConsumerGroupDescriptions(conn *KafkaConnection) map[string]*sarama.GroupDescription
func ListConsumerGroups ¶
func ListConsumerGroups(conn *KafkaConnection) map[string]string
func ProduceMessage ¶
func ProduceMessage(conn *KafkaConnection, topic string) error
Types ¶
type Broker ¶
func (*Broker) MarkedHostName ¶
type BrokerMetadata ¶
func DescribeBroker ¶
func DescribeBroker(conn *KafkaConnection, idOrAddr string) *BrokerMetadata
type BrokersById ¶
type BrokersById []*Broker
func GetBrokers ¶
func GetBrokers(conn *KafkaConnection) BrokersById
func (BrokersById) Len ¶
func (b BrokersById) Len() int
func (BrokersById) Less ¶
func (b BrokersById) Less(i, j int) bool
func (BrokersById) Swap ¶
func (b BrokersById) Swap(i, j int)
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func (*Consumer) ConsumeClaim ¶
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
type ConsumerGroupOffset ¶
type ConsumerGroupOffset struct {
	Id     string
	Topics map[string]*TopicAssignment
}
func MakeConsumerGroupOffset ¶
func MakeConsumerGroupOffset(groupId string) *ConsumerGroupOffset
func (*ConsumerGroupOffset) AddTopic ¶
func (c *ConsumerGroupOffset) AddTopic(name string) *TopicAssignment
type KafkaConnection ¶
type KafkaConnection struct {
	Client               sarama.Client
	Admin                sarama.ClusterAdmin
	Config               *configuration.Configuration
	Context              *configuration.Context
	SchemaRegistryClient *srclient.SchemaRegistryClient
}
func MakeConnection ¶
func MakeConnection(config *configuration.Configuration) *KafkaConnection
func MakeConnectionContext ¶
func MakeConnectionContext(config *configuration.Configuration, context *configuration.Context) (*KafkaConnection, error)
func (*KafkaConnection) Close ¶
func (k *KafkaConnection) Close()
func (*KafkaConnection) Connect ¶
func (k *KafkaConnection) Connect() error
type LogFile ¶
type LogFile struct {
	Path    string
	Entries aggregatedTopicSize
}
func (*LogFile) SortByPermanentSize ¶
type PartitionOffsets ¶
type TopicAssignment ¶
type TopicAssignment struct {
Partitions map[int32]PartitionOffsets
}
func (*TopicAssignment) AddOffset ¶
func (m *TopicAssignment) AddOffset(partition int32, consumerOffset int64, partitionOffset int64) *PartitionOffsets
func (*TopicAssignment) GetLagPartition ¶
func (m *TopicAssignment) GetLagPartition(partitionNo int32) int64
func (*TopicAssignment) GetLagTopicLag ¶
func (m *TopicAssignment) GetLagTopicLag() int64
type XDGSCRAMClient ¶
type XDGSCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}
func (*XDGSCRAMClient) Begin ¶
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)
func (*XDGSCRAMClient) Done ¶
func (x *XDGSCRAMClient) Done() bool
Click to show internal directories.
Click to hide internal directories.