otelconfluent

package module
v0.0.0-...-5f8b02f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 18, 2021 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConsumeFunc

type ConsumeFunc func(consumer *kafka.Consumer, msg *kafka.Message) error

type Consumer

type Consumer struct {
	*kafka.Consumer
	// contains filtered or unexported fields
}

func NewConsumerWithTracing

func NewConsumerWithTracing(consumer *kafka.Consumer, opts ...Option) *Consumer

func (*Consumer) Poll

func (c *Consumer) Poll(timeoutMs int) kafka.Event

Poll retrieves an event from current consumer and creates a new span if it is a kafka.Message event type.

func (*Consumer) PollWithHandler

func (c *Consumer) PollWithHandler(timeoutMs int, handler ConsumeFunc) kafka.Event

PollWithHandler retrieves an event from current consumer, creates a new span if it is a kafka.Message event type and also runs the given handler.

func (*Consumer) ReadMessage

func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error)

ReadMessage creates a new span and reads a Kafka message from current consumer.

func (*Consumer) ReadMessageWithHandler

func (c *Consumer) ReadMessageWithHandler(timeout time.Duration, handler ConsumeFunc) (*kafka.Message, error)

ReadMessageWithHandler reads a message and runs the given handler by tracing it.

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option is used to configure the client.

func WithConsumerGroupID

func WithConsumerGroupID(consumerGroupID string) Option

WithConsumerGroupID specifies the consumer group ID that is used for creating a consumer.

func WithTracerProvider

func WithTracerProvider(provider oteltrace.TracerProvider) Option

WithTracerProvider specifies a tracer provider to use for creating a tracer. If none is specified, the global provider is used.

type Producer

type Producer struct {
	*kafka.Producer
	// contains filtered or unexported fields
}

func NewProducerWithTracing

func NewProducerWithTracing(producer *kafka.Producer, opts ...Option) *Producer

func (*Producer) Close

func (p *Producer) Close()

func (*Producer) Events

func (p *Producer) Events() chan kafka.Event

Events returns the channel events

func (*Producer) Produce

func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error

Produce creates a new span and produces the given Kafka message synchronously using the original producer.

func (*Producer) ProduceChannel

func (p *Producer) ProduceChannel() chan *kafka.Message

ProduceChannel creates a new span for every messages sent into the channel and forwards to the original producer channel.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL