watcher

package
v0.0.0-...-8cc84bd
Published: Sep 21, 2020 License: AGPL-3.0 Imports: 24 Imported by: 0

Documentation

Overview

Package watcher implements the watcher service used by the watcher engine to compute the watchers' states.

Read https://doc.canopsis.net/guide-administration/moteurs/moteur-watcher/ for more details on watchers.

A previous implementation of the watcher engine fetched all the alarms that impact a watcher from the database each time its state needed to be recomputed. This did not scale well for watchers with a lot of dependencies.

The current implementation was written to make the most common operation (updating the watchers' states after an event) fast and scalable. It works by storing counters (number of alarms in each state, number of acknowledged alarms, ...) for each watcher in redis, updating those counters when an event is received, and computing the watchers' states from these counters.

Multiple types were defined for this implementation:

  • Watcher (defined in model.go) is the type used to represent a watcher in the MongoDB database.
  • AlarmCounters contains various counters for a watcher (number of alarms in each state, number of acknowledged alarms, ...), and is used to compute the watchers' states and outputs.
  • DependencyState contains all the information on an entity that affects the watchers' states. It is stored in redis, and is used to update the watchers' counters.
  • ImpactDiff is used to keep track of changes in the context-graph (when an entity is added or removed from a watcher's dependencies).
  • CountersCache is the service that stores the DependencyStates of the entities and updates the AlarmCounters of the watchers in redis.

When an event is received by the watcher engine:

  • it is converted into a DependencyState for the corresponding entity (in Service.Process);
  • the DependencyState is compared to the previous DependencyState of this entity, which is stored in redis (in CountersCache.ProcessState);
  • the impacted watchers' AlarmCounters are updated in redis (in CountersCache.ProcessState);
  • the state and output of the watchers are deduced from the AlarmCounters, and sent to the axe engine (in Service.Process).
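
The steps above can be sketched with in-memory maps standing in for redis. This is only an illustration under simplifying assumptions, not the package's implementation: the depState and cache types and the process and worst functions are hypothetical, and the counters are reduced to the number of entities in each state.

```go
package main

import "fmt"

// depState is a hypothetical, reduced stand-in for DependencyState.
type depState struct {
	EntityID string
	Watchers []string
	State    int // assumed encoding: 0 = info, 1 = minor, 2 = major, 3 = critical
}

// cache is a hypothetical in-memory stand-in for the redis-backed store.
type cache struct {
	states   map[string]depState // last known state of each entity
	counters map[string][4]int   // per watcher: number of entities in each state
}

func newCache() *cache {
	return &cache{states: map[string]depState{}, counters: map[string][4]int{}}
}

// process compares the new state with the stored one, updates the counters of
// the impacted watchers, and returns the worst state of each of them.
func (c *cache) process(cur depState) map[string]int {
	touched := map[string]bool{}
	if prev, ok := c.states[cur.EntityID]; ok {
		// Remove the contribution of the previous state.
		for _, w := range prev.Watchers {
			cnt := c.counters[w]
			cnt[prev.State]--
			c.counters[w] = cnt
			touched[w] = true
		}
	}
	// Add the contribution of the current state.
	for _, w := range cur.Watchers {
		cnt := c.counters[w]
		cnt[cur.State]++
		c.counters[w] = cnt
		touched[w] = true
	}
	c.states[cur.EntityID] = cur

	result := map[string]int{}
	for w := range touched {
		result[w] = worst(c.counters[w])
	}
	return result
}

// worst returns the highest state that has a non-zero counter.
func worst(cnt [4]int) int {
	for s := 3; s > 0; s-- {
		if cnt[s] > 0 {
			return s
		}
	}
	return 0
}

func main() {
	c := newCache()
	c.process(depState{EntityID: "cpu/server1", Watchers: []string{"servers"}, State: 3})
	c.process(depState{EntityID: "cpu/server2", Watchers: []string{"servers"}, State: 1})
	// The critical alarm on cpu/server1 is resolved; only the minor one remains.
	fmt.Println(c.process(depState{EntityID: "cpu/server1", Watchers: []string{"servers"}, State: 0})) // prints map[servers:1]
}
```

The cost of each update depends only on the number of watchers impacted by the entity, not on the number of dependencies of those watchers.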

Constants

const (
	// MethodWorst is the method that returns the worst state among the alarms
	// that impact a watcher.
	MethodWorst = "worst"
)
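
As an illustration of what a "worst" method could compute from per-state counters, the sketch below returns the highest state that has a non-zero counter. The stateCounters type mirrors the documented StateCounters fields; the worstState function and the numeric state values (0 = info up to 3 = critical, as suggested by the examples in this package) are assumptions, not the package's code.

```go
package main

import "fmt"

// stateCounters mirrors the fields of the documented StateCounters struct.
type stateCounters struct {
	Critical, Major, Minor, Info int64
}

// worstState returns the most severe state with at least one entity in it.
// The numeric encoding is an assumption.
func worstState(c stateCounters) int {
	switch {
	case c.Critical > 0:
		return 3
	case c.Major > 0:
		return 2
	case c.Minor > 0:
		return 1
	default:
		return 0
	}
}

func main() {
	fmt.Println(worstState(stateCounters{Major: 2, Minor: 5, Info: 10})) // prints 2
}
```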

Variables

This section is empty.

Functions

func DefaultCollection

func DefaultCollection(session *mgo.Session) mongo.Collection

DefaultCollection returns the collection containing the watchers.

func GetCountersIncrementsFromStates

func GetCountersIncrementsFromStates(
	previousState DependencyState,
	currentState DependencyState,
) map[string]AlarmCounters

GetCountersIncrementsFromStates returns a map containing, for each watcher that is impacted by a change that occurred on an entity, the AlarmCounters that should be added to the watcher's counters.

This function handles the changes of the entity's alarms and pbehaviors, as well as the changes in the context-graph.

See counters_test.go for examples for this function.

Example (AlarmResolution)

This example corresponds to the resolution of a critical (and acknowledged) alarm. In this case, the Alarms, Critical and Acknowledged counters are decremented (since there is one less acknowledged critical alarm that impacts the watcher), and Info is incremented (since there is one more entity in this state).

package main

import (
	"encoding/json"
	"fmt"

	"git.canopsis.net/canopsis/go-engines/lib/canopsis/watcher"
)

func main() {
	previousState := watcher.DependencyState{
		EntityID:          "cpu/server1",
		ImpactedWatchers:  []string{"servers"},
		HasAlarm:          true,
		AlarmState:        3,
		AlarmAcknowledged: true,
	}
	currentState := watcher.DependencyState{
		EntityID:         "cpu/server1",
		ImpactedWatchers: []string{"servers"},
		HasAlarm:         false,
	}

	increments := watcher.GetCountersIncrementsFromStates(
		previousState,
		currentState)

	// Use a name that does not shadow the encoding/json package.
	out, err := json.MarshalIndent(increments, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
Output:

{
  "servers": {
    "Alarms": -1,
    "State": {
      "Critical": -1,
      "Major": 0,
      "Minor": 0,
      "Info": 1
    },
    "Acknowledged": -1,
    "NotAcknowledged": 0
  }
}
Example (NewWatcher)

This example corresponds to the creation of a new watcher that is impacted by an existing entity (with a major alarm). In this case, the counters of the "servers" watcher are not updated (since the entity's state did not change). The Alarms, Major and NotAcknowledged counters of the "critical_servers" watcher are incremented, since there is a new non-acknowledged major alarm impacting this watcher.

package main

import (
	"encoding/json"
	"fmt"

	"git.canopsis.net/canopsis/go-engines/lib/canopsis/watcher"
)

func main() {
	previousState := watcher.DependencyState{
		EntityID:          "cpu/server1",
		ImpactedWatchers:  []string{"servers"},
		HasAlarm:          true,
		AlarmState:        2,
		AlarmAcknowledged: false,
	}
	currentState := watcher.DependencyState{
		EntityID:          "cpu/server1",
		ImpactedWatchers:  []string{"servers", "critical_servers"},
		HasAlarm:          true,
		AlarmState:        2,
		AlarmAcknowledged: false,
	}

	increments := watcher.GetCountersIncrementsFromStates(
		previousState,
		currentState)

	// Use a name that does not shadow the encoding/json package.
	out, err := json.MarshalIndent(increments, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
Output:

{
  "critical_servers": {
    "Alarms": 1,
    "State": {
      "Critical": 0,
      "Major": 1,
      "Minor": 0,
      "Info": 0
    },
    "Acknowledged": 0,
    "NotAcknowledged": 1
  }
}
Example (PbehaviorEnd)

This example corresponds to a pbehavior becoming inactive on an entity.

package main

import (
	"encoding/json"
	"fmt"

	"git.canopsis.net/canopsis/go-engines/lib/canopsis/watcher"
)

func main() {
	previousState := watcher.DependencyState{
		EntityID:             "cpu/server1",
		ImpactedWatchers:     []string{"servers"},
		HasAlarm:             true,
		AlarmState:           1,
		AlarmAcknowledged:    true,
		ActivePBehaviorTypes: map[string]bool{"maintenance": true},
	}
	currentState := watcher.DependencyState{
		EntityID:             "cpu/server1",
		ImpactedWatchers:     []string{"servers"},
		HasAlarm:             true,
		AlarmState:           1,
		AlarmAcknowledged:    true,
		ActivePBehaviorTypes: map[string]bool{},
	}

	increments := watcher.GetCountersIncrementsFromStates(
		previousState,
		currentState)

	// Use a name that does not shadow the encoding/json package.
	out, err := json.MarshalIndent(increments, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
Output:

{
  "servers": {
    "Alarms": 1,
    "State": {
      "Critical": 0,
      "Major": 0,
      "Minor": 1,
      "Info": -1
    },
    "Acknowledged": 1,
    "NotAcknowledged": 0
  }
}
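
One way to obtain the increments shown in the three examples above is to subtract the previous contribution of the entity from the watchers it used to impact, and add its current contribution to the watchers it now impacts. The sketch below follows that idea with local copies of the documented structs. The countersOf, addScaled and increments functions are hypothetical, and the rule that an entity without an alarm (or with an active pbehavior) counts as one "info" entity is inferred from the documented outputs rather than taken from the package's source.

```go
package main

import "fmt"

// Local copies of the documented structs, reduced to the fields used here.
type stateCounters struct{ Critical, Major, Minor, Info int64 }

type alarmCounters struct {
	Alarms          int64
	State           stateCounters
	Acknowledged    int64
	NotAcknowledged int64
}

type dependencyState struct {
	ImpactedWatchers     []string
	HasAlarm             bool
	AlarmState           int
	AlarmAcknowledged    bool
	ActivePBehaviorTypes map[string]bool
}

// countersOf returns the contribution of one dependency. Assumption: an entity
// without an alarm, or with an active pbehavior, counts as one "info" entity
// and zero alarms.
func countersOf(s dependencyState) alarmCounters {
	for _, active := range s.ActivePBehaviorTypes {
		if active {
			return alarmCounters{State: stateCounters{Info: 1}}
		}
	}
	if !s.HasAlarm {
		return alarmCounters{State: stateCounters{Info: 1}}
	}
	c := alarmCounters{Alarms: 1}
	switch s.AlarmState {
	case 3:
		c.State.Critical = 1
	case 2:
		c.State.Major = 1
	case 1:
		c.State.Minor = 1
	default:
		c.State.Info = 1
	}
	if s.AlarmAcknowledged {
		c.Acknowledged = 1
	} else {
		c.NotAcknowledged = 1
	}
	return c
}

// addScaled returns a + sign*b, with sign being +1 or -1.
func addScaled(a, b alarmCounters, sign int64) alarmCounters {
	a.Alarms += sign * b.Alarms
	a.State.Critical += sign * b.State.Critical
	a.State.Major += sign * b.State.Major
	a.State.Minor += sign * b.State.Minor
	a.State.Info += sign * b.State.Info
	a.Acknowledged += sign * b.Acknowledged
	a.NotAcknowledged += sign * b.NotAcknowledged
	return a
}

// increments computes the per-watcher deltas and drops the watchers whose
// counters do not change.
func increments(previous, current dependencyState) map[string]alarmCounters {
	result := map[string]alarmCounters{}
	for _, w := range previous.ImpactedWatchers {
		result[w] = addScaled(result[w], countersOf(previous), -1)
	}
	for _, w := range current.ImpactedWatchers {
		result[w] = addScaled(result[w], countersOf(current), 1)
	}
	for w, c := range result {
		if c == (alarmCounters{}) {
			delete(result, w)
		}
	}
	return result
}

func main() {
	// Same situation as the NewWatcher example: only "critical_servers" changes.
	previous := dependencyState{
		ImpactedWatchers: []string{"servers"},
		HasAlarm:         true,
		AlarmState:       2,
	}
	current := previous
	current.ImpactedWatchers = []string{"servers", "critical_servers"}
	fmt.Printf("%+v\n", increments(previous, current))
}
```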

Types

type Adapter

type Adapter interface {
	// GetAll gets all watchers from database
	GetAll(watchers *[]Watcher) error

	// GetAllValidWatchers gets all valid and enabled watchers from database
	GetAllValidWatchers(watchers *[]Watcher) error

	// GetByID finds the watcher by its entity id
	GetByID(id string, watchers *Watcher) error

	// GetEntities gets the entities watched by the watcher
	GetEntities(watcher Watcher, entities *[]types.Entity) error

	// GetAllAnnotatedEntities returns a slice containing all the entities,
	// annotated with their pbehaviors and alarms.
	// Note that this method may allocate a lot of memory. This may be
	// optimised by returning an mgo.Iter object instead of a slice. As of this
	// writing, the code complexity required to use an iterator outweighs the
	// benefits of doing so.
	GetAllAnnotatedEntities() ([]AnnotatedEntity, error)

	// GetAnnotatedDependencies returns a slice containing the dependencies of
	// a watcher annotated with their pbehaviors and alarms.
	// Note that this method may allocate a lot of memory. This may be
	// optimised by returning an mgo.Iter object instead of a slice. As of this
	// writing, the code complexity required to use an iterator outweighs the
	// benefits of doing so.
	GetAnnotatedDependencies(watcherID string) ([]AnnotatedEntity, error)
}

Adapter is an interface that provides methods for database queries regarding watchers and their dependencies.

func NewAdapter

func NewAdapter(
	collection mongo.Collection,
	safeBulk bulk.Bulk,
	entityCollectionName string,
	alarmCollectionName string,
	pbehaviorCollectionName string,
	logger zerolog.Logger,
) Adapter

NewAdapter returns the MongoDB implementation of the watcher Adapter.

type AlarmCounters

type AlarmCounters struct {
	Alarms          int64
	State           StateCounters
	Acknowledged    int64
	NotAcknowledged int64
}

AlarmCounters is a struct containing various counters that are used to determine a watcher's state and output.

func NewAlarmCountersFromState

func NewAlarmCountersFromState(state DependencyState) AlarmCounters

NewAlarmCountersFromState returns an AlarmCounters that corresponds to the state of a single dependency. For example, if this dependency has a critical alarm that has been acknowledged, the Alarms, State.Critical and Acknowledged counters are equal to 1, and the other counters are equal to 0. The counters corresponding to a watcher can be obtained by adding the counters of all its dependencies together.

func (AlarmCounters) Add

Add returns a new AlarmCounters containing the sums of two AlarmCounters.

func (AlarmCounters) IsZero

func (c AlarmCounters) IsZero() bool

IsZero returns true if all the counters of the AlarmCounters are equal to 0.

func (AlarmCounters) Negate

func (c AlarmCounters) Negate() AlarmCounters

Negate returns a new AlarmCounters, with all the counters negated.
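
The following sketch shows how Add, Negate and IsZero can be combined: a watcher's counters are the sum of its dependencies' counters, and removing a dependency amounts to adding its negated counters. The lowercase types and methods are local re-implementations written for this illustration, not the package's code.

```go
package main

import "fmt"

type stateCounters struct{ Critical, Major, Minor, Info int64 }

type alarmCounters struct {
	Alarms          int64
	State           stateCounters
	Acknowledged    int64
	NotAcknowledged int64
}

// add returns the field-by-field sum of two alarmCounters.
func (c alarmCounters) add(o alarmCounters) alarmCounters {
	return alarmCounters{
		Alarms: c.Alarms + o.Alarms,
		State: stateCounters{
			Critical: c.State.Critical + o.State.Critical,
			Major:    c.State.Major + o.State.Major,
			Minor:    c.State.Minor + o.State.Minor,
			Info:     c.State.Info + o.State.Info,
		},
		Acknowledged:    c.Acknowledged + o.Acknowledged,
		NotAcknowledged: c.NotAcknowledged + o.NotAcknowledged,
	}
}

// negate returns a copy with every counter negated.
func (c alarmCounters) negate() alarmCounters {
	return alarmCounters{
		Alarms: -c.Alarms,
		State: stateCounters{
			Critical: -c.State.Critical,
			Major:    -c.State.Major,
			Minor:    -c.State.Minor,
			Info:     -c.State.Info,
		},
		Acknowledged:    -c.Acknowledged,
		NotAcknowledged: -c.NotAcknowledged,
	}
}

// isZero reports whether every counter is equal to 0.
func (c alarmCounters) isZero() bool {
	return c == alarmCounters{}
}

func main() {
	critical := alarmCounters{Alarms: 1, State: stateCounters{Critical: 1}, Acknowledged: 1}
	minor := alarmCounters{Alarms: 1, State: stateCounters{Minor: 1}, NotAcknowledged: 1}

	// Counters of a watcher with two dependencies.
	total := critical.add(minor)
	fmt.Println(total.Alarms) // prints 2

	// Removing the critical dependency leaves only the minor one.
	fmt.Println(total.add(critical.negate()) == minor) // prints true
	fmt.Println(total.add(total.negate()).isZero())    // prints true
}
```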

type AmqpChannelPublisher

type AmqpChannelPublisher interface {
	// Publish sends an amqp.Publishing from the client to an exchange on the server.
	Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
}

AmqpChannelPublisher is an interface that represents a non-consumable AMQP channel. This interface is implemented by amqp.Channel. It should be used in services that only publish to amqp, in order to be able to test them easily by mocking this interface.
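
A minimal sketch of the mocking approach described above follows. To stay self-contained it does not import the AMQP library: the publishing type is a hypothetical stand-in for amqp.Publishing, and recordingPublisher, sendState, the queue name and the message format are invented for the illustration.

```go
package main

import "fmt"

// publishing is a hypothetical stand-in for amqp.Publishing.
type publishing struct {
	Body []byte
}

// publisher has the same shape as AmqpChannelPublisher, using the stand-in type.
type publisher interface {
	Publish(exchange, key string, mandatory, immediate bool, msg publishing) error
}

// recordingPublisher is a test double that records the published bodies
// instead of sending them to a broker.
type recordingPublisher struct {
	bodies []string
}

func (r *recordingPublisher) Publish(exchange, key string, mandatory, immediate bool, msg publishing) error {
	r.bodies = append(r.bodies, string(msg.Body))
	return nil
}

// sendState is a hypothetical service function that only needs to publish.
func sendState(p publisher, watcherID string, state int) error {
	body := fmt.Sprintf(`{"watcher":%q,"state":%d}`, watcherID, state)
	return p.Publish("", "hypothetical_queue", false, false, publishing{Body: []byte(body)})
}

func main() {
	mock := &recordingPublisher{}
	if err := sendState(mock, "servers", 2); err != nil {
		panic(err)
	}
	fmt.Println(mock.bodies[0]) // prints {"watcher":"servers","state":2}
}
```

Because sendState depends on the interface rather than on a concrete channel, the same function can be exercised in tests without a running broker.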

type AnnotatedEntity

type AnnotatedEntity struct {
	types.Entity `bson:"entity"`
	Alarm        *types.Alarm      `bson:"alarm"`
	PBehaviors   []types.PBehavior `bson:"pbehaviors"`
}

AnnotatedEntity is a struct containing an entity, a list of pbehaviors that impact it, and the ongoing alarm on this entity (or nil if there isn't one). The pbehaviors may or may not be active.

type CountersCache

type CountersCache interface {
	// ProcessState processes an entity's update, and updates the counters of
	// the watchers that are (or used to be) impacted by it.
	// It returns a map containing the impacted watchers and the new values of
	// their counters.
	// This method should be safe to run in multiple goroutines, since it is
	// used in the PeriodicalProcess and WorkerProcess of the watcher.
	ProcessState(currentState DependencyState) (map[string]AlarmCounters, error)
}

CountersCache is a type that handles the counting of the alarms and entities that impact watchers.
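
The interface comment requires ProcessState to be safe to call from multiple goroutines. How the redis-backed implementation achieves this is not described here; the sketch below only illustrates the requirement with an in-memory store protected by a mutex. The safeCounters type and its apply method are hypothetical, and the counters are reduced to a single alarm count per watcher.

```go
package main

import (
	"fmt"
	"sync"
)

// safeCounters is a hypothetical in-memory counter store that can be shared
// between goroutines.
type safeCounters struct {
	mu     sync.Mutex
	alarms map[string]int64 // number of alarms per watcher
}

// apply adds the increments to the counters and returns the new values of the
// counters that were touched.
func (s *safeCounters) apply(increments map[string]int64) map[string]int64 {
	s.mu.Lock()
	defer s.mu.Unlock()

	updated := make(map[string]int64, len(increments))
	for watcherID, inc := range increments {
		s.alarms[watcherID] += inc
		updated[watcherID] = s.alarms[watcherID]
	}
	return updated
}

func main() {
	s := &safeCounters{alarms: map[string]int64{}}

	// Simulate concurrent workers updating the same watcher.
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.apply(map[string]int64{"servers": 1})
		}()
	}
	wg.Wait()

	fmt.Println(s.apply(map[string]int64{"servers": 0})["servers"]) // prints 100
}
```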

func NewCountersCache

func NewCountersCache(client *redis.Client, logger zerolog.Logger) CountersCache

NewCountersCache creates a new CountersCache.

type DependencyState

type DependencyState struct {
	EntityID             string
	ImpactedWatchers     []string
	HasAlarm             bool
	AlarmState           int
	AlarmAcknowledged    bool
	ActivePBehaviorTypes map[string]bool
	LastUpdateDate       time.Time
}

DependencyState is a struct containing the information about a dependency that is used to compute a watcher's state.

func NewDependencyState

func NewDependencyState(
	entity types.Entity,
	alarm *types.Alarm,
	pbehaviors []types.PBehavior,
	watchers map[string]Watcher,
	date time.Time,
) DependencyState

NewDependencyState creates a DependencyState given an entity, an (optional) alarm and a list of pbehaviors (active or not) that impact the entity. watchers should be a map containing all the watchers; it is used to filter the entity's impacts so that only the watchers are kept.

FIXME: LastUpdateDate is used to ensure that only the most recent DependencyStates are taken into account. This is necessary because the ComputeAllWatchers method may take a few seconds to run, which could easily lead to a race condition. As of this writing, LastUpdateDate is set to the time the data enters the engine (the time the event was received, or the time the MongoDB query was started). This may break if several instances of the engine are run in parallel.

func (DependencyState) HasActivePBehavior

func (s DependencyState) HasActivePBehavior() bool

HasActivePBehavior returns true if one of the pbehavior types is active on the entity.
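
A check of this kind can be written as a loop over the ActivePBehaviorTypes map. The hasActive function below is a hypothetical sketch; it assumes that a type mapped to false is treated as inactive, which the documentation does not state.

```go
package main

import "fmt"

// hasActive reports whether at least one pbehavior type is marked as active.
// Assumption: a key mapped to false does not count as active.
func hasActive(activeTypes map[string]bool) bool {
	for _, active := range activeTypes {
		if active {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(hasActive(map[string]bool{"maintenance": true}))  // prints true
	fmt.Println(hasActive(map[string]bool{"maintenance": false})) // prints false
	fmt.Println(hasActive(nil))                                   // prints false
}
```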

type ImpactDiff

type ImpactDiff struct {
	All      []string
	Previous map[string]bool
	Current  map[string]bool
}

ImpactDiff is a struct that makes it easy to handle changes in the impact of an entity. See NewImpactDiffFromImpacts for the documentation of its fields.

func NewImpactDiffFromImpacts

func NewImpactDiffFromImpacts(
	previousImpacts []string,
	currentImpacts []string,
) ImpactDiff

NewImpactDiffFromImpacts takes as parameters two lists of impacts (a previous and a current one), and returns an ImpactDiff struct containing:

  • All: a slice containing all the impacts (previous or current)
  • Previous: a map such that Previous[watcherID] is true if watcherID is in the previousImpacts slice
  • Current: a map such that Current[watcherID] is true if watcherID is in the currentImpacts slice

See impact_diff_test.go for an example.

Example
package main

import (
	"encoding/json"
	"fmt"

	"git.canopsis.net/canopsis/go-engines/lib/canopsis/watcher"
)

func main() {
	previousImpacts := []string{"a", "b"}
	currentImpacts := []string{"b", "c"}

	impacts := watcher.NewImpactDiffFromImpacts(
		previousImpacts,
		currentImpacts)

	// Use a name that does not shadow the encoding/json package.
	out, err := json.MarshalIndent(impacts, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
Output:

{
  "All": [
    "a",
    "b",
    "c"
  ],
  "Previous": {
    "a": true,
    "b": true
  },
  "Current": {
    "b": true,
    "c": true
  }
}
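
A structure with these three fields can be built in a single pass over each slice, and then used to classify each watcher as added, removed or unchanged. The sketch below is a hypothetical re-implementation for illustration; in particular, the order of All (first appearance) is an assumption that happens to match the output above.

```go
package main

import "fmt"

// impactDiff mirrors the fields of the documented ImpactDiff struct.
type impactDiff struct {
	All      []string
	Previous map[string]bool
	Current  map[string]bool
}

// newImpactDiff builds the two lookup maps and the deduplicated list of all
// impacts, in order of first appearance.
func newImpactDiff(previous, current []string) impactDiff {
	d := impactDiff{Previous: map[string]bool{}, Current: map[string]bool{}}
	for _, id := range previous {
		if !d.Previous[id] {
			d.Previous[id] = true
			d.All = append(d.All, id)
		}
	}
	for _, id := range current {
		if !d.Current[id] {
			d.Current[id] = true
			if !d.Previous[id] {
				d.All = append(d.All, id)
			}
		}
	}
	return d
}

// change describes what happened to a watcher between the two lists.
func (d impactDiff) change(id string) string {
	switch {
	case d.Previous[id] && !d.Current[id]:
		return "removed"
	case !d.Previous[id] && d.Current[id]:
		return "added"
	default:
		return "unchanged"
	}
}

func main() {
	d := newImpactDiff([]string{"a", "b"}, []string{"b", "c"})
	for _, id := range d.All {
		fmt.Println(id, d.change(id))
	}
	// prints:
	// a removed
	// b unchanged
	// c added
}
```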

type Service

type Service interface {
	// Process updates the watchers impacted by the event and alarmChange
	// This method should be called on each event published by the axe engine,
	// so that the watchers' states are updated in real time.
	Process(ctx context.Context, event types.Event) error

	ProcessResolvedAlarm(ctx context.Context, alarm types.Alarm, entity types.Entity) error

	// UpdateWatcher updates the state of a watcher given its ID.
	// FIXME: the current implementation of this method does not handle
	// dependencies being removed from the watcher.
	UpdateWatcher(ctx context.Context, watcherID string) error

	// ComputeAllWatchers updates the states of all watchers.
	// As of this writing, calling this method regularly (at the beat) is
	// necessary to handle changes in the pbehaviors and in the context-graph,
	// since those changes do not trigger an event.
	// FIXME: the current implementation of this method does not handle
	// entities being disabled or removed from the database.
	ComputeAllWatchers(ctx context.Context) error
}

Service is the interface implemented by the watcher service, which processes events, computes the watchers' states accordingly, and sends events to the axe engine so that the watchers' alarms are updated. This service does not handle the links (impact and depends) between the watchers and their dependencies. These links are handled by the context builder, which is used by the che engine.

func NewService

func NewService(
	pubChannel AmqpChannelPublisher,
	pubExchangeName, pubQueueName string,
	jsonEncoder encoding.Encoder,
	watcherAdapter Adapter,
	alarmAdapter alarm.Adapter,
	pbehaviorService pbehavior.Service,
	countersCache CountersCache,
	logger zerolog.Logger,
) Service

NewService returns the watcher service.

type StateCounters

type StateCounters struct {
	Critical int64
	Major    int64
	Minor    int64
	Info     int64
}

StateCounters is a struct containing the number of alarms in each state that impact a watcher.

func NewStateCountersFromState

func NewStateCountersFromState(state DependencyState) StateCounters

NewStateCountersFromState returns a StateCounters that corresponds to the state of a single dependency. For example, if the state of this dependency is critical (and it does not have any active pbehavior), the Critical counter is equal to 1, and the other counters are equal to 0. The counters corresponding to a watcher can be obtained by adding the counters of all its dependencies together.

func (StateCounters) Add

Add returns a new StateCounters containing the sums of two StateCounters.

func (StateCounters) IsZero

func (c StateCounters) IsZero() bool

IsZero returns true if all the counters of the StateCounters are equal to 0.

func (StateCounters) Negate

func (c StateCounters) Negate() StateCounters

Negate returns a new StateCounters, with all the counters negated.

type Watcher

type Watcher struct {
	types.Entity   `bson:",inline"`          // inherits from entity
	Entities       pattern.EntityPatternList `bson:"entities"`
	State          map[string]interface{}    `bson:"state"`
	OutputTemplate string                    `bson:"output_template"`
}

Watcher is a structure representing a watcher type entity document.

func (Watcher) CheckEntityInWatcher

func (w Watcher) CheckEntityInWatcher(entity types.Entity) bool

CheckEntityInWatcher checks if the entity is watched by the watcher. It returns true when the entity is matched by a pattern in the watcher, false otherwise.

func (Watcher) GetOutput

func (w Watcher) GetOutput(counters AlarmCounters) (string, error)

GetOutput returns the output of the watcher.
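
The documentation does not say how the output is built from the counters. Assuming that OutputTemplate is a Go text/template evaluated against the AlarmCounters (an assumption, not something stated on this page), the rendering could look like the sketch below. The renderOutput function and the template string are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
	"text/template"
)

// Local copies of the documented counter structs.
type stateCounters struct{ Critical, Major, Minor, Info int64 }

type alarmCounters struct {
	Alarms          int64
	State           stateCounters
	Acknowledged    int64
	NotAcknowledged int64
}

// renderOutput parses the template and executes it with the counters as data.
func renderOutput(outputTemplate string, counters alarmCounters) (string, error) {
	tmpl, err := template.New("output").Parse(outputTemplate)
	if err != nil {
		return "", err
	}
	var b strings.Builder
	if err := tmpl.Execute(&b, counters); err != nil {
		return "", err
	}
	return b.String(), nil
}

func main() {
	counters := alarmCounters{
		Alarms:          2,
		State:           stateCounters{Critical: 2},
		Acknowledged:    1,
		NotAcknowledged: 1,
	}
	out, err := renderOutput("{{.State.Critical}} critical, {{.Acknowledged}} acknowledged", counters)
	if err != nil {
		panic(err)
	}
	fmt.Println(out) // prints 2 critical, 1 acknowledged
}
```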

func (Watcher) GetState

func (w Watcher) GetState(counters AlarmCounters) (int, error)

GetState returns the state of the watcher.
