Documentation ¶
Overview ¶
Package mcp implements the Model Context Protocol (MCP) client for stdio transport. It spawns MCP-compliant subprocesses, runs the JSON-RPC 2.0 initialize handshake, discovers tools, and bridges them into deepseekcode's tools.Tool interface so the agent can invoke MCP tools alongside built-in ones.
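D5 (ARCHITECTURE.md §4.5) scoped the client to stdio transport and deferred remote transports (HTTP/SSE) and OAuth to v0.4. The index below nevertheless lists remote transports: HTTPTransport, SSETransport (added in v0.3.7), and StreamableHTTPTransport (added in v0.4.0).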
Index ¶
- Constants
- func BridgeAll(reg *Registry) []tools.Tool
- type DriftKind
- type DriftReport
- type HTTPTransport
- type LifecycleState
- type MCPChange
- type McpToolMeta
- type Registry
- func (r *Registry) CallTool(ctx context.Context, fullName string, args json.RawMessage) (string, bool, error)
- func (r *Registry) Connect(ctx context.Context, name, command string, args []string, ...) error
- func (r *Registry) ConnectSSE(ctx context.Context, name, sseURL string) error
- func (r *Registry) PendingSchemaChanges(oldTools []McpToolMeta) []MCPChange
- func (r *Registry) Servers() []*ServerProxy
- func (r *Registry) SetTimeout(name string, seconds int)
- func (r *Registry) SetToolFilter(name string, enabledTools, disabledTools []string)
- func (r *Registry) Shutdown()
- func (r *Registry) Snapshots() []ServerSnapshot
- func (r *Registry) Tools() []McpToolMeta
- type SSETransport
- func (t *SSETransport) Close() error
- func (t *SSETransport) Done() <-chan struct{}
- func (t *SSETransport) Notify(ctx context.Context, method string, params any) error
- func (t *SSETransport) Send(ctx context.Context, method string, params any) (json.RawMessage, error)
- func (t *SSETransport) Start(ctx context.Context) error
- type ServerCapabilities
- type ServerProxy
- type ServerSnapshot
- type StdioTransport
- type StreamableHTTPTransport
- func (t *StreamableHTTPTransport) Close() error
- func (t *StreamableHTTPTransport) Done() <-chan struct{}
- func (t *StreamableHTTPTransport) Notify(ctx context.Context, method string, params any) error
- func (t *StreamableHTTPTransport) Send(ctx context.Context, method string, params any) (json.RawMessage, error)
- type Transport
Constants ¶
const ProtocolVersion = "2025-06-18"
ProtocolVersion is the MCP protocol version this client advertises during the initialize handshake.
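As a rough illustration of where this constant ends up, the sketch below builds a JSON-RPC 2.0 initialize request that carries the protocol version. The struct names, the buildInitialize helper, and the client name and version are hypothetical; the package's real request types are unexported and may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// protocolVersion mirrors the documented ProtocolVersion constant.
const protocolVersion = "2025-06-18"

// rpcRequest is a JSON-RPC 2.0 request envelope (hypothetical type).
type rpcRequest struct {
	JSONRPC string `json:"jsonrpc"`
	ID      int64  `json:"id"`
	Method  string `json:"method"`
	Params  any    `json:"params,omitempty"`
}

// clientInfo identifies the client; the values used below are placeholders.
type clientInfo struct {
	Name    string `json:"name"`
	Version string `json:"version"`
}

// initializeParams follows the shape of the MCP initialize request.
type initializeParams struct {
	ProtocolVersion string         `json:"protocolVersion"`
	Capabilities    map[string]any `json:"capabilities"`
	ClientInfo      clientInfo     `json:"clientInfo"`
}

// buildInitialize encodes an initialize request with the given request ID.
func buildInitialize(id int64) ([]byte, error) {
	return json.Marshal(rpcRequest{
		JSONRPC: "2.0",
		ID:      id,
		Method:  "initialize",
		Params: initializeParams{
			ProtocolVersion: protocolVersion,
			Capabilities:    map[string]any{},
			ClientInfo:      clientInfo{Name: "example-client", Version: "0.0.0"},
		},
	})
}

func main() {
	b, err := buildInitialize(1)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
}
```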
Variables ¶
This section is empty.
Functions ¶
Types ¶
type DriftReport ¶
DriftReport describes the drift between two tool lists.
func CompareToolLists ¶
func CompareToolLists(before, after []McpToolMeta) DriftReport
CompareToolLists compares before and after tool lists and returns a DriftReport describing the difference. Nil and empty lists are equivalent.
type HTTPTransport ¶
type HTTPTransport struct {
// contains filtered or unexported fields
}
HTTPTransport implements Transport for MCP servers reachable via HTTP POST (JSON-RPC request/response). It does not use SSE and is suitable for servers that accept stateless JSON-RPC over HTTP.
func NewHTTPTransport ¶
func NewHTTPTransport(url string) *HTTPTransport
func (*HTTPTransport) Close ¶
func (t *HTTPTransport) Close() error
func (*HTTPTransport) Done ¶ added in v0.3.7
func (t *HTTPTransport) Done() <-chan struct{}
Done returns a channel that is never closed. Stateless HTTP has no liveness signal — the server is always considered reachable.
func (*HTTPTransport) Send ¶
func (t *HTTPTransport) Send(ctx context.Context, method string, params any) (json.RawMessage, error)
type LifecycleState ¶
type LifecycleState int
LifecycleState is the connection state of a single MCP server proxy.
const (
StateInitializing LifecycleState = iota
StateConnected
StateDegraded
StateFailed
)
func (LifecycleState) String ¶
func (s LifecycleState) String() string
String returns a human-readable state label.
type MCPChange ¶
type MCPChange struct {
Kind string // "tool_added", "tool_removed", "tool_schema_changed"
ToolName string
}
MCPChange describes a tool-level mutation detected between two schema snapshots.
type McpToolMeta ¶
type McpToolMeta struct {
Name string `json:"name"`
Description string `json:"description"`
InputSchema json.RawMessage `json:"inputSchema"`
}
McpToolMeta is the metadata for a single MCP server tool.
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry manages the lifecycle of multiple MCP servers and provides unified tool discovery and calling.
LOCK MODEL: r.mu (the RWMutex) is the SOLE authority over r.servers and every state transition (markDegraded, the reconnect commit, the backoff fields). There is intentionally no per-ServerProxy lock. Network/IO (NewStdioTransport/initialize/listTools, performed by dial) MUST run WITHOUT r.mu held; only the final commit re-takes r.mu and re-validates state before installing a new transport.
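The lock model can be sketched as "check under lock, dial without lock, re-validate and commit under lock". The registry, proxy, and reconnect names below are simplified stand-ins, not the package's internals, and a string stands in for the transport.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type state int

const (
	stateDegraded state = iota
	stateConnected
)

// proxy is a simplified stand-in for ServerProxy; conn stands in for a Transport.
type proxy struct {
	state state
	conn  string
}

// registry is a simplified stand-in for Registry: one RWMutex guards everything.
type registry struct {
	mu      sync.RWMutex
	servers map[string]*proxy
}

var errStale = errors.New("server no longer eligible for reconnect")

// reconnect dials without holding r.mu, then re-takes the lock and commits
// only if the server is still registered, unchanged, and still degraded.
func (r *registry) reconnect(name string, dial func() (string, error)) error {
	r.mu.RLock()
	p, ok := r.servers[name]
	eligible := ok && p.state == stateDegraded
	r.mu.RUnlock()
	if !eligible {
		return errStale
	}

	conn, err := dial() // slow network/IO: r.mu is NOT held here
	if err != nil {
		return err
	}

	r.mu.Lock()
	defer r.mu.Unlock()
	cur, ok := r.servers[name]
	if !ok || cur != p || cur.state != stateDegraded {
		// State changed while dialing. A real transport would be closed here.
		return errStale
	}
	// Replace the proxy wholesale rather than mutating it in place.
	r.servers[name] = &proxy{state: stateConnected, conn: conn}
	return nil
}

func main() {
	r := &registry{servers: map[string]*proxy{"fs": {state: stateDegraded}}}
	err := r.reconnect("fs", func() (string, error) { return "new-transport", nil })
	fmt.Println(err, r.servers["fs"].state == stateConnected)
}
```

Because dial runs unlocked, other goroutines can remove or replace the server in the meantime, which is why the commit step compares against the proxy it originally saw.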
func (*Registry) CallTool ¶
func (r *Registry) CallTool(ctx context.Context, fullName string, args json.RawMessage) (string, bool, error)
CallTool dispatches a fully-qualified tool name to the right server and returns the content, isError flag, and any infrastructure error.
func (*Registry) Connect ¶
func (r *Registry) Connect(ctx context.Context, name, command string, args []string, env map[string]string) error
Connect spawns an MCP server, runs initialize, discovers tools, and registers it under the given name. Name must be unique. This is the stdio transport path — use ConnectSSE for SSE servers.
func (*Registry) ConnectSSE ¶ added in v0.3.7
func (r *Registry) ConnectSSE(ctx context.Context, name, sseURL string) error
ConnectSSE connects to an MCP server via SSE transport. It opens the SSE event stream, runs initialize, discovers tools, and registers the server under the given name. Name must be unique.
func (*Registry) PendingSchemaChanges ¶
func (r *Registry) PendingSchemaChanges(oldTools []McpToolMeta) []MCPChange
PendingSchemaChanges compares the current tool set against a previous tool list snapshot and returns the list of MCP-level changes.
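A minimal sketch of how such a comparison could produce MCPChange values. The diffTools function is hypothetical, and comparing InputSchema byte-for-byte is an assumption: the real method may normalize JSON or also consider descriptions.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"sort"
)

// McpToolMeta and MCPChange are copied from the declarations in this package.
type McpToolMeta struct {
	Name        string          `json:"name"`
	Description string          `json:"description"`
	InputSchema json.RawMessage `json:"inputSchema"`
}

type MCPChange struct {
	Kind     string // "tool_added", "tool_removed", "tool_schema_changed"
	ToolName string
}

// diffTools reports tool-level changes from old to cur, sorted by tool name.
// Schemas are compared as raw bytes (an assumption of this sketch).
func diffTools(old, cur []McpToolMeta) []MCPChange {
	prev := make(map[string]json.RawMessage, len(old))
	for _, t := range old {
		prev[t.Name] = t.InputSchema
	}
	seen := make(map[string]bool, len(cur))
	var changes []MCPChange
	for _, t := range cur {
		seen[t.Name] = true
		schema, existed := prev[t.Name]
		switch {
		case !existed:
			changes = append(changes, MCPChange{Kind: "tool_added", ToolName: t.Name})
		case !bytes.Equal(schema, t.InputSchema):
			changes = append(changes, MCPChange{Kind: "tool_schema_changed", ToolName: t.Name})
		}
	}
	for _, t := range old {
		if !seen[t.Name] {
			changes = append(changes, MCPChange{Kind: "tool_removed", ToolName: t.Name})
		}
	}
	sort.Slice(changes, func(i, j int) bool { return changes[i].ToolName < changes[j].ToolName })
	return changes
}

func main() {
	old := []McpToolMeta{{Name: "read", InputSchema: json.RawMessage(`{"type":"object"}`)}}
	cur := []McpToolMeta{{Name: "write", InputSchema: json.RawMessage(`{"type":"object"}`)}}
	for _, c := range diffTools(old, cur) {
		fmt.Println(c.Kind, c.ToolName)
	}
}
```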
func (*Registry) Servers ¶
func (r *Registry) Servers() []*ServerProxy
Servers returns a snapshot of all registered server proxies. The returned slice is safe to read but must not be mutated.
func (*Registry) SetTimeout ¶
func (r *Registry) SetTimeout(name string, seconds int)
SetTimeout configures the per-call timeout for a given server.
func (*Registry) SetToolFilter ¶ added in v0.3.7
func (r *Registry) SetToolFilter(name string, enabledTools, disabledTools []string)
SetToolFilter configures per-server tool filtering. enabledTools is an allowlist (only listed tools are exposed); disabledTools is a blocklist (listed tools are hidden). At most one may be non-nil. The caller (typically main.go) is responsible for ensuring they are not both set; config validation already rejects that combination.
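The allowlist/blocklist semantics might look like the sketch below. filterTools is a hypothetical helper, and treating an empty but non-nil allowlist as "expose nothing" is an assumption about how nil is distinguished from empty.

```go
package main

import "fmt"

// toSet builds a membership set from a list of names.
func toSet(names []string) map[string]bool {
	set := make(map[string]bool, len(names))
	for _, n := range names {
		set[n] = true
	}
	return set
}

// filterTools keeps tools that pass the allowlist (when enabled is non-nil)
// and are not on the blocklist. Input order is preserved. If both lists are
// set, both are applied, although the documented contract rules that out.
func filterTools(tools, enabled, disabled []string) []string {
	allow, block := toSet(enabled), toSet(disabled)
	out := make([]string, 0, len(tools))
	for _, name := range tools {
		if enabled != nil && !allow[name] {
			continue // allowlist active and tool not listed
		}
		if block[name] {
			continue // tool explicitly hidden
		}
		out = append(out, name)
	}
	return out
}

func main() {
	all := []string{"read", "write", "list"}
	fmt.Println(filterTools(all, []string{"read", "list"}, nil))
	fmt.Println(filterTools(all, nil, []string{"write"}))
}
```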
func (*Registry) Shutdown ¶
func (r *Registry) Shutdown()
Shutdown stops all watchers and closes all server transports. Order:
- close r.done (once) to signal every watcher to exit;
- close each transport (which also closes its readerDone, unblocking any watcher still parked in its select);
- r.wg.Wait() within a deadline so no watcher goroutine outlives Shutdown — this is what keeps -race and goroutine-leak detection clean.
Double Shutdown is safe: closeDoneOnce guards close(r.done).
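The ordering above can be sketched with a done channel, a sync.Once, and a WaitGroup. All types here are stand-ins: fakeTransport, watch, and defaultDeadline are hypothetical, and only the field names done, closeDoneOnce, and wg follow the description.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// defaultDeadline is an assumed bound on how long shutdown waits for watchers.
const defaultDeadline = 2 * time.Second

// fakeTransport stands in for a Transport; only Done and Close matter here.
type fakeTransport struct {
	once       sync.Once
	readerDone chan struct{}
}

func newFakeTransport() *fakeTransport {
	return &fakeTransport{readerDone: make(chan struct{})}
}

func (t *fakeTransport) Done() <-chan struct{} { return t.readerDone }

// Close is idempotent so repeated shutdowns cannot double-close the channel.
func (t *fakeTransport) Close() error {
	t.once.Do(func() { close(t.readerDone) })
	return nil
}

type registry struct {
	done          chan struct{}
	closeDoneOnce sync.Once
	wg            sync.WaitGroup
	transports    []*fakeTransport
}

func newRegistry() *registry { return &registry{done: make(chan struct{})} }

// watch starts a watcher that exits on registry shutdown or transport death.
func (r *registry) watch(t *fakeTransport) {
	r.transports = append(r.transports, t)
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		select {
		case <-r.done:
		case <-t.Done():
		}
	}()
}

// shutdown reports whether every watcher exited before the deadline.
func (r *registry) shutdown(deadline time.Duration) bool {
	r.closeDoneOnce.Do(func() { close(r.done) }) // 1. signal watchers
	for _, t := range r.transports {             // 2. close transports
		_ = t.Close()
	}
	waited := make(chan struct{}) // 3. wait, bounded by the deadline
	go func() {
		r.wg.Wait()
		close(waited)
	}()
	select {
	case <-waited:
		return true
	case <-time.After(deadline):
		return false
	}
}

func main() {
	r := newRegistry()
	r.watch(newFakeTransport())
	r.watch(newFakeTransport())
	fmt.Println("clean:", r.shutdown(defaultDeadline))
	fmt.Println("second call clean:", r.shutdown(defaultDeadline))
}
```

In this sketch the helper goroutine that calls wg.Wait is itself left running if the deadline expires; a production version would need to account for that.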
func (*Registry) Snapshots ¶ added in v0.3.7
func (r *Registry) Snapshots() []ServerSnapshot
Snapshots returns a deterministic, read-only view of every registered server's status. The returned slice is sorted by server name and each Tools list is sorted by tool name so callers get stable ordering without holding the registry lock. All slice fields are deep-copied.
func (*Registry) Tools ¶
func (r *Registry) Tools() []McpToolMeta
Tools returns all MCP tools with their names prefixed as "mcp__<server>__<tool>". Per-server tool filters (enabled_tools, disabled_tools) are applied before returning.
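The naming scheme can be illustrated with a pair of helpers. qualify and split are hypothetical names; splitting at the first "__" after the prefix assumes server names never contain "__", which this documentation does not state.

```go
package main

import (
	"fmt"
	"strings"
)

const (
	prefix = "mcp__"
	sep    = "__"
)

// qualify builds the fully-qualified name "mcp__<server>__<tool>".
func qualify(server, tool string) string {
	return prefix + server + sep + tool
}

// split parses a fully-qualified name back into server and tool.
// It cuts at the first "__" after the prefix, so tool names may contain
// "__" but server names may not (an assumption of this sketch).
func split(full string) (server, tool string, ok bool) {
	if !strings.HasPrefix(full, prefix) {
		return "", "", false
	}
	rest := strings.TrimPrefix(full, prefix)
	server, tool, found := strings.Cut(rest, sep)
	if !found || server == "" || tool == "" {
		return "", "", false
	}
	return server, tool, true
}

func main() {
	full := qualify("fs", "read_file")
	server, tool, ok := split(full)
	fmt.Println(full, server, tool, ok)
}
```

A dispatcher such as CallTool would need the inverse mapping shown by split to find the server that owns a fully-qualified tool name.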
type SSETransport ¶ added in v0.3.7
type SSETransport struct {
// contains filtered or unexported fields
}
SSETransport implements Transport for MCP servers using Server-Sent Events (SSE) for server→client messages and HTTP POST for client→server messages.
Lifecycle:
- GET the SSE URL to open the event stream.
- The server sends an "endpoint" event containing the POST URL.
- All subsequent Send calls POST JSON-RPC to that URL.
- Responses arrive as "message" events on the SSE stream, routed by request ID to waiting Send callers.
If the SSE stream disconnects, the transport is dead and the Registry's watcher detects it via Done(). The Registry handles reconnect (not the transport itself).
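To make the lifecycle concrete, here is a small, simplified parser for the event-stream format that distinguishes "endpoint" from "message" events. readEvents and parseEvents are hypothetical helpers, the endpoint path in main is a made-up value, and only the "event" and "data" fields are handled.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// sseEvent is one dispatched event from a text/event-stream body.
type sseEvent struct {
	Name string
	Data string
}

// readEvents parses events separated by blank lines. Events without a name
// default to "message"; events with no data lines are dropped. Comments,
// "id", and "retry" fields are ignored in this sketch.
func readEvents(r io.Reader) ([]sseEvent, error) {
	var (
		events []sseEvent
		name   string
		data   []string
	)
	sc := bufio.NewScanner(r) // default 64 KiB line limit applies
	for sc.Scan() {
		line := sc.Text()
		switch {
		case line == "":
			if len(data) > 0 {
				if name == "" {
					name = "message"
				}
				events = append(events, sseEvent{Name: name, Data: strings.Join(data, "\n")})
			}
			name, data = "", nil
		case strings.HasPrefix(line, "event:"):
			name = strings.TrimSpace(strings.TrimPrefix(line, "event:"))
		case strings.HasPrefix(line, "data:"):
			// Strip at most one leading space after the colon.
			data = append(data, strings.TrimPrefix(strings.TrimPrefix(line, "data:"), " "))
		}
	}
	return events, sc.Err()
}

// parseEvents is a convenience wrapper for string input.
func parseEvents(stream string) ([]sseEvent, error) {
	return readEvents(strings.NewReader(stream))
}

func main() {
	stream := "event: endpoint\ndata: /messages?session=abc\n\n" +
		"event: message\ndata: {\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{}}\n\n"
	events, err := parseEvents(stream)
	if err != nil {
		panic(err)
	}
	for _, ev := range events {
		fmt.Printf("%s -> %s\n", ev.Name, ev.Data)
	}
}
```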
func NewSSETransport ¶ added in v0.3.7
func NewSSETransport(sseURL string) *SSETransport
NewSSETransport creates an SSE transport. The SSE stream is NOT started here — call Start to begin reading. This two-phase init lets the caller wire the transport into the Registry before any goroutines start.
func (*SSETransport) Close ¶ added in v0.3.7
func (t *SSETransport) Close() error
Close shuts down the SSE transport. The SSE reader goroutine will detect the closed body and exit.
func (*SSETransport) Done ¶ added in v0.3.7
func (t *SSETransport) Done() <-chan struct{}
Done returns a channel closed when the SSE stream has ended. The Registry's liveness watcher selects on this to detect server death.
func (*SSETransport) Notify ¶ added in v0.3.7
func (t *SSETransport) Notify(ctx context.Context, method string, params any) error
Notify posts a JSON-RPC notification (no response expected).
func (*SSETransport) Send ¶ added in v0.3.7
func (t *SSETransport) Send(ctx context.Context, method string, params any) (json.RawMessage, error)
Send posts a JSON-RPC request and waits for the response to arrive as an SSE "message" event.
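Routing responses by request ID is commonly done with a map of pending channels. The sketch below shows that pattern under assumed names (router, register, deliver, wait); it ignores JSON-RPC error objects and is not the transport's actual implementation.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"sync"
)

// response is the part of a JSON-RPC response this sketch cares about.
type response struct {
	ID     int64           `json:"id"`
	Result json.RawMessage `json:"result"`
}

// router matches incoming responses to callers waiting on a request ID.
type router struct {
	mu      sync.Mutex
	nextID  int64
	pending map[int64]chan json.RawMessage
}

func newRouter() *router {
	return &router{pending: make(map[int64]chan json.RawMessage)}
}

// register reserves a request ID and the channel its response arrives on.
func (r *router) register() (int64, <-chan json.RawMessage) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.nextID++
	ch := make(chan json.RawMessage, 1) // buffered so deliver never blocks
	r.pending[r.nextID] = ch
	return r.nextID, ch
}

// deliver routes one "message" event payload to the waiting caller.
func (r *router) deliver(payload []byte) error {
	var resp response
	if err := json.Unmarshal(payload, &resp); err != nil {
		return err
	}
	r.mu.Lock()
	ch, ok := r.pending[resp.ID]
	delete(r.pending, resp.ID)
	r.mu.Unlock()
	if !ok {
		return errors.New("no pending request for response id")
	}
	ch <- resp.Result
	return nil
}

// wait blocks until the response arrives or ctx is cancelled. On
// cancellation the pending entry is removed so it cannot leak.
func (r *router) wait(ctx context.Context, id int64, ch <-chan json.RawMessage) (json.RawMessage, error) {
	select {
	case res := <-ch:
		return res, nil
	case <-ctx.Done():
		r.mu.Lock()
		delete(r.pending, id)
		r.mu.Unlock()
		return nil, ctx.Err()
	}
}

func main() {
	r := newRouter()
	id, ch := r.register()
	go func() {
		_ = r.deliver([]byte(fmt.Sprintf(`{"jsonrpc":"2.0","id":%d,"result":{"tools":[]}}`, id)))
	}()
	res, err := r.wait(context.Background(), id, ch)
	fmt.Println(string(res), err)
}
```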
type ServerCapabilities ¶
ServerCapabilities is the subset of MCP server capabilities we care about. Only tools matter today; resources and prompts may come later.
type ServerProxy ¶
type ServerProxy struct {
Name string
State LifecycleState
Caps ServerCapabilities
Tools []McpToolMeta
// contains filtered or unexported fields
}
ServerProxy wraps a single MCP server: transport + capabilities + discovered tools. Callers interact through the Registry, not directly.
The liveness/backoff bookkeeping fields (reconnectAttempted, backoffUntil, command, args, env, transportKind, sseURL) are carried per-proxy but are guarded exclusively by Registry.mu — there is intentionally NO per-proxy lock. The proxy is replaced wholesale on reconnect and is only ever read or written while holding Registry.mu, so a per-proxy lock would create lock-ordering hazards with Registry.mu and is unnecessary.
func (*ServerProxy) Close ¶
func (s *ServerProxy) Close() error
Close tears down the proxy's transport.
type ServerSnapshot ¶ added in v0.3.7
type ServerSnapshot struct {
Name string
State LifecycleState
ToolCount int
Tools []string
BackoffUntil time.Time
LastError string
}
ServerSnapshot is a read-only, lock-free view of a single MCP server's status, suitable for TUI overlays and tests. All slice/map fields are deep-copied so callers cannot mutate registry state.
type StdioTransport ¶
type StdioTransport struct {
// contains filtered or unexported fields
}
StdioTransport is a Transport that speaks JSON-RPC 2.0 over a subprocess's stdin/stdout. Each JSON message is a single newline-terminated line. Stderr is not parsed; the subprocess inherits the parent's stderr for diagnostics.
func NewStdioTransport ¶
func NewStdioTransport(ctx context.Context, command string, args []string, env map[string]string) (*StdioTransport, error)
NewStdioTransport spawns command with args and optional env, then starts a reader goroutine to consume lines from its stdout.
func (*StdioTransport) Close ¶
func (t *StdioTransport) Close() error
Close kills the subprocess and waits for it to exit.
func (*StdioTransport) Done ¶
func (t *StdioTransport) Done() <-chan struct{}
Done returns a channel that is closed when the reader goroutine exits (on EOF, scanner error, or process death). This is the liveness signal the Registry's watcher selects on: when the channel closes the server process has stopped producing output and is considered dead. The reader goroutine already closes readerDone in readLoop's defer; Done merely exposes that signal without starting a new goroutine.
func (*StdioTransport) Send ¶
func (t *StdioTransport) Send(ctx context.Context, method string, params any) (json.RawMessage, error)
Send marshals a request, writes it to stdin, and blocks until the matching response arrives or ctx is cancelled.
type StreamableHTTPTransport ¶ added in v0.4.0
type StreamableHTTPTransport struct {
// contains filtered or unexported fields
}
StreamableHTTPTransport implements Transport for MCP "Streamable HTTP" (spec 2025-03-26): one endpoint, JSON-RPC over POST, with an Mcp-Session-Id header the server establishes and the client echoes on later requests.
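The session-header behaviour can be sketched as a client that remembers the Mcp-Session-Id from a response and echoes it on later POSTs. sessionClient, fakeServer, demo, and the URL are hypothetical; fakeServer is an in-process http.RoundTripper that replies with the session header it received, so no network is involved.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"net/http"
	"strings"
	"sync"
)

const sessionHeader = "Mcp-Session-Id"

// sessionClient POSTs JSON-RPC bodies and tracks the server-issued session ID.
type sessionClient struct {
	url    string
	client *http.Client

	mu        sync.Mutex
	sessionID string
}

// post sends one request, echoing the session ID if one is known, and
// stores any session ID the server returns.
func (c *sessionClient) post(ctx context.Context, body []byte) ([]byte, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.url, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Accept", "application/json, text/event-stream")
	c.mu.Lock()
	if c.sessionID != "" {
		req.Header.Set(sessionHeader, c.sessionID)
	}
	c.mu.Unlock()

	resp, err := c.client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if id := resp.Header.Get(sessionHeader); id != "" {
		c.mu.Lock()
		c.sessionID = id
		c.mu.Unlock()
	}
	return io.ReadAll(resp.Body)
}

// fakeServer answers every request with the session header it received
// (or "none") and always issues the session ID "session-123".
type fakeServer struct{}

func (fakeServer) RoundTrip(req *http.Request) (*http.Response, error) {
	if req.Body != nil {
		req.Body.Close()
	}
	got := req.Header.Get(sessionHeader)
	if got == "" {
		got = "none"
	}
	h := http.Header{}
	h.Set(sessionHeader, "session-123")
	return &http.Response{
		StatusCode: http.StatusOK,
		Header:     h,
		Body:       io.NopCloser(strings.NewReader(got)),
		Request:    req,
	}, nil
}

// demo returns what the fake server saw on the first and second request.
func demo() (first, second string, err error) {
	c := &sessionClient{url: "http://mcp.invalid/mcp", client: &http.Client{Transport: fakeServer{}}}
	ctx := context.Background()
	b, err := c.post(ctx, []byte(`{"jsonrpc":"2.0","id":1,"method":"initialize"}`))
	if err != nil {
		return "", "", err
	}
	first = string(b)
	b, err = c.post(ctx, []byte(`{"jsonrpc":"2.0","id":2,"method":"tools/list"}`))
	if err != nil {
		return "", "", err
	}
	return first, string(b), nil
}

func main() {
	first, second, err := demo()
	fmt.Println(first, second, err)
}
```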
func NewStreamableHTTPTransport ¶ added in v0.4.0
func NewStreamableHTTPTransport(url string) *StreamableHTTPTransport
func (*StreamableHTTPTransport) Close ¶ added in v0.4.0
func (t *StreamableHTTPTransport) Close() error
func (*StreamableHTTPTransport) Done ¶ added in v0.4.0
func (t *StreamableHTTPTransport) Done() <-chan struct{}
func (*StreamableHTTPTransport) Send ¶ added in v0.4.0
func (t *StreamableHTTPTransport) Send(ctx context.Context, method string, params any) (json.RawMessage, error)
type Transport ¶
type Transport interface {
// Send sends a request and blocks until the matching response
// arrives or ctx is cancelled.
Send(ctx context.Context, method string, params any) (json.RawMessage, error)
// Notify fires a notification (no response expected).
Notify(ctx context.Context, method string, params any) error
// Done returns a channel that is closed when the transport's
// underlying connection (subprocess, SSE stream) has ended. The
// Registry's liveness watcher selects on this to detect server
// death. Transports that do not have a liveness signal (e.g.
// stateless HTTP) should return a channel that is never closed.
Done() <-chan struct{}
// Close tears down the transport and releases resources.
Close() error
}
Transport abstracts the JSON-RPC message channel to an MCP server. Implementations handle the wire format (stdio, HTTP, SSE) and lifecycle (connect, close). Callers see only Send/Notify/Done/Close.
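Because the interface is small, an in-memory implementation is easy to write for tests. The memTransport type below is a hypothetical example, not part of this package; the interface is restated locally so the sketch compiles on its own.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"sync"
)

// Transport restates the interface documented above.
type Transport interface {
	Send(ctx context.Context, method string, params any) (json.RawMessage, error)
	Notify(ctx context.Context, method string, params any) error
	Done() <-chan struct{}
	Close() error
}

var errClosed = errors.New("transport closed")

// memTransport answers requests from a handler function instead of a server.
type memTransport struct {
	handler func(method string, params any) (any, error)

	once sync.Once
	done chan struct{}

	mu       sync.Mutex
	notified []string // methods received via Notify
}

// Compile-time check that memTransport satisfies Transport.
var _ Transport = (*memTransport)(nil)

func newMemTransport(h func(method string, params any) (any, error)) *memTransport {
	return &memTransport{handler: h, done: make(chan struct{})}
}

func (t *memTransport) closed() bool {
	select {
	case <-t.done:
		return true
	default:
		return false
	}
}

func (t *memTransport) Send(ctx context.Context, method string, params any) (json.RawMessage, error) {
	if t.closed() {
		return nil, errClosed
	}
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	res, err := t.handler(method, params)
	if err != nil {
		return nil, err
	}
	b, err := json.Marshal(res)
	return b, err
}

func (t *memTransport) Notify(ctx context.Context, method string, params any) error {
	if t.closed() {
		return errClosed
	}
	t.mu.Lock()
	t.notified = append(t.notified, method)
	t.mu.Unlock()
	return nil
}

func (t *memTransport) Done() <-chan struct{} { return t.done }

// Close is idempotent; it closes the Done channel exactly once.
func (t *memTransport) Close() error {
	t.once.Do(func() { close(t.done) })
	return nil
}

// sendString is a small helper that sends with a background context.
func sendString(t Transport, method string) (string, error) {
	res, err := t.Send(context.Background(), method, nil)
	return string(res), err
}

func main() {
	tr := newMemTransport(func(method string, _ any) (any, error) {
		return map[string]string{"echo": method}, nil
	})
	defer tr.Close()
	fmt.Println(sendString(tr, "tools/list"))
}
```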