mcp

package
v0.4.1
Published: Jun 8, 2026 License: MIT Imports: 17 Imported by: 0

Documentation

Overview

Package mcp implements the Model Context Protocol (MCP) client for stdio transport. It spawns MCP-compliant subprocesses, runs the JSON-RPC 2.0 initialize handshake, discovers tools, and bridges them into deepseekcode's tools.Tool interface so the agent can invoke MCP tools alongside built-in ones.

Per D5 (ARCHITECTURE.md §4.5), only stdio transport is supported. Remote transports (HTTP/SSE) and OAuth are deferred to v0.4.

Constants

const ProtocolVersion = "2025-06-18"

ProtocolVersion is the MCP protocol version this client advertises during the initialize handshake.

Variables

This section is empty.

Functions

func BridgeAll

func BridgeAll(reg *Registry) []tools.Tool

BridgeAll returns one tools.Tool per registered MCP tool from connected servers. Results are wrapped as mcpAdaptedTool so Execute dispatches through the registry.

Types

type DriftKind

type DriftKind string

DriftKind identifies the type of tool list drift.

const (
	DriftNone       DriftKind = "none"
	DriftAdded      DriftKind = "added"
	DriftRemoved    DriftKind = "removed"
	DriftReordered  DriftKind = "reordered"
	DriftStructured DriftKind = "structured"
)

type DriftReport

type DriftReport struct {
	Kind    DriftKind
	Message string
}

DriftReport describes the drift between two tool lists.

func CompareToolLists

func CompareToolLists(before, after []McpToolMeta) DriftReport

CompareToolLists compares before and after tool lists and returns a DriftReport describing the difference. Nil and empty lists are equivalent.
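The comparison can be pictured with a small sketch. The function below, classifyDrift, is a hypothetical stand-in and not the package's implementation: the precedence (removed, then added, then structured, then reordered) and the reading of DriftStructured as "same names, different description or schema" are assumptions made for illustration. The types are redeclared locally so the example runs on its own.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

type DriftKind string

const (
	DriftNone       DriftKind = "none"
	DriftAdded      DriftKind = "added"
	DriftRemoved    DriftKind = "removed"
	DriftReordered  DriftKind = "reordered"
	DriftStructured DriftKind = "structured"
)

type McpToolMeta struct {
	Name        string          `json:"name"`
	Description string          `json:"description"`
	InputSchema json.RawMessage `json:"inputSchema"`
}

// classifyDrift is a hypothetical classifier. Assumed precedence:
// removed > added > structured > reordered > none.
func classifyDrift(before, after []McpToolMeta) DriftKind {
	old := make(map[string]McpToolMeta, len(before))
	for _, t := range before {
		old[t.Name] = t
	}
	cur := make(map[string]McpToolMeta, len(after))
	for _, t := range after {
		cur[t.Name] = t
	}
	for name := range old {
		if _, ok := cur[name]; !ok {
			return DriftRemoved
		}
	}
	for name := range cur {
		if _, ok := old[name]; !ok {
			return DriftAdded
		}
	}
	// Same set of names: look for changed metadata (assumed meaning of "structured").
	for name, o := range old {
		c := cur[name]
		if o.Description != c.Description || !bytes.Equal(o.InputSchema, c.InputSchema) {
			return DriftStructured
		}
	}
	for i := 0; i < len(before) && i < len(after); i++ {
		if before[i].Name != after[i].Name {
			return DriftReordered
		}
	}
	return DriftNone
}

func main() {
	a := []McpToolMeta{{Name: "read"}, {Name: "write"}}
	b := []McpToolMeta{{Name: "write"}, {Name: "read"}}
	fmt.Println(classifyDrift(nil, []McpToolMeta{})) // nil and empty compare equal
	fmt.Println(classifyDrift(a, b))
}
```

Because both inputs are folded into maps first, a nil slice and an empty slice produce the same result, which matches the equivalence stated above.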

type HTTPTransport

type HTTPTransport struct {
	// contains filtered or unexported fields
}

HTTPTransport implements Transport for MCP servers reachable via HTTP POST (JSON-RPC request/response). It does not use SSE and is suitable for servers that accept stateless JSON-RPC over HTTP.

func NewHTTPTransport

func NewHTTPTransport(url string) *HTTPTransport

func (*HTTPTransport) Close

func (t *HTTPTransport) Close() error

func (*HTTPTransport) Done added in v0.3.7

func (t *HTTPTransport) Done() <-chan struct{}

Done returns a channel that is never closed. Stateless HTTP has no liveness signal — the server is always considered reachable.
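A minimal sketch of the "never closed" pattern, assuming a hypothetical statelessTransport type (the real HTTPTransport fields are unexported and not shown here). A watcher that polls the channel always sees the transport as alive.

```go
package main

import "fmt"

// statelessTransport is a hypothetical stand-in for a transport
// without a liveness signal.
type statelessTransport struct {
	never chan struct{} // allocated once and never closed
}

func newStatelessTransport() *statelessTransport {
	return &statelessTransport{never: make(chan struct{})}
}

// Done always returns the same open channel, so a select on it never fires.
func (t *statelessTransport) Done() <-chan struct{} { return t.never }

// isAlive reports whether done is still open, without blocking.
func isAlive(done <-chan struct{}) bool {
	select {
	case <-done:
		return false
	default:
		return true
	}
}

func main() {
	t := newStatelessTransport()
	fmt.Println(isAlive(t.Done())) // true
}
```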

func (*HTTPTransport) Notify

func (t *HTTPTransport) Notify(ctx context.Context, method string, params any) error

func (*HTTPTransport) Send

func (t *HTTPTransport) Send(ctx context.Context, method string, params any) (json.RawMessage, error)

type LifecycleState

type LifecycleState int

LifecycleState is the connection state of a single MCP server proxy.

const (
	StateInitializing LifecycleState = iota
	StateConnected
	StateDegraded
	StateFailed
)

func (LifecycleState) String

func (s LifecycleState) String() string

String returns a human-readable state label.

type MCPChange

type MCPChange struct {
	Kind     string // "tool_added", "tool_removed", "tool_schema_changed"
	ToolName string
}

MCPChange describes a tool-level mutation detected between two schema snapshots.

type McpToolMeta

type McpToolMeta struct {
	Name        string          `json:"name"`
	Description string          `json:"description"`
	InputSchema json.RawMessage `json:"inputSchema"`
}

McpToolMeta is the metadata for a single MCP server tool.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry manages the lifecycle of multiple MCP servers and provides unified tool discovery and calling.

LOCK MODEL: r.mu (the RWMutex) is the SOLE authority over r.servers and every state transition (markDegraded, the reconnect commit, the backoff fields). There is intentionally no per-ServerProxy lock. Network/IO (NewStdioTransport/initialize/listTools, performed by dial) MUST run WITHOUT r.mu held; only the final commit re-takes r.mu and re-validates state before installing a new transport.
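The lock model above can be sketched as a three-phase reconnect: read state under the lock, dial with no lock held, then re-take the lock and re-validate before committing. Everything in this sketch (registry, proxy, reconnect, the string standing in for a transport) is a simplified assumption, not the package's code.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type state int

const (
	stateConnected state = iota
	stateDegraded
)

type proxy struct {
	state     state
	transport string // stand-in for a real Transport
}

type registry struct {
	mu      sync.RWMutex
	servers map[string]*proxy
}

// reconnect dials without holding r.mu and commits under it.
func (r *registry) reconnect(name string, dial func() (string, error)) error {
	// Phase 1: inspect state under the lock.
	r.mu.RLock()
	p, ok := r.servers[name]
	needsDial := ok && p.state == stateDegraded
	r.mu.RUnlock()
	if !needsDial {
		return errors.New("server is not degraded")
	}

	// Phase 2: slow network/IO work runs with no lock held.
	tr, err := dial()
	if err != nil {
		return err
	}

	// Phase 3: re-take the lock and re-validate before installing.
	r.mu.Lock()
	defer r.mu.Unlock()
	cur, ok := r.servers[name]
	if !ok || cur != p || cur.state != stateDegraded {
		// A real implementation would also close the unused transport here.
		return errors.New("state changed during dial; discarding new transport")
	}
	r.servers[name] = &proxy{state: stateConnected, transport: tr} // replaced wholesale
	return nil
}

func main() {
	r := &registry{servers: map[string]*proxy{"fs": {state: stateDegraded}}}
	err := r.reconnect("fs", func() (string, error) { return "new-transport", nil })
	fmt.Println(err, r.servers["fs"].state == stateConnected)
}
```

The re-validation in phase 3 is what makes the unlocked dial safe: if another goroutine removed or replaced the server in the meantime, the freshly dialed transport is dropped instead of overwriting newer state.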

func NewRegistry

func NewRegistry() *Registry

NewRegistry returns an empty Registry.

func (*Registry) CallTool

func (r *Registry) CallTool(ctx context.Context, fullName string, args json.RawMessage) (string, bool, error)

CallTool dispatches a fully-qualified tool name to the right server and returns the content, isError flag, and any infrastructure error.

func (*Registry) Connect

func (r *Registry) Connect(ctx context.Context, name, command string, args []string, env map[string]string) error

Connect spawns an MCP server, runs initialize, discovers tools, and registers it under the given name. Name must be unique. This is the stdio transport path — use ConnectSSE for SSE servers.

func (*Registry) ConnectSSE added in v0.3.7

func (r *Registry) ConnectSSE(ctx context.Context, name, sseURL string) error

ConnectSSE connects to an MCP server via SSE transport. It opens the SSE event stream, runs initialize, discovers tools, and registers the server under the given name. Name must be unique.

func (*Registry) PendingSchemaChanges

func (r *Registry) PendingSchemaChanges(oldTools []McpToolMeta) []MCPChange

PendingSchemaChanges compares the current tool set against a previous tool list snapshot and returns the list of MCP-level changes.
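As an illustration of the kind of diff described, the sketch below compares two snapshots and emits the three change kinds listed on MCPChange. diffTools is a hypothetical helper: comparing only the raw InputSchema bytes and sorting the result by tool name are assumptions, and the real method may differ on both points.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"sort"
)

type McpToolMeta struct {
	Name        string          `json:"name"`
	Description string          `json:"description"`
	InputSchema json.RawMessage `json:"inputSchema"`
}

type MCPChange struct {
	Kind     string // "tool_added", "tool_removed", "tool_schema_changed"
	ToolName string
}

// diffTools is a hypothetical snapshot diff. Schemas are compared
// byte-for-byte, so whitespace-only differences count as changes.
func diffTools(oldTools, current []McpToolMeta) []MCPChange {
	prev := make(map[string]McpToolMeta, len(oldTools))
	for _, t := range oldTools {
		prev[t.Name] = t
	}
	seen := make(map[string]bool, len(current))
	var changes []MCPChange
	for _, t := range current {
		seen[t.Name] = true
		p, ok := prev[t.Name]
		switch {
		case !ok:
			changes = append(changes, MCPChange{Kind: "tool_added", ToolName: t.Name})
		case !bytes.Equal(p.InputSchema, t.InputSchema):
			changes = append(changes, MCPChange{Kind: "tool_schema_changed", ToolName: t.Name})
		}
	}
	for _, t := range oldTools {
		if !seen[t.Name] {
			changes = append(changes, MCPChange{Kind: "tool_removed", ToolName: t.Name})
		}
	}
	// Sort for deterministic output (an assumption of this sketch).
	sort.Slice(changes, func(i, j int) bool { return changes[i].ToolName < changes[j].ToolName })
	return changes
}

func main() {
	oldTools := []McpToolMeta{
		{Name: "read", InputSchema: json.RawMessage(`{"type":"object"}`)},
		{Name: "stat"},
	}
	current := []McpToolMeta{
		{Name: "read", InputSchema: json.RawMessage(`{"type":"object","required":["path"]}`)},
		{Name: "write"},
	}
	for _, c := range diffTools(oldTools, current) {
		fmt.Println(c.Kind, c.ToolName)
	}
}
```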

func (*Registry) Servers

func (r *Registry) Servers() []*ServerProxy

Servers returns a snapshot of all registered server proxies. The returned slice is safe to read but must not be mutated.

func (*Registry) SetTimeout

func (r *Registry) SetTimeout(name string, seconds int)

SetTimeout configures the per-call timeout for a given server.

func (*Registry) SetToolFilter added in v0.3.7

func (r *Registry) SetToolFilter(name string, enabledTools, disabledTools []string)

SetToolFilter configures per-server tool filtering. enabledTools is an allowlist (only listed tools are exposed); disabledTools is a blocklist (listed tools are hidden). At most one may be non-nil. The caller (typically main.go) is responsible for not setting both; config validation already rejects that combination.
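The allowlist and blocklist semantics can be sketched as a pure function over tool names. filterTools and toSet are hypothetical helpers; in particular, treating a non-nil but empty allowlist as "expose nothing" and letting the allowlist win if both are passed are assumptions of this sketch.

```go
package main

import "fmt"

// toSet builds a membership set from a list of names.
func toSet(items []string) map[string]bool {
	s := make(map[string]bool, len(items))
	for _, it := range items {
		s[it] = true
	}
	return s
}

// filterTools applies an allowlist or a blocklist to names.
// nil means "not configured"; both nil exposes every tool.
func filterTools(names, enabledTools, disabledTools []string) []string {
	out := make([]string, 0, len(names))
	switch {
	case enabledTools != nil:
		allow := toSet(enabledTools)
		for _, n := range names {
			if allow[n] {
				out = append(out, n)
			}
		}
	case disabledTools != nil:
		block := toSet(disabledTools)
		for _, n := range names {
			if !block[n] {
				out = append(out, n)
			}
		}
	default:
		out = append(out, names...)
	}
	return out
}

func main() {
	names := []string{"read", "write", "delete"}
	fmt.Println(filterTools(names, []string{"read"}, nil))
	fmt.Println(filterTools(names, nil, []string{"delete"}))
}
```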

func (*Registry) Shutdown

func (r *Registry) Shutdown()

Shutdown stops all watchers and closes all server transports. Order:

  1. close r.done (once) to signal every watcher to exit;
  2. close each transport (which also closes its readerDone, unblocking any watcher still parked in its select);
  3. r.wg.Wait() within a deadline so no watcher goroutine outlives Shutdown — this is what keeps -race and goroutine-leak detection clean.

Double Shutdown is safe: closeDoneOnce guards close(r.done).
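The shutdown order above can be sketched with a done channel, a sync.Once, and a WaitGroup. The names closeDoneOnce, done, and wg mirror the ones mentioned in the text; fakeTransport, newRegistry, and the one-second defaultDeadline are assumptions made so the example is self-contained.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

const defaultDeadline = time.Second // assumed value for this sketch

// fakeTransport stands in for a transport whose Close closes readerDone.
type fakeTransport struct {
	readerDone chan struct{}
	closeOnce  sync.Once
}

func (t *fakeTransport) Close() { t.closeOnce.Do(func() { close(t.readerDone) }) }

type registry struct {
	done          chan struct{}
	closeDoneOnce sync.Once
	wg            sync.WaitGroup
	transports    []*fakeTransport
}

// newRegistry starts one watcher goroutine per transport.
func newRegistry(n int) *registry {
	r := &registry{done: make(chan struct{})}
	for i := 0; i < n; i++ {
		t := &fakeTransport{readerDone: make(chan struct{})}
		r.transports = append(r.transports, t)
		r.wg.Add(1)
		go func() {
			defer r.wg.Done()
			select {
			case <-r.done: // registry is shutting down
			case <-t.readerDone: // transport died
			}
		}()
	}
	return r
}

// shutdown reports whether every watcher exited within the deadline.
func (r *registry) shutdown(deadline time.Duration) bool {
	r.closeDoneOnce.Do(func() { close(r.done) }) // step 1: signal watchers, once
	for _, t := range r.transports {             // step 2: close transports
		t.Close()
	}
	waited := make(chan struct{}) // step 3: bounded wait on the WaitGroup
	go func() { r.wg.Wait(); close(waited) }()
	select {
	case <-waited:
		return true
	case <-time.After(deadline):
		return false
	}
}

func main() {
	r := newRegistry(3)
	fmt.Println(r.shutdown(defaultDeadline), r.shutdown(defaultDeadline))
}
```

The second call is harmless because both close operations are guarded by a sync.Once and waiting on a drained WaitGroup returns immediately.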

func (*Registry) Snapshots added in v0.3.7

func (r *Registry) Snapshots() []ServerSnapshot

Snapshots returns a deterministic, read-only view of every registered server's status. The returned slice is sorted by server name and each Tools list is sorted by tool name so callers get stable ordering without holding the registry lock. All slice fields are deep-copied.

func (*Registry) Tools

func (r *Registry) Tools() []McpToolMeta

Tools returns all MCP tools with their names prefixed as "mcp__<server>__<tool>". Per-server tool filters (enabled_tools, disabled_tools) are applied before returning.
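A small sketch of building and splitting the "mcp__<server>__<tool>" form. qualify and splitQualified are hypothetical helpers, and the sketch assumes server names never contain "__" (the first "__" after the prefix is taken as the separator). It uses strings.CutPrefix, which requires Go 1.20 or later.

```go
package main

import (
	"fmt"
	"strings"
)

const prefix = "mcp__"

// qualify builds the fully-qualified name for a server's tool.
func qualify(server, tool string) string {
	return prefix + server + "__" + tool
}

// splitQualified reverses qualify. It assumes server names contain no "__";
// anything after the first separator is treated as the tool name.
func splitQualified(full string) (server, tool string, ok bool) {
	rest, found := strings.CutPrefix(full, prefix)
	if !found {
		return "", "", false
	}
	server, tool, found = strings.Cut(rest, "__")
	if !found || server == "" || tool == "" {
		return "", "", false
	}
	return server, tool, true
}

func main() {
	full := qualify("fs", "read_file")
	fmt.Println(full)
	fmt.Println(splitQualified(full))
}
```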

type SSETransport added in v0.3.7

type SSETransport struct {
	// contains filtered or unexported fields
}

SSETransport implements Transport for MCP servers using Server-Sent Events (SSE) for server→client messages and HTTP POST for client→server messages.

Lifecycle:

  1. GET the SSE URL to open the event stream.
  2. The server sends an "endpoint" event containing the POST URL.
  3. All subsequent Send calls POST JSON-RPC to that URL.
  4. Responses arrive as "message" events on the SSE stream, routed by request ID to waiting Send callers.

If the SSE stream disconnects, the transport is dead and the Registry's watcher detects it via Done(). The Registry handles reconnect (not the transport itself).
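The lifecycle above can be sketched as a small event-stream parser plus a router keyed by request ID. This is a simplified, single-goroutine illustration under stated assumptions: readEvents handles only the event: and data: fields, router has no locking, and the sample endpoint URL is made up. The real transport would need a mutex around the pending map and would read from an HTTP response body.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

type sseEvent struct{ Name, Data string }

// readEvents parses a text/event-stream body and calls handle per event.
// Events without a name default to "message"; events without data are skipped.
func readEvents(r io.Reader, handle func(sseEvent)) error {
	sc := bufio.NewScanner(r)
	name, data := "", []string(nil)
	for sc.Scan() {
		line := sc.Text()
		switch {
		case line == "": // a blank line terminates the current event
			if len(data) > 0 {
				if name == "" {
					name = "message"
				}
				handle(sseEvent{Name: name, Data: strings.Join(data, "\n")})
			}
			name, data = "", nil
		case strings.HasPrefix(line, "event:"):
			name = strings.TrimSpace(strings.TrimPrefix(line, "event:"))
		case strings.HasPrefix(line, "data:"):
			data = append(data, strings.TrimPrefix(strings.TrimPrefix(line, "data:"), " "))
		}
	}
	return sc.Err()
}

// router records the POST URL and delivers responses to waiting callers.
type router struct {
	postURL string
	pending map[int64]chan json.RawMessage // buffered, one per in-flight request
}

func (r *router) handle(ev sseEvent) {
	switch ev.Name {
	case "endpoint":
		r.postURL = ev.Data
	case "message":
		var msg struct {
			ID     int64           `json:"id"`
			Result json.RawMessage `json:"result"`
		}
		if json.Unmarshal([]byte(ev.Data), &msg) != nil {
			return
		}
		if ch, ok := r.pending[msg.ID]; ok {
			ch <- msg.Result
			delete(r.pending, msg.ID)
		}
	}
}

// runStream feeds a canned stream through the router with request 1 pending.
func runStream(stream string) (postURL, result string) {
	ch := make(chan json.RawMessage, 1)
	r := &router{pending: map[int64]chan json.RawMessage{1: ch}}
	if err := readEvents(strings.NewReader(stream), r.handle); err != nil {
		return "", ""
	}
	select {
	case res := <-ch:
		return r.postURL, string(res)
	default:
		return r.postURL, ""
	}
}

func main() {
	stream := "event: endpoint\ndata: /messages?session=abc\n\n" +
		"event: message\ndata: {\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"ok\":true}}\n\n"
	fmt.Println(runStream(stream))
}
```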

func NewSSETransport added in v0.3.7

func NewSSETransport(sseURL string) *SSETransport

NewSSETransport creates an SSE transport. The SSE stream is NOT started here — call Start to begin reading. This two-phase init lets the caller wire the transport into the Registry before any goroutines start.

func (*SSETransport) Close added in v0.3.7

func (t *SSETransport) Close() error

Close shuts down the SSE transport. The SSE reader goroutine will detect the closed body and exit.

func (*SSETransport) Done added in v0.3.7

func (t *SSETransport) Done() <-chan struct{}

Done returns a channel closed when the SSE stream has ended. The Registry's liveness watcher selects on this to detect server death.

func (*SSETransport) Notify added in v0.3.7

func (t *SSETransport) Notify(ctx context.Context, method string, params any) error

Notify posts a JSON-RPC notification (no response expected).

func (*SSETransport) Send added in v0.3.7

func (t *SSETransport) Send(ctx context.Context, method string, params any) (json.RawMessage, error)

Send posts a JSON-RPC request and waits for the response to arrive as an SSE "message" event.

func (*SSETransport) Start added in v0.3.7

func (t *SSETransport) Start(ctx context.Context) error

Start opens the SSE connection and begins reading events. It blocks until the "endpoint" event is received (so postURL is known) or ctx is cancelled.

type ServerCapabilities

type ServerCapabilities struct {
	Tools     bool
	Resources bool
	Prompts   bool
}

ServerCapabilities is the subset of MCP server capabilities we care about. Only tools matter today; resources and prompts may come later.

type ServerProxy

type ServerProxy struct {
	Name  string
	State LifecycleState
	Caps  ServerCapabilities
	Tools []McpToolMeta
	// contains filtered or unexported fields
}

ServerProxy wraps a single MCP server: transport + capabilities + discovered tools. Callers interact through the Registry, not directly.

The liveness/backoff bookkeeping fields (reconnectAttempted, backoffUntil, command, args, env, transportKind, sseURL) are carried per-proxy but are guarded exclusively by Registry.mu — there is intentionally NO per-proxy lock. The proxy is replaced wholesale on reconnect and is only ever read or written while holding Registry.mu, so a per-proxy lock would create lock-ordering hazards with Registry.mu and is unnecessary.

func (*ServerProxy) Close

func (s *ServerProxy) Close() error

Close tears down the proxy's transport.

type ServerSnapshot added in v0.3.7

type ServerSnapshot struct {
	Name         string
	State        LifecycleState
	ToolCount    int
	Tools        []string
	BackoffUntil time.Time
	LastError    string
}

ServerSnapshot is a read-only, lock-free view of a single MCP server's status, suitable for TUI overlays and tests. All slice/map fields are deep-copied so callers cannot mutate registry state.

type StdioTransport

type StdioTransport struct {
	// contains filtered or unexported fields
}

StdioTransport is a Transport that speaks JSON-RPC 2.0 over a subprocess's stdin/stdout. Each JSON message is a single newline-terminated line. Stderr is not parsed; the subprocess inherits the parent's stderr for diagnostics.
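The line-oriented framing can be sketched without a subprocess by writing to and reading from an in-memory buffer. The request struct, writeMessage, readMessages, and the 4 MiB line limit are assumptions for illustration; they are not the transport's actual internals.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/json"
	"fmt"
	"io"
)

type request struct {
	JSONRPC string `json:"jsonrpc"`
	ID      int64  `json:"id"`
	Method  string `json:"method"`
	Params  any    `json:"params,omitempty"`
}

// writeMessage emits one JSON object followed by a newline. json.Marshal
// escapes newlines inside strings, so each message stays on a single line.
func writeMessage(w io.Writer, req request) error {
	b, err := json.Marshal(req)
	if err != nil {
		return err
	}
	_, err = w.Write(append(b, '\n'))
	return err
}

// readMessages splits the stream into one raw JSON message per line.
func readMessages(r io.Reader) ([]json.RawMessage, error) {
	sc := bufio.NewScanner(r)
	// Raise the default 64 KiB token limit; the 4 MiB cap is an assumed value.
	sc.Buffer(make([]byte, 0, 64*1024), 4*1024*1024)
	var msgs []json.RawMessage
	for sc.Scan() {
		line := bytes.TrimSpace(sc.Bytes())
		if len(line) == 0 {
			continue
		}
		// Copy: the scanner reuses its buffer on the next Scan.
		msgs = append(msgs, append(json.RawMessage(nil), line...))
	}
	return msgs, sc.Err()
}

// roundTrip writes one request per method, reads them back, and returns
// the decoded method names in order.
func roundTrip(methods ...string) ([]string, error) {
	var pipe bytes.Buffer
	for i, m := range methods {
		if err := writeMessage(&pipe, request{JSONRPC: "2.0", ID: int64(i + 1), Method: m}); err != nil {
			return nil, err
		}
	}
	msgs, err := readMessages(&pipe)
	if err != nil {
		return nil, err
	}
	out := make([]string, 0, len(msgs))
	for _, raw := range msgs {
		var req request
		if err := json.Unmarshal(raw, &req); err != nil {
			return nil, err
		}
		out = append(out, req.Method)
	}
	return out, nil
}

func main() {
	fmt.Println(roundTrip("initialize", "tools/list"))
}
```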

func NewStdioTransport

func NewStdioTransport(ctx context.Context, command string, args []string, env map[string]string) (*StdioTransport, error)

NewStdioTransport spawns command with args and optional env, then starts a reader goroutine to consume lines from its stdout.

func (*StdioTransport) Close

func (t *StdioTransport) Close() error

Close kills the subprocess and waits for it to exit.

func (*StdioTransport) Done

func (t *StdioTransport) Done() <-chan struct{}

Done returns a channel that is closed when the reader goroutine exits (on EOF, scanner error, or process death). This is the liveness signal the Registry's watcher selects on: when the channel closes the server process has stopped producing output and is considered dead. The reader goroutine already closes readerDone in readLoop's defer; Done merely exposes that signal without starting a new goroutine.
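A minimal sketch of that pattern, assuming a hypothetical lineReader type: the read loop closes readerDone in a defer, and Done only exposes the existing channel. A strings.Reader stands in for the subprocess's stdout, so reaching EOF plays the role of the process exiting.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
	"time"
)

// lineReader is a hypothetical stand-in for the transport's reader side.
type lineReader struct {
	readerDone chan struct{}
}

func startReader(r io.Reader) *lineReader {
	lr := &lineReader{readerDone: make(chan struct{})}
	go lr.readLoop(r)
	return lr
}

// readLoop consumes lines until EOF or a scanner error. The deferred close
// fires on every exit path, which is what makes Done a liveness signal.
func (lr *lineReader) readLoop(r io.Reader) {
	defer close(lr.readerDone)
	sc := bufio.NewScanner(r)
	for sc.Scan() {
		_ = sc.Text() // a real transport would dispatch the message here
	}
}

// Done exposes readerDone without starting another goroutine.
func (lr *lineReader) Done() <-chan struct{} { return lr.readerDone }

// closedOnEOF reports whether Done closes once the input is exhausted.
func closedOnEOF(input string) bool {
	lr := startReader(strings.NewReader(input))
	select {
	case <-lr.Done():
		return true
	case <-time.After(time.Second):
		return false
	}
}

func main() {
	fmt.Println(closedOnEOF("{\"jsonrpc\":\"2.0\"}\n")) // true
}
```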

func (*StdioTransport) Notify

func (t *StdioTransport) Notify(ctx context.Context, method string, params any) error

Notify sends a notification (id-less request, no response).

func (*StdioTransport) Send

func (t *StdioTransport) Send(ctx context.Context, method string, params any) (json.RawMessage, error)

Send marshals a request, writes it to stdin, and blocks until the matching response arrives or ctx is cancelled.

type StreamableHTTPTransport added in v0.4.0

type StreamableHTTPTransport struct {
	// contains filtered or unexported fields
}

StreamableHTTPTransport implements Transport for MCP "Streamable HTTP" (spec 2025-03-26): one endpoint, JSON-RPC over POST, with an Mcp-Session-Id header the server establishes and the client echoes on later requests.
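The session handshake can be sketched against a local test server. sessionClient, its post method, and the "s-1" session value are hypothetical; the sketch only shows the header being captured from the first response and echoed afterwards, and it ignores SSE-framed responses, which the Streamable HTTP spec also allows.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
	"sync"
)

const sessionHeader = "Mcp-Session-Id"

// sessionClient is a hypothetical client that remembers the session ID.
type sessionClient struct {
	url string
	hc  *http.Client

	mu        sync.Mutex
	sessionID string
}

// post sends one JSON-RPC body and returns the raw response body.
func (c *sessionClient) post(body string) (string, error) {
	req, err := http.NewRequest(http.MethodPost, c.url, strings.NewReader(body))
	if err != nil {
		return "", err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Accept", "application/json, text/event-stream")
	c.mu.Lock()
	if c.sessionID != "" {
		req.Header.Set(sessionHeader, c.sessionID) // echo the established session
	}
	c.mu.Unlock()

	resp, err := c.hc.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	if id := resp.Header.Get(sessionHeader); id != "" {
		c.mu.Lock()
		c.sessionID = id // the server establishes the session
		c.mu.Unlock()
	}
	b, err := io.ReadAll(resp.Body)
	return string(b), err
}

// demo runs two requests against a toy server that issues session "s-1".
func demo() (first, second string, err error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if id := r.Header.Get(sessionHeader); id != "" {
			fmt.Fprint(w, "echo:"+id)
			return
		}
		w.Header().Set(sessionHeader, "s-1")
		fmt.Fprint(w, "new")
	}))
	defer srv.Close()

	c := &sessionClient{url: srv.URL, hc: srv.Client()}
	if first, err = c.post(`{"jsonrpc":"2.0","id":1,"method":"initialize"}`); err != nil {
		return "", "", err
	}
	second, err = c.post(`{"jsonrpc":"2.0","id":2,"method":"tools/list"}`)
	return first, second, err
}

func main() {
	fmt.Println(demo())
}
```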

func NewStreamableHTTPTransport added in v0.4.0

func NewStreamableHTTPTransport(url string) *StreamableHTTPTransport

func (*StreamableHTTPTransport) Close added in v0.4.0

func (t *StreamableHTTPTransport) Close() error

func (*StreamableHTTPTransport) Done added in v0.4.0

func (t *StreamableHTTPTransport) Done() <-chan struct{}

func (*StreamableHTTPTransport) Notify added in v0.4.0

func (t *StreamableHTTPTransport) Notify(ctx context.Context, method string, params any) error

func (*StreamableHTTPTransport) Send added in v0.4.0

func (t *StreamableHTTPTransport) Send(ctx context.Context, method string, params any) (json.RawMessage, error)

type Transport

type Transport interface {
	// Send sends a request and blocks until the matching response
	// arrives or ctx is cancelled.
	Send(ctx context.Context, method string, params any) (json.RawMessage, error)

	// Notify fires a notification (no response expected).
	Notify(ctx context.Context, method string, params any) error

	// Done returns a channel that is closed when the transport's
	// underlying connection (subprocess, SSE stream) has ended. The
	// Registry's liveness watcher selects on this to detect server
	// death. Transports that do not have a liveness signal (e.g.
	// stateless HTTP) should return a channel that is never closed.
	Done() <-chan struct{}

	// Close tears down the transport and releases resources.
	Close() error
}

Transport abstracts the JSON-RPC message channel to an MCP server. Implementations handle the wire format (stdio, HTTP, SSE) and lifecycle (connect, close). Callers see only Send, Notify, Done, and Close.
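One practical use of the interface is an in-memory fake for tests. The sketch below redeclares Transport locally and adds a hypothetical fakeTransport that answers from a canned reply table; the error messages and the behavior after Close are assumptions of the sketch.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"sync"
)

type Transport interface {
	Send(ctx context.Context, method string, params any) (json.RawMessage, error)
	Notify(ctx context.Context, method string, params any) error
	Done() <-chan struct{}
	Close() error
}

// fakeTransport is a hypothetical in-memory Transport for tests.
type fakeTransport struct {
	replies   map[string]json.RawMessage // canned result per method
	done      chan struct{}
	closeOnce sync.Once
}

// Compile-time check that fakeTransport satisfies the interface.
var _ Transport = (*fakeTransport)(nil)

func newFakeTransport(replies map[string]json.RawMessage) *fakeTransport {
	return &fakeTransport{replies: replies, done: make(chan struct{})}
}

func (f *fakeTransport) closed() bool {
	select {
	case <-f.done:
		return true
	default:
		return false
	}
}

func (f *fakeTransport) Send(ctx context.Context, method string, params any) (json.RawMessage, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	if f.closed() {
		return nil, errors.New("transport closed")
	}
	reply, ok := f.replies[method]
	if !ok {
		return nil, errors.New("no canned reply for " + method)
	}
	return reply, nil
}

func (f *fakeTransport) Notify(ctx context.Context, method string, params any) error {
	if f.closed() {
		return errors.New("transport closed")
	}
	return ctx.Err()
}

func (f *fakeTransport) Done() <-chan struct{} { return f.done }

// Close is idempotent: the channel is closed at most once.
func (f *fakeTransport) Close() error {
	f.closeOnce.Do(func() { close(f.done) })
	return nil
}

// demoFake sends one request, closes the transport, and reports the reply,
// whether Done fired, and whether Send fails afterwards.
func demoFake() (reply string, doneClosed, sendFailsAfterClose bool) {
	var t Transport = newFakeTransport(map[string]json.RawMessage{
		"tools/list": json.RawMessage(`{"tools":[]}`),
	})
	ctx := context.Background()
	res, err := t.Send(ctx, "tools/list", nil)
	if err != nil {
		return "", false, false
	}
	_ = t.Close()
	_ = t.Close() // second Close is a no-op
	select {
	case <-t.Done():
		doneClosed = true
	default:
	}
	_, err = t.Send(ctx, "tools/list", nil)
	return string(res), doneClosed, err != nil
}

func main() {
	fmt.Println(demoFake())
}
```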
