Documentation
Overview
Package processor contains logic to process gzipped Mixpanel event data into SQL table schemas.
Index
- Variables
- func Percentile(nums []int, percentile int) float64
- func ScoopTransformer(eventName string, properties []PropertySummary, nRows int) ([]byte, error)
- type AugmentedColumnDefinition
- type AugmentedEventConfig
- type EventAggregator
- type EventProcessor
- type EventRouter
- type FileDumper
- type LengthEstimator
- type MPEvent
- type NonTrackedEventProcessor
- type Outputter
- type PropertySummary
- type TypeAggregator
- type TypeCounter
Constants
This section is empty.
Variables
var (
	// CriticalPercentage is the percentage of events that a property must appear in
	// to be considered part of the schema for an event.
	CriticalPercentage = 0.0
	// CriticalThreshold is the number of events with a specific event name that must
	// occur for the event to be summarized.
	CriticalThreshold = 2
)
Functions
func Percentile
func Percentile(nums []int, percentile int) float64
Percentile computes the given percentile of a set of integers using a method described at http://en.wikipedia.org/wiki/Percentile#Alternative_methods
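The Wikipedia section lists several variants. A minimal sketch of one of them follows, assuming the linear-interpolation variant with C = 1 (NumPy's default `linear` method), where the fractional rank is p/100 * (N-1) over the sorted values. Which variant the package actually uses, and returning 0 for empty input, are assumptions.

```go
package main

import (
	"fmt"
	"sort"
)

// Percentile returns the requested percentile (0 to 100) of nums, interpolating
// linearly between the two closest ranks (the C = 1 variant).
func Percentile(nums []int, percentile int) float64 {
	if len(nums) == 0 {
		return 0 // assumption: empty input yields 0
	}
	// Sort a copy so the caller's slice keeps its order.
	sorted := append([]int(nil), nums...)
	sort.Ints(sorted)

	// Fractional 0-based position of the percentile within the sorted values.
	rank := float64(percentile) / 100 * float64(len(sorted)-1)
	if rank <= 0 {
		return float64(sorted[0])
	}
	lo := int(rank)
	if lo >= len(sorted)-1 {
		return float64(sorted[len(sorted)-1])
	}
	frac := rank - float64(lo)
	return float64(sorted[lo]) + frac*float64(sorted[lo+1]-sorted[lo])
}

func main() {
	fmt.Println(Percentile([]int{4, 1, 3, 2}, 50)) // 2.5
}
```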
func ScoopTransformer
func ScoopTransformer(eventName string, properties []PropertySummary, nRows int) ([]byte, error)
ScoopTransformer returns JSON corresponding to the configuration for converting events of a given event name to a SQL table.
Types
type AugmentedColumnDefinition
type AugmentedColumnDefinition struct {
	// InboundName is the name of the property as sent to edge servers.
	InboundName string
	// OutboundName is the name of the property that will be stored in Redshift.
	OutboundName string
	// Transformer is the SQL type for the column corresponding to this property.
	Transformer string
	// ColumnCreationOptions are additional options/parameters for the SQL type, e.g. for varchar, "(255)"
	ColumnCreationOptions string
	// OccurrenceProbability is how often this property appears in events.
	OccurrenceProbability float64
}
AugmentedColumnDefinition adds some metadata to a property of an event.
type AugmentedEventConfig
type AugmentedEventConfig struct {
	// EventName is the name of the event.
	EventName string
	// Columns is the metadata required to create columns for each property of the event.
	Columns []AugmentedColumnDefinition
	// Occurred is the number of times the event occurred.
	Occurred int
}
AugmentedEventConfig gives the configuration for creating a table for a given event name.
type EventAggregator
type EventAggregator struct {
	// CriticalPercent is the threshold percentage of events containing a given property, below which the property is omitted from the event summary.
	CriticalPercent float64
	// TotalRows is a count of events seen.
	TotalRows int
	// Columns stores information about each property contained within the event.
	Columns map[string]*TypeAggregator
}
EventAggregator summarizes a set of events.
func NewEventAggregator
func NewEventAggregator(criticalPercentage float64) *EventAggregator
NewEventAggregator allocates a new EventAggregator.
func (*EventAggregator) Aggregate
func (e *EventAggregator) Aggregate(properties map[string]interface{})
Aggregate adds the properties of a single event, given as a decoded JSON object, to the summary.
func (*EventAggregator) ColumnShouldBePruned
func (e *EventAggregator) ColumnShouldBePruned(colAggregate *TypeAggregator) bool
ColumnShouldBePruned reports whether a property seen in the set of events should be ignored.
func (*EventAggregator) Summarize
func (e *EventAggregator) Summarize() (int, []PropertySummary)
Summarize returns the number of events seen and a summary of the properties seen in them. It prunes any property that did not occur in more than CriticalPercent of events.
type EventProcessor
EventProcessor processes events of a certain type and flushes metadata about the schema.
func NewNonTrackedEventProcessor
func NewNonTrackedEventProcessor(outputDir string) EventProcessor
NewNonTrackedEventProcessor allocates a new NonTrackedEventProcessor.
type EventRouter
type EventRouter struct {
	// CurrentTables lists the event names that currently have schemas in bpdb. It is updated periodically.
	CurrentTables []string
	// Processors aggregate data about different event types.
	Processors map[string]EventProcessor
	// ProcessorFactory creates a new Processor for a previously unseen event type.
	ProcessorFactory func(string) EventProcessor
	// FlushTimer fires periodically to trigger flushing data about events to the output directory.
	FlushTimer <-chan time.Time
	// GzipReader is for reading files, and is reused.
	GzipReader *gzip.Reader
	// OutputDir is the directory in which to place files.
	OutputDir string
	// contains filtered or unexported fields
}
EventRouter receives Mixpanel events, and for events that do not have a table yet, outputs files describing the table for that event.
func NewRouter
func NewRouter(
	outputDir string,
	flushInterval time.Duration,
	bpSchemaBackend bpdb.BpSchemaBackend,
) *EventRouter
NewRouter allocates a new EventRouter that outputs transformations to a given output directory.
func (*EventRouter) EventCreated
func (e *EventRouter) EventCreated(eventName string) bool
EventCreated returns true if the event has a table in bpdb.
func (*EventRouter) FlushRouters
func (e *EventRouter) FlushRouters()
FlushRouters flushes event schema descriptions to the output directory. It also deletes descriptions for events whose tables have since been created, which can happen under a race condition.
func (*EventRouter) ReadFile
func (e *EventRouter) ReadFile(filename string) error
ReadFile reads a file of Mixpanel events and routes them to event aggregators. If the flush interval has expired, it flushes all event aggregators after reading the file.
func (*EventRouter) Route
func (e *EventRouter) Route(eventName string, properties map[string]interface{})
Route sends an event to its event aggregator, but only if the event does not have a table yet.
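A sketch of the routing decision, assuming EventCreated is a linear scan of CurrentTables and that Route creates a processor through ProcessorFactory the first time it sees an event name. The processor interface, with an Accept method like NonTrackedEventProcessor's, is an assumption about EventProcessor.

```go
package main

import "fmt"

// processor is an assumed minimal shape for EventProcessor.
type processor interface {
	Accept(properties map[string]interface{})
}

// eventRouter is a hypothetical stand-in with only the fields Route needs.
type eventRouter struct {
	CurrentTables    []string
	Processors       map[string]processor
	ProcessorFactory func(string) processor
}

// EventCreated reports whether eventName already has a table.
func (e *eventRouter) EventCreated(eventName string) bool {
	for _, t := range e.CurrentTables {
		if t == eventName {
			return true
		}
	}
	return false
}

// Route hands the event to its processor, creating one on first sight.
func (e *eventRouter) Route(eventName string, properties map[string]interface{}) {
	if e.EventCreated(eventName) {
		return // a table already exists; nothing to learn
	}
	p, ok := e.Processors[eventName]
	if !ok {
		p = e.ProcessorFactory(eventName)
		e.Processors[eventName] = p
	}
	p.Accept(properties)
}

// counter is a toy processor that just counts accepted events.
type counter struct{ n int }

func (c *counter) Accept(map[string]interface{}) { c.n++ }

func main() {
	r := &eventRouter{
		CurrentTables:    []string{"login"},
		Processors:       map[string]processor{},
		ProcessorFactory: func(string) processor { return &counter{} },
	}
	r.Route("login", nil)
	r.Route("signup", nil)
	r.Route("signup", nil)
	fmt.Println(len(r.Processors), r.Processors["signup"].(*counter).n) // 1 2
}
```

The slice scan mirrors the CurrentTables field as documented; with many tables, a map[string]struct{} rebuilt in UpdateCurrentTables would make each lookup constant time.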
func (*EventRouter) UpdateCurrentTables
func (e *EventRouter) UpdateCurrentTables()
UpdateCurrentTables queries bpdb and refreshes the list of tables that have been created.
type FileDumper
type FileDumper struct {
	// TargetDir is the directory to write files to.
	TargetDir string
}
FileDumper writes event data to a file within a directory.
type LengthEstimator
type LengthEstimator struct {
	Lengths []int
}
LengthEstimator computes the 99th percentile of a set of integers.
func (*LengthEstimator) Estimate
func (l *LengthEstimator) Estimate() int
Estimate returns the 99th percentile of the set of integers.
func (*LengthEstimator) Increment
func (l *LengthEstimator) Increment(size int)
Increment adds an integer to the set of integers.
type NonTrackedEventProcessor
type NonTrackedEventProcessor struct {
	// Out outputs events to a directory.
	Out Outputter
	// Aggregator summarizes the properties for this event for the purposes of creating a SQL table.
	Aggregator *EventAggregator
	// In is the channel of event properties.
	In chan map[string]interface{}
	// F is a channel that receives the event name when we're done aggregating and want to compute the transformation.
	F chan string
}
NonTrackedEventProcessor takes in events for an event name that does not yet have a table and aggregates their properties so that a table configuration can be output.
func (*NonTrackedEventProcessor) Accept
func (e *NonTrackedEventProcessor) Accept(propertyBag map[string]interface{})
Accept an event's properties.
func (*NonTrackedEventProcessor) Flush
func (e *NonTrackedEventProcessor) Flush(eventName string)
Flush flushes the events received so far, labeling the output with the given event name.
func (*NonTrackedEventProcessor) Listen
func (e *NonTrackedEventProcessor) Listen()
Listen listens for events on In and for flush requests on F.
type Outputter
type Outputter interface {
	Output(string, []PropertySummary, int) error
}
Outputter outputs a given event's property summary and number of rows.
func NewOutputter
NewOutputter creates an Outputter that writes event transformation configs to a directory.
type PropertySummary
type PropertySummary struct {
	// Name of the property.
	Name string
	// OccurrenceProbability is an estimate of how often the field appears when the event is sent.
	OccurrenceProbability float64
	// T is the Go type of the property.
	T reflect.Type
	// Len gives an approximate length of the values for this property if it is a string.
	Len int
}
PropertySummary gives information about a field contained in an event.
type TypeAggregator
type TypeAggregator struct {
	Total  int
	Counts map[string]*TypeCounter
}
TypeAggregator counts values of potentially many different types.
func NewTypeAggregator
func NewTypeAggregator() *TypeAggregator
NewTypeAggregator allocates a new TypeAggregator.
func (*TypeAggregator) Aggregate
func (t *TypeAggregator) Aggregate(val interface{})
Aggregate records a decoded JSON value, converting json.Number values to int or float.
func (*TypeAggregator) Summarize
func (t *TypeAggregator) Summarize() PropertySummary
Summarize returns the summary for the type that occurred the most.
type TypeCounter
type TypeCounter struct {
	Type         reflect.Type
	Count        int
	LenEstimator LengthEstimator
}
TypeCounter counts occurrences of a specific type.
func (*TypeCounter) Aggregate
func (c *TypeCounter) Aggregate(val interface{})
Aggregate counts Go values of a single type. For strings, it also stores the length of every value so that the 99th percentile length can be estimated.
func (*TypeCounter) Summarize
func (c *TypeCounter) Summarize() PropertySummary
Summarize values that have been aggregated.