Documentation ¶
Index ¶
- Constants
- Variables
- type Authentication
- type FlusherFunc
- type FlusherKafka
- func (k *FlusherKafka) Description() string
- func (k *FlusherKafka) Flush(projectName string, logstoreName string, configName string, ...) error
- func (k *FlusherKafka) HashFlush(projectName string, logstoreName string, configName string, ...) error
- func (k *FlusherKafka) Init(context pipeline.Context) error
- func (k *FlusherKafka) IsReady(projectName string, logstoreName string, logstoreKey int64) bool
- func (k *FlusherKafka) NormalFlush(projectName string, logstoreName string, configName string, ...) error
- func (*FlusherKafka) SetUrgent(flag bool)
- func (k *FlusherKafka) Stop() error
- func (k *FlusherKafka) Validate() error
- type KerberosConfig
- type PlainTextConfig
- type SaslConfig
- type Version
- type XDGSCRAMClient
Constants ¶
View Source
const ( PartitionerTypeRandom = "random" PartitionerTypeRoundRobin = "roundrobin" PartitionerTypeRoundHash = "hash" )
Variables ¶
View Source
var ( SHA256 scram.HashGeneratorFcn = sha256.New SHA512 scram.HashGeneratorFcn = sha512.New )
Functions ¶
This section is empty.
Types ¶
type Authentication ¶
type Authentication struct { // PlainText authentication PlainText *PlainTextConfig // SASL authentication SASL *SaslConfig // TLS authentication TLS *tlscommon.TLSConfig // Kerberos authentication Kerberos *KerberosConfig }
func (*Authentication) ConfigureAuthentication ¶
func (config *Authentication) ConfigureAuthentication(saramaConfig *sarama.Config) error
type FlusherFunc ¶
type FlusherKafka ¶
type FlusherKafka struct { // The list of kafka brokers Brokers []string // The name of the kafka topic Topic string // Kafka protocol version Version Version // The number of seconds to wait for responses from the Kafka brokers before timing out. // The default is 30 (seconds). Timeout time.Duration // Authentication using SASL/PLAIN Authentication Authentication // Kafka output broker event partitioning strategy. // Must be one of random, roundrobin, or hash. By default the random partitioner is used PartitionerType string // Kafka metadata update settings. Metadata metaConfig // The keep-alive period for an active network connection. // If 0s, keep-alives are disabled. The default is 0 seconds. KeepAlive time.Duration // The maximum number of messages the producer will send in a single request. MaxMessageBytes *int // RequiredAcks Number of acknowledgements required to assume that a message has been sent. // 0 -> NoResponse. doesn't send any response // 1 -> WaitForLocal. waits for only the local commit to succeed before responding ( default ) // -1 -> WaitForAll. waits for all in-sync replicas to commit before responding. RequiredACKs *int // The maximum duration a broker will wait for number of required ACKs. // The default is 10s. BrokerTimeout time.Duration // Compression Codec used to produce messages // The options are: 'none', 'gzip', 'snappy', 'lz4', and 'zstd' Compression string // Sets the compression level used by gzip. Setting this value to 0 disables compression. // The compression level must be in the range of 1 (best speed) to 9 (best compression) // The default value is 4. CompressionLevel int // The maximum number of events to bulk in a single Kafka request. The default is 2048. BulkMaxSize int // Duration to wait before sending bulk Kafka request. 0 is no delay. The default is 0. BulkFlushFrequency time.Duration MaxRetries int // A header is a key-value pair, and multiple headers can be included with the same key. 
// Only string values are supported Headers []header Backoff backoffConfig // Per Kafka broker number of messages buffered in output pipeline. The default is 256 ChanBufferSize int // ilogtail data convert config Convert convertConfig HashKeys []string HashOnce bool ClientID string // contains filtered or unexported fields }
func NewFlusherKafka ¶
func NewFlusherKafka() *FlusherKafka
NewFlusherKafka returns a Kafka flusher initialized with the default configuration.
func (*FlusherKafka) Description ¶
func (k *FlusherKafka) Description() string
func (*FlusherKafka) IsReady ¶
func (k *FlusherKafka) IsReady(projectName string, logstoreName string, logstoreKey int64) bool
IsReady reports whether the flusher is ready to flush data.
func (*FlusherKafka) NormalFlush ¶
func (*FlusherKafka) SetUrgent ¶
func (*FlusherKafka) SetUrgent(flag bool)
func (*FlusherKafka) Validate ¶
func (k *FlusherKafka) Validate() error
type KerberosConfig ¶ added in v1.4.0
type KerberosConfig struct { ServiceName string Realm string UseKeyTab bool Username string Password string ConfigPath string KeyTabPath string }
KerberosConfig defines Kerberos configuration.
type PlainTextConfig ¶
type PlainTextConfig struct { // The username for connecting to Kafka. Username string // The password for connecting to Kafka. Password string }
func (*PlainTextConfig) ConfigurePlaintext ¶
func (plainTextConfig *PlainTextConfig) ConfigurePlaintext(saramaConfig *sarama.Config) error
type SaslConfig ¶
type SaslConfig struct { // SASL Mechanism to be used, possible values are: (PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512) SaslMechanism string // The username for connecting to Kafka. Username string // The password for connecting to Kafka. Password string }
func (*SaslConfig) ConfigureSasl ¶
func (saslConfig *SaslConfig) ConfigureSasl(saramaConfig *sarama.Config) error
func (*SaslConfig) Validate ¶
func (saslConfig *SaslConfig) Validate() error
type XDGSCRAMClient ¶
type XDGSCRAMClient struct { *scram.Client *scram.ClientConversation scram.HashGeneratorFcn }
func (*XDGSCRAMClient) Begin ¶
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)
func (*XDGSCRAMClient) Done ¶
func (x *XDGSCRAMClient) Done() bool
Click to show internal directories.
Click to hide internal directories.