Documentation
¶
Index ¶
- func WithPanicRecovery(observerName string, fn func() error) (err error)
- func WithPanicRecoveryAndLog(observerName string, fn func() error, ...) (err error)
- func WithPanicRecoveryTimeout(observerName string, fn func(ctx context.Context) error, timeout time.Duration) error
- type BackpressureConfig
- type BoundedQueue
- type Config
- type DropPolicy
- type EBPFManager
- type EBPFRuntime
- type EventProcessor
- type FailureConfig
- type FailurePolicy
- type HealthChecker
- type HealthConfig
- type HealthStatus
- type K8sRuntime
- type MetricsConfig
- type ObserverRuntime
- func (r *ObserverRuntime) CausalityTracker() *base.CausalityTracker
- func (r *ObserverRuntime) GetParentSpanForEntity(entityID string) string
- func (r *ObserverRuntime) IsHealthy() bool
- func (r *ObserverRuntime) ProcessEvent(ctx context.Context, rawEvent []byte) error
- func (r *ObserverRuntime) Run(ctx context.Context) error
- type Option
- type RetryConfig
- type Sampler
- type SamplingConfig
- type SamplingRule
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func WithPanicRecovery ¶
func WithPanicRecovery(observerName string, fn func() error) (err error)
WithPanicRecovery wraps a function with panic recovery. Returns the original error if no panic occurred. Returns a panic error if a panic was caught.
func WithPanicRecoveryAndLog ¶
func WithPanicRecoveryAndLog(observerName string, fn func() error, logFn func(observerName string, panicErr error)) (err error)
WithPanicRecoveryAndLog wraps a function with panic recovery and logging. Calls the log function when a panic is caught.
func WithPanicRecoveryTimeout ¶
func WithPanicRecoveryTimeout(observerName string, fn func(ctx context.Context) error, timeout time.Duration) error
WithPanicRecoveryTimeout wraps a function with panic recovery and timeout. The provided function must accept a context and should respect its cancellation. Returns a timeout error if the function doesn't complete in time.
Types ¶
type BackpressureConfig ¶
type BackpressureConfig struct {
// QueueSize is the maximum number of events to buffer
QueueSize int
// DropPolicy defines what to drop when queue is full
DropPolicy DropPolicy
// MaxRetries is the maximum number of retry attempts for critical emitter failures
// 0 = no retries (drop immediately on failure)
// N = retry up to N times before dropping
MaxRetries int
// DrainInterval is how often to drain the queue and emit events
// Lower values = lower latency but more CPU usage
// Higher values = higher latency but less CPU usage
DrainInterval time.Duration
}
BackpressureConfig defines queue and backpressure behavior.
type BoundedQueue ¶
type BoundedQueue struct {
// contains filtered or unexported fields
}
BoundedQueue implements a fixed-size queue with configurable drop policies. Thread-safe for concurrent use. All rng operations are protected by mu.
func NewBoundedQueue ¶
func NewBoundedQueue(config BackpressureConfig) *BoundedQueue
NewBoundedQueue creates a new bounded queue with the given configuration
func (*BoundedQueue) Cap ¶
func (q *BoundedQueue) Cap() int
Cap returns the maximum capacity of the queue
func (*BoundedQueue) Dequeue ¶
func (q *BoundedQueue) Dequeue() *domain.ObserverEvent
Dequeue removes and returns the oldest event from the queue. Returns nil if queue is empty.
func (*BoundedQueue) Enqueue ¶
func (q *BoundedQueue) Enqueue(event *domain.ObserverEvent) bool
Enqueue adds an event to the queue. Returns true if event was added, false if it was dropped.
func (*BoundedQueue) IsEmpty ¶
func (q *BoundedQueue) IsEmpty() bool
IsEmpty returns true if queue is empty
func (*BoundedQueue) IsFull ¶
func (q *BoundedQueue) IsFull() bool
IsFull returns true if queue is at capacity
func (*BoundedQueue) Len ¶
func (q *BoundedQueue) Len() int
Len returns the current number of events in the queue
type Config ¶
type Config struct {
// Name of the observer (e.g., "network", "node", "deployments")
Name string
// Sampling configuration
Sampling SamplingConfig
// Backpressure configuration
Backpressure BackpressureConfig
// Health check configuration
Health HealthConfig
// Failure handling configuration
Failure FailureConfig
// Metrics configuration
Metrics MetricsConfig
}
Config holds the configuration for an ObserverRuntime instance.
func DefaultConfig ¶
DefaultConfig returns a config with sensible defaults.
type DropPolicy ¶
type DropPolicy string
DropPolicy defines what to do when queue is full.
const (
// DropOldest drops oldest events first (keep recent)
DropOldest DropPolicy = "oldest"
// DropNewest drops incoming events (preserve history)
DropNewest DropPolicy = "newest"
// DropRandom drops random events
DropRandom DropPolicy = "random"
)
type EBPFManager ¶
type EBPFManager interface {
AttachKprobe(progName, symbol string) error
AttachKretprobe(progName, symbol string) error
AttachTracepoint(progName, group, name string) error
AttachTracepointWithProgram(prog *ebpf.Program, group, name string) error
OpenRingBuffer(mapName string) error
ReadEvents(ctx context.Context, handler func([]byte) error) error
GetMap(name string) (*ebpf.Map, error)
Close() error
WaitForReady(timeout time.Duration) error
}
EBPFManager is the interface for managing eBPF programs. Implemented by base.EBPFManager.
type EBPFRuntime ¶
type EBPFRuntime struct {
// contains filtered or unexported fields
}
EBPFRuntime manages eBPF program lifecycle
func NewEBPFRuntime ¶
func NewEBPFRuntime(manager EBPFManager) *EBPFRuntime
NewEBPFRuntime creates a new eBPF runtime with the given manager
func (*EBPFRuntime) IsHealthy ¶
func (r *EBPFRuntime) IsHealthy() bool
IsHealthy returns true if the runtime is running
func (*EBPFRuntime) Stop ¶
func (r *EBPFRuntime) Stop() error
Stop gracefully stops the eBPF runtime
type EventProcessor ¶
type EventProcessor interface {
// Process converts raw event bytes to a domain event.
// Returns nil if event should be ignored (not interesting or not recognized).
// Returns error only for unrecoverable processing errors.
//
// Thread-safety: Process may be called concurrently from multiple goroutines.
// Implementations must be thread-safe if they maintain state.
Process(ctx context.Context, rawEvent []byte) (*domain.ObserverEvent, error)
// Name returns the processor name for logging and metrics.
// Should be lowercase with hyphens (e.g., "network", "node-pmc", "container-runtime").
Name() string
// Setup is called once before the runtime starts processing events.
// Use this for initialization that can't be done in the constructor:
// - Loading configuration from cfg
// - Establishing connections (K8s informers, context service)
// - Pre-warming caches
// - Starting background goroutines
//
// Config is passed so processor can access runtime configuration.
// If Setup returns an error, the runtime will not start.
Setup(ctx context.Context, cfg Config) error
// Teardown is called once after the runtime stops processing events.
// Use this for cleanup:
// - Closing connections
// - Flushing buffers
// - Releasing resources
//
// Teardown is called even if Setup failed.
// Teardown errors are logged but don't prevent shutdown.
Teardown(ctx context.Context) error
}
EventProcessor is the core interface that all observers must implement. This is the ONLY observer-specific code - all infrastructure is handled by ObserverRuntime.
Examples:
- NetworkProcessor: Parses eBPF network events, runs DNS/Link/Status detection
- NodeProcessor: Parses eBPF PMC events, calculates IPC and memory stalls
- ContainerProcessor: Parses eBPF container events, detects OOM/Exit patterns
- DeploymentsProcessor: Watches K8s deployments, detects rollout issues
The processor should be PURE BUSINESS LOGIC with no infrastructure concerns:
- No eBPF loading/management
- No K8s informer management
- No OTLP/POLKU export
- No metrics collection
Just: Parse raw event → Apply observer logic → Return domain event (or nil)
type FailureConfig ¶
type FailureConfig struct {
// Policy defines how to handle failures
Policy FailurePolicy
// Retry configuration
Retry RetryConfig
}
FailureConfig defines failure handling and retry behavior.
type FailurePolicy ¶
type FailurePolicy string
FailurePolicy defines failure handling behavior.
const (
// FailPolicyIsolate continues other observers if this one fails
// Recommended for production (graceful degradation)
FailPolicyIsolate FailurePolicy = "isolate"
// FailPolicyRestart retries with exponential backoff
FailPolicyRestart FailurePolicy = "restart"
// FailPolicyFailFast crashes entire binary on first failure
// Use for critical observers that must work
FailPolicyFailFast FailurePolicy = "fail_fast"
)
type HealthChecker ¶
type HealthChecker struct {
// contains filtered or unexported fields
}
HealthChecker monitors observer health
func NewHealthChecker ¶
func NewHealthChecker(config HealthConfig, observerName string) *HealthChecker
NewHealthChecker creates a new health checker
func (*HealthChecker) GetStatus ¶
func (h *HealthChecker) GetStatus() HealthStatus
GetStatus returns the current health status
func (*HealthChecker) IsHealthy ¶
func (h *HealthChecker) IsHealthy() bool
IsHealthy returns true if observer is currently healthy
func (*HealthChecker) LastCheck ¶
func (h *HealthChecker) LastCheck() time.Time
LastCheck returns the time of the last health check
func (*HealthChecker) MarkHealthy ¶
func (h *HealthChecker) MarkHealthy()
MarkHealthy marks the observer as healthy
func (*HealthChecker) MarkUnhealthy ¶
func (h *HealthChecker) MarkUnhealthy(reason string)
MarkUnhealthy marks the observer as unhealthy with a reason
func (*HealthChecker) Run ¶
func (h *HealthChecker) Run(ctx context.Context)
Run starts the health checker and blocks until context is cancelled
type HealthConfig ¶
type HealthConfig struct {
// Enabled turns health checking on/off
Enabled bool
// CheckInterval is how often to run health checks
CheckInterval time.Duration
}
HealthConfig defines health checking behavior.
type HealthStatus ¶
HealthStatus represents the current health status
type K8sRuntime ¶
type K8sRuntime struct {
// contains filtered or unexported fields
}
K8sRuntime manages Kubernetes informer lifecycle
func (*K8sRuntime) AddInformer ¶
func (r *K8sRuntime) AddInformer(informer cache.SharedIndexInformer, handlers cache.ResourceEventHandlerFuncs) error
AddInformer registers an informer with event handlers
func (*K8sRuntime) Start ¶
func (r *K8sRuntime) Start(ctx context.Context) error
Start starts all registered informers
func (*K8sRuntime) WaitForCacheSync ¶
func (r *K8sRuntime) WaitForCacheSync(ctx context.Context) error
WaitForCacheSync waits for all informers to sync
type MetricsConfig ¶
type MetricsConfig struct {
// Enabled turns metrics collection on/off
Enabled bool
// AllowedLabels defines which labels to include in metrics
// Use this to control cardinality
// Example: ["namespace", "observer_type"] but NOT ["pod_name", "container_id"]
AllowedLabels []string
}
MetricsConfig defines metrics collection behavior.
type ObserverRuntime ¶
type ObserverRuntime struct {
// contains filtered or unexported fields
}
ObserverRuntime is the unified infrastructure for all observers.
func NewObserverRuntime ¶
func NewObserverRuntime(processor EventProcessor, opts ...Option) (*ObserverRuntime, error)
NewObserverRuntime creates a new runtime with the given processor and options.
func (*ObserverRuntime) CausalityTracker ¶
func (r *ObserverRuntime) CausalityTracker() *base.CausalityTracker
CausalityTracker returns the causality tracker for this runtime. Observers can use this to query parent spans and build causality chains.
func (*ObserverRuntime) GetParentSpanForEntity ¶
func (r *ObserverRuntime) GetParentSpanForEntity(entityID string) string
GetParentSpanForEntity returns the parent span ID for a given entity. Entity ID format: "namespace/name" for K8s resources, "ip:port" for network endpoints. Returns empty string if no parent span is tracked for this entity.
func (*ObserverRuntime) IsHealthy ¶
func (r *ObserverRuntime) IsHealthy() bool
IsHealthy returns true if runtime is running
func (*ObserverRuntime) ProcessEvent ¶
func (r *ObserverRuntime) ProcessEvent(ctx context.Context, rawEvent []byte) error
ProcessEvent processes a raw event through the processor and emits to emitters
type Option ¶
type Option func(*ObserverRuntime)
Option is a functional option for configuring ObserverRuntime
func WithEmitters ¶
func WithEmitters(emitters ...domain.EventEmitter) Option
WithEmitters configures the event emitters to use for event emission. All observers emit through domain.EventEmitter (the universal gateway).
func WithSamplingDisabled ¶
func WithSamplingDisabled() Option
WithSamplingDisabled disables event sampling (useful for tests)
type RetryConfig ¶
type RetryConfig struct {
// MaxAttempts before giving up
MaxAttempts int
// InitialDelay before first retry
InitialDelay time.Duration
// MaxDelay caps the retry delay
MaxDelay time.Duration
// Multiplier for exponential backoff
Multiplier float64
}
RetryConfig defines retry behavior for FailPolicyRestart.
type Sampler ¶
type Sampler struct {
// contains filtered or unexported fields
}
Sampler implements event sampling based on configuration rules. Thread-safe for concurrent use.
func NewSampler ¶
func NewSampler(config SamplingConfig) *Sampler
NewSampler creates a new sampler with the given configuration
func (*Sampler) ShouldSample ¶
func (s *Sampler) ShouldSample(event *domain.ObserverEvent) bool
ShouldSample determines if an event should be kept or dropped. Returns true if event should be kept, false if it should be sampled out. Thread-safe: Can be called concurrently from multiple goroutines.
type SamplingConfig ¶
type SamplingConfig struct {
// Enabled turns sampling on/off
Enabled bool
// DefaultRate is the default sample rate (0.0 - 1.0)
// 0.1 = keep 10% of events, 1.0 = keep all events
DefaultRate float64
// Rules define type-specific sampling overrides
Rules []SamplingRule
}
SamplingConfig defines event sampling behavior.
type SamplingRule ¶
type SamplingRule struct {
// EventType to match (e.g., "network", "node")
EventType string
// Subtype to match (e.g., "dns_query", "link_failure")
// Empty string matches all subtypes
Subtype string
// Rate override for this type (0.0 - 1.0)
// Ignored if KeepAll is true
Rate float64
// KeepAll keeps all events of this type (100% sampling)
// Use for critical events like failures
KeepAll bool
}
SamplingRule defines sampling for a specific event type.
func (SamplingRule) Matches ¶
func (r SamplingRule) Matches(eventType, subtype string) bool
Matches returns true if this rule matches the given event type/subtype.