Documentation ¶
Index ¶
- Constants
- Variables
- func ErrorType(err error) int
- func PrintVersion()
- func PrintVersionExit()
- func StartEngine(engine Engine, waitChan *chan os.Signal) (int, error)
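- type DefaultEngine
- func NewDefaultEngine(sleep time.Duration, runworker, runperiodical bool, sub *amqp.Channel) DefaultEngine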
- func (de *DefaultEngine) AcknowledgeConfirmStop()
- func (de *DefaultEngine) AskStop(state int)
- func (de *DefaultEngine) ConfirmPeriodicalStop()
- func (de *DefaultEngine) ConfirmWorkerStop()
- func (de *DefaultEngine) Continue() bool
- func (de *DefaultEngine) DebugPrint(fmt string, params ...interface{})
- func (de *DefaultEngine) Initialize() error
- func (de *DefaultEngine) PeriodicalWaitTime() time.Duration
- func (de *DefaultEngine) RecoverPeriodical()
- func (de *DefaultEngine) RecoverWorker()
- func (de DefaultEngine) RunPeriodicalProcess() bool
- func (de DefaultEngine) RunWorkerProcess() bool
- func (de *DefaultEngine) SetWaitStopChan(sigchan chan os.Signal)
- func (de *DefaultEngine) Started()
- func (de *DefaultEngine) Stop() int
- type Engine
Constants ¶
const (
	ExitOK = iota
	ExitPanic
	ExitEngine
)
Returned values on exit
const (
	ErrorTypeMongoConnection = iota
	ErrorTypeRedisConnectionClosed
	ErrorTypeAMQPConnection
	ErrorTypeNetworkConnectionRefused
	ErrorTypeEOF     // connection closed unexpectedly, end of file...
	ErrorTypeNoError // nil
	ErrorTypeUnknown
)
Error types
const (
	EnvCpsAMQPTestExchange = "canopsis.tests"
	ActionQueueName        = "Engine_action"
	ActionConsumerName     = "action"
	AxeExchangeName        = ""
	AxeQueueName           = "Engine_axe"
	AxeConsumerName        = "axe"
	CancelAutosolveDelay   = 60 * 60
	CheExchangeName        = "canopsis.events"
	CheQueueName           = "Engine_che"
	CheConsumerName        = "che"
	DbName                 = "canopsis"
	DefaultBulkSize        = 1000
	DefaultEventAuthor     = "system"
	DoneAutosolveDelay     = 15 * 60
	HeartBeatExchangeName  = "canopsis.events"
	HeartBeatQueueName     = "Engine_heartbeat"
	HeartBeatConsumerName  = "heartbeat"
	MaxPythonTimestamp     = 253402297199 // compat: max timestamp in python
	StatsExchangeName      = "canopsis.events"
	StatsQueueName         = "Engine_stat"
	StatsConsumerName      = "stat"
	StatsDatabase          = "canopsis"
	StatsMeasurement       = "event_state_history"
	StatsngExchangeName    = "amq.direct"
	StatsngQueueName       = "Engine_statsng"
	PluginExtension        = ".so"
	WatcherQueueName       = "Engine_watcher"
	WatcherConsumerName    = "watcher"
)
Globals
Variables ¶
var BuildDate string
BuildDate ...
var BuildGitBranch string
BuildGitBranch ...
var BuildGitCommit string
BuildGitCommit is the short version of git commit
Functions ¶
func StartEngine ¶
StartEngine handles starting the WorkerProcess and PeriodicalProcess of your Engine. It first calls Engine.Initialize() and returns the error if there is one, then proceeds to bind signals.
Engine.PeriodicalProcess and Engine.WorkerProcess are launched into separate goroutines.
Only SIGTERM or SIGINT will trigger the Engine.Stop() method.
waitChan is optional: if nil, a channel is instantiated and managed for you. The parameter exists mainly for tests.
Types ¶
type DefaultEngine ¶
type DefaultEngine struct {
	ConfirmChan   chan bool
	Sub           *amqp.Channel
	Sleep         time.Duration
	RunWorker     bool
	RunPeriodical bool
	Debug         bool
	// contains filtered or unexported fields
}
DefaultEngine provides basic functions and will behave as an Engine with PeriodicalProcess and WorkerProcess.
Stop() automatically closes the Sub channel and waits for confirmations to arrive on ConfirmChan.
func NewDefaultEngine ¶
func NewDefaultEngine(sleep time.Duration, runworker, runperiodical bool, sub *amqp.Channel) DefaultEngine
NewDefaultEngine returns a default engine implementation. See engine_test.go in this package for a usage example.
func (*DefaultEngine) AcknowledgeConfirmStop ¶
func (de *DefaultEngine) AcknowledgeConfirmStop()
AcknowledgeConfirmStop sends true to ConfirmChan once for each call. Do NOT override this method when using DefaultEngine in your struct.
func (*DefaultEngine) AskStop ¶
func (de *DefaultEngine) AskStop(state int)
AskStop sends SIGINT into the channel where StartEngine waits for sigint/term, leading to a proper engine stop.
func (*DefaultEngine) ConfirmPeriodicalStop ¶
func (de *DefaultEngine) ConfirmPeriodicalStop()
ConfirmPeriodicalStop sends one value to ConfirmChan.
func (*DefaultEngine) ConfirmWorkerStop ¶
func (de *DefaultEngine) ConfirmWorkerStop()
ConfirmWorkerStop sends one value to ConfirmChan.
func (*DefaultEngine) Continue ¶
func (de *DefaultEngine) Continue() bool
Continue returns the DefaultEngine.cont flag.
func (*DefaultEngine) DebugPrint ¶
func (de *DefaultEngine) DebugPrint(fmt string, params ...interface{})
DebugPrint prints a log line only if the engine is in debug mode.
func (*DefaultEngine) Initialize ¶
func (de *DefaultEngine) Initialize() error
Initialize does nothing here besides returning nil.
func (*DefaultEngine) PeriodicalWaitTime ¶
func (de *DefaultEngine) PeriodicalWaitTime() time.Duration
PeriodicalWaitTime returns the Sleep attribute.
func (*DefaultEngine) RecoverPeriodical ¶
func (de *DefaultEngine) RecoverPeriodical()
RecoverPeriodical handles panic recovery in the periodical goroutine.
func (*DefaultEngine) RecoverWorker ¶
func (de *DefaultEngine) RecoverWorker()
RecoverWorker handles panic recovery in the worker goroutine.
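The Engine interface comments say the default recover implementation prints the stack and asks the engine to stop. A sketch of that pattern with a deferred recover is shown below. The helpers recoverAndReport and safeWork are hypothetical, and requesting exit state 1 (the value of ExitPanic) is an assumption about what the real methods pass to AskStop.

```go
package main

import (
	"fmt"
	"runtime/debug"
)

// recoverAndReport is a hypothetical helper. It must be the deferred
// function itself, because recover only works when called directly
// by a deferred function.
func recoverAndReport(askStop func(int)) {
	if r := recover(); r != nil {
		fmt.Printf("recovered: %v\n%s", r, debug.Stack())
		askStop(1) // assumption: 1 mirrors ExitPanic
	}
}

// safeWork runs work and converts a panic into a stop request
// instead of crashing the whole process.
func safeWork(work func(), askStop func(int)) {
	defer recoverAndReport(askStop)
	work()
}

func main() {
	state := -1
	safeWork(func() { panic("boom") }, func(s int) { state = s })
	fmt.Println("requested exit state:", state)
}
```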
func (DefaultEngine) RunPeriodicalProcess ¶
func (de DefaultEngine) RunPeriodicalProcess() bool
RunPeriodicalProcess returns DefaultEngine.RunPeriodical
func (DefaultEngine) RunWorkerProcess ¶
func (de DefaultEngine) RunWorkerProcess() bool
RunWorkerProcess returns DefaultEngine.RunWorker
func (*DefaultEngine) SetWaitStopChan ¶
func (de *DefaultEngine) SetWaitStopChan(sigchan chan os.Signal)
SetWaitStopChan keeps in memory our sigint/term chan.
func (*DefaultEngine) Started ¶
func (de *DefaultEngine) Started()
Started is called before StartEngine waits for the stop signal.
func (*DefaultEngine) Stop ¶
func (de *DefaultEngine) Stop() int
Stop sets the cont flag to false, closes the Sub channel, then waits for two values from ConfirmChan. It returns the engine's exit status, which should be passed to os.Exit.
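The two-confirmation handshake can be illustrated with a small stand-alone sketch. The miniEngine type and the loop function are hypothetical; closing the AMQP Sub channel is left out, and the atomic flag is a choice made here for goroutine safety, not something the source states about the cont field.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// miniEngine is a hypothetical reduction of DefaultEngine to the
// pieces involved in stopping.
type miniEngine struct {
	ConfirmChan chan bool
	cont        atomic.Bool
	exit        int
}

func newMiniEngine() *miniEngine {
	e := &miniEngine{ConfirmChan: make(chan bool, 2)}
	e.cont.Store(true)
	return e
}

// Continue reports whether the loops should keep running.
func (e *miniEngine) Continue() bool { return e.cont.Load() }

// AcknowledgeConfirmStop sends exactly one value per call.
func (e *miniEngine) AcknowledgeConfirmStop() { e.ConfirmChan <- true }

// Stop flips the flag, then blocks until both loops have confirmed.
func (e *miniEngine) Stop() int {
	e.cont.Store(false)
	<-e.ConfirmChan // first loop (worker or periodical)
	<-e.ConfirmChan // second loop
	return e.exit
}

// loop stands in for either the worker or the periodical goroutine.
func loop(e *miniEngine) {
	defer e.AcknowledgeConfirmStop()
	for e.Continue() {
		time.Sleep(time.Millisecond)
	}
}

func main() {
	e := newMiniEngine()
	go loop(e)
	go loop(e)
	fmt.Println("exit status:", e.Stop()) // prints "exit status: 0"
}
```

Since Stop always waits for two values, an engine that runs only one of the two processes still has to produce the second confirmation, which is presumably what AcknowledgeConfirmStop is for.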
type Engine ¶
type Engine interface {
	// PeriodicalWaitTime returns the duration to wait between
	// two run of PeriodicalProcess()
	PeriodicalWaitTime() time.Duration

	// Continue returns the state of the engine.
	// true to continue running
	// false to stop as soon as possible, meaning when functions finish
	// their job.
	Continue() bool

	// ConsumerChan must return a new channel from amqp.Channel.Consume()
	// The worker process will stop looping when the channel is closed,
	// so you must close yourself all channels.
	ConsumerChan() (<-chan amqp.Delivery, error)

	// Initialize anything you want. If err != nil then the Engine will not start.
	Initialize() error

	Started()

	// WorkerProcess must implement the actual consumer processing.
	WorkerProcess(amqp.Delivery)

	// PeriodicalProcess must implement the actual "beat processing".
	// Prefer using channels that you will close for long
	// PeriodicalWaitTime() values.
	PeriodicalProcess()

	SetWaitStopChan(chan os.Signal)
	AskStop(int)

	// Stop handles stopping the engine, and also waiting
	// for goroutines to finish.
	// It returns the engine's exit status, that should be used in os.Exit.
	Stop() int

	// ConfirmPeriodicalStop is useful to mutate a variable
	// or an internal chan to confirm that the Periodical process
	// is now stopped.
	ConfirmPeriodicalStop()
	ConfirmWorkerStop()

	RunPeriodicalProcess() bool
	RunWorkerProcess() bool

	// Do NOT override when using DefaultEngine.
	// Use this method to send a ConfirmStop without doing anything else.
	AcknowledgeConfirmStop()

	// RecoverWorker and RecoverPeriodical handle panic recovery for
	// WorkerProcess and PeriodicalProcess respectively.
	// The default implementation prints the stack and ask engine to stop.
	RecoverWorker()
	RecoverPeriodical()
}
Engine makes engines more predictable in the way they are built. When used with StartEngine(), goroutine spawning is handled for you and you only have to implement your working logic.
You can use the DefaultEngine struct to avoid implementing Initialize(), Stop(), Continue(), Confirm*Stop() and Run*Process().