Version: v1.42.1 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Sep 26, 2022 License: Apache-2.0, BSD-3-Clause, Apache-2.0 Imports: 10 Imported by: 0



Package kafka provides functions to trace the confluentinc/confluent-kafka-go package (https://github.com/confluentinc/confluent-kafka-go).


This example shows how a span context can be passed from a producer to a consumer.

// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package main

import (


	kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka"

var (
	testGroupID = "gotest"
	testTopic   = "gotest"

// This example shows how a span context can be passed from a producer to a consumer.
func main() {

	defer tracer.Stop()

	c, err := kafkatrace.NewConsumer(&kafka.ConfigMap{
		"go.events.channel.enable": true, // required for the events channel to be turned on
		"group.id":                 testGroupID,
		"socket.timeout.ms":        10,
		"session.timeout.ms":       10,
		"enable.auto.offset.store": false,

	err = c.Subscribe(testTopic, nil)
	if err != nil {

	// Create the span to be passed
	parentSpan := tracer.StartSpan("test_parent_span")

	// Produce a message with a span
	go func() {
		msg := &kafka.Message{
			TopicPartition: kafka.TopicPartition{
				Topic:     &testTopic,
				Partition: 1,
				Offset:    1,
			Key:   []byte("key1"),
			Value: []byte("value1"),

		// Inject the span context in the message to be produced
		carrier := kafkatrace.NewMessageCarrier(msg)
		tracer.Inject(parentSpan.Context(), carrier)

		c.Consumer.Events() <- msg


	msg := (<-c.Events()).(*kafka.Message)

	// Extract the context from the message
	carrier := kafkatrace.NewMessageCarrier(msg)
	spanContext, err := tracer.Extract(carrier)
	if err != nil {

	parentContext := parentSpan.Context()

	// Validate that the context passed is the context sent via the message
	if spanContext.TraceID() == parentContext.TraceID() {
		fmt.Println("Span context passed sucessfully from producer to consumer")
	} else {
		fmt.Println("Span context not passed")

	// wait for the events channel to be closed

Span context passed sucessfully from producer to consumer




This section is empty.


This section is empty.


This section is empty.


type Consumer

type Consumer struct {
	// contains filtered or unexported fields

A Consumer wraps a kafka.Consumer.

func NewConsumer

func NewConsumer(conf *kafka.ConfigMap, opts ...Option) (*Consumer, error)

NewConsumer calls kafka.NewConsumer and wraps the resulting Consumer.

func WrapConsumer

func WrapConsumer(c *kafka.Consumer, opts ...Option) *Consumer

WrapConsumer wraps a kafka.Consumer so that any consumed events are traced.

func (*Consumer) Close

func (c *Consumer) Close() error

Close calls the underlying Consumer.Close and if polling is enabled, finishes any remaining span.

func (*Consumer) Events

func (c *Consumer) Events() chan kafka.Event

Events returns the kafka Events channel (if enabled). Message events will be traced.

func (*Consumer) Poll

func (c *Consumer) Poll(timeoutMS int) (event kafka.Event)

Poll polls the consumer for messages or events. Message will be traced.

func (*Consumer) ReadMessage added in v1.30.0

func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error)

ReadMessage polls the consumer for a message. Message will be traced.

type MessageCarrier

type MessageCarrier struct {
	// contains filtered or unexported fields

A MessageCarrier injects and extracts traces from a sarama.ProducerMessage.

func NewMessageCarrier

func NewMessageCarrier(msg *kafka.Message) MessageCarrier

NewMessageCarrier creates a new MessageCarrier.

func (MessageCarrier) ForeachKey

func (c MessageCarrier) ForeachKey(handler func(key, val string) error) error

ForeachKey iterates over every header.

func (MessageCarrier) Set

func (c MessageCarrier) Set(key, val string)

Set sets a header.

type Option

type Option func(cfg *config)

An Option customizes the config.

func WithAnalytics added in v1.11.0

func WithAnalytics(on bool) Option

WithAnalytics enables Trace Analytics for all started spans.

func WithAnalyticsRate added in v1.11.0

func WithAnalyticsRate(rate float64) Option

WithAnalyticsRate sets the sampling rate for Trace Analytics events correlated to started spans.

func WithContext

func WithContext(ctx context.Context) Option

WithContext sets the config context to ctx. Deprecated: This is deprecated in favor of passing the context via the message headers

func WithCustomTag added in v1.42.0

func WithCustomTag(tag string, tagFn func(msg *kafka.Message) interface{}) Option

WithCustomTag will cause the given tagFn to be evaluated after executing a query and attach the result to the span tagged by the key.

func WithServiceName

func WithServiceName(serviceName string) Option

WithServiceName sets the config service name to serviceName.

type Producer

type Producer struct {
	// contains filtered or unexported fields

A Producer wraps a kafka.Producer.

func NewProducer

func NewProducer(conf *kafka.ConfigMap, opts ...Option) (*Producer, error)

NewProducer calls kafka.NewProducer and wraps the resulting Producer.

func WrapProducer

func WrapProducer(p *kafka.Producer, opts ...Option) *Producer

WrapProducer wraps a kafka.Producer so requests are traced.

func (*Producer) Close

func (p *Producer) Close()

Close calls the underlying Producer.Close and also closes the internal wrapping producer channel.

func (*Producer) Produce

func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error

Produce calls the underlying Producer.Produce and traces the request.

func (*Producer) ProduceChannel

func (p *Producer) ProduceChannel() chan *kafka.Message

ProduceChannel returns a channel which can receive kafka Messages and will send them to the underlying producer channel.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL