engine

package
v1.4.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 31, 2021 License: MIT Imports: 16 Imported by: 2

Documentation

Index

Constants

View Source
const MessageRootMonitorFinished = "MessageRootMonitorFinished"

MessageRootMonitorFinished is send from a root monitor when it has finished

View Source
const RuleKindSeparator = "."

RuleKindSeparator is the separator for rule kinds

View Source
const RuleKindWildcard = "*"

RuleKindWildcard is a wildcard for rule kinds

Variables

View Source
var EventTracer = &eventTrace{lock: &sync.Mutex{}, out: os.Stdout}

EventTracer is a debugging interface to the engine

Functions

func SortRuleSlice

func SortRuleSlice(a []*Rule)

SortRuleSlice sorts a slice of rules.

func UnitTestResetIDs

func UnitTestResetIDs()

UnitTestResetIDs reset all counting IDs. THIS FUNCTION SHOULD ONLY BE CALLED BY UNIT TESTS!

Types

type ChildMonitor

type ChildMonitor struct {
	// contains filtered or unexported fields
}

ChildMonitor is a monitor which is a descendant of a root monitor.

func (ChildMonitor) Activate

func (mb ChildMonitor) Activate(e *Event)

Activate activates this monitor.

func (ChildMonitor) Errors

func (mb ChildMonitor) Errors() *TaskError

Errors returns the error object of this monitor.

func (ChildMonitor) EventPath

func (mb ChildMonitor) EventPath() []*Event

EventPath returns the chain of events which created this monitor.

func (ChildMonitor) EventPathString

func (mb ChildMonitor) EventPathString() string

EventPathString returns the event path as a string.

func (ChildMonitor) Finish

func (mb ChildMonitor) Finish()

Finish finishes this monitor.

func (ChildMonitor) ID

func (mb ChildMonitor) ID() uint64

ID returns the monitor ID.

func (ChildMonitor) IsActivated

func (mb ChildMonitor) IsActivated() bool

IsActivated returns if this monitor has been activated.

func (ChildMonitor) IsFinished

func (mb ChildMonitor) IsFinished() bool

IsFinished returns if this monitor has finished.

func (ChildMonitor) NewChildMonitor

func (mb ChildMonitor) NewChildMonitor(priority int) Monitor

NewChildMonitor creates a new child monitor of this monitor.

func (ChildMonitor) Priority

func (mb ChildMonitor) Priority() int

Priority returns the priority of this monitor.

func (ChildMonitor) RootMonitor

func (mb ChildMonitor) RootMonitor() *RootMonitor

RootMonitor returns the root monitor of this monitor.

func (ChildMonitor) Scope

func (mb ChildMonitor) Scope() *RuleScope

Scope returns the rule scope of this monitor.

func (ChildMonitor) SetErrors

func (mb ChildMonitor) SetErrors(e *TaskError)

SetErrors adds an error object to this monitor.

func (ChildMonitor) Skip

func (mb ChildMonitor) Skip(e *Event)

Skip finishes this monitor without activation.

func (ChildMonitor) String

func (mb ChildMonitor) String() string

String returns a string representation of this monitor.

type Event

type Event struct {
	// contains filtered or unexported fields
}

Event data structure

func NewEvent

func NewEvent(name string, kind []string, state map[interface{}]interface{}) *Event

NewEvent returns a new event object.

func (*Event) Kind

func (e *Event) Kind() []string

Kind returns the event kind.

func (*Event) Name

func (e *Event) Name() string

Name returns the event name.

func (*Event) State

func (e *Event) State() map[interface{}]interface{}

State returns the event state.

func (*Event) String

func (e *Event) String() string

type Monitor

type Monitor interface {

	/*
	   ID returns the monitor ID.
	*/
	ID() uint64

	/*
	   NewChildMonitor creates a new child monitor of this monitor.
	*/
	NewChildMonitor(priority int) Monitor

	/*
	   Scope returns the rule scope of this monitor.
	*/
	Scope() *RuleScope

	/*
	   Priority returns the monitors priority.
	*/
	Priority() int

	/*
		Activated returns if this monitor has been activated.
	*/
	IsActivated() bool

	/*
		Activate activates this monitor with a given event.
	*/
	Activate(e *Event)

	/*
	   Skip finishes this monitor without activation.
	*/
	Skip(e *Event)

	/*
		Finish finishes this monitor.
	*/
	Finish()

	/*
		Returns the root monitor of this monitor.
	*/
	RootMonitor() *RootMonitor

	/*
		Errors returns the error object of this monitor.
	*/
	Errors() *TaskError

	/*
		SetErrors adds an error object to this monitor.
	*/
	SetErrors(e *TaskError)

	/*
		EventPath returns the chain of events which created this monitor.
	*/
	EventPath() []*Event

	/*
	   EventPathString returns the event path as a string.
	*/
	EventPathString() string

	/*
		String returns a string representation of this monitor.
	*/
	String() string
}

Monitor monitors events as they are cascading. Event cascades will produce tree structures.

type Processor

type Processor interface {

	/*
	   ID returns the processor ID.
	*/
	ID() uint64

	/*
	   ThreadPool returns the thread pool which this processor is using.
	*/
	ThreadPool() *pool.ThreadPool

	/*
	   Workers returns the number of threads of this processor.
	*/
	Workers() int

	/*
	   Reset removes all stored rules from this processor.
	*/
	Reset() error

	/*
	   AddRule adds a new rule to the processor.
	*/
	AddRule(rule *Rule) error

	/*
	   Rules returns all loaded rules.
	*/
	Rules() map[string]*Rule

	/*
	   Start starts this processor.
	*/
	Start()

	/*
	   Finish will finish all remaining tasks and then stop the processor.
	*/
	Finish()

	/*
	   Stopped returns if the processor is stopped.
	*/
	Stopped() bool

	/*
	   Status returns the status of the processor (Running / Stopping / Stopped).
	*/
	Status() string

	/*
	   NewRootMonitor creates a new root monitor for this processor. This monitor is used to add initial
	   root events.
	*/
	NewRootMonitor(context map[string]interface{}, scope *RuleScope) *RootMonitor

	/*
		SetRootMonitorErrorObserver specifies an observer which is triggered
		when a root monitor of this processor has finished and returns errors.
		By default this is set to nil (no observer).
	*/
	SetRootMonitorErrorObserver(func(rm *RootMonitor))

	/*
		SetFailOnFirstErrorInTriggerSequence sets the behavior when rules return errors.
		If set to false (default) then all rules in a trigger sequence for a specific event
		are executed. If set to true then the first rule which returns an error will stop
		the trigger sequence. Events which have been added by the failing rule are still processed.
	*/
	SetFailOnFirstErrorInTriggerSequence(bool)

	/*
	   AddEventAndWait adds a new event to the processor and waits for the resulting event cascade
	   to finish. If a monitor is passed then it must be a RootMonitor.
	*/
	AddEventAndWait(event *Event, monitor *RootMonitor) (Monitor, error)

	/*
	   AddEvent adds a new event to the processor. Returns the monitor if the event
	   triggered a rule and nil if the event was skipped.
	*/
	AddEvent(event *Event, parentMonitor Monitor) (Monitor, error)

	/*
	   IsTriggering checks if a given event triggers a loaded rule. This does not the
	   actual state matching for speed.
	*/
	IsTriggering(event *Event) bool

	/*
		ProcessEvent processes an event by determining which rules trigger and match
		the given event. This function must receive a unique thread ID from the
		executing thread.
	*/
	ProcessEvent(tid uint64, event *Event, parent Monitor) map[string]error

	/*
	   String returns a string representation the processor.
	*/
	String() string
}

Processor is the main object of the event engine. It coordinates the thread pool and rule index. Rules can only be added if the processor is stopped. Events can only be added if the processor is not stopped.

func NewProcessor

func NewProcessor(workerCount int) Processor

NewProcessor creates a new event processor with a given number of workers.

type RootMonitor

type RootMonitor struct {
	// contains filtered or unexported fields
}

RootMonitor is a monitor which is at a beginning of an event cascade.

func (RootMonitor) Activate

func (mb RootMonitor) Activate(e *Event)

Activate activates this monitor.

func (*RootMonitor) AllErrors

func (rm *RootMonitor) AllErrors() []*TaskError

AllErrors returns all error which have been collected in this root monitor.

func (RootMonitor) Errors

func (mb RootMonitor) Errors() *TaskError

Errors returns the error object of this monitor.

func (RootMonitor) EventPath

func (mb RootMonitor) EventPath() []*Event

EventPath returns the chain of events which created this monitor.

func (RootMonitor) EventPathString

func (mb RootMonitor) EventPathString() string

EventPathString returns the event path as a string.

func (RootMonitor) Finish

func (mb RootMonitor) Finish()

Finish finishes this monitor.

func (*RootMonitor) HighestPriority

func (rm *RootMonitor) HighestPriority() int

HighestPriority returns the highest priority which is handled by this monitor.

func (RootMonitor) ID

func (mb RootMonitor) ID() uint64

ID returns the monitor ID.

func (RootMonitor) IsActivated

func (mb RootMonitor) IsActivated() bool

IsActivated returns if this monitor has been activated.

func (RootMonitor) IsFinished

func (mb RootMonitor) IsFinished() bool

IsFinished returns if this monitor has finished.

func (RootMonitor) NewChildMonitor

func (mb RootMonitor) NewChildMonitor(priority int) Monitor

NewChildMonitor creates a new child monitor of this monitor.

func (RootMonitor) Priority

func (mb RootMonitor) Priority() int

Priority returns the priority of this monitor.

func (RootMonitor) RootMonitor

func (mb RootMonitor) RootMonitor() *RootMonitor

RootMonitor returns the root monitor of this monitor.

func (RootMonitor) Scope

func (mb RootMonitor) Scope() *RuleScope

Scope returns the rule scope of this monitor.

func (RootMonitor) SetErrors

func (mb RootMonitor) SetErrors(e *TaskError)

SetErrors adds an error object to this monitor.

func (*RootMonitor) SetFinishHandler

func (rm *RootMonitor) SetFinishHandler(fh func(Processor))

SetFinishHandler adds a handler function to this monitor which is called once this monitor has finished.

func (RootMonitor) Skip

func (mb RootMonitor) Skip(e *Event)

Skip finishes this monitor without activation.

func (RootMonitor) String

func (mb RootMonitor) String() string

String returns a string representation of this monitor.

type Rule

type Rule struct {
	Name            string                 // Name of the rule
	Desc            string                 // Description of the rule (optional)
	KindMatch       []string               // Match on event kinds
	ScopeMatch      []string               // Match on event cascade scope
	StateMatch      map[string]interface{} // Match on event state
	Priority        int                    // Priority of the rule
	SuppressionList []string               // List of suppressed rules by this rule
	Action          RuleAction             // Action of the rule
}

Rule models a matching rule for event receivers (actions). A rule has 3 possible matching criteria:

- Match on event kinds: A list of strings in dot notation which describes event kinds. May contain '*' characters as wildcards (e.g. core.tests.*).

- Match on event cascade scope: A list of strings in dot notation which describe the required scopes of an event cascade.

- Match on event state: A simple list of required key / value states in the event state. Nil values can be used as wildcards (i.e. match is only on key).

Rules have priorities (0 being the highest) and may suppress each other.

func (*Rule) CopyAs

func (r *Rule) CopyAs(newName string) *Rule

CopyAs returns a shallow copy of this rule with a new name.

func (*Rule) String

func (r *Rule) String() string

type RuleAction

type RuleAction func(p Processor, m Monitor, e *Event, tid uint64) error

RuleAction is an action which is executed by a matching rule. The action gets a unique thread ID from the executing thread.

type RuleIndex

type RuleIndex interface {

	/*
	   AddRule adds a new rule to the index.
	*/
	AddRule(rule *Rule) error

	IsTriggering(event *Event) bool

	/*
		Match returns all rules in this index which match a given event. This
		method does a full matching check including state matching.
	*/
	Match(event *Event) []*Rule

	/*
		String returns a string representation of this rule index and all subindexes.
	*/
	String() string

	/*
		Rules returns all rules with the given prefix in the name. Use the empty
		string to return all rules.
	*/
	Rules() map[string]*Rule
}

RuleIndex is an index for rules. It takes the form of a tree structure in which incoming events are matched level by level (e.g. event of kind core.task1.step1 is first matched by kind "core" then "task1" and then "step1". At the leaf of the index tree it may then be matched on a state condition).

func NewRuleIndex

func NewRuleIndex() RuleIndex

NewRuleIndex creates a new rule container for efficient event matching.

type RuleIndexAll

type RuleIndexAll struct {
	// contains filtered or unexported fields
}

RuleIndexAll data structure.

func (*RuleIndexAll) Type

func (ri *RuleIndexAll) Type() string

Type returns the type of the rule sub index.

type RuleIndexKind

type RuleIndexKind struct {
	// contains filtered or unexported fields
}

RuleIndexKind data structure.

func (*RuleIndexKind) AddRule

func (ri *RuleIndexKind) AddRule(rule *Rule) error

AddRule adds a new rule to the index.

func (*RuleIndexKind) IsTriggering

func (ri *RuleIndexKind) IsTriggering(event *Event) bool

IsTriggering checks if a given event triggers a rule in this index.

func (*RuleIndexKind) Match

func (ri *RuleIndexKind) Match(event *Event) []*Rule

Match returns all rules in this index which match a given event. This method does a full matching check including state matching.

func (*RuleIndexKind) String

func (ri *RuleIndexKind) String() string

String returns a string representation of this rule index and all subindexes.

func (*RuleIndexKind) Type

func (ri *RuleIndexKind) Type() string

Type returns the type of the rule sub index.

type RuleIndexState

type RuleIndexState struct {
	// contains filtered or unexported fields
}

RuleIndexState data structure

func (*RuleIndexState) Type

func (ri *RuleIndexState) Type() string

Type returns the type of the rule sub index.

type RuleMatcherKey

type RuleMatcherKey struct {
	// contains filtered or unexported fields
}

RuleMatcherKey is used for pure key - value state matches.

func (*RuleMatcherKey) String

func (rm *RuleMatcherKey) String() string

String returns a string representation of this key matcher.

type RuleScope

type RuleScope struct {
	// contains filtered or unexported fields
}

RuleScope is a set of scope definitions for rules. Each definition allows or disallows a set of rule types. Scope definitions and rule sets are usually expressed with named paths (scope paths) in dot notation (e.g. core.data.read).

func NewRuleScope

func NewRuleScope(allows map[string]bool) *RuleScope

NewRuleScope creates a new rule scope object with an initial set of definitions.

func (*RuleScope) Add

func (rs *RuleScope) Add(scopePath string, allow bool)

Add adds a given definition to the rule scope.

func (*RuleScope) AddAll

func (rs *RuleScope) AddAll(allows map[string]bool)

AddAll adds all given definitions to the rule scope.

func (*RuleScope) IsAllowed

func (rs *RuleScope) IsAllowed(scopePath string) bool

IsAllowed checks if a given scope path is allowed within this rule scope.

func (*RuleScope) IsAllowedAll

func (rs *RuleScope) IsAllowedAll(scopePaths []string) bool

IsAllowedAll checks if all given scopes are allowed.

type RuleSlice

type RuleSlice []*Rule

RuleSlice is a slice of rules

func (RuleSlice) Len

func (s RuleSlice) Len() int

func (RuleSlice) Less

func (s RuleSlice) Less(i, j int) bool

func (RuleSlice) Swap

func (s RuleSlice) Swap(i, j int)

type Task

type Task struct {
	// contains filtered or unexported fields
}

Task models a task which is created and executed by the processor.

func (*Task) HandleError

func (t *Task) HandleError(e error)

HandleError handles an error which occurred during the run method.

func (*Task) Run

func (t *Task) Run(tid uint64) error

Run the task.

func (*Task) String

func (t *Task) String() string

Returns a string representation of this task.

type TaskError

type TaskError struct {
	ErrorMap map[string]error // Rule errors (rule name -> error)
	Event    *Event           // Event which caused the error
	Monitor  Monitor          // Event monitor
}

TaskError datastructure to collect all rule errors of an event.

func (*TaskError) Error

func (te *TaskError) Error() string

Error returns a string representation of this error.

type TaskQueue

type TaskQueue struct {
	// contains filtered or unexported fields
}

TaskQueue models the queue of tasks for a processor.

func NewTaskQueue

func NewTaskQueue(ep *pubsub.EventPump) *TaskQueue

NewTaskQueue creates a new TaskQueue object.

func (*TaskQueue) Clear

func (tq *TaskQueue) Clear()

Clear the queue of all pending tasks.

func (*TaskQueue) Pop

func (tq *TaskQueue) Pop() pool.Task

Pop returns the next task from the queue.

func (*TaskQueue) Push

func (tq *TaskQueue) Push(t pool.Task)

Push adds another task to the queue.

func (*TaskQueue) Size

func (tq *TaskQueue) Size() int

Size returns the size of the queue.

Directories

Path Synopsis
Package pool contains a thread pool implementation.
Package pool contains a thread pool implementation.
Package pubsub contains a pub/sub event handling implementation.
Package pubsub contains a pub/sub event handling implementation.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL