Documentation
Index
- type Config
- type Kamux
- func (kamux *Kamux) Cleanup(sarama.ConsumerGroupSession) (err error)
- func (kamux *Kamux) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) (err error)
- func (kamux *Kamux) Launch() (err error)
- func (kamux *Kamux) Setup(sarama.ConsumerGroupSession) (err error)
- func (kamux *Kamux) Stop() error
- func (kamux *Kamux) StopWithError(err error) error
- type Logger
Types
type Config
type Config struct {
	// Brokers defines the list of Kafka brokers to connect to.
	Brokers []string
	// User is the Kafka username.
	User string
	// Password is the Kafka password.
	Password string
	// Topics are all the topics on which consumer groups listen.
	Topics []string
	// ConsumerGroup is the name of the consumer group to use.
	ConsumerGroup string
	// InitialOffset is the offset to use if no offset was previously committed.
	// Should be sarama.OffsetNewest or sarama.OffsetOldest. Defaults to sarama.OffsetNewest.
	InitialOffset int64
	// Handler is the function executed on each Kafka message.
	Handler func(*sarama.ConsumerMessage) error
	// ErrHandler is the function executed when Handler returns an error, in an attempt to rescue it.
	ErrHandler func(error, *sarama.ConsumerMessage) error
	// PreRun is the function executed before processing is launched.
	PreRun func(*Kamux) error
	// PostRun is the function executed when Kamux closes.
	PostRun func(*Kamux) error
	// StopOnError controls whether to stop processing on a handler error.
	StopOnError bool
	// MarkOffsets controls whether to mark offsets as each message is processed.
	MarkOffsets bool
	// Debug enables debug mode, which produces more verbose output.
	Debug bool
	// MessagesBufferSize is the buffer size of the messages that a worker can queue.
	MessagesBufferSize int
	// ForceKafkaVersion overrides the Kafka cluster version used by the sarama library.
	ForceKafkaVersion *sarama.KafkaVersion
	// Logger is used to print Kamux information. Go's standard log package is used by default.
	Logger Logger
}
A Config holds all the configuration of the Kamux type.
type Kamux
type Kamux struct {
	Config         *Config
	ConsumerConfig *sarama.Config
	// contains filtered or unexported fields
}
Kamux is the main object of the Kamux package.
func NewKamux
NewKamux is the constructor of the ConsumerProducer. It performs some configuration checks and prepares the Kafka connections for the upcoming launch of the process.
func (*Kamux) Cleanup
func (kamux *Kamux) Cleanup(sarama.ConsumerGroupSession) (err error)
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited but before the offsets are committed for the very last time.
func (*Kamux) ConsumeClaim
func (kamux *Kamux) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) (err error)
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). Once the Messages() channel is closed, the Handler must finish its processing loop and exit.
func (*Kamux) Launch
Launch begins processing the Kafka messages. It can be launched only once. It will:
- Connect to Kafka using the credentials provided in the configuration
- Listen to consumer group notifications (rebalance, ...)
- Listen to consumer errors, and stop properly if one occurs
- Listen to system SIGINT to stop properly
func (*Kamux) Setup
func (kamux *Kamux) Setup(sarama.ConsumerGroupSession) (err error)
Setup is run at the beginning of a new session, before ConsumeClaim.
func (*Kamux) StopWithError
StopWithError stops processing with the error passed as argument.
type Logger
type Logger interface {
	Printf(format string, args ...interface{})
	Println(args ...interface{})
	Fatalf(format string, args ...interface{})
	Fatal(args ...interface{})
	Panicf(format string, args ...interface{})
	Panic(args ...interface{})
}
A Logger is the interface used in this package for logging, so that any backend can be plugged in.