datadog

package module
Version: v0.22.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 26, 2021 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Examples

Constants

View Source
const (
	// DefaultStatsAddrUDP specifies the default protocol (UDP) and address
	// for the DogStatsD service.
	DefaultStatsAddrUDP = "localhost:8125"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Exporter

type Exporter struct {
	// contains filtered or unexported fields
}

Exporter forwards metrics to a DataDog agent

Example
package main

import (
	"context"
	"fmt"
	"net"
	"os"
	"strings"
	"time"

	"github.com/DataDog/datadog-go/statsd"

	"go.opentelemetry.io/contrib/exporters/metric/datadog"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
	"go.opentelemetry.io/otel/metric/global"
	controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
	"go.opentelemetry.io/otel/sdk/metric/processor/basic"
	"go.opentelemetry.io/otel/sdk/metric/selector/simple"
)

type TestUDPServer struct {
	*net.UDPConn
}

func main() {
	const testHostPort = ":8159"
	selector := simple.NewWithInexpensiveDistribution()
	exp, err := datadog.NewExporter(datadog.Options{
		StatsAddr:     testHostPort,
		Tags:          []string{"env:dev"},
		StatsDOptions: []statsd.Option{statsd.WithoutTelemetry()},
	})
	if err != nil {
		panic(err)
	}
	s, err := getTestServer(testHostPort)
	if err != nil {
		panic(err)
	}
	defer s.Close()

	go func() {
		defer exp.Close()
		processor := basic.New(selector, exp)
		pusher := controller.New(processor, controller.WithExporter(exp), controller.WithCollectPeriod(time.Second*10))
		ctx := context.Background()
		err := pusher.Start(ctx)
		if err != nil {
			panic(err)
		}
		defer func() { handleErr(pusher.Stop(ctx)) }()
		global.SetMeterProvider(pusher.MeterProvider())
		meter := global.Meter("marwandist")
		m := metric.Must(meter).NewInt64ValueRecorder("myrecorder")
		meter.RecordBatch(context.Background(), []attribute.KeyValue{attribute.Int("l", 1)},
			m.Measurement(1), m.Measurement(50), m.Measurement(100))
	}()

	statsChan := make(chan []byte, 1)
	timedOutChan, stopChan := make(chan struct{}), make(chan struct{})
	defer close(stopChan)

	go s.ReadPackets(statsChan, 500*time.Millisecond, timedOutChan, stopChan)

	for {
		select {
		case d := <-statsChan:
			// only look for "max" value, since we don't want to rely on
			// specifics of OpenTelemetry aggregator calculations
			// "max" is something that will always exist and always be the same
			statLine := string(d)
			if strings.HasPrefix(statLine, "myrecorder.max") {
				fmt.Println(statLine)
				return
			}
		case <-timedOutChan:
			_, _ = fmt.Fprintln(os.Stderr, "Server timed out waiting for packets")
			return
		case <-time.After(1 * time.Second):
			fmt.Println("no data received after 1 second")
			return
		}
	}

}

func getTestServer(addr string) (*TestUDPServer, error) {
	udpAddr, err := net.ResolveUDPAddr("udp", addr)
	if err != nil {
		return nil, err
	}

	conn, err := net.ListenUDP("udp", udpAddr)
	if err != nil {
		return nil, err
	}
	return &TestUDPServer{conn}, nil
}

// ReadPackets reads one "transmission" at a time from the UDP connection
//
// In the case of StatsD, there is one transmission per stat.
// If there is nothing on the connection for longer than maxIdleTime, the
// routine will return, assuming that everything has been sent
// doneChan is an output channel that is closed when ReadPackets returns
// stopChan is an input channel that tells ReadPackets to exit
func (s TestUDPServer) ReadPackets(
	xferChan chan []byte,
	maxIdleTime time.Duration,
	doneChan chan<- struct{},
	stopChan <-chan struct{}) {

	const readTimeout = 50 * time.Millisecond
	var timeouts int

	buffer := make([]byte, 1500)
	defer close(doneChan)
	n := 0
	for {
		select {
		case <-stopChan:
			return
		default:
			_ = s.SetReadDeadline(time.Now().Add(readTimeout))
			nn, _, err := s.ReadFrom(buffer[n:])
			if err == nil {
				timeouts = 0
				data := make([]byte, nn)
				_ = copy(data, buffer[n:n+nn])
				xferChan <- data
				n += nn
				continue
			} else {
				if nerr, ok := err.(*net.OpError); ok && nerr.Timeout() {
					timeouts++
					if time.Duration(timeouts)*readTimeout > maxIdleTime {
						return
					}
					continue
				}
				// give connection some time to connect
				time.Sleep(2 * time.Millisecond)
			}

		}
	}
}

func handleErr(err error) {
	if err != nil {
		fmt.Println("Encountered error: ", err.Error())
	}
}
Output:

myrecorder.max:100|g|#env:dev,l:1,service.name:unknown_service:datadog.test,telemetry.sdk.language:go,telemetry.sdk.name:opentelemetry,telemetry.sdk.version:1.0.0-RC1

func NewExporter

func NewExporter(opts Options) (*Exporter, error)

NewExporter exports to a datadog client

func (*Exporter) Close

func (e *Exporter) Close() error

Close cloess the underlying datadog client which flushes any pending buffers

func (*Exporter) Export

func (e *Exporter) Export(ctx context.Context, cs export.CheckpointSet) error

func (*Exporter) ExportKindFor

ExportKindFor returns export.DeltaExporter for statsd-derived exporters

type Options

type Options struct {
	// StatsAddr specifies the host[:port] address for DogStatsD. It defaults
	// to DefaultStatsAddrUDP.
	StatsAddr string

	// Tags specifies a set of global tags to attach to each metric.
	Tags []string

	// UseDistribution uses a DataDog Distribution type instead of Histogram
	UseDistribution bool

	// MetricNameFormatter lets you customize the metric name that gets sent to
	// datadog before exporting
	MetricNameFormatter func(namespace, name string) string

	// StatsD specific Options
	StatsDOptions []statsd.Option
}

Options contains options for configuring the exporter.

Source Files

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to