README

Kinsumer

Native Go consumer for AWS Kinesis streams.

Rationale

There are several very good ways to consume Kinesis streams, most notably the Amazon Kinesis Client Library (KCL), and it is recommended that it be investigated as an option.

Kinsumer is designed for a cluster of Go clients in which each client consumes from multiple shards. It provides at-least-once delivery and makes a strong effort to deliver exactly once. By design, Kinsumer does not try to keep a shard on a specific client and will move shards between clients as needed.

Behavior

Kinsumer is designed for a specific Kinesis consumption use case: multiple clients, each handling multiple shards, where you do not care which client consumes which shard.

Kinsumer rebalances shards across clients whenever it detects that the list of shards or the list of clients has changed, and it does not attempt to keep a shard on the same client.

If you are running multiple Kinsumer apps against a single stream, increase the throttle delay (set with WithThrottleDelay) to at least 50ms + (200ms * <the number of reader apps>). Note that Kinesis does not support more than two readers per writer on a fully utilized stream, so make sure the stream has enough capacity.

Example

See cmd/noopkinsumer for a fully working example of a kinsumer client.

Testing

Testing with local test servers

By default, the tests look for a DynamoDB server at localhost:4567 and a Kinesis server at localhost:4568.

For example, using kinesalite and dynalite:

kinesalite --port 4568 --createStreamMs 1 --deleteStreamMs 1 --updateStreamMs 1 --shardLimit 1000 &
dynalite --port 4567 --createTableMs 1 --deleteTableMs 1 --updateTableMs 1 &

Then run: go test ./...

Testing with real aws resources

It's possible to run the tests against real AWS resources, but they create and destroy resources, which can be finicky and potentially expensive.

Make sure your credentials are set up in a way that aws-sdk-go can find them, or run the tests on an EC2 instance.

Then run: go test . -dynamo_endpoint= -kinesis_endpoint= -resource_change_timeout=30s

Documentation

Index

Constants

This section is empty.

Variables

var (
	// ErrRunTwice - Run() can only ever be run once
	ErrRunTwice = errors.New("run() can only ever be run once")
	// ErrNoKinesisInterface - Need a kinesis instance
	ErrNoKinesisInterface = errors.New("need a kinesis instance")
	// ErrNoDynamoInterface - Need a dynamodb instance
	ErrNoDynamoInterface = errors.New("need a dynamodb instance")
	// ErrNoStreamName - Need a kinesis stream name
	ErrNoStreamName = errors.New("need a kinesis stream name")
	// ErrNoApplicationName - Need an application name for the dynamo table names
	ErrNoApplicationName = errors.New("need an application name for the dynamo table names")
	// ErrThisClientNotInDynamo - Unable to find this client in the client list
	ErrThisClientNotInDynamo = errors.New("unable to find this client in the client list")
	// ErrNoShardsAssigned - We found shards, but got assigned none
	ErrNoShardsAssigned = errors.New("we found shards, but got assigned none")

	// ErrConfigInvalidThrottleDelay - ThrottleDelay config value must be at least 200ms
	ErrConfigInvalidThrottleDelay = errors.New("throttleDelay config value must be at least 200ms (preferably 250ms)")
	// ErrConfigInvalidCommitFrequency - CommitFrequency config value is mandatory
	ErrConfigInvalidCommitFrequency = errors.New("commitFrequency config value is mandatory")
	// ErrConfigInvalidShardCheckFrequency - ShardCheckFrequency config value is mandatory
	ErrConfigInvalidShardCheckFrequency = errors.New("shardCheckFrequency config value is mandatory")
	// ErrConfigInvalidLeaderActionFrequency - LeaderActionFrequency config value is mandatory
	ErrConfigInvalidLeaderActionFrequency = errors.New("leaderActionFrequency config value is mandatory and must be at least as long as ShardCheckFrequency")
	// ErrConfigInvalidBufferSize - BufferSize config value is mandatory
	ErrConfigInvalidBufferSize = errors.New("bufferSize config value is mandatory")
	// ErrConfigInvalidDynamoCapacity - Dynamo read/write capacity cannot be 0
	ErrConfigInvalidDynamoCapacity = errors.New("dynamo read/write capacity cannot be 0")

	// ErrStreamBusy - Stream is busy
	ErrStreamBusy = errors.New("stream is busy")
	// ErrNoSuchStream - No such stream
	ErrNoSuchStream = errors.New("no such stream")
)

Functions

This section is empty.

Types

type Config

type Config struct {
	// contains filtered or unexported fields
}

    Config holds all configuration values for a single Kinsumer instance

func NewConfig

func NewConfig() Config

    NewConfig returns a default Config struct

func (Config) WithBufferSize

func (c Config) WithBufferSize(bufferSize int) Config

    WithBufferSize returns a Config with a modified buffer size

func (Config) WithCommitFrequency

func (c Config) WithCommitFrequency(commitFrequency time.Duration) Config

    WithCommitFrequency returns a Config with a modified commit frequency

func (Config) WithDynamoReadCapacity

func (c Config) WithDynamoReadCapacity(readCapacity int64) Config

    WithDynamoReadCapacity returns a Config with a modified dynamo read capacity

func (Config) WithDynamoWaiterDelay

func (c Config) WithDynamoWaiterDelay(delay time.Duration) Config

    WithDynamoWaiterDelay returns a Config with a modified dynamo waiter delay

func (Config) WithDynamoWriteCapacity

func (c Config) WithDynamoWriteCapacity(writeCapacity int64) Config

    WithDynamoWriteCapacity returns a Config with a modified dynamo write capacity

func (Config) WithLeaderActionFrequency

func (c Config) WithLeaderActionFrequency(leaderActionFrequency time.Duration) Config

    WithLeaderActionFrequency returns a Config with a modified leader action frequency

func (Config) WithShardCheckFrequency

func (c Config) WithShardCheckFrequency(shardCheckFrequency time.Duration) Config

    WithShardCheckFrequency returns a Config with a modified shard check frequency

func (Config) WithThrottleDelay

func (c Config) WithThrottleDelay(delay time.Duration) Config

    WithThrottleDelay returns a Config with a modified throttle delay

type Kinsumer

type Kinsumer struct {
	// contains filtered or unexported fields
}

    Kinsumer is a Kinesis consumer that tries to reduce duplicate reads while allowing multiple clients to each process multiple shards

func New

func New(streamName, applicationName, clientName string, config Config, role string, consumerARNvalue string) (*Kinsumer, error)

    New returns a Kinsumer with default Kinesis and DynamoDB instances, intended for EC2 instances that rely on the default auth and config

func NewWithInterfaces

func NewWithInterfaces(kinesisAPI kinesisiface.KinesisAPI, dynamodb dynamodbiface.DynamoDBAPI, s3manager s3manageriface.UploaderAPI, streamName, applicationName, clientName string, config Config, role string, consumerARNvalue string) (*Kinsumer, error)

    NewWithInterfaces lets you supply your own Kinesis, DynamoDB, and S3 uploader instances, for example for mocking or for using a local set of servers

func NewWithSession

func NewWithSession(session *session.Session, streamName, applicationName, clientName string, config Config, role string, consumerARNvalue string) (*Kinsumer, error)

    NewWithSession should be used when the Kinesis and DynamoDB instances need to come from a non-default AWS session

func (*Kinsumer) CreateRequiredTables

func (k *Kinsumer) CreateRequiredTables() error

    CreateRequiredTables creates the DynamoDB tables required for the applicationName

func (*Kinsumer) DeleteTables

func (k *Kinsumer) DeleteTables() error

    DeleteTables deletes the DynamoDB tables that were created for the applicationName

func (*Kinsumer) GetClientName

func (k *Kinsumer) GetClientName() string

func (*Kinsumer) Publish

func (k *Kinsumer) Publish(streamName, partitionKey string, data *[]byte) error

    Publish writes data to the named Kinesis stream under the given partition key.
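
Publish takes a partition key because Kinesis, not the producer, decides which shard stores a record. It hashes the partition key with MD5 into a 128-bit integer and picks the shard whose hash key range contains that value. The sketch below reproduces that mapping under one stated assumption: the stream's shards evenly split the hash key space. That holds for a freshly created stream, but not after resharding or when an explicit hash key is supplied. `shardForKey` is a hypothetical helper, not a Kinsumer or AWS SDK function.

```go
package main

import (
	"crypto/md5"
	"fmt"
	"math/big"
)

// shardForKey returns the index of the shard that receives records with the
// given partition key, assuming numShards (> 0) shards that evenly split the
// 128-bit hash key space, as on a stream that has never been resharded.
func shardForKey(partitionKey string, numShards int) int {
	sum := md5.Sum([]byte(partitionKey))
	hashKey := new(big.Int).SetBytes(sum[:]) // MD5 digest as an unsigned 128-bit integer

	space := new(big.Int).Lsh(big.NewInt(1), 128) // 2^128 possible hash keys
	width := new(big.Int).Div(space, big.NewInt(int64(numShards)))

	idx := new(big.Int).Div(hashKey, width).Int64()
	if idx >= int64(numShards) {
		idx = int64(numShards) - 1 // the last shard also covers the rounding remainder
	}
	return int(idx)
}

func main() {
	for _, key := range []string{"user-1", "user-2", "user-1"} {
		fmt.Printf("%s -> shard %d of 4\n", key, shardForKey(key, 4))
	}
}
```

Records that share a partition key always land on the same shard, and Kinesis preserves ordering within a shard. Choosing the partition key therefore decides which records keep their relative order.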

func (*Kinsumer) Run

func (k *Kinsumer) Run(fatalErr chan error) error

    Run starts the main Kinesis consumer process. It is non-blocking: the consumer loop runs in a background goroutine until Stop() is called. That goroutine is responsible for starting/stopping consumers, aggregating all consumers' records, updating checkpointers as records are consumed, and refreshing the shard/client list and leadership.

func (*Kinsumer) Stop

func (k *Kinsumer) Stop()

    Stop stops the consumption of Kinesis events.

Directories

Path Synopsis