canopsis

package
v0.0.0-...-d841f61 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 11, 2021 License: AGPL-3.0 Imports: 15 Imported by: 0

Documentation

Constants

const (
	ExitOK = iota
	ExitPanic
	ExitEngine
)

Returned values on exit
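
As a rough illustration of how these exit codes might be consumed, the sketch below mirrors the three constants locally so it compiles on its own. The helper describeExit and its labels are hypothetical: they are guessed from the constant names and are not part of this package.

```go
package main

import "fmt"

// Local mirror of the package's exit codes, so the sketch is self-contained.
const (
	ExitOK = iota
	ExitPanic
	ExitEngine
)

// describeExit is a hypothetical helper (not part of the package) that maps
// an exit code to a label. The labels are assumptions based on the names.
func describeExit(code int) string {
	switch code {
	case ExitOK:
		return "ok"
	case ExitPanic:
		return "panic"
	case ExitEngine:
		return "engine error"
	default:
		return "unknown"
	}
}

func main() {
	for _, code := range []int{ExitOK, ExitPanic, ExitEngine} {
		fmt.Printf("%d: %s\n", code, describeExit(code))
	}
}
```

In a real program the value would typically come from Engine.Stop() or StartEngine and be passed to os.Exit.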

const (
	EnvCpsAMQPTestExchange          = "canopsis.tests"
	ActionEngineName                = "engine-action"
	ActionQueueName                 = "Engine_action"
	ActionAxeRPCClientQueueName     = "Engine_action_axe_rpc_client"
	ActionWebhookRPCClientQueueName = "Engine_action_webhook_rpc_client"
	ActionConsumerName              = "action"
	ActionRPCConsumerName           = "action_rpc"
	AxeExchangeName                 = "engine-axe"
	AxeQueueName                    = "Engine_axe"
	AxeServiceRPCClientQueueName    = "Engine_axe_service_rpc_client"
	AxePbehaviorRPCClientQueueName  = "Engine_axe_pbehavior_rpc_client"
	AxeRPCQueueServerName           = "Engine_axe_rpc_server"
	AxeConsumerName                 = "axe"
	AxeRPCConsumerName              = "axe_rpc"
	CheExchangeName                 = "canopsis.events"
	CheEngineName                   = "engine-che"
	CheQueueName                    = "Engine_che"
	CheConsumerName                 = "che"
	DbName                          = mongo.DB
	DefaultBulkSize                 = 1000
	DefaultEventAuthor              = "system"
	DoneAutosolveDelay              = 15 * 60
	DynamicInfosQueueName           = "Engine_dynamic_infos"
	DynamicInfosConsumerName        = "dynamic-infos"
	HeartBeatExchangeName           = "canopsis.events"
	HeartBeatQueueName              = "Engine_heartbeat"
	HeartBeatConsumerName           = "heartbeat"
	MaxPythonTimestamp              = 253402297199 // compat: max timestamp in python
	StatsExchangeName               = "canopsis.events"
	StatsQueueName                  = "Engine_stat"
	StatsConsumerName               = "stat"
	StatsDatabase                   = "canopsis"
	StatsMeasurement                = "event_state_history"
	StatsngExchangeName             = "amq.direct"
	StatsngQueueName                = "Engine_statsng"
	PBehaviorEngineName             = "engine-pbehavior"
	PBehaviorQueueName              = "Engine_pbehavior"
	PBehaviorRPCQueueServerName     = "Engine_pbehavior_rpc_server"
	PBehaviorConsumerName           = "pbehavior"
	PBehaviorRPCConsumerName        = "pbehavior_rpc"
	PluginExtension                 = ".so"
	ServiceEngineName               = "engine-service"
	ServiceQueueName                = "Engine_service"
	ServiceRPCQueueServerName       = "Engine_service_rpc_server"
	ServiceConsumerName             = "service"
	ServiceRPCConsumerName          = "service_rpc"
	WebhookRPCQueueServerName       = "Engine_webhook_rpc_server"
	WebhookRPCConsumerName          = "webhook_rpc"
	FIFOExchangeName                = ""
	FIFOQueueName                   = "Engine_fifo"
	FIFOAckExchangeName             = ""
	FIFOAckQueueName                = "FIFO_ack"
	FIFOConsumerName                = "fifo"
	CorrelationQueueName            = "Engine_correlation"
	CorrelationConsumerName         = "correlation"
	PeriodicalWaitTime              = time.Minute
	JsonContentType                 = "application/json"
	CanopsisEventsExchange          = "canopsis.events"
)
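
Two of the constants above are untyped numbers whose units are not stated. The sketch below shows one way they might be interpreted, assuming DoneAutosolveDelay is a number of seconds and MaxPythonTimestamp is a Unix timestamp in seconds. Both helpers (autosolveDelay, clampTimestamp) are hypothetical and only illustrate the idea.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of two constants from the block above.
const (
	DoneAutosolveDelay = 15 * 60
	MaxPythonTimestamp = 253402297199
)

// autosolveDelay converts the delay to a time.Duration.
// Assumption: the constant is expressed in seconds.
func autosolveDelay() time.Duration {
	return time.Duration(DoneAutosolveDelay) * time.Second
}

// clampTimestamp is a hypothetical guard that keeps a Unix timestamp
// within the range the constant describes as Python-compatible.
func clampTimestamp(ts int64) int64 {
	if ts > MaxPythonTimestamp {
		return MaxPythonTimestamp
	}
	return ts
}

func main() {
	fmt.Println(autosolveDelay())
	fmt.Println(time.Unix(MaxPythonTimestamp, 0).UTC().Year())
	fmt.Println(clampTimestamp(MaxPythonTimestamp + 1))
}
```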

Globals

Variables

var BuildDate string

BuildDate ...

var BuildGitBranch string

BuildGitBranch ...

var BuildGitCommit string

BuildGitCommit is the short version of git commit

Functions

func PrintVersion

func PrintVersion()

PrintVersion outputs version information.

func PrintVersionExit

func PrintVersionExit()

PrintVersionExit calls PrintVersion, then exits with status 0.

func StartEngine

func StartEngine(ctx context.Context, engine Engine, waitChan *chan os.Signal) (int, error)

StartEngine handles starting the WorkerProcess and PeriodicalProcess of your Engine. It first calls Engine.Initialize() and returns the error if there is one, then proceeds to bind signals.

Engine.PeriodicalProcess and Engine.WorkerProcess are launched into separate goroutines.

Only SIGTERM or SIGINT will trigger the Engine.Stop() method.

waitChan is optional: if nil, a chan will be instantiated and managed for you. Passing your own chan is mainly useful for tests.
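
The flow described above can be sketched in miniature as follows. This is not the package's implementation: miniEngine, demoEngine, start and exitInitFailed are hypothetical names, the channel is passed by value rather than as *chan os.Signal, the worker and periodical goroutines are left out, and the exit code returned when Initialize fails is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// exitInitFailed is an assumed exit code for a failed Initialize.
const exitInitFailed = 2

// miniEngine is a hypothetical two-method subset of the Engine interface.
type miniEngine interface {
	Initialize() error
	Stop() int
}

type demoEngine struct {
	initErr error
	stopped bool
}

func (d *demoEngine) Initialize() error { return d.initErr }
func (d *demoEngine) Stop() int         { d.stopped = true; return 0 }

// start initializes the engine, binds SIGINT/SIGTERM to waitChan,
// blocks until a signal arrives, then stops the engine.
func start(e miniEngine, waitChan chan os.Signal) (int, error) {
	if err := e.Initialize(); err != nil {
		return exitInitFailed, err
	}
	if waitChan == nil {
		// No channel supplied: create and manage one ourselves.
		waitChan = make(chan os.Signal, 1)
	}
	signal.Notify(waitChan, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(waitChan)

	<-waitChan
	return e.Stop(), nil
}

// runDemo injects a signal through a caller-owned channel, as a test would,
// then shows that a failing Initialize returns before any waiting happens.
func runDemo() (code int, stopErr error, initErr error) {
	ch := make(chan os.Signal, 1)
	ch <- os.Interrupt
	code, stopErr = start(&demoEngine{}, ch)
	_, initErr = start(&demoEngine{initErr: errors.New("cannot connect")}, nil)
	return code, stopErr, initErr
}

func main() {
	fmt.Println(runDemo())
}
```

Supplying the channel from the caller is what makes the stop path testable without sending a real signal to the process.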

Types

type DefaultEngine

type DefaultEngine struct {
	ConfirmChan   chan bool
	Sub           libamqp.Channel
	Sleep         time.Duration
	RunWorker     bool
	RunPeriodical bool
	Debug         bool

	RunInfoManager engine.RunInfoManager
	// contains filtered or unexported fields
}

DefaultEngine provides basic functions and will behave as an Engine with PeriodicalProcess and WorkerProcess.

Stop() will automatically close the Sub channel and wait for confirmations to arrive on ConfirmChan.

func NewDefaultEngine

func NewDefaultEngine(
	sleep time.Duration,
	runworker, runperiodical bool,
	sub libamqp.Channel,
	logger zerolog.Logger,
	runInfoManager ...engine.RunInfoManager,
) DefaultEngine

NewDefaultEngine returns a default engine implementation. Check engine_test.go in this package to see how you can use it.

func (*DefaultEngine) AckMessage

func (de *DefaultEngine) AckMessage(msg amqp.Delivery)

func (*DefaultEngine) AcknowledgeConfirmStop

func (de *DefaultEngine) AcknowledgeConfirmStop()

AcknowledgeConfirmStop sends true to ConfirmChan once for each call. Do NOT override this method when using DefaultEngine in your struct.

func (*DefaultEngine) AskStop

func (de *DefaultEngine) AskStop(state int)

AskStop sends SIGINT into the channel where StartEngine waits for sigint/term, leading to a proper engine stop.
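
A minimal sketch of this mechanism, assuming the channel was previously registered with SetWaitStopChan. The stopper type is hypothetical, and storing state as the future exit status is an assumption about what the parameter is for.

```go
package main

import (
	"fmt"
	"os"
)

// stopper is a hypothetical stand-in for the relevant part of DefaultEngine.
type stopper struct {
	sigchan    chan os.Signal
	exitStatus int
}

// SetWaitStopChan remembers the channel on which the start routine waits.
func (s *stopper) SetWaitStopChan(c chan os.Signal) { s.sigchan = c }

// AskStop records the requested state and injects an interrupt signal.
// The send is non-blocking so a second call cannot deadlock the caller.
func (s *stopper) AskStop(state int) {
	s.exitStatus = state
	select {
	case s.sigchan <- os.Interrupt:
	default: // a stop request is already pending, or no channel is set
	}
}

// runDemo reports whether the waiting side sees SIGINT and the state is kept.
func runDemo() bool {
	s := &stopper{}
	ch := make(chan os.Signal, 1)
	s.SetWaitStopChan(ch)
	s.AskStop(2)
	s.AskStop(2) // dropped by the default branch, does not block
	sig := <-ch
	return sig == os.Interrupt && s.exitStatus == 2
}

func main() {
	fmt.Println(runDemo())
}
```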

func (*DefaultEngine) ConfirmPeriodicalStop

func (de *DefaultEngine) ConfirmPeriodicalStop()

ConfirmPeriodicalStop sends one value to ConfirmChan.

func (*DefaultEngine) ConfirmWorkerStop

func (de *DefaultEngine) ConfirmWorkerStop()

ConfirmWorkerStop sends one value to ConfirmChan.

func (*DefaultEngine) Continue

func (de *DefaultEngine) Continue() bool

Continue returns the DefaultEngine.cont flag.

func (*DefaultEngine) GetRunInfo

func (de *DefaultEngine) GetRunInfo() engine.RunInfo

func (*DefaultEngine) Initialize

func (de *DefaultEngine) Initialize(ctx context.Context) error

Initialize does nothing here besides returning nil.

func (*DefaultEngine) Logger

func (de *DefaultEngine) Logger() *zerolog.Logger

func (*DefaultEngine) NackMessage

func (de *DefaultEngine) NackMessage(msg amqp.Delivery)

func (*DefaultEngine) PeriodicalEnd

func (de *DefaultEngine) PeriodicalEnd()

func (*DefaultEngine) PeriodicalWaitTime

func (de *DefaultEngine) PeriodicalWaitTime() time.Duration

PeriodicalWaitTime returns the Sleep attribute.

func (*DefaultEngine) ProcessWorkerError

func (de *DefaultEngine) ProcessWorkerError(err error, msg amqp.Delivery)

ProcessWorkerError nacks the message and stops the engine if external services are not reachable. On any other error it acks the message and the engine keeps running.
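
The decision rule can be sketched as below. How the real method detects an unreachable service is not documented here, so the sentinel error errUnreachable, the decision type and the decide function are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// errUnreachable is a hypothetical sentinel marking connectivity failures.
var errUnreachable = errors.New("external service unreachable")

// decision describes what to do with the message and with the engine.
type decision struct {
	Ack        bool // true: ack the message, false: nack it
	StopEngine bool
}

// decide nacks and stops on connectivity errors, and acks and continues
// on any other error.
func decide(err error) decision {
	if errors.Is(err, errUnreachable) {
		return decision{Ack: false, StopEngine: true}
	}
	return decision{Ack: true, StopEngine: false}
}

// runDemo classifies one wrapped connectivity error and one ordinary error.
func runDemo() (decision, decision) {
	wrapped := fmt.Errorf("save alarm: %w", errUnreachable)
	return decide(wrapped), decide(errors.New("malformed event"))
}

func main() {
	fmt.Println(runDemo())
}
```

Nacking on connectivity errors keeps the message available for a later retry, whereas acking on other errors avoids redelivering a message that would fail again.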

func (DefaultEngine) RunPeriodicalProcess

func (de DefaultEngine) RunPeriodicalProcess() bool

RunPeriodicalProcess returns DefaultEngine.RunPeriodical

func (DefaultEngine) RunWorkerProcess

func (de DefaultEngine) RunWorkerProcess() bool

RunWorkerProcess returns DefaultEngine.RunWorker

func (*DefaultEngine) SetWaitStopChan

func (de *DefaultEngine) SetWaitStopChan(sigchan chan os.Signal)

SetWaitStopChan stores the SIGINT/SIGTERM chan so that it can be used later.

func (*DefaultEngine) Started

func (de *DefaultEngine) Started(ctx context.Context, runInfo engine.RunInfo)

Started is called before StartEngine waits for stop signal.

func (*DefaultEngine) Stop

func (de *DefaultEngine) Stop() int

Stop sets the cont flag to false, closes the Sub channel, then waits for two values from ConfirmChan. It returns the engine's exit status, which should be passed to os.Exit.
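
The two-confirmation handshake can be sketched as follows, with one goroutine standing in for the worker and one for the periodical routine. The miniEngine type is hypothetical, the AMQP Sub channel is left out, and atomic.Bool (Go 1.19 or later) is a choice made for this sketch, not necessarily what the package uses.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// miniEngine is a hypothetical reduction of DefaultEngine.
type miniEngine struct {
	cont        atomic.Bool
	ConfirmChan chan bool
	exitStatus  int
}

func newMiniEngine() *miniEngine {
	e := &miniEngine{ConfirmChan: make(chan bool)}
	e.cont.Store(true)
	return e
}

// Continue reports whether routines should keep running.
func (e *miniEngine) Continue() bool { return e.cont.Load() }

// AcknowledgeConfirmStop sends exactly one value per call.
func (e *miniEngine) AcknowledgeConfirmStop() { e.ConfirmChan <- true }

// Stop clears the flag, then waits for both routines to confirm.
// The real Stop also closes the Sub channel before waiting.
func (e *miniEngine) Stop() int {
	e.cont.Store(false)
	<-e.ConfirmChan // first routine
	<-e.ConfirmChan // second routine
	return e.exitStatus
}

// loop stands in for either the worker or the periodical routine.
func (e *miniEngine) loop() {
	defer e.AcknowledgeConfirmStop()
	for e.Continue() {
		time.Sleep(time.Millisecond)
	}
}

// runDemo starts two routines and returns once both have confirmed.
func runDemo() int {
	e := newMiniEngine()
	go e.loop()
	go e.loop()
	return e.Stop()
}

func main() {
	fmt.Println(runDemo())
}
```

Because Stop blocks on two receives, a routine that never confirms would make Stop hang, which is why each routine confirms in a deferred call.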

func (*DefaultEngine) WorkerEnd

func (de *DefaultEngine) WorkerEnd()

type Engine

type Engine interface {
	// PeriodicalWaitTime returns the duration to wait between
	// two runs of PeriodicalProcess()
	PeriodicalWaitTime() time.Duration

	// Continue returns the state of the engine.
	// true to continue running
	// false to stop as soon as possible, meaning when functions finish
	// their job.
	Continue() bool

	// ConsumerChan must return a new channel from amqp.Channel.Consume()
	// The worker process will stop looping when the channel is closed,
	// so you must close all channels yourself.
	ConsumerChan() (<-chan amqp.Delivery, error)

	// Initialize anything you want. If err != nil then the Engine will not start.
	Initialize(ctx context.Context) error
	Started(ctx context.Context, runInfo engine.RunInfo)

	// WorkerProcess must implement the actual consumer processing.
	WorkerProcess(context.Context, amqp.Delivery)

	// PeriodicalProcess must implement the actual "beat processing".
	// Prefer using channels that you will close for long
	// PeriodicalWaitTime() values.
	PeriodicalProcess(ctx context.Context)

	SetWaitStopChan(chan os.Signal)
	AskStop(int)
	// Stop handles stopping the engine, and also waiting
	// for goroutines to finish.
	// It returns the engine's exit status, that should be used in os.Exit.
	Stop() int

	// ConfirmPeriodicalStop is useful to mutate a variable
	// or an internal chan to confirm that the Periodical process
	// is now stopped.
	ConfirmPeriodicalStop()
	ConfirmWorkerStop()

	RunPeriodicalProcess() bool
	RunWorkerProcess() bool

	// Do NOT override when using DefaultEngine.
	// Use this method to send a ConfirmStop without doing anything else.
	AcknowledgeConfirmStop()

	// WorkerEnd is called at the end of the worker routine, when one of the
	// following things happened:
	//  - the AMQP consumer channel closed
	//  - the engine has been stopped
	//  - WorkerProcess panicked
	// It should recover from panics, and notify the engine that the worker
	// routine has stopped (with the method ConfirmWorkerStop).
	WorkerEnd()

	// PeriodicalEnd is called at the end of the periodical routine, when one
	// of the following things happened:
	//  - the engine has been stopped
	//  - PeriodicalProcess panicked
	// It should recover from panics, and notify the engine that the periodical
	// routine has stopped (with the method ConfirmPeriodicalStop).
	PeriodicalEnd()

	Logger() *zerolog.Logger

	GetRunInfo() engine.RunInfo
}

Engine makes engines more predictable in the way they are built. Used with StartEngine(), goroutine spawning is handled for you and you only have to implement your working logic.

You can use the DefaultEngine struct to avoid implementing Initialize(), Stop(), Continue(), Confirm*Stop() and Run*Process().
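
The contract stated in the WorkerEnd comment (recover from panics, then confirm that the routine stopped) can be sketched like this. The worker type, its fields and the run method are hypothetical. The sketch relies on the Go rule that recover only has an effect when called directly by the deferred function.

```go
package main

import "fmt"

// worker is a hypothetical stand-in for an engine's worker routine state.
type worker struct {
	confirm  chan bool
	panicked bool
}

// WorkerEnd is meant to be deferred directly. It recovers from a panic in
// the processing code, then confirms that the routine has stopped.
func (w *worker) WorkerEnd() {
	if r := recover(); r != nil {
		w.panicked = true
	}
	w.confirm <- true
}

// run executes process and guarantees that WorkerEnd is called afterwards.
func (w *worker) run(process func()) {
	defer w.WorkerEnd()
	process()
}

// runDemo reports whether a panicking process was recovered and confirmed.
func runDemo() bool {
	w := &worker{confirm: make(chan bool)}
	go w.run(func() { panic("boom") })
	<-w.confirm // the confirmation arrives even though process panicked
	return w.panicked
}

func main() {
	fmt.Println(runDemo())
}
```

A PeriodicalEnd method would follow the same shape, confirming through the periodical counterpart instead.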

Directories

Path Synopsis
gob
xml
engine contains the implementation of the canopsis engine.
Package idlealarm implements alarm modification on idle alarm.
Package idlerule contains idle rule model and adapter.
Package operation implements alarm modification operations.
executor
Package executor contains operation executors.
