package module
Version: v2.0.0+incompatible Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Jan 9, 2018 License: MIT Imports: 60 Imported by: 0


Build Status GoDoc

Veneur (/vɛnˈʊr/, rhymes with “assure”) is a distributed, fault-tolerant pipeline for runtime data. It provides a server implementation of the DogStatsD protocol or SSF for aggregating metrics and sending them to downstream storage to one or more supported sinks. It can also act as a global aggregator for histograms, sets and counters.

More generically, Veneur is a convenient sink for various observability primitives.

See also:

  • A unified, standard format for observability primitives, the SSF
  • A proxy for resilient distributed aggregation, veneur-proxy
  • A command line tool for emitting metrics, veneur-emit


Veneur is currently handling all metrics for Stripe and is considered production ready. It is under active development and maintenance! Starting with v1.6, Veneur operates on a six-week release cycle, and all releases are tagged in git.

Building Veneur requires Go 1.8 or later.


We wanted percentiles, histograms and sets to be global. Veneur helps us do that!

Veneur is a DogStatsD implementation that acts as a local collector and — optionally — as an aggregator for some metric types, such that the metrics are global rather than host-local. This is particularly useful for histograms, timers and sets, as in their normal, per-host configuration the percentiles for histograms can be less effective or even meaningless. Per-host unique sets are also often not what's desired.

Global *StatsD installations can be problematic, as they either require client-side or proxy sharding behavior to prevent an instance being a Single Point of Failure (SPoF) for all metrics. Veneur aims to solve this problem. Non-global metrics like counters and gauges are collected by a local Veneur instance and sent to storage at flush time. Global metrics (histograms and sets) are forwarded to a central Veneur instance for aggregation before being sent to storage.

How Veneur Is Different Than Official DogStatsD

Veneur is different for a few reasons. They are enumerated here.


Veneur adheres to the official DogStatsD datagram format with the exceptions below:

  • The tag veneurlocalonly is stripped and influences forwarding behavior, as discussed below.
  • The tag veneurglobalonly is stripped and influences forwarding behavior, as discussed below.

Global Aggregation

If configured to do so, Veneur can selectively aggregate global metrics to be cumulative across all instances that report to a central Veneur, allowing global percentile calculation and global set counts.

For example, say you emit a timer from 20 hosts that are configured to forward to a central veneur. In Datadog you'll see the following:

  • Metrics that have been "globalized"
    • the p50 across all hosts, by tag
    • the p90 across all hosts, by tag
    • the p95 across all hosts, by tag
    • the p99 across all hosts, by tag
  • Metrics that remain host-local
    • by-host tagged average
    • by-host tagged count which (when summed) shows the total count of times this metric was emitted
    • by-host tagged maximum value
    • by-host tagged median value
    • by-host tagged minimum value
    • by-host tagged sum value representing the total time

Clients can choose to override this behavior by including the tag veneurlocalonly.

Approximate Histograms

Because Veneur is built to handle lots and lots of data, it uses approximate histograms. We have our own implementation of Dunning's t-digest, which has bounded memory consumption and reduced error at extreme quantiles. Metrics are consistently routed to the same worker to distribute load and to be added to the same histogram.

Datadog's DogStatsD — and StatsD — uses an exact histogram which retains all samples and is reset every flush period. This means that there is a loss of precision when using Veneur, but the resulting percentile values are meant to be more representative of a global view.

Approximate Sets

Veneur uses HyperLogLogs for approximate unique sets. These are a very efficient unique counter with fixed memory consumption.

Global Counters

Via an optional magic tag Veneur will forward counters to a global host for accumulation. This feature was primarily developed to control tag cardinality. Some counters are valuable but do not require per-host tagging.

Lack of Host Tags for Aggregated Metrics

By definition the hostname is not applicable to global metrics that Veneur processes. Note that if you do include a hostname tag, Veneur will not strip it for you. Veneur will add its own hostname as configured to metrics sent to Datadog.


Veneur expires all metrics on each flush. If a metric is no longer being sent (or is sent sparsely) Veneur will not send it as zeros! This was chosen because the combination of the approximation's features and the additional hysteresis imposed by retaining these approximations over time was deemed more complex than desirable.


  • Global metrics are those that benefit from being aggregated for chunks — or all — of your infrastructure. These are histograms (including the percentiles generated by timers) and sets.
  • Metrics that are sent to another Veneur instance for aggregation are said to be "forwarded". This terminology helps to decipher configuration and metric options below.
  • Flushed, in Veneur, means metrics sent to Datadog.

By Metric Type Behavior

To clarify how each metric type behaves in Veneur, please use the following:

  • Counters: Locally accrued, flushed to Datadog (see magic tags for global version)
  • Gauges: Locally accrued, flushed to Datadog (see magic tags for global version)
  • Histograms: Locally accrued, count, max and min flushed to Datadog, percentiles forwarded to forward_address for global aggregation when set.
  • Timers: Locally accrued, count, max and min flushed to Datadog, percentiles forwarded to forward_address for global aggregation when set.
  • Sets: Locally accrued, forwarded to forward_address for global aggregation when set.

Other Notes

  • Veneur aligns its flush timing with the local clock. For the default interval of 10s Veneur will generally emit metrics at 00, 10, 20, 30, … seconds after the minute.
  • Veneur will delay it's first metric emission to align the clock as stated above. This may result in a brief quiet period on a restart at worst < interval seconds long.


veneur -f example.yaml

See example.yaml for a sample config. Be sure to set your Datadog API key!


Veneur includes optional plugins to extend its capabilities. These plugins are enabled via configuration options. Please consult each plugin's README for more information:

  • S3 Plugin - Emit flushed metrics as a TSV file to Amazon S3


Here we'll document some explanations of setup choices you may make when using Veneur.

With Datadog

Veneur can act as a replacement for the stock DogStatsD. In our environment we disable the Datadog DogStatsd port by setting use_dogstatsd to false. We run Veneur on an alternate port (8200) so as to not accidentally ingest metrics from legacy StatsD clients and we configure our Datadog agent to use this port by dogstatsd_port to 8200 to match.


Veneur is capable of ingesting:

  • DogStatsD including events and service checks
  • SSF (experimental)
  • StatsD as a subset of DogStatsD, but this may cause trouble depending on where you store your metrics.

To use clients with Veneur you need only configure your client of choice to the proper host and port combination. This port should match one of:

  • udp_address for UDP-based clients
  • tcp_address for TCP-based clients
  • ssf_address for SSF-based clients

Einhorn Usage

When you upgrade Veneur (deploy, stop, start with new binary) there will be a brief period where Veneur will not be able to handle HTTP requests. At Stripe we use Einhorn as a shared socket manager to bridge the gap until Veneur is ready to handle HTTP requests again.

You'll need to consult Einhorn's documentation for installation, setup and usage. But once you've done that you can tell Veneur to use Einhorn by setting http_address to einhorn@0. This informs goji/bind to use its Einhorn handling code to bind to the file descriptor for HTTP.


Veneur instances can be configured to forward their global metrics to another Veneur instance. You can use this feature to get the best of both worlds: metrics that benefit from global aggregation can be passed up to a single global Veneur, but other metrics can be published locally with host-scoped information. Note: Forwarding adds an additional delay to metric availability corresponding to the value of the interval configuration option, as the local veneur will flush it to its configured upstream, which will then flush any recieved metrics when its interval expires.

If a local instance receives a histogram or set, it will publish the local parts of that metric (the count, min and max) directly to DataDog, but instead of publishing percentiles, it will package the entire histogram and send it to the global instance. The global instance will aggregate all the histograms together and publish their percentiles to DataDog.

Note that the global instance can also receive metrics over UDP. It will publish a count, min and max for the samples that were sent directly to it, but not counting any samples from other Veneur instances (this ensures that things don't get double-counted). You can even chain multiple levels of forwarding together if you want. This might be useful if, for example, your global Veneur is under too much load. The root of the tree will be the Veneur instance that has an empty forward_address. (Do not tell a Veneur instance to forward metrics to itself. We don't support that and it doesn't really make sense in the first place.)

With respect to the tags configuration option, the tags that will be added are those of the Veneur that actually publishes to DataDog. If a local instance forwards its histograms and sets to a global instance, the local instance's tags will not be attached to the forwarded structures. It will still use its own tags for the other metrics it publishes, but the percentiles will get extra tags only from the global instance.


To improve availability, you can leverage veneur-proxy in conjunction with Consul service discovery.

The proxy can be configured to query the Consul API for instances of a service using consul_forward_service_name. Each healthy instance is then entered in to a hash ring. When choosing which host to forward to, Veneur will use a combination of metric name and tags to consistently choose the same host for forwarding.

See more documentation for Proxy Veneur.

Static Configuration

For static configuration you need one Veneur, which we'll call the global instance, and one or more other Veneurs, which we'll call local instances. The local instances should have their forward_address configured to the global instance's http_address. The global instance should have an empty forward_address (ie just don't set it). You can then report metrics to any Veneur's udp_address as usual.

Magic Tag

If you want a metric to be strictly host-local, you can tell Veneur not to forward it by including a veneurlocalonly tag in the metric packet, eg foo:1|h|#veneurlocalonly. This tag will not actually appear in DataDog; Veneur removes it.

Global Counters And Gauges

Relatedly, if you want to forward a counter or gauge to the global Veneur instance to reduce tag cardinality, you can tell Veneur to flush it to the global instance by including a veneurglobalonly tag in the metric's packet. This veneurglobalonly tag is stripped and will not be passed on to sinks.

Note: For global counters to report correctly, the local and global Veneur instances should be configured to have the same flush interval.

Note: Global gauges are "random write wins" since they are merged in a non-deterministic order at the global Veneur.

Hostname and Device

Veneur also honors the same "magic" tags that the dogstatsd daemon includes in the datadog agent. The tag host will override Hostname in the metric and device will override DeviceName.

Routing metrics

Veneur supports specifying that metrics should only be routed to a specific metric sink, with the veneursinkonly:<sink_name> tag. The <sink_name> value can be any configured metric sink. Currently, that's datadog, kafka, signalfx. It's possible to specify multiple sink destination tags on a metric, which will cause the metric to be routed to each sink specified.


Veneur expects to have a config file supplied via -f PATH. The included example.yaml outlines the options:

  • statsd_listen_addresses - The address(es) on which to listen for metrics, in URI form. Examples: udp://, tcp:// DogStatsD listens on 8125, so you might want to choose a different port.
  • forward_address - The address of an upstream Veneur to forward metrics to. See below.
  • interval - How often to flush. Something like 10s seems good. Note: If you change this, it breaks all kinds of things on Datadog's side. You'll have to change all your metric's metadata.
  • stats_address - The address to send internally generated metrics. Probably In practice this means you'll be sending metrics to yourself. This is expected!
  • http_address - The address to serve HTTP healthchecks and other endpoints. This can be a simple ip:port combination like If you're under einhorn, you probably want einhorn@0.
Metrics configuration
  • hostname - The hostname to be tagged on each metric sent. Defaults to os.Hostname()
  • omit_empty_hostname - If true and hostname is empty ("") Veneur will not add a host tag to its own metrics.
  • tags - Tags to add to every metric that is sent to Veneur. Expects an array of strings!
  • percentiles - The percentiles to generate from our timers and histograms. Specified as array of float64s
  • aggregates - The aggregates to generate from our timers and histograms. Specified as array of strings, choices: min, max, median, avg, count, sum. Default: min, max, count
  • num_workers - The number of worker goroutines to start.
  • num_readers - The number of reader goroutines to start. Veneur supports SO_REUSEPORT on Linux to scale to multiple readers. On other platforms, this should always be 1; other values will probably cause errors at startup. See below.
  • metric_max_length - How big a buffer to allocate for incoming metric lengths. Metrics longer than this will get truncated!
  • trace_max_length_bytes - How big a buffer to allocate for incoming traces
  • ssf_buffer_size - The number of SSF packets that can be processed per flush interval
  • read_buffer_size_bytes - The size of the buffer we'll use to buffer socket reads. Tune this if you think Veneur needs more room to keep up with all packets.
  • flush_max_per_body - how many metrics to include in each JSON body POSTed to Datadog. Veneur will POST multiple bodies in parallel if it goes over this limit. A value around 5k-10k is recommended; in practice we've seen Datadog reject bodies over about 195k.
  • debug - Should we output lots of debug info? :)
  • sentry_dsn A DSN for Sentry, where errors will be sent when they happen.
  • enable_profiling - Enables Go profiling
  • datadog_api_key - Your Datadog API key
  • datadog_api_hostname - The Datadog API URL to post to. Probably
  • datadog_trace_api_address - The hostname to send Datadog traces to
  • trace_lightstep_access_token - The access token for sending to LightStep
  • trace_lightstep_collector_host - The hostname to send trace data to
  • trace_lightstep_reconnect_period - How often to reconnect to LightStep collectors
  • aws_access_key_id - The AWS access key ID, used in conjunction with aws_secret_access_key to authenticate to AWS
  • aws_secret_access_key - The AWS secret access key, used in conjunction with aws_access_key_id to authenticate to AWS
  • aws_region - The region to write to
  • aws_s3_bucket - The bucket to write to
  • flush_file - The local file path to write metrics to

Configuration via Environment Variables

Veneur and veneur-proxy each allow configuration via environment variables using envconfig. Options provided via environment variables take precedent over those in config. This allows stuff like:

VENEUR_DEBUG=true veneur -f someconfig.yml

Note: The environment variables used for configuration map to the field names in config.go, capitalized, with the prefix VENEUR_. For example, the environment variable equivalent of api_hostname is VENEUR_APIHOSTNAME.

You may specify configurations that are arrays by separating them with a comma, for example VENEUR_AGGREGATES="min,max"


Here are the important things to monitor with Veneur:

At Local Node

When running as a local instance, you will be primarily concerned with the following metrics:

  • veneur.flush*.error_total as a count of errors when flushing metrics to Datadog. This should rarely happen. Occasional errors are fine, but sustained is bad.
  • veneur.flush.total_duration_ns and veneur.flush.total_duration_ns.count. These metrics track the per-host time spent performing a flush to Datadog. The time should be minimal!

If you are forwarding metrics to central Veneur, you'll want to monitor these:

  • veneur.forward.error_total and the cause tag. This should pretty much never happen and definitely not be sustained.
  • veneur.forward.duration_ns and veneur.forward.duration_ns.count. These metrics track the per-host time spent performing a forward. The time should be minimal!

At Global Node

When forwarding you'll want to also monitor the global nodes you're using for aggregation:

  • veneur.import.request_error_total and the cause tag. This should pretty much never happen and definitely not be sustained.
  • veneur.import.response_duration_ns and veneur.import.response_duration_ns.count to monitor duration and number of received forwards. This should not fail and not take very long. How long it takes will depend on how many metrics you're forwarding.
  • And the same veneur.flush.* metrics from the "At Local Node" section.


Veneur will emit metrics to the stats_address configured above in DogStatsD form. Those metrics are:

  • veneur.packet.error_total - Number of packets that Veneur could not parse due to some sort of formatting error by the client. Tagged by packet_type and reason.
  • veneur.flush.post_metrics_total - The total number of time-series points that will be submitted to Datadog via POST. Datadog's rate limiting is roughly proportional to this number.
  • veneur.forward.post_metrics_total - Indicates how many metrics are being forwarded in a given POST request. A "metric", in this context, refers to a unique combination of name, tags and metric type.
  • veneur.*.content_length_bytes.* - The number of bytes in a single POST body. Remember that Veneur POSTs large sets of metrics in multiple separate bodies in parallel. Uses a histogram, so there are multiple metrics generated depending on your local DogStatsD config.
  • veneur.flush.duration_ns - Time taken for a single POST transaction to the Datadog API. Tagged by part for each sub-part marshal (assembling the request body) and post (blocking on an HTTP response).
  • veneur.forward.duration_ns - Same as flush.duration_ns, but for forwarding requests.
  • veneur.flush.total_duration_ns - Total time spent POSTing to Datadog, across all parallel requests. Under most circumstances, this should be roughly equal to the total veneur.flush.duration_ns. If it's not, then some of the POSTs are happening in sequence, which suggests some kind of goroutine scheduling issue.
  • veneur.flush.error_total - Number of errors received POSTing to Datadog.
  • veneur.forward.error_total - Number of errors received POSTing to an upstream Veneur. See also import.request_error_total below.
  • veneur.flush.worker_duration_ns - Per-worker timing — tagged by worker - for flush. This is important as it is the time in which the worker holds a lock and is unavailable for other work.
  • veneur.gc.number - Number of completed GC cycles.
  • veneur.gc.pause_total_ns - Total seconds of STW GC since the program started.
  • veneur.mem.heap_alloc_bytes - Total number of reachable and unreachable but uncollected heap objects in bytes.
  • veneur.worker.metrics_processed_total - Total number of metric packets processed between flushes by workers, tagged by worker. This helps you find hot spots where a single worker is handling a lot of metrics. The sum across all workers should be approximately proportional to the number of packets received.
  • veneur.worker.metrics_flushed_total - Total number of metrics flushed at each flush time, tagged by metric_type. A "metric", in this context, refers to a unique combination of name, tags and metric type. You can use this metric to detect when your clients are introducing new instrumentation, or when you acquire new clients.
  • veneur.worker.metrics_imported_total - Total number of metrics received via the importing endpoint. A "metric", in this context, refers to a unique combination of name, tags, type and originating host. This metric indicates how much of a Veneur instance's load is coming from imports.
  • veneur.import.response_duration_ns - Time spent responding to import HTTP requests. This metric is broken into part tags for request (time spent blocking the client) and merge (time spent sending metrics to workers).
  • veneur.import.request_error_total - A counter for the number of import requests that have errored out. You can use this for monitoring and alerting when imports fail.

Error Handling

In addition to logging, Veneur will dutifully send any errors it generates to a Sentry instance. This will occur if you set the sentry_dsn configuration option. Not setting the option will disable Sentry reporting.


Processing packets quickly is the name of the game.


The common use case for Veneur is as an aggregator and host-local replacement for DogStatsD, therefore processing UDP fast is no longer the priority. That said, we were processing > 60k packets/second in production before shifting to the current local aggregation method. This outperformed both the Datadog-provided DogStatsD and StatsD in our infrastructure.

Compressed, Chunked POST

Datadog's API is tuned for small POST bodies from lots of hosts since they work on a per-host basis. Also there are limits on the size of the body that can be posted. As a result Veneur chunks metrics in to smaller bits — governed by flush_max_per_body — and sends them (compressed) concurrently to Datadog. This is essential for reasonable performance as Datadog's API seems to be somewhat O(n) with the size of the body (which is proportional to the number of metrics).

We've found that our hosts generate around 5k metrics and have reasonable performance, so in our case 5k is used as the flush_max_per_body.


The following sysctl settings are used in testing, and are the same one would use for StatsD:

sysctl -w net.ipv4.udp_rmem_min=67108864
sysctl -w net.ipv4.udp_wmem_min=67108864
sysctl -w net.core.netdev_max_backlog=200000
sysctl -w net.core.rmem_max=16777216
sysctl -w net.core.rmem_default=16777216
sysctl -w net.ipv4.udp_mem="4648512 6198016 9297024"


As other implementations have observed, there's a limit to how many UDP packets a single kernel thread can consume before it starts to fall over. Veneur supports the SO_REUSEPORT socket option on Linux, allowing multiple threads to share the UDP socket with kernel-space balancing between them. If you've tried throwing more cores at Veneur and it's just not going fast enough, this feature can probably help by allowing more of those cores to work on the socket (which is Veneur's hottest code path by far). Note that this is only supported on Linux (right now). We have not added support for other platforms, like darwin and BSDs.

TCP connections

Veneur supports reading the statds protocol from TCP connections. This is mostly to support TLS encryption and authentication, but might be useful on its own. Since TCP is a continuous stream of bytes, this requires each stat to be terminated by a new line character ('\n'). Most statsd clients only add new lines between stats within a single UDP packet, and omit the final trailing new line. This means you will likely need to modify your client to use this feature.

TLS encryption and authentication

If you specify the tls_key and tls_certificate options, Veneur will only accept TLS connections on its TCP port. This allows the metrics sent to Veneur to be encrypted.

If you specify the tls_authority_certificate option, Veneur will require clients to present a client certificate, signed by this authority. This ensures that only authenticated clients can connect.

You can generate your own set of keys using openssl:

# Generate the authority key and certificate (2048-bit RSA signed using SHA-256)
openssl genrsa -out cakey.pem 2048
openssl req -new -x509 -sha256 -key cakey.pem -out cacert.pem -days 1095 -subj "/O=Example Inc/CN=Example Certificate Authority"

# Generate the server key and certificate, signed by the authority
openssl genrsa -out serverkey.pem 2048
openssl req -new -sha256 -key serverkey.pem -out serverkey.csr -days 1095 -subj "/O=Example Inc/"
openssl x509 -sha256 -req -in serverkey.csr -CA cacert.pem -CAkey cakey.pem -CAcreateserial -out servercert.pem -days 1095

# Generate a client key and certificate, signed by the authority
openssl genrsa -out clientkey.pem 2048
openssl req -new -sha256 -key clientkey.pem -out clientkey.csr -days 1095 -subj "/O=Example Inc/CN=Veneur client key"
openssl x509 -req -in clientkey.csr -CA cacert.pem -CAkey cakey.pem -CAcreateserial -out clientcert.pem -days 1095

Set tcp_address, tls_key, tls_certificate, and tls_authority_certificate:

tcp_address: "localhost:8129"
tls_certificate: |
tls_key: |
tls_authority_certificate: |
Performance implications of TLS

Establishing a TLS connection is fairly expensive, so you should reuse connections as much as possible. RSA keys are also far more expensive than using ECDH keys. Using localhost on a machine with one CPU, Veneur was able to establish ~700 connections/second using ECDH prime256v1 keys, but only ~110 connections/second using RSA 2048-bit keys. According to the Go profiling for a Veneur instance using TLS with RSA keys, approximately 25% of the CPU time was in the TLS handshake, and 13% was decrypting data.


The veneur is a person acting as superintendent of the chase and especially of hounds in French medieval venery and being an important officer of the royal household. In other words, it is the master of dogs. :)




View Source

REDACTED is used to replace values that we don't want to leak into loglines (e.g., credentials)


View Source
var BUILD_DATE = defaultLinkValue
View Source
var ErrNoSpanWorker = fmt.Errorf("Can not submit traces to an unstarted server")
View Source
var VERSION = defaultLinkValue

VERSION stores the current veneur version. It must be a var so it can be set at link time.


func CalculateTickDelay added in v1.8.0

func CalculateTickDelay(interval time.Duration, t time.Time) time.Duration

CalculateTickDelay takes the provided time, `Truncate`s it a rounded-down multiple of `interval`, then adds `interval` back to find the "next" tick.

func ConsumePanic added in v1.3.1

func ConsumePanic(sentry *raven.Client, stats *statsd.Client, hostname string, err interface{})

ConsumePanic is intended to be called inside a deferred function when recovering from a panic. It accepts the value of recover() as its only argument, and reports the panic to Sentry, prints the stack, and then repanics (to ensure your program terminates)

func StartSSF added in v1.7.0

func StartSSF(s *Server, a net.Addr, tracePool *sync.Pool)

func StartStatsd added in v1.7.0

func StartStatsd(s *Server, a net.Addr, packetPool *sync.Pool)

StartStatsd spawns a goroutine that listens for metrics in statsd format on the address a. As this is a setup routine, if any error occurs, it panics.


type Config

type Config struct {
	Aggregates                    []string  `yaml:"aggregates"`
	APIHostname                   string    `yaml:"api_hostname"`
	AwsAccessKeyID                string    `yaml:"aws_access_key_id"`
	AwsRegion                     string    `yaml:"aws_region"`
	AwsS3Bucket                   string    `yaml:"aws_s3_bucket"`
	AwsSecretAccessKey            string    `yaml:"aws_secret_access_key"`
	DatadogAPIHostname            string    `yaml:"datadog_api_hostname"`
	DatadogAPIKey                 string    `yaml:"datadog_api_key"`
	DatadogTraceAPIAddress        string    `yaml:"datadog_trace_api_address"`
	Debug                         bool      `yaml:"debug"`
	EnableProfiling               bool      `yaml:"enable_profiling"`
	FlushFile                     string    `yaml:"flush_file"`
	FlushMaxPerBody               int       `yaml:"flush_max_per_body"`
	ForwardAddress                string    `yaml:"forward_address"`
	Hostname                      string    `yaml:"hostname"`
	HTTPAddress                   string    `yaml:"http_address"`
	IndicatorSpanTimerName        string    `yaml:"indicator_span_timer_name"`
	Interval                      string    `yaml:"interval"`
	KafkaBroker                   string    `yaml:"kafka_broker"`
	KafkaCheckTopic               string    `yaml:"kafka_check_topic"`
	KafkaEventTopic               string    `yaml:"kafka_event_topic"`
	KafkaMetricBufferBytes        int       `yaml:"kafka_metric_buffer_bytes"`
	KafkaMetricBufferFrequency    string    `yaml:"kafka_metric_buffer_frequency"`
	KafkaMetricBufferMessages     int       `yaml:"kafka_metric_buffer_messages"`
	KafkaMetricRequireAcks        string    `yaml:"kafka_metric_require_acks"`
	KafkaMetricTopic              string    `yaml:"kafka_metric_topic"`
	KafkaPartitioner              string    `yaml:"kafka_partitioner"`
	KafkaRetryMax                 int       `yaml:"kafka_retry_max"`
	KafkaSpanBufferBytes          int       `yaml:"kafka_span_buffer_bytes"`
	KafkaSpanBufferFrequency      string    `yaml:"kafka_span_buffer_frequency"`
	KafkaSpanBufferMesages        int       `yaml:"kafka_span_buffer_mesages"`
	KafkaSpanRequireAcks          string    `yaml:"kafka_span_require_acks"`
	KafkaSpanSerializationFormat  string    `yaml:"kafka_span_serialization_format"`
	KafkaSpanTopic                string    `yaml:"kafka_span_topic"`
	Key                           string    `yaml:"key"`
	MetricMaxLength               int       `yaml:"metric_max_length"`
	NumReaders                    int       `yaml:"num_readers"`
	NumWorkers                    int       `yaml:"num_workers"`
	OmitEmptyHostname             bool      `yaml:"omit_empty_hostname"`
	Percentiles                   []float64 `yaml:"percentiles"`
	ReadBufferSizeBytes           int       `yaml:"read_buffer_size_bytes"`
	SentryDsn                     string    `yaml:"sentry_dsn"`
	SignalfxAPIKey                string    `yaml:"signalfx_api_key"`
	SignalfxEndpointBase          string    `yaml:"signalfx_endpoint_base"`
	SignalfxHostnameTag           string    `yaml:"signalfx_hostname_tag"`
	SsfAddress                    string    `yaml:"ssf_address"`
	SsfBufferSize                 int       `yaml:"ssf_buffer_size"`
	SsfListenAddresses            []string  `yaml:"ssf_listen_addresses"`
	StatsAddress                  string    `yaml:"stats_address"`
	StatsdListenAddresses         []string  `yaml:"statsd_listen_addresses"`
	SynchronizeWithInterval       bool      `yaml:"synchronize_with_interval"`
	Tags                          []string  `yaml:"tags"`
	TcpAddress                    string    `yaml:"tcp_address"`
	TLSAuthorityCertificate       string    `yaml:"tls_authority_certificate"`
	TLSCertificate                string    `yaml:"tls_certificate"`
	TLSKey                        string    `yaml:"tls_key"`
	TraceAddress                  string    `yaml:"trace_address"`
	TraceAPIAddress               string    `yaml:"trace_api_address"`
	TraceLightstepAccessToken     string    `yaml:"trace_lightstep_access_token"`
	TraceLightstepCollectorHost   string    `yaml:"trace_lightstep_collector_host"`
	TraceLightstepMaximumSpans    int       `yaml:"trace_lightstep_maximum_spans"`
	TraceLightstepNumClients      int       `yaml:"trace_lightstep_num_clients"`
	TraceLightstepReconnectPeriod string    `yaml:"trace_lightstep_reconnect_period"`
	TraceMaxLengthBytes           int       `yaml:"trace_max_length_bytes"`
	UdpAddress                    string    `yaml:"udp_address"`

func ReadConfig

func ReadConfig(path string) (c Config, err error)

ReadConfig unmarshals the config file and slurps in it's data.

func (Config) ParseInterval

func (c Config) ParseInterval() (time.Duration, error)

ParseInterval handles parsing the flush interval as a time.Duration

type Consul added in v1.3.1

type Consul struct {
	ConsulHealth *api.Health

Consul is a Discoverer that uses Consul to find healthy instances of a given name.

func NewConsul added in v1.3.1

func NewConsul(config *api.Config) (*Consul, error)

NewConsul creates a new instance of a Consul Discoverer

func (*Consul) GetDestinationsForService added in v1.3.1

func (c *Consul) GetDestinationsForService(serviceName string) ([]string, error)

GetDestinationsForService updates the list of destinations based on healthy nodes found via Consul.

type DatadogTraceSpan

type DatadogTraceSpan struct {
	Duration int64              `json:"duration"`
	Error    int64              `json:"error"`
	Meta     map[string]string  `json:"meta"`
	Metrics  map[string]float64 `json:"metrics"`
	Name     string             `json:"name"`
	ParentID int64              `json:"parent_id,omitempty"`
	Resource string             `json:"resource,omitempty"`
	Service  string             `json:"service"`
	SpanID   int64              `json:"span_id"`
	Start    int64              `json:"start"`
	TraceID  int64              `json:"trace_id"`
	Type     string             `json:"type"`

DatadogTraceSpan represents a trace span as JSON for the Datadog tracing API.

type Discoverer added in v1.3.1

type Discoverer interface {
	GetDestinationsForService(string) ([]string, error)

Discoverer is an interface for various service discovery mechanisms. You could implement your own by implementing this method! See consul.go

type EventWorker

type EventWorker struct {
	EventChan        chan samplers.UDPEvent
	ServiceCheckChan chan samplers.UDPServiceCheck
	// contains filtered or unexported fields

EventWorker is similar to a Worker but it collects events and service checks instead of metrics.

func NewEventWorker

func NewEventWorker(stats *statsd.Client) *EventWorker

NewEventWorker creates an EventWorker ready to collect events and service checks.

func (*EventWorker) Flush

Flush returns the EventWorker's stored events and service checks and resets the stored contents.

func (*EventWorker) Work

func (ew *EventWorker) Work()

Work will start the EventWorker listening for events and service checks. This function will never return.

type Proxy added in v1.3.1

type Proxy struct {
	Sentry                 *raven.Client
	Hostname               string
	ForwardDestinations    *consistent.Consistent
	TraceDestinations      *consistent.Consistent
	Discoverer             Discoverer
	ConsulForwardService   string
	ConsulTraceService     string
	ConsulInterval         time.Duration
	ForwardDestinationsMtx sync.Mutex
	TraceDestinationsMtx   sync.Mutex
	HTTPAddr               string
	HTTPClient             *http.Client
	Statsd                 *statsd.Client
	AcceptingForwards      bool
	AcceptingTraces        bool
	// contains filtered or unexported fields

func NewProxyFromConfig added in v1.3.1

func NewProxyFromConfig(conf ProxyConfig) (p Proxy, err error)

func (*Proxy) HTTPServe added in v1.3.1

func (p *Proxy) HTTPServe()

HTTPServe starts the HTTP server and listens perpetually until it encounters an unrecoverable error.

func (*Proxy) Handler added in v1.3.1

func (p *Proxy) Handler() http.Handler

Handler returns the Handler responsible for routing request processing.

func (*Proxy) ProxyMetrics added in v1.3.1

func (p *Proxy) ProxyMetrics(ctx context.Context, jsonMetrics []samplers.JSONMetric)

ProxyMetrics takes a sliceof JSONMetrics and breaks them up into multiple HTTP requests by MetricKey using the hash ring.

func (*Proxy) ProxyTraces added in v1.3.1

func (p *Proxy) ProxyTraces(ctx context.Context, traces []DatadogTraceSpan)

func (*Proxy) RefreshDestinations added in v1.3.1

func (p *Proxy) RefreshDestinations(serviceName string, ring *consistent.Consistent, mtx *sync.Mutex)

RefreshDestinations updates the server's list of valid destinations for flushing. This should be called periodically to ensure we have the latest data.

func (*Proxy) Shutdown added in v1.3.1

func (p *Proxy) Shutdown()

Shutdown signals the server to shut down after closing all current connections.

func (*Proxy) Start added in v1.3.1

func (p *Proxy) Start()

Start fires up the various goroutines that run on behalf of the server. This is separated from the constructor for testing convenience.

type ProxyConfig added in v1.3.1

type ProxyConfig struct {
	ConsulForwardServiceName string `yaml:"consul_forward_service_name"`
	ConsulRefreshInterval    string `yaml:"consul_refresh_interval"`
	ConsulTraceServiceName   string `yaml:"consul_trace_service_name"`
	Debug                    bool   `yaml:"debug"`
	EnableProfiling          bool   `yaml:"enable_profiling"`
	ForwardAddress           string `yaml:"forward_address"`
	HTTPAddress              string `yaml:"http_address"`
	SentryDsn                string `yaml:"sentry_dsn"`
	StatsAddress             string `yaml:"stats_address"`
	TraceAddress             string `yaml:"trace_address"`
	TraceAPIAddress          string `yaml:"trace_api_address"`

func ReadProxyConfig added in v1.3.1

func ReadProxyConfig(path string) (c ProxyConfig, err error)

ReadProxyConfig unmarshals the proxy config file and slurps in its data.

type Server

type Server struct {
	Workers     []*Worker
	EventWorker *EventWorker
	SpanWorker  *SpanWorker

	Statsd *statsd.Client
	Sentry *raven.Client

	Hostname  string
	Tags      []string
	TagsAsMap map[string]string

	HTTPClient *http.Client

	HTTPAddr string

	ForwardAddr string

	StatsdListenAddrs []net.Addr
	SSFListenAddrs    []net.Addr
	RcvbufBytes       int

	HistogramPercentiles []float64

	HistogramAggregates samplers.HistogramAggregates

	TraceClient *trace.Client
	// contains filtered or unexported fields

A Server is the actual veneur instance that will be run.

func NewFromConfig

func NewFromConfig(conf Config) (*Server, error)

NewFromConfig creates a new veneur server from a configuration specification.

func (*Server) Flush

func (s *Server) Flush(ctx context.Context)

Flush collects sampler's metrics and passes them to sinks.

func (*Server) HTTPServe

func (s *Server) HTTPServe()

HTTPServe starts the HTTP server and listens perpetually until it encounters an unrecoverable error.

func (*Server) HandleMetricPacket

func (s *Server) HandleMetricPacket(packet []byte) error

HandleMetricPacket processes each packet that is sent to the server, and sends to an appropriate worker (EventWorker or Worker).

func (*Server) HandleTracePacket

func (s *Server) HandleTracePacket(packet []byte)

HandleTracePacket accepts an incoming packet as bytes and sends it to the appropriate worker.

func (*Server) Handler

func (s *Server) Handler() http.Handler

Handler returns the Handler responsible for routing request processing.

func (*Server) ImportMetrics

func (s *Server) ImportMetrics(ctx context.Context, jsonMetrics []samplers.JSONMetric)

ImportMetrics feeds a slice of json metrics to the server's workers

func (*Server) IsLocal

func (s *Server) IsLocal() bool

IsLocal indicates whether veneur is running as a local instance (forwarding non-local data to a global veneur instance) or is running as a global instance (sending all data directly to the final destination).

func (*Server) ReadMetricSocket

func (s *Server) ReadMetricSocket(serverConn net.PacketConn, packetPool *sync.Pool)

ReadMetricSocket listens for available packets to handle.

func (*Server) ReadSSFPacketSocket added in v1.7.0

func (s *Server) ReadSSFPacketSocket(serverConn net.PacketConn, packetPool *sync.Pool)

ReadSSFPacketSocket reads SSF packets off a packet connection.

func (*Server) ReadSSFStreamSocket added in v1.7.0

func (s *Server) ReadSSFStreamSocket(serverConn net.Conn)

ReadSSFStreamSocket reads a streaming connection in framed wire format off a streaming socket. See package for details.

func (*Server) ReadTCPSocket added in v1.1.0

func (s *Server) ReadTCPSocket(listener net.Listener)

ReadTCPSocket listens on Server.TCPAddr for new connections, starting a goroutine for each.

func (*Server) Shutdown

func (s *Server) Shutdown()

Shutdown signals the server to shut down after closing all current connections.

func (*Server) Start added in v1.1.0

func (s *Server) Start()

Start spins up the Server to do actual work, firing off goroutines for various workers and utilities.

type SpanWorker added in v1.6.0

type SpanWorker struct {
	SpanChan chan *ssf.SSFSpan
	// contains filtered or unexported fields

SpanWorker is similar to a Worker but it collects events and service checks instead of metrics.

func NewSpanWorker added in v1.6.0

func NewSpanWorker(sinks []sinks.SpanSink, stats *statsd.Client) *SpanWorker

NewSpanWorker creates an SpanWorker ready to collect events and service checks.

func (*SpanWorker) Flush added in v1.6.0

func (tw *SpanWorker) Flush()

Flush invokes flush on each sink.

func (*SpanWorker) Work added in v1.6.0

func (tw *SpanWorker) Work()

Work will start the SpanWorker listening for spans. This function will never return.

type Worker

type Worker struct {
	PacketChan chan samplers.UDPMetric
	ImportChan chan []samplers.JSONMetric
	QuitChan   chan struct{}
	// contains filtered or unexported fields

Worker is the doodad that does work.

func NewWorker

func NewWorker(id int, stats *statsd.Client, logger *logrus.Logger) *Worker

NewWorker creates, and returns a new Worker object.

func (*Worker) Flush

func (w *Worker) Flush() WorkerMetrics

Flush resets the worker's internal metrics and returns their contents.

func (*Worker) ImportMetric

func (w *Worker) ImportMetric(other samplers.JSONMetric)

ImportMetric receives a metric from another veneur instance

func (*Worker) IngestUDP

func (w *Worker) IngestUDP(metric samplers.UDPMetric)

IngestUDP on a Worker feeds the metric into the worker's PacketChan.

func (*Worker) MetricsProcessedCount added in v1.2.0

func (w *Worker) MetricsProcessedCount() int64

MetricsProcessedCount is a convenince method for testing that allows us to fetch the Worker's processed count in a non-racey way.

func (*Worker) ProcessMetric

func (w *Worker) ProcessMetric(m *samplers.UDPMetric)

ProcessMetric takes a Metric and samples it

This is standalone to facilitate testing

func (*Worker) Stop

func (w *Worker) Stop()

Stop tells the worker to stop listening for work requests.

Note that the worker will only stop *after* it has finished its work.

func (*Worker) Work

func (w *Worker) Work()

Work will start the worker listening for metrics to process or import. It will not return until the worker is sent a message to terminate using Stop()

type WorkerMetrics

type WorkerMetrics struct {
	// contains filtered or unexported fields

WorkerMetrics is just a plain struct bundling together the flushed contents of a worker

func NewWorkerMetrics

func NewWorkerMetrics() WorkerMetrics

NewWorkerMetrics initializes a WorkerMetrics struct

func (WorkerMetrics) Upsert

func (wm WorkerMetrics) Upsert(mk samplers.MetricKey, Scope samplers.MetricScope, tags []string) bool

Upsert creates an entry on the WorkerMetrics struct for the given metrickey (if one does not already exist) and updates the existing entry (if one already exists). Returns true if the metric entry was created and false otherwise.


Path Synopsis
Package protocol contains routines for implementing veneur's SSF wire protocol to read and write framed SSF samples on a streaming network link or other non-seekable medium.
Package protocol contains routines for implementing veneur's SSF wire protocol to read and write framed SSF samples on a streaming network link or other non-seekable medium.
Package metrics provides sinks that are used by veneur internally.
Package metrics provides sinks that are used by veneur internally.
Package ssf is a generated protocol buffer package.
Package ssf is a generated protocol buffer package.
Package tdigest provides an implementation of Ted Dunning's t-digest, an approximate histogram for online, distributed applications.
Package tdigest provides an implementation of Ted Dunning's t-digest, an approximate histogram for online, distributed applications.
Package trace provies an experimental API for initiating traces.
Package trace provies an experimental API for initiating traces.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL