aggregation

package
v1.1.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 16, 2025 License: MIT Imports: 2 Imported by: 0

Documentation

Overview

Package aggregation provides functions that group events the way an SQL GROUP BY clause groups rows.

Two main functions are provided:

  • GetLatestBy to get the latest event for each group. A group is defined by one or more columns.
  • GroupBy to group events by one or more columns and perform an aggregation for each group, like count(), sum() or max().

GroupBy accepts either a single aggregation or several aggregations combined with aggregation.Set. See the examples for how to use the aggregators.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetLatestBy

func GetLatestBy(events []*data.Event, columns ...string) map[string]*data.Event

GetLatestBy groups events by the given columns and keeps only the most recent event of each group. It does not perform any aggregation. The returned map is keyed by the group's column value(s).

Example
ctx := context.Background()
client := getBigTableClient(ctx)
jsonMapping, err := mapping.LoadMapping([]byte(jMapping))
if err != nil {
	log.Fatalln(err)
}
mapper := mapping.NewMapper(jsonMapping)
tbl := client.Open(table)

repo := repository.NewRepository(tbl, mapper)
eventSet, err := repo.Read(ctx, "contact-3")
if err != nil {
	log.Fatalln(err)
}
latest := GetLatestBy(eventSet.Events["front"], "device_type")

// we prefer to access each event individually as it's impossible to predict the order in which the events will be returned
fmt.Println(latest["Computer"].Date.UTC())
fmt.Println(latest["Computer"].Cells["device_type"])
fmt.Println(latest["Smartphone"].Date.UTC())
fmt.Println(latest["Smartphone"].Cells["device_type"])
Output:

1970-01-01 01:39:00 +0000 UTC
Computer
1970-01-01 01:38:00 +0000 UTC
Smartphone

func GroupBy

func GroupBy(events []*data.Event, agg func(line *data.Event, lines []*data.Event) *data.Event, columns ...string) map[string]*data.Event

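GroupBy groups events by the given columns and applies the given aggregation function to each group. The returned map is keyed by the group's column values concatenated in the order the columns are given. For example, grouping by "device_type" and "event_type" yields the key "Computerpurchase" for device type "Computer" and event type "purchase".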

Example
ctx := context.Background()
client := getBigTableClient(ctx)
jsonMapping, err := mapping.LoadMapping([]byte(jMapping))
if err != nil {
	log.Fatalln(err)
}
mapper := mapping.NewMapper(jsonMapping)
tbl := client.Open(table)

repo := repository.NewRepository(tbl, mapper)
eventSet, err := repo.Read(ctx, "contact-3")
if err != nil {
	log.Fatalln(err)
}
cnt := NewCount("count")
total := NewSum("amount", "total_amount")

set := NewAggregationSet()
set.Add(cnt.Compute)
set.Add(total.Compute)
grouped := GroupBy(eventSet.Events["front"], set.Compute, "device_type", "event_type")

fmt.Println(grouped["Computerpurchase"].Cells["device_type"])
fmt.Println(grouped["Computerpurchase"].Cells["event_type"])
fmt.Println(grouped["Computerpurchase"].Cells["count"])
fmt.Println(grouped["Computerpurchase"].Cells["total_amount"])
Output:

Computer
purchase
5
150

Types

type Average

type Average struct {
	// contains filtered or unexported fields
}

Average is an aggregation that computes the average value of the given column within each group.

func NewAverage

func NewAverage(column string, projection string) *Average

func (*Average) Compute

func (m *Average) Compute(e *data.Event, events []*data.Event) *data.Event

type Count

type Count struct {
	// contains filtered or unexported fields
}

Count is an aggregation that counts the number of events in each group.

func NewCount

func NewCount(column string) *Count

func (*Count) Compute

func (c *Count) Compute(e *data.Event, events []*data.Event) *data.Event

type Max

type Max struct {
	// contains filtered or unexported fields
}

Max is an aggregation that computes the maximum value of the given column within each group.

func NewMax

func NewMax(column string, projection string) *Max

func (*Max) Compute

func (m *Max) Compute(e *data.Event, events []*data.Event) *data.Event

type Min

type Min struct {
	// contains filtered or unexported fields
}

Min is an aggregation that computes the minimum value of the given column within each group.

func NewMin

func NewMin(column string, projection string) *Min

func (*Min) Compute

func (m *Min) Compute(e *data.Event, events []*data.Event) *data.Event

type Set

type Set struct {
	// contains filtered or unexported fields
}

Set combines several aggregations so that all of them can be applied to the same group in a single GroupBy call.

func NewAggregationSet

func NewAggregationSet() *Set

func (*Set) Add

func (s *Set) Add(agg func(e *data.Event, events []*data.Event) *data.Event)

func (*Set) Compute

func (s *Set) Compute(e *data.Event, events []*data.Event) *data.Event

type Sum

type Sum struct {
	// contains filtered or unexported fields
}

Sum is an aggregation that computes the sum of the given column within each group.

func NewSum

func NewSum(column string, projection string) *Sum

func (*Sum) Compute

func (m *Sum) Compute(e *data.Event, events []*data.Event) *data.Event

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL