kinesis_consumer

package
v1.21.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 5, 2022 License: MIT Imports: 18 Imported by: 2

README

Kinesis Consumer Input Plugin

The Kinesis consumer plugin reads from a Kinesis data stream and creates metrics using one of the supported input data formats.

Configuration

[[inputs.kinesis_consumer]]
  ## Amazon REGION of kinesis endpoint.
  region = "ap-southeast-2"

  ## Amazon Credentials
  ## Credentials are loaded in the following order
  ## 1) Web identity provider credentials via STS if role_arn and web_identity_token_file are specified
  ## 2) Assumed credentials via STS if role_arn is specified
  ## 3) explicit credentials from 'access_key' and 'secret_key'
  ## 4) shared profile from 'profile'
  ## 5) environment variables
  ## 6) shared credentials file
  ## 7) EC2 Instance Profile
  # access_key = ""
  # secret_key = ""
  # token = ""
  # role_arn = ""
  # web_identity_token_file = ""
  # role_session_name = ""
  # profile = ""
  # shared_credential_file = ""

  ## Endpoint to make request against, the correct endpoint is automatically
  ## determined and this option should only be set if you wish to override the
  ## default.
  ##   ex: endpoint_url = "http://localhost:8000"
  # endpoint_url = ""

  ## Kinesis StreamName must exist prior to starting telegraf.
  streamname = "StreamName"

  ## Shard iterator type (only 'TRIM_HORIZON' and 'LATEST' currently supported)
  # shard_iterator_type = "TRIM_HORIZON"

  ## Maximum messages to read from the broker that have not been written by an
  ## output.  For best throughput set based on the number of metrics within
  ## each message and the size of the output's metric_batch_size.
  ##
  ## For example, if each message from the queue contains 10 metrics and the
  ## output metric_batch_size is 1000, setting this to 100 will ensure that a
  ## full batch is collected and the write is triggered immediately without
  ## waiting until the next flush_interval.
  # max_undelivered_messages = 1000

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "influx"

  ##
  ## The content encoding of the data from kinesis
  ## If you are processing a cloudwatch logs kinesis stream then set this to "gzip"
  ## as AWS compresses cloudwatch log data before it is sent to kinesis (aws
  ## also base64 encodes the zip byte data before pushing to the stream.  The base64 decoding
  ## is done automatically by the golang sdk, as data is read from kinesis)
  ##
  # content_encoding = "identity"

  ## Optional
  ## Configuration for a dynamodb checkpoint
  [inputs.kinesis_consumer.checkpoint_dynamodb]
    ## unique name for this consumer
    app_name = "default"
    table_name = "default"
Required AWS IAM permissions

Kinesis:

  • DescribeStream
  • GetRecords
  • GetShardIterator

DynamoDB:

  • GetItem
  • PutItem
DynamoDB Checkpoint

The DynamoDB checkpoint stores the last processed record in a DynamoDB. To leverage this functionality, create a table with the following string type keys:

Partition key: namespace
Sort key: shard_id

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DynamoDB

type DynamoDB struct {
	AppName   string `toml:"app_name"`
	TableName string `toml:"table_name"`
}

type KinesisConsumer

type KinesisConsumer struct {
	StreamName             string    `toml:"streamname"`
	ShardIteratorType      string    `toml:"shard_iterator_type"`
	DynamoDB               *DynamoDB `toml:"checkpoint_dynamodb"`
	MaxUndeliveredMessages int       `toml:"max_undelivered_messages"`
	ContentEncoding        string    `toml:"content_encoding"`

	Log telegraf.Logger

	internalaws.CredentialConfig
	// contains filtered or unexported fields
}

func (*KinesisConsumer) Description

func (k *KinesisConsumer) Description() string

func (*KinesisConsumer) Gather

func (k *KinesisConsumer) Gather(acc telegraf.Accumulator) error

func (*KinesisConsumer) GetCheckpoint added in v1.20.3

func (k *KinesisConsumer) GetCheckpoint(streamName, shardID string) (string, error)

Get wraps the checkpoint's GetCheckpoint function (called by consumer library)

func (*KinesisConsumer) Init added in v1.18.3

func (k *KinesisConsumer) Init() error

func (*KinesisConsumer) SampleConfig

func (k *KinesisConsumer) SampleConfig() string

func (*KinesisConsumer) SetCheckpoint added in v1.20.3

func (k *KinesisConsumer) SetCheckpoint(streamName, shardID, sequenceNumber string) error

Set wraps the checkpoint's SetCheckpoint function (called by consumer library)

func (*KinesisConsumer) SetParser

func (k *KinesisConsumer) SetParser(parser parsers.Parser)

func (*KinesisConsumer) Start

func (*KinesisConsumer) Stop

func (k *KinesisConsumer) Stop()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL