Versions in this module

v0

v0.3.5 Jul 6, 2020

Changes in this version

+ var ErrSkipCheckpoint = errors.New("skip checkpoint")
+ type AllGroup struct
    + func NewAllGroup(ksis kinesisiface.KinesisAPI, store Store, streamName string, logger Logger) *AllGroup
    + func (g *AllGroup) Start(ctx context.Context, shardc chan *kinesis.Shard)
+ type Consumer struct
    + func New(streamName string, opts ...Option) (*Consumer, error)
    + func (c *Consumer) Scan(ctx context.Context, fn ScanFunc) error
    + func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) error
+ type Counter interface
    + Add func(string, int64)
+ type Group interface
    + GetCheckpoint func(streamName, shardID string) (string, error)
    + SetCheckpoint func(streamName, shardID, sequenceNumber string) error
    + Start func(ctx context.Context, shardc chan *kinesis.Shard)
+ type Logger interface
    + Log func(...interface{})
+ type Option func(*Consumer)
    + func WithClient(client kinesisiface.KinesisAPI) Option
    + func WithCounter(counter Counter) Option
    + func WithGroup(group Group) Option
    + func WithLogger(logger Logger) Option
    + func WithMaxRecords(n int64) Option
    + func WithScanInterval(d time.Duration) Option
    + func WithShardIteratorType(t string) Option
    + func WithStore(store Store) Option
    + func WithTimestamp(t time.Time) Option
+ type Record = kinesis.Record
+ type ScanFunc func(*Record) error
+ type Store interface
    + GetCheckpoint func(streamName, shardID string) (string, error)
    + SetCheckpoint func(streamName, shardID, sequenceNumber string) error