Documentation ¶
Overview ¶
Package kafka provides an async producer.
Package kafka provides a normal (simple) consumer. ¶
Package kafka provides a consumer group. ¶
Package kafka provides a sync producer.
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var (
// DefaultAsyncProducerBufferSize is the default buffer size for the async producer.
DefaultAsyncProducerBufferSize = 1
)
View Source
var (
// DefaultSyncProducerBufferSize is the default buffer size for the sync producer.
DefaultSyncProducerBufferSize = 1
)
Functions ¶
This section is empty.
Types ¶
type AsyncProducer ¶
type AsyncProducer struct {
// contains filtered or unexported fields
}
AsyncProducer is an async producer.
func NewAsyncProducer ¶
func NewAsyncProducer(brokers []string, bufferSize int, cfg *sarama.Config) (ap *AsyncProducer, err error)
NewAsyncProducer creates an async producer instance.
func (*AsyncProducer) Send ¶
func (ap *AsyncProducer) Send(msg *sarama.ProducerMessage)
Send sends msg using the async producer.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is a simple consumer.
func NewConsumer ¶
func NewConsumer(brokers, topics []string, groupID string, config *cluster.Config) (*Consumer, error)
NewConsumer creates a consumer instance.
func (*Consumer) StartConsumer ¶
func (c *Consumer) StartConsumer(fn func(*sarama.ConsumerMessage) error)
StartConsumer should be started with the go keyword, i.e. in its own goroutine.
type ConsumerGroup ¶
type ConsumerGroup struct {
// contains filtered or unexported fields
}
ConsumerGroup is a consumer group.
func NewConsumerGroup ¶
func NewConsumerGroup(brokers, topics []string, groupID string, config *sarama.Config) (*ConsumerGroup, error)
NewConsumerGroup creates a consumer group instance.
func (*ConsumerGroup) StartConsumer ¶
func (c *ConsumerGroup) StartConsumer(ctx context.Context, handler sarama.ConsumerGroupHandler) error
StartConsumer should be started with the go keyword, i.e. in its own goroutine.
type SyncProducer ¶
type SyncProducer struct {
// contains filtered or unexported fields
}
SyncProducer is a sync producer.
func NewSyncProducer ¶
func NewSyncProducer(brokers []string, bufferSize int, cfg *sarama.Config) (sp *SyncProducer, err error)
NewSyncProducer creates a sync producer instance.
func (*SyncProducer) Send ¶
func (sp *SyncProducer) Send(msg *sarama.ProducerMessage)
Send sends msg using the sync producer.
Click to show internal directories.
Click to hide internal directories.