# Kafka Receiver

The Kafka receiver receives telemetry data from Kafka, with configurable topics and encodings.

If used in conjunction with the `kafkaexporter` configured with `include_metadata_keys`, the Kafka receiver will also propagate the Kafka headers to the downstream pipeline, giving the rest of the pipeline access to arbitrary metadata keys and values.
## Getting Started

There are no required settings.

The following settings can be optionally configured:
- `brokers` (default = localhost:9092): The list of Kafka brokers.
- `protocol_version` (default = 2.1.0): Kafka protocol version.
- `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve and then reverse-lookup broker IPs during startup.
- `logs`
  - `topic` (default = otlp_logs): The name of the Kafka topic from which to consume logs.
  - `encoding` (default = otlp_proto): The encoding for the Kafka topic. See Supported encodings.
- `metrics`
  - `topic` (default = otlp_metrics): The name of the Kafka topic from which to consume metrics.
  - `encoding` (default = otlp_proto): The encoding for the Kafka topic. See Supported encodings.
- `traces`
  - `topic` (default = otlp_spans): The name of the Kafka topic from which to consume traces.
  - `encoding` (default = otlp_proto): The encoding for the Kafka topic. See Supported encodings.
- `topic` (Deprecated [v0.124.0]: use `logs::topic`, `traces::topic`, or `metrics::topic`). If this is set, it takes precedence over the default value for those fields.
- `encoding` (Deprecated [v0.124.0]: use `logs::encoding`, `traces::encoding`, or `metrics::encoding`). If this is set, it takes precedence over the default value for those fields.
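For example, a minimal sketch of the per-signal topic and encoding settings; the topic names here are hypothetical:

```yaml
receivers:
  kafka:
    logs:
      topic: app_logs      # hypothetical topic name
      encoding: otlp_json
    traces:
      topic: app_traces    # hypothetical topic name
      encoding: otlp_proto
```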
- `group_id` (default = otel-collector): The consumer group that the receiver will consume messages from.
- `client_id` (default = otel-collector): The consumer client ID that the receiver will use.
- `initial_offset` (default = latest): The initial offset to use if no offset was previously committed. Must be `latest` or `earliest`.
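A sketch of the consumer group settings above; the group and client names are hypothetical:

```yaml
receivers:
  kafka:
    group_id: my-collector-group   # hypothetical consumer group name
    client_id: my-collector        # hypothetical client ID
    initial_offset: earliest       # start from the oldest message if no offset is committed
```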
- `session_timeout` (default = `10s`): The request timeout for detecting client failures when using Kafka's group management facilities.
- `heartbeat_interval` (default = `3s`): The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities.
- `group_rebalance_strategy` (default = `range`): The strategy used to assign partitions to consumers within a consumer group. This setting determines how Kafka distributes topic partitions among the consumers in the group during rebalances. Supported strategies are:
  - `range`: Assigns partitions to consumers based on a range. It aims to distribute partitions evenly across consumers, but it can lead to uneven distribution if the number of partitions is not a multiple of the number of consumers. For more information, see the Kafka RangeAssignor documentation.
  - `roundrobin`: Assigns partitions to consumers in a round-robin fashion. It ensures a more even distribution of partitions across consumers, especially when the number of partitions is not a multiple of the number of consumers. For more information, see the Kafka RoundRobinAssignor documentation.
  - `sticky`: Aims to keep partition assignments unchanged across rebalances as much as possible. It minimizes the number of partition movements, which can be beneficial for stateful consumers. For more information, see the Kafka StickyAssignor documentation.
- `group_instance_id`: A unique identifier for the consumer instance within a consumer group.
  - If set to a non-empty string, the consumer is treated as a static member of the group. This means that the consumer will maintain its partition assignments across restarts and rebalances, as long as it rejoins the group with the same `group_instance_id`.
  - If set to an empty string (or not set), the consumer is treated as a dynamic member. In this case, the consumer's partition assignments may change during rebalances.
  - Using a `group_instance_id` is useful for stateful consumers or when you need to ensure that a specific consumer instance is always assigned the same set of partitions.
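A sketch combining static group membership with the sticky assignor, which together minimize partition movement across restarts; the instance ID is hypothetical:

```yaml
receivers:
  kafka:
    group_rebalance_strategy: sticky
    group_instance_id: collector-instance-1  # hypothetical; keep stable across restarts of this instance
```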
- `min_fetch_size` (default = `1`): The minimum number of message bytes to fetch in a request.
- `default_fetch_size` (default = `1048576`): The default number of message bytes to fetch in a request (1 MiB).
- `max_fetch_size` (default = `0`): The maximum number of message bytes to fetch in a request; `0` means unlimited.
- `max_fetch_wait` (default = `250ms`): The maximum amount of time the broker will wait for `min_fetch_size` bytes to be available before returning anyway.
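A sketch of fetch tuning that trades a little latency for larger batches; the specific values are illustrative, not recommendations:

```yaml
receivers:
  kafka:
    min_fetch_size: 1024       # wait until at least 1 KiB is available...
    max_fetch_wait: 500ms      # ...or 500ms has elapsed, whichever comes first
    default_fetch_size: 1048576
```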
- `tls`: see TLS Configuration Settings for the full set of available options.
- `auth`
  - `plain_text` (Deprecated in v0.123.0: use `sasl` with `mechanism` set to `PLAIN` instead.)
    - `username`: The username to use.
    - `password`: The password to use.
  - `sasl`
    - `username`: The username to use.
    - `password`: The password to use.
    - `mechanism`: The SASL mechanism to use (`SCRAM-SHA-256`, `SCRAM-SHA-512`, `AWS_MSK_IAM`, `AWS_MSK_IAM_OAUTHBEARER`, or `PLAIN`).
    - `aws_msk.region`: AWS region, for the `AWS_MSK_IAM` or `AWS_MSK_IAM_OAUTHBEARER` mechanism.
    - `aws_msk.broker_addr`: MSK broker address, for the `AWS_MSK_IAM` mechanism.
  - `tls` (Deprecated in v0.124.0: configure `tls` at the top level): this is an alias for `tls` at the top level.
  - `kerberos`
    - `service_name`: Kerberos service name.
    - `realm`: Kerberos realm.
    - `use_keytab`: If true, the keytab file is used for authentication instead of the password.
    - `username`: The Kerberos username used to authenticate with the KDC.
    - `password`: The Kerberos password used to authenticate with the KDC.
    - `config_file`: Path to the Kerberos configuration, e.g. /etc/krb5.conf.
    - `keytab_file`: Path to the keytab file, e.g. /etc/security/kafka.keytab.
    - `disable_fast_negotiation`: Disable PA-FX-FAST negotiation (Pre-Authentication Framework - Fast). Some common Kerberos implementations do not support PA-FX-FAST negotiation. This is set to `false` by default.
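A sketch of keytab-based Kerberos authentication using the settings above; the realm and username are hypothetical placeholders:

```yaml
receivers:
  kafka:
    auth:
      kerberos:
        service_name: kafka
        realm: EXAMPLE.COM                       # hypothetical realm
        use_keytab: true                         # authenticate with the keytab, not a password
        username: collector                      # hypothetical principal name
        keytab_file: /etc/security/kafka.keytab
        config_file: /etc/krb5.conf
```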
- `metadata`
  - `full` (default = true): Whether to maintain a full set of metadata. When disabled, the client does not make the initial metadata request to the broker at startup.
  - `retry`
    - `max` (default = 3): The number of retries to get metadata.
    - `backoff` (default = 250ms): How long to wait between metadata retries.
- `autocommit`
  - `enable` (default = true): Whether or not to auto-commit updated offsets back to the broker.
  - `interval` (default = 1s): How frequently to commit updated offsets. Ineffective unless auto-commit is enabled.
- `message_marking`
  - `after` (default = false): If true, messages are marked only after the pipeline has executed.
  - `on_error` (default = false): If false, only successfully processed messages are marked. Note: this can block the entire partition if processing a message returns a permanent error.
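A sketch combining the commit-related settings above, so offsets are committed only for messages the pipeline has actually handled; the interval value is illustrative:

```yaml
receivers:
  kafka:
    autocommit:
      enable: true
      interval: 5s       # illustrative; commit offsets every 5 seconds
    message_marking:
      after: true        # mark messages only after pipeline execution
      on_error: false    # messages that fail processing are not marked
```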
- `header_extraction`
  - `extract_headers` (default = false): Whether to attach header fields to resource attributes in the OTel pipeline.
  - `headers` (default = []): The list of headers to extract from the Kafka record. Note: matching is exact; regexes are not supported at this time.
- `error_backoff`: Backoff configuration applied when downstream consumers return errors.
  - `enabled` (default = false): Whether to enable backoff when the next consumers return errors.
  - `initial_interval`: The time to wait after the first error before retrying.
  - `max_interval`: The upper bound on the backoff interval between consecutive retries.
  - `multiplier`: The value multiplied by the backoff interval bounds.
  - `randomization_factor`: A random factor used to calculate the next backoff. Randomized interval = RetryInterval * (1 ± RandomizationFactor).
  - `max_elapsed_time`: The maximum amount of time to keep backing off before giving up. If set to 0, retries never stop.
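A sketch of an exponential backoff policy using the fields above; the values are illustrative, not recommendations:

```yaml
receivers:
  kafka:
    error_backoff:
      enabled: true
      initial_interval: 1s      # first retry after 1 second
      max_interval: 30s         # cap the interval at 30 seconds
      multiplier: 2             # double the interval on each retry
      randomization_factor: 0.5 # jitter: interval * (1 ± 0.5)
      max_elapsed_time: 5m      # give up after 5 minutes of retrying
```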
## Supported encodings

The Kafka receiver supports encoding extensions, as well as the following built-in encodings.

Available for all signals:

- `otlp_proto`: the payload is decoded as OTLP Protobuf.
- `otlp_json`: the payload is decoded as OTLP JSON.
Available only for traces:

- `jaeger_proto`: the payload is deserialized to a single Jaeger proto `Span`.
- `jaeger_json`: the payload is deserialized to a single Jaeger JSON `Span` using `jsonpb`.
- `zipkin_proto`: the payload is deserialized into a list of Zipkin proto spans.
- `zipkin_json`: the payload is deserialized into a list of Zipkin V2 JSON spans.
- `zipkin_thrift`: the payload is deserialized into a list of Zipkin Thrift spans.
Available only for logs:

- `raw`: the payload's bytes are inserted as the body of a log record.
- `text`: the payload is decoded as text and inserted as the body of a log record. By default, it uses UTF-8 to decode. You can use `text_<ENCODING>`, like `text_utf-8` or `text_shift_jis`, to customize this behavior.
- `json`: the payload is decoded as JSON and inserted as the body of a log record.
- `azure_resource_logs`: the payload is converted from Azure Resource Logs format to OTel format.
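A sketch of selecting a non-default encoding per signal using the options above:

```yaml
receivers:
  kafka:
    logs:
      encoding: text_utf-8   # treat each message as a UTF-8 string log body
    traces:
      encoding: zipkin_json  # consume Zipkin V2 JSON spans
```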
## Message header propagation

The Kafka receiver will extract Kafka message headers and include them as request metadata (context). This metadata can then be used throughout the pipeline, for example to set attributes using the attributes processor.
## Example configurations

### Minimal configuration

By default, the receiver does not require any configuration. With the following configuration, the receiver will consume messages from the default topics from localhost:9092 using the `otlp_proto` encoding:

```yaml
receivers:
  kafka:
```
### TLS and authentication

In this example the receiver is configured to connect to Kafka using TLS for encryption, and SASL/SCRAM for authentication:

```yaml
receivers:
  kafka:
    tls:
    auth:
      sasl:
        username: "user"
        password: "secret"
        mechanism: "SCRAM-SHA-512"
```
### Header extraction

In addition to propagating Kafka message headers as metadata as described above in Message header propagation, the Kafka receiver can also be configured to extract and attach specific headers as resource attributes, e.g.

```yaml
receivers:
  kafka:
    header_extraction:
      extract_headers: true
      headers: ["header1", "header2"]
```
If we produce a Kafka message with headers "header1: value1" and "header2: value2" with the above configuration, the receiver will attach these headers as resource attributes with the prefix "kafka.header.", i.e.

```json
"resource": {
  "attributes": {
    "kafka.header.header1": "value1",
    "kafka.header.header2": "value2"
  }
}
...
```