Documentation
¶
Index ¶
- Constants
- Variables
- func GenerateServerID(server *config.ServerConfig) string
- func GenerateServerIDFromAttributes(attrs ServerAttributes) string
- func TruncateActivityResponse(response string, maxSize int) (string, bool)
- type ActivityFilter
- type ActivityRecord
- type ActivitySource
- type ActivityType
- type AsyncManager
- func (am *AsyncManager) DeleteServerAsync(name string)
- func (am *AsyncManager) EnableServerAsync(name string, enabled bool)
- func (am *AsyncManager) EnableServerSync(name string, enabled bool) error
- func (am *AsyncManager) QuarantineServerAsync(name string, quarantined bool)
- func (am *AsyncManager) QuarantineServerSync(name string, quarantined bool) error
- func (am *AsyncManager) SaveServerAsync(serverConfig *config.ServerConfig)
- func (am *AsyncManager) Start()
- func (am *AsyncManager) Stop()
- type BoltDB
- func (b *BoltDB) Backup(destPath string) error
- func (s *BoltDB) CleanupOldOAuthCompletionEvents() error
- func (b *BoltDB) ClearOAuthClientCredentials(serverKey string) error
- func (b *BoltDB) Close() error
- func (b *BoltDB) DeleteOAuthToken(serverName string) error
- func (b *BoltDB) DeleteToolHash(toolName string) error
- func (b *BoltDB) DeleteUpstream(id string) error
- func (b *BoltDB) GetOAuthClientCredentials(serverKey string) (clientID, clientSecret string, callbackPort int, err error)
- func (b *BoltDB) GetOAuthToken(serverName string) (*OAuthTokenRecord, error)
- func (b *BoltDB) GetSchemaVersion() (uint64, error)
- func (b *BoltDB) GetToolHash(toolName string) (string, error)
- func (b *BoltDB) GetToolStats(toolName string) (*ToolStatRecord, error)
- func (s *BoltDB) GetUnprocessedOAuthCompletionEvents() ([]*OAuthCompletionEvent, error)
- func (b *BoltDB) GetUpstream(id string) (*UpstreamRecord, error)
- func (b *BoltDB) IncrementToolStats(toolName string) error
- func (b *BoltDB) ListOAuthTokens() ([]*OAuthTokenRecord, error)
- func (b *BoltDB) ListToolStats() ([]*ToolStatRecord, error)
- func (b *BoltDB) ListUpstreams() ([]*UpstreamRecord, error)
- func (s *BoltDB) MarkOAuthCompletionEventProcessed(serverName string, completedAt time.Time) error
- func (s *BoltDB) SaveOAuthCompletionEvent(event *OAuthCompletionEvent) error
- func (b *BoltDB) SaveOAuthToken(record *OAuthTokenRecord) error
- func (b *BoltDB) SaveToolHash(toolName, hash string) error
- func (b *BoltDB) SaveUpstream(record *UpstreamRecord) error
- func (b *BoltDB) Stats() (*bbolt.Stats, error)
- func (b *BoltDB) UpdateOAuthClientCredentials(serverKey, clientID, clientSecret string, callbackPort int) error
- type DatabaseLockedError
- type DiagnosticRecord
- type DockerRecoveryState
- type EnableServerData
- type Manager
- func (m *Manager) AddUpstream(serverConfig *config.ServerConfig) (string, error)
- func (m *Manager) Backup(destPath string) error
- func (m *Manager) CleanupStaleServerData(threshold time.Duration) error
- func (m *Manager) ClearDockerRecoveryState() error
- func (m *Manager) ClearOAuthState(serverName string) error
- func (m *Manager) Close() error
- func (m *Manager) CloseAllActiveSessions() error
- func (m *Manager) CloseInactiveSessions(inactivityTimeout time.Duration) (int, error)
- func (m *Manager) CloseSession(sessionID string) error
- func (m *Manager) CountActivities() (int, error)
- func (m *Manager) CreateSession(session *SessionRecord) error
- func (m *Manager) DeleteActivity(id string) error
- func (m *Manager) DeleteToolHash(toolName string) error
- func (m *Manager) DeleteUpstreamServer(name string) error
- func (m *Manager) EnableUpstreamServer(name string, enabled bool) error
- func (m *Manager) GetActivity(id string) (*ActivityRecord, error)
- func (m *Manager) GetActivityByIDScan(id string) (*ActivityRecord, error)
- func (m *Manager) GetBoltDB() *BoltDB
- func (m *Manager) GetDB() *bbolt.DB
- func (m *Manager) GetOAuthToken(serverName string) (*OAuthTokenRecord, error)
- func (m *Manager) GetRecentSessions(limit int) ([]*SessionRecord, int, error)
- func (m *Manager) GetSchemaVersion() (uint64, error)
- func (m *Manager) GetServerDiagnostics(serverID string, limit int) ([]*DiagnosticRecord, error)
- func (m *Manager) GetServerIdentity(server *config.ServerConfig) (*ServerIdentity, error)
- func (m *Manager) GetServerIdentityByID(serverID string) (*ServerIdentity, error)
- func (m *Manager) GetServerStatistics(serverID string) (*ServerStatistics, error)
- func (m *Manager) GetServerToolCalls(serverID string, limit int) ([]*ToolCallRecord, error)
- func (m *Manager) GetSessionByID(sessionID string) (*SessionRecord, error)
- func (m *Manager) GetStats() (map[string]interface{}, error)
- func (m *Manager) GetToolCallsBySession(sessionID string, limit, offset int) ([]*ToolCallRecord, int, error)
- func (m *Manager) GetToolHash(toolName string) (string, error)
- func (m *Manager) GetToolStatistics(topN int) (*config.ToolStats, error)
- func (m *Manager) GetToolStats(topN int) ([]map[string]interface{}, error)
- func (m *Manager) GetToolUsage(toolName string) (*ToolStatRecord, error)
- func (m *Manager) GetUpstreamServer(name string) (*config.ServerConfig, error)
- func (m *Manager) HasToolChanged(toolName, currentHash string) (bool, error)
- func (m *Manager) IncrementToolUsage(toolName string) error
- func (m *Manager) ListActivities(filter ActivityFilter) ([]*ActivityRecord, int, error)
- func (m *Manager) ListOAuthTokens() ([]*OAuthTokenRecord, error)
- func (m *Manager) ListQuarantinedTools(serverName string) ([]map[string]interface{}, error)
- func (m *Manager) ListQuarantinedUpstreamServers() ([]*config.ServerConfig, error)
- func (m *Manager) ListServerIdentities() ([]*ServerIdentity, error)
- func (m *Manager) ListUpstreamServers() ([]*config.ServerConfig, error)
- func (m *Manager) ListUpstreams() ([]*config.ServerConfig, error)
- func (m *Manager) LoadDockerRecoveryState() (*DockerRecoveryState, error)
- func (m *Manager) PruneExcessActivities(maxRecords int, targetPercent float64) (int, error)
- func (m *Manager) PruneOldActivities(maxAge time.Duration) (int, error)
- func (m *Manager) QuarantineUpstreamServer(name string, quarantined bool) error
- func (m *Manager) RecordServerDiagnostic(record *DiagnosticRecord) error
- func (m *Manager) RecordToolCall(record *ToolCallRecord) error
- func (m *Manager) RegisterServerIdentity(server *config.ServerConfig, configPath string) (*ServerIdentity, error)
- func (m *Manager) RemoveUpstream(id string) error
- func (m *Manager) SaveActivity(record *ActivityRecord) error
- func (m *Manager) SaveActivityAsync(record *ActivityRecord)
- func (m *Manager) SaveDockerRecoveryState(state *DockerRecoveryState) error
- func (m *Manager) SaveToolHash(toolName, hash string) error
- func (m *Manager) SaveUpstreamServer(serverConfig *config.ServerConfig) error
- func (m *Manager) StreamActivities(filter ActivityFilter) <-chan *ActivityRecord
- func (m *Manager) UpdateServerStatistics(stats *ServerStatistics) error
- func (m *Manager) UpdateSessionStats(sessionID string, tokens int) error
- func (m *Manager) UpdateUpstream(id string, serverConfig *config.ServerConfig) error
- type OAuthAttributes
- type OAuthCompletionEvent
- type OAuthTokenRecord
- type Operation
- type QuarantineServerData
- type QueueFullError
- type Result
- type ServerAttributes
- type ServerIdentity
- type ServerStatistics
- type SessionRecord
- type TimeoutError
- type TokenMetrics
- type ToolCallRecord
- type ToolHashRecord
- type ToolStatRecord
- type UnsupportedOperationError
- type UpstreamRecord
Constants ¶
const ( UpstreamsBucket = "upstreams" ToolStatsBucket = "toolstats" ToolHashBucket = "toolhash" OAuthTokenBucket = "oauth_tokens" //nolint:gosec // bucket name, not a credential OAuthCompletionBucket = "oauth_completion" MetaBucket = "meta" CacheBucket = "cache" CacheStatsBucket = "cache_stats" SessionsBucket = "sessions" )
Bucket names for bbolt database
const ( SchemaVersionKey = "schema" DockerRecoveryStateKey = "docker_recovery_state" )
Meta keys
const ActivityRecordsBucket = "activity_records"
ActivityRecordsBucket is the BBolt bucket name for activity records
const CurrentSchemaVersion = 2
Current schema version
const DefaultMaxResponseSize = 64 * 1024
DefaultMaxResponseSize is the default maximum size for response truncation (64KB)
Variables ¶
var ValidActivityTypes = []string{ string(ActivityTypeToolCall), string(ActivityTypePolicyDecision), string(ActivityTypeQuarantineChange), string(ActivityTypeServerChange), string(ActivityTypeSystemStart), string(ActivityTypeSystemStop), string(ActivityTypeInternalToolCall), string(ActivityTypeConfigChange), }
ValidActivityTypes is the list of all valid activity types for filtering (Spec 024)
Functions ¶
func GenerateServerID ¶
func GenerateServerID(server *config.ServerConfig) string
GenerateServerID creates a unique, stable identity for a server
func GenerateServerIDFromAttributes ¶
func GenerateServerIDFromAttributes(attrs ServerAttributes) string
GenerateServerIDFromAttributes creates ID from attributes directly
Types ¶
type ActivityFilter ¶
type ActivityFilter struct {
Types []string // Filter by activity types (Spec 024: supports multiple types with OR logic)
Server string // Filter by server name
Tool string // Filter by tool name
SessionID string // Filter by MCP session
Status string // Filter by status (success/error/blocked)
StartTime time.Time // Activities after this time
EndTime time.Time // Activities before this time
Limit int // Max records to return (default 50, max 100)
Offset int // Pagination offset
IntentType string // Filter by intent operation type: read, write, destructive (Spec 018)
RequestID string // Filter by HTTP request ID for correlation (Spec 021)
// ExcludeCallToolSuccess filters out successful call_tool_* internal tool calls.
// These appear as duplicates since the actual upstream tool call is also logged.
// Failed call_tool_* calls are still shown (no corresponding tool_call entry).
// Default: true (to avoid duplicate entries in UI/CLI)
ExcludeCallToolSuccess bool
}
ActivityFilter represents query parameters for filtering activity records
func DefaultActivityFilter ¶
func DefaultActivityFilter() ActivityFilter
DefaultActivityFilter returns an ActivityFilter with sensible defaults
func (*ActivityFilter) Matches ¶
func (f *ActivityFilter) Matches(record *ActivityRecord) bool
Matches checks if an activity record matches the filter criteria
func (*ActivityFilter) Validate ¶
func (f *ActivityFilter) Validate()
Validate validates and normalizes the filter
type ActivityRecord ¶
type ActivityRecord struct {
ID string `json:"id"` // Unique identifier (ULID format)
Type ActivityType `json:"type"` // Type of activity
Source ActivitySource `json:"source,omitempty"` // How activity was triggered: "mcp", "cli", "api"
ServerName string `json:"server_name,omitempty"` // Name of upstream MCP server
ToolName string `json:"tool_name,omitempty"` // Name of tool called
Arguments map[string]interface{} `json:"arguments,omitempty"` // Tool call arguments
Response string `json:"response,omitempty"` // Tool response (potentially truncated)
ResponseTruncated bool `json:"response_truncated,omitempty"` // True if response was truncated
Status string `json:"status"` // Result status: "success", "error", "blocked"
ErrorMessage string `json:"error_message,omitempty"` // Error details if status is "error"
DurationMs int64 `json:"duration_ms,omitempty"` // Execution duration in milliseconds
Timestamp time.Time `json:"timestamp"` // When activity occurred
SessionID string `json:"session_id,omitempty"` // MCP session ID for correlation
RequestID string `json:"request_id,omitempty"` // HTTP request ID for correlation
Metadata map[string]interface{} `json:"metadata,omitempty"` // Additional context-specific data
}
ActivityRecord represents a single activity log entry stored in BBolt
func ActivityRecordFromJSON ¶
func ActivityRecordFromJSON(data []byte) (*ActivityRecord, error)
ActivityRecordFromJSON parses an activity record from JSON bytes.
func (*ActivityRecord) MarshalBinary ¶
func (a *ActivityRecord) MarshalBinary() ([]byte, error)
MarshalBinary implements encoding.BinaryMarshaler for BBolt storage
func (*ActivityRecord) UnmarshalBinary ¶
func (a *ActivityRecord) UnmarshalBinary(data []byte) error
UnmarshalBinary implements encoding.BinaryUnmarshaler for BBolt storage
type ActivitySource ¶
type ActivitySource string
ActivitySource indicates how the activity was triggered
const ( // ActivitySourceMCP indicates the activity was triggered via MCP protocol (AI agent) ActivitySourceMCP ActivitySource = "mcp" // ActivitySourceCLI indicates the activity was triggered via CLI command ActivitySourceCLI ActivitySource = "cli" // ActivitySourceAPI indicates the activity was triggered via REST API ActivitySourceAPI ActivitySource = "api" )
type ActivityType ¶
type ActivityType string
ActivityType represents the type of activity being recorded
const ( // ActivityTypeToolCall represents a tool execution event ActivityTypeToolCall ActivityType = "tool_call" // ActivityTypePolicyDecision represents a policy blocking a tool call ActivityTypePolicyDecision ActivityType = "policy_decision" // ActivityTypeQuarantineChange represents a server quarantine state change ActivityTypeQuarantineChange ActivityType = "quarantine_change" // ActivityTypeServerChange represents a server configuration change ActivityTypeServerChange ActivityType = "server_change" // ActivityTypeSystemStart represents MCPProxy server startup (Spec 024) ActivityTypeSystemStart ActivityType = "system_start" // ActivityTypeSystemStop represents MCPProxy server shutdown (Spec 024) ActivityTypeSystemStop ActivityType = "system_stop" // ActivityTypeInternalToolCall represents internal MCP tool calls like retrieve_tools, call_tool_* (Spec 024) ActivityTypeInternalToolCall ActivityType = "internal_tool_call" // ActivityTypeConfigChange represents configuration changes like server add/remove/update (Spec 024) ActivityTypeConfigChange ActivityType = "config_change" )
type AsyncManager ¶
type AsyncManager struct {
// contains filtered or unexported fields
}
AsyncManager handles asynchronous storage operations to prevent deadlocks
func NewAsyncManager ¶
func NewAsyncManager(db *BoltDB, logger *zap.SugaredLogger) *AsyncManager
NewAsyncManager creates a new async storage manager
func (*AsyncManager) DeleteServerAsync ¶
func (am *AsyncManager) DeleteServerAsync(name string)
DeleteServerAsync queues a delete server operation
func (*AsyncManager) EnableServerAsync ¶
func (am *AsyncManager) EnableServerAsync(name string, enabled bool)
EnableServerAsync queues an enable/disable operation
func (*AsyncManager) EnableServerSync ¶
func (am *AsyncManager) EnableServerSync(name string, enabled bool) error
EnableServerSync queues an enable/disable operation and waits for confirmation
func (*AsyncManager) QuarantineServerAsync ¶
func (am *AsyncManager) QuarantineServerAsync(name string, quarantined bool)
QuarantineServerAsync queues a quarantine operation
func (*AsyncManager) QuarantineServerSync ¶
func (am *AsyncManager) QuarantineServerSync(name string, quarantined bool) error
QuarantineServerSync queues a quarantine operation and waits for confirmation
func (*AsyncManager) SaveServerAsync ¶
func (am *AsyncManager) SaveServerAsync(serverConfig *config.ServerConfig)
SaveServerAsync queues a save server operation
func (*AsyncManager) Start ¶
func (am *AsyncManager) Start()
Start begins processing storage operations in a dedicated goroutine
func (*AsyncManager) Stop ¶
func (am *AsyncManager) Stop()
Stop gracefully shuts down the async manager
type BoltDB ¶
type BoltDB struct {
// contains filtered or unexported fields
}
BoltDB wraps bolt database operations
func NewBoltDB ¶
func NewBoltDB(dataDir string, logger *zap.SugaredLogger) (*BoltDB, error)
NewBoltDB creates a new BoltDB instance
func (*BoltDB) CleanupOldOAuthCompletionEvents ¶
CleanupOldOAuthCompletionEvents removes OAuth completion events older than 24 hours
func (*BoltDB) ClearOAuthClientCredentials ¶
ClearOAuthClientCredentials clears only the DCR-related fields (ClientID, ClientSecret, CallbackPort) while preserving any existing token data. This is called when the callback port conflicts and a fresh DCR is required (Spec 022: OAuth Redirect URI Port Persistence).
func (*BoltDB) DeleteOAuthToken ¶
DeleteOAuthToken deletes an OAuth token record
func (*BoltDB) DeleteToolHash ¶
DeleteToolHash deletes a tool hash
func (*BoltDB) DeleteUpstream ¶
DeleteUpstream deletes an upstream server record
func (*BoltDB) GetOAuthClientCredentials ¶
func (b *BoltDB) GetOAuthClientCredentials(serverKey string) (clientID, clientSecret string, callbackPort int, err error)
GetOAuthClientCredentials retrieves the client credentials and callback port for token refresh. The returned callbackPort is 0 if not stored (legacy records or fresh records without DCR).
func (*BoltDB) GetOAuthToken ¶
func (b *BoltDB) GetOAuthToken(serverName string) (*OAuthTokenRecord, error)
GetOAuthToken retrieves an OAuth token record by server name
func (*BoltDB) GetSchemaVersion ¶
GetSchemaVersion returns the current schema version
func (*BoltDB) GetToolHash ¶
GetToolHash retrieves a tool hash
func (*BoltDB) GetToolStats ¶
func (b *BoltDB) GetToolStats(toolName string) (*ToolStatRecord, error)
GetToolStats retrieves tool statistics
func (*BoltDB) GetUnprocessedOAuthCompletionEvents ¶
func (s *BoltDB) GetUnprocessedOAuthCompletionEvents() ([]*OAuthCompletionEvent, error)
GetUnprocessedOAuthCompletionEvents returns all unprocessed OAuth completion events
func (*BoltDB) GetUpstream ¶
func (b *BoltDB) GetUpstream(id string) (*UpstreamRecord, error)
GetUpstream retrieves an upstream server record by ID
func (*BoltDB) IncrementToolStats ¶
IncrementToolStats increments the usage count for a tool
func (*BoltDB) ListOAuthTokens ¶
func (b *BoltDB) ListOAuthTokens() ([]*OAuthTokenRecord, error)
ListOAuthTokens returns all OAuth token records
func (*BoltDB) ListToolStats ¶
func (b *BoltDB) ListToolStats() ([]*ToolStatRecord, error)
ListToolStats returns all tool statistics
func (*BoltDB) ListUpstreams ¶
func (b *BoltDB) ListUpstreams() ([]*UpstreamRecord, error)
ListUpstreams returns all upstream server records
func (*BoltDB) MarkOAuthCompletionEventProcessed ¶
MarkOAuthCompletionEventProcessed marks an OAuth completion event as processed
func (*BoltDB) SaveOAuthCompletionEvent ¶
func (s *BoltDB) SaveOAuthCompletionEvent(event *OAuthCompletionEvent) error
SaveOAuthCompletionEvent saves an OAuth completion event to the database
func (*BoltDB) SaveOAuthToken ¶
func (b *BoltDB) SaveOAuthToken(record *OAuthTokenRecord) error
SaveOAuthToken saves an OAuth token record
func (*BoltDB) SaveToolHash ¶
SaveToolHash saves a tool hash for change detection
func (*BoltDB) SaveUpstream ¶
func (b *BoltDB) SaveUpstream(record *UpstreamRecord) error
SaveUpstream saves an upstream server record
func (*BoltDB) UpdateOAuthClientCredentials ¶
func (b *BoltDB) UpdateOAuthClientCredentials(serverKey, clientID, clientSecret string, callbackPort int) error
UpdateOAuthClientCredentials updates the client credentials (from DCR) and callback port on an existing token. This is called after successful Dynamic Client Registration to persist the obtained client_id/secret and the callback port used for the redirect_uri (Spec 022: OAuth Redirect URI Port Persistence).
type DatabaseLockedError ¶
DatabaseLockedError indicates that the database is locked by another process
func (*DatabaseLockedError) Error ¶
func (e *DatabaseLockedError) Error() string
func (*DatabaseLockedError) Unwrap ¶
func (e *DatabaseLockedError) Unwrap() error
type DiagnosticRecord ¶
type DiagnosticRecord struct {
ServerID string `json:"server_id"`
ServerName string `json:"server_name"`
Type string `json:"type"` // error, warning, info
Category string `json:"category"` // oauth, connection, etc.
Message string `json:"message"`
Details map[string]interface{} `json:"details"`
Timestamp time.Time `json:"timestamp"`
ConfigPath string `json:"config_path"`
Resolved bool `json:"resolved"`
ResolvedAt *time.Time `json:"resolved_at,omitempty"`
}
DiagnosticRecord represents a diagnostic event for a server
type DockerRecoveryState ¶
type DockerRecoveryState struct {
LastAttempt time.Time `json:"last_attempt"`
FailureCount int `json:"failure_count"`
DockerAvailable bool `json:"docker_available"`
RecoveryMode bool `json:"recovery_mode"`
LastError string `json:"last_error,omitempty"`
AttemptsSinceUp int `json:"attempts_since_up"` // Attempts since Docker was last available
LastSuccessfulAt time.Time `json:"last_successful_at,omitempty"`
}
DockerRecoveryState represents the persistent state of Docker recovery. This allows recovery to resume after an application restart.
func (*DockerRecoveryState) MarshalBinary ¶
func (d *DockerRecoveryState) MarshalBinary() ([]byte, error)
MarshalBinary implements encoding.BinaryMarshaler
func (*DockerRecoveryState) UnmarshalBinary ¶
func (d *DockerRecoveryState) UnmarshalBinary(data []byte) error
UnmarshalBinary implements encoding.BinaryUnmarshaler
type EnableServerData ¶
Data structures for different operation types
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager provides a unified interface for storage operations
func NewManager ¶
func NewManager(dataDir string, logger *zap.SugaredLogger) (*Manager, error)
NewManager creates a new storage manager
func (*Manager) AddUpstream ¶
func (m *Manager) AddUpstream(serverConfig *config.ServerConfig) (string, error)
AddUpstream adds an upstream server and returns its ID
func (*Manager) CleanupStaleServerData ¶
CleanupStaleServerData removes data for servers that haven't been seen for a threshold period
func (*Manager) ClearDockerRecoveryState ¶
ClearDockerRecoveryState removes the Docker recovery state from persistent storage
func (*Manager) ClearOAuthState ¶
ClearOAuthState clears all OAuth state for a server (tokens, client registration, etc.). This should be called when OAuth configuration changes to force re-authentication.
func (*Manager) CloseAllActiveSessions ¶
CloseAllActiveSessions marks all active sessions as closed. This should be called on startup to clean up stale sessions from previous runs.
func (*Manager) CloseInactiveSessions ¶
CloseInactiveSessions closes sessions that haven't had activity for the specified duration
func (*Manager) CloseSession ¶
CloseSession marks a session as closed with end time
func (*Manager) CountActivities ¶
CountActivities returns the total number of activity records.
func (*Manager) CreateSession ¶
func (m *Manager) CreateSession(session *SessionRecord) error
CreateSession creates a new session record
func (*Manager) DeleteActivity ¶
DeleteActivity deletes an activity record by ID. Returns nil if the record doesn't exist.
func (*Manager) DeleteToolHash ¶
DeleteToolHash deletes a tool hash
func (*Manager) DeleteUpstreamServer ¶
DeleteUpstreamServer deletes an upstream server
func (*Manager) EnableUpstreamServer ¶
EnableUpstreamServer enables/disables an upstream server using async operations
func (*Manager) GetActivity ¶
func (m *Manager) GetActivity(id string) (*ActivityRecord, error)
GetActivity retrieves an activity record by ID. Returns nil if the record is not found.
func (*Manager) GetActivityByIDScan ¶
func (m *Manager) GetActivityByIDScan(id string) (*ActivityRecord, error)
GetActivityByIDScan performs a full scan to find activity by ID. This is less efficient than GetActivity but works when the timestamp is unknown.
func (*Manager) GetBoltDB ¶
GetBoltDB returns the wrapped BoltDB instance for higher-level operations
func (*Manager) GetOAuthToken ¶
func (m *Manager) GetOAuthToken(serverName string) (*OAuthTokenRecord, error)
GetOAuthToken retrieves an OAuth token for a server from storage
func (*Manager) GetRecentSessions ¶
func (m *Manager) GetRecentSessions(limit int) ([]*SessionRecord, int, error)
GetRecentSessions returns the most recent sessions
func (*Manager) GetSchemaVersion ¶
GetSchemaVersion returns the current schema version
func (*Manager) GetServerDiagnostics ¶
func (m *Manager) GetServerDiagnostics(serverID string, limit int) ([]*DiagnosticRecord, error)
GetServerDiagnostics gets diagnostic records for a server
func (*Manager) GetServerIdentity ¶
func (m *Manager) GetServerIdentity(server *config.ServerConfig) (*ServerIdentity, error)
GetServerIdentity gets server identity by config
func (*Manager) GetServerIdentityByID ¶
func (m *Manager) GetServerIdentityByID(serverID string) (*ServerIdentity, error)
GetServerIdentityByID gets server identity by ID
func (*Manager) GetServerStatistics ¶
func (m *Manager) GetServerStatistics(serverID string) (*ServerStatistics, error)
GetServerStatistics gets statistics for a server
func (*Manager) GetServerToolCalls ¶
func (m *Manager) GetServerToolCalls(serverID string, limit int) ([]*ToolCallRecord, error)
GetServerToolCalls gets tool calls for a server
func (*Manager) GetSessionByID ¶
func (m *Manager) GetSessionByID(sessionID string) (*SessionRecord, error)
GetSessionByID retrieves a session by its ID
func (*Manager) GetToolCallsBySession ¶
func (m *Manager) GetToolCallsBySession(sessionID string, limit, offset int) ([]*ToolCallRecord, int, error)
GetToolCallsBySession retrieves tool calls filtered by session ID
func (*Manager) GetToolHash ¶
GetToolHash retrieves a tool hash
func (*Manager) GetToolStatistics ¶
GetToolStatistics returns aggregated tool statistics
func (*Manager) GetToolStats ¶
GetToolStats gets tool statistics formatted for MCP responses
func (*Manager) GetToolUsage ¶
func (m *Manager) GetToolUsage(toolName string) (*ToolStatRecord, error)
GetToolUsage retrieves usage statistics for a tool
func (*Manager) GetUpstreamServer ¶
func (m *Manager) GetUpstreamServer(name string) (*config.ServerConfig, error)
GetUpstreamServer retrieves an upstream server by name
func (*Manager) HasToolChanged ¶
HasToolChanged checks if a tool has changed based on its hash
func (*Manager) IncrementToolUsage ¶
IncrementToolUsage increments the usage count for a tool
func (*Manager) ListActivities ¶
func (m *Manager) ListActivities(filter ActivityFilter) ([]*ActivityRecord, int, error)
ListActivities returns paginated activity records matching the filter. Records are returned in reverse chronological order (newest first). Returns the records, total matching count, and any error.
func (*Manager) ListOAuthTokens ¶
func (m *Manager) ListOAuthTokens() ([]*OAuthTokenRecord, error)
ListOAuthTokens returns all OAuth token records from storage. Used by RefreshManager to initialize proactive refresh schedules on startup.
func (*Manager) ListQuarantinedTools ¶
ListQuarantinedTools returns tools from quarantined servers with full descriptions for security analysis
func (*Manager) ListQuarantinedUpstreamServers ¶
func (m *Manager) ListQuarantinedUpstreamServers() ([]*config.ServerConfig, error)
ListQuarantinedUpstreamServers returns all quarantined upstream servers
func (*Manager) ListServerIdentities ¶
func (m *Manager) ListServerIdentities() ([]*ServerIdentity, error)
ListServerIdentities lists all server identities
func (*Manager) ListUpstreamServers ¶
func (m *Manager) ListUpstreamServers() ([]*config.ServerConfig, error)
ListUpstreamServers returns all upstream servers
func (*Manager) ListUpstreams ¶
func (m *Manager) ListUpstreams() ([]*config.ServerConfig, error)
ListUpstreams is an alias for ListUpstreamServers
func (*Manager) LoadDockerRecoveryState ¶
func (m *Manager) LoadDockerRecoveryState() (*DockerRecoveryState, error)
LoadDockerRecoveryState loads the Docker recovery state from persistent storage
func (*Manager) PruneExcessActivities ¶
PruneExcessActivities deletes oldest records when count exceeds maxRecords. Deletes records until count is at targetPercent of maxRecords (default 90%). Returns the number of records deleted.
func (*Manager) PruneOldActivities ¶
PruneOldActivities deletes activity records older than the specified duration. Returns the number of records deleted.
func (*Manager) QuarantineUpstreamServer ¶
QuarantineUpstreamServer sets the quarantine status of an upstream server using async operations
func (*Manager) RecordServerDiagnostic ¶
func (m *Manager) RecordServerDiagnostic(record *DiagnosticRecord) error
RecordServerDiagnostic records a diagnostic event for a server
func (*Manager) RecordToolCall ¶
func (m *Manager) RecordToolCall(record *ToolCallRecord) error
RecordToolCall records a tool call for a server
func (*Manager) RegisterServerIdentity ¶
func (m *Manager) RegisterServerIdentity(server *config.ServerConfig, configPath string) (*ServerIdentity, error)
RegisterServerIdentity registers or updates a server identity
func (*Manager) RemoveUpstream ¶
RemoveUpstream removes an upstream server by ID/name
func (*Manager) SaveActivity ¶
func (m *Manager) SaveActivity(record *ActivityRecord) error
SaveActivity stores an activity record in BBolt. The record is stored with a composite key for efficient time-based queries.
func (*Manager) SaveActivityAsync ¶
func (m *Manager) SaveActivityAsync(record *ActivityRecord)
SaveActivityAsync saves an activity record asynchronously. This is non-blocking and suitable for recording tool calls without impacting latency.
func (*Manager) SaveDockerRecoveryState ¶
func (m *Manager) SaveDockerRecoveryState(state *DockerRecoveryState) error
SaveDockerRecoveryState saves the Docker recovery state to persistent storage
func (*Manager) SaveToolHash ¶
SaveToolHash saves a tool hash for change detection
func (*Manager) SaveUpstreamServer ¶
func (m *Manager) SaveUpstreamServer(serverConfig *config.ServerConfig) error
SaveUpstreamServer saves an upstream server configuration
func (*Manager) StreamActivities ¶
func (m *Manager) StreamActivities(filter ActivityFilter) <-chan *ActivityRecord
StreamActivities returns a channel that yields activity records matching the filter. The channel is closed when all matching records have been sent. This is useful for streaming large exports without loading all records into memory.
func (*Manager) UpdateServerStatistics ¶
func (m *Manager) UpdateServerStatistics(stats *ServerStatistics) error
UpdateServerStatistics updates server statistics
func (*Manager) UpdateSessionStats ¶
UpdateSessionStats increments tool call count and adds tokens
func (*Manager) UpdateUpstream ¶
func (m *Manager) UpdateUpstream(id string, serverConfig *config.ServerConfig) error
UpdateUpstream updates an upstream server configuration
type OAuthAttributes ¶
type OAuthAttributes struct {
ClientID string `json:"client_id"`
ClientSecret string `json:"client_secret"` // Note: This might contain secrets
RedirectURI string `json:"redirect_uri"`
Scopes []string `json:"scopes"`
PKCEEnabled bool `json:"pkce_enabled"`
}
OAuthAttributes represents stable OAuth configuration attributes
type OAuthCompletionEvent ¶
type OAuthCompletionEvent struct {
ServerName string `json:"server_name"`
CompletedAt time.Time `json:"completed_at"`
ProcessedAt *time.Time `json:"processed_at,omitempty"` // Nil if not yet processed by server
}
OAuthCompletionEvent represents an OAuth completion event for cross-process notification
func (*OAuthCompletionEvent) MarshalBinary ¶
func (e *OAuthCompletionEvent) MarshalBinary() ([]byte, error)
MarshalBinary implements encoding.BinaryMarshaler
func (*OAuthCompletionEvent) UnmarshalBinary ¶
func (e *OAuthCompletionEvent) UnmarshalBinary(data []byte) error
UnmarshalBinary implements encoding.BinaryUnmarshaler
type OAuthTokenRecord ¶
// OAuthTokenRecord represents stored OAuth tokens for a server.
// The type also provides MarshalBinary and UnmarshalBinary
// (encoding.BinaryMarshaler / encoding.BinaryUnmarshaler), and GetServerName,
// which is documented to fall back to ServerName when DisplayName is not set.
type OAuthTokenRecord struct {
ServerName string `json:"server_name"` // Storage key (serverName_hash format)
DisplayName string `json:"display_name,omitempty"` // Actual server name (for RefreshManager lookup)
// AccessToken is the OAuth access token; always present in the JSON form.
AccessToken string `json:"access_token"`
// RefreshToken is optional and omitted from JSON when empty.
RefreshToken string `json:"refresh_token,omitempty"`
// TokenType is the token type string.
// NOTE(review): presumably values such as "Bearer" — confirm against the OAuth client.
TokenType string `json:"token_type"`
// ExpiresAt is the expiry timestamp stored with the token.
ExpiresAt time.Time `json:"expires_at"`
// Scopes lists the scopes stored with the token; omitted from JSON when empty.
Scopes []string `json:"scopes,omitempty"`
// Created and Updated are the record's timestamps.
// NOTE(review): presumably first-write and last-write times — confirm in the save path.
Created time.Time `json:"created"`
Updated time.Time `json:"updated"`
// ClientID and ClientSecret are persisted for DCR (Dynamic Client Registration)
// These are required for token refresh when using DCR-obtained credentials
ClientID string `json:"client_id,omitempty"`
ClientSecret string `json:"client_secret,omitempty"`
// CallbackPort and RedirectURI are persisted for OAuth redirect URI port persistence (Spec 022)
// These ensure re-authentication uses the same port registered during DCR
CallbackPort int `json:"callback_port,omitempty"` // Port used during DCR for redirect_uri
RedirectURI string `json:"redirect_uri,omitempty"` // Full redirect URI registered with DCR
}
OAuthTokenRecord represents stored OAuth tokens for a server
func (*OAuthTokenRecord) GetServerName ¶
func (r *OAuthTokenRecord) GetServerName() string
GetServerName returns the actual server name for RefreshManager lookup. Falls back to ServerName if DisplayName is not set (for backward compatibility).
func (*OAuthTokenRecord) MarshalBinary ¶
func (o *OAuthTokenRecord) MarshalBinary() ([]byte, error)
MarshalBinary implements encoding.BinaryMarshaler
func (*OAuthTokenRecord) UnmarshalBinary ¶
func (o *OAuthTokenRecord) UnmarshalBinary(data []byte) error
UnmarshalBinary implements encoding.BinaryUnmarshaler
type QuarantineServerData ¶
type QueueFullError ¶
// QueueFullError is an error type (it has an Error() string method) that
// carries the name of an operation.
// NOTE(review): presumably returned when an async operation cannot be enqueued
// because the queue is full — confirm against AsyncManager.
type QueueFullError struct {
// Operation identifies the operation the error refers to.
Operation string
}
func (*QueueFullError) Error ¶
func (e *QueueFullError) Error() string
type Result ¶
// Result contains the result of a storage operation.
type Result struct {
// Error is the error reported by the operation.
// NOTE(review): presumably nil on success — confirm with the producers of Result.
Error error
// Data is an untyped payload; its dynamic type is not fixed by this
// declaration, so consumers must type-assert.
Data interface{}
}
Result contains the result of a storage operation
type ServerAttributes ¶
// ServerAttributes represents the stable attributes that define a server's
// identity. It is the input of GenerateServerIDFromAttributes and is stored in
// ServerIdentity.Attributes.
//
// None of the JSON tags use omitempty, so every field is always emitted
// (nil slices/maps/pointers encode as null).
//
// NOTE(review): Go maps have no inherent order; the "(sorted)" remarks on Env
// and Headers presumably describe key ordering applied when the attributes are
// serialized or hashed (encoding/json sorts map keys on Marshal) — confirm in
// the ID-generation code.
type ServerAttributes struct {
Name string `json:"name"` // Server name (required)
Protocol string `json:"protocol"` // http, stdio, etc.
URL string `json:"url"` // For HTTP servers
Command string `json:"command"` // For stdio servers
Args []string `json:"args"` // For stdio servers
WorkingDir string `json:"working_dir"` // Working directory
Env map[string]string `json:"env"` // Environment variables (sorted)
Headers map[string]string `json:"headers"` // HTTP headers (sorted)
OAuth *OAuthAttributes `json:"oauth"` // OAuth configuration (if present)
}
ServerAttributes represents the stable attributes that define a server's identity
type ServerIdentity ¶
// ServerIdentity represents a unique server identity based on stable
// configuration. Instances are created with NewServerIdentity; the type also
// provides GetShortID, IsStale and UpdateLastSeen.
//
// FirstSeen/LastSeen are plain time.Time values (not pointers), so an unset
// timestamp is the zero time rather than nil.
type ServerIdentity struct {
ID string `json:"id"` // SHA256 hash of stable attributes
ServerName string `json:"server_name"` // Human-readable name
Fingerprint string `json:"fingerprint"` // Short hash (first 12 chars) for display
Attributes ServerAttributes `json:"attributes"` // Stable configuration attributes
FirstSeen time.Time `json:"first_seen"` // When first encountered
LastSeen time.Time `json:"last_seen"` // When last active
ConfigPaths []string `json:"config_paths"` // All configs that have included this server
Metadata map[string]string `json:"metadata"` // Additional metadata
}
ServerIdentity represents a unique server identity based on stable configuration
func NewServerIdentity ¶
func NewServerIdentity(server *config.ServerConfig, configPath string) *ServerIdentity
NewServerIdentity creates a new ServerIdentity from config
func (*ServerIdentity) GetShortID ¶
func (si *ServerIdentity) GetShortID() string
GetShortID returns a shortened version of the ID for display
func (*ServerIdentity) IsStale ¶
func (si *ServerIdentity) IsStale(threshold time.Duration) bool
IsStale returns true if the server hasn't been seen for longer than the given threshold
func (*ServerIdentity) UpdateLastSeen ¶
func (si *ServerIdentity) UpdateLastSeen(configPath string)
UpdateLastSeen updates the last seen timestamp and adds config path if new
type ServerStatistics ¶
// ServerStatistics represents statistical data for a server.
// It is the argument type of Manager.UpdateServerStatistics.
type ServerStatistics struct {
// ServerID and ServerName identify the server the statistics belong to.
ServerID string `json:"server_id"`
ServerName string `json:"server_name"`
// Call counters.
// NOTE(review): presumably TotalCalls == SuccessfulCalls + ErrorCalls — not enforced by this type; confirm in the updater.
TotalCalls int `json:"total_calls"`
SuccessfulCalls int `json:"successful_calls"`
ErrorCalls int `json:"error_calls"`
// Note the JSON key is abbreviated ("avg_response_time") relative to the field name.
AverageResponseTime int64 `json:"avg_response_time"` // nanoseconds
// LastCallTime is a pointer and omitted from JSON while nil.
LastCallTime *time.Time `json:"last_call_time,omitempty"`
// UpdatedAt is the timestamp stored with the statistics record.
UpdatedAt time.Time `json:"updated_at"`
}
ServerStatistics represents statistical data for a server
type SessionRecord ¶
// SessionRecord represents a stored MCP session.
type SessionRecord struct {
// ID is the session identifier.
ID string `json:"id"`
// ClientName and ClientVersion describe the connected client; both are
// omitted from JSON when empty.
ClientName string `json:"client_name,omitempty"`
ClientVersion string `json:"client_version,omitempty"`
// Status is a free-form status string; the set of valid values is not
// defined by this declaration.
Status string `json:"status"`
// StartTime is the session's start timestamp.
StartTime time.Time `json:"start_time"`
// EndTime is a pointer and omitted from JSON while nil.
// NOTE(review): presumably nil while the session is still open — confirm.
EndTime *time.Time `json:"end_time,omitempty"`
// LastActivity is the timestamp of the most recent recorded activity.
LastActivity time.Time `json:"last_activity"`
// ToolCallCount and TotalTokens are running totals for the session.
// NOTE(review): presumably maintained by Manager.UpdateSessionStats — confirm.
ToolCallCount int `json:"tool_call_count"`
TotalTokens int `json:"total_tokens"`
// MCP Client Capabilities
HasRoots bool `json:"has_roots,omitempty"` // Whether client supports roots
HasSampling bool `json:"has_sampling,omitempty"` // Whether client supports sampling
Experimental []string `json:"experimental,omitempty"` // Experimental capability names
}
SessionRecord represents a stored MCP session
type TimeoutError ¶
// TimeoutError is one of the error types for async operations (it has an
// Error() string method) and carries the name of an operation.
// NOTE(review): presumably returned when an async operation does not finish in
// time — confirm against AsyncManager.
type TimeoutError struct {
// Operation identifies the operation the error refers to.
Operation string
}
Error types for async operations
func (*TimeoutError) Error ¶
func (e *TimeoutError) Error() string
type TokenMetrics ¶
// TokenMetrics represents token usage statistics for a tool call.
// It is attached to ToolCallRecord via the optional Metrics pointer.
//
// EstimatedCost and TruncatedTokens use omitempty and therefore disappear from
// the JSON form when zero; WasTruncated has no omitempty and is always emitted.
type TokenMetrics struct {
InputTokens int `json:"input_tokens"` // Tokens in the request
OutputTokens int `json:"output_tokens"` // Tokens in the response
TotalTokens int `json:"total_tokens"` // Total tokens (input + output)
Model string `json:"model"` // Model used for tokenization
Encoding string `json:"encoding"` // Encoding used (e.g., cl100k_base)
EstimatedCost float64 `json:"estimated_cost,omitempty"` // Optional cost estimate
TruncatedTokens int `json:"truncated_tokens,omitempty"` // Tokens removed by truncation
WasTruncated bool `json:"was_truncated"` // Whether response was truncated
}
TokenMetrics represents token usage statistics for a tool call
type ToolCallRecord ¶
// ToolCallRecord represents a tool call with server context.
//
// Arguments and Response are untyped (map[string]interface{} / interface{}),
// so their concrete shape depends on the tool and is not fixed by this type.
// Error has no omitempty tag and is therefore emitted even when empty.
type ToolCallRecord struct {
ID string `json:"id"` // UUID
ServerID string `json:"server_id"` // Server identity
ServerName string `json:"server_name"` // For quick reference
ToolName string `json:"tool_name"` // Original tool name (without server prefix)
Arguments map[string]interface{} `json:"arguments"` // Tool arguments
Response interface{} `json:"response"` // Tool response
Error string `json:"error"` // Error if failed
Duration int64 `json:"duration"` // Duration in nanoseconds
Timestamp time.Time `json:"timestamp"` // When the call was made
ConfigPath string `json:"config_path"` // Which config was active
RequestID string `json:"request_id"` // For correlation
Metrics *TokenMetrics `json:"metrics,omitempty"` // Token usage metrics (nil for older records)
ParentCallID string `json:"parent_call_id,omitempty"` // Links nested calls to parent code_execution
ExecutionType string `json:"execution_type,omitempty"` // "direct" or "code_execution"
MCPSessionID string `json:"mcp_session_id,omitempty"` // MCP session identifier
MCPClientName string `json:"mcp_client_name,omitempty"` // MCP client name from InitializeRequest
MCPClientVersion string `json:"mcp_client_version,omitempty"` // MCP client version
Annotations *config.ToolAnnotations `json:"annotations,omitempty"` // Tool behavior hints snapshot
}
ToolCallRecord represents a tool call with server context
type ToolHashRecord ¶
// ToolHashRecord represents a tool hash for change detection.
// The type also provides MarshalBinary and UnmarshalBinary
// (encoding.BinaryMarshaler / encoding.BinaryUnmarshaler).
type ToolHashRecord struct {
// ToolName names the tool the hash belongs to.
ToolName string `json:"tool_name"`
// Hash is the stored hash string.
// NOTE(review): the hash algorithm and the hashed input are not visible here — confirm at the call site.
Hash string `json:"hash"`
// Updated is the timestamp stored with the hash.
Updated time.Time `json:"updated"`
}
ToolHashRecord represents a tool hash for change detection
func (*ToolHashRecord) MarshalBinary ¶
func (h *ToolHashRecord) MarshalBinary() ([]byte, error)
MarshalBinary implements encoding.BinaryMarshaler
func (*ToolHashRecord) UnmarshalBinary ¶
func (h *ToolHashRecord) UnmarshalBinary(data []byte) error
UnmarshalBinary implements encoding.BinaryUnmarshaler
type ToolStatRecord ¶
// ToolStatRecord represents tool usage statistics.
// The type also provides MarshalBinary and UnmarshalBinary
// (encoding.BinaryMarshaler / encoding.BinaryUnmarshaler).
type ToolStatRecord struct {
// ToolName names the tool the statistics belong to.
ToolName string `json:"tool_name"`
// Count is an unsigned usage counter.
// NOTE(review): presumably incremented by BoltDB.IncrementToolStats — confirm.
Count uint64 `json:"count"`
// LastUsed is the timestamp of the most recent recorded use.
LastUsed time.Time `json:"last_used"`
}
ToolStatRecord represents tool usage statistics
func (*ToolStatRecord) MarshalBinary ¶
func (t *ToolStatRecord) MarshalBinary() ([]byte, error)
MarshalBinary implements encoding.BinaryMarshaler
func (*ToolStatRecord) UnmarshalBinary ¶
func (t *ToolStatRecord) UnmarshalBinary(data []byte) error
UnmarshalBinary implements encoding.BinaryUnmarshaler
type UnsupportedOperationError ¶
// UnsupportedOperationError is returned for unknown operation types.
// It has an Error() string method and so can be used as an error value.
type UnsupportedOperationError struct {
// Operation identifies the operation that was not recognized.
Operation string
}
UnsupportedOperationError is returned for unknown operation types
func (*UnsupportedOperationError) Error ¶
func (e *UnsupportedOperationError) Error() string
type UpstreamRecord ¶
// UpstreamRecord represents an upstream server record in storage.
// The type also provides MarshalBinary and UnmarshalBinary
// (encoding.BinaryMarshaler / encoding.BinaryUnmarshaler).
//
// Transport-specific fields use omitempty and vanish from the JSON form when
// unset; Enabled and Quarantined have no omitempty and are always emitted, so
// an explicit false survives a round trip.
type UpstreamRecord struct {
// ID is the record identifier (the key taken by BoltDB.GetUpstream / DeleteUpstream).
ID string `json:"id"`
// Name is the upstream server's name.
Name string `json:"name"`
// URL is the server URL, when one is configured.
URL string `json:"url,omitempty"`
Protocol string `json:"protocol,omitempty"` // stdio, http, sse, streamable-http, auto
// Command and Args describe the process to launch, when configured.
// NOTE(review): presumably only meaningful for stdio servers — confirm.
Command string `json:"command,omitempty"`
Args []string `json:"args,omitempty"`
WorkingDir string `json:"working_dir,omitempty"` // Working directory for stdio servers
// Env holds environment variables for the server.
Env map[string]string `json:"env,omitempty"`
Headers map[string]string `json:"headers,omitempty"` // For HTTP authentication
OAuth *config.OAuthConfig `json:"oauth,omitempty"` // OAuth configuration
// Enabled records whether the upstream is enabled.
Enabled bool `json:"enabled"`
Quarantined bool `json:"quarantined"` // Security quarantine status
// Created and Updated are the record's timestamps.
Created time.Time `json:"created"`
Updated time.Time `json:"updated"`
Isolation *config.IsolationConfig `json:"isolation,omitempty"` // Per-server isolation settings
}
UpstreamRecord represents an upstream server record in storage
func (*UpstreamRecord) MarshalBinary ¶
func (u *UpstreamRecord) MarshalBinary() ([]byte, error)
MarshalBinary implements encoding.BinaryMarshaler
func (*UpstreamRecord) UnmarshalBinary ¶
func (u *UpstreamRecord) UnmarshalBinary(data []byte) error
UnmarshalBinary implements encoding.BinaryUnmarshaler