aggregator

package
v0.16.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 23, 2026 License: Apache-2.0 Imports: 44 Imported by: 0

Documentation

Overview

Package aggregator provides the MCP aggregator server implementation.

SSO Authentication Mechanisms

Muster supports two Single Sign-On (SSO) mechanisms for authenticating to downstream MCP servers:

## SSO Token Forwarding (explicit opt-in)

When muster itself is protected by OAuth (via oauth_server configuration), muster can forward its own ID token to downstream MCP servers. The downstream server must be configured to trust muster's OAuth client ID in its TrustedAudiences.

Flow:

  1. User authenticates TO muster via OAuth (Google, Dex, etc.)
  2. Muster receives and stores the user's ID token
  3. User accesses server with forwardToken: true
  4. Muster injects ID token as Authorization: Bearer header
  5. Downstream server validates token, trusts muster's client ID

Configuration: Requires `auth.forwardToken: true` in MCPServer spec.
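
Step 4 of the flow can be sketched as follows. This is a minimal illustration, not muster's implementation: newForwardedRequest, the URL, and the token value are hypothetical names chosen for the example.

```go
package main

import (
	"fmt"
	"net/http"
)

// newForwardedRequest builds a request to a downstream MCP server and attaches
// the user's ID token as a Bearer credential (step 4 of the flow).
// Hypothetical helper: the name and signature are assumptions for illustration.
func newForwardedRequest(url, idToken string) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, url, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Authorization", "Bearer "+idToken)
	return req, nil
}

func main() {
	// Placeholder URL and token.
	req, err := newForwardedRequest("https://downstream.example/mcp", "example-id-token")
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Header.Get("Authorization"))
}
```

The downstream server still has to validate the token and accept muster's client ID as a trusted audience; the header alone grants nothing.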

## SSO Token Exchange (RFC 8693)

When clusters have separate Identity Providers, muster can exchange its local token for one valid on the remote cluster's IdP (e.g., Dex). This enables cross-cluster SSO without requiring shared trust.

Flow:

  1. User authenticates TO muster via OAuth
  2. User accesses server with tokenExchange configuration
  3. Muster exchanges its token at the remote IdP's token endpoint
  4. Remote IdP issues a new token valid for the remote cluster
  5. Muster uses the exchanged token for downstream requests

Configuration: Requires `auth.tokenExchange` configuration in MCPServer spec.
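
The request in step 3 is a form-encoded POST to the remote token endpoint. The sketch below assembles such a form using the parameter names and URNs defined by RFC 8693. The connector_id parameter is assumed here to be the Dex-specific field that selects the connector, and buildExchangeForm is a hypothetical helper, not part of this package.

```go
package main

import (
	"fmt"
	"net/url"
)

const (
	// Grant type and token type identifiers defined by RFC 8693.
	grantTypeTokenExchange = "urn:ietf:params:oauth:grant-type:token-exchange"
	tokenTypeIDToken       = "urn:ietf:params:oauth:token-type:id_token"
)

// buildExchangeForm assembles the form body for a token exchange request.
// connector_id is an assumption about the remote Dex instance, not part of
// RFC 8693 itself.
func buildExchangeForm(subjectToken, connectorID string) url.Values {
	form := url.Values{}
	form.Set("grant_type", grantTypeTokenExchange)
	form.Set("subject_token", subjectToken)
	form.Set("subject_token_type", tokenTypeIDToken)
	form.Set("requested_token_type", tokenTypeIDToken)
	if connectorID != "" {
		form.Set("connector_id", connectorID)
	}
	return form
}

func main() {
	form := buildExchangeForm("local-id-token", "example-connector")
	// The encoded form would be POSTed to the remote IdP's token endpoint
	// with Content-Type application/x-www-form-urlencoded.
	fmt.Println(form.Encode())
}
```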

Package aggregator provides MCP (Model Context Protocol) server aggregation functionality for the muster system.

The aggregator package acts as a central hub that collects and exposes tools, resources, and prompts from multiple backend MCP servers through a single unified interface. It implements intelligent name collision resolution, security filtering, automatic server lifecycle management, and seamless integration with the muster service architecture.

Architecture Overview

The aggregator follows a layered architecture with clear separation of concerns:

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   MCP Clients   │    │  Core Tools     │    │  Workflow       │
│  (External)     │    │  (Internal)     │    │  Adapter        │
└─────────────────┘    └─────────────────┘    └─────────────────┘
         │                       │                       │
         └───────────────────────┼───────────────────────┘
                                 │
                    ┌─────────────────┐
                    │ AggregatorServer│
                    │  (Core MCP)     │
                    └─────────────────┘
                                 │
         ┌───────────────────────┼───────────────────────┐
         │                       │                       │
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│ SSE Transport   │    │ Stdio Transport │    │ HTTP Transport  │
│    (:8080)      │    │   (CLI Mode)    │    │  (Streaming)    │
└─────────────────┘    └─────────────────┘    └─────────────────┘

Core Components

## AggregatorManager

The AggregatorManager is the main entry point and coordinates all aggregator functionality. It manages the complete lifecycle, from server startup to graceful shutdown, and provides automatic MCP server registration based on service health status.

Key responsibilities:

  • Server lifecycle management (start/stop coordination)
  • Event-driven MCP server registration/deregistration
  • Integration with the muster service architecture
  • Periodic retry mechanism for failed registrations
  • Service monitoring and health reporting

Usage:

config := AggregatorConfig{
	Port:         8080,
	Host:         "localhost",
	Transport:    "sse",
	Yolo:         false,
	MusterPrefix: "x",
}

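// The fourth argument is the errorCallback required by the NewAggregatorManager signature.
manager := NewAggregatorManager(config, orchestratorAPI, serviceRegistry, func(err error) {
	log.Printf("aggregator error: %v", err)
})
if err := manager.Start(ctx); err != nil {
	log.Fatalf("failed to start aggregator: %v", err)
}
defer manager.Stop(ctx)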

## AggregatorServer

The AggregatorServer implements the core MCP server functionality, aggregating capabilities from multiple backend servers and exposing them through various transports. It provides real-time capability updates and maintains synchronization with backend servers.

Features:

  • Multi-transport support (SSE, stdio, streamable-http)
  • Dynamic capability discovery and updates
  • Core tool integration (workflow, config management)
  • Thread-safe concurrent operations
  • Intelligent request routing to backend servers

## ServerRegistry

The ServerRegistry manages the collection of registered MCP servers and their cached capabilities. It provides thread-safe access to server information and maintains bidirectional name mappings for routing requests to the correct backend servers.

Features:

  • Thread-safe server registration/deregistration
  • Capability caching for performance optimization
  • Server health tracking and connection status
  • Efficient bulk operations for capability retrieval
  • Update notifications for dynamic capability changes

## EventHandler

The EventHandler bridges the gap between the muster service orchestrator and the aggregator by automatically registering/deregistering MCP servers based on their health status. It ensures that only healthy, running services are exposed through the aggregator.

Event-driven behavior:

  • Running + Healthy → Automatic registration
  • Stopped/Failed/Unhealthy → Automatic deregistration
  • Resilient to temporary failures
  • Asynchronous processing for responsiveness
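
The mapping from service status to registry action listed above could look roughly like this. The state and health strings, the action type, and the decide function are all hypothetical; the real EventHandler works on muster's own service state types.

```go
package main

import "fmt"

// action is the registry operation chosen for a service event.
type action string

const (
	actionRegister   action = "register"
	actionDeregister action = "deregister"
	actionNone       action = "none"
)

// decide maps a service's state and health to a registry action.
// The string values are illustrative placeholders.
func decide(state, health string) action {
	switch {
	case state == "Running" && health == "Healthy":
		return actionRegister
	case state == "Stopped" || state == "Failed" || health == "Unhealthy":
		return actionDeregister
	default:
		// Transitional states leave the registry untouched.
		return actionNone
	}
}

func main() {
	fmt.Println(decide("Running", "Healthy"))
	fmt.Println(decide("Failed", "Unhealthy"))
	fmt.Println(decide("Starting", "Unknown"))
}
```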

Transport Protocols

## Server-Sent Events (SSE)

SSE transport provides real-time communication over HTTP with automatic reconnection and keep-alive support. Ideal for web-based integrations and real-time applications.

Endpoints:
- http://localhost:8080/sse     (SSE stream)
- http://localhost:8080/message (message posting)

## Standard I/O (stdio)

Stdio transport enables CLI integration by communicating over standard input/output. Perfect for command-line tools and shell integrations.

## Streamable HTTP

Streamable HTTP provides HTTP-based streaming protocol support with full bidirectional communication. This is the default transport for maximum compatibility.

Security Model

## Denylist System

The aggregator implements a "secure by default" approach with a comprehensive denylist of potentially destructive tools. This prevents accidental execution of dangerous operations in production environments.

Categories of blocked operations:

  • Kubernetes resource modification (kubectl apply, delete, patch)
  • Cluster API lifecycle operations (create/delete clusters)
  • Helm package management (install, uninstall, upgrade)
  • Flux GitOps operations (reconcile, suspend, resume)
  • System maintenance operations (cleanup, incident creation)

## Yolo Mode

The --yolo flag disables the security denylist for development environments where destructive operations may be needed. This should never be used in production.

config.Yolo = true  // Disables the security denylist; development only
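
As a rough sketch of how the denylist and yolo mode interact: a tool is blocked only when it is on the denylist and yolo mode is off. The denylist entries and the isBlocked helper below are invented for illustration; the package's actual list and lookup logic are not shown in this documentation.

```go
package main

import "fmt"

// denylist holds illustrative entries only, keyed by original (unprefixed)
// tool name.
var denylist = map[string]struct{}{
	"kubectl_apply":  {},
	"helm_uninstall": {},
}

// isBlocked reports whether a tool call should be rejected.
func isBlocked(originalName string, yolo bool) bool {
	if yolo {
		// Yolo mode bypasses the denylist entirely.
		return false
	}
	_, denied := denylist[originalName]
	return denied
}

func main() {
	fmt.Println(isBlocked("kubectl_apply", false))
	fmt.Println(isBlocked("kubectl_apply", true))
	fmt.Println(isBlocked("kubectl_get", false))
}
```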

Integration with Muster Architecture

## Central API Pattern

The aggregator follows the central API pattern for inter-package communication:

  • Receives service state events through api.OrchestratorAPI
  • Queries service information via api.ServiceRegistryHandler
  • Publishes tool update events via api.PublishToolUpdateEvent
  • Integrates with workflow manager through api interfaces

## Tool Update Events

The aggregator publishes ToolUpdateEvent notifications when its capability set changes, ensuring system-wide consistency with dependent components like Capability managers.

Event flow:
Backend server change → Registry update → Capability refresh → Event publication

## Workflow Integration

When configured with a config directory, the aggregator automatically integrates with the workflow system to expose workflow definitions as executable tools.

Performance Considerations

## Capability Caching

Backend server capabilities are cached in memory to avoid repeated network calls. Caches are updated automatically when servers are registered/deregistered or when explicit refresh operations are performed.
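
A minimal sketch of such a cache, assuming a map guarded by a read-write mutex. The capabilityCache type and its methods are hypothetical and much simpler than the ServerRegistry described earlier.

```go
package main

import (
	"fmt"
	"sync"
)

// capabilityCache stores tool names per backend server.
type capabilityCache struct {
	mu    sync.RWMutex
	tools map[string][]string // server name -> tool names
}

func newCapabilityCache() *capabilityCache {
	return &capabilityCache{tools: make(map[string][]string)}
}

// set replaces the cached tools for a server (called on registration or refresh).
func (c *capabilityCache) set(server string, tools []string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.tools[server] = append([]string(nil), tools...)
}

// get returns a copy so callers cannot mutate cached state.
func (c *capabilityCache) get(server string) ([]string, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	tools, ok := c.tools[server]
	return append([]string(nil), tools...), ok
}

// remove drops a server's entry (called on deregistration).
func (c *capabilityCache) remove(server string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.tools, server)
}

func main() {
	cache := newCapabilityCache()
	cache.set("k8s", []string{"list_pods", "get_pod"})
	tools, ok := cache.get("k8s")
	fmt.Println(tools, ok)
	cache.remove("k8s")
	_, ok = cache.get("k8s")
	fmt.Println(ok)
}
```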

## Batch Operations

The aggregator uses batch operations where possible to minimize the number of MCP protocol messages and improve performance with multiple backend servers.

## Background Processing

All registry updates and event processing happen asynchronously in background goroutines to ensure that the main request handling remains responsive.

Error Handling and Resilience

## Graceful Degradation

The aggregator continues operating even when individual backend servers become unavailable. Healthy servers remain accessible while unhealthy servers are automatically deregistered.

## Retry Mechanisms

A periodic retry mechanism attempts to register servers that are healthy but not yet registered, providing resilience against temporary registration failures.
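
One way to picture the retry mechanism is a ticker-driven loop that re-attempts registration for whatever is still pending. The sketch below is an assumption about the general shape, not the package's code: retryPending, runRetryLoop, errNotReady, and the 10 ms interval are all made up for the example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errNotReady = errors.New("backend not ready")

// retryPending attempts registration once for every pending server and
// returns the names that still failed.
func retryPending(pending []string, register func(string) error) []string {
	var stillPending []string
	for _, name := range pending {
		if err := register(name); err != nil {
			stillPending = append(stillPending, name)
		}
	}
	return stillPending
}

// runRetryLoop retries on every tick until nothing is pending or ctx ends.
func runRetryLoop(ctx context.Context, interval time.Duration, pending []string, register func(string) error) []string {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for len(pending) > 0 {
		select {
		case <-ctx.Done():
			return pending
		case <-ticker.C:
			pending = retryPending(pending, register)
		}
	}
	return pending
}

func main() {
	attempts := map[string]int{}
	register := func(name string) error {
		attempts[name]++
		if name == "flaky" && attempts[name] < 3 {
			return errNotReady // simulated transient failure
		}
		return nil
	}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	left := runRetryLoop(ctx, 10*time.Millisecond, []string{"stable", "flaky"}, register)
	fmt.Println("still pending:", len(left))
}
```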

## Connection Management

Backend server connections are managed automatically with proper cleanup during deregistration and graceful shutdown procedures.

Monitoring and Observability

## Service Data

The aggregator provides comprehensive service monitoring data including:

  • Tool/resource/prompt counts and status
  • Server connectivity statistics
  • Security filtering statistics (blocked tools)
  • Transport endpoint information
  • Event handler status

## Logging

Structured logging provides visibility into:

  • Server registration/deregistration events
  • Capability update operations
  • Security filtering actions
  • Error conditions and recovery

Thread Safety

All public APIs are thread-safe and can be called concurrently. Internal state is protected by appropriate synchronization mechanisms (mutexes, channels, etc.). The package is designed for high-concurrency environments.

Configuration

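The AggregatorConfig structure carries the core configuration options shown below. The full type, documented under Types, also includes Version, OAuth, OAuthServer, Debug, and Admin: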

type AggregatorConfig struct {
	Port         int    // Server port (default: 8080)
	Host         string // Bind address (default: localhost)
	Transport    string // Protocol: "sse", "stdio", "streamable-http"
	Yolo         bool   // Disable security denylist (development only)
	ConfigDir    string // Directory for workflow definitions
	MusterPrefix string // Global prefix for all tools (default: "x")
}

Example: Complete Setup

// Create configuration
config := AggregatorConfig{
	Port:         8080,
	Host:         "localhost",
	Transport:    "sse",
	Yolo:         false,
	ConfigDir:    "/etc/muster/workflows",
	MusterPrefix: "x",
}

// Initialize dependencies through central API
orchestratorAPI := api.GetOrchestrator()
serviceRegistry := api.GetServiceRegistry()

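// Create and start aggregator.
// The fourth argument is the errorCallback required by the NewAggregatorManager signature.
manager := NewAggregatorManager(config, orchestratorAPI, serviceRegistry, func(err error) {
	log.Printf("aggregator error: %v", err)
})
if err := manager.Start(ctx); err != nil {
	log.Fatalf("Failed to start aggregator: %v", err)
}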
defer manager.Stop(ctx)

// Access aggregator endpoint
endpoint := manager.GetEndpoint()
fmt.Printf("Aggregator running at: %s\n", endpoint)

The aggregator package is the cornerstone of muster's MCP integration, providing a robust, secure, and scalable foundation for tool aggregation and distribution.

Index

Constants

const AuthStatusResourceURI = "auth://status"

AuthStatusResourceURI is the URI for the auth status MCP resource. This resource provides real-time authentication status for all MCP servers.

const DefaultConnectionPoolMaxAge = 1 * time.Hour

DefaultConnectionPoolMaxAge is the idle timeout for pooled connections. Connections not accessed within this window are closed by the reaper. This is intentionally much shorter than the capability/auth store TTLs because pooled entries hold live network connections (open sockets), not lightweight metadata.
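
The idle-timeout rule can be illustrated with a small reaper pass over a map of connections. Everything here is hypothetical (pool, pooledConn, touch, reap, demoMaxAge, epoch), and the sketch leaves out locking and the actual socket close for brevity.

```go
package main

import (
	"fmt"
	"time"
)

// demoMaxAge mirrors the documented value of DefaultConnectionPoolMaxAge.
const demoMaxAge = time.Hour

// epoch is an arbitrary reference time for the demo.
var epoch = time.Date(2026, time.January, 1, 0, 0, 0, 0, time.UTC)

type pooledConn struct{ lastAccess time.Time }

type pool struct {
	maxAge time.Duration
	conns  map[string]*pooledConn
}

func newPool(maxAge time.Duration) *pool {
	return &pool{maxAge: maxAge, conns: make(map[string]*pooledConn)}
}

// touch records an access to the connection stored under key.
func (p *pool) touch(key string, at time.Time) {
	p.conns[key] = &pooledConn{lastAccess: at}
}

// reap evicts connections idle for longer than maxAge and returns how many
// were evicted. A real pool would also close the underlying socket.
func (p *pool) reap(now time.Time) int {
	evicted := 0
	for key, c := range p.conns {
		if now.Sub(c.lastAccess) > p.maxAge {
			delete(p.conns, key)
			evicted++
		}
	}
	return evicted
}

func main() {
	p := newPool(demoMaxAge)
	p.touch("session-a/k8s", epoch)
	p.touch("session-b/k8s", epoch.Add(90*time.Minute))
	fmt.Println("evicted:", p.reap(epoch.Add(2*time.Hour)))
	fmt.Println("remaining:", len(p.conns))
}
```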

const LogMessage = "tool call"

LogMessage is the constant msg field on emitted lines. Log queries in Loki use this as the anchor (e.g. `msg = "tool call"`).

const LogSubsystem = "MCP-Tool"

LogSubsystem tags the structured log lines emitted by this middleware, matching muster's pkg/logging subsystem convention.

Variables

This section is empty.

Functions

func Logging added in v0.1.187

func Logging() server.ToolHandlerMiddleware

Logging returns a ToolHandlerMiddleware that emits one structured info-level log line per tool call with fields: tool, outcome, duration_s, error (when set).

func Metrics added in v0.1.187

func Metrics() server.ToolHandlerMiddleware

Metrics returns a ToolHandlerMiddleware that records:

  • muster.tool_calls (counter) with attributes tool, outcome
  • muster.tool_call.duration (histogram, unit "s") with the same attributes

Exported via the Prometheus OTEL exporter these become muster_tool_calls_total and muster_tool_call_duration_seconds.
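
Both Logging and Metrics follow the usual middleware shape: wrap the next handler, time the call, and derive an outcome from the returned error. The sketch below uses simplified local stand-ins (toolHandler, middleware, callRecord, observe) rather than the real server.ToolHandlerMiddleware type, and the outcome values "success" and "error" are assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// toolHandler and middleware are simplified local stand-ins for illustration.
type toolHandler func(tool string) (string, error)
type middleware func(next toolHandler) toolHandler

// callRecord carries the fields a log line or metric would be built from.
type callRecord struct {
	Tool     string
	Outcome  string
	Duration time.Duration
}

// observe returns a middleware that reports one callRecord per tool call.
func observe(record func(callRecord)) middleware {
	return func(next toolHandler) toolHandler {
		return func(tool string) (string, error) {
			start := time.Now()
			result, err := next(tool)
			outcome := "success"
			if err != nil {
				outcome = "error"
			}
			record(callRecord{Tool: tool, Outcome: outcome, Duration: time.Since(start)})
			return result, err
		}
	}
}

func main() {
	handler := func(tool string) (string, error) {
		if tool == "" {
			return "", errors.New("empty tool name")
		}
		return "ok", nil
	}
	wrapped := observe(func(r callRecord) {
		fmt.Printf("tool=%s outcome=%s duration_s=%.6f\n", r.Tool, r.Outcome, r.Duration.Seconds())
	})(handler)
	wrapped("x_list_pods")
	wrapped("")
}
```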

func ShouldUseLocalMint added in v0.13.0

func ShouldUseLocalMint(serverInfo *ServerInfo) bool

ShouldUseLocalMint reports whether muster mints a per-backend token for a server. Enabled when AuthConfig.LocalMint is non-nil, Enabled, and carries an Audience (the broker local-mint target). Mutually exclusive with token forwarding and token exchange (enforced by the CRD admission rules).

func ShouldUseTokenExchange

func ShouldUseTokenExchange(serverInfo *ServerInfo) bool

ShouldUseTokenExchange checks if RFC 8693 token exchange should be used for a server. Token exchange is enabled when:

  • AuthConfig.TokenExchange is not nil
  • AuthConfig.TokenExchange.Enabled is true
  • Required fields (DexTokenEndpoint, ConnectorID) are set

Token exchange takes precedence over token forwarding if both are configured.
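
The conditions and the precedence rule can be summarized in a short sketch. The structs below are trimmed-down stand-ins that reuse only the field names mentioned in this documentation; selectMechanism and its return strings are hypothetical.

```go
package main

import "fmt"

// tokenExchangeConfig and authConfig are local stand-ins, not the real types.
type tokenExchangeConfig struct {
	Enabled          bool
	DexTokenEndpoint string
	ConnectorID      string
}

type authConfig struct {
	ForwardToken  bool
	TokenExchange *tokenExchangeConfig
}

// useTokenExchange applies the three documented conditions.
func useTokenExchange(a *authConfig) bool {
	if a == nil || a.TokenExchange == nil {
		return false
	}
	te := a.TokenExchange
	return te.Enabled && te.DexTokenEndpoint != "" && te.ConnectorID != ""
}

// useTokenForwarding is true whenever ForwardToken is set.
func useTokenForwarding(a *authConfig) bool {
	return a != nil && a.ForwardToken
}

// selectMechanism gives token exchange precedence over token forwarding.
func selectMechanism(a *authConfig) string {
	switch {
	case useTokenExchange(a):
		return "token-exchange"
	case useTokenForwarding(a):
		return "token-forwarding"
	default:
		return "none"
	}
}

func main() {
	both := &authConfig{
		ForwardToken: true,
		TokenExchange: &tokenExchangeConfig{
			Enabled:          true,
			DexTokenEndpoint: "https://dex.remote.example/token", // placeholder
			ConnectorID:      "example-connector",                // placeholder
		},
	}
	fmt.Println(selectMechanism(both))
	fmt.Println(selectMechanism(&authConfig{ForwardToken: true}))
	fmt.Println(selectMechanism(nil))
}
```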

func ShouldUseTokenForwarding

func ShouldUseTokenForwarding(serverInfo *ServerInfo) bool

ShouldUseTokenForwarding checks if token forwarding should be used for a server. Token forwarding is enabled whenever AuthConfig.ForwardToken is true, whether or not AuthConfig.Type is explicitly set to "oauth".

Setting forwardToken: true implicitly enables OAuth authentication since token forwarding only makes sense in an OAuth context.

Types

type AdminConfig added in v0.1.123

type AdminConfig struct {
	// Enabled controls whether the admin listener is started.
	Enabled bool

	// Port is the TCP port for the admin listener (default: 9999 when enabled).
	Port int

	// BindAddress is the interface to bind the admin listener to (default: 127.0.0.1).
	BindAddress string
}

AdminConfig holds admin web UI configuration for the aggregator.

type AggregatorConfig

type AggregatorConfig struct {
	// Port specifies the port number to listen on for the aggregated MCP endpoint
	Port int

	// Host specifies the host address to bind to (default: localhost)
	Host string

	// Transport defines the protocol to use for MCP communication.
	// Supported values: "sse", "streamable-http", "stdio"
	Transport string

	// Yolo disables the security denylist for destructive tools.
	// When true, all tools are allowed regardless of their destructive nature.
	// This should only be enabled in development environments.
	Yolo bool

	// ConfigDir is the user configuration directory for workflows and other configs.
	// This is used to load workflow definitions and make them available as tools.
	ConfigDir string

	// MusterPrefix is the global prefix applied to all aggregated tools.
	// This helps distinguish muster tools from other MCP tools in mixed environments.
	// Default value is "x".
	MusterPrefix string

	// Version is the muster server version to report in the MCP protocol handshake.
	// This should be set to the build version during application initialization.
	// Defaults to "dev" if not specified.
	Version string

	// OAuth configuration for remote MCP server authentication (client role)
	OAuth OAuthProxyConfig

	// OAuthServer configuration for protecting the Muster Server (resource server role)
	OAuthServer OAuthServerConfig

	// Debug enables debug logging
	Debug bool

	// Admin, when enabled, starts a separate HTTP listener that serves the
	// session management web UI. See internal/admin for details.
	Admin AdminConfig
}

AggregatorConfig holds configuration args for the aggregator. This structure defines how the aggregator should behave and what endpoints it should expose.

type AggregatorManager

type AggregatorManager struct {
	// contains filtered or unexported fields
}

AggregatorManager provides a high-level interface for managing the aggregator server and coordinating automatic MCP server registration based on service health status.

The manager combines the aggregator server with event handling to provide:

  • Automatic registration of healthy MCP servers
  • Event-driven updates when service states change
  • Periodic retry mechanisms for failed registrations
  • Centralized lifecycle management
  • OAuth proxy for remote MCP server authentication

It acts as the primary entry point for the aggregator functionality and integrates with the muster service architecture through the central API pattern.

func NewAggregatorManager

func NewAggregatorManager(config AggregatorConfig, orchestratorAPI api.OrchestratorAPI, serviceRegistry api.ServiceRegistryHandler, errorCallback func(err error)) *AggregatorManager

NewAggregatorManager creates a new aggregator manager with the specified configuration.

The manager requires access to the orchestrator API for receiving service state events and the service registry for querying service information. These dependencies are provided through the central API pattern to maintain loose coupling.

Args:

  • config: Configuration for the aggregator server behavior
  • orchestratorAPI: Interface for receiving service lifecycle events
  • serviceRegistry: Interface for querying service information
  • errorCallback: Callback invoked with an error value (func(err error))

Returns a configured but not yet started aggregator manager.

func (*AggregatorManager) GetAggregatorServer

func (am *AggregatorManager) GetAggregatorServer() *AggregatorServer

GetAggregatorServer returns the underlying aggregator server instance.

This method provides access to advanced aggregator operations that are not exposed through the manager interface. It should be used carefully and primarily for testing or debugging purposes.

Returns nil if the server is not initialized.

func (*AggregatorManager) GetEndpoint

func (am *AggregatorManager) GetEndpoint() string

GetEndpoint returns the aggregator's MCP endpoint URL.

The endpoint format depends on the configured transport.

Returns an empty string if the aggregator server is not available.

func (*AggregatorManager) GetEventHandler

func (am *AggregatorManager) GetEventHandler() *EventHandler

GetEventHandler returns the event handler instance.

This method is primarily intended for testing and debugging purposes to inspect the state of the event handling system.

Returns nil if the event handler is not initialized.

func (*AggregatorManager) GetServiceData

func (am *AggregatorManager) GetServiceData() map[string]interface{}

GetServiceData returns comprehensive service monitoring data.

This method provides detailed information about the aggregator's current state, including configuration, connection status, tool/resource/prompt counts, and server statistics. The data is suitable for monitoring dashboards and health checks.

Returns a map containing various metrics and status information.

func (*AggregatorManager) ManualRefresh

func (am *AggregatorManager) ManualRefresh(ctx context.Context) error

ManualRefresh manually triggers a re-synchronization of all healthy MCP servers.

This method can be useful for debugging or when you need to force a refresh of the server registrations outside of the normal event-driven flow. It performs the same operation as the initial sync during startup.

Args:

  • ctx: Context for the refresh operation

Returns an error if the refresh operation fails.

func (*AggregatorManager) RegisterServerPendingAuth

func (am *AggregatorManager) RegisterServerPendingAuth(registration PendingAuthRegistration) error

RegisterServerPendingAuth registers a server that requires OAuth authentication before its tools can be exposed. Per ADR-008, no synthetic auth tools are created; users authenticate via core_auth_login.

AuthConfig inside registration may be nil; in either case the server is flagged as requiring per-session authentication.

func (*AggregatorManager) Start

func (am *AggregatorManager) Start(ctx context.Context) error

Start initializes and starts the aggregator manager.

This method performs the following initialization sequence:

  1. Starts the underlying aggregator server
  2. Validates that required APIs are available
  3. Performs initial sync of healthy MCP servers
  4. Sets up event handling for automatic updates
  5. Starts periodic retry mechanism for failed registrations

The method is idempotent: calling it multiple times has no additional effect. Returns an error if any component fails to start.

func (*AggregatorManager) Stop

func (am *AggregatorManager) Stop(ctx context.Context) error

Stop gracefully shuts down the aggregator manager.

This method stops all components in reverse order of startup:

  1. Cancels the context to signal shutdown to all goroutines
  2. Stops the event handler
  3. Stops the OAuth manager
  4. Stops the aggregator server
  5. Waits for all background operations to complete

The method is idempotent and can be called multiple times safely.
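
Idempotent Start and Stop are commonly built on a mutex-guarded running flag. The sketch below shows that pattern under the assumption that repeated calls simply return nil; the lifecycle type and its counters are hypothetical and exist only to make the behavior observable.

```go
package main

import (
	"fmt"
	"sync"
)

// lifecycle guards start/stop transitions with a mutex.
type lifecycle struct {
	mu      sync.Mutex
	running bool
	starts  int // number of times startup work actually ran
	stops   int // number of times shutdown work actually ran
}

// Start runs startup work only on the first call while stopped.
func (l *lifecycle) Start() error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.running {
		return nil // already started: nothing to do
	}
	l.running = true
	l.starts++
	return nil
}

// Stop runs shutdown work only when currently running.
func (l *lifecycle) Stop() error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if !l.running {
		return nil // already stopped: nothing to do
	}
	l.running = false
	l.stops++
	return nil
}

func main() {
	var l lifecycle
	l.Start()
	l.Start()
	l.Stop()
	l.Stop()
	fmt.Println("starts:", l.starts, "stops:", l.stops)
}
```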

type AggregatorServer

type AggregatorServer struct {
	// contains filtered or unexported fields
}

AggregatorServer implements a comprehensive MCP server that aggregates multiple backend MCP servers.

The AggregatorServer is the core component responsible for:

  • Collecting and exposing tools, resources, and prompts from multiple backend servers
  • Managing multiple transport protocols (SSE, stdio, streamable-http)
  • Integrating core muster tools alongside external MCP servers
  • Providing intelligent name collision resolution
  • Implementing security filtering through the denylist system
  • Real-time capability updates when backend servers change
  • User-scoped tool visibility for OAuth-protected servers

Architecture: The server maintains a registry of backend MCP servers and dynamically updates its exposed capabilities as servers are registered/deregistered. It supports multiple transport protocols simultaneously and provides both external MCP compatibility and internal tool calling capabilities.

Session-Scoped Tool Visibility: For OAuth-protected servers, each login session's tool view is determined by the CapabilityStore keyed by (sessionID, serverName). There is no session registry; connections are created on demand for each tool call.

Thread Safety: All public methods are thread-safe and can be called concurrently. Internal state is protected by appropriate synchronization mechanisms.

func NewAggregatorServer

func NewAggregatorServer(aggConfig AggregatorConfig, errorCallback func(error)) *AggregatorServer

NewAggregatorServer creates a new aggregator server with the specified configuration.

This constructor initializes all necessary components but does not start any servers. The returned server must be started with the Start method before it can handle requests.

The server is configured with:

  • A server registry using the specified muster prefix
  • Active item managers for tracking capabilities
  • Per-session capability cache and SSO tracker for OAuth servers
  • Default transport settings based on configuration

Args:

  • aggConfig: Configuration args defining server behavior, transport, and security settings
  • errorCallback: Callback invoked with an error value (func(error))

Returns a configured but unstarted aggregator server ready for initialization.

func (*AggregatorServer) CallToolInternal

func (a *AggregatorServer) CallToolInternal(ctx context.Context, toolName string, args map[string]interface{}) (res *mcp.CallToolResult, err error)

CallToolInternal provides internal tool calling capability for muster components.

This method allows internal muster components to execute tools through the aggregator without going through the external MCP protocol. It supports both:

  • Tools from registered backend servers (resolved through the registry)
  • Core tools from muster components (called directly through providers)

The method performs intelligent tool resolution:

  1. First attempts to resolve the tool through the server registry
  2. If not found, checks if it's a core tool by name pattern
  3. Routes the call to the appropriate handler based on tool type

This internal calling mechanism is essential for:

  • Inter-component communication within muster
  • Workflow execution that needs to call other tools
  • Administrative operations that require tool access

Args:

  • ctx: Context for the tool execution
  • toolName: Name of the tool to execute (may be prefixed)
  • args: Arguments to pass to the tool as key-value pairs

Returns the tool execution result or an error if the tool cannot be found or executed.
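
The resolution order described above (registry first, then core-tool name pattern) can be sketched as follows. The resolver type, the route method, the "core_" prefix, and the returned target strings are assumptions made for the example.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

var errToolNotFound = errors.New("tool not found")

// resolver is a simplified stand-in for the aggregator's routing state.
type resolver struct {
	registry     map[string]string // exposed tool name -> backend server
	corePrefixes []string          // hypothetical core-tool name prefixes
}

// route returns where a tool call should be sent.
func (r resolver) route(toolName string) (string, error) {
	// 1. Tools from registered backend servers.
	if server, ok := r.registry[toolName]; ok {
		return "backend:" + server, nil
	}
	// 2. Core tools, recognized by name pattern.
	for _, p := range r.corePrefixes {
		if strings.HasPrefix(toolName, p) {
			return "core", nil
		}
	}
	return "", fmt.Errorf("%w: %s", errToolNotFound, toolName)
}

func main() {
	r := resolver{
		registry:     map[string]string{"x_k8s_list_pods": "k8s"},
		corePrefixes: []string{"core_"},
	}
	for _, name := range []string{"x_k8s_list_pods", "core_auth_login", "unknown"} {
		target, err := r.route(name)
		fmt.Println(name, target, err)
	}
}
```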

func (*AggregatorServer) DeregisterServer

func (a *AggregatorServer) DeregisterServer(name string) error

DeregisterServer removes a backend MCP server from the aggregator.

This method cleanly removes a backend server from the aggregator, which will cause all tools, resources, and prompts from that server to become unavailable. The backend client connection is closed as part of the deregistration process.

Additionally, this method cleans up any stale session connections for the server. This is critical for handling MCPServer renames, where the old server is deleted and a new one is created. Without this cleanup, session connections stored under the old server name would persist and cause stale auth status displays.

Args:

  • name: Unique identifier of the server to remove

Returns an error if the server is not found or deregistration fails.

func (*AggregatorServer) GetAvailableTools

func (a *AggregatorServer) GetAvailableTools() []string

GetAvailableTools implements the ToolAvailabilityChecker interface.

This method returns a comprehensive list of all tools currently available through the aggregator, including both external tools from backend servers and core tools from muster components. The returned list represents the complete tool inventory that can be used by workflows, capabilities, and other muster components.

The aggregation process:

  1. Collects all tools from registered backend servers via the registry
  2. Collects all core tools from muster component providers
  3. Combines both lists into a unified tool inventory
  4. Returns tool names (with appropriate prefixing applied)

This method is used by:

  • Workflow manager for populating available tool lists
  • Service class manager for tool validation
  • Administrative interfaces for tool discovery

Returns a slice of tool names representing all available tools.

func (*AggregatorServer) GetEndpoint

func (a *AggregatorServer) GetEndpoint() string

GetEndpoint returns the aggregator's primary endpoint URL based on the configured transport.

The endpoint format varies by transport type.

This endpoint can be used by MCP clients to connect to the aggregator and access all aggregated capabilities from backend servers.

Returns the endpoint URL as a string, or "stdio" for standard I/O transport.

func (*AggregatorServer) GetPrompt

func (a *AggregatorServer) GetPrompt(ctx context.Context, name string, args map[string]string) (*mcp.GetPromptResult, error)

GetPrompt executes a prompt with the provided arguments. This resolves the prompt name to its origin server and retrieves the prompt.

func (*AggregatorServer) GetPrompts

func (a *AggregatorServer) GetPrompts() []mcp.Prompt

GetPrompts returns all available prompts from all sources with intelligent name prefixing.

This method aggregates prompts from all registered backend servers, applying smart prefixing similar to tools to avoid name conflicts. The prefixing ensures that prompts from different servers can coexist without naming collisions.

Returns a slice of MCP prompts ready for client consumption.

func (*AggregatorServer) GetPromptsForSession

func (a *AggregatorServer) GetPromptsForSession(ctx context.Context, sessionID string) []mcp.Prompt

GetPromptsForSession returns a session-specific view of all available prompts. For OAuth servers, prompts are read from the CapabilityStore keyed by session ID.

func (*AggregatorServer) GetRegistry

func (a *AggregatorServer) GetRegistry() *ServerRegistry

GetRegistry returns the server registry for direct access to backend server information.

This method provides access to the underlying registry for advanced operations such as inspecting server status, accessing raw capabilities, or performing administrative tasks. It should be used carefully to avoid disrupting the aggregator's normal operation.

Returns the ServerRegistry instance managing all backend servers.

func (*AggregatorServer) GetResources

func (a *AggregatorServer) GetResources() []mcp.Resource

GetResources returns all available resources from all registered backend servers.

This method aggregates resources from all connected backend servers, applying appropriate URI prefixing to avoid conflicts. Resources maintain their original functionality while being properly namespaced within the aggregated environment.

Returns a slice of MCP resources ready for client access.

func (*AggregatorServer) GetResourcesForSession

func (a *AggregatorServer) GetResourcesForSession(ctx context.Context, sessionID string) []mcp.Resource

GetResourcesForSession returns a session-specific view of all available resources. For OAuth servers, resources are read from the CapabilityStore keyed by session ID.

func (*AggregatorServer) GetTools

func (a *AggregatorServer) GetTools() []mcp.Tool

GetTools returns all available tools from all sources with intelligent name prefixing.

This method aggregates tools from all registered backend servers, applying smart prefixing to avoid name conflicts. The prefixing is only applied when conflicts would otherwise occur, following the pattern: {muster_prefix}_{server_prefix}_{original_name}

Note: This returns the global tool view. For session-specific tool visibility, use GetToolsForSession instead.

Returns a slice of MCP tools ready for client consumption.
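
To illustrate the conflict-only prefixing pattern described for GetTools, here is a sketch that prefixes a tool name only when more than one server offers it. It assumes the server prefix equals the server name and that names are unique within a server. The exposedNames helper is hypothetical, and the package's real collision rules may differ.

```go
package main

import (
	"fmt"
	"sort"
)

// exposedNames maps each "server/tool" pair to the name shown to clients,
// using {muster_prefix}_{server_prefix}_{original_name} only for conflicts.
func exposedNames(musterPrefix string, toolsByServer map[string][]string) map[string]string {
	// Count how many servers offer each tool name.
	counts := map[string]int{}
	for _, tools := range toolsByServer {
		for _, t := range tools {
			counts[t]++
		}
	}
	out := map[string]string{}
	for server, tools := range toolsByServer {
		for _, t := range tools {
			key := server + "/" + t
			if counts[t] > 1 {
				out[key] = musterPrefix + "_" + server + "_" + t
			} else {
				out[key] = t
			}
		}
	}
	return out
}

func main() {
	names := exposedNames("x", map[string][]string{
		"github": {"search", "list_repos"},
		"jira":   {"search"},
	})
	keys := make([]string, 0, len(names))
	for k := range names {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Printf("%s -> %s\n", k, names[k])
	}
}
```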

func (*AggregatorServer) GetToolsForSession

func (a *AggregatorServer) GetToolsForSession(ctx context.Context, sessionID string) []mcp.Tool

GetToolsForSession returns a session-specific view of all available tools. For OAuth servers, tools are read from the CapabilityStore keyed by session ID. For non-OAuth servers, tools are read from ServerInfo (same as GetAllTools).

func (*AggregatorServer) GetToolsWithStatus

func (a *AggregatorServer) GetToolsWithStatus() []ToolWithStatus

GetToolsWithStatus returns all available tools along with their security blocking status.

This method provides enhanced tool information that includes whether each tool is blocked by the security denylist. The blocking status depends on:

  • The tool's classification as destructive in the denylist
  • The current yolo mode setting (yolo=true allows all tools)

The tool names are resolved to their original names (before prefixing) for accurate denylist checking, ensuring consistent security behavior regardless of how tools are exposed.

Returns a slice of ToolWithStatus containing both tool definitions and security status.

func (*AggregatorServer) IsToolAvailable

func (a *AggregatorServer) IsToolAvailable(toolName string) bool

IsToolAvailable implements the ToolAvailabilityChecker interface.

This method determines whether a specific tool is available through the aggregator, checking both external backend servers (via the registry) and core muster tools (via name pattern matching). It provides a unified way for other components to verify tool availability before attempting to use them.

The check process:

  1. Attempts to resolve the tool through the server registry
  2. If resolution fails only because the tool belongs to a family backed by more than one provider (an intentional ambiguity error), treats the tool as available: the family still exists, and routing only needs the instance-selector argument
  3. If still not found, checks if it matches core tool naming patterns
  4. Returns true if found in any of these

This method is used by:

  • Workflow manager for validating workflow step tools
  • Service class manager for tool availability validation

Args:

  • toolName: Name of the tool to check (may be prefixed)

Returns true if the tool is available, false otherwise.
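
The check process above can be sketched as a small function. Everything here is an assumption made for illustration: the sentinel errors, the resolve callback standing in for the server registry, and the core_ and workflow_ prefixes used as core naming patterns.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Hypothetical sentinel errors standing in for the registry's resolution errors.
var (
	errToolNotFound    = errors.New("tool not found")
	errAmbiguousFamily = errors.New("tool family is backed by more than one provider")
)

// isToolAvailable follows the documented order: registry first, then the
// intentional ambiguity case, then core naming patterns.
func isToolAvailable(name string, resolve func(string) error, corePrefixes []string) bool {
	err := resolve(name)
	if err == nil {
		return true
	}
	if errors.Is(err, errAmbiguousFamily) {
		// The family exists; only the instance selector is missing.
		return true
	}
	for _, prefix := range corePrefixes {
		if strings.HasPrefix(name, prefix) {
			return true
		}
	}
	return false
}

func main() {
	resolve := func(name string) error {
		switch name {
		case "x_github_search":
			return nil
		case "x_kubernetes_get":
			return fmt.Errorf("resolve %q: %w", name, errAmbiguousFamily)
		default:
			return errToolNotFound
		}
	}
	core := []string{"core_", "workflow_"}
	for _, name := range []string{"x_github_search", "x_kubernetes_get", "core_service_list", "unknown_tool"} {
		fmt.Printf("%s available=%v\n", name, isToolAvailable(name, resolve, core))
	}
}
```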

func (*AggregatorServer) IsYoloMode

func (a *AggregatorServer) IsYoloMode() bool

IsYoloMode returns whether yolo mode is currently enabled.

Yolo mode disables the security denylist, allowing all tools to be executed regardless of their destructive potential. This mode should only be enabled in development or testing environments where the risk is acceptable.

Returns true if yolo mode is enabled, false if security filtering is active.

func (*AggregatorServer) ListPromptsForContext

func (a *AggregatorServer) ListPromptsForContext(ctx context.Context) []mcp.Prompt

ListPromptsForContext returns all available prompts for the current session context.

func (*AggregatorServer) ListResourcesForContext

func (a *AggregatorServer) ListResourcesForContext(ctx context.Context) []mcp.Resource

ListResourcesForContext returns all available resources for the current session context.

func (*AggregatorServer) ListServersRequiringAuth

func (a *AggregatorServer) ListServersRequiringAuth(ctx context.Context) []api.ServerAuthInfo

ListServersRequiringAuth returns a list of servers that require authentication for the current session. This enables the list_tools meta-tool to inform users about servers that are available but require authentication before their tools become visible.

The method checks each registered server and returns those that:

  • Require per-session authentication (RequiresSessionAuth)
  • The session has not yet authenticated to
  • Are NOT SSO-configured (token forwarding/exchange)

This is part of the server-side meta-tools migration (Issue #343) to provide better visibility into which servers need authentication.

func (*AggregatorServer) ListToolsForContext

func (a *AggregatorServer) ListToolsForContext(ctx context.Context) []mcp.Tool

ListToolsForContext returns all available tools for the current user context. This is used by the metatools package to provide user-scoped tool visibility.

The method extracts the session ID from the context and returns tools appropriate for that session's authentication state. This includes:

  • MCP server tools (prefixed with x_<server>_)
  • Core muster tools (prefixed with core_) from internal providers

The core tools are collected from workflow, service, config, mcpserver, events, and auth providers.

func (*AggregatorServer) MissingToolsForSession added in v0.1.222

func (a *AggregatorServer) MissingToolsForSession(ctx context.Context, toolNames []string) []string

MissingToolsForSession returns, from toolNames, the (deduplicated, input-ordered) subset that is not available for the calling session.

Availability of family tools provided by SSO / auth-protected servers must be scoped to the calling session and must not depend on call ordering (#764):

  • False negative: such servers are skipped by GetAllTools(), so their family routing entries land in the process-global familyMappings index only as a side effect of a prior GetAllToolsForSession() call. A check against that index reported the tool missing until some session listed tools.
  • False positive: that index is unioned across sessions and keyed only by exposed tool name. Once any session lists tools, registry.IsFamilyTool flips true process-wide, so a global check then reported the tool available for every session — including ones that never authenticated to the family.

Both stem from resolving session-scoped tools against the process-global view. So when the context carries a session, that session's accessible tool set — hydrated from the CapabilityStore (keyed by subject / session) via GetToolsForSession — is authoritative for downstream (non-core) tools. The session set already includes every non-auth connected server's tools plus the family tools the caller has authenticated to, so it neither under- nor over-reports. Core / meta tools (core_*, workflow_*) are never session-scoped and resolve by name. Only when there is no session does availability fall back to the process-global view.

The session tool set is resolved at most once per call (lazily, on the first non-core tool), so checking many tools does not rebuild it repeatedly.
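
A minimal sketch of the contract described above: a deduplicated, input-ordered result, core tools resolved by name, and a session tool set that is loaded lazily and at most once per call. The missingTools helper, the isCore check, and the map-based tool sets are hypothetical; the real method reads from the CapabilityStore.

```go
package main

import (
	"fmt"
	"strings"
)

// isCore reports whether a name follows the core / meta tool patterns
// mentioned above (core_*, workflow_*).
func isCore(name string) bool {
	return strings.HasPrefix(name, "core_") || strings.HasPrefix(name, "workflow_")
}

// missingTools returns the deduplicated, input-ordered subset of names that
// is not available. loadSessionTools is invoked at most once, and only when
// the first non-core tool is encountered.
func missingTools(names []string, coreTools map[string]bool, loadSessionTools func() map[string]bool) []string {
	var sessionTools map[string]bool
	loaded := false
	seen := make(map[string]bool)
	var missing []string

	for _, name := range names {
		if seen[name] {
			continue // report each name at most once
		}
		seen[name] = true

		if isCore(name) {
			if !coreTools[name] {
				missing = append(missing, name)
			}
			continue
		}
		if !loaded {
			sessionTools = loadSessionTools()
			loaded = true
		}
		if !sessionTools[name] {
			missing = append(missing, name)
		}
	}
	return missing
}

func main() {
	loads := 0
	load := func() map[string]bool {
		loads++
		return map[string]bool{"x_github_search": true}
	}
	names := []string{"x_jira_search", "core_config_get", "x_github_search", "x_jira_search"}
	fmt.Println(missingTools(names, map[string]bool{"core_config_get": true}, load))
	fmt.Println("session set loaded", loads, "time(s)")
}
```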

func (*AggregatorServer) OnToolsUpdated

func (a *AggregatorServer) OnToolsUpdated(event api.ToolUpdateEvent)

OnToolsUpdated implements the ToolUpdateSubscriber interface for handling tool change events.

This method responds to tool update events from other muster components, particularly the workflow manager, to maintain synchronization of the aggregator's exposed capabilities with the current tool landscape.

Event Processing:

  • Filters events to focus on workflow-related tool changes
  • Triggers capability refresh for workflow tool updates
  • Uses asynchronous processing with a small delay to avoid mutex conflicts
  • Logs all received events for debugging and monitoring

The asynchronous processing pattern ensures that:

  • The event publisher (workflow manager) doesn't block waiting for aggregator updates
  • Mutex conflicts are avoided by allowing the publisher to complete first
  • Capability updates happen promptly but safely

Args:

  • event: Tool update event containing change information, tool lists, and metadata

The method processes events selectively, focusing on workflow manager events that indicate changes to workflow-based tools.
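
The filter-then-refresh-asynchronously pattern can be sketched as below. The toolUpdateEvent type, its Source field, the "workflow" value, and the returned done channel are assumptions made for this example; the actual fields of api.ToolUpdateEvent and the delay used by the aggregator are not documented here.

```go
package main

import (
	"fmt"
	"time"
)

// toolUpdateEvent is a hypothetical, reduced stand-in for api.ToolUpdateEvent.
type toolUpdateEvent struct {
	Source string // component that published the event (assumed field)
}

// handleToolUpdate ignores events from other sources and otherwise schedules
// refresh on a separate goroutine after delay, so the publisher is never
// blocked. The returned channel is closed once refresh has run; it is nil
// when the event was ignored.
func handleToolUpdate(ev toolUpdateEvent, delay time.Duration, refresh func()) <-chan struct{} {
	if ev.Source != "workflow" {
		return nil
	}
	done := make(chan struct{})
	go func() {
		defer close(done)
		time.Sleep(delay) // give the publisher time to release its locks
		refresh()
	}()
	return done
}

func main() {
	refreshed := false
	done := handleToolUpdate(toolUpdateEvent{Source: "workflow"}, 10*time.Millisecond, func() { refreshed = true })
	fmt.Println("handler returned without waiting for the refresh")
	<-done
	fmt.Println("refreshed:", refreshed)
}
```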

func (*AggregatorServer) ReadResource

func (a *AggregatorServer) ReadResource(ctx context.Context, uri string) (*mcp.ReadResourceResult, error)

ReadResource retrieves the contents of a resource by URI. This resolves the resource URI to its origin server and reads the content.

func (*AggregatorServer) RegisterServer

func (a *AggregatorServer) RegisterServer(ctx context.Context, registration ServerRegistration, client MCPClient) error

RegisterServer registers a new backend MCP server with the aggregator.

This method adds a backend MCP server to the aggregator's registry, making its tools, resources, and prompts available through the aggregated interface. The registration process includes client initialization and capability discovery.

Args:

  • ctx: Context for the registration operation and capability queries
  • registration: Server identification (name, toolPrefix, family)
  • client: MCP client interface for communicating with the backend server

Returns an error if registration fails due to naming conflicts, client issues, or communication problems with the backend server.

func (*AggregatorServer) Start

func (a *AggregatorServer) Start(ctx context.Context) error

Start initializes and starts the aggregator server with all configured transports.

This method performs a comprehensive startup sequence:

  1. Creates and configures the core MCP server with full capabilities
  2. Initializes workflow adapter if config directory is provided
  3. Starts background monitoring of registry updates
  4. Subscribes to tool update events from other muster components
  5. Performs initial capability discovery and registration
  6. Starts the appropriate transport server(s) based on configuration

Transport Support:

  • SSE: Server-Sent Events with HTTP endpoints (/sse, /message)
  • Stdio: Standard input/output for CLI integration
  • Streamable HTTP: HTTP-based streaming protocol (default)

Start may be called only once per running server: a second call, including one with the same context, returns an error indicating the server is already started instead of starting a second instance.

Args:

  • ctx: Context for controlling the server lifecycle and coordinating shutdown

Returns an error if startup fails at any stage, or nil on successful startup.

func (*AggregatorServer) Stop

func (a *AggregatorServer) Stop(ctx context.Context) error

Stop gracefully shuts down the aggregator server and all its components.

This method performs a coordinated shutdown sequence:

  1. Cancels the context to signal shutdown to all background routines
  2. Shuts down all transport servers with a timeout
  3. Waits for background routines to complete
  4. Deregisters all backend servers to clean up connections
  5. Resets internal state to allow for restart

The shutdown process includes:

  • Graceful shutdown of HTTP-based transports with a 5-second timeout
  • Automatic shutdown of stdio transport via context cancellation
  • Cleanup of all registered backend MCP servers
  • Synchronization with background monitoring routines

The method is idempotent - calling it multiple times is safe and will not cause errors or duplicate cleanup operations.

Args:

  • ctx: Context for controlling the shutdown timeout and operations

Returns an error if shutdown encounters issues, though cleanup continues regardless.

func (*AggregatorServer) ToggleToolBlock

func (a *AggregatorServer) ToggleToolBlock(toolName string) error

ToggleToolBlock toggles the blocked status of a specific tool (placeholder implementation).

This method is intended to provide runtime control over individual tool blocking, allowing administrators to override the default denylist behavior for specific tools. Currently, this functionality is not fully implemented and returns an error.

Future Enhancement: The full implementation would maintain a runtime override list that could selectively enable or disable specific tools regardless of the global yolo setting.

Args:

  • toolName: Name of the tool to toggle blocking status for

Returns an error indicating the feature is not yet implemented.

func (*AggregatorServer) UpdateCapabilities

func (a *AggregatorServer) UpdateCapabilities()

UpdateCapabilities provides public access to capability updates for external components.

This method exposes the internal updateCapabilities functionality to allow other muster components (particularly the workflow manager) to trigger capability refreshes when they detect changes in their tool inventory.

The method is thread-safe and can be called concurrently without causing issues. It performs the same comprehensive capability update as the internal method, including cleanup of obsolete items and addition of new capabilities.

Use Cases:

  • Workflow manager triggering updates when workflow definitions change
  • Administrative tools forcing capability refresh
  • Integration testing scenarios requiring capability synchronization

This is a lightweight wrapper around the internal updateCapabilities method.

type AuthInfo

type AuthInfo = mcpserver.AuthInfo

AuthInfo is an alias to the mcpserver AuthInfo type for OAuth authentication. It contains OAuth authentication information extracted from a 401 response. See mcpserver.AuthInfo for detailed field documentation.

type AuthMetrics

type AuthMetrics struct {
	// contains filtered or unexported fields
}

AuthMetrics tracks authentication-related metrics for monitoring and alerting.

This provides visibility into authentication patterns, failures, and potential abuse. Metrics are tracked per-server to enable targeted alerting and debugging.

func NewAuthMetrics

func NewAuthMetrics() *AuthMetrics

NewAuthMetrics creates a new AuthMetrics instance.

func (*AuthMetrics) GetServerMetrics

func (m *AuthMetrics) GetServerMetrics(serverName string) (AuthServerMetricView, bool)

GetServerMetrics returns metrics for a specific server.

func (*AuthMetrics) GetSummary

func (m *AuthMetrics) GetSummary() AuthMetricsSummary

GetSummary returns a summary of all authentication metrics.

func (*AuthMetrics) RecordLoginAttempt

func (m *AuthMetrics) RecordLoginAttempt(serverName, sub string)

RecordLoginAttempt records an authentication login attempt.

Args:

  • serverName: Name of the server being authenticated to
  • sub: The user subject making the attempt (for logging, truncated)

func (*AuthMetrics) RecordLoginFailure

func (m *AuthMetrics) RecordLoginFailure(serverName, sub, reason string)

RecordLoginFailure records a failed authentication attempt.

Args:

  • serverName: Name of the server where authentication failed
  • sub: The user subject that failed (for logging, truncated)
  • reason: The reason for the failure

func (*AuthMetrics) RecordLoginSuccess

func (m *AuthMetrics) RecordLoginSuccess(serverName, sub string)

RecordLoginSuccess records a successful authentication.

Args:

  • serverName: Name of the server authenticated to
  • sub: The user subject that authenticated (for logging, truncated)

func (*AuthMetrics) RecordLogoutAttempt

func (m *AuthMetrics) RecordLogoutAttempt(serverName, sub string)

RecordLogoutAttempt records a logout attempt.

Args:

  • serverName: Name of the server being logged out from
  • sub: The user subject making the attempt (for logging, truncated)

func (*AuthMetrics) RecordLogoutSuccess

func (m *AuthMetrics) RecordLogoutSuccess(serverName, sub string)

RecordLogoutSuccess records a successful logout.

Args:

  • serverName: Name of the server logged out from
  • sub: The user subject that logged out (for logging, truncated)

func (*AuthMetrics) RecordRateLimitBlock

func (m *AuthMetrics) RecordRateLimitBlock(serverName, sub string)

RecordRateLimitBlock records when a user was rate limited.

Args:

  • serverName: Name of the server the user was trying to authenticate to
  • sub: The user subject that was blocked (for logging, truncated)

type AuthMetricsSummary

type AuthMetricsSummary struct {
	TotalLoginAttempts   int64                  `json:"total_login_attempts"`
	TotalLoginSuccesses  int64                  `json:"total_login_successes"`
	TotalLoginFailures   int64                  `json:"total_login_failures"`
	TotalRateLimitBlocks int64                  `json:"total_rate_limit_blocks"`
	TotalLogoutAttempts  int64                  `json:"total_logout_attempts"`
	TotalLogoutSuccesses int64                  `json:"total_logout_successes"`
	PerServerMetrics     []AuthServerMetricView `json:"per_server_metrics"`
}

AuthMetricsSummary provides a summary of authentication metrics.
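
To illustrate the JSON shape implied by the struct tags, the sketch below redeclares AuthMetricsSummary and AuthServerMetricView locally (copied from this page so the example compiles on its own) and marshals a sample value. The encodeSummary helper and the sample numbers are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Local copies of the documented types, so the sketch is self-contained.
type AuthServerMetricView struct {
	ServerName      string    `json:"server_name"`
	LoginAttempts   int64     `json:"login_attempts"`
	LoginSuccesses  int64     `json:"login_successes"`
	LoginFailures   int64     `json:"login_failures"`
	RateLimitBlocks int64     `json:"rate_limit_blocks"`
	LogoutAttempts  int64     `json:"logout_attempts"`
	LogoutSuccesses int64     `json:"logout_successes"`
	LastAttemptAt   time.Time `json:"last_attempt_at,omitempty"`
	LastSuccessAt   time.Time `json:"last_success_at,omitempty"`
	LastFailureAt   time.Time `json:"last_failure_at,omitempty"`
}

type AuthMetricsSummary struct {
	TotalLoginAttempts   int64                  `json:"total_login_attempts"`
	TotalLoginSuccesses  int64                  `json:"total_login_successes"`
	TotalLoginFailures   int64                  `json:"total_login_failures"`
	TotalRateLimitBlocks int64                  `json:"total_rate_limit_blocks"`
	TotalLogoutAttempts  int64                  `json:"total_logout_attempts"`
	TotalLogoutSuccesses int64                  `json:"total_logout_successes"`
	PerServerMetrics     []AuthServerMetricView `json:"per_server_metrics"`
}

// encodeSummary is a hypothetical helper that renders a summary as JSON.
func encodeSummary(s AuthMetricsSummary) (string, error) {
	b, err := json.Marshal(s)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	summary := AuthMetricsSummary{
		TotalLoginAttempts:  3,
		TotalLoginSuccesses: 2,
		TotalLoginFailures:  1,
		PerServerMetrics: []AuthServerMetricView{{
			ServerName:     "github",
			LoginAttempts:  3,
			LoginSuccesses: 2,
			LoginFailures:  1,
			LastAttemptAt:  time.Date(2026, time.June, 23, 12, 0, 0, 0, time.UTC),
		}},
	}
	out, err := encodeSummary(summary)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(out)
}
```

One detail worth knowing when consuming this output: the omitempty option in encoding/json does not omit struct values such as time.Time, so timestamps that were never set are emitted as the zero time rather than dropped.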

type AuthRateLimiter

type AuthRateLimiter struct {
	// contains filtered or unexported fields
}

AuthRateLimiter provides per-user rate limiting for authentication operations. This prevents OAuth flow abuse by limiting the number of auth attempts per user.

Rate limiting is implemented using a sliding window approach:

  • Each user can make at most MaxAttempts auth attempts within the time window (see AuthRateLimiterConfig)
  • Attempts are tracked per user (sub claim), not globally
  • Old attempts are automatically cleaned up via a background goroutine

Callers MUST call Stop() when done to prevent goroutine leaks.
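
The sliding window approach can be sketched as follows. This is a simplified, hypothetical limiter and not the AuthRateLimiter implementation: it takes the current time as an argument to stay deterministic, prunes old attempts inline instead of in a background goroutine, and mirrors the documented rule that a rejected attempt is not recorded.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// slidingWindowLimiter is a hypothetical per-user sliding window limiter.
type slidingWindowLimiter struct {
	mu          sync.Mutex
	maxAttempts int
	window      time.Duration
	attempts    map[string][]time.Time // recorded attempt times per user
}

func newSlidingWindowLimiter(maxAttempts int, window time.Duration) *slidingWindowLimiter {
	return &slidingWindowLimiter{
		maxAttempts: maxAttempts,
		window:      window,
		attempts:    make(map[string][]time.Time),
	}
}

// allowAt reports whether userID may make an attempt at time now. Allowed
// attempts are recorded; rejected attempts are not.
func (l *slidingWindowLimiter) allowAt(userID string, now time.Time) bool {
	l.mu.Lock()
	defer l.mu.Unlock()

	// Keep only the attempts that still fall inside the window.
	cutoff := now.Add(-l.window)
	kept := l.attempts[userID][:0]
	for _, t := range l.attempts[userID] {
		if t.After(cutoff) {
			kept = append(kept, t)
		}
	}
	if len(kept) >= l.maxAttempts {
		l.attempts[userID] = kept
		return false
	}
	l.attempts[userID] = append(kept, now)
	return true
}

// demo replays four attempts by one user against a limit of 2 per minute,
// at 0s, 10s, 20s and 65s after an arbitrary start time.
func demo() []bool {
	l := newSlidingWindowLimiter(2, time.Minute)
	start := time.Date(2026, time.January, 1, 0, 0, 0, 0, time.UTC)
	offsets := []time.Duration{0, 10 * time.Second, 20 * time.Second, 65 * time.Second}
	results := make([]bool, 0, len(offsets))
	for _, off := range offsets {
		results = append(results, l.allowAt("user-1", start.Add(off)))
	}
	return results
}

func main() {
	fmt.Println(demo()) // prints [true true false true]
}
```

The third attempt is rejected because two attempts already sit inside the window; by the fourth, the first attempt has aged out, so capacity is available again.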

func NewAuthRateLimiter

func NewAuthRateLimiter(config AuthRateLimiterConfig) *AuthRateLimiter

NewAuthRateLimiter creates a new rate limiter with the given configuration. It starts a background goroutine that periodically calls Cleanup() to remove stale entries. Callers MUST call Stop() when done to prevent goroutine leaks.

func (*AuthRateLimiter) Allow

func (rl *AuthRateLimiter) Allow(userID, serverName string) bool

Allow checks if an auth attempt is allowed for the given user. If allowed, the attempt is recorded and true is returned. If rate limited, false is returned and the attempt is NOT recorded.

Args:

  • userID: The user making the auth attempt (sub claim or session ID fallback)
  • serverName: The server being authenticated to (for logging)

Returns true if the attempt is allowed, false if rate limited.

func (*AuthRateLimiter) Cleanup

func (rl *AuthRateLimiter) Cleanup()

Cleanup removes stale entries from the rate limiter. This is called periodically by the background cleanup goroutine.

func (*AuthRateLimiter) RemainingAttempts

func (rl *AuthRateLimiter) RemainingAttempts(userID string) int

RemainingAttempts returns the number of remaining auth attempts for a user.

func (*AuthRateLimiter) Reset

func (rl *AuthRateLimiter) Reset(userID string)

Reset clears all rate limiting state for a user. This can be called after successful authentication to reset the counter.

func (*AuthRateLimiter) Stop added in v0.1.34

func (rl *AuthRateLimiter) Stop()

Stop terminates the background cleanup goroutine. It is safe to call Stop multiple times.

type AuthRateLimiterConfig

type AuthRateLimiterConfig struct {
	// MaxAttempts is the maximum number of auth attempts per user within the window.
	// Default: 10 attempts
	MaxAttempts int

	// Window is the time window for rate limiting.
	// Default: 1 minute
	Window time.Duration
}

AuthRateLimiterConfig holds configuration for the rate limiter.

func DefaultAuthRateLimiterConfig

func DefaultAuthRateLimiterConfig() AuthRateLimiterConfig

DefaultAuthRateLimiterConfig returns the default rate limiter configuration.

type AuthServerMetricView

type AuthServerMetricView struct {
	ServerName      string    `json:"server_name"`
	LoginAttempts   int64     `json:"login_attempts"`
	LoginSuccesses  int64     `json:"login_successes"`
	LoginFailures   int64     `json:"login_failures"`
	RateLimitBlocks int64     `json:"rate_limit_blocks"`
	LogoutAttempts  int64     `json:"logout_attempts"`
	LogoutSuccesses int64     `json:"logout_successes"`
	LastAttemptAt   time.Time `json:"last_attempt_at,omitempty"`
	LastSuccessAt   time.Time `json:"last_success_at,omitempty"`
	LastFailureAt   time.Time `json:"last_failure_at,omitempty"`
}

AuthServerMetricView is a read-only view of server-specific auth metrics.

type AuthToolProvider

type AuthToolProvider struct {
	// contains filtered or unexported fields
}

AuthToolProvider provides core authentication tools for the aggregator. These tools allow users to authenticate to OAuth-protected MCP servers through `core_auth_login` and `core_auth_logout` commands.

This implements ADR-008: Authentication is a muster platform concern, not an MCP server concern. Instead of synthetic per-server authenticate tools, we use core tools that take a server parameter.

func NewAuthToolProvider

func NewAuthToolProvider(aggregator *AggregatorServer) *AuthToolProvider

NewAuthToolProvider creates a new authentication tool provider.

func (*AuthToolProvider) ExecuteTool

func (p *AuthToolProvider) ExecuteTool(ctx context.Context, toolName string, args map[string]interface{}) (*api.CallToolResult, error)

ExecuteTool executes an authentication tool by name.

type ConnectionResult added in v0.1.50

type ConnectionResult struct {
	// ServerName is the name of the server that was connected
	ServerName string
	// ToolCount is the number of tools available from the server
	ToolCount int
	// ResourceCount is the number of resources available from the server
	ResourceCount int
	// PromptCount is the number of prompts available from the server
	PromptCount int
	// Client is the live MCP client. The caller owns its lifecycle and must
	// either pool it for reuse or close it when done.
	Client MCPClient
	// TokenExpiry records when the client's bearer token expires. Zero means
	// no expiry tracking (e.g., token forwarding clients). Callers should pass
	// this to SessionConnectionPool.PutWithExpiry for proactive refresh.
	TokenExpiry time.Time
	// ExchangedToken is the RFC 8693 exchanged bearer this client sends
	// downstream. Populated only by the token-exchange path — forward-token
	// and DynamicAuth connections leave it empty. Callers should pass it to
	// SessionConnectionPool.SetExchangedToken so the admin UI can surface it.
	ExchangedToken string
}

ConnectionResult contains the result of establishing a session connection. This is returned by establishConnection and used by callers to format their specific result types (api.CallToolResult or mcp.CallToolResult).

The Client field holds the live, initialized MCP client. Ownership is transferred to the caller, who must either pool or close it.

func EstablishConnectionWithLocalMint added in v0.13.0

func EstablishConnectionWithLocalMint(
	ctx context.Context,
	a *AggregatorServer,
	serverInfo *ServerInfo,
	musterIssuer string,
) (*ConnectionResult, error)

EstablishConnectionWithLocalMint connects to a localMint server to discover its capabilities. Discovery mints an M2M token from the connecting identity's session token. In the agent topology this is the agent's own SA token, which WorkloadAudiences authorizes; a human's on-behalf-of identity arrives only per call_tool, via the actor header. The returned client's header func re-mints on every request from the live request context, so per-call execution still performs the full M2M-or-delegation mint and the discovery token is not reused.

func EstablishConnectionWithTokenExchange added in v0.1.50

func EstablishConnectionWithTokenExchange(
	ctx context.Context,
	a *AggregatorServer,
	serverInfo *ServerInfo,
	musterIssuer string,
) (*ConnectionResult, error)

EstablishConnectionWithTokenExchange attempts to establish a connection using RFC 8693 Token Exchange for cross-cluster SSO. This is used when an MCPServer has tokenExchange configured.

The function:

  1. Gets the user's ID token from muster's OAuth session
  2. Extracts the user ID (sub claim) from the token
  3. Exchanges it for a token valid on the remote cluster's Dex
  4. If successful, populates the CapabilityStore and registers tools

Both sessionID and sub are extracted from ctx (set by OAuth middleware).

Args:

  • ctx: Context for the operation (must contain sessionID and sub)
  • a: The aggregator server instance
  • serverInfo: The server info containing URL and auth config
  • musterIssuer: The issuer URL of muster's OAuth provider (used to get the ID token)

Returns:

  • *ConnectionResult: The connection result if successful
  • error: The error if connection failed
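
For orientation, the exchange in step 3 corresponds to an RFC 8693 token request. The sketch below builds such a request with the standard library only; it is not muster's implementation. The newTokenExchangeRequest helper, the endpoint URL, and the token value are hypothetical, and a real IdP may require additional parameters such as client authentication, an audience, or provider-specific fields.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
	"strings"
)

// newTokenExchangeRequest builds an RFC 8693 token exchange request that
// presents idToken as the subject token. The grant type and token type URNs
// are the values defined by RFC 8693.
func newTokenExchangeRequest(tokenEndpoint, idToken string) (*http.Request, error) {
	form := url.Values{}
	form.Set("grant_type", "urn:ietf:params:oauth:grant-type:token-exchange")
	form.Set("subject_token", idToken)
	form.Set("subject_token_type", "urn:ietf:params:oauth:token-type:id_token")

	req, err := http.NewRequest(http.MethodPost, tokenEndpoint, strings.NewReader(form.Encode()))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
	return req, nil
}

func main() {
	// Hypothetical endpoint and token, for illustration only.
	req, err := newTokenExchangeRequest("https://dex.remote.example.com/token", "example-id-token")
	if err != nil {
		fmt.Println("building request failed:", err)
		return
	}
	fmt.Println(req.Method, req.URL)
	fmt.Println(req.Header.Get("Content-Type"))
}
```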

func EstablishConnectionWithTokenForwarding added in v0.1.50

func EstablishConnectionWithTokenForwarding(
	ctx context.Context,
	a *AggregatorServer,
	serverInfo *ServerInfo,
	musterIssuer string,
) (*ConnectionResult, error)

EstablishConnectionWithTokenForwarding attempts to establish a connection using ID token forwarding for SSO. This is used when an MCPServer has forwardToken: true.

The function:

  1. Gets the user's ID token from muster's OAuth session
  2. Forwards it to the downstream MCP server
  3. If successful, populates the CapabilityStore and registers tools

Both sessionID and sub are extracted from ctx (set by OAuth middleware).

Args:

  • ctx: Context for the operation (must contain sessionID and sub)
  • a: The aggregator server instance
  • serverInfo: The server info containing URL and auth config
  • musterIssuer: The issuer URL of muster's OAuth provider (used to get the ID token)

Returns:

  • *ConnectionResult: The connection result if successful
  • error: The error if connection failed
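
Forwarding the ID token amounts to sending it as an Authorization: Bearer header on downstream requests. One common way to do that in Go is a wrapping http.RoundTripper, sketched below. The bearerTransport and roundTripFunc types and the capturedAuthHeader helper are hypothetical, and the base transport is a fake so the example runs without a network; this is not how muster's MCP client is necessarily wired.

```go
package main

import (
	"fmt"
	"net/http"
)

// bearerTransport injects a bearer token into every outgoing request.
type bearerTransport struct {
	base  http.RoundTripper
	token func() string // called per request, so a refreshed token is picked up
}

func (t *bearerTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	// RoundTrippers must not modify the caller's request, so work on a clone.
	clone := req.Clone(req.Context())
	clone.Header.Set("Authorization", "Bearer "+t.token())
	return t.base.RoundTrip(clone)
}

// roundTripFunc adapts a function to http.RoundTripper (used as a fake backend).
type roundTripFunc func(*http.Request) (*http.Response, error)

func (f roundTripFunc) RoundTrip(r *http.Request) (*http.Response, error) { return f(r) }

// capturedAuthHeader sends one request through bearerTransport and returns
// the Authorization header the fake downstream server observed.
func capturedAuthHeader(idToken string) (string, error) {
	var seen string
	fake := roundTripFunc(func(r *http.Request) (*http.Response, error) {
		seen = r.Header.Get("Authorization")
		return &http.Response{
			StatusCode: http.StatusOK,
			Header:     make(http.Header),
			Body:       http.NoBody,
			Request:    r,
		}, nil
	})
	client := &http.Client{Transport: &bearerTransport{
		base:  fake,
		token: func() string { return idToken },
	}}
	resp, err := client.Get("http://downstream.invalid/mcp")
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	return seen, nil
}

func main() {
	header, err := capturedAuthHeader("example-id-token")
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	fmt.Println(header) // prints Bearer example-id-token
}
```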

func (*ConnectionResult) FormatAsAPIResult added in v0.1.50

func (r *ConnectionResult) FormatAsAPIResult() *api.CallToolResult

FormatAsAPIResult formats the connection result as an api.CallToolResult. Used by AuthToolProvider.tryConnectWithToken.

func (*ConnectionResult) FormatAsMCPResult added in v0.1.50

func (r *ConnectionResult) FormatAsMCPResult() *mcp.CallToolResult

FormatAsMCPResult formats the connection result as an mcp.CallToolResult. Used by AggregatorServer.tryConnectWithToken.

type EventHandler

type EventHandler struct {
	// contains filtered or unexported fields
}

EventHandler manages automatic MCP server registration based on service lifecycle events.

The event handler bridges the gap between the muster service orchestrator and the aggregator by listening for service state changes and automatically registering or deregistering MCP servers as they become healthy or unhealthy.

Key responsibilities:

  • Subscribe to orchestrator service state changes
  • Filter events to only process MCP service changes
  • Automatically register healthy running MCP servers
  • Automatically deregister unhealthy or stopped MCP servers
  • Skip global registration for SSO-based servers (handled at session level)
  • Maintain separation of concerns through callback functions

The handler operates asynchronously and is designed to be resilient to temporary failures in the registration process.
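
The callback-driven design can be sketched as a single event-processing step. All types here are hypothetical simplifications: serviceEvent stands in for the orchestrator's state change, handlerCallbacks carries the register, deregister, and SSO callbacks, and the auth_required callback is left out because its exact effect is not described on this page.

```go
package main

import (
	"context"
	"fmt"
)

// serviceEvent is a hypothetical, reduced service state change.
type serviceEvent struct {
	Name    string
	IsMCP   bool // whether the service is an MCP server
	Running bool
	Healthy bool
}

// handlerCallbacks holds the callbacks the handler delegates to.
type handlerCallbacks struct {
	register   func(context.Context, string) error
	deregister func(string) error
	isSSOBased func(string) bool // optional; nil means "never SSO-based"
}

// process applies one event and returns a label for the action taken.
func (h handlerCallbacks) process(ctx context.Context, ev serviceEvent) (string, error) {
	if !ev.IsMCP {
		return "ignored", nil // only MCP services are relevant
	}
	if ev.Running && ev.Healthy {
		if h.isSSOBased != nil && h.isSSOBased(ev.Name) {
			return "skipped", nil // SSO servers are connected per session instead
		}
		return "registered", h.register(ctx, ev.Name)
	}
	return "deregistered", h.deregister(ev.Name)
}

// demo runs a fixed sequence of events through the handler.
func demo() []string {
	h := handlerCallbacks{
		register:   func(context.Context, string) error { return nil },
		deregister: func(string) error { return nil },
		isSSOBased: func(name string) bool { return name == "sso-server" },
	}
	events := []serviceEvent{
		{Name: "docs", IsMCP: false, Running: true, Healthy: true},
		{Name: "github", IsMCP: true, Running: true, Healthy: true},
		{Name: "sso-server", IsMCP: true, Running: true, Healthy: true},
		{Name: "github", IsMCP: true, Running: false, Healthy: false},
	}
	var actions []string
	for _, ev := range events {
		action, err := h.process(context.Background(), ev)
		if err != nil {
			action = "error: " + err.Error()
		}
		actions = append(actions, ev.Name+":"+action)
	}
	return actions
}

func main() {
	for _, a := range demo() {
		fmt.Println(a)
	}
}
```

Keeping the registration logic behind callbacks means the event-processing step can be exercised in isolation, as demo does here with stub functions.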

func NewEventHandler

func NewEventHandler(
	orchestratorAPI api.OrchestratorAPI,
	registerFunc func(context.Context, string) error,
	deregisterFunc func(string) error,
	isServerAuthRequired func(string) bool,
	isServerSSOBased func(string) bool,
) *EventHandler

NewEventHandler creates a new event handler with the specified dependencies and callbacks.

The event handler uses callback functions to maintain loose coupling with the aggregator manager. This design allows the handler to focus solely on event processing while delegating the actual registration logic to the caller.

Args:

  • orchestratorAPI: Interface for subscribing to service state changes
  • registerFunc: Callback function to register a server by name
  • deregisterFunc: Callback function to deregister a server by name
  • isServerAuthRequired: Optional callback to check if server is in auth_required state
  • isServerSSOBased: Optional callback to check if server uses SSO token forwarding/exchange

Returns a configured but not yet started event handler.

func (*EventHandler) IsRunning

func (eh *EventHandler) IsRunning() bool

IsRunning returns whether the event handler is currently active.

This method is thread-safe and can be used to check the handler's status for monitoring or debugging purposes.

Returns true if the event handler is currently processing events.

func (*EventHandler) Start

func (eh *EventHandler) Start(ctx context.Context) error

Start begins listening for orchestrator events and processing them asynchronously.

This method subscribes to service state change events from the orchestrator and starts a background goroutine to process them. The method is idempotent - calling it multiple times has no additional effect.

The event processing continues until the provided context is cancelled or the Stop method is called.

Args:

  • ctx: Context for controlling the event handler lifecycle

Returns nil on successful startup. The method does not wait for event processing to complete, as that happens asynchronously.

func (*EventHandler) Stop

func (eh *EventHandler) Stop() error

Stop gracefully shuts down the event handler and waits for cleanup to complete.

This method cancels the event processing context and waits for the background goroutine to finish processing any in-flight events. The method is idempotent and can be called multiple times safely.

Returns nil after successful shutdown.

type EventType

type EventType int

EventType represents the type of registration event. This enumeration defines the possible state changes for server registration.

const (
	// EventRegister indicates a server is being registered with the aggregator
	EventRegister EventType = iota

	// EventDeregister indicates a server is being removed from the aggregator
	EventDeregister
)

type MCPClient

type MCPClient interface {
	// Initialize establishes the connection and performs protocol handshake.
	// This must be called before any other operations on the client.
	// Returns an error if the handshake fails or connection cannot be established.
	Initialize(ctx context.Context) error

	// Close cleanly shuts down the client connection.
	// This should be called when the client is no longer needed
	// to ensure proper cleanup of resources.
	Close() error

	// ListTools returns all available tools from the server.
	// This is used to discover what capabilities the server provides.
	ListTools(ctx context.Context) ([]mcp.Tool, error)

	// CallTool executes a specific tool and returns the result.
	// The name arg should match one of the tools returned by ListTools.
	// The args arg contains the tool-specific arguments as key-value pairs.
	CallTool(ctx context.Context, name string, args map[string]interface{}) (*mcp.CallToolResult, error)

	// ListResources returns all available resources from the server.
	// Resources are data sources that can be read by the client.
	ListResources(ctx context.Context) ([]mcp.Resource, error)

	// ReadResource retrieves a specific resource by its URI.
	// The URI should match one of the resources returned by ListResources.
	ReadResource(ctx context.Context, uri string) (*mcp.ReadResourceResult, error)

	// ListPrompts returns all available prompts from the server.
	// Prompts are templates or structured text that can be retrieved and used.
	ListPrompts(ctx context.Context) ([]mcp.Prompt, error)

	// GetPrompt retrieves a specific prompt with optional arguments.
	// The name should match one of the prompts returned by ListPrompts.
	// Args can be used to customize the prompt content.
	GetPrompt(ctx context.Context, name string, args map[string]interface{}) (*mcp.GetPromptResult, error)

	// Ping checks if the server is responsive.
	// This is used for health checking and connection validation.
	Ping(ctx context.Context) error

	// OnNotification registers a handler for server-pushed JSON-RPC
	// notifications (e.g. notifications/tools/list_changed).
	OnNotification(handler func(mcp.JSONRPCNotification))
}

MCPClient defines the interface for MCP client operations. This interface abstracts the underlying MCP client implementation and will be implemented by the client in the mcpserver package. It provides all necessary operations for interacting with MCP servers including initialization, tool execution, and resource/prompt access.

type OAuthProxyConfig

type OAuthProxyConfig struct {
	// Enabled controls whether OAuth proxy functionality is active.
	Enabled bool

	// PublicURL is the publicly accessible URL of the Muster Server.
	// This is used to construct OAuth callback URLs.
	PublicURL string

	// ClientID is the OAuth client identifier (CIMD URL).
	ClientID string

	// CallbackPath is the path for the OAuth callback endpoint.
	CallbackPath string

	// ExtraCAFile mirrors the process-level --extra-ca-file flag for the
	// OAuth/token-exchange layer's internal-deployment heuristic.
	ExtraCAFile string
}

OAuthProxyConfig holds OAuth proxy configuration for the aggregator.

type OAuthServerConfig

type OAuthServerConfig struct {
	// Enabled controls whether OAuth server protection is active.
	Enabled bool

	// Config holds the full OAuth server configuration.
	// This is populated from the config.OAuthServerConfig during initialization.
	Config interface{}
}

OAuthServerConfig holds OAuth server configuration for protecting the Muster Server. This is a simplified configuration that references the full config from the config package.

type PendingAuthRegistration added in v0.1.180

type PendingAuthRegistration struct {
	ServerRegistration

	// URL is the remote server endpoint.
	URL string

	// AuthInfo carries the OAuth metadata returned in the 401 response.
	AuthInfo *AuthInfo

	// AuthConfig describes how to forward or exchange tokens, when set.
	AuthConfig *api.MCPServerAuth
}

PendingAuthRegistration carries the configuration needed to register a backend MCP server that is reachable but requires OAuth authentication before its tools can be exposed. AuthConfig may be nil; an empty AuthConfig is equivalent to a nil one — both mark the server as requiring per-session authentication.

type PooledInfo added in v0.1.123

type PooledInfo struct {
	ServerName     string
	CreatedAt      time.Time
	LastUsedAt     time.Time
	TokenExpiry    time.Time // Zero if no tracked expiry.
	ExchangedToken string    // Empty for forward-token / unauthenticated clients.
}

PooledInfo is a read-only snapshot of a pool entry's metadata. Used by the admin UI to introspect live connections without exposing the MCP client.

type ProtectedResourceMetadata

type ProtectedResourceMetadata struct {
	// Issuer is the authorization server URL
	Issuer string
	// Scope is the space-separated list of required scopes
	Scope string
	// Resource is the canonical resource URI per RFC 9728 §3.2 (REQUIRED in
	// the spec but omitted by some real backends — log + accept on absence).
	// Used by RFC 8707 token-binding consumers downstream.
	Resource string
}

ProtectedResourceMetadata contains OAuth information discovered from the /.well-known/oauth-protected-resource endpoint.

type RegistrationEvent

type RegistrationEvent struct {
	// Type indicates whether this is a registration or deregistration event
	Type EventType

	// ServerName is the unique identifier of the server involved in the event
	ServerName string

	// Client is the MCP client associated with the server (may be nil for deregistration)
	Client MCPClient
}

RegistrationEvent represents a server registration or deregistration event. These events are used internally to coordinate between different components when servers are added or removed from the aggregator.

type ServerInfo

type ServerInfo struct {
	// Name is the unique identifier for this server within the aggregator
	Name string

	// Namespace is the Kubernetes namespace for this server.
	// This is used for event emission and resource references.
	// Defaults to "default" if not specified.
	Namespace string

	// Client is the MCP client instance used to communicate with the server
	Client MCPClient

	// LastUpdate tracks when the server information was last refreshed
	LastUpdate time.Time

	// ToolPrefix is the configured prefix for tools from this server.
	// This is used for name collision resolution.
	ToolPrefix string

	// Family, when set, declares that this server is an instance of a
	// family of equivalent servers. Tools from all servers in the same family
	// are exposed as {musterPrefix}_{family.Name}_{toolName} with a required
	// parameter named by family.InstanceArg selecting the instance.
	Family *api.MCPServerFamily

	// URL is the server endpoint URL (for remote servers)
	URL string

	// AuthInfo contains OAuth information if authentication is required.
	// This is populated when a 401 is received during initialization.
	AuthInfo *AuthInfo

	// AuthConfig contains the authentication configuration for this server.
	// This is used to determine token forwarding behavior for SSO.
	// Immutable after registration: set once by RegisterPendingAuth
	// and never modified, so RequiresSessionAuth() is safe without locking.
	AuthConfig *api.MCPServerAuth

	Tools     []mcp.Tool     // Cached list of available tools
	Resources []mcp.Resource // Cached list of available resources
	Prompts   []mcp.Prompt   // Cached list of available prompts
	// contains filtered or unexported fields
}

ServerInfo contains information about a registered MCP server. This structure maintains both the connection details and cached capabilities for efficient access. It is thread-safe for concurrent access to cached data.

func (*ServerInfo) GetNamespace

func (s *ServerInfo) GetNamespace() string

GetNamespace returns the namespace for this server, defaulting to metav1.NamespaceDefault when no namespace is set.

func (*ServerInfo) GetStatus added in v0.1.72

func (s *ServerInfo) GetStatus() api.ServiceState

GetStatus returns the current service state from the canonical service registry. Returns api.StateUnknown if the service is not found in the registry.

func (*ServerInfo) IsConnected

func (s *ServerInfo) IsConnected() bool

IsConnected reports whether the server is in an active (running/connected) state. Falls back to checking for a live client when the service registry has no entry (e.g. servers registered directly via Register).

func (*ServerInfo) RequiresSessionAuth added in v0.1.72

func (s *ServerInfo) RequiresSessionAuth() bool

RequiresSessionAuth reports whether this server uses per-session authentication. This is a permanent property based on the server's auth configuration, set during RegisterPendingAuth and never changed by connection state transitions.

func (*ServerInfo) UpdatePrompts

func (s *ServerInfo) UpdatePrompts(prompts []mcp.Prompt)

UpdatePrompts safely updates the server's cached prompt list. This method is thread-safe and should be used whenever the server's available prompts change.

func (*ServerInfo) UpdateResources

func (s *ServerInfo) UpdateResources(resources []mcp.Resource)

UpdateResources safely updates the server's cached resource list. This method is thread-safe and should be used whenever the server's available resources change.

func (*ServerInfo) UpdateTools

func (s *ServerInfo) UpdateTools(tools []mcp.Tool)

UpdateTools safely updates the server's cached tool list. This method is thread-safe and should be used whenever the server's available tools change.

type ServerRegistration added in v0.1.180

type ServerRegistration struct {
	// Name is the unique identifier for the server within the aggregator.
	Name string

	// ToolPrefix is the per-server tool prefix used when Family is nil.
	// Pattern: {musterPrefix}_{toolPrefix-or-name}_{toolName}.
	ToolPrefix string

	// Family declares that this server is an instance of a family of
	// equivalent servers. When set, tools are exposed as
	// {musterPrefix}_{family.Name}_{toolName} with a required parameter
	// named by family.InstanceArg.
	Family *api.MCPServerFamily
}

ServerRegistration carries the configuration needed to register a backend MCP server with the aggregator. Use this struct instead of passing fields positionally to avoid silent swaps between same-typed identifiers (toolPrefix, family, name) that the type system would otherwise tolerate.

type ServerRegistry

type ServerRegistry struct {
	// contains filtered or unexported fields
}

ServerRegistry manages the collection of registered MCP servers and their capabilities.

The registry maintains a thread-safe mapping of server names to their information, including cached capabilities (tools, resources, prompts) and connection status. It applies deterministic prefixing ({musterPrefix}_{serverPrefix}_{originalName}) and maintains a reverse lookup map for routing requests to the correct backend.

Servers that declare spec.family share their exposed name space: every server in a family advertises tools as {musterPrefix}_{family}_{toolName} and the aggregator injects a required "server" enum parameter so callers select which instance handles the call. The "server" parameter is always required when family is set, even for single-instance families, so skills written against the family name remain stable when instances are added or removed.

Key responsibilities:

  • Server lifecycle management (registration/deregistration)
  • Capability caching for performance
  • Deterministic name prefixing and reverse resolution
  • Family-based grouping with explicit instance routing
  • Thread-safe access to server information
  • Update notifications for capability changes

func NewServerRegistry

func NewServerRegistry(musterPrefix string) *ServerRegistry

NewServerRegistry creates a new server registry with the specified global prefix.

The registry uses the musterPrefix to ensure all exposed capabilities are prefixed appropriately to distinguish them from other MCP tools in the environment.

Args:

  • musterPrefix: Global prefix applied to all aggregated capabilities (default: "x")

Returns a new, empty server registry ready for use.

func (*ServerRegistry) Deregister

func (r *ServerRegistry) Deregister(name string) error

Deregister removes an MCP server from the registry and cleans up its resources.

This method safely removes a server from the registry, closes its client connection, and notifies subscribers of the change. All tools, resources, and prompts provided by the server will no longer be available through the aggregator.

The method is thread-safe and can be called concurrently.

Args:

  • name: Unique identifier of the server to remove

Returns an error if the server is not found in the registry.

func (*ServerRegistry) ExposedPromptName added in v0.1.69

func (r *ServerRegistry) ExposedPromptName(serverName, promptName string) string

ExposedPromptName returns the fully prefixed name for a prompt and records the reverse mapping for later resolution.

func (*ServerRegistry) ExposedResourceURI added in v0.1.69

func (r *ServerRegistry) ExposedResourceURI(serverName, resourceURI string) string

ExposedResourceURI returns the fully prefixed URI for a resource and records the reverse mapping for later resolution. URIs with a scheme (e.g. http://) are returned unchanged.

func (*ServerRegistry) ExposedToolName added in v0.1.69

func (r *ServerRegistry) ExposedToolName(serverName, toolName string) string

ExposedToolName returns the fully prefixed name for a tool and records the reverse mapping for later resolution.

Pattern: {musterPrefix}_{serverPrefix}_{originalName}

func (*ServerRegistry) FamilyInstanceArgFor added in v0.1.180

func (r *ServerRegistry) FamilyInstanceArgFor(exposedName string) string

FamilyInstanceArgFor returns the required instance-selector arg name for a family-grouped exposed tool, or an empty string if the name is unknown or not family-grouped.

func (*ServerRegistry) GetAllPrompts

func (r *ServerRegistry) GetAllPrompts() []mcp.Prompt

GetAllPrompts returns a consolidated list of all prompts from all connected servers.

This method aggregates prompts from all registered and connected servers, applying intelligent prefixing to avoid name conflicts. Only servers that are currently connected contribute their prompts to the result.

Returns a slice of MCP prompts ready for exposure through the aggregator.

func (*ServerRegistry) GetAllPromptsForSession

func (r *ServerRegistry) GetAllPromptsForSession(ctx context.Context, store oauthstore.CapabilityStore, sessionID string) []mcp.Prompt

GetAllPromptsForSession returns the prompts visible to a specific login session.

For OAuth servers, prompts are read from the CapabilityStore. For non-OAuth servers, prompts are read from ServerInfo.Prompts.

func (*ServerRegistry) GetAllResources

func (r *ServerRegistry) GetAllResources() []mcp.Resource

GetAllResources returns a consolidated list of all resources from all connected servers.

This method aggregates resources from all registered and connected servers, applying intelligent prefixing to resource URIs to avoid conflicts. Only servers that are currently connected contribute their resources to the result.

Returns a slice of MCP resources ready for exposure through the aggregator.

func (*ServerRegistry) GetAllResourcesForSession

func (r *ServerRegistry) GetAllResourcesForSession(ctx context.Context, store oauthstore.CapabilityStore, sessionID string) []mcp.Resource

GetAllResourcesForSession returns the resources visible to a specific login session.

For OAuth servers, resources are read from the CapabilityStore. For non-OAuth servers, resources are read from ServerInfo.Resources.

func (*ServerRegistry) GetAllServers

func (r *ServerRegistry) GetAllServers() map[string]*ServerInfo

GetAllServers returns a copy of all registered server information.

This method provides a snapshot of all servers currently registered with the registry, including both connected and disconnected servers. The returned map is a copy to prevent external modifications to the internal state.

Returns a map of server names to their corresponding ServerInfo structures.

func (*ServerRegistry) GetAllTools

func (r *ServerRegistry) GetAllTools() []mcp.Tool

GetAllTools returns a consolidated list of all tools from all connected servers.

Tools from servers that share a non-empty spec.family are grouped under a single exposed name ({musterPrefix}_{family}_{toolName}) with a required "server" enum parameter selecting the providing instance. Tools from servers without a family fall back to per-server prefixing ({musterPrefix}_{serverPrefix}_{originalName}).

Per ADR-008, servers in auth_required state do NOT contribute any tools. Users must use core_auth_login to authenticate before server tools become visible.

Returns a slice of MCP tools ready for exposure through the aggregator.
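The family grouping described above can be pictured with a small, self-contained sketch. It does not use the package's real types: backendTool, exposedTool, and groupTools are hypothetical names introduced only for illustration, and the enum is modelled as a sorted slice of server names rather than a JSON schema.

```go
package main

import (
	"fmt"
	"sort"
)

// backendTool is a hypothetical flattened view of one tool on one server.
type backendTool struct {
	Server string // registered server name
	Family string // empty when the server declares no family
	Prefix string // per-server tool prefix; empty means "use the server name"
	Name   string // original tool name on the backend
}

// exposedTool is what a client would see after grouping.
type exposedTool struct {
	Name      string
	Instances []string // allowed values of the "server" enum; nil for solo tools
}

// groupTools merges tools from servers of the same family under one exposed
// name and keeps per-server prefixing for servers without a family.
func groupTools(musterPrefix string, tools []backendTool) []exposedTool {
	byName := make(map[string]*exposedTool)
	for _, bt := range tools {
		scope := bt.Family
		if scope == "" {
			scope = bt.Prefix
			if scope == "" {
				scope = bt.Server
			}
		}
		name := musterPrefix + "_" + scope + "_" + bt.Name
		entry, ok := byName[name]
		if !ok {
			entry = &exposedTool{Name: name}
			byName[name] = entry
		}
		if bt.Family != "" {
			entry.Instances = append(entry.Instances, bt.Server)
		}
	}
	out := make([]exposedTool, 0, len(byName))
	for _, entry := range byName {
		sort.Strings(entry.Instances)
		out = append(out, *entry)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Name < out[j].Name })
	return out
}

func main() {
	tools := []backendTool{
		{Server: "staging", Family: "kubernetes", Name: "get_pods"},
		{Server: "prod", Family: "kubernetes", Name: "get_pods"},
		{Server: "github", Prefix: "gh", Name: "list_repos"},
	}
	for _, e := range groupTools("x", tools) {
		fmt.Printf("%s instances=%v\n", e.Name, e.Instances)
	}
}
```

In this sketch the two kubernetes servers collapse into one exposed tool whose instance list plays the role of the "server" enum, while the github tool keeps its per-server prefix.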

func (*ServerRegistry) GetAllToolsForSession

func (r *ServerRegistry) GetAllToolsForSession(ctx context.Context, store oauthstore.CapabilityStore, sessionID string) []mcp.Tool

GetAllToolsForSession returns the tools visible to a specific login session.

For OAuth servers (RequiresSessionAuth), tools are read from the CapabilityStore keyed by session ID (token family). For non-OAuth servers, tools are read from ServerInfo.Tools (same as GetAllTools). Family grouping is applied to the resulting union so a user who is authenticated against multiple instances of the same family sees a single deduplicated tool with the "server" enum.

func (*ServerRegistry) GetClient

func (r *ServerRegistry) GetClient(name string) (MCPClient, error)

GetClient returns the MCP client for a specific registered server.

This method provides access to the underlying MCP client for direct communication with a specific server. The client can be used to execute tools, read resources, or retrieve prompts from the server.

Args:

  • name: Unique identifier of the server

Returns the MCP client interface and nil error if successful. Returns nil client and an error if the server is not found or not connected.

func (*ServerRegistry) GetOAuthServers

func (r *ServerRegistry) GetOAuthServers() []*ServerInfo

GetOAuthServers returns a list of servers that require OAuth authentication.

func (*ServerRegistry) GetServerInfo

func (r *ServerRegistry) GetServerInfo(name string) (*ServerInfo, bool)

GetServerInfo returns detailed information about a specific registered server.

This method provides access to the complete ServerInfo structure for a given server, including its client, cached capabilities, and connection status.

Args:

  • name: Unique identifier of the server

Returns the ServerInfo pointer and true if the server exists. Returns nil and false if the server is not found.

func (*ServerRegistry) GetToolServerNames added in v0.1.180

func (r *ServerRegistry) GetToolServerNames(exposedName string) []string

GetToolServerNames returns the set of server names that provide the given exposed tool name. For family-grouped tools the slice has one entry per providing server, sorted; for solo tools it has a single entry. Returns nil for prompts, resources, and unknown or unmapped names.

func (*ServerRegistry) GetUpdateChannel

func (r *ServerRegistry) GetUpdateChannel() <-chan struct{}

GetUpdateChannel returns a read-only channel that receives notifications when the registry is updated.

Subscribers can use this channel to react to server registrations, deregistrations, or capability changes. The channel is buffered with a capacity of 1 to prevent blocking the registry operations.

Returns a receive-only channel for registry update notifications.
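A capacity-1 channel is commonly paired with a non-blocking send so that a burst of changes collapses into one pending signal. The sketch below illustrates that pattern only; the notifier type and its methods are hypothetical, and whether the registry drops or coalesces extra signals in exactly this way is an assumption.

```go
package main

import "fmt"

// notifier is a hypothetical stand-in for the registry's update signalling.
type notifier struct {
	updates chan struct{}
}

func newNotifier() *notifier {
	// Capacity 1: a single pending signal is enough to tell a subscriber
	// to re-read the registry state.
	return &notifier{updates: make(chan struct{}, 1)}
}

// notify never blocks. If a signal is already pending, the new one is dropped.
func (n *notifier) notify() {
	select {
	case n.updates <- struct{}{}:
	default:
	}
}

// pending reports how many signals are waiting to be received (0 or 1).
func (n *notifier) pending() int { return len(n.updates) }

func main() {
	n := newNotifier()
	n.notify()
	n.notify()
	n.notify()
	fmt.Println("pending signals:", n.pending())
	<-n.updates
	fmt.Println("pending after receive:", n.pending())
}
```

A subscriber using this pattern should treat each signal as "something changed" and re-query the registry, rather than expecting one signal per change.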

func (*ServerRegistry) IsFamilyTool added in v0.1.180

func (r *ServerRegistry) IsFamilyTool(exposedName string) bool

IsFamilyTool reports whether the given exposed name is family-grouped (i.e. provided by one or more servers sharing a spec.family). Returns false for solo tools, core tools, and unknown names.

func (*ServerRegistry) IsOAuthServer

func (r *ServerRegistry) IsOAuthServer(serverName string) bool

IsOAuthServer checks if a server requires OAuth authentication.

func (*ServerRegistry) Register

func (r *ServerRegistry) Register(ctx context.Context, registration ServerRegistration, client MCPClient) error

Register adds a new MCP server to the registry and initializes its capabilities.

This method performs the following operations:

  1. Validates that the server name is not already in use
  2. Initializes the MCP client if needed
  3. Queries the server for its initial capabilities
  4. Stores the server information and updates the name tracker
  5. Notifies subscribers of the registry update

The method is thread-safe and can be called concurrently.

Args:

  • ctx: Context for initialization and capability queries
  • registration: Server identification (name, toolPrefix, family)
  • client: MCP client instance for communicating with the server

Returns an error if the server name is already registered, client initialization fails, or the server cannot be reached.

func (*ServerRegistry) RegisterPendingAuth

func (r *ServerRegistry) RegisterPendingAuth(registration PendingAuthRegistration) error

RegisterPendingAuth registers a server that is reachable but requires authentication before its tools can be exposed. Per ADR-008, no synthetic authentication tools are created — users authenticate via core_auth_login.

AuthConfig in the registration may be nil or empty; in either case the server is flagged as requiring per-session authentication.

Registering over an existing entry (pending-auth or active) is an upsert: the entry's URL, ToolPrefix, Family, AuthInfo, and AuthConfig are replaced with the new registration values. This covers two cases:

  • Restart after config update: the hook re-fires with the updated definition while the old pending entry is still present (event_handler skips deregistration for auth-required servers); the upsert propagates the new URL/issuer/prefix.
  • Active→auth-required transition: a previously-connected server restarts into a 401 before the Disconnected event deregisters the old active entry; replacing it now means the event handler will skip deregistration (RequiresSessionAuth will be true), keeping the pending-auth entry intact.

func (*ServerRegistry) ResolvePromptName

func (r *ServerRegistry) ResolvePromptName(exposedName string) (serverName, originalName string, err error)

ResolvePromptName resolves an exposed (prefixed) prompt name back to its source server and original name.

This method is used when a prompt request is received to determine which server should handle the request and what the original prompt name was before prefixing.

Args:

  • exposedName: The prefixed prompt name as seen by clients

Returns the server name, original prompt name, and nil error if resolution succeeds. Returns empty strings and an error if the name cannot be resolved or refers to a different item type.

func (*ServerRegistry) ResolveResourceName

func (r *ServerRegistry) ResolveResourceName(exposedURI string) (serverName, originalURI string, err error)

ResolveResourceName resolves an exposed (prefixed) resource URI back to its source server and original URI.

This method is used when a resource read request is received to determine which server should handle the request and what the original resource URI was before prefixing.

Args:

  • exposedURI: The prefixed resource URI as seen by clients

Returns the server name, original resource URI, and nil error if resolution succeeds. Returns empty strings and an error if the URI cannot be resolved or refers to a different item type.

func (*ServerRegistry) ResolveToolName

func (r *ServerRegistry) ResolveToolName(exposedName string) (serverName, originalName string, err error)

ResolveToolName resolves an exposed (prefixed) tool name back to its source server and original name.

This method is used when a tool call is received to determine which server should handle the request and what the original tool name was before prefixing. For family-grouped tools where multiple servers provide the same exposed name, callers must specify which server via ResolveToolNameForServer instead; this method returns an error noting that the "server" parameter is required.

Args:

  • exposedName: The prefixed tool name as seen by clients

Returns the server name, original tool name, and nil error if resolution succeeds. Returns empty strings and an error if the name cannot be resolved, is ambiguous (family-grouped with multiple providers), or refers to a different item type.

func (*ServerRegistry) ResolveToolNameForServer added in v0.1.180

func (r *ServerRegistry) ResolveToolNameForServer(exposedName, serverName string) (originalName string, err error)

ResolveToolNameForServer resolves a family-grouped or per-server exposed tool name when the caller already specified which backend server to route to (via the injected "server" arg). The returned originalName is the per-server tool name to forward to the backend.

Returns an error if the exposed name is unknown OR if the requested serverName is not among the providers (for family tools) or not the owning server (for solo tools). Crucially, ambiguous family tools resolve here when given an explicit server, instead of falling back to legacy mappings.
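The difference between ResolveToolName and ResolveToolNameForServer can be sketched with a plain reverse-lookup map. Everything here is hypothetical (toolIndex, the error values, and the method names are not the package's API), and treating a single-provider entry as unambiguous is an assumption of the sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// toolIndex is a hypothetical reverse lookup:
// exposed name -> providing server -> original tool name.
type toolIndex map[string]map[string]string

var (
	errUnknownTool    = errors.New("unknown tool")
	errServerRequired = errors.New(`ambiguous tool: the "server" parameter is required`)
	errNotProvider    = errors.New("server does not provide this tool")
)

// resolve succeeds only when exactly one server provides the exposed name.
func (idx toolIndex) resolve(exposed string) (server, original string, err error) {
	providers, ok := idx[exposed]
	if !ok {
		return "", "", errUnknownTool
	}
	if len(providers) != 1 {
		return "", "", errServerRequired
	}
	for s, o := range providers {
		server, original = s, o
	}
	return server, original, nil
}

// resolveForServer resolves the name for an explicitly chosen server.
func (idx toolIndex) resolveForServer(exposed, server string) (string, error) {
	providers, ok := idx[exposed]
	if !ok {
		return "", errUnknownTool
	}
	original, ok := providers[server]
	if !ok {
		return "", errNotProvider
	}
	return original, nil
}

func main() {
	idx := toolIndex{
		"x_gh_list_repos":       {"github": "list_repos"},
		"x_kubernetes_get_pods": {"prod": "get_pods", "staging": "get_pods"},
	}
	server, original, err := idx.resolve("x_gh_list_repos")
	fmt.Println(server, original, err)
	_, _, err = idx.resolve("x_kubernetes_get_pods")
	fmt.Println(err)
	original, err = idx.resolveForServer("x_kubernetes_get_pods", "staging")
	fmt.Println(original, err)
}
```

A caller handling a tool call would typically try the explicit-server path whenever the request carries the instance argument and fall back to the plain lookup otherwise.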

func (*ServerRegistry) SetServerPrefix added in v0.1.69

func (r *ServerRegistry) SetServerPrefix(serverName, prefix string)

SetServerPrefix configures the prefix to use for a specific server. If prefix is empty the server name itself is used.
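As an illustration of the fallback rule, the following sketch builds exposed names from a per-server prefix table. The prefixTable type and its methods are hypothetical and exist only in this example; only the naming pattern and the empty-prefix fallback come from the documentation above.

```go
package main

import "fmt"

// prefixTable is a hypothetical stand-in for the registry's per-server
// prefix configuration; it is not the package's real data structure.
type prefixTable struct {
	musterPrefix string
	prefixes     map[string]string // server name -> configured prefix
}

// setServerPrefix records the prefix for a server. An empty prefix is stored
// as-is and resolved to the server name at lookup time.
func (pt *prefixTable) setServerPrefix(serverName, prefix string) {
	pt.prefixes[serverName] = prefix
}

// exposedToolName builds {musterPrefix}_{serverPrefix}_{originalName},
// falling back to the server name when no prefix is configured.
func (pt *prefixTable) exposedToolName(serverName, toolName string) string {
	prefix := pt.prefixes[serverName]
	if prefix == "" {
		prefix = serverName
	}
	return pt.musterPrefix + "_" + prefix + "_" + toolName
}

func main() {
	table := &prefixTable{musterPrefix: "x", prefixes: map[string]string{}}
	table.setServerPrefix("github", "gh")
	table.setServerPrefix("jira", "")
	fmt.Println(table.exposedToolName("github", "list_repos")) // x_gh_list_repos
	fmt.Println(table.exposedToolName("jira", "get_issue"))    // x_jira_get_issue
}
```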

type SessionConnectionPool added in v0.1.60

type SessionConnectionPool struct {
	// contains filtered or unexported fields
}

SessionConnectionPool maintains a per-(session, server) pool of live MCP clients. It is orthogonal to the CapabilityStore: the store caches what tools exist; the pool caches the live connection used to call them.

For token-forwarding and DynamicAuth clients, token rotation is handled transparently by the headerFunc / MusterTokenStore pattern.

For token-exchange clients, the pool tracks the exchanged token's expiry time. Callers can check IsTokenExpiringSoon to proactively evict and re-exchange before the downstream server returns 401.

A background reaper goroutine periodically evicts entries that have been idle longer than maxAge. Call Stop to shut down the reaper before DrainAll.

All methods are safe for concurrent use.

func NewSessionConnectionPool added in v0.1.60

func NewSessionConnectionPool(maxAge time.Duration) *SessionConnectionPool

NewSessionConnectionPool creates an empty connection pool with a background reaper that evicts entries idle longer than maxAge. The reaper runs every maxAge/2 to balance timely cleanup against low overhead.

Callers must call Stop followed by DrainAll during shutdown.

func (*SessionConnectionPool) DrainAll added in v0.1.60

func (p *SessionConnectionPool) DrainAll()

DrainAll closes and removes every entry in the pool. Intended for use during graceful shutdown.

func (*SessionConnectionPool) Evict added in v0.1.60

func (p *SessionConnectionPool) Evict(sessionID, serverName string)

Evict removes and closes a single pooled entry.

func (*SessionConnectionPool) EvictServer added in v0.1.60

func (p *SessionConnectionPool) EvictServer(serverName string)

EvictServer removes and closes all pooled entries for the given server across every session.

func (*SessionConnectionPool) EvictSession added in v0.1.60

func (p *SessionConnectionPool) EvictSession(sessionID string)

EvictSession removes and closes all pooled entries for the given session.

func (*SessionConnectionPool) Get added in v0.1.60

func (p *SessionConnectionPool) Get(sessionID, serverName string) (MCPClient, bool)

Get returns the pooled client for the given session and server, or false if no entry exists. Each successful Get resets the entry's idle timer so the reaper won't evict actively used connections.

func (*SessionConnectionPool) IsTokenExpired added in v0.1.60

func (p *SessionConnectionPool) IsTokenExpired(sessionID, serverName string) bool

IsTokenExpired returns true if the pooled entry's token has already expired. This is equivalent to IsTokenExpiringSoon with a zero margin.

func (*SessionConnectionPool) IsTokenExpiringSoon added in v0.1.60

func (p *SessionConnectionPool) IsTokenExpiringSoon(sessionID, serverName string, margin time.Duration) bool

IsTokenExpiringSoon returns true if the pooled entry's token will expire within the given margin. Returns false if there is no pool entry, the entry has no tracked expiry (zero time), or the token has enough remaining lifetime.

func (*SessionConnectionPool) Len added in v0.1.60

func (p *SessionConnectionPool) Len() int

Len returns the current number of pooled connections (useful for testing).

func (*SessionConnectionPool) Put added in v0.1.60

func (p *SessionConnectionPool) Put(sessionID, serverName string, client MCPClient)

Put stores a client in the pool, closing any previously pooled client for the same (session, server) key. No token expiry is tracked; use PutWithExpiry for token-exchange clients that need proactive refresh.

func (*SessionConnectionPool) PutWithDeferredClose added in v0.1.60

func (p *SessionConnectionPool) PutWithDeferredClose(sessionID, serverName string, client MCPClient, tokenExpiry time.Time, closeDelay time.Duration)

PutWithDeferredClose stores a new client in the pool and schedules the replaced client (if any) for deferred close after closeDelay.

This is used by background token refresh: the old client may still be serving an in-flight request, so we cannot close it immediately. The delay gives in-flight requests time to complete before the old client's underlying connection is torn down.

func (*SessionConnectionPool) PutWithExpiry added in v0.1.60

func (p *SessionConnectionPool) PutWithExpiry(sessionID, serverName string, client MCPClient, tokenExpiry time.Time)

PutWithExpiry stores a client in the pool with an associated token expiry time. When tokenExpiry is non-zero, IsTokenExpiringSoon can be used to proactively evict the entry before the token expires.

func (*SessionConnectionPool) SetExchangedToken added in v0.1.123

func (p *SessionConnectionPool) SetExchangedToken(sessionID, serverName, token string)

SetExchangedToken records the RFC 8693 exchanged bearer on an existing pool entry so the admin UI can surface it. Exchange results are otherwise held only in the client's headerFunc closure, not the oauth token store. No-op if no pool entry exists for the pair.

func (*SessionConnectionPool) SetNotificationCallback added in v0.1.62

func (p *SessionConnectionPool) SetNotificationCallback(serverName string, cb func(sessionID string, client MCPClient))

SetNotificationCallback registers a callback that is invoked whenever a new client is stored for the given server (via Put, PutWithExpiry, or PutWithDeferredClose). The callback receives the sessionID that owns the client, enabling session-aware notification handling for SSO servers.

func (*SessionConnectionPool) Snapshot added in v0.1.123

func (p *SessionConnectionPool) Snapshot(sessionID string) []PooledInfo

Snapshot returns a copy of the metadata for every pool entry belonging to the given session. The underlying MCP clients are not exposed; callers see only the PooledInfo fields: server name, timing information, and any recorded exchanged token.

func (*SessionConnectionPool) Stop added in v0.1.60

func (p *SessionConnectionPool) Stop()

Stop shuts down the background reaper goroutine. It is safe to call multiple times but must be called before DrainAll during shutdown.

type ToolWithStatus

type ToolWithStatus struct {
	// Tool contains the MCP tool definition
	Tool mcp.Tool

	// Blocked indicates whether this tool is blocked by the security denylist.
	// Blocked tools cannot be executed unless the Yolo flag is enabled.
	Blocked bool
}

ToolWithStatus represents a tool along with its security blocking status. This is used to provide visibility into which tools are available and which are blocked by the security denylist.
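The blocking rule can be read as a simple filter. The sketch below uses a hypothetical toolStatus type and executable helper in place of ToolWithStatus and mcp.Tool, and models the Yolo flag as a plain boolean argument.

```go
package main

import "fmt"

// toolStatus is a hypothetical, trimmed-down stand-in for ToolWithStatus.
type toolStatus struct {
	Name    string
	Blocked bool
}

// executable returns the names of tools that may run: unblocked tools always,
// blocked tools only when yolo is set.
func executable(tools []toolStatus, yolo bool) []string {
	var names []string
	for _, ts := range tools {
		if !ts.Blocked || yolo {
			names = append(names, ts.Name)
		}
	}
	return names
}

func main() {
	tools := []toolStatus{
		{Name: "x_gh_list_repos"},
		{Name: "x_gh_delete_repo", Blocked: true},
	}
	fmt.Println(executable(tools, false)) // [x_gh_list_repos]
	fmt.Println(executable(tools, true))  // [x_gh_list_repos x_gh_delete_repo]
}
```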
