Documentation ¶
Index ¶
- func NewSaramaConfig(cfg *Config) (*sarama.Config, error)
- type Config
- type DirectEmbedding
- type ListMessageRequest
- type ListMessageResponse
- type LogDirResponse
- type SASLConfig
- type SASLGSSAPIConfig
- type Service
- func (s *Service) DescribeCluster() (*sarama.MetadataResponse, error)
- func (s *Service) DescribeConsumerGroups(ctx context.Context, groups []string) (map[int32]*sarama.DescribeGroupsResponse, error)
- func (s *Service) DescribeLogDirs() map[int32]*LogDirResponse
- func (s *Service) DescribeTopicsConfigs(topicNames []string, configNames []string) (*sarama.DescribeConfigsResponse, error)
- func (s *Service) HighWaterMarks(topicPartitions map[string][]int32) (map[string]map[int32]int64, error)
- func (s *Service) IsHealthy() error
- func (s *Service) ListConsumerGroupOffsets(group string) (*sarama.OffsetFetchResponse, error)
- func (s *Service) ListConsumerGroupOffsetsBulk(ctx context.Context, groups []string) (map[string]*sarama.OffsetFetchResponse, error)
- func (s *Service) ListConsumerGroups(ctx context.Context) ([]string, error)
- func (s *Service) ListMessages(ctx context.Context, req ListMessageRequest) (*ListMessageResponse, error)
- func (s *Service) ListPartitions(topicName string) ([]int32, error)
- func (s *Service) ListTopics() ([]*sarama.TopicMetadata, error)
- func (s *Service) RegisterMetrics()
- func (s *Service) Start()
- func (s *Service) WaterMarks(topic string, partitionIDs []int32) (map[int32]*WaterMark, error)
- type TLSConfig
- type TopicMessage
- type WaterMark
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Config ¶
type Config struct { // General Brokers []string `yaml:"brokers"` ClientID string `yaml:"clientId"` ClusterVersion string `yaml:"clusterVersion"` TLS TLSConfig `yaml:"tls"` SASL SASLConfig `yaml:"sasl"` }
Config required for opening a connection to Kafka
func (*Config) RegisterFlags ¶
RegisterFlags registers all nested config flags.
func (*Config) SetDefaults ¶
func (c *Config) SetDefaults()
type DirectEmbedding ¶
type DirectEmbedding struct { Value []byte ValueType valueType }
DirectEmbedding consists of a byte array that will be used as-is without any conversion
func (*DirectEmbedding) MarshalJSON ¶
func (d *DirectEmbedding) MarshalJSON() ([]byte, error)
MarshalJSON implements the 'Marshaller' interface for DirectEmbedding
type ListMessageRequest ¶
type ListMessageRequest struct { TopicName string PartitionID int32 // -1 for all partitions StartOffset int64 // -1 for newest, -2 for oldest offset MessageCount uint16 }
ListMessageRequest carries all filter, sort and cancellation options for fetching messages from Kafka
type ListMessageResponse ¶
type ListMessageResponse struct { ElapsedMs float64 `json:"elapsedMs"` FetchedMessages int `json:"fetchedMessages"` IsCancelled bool `json:"isCancelled"` Messages []*TopicMessage `json:"messages"` }
ListMessageResponse returns the requested kafka messages along with some metadata about the operation
type LogDirResponse ¶
type LogDirResponse struct { *sarama.DescribeLogDirsResponse Err error }
LogDirResponse can have an error (if the broker failed to return data) or the actual response
type SASLConfig ¶
type SASLConfig struct { Enabled bool `yaml:"enabled"` UseHandshake bool `yaml:"useHandshake"` Username string `yaml:"username"` Password string `yaml:"password"` Mechanism string `yaml:"mechanism"` GSSAPIConfig SASLGSSAPIConfig `yaml:"gssapi"` }
func (*SASLConfig) RegisterFlags ¶
func (c *SASLConfig) RegisterFlags(f *flag.FlagSet)
func (*SASLConfig) SetDefaults ¶
func (c *SASLConfig) SetDefaults()
func (*SASLConfig) Validate ¶
func (c *SASLConfig) Validate() error
type SASLGSSAPIConfig ¶
type SASLGSSAPIConfig struct { AuthType string `yaml:"authType"` KeyTabPath string `yaml:"keyTabPath"` KerberosConfigPath string `yaml:"kerberosConfigPath"` ServiceName string `yaml:"serviceName"` Username string `yaml:"username"` Password string `yaml:"password"` Realm string `yaml:"realm"` }
func (*SASLGSSAPIConfig) RegisterFlags ¶
func (c *SASLGSSAPIConfig) RegisterFlags(f *flag.FlagSet)
type Service ¶
Service acts as interface to interact with the Kafka Cluster
func (*Service) DescribeCluster ¶
func (s *Service) DescribeCluster() (*sarama.MetadataResponse, error)
DescribeCluster returns some generic information about the brokers in the given cluster
func (*Service) DescribeConsumerGroups ¶
func (s *Service) DescribeConsumerGroups(ctx context.Context, groups []string) (map[int32]*sarama.DescribeGroupsResponse, error)
DescribeConsumerGroups fetches additional information from Kafka about one or more consumer groups. It returns a map where the coordinator BrokerID is the key.
func (*Service) DescribeLogDirs ¶
func (s *Service) DescribeLogDirs() map[int32]*LogDirResponse
DescribeLogDirs concurrently fetches LogDirs from all Brokers and returns them in a map where the BrokerID is the key. map[BrokerID]LogDirResponse
func (*Service) DescribeTopicsConfigs ¶
func (s *Service) DescribeTopicsConfigs(topicNames []string, configNames []string) (*sarama.DescribeConfigsResponse, error)
DescribeTopicsConfigs fetches all topic config options for the given set of topic names and config names. Use an empty array for configNames to fetch all configs.
func (*Service) HighWaterMarks ¶
func (s *Service) HighWaterMarks(topicPartitions map[string][]int32) (map[string]map[int32]int64, error)
HighWaterMarks returns a nested map of: topic -> partitionID -> high water mark offset of all available partitions
func (*Service) IsHealthy ¶
IsHealthy checks whether it can communicate with the Kafka cluster or not
func (*Service) ListConsumerGroupOffsets ¶
func (s *Service) ListConsumerGroupOffsets(group string) (*sarama.OffsetFetchResponse, error)
ListConsumerGroupOffsets returns the commited group offsets for a single group
func (*Service) ListConsumerGroupOffsetsBulk ¶
func (s *Service) ListConsumerGroupOffsetsBulk(ctx context.Context, groups []string) (map[string]*sarama.OffsetFetchResponse, error)
ListConsumerGroupOffsetsBulk returns a map which has the consumer group name as key
func (*Service) ListConsumerGroups ¶
ListConsumerGroups returns an array of consumer group ids
func (*Service) ListMessages ¶
func (s *Service) ListMessages(ctx context.Context, req ListMessageRequest) (*ListMessageResponse, error)
ListMessages fetches one or more kafka messages and returns them by spinning one partition consumer (which runs in it's own goroutine) for each partition and funneling all the data to eventually return it. The second return parameter is a bool which indicates whether the requested topic exists. TODO: refactor to owl and add topic blacklisting
func (*Service) ListPartitions ¶
ListPartitions returns the partitionIDs for a given topic
func (*Service) ListTopics ¶
func (s *Service) ListTopics() ([]*sarama.TopicMetadata, error)
ListTopics returns a List of all topics in a kafka cluster. Each topic entry contains details like ReplicationFactor, Cleanup Policy
func (*Service) RegisterMetrics ¶
func (s *Service) RegisterMetrics()
RegisterMetrics periodically updates all sarama/client Kafka metrics and exposes them on the default prometheus registry.
type TLSConfig ¶
type TLSConfig struct { Enabled bool `yaml:"enabled"` CaFilepath string `yaml:"caFilepath"` CertFilepath string `yaml:"certFilepath"` KeyFilepath string `yaml:"keyFilepath"` Passphrase string `yaml:"passphrase"` InsecureSkipTLSVerify bool `yaml:"insecureSkipTlsVerify"` }
func (*TLSConfig) RegisterFlags ¶
type TopicMessage ¶
type TopicMessage struct { PartitionID int32 `json:"partitionID"` Offset int64 `json:"offset"` Timestamp int64 `json:"timestamp"` Key []byte `json:"key"` Value DirectEmbedding `json:"value"` ValueType string `json:"valueType"` Size int `json:"size"` IsValueNull bool `json:"isValueNull"` }
TopicMessage represents a single message from a given Kafka topic/partition
Source Files ¶
- config.go
- config_sasl.go
- config_sasl_gssapi.go
- config_tls.go
- connection_helper.go
- describe_cluster.go
- describe_consumer_groups.go
- describe_topic_configs.go
- health_check.go
- list_consumer_group_offsets.go
- list_consumer_groups.go
- list_messages.go
- list_partitions.go
- list_topics.go
- log_dir.go
- metrics.go
- partition_consumer.go
- scram_client.go
- service.go
- topic_config.go
- utils.go
- water_mark.go