kafka

package
v1.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 28, 2024 License: MIT Imports: 6 Imported by: 2

Documentation

Index

Constants

View Source
const (
	OffsetEarliest             = "earliest"
	OffsetLatest               = "latest"
	ExponentialBackOffStrategy = "exponential"
	LinearBackOffStrategy      = "linear"
	FixedBackOffStrategy       = "fixed"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type BackoffStrategyInterface added in v1.4.5

type BackoffStrategyInterface interface {
	ShouldIncreaseRetryAttemptCount(retryCount int, retryAttemptCount int) bool
	String() string
}

func GetBackoffStrategy added in v1.4.5

func GetBackoffStrategy(strategyName string) BackoffStrategyInterface

type Config

type Config struct {
	Brokers  []string         `yaml:"brokers"`
	Consumer ConsumerConfig   `yaml:"consumer"`
	Producer ProducerConfig   `yaml:"producer"`
	SASL     SASLConfig       `yaml:"sasl"`
	LogLevel logger.Level     `yaml:"logLevel"`
	Logger   logger.Interface `yaml:"-"`
	ClientID string           `yaml:"clientId"`

	// MetricPrefix is used for prometheus fq name prefix.
	// If not provided, default metric prefix value is `kafka_cronsumer`.
	// Currently, there are two exposed prometheus metrics. `retried_messages_total_current` and `discarded_messages_total_current`.
	// So, if default metric prefix used, metrics names are `kafka_cronsumer_retried_messages_total_current` and
	// `kafka_cronsumer_discarded_messages_total_current`.
	MetricPrefix string `yaml:"metricPrefix"`
}

func (*Config) SetDefaults

func (c *Config) SetDefaults()

func (*Config) Validate

func (c *Config) Validate()

type ConsumeFn

type ConsumeFn func(message Message) error

ConsumeFn function describes how to consume messages from specified topic

type ConsumerConfig

type ConsumerConfig struct {
	ClientID              string                   `yaml:"clientId"`
	GroupID               string                   `yaml:"groupId"`
	Topic                 string                   `yaml:"topic"`
	DeadLetterTopic       string                   `yaml:"deadLetterTopic"`
	MinBytes              int                      `yaml:"minBytes"`
	MaxBytes              int                      `yaml:"maxBytes"`
	MaxRetry              int                      `yaml:"maxRetry"`
	MaxWait               time.Duration            `yaml:"maxWait"`
	CommitInterval        time.Duration            `yaml:"commitInterval"`
	HeartbeatInterval     time.Duration            `yaml:"heartbeatInterval"`
	SessionTimeout        time.Duration            `yaml:"sessionTimeout"`
	RebalanceTimeout      time.Duration            `yaml:"rebalanceTimeout"`
	StartOffset           Offset                   `yaml:"startOffset"`
	RetentionTime         time.Duration            `yaml:"retentionTime"`
	Concurrency           int                      `yaml:"concurrency"`
	Duration              time.Duration            `yaml:"duration"`
	Cron                  string                   `yaml:"cron"`
	BackOffStrategy       BackoffStrategyInterface `yaml:"backOffStrategy"`
	SkipMessageByHeaderFn SkipMessageByHeaderFn    `yaml:"skipMessageByHeaderFn"`
}

type Cronsumer

type Cronsumer interface {
	// Start starts the kafka consumer KafkaCronsumer with a new goroutine so its asynchronous operation (non-blocking)
	Start()

	// Run runs the kafka consumer KafkaCronsumer with the caller goroutine so its synchronous operation (blocking)
	Run()

	// Stop stops the cron and kafka KafkaCronsumer consumer
	Stop()

	// WithLogger for injecting custom log implementation
	WithLogger(logger logger.Interface)

	// Produce produces the message to kafka KafkaCronsumer producer. Offset and Time fields will be ignored in the message.
	Produce(message Message) error

	// ProduceBatch produces the list of messages to kafka KafkaCronsumer producer.
	ProduceBatch(messages []Message) error

	// GetMetricCollectors  for the purpose of making metric collectors available to other libraries
	GetMetricCollectors() []prometheus.Collector
}

type ExponentialBackoffStrategy added in v1.4.5

type ExponentialBackoffStrategy struct{}

func (*ExponentialBackoffStrategy) ShouldIncreaseRetryAttemptCount added in v1.4.5

func (s *ExponentialBackoffStrategy) ShouldIncreaseRetryAttemptCount(retryCount int, retryAttemptCount int) bool

func (*ExponentialBackoffStrategy) String added in v1.4.5

func (s *ExponentialBackoffStrategy) String() string

type FixedBackoffStrategy added in v1.4.5

type FixedBackoffStrategy struct{}

func (*FixedBackoffStrategy) ShouldIncreaseRetryAttemptCount added in v1.4.5

func (s *FixedBackoffStrategy) ShouldIncreaseRetryAttemptCount(_ int, _ int) bool

func (*FixedBackoffStrategy) String added in v1.4.5

func (s *FixedBackoffStrategy) String() string
type Header struct {
	Key   string
	Value []byte
}

type LinearBackoffStrategy added in v1.4.5

type LinearBackoffStrategy struct{}

func (*LinearBackoffStrategy) ShouldIncreaseRetryAttemptCount added in v1.4.5

func (s *LinearBackoffStrategy) ShouldIncreaseRetryAttemptCount(retryCount int, retryAttemptCount int) bool

func (*LinearBackoffStrategy) String added in v1.4.5

func (s *LinearBackoffStrategy) String() string

type Message

type Message struct {
	Topic         string
	Partition     int
	Offset        int64
	HighWaterMark int64
	Key           []byte
	Value         []byte
	Headers       []Header
	Time          time.Time
}

func (*Message) AddHeader added in v1.4.6

func (m *Message) AddHeader(header Header)

AddHeader works as a idempotent function

type MessageBuilder added in v0.6.2

type MessageBuilder struct {
	// contains filtered or unexported fields
}

func NewMessageBuilder added in v0.6.2

func NewMessageBuilder() *MessageBuilder

func (*MessageBuilder) Build added in v0.6.2

func (mb *MessageBuilder) Build() Message

func (*MessageBuilder) WithHeaders added in v0.6.2

func (mb *MessageBuilder) WithHeaders(headers []Header) *MessageBuilder

func (*MessageBuilder) WithHighWatermark added in v0.6.2

func (mb *MessageBuilder) WithHighWatermark(highWaterMark int64) *MessageBuilder

func (*MessageBuilder) WithKey added in v0.6.2

func (mb *MessageBuilder) WithKey(key []byte) *MessageBuilder

func (*MessageBuilder) WithPartition added in v0.6.2

func (mb *MessageBuilder) WithPartition(partition int) *MessageBuilder

func (*MessageBuilder) WithTopic added in v0.6.2

func (mb *MessageBuilder) WithTopic(topic string) *MessageBuilder

func (*MessageBuilder) WithValue added in v0.6.2

func (mb *MessageBuilder) WithValue(value []byte) *MessageBuilder

type Offset

type Offset string

func ToStringOffset added in v0.6.4

func ToStringOffset(offset int64) Offset

func (Offset) Value

func (o Offset) Value() int64

type ProducerConfig

type ProducerConfig struct {
	BatchSize    int                `yaml:"batchSize"`
	BatchTimeout time.Duration      `yaml:"batchTimeout"`
	Balancer     segmentio.Balancer `yaml:"balancer"`
}

type SASLConfig

type SASLConfig struct {
	Enabled            bool   `yaml:"enabled"`
	AuthType           string `yaml:"authType"` // plain or scram
	Username           string `yaml:"username"`
	Password           string `yaml:"password"`
	RootCAPath         string `yaml:"rootCAPath"`
	IntermediateCAPath string `yaml:"intermediateCAPath"`
	Rack               string `yaml:"rack"`
}

type SkipMessageByHeaderFn added in v1.4.7

type SkipMessageByHeaderFn func(headers []Header) bool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL