Documentation ¶
Index ¶
Constants ¶
const (
	Version_2_1_1 Version = "2.1.1"

	Sticky     BalanceStrategy = "sticky"
	RoundRobin BalanceStrategy = "roundrobin"
	Range      BalanceStrategy = "range"

	Newest Offset = "newest"
	Oldest Offset = "oldest"
)
Variables ¶
This section is empty.
Functions ¶
func NewSaramaLogger ¶
func NewSaramaLogger(logger Logger) *customSaramaLogger
Types ¶
type BalanceStrategy ¶
type BalanceStrategy string
BalanceStrategy is the consumer group balancing strategy.
type Config ¶
type Config struct {
Brokers []string
Group string
Version Version
Topics []string
BalanceStrategy BalanceStrategy
Offset Offset
DebugLogger Logger
EncoderBuilder EncoderBuilder
Decoder Decoder
ConsumerCallback ConsumerCallback
ConsumerErrorHandler ConsumerErrorHandler
}
Config holds the general configuration.
type ConsumerCallback ¶
type ConsumerCallback func(key, value interface{}) (err error)
ConsumerCallback is the handler function for the consumer.
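As a minimal sketch, a callback matching this signature might type-assert the decoded key and value before using them. The type is redeclared locally so the snippet compiles on its own. `printMessage` and `errUnexpectedType` are hypothetical names, and the assumption that both key and value arrive as strings holds only when a string decoder is configured.

```go
package main

import (
	"errors"
	"fmt"
)

// ConsumerCallback mirrors the signature documented above; it is redeclared
// here only so that the sketch compiles without the package itself.
type ConsumerCallback func(key, value interface{}) (err error)

// errUnexpectedType is a hypothetical sentinel error for this sketch.
var errUnexpectedType = errors.New("unexpected key or value type")

// printMessage is a hypothetical callback. It assumes string payloads and
// returns an error for anything else, so the error handler can react.
func printMessage(key, value interface{}) error {
	k, ok := key.(string)
	if !ok {
		return errUnexpectedType
	}
	v, ok := value.(string)
	if !ok {
		return errUnexpectedType
	}
	fmt.Printf("key=%s value=%s\n", k, v)
	return nil
}

func main() {
	var cb ConsumerCallback = printMessage
	if err := cb("user-1", "hello"); err != nil {
		fmt.Println("callback failed:", err)
	}
}
```

Returning an error instead of panicking keeps the decision about retries or logging with whatever error handler is configured.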
type ConsumerErrorHandler ¶
ConsumerErrorHandler handles the error returned from the ConsumerCallback.
type ConsumerGroup ¶
type ConsumerGroup interface {
// Init initializes the consumer group.
Init() (err error)
// Run starts the consumer group.
Run() (err error)
}
ConsumerGroup is the interface for managing a consumer group.
func NewConsumerGroup ¶
func NewConsumerGroup(config Config) ConsumerGroup
type ConsumerGroupHandler ¶
type ConsumerGroupHandler struct {
// contains filtered or unexported fields
}
ConsumerGroupHandler is an implementation of sarama.ConsumerGroupHandler.
func (*ConsumerGroupHandler) Cleanup ¶
func (c *ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error
Cleanup cleans up the consumer group session.
func (*ConsumerGroupHandler) ConsumeClaim ¶
func (c *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim decodes messages and calls the configured consumer callback function.
func (*ConsumerGroupHandler) Setup ¶
func (c *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error
Setup sets up the consumer group session.
type Decoder ¶
type Decoder func(consumerMessage *sarama.ConsumerMessage) (key, value interface{}, err error)
Decoder decodes the consumer message according to the given implementation.
func GetAvroDecoder ¶
func GetAvroDecoder(ss avro.SchemaStore) Decoder
GetAvroDecoder creates an Avro decoder with the given avro.SchemaStore.
func GetDefaultDecoder ¶
func GetDefaultDecoder() Decoder
GetDefaultDecoder creates a default decoder (string decoder).
type EncoderBuilder ¶
type EncoderBuilder interface {
// Build builds the sarama.Encoder for the given subject and data.
Build(subject string, data interface{}) sarama.Encoder
}
EncoderBuilder is responsible for creating encoders.
func DefaultEncoderBuilder ¶
func DefaultEncoderBuilder() EncoderBuilder
DefaultEncoderBuilder creates an instance of the default EncoderBuilder.
func NewAvroEncoderBuilder ¶
func NewAvroEncoderBuilder(schemaStore avro.SchemaStore) EncoderBuilder
NewAvroEncoderBuilder creates an instance of EncoderBuilder that uses the given avro.SchemaStore.
type Logger ¶
type Logger interface {
Print(v ...interface{})
Printf(format string, v ...interface{})
Println(v ...interface{})
}
type Producer ¶
type Producer interface {
// Produce produces the Kafka message to the given topic.
Produce(topic string, schema string, key interface{}, value interface{}) (partition int32, offset int64, err error)
}
Producer is the Kafka message producer interface.
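Because Producer is an interface, code that publishes messages can be tested against an in-memory stand-in. The following is a sketch under stated assumptions: the interface is redeclared locally, `memoryProducer` and `record` are hypothetical types, and the stand-in simply reports partition 0 with an incrementing offset. How the real producer interprets `schema` is not described here, so the stub only stores it.

```go
package main

import "fmt"

// Producer mirrors the interface documented above; it is redeclared here
// only so that the sketch compiles without the package itself.
type Producer interface {
	Produce(topic string, schema string, key interface{}, value interface{}) (partition int32, offset int64, err error)
}

// record is a hypothetical container for one produced message.
type record struct {
	topic, schema string
	key, value    interface{}
}

// memoryProducer is a hypothetical in-memory stand-in. It pretends there is
// a single partition 0 and hands out offsets in insertion order.
type memoryProducer struct {
	records []record
}

func (m *memoryProducer) Produce(topic, schema string, key, value interface{}) (int32, int64, error) {
	if topic == "" {
		return 0, 0, fmt.Errorf("topic must not be empty")
	}
	m.records = append(m.records, record{topic, schema, key, value})
	return 0, int64(len(m.records) - 1), nil
}

func main() {
	var p Producer = &memoryProducer{}
	partition, offset, err := p.Produce("orders", "order-value", "id-1", "payload")
	fmt.Println(partition, offset, err)
}
```

Code under test can accept a `Producer` parameter and receive either this stub or the value returned by `NewProducer`.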
func NewProducer ¶
NewProducer creates a producer instance.
Directories ¶
| Path | Synopsis |
|---|---|
| avro | Package avro |
| example | |
| example/consumergroup (command) | Package main sample code for group consumer. |
| example/producer (command) | Package main sample code for producer |