Documentation ¶
Overview ¶
Package watcher implements the watcher service used by the watcher engine to compute the watchers' states.
Read https://doc.canopsis.net/guide-administration/moteurs/moteur-watcher/ for more details on watchers.
A previous implementation of the watcher engine fetched all the alarms that impact a watcher from the database each time its state needed to be recomputed. This did not scale well for watchers with a lot of dependencies.
The current implementation was written to make the most common operation (updating the watchers' states after an event) fast and scalable. It works by storing counters (number of alarms in each state, number of acknowledged alarms, ...) for each watcher in redis, updating those counters when an event is received, and computing the watchers' states from these counters.
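As a rough illustration of the counter-based approach, the sketch below derives a watcher's state from per-state counters without reading any alarm. The stateCounters type is a simplified local stand-in and worstState is a hypothetical helper, not part of the package; the severity encoding (0 = info, 1 = minor, 2 = major, 3 = critical) is an assumption taken from the examples on this page.

```go
package main

import "fmt"

// stateCounters is a simplified stand-in for the package's StateCounters.
type stateCounters struct {
	Critical, Major, Minor, Info int64
}

// worstState returns the highest severity that has a non-zero counter
// (assumed encoding: 0 = info, 1 = minor, 2 = major, 3 = critical).
func worstState(c stateCounters) int {
	switch {
	case c.Critical > 0:
		return 3
	case c.Major > 0:
		return 2
	case c.Minor > 0:
		return 1
	default:
		return 0
	}
}

func main() {
	counters := stateCounters{Major: 2, Info: 5}
	fmt.Println(worstState(counters))

	// An event reports that one dependency went from info to critical:
	// only the counters are adjusted, no alarm is read from the database.
	counters.Info--
	counters.Critical++
	fmt.Println(worstState(counters))
}
```

The cost of recomputing the state is then independent of the number of dependencies, which is the property the paragraph above describes.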
Multiple types were defined for this implementation:
- Watcher (defined in model.go) is the type used to represent a watcher in the MongoDB database.
- AlarmCounters contains various counters for a watcher (number of alarms in each state, number of acknowledged alarms, ...), and is used to compute the watchers' states and outputs.
- DependencyState contains all the information on an entity that affects the watchers' states. It is stored in redis, and is used to update the watchers' counters.
- ImpactDiff is used to keep track of changes in the context-graph (when an entity is added or removed from a watcher's dependencies).
- CountersCache is the service that stores the DependencyStates of the entities and updates the AlarmCounters of the watchers in redis.
When an event is received by the watcher engine:
- it is converted into a DependencyState for the corresponding entity (in Service.Process);
- the DependencyState is compared to the previous DependencyState of this entity, which is stored in redis (in CountersCache.ProcessState);
- the impacted watchers' AlarmCounters are updated in redis (in CountersCache.ProcessState);
- the state and output of the watchers are deduced from the AlarmCounters, and sent to the axe engine (in Service.Process).
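The steps above can be sketched with an in-memory map standing in for redis. Everything here is hypothetical and heavily reduced (a single "number of alarms" counter per watcher, no pbehaviors, no locking); it only illustrates how comparing the previous and current state of an entity yields the increments to apply.

```go
package main

import "fmt"

// depState is a reduced stand-in for DependencyState.
type depState struct {
	EntityID string
	Watchers []string
	HasAlarm bool
}

// memoryCache is a hypothetical in-memory replacement for the redis-backed
// cache. It is not safe for concurrent use.
type memoryCache struct {
	states map[string]depState // last known state of each entity
	alarms map[string]int64    // number of alarms impacting each watcher
}

func newMemoryCache() *memoryCache {
	return &memoryCache{states: map[string]depState{}, alarms: map[string]int64{}}
}

// contribution returns, for each impacted watcher, how many alarms the
// state accounts for (0 or 1).
func contribution(s depState) map[string]int64 {
	out := map[string]int64{}
	for _, w := range s.Watchers {
		if s.HasAlarm {
			out[w] = 1
		} else {
			out[w] = 0
		}
	}
	return out
}

// processState compares the new state with the stored one, applies the
// difference to the counters, and returns the new counters of the watchers
// that are (or used to be) impacted.
func (c *memoryCache) processState(current depState) map[string]int64 {
	previous := c.states[current.EntityID]
	c.states[current.EntityID] = current

	delta := contribution(current)
	for w, n := range contribution(previous) {
		delta[w] -= n
	}
	updated := map[string]int64{}
	for w, d := range delta {
		c.alarms[w] += d
		updated[w] = c.alarms[w]
	}
	return updated
}

func main() {
	cache := newMemoryCache()
	cache.processState(depState{EntityID: "cpu/server1", Watchers: []string{"servers"}, HasAlarm: true})
	cache.processState(depState{EntityID: "ram/server1", Watchers: []string{"servers"}, HasAlarm: true})
	fmt.Println(cache.alarms["servers"])

	// The alarm on cpu/server1 is resolved.
	updated := cache.processState(depState{EntityID: "cpu/server1", Watchers: []string{"servers"}})
	fmt.Println(updated["servers"])
}
```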
Index ¶
- Constants
- func DefaultCollection(session *mgo.Session) mongo.Collection
- func GetCountersIncrementsFromStates(previousState DependencyState, currentState DependencyState) map[string]AlarmCounters
- type Adapter
- type AlarmCounters
- type AmqpChannelPublisher
- type AnnotatedEntity
- type CountersCache
- type DependencyState
- type ImpactDiff
- type Service
- type StateCounters
- type Watcher
Constants ¶
const (
	// MethodWorst is the method that returns the worst state among the alarms
	// that impact a watcher.
	MethodWorst = "worst"
)
Variables ¶
This section is empty.
Functions ¶
func DefaultCollection ¶
func DefaultCollection(session *mgo.Session) mongo.Collection
DefaultCollection returns the collection containing the watchers.
func GetCountersIncrementsFromStates ¶
func GetCountersIncrementsFromStates(
	previousState DependencyState,
	currentState DependencyState,
) map[string]AlarmCounters
GetCountersIncrementsFromStates returns a map containing, for each watcher that is impacted by a change that occurred on an entity, the AlarmCounters that should be added to the watcher's counters.
This function handles the changes of the entity's alarms and pbehaviors, as well as the changes in the context-graph.
See counters_test.go for examples for this function.
Example (AlarmResolution) ¶
This example corresponds to the resolution of a critical (and acknowledged) alarm. In this case, the Alarms, Critical and Acknowledged counters are decremented (since there is one less acknowledged critical alarm that impacts the watcher), and Info is incremented (since there is one more entity in this state).
package main

import (
	"encoding/json"
	"fmt"

	"git.canopsis.net/canopsis/go-engines/lib/canopsis/watcher"
)

func main() {
	previousState := watcher.DependencyState{
		EntityID:          "cpu/server1",
		ImpactedWatchers:  []string{"servers"},
		HasAlarm:          true,
		AlarmState:        3,
		AlarmAcknowledged: true,
	}
	currentState := watcher.DependencyState{
		EntityID:         "cpu/server1",
		ImpactedWatchers: []string{"servers"},
		HasAlarm:         false,
	}
	increments := watcher.GetCountersIncrementsFromStates(
		previousState, currentState)

	json, err := json.MarshalIndent(increments, "", " ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(json))
}
Output:
{
 "servers": {
  "Alarms": -1,
  "State": {
   "Critical": -1,
   "Major": 0,
   "Minor": 0,
   "Info": 1
  },
  "Acknowledged": -1,
  "NotAcknowledged": 0
 }
}
Example (NewWatcher) ¶
This example corresponds to the creation of a new watcher that is impacted by an existing entity (with a major alarm). In this case, the counters of the "servers" watcher are not updated (since the entity's state did not change). The Alarms, Major and NotAcknowledged counters of the "critical_servers" watcher are incremented, since there is a new non-acknowledged major alarm impacting this watcher.
package main

import (
	"encoding/json"
	"fmt"

	"git.canopsis.net/canopsis/go-engines/lib/canopsis/watcher"
)

func main() {
	previousState := watcher.DependencyState{
		EntityID:          "cpu/server1",
		ImpactedWatchers:  []string{"servers"},
		HasAlarm:          true,
		AlarmState:        2,
		AlarmAcknowledged: false,
	}
	currentState := watcher.DependencyState{
		EntityID:          "cpu/server1",
		ImpactedWatchers:  []string{"servers", "critical_servers"},
		HasAlarm:          true,
		AlarmState:        2,
		AlarmAcknowledged: false,
	}
	increments := watcher.GetCountersIncrementsFromStates(
		previousState, currentState)

	json, err := json.MarshalIndent(increments, "", " ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(json))
}
Output:
{
 "critical_servers": {
  "Alarms": 1,
  "State": {
   "Critical": 0,
   "Major": 1,
   "Minor": 0,
   "Info": 0
  },
  "Acknowledged": 0,
  "NotAcknowledged": 1
 }
}
Example (PbehaviorEnd) ¶
This example corresponds to a pbehavior becoming inactive on an entity that has an acknowledged minor alarm. While the pbehavior was active, the entity was counted in the Info state and its alarm was ignored. Once it ends, Info is decremented and the Alarms, Minor and Acknowledged counters are incremented.
package main

import (
	"encoding/json"
	"fmt"

	"git.canopsis.net/canopsis/go-engines/lib/canopsis/watcher"
)

func main() {
	previousState := watcher.DependencyState{
		EntityID:             "cpu/server1",
		ImpactedWatchers:     []string{"servers"},
		HasAlarm:             true,
		AlarmState:           1,
		AlarmAcknowledged:    true,
		ActivePBehaviorTypes: map[string]bool{"maintenance": true},
	}
	currentState := watcher.DependencyState{
		EntityID:             "cpu/server1",
		ImpactedWatchers:     []string{"servers"},
		HasAlarm:             true,
		AlarmState:           1,
		AlarmAcknowledged:    true,
		ActivePBehaviorTypes: map[string]bool{},
	}
	increments := watcher.GetCountersIncrementsFromStates(
		previousState, currentState)

	json, err := json.MarshalIndent(increments, "", " ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(json))
}
Output:
{
 "servers": {
  "Alarms": 1,
  "State": {
   "Critical": 0,
   "Major": 0,
   "Minor": 1,
   "Info": -1
  },
  "Acknowledged": 1,
  "NotAcknowledged": 0
 }
}
Types ¶
type Adapter ¶
type Adapter interface {
	// GetAll gets all watchers from database
	GetAll(watchers *[]Watcher) error

	// GetAllValidWatchers gets all valid and enabled watchers from database
	GetAllValidWatchers(watchers *[]Watcher) error

	// GetByID finds the watcher by its entity id
	GetByID(id string, watchers *Watcher) error

	// GetEntities gets the entities watched by the watcher
	GetEntities(watcher Watcher, entities *[]types.Entity) error

	// GetAllAnnotatedEntities returns a slice containing all the entities,
	// annotated with their pbehaviors and alarms.
	// Note that this method may allocate a lot of memory. This may be
	// optimised by returning an mgo.Iter object instead of a slice. As of this
	// writing, the code complexity required to use an iterator outweighs the
	// benefits of doing so.
	GetAllAnnotatedEntities() ([]AnnotatedEntity, error)

	// GetAnnotatedDependencies returns a slice containing the dependencies of
	// a watcher annotated with their pbehaviors and alarms.
	// Note that this method may allocate a lot of memory. This may be
	// optimised by returning an mgo.Iter object instead of a slice. As of this
	// writing, the code complexity required to use an iterator outweighs the
	// benefits of doing so.
	GetAnnotatedDependencies(watcherID string) ([]AnnotatedEntity, error)
}
Adapter is an interface that provides methods for database queries regarding watchers and their dependencies.
type AlarmCounters ¶
type AlarmCounters struct {
	Alarms          int64
	State           StateCounters
	Acknowledged    int64
	NotAcknowledged int64
}
AlarmCounters is a struct containing various counters that are used to determine a watcher's state and output.
func NewAlarmCountersFromState ¶
func NewAlarmCountersFromState(state DependencyState) AlarmCounters
NewAlarmCountersFromState returns an AlarmCounters that corresponds to the state of a single dependency. For example, if this dependency has a critical alarm that has been acknowledged, the Alarms, State.Critical and Acknowledged counters are equal to 1, and the other counters are equal to 0. The counters corresponding to a watcher can be obtained by adding the counters of all its dependencies together.
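The following sketch shows one way such per-dependency counters could be built and summed. It uses simplified local types, and countersFromState is a hypothetical re-implementation whose rules are inferred from the examples on this page (an entity without an alarm, or with an active pbehavior, counts as Info; states are assumed to be encoded as 0 = info to 3 = critical). The package's actual behaviour may differ.

```go
package main

import "fmt"

// Simplified local stand-ins for the package's types.
type stateCounters struct{ Critical, Major, Minor, Info int64 }

type alarmCounters struct {
	Alarms          int64
	State           stateCounters
	Acknowledged    int64
	NotAcknowledged int64
}

type dependencyState struct {
	HasAlarm          bool
	AlarmState        int // assumed: 0 = info, 1 = minor, 2 = major, 3 = critical
	AlarmAcknowledged bool
	ActivePBehavior   bool
}

// countersFromState returns the counters of a single dependency.
func countersFromState(s dependencyState) alarmCounters {
	var c alarmCounters
	if !s.HasAlarm || s.ActivePBehavior {
		// Assumption: the entity is counted as Info and its alarm is ignored.
		c.State.Info = 1
		return c
	}
	c.Alarms = 1
	switch s.AlarmState {
	case 3:
		c.State.Critical = 1
	case 2:
		c.State.Major = 1
	case 1:
		c.State.Minor = 1
	default:
		c.State.Info = 1
	}
	if s.AlarmAcknowledged {
		c.Acknowledged = 1
	} else {
		c.NotAcknowledged = 1
	}
	return c
}

// sum adds together the counters of all the dependencies of a watcher.
func sum(deps []dependencyState) alarmCounters {
	var total alarmCounters
	for _, d := range deps {
		c := countersFromState(d)
		total.Alarms += c.Alarms
		total.State.Critical += c.State.Critical
		total.State.Major += c.State.Major
		total.State.Minor += c.State.Minor
		total.State.Info += c.State.Info
		total.Acknowledged += c.Acknowledged
		total.NotAcknowledged += c.NotAcknowledged
	}
	return total
}

func main() {
	deps := []dependencyState{
		{HasAlarm: true, AlarmState: 3, AlarmAcknowledged: true},
		{HasAlarm: true, AlarmState: 2},
		{}, // no alarm
	}
	fmt.Printf("%+v\n", sum(deps))
}
```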
func (AlarmCounters) Add ¶
func (c AlarmCounters) Add(other AlarmCounters) AlarmCounters
Add returns a new AlarmCounters containing the sums of two AlarmCounters.
func (AlarmCounters) IsZero ¶
func (c AlarmCounters) IsZero() bool
IsZero returns true if all the counters of the AlarmCounters are equal to 0.
func (AlarmCounters) Negate ¶
func (c AlarmCounters) Negate() AlarmCounters
Negate returns a new AlarmCounters, with all the counters negated.
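Taken together, Add, Negate and IsZero are enough to express "what changed" between two sets of counters. The sketch below uses a reduced, hypothetical counters type (three fields only) to show the idea; it is not the package's implementation.

```go
package main

import "fmt"

// counters is a reduced stand-in for AlarmCounters.
type counters struct {
	Alarms, Acknowledged, NotAcknowledged int64
}

// Add returns the field-by-field sum of c and o.
func (c counters) Add(o counters) counters {
	return counters{
		Alarms:          c.Alarms + o.Alarms,
		Acknowledged:    c.Acknowledged + o.Acknowledged,
		NotAcknowledged: c.NotAcknowledged + o.NotAcknowledged,
	}
}

// Negate returns c with every field negated.
func (c counters) Negate() counters {
	return counters{
		Alarms:          -c.Alarms,
		Acknowledged:    -c.Acknowledged,
		NotAcknowledged: -c.NotAcknowledged,
	}
}

// IsZero reports whether every field of c is 0.
func (c counters) IsZero() bool {
	return c == (counters{})
}

func main() {
	previous := counters{Alarms: 1, NotAcknowledged: 1}
	current := counters{Alarms: 1, Acknowledged: 1} // the alarm was acknowledged

	// The increment is "current minus previous".
	increment := current.Add(previous.Negate())
	fmt.Printf("%+v\n", increment)

	// A zero increment means the stored counters do not need to be touched.
	fmt.Println(current.Add(current.Negate()).IsZero())
}
```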
type AmqpChannelPublisher ¶
type AmqpChannelPublisher interface {
	// Publish sends an amqp.Publishing from the client to an exchange on the server.
	Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
}
AmqpChannelPublisher is an interface that represents a non-consumable AMQP channel. This interface is implemented by amqp.Channel. It should be used in services that only publish to amqp, in order to be able to test them easily by mocking this interface.
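A minimal sketch of the mocking pattern this interface enables is shown below. To stay self-contained it replaces amqp.Publishing with a local stand-in type reduced to a body; mockPublisher, notify and the queue name are hypothetical and only illustrate how a publish-only service can be tested without a broker.

```go
package main

import (
	"errors"
	"fmt"
)

// publishing is a local stand-in for amqp.Publishing, reduced to its body.
type publishing struct{ Body []byte }

// channelPublisher mirrors the shape of AmqpChannelPublisher, using the
// stand-in type above.
type channelPublisher interface {
	Publish(exchange, key string, mandatory, immediate bool, msg publishing) error
}

// mockPublisher records the messages instead of sending them.
type mockPublisher struct {
	sent []publishing
	err  error // returned by Publish when non-nil
}

func (m *mockPublisher) Publish(exchange, key string, mandatory, immediate bool, msg publishing) error {
	if m.err != nil {
		return m.err
	}
	m.sent = append(m.sent, msg)
	return nil
}

// notify is a hypothetical service function that only needs to publish.
func notify(p channelPublisher, body string) error {
	return p.Publish("", "hypothetical_queue", false, false, publishing{Body: []byte(body)})
}

func main() {
	mock := &mockPublisher{}
	if err := notify(mock, `{"state": 3}`); err != nil {
		panic(err)
	}
	fmt.Println(len(mock.sent), string(mock.sent[0].Body))

	// The error path can be exercised just as easily.
	failing := &mockPublisher{err: errors.New("channel closed")}
	fmt.Println(notify(failing, "{}"))
}
```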
type AnnotatedEntity ¶
type AnnotatedEntity struct {
	types.Entity `bson:"entity"`
	Alarm        *types.Alarm      `bson:"alarm"`
	PBehaviors   []types.PBehavior `bson:"pbehaviors"`
}
AnnotatedEntity is a struct containing an entity, a list of pbehaviors that impact it, and the ongoing alarm on this entity (or nil if there isn't one). The pbehaviors may or may not be active.
type CountersCache ¶
type CountersCache interface {
	// ProcessState processes an entity's update, and updates the counters of
	// the watchers that are (or used to be) impacted by it.
	// It returns a map containing the impacted watchers and the new values of
	// their counters.
	// This method should be safe to run in multiple goroutines, since it is
	// used in the PeriodicalProcess and WorkerProcess of the watcher.
	ProcessState(currentState DependencyState) (map[string]AlarmCounters, error)
}
CountersCache is a type that handles the counting of the alarms and entities that impact watchers.
func NewCountersCache ¶
func NewCountersCache(client *redis.Client, logger zerolog.Logger) CountersCache
NewCountersCache creates a new CountersCache that stores its data in redis through the given client.
type DependencyState ¶
type DependencyState struct {
	EntityID             string
	ImpactedWatchers     []string
	HasAlarm             bool
	AlarmState           int
	AlarmAcknowledged    bool
	ActivePBehaviorTypes map[string]bool
	LastUpdateDate       time.Time
}
DependencyState is a struct containing the information about a dependency that is used to compute a watcher's state.
func NewDependencyState ¶
func NewDependencyState(
	entity types.Entity,
	alarm *types.Alarm,
	pbehaviors []types.PBehavior,
	watchers map[string]Watcher,
	date time.Time,
) DependencyState
NewDependencyState creates a DependencyState given an entity, an (optional) alarm and a list of pbehaviors (active or not) that impact the entity. watchers should be a map containing all the watchers, and is used to filter the impact of the entity to only keep the watchers.
FIXME: LastUpdateDate is used to ensure that only the most recent DependencyStates are taken into account. This is necessary because the ComputeAllWatchers method may take a few seconds to run, which could easily lead to a race condition. As of this writing, LastUpdateDate is set to the time the data enters the engine (the time the event was received, or the time the MongoDB query was started). This may break if several instances of the engine are run in parallel.
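The role of LastUpdateDate can be illustrated with a small sketch. The stateStore type, its accept method and the at helper are hypothetical, and an in-memory map replaces redis; the point is only that a state produced by a slow query started earlier must not overwrite a state produced by a more recent event.

```go
package main

import (
	"fmt"
	"time"
)

// depState is a reduced stand-in for DependencyState.
type depState struct {
	EntityID       string
	HasAlarm       bool
	LastUpdateDate time.Time
}

// stateStore keeps the most recent known state of each entity.
type stateStore struct{ states map[string]depState }

// accept stores s and returns true, unless a strictly more recent state is
// already known for the same entity.
func (st *stateStore) accept(s depState) bool {
	prev, known := st.states[s.EntityID]
	if known && prev.LastUpdateDate.After(s.LastUpdateDate) {
		return false
	}
	st.states[s.EntityID] = s
	return true
}

// at returns a fixed reference time shifted by the given number of seconds.
func at(seconds int) time.Time {
	return time.Date(2020, 1, 1, 12, 0, 0, 0, time.UTC).Add(time.Duration(seconds) * time.Second)
}

func main() {
	st := &stateStore{states: map[string]depState{}}

	// A periodic query starts at t=0 and sees an ongoing alarm, but only
	// finishes after the resolution event received at t=1 was processed.
	fromEvent := depState{EntityID: "cpu/server1", HasAlarm: false, LastUpdateDate: at(1)}
	fromQuery := depState{EntityID: "cpu/server1", HasAlarm: true, LastUpdateDate: at(0)}

	fmt.Println(st.accept(fromEvent))
	fmt.Println(st.accept(fromQuery)) // the stale state is rejected
	fmt.Println(st.states["cpu/server1"].HasAlarm)
}
```

As the FIXME notes, this kind of comparison relies on a single clock, which is why it may break when several engine instances run in parallel.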
func (DependencyState) HasActivePBehavior ¶
func (s DependencyState) HasActivePBehavior() bool
HasActivePBehavior returns true if one of the pbehavior types is active on the entity.
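A plausible reading of this method, given that ActivePBehaviorTypes is a map[string]bool, is sketched below. hasActive is a hypothetical helper, and treating only entries set to true as active is an assumption.

```go
package main

import "fmt"

// hasActive reports whether at least one pbehavior type is marked active.
// Assumption: entries whose value is false do not count.
func hasActive(activeTypes map[string]bool) bool {
	for _, active := range activeTypes {
		if active {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(hasActive(map[string]bool{"maintenance": true}))
	fmt.Println(hasActive(map[string]bool{}))
}
```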
type ImpactDiff ¶
ImpactDiff is a struct that makes it easy to handle changes in the impact of an entity. See NewImpactDiffFromImpacts for the documentation of its fields.
func NewImpactDiffFromImpacts ¶
func NewImpactDiffFromImpacts(
	previousImpacts []string,
	currentImpacts []string,
) ImpactDiff
NewImpactDiffFromImpacts takes as parameters two lists of impacts (a previous and a current one), and returns an ImpactDiff struct containing:
- All: a slice containing all the impacts (previous or current)
- Previous: a map such that Previous[watcherID] is true if watcherID is in the previousImpacts slice
- Current: a map such that Current[watcherID] is true if watcherID is in the currentImpacts slice
See impact_diff_test.go for an example.
Example ¶
package main

import (
	"encoding/json"
	"fmt"

	"git.canopsis.net/canopsis/go-engines/lib/canopsis/watcher"
)

func main() {
	previousImpacts := []string{"a", "b"}
	currentImpacts := []string{"b", "c"}
	impacts := watcher.NewImpactDiffFromImpacts(
		previousImpacts, currentImpacts)

	json, err := json.MarshalIndent(impacts, "", " ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(json))
}
Output:
{
 "All": [
  "a",
  "b",
  "c"
 ],
 "Previous": {
  "a": true,
  "b": true
 },
 "Current": {
  "b": true,
  "c": true
 }
}
type Service ¶
type Service interface {
	// Process updates the watchers impacted by the event and alarmChange
	// This method should be called on each event published by the axe engine,
	// so that the watchers' states are updated in real time.
	Process(ctx context.Context, event types.Event) error

	ProcessResolvedAlarm(ctx context.Context, alarm types.Alarm, entity types.Entity) error

	// UpdateWatcher updates the state of a watcher given its ID.
	// FIXME: the current implementation of this method does not handle
	// dependencies being removed from the watcher.
	UpdateWatcher(ctx context.Context, watcherID string) error

	// ComputeAllWatchers updates the states of all watchers.
	// As of this writing, calling this method regularly (at the beat) is
	// necessary to handle changes in the pbehaviors and in the context-graph,
	// since those changes do not trigger an event.
	// FIXME: the current implementation of this method does not handle
	// entities being disabled or removed from the database.
	ComputeAllWatchers(ctx context.Context) error
}
Service is the interface implemented by the watcher service, which processes events, computes the watchers' states accordingly, and sends events to the axe engine so that the watchers' alarms are updated. This service does not handle the links (impact and depends) between the watchers and their dependencies. These links are handled by the context builder, which is used by the che engine.
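Since ComputeAllWatchers has to be called regularly, a caller might drive it from a periodic loop such as the one sketched below. The computer interface is the subset of Service used here; runAtBeat, countingService and simulateBeats are hypothetical names, and the beat is passed in as a channel so that the loop can be exercised without waiting for real time to pass.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// computer is the subset of the Service interface used by this sketch.
type computer interface {
	ComputeAllWatchers(ctx context.Context) error
}

// runAtBeat recomputes all the watchers each time a beat is received, until
// the context is cancelled or a recomputation fails.
func runAtBeat(ctx context.Context, beat <-chan time.Time, c computer) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-beat:
			if err := c.ComputeAllWatchers(ctx); err != nil {
				return err
			}
		}
	}
}

// countingService is a fake that only counts the recomputations.
type countingService struct{ calls int }

func (s *countingService) ComputeAllWatchers(ctx context.Context) error {
	s.calls++
	return nil
}

// simulateBeats sends n beats by hand, stops the loop, and returns the
// number of recomputations that took place.
func simulateBeats(n int) int {
	svc := &countingService{}
	beat := make(chan time.Time)
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan error)
	go func() { done <- runAtBeat(ctx, beat, svc) }()
	for i := 0; i < n; i++ {
		beat <- time.Now()
	}
	cancel()
	<-done // wait for the loop to exit before reading the counter
	return svc.calls
}

func main() {
	fmt.Println(simulateBeats(3))
}
```

In a long-running process, the beat channel could be the C field of a time.Ticker created with time.NewTicker.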
func NewService ¶
func NewService(
	pubChannel AmqpChannelPublisher,
	pubExchangeName, pubQueueName string,
	jsonEncoder encoding.Encoder,
	watcherAdapter Adapter,
	alarmAdapter alarm.Adapter,
	pbehaviorService pbehavior.Service,
	countersCache CountersCache,
	logger zerolog.Logger,
) Service
NewService creates a watcher Service from the given publisher, adapters, pbehavior service and counters cache.
type StateCounters ¶
StateCounters is a struct containing the number of alarms in each state that impact a watcher.
func NewStateCountersFromState ¶
func NewStateCountersFromState(state DependencyState) StateCounters
NewStateCountersFromState returns a StateCounters that corresponds to the state of a single dependency. For example, if the state of this dependency is critical (and it does not have any active pbehavior), the Critical counter is equal to 1, and the other counters are equal to 0. The counters corresponding to a watcher can be obtained by adding the counters of all its dependencies together.
func (StateCounters) Add ¶
func (c StateCounters) Add(other StateCounters) StateCounters
Add returns a new StateCounters containing the sums of two StateCounters.
func (StateCounters) IsZero ¶
func (c StateCounters) IsZero() bool
IsZero returns true if all the counters of the StateCounters are equal to 0.
func (StateCounters) Negate ¶
func (c StateCounters) Negate() StateCounters
Negate returns a new StateCounters, with all the counters negated.
type Watcher ¶
type Watcher struct {
	types.Entity   `bson:",inline"` // inherits from entity
	Entities       pattern.EntityPatternList `bson:"entities"`
	State          map[string]interface{}    `bson:"state"`
	OutputTemplate string                    `bson:"output_template"`
}
Watcher is a structure representing a watcher type entity document.
func (Watcher) CheckEntityInWatcher ¶
CheckEntityInWatcher checks if the entity is watched by the watcher. It returns true when the entity is matched by a pattern in the watcher, false otherwise.