koro

package module
v0.0.0-...-14d6508 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 30, 2023 License: BSD-3-Clause Imports: 8 Imported by: 0

README

koro


koro is a simple DynamoDB Streams reader for Go.

Motivation

In production, most people prefer to use a Lambda function to read DynamoDB Streams because Lambda handles shard management, so the function itself stays stateless. I wanted a similar setup in a local environment (mainly for testing), but there is no Lambda runtime environment that works with DynamoDB Streams.

koro is built to make reading DynamoDB Streams on DynamoDB Local less of a hassle.

Example

See stream_test.go for details.

Documentation

Overview

Package koro is a generated GoMock package.

Index

Constants

This section is empty.

Variables

var (
	ErrEndOfShard = errors.New("koro: End of Shard")
)
var ErrNoAvailShards = errors.New("koro: no available shards to read")

ErrNoAvailShards is returned when there are no shards available for reading, for example after the reader has read the last record of the last shard in a disabled stream.
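A caller will typically want to treat this error as "the stream is finished" rather than as a failure. The following is a minimal sketch of that pattern, not code taken from koro: ErrNoAvailShards is redeclared locally, and recordReader, fakeReader, and drain are hypothetical names, with records simplified to strings so the example runs without the koro module or the AWS SDK.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNoAvailShards is a local stand-in mirroring koro.ErrNoAvailShards so
// that this sketch compiles on its own (assumption: same role as in koro).
var ErrNoAvailShards = errors.New("koro: no available shards to read")

// recordReader is a hypothetical interface shaped like
// (*StreamReader).ReadRecords, with records simplified to strings.
type recordReader interface {
	ReadRecords() ([]string, error)
}

// fakeReader returns its batches in order, then ErrNoAvailShards.
type fakeReader struct {
	batches [][]string
}

func (f *fakeReader) ReadRecords() ([]string, error) {
	if len(f.batches) == 0 {
		return nil, ErrNoAvailShards
	}
	b := f.batches[0]
	f.batches = f.batches[1:]
	return b, nil
}

// drain reads until the reader reports that no shards are left.
func drain(r recordReader) ([]string, error) {
	var all []string
	for {
		recs, err := r.ReadRecords()
		if errors.Is(err, ErrNoAvailShards) {
			return all, nil // the stream is exhausted; not a failure
		}
		if err != nil {
			return all, err
		}
		all = append(all, recs...)
	}
}

func main() {
	r := &fakeReader{batches: [][]string{{"a", "b"}, {"c"}}}
	recs, err := drain(r)
	fmt.Println(recs, err)
}
```

Note that this loop only terminates because the fake reader runs out of data. Against an enabled stream, reads may keep succeeding with empty batches, so a real loop would probably need its own stop condition such as a timeout.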

Functions

func IsShardNotFoundError

func IsShardNotFoundError(origErr error) bool

func SortShards

func SortShards(shards []*dynamodbstreams.Shard) []*dynamodbstreams.Shard

SortShards sorts shards in parent-to-child order.
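To illustrate what parent-to-child ordering means, here is a small self-contained sketch. It is not koro's implementation: the shard type is a hypothetical stand-in for *dynamodbstreams.Shard that keeps only a shard ID and a parent shard ID, and sortParentFirst is an illustrative name.

```go
package main

import "fmt"

// shard is a hypothetical stand-in for *dynamodbstreams.Shard, keeping only
// the two IDs needed for ordering.
type shard struct {
	ID       string
	ParentID string // empty when the shard has no parent
}

// sortParentFirst returns the shards ordered so that every shard appears
// after its parent. A parent that is not part of the input (for example,
// one that has already been trimmed) is treated as absent.
func sortParentFirst(shards []shard) []shard {
	known := make(map[string]bool, len(shards))
	for _, s := range shards {
		known[s.ID] = true
	}
	placed := make(map[string]bool, len(shards))
	out := make([]shard, 0, len(shards))
	for len(out) < len(shards) {
		progressed := false
		for _, s := range shards {
			if placed[s.ID] {
				continue
			}
			// A shard is ready when it has no parent in the input,
			// or when its parent has already been placed.
			if s.ParentID == "" || !known[s.ParentID] || placed[s.ParentID] {
				out = append(out, s)
				placed[s.ID] = true
				progressed = true
			}
		}
		if !progressed {
			break // cycle or duplicate IDs; return what was ordered so far
		}
	}
	return out
}

func main() {
	in := []shard{
		{ID: "child2", ParentID: "child1"},
		{ID: "child1", ParentID: "root"},
		{ID: "root", ParentID: "trimmed"},
	}
	for _, s := range sortParentFirst(in) {
		fmt.Println(s.ID)
	}
}
```

Reading shards in this order means that records from a parent shard are seen before records from its children.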

Types

type DynamoDBStreamer

DynamoDBStreamer is a subset of DynamoDB Streams API interface.

type MockDynamoDBStreamer

type MockDynamoDBStreamer struct {
	// contains filtered or unexported fields
}

MockDynamoDBStreamer is a mock of DynamoDBStreamer interface.

func NewMockDynamoDBStreamer

func NewMockDynamoDBStreamer(ctrl *gomock.Controller) *MockDynamoDBStreamer

NewMockDynamoDBStreamer creates a new mock instance.

func (*MockDynamoDBStreamer) DescribeStream

DescribeStream mocks base method.

func (*MockDynamoDBStreamer) EXPECT

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockDynamoDBStreamer) GetRecords

GetRecords mocks base method.

func (*MockDynamoDBStreamer) GetShardIterator

GetShardIterator mocks base method.

func (*MockDynamoDBStreamer) ListStreams

ListStreams mocks base method.

type MockDynamoDBStreamerMockRecorder

type MockDynamoDBStreamerMockRecorder struct {
	// contains filtered or unexported fields
}

MockDynamoDBStreamerMockRecorder is the mock recorder for MockDynamoDBStreamer.

func (*MockDynamoDBStreamerMockRecorder) DescribeStream

func (mr *MockDynamoDBStreamerMockRecorder) DescribeStream(arg0 interface{}) *gomock.Call

DescribeStream indicates an expected call of DescribeStream.

func (*MockDynamoDBStreamerMockRecorder) GetRecords

func (mr *MockDynamoDBStreamerMockRecorder) GetRecords(arg0 interface{}) *gomock.Call

GetRecords indicates an expected call of GetRecords.

func (*MockDynamoDBStreamerMockRecorder) GetShardIterator

func (mr *MockDynamoDBStreamerMockRecorder) GetShardIterator(arg0 interface{}) *gomock.Call

GetShardIterator indicates an expected call of GetShardIterator.

func (*MockDynamoDBStreamerMockRecorder) ListStreams

func (mr *MockDynamoDBStreamerMockRecorder) ListStreams(arg0 interface{}) *gomock.Call

ListStreams indicates an expected call of ListStreams.

type ShardReader

type ShardReader struct {
	// contains filtered or unexported fields
}

ShardReader provides a reader interface for *dynamodbstreams.Shard.

func (*ShardReader) Next

func (r *ShardReader) Next() bool

Next returns true if the reader has not reached the end of the shard.

func (*ShardReader) ReadRecords

func (r *ShardReader) ReadRecords() ([]*dynamodbstreams.Record, error)

ReadRecords reads records from the shard. It will automatically update the shard iterator for you.
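Next and ReadRecords suggest a simple loop for consuming one shard. The sketch below shows that loop against a hypothetical shardReader interface that borrows the method names of *ShardReader. The fakeShard type and the string records are assumptions made so the example runs without the AWS SDK; how the real reader behaves on an open shard is not covered here.

```go
package main

import (
	"errors"
	"fmt"
)

// shardReader is a hypothetical interface with the same method names as
// *koro.ShardReader; records are simplified to strings.
type shardReader interface {
	Next() bool
	ReadRecords() ([]string, error)
}

// fakeShard serves fixed batches and reports the end of the shard afterwards.
type fakeShard struct {
	batches [][]string
	pos     int
}

func (f *fakeShard) Next() bool { return f.pos < len(f.batches) }

func (f *fakeShard) ReadRecords() ([]string, error) {
	if !f.Next() {
		return nil, errors.New("fakeShard: end of shard")
	}
	b := f.batches[f.pos]
	f.pos++ // plays the role of advancing the shard iterator
	return b, nil
}

// readShard collects every record until Next reports the end of the shard.
// An empty batch is not treated as the end; only Next decides that.
func readShard(r shardReader) ([]string, error) {
	var all []string
	for r.Next() {
		recs, err := r.ReadRecords()
		if err != nil {
			return all, err
		}
		all = append(all, recs...)
	}
	return all, nil
}

func main() {
	r := &fakeShard{batches: [][]string{{"r1"}, {}, {"r2", "r3"}}}
	recs, err := readShard(r)
	fmt.Println(recs, err)
}
```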

func (*ShardReader) Reset

func (r *ShardReader) Reset()

Reset resets the internal state in order to read the shard from the beginning.

func (*ShardReader) Seek

func (r *ShardReader) Seek(rc *dynamodbstreams.Record)

Seek advances the iterator to a given record, so that the next read starts at rc. When a caller is unable to process a record, it should seek the iterator to that record in order to restart processing from it.
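The retry pattern described for Seek can be sketched with an in-memory reader. Everything here is hypothetical and only mirrors the idea: seekReader, processAll, and failOnce are illustrative names, records are plain strings, and the real Seek takes a *dynamodbstreams.Record.

```go
package main

import (
	"errors"
	"fmt"
)

// seekReader is a hypothetical in-memory stand-in for a shard reader whose
// Seek positions the next read at a given record.
type seekReader struct {
	records []string
	pos     int
	batch   int
}

// ReadRecords returns up to batch records and advances past them.
func (s *seekReader) ReadRecords() []string {
	end := s.pos + s.batch
	if end > len(s.records) {
		end = len(s.records)
	}
	out := s.records[s.pos:end]
	s.pos = end
	return out
}

// Seek moves the read position to rc so that the next ReadRecords starts
// at rc. Unknown records leave the position unchanged.
func (s *seekReader) Seek(rc string) {
	for i, r := range s.records {
		if r == rc {
			s.pos = i
			return
		}
	}
}

// processAll handles every record, seeking back to a failed record so that
// it is delivered again on the next read. A real caller would cap retries;
// this sketch loops forever if handle never succeeds.
func processAll(s *seekReader, handle func(string) error) []string {
	var done []string
	for {
		recs := s.ReadRecords()
		if len(recs) == 0 {
			return done
		}
		for _, rc := range recs {
			if err := handle(rc); err != nil {
				s.Seek(rc) // restart from the failed record
				break
			}
			done = append(done, rc)
		}
	}
}

// failOnce returns a handler that fails the first time it sees target.
func failOnce(target string) func(string) error {
	failed := false
	return func(rc string) error {
		if rc == target && !failed {
			failed = true
			return errors.New("transient failure")
		}
		return nil
	}
}

func main() {
	s := &seekReader{records: []string{"a", "b", "c", "d"}, batch: 3}
	fmt.Println(processAll(s, failOnce("b")))
}
```

The point of seeking to the failed record, rather than re-reading the whole shard, is that records already processed successfully are not delivered again.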

func (*ShardReader) ShardID

func (r *ShardReader) ShardID() string

type ShardReaderService

type ShardReaderService struct {
	// contains filtered or unexported fields
}

ShardReaderService is a factory service that creates a new *ShardReader from *dynamodbstreams.Shard.

func NewShardReaderService

func NewShardReaderService(arn *string, client DynamoDBStreamer) *ShardReaderService

NewShardReaderService creates a *ShardReaderService.

func (*ShardReaderService) NewReader

func (srs *ShardReaderService) NewReader(shard *dynamodbstreams.Shard) *ShardReader

NewReader creates a *ShardReader from a *dynamodbstreams.Shard.

type StreamReader

type StreamReader struct {
	// contains filtered or unexported fields
}

StreamReader reads shards in serial order. It maintains a checkpoint in memory.

func NewStreamByName

func NewStreamByName(dsr DynamoDBStreamer, tn string) (*StreamReader, error)

NewStreamByName creates a *StreamReader for a given table name.

func NewStreamReader

func NewStreamReader(dsr DynamoDBStreamer, arn *string) (*StreamReader, error)

NewStreamReader creates a *StreamReader from a stream ARN.

func (*StreamReader) ReadRecords

func (sr *StreamReader) ReadRecords() ([]*dynamodbstreams.Record, error)

ReadRecords reads the current shard. It will only move to the next shard if the current shard reader reaches the end of shard.
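The serial behavior described here (stay on the current shard and move on only when it is exhausted) can be modeled in a few lines. This is a sketch under stated assumptions, not koro's code: shard and serialReader are hypothetical in-memory types, errNoShards stands in for ErrNoAvailShards, and the shards are assumed to be already in parent-to-child order.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoShards is a stand-in for koro.ErrNoAvailShards in this sketch.
var errNoShards = errors.New("sketch: no available shards to read")

// shard is a hypothetical in-memory shard: a queue of record batches.
type shard struct {
	id      string
	batches [][]string
}

// serialReader reads shards one after another, in the order given.
type serialReader struct {
	shards []*shard
	cur    int
}

// ReadRecords returns the next batch of the current shard. It moves on to
// the next shard only once the current one has no batches left.
func (s *serialReader) ReadRecords() ([]string, error) {
	for s.cur < len(s.shards) {
		sh := s.shards[s.cur]
		if len(sh.batches) > 0 {
			b := sh.batches[0]
			sh.batches = sh.batches[1:]
			return b, nil
		}
		s.cur++ // the current shard is exhausted
	}
	return nil, errNoShards
}

func main() {
	sr := &serialReader{shards: []*shard{
		{id: "parent", batches: [][]string{{"1"}, {"2"}}},
		{id: "child", batches: [][]string{{"3"}}},
	}}
	for {
		recs, err := sr.ReadRecords()
		if err != nil {
			fmt.Println("stop:", err)
			return
		}
		fmt.Println(recs)
	}
}
```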

func (*StreamReader) Reader

func (sr *StreamReader) Reader() *ShardReader

Reader returns the shard reader that is currently being read.

func (*StreamReader) Seek

func (sr *StreamReader) Seek(rc *dynamodbstreams.Record)

Seek advances the iterator in the current shard. See ShardReader.Seek. This should be called right after ReadRecords() if you want to restart from the same shard.
