storage

package
v0.16.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 23, 2026 License: MIT Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
// Bucket names for the bbolt database.
// Each constant is the literal string used as a bucket name.
const (
	UpstreamsBucket       = "upstreams"
	ToolStatsBucket       = "toolstats"
	ToolHashBucket        = "toolhash"
	// The nolint directive below silences gosec for this line; per its trailing
	// note, the literal is a bucket name, not a credential.
	OAuthTokenBucket      = "oauth_tokens" //nolint:gosec // bucket name, not a credential
	OAuthCompletionBucket = "oauth_completion"
	MetaBucket            = "meta"
	CacheBucket           = "cache"
	CacheStatsBucket      = "cache_stats"
	SessionsBucket        = "sessions"
)

Bucket names for bbolt database

View Source
// Meta keys.
// NOTE(review): presumably these are keys stored inside the "meta" bucket
// (MetaBucket) — confirm against the storage implementation.
const (
	SchemaVersionKey       = "schema"
	DockerRecoveryStateKey = "docker_recovery_state"
)

Meta keys

View Source
// ActivityRecordsBucket is the BBolt bucket name for activity records.
const ActivityRecordsBucket = "activity_records"

ActivityRecordsBucket is the BBolt bucket name for activity records

View Source
// CurrentSchemaVersion is the current schema version.
const CurrentSchemaVersion = 2

Current schema version

View Source
// DefaultMaxResponseSize is the default maximum size for response truncation
// (64KB; the expression evaluates to 65536).
const DefaultMaxResponseSize = 64 * 1024

DefaultMaxResponseSize is the default maximum size for response truncation (64KB)

Variables

ValidActivityTypes is the list of all valid activity types for filtering (Spec 024)

Functions

func GenerateServerID

func GenerateServerID(server *config.ServerConfig) string

GenerateServerID creates a unique, stable identity for a server

func GenerateServerIDFromAttributes

func GenerateServerIDFromAttributes(attrs ServerAttributes) string

GenerateServerIDFromAttributes creates ID from attributes directly

func TruncateActivityResponse

func TruncateActivityResponse(response string, maxSize int) (string, bool)

TruncateActivityResponse is a helper to truncate responses for storage.

Types

type ActivityFilter

// ActivityFilter represents query parameters for filtering activity records.
// NOTE(review): the Limit bounds noted on the field (default 50, max 100) are
// presumably applied by Validate — confirm.
type ActivityFilter struct {
	Types      []string  // Filter by activity types (Spec 024: supports multiple types with OR logic)
	Server     string    // Filter by server name
	Tool       string    // Filter by tool name
	SessionID  string    // Filter by MCP session
	Status     string    // Filter by status (success/error/blocked)
	StartTime  time.Time // Activities after this time
	EndTime    time.Time // Activities before this time
	Limit      int       // Max records to return (default 50, max 100)
	Offset     int       // Pagination offset
	IntentType string    // Filter by intent operation type: read, write, destructive (Spec 018)
	RequestID  string    // Filter by HTTP request ID for correlation (Spec 021)

	// ExcludeCallToolSuccess filters out successful call_tool_* internal tool calls.
	// These appear as duplicates since the actual upstream tool call is also logged.
	// Failed call_tool_* calls are still shown (no corresponding tool_call entry).
	// Default: true (to avoid duplicate entries in UI/CLI)
	// NOTE(review): the Go zero value of a bool is false, so the "true" default
	// presumably comes from DefaultActivityFilter rather than a bare
	// ActivityFilter{} literal — confirm.
	ExcludeCallToolSuccess bool
}

ActivityFilter represents query parameters for filtering activity records

func DefaultActivityFilter

func DefaultActivityFilter() ActivityFilter

DefaultActivityFilter returns an ActivityFilter with sensible defaults

func (*ActivityFilter) Matches

func (f *ActivityFilter) Matches(record *ActivityRecord) bool

Matches checks if an activity record matches the filter criteria

func (*ActivityFilter) Validate

func (f *ActivityFilter) Validate()

Validate validates and normalizes the filter

type ActivityRecord

// ActivityRecord represents a single activity log entry stored in BBolt.
// The id, type, status and timestamp JSON fields carry no omitempty option and
// are therefore always emitted; every other field is tagged omitempty.
type ActivityRecord struct {
	ID                string                 `json:"id"`                           // Unique identifier (ULID format)
	Type              ActivityType           `json:"type"`                         // Type of activity
	Source            ActivitySource         `json:"source,omitempty"`             // How activity was triggered: "mcp", "cli", "api"
	ServerName        string                 `json:"server_name,omitempty"`        // Name of upstream MCP server
	ToolName          string                 `json:"tool_name,omitempty"`          // Name of tool called
	Arguments         map[string]interface{} `json:"arguments,omitempty"`          // Tool call arguments
	Response          string                 `json:"response,omitempty"`           // Tool response (potentially truncated)
	ResponseTruncated bool                   `json:"response_truncated,omitempty"` // True if response was truncated
	Status            string                 `json:"status"`                       // Result status: "success", "error", "blocked"
	ErrorMessage      string                 `json:"error_message,omitempty"`      // Error details if status is "error"
	DurationMs        int64                  `json:"duration_ms,omitempty"`        // Execution duration in milliseconds
	Timestamp         time.Time              `json:"timestamp"`                    // When activity occurred
	SessionID         string                 `json:"session_id,omitempty"`         // MCP session ID for correlation
	RequestID         string                 `json:"request_id,omitempty"`         // HTTP request ID for correlation
	Metadata          map[string]interface{} `json:"metadata,omitempty"`           // Additional context-specific data
}

ActivityRecord represents a single activity log entry stored in BBolt

func ActivityRecordFromJSON

func ActivityRecordFromJSON(data []byte) (*ActivityRecord, error)

ActivityRecordFromJSON parses an activity record from JSON bytes.

func (*ActivityRecord) MarshalBinary

func (a *ActivityRecord) MarshalBinary() ([]byte, error)

MarshalBinary implements encoding.BinaryMarshaler for BBolt storage

func (*ActivityRecord) UnmarshalBinary

func (a *ActivityRecord) UnmarshalBinary(data []byte) error

UnmarshalBinary implements encoding.BinaryUnmarshaler for BBolt storage

type ActivitySource

// ActivitySource indicates how the activity was triggered.
type ActivitySource string

ActivitySource indicates how the activity was triggered

// Declared ActivitySource values.
const (
	// ActivitySourceMCP indicates the activity was triggered via MCP protocol (AI agent)
	ActivitySourceMCP ActivitySource = "mcp"
	// ActivitySourceCLI indicates the activity was triggered via CLI command
	ActivitySourceCLI ActivitySource = "cli"
	// ActivitySourceAPI indicates the activity was triggered via REST API
	ActivitySourceAPI ActivitySource = "api"
)

type ActivityType

// ActivityType represents the type of activity being recorded.
type ActivityType string

ActivityType represents the type of activity being recorded

// Declared ActivityType values.
const (
	// ActivityTypeToolCall represents a tool execution event
	ActivityTypeToolCall ActivityType = "tool_call"
	// ActivityTypePolicyDecision represents a policy blocking a tool call
	ActivityTypePolicyDecision ActivityType = "policy_decision"
	// ActivityTypeQuarantineChange represents a server quarantine state change
	ActivityTypeQuarantineChange ActivityType = "quarantine_change"
	// ActivityTypeServerChange represents a server configuration change
	ActivityTypeServerChange ActivityType = "server_change"
	// ActivityTypeSystemStart represents MCPProxy server startup (Spec 024)
	ActivityTypeSystemStart ActivityType = "system_start"
	// ActivityTypeSystemStop represents MCPProxy server shutdown (Spec 024)
	ActivityTypeSystemStop ActivityType = "system_stop"
	// ActivityTypeInternalToolCall represents internal MCP tool calls like retrieve_tools, call_tool_* (Spec 024)
	ActivityTypeInternalToolCall ActivityType = "internal_tool_call"
	// ActivityTypeConfigChange represents configuration changes like server add/remove/update (Spec 024)
	ActivityTypeConfigChange ActivityType = "config_change"
)

type AsyncManager

// AsyncManager handles asynchronous storage operations to prevent deadlocks.
type AsyncManager struct {
	// contains filtered or unexported fields
}

AsyncManager handles asynchronous storage operations to prevent deadlocks

func NewAsyncManager

func NewAsyncManager(db *BoltDB, logger *zap.SugaredLogger) *AsyncManager

NewAsyncManager creates a new async storage manager

func (*AsyncManager) DeleteServerAsync

func (am *AsyncManager) DeleteServerAsync(name string)

DeleteServerAsync queues a delete server operation

func (*AsyncManager) EnableServerAsync

func (am *AsyncManager) EnableServerAsync(name string, enabled bool)

EnableServerAsync queues an enable/disable operation

func (*AsyncManager) EnableServerSync

func (am *AsyncManager) EnableServerSync(name string, enabled bool) error

EnableServerSync queues an enable/disable operation and waits for confirmation

func (*AsyncManager) QuarantineServerAsync

func (am *AsyncManager) QuarantineServerAsync(name string, quarantined bool)

QuarantineServerAsync queues a quarantine operation

func (*AsyncManager) QuarantineServerSync

func (am *AsyncManager) QuarantineServerSync(name string, quarantined bool) error

QuarantineServerSync queues a quarantine operation and waits for confirmation

func (*AsyncManager) SaveServerAsync

func (am *AsyncManager) SaveServerAsync(serverConfig *config.ServerConfig)

SaveServerAsync queues a save server operation

func (*AsyncManager) Start

func (am *AsyncManager) Start()

Start begins processing storage operations in a dedicated goroutine

func (*AsyncManager) Stop

func (am *AsyncManager) Stop()

Stop gracefully shuts down the async manager

type BoltDB

// BoltDB wraps bolt database operations.
type BoltDB struct {
	// contains filtered or unexported fields
}

BoltDB wraps bolt database operations

func NewBoltDB

func NewBoltDB(dataDir string, logger *zap.SugaredLogger) (*BoltDB, error)

NewBoltDB creates a new BoltDB instance

func (*BoltDB) Backup

func (b *BoltDB) Backup(destPath string) error

Backup creates a backup of the database

func (*BoltDB) CleanupOldOAuthCompletionEvents

func (s *BoltDB) CleanupOldOAuthCompletionEvents() error

CleanupOldOAuthCompletionEvents removes OAuth completion events older than 24 hours

func (*BoltDB) ClearOAuthClientCredentials

func (b *BoltDB) ClearOAuthClientCredentials(serverKey string) error

ClearOAuthClientCredentials clears only the DCR-related fields (ClientID, ClientSecret, CallbackPort) while preserving any existing token data. This is called when the callback port conflicts and fresh DCR is required (Spec 022: OAuth Redirect URI Port Persistence).

func (*BoltDB) Close

func (b *BoltDB) Close() error

Close closes the database

func (*BoltDB) DeleteOAuthToken

func (b *BoltDB) DeleteOAuthToken(serverName string) error

DeleteOAuthToken deletes an OAuth token record

func (*BoltDB) DeleteToolHash

func (b *BoltDB) DeleteToolHash(toolName string) error

DeleteToolHash deletes a tool hash

func (*BoltDB) DeleteUpstream

func (b *BoltDB) DeleteUpstream(id string) error

DeleteUpstream deletes an upstream server record

func (*BoltDB) GetOAuthClientCredentials

func (b *BoltDB) GetOAuthClientCredentials(serverKey string) (clientID, clientSecret string, callbackPort int, err error)

