processor

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 30, 2020 License: Apache-2.0 Imports: 15 Imported by: 0

README

General Information

Processors are used at various stages of a pipeline. Generally, a processor pre-processes data before it is exported (e.g. modify attributes or sample) or helps ensure that data makes it through a pipeline successfully (e.g. batch/retry).

IMPORTANT: All processors only work with trace receivers/exporters today.

Some important aspects of pipelines and processors to be aware of:

Supported processors (sorted alphabetically):

The contributors repository has more processors that can be added to custom builds of the Collector.

Data Ownership

The ownership of the TraceData and MetricsData in a pipeline is passed as the data travels through the pipeline. The data is created by the receiver and then the ownership is passed to the first processor when ConsumeTraceData/ConsumeMetricsData function is called.

Note: the receiver may be attached to multiple pipelines, in which case the same data will be passed to all attached pipelines via a data fan-out connector.

From data ownership perspective pipelines can work in 2 modes:

  • Exclusive data ownership
  • Shared data ownership

The mode is defined during startup based on data modification intent reported by the processors. The intent is reported by each processor via MutatesConsumedData field of the struct returned by GetCapabilities function. If any processor in the pipeline declares an intent to modify the data then that pipeline will work in exclusive ownership mode. In addition, any other pipeline that receives data from a receiver that is attached to a pipeline with exclusive ownership mode will be also operating in exclusive ownership mode.

Exclusive Ownership

In exclusive ownership mode the data is owned exclusively by a particular processor at a given moment of time and the processor is free to modify the data it owns.

Exclusive ownership mode is only applicable for pipelines that receive data from the same receiver. If a pipeline is marked to be in exclusive ownership mode then any data received from a shared receiver will be cloned at the fan-out connector before passing further to each pipeline. This ensures that each pipeline has its own exclusive copy of data and the data can be safely modified in the pipeline.

The exclusive ownership of data allows processors to freely modify the data while they own it (e.g. see attributesprocessor). The duration of ownership of the data by processor is from the beginning of ConsumeTraceData/ConsumeMetricsData call until the processor calls the next processor's ConsumeTraceData/ConsumeMetricsData function, which passes the ownership to the next processor. After that the processor must no longer read or write the data since it may be concurrently modified by the new owner.

Exclusive Ownership mode allows to easily implement processors that need to modify the data by simply declaring such intent.

Shared Ownership

In shared ownership mode no particular processor owns the data and no processor is allowed the modify the shared data.

In this mode no cloning is performed at the fan-out connector of receivers that are attached to multiple pipelines. In this case all such pipelines will see the same single shared copy of the data. Processors in pipelines operating in shared ownership mode are prohibited from modifying the original data that they receive via ConsumeTraceData/ConsumeMetricsData call. Processors may only read the data but must not modify the data.

If the processor needs to modify the data while performing the processing but does not want to incur the cost of data cloning that Exclusive mode brings then the processor can declare that it does not modify the data and use any different technique that ensures original data is not modified. For example, the processor can implement copy-on-write approach for individual sub-parts of TraceData/MetricsData argument. Any approach that does not mutate the original TraceData/MetricsData argument (including referenced data, such as Node, Resource, Spans, etc) is allowed.

If the processor uses such technique it should declare that it does not intend to modify the original data by setting MutatesConsumedData=false in its capabilities to avoid marking the pipeline for Exclusive ownership and to avoid the cost of data cloning described in Exclusive Ownership section.

Ordering Processors

The order processors are specified in a pipeline is important as this is the order in which each processor is applied to traces and metrics.

Include/Exclude Spans

The attribute processor and the span processor expose the option to provide a set of properties of a span to match against to determine if the span should be included or excluded from the processor. By default, all spans are processed by the processor.

To configure this option, under include and/or exclude:

  • at least one of services, span_names or attributes is required.

Note: If both include and exclude are specified, the include properties are checked before the exclude properties.

{span, attributes}:
    # include and/or exclude can be specified. However, the include properties
    # are always checked before the exclude properties.
    {include, exclude}:
      # At least one of services, span_names or attributes must be specified.
      # It is supported to have more than one specified, but all of the specified
      # conditions must evaluate to true for a match to occur.

      # match_type controls how items in "services" and "span_names" arrays are
      # interpreted. Possible values are "regexp" or "strict".
      # This is a required field.
      match_type: {strict, regexp}

      # services specify an array of items to match the service name against.
      # A match occurs if the span service name matches at least of the items.
      # This is an optional field.
      services: [<item1>, ..., <itemN>]

      # The span name must match at least one of the items.
      # This is an optional field.
      span_names: [<item1>, ..., <itemN>]

      # Attributes specifies the list of attributes to match against.
      # All of these attributes must match exactly for a match to occur.
      # Only match_type=strict is allowed if "attributes" are specified.
      # This is an optional field.
      attributes:
          # Key specifies the attribute to match against.
        - key: <key>
          # Value specifies the exact value to match against.
          # If not specified, a match occurs if the key is present in the attributes.
          value: {value}

No processors are enabled by default, however multiple processors are recommended to be enabled. These are:

  1. memory_limiter
  2. any sampling processors
  3. batch
  4. any other processors
  5. queued_retry

In addition, it is important to note that the order of processors matters. The order above is the best practice. Refer to the individual processor documentation below for more information.

Processors

Attributes Processor

The attributes processor modifies attributes of a span. Please refer to config.go for the config spec.

It optionally supports the ability to include/exclude spans.

It takes a list of actions which are performed in order specified in the config. The supported actions are:

  • insert: Inserts a new attribute in spans where the key does not already exist.
  • update: Updates an attribute in spans where the key does exist.
  • upsert: Performs insert or update. Inserts a new attribute in spans where the key does not already exist and updates an attribute in spans where the key does exist.
  • delete: Deletes an attribute from a span.
  • hash: Hashes (SHA1) an existing attribute value.

For the actions insert, update and upsert,

  • key is required
  • one of value or from_attribute is required
  • action is required.
  # Key specifies the attribute to act upon.
- key: <key>
  action: {insert, update, upsert}
  # Value specifies the value to populate for the key.
  # The type is inferred from the configuration.
  value: <value>

  # Key specifies the attribute to act upon.
- key: <key>
  action: {insert, update, upsert}
  # FromAttribute specifies the attribute from the span to use to populate
  # the value. If the attribute doesn't exist, no action is performed.
  from_attribute: <other key>

For the delete action,

  • key is required
  • action: delete is required.
# Key specifies the attribute to act upon.
- key: <key>
  action: delete

For the hash action,

  • key is required
  • action: hash is required.
# Key specifies the attribute to act upon.
- key: <key>
  action: hash 

The list of actions can be composed to create rich scenarios, such as back filling attribute, copying values to a new key, redacting sensitive information. The following is a sample configuration.

processors:
  attributes/example:
    actions:
      - key: db.table
        action: delete
      - key: redacted_span
        value: true
        action: upsert
      - key: copy_key
        from_attribute: key_original
        action: update
      - key: account_id
        value: 2245
      - key: account_password
        action: delete
      - key: account_email 
        action: hash

Refer to config.yaml for detailed examples on using the processor.

Batch Processor

The batch processor accepts spans and places them into batches grouped by node and resource. Batching helps better compress the data and reduce the number of outgoing connections required to transmit the data. This processor supports both size and time based batching.

It is highly recommended to configure the batch processor on every collector. The batch processor should be defined in the pipeline after the memory_limiter as well as any sampling processors. This is because batching should happen after any data drops such as sampling.

Please refer to config.go for the config spec.

The following configuration options can be modified:

  • num_tickers (default = 4): Number of tickers that loop over batch buckets
  • remove_after_ticks (default = 10): Number of ticks passed without a span arriving for a node at which time batcher is deleted
  • send_batch_size (default = 8192): Number of spans after which a batch will be sent regardless of time
  • tick_time (default = 1s): Interval in which the tickers tick
  • timeout (default = 1s): Time duration after which a batch will be sent regardless of size

Examples:

processors:
  batch:
  batch/2:
    num_tickers: 10
    remove_after_ticks: 20
    send_batch_size: 1000
    tick_time: 5s
    timeout: 10s

Refer to config.yaml for detailed examples on using the processor.

Memory Limiter Processor

The memory_limiter processor is used to prevent out of memory situations on the collector. Given that the amount and type of data a collector processes is environment specific and resource utilization of the collector is also dependent on the configured processors, it is important to put checks in place regarding memory usage. The memory_limiter processor offers the follow safeguards:

  • Ability to define an interval when memory usage will be checked and if memory usage exceeds a defined limit will trigger GC to reduce memory consumption.
  • Ability to define an interval when memory usage will be compared against the previous interval's value and if the delta exceeds a defined limit will trigger GC to reduce memory consumption.

In addition, there is a command line option (mem-ballast-size-mib) which can be used to define a ballast, which allocates memory and provides stability to the heap. If defined, the ballast increases the base size of the heap so that GC triggers are delayed and the number of GC cycles over time is reduced. While the ballast is configured via the command line, today the same value configured on the command line must also be defined in the memory_limiter processor.

Note that while these configuration options can help mitigate out of memory situations, they are not a replacement for properly sizing and configuring the collector. For example, if the limit or spike thresholds are crossed, the collector will return errors to all receive operations until enough memory is freed. This may result in dropped data.

It is highly recommended to configure the ballast command line option as well as the memory_limiter processor on every collector. The ballast should be configured to be 1/3 to 1/2 of the memory allocated to the collector. The memory_limiter processor should be the first processor defined in the pipeline (immediately after the receivers). This is to ensure that backpressure can be sent to applicable receivers and minimize the likelihood of dropped data when the memory_limiter gets triggered.

Please refer to config.go for the config spec.

The following configuration options must be changed:

  • check_interval (default = 0s): Time between measurements of memory usage. Values below 1 second are not recommended since it can result in unnecessary CPU consumption.
  • limit_mib (default = 0): Maximum amount of memory, in MiB, targeted to be allocated by the process heap. Note that typically the total memory usage of process will be about 50MiB higher than this value.
  • spike_limit_mib (default = 0): Maximum spike expected between the measurements of memory usage. The value must be less than limit_mib.

The following configuration options can also be modified:

  • ballast_size_mib (default = 0): Must match the mem-ballast-size-mib command line option.

Examples:

processors:
  memory_limiter:
    ballast_size_mib: 2000
    check_interval: 5s
    limit_mib: 4000
    spike_limit_mib: 500

Refer to config.yaml for detailed examples on using the processor.

Queued Retry Processor

The queued_retry processor uses a bounded queue to relay batches from the receiver or previous processor to the next processor. Received data is enqueued immediately if the queue is not full. At the same time, the processor has one or more workers which consume the data in the queue by sending them to the next processor or exporter. If relaying the data to the next processor or exporter in the pipeline fails, the processor retries after some backoff delay depending on the configuration.

Some important things to keep in mind with the queued_retry processor:

  • Given that if the queue is full the data will be dropped, it is important to size the queue appropriately.
  • Since the queue is based on batches and batch sizes are environment specific, it may not be easy to understand how much memory a queue will consume.
  • Finally, the queue size is dependent on the deployment model of the collector. The agent deployment model typically has a smaller queue than the collector deployment model.

It is highly recommended to configure the queued_retry processor on every collector as it minimizes the likelihood of data being dropped due to delays in processing or issues exporting the data. This processor should be the last processor defined in the pipeline because issues that require retry are typically due to exporting.

Please refer to config.go for the config spec.

The following configuration options can be modified:

  • backoff_delay (default = 5s): Time interval to wait before retrying
  • num_workers (default = 10): Number of workers that dequeue batches
  • queue_size (default = 5000): Maximum number of batches kept in memory before data is dropped
  • retry_on_failure (default = true): Whether to retry on failure or give up and drop

Examples:

processors:
  queued_retry/example:
    backoff_delay: 5s
    num_workers: 2
    queue_size: 10
    retry_on_failure: true

Refer to config.yaml for detailed examples on using the processor.

Resource Processor

The resource processor can be used to add attributes to a given resource. Please refer to config.go for the config spec.

The following configuration options are required:

  • type: Type of resource to which labels should be applied. Only applicable to type of host today.
  • labels: Map of key/value pairs that should be added to the defined type.

Examples:

processors:
  resource:
    type: "host"
    labels: {
      "cloud.zone": "zone-1",
      "k8s.cluster.name": "k8s-cluster",
      "host.name": "k8s-node",
    }

Refer to config.yaml for detailed examples on using the processor.

Sampling Processor

The sampling processor supports sampling spans. A couple of sampling processors are provided today and it is straight forward to add others.

Tail Sampling Processor

The tail sampling processor samples traces based on a set of defined policies. Today, this processor only works with a single instance of the collector. Technically, trace ID aware load balancing could be used to support multiple collector instances, but this configuration has not been tested. Please refer to config.go for the config spec.

The following configuration options are required:

  • policies (no default): Policies used to make a sampling decision

Multiple policies exist today and it is straight forward to add more. These include:

  • always_sample: Sample all traces
  • numeric_attribute: Sample based on number attributes
  • string_attribute: Sample based on string attributes
  • rate_limiting: Sample based on rate

The following configuration options can also be modified:

  • decision_wait (default = 30s): Wait time since the first span of a trace before making a sampling decision
  • num_traces (default = 50000): Number of traces kept in memory
  • expected_new_traces_per_sec (default = 0): Expected number of new traces (helps in allocating data structures)

Examples:

processors:
  tail_sampling:
    decision_wait: 10s
    num_traces: 100
    expected_new_traces_per_sec: 10
    policies:
      [
          {
            name: test-policy-1,
            type: always_sample
          },
          {
            name: test-policy-2,
            type: numeric_attribute,
            numeric_attribute: {key: key1, min_value: 50, max_value: 100}
          },
          {
            name: test-policy-3,
            type: string_attribute,
            string_attribute: {key: key2, values: [value1, value2]}
          },
          {
            name: test-policy-4,
            type: rate_limiting,
            rate_limiting: {spans_per_second: 35}
         }
      ]

Refer to config.yaml for detailed examples on using the processor.

Probabilistic Sampling Processor

The probabilistic sampler supports two types of sampling:

  1. sampling.priority semantic convention as defined by OpenTracing
  2. Trace ID hashing

The sampling.priority semantic convention takes priority over trace ID hashing. As the name implies, trace ID hashing samples based on hash values determined by trace IDs. In order for trace ID hashing to work, all collectors for a given tier (e.g. behind the same load balancer) must have the same hash_seed. It is also possible to leverage a different hash_seed at different collector tiers to support additional sampling requirements. Please refer to config.go for the config spec.

The following configuration options can be modified:

  • hash_seed (no default): An integer used to compute the hash algorithm. Note that all collectors for a given tier (e.g. behind the same load balancer) should have the same hash_seed.
  • sampling_percentage (default = 0): Percentage at which traces are sampled; >= 100 samples all traces

Examples:

processors:
  probabilistic_sampler:
    hash_seed: 22
    sampling_percentage: 15.3

Refer to config.yaml for detailed examples on using the processor.

Span Processor

The span processor modifies either the span name or attributes of a span based on the span name. Please refer to config.go for the config spec.

It optionally supports the ability to include/exclude spans.

The following actions are supported:

  • name: Modify the name of attributes within a span
Name a span

The following settings are required:

  • from_attributes: The attribute value for the keys are used to create a new name in the order specified in the configuration.

The following settings can be optionally configured:

  • separator: A string, which is specified will be used to split values

Note: If renaming is dependent on attributes being modified by the attributes processor, ensure the span processor is specified after the attributes processor in the pipeline specification.

span:
  name:
    # from_attributes represents the attribute keys to pull the values from to generate the
    # new span name.
    from_attributes: [<key1>, <key2>, ...]
    # Separator is the string used to concatenate various parts of the span name.
    separator: <value>

Example:

span:
  name:
    from_attributes: ["db.svc", "operation"]
    separator: "::"

Refer to config.yaml for detailed examples on using the processor.

Extract attributes from span name

Takes a list of regular expressions to match span name against and extract attributes from it based on subexpressions. Must be specified under the to_attributes section.

The following settings are required:

  • rules: A list of rules to extract attribute values from span name. The values in the span name are replaced by extracted attribute names. Each rule in the list is regex pattern string. Span name is checked against the regex and if the regex matches then all named subexpressions of the regex are extracted as attributes and are added to the span. Each subexpression name becomes an attribute name and subexpression matched portion becomes the attribute value. The matched portion in the span name is replaced by extracted attribute name. If the attributes already exist in the span then they will be overwritten. The process is repeated for all rules in the order they are specified. Each subsequent rule works on the span name that is the output after processing the previous rule.
  • break_after_match (default = false): specifies if processing of rules should stop after the first match. If it is false rule processing will continue to be performed over the modified span name.
span/to_attributes:
  name:
    to_attributes:
      rules:
        - regexp-rule1
        - regexp-rule2
        - regexp-rule3
        ...
      break_after_match: <true|false>

Example:

# Let's assume input span name is /api/v1/document/12345678/update
# Applying the following results in output span name /api/v1/document/{documentId}/update
# and will add a new attribute "documentId"="12345678" to the span.
span/to_attributes:
  name:
    to_attributes:
      rules:
        - ^\/api\/v1\/document\/(?P<documentId>.*)\/update$

Refer to config.yaml for detailed examples on using the processor.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	TagSourceFormatKey, _  = tag.NewKey("source_format")
	TagServiceNameKey, _   = tag.NewKey("service")
	TagProcessorNameKey, _ = tag.NewKey("processor")

	StatReceivedSpanCount = stats.Int64(
		"spans_received",
		"counts the number of spans received",
		stats.UnitDimensionless)
	StatDroppedSpanCount = stats.Int64(
		"spans_dropped",
		"counts the number of spans dropped",
		stats.UnitDimensionless)
	StatBadBatchDroppedSpanCount = stats.Int64(
		"bad_batch_spans_dropped",
		"counts the number of spans dropped due to being in bad batches",
		stats.UnitDimensionless)

	StatTraceBatchesDroppedCount = stats.Int64(
		"trace_batches_dropped",
		"counts the number of trace batches dropped",
		stats.UnitDimensionless)

	StatDroppedMetricCount = stats.Int64(
		"metrics_dropped",
		"counts the number of metrics dropped",
		stats.UnitDimensionless)

	StatMetricBatchesDroppedCount = stats.Int64(
		"metric_batches_dropped",
		"counts the number of metric batches dropped",
		stats.UnitDimensionless)
)

Keys and stats for telemetry.

Functions

func CreateMetricsCloningFanOutConnector added in v0.2.9

func CreateMetricsCloningFanOutConnector(mcs []consumer.MetricsConsumerBase) consumer.MetricsConsumerBase

CreateMetricsCloningFanOutConnector is a placeholder function for now. It supposed to create an old type connector or a new type connector based on type of provided metrics consumer.

func CreateMetricsFanOutConnector added in v0.2.9

func CreateMetricsFanOutConnector(mcs []consumer.MetricsConsumerBase) consumer.MetricsConsumerBase

CreateMetricsFanOutConnector creates a connector based on provided type of trace consumer. If any of the wrapped metrics consumers are of the new type, use metricsFanOutConnector, otherwise use the old type connector.

func CreateTraceCloningFanOutConnector added in v0.2.9

func CreateTraceCloningFanOutConnector(tcs []consumer.TraceConsumerBase) consumer.TraceConsumerBase

CreateTraceCloningFanOutConnector is a placeholder function for now. It supposed to create an old type connector or a new type connector based on type of provided trace consumer.

func CreateTraceFanOutConnector added in v0.2.9

func CreateTraceFanOutConnector(tcs []consumer.TraceConsumerBase) consumer.TraceConsumerBase

CreateTraceFanOutConnector wraps multiple trace consumers in a single one. If any of the wrapped trace consumers are of the new type, use traceFanOutConnector, otherwise use the old type connector

func MetricTagKeys

func MetricTagKeys(level telemetry.Level) []tag.Key

MetricTagKeys returns the metric tag keys according to the given telemetry level.

func MetricViews

func MetricViews(level telemetry.Level) []*view.View

MetricViews return the metrics views according to given telemetry level.

func NewMetricsCloningFanOutConnector

func NewMetricsCloningFanOutConnector(mcs []consumer.MetricsConsumer) consumer.MetricsConsumer

NewMetricsCloningFanOutConnector wraps multiple metrics consumers in a single one.

func NewMetricsCloningFanOutConnectorOld added in v0.2.9

func NewMetricsCloningFanOutConnectorOld(mcs []consumer.MetricsConsumerOld) consumer.MetricsConsumerOld

NewMetricsCloningFanOutConnectorOld wraps multiple metrics consumers in a single one.

func NewMetricsFanOutConnector

func NewMetricsFanOutConnector(mcs []consumer.MetricsConsumer) consumer.MetricsConsumer

NewMetricsFanOutConnector wraps multiple new type metrics consumers in a single one.

func NewMetricsFanOutConnectorOld added in v0.2.9

func NewMetricsFanOutConnectorOld(mcs []consumer.MetricsConsumerOld) consumer.MetricsConsumerOld

NewMetricsFanOutConnectorOld wraps multiple metrics consumers in a single one.

func NewTraceCloningFanOutConnector

func NewTraceCloningFanOutConnector(tcs []consumer.TraceConsumer) consumer.TraceConsumer

NewTraceCloningFanOutConnector wraps multiple trace consumers in a single one.

func NewTraceCloningFanOutConnectorOld added in v0.2.9

func NewTraceCloningFanOutConnectorOld(tcs []consumer.TraceConsumerOld) consumer.TraceConsumerOld

NewTraceCloningFanOutConnectorOld wraps multiple trace consumers in a single one.

func NewTraceFanOutConnector

func NewTraceFanOutConnector(tcs []consumer.TraceConsumer) consumer.TraceConsumer

NewTraceFanOutConnector wraps multiple new type trace consumers in a single one.

func NewTraceFanOutConnectorOld added in v0.2.9

func NewTraceFanOutConnectorOld(tcs []consumer.TraceConsumerOld) consumer.TraceConsumerOld

NewTraceFanOutConnectorOld wraps multiple trace consumers in a single one.

func ServiceNameForNode

func ServiceNameForNode(node *commonpb.Node) string

ServiceNameForNode gets the service name for a specified node. Used for metrics.

func StatsTagsForBatch

func StatsTagsForBatch(processorName, serviceName, spanFormat string) []tag.Mutator

StatsTagsForBatch gets the stat tags based on the specified processorName, serviceName, and spanFormat.

Types

This section is empty.

Directories

Path Synopsis
Package attributesprocessor contains the logic to modify attributes of a span.
Package attributesprocessor contains the logic to modify attributes of a span.
Package memorylimiter provides a processor for OpenTelemetry Service pipeline that drops data on the pipeline according to the current state of memory usage.
Package memorylimiter provides a processor for OpenTelemetry Service pipeline that drops data on the pipeline according to the current state of memory usage.
Package resourceprocessor implements a processor for specifying resource labels to be added to OpenCensus trace data and metrics data.
Package resourceprocessor implements a processor for specifying resource labels to be added to OpenCensus trace data and metrics data.
samplingprocessor
tailsamplingprocessor/idbatcher
Package idbatcher defines a pipeline of fixed size in which the elements are batches of ids.
Package idbatcher defines a pipeline of fixed size in which the elements are batches of ids.
tailsamplingprocessor/sampling
Package sampling contains the interfaces and data types used to implement the various sampling policies.
Package sampling contains the interfaces and data types used to implement the various sampling policies.
Package spanprocessor contains logic to modify top level settings of a span, such as its name.
Package spanprocessor contains logic to modify top level settings of a span, such as its name.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL