View Source
var (
	MaxRetries    = 2
	MinExpiryTime = 100 * time.Millisecond
)


type AggregationElements added in v0.4.0

type AggregationElements struct {
	NonStatsElements                   []string
	StatsElements                      []string
	AggregatedSourceStatsElements      []string
	AggregatedDestinationStatsElements []string
	AntreaFlowEndSecondsElements       []string
	ThroughputElements                 []string
	SourceThroughputElements           []string
	DestinationThroughputElements      []string
}

type AggregationFlowRecord added in v0.4.0

type AggregationFlowRecord struct {
	Record entities.Record
	// Flow record contains mapping to its reference in priority queue.
	PriorityQueueItem *ItemToExpire
	// ReadyToSend is an indicator that we received all required records for the
	// given flow, i.e., records from source and destination nodes for the case
	// inter-node flow and record from the node for the case of intra-node flow.
	ReadyToSend bool
	// contains filtered or unexported fields
}

type AggregationInput added in v0.4.0

type AggregationInput struct {
	MessageChan           chan *entities.Message
	WorkerNum             int
	CorrelateFields       []string
	AggregateElements     *AggregationElements
	ActiveExpiryTimeout   time.Duration
	InactiveExpiryTimeout time.Duration
}

type AggregationProcess added in v0.3.1

type AggregationProcess struct {
	// contains filtered or unexported fields
}

func InitAggregationProcess

func InitAggregationProcess(input AggregationInput) (*AggregationProcess, error)

InitAggregationProcess takes an AggregationInput, which carries the message channel (e.g. from a collector) used as the input channel, WorkerNum (the number of workers that process messages), and CorrelateFields (the fields to be correlated and filled).

func (*AggregationProcess) AggregateMsgByFlowKey added in v0.3.1

func (a *AggregationProcess) AggregateMsgByFlowKey(message *entities.Message) error

AggregateMsgByFlowKey gets the flow key from the records in the message and stores them in the cache.

func (*AggregationProcess) AreCorrelatedFieldsFilled added in v0.5.3

func (a *AggregationProcess) AreCorrelatedFieldsFilled(record AggregationFlowRecord) bool

func (*AggregationProcess) AreExternalFieldsFilled added in v0.5.3

func (a *AggregationProcess) AreExternalFieldsFilled(record AggregationFlowRecord) bool

func (*AggregationProcess) ForAllExpiredFlowRecordsDo added in v0.5.1

func (a *AggregationProcess) ForAllExpiredFlowRecordsDo(callback FlowKeyRecordMapCallBack) error

func (*AggregationProcess) ForAllRecordsDo added in v0.3.1

func (a *AggregationProcess) ForAllRecordsDo(callback FlowKeyRecordMapCallBack) error

ForAllRecordsDo takes a callback function that is used to process the flow key -> record pairs in the map.

func (*AggregationProcess) GetExpiryFromExpirePriorityQueue added in v0.5.1

func (a *AggregationProcess) GetExpiryFromExpirePriorityQueue() time.Duration

GetExpiryFromExpirePriorityQueue returns the earliest timestamp (active expiry or inactive expiry) from the expire priority queue.

func (*AggregationProcess) GetNumFlows added in v0.5.9

func (a *AggregationProcess) GetNumFlows() int64

GetNumFlows returns the total number of connections/flows stored in the map.

func (*AggregationProcess) GetRecords added in v0.5.9

func (a *AggregationProcess) GetRecords(flowKey *FlowKey) []map[string]interface{}

GetRecords returns the flow records, in map format, for a given flow key. The key of each map is the element name and the value is the IE object. It returns partially matched flow records if the flow key is not complete, and all flow records if no flow key is provided.

func (*AggregationProcess) IsAggregatedRecordIPv4 added in v0.5.3

func (a *AggregationProcess) IsAggregatedRecordIPv4(record AggregationFlowRecord) bool

func (*AggregationProcess) ResetStatAndThroughputElementsInRecord added in v0.5.12

func (a *AggregationProcess) ResetStatAndThroughputElementsInRecord(record entities.Record) error

ResetStatAndThroughputElementsInRecord is called by the user after the aggregation record has been sent following its expiry, whether by the active or the inactive expiry interval. The user should call it only after acquiring the mutex in the aggregation process.

func (*AggregationProcess) SetCorrelatedFieldsFilled added in v0.5.3

func (a *AggregationProcess) SetCorrelatedFieldsFilled(record *AggregationFlowRecord, isFilled bool)

func (*AggregationProcess) SetExternalFieldsFilled added in v0.5.3

func (a *AggregationProcess) SetExternalFieldsFilled(record *AggregationFlowRecord, isFilled bool)

func (*AggregationProcess) Start added in v0.3.1

func (a *AggregationProcess) Start()

func (*AggregationProcess) Stop added in v0.3.1

func (a *AggregationProcess) Stop()

type FlowKey added in v0.3.1

type FlowKey struct {
	SourceAddress      string
	DestinationAddress string
	Protocol           uint8
	SourcePort         uint16
	DestinationPort    uint16
}

type FlowKeyRecordMapCallBack added in v0.3.1

type FlowKeyRecordMapCallBack func(key FlowKey, record *AggregationFlowRecord) error

type ItemToExpire added in v0.5.1

type ItemToExpire struct {
	// contains filtered or unexported fields
}

type TimeToExpirePriorityQueue added in v0.5.1

type TimeToExpirePriorityQueue []*ItemToExpire

func (TimeToExpirePriorityQueue) Len added in v0.5.1

func (pq TimeToExpirePriorityQueue) Len() int

func (TimeToExpirePriorityQueue) Less added in v0.5.1

func (pq TimeToExpirePriorityQueue) Less(i, j int) bool

func (TimeToExpirePriorityQueue) Peek added in v0.5.1

Peek returns the item at the beginning of the queue, without removing the item or otherwise mutating the queue. It is safe to call directly.

func (*TimeToExpirePriorityQueue) Pop added in v0.5.1

func (pq *TimeToExpirePriorityQueue) Pop() interface{}

func (*TimeToExpirePriorityQueue) Push added in v0.5.1

func (pq *TimeToExpirePriorityQueue) Push(x interface{})

func (TimeToExpirePriorityQueue) Swap added in v0.5.1

func (pq TimeToExpirePriorityQueue) Swap(i, j int)

func (*TimeToExpirePriorityQueue) Update added in v0.5.1

func (pq *TimeToExpirePriorityQueue) Update(item *ItemToExpire, flowKey *FlowKey, flowRecord *AggregationFlowRecord, activeExpireTime time.Time, inactiveExpireTime time.Time)

Update modifies the priority and flow record of an item in the queue.

