otel

Version: v1.0.0
Published: Oct 27, 2021 License: MIT Imports: 12 Imported by: 0

README

OpenTelemetry Instrumentation

This package provides OpenTelemetry instrumentation for a Kafka producer and consumer.

The following methods are instrumented:

Producer

Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error
ProduceChannel() chan *kafka.Message

Consumer

ReadMessage(timeout time.Duration) (*kafka.Message, error)
Poll(timeoutMs int) kafka.Event

For consumers, two additional methods trace the duration of the consumer handler, which is not possible with the methods available in the original library:

ReadMessageWithHandler(timeout time.Duration, handler ConsumeFunc) (*kafka.Message, error)
PollWithHandler(timeoutMs int, handler ConsumeFunc) kafka.Event

The handler function has the following signature:

type ConsumeFunc func(consumer *kafka.Consumer, msg *kafka.Message) error
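
To illustrate why a handler-aware method is needed to measure handler duration, here is a minimal sketch that uses only the standard library. The Message and HandlerFunc types and the handleTimed function are hypothetical stand-ins, not part of this package; a tracing wrapper would presumably start a span where the timer starts and end it where the timer stops.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Message is a hypothetical stand-in for *kafka.Message.
type Message struct {
	Value []byte
}

// HandlerFunc mirrors the shape of ConsumeFunc, minus the consumer argument.
type HandlerFunc func(msg *Message) error

// errEmpty is returned by the example handler for empty payloads.
var errEmpty = errors.New("empty message")

// handleTimed runs handler on msg and reports how long the call took,
// together with the handler's error. A tracing wrapper would start a span
// before the call and end it afterwards instead of reading the clock.
func handleTimed(msg *Message, handler HandlerFunc) (time.Duration, error) {
	start := time.Now()
	err := handler(msg)
	return time.Since(start), err
}

func main() {
	handler := func(m *Message) error {
		if len(m.Value) == 0 {
			return errEmpty
		}
		return nil
	}

	elapsed, err := handleTimed(&Message{Value: []byte("hello")}, handler)
	fmt.Println(elapsed >= 0, err)
}
```

Because the caller of a plain read only receives the message after the call returns, the wrapper cannot see the processing time unless the handler is passed in, which is what the two WithHandler methods allow.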

Documentation

Types

type ConsumeFunc

type ConsumeFunc func(consumer *kafka.Consumer, msg *kafka.Message) error

type Consumer

type Consumer struct {
	*kafka.Consumer
	// contains filtered or unexported fields
}

func NewConsumerWithTracing

func NewConsumerWithTracing(consumer *kafka.Consumer, opts ...Option) *Consumer

func (*Consumer) Poll

func (c *Consumer) Poll(timeoutMs int) kafka.Event

Poll retrieves an event from the current consumer and creates a new span if the event is a kafka.Message.
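
The "only for messages" rule can be pictured as a type switch over the returned event. The sketch below uses hypothetical stand-in types (Event, Message, Error) rather than the real kafka package, and shouldTrace is an illustrative helper, not a function of this package.

```go
package main

import "fmt"

// Event is a hypothetical stand-in for kafka.Event.
type Event interface {
	String() string
}

// Message is a hypothetical stand-in for *kafka.Message.
type Message struct {
	Topic string
}

func (m *Message) String() string { return "message on " + m.Topic }

// Error is a hypothetical stand-in for a non-message event.
type Error struct {
	Reason string
}

func (e Error) String() string { return "error: " + e.Reason }

// shouldTrace reports whether an event would get a span under the rule
// "create a span only for message events". A nil event (for example, a
// poll that returned nothing) falls through to the default case.
func shouldTrace(ev Event) bool {
	switch ev.(type) {
	case *Message:
		return true
	default:
		return false
	}
}

func main() {
	events := []Event{&Message{Topic: "orders"}, Error{Reason: "timeout"}, nil}
	for _, ev := range events {
		fmt.Println(shouldTrace(ev))
	}
}
```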

func (*Consumer) PollWithHandler

func (c *Consumer) PollWithHandler(timeoutMs int, handler ConsumeFunc) kafka.Event

PollWithHandler retrieves an event from the current consumer, creates a new span if the event is a kafka.Message, and runs the given handler.

func (*Consumer) ReadMessage

func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error)

ReadMessage creates a new span and reads a Kafka message from the current consumer.

func (*Consumer) ReadMessageWithHandler

func (c *Consumer) ReadMessageWithHandler(timeout time.Duration, handler ConsumeFunc) (*kafka.Message, error)

ReadMessageWithHandler reads a message, runs the given handler, and traces the handler's execution.

type MessageCarrier

type MessageCarrier struct {
	// contains filtered or unexported fields
}

MessageCarrier injects and extracts traces from a kafka.Message.

func NewMessageCarrier

func NewMessageCarrier(msg *kafka.Message) MessageCarrier

NewMessageCarrier creates a new MessageCarrier.

func (MessageCarrier) Get

func (c MessageCarrier) Get(key string) string

Get retrieves a single value for a given key from Kafka message headers.

func (MessageCarrier) Keys

func (c MessageCarrier) Keys() []string

Keys returns all key identifiers from the Kafka message headers.

func (MessageCarrier) Set

func (c MessageCarrier) Set(key, value string)

Set sets a header on the Kafka message.
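
As a rough picture of how a header-backed carrier with Get, Keys and Set can work, here is a self-contained sketch. Header, Message and headerCarrier are hypothetical stand-ins written for this example; the real MessageCarrier may differ, in particular in how Set treats an existing header with the same key (this sketch assumes it is replaced).

```go
package main

import "fmt"

// Header is a hypothetical stand-in for kafka.Header.
type Header struct {
	Key   string
	Value []byte
}

// Message is a hypothetical stand-in for kafka.Message.
type Message struct {
	Headers []Header
}

// headerCarrier exposes a message's headers through Get, Set and Keys.
type headerCarrier struct {
	msg *Message
}

// Get returns the value of the first header matching key, or "" if absent.
func (c headerCarrier) Get(key string) string {
	for _, h := range c.msg.Headers {
		if h.Key == key {
			return string(h.Value)
		}
	}
	return ""
}

// Set drops any existing header with the same key, then appends the new one.
func (c headerCarrier) Set(key, value string) {
	kept := c.msg.Headers[:0]
	for _, h := range c.msg.Headers {
		if h.Key != key {
			kept = append(kept, h)
		}
	}
	c.msg.Headers = append(kept, Header{Key: key, Value: []byte(value)})
}

// Keys lists the keys of all headers currently on the message.
func (c headerCarrier) Keys() []string {
	keys := make([]string, 0, len(c.msg.Headers))
	for _, h := range c.msg.Headers {
		keys = append(keys, h.Key)
	}
	return keys
}

func main() {
	c := headerCarrier{msg: &Message{}}
	c.Set("traceparent", "first")
	c.Set("traceparent", "second")
	fmt.Println(c.Get("traceparent"), c.Keys())
}
```

A propagator only needs these three methods to inject trace context into outgoing headers and extract it from incoming ones.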

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option is used to configure the client.

func WithConsumerGroupID

func WithConsumerGroupID(consumerGroupID string) Option

WithConsumerGroupID specifies the consumer group ID that is used for creating a consumer.

func WithPropagator

func WithPropagator(propagator propagation.TextMapPropagator) Option

WithPropagator specifies a custom TextMapPropagator.

func WithTracerName

func WithTracerName(name string) Option

WithTracerName sets the name of the tracer.

func WithTracerProvider

func WithTracerProvider(provider oteltrace.TracerProvider) Option

WithTracerProvider specifies a tracer provider to use for creating a tracer. If none is specified, the global provider is used.
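
An Option interface with only unexported methods is typically a sign of the functional options pattern. The sketch below reimplements that pattern from scratch to show how such options are usually applied; the config struct, its field names, the default tracer name and newConfig are assumptions made for illustration and are not taken from this package's source.

```go
package main

import "fmt"

// config holds hypothetical settings; the field names are assumptions.
type config struct {
	tracerName      string
	consumerGroupID string
}

// Option has an unexported method, so only this package can implement it.
type Option interface {
	apply(*config)
}

// optionFunc adapts a plain function to the Option interface.
type optionFunc func(*config)

func (f optionFunc) apply(c *config) { f(c) }

// WithTracerName is a local re-creation of an option that sets the tracer name.
func WithTracerName(name string) Option {
	return optionFunc(func(c *config) { c.tracerName = name })
}

// WithConsumerGroupID is a local re-creation of an option that sets the group ID.
func WithConsumerGroupID(id string) Option {
	return optionFunc(func(c *config) { c.consumerGroupID = id })
}

// newConfig starts from defaults and applies each option in order.
func newConfig(opts ...Option) config {
	c := config{tracerName: "default-tracer"}
	for _, o := range opts {
		o.apply(&c)
	}
	return c
}

func main() {
	c := newConfig(WithConsumerGroupID("orders"))
	fmt.Println(c.tracerName, c.consumerGroupID)
}
```

Options that are not passed keep their defaults, which matches the documented behaviour of falling back to the global provider when no tracer provider is given.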

type Producer

type Producer struct {
	*kafka.Producer
	// contains filtered or unexported fields
}

func NewProducerWithTracing

func NewProducerWithTracing(producer *kafka.Producer, opts ...Option) *Producer

func (*Producer) Close

func (p *Producer) Close()

func (*Producer) Events

func (p *Producer) Events() chan kafka.Event

Events returns the events channel.

func (*Producer) Produce

func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error

Produce creates a new span and produces the given Kafka message synchronously using the original producer.

func (*Producer) ProduceChannel

func (p *Producer) ProduceChannel() chan *kafka.Message

ProduceChannel creates a new span for every message sent into the channel and forwards the message to the original producer channel.
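
One way to picture this forwarding is a goroutine that reads from a wrapper channel, runs a per-message hook, and then passes the message on. The sketch below uses a hypothetical Message type and a hypothetical forward helper; the hook stands in for span creation, and the done channel exists only so the example can wait for the goroutine to finish.

```go
package main

import "fmt"

// Message is a hypothetical stand-in for *kafka.Message.
type Message struct {
	Value string
}

// forward returns an input channel. Every message sent to it is passed to
// onMessage (where a span could be created) and then forwarded to dst.
// The returned done channel is closed once the input channel has been
// closed and fully drained.
func forward(dst chan<- *Message, onMessage func(*Message)) (chan *Message, <-chan struct{}) {
	in := make(chan *Message)
	done := make(chan struct{})
	go func() {
		defer close(done)
		for msg := range in {
			onMessage(msg)
			dst <- msg
		}
	}()
	return in, done
}

func main() {
	dst := make(chan *Message, 2) // plays the role of the original producer channel
	seen := 0
	in, done := forward(dst, func(*Message) { seen++ })

	in <- &Message{Value: "a"}
	in <- &Message{Value: "b"}
	close(in)
	<-done

	fmt.Println(seen, len(dst))
}
```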

Directories

Path Synopsis
otelconfluent module
example Module
