kafka

package
v2.2.0
Published: Apr 26, 2024 License: Apache-2.0 Imports: 31 Imported by: 0

Documentation

Overview

Package kafka abstracts the production and consumption of records to and from Kafka.

Index

Constants

This section is empty.

Variables

var (
	// ErrCommitFailed may be returned by `consumer.Run` when DeliveryType is
	// apmqueue.AtMostOnceDelivery.
	ErrCommitFailed = errors.New("kafka: failed to commit offsets")
)

Functions

This section is empty.

Types

type BatchWriteListener added in v2.1.3

type BatchWriteListener func(topic string, bytesWritten int)

BatchWriteListener specifies a callback function that is invoked after a batch is successfully produced to a Kafka broker. It is invoked with the corresponding topic and the number of bytes written to that topic (taking compression into account, when applicable).
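
As an illustration only (the counter, the logging, and the import path below are assumptions, not part of this package), a listener that tallies produced bytes could be wired in through ProducerConfig.BatchListener:

package example

import (
	"log"
	"sync/atomic"

	"github.com/elastic/apm-queue/v2/kafka" // assumed import path
)

// producedBytes is an illustrative process-wide counter.
var producedBytes int64

// BatchListener logs and tallies the (possibly compressed) bytes written
// per produced batch. Assign it to ProducerConfig.BatchListener.
var BatchListener kafka.BatchWriteListener = func(topic string, bytesWritten int) {
	atomic.AddInt64(&producedBytes, int64(bytesWritten))
	log.Printf("produced %d bytes to topic %q", bytesWritten, topic)
}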

func (BatchWriteListener) OnProduceBatchWritten added in v2.1.3

func (l BatchWriteListener) OnProduceBatchWritten(_ kgo.BrokerMetadata,
	topic string, _ int32, m kgo.ProduceBatchMetrics)

OnProduceBatchWritten implements the kgo.HookProduceBatchWritten interface.

type CommonConfig

type CommonConfig struct {
	// ConfigFile holds the path to an optional YAML configuration file,
	// which configures Brokers and SASL.
	//
	// If ConfigFile is unspecified, but $KAFKA_CONFIG_FILE is specified,
	// it will be used to populate ConfigFile. Either way, if a file is
	// specified, it must exist when a client is initially created.
	//
	// The following properties from
	// https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
	// are honoured:
	//
	//   - bootstrap.servers ($KAFKA_BROKERS)
	//   - sasl.mechanism ($KAFKA_SASL_MECHANISM)
	//   - sasl.username ($KAFKA_USERNAME)
	//   - sasl.password ($KAFKA_PASSWORD)
	//
	// If bootstrap.servers is defined, then it takes precedence over
	// CommonConfig.Brokers. When a connection to a broker fails, the
	// config file will be reloaded, and the seed brokers will be
	// updated if bootstrap.servers has changed.
	//
	// If sasl.mechanism is set to PLAIN, or if sasl.username is defined,
	// then SASL/PLAIN will be configured. Whenever a new connection is
	// created, the config will be reloaded in case the username or
	// password has been updated. If sasl.mechanism is set to AWS_MSK_IAM,
	// then SASL/AWS_MSK_IAM is configured using the AWS SDK. Dynamic
	// changes to the sasl.mechanism value are not supported.
	ConfigFile string

	// Namespace holds a namespace for Kafka topics.
	//
	// This is added as a prefix for topic names, and acts as a filter
	// on topics monitored or described by the manager.
	//
	// Namespace is always removed from topic names before they are
	// returned to callers. The only way Namespace will surface is in
	// telemetry (e.g. metrics), as an independent dimension. This
	// enables users to filter metrics by namespace, while maintaining
	// stable topic names.
	Namespace string

	// Brokers is the list of Kafka brokers used to seed the Kafka client.
	//
	// If Brokers is unspecified, but $KAFKA_BROKERS is specified, it will
	// be parsed as a comma-separated list of broker addresses and used.
	Brokers []string

	// ClientID to use when connecting to Kafka. This is used for logging
	// and client identification purposes.
	ClientID string

	// Version is the software version to use in the Kafka client. This is
	// useful since it shows up in Kafka metrics and logs.
	Version string

	// SASL configures the kgo.Client to use SASL authentication.
	//
	// If SASL is unspecified, then it may be derived from environment
	// variables as follows:
	//
	//  - if $KAFKA_SASL_MECHANISM is set to PLAIN, or if $KAFKA_USERNAME
	//    and $KAFKA_PASSWORD are both specified, then SASL/PLAIN will be
	//    configured
	//  - if $KAFKA_SASL_MECHANISM is set to AWS_MSK_IAM, then
	//    SASL/AWS_MSK_IAM will be configured using the AWS SDK
	SASL SASLMechanism

	// TLS configures the kgo.Client to use TLS for authentication.
	// This option conflicts with Dialer. Only one can be used.
	//
	// If neither TLS nor Dialer are specified, then TLS will be configured
	// by default unless the environment variable $KAFKA_PLAINTEXT is set to
	// "true". In case TLS is auto-configured, $KAFKA_TLS_INSECURE may be
	// set to "true" to disable server certificate and hostname verification.
	TLS *tls.Config

	// Dialer, if set, is used to dial addresses, overriding the default
	// dialer that uses a 10s dial timeout and no TLS (unless the TLS option is set).
	//
	// The context passed to the dial function is the context used in the request
	// that caused the dial. If the request is a client-internal request, the
	// context is the context on the client itself (which is canceled when the
	// client is closed).
	// This option conflicts with TLS. Only one can be used.
	Dialer func(ctx context.Context, network, address string) (net.Conn, error)

	// Logger to use for any errors.
	Logger *zap.Logger

	// DisableTelemetry disables the OpenTelemetry hook.
	DisableTelemetry bool

	// TracerProvider allows specifying a custom otel tracer provider.
	// Defaults to the global one.
	TracerProvider trace.TracerProvider

	// MeterProvider allows specifying a custom otel meter provider.
	// Defaults to the global one.
	MeterProvider metric.MeterProvider

	// TopicAttributeFunc can be used to create custom dimensions from a Kafka
	// topic for these metrics:
	// - producer.messages.count
	// - consumer.messages.fetched
	TopicAttributeFunc TopicAttributeFunc
	// contains filtered or unexported fields
}

CommonConfig defines common configuration for Kafka consumers, producers, and managers.
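
A minimal sketch of building a CommonConfig in code rather than from a config file; the broker address, client ID, version, and namespace are placeholders, and the import path is an assumption:

package example

import (
	"go.uber.org/zap"

	"github.com/elastic/apm-queue/v2/kafka" // assumed import path
)

func newCommonConfig() kafka.CommonConfig {
	return kafka.CommonConfig{
		// Brokers may be omitted when $KAFKA_BROKERS is set.
		Brokers:   []string{"broker-1:9092"}, // placeholder address
		ClientID:  "my-service",              // placeholder
		Version:   "1.2.3",                   // surfaces in Kafka metrics and logs
		Namespace: "staging",                 // optional topic prefix
		Logger:    zap.NewExample(),
	}
}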

type CompressionCodec

type CompressionCodec = kgo.CompressionCodec

CompressionCodec configures how records are compressed before being sent. Type alias to kgo.CompressionCodec.

func GzipCompression

func GzipCompression() CompressionCodec

GzipCompression enables gzip compression with the default compression level.

func Lz4Compression

func Lz4Compression() CompressionCodec

Lz4Compression enables lz4 compression with the fastest compression level.

func NoCompression

func NoCompression() CompressionCodec

NoCompression is a compression option that avoids compression. This can always be used as a fallback compression.

func SnappyCompression

func SnappyCompression() CompressionCodec

SnappyCompression enables snappy compression.

func ZstdCompression

func ZstdCompression() CompressionCodec

ZstdCompression enables zstd compression with the default compression level.
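
These constructors are typically combined into an ordered preference list for ProducerConfig.CompressionCodec (shown further below); a sketch, assuming the import path shown:

package example

import "github.com/elastic/apm-queue/v2/kafka" // assumed import path

// producerCodecs is an illustrative preference list: zstd first,
// then snappy, with no compression as a final fallback.
var producerCodecs = []kafka.CompressionCodec{
	kafka.ZstdCompression(),
	kafka.SnappyCompression(),
	kafka.NoCompression(),
}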

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer wraps a Kafka consumer and the consumption implementation details. Consumes each partition in a dedicated goroutine.

func NewConsumer

func NewConsumer(cfg ConsumerConfig) (*Consumer, error)

NewConsumer creates a new instance of a Consumer. The consumer will read from each partition concurrently by using a dedicated goroutine per partition.

func (*Consumer) Close

func (c *Consumer) Close() error

Close the consumer, blocking until all partition consumers are stopped.

func (*Consumer) Healthy

func (c *Consumer) Healthy(ctx context.Context) error

Healthy returns an error if the Kafka client fails to reach a discovered broker.

func (*Consumer) Run

func (c *Consumer) Run(ctx context.Context) error

Run the consumer until a non-recoverable error is found:

  • ErrCommitFailed.

To shut down the consumer, cancel the context or call consumer.Close(). Calling `consumer.Close` is advisable to ensure a graceful shutdown and to avoid losing records (at-most-once delivery) or processing them twice (at-least-once delivery). To ensure that all polled records are processed, Close() must be called even when the context is canceled.

If called more than once, Run returns `apmqueue.ErrConsumerAlreadyRunning`.
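
A minimal sketch of the run/shutdown sequence described above (configuration and error handling are elided; the import path is an assumption):

package example

import (
	"context"

	"github.com/elastic/apm-queue/v2/kafka" // assumed import path
)

func runConsumer(ctx context.Context, cfg kafka.ConsumerConfig) error {
	consumer, err := kafka.NewConsumer(cfg)
	if err != nil {
		return err
	}
	// Close even when ctx is canceled, so that already-polled records
	// are processed before the underlying client shuts down.
	defer consumer.Close()
	return consumer.Run(ctx)
}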

type ConsumerConfig

type ConsumerConfig struct {
	CommonConfig
	// Topics that the consumer will consume messages from.
	Topics []apmqueue.Topic
	// ConsumeRegex sets the client to parse all topics passed to ConsumeTopics
	// as regular expressions.
	ConsumeRegex bool
	// GroupID to join as part of the consumer group.
	GroupID string
	// MaxPollRecords defines an upper bound to the number of records that can
	// be polled on a single fetch. If MaxPollRecords <= 0, defaults to 500.
	// Note that this setting doesn't change how `franz-go` fetches and buffers
	// events from Kafka brokers; it merely affects the number of records that
	// are returned on `client.PollRecords`.
	// The higher this setting, the higher the general processing throughput
	// will be. However, when Delivery is set to AtMostOnce, the higher this
	// number, the more events are lost if the process crashes or terminates
	// abruptly.
	//
	// It is best to keep the number of polled records small or the consumer
	// risks being forced out of the group if it exceeds rebalance.timeout.ms.
	// Default: 500
	// Kafka consumer setting: max.poll.records
	// Docs: https://kafka.apache.org/28/documentation.html#consumerconfigs_max.poll.records
	MaxPollRecords int
	// MaxPollWait defines the maximum amount of time a broker will wait for a
	// fetch response to hit the minimum number of required bytes before
	// returning.
	// Default: 5s
	// Kafka consumer setting: fetch.max.wait.ms
	// Docs: https://kafka.apache.org/28/documentation.html#consumerconfigs_fetch.max.wait.ms
	MaxPollWait time.Duration
	// MaxConcurrentFetches sets the maximum number of fetch requests to allow in
	// flight or buffered at once, overriding the unbounded (i.e. number of
	// brokers) default.
	// This setting, paired with FetchMaxBytes, can upper bound the maximum amount
	// of memory that the client can use for consuming.
	// Default: Unbounded, total number of brokers.
	// Docs: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#MaxConcurrentFetches
	MaxConcurrentFetches int
	// MaxPollBytes sets the maximum amount of bytes a broker will try to send
	// during a fetch.
	// Default: 52428800 bytes (~52MB, 50MiB)
	// Kafka consumer setting: fetch.max.bytes
	// Docs: https://kafka.apache.org/28/documentation.html#brokerconfigs_fetch.max.bytes
	MaxPollBytes int32
	// MaxPollPartitionBytes sets the maximum amount of bytes that will be consumed for
	// a single partition in a fetch request.
	// Default: 1048576 bytes (~1MB, 1MiB)
	// Kafka consumer setting: max.partition.fetch.bytes
	// Docs: https://kafka.apache.org/28/documentation.html#consumerconfigs_max.partition.fetch.bytes
	MaxPollPartitionBytes int32
	// ShutdownGracePeriod defines the maximum amount of time to wait for the
	// partition consumers to process events before the underlying kgo.Client
	// is closed, overriding the default 5s.
	ShutdownGracePeriod time.Duration
	// Delivery mechanism to use to acknowledge the messages.
	// AtMostOnceDeliveryType and AtLeastOnceDeliveryType are supported.
	// If not set, it defaults to apmqueue.AtMostOnceDeliveryType.
	Delivery apmqueue.DeliveryType
	// Processor that will be used to process each event individually.
	// It is recommended to keep the synchronous processing fast and below the
	// rebalance.timeout.ms setting in Kafka.
	//
	// The processing time of each processing cycle can be calculated as:
	// record.process.time * MaxPollRecords.
	Processor apmqueue.Processor
	// FetchMinBytes sets the minimum amount of bytes a broker will try to send
	// during a fetch, overriding the default 1 byte.
	// Default: 1
	// Kafka consumer setting: fetch.min.bytes
	// Docs: https://kafka.apache.org/28/documentation.html#consumerconfigs_fetch.min.bytes
	FetchMinBytes int32
}

ConsumerConfig defines the configuration for the Kafka consumer.
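
A sketch of a ConsumerConfig; the topic and group names are placeholders, proc stands for any apmqueue.Processor implementation, and the import paths are assumptions:

package example

import (
	apmqueue "github.com/elastic/apm-queue/v2" // assumed import path
	"github.com/elastic/apm-queue/v2/kafka"
)

func newConsumerConfig(common kafka.CommonConfig, proc apmqueue.Processor) kafka.ConsumerConfig {
	return kafka.ConsumerConfig{
		CommonConfig:   common,
		Topics:         []apmqueue.Topic{"events"}, // placeholder topic
		GroupID:        "my-consumer-group",        // placeholder group
		Delivery:       apmqueue.AtLeastOnceDeliveryType,
		MaxPollRecords: 500, // the default, shown here for clarity
		Processor:      proc,
	}
}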

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages Kafka topics.

func NewManager

func NewManager(cfg ManagerConfig) (*Manager, error)

NewManager returns a new Manager with the given config.

func (*Manager) Close

func (m *Manager) Close() error

Close closes the manager's resources, including its connections to the Kafka brokers and any associated goroutines.

func (*Manager) DeleteTopics

func (m *Manager) DeleteTopics(ctx context.Context, topics ...apmqueue.Topic) error

DeleteTopics deletes one or more topics.

No error is returned for topics that do not exist.

func (*Manager) Healthy

func (m *Manager) Healthy(ctx context.Context) error

Healthy returns an error if the Kafka client fails to reach a discovered broker.

func (*Manager) MonitorConsumerLag

func (m *Manager) MonitorConsumerLag(topicConsumers []apmqueue.TopicConsumer) (metric.Registration, error)

MonitorConsumerLag registers a callback with OpenTelemetry to measure consumer group lag for the given topics.
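
A sketch of wiring lag monitoring up at startup and tearing it down on shutdown; how the apmqueue.TopicConsumer values are built is elided, and the import paths are assumptions:

package example

import (
	apmqueue "github.com/elastic/apm-queue/v2" // assumed import path
	"github.com/elastic/apm-queue/v2/kafka"
)

func monitorLag(common kafka.CommonConfig, tcs []apmqueue.TopicConsumer) (func(), error) {
	manager, err := kafka.NewManager(kafka.ManagerConfig{CommonConfig: common})
	if err != nil {
		return nil, err
	}
	reg, err := manager.MonitorConsumerLag(tcs)
	if err != nil {
		manager.Close()
		return nil, err
	}
	// The returned function unregisters the OpenTelemetry callback and
	// releases the manager's broker connections.
	return func() {
		reg.Unregister()
		manager.Close()
	}, nil
}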

func (*Manager) NewTopicCreator

func (m *Manager) NewTopicCreator(cfg TopicCreatorConfig) (*TopicCreator, error)

NewTopicCreator returns a new TopicCreator with the given config.

type ManagerConfig

type ManagerConfig struct {
	CommonConfig
}

ManagerConfig holds configuration for managing Kafka topics.

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer publishes events to Kafka. Implements the Producer interface.

func NewProducer

func NewProducer(cfg ProducerConfig) (*Producer, error)

NewProducer returns a new Producer with the given config.

func (*Producer) Close

func (p *Producer) Close() error

Close stops the producer.

This call is blocking and will cause all the underlying clients to stop producing. If producing is asynchronous, it'll block until all messages have been produced. After Close() is called, Producer cannot be reused.

func (*Producer) Healthy

func (p *Producer) Healthy(ctx context.Context) error

Healthy returns an error if the Kafka client fails to reach a discovered broker.

func (*Producer) Produce

func (p *Producer) Produce(ctx context.Context, rs ...apmqueue.Record) error

Produce produces N records. If the Producer is synchronous, it waits until all records have been produced; otherwise, it returns as soon as the records are stored in the producer's buffer. If the context has been enriched with metadata, each entry is added as a header on every record. Produce takes ownership of the records; any modification after Produce is called may cause an unhandled exception.
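
A sketch of producing a single record, assuming apmqueue.Record carries Topic and Value fields and the import paths shown; the topic name is a placeholder:

package example

import (
	"context"

	apmqueue "github.com/elastic/apm-queue/v2" // assumed import path
	"github.com/elastic/apm-queue/v2/kafka"
)

func produceOne(ctx context.Context, p *kafka.Producer, payload []byte) error {
	// With a synchronous producer this returns once the record has been
	// produced; otherwise it returns once the record is buffered.
	return p.Produce(ctx, apmqueue.Record{
		Topic: "events", // placeholder topic
		Value: payload,
	})
}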

type ProducerConfig

type ProducerConfig struct {
	CommonConfig

	// MaxBufferedRecords sets the maximum number of records the client will buffer.
	MaxBufferedRecords int

	// ProducerBatchMaxBytes upper bounds the size of a record batch.
	ProducerBatchMaxBytes int32

	// ManualFlushing disables auto-flushing when producing.
	ManualFlushing bool

	// Sync can be used to indicate whether production should be synchronous.
	Sync bool

	// CompressionCodec specifies a list of compression codecs.
	// See kgo.ProducerBatchCompression for more details.
	//
	// If CompressionCodec is empty, then the default will be set
	// based on $KAFKA_PRODUCER_COMPRESSION_CODEC, which should be
	// a comma-separated list of codec preferences from the list:
	//
	//   [none, gzip, snappy, lz4, zstd]
	//
	// If $KAFKA_PRODUCER_COMPRESSION_CODEC is not specified, then
	// the default behaviour of franz-go is to use [snappy, none].
	CompressionCodec []CompressionCodec

	// BatchListener is called per topic/partition after a batch is
	// successfully produced to a Kafka broker.
	BatchListener BatchWriteListener
}

ProducerConfig holds configuration for publishing events to Kafka.
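
A sketch of a synchronous producer with an explicit compression preference list (common is a CommonConfig built as shown earlier; the import path is an assumption):

package example

import "github.com/elastic/apm-queue/v2/kafka" // assumed import path

func newSyncProducer(common kafka.CommonConfig) (*kafka.Producer, error) {
	return kafka.NewProducer(kafka.ProducerConfig{
		CommonConfig: common,
		Sync:         true, // Produce blocks until records are acknowledged
		CompressionCodec: []kafka.CompressionCodec{
			kafka.ZstdCompression(),
			kafka.NoCompression(),
		},
	})
}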

type SASLMechanism

type SASLMechanism = sasl.Mechanism

SASLMechanism is a type alias to sasl.Mechanism.

type TopicAttributeFunc added in v2.1.2

type TopicAttributeFunc func(topic string) attribute.KeyValue

TopicAttributeFunc is run on `kgo.HookProduceBatchWritten` and `kgo.HookFetchBatchRead` for each topic/partition. It can be used to include additional dimensions for the `consumer.messages.fetched` and `producer.messages.count` metrics.
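
A sketch of a TopicAttributeFunc that adds a coarse-grained dimension derived from the topic name; the attribute key, the topic prefix, and the kafka import path are illustrative assumptions:

package example

import (
	"strings"

	"go.opentelemetry.io/otel/attribute"

	"github.com/elastic/apm-queue/v2/kafka" // assumed import path
)

// dataTypeAttribute tags the producer/consumer message metrics with an
// illustrative "data_type" dimension based on the topic prefix.
var dataTypeAttribute kafka.TopicAttributeFunc = func(topic string) attribute.KeyValue {
	if strings.HasPrefix(topic, "logs-") {
		return attribute.String("data_type", "logs")
	}
	return attribute.String("data_type", "other")
}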

type TopicCreator

type TopicCreator struct {
	// contains filtered or unexported fields
}

TopicCreator creates Kafka topics.

func (*TopicCreator) CreateTopics

func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topic) error

CreateTopics creates one or more topics.

Topics that already exist will be updated.

type TopicCreatorConfig

type TopicCreatorConfig struct {
	// PartitionCount is the number of partitions to assign to
	// newly created topics.
	//
	// Must be non-zero. If PartitionCount is -1, the broker's
	// default value is used (requires Kafka 2.4+).
	PartitionCount int

	// TopicConfigs holds any topic configs to assign to newly
	// created topics, such as `retention.ms`.
	//
	// See https://kafka.apache.org/documentation/#topicconfigs
	TopicConfigs map[string]string

	// MeterProvider used to create meters and record metrics (Optional).
	MeterProvider metric.MeterProvider
}

TopicCreatorConfig holds configuration for creating Kafka topics.
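
A sketch that creates (or updates) a topic through a Manager; the partition count, retention, and topic name are placeholders, and the import paths are assumptions:

package example

import (
	"context"

	apmqueue "github.com/elastic/apm-queue/v2" // assumed import path
	"github.com/elastic/apm-queue/v2/kafka"
)

func createTopics(ctx context.Context, manager *kafka.Manager) error {
	creator, err := manager.NewTopicCreator(kafka.TopicCreatorConfig{
		PartitionCount: 4, // placeholder; -1 uses the broker default (Kafka 2.4+)
		TopicConfigs: map[string]string{
			"retention.ms": "3600000", // placeholder: 1 hour retention
		},
	})
	if err != nil {
		return err
	}
	return creator.CreateTopics(ctx, apmqueue.Topic("events")) // placeholder topic
}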

func (TopicCreatorConfig) Validate

func (cfg TopicCreatorConfig) Validate() error

Validate ensures the configuration is valid, returning an error otherwise.
