kinesisconsumer

package
v0.0.40
Published: Feb 14, 2022 | License: MIT | Imports: 14 | Imported by: 0

README

Kinesis Consumer Input Plugin

The Kinesis consumer plugin reads from a Kinesis data stream and creates metrics using one of the supported input data formats.

Configuration
[[inputs.kinesis_consumer]]
  ## AWS region of the Kinesis endpoint.
  region = "ap-southeast-2"

  ## Amazon Credentials
  ## Credentials are loaded in the following order
  ## 1) Assumed credentials via STS if role_arn is specified
  ## 2) explicit credentials from 'access_key' and 'secret_key'
  ## 3) shared profile from 'profile'
  ## 4) environment variables
  ## 5) shared credentials file
  ## 6) EC2 Instance Profile
  # access_key = ""
  # secret_key = ""
  # token = ""
  # role_arn = ""
  # profile = ""
  # shared_credential_file = ""

  ## Endpoint to make requests against. The correct endpoint is determined
  ## automatically; set this option only if you wish to override the
  ## default.
  ##   ex: endpoint_url = "http://localhost:8000"
  # endpoint_url = ""

  ## Name of the Kinesis stream. The stream must exist before the agent starts.
  streamname = "StreamName"

  ## Shard iterator type (only 'TRIM_HORIZON' and 'LATEST' currently supported)
  # shard_iterator_type = "TRIM_HORIZON"

  ## Maximum messages to read from the stream that have not been written by an
  ## output.  For best throughput set based on the number of metrics within
  ## each message and the size of the output's metric_batch_size.
  ##
  ## For example, if each message from the stream contains 10 metrics and the
  ## output metric_batch_size is 1000, setting this to 100 will ensure that a
  ## full batch is collected and the write is triggered immediately without
  ## waiting until the next flush_interval.
  # max_undelivered_messages = 1000

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/circonus-labs/circonus-unified-agent/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "influx"

  ## Optional: configuration for a DynamoDB checkpoint
  [inputs.kinesis_consumer.checkpoint_dynamodb]
    ## unique name for this consumer
    app_name = "default"
    table_name = "default"
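The sizing rule in the max_undelivered_messages comment is a ceiling division of the output batch size by the number of metrics per message. A minimal sketch of that arithmetic follows; the helper name undeliveredFor is hypothetical and is not part of this package.

```go
package main

import "fmt"

// undeliveredFor is a hypothetical helper (not part of kinesisconsumer).
// It returns the smallest number of in-flight messages that yields at
// least one full output batch. Both arguments must be positive.
func undeliveredFor(metricsPerMessage, metricBatchSize int) int {
	// Ceiling division: a partially used message still counts.
	return (metricBatchSize + metricsPerMessage - 1) / metricsPerMessage
}

func main() {
	// 10 metrics per message and metric_batch_size = 1000, as in the
	// configuration comment.
	fmt.Println(undeliveredFor(10, 1000))
}
```

With the values from the comment this yields 100, the setting the comment recommends.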

Required AWS IAM permissions

Kinesis:

  • DescribeStream
  • GetRecords
  • GetShardIterator

DynamoDB:

  • GetItem
  • PutItem
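
The permissions above could be granted with an IAM policy along the following lines. This is a sketch only: the account ID and ARNs are placeholders, the helper consumerPolicy is hypothetical, and the DynamoDB statement is needed only when the optional checkpoint is configured.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// statement and policy mirror the JSON layout of an IAM policy document.
type statement struct {
	Effect   string   `json:"Effect"`
	Action   []string `json:"Action"`
	Resource string   `json:"Resource"`
}

type policy struct {
	Version   string      `json:"Version"`
	Statement []statement `json:"Statement"`
}

// consumerPolicy is a hypothetical helper that lists the actions named in
// the README, scoped to one stream and one checkpoint table.
func consumerPolicy(streamARN, tableARN string) policy {
	return policy{
		Version: "2012-10-17",
		Statement: []statement{
			{
				Effect:   "Allow",
				Action:   []string{"kinesis:DescribeStream", "kinesis:GetRecords", "kinesis:GetShardIterator"},
				Resource: streamARN,
			},
			{
				Effect:   "Allow",
				Action:   []string{"dynamodb:GetItem", "dynamodb:PutItem"},
				Resource: tableARN,
			},
		},
	}
}

func main() {
	// Placeholder ARNs: substitute your own region, account ID, and names.
	p := consumerPolicy(
		"arn:aws:kinesis:ap-southeast-2:123456789012:stream/StreamName",
		"arn:aws:dynamodb:ap-southeast-2:123456789012:table/default",
	)
	out, err := json.MarshalIndent(p, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```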

DynamoDB Checkpoint

The DynamoDB checkpoint stores the last processed record in a DynamoDB table. To use it, create a table with the following keys, both of type string:

Partition key: namespace
Sort key: shard_id
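
As an illustration of the key layout above, the sketch below prints a table definition in the JSON shape that the DynamoDB CreateTable operation accepts. The helper checkpointTable is hypothetical, and the on-demand billing mode is an assumption chosen for brevity; neither comes from this package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// These types mirror the field names of a DynamoDB CreateTable request.
type attributeDefinition struct {
	AttributeName string
	AttributeType string
}

type keySchemaElement struct {
	AttributeName string
	KeyType       string
}

type createTableInput struct {
	TableName            string
	AttributeDefinitions []attributeDefinition
	KeySchema            []keySchemaElement
	BillingMode          string
}

// checkpointTable is a hypothetical helper describing a table with the
// string partition key "namespace" and the string sort key "shard_id".
func checkpointTable(name string) createTableInput {
	return createTableInput{
		TableName: name,
		AttributeDefinitions: []attributeDefinition{
			{AttributeName: "namespace", AttributeType: "S"},
			{AttributeName: "shard_id", AttributeType: "S"},
		},
		KeySchema: []keySchemaElement{
			{AttributeName: "namespace", KeyType: "HASH"}, // partition key
			{AttributeName: "shard_id", KeyType: "RANGE"}, // sort key
		},
		BillingMode: "PAY_PER_REQUEST", // assumption, not required by the plugin
	}
}

func main() {
	out, err := json.MarshalIndent(checkpointTable("default"), "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```

The table name passed here should match table_name in the checkpoint_dynamodb section of the configuration.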

Documentation

Types

type DynamoDB

type DynamoDB struct {
	AppName   string `toml:"app_name"`
	TableName string `toml:"table_name"`
}

type KinesisConsumer

type KinesisConsumer struct {
	Region                 string    `toml:"region"`
	AccessKey              string    `toml:"access_key"`
	SecretKey              string    `toml:"secret_key"`
	RoleARN                string    `toml:"role_arn"`
	Profile                string    `toml:"profile"`
	Filename               string    `toml:"shared_credential_file"`
	Token                  string    `toml:"token"`
	EndpointURL            string    `toml:"endpoint_url"`
	StreamName             string    `toml:"streamname"`
	ShardIteratorType      string    `toml:"shard_iterator_type"`
	DynamoDB               *DynamoDB `toml:"checkpoint_dynamodb"`
	MaxUndeliveredMessages int       `toml:"max_undelivered_messages"`

	Log cua.Logger
	// contains filtered or unexported fields
}

func (*KinesisConsumer) Description

func (k *KinesisConsumer) Description() string

func (*KinesisConsumer) Gather

func (k *KinesisConsumer) Gather(ctx context.Context, acc cua.Accumulator) error

func (*KinesisConsumer) Get

func (k *KinesisConsumer) Get(streamName, shardID string) (string, error)

Get wraps the checkpoint's Get function. It is called by the consumer library.

func (*KinesisConsumer) SampleConfig

func (k *KinesisConsumer) SampleConfig() string

func (*KinesisConsumer) Set

func (k *KinesisConsumer) Set(streamName, shardID, sequenceNumber string) error

Set wraps the checkpoint's Set function. It is called by the consumer library.
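
To illustrate the Get and Set signatures documented above, here is a minimal in-memory checkpoint with the same method shapes. The type memoryCheckpoint is hypothetical and is not how this package stores checkpoints; it only sketches the contract of recording one sequence number per stream and shard.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// shardKey identifies one shard of one stream.
type shardKey struct {
	stream string
	shard  string
}

// memoryCheckpoint is a hypothetical, illustration-only store that keeps
// the last sequence number per shard in memory.
type memoryCheckpoint struct {
	mu   sync.Mutex
	seqs map[shardKey]string
}

func newMemoryCheckpoint() *memoryCheckpoint {
	return &memoryCheckpoint{seqs: make(map[shardKey]string)}
}

// Get returns the last stored sequence number, or "" if none was stored.
func (c *memoryCheckpoint) Get(streamName, shardID string) (string, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.seqs[shardKey{streamName, shardID}], nil
}

// Set records the sequence number of the last processed record.
func (c *memoryCheckpoint) Set(streamName, shardID, sequenceNumber string) error {
	if sequenceNumber == "" {
		return errors.New("sequence number must not be empty")
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	c.seqs[shardKey{streamName, shardID}] = sequenceNumber
	return nil
}

func main() {
	cp := newMemoryCheckpoint()
	if err := cp.Set("StreamName", "shardId-000000000000", "42"); err != nil {
		panic(err)
	}
	seq, _ := cp.Get("StreamName", "shardId-000000000000")
	fmt.Println(seq)
}
```

A consumer that restarts would call Get for each shard and resume reading after the returned sequence number; an empty result means no checkpoint exists yet.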

func (*KinesisConsumer) SetParser

func (k *KinesisConsumer) SetParser(parser parsers.Parser)

func (*KinesisConsumer) Start

func (k *KinesisConsumer) Start(ctx context.Context, acc cua.Accumulator) error

func (*KinesisConsumer) Stop

func (k *KinesisConsumer) Stop()
