Documentation
¶
Index ¶
- Constants
- func EvaluateCompiledAuditRules(policy *CompiledPolicy, auditMap map[string]any, audit *auditv1.Event, ...) (*v1alpha1.Activity, int, error)
- type CompiledPolicy
- type CompiledRule
- type Config
- type DLQRetryConfig
- type DLQRetryController
- type EventEmitter
- type PolicyCache
- func (c *PolicyCache) Add(policy *v1alpha1.ActivityPolicy, resource string) error
- func (c *PolicyCache) Get(apiGroup, resource string) []*CompiledPolicy
- func (c *PolicyCache) GetByKind(apiGroup, kind string) []*CompiledPolicy
- func (c *PolicyCache) Len() int
- func (c *PolicyCache) MatchEvent(apiGroup, kind string, eventMap map[string]any) (*processor.MatchedPolicy, error)
- func (c *PolicyCache) Remove(policy *v1alpha1.ActivityPolicy, resource string)
- func (c *PolicyCache) Update(oldPolicy, newPolicy *v1alpha1.ActivityPolicy, oldResource, newResource string) error
- type Processor
- type RetryEvaluator
- type RetryOutcome
Constants ¶
const (
// EventReasonEvaluationFailed is the reason for Warning events when CEL evaluation fails.
EventReasonEvaluationFailed = "EvaluationFailed"
)
Variables ¶
This section is empty.
Functions ¶
func EvaluateCompiledAuditRules ¶
func EvaluateCompiledAuditRules( policy *CompiledPolicy, auditMap map[string]any, audit *auditv1.Event, resolveKind processor.KindResolver, ) (*v1alpha1.Activity, int, error)
EvaluateCompiledAuditRules evaluates pre-compiled audit rules against an audit event. Returns the generated Activity, the matching rule index, and any error. Returns (nil, -1, nil) if no rule matched.
Types ¶
type CompiledPolicy ¶
type CompiledPolicy struct {
// Name is the policy name.
Name string
// APIGroup is the target resource's API group.
APIGroup string
// Kind is the target resource's kind.
Kind string
// Resource is the plural resource name (for audit event matching).
Resource string
// AuditRules are the compiled audit rules.
AuditRules []CompiledRule
// EventRules are the compiled event rules.
EventRules []CompiledRule
// ResourceVersion is the policy's resource version for cache invalidation.
ResourceVersion string
// OriginalPolicy is the original policy for metrics and logging.
OriginalPolicy *v1alpha1.ActivityPolicy
}
CompiledPolicy represents a pre-compiled ActivityPolicy ready for execution.
type CompiledRule ¶
type CompiledRule struct {
// Match is the original match expression.
Match string
// Summary is the original summary template.
Summary string
// MatchProgram is the pre-compiled CEL program for match evaluation.
MatchProgram cel.Program
// SummaryTemplates contains pre-compiled CEL programs for each template expression.
SummaryTemplates []compiledTemplate
// Valid indicates if the rule compiled successfully.
Valid bool
// CompileError holds any error from compilation.
CompileError string
}
CompiledRule represents a pre-compiled policy rule ready for execution.
func (*CompiledRule) EvaluateAuditMatch ¶
func (r *CompiledRule) EvaluateAuditMatch(auditMap map[string]any) (bool, error)
EvaluateAuditMatch evaluates the match expression against an audit event. Returns whether the rule matched and any evaluation error.
func (*CompiledRule) EvaluateEventMatch ¶
func (r *CompiledRule) EvaluateEventMatch(eventMap map[string]any) (bool, error)
EvaluateEventMatch evaluates the match expression against a Kubernetes event.
func (*CompiledRule) EvaluateSummary ¶
func (r *CompiledRule) EvaluateSummary(vars map[string]any) (string, error)
EvaluateSummary evaluates the summary template using pre-compiled programs.
type Config ¶
type Config struct {
// NATS configuration
NATSURL string
NATSStreamName string // Source stream for audit events (e.g., "AUDIT_EVENTS")
ConsumerName string // Durable consumer name
// Event stream configuration
NATSEventStream string // Source stream for Kubernetes events (e.g., "EVENTS")
NATSEventConsumer string // Durable consumer name for event processor
// Output NATS stream for generated activities
OutputStreamName string // Stream for publishing activities (e.g., "ACTIVITIES")
OutputSubjectPrefix string // Subject prefix for activities (e.g., "activities")
// NATS TLS/mTLS configuration
NATSTLSEnabled bool // Enable TLS for NATS connection
NATSTLSCertFile string // Path to client certificate file (for mTLS)
NATSTLSKeyFile string // Path to client private key file (for mTLS)
NATSTLSCAFile string // Path to CA certificate file for server verification
// Dead-letter queue configuration
DLQEnabled bool // Enable dead-letter queue for failed events
DLQStreamName string // NATS stream name for DLQ (e.g., "ACTIVITY_DEAD_LETTER")
DLQSubjectPrefix string // Subject prefix for DLQ messages (e.g., "activity.dlq")
// DLQ retry configuration
DLQRetryEnabled bool // Enable automatic retry of DLQ events
DLQRetryInterval time.Duration // Interval between retry batches
DLQRetryBatchSize int // Number of events to process per retry batch
DLQRetryBackoffBase time.Duration // Initial backoff duration
DLQRetryBackoffMultiplier float64 // Exponential backoff multiplier
DLQRetryBackoffMax time.Duration // Maximum backoff duration
DLQRetryAlertThreshold int // Retry count threshold for alerts
DLQRetryAuditSubject string // Subject for republishing audit events from DLQ
DLQRetryEventSubject string // Subject for republishing Kubernetes events from DLQ
// Processing configuration
Workers int // Number of concurrent workers
BatchSize int // Messages to fetch per batch
AckWait time.Duration // Time before message redelivery
MaxDeliver int // Maximum redelivery attempts
// Health probe configuration
HealthProbeAddr string // Address for health probe server (e.g., ":8081")
}
Config contains configuration for the activity processor.
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns configuration with default values.
type DLQRetryConfig ¶
type DLQRetryConfig struct {
// Enabled controls whether automatic retry is enabled.
Enabled bool
// Interval is how often to check for retry-eligible events.
Interval time.Duration
// BatchSize is how many events to process per batch.
BatchSize int
// BackoffBase is the initial backoff duration.
BackoffBase time.Duration
// BackoffMultiplier is the exponential multiplier (typically 2.0).
BackoffMultiplier float64
// BackoffMax is the maximum backoff duration.
BackoffMax time.Duration
// AlertThreshold is the retry count above which alert metrics are triggered.
AlertThreshold int
// AuditRetrySubject is the subject to republish audit events to.
AuditRetrySubject string
// EventRetrySubject is the subject to republish Kubernetes events to.
EventRetrySubject string
}
DLQRetryConfig holds configuration for the DLQ retry controller.
func DefaultDLQRetryConfig ¶
func DefaultDLQRetryConfig() DLQRetryConfig
DefaultDLQRetryConfig returns sensible defaults for DLQ retry.
type DLQRetryController ¶
type DLQRetryController struct {
// contains filtered or unexported fields
}
DLQRetryController manages automatic retry of dead-letter queue events.
func NewDLQRetryController ¶
func NewDLQRetryController( js nats.JetStreamContext, config DLQRetryConfig, auditStreamName string, eventStreamName string, dlqStreamName string, dlqSubjectPrefix string, evaluator RetryEvaluator, ) *DLQRetryController
NewDLQRetryController creates a new DLQ retry controller. The evaluator argument may be nil, in which case retries are reported as "republished" rather than "succeeded".
func (*DLQRetryController) RetryForPolicy ¶
func (c *DLQRetryController) RetryForPolicy(ctx context.Context, policy *v1alpha1.ActivityPolicy)
RetryForPolicy triggers an immediate retry of events that match a specific policy. It is called when an ActivityPolicy is updated.
type EventEmitter ¶
type EventEmitter struct {
// contains filtered or unexported fields
}
EventEmitter emits Kubernetes Warning events for policy evaluation failures.
func NewEventEmitter ¶
func NewEventEmitter(client client.Client, recorder record.EventRecorder) *EventEmitter
NewEventEmitter creates a new event emitter.
func (*EventEmitter) EmitEvaluationError ¶
func (e *EventEmitter) EmitEvaluationError(ctx context.Context, policyName string, ruleIndex int, err error)
EmitEvaluationError emits a Kubernetes Warning event for an evaluation failure.
type PolicyCache ¶
type PolicyCache struct {
// contains filtered or unexported fields
}
PolicyCache provides thread-safe caching of pre-compiled ActivityPolicy resources.
func (*PolicyCache) Add ¶
func (c *PolicyCache) Add(policy *v1alpha1.ActivityPolicy, resource string) error
Add compiles and adds a policy to the cache.
func (*PolicyCache) Get ¶
func (c *PolicyCache) Get(apiGroup, resource string) []*CompiledPolicy
Get returns compiled policies for a given apiGroup and resource.
func (*PolicyCache) GetByKind ¶
func (c *PolicyCache) GetByKind(apiGroup, kind string) []*CompiledPolicy
GetByKind returns compiled policies for a given apiGroup and kind. Used by event processing since events reference Kind not Resource.
func (*PolicyCache) Len ¶
func (c *PolicyCache) Len() int
Len returns the total number of policies in the cache.
func (*PolicyCache) MatchEvent ¶
func (c *PolicyCache) MatchEvent(apiGroup, kind string, eventMap map[string]any) (*processor.MatchedPolicy, error)
MatchEvent implements processor.EventPolicyLookup. It looks up matching event rules for the given apiGroup/kind and evaluates them against the provided event map. Returns the first matching result, or nil if no policy matched.
func (*PolicyCache) Remove ¶
func (c *PolicyCache) Remove(policy *v1alpha1.ActivityPolicy, resource string)
Remove removes a policy from the cache.
func (*PolicyCache) Update ¶
func (c *PolicyCache) Update(oldPolicy, newPolicy *v1alpha1.ActivityPolicy, oldResource, newResource string) error
Update removes the old policy and adds the new one.
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor consumes audit events from NATS, evaluates ActivityPolicies, and publishes Activity resources to NATS for downstream consumption.
type RetryEvaluator ¶ added in v0.7.1
type RetryEvaluator func(ctx context.Context, event *processor.DeadLetterEvent) RetryOutcome
RetryEvaluator re-runs policy evaluation against a dead-letter event's original payload so the retry worker can tell "republished to NATS" apart from "successfully processed". A nil evaluator falls back to publish-ACK accounting.
type RetryOutcome ¶ added in v0.7.1
type RetryOutcome struct {
// Resolved is true when re-evaluation succeeded (no error). Only resolved
// events are republished onto the source stream to generate an activity.
Resolved bool
// ErrorType classifies the failure when Resolved is false, for metrics.
ErrorType processor.ErrorType
// Err is the underlying re-evaluation error when Resolved is false.
Err error
}
RetryOutcome describes the result of re-evaluating a dead-letter event in place.