GetOAuthClientCredentials retrieves the client credentials and callback port for token refresh. callbackPort is 0 if not stored (legacy records or fresh records without DCR).

func (*BoltDB) GetOAuthToken

func (b *BoltDB) GetOAuthToken(serverName string) (*OAuthTokenRecord, error)

GetOAuthToken retrieves an OAuth token record by server name

func (*BoltDB) GetSchemaVersion

func (b *BoltDB) GetSchemaVersion() (uint64, error)

GetSchemaVersion returns the current schema version

func (*BoltDB) GetToolHash

func (b *BoltDB) GetToolHash(toolName string) (string, error)

GetToolHash retrieves a tool hash

func (*BoltDB) GetToolStats

func (b *BoltDB) GetToolStats(toolName string) (*ToolStatRecord, error)

GetToolStats retrieves tool statistics

func (*BoltDB) GetUnprocessedOAuthCompletionEvents

func (s *BoltDB) GetUnprocessedOAuthCompletionEvents() ([]*OAuthCompletionEvent, error)

GetUnprocessedOAuthCompletionEvents returns all unprocessed OAuth completion events

func (*BoltDB) GetUpstream

func (b *BoltDB) GetUpstream(id string) (*UpstreamRecord, error)

GetUpstream retrieves an upstream server record by ID

func (*BoltDB) IncrementToolStats

func (b *BoltDB) IncrementToolStats(toolName string) error

IncrementToolStats increments the usage count for a tool

func (*BoltDB) ListOAuthTokens

func (b *BoltDB) ListOAuthTokens() ([]*OAuthTokenRecord, error)

ListOAuthTokens returns all OAuth token records

func (*BoltDB) ListToolStats

func (b *BoltDB) ListToolStats() ([]*ToolStatRecord, error)

ListToolStats returns all tool statistics

func (*BoltDB) ListUpstreams

func (b *BoltDB) ListUpstreams() ([]*UpstreamRecord, error)

ListUpstreams returns all upstream server records

func (*BoltDB) MarkOAuthCompletionEventProcessed

func (s *BoltDB) MarkOAuthCompletionEventProcessed(serverName string, completedAt time.Time) error

MarkOAuthCompletionEventProcessed marks an OAuth completion event as processed

func (*BoltDB) SaveOAuthCompletionEvent

func (s *BoltDB) SaveOAuthCompletionEvent(event *OAuthCompletionEvent) error

SaveOAuthCompletionEvent saves an OAuth completion event to the database

func (*BoltDB) SaveOAuthToken

func (b *BoltDB) SaveOAuthToken(record *OAuthTokenRecord) error

SaveOAuthToken saves an OAuth token record

func (*BoltDB) SaveToolHash

func (b *BoltDB) SaveToolHash(toolName, hash string) error

SaveToolHash saves a tool hash for change detection

func (*BoltDB) SaveUpstream

func (b *BoltDB) SaveUpstream(record *UpstreamRecord) error

SaveUpstream saves an upstream server record

func (*BoltDB) Stats

func (b *BoltDB) Stats() (*bbolt.Stats, error)

Stats returns database statistics

func (*BoltDB) UpdateOAuthClientCredentials

func (b *BoltDB) UpdateOAuthClientCredentials(serverKey, clientID, clientSecret string, callbackPort int) error

UpdateOAuthClientCredentials updates the client credentials (from DCR) and callback port on an existing token. This is called after successful Dynamic Client Registration to persist the obtained client_id/secret and the callback port used for the redirect_uri (Spec 022: OAuth Redirect URI Port Persistence).

type DatabaseLockedError

// DatabaseLockedError indicates that the database is locked by another process.
// NOTE(review): Path is presumably the database file path and Err the
// underlying error exposed through Unwrap — confirm.
type DatabaseLockedError struct {
	Path string
	Err  error
}

DatabaseLockedError indicates that the database is locked by another process

func (*DatabaseLockedError) Error

func (e *DatabaseLockedError) Error() string

func (*DatabaseLockedError) Unwrap

func (e *DatabaseLockedError) Unwrap() error

type DiagnosticRecord

// DiagnosticRecord represents a diagnostic event for a server.
// ResolvedAt is a pointer tagged omitempty, so a nil value is left out of the
// JSON output.
type DiagnosticRecord struct {
	ServerID   string                 `json:"server_id"`
	ServerName string                 `json:"server_name"`
	Type       string                 `json:"type"`     // error, warning, info
	Category   string                 `json:"category"` // oauth, connection, etc.
	Message    string                 `json:"message"`
	Details    map[string]interface{} `json:"details"`
	Timestamp  time.Time              `json:"timestamp"`
	ConfigPath string                 `json:"config_path"`
	Resolved   bool                   `json:"resolved"`
	ResolvedAt *time.Time             `json:"resolved_at,omitempty"`
}

DiagnosticRecord represents a diagnostic event for a server

type DockerRecoveryState

// DockerRecoveryState represents the persistent state of Docker recovery.
// This allows recovery to resume after application restart.
//
// NOTE(review): if this struct is serialized with encoding/json, the omitempty
// option on LastSuccessfulAt has no effect, because a non-pointer time.Time is
// a struct and is never considered empty; a zero time would still be written.
// Confirm whether that is intended.
type DockerRecoveryState struct {
	LastAttempt      time.Time `json:"last_attempt"`
	FailureCount     int       `json:"failure_count"`
	DockerAvailable  bool      `json:"docker_available"`
	RecoveryMode     bool      `json:"recovery_mode"`
	LastError        string    `json:"last_error,omitempty"`
	AttemptsSinceUp  int       `json:"attempts_since_up"` // Attempts since Docker was last available
	LastSuccessfulAt time.Time `json:"last_successful_at,omitempty"`
}

DockerRecoveryState represents the persistent state of Docker recovery. This allows recovery to resume after application restart.

func (*DockerRecoveryState) MarshalBinary

func (d *DockerRecoveryState) MarshalBinary() ([]byte, error)

MarshalBinary implements encoding.BinaryMarshaler

func (*DockerRecoveryState) UnmarshalBinary

func (d *DockerRecoveryState) UnmarshalBinary(data []byte) error

UnmarshalBinary implements encoding.BinaryUnmarshaler

type EnableServerData

// EnableServerData is one of the data structures for different operation types.
// It pairs a server name with an enabled flag.
// NOTE(review): presumably the payload of enable/disable operations queued via
// EnableServerAsync / EnableServerSync — confirm.
type EnableServerData struct {
	Name    string
	Enabled bool
}

Data structures for different operation types

type Manager

// Manager provides a unified interface for storage operations.
type Manager struct {
	// contains filtered or unexported fields
}

Manager provides a unified interface for storage operations

func NewManager

func NewManager(dataDir string, logger *zap.SugaredLogger) (*Manager, error)

NewManager creates a new storage manager

func (*Manager) AddUpstream

func (m *Manager) AddUpstream(serverConfig *config.ServerConfig) (string, error)

AddUpstream adds an upstream server and returns its ID

func (*Manager) Backup

func (m *Manager) Backup(destPath string) error

Backup creates a backup of the database

func (*Manager) CleanupStaleServerData

func (m *Manager) CleanupStaleServerData(threshold time.Duration) error

CleanupStaleServerData removes data for servers that haven't been seen for a threshold period

func (*Manager) ClearDockerRecoveryState

func (m *Manager) ClearDockerRecoveryState() error

ClearDockerRecoveryState removes the Docker recovery state from persistent storage

func (*Manager) ClearOAuthState

func (m *Manager) ClearOAuthState(serverName string) error

ClearOAuthState clears all OAuth state for a server (tokens, client registration, etc.). This should be called when OAuth configuration changes to force re-authentication.

func (*Manager) Close

func (m *Manager) Close() error

Close closes the storage manager

func (*Manager) CloseAllActiveSessions

func (m *Manager) CloseAllActiveSessions() error

CloseAllActiveSessions marks all active sessions as closed. This should be called on startup to clean up stale sessions from previous runs.

func (*Manager) CloseInactiveSessions

func (m *Manager) CloseInactiveSessions(inactivityTimeout time.Duration) (int, error)

CloseInactiveSessions closes sessions that haven't had activity for the specified duration

func (*Manager) CloseSession

func (m *Manager) CloseSession(sessionID string) error

CloseSession marks a session as closed with end time

func (*Manager) CountActivities

func (m *Manager) CountActivities() (int, error)

CountActivities returns the total number of activity records.

func (*Manager) CreateSession

func (m *Manager) CreateSession(session *SessionRecord) error

CreateSession creates a new session record

func (*Manager) DeleteActivity

func (m *Manager) DeleteActivity(id string) error

DeleteActivity deletes an activity record by ID. Returns nil if the record doesn't exist.

func (*Manager) DeleteToolHash

func (m *Manager) DeleteToolHash(toolName string) error

DeleteToolHash deletes a tool hash

func (*Manager) DeleteUpstreamServer

func (m *Manager) DeleteUpstreamServer(name string) error

DeleteUpstreamServer deletes an upstream server

func (*Manager) EnableUpstreamServer

func (m *Manager) EnableUpstreamServer(name string, enabled bool) error

EnableUpstreamServer enables/disables an upstream server using async operations

func (*Manager) GetActivity

func (m *Manager) GetActivity(id string) (*ActivityRecord, error)

GetActivity retrieves an activity record by ID. Returns nil if the record is not found.

func (*Manager) GetActivityByIDScan

func (m *Manager) GetActivityByIDScan(id string) (*ActivityRecord, error)

GetActivityByIDScan performs a full scan to find activity by ID. This is less efficient than GetActivity but works when the timestamp is unknown.

func (*Manager) GetBoltDB

func (m *Manager) GetBoltDB() *BoltDB

GetBoltDB returns the wrapped BoltDB instance for higher-level operations

func (*Manager) GetDB

func (m *Manager) GetDB() *bbolt.DB

GetDB returns the underlying BBolt database for direct access

func (*Manager) GetOAuthToken

func (m *Manager) GetOAuthToken(serverName string) (*OAuthTokenRecord, error)

GetOAuthToken retrieves an OAuth token for a server from storage

func (*Manager) GetRecentSessions

func (m *Manager) GetRecentSessions(limit int) ([]*SessionRecord, int, error)

GetRecentSessions returns the most recent sessions

func (*Manager) GetSchemaVersion

func (m *Manager) GetSchemaVersion() (uint64, error)

GetSchemaVersion returns the current schema version

func (*Manager) GetServerDiagnostics

func (m *Manager) GetServerDiagnostics(serverID string, limit int) ([]*DiagnosticRecord, error)

GetServerDiagnostics gets diagnostic records for a server

func (*Manager) GetServerIdentity

func (m *Manager) GetServerIdentity(server *config.ServerConfig) (*ServerIdentity, error)

GetServerIdentity gets server identity by config

func (*Manager) GetServerIdentityByID

func (m *Manager) GetServerIdentityByID(serverID string) (*ServerIdentity, error)

GetServerIdentityByID gets server identity by ID

func (*Manager) GetServerStatistics

func (m *Manager) GetServerStatistics(serverID string) (*ServerStatistics, error)

GetServerStatistics gets statistics for a server

func (*Manager) GetServerToolCalls

func (m *Manager) GetServerToolCalls(serverID string, limit int) ([]*ToolCallRecord, error)

GetServerToolCalls gets tool calls for a server

func (*Manager) GetSessionByID

func (m *Manager) GetSessionByID(sessionID string) (*SessionRecord, error)

GetSessionByID retrieves a session by its ID

func (*Manager) GetStats

func (m *Manager) GetStats() (map[string]interface{}, error)

GetStats returns storage statistics

func (*Manager) GetToolCallsBySession

func (m *Manager) GetToolCallsBySession(sessionID string, limit, offset int) ([]*ToolCallRecord, int, error)

GetToolCallsBySession retrieves tool calls filtered by session ID

func (*Manager) GetToolHash

func (m *Manager) GetToolHash(toolName string) (string, error)

GetToolHash retrieves a tool hash

func (*Manager) GetToolStatistics

func (m *Manager) GetToolStatistics(topN int) (*config.ToolStats, error)

GetToolStatistics returns aggregated tool statistics

func (*Manager) GetToolStats

func (m *Manager) GetToolStats(topN int) ([]map[string]interface{}, error)

GetToolStats gets tool statistics formatted for MCP responses

func (*Manager) GetToolUsage

func (m *Manager) GetToolUsage(toolName string) (*ToolStatRecord, error)

GetToolUsage retrieves usage statistics for a tool

func (*Manager) GetUpstreamServer

func (m *Manager) GetUpstreamServer(name string) (*config.ServerConfig, error)

GetUpstreamServer retrieves an upstream server by name

func (*Manager) HasToolChanged

func (m *Manager) HasToolChanged(toolName, currentHash string) (bool, error)

HasToolChanged checks if a tool has changed based on its hash

func (*Manager) IncrementToolUsage

func (m *Manager) IncrementToolUsage(toolName string) error

IncrementToolUsage increments the usage count for a tool

func (*Manager) ListActivities

func (m *Manager) ListActivities(filter ActivityFilter) ([]*ActivityRecord, int, error)

ListActivities returns paginated activity records matching the filter. Records are returned in reverse chronological order (newest first). Returns the records, total matching count, and any error.

func (*Manager) ListOAuthTokens

func (m *Manager) ListOAuthTokens() ([]*OAuthTokenRecord, error)

ListOAuthTokens returns all OAuth token records from storage. Used by RefreshManager to initialize proactive refresh schedules on startup.

func (*Manager) ListQuarantinedTools

func (m *Manager) ListQuarantinedTools(serverName string) ([]map[string]interface{}, error)

ListQuarantinedTools returns tools from quarantined servers with full descriptions for security analysis

func (*Manager) ListQuarantinedUpstreamServers

func (m *Manager) ListQuarantinedUpstreamServers() ([]*config.ServerConfig, error)

ListQuarantinedUpstreamServers returns all quarantined upstream servers

func (*Manager) ListServerIdentities

func (m *Manager) ListServerIdentities() ([]*ServerIdentity, error)

ListServerIdentities lists all server identities

func (*Manager) ListUpstreamServers

func (m *Manager) ListUpstreamServers() ([]*config.ServerConfig, error)

ListUpstreamServers returns all upstream servers

func (*Manager) ListUpstreams

func (m *Manager) ListUpstreams() ([]*config.ServerConfig, error)

ListUpstreams is an alias for ListUpstreamServers

func (*Manager) LoadDockerRecoveryState

func (m *Manager) LoadDockerRecoveryState() (*DockerRecoveryState, error)

LoadDockerRecoveryState loads the Docker recovery state from persistent storage

func (*Manager) PruneExcessActivities

func (m *Manager) PruneExcessActivities(maxRecords int, targetPercent float64) (int, error)

PruneExcessActivities deletes oldest records when count exceeds maxRecords. Deletes records until count is at targetPercent of maxRecords (default 90%). Returns the number of records deleted.

func (*Manager) PruneOldActivities

func (m *Manager) PruneOldActivities(maxAge time.Duration) (int, error)

PruneOldActivities deletes activity records older than the specified duration. Returns the number of records deleted.

func (*Manager) QuarantineUpstreamServer

func (m *Manager) QuarantineUpstreamServer(name string, quarantined bool) error

QuarantineUpstreamServer sets the quarantine status of an upstream server using async operations

func (*Manager) RecordServerDiagnostic

func (m *Manager) RecordServerDiagnostic(record *DiagnosticRecord) error

RecordServerDiagnostic records a diagnostic event for a server

func (*Manager) RecordToolCall

func (m *Manager) RecordToolCall(record *ToolCallRecord) error

RecordToolCall records a tool call for a server

func (*Manager) RegisterServerIdentity

func (m *Manager) RegisterServerIdentity(server *config.ServerConfig, configPath string) (*ServerIdentity, error)

RegisterServerIdentity registers or updates a server identity

func (*Manager) RemoveUpstream

func (m *Manager) RemoveUpstream(id string) error

RemoveUpstream removes an upstream server by ID/name

func (*Manager) SaveActivity

func (m *Manager) SaveActivity(record *ActivityRecord) error

SaveActivity stores an activity record in BBolt. The record is stored with a composite key for efficient time-based queries.

func (*Manager) SaveActivityAsync

func (m *Manager) SaveActivityAsync(record *ActivityRecord)

SaveActivityAsync saves an activity record asynchronously. This is non-blocking and suitable for recording tool calls without impacting latency.

func (*Manager) SaveDockerRecoveryState

func (m *Manager) SaveDockerRecoveryState(state *DockerRecoveryState) error

SaveDockerRecoveryState saves the Docker recovery state to persistent storage

func (*Manager) SaveToolHash

func (m *Manager) SaveToolHash(toolName, hash string) error

SaveToolHash saves a tool hash for change detection

func (*Manager) SaveUpstreamServer

func (m *Manager) SaveUpstreamServer(serverConfig *config.ServerConfig) error

SaveUpstreamServer saves an upstream server configuration

func (*Manager) StreamActivities

func (m *Manager) StreamActivities(filter ActivityFilter) <-chan *ActivityRecord

StreamActivities returns a channel that yields activity records matching the filter. The channel is closed when all matching records have been sent. This is useful for streaming large exports without loading all records into memory.

func (*Manager) UpdateServerStatistics

func (m *Manager) UpdateServerStatistics(stats *ServerStatistics) error

UpdateServerStatistics updates server statistics

func (*Manager) UpdateSessionStats

func (m *Manager) UpdateSessionStats(sessionID string, tokens int) error

UpdateSessionStats increments tool call count and adds tokens

func (*Manager) UpdateUpstream

func (m *Manager) UpdateUpstream(id string, serverConfig *config.ServerConfig) error

UpdateUpstream updates an upstream server configuration

type OAuthAttributes

// OAuthAttributes represents stable OAuth configuration attributes.
type OAuthAttributes struct {
	ClientID     string   `json:"client_id"`
	ClientSecret string   `json:"client_secret"` // Note: This might contain secrets
	RedirectURI  string   `json:"redirect_uri"`
	Scopes       []string `json:"scopes"`
	PKCEEnabled  bool     `json:"pkce_enabled"`
}

OAuthAttributes represents stable OAuth configuration attributes

type OAuthCompletionEvent

// OAuthCompletionEvent represents an OAuth completion event for cross-process
// notification.
type OAuthCompletionEvent struct {
	ServerName  string     `json:"server_name"`
	CompletedAt time.Time  `json:"completed_at"`
	ProcessedAt *time.Time `json:"processed_at,omitempty"` // Nil if not yet processed by server
}

OAuthCompletionEvent represents an OAuth completion event for cross-process notification

func (*OAuthCompletionEvent) MarshalBinary

func (e *OAuthCompletionEvent) MarshalBinary() ([]byte, error)

MarshalBinary implements encoding.BinaryMarshaler

func (*OAuthCompletionEvent) UnmarshalBinary

func (e *OAuthCompletionEvent) UnmarshalBinary(data []byte) error

UnmarshalBinary implements encoding.BinaryUnmarshaler

type OAuthTokenRecord

// OAuthTokenRecord represents stored OAuth tokens for a server.
// ServerName is the storage key, while DisplayName holds the actual server
// name; per the GetServerName documentation, lookups fall back to ServerName
// when DisplayName is not set (backward compatibility).
type OAuthTokenRecord struct {
	ServerName   string    `json:"server_name"`            // Storage key (serverName_hash format)
	DisplayName  string    `json:"display_name,omitempty"` // Actual server name (for RefreshManager lookup)
	AccessToken  string    `json:"access_token"`
	RefreshToken string    `json:"refresh_token,omitempty"`
	TokenType    string    `json:"token_type"`
	ExpiresAt    time.Time `json:"expires_at"`
	Scopes       []string  `json:"scopes,omitempty"`
	Created      time.Time `json:"created"`
	Updated      time.Time `json:"updated"`
	// ClientID and ClientSecret are persisted for DCR (Dynamic Client Registration)
	// These are required for token refresh when using DCR-obtained credentials
	ClientID     string `json:"client_id,omitempty"`
	ClientSecret string `json:"client_secret,omitempty"`
	// CallbackPort and RedirectURI are persisted for OAuth redirect URI port persistence (Spec 022)
	// These ensure re-authentication uses the same port registered during DCR
	CallbackPort int    `json:"callback_port,omitempty"` // Port used during DCR for redirect_uri
	RedirectURI  string `json:"redirect_uri,omitempty"`  // Full redirect URI registered with DCR
}

OAuthTokenRecord represents stored OAuth tokens for a server

func (*OAuthTokenRecord) GetServerName

func (r *OAuthTokenRecord) GetServerName() string

GetServerName returns the actual server name for RefreshManager lookup. Falls back to ServerName if DisplayName is not set (for backward compatibility).

func (*OAuthTokenRecord) MarshalBinary

func (o *OAuthTokenRecord) MarshalBinary() ([]byte, error)

MarshalBinary implements encoding.BinaryMarshaler

func (*OAuthTokenRecord) UnmarshalBinary

func (o *OAuthTokenRecord) UnmarshalBinary(data []byte) error

UnmarshalBinary implements encoding.BinaryUnmarshaler

type Operation

// Operation represents a queued storage operation.
// NOTE(review): Data presumably carries an operation-specific payload (e.g.
// EnableServerData or QuarantineServerData), and ResultCh is presumably where
// the Result is delivered for callers that wait — confirm against AsyncManager.
type Operation struct {
	Type     string
	Data     interface{}
	ResultCh chan Result
}

Operation represents a queued storage operation

type QuarantineServerData

// QuarantineServerData pairs a server name with a quarantined flag.
// NOTE(review): presumably the payload of quarantine operations queued via
// QuarantineServerAsync / QuarantineServerSync — confirm.
type QuarantineServerData struct {
	Name        string
	Quarantined bool
}

type QueueFullError

// QueueFullError is an error type (it has an Error method) that records the
// name of an operation.
// NOTE(review): presumably returned when the async operation queue cannot
// accept the named operation — confirm.
type QueueFullError struct {
	Operation string
}

func (*QueueFullError) Error

func (e *QueueFullError) Error() string

type Result

// Result contains the result of a storage operation.
type Result struct {
	Error error
	Data  interface{}
}

Result contains the result of a storage operation

type ServerAttributes

// ServerAttributes represents the stable attributes that define a server's
// identity.
type ServerAttributes struct {
	Name       string            `json:"name"`        // Server name (required)
	Protocol   string            `json:"protocol"`    // http, stdio, etc.
	URL        string            `json:"url"`         // For HTTP servers
	Command    string            `json:"command"`     // For stdio servers
	Args       []string          `json:"args"`        // For stdio servers
	WorkingDir string            `json:"working_dir"` // Working directory
	Env        map[string]string `json:"env"`         // Environment variables (sorted)
	Headers    map[string]string `json:"headers"`     // HTTP headers (sorted)
	OAuth      *OAuthAttributes  `json:"oauth"`       // OAuth configuration (if present)
}

ServerAttributes represents the stable attributes that define a server's identity

type ServerIdentity

// ServerIdentity represents a unique server identity based on stable
// configuration.
type ServerIdentity struct {
	ID          string            `json:"id"`           // SHA256 hash of stable attributes
	ServerName  string            `json:"server_name"`  // Human-readable name
	Fingerprint string            `json:"fingerprint"`  // Short hash (first 12 chars) for display
	Attributes  ServerAttributes  `json:"attributes"`   // Stable configuration attributes
	FirstSeen   time.Time         `json:"first_seen"`   // When first encountered
	LastSeen    time.Time         `json:"last_seen"`    // When last active
	ConfigPaths []string          `json:"config_paths"` // All configs that have included this server
	Metadata    map[string]string `json:"metadata"`     // Additional metadata
}

ServerIdentity represents a unique server identity based on stable configuration

func NewServerIdentity

func NewServerIdentity(server *config.ServerConfig, configPath string) *ServerIdentity

NewServerIdentity creates a new ServerIdentity from config

func (*ServerIdentity) GetShortID

func (si *ServerIdentity) GetShortID() string

GetShortID returns a shortened version of the ID for display

func (*ServerIdentity) IsStale

func (si *ServerIdentity) IsStale(threshold time.Duration) bool

IsStale reports whether the server has not been seen within the given threshold duration.

func (*ServerIdentity) UpdateLastSeen

func (si *ServerIdentity) UpdateLastSeen(configPath string)

UpdateLastSeen updates the last seen timestamp and adds config path if new

type ServerStatistics

// ServerStatistics represents statistical data for a server.
//
// NOTE(review): the relationship between TotalCalls, SuccessfulCalls and
// ErrorCalls (e.g. whether the latter two sum to the former) is not enforced
// by the type; confirm against the code that updates these counters.
type ServerStatistics struct {
	ServerID            string     `json:"server_id"`
	ServerName          string     `json:"server_name"`
	TotalCalls          int        `json:"total_calls"`
	SuccessfulCalls     int        `json:"successful_calls"`
	ErrorCalls          int        `json:"error_calls"`
	AverageResponseTime int64      `json:"avg_response_time"` // nanoseconds
	// LastCallTime is a pointer tagged omitempty, so the key is left out of
	// the JSON encoding when it is nil.
	LastCallTime        *time.Time `json:"last_call_time,omitempty"`
	UpdatedAt           time.Time  `json:"updated_at"`
}

ServerStatistics represents statistical data for a server

type SessionRecord

// SessionRecord represents a stored MCP session.
//
// Fields tagged omitempty (client name/version, EndTime and the capability
// fields) are left out of the JSON encoding when they hold their zero value.
type SessionRecord struct {
	ID            string     `json:"id"`
	ClientName    string     `json:"client_name,omitempty"`
	ClientVersion string     `json:"client_version,omitempty"`
	// NOTE(review): the set of valid Status values is not visible here.
	Status        string     `json:"status"`
	StartTime     time.Time  `json:"start_time"`
	// EndTime is a pointer, so it can be nil.
	// NOTE(review): presumably nil while the session is still open — confirm.
	EndTime       *time.Time `json:"end_time,omitempty"`
	LastActivity  time.Time  `json:"last_activity"`
	ToolCallCount int        `json:"tool_call_count"`
	TotalTokens   int        `json:"total_tokens"`
	// MCP Client Capabilities
	HasRoots     bool     `json:"has_roots,omitempty"`    // Whether client supports roots
	HasSampling  bool     `json:"has_sampling,omitempty"` // Whether client supports sampling
	Experimental []string `json:"experimental,omitempty"` // Experimental capability names
}

SessionRecord represents a stored MCP session

type TimeoutError

// TimeoutError is one of the error types for async operations; its pointer
// type has an Error() string method.
// NOTE(review): the name suggests it is returned when an operation does not
// complete in time; confirm against the code that constructs it.
type TimeoutError struct {
	// Operation names the operation this error refers to.
	Operation string
}

TimeoutError is one of the error types for async operations.

func (*TimeoutError) Error

func (e *TimeoutError) Error() string

type TokenMetrics

// TokenMetrics represents token usage statistics for a tool call.
//
// EstimatedCost and TruncatedTokens are tagged omitempty and are dropped from
// the JSON encoding when zero; WasTruncated has no omitempty and is always
// written.
// NOTE(review): TotalTokens is documented as input + output, but nothing in
// the type enforces that; the producer is responsible for keeping it in sync.
type TokenMetrics struct {
	InputTokens     int     `json:"input_tokens"`               // Tokens in the request
	OutputTokens    int     `json:"output_tokens"`              // Tokens in the response
	TotalTokens     int     `json:"total_tokens"`               // Total tokens (input + output)
	Model           string  `json:"model"`                      // Model used for tokenization
	Encoding        string  `json:"encoding"`                   // Encoding used (e.g., cl100k_base)
	EstimatedCost   float64 `json:"estimated_cost,omitempty"`   // Optional cost estimate
	TruncatedTokens int     `json:"truncated_tokens,omitempty"` // Tokens removed by truncation
	WasTruncated    bool    `json:"was_truncated"`              // Whether response was truncated
}

TokenMetrics represents token usage statistics for a tool call

type ToolCallRecord

// ToolCallRecord represents a tool call with server context.
//
// The fields from Metrics onward are all tagged omitempty, so they are absent
// from the JSON encoding when unset; the earlier fields are always written.
// Arguments and Response are untyped (map[string]interface{} / interface{}),
// so after decoding with encoding/json they hold the generic JSON value types
// rather than the original Go types.
type ToolCallRecord struct {
	ID               string                  `json:"id"`                           // UUID
	ServerID         string                  `json:"server_id"`                    // Server identity
	ServerName       string                  `json:"server_name"`                  // For quick reference
	ToolName         string                  `json:"tool_name"`                    // Original tool name (without server prefix)
	Arguments        map[string]interface{}  `json:"arguments"`                    // Tool arguments
	Response         interface{}             `json:"response"`                     // Tool response
	Error            string                  `json:"error"`                        // Error if failed
	Duration         int64                   `json:"duration"`                     // Duration in nanoseconds
	Timestamp        time.Time               `json:"timestamp"`                    // When the call was made
	ConfigPath       string                  `json:"config_path"`                  // Which config was active
	RequestID        string                  `json:"request_id"`                   // For correlation
	Metrics          *TokenMetrics           `json:"metrics,omitempty"`            // Token usage metrics (nil for older records)
	ParentCallID     string                  `json:"parent_call_id,omitempty"`     // Links nested calls to parent code_execution
	ExecutionType    string                  `json:"execution_type,omitempty"`     // "direct" or "code_execution"
	MCPSessionID     string                  `json:"mcp_session_id,omitempty"`     // MCP session identifier
	MCPClientName    string                  `json:"mcp_client_name,omitempty"`    // MCP client name from InitializeRequest
	MCPClientVersion string                  `json:"mcp_client_version,omitempty"` // MCP client version
	Annotations      *config.ToolAnnotations `json:"annotations,omitempty"`        // Tool behavior hints snapshot
}

ToolCallRecord represents a tool call with server context

type ToolHashRecord

// ToolHashRecord represents a tool hash for change detection.
//
// Its pointer type provides MarshalBinary and UnmarshalBinary, implementing
// encoding.BinaryMarshaler and encoding.BinaryUnmarshaler.
// NOTE(review): the hash algorithm and what input is hashed are not visible
// in this declaration.
type ToolHashRecord struct {
	ToolName string    `json:"tool_name"`
	Hash     string    `json:"hash"`
	// NOTE(review): presumably the time Hash was last written — confirm.
	Updated  time.Time `json:"updated"`
}

ToolHashRecord represents a tool hash for change detection

func (*ToolHashRecord) MarshalBinary

func (h *ToolHashRecord) MarshalBinary() ([]byte, error)

MarshalBinary implements encoding.BinaryMarshaler

func (*ToolHashRecord) UnmarshalBinary

func (h *ToolHashRecord) UnmarshalBinary(data []byte) error

UnmarshalBinary implements encoding.BinaryUnmarshaler

type ToolStatRecord

// ToolStatRecord represents tool usage statistics.
//
// Its pointer type provides MarshalBinary and UnmarshalBinary, implementing
// encoding.BinaryMarshaler and encoding.BinaryUnmarshaler.
type ToolStatRecord struct {
	ToolName string    `json:"tool_name"`
	// Count is unsigned, so it cannot represent a negative usage count.
	Count    uint64    `json:"count"`
	LastUsed time.Time `json:"last_used"`
}

ToolStatRecord represents tool usage statistics

func (*ToolStatRecord) MarshalBinary

func (t *ToolStatRecord) MarshalBinary() ([]byte, error)

MarshalBinary implements encoding.BinaryMarshaler

func (*ToolStatRecord) UnmarshalBinary

func (t *ToolStatRecord) UnmarshalBinary(data []byte) error

UnmarshalBinary implements encoding.BinaryUnmarshaler

type UnsupportedOperationError

// UnsupportedOperationError is returned for unknown operation types. Its
// pointer type has an Error() string method.
type UnsupportedOperationError struct {
	// Operation names the operation type that was not recognized.
	Operation string
}

UnsupportedOperationError is returned for unknown operation types

func (*UnsupportedOperationError) Error

func (e *UnsupportedOperationError) Error() string

type UpstreamRecord

// UpstreamRecord represents an upstream server record in storage.
//
// Its pointer type provides MarshalBinary and UnmarshalBinary, implementing
// encoding.BinaryMarshaler and encoding.BinaryUnmarshaler. ID, Name, Enabled,
// Quarantined, Created and Updated carry no omitempty tag and are always
// written to JSON; the remaining connection and configuration fields are
// dropped from the encoding when empty or nil.
type UpstreamRecord struct {
	ID          string                  `json:"id"`
	Name        string                  `json:"name"`
	URL         string                  `json:"url,omitempty"`
	Protocol    string                  `json:"protocol,omitempty"` // stdio, http, sse, streamable-http, auto
	Command     string                  `json:"command,omitempty"`
	Args        []string                `json:"args,omitempty"`
	WorkingDir  string                  `json:"working_dir,omitempty"` // Working directory for stdio servers
	Env         map[string]string       `json:"env,omitempty"`
	Headers     map[string]string       `json:"headers,omitempty"` // For HTTP authentication
	OAuth       *config.OAuthConfig     `json:"oauth,omitempty"`   // OAuth configuration
	Enabled     bool                    `json:"enabled"`
	Quarantined bool                    `json:"quarantined"` // Security quarantine status
	Created     time.Time               `json:"created"`
	Updated     time.Time               `json:"updated"`
	Isolation   *config.IsolationConfig `json:"isolation,omitempty"` // Per-server isolation settings
}

UpstreamRecord represents an upstream server record in storage

func (*UpstreamRecord) MarshalBinary

func (u *UpstreamRecord) MarshalBinary() ([]byte, error)

MarshalBinary implements encoding.BinaryMarshaler

func (*UpstreamRecord) UnmarshalBinary

func (u *UpstreamRecord) UnmarshalBinary(data []byte) error

UnmarshalBinary implements encoding.BinaryUnmarshaler

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL