Documentation ¶
Overview ¶
Package kafka provides a Kafka consumer implementation for Things-Kit applications. It implements the messaging.Consumer interface.
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ConsumerModule = fx.Module("kafka-consumer",
	fx.Provide(
		NewConfig,
		NewKafkaConsumer,
		fx.Annotate(
			func(c *KafkaConsumer) messaging.Consumer { return c },
			fx.As(new(messaging.Consumer)),
		),
	),
	fx.Invoke(RunConsumer),
)
ConsumerModule provides the Kafka consumer module to the application.
Functions ¶
func RunConsumer ¶
func RunConsumer(p ConsumerParams, consumer *KafkaConsumer)
RunConsumer starts the Kafka consumer with lifecycle management.
Types ¶
type Config ¶
type Config struct {
	Brokers  []string      `mapstructure:"brokers"`
	Topic    string        `mapstructure:"topic"`
	GroupID  string        `mapstructure:"group_id"`
	MaxWait  time.Duration `mapstructure:"max_wait"`
	MinBytes int           `mapstructure:"min_bytes"`
	MaxBytes int           `mapstructure:"max_bytes"`
}
Config holds the Kafka consumer configuration.
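The mapstructure tags suggest that the configuration is decoded from keys named brokers, topic, group_id, max_wait, min_bytes, and max_bytes. The documentation does not state which fields are required or what NewConfig uses as defaults, so the checks below are assumptions rather than the package's behavior. This minimal sketch mirrors the struct locally and adds a hypothetical validate helper that an application might run before starting the consumer.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Config mirrors the documented fields so the sketch compiles on its own.
type Config struct {
	Brokers  []string      `mapstructure:"brokers"`
	Topic    string        `mapstructure:"topic"`
	GroupID  string        `mapstructure:"group_id"`
	MaxWait  time.Duration `mapstructure:"max_wait"`
	MinBytes int           `mapstructure:"min_bytes"`
	MaxBytes int           `mapstructure:"max_bytes"`
}

// validate is a hypothetical helper, not part of the kafka package.
// The rules it enforces are assumptions about sensible values.
func validate(c Config) error {
	if len(c.Brokers) == 0 {
		return errors.New("kafka: at least one broker is required")
	}
	if c.Topic == "" {
		return errors.New("kafka: topic is required")
	}
	if c.MinBytes < 0 || c.MaxBytes < 0 {
		return errors.New("kafka: min_bytes and max_bytes must not be negative")
	}
	// Only compare the two when an upper bound has been set.
	if c.MaxBytes > 0 && c.MinBytes > c.MaxBytes {
		return fmt.Errorf("kafka: min_bytes (%d) exceeds max_bytes (%d)", c.MinBytes, c.MaxBytes)
	}
	return nil
}

func main() {
	// Example values only; broker address, topic, and group are placeholders.
	cfg := Config{
		Brokers:  []string{"localhost:9092"},
		Topic:    "events",
		GroupID:  "example-group",
		MaxWait:  500 * time.Millisecond,
		MinBytes: 1,
		MaxBytes: 10 << 20, // 10 MiB
	}

	if err := validate(cfg); err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Println("config ok for topic", cfg.Topic)
}
```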
type ConsumerParams ¶
type ConsumerParams struct {
	fx.In

	Lifecycle fx.Lifecycle
	Logger    log.Logger
	Config    *Config
	Handler   messaging.Handler
}
ConsumerParams contains all dependencies needed to run the Kafka consumer.
type KafkaConsumer ¶
type KafkaConsumer struct {
	// contains filtered or unexported fields
}
KafkaConsumer implements the messaging.Consumer interface using Kafka.
func NewKafkaConsumer ¶
NewKafkaConsumer creates a new Kafka consumer.