fkpulsarexporter

package module
v0.81.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 18, 2023 License: Apache-2.0 Imports: 21 Imported by: 0

README

FK Pulsar Exporter

Status
Stability alpha: traces, metrics, logs
Distributions contrib
Issues Open issues Closed issues

Pulsar exporter exports logs, metrics, and traces to Pulsar. This exporter uses a synchronous producer that blocks and able to batch messages.

Get Started

The following settings can be optionally configured:

  • endpoint (default = pulsar://localhost:6650): The url of pulsar cluster.
  • topic (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the pulsar topic to export to.
  • encoding (default = otlp_proto): The encoding of the traces sent to pulsar. All available encodings:
    • otlp_proto: payload is Protobuf serialized from ExportTraceServiceRequest if set as a traces exporter or ExportMetricsServiceRequest for metrics or ExportLogsServiceRequest for logs.
    • otlp_json: ** EXPERIMENTAL ** payload is JSON serialized from ExportTraceServiceRequest if set as a traces exporter or ExportMetricsServiceRequest for metrics or ExportLogsServiceRequest for logs.
    • The following encodings are valid only for traces.
      • jaeger_proto: the payload is serialized to a single Jaeger proto Span, and keyed by TraceID.
      • jaeger_json: the payload is serialized to a single Jaeger JSON Span using jsonpb, and keyed by TraceID.
  • auth
    • tls
      • cert_file:
      • key_file:
    • token
      • token
    • oauth2
      • issuer_url:
      • client_id:
      • audience:
    • athenz
      • provider_domain:
      • tenant_domain:
      • tenant_service:
      • private_key:
      • key_id:
      • principal_header:
      • zts_url:
  • producer
    • max_reconnect_broker: specifies the maximum retry number of reconnectToBroker. (default: ultimate)
    • hashing_scheme: used to define the partition on where to publish a particular message. Can be set to java_string_hash (default) or murmur3_32hash.
    • compression_level: one of 'default' (default), 'faster', or 'better'.
    • compression_type: one of 'none' (default), 'lz4', 'zlib', or 'zstd'.
    • max_pending_messages": specifies the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
    • batch_builder_type": one of 'default' (default) or 'key_based'.
    • partitions_auto_discovery_interval": the time interval for the background process to discover new partitions
    • batching_max_publish_delay": specifies the time period within which the messages sent will be batched (default: 10ms)
    • batching_max_messages": specifies the maximum number of messages permitted in a batch. (default: 1000)
    • batching_max_size": specifies the maximum number of bytes permitted in a batch. (default 128 KB)
    • disable_block_if_queue_full": controls whether Send and SendAsync block if producer's message queue is full. Defaults to false.
    • disable_batching": controls whether automatic batching of messages is enabled for the producer. Defaults to false.
  • tls_trust_certs_file_path: path to the CA cert. For a client this verifies the server certificate. Should only be used if insecure is set to true.
  • tls_allow_insecure_connection: configure whether the Pulsar client accept untrusted TLS certificate from broker (default: false)
  • timeout: send pulsar message timeout (default: 5s)
  • operation_timeout: sets producer-create, subscribe and unsubscribe operations timeout (default: 30 seconds)
  • connection_timeout: timeout for the establishment of a TCP connection (default: 5 seconds)
  • map_connections_per_broker: max number of connections to a single broker that will kept in the pool. (default: 1 connection)
  • retry_on_failure
    • enabled (default = true)
    • initial_interval (default = 5s): Time to wait after the first failure before retrying; ignored if enabled is false
    • max_interval (default = 30s): Is the upper bound on backoff; ignored if enabled is false
    • max_elapsed_time (default = 120s): Is the maximum amount of time spent trying to send a batch; ignored if enabled is false
  • sending_queue
    • enabled (default = true)
    • num_consumers (default = 10): Number of consumers that dequeue batches; ignored if enabled is false
    • queue_size (default = 1000): Maximum number of batches kept in memory before dropping data; ignored if enabled is false; User should calculate this as num_seconds * requests_per_second where:
      • num_seconds is the number of seconds to buffer in case of a backend outage
      • requests_per_second is the average number of requests per seconds.

Example configuration:

exporters:
  pulsar:
    service_url: pulsar://localhost:6650
    topic: otlp-spans
    encoding: otlp_proto
    auth:
      tls:
        cert_file: cert.pem
        key_file: key.pem
    timeout: 10s
    tls_allow_insecure_connection: false
    tls_trust_certs_file_path: ca.pem

Documentation

Index

Constants

View Source
const (
	Type             = "fkpulsar"
	TracesStability  = component.StabilityLevelAlpha
	MetricsStability = component.StabilityLevelAlpha
	LogsStability    = component.StabilityLevelAlpha
)

Variables

This section is empty.

Functions

func NewFactory

func NewFactory(options ...FactoryOption) exporter.Factory

NewFactory creates Pulsar exporter factory.

Types

type Athenz

type Athenz struct {
	ProviderDomain  string              `mapstructure:"provider_domain"`
	TenantDomain    string              `mapstructure:"tenant_domain"`
	TenantService   string              `mapstructure:"tenant_service"`
	PrivateKey      configopaque.String `mapstructure:"private_key"`
	KeyID           string              `mapstructure:"key_id"`
	PrincipalHeader string              `mapstructure:"principal_header"`
	ZtsURL          string              `mapstructure:"zts_url"`
}

type Authentication

type Authentication struct {
	TLS    *TLS    `mapstructure:"tls"`
	Token  *Token  `mapstructure:"token"`
	Athenz *Athenz `mapstructure:"athenz"`
	OAuth2 *OAuth2 `mapstructure:"oauth2"`
}

type BatchBuilderType

type BatchBuilderType string
const (
	DefaultBatchBuilder  BatchBuilderType = "default"
	KeyBasedBatchBuilder BatchBuilderType = "key_based"
)

func (*BatchBuilderType) ToPulsar

func (*BatchBuilderType) UnmarshalText

func (c *BatchBuilderType) UnmarshalText(text []byte) error

type CompressionLevel

type CompressionLevel string
const (
	Default CompressionLevel = "default"
	Faster  CompressionLevel = "faster"
	Better  CompressionLevel = "better"
)

func (*CompressionLevel) ToPulsar

func (*CompressionLevel) UnmarshalText

func (c *CompressionLevel) UnmarshalText(text []byte) error

type CompressionType

type CompressionType string
const (
	None CompressionType = "none"
	LZ4  CompressionType = "lz4"
	ZLib CompressionType = "zlib"
	ZStd CompressionType = "zstd"
)

func (*CompressionType) ToPulsar

func (c *CompressionType) ToPulsar() pulsar.CompressionType

func (*CompressionType) UnmarshalText

func (c *CompressionType) UnmarshalText(text []byte) error

type Config

type Config struct {
	exporterhelper.TimeoutSettings `mapstructure:",squash"`
	exporterhelper.QueueSettings   `mapstructure:"sending_queue"`
	exporterhelper.RetrySettings   `mapstructure:"retry_on_failure"`

	// Endpoint of pulsar broker (default "pulsar://localhost:6650")
	Endpoint string `mapstructure:"endpoint"`
	// The name of the pulsar topic to export to (default otlp_spans for traces, otlp_metrics for metrics)
	Topic string `mapstructure:"topic"`
	// Encoding of messages (default "otlp_proto")
	Encoding string `mapstructure:"encoding"`
	// Producer configuration of the Pulsar producer
	Producer Producer `mapstructure:"producer"`
	// Set the path to the trusted TLS certificate file
	TLSTrustCertsFilePath string `mapstructure:"tls_trust_certs_file_path"`
	// Configure whether the Pulsar client accept untrusted TLS certificate from broker (default: false)
	TLSAllowInsecureConnection bool           `mapstructure:"tls_allow_insecure_connection"`
	Authentication             Authentication `mapstructure:"auth"`
	OperationTimeout           time.Duration  `mapstructure:"operation_timeout"`
	ConnectionTimeout          time.Duration  `mapstructure:"connection_timeout"`
	MaxConnectionsPerBroker    int            `mapstructure:"map_connections_per_broker"`
}

Config defines configuration for Pulsar exporter.

func (*Config) Validate

func (cfg *Config) Validate() error

Validate checks if the exporter configuration is valid

type FactoryOption

type FactoryOption func(factory *pulsarExporterFactory)

FactoryOption applies changes to pulsarExporterFactory.

func WithTracesMarshalers

func WithTracesMarshalers(tracesMarshalers ...TracesMarshaler) FactoryOption

WithTracesMarshalers adds tracesMarshalers.

type HashingScheme

type HashingScheme string
const (
	JavaStringHash HashingScheme = "java_string_hash"
	Murmur3_32Hash HashingScheme = "murmur3_32hash"
)

func (*HashingScheme) ToPulsar

func (c *HashingScheme) ToPulsar() pulsar.HashingScheme

func (*HashingScheme) UnmarshalText

func (c *HashingScheme) UnmarshalText(text []byte) error

type LogsMarshaler

type LogsMarshaler interface {
	// Marshal serializes logs into sarama's ProducerMessages
	Marshal(logs plog.Logs, topic string) ([]*pulsar.ProducerMessage, error)

	// Encoding returns encoding name
	Encoding() string
}

LogsMarshaler marshals logs into Message array

type MetricsMarshaler

type MetricsMarshaler interface {
	// Marshal serializes metrics into sarama's ProducerMessages
	Marshal(metrics pmetric.Metrics, topic string) ([]*pulsar.ProducerMessage, error)

	// Encoding returns encoding name
	Encoding() string
}

MetricsMarshaler marshals metrics into Message array

type OAuth2

type OAuth2 struct {
	IssuerURL    string `mapstructure:"issuer_url"`
	ClientID     string `mapstructure:"client_id"`
	Audience     string `mapstructure:"audience"`
	ClientSecret string `mapstructure:"client_secret"`
	Scope        string `mapstructure:"scope"`
}

type Producer

type Producer struct {
	MaxReconnectToBroker            *uint            `mapstructure:"max_reconnect_broker"`
	HashingScheme                   HashingScheme    `mapstructure:"hashing_scheme"`
	CompressionLevel                CompressionLevel `mapstructure:"compression_level"`
	CompressionType                 CompressionType  `mapstructure:"compression_type"`
	MaxPendingMessages              int              `mapstructure:"max_pending_messages"`
	BatcherBuilderType              BatchBuilderType `mapstructure:"batch_builder_type"`
	PartitionsAutoDiscoveryInterval time.Duration    `mapstructure:"partitions_auto_discovery_interval"`
	BatchingMaxPublishDelay         time.Duration    `mapstructure:"batching_max_publish_delay"`
	BatchingMaxMessages             uint             `mapstructure:"batching_max_messages"`
	BatchingMaxSize                 uint             `mapstructure:"batching_max_size"`
	DisableBlockIfQueueFull         bool             `mapstructure:"disable_block_if_queue_full"`
	DisableBatching                 bool             `mapstructure:"disable_batching"`
}

Producer defines configuration for producer

type PulsarLogsProducer

type PulsarLogsProducer struct {
	// contains filtered or unexported fields
}

func (*PulsarLogsProducer) Close

type PulsarMetricsProducer

type PulsarMetricsProducer struct {
	// contains filtered or unexported fields
}

func (*PulsarMetricsProducer) Close

type PulsarTracesProducer

type PulsarTracesProducer struct {
	// contains filtered or unexported fields
}

func (*PulsarTracesProducer) Close

type TLS

type TLS struct {
	CertFile string `mapstructure:"cert_file"`
	KeyFile  string `mapstructure:"key_file"`
}

type Token

type Token struct {
	Token configopaque.String `mapstructure:"token"`
}

type TracesMarshaler

type TracesMarshaler interface {
	// Marshal serializes spans into sarama's ProducerMessages
	Marshal(traces ptrace.Traces, topic string) ([]*pulsar.ProducerMessage, error)

	// Encoding returns encoding name
	Encoding() string
}

TracesMarshaler marshals traces into Message array.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL