Documentation
¶
Index ¶
- func NewAsyncProducer(log logger.Logger, brokers []string) *producer
- func NewAsyncProducerWithCallback(log logger.Logger, brokers []string, cb AsyncWriterCallback) *producer
- func NewAsyncWriter(brokers []string, errLogger kafka.Logger, log logger.Logger) *kafka.Writer
- func NewAsyncWriterWithCallback(brokers []string, errLogger kafka.Logger, log logger.Logger, ...) *kafka.Writer
- func NewConsumerGroup(brokers []string, groupID string, log logger.Logger) *consumerGroup
- func NewKafkaConn(ctx context.Context, kafkaCfg *Config) (*kafka.Conn, error)
- func NewKafkaReader(kafkaURL []string, topic, groupID string, errLogger kafka.Logger) *kafka.Reader
- func NewProducer(log logger.Logger, brokers []string) *producer
- func NewRequireNoneProducer(log logger.Logger, brokers []string) *producer
- func NewRequireNoneWriter(brokers []string, errLogger kafka.Logger, log logger.Logger) *kafka.Writer
- func NewWriter(brokers []string, errLogger kafka.Logger) *kafka.Writer
- type AsyncWriterCallback
- type Config
- type ConsumerGroup
- type MessageProcessor
- type Producer
- type Worker
- type WorkerErrGroup
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewAsyncProducer ¶
NewAsyncProducer creates a new Kafka producer.
func NewAsyncProducerWithCallback ¶
func NewAsyncProducerWithCallback(log logger.Logger, brokers []string, cb AsyncWriterCallback) *producer
NewAsyncProducerWithCallback creates a new Kafka producer with a callback for deleting an invalid projection.
func NewAsyncWriter ¶
NewAsyncWriter creates a new configured Kafka async writer.
func NewAsyncWriterWithCallback ¶
func NewAsyncWriterWithCallback(brokers []string, errLogger kafka.Logger, log logger.Logger, cb AsyncWriterCallback) *kafka.Writer
NewAsyncWriterWithCallback creates a new configured Kafka async writer.
func NewConsumerGroup ¶
NewConsumerGroup is the Kafka consumer group constructor.
func NewKafkaConn ¶
NewKafkaConn creates a new Kafka connection.
func NewKafkaReader ¶
NewKafkaReader creates a new configured Kafka reader.
func NewProducer ¶
NewProducer creates a new Kafka producer.
func NewRequireNoneProducer ¶
NewRequireNoneProducer creates a new fire-and-forget Kafka producer.
Types ¶
type AsyncWriterCallback ¶
type Config ¶
type Config struct {
Brokers []string `mapstructure:"brokers" validate:"required"`
GroupID string `mapstructure:"groupID" validate:"required,gte=0"`
InitTopics bool `mapstructure:"initTopics"`
}
Config holds the Kafka configuration.
type ConsumerGroup ¶
type ConsumerGroup interface {
ConsumeTopic(ctx context.Context, groupTopics []string, poolSize int, worker Worker)
ConsumeTopicWithErrGroup(ctx context.Context, groupTopics []string, poolSize int, worker WorkerErrGroup) error
GetNewKafkaReader(kafkaURL []string, groupTopics []string, groupID string) *kafka.Reader
GetNewKafkaWriter() *kafka.Writer
}
type MessageProcessor ¶
type MessageProcessor interface {
ProcessMessages(ctx context.Context, r *kafka.Reader, wg *sync.WaitGroup, workerID int)
ProcessMessagesWithErrGroup(ctx context.Context, r *kafka.Reader, workerID int)
}
MessageProcessor defines the processor methods, which must implement the kafka.Worker func interface.
Click to show internal directories.
Click to hide internal directories.