Documentation ¶
Overview ¶
Package services provides the service abstraction layer for muster.
This package defines the core interfaces and types that all services in muster must implement. It provides a unified way to manage different types of services through a common interface, supporting both static service types and dynamic ServiceClass-based service instances.
Core Concepts ¶
Service: The fundamental unit of work in muster. Each service represents a manageable component that can be started, stopped, and monitored. Services can be either statically defined or dynamically created from ServiceClass definitions.
ServiceRegistry: A thread-safe registry that holds all active services and provides methods to query and manage them. Supports both static and dynamic service registration.
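ServiceState: Represents the current state of a service. The core lifecycle states are unknown, waiting, starting, running, stopping, stopped, failed, and retrying; the package constants also define unreachable, auth required, connected, and disconnected.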
GenericServiceInstance: A runtime-configurable service implementation that uses ServiceClass definitions to drive its lifecycle through tool execution.
Service Architecture ¶
The services package now supports two main service paradigms:
## Static Services (Traditional)
- TypeKubeConnection: Manages Kubernetes cluster connections
- TypePortForward: Creates and maintains kubectl port-forward tunnels
- TypeMCPServer: Runs Model Context Protocol servers for AI integration
## ServiceClass-Based Services (Dynamic)
- GenericServiceInstance: Runtime-configurable services driven by ServiceClass definitions
- Tool-driven lifecycle management through aggregator integration
- Template-based arg handling and response mapping
- Configurable health checking and monitoring
Service Interface ¶
All services must implement the core Service interface:
type Service interface {
	GetName() string // Unique identifier
	GetType() ServiceType // Service type
	GetState() ServiceState // Current state
	GetHealth() HealthStatus // Health status
	GetLastError() error // Last error encountered
	GetDependencies() []string // Service dependencies
	Start(ctx context.Context) error // Start the service
	Stop(ctx context.Context) error // Stop the service
	Restart(ctx context.Context) error // Restart the service
	SetStateChangeCallback(StateChangeCallback) // State change notifications
}
Extended Interfaces ¶
Services can implement additional interfaces for extended functionality:
HealthChecker: For services that support periodic health checks
type HealthChecker interface {
CheckHealth(ctx context.Context) (HealthStatus, error)
GetHealthCheckInterval() time.Duration
}
ServiceDataProvider: For services that provide runtime data
type ServiceDataProvider interface {
GetServiceData() map[string]interface{}
}
StateUpdater: For external state management
type StateUpdater interface {
UpdateState(state ServiceState, health HealthStatus, err error)
}
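Callers usually discover these optional interfaces with a type assertion. The following is a minimal sketch of that pattern, not code from the package: the interfaces are redeclared locally so the example compiles on its own, and `demoService` and `describe` are hypothetical names introduced for illustration.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Local stand-ins for the package's types so the sketch is self-contained.
type HealthStatus string

type HealthChecker interface {
	CheckHealth(ctx context.Context) (HealthStatus, error)
	GetHealthCheckInterval() time.Duration
}

type ServiceDataProvider interface {
	GetServiceData() map[string]interface{}
}

// demoService is a hypothetical service implementing both optional interfaces.
type demoService struct{}

func (demoService) CheckHealth(ctx context.Context) (HealthStatus, error) {
	return "healthy", nil
}

func (demoService) GetHealthCheckInterval() time.Duration { return 30 * time.Second }

func (demoService) GetServiceData() map[string]interface{} {
	return map[string]interface{}{"port": 8080}
}

// describe reports which optional interfaces svc implements.
func describe(svc interface{}) []string {
	var caps []string
	if _, ok := svc.(HealthChecker); ok {
		caps = append(caps, "HealthChecker")
	}
	if _, ok := svc.(ServiceDataProvider); ok {
		caps = append(caps, "ServiceDataProvider")
	}
	return caps
}

func main() {
	fmt.Println(describe(demoService{})) // [HealthChecker ServiceDataProvider]
	fmt.Println(describe(struct{}{}))    // []
}
```

A service that implements neither interface is simply skipped by the health-check and data paths, so the extended behavior stays opt-in.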
GenericServiceInstance ¶
The GenericServiceInstance provides ServiceClass-driven service management:
- Loads service behavior from ServiceClass definitions
- Executes lifecycle operations through aggregator tools
- Supports template-based arg substitution
- Handles response mapping and data extraction
- Provides configurable health checking
- Manages comprehensive state and event tracking
## ServiceClass Integration
GenericServiceInstance integrates with the ServiceClass system:
// Creation through ServiceClass
instance := NewGenericServiceInstance(
	"service-name",
	"serviceclass-name",
	toolCaller,
	args,
)
// Lifecycle managed through ServiceClass tools
err := instance.Start(ctx) // Executes create tool
health, err := instance.CheckHealth(ctx) // Executes health tool
err = instance.Stop(ctx) // Executes delete tool
Service Lifecycle ¶
## Traditional Service Lifecycle
1. Creation: Service created with static configuration
2. Registration: Service registered with ServiceRegistry
3. Starting: Service transitions through Stopped → Starting → Running
4. Health Monitoring: Optional periodic health checks
5. Stopping: Service transitions through Running → Stopping → Stopped
6. Failure: Service can transition to Failed state
## ServiceClass Service Lifecycle
1. ServiceClass Loading: ServiceClass definition loaded and validated
2. Tool Availability Check: Required tools validated against aggregator
3. Instance Creation: GenericServiceInstance created with ServiceClass reference
4. Registration: Instance registered with ServiceRegistry
5. Tool-Driven Operations: Lifecycle managed through tool execution:
- Start: Executes ServiceClass create tool with templated args
- Health: Periodic execution of ServiceClass health check tool
- Stop: Executes ServiceClass delete tool for cleanup
6. State Management: Comprehensive state tracking with events
7. Error Handling: Tool execution errors mapped to service states
Tool Integration ¶
ServiceClass-based services integrate with the aggregator tool system:
- Tool Execution: Operations executed through ToolCaller interface
- Arg Templates: Dynamic arg substitution using template engine
- Response Mapping: Structured extraction of data from tool responses
- Error Handling: Tool execution errors properly mapped to service states
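To make the arg-template step concrete, here is a minimal sketch of one way such substitution could work. It assumes Go's standard `text/template` syntax; the documentation does not say which template engine the package uses, and `renderArgs` is a hypothetical helper, not part of the package API.

```go
package main

import (
	"bytes"
	"fmt"
	"text/template"
)

// renderArgs renders every string value in tmplArgs as a text/template
// against the instance args. Non-string values are copied unchanged.
// Assumption: a missing key is treated as an error rather than rendered empty.
func renderArgs(tmplArgs, args map[string]interface{}) (map[string]interface{}, error) {
	out := make(map[string]interface{}, len(tmplArgs))
	for key, val := range tmplArgs {
		s, ok := val.(string)
		if !ok {
			out[key] = val
			continue
		}
		t, err := template.New(key).Option("missingkey=error").Parse(s)
		if err != nil {
			return nil, fmt.Errorf("parse template for %q: %w", key, err)
		}
		var buf bytes.Buffer
		if err := t.Execute(&buf, args); err != nil {
			return nil, fmt.Errorf("render template for %q: %w", key, err)
		}
		out[key] = buf.String()
	}
	return out, nil
}

func main() {
	rendered, err := renderArgs(
		map[string]interface{}{
			"target":  "{{ .namespace }}/{{ .service_name }}",
			"retries": 3,
		},
		map[string]interface{}{"namespace": "default", "service_name": "my-app"},
	)
	if err != nil {
		panic(err)
	}
	fmt.Println(rendered["target"], rendered["retries"]) // default/my-app 3
}
```

Failing on a missing key surfaces a misconfigured ServiceClass at start time instead of passing an empty value to the tool.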
State Change Events ¶
Services support state change notifications through callbacks:
type StateChangeCallback func(
	name string,
	oldState, newState ServiceState,
	health HealthStatus,
	err error,
)
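The sketch below shows how a service might invoke such a callback. It is illustrative only: `tracker` and `setState` are hypothetical, the types are redeclared locally, and firing the callback only when the state actually changes is an assumption rather than documented behavior. Locking is omitted for brevity.

```go
package main

import "fmt"

// Local stand-ins for the package's types.
type ServiceState string
type HealthStatus string

type StateChangeCallback func(name string, oldState, newState ServiceState, health HealthStatus, err error)

// tracker is a hypothetical, non-thread-safe service core.
type tracker struct {
	name     string
	state    ServiceState
	callback StateChangeCallback
}

func (t *tracker) SetStateChangeCallback(cb StateChangeCallback) { t.callback = cb }

// setState records the new state and notifies the callback on a real change.
func (t *tracker) setState(newState ServiceState, health HealthStatus, err error) {
	old := t.state
	t.state = newState
	if t.callback != nil && old != newState {
		t.callback(t.name, old, newState, health, err)
	}
}

func main() {
	t := &tracker{name: "demo", state: "Stopped"}
	t.SetStateChangeCallback(func(name string, oldState, newState ServiceState, health HealthStatus, err error) {
		fmt.Printf("%s: %s -> %s (health=%s)\n", name, oldState, newState, health)
	})
	t.setState("Starting", "Unknown", nil) // demo: Stopped -> Starting (health=Unknown)
	t.setState("Starting", "Unknown", nil) // no output: state unchanged
	t.setState("Running", "Healthy", nil)  // demo: Starting -> Running (health=Healthy)
}
```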
Thread Safety ¶
All components in this package are designed for concurrent access:
- ServiceRegistry is fully thread-safe
- GenericServiceInstance uses proper synchronization
- Static service implementations maintain thread safety
- State updates are atomic and properly synchronized
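As an illustration of what a thread-safe registry can look like, the following sketch guards a map with a `sync.RWMutex`. It is not the package's implementation: `registry`, `namedService`, and `stub` are hypothetical names, and rejecting duplicate names is an assumption.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// namedService is a trimmed-down stand-in for the Service interface.
type namedService interface{ GetName() string }

// registry is a hypothetical map-backed registry guarded by a read/write lock.
type registry struct {
	mu       sync.RWMutex
	services map[string]namedService
}

func newRegistry() *registry {
	return &registry{services: make(map[string]namedService)}
}

// Register adds a service; duplicates are rejected (an assumption).
func (r *registry) Register(s namedService) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.services[s.GetName()]; exists {
		return fmt.Errorf("service %q already registered", s.GetName())
	}
	r.services[s.GetName()] = s
	return nil
}

// Get looks up a service under a read lock so readers do not block each other.
func (r *registry) Get(name string) (namedService, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	s, ok := r.services[name]
	return s, ok
}

// Names returns the registered names in sorted order.
func (r *registry) Names() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	names := make([]string, 0, len(r.services))
	for n := range r.services {
		names = append(names, n)
	}
	sort.Strings(names)
	return names
}

type stub string

func (s stub) GetName() string { return string(s) }

func main() {
	r := newRegistry()
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = r.Register(stub(fmt.Sprintf("svc-%d", i)))
		}(i)
	}
	wg.Wait()
	fmt.Println(r.Names()) // [svc-0 svc-1 svc-2]
}
```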
API Integration ¶
Following the project's Service Locator Pattern, this package provides API adapters for integration with the central API layer:
- RegistryAdapter: Exposes service registry through API
- Proper interface segregation for different access patterns
- No direct inter-package dependencies
Example Usage ¶
## Static Service Usage
// Create a registry
registry := services.NewRegistry()
// Create and register a static service
service := k8s.NewK8sConnectionService("k8s-mc-prod", "prod-context", kubeMgr)
if err := registry.Register(service); err != nil {
	log.Fatal(err)
}
// Start the service
if err := service.Start(ctx); err != nil {
	log.Fatal(err)
}
## ServiceClass Service Usage
// Create a tool caller (typically from aggregator)
toolCaller := aggregator.GetToolCaller()
// Create ServiceClass-based service instance
instance := services.NewGenericServiceInstance(
	"my-app-port-forward", // Service name
	"kubernetes_port_forward", // ServiceClass name
	toolCaller,
	map[string]interface{}{
		"namespace": "default",
		"service_name": "my-app",
		"local_port": "8080",
		"remote_port": "80",
	},
)
// Register and start
if err := registry.Register(instance); err != nil {
	log.Fatal(err)
}
if err := instance.Start(ctx); err != nil {
	log.Fatal(err)
}
// Query service state
if instance.GetState() == services.StateRunning {
fmt.Println("ServiceClass instance is running")
data := instance.GetServiceData()
fmt.Printf("Service data: %+v\n", data)
}
Index ¶
- Constants
- func EvaluateHealthCheckExpectation(response map[string]interface{}, expectation *api.HealthCheckExpectation) (bool, error)
- func ExtractFromResponse(response map[string]interface{}, path string) interface{}
- func ProcessToolOutputs(response map[string]interface{}, outputs map[string]string) map[string]interface{}
- type BaseService
- func (b *BaseService) GetDependencies() []string
- func (b *BaseService) GetHealth() HealthStatus
- func (b *BaseService) GetLastError() error
- func (b *BaseService) GetName() string
- func (b *BaseService) GetState() ServiceState
- func (b *BaseService) GetType() ServiceType
- func (b *BaseService) SetStateChangeCallback(callback StateChangeCallback)
- func (b *BaseService) UpdateError(err error)
- func (b *BaseService) UpdateHealth(health HealthStatus)
- func (b *BaseService) UpdateState(newState ServiceState, health HealthStatus, err error)
- type GenericServiceInstance
- func (gsi *GenericServiceInstance) CheckHealth(ctx context.Context) (HealthStatus, error)
- func (gsi *GenericServiceInstance) GetCreatedAt() time.Time
- func (gsi *GenericServiceInstance) GetCreationArgs() map[string]interface{}
- func (gsi *GenericServiceInstance) GetDependencies() []string
- func (gsi *GenericServiceInstance) GetHealth() HealthStatus
- func (gsi *GenericServiceInstance) GetHealthCheckInterval() time.Duration
- func (gsi *GenericServiceInstance) GetLastError() error
- func (gsi *GenericServiceInstance) GetName() string
- func (gsi *GenericServiceInstance) GetOutputs() map[string]interface{}
- func (gsi *GenericServiceInstance) GetServiceClassName() string
- func (gsi *GenericServiceInstance) GetServiceData() map[string]interface{}
- func (gsi *GenericServiceInstance) GetState() ServiceState
- func (gsi *GenericServiceInstance) GetType() ServiceType
- func (gsi *GenericServiceInstance) GetUpdatedAt() time.Time
- func (gsi *GenericServiceInstance) Restart(ctx context.Context) error
- func (gsi *GenericServiceInstance) SetOutputs(outputs map[string]interface{})
- func (gsi *GenericServiceInstance) SetStateChangeCallback(callback StateChangeCallback)
- func (gsi *GenericServiceInstance) Start(ctx context.Context) error
- func (gsi *GenericServiceInstance) Stop(ctx context.Context) error
- func (gsi *GenericServiceInstance) UpdateState(state ServiceState, health HealthStatus, err error)
- type HealthChecker
- type HealthStatus
- type RegistryAdapter
- type Service
- type ServiceDataProvider
- type ServiceManager
- type ServiceRegistry
- type ServiceState
- type ServiceType
- type StateChangeCallback
- type StateUpdater
- type StatusReporter
- type StatusUpdate
- type StatusUpdateFunc
- type ToolCaller
Constants ¶
const (
	StateUnknown      = api.StateUnknown
	StateWaiting      = api.StateWaiting
	StateStarting     = api.StateStarting
	StateRunning      = api.StateRunning
	StateStopping     = api.StateStopping
	StateStopped      = api.StateStopped
	StateFailed       = api.StateFailed
	StateRetrying     = api.StateRetrying
	StateUnreachable  = api.StateUnreachable
	StateAuthRequired = api.StateAuthRequired
	StateConnected    = api.StateConnected
	StateDisconnected = api.StateDisconnected
)
const (
	HealthUnknown   = api.HealthUnknown
	HealthHealthy   = api.HealthHealthy
	HealthUnhealthy = api.HealthUnhealthy
	HealthChecking  = api.HealthChecking
)
const (
	StatusInitializing = "Initializing"
	StatusStarting     = "Starting"
	StatusRunning      = "Running"
	StatusActive       = "Active"
	StatusWaiting      = "Waiting"
	StatusStopping     = "Stopping"
	StatusStopped      = "Stopped"
	StatusFailed       = "Failed"
	StatusUnhealthy    = "Unhealthy"
)
Common status constants that services can use
Variables ¶
This section is empty.
Functions ¶
func EvaluateHealthCheckExpectation ¶
func EvaluateHealthCheckExpectation(response map[string]interface{}, expectation *api.HealthCheckExpectation) (bool, error)
EvaluateHealthCheckExpectation evaluates health check conditions against tool response
func ExtractFromResponse ¶
func ExtractFromResponse(response map[string]interface{}, path string) interface{}
ExtractFromResponse extracts a value from response using a JSON path. Currently supports simple dot notation paths like "result.sessionID" or direct field names.
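The dot-notation lookup described above can be sketched as follows. This is an illustrative reimplementation under stated assumptions, not the package's code: `extract` is a hypothetical name, and returning nil for a missing segment or a non-map intermediate value is assumed behavior.

```go
package main

import (
	"fmt"
	"strings"
)

// extract walks nested map[string]interface{} values along a dot-separated
// path. It returns nil when a segment is missing or when a value in the
// middle of the path is not a map.
func extract(response map[string]interface{}, path string) interface{} {
	var current interface{} = response
	for _, key := range strings.Split(path, ".") {
		m, ok := current.(map[string]interface{})
		if !ok {
			return nil
		}
		current, ok = m[key]
		if !ok {
			return nil
		}
	}
	return current
}

func main() {
	resp := map[string]interface{}{
		"status": "ok",
		"result": map[string]interface{}{"sessionID": "abc123"},
	}
	fmt.Println(extract(resp, "result.sessionID")) // abc123
	fmt.Println(extract(resp, "status"))           // ok
	fmt.Println(extract(resp, "result.missing"))   // <nil>
}
```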
Types ¶
type BaseService ¶
type BaseService struct {
// contains filtered or unexported fields
}
BaseService provides a base implementation of the Service interface that other services can embed to avoid reimplementing common functionality
func NewBaseService ¶
func NewBaseService(name string, serviceType ServiceType, dependencies []string) *BaseService
NewBaseService creates a new base service
func (*BaseService) GetDependencies ¶
func (b *BaseService) GetDependencies() []string
GetDependencies returns the service dependencies
func (*BaseService) GetHealth ¶
func (b *BaseService) GetHealth() HealthStatus
GetHealth returns the current health status
func (*BaseService) GetLastError ¶
func (b *BaseService) GetLastError() error
GetLastError returns the last error
func (*BaseService) GetName ¶
func (b *BaseService) GetName() string
GetName returns the service name
func (*BaseService) GetState ¶
func (b *BaseService) GetState() ServiceState
GetState returns the current state
func (*BaseService) GetType ¶
func (b *BaseService) GetType() ServiceType
GetType returns the service type
func (*BaseService) SetStateChangeCallback ¶
func (b *BaseService) SetStateChangeCallback(callback StateChangeCallback)
SetStateChangeCallback sets the state change callback
func (*BaseService) UpdateError ¶
func (b *BaseService) UpdateError(err error)
UpdateError updates the error state
func (*BaseService) UpdateHealth ¶
func (b *BaseService) UpdateHealth(health HealthStatus)
UpdateHealth updates just the health status
func (*BaseService) UpdateState ¶
func (b *BaseService) UpdateState(newState ServiceState, health HealthStatus, err error)
UpdateState updates the service state and notifies the callback
type GenericServiceInstance ¶
type GenericServiceInstance struct {
// contains filtered or unexported fields
}
GenericServiceInstance is a runtime-configurable service instance that implements the services.Service interface using API-accessed ServiceClass definitions
func NewGenericServiceInstance ¶
func NewGenericServiceInstance(name string, serviceClassName string, toolCaller ToolCaller, args map[string]interface{}) *GenericServiceInstance
NewGenericServiceInstance creates a new generic service instance configured with a service class name and ToolCaller
func (*GenericServiceInstance) CheckHealth ¶
func (gsi *GenericServiceInstance) CheckHealth(ctx context.Context) (HealthStatus, error)
CheckHealth implements the HealthChecker interface
func (*GenericServiceInstance) GetCreatedAt ¶
func (gsi *GenericServiceInstance) GetCreatedAt() time.Time
GetCreatedAt returns the creation time for this instance
func (*GenericServiceInstance) GetCreationArgs ¶
func (gsi *GenericServiceInstance) GetCreationArgs() map[string]interface{}
GetCreationArgs returns the creation args for this instance
func (*GenericServiceInstance) GetDependencies ¶
func (gsi *GenericServiceInstance) GetDependencies() []string
GetDependencies implements the Service interface
func (*GenericServiceInstance) GetHealth ¶
func (gsi *GenericServiceInstance) GetHealth() HealthStatus
GetHealth implements the Service interface
func (*GenericServiceInstance) GetHealthCheckInterval ¶
func (gsi *GenericServiceInstance) GetHealthCheckInterval() time.Duration
GetHealthCheckInterval implements the HealthChecker interface
func (*GenericServiceInstance) GetLastError ¶
func (gsi *GenericServiceInstance) GetLastError() error
GetLastError implements the Service interface
func (*GenericServiceInstance) GetName ¶
func (gsi *GenericServiceInstance) GetName() string
GetName implements the Service interface
func (*GenericServiceInstance) GetOutputs ¶
func (gsi *GenericServiceInstance) GetOutputs() map[string]interface{}
GetOutputs returns the ServiceClass-level outputs for this service instance
func (*GenericServiceInstance) GetServiceClassName ¶
func (gsi *GenericServiceInstance) GetServiceClassName() string
GetServiceClassName returns the service class name for this instance
func (*GenericServiceInstance) GetServiceData ¶
func (gsi *GenericServiceInstance) GetServiceData() map[string]interface{}
GetServiceData implements the ServiceDataProvider interface
func (*GenericServiceInstance) GetState ¶
func (gsi *GenericServiceInstance) GetState() ServiceState
GetState implements the Service interface
func (*GenericServiceInstance) GetType ¶
func (gsi *GenericServiceInstance) GetType() ServiceType
GetType implements the Service interface
func (*GenericServiceInstance) GetUpdatedAt ¶
func (gsi *GenericServiceInstance) GetUpdatedAt() time.Time
GetUpdatedAt returns the last update time for this instance
func (*GenericServiceInstance) Restart ¶
func (gsi *GenericServiceInstance) Restart(ctx context.Context) error
Restart implements the Service interface
func (*GenericServiceInstance) SetOutputs ¶
func (gsi *GenericServiceInstance) SetOutputs(outputs map[string]interface{})
SetOutputs sets the ServiceClass-level outputs for this service instance
func (*GenericServiceInstance) SetStateChangeCallback ¶
func (gsi *GenericServiceInstance) SetStateChangeCallback(callback StateChangeCallback)
SetStateChangeCallback implements the Service interface
func (*GenericServiceInstance) Start ¶
func (gsi *GenericServiceInstance) Start(ctx context.Context) error
Start implements the Service interface - starts the service using the start tool
func (*GenericServiceInstance) Stop ¶
func (gsi *GenericServiceInstance) Stop(ctx context.Context) error
Stop implements the Service interface - stops the service using the stop tool
func (*GenericServiceInstance) UpdateState ¶
func (gsi *GenericServiceInstance) UpdateState(state ServiceState, health HealthStatus, err error)
UpdateState implements the StateUpdater interface
type HealthChecker ¶
type HealthChecker interface {
// CheckHealth performs a health check and returns the current health status
CheckHealth(ctx context.Context) (HealthStatus, error)
// GetHealthCheckInterval returns the interval at which health checks should be performed
GetHealthCheckInterval() time.Duration
}
HealthChecker is an optional interface for services that support health checking
type HealthStatus ¶
type HealthStatus = api.HealthStatus
type RegistryAdapter ¶
type RegistryAdapter struct {
// contains filtered or unexported fields
}
RegistryAdapter adapts the ServiceRegistry to implement api.ServiceRegistryHandler
func NewRegistryAdapter ¶
func NewRegistryAdapter(r ServiceRegistry) *RegistryAdapter
NewRegistryAdapter creates a new registry adapter
func (*RegistryAdapter) Get ¶
func (r *RegistryAdapter) Get(name string) (api.ServiceInfo, bool)
Get returns a service by name
func (*RegistryAdapter) GetAll ¶
func (r *RegistryAdapter) GetAll() []api.ServiceInfo
GetAll returns all registered services
func (*RegistryAdapter) GetByType ¶
func (r *RegistryAdapter) GetByType(serviceType api.ServiceType) []api.ServiceInfo
GetByType returns all services of a specific type
func (*RegistryAdapter) Register ¶
func (r *RegistryAdapter) Register()
Register registers this adapter with the API package
type Service ¶
type Service interface {
// Lifecycle management
Start(ctx context.Context) error
Stop(ctx context.Context) error
Restart(ctx context.Context) error
// State management
GetState() ServiceState
GetHealth() HealthStatus
GetLastError() error
// Service metadata
GetName() string
GetType() ServiceType
GetDependencies() []string
// State change notifications
// The service should call this callback when its state changes
SetStateChangeCallback(callback StateChangeCallback)
}
Service is the core interface that all services must implement
type ServiceDataProvider ¶
type ServiceDataProvider interface {
// GetServiceData returns service-specific data that can be accessed via the API layer
// This data should not be stored in the state store
GetServiceData() map[string]interface{}
}
ServiceDataProvider is an optional interface for services that expose additional data
type ServiceManager ¶
type ServiceManager interface {
// StartService starts a service by name
StartService(ctx context.Context, name string) error
// StopService stops a service by name
StopService(ctx context.Context, name string) error
// RestartService restarts a service by name
RestartService(ctx context.Context, name string) error
// StartAll starts all registered services respecting dependencies
StartAll(ctx context.Context) error
// StopAll stops all registered services
StopAll(ctx context.Context) error
// GetServiceState returns the current state of a service
GetServiceState(name string) (ServiceState, HealthStatus, error)
}
ServiceManager orchestrates service lifecycle
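StartAll is documented as respecting dependencies. One common way to derive such an order is a depth-first topological sort over the values returned by GetDependencies. The sketch below shows that technique under stated assumptions; `startOrder` is a hypothetical helper, and the package's actual ordering strategy is not described in this documentation.

```go
package main

import (
	"fmt"
	"sort"
)

// startOrder returns names ordered so that each service appears after all of
// its dependencies. deps maps a service name to the names it depends on.
// It fails on a dependency cycle or on a dependency that is not a key of deps.
func startOrder(deps map[string][]string) ([]string, error) {
	const (
		unvisited = iota
		visiting
		done
	)
	state := make(map[string]int, len(deps))
	order := make([]string, 0, len(deps))

	var visit func(name string) error
	visit = func(name string) error {
		switch state[name] {
		case done:
			return nil
		case visiting:
			return fmt.Errorf("dependency cycle involving %q", name)
		}
		ds, known := deps[name]
		if !known {
			return fmt.Errorf("unknown service %q", name)
		}
		state[name] = visiting
		for _, d := range ds {
			if err := visit(d); err != nil {
				return err
			}
		}
		state[name] = done
		order = append(order, name)
		return nil
	}

	// Visit in sorted order so the result is deterministic.
	names := make([]string, 0, len(deps))
	for n := range deps {
		names = append(names, n)
	}
	sort.Strings(names)
	for _, n := range names {
		if err := visit(n); err != nil {
			return nil, err
		}
	}
	return order, nil
}

func main() {
	order, err := startOrder(map[string][]string{
		"mcp-server":   {"port-forward"},
		"port-forward": {"k8s"},
		"k8s":          nil,
	})
	fmt.Println(order, err) // [k8s port-forward mcp-server] <nil>
}
```

Reversing the resulting slice gives a plausible stop order, so that dependents shut down before the services they rely on.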
type ServiceRegistry ¶
type ServiceRegistry interface {
// Register adds a service to the registry
Register(service Service) error
// Unregister removes a service from the registry
Unregister(name string) error
// Get returns a service by name
Get(name string) (Service, bool)
// GetAll returns all registered services
GetAll() []Service
// GetByType returns all services of a specific type
GetByType(serviceType ServiceType) []Service
}
ServiceRegistry manages all registered services
type ServiceState ¶
type ServiceState = api.ServiceState
Use API package types instead of duplicating them
type ServiceType ¶
type ServiceType string
ServiceType represents the type of service
const (
TypeMCPServer ServiceType = "MCPServer"
)
type StateChangeCallback ¶
type StateChangeCallback func(name string, oldState, newState ServiceState, health HealthStatus, err error)
StateChangeCallback is called when a service's state changes
type StateUpdater ¶
type StateUpdater interface {
UpdateState(state ServiceState, health HealthStatus, err error)
}
StateUpdater is an optional interface for services that allow external state updates. This is used by the orchestrator to set services to StateWaiting when dependencies fail.
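A minimal sketch of how an orchestrator might use this interface when a dependency fails is shown below. The constant values, the `svc` type, and the `markWaiting` helper are all hypothetical; the real constants are aliases of `api` package values whose concrete strings are not shown in this documentation.

```go
package main

import "fmt"

// Local stand-ins; the string values are placeholders, not the api values.
type ServiceState string
type HealthStatus string

const (
	StateWaiting  ServiceState = "Waiting"
	StateRunning  ServiceState = "Running"
	HealthUnknown HealthStatus = "Unknown"
)

type StateUpdater interface {
	UpdateState(state ServiceState, health HealthStatus, err error)
}

// svc is a hypothetical service that accepts external state updates.
type svc struct {
	state   ServiceState
	health  HealthStatus
	lastErr error
}

func (s *svc) UpdateState(state ServiceState, health HealthStatus, err error) {
	s.state, s.health, s.lastErr = state, health, err
}

// markWaiting moves every dependent that implements StateUpdater into
// StateWaiting and returns how many were updated. Others are skipped.
func markWaiting(failedDep string, dependents []interface{}) int {
	updated := 0
	for _, d := range dependents {
		if u, ok := d.(StateUpdater); ok {
			u.UpdateState(StateWaiting, HealthUnknown, fmt.Errorf("dependency %q failed", failedDep))
			updated++
		}
	}
	return updated
}

func main() {
	a := &svc{state: StateRunning}
	n := markWaiting("k8s", []interface{}{a, "not a service"})
	fmt.Println(n, a.state, a.lastErr) // 1 Waiting dependency "k8s" failed
}
```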
type StatusReporter ¶
type StatusReporter interface {
// SetStatusUpdateCallback sets a callback for detailed status updates
// This is in addition to the state change callback
SetStatusUpdateCallback(callback StatusUpdateFunc)
}
StatusReporter interface for services that report detailed status updates
type StatusUpdate ¶
type StatusUpdate struct {
Name string // Service name
Status string // Status string (service-specific)
Detail string // Optional detail message
IsError bool // Whether this is an error condition
IsReady bool // Whether the service is ready/active
Error error // Error if any
}
StatusUpdate represents a generic status update from any service type
type StatusUpdateFunc ¶
type StatusUpdateFunc func(update StatusUpdate)
StatusUpdateFunc is a callback for receiving status updates