kinsumer

package
v0.0.0-...-c9206f4
Warning

This package is not in the latest version of its module.

Published: Aug 5, 2019 License: MIT Imports: 33 Imported by: 0

README

Kinsumer

Native Go consumer for AWS Kinesis streams.


Rationale

There are several very good ways to consume Kinesis streams, primarily the Amazon Kinesis Client Library (KCL), and it is recommended that you investigate it as an option.

Kinsumer is designed for a cluster of Go clients in which each client consumes from multiple shards. It provides at-least-once delivery and makes a strong effort to be exactly-once. By design, Kinsumer does not attempt to keep shards on a specific client and will shuffle them around as needed.
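Because delivery is at-least-once, a record handler can see the same record more than once, for instance if a shard moves between clients before its latest checkpoint is committed. The sketch below shows one hypothetical way to make a handler tolerate duplicates within a single process: remember the highest sequence number handled per shard and skip anything at or below it. It assumes records from one shard arrive in increasing sequence-number order; the dedupFilter type and its methods are illustrative and are not part of kinsumer.

```go
package main

import (
	"fmt"
	"math/big"
)

// dedupFilter is a hypothetical helper, not part of kinsumer. It remembers the
// highest sequence number handled per shard in this process.
type dedupFilter struct {
	last map[string]*big.Int // shard ID -> highest sequence number handled
}

func newDedupFilter() *dedupFilter {
	return &dedupFilter{last: make(map[string]*big.Int)}
}

// shouldProcess reports whether a record is new for its shard. Kinesis sequence
// numbers are long decimal strings that overflow int64, so they are compared
// as big integers.
func (d *dedupFilter) shouldProcess(shardID, seq string) (bool, error) {
	n, ok := new(big.Int).SetString(seq, 10)
	if !ok {
		return false, fmt.Errorf("invalid sequence number %q", seq)
	}
	if prev, seen := d.last[shardID]; seen && n.Cmp(prev) <= 0 {
		return false, nil // at or below the high-water mark: treat as a duplicate
	}
	d.last[shardID] = n
	return true, nil
}

func main() {
	f := newDedupFilter()
	for _, seq := range []string{"100", "101", "101", "100", "102"} {
		ok, err := f.shouldProcess("shardId-000000000000", seq)
		if err != nil {
			panic(err)
		}
		fmt.Println(seq, ok)
	}
}
```

Because this state lives in memory, it does not catch duplicates delivered to a different client or after a restart; covering those cases would require persisting the high-water mark alongside whatever the handler writes.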

Behavior

Kinsumer is designed for a specific Kinesis consumption use case: multiple clients, each handling multiple shards, where you do not care which shard is consumed by which client.

Kinsumer rebalances shards across clients whenever it detects that the list of shards or the list of clients has changed, and it does not attempt to keep shards on the same client.
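To illustrate why shards move whenever either list changes, here is a minimal sketch of one deterministic way to split shards across clients: sort both lists and deal shards round-robin by each client's position. This is a hypothetical scheme chosen for illustration, not a description of kinsumer's internal algorithm; assignShards and the shard and client names are made up. The "client not found" case is roughly analogous to ErrThisClientNotInDynamo, and an empty result to ErrNoShardsAssigned.

```go
package main

import (
	"fmt"
	"sort"
)

// assignShards is a hypothetical helper, not kinsumer's internal code.
// It sorts both lists, finds this client's position, and deals shards
// round-robin: client i owns shards i, i+n, i+2n, ... for n clients.
// Every client that sees the same two lists computes the same split.
func assignShards(shards, clients []string, thisClient string) ([]string, error) {
	s := append([]string(nil), shards...)
	c := append([]string(nil), clients...)
	sort.Strings(s)
	sort.Strings(c)

	idx := sort.SearchStrings(c, thisClient)
	if idx == len(c) || c[idx] != thisClient {
		return nil, fmt.Errorf("client %q not found in client list", thisClient)
	}
	var mine []string
	for i := idx; i < len(s); i += len(c) {
		mine = append(mine, s[i])
	}
	return mine, nil // may be empty when there are more clients than shards
}

func main() {
	shards := []string{"shard-3", "shard-0", "shard-4", "shard-1", "shard-2"}
	clients := []string{"client-b", "client-a"}
	for _, id := range []string{"client-a", "client-b"} {
		mine, err := assignShards(shards, clients, id)
		if err != nil {
			panic(err)
		}
		fmt.Println(id, mine)
	}
}
```

Adding or removing a single client changes n, which reshuffles ownership for every client. That is the trade-off the README describes: no affinity between shards and clients, in exchange for a split that needs no extra coordination once all clients see the same lists.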

If you are running multiple Kinsumer apps against a single stream, make sure to increase the throttle delay (set with WithThrottleDelay) to at least 50ms + (200ms * <the number of reader apps>). Note that Kinesis does not support more than two readers per writer on a fully utilized stream, so make sure you have enough stream capacity.

Example

See cmd/noopkinsumer for a fully working example of a kinsumer client.

Testing

Testing with local test servers

By default, the tests look for a DynamoDB server at localhost:4567 and a Kinesis server at localhost:4568.

For example, using kinesalite and dynalite:

kinesalite --port 4568 --createStreamMs 1 --deleteStreamMs 1 --updateStreamMs 1 --shardLimit 1000 &
dynalite --port 4567 --createTableMs 1 --deleteTableMs 1 --updateTableMs 1 &

Then run go test ./...

Testing with real aws resources

It's possible to run the tests against real AWS resources, but the tests create and destroy resources, which can be finicky and potentially expensive.

Make sure your credentials are set up in a way that aws-sdk-go accepts, or run the tests on an EC2 instance.

Then run go test . -dynamo_endpoint= -kinesis_endpoint= -resource_change_timeout=30s

Documentation


Constants

This section is empty.

Variables

var (
	// ErrRunTwice - Run() can only ever be run once
	ErrRunTwice = errors.New("run() can only ever be run once")
	// ErrNoKinesisInterface - Need a kinesis instance
	ErrNoKinesisInterface = errors.New("need a kinesis instance")
	// ErrNoDynamoInterface - Need a dynamodb instance
	ErrNoDynamoInterface = errors.New("need a dynamodb instance")
	// ErrNoStreamName - Need a kinesis stream name
	ErrNoStreamName = errors.New("need a kinesis stream name")
	// ErrNoApplicationName - Need an application name for the dynamo table names
	ErrNoApplicationName = errors.New("need an application name for the dynamo table names")
	// ErrThisClientNotInDynamo - Unable to find this client in the client list
	ErrThisClientNotInDynamo = errors.New("unable to find this client in the client list")
	// ErrNoShardsAssigned - We found shards, but got assigned none
	ErrNoShardsAssigned = errors.New("we found shards, but got assigned none")

	// ErrConfigInvalidThrottleDelay - ThrottleDelay config value must be at least 200ms
	ErrConfigInvalidThrottleDelay = errors.New("throttleDelay config value must be at least 200ms (preferably 250ms)")
	// ErrConfigInvalidCommitFrequency - CommitFrequency config value is mandatory
	ErrConfigInvalidCommitFrequency = errors.New("commitFrequency config value is mandatory")
	// ErrConfigInvalidShardCheckFrequency - ShardCheckFrequency config value is mandatory
	ErrConfigInvalidShardCheckFrequency = errors.New("shardCheckFrequency config value is mandatory")
	// ErrConfigInvalidLeaderActionFrequency - LeaderActionFrequency config value is mandatory
	ErrConfigInvalidLeaderActionFrequency = errors.New("leaderActionFrequency config value is mandatory and must be at least as long as ShardCheckFrequency")
	// ErrConfigInvalidBufferSize - BufferSize config value is mandatory
	ErrConfigInvalidBufferSize = errors.New("bufferSize config value is mandatory")
	// ErrConfigInvalidDynamoCapacity - Dynamo read/write capacity cannot be 0
	ErrConfigInvalidDynamoCapacity = errors.New("dynamo read/write capacity cannot be 0")

	// ErrStreamBusy - Stream is busy
	ErrStreamBusy = errors.New("stream is busy")
	// ErrNoSuchStream - No such stream
	ErrNoSuchStream = errors.New("no such stream")
)

Functions

This section is empty.

Types

type Config

type Config struct {
	// contains filtered or unexported fields
}

Config holds all configuration values for a single Kinsumer instance

func NewConfig

func NewConfig() Config

NewConfig returns a default Config struct

func (Config) WithBufferSize

func (c Config) WithBufferSize(bufferSize int) Config

WithBufferSize returns a Config with a modified buffer size

func (Config) WithCommitFrequency

func (c Config) WithCommitFrequency(commitFrequency time.Duration) Config

WithCommitFrequency returns a Config with a modified commit frequency

func (Config) WithDynamoReadCapacity

func (c Config) WithDynamoReadCapacity(readCapacity int64) Config

WithDynamoReadCapacity returns a Config with a modified dynamo read capacity

func (Config) WithDynamoWaiterDelay

func (c Config) WithDynamoWaiterDelay(delay time.Duration) Config

WithDynamoWaiterDelay returns a Config with a modified dynamo waiter delay

func (Config) WithDynamoWriteCapacity

func (c Config) WithDynamoWriteCapacity(writeCapacity int64) Config

WithDynamoWriteCapacity returns a Config with a modified dynamo write capacity

func (Config) WithLeaderActionFrequency

func (c Config) WithLeaderActionFrequency(leaderActionFrequency time.Duration) Config

WithLeaderActionFrequency returns a Config with a modified leader action frequency

func (Config) WithShardCheckFrequency

func (c Config) WithShardCheckFrequency(shardCheckFrequency time.Duration) Config

WithShardCheckFrequency returns a Config with a modified shard check frequency

func (Config) WithThrottleDelay

func (c Config) WithThrottleDelay(delay time.Duration) Config

WithThrottleDelay returns a Config with a modified throttle delay

type Kinsumer

type Kinsumer struct {
	// contains filtered or unexported fields
}

Kinsumer is a Kinesis Consumer that tries to reduce duplicate reads while allowing for multiple clients each processing multiple shards

func New

func New(streamName, applicationName, clientName string, config Config, role string, consumerARNvalue string) (*Kinsumer, error)

New returns a Kinsumer with default Kinesis and DynamoDB instances. It is intended for EC2 instances, where it picks up the default credentials and configuration.

func NewWithInterfaces

func NewWithInterfaces(kinesisAPI kinesisiface.KinesisAPI, dynamodb dynamodbiface.DynamoDBAPI, s3manager s3manageriface.UploaderAPI, streamName, applicationName, clientName string, config Config, role string, consumerARNvalue string) (*Kinsumer, error)

NewWithInterfaces lets you supply your own Kinesis, DynamoDB, and S3 uploader instances, for mocking or for using a local set of servers.

func NewWithSession

func NewWithSession(session *session.Session, streamName, applicationName, clientName string, config Config, role string, consumerARNvalue string) (*Kinsumer, error)

Use NewWithSession to build the Kinesis and DynamoDB instances from a non-default AWS session.

func (*Kinsumer) CreateRequiredTables

func (k *Kinsumer) CreateRequiredTables() error

CreateRequiredTables will create the required dynamodb tables based on the applicationName

func (*Kinsumer) DeleteTables

func (k *Kinsumer) DeleteTables() error

DeleteTables will delete the dynamodb tables that were created based on the applicationName

func (*Kinsumer) GetClientName

func (k *Kinsumer) GetClientName() string

func (*Kinsumer) Publish

func (k *Kinsumer) Publish(streamName, partitionKey string, data *[]byte) error

Publish writes data to the given stream using the given partition key.

func (*Kinsumer) Run

func (k *Kinsumer) Run(fatalErr chan error) error

Run runs the main Kinesis consumer process. It does not block: the work happens in a background goroutine that keeps running until Stop() is called. That goroutine is responsible for starting and stopping consumers, aggregating all consumers' records, updating checkpointers as records are consumed, and refreshing the shard/client list and leadership.

func (*Kinsumer) Stop

func (k *Kinsumer) Stop()

Stop stops the consumption of Kinesis events.
