reconciler

package
v0.16.1
Warning

This package is not in the latest version of its module.

Published: Jun 23, 2026 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation


Package reconciler provides a unified reconciliation system for muster resources.

Overview

The reconciler package implements automatic change detection and reconciliation for both Kubernetes CRDs and filesystem-based YAML configurations. It ensures that the actual state of muster resources matches the desired state defined in configuration files or Kubernetes custom resources.

Architecture

The reconciliation system consists of several key components:

  • ReconcileManager (implemented by the Manager type): Central coordinator that manages all reconcilers
  • Reconciler: Interface for resource-specific reconciliation logic
  • ChangeDetector: Interface for detecting changes in resource sources
  • ReconcileLoop: Generic reconciliation loop with retry and backoff

The system supports two modes of operation:

  • Kubernetes Mode: Uses informers and controllers for CRD changes
  • Filesystem Mode: Uses fsnotify for watching YAML file changes

Usage

The reconciliation system is automatically integrated with the muster application bootstrap process. It starts watching for changes when the application starts and stops when the application shuts down.

Example usage:

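// config is a reconciler.ManagerConfig; unset fields fall back to their documented defaults.
manager := reconciler.NewManager(config)
if err := manager.Start(ctx); err != nil {
    return fmt.Errorf("failed to start reconciliation: %w", err)
}
defer manager.Stop() // Stop returns an error, which this example ignores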

Resource Types

The following resource types are supported for reconciliation:

  • MCPServer: MCP server definitions and lifecycle management
  • Workflow: Workflow definitions for multi-step operations

Event Integration

The reconciliation system integrates with muster's existing event system to provide notifications about reconciliation activities. Events are generated for successful reconciliations, failures, and retries.

Performance Considerations

The system implements several optimizations:

  • Debouncing: Multiple rapid changes are batched together
  • Efficient watching: Uses informers for Kubernetes, fsnotify for files
  • Backoff: Failed reconciliations use exponential backoff
  • Rate limiting: Prevents overwhelming the system with rapid changes


Constants

const (
	// ServiceStateStopped indicates the service is not running.
	ServiceStateStopped = "stopped"

	// ServiceHealthUnknown indicates the health status is unknown.
	ServiceHealthUnknown = "unknown"
)

Service state constants for status syncing. These are used when a service doesn't exist or has no state.

const DefaultNamespace = "default"

DefaultNamespace is the default namespace for Kubernetes resources.

const DefaultStatusSyncInterval = 30 * time.Second

DefaultStatusSyncInterval is how often to requeue for periodic status sync. This ensures status is eventually consistent even if state change events are missed.

## Purpose

Reconcilers use this interval to schedule periodic re-reconciliation of resources. This implements the "level-triggered" reconciliation pattern from Kubernetes, where we periodically check that desired state matches actual state, rather than relying solely on "edge-triggered" events.

## Tuning Considerations

  • **Shorter intervals** (e.g., 10s): More responsive status updates, but higher API server load and more reconciliation overhead.
  • **Longer intervals** (e.g., 60s): Lower load, but status may be stale longer if state change events are missed.

## Default Value

The default of 30 seconds provides a good balance between:

  • Responsiveness: Status is refreshed at least every 30 seconds
  • Efficiency: Low enough frequency to avoid overwhelming the API server
  • Eventual consistency: Missed events are recovered within 30 seconds

## Performance Impact

For a deployment with N resources, this generates approximately:

  • N / 30 reconciliations per second (e.g., 100 resources = ~3.3/s)
  • Each reconciliation involves: 1 Get + 1 Status Update to the API server

## Customization

To customize this interval, you can:

  1. Define a custom reconciler with a different interval
  2. Set RequeueAfter explicitly in your Reconcile() method

Note: This constant is used by MCPServerReconciler for periodic status sync. The Workflow reconciler doesn't currently use periodic requeue as it primarily manages static definitions.

const FailureLogBackoffTimeout = 5 * time.Minute

FailureLogBackoffTimeout is the maximum time between log entries for persistent failures. Even while repeated failures for a resource are being suppressed by log backoff, at least one entry is written per FailureLogBackoffTimeout so that operators stay aware of ongoing issues.

Variables

var StatusSyncRetryBackoff = retry.DefaultRetry

StatusSyncRetryBackoff is the retry backoff configuration for status updates. It uses an aggressive retry strategy since status updates are idempotent and conflicts are expected during high-frequency reconciliation.

ValidResourceTypes is the set of all valid resource types. Used for input validation when accepting resource types from external sources.

Functions

func CategorizeStatusSyncError

func CategorizeStatusSyncError(err error) string

CategorizeStatusSyncError returns a descriptive reason for a status sync error. This provides actionable information for metrics and debugging, categorizing errors into meaningful buckets rather than using a generic "update_status_failed".

Categories:

  • "conflict_after_retries": Optimistic locking failed even after retries
  • "crd_not_found": The CRD resource doesn't exist
  • "api_server_unreachable": Network connectivity issues to API server
  • "timeout": Request timed out or context deadline exceeded
  • "permission_denied": RBAC or authorization failure
  • "authentication_failed": Authentication/token issues
  • "update_status_failed": Generic fallback for other errors
  • "unknown": A nil error (should not occur in practice; handled defensively)

func GetRestConfig

func GetRestConfig() (*rest.Config, error)

GetRestConfig returns the REST config for creating a Kubernetes detector. This is a convenience function that uses controller-runtime's config detection.

func IsConflictError

func IsConflictError(err error) bool

IsConflictError returns true if the error is a Kubernetes conflict error. Conflict errors occur when the resource was modified since it was read, indicating the resource version is stale (optimistic locking failure).

func IsKubernetesAvailable

func IsKubernetesAvailable() bool

IsKubernetesAvailable checks if Kubernetes cluster access is available.

func IsNotFoundError

func IsNotFoundError(err error) bool

IsNotFoundError checks if an error indicates a resource was not found. It checks for Kubernetes NotFound errors first, then falls back to case-insensitive string matching for common "not found" patterns.

func IsValidResourceType

func IsValidResourceType(resourceType string) bool

IsValidResourceType checks if a resource type string is valid. This should be used when accepting resource type input from APIs or external sources.
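The declaration of `ValidResourceTypes` is not shown on this page. Assuming it is a set keyed by `ResourceType` (an assumption, as is exact case-sensitive matching), validation reduces to a map lookup:

```go
package main

// ResourceType mirrors the package's string-based type.
type ResourceType string

const (
	ResourceTypeMCPServer ResourceType = "MCPServer"
	ResourceTypeWorkflow  ResourceType = "Workflow"
)

// validResourceTypes is an assumed shape for ValidResourceTypes.
var validResourceTypes = map[ResourceType]bool{
	ResourceTypeMCPServer: true,
	ResourceTypeWorkflow:  true,
}

// isValidResourceType reports whether s names a known resource type.
// A missing key yields false, so unknown input is rejected.
func isValidResourceType(s string) bool {
	return validResourceTypes[ResourceType(s)]
}
```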

func NewDelayedQueue

func NewDelayedQueue() *delayedQueue

NewDelayedQueue creates a queue that supports delayed requeuing.

func SanitizeErrorMessage

func SanitizeErrorMessage(errMsg string) string

SanitizeErrorMessage sanitizes an error message for external exposure. It removes potentially sensitive information like absolute file paths, credentials, and internal implementation details.

This should be used when error messages are exposed via APIs or stored in CRD status fields that may be visible to users.
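As an illustration of the kind of scrubbing described, the sketch below redacts secret-looking `key=value` pairs and replaces Unix absolute paths with placeholders. The regular expressions and the `[PATH]`/`[REDACTED]` placeholders are assumptions; a regex approach like this misses cases such as Windows paths and will also rewrite the path portion of URLs.

```go
package main

import "regexp"

var (
	// secretRe matches key=value or "key: value" pairs whose key looks sensitive.
	secretRe = regexp.MustCompile(`(?i)\b(password|passwd|token|secret|api[_-]?key)(\s*[=:]\s*)\S+`)
	// absPathRe matches a Unix absolute path at the start of the string or
	// after whitespace, a quote, "(", "=", or ":".
	absPathRe = regexp.MustCompile(`(^|[\s"'(=:])/[^\s"'():,]+`)
)

// sanitize redacts secrets first, then replaces absolute paths, keeping the
// character that preceded each path.
func sanitize(msg string) string {
	msg = secretRe.ReplaceAllString(msg, "${1}${2}[REDACTED]")
	return absPathRe.ReplaceAllString(msg, "${1}[PATH]")
}
```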

Types

type Adapter

type Adapter struct {
	// contains filtered or unexported fields
}

Adapter wraps the ReconcileManager and provides API registration.

func NewAdapter

func NewAdapter(manager *Manager) *Adapter

NewAdapter creates a new reconciler API adapter.

func (*Adapter) DisableResourceType

func (a *Adapter) DisableResourceType(resourceType string)

DisableResourceType disables reconciliation for a specific resource type. Does nothing if the resource type is invalid.

func (*Adapter) EnableResourceType

func (a *Adapter) EnableResourceType(resourceType string)

EnableResourceType enables reconciliation for a specific resource type. Does nothing if the resource type is invalid.

func (*Adapter) GetAllStatuses

func (a *Adapter) GetAllStatuses() []api.ReconcileStatusInfo

GetAllStatuses returns all reconciliation statuses. Implements api.ReconcileManagerHandler interface.

func (*Adapter) GetEnabledResourceTypes

func (a *Adapter) GetEnabledResourceTypes() []string

GetEnabledResourceTypes returns the list of resource types with reconciliation enabled. Implements api.ReconcileManagerHandler interface.

func (*Adapter) GetManager

func (a *Adapter) GetManager() *Manager

GetManager returns the underlying reconcile manager.

func (*Adapter) GetOverview

func (a *Adapter) GetOverview() *api.ReconcileOverview

GetOverview returns a comprehensive overview of the reconciliation system status.

func (*Adapter) GetQueueLength

func (a *Adapter) GetQueueLength() int

GetQueueLength returns the current reconciliation queue length. Implements api.ReconcileManagerHandler interface.

func (*Adapter) GetStatus

func (a *Adapter) GetStatus(resourceType, name, namespace string) (*api.ReconcileStatusInfo, bool)

GetStatus returns the reconciliation status for a resource. Implements api.ReconcileManagerHandler interface.

func (*Adapter) GetWatchMode

func (a *Adapter) GetWatchMode() string

GetWatchMode returns the current watch mode (kubernetes/filesystem). Implements api.ReconcileManagerHandler interface.

func (*Adapter) IsResourceTypeEnabled

func (a *Adapter) IsResourceTypeEnabled(resourceType string) bool

IsResourceTypeEnabled checks if reconciliation is enabled for a resource type. Returns false for invalid resource types.

func (*Adapter) IsRunning

func (a *Adapter) IsRunning() bool

IsRunning returns whether the reconciliation manager is running. Implements api.ReconcileManagerHandler interface.

func (*Adapter) Register

func (a *Adapter) Register()

Register registers the reconciler with the API layer. Note: The reconciler doesn't currently expose tools through the aggregator, but this provides a consistent pattern for future expansion.

func (*Adapter) TriggerReconcile

func (a *Adapter) TriggerReconcile(resourceType, name, namespace string)

TriggerReconcile manually triggers reconciliation for a resource. Implements api.ReconcileManagerHandler interface. Does nothing if the resource type is invalid.

type BaseStatusConfig

type BaseStatusConfig struct {
	// StatusUpdater provides access to update CRD status (optional)
	StatusUpdater StatusUpdater

	// Namespace is the namespace to use for status updates
	Namespace string
}

BaseStatusConfig holds common configuration for status updates. This is used by reconcilers that sync status back to CRDs.

func (*BaseStatusConfig) GetNamespace

func (c *BaseStatusConfig) GetNamespace(reqNamespace string) string

GetNamespace returns the namespace to use, falling back to the default.

func (*BaseStatusConfig) SetStatusUpdater

func (c *BaseStatusConfig) SetStatusUpdater(updater StatusUpdater, namespace string)

SetStatusUpdater sets the status updater and namespace.

type ChangeDetector

type ChangeDetector interface {
	// Start begins watching for changes.
	// The detector should send change events to the provided channel.
	Start(ctx context.Context, changes chan<- ChangeEvent) error

	// Stop gracefully stops the change detector.
	Stop() error

	// GetSource returns the source type this detector monitors.
	GetSource() ChangeSource

	// AddResourceType adds a resource type to watch.
	// This allows dynamic configuration of which resources to monitor.
	AddResourceType(resourceType ResourceType) error

	// RemoveResourceType removes a resource type from watching.
	RemoveResourceType(resourceType ResourceType) error
}

ChangeDetector is the interface for components that detect changes in resources.

Different implementations exist for filesystem watching and Kubernetes informers.

type ChangeEvent

type ChangeEvent struct {
	// Type is the type of resource that changed.
	Type ResourceType

	// Name is the name of the resource that changed.
	Name string

	// Namespace is the Kubernetes namespace (empty for filesystem mode).
	Namespace string

	// Operation describes what kind of change occurred.
	Operation ChangeOperation

	// Timestamp is when the change was detected.
	Timestamp time.Time

	// Source indicates where the change came from.
	Source ChangeSource

	// FilePath is the path to the file that changed (filesystem mode only).
	FilePath string
}

ChangeEvent represents a detected change in a resource.

type ChangeOperation

type ChangeOperation string

ChangeOperation represents the type of change detected.

const (
	// OperationCreate indicates a new resource was created.
	OperationCreate ChangeOperation = "Create"

	// OperationUpdate indicates an existing resource was modified.
	OperationUpdate ChangeOperation = "Update"

	// OperationDelete indicates a resource was deleted.
	OperationDelete ChangeOperation = "Delete"
)

type ChangeSource

type ChangeSource string

ChangeSource indicates where a change originated.

const (
	// SourceFilesystem indicates the change came from filesystem watching.
	SourceFilesystem ChangeSource = "Filesystem"

	// SourceKubernetes indicates the change came from Kubernetes informers.
	SourceKubernetes ChangeSource = "Kubernetes"

	// SourceManual indicates the change was triggered manually (e.g., API call).
	SourceManual ChangeSource = "Manual"

	// SourceServiceState indicates the change came from a service state change.
	// This is used when runtime state changes (e.g., service crashes, health check fails)
	// trigger reconciliation to sync status back to the CRD.
	SourceServiceState ChangeSource = "ServiceState"
)

type FilesystemDetector

type FilesystemDetector struct {
	// contains filtered or unexported fields
}

FilesystemDetector implements ChangeDetector for filesystem-based configurations.

It uses fsnotify to watch for changes in YAML configuration files and generates change events when files are created, modified, or deleted.

func NewFilesystemDetector

func NewFilesystemDetector(basePath string, debounceInterval time.Duration) *FilesystemDetector

NewFilesystemDetector creates a new filesystem change detector.

func (*FilesystemDetector) AddResourceType

func (d *FilesystemDetector) AddResourceType(resourceType ResourceType) error

AddResourceType adds a resource type to watch.

func (*FilesystemDetector) GetSource

func (d *FilesystemDetector) GetSource() ChangeSource

GetSource returns the change source type.

func (*FilesystemDetector) RemoveResourceType

func (d *FilesystemDetector) RemoveResourceType(resourceType ResourceType) error

RemoveResourceType removes a resource type from watching.

func (*FilesystemDetector) Start

func (d *FilesystemDetector) Start(ctx context.Context, changes chan<- ChangeEvent) error

Start begins watching for filesystem changes.

func (*FilesystemDetector) Stop

func (d *FilesystemDetector) Stop() error

Stop gracefully stops the filesystem detector.

type KubernetesDetector

type KubernetesDetector struct {
	// contains filtered or unexported fields
}

KubernetesDetector implements ChangeDetector using controller-runtime informers.

It watches muster CRDs (MCPServer, Workflow) via Kubernetes informers and generates change events when resources are created, updated, or deleted.

This detector provides native Kubernetes integration with proper event handling, caching, and efficient watch-based change detection.

func NewKubernetesDetector

func NewKubernetesDetector(restConfig *rest.Config, namespace string) (*KubernetesDetector, error)

NewKubernetesDetector creates a new Kubernetes change detector.

Args:

  • restConfig: Kubernetes REST configuration for API access
  • namespace: Namespace to watch (empty string watches all namespaces)

Returns:

  • *KubernetesDetector: The configured detector
  • error: Error if scheme registration fails

func (*KubernetesDetector) AddResourceType

func (d *KubernetesDetector) AddResourceType(resourceType ResourceType) error

AddResourceType adds a resource type to watch.

func (*KubernetesDetector) GetSource

func (d *KubernetesDetector) GetSource() ChangeSource

GetSource returns the change source type.

func (*KubernetesDetector) RemoveResourceType

func (d *KubernetesDetector) RemoveResourceType(resourceType ResourceType) error

RemoveResourceType removes a resource type from watching.

func (*KubernetesDetector) Start

func (d *KubernetesDetector) Start(ctx context.Context, changes chan<- ChangeEvent) error

Start begins watching for Kubernetes resource changes.

func (*KubernetesDetector) Stop

func (d *KubernetesDetector) Stop() error

Stop gracefully stops the Kubernetes detector.

type MCPServerManager

type MCPServerManager interface {
	ListMCPServers() []api.MCPServerInfo
	GetMCPServer(name string) (*api.MCPServerInfo, error)
}

MCPServerManager is an interface for accessing MCPServer definitions. This is an alias for the api.MCPServerManagerHandler interface.

type MCPServerReconciler

type MCPServerReconciler struct {
	BaseStatusConfig
	// contains filtered or unexported fields
}

MCPServerReconciler reconciles MCPServer resources.

It ensures that MCPServer definitions (from CRDs or YAML files) are synchronized with the running services managed by the orchestrator.

Reconciliation logic:

  • Create: Register and start a new MCPServer service
  • Update: Update the service configuration and restart if needed
  • Delete: Stop and unregister the MCPServer service

After each reconciliation, the reconciler syncs the service state back to the CRD's Status field. See ADR 007 for details.
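The Create/Update/Delete rules above can be expressed as a level-triggered comparison of desired and actual state. This is a sketch under assumptions: `serverSpec` and its fields are hypothetical, and the real reconciler's notion of "needs update" is not documented here.

```go
package main

// action is what one reconcile pass decides to do for an MCPServer.
type action string

const (
	actionCreate action = "create" // register and start a new service
	actionUpdate action = "update" // apply new configuration, restart if needed
	actionDelete action = "delete" // stop and unregister the service
	actionNone   action = "none"   // already in sync; only refresh status
)

// serverSpec is a hypothetical, trimmed-down MCPServer definition.
type serverSpec struct {
	Command string
	Args    []string
}

// decide compares desired (from CRD/YAML; nil if deleted) with actual
// (from the running service; nil if not registered).
func decide(desired, actual *serverSpec) action {
	switch {
	case desired == nil && actual == nil:
		return actionNone
	case desired == nil:
		return actionDelete
	case actual == nil:
		return actionCreate
	case !sameSpec(*desired, *actual):
		return actionUpdate
	default:
		return actionNone
	}
}

// sameSpec reports whether two specs are field-for-field equal.
func sameSpec(a, b serverSpec) bool {
	if a.Command != b.Command || len(a.Args) != len(b.Args) {
		return false
	}
	for i := range a.Args {
		if a.Args[i] != b.Args[i] {
			return false
		}
	}
	return true
}
```

Deciding from the two states, rather than from the event's Operation field, keeps the pass idempotent: replaying a stale "Create" event for a server that is already running yields "none".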

func NewMCPServerReconciler

func NewMCPServerReconciler(
	orchestratorAPI api.OrchestratorAPI,
	mcpServerManager MCPServerManager,
	serviceRegistry api.ServiceRegistryHandler,
) *MCPServerReconciler

NewMCPServerReconciler creates a new MCPServer reconciler.

func (*MCPServerReconciler) GetResourceType

func (r *MCPServerReconciler) GetResourceType() ResourceType

GetResourceType returns the resource type this reconciler handles.

func (*MCPServerReconciler) Reconcile

func (r *MCPServerReconciler) Reconcile(ctx context.Context, req ReconcileRequest) ReconcileResult

Reconcile processes a single MCPServer reconciliation request.

After successful reconciliation, this returns RequeueAfter to enable periodic status sync. This ensures that runtime state changes (service crashes, health check failures, etc.) are eventually reflected in the CRD status even if state change events are missed.

func (*MCPServerReconciler) WithStatusUpdater

func (r *MCPServerReconciler) WithStatusUpdater(updater StatusUpdater, namespace string) *MCPServerReconciler

WithStatusUpdater sets the status updater for syncing status back to CRDs.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager coordinates all reconciliation activities.

It manages:

  • Change detectors (filesystem/Kubernetes)
  • Resource-specific reconcilers
  • Work queue and worker pool
  • Retry logic with exponential backoff

func NewManager

func NewManager(config ManagerConfig) *Manager

NewManager creates a new reconciliation manager.

func (*Manager) DisableResourceType

func (m *Manager) DisableResourceType(resourceType ResourceType)

DisableResourceType disables reconciliation for a specific resource type.

func (*Manager) EnableResourceType

func (m *Manager) EnableResourceType(resourceType ResourceType)

EnableResourceType enables reconciliation for a specific resource type.

func (*Manager) GetAllStatuses

func (m *Manager) GetAllStatuses() []ReconcileStatus

GetAllStatuses returns all reconciliation statuses.

func (*Manager) GetEnabledResourceTypes

func (m *Manager) GetEnabledResourceTypes() []string

GetEnabledResourceTypes returns the list of resource types with reconciliation enabled.

func (*Manager) GetQueueLength

func (m *Manager) GetQueueLength() int

GetQueueLength returns the current queue length.

func (*Manager) GetStatus

func (m *Manager) GetStatus(resourceType ResourceType, name, namespace string) (*ReconcileStatus, bool)

GetStatus returns the reconciliation status for a resource.

func (*Manager) GetWatchMode

func (m *Manager) GetWatchMode() string

GetWatchMode returns the current watch mode.

func (*Manager) IsResourceTypeEnabled

func (m *Manager) IsResourceTypeEnabled(resourceType ResourceType) bool

IsResourceTypeEnabled checks if reconciliation is enabled for a resource type.

func (*Manager) IsRunning

func (m *Manager) IsRunning() bool

IsRunning returns whether the manager is running.

func (*Manager) RegisterReconciler

func (m *Manager) RegisterReconciler(reconciler Reconciler) error

RegisterReconciler registers a reconciler for a specific resource type.

func (*Manager) Start

func (m *Manager) Start(ctx context.Context) error

Start begins the reconciliation system.

func (*Manager) Stop

func (m *Manager) Stop() error

Stop gracefully shuts down the reconciliation manager.

func (*Manager) TriggerReconcile

func (m *Manager) TriggerReconcile(resourceType ResourceType, name, namespace string)

TriggerReconcile manually triggers reconciliation for a resource.

type ManagerConfig

type ManagerConfig struct {
	// Mode specifies whether to use Kubernetes or filesystem watching.
	// If empty, the system will auto-detect based on available resources.
	Mode WatchMode

	// FilesystemPath is the base path for filesystem watching.
	// Only used when Mode is WatchModeFilesystem.
	FilesystemPath string

	// Namespace is the Kubernetes namespace to watch.
	// Only used when Mode is WatchModeKubernetes.
	Namespace string

	// WorkerCount is the number of concurrent reconciliation workers.
	// Defaults to 2 if not specified.
	WorkerCount int

	// MaxRetries is the maximum number of retry attempts for failed reconciliations.
	// Defaults to 5 if not specified.
	MaxRetries int

	// InitialBackoff is the initial backoff duration for retries.
	// Defaults to 1 second if not specified.
	InitialBackoff time.Duration

	// MaxBackoff is the maximum backoff duration for retries.
	// Defaults to 5 minutes if not specified.
	MaxBackoff time.Duration

	// DebounceInterval is how long to wait for additional changes before reconciling.
	// Defaults to 500ms if not specified.
	DebounceInterval time.Duration

	// ReconcileTimeout is the maximum duration for a single reconciliation operation.
	// If a reconciler takes longer than this, the context will be cancelled.
	// Defaults to 30 seconds if not specified.
	ReconcileTimeout time.Duration

	// Debug enables debug logging for reconciliation operations.
	Debug bool

	// DisabledResourceTypes is a set of resource types that should not be reconciled.
	// This allows selective disabling of reconciliation for specific resource types.
	// Empty or nil means all registered resource types are enabled.
	DisabledResourceTypes map[ResourceType]bool
}

ManagerConfig holds configuration for the ReconcileManager.

type ReconcileQueue

type ReconcileQueue interface {
	// Add adds a request to the queue.
	// If the same resource is already queued, the existing entry is updated.
	Add(req ReconcileRequest)

	// Get retrieves the next request from the queue.
	// Blocks until a request is available or the context is cancelled.
	Get(ctx context.Context) (ReconcileRequest, bool)

	// Done marks a request as processed.
	// This should be called after processing to enable rate limiting.
	Done(req ReconcileRequest)

	// Len returns the current queue length.
	Len() int

	// Shutdown signals the queue to stop accepting new items.
	Shutdown()
}

ReconcileQueue represents a queue of resources awaiting reconciliation.

func NewQueue

func NewQueue() ReconcileQueue

NewQueue creates a new reconciliation queue.

type ReconcileRequest

type ReconcileRequest struct {
	// Type is the type of resource to reconcile.
	Type ResourceType

	// Name is the name of the resource.
	Name string

	// Namespace is the Kubernetes namespace (empty for filesystem mode).
	Namespace string

	// Attempt is the current retry attempt number (starts at 1).
	Attempt int

	// LastError is the error from the previous attempt, if any.
	LastError error
}

ReconcileRequest represents a request to reconcile a specific resource.

type ReconcileResult

type ReconcileResult struct {
	// Requeue indicates whether the resource should be requeued for retry.
	Requeue bool

	// RequeueAfter specifies when to requeue (0 means use default backoff).
	RequeueAfter time.Duration

	// Error is any error that occurred during reconciliation.
	Error error
}

ReconcileResult represents the outcome of a reconciliation attempt.

type ReconcileState

type ReconcileState string

ReconcileState represents the state of a resource's reconciliation.

const (
	// StatePending means the resource is awaiting reconciliation.
	StatePending ReconcileState = "Pending"

	// StateReconciling means reconciliation is in progress.
	StateReconciling ReconcileState = "Reconciling"

	// StateSynced means the resource is successfully reconciled.
	StateSynced ReconcileState = "Synced"

	// StateError means reconciliation failed and may be retried.
	StateError ReconcileState = "Error"

	// StateFailed means reconciliation failed permanently (max retries exceeded).
	StateFailed ReconcileState = "Failed"
)

type ReconcileStatus

type ReconcileStatus struct {
	// ResourceType is the type of the resource.
	ResourceType ResourceType

	// Name is the name of the resource.
	Name string

	// Namespace is the Kubernetes namespace (empty for filesystem mode).
	Namespace string

	// LastReconcileTime is when the resource was last successfully reconciled.
	LastReconcileTime *time.Time

	// LastError is the most recent error, if any.
	LastError string

	// RetryCount is the number of retry attempts.
	RetryCount int

	// State describes the current reconciliation state.
	State ReconcileState
}

ReconcileStatus represents the current status of reconciliation for a resource.

type Reconciler

type Reconciler interface {
	// Reconcile processes a single reconciliation request.
	// It should be idempotent - calling it multiple times with the same
	// input should produce the same result.
	//
	// The reconciler should:
	//  1. Fetch the current desired state from the source (CRD/YAML)
	//  2. Compare with the actual running state
	//  3. Take actions to bring actual state in line with desired state
	//  4. Return a result indicating success or need for retry
	Reconcile(ctx context.Context, req ReconcileRequest) ReconcileResult

	// GetResourceType returns the type of resource this reconciler handles.
	GetResourceType() ResourceType
}

Reconciler is the interface that resource-specific reconcilers must implement.

Each resource type (MCPServer, Workflow) has its own reconciler that knows how to reconcile that specific type of resource.

type ReconcilerMetrics

type ReconcilerMetrics struct {
	// contains filtered or unexported fields
}

ReconcilerMetrics tracks reconciliation-related metrics for monitoring and alerting.

This provides visibility into reconciliation patterns, status sync failures, and overall reconciler health. Metrics are tracked per-resource-type to enable targeted alerting and debugging.

func GetReconcilerMetrics

func GetReconcilerMetrics() *ReconcilerMetrics

GetReconcilerMetrics returns the global reconciler metrics instance. It creates the instance on first access (lazy initialization).

func NewReconcilerMetrics

func NewReconcilerMetrics() *ReconcilerMetrics

NewReconcilerMetrics creates a new ReconcilerMetrics instance.

func (*ReconcilerMetrics) RecordStatusSyncAttempt

func (m *ReconcilerMetrics) RecordStatusSyncAttempt(resourceType ResourceType, resourceName string)

RecordStatusSyncAttempt records a status sync attempt.

func (*ReconcilerMetrics) RecordStatusSyncFailure

func (m *ReconcilerMetrics) RecordStatusSyncFailure(resourceType ResourceType, resourceName string, reason string)

RecordStatusSyncFailure records a failed status sync attempt.

This metric is important for monitoring the health of CRD status updates. High failure rates may indicate:

  • Kubernetes API server issues
  • RBAC permission problems
  • Network connectivity issues
  • CRD schema mismatches

func (*ReconcilerMetrics) RecordStatusSyncSuccess

func (m *ReconcilerMetrics) RecordStatusSyncSuccess(resourceType ResourceType, resourceName string)

RecordStatusSyncSuccess records a successful status sync.

type ReconcilerMetricsSummary

type ReconcilerMetricsSummary struct {
	TotalReconcileAttempts   int64                    `json:"total_reconcile_attempts"`
	TotalReconcileSuccesses  int64                    `json:"total_reconcile_successes"`
	TotalReconcileFailures   int64                    `json:"total_reconcile_failures"`
	TotalStatusSyncAttempts  int64                    `json:"total_status_sync_attempts"`
	TotalStatusSyncSuccesses int64                    `json:"total_status_sync_successes"`
	TotalStatusSyncFailures  int64                    `json:"total_status_sync_failures"`
	PerResourceTypeMetrics   []ResourceTypeMetricView `json:"per_resource_type_metrics"`
	StatusSyncFailureRate    float64                  `json:"status_sync_failure_rate"`
	ReconcileFailureRate     float64                  `json:"reconcile_failure_rate"`
}

ReconcilerMetricsSummary provides a summary of reconciliation metrics.
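The summary does not define how its rate fields are computed. Assuming a rate is failures divided by attempts, expressed as a fraction rather than a percentage and zero when there were no attempts, the sketch below computes one and shows the JSON shape produced by the documented tags (only three fields are mirrored).

```go
package main

import "encoding/json"

// summary mirrors three ReconcilerMetricsSummary fields, with the same JSON tags.
type summary struct {
	TotalStatusSyncAttempts int64   `json:"total_status_sync_attempts"`
	TotalStatusSyncFailures int64   `json:"total_status_sync_failures"`
	StatusSyncFailureRate   float64 `json:"status_sync_failure_rate"`
}

// failureRate returns failures/attempts, or 0 with no attempts (no division by zero).
func failureRate(failures, attempts int64) float64 {
	if attempts <= 0 {
		return 0
	}
	return float64(failures) / float64(attempts)
}

func newSummary(attempts, failures int64) summary {
	return summary{
		TotalStatusSyncAttempts: attempts,
		TotalStatusSyncFailures: failures,
		StatusSyncFailureRate:   failureRate(failures, attempts),
	}
}

// toJSON encodes the summary using the struct tags above.
func (s summary) toJSON() (string, error) {
	b, err := json.Marshal(s)
	return string(b), err
}
```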

type ResourceType

type ResourceType string

ResourceType represents the type of resource being reconciled.

const (
	// ResourceTypeMCPServer represents MCPServer CRD/YAML resources.
	ResourceTypeMCPServer ResourceType = "MCPServer"

	// ResourceTypeWorkflow represents Workflow CRD/YAML resources.
	ResourceTypeWorkflow ResourceType = "Workflow"
)

type ResourceTypeMetricView

type ResourceTypeMetricView struct {
	ResourceType        ResourceType `json:"resource_type"`
	ReconcileAttempts   int64        `json:"reconcile_attempts"`
	ReconcileSuccesses  int64        `json:"reconcile_successes"`
	ReconcileFailures   int64        `json:"reconcile_failures"`
	StatusSyncAttempts  int64        `json:"status_sync_attempts"`
	StatusSyncSuccesses int64        `json:"status_sync_successes"`
	StatusSyncFailures  int64        `json:"status_sync_failures"`
	LastReconcileAt     time.Time    `json:"last_reconcile_at,omitempty"`
	LastSuccessAt       time.Time    `json:"last_success_at,omitempty"`
	LastFailureAt       time.Time    `json:"last_failure_at,omitempty"`
	LastStatusSyncAt    time.Time    `json:"last_status_sync_at,omitempty"`
}

ResourceTypeMetricView is a read-only view of resource-type-specific metrics.

type StateChangeBridge

type StateChangeBridge struct {
	// contains filtered or unexported fields
}

StateChangeBridge bridges service state changes from the orchestrator to the reconciliation system. This enables the idiomatic Kubernetes controller pattern where status is updated in response to runtime state changes, not just spec changes.

The bridge subscribes to orchestrator state change events and triggers reconciliation for the affected resources. This ensures that CRD status subresources reflect the actual runtime state of services (running, stopped, healthy, unhealthy, etc.).

This implements the "external event source" pattern commonly used in controller-runtime based controllers via source.Channel.

func NewStateChangeBridge

func NewStateChangeBridge(
	orchestratorAPI api.OrchestratorAPI,
	reconcileManager *Manager,
	namespace string,
) *StateChangeBridge

NewStateChangeBridge creates a new state change bridge.

Args:

  • orchestratorAPI: Interface for subscribing to service state changes
  • reconcileManager: The reconcile manager to trigger reconciliation on
  • namespace: Default namespace for resources (used when namespace is empty)

Returns a configured but not yet started bridge.

func (*StateChangeBridge) Start

func (b *StateChangeBridge) Start(ctx context.Context) error

Start begins listening for orchestrator state changes and triggering reconciliation.

This method subscribes to service state change events from the orchestrator and starts a background goroutine to process them. The method is idempotent: calling it multiple times has no additional effect.

Args:

  • ctx: Context for controlling the bridge lifecycle

Returns nil on successful startup.

func (*StateChangeBridge) Stop

func (b *StateChangeBridge) Stop() error

Stop gracefully shuts down the bridge and waits for cleanup to complete.

This method cancels the event processing context and waits for the background goroutine to finish. The method is idempotent and can be called multiple times.

Returns nil after successful shutdown.

type StatusSyncFailureTracker

type StatusSyncFailureTracker struct {
	// contains filtered or unexported fields
}

StatusSyncFailureTracker tracks per-resource status sync failures to implement backoff-based logging. This reduces log spam when status syncs fail repeatedly for the same resource.

func GetStatusSyncFailureTracker

func GetStatusSyncFailureTracker() *StatusSyncFailureTracker

GetStatusSyncFailureTracker returns the global failure tracker instance.

func NewStatusSyncFailureTracker

func NewStatusSyncFailureTracker() *StatusSyncFailureTracker

NewStatusSyncFailureTracker creates a new failure tracker.

func (*StatusSyncFailureTracker) GetFailureCount

func (t *StatusSyncFailureTracker) GetFailureCount(resourceType ResourceType, name string) int

GetFailureCount returns the current consecutive failure count for a resource.

func (*StatusSyncFailureTracker) RecordFailure

func (t *StatusSyncFailureTracker) RecordFailure(resourceType ResourceType, name string, err error) bool

RecordFailure records a status sync failure for a resource. Returns true if this failure should be logged (based on backoff).

func (*StatusSyncFailureTracker) RecordSuccess

func (t *StatusSyncFailureTracker) RecordSuccess(resourceType ResourceType, name string)

RecordSuccess records a successful status sync, resetting the failure counter.

type StatusSyncHelper

type StatusSyncHelper struct {
	ResourceType   ResourceType
	ResourceName   string
	Metrics        *ReconcilerMetrics
	FailureTracker *StatusSyncFailureTracker
	ReconcilerName string
}

StatusSyncHelper encapsulates the common retry-on-conflict pattern for status sync. This helper reduces duplication across MCPServer and Workflow reconcilers.

func NewStatusSyncHelper

func NewStatusSyncHelper(resourceType ResourceType, name, reconcilerName string) *StatusSyncHelper

NewStatusSyncHelper creates a new helper for status sync operations.

func (*StatusSyncHelper) HandleResult

func (h *StatusSyncHelper) HandleResult(retryErr, lastErr error)

HandleResult processes the result of a status sync operation. It records success or failure metrics and logs failures with backoff.

func (*StatusSyncHelper) RecordAttempt

func (h *StatusSyncHelper) RecordAttempt()

RecordAttempt records a status sync attempt in metrics.

func (*StatusSyncHelper) WasSuccessful

func (h *StatusSyncHelper) WasSuccessful(retryErr, lastErr error) bool

WasSuccessful returns true if the status sync succeeded.

type StatusSyncResult

type StatusSyncResult struct {
	Success bool
	Error   error
}

StatusSyncResult holds the outcome of a status sync operation.

type StatusUpdater

type StatusUpdater interface {
	GetMCPServer(ctx context.Context, name, namespace string) (*musterv1alpha1.MCPServer, error)
	UpdateMCPServerStatus(ctx context.Context, server *musterv1alpha1.MCPServer) error
	GetWorkflow(ctx context.Context, name, namespace string) (*musterv1alpha1.Workflow, error)
	UpdateWorkflowStatus(ctx context.Context, workflow *musterv1alpha1.Workflow) error
	IsKubernetesMode() bool
}

StatusUpdater is an interface for updating CRD status. This is implemented by the MusterClient.

type WatchMode

type WatchMode string

WatchMode specifies how to detect configuration changes.

const (
	// WatchModeFilesystem uses filesystem watching for YAML files.
	WatchModeFilesystem WatchMode = "filesystem"

	// WatchModeKubernetes uses Kubernetes informers for CRDs.
	WatchModeKubernetes WatchMode = "kubernetes"

	// WatchModeAuto automatically selects based on environment.
	WatchModeAuto WatchMode = "auto"
)

func WatchModeFromKubernetesFlag

func WatchModeFromKubernetesFlag(kubernetesEnabled bool) WatchMode

WatchModeFromKubernetesFlag returns the appropriate WatchMode based on whether Kubernetes mode is enabled. This helper ensures consistent mode selection across the codebase.

When kubernetesEnabled is true, returns WatchModeKubernetes (CRD-based). When kubernetesEnabled is false, returns WatchModeFilesystem (YAML file-based).
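The documented mapping is small enough to show directly. This page does not document how WatchModeAuto is resolved. The hypothetical resolveWatchMode helper below assumes in-cluster detection through the KUBERNETES_SERVICE_HOST environment variable, which Kubernetes sets in pods. It takes getenv as a parameter so that it can be tested.

```go
package main

import (
	"fmt"
	"os"
)

type WatchMode string

const (
	WatchModeFilesystem WatchMode = "filesystem"
	WatchModeKubernetes WatchMode = "kubernetes"
	WatchModeAuto       WatchMode = "auto"
)

// watchModeFromKubernetesFlag reproduces the documented mapping.
func watchModeFromKubernetesFlag(kubernetesEnabled bool) WatchMode {
	if kubernetesEnabled {
		return WatchModeKubernetes
	}
	return WatchModeFilesystem
}

// resolveWatchMode is hypothetical: explicit modes pass through, and auto
// picks Kubernetes mode when running inside a pod (an assumed heuristic).
func resolveWatchMode(mode WatchMode, getenv func(string) string) WatchMode {
	if mode != WatchModeAuto {
		return mode
	}
	return watchModeFromKubernetesFlag(getenv("KUBERNETES_SERVICE_HOST") != "")
}

func main() {
	fmt.Println(watchModeFromKubernetesFlag(true), watchModeFromKubernetesFlag(false)) // kubernetes filesystem
	fmt.Println(resolveWatchMode(WatchModeAuto, os.Getenv))
}
```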

type WorkflowManager

type WorkflowManager interface {
	GetWorkflows() []api.Workflow
	GetWorkflow(name string) (*api.Workflow, error)
}

WorkflowManager is an interface for accessing Workflow definitions.
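An in-memory implementation is convenient as a test double when exercising a reconciler. This sketch replaces api.Workflow with a hypothetical stand-in that has only a Name field, and its not-found error message is invented.

```go
package main

import (
	"fmt"
	"sort"
)

// Workflow is a hypothetical stand-in for api.Workflow.
type Workflow struct {
	Name string
}

type WorkflowManager interface {
	GetWorkflows() []Workflow
	GetWorkflow(name string) (*Workflow, error)
}

// memoryWorkflowManager is a hypothetical in-memory implementation.
type memoryWorkflowManager struct {
	byName map[string]Workflow
}

func newMemoryWorkflowManager(wfs ...Workflow) *memoryWorkflowManager {
	m := &memoryWorkflowManager{byName: make(map[string]Workflow)}
	for _, wf := range wfs {
		m.byName[wf.Name] = wf
	}
	return m
}

// GetWorkflows returns all workflows sorted by name for deterministic output.
func (m *memoryWorkflowManager) GetWorkflows() []Workflow {
	out := make([]Workflow, 0, len(m.byName))
	for _, wf := range m.byName {
		out = append(out, wf)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Name < out[j].Name })
	return out
}

// GetWorkflow returns a copy so callers cannot mutate stored state.
func (m *memoryWorkflowManager) GetWorkflow(name string) (*Workflow, error) {
	wf, ok := m.byName[name]
	if !ok {
		return nil, fmt.Errorf("workflow %q not found", name)
	}
	return &wf, nil
}

// Compile-time check that the type satisfies the interface.
var _ WorkflowManager = (*memoryWorkflowManager)(nil)

func main() {
	var mgr WorkflowManager = newMemoryWorkflowManager(Workflow{Name: "deploy"}, Workflow{Name: "backup"})
	fmt.Println(len(mgr.GetWorkflows())) // 2
	if _, err := mgr.GetWorkflow("missing"); err != nil {
		fmt.Println(err) // workflow "missing" not found
	}
}
```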

type WorkflowReconciler

type WorkflowReconciler struct {
	BaseStatusConfig
	// contains filtered or unexported fields
}

WorkflowReconciler reconciles Workflow resources.

It ensures that Workflow definitions (from CRDs or YAML files) are synchronized with the system's understanding of available workflows.

Reconciliation logic:

  • Create: Validate and register the Workflow definition
  • Update: Re-validate and update the Workflow configuration
  • Delete: Remove the Workflow from the system

After each reconciliation, the reconciler syncs validation status back to the CRD's Status field. See ADR 007 for details.
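The create, update, and delete logic above can be sketched as a single reconcile function that compares the desired definition with a registry and then records a validation status. Everything in this sketch is hypothetical: workflowSpec, the one-step validation rule, the string results, and the status map that stands in for the CRD Status field.

```go
package main

import (
	"errors"
	"fmt"
)

// workflowSpec is a hypothetical, minimal workflow definition.
type workflowSpec struct {
	Name  string
	Steps []string
}

// validate applies an illustrative rule: a workflow needs at least one step.
func validate(s workflowSpec) error {
	if len(s.Steps) == 0 {
		return errors.New("workflow must define at least one step")
	}
	return nil
}

// sketchReconciler keeps a registry in line with the desired source.
type sketchReconciler struct {
	registry map[string]workflowSpec
	status   map[string]string // stand-in for the CRD Status field
}

// reconcile treats a nil desired spec as "removed from the source".
func (r *sketchReconciler) reconcile(name string, desired *workflowSpec) string {
	_, exists := r.registry[name]
	if desired == nil {
		delete(r.registry, name)
		delete(r.status, name)
		if exists {
			return "deleted"
		}
		return "noop"
	}
	if err := validate(*desired); err != nil {
		r.status[name] = "Invalid: " + err.Error()
		return "invalid"
	}
	r.registry[name] = *desired
	r.status[name] = "Valid"
	if exists {
		return "updated"
	}
	return "created"
}

func main() {
	r := &sketchReconciler{registry: map[string]workflowSpec{}, status: map[string]string{}}
	fmt.Println(r.reconcile("deploy", &workflowSpec{Name: "deploy", Steps: []string{"build"}}))         // created
	fmt.Println(r.reconcile("deploy", &workflowSpec{Name: "deploy", Steps: []string{"build", "push"}})) // updated
	fmt.Println(r.reconcile("deploy", nil))                                                             // deleted
	fmt.Println(r.reconcile("broken", &workflowSpec{Name: "broken"}), r.status["broken"])
}
```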

func NewWorkflowReconciler

func NewWorkflowReconciler(
	workflowManager WorkflowManager,
) *WorkflowReconciler

NewWorkflowReconciler creates a new Workflow reconciler.

func (*WorkflowReconciler) GetResourceType

func (r *WorkflowReconciler) GetResourceType() ResourceType

GetResourceType returns the resource type this reconciler handles.

func (*WorkflowReconciler) Reconcile

Reconcile processes a single Workflow reconciliation request.

func (*WorkflowReconciler) WithStatusUpdater

func (r *WorkflowReconciler) WithStatusUpdater(updater StatusUpdater, namespace string) *WorkflowReconciler

WithStatusUpdater sets the status updater for syncing status back to CRDs.
