Aggregate


This activity allows you to aggregate data and calculate an average or sliding average.


Flogo Web

This activity comes out of the box with the Flogo Web UI.

Flogo CLI

flogo install


Inputs and Outputs:

{
  "input": [
    {
      "name": "function",
      "type": "string",
      "required": true,
      "allowed": ["block_avg", "moving_avg", "timeblockavg"]
    },
    {
      "name": "windowSize",
      "type": "integer",
      "required": true
    },
    {
      "name": "value",
      "type": "number"
    }
  ],
  "output": [
    {
      "name": "result",
      "type": "number"
    },
    {
      "name": "report",
      "type": "boolean"
    }
  ]
}


| Setting    | Required | Description |
|:-----------|:---------|:------------|
| function   | True     | The aggregate function; currently only average is supported |
| windowSize | True     | The window size of the values to aggregate |
| value      | False    | The value to aggregate |
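To make the inputs and outputs concrete, here is a minimal Go sketch of a block (tumbling) average. It is not the activity's implementation: the `blockAvg` type and its `Add` method are hypothetical, and it assumes `report` is true only when a full block of `windowSize` values has been averaged, which the settings above do not state.

```go
package main

import "fmt"

// blockAvg is a hypothetical tumbling-window averager used for illustration;
// it is not the activity's actual code.
type blockAvg struct {
	windowSize int     // number of values per block (assumed > 0)
	sum        float64 // sum of the values in the current block
	count      int     // number of values collected in the current block
}

// Add records one value. It returns report=true together with the block
// average once windowSize values have been collected, then resets the block.
// Until then it returns (0, false).
func (b *blockAvg) Add(value float64) (result float64, report bool) {
	b.sum += value
	b.count++
	if b.count < b.windowSize {
		return 0, false
	}
	result = b.sum / float64(b.count)
	b.sum, b.count = 0, 0
	return result, true
}

func main() {
	w := &blockAvg{windowSize: 3}
	for _, v := range []float64{1, 2, 3, 10, 20, 30} {
		result, report := w.Add(v)
		fmt.Println(v, result, report)
	}
}
```

With a window size of 3, only the third and sixth calls report a result (2 and 20 respectively); the other calls return `report == false`.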


The example below aggregates a 'temperature' attribute with a moving window of size 5:

{
  "id": "aggregate_4",
  "name": "Aggregate",
  "description": "Simple Aggregator Activity",
  "activity": {
    "ref": "",
    "input": {
      "function": "average",
      "windowSize": "5"
    },
    "mappings": {
      "input": [
        {
          "type": "assign",
          "value": "temperature",
          "mapTo": "value"
        }
      ]
    }
  }
}
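What a moving window of size 5 does to a stream of temperature readings can be sketched in Go as follows. This is an illustration only: the `movingAvg` type, `newMovingAvg`, and the sample readings are assumptions made for the example and are not taken from the activity's source.

```go
package main

import "fmt"

// movingAvg is a hypothetical sliding-window averager backed by a ring buffer.
type movingAvg struct {
	values []float64 // ring buffer holding the most recent values
	next   int       // index that the next value will overwrite
	count  int       // number of slots filled so far
	sum    float64   // running sum of the values currently in the buffer
}

// newMovingAvg creates a window; windowSize is assumed to be greater than zero.
func newMovingAvg(windowSize int) *movingAvg {
	return &movingAvg{values: make([]float64, windowSize)}
}

// Add inserts a value, evicting the oldest one once the window is full, and
// returns the average of the values currently in the window.
func (m *movingAvg) Add(value float64) float64 {
	if m.count == len(m.values) {
		m.sum -= m.values[m.next] // window full: drop the oldest value
	} else {
		m.count++
	}
	m.values[m.next] = value
	m.sum += value
	m.next = (m.next + 1) % len(m.values)
	return m.sum / float64(m.count)
}

func main() {
	w := newMovingAvg(5)
	for _, temperature := range []float64{20, 22, 24, 26, 28, 30} {
		fmt.Println(temperature, w.Add(temperature))
	}
}
```

Once the sixth reading arrives, the first reading (20) leaves the window, so the last average covers 22 through 30 only.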




func New

func New(config *activity.Config) (activity.Activity, error)

func NewActivity

func NewActivity(md *activity.Metadata) activity.Activity

NewActivity creates a new AppActivity

func NewSlidingTimeWindow

func NewSlidingTimeWindow(function string, settings *window.Settings) (window.TimeWindow, error)

NewSlidingTimeWindow creates a new sliding time window. All time windows are managed externally and are advanced using the NextBlock() method.
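As a rough illustration of a sliding time window that is advanced block by block, the Go sketch below keeps per-block sums and lets the caller decide when a block ends. The `slidingTimeWindow` type, its `AddSample` method, and the choice of averaging are assumptions made for this example; the package's actual `window.TimeWindow` interface is not shown in this document.

```go
package main

import "fmt"

// slidingTimeWindow is a hypothetical sketch: the window spans a fixed number
// of blocks, and the caller decides when a block ends by calling NextBlock.
type slidingTimeWindow struct {
	sums   []float64 // per-block sum of samples
	counts []int     // per-block sample count
	cur    int       // index of the block currently being filled
}

// newSlidingTimeWindow creates a window of the given number of blocks
// (assumed to be greater than zero).
func newSlidingTimeWindow(blocks int) *slidingTimeWindow {
	return &slidingTimeWindow{
		sums:   make([]float64, blocks),
		counts: make([]int, blocks),
	}
}

// AddSample records a value in the current block.
func (w *slidingTimeWindow) AddSample(v float64) {
	w.sums[w.cur] += v
	w.counts[w.cur]++
}

// NextBlock returns the average over every block in the window, then advances
// to the next block and discards the samples of the oldest one. ok is false
// when the window held no samples.
func (w *slidingTimeWindow) NextBlock() (avg float64, ok bool) {
	var sum float64
	var n int
	for i := range w.sums {
		sum += w.sums[i]
		n += w.counts[i]
	}
	w.cur = (w.cur + 1) % len(w.sums)
	w.sums[w.cur], w.counts[w.cur] = 0, 0
	if n == 0 {
		return 0, false
	}
	return sum / float64(n), true
}

func main() {
	w := newSlidingTimeWindow(2)
	for _, v := range []float64{10, 20, 30} {
		w.AddSample(v)
		avg, ok := w.NextBlock()
		fmt.Println(avg, ok)
	}
}
```

With two blocks, each call to `NextBlock` averages the samples of the current and previous block, so the sample 10 no longer contributes to the third result.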

func NewSlidingWindow

func NewSlidingWindow(function string, settings *window.Settings) (window.Window, error)

func NewTumblingTimeWindow

func NewTumblingTimeWindow(function string, settings *window.Settings) (window.TimeWindow, error)

NewTumblingTimeWindow creates a new tumbling time window. All time windows are managed externally and are advanced using the NextBlock() method.

func NewTumblingWindow

func NewTumblingWindow(function string, settings *window.Settings) (window.Window, error)


type AggregateActivity

type AggregateActivity struct {
	// contains filtered or unexported fields
}

AggregateActivity is an Activity that aggregates input values over a window

func (*AggregateActivity) Eval

func (a *AggregateActivity) Eval(ctx activity.Context) (done bool, err error)

Eval implements api.Activity.Eval - Aggregates the Message

func (*AggregateActivity) Metadata

func (a *AggregateActivity) Metadata() *activity.Metadata

Metadata returns the activity's metadata

func (*AggregateActivity) PostEval

func (a *AggregateActivity) PostEval(ctx activity.Context, userData interface{}) (done bool, err error)

type Settings

type Settings struct {
	Function           string `md:"function,required,allowed(avg,sum,min,max,count)"`
	WindowType         string `md:"windowType,required,allowed(tumbling,sliding,timeTumbling,timeSliding)"`
	WindowSize         int    `md:"windowSize,required"`
	ProceedOnlyOnEmit  bool
	Resolution         int
	AdditionalSettings map[string]string
}

We can generate JSON from this struct, and we could also create a "validate-able" object from it.
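One way JSON could be generated from the `md` struct tags is by reflection, as in the Go sketch below. It is an assumption-laden illustration: the `fieldMeta` output shape, `parseTag`, and `describe` are hypothetical helpers, the tag grammar is inferred only from the three tagged fields above, and `allowed(...)` is assumed to be the last option in a tag.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
	"strings"
)

// Settings mirrors part of the struct above, including one untagged field.
type Settings struct {
	Function          string `md:"function,required,allowed(avg,sum,min,max,count)"`
	WindowType        string `md:"windowType,required,allowed(tumbling,sliding,timeTumbling,timeSliding)"`
	WindowSize        int    `md:"windowSize,required"`
	ProceedOnlyOnEmit bool
}

// fieldMeta is a hypothetical JSON shape describing one setting.
type fieldMeta struct {
	Name     string   `json:"name"`
	Type     string   `json:"type"`
	Required bool     `json:"required,omitempty"`
	Allowed  []string `json:"allowed,omitempty"`
}

// parseTag splits an md tag into its name, required flag, and allowed values.
func parseTag(tag string) (name string, required bool, allowed []string) {
	// Cut off the allowed(...) clause first, because it contains commas.
	if i := strings.Index(tag, "allowed("); i >= 0 {
		rest := tag[i+len("allowed("):]
		if j := strings.Index(rest, ")"); j >= 0 {
			allowed = strings.Split(rest[:j], ",")
		}
		tag = strings.TrimRight(tag[:i], ",")
	}
	parts := strings.Split(tag, ",")
	name = parts[0]
	for _, p := range parts[1:] {
		if p == "required" {
			required = true
		}
	}
	return name, required, allowed
}

// describe reflects over a struct value and returns metadata for every field
// that carries an md tag; untagged fields are skipped.
func describe(v interface{}) []fieldMeta {
	t := reflect.TypeOf(v)
	var out []fieldMeta
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		tag, ok := f.Tag.Lookup("md")
		if !ok {
			continue
		}
		name, required, allowed := parseTag(tag)
		out = append(out, fieldMeta{
			Name:     name,
			Type:     f.Type.Kind().String(),
			Required: required,
			Allowed:  allowed,
		})
	}
	return out
}

func main() {
	b, err := json.MarshalIndent(describe(Settings{}), "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
}
```

The same `fieldMeta` values could back a validation step, for example rejecting a `function` setting that is not in its `Allowed` list.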


