Documentation
Functions ¶
func ExtractSpanContext ¶
func ExtractSpanContext(msg kafka.Message) (*tracer.SpanContext, error)
ExtractSpanContext retrieves the SpanContext from a kafka.Message
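To illustrate what retrieving a span context from a message can involve, here is a minimal, standard-library-only sketch. It is not the package's implementation. `Message` and `Header` are stand-ins for the kafka-go types, `SpanIDs` and `extractIDs` are hypothetical names, and the header keys are an assumption based on Datadog's usual propagation headers.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// Header is a stand-in for kafka.Header (a key with a byte-slice value).
type Header struct {
	Key   string
	Value []byte
}

// Message is a stand-in for kafka.Message; only Headers matters here.
type Message struct {
	Headers []Header
}

// SpanIDs is a hypothetical, simplified stand-in for a span context.
type SpanIDs struct {
	TraceID  uint64
	ParentID uint64
}

// Assumed header keys; the real integration may use different ones.
const (
	traceIDKey  = "x-datadog-trace-id"
	parentIDKey = "x-datadog-parent-id"
)

var errNoSpanContext = errors.New("span context not found in message headers")

// extractIDs scans the message headers for the trace and parent IDs and
// returns an error when either is missing or is not a decimal uint64.
func extractIDs(msg Message) (*SpanIDs, error) {
	var ids SpanIDs
	var haveTrace, haveParent bool
	for _, h := range msg.Headers {
		switch h.Key {
		case traceIDKey:
			v, err := strconv.ParseUint(string(h.Value), 10, 64)
			if err != nil {
				return nil, fmt.Errorf("invalid trace id: %w", err)
			}
			ids.TraceID, haveTrace = v, true
		case parentIDKey:
			v, err := strconv.ParseUint(string(h.Value), 10, 64)
			if err != nil {
				return nil, fmt.Errorf("invalid parent id: %w", err)
			}
			ids.ParentID, haveParent = v, true
		}
	}
	if !haveTrace || !haveParent {
		return nil, errNoSpanContext
	}
	return &ids, nil
}

func main() {
	msg := Message{Headers: []Header{
		{Key: traceIDKey, Value: []byte("123")},
		{Key: parentIDKey, Value: []byte("456")},
	}}
	ids, err := extractIDs(msg)
	if err != nil {
		fmt.Println("extract failed:", err)
		return
	}
	fmt.Println(ids.TraceID, ids.ParentID)
}
```

As with ExtractSpanContext, callers should check the returned error before starting a child span, since messages produced by untraced clients carry no tracing headers.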
Types ¶
type KafkaWriter ¶
type KafkaWriter struct { *kafka.Writer // contains filtered or unexported fields }
KafkaWriter wraps a kafka.Writer with tracing config data.
func NewWriter ¶
func NewWriter(conf kafka.WriterConfig, opts ...Option) *KafkaWriter
NewWriter calls kafka.NewWriter and wraps the resulting Writer.
func WrapWriter ¶
func WrapWriter(w *kafka.Writer, opts ...Option) *KafkaWriter
WrapWriter wraps a kafka.Writer so requests are traced.
func (*KafkaWriter) WriteMessages ¶
func (w *KafkaWriter) WriteMessages(ctx context.Context, msgs ...kafka.Message) error
WriteMessages calls kafka.Writer.WriteMessages and traces the requests.
type Option ¶
Option describes options for the Kafka integration.
func WithAnalytics ¶
WithAnalytics enables Trace Analytics for all started spans.
func WithAnalyticsRate ¶
WithAnalyticsRate sets the sampling rate for Trace Analytics events correlated to started spans.
func WithDataStreams ¶
func WithDataStreams() Option
WithDataStreams enables the Data Streams monitoring product features: https://www.datadoghq.com/product/data-streams-monitoring/
func WithService ¶
WithService sets the config service name to serviceName.
type OptionFn ¶
OptionFn represents options applicable to NewReader, NewWriter, WrapReader and WrapWriter.
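Options such as WithService, WithAnalyticsRate and WithDataStreams follow Go's functional-options pattern. The sketch below shows that pattern in isolation, using only the standard library. The `config` struct, its fields, its defaults, and the parameter lists of the option functions are assumptions made for illustration; they are not taken from this package.

```go
package main

import "fmt"

// config is a hypothetical stand-in for the integration's settings.
type config struct {
	serviceName   string
	analyticsRate float64 // 0 means analytics is disabled in this sketch
	dataStreams   bool
}

// Option mutates a config; each WithX function returns one.
type Option func(*config)

// WithService sets the service name (assumed signature).
func WithService(serviceName string) Option {
	return func(c *config) { c.serviceName = serviceName }
}

// WithAnalytics turns analytics fully on or off (assumed signature).
func WithAnalytics(on bool) Option {
	return func(c *config) {
		if on {
			c.analyticsRate = 1.0
		} else {
			c.analyticsRate = 0
		}
	}
}

// WithAnalyticsRate sets a sampling rate in [0, 1]; values outside
// that range disable analytics in this sketch.
func WithAnalyticsRate(rate float64) Option {
	return func(c *config) {
		if rate >= 0 && rate <= 1 {
			c.analyticsRate = rate
		} else {
			c.analyticsRate = 0
		}
	}
}

// WithDataStreams enables the data-streams flag.
func WithDataStreams() Option {
	return func(c *config) { c.dataStreams = true }
}

// newConfig starts from defaults and applies the options in order,
// so later options override earlier ones.
func newConfig(opts ...Option) *config {
	cfg := &config{serviceName: "kafka"} // assumed default name
	for _, opt := range opts {
		opt(cfg)
	}
	return cfg
}

func main() {
	cfg := newConfig(WithService("orders"), WithAnalyticsRate(0.25), WithDataStreams())
	fmt.Println(cfg.serviceName, cfg.analyticsRate, cfg.dataStreams)
}
```

The variadic `opts ...Option` parameter is what lets constructors such as NewWriter and WrapWriter accept zero or more options without a separate configuration struct.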
type Reader ¶
type Reader struct { *kafka.Reader // contains filtered or unexported fields }
A Reader wraps a kafka.Reader.
Example ¶
r := kafkatrace.NewReader(kafka.ReaderConfig{
	Brokers:        []string{"localhost:9092"},
	Topic:          "some-topic",
	GroupID:        "group-id",
	SessionTimeout: 30 * time.Second,
})
msg, err := r.ReadMessage(context.Background())
if err != nil {
	log.Fatal("Failed to read message: ", err)
}

// Create a child span using the span ID and trace ID in the message headers.
spanContext, err := kafkatrace.ExtractSpanContext(msg)
if err != nil {
	log.Fatal("Failed to extract span context from carrier: ", err)
}
operationName := "child-span"
s := tracer.StartSpan(operationName, tracer.ChildOf(spanContext))
defer s.Finish()
func WrapReader ¶
WrapReader wraps a kafka.Reader so that any consumed events are traced.
func (*Reader) Close ¶
Close calls the underlying Reader.Close and, if polling is enabled, finishes any remaining span.
func (*Reader) FetchMessage ¶
FetchMessage reads and returns the next message from the reader. The message is traced.