consumer

package module
v0.0.0-...-97adbfc Latest
Published: Aug 24, 2018 License: MIT Imports: 11 Imported by: 0

README

kinesis-ddb-consumer

A Kinesis stream client for Go that manages sequence numbers in DynamoDB.

Overview

The package abstracts the shards of a Kinesis stream so you can read data through a single channel.

Record order is guaranteed within a shard but not across shards.
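
The ordering guarantee above is what you would expect from a fan-in over per-shard readers. The following is a minimal sketch of that pattern, not the package's actual implementation: the `record` type and the `merge` and `produce` functions are hypothetical names introduced only for illustration, and no AWS calls are made.

```go
package main

import (
	"fmt"
	"sync"
)

// record is a hypothetical stand-in for a Kinesis record;
// it is not the package's Record type.
type record struct {
	Shard string
	Seq   int
}

// merge fans several per-shard channels into one output channel.
// Each shard is drained by its own goroutine, so records from one shard
// keep their relative order while records from different shards interleave.
func merge(shards ...<-chan record) <-chan record {
	out := make(chan record)
	var wg sync.WaitGroup
	for _, s := range shards {
		wg.Add(1)
		go func(s <-chan record) {
			defer wg.Done()
			for r := range s {
				out <- r
			}
		}(s)
	}
	// Close the merged channel once every shard reader has finished.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// produce emits n records for one shard in sequence order, then closes the channel.
func produce(shard string, n int) <-chan record {
	ch := make(chan record)
	go func() {
		defer close(ch)
		for i := 0; i < n; i++ {
			ch <- record{Shard: shard, Seq: i}
		}
	}()
	return ch
}

func main() {
	for r := range merge(produce("shard-0", 3), produce("shard-1", 3)) {
		fmt.Println(r.Shard, r.Seq)
	}
}
```

Running this prints the sequence numbers of each shard in increasing order, while the interleaving between the two shards can differ from run to run.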

Usage

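package main

import (
	"log"

	consumer "github.com/atsushi-ishibashi/kinesis-ddb-consumer"
)

func main() {
	c, err := consumer.New("<AppName>", "<KinesisStreamName>", "<DynamoDB TableName>")
	if err != nil {
		log.Fatalln(err)
	}

	// GetChannel returns one receive-only channel that carries records from every shard.
	queue := c.GetChannel()
	for v := range queue {
		// do something with v.Data and v.ArrivalTimestamp
		_ = v
	}
}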
Requirements
IAM Policy
{
  "Version": "2012-10-17",
  "Statement": [{
      "Effect": "Allow",
      "Action": [
        "dynamodb:GetItem",
        "dynamodb:UpdateItem"
      ],
      "Resource": [
        "arn:aws:dynamodb:<region>:<account>:table/<table>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:GetShardIterator",
        "kinesis:GetRecords"
      ],
      "Resource": [
        "arn:aws:kinesis:<region>:<account>:stream/<stream>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": "kinesis:ListShards",
      "Resource": "*"
    }
  ]
}
TODO
  • Graceful shutdown
  • Context
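
Until the package offers these itself, a caller can at least stop reading on its own side. The sketch below is one possible approach, not part of the package: `consumeUntil` is a hypothetical helper, `Record` is a local mirror of the documented struct, and a locally fed channel stands in for the one returned by `GetChannel`. It only stops the caller's loop; it does not stop whatever goroutines the package runs internally.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Record mirrors the documented struct so the sketch compiles on its own.
type Record struct {
	ArrivalTimestamp time.Time
	Data             []byte
}

// consumeUntil (hypothetical helper) handles records from ch until done is
// closed or ch is closed, and returns the number of records handled.
func consumeUntil(done <-chan struct{}, ch <-chan Record, handle func(Record)) int {
	n := 0
	for {
		select {
		case <-done:
			return n
		case r, ok := <-ch:
			if !ok {
				return n
			}
			handle(r)
			n++
		}
	}
}

func main() {
	// A locally fed channel stands in for the value returned by GetChannel.
	queue := make(chan Record)
	go func() {
		for i := 0; ; i++ {
			queue <- Record{ArrivalTimestamp: time.Now(), Data: []byte(fmt.Sprint(i))}
			time.Sleep(10 * time.Millisecond)
		}
	}()

	// Stop reading after 100ms; a signal handler could cancel the context instead.
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	n := consumeUntil(ctx.Done(), queue, func(r Record) {
		fmt.Printf("%s\n", r.Data)
	})
	fmt.Println("handled", n, "records before shutdown")
}
```

Passing `ctx.Done()` rather than the context itself keeps the helper usable with any cancellation source, such as a channel closed from a signal handler.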

Documentation

Index

Constants

This section is empty.

Variables

var (
	ErrInvalidMaxWait    = errors.New("invalid max wait")
	ErrInvalidChannelCap = errors.New("invalid channel cap")
)

Functions

This section is empty.

Types

type Consumer

type Consumer interface {
	SetMaxWait(i int) error
	SetChannelCap(i int) error
	GetChannel() <-chan Record
}
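
The two setters return an error, presumably `ErrInvalidMaxWait` or `ErrInvalidChannelCap`, so it is worth checking them before reading from the channel. The sketch below shows one way to do that. The names `tuner`, `configure`, and `fakeConsumer` are hypothetical, and the fake's rule of rejecting non-positive values is an assumption: the documentation states neither the accepted ranges nor the unit of the max wait.

```go
package main

import (
	"errors"
	"fmt"
)

// Sentinel errors mirroring the ones the package documents.
var (
	ErrInvalidMaxWait    = errors.New("invalid max wait")
	ErrInvalidChannelCap = errors.New("invalid channel cap")
)

// tuner is the subset of the documented Consumer interface used here.
type tuner interface {
	SetMaxWait(i int) error
	SetChannelCap(i int) error
}

// configure applies both settings and wraps any error with the offending value.
func configure(c tuner, maxWait, channelCap int) error {
	if err := c.SetMaxWait(maxWait); err != nil {
		return fmt.Errorf("max wait %d: %w", maxWait, err)
	}
	if err := c.SetChannelCap(channelCap); err != nil {
		return fmt.Errorf("channel cap %d: %w", channelCap, err)
	}
	return nil
}

// fakeConsumer is a hypothetical stand-in for the real consumer.
// Rejecting non-positive values is an assumption about the real validation.
type fakeConsumer struct{ maxWait, channelCap int }

func (f *fakeConsumer) SetMaxWait(i int) error {
	if i <= 0 {
		return ErrInvalidMaxWait
	}
	f.maxWait = i
	return nil
}

func (f *fakeConsumer) SetChannelCap(i int) error {
	if i <= 0 {
		return ErrInvalidChannelCap
	}
	f.channelCap = i
	return nil
}

func main() {
	c := &fakeConsumer{}
	if err := configure(c, 0, 100); errors.Is(err, ErrInvalidMaxWait) {
		fmt.Println("rejected:", err)
	}
	if err := configure(c, 10, 100); err == nil {
		fmt.Println("configured:", c.maxWait, c.channelCap)
	}
}
```

Because `configure` wraps with `%w`, callers can still match the sentinel errors with `errors.Is`.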

func New

func New(appName, streamName, tableName string) (Consumer, error)

type Record

type Record struct {
	ArrivalTimestamp time.Time
	Data             []byte
}
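
A Record carries only the raw payload and its arrival time, so decoding is left to the caller. As a rough illustration, the sketch below decodes a JSON payload and reports how long ago the record arrived. The `event` type and the `decode` function are hypothetical; the package does not prescribe a payload format, and `Record` is redeclared locally so the example compiles without the package.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"time"
)

// Record mirrors the documented struct so the sketch compiles on its own.
type Record struct {
	ArrivalTimestamp time.Time
	Data             []byte
}

// event is a hypothetical payload shape; the package does not prescribe one.
type event struct {
	ID   string `json:"id"`
	Kind string `json:"kind"`
}

// decode parses the JSON payload carried in a record's Data field.
func decode(r Record) (event, error) {
	var e event
	if err := json.Unmarshal(r.Data, &e); err != nil {
		return event{}, fmt.Errorf("decode record: %w", err)
	}
	return e, nil
}

func main() {
	r := Record{
		ArrivalTimestamp: time.Now().Add(-2 * time.Second),
		Data:             []byte(`{"id":"42","kind":"click"}`),
	}
	e, err := decode(r)
	if err != nil {
		log.Fatalln(err)
	}
	// time.Since gives a rough measure of how far the reader lags behind the stream.
	fmt.Println(e.ID, e.Kind, "arrived", time.Since(r.ArrivalTimestamp), "ago")
}
```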
