otelkafka

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 17, 2022 License: MIT Imports: 14 Imported by: 0

README

Instrumentation for `github.com/confluentinc/confluent-kafka-go/kafka

This instrumentation is for the github.com/confluentinc/confluent-kafka-go/kafka package.

Compatibility

The Producer will end spans when a delivery report is returned. Setting "go.delivery.reports" to false will disable the delivery reports and can result in an build up of un-ended spans. If delivery reports are disabled, an un-instrumented Producer should be used instead.

This instrumentation was built to support v1.7.0 of github.com/confluentinc/confluent-kafka-go/kafka. Similar to the instrumented package, librdkafka 1.6.0+ is required. This means you will need to use an environment that supports the pre-built binaries, or install the library manually. Important to note, similar to the instrumented package, cgo is required and this instrumentation does not support the Windows operating system.

Getting started

The NewConsumer and NewProducer functions are provided as drop-in replacements of the equivalent from the kafka package. See these examples for how to use these functions.

Documentation

Overview

Package otelkafka provides functions to trace the github.com/confluentinc/confluent-kafka-go/kafka package.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewMessageCarrier

func NewMessageCarrier(message *kafka.Message) propagation.TextMapCarrier

NewMessageCarrier returns a TextMapCarrier that will encode and decode tracing information to and from the passed message.

Types

type Config

type Config struct {
	Tracer           trace.Tracer
	Propagator       propagation.TextMapPropagator
	DefaultStartOpts []trace.SpanStartOption
	// contains filtered or unexported fields
}

Config contains configuration options.

func NewConfig

func NewConfig(instrumentationName string, options ...Option) *Config

NewConfig returns a Config for instrumentation with all options applied.

If no TracerProvider or Propagator are specified with options, the default OpenTelemetry globals will be used.

func (*Config) Copy

func (c *Config) Copy() *Config

Copy returns a deep copy of c.

func (*Config) MergedSpanStartOptions

func (c *Config) MergedSpanStartOptions(opts ...trace.SpanStartOption) []trace.SpanStartOption

MergedSpanStartOptions returns a copy of opts with any DefaultStartOpts that c is configured with prepended.

func (*Config) ResolveTracer

func (c *Config) ResolveTracer(ctx context.Context) trace.Tracer

ResolveTracer returns an OpenTelemetry tracer from the appropriate TracerProvider.

If the passed context contains a span, the TracerProvider that created the tracer that created that span will be used. Otherwise, the TracerProvider from c is used.

func (*Config) WithSpan

func (c *Config) WithSpan(ctx context.Context, name string, f func(context.Context) error, opts ...trace.SpanStartOption) error

WithSpan wraps the function f with a span named name.

type Consumer

type Consumer struct {
	*kafka.Consumer
	// contains filtered or unexported fields
}

Consumer wraps a kafka.Consumer and traces its operations.

func NewConsumer

func NewConsumer(conf *kafka.ConfigMap, opts ...Option) (*Consumer, error)

NewConsumer calls kafka.NewConsumer and wraps the resulting Consumer with tracing instrumentation.

Example
c, err := NewConsumer(&kafka.ConfigMap{
	"bootstrap.servers": "localhost:9092",
	"group.id":          "myGroup",
})
if err != nil {
	panic(err)
}
defer c.Close()
Output:

func WrapConsumer

func WrapConsumer(c *kafka.Consumer, opts ...Option) *Consumer

WrapConsumer wraps a kafka.Consumer so that any consumed events are traced.

func (*Consumer) Close

func (c *Consumer) Close() error

Close calls the underlying Consumer.Close and if polling is enabled, ends any remaining span.

func (*Consumer) Events

func (c *Consumer) Events() chan kafka.Event

Events returns the kafka Events channel. Message events are traced.

func (*Consumer) Poll

func (c *Consumer) Poll(timeoutMS int) (event kafka.Event)

Poll polls the consumer for events. Message events are traced.

Will block for at most timeoutMs milliseconds.

The following callbacks may be triggered:

Subscribe()'s rebalanceCb

Returns nil on timeout, else an Event

func (*Consumer) ReadMessage

func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error)

ReadMessage polls the consumer for a message and traces the read.

This is a convenience API that wraps Poll() and only returns messages or errors. All other event types are discarded.

The call will block for at most `timeout` waiting for a new message or error. `timeout` may be set to -1 for indefinite wait.

Timeout is returned as (nil, err) where err is `err.(kafka.Error).Code() == kafka.ErrTimedOut`.

Messages are returned as (msg, nil), while general errors are returned as (nil, err), and partition-specific errors are returned as (msg, err) where msg.TopicPartition provides partition-specific information (such as topic, partition and offset).

All other event types, such as PartitionEOF, AssignedPartitions, etc, are silently discarded.

type Option

type Option interface {
	Apply(*Config)
}

Option applies options to a configuration.

func WithAttributes

func WithAttributes(attr []attribute.KeyValue) Option

WithAttributes returns an Option that appends attr to the attributes set for every span created.

func WithPropagator

func WithPropagator(p propagation.TextMapPropagator) Option

WithPropagator returns an Option that sets p as the TextMapPropagator used when propagating a span context.

func WithTracerProvider

func WithTracerProvider(tp trace.TracerProvider) Option

WithTracerProvider returns an Option that sets the TracerProvider used for a configuration.

type OptionFunc

type OptionFunc func(*Config)

OptionFunc is a generic way to set an option using a func.

func (OptionFunc) Apply

func (o OptionFunc) Apply(c *Config)

Apply applies the configuration option.

type Producer

type Producer struct {
	*kafka.Producer
	// contains filtered or unexported fields
}

A Producer wraps a kafka.Producer and traces its operations.

func NewProducer

func NewProducer(conf *kafka.ConfigMap, opts ...Option) (*Producer, error)

NewProducer calls kafka.NewProducer and wraps the resulting Producer with tracing instrumentation.

Example
p, err := NewProducer(&kafka.ConfigMap{
	"bootstrap.servers": "localhost:9092",
})
if err != nil {
	panic(err)
}
defer p.Close()
Output:

func WrapProducer

func WrapProducer(p *kafka.Producer, opts ...Option) *Producer

WrapProducer wraps a kafka.Producer so that any produced events are traced.

func (*Producer) Close

func (p *Producer) Close()

Close calls the wrapped Producer.Close and closes the producer channel.

func (*Producer) Produce

func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error

Produce calls the wrapped Producer.Produce and traces the request.

func (*Producer) ProduceChannel

func (p *Producer) ProduceChannel() chan *kafka.Message

ProduceChannel returns the traced producer channel.

Directories

Path Synopsis
test module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL