mcpc

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 20, 2025 License: MIT Imports: 22 Imported by: 0

README

mcpc is an MCP Client for Go

一个轻量级的 Model Context Protocol (MCP) 客户端实现,支持:

  • ✅ WebSocket (WS) 传输
  • ✅ Server-Sent Events (SSE) 传输
  • ✅ JSON-RPC 2.0 调用与通知
  • ✅ 自动 pending 管理与断线清理
  • ✅ Hook 调试接口
  • ✅ Context 控制超时与重连

安装

go get github.com/llmdo/mcpc

快速开始

WebSocket
transport, err := mcpc.NewWebSocketTransport("ws://localhost:8080/mcp", opts)
if err != nil {
    log.Fatal(err)
}
client := mcpc.NewMCPClient(transport, opts, hooks)
defer client.Close()

resp, err := client.Call(context.Background(), "ping", nil)
SSE
transport, err := mcpc.NewSSETransport("http://localhost:8080/mcp", opts)
if err != nil {
    log.Fatal(err)
}
client := mcpc.NewMCPClient(transport, opts, hooks)
defer client.Close()

_ = client.Notify(context.Background(), "log", map[string]any{"msg": "hello"})
STDIO
exePath, err := os.Executable()
if err != nil {
    log.Fatal(err)
} else {
    fmt.Println(exePath)
}

opts := (&mcpc.DialOptions{
    Headers:        http.Header{"X-Client": []string{"go-mcp-demo"}},
    OnDisconnected: func(e error) { log.Printf("[transport] disconnected: %v", e) },
    OnReconnected:  func() { log.Printf("[transport] reconnected") },
}).WithDefaults()
// 初始化 STDIO 传输层(启动子进程,并使用其 stdio 通信)
transport, err := mcpc.NewStdioSubprocess(
    filepath.Join(filepath.Dir(exePath), "../mcpserver_for_test/mcpserver_for_test.exe"), 
    []string{"stdio"}, 
    opts)
if err != nil {
    log.Fatal(err)
}

// 设置 Hook
hooks := &mcpc.ClientHooks{
    OnSend: func(id, method string) {
        log.Printf("[send] id=%s method=%s", id, method)
    },
    OnResponse: func(id string, err *mcpc.RPCError) {
        log.Printf("[resp] id=%s err=%v", id, err)
    },
    OnDisconnect: func(temp bool) {
        log.Printf("[disconnect] temporary=%v", temp)
    },
}

// 创建客户端
client := mcpc.NewMCPClient(transport, opts, hooks)
defer client.Close()

// 注册通知回调
client.SetNotificationHandler(func(method string, params json.RawMessage) {
    log.Printf("[notify] method=%s params=%s", method, string(params))
})

// 发起 RPC 请求
// 注意:本示例中的方法在 WS、SSE、STDIO 传输下均可使用;Call、CallBatch、ToolsCall、ToolsList 都是可用的方法
resp, err := client.ToolsCall(context.Background(), "echo", map[string]any{"message": "Hello from MCP client"})
if err != nil {
    log.Fatalf("call failed: %v", err)
}
fmt.Println("pong:", string(resp))

// 发送通知
_ = client.Notify(context.Background(), "log", map[string]string{"msg": "hello from SSE"})

time.Sleep(3 * time.Second)

resp, err = client.ToolsCall(context.Background(), "uppercase", map[string]any{"message": "Hello from MCP client"})
if err != nil {
    log.Fatalf("call failed: %v", err)
}
fmt.Println("pong:", string(resp))

特性

  • Call:RPC 请求,带超时
  • Notify:通知,不等待返回
  • CallBatch:批量请求
  • Hook:调试事件
  • Context:超时控制 & 强制退出重连
  • IsConnected:连接状态检查

Documentation

Index

Constants

View Source
const (
	ClientTransportClosedCode = -32099 // 客户端内部:传输关闭
)
View Source
const InternalDisconnectedMethod = "$transport/disconnected"
View Source
const JsonrpcVersion = "2.0"

Variables

View Source
var (
	// 可用于识别:传输层已关闭或不可用(可重试)
	ErrTransportClosed = errors.New("transport closed")
)

Functions

func EnvStr

func EnvStr(k, def string) string

EnvStr 是对外提供的环境变量读取封装函数。

func MustJSON

func MustJSON(v any) []byte

Types

type ClientHooks

type ClientHooks struct {
	OnSend       func(id, method string)
	OnResponse   func(id string, err *RPCError)
	OnNotify     func(method string)
	OnDisconnect func(temporary bool)
}

type DialOptions

type DialOptions struct {
	// 通用
	Headers        http.Header
	AuthToken      string        // 可选:Bearer
	RequestTimeout time.Duration // 每次调用的默认超时

	// 心跳(WS)
	PingInterval time.Duration
	PongWait     time.Duration

	// 回调(已有)
	OnDisconnected func(error)
	OnReconnected  func()

	// New hooks
	OnMessage          func([]byte)                             // 每条收到的消息(transport 层)
	OnReconnectAttempt func(attempt int, backoff time.Duration) // 每次尝试重连时触发(用于 metrics / debug)

	// TLS / Transport
	InsecureSkipVerify bool

	// SSE: 分离的发送与事件流地址
	SSEEventsURL string // e.g. http(s)://host/mcp/events

	// HTTPClient(可注入自定义传输/代理)
	HTTPClient *http.Client

	// 重试
	MaxRetries int

	// 重连策略
	ReconnectInitialBackoff time.Duration // e.g. 500ms
	ReconnectMaxBackoff     time.Duration // e.g. 10s

	// CancelCtx allows caller to cancel all reconnect attempts and in-flight connect requests.
	CancelCtx context.Context

	// PingFailureThreshold indicates how many consecutive ping write failures are tolerated before
	// declaring the connection unhealthy and closing it to trigger a reconnect. Default: 3
	PingFailureThreshold int
}

func (*DialOptions) WithDefaults

func (o *DialOptions) WithDefaults() *DialOptions

type MCPClient

type MCPClient struct {
	// contains filtered or unexported fields
}

func NewMCPClient

func NewMCPClient(t Transport, opts *DialOptions, hooks *ClientHooks) *MCPClient

func (*MCPClient) Call

func (c *MCPClient) Call(ctx context.Context, method string, params any) (*RPCResponse, error)

func (*MCPClient) CallBatch

func (c *MCPClient) CallBatch(ctx context.Context, batch []RPCRequest) ([]RPCResponse, error)

CallBatch:使用轮询等待响应,避免 goroutine 爆炸

func (*MCPClient) Close

func (c *MCPClient) Close() error

func (*MCPClient) IsConnected

func (c *MCPClient) IsConnected() bool

func (*MCPClient) NextID

func (c *MCPClient) NextID() string

func (*MCPClient) Notify

func (c *MCPClient) Notify(ctx context.Context, method string, params any) error

func (*MCPClient) SetNotificationHandler

func (c *MCPClient) SetNotificationHandler(h NotificationHandler)

func (*MCPClient) ToolsCall

func (c *MCPClient) ToolsCall(ctx context.Context, name string, params any) (json.RawMessage, error)

func (*MCPClient) ToolsList

func (c *MCPClient) ToolsList(ctx context.Context) (*ToolsListResult, error)

type NotificationHandler

type NotificationHandler func(method string, params json.RawMessage)

type RPCError

type RPCError struct {
	Code    int             `json:"code"`
	Message string          `json:"message"`
	Data    json.RawMessage `json:"data,omitempty"`
}

func (*RPCError) Error

func (e *RPCError) Error() string

type RPCRequest

type RPCRequest struct {
	JSONRPC string           `json:"jsonrpc"`
	ID      *string          `json:"id,omitempty"`     // notification 时为 nil
	Method  string           `json:"method"`           // e.g. "tools/call", "tools/list"
	Params  *json.RawMessage `json:"params,omitempty"` // 任意 JSON
}

type RPCResponse

type RPCResponse struct {
	JSONRPC string           `json:"jsonrpc"`
	ID      *string          `json:"id,omitempty"`
	Result  *json.RawMessage `json:"result,omitempty"`
	Error   *RPCError        `json:"error,omitempty"`
}

type Reconnector

type Reconnector struct {
	// contains filtered or unexported fields
}

func NewReconnector

func NewReconnector(opts *DialOptions) *Reconnector

func (*Reconnector) Manage

func (r *Reconnector) Manage(connectAndServe func(connected chan<- struct{}) error, recvC chan<- []byte, alive *atomic.Bool, stopCh <-chan struct{})

Manage runs a loop that calls connectAndServe. When connectAndServe signals 'connected' (by writing to the channel), Manage marks alive, calls OnReconnected, and waits until connectAndServe returns (connection closed). On error it sends an internal notification and invokes OnDisconnected, then waits with backoff before retrying.

func (*Reconnector) WaitForReconnect

func (r *Reconnector) WaitForReconnect(ctx context.Context) bool

WaitForReconnect waits until the next successful reconnect or ctx.Done(). Returns true if a reconnect happened.

type SSETransport

type SSETransport struct {
	// contains filtered or unexported fields
}

func NewSSETransport

func NewSSETransport(postURL string, opts *DialOptions) (*SSETransport, error)

func (*SSETransport) Close

func (t *SSETransport) Close() error

func (*SSETransport) IsConnected

func (t *SSETransport) IsConnected() bool

func (*SSETransport) Recv

func (t *SSETransport) Recv() <-chan []byte

func (*SSETransport) Send

func (t *SSETransport) Send(ctx context.Context, payload []byte) error

type StdioTransport

type StdioTransport struct {
	// contains filtered or unexported fields
}

func NewStdioSubprocess

func NewStdioSubprocess(serverPath string, args []string, opts *DialOptions) (*StdioTransport, error)

启动一个子进程,并使用其 stdio 作为传输层

func NewStdioTransport

func NewStdioTransport(r io.Reader, w io.Writer, opts *DialOptions) *StdioTransport

func (*StdioTransport) Close

func (t *StdioTransport) Close() error

func (*StdioTransport) IsConnected

func (t *StdioTransport) IsConnected() bool

func (*StdioTransport) Recv

func (t *StdioTransport) Recv() <-chan []byte

func (*StdioTransport) Send

func (t *StdioTransport) Send(ctx context.Context, payload []byte) error

type ToolInfo

type ToolInfo struct {
	Name        string           `json:"name"`
	Description string           `json:"description,omitempty"`
	InputSchema *json.RawMessage `json:"inputSchema,omitempty"`
}

type ToolsListResult

type ToolsListResult struct {
	Tools []ToolInfo `json:"tools"`
}

type Transport

type Transport interface {
	Send(ctx context.Context, payload []byte) error
	Recv() <-chan []byte
	Close() error
	IsConnected() bool
}

type TransportError

type TransportError struct {
	Op        string // 读 / 写 / 连接
	Err       error  // 原始错误
	Temporary bool   // 是否临时错误(建议重试)
}

TransportError 用于标识传输层错误(可能可重试)

func (*TransportError) Error

func (e *TransportError) Error() string

func (*TransportError) Unwrap

func (e *TransportError) Unwrap() error

type WebSocketTransport

type WebSocketTransport struct {
	// contains filtered or unexported fields
}

func NewWebSocketTransport

func NewWebSocketTransport(urlStr string, opts *DialOptions) (*WebSocketTransport, error)

func (*WebSocketTransport) Close

func (t *WebSocketTransport) Close() error

func (*WebSocketTransport) IsConnected

func (t *WebSocketTransport) IsConnected() bool

func (*WebSocketTransport) Recv

func (t *WebSocketTransport) Recv() <-chan []byte

func (*WebSocketTransport) Send

func (t *WebSocketTransport) Send(ctx context.Context, payload []byte) error

Directories

Path Synopsis
examples
sse_mcpc command
stdio_mcpc command
ws_mcpc command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL