runtime

package
v0.0.0-...-38df0df

This package is not in the latest version of its module.

Published: Feb 6, 2026 License: MIT Imports: 11 Imported by: 0

Documentation

Functions

func WithPanicRecovery

func WithPanicRecovery(observerName string, fn func() error) (err error)

WithPanicRecovery wraps a function with panic recovery. Returns the original error if no panic occurred. Returns a panic error if a panic was caught.
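The behaviour described above can be sketched with a deferred `recover` and a named return value. This is a minimal illustration, not the package's source: `withPanicRecovery`, `errBoom`, and the error message format are hypothetical names and choices made for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// withPanicRecovery is a hypothetical stand-in for WithPanicRecovery.
// It runs fn and converts any panic into an ordinary error.
func withPanicRecovery(observerName string, fn func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			// The named return value lets the deferred function
			// replace the result after fn has panicked.
			err = fmt.Errorf("observer %q panicked: %v", observerName, r)
		}
	}()
	return fn()
}

// errBoom is an example error used to show pass-through behaviour.
var errBoom = errors.New("boom")

func main() {
	// No panic: the original error passes through unchanged.
	err := withPanicRecovery("network", func() error { return errBoom })
	fmt.Println(errors.Is(err, errBoom))

	// Panic: the caller receives an error instead of crashing.
	err = withPanicRecovery("network", func() error { panic("bad state") })
	fmt.Println(err)
}
```

The named result `err` is what makes the pattern work: a deferred function can only change the value returned to the caller if that value has a name.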

func WithPanicRecoveryAndLog

func WithPanicRecoveryAndLog(observerName string, fn func() error, logFn func(observerName string, panicErr error)) (err error)

WithPanicRecoveryAndLog wraps a function with panic recovery and logging. Calls the log function when a panic is caught.

func WithPanicRecoveryTimeout

func WithPanicRecoveryTimeout(observerName string, fn func(ctx context.Context) error, timeout time.Duration) error

WithPanicRecoveryTimeout wraps a function with panic recovery and timeout. The provided function must accept a context and should respect its cancellation. Returns a timeout error if the function doesn't complete in time.

Types

type BackpressureConfig

type BackpressureConfig struct {
	// QueueSize is the maximum number of events to buffer
	QueueSize int

	// DropPolicy defines what to drop when queue is full
	DropPolicy DropPolicy

	// MaxRetries is the maximum number of retry attempts for critical emitter failures
	// 0 = no retries (drop immediately on failure)
	// N = retry up to N times before dropping
	MaxRetries int

	// DrainInterval is how often to drain the queue and emit events
	// Lower values = lower latency but more CPU usage
	// Higher values = higher latency but less CPU usage
	DrainInterval time.Duration
}

BackpressureConfig defines queue and backpressure behavior.

type BoundedQueue

type BoundedQueue struct {
	// contains filtered or unexported fields
}

BoundedQueue implements a fixed-size queue with configurable drop policies. Thread-safe for concurrent use. All rng operations are protected by mu.

func NewBoundedQueue

func NewBoundedQueue(config BackpressureConfig) *BoundedQueue

NewBoundedQueue creates a new bounded queue with the given configuration

func (*BoundedQueue) Cap

func (q *BoundedQueue) Cap() int

Cap returns the maximum capacity of the queue

func (*BoundedQueue) Dequeue

func (q *BoundedQueue) Dequeue() *domain.ObserverEvent

Dequeue removes and returns the oldest event from the queue. Returns nil if queue is empty.

func (*BoundedQueue) Enqueue

func (q *BoundedQueue) Enqueue(event *domain.ObserverEvent) bool

Enqueue adds an event to the queue. Returns true if event was added, false if it was dropped.

func (*BoundedQueue) IsEmpty

func (q *BoundedQueue) IsEmpty() bool

IsEmpty returns true if queue is empty

func (*BoundedQueue) IsFull

func (q *BoundedQueue) IsFull() bool

IsFull returns true if queue is at capacity

func (*BoundedQueue) Len

func (q *BoundedQueue) Len() int

Len returns the current number of events in the queue
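To make the drop policies concrete, here is a small self-contained sketch of a mutex-guarded bounded queue. It is not the package's implementation: `event`, `boundedQueue`, and the lowercase method names are hypothetical stand-ins, and it assumes that Enqueue reports true whenever the incoming event ends up stored, including when an older event was evicted to make room.

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
)

// event is a hypothetical stand-in for *domain.ObserverEvent.
type event struct{ ID int }

type dropPolicy string

const (
	dropOldest dropPolicy = "oldest"
	dropNewest dropPolicy = "newest"
	dropRandom dropPolicy = "random"
)

// boundedQueue is an illustrative fixed-capacity FIFO guarded by a mutex.
type boundedQueue struct {
	mu       sync.Mutex
	items    []*event
	capacity int
	policy   dropPolicy
}

func newBoundedQueue(capacity int, policy dropPolicy) *boundedQueue {
	if capacity < 0 {
		capacity = 0
	}
	return &boundedQueue{items: make([]*event, 0, capacity), capacity: capacity, policy: policy}
}

// enqueue reports whether the incoming event was stored.
func (q *boundedQueue) enqueue(e *event) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.capacity == 0 {
		return false
	}
	if len(q.items) < q.capacity {
		q.items = append(q.items, e)
		return true
	}
	if q.policy == dropNewest {
		return false // reject the incoming event, preserve history
	}
	// dropOldest evicts the head; dropRandom evicts a random slot.
	// Unknown policies fall back to dropOldest in this sketch.
	victim := 0
	if q.policy == dropRandom {
		victim = rand.Intn(len(q.items))
	}
	copy(q.items[victim:], q.items[victim+1:])
	q.items[len(q.items)-1] = e
	return true
}

// dequeue removes and returns the oldest event, or nil if the queue is empty.
func (q *boundedQueue) dequeue() *event {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return nil
	}
	e := q.items[0]
	copy(q.items, q.items[1:])
	q.items[len(q.items)-1] = nil // release the reference for the GC
	q.items = q.items[:len(q.items)-1]
	return e
}

func main() {
	q := newBoundedQueue(2, dropOldest)
	for id := 1; id <= 3; id++ {
		q.enqueue(&event{ID: id})
	}
	// Event 1 was evicted to admit event 3.
	fmt.Println(q.dequeue().ID, q.dequeue().ID)
}
```

A slice with `copy` keeps the example short. A ring buffer with head and tail indices would avoid the shifting cost for large queues.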

type Config

type Config struct {
	// Name of the observer (e.g., "network", "node", "deployments")
	Name string

	// Sampling configuration
	Sampling SamplingConfig

	// Backpressure configuration
	Backpressure BackpressureConfig

	// Health check configuration
	Health HealthConfig

	// Failure handling configuration
	Failure FailureConfig

	// Metrics configuration
	Metrics MetricsConfig
}

Config holds the configuration for an ObserverRuntime instance.

func DefaultConfig

func DefaultConfig(name string) Config

DefaultConfig returns a config with sensible defaults.

func (*Config) Validate

func (c *Config) Validate() error

Validate checks if the config is valid.

type DropPolicy

type DropPolicy string

DropPolicy defines what to do when queue is full.

const (
	// DropOldest drops oldest events first (keep recent)
	DropOldest DropPolicy = "oldest"

	// DropNewest drops incoming events (preserve history)
	DropNewest DropPolicy = "newest"

	// DropRandom drops random events
	DropRandom DropPolicy = "random"
)

type EBPFManager

type EBPFManager interface {
	AttachKprobe(progName, symbol string) error
	AttachKretprobe(progName, symbol string) error
	AttachTracepoint(progName, group, name string) error
	AttachTracepointWithProgram(prog *ebpf.Program, group, name string) error
	OpenRingBuffer(mapName string) error
	ReadEvents(ctx context.Context, handler func([]byte) error) error
	GetMap(name string) (*ebpf.Map, error)
	Close() error
	WaitForReady(timeout time.Duration) error
}

EBPFManager is the interface for managing eBPF programs. It is implemented by base.EBPFManager.

type EBPFRuntime

type EBPFRuntime struct {
	// contains filtered or unexported fields
}

EBPFRuntime manages eBPF program lifecycle

func NewEBPFRuntime

func NewEBPFRuntime(manager EBPFManager) *EBPFRuntime

NewEBPFRuntime creates a new eBPF runtime with the given manager

func (*EBPFRuntime) IsHealthy

func (r *EBPFRuntime) IsHealthy() bool

IsHealthy returns true if the runtime is running

func (*EBPFRuntime) Start

func (r *EBPFRuntime) Start(ctx context.Context, handler func([]byte) error) error

Start begins reading events from the eBPF ring buffer

func (*EBPFRuntime) Stop

func (r *EBPFRuntime) Stop() error

Stop gracefully stops the eBPF runtime

type EventProcessor

type EventProcessor interface {
	// Process converts raw event bytes to a domain event.
	// Returns nil if event should be ignored (not interesting or not recognized).
	// Returns error only for unrecoverable processing errors.
	//
	// Thread-safety: Process may be called concurrently from multiple goroutines.
	// Implementations must be thread-safe if they maintain state.
	Process(ctx context.Context, rawEvent []byte) (*domain.ObserverEvent, error)

	// Name returns the processor name for logging and metrics.
	// Should be lowercase with hyphens (e.g., "network", "node-pmc", "container-runtime").
	Name() string

	// Setup is called once before the runtime starts processing events.
	// Use this for initialization that can't be done in the constructor:
	//   - Loading configuration from cfg
	//   - Establishing connections (K8s informers, context service)
	//   - Pre-warming caches
	//   - Starting background goroutines
	//
	// Config is passed so processor can access runtime configuration.
	// If Setup returns an error, the runtime will not start.
	Setup(ctx context.Context, cfg Config) error

	// Teardown is called once after the runtime stops processing events.
	// Use this for cleanup:
	//   - Closing connections
	//   - Flushing buffers
	//   - Releasing resources
	//
	// Teardown is called even if Setup failed.
	// Teardown errors are logged but don't prevent shutdown.
	Teardown(ctx context.Context) error
}

EventProcessor is the core interface that all observers must implement. This is the ONLY observer-specific code - all infrastructure is handled by ObserverRuntime.

Examples:

  • NetworkProcessor: Parses eBPF network events, runs DNS/Link/Status detection
  • NodeProcessor: Parses eBPF PMC events, calculates IPC and memory stalls
  • ContainerProcessor: Parses eBPF container events, detects OOM/Exit patterns
  • DeploymentsProcessor: Watches K8s deployments, detects rollout issues

The processor should be PURE BUSINESS LOGIC with no infrastructure concerns:

  • No eBPF loading/management
  • No K8s informer management
  • No OTLP/POLKU export
  • No metrics collection

Just: Parse raw event → Apply observer logic → Return domain event (or nil)

type FailureConfig

type FailureConfig struct {
	// Policy defines how to handle failures
	Policy FailurePolicy

	// Retry configuration
	Retry RetryConfig
}

FailureConfig defines failure handling and retry behavior.

type FailurePolicy

type FailurePolicy string

FailurePolicy defines failure handling behavior.

const (
	// FailPolicyIsolate continues other observers if this one fails
	// Recommended for production (graceful degradation)
	FailPolicyIsolate FailurePolicy = "isolate"

	// FailPolicyRestart retries with exponential backoff
	FailPolicyRestart FailurePolicy = "restart"

	// FailPolicyFailFast crashes entire binary on first failure
	// Use for critical observers that must work
	FailPolicyFailFast FailurePolicy = "fail_fast"
)

type HealthChecker

type HealthChecker struct {
	// contains filtered or unexported fields
}

HealthChecker monitors observer health

func NewHealthChecker

func NewHealthChecker(config HealthConfig, observerName string) *HealthChecker

NewHealthChecker creates a new health checker

func (*HealthChecker) GetStatus

func (h *HealthChecker) GetStatus() HealthStatus

GetStatus returns the current health status

func (*HealthChecker) IsHealthy

func (h *HealthChecker) IsHealthy() bool

IsHealthy returns true if observer is currently healthy

func (*HealthChecker) LastCheck

func (h *HealthChecker) LastCheck() time.Time

LastCheck returns the time of the last health check

func (*HealthChecker) MarkHealthy

func (h *HealthChecker) MarkHealthy()

MarkHealthy marks the observer as healthy

func (*HealthChecker) MarkUnhealthy

func (h *HealthChecker) MarkUnhealthy(reason string)

MarkUnhealthy marks the observer as unhealthy with a reason

func (*HealthChecker) Run

func (h *HealthChecker) Run(ctx context.Context)

Run starts the health checker and blocks until context is cancelled
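The mark/query methods above suggest a status value guarded by a lock. The following sketch shows that idea in isolation. `healthTracker` is a hypothetical, simplified stand-in for HealthChecker: it has no Run loop, and the choice to start in the healthy state is an assumption.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// healthStatus mirrors the fields of HealthStatus.
type healthStatus struct {
	Healthy      bool
	Reason       string
	ObserverName string
	LastCheck    time.Time
}

// healthTracker is a hypothetical, simplified stand-in for HealthChecker.
type healthTracker struct {
	mu     sync.RWMutex
	status healthStatus
}

// newHealthTracker starts in the healthy state (an assumption of this sketch).
func newHealthTracker(observerName string) *healthTracker {
	return &healthTracker{status: healthStatus{Healthy: true, ObserverName: observerName}}
}

func (h *healthTracker) markHealthy() {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.status.Healthy = true
	h.status.Reason = ""
	h.status.LastCheck = time.Now()
}

func (h *healthTracker) markUnhealthy(reason string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.status.Healthy = false
	h.status.Reason = reason
	h.status.LastCheck = time.Now()
}

// getStatus returns a copy, so callers never hold a reference into guarded state.
func (h *healthTracker) getStatus() healthStatus {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.status
}

func main() {
	h := newHealthTracker("network")
	h.markUnhealthy("ring buffer closed")
	s := h.getStatus()
	fmt.Println(s.ObserverName, s.Healthy, s.Reason)
}
```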

type HealthConfig

type HealthConfig struct {
	// Enabled turns health checking on/off
	Enabled bool

	// CheckInterval is how often to run health checks
	CheckInterval time.Duration
}

HealthConfig defines health checking behavior.

type HealthStatus

type HealthStatus struct {
	Healthy      bool
	Reason       string
	ObserverName string
	LastCheck    time.Time
}

HealthStatus represents the current health status

type K8sRuntime

type K8sRuntime struct {
	// contains filtered or unexported fields
}

K8sRuntime manages Kubernetes informer lifecycle

func NewK8sRuntime

func NewK8sRuntime() *K8sRuntime

NewK8sRuntime creates a new K8s runtime

func (*K8sRuntime) AddInformer

func (r *K8sRuntime) AddInformer(informer cache.SharedIndexInformer, handlers cache.ResourceEventHandlerFuncs) error

AddInformer registers an informer with event handlers

func (*K8sRuntime) Start

func (r *K8sRuntime) Start(ctx context.Context) error

Start starts all registered informers

func (*K8sRuntime) Stop

func (r *K8sRuntime) Stop()

Stop stops all informers

func (*K8sRuntime) WaitForCacheSync

func (r *K8sRuntime) WaitForCacheSync(ctx context.Context) error

WaitForCacheSync waits for all informers to sync

type MetricsConfig

type MetricsConfig struct {
	// Enabled turns metrics collection on/off
	Enabled bool

	// AllowedLabels defines which labels to include in metrics
	// Use this to control cardinality
	// Example: ["namespace", "observer_type"] but NOT ["pod_name", "container_id"]
	AllowedLabels []string
}

MetricsConfig defines metrics collection behavior.
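The cardinality control that AllowedLabels describes amounts to filtering a label set against an allowlist before recording a metric. A minimal sketch of that filtering follows; `filterLabels` is a hypothetical helper, not part of this package, and the label values are invented.

```go
package main

import "fmt"

// filterLabels returns only the labels whose keys appear in allowed.
// It is a hypothetical helper illustrating an allowlist.
func filterLabels(allowed []string, labels map[string]string) map[string]string {
	out := make(map[string]string, len(allowed))
	for _, key := range allowed {
		if value, ok := labels[key]; ok {
			out[key] = value
		}
	}
	return out
}

func main() {
	allowed := []string{"namespace", "observer_type"}
	labels := map[string]string{
		"namespace":     "prod",
		"observer_type": "network",
		"pod_name":      "api-7d9f", // high cardinality: filtered out
	}
	fmt.Println(filterLabels(allowed, labels))
}
```

Dropping per-pod and per-container labels keeps the number of distinct time series bounded by the number of namespaces and observer types.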

type ObserverRuntime

type ObserverRuntime struct {
	// contains filtered or unexported fields
}

ObserverRuntime is the unified infrastructure for all observers.

func NewObserverRuntime

func NewObserverRuntime(processor EventProcessor, opts ...Option) (*ObserverRuntime, error)

NewObserverRuntime creates a new runtime with the given processor and options.

func (*ObserverRuntime) CausalityTracker

func (r *ObserverRuntime) CausalityTracker() *base.CausalityTracker

CausalityTracker returns the causality tracker for this runtime. Observers can use this to query parent spans and build causality chains.

func (*ObserverRuntime) GetParentSpanForEntity

func (r *ObserverRuntime) GetParentSpanForEntity(entityID string) string

GetParentSpanForEntity returns the parent span ID for a given entity. Entity ID format: "namespace/name" for K8s resources, "ip:port" for network endpoints. Returns empty string if no parent span is tracked for this entity.

func (*ObserverRuntime) IsHealthy

func (r *ObserverRuntime) IsHealthy() bool

IsHealthy returns true if runtime is running

func (*ObserverRuntime) ProcessEvent

func (r *ObserverRuntime) ProcessEvent(ctx context.Context, rawEvent []byte) error

ProcessEvent processes a raw event through the processor and emits to emitters

func (*ObserverRuntime) Run

func (r *ObserverRuntime) Run(ctx context.Context) error

Run starts the observer runtime and blocks until context is cancelled.

type Option

type Option func(*ObserverRuntime)

Option is a functional option for configuring ObserverRuntime

func WithEmitters

func WithEmitters(emitters ...domain.EventEmitter) Option

WithEmitters configures the event emitters to use for event emission. All observers emit through domain.EventEmitter (the universal gateway).

func WithSamplingDisabled

func WithSamplingDisabled() Option

WithSamplingDisabled disables event sampling (useful for tests)
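Option follows Go's functional options pattern: the constructor applies defaults, then runs each option against the value under construction. The sketch below shows the pattern with local types. `runtimeSketch`, its fields, and the use of strings in place of domain.EventEmitter are hypothetical simplifications, and the default of sampling being enabled is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// runtimeSketch is a hypothetical stand-in for ObserverRuntime.
type runtimeSketch struct {
	processorName   string
	emitters        []string // stand-in for []domain.EventEmitter
	samplingEnabled bool
}

// option mutates a runtimeSketch during construction.
type option func(*runtimeSketch)

func withEmitters(names ...string) option {
	return func(r *runtimeSketch) { r.emitters = append(r.emitters, names...) }
}

func withSamplingDisabled() option {
	return func(r *runtimeSketch) { r.samplingEnabled = false }
}

// newRuntime applies defaults first, then each option in order.
func newRuntime(processorName string, opts ...option) (*runtimeSketch, error) {
	if processorName == "" {
		return nil, errors.New("processor name is required")
	}
	r := &runtimeSketch{processorName: processorName, samplingEnabled: true}
	for _, opt := range opts {
		opt(r)
	}
	return r, nil
}

func main() {
	r, err := newRuntime("network", withEmitters("otlp"), withSamplingDisabled())
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(r.processorName, r.emitters, r.samplingEnabled)
}
```

Because options run in order, a later option overrides an earlier one that touches the same field.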

type RetryConfig

type RetryConfig struct {
	// MaxAttempts before giving up
	MaxAttempts int

	// InitialDelay before first retry
	InitialDelay time.Duration

	// MaxDelay caps the retry delay
	MaxDelay time.Duration

	// Multiplier for exponential backoff
	Multiplier float64
}

RetryConfig defines retry behavior for FailPolicyRestart.

type Sampler

type Sampler struct {
	// contains filtered or unexported fields
}

Sampler implements event sampling based on configuration rules. Thread-safe for concurrent use.

func NewSampler

func NewSampler(config SamplingConfig) *Sampler

NewSampler creates a new sampler with the given configuration

func (*Sampler) ShouldSample

func (s *Sampler) ShouldSample(event *domain.ObserverEvent) bool

ShouldSample determines if an event should be kept or dropped. Returns true if event should be kept, false if it should be sampled out. Thread-safe: Can be called concurrently from multiple goroutines.

type SamplingConfig

type SamplingConfig struct {
	// Enabled turns sampling on/off
	Enabled bool

	// DefaultRate is the default sample rate (0.0 - 1.0)
	// 0.1 = 10% sampling, 1.0 = keep all events
	DefaultRate float64

	// Rules define type-specific sampling overrides
	Rules []SamplingRule
}

SamplingConfig defines event sampling behavior.

type SamplingRule

type SamplingRule struct {
	// EventType to match (e.g., "network", "node")
	EventType string

	// Subtype to match (e.g., "dns_query", "link_failure")
	// Empty string matches all subtypes
	Subtype string

	// Rate override for this type (0.0 - 1.0)
	// Ignored if KeepAll is true
	Rate float64

	// KeepAll keeps all events of this type (100% sampling)
	// Use for critical events like failures
	KeepAll bool
}

SamplingRule defines sampling for a specific event type.

func (SamplingRule) Matches

func (r SamplingRule) Matches(eventType, subtype string) bool

Matches returns true if this rule matches the given event type/subtype.
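Putting SamplingConfig and SamplingRule together, a sampling decision can be sketched as: find a matching rule, honour KeepAll, otherwise compare a random roll against the applicable rate. The code below is an illustration with lowercase stand-in types. It assumes that the first matching rule wins and that disabled sampling keeps every event, neither of which the documentation confirms.

```go
package main

import (
	"fmt"
	"math/rand"
)

// samplingRule mirrors the fields of SamplingRule.
type samplingRule struct {
	EventType string
	Subtype   string
	Rate      float64
	KeepAll   bool
}

// matches treats an empty Subtype as a wildcard, as the field comment describes.
func (r samplingRule) matches(eventType, subtype string) bool {
	return r.EventType == eventType && (r.Subtype == "" || r.Subtype == subtype)
}

// samplingConfig mirrors the fields of SamplingConfig.
type samplingConfig struct {
	Enabled     bool
	DefaultRate float64
	Rules       []samplingRule
}

// shouldSample reports whether to keep an event. roll must be in [0, 1);
// passing it in keeps the decision deterministic and easy to test.
// Assumption of this sketch: the first matching rule wins.
func shouldSample(cfg samplingConfig, eventType, subtype string, roll float64) bool {
	if !cfg.Enabled {
		return true // sampling off: keep everything
	}
	rate := cfg.DefaultRate
	for _, rule := range cfg.Rules {
		if rule.matches(eventType, subtype) {
			if rule.KeepAll {
				return true
			}
			rate = rule.Rate
			break
		}
	}
	return roll < rate
}

func main() {
	cfg := samplingConfig{
		Enabled:     true,
		DefaultRate: 0.1,
		Rules: []samplingRule{
			{EventType: "network", Subtype: "link_failure", KeepAll: true},
			{EventType: "network", Rate: 0.5},
		},
	}
	// Failures are always kept, whatever the roll.
	fmt.Println(shouldSample(cfg, "network", "link_failure", rand.Float64()))
}
```

Comparing with `roll < rate` means a rate of 1.0 keeps every event and a rate of 0.0 keeps none, since the roll is always below 1 and never below 0.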
