Documentation ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Checkpoint ¶
type Checkpoint interface {
Get(streamName, shardID string) (string, error)
Set(streamName, shardID, sequenceNumber string) error
}
Checkpoint interface is used to track consumer progress in the stream
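A minimal sketch of a type that satisfies Checkpoint, kept entirely in memory. The interface is copied locally so the example compiles on its own; `memoryCheckpoint`, `newMemoryCheckpoint`, and the choice to reject empty sequence numbers are illustrative assumptions, not part of this package.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Checkpoint mirrors the interface documented above.
type Checkpoint interface {
	Get(streamName, shardID string) (string, error)
	Set(streamName, shardID, sequenceNumber string) error
}

// memoryCheckpoint is a hypothetical in-memory implementation.
// Progress is lost when the process exits.
type memoryCheckpoint struct {
	mu   sync.Mutex
	seqs map[string]string // keyed by "stream/shard"
}

func newMemoryCheckpoint() *memoryCheckpoint {
	return &memoryCheckpoint{seqs: make(map[string]string)}
}

func checkpointKey(streamName, shardID string) string {
	return streamName + "/" + shardID
}

// Get returns the last stored sequence number, or "" if none was stored.
func (m *memoryCheckpoint) Get(streamName, shardID string) (string, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.seqs[checkpointKey(streamName, shardID)], nil
}

// Set stores the sequence number. Rejecting an empty value is an assumption
// made for this sketch.
func (m *memoryCheckpoint) Set(streamName, shardID, sequenceNumber string) error {
	if sequenceNumber == "" {
		return errors.New("sequence number must not be empty")
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.seqs[checkpointKey(streamName, shardID)] = sequenceNumber
	return nil
}

func main() {
	var cp Checkpoint = newMemoryCheckpoint()
	if err := cp.Set("events", "shardId-000000000000", "42"); err != nil {
		fmt.Println("set failed:", err)
		return
	}
	seq, _ := cp.Get("events", "shardId-000000000000")
	fmt.Println("last sequence number:", seq)
}
```

A value like this could be handed to WithCheckpoint. A durable store would implement the same two methods.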
type Client ¶ added in v0.2.0
type Client interface {
GetShardIDs(string) ([]string, error)
GetRecords(ctx context.Context, streamName, shardID, lastSeqNum string) (<-chan *Record, <-chan error, error)
}
Client interface is used for interacting with a Kinesis stream
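The sketch below shows one way a caller might consume the two channels returned by GetRecords, using a fake Client that serves records from memory. The `Record` struct is a hypothetical stand-in for the package's Record type, and `fakeClient` and `readShard` are illustrative names. The fake ignores `lastSeqNum`, and closing the error channel before the record channel is a convention of this sketch, not documented behavior of the real client.

```go
package main

import (
	"context"
	"fmt"
	"sort"
)

// Record is a hypothetical stand-in for the package's Record type.
type Record struct {
	SequenceNumber string
	Data           []byte
}

// Client mirrors the interface documented above.
type Client interface {
	GetShardIDs(string) ([]string, error)
	GetRecords(ctx context.Context, streamName, shardID, lastSeqNum string) (<-chan *Record, <-chan error, error)
}

// fakeClient serves pre-loaded records, keyed by shard ID.
type fakeClient struct {
	records map[string][]*Record
}

func (f *fakeClient) GetShardIDs(streamName string) ([]string, error) {
	ids := make([]string, 0, len(f.records))
	for id := range f.records {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	return ids, nil
}

// GetRecords streams the shard's records; lastSeqNum is ignored in this fake.
func (f *fakeClient) GetRecords(ctx context.Context, streamName, shardID, lastSeqNum string) (<-chan *Record, <-chan error, error) {
	recs, ok := f.records[shardID]
	if !ok {
		return nil, nil, fmt.Errorf("unknown shard %q", shardID)
	}
	recc := make(chan *Record)
	errc := make(chan error, 1)
	go func() {
		defer close(recc) // runs last, so errc is already closed by then
		defer close(errc)
		for _, r := range recs {
			select {
			case recc <- r:
			case <-ctx.Done():
				errc <- ctx.Err()
				return
			}
		}
	}()
	return recc, errc, nil
}

// readShard drains one shard and returns the sequence numbers it saw.
func readShard(c Client, streamName, shardID string) ([]string, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	recc, errc, err := c.GetRecords(ctx, streamName, shardID, "")
	if err != nil {
		return nil, err
	}
	var seqs []string
	for r := range recc {
		seqs = append(seqs, r.SequenceNumber)
	}
	// errc is closed once recc is drained; a nil receive means no error.
	if err := <-errc; err != nil {
		return seqs, err
	}
	return seqs, nil
}

func main() {
	c := &fakeClient{records: map[string][]*Record{
		"shardId-000000000000": {{SequenceNumber: "1"}, {SequenceNumber: "2"}},
	}}
	ids, _ := c.GetShardIDs("events")
	for _, id := range ids {
		seqs, err := readShard(c, "events", id)
		fmt.Println(id, seqs, err)
	}
}
```

A fake like this could be passed to WithClient in tests so that no AWS calls are made.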
type ClientOption ¶ added in v0.2.0
type ClientOption func(*KinesisClient)
ClientOption is used to override defaults when creating a KinesisClient
func WithKinesis ¶ added in v0.2.0
func WithKinesis(svc kinesisiface.KinesisAPI) ClientOption
WithKinesis overrides the default Kinesis client
func WithStartFromLatest ¶ added in v0.2.0
func WithStartFromLatest() ClientOption
WithStartFromLatest makes sure the client starts consuming events from the most recent event in Kinesis. This option discards the checkpoints.
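ClientOption follows the functional options pattern. The sketch below illustrates the pattern with local stand-in types. The fields of `KinesisClient`, the `kinesisAPI` interface (standing in for kinesisiface.KinesisAPI), and `stubAPI` are all assumptions made for illustration, since the real struct's fields are unexported; this is not the package's actual implementation.

```go
package main

import "fmt"

// kinesisAPI is a hypothetical stand-in for kinesisiface.KinesisAPI.
type kinesisAPI interface {
	Name() string
}

type stubAPI struct{ name string }

func (s stubAPI) Name() string { return s.name }

// KinesisClient is a stand-in; these field names are assumed for the sketch.
type KinesisClient struct {
	svc             kinesisAPI
	startFromLatest bool
}

// ClientOption has the same shape as the documented type.
type ClientOption func(*KinesisClient)

// WithKinesis replaces the default service client.
func WithKinesis(svc kinesisAPI) ClientOption {
	return func(c *KinesisClient) { c.svc = svc }
}

// WithStartFromLatest flags the client to start from the newest record.
func WithStartFromLatest() ClientOption {
	return func(c *KinesisClient) { c.startFromLatest = true }
}

// NewKinesisClient sets defaults first, then applies each option in order.
func NewKinesisClient(opts ...ClientOption) (*KinesisClient, error) {
	c := &KinesisClient{svc: stubAPI{name: "default"}}
	for _, opt := range opts {
		opt(c)
	}
	return c, nil
}

func main() {
	c, err := NewKinesisClient(
		WithKinesis(stubAPI{name: "custom"}),
		WithStartFromLatest(),
	)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(c.svc.Name(), c.startFromLatest)
}
```

Because options are applied in order, a later option overrides an earlier one that touches the same field.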
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer wraps the interaction with the Kinesis stream
func New ¶ added in v0.2.0
New creates a kinesis consumer with default settings. Use Option to override any of the optional attributes.
func (*Consumer) Scan ¶ added in v0.2.0
Scan scans each shard of the stream and calls the callback func with each Kinesis record.
func (*Consumer) ScanShard ¶ added in v0.2.0
func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn func(*Record) ScanError) (err error)
ScanShard loops over the records on a specific shard, calls the callback func for each record, and checkpoints the progress of the scan. Note: the ScanError value returned from the callback func determines whether the scan ends.
type Counter ¶ added in v0.2.0
Counter interface is used for exposing basic metrics from the scanner
type KinesisClient ¶ added in v0.2.0
type KinesisClient struct {
// contains filtered or unexported fields
}
KinesisClient acts as a wrapper around the Kinesis client
func NewKinesisClient ¶ added in v0.2.0
func NewKinesisClient(opts ...ClientOption) (*KinesisClient, error)
NewKinesisClient returns a client for interfacing with a Kinesis stream
func (*KinesisClient) GetRecords ¶ added in v0.2.0
func (c *KinesisClient) GetRecords(ctx context.Context, streamName, shardID, lastSeqNum string) (<-chan *Record, <-chan error, error)
GetRecords returns a channel of Records read from a shard of the stream, together with a channel of errors
func (*KinesisClient) GetShardIDs ¶ added in v0.2.0
func (c *KinesisClient) GetShardIDs(streamName string) ([]string, error)
GetShardIDs returns the shard IDs of a given stream
type Logger ¶ added in v0.2.0
type Logger interface {
Log(...interface{})
}
A Logger is a minimal interface that acts as an adaptor between an external logging library and the consumer
func NewDefaultLogger ¶ added in v0.2.0
func NewDefaultLogger() Logger
NewDefaultLogger returns a Logger which discards messages.
type LoggerFunc ¶ added in v0.2.0
type LoggerFunc func(...interface{})
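LoggerFunc has the same signature as Logger's single method, which suggests it is meant to let a plain function act as a Logger. The sketch below assumes a `Log` method on LoggerFunc that forwards its arguments; that method is not listed in this documentation, so treat it as an assumption. The types are redeclared locally so the example compiles on its own.

```go
package main

import "log"

// Logger mirrors the interface documented above.
type Logger interface {
	Log(...interface{})
}

// LoggerFunc mirrors the documented type.
type LoggerFunc func(...interface{})

// Log forwards to the wrapped function. This method is an assumption of the
// sketch; it is what makes LoggerFunc satisfy Logger.
func (f LoggerFunc) Log(args ...interface{}) {
	f(args...)
}

func main() {
	// log.Println from the standard library has a compatible signature.
	var logger Logger = LoggerFunc(log.Println)
	logger.Log("scanning shard", "shardId-000000000000")
}
```

Under that assumption, a value such as `LoggerFunc(log.Println)` could be passed to WithLogger.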
type Option ¶ added in v0.2.0
Option is used to override defaults when creating a new Consumer
func WithCheckpoint ¶ added in v0.2.0
func WithCheckpoint(checkpoint Checkpoint) Option
WithCheckpoint overrides the default checkpoint
func WithClient ¶ added in v0.2.0
WithClient overrides the default client
func WithCounter ¶ added in v0.2.0
WithCounter overrides the default counter
func WithLogger ¶ added in v0.2.0
WithLogger overrides the default logger