kafka

package
v2.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 21, 2021 License: MIT Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
const (
	SASLMechanismPlain       = "PLAIN"
	SASLMechanismScramSHA256 = "SCRAM-SHA-256"
	SASLMechanismScramSHA512 = "SCRAM-SHA-512"
	SASLMechanismGSSAPI      = "GSSAPI"
	SASLMechanismOAuthBearer = "OAUTHBEARER"
)

Variables

This section is empty.

Functions

func NewKgoConfig

func NewKgoConfig(cfg Config, logger *zap.Logger, hooks kgo.Hook) ([]kgo.Opt, error)

NewKgoConfig creates a new Config for the Kafka Client as exposed by the franz-go library. If TLS certificates can't be read an error will be returned.

Types

type Config

type Config struct {
	// General
	Brokers  []string `koanf:"brokers"`
	ClientID string   `koanf:"clientId"`
	RackID   string   `koanf:"rackId"`

	TLS  TLSConfig  `koanf:"tls"`
	SASL SASLConfig `koanf:"sasl"`
}

func (*Config) SetDefaults

func (c *Config) SetDefaults()

func (*Config) Validate

func (c *Config) Validate() error

type KgoZapLogger

type KgoZapLogger struct {
	// contains filtered or unexported fields
}

func (KgoZapLogger) Level

func (k KgoZapLogger) Level() kgo.LogLevel

Level Implements kgo.Logger interface. It returns the log level to log at. We pin this to debug as the zap logger decides what to actually send to the output stream.

func (KgoZapLogger) Log

func (k KgoZapLogger) Log(level kgo.LogLevel, msg string, keyvals ...interface{})

Log implements kgo.Logger interface

type SASLConfig

type SASLConfig struct {
	Enabled   bool   `koanf:"enabled"`
	Username  string `koanf:"username"`
	Password  string `koanf:"password"`
	Mechanism string `koanf:"mechanism"`

	// SASL Mechanisms that require more configuration than username & password
	GSSAPI SASLGSSAPIConfig `koanf:"gssapi"`
}

SASLConfig for Kafka Client

func (*SASLConfig) SetDefaults

func (c *SASLConfig) SetDefaults()

SetDefaults for SASL Config

func (*SASLConfig) Validate

func (c *SASLConfig) Validate() error

Validate SASL config input

type SASLGSSAPIConfig

type SASLGSSAPIConfig struct {
	AuthType           string `koanf:"authType"`
	KeyTabPath         string `koanf:"keyTabPath"`
	KerberosConfigPath string `koanf:"kerberosConfigPath"`
	ServiceName        string `koanf:"serviceName"`
	Username           string `koanf:"username"`
	Password           string `koanf:"password"`
	Realm              string `koanf:"realm"`
}

SASLGSSAPIConfig represents the Kafka Kerberos config

type Service

type Service struct {
	Client *kgo.Client
	// contains filtered or unexported fields
}

func NewService

func NewService(cfg Config, logger *zap.Logger) (*Service, error)

func (*Service) TestConnection

func (s *Service) TestConnection(ctx context.Context) error

TestConnection tries to fetch Broker metadata and prints some information if connection succeeds. An error will be returned if connecting fails.

type TLSConfig

type TLSConfig struct {
	Enabled               bool   `koanf:"enabled"`
	CaFilepath            string `koanf:"caFilepath"`
	CertFilepath          string `koanf:"certFilepath"`
	KeyFilepath           string `koanf:"keyFilepath"`
	Passphrase            string `koanf:"passphrase"`
	InsecureSkipTLSVerify bool   `koanf:"insecureSkipTlsVerify"`
}

TLSConfig to connect to Kafka via TLS

func (*TLSConfig) SetDefaults

func (c *TLSConfig) SetDefaults()

func (*TLSConfig) Validate

func (c *TLSConfig) Validate() error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL