aggregate

Version: v0.2.0
Warning

This package is not in the latest version of its module.

Published: May 14, 2019 | License: BSD-3-Clause | Imports: 9 | Imported by: 3

README

Aggregate

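This activity aggregates a stream of values over a window and applies a function such as avg, sum, min, max, count, or accumulate. Windows can be tumbling or sliding, and either count-based or time-based.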

Installation

Flogo Web

This activity comes out of the box with the Flogo Web UI.

Flogo CLI
flogo install github.com/project-flogo/stream/activity/aggregate

Schema

{
  "settings": [
    {
      "name": "function",
      "type": "string",
      "required": true,
      "allowed" : ["avg", "sum", "min", "max", "count", "accumulate"]
    },
    {
      "name": "windowType",
      "type": "string",
      "required": true,
      "allowed" : ["tumbling", "sliding", "timeTumbling", "timeSliding"]
    },
    {
      "name": "windowSize",
      "type": "integer",
      "required": true
    },
    {
      "name": "resolution",
      "type": "integer"
    },
    {
      "name": "proceedOnlyOnEmit",
      "type": "boolean"
    },
    {
      "name": "additionalSettings",
      "type": "string"
    }
  ],
  "input":[
    {
      "name": "value",
      "type": "any"
    }
  ],
  "output": [
    {
      "name": "result",
      "type": "any"
    },
    {
      "name": "report",
      "type": "boolean"
    }
  ]
}
Details
Settings:
| Setting | Required | Description |
|---|---|---|
| function | true | The aggregate function (e.g. avg, sum, min, max, count, accumulate) |
| windowType | true | The type of window (e.g. tumbling, sliding, timeTumbling, timeSliding) |
| windowSize | true | The window size of the values to aggregate |
| resolution | false | The window resolution |
| proceedOnlyOnEmit | false | Proceed to the next activity only on emit of a result |
| additionalSettings | false | Additional settings for particular functions |

Note: if using this activity in a flow, set proceedOnlyOnEmit to false.

Input:

| Name | Description |
|---|---|
| value | The input value |

Output:

| Name | Description |
|---|---|
| report | Indicates if the value should be reported |
| result | The result of the aggregation |

Example

The example below sums all the values in a tumbling time window of 5 seconds:

{
  "ref": "github.com/project-flogo/stream/activity/aggregate",
  "settings": {
    "function": "sum",
    "windowType": "timeTumbling",
    "windowSize": 5000
  },
  "input": {
    "value": "=$.input"
  }
}
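
To make the effect of this configuration concrete, the following is a minimal sketch of what summing over 5000 ms tumbling windows means. It is not the activity's implementation: the `event` type and the `tumblingSums` function are hypothetical, and timestamps are passed in explicitly instead of being driven by a timer so that the result is deterministic.

```go
package main

import "fmt"

// event is a hypothetical timestamped input value.
// atMs is a non-negative offset in milliseconds.
type event struct {
	atMs  int64
	value float64
}

// tumblingSums groups events into consecutive, non-overlapping windows of
// windowMs milliseconds (starting at time 0) and returns one sum per window.
// Windows that receive no events are reported with a sum of 0.
func tumblingSums(events []event, windowMs int64) []float64 {
	if len(events) == 0 || windowMs <= 0 {
		return nil
	}
	// Find the index of the last window that contains an event.
	var last int64
	for _, e := range events {
		if idx := e.atMs / windowMs; idx > last {
			last = idx
		}
	}
	sums := make([]float64, last+1)
	for _, e := range events {
		sums[e.atMs/windowMs] += e.value
	}
	return sums
}

func main() {
	events := []event{{500, 1}, {2500, 2}, {4900, 3}, {5100, 10}, {9000, 20}}
	fmt.Println(tumblingSums(events, 5000)) // prints [6 30]
}
```

The first three events fall into the window covering 0 to 4999 ms and the last two into the window covering 5000 to 9999 ms, so each value is counted exactly once.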

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func New

func NewSlidingTimeWindow

func NewSlidingTimeWindow(function string, settings *window.Settings) (window.TimeWindow, error)

NewSlidingTimeWindow creates a new sliding time window. All time windows are managed externally and are progressed using the NextBlock() method.
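
The idea of a window that is progressed externally can be sketched as follows. This is an illustration only, not this package's code: the `slidingSum` type, its `Add` method, and the return type of `NextBlock` here are hypothetical, and the sketch assumes the window is divided into windowSize / resolution blocks.

```go
package main

import "fmt"

// slidingSum is a hypothetical sliding time window that keeps one partial
// sum per block. It owns no timer: the caller decides when a block elapses.
type slidingSum struct {
	blocks []float64 // partial sums, one per block
	cur    int       // index of the block currently receiving values
}

// newSlidingSum assumes the number of blocks is windowSize / resolution.
func newSlidingSum(windowSize, resolution int) (*slidingSum, error) {
	if resolution <= 0 || windowSize < resolution {
		return nil, fmt.Errorf("invalid window size %d with resolution %d", windowSize, resolution)
	}
	return &slidingSum{blocks: make([]float64, windowSize/resolution)}, nil
}

// Add records a value in the current block.
func (w *slidingSum) Add(v float64) { w.blocks[w.cur] += v }

// NextBlock closes the current block, returns the sum over the whole window,
// and then clears the oldest block so it can receive the next interval.
func (w *slidingSum) NextBlock() float64 {
	total := 0.0
	for _, b := range w.blocks {
		total += b
	}
	w.cur = (w.cur + 1) % len(w.blocks)
	w.blocks[w.cur] = 0
	return total
}

func main() {
	w, err := newSlidingSum(3000, 1000) // three blocks of one second each
	if err != nil {
		panic(err)
	}
	for _, v := range []float64{1, 2, 3, 4} {
		w.Add(v)
		fmt.Println(w.NextBlock()) // prints 1, 3, 6, 9 on successive lines
	}
}
```

In a real deployment the caller would typically invoke `NextBlock` from a ticker that fires once per resolution interval; the loop above stands in for that ticker.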

func NewSlidingWindow

func NewSlidingWindow(function string, settings *window.Settings) (window.Window, error)
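
As an illustration of a count-based sliding window, here is a minimal sketch that averages the most recent values using a ring buffer. The `slidingAvg` type is hypothetical, and the choice to withhold a result until the window is full is an assumption, not documented behaviour of this package.

```go
package main

import "fmt"

// slidingAvg is a hypothetical count-based sliding window over the last
// len(values) inputs, stored in a ring buffer.
type slidingAvg struct {
	values []float64
	next   int // position that the next value overwrites
	filled int // number of values stored so far, capped at len(values)
}

// newSlidingAvg creates a window of the given size, which must be positive.
func newSlidingAvg(size int) *slidingAvg {
	return &slidingAvg{values: make([]float64, size)}
}

// Add stores v, overwriting the oldest value once the buffer is full, and
// returns the average of the window. ready stays false until the window has
// received as many values as its size (an assumption about emit behaviour).
func (w *slidingAvg) Add(v float64) (avg float64, ready bool) {
	w.values[w.next] = v
	w.next = (w.next + 1) % len(w.values)
	if w.filled < len(w.values) {
		w.filled++
	}
	if w.filled < len(w.values) {
		return 0, false
	}
	sum := 0.0
	for _, x := range w.values {
		sum += x
	}
	return sum / float64(len(w.values)), true
}

func main() {
	w := newSlidingAvg(3)
	for _, v := range []float64{3, 6, 9, 12} {
		avg, ready := w.Add(v)
		fmt.Println(avg, ready)
	}
}
```

Once the window is full, every new value produces a result, because each input shifts the window forward by one position.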

func NewTumblingTimeWindow

func NewTumblingTimeWindow(function string, settings *window.Settings) (window.TimeWindow, error)

NewTumblingTimeWindow creates a new tumbling time window. All time windows are managed externally and are progressed using the NextBlock() method.

func NewTumblingWindow

func NewTumblingWindow(function string, settings *window.Settings) (window.Window, error)
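
For contrast with a sliding window, a count-based tumbling window can be sketched like this. The `tumblingSum` type is hypothetical and does not reflect this package's internals; the return values are named after the activity's `result` and `report` outputs, under the assumption that `report` is true only when a window completes.

```go
package main

import "fmt"

// tumblingSum is a hypothetical count-based tumbling window.
type tumblingSum struct {
	size  int     // number of values per window, must be positive
	count int     // values collected in the current window
	sum   float64 // running sum of the current window
}

// Add accumulates v. When size values have been collected it returns their
// sum with report set to true and resets, so windows never overlap.
func (w *tumblingSum) Add(v float64) (result float64, report bool) {
	w.sum += v
	w.count++
	if w.count < w.size {
		return 0, false
	}
	result = w.sum
	w.sum, w.count = 0, 0
	return result, true
}

func main() {
	w := &tumblingSum{size: 2}
	for _, v := range []float64{1, 2, 3, 4, 5} {
		result, report := w.Add(v)
		fmt.Println(v, result, report)
	}
}
```

With a size of 2, results are reported after the second and fourth values (3 and 7), and the fifth value waits for a partner that has not yet arrived.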

Types

type Activity

type Activity struct {
	// contains filtered or unexported fields
}

Activity is an activity that aggregates input values over a window.

func (*Activity) Eval

func (a *Activity) Eval(ctx activity.Context) (done bool, err error)

Eval implements api.Activity.Eval - Aggregates the Message

func (*Activity) Metadata

func (a *Activity) Metadata() *activity.Metadata

Metadata returns the activity's metadata

func (*Activity) PostEval

func (a *Activity) PostEval(ctx activity.Context, userData interface{}) (done bool, err error)

type Input

type Input struct {
	Value interface{} `md:"value"`
}

type Output

type Output struct {
	Report bool        `md:"report"`
	Result interface{} `md:"result"`
}

type Settings

type Settings struct {
	Function           string `md:"function,required,allowed(avg,sum,min,max,count,accumulate)"`
	WindowType         string `md:"windowType,required,allowed(tumbling,sliding,timeTumbling,timeSliding)"`
	WindowSize         int    `md:"windowSize,required"`
	Resolution         int    `md:"resolution"`
	ProceedOnlyOnEmit  bool   `md:"proceedOnlyOnEmit"`
	AdditionalSettings string `md:"additionalSettings"`
}

JSON metadata can be generated from this struct, and it could also be used to create a "validate-able" object.
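
One way to picture such a validatable object is sketched below. It uses only the standard library and is not how the package or Flogo validates settings: the `settings` type, its `json` tags, `parseSettings`, and `validate` are all hypothetical, and the rule that windowSize must be positive is an assumption. The allowed values are taken from the struct tags above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// settings mirrors the documented fields. The json tags are illustrative.
type settings struct {
	Function           string `json:"function"`
	WindowType         string `json:"windowType"`
	WindowSize         int    `json:"windowSize"`
	Resolution         int    `json:"resolution"`
	ProceedOnlyOnEmit  bool   `json:"proceedOnlyOnEmit"`
	AdditionalSettings string `json:"additionalSettings"`
}

var allowedFunctions = map[string]bool{
	"avg": true, "sum": true, "min": true, "max": true, "count": true, "accumulate": true,
}

var allowedWindowTypes = map[string]bool{
	"tumbling": true, "sliding": true, "timeTumbling": true, "timeSliding": true,
}

// validate checks the required fields against the allowed values.
func (s settings) validate() error {
	if !allowedFunctions[s.Function] {
		return fmt.Errorf("unsupported function %q", s.Function)
	}
	if !allowedWindowTypes[s.WindowType] {
		return fmt.Errorf("unsupported windowType %q", s.WindowType)
	}
	if s.WindowSize <= 0 { // assumption: a window must hold at least one value
		return fmt.Errorf("windowSize must be positive, got %d", s.WindowSize)
	}
	return nil
}

// parseSettings decodes JSON settings and validates them.
func parseSettings(data []byte) (settings, error) {
	var s settings
	if err := json.Unmarshal(data, &s); err != nil {
		return s, err
	}
	return s, s.validate()
}

func main() {
	s, err := parseSettings([]byte(`{"function":"sum","windowType":"timeTumbling","windowSize":5000}`))
	fmt.Println(s.Function, s.WindowSize, err)

	_, err = parseSettings([]byte(`{"function":"median","windowType":"sliding","windowSize":3}`))
	fmt.Println(err)
}
```

Keeping the allowed values in lookup maps makes the check a single expression per field and keeps the list easy to compare against the struct tags.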
