writer

package
v0.0.0-...-7e90556 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 29, 2018 License: MIT Imports: 30 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// RedshiftDatetimeIngestString is the format of timestamps that Redshift understands.
	RedshiftDatetimeIngestString = "2006-01-02 15:04:05.999"
)

Variables

View Source
var (
	// EventsDir is the local subdirectory where successfully-transformed events are written.
	EventsDir = "events"
	// NonTrackedDir is the local subdirectory where non-tracked events are written.
	NonTrackedDir = "nontracked"
)

Functions

This section is empty.

Types

type BatchWriter

type BatchWriter interface {
	SendBatch([][]byte)
}

BatchWriter is an interface to write batches to an external sink.

type DefaultFirehoseFactory

type DefaultFirehoseFactory struct {
	Session *session.Session
}

DefaultFirehoseFactory is a factory that creates standard Firehose clients.

func (*DefaultFirehoseFactory) New

New returns a firehose client configured to use the given region/role.

type DefaultKinesisFactory

type DefaultKinesisFactory struct {
	Session *session.Session
}

DefaultKinesisFactory is a factory that creates standard Kinesis clients.

func (*DefaultKinesisFactory) New

func (f *DefaultKinesisFactory) New(region, role string) kinesisiface.KinesisAPI

New returns a kinesis client configured to use the given region/role.

type Event

type Event struct {
	Name   string
	Fields map[string]string
}

Event is an event name together with its fields, used for compressed streams.

type EventForwarder

type EventForwarder interface {
	Submit([]byte)
	Close()
}

EventForwarder receives events and forwards them to Kinesis or another EventForwarder.

type FirehoseBatchWriter

type FirehoseBatchWriter struct {
	// contains filtered or unexported fields
}

FirehoseBatchWriter writes batches to Kinesis Firehose.

func (*FirehoseBatchWriter) SendBatch

func (w *FirehoseBatchWriter) SendBatch(batch [][]byte)

SendBatch writes the given batch to the firehose configured by the FirehoseBatchWriter.

type FirehoseFactory

type FirehoseFactory interface {
	New(region, role string) firehoseiface.FirehoseAPI
}

FirehoseFactory creates a firehose client (firehoseiface.FirehoseAPI) for a given region and role.

type JSONRecord

type JSONRecord struct {
	UUID      string
	Version   int
	Data      map[string]string
	CreatedAt string
}

JSONRecord is a raw JSON record to be sent to Kinesis.

type KinesisConfig

type KinesisConfig struct {
	StreamConfig  scoop_protocol.KinesisWriterConfig
	CommonFilters map[string]scoop_protocol.EventFilterFunc
	DefaultFilter scoop_protocol.EventFilterFunc
}

KinesisConfig represents a stream's config and some base kinesis config.

type KinesisFactory

type KinesisFactory interface {
	New(region, role string) kinesisiface.KinesisAPI
}

KinesisFactory creates a kinesis client (kinesisiface.KinesisAPI) for a given region and role.

type KinesisWriter

type KinesisWriter struct {
	sync.WaitGroup
	// contains filtered or unexported fields
}

KinesisWriter is a writer that writes events to Kinesis.

func (*KinesisWriter) Close

func (w *KinesisWriter) Close() error

Close closes a KinesisWriter.

func (*KinesisWriter) Rotate

func (w *KinesisWriter) Rotate() (bool, error)

Rotate doesn't do anything as KinesisWriters don't need to rotate.

func (*KinesisWriter) Write

func (w *KinesisWriter) Write(req *WriteRequest)

type Multee

type Multee struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Multee implements the `SpadeWriter` and `SpadeWriterManager` interfaces and forwards all calls to a map of targets.

func NewMultee

func NewMultee() *Multee

NewMultee creates an empty Multee and returns it.

func (*Multee) Add

func (t *Multee) Add(key string, w SpadeWriter)

Add adds a new writer to the target map.

func (*Multee) Close

func (t *Multee) Close() error

Close closes all the target writers; it does this asynchronously.

func (*Multee) Drop

func (t *Multee) Drop(key string)

Drop removes an existing writer from the target map.

func (*Multee) Replace

func (t *Multee) Replace(key string, newWriter SpadeWriter)

Replace replaces the writer stored under key in the target map with newWriter.

func (*Multee) Rotate

func (t *Multee) Rotate() (bool, error)

Rotate forwards a rotation request to all target writers.

func (*Multee) Write

func (t *Multee) Write(r *WriteRequest)

Write forwards a WriteRequest to all target writers.

type Record

type Record struct {
	UUID      string
	Version   int
	Data      []byte
	CreatedAt string
}

Record is a compressed record to be sent to Kinesis.

type RotateConditions

type RotateConditions struct {
	MaxLogSize     int64
	MaxTimeAllowed time.Duration
}

RotateConditions holds the maximum log size and the maximum time allowed before a rotation is forced.

type SpadeWriter

type SpadeWriter interface {
	Write(*WriteRequest)
	Close() error

	// Rotate requests a rotation from the SpadeWriter, which *may* write to S3 or Kinesis
	// depending on timing and amount of information already buffered.  This should be
	// called periodically, as this is the only time a SpadeWriter will write to its
	// sink (except on Close).  It returns a bool indicating whether all sinks were
	// written to and one of the errors which arose in writing (if any).
	Rotate() (bool, error)
}

SpadeWriter is an interface for writing to external sinks, like S3 or Kinesis.

func NewKinesisWriter

func NewKinesisWriter(
	kinesisFactory KinesisFactory,
	firehoseFactory FirehoseFactory,
	statter statsd.Statter,
	config *KinesisConfig,
	errorsBeforeThrottling int,
	secondsPerError int64) (SpadeWriter, error)

NewKinesisWriter returns an instance of SpadeWriter that writes events to Kinesis.

func NewWriterController

func NewWriterController(
	folder string,
	reporter reporter.Reporter,
	spadeUploaderPool *uploader.UploaderPool,
	blueprintUploaderPool *uploader.UploaderPool,
	maxLogBytes int64,
	maxLogAgeSecs int64,
	nontrackedMaxLogAgeSecs int64,
) SpadeWriter

NewWriterController returns a writerController that handles logic to distribute writes across a number of workers. Each worker owns and operates one file. There are several sets of workers, and each set corresponds to an event type. Thus, if we are processing a log file with 2 types of events, we should produce (nWriters * 2) files.

type SpadeWriterManager

type SpadeWriterManager interface {
	Add(key string, w SpadeWriter)
	Drop(key string)
	Replace(key string, newWriter SpadeWriter)
}

SpadeWriterManager allows operations on a set of SpadeWriters.

type Statter

type Statter struct {
	// contains filtered or unexported fields
}

Statter sends stats for a BatchWriter.

func NewStatter

func NewStatter(statter statsd.Statter, streamName string) *Statter

NewStatter returns a Statter for the given stream.

func (*Statter) IncStat

func (w *Statter) IncStat(stat int, amount int64)

IncStat increments a stat by an amount on the Statter.

type StreamBatchWriter

type StreamBatchWriter struct {
	// contains filtered or unexported fields
}

StreamBatchWriter writes batches to Kinesis Streams.

func (*StreamBatchWriter) SendBatch

func (w *StreamBatchWriter) SendBatch(batch [][]byte)

SendBatch writes the given batch to a stream, as configured by the KinesisWriter.

type WriteRequest

type WriteRequest struct {
	Category string
	Version  int
	// Line is the transformed data in tsv format
	Line string
	// Record is the transformed data in a key/value map
	Record map[string]string
	UUID   string
	// Keep the source around for logging
	Source  json.RawMessage
	Failure reporter.FailMode
	Pstart  time.Time
}

WriteRequest is a processed event with metadata, ready for writing to an output.

func MakeErrorRequest

func MakeErrorRequest(e *parser.MixpanelEvent, err interface{}) *WriteRequest

MakeErrorRequest returns a WriteRequest indicating that a panic occurred during processing.

func (*WriteRequest) GetCategory

func (r *WriteRequest) GetCategory() string

GetCategory returns the event type.

func (*WriteRequest) GetMessage

func (r *WriteRequest) GetMessage() string

GetMessage returns the raw JSON of the event.

func (*WriteRequest) GetResult

func (r *WriteRequest) GetResult() *reporter.Result

GetResult returns timing and metadata of the event.

func (*WriteRequest) GetStartTime

func (r *WriteRequest) GetStartTime() time.Time

GetStartTime returns when processing of the event started.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL