processor

package
v0.0.0-...-5178349 Latest
Published: Nov 1, 2017 License: MIT Imports: 13 Imported by: 0

Documentation

Overview

Package processor contains logic to process gzipped Mixpanel event data into SQL table schemas.

Constants

This section is empty.

Variables

var (
	// CriticalPercentage is the percentage of events in which a property must be seen in order to be considered part of the schema for an event.
	CriticalPercentage = 0.0

	// CriticalThreshold is the number of events of a specific event name that must occur for the event to be summarized.
	CriticalThreshold = 2
)

Functions

func Percentile

func Percentile(nums []int, percentile int) float64

Percentile computes the percentile of a set of integers using a method described at http://en.wikipedia.org/wiki/Percentile#Alternative_methods
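The linked Wikipedia section covers several interpolation variants, and this page does not say which one Percentile uses. The following is a minimal sketch of one of them (linear interpolation between the two closest ranks), written as a hypothetical stand-in named percentileSketch. It is not the package's implementation and may round differently.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// percentileSketch is a hypothetical stand-in for Percentile.
// It sorts a copy of nums and linearly interpolates between the two
// closest ranks. The exact variant used by the package is an assumption.
func percentileSketch(nums []int, percentile int) float64 {
	if len(nums) == 0 {
		return 0
	}
	sorted := append([]int(nil), nums...)
	sort.Ints(sorted)

	if percentile <= 0 {
		return float64(sorted[0])
	}
	if percentile >= 100 {
		return float64(sorted[len(sorted)-1])
	}

	// Fractional zero-based rank into the sorted slice.
	rank := float64(percentile) / 100 * float64(len(sorted)-1)
	lo := int(math.Floor(rank))
	if lo >= len(sorted)-1 {
		return float64(sorted[len(sorted)-1])
	}
	frac := rank - float64(lo)
	return float64(sorted[lo]) + frac*float64(sorted[lo+1]-sorted[lo])
}

func main() {
	fmt.Println(percentileSketch([]int{15, 20, 35, 40, 50}, 50))
}
```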

func ScoopTransformer

func ScoopTransformer(eventName string, properties []PropertySummary, nRows int) ([]byte, error)

ScoopTransformer returns JSON corresponding to the configuration for converting events of a given event name to a SQL table.

Types

type AugmentedColumnDefinition

type AugmentedColumnDefinition struct {
	// InboundName is the name of the property as sent to edge servers.
	InboundName string

	// OutboundName is the name of the property that will be stored in redshift.
	OutboundName string

	// Transformer is the SQL type for the column corresponding to this property.
	Transformer string

	// ColumnCreationOptions are additional options/parameters for the SQL type, e.g. for varchar, "(255)"
	ColumnCreationOptions string

	// OccurrenceProbability is how often this property appears in events.
	OccurrenceProbability float64
}

AugmentedColumnDefinition adds some metadata to a property of an event.

type AugmentedEventConfig

type AugmentedEventConfig struct {

	// EventName is the name of the event.
	EventName string

	// Columns is the metadata required to create columns for each property of the event.
	Columns []AugmentedColumnDefinition

	// Occurred is the number of times the event occurred.
	Occurred int
}

AugmentedEventConfig gives the configuration for creating a table for a given event name.

type EventAggregator

type EventAggregator struct {
	// CriticalPercent is the threshold percent of events that contain a given property, under which a property will be omitted from the event summary.
	CriticalPercent float64

	// TotalRows is a count of events seen.
	TotalRows int

	// Columns stores information about each property contained within the event.
	Columns map[string]*TypeAggregator
}

EventAggregator summarizes a set of events.

func NewEventAggregator

func NewEventAggregator(criticalPercentage float64) *EventAggregator

NewEventAggregator allocates a new EventAggregator.

func (*EventAggregator) Aggregate

func (e *EventAggregator) Aggregate(properties map[string]interface{})

Aggregate JSON objects.

func (*EventAggregator) ColumnShouldBePruned

func (e *EventAggregator) ColumnShouldBePruned(colAggregate *TypeAggregator) bool

ColumnShouldBePruned returns whether a property seen in a set of events should be ignored.

func (*EventAggregator) Summarize

func (e *EventAggregator) Summarize() (int, []PropertySummary)

Summarize returns a summary of the properties seen for this set of events, as well as a count of the events seen. It prunes any property that did not occur in more than CriticalPercent of events.
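The pruning rule can be illustrated with a small sketch. The helper pruneSketch and its inputs are hypothetical, and the sketch assumes CriticalPercent is expressed on a 0 to 100 scale, which this page does not confirm.

```go
package main

import (
	"fmt"
	"sort"
)

// pruneSketch returns the names of properties that were seen in more than
// criticalPercent of totalRows events. Hypothetical helper: the 0-100 scale
// for criticalPercent is an assumption, not taken from the package.
func pruneSketch(counts map[string]int, totalRows int, criticalPercent float64) []string {
	kept := []string{}
	if totalRows == 0 {
		return kept
	}
	for name, n := range counts {
		seen := 100 * float64(n) / float64(totalRows)
		if seen > criticalPercent {
			kept = append(kept, name)
		}
	}
	// Sort so the result does not depend on map iteration order.
	sort.Strings(kept)
	return kept
}

func main() {
	counts := map[string]int{"user_id": 100, "referrer": 40, "debug_flag": 1}
	fmt.Println(pruneSketch(counts, 100, 5.0))
}
```

With a threshold of 0.0, which is the default value of CriticalPercentage shown above, this rule would keep every property that appeared at least once.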

type EventProcessor

type EventProcessor interface {
	Accept(map[string]interface{})
	Flush(string)
}

EventProcessor processes events of a certain type and flushes metadata about the schema.
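As an illustration of the interface contract, here is a minimal sketch of a custom implementation. The type countingProcessor is hypothetical and only counts events. The interface is redeclared locally so the example is self-contained.

```go
package main

import "fmt"

// EventProcessor mirrors the interface declared above.
type EventProcessor interface {
	Accept(map[string]interface{})
	Flush(string)
}

// countingProcessor is a hypothetical implementation that counts accepted
// events and records one summary line per flush.
type countingProcessor struct {
	rows    int
	flushed []string
}

// Accept counts the event; a real processor would inspect the properties.
func (c *countingProcessor) Accept(properties map[string]interface{}) {
	c.rows++
}

// Flush records a summary labelled with the event name and resets the count.
func (c *countingProcessor) Flush(eventName string) {
	c.flushed = append(c.flushed, fmt.Sprintf("%s: %d events", eventName, c.rows))
	c.rows = 0
}

func main() {
	c := &countingProcessor{}
	var p EventProcessor = c
	p.Accept(map[string]interface{}{"user_id": 1})
	p.Accept(map[string]interface{}{"user_id": 2})
	p.Flush("signup")
	fmt.Println(c.flushed[0])
}
```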

func NewNonTrackedEventProcessor

func NewNonTrackedEventProcessor(outputDir string) EventProcessor

NewNonTrackedEventProcessor allocates a new NonTrackedEventProcessor.

type EventRouter

type EventRouter struct {
	// CurrentTables maintains the current event names with schemas in bpdb. It is updated periodically.
	CurrentTables []string

	// Processors aggregate data about different event types.
	Processors map[string]EventProcessor

	// ProcessorFactory creates a new Processor for a previously unseen event type.
	ProcessorFactory func(string) EventProcessor

	// FlushTimer will periodically flush data about events to the output directory.
	FlushTimer <-chan time.Time

	// GzipReader is for reading files, and is re-used.
	GzipReader *gzip.Reader

	// OutputDir to place files.
	OutputDir string
	// contains filtered or unexported fields
}

EventRouter receives Mixpanel events, and for events that do not have a table yet, outputs files describing the table for that event.

func NewRouter

func NewRouter(
	outputDir string,
	flushInterval time.Duration,
	bpSchemaBackend bpdb.BpSchemaBackend,
) *EventRouter

NewRouter allocates a new EventRouter that outputs transformations to a given output directory.

func (*EventRouter) EventCreated

func (e *EventRouter) EventCreated(eventName string) bool

EventCreated returns true if the event has a table in bpdb.

func (*EventRouter) FlushRouters

func (e *EventRouter) FlushRouters()

FlushRouters flushes event schema descriptions to the output directory, and also deletes the ones for which a table has been created (this can happen under a race condition).

func (*EventRouter) ReadFile

func (e *EventRouter) ReadFile(filename string) error

ReadFile reads a file of Mixpanel events and routes them to event aggregators. If the flush interval has expired, it will flush all event aggregators after reading the file.
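The on-disk layout of the event files is not described on this page. The sketch below assumes gzipped, newline-delimited JSON objects with "event" and "properties" keys, and shows how such a stream could be decoded with the standard library. The names mpEvent, readEvents, gzipString and tallyEvents are hypothetical.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/json"
	"fmt"
	"io"
)

// mpEvent mirrors the shape of MPEvent for this self-contained sketch.
type mpEvent struct {
	Event      string
	Properties map[string]interface{}
}

// readEvents decodes a gzipped stream of JSON objects and passes each event
// to handle. The file format is an assumption.
func readEvents(r io.Reader, handle func(mpEvent)) error {
	gz, err := gzip.NewReader(r)
	if err != nil {
		return err
	}
	defer gz.Close()

	dec := json.NewDecoder(gz)
	dec.UseNumber() // keep numbers as json.Number instead of float64
	for {
		var ev mpEvent
		if err := dec.Decode(&ev); err == io.EOF {
			return nil
		} else if err != nil {
			return err
		}
		handle(ev)
	}
}

// gzipString compresses s in memory so the sketch needs no file on disk.
func gzipString(s string) ([]byte, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write([]byte(s)); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// tallyEvents counts events per event name in gzipped data.
func tallyEvents(data []byte) (map[string]int, error) {
	counts := map[string]int{}
	err := readEvents(bytes.NewReader(data), func(ev mpEvent) {
		counts[ev.Event]++
	})
	return counts, err
}

func main() {
	lines := `{"event":"signup","properties":{"plan":"free"}}` + "\n" +
		`{"event":"login","properties":{}}` + "\n" +
		`{"event":"signup","properties":{"plan":"pro"}}` + "\n"
	data, err := gzipString(lines)
	if err != nil {
		fmt.Println("compress:", err)
		return
	}
	counts, err := tallyEvents(data)
	if err != nil {
		fmt.Println("read:", err)
		return
	}
	fmt.Println(counts["signup"], counts["login"])
}
```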

func (*EventRouter) Route

func (e *EventRouter) Route(eventName string, properties map[string]interface{})

Route sends an event to its event aggregator, but only if the event does not have a table yet.
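The routing rule (skip events that already have a table, lazily create a processor for unseen event names) can be sketched as follows. The types routerSketch and recorder are hypothetical, and the sketch uses a map for the table lookup where EventRouter exposes a slice.

```go
package main

import "fmt"

// recorder is a hypothetical processor that counts accepted events.
type recorder struct{ n int }

func (r *recorder) Accept(properties map[string]interface{}) { r.n++ }

// routerSketch is a hypothetical, simplified router. currentTables holds the
// event names that already have a table.
type routerSketch struct {
	currentTables map[string]bool
	processors    map[string]*recorder
	factory       func(eventName string) *recorder
}

// route forwards the event to its processor unless a table already exists.
func (r *routerSketch) route(eventName string, properties map[string]interface{}) {
	if r.currentTables[eventName] {
		return // the event is already tracked, nothing to infer
	}
	p, ok := r.processors[eventName]
	if !ok {
		p = r.factory(eventName)
		r.processors[eventName] = p
	}
	p.Accept(properties)
}

func main() {
	r := &routerSketch{
		currentTables: map[string]bool{"login": true},
		processors:    map[string]*recorder{},
		factory:       func(string) *recorder { return &recorder{} },
	}
	r.route("login", map[string]interface{}{"user_id": 1})
	r.route("signup", map[string]interface{}{"user_id": 2})
	r.route("signup", map[string]interface{}{"user_id": 3})
	fmt.Println(len(r.processors), r.processors["signup"].n)
}
```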

func (*EventRouter) UpdateCurrentTables

func (e *EventRouter) UpdateCurrentTables()

UpdateCurrentTables talks to bpdb and updates the list of tables that have been created.

type FileDumper

type FileDumper struct {
	// TargetDir is the directory to write files to.
	TargetDir string
}

FileDumper writes event data to a file within a directory.

func (*FileDumper) Dumper

func (f *FileDumper) Dumper(event string, output []byte) error

Dumper writes event data as a JSON file in the given target directory.

type LengthEstimator

type LengthEstimator struct {
	Lengths []int
}

LengthEstimator computes the 99th percentile of a set of integers.

func (*LengthEstimator) Estimate

func (l *LengthEstimator) Estimate() int

Estimate returns the 99th percentile of the set of integers.

func (*LengthEstimator) Increment

func (l *LengthEstimator) Increment(size int)

Increment adds an integer to the set of integers.
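A 99th-percentile estimate is useful for sizing string columns because a handful of unusually long values does not inflate the result. The sketch below uses the nearest-rank method on a hypothetical type lengthSketch. How the package's Estimate interpolates or rounds is not stated here, so treat the method as an assumption.

```go
package main

import (
	"fmt"
	"sort"
)

// lengthSketch is a hypothetical stand-in for LengthEstimator.
type lengthSketch struct {
	lengths []int
}

// increment records one observed length.
func (l *lengthSketch) increment(size int) {
	l.lengths = append(l.lengths, size)
}

// estimate returns the 99th percentile using the nearest-rank method:
// the value at rank ceil(0.99 * n) in the sorted lengths.
func (l *lengthSketch) estimate() int {
	n := len(l.lengths)
	if n == 0 {
		return 0
	}
	sorted := append([]int(nil), l.lengths...)
	sort.Ints(sorted)
	rank := (99*n + 99) / 100 // integer form of ceil(99n/100)
	return sorted[rank-1]
}

func main() {
	l := &lengthSketch{}
	for i := 0; i < 99; i++ {
		l.increment(10)
	}
	l.increment(5000) // a single outlier
	fmt.Println(l.estimate())
}
```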

type MPEvent

type MPEvent struct {
	Event      string
	Properties map[string]interface{}
}

MPEvent is a Mixpanel event.

type NonTrackedEventProcessor

type NonTrackedEventProcessor struct {
	// Out outputs events to a directory.
	Out Outputter

	// Aggregator summarizes the properties for this event for the purposes of creating a SQL table.
	Aggregator *EventAggregator

	// In is the channel of event properties.
	In chan map[string]interface{}

	// F is a channel that receives the event name when we're done aggregating and want to compute the transformation.
	F chan string
}

NonTrackedEventProcessor takes in events that do not yet have a table and aggregates their properties.

func (*NonTrackedEventProcessor) Accept

func (e *NonTrackedEventProcessor) Accept(propertyBag map[string]interface{})

Accept an event's properties.

func (*NonTrackedEventProcessor) Flush

func (e *NonTrackedEventProcessor) Flush(eventName string)

Flush events received. Label the flush with a given name.

func (*NonTrackedEventProcessor) Listen

func (e *NonTrackedEventProcessor) Listen()

Listen for events.
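The In and F channels suggest a single goroutine that selects between incoming properties and flush requests. The following is a minimal sketch of that pattern under that assumption. The type listenerSketch, its results channel, and the helper runSketch are hypothetical and are not part of the package.

```go
package main

import "fmt"

// listenerSketch is a hypothetical processor driven by two channels.
type listenerSketch struct {
	in      chan map[string]interface{} // event properties
	flush   chan string                 // event name to label a flush with
	results chan string                 // summaries produced on flush
	rows    int
}

// listen loops until in is closed, counting events and emitting a summary
// each time a flush is requested.
func (l *listenerSketch) listen() {
	for {
		select {
		case _, ok := <-l.in:
			if !ok {
				close(l.results)
				return
			}
			l.rows++
		case name := <-l.flush:
			l.results <- fmt.Sprintf("%s: %d rows", name, l.rows)
			l.rows = 0
		}
	}
}

// runSketch sends n events, requests one flush, and returns the summary.
func runSketch(eventName string, n int) string {
	l := &listenerSketch{
		in:      make(chan map[string]interface{}),
		flush:   make(chan string),
		results: make(chan string),
	}
	go l.listen()
	for i := 0; i < n; i++ {
		l.in <- map[string]interface{}{"i": i}
	}
	l.flush <- eventName
	summary := <-l.results
	close(l.in) // stops the listener goroutine
	return summary
}

func main() {
	fmt.Println(runSketch("signup", 3))
}
```

Because the channels in this sketch are unbuffered, every event has been counted by the time the flush request is received.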

type Outputter

type Outputter interface {
	Output(string, []PropertySummary, int) error
}

Outputter outputs a given event's property summary and number of rows.

func NewOutputter

func NewOutputter(targetDir string) Outputter

NewOutputter creates an Outputter that writes event transformation configs to a directory.

type PropertySummary

type PropertySummary struct {
	// Name of the property.
	Name string

	// OccurrenceProbability is an estimate of how often the field appears when the event is sent.
	OccurrenceProbability float64

	// T is the Go type of the property.
	T reflect.Type

	// Len gives an approximate length of the values for this property if it is a string.
	Len int
}

PropertySummary gives information about a field contained in an event.

type TypeAggregator

type TypeAggregator struct {
	Total  int
	Counts map[string]*TypeCounter
}

TypeAggregator counts values of potentially many different types.

func NewTypeAggregator

func NewTypeAggregator() *TypeAggregator

NewTypeAggregator allocates a new TypeAggregator.

func (*TypeAggregator) Aggregate

func (t *TypeAggregator) Aggregate(val interface{})

Aggregate decoded JSON values. Converts json.Number to int or float.
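Converting json.Number values typically means trying an integer parse first and falling back to a float. The sketch below shows that approach with the standard library. The helpers normalize and decodeProps are hypothetical, and the concrete Go types chosen (int64 and float64) are an assumption about what "int or float" means here.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// normalize converts a json.Number to int64 when it parses as an integer,
// otherwise to float64. Other values are returned unchanged.
func normalize(val interface{}) interface{} {
	num, ok := val.(json.Number)
	if !ok {
		return val
	}
	if i, err := num.Int64(); err == nil {
		return i
	}
	if f, err := num.Float64(); err == nil {
		return f
	}
	return val
}

// decodeProps decodes a JSON object with UseNumber so that numbers arrive
// as json.Number, then normalizes every top-level value.
func decodeProps(raw string) (map[string]interface{}, error) {
	dec := json.NewDecoder(strings.NewReader(raw))
	dec.UseNumber()
	var props map[string]interface{}
	if err := dec.Decode(&props); err != nil {
		return nil, err
	}
	for k, v := range props {
		props[k] = normalize(v)
	}
	return props, nil
}

func main() {
	props, err := decodeProps(`{"n": 42, "ratio": 3.5, "name": "x"}`)
	if err != nil {
		fmt.Println("decode:", err)
		return
	}
	fmt.Printf("%T %T %T\n", props["n"], props["ratio"], props["name"])
}
```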

func (*TypeAggregator) Summarize

func (t *TypeAggregator) Summarize() PropertySummary

Summarize returns the summary for the type that occurred the most.
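Picking the most frequent type can be sketched with reflect and a map of counts. The type typeTally is a hypothetical simplification that tracks only counts, not lengths. Skipping nil values and breaking ties by type name are choices made for this sketch, not documented behavior of the package.

```go
package main

import (
	"fmt"
	"reflect"
)

// typeTally is a hypothetical, simplified counterpart to TypeAggregator.
type typeTally struct {
	total  int
	counts map[string]int
}

func newTypeTally() *typeTally {
	return &typeTally{counts: map[string]int{}}
}

// add records the dynamic type of val. Nil values are skipped because
// reflect.TypeOf(nil) returns a nil Type.
func (t *typeTally) add(val interface{}) {
	if val == nil {
		return
	}
	t.total++
	t.counts[reflect.TypeOf(val).String()]++
}

// mostCommon returns the type name seen most often and its count.
// Ties are broken by name so the result is deterministic.
func (t *typeTally) mostCommon() (string, int) {
	best, bestN := "", 0
	for name, n := range t.counts {
		if n > bestN || (n == bestN && name < best) {
			best, bestN = name, n
		}
	}
	return best, bestN
}

func main() {
	t := newTypeTally()
	for _, v := range []interface{}{"a", "b", 3, nil, "c", 1.5} {
		t.add(v)
	}
	name, n := t.mostCommon()
	fmt.Println(name, n, t.total)
}
```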

type TypeCounter

type TypeCounter struct {
	Type         reflect.Type
	Count        int
	LenEstimator LengthEstimator
}

TypeCounter counts occurrences of a specific type.

func (*TypeCounter) Aggregate

func (c *TypeCounter) Aggregate(val interface{})

Aggregate Go values of a single type. For strings, it stores the lengths of all strings in order to estimate the 99th percentile.

func (*TypeCounter) Summarize

func (c *TypeCounter) Summarize() PropertySummary

Summarize values that have been aggregated.
