Version: v1.40.1 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Jul 19, 2022 License: Apache-2.0, BSD-3-Clause, Apache-2.0 Imports: 10 Imported by: 0



Package kafka provides functions to trace the confluentinc/confluent-kafka-go package (https://github.com/confluentinc/confluent-kafka-go).



This section is empty.


This section is empty.


This section is empty.


type Consumer

type Consumer struct {
	// contains filtered or unexported fields

A Consumer wraps a kafka.Consumer.

func NewConsumer

func NewConsumer(conf *kafka.ConfigMap, opts ...Option) (*Consumer, error)

NewConsumer calls kafka.NewConsumer and wraps the resulting Consumer.

func WrapConsumer

func WrapConsumer(c *kafka.Consumer, opts ...Option) *Consumer

WrapConsumer wraps a kafka.Consumer so that any consumed events are traced.

func (*Consumer) Close

func (c *Consumer) Close() error

Close calls the underlying Consumer.Close and if polling is enabled, finishes any remaining span.

func (*Consumer) Events

func (c *Consumer) Events() chan kafka.Event

Events returns the kafka Events channel (if enabled). Message events will be traced.

func (*Consumer) Poll

func (c *Consumer) Poll(timeoutMS int) (event kafka.Event)

Poll polls the consumer for messages or events. Message will be traced.

func (*Consumer) ReadMessage added in v1.30.0

func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error)

ReadMessage polls the consumer for a message. Message will be traced.

type MessageCarrier

type MessageCarrier struct {
	// contains filtered or unexported fields

A MessageCarrier injects and extracts traces from a sarama.ProducerMessage.

func NewMessageCarrier

func NewMessageCarrier(msg *kafka.Message) MessageCarrier

NewMessageCarrier creates a new MessageCarrier.

func (MessageCarrier) ForeachKey

func (c MessageCarrier) ForeachKey(handler func(key, val string) error) error

ForeachKey iterates over every header.

func (MessageCarrier) Set

func (c MessageCarrier) Set(key, val string)

Set sets a header.

type Option

type Option func(cfg *config)

An Option customizes the config.

func WithAnalytics added in v1.11.0

func WithAnalytics(on bool) Option

WithAnalytics enables Trace Analytics for all started spans.

func WithAnalyticsRate added in v1.11.0

func WithAnalyticsRate(rate float64) Option

WithAnalyticsRate sets the sampling rate for Trace Analytics events correlated to started spans.

func WithContext

func WithContext(ctx context.Context) Option

WithContext sets the config context to ctx. Deprecated: This is deprecated in favor of passing the context via the message headers

func WithServiceName

func WithServiceName(serviceName string) Option

WithServiceName sets the config service name to serviceName.

type Producer

type Producer struct {
	// contains filtered or unexported fields

A Producer wraps a kafka.Producer.

func NewProducer

func NewProducer(conf *kafka.ConfigMap, opts ...Option) (*Producer, error)

NewProducer calls kafka.NewProducer and wraps the resulting Producer.

func WrapProducer

func WrapProducer(p *kafka.Producer, opts ...Option) *Producer

WrapProducer wraps a kafka.Producer so requests are traced.

func (*Producer) Close

func (p *Producer) Close()

Close calls the underlying Producer.Close and also closes the internal wrapping producer channel.

func (*Producer) Produce

func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error

Produce calls the underlying Producer.Produce and traces the request.

func (*Producer) ProduceChannel

func (p *Producer) ProduceChannel() chan *kafka.Message

ProduceChannel returns a channel which can receive kafka Messages and will send them to the underlying producer channel.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL