kinesisdatacounter

package module
v0.3.1
Published: Nov 4, 2022 License: MIT Imports: 33 Imported by: 0

README

kinesis-data-counter

A tool for counting data in Kinesis Data Streams.
Use it to count records in a Kinesis Data Stream that carries JSON data.

Install

binary packages

Download a binary from the Releases page.

Homebrew tap
$ brew install mashiike/tap/kinesis-data-counter

Usage

as CLI command
Usage of kinesis-data-counter:
  -config string
        kinesis-data-counter config
  -counter-id string
        set instant counter id [Only at CLI] (default "__instant__")
  -counter-query string
        set instant counter output query, jq expr [Only at CLI]
  -counter-target-column string
        set instant counter target column [Only at CLI] (default "*")
  -counter-type value
        set instant counter type [Only at CLI] (default count)
  -log-level string
        log level (default "info")
  -put record
        put record configured stream [Only at CLI]
  -stream string
        kinesis data stream name [Only at CLI]
  -window string
        tumbling window size, max 15m [Only at CLI]

quick start:

$ kinesis-data-counter -window 1m -stream test-stream
2021/11/22 01:24:36 [info] start get record from arn:aws:kinesis:ap-northeast-1:111122223333:stream/test-stream
{"counter_id":"__instant__","counter_type":"count","event_source_arn":"arn:aws:kinesis:ap-northeast-1:111122223333:stream/test-stream","shard_id":"shardId-000000000000","value":203,"window_end":1637511900000,"window_start":1637511840000}
{"counter_id":"__instant__","counter_type":"count","event_source_arn":"arn:aws:kinesis:ap-northeast-1:111122223333:stream/test-stream","shard_id":"shardId-000000000000","value":502,"window_end":1637511960000,"window_start":1637511900000}
{"counter_id":"__instant__","counter_type":"count","event_source_arn":"arn:aws:kinesis:ap-northeast-1:111122223333:stream/test-stream","shard_id":"shardId-000000000000","value":230,"window_end":1637512020000,"window_start":1637511960000}

If you do not specify a config, an instant counter is used.
Instant counters simply build counters from CLI options.
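
The output lines shown above are JSON objects, one per window and shard. A minimal sketch of reading one such line from Go follows. The counterOutput type and both helper functions are illustrative names, not part of this package. The sketch assumes window_start and window_end are Unix times in milliseconds, which matches the sample values.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// counterOutput mirrors the fields visible in the sample output lines.
// It is a local illustration type, not a type exported by the package.
type counterOutput struct {
	CounterID      string `json:"counter_id"`
	CounterType    string `json:"counter_type"`
	EventSourceARN string `json:"event_source_arn"`
	ShardID        string `json:"shard_id"`
	Value          int64  `json:"value"`
	WindowStart    int64  `json:"window_start"` // assumed: Unix time in milliseconds
	WindowEnd      int64  `json:"window_end"`   // assumed: Unix time in milliseconds
}

// parseOutputLine decodes one line of counter output.
func parseOutputLine(line string) (counterOutput, error) {
	var out counterOutput
	err := json.Unmarshal([]byte(line), &out)
	return out, err
}

// windowSeconds returns the window length in whole seconds.
func windowSeconds(out counterOutput) int64 {
	return (out.WindowEnd - out.WindowStart) / 1000
}

func main() {
	line := `{"counter_id":"__instant__","counter_type":"count","event_source_arn":"arn:aws:kinesis:ap-northeast-1:111122223333:stream/test-stream","shard_id":"shardId-000000000000","value":203,"window_end":1637511900000,"window_start":1637511840000}`
	out, err := parseOutputLine(line)
	if err != nil {
		panic(err)
	}
	start := time.UnixMilli(out.WindowStart).UTC()
	fmt.Printf("%s %s value=%d window=%ds\n",
		start.Format(time.RFC3339), out.ShardID, out.Value, windowSeconds(out))
}
```

With -window 1m, the difference between window_end and window_start in each line is 60 seconds.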

For example, suppose the following data is flowing into the stream.

{"request_id":1904,"time":"2021-12-01T11:19:54.24Z","user_id":1016}
{"request_id":1905,"time":"2021-12-01T11:19:54.3Z","user_id":1014}
{"request_id":1906,"time":"2021-12-01T11:19:54.36Z","user_id":1017}
{"request_id":1907,"time":"2021-12-01T11:19:54.42Z","user_id":1013}
{"request_id":1908,"time":"2021-12-01T11:19:54.48Z","user_id":1007}
...

If you want to get the estimated number of unique users per minute in the tumbling window:

$ kinesis-data-counter -window 1m  -counter-type approx_count_distinct -counter-target-column user_id -stream test-stream
2021/11/22 01:24:36 [info] start get record from arn:aws:kinesis:ap-northeast-1:111122223333:stream/test-stream
{"counter_id":"__instant__","counter_type":"approx_count_distinct","event_source_arn":"arn:aws:kinesis:ap-northeast-1:111122223333:stream/test-stream","shard_id":"shardId-000000000000","value":30,"window_end":1637511900000,"window_start":1637511840000}
{"counter_id":"__instant__","counter_type":"approx_count_distinct","event_source_arn":"arn:aws:kinesis:ap-northeast-1:111122223333:stream/test-stream","shard_id":"shardId-000000000000","value":24,"window_end":1637511960000,"window_start":1637511900000}
{"counter_id":"__instant__","counter_type":"approx_count_distinct","event_source_arn":"arn:aws:kinesis:ap-northeast-1:111122223333:stream/test-stream","shard_id":"shardId-000000000000","value":22,"window_end":1637512020000,"window_start":1637511960000}

as AWS Lambda function

The kinesis-data-counter binary also runs as an AWS Lambda function. It can be deployed as the bootstrap of a Lambda function. When run as a Lambda function, it expects to receive a KinesisTimeWindowEvent.

For details, refer to the description of time windows in the AWS Lambda documentation for Kinesis event sources.

A sample configuration file looks like this:

required_version: ">=0.0.0"

counters:
  - id: unique_user_count
    input_stream_arn: arn:aws:kinesis:*:*:stream/input-stream
    output_stream_arn: arn:aws:kinesis:ap-northeast-1:111122223333:stream/output-stream
    target_column: user_id
    counter_type: approx_count_distinct
    jq_expr: |
      {"time":.window_start, "name": "access_log.user_count", "value": .value}
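
The input_stream_arn in the sample above uses * for the region and account sections. The following sketch shows one way such a pattern could be matched against a concrete event source ARN. matchARN is a hypothetical helper written for this illustration, and the rule it applies (a * section matches any value in the same colon-separated section) is an assumption. The package's own ARN.Match may behave differently.

```go
package main

import (
	"fmt"
	"strings"
)

// matchARN reports whether arn matches pattern. An ARN has six
// colon-separated sections (arn:partition:service:region:account:resource).
// A "*" section in the pattern matches any value in the same section.
// Hypothetical helper: the matching rules of the package may differ.
func matchARN(pattern, arn string) bool {
	p := strings.SplitN(pattern, ":", 6)
	a := strings.SplitN(arn, ":", 6)
	if len(p) != 6 || len(a) != 6 {
		return false
	}
	for i := range p {
		if p[i] != "*" && p[i] != a[i] {
			return false
		}
	}
	return true
}

func main() {
	pattern := "arn:aws:kinesis:*:*:stream/input-stream"
	fmt.Println(matchARN(pattern, "arn:aws:kinesis:ap-northeast-1:111122223333:stream/input-stream")) // true
	fmt.Println(matchARN(pattern, "arn:aws:kinesis:ap-northeast-1:111122223333:stream/other-stream")) // false
}
```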

Example Lambda function configuration:

{
  "FunctionName": "kinesis-data-counter",
  "Environment": {
    "Variables": {
      "KINESIS_DATA_COUNTER_CONFIG": "config.yaml"
    }
  },
  "Handler": "bootstrap",
  "MemorySize": 128,
  "Role": "arn:aws:iam::111122223333:role/lambda-function",
  "Runtime": "provided.al2",
  "Timeout": 300
}

It is possible to define multiple counters, each counting a different input_stream_arn. It is also possible to define multiple counters for a single input_stream_arn.

notice:

  • Always set the --tumbling-window-in-seconds parameter when running create-event-source-mapping.
  • Setting aggregate_stream_arn is recommended when the input Kinesis stream has two or more shards.

aggregate_stream_arn

For example:

required_version: ">=0.0.0"

counters:
  - id: unique_user_count
    input_stream_arn: arn:aws:kinesis:*:*:stream/input-stream
    aggregate_stream_arn: arn:aws:kinesis:ap-northeast-1:111122223333:stream/aggregate-stream
    output_stream_arn: arn:aws:firehose:ap-northeast-1:111122223333:deliverystream/output-to-s3
    target_column: user_id
    counter_type: approx_count_distinct
    jq_expr: |
      {"time":.window_start, "name": "access_log.user_count", "value": .value}

Lambda is invoked once per tumbling window and per shard.
If the input stream has more than one shard, counting is done separately for each shard.
For example, if the counter type is approx_count_distinct, the result is a unique count per shard, not the unique count across the whole stream that you probably want.
If aggregate_stream_arn is set, then after receiving a KinesisTimeWindowEvent from the input stream specified in input_stream_arn, the function immediately puts a record of the intermediate calculation result to the Kinesis Data Stream specified in aggregate_stream_arn using PutRecord.
If a Lambda function that receives KinesisTimeWindowEvent in the same way is attached to the aggregate stream, that function calculates the final counter value across shards and puts it to the output stream.
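
The reason per-shard unique counts need an aggregation step can be seen with plain sets. The sketch below is an illustration only. It uses exact sets in place of the HyperLogLog++ state the tool works with, and the shard contents are made-up sample values.

```go
package main

import "fmt"

// distinct returns the set of distinct user IDs in ids.
func distinct(ids []int) map[int]struct{} {
	set := make(map[int]struct{}, len(ids))
	for _, id := range ids {
		set[id] = struct{}{}
	}
	return set
}

// union merges per-shard sets into a single set.
func union(sets ...map[int]struct{}) map[int]struct{} {
	merged := make(map[int]struct{})
	for _, s := range sets {
		for id := range s {
			merged[id] = struct{}{}
		}
	}
	return merged
}

func main() {
	// Hypothetical user_id values seen on two shards in the same window.
	shardA := distinct([]int{1016, 1014, 1017})
	shardB := distinct([]int{1014, 1017, 1013})

	fmt.Println("sum of per-shard counts:", len(shardA)+len(shardB)) // 6
	fmt.Println("count after merging:", len(union(shardA, shardB)))  // 4
}
```

Users 1014 and 1017 appear on both shards, so adding the per-shard counts overstates the total. Merging the intermediate state first and counting afterwards avoids that, which is the role the aggregate stream plays here.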

Counters
counter_type: count

A counter of type count simply counts records.
If target_column is *, every record flowing through the Kinesis data stream is counted.
If a key is set in target_column, only records in which that JSON key is present and not null are counted.
If an expression is set in target_expr, it is evaluated against each JSON record, and the record is counted if the result is neither nil nor false.
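
The target_column rules for the count type can be sketched as follows. countRecords is a hypothetical function written for this illustration, not the package's implementation. Skipping records that fail to decode as JSON objects is an assumption of the sketch, and target_expr is not covered.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// countRecords counts JSON records according to column.
// With column "*" every record is counted. Otherwise a record is counted
// only if the key exists and its value is not null.
func countRecords(records []string, column string) int {
	n := 0
	for _, r := range records {
		if column == "*" {
			n++
			continue
		}
		var obj map[string]interface{}
		if err := json.Unmarshal([]byte(r), &obj); err != nil {
			continue // assumption: records that are not JSON objects are skipped
		}
		// A JSON null decodes to a nil interface value.
		if v, ok := obj[column]; ok && v != nil {
			n++
		}
	}
	return n
}

func main() {
	records := []string{
		`{"request_id":1904,"user_id":1016}`,
		`{"request_id":1905,"user_id":null}`,
		`{"request_id":1906}`,
	}
	fmt.Println(countRecords(records, "*"))       // 3
	fmt.Println(countRecords(records, "user_id")) // 1
}
```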

counter_type: approx_count_distinct

An estimated unique count using HyperLogLog++.
The result is an estimate, but it is generally close to the true unique count.

target_column cannot be set to *.
If a key is set in target_column, distinct values of that JSON key are counted, ignoring records in which the key is missing or null.
If an expression is set in target_expr, it is evaluated against each JSON record, and distinct results are counted, ignoring nil results.
SipHash is used as the column hashing algorithm. To use a key other than the default hash key, specify siphash_key_hex.
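
A value for siphash_key_hex could be generated along these lines. SipHash takes a 128-bit key, so this sketch produces 16 random bytes encoded as 32 hex characters. That this is the exact length and encoding the option expects is an assumption, and newSipHashKeyHex is a hypothetical helper name.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newSipHashKeyHex returns 16 random bytes (128 bits) as a hex string.
// Assumption: siphash_key_hex accepts a key in this form.
func newSipHashKeyHex() (string, error) {
	key := make([]byte, 16)
	if _, err := rand.Read(key); err != nil {
		return "", err
	}
	return hex.EncodeToString(key), nil
}

func main() {
	key, err := newSipHashKeyHex()
	if err != nil {
		panic(err)
	}
	fmt.Println("siphash_key_hex:", key)
}
```

If intermediate results from several functions are merged, they presumably need to be hashed with the same key, so it makes sense to keep one value in the shared config rather than generate a new one per deployment.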

jq_expr

The final output can be transformed with a jq expression set in jq_expr.
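
To make the effect of the sample jq_expr used in the config examples concrete, here is the same transformation written by hand in Go. This is only an equivalent of that one expression, {"time":.window_start, "name": "access_log.user_count", "value": .value}, and does not evaluate jq. applySampleExpr is a hypothetical name.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// applySampleExpr reshapes one counter output line the way the sample
// jq_expr does: it keeps window_start as "time", keeps "value", and adds
// a fixed "name". It is a hand-written equivalent, not a jq evaluator.
func applySampleExpr(line string) (map[string]interface{}, error) {
	var in map[string]interface{}
	if err := json.Unmarshal([]byte(line), &in); err != nil {
		return nil, err
	}
	return map[string]interface{}{
		"time":  in["window_start"],
		"name":  "access_log.user_count",
		"value": in["value"],
	}, nil
}

func main() {
	line := `{"counter_id":"unique_user_count","counter_type":"approx_count_distinct","value":30,"window_end":1637511900000,"window_start":1637511840000}`
	out, err := applySampleExpr(line)
	if err != nil {
		panic(err)
	}
	b, err := json.Marshal(out)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
}
```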

LICENSE

MIT

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func CounterTypeValuesString

func CounterTypeValuesString() string

Types

type ARN

type ARN struct {
	awsarn.ARN
}

func (*ARN) IsAmbiguous

func (arn *ARN) IsAmbiguous() bool

func (*ARN) IsKinesisDataStream

func (arn *ARN) IsKinesisDataStream() bool

func (*ARN) MarshalText

func (arn *ARN) MarshalText() ([]byte, error)

func (*ARN) Match

func (arn *ARN) Match(other string) bool

func (*ARN) Set

func (arn *ARN) Set(text string) error

func (*ARN) StreamName

func (arn *ARN) StreamName() string

func (*ARN) UnmarshalText

func (arn *ARN) UnmarshalText(text []byte) error

type App

type App struct {
	// contains filtered or unexported fields
}

func New

func New(cfg *Config) (*App, error)

func NewWithClient

func NewWithClient(cfg *Config, kinesisClient KinesisClient, firehoseClient FirehoseClient) *App

func (*App) Handler

func (*App) Run

func (app *App) Run(ctx context.Context, streamName string, tumblingWindow time.Duration) error

func (*App) SetIgnorePutRecord

func (app *App) SetIgnorePutRecord(f bool)

func (*App) SetOutput

func (app *App) SetOutput(w io.Writer)

func (*App) SetVersion

func (app *App) SetVersion(version string)

type BatchItemFailure

type BatchItemFailure struct {
	ItemIdentifier string `json:"itemIdentifier"`
}

type Config

type Config struct {
	RequiredVersion string `yaml:"required_version"`

	Counters []*CounterConfig `yaml:"counters"`
	// contains filtered or unexported fields
}

func NewDefaultConfig

func NewDefaultConfig() *Config

func (*Config) Load

func (cfg *Config) Load(path string) error

func (*Config) Restrict

func (cfg *Config) Restrict() error

func (*Config) ValidateVersion

func (c *Config) ValidateVersion(version string) error

type CounterConfig

type CounterConfig struct {
	ID                 string      `yaml:"id,omitempty"`
	InputStreamARN     *ARN        `yaml:"input_stream_arn,omitempty"`
	OutputStreamARN    *ARN        `yaml:"output_stream_arn,omitempty"`
	AggregateStreamArn *ARN        `yaml:"aggregate_stream_arn,omitempty"`
	TargetColumn       string      `yaml:"target_column,omitempty"`
	TargetExpr         string      `yaml:"target_expr,omitempty"`
	CounterType        CounterType `yaml:"counter_type,omitempty"`
	SipHashKeyHex      string      `yaml:"siphash_key_hex"`
	JQExpr             string      `yaml:"jq_expr"`
	// contains filtered or unexported fields
}

func NewDefaultCounterConfig

func NewDefaultCounterConfig() *CounterConfig

For CLI

func (*CounterConfig) Restrict

func (cfg *CounterConfig) Restrict() error

func (*CounterConfig) SetFlags

func (cfg *CounterConfig) SetFlags(f *flag.FlagSet) error

type CounterState

type CounterState struct {
	CounterType CounterType `json:"counter_type"`
	RowCount    int64       `json:"row_count,omitempty"`
	Base64HLLPP string      `json:"base64_hllpp,omitempty"`
}

type CounterStates

type CounterStates map[string]map[string]*CounterState

func (CounterStates) MergeInto

func (s CounterStates) MergeInto(other CounterStates)

type CounterType

type CounterType int
const (
	Count CounterType = iota + 1
	ApproxCountDistinct
)

func CounterTypeString

func CounterTypeString(s string) (CounterType, error)

CounterTypeString retrieves an enum value from the enum constants string name. Throws an error if the param is not part of the enum.

func CounterTypeValues

func CounterTypeValues() []CounterType

CounterTypeValues returns all values of the enum

func (CounterType) IsACounterType

func (i CounterType) IsACounterType() bool

IsACounterType returns "true" if the value is listed in the enum definition. "false" otherwise

func (CounterType) MarshalJSON

func (i CounterType) MarshalJSON() ([]byte, error)

MarshalJSON implements the json.Marshaler interface for CounterType

func (CounterType) MarshalYAML

func (i CounterType) MarshalYAML() (interface{}, error)

MarshalYAML implements a YAML Marshaler for CounterType

func (*CounterType) Set

func (t *CounterType) Set(str string) error

func (CounterType) String

func (i CounterType) String() string

func (*CounterType) UnmarshalJSON

func (i *CounterType) UnmarshalJSON(data []byte) error

UnmarshalJSON implements the json.Unmarshaler interface for CounterType

func (*CounterType) UnmarshalYAML

func (i *CounterType) UnmarshalYAML(unmarshal func(interface{}) error) error

UnmarshalYAML implements a YAML Unmarshaler for CounterType

type FirehoseClient

type FirehoseClient interface {
	PutRecord(ctx context.Context, params *firehose.PutRecordInput, optFns ...func(*firehose.Options)) (*firehose.PutRecordOutput, error)
}

type IntermediateRecord

type IntermediateRecord struct {
	EventSourceARN string             `json:"event_source_arn,omitempty"`
	ShardID        string             `json:"shard_id,omitempty"`
	CounterID      string             `json:"counter_id,omitempty"`
	CounterType    CounterType        `json:"counter_type,omitempty"`
	CounterVersion string             `json:"counter_version,omitempty"`
	Window         *KinesisTimeWindow `json:"window,omitempty"`
	State          *CounterState      `json:"counter_state,omitempty"`
}

type KinesisClient

type KinesisClient interface {
	GetRecords(ctx context.Context, params *kinesis.GetRecordsInput, optFns ...func(*kinesis.Options)) (*kinesis.GetRecordsOutput, error)
	GetShardIterator(ctx context.Context, params *kinesis.GetShardIteratorInput, optFns ...func(*kinesis.Options)) (*kinesis.GetShardIteratorOutput, error)
	DescribeStream(ctx context.Context, params *kinesis.DescribeStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.DescribeStreamOutput, error)
	PutRecord(ctx context.Context, params *kinesis.PutRecordInput, optFns ...func(*kinesis.Options)) (*kinesis.PutRecordOutput, error)
}

type KinesisTimeWindow

type KinesisTimeWindow struct {
	Start time.Time `json:"start"`
	End   time.Time `json:"end"`
}

func (*KinesisTimeWindow) String added in v0.1.2

func (w *KinesisTimeWindow) String() string

type KinesisTimeWindowEvent

type KinesisTimeWindowEvent struct {
	Records                 []events.KinesisEventRecord `json:"Records"`
	Window                  *KinesisTimeWindow          `json:"window"`
	State                   CounterStates               `json:"state"`
	ShardID                 string                      `json:"shardId"`
	EventSourceArn          string                      `json:"eventSourceARN"`
	IsFinalInvokeForWindow  bool                        `json:"isFinalInvokeForWindow"`
	IsWindowTerminatedEarly bool                        `json:"isWindowTerminatedEarly"`
}
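
As a rough illustration of the JSON shape implied by the struct tags above, the sketch below decodes a hand-written payload into trimmed local mirror types. The timeWindow and timeWindowEvent types and both functions are illustrative names, Records and state are kept as raw JSON to stay within the standard library, and the payload is made up from values used elsewhere in this document rather than captured from Lambda.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// timeWindow and timeWindowEvent are trimmed local copies of the
// KinesisTimeWindow and KinesisTimeWindowEvent shapes, for illustration.
type timeWindow struct {
	Start time.Time `json:"start"`
	End   time.Time `json:"end"`
}

type timeWindowEvent struct {
	Records                 []json.RawMessage `json:"Records"`
	Window                  *timeWindow       `json:"window"`
	State                   json.RawMessage   `json:"state"`
	ShardID                 string            `json:"shardId"`
	EventSourceArn          string            `json:"eventSourceARN"`
	IsFinalInvokeForWindow  bool              `json:"isFinalInvokeForWindow"`
	IsWindowTerminatedEarly bool              `json:"isWindowTerminatedEarly"`
}

// decodeEvent parses an event payload into the local mirror type.
func decodeEvent(payload []byte) (timeWindowEvent, error) {
	var ev timeWindowEvent
	err := json.Unmarshal(payload, &ev)
	return ev, err
}

// windowLengthSeconds returns the window length, or 0 if no window is set.
func windowLengthSeconds(ev timeWindowEvent) int64 {
	if ev.Window == nil {
		return 0
	}
	return int64(ev.Window.End.Sub(ev.Window.Start) / time.Second)
}

func main() {
	// Made-up payload for illustration.
	payload := []byte(`{
	  "Records": [],
	  "window": {"start": "2021-11-21T16:24:00Z", "end": "2021-11-21T16:25:00Z"},
	  "state": {},
	  "shardId": "shardId-000000000000",
	  "eventSourceARN": "arn:aws:kinesis:ap-northeast-1:111122223333:stream/test-stream",
	  "isFinalInvokeForWindow": true,
	  "isWindowTerminatedEarly": false
	}`)
	ev, err := decodeEvent(payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(ev.ShardID, windowLengthSeconds(ev), ev.IsFinalInvokeForWindow)
}
```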

type TimeWindowEventResponse

type TimeWindowEventResponse struct {
	State             CounterStates      `json:"state"`
	BatchItemFailures []BatchItemFailure `json:"batchItemFailures"`
	// contains filtered or unexported fields
}

func (*TimeWindowEventResponse) AddBatchItemFailures

func (resp *TimeWindowEventResponse) AddBatchItemFailures(items ...BatchItemFailure)

func (*TimeWindowEventResponse) MergeInto

func (resp *TimeWindowEventResponse) MergeInto(other *TimeWindowEventResponse)

Directories

Path Synopsis
cmd
