otelkafkakonsumer

Version: v0.0.2 (a newer version of this module is available)
Published: Sep 30, 2023 License: MIT Imports: 14 Imported by: 0

README

Otel Kafka Konsumer

This library adds distributed tracing to the segmentio/kafka-go library and is used by kafka-konsumer.

Please refer to the examples to learn how to use it.

You can follow the release status by subscribing to this issue.

It will be released soon.

Demo

In the examples, you can run

docker-compose up --build

Producing

Producing Example

Consuming

Consuming Example

Bring it all together

You can run producer and consumer, respectively, to see that they work together.

Producing - Consuming Together

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewMessageCarrier

func NewMessageCarrier(message *kafka.Message) propagation.TextMapCarrier

NewMessageCarrier returns a TextMapCarrier that will encode and decode tracing information to and from the passed message.
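To illustrate the idea of a carrier that reads and writes tracing information in message headers, here is a minimal sketch using only the standard library. The Header, Message, and headerCarrier types and the newHeaderCarrier function are hypothetical stand-ins defined for this example; they are not the package's or kafka-go's real types, and the package's actual implementation may differ.

```go
package main

import "fmt"

// Header and Message are local stand-ins (assumptions) for a Kafka message
// with key/value headers, reduced to what this sketch needs.
type Header struct {
	Key   string
	Value []byte
}

type Message struct {
	Headers []Header
}

// headerCarrier is a hypothetical carrier with the Get/Set/Keys method set
// that a text-map carrier is expected to provide.
type headerCarrier struct {
	msg *Message
}

func newHeaderCarrier(msg *Message) headerCarrier {
	return headerCarrier{msg: msg}
}

// Get returns the value of the first header matching key, or "" if absent.
func (c headerCarrier) Get(key string) string {
	for _, h := range c.msg.Headers {
		if h.Key == key {
			return string(h.Value)
		}
	}
	return ""
}

// Set drops any existing header with the same key, then appends the new one.
func (c headerCarrier) Set(key, value string) {
	kept := c.msg.Headers[:0]
	for _, h := range c.msg.Headers {
		if h.Key != key {
			kept = append(kept, h)
		}
	}
	c.msg.Headers = append(kept, Header{Key: key, Value: []byte(value)})
}

// Keys lists the header keys in their current order.
func (c headerCarrier) Keys() []string {
	keys := make([]string, 0, len(c.msg.Headers))
	for _, h := range c.msg.Headers {
		keys = append(keys, h.Key)
	}
	return keys
}

func main() {
	msg := &Message{}
	carrier := newHeaderCarrier(msg)
	// The traceparent value is an illustrative W3C-style string.
	carrier.Set("traceparent", "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
	fmt.Println(carrier.Keys(), carrier.Get("traceparent"))
}
```

A propagator would call Set on the producing side to inject the trace context and Get on the consuming side to extract it, so the same message headers carry the context across the broker.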

Types

type Config

type Config struct {
	Tracer         trace.Tracer
	TracerProvider trace.TracerProvider
	Propagator     propagation.TextMapPropagator

	DefaultStartOpts []trace.SpanStartOption
	// contains filtered or unexported fields
}

Config contains configuration options.

func NewConfig

func NewConfig(instrumentationName string, options ...Option) *Config

NewConfig returns a Config for instrumentation with all options applied.

If no TracerProvider or Propagator are specified with options, the default OpenTelemetry globals will be used.

func (*Config) MergedSpanStartOptions

func (c *Config) MergedSpanStartOptions(opts ...trace.SpanStartOption) []trace.SpanStartOption

MergedSpanStartOptions returns a copy of opts with any DefaultStartOpts that c is configured with prepended.
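The "copy with defaults prepended" behaviour described above can be sketched generically. In this example mergeOptions is a hypothetical helper, and plain strings stand in for trace.SpanStartOption values; it only illustrates the slice handling, not the package's actual code.

```go
package main

import "fmt"

// mergeOptions returns a new slice holding defaults followed by opts.
// Neither input slice is modified or aliased by the result.
func mergeOptions[T any](defaults, opts []T) []T {
	merged := make([]T, 0, len(defaults)+len(opts))
	merged = append(merged, defaults...)
	merged = append(merged, opts...)
	return merged
}

func main() {
	defaults := []string{"kind=consumer"} // stand-in for DefaultStartOpts
	perCall := []string{"attr=topic"}     // stand-in for per-call options

	merged := mergeOptions(defaults, perCall)
	fmt.Println(merged)

	// Changing the merged slice leaves the caller's slice untouched,
	// because mergeOptions allocated fresh backing storage.
	merged[1] = "changed"
	fmt.Println(perCall)
}
```

Allocating a fresh slice matters because appending directly to a shared defaults slice could overwrite its spare capacity when several calls run concurrently.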

func (*Config) ResolveTracer

func (c *Config) ResolveTracer(ctx context.Context) trace.Tracer

ResolveTracer returns an OpenTelemetry tracer from the appropriate TracerProvider.

If the passed context contains a span, the TracerProvider that created the tracer that created that span will be used. Otherwise, the TracerProvider from c is used.

func (*Config) WithSpan

func (c *Config) WithSpan(ctx context.Context, name string, f func(context.Context) error, opts ...trace.SpanStartOption) error

WithSpan wraps the function f with a span named name.

type Option

type Option interface {
	Apply(*Config)
}

Option applies options to a configuration.

func WithAttributes

func WithAttributes(attr []attribute.KeyValue) Option

WithAttributes returns an Option that appends attr to the attributes set for every span created.

func WithPropagator

func WithPropagator(p propagation.TextMapPropagator) Option

WithPropagator returns an Option that sets p as the TextMapPropagator used when propagating a span context.

func WithTracerProvider

func WithTracerProvider(tp trace.TracerProvider) Option

WithTracerProvider returns an Option that sets the TracerProvider used for a configuration.

type OptionFunc

type OptionFunc func(*Config)

OptionFunc is a generic way to set an option using a func.

func (OptionFunc) Apply

func (o OptionFunc) Apply(c *Config)

Apply applies the configuration option.
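The Option, OptionFunc, and With... functions above follow the functional options pattern. The following self-contained sketch shows the pattern's shape with a reduced, hypothetical config type whose fields are plain strings; the lowercase names are illustrative and are not the package's own identifiers.

```go
package main

import "fmt"

// config is a hypothetical, reduced configuration.
type config struct {
	propagator string
	attributes []string
}

// option mirrors the shape of an interface with a single apply method.
type option interface {
	apply(*config)
}

// optionFunc adapts a plain function to the option interface.
type optionFunc func(*config)

func (f optionFunc) apply(c *config) { f(c) }

// withPropagator overrides the default propagator.
func withPropagator(p string) option {
	return optionFunc(func(c *config) { c.propagator = p })
}

// withAttributes appends attrs to the attributes already configured.
func withAttributes(attrs []string) option {
	return optionFunc(func(c *config) { c.attributes = append(c.attributes, attrs...) })
}

// newConfig starts from defaults and applies each option in order,
// so later options win over earlier ones.
func newConfig(opts ...option) *config {
	c := &config{propagator: "global-default"}
	for _, o := range opts {
		o.apply(c)
	}
	return c
}

func main() {
	fmt.Println(newConfig().propagator)

	c := newConfig(withPropagator("b3"), withAttributes([]string{"team=payments"}))
	fmt.Println(c.propagator, c.attributes)
}
```

The adapter type keeps each option a small closure while still letting callers define their own option implementations through the interface.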

type Reader

type Reader struct {
	R           *kafka.Reader
	TraceConfig *Config
	// contains filtered or unexported fields
}

func NewReader

func NewReader(r *kafka.Reader, opts ...Option) (*Reader, error)

NewReader wraps the passed kafka.Reader with tracing instrumentation.

func (*Reader) Close

func (r *Reader) Close() error

Close calls Close on the underlying kafka.Reader and, if polling is enabled, ends any remaining span.

func (*Reader) ReadMessage

func (r *Reader) ReadMessage(ctx context.Context) (*kafka.Message, error)

type Writer

type Writer struct {
	W           *kafka.Writer
	TraceConfig *Config
}

func NewWriter

func NewWriter(w *kafka.Writer, opts ...Option) (*Writer, error)

NewWriter wraps the passed kafka.Writer with OpenTelemetry instrumentation.

func (*Writer) Close

func (w *Writer) Close() error

func (*Writer) WriteMessages

func (w *Writer) WriteMessages(ctx context.Context, msg kafka.Message) error

Directories

Path Synopsis
example
