kube_events_manager

package
v0.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 5, 2023 License: Apache-2.0 Imports: 31 Imported by: 0

Documentation

Index

Constants

View Source
// Resync period tuning values. RandomizedResyncPeriod is documented as
// returning a duration between 2 and 4 hours, which is consistent with a
// 3-hour median and a 2-hour total spread.
// NOTE(review): the consumer of these constants is not visible here — confirm.
const (
	// ResyncPeriodMedian is 3 hours.
	ResyncPeriodMedian = 3 * time.Hour

	// ResyncPeriodSpread is 2 hours.
	ResyncPeriodSpread = 2 * time.Hour

	// ResyncPeriodGranularity is 5 minutes.
	ResyncPeriodGranularity = 5 * time.Minute

	// ResyncPeriodJitterGranularity is 15 seconds.
	ResyncPeriodJitterGranularity = 15 * time.Second
)

Variables

View Source
// NewKubeEventsManager constructs a kubeEventsManager with an empty Monitors
// map and a KubeEventCh channel buffered for one KubeEvent. It is declared as
// a variable holding a function literal rather than as a plain func.
var NewKubeEventsManager = func() *kubeEventsManager {
	return &kubeEventsManager{
		KubeEventCh: make(chan KubeEvent, 1),
		Monitors:    map[string]Monitor{},
		m:           sync.RWMutex{},
	}
}

NewKubeEventsManager returns an implementation of KubeEventsManager.

View Source
// NewMonitor returns a *monitor, typed as Monitor, whose informer slice and
// lookup maps are allocated (non-nil) and empty.
var NewMonitor = func() Monitor {
	mon := &monitor{}
	mon.ResourceInformers = []ResourceInformer{}
	mon.VaryingInformers = map[string][]ResourceInformer{}
	mon.cancelForNs = map[string]context.CancelFunc{}
	mon.staticNamespaces = map[string]bool{}
	return mon
}
View Source
// NewNamespaceInformer returns a *namespaceInformer, typed as
// NamespaceInformer, that keeps the given config in its Monitor field and
// starts with an empty ExistedObjects map.
var NewNamespaceInformer = func(monitor *MonitorConfig) NamespaceInformer {
	return &namespaceInformer{
		ExistedObjects: map[string]bool{},
		Monitor:        monitor,
	}
}
View Source
// NewResourceInformer returns a *resourceInformer, typed as ResourceInformer,
// bound to the given config. The object cache map is allocated empty, both
// locks are zero-valued, and both CachedObjectsInfo counters start zeroed.
var NewResourceInformer = func(monitor *MonitorConfig) ResourceInformer {
	return &resourceInformer{
		Monitor: monitor,

		// Object cache and the lock declared alongside it.
		cachedObjects: map[string]*ObjectAndFilterResult{},
		cacheLock:     sync.RWMutex{},

		eventBufLock: sync.Mutex{},

		// Two separate counter sets, each starting at zero.
		cachedObjectsInfo:      new(CachedObjectsInfo),
		cachedObjectsIncrement: new(CachedObjectsInfo),
	}
}

Functions

func ApplyFilter

func ApplyFilter(jqFilter string, filterFn func(obj *unstructured.Unstructured) (result interface{}, err error), obj *unstructured.Unstructured) (*ObjectAndFilterResult, error)

ApplyFilter filters the object's JSON representation with a jq expression, calculates a checksum over the result, and returns an ObjectAndFilterResult. If jqFilter is empty, no filtering is applied and the checksum is calculated over the full JSON representation of the object.

func FormatFieldSelector

func FormatFieldSelector(selector *FieldSelector) (string, error)

func FormatLabelSelector

func FormatLabelSelector(selector *metav1.LabelSelector) (string, error)

func IsExpiredError

func IsExpiredError(err error) bool

IsExpiredError is a private method from k8s.io/client-go/tools/cache.

func RandomizedResyncPeriod

func RandomizedResyncPeriod() time.Duration

RandomizedResyncPeriod returns a time.Duration between 2 hours and 4 hours with jitter and granularity

func ResourceId

func ResourceId(obj *unstructured.Unstructured) string

ResourceId describes an object by its namespace, kind and name.

Change with caution, as this string is used for sorting objects and snapshots.

Types

type CachedObjectsInfo

// CachedObjectsInfo is a set of five uint64 counters, each serialized to JSON
// under the lowercase form of its field name.
// NOTE(review): the exact event each counter tracks (e.g. what "Cleaned"
// counts) is not visible here — confirm against the informer code.
type CachedObjectsInfo struct {
	Count    uint64 `json:"count"`
	Added    uint64 `json:"added"`
	Deleted  uint64 `json:"deleted"`
	Modified uint64 `json:"modified"`
	Cleaned  uint64 `json:"cleaned"`
}

CachedObjectsInfo stores counters of operations over resources in Monitors and Informers.

func (*CachedObjectsInfo) Add

type Factory

type Factory struct {
	// contains filtered or unexported fields
}

type FactoryIndex

// FactoryIndex bundles a GroupVersionResource with a namespace and string
// forms of a field selector and a label selector. FactoryStore.Start and
// FactoryStore.Stop take it as their index argument.
// NOTE(review): presumably used as a lookup key for shared informer
// factories — confirm against FactoryStore's implementation.
type FactoryIndex struct {
	GVR           schema.GroupVersionResource
	Namespace     string
	FieldSelector string
	LabelSelector string
}

type FactoryStore

type FactoryStore struct {
	// contains filtered or unexported fields
}
// DefaultFactoryStore is a package-level *FactoryStore. This declaration gives
// it no initializer.
// NOTE(review): where it gets assigned is not visible here — confirm.
var DefaultFactoryStore *FactoryStore

// DefaultSyncTime is 100 milliseconds.
// NOTE(review): its consumer is not visible here — confirm.
var DefaultSyncTime = 100 * time.Millisecond

func NewFactoryStore

func NewFactoryStore() *FactoryStore

func (*FactoryStore) Start

func (c *FactoryStore) Start(client dynamic.Interface, index FactoryIndex, handler cache.ResourceEventHandler, errorHandler *WatchErrorHandler) error

func (*FactoryStore) Stop

func (c *FactoryStore) Stop(index FactoryIndex)

type KubeEventsManager

// KubeEventsManager is the interface for configuring a manager (context,
// metric storage, kube client), operating on monitors addressed by a string
// monitor ID, and obtaining a channel of KubeEvent values.
type KubeEventsManager interface {
	// Dependency setters.
	WithContext(ctx context.Context)
	WithMetricStorage(mstor *metric_storage.MetricStorage)
	WithKubeClient(client klient.Client)
	// Monitor operations; AddMonitor and StopMonitor can return an error.
	AddMonitor(monitorConfig *MonitorConfig) error
	HasMonitor(monitorID string) bool
	GetMonitor(monitorID string) Monitor
	StartMonitor(monitorID string)
	StopMonitor(monitorID string) error

	// Ch returns a channel of KubeEvent values.
	Ch() chan KubeEvent
	// NOTE(review): presumably forwards to each Monitor's PauseHandleEvents — confirm.
	PauseHandleEvents()
}

type Monitor

// Monitor is the interface for a single monitor: it is configured through the
// With* setters, builds its informers with CreateInformers, and exposes
// snapshots of ObjectAndFilterResult values plus CachedObjectsInfo counters.
type Monitor interface {
	// Dependency and configuration setters.
	WithContext(ctx context.Context)
	WithKubeClient(client klient.Client)
	WithMetricStorage(mstor *metric_storage.MetricStorage)
	WithConfig(config *MonitorConfig)
	WithKubeEventCb(eventCb func(KubeEvent))
	// CreateInformers can fail and reports that through its error result.
	CreateInformers() error
	Start(context.Context)
	Stop()
	PauseHandleEvents()
	// Snapshot returns ObjectAndFilterResult values (not pointers).
	Snapshot() []ObjectAndFilterResult
	// NOTE(review): presumably turns on delivery to the callback given to
	// WithKubeEventCb — confirm against the implementation.
	EnableKubeEventCb()
	GetConfig() *MonitorConfig
	// SnapshotOperations returns two counter sets, named total and last.
	SnapshotOperations() (total *CachedObjectsInfo, last *CachedObjectsInfo)
}

type MonitorConfig

// MonitorConfig describes one monitor: identifying metadata, the watched
// ApiVersion/Kind, name/namespace/label/field selectors, and filtering
// options. The constructors NewNamespaceInformer and NewResourceInformer take
// a *MonitorConfig.
type MonitorConfig struct {
	// Metadata carries the monitor's ID, a debug name, and label maps for
	// logs and metrics.
	Metadata struct {
		MonitorId    string
		DebugName    string
		LogLabels    map[string]string
		MetricLabels map[string]string
	}
	EventTypes              []WatchEventType
	ApiVersion              string
	Kind                    string
	NameSelector            *NameSelector
	NamespaceSelector       *NamespaceSelector
	LabelSelector           *metav1.LabelSelector
	FieldSelector           *FieldSelector
	// NOTE(review): JqFilter and FilterFunc have the same types as
	// ApplyFilter's jqFilter and filterFn parameters — presumably passed
	// through to it; confirm.
	JqFilter                string
	LogEntry                *log.Entry
	Mode                    KubeEventMode
	KeepFullObjectsInMemory bool
	FilterFunc              func(*unstructured.Unstructured) (interface{}, error)
}

MonitorConfig is a config that suits the latest version of OnKubernetesEventConfig.

func (*MonitorConfig) AddFieldSelectorRequirement

func (c *MonitorConfig) AddFieldSelectorRequirement(field string, op string, value string)

func (*MonitorConfig) IsAnyNamespace

func (c *MonitorConfig) IsAnyNamespace() bool

func (*MonitorConfig) Names

func (c *MonitorConfig) Names() []string

Names returns names of monitored objects if nameSelector.matchNames is defined in config.

func (*MonitorConfig) Namespaces

func (c *MonitorConfig) Namespaces() (nsNames []string)

Namespaces returns names of namespaces if namespace.nameSelector is defined in config.

If no namespace is specified, or there is no namespace.nameSelector, or the length of namespace.nameSelector.matchNames is 0, then an empty string is returned to monitor all namespaces.

If namespace.labelSelector is specified, an empty array is returned.

func (*MonitorConfig) WithEventTypes

func (c *MonitorConfig) WithEventTypes(types []WatchEventType) *MonitorConfig

func (*MonitorConfig) WithFieldSelector

func (c *MonitorConfig) WithFieldSelector(fieldSel *FieldSelector)

WithFieldSelector copies input FieldSelector into monitor.FieldSelector

func (*MonitorConfig) WithLabelSelector

func (c *MonitorConfig) WithLabelSelector(labelSel *metav1.LabelSelector)

WithLabelSelector copies input LabelSelector into monitor.LabelSelector

func (*MonitorConfig) WithMode

func (c *MonitorConfig) WithMode(mode KubeEventMode)

func (*MonitorConfig) WithNameSelector

func (c *MonitorConfig) WithNameSelector(nSel *NameSelector)

WithNameSelector copies input NameSelector into monitor.NameSelector

func (*MonitorConfig) WithNamespaceSelector

func (c *MonitorConfig) WithNamespaceSelector(nsSel *NamespaceSelector)

WithNamespaceSelector copies input NamespaceSelector into monitor.NamespaceSelector

type NamespaceInformer

// NamespaceInformer is the interface returned by NewNamespaceInformer. It is
// given a context and kube client, creates a shared informer with add/delete
// callbacks, and exposes a map of existing objects.
type NamespaceInformer interface {
	WithContext(ctx context.Context)
	WithKubeClient(client klient.Client)
	// CreateSharedInformer takes two callbacks that each receive a string.
	// NOTE(review): presumably the namespace name on add/delete — confirm.
	CreateSharedInformer(addFn func(string), delFn func(string)) error
	// GetExistedObjects returns a map keyed by string with bool values.
	GetExistedObjects() map[string]bool
	Start()
	Stop()
	PauseHandleEvents()
}

type ResourceInformer

// ResourceInformer is the interface returned by NewResourceInformer. It is
// configured with a context, kube client, metric storage, a namespace, a name
// and a KubeEvent callback, and exposes its cached objects and
// CachedObjectsInfo counters.
type ResourceInformer interface {
	// Dependency and scope setters.
	WithContext(ctx context.Context)
	WithKubeClient(client klient.Client)
	WithMetricStorage(mstor *metric_storage.MetricStorage)
	WithNamespace(string)
	WithName(string)
	WithKubeEventCb(eventCb func(KubeEvent))
	// CreateSharedInformer can fail and reports that through its error result.
	CreateSharedInformer() error
	// CachedObjects returns ObjectAndFilterResult values (not pointers).
	CachedObjects() []ObjectAndFilterResult
	EnableKubeEventCb() // Call it to use KubeEventCb to emit events.
	Start()
	Stop()
	PauseHandleEvents()
	// Both counter accessors return CachedObjectsInfo by value.
	// NOTE(review): presumably the "Increment" variant reports changes since
	// the previous read — confirm against the implementation.
	CachedObjectsInfo() CachedObjectsInfo
	CachedObjectsInfoIncrement() CachedObjectsInfo
}

ResourceInformer is a kube informer for particular onKubernetesEvent

type WatchErrorHandler

type WatchErrorHandler struct {
	// contains filtered or unexported fields
}

func NewWatchErrorHandler

func NewWatchErrorHandler(description string, kind string, logLabels map[string]string, metricStorage *metric_storage.MetricStorage) *WatchErrorHandler

func (*WatchErrorHandler) Handler

func (weh *WatchErrorHandler) Handler(_ *cache.Reflector, err error)

Handler is the implementation of WatchErrorHandler that is aware of monitors and metricStorage

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL