Documentation ¶
Overview ¶
Package reconciler provides a unified reconciliation system for muster resources.
The reconciler package implements automatic change detection and reconciliation for both Kubernetes CRDs and filesystem-based YAML configurations. It ensures that the actual state of muster resources matches the desired state defined in configuration files or Kubernetes custom resources.
Architecture ¶
The reconciliation system consists of several key components:
- ReconcileManager: Central coordinator that manages all reconcilers
- Reconciler: Interface for resource-specific reconciliation logic
- ChangeDetector: Interface for detecting changes in resource sources
- ReconcileLoop: Generic reconciliation loop with retry and backoff
The system supports two modes of operation:
- Kubernetes Mode: Uses informers and controllers for CRD changes
- Filesystem Mode: Uses fsnotify for watching YAML file changes
Usage ¶
The reconciliation system is automatically integrated with the muster application bootstrap process. It starts watching for changes when the application starts and stops when the application shuts down.
Example usage:
manager := reconciler.NewManager(config)
if err := manager.Start(ctx); err != nil {
	return fmt.Errorf("failed to start reconciliation: %w", err)
}
defer manager.Stop()
Resource Types ¶
The following resource types are supported for reconciliation:
- MCPServer: MCP server definitions and lifecycle management
- Workflow: Workflow definitions for multi-step operations
Event Integration ¶
The reconciliation system integrates with muster's existing event system to provide notifications about reconciliation activities. Events are generated for successful reconciliations, failures, and retries.
Performance Considerations ¶
The system implements several optimizations:
- Debouncing: Multiple rapid changes are batched together
- Efficient watching: Uses informers for Kubernetes, fsnotify for files
- Backoff: Failed reconciliations use exponential backoff
- Rate limiting: Prevents overwhelming the system with rapid changes
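To make the debouncing item above concrete, here is a minimal sketch of coalescing rapid changes per resource. The `debouncer` type, its methods, and the resource keys are hypothetical and use explicit timestamps so the behavior is easy to follow; the package's own debouncing (configured through `DebounceInterval`) may be implemented differently, for example with timers.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// debouncer coalesces rapid changes to the same resource.
// It is a hypothetical stand-in, not the package's implementation.
type debouncer struct {
	interval time.Duration
	lastSeen map[string]time.Time // resource key -> time of the most recent change
}

func newDebouncer(interval time.Duration) *debouncer {
	return &debouncer{interval: interval, lastSeen: make(map[string]time.Time)}
}

// observe records a change; a later change to the same key restarts its quiet period.
func (d *debouncer) observe(key string, at time.Time) {
	d.lastSeen[key] = at
}

// flush returns (and forgets) every key that has been quiet for at least interval.
func (d *debouncer) flush(now time.Time) []string {
	var ready []string
	for key, seen := range d.lastSeen {
		if now.Sub(seen) >= d.interval {
			ready = append(ready, key)
			delete(d.lastSeen, key)
		}
	}
	sort.Strings(ready)
	return ready
}

// demo replays three rapid edits to one resource and a single edit to another.
func demo() (early, late []string) {
	start := time.Unix(0, 0)
	d := newDebouncer(500 * time.Millisecond)
	d.observe("MCPServer/github", start)
	d.observe("MCPServer/github", start.Add(100*time.Millisecond))
	d.observe("MCPServer/github", start.Add(200*time.Millisecond))
	d.observe("Workflow/deploy", start)

	early = d.flush(start.Add(500 * time.Millisecond)) // github changed only 300ms ago
	late = d.flush(start.Add(700 * time.Millisecond))  // github has now been quiet for 500ms
	return early, late
}

func main() {
	early, late := demo()
	fmt.Println("first flush:", early)
	fmt.Println("second flush:", late)
}
```

The three edits to the same resource produce a single reconciliation candidate, which is the batching effect the list describes.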
Index ¶
- Constants
- Variables
- func CategorizeStatusSyncError(err error) string
- func GetRestConfig() (*rest.Config, error)
- func IsConflictError(err error) bool
- func IsKubernetesAvailable() bool
- func IsNotFoundError(err error) bool
- func IsValidResourceType(resourceType string) bool
- func NewDelayedQueue() *delayedQueue
- func SanitizeErrorMessage(errMsg string) string
- type Adapter
- func (a *Adapter) DisableResourceType(resourceType string)
- func (a *Adapter) EnableResourceType(resourceType string)
- func (a *Adapter) GetAllStatuses() []api.ReconcileStatusInfo
- func (a *Adapter) GetEnabledResourceTypes() []string
- func (a *Adapter) GetManager() *Manager
- func (a *Adapter) GetOverview() *api.ReconcileOverview
- func (a *Adapter) GetQueueLength() int
- func (a *Adapter) GetStatus(resourceType, name, namespace string) (*api.ReconcileStatusInfo, bool)
- func (a *Adapter) GetWatchMode() string
- func (a *Adapter) IsResourceTypeEnabled(resourceType string) bool
- func (a *Adapter) IsRunning() bool
- func (a *Adapter) Register()
- func (a *Adapter) TriggerReconcile(resourceType, name, namespace string)
- type BaseStatusConfig
- type ChangeDetector
- type ChangeEvent
- type ChangeOperation
- type ChangeSource
- type FilesystemDetector
- func (d *FilesystemDetector) AddResourceType(resourceType ResourceType) error
- func (d *FilesystemDetector) GetSource() ChangeSource
- func (d *FilesystemDetector) RemoveResourceType(resourceType ResourceType) error
- func (d *FilesystemDetector) Start(ctx context.Context, changes chan<- ChangeEvent) error
- func (d *FilesystemDetector) Stop() error
- type KubernetesDetector
- func (d *KubernetesDetector) AddResourceType(resourceType ResourceType) error
- func (d *KubernetesDetector) GetSource() ChangeSource
- func (d *KubernetesDetector) RemoveResourceType(resourceType ResourceType) error
- func (d *KubernetesDetector) Start(ctx context.Context, changes chan<- ChangeEvent) error
- func (d *KubernetesDetector) Stop() error
- type MCPServerManager
- type MCPServerReconciler
- type Manager
- func (m *Manager) DisableResourceType(resourceType ResourceType)
- func (m *Manager) EnableResourceType(resourceType ResourceType)
- func (m *Manager) GetAllStatuses() []ReconcileStatus
- func (m *Manager) GetEnabledResourceTypes() []string
- func (m *Manager) GetQueueLength() int
- func (m *Manager) GetStatus(resourceType ResourceType, name, namespace string) (*ReconcileStatus, bool)
- func (m *Manager) GetWatchMode() string
- func (m *Manager) IsResourceTypeEnabled(resourceType ResourceType) bool
- func (m *Manager) IsRunning() bool
- func (m *Manager) RegisterReconciler(reconciler Reconciler) error
- func (m *Manager) Start(ctx context.Context) error
- func (m *Manager) Stop() error
- func (m *Manager) TriggerReconcile(resourceType ResourceType, name, namespace string)
- type ManagerConfig
- type ReconcileQueue
- type ReconcileRequest
- type ReconcileResult
- type ReconcileState
- type ReconcileStatus
- type Reconciler
- type ReconcilerMetrics
- func (m *ReconcilerMetrics) RecordStatusSyncAttempt(resourceType ResourceType, resourceName string)
- func (m *ReconcilerMetrics) RecordStatusSyncFailure(resourceType ResourceType, resourceName string, reason string)
- func (m *ReconcilerMetrics) RecordStatusSyncSuccess(resourceType ResourceType, resourceName string)
- type ReconcilerMetricsSummary
- type ResourceType
- type ResourceTypeMetricView
- type StateChangeBridge
- type StatusSyncFailureTracker
- type StatusSyncHelper
- type StatusSyncResult
- type StatusUpdater
- type WatchMode
- type WorkflowManager
- type WorkflowReconciler
Constants ¶
const (
	// ServiceStateStopped indicates the service is not running.
	ServiceStateStopped = "stopped"
	// ServiceHealthUnknown indicates the health status is unknown.
	ServiceHealthUnknown = "unknown"
)
Service state constants for status syncing. These are used when a service doesn't exist or has no state.
const DefaultNamespace = "default"
DefaultNamespace is the default namespace for Kubernetes resources.
const DefaultStatusSyncInterval = 30 * time.Second
DefaultStatusSyncInterval is how often to requeue for periodic status sync. This ensures status is eventually consistent even if state change events are missed.
## Purpose
Reconcilers use this interval to schedule periodic re-reconciliation of resources. This implements the "level-triggered" reconciliation pattern from Kubernetes, where we periodically check that desired state matches actual state, rather than relying solely on "edge-triggered" events.
## Tuning Considerations
- **Shorter intervals** (e.g., 10s): More responsive status updates, but higher API server load and more reconciliation overhead.
- **Longer intervals** (e.g., 60s): Lower load, but status may be stale longer if state change events are missed.
## Default Value
The default of 30 seconds provides a good balance between:
- Responsiveness: Status is refreshed at least every 30 seconds
- Efficiency: Low enough frequency to avoid overwhelming the API server
- Eventual consistency: Missed events are recovered within 30 seconds
## Performance Impact
For a deployment with N resources, this generates approximately:
- Roughly N / 30 reconciliations per second (e.g., 100 resources ≈ 3.3/s)
- Each reconciliation involves: 1 Get + 1 Status Update to the API server
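The arithmetic above can be checked with a few lines of Go. This is only a back-of-the-envelope sketch: `steadyStateLoad` is a hypothetical helper, and it assumes every resource is requeued exactly once per interval and costs exactly two API calls (one Get, one Status Update), as stated above.

```go
package main

import (
	"fmt"
	"time"
)

// DefaultStatusSyncInterval mirrors the documented constant.
const DefaultStatusSyncInterval = 30 * time.Second

// steadyStateLoad estimates the periodic-resync load for a number of resources.
// It assumes one reconciliation per resource per interval and two API calls
// (Get + Status Update) per reconciliation.
func steadyStateLoad(resources int, interval time.Duration) (reconcilesPerSec, apiCallsPerSec float64) {
	reconcilesPerSec = float64(resources) / interval.Seconds()
	return reconcilesPerSec, reconcilesPerSec * 2
}

func main() {
	intervals := []time.Duration{10 * time.Second, DefaultStatusSyncInterval, 60 * time.Second}
	for _, interval := range intervals {
		r, a := steadyStateLoad(100, interval)
		fmt.Printf("interval=%v reconciles/s=%.2f api calls/s=%.2f\n", interval, r, a)
	}
}
```

Comparing the three intervals shows the trade-off from the tuning section: each halving of the interval doubles both numbers.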
## Customization
To customize this interval, you can:
- Define a custom reconciler with a different interval
- Set RequeueAfter explicitly in your Reconcile() method
Note: This constant is used by MCPServerReconciler for periodic status sync. The Workflow reconciler doesn't currently use periodic requeue as it primarily manages static definitions.
const FailureLogBackoffTimeout = 5 * time.Minute
FailureLogBackoffTimeout is the maximum time between log entries for persistent failures. Even if a resource is continuously failing, we'll log at least once every this duration to ensure operators are aware of ongoing issues.
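One simple policy consistent with this description is to log the first failure of a resource and then at most once per timeout. The sketch below illustrates that policy only; `failureLogGate` and `shouldLog` are hypothetical names, and the package may additionally back off progressively before reaching the maximum.

```go
package main

import (
	"fmt"
	"time"
)

// FailureLogBackoffTimeout mirrors the documented constant.
const FailureLogBackoffTimeout = 5 * time.Minute

// failureLogGate decides whether a persistent failure should be logged again.
// Hypothetical helper, not part of the package API.
type failureLogGate struct {
	lastLogged map[string]time.Time // resource key -> time of the last log entry
}

func newFailureLogGate() *failureLogGate {
	return &failureLogGate{lastLogged: make(map[string]time.Time)}
}

// shouldLog reports true for the first failure of a resource and then again
// once FailureLogBackoffTimeout has elapsed since the last logged entry.
func (g *failureLogGate) shouldLog(key string, now time.Time) bool {
	last, seen := g.lastLogged[key]
	if seen && now.Sub(last) < FailureLogBackoffTimeout {
		return false
	}
	g.lastLogged[key] = now
	return true
}

// demo simulates a resource failing at 0, 1, and 5 minutes.
func demo() []bool {
	g := newFailureLogGate()
	start := time.Unix(0, 0)
	return []bool{
		g.shouldLog("MCPServer/github", start),
		g.shouldLog("MCPServer/github", start.Add(time.Minute)),
		g.shouldLog("MCPServer/github", start.Add(5*time.Minute)),
	}
}

func main() {
	fmt.Println(demo())
}
```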
Variables ¶
var StatusSyncRetryBackoff = retry.DefaultRetry
StatusSyncRetryBackoff is the retry backoff configuration for status updates. It uses an aggressive retry strategy since status updates are idempotent and conflicts are expected during high-frequency reconciliation.
var ValidResourceTypes = map[ResourceType]bool{
	ResourceTypeMCPServer: true,
	ResourceTypeWorkflow:  true,
}
ValidResourceTypes is the set of all valid resource types. Used for input validation when accepting resource types from external sources.
Functions ¶
func CategorizeStatusSyncError ¶
func CategorizeStatusSyncError(err error) string
CategorizeStatusSyncError returns a descriptive reason for a status sync error. This provides actionable information for metrics and debugging, categorizing errors into meaningful buckets rather than using a generic "update_status_failed".
Categories:
- "conflict_after_retries": Optimistic locking failed even after retries
- "crd_not_found": The CRD resource doesn't exist
- "api_server_unreachable": Network connectivity issues to API server
- "timeout": Request timed out or context deadline exceeded
- "permission_denied": RBAC or authorization failure
- "authentication_failed": Authentication/token issues
- "update_status_failed": Generic fallback for other errors
- "unknown": Nil error (shouldn't happen but handles edge case)
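The bucketing idea can be sketched with the standard library alone. The sentinel errors `errConflict` and `errForbidden` below are hypothetical stand-ins for the Kubernetes API errors the real function inspects, and only a subset of the categories is shown; the order of the checks is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Hypothetical sentinel errors standing in for Kubernetes API errors.
var (
	errConflict  = errors.New("conflict")
	errForbidden = errors.New("forbidden")
)

// categorize maps an error to one of the documented reason strings.
// Only a few buckets are covered; everything else falls through to the
// generic "update_status_failed".
func categorize(err error) string {
	switch {
	case err == nil:
		return "unknown"
	case errors.Is(err, errConflict):
		return "conflict_after_retries"
	case errors.Is(err, context.DeadlineExceeded):
		return "timeout"
	case errors.Is(err, errForbidden):
		return "permission_denied"
	default:
		return "update_status_failed"
	}
}

// demo categorizes a nil error, two wrapped errors, and an unrecognized one.
func demo() []string {
	return []string{
		categorize(nil),
		categorize(fmt.Errorf("update status: %w", errConflict)),
		categorize(fmt.Errorf("update status: %w", context.DeadlineExceeded)),
		categorize(errors.New("boom")),
	}
}

func main() {
	fmt.Println(demo())
}
```

Using `errors.Is` means wrapped errors still land in the right bucket, which matters when the reason string is used as a metrics label.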
func GetRestConfig ¶
func GetRestConfig() (*rest.Config, error)
GetRestConfig returns the REST config for creating a Kubernetes detector. This is a convenience function that uses controller-runtime's config detection.
func IsConflictError ¶
func IsConflictError(err error) bool
IsConflictError returns true if the error is a Kubernetes conflict error. Conflict errors occur when the resource was modified since it was read, indicating the resource version is stale (optimistic locking failure).
func IsKubernetesAvailable ¶
func IsKubernetesAvailable() bool
IsKubernetesAvailable checks if Kubernetes cluster access is available.
func IsNotFoundError ¶
func IsNotFoundError(err error) bool
IsNotFoundError checks if an error indicates a resource was not found. It checks for Kubernetes NotFound errors first, then falls back to case-insensitive string matching for common "not found" patterns.
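The string-matching fallback might look roughly like the following. This sketch omits the Kubernetes NotFound check entirely, and the single `"not found"` pattern is an assumption; the package may match additional phrases.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// looksNotFound is a hypothetical sketch of the fallback path only:
// a case-insensitive search for "not found" in the error message.
func looksNotFound(err error) bool {
	if err == nil {
		return false
	}
	return strings.Contains(strings.ToLower(err.Error()), "not found")
}

// demo checks a nil error, a mixed-case match, and an unrelated error.
func demo() []bool {
	return []bool{
		looksNotFound(nil),
		looksNotFound(errors.New(`MCPServer "github" Not Found`)),
		looksNotFound(errors.New("connection refused")),
	}
}

func main() {
	fmt.Println(demo())
}
```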
func IsValidResourceType ¶
func IsValidResourceType(resourceType string) bool
IsValidResourceType checks if a resource type string is valid. This should be used when accepting resource type input from APIs or external sources.
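A validation helper of this kind usually reduces to a lookup in `ValidResourceTypes`. The sketch below redeclares the documented types locally so it runs on its own; `isValid` is a hypothetical name, and the case-sensitive match is a property of this sketch rather than a documented guarantee.

```go
package main

import "fmt"

// Local copies of the documented type, constants, and lookup set.
type ResourceType string

const (
	ResourceTypeMCPServer ResourceType = "MCPServer"
	ResourceTypeWorkflow  ResourceType = "Workflow"
)

var ValidResourceTypes = map[ResourceType]bool{
	ResourceTypeMCPServer: true,
	ResourceTypeWorkflow:  true,
}

// isValid converts untrusted input to a ResourceType and looks it up.
// A missing key yields the zero value false, so unknown input is rejected.
func isValid(resourceType string) bool {
	return ValidResourceTypes[ResourceType(resourceType)]
}

func main() {
	for _, input := range []string{"MCPServer", "Workflow", "mcpserver", ""} {
		fmt.Printf("%q valid=%v\n", input, isValid(input))
	}
}
```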
func NewDelayedQueue ¶
func NewDelayedQueue() *delayedQueue
NewDelayedQueue creates a queue that supports delayed requeuing.
func SanitizeErrorMessage ¶
func SanitizeErrorMessage(errMsg string) string
SanitizeErrorMessage sanitizes an error message for external exposure. It removes potentially sensitive information like absolute file paths, credentials, and internal implementation details.
This should be used when error messages are exposed via APIs or stored in CRD status fields that may be visible to users.
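As an illustration of the kind of redaction described, the sketch below masks Unix-style absolute paths and bearer tokens with regular expressions. Both patterns and the replacement text are assumptions chosen for the example; the rules the package actually applies are not documented here.

```go
package main

import (
	"fmt"
	"regexp"
)

var (
	// absPath matches Unix-style absolute paths with at least two segments.
	absPath = regexp.MustCompile(`(/[A-Za-z0-9._-]+){2,}`)
	// bearer matches "Bearer <token>" credentials, case-insensitively.
	bearer = regexp.MustCompile(`(?i)bearer\s+\S+`)
)

// sanitize is a hypothetical sketch: it replaces paths and bearer tokens
// with fixed placeholders and leaves the rest of the message untouched.
func sanitize(msg string) string {
	msg = absPath.ReplaceAllString(msg, "[path]")
	msg = bearer.ReplaceAllString(msg, "Bearer [redacted]")
	return msg
}

func main() {
	fmt.Println(sanitize("open /etc/muster/secret.yaml: permission denied"))
	fmt.Println(sanitize("auth failed: Bearer abc123"))
}
```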
Types ¶
type Adapter ¶
type Adapter struct {
// contains filtered or unexported fields
}
Adapter wraps the ReconcileManager and provides API registration.
func NewAdapter ¶
NewAdapter creates a new reconciler API adapter.
func (*Adapter) DisableResourceType ¶
func (a *Adapter) DisableResourceType(resourceType string)
DisableResourceType disables reconciliation for a specific resource type. Does nothing if the resource type is invalid.
func (*Adapter) EnableResourceType ¶
func (a *Adapter) EnableResourceType(resourceType string)
EnableResourceType enables reconciliation for a specific resource type. Does nothing if the resource type is invalid.
func (*Adapter) GetAllStatuses ¶
func (a *Adapter) GetAllStatuses() []api.ReconcileStatusInfo
GetAllStatuses returns all reconciliation statuses. Implements api.ReconcileManagerHandler interface.
func (*Adapter) GetEnabledResourceTypes ¶
func (a *Adapter) GetEnabledResourceTypes() []string
GetEnabledResourceTypes returns the list of resource types with reconciliation enabled. Implements api.ReconcileManagerHandler interface.
func (*Adapter) GetManager ¶
func (a *Adapter) GetManager() *Manager
GetManager returns the underlying reconcile manager.
func (*Adapter) GetOverview ¶
func (a *Adapter) GetOverview() *api.ReconcileOverview
GetOverview returns a comprehensive overview of the reconciliation system status.
func (*Adapter) GetQueueLength ¶
func (a *Adapter) GetQueueLength() int
GetQueueLength returns the current reconciliation queue length. Implements api.ReconcileManagerHandler interface.
func (*Adapter) GetStatus ¶
func (a *Adapter) GetStatus(resourceType, name, namespace string) (*api.ReconcileStatusInfo, bool)
GetStatus returns the reconciliation status for a resource. Implements api.ReconcileManagerHandler interface.
func (*Adapter) GetWatchMode ¶
func (a *Adapter) GetWatchMode() string
GetWatchMode returns the current watch mode (kubernetes/filesystem). Implements api.ReconcileManagerHandler interface.
func (*Adapter) IsResourceTypeEnabled ¶
func (a *Adapter) IsResourceTypeEnabled(resourceType string) bool
IsResourceTypeEnabled checks if reconciliation is enabled for a resource type. Returns false for invalid resource types.
func (*Adapter) IsRunning ¶
func (a *Adapter) IsRunning() bool
IsRunning returns whether the reconciliation manager is running. Implements api.ReconcileManagerHandler interface.
func (*Adapter) Register ¶
func (a *Adapter) Register()
Register registers the reconciler with the API layer. Note: The reconciler doesn't currently expose tools through the aggregator, but this provides a consistent pattern for future expansion.
func (*Adapter) TriggerReconcile ¶
func (a *Adapter) TriggerReconcile(resourceType, name, namespace string)
TriggerReconcile manually triggers reconciliation for a resource. Implements api.ReconcileManagerHandler interface. Does nothing if the resource type is invalid.
type BaseStatusConfig ¶
type BaseStatusConfig struct {
// StatusUpdater provides access to update CRD status (optional)
StatusUpdater StatusUpdater
// Namespace is the namespace to use for status updates
Namespace string
}
BaseStatusConfig holds common configuration for status updates. This is used by reconcilers that sync status back to CRDs.
func (*BaseStatusConfig) GetNamespace ¶
func (c *BaseStatusConfig) GetNamespace(reqNamespace string) string
GetNamespace returns the namespace to use, falling back to the default.
func (*BaseStatusConfig) SetStatusUpdater ¶
func (c *BaseStatusConfig) SetStatusUpdater(updater StatusUpdater, namespace string)
SetStatusUpdater sets the status updater and namespace.
type ChangeDetector ¶
type ChangeDetector interface {
// Start begins watching for changes.
// The detector should send change events to the provided channel.
Start(ctx context.Context, changes chan<- ChangeEvent) error
// Stop gracefully stops the change detector.
Stop() error
// GetSource returns the source type this detector monitors.
GetSource() ChangeSource
// AddResourceType adds a resource type to watch.
// This allows dynamic configuration of which resources to monitor.
AddResourceType(resourceType ResourceType) error
// RemoveResourceType removes a resource type from watching.
RemoveResourceType(resourceType ResourceType) error
}
ChangeDetector is the interface for components that detect changes in resources.
Different implementations exist for filesystem watching and Kubernetes informers.
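To show what satisfying this interface involves, here is a minimal in-memory detector. The types are redeclared locally in simplified form so the example runs on its own, and `manualDetector` with its `emit` method is hypothetical; it is not one of the package's detectors.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Simplified local copies of the documented types.
type ResourceType string
type ChangeSource string

type ChangeEvent struct {
	Type   ResourceType
	Name   string
	Source ChangeSource
}

type ChangeDetector interface {
	Start(ctx context.Context, changes chan<- ChangeEvent) error
	Stop() error
	GetSource() ChangeSource
	AddResourceType(resourceType ResourceType) error
	RemoveResourceType(resourceType ResourceType) error
}

// manualDetector is a hypothetical detector whose events are pushed by hand.
type manualDetector struct {
	mu      sync.Mutex
	watched map[ResourceType]bool
	changes chan<- ChangeEvent // nil while the detector is stopped
}

var _ ChangeDetector = (*manualDetector)(nil) // compile-time interface check

func (d *manualDetector) Start(ctx context.Context, changes chan<- ChangeEvent) error {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.changes = changes
	return nil
}

func (d *manualDetector) Stop() error {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.changes = nil
	return nil
}

func (d *manualDetector) GetSource() ChangeSource { return "Manual" }

func (d *manualDetector) AddResourceType(rt ResourceType) error {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.watched[rt] = true
	return nil
}

func (d *manualDetector) RemoveResourceType(rt ResourceType) error {
	d.mu.Lock()
	defer d.mu.Unlock()
	delete(d.watched, rt)
	return nil
}

// emit forwards an event only if the detector is started and the type is watched.
func (d *manualDetector) emit(rt ResourceType, name string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.changes == nil || !d.watched[rt] {
		return false
	}
	d.changes <- ChangeEvent{Type: rt, Name: name, Source: d.GetSource()}
	return true
}

// demo watches one resource type and emits one watched and one unwatched change.
func demo() []string {
	det := &manualDetector{watched: map[ResourceType]bool{}}
	changes := make(chan ChangeEvent, 4) // buffered so emit does not block
	_ = det.AddResourceType("MCPServer")
	_ = det.Start(context.Background(), changes)
	det.emit("MCPServer", "github")
	det.emit("Workflow", "deploy") // dropped: Workflow is not watched
	_ = det.Stop()
	close(changes)

	var seen []string
	for ev := range changes {
		seen = append(seen, string(ev.Type)+"/"+ev.Name)
	}
	return seen
}

func main() {
	fmt.Println(demo())
}
```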
type ChangeEvent ¶
type ChangeEvent struct {
// Type is the type of resource that changed.
Type ResourceType
// Name is the name of the resource that changed.
Name string
// Namespace is the Kubernetes namespace (empty for filesystem mode).
Namespace string
// Operation describes what kind of change occurred.
Operation ChangeOperation
// Timestamp is when the change was detected.
Timestamp time.Time
// Source indicates where the change came from.
Source ChangeSource
// FilePath is the path to the file that changed (filesystem mode only).
FilePath string
}
ChangeEvent represents a detected change in a resource.
type ChangeOperation ¶
type ChangeOperation string
ChangeOperation represents the type of change detected.
const (
	// OperationCreate indicates a new resource was created.
	OperationCreate ChangeOperation = "Create"
	// OperationUpdate indicates an existing resource was modified.
	OperationUpdate ChangeOperation = "Update"
	// OperationDelete indicates a resource was deleted.
	OperationDelete ChangeOperation = "Delete"
)
type ChangeSource ¶
type ChangeSource string
ChangeSource indicates where a change originated.
const (
	// SourceFilesystem indicates the change came from filesystem watching.
	SourceFilesystem ChangeSource = "Filesystem"
	// SourceKubernetes indicates the change came from Kubernetes informers.
	SourceKubernetes ChangeSource = "Kubernetes"
	// SourceManual indicates the change was triggered manually (e.g., API call).
	SourceManual ChangeSource = "Manual"
	// SourceServiceState indicates the change came from a service state change.
	// This is used when runtime state changes (e.g., service crashes, health check fails)
	// trigger reconciliation to sync status back to the CRD.
	SourceServiceState ChangeSource = "ServiceState"
)
type FilesystemDetector ¶
type FilesystemDetector struct {
// contains filtered or unexported fields
}
FilesystemDetector implements ChangeDetector for filesystem-based configurations.
It uses fsnotify to watch for changes in YAML configuration files and generates change events when files are created, modified, or deleted.
func NewFilesystemDetector ¶
func NewFilesystemDetector(basePath string, debounceInterval time.Duration) *FilesystemDetector
NewFilesystemDetector creates a new filesystem change detector.
func (*FilesystemDetector) AddResourceType ¶
func (d *FilesystemDetector) AddResourceType(resourceType ResourceType) error
AddResourceType adds a resource type to watch.
func (*FilesystemDetector) GetSource ¶
func (d *FilesystemDetector) GetSource() ChangeSource
GetSource returns the change source type.
func (*FilesystemDetector) RemoveResourceType ¶
func (d *FilesystemDetector) RemoveResourceType(resourceType ResourceType) error
RemoveResourceType removes a resource type from watching.
func (*FilesystemDetector) Start ¶
func (d *FilesystemDetector) Start(ctx context.Context, changes chan<- ChangeEvent) error
Start begins watching for filesystem changes.
func (*FilesystemDetector) Stop ¶
func (d *FilesystemDetector) Stop() error
Stop gracefully stops the filesystem detector.
type KubernetesDetector ¶
type KubernetesDetector struct {
// contains filtered or unexported fields
}
KubernetesDetector implements ChangeDetector using controller-runtime informers.
It watches muster CRDs (MCPServer, Workflow) via Kubernetes informers and generates change events when resources are created, updated, or deleted.
This detector provides native Kubernetes integration with proper event handling, caching, and efficient watch-based change detection.
func NewKubernetesDetector ¶
func NewKubernetesDetector(restConfig *rest.Config, namespace string) (*KubernetesDetector, error)
NewKubernetesDetector creates a new Kubernetes change detector.
Args:
- restConfig: Kubernetes REST configuration for API access
- namespace: Namespace to watch (empty string watches all namespaces)
Returns:
- *KubernetesDetector: The configured detector
- error: Error if scheme registration fails
func (*KubernetesDetector) AddResourceType ¶
func (d *KubernetesDetector) AddResourceType(resourceType ResourceType) error
AddResourceType adds a resource type to watch.
func (*KubernetesDetector) GetSource ¶
func (d *KubernetesDetector) GetSource() ChangeSource
GetSource returns the change source type.
func (*KubernetesDetector) RemoveResourceType ¶
func (d *KubernetesDetector) RemoveResourceType(resourceType ResourceType) error
RemoveResourceType removes a resource type from watching.
func (*KubernetesDetector) Start ¶
func (d *KubernetesDetector) Start(ctx context.Context, changes chan<- ChangeEvent) error
Start begins watching for Kubernetes resource changes.
func (*KubernetesDetector) Stop ¶
func (d *KubernetesDetector) Stop() error
Stop gracefully stops the Kubernetes detector.
type MCPServerManager ¶
type MCPServerManager interface {
ListMCPServers() []api.MCPServerInfo
GetMCPServer(name string) (*api.MCPServerInfo, error)
}
MCPServerManager is an interface for accessing MCPServer definitions. This is an alias for the api.MCPServerManagerHandler interface.
type MCPServerReconciler ¶
type MCPServerReconciler struct {
BaseStatusConfig
// contains filtered or unexported fields
}
MCPServerReconciler reconciles MCPServer resources.
It ensures that MCPServer definitions (from CRDs or YAML files) are synchronized with the running services managed by the orchestrator.
Reconciliation logic:
- Create: Register and start a new MCPServer service
- Update: Update the service configuration and restart if needed
- Delete: Stop and unregister the MCPServer service
After each reconciliation, the reconciler syncs the service state back to the CRD's Status field. See ADR 007 for details.
func NewMCPServerReconciler ¶
func NewMCPServerReconciler(
	orchestratorAPI api.OrchestratorAPI,
	mcpServerManager MCPServerManager,
	serviceRegistry api.ServiceRegistryHandler,
) *MCPServerReconciler
NewMCPServerReconciler creates a new MCPServer reconciler.
func (*MCPServerReconciler) GetResourceType ¶
func (r *MCPServerReconciler) GetResourceType() ResourceType
GetResourceType returns the resource type this reconciler handles.
func (*MCPServerReconciler) Reconcile ¶
func (r *MCPServerReconciler) Reconcile(ctx context.Context, req ReconcileRequest) ReconcileResult
Reconcile processes a single MCPServer reconciliation request.
After successful reconciliation, this returns RequeueAfter to enable periodic status sync. This ensures that runtime state changes (service crashes, health check failures, etc.) are eventually reflected in the CRD status even if state change events are missed.
func (*MCPServerReconciler) WithStatusUpdater ¶
func (r *MCPServerReconciler) WithStatusUpdater(updater StatusUpdater, namespace string) *MCPServerReconciler
WithStatusUpdater sets the status updater for syncing status back to CRDs.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager coordinates all reconciliation activities.
It manages:
- Change detectors (filesystem/Kubernetes)
- Resource-specific reconcilers
- Work queue and worker pool
- Retry logic with exponential backoff
func NewManager ¶
func NewManager(config ManagerConfig) *Manager
NewManager creates a new reconciliation manager.
func (*Manager) DisableResourceType ¶
func (m *Manager) DisableResourceType(resourceType ResourceType)
DisableResourceType disables reconciliation for a specific resource type.
func (*Manager) EnableResourceType ¶
func (m *Manager) EnableResourceType(resourceType ResourceType)
EnableResourceType enables reconciliation for a specific resource type.
func (*Manager) GetAllStatuses ¶
func (m *Manager) GetAllStatuses() []ReconcileStatus
GetAllStatuses returns all reconciliation statuses.
func (*Manager) GetEnabledResourceTypes ¶
func (m *Manager) GetEnabledResourceTypes() []string
GetEnabledResourceTypes returns the list of resource types with reconciliation enabled.
func (*Manager) GetQueueLength ¶
func (m *Manager) GetQueueLength() int
GetQueueLength returns the current queue length.
func (*Manager) GetStatus ¶
func (m *Manager) GetStatus(resourceType ResourceType, name, namespace string) (*ReconcileStatus, bool)
GetStatus returns the reconciliation status for a resource.
func (*Manager) GetWatchMode ¶
func (m *Manager) GetWatchMode() string
GetWatchMode returns the current watch mode.
func (*Manager) IsResourceTypeEnabled ¶
func (m *Manager) IsResourceTypeEnabled(resourceType ResourceType) bool
IsResourceTypeEnabled checks if reconciliation is enabled for a resource type.
func (*Manager) RegisterReconciler ¶
func (m *Manager) RegisterReconciler(reconciler Reconciler) error
RegisterReconciler registers a reconciler for a specific resource type.
func (*Manager) TriggerReconcile ¶
func (m *Manager) TriggerReconcile(resourceType ResourceType, name, namespace string)
TriggerReconcile manually triggers reconciliation for a resource.
type ManagerConfig ¶
type ManagerConfig struct {
// Mode specifies whether to use Kubernetes or filesystem watching.
// If empty, the system will auto-detect based on available resources.
Mode WatchMode
// FilesystemPath is the base path for filesystem watching.
// Only used when Mode is WatchModeFilesystem.
FilesystemPath string
// Namespace is the Kubernetes namespace to watch.
// Only used when Mode is WatchModeKubernetes.
Namespace string
// WorkerCount is the number of concurrent reconciliation workers.
// Defaults to 2 if not specified.
WorkerCount int
// MaxRetries is the maximum number of retry attempts for failed reconciliations.
// Defaults to 5 if not specified.
MaxRetries int
// InitialBackoff is the initial backoff duration for retries.
// Defaults to 1 second if not specified.
InitialBackoff time.Duration
// MaxBackoff is the maximum backoff duration for retries.
// Defaults to 5 minutes if not specified.
MaxBackoff time.Duration
// DebounceInterval is how long to wait for additional changes before reconciling.
// Defaults to 500ms if not specified.
DebounceInterval time.Duration
// ReconcileTimeout is the maximum duration for a single reconciliation operation.
// If a reconciler takes longer than this, the context will be cancelled.
// Defaults to 30 seconds if not specified.
ReconcileTimeout time.Duration
// Debug enables debug logging for reconciliation operations.
Debug bool
// DisabledResourceTypes is a set of resource types that should not be reconciled.
// This allows selective disabling of reconciliation for specific resource types.
// Empty or nil means all registered resource types are enabled.
DisabledResourceTypes map[ResourceType]bool
}
ManagerConfig holds configuration for the ReconcileManager.
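The documented defaults and the retry bounds can be sketched as follows. The struct is a trimmed local copy of `ManagerConfig`; `withDefaults` and `backoffFor` are hypothetical helpers, and doubling the delay on each attempt is an assumption, since the documentation only says the backoff is exponential and bounded by `InitialBackoff` and `MaxBackoff`.

```go
package main

import (
	"fmt"
	"time"
)

// ManagerConfig is a trimmed local copy holding only the fields with documented defaults.
type ManagerConfig struct {
	WorkerCount      int
	MaxRetries       int
	InitialBackoff   time.Duration
	MaxBackoff       time.Duration
	DebounceInterval time.Duration
	ReconcileTimeout time.Duration
}

// withDefaults fills unset fields with the defaults listed in the field comments.
func withDefaults(c ManagerConfig) ManagerConfig {
	if c.WorkerCount <= 0 {
		c.WorkerCount = 2
	}
	if c.MaxRetries <= 0 {
		c.MaxRetries = 5
	}
	if c.InitialBackoff <= 0 {
		c.InitialBackoff = time.Second
	}
	if c.MaxBackoff <= 0 {
		c.MaxBackoff = 5 * time.Minute
	}
	if c.DebounceInterval <= 0 {
		c.DebounceInterval = 500 * time.Millisecond
	}
	if c.ReconcileTimeout <= 0 {
		c.ReconcileTimeout = 30 * time.Second
	}
	return c
}

// backoffFor returns the delay before the given attempt (starting at 1),
// doubling each time and capping at MaxBackoff. The factor of 2 is assumed.
func backoffFor(c ManagerConfig, attempt int) time.Duration {
	d := c.InitialBackoff
	for i := 1; i < attempt; i++ {
		d *= 2
		if d >= c.MaxBackoff {
			return c.MaxBackoff
		}
	}
	if d > c.MaxBackoff {
		return c.MaxBackoff
	}
	return d
}

func main() {
	cfg := withDefaults(ManagerConfig{WorkerCount: 4})
	fmt.Printf("workers=%d retries=%d\n", cfg.WorkerCount, cfg.MaxRetries)
	for attempt := 1; attempt <= cfg.MaxRetries; attempt++ {
		fmt.Printf("attempt %d: wait %v\n", attempt, backoffFor(cfg, attempt))
	}
}
```

Under these assumptions the default five retries never reach the five-minute cap; the cap only matters when `MaxRetries` or `InitialBackoff` is raised.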
type ReconcileQueue ¶
type ReconcileQueue interface {
// Add adds a request to the queue.
// If the same resource is already queued, the existing entry is updated.
Add(req ReconcileRequest)
// Get retrieves the next request from the queue.
// Blocks until a request is available or the context is cancelled.
Get(ctx context.Context) (ReconcileRequest, bool)
// Done marks a request as processed.
// This should be called after processing to enable rate limiting.
Done(req ReconcileRequest)
// Len returns the current queue length.
Len() int
// Shutdown signals the queue to stop accepting new items.
Shutdown()
}
ReconcileQueue represents a queue of resources awaiting reconciliation.
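The contract in the method comments (deduplicating `Add`, blocking `Get`, `Shutdown`) can be illustrated with a small in-memory queue. This is a sketch, not the package's queue: `memQueue` is hypothetical, `ReconcileRequest` is simplified to plain strings, and `Done` is a no-op because rate limiting is left out.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// ReconcileRequest is a simplified local copy of the documented type.
type ReconcileRequest struct {
	Type, Name, Namespace string
	Attempt               int
}

type ReconcileQueue interface {
	Add(req ReconcileRequest)
	Get(ctx context.Context) (ReconcileRequest, bool)
	Done(req ReconcileRequest)
	Len() int
	Shutdown()
}

// memQueue is a hypothetical FIFO queue that keeps one entry per resource.
type memQueue struct {
	mu     sync.Mutex
	order  []string                    // FIFO of resource keys
	items  map[string]ReconcileRequest // latest request per key
	wake   chan struct{}               // nudges a blocked Get
	done   chan struct{}               // closed on Shutdown
	closed bool
}

var _ ReconcileQueue = (*memQueue)(nil) // compile-time interface check

func newMemQueue() *memQueue {
	return &memQueue{
		items: map[string]ReconcileRequest{},
		wake:  make(chan struct{}, 1),
		done:  make(chan struct{}),
	}
}

func key(r ReconcileRequest) string { return r.Type + "/" + r.Namespace + "/" + r.Name }

// signal wakes one waiting Get without blocking if nobody is waiting.
func (q *memQueue) signal() {
	select {
	case q.wake <- struct{}{}:
	default:
	}
}

func (q *memQueue) Add(req ReconcileRequest) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.closed {
		return
	}
	k := key(req)
	if _, queued := q.items[k]; !queued {
		q.order = append(q.order, k)
	}
	q.items[k] = req // an already queued resource keeps its position but is updated
	q.signal()
}

func (q *memQueue) Get(ctx context.Context) (ReconcileRequest, bool) {
	for {
		q.mu.Lock()
		if len(q.order) > 0 {
			k := q.order[0]
			q.order = q.order[1:]
			req := q.items[k]
			delete(q.items, k)
			if len(q.order) > 0 {
				q.signal() // more work remains for other waiters
			}
			q.mu.Unlock()
			return req, true
		}
		closed := q.closed
		q.mu.Unlock()
		if closed {
			return ReconcileRequest{}, false
		}
		select {
		case <-q.wake:
		case <-q.done:
		case <-ctx.Done():
			return ReconcileRequest{}, false
		}
	}
}

func (q *memQueue) Done(ReconcileRequest) {} // no rate limiting in this sketch

func (q *memQueue) Len() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return len(q.order)
}

func (q *memQueue) Shutdown() {
	q.mu.Lock()
	defer q.mu.Unlock()
	if !q.closed {
		q.closed = true
		close(q.done)
	}
}

// demo queues the same resource twice, drains it, then shuts the queue down.
func demo() (length, attempt int, okAfterShutdown bool) {
	q := newMemQueue()
	q.Add(ReconcileRequest{Type: "MCPServer", Name: "github", Attempt: 1})
	q.Add(ReconcileRequest{Type: "MCPServer", Name: "github", Attempt: 2})
	length = q.Len()
	req, _ := q.Get(context.Background())
	q.Done(req)
	q.Shutdown()
	_, okAfterShutdown = q.Get(context.Background())
	return length, req.Attempt, okAfterShutdown
}

func main() {
	fmt.Println(demo())
}
```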
type ReconcileRequest ¶
type ReconcileRequest struct {
// Type is the type of resource to reconcile.
Type ResourceType
// Name is the name of the resource.
Name string
// Namespace is the Kubernetes namespace (empty for filesystem mode).
Namespace string
// Attempt is the current retry attempt number (starts at 1).
Attempt int
// LastError is the error from the previous attempt, if any.
LastError error
}
ReconcileRequest represents a request to reconcile a specific resource.
type ReconcileResult ¶
type ReconcileResult struct {
// Requeue indicates whether the resource should be requeued for retry.
Requeue bool
// RequeueAfter specifies when to requeue (0 means use default backoff).
RequeueAfter time.Duration
// Error is any error that occurred during reconciliation.
Error error
}
ReconcileResult represents the outcome of a reconciliation attempt.
type ReconcileState ¶
type ReconcileState string
ReconcileState represents the state of a resource's reconciliation.
const (
	// StatePending means the resource is awaiting reconciliation.
	StatePending ReconcileState = "Pending"
	// StateReconciling means reconciliation is in progress.
	StateReconciling ReconcileState = "Reconciling"
	// StateSynced means the resource is successfully reconciled.
	StateSynced ReconcileState = "Synced"
	// StateError means reconciliation failed and may be retried.
	StateError ReconcileState = "Error"
	// StateFailed means reconciliation failed permanently (max retries exceeded).
	StateFailed ReconcileState = "Failed"
)
type ReconcileStatus ¶
type ReconcileStatus struct {
// ResourceType is the type of the resource.
ResourceType ResourceType
// Name is the name of the resource.
Name string
// Namespace is the Kubernetes namespace (empty for filesystem mode).
Namespace string
// LastReconcileTime is when the resource was last successfully reconciled.
LastReconcileTime *time.Time
// LastError is the most recent error, if any.
LastError string
// RetryCount is the number of retry attempts.
RetryCount int
// State describes the current reconciliation state.
State ReconcileState
}
ReconcileStatus represents the current status of reconciliation for a resource.
type Reconciler ¶
type Reconciler interface {
// Reconcile processes a single reconciliation request.
// It should be idempotent - calling it multiple times with the same
// input should produce the same result.
//
// The reconciler should:
// 1. Fetch the current desired state from the source (CRD/YAML)
// 2. Compare with the actual running state
// 3. Take actions to bring actual state in line with desired state
// 4. Return a result indicating success or need for retry
Reconcile(ctx context.Context, req ReconcileRequest) ReconcileResult
// GetResourceType returns the type of resource this reconciler handles.
GetResourceType() ResourceType
}
Reconciler is the interface that resource-specific reconcilers must implement.
Each resource type (MCPServer, Workflow) has its own reconciler that knows how to reconcile that specific type of resource.
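The four steps in the interface comment can be sketched with a toy reconciler. Everything below is illustrative: the types are simplified local copies, `sketchReconciler` and its `desired`/`running` maps are hypothetical stand-ins for the real definition source and orchestrator, and the 30-second `RequeueAfter` simply mirrors `DefaultStatusSyncInterval`.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Simplified local copies of the documented types.
type ResourceType string

type ReconcileRequest struct {
	Type    ResourceType
	Name    string
	Attempt int
}

type ReconcileResult struct {
	Requeue      bool
	RequeueAfter time.Duration
	Error        error
}

type Reconciler interface {
	Reconcile(ctx context.Context, req ReconcileRequest) ReconcileResult
	GetResourceType() ResourceType
}

// sketchReconciler is hypothetical: desired maps a server name to its command,
// running holds what is "actually" started.
type sketchReconciler struct {
	desired map[string]string
	running map[string]string
	resync  time.Duration
}

var _ Reconciler = (*sketchReconciler)(nil) // compile-time interface check

func (r *sketchReconciler) GetResourceType() ResourceType { return "MCPServer" }

func (r *sketchReconciler) Reconcile(ctx context.Context, req ReconcileRequest) ReconcileResult {
	if err := ctx.Err(); err != nil {
		return ReconcileResult{Requeue: true, Error: err} // cancelled or timed out
	}
	want, exists := r.desired[req.Name] // 1. fetch desired state
	if !exists {
		delete(r.running, req.Name) // desired state is "absent": stop the service
		return ReconcileResult{}
	}
	if r.running[req.Name] != want { // 2. compare with actual state
		r.running[req.Name] = want // 3. converge; repeating this is harmless
	}
	return ReconcileResult{RequeueAfter: r.resync} // 4. success, resync later
}

// demo reconciles the same request twice, then deletes the definition.
func demo() (running int, requeue time.Duration, afterDelete int) {
	r := &sketchReconciler{
		desired: map[string]string{"github": "mcp-github"},
		running: map[string]string{},
		resync:  30 * time.Second,
	}
	ctx := context.Background()
	req := ReconcileRequest{Type: r.GetResourceType(), Name: "github", Attempt: 1}

	r.Reconcile(ctx, req)
	res := r.Reconcile(ctx, req) // idempotent: the second call changes nothing
	running = len(r.running)

	delete(r.desired, "github")
	r.Reconcile(ctx, req)
	return running, res.RequeueAfter, len(r.running)
}

func main() {
	fmt.Println(demo())
}
```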
type ReconcilerMetrics ¶
type ReconcilerMetrics struct {
// contains filtered or unexported fields
}
ReconcilerMetrics tracks reconciliation-related metrics for monitoring and alerting.
This provides visibility into reconciliation patterns, status sync failures, and overall reconciler health. Metrics are tracked per-resource-type to enable targeted alerting and debugging.
func GetReconcilerMetrics ¶
func GetReconcilerMetrics() *ReconcilerMetrics
GetReconcilerMetrics returns the global reconciler metrics instance. It creates the instance on first access (lazy initialization).
func NewReconcilerMetrics ¶
func NewReconcilerMetrics() *ReconcilerMetrics
NewReconcilerMetrics creates a new ReconcilerMetrics instance.
func (*ReconcilerMetrics) RecordStatusSyncAttempt ¶
func (m *ReconcilerMetrics) RecordStatusSyncAttempt(resourceType ResourceType, resourceName string)
RecordStatusSyncAttempt records a status sync attempt.
func (*ReconcilerMetrics) RecordStatusSyncFailure ¶
func (m *ReconcilerMetrics) RecordStatusSyncFailure(resourceType ResourceType, resourceName string, reason string)
RecordStatusSyncFailure records a failed status sync attempt.
This metric is important for monitoring the health of CRD status updates. High failure rates may indicate:
- Kubernetes API server issues
- RBAC permission problems
- Network connectivity issues
- CRD schema mismatches
func (*ReconcilerMetrics) RecordStatusSyncSuccess ¶
func (m *ReconcilerMetrics) RecordStatusSyncSuccess(resourceType ResourceType, resourceName string)
RecordStatusSyncSuccess records a successful status sync.
type ReconcilerMetricsSummary ¶
type ReconcilerMetricsSummary struct {
TotalReconcileAttempts int64 `json:"total_reconcile_attempts"`
TotalReconcileSuccesses int64 `json:"total_reconcile_successes"`
TotalReconcileFailures int64 `json:"total_reconcile_failures"`
TotalStatusSyncAttempts int64 `json:"total_status_sync_attempts"`
TotalStatusSyncSuccesses int64 `json:"total_status_sync_successes"`
TotalStatusSyncFailures int64 `json:"total_status_sync_failures"`
PerResourceTypeMetrics []ResourceTypeMetricView `json:"per_resource_type_metrics"`
StatusSyncFailureRate float64 `json:"status_sync_failure_rate"`
ReconcileFailureRate float64 `json:"reconcile_failure_rate"`
}
ReconcilerMetricsSummary provides a summary of reconciliation metrics.
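The rate fields are presumably derived from the counters. The sketch below assumes a rate is the fraction failures / attempts in the range 0 to 1 (it could equally be a percentage in the real package) and shows how the JSON tags shape the serialized output; `summary` is a trimmed local copy and `failureRate` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// summary is a trimmed local copy of ReconcilerMetricsSummary.
type summary struct {
	TotalStatusSyncAttempts int64   `json:"total_status_sync_attempts"`
	TotalStatusSyncFailures int64   `json:"total_status_sync_failures"`
	StatusSyncFailureRate   float64 `json:"status_sync_failure_rate"`
}

// failureRate returns failures / attempts, or 0 when nothing was attempted.
// Treating the rate as a fraction is an assumption of this sketch.
func failureRate(failures, attempts int64) float64 {
	if attempts == 0 {
		return 0
	}
	return float64(failures) / float64(attempts)
}

// demo builds a summary for 2 failures out of 8 attempts and serializes it.
func demo() string {
	s := summary{TotalStatusSyncAttempts: 8, TotalStatusSyncFailures: 2}
	s.StatusSyncFailureRate = failureRate(s.TotalStatusSyncFailures, s.TotalStatusSyncAttempts)
	out, err := json.Marshal(s)
	if err != nil {
		return ""
	}
	return string(out)
}

func main() {
	fmt.Println(demo())
}
```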
type ResourceType ¶
type ResourceType string
ResourceType represents the type of resource being reconciled.
const (
	// ResourceTypeMCPServer represents MCPServer CRD/YAML resources.
	ResourceTypeMCPServer ResourceType = "MCPServer"
	// ResourceTypeWorkflow represents Workflow CRD/YAML resources.
	ResourceTypeWorkflow ResourceType = "Workflow"
)
type ResourceTypeMetricView ¶
type ResourceTypeMetricView struct {
	ResourceType        ResourceType `json:"resource_type"`
	ReconcileAttempts   int64        `json:"reconcile_attempts"`
	ReconcileSuccesses  int64        `json:"reconcile_successes"`
	ReconcileFailures   int64        `json:"reconcile_failures"`
	StatusSyncAttempts  int64        `json:"status_sync_attempts"`
	StatusSyncSuccesses int64        `json:"status_sync_successes"`
	StatusSyncFailures  int64        `json:"status_sync_failures"`
	LastReconcileAt     time.Time    `json:"last_reconcile_at,omitempty"`
	LastSuccessAt       time.Time    `json:"last_success_at,omitempty"`
	LastFailureAt       time.Time    `json:"last_failure_at,omitempty"`
	LastStatusSyncAt    time.Time    `json:"last_status_sync_at,omitempty"`
}
ResourceTypeMetricView is a read-only view of resource-type-specific metrics.
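A note for consumers of the JSON form: with the standard `encoding/json` package, `omitempty` does not omit struct values, so a zero `time.Time` is still written out. The sketch below demonstrates this with a hypothetical trimmed-down `view` type (a stand-in, not the package's type):

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
	"time"
)

// view is a hypothetical, trimmed-down stand-in for ResourceTypeMetricView.
type view struct {
	ReconcileAttempts int64     `json:"reconcile_attempts"`
	LastReconcileAt   time.Time `json:"last_reconcile_at,omitempty"`
}

// encode marshals v to a JSON string.
func encode(v view) (string, error) {
	b, err := json.Marshal(v)
	return string(b), err
}

// containsKey reports whether the JSON text mentions the quoted key.
func containsKey(jsonText, key string) bool {
	return strings.Contains(jsonText, `"`+key+`"`)
}

func main() {
	out, err := encode(view{ReconcileAttempts: 2})
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	// The timestamp key is present even though the value is the zero time.
	fmt.Println(out)
	fmt.Println("has last_reconcile_at:", containsKey(out, "last_reconcile_at"))
}
```

Clients that need to tell "never happened" apart from a real timestamp can check `time.Time.IsZero` after decoding.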
type StateChangeBridge ¶
type StateChangeBridge struct {
	// contains filtered or unexported fields
}
StateChangeBridge bridges service state changes from the orchestrator to the reconciliation system. This enables the idiomatic Kubernetes controller pattern where status is updated in response to runtime state changes, not just spec changes.
The bridge subscribes to orchestrator state change events and triggers reconciliation for the affected resources. This ensures that CRD status subresources reflect the actual runtime state of services (running, stopped, healthy, unhealthy, etc.).
This implements the "external event source" pattern commonly used in controller-runtime based controllers via source.Channel.
func NewStateChangeBridge ¶
func NewStateChangeBridge(
	orchestratorAPI api.OrchestratorAPI,
	reconcileManager *Manager,
	namespace string,
) *StateChangeBridge
NewStateChangeBridge creates a new state change bridge.
Args:
- orchestratorAPI: Interface for subscribing to service state changes
- reconcileManager: The reconcile manager to trigger reconciliation on
- namespace: Default namespace for resources (used when namespace is empty)
Returns a configured but not yet started bridge.
func (*StateChangeBridge) Start ¶
func (b *StateChangeBridge) Start(ctx context.Context) error
Start begins listening for orchestrator state changes and triggering reconciliation.
This method subscribes to service state change events from the orchestrator and starts a background goroutine to process them. The method is idempotent: calling it more than once has no additional effect.
Args:
- ctx: Context for controlling the bridge lifecycle
Returns nil on successful startup.
func (*StateChangeBridge) Stop ¶
func (b *StateChangeBridge) Stop() error
Stop gracefully shuts down the bridge and waits for cleanup to complete.
This method cancels the event processing context and waits for the background goroutine to finish. The method is idempotent and can be called multiple times.
Returns nil after successful shutdown.
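The lifecycle described for Start and Stop can be sketched as follows. This is an illustration under stated assumptions, not the package's implementation: `bridge`, `stateChange`, `trigger`, and `deliver` are hypothetical names, and a plain channel stands in for the orchestrator subscription.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// stateChange is a hypothetical event; the real orchestrator event type is not shown here.
type stateChange struct {
	Name string
}

// bridge is a hypothetical stand-in for StateChangeBridge.
type bridge struct {
	events  <-chan stateChange
	trigger func(name string) // called once per event, e.g. to request a reconcile

	mu      sync.Mutex
	running bool
	cancel  context.CancelFunc
	wg      sync.WaitGroup
}

// Start launches the processing goroutine; repeated calls are no-ops.
func (b *bridge) Start(ctx context.Context) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.running {
		return nil
	}
	runCtx, cancel := context.WithCancel(ctx)
	b.cancel = cancel
	b.running = true
	b.wg.Add(1)
	go func() {
		defer b.wg.Done()
		for {
			select {
			case <-runCtx.Done():
				return
			case ev, ok := <-b.events:
				if !ok {
					return
				}
				b.trigger(ev.Name)
			}
		}
	}()
	return nil
}

// Stop cancels the goroutine and waits for it to exit; repeated calls are no-ops.
func (b *bridge) Stop() error {
	b.mu.Lock()
	if !b.running {
		b.mu.Unlock()
		return nil
	}
	b.running = false
	cancel := b.cancel
	b.mu.Unlock()

	cancel()
	b.wg.Wait()
	return nil
}

// deliver starts a bridge, sends one event through it, stops it, and returns
// the name that the trigger callback received.
func deliver(name string) string {
	events := make(chan stateChange)
	got := make(chan string, 1)
	b := &bridge{events: events, trigger: func(n string) { got <- n }}
	_ = b.Start(context.Background())
	_ = b.Start(context.Background()) // second call is a no-op
	events <- stateChange{Name: name}
	received := <-got
	_ = b.Stop()
	_ = b.Stop() // second call is a no-op
	return received
}

func main() {
	fmt.Println("reconcile requested for", deliver("my-server"))
}
```

Releasing the mutex before waiting on the goroutine keeps Stop from blocking a concurrent Start or Stop while the worker drains.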
type StatusSyncFailureTracker ¶
type StatusSyncFailureTracker struct {
	// contains filtered or unexported fields
}
StatusSyncFailureTracker tracks per-resource status sync failures to implement backoff-based logging. This reduces log spam when status syncs fail repeatedly for the same resource.
func GetStatusSyncFailureTracker ¶
func GetStatusSyncFailureTracker() *StatusSyncFailureTracker
GetStatusSyncFailureTracker returns the global failure tracker instance.
func NewStatusSyncFailureTracker ¶
func NewStatusSyncFailureTracker() *StatusSyncFailureTracker
NewStatusSyncFailureTracker creates a new failure tracker.
func (*StatusSyncFailureTracker) GetFailureCount ¶
func (t *StatusSyncFailureTracker) GetFailureCount(resourceType ResourceType, name string) int
GetFailureCount returns the current consecutive failure count for a resource.
func (*StatusSyncFailureTracker) RecordFailure ¶
func (t *StatusSyncFailureTracker) RecordFailure(resourceType ResourceType, name string, err error) bool
RecordFailure records a status sync failure for a resource. Returns true if this failure should be logged (based on backoff).
func (*StatusSyncFailureTracker) RecordSuccess ¶
func (t *StatusSyncFailureTracker) RecordSuccess(resourceType ResourceType, name string)
RecordSuccess records a successful status sync, resetting the failure counter.
type StatusSyncHelper ¶
type StatusSyncHelper struct {
	ResourceType   ResourceType
	ResourceName   string
	Metrics        *ReconcilerMetrics
	FailureTracker *StatusSyncFailureTracker
	ReconcilerName string
}
StatusSyncHelper encapsulates the common retry-on-conflict pattern for status sync. This helper reduces duplication across MCPServer and Workflow reconcilers.
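For readers unfamiliar with the retry-on-conflict pattern, here is a minimal, dependency-free sketch. `errConflict`, `retryOnConflict`, and `flakyUpdate` are hypothetical names for illustration; Kubernetes clients commonly use client-go's `retry.RetryOnConflict` for this, and these docs do not state which mechanism the package uses.

```go
package main

import (
	"errors"
	"fmt"
)

// errConflict is a hypothetical stand-in for a Kubernetes 409 Conflict error.
var errConflict = errors.New("conflict: object was modified")

// retryOnConflict calls update until it succeeds, fails with a non-conflict
// error, or maxAttempts is exhausted. It returns the last error seen.
func retryOnConflict(maxAttempts int, update func() error) error {
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		err = update()
		if err == nil || !errors.Is(err, errConflict) {
			return err
		}
	}
	return err
}

// flakyUpdate returns an update function that conflicts the given number of
// times before succeeding, plus a pointer to its call counter.
func flakyUpdate(conflicts int) (func() error, *int) {
	calls := 0
	return func() error {
		calls++
		if calls <= conflicts {
			return errConflict
		}
		return nil
	}, &calls
}

func main() {
	update, calls := flakyUpdate(2)
	err := retryOnConflict(5, update)
	fmt.Println("error:", err, "calls:", *calls)
}
```

In a real status sync, the update closure would re-fetch the latest object, reapply the status, and then write it, so each retry operates on a fresh resource version.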
func NewStatusSyncHelper ¶
func NewStatusSyncHelper(resourceType ResourceType, name, reconcilerName string) *StatusSyncHelper
NewStatusSyncHelper creates a new helper for status sync operations.
func (*StatusSyncHelper) HandleResult ¶
func (h *StatusSyncHelper) HandleResult(retryErr, lastErr error)
HandleResult processes the result of a status sync operation. It records success/failure metrics and logs with backoff for failures.
func (*StatusSyncHelper) RecordAttempt ¶
func (h *StatusSyncHelper) RecordAttempt()
RecordAttempt records a status sync attempt in metrics.
func (*StatusSyncHelper) WasSuccessful ¶
func (h *StatusSyncHelper) WasSuccessful(retryErr, lastErr error) bool
WasSuccessful returns true if the status sync succeeded.
type StatusSyncResult ¶
StatusSyncResult holds the outcome of a status sync operation.
type StatusUpdater ¶
type StatusUpdater interface {
	GetMCPServer(ctx context.Context, name, namespace string) (*musterv1alpha1.MCPServer, error)
	UpdateMCPServerStatus(ctx context.Context, server *musterv1alpha1.MCPServer) error
	GetWorkflow(ctx context.Context, name, namespace string) (*musterv1alpha1.Workflow, error)
	UpdateWorkflowStatus(ctx context.Context, workflow *musterv1alpha1.Workflow) error
	IsKubernetesMode() bool
}
StatusUpdater is an interface for updating CRD status. This is implemented by the MusterClient.
type WatchMode ¶
type WatchMode string
WatchMode specifies how to detect configuration changes.
const (
	// WatchModeFilesystem uses filesystem watching for YAML files.
	WatchModeFilesystem WatchMode = "filesystem"
	// WatchModeKubernetes uses Kubernetes informers for CRDs.
	WatchModeKubernetes WatchMode = "kubernetes"
	// WatchModeAuto automatically selects based on environment.
	WatchModeAuto WatchMode = "auto"
)
func WatchModeFromKubernetesFlag ¶
WatchModeFromKubernetesFlag returns the appropriate WatchMode based on whether Kubernetes mode is enabled. This helper ensures consistent mode selection across the codebase.
When kubernetesEnabled is true, returns WatchModeKubernetes (CRD-based). When kubernetesEnabled is false, returns WatchModeFilesystem (YAML file-based).
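The documented mapping amounts to a two-way branch. The sketch below reproduces it with local, hypothetical names (`watchMode`, `modeFromFlag`) rather than the package's own identifiers:

```go
package main

import "fmt"

// watchMode and its constants mirror the documented WatchMode values.
type watchMode string

const (
	modeFilesystem watchMode = "filesystem"
	modeKubernetes watchMode = "kubernetes"
)

// modeFromFlag is a hypothetical local equivalent of the documented behavior:
// true selects CRD-based watching, false selects YAML file watching.
func modeFromFlag(kubernetesEnabled bool) watchMode {
	if kubernetesEnabled {
		return modeKubernetes
	}
	return modeFilesystem
}

func main() {
	fmt.Println(modeFromFlag(true), modeFromFlag(false))
}
```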
type WorkflowManager ¶
type WorkflowManager interface {
	GetWorkflows() []api.Workflow
	GetWorkflow(name string) (*api.Workflow, error)
}
WorkflowManager is an interface for accessing Workflow definitions.
type WorkflowReconciler ¶
type WorkflowReconciler struct {
	BaseStatusConfig
	// contains filtered or unexported fields
}
WorkflowReconciler reconciles Workflow resources.
It ensures that Workflow definitions (from CRDs or YAML files) are synchronized with the system's understanding of available workflows.
Reconciliation logic:
- Create: Validate and register the Workflow definition
- Update: Re-validate and update the Workflow configuration
- Delete: Remove the Workflow from the system
After each reconciliation, the reconciler syncs validation status back to the CRD's Status field. See ADR 007 for details.
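The create/update/delete logic above can be sketched against an in-memory registry. Everything here is hypothetical (`workflow`, `registry`, `validate`, and the validation rules); it only illustrates the shape of the decision, not the reconciler's actual code:

```go
package main

import (
	"errors"
	"fmt"
)

// workflow is a hypothetical, minimal workflow definition.
type workflow struct {
	Name  string
	Steps []string
}

// registry is a hypothetical in-memory store of registered workflows.
type registry map[string]workflow

// validate rejects definitions without a name or without steps (assumed rules).
func validate(w workflow) error {
	if w.Name == "" {
		return errors.New("workflow name is required")
	}
	if len(w.Steps) == 0 {
		return errors.New("workflow needs at least one step")
	}
	return nil
}

// reconcile converges the registry on the desired state for one name.
// A nil desired value means the source (CRD or YAML file) no longer exists.
func (r registry) reconcile(name string, desired *workflow) error {
	if desired == nil {
		delete(r, name) // Delete: remove the workflow from the system
		return nil
	}
	if err := validate(*desired); err != nil {
		return fmt.Errorf("workflow %q is invalid: %w", name, err)
	}
	r[name] = *desired // Create or Update: (re-)register the validated definition
	return nil
}

func main() {
	r := registry{}
	fmt.Println(r.reconcile("deploy", &workflow{Name: "deploy", Steps: []string{"build"}}))
	fmt.Println(r.reconcile("deploy", &workflow{Name: "deploy"})) // invalid update is rejected
	fmt.Println(r.reconcile("deploy", nil), len(r))
}
```

In this sketch an invalid update leaves the previously registered definition in place; the returned error is the kind of validation result that would be synced back to the CRD status.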
func NewWorkflowReconciler ¶
func NewWorkflowReconciler(
	workflowManager WorkflowManager,
) *WorkflowReconciler
NewWorkflowReconciler creates a new Workflow reconciler.
func (*WorkflowReconciler) GetResourceType ¶
func (r *WorkflowReconciler) GetResourceType() ResourceType
GetResourceType returns the resource type this reconciler handles.
func (*WorkflowReconciler) Reconcile ¶
func (r *WorkflowReconciler) Reconcile(ctx context.Context, req ReconcileRequest) ReconcileResult
Reconcile processes a single Workflow reconciliation request.
func (*WorkflowReconciler) WithStatusUpdater ¶
func (r *WorkflowReconciler) WithStatusUpdater(updater StatusUpdater, namespace string) *WorkflowReconciler
WithStatusUpdater sets the status updater for syncing status back to CRDs.