kafkav2

package
v1.8.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 27, 2024 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
// Accepted values for FlusherKafka.PartitionerType.
// NOTE(review): PartitionerTypeRoundHash selects the plain "hash" partitioner;
// the "Round" in its name looks like a leftover from RoundRobin.
const (
	PartitionerTypeRandom     = "random"
	PartitionerTypeRoundRobin = "roundrobin"
	PartitionerTypeRoundHash  = "hash"
)

Variables

Functions

This section is empty.

Types

type Authentication

// Authentication groups the authentication options of the Kafka flusher.
// ConfigureAuthentication applies them to a sarama.Config.
// NOTE(review): every option is a pointer, so presumably a nil field means
// that mechanism is not configured — confirm in ConfigureAuthentication.
type Authentication struct {
	// PlainText authentication (username and password).
	PlainText *PlainTextConfig
	// SASL authentication (mechanism, username and password).
	SASL *SaslConfig
	// TLS authentication.
	TLS *tlscommon.TLSConfig
	// Kerberos authentication.
	Kerberos *KerberosConfig
}

func (*Authentication) ConfigureAuthentication

func (config *Authentication) ConfigureAuthentication(saramaConfig *sarama.Config) error

type FlusherFunc

// FlusherFunc is a function type with the same signature as the Flush,
// HashFlush and NormalFlush methods of FlusherKafka.
type FlusherFunc func(projectName string, logstoreName string, configName string, logGroupList []*protocol.LogGroup) error

type FlusherKafka

// FlusherKafka holds the configuration of the Kafka flusher.
// NewFlusherKafka returns an instance carrying the default config.
type FlusherKafka struct {

	// The list of Kafka brokers.
	Brokers []string
	// The name of the Kafka topic.
	Topic string
	// Kafka protocol version.
	Version Version
	// How long to wait for responses from the Kafka brokers before timing out.
	// The default is 30 seconds.
	Timeout time.Duration

	// Authentication settings (PlainText, SASL, TLS or Kerberos).
	Authentication Authentication
	// Kafka output broker event partitioning strategy.
	// Must be one of random, roundrobin, or hash. By default, the random partitioner is used.
	PartitionerType string
	// Kafka metadata update settings.
	Metadata metaConfig
	// The keep-alive period for an active network connection.
	// If 0s, keep-alives are disabled. The default is 0 seconds.
	KeepAlive time.Duration
	// Upper bound on message size, in bytes, for the producer.
	// NOTE(review): the previous comment was truncated mid-sentence; presumably this
	// maps to sarama's Producer.MaxMessageBytes — confirm.
	MaxMessageBytes *int
	// RequiredACKs is the number of acknowledgements required to assume that a message has been sent.
	//   0 -> NoResponse.  doesn't send any response
	//   1 -> WaitForLocal. waits for only the local commit to succeed before responding ( default )
	//   -1 -> WaitForAll. waits for all in-sync replicas to commit before responding.
	RequiredACKs *int
	// The maximum duration a broker will wait for the number of required ACKs.
	// The default is 10s.
	BrokerTimeout time.Duration
	// Compression codec used to produce messages.
	// The options are: 'none', 'gzip', 'snappy', 'lz4', and 'zstd'.
	Compression string
	// Sets the compression level used by gzip. Setting this value to 0 disables compression.
	// The compression level must be in the range of 1 (best speed) to 9 (best compression).
	// The default value is 4.
	CompressionLevel int

	// The maximum number of events to bulk in a single Kafka request. The default is 2048.
	BulkMaxSize int

	// Duration to wait before sending bulk Kafka request. 0 is no delay. The default is 0.
	BulkFlushFrequency time.Duration

	// NOTE(review): undocumented; presumably the maximum number of send retries — confirm
	// semantics and default.
	MaxRetries int
	// A header is a key-value pair, and multiple headers can be included with the same key.
	// Only string values are supported.
	Headers []header

	// NOTE(review): undocumented; presumably the backoff settings used between retries — confirm.
	Backoff backoffConfig
	// Per Kafka broker number of messages buffered in output pipeline. The default is 256.
	ChanBufferSize int
	// iLogtail data conversion config.
	Convert convertConfig

	// NOTE(review): undocumented; presumably the keys hashed when PartitionerType is "hash" — confirm.
	HashKeys []string
	// NOTE(review): undocumented; semantics not visible here — confirm against HashFlush.
	HashOnce bool
	// NOTE(review): undocumented; presumably the client ID reported to the Kafka brokers — confirm.
	ClientID string
	// contains filtered or unexported fields
}

func NewFlusherKafka

func NewFlusherKafka() *FlusherKafka

NewFlusherKafka returns a FlusherKafka with the default Kafka flusher config.

func (*FlusherKafka) Description

func (k *FlusherKafka) Description() string

func (*FlusherKafka) Flush

func (k *FlusherKafka) Flush(projectName string, logstoreName string, configName string, logGroupList []*protocol.LogGroup) error

func (*FlusherKafka) HashFlush

func (k *FlusherKafka) HashFlush(projectName string, logstoreName string, configName string, logGroupList []*protocol.LogGroup) error

func (*FlusherKafka) Init

func (k *FlusherKafka) Init(context pipeline.Context) error

func (*FlusherKafka) IsReady

func (k *FlusherKafka) IsReady(projectName string, logstoreName string, logstoreKey int64) bool

IsReady reports whether the flusher is ready to flush.

func (*FlusherKafka) NormalFlush

func (k *FlusherKafka) NormalFlush(projectName string, logstoreName string, configName string, logGroupList []*protocol.LogGroup) error

func (*FlusherKafka) SetUrgent

func (*FlusherKafka) SetUrgent(flag bool)

func (*FlusherKafka) Stop

func (k *FlusherKafka) Stop() error

Stop ...

func (*FlusherKafka) Validate

func (k *FlusherKafka) Validate() error

type KerberosConfig added in v1.4.0

type KerberosConfig struct {
	// NOTE(review): the field notes below are inferred from the field names only;
	// confirm them against the code that consumes this config.

	// Presumably the Kerberos service name of the Kafka brokers.
	ServiceName string
	// Presumably the Kerberos realm.
	Realm string
	// Presumably selects keytab-based authentication (KeyTabPath) over Username/Password.
	UseKeyTab bool
	// Presumably the principal's user name.
	Username string
	// Presumably the password used when UseKeyTab is false.
	Password string
	// Presumably the path to the Kerberos configuration file.
	ConfigPath string
	// Presumably the path to the keytab file used when UseKeyTab is true.
	KeyTabPath string
}

KerberosConfig defines the Kerberos configuration.

type PlainTextConfig

// PlainTextConfig holds the username/password credentials for connecting to Kafka.
// ConfigurePlaintext applies them to a sarama.Config.
type PlainTextConfig struct {
	// The username for connecting to Kafka.
	Username string
	// The password for connecting to Kafka.
	Password string
}

func (*PlainTextConfig) ConfigurePlaintext

func (plainTextConfig *PlainTextConfig) ConfigurePlaintext(saramaConfig *sarama.Config) error

type SaslConfig

// SaslConfig holds the SASL mechanism and credentials for connecting to Kafka.
// ConfigureSasl applies them to a sarama.Config.
type SaslConfig struct {
	// SASL mechanism to use. Possible values are PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512.
	SaslMechanism string
	// The username for connecting to Kafka.
	Username string
	// The password for connecting to Kafka.
	Password string
}

func (*SaslConfig) ConfigureSasl

func (saslConfig *SaslConfig) ConfigureSasl(saramaConfig *sarama.Config) error

func (*SaslConfig) Validate

func (saslConfig *SaslConfig) Validate() error

type Version

type Version string

Version is a kafka version

func (Version) Get

func (v Version) Get() (sarama.KafkaVersion, bool)

Get returns the sarama.KafkaVersion corresponding to v; the bool result presumably reports whether v is a known version.

func (*Version) Validate

func (v *Version) Validate() error

Validate checks that the Kafka version is among the possible options.

type XDGSCRAMClient

// XDGSCRAMClient embeds a *scram.Client, a *scram.ClientConversation and a
// scram.HashGeneratorFcn, and exposes Begin, Step and Done methods.
// NOTE(review): presumably this implements sarama's SCRAMClient interface for
// the SCRAM-SHA-256/512 mechanisms — confirm.
type XDGSCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}

func (*XDGSCRAMClient) Begin

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)

func (*XDGSCRAMClient) Done

func (x *XDGSCRAMClient) Done() bool

func (*XDGSCRAMClient) Step

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL