consumer

package
v0.0.58 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 7, 2024 License: MIT Imports: 14 Imported by: 0

Documentation

Overview

Package consumer provides a high level API for consuming messages from a Kafka topic.

Consumer

A single consumer is the most basic use case when consuming messages from a topic.

c := consumer.NewConsumer(dialer, config) err := c.Run(ctx, handler)

Consumer Group

Multiple consumers can be created as Group to consume messages at a higher rate.

g := consumer.Group(dialer, config) errCh := g.Run(ctx, handler)

for err := range errCh {
    if err != nil {
	       panic(err)
    }
}

The consumer count and for the group is specified in the config parameter, which determines the number of goroutines to spawn. Each goroutine has their own Consumer and is set up so that individual consumer errors are reported back via the error channel returned from Group.Run.

It is important to note that Kafka is ultimately responsible for managing group members. As a result, a consumer group can easily be spread across multiple instances by simply using the same group ID for each Group. Kafka will then take care of re-balancing the group if members are added/removed.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DialerSCRAM512

func DialerSCRAM512(username string, password string) (*kafka.Dialer, error)

DialerSCRAM512 returns a Kafka dialer configured with SASL authentication to securely transmit the provided credentials to Kafka using SCRAM-SHA-512.

func NonStopExponentialBackOff

func NonStopExponentialBackOff() backoff.BackOff

NonStopExponentialBackOff is the suggested backoff retry strategy for consumers groups handling messages where ordering matters e.g. a data-capture stream from a database. This results in endless retries to prevent Kafka from re-balancing the group so that each consumer does not eventually experience the same error.

Retry intervals: 500ms, 4s, 32s, 4m, 34m, 4.5h, 5h (max).

The max interval of 5 hours is intended to leave enough time for manual intervention if necessary.

Types

type Config

type Config struct {
	ID      string // Default: UUID
	Brokers []string
	Topic   string

	MinBytes      int           // Default: 1MB
	MaxBytes      int           // Default: 10MB
	MaxWait       time.Duration // Default: 250ms
	QueueCapacity int           // Default: 100

	DebugLogger DebugLogger
	// contains filtered or unexported fields
}

Config is a configuration object used to create a new Consumer.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer provides a high level API for consuming and handling messages from a Kafka topic.

It is worth noting that publishing failed messages to a dead letter queue is not supported and instead would need to be included in your handler implementation.

func NewConsumer

func NewConsumer(dialer *kafka.Dialer, config Config, opts ...Option) *Consumer

NewConsumer returns a new Consumer configured with the provided dialer and config.

func (*Consumer) Run

func (c *Consumer) Run(ctx context.Context, handler Handler) error

Run consumes and handles messages from the topic. The method call blocks until the context is canceled, the consumer is closed, or an error occurs.

func (*Consumer) Stop

func (c *Consumer) Stop() error

Stop stops the consumer. It waits for the current message (if any) to finish being handled before closing the reader stream, preventing the consumer from reading any more messages.

type DebugLogger

type DebugLogger interface {
	Print(msg string, keyvals ...any)
}

DebugLogger interface for consumer debug logging.

type Group

type Group struct {
	ID string
	// contains filtered or unexported fields
}

Group groups consumers together to concurrently consume and handle messages from a Kafka topic. Many groups with the same group ID are safe to use, which is particularly useful for groups across separate instances.

It is worth noting that publishing failed messages to a dead letter queue is not supported and instead would need to be included in your handler implementation.

func NewGroup

func NewGroup(dialer *kafka.Dialer, config GroupConfig, opts ...Option) *Group

NewGroup returns a new Group configured with the provided dialer and config.

func (*Group) Run

func (g *Group) Run(ctx context.Context, handler Handler) <-chan error

Run concurrently consumes and handles messages from the topic across all consumers in the group. The method call returns an error channel that is used to receive any consumer errors. The run process is only stopped if the context is canceled, the consumer has been closed, or all consumers in the group have errored.

func (*Group) Stop

func (g *Group) Stop()

Stop stops the group. It waits for the current message (if any) in each consumer to finish being handled before closing the reader streams, preventing each consumer from reading any more messages.

type GroupConfig

type GroupConfig struct {
	Count   int
	Brokers []string
	Topic   string
	GroupID string

	MinBytes      int           // Default: 1MB
	MaxBytes      int           // Default: 10MB
	MaxWait       time.Duration // Default: 250ms
	QueueCapacity int           // Default: 100

	DebugLogger DebugLogger
}

GroupConfig is a configuration object used to create a new Group. The default consumer count in a group is 1 unless specified otherwise.

type Handler

type Handler func(ctx context.Context, msg Message) error

Handler specifies how a consumer should handle a received Kafka message.

type HandlerRetryBackOffConstructor

type HandlerRetryBackOffConstructor func() backoff.BackOff

type LoggerFunc

type LoggerFunc func(msg string, keyvals ...any)

LoggerFunc is a bridge between DebugLogger and any third party logger. Usage:

  l := NewLogger() // some logger
  c consumer.GroupConfig{
      DebugLogger: consumer.LoggerFunc(func(msg string, keyvals ...any) {
			logger.Debug().Fields(keyvals).Msg(msg)
		}),
  }

func (LoggerFunc) Print

func (f LoggerFunc) Print(msg string, keyvals ...any)

type Message

type Message struct {
	kafka.Message
	Metadata
}

type Metadata

type Metadata struct {
	GroupID    string
	ConsumerID string
	Attempt    int
}

Metadata contains relevant handler metadata for received Kafka messages.

type NotifyError

type NotifyError func(ctx context.Context, err error, msg Message)

NotifyError is a notify-on-error function used to report consumer handler errors.

type Option

type Option func(consumer *Consumer)

func WithDataDogTracing

func WithDataDogTracing() Option

WithDataDogTracing adds Data Dog tracing to the consumer.

A span is started each time a Kafka message is read and finished when the offset is committed. The consumer span can also be retrieved from within your handler using tracer.SpanFromContext.

func WithExplicitCommit

func WithExplicitCommit() Option

WithExplicitCommit enables offset commit only after a message is successfully handled.

Do not use this option if the default behaviour of auto committing offsets on initial read (before handling the message) is required.

func WithGroupBalancers

func WithGroupBalancers(groupBalancers ...kafka.GroupBalancer) Option

WithGroupBalancers adds a priority-ordered list of client-side consumer group balancing strategies that will be offered to the coordinator. The first strategy that all group members support will be chosen by the leader.

Default: [Range, RoundRobin]

Only used by consumer group.

func WithHandlerBackOffRetry

func WithHandlerBackOffRetry(backOffConstructor HandlerRetryBackOffConstructor) Option

WithHandlerBackOffRetry adds a back off retry policy on the consumer handler.

func WithKafkaReader

func WithKafkaReader(readerFn func() Reader) Option

WithKafkaReader allows a custom reader to be injected into the Consumer/Group. Using this will ignore any other reader specific options passed in.

It is highly recommended to not use this option unless injecting a mock reader implementation for testing.

func WithNotifyError

func WithNotifyError(notify NotifyError) Option

WithNotifyError adds the NotifyError function to the consumer for it to be invoked on each consumer handler error.

func WithReaderErrorLogger

func WithReaderErrorLogger(logger kafka.LoggerFunc) Option

WithReaderErrorLogger specifies a logger used to report internal consumer reader errors.

func WithReaderLogger

func WithReaderLogger(logger kafka.LoggerFunc) Option

WithReaderLogger specifies a logger used to report internal consumer reader changes.

type Reader

type Reader interface {
	ReadMessage(ctx context.Context) (kafka.Message, error)
	FetchMessage(ctx context.Context) (kafka.Message, error)
	CommitMessages(ctx context.Context, msgs ...kafka.Message) error
	Close() error
}

Reader fetches and commits messages from a Kafka topic.

Directories

Path Synopsis
example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL