kotel

v1.4.1
Published: Dec 28, 2023 License: BSD-3-Clause Imports: 16 Imported by: 11

README

kotel

Kotel is an OpenTelemetry instrumentation plug-in package for franz-go. It provides tracing and metrics options through a kgo.Hook. With kotel, you can trace records produced or consumed with franz-go. You can pass parent traces into records and extract parent traces from records. It also tracks metrics related to connections, errors, and bytes transferred.

To learn more about how to use kotel, see the usage sections in the README and refer to the OpenTelemetry documentation for additional information about OpenTelemetry and how it can be used in your franz-go projects.

Tracing

kotel provides tracing for Kafka based on the OpenTelemetry specifications. It creates three span operations: "publish", "receive", and "process". It also provides a set of attributes to use with these spans.

How it works

The kotel tracer module uses hooks to automatically create and close "publish" and "receive" spans as a kgo.Record flows through the application. However, for the "process" span, it uses a convenience method that must be manually invoked and closed in the consumer code to capture processing.

The following table provides a visual representation of the lineage of the span operations:

Order  Hook/Method                      Operation  State
1      kgo.HookProduceRecordBuffered    Publish    Start
2      kgo.HookProduceRecordUnbuffered  Publish    End
3      kgo.HookFetchRecordBuffered      Receive    Start
4      kgo.HookFetchRecordUnbuffered    Receive    End
5      kotel.Tracer.WithProcessSpan     Process    Start

Getting started

To start using kotel for tracing, you will need to:

  1. Set up a tracer provider
  2. Configure any desired tracer options
  3. Create a new kotel tracer
  4. Create a new kotel service hook
  5. Create a new Kafka client and pass in the kotel hook

Here's an example of how you might do this:

// Initialize tracer provider (initTracerProvider is your own setup function).
tracerProvider, err := initTracerProvider()
if err != nil {
	panic(err)
}

// Create a new kotel tracer.
tracerOpts := []kotel.TracerOpt{
	kotel.TracerProvider(tracerProvider),
	kotel.TracerPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{})),
}
tracer := kotel.NewTracer(tracerOpts...)

// Create a new kotel service.
kotelOps := []kotel.Opt{
	kotel.WithTracer(tracer),
}
kotelService := kotel.NewKotel(kotelOps...)

// Create a new Kafka client.
cl, err := kgo.NewClient(
	// Pass in the kotel hook.
	kgo.WithHooks(kotelService.Hooks()...),
	// ...other opts.
)
Sending records

When you produce a record with franz-go, it will be traced by kotel. To include parent traces, pass in an instrumented context.

Here's an example of how to do this:

func httpHandler(w http.ResponseWriter, r *http.Request) {
	// Start a new span with options. Note that tracer here is an OpenTelemetry
	// trace.Tracer (for example, one obtained from the tracer provider), not
	// the kotel tracer.
	opts := []trace.SpanStartOption{
		trace.WithSpanKind(trace.SpanKindServer),
		trace.WithAttributes(attribute.String("some-key", "foo")),
	}
	ctx, span := tracer.Start(r.Context(), "request", opts...)
	// End the span when function exits.
	defer span.End()

	var wg sync.WaitGroup
	wg.Add(1)
	record := &kgo.Record{Topic: "topic", Value: []byte("foo")}
	// Pass in the context from the tracer.Start() call to ensure that the span
	// created is linked to the parent span.
	cl.Produce(ctx, record, func(_ *kgo.Record, err error) {
		defer wg.Done()
		if err != nil {
			fmt.Printf("record had a produce error: %v\n", err)
			span.SetStatus(codes.Error, err.Error())
			span.RecordError(err)
		}
	})
	wg.Wait()
}
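With the propagation.TraceContext propagator configured in the getting started example, the parent trace travels with the record as a W3C Trace Context "traceparent" value. The following is a minimal, stdlib-only sketch of what such a value looks like and how its fields can be split apart. The helper names formatTraceparent and parseTraceparent are hypothetical and exist only for illustration; they are not part of kotel or OpenTelemetry, and the parsing is simplified to version 00.

```go
package main

import (
	"encoding/hex"
	"errors"
	"fmt"
	"strings"
)

// traceparent models the fields of a W3C Trace Context "traceparent" value:
// version-traceid-parentid-flags.
type traceparent struct {
	TraceID [16]byte
	SpanID  [8]byte
	Sampled bool
}

// formatTraceparent renders a version 00 value (hypothetical helper).
func formatTraceparent(tp traceparent) string {
	flags := "00"
	if tp.Sampled {
		flags = "01"
	}
	return fmt.Sprintf("00-%s-%s-%s",
		hex.EncodeToString(tp.TraceID[:]),
		hex.EncodeToString(tp.SpanID[:]),
		flags)
}

// parseTraceparent does a simplified parse of a version 00 value
// (hypothetical helper).
func parseTraceparent(s string) (traceparent, error) {
	var tp traceparent
	parts := strings.Split(s, "-")
	if len(parts) != 4 || parts[0] != "00" {
		return tp, errors.New("unsupported traceparent value")
	}
	traceID, err := hex.DecodeString(parts[1])
	if err != nil || len(traceID) != 16 {
		return tp, errors.New("invalid trace id")
	}
	spanID, err := hex.DecodeString(parts[2])
	if err != nil || len(spanID) != 8 {
		return tp, errors.New("invalid span id")
	}
	flags, err := hex.DecodeString(parts[3])
	if err != nil || len(flags) != 1 {
		return tp, errors.New("invalid flags")
	}
	copy(tp.TraceID[:], traceID)
	copy(tp.SpanID[:], spanID)
	// The lowest bit of the flags byte is the "sampled" flag.
	tp.Sampled = flags[0]&0x01 == 0x01
	return tp, nil
}

func main() {
	value := "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
	tp, err := parseTraceparent(value)
	if err != nil {
		panic(err)
	}
	fmt.Println("sampled:", tp.Sampled)
	fmt.Println("round trip ok:", formatTraceparent(tp) == value)
}
```

In real code the propagator reads and writes this value for you; the sketch is only meant to show what ends up attached to the record.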
Processing records

Use the kotel.Tracer.WithProcessSpan method to start a "process" span. Make sure to end the span after you finish processing the record. The trace can be continued to the next processing step if desired.

Here is an example of how you might do this:

func processRecord(record *kgo.Record, tracer *kotel.Tracer) {
	ctx, span := tracer.WithProcessSpan(record)
	// End the span when the function exits.
	defer span.End()
	// Optionally pass ctx to the next processing step to continue the trace.
	// It is discarded here only so that the example compiles.
	_ = ctx
	// Process the record here.
	fmt.Printf(
		"processed offset '%s' with key '%s' and value '%s'\n",
		strconv.FormatInt(record.Offset, 10),
		string(record.Key),
		string(record.Value),
	)
}

Metrics

The kotel meter module tracks various metrics related to the processing of records, such as the number of successful and unsuccessful connections, bytes written and read, and the number of buffered records. These metrics are all counters and are tracked under the following names:

messaging.kafka.connects.count{node_id = "#{node}"}
messaging.kafka.connect_errors.count{node_id = "#{node}"}
messaging.kafka.disconnects.count{node_id = "#{node}"}
messaging.kafka.write_errors.count{node_id = "#{node}"}
messaging.kafka.write_bytes{node_id = "#{node}"}
messaging.kafka.read_errors.count{node_id = "#{node}"}
messaging.kafka.read_bytes.count{node_id = "#{node}"}
messaging.kafka.produce_bytes.count{node_id = "#{node}", topic = "#{topic}"}
messaging.kafka.produce_records.count{node_id = "#{node}", topic = "#{topic}"}
messaging.kafka.fetch_bytes.count{node_id = "#{node}", topic = "#{topic}"}
messaging.kafka.fetch_records.count{node_id = "#{node}", topic = "#{topic}"}
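To make the naming scheme concrete, here is a small stdlib-only model of how counters with node_id and topic attributes accumulate independently per attribute set. The counterSet type, its methods, and seriesKey are hypothetical and only mimic the shape of the data; kotel itself records these values through OpenTelemetry instruments obtained from the meter provider.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// counterSet is a hypothetical in-memory stand-in for a group of counters,
// keyed by metric name plus attribute set.
type counterSet struct {
	values map[string]int64
}

func newCounterSet() *counterSet {
	return &counterSet{values: make(map[string]int64)}
}

// seriesKey renders a name and its attributes in the notation used in the
// list above, with attributes sorted by key for a stable result.
func seriesKey(name string, attrs map[string]string) string {
	keys := make([]string, 0, len(attrs))
	for k := range attrs {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	pairs := make([]string, 0, len(keys))
	for _, k := range keys {
		pairs = append(pairs, fmt.Sprintf("%s = %q", k, attrs[k]))
	}
	return name + "{" + strings.Join(pairs, ", ") + "}"
}

// Add increments the counter identified by name and attrs.
func (c *counterSet) Add(name string, attrs map[string]string, n int64) {
	c.values[seriesKey(name, attrs)] += n
}

// Get returns the current value of the counter identified by name and attrs.
func (c *counterSet) Get(name string, attrs map[string]string) int64 {
	return c.values[seriesKey(name, attrs)]
}

func main() {
	counters := newCounterSet()
	node1 := map[string]string{"node_id": "1", "topic": "orders"}
	node2 := map[string]string{"node_id": "2", "topic": "orders"}

	// Two produce batches to node 1 and one to node 2.
	counters.Add("messaging.kafka.produce_bytes.count", node1, 512)
	counters.Add("messaging.kafka.produce_bytes.count", node1, 256)
	counters.Add("messaging.kafka.produce_bytes.count", node2, 128)

	fmt.Println(seriesKey("messaging.kafka.produce_bytes.count", node1),
		counters.Get("messaging.kafka.produce_bytes.count", node1))
	fmt.Println(seriesKey("messaging.kafka.produce_bytes.count", node2),
		counters.Get("messaging.kafka.produce_bytes.count", node2))
}
```

Each distinct combination of node_id and topic forms its own series, which is why the per-topic metrics carry both attributes.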
Getting started

To start using kotel for metrics, you will need to:

  1. Set up a meter provider
  2. Configure any desired meter options
  3. Create a new kotel meter
  4. Create a new kotel service hook
  5. Create a new Kafka client and pass in the kotel hook

Here's an example of how you might do this:

// Initialize meter provider (initMeterProvider is your own setup function).
meterProvider, err := initMeterProvider()
if err != nil {
	panic(err)
}

// Create a new kotel meter.
meterOpts := []kotel.MeterOpt{kotel.MeterProvider(meterProvider)}
meter := kotel.NewMeter(meterOpts...)

// Pass the meter to NewKotel hook.
kotelOps := []kotel.Opt{
	kotel.WithMeter(meter),
}

// Create a new kotel service.
kotelService := kotel.NewKotel(kotelOps...)

// Create a new Kafka client.
cl, err := kgo.NewClient(
	// Pass in the kotel hook.
	kgo.WithHooks(kotelService.Hooks()...),
	// ...other opts.
)

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Kotel

type Kotel struct {
	// contains filtered or unexported fields
}

Kotel represents the configuration options available for the kotel plugin.

func NewKotel

func NewKotel(opts ...Opt) *Kotel

NewKotel creates a new Kotel struct and applies opts to it.

func (*Kotel) Hooks

func (k *Kotel) Hooks() []kgo.Hook

Hooks returns a list of kgo.Hook values that can be passed to kgo.WithHooks.

type Meter

type Meter struct {
	// contains filtered or unexported fields
}

func NewMeter

func NewMeter(opts ...MeterOpt) *Meter

NewMeter returns a Meter, used as an option for kotel to instrument franz-go with metric instruments.

func (*Meter) OnBrokerConnect

func (m *Meter) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error)

func (*Meter) OnBrokerDisconnect

func (m *Meter) OnBrokerDisconnect(meta kgo.BrokerMetadata, _ net.Conn)

func (*Meter) OnBrokerRead

func (m *Meter) OnBrokerRead(meta kgo.BrokerMetadata, _ int16, bytesRead int, _, _ time.Duration, err error)

func (*Meter) OnBrokerWrite

func (m *Meter) OnBrokerWrite(meta kgo.BrokerMetadata, _ int16, bytesWritten int, _, _ time.Duration, err error)

func (*Meter) OnFetchBatchRead

func (m *Meter) OnFetchBatchRead(meta kgo.BrokerMetadata, topic string, _ int32, fbm kgo.FetchBatchMetrics)

func (*Meter) OnProduceBatchWritten

func (m *Meter) OnProduceBatchWritten(meta kgo.BrokerMetadata, topic string, _ int32, pbm kgo.ProduceBatchMetrics)

type MeterOpt

type MeterOpt interface {
	// contains filtered or unexported methods
}

MeterOpt is an interface used for setting optional config properties.

func MeterProvider

func MeterProvider(provider metric.MeterProvider) MeterOpt

MeterProvider takes a metric.MeterProvider and applies it to the Meter. If none is specified, the global provider is used.

type Opt

type Opt interface {
	// contains filtered or unexported methods
}

Opt is an interface used for setting optional kotel properties.

func WithMeter

func WithMeter(m *Meter) Opt

WithMeter configures Kotel with a Meter.

func WithTracer

func WithTracer(t *Tracer) Opt

WithTracer configures Kotel with a Tracer.

type RecordCarrier

type RecordCarrier struct {
	// contains filtered or unexported fields
}

RecordCarrier injects and extracts traces from a kgo.Record.

This type exists to satisfy the otel/propagation.TextMapCarrier interface.

func NewRecordCarrier

func NewRecordCarrier(record *kgo.Record) RecordCarrier

NewRecordCarrier creates a new RecordCarrier.

func (RecordCarrier) Get

func (c RecordCarrier) Get(key string) string

Get retrieves a single value for a given key if it exists.

func (RecordCarrier) Keys

func (c RecordCarrier) Keys() []string

Keys returns a slice of all key identifiers in the carrier.

func (RecordCarrier) Set

func (c RecordCarrier) Set(key, val string)

Set sets a header.
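The carrier idea can be sketched without any dependencies: a thin wrapper that reads and writes trace values as record headers through the three TextMapCarrier methods (Get, Set, Keys). The header, record, and headerCarrier types below are simplified, hypothetical stand-ins for kgo.RecordHeader, kgo.Record, and RecordCarrier. Replacing an existing header on Set is an assumption of this sketch, not a statement about how kotel behaves.

```go
package main

import "fmt"

// header and record are simplified stand-ins for kgo.RecordHeader and
// kgo.Record.
type header struct {
	Key   string
	Value []byte
}

type record struct {
	Headers []header
}

// headerCarrier is a hypothetical carrier that stores values in headers.
type headerCarrier struct {
	rec *record
}

// Get returns the value of the first header with the given key, or "".
func (c headerCarrier) Get(key string) string {
	for _, h := range c.rec.Headers {
		if h.Key == key {
			return string(h.Value)
		}
	}
	return ""
}

// Set overwrites the header with the given key, or appends a new one.
// (Overwriting is an assumption made for this sketch.)
func (c headerCarrier) Set(key, val string) {
	for i, h := range c.rec.Headers {
		if h.Key == key {
			c.rec.Headers[i].Value = []byte(val)
			return
		}
	}
	c.rec.Headers = append(c.rec.Headers, header{Key: key, Value: []byte(val)})
}

// Keys lists the keys of all headers on the record.
func (c headerCarrier) Keys() []string {
	keys := make([]string, 0, len(c.rec.Headers))
	for _, h := range c.rec.Headers {
		keys = append(keys, h.Key)
	}
	return keys
}

func main() {
	rec := &record{}
	carrier := headerCarrier{rec: rec}
	carrier.Set("traceparent", "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
	fmt.Println(carrier.Keys())
	fmt.Println(carrier.Get("traceparent"))
}
```

Because the carrier holds a pointer to the record, values written on the producer side become ordinary headers that the consumer side can read back with Get.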

type Tracer

type Tracer struct {
	// contains filtered or unexported fields
}

func NewTracer

func NewTracer(opts ...TracerOpt) *Tracer

NewTracer returns a Tracer, used as an option for kotel to instrument franz-go with tracing.

func (*Tracer) OnFetchRecordBuffered

func (t *Tracer) OnFetchRecordBuffered(r *kgo.Record)

OnFetchRecordBuffered starts a new span for the "receive" operation on a buffered record.

It sets the span options and extracts the span context from the record. It then updates the record's context so that the span can be ended in the OnFetchRecordUnbuffered hook and used in downstream consumer processing.

func (*Tracer) OnFetchRecordUnbuffered

func (t *Tracer) OnFetchRecordUnbuffered(r *kgo.Record, _ bool)

OnFetchRecordUnbuffered continues and ends the "receive" span for an unbuffered record.

func (*Tracer) OnProduceRecordBuffered

func (t *Tracer) OnProduceRecordBuffered(r *kgo.Record)

OnProduceRecordBuffered starts a new span for the "publish" operation on a buffered record.

It sets the span options, injects the span context into the record, and updates the record's context so that the span can be ended in the OnProduceRecordUnbuffered hook.

func (*Tracer) OnProduceRecordUnbuffered

func (t *Tracer) OnProduceRecordUnbuffered(r *kgo.Record, err error)

OnProduceRecordUnbuffered continues and ends the "publish" span for an unbuffered record.

It sets attributes whose values were not yet set when the record was produced, and records any error that occurred during the publish operation.
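The hand-off between the two produce hooks can be pictured as follows: the first hook starts a span and stores it in the record's context, and the second hook looks it up again and ends it. This is a stdlib-only sketch of that pattern. The span and record types, the spanKey context key, and the onBuffered and onUnbuffered functions are hypothetical stand-ins; the real hooks work with OpenTelemetry spans and kgo.Record.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// span is a hypothetical stand-in for an OpenTelemetry span.
type span struct {
	name  string
	ended bool
	err   error
}

// record is a simplified stand-in for kgo.Record, which also carries a
// Context field.
type record struct {
	Topic   string
	Context context.Context
}

// spanKey is the private context key under which the span is stored.
type spanKey struct{}

// onBuffered models the "start" hook: it creates a span and attaches it to
// the record's context.
func onBuffered(r *record) *span {
	if r.Context == nil {
		r.Context = context.Background()
	}
	s := &span{name: "publish"}
	r.Context = context.WithValue(r.Context, spanKey{}, s)
	return s
}

// onUnbuffered models the "end" hook: it retrieves the span from the
// record's context, records any error, and ends the span.
func onUnbuffered(r *record, err error) {
	if r.Context == nil {
		return
	}
	s, ok := r.Context.Value(spanKey{}).(*span)
	if !ok {
		return
	}
	s.err = err
	s.ended = true
}

func main() {
	rec := &record{Topic: "topic"}
	s := onBuffered(rec)
	fmt.Println("ended after buffering:", s.ended)

	onUnbuffered(rec, errors.New("broker unavailable"))
	fmt.Println("ended after unbuffering:", s.ended, "error:", s.err)
}
```

The same shape applies to the fetch hooks, with the "receive" span started when a record is buffered and ended when it is unbuffered.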

func (*Tracer) WithProcessSpan

func (t *Tracer) WithProcessSpan(r *kgo.Record) (context.Context, trace.Span)

WithProcessSpan starts a new span for the "process" operation on a consumer record.

It sets up the span options. The user's application code is responsible for ending the span.

Call this only for records that were consumed in a polling loop, never for records created for producing. Call it at the start of each iteration of your processing for the record.
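One Go detail matters when following this advice: a defer placed directly inside a for loop does not run until the enclosing function returns, so every span would stay open until the whole batch is done. Moving the per-record work into its own function keeps each span scoped to one record. The sketch below shows the pattern with hypothetical stand-in types (span, record, tracer); with kotel, the call would be tracer.WithProcessSpan(record), which returns a context and a span.

```go
package main

import "fmt"

// span is a hypothetical stand-in for an OpenTelemetry span.
type span struct{ ended bool }

// End marks the span as finished.
func (s *span) End() { s.ended = true }

// record is a simplified stand-in for kgo.Record.
type record struct{ Value []byte }

// tracer is a hypothetical stand-in that remembers the spans it started.
type tracer struct{ started []*span }

// withProcessSpan starts a "process" span for one consumed record.
func (t *tracer) withProcessSpan(r *record) *span {
	s := &span{}
	t.started = append(t.started, s)
	return s
}

// handle processes a single record. Because the defer lives in this
// function, the span ends as soon as this record is done.
func handle(t *tracer, r *record) int {
	s := t.withProcessSpan(r)
	defer s.End()
	return len(r.Value)
}

// processBatch models one pass of a polling loop over fetched records and
// returns the total number of value bytes processed.
func processBatch(t *tracer, recs []*record) int {
	total := 0
	for _, r := range recs {
		total += handle(t, r)
	}
	return total
}

func main() {
	t := &tracer{}
	recs := []*record{{Value: []byte("ab")}, {Value: []byte("cde")}}
	fmt.Println("bytes processed:", processBatch(t, recs))
	for i, s := range t.started {
		fmt.Printf("span %d ended: %v\n", i, s.ended)
	}
}
```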

type TracerOpt

type TracerOpt interface {
	// contains filtered or unexported methods
}

TracerOpt is an interface used for setting optional config properties.

func ClientID

func ClientID(id string) TracerOpt

ClientID sets the optional client_id attribute value.

func ConsumerGroup

func ConsumerGroup(group string) TracerOpt

ConsumerGroup sets the optional group attribute value.

func KeyFormatter added in v1.1.0

func KeyFormatter(fn func(*kgo.Record) (string, error)) TracerOpt

KeyFormatter formats a Record's key for use in a span's attributes, overriding the default of string(Record.Key).

This option can be used to parse binary data and return a canonical string representation. If the returned string is not valid UTF-8 or if the formatter returns an error, the key is not attached to the span.
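As an illustration of the kind of function this option expects, here is a stdlib-only sketch of a formatter that turns a 16-byte binary key into a canonical UUID string, together with a small model of the documented rule that the key is skipped on error or invalid UTF-8. The record type is a simplified stand-in for kgo.Record, and uuidKey and keyAttribute are hypothetical names; keyAttribute is not how kotel is implemented, only a restatement of the rule above in code.

```go
package main

import (
	"encoding/hex"
	"errors"
	"fmt"
	"unicode/utf8"
)

// record is a simplified stand-in for kgo.Record.
type record struct{ Key []byte }

// uuidKey formats a 16-byte binary key as a canonical UUID string.
func uuidKey(r *record) (string, error) {
	if len(r.Key) != 16 {
		return "", errors.New("key is not a 16-byte UUID")
	}
	h := hex.EncodeToString(r.Key)
	return h[0:8] + "-" + h[8:12] + "-" + h[12:16] + "-" + h[16:20] + "-" + h[20:32], nil
}

// keyAttribute models the documented rule: the key is only attached when the
// formatter succeeds and returns valid UTF-8.
func keyAttribute(r *record, format func(*record) (string, error)) (string, bool) {
	s, err := format(r)
	if err != nil || !utf8.ValidString(s) {
		return "", false
	}
	return s, true
}

func main() {
	good := &record{Key: []byte{
		0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07,
		0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f,
	}}
	bad := &record{Key: []byte("short")}

	if s, ok := keyAttribute(good, uuidKey); ok {
		fmt.Println("attached key:", s)
	}
	if _, ok := keyAttribute(bad, uuidKey); !ok {
		fmt.Println("key skipped")
	}
}
```

With the real package, a function of the form func(*kgo.Record) (string, error) would be passed to kotel.KeyFormatter and included in the tracer options.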

func TracerPropagator

func TracerPropagator(propagator propagation.TextMapPropagator) TracerOpt

TracerPropagator takes a propagation.TextMapPropagator and applies it to the Tracer.

If none is specified, the global Propagator is used.

func TracerProvider

func TracerProvider(provider trace.TracerProvider) TracerOpt

TracerProvider takes a trace.TracerProvider and applies it to the Tracer. If none is specified, the global provider is used.
