triton

package
v0.0.0-...-6cc688f
Warning

This package is not in the latest version of its module.

Published: Dec 11, 2020 License: ISC Imports: 24 Imported by: 1

Documentation

Overview

Mock Kinesis Service

Index

Constants

const BUFFER_SIZE int = 1024 * 1024
const CREATE_TABLE_STMT = `` /* 237-byte string literal not displayed */

Variables

var MinPollInterval = 1.0 * time.Second

Recommended minimum polling interval to keep from overloading a Kinesis shard.

var RequestLimit int64 = 1000

Functions

func GetCheckpointStats

func GetCheckpointStats(clientName string, db *sql.DB) (stat map[string]int64, err error)

Types

type ArchiveReader

type ArchiveReader struct {
	// contains filtered or unexported fields
}

An ArchiveReader understands how to translate our archive data store format into individual records.

func (*ArchiveReader) ReadRecord

func (r *ArchiveReader) ReadRecord() (rec map[string]interface{}, err error)

type CheckpointService

type CheckpointService interface {
	Checkpoint(string) error
}

type Checkpointer

type Checkpointer interface {
	Checkpoint(ShardID, SequenceNumber) error
	LastSequenceNumber(ShardID) (SequenceNumber, error)
}

func NewCheckpointer

func NewCheckpointer(clientName string, streamName string, db *sql.DB) (Checkpointer, error)

Create a new Checkpointer. May return an error if the database is not usable.
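For tests that should not touch a database, the `Checkpointer` interface could be satisfied by an in-memory type. The sketch below redeclares the interface and the `ShardID` and `SequenceNumber` types locally so that it compiles on its own. `memCheckpointer` is a hypothetical name, and returning an empty sequence number for a shard that was never checkpointed is an assumption, not documented behavior of this package.

```go
package main

import (
	"fmt"
	"sync"
)

// Local copies of the documented types, so the sketch is self-contained.
type ShardID string
type SequenceNumber string

type Checkpointer interface {
	Checkpoint(ShardID, SequenceNumber) error
	LastSequenceNumber(ShardID) (SequenceNumber, error)
}

// memCheckpointer is a hypothetical in-memory Checkpointer for tests.
type memCheckpointer struct {
	mu   sync.Mutex
	seen map[ShardID]SequenceNumber
}

func newMemCheckpointer() *memCheckpointer {
	return &memCheckpointer{seen: make(map[ShardID]SequenceNumber)}
}

// Checkpoint remembers the latest sequence number for a shard.
func (m *memCheckpointer) Checkpoint(sid ShardID, sn SequenceNumber) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.seen[sid] = sn
	return nil
}

// LastSequenceNumber returns the stored value, or "" if none (an assumption).
func (m *memCheckpointer) LastSequenceNumber(sid ShardID) (SequenceNumber, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.seen[sid], nil
}

func main() {
	var c Checkpointer = newMemCheckpointer()
	if err := c.Checkpoint("shard-0", "100"); err != nil {
		fmt.Println("checkpoint failed:", err)
		return
	}
	sn, _ := c.LastSequenceNumber("shard-0")
	fmt.Println("last sequence number:", sn)
}
```

Because `ShardID` and `SequenceNumber` are distinct types, passing two typed values in the wrong order to `Checkpoint` fails to compile.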

type Config

type Config struct {
	Streams map[string]StreamConfig
}

func NewConfigFromFile

func NewConfigFromFile(r io.Reader) (c *Config, err error)

func (*Config) ConfigForName

func (c *Config) ConfigForName(n string) (sc *StreamConfig, err error)

type DynamoDBService

type DynamoDBService interface {
	UpdateItem(input *dynamodb.UpdateItemInput) (*dynamodb.UpdateItemOutput, error)
}

type Reader

type Reader interface {
	ReadRecord() (rec map[string]interface{}, err error)
}

func NewArchiveReader

func NewArchiveReader(ir io.Reader) (or Reader)

func NewSerialReader

func NewSerialReader(readers []Reader) Reader

func NewStoreReader

func NewStoreReader(svc S3Service, bucketName, clientName, streamName string, startDate, endDate time.Time) (Reader, error)

type S3Service

type S3Service interface {
	GetObject(input *s3.GetObjectInput) (*s3.GetObjectOutput, error)
	ListObjects(*s3.ListObjectsInput) (*s3.ListObjectsOutput, error)
}

type S3Uploader

type S3Uploader struct {
	// contains filtered or unexported fields
}

An S3Uploader is a simple wrapper around an s3manager uploader. It assumes default options, and that we will want to upload from some local file name to a remote file name.

func NewUploader

func NewUploader(c client.ConfigProvider, bucketName string) *S3Uploader

func (*S3Uploader) Upload

func (s *S3Uploader) Upload(fileName, keyName string) (err error)

type S3UploaderService

type S3UploaderService interface {
	Upload(input *s3manager.UploadInput) (*s3manager.UploadOutput, error)
}

type SequenceNumber

type SequenceNumber string

type SerialReader

type SerialReader struct {
	// contains filtered or unexported fields
}

A SerialReader lets us read from multiple readers, in sequence.

func (*SerialReader) ReadRecord

func (sr *SerialReader) ReadRecord() (rec map[string]interface{}, err error)
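Reading from several readers in sequence can be sketched as follows. This is not the package's implementation: `serialReader` and `sliceReader` are hypothetical local types, and the sketch assumes that a reader signals exhaustion with `io.EOF`, which this page does not state.

```go
package main

import (
	"fmt"
	"io"
)

// Reader mirrors the documented interface.
type Reader interface {
	ReadRecord() (rec map[string]interface{}, err error)
}

// sliceReader is a hypothetical in-memory Reader used for the demo.
type sliceReader struct {
	recs []map[string]interface{}
}

func (s *sliceReader) ReadRecord() (map[string]interface{}, error) {
	if len(s.recs) == 0 {
		return nil, io.EOF // assumption: EOF marks an exhausted reader
	}
	rec := s.recs[0]
	s.recs = s.recs[1:]
	return rec, nil
}

// serialReader drains each underlying reader before moving to the next.
type serialReader struct {
	readers []Reader
	idx     int
}

func (sr *serialReader) ReadRecord() (map[string]interface{}, error) {
	for sr.idx < len(sr.readers) {
		rec, err := sr.readers[sr.idx].ReadRecord()
		if err == io.EOF {
			sr.idx++ // current reader is done, try the next one
			continue
		}
		return rec, err
	}
	return nil, io.EOF
}

func main() {
	a := &sliceReader{recs: []map[string]interface{}{{"n": 1}, {"n": 2}}}
	b := &sliceReader{recs: []map[string]interface{}{{"n": 3}}}
	sr := &serialReader{readers: []Reader{a, b}}
	for {
		rec, err := sr.ReadRecord()
		if err != nil {
			break
		}
		fmt.Println(rec["n"])
	}
}
```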

type ShardID

type ShardID string

ShardID and SequenceNumber are distinct types so that lists of func args don't get confused.

func ListShards

func ListShards(svc KinesisService, streamName string) (shards []ShardID, err error)

func PickShardID

func PickShardID(svc KinesisService, streamName string, shardNum int) (sid ShardID, err error)

Utility function to pick a shard id given an integer shard number. Use this if you want the 2nd shard, but don't know what the id would be.
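The idea of mapping a shard number to a shard id can be sketched against a plain slice such as the one `ListShards` returns. `pickShard` is a hypothetical helper, not the package's `PickShardID`; treating the shard number as a zero-based index into the list is an assumption, as is the error for an out-of-range number.

```go
package main

import (
	"errors"
	"fmt"
)

// ShardID mirrors the documented type.
type ShardID string

// pickShard returns the shard at position shardNum.
// Assumption: shardNum is a zero-based index into the listed shards.
func pickShard(shards []ShardID, shardNum int) (ShardID, error) {
	if shardNum < 0 || shardNum >= len(shards) {
		return "", errors.New("shard number out of range")
	}
	return shards[shardNum], nil
}

func main() {
	// Example ids only; real ids come from the Kinesis API.
	shards := []ShardID{"shardId-000000000000", "shardId-000000000001"}
	sid, err := pickShard(shards, 1)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(sid)
}
```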

type ShardStreamReader

type ShardStreamReader struct {
	StreamName         string
	ShardID            ShardID
	ShardIteratorType  string
	NextIteratorValue  *string
	LastSequenceNumber *SequenceNumber
	// contains filtered or unexported fields
}

A ShardStreamReader provides records from a Kinesis stream. It's specific to a single shard. Its reads are blocking, and it avoids overloading the shard by limiting how often it attempts to consume records.

func NewShardStreamReader

func NewShardStreamReader(svc KinesisService, streamName string, sid ShardID) (s *ShardStreamReader)

Create a new stream starting at the latest position

This uses the Kinesis LATEST iterator type and assumes the caller only wants new data.

func NewShardStreamReaderFromSequence

func NewShardStreamReaderFromSequence(svc KinesisService, streamName string, sid ShardID, sn SequenceNumber) (s *ShardStreamReader)

Create a new stream given a specific sequence number

This uses the Kinesis AFTER_SEQUENCE_NUMBER iterator type, so it assumes the provided sequence number has already been processed and the caller wants records produced since.

func NewShardStreamReaderTrimHorizon

func NewShardStreamReaderTrimHorizon(svc KinesisService, streamName string, sid ShardID) (s *ShardStreamReader)

Create a new stream starting at the oldest position

This uses the Kinesis TRIM_HORIZON iterator type and assumes the caller wants all available data.

func (*ShardStreamReader) Get

func (s *ShardStreamReader) Get() (r *kinesis.Record, err error)

Get the next record from the Shard Stream

If records are already loaded, this returns the next record quickly.

If not, it may block fetching them from the underlying API. In the event the API doesn't have any records prepared either, this method will return a nil record. This allows the caller to do other things rather than just blocking in this call forever or needing to pass in other flow control signals.
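A caller therefore has to treat a nil record with a nil error as "nothing ready yet" rather than as a failure. The sketch below shows one way to structure such a loop. The `record` type stands in for `*kinesis.Record`, and `getter`, `fakeShard`, and `drain` are hypothetical names introduced so the example runs without AWS.

```go
package main

import "fmt"

// record stands in for kinesis.Record in this self-contained sketch.
type record struct {
	Data []byte
}

// getter captures only the Get method described above.
type getter interface {
	Get() (*record, error)
}

// fakeShard returns queued entries; a nil entry or an empty queue
// simulates the API having no records prepared.
type fakeShard struct {
	queue []*record
}

func (f *fakeShard) Get() (*record, error) {
	if len(f.queue) == 0 {
		return nil, nil
	}
	r := f.queue[0]
	f.queue = f.queue[1:]
	return r, nil
}

// drain calls Get up to maxCalls times and collects payloads.
// It also counts the polls that returned no record.
func drain(g getter, maxCalls int) ([]string, int, error) {
	var payloads []string
	idle := 0
	for i := 0; i < maxCalls; i++ {
		r, err := g.Get()
		if err != nil {
			return payloads, idle, err
		}
		if r == nil {
			idle++ // nothing ready: the caller is free to do other work here
			continue
		}
		payloads = append(payloads, string(r.Data))
	}
	return payloads, idle, nil
}

func main() {
	shard := &fakeShard{queue: []*record{{Data: []byte("a")}, nil, {Data: []byte("b")}}}
	payloads, idle, err := drain(shard, 4)
	fmt.Println(payloads, idle, err)
}
```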

type Store

type Store struct {
	// contains filtered or unexported fields
}

A Store manages buffering records together into files, and uploading them somewhere.

func NewStore

func NewStore(name string, r StreamReader, up *S3Uploader) (s *Store)

func (*Store) Close

func (s *Store) Close() (err error)

func (*Store) Put

func (s *Store) Put(b []byte) (err error)

func (*Store) PutRecord

func (s *Store) PutRecord(rec map[string]interface{}) (err error)

func (*Store) Store

func (s *Store) Store() (err error)

type StoreArchive

type StoreArchive struct {
	StreamName string
	Bucket     string
	Key        string
	ClientName string

	T         time.Time
	SortValue int
	// contains filtered or unexported fields
}

A StoreArchive represents an instance of a data file stored, usually, in S3.

func NewStoreArchive

func NewStoreArchive(bucketName, keyName string, svc S3Service) (sa StoreArchive, err error)

func (*StoreArchive) ReadRecord

func (sa *StoreArchive) ReadRecord() (rec map[string]interface{}, err error)

type StoreArchiveList

type StoreArchiveList []StoreArchive

Sortable list of store archives.

Though archives are lexicographically sorted when they come out of S3, we want to be sure that we're really handling our dates and times correctly.

func (StoreArchiveList) Len

func (l StoreArchiveList) Len() int

func (StoreArchiveList) Less

func (l StoreArchiveList) Less(i, j int) bool

func (StoreArchiveList) Swap

func (l StoreArchiveList) Swap(i, j int)

type StreamConfig

type StreamConfig struct {
	StreamName       string `yaml:"name"`
	RegionName       string `yaml:"region"`
	PartitionKeyName string `yaml:"partition_key"`
}

type StreamReader

type StreamReader interface {
	Reader
	Checkpoint() error
	Stop()
}

A StreamReader is a higher-level interface for reading data from a live Triton stream.

By implementing the Reader interface, we can deliver processed Triton data to the client. In addition, we provide a checkpointing service.
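A consumer built on this interface typically reads records, checkpoints periodically, and stops the reader when done. The sketch below shows that shape against a fake. `fakeStream` and `consume` are hypothetical, and ending the loop on `io.EOF` is an assumption made so the demo terminates; a live stream may never report EOF.

```go
package main

import (
	"fmt"
	"io"
)

// Reader and StreamReader mirror the documented interfaces.
type Reader interface {
	ReadRecord() (rec map[string]interface{}, err error)
}

type StreamReader interface {
	Reader
	Checkpoint() error
	Stop()
}

// fakeStream is a hypothetical StreamReader backed by a slice.
type fakeStream struct {
	pending     []map[string]interface{}
	checkpoints int
	stopped     bool
}

func (f *fakeStream) ReadRecord() (map[string]interface{}, error) {
	if f.stopped || len(f.pending) == 0 {
		return nil, io.EOF
	}
	rec := f.pending[0]
	f.pending = f.pending[1:]
	return rec, nil
}

func (f *fakeStream) Checkpoint() error {
	f.checkpoints++
	return nil
}

func (f *fakeStream) Stop() {
	f.stopped = true
}

// consume handles records until the reader is exhausted, checkpointing
// after every `every` records (every must be positive) and once at the end.
func consume(sr StreamReader, every int, handle func(map[string]interface{})) error {
	defer sr.Stop()
	n := 0
	for {
		rec, err := sr.ReadRecord()
		if err == io.EOF {
			return sr.Checkpoint()
		}
		if err != nil {
			return err
		}
		handle(rec)
		n++
		if n%every == 0 {
			if err := sr.Checkpoint(); err != nil {
				return err
			}
		}
	}
}

func main() {
	fs := &fakeStream{pending: []map[string]interface{}{{"id": 1}, {"id": 2}, {"id": 3}}}
	err := consume(fs, 2, func(rec map[string]interface{}) {
		fmt.Println("record", rec["id"])
	})
	fmt.Println("checkpoints:", fs.checkpoints, "stopped:", fs.stopped, "err:", err)
}
```

Checkpointing only after a record has been handled means a restart may replay the most recent records, but it does not skip unprocessed ones.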

func NewStreamReader

func NewStreamReader(svc KinesisService, streamName string, c Checkpointer) (sr StreamReader, err error)

func NewStreamReaderDefaultLatest

func NewStreamReaderDefaultLatest(svc KinesisService, streamName string, c Checkpointer) (sr StreamReader, err error)

func NewStreamReaderDefaultTrimHorizon

func NewStreamReaderDefaultTrimHorizon(svc KinesisService, streamName string, c Checkpointer) (sr StreamReader, err error)
