activityprocessor

package
v0.7.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 25, 2026 License: AGPL-3.0 Imports: 41 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// EventReasonEvaluationFailed is the reason for Warning events when CEL evaluation fails.
	EventReasonEvaluationFailed = "EvaluationFailed"
)

Variables

This section is empty.

Functions

func EvaluateCompiledAuditRules

func EvaluateCompiledAuditRules(
	policy *CompiledPolicy,
	auditMap map[string]any,
	audit *auditv1.Event,
	resolveKind processor.KindResolver,
) (*v1alpha1.Activity, int, error)

EvaluateCompiledAuditRules evaluates pre-compiled audit rules against an audit event. Returns the generated Activity, the matching rule index, and any error. Returns (nil, -1, nil) if no rule matched.

Types

type CompiledPolicy

type CompiledPolicy struct {
	// Name is the policy name.
	Name string
	// APIGroup is the target resource's API group.
	APIGroup string
	// Kind is the target resource's kind.
	Kind string
	// Resource is the plural resource name (for audit event matching).
	Resource string
	// AuditRules are the compiled audit rules.
	AuditRules []CompiledRule
	// EventRules are the compiled event rules.
	EventRules []CompiledRule
	// ResourceVersion is the policy's resource version for cache invalidation.
	ResourceVersion string
	// OriginalPolicy is the original policy for metrics and logging.
	OriginalPolicy *v1alpha1.ActivityPolicy
}

CompiledPolicy represents a pre-compiled ActivityPolicy ready for execution.

type CompiledRule

type CompiledRule struct {
	// Match is the original match expression.
	Match string
	// Summary is the original summary template.
	Summary string
	// MatchProgram is the pre-compiled CEL program for match evaluation.
	MatchProgram cel.Program
	// SummaryTemplates contains pre-compiled CEL programs for each template expression.
	SummaryTemplates []compiledTemplate
	// Valid indicates if the rule compiled successfully.
	Valid bool
	// CompileError holds any error from compilation.
	CompileError string
}

CompiledRule represents a pre-compiled policy rule ready for execution.

func (*CompiledRule) EvaluateAuditMatch

func (r *CompiledRule) EvaluateAuditMatch(auditMap map[string]any) (bool, error)

EvaluateAuditRules evaluates audit rules against an audit event using pre-compiled programs. Returns the index of the matching rule, the generated summary, and whether a match was found.

func (*CompiledRule) EvaluateEventMatch

func (r *CompiledRule) EvaluateEventMatch(eventMap map[string]any) (bool, error)

EvaluateEventMatch evaluates the match expression against a Kubernetes event.

func (*CompiledRule) EvaluateSummary

func (r *CompiledRule) EvaluateSummary(vars map[string]any) (string, error)

EvaluateSummary evaluates the summary template using pre-compiled programs.

type Config

type Config struct {
	// NATS configuration
	NATSURL        string
	NATSStreamName string // Source stream for audit events (e.g., "AUDIT_EVENTS")
	ConsumerName   string // Durable consumer name

	// Event stream configuration
	NATSEventStream   string // Source stream for Kubernetes events (e.g., "EVENTS")
	NATSEventConsumer string // Durable consumer name for event processor

	// Output NATS stream for generated activities
	OutputStreamName    string // Stream for publishing activities (e.g., "ACTIVITIES")
	OutputSubjectPrefix string // Subject prefix for activities (e.g., "activities")

	// NATS TLS/mTLS configuration
	NATSTLSEnabled  bool   // Enable TLS for NATS connection
	NATSTLSCertFile string // Path to client certificate file (for mTLS)
	NATSTLSKeyFile  string // Path to client private key file (for mTLS)
	NATSTLSCAFile   string // Path to CA certificate file for server verification

	// Dead-letter queue configuration
	DLQEnabled       bool   // Enable dead-letter queue for failed events
	DLQStreamName    string // NATS stream name for DLQ (e.g., "ACTIVITY_DEAD_LETTER")
	DLQSubjectPrefix string // Subject prefix for DLQ messages (e.g., "activity.dlq")

	// DLQ retry configuration
	DLQRetryEnabled           bool          // Enable automatic retry of DLQ events
	DLQRetryInterval          time.Duration // Interval between retry batches
	DLQRetryBatchSize         int           // Number of events to process per retry batch
	DLQRetryBackoffBase       time.Duration // Initial backoff duration
	DLQRetryBackoffMultiplier float64       // Exponential backoff multiplier
	DLQRetryBackoffMax        time.Duration // Maximum backoff duration
	DLQRetryAlertThreshold    int           // Retry count threshold for alerts
	DLQRetryAuditSubject      string        // Subject for republishing audit events from DLQ
	DLQRetryEventSubject      string        // Subject for republishing Kubernetes events from DLQ

	// Processing configuration
	Workers    int           // Number of concurrent workers
	BatchSize  int           // Messages to fetch per batch
	AckWait    time.Duration // Time before message redelivery
	MaxDeliver int           // Maximum redelivery attempts

	// Health probe configuration
	HealthProbeAddr string // Address for health probe server (e.g., ":8081")

}

Config contains configuration for the activity processor.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns configuration with default values.

type DLQRetryConfig

type DLQRetryConfig struct {
	// Enabled controls whether automatic retry is enabled.
	Enabled bool
	// Interval is how often to check for retry-eligible events.
	Interval time.Duration
	// BatchSize is how many events to process per batch.
	BatchSize int
	// BackoffBase is the initial backoff duration.
	BackoffBase time.Duration
	// BackoffMultiplier is the exponential multiplier (typically 2.0).
	BackoffMultiplier float64
	// BackoffMax is the maximum backoff duration.
	BackoffMax time.Duration
	// AlertThreshold triggers metrics when retry count exceeds this.
	AlertThreshold int
	// AuditRetrySubject is the subject to republish audit events to.
	AuditRetrySubject string
	// EventRetrySubject is the subject to republish Kubernetes events to.
	EventRetrySubject string
}

DLQRetryConfig holds configuration for the DLQ retry controller.

func DefaultDLQRetryConfig

func DefaultDLQRetryConfig() DLQRetryConfig

DefaultDLQRetryConfig returns sensible defaults for DLQ retry.

type DLQRetryController

type DLQRetryController struct {
	// contains filtered or unexported fields
}

DLQRetryController manages automatic retry of dead-letter queue events.

func NewDLQRetryController

func NewDLQRetryController(
	js nats.JetStreamContext,
	config DLQRetryConfig,
	auditStreamName string,
	eventStreamName string,
	dlqStreamName string,
	dlqSubjectPrefix string,
	evaluator RetryEvaluator,
) *DLQRetryController

NewDLQRetryController creates a new DLQ retry controller. evaluator may be nil, in which case retries are reported as "republished" rather than "succeeded".

func (*DLQRetryController) RetryForPolicy

func (c *DLQRetryController) RetryForPolicy(ctx context.Context, policy *v1alpha1.ActivityPolicy)

RetryForPolicy triggers immediate retry for events that match a specific policy. This is called when an ActivityPolicy is updated.

func (*DLQRetryController) Start

func (c *DLQRetryController) Start(ctx context.Context) error

Start begins the retry controller with periodic retry.

type EventEmitter

type EventEmitter struct {
	// contains filtered or unexported fields
}

EventEmitter emits Kubernetes Warning events for policy evaluation failures.

func NewEventEmitter

func NewEventEmitter(client client.Client, recorder record.EventRecorder) *EventEmitter

NewEventEmitter creates a new event emitter.

func (*EventEmitter) EmitEvaluationError

func (e *EventEmitter) EmitEvaluationError(ctx context.Context, policyName string, ruleIndex int, err error)

EmitEvaluationError emits a Kubernetes Warning event for an evaluation failure.

type PolicyCache

type PolicyCache struct {
	// contains filtered or unexported fields
}

PolicyCache provides thread-safe caching of pre-compiled ActivityPolicy resources.

func NewPolicyCache

func NewPolicyCache() *PolicyCache

NewPolicyCache creates a new policy cache.

func (*PolicyCache) Add

func (c *PolicyCache) Add(policy *v1alpha1.ActivityPolicy, resource string) error

Add compiles and adds a policy to the cache.

func (*PolicyCache) Get

func (c *PolicyCache) Get(apiGroup, resource string) []*CompiledPolicy

Get returns compiled policies for a given apiGroup and resource.

func (*PolicyCache) GetByKind

func (c *PolicyCache) GetByKind(apiGroup, kind string) []*CompiledPolicy

GetByKind returns compiled policies for a given apiGroup and kind. Used by event processing since events reference Kind not Resource.

func (*PolicyCache) Len

func (c *PolicyCache) Len() int

Len returns the total number of policies in the cache.

func (*PolicyCache) MatchEvent

func (c *PolicyCache) MatchEvent(apiGroup, kind string, eventMap map[string]any) (*processor.MatchedPolicy, error)

MatchEvent implements processor.EventPolicyLookup. It looks up matching event rules for the given apiGroup/kind and evaluates them against the provided event map. Returns the first matching result, or nil if no policy matched.

func (*PolicyCache) Remove

func (c *PolicyCache) Remove(policy *v1alpha1.ActivityPolicy, resource string)

Remove removes a policy from the cache.

func (*PolicyCache) Update

func (c *PolicyCache) Update(oldPolicy, newPolicy *v1alpha1.ActivityPolicy, oldResource, newResource string) error

Update removes the old policy and adds the new one.

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor consumes audit events from NATS, evaluates ActivityPolicies, and publishes Activity resources to NATS for downstream consumption.

func New

func New(config Config, restConfig *rest.Config) (*Processor, error)

New creates a new activity processor.

func (*Processor) Start

func (p *Processor) Start(ctx context.Context) error

Start begins processing audit events.

func (*Processor) Stop

func (p *Processor) Stop()

Stop gracefully shuts down the processor.

type RetryEvaluator added in v0.7.1

type RetryEvaluator func(ctx context.Context, event *processor.DeadLetterEvent) RetryOutcome

RetryEvaluator re-runs policy evaluation against a dead-letter event's original payload so the retry worker can tell "republished to NATS" apart from "successfully processed". A nil evaluator falls back to publish-ACK accounting.

type RetryOutcome added in v0.7.1

type RetryOutcome struct {
	// Resolved is true when re-evaluation succeeded (no error). Only resolved
	// events are republished onto the source stream to generate an activity.
	Resolved bool
	// ErrorType classifies the failure when Resolved is false, for metrics.
	ErrorType processor.ErrorType
	// Err is the underlying re-evaluation error when Resolved is false.
	Err error
}

RetryOutcome describes the result of re-evaluating a dead-letter event in place.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL