Documentation ¶
Overview ¶
Package xkafka provides consumers & producers to work efficiently with Kafka. `Message` is the core data structure for xkafka. `xkafka` comes with implementations that support:
- HTTP-like handler for message processing
- Concurrent message consumer
- Batch message consumer with retries
- Middleware support for consumer & producer
## Consumer
The processing mode is determined by the xkafka.Concurrency option. By default, the consumer is initialized with `enable.auto.offset.store=false`. The offset is "stored" after the message is processed. The offset is "committed" based on the `auto.commit.interval.ms` option.
It is important to understand the difference between "store" and "commit". The offset is "stored" in the consumer's memory and is "committed" to Kafka. The offset is "stored" after the message is processed and the `message.Status` is Success or Skip. The stored offsets will be automatically committed, unless the `ManualCommit` option is enabled.
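The difference between "store" and "commit" can be illustrated with a small, self-contained sketch. It does not use xkafka or a Kafka client: `offsetTracker`, `store`, and `commit` are hypothetical names, statuses are modelled as plain strings, and the "commit" only copies a value in memory. It assumes the usual Kafka convention that the offset to commit is the last processed offset plus one.

```go
package main

import "fmt"

// offsetTracker is a hypothetical model of one partition's offsets.
// It is not part of xkafka.
type offsetTracker struct {
	stored    int64 // next offset to commit, held only in consumer memory
	committed int64 // offset last sent to Kafka (simulated here)
}

// store records progress locally after a message has been processed.
// Only messages marked as success or skip advance the stored offset.
func (t *offsetTracker) store(offset int64, status string) {
	if status != "SUCCESS" && status != "SKIP" {
		return
	}
	if offset+1 > t.stored {
		t.stored = offset + 1
	}
}

// commit publishes the stored offset. A real consumer would do this
// periodically (auto commit) or after every message (manual commit).
func (t *offsetTracker) commit() {
	t.committed = t.stored
}

func main() {
	tr := &offsetTracker{}
	tr.store(0, "SUCCESS")
	tr.store(1, "SKIP")
	tr.store(2, "FAIL") // does not advance the stored offset

	fmt.Println("stored:", tr.stored, "committed:", tr.committed)
	tr.commit()
	fmt.Println("stored:", tr.stored, "committed:", tr.committed)
}
```

Until `commit` runs, a restart of this toy consumer would resume from the old committed offset, which is why stored-but-uncommitted messages can be redelivered.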
### Error Handling
By default, xkafka.Consumer will stop processing, commit last stored offset, and exit if there is a Kafka error or if the handler returns an error.
Errors can be handled by using one or more of the following options:
- Within the handler implementation
- Using error handling & retry middlewares
- Through the catch-all xkafka.ErrorHandler option
xkafka.ErrorHandler is called for every error that is not handled by the handler or the middlewares. It is also called for errors returned by the underlying Kafka client.
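As a rough illustration of the catch-all behaviour, the sketch below models an error handler that either swallows an error (processing continues) or passes it through (processing stops). It is self-contained and does not import xkafka: `ErrorHandler` is redeclared locally with the documented shape `func(err error) error`, while `consume`, `failOn`, `ignoreTransient`, and `errTransient` are hypothetical helpers invented for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrorHandler mirrors the documented shape func(err error) error.
// It is redeclared locally so the sketch compiles without xkafka.
type ErrorHandler func(err error) error

// errTransient is a hypothetical error used only by this sketch.
var errTransient = errors.New("transient failure")

// noop passes every error through, which stops processing.
func noop(err error) error { return err }

// ignoreTransient swallows errTransient and passes everything else through.
func ignoreTransient(err error) error {
	if errors.Is(err, errTransient) {
		return nil
	}
	return err
}

// consume calls handle for each value and routes failures through onErr.
// It returns the number of values handled before it stopped.
func consume(values []string, handle func(string) error, onErr ErrorHandler) (int, error) {
	for i, v := range values {
		if err := handle(v); err != nil {
			if err = onErr(err); err != nil {
				return i, err
			}
		}
	}
	return len(values), nil
}

// failOn returns a handler that fails with errTransient for one value.
func failOn(bad string) func(string) error {
	return func(v string) error {
		if v == bad {
			return fmt.Errorf("handling %q: %w", v, errTransient)
		}
		return nil
	}
}

func main() {
	values := []string{"a", "b", "c"}

	n, err := consume(values, failOn("b"), noop)
	fmt.Println(n, err) // stops at "b"

	n, err = consume(values, failOn("b"), ignoreTransient)
	fmt.Println(n, err) // continues past "b"
}
```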
### Sequential Processing
Sequential processing is the default mode. It is the same as xkafka.Concurrency(1).
### Async Processing
Async processing is enabled by setting xkafka.Concurrency to a value greater than 1. The consumer will use a pool of goroutines to process messages concurrently. Offsets are stored and committed in the order that the messages are received.
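One way to process messages concurrently while still recording offsets in arrival order is sketched below. This is not xkafka's implementation, only a minimal model of the idea: `job` and `processInOrder` are hypothetical names, offsets are plain integers, and "storing" an offset is represented by appending it to a slice. The sketch assumes `workers` is at least 1.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// job pairs an offset with a channel that is closed once it is handled.
type job struct {
	offset int64
	done   chan struct{}
}

// processInOrder handles offsets with a pool of workers, but records
// completions strictly in the order the offsets were received.
func processInOrder(offsets []int64, workers int, handle func(int64)) []int64 {
	work := make(chan *job)
	// pending remembers arrival order; it is large enough to never block.
	pending := make(chan *job, len(offsets))

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range work {
				handle(j.offset)
				close(j.done)
			}
		}()
	}

	for _, o := range offsets {
		j := &job{offset: o, done: make(chan struct{})}
		pending <- j
		work <- j
	}
	close(work)
	close(pending)

	// Wait for each job in arrival order, regardless of finish order.
	stored := make([]int64, 0, len(offsets))
	for j := range pending {
		<-j.done
		stored = append(stored, j.offset)
	}
	wg.Wait()
	return stored
}

func main() {
	offsets := []int64{0, 1, 2, 3, 4, 5}
	// Earlier offsets sleep longer, so they tend to finish last.
	slowFirst := func(o int64) {
		time.Sleep(time.Duration(6-o) * time.Millisecond)
	}
	fmt.Println(processInOrder(offsets, 3, slowFirst)) // [0 1 2 3 4 5]
}
```

The trade-off in this design is that one slow message delays the recording of every later offset, even if those later messages have already finished.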
### Manual Commit
By default, the consumer will automatically commit the offset based on the `auto.commit.interval.ms` option, asynchronously in the background.
The consumer can be configured to commit the offset manually by setting the `xkafka.ManualCommit` option to true. When ManualCommit is enabled, the consumer will synchronously commit the offset after each message is processed.
NOTE: Enabling ManualCommit will add an overhead to each message. It is recommended to use ManualCommit only when necessary.
Index ¶
- Variables
- func NoopErrorHandler(err error) error
- type AckFunc
- type Batch
- type BatchConsumer
- type BatchHandler
- type BatchHandlerFunc
- type BatchMiddlewareFunc
- type BatchMiddlewarer
- type BatchSize
- type BatchTimeout
- type Brokers
- type Concurrency
- type ConfigMap
- type Consumer
- type ConsumerOption
- type DeliveryCallback
- type ErrorHandler
- type Handler
- type HandlerFunc
- type ManualCommit
- type Message
- func (m *Message) AckFail(err error) bool
- func (m *Message) AckSkip() bool
- func (m *Message) AckSuccess() bool
- func (m *Message) AddCallback(fn AckFunc)
- func (m *Message) Err() error
- func (m *Message) Header(key string) []byte
- func (m *Message) Headers() map[string][]byte
- func (m *Message) SetHeader(key string, value []byte)
- type Metadata
- type MetadataTimeout
- type MiddlewareFunc
- type Middlewarer
- type PollTimeout
- type Producer
- func (p *Producer) AsyncPublish(ctx context.Context, msg *Message) error
- func (p *Producer) Close()
- func (p *Producer) Publish(ctx context.Context, msg *Message) error
- func (p *Producer) Run(ctx context.Context) error
- func (p *Producer) Start(ctx context.Context) error
- func (p *Producer) Use(mwf ...MiddlewareFunc)
- type ProducerOption
- type ShutdownTimeout
- type Status
- type Topics
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrRetryable is the error message for retryable errors.
	ErrRetryable = errors.New("xkafka: retryable error")
	// ErrRequiredOption is returned when a required option is
	// not provided.
	ErrRequiredOption = errors.New("xkafka: required option not provided")
)
Functions ¶
func NoopErrorHandler ¶
NoopErrorHandler is an ErrorHandler that passes the error through.
Types ¶
type Batch ¶ added in v0.10.0
type Batch struct {
	ID       string
	Messages []*Message
	Status   Status
	// contains filtered or unexported fields
}
Batch is a group of messages that are processed together.
func (*Batch) AckSkip ¶ added in v0.10.0
func (b *Batch) AckSkip()
AckSkip marks the batch as skipped.
func (*Batch) AckSuccess ¶ added in v0.10.0
func (b *Batch) AckSuccess()
AckSuccess marks the batch as successfully processed.
func (*Batch) GroupMaxOffset ¶ added in v0.10.0
func (b *Batch) GroupMaxOffset() []kafka.TopicPartition
GroupMaxOffset returns the maximum offset for each topic-partition in the batch.
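The grouping can be pictured with the following self-contained sketch. It uses local stand-in types (`msg`, `tp`) and a hypothetical `maxOffsets` function rather than xkafka's `Batch` or `kafka.TopicPartition`, and it returns the plain maximum offset seen per topic-partition. Whether the library's own method adjusts the value (for example, adding one before committing) is not stated here, so that detail is left out.

```go
package main

import "fmt"

// msg is a local stand-in for a consumed message.
type msg struct {
	topic     string
	partition int32
	offset    int64
}

// tp identifies a topic-partition and is usable as a map key.
type tp struct {
	topic     string
	partition int32
}

// maxOffsets returns the highest offset seen for each topic-partition.
func maxOffsets(msgs []msg) map[tp]int64 {
	out := make(map[tp]int64)
	for _, m := range msgs {
		k := tp{topic: m.topic, partition: m.partition}
		if cur, ok := out[k]; !ok || m.offset > cur {
			out[k] = m.offset
		}
	}
	return out
}

func main() {
	batch := []msg{
		{topic: "orders", partition: 0, offset: 5},
		{topic: "orders", partition: 0, offset: 9},
		{topic: "orders", partition: 1, offset: 2},
		{topic: "events", partition: 0, offset: 7},
		{topic: "orders", partition: 0, offset: 3},
	}
	for k, v := range maxOffsets(batch) {
		fmt.Printf("%s[%d] -> %d\n", k.topic, k.partition, v)
	}
}
```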
type BatchConsumer ¶ added in v0.10.0
type BatchConsumer struct {
// contains filtered or unexported fields
}
BatchConsumer manages the consumption of messages from kafka topics and processes them in batches.
func NewBatchConsumer ¶ added in v0.10.0
func NewBatchConsumer(name string, handler BatchHandler, opts ...ConsumerOption) (*BatchConsumer, error)
NewBatchConsumer creates a new BatchConsumer instance.
func (*BatchConsumer) Run ¶ added in v0.10.0
func (c *BatchConsumer) Run(ctx context.Context) (err error)
Run starts running the BatchConsumer. The component will stop running when the context is closed. Run blocks until the context is closed or an error occurs.
func (*BatchConsumer) Use ¶ added in v0.10.0
func (c *BatchConsumer) Use(mwf ...BatchMiddlewarer)
Use appends one or more BatchMiddlewarer values to the chain.
type BatchHandler ¶ added in v0.10.0
BatchHandler defines a handler for a batch of messages.
type BatchHandlerFunc ¶ added in v0.10.0
BatchHandlerFunc defines a function for handling a batch.
func (BatchHandlerFunc) HandleBatch ¶ added in v0.10.0
func (h BatchHandlerFunc) HandleBatch(ctx context.Context, b *Batch) error
HandleBatch implements BatchHandler interface.
type BatchMiddlewareFunc ¶ added in v0.10.0
type BatchMiddlewareFunc func(BatchHandler) BatchHandler
BatchMiddlewareFunc defines a function for batch middleware.
func (BatchMiddlewareFunc) BatchMiddleware ¶ added in v0.10.0
func (mw BatchMiddlewareFunc) BatchMiddleware(handler BatchHandler) BatchHandler
BatchMiddleware implements BatchMiddlewarer interface.
type BatchMiddlewarer ¶ added in v0.10.0
type BatchMiddlewarer interface {
BatchMiddleware(handler BatchHandler) BatchHandler
}
BatchMiddlewarer is an interface for batch message handler middleware.
type BatchSize ¶ added in v0.10.0
type BatchSize int
BatchSize defines the maximum number of messages in a batch.
type BatchTimeout ¶ added in v0.10.0
BatchTimeout defines the maximum time to wait for a batch to be filled.
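How a size limit and a timeout might interact can be sketched as follows. This is an illustrative model, not the BatchConsumer's implementation: `collect` and `demoBatches` are hypothetical names, values are plain integers read from a channel, and the timer here restarts after every flush, which is an assumption about the semantics rather than documented behaviour.

```go
package main

import (
	"fmt"
	"time"
)

// collect reads values from in and groups them into batches. A batch is
// flushed when it holds size values or when the timer fires, whichever
// comes first. The timer restarts after every flush. collect returns
// once in is closed.
func collect(in <-chan int, size int, timeout time.Duration) [][]int {
	var batches [][]int
	batch := make([]int, 0, size)

	flush := func() {
		if len(batch) == 0 {
			return
		}
		batches = append(batches, batch)
		batch = make([]int, 0, size)
	}

	timer := time.NewTimer(timeout)
	defer timer.Stop()

	for {
		select {
		case v, ok := <-in:
			if !ok {
				flush() // emit the final partial batch
				return batches
			}
			batch = append(batch, v)
			if len(batch) >= size {
				flush()
				// Stop and drain before Reset so no stale tick remains.
				if !timer.Stop() {
					select {
					case <-timer.C:
					default:
					}
				}
				timer.Reset(timeout)
			}
		case <-timer.C:
			flush() // timeout: emit whatever has accumulated
			timer.Reset(timeout)
		}
	}
}

// demoBatches feeds five values through collect with a batch size of two.
func demoBatches() [][]int {
	in := make(chan int, 5)
	for i := 1; i <= 5; i++ {
		in <- i
	}
	close(in)
	return collect(in, 2, time.Minute)
}

func main() {
	fmt.Println(demoBatches()) // [[1 2] [3 4] [5]]
}
```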
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer manages the consumption of messages from kafka topics and the processing of those messages.
Example ¶
handler := HandlerFunc(func(ctx context.Context, msg *Message) error {
	// do something with the message
	// acknowledge the message with success, skip or error
	msg.AckSuccess()
	return nil
})
ignoreError := func(err error) error {
	// ignore error
	return nil
}
consumer, err := NewConsumer("consumer-id", handler,
	Concurrency(10), // default is 1. values > 1 enable async processing
	Topics{"test"},
	Brokers{"localhost:9092"},
	// default behavior is to stop the consumer. this option allows customizing the error handling
	ErrorHandler(ignoreError),
	// custom configuration for the underlying kafka consumer
	ConfigMap{
		"auto.commit.interval.ms": 1000,
	},
	// default behavior is to commit messages automatically.
	// this option triggers manual commit after each message is processed.
	ManualCommit(true),
)
if err != nil {
	panic(err)
}
consumer.Use(
	// middleware to log messages
	MiddlewareFunc(func(next Handler) Handler {
		return HandlerFunc(func(ctx context.Context, msg *Message) error {
			// log the message
			return next.Handle(ctx, msg)
		})
	}),
)
if err := consumer.Run(context.Background()); err != nil {
	panic(err)
}
consumer.Close()
func NewConsumer ¶
func NewConsumer(name string, handler Handler, opts ...ConsumerOption) (*Consumer, error)
NewConsumer creates a new Consumer instance.
func (*Consumer) GetMetadata ¶
GetMetadata returns the metadata for the consumer.
func (*Consumer) Run ¶
Run starts running the Consumer. The component will stop running when the context is closed. Run blocks until the context is closed or an error occurs.
func (*Consumer) Start ¶
Start subscribes to the configured topics and starts consuming messages. It runs the handler for each message in a separate goroutine.
This method is non-blocking and returns immediately after subscribing. Use Run instead if you want to block until the context is closed or an error occurs.
Errors are handled by the ErrorHandler if set, otherwise they stop the consumer and are returned.
func (*Consumer) Use ¶
func (c *Consumer) Use(mwf ...MiddlewareFunc)
Use appends a MiddlewareFunc to the chain. Middleware can be used to intercept or otherwise modify, process or skip messages. They are executed in the order that they are applied to the Consumer.
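The ordering can be made concrete with a small sketch. To stay self-contained it redeclares minimal local versions of `Message`, `Handler`, `HandlerFunc`, and `MiddlewareFunc` that follow the signatures shown on this page. The `chain`, `tag`, and `runTrace` helpers are hypothetical, and `chain` is only one plausible way to obtain "first applied runs first"; it is not taken from the library's source.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins that follow the documented signatures.
type Message struct{ Value []byte }

type Handler interface {
	Handle(ctx context.Context, msg *Message) error
}

type HandlerFunc func(ctx context.Context, msg *Message) error

func (f HandlerFunc) Handle(ctx context.Context, msg *Message) error {
	return f(ctx, msg)
}

type MiddlewareFunc func(Handler) Handler

// chain wraps h so that mws[0] becomes the outermost layer and runs first.
func chain(h Handler, mws ...MiddlewareFunc) Handler {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// tag returns a middleware that records its name before calling next.
func tag(name string, trace *[]string) MiddlewareFunc {
	return func(next Handler) Handler {
		return HandlerFunc(func(ctx context.Context, msg *Message) error {
			*trace = append(*trace, name)
			return next.Handle(ctx, msg)
		})
	}
}

// runTrace builds a two-middleware chain and returns the call order.
func runTrace() []string {
	var trace []string
	final := HandlerFunc(func(ctx context.Context, msg *Message) error {
		trace = append(trace, "handler")
		return nil
	})
	h := chain(final, tag("first", &trace), tag("second", &trace))
	_ = h.Handle(context.Background(), &Message{})
	return trace
}

func main() {
	fmt.Println(runTrace()) // [first second handler]
}
```

A middleware that returns without calling `next.Handle` short-circuits the chain, which is how a message could be skipped before it reaches the handler.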
type ConsumerOption ¶ added in v0.8.0
type ConsumerOption interface {
// contains filtered or unexported methods
}
ConsumerOption is an interface for consumer options.
type DeliveryCallback ¶
type DeliveryCallback AckFunc
DeliveryCallback is a callback function triggered for every published message. Works only for xkafka.Producer.
type ErrorHandler ¶
ErrorHandler is a callback function that is called when an error occurs.
type HandlerFunc ¶
HandlerFunc defines a function for handling messages.
type ManualCommit ¶ added in v0.5.0
type ManualCommit bool
ManualCommit disables auto commit and calls `Commit` after every message is marked as `Success` or `Skip` by the handler.
Works only for xkafka.Consumer.
WARNING: Using this option will increase the message processing time, because of the synchronous `Commit` for every message.
type Message ¶
type Message struct {
	ID        string    `json:"id,omitempty"`
	Topic     string    `json:"topic,omitempty"`
	Partition int32     `json:"partition,omitempty"`
	Group     string    `json:"group,omitempty"`
	Key       []byte    `json:"key,omitempty"`
	Value     []byte    `json:"value,omitempty"`
	Timestamp time.Time `json:"timestamp,omitempty"`
	Status    Status    `json:"status,omitempty"`
	ErrMsg    string    `json:"err_msg,omitempty"`
	Offset    int64     `json:"offset,omitempty"`
	// contains filtered or unexported fields
}
Message holds the Kafka message data and manages the lifecycle of the message.
func (*Message) AckFail ¶
AckFail marks the message as failed and stores the error. The error overrides any existing ack status.
func (*Message) AckSuccess ¶
AckSuccess marks the message as successfully processed. Overrides any existing ack status.
func (*Message) AddCallback ¶
AddCallback adds the callback func to the call stack.
func (*Message) Err ¶
Err returns the underlying error that caused the message to fail. DESIGN: Intentionally called Err to avoid confusion with Error().
func (*Message) Header ¶
Header returns the value for the given key of the header field of the message.
type MetadataTimeout ¶
MetadataTimeout defines the timeout for the consumer metadata request.
type MiddlewareFunc ¶
MiddlewareFunc defines a function for middleware.
func (MiddlewareFunc) Middleware ¶
func (mw MiddlewareFunc) Middleware(handler Handler) Handler
Middleware implements Middlewarer interface.
type Middlewarer ¶ added in v0.10.0
Middlewarer is an interface for message handler middleware.
type PollTimeout ¶
PollTimeout defines the read (poll) timeout for the consumer.
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer manages the production of messages to kafka topics. It provides both synchronous and asynchronous publish methods and a channel to stream delivery events.
Example ¶
ctx, cancel := context.WithCancel(context.Background())
producer, err := NewProducer(
	"producer-id",
	Brokers{"localhost:9092"},
	ConfigMap{
		"socket.keepalive.enable": true,
	},
)
if err != nil {
	panic(err)
}
producer.Use(
	// middleware to log messages
	MiddlewareFunc(func(next Handler) Handler {
		return HandlerFunc(func(ctx context.Context, msg *Message) error {
			// log the message
			return next.Handle(ctx, msg)
		})
	}),
)
go func() {
	err := producer.Run(ctx)
	if err != nil {
		panic(err)
	}
}()
msg := &Message{
	Topic: "test",
	Key:   []byte("key"),
	Value: []byte("value"),
}
if err := producer.Publish(ctx, msg); err != nil {
	panic(err)
}
// cancel the context to stop the producer
cancel()
func NewProducer ¶
func NewProducer(name string, opts ...ProducerOption) (*Producer, error)
NewProducer creates a new Producer.
func (*Producer) AsyncPublish ¶
AsyncPublish sends messages to the kafka topic asynchronously.
Example ¶
ctx, cancel := context.WithCancel(context.Background())
// default callback function called after each message
// handled by the producer
callback := func(msg *Message) {
	// do something with the message
}
producer, err := NewProducer(
	"producer-id",
	Brokers{"localhost:9092"},
	ConfigMap{
		"socket.keepalive.enable": true,
	},
	DeliveryCallback(callback),
)
if err != nil {
	panic(err)
}
go func() {
	err := producer.Run(ctx)
	if err != nil {
		panic(err)
	}
}()
msg := &Message{
	Topic: "test",
	Key:   []byte("key"),
	Value: []byte("value"),
}
// each message can have its own callback function
// in addition to the default callback function
msg.AddCallback(func(m *Message) {
	// do something with the message
})
err = producer.AsyncPublish(ctx, msg)
if err != nil {
	panic(err)
}
// cancel the context to stop the producer
cancel()
func (*Producer) Close ¶
func (p *Producer) Close()
Close waits for all messages to be delivered and closes the producer.
func (*Producer) Publish ¶
Publish sends messages to the kafka topic synchronously. Returns error if the message cannot be enqueued or if there's a Kafka error.
func (*Producer) Start ¶
Start starts the kafka event handling. It blocks until the context is cancelled.
func (*Producer) Use ¶
func (p *Producer) Use(mwf ...MiddlewareFunc)
Use appends a MiddlewareFunc to the chain. Middleware can be used to intercept or otherwise modify, process or skip messages. They are executed in the order that they are applied to the Producer.
type ProducerOption ¶ added in v0.8.0
type ProducerOption interface {
// contains filtered or unexported methods
}
ProducerOption is an interface for producer options.
type ShutdownTimeout ¶
ShutdownTimeout defines the timeout for the consumer/producer to shut down.