runtime

package
v0.16.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 29, 2026 License: MIT Imports: 30 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultRetentionMaxAge is the default max age for activity records (7 days)
	DefaultRetentionMaxAge = 7 * 24 * time.Hour
	// DefaultRetentionMaxRecords is the default max number of records (10000)
	DefaultRetentionMaxRecords = 10000
	// DefaultRetentionCheckInterval is the default interval between retention checks (1 hour)
	DefaultRetentionCheckInterval = 1 * time.Hour
)

Default retention configuration

Variables

This section is empty.

Functions

This section is empty.

Types

type ActivityService

type ActivityService struct {
	// contains filtered or unexported fields
}

ActivityService subscribes to activity events and persists them to storage. It runs as a background goroutine and handles activity recording non-blocking.

func NewActivityService

func NewActivityService(storage *storage.Manager, logger *zap.Logger) *ActivityService

NewActivityService creates a new activity service.

func (*ActivityService) SetRetentionConfig

func (s *ActivityService) SetRetentionConfig(maxAge time.Duration, maxRecords int, checkInterval time.Duration)

SetRetentionConfig updates the retention configuration. maxAge: maximum age for records (0 = no age limit) maxRecords: maximum number of records (0 = no count limit) checkInterval: how often to run retention cleanup

func (*ActivityService) Start

func (s *ActivityService) Start(ctx context.Context, rt *Runtime)

Start begins listening for activity events and persisting them. It should be called as a goroutine: go svc.Start(ctx, runtime)

func (*ActivityService) Stop

func (s *ActivityService) Stop()

Stop gracefully shuts down the activity service.

type ConfigApplyResult

type ConfigApplyResult struct {
	Success            bool                     `json:"success"`
	AppliedImmediately bool                     `json:"applied_immediately"`
	RequiresRestart    bool                     `json:"requires_restart"`
	RestartReason      string                   `json:"restart_reason,omitempty"`
	ChangedFields      []string                 `json:"changed_fields,omitempty"`
	ValidationErrors   []config.ValidationError `json:"validation_errors,omitempty"`
}

ConfigApplyResult represents the result of applying a configuration

func DetectConfigChanges

func DetectConfigChanges(oldCfg, newCfg *config.Config) *ConfigApplyResult

DetectConfigChanges compares old and new configurations to determine what changed and whether a restart is required

func (*ConfigApplyResult) FormatChangedFields

func (r *ConfigApplyResult) FormatChangedFields() string

FormatChangedFields returns a human-readable string of changed fields

type Event

type Event struct {
	Type      EventType      `json:"type"`
	Timestamp time.Time      `json:"timestamp"`
	Payload   map[string]any `json:"payload,omitempty"`
}

Event is a typed notification published by the runtime event bus.

type EventType

type EventType string

EventType represents a runtime event category broadcast to subscribers.

const (
	// EventTypeServersChanged is emitted whenever the set of servers or their state changes.
	EventTypeServersChanged EventType = "servers.changed"
	// EventTypeConfigReloaded is emitted after configuration reload completes.
	EventTypeConfigReloaded EventType = "config.reloaded"
	// EventTypeConfigSaved is emitted after configuration is successfully saved to disk.
	EventTypeConfigSaved EventType = "config.saved"
	// EventTypeSecretsChanged is emitted when secrets are added, updated, or deleted.
	EventTypeSecretsChanged EventType = "secrets.changed"
	// EventTypeOAuthTokenRefreshed is emitted when proactive token refresh succeeds.
	EventTypeOAuthTokenRefreshed EventType = "oauth.token_refreshed"
	// EventTypeOAuthRefreshFailed is emitted when proactive token refresh fails after retries.
	EventTypeOAuthRefreshFailed EventType = "oauth.refresh_failed"

	// Activity logging events (RFC-003)
	// EventTypeActivityToolCallStarted is emitted when a tool execution begins.
	EventTypeActivityToolCallStarted EventType = "activity.tool_call.started"
	// EventTypeActivityToolCallCompleted is emitted when a tool execution finishes.
	EventTypeActivityToolCallCompleted EventType = "activity.tool_call.completed"
	// EventTypeActivityPolicyDecision is emitted when a policy blocks a tool call.
	EventTypeActivityPolicyDecision EventType = "activity.policy_decision"
	// EventTypeActivityQuarantineChange is emitted when a server's quarantine state changes.
	EventTypeActivityQuarantineChange EventType = "activity.quarantine_change"

	// Spec 024: Expanded Activity Log events
	// EventTypeActivitySystemStart is emitted when MCPProxy server starts.
	EventTypeActivitySystemStart EventType = "activity.system.start"
	// EventTypeActivitySystemStop is emitted when MCPProxy server stops.
	EventTypeActivitySystemStop EventType = "activity.system.stop"
	// EventTypeActivityInternalToolCall is emitted when an internal tool (retrieve_tools, call_tool_*, etc.) completes.
	EventTypeActivityInternalToolCall EventType = "activity.internal_tool_call.completed"
	// EventTypeActivityConfigChange is emitted when configuration changes (server add/remove/update).
	EventTypeActivityConfigChange EventType = "activity.config_change"
)

type Phase

type Phase string

Phase represents the lifecycle state of the core runtime.

const (
	PhaseInitializing Phase = "Initializing"
	PhaseLoading      Phase = "Loading"
	PhaseReady        Phase = "Ready"
	PhaseStarting     Phase = "Starting"
	PhaseRunning      Phase = "Running"
	PhaseStopping     Phase = "Stopping"
	PhaseStopped      Phase = "Stopped"
	PhaseError        Phase = "Error"
)

Core runtime phases.

type Runtime

type Runtime struct {
	// contains filtered or unexported fields
}

Runtime owns the non-HTTP lifecycle for the proxy process.

func New

func New(cfg *config.Config, cfgPath string, logger *zap.Logger) (*Runtime, error)

New creates a runtime helper for the given config and prepares core managers.

func (*Runtime) AppContext

func (r *Runtime) AppContext() context.Context

AppContext returns the long-lived runtime context.

func (*Runtime) ApplyConfig

func (r *Runtime) ApplyConfig(newCfg *config.Config, cfgPath string) (*ConfigApplyResult, error)

ApplyConfig applies a new configuration with hot-reload support

func (*Runtime) BulkEnableServers

func (r *Runtime) BulkEnableServers(serverNames []string, enabled bool) (map[string]error, error)

BulkEnableServers toggles the enabled state for multiple servers in a single storage/config save to avoid repeated file writes. Returns a map of per-server errors for operations that could not be applied.

func (*Runtime) CacheManager

func (r *Runtime) CacheManager() *cache.Manager

CacheManager exposes the cache manager.

func (*Runtime) CalculateTokenSavings

func (r *Runtime) CalculateTokenSavings() (*contracts.ServerTokenMetrics, error)

CalculateTokenSavings calculates token savings from using MCPProxy

func (*Runtime) Close

func (r *Runtime) Close() error

Close releases runtime resources.

func (*Runtime) Config

func (r *Runtime) Config() *config.Config

Config returns the underlying configuration pointer. Deprecated: Use ConfigSnapshot() for lock-free reads. This method exists for backward compatibility.

func (*Runtime) ConfigPath

func (r *Runtime) ConfigPath() string

ConfigPath returns the tracked config path.

func (*Runtime) ConfigService

func (r *Runtime) ConfigService() *configsvc.Service

ConfigService returns the configuration service for advanced access patterns.

func (*Runtime) ConfigSnapshot

func (r *Runtime) ConfigSnapshot() *configsvc.Snapshot

ConfigSnapshot returns an immutable configuration snapshot. This is the preferred way to read configuration - it's lock-free and non-blocking.

func (*Runtime) CurrentPhase

func (r *Runtime) CurrentPhase() Phase

CurrentPhase returns the current lifecycle phase.

func (*Runtime) CurrentStatus

func (r *Runtime) CurrentStatus() Status

CurrentStatus returns a copy of the underlying status struct.

func (*Runtime) DiscoverAndIndexTools

func (r *Runtime) DiscoverAndIndexTools(ctx context.Context) error

DiscoverAndIndexTools discovers tools from upstream servers and indexes them.

func (*Runtime) DiscoverAndIndexToolsForServer

func (r *Runtime) DiscoverAndIndexToolsForServer(ctx context.Context, serverName string) error

DiscoverAndIndexToolsForServer discovers and indexes tools for a single server. This is used for reactive tool discovery when a server connects. Implements retry logic with exponential backoff for robustness.

func (*Runtime) EmitActivityConfigChange

func (r *Runtime) EmitActivityConfigChange(action, affectedEntity, source string, changedFields []string, previousValues, newValues map[string]interface{})

EmitActivityConfigChange emits an event when configuration changes (Spec 024). action is one of: server_added, server_removed, server_updated, settings_changed source indicates how the change was triggered: "mcp", "cli", or "api"

func (*Runtime) EmitActivityInternalToolCall

func (r *Runtime) EmitActivityInternalToolCall(internalToolName, targetServer, targetTool, toolVariant, sessionID, requestID, status, errorMsg string, durationMs int64, arguments map[string]interface{}, response interface{}, intent map[string]interface{})

EmitActivityInternalToolCall emits an event when an internal tool is called (Spec 024). internalToolName is the name of the internal tool (retrieve_tools, call_tool_read, etc.) targetServer and targetTool are used for call_tool_* handlers arguments contains the input parameters, response contains the output intent is the intent declaration metadata

func (*Runtime) EmitActivityPolicyDecision

func (r *Runtime) EmitActivityPolicyDecision(serverName, toolName, sessionID, decision, reason string)

EmitActivityPolicyDecision emits an event when a policy blocks a tool call.

func (*Runtime) EmitActivityQuarantineChange

func (r *Runtime) EmitActivityQuarantineChange(serverName string, quarantined bool, reason string)

EmitActivityQuarantineChange emits an event when a server's quarantine state changes.

func (*Runtime) EmitActivitySystemStart

func (r *Runtime) EmitActivitySystemStart(version, listenAddress string, startupDurationMs int64, configPath string)

EmitActivitySystemStart emits an event when MCPProxy server starts (Spec 024).

func (*Runtime) EmitActivitySystemStop

func (r *Runtime) EmitActivitySystemStop(reason, signal string, uptimeSeconds int64, errorMsg string)

EmitActivitySystemStop emits an event when MCPProxy server stops (Spec 024).

func (*Runtime) EmitActivityToolCallCompleted

func (r *Runtime) EmitActivityToolCallCompleted(serverName, toolName, sessionID, requestID, source, status, errorMsg string, durationMs int64, arguments map[string]interface{}, response string, responseTruncated bool, toolVariant string, intent map[string]interface{})

EmitActivityToolCallCompleted emits an event when a tool execution finishes. This is used to track activity for observability and debugging. source indicates how the call was triggered: "mcp", "cli", or "api" arguments is the input parameters passed to the tool call toolVariant is the MCP tool variant used (call_tool_read/write/destructive) - optional intent is the intent declaration metadata - optional

func (*Runtime) EmitActivityToolCallStarted

func (r *Runtime) EmitActivityToolCallStarted(serverName, toolName, sessionID, requestID, source string, args map[string]any)

EmitActivityToolCallStarted emits an event when a tool execution begins. This is used to track activity for observability and debugging. source indicates how the call was triggered: "mcp", "cli", or "api"

func (*Runtime) EmitOAuthRefreshFailed

func (r *Runtime) EmitOAuthRefreshFailed(serverName string, errorMsg string)

EmitOAuthRefreshFailed emits an event when proactive token refresh fails after retries. This is used by the RefreshManager to notify subscribers that re-authentication is needed.

func (*Runtime) EmitOAuthTokenRefreshed

func (r *Runtime) EmitOAuthTokenRefreshed(serverName string, expiresAt time.Time)

EmitOAuthTokenRefreshed emits an event when proactive token refresh succeeds. This is used by the RefreshManager to notify subscribers of successful token refresh.

func (*Runtime) EmitServersChanged

func (r *Runtime) EmitServersChanged(reason string, extra map[string]any)

EmitServersChanged implements the EventEmitter interface for the management service. This delegates to the runtime's internal event emission mechanism.

func (*Runtime) EnableServer

func (r *Runtime) EnableServer(serverName string, enabled bool) error

EnableServer enables or disables a server and persists the change.

func (*Runtime) ForceReconnectAllServers

func (r *Runtime) ForceReconnectAllServers(reason string) error

ForceReconnectAllServers triggers reconnection attempts for all managed servers.

func (*Runtime) GetActivity

func (r *Runtime) GetActivity(id string) (*storage.ActivityRecord, error)

GetActivity returns a single activity record by ID.

func (*Runtime) GetAllServers

func (r *Runtime) GetAllServers() ([]map[string]interface{}, error)

GetAllServers implements RuntimeOperations interface for management service. Returns all servers with their current status using the Supervisor's StateView.

func (*Runtime) GetConfig

func (r *Runtime) GetConfig() (*config.Config, error)

GetConfig returns a copy of the current configuration

func (*Runtime) GetCurrentConfig

func (r *Runtime) GetCurrentConfig() interface{}

GetCurrentConfig returns the current configuration

func (*Runtime) GetDockerRecoveryStatus

func (r *Runtime) GetDockerRecoveryStatus() *storage.DockerRecoveryState

GetDockerRecoveryStatus returns the current Docker recovery status from the upstream manager

func (*Runtime) GetManagementService

func (r *Runtime) GetManagementService() interface{}

GetManagementService returns the management service instance. Returns nil if service hasn't been set yet.

func (*Runtime) GetRecentSessions

func (r *Runtime) GetRecentSessions(limit int) ([]*contracts.MCPSession, int, error)

GetRecentSessions returns recent MCP sessions

func (*Runtime) GetSecretResolver

func (r *Runtime) GetSecretResolver() *secret.Resolver

GetSecretResolver returns the secret resolver instance

func (*Runtime) GetServerToolCalls

func (r *Runtime) GetServerToolCalls(serverName string, limit int) ([]*contracts.ToolCallRecord, error)

GetServerToolCalls retrieves tool call history for a specific server

func (*Runtime) GetServerTools

func (r *Runtime) GetServerTools(serverName string) ([]map[string]interface{}, error)

GetServerTools implements RuntimeOperations interface for management service. Returns all tools for a specific upstream server from StateView cache (lock-free read).

func (*Runtime) GetSessionByID

func (r *Runtime) GetSessionByID(sessionID string) (*contracts.MCPSession, error)

GetSessionByID returns a session by its ID

func (*Runtime) GetToolCallByID

func (r *Runtime) GetToolCallByID(id string) (*contracts.ToolCallRecord, error)

GetToolCallByID retrieves a single tool call by ID

func (*Runtime) GetToolCalls

func (r *Runtime) GetToolCalls(limit, offset int) ([]*contracts.ToolCallRecord, int, error)

GetToolCalls retrieves tool call history with pagination

func (*Runtime) GetToolCallsBySession

func (r *Runtime) GetToolCallsBySession(sessionID string, limit, offset int) ([]*contracts.ToolCallRecord, int, error)

GetToolCallsBySession returns tool calls filtered by session ID

func (*Runtime) GetVersionInfo

func (r *Runtime) GetVersionInfo() *updatecheck.VersionInfo

GetVersionInfo returns the current version information from the update checker. Returns nil if the update checker has not been initialized.

func (*Runtime) HandleUpstreamServerChange

func (r *Runtime) HandleUpstreamServerChange(ctx context.Context)

HandleUpstreamServerChange should be called when upstream servers change.

func (*Runtime) IndexManager

func (r *Runtime) IndexManager() *index.Manager

IndexManager exposes the index manager.

func (*Runtime) IsRunning

func (r *Runtime) IsRunning() bool

IsRunning reports the last known running state.

func (*Runtime) ListActivities

func (r *Runtime) ListActivities(filter storage.ActivityFilter) ([]*storage.ActivityRecord, int, error)

ListActivities returns activity records matching the filter.

func (*Runtime) ListRegistries

func (r *Runtime) ListRegistries() ([]interface{}, error)

ListRegistries returns the list of available MCP server registries (Phase 7)

func (*Runtime) LoadConfiguredServers

func (r *Runtime) LoadConfiguredServers(cfg *config.Config) error

LoadConfiguredServers synchronizes storage and upstream manager from the given or current config. If cfg is nil, it will use the current runtime configuration.

func (*Runtime) Logger

func (r *Runtime) Logger() *zap.Logger

Logger returns the runtime logger.

func (*Runtime) NotifySecretsChanged

func (r *Runtime) NotifySecretsChanged(ctx context.Context, operation, secretName string) error

NotifySecretsChanged notifies the runtime that secrets have changed and restarts affected servers. This method should be called by the HTTP API when secrets are added, updated, or deleted.

func (*Runtime) QuarantineServer

func (r *Runtime) QuarantineServer(serverName string, quarantined bool) error

QuarantineServer updates the quarantine state and persists the change. Security: When quarantining a server, all its tools are removed from the index to prevent Tool Poisoning Attacks (TPA) from exposing potentially malicious tool descriptions.

func (*Runtime) RefreshManager added in v0.16.0

func (r *Runtime) RefreshManager() *oauth.RefreshManager

RefreshManager returns the OAuth refresh manager for health status integration. Returns nil if refresh manager hasn't been initialized.

func (*Runtime) RefreshOAuthToken

func (r *Runtime) RefreshOAuthToken(serverName string) error

RefreshOAuthToken implements RuntimeOperations interface for management service. Triggers token refresh for a specific server.

func (*Runtime) RefreshVersionInfo

func (r *Runtime) RefreshVersionInfo() *updatecheck.VersionInfo

RefreshVersionInfo performs an immediate update check and returns the result. Returns nil if the update checker has not been initialized.

func (*Runtime) ReloadConfiguration

func (r *Runtime) ReloadConfiguration() error

ReloadConfiguration reloads the configuration from disk and resyncs state.

func (*Runtime) ReplayToolCall

func (r *Runtime) ReplayToolCall(id string, arguments map[string]interface{}) (*contracts.ToolCallRecord, error)

ReplayToolCall replays a tool call with modified arguments

func (*Runtime) RestartServer

func (r *Runtime) RestartServer(serverName string) error

RestartServer restarts an upstream server by disconnecting and reconnecting it. This is a synchronous operation that waits for the restart to complete.

func (*Runtime) SaveConfiguration

func (r *Runtime) SaveConfiguration() error

SaveConfiguration persists the runtime configuration to disk.

func (*Runtime) SearchRegistryServers

func (r *Runtime) SearchRegistryServers(registryID, tag, query string, limit int) ([]interface{}, error)

SearchRegistryServers searches for servers in a specific registry (Phase 7)

func (*Runtime) SetManagementService

func (r *Runtime) SetManagementService(svc interface{})

SetManagementService stores the management service instance. This is called after runtime initialization to avoid import cycles.

func (*Runtime) SetRefreshMetricsRecorder added in v0.16.0

func (r *Runtime) SetRefreshMetricsRecorder(recorder oauth.RefreshMetricsRecorder)

SetRefreshMetricsRecorder sets the metrics recorder for OAuth token refresh operations. This enables FR-011: OAuth refresh metrics emission.

func (*Runtime) SetRunning

func (r *Runtime) SetRunning(running bool)

SetRunning records whether the server HTTP layer is active.

func (*Runtime) SetVersion

func (r *Runtime) SetVersion(version string)

SetVersion initializes the update checker with the given version. This should be called once during server startup with the build version.

func (*Runtime) StartBackgroundInitialization

func (r *Runtime) StartBackgroundInitialization()

StartBackgroundInitialization kicks off configuration sync and background loops.

func (*Runtime) StatusChannel

func (r *Runtime) StatusChannel() <-chan Status

StatusChannel exposes the status updates stream.

func (*Runtime) StatusSnapshot

func (r *Runtime) StatusSnapshot(serverRunning bool) map[string]interface{}

StatusSnapshot returns the latest status as a map for API responses. The serverRunning parameter should come from the authoritative server running state.

func (*Runtime) StorageManager

func (r *Runtime) StorageManager() *storage.Manager

StorageManager exposes the storage manager.

func (*Runtime) StreamActivities

func (r *Runtime) StreamActivities(filter storage.ActivityFilter) <-chan *storage.ActivityRecord

StreamActivities returns a channel that yields activity records matching the filter.

func (*Runtime) SubscribeEvents

func (r *Runtime) SubscribeEvents() chan Event

SubscribeEvents registers a new subscriber and returns a channel that will receive runtime events. Callers must not close the returned channel; use UnsubscribeEvents when finished.

func (*Runtime) Supervisor

func (r *Runtime) Supervisor() *supervisor.Supervisor

Supervisor returns the supervisor instance for lock-free state reads via StateView. Phase 6: Provides access to fast server status without storage queries.

func (*Runtime) Tokenizer

func (r *Runtime) Tokenizer() tokens.Tokenizer

Tokenizer returns the tokenizer instance.

func (*Runtime) TriggerOAuthLogin

func (r *Runtime) TriggerOAuthLogin(serverName string) error

TriggerOAuthLogin implements RuntimeOperations interface for management service. Initiates OAuth 2.x authentication flow for a specific server.

func (*Runtime) TriggerOAuthLoginQuick

func (r *Runtime) TriggerOAuthLoginQuick(serverName string) (*core.OAuthStartResult, error)

TriggerOAuthLoginQuick implements RuntimeOperations interface for management service. Returns OAuthStartResult with actual browser status, auth URL, and any errors. This is the synchronous version that provides immediate feedback about browser opening.

func (*Runtime) TriggerOAuthLogout

func (r *Runtime) TriggerOAuthLogout(serverName string) error

TriggerOAuthLogout implements RuntimeOperations interface for management service. Clears OAuth token and disconnects a specific server.

func (*Runtime) Truncator

func (r *Runtime) Truncator() *truncate.Truncator

Truncator exposes the truncator utility.

func (*Runtime) UnsubscribeEvents

func (r *Runtime) UnsubscribeEvents(ch chan Event)

UnsubscribeEvents removes the subscriber and closes the channel.

func (*Runtime) UpdateConfig

func (r *Runtime) UpdateConfig(cfg *config.Config, cfgPath string)

UpdateConfig replaces the runtime configuration in-place. This now updates both the legacy field and the ConfigService.

func (*Runtime) UpdateListenAddress

func (r *Runtime) UpdateListenAddress(addr string) error

UpdateListenAddress mutates the in-memory listen address used by the runtime.

func (*Runtime) UpdatePhase

func (r *Runtime) UpdatePhase(phase Phase, message string)

UpdatePhase gathers runtime metrics and broadcasts a status update.

func (*Runtime) UpdatePhaseMessage

func (r *Runtime) UpdatePhaseMessage(message string)

UpdatePhaseMessage refreshes the status message without moving to a new phase.

func (*Runtime) UpdateStatus

func (r *Runtime) UpdateStatus(phase Phase, message string, stats map[string]interface{}, toolsIndexed int)

UpdateStatus mutates the status object and notifies subscribers.

func (*Runtime) UpstreamManager

func (r *Runtime) UpstreamManager() *upstream.Manager

UpstreamManager exposes the upstream manager.

func (*Runtime) ValidateConfig

func (r *Runtime) ValidateConfig(cfg *config.Config) ([]config.ValidationError, error)

ValidateConfig validates a configuration without applying it

type Status

type Status struct {
	Phase         Phase                  `json:"phase"`
	Message       string                 `json:"message"`
	UpstreamStats map[string]interface{} `json:"upstream_stats"`
	ToolsIndexed  int                    `json:"tools_indexed"`
	LastUpdated   time.Time              `json:"last_updated"`
}

Status captures high-level state for API consumers.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL