Documentation ¶
Overview ¶
Package aggregator provides the MCP aggregator server implementation.
SSO Authentication Mechanisms ¶
Muster supports two Single Sign-On (SSO) mechanisms for authenticating to downstream MCP servers:
## SSO Token Forwarding (explicit opt-in)
When muster itself is protected by OAuth (via oauth_server configuration), muster can forward its own ID token to downstream MCP servers. The downstream server must be configured to trust muster's OAuth client ID in its TrustedAudiences.
Flow:
- User authenticates TO muster via OAuth (Google, Dex, etc.)
- Muster receives and stores the user's ID token
- User accesses server with forwardToken: true
- Muster injects ID token as Authorization: Bearer header
- Downstream server validates token, trusts muster's client ID
Configuration: Requires `auth.forwardToken: true` in MCPServer spec.
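The header-injection step in the flow above can be sketched with a custom http.RoundTripper. This is a minimal illustration, not muster's implementation: the bearerTransport and recorder types, the forwardedHeader helper, and the downstream.example URL are all hypothetical, and the downstream server is replaced by an in-memory recorder so that no network is needed.

```go
package main

import (
	"fmt"
	"net/http"
)

// bearerTransport is a hypothetical http.RoundTripper that attaches a stored
// ID token to each outgoing request. It is not a type from the muster codebase.
type bearerTransport struct {
	idToken string            // ID token stored when the user logged in
	base    http.RoundTripper // transport that performs the real request
}

func (t bearerTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	// RoundTrippers must not modify the caller's request, so work on a clone.
	clone := req.Clone(req.Context())
	clone.Header.Set("Authorization", "Bearer "+t.idToken)
	return t.base.RoundTrip(clone)
}

// recorder stands in for the downstream server: it captures the
// Authorization header instead of sending anything over the network.
type recorder struct{ authorization string }

func (r *recorder) RoundTrip(req *http.Request) (*http.Response, error) {
	r.authorization = req.Header.Get("Authorization")
	return &http.Response{
		StatusCode: http.StatusOK,
		Header:     make(http.Header),
		Body:       http.NoBody,
		Request:    req,
	}, nil
}

// forwardedHeader returns the Authorization header the downstream side sees.
func forwardedHeader(idToken string) (string, error) {
	downstream := &recorder{}
	client := &http.Client{Transport: bearerTransport{idToken: idToken, base: downstream}}
	resp, err := client.Get("http://downstream.example/mcp") // hypothetical URL
	if err != nil {
		return "", err
	}
	resp.Body.Close()
	return downstream.authorization, nil
}

func main() {
	header, err := forwardedHeader("example-id-token")
	if err != nil {
		panic(err)
	}
	fmt.Println(header)
}
```

Wrapping the transport keeps the token handling in one place, so callers that build requests never need to see the token itself.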
## SSO Token Exchange (RFC 8693)
When clusters have separate Identity Providers, muster can exchange its local token for one valid on the remote cluster's IdP (e.g., Dex). This enables cross-cluster SSO without requiring shared trust.
Flow:
- User authenticates TO muster via OAuth
- User accesses server with tokenExchange configuration
- Muster exchanges its token at the remote IdP's token endpoint
- Remote IdP issues a new token valid for the remote cluster
- Muster uses the exchanged token for downstream requests
Configuration: Requires `auth.tokenExchange` configuration in MCPServer spec.
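As a rough sketch of the exchange step, the request body defined by RFC 8693 can be assembled as a URL-encoded form. The grant type and token type URNs come from the RFC; the choice of ID tokens for both subject and requested token, the connector_id parameter name, and the exchangeForm helper are assumptions made for illustration and may differ from what muster sends.

```go
package main

import (
	"fmt"
	"net/url"
)

const (
	// URNs defined by RFC 8693.
	grantTypeTokenExchange = "urn:ietf:params:oauth:grant-type:token-exchange"
	tokenTypeIDToken       = "urn:ietf:params:oauth:token-type:id_token"
)

// exchangeForm builds the form body for a token exchange request.
// connectorID is assumed to be passed as a Dex-style "connector_id" parameter.
func exchangeForm(subjectToken, connectorID string) url.Values {
	form := url.Values{}
	form.Set("grant_type", grantTypeTokenExchange)
	form.Set("subject_token", subjectToken)
	form.Set("subject_token_type", tokenTypeIDToken)
	form.Set("requested_token_type", tokenTypeIDToken)
	form.Set("connector_id", connectorID)
	return form
}

func main() {
	form := exchangeForm("local-id-token", "remote-connector")
	// The encoded form would be POSTed to the remote IdP's token endpoint with
	// Content-Type application/x-www-form-urlencoded.
	fmt.Println(form.Encode())
}
```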
Package aggregator provides MCP (Model Context Protocol) server aggregation functionality for the muster system.
The aggregator package acts as a central hub that collects and exposes tools, resources, and prompts from multiple backend MCP servers through a single unified interface. It implements intelligent name collision resolution, security filtering, automatic server lifecycle management, and seamless integration with the muster service architecture.
Architecture Overview ¶
The aggregator follows a layered architecture with clear separation of concerns:
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   MCP Clients   │    │   Core Tools    │    │    Workflow     │
│   (External)    │    │   (Internal)    │    │     Adapter     │
└─────────────────┘    └─────────────────┘    └─────────────────┘
         │                      │                      │
         └──────────────────────┼──────────────────────┘
                                │
                       ┌─────────────────┐
                       │ AggregatorServer│
                       │   (Core MCP)    │
                       └─────────────────┘
                                │
         ┌──────────────────────┼──────────────────────┐
         │                      │                      │
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│  SSE Transport  │    │ Stdio Transport │    │ HTTP Transport  │
│     (:8080)     │    │   (CLI Mode)    │    │   (Streaming)   │
└─────────────────┘    └─────────────────┘    └─────────────────┘
Core Components ¶
## AggregatorManager
The AggregatorManager is the main entry point and coordinates all aggregator functionality. It manages the complete lifecycle, from server startup to graceful shutdown, and provides automatic MCP server registration based on service health status.
Key responsibilities:
- Server lifecycle management (start/stop coordination)
- Event-driven MCP server registration/deregistration
- Integration with the muster service architecture
- Periodic retry mechanism for failed registrations
- Service monitoring and health reporting
Usage:
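config := AggregatorConfig{
	Port:         8080,
	Host:         "localhost",
	Transport:    "sse",
	Yolo:         false,
	MusterPrefix: "x",
}
// NewAggregatorManager takes an error callback as its fourth argument;
// logging the error here is only an illustration.
errorCallback := func(err error) { log.Printf("aggregator error: %v", err) }
manager := NewAggregatorManager(config, orchestratorAPI, serviceRegistry, errorCallback)
if err := manager.Start(ctx); err != nil {
	log.Fatalf("failed to start aggregator: %v", err)
}
defer manager.Stop(ctx)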
## AggregatorServer
The AggregatorServer implements the core MCP server functionality, aggregating capabilities from multiple backend servers and exposing them through various transports. It provides real-time capability updates and maintains synchronization with backend servers.
Features:
- Multi-transport support (SSE, stdio, streamable-http)
- Dynamic capability discovery and updates
- Core tool integration (workflow, config management)
- Thread-safe concurrent operations
- Intelligent request routing to backend servers
## ServerRegistry
The ServerRegistry manages the collection of registered MCP servers and their cached capabilities. It provides thread-safe access to server information and maintains bidirectional name mappings for routing requests to the correct backend servers.
Features:
- Thread-safe server registration/deregistration
- Capability caching for performance optimization
- Server health tracking and connection status
- Efficient bulk operations for capability retrieval
- Update notifications for dynamic capability changes
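The bidirectional name mapping can be pictured as two maps guarded by one lock. The sketch below is an assumption-heavy illustration: the nameRegistry type, its methods, and the `<prefix>_<server>_<tool>` naming scheme are hypothetical and do not describe the real ServerRegistry's naming rules.

```go
package main

import (
	"fmt"
	"sync"
)

// origin identifies a tool by its backend server and original name.
type origin struct{ server, name string }

// nameRegistry is a hypothetical, simplified stand-in for the registry's
// name mapping. It keeps both directions so lookups are O(1) either way.
type nameRegistry struct {
	mu      sync.RWMutex
	prefix  string
	exposed map[origin]string // (server, original) -> exposed name
	origins map[string]origin // exposed name -> (server, original)
}

func newNameRegistry(prefix string) *nameRegistry {
	return &nameRegistry{
		prefix:  prefix,
		exposed: make(map[origin]string),
		origins: make(map[string]origin),
	}
}

// expose returns the exposed name for a tool, creating the mapping if needed.
// The "<prefix>_<server>_<tool>" format is an assumption for this sketch.
func (r *nameRegistry) expose(server, tool string) string {
	r.mu.Lock()
	defer r.mu.Unlock()
	o := origin{server: server, name: tool}
	if name, ok := r.exposed[o]; ok {
		return name
	}
	name := r.prefix + "_" + server + "_" + tool
	r.exposed[o] = name
	r.origins[name] = o
	return name
}

// resolve maps an exposed name back to its server and original tool name.
func (r *nameRegistry) resolve(exposedName string) (server, original string, err error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	o, ok := r.origins[exposedName]
	if !ok {
		return "", "", fmt.Errorf("unknown tool %q", exposedName)
	}
	return o.server, o.name, nil
}

func main() {
	reg := newNameRegistry("x")
	name := reg.expose("github", "list_issues")
	server, original, err := reg.resolve(name)
	fmt.Println(name, server, original, err)
}
```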
## EventHandler
The EventHandler bridges the gap between the muster service orchestrator and the aggregator by automatically registering/deregistering MCP servers based on their health status. It ensures that only healthy, running services are exposed through the aggregator.
Event-driven behavior:
- Running + Healthy → Automatic registration
- Stopped/Failed/Unhealthy → Automatic deregistration
- Resilient to temporary failures
- Asynchronous processing for responsiveness
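The registration rules listed above reduce to a small decision function. The sketch below is illustrative only: the action type, the decide function, and the string values used for state and health are hypothetical stand-ins for the orchestrator's actual types.

```go
package main

import "fmt"

// action is what the handler should do in response to a state event.
type action string

const (
	actionRegister   action = "register"
	actionDeregister action = "deregister"
	actionNone       action = "none"
)

// decide applies the rules from the list above. The state and health strings
// are assumptions for this sketch; intermediate states produce no action.
func decide(state, health string, registered bool) action {
	switch {
	case state == "Running" && health == "Healthy":
		if !registered {
			return actionRegister
		}
	case state == "Stopped" || state == "Failed" || health == "Unhealthy":
		if registered {
			return actionDeregister
		}
	}
	return actionNone
}

func main() {
	fmt.Println(decide("Running", "Healthy", false))
	fmt.Println(decide("Failed", "Unhealthy", true))
	fmt.Println(decide("Starting", "Unknown", true))
}
```

Taking the current registration status as an input makes repeated events harmless: a second "Running + Healthy" event for an already registered server yields no action.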
Transport Protocols ¶
## Server-Sent Events (SSE)
SSE transport provides real-time communication over HTTP with automatic reconnection and keep-alive support. Ideal for web-based integrations and real-time applications.
Endpoints:
- http://localhost:8080/sse (SSE stream)
- http://localhost:8080/message (message posting)
## Standard I/O (stdio)
Stdio transport enables CLI integration by communicating over standard input/output. Perfect for command-line tools and shell integrations.
## Streamable HTTP
Streamable HTTP provides HTTP-based streaming protocol support with full bidirectional communication. This is the default transport for maximum compatibility.
Security Model ¶
## Denylist System
The aggregator implements a "secure by default" approach with a comprehensive denylist of potentially destructive tools. This prevents accidental execution of dangerous operations in production environments.
Categories of blocked operations:
- Kubernetes resource modification (kubectl apply, delete, patch)
- Cluster API lifecycle operations (create/delete clusters)
- Helm package management (install, uninstall, upgrade)
- Flux GitOps operations (reconcile, suspend, resume)
- System maintenance operations (cleanup, incident creation)
## Yolo Mode
The --yolo flag disables the security denylist for development environments where destructive operations may be needed. This should never be used in production.
config.Yolo = true // Disables all security restrictions
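How a denylist and the yolo switch interact can be sketched as a simple filter. The filterTools function and the tool names used here are hypothetical; the real denylist contents and matching rules are not shown in this document.

```go
package main

import "fmt"

// filterTools splits tools into allowed and blocked lists. When yolo is true
// the denylist is ignored and every tool is allowed.
func filterTools(tools []string, denylist map[string]bool, yolo bool) (allowed, blocked []string) {
	for _, tool := range tools {
		if !yolo && denylist[tool] {
			blocked = append(blocked, tool)
			continue
		}
		allowed = append(allowed, tool)
	}
	return allowed, blocked
}

func main() {
	// Hypothetical tool names, chosen only for illustration.
	tools := []string{"kubectl_get", "kubectl_delete", "helm_install"}
	denylist := map[string]bool{"kubectl_delete": true, "helm_install": true}

	allowed, blocked := filterTools(tools, denylist, false)
	fmt.Println("default:", allowed, blocked)

	allowed, blocked = filterTools(tools, denylist, true)
	fmt.Println("yolo:   ", allowed, blocked)
}
```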
Integration with Muster Architecture ¶
## Central API Pattern
The aggregator follows the central API pattern for inter-package communication:
- Receives service state events through api.OrchestratorAPI
- Queries service information via api.ServiceRegistryHandler
- Publishes tool update events via api.PublishToolUpdateEvent
- Integrates with workflow manager through api interfaces
## Tool Update Events
The aggregator publishes ToolUpdateEvent notifications when its capability set changes, ensuring system-wide consistency with dependent components like Capability managers.
Event flow: Backend server change → Registry update → Capability refresh → Event publication
## Workflow Integration
When configured with a config directory, the aggregator automatically integrates with the workflow system to expose workflow definitions as executable tools.
Performance Considerations ¶
## Capability Caching
Backend server capabilities are cached in memory to avoid repeated network calls. Caches are updated automatically when servers are registered/deregistered or when explicit refresh operations are performed.
## Batch Operations
The aggregator uses batch operations where possible to minimize the number of MCP protocol messages and improve performance with multiple backend servers.
## Background Processing
All registry updates and event processing happen asynchronously in background goroutines to ensure that the main request handling remains responsive.
Error Handling and Resilience ¶
## Graceful Degradation
The aggregator continues operating even when individual backend servers become unavailable. Healthy servers remain accessible while unhealthy servers are automatically deregistered.
## Retry Mechanisms
A periodic retry mechanism attempts to register servers that are healthy but not yet registered, providing resilience against temporary registration failures.
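A periodic retry of this kind is commonly built on time.Ticker. The following sketch shows one way to do it under stated assumptions: retryOnce, retryLoop, demoRetry, and errNotReady are hypothetical names, and the interval and stopping condition are illustrative rather than taken from muster.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errNotReady = errors.New("backend not ready")

// retryOnce attempts to register every pending server and returns the names
// that still failed.
func retryOnce(pending []string, register func(string) error) []string {
	var remaining []string
	for _, name := range pending {
		if err := register(name); err != nil {
			remaining = append(remaining, name)
		}
	}
	return remaining
}

// retryLoop calls retryOnce on every tick until nothing is pending or ctx is
// cancelled. It returns whatever is still pending when it stops.
func retryLoop(ctx context.Context, interval time.Duration, pending []string, register func(string) error) []string {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for len(pending) > 0 {
		select {
		case <-ctx.Done():
			return pending
		case <-ticker.C:
			pending = retryOnce(pending, register)
		}
	}
	return pending
}

// demoRetry registers one server whose registration succeeds on the third try.
func demoRetry() (remaining []string, attempts int) {
	register := func(string) error {
		attempts++
		if attempts < 3 {
			return errNotReady
		}
		return nil
	}
	remaining = retryLoop(context.Background(), time.Millisecond, []string{"backend-a"}, register)
	return remaining, attempts
}

func main() {
	remaining, attempts := demoRetry()
	fmt.Println(len(remaining), attempts)
}
```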
## Connection Management
Backend server connections are managed automatically with proper cleanup during deregistration and graceful shutdown procedures.
Monitoring and Observability ¶
## Service Data
The aggregator provides comprehensive service monitoring data including:
- Tool/resource/prompt counts and status
- Server connectivity statistics
- Security filtering statistics (blocked tools)
- Transport endpoint information
- Event handler status
## Logging
Structured logging provides visibility into:
- Server registration/deregistration events
- Capability update operations
- Security filtering actions
- Error conditions and recovery
Thread Safety ¶
All public APIs are thread-safe and can be called concurrently. Internal state is protected by appropriate synchronization mechanisms (mutexes, channels, etc.). The package is designed for high-concurrency environments.
Configuration ¶
The AggregatorConfig structure provides comprehensive configuration options:
type AggregatorConfig struct {
	Port         int    // Server port (default: 8080)
	Host         string // Bind address (default: localhost)
	Transport    string // Protocol: "sse", "stdio", "streamable-http"
	Yolo         bool   // Disable security denylist (development only)
	ConfigDir    string // Directory for workflow definitions
	MusterPrefix string // Global prefix for all tools (default: "x")
}
Example: Complete Setup ¶
// Create configuration
config := AggregatorConfig{
	Port:         8080,
	Host:         "localhost",
	Transport:    "sse",
	Yolo:         false,
	ConfigDir:    "/etc/muster/workflows",
	MusterPrefix: "x",
}
// Initialize dependencies through central API
orchestratorAPI := api.GetOrchestrator()
serviceRegistry := api.GetServiceRegistry()
// NewAggregatorManager takes an error callback as its fourth argument;
// logging the error here is only an illustration.
errorCallback := func(err error) { log.Printf("aggregator error: %v", err) }
// Create and start aggregator
manager := NewAggregatorManager(config, orchestratorAPI, serviceRegistry, errorCallback)
if err := manager.Start(ctx); err != nil {
	log.Fatalf("Failed to start aggregator: %v", err)
}
defer manager.Stop(ctx)
// Access aggregator endpoint
endpoint := manager.GetEndpoint()
fmt.Printf("Aggregator running at: %s\n", endpoint)
The aggregator package is the central component of muster's MCP integration: it collects tools, resources, and prompts from backend servers, applies the security denylist, and exposes the result through a single endpoint.
Index ¶
- Constants
- func Logging() server.ToolHandlerMiddleware
- func Metrics() server.ToolHandlerMiddleware
- func ShouldUseLocalMint(serverInfo *ServerInfo) bool
- func ShouldUseTokenExchange(serverInfo *ServerInfo) bool
- func ShouldUseTokenForwarding(serverInfo *ServerInfo) bool
- type AdminConfig
- type AggregatorConfig
- type AggregatorManager
- func (am *AggregatorManager) GetAggregatorServer() *AggregatorServer
- func (am *AggregatorManager) GetEndpoint() string
- func (am *AggregatorManager) GetEventHandler() *EventHandler
- func (am *AggregatorManager) GetServiceData() map[string]interface{}
- func (am *AggregatorManager) ManualRefresh(ctx context.Context) error
- func (am *AggregatorManager) RegisterServerPendingAuth(registration PendingAuthRegistration) error
- func (am *AggregatorManager) Start(ctx context.Context) error
- func (am *AggregatorManager) Stop(ctx context.Context) error
- type AggregatorServer
- func (a *AggregatorServer) CallToolInternal(ctx context.Context, toolName string, args map[string]interface{}) (res *mcp.CallToolResult, err error)
- func (a *AggregatorServer) DeregisterServer(name string) error
- func (a *AggregatorServer) GetAvailableTools() []string
- func (a *AggregatorServer) GetEndpoint() string
- func (a *AggregatorServer) GetPrompt(ctx context.Context, name string, args map[string]string) (*mcp.GetPromptResult, error)
- func (a *AggregatorServer) GetPrompts() []mcp.Prompt
- func (a *AggregatorServer) GetPromptsForSession(ctx context.Context, sessionID string) []mcp.Prompt
- func (a *AggregatorServer) GetRegistry() *ServerRegistry
- func (a *AggregatorServer) GetResources() []mcp.Resource
- func (a *AggregatorServer) GetResourcesForSession(ctx context.Context, sessionID string) []mcp.Resource
- func (a *AggregatorServer) GetTools() []mcp.Tool
- func (a *AggregatorServer) GetToolsForSession(ctx context.Context, sessionID string) []mcp.Tool
- func (a *AggregatorServer) GetToolsWithStatus() []ToolWithStatus
- func (a *AggregatorServer) IsToolAvailable(toolName string) bool
- func (a *AggregatorServer) IsYoloMode() bool
- func (a *AggregatorServer) ListPromptsForContext(ctx context.Context) []mcp.Prompt
- func (a *AggregatorServer) ListResourcesForContext(ctx context.Context) []mcp.Resource
- func (a *AggregatorServer) ListServersRequiringAuth(ctx context.Context) []api.ServerAuthInfo
- func (a *AggregatorServer) ListToolsForContext(ctx context.Context) []mcp.Tool
- func (a *AggregatorServer) MissingToolsForSession(ctx context.Context, toolNames []string) []string
- func (a *AggregatorServer) OnToolsUpdated(event api.ToolUpdateEvent)
- func (a *AggregatorServer) ReadResource(ctx context.Context, uri string) (*mcp.ReadResourceResult, error)
- func (a *AggregatorServer) RegisterServer(ctx context.Context, registration ServerRegistration, client MCPClient) error
- func (a *AggregatorServer) Start(ctx context.Context) error
- func (a *AggregatorServer) Stop(ctx context.Context) error
- func (a *AggregatorServer) ToggleToolBlock(toolName string) error
- func (a *AggregatorServer) UpdateCapabilities()
- type AuthInfo
- type AuthMetrics
- func (m *AuthMetrics) GetServerMetrics(serverName string) (AuthServerMetricView, bool)
- func (m *AuthMetrics) GetSummary() AuthMetricsSummary
- func (m *AuthMetrics) RecordLoginAttempt(serverName, sub string)
- func (m *AuthMetrics) RecordLoginFailure(serverName, sub, reason string)
- func (m *AuthMetrics) RecordLoginSuccess(serverName, sub string)
- func (m *AuthMetrics) RecordLogoutAttempt(serverName, sub string)
- func (m *AuthMetrics) RecordLogoutSuccess(serverName, sub string)
- func (m *AuthMetrics) RecordRateLimitBlock(serverName, sub string)
- type AuthMetricsSummary
- type AuthRateLimiter
- type AuthRateLimiterConfig
- type AuthServerMetricView
- type AuthToolProvider
- type ConnectionResult
- func EstablishConnectionWithLocalMint(ctx context.Context, a *AggregatorServer, serverInfo *ServerInfo, ...) (*ConnectionResult, error)
- func EstablishConnectionWithTokenExchange(ctx context.Context, a *AggregatorServer, serverInfo *ServerInfo, ...) (*ConnectionResult, error)
- func EstablishConnectionWithTokenForwarding(ctx context.Context, a *AggregatorServer, serverInfo *ServerInfo, ...) (*ConnectionResult, error)
- type EventHandler
- type EventType
- type MCPClient
- type OAuthProxyConfig
- type OAuthServerConfig
- type PendingAuthRegistration
- type PooledInfo
- type ProtectedResourceMetadata
- type RegistrationEvent
- type ServerInfo
- func (s *ServerInfo) GetNamespace() string
- func (s *ServerInfo) GetStatus() api.ServiceState
- func (s *ServerInfo) IsConnected() bool
- func (s *ServerInfo) RequiresSessionAuth() bool
- func (s *ServerInfo) UpdatePrompts(prompts []mcp.Prompt)
- func (s *ServerInfo) UpdateResources(resources []mcp.Resource)
- func (s *ServerInfo) UpdateTools(tools []mcp.Tool)
- type ServerRegistration
- type ServerRegistry
- func (r *ServerRegistry) Deregister(name string) error
- func (r *ServerRegistry) ExposedPromptName(serverName, promptName string) string
- func (r *ServerRegistry) ExposedResourceURI(serverName, resourceURI string) string
- func (r *ServerRegistry) ExposedToolName(serverName, toolName string) string
- func (r *ServerRegistry) FamilyInstanceArgFor(exposedName string) string
- func (r *ServerRegistry) GetAllPrompts() []mcp.Prompt
- func (r *ServerRegistry) GetAllPromptsForSession(ctx context.Context, store oauthstore.CapabilityStore, sessionID string) []mcp.Prompt
- func (r *ServerRegistry) GetAllResources() []mcp.Resource
- func (r *ServerRegistry) GetAllResourcesForSession(ctx context.Context, store oauthstore.CapabilityStore, sessionID string) []mcp.Resource
- func (r *ServerRegistry) GetAllServers() map[string]*ServerInfo
- func (r *ServerRegistry) GetAllTools() []mcp.Tool
- func (r *ServerRegistry) GetAllToolsForSession(ctx context.Context, store oauthstore.CapabilityStore, sessionID string) []mcp.Tool
- func (r *ServerRegistry) GetClient(name string) (MCPClient, error)
- func (r *ServerRegistry) GetOAuthServers() []*ServerInfo
- func (r *ServerRegistry) GetServerInfo(name string) (*ServerInfo, bool)
- func (r *ServerRegistry) GetToolServerNames(exposedName string) []string
- func (r *ServerRegistry) GetUpdateChannel() <-chan struct{}
- func (r *ServerRegistry) IsFamilyTool(exposedName string) bool
- func (r *ServerRegistry) IsOAuthServer(serverName string) bool
- func (r *ServerRegistry) Register(ctx context.Context, registration ServerRegistration, client MCPClient) error
- func (r *ServerRegistry) RegisterPendingAuth(registration PendingAuthRegistration) error
- func (r *ServerRegistry) ResolvePromptName(exposedName string) (serverName, originalName string, err error)
- func (r *ServerRegistry) ResolveResourceName(exposedURI string) (serverName, originalURI string, err error)
- func (r *ServerRegistry) ResolveToolName(exposedName string) (serverName, originalName string, err error)
- func (r *ServerRegistry) ResolveToolNameForServer(exposedName, serverName string) (originalName string, err error)
- func (r *ServerRegistry) SetServerPrefix(serverName, prefix string)
- type SessionConnectionPool
- func (p *SessionConnectionPool) DrainAll()
- func (p *SessionConnectionPool) Evict(sessionID, serverName string)
- func (p *SessionConnectionPool) EvictServer(serverName string)
- func (p *SessionConnectionPool) EvictSession(sessionID string)
- func (p *SessionConnectionPool) Get(sessionID, serverName string) (MCPClient, bool)
- func (p *SessionConnectionPool) IsTokenExpired(sessionID, serverName string) bool
- func (p *SessionConnectionPool) IsTokenExpiringSoon(sessionID, serverName string, margin time.Duration) bool
- func (p *SessionConnectionPool) Len() int
- func (p *SessionConnectionPool) Put(sessionID, serverName string, client MCPClient)
- func (p *SessionConnectionPool) PutWithDeferredClose(sessionID, serverName string, client MCPClient, tokenExpiry time.Time, ...)
- func (p *SessionConnectionPool) PutWithExpiry(sessionID, serverName string, client MCPClient, tokenExpiry time.Time)
- func (p *SessionConnectionPool) SetExchangedToken(sessionID, serverName, token string)
- func (p *SessionConnectionPool) SetNotificationCallback(serverName string, cb func(sessionID string, client MCPClient))
- func (p *SessionConnectionPool) Snapshot(sessionID string) []PooledInfo
- func (p *SessionConnectionPool) Stop()
- type ToolWithStatus
Constants ¶
const AuthStatusResourceURI = "auth://status"
AuthStatusResourceURI is the URI for the auth status MCP resource. This resource provides real-time authentication status for all MCP servers.
const DefaultConnectionPoolMaxAge = 1 * time.Hour
DefaultConnectionPoolMaxAge is the idle timeout for pooled connections. Connections not accessed within this window are closed by the reaper. This is intentionally much shorter than the capability/auth store TTLs because pooled entries hold live network connections (open sockets), not lightweight metadata.
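An idle-timeout reaper of the kind described can be sketched as a map of last-access times that is swept against a maximum age. The idlePool type, its methods, and the key format below are hypothetical; SessionConnectionPool's internals are not shown in this document. Time is passed in explicitly so the behavior is deterministic.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
	"time"
)

// idlePool is a hypothetical, simplified pool that tracks only when each
// entry was last accessed.
type idlePool struct {
	mu         sync.Mutex
	maxAge     time.Duration
	lastAccess map[string]time.Time
}

// touch records an access to key at time now.
func (p *idlePool) touch(key string, now time.Time) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.lastAccess[key] = now
}

// reap removes entries idle for longer than maxAge and returns their keys in
// sorted order. A real pool would also close the underlying connection.
func (p *idlePool) reap(now time.Time) []string {
	p.mu.Lock()
	defer p.mu.Unlock()
	var evicted []string
	for key, seen := range p.lastAccess {
		if now.Sub(seen) > p.maxAge {
			delete(p.lastAccess, key)
			evicted = append(evicted, key)
		}
	}
	sort.Strings(evicted)
	return evicted
}

// demoReap touches two entries 45 minutes apart and reaps 90 minutes after
// the first access, using a one-hour maximum age.
func demoReap() []string {
	start := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	p := &idlePool{maxAge: time.Hour, lastAccess: make(map[string]time.Time)}
	p.touch("session-1/server-a", start)
	p.touch("session-2/server-a", start.Add(45*time.Minute))
	return p.reap(start.Add(90 * time.Minute))
}

func main() {
	fmt.Println(demoReap())
}
```

Only the first entry is evicted in the demo: it has been idle for 90 minutes, while the second has been idle for 45.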
const LogMessage = "tool call"
LogMessage is the constant msg field on emitted lines. Log queries in Loki use this as the anchor (e.g. `msg = "tool call"`).
const LogSubsystem = "MCP-Tool"
LogSubsystem tags the structured log lines emitted by this middleware, matching muster's pkg/logging subsystem convention.
Variables ¶
This section is empty.
Functions ¶
func Logging ¶ added in v0.1.187
func Logging() server.ToolHandlerMiddleware
Logging returns a ToolHandlerMiddleware that emits one structured info-level log line per tool call with fields: tool, outcome, duration_s, error (when set).
func Metrics ¶ added in v0.1.187
func Metrics() server.ToolHandlerMiddleware
Metrics returns a ToolHandlerMiddleware that records:
- muster.tool_calls (counter) with attributes tool, outcome
- muster.tool_call.duration (histogram, unit "s") with the same attributes
Exported via the Prometheus OTEL exporter these become muster_tool_calls_total and muster_tool_call_duration_seconds.
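The middleware pattern behind Logging and Metrics can be illustrated without the MCP or OpenTelemetry libraries. In the sketch below, toolHandler and toolMiddleware are simplified stand-ins for the real handler and middleware types, and the outcome values "success" and "error" are assumptions; the actual attribute values are not listed in this document.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Simplified stand-ins for the tool handler and middleware types.
type toolHandler func(ctx context.Context, tool string) error
type toolMiddleware func(toolHandler) toolHandler

// callRecord holds what a logging or metrics middleware would emit per call.
type callRecord struct {
	Tool     string
	Outcome  string
	Duration time.Duration
}

// recordCalls wraps a handler, times it, and reports one record per call.
func recordCalls(sink func(callRecord)) toolMiddleware {
	return func(next toolHandler) toolHandler {
		return func(ctx context.Context, tool string) error {
			start := time.Now()
			err := next(ctx, tool)
			outcome := "success" // assumed outcome labels
			if err != nil {
				outcome = "error"
			}
			sink(callRecord{Tool: tool, Outcome: outcome, Duration: time.Since(start)})
			return err
		}
	}
}

// demoOutcomes runs one succeeding and one failing call through the middleware.
func demoOutcomes() []string {
	var outcomes []string
	mw := recordCalls(func(r callRecord) {
		outcomes = append(outcomes, r.Tool+"="+r.Outcome)
	})
	ok := mw(func(context.Context, string) error { return nil })
	failing := mw(func(context.Context, string) error { return errors.New("backend unavailable") })
	_ = ok(context.Background(), "x_list")
	_ = failing(context.Background(), "x_apply")
	return outcomes
}

func main() {
	fmt.Println(demoOutcomes())
}
```

The sink in this sketch is not safe for concurrent use; a production middleware would hand records to a thread-safe logger or meter instead.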
func ShouldUseLocalMint ¶ added in v0.13.0
func ShouldUseLocalMint(serverInfo *ServerInfo) bool
ShouldUseLocalMint reports whether muster mints a per-backend token for a server. Enabled when AuthConfig.LocalMint is non-nil, Enabled, and carries an Audience (the broker local-mint target). Mutually exclusive with token forwarding and token exchange (enforced by the CRD admission rules).
func ShouldUseTokenExchange ¶
func ShouldUseTokenExchange(serverInfo *ServerInfo) bool
ShouldUseTokenExchange checks if RFC 8693 token exchange should be used for a server. Token exchange is enabled when:
- AuthConfig.TokenExchange is not nil
- AuthConfig.TokenExchange.Enabled is true
- Required fields (DexTokenEndpoint, ConnectorID) are set
Token exchange takes precedence over token forwarding if both are configured.
func ShouldUseTokenForwarding ¶
func ShouldUseTokenForwarding(serverInfo *ServerInfo) bool
ShouldUseTokenForwarding checks if token forwarding should be used for a server. Token forwarding is enabled when:
- AuthConfig.ForwardToken is true (forwardToken implies OAuth-based auth)
- OR: AuthConfig.Type is "oauth" and ForwardToken is true
Setting forwardToken: true implicitly enables OAuth authentication since token forwarding only makes sense in an OAuth context.
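The two predicates and the stated precedence (token exchange before token forwarding) can be summarized in a few lines. The struct definitions below are hypothetical mirrors that contain only the fields named in the descriptions above; they are not the package's real AuthConfig types, and local minting is left out.

```go
package main

import "fmt"

// Hypothetical mirrors of the configuration fields named in the text.
type tokenExchangeConfig struct {
	Enabled          bool
	DexTokenEndpoint string
	ConnectorID      string
}

type authConfig struct {
	ForwardToken  bool
	TokenExchange *tokenExchangeConfig
}

// shouldUseTokenExchange requires a non-nil, enabled configuration with both
// required fields set.
func shouldUseTokenExchange(c *authConfig) bool {
	if c == nil || c.TokenExchange == nil {
		return false
	}
	te := c.TokenExchange
	return te.Enabled && te.DexTokenEndpoint != "" && te.ConnectorID != ""
}

// shouldUseTokenForwarding depends only on the ForwardToken flag.
func shouldUseTokenForwarding(c *authConfig) bool {
	return c != nil && c.ForwardToken
}

// mechanism applies the precedence rule: exchange wins when both are set.
func mechanism(c *authConfig) string {
	switch {
	case shouldUseTokenExchange(c):
		return "token-exchange"
	case shouldUseTokenForwarding(c):
		return "token-forwarding"
	default:
		return "none"
	}
}

func main() {
	both := &authConfig{
		ForwardToken: true,
		TokenExchange: &tokenExchangeConfig{
			Enabled:          true,
			DexTokenEndpoint: "https://dex.example/token", // hypothetical endpoint
			ConnectorID:      "remote",
		},
	}
	fmt.Println(mechanism(both))
	fmt.Println(mechanism(&authConfig{ForwardToken: true}))
	fmt.Println(mechanism(nil))
}
```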
Types ¶
type AdminConfig ¶ added in v0.1.123
type AdminConfig struct {
	// Enabled controls whether the admin listener is started.
	Enabled bool
	// Port is the TCP port for the admin listener (default: 9999 when enabled).
	Port int
	// BindAddress is the interface to bind the admin listener to (default: 127.0.0.1).
	BindAddress string
}
AdminConfig holds admin web UI configuration for the aggregator.
type AggregatorConfig ¶
type AggregatorConfig struct {
	// Port specifies the port number to listen on for the aggregated MCP endpoint
	Port int

	// Host specifies the host address to bind to (default: localhost)
	Host string

	// Transport defines the protocol to use for MCP communication.
	// Supported values: "sse", "streamable-http", "stdio"
	Transport string

	// Yolo disables the security denylist for destructive tools.
	// When true, all tools are allowed regardless of their destructive nature.
	// This should only be enabled in development environments.
	Yolo bool

	// ConfigDir is the user configuration directory for workflows and other configs.
	// This is used to load workflow definitions and make them available as tools.
	ConfigDir string

	// MusterPrefix is the global prefix applied to all aggregated tools.
	// This helps distinguish muster tools from other MCP tools in mixed environments.
	// Default value is "x".
	MusterPrefix string

	// Version is the muster server version to report in the MCP protocol handshake.
	// This should be set to the build version during application initialization.
	// Defaults to "dev" if not specified.
	Version string

	// OAuth configuration for remote MCP server authentication (client role)
	OAuth OAuthProxyConfig

	// OAuthServer configuration for protecting the Muster Server (resource server role)
	OAuthServer OAuthServerConfig

	// Debug enables debug logging
	Debug bool

	// Admin, when enabled, starts a separate HTTP listener that serves the
	// session management web UI. See internal/admin for details.
	Admin AdminConfig
}
AggregatorConfig holds configuration args for the aggregator. This structure defines how the aggregator should behave and what endpoints it should expose.
type AggregatorManager ¶
type AggregatorManager struct {
// contains filtered or unexported fields
}
AggregatorManager provides a high-level interface for managing the aggregator server and coordinating automatic MCP server registration based on service health status.
The manager combines the aggregator server with event handling to provide:
- Automatic registration of healthy MCP servers
- Event-driven updates when service states change
- Periodic retry mechanisms for failed registrations
- Centralized lifecycle management
- OAuth proxy for remote MCP server authentication
It acts as the primary entry point for the aggregator functionality and integrates with the muster service architecture through the central API pattern.
func NewAggregatorManager ¶
func NewAggregatorManager(config AggregatorConfig, orchestratorAPI api.OrchestratorAPI, serviceRegistry api.ServiceRegistryHandler, errorCallback func(err error)) *AggregatorManager
NewAggregatorManager creates a new aggregator manager with the specified configuration.
The manager requires access to the orchestrator API for receiving service state events and the service registry for querying service information. These dependencies are provided through the central API pattern to maintain loose coupling.
Args:
- config: Configuration for the aggregator server behavior
- orchestratorAPI: Interface for receiving service lifecycle events
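- serviceRegistry: Interface for querying service information
- errorCallback: Function of type func(err error) that is passed error values, as declared in the signature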
Returns a configured but not yet started aggregator manager.
func (*AggregatorManager) GetAggregatorServer ¶
func (am *AggregatorManager) GetAggregatorServer() *AggregatorServer
GetAggregatorServer returns the underlying aggregator server instance.
This method provides access to advanced aggregator operations that are not exposed through the manager interface. It should be used carefully and primarily for testing or debugging purposes.
Returns nil if the server is not initialized.
func (*AggregatorManager) GetEndpoint ¶
func (am *AggregatorManager) GetEndpoint() string
GetEndpoint returns the aggregator's MCP endpoint URL.
The endpoint format depends on the configured transport:
- SSE: http://host:port/sse
- Streamable HTTP: http://host:port/mcp
- Stdio: "stdio"
Returns an empty string if the aggregator server is not available.
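The transport-to-endpoint mapping listed above can be written as a small function. This is a sketch under assumptions: endpointFor is a hypothetical helper, and returning an empty string for an unrecognized transport is a choice made here, not documented behavior.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// endpointFor returns the endpoint for a transport, following the formats
// listed above. Unknown transports yield "" (an assumption of this sketch).
func endpointFor(transport, host string, port int) string {
	// JoinHostPort also brackets IPv6 literals correctly.
	base := "http://" + net.JoinHostPort(host, strconv.Itoa(port))
	switch transport {
	case "sse":
		return base + "/sse"
	case "streamable-http":
		return base + "/mcp"
	case "stdio":
		return "stdio"
	default:
		return ""
	}
}

func main() {
	for _, transport := range []string{"sse", "streamable-http", "stdio"} {
		fmt.Printf("%-16s %s\n", transport, endpointFor(transport, "localhost", 8080))
	}
}
```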
func (*AggregatorManager) GetEventHandler ¶
func (am *AggregatorManager) GetEventHandler() *EventHandler
GetEventHandler returns the event handler instance.
This method is primarily intended for testing and debugging purposes to inspect the state of the event handling system.
Returns nil if the event handler is not initialized.
func (*AggregatorManager) GetServiceData ¶
func (am *AggregatorManager) GetServiceData() map[string]interface{}
GetServiceData returns comprehensive service monitoring data.
This method provides detailed information about the aggregator's current state, including configuration, connection status, tool/resource/prompt counts, and server statistics. The data is suitable for monitoring dashboards and health checks.
Returns a map containing various metrics and status information.
func (*AggregatorManager) ManualRefresh ¶
func (am *AggregatorManager) ManualRefresh(ctx context.Context) error
ManualRefresh manually triggers a re-synchronization of all healthy MCP servers.
This method can be useful for debugging or when you need to force a refresh of the server registrations outside of the normal event-driven flow. It performs the same operation as the initial sync during startup.
Args:
- ctx: Context for the refresh operation
Returns an error if the refresh operation fails.
func (*AggregatorManager) RegisterServerPendingAuth ¶
func (am *AggregatorManager) RegisterServerPendingAuth(registration PendingAuthRegistration) error
RegisterServerPendingAuth registers a server that requires OAuth authentication before its tools can be exposed. Per ADR-008, no synthetic auth tools are created; users authenticate via core_auth_login.
AuthConfig inside registration may be nil; in either case the server is flagged as requiring per-session authentication.
func (*AggregatorManager) Start ¶
func (am *AggregatorManager) Start(ctx context.Context) error
Start initializes and starts the aggregator manager.
This method performs the following initialization sequence:
- Starts the underlying aggregator server
- Validates that required APIs are available
- Performs initial sync of healthy MCP servers
- Sets up event handling for automatic updates
- Starts periodic retry mechanism for failed registrations
The method is idempotent: calling it again while the manager is already running has no additional effect. It returns an error if any component fails to start.
func (*AggregatorManager) Stop ¶
func (am *AggregatorManager) Stop(ctx context.Context) error
Stop gracefully shuts down the aggregator manager.
This method stops all components in reverse order of startup:
- Cancels the context to signal shutdown to all goroutines
- Stops the event handler
- Stops the OAuth manager
- Stops the aggregator server
- Waits for all background operations to complete
The method is idempotent and can be called multiple times safely.
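Idempotent Start and Stop methods are often implemented with a mutex-guarded running flag. The lifecycle type below is a hypothetical illustration of that pattern and says nothing about how AggregatorManager tracks its own state; the counters exist only to make the idempotence observable.

```go
package main

import (
	"fmt"
	"sync"
)

// lifecycle is a hypothetical component with idempotent Start and Stop.
type lifecycle struct {
	mu      sync.Mutex
	running bool
	starts  int // number of times startup work actually ran
	stops   int // number of times shutdown work actually ran
}

// Start runs startup work only if the component is not already running.
func (l *lifecycle) Start() error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.running {
		return nil // already started: nothing to do
	}
	l.starts++
	l.running = true
	return nil
}

// Stop runs shutdown work only if the component is currently running.
func (l *lifecycle) Stop() error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if !l.running {
		return nil // already stopped: nothing to do
	}
	l.stops++
	l.running = false
	return nil
}

func main() {
	l := &lifecycle{}
	_ = l.Start()
	_ = l.Start() // no effect
	_ = l.Stop()
	_ = l.Stop() // no effect
	fmt.Println(l.starts, l.stops)
}
```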
type AggregatorServer ¶
type AggregatorServer struct {
// contains filtered or unexported fields
}
AggregatorServer implements a comprehensive MCP server that aggregates multiple backend MCP servers.
The AggregatorServer is the core component responsible for:
- Collecting and exposing tools, resources, and prompts from multiple backend servers
- Managing multiple transport protocols (SSE, stdio, streamable-http)
- Integrating core muster tools alongside external MCP servers
- Providing intelligent name collision resolution
- Implementing security filtering through the denylist system
- Real-time capability updates when backend servers change
- User-scoped tool visibility for OAuth-protected servers
Architecture: The server maintains a registry of backend MCP servers and dynamically updates its exposed capabilities as servers are registered/deregistered. It supports multiple transport protocols simultaneously and provides both external MCP compatibility and internal tool calling capabilities.
Session-Scoped Tool Visibility: For OAuth-protected servers, each login session's tool view is determined by the CapabilityStore keyed by (sessionID, serverName). There is no session registry; connections are created on demand for each tool call.
Thread Safety: All public methods are thread-safe and can be called concurrently. Internal state is protected by appropriate synchronization mechanisms.
func NewAggregatorServer ¶
func NewAggregatorServer(aggConfig AggregatorConfig, errorCallback func(error)) *AggregatorServer
NewAggregatorServer creates a new aggregator server with the specified configuration.
This constructor initializes all necessary components but does not start any servers. The returned server must be started with the Start method before it can handle requests.
The server is configured with:
- A server registry using the specified muster prefix
- Active item managers for tracking capabilities
- Per-session capability cache and SSO tracker for OAuth servers
- Default transport settings based on configuration
Args:
- aggConfig: Configuration args defining server behavior, transport, and security settings
Returns a configured but unstarted aggregator server ready for initialization.
func (*AggregatorServer) CallToolInternal ¶
func (a *AggregatorServer) CallToolInternal(ctx context.Context, toolName string, args map[string]interface{}) (res *mcp.CallToolResult, err error)
CallToolInternal provides internal tool calling capability for muster components.
This method allows internal muster components to execute tools through the aggregator without going through the external MCP protocol. It supports both:
- Tools from registered backend servers (resolved through the registry)
- Core tools from muster components (called directly through providers)
The method performs intelligent tool resolution:
- First attempts to resolve the tool through the server registry
- If not found, checks if it's a core tool by name pattern
- Routes the call to the appropriate handler based on tool type
This internal calling mechanism is essential for:
- Inter-component communication within muster
- Workflow execution that needs to call other tools
- Administrative operations that require tool access
Args:
- ctx: Context for the tool execution
- toolName: Name of the tool to execute (may be prefixed)
- args: Arguments to pass to the tool as key-value pairs
Returns the tool execution result or an error if the tool cannot be found or executed.
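The resolution order described above can be sketched as follows. This is a minimal illustration, not the package's implementation: the resolver type, the route method, the errToolNotFound sentinel, and the example tool names are all hypothetical, and the core_ prefix is assumed to be the name pattern that identifies core tools.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// errToolNotFound is a hypothetical sentinel; the real package may report
// unknown tools differently.
var errToolNotFound = errors.New("tool not found")

// resolver is a hypothetical stand-in for the aggregator's two lookup paths.
type resolver struct {
	registry   map[string]string // exposed tool name -> backend server name
	corePrefix string            // assumed naming pattern for core tools
}

// route reports which handler would serve toolName: a backend server found
// in the registry first, then the core provider matched by name pattern.
func (r resolver) route(toolName string) (string, error) {
	if server, ok := r.registry[toolName]; ok {
		return "backend:" + server, nil
	}
	if strings.HasPrefix(toolName, r.corePrefix) {
		return "core", nil
	}
	return "", fmt.Errorf("%w: %s", errToolNotFound, toolName)
}

func main() {
	r := resolver{
		registry:   map[string]string{"x_github_create_issue": "github"},
		corePrefix: "core_",
	}
	for _, name := range []string{"x_github_create_issue", "core_workflow_list", "unknown"} {
		target, err := r.route(name)
		fmt.Printf("%s -> %q (err: %v)\n", name, target, err)
	}
}
```

The registry is consulted first so that a registered backend tool always wins over a name-pattern match.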
func (*AggregatorServer) DeregisterServer ¶
func (a *AggregatorServer) DeregisterServer(name string) error
DeregisterServer removes a backend MCP server from the aggregator.
This method cleanly removes a backend server from the aggregator, which will cause all tools, resources, and prompts from that server to become unavailable. The backend client connection is closed as part of the deregistration process.
Additionally, this method cleans up any stale session connections for the server. This is critical for handling MCPServer renames, where the old server is deleted and a new one is created. Without this cleanup, session connections stored under the old server name would persist and cause stale auth status displays.
Args:
- name: Unique identifier of the server to remove
Returns an error if the server is not found or deregistration fails.
func (*AggregatorServer) GetAvailableTools ¶
func (a *AggregatorServer) GetAvailableTools() []string
GetAvailableTools implements the ToolAvailabilityChecker interface.
This method returns a comprehensive list of all tools currently available through the aggregator, including both external tools from backend servers and core tools from muster components. The returned list represents the complete tool inventory that can be used by workflows, capabilities, and other muster components.
The aggregation process:
- Collects all tools from registered backend servers via the registry
- Collects all core tools from muster component providers
- Combines both lists into a unified tool inventory
- Returns tool names (with appropriate prefixing applied)
This method is used by:
- Workflow manager for populating available tool lists
- Service class manager for tool validation
- Administrative interfaces for tool discovery
Returns a slice of tool names representing all available tools.
func (*AggregatorServer) GetEndpoint ¶
func (a *AggregatorServer) GetEndpoint() string
GetEndpoint returns the aggregator's primary endpoint URL based on the configured transport.
The endpoint format varies by transport type:
- SSE: http://host:port/sse (Server-Sent Events endpoint)
- Streamable HTTP: http://host:port/mcp (default HTTP streaming path)
- Stdio: "stdio" (indicates standard I/O communication)
This endpoint can be used by MCP clients to connect to the aggregator and access all aggregated capabilities from backend servers.
Returns the endpoint URL as a string, or "stdio" for standard I/O transport.
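As an illustration of the formats listed above, the mapping from transport to endpoint might look like the sketch below. The endpointFor function and the string transport names are assumptions made for this example; the package's own transport type and configuration fields are not shown in this section.

```go
package main

import "fmt"

// endpointFor mirrors the documented endpoint formats. The transport
// names used here ("sse", "stdio") are assumed values.
func endpointFor(transport, host string, port int) string {
	switch transport {
	case "sse":
		return fmt.Sprintf("http://%s:%d/sse", host, port)
	case "stdio":
		return "stdio"
	default:
		// Streamable HTTP is documented as the default transport.
		return fmt.Sprintf("http://%s:%d/mcp", host, port)
	}
}

func main() {
	for _, transport := range []string{"sse", "streamable-http", "stdio"} {
		fmt.Println(transport, "=>", endpointFor(transport, "localhost", 8090))
	}
}
```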
func (*AggregatorServer) GetPrompt ¶
func (a *AggregatorServer) GetPrompt(ctx context.Context, name string, args map[string]string) (*mcp.GetPromptResult, error)
GetPrompt executes a prompt with the provided arguments. This resolves the prompt name to its origin server and retrieves the prompt.
func (*AggregatorServer) GetPrompts ¶
func (a *AggregatorServer) GetPrompts() []mcp.Prompt
GetPrompts returns all available prompts from all sources with intelligent name prefixing.
This method aggregates prompts from all registered backend servers, applying smart prefixing similar to tools to avoid name conflicts. The prefixing ensures that prompts from different servers can coexist without naming collisions.
Returns a slice of MCP prompts ready for client consumption.
func (*AggregatorServer) GetPromptsForSession ¶
GetPromptsForSession returns a session-specific view of all available prompts. For OAuth servers, prompts are read from the CapabilityStore keyed by session ID.
func (*AggregatorServer) GetRegistry ¶
func (a *AggregatorServer) GetRegistry() *ServerRegistry
GetRegistry returns the server registry for direct access to backend server information.
This method provides access to the underlying registry for advanced operations such as inspecting server status, accessing raw capabilities, or performing administrative tasks. It should be used carefully to avoid disrupting the aggregator's normal operation.
Returns the ServerRegistry instance managing all backend servers.
func (*AggregatorServer) GetResources ¶
func (a *AggregatorServer) GetResources() []mcp.Resource
GetResources returns all available resources from all registered backend servers.
This method aggregates resources from all connected backend servers, applying appropriate URI prefixing to avoid conflicts. Resources maintain their original functionality while being properly namespaced within the aggregated environment.
Returns a slice of MCP resources ready for client access.
func (*AggregatorServer) GetResourcesForSession ¶
func (a *AggregatorServer) GetResourcesForSession(ctx context.Context, sessionID string) []mcp.Resource
GetResourcesForSession returns a session-specific view of all available resources. For OAuth servers, resources are read from the CapabilityStore keyed by session ID.
func (*AggregatorServer) GetTools ¶
func (a *AggregatorServer) GetTools() []mcp.Tool
GetTools returns all available tools from all sources with intelligent name prefixing.
This method aggregates tools from all registered backend servers, applying smart prefixing to avoid name conflicts. The prefixing is only applied when conflicts would otherwise occur, following the pattern: {muster_prefix}_{server_prefix}_{original_name}
Note: This returns the global tool view. For session-specific tool visibility, use GetToolsForSession instead.
Returns a slice of MCP tools ready for client consumption.
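The conflict-only prefixing rule can be sketched as below. This is an assumption-laden illustration: exposedNames is a hypothetical helper, the server name is assumed to double as the server prefix, and a conflict is taken to mean that the same original tool name is offered by more than one server.

```go
package main

import (
	"fmt"
	"sort"
)

// exposedNames applies {muster_prefix}_{server_prefix}_{original_name}
// only to tool names that more than one server provides. Tool names are
// assumed to be unique within a single server.
func exposedNames(musterPrefix string, toolsByServer map[string][]string) []string {
	counts := make(map[string]int)
	for _, tools := range toolsByServer {
		for _, tool := range tools {
			counts[tool]++
		}
	}

	var out []string
	for server, tools := range toolsByServer {
		for _, tool := range tools {
			if counts[tool] > 1 {
				out = append(out, fmt.Sprintf("%s_%s_%s", musterPrefix, server, tool))
			} else {
				out = append(out, tool)
			}
		}
	}
	sort.Strings(out) // map iteration order is random; sort for stable output
	return out
}

func main() {
	names := exposedNames("x", map[string][]string{
		"github": {"search", "create_issue"},
		"gitlab": {"search"},
	})
	fmt.Println(names)
}
```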
func (*AggregatorServer) GetToolsForSession ¶
GetToolsForSession returns a session-specific view of all available tools. For OAuth servers, tools are read from the CapabilityStore keyed by session ID. For non-OAuth servers, tools are read from ServerInfo (same as GetAllTools).
func (*AggregatorServer) GetToolsWithStatus ¶
func (a *AggregatorServer) GetToolsWithStatus() []ToolWithStatus
GetToolsWithStatus returns all available tools along with their security blocking status.
This method provides enhanced tool information that includes whether each tool is blocked by the security denylist. The blocking status depends on:
- The tool's classification as destructive in the denylist
- The current yolo mode setting (yolo=true allows all tools)
The tool names are resolved to their original names (before prefixing) for accurate denylist checking, ensuring consistent security behavior regardless of how tools are exposed.
Returns a slice of ToolWithStatus containing both tool definitions and security status.
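A minimal sketch of the blocking rule described above, assuming a tool is blocked exactly when its original name is on the denylist and yolo mode is off. The toolStatus type, the withStatus function, and the lookup maps are hypothetical; the real ToolWithStatus fields are not listed in this section.

```go
package main

import "fmt"

// toolStatus is a hypothetical, simplified counterpart to ToolWithStatus.
type toolStatus struct {
	Name    string
	Blocked bool
}

// withStatus marks a tool blocked when its original (unprefixed) name is
// on the denylist and yolo mode is disabled.
func withStatus(exposed []string, original map[string]string, denylist map[string]bool, yolo bool) []toolStatus {
	out := make([]toolStatus, 0, len(exposed))
	for _, name := range exposed {
		orig, ok := original[name]
		if !ok {
			orig = name // the tool was exposed without a prefix
		}
		out = append(out, toolStatus{Name: name, Blocked: denylist[orig] && !yolo})
	}
	return out
}

func main() {
	exposed := []string{"x_k8s_delete_pod", "x_k8s_get_pod"}
	original := map[string]string{
		"x_k8s_delete_pod": "delete_pod",
		"x_k8s_get_pod":    "get_pod",
	}
	denylist := map[string]bool{"delete_pod": true}

	fmt.Println("yolo off:", withStatus(exposed, original, denylist, false))
	fmt.Println("yolo on: ", withStatus(exposed, original, denylist, true))
}
```

Checking the original name rather than the exposed one keeps the result stable when the prefixing of a tool changes.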
func (*AggregatorServer) IsToolAvailable ¶
func (a *AggregatorServer) IsToolAvailable(toolName string) bool
IsToolAvailable implements the ToolAvailabilityChecker interface.
This method determines whether a specific tool is available through the aggregator, checking both external backend servers (via the registry) and core muster tools (via name pattern matching). It provides a unified way for other components to verify tool availability before attempting to use them.
The check process:
- Attempts to resolve the tool through the server registry
- If unresolved because the tool is a family backed by more than one provider (an intentional ambiguity error), treats it as available since the family still exists; routing only needs the instance-selector arg
- If still not found, checks if it matches core tool naming patterns
- Returns true if found in any of these
This method is used by:
- Workflow manager for validating workflow step tools
- Service class manager for tool availability validation
Args:
- toolName: Name of the tool to check (may be prefixed)
Returns true if the tool is available, false otherwise.
func (*AggregatorServer) IsYoloMode ¶
func (a *AggregatorServer) IsYoloMode() bool
IsYoloMode returns whether yolo mode is currently enabled.
Yolo mode disables the security denylist, allowing all tools to be executed regardless of their destructive potential. This mode should only be enabled in development or testing environments where the risk is acceptable.
Returns true if yolo mode is enabled, false if security filtering is active.
func (*AggregatorServer) ListPromptsForContext ¶
func (a *AggregatorServer) ListPromptsForContext(ctx context.Context) []mcp.Prompt
ListPromptsForContext returns all available prompts for the current session context.
func (*AggregatorServer) ListResourcesForContext ¶
func (a *AggregatorServer) ListResourcesForContext(ctx context.Context) []mcp.Resource
ListResourcesForContext returns all available resources for the current session context.
func (*AggregatorServer) ListServersRequiringAuth ¶
func (a *AggregatorServer) ListServersRequiringAuth(ctx context.Context) []api.ServerAuthInfo
ListServersRequiringAuth returns a list of servers that require authentication for the current session. This enables the list_tools meta-tool to inform users about servers that are available but require authentication before their tools become visible.
The method checks each registered server and returns those that:
- Require per-session authentication (RequiresSessionAuth)
- The session has not yet authenticated to
- Are NOT SSO-configured (token forwarding/exchange)
This is part of the server-side meta-tools migration (Issue #343) to provide better visibility into which servers need authentication.
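The three conditions above combine into a single filter, sketched here under assumptions: serverState and serversRequiringAuth are hypothetical names, and the per-session authentication state is modeled as a plain map rather than whatever the aggregator tracks internally.

```go
package main

import "fmt"

// serverState is a hypothetical, simplified view of a registered server.
type serverState struct {
	Name                string
	RequiresSessionAuth bool
	SSO                 bool // token forwarding or token exchange configured
}

// serversRequiringAuth returns the servers that need per-session auth,
// that the session has not authenticated to, and that are not SSO-configured.
func serversRequiringAuth(servers []serverState, authenticated map[string]bool) []string {
	var out []string
	for _, s := range servers {
		if s.RequiresSessionAuth && !authenticated[s.Name] && !s.SSO {
			out = append(out, s.Name)
		}
	}
	return out
}

func main() {
	servers := []serverState{
		{Name: "github", RequiresSessionAuth: true},
		{Name: "gitlab", RequiresSessionAuth: true},
		{Name: "remote", RequiresSessionAuth: true, SSO: true},
		{Name: "local"},
	}
	authenticated := map[string]bool{"gitlab": true}
	fmt.Println(serversRequiringAuth(servers, authenticated))
}
```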
func (*AggregatorServer) ListToolsForContext ¶
func (a *AggregatorServer) ListToolsForContext(ctx context.Context) []mcp.Tool
ListToolsForContext returns all available tools for the current user context. This is used by the metatools package to provide user-scoped tool visibility.
The method extracts the session ID from the context and returns tools appropriate for that session's authentication state. This includes:
- MCP server tools (prefixed with x_<server>_)
- Core muster tools (prefixed with core_) from internal providers
The core tools are collected from workflow, service, config, mcpserver, events, and auth providers.
func (*AggregatorServer) MissingToolsForSession ¶ added in v0.1.222
func (a *AggregatorServer) MissingToolsForSession(ctx context.Context, toolNames []string) []string
MissingToolsForSession returns, from toolNames, the (deduplicated, input-ordered) subset that is not available for the calling session.
Availability of family tools provided by SSO / auth-protected servers must be scoped to the calling session and must not depend on call ordering (#764):
- False negative: such servers are skipped by GetAllTools(), so their family routing entries land in the process-global familyMappings index only as a side effect of a prior GetAllToolsForSession() call. A check against that index reported the tool missing until some session listed tools.
- False positive: that index is unioned across sessions and keyed only by exposed tool name. Once any session lists tools, registry.IsFamilyTool flips true process-wide, so a global check then reported the tool available for every session — including ones that never authenticated to the family.
Both stem from resolving session-scoped tools against the process-global view. So when the context carries a session, that session's accessible tool set — hydrated from the CapabilityStore (keyed by subject / session) via GetToolsForSession — is authoritative for downstream (non-core) tools. The session set already includes every non-auth connected server's tools plus the family tools the caller has authenticated to, so it neither under- nor over-reports. Core / meta tools (core_*, workflow_*) are never session-scoped and resolve by name. Only when there is no session does availability fall back to the process-global view.
The session tool set is resolved at most once per call (lazily, on the first non-core tool), so checking many tools does not rebuild it repeatedly.
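The deduplication, input ordering, and lazy one-time resolution can be sketched as follows. This is not the package's code: missingTools, isCore, and loadSession are hypothetical, and the sketch simplifies by treating every name matching the core_* or workflow_* pattern as available, whereas the real method resolves such tools by name.

```go
package main

import (
	"fmt"
	"strings"
)

// isCore uses the core_* and workflow_* patterns named in the text.
func isCore(name string) bool {
	return strings.HasPrefix(name, "core_") || strings.HasPrefix(name, "workflow_")
}

// missingTools returns the deduplicated, input-ordered names that are not
// in the session's tool set. loadSession runs at most once, and only when
// the first non-core tool is encountered.
func missingTools(toolNames []string, loadSession func() map[string]bool) []string {
	var (
		session map[string]bool
		loaded  bool
		missing []string
	)
	seen := make(map[string]bool, len(toolNames))
	for _, name := range toolNames {
		if seen[name] {
			continue
		}
		seen[name] = true
		if isCore(name) {
			continue // simplification: core tools are assumed available
		}
		if !loaded {
			session = loadSession()
			loaded = true
		}
		if !session[name] {
			missing = append(missing, name)
		}
	}
	return missing
}

func main() {
	calls := 0
	load := func() map[string]bool {
		calls++
		return map[string]bool{"x_a_list": true}
	}
	names := []string{"x_b_get", "core_auth_login", "x_a_list", "x_b_get"}
	fmt.Println(missingTools(names, load), "session loads:", calls)
}
```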
func (*AggregatorServer) OnToolsUpdated ¶
func (a *AggregatorServer) OnToolsUpdated(event api.ToolUpdateEvent)
OnToolsUpdated implements the ToolUpdateSubscriber interface for handling tool change events.
This method responds to tool update events from other muster components, particularly the workflow manager, to maintain synchronization of the aggregator's exposed capabilities with the current tool landscape.
Event Processing:
- Filters events to focus on workflow-related tool changes
- Triggers capability refresh for workflow tool updates
- Uses asynchronous processing with a small delay to avoid mutex conflicts
- Logs all received events for debugging and monitoring
The asynchronous processing pattern ensures that:
- The event publisher (workflow manager) doesn't block waiting for aggregator updates
- Mutex conflicts are avoided by allowing the publisher to complete first
- Capability updates happen promptly but safely
Args:
- event: Tool update event containing change information, tool lists, and metadata
The method processes events selectively, focusing on workflow manager events that indicate changes to workflow-based tools.
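The deferred, non-blocking refresh pattern might be sketched like this. The scheduleRefresh function, the "workflow" source string, and the delay value are assumptions for illustration; the section does not state how api.ToolUpdateEvent identifies its source or how long the real delay is.

```go
package main

import (
	"fmt"
	"time"
)

// scheduleRefresh returns immediately. For workflow events it arranges for
// refresh to run on another goroutine after delay, so the publisher can
// finish (and release its locks) before the refresh starts.
func scheduleRefresh(source string, delay time.Duration, refresh func()) bool {
	if source != "workflow" {
		return false // other events are ignored in this sketch
	}
	time.AfterFunc(delay, refresh)
	return true
}

func main() {
	done := make(chan struct{})
	scheduled := scheduleRefresh("workflow", 100*time.Millisecond, func() {
		close(done)
	})
	fmt.Println("handler returned, refresh scheduled:", scheduled)

	<-done // wait only so the demo does not exit before the refresh runs
	fmt.Println("refresh completed")
}
```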
func (*AggregatorServer) ReadResource ¶
func (a *AggregatorServer) ReadResource(ctx context.Context, uri string) (*mcp.ReadResourceResult, error)
ReadResource retrieves the contents of a resource by URI. This resolves the resource URI to its origin server and reads the content.
func (*AggregatorServer) RegisterServer ¶
func (a *AggregatorServer) RegisterServer(ctx context.Context, registration ServerRegistration, client MCPClient) error
RegisterServer registers a new backend MCP server with the aggregator.
This method adds a backend MCP server to the aggregator's registry, making its tools, resources, and prompts available through the aggregated interface. The registration process includes client initialization and capability discovery.
Args:
- ctx: Context for the registration operation and capability queries
- registration: Server identification (name, toolPrefix, family)
- client: MCP client interface for communicating with the backend server
Returns an error if registration fails due to naming conflicts, client issues, or communication problems with the backend server.
func (*AggregatorServer) Start ¶
func (a *AggregatorServer) Start(ctx context.Context) error
Start initializes and starts the aggregator server with all configured transports.
This method performs a comprehensive startup sequence:
- Creates and configures the core MCP server with full capabilities
- Initializes workflow adapter if config directory is provided
- Starts background monitoring of registry updates
- Subscribes to tool update events from other muster components
- Performs initial capability discovery and registration
- Starts the appropriate transport server(s) based on configuration
Transport Support:
- SSE: Server-Sent Events with HTTP endpoints (/sse, /message)
- Stdio: Standard input/output for CLI integration
- Streamable HTTP: HTTP-based streaming protocol (default)
Start does not restart a running server: calling it again while the server is already started, including with the same context, returns an error indicating the server is already started.
Args:
- ctx: Context for controlling the server lifecycle and coordinating shutdown
Returns an error if startup fails at any stage, or nil on successful startup.
func (*AggregatorServer) Stop ¶
func (a *AggregatorServer) Stop(ctx context.Context) error
Stop gracefully shuts down the aggregator server and all its components.
This method performs a coordinated shutdown sequence:
- Cancels the context to signal shutdown to all background routines
- Shuts down all transport servers with a timeout
- Waits for background routines to complete
- Deregisters all backend servers to clean up connections
- Resets internal state to allow for restart
The shutdown process includes:
- Graceful shutdown of HTTP-based transports with a 5-second timeout
- Automatic shutdown of stdio transport via context cancellation
- Cleanup of all registered backend MCP servers
- Synchronization with background monitoring routines
The method is idempotent - calling it multiple times is safe and will not cause errors or duplicate cleanup operations.
Args:
- ctx: Context for controlling the shutdown timeout and operations
Returns an error if shutdown encounters issues, though cleanup continues regardless.
func (*AggregatorServer) ToggleToolBlock ¶
func (a *AggregatorServer) ToggleToolBlock(toolName string) error
ToggleToolBlock toggles the blocked status of a specific tool (placeholder implementation).
This method is intended to provide runtime control over individual tool blocking, allowing administrators to override the default denylist behavior for specific tools. Currently, this functionality is not fully implemented and returns an error.
Future Enhancement: The full implementation would maintain a runtime override list that could selectively enable or disable specific tools regardless of the global yolo setting.
Args:
- toolName: Name of the tool to toggle blocking status for
Returns an error indicating the feature is not yet implemented.
func (*AggregatorServer) UpdateCapabilities ¶
func (a *AggregatorServer) UpdateCapabilities()
UpdateCapabilities provides public access to capability updates for external components.
This method exposes the internal updateCapabilities functionality to allow other muster components (particularly the workflow manager) to trigger capability refreshes when they detect changes in their tool inventory.
The method is thread-safe and can be called concurrently without causing issues. It performs the same comprehensive capability update as the internal method, including cleanup of obsolete items and addition of new capabilities.
Use Cases:
- Workflow manager triggering updates when workflow definitions change
- Administrative tools forcing capability refresh
- Integration testing scenarios requiring capability synchronization
This is a lightweight wrapper around the internal updateCapabilities method.
type AuthInfo ¶
AuthInfo is an alias to the mcpserver AuthInfo type for OAuth authentication. It contains OAuth authentication information extracted from a 401 response. See mcpserver.AuthInfo for detailed field documentation.
type AuthMetrics ¶
type AuthMetrics struct {
// contains filtered or unexported fields
}
AuthMetrics tracks authentication-related metrics for monitoring and alerting.
This provides visibility into authentication patterns, failures, and potential abuse. Metrics are tracked per-server to enable targeted alerting and debugging.
func NewAuthMetrics ¶
func NewAuthMetrics() *AuthMetrics
NewAuthMetrics creates a new AuthMetrics instance.
func (*AuthMetrics) GetServerMetrics ¶
func (m *AuthMetrics) GetServerMetrics(serverName string) (AuthServerMetricView, bool)
GetServerMetrics returns metrics for a specific server.
func (*AuthMetrics) GetSummary ¶
func (m *AuthMetrics) GetSummary() AuthMetricsSummary
GetSummary returns a summary of all authentication metrics.
func (*AuthMetrics) RecordLoginAttempt ¶
func (m *AuthMetrics) RecordLoginAttempt(serverName, sub string)
RecordLoginAttempt records an authentication login attempt.
Args:
- serverName: Name of the server being authenticated to
- sub: The user subject making the attempt (for logging, truncated)
func (*AuthMetrics) RecordLoginFailure ¶
func (m *AuthMetrics) RecordLoginFailure(serverName, sub, reason string)
RecordLoginFailure records a failed authentication attempt.
Args:
- serverName: Name of the server where authentication failed
- sub: The user subject that failed (for logging, truncated)
- reason: The reason for the failure
func (*AuthMetrics) RecordLoginSuccess ¶
func (m *AuthMetrics) RecordLoginSuccess(serverName, sub string)
RecordLoginSuccess records a successful authentication.
Args:
- serverName: Name of the server authenticated to
- sub: The user subject that authenticated (for logging, truncated)
func (*AuthMetrics) RecordLogoutAttempt ¶
func (m *AuthMetrics) RecordLogoutAttempt(serverName, sub string)
RecordLogoutAttempt records a logout attempt.
Args:
- serverName: Name of the server being logged out from
- sub: The user subject making the attempt (for logging, truncated)
func (*AuthMetrics) RecordLogoutSuccess ¶
func (m *AuthMetrics) RecordLogoutSuccess(serverName, sub string)
RecordLogoutSuccess records a successful logout.
Args:
- serverName: Name of the server logged out from
- sub: The user subject that logged out (for logging, truncated)
func (*AuthMetrics) RecordRateLimitBlock ¶
func (m *AuthMetrics) RecordRateLimitBlock(serverName, sub string)
RecordRateLimitBlock records when a user was rate limited.
Args:
- serverName: Name of the server the user was trying to authenticate to
- sub: The user subject that was blocked (for logging, truncated)
type AuthMetricsSummary ¶
type AuthMetricsSummary struct {
TotalLoginAttempts int64 `json:"total_login_attempts"`
TotalLoginSuccesses int64 `json:"total_login_successes"`
TotalLoginFailures int64 `json:"total_login_failures"`
TotalRateLimitBlocks int64 `json:"total_rate_limit_blocks"`
TotalLogoutAttempts int64 `json:"total_logout_attempts"`
TotalLogoutSuccesses int64 `json:"total_logout_successes"`
PerServerMetrics []AuthServerMetricView `json:"per_server_metrics"`
}
AuthMetricsSummary provides a summary of authentication metrics.
type AuthRateLimiter ¶
type AuthRateLimiter struct {
// contains filtered or unexported fields
}
AuthRateLimiter provides per-user rate limiting for authentication operations. This prevents OAuth flow abuse by limiting the number of auth attempts per user.
Rate limiting is implemented using a sliding window approach:
- Each user can make at most MaxAttempts auth attempts within the configured Window (see AuthRateLimiterConfig)
- Attempts are tracked per user (sub claim), not globally
- Old attempts are automatically cleaned up via a background goroutine
Callers MUST call Stop() when done to prevent goroutine leaks.
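A sliding window limiter of the kind described above can be sketched as below. This is an independent illustration, not the AuthRateLimiter implementation: slidingLimiter, allow, and the at helper are hypothetical, and the current time is passed in explicitly so the behavior is deterministic.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// slidingLimiter is a hypothetical per-user sliding window limiter.
type slidingLimiter struct {
	mu       sync.Mutex
	max      int
	window   time.Duration
	attempts map[string][]time.Time
}

func newSlidingLimiter(max int, window time.Duration) *slidingLimiter {
	return &slidingLimiter{max: max, window: window, attempts: make(map[string][]time.Time)}
}

// at builds a deterministic timestamp for the demo.
func at(sec int64) time.Time { return time.Unix(sec, 0) }

// allow drops attempts older than the window, then records the new attempt
// only if the user is still under the limit. Blocked attempts are not recorded.
func (l *slidingLimiter) allow(user string, now time.Time) bool {
	l.mu.Lock()
	defer l.mu.Unlock()

	cutoff := now.Add(-l.window)
	kept := l.attempts[user][:0] // filter in place
	for _, t := range l.attempts[user] {
		if t.After(cutoff) {
			kept = append(kept, t)
		}
	}
	if len(kept) >= l.max {
		l.attempts[user] = kept
		return false
	}
	l.attempts[user] = append(kept, now)
	return true
}

func main() {
	l := newSlidingLimiter(2, time.Minute)
	for _, sec := range []int64{0, 10, 20, 61} {
		fmt.Printf("t=%ds allowed=%v\n", sec, l.allow("alice", at(sec)))
	}
}
```

Because state is keyed by user, one user exhausting their attempts does not affect another user's budget.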
func NewAuthRateLimiter ¶
func NewAuthRateLimiter(config AuthRateLimiterConfig) *AuthRateLimiter
NewAuthRateLimiter creates a new rate limiter with the given configuration. It starts a background goroutine that periodically calls Cleanup() to remove stale entries. Callers MUST call Stop() when done to prevent goroutine leaks.
func (*AuthRateLimiter) Allow ¶
func (rl *AuthRateLimiter) Allow(userID, serverName string) bool
Allow checks if an auth attempt is allowed for the given user. If allowed, the attempt is recorded and true is returned. If rate limited, false is returned and the attempt is NOT recorded.
Args:
- userID: The user making the auth attempt (sub claim or session ID fallback)
- serverName: The server being authenticated to (for logging)
Returns true if the attempt is allowed, false if rate limited.
func (*AuthRateLimiter) Cleanup ¶
func (rl *AuthRateLimiter) Cleanup()
Cleanup removes stale entries from the rate limiter. This is called periodically by the background cleanup goroutine.
func (*AuthRateLimiter) RemainingAttempts ¶
func (rl *AuthRateLimiter) RemainingAttempts(userID string) int
RemainingAttempts returns the number of remaining auth attempts for a user.
func (*AuthRateLimiter) Reset ¶
func (rl *AuthRateLimiter) Reset(userID string)
Reset clears all rate limiting state for a user. This can be called after successful authentication to reset the counter.
func (*AuthRateLimiter) Stop ¶ added in v0.1.34
func (rl *AuthRateLimiter) Stop()
Stop terminates the background cleanup goroutine. It is safe to call Stop multiple times.
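One common way to make such a Stop safe to call repeatedly is sync.Once guarding a channel close. The sketch below shows that technique under assumptions: cleaner and startCleaner are hypothetical names, and nothing here is confirmed to match how AuthRateLimiter implements it.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// cleaner is a hypothetical owner of a periodic background goroutine.
type cleaner struct {
	stop chan struct{}
	done chan struct{}
	once sync.Once
}

// startCleaner runs cleanup every interval until Stop is called.
// interval must be positive, as required by time.NewTicker.
func startCleaner(interval time.Duration, cleanup func()) *cleaner {
	c := &cleaner{stop: make(chan struct{}), done: make(chan struct{})}
	go func() {
		defer close(c.done)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				cleanup()
			case <-c.stop:
				return
			}
		}
	}()
	return c
}

// Stop signals the goroutine to exit and waits for it to finish.
// sync.Once ensures the channel is closed only once, so repeated calls are safe.
func (c *cleaner) Stop() {
	c.once.Do(func() { close(c.stop) })
	<-c.done
}

func main() {
	c := startCleaner(10*time.Millisecond, func() {})
	c.Stop()
	c.Stop() // second call is a no-op
	fmt.Println("stopped without leaking the goroutine")
}
```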
type AuthRateLimiterConfig ¶
type AuthRateLimiterConfig struct {
// MaxAttempts is the maximum number of auth attempts per user within the window.
// Default: 10 attempts
MaxAttempts int
// Window is the time window for rate limiting.
// Default: 1 minute
Window time.Duration
}
AuthRateLimiterConfig holds configuration for the rate limiter.
func DefaultAuthRateLimiterConfig ¶
func DefaultAuthRateLimiterConfig() AuthRateLimiterConfig
DefaultAuthRateLimiterConfig returns the default rate limiter configuration.
type AuthServerMetricView ¶
type AuthServerMetricView struct {
ServerName string `json:"server_name"`
LoginAttempts int64 `json:"login_attempts"`
LoginSuccesses int64 `json:"login_successes"`
LoginFailures int64 `json:"login_failures"`
RateLimitBlocks int64 `json:"rate_limit_blocks"`
LogoutAttempts int64 `json:"logout_attempts"`
LogoutSuccesses int64 `json:"logout_successes"`
LastAttemptAt time.Time `json:"last_attempt_at,omitempty"`
LastSuccessAt time.Time `json:"last_success_at,omitempty"`
LastFailureAt time.Time `json:"last_failure_at,omitempty"`
}
AuthServerMetricView is a read-only view of server-specific auth metrics.
type AuthToolProvider ¶
type AuthToolProvider struct {
// contains filtered or unexported fields
}
AuthToolProvider provides core authentication tools for the aggregator. These tools allow users to authenticate to OAuth-protected MCP servers through `core_auth_login` and `core_auth_logout` commands.
This implements ADR-008: Authentication is a muster platform concern, not an MCP server concern. Instead of synthetic per-server authenticate tools, we use core tools that take a server parameter.
func NewAuthToolProvider ¶
func NewAuthToolProvider(aggregator *AggregatorServer) *AuthToolProvider
NewAuthToolProvider creates a new authentication tool provider.
func (*AuthToolProvider) ExecuteTool ¶
func (p *AuthToolProvider) ExecuteTool(ctx context.Context, toolName string, args map[string]interface{}) (*api.CallToolResult, error)
ExecuteTool executes an authentication tool by name.
type ConnectionResult ¶ added in v0.1.50
type ConnectionResult struct {
// ServerName is the name of the server that was connected
ServerName string
// ToolCount is the number of tools available from the server
ToolCount int
// ResourceCount is the number of resources available from the server
ResourceCount int
// PromptCount is the number of prompts available from the server
PromptCount int
// Client is the live MCP client. The caller owns its lifecycle and must
// either pool it for reuse or close it when done.
Client MCPClient
// TokenExpiry records when the client's bearer token expires. Zero means
// no expiry tracking (e.g., token forwarding clients). Callers should pass
// this to SessionConnectionPool.PutWithExpiry for proactive refresh.
TokenExpiry time.Time
// ExchangedToken is the RFC 8693 exchanged bearer this client sends
// downstream. Populated only by the token-exchange path — forward-token
// and DynamicAuth connections leave it empty. Callers should pass it to
// SessionConnectionPool.SetExchangedToken so the admin UI can surface it.
ExchangedToken string
}
ConnectionResult contains the result of establishing a session connection. This is returned by establishConnection and used by callers to format their specific result types (api.CallToolResult or mcp.CallToolResult).
The Client field holds the live, initialized MCP client. Ownership is transferred to the caller, who must either pool or close it.
func EstablishConnectionWithLocalMint ¶ added in v0.13.0
func EstablishConnectionWithLocalMint( ctx context.Context, a *AggregatorServer, serverInfo *ServerInfo, musterIssuer string, ) (*ConnectionResult, error)
EstablishConnectionWithLocalMint connects to a localMint server to discover its capabilities. Discovery mints an M2M token from the connecting identity's session token. In the agent topology that is the agent's own SA token, which WorkloadAudiences authorizes; a human's on-behalf-of identity only arrives per call_tool via the actor header. The returned client's header func re-mints on every request from the live request context, so per-call execution still performs the full M2M-or-delegation mint and the discovery token is not reused.
func EstablishConnectionWithTokenExchange ¶ added in v0.1.50
func EstablishConnectionWithTokenExchange( ctx context.Context, a *AggregatorServer, serverInfo *ServerInfo, musterIssuer string, ) (*ConnectionResult, error)
EstablishConnectionWithTokenExchange attempts to establish a connection using RFC 8693 Token Exchange for cross-cluster SSO. This is used when an MCPServer has tokenExchange configured.
The function:
- Gets the user's ID token from muster's OAuth session
- Extracts the user ID (sub claim) from the token
- Exchanges it for a token valid on the remote cluster's IdP (e.g., Dex)
- If successful, populates the CapabilityStore and registers tools
Both sessionID and sub are extracted from ctx (set by OAuth middleware).
Args:
- ctx: Context for the operation (must contain sessionID and sub)
- a: The aggregator server instance
- serverInfo: The server info containing URL and auth config
- musterIssuer: The issuer URL of muster's OAuth provider (used to get the ID token)
Returns:
- *ConnectionResult: The connection result if successful
- error: The error if connection failed
func EstablishConnectionWithTokenForwarding ¶ added in v0.1.50
func EstablishConnectionWithTokenForwarding( ctx context.Context, a *AggregatorServer, serverInfo *ServerInfo, musterIssuer string, ) (*ConnectionResult, error)
EstablishConnectionWithTokenForwarding attempts to establish a connection using ID token forwarding for SSO. This is used when an MCPServer has forwardToken: true.
The function:
- Gets the user's ID token from muster's OAuth session
- Forwards it to the downstream MCP server
- If successful, populates the CapabilityStore and registers tools
Both sessionID and sub are extracted from ctx (set by OAuth middleware).
Args:
- ctx: Context for the operation (must contain sessionID and sub)
- a: The aggregator server instance
- serverInfo: The server info containing URL and auth config
- musterIssuer: The issuer URL of muster's OAuth provider (used to get the ID token)
Returns:
- *ConnectionResult: The connection result if successful
- error: The error if connection failed
func (*ConnectionResult) FormatAsAPIResult ¶ added in v0.1.50
func (r *ConnectionResult) FormatAsAPIResult() *api.CallToolResult
FormatAsAPIResult formats the connection result as an api.CallToolResult. Used by AuthToolProvider.tryConnectWithToken.
func (*ConnectionResult) FormatAsMCPResult ¶ added in v0.1.50
func (r *ConnectionResult) FormatAsMCPResult() *mcp.CallToolResult
FormatAsMCPResult formats the connection result as an mcp.CallToolResult. Used by AggregatorServer.tryConnectWithToken.
type EventHandler ¶
type EventHandler struct {
// contains filtered or unexported fields
}
EventHandler manages automatic MCP server registration based on service lifecycle events.
The event handler bridges the gap between the muster service orchestrator and the aggregator by listening for service state changes and automatically registering or deregistering MCP servers as they become healthy or unhealthy.
Key responsibilities:
- Subscribe to orchestrator service state changes
- Filter events to only process MCP service changes
- Automatically register healthy running MCP servers
- Automatically deregister unhealthy or stopped MCP servers
- Skip global registration for SSO-based servers (handled at session level)
- Maintain separation of concerns through callback functions
The handler operates asynchronously and is designed to be resilient to temporary failures in the registration process.
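The callback-driven decision logic can be sketched as follows. Everything here is illustrative: serviceEvent, handler, the "MCPServer" type string, and the action labels are hypothetical, and the callbacks omit the context.Context parameter that the real registerFunc takes.

```go
package main

import "fmt"

// serviceEvent is a hypothetical, simplified service state change.
type serviceEvent struct {
	Name    string
	Type    string // "MCPServer" is an assumed type label
	Running bool
	Healthy bool
}

// handler delegates registration to callbacks, keeping event processing
// decoupled from the aggregator.
type handler struct {
	register   func(name string) error
	deregister func(name string) error
	isSSOBased func(name string) bool // optional; may be nil
}

// handle returns the action chosen for the event and any callback error.
func (h handler) handle(ev serviceEvent) (string, error) {
	if ev.Type != "MCPServer" {
		return "ignore", nil
	}
	if ev.Running && ev.Healthy {
		if h.isSSOBased != nil && h.isSSOBased(ev.Name) {
			return "skip", nil // SSO servers are handled at session level
		}
		return "register", h.register(ev.Name)
	}
	return "deregister", h.deregister(ev.Name)
}

func main() {
	h := handler{
		register:   func(name string) error { fmt.Println("register", name); return nil },
		deregister: func(name string) error { fmt.Println("deregister", name); return nil },
		isSSOBased: func(name string) bool { return name == "remote" },
	}
	events := []serviceEvent{
		{Name: "github", Type: "MCPServer", Running: true, Healthy: true},
		{Name: "remote", Type: "MCPServer", Running: true, Healthy: true},
		{Name: "github", Type: "MCPServer", Running: false},
	}
	for _, ev := range events {
		action, err := h.handle(ev)
		fmt.Println(ev.Name, "=>", action, err)
	}
}
```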
func NewEventHandler ¶
func NewEventHandler( orchestratorAPI api.OrchestratorAPI, registerFunc func(context.Context, string) error, deregisterFunc func(string) error, isServerAuthRequired func(string) bool, isServerSSOBased func(string) bool, ) *EventHandler
NewEventHandler creates a new event handler with the specified dependencies and callbacks.
The event handler uses callback functions to maintain loose coupling with the aggregator manager. This design allows the handler to focus solely on event processing while delegating the actual registration logic to the caller.
Args:
- orchestratorAPI: Interface for subscribing to service state changes
- registerFunc: Callback function to register a server by name
- deregisterFunc: Callback function to deregister a server by name
- isServerAuthRequired: Optional callback to check if server is in auth_required state
- isServerSSOBased: Optional callback to check if server uses SSO token forwarding/exchange
Returns a configured but not yet started event handler.
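The callback-based design can be illustrated with a small sketch. It does not use the real aggregator types: `serviceEvent`, `callbacks`, `handle`, and the server names are hypothetical stand-ins, and the decision rules are a simplified reading of the responsibilities listed for EventHandler, not the actual implementation.

```go
package main

import (
	"context"
	"fmt"
)

// serviceEvent is a hypothetical stand-in for an orchestrator state change.
type serviceEvent struct {
	Name    string
	IsMCP   bool // false for non-MCP services, which are ignored
	Healthy bool // true when the service is running and healthy
}

// callbacks mirrors the callback parameters of NewEventHandler. It is not
// the real EventHandler type.
type callbacks struct {
	register     func(context.Context, string) error
	deregister   func(string) error
	authRequired func(string) bool // optional, may be nil
	ssoBased     func(string) bool // optional, may be nil
}

// handle applies the assumed decision rules to one event and returns a label
// describing the action taken.
func (c callbacks) handle(ctx context.Context, ev serviceEvent) string {
	if !ev.IsMCP {
		return "ignored"
	}
	if ev.Healthy {
		if c.ssoBased != nil && c.ssoBased(ev.Name) {
			return "skipped-sso" // SSO servers are registered per session
		}
		if err := c.register(ctx, ev.Name); err != nil {
			return "register-failed"
		}
		return "registered"
	}
	if c.authRequired != nil && c.authRequired(ev.Name) {
		return "kept-pending-auth" // pending-auth entries are not deregistered
	}
	if err := c.deregister(ev.Name); err != nil {
		return "deregister-failed"
	}
	return "deregistered"
}

// demo wires the callbacks to an in-memory set and replays a few events.
func demo() []string {
	registered := map[string]bool{}
	c := callbacks{
		register:   func(_ context.Context, n string) error { registered[n] = true; return nil },
		deregister: func(n string) error { delete(registered, n); return nil },
		ssoBased:   func(n string) bool { return n == "sso-server" },
	}
	events := []serviceEvent{
		{Name: "postgres", IsMCP: false, Healthy: true},
		{Name: "github", IsMCP: true, Healthy: true},
		{Name: "sso-server", IsMCP: true, Healthy: true},
		{Name: "github", IsMCP: true, Healthy: false},
	}
	var out []string
	for _, ev := range events {
		out = append(out, c.handle(context.Background(), ev))
	}
	return out
}

func main() {
	fmt.Println(demo())
}
```

Because the handler only sees function values, the registration logic can be swapped for a fake in tests without touching the event-processing code.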
func (*EventHandler) IsRunning ¶
func (eh *EventHandler) IsRunning() bool
IsRunning returns whether the event handler is currently active.
This method is thread-safe and can be used to check the handler's status for monitoring or debugging purposes.
Returns true if the event handler is currently processing events.
func (*EventHandler) Start ¶
func (eh *EventHandler) Start(ctx context.Context) error
Start begins listening for orchestrator events and processing them asynchronously.
This method subscribes to service state change events from the orchestrator and starts a background goroutine to process them. The method is idempotent: calling it multiple times has no additional effect.
The event processing continues until the provided context is cancelled or the Stop method is called.
Args:
- ctx: Context for controlling the event handler lifecycle
Returns nil on successful startup. The method does not wait for event processing to complete, as that happens asynchronously.
func (*EventHandler) Stop ¶
func (eh *EventHandler) Stop() error
Stop gracefully shuts down the event handler and waits for cleanup to complete.
This method cancels the event processing context and waits for the background goroutine to finish processing any in-flight events. The method is idempotent and can be called multiple times safely.
Returns nil after successful shutdown.
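One common way to obtain the idempotent Start/Stop behaviour described here is a mutex-guarded running flag plus a done channel. The sketch below is an assumption about the general pattern, not the EventHandler source; the `lifecycle` type and its `starts` counter are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// lifecycle is a hypothetical stand-in showing idempotent Start/Stop.
type lifecycle struct {
	mu      sync.Mutex
	running bool
	cancel  context.CancelFunc
	done    chan struct{}
	starts  int // how many times the worker goroutine was actually launched
}

// Start launches the worker once; later calls return nil without effect.
func (l *lifecycle) Start(ctx context.Context) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.running {
		return nil
	}
	ctx, l.cancel = context.WithCancel(ctx)
	l.done = make(chan struct{})
	l.running = true
	l.starts++
	go func(done chan struct{}) {
		defer close(done)
		<-ctx.Done() // a real worker would also consume events here
	}(l.done)
	return nil
}

// Stop cancels the worker and waits for it to exit; it is safe to call
// repeatedly or before Start.
func (l *lifecycle) Stop() error {
	l.mu.Lock()
	if !l.running {
		l.mu.Unlock()
		return nil
	}
	l.running = false
	cancel, done := l.cancel, l.done
	l.mu.Unlock()
	cancel()
	<-done
	return nil
}

// IsRunning reports the flag under the same mutex.
func (l *lifecycle) IsRunning() bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.running
}

// demoLifecycle starts and stops twice and reports what happened.
func demoLifecycle() (starts int, running bool) {
	l := &lifecycle{}
	_ = l.Start(context.Background())
	_ = l.Start(context.Background())
	_ = l.Stop()
	_ = l.Stop()
	return l.starts, l.IsRunning()
}

func main() {
	starts, running := demoLifecycle()
	fmt.Println(starts, running)
}
```

Stop releases the mutex before waiting on the done channel so that IsRunning stays responsive while the worker drains.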
type EventType ¶
type EventType int
EventType represents the type of registration event. This enumeration defines the possible state changes for server registration.
type MCPClient ¶
type MCPClient interface {
// Initialize establishes the connection and performs protocol handshake.
// This must be called before any other operations on the client.
// Returns an error if the handshake fails or connection cannot be established.
Initialize(ctx context.Context) error
// Close cleanly shuts down the client connection.
// This should be called when the client is no longer needed
// to ensure proper cleanup of resources.
Close() error
// ListTools returns all available tools from the server.
// This is used to discover what capabilities the server provides.
ListTools(ctx context.Context) ([]mcp.Tool, error)
// CallTool executes a specific tool and returns the result.
// The name arg should match one of the tools returned by ListTools.
// The args arg contains the tool-specific arguments as key-value pairs.
CallTool(ctx context.Context, name string, args map[string]interface{}) (*mcp.CallToolResult, error)
// ListResources returns all available resources from the server.
// Resources are data sources that can be read by the client.
ListResources(ctx context.Context) ([]mcp.Resource, error)
// ReadResource retrieves a specific resource by its URI.
// The URI should match one of the resources returned by ListResources.
ReadResource(ctx context.Context, uri string) (*mcp.ReadResourceResult, error)
// ListPrompts returns all available prompts from the server.
// Prompts are templates or structured text that can be retrieved and used.
ListPrompts(ctx context.Context) ([]mcp.Prompt, error)
// GetPrompt retrieves a specific prompt with optional arguments.
// The name should match one of the prompts returned by ListPrompts.
// Args can be used to customize the prompt content.
GetPrompt(ctx context.Context, name string, args map[string]interface{}) (*mcp.GetPromptResult, error)
// Ping checks if the server is responsive.
// This is used for health checking and connection validation.
Ping(ctx context.Context) error
// OnNotification registers a handler for server-pushed JSON-RPC
// notifications (e.g. notifications/tools/list_changed).
OnNotification(handler func(mcp.JSONRPCNotification))
}
MCPClient defines the interface for MCP client operations. This interface abstracts the underlying MCP client implementation and will be implemented by the client in the mcpserver package. It provides all necessary operations for interacting with MCP servers including initialization, tool execution, and resource/prompt access.
type OAuthProxyConfig ¶
type OAuthProxyConfig struct {
// Enabled controls whether OAuth proxy functionality is active.
Enabled bool
// PublicURL is the publicly accessible URL of the Muster Server.
// This is used to construct OAuth callback URLs.
PublicURL string
// ClientID is the OAuth client identifier (CIMD URL).
ClientID string
// CallbackPath is the path for the OAuth callback endpoint.
CallbackPath string
// ExtraCAFile mirrors the process-level --extra-ca-file flag for the
// OAuth/token-exchange layer's internal-deployment heuristic.
ExtraCAFile string
}
OAuthProxyConfig holds OAuth proxy configuration for the aggregator.
type OAuthServerConfig ¶
type OAuthServerConfig struct {
// Enabled controls whether OAuth server protection is active.
Enabled bool
// Config holds the full OAuth server configuration.
// This is populated from the config.OAuthServerConfig during initialization.
Config interface{}
}
OAuthServerConfig holds OAuth server configuration for protecting the Muster Server. This is a simplified configuration that references the full config from the config package.
type PendingAuthRegistration ¶ added in v0.1.180
type PendingAuthRegistration struct {
ServerRegistration
// URL is the remote server endpoint.
URL string
// AuthInfo carries the OAuth metadata returned in the 401 response.
AuthInfo *AuthInfo
// AuthConfig describes how to forward or exchange tokens, when set.
AuthConfig *api.MCPServerAuth
}
PendingAuthRegistration carries the configuration needed to register a backend MCP server that is reachable but requires OAuth authentication before its tools can be exposed. AuthConfig may be nil; an empty AuthConfig is equivalent to a nil one — both mark the server as requiring per-session authentication.
type PooledInfo ¶ added in v0.1.123
type PooledInfo struct {
ServerName string
CreatedAt time.Time
LastUsedAt time.Time
TokenExpiry time.Time // Zero if no tracked expiry.
ExchangedToken string // Empty for forward-token / unauthenticated clients.
}
PooledInfo is a read-only snapshot of a pool entry's metadata. Used by the admin UI to introspect live connections without exposing the MCP client.
type ProtectedResourceMetadata ¶
type ProtectedResourceMetadata struct {
// Issuer is the authorization server URL
Issuer string
// Scope is the space-separated list of required scopes
Scope string
// Resource is the canonical resource URI per RFC 9728 §3.2 (REQUIRED in
// the spec but omitted by some real backends — log + accept on absence).
// Used by RFC 8707 token-binding consumers downstream.
Resource string
}
ProtectedResourceMetadata contains OAuth information discovered from the /.well-known/oauth-protected-resource endpoint.
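As a rough illustration, the discovery document could be mapped onto this struct as follows. The JSON field names (`resource`, `authorization_servers`, `scopes_supported`) come from RFC 9728; the `parseMetadata` helper, the choice of the first authorization server as Issuer, and the example URLs are assumptions made for this sketch.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

// ProtectedResourceMetadata mirrors the struct documented above.
type ProtectedResourceMetadata struct {
	Issuer   string
	Scope    string
	Resource string
}

// rfc9728Doc holds the subset of RFC 9728 metadata fields used here.
type rfc9728Doc struct {
	Resource             string   `json:"resource"`
	AuthorizationServers []string `json:"authorization_servers"`
	ScopesSupported      []string `json:"scopes_supported"`
}

// parseMetadata is a hypothetical helper. It takes the first authorization
// server as the issuer and joins the scopes with spaces. A missing resource
// is accepted and left empty.
func parseMetadata(body string) (ProtectedResourceMetadata, error) {
	var doc rfc9728Doc
	if err := json.Unmarshal([]byte(body), &doc); err != nil {
		return ProtectedResourceMetadata{}, fmt.Errorf("decode metadata: %w", err)
	}
	if len(doc.AuthorizationServers) == 0 {
		return ProtectedResourceMetadata{}, errors.New("metadata lists no authorization_servers")
	}
	return ProtectedResourceMetadata{
		Issuer:   doc.AuthorizationServers[0],
		Scope:    strings.Join(doc.ScopesSupported, " "),
		Resource: doc.Resource,
	}, nil
}

func main() {
	md, err := parseMetadata(`{
		"resource": "https://mcp.example.com",
		"authorization_servers": ["https://dex.example.com"],
		"scopes_supported": ["openid", "email"]
	}`)
	fmt.Println(md, err)
}
```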
type RegistrationEvent ¶
type RegistrationEvent struct {
// Type indicates whether this is a registration or deregistration event
Type EventType
// ServerName is the unique identifier of the server involved in the event
ServerName string
// Client is the MCP client associated with the server (may be nil for deregistration)
Client MCPClient
}
RegistrationEvent represents a server registration or deregistration event. These events are used internally to coordinate between different components when servers are added or removed from the aggregator.
type ServerInfo ¶
type ServerInfo struct {
// Name is the unique identifier for this server within the aggregator
Name string
// Namespace is the Kubernetes namespace for this server.
// This is used for event emission and resource references.
// Defaults to "default" if not specified.
Namespace string
// Client is the MCP client instance used to communicate with the server
Client MCPClient
// LastUpdate tracks when the server information was last refreshed
LastUpdate time.Time
// ToolPrefix is the configured prefix for tools from this server.
// This is used for name collision resolution.
ToolPrefix string
// Family, when set, declares that this server is an instance of a
// family of equivalent servers. Tools from all servers in the same family
// are exposed as {musterPrefix}_{family.Name}_{toolName} with a required
// parameter named by family.InstanceArg selecting the instance.
Family *api.MCPServerFamily
// URL is the server endpoint URL (for remote servers)
URL string
// AuthInfo contains OAuth information if authentication is required.
// This is populated when a 401 is received during initialization.
AuthInfo *AuthInfo
// AuthConfig contains the authentication configuration for this server.
// This is used to determine token forwarding behavior for SSO.
// Immutable after registration: set once by RegisterPendingAuth
// and never modified, so RequiresSessionAuth() is safe without locking.
AuthConfig *api.MCPServerAuth
Tools []mcp.Tool // Cached list of available tools
Resources []mcp.Resource // Cached list of available resources
Prompts []mcp.Prompt // Cached list of available prompts
// contains filtered or unexported fields
}
ServerInfo contains information about a registered MCP server. This structure maintains both the connection details and cached capabilities for efficient access. It is thread-safe for concurrent access to cached data.
func (*ServerInfo) GetNamespace ¶
func (s *ServerInfo) GetNamespace() string
GetNamespace returns the namespace for this server, defaulting to metav1.NamespaceDefault when no namespace is set.
func (*ServerInfo) GetStatus ¶ added in v0.1.72
func (s *ServerInfo) GetStatus() api.ServiceState
GetStatus returns the current service state from the canonical service registry. Returns api.StateUnknown if the service is not found in the registry.
func (*ServerInfo) IsConnected ¶
func (s *ServerInfo) IsConnected() bool
IsConnected reports whether the server is in an active (running/connected) state. Falls back to checking for a live client when the service registry has no entry (e.g. servers registered directly via Register).
func (*ServerInfo) RequiresSessionAuth ¶ added in v0.1.72
func (s *ServerInfo) RequiresSessionAuth() bool
RequiresSessionAuth reports whether this server uses per-session authentication. This is a permanent property based on the server's auth configuration, set during RegisterPendingAuth and never changed by connection state transitions.
func (*ServerInfo) UpdatePrompts ¶
func (s *ServerInfo) UpdatePrompts(prompts []mcp.Prompt)
UpdatePrompts safely updates the server's cached prompt list. This method is thread-safe and should be used whenever the server's available prompts change.
func (*ServerInfo) UpdateResources ¶
func (s *ServerInfo) UpdateResources(resources []mcp.Resource)
UpdateResources safely updates the server's cached resource list. This method is thread-safe and should be used whenever the server's available resources change.
func (*ServerInfo) UpdateTools ¶
func (s *ServerInfo) UpdateTools(tools []mcp.Tool)
UpdateTools safely updates the server's cached tool list. This method is thread-safe and should be used whenever the server's available tools change.
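The thread-safety of the Update methods can be pictured with a read-write mutex around the cached slice. This is a minimal sketch under that assumption; `serverCache`, `tool`, and `ToolNames` are hypothetical and the real ServerInfo may guard its fields differently.

```go
package main

import (
	"fmt"
	"sync"
)

// tool is a stand-in for mcp.Tool.
type tool struct{ Name string }

// serverCache is a hypothetical, reduced version of ServerInfo.
type serverCache struct {
	mu    sync.RWMutex
	tools []tool
}

// UpdateTools replaces the cached list under the write lock. Copying the
// input keeps later caller-side mutations out of the cache.
func (s *serverCache) UpdateTools(tools []tool) {
	cp := append([]tool(nil), tools...)
	s.mu.Lock()
	s.tools = cp
	s.mu.Unlock()
}

// ToolNames reads the cache under the read lock.
func (s *serverCache) ToolNames() []string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	names := make([]string, 0, len(s.tools))
	for _, t := range s.tools {
		names = append(names, t.Name)
	}
	return names
}

func main() {
	var s serverCache
	in := []tool{{Name: "list_issues"}, {Name: "create_issue"}}
	s.UpdateTools(in)
	in[0].Name = "mutated" // does not affect the cached copy
	fmt.Println(s.ToolNames())
}
```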
type ServerRegistration ¶ added in v0.1.180
type ServerRegistration struct {
// Name is the unique identifier for the server within the aggregator.
Name string
// ToolPrefix is the per-server tool prefix used when Family is nil.
// Pattern: {musterPrefix}_{toolPrefix-or-name}_{toolName}.
ToolPrefix string
// Family declares that this server is an instance of a family of
// equivalent servers. When set, tools are exposed as
// {musterPrefix}_{family.Name}_{toolName} with a required parameter
// named by family.InstanceArg.
Family *api.MCPServerFamily
}
ServerRegistration carries the configuration needed to register a backend MCP server with the aggregator. Use this struct instead of passing fields positionally to avoid silent swaps between same-typed identifiers (toolPrefix, family, name) that the type system would otherwise tolerate.
type ServerRegistry ¶
type ServerRegistry struct {
// contains filtered or unexported fields
}
ServerRegistry manages the collection of registered MCP servers and their capabilities.
The registry maintains a thread-safe mapping of server names to their information, including cached capabilities (tools, resources, prompts) and connection status. It applies deterministic prefixing ({musterPrefix}_{serverPrefix}_{originalName}) and maintains a reverse lookup map for routing requests to the correct backend.
Servers that declare spec.family share their exposed name space: every server in a family advertises tools as {musterPrefix}_{family}_{toolName} and the aggregator injects a required "server" enum parameter so callers select which instance handles the call. The "server" parameter is always required when family is set, even for single-instance families, so skills written against the family name remain stable when instances are added or removed.
Key responsibilities:
- Server lifecycle management (registration/deregistration)
- Capability caching for performance
- Deterministic name prefixing and reverse resolution
- Family-based grouping with explicit instance routing
- Thread-safe access to server information
- Update notifications for capability changes
func NewServerRegistry ¶
func NewServerRegistry(musterPrefix string) *ServerRegistry
NewServerRegistry creates a new server registry with the specified global prefix.
The registry uses the musterPrefix to ensure all exposed capabilities are prefixed appropriately to distinguish them from other MCP tools in the environment.
Args:
- musterPrefix: Global prefix applied to all aggregated capabilities (default: "x")
Returns a new, empty server registry ready for use.
func (*ServerRegistry) Deregister ¶
func (r *ServerRegistry) Deregister(name string) error
Deregister removes an MCP server from the registry and cleans up its resources.
This method safely removes a server from the registry, closes its client connection, and notifies subscribers of the change. All tools, resources, and prompts provided by the server will no longer be available through the aggregator.
The method is thread-safe and can be called concurrently.
Args:
- name: Unique identifier of the server to remove
Returns an error if the server is not found in the registry.
func (*ServerRegistry) ExposedPromptName ¶ added in v0.1.69
func (r *ServerRegistry) ExposedPromptName(serverName, promptName string) string
ExposedPromptName returns the fully prefixed name for a prompt and records the reverse mapping for later resolution.
func (*ServerRegistry) ExposedResourceURI ¶ added in v0.1.69
func (r *ServerRegistry) ExposedResourceURI(serverName, resourceURI string) string
ExposedResourceURI returns the fully prefixed URI for a resource and records the reverse mapping for later resolution. URIs with a scheme (e.g. http://) are returned unchanged.
func (*ServerRegistry) ExposedToolName ¶ added in v0.1.69
func (r *ServerRegistry) ExposedToolName(serverName, toolName string) string
ExposedToolName returns the fully prefixed name for a tool and records the reverse mapping for later resolution.
Pattern: {musterPrefix}_{serverPrefix}_{originalName}
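The pattern and the reverse mapping can be sketched as below. The `nameTracker` type, its fields, and the example server and tool names are hypothetical; only the naming pattern and the "empty prefix falls back to the server name" rule come from this documentation.

```go
package main

import "fmt"

// origin records where an exposed name came from.
type origin struct {
	Server   string
	Original string
}

// nameTracker is a hypothetical, reduced stand-in for the registry's
// prefixing logic.
type nameTracker struct {
	musterPrefix string
	prefixes     map[string]string // server name -> configured prefix
	reverse      map[string]origin // exposed name -> origin
}

func newNameTracker(musterPrefix string) *nameTracker {
	return &nameTracker{
		musterPrefix: musterPrefix,
		prefixes:     map[string]string{},
		reverse:      map[string]origin{},
	}
}

// SetServerPrefix stores the prefix; an empty prefix means "use the name".
func (n *nameTracker) SetServerPrefix(server, prefix string) {
	n.prefixes[server] = prefix
}

// ExposedToolName builds {musterPrefix}_{serverPrefix}_{originalName} and
// records the reverse mapping.
func (n *nameTracker) ExposedToolName(server, toolName string) string {
	prefix := n.prefixes[server]
	if prefix == "" {
		prefix = server
	}
	exposed := n.musterPrefix + "_" + prefix + "_" + toolName
	n.reverse[exposed] = origin{Server: server, Original: toolName}
	return exposed
}

// Resolve looks up the origin of an exposed name.
func (n *nameTracker) Resolve(exposed string) (origin, bool) {
	o, ok := n.reverse[exposed]
	return o, ok
}

func main() {
	n := newNameTracker("x")
	fmt.Println(n.ExposedToolName("github", "list_issues"))
	n.SetServerPrefix("github", "gh")
	fmt.Println(n.ExposedToolName("github", "list_issues"))
	fmt.Println(n.Resolve("x_gh_list_issues"))
}
```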
func (*ServerRegistry) FamilyInstanceArgFor ¶ added in v0.1.180
func (r *ServerRegistry) FamilyInstanceArgFor(exposedName string) string
FamilyInstanceArgFor returns the required instance-selector arg name for a family-grouped exposed tool, or empty string if the name is not family-grouped or unknown.
func (*ServerRegistry) GetAllPrompts ¶
func (r *ServerRegistry) GetAllPrompts() []mcp.Prompt
GetAllPrompts returns a consolidated list of all prompts from all connected servers.
This method aggregates prompts from all registered and connected servers, applying the registry's deterministic prefixing to avoid name conflicts. Only servers that are currently connected contribute their prompts to the result.
Returns a slice of MCP prompts ready for exposure through the aggregator.
func (*ServerRegistry) GetAllPromptsForSession ¶
func (r *ServerRegistry) GetAllPromptsForSession(ctx context.Context, store oauthstore.CapabilityStore, sessionID string) []mcp.Prompt
GetAllPromptsForSession returns the prompts visible to a specific login session.
For OAuth servers, prompts are read from the CapabilityStore. For non-OAuth servers, prompts are read from ServerInfo.Prompts.
func (*ServerRegistry) GetAllResources ¶
func (r *ServerRegistry) GetAllResources() []mcp.Resource
GetAllResources returns a consolidated list of all resources from all connected servers.
This method aggregates resources from all registered and connected servers, applying the registry's deterministic prefixing to resource URIs to avoid conflicts. Only servers that are currently connected contribute their resources to the result.
Returns a slice of MCP resources ready for exposure through the aggregator.
func (*ServerRegistry) GetAllResourcesForSession ¶
func (r *ServerRegistry) GetAllResourcesForSession(ctx context.Context, store oauthstore.CapabilityStore, sessionID string) []mcp.Resource
GetAllResourcesForSession returns the resources visible to a specific login session.
For OAuth servers, resources are read from the CapabilityStore. For non-OAuth servers, resources are read from ServerInfo.Resources.
func (*ServerRegistry) GetAllServers ¶
func (r *ServerRegistry) GetAllServers() map[string]*ServerInfo
GetAllServers returns a copy of all registered server information.
This method provides a snapshot of all servers currently registered with the registry, including both connected and disconnected servers. The returned map is a copy to prevent external modifications to the internal state.
Returns a map of server names to their corresponding ServerInfo structures.
func (*ServerRegistry) GetAllTools ¶
func (r *ServerRegistry) GetAllTools() []mcp.Tool
GetAllTools returns a consolidated list of all tools from all connected servers.
Tools from servers that share a non-empty spec.family are grouped under a single exposed name ({musterPrefix}_{family}_{toolName}) with a required "server" enum parameter selecting the providing instance. Tools from servers without a family fall back to per-server prefixing ({musterPrefix}_{serverPrefix}_{originalName}).
Per ADR-008, servers in auth_required state do NOT contribute any tools. Users must use core_auth_login to authenticate before server tools become visible.
Returns a slice of MCP tools ready for exposure through the aggregator.
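The family grouping can be sketched with plain strings. The `server`, `exposedTool`, and `groupTools` names are hypothetical, and the sketch omits connection state, auth_required filtering, and schema injection; it only shows how instances of one family collapse into a single exposed name with a sorted instance list.

```go
package main

import (
	"fmt"
	"sort"
)

// server is a reduced, hypothetical view of a registered server.
type server struct {
	Name   string
	Family string // empty when the server declares no family
	Tools  []string
}

// exposedTool is one entry in the aggregated tool list.
type exposedTool struct {
	Name      string
	Instances []string // values for the instance selector; nil for solo tools
}

// groupTools applies per-server prefixing to solo servers and family
// prefixing to servers that share a family.
func groupTools(musterPrefix string, servers []server) []exposedTool {
	byName := map[string]*exposedTool{}
	for _, s := range servers {
		for _, t := range s.Tools {
			if s.Family == "" {
				name := musterPrefix + "_" + s.Name + "_" + t
				byName[name] = &exposedTool{Name: name}
				continue
			}
			name := musterPrefix + "_" + s.Family + "_" + t
			et, ok := byName[name]
			if !ok {
				et = &exposedTool{Name: name}
				byName[name] = et
			}
			et.Instances = append(et.Instances, s.Name)
		}
	}
	out := make([]exposedTool, 0, len(byName))
	for _, et := range byName {
		sort.Strings(et.Instances)
		out = append(out, *et)
	}
	// Sort by exposed name so the result is deterministic.
	sort.Slice(out, func(i, j int) bool { return out[i].Name < out[j].Name })
	return out
}

func main() {
	tools := groupTools("x", []server{
		{Name: "staging", Family: "k8s", Tools: []string{"get_pods"}},
		{Name: "prod", Family: "k8s", Tools: []string{"get_pods"}},
		{Name: "github", Tools: []string{"list_issues"}},
	})
	for _, t := range tools {
		fmt.Println(t.Name, t.Instances)
	}
}
```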
func (*ServerRegistry) GetAllToolsForSession ¶
func (r *ServerRegistry) GetAllToolsForSession(ctx context.Context, store oauthstore.CapabilityStore, sessionID string) []mcp.Tool
GetAllToolsForSession returns the tools visible to a specific login session.
For OAuth servers (RequiresSessionAuth), tools are read from the CapabilityStore keyed by session ID (token family). For non-OAuth servers, tools are read from ServerInfo.Tools (same as GetAllTools). Family grouping is applied to the resulting union so a user who is authenticated against multiple instances of the same family sees a single deduplicated tool with the "server" enum.
func (*ServerRegistry) GetClient ¶
func (r *ServerRegistry) GetClient(name string) (MCPClient, error)
GetClient returns the MCP client for a specific registered server.
This method provides access to the underlying MCP client for direct communication with a specific server. The client can be used to execute tools, read resources, or retrieve prompts from the server.
Args:
- name: Unique identifier of the server
Returns the MCP client interface and nil error if successful. Returns nil client and an error if the server is not found or not connected.
func (*ServerRegistry) GetOAuthServers ¶
func (r *ServerRegistry) GetOAuthServers() []*ServerInfo
GetOAuthServers returns a list of servers that require OAuth authentication.
func (*ServerRegistry) GetServerInfo ¶
func (r *ServerRegistry) GetServerInfo(name string) (*ServerInfo, bool)
GetServerInfo returns detailed information about a specific registered server.
This method provides access to the complete ServerInfo structure for a given server, including its client, cached capabilities, and connection status.
Args:
- name: Unique identifier of the server
Returns the ServerInfo pointer and true if the server exists. Returns nil and false if the server is not found.
func (*ServerRegistry) GetToolServerNames ¶ added in v0.1.180
func (r *ServerRegistry) GetToolServerNames(exposedName string) []string
GetToolServerNames returns the sorted set of server names that provide the given exposed tool name. For family-grouped tools the slice has one entry per providing instance; for solo tools it has a single entry. Returns nil for prompts, resources, and unknown or unmapped names.
func (*ServerRegistry) GetUpdateChannel ¶
func (r *ServerRegistry) GetUpdateChannel() <-chan struct{}
GetUpdateChannel returns a read-only channel that receives notifications when the registry is updated.
Subscribers can use this channel to react to server registrations, deregistrations, or capability changes. The channel is buffered with a capacity of 1 to prevent blocking the registry operations.
Returns a receive-only channel for registry update notifications.
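A capacity-1 channel combined with a non-blocking send is a common way to get this "at most one pending notification" behaviour. The sketch below assumes that pattern; `notifier`, `notify`, and `drain` are hypothetical names, not registry internals.

```go
package main

import "fmt"

// notifier is a hypothetical stand-in for the registry's update signalling.
type notifier struct {
	ch chan struct{}
}

func newNotifier() *notifier {
	return &notifier{ch: make(chan struct{}, 1)}
}

// notify never blocks: if a notification is already pending, the new one is
// coalesced with it.
func (n *notifier) notify() {
	select {
	case n.ch <- struct{}{}:
	default:
	}
}

// updates exposes the channel as receive-only, like GetUpdateChannel.
func (n *notifier) updates() <-chan struct{} {
	return n.ch
}

// drain counts how many notifications are currently pending.
func drain(ch <-chan struct{}) int {
	count := 0
	for {
		select {
		case <-ch:
			count++
		default:
			return count
		}
	}
}

func main() {
	n := newNotifier()
	n.notify()
	n.notify()
	n.notify()
	fmt.Println(drain(n.updates())) // three updates collapse into one signal
}
```

A subscriber should treat a received value as "something changed" and re-read the registry, since several updates may be folded into one signal.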
func (*ServerRegistry) IsFamilyTool ¶ added in v0.1.180
func (r *ServerRegistry) IsFamilyTool(exposedName string) bool
IsFamilyTool reports whether the given exposed name is family-grouped (i.e. provided by one or more servers sharing a spec.family). Returns false for solo tools, core tools, and unknown names.
func (*ServerRegistry) IsOAuthServer ¶
func (r *ServerRegistry) IsOAuthServer(serverName string) bool
IsOAuthServer checks if a server requires OAuth authentication.
func (*ServerRegistry) Register ¶
func (r *ServerRegistry) Register(ctx context.Context, registration ServerRegistration, client MCPClient) error
Register adds a new MCP server to the registry and initializes its capabilities.
This method performs the following operations:
- Validates that the server name is not already in use
- Initializes the MCP client if needed
- Queries the server for its initial capabilities
- Stores the server information and updates the name tracker
- Notifies subscribers of the registry update
The method is thread-safe and can be called concurrently.
Args:
- ctx: Context for initialization and capability queries
- registration: Server identification (name, toolPrefix, family)
- client: MCP client instance for communicating with the server
Returns an error if the server name is already registered, client initialization fails, or the server cannot be reached.
func (*ServerRegistry) RegisterPendingAuth ¶
func (r *ServerRegistry) RegisterPendingAuth(registration PendingAuthRegistration) error
RegisterPendingAuth registers a server that is reachable but requires authentication before its tools can be exposed. Per ADR-008, no synthetic authentication tools are created — users authenticate via core_auth_login.
AuthConfig in the registration may be nil or empty; in either case the server is flagged as requiring per-session authentication.
Registering over an existing entry (pending-auth or active) is an upsert: the entry's URL, ToolPrefix, Family, AuthInfo, and AuthConfig are replaced with the new registration values. This covers two cases:
- Restart after config update: the hook re-fires with the updated definition while the old pending entry is still present (event_handler skips deregistration for auth-required servers); the upsert propagates the new URL/issuer/prefix.
- Active→auth-required transition: a previously-connected server restarts into a 401 before the Disconnected event deregisters the old active entry; replacing it now means the event handler will skip deregistration (RequiresSessionAuth will be true), keeping the pending-auth entry intact.
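The upsert semantics can be sketched with a map keyed by server name. The `registry`, `entry`, and `pendingReg` types are reduced, hypothetical stand-ins, the name check is an assumption of this sketch, and the URLs are examples only.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// pendingReg is a reduced stand-in for PendingAuthRegistration.
type pendingReg struct {
	Name       string
	URL        string
	ToolPrefix string
}

// entry is a reduced stand-in for ServerInfo.
type entry struct {
	URL                 string
	ToolPrefix          string
	RequiresSessionAuth bool
}

// registry is a hypothetical, reduced server registry.
type registry struct {
	mu      sync.Mutex
	servers map[string]*entry
}

func newRegistry() *registry {
	return &registry{servers: map[string]*entry{}}
}

// RegisterPendingAuth replaces any existing entry with the new values
// instead of failing on a duplicate name.
func (r *registry) RegisterPendingAuth(reg pendingReg) error {
	if reg.Name == "" {
		return errors.New("server name is required") // assumed validation
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.servers[reg.Name] = &entry{
		URL:                 reg.URL,
		ToolPrefix:          reg.ToolPrefix,
		RequiresSessionAuth: true,
	}
	return nil
}

// lookup returns a copy of the entry and the current registry size.
func (r *registry) lookup(name string) (entry, int, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	e, ok := r.servers[name]
	if !ok {
		return entry{}, len(r.servers), false
	}
	return *e, len(r.servers), true
}

func main() {
	r := newRegistry()
	_ = r.RegisterPendingAuth(pendingReg{Name: "github", URL: "https://old.example.com/mcp"})
	_ = r.RegisterPendingAuth(pendingReg{Name: "github", URL: "https://new.example.com/mcp", ToolPrefix: "gh"})
	fmt.Println(r.lookup("github"))
}
```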
func (*ServerRegistry) ResolvePromptName ¶
func (r *ServerRegistry) ResolvePromptName(exposedName string) (serverName, originalName string, err error)
ResolvePromptName resolves an exposed (prefixed) prompt name back to its source server and original name.
This method is used when a prompt request is received to determine which server should handle the request and what the original prompt name was before prefixing.
Args:
- exposedName: The prefixed prompt name as seen by clients
Returns the server name, original prompt name, and nil error if resolution succeeds. Returns empty strings and an error if the name cannot be resolved or refers to a different item type.
func (*ServerRegistry) ResolveResourceName ¶
func (r *ServerRegistry) ResolveResourceName(exposedURI string) (serverName, originalURI string, err error)
ResolveResourceName resolves an exposed (prefixed) resource URI back to its source server and original URI.
This method is used when a resource read request is received to determine which server should handle the request and what the original resource URI was before prefixing.
Args:
- exposedURI: The prefixed resource URI as seen by clients
Returns the server name, original resource URI, and nil error if resolution succeeds. Returns empty strings and an error if the URI cannot be resolved or refers to a different item type.
func (*ServerRegistry) ResolveToolName ¶
func (r *ServerRegistry) ResolveToolName(exposedName string) (serverName, originalName string, err error)
ResolveToolName resolves an exposed (prefixed) tool name back to its source server and original name.
This method is used when a tool call is received to determine which server should handle the request and what the original tool name was before prefixing. For family-grouped tools where multiple servers provide the same exposed name, callers must specify which server via ResolveToolNameForServer instead; this method returns an error noting that the "server" parameter is required.
Args:
- exposedName: The prefixed tool name as seen by clients
Returns the server name, original tool name, and nil error if resolution succeeds. Returns empty strings and an error if the name cannot be resolved, is ambiguous (family-grouped with multiple providers), or refers to a different item type.
func (*ServerRegistry) ResolveToolNameForServer ¶ added in v0.1.180
func (r *ServerRegistry) ResolveToolNameForServer(exposedName, serverName string) (originalName string, err error)
ResolveToolNameForServer resolves a family-grouped or per-server exposed tool name when the caller already specified which backend server to route to (via the injected "server" arg). The returned originalName is the per-server tool name to forward to the backend.
Returns an error if the exposed name is unknown OR if the requested serverName is not among the providers (for family tools) or not the owning server (for solo tools). Crucially, ambiguous family tools resolve here when given an explicit server, instead of falling back to legacy mappings.
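The difference between the two resolution paths might look like this in miniature. The `resolver` type and its provider map are hypothetical; the real registry also distinguishes tools from prompts and resources, which this sketch leaves out.

```go
package main

import "fmt"

// resolver is a hypothetical, reduced reverse-lookup table.
type resolver struct {
	// providers maps exposed name -> server name -> original tool name.
	providers map[string]map[string]string
}

// ResolveToolName succeeds only when exactly one server provides the name.
func (r *resolver) ResolveToolName(exposed string) (server, original string, err error) {
	p, ok := r.providers[exposed]
	if !ok {
		return "", "", fmt.Errorf("unknown tool %q", exposed)
	}
	if len(p) > 1 {
		return "", "", fmt.Errorf("tool %q has %d providers; the \"server\" parameter is required", exposed, len(p))
	}
	for s, o := range p {
		return s, o, nil
	}
	return "", "", fmt.Errorf("tool %q has no providers", exposed)
}

// ResolveToolNameForServer resolves against an explicitly chosen server.
func (r *resolver) ResolveToolNameForServer(exposed, server string) (string, error) {
	p, ok := r.providers[exposed]
	if !ok {
		return "", fmt.Errorf("unknown tool %q", exposed)
	}
	original, ok := p[server]
	if !ok {
		return "", fmt.Errorf("server %q does not provide tool %q", server, exposed)
	}
	return original, nil
}

// demoResolver builds a table with one family tool and one solo tool.
func demoResolver() *resolver {
	return &resolver{providers: map[string]map[string]string{
		"x_k8s_get_pods":       {"prod": "get_pods", "staging": "get_pods"},
		"x_github_list_issues": {"github": "list_issues"},
	}}
}

func main() {
	r := demoResolver()
	_, _, err := r.ResolveToolName("x_k8s_get_pods")
	fmt.Println("ambiguous:", err)
	original, err := r.ResolveToolNameForServer("x_k8s_get_pods", "staging")
	fmt.Println(original, err)
}
```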
func (*ServerRegistry) SetServerPrefix ¶ added in v0.1.69
func (r *ServerRegistry) SetServerPrefix(serverName, prefix string)
SetServerPrefix configures the prefix to use for a specific server. If prefix is empty the server name itself is used.
type SessionConnectionPool ¶ added in v0.1.60
type SessionConnectionPool struct {
// contains filtered or unexported fields
}
SessionConnectionPool maintains a per-(session, server) pool of live MCP clients. It is orthogonal to the CapabilityStore: the store caches what tools exist; the pool caches the live connection used to call them.
For token-forwarding and DynamicAuth clients, token rotation is handled transparently by the headerFunc / MusterTokenStore pattern.
For token-exchange clients, the pool tracks the exchanged token's expiry time. Callers can check IsTokenExpiringSoon to proactively evict and re-exchange before the downstream server returns 401.
A background reaper goroutine periodically evicts entries that have been idle longer than maxAge. Call Stop to shut down the reaper before DrainAll.
All methods are safe for concurrent use.
func NewSessionConnectionPool ¶ added in v0.1.60
func NewSessionConnectionPool(maxAge time.Duration) *SessionConnectionPool
NewSessionConnectionPool creates an empty connection pool with a background reaper that evicts entries idle longer than maxAge. The reaper runs every maxAge/2, a trade-off between timely cleanup and low overhead.
Callers must call Stop followed by DrainAll during shutdown.
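A reaper of this kind is often built from a ticker and a stop channel. The following sketch assumes that design; the `pool` and `entry` types, the string keys, and `reapOnce` are hypothetical, and a real pool would also close each evicted client. Note that time.NewTicker panics on a non-positive interval, so the sketch assumes maxAge is at least two nanoseconds.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// entry tracks only the idle timer in this sketch.
type entry struct{ lastUsed time.Time }

// pool is a hypothetical, reduced connection pool.
type pool struct {
	mu       sync.Mutex
	entries  map[string]*entry
	maxAge   time.Duration
	stop     chan struct{}
	stopOnce sync.Once
}

func newPool(maxAge time.Duration) *pool {
	p := &pool{entries: map[string]*entry{}, maxAge: maxAge, stop: make(chan struct{})}
	go p.reapLoop()
	return p
}

// reapLoop runs reapOnce every maxAge/2 until Stop is called.
func (p *pool) reapLoop() {
	ticker := time.NewTicker(p.maxAge / 2)
	defer ticker.Stop()
	for {
		select {
		case now := <-ticker.C:
			p.reapOnce(now)
		case <-p.stop:
			return
		}
	}
}

// reapOnce evicts entries idle longer than maxAge and returns the count.
func (p *pool) reapOnce(now time.Time) int {
	p.mu.Lock()
	defer p.mu.Unlock()
	evicted := 0
	for key, e := range p.entries {
		if now.Sub(e.lastUsed) > p.maxAge {
			delete(p.entries, key)
			evicted++
		}
	}
	return evicted
}

// Stop ends the reaper; sync.Once makes repeated calls safe.
func (p *pool) Stop() {
	p.stopOnce.Do(func() { close(p.stop) })
}

// demoReap seeds one stale and one fresh entry and reaps once by hand.
func demoReap() (evicted, remaining int) {
	p := newPool(time.Hour)
	now := time.Now()
	p.mu.Lock()
	p.entries["session-1/old"] = &entry{lastUsed: now.Add(-2 * time.Hour)}
	p.entries["session-1/fresh"] = &entry{lastUsed: now}
	p.mu.Unlock()
	evicted = p.reapOnce(now)
	p.mu.Lock()
	remaining = len(p.entries)
	p.mu.Unlock()
	p.Stop()
	p.Stop() // second call is a no-op
	return evicted, remaining
}

func main() {
	fmt.Println(demoReap())
}
```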
func (*SessionConnectionPool) DrainAll ¶ added in v0.1.60
func (p *SessionConnectionPool) DrainAll()
DrainAll closes and removes every entry in the pool. Intended for use during graceful shutdown.
func (*SessionConnectionPool) Evict ¶ added in v0.1.60
func (p *SessionConnectionPool) Evict(sessionID, serverName string)
Evict removes and closes a single pooled entry.
func (*SessionConnectionPool) EvictServer ¶ added in v0.1.60
func (p *SessionConnectionPool) EvictServer(serverName string)
EvictServer removes and closes all pooled entries for the given server across every session.
func (*SessionConnectionPool) EvictSession ¶ added in v0.1.60
func (p *SessionConnectionPool) EvictSession(sessionID string)
EvictSession removes and closes all pooled entries for the given session.
func (*SessionConnectionPool) Get ¶ added in v0.1.60
func (p *SessionConnectionPool) Get(sessionID, serverName string) (MCPClient, bool)
Get returns the pooled client for the given session and server, or false if no entry exists. Each successful Get resets the entry's idle timer so the reaper won't evict actively used connections.
func (*SessionConnectionPool) IsTokenExpired ¶ added in v0.1.60
func (p *SessionConnectionPool) IsTokenExpired(sessionID, serverName string) bool
IsTokenExpired returns true if the pooled entry's token has already expired. This is equivalent to IsTokenExpiringSoon with a zero margin.
func (*SessionConnectionPool) IsTokenExpiringSoon ¶ added in v0.1.60
func (p *SessionConnectionPool) IsTokenExpiringSoon(sessionID, serverName string, margin time.Duration) bool
IsTokenExpiringSoon returns true if the pooled entry's token will expire within the given margin. Returns false if there is no pool entry, the entry has no tracked expiry (zero time), or the token has enough remaining lifetime.
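The expiry rules reduce to a small comparison. In this sketch the functions take `now` as a parameter so the result is deterministic; that parameter, the lower-case function names, and the example durations are assumptions and differ from the pool's method signatures.

```go
package main

import (
	"fmt"
	"time"
)

// isTokenExpiringSoon reports whether expiry falls at or before now+margin.
// A zero expiry means no expiry is tracked, so the answer is false.
func isTokenExpiringSoon(expiry, now time.Time, margin time.Duration) bool {
	if expiry.IsZero() {
		return false
	}
	return !expiry.After(now.Add(margin))
}

// isTokenExpired is the zero-margin special case.
func isTokenExpired(expiry, now time.Time) bool {
	return isTokenExpiringSoon(expiry, now, 0)
}

// demo evaluates four representative cases against a fixed clock.
func demo() [4]bool {
	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	return [4]bool{
		isTokenExpiringSoon(time.Time{}, now, time.Minute),             // no tracked expiry
		isTokenExpiringSoon(now.Add(10*time.Minute), now, time.Minute), // plenty of lifetime left
		isTokenExpiringSoon(now.Add(30*time.Second), now, time.Minute), // inside the margin
		isTokenExpired(now.Add(-time.Second), now),                     // already expired
	}
}

func main() {
	fmt.Println(demo())
}
```

A caller could use a check like this before a tool call to evict the pooled entry and re-exchange the token, rather than waiting for the downstream server to answer 401.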
func (*SessionConnectionPool) Len ¶ added in v0.1.60
func (p *SessionConnectionPool) Len() int
Len returns the current number of pooled connections (useful for testing).
func (*SessionConnectionPool) Put ¶ added in v0.1.60
func (p *SessionConnectionPool) Put(sessionID, serverName string, client MCPClient)
Put stores a client in the pool, closing any previously pooled client for the same (session, server) key. No token expiry is tracked; use PutWithExpiry for token-exchange clients that need proactive refresh.
func (*SessionConnectionPool) PutWithDeferredClose ¶ added in v0.1.60
func (p *SessionConnectionPool) PutWithDeferredClose(sessionID, serverName string, client MCPClient, tokenExpiry time.Time, closeDelay time.Duration)
PutWithDeferredClose stores a new client in the pool and schedules the replaced client (if any) for deferred close after closeDelay.
This is used by background token refresh: the old client may still be serving an in-flight request, so we cannot close it immediately. The delay gives in-flight requests time to complete before the old client's underlying connection is torn down.
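One way to implement a deferred close is time.AfterFunc, which runs the close in its own goroutine after the delay. This sketch assumes that approach; `pool`, `fakeClient`, the string key, and the 100 ms delay are hypothetical, and token expiry tracking is left out.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// closer is the only part of MCPClient this sketch needs.
type closer interface{ Close() error }

// fakeClient signals Close through a channel so the demo can observe it.
type fakeClient struct {
	once   sync.Once
	closed chan struct{}
}

func newFakeClient() *fakeClient {
	return &fakeClient{closed: make(chan struct{})}
}

func (f *fakeClient) Close() error {
	f.once.Do(func() { close(f.closed) })
	return nil
}

func (f *fakeClient) isClosed() bool {
	select {
	case <-f.closed:
		return true
	default:
		return false
	}
}

// pool is a hypothetical, reduced connection pool keyed by a single string.
type pool struct {
	mu      sync.Mutex
	clients map[string]closer
}

// putWithDeferredClose swaps in the new client immediately and closes the
// replaced one only after closeDelay has elapsed.
func (p *pool) putWithDeferredClose(key string, c closer, closeDelay time.Duration) {
	p.mu.Lock()
	old := p.clients[key]
	p.clients[key] = c
	p.mu.Unlock()
	if old != nil {
		time.AfterFunc(closeDelay, func() { _ = old.Close() })
	}
}

// demo replaces a client and reports whether the old one was closed right
// after the swap and whether it was closed once the delay had passed.
func demo() (closedRightAway, closedLater bool) {
	p := &pool{clients: map[string]closer{}}
	old, fresh := newFakeClient(), newFakeClient()
	p.putWithDeferredClose("session-1/github", old, 0)
	p.putWithDeferredClose("session-1/github", fresh, 100*time.Millisecond)
	closedRightAway = old.isClosed()
	select {
	case <-old.closed:
		closedLater = true
	case <-time.After(2 * time.Second):
	}
	return closedRightAway, closedLater
}

func main() {
	fmt.Println(demo())
}
```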
func (*SessionConnectionPool) PutWithExpiry ¶ added in v0.1.60
func (p *SessionConnectionPool) PutWithExpiry(sessionID, serverName string, client MCPClient, tokenExpiry time.Time)
PutWithExpiry stores a client in the pool with an associated token expiry time. When tokenExpiry is non-zero, IsTokenExpiringSoon can be used to proactively evict the entry before the token expires.
func (*SessionConnectionPool) SetExchangedToken ¶ added in v0.1.123
func (p *SessionConnectionPool) SetExchangedToken(sessionID, serverName, token string)
SetExchangedToken records the RFC 8693 exchanged bearer on an existing pool entry so the admin UI can surface it. Exchange results are otherwise held only in the client's headerFunc closure, not the oauth token store. No-op if no pool entry exists for the pair.
func (*SessionConnectionPool) SetNotificationCallback ¶ added in v0.1.62
func (p *SessionConnectionPool) SetNotificationCallback(serverName string, cb func(sessionID string, client MCPClient))
SetNotificationCallback registers a callback that is invoked whenever a new client is stored for the given server (via Put, PutWithExpiry, or PutWithDeferredClose). The callback receives the sessionID that owns the client, enabling session-aware notification handling for SSO servers.
func (*SessionConnectionPool) Snapshot ¶ added in v0.1.123
func (p *SessionConnectionPool) Snapshot(sessionID string) []PooledInfo
Snapshot returns a copy of the metadata for every pool entry belonging to the given session. The underlying MCP clients are not exposed; callers see only the PooledInfo fields: server name, timestamps, token expiry, and any recorded exchanged token.
func (*SessionConnectionPool) Stop ¶ added in v0.1.60
func (p *SessionConnectionPool) Stop()
Stop shuts down the background reaper goroutine. It is safe to call multiple times but must be called before DrainAll during shutdown.
type ToolWithStatus ¶
type ToolWithStatus struct {
// Tool contains the MCP tool definition
Tool mcp.Tool
// Blocked indicates whether this tool is blocked by the security denylist.
// Blocked tools cannot be executed unless the Yolo flag is enabled.
Blocked bool
}
ToolWithStatus represents a tool along with its security blocking status. This is used to provide visibility into which tools are available and which are blocked by the security denylist.
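How the Blocked flag and the Yolo flag interact can be sketched as follows. The `annotate` and `canExecute` helpers, the map-based denylist, and the tool names are hypothetical; only the rule that blocked tools run solely when Yolo is enabled is taken from the field comment above.

```go
package main

import "fmt"

// tool is a stand-in for mcp.Tool.
type tool struct{ Name string }

// toolWithStatus mirrors ToolWithStatus with the stand-in tool type.
type toolWithStatus struct {
	Tool    tool
	Blocked bool
}

// annotate marks each tool that appears on the denylist.
func annotate(tools []tool, denylist map[string]bool) []toolWithStatus {
	out := make([]toolWithStatus, 0, len(tools))
	for _, t := range tools {
		out = append(out, toolWithStatus{Tool: t, Blocked: denylist[t.Name]})
	}
	return out
}

// canExecute allows unblocked tools always and blocked tools only with yolo.
func canExecute(t toolWithStatus, yolo bool) bool {
	return yolo || !t.Blocked
}

func main() {
	denylist := map[string]bool{"x_k8s_delete_pod": true}
	tools := annotate([]tool{{Name: "x_k8s_get_pods"}, {Name: "x_k8s_delete_pod"}}, denylist)
	for _, t := range tools {
		fmt.Println(t.Tool.Name, t.Blocked, canExecute(t, false), canExecute(t, true))
	}
}
```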
Source Files ¶
- admin_adapter.go
- auth_metrics.go
- auth_rate_limiter.go
- auth_resource.go
- auth_tools.go
- connection_helper.go
- constants.go
- denylist.go
- doc.go
- event_follow.go
- event_handler.go
- logging.go
- manager.go
- metrics.go
- notification_subscriber.go
- registry.go
- server.go
- server_helpers.go
- server_options.go
- session_connection_pool.go
- tool_factory.go
- types.go