Back to

Package kafka

Latest Go to latest

The latest major version is .

Published: Apr 5, 2020 | License: Apache-2.0 | Module:


func NewSaramaConfig

func NewSaramaConfig(cfg *Config) (*sarama.Config, error)

NewSaramaConfig creates a new sarama config which can be used for the admin client

type Config

type Config struct {
	// General
	Brokers        []string `yaml:"brokers"`
	ClientID       string   `yaml:"clientId"`
	ClusterVersion string   `yaml:"clusterVersion"`

	TLS  TLSConfig  `yaml:"tls"`
	SASL SASLConfig `yaml:"sasl"`

Config required for opening a connection to Kafka

func (*Config) RegisterFlags

func (c *Config) RegisterFlags(f *flag.FlagSet)

RegisterFlags registers all nested config flags.

func (*Config) SetDefaults

func (c *Config) SetDefaults()

func (*Config) Validate

func (c *Config) Validate() error

type DirectEmbedding

type DirectEmbedding struct {
	Value     []byte
	ValueType valueType

DirectEmbedding consists of a byte array that will be used as-is without any conversion

func (*DirectEmbedding) MarshalJSON

func (d *DirectEmbedding) MarshalJSON() ([]byte, error)

MarshalJSON implements the 'Marshaller' interface for DirectEmbedding

type ListMessageRequest

type ListMessageRequest struct {
	TopicName    string
	PartitionID  int32 // -1 for all partitions
	StartOffset  int64 // -1 for newest, -2 for oldest offset
	MessageCount uint16

ListMessageRequest carries all filter, sort and cancellation options for fetching messages from Kafka

type ListMessageResponse

type ListMessageResponse struct {
	ElapsedMs       float64         `json:"elapsedMs"`
	FetchedMessages int             `json:"fetchedMessages"`
	IsCancelled     bool            `json:"isCancelled"`
	Messages        []*TopicMessage `json:"messages"`

ListMessageResponse returns the requested kafka messages along with some metadata about the operation

type LogDirResponse

type LogDirResponse struct {
	Err error

LogDirResponse can have an error (if the broker failed to return data) or the actual response

type SASLConfig

type SASLConfig struct {
	Enabled      bool             `yaml:"enabled"`
	UseHandshake bool             `yaml:"useHandshake"`
	Username     string           `yaml:"username"`
	Password     string           `yaml:"password"`
	Mechanism    string           `yaml:"mechanism"`
	GSSAPIConfig SASLGSSAPIConfig `yaml:"gssapi"`

func (*SASLConfig) RegisterFlags

func (c *SASLConfig) RegisterFlags(f *flag.FlagSet)

func (*SASLConfig) SetDefaults

func (c *SASLConfig) SetDefaults()

func (*SASLConfig) Validate

func (c *SASLConfig) Validate() error


type SASLGSSAPIConfig struct {
	AuthType           string `yaml:"authType"`
	KeyTabPath         string `yaml:"keyTabPath"`
	KerberosConfigPath string `yaml:"kerberosConfigPath"`
	ServiceName        string `yaml:"serviceName"`
	Username           string `yaml:"username"`
	Password           string `yaml:"password"`
	Realm              string `yaml:"realm"`

func (*SASLGSSAPIConfig) RegisterFlags

func (c *SASLGSSAPIConfig) RegisterFlags(f *flag.FlagSet)

type Service

type Service struct {
	MetricsNamespace string
	Client           sarama.Client
	Logger           *zap.Logger

Service acts as interface to interact with the Kafka Cluster

func (*Service) DescribeCluster

func (s *Service) DescribeCluster() (*sarama.MetadataResponse, error)

DescribeCluster returns some generic information about the brokers in the given cluster

func (*Service) DescribeConsumerGroups

func (s *Service) DescribeConsumerGroups(ctx context.Context, groups []string) (map[int32]*sarama.DescribeGroupsResponse, error)

DescribeConsumerGroups fetches additional information from Kafka about one or more consumer groups. It returns a map where the coordinator BrokerID is the key.

func (*Service) DescribeLogDirs

func (s *Service) DescribeLogDirs() map[int32]*LogDirResponse

DescribeLogDirs concurrently fetches LogDirs from all brokers and returns them in a map where the BrokerID is the key (map[BrokerID]*LogDirResponse).

func (*Service) DescribeTopicsConfigs

func (s *Service) DescribeTopicsConfigs(topicNames []string, configNames []string) (*sarama.DescribeConfigsResponse, error)

DescribeTopicsConfigs fetches all topic config options for the given set of topic names and config names. Use an empty array for configNames to fetch all configs.

func (*Service) HighWaterMarks

func (s *Service) HighWaterMarks(topicPartitions map[string][]int32) (map[string]map[int32]int64, error)

HighWaterMarks returns a nested map of: topic -> partitionID -> high water mark offset of all available partitions

func (*Service) IsHealthy

func (s *Service) IsHealthy() error

IsHealthy checks whether it can communicate with the Kafka cluster or not

func (*Service) ListConsumerGroupOffsets

func (s *Service) ListConsumerGroupOffsets(group string) (*sarama.OffsetFetchResponse, error)

ListConsumerGroupOffsets returns the committed group offsets for a single group

func (*Service) ListConsumerGroupOffsetsBulk

func (s *Service) ListConsumerGroupOffsetsBulk(ctx context.Context, groups []string) (map[string]*sarama.OffsetFetchResponse, error)

ListConsumerGroupOffsetsBulk returns a map which has the consumer group name as key

func (*Service) ListConsumerGroups

func (s *Service) ListConsumerGroups(ctx context.Context) ([]string, error)

ListConsumerGroups returns an array of consumer group ids

func (*Service) ListMessages

func (s *Service) ListMessages(ctx context.Context, req ListMessageRequest) (*ListMessageResponse, error)

ListMessages fetches one or more Kafka messages and returns them by spinning up one partition consumer (which runs in its own goroutine) for each partition and funneling all the data into a single response. It returns the fetched messages along with metadata about the operation, or an error if the messages could not be fetched. TODO: refactor to owl and add topic blacklisting

func (*Service) ListPartitions

func (s *Service) ListPartitions(topicName string) ([]int32, error)

ListPartitions returns the partitionIDs for a given topic

func (*Service) ListTopics

func (s *Service) ListTopics() ([]*sarama.TopicMetadata, error)

ListTopics returns a list of all topics in a Kafka cluster. Each topic entry contains details like ReplicationFactor, Cleanup Policy.

func (*Service) RegisterMetrics

func (s *Service) RegisterMetrics()

RegisterMetrics periodically updates all sarama/client Kafka metrics and exposes them on the default prometheus registry.

func (*Service) Start

func (s *Service) Start()

Start initializes the Kafka Service and takes care of stuff like KeepAlive

func (*Service) WaterMarks

func (s *Service) WaterMarks(topic string, partitionIDs []int32) (map[int32]*WaterMark, error)

WaterMarks returns a map of: partitionID -> *waterMark

type TLSConfig

type TLSConfig struct {
	Enabled               bool   `yaml:"enabled"`
	CaFilepath            string `yaml:"caFilepath"`
	CertFilepath          string `yaml:"certFilepath"`
	KeyFilepath           string `yaml:"keyFilepath"`
	Passphrase            string `yaml:"passphrase"`
	InsecureSkipTLSVerify bool   `yaml:"insecureSkipTlsVerify"`

func (*TLSConfig) RegisterFlags

func (c *TLSConfig) RegisterFlags(f *flag.FlagSet)

type TopicMessage

type TopicMessage struct {
	PartitionID int32  `json:"partitionID"`
	Offset      int64  `json:"offset"`
	Timestamp   int64  `json:"timestamp"`
	Key         []byte `json:"key"`

	Value     DirectEmbedding `json:"value"`
	ValueType string          `json:"valueType"`

	Size        int  `json:"size"`
	IsValueNull bool `json:"isValueNull"`

TopicMessage represents a single message from a given Kafka topic/partition

type WaterMark

type WaterMark struct {
	PartitionID int32
	Low         int64
	High        int64

WaterMark is a partitionID along with its highest and lowest message index

Package Files

Documentation was rendered with GOOS=linux and GOARCH=amd64.

Jump to identifier

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to identifier