triton

package
v2.0.0-...-d734875
Warning

This package is not in the latest version of its module.

Published: Feb 4, 2016 License: ISC Imports: 25 Imported by: 0

Documentation

Overview

Package triton provides an opinionated interface to Kinesis.

Index

Constants

const BUFFER_SIZE int = 1024 * 1024
const CREATE_TABLE_STMT = `` /* 237-byte string literal not displayed */
const MaxBatchSize = 500

MaxBatchSize is the maximum number of records Kinesis accepts in a single PutRecords call.
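A caller holding more records than this limit has to split them before sending. The following is a minimal sketch of that splitting step. The helper chunkRecords and the local constant maxBatchSize are hypothetical names introduced for illustration; they are not part of the package, and the package's own batching code may differ.

```go
package main

import "fmt"

// Record mirrors the package's Record type.
type Record map[string]interface{}

// maxBatchSize mirrors triton.MaxBatchSize.
const maxBatchSize = 500

// chunkRecords is a hypothetical helper, not part of the package. It splits
// rs into consecutive slices holding at most size records each.
func chunkRecords(rs []Record, size int) [][]Record {
	if size <= 0 {
		return nil
	}
	var batches [][]Record
	for len(rs) > size {
		batches = append(batches, rs[:size])
		rs = rs[size:]
	}
	if len(rs) > 0 {
		batches = append(batches, rs)
	}
	return batches
}

func main() {
	rs := make([]Record, 1201)
	for i, b := range chunkRecords(rs, maxBatchSize) {
		fmt.Printf("batch %d: %d records\n", i, len(b))
	}
}
```

The returned slices share the backing array of the input, so the sketch copies no records.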

Variables

This section is empty.

Functions

func GetCheckpointStats

func GetCheckpointStats(clientName string, db *sql.DB) (stat map[string]int64, err error)

func MarshalRecord

func MarshalRecord(r Record) ([]byte, error)

Types

type BatchWriter

type BatchWriter struct {
	// contains filtered or unexported fields
}

BatchWriter implements an asynchronous writer that writes records in batches. A batch is written when either the buffer size is exceeded or the time interval since the last write has been exceeded.

Write errors are sent on the Errors() channel. Monitor this channel actively, because the writer does not stop after an error.

func NewBatchWriter

func NewBatchWriter(w Writer) *BatchWriter

NewBatchWriter creates a batch version of an existing Writer using default values for size and interval.

func NewBatchWriterSize

func NewBatchWriterSize(w Writer, size int, intr time.Duration) *BatchWriter

NewBatchWriterSize creates a batch writer using the given size and interval.

func (*BatchWriter) Close

func (bw *BatchWriter) Close()

Close prevents future writes and flushes all currently buffered records. Any error encountered is sent on the Errors channel.

func (*BatchWriter) Errors

func (bw *BatchWriter) Errors() <-chan error

Errors returns the channel that errors will be returned on. It is highly recommended that you monitor this channel.

func (*BatchWriter) Flush

func (bw *BatchWriter) Flush()

Flush forces all buffered records to be sent. Any error encountered is sent on the Errors channel.

func (*BatchWriter) WriteRecords

func (bw *BatchWriter) WriteRecords(rs ...Record) error

WriteRecords performs an asynchronous write to Kinesis.

The returned error will always be nil in the current implementation. It is recommended you read errors from Errors().

type Checkpointer

type Checkpointer interface {
	Checkpoint(string, string) error
	LastSequenceNumber(string) (string, error)
}
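For tests that should not depend on a database, the interface can be satisfied by an in-memory type. The sketch below is hypothetical. In particular, the interface does not name its parameters, so reading them as a shard ID and a sequence number is an assumption, as is returning an empty string for a shard that has never been checkpointed.

```go
package main

import (
	"fmt"
	"sync"
)

// Checkpointer mirrors the interface documented on this page.
type Checkpointer interface {
	Checkpoint(string, string) error
	LastSequenceNumber(string) (string, error)
}

// memCheckpointer is a hypothetical in-memory implementation for tests.
type memCheckpointer struct {
	mu  sync.Mutex
	seq map[string]string
}

func newMemCheckpointer() *memCheckpointer {
	return &memCheckpointer{seq: make(map[string]string)}
}

// Checkpoint stores seqNum for shardID (assumed parameter meaning).
func (m *memCheckpointer) Checkpoint(shardID, seqNum string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.seq[shardID] = seqNum
	return nil
}

// LastSequenceNumber returns the stored value, or "" if none exists
// (assumed behavior for an unknown shard).
func (m *memCheckpointer) LastSequenceNumber(shardID string) (string, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.seq[shardID], nil
}

func main() {
	var c Checkpointer = newMemCheckpointer()
	if err := c.Checkpoint("shard-0", "42"); err != nil {
		fmt.Println("checkpoint failed:", err)
		return
	}
	last, _ := c.LastSequenceNumber("shard-0")
	fmt.Println(last)
}
```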

func NewCheckpointer

func NewCheckpointer(clientName string, streamName string, db *sql.DB) (Checkpointer, error)

NewCheckpointer creates a new Checkpointer. It may return an error if the database is not usable.

type Config

type Config struct {
	Streams map[string]StreamConfig
}

func NewConfigFromFile

func NewConfigFromFile(r io.Reader) (c *Config, err error)

func (*Config) ConfigForName

func (c *Config) ConfigForName(n string) (sc *StreamConfig, err error)

type Reader

type Reader interface {
	// Read reads up to len(r) records into r. It returns the number of records
	// read (0 <= n <= len(r)), the offset after the read, and any error
	// encountered.
	Read(r []Record) (n int, off string, err error)
}

Reader is the interface for reading records in batches.
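A typical consumer calls Read in a loop with a reusable buffer. The sketch below shows that loop against a slice-backed Reader. The names sliceReader and readAll are hypothetical. Two behaviors are assumptions not confirmed by this page: that io.EOF marks the end of the data, and that the offset is a decimal index (real offsets are opaque strings whose format is not documented here).

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strconv"
)

// Record and Reader mirror the types documented on this page.
type Record map[string]interface{}

type Reader interface {
	Read(r []Record) (n int, off string, err error)
}

// sliceReader is a hypothetical Reader backed by an in-memory slice.
type sliceReader struct {
	recs []Record
	pos  int
}

// Read copies up to len(r) records into r and reports the new position as
// the offset. It returns io.EOF once all records have been consumed.
func (s *sliceReader) Read(r []Record) (int, string, error) {
	if s.pos >= len(s.recs) {
		return 0, strconv.Itoa(s.pos), io.EOF
	}
	n := copy(r, s.recs[s.pos:])
	s.pos += n
	return n, strconv.Itoa(s.pos), nil
}

// readAll is a hypothetical helper that drains rd using a buffer of
// bufSize records and returns everything read plus the final offset.
func readAll(rd Reader, bufSize int) ([]Record, string, error) {
	if bufSize <= 0 {
		return nil, "", errors.New("bufSize must be positive")
	}
	buf := make([]Record, bufSize)
	var out []Record
	var off string
	for {
		n, o, err := rd.Read(buf)
		out = append(out, buf[:n]...) // keep records even when err != nil
		off = o
		if err == io.EOF {
			return out, off, nil
		}
		if err != nil {
			return out, off, err
		}
	}
}

func main() {
	rd := &sliceReader{recs: make([]Record, 5)}
	recs, off, err := readAll(rd, 2)
	fmt.Println(len(recs), off, err)
}
```

Persisting the last returned offset is what lets a later read resume where this one stopped.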

func NewStoreReader

func NewStoreReader(svc S3Service, bucketName, clientName, streamName string, startDate, endDate time.Time) (Reader, error)

type Record

type Record map[string]interface{}

func UnmarshalRecord

func UnmarshalRecord(data []byte) (Record, error)
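MarshalRecord and UnmarshalRecord form a round trip between a Record and its byte encoding. This page does not state which encoding the package uses, so the sketch below substitutes JSON from the standard library purely as a stand-in; marshalSketch and unmarshalSketch are hypothetical names and should not be read as the package's wire format.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Record mirrors the package's Record type.
type Record map[string]interface{}

// marshalSketch is a hypothetical stand-in that encodes a Record as JSON.
func marshalSketch(r Record) ([]byte, error) {
	return json.Marshal(r)
}

// unmarshalSketch is a hypothetical stand-in that decodes JSON into a Record.
func unmarshalSketch(data []byte) (Record, error) {
	var r Record
	if err := json.Unmarshal(data, &r); err != nil {
		return nil, err
	}
	return r, nil
}

func main() {
	in := Record{"user": "alice", "action": "login"}
	data, err := marshalSketch(in)
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	out, err := unmarshalSketch(data)
	if err != nil {
		fmt.Println("unmarshal failed:", err)
		return
	}
	fmt.Println(out["user"], out["action"])
}
```

Because Record values are of type interface{}, a round trip may change concrete types depending on the encoding; with JSON, for example, every number decodes as float64.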

type S3Service

type S3Service interface {
	ListObjects(*s3.ListObjectsInput) (*s3.ListObjectsOutput, error)
	Download(w io.WriterAt, input *s3.GetObjectInput, options ...func(*s3manager.Downloader)) (n int64, err error)
	Upload(input *s3manager.UploadInput, options ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error)
}

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store buffers records together into files and uploads them.

func (*Store) Close

func (s *Store) Close() (err error)

func (*Store) Put

func (s *Store) Put(b []byte) (err error)

func (*Store) PutRecord

func (s *Store) PutRecord(rec Record) (err error)

func (*Store) Store

func (s *Store) Store(stream *Stream, chunkSize int) error

type Store2

type Store2 struct {
	// contains filtered or unexported fields
}

func NewStore

func NewStore(name, bucket string) *Store2

func NewStoreService

func NewStoreService(s3 S3Service, name, bucket string) *Store2

func (*Store2) Reader

func (s *Store2) Reader(start, end string) (Reader, error)

start and end must be timestamps formatted as time.RFC3339.

func (*Store2) Write

func (s *Store2) Write(data []Record) (int, error)

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

func NewStream

func NewStream(region, name string) (*Stream, error)

NewStream returns a Stream configured for all shards, using environment variables to determine the Kinesis service.

func NewStreamShardsService

func NewStreamShardsService(svc KinesisService, region, name string, shards []string) (*Stream, error)

NewStreamShardsService returns a Stream configured for the specified shards using the specified Kinesis service.

If no shards are given, the Stream reads from all shards of the named stream.

func (*Stream) Reader

func (s *Stream) Reader(off string) (Reader, error)

type StreamConfig

type StreamConfig struct {
	StreamName       string `yaml:"name"`
	RegionName       string `yaml:"region"`
	PartitionKeyName string `yaml:"partition_key"`
}
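Config maps a name to a StreamConfig, and the yaml tags give the keys used for each stream entry (name, region, partition_key). The sketch below builds a Config by hand and looks a stream up by name. The function lookup is a hypothetical stand-in for ConfigForName; returning an error for an unknown name is an assumption, and the stream values are made up for illustration.

```go
package main

import "fmt"

// StreamConfig and Config mirror the types documented on this page.
type StreamConfig struct {
	StreamName       string `yaml:"name"`
	RegionName       string `yaml:"region"`
	PartitionKeyName string `yaml:"partition_key"`
}

type Config struct {
	Streams map[string]StreamConfig
}

// lookup is a hypothetical stand-in for ConfigForName. It returns the
// configuration stored under name, or an error if there is none
// (assumed behavior).
func lookup(c *Config, name string) (*StreamConfig, error) {
	sc, ok := c.Streams[name]
	if !ok {
		return nil, fmt.Errorf("no stream configured for %q", name)
	}
	return &sc, nil
}

func main() {
	c := &Config{Streams: map[string]StreamConfig{
		// Illustrative values only.
		"events": {
			StreamName:       "events-stream",
			RegionName:       "us-west-1",
			PartitionKeyName: "user_id",
		},
	}}

	sc, err := lookup(c, "events")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(sc.StreamName, sc.RegionName, sc.PartitionKeyName)
}
```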

type Writer

type Writer interface {
	WriteRecords(r ...Record) error
}

Writer represents a Kinesis writer configured to use a stream and partition key.

func NewWriter

func NewWriter(config *StreamConfig) Writer

NewWriter returns a Writer using the given configuration. The returned writer is a synchronous writer.
