mcpx

package module
v0.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 12, 2026 License: MIT Imports: 19 Imported by: 0

README

mcp-multiplexer

Go Reference CI Go Report Card Latest Release

MCP multiplexer for Go — connect to many Model Context Protocol servers at once, expose one tool list per server kind, and normalize tool arguments across them.

Features

  • Multiple servers, one API. Aggregate tools from any number of MCP servers behind a single Multiplexer.
  • Three transports. stdio (subprocess), http (StreamableHTTP), and sse. Pluggable per-request auth (Bearer / custom header / OAuth2 / your own).
  • Kind grouping. Tag servers with a Kind (e.g. kubernetes, gitlab) and the multiplexer deduplicates tool lists per kind — handy for prompt generation.
  • Argument transformers. Built-in camelCase, joinArrays, singularResourceType. Register your own via WithArgsTransformer.
  • Field maps. Rename argument keys per-server before they hit the wire.
  • Hooks for everything else. Plug in policy enforcement, prompt-injection / drift detection, metrics, caching, eino tool wrapping — without forking the library.
  • Logger-agnostic. A 4-method Logger interface plus zero-config shims for zap and log/slog.

Install

go get github.com/inhuman/mcp-multiplexer

Quick start

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log/slog"
    "os"

    mcpx "github.com/inhuman/mcp-multiplexer"
    "github.com/inhuman/mcp-multiplexer/auth"
    "github.com/inhuman/mcp-multiplexer/log/sloglog"
)

func main() {
    ctx := context.Background()

    cfg := mcpx.MultiplexerConfig{
        Servers: []mcpx.ServerConfig{
            {
                Name:      "fs",
                Kind:      "filesystem",
                Transport: mcpx.TransportStdio,
                Binary:    "mcp-server-filesystem",
                Args:      []string{"/tmp"},
            },
            {
                Name:      "weather",
                Transport: mcpx.TransportHTTP,
                URL:       "https://example.com/mcp",
                Auth:      map[string]any{"token": os.Getenv("WEATHER_TOKEN")},
            },
        },
    }

    mx, err := mcpx.New(ctx, cfg,
        mcpx.WithLogger(sloglog.New(slog.Default())),
        mcpx.WithAuthFunc(auth.Bearer),
    )
    if err != nil {
        panic(err)
    }
    defer mx.Close()

    for _, g := range mx.KindGroups() {
        fmt.Printf("%s: servers=%v tools=%v\n", g.Kind, g.Servers, g.Tools)
    }

    args, _ := json.Marshal(map[string]any{"path": "/tmp/hello.txt"})
    res, err := mx.CallTool(ctx, "fs", "read_file", args)
    if err != nil {
        panic(err)
    }
    fmt.Println(res.Text)
}

Hooks

All hooks are optional. Register any number; they chain in registration order.

// Policy / RBAC — abort or short-circuit before going upstream.
mcpx.WithBeforeCall(func(ctx context.Context, server, tool string, info mcpx.ToolInfo, args json.RawMessage) (context.Context, *mcpx.CallResult, error) {
    if info.Destructive && !isAdmin(ctx) {
        return nil, nil, errors.New("destructive tools require admin")
    }
    return nil, nil, nil
}),

// OTel span — inject span into context, close it in AfterCall.
mcpx.WithBeforeCall(func(ctx context.Context, server, tool string, _ mcpx.ToolInfo, _ json.RawMessage) (context.Context, *mcpx.CallResult, error) {
    span := tracer.Start(ctx, "mcp."+tool)
    return span.Context(), nil, nil
}),

// Prompt-injection / drift detection — sanitize results (text and image parts).
mcpx.WithResultTransform(func(ctx context.Context, server, tool string, info mcpx.ToolInfo, result *mcpx.CallResult) error {
    result.Text = injection.Sanitize(result.Text)
    for i, p := range result.Parts {
        if p.Kind == mcpx.ContentText {
            result.Parts[i].Text = injection.Sanitize(p.Text)
        }
    }
    return nil
}),

// Metrics / events — observe every call with duration and cache status.
mcpx.WithAfterCall(func(ctx context.Context, server, tool string, info mcpx.ToolInfo, args json.RawMessage, res *mcpx.CallResult, err error, dur time.Duration) {
    source := "upstream"
    if mcpx.IsCacheHit(ctx) { source = "cache" }
    metrics.RecordToolCall(server, tool, err, dur, source)
}),

// Tag tools with custom metadata at fetch time.
mcpx.WithMetaEnricher(func(ctx context.Context, server string, info mcpx.ToolInfo) mcpx.ToolInfo {
    if strings.HasPrefix(info.Name, "kubectl_") {
        info.Custom = map[string]string{"category": "k8s"}
    }
    return info
}),

Response caching

The multiplexer includes a bounded in-process LRU cache enabled by default (256 entries, 30 s TTL). A tool is cached when ReadOnly && Idempotent, or when Custom["cacheable"] = "true". Destructive tools are never cached.

// Default cache — no extra options needed.
mx, _ := mcpx.New(ctx, cfg)

// Isolate cache entries per tenant to prevent cross-tenant leaks.
tenantCtx := mcpx.WithCacheScope(ctx, userID)
result, _ := mx.CallTool(tenantCtx, "k8s", "list_pods", nil)

// Check cache hit in AfterCall.
mcpx.WithAfterCall(func(ctx context.Context, ..., dur time.Duration) {
    if mcpx.IsCacheHit(ctx) { /* served from cache */ }
}),

// Custom TTL for a specific tool via MetaEnricher.
mcpx.WithMetaEnricher(func(ctx context.Context, server string, info mcpx.ToolInfo) mcpx.ToolInfo {
    if info.Name == "list_nodes" {
        if info.Custom == nil { info.Custom = map[string]string{} }
        info.Custom["cache_ttl"] = "5m"
    }
    return info
}),

// Plug in Redis or any external cache.
mx, _ = mcpx.New(ctx, cfg, mcpx.WithCache(&redisCache{client: rdb}))

// Disable cache entirely.
mx, _ = mcpx.New(ctx, cfg, mcpx.WithoutCache())

Cache options: WithCache(Cache), WithCacheTTL(d), WithCacheSize(n), WithoutCache(), WithCacheKey(fn).

Rejected-call observability

OnRejectedCall fires before AfterCall on every path that rejects a call before reaching upstream:

mcpx.WithOnRejectedCall(func(ctx context.Context, server, tool string, reason mcpx.RejectReason, err error) {
    metrics.Inc("mcpx.rejected", "reason", string(reason))
}),

Reasons: RejectUnknownServer, RejectUnknownTool, RejectServerDown, RejectBeforeHookAbort.

Connect callback

OnConnect fires once per server after a successful initial connection, before New returns. The tools list is post-MetaEnricher:

mcpx.WithOnConnect(func(server string, tools []mcpx.ToolInfo) {
    log.Printf("connected to %s: %d tools", server, len(tools))
}),

Migrating from v0.3.x

BeforeCallHook
// v0.3.x
func(ctx context.Context, server string, tool mcpx.ToolInfo, args json.RawMessage) error

// v0.4.0
func(ctx context.Context, server, tool string, info mcpx.ToolInfo, args json.RawMessage) (context.Context, *mcpx.CallResult, error)

Return (nil, nil, err) to abort, (nil, result, nil) to short-circuit, (newCtx, nil, nil) to continue.

AfterCallHook
// v0.3.x
func(ctx context.Context, server string, tool mcpx.ToolInfo, args json.RawMessage, result *mcpx.CallResult, callErr error)

// v0.4.0
func(ctx context.Context, server, tool string, info mcpx.ToolInfo, args json.RawMessage, result *mcpx.CallResult, callErr error, duration time.Duration)

AfterCall now fires on all paths including rejections and cache hits.

ResultTransformHook
// v0.3.x
func(ctx context.Context, server string, tool mcpx.ToolInfo, text string) (string, error)

// v0.4.0
func(ctx context.Context, server, tool string, info mcpx.ToolInfo, result *mcpx.CallResult) error

Mutate result.Text and result.Parts in place.

Tool metadata

ToolInfo exposes the standard MCP annotation hints plus a derived Write flag and an open Custom map:

Field Meaning
ReadOnly Tool only reads state.
Write Tool mutates state but is not destructive (derived).
Destructive Tool may make destructive updates (deletes, drops, irreversible).
Idempotent Repeated calls have no additional effect.
OpenWorld Tool interacts with the open world (network, external systems).
Custom User-supplied labels added by a MetaEnricher.

Use these in your BeforeCall hook to drive policy decisions (e.g. require approval for Destructive tools, log every OpenWorld call).

Auth

Per-server authentication is configured declaratively in ServerConfig.Auth (opaque map, parsed from JSON "auth") and applied by a single global AuthFunc registered via mcpx.WithAuthFunc. The library does not interpret the shape of Auth — your AuthFunc does. Two ready-made helpers cover the common cases:

import "github.com/inhuman/mcp-multiplexer/auth"

// Bearer — for {"auth": {"token": "..."}} → Authorization: Bearer <token>
mcpx.WithAuthFunc(auth.Bearer)

// HeaderToken — for {"auth": {"tokenName": "X-MCP-AUTH", "value": "..."}}
//             → X-MCP-AUTH: <value>  (no Bearer prefix)
mcpx.WithAuthFunc(auth.HeaderToken)

For custom schemes (OAuth2 with refresh, AWS SigV4, HMAC, request-scoped JWT) write your own dispatcher:

mcpx.WithAuthFunc(func(ctx context.Context, server string, r *http.Request, data map[string]any) error {
    switch data["scheme"] {
    case "bearer":
        return auth.Bearer(ctx, server, r, data)
    case "oauth2":
        return mySignWithOAuth2(ctx, r, data)
    default:
        return fmt.Errorf("unknown scheme for %s: %v", server, data["scheme"])
    }
})

AuthFunc is called per outbound HTTP request including retries — cache expensive token derivation inside your function. If a server has Auth set but no WithAuthFunc was registered, mcpx.New returns an error before opening any connection (security-relevant misconfig fails loud).

Migrating from v0.0.x

The pre-v0.1.0 ServerConfig.Token / ServerConfig.TokenHeader fields are removed. Translation:

v0.0.x JSON v0.1.0 JSON + Code
{"token": "x"} {"auth": {"token": "x"}} mcpx.WithAuthFunc(auth.Bearer)
{"token": "x", "token_header": "X-MCP-AUTH"} {"auth": {"tokenName": "X-MCP-AUTH", "value": "x"}} mcpx.WithAuthFunc(auth.HeaderToken)

Logger shims

import "github.com/inhuman/mcp-multiplexer/log/zaplog"
mcpx.WithLogger(zaplog.New(zapLogger))

import "github.com/inhuman/mcp-multiplexer/log/sloglog"
mcpx.WithLogger(sloglog.New(slog.Default()))

Or implement the 4-method mcpx.Logger interface yourself.

Filtering

FilterByNames produces a View restricted to a subset of servers — useful when you want to expose only some tools to a particular agent or user without re-connecting:

view, err := mx.FilterByNames([]string{"fs"})
if err != nil { /* ... */ }
res, err := view.CallTool(ctx, "fs", "read_file", args)

Testing

# Unit + in-process integration (default; no docker required):
go test -race -cover ./...

# Full set including container-based stdio lifecycle:
go test -tags integration_docker -race ./...

# Lint with the project config (golangci-lint v2.11.4 pinned):
golangci-lint run

In-process integration tests use the same github.com/modelcontextprotocol/go-sdk that production code targets — no MCP-SDK mocks. Container tests use the public mcp/filesystem image via dockertest; they are gated behind the integration_docker build tag so the default test run never pulls Docker into the dependency graph.

License

MIT — see LICENSE.

Documentation

Overview

Package mcpx is a Model Context Protocol (MCP) multiplexer for Go.

It connects to many MCP servers behind a single Multiplexer object and gives the consumer a uniform API to:

  • aggregate per-server tool lists (with optional kind-based grouping for prompt generation),
  • normalise tool arguments before sending (built-in camelCase / joinArrays / singularResourceType transformers, plus custom ones),
  • intercept calls via BeforeCall / AfterCall / ResultTransform / MetaEnricher hooks for policy, observability, sanitisation, and metadata enrichment,
  • filter the visible server set per consumer (View) without re-establishing connections.

Three transports are supported out of the box: stdio (subprocess), http (StreamableHTTP), and sse. Authentication is pluggable per outbound HTTP/SSE request: declare an opaque `auth` block per server in JSON, register a single AuthFunc via WithAuthFunc, and dispatch on data["scheme"] inside your function. Subpackage github.com/inhuman/mcp-multiplexer/auth ships ready-made helpers for the two most common shapes (Bearer, custom header).

An opt-in health-check supervisor (WithHealthCheck) pings each server on a configurable interval and reconnects with exponential backoff when a server becomes unreachable. The per-server liveness state is queryable via ServerStatus; CallTool short-circuits with ErrServerDown when a server is marked down, avoiding unnecessary timeouts.

The multiplexer automatically subscribes to notifications/tools/list_changed from each connected server and refreshes the per-server tool cache when the server's tool set changes at runtime (e.g. due to plugins, feature flags, or permission changes). An optional WithOnToolsChanged callback notifies the consumer after each refresh that produces a different tool list.

Per-server call timeouts are supported via ServerConfig.CallTimeout; a zero value inherits the global default set via WithCallTimeout (30 s by default).

Typed observability is available via the Metrics interface (RecordCall, RecordToolList). Register an implementation via WithMetrics; the default is a no-op. Panics inside Metrics methods are recovered by the library. The MCP handshake automatically advertises the consuming module's real version (read from build info); it falls back to "dev" when build info is unavailable. Override both name and version with WithClientIdentity.

The library is logger-agnostic via the Logger interface (4 methods). Adapters for go.uber.org/zap and log/slog are provided as separate packages under log/zaplog and log/sloglog so the core stays dependency-light.

The "singularResourceType" transformer accepts a configurable plural→singular map: extend or override the built-in Kubernetes map globally via WithResourceSingular, or per-server via ServerConfig.ResourceSingular (per-server entries win over global, which in turn wins over built-in).

Runnable examples demonstrating common patterns live in the examples/ directory: examples/basic (multi-server setup), examples/policy (BeforeCallHook gate), and examples/redact (ResultTransformHook PII redaction).

See README.md for usage examples and the project constitution for design principles (zero-dependency core, real-dependency testing, secure defaults).

Index

Constants

This section is empty.

Variables

View Source
var ErrServerDown = errors.New("mcpx: server is down")

ErrServerDown is returned by Multiplexer.CallTool when the target server is currently unreachable and has been marked down by the health-check supervisor. Use errors.Is to distinguish it from ErrServerNotFound.

View Source
var ErrServerNotFound = errors.New("mcpx: server not found")

ErrServerNotFound is returned by CallTool when the named server is not registered.

View Source
var ErrToolNotFound = errors.New("mcpx: tool not found")

ErrToolNotFound is returned by CallTool when the named tool is not exposed by the server.

Functions

func BearerRoundTripper

func BearerRoundTripper(token string, base http.RoundTripper) http.RoundTripper

BearerRoundTripper returns an http.RoundTripper that injects an `Authorization: Bearer <token>` header into every request.

This is a low-level helper for users assembling their own *http.Client outside the ServerConfig flow. For the config-driven path, prefer WithAuthFunc together with the auth.Bearer helper from github.com/inhuman/mcp-multiplexer/auth.

func CacheScope added in v0.4.0

func CacheScope(ctx context.Context) string

CacheScope retrieves the scope set by WithCacheScope. Returns "" if not set.

func DefaultResourceSingular

func DefaultResourceSingular() map[string]string

DefaultResourceSingular returns a copy of the built-in plural→singular map used by the "singularResourceType" transformer.

func IsCacheHit added in v0.4.0

func IsCacheHit(ctx context.Context) bool

IsCacheHit reports whether the current call was served from cache. Valid to call from AfterCallHook; always false from BeforeCallHook.

func WithCacheScope added in v0.4.0

func WithCacheScope(ctx context.Context, scope string) context.Context

WithCacheScope injects a per-call scope string into ctx for cache key isolation. Use it to prevent cross-tenant cache collisions.

Types

type AfterCallHook

type AfterCallHook func(ctx context.Context, server, tool string, info ToolInfo, args json.RawMessage, result *CallResult, callErr error, duration time.Duration)

AfterCallHook runs after every tool call, on every code path (success, cache hit, short-circuit, upstream error, ResultTransform error, and all four rejection reasons). Errors returned from this hook are ignored. duration is wall time from CallTool entry.

type ArgsTransformer

type ArgsTransformer string

ArgsTransformer names a transformation applied to tool arguments before sending. Built-in values:

  • "camelCase" — converts snake_case map keys to camelCase (needed for MCP servers like mcp/kubernetes that use camelCase).
  • "joinArrays" — converts string slices into space-joined strings (some servers expect a single argv string).
  • "singularResourceType" — if args contains a "resourceType" string with a known plural form, normalises it to singular (pods→pod, etc.).

Custom names can be registered via WithArgsTransformer.

const (
	ArgsTransformerCamelCase        ArgsTransformer = "camelCase"
	ArgsTransformerJoinArrays       ArgsTransformer = "joinArrays"
	ArgsTransformerSingularResource ArgsTransformer = "singularResourceType"
)

type ArgsTransformers

type ArgsTransformers []ArgsTransformer

ArgsTransformers is an ordered list of transformer names applied left to right.

type AuthFunc added in v0.1.0

type AuthFunc func(ctx context.Context, server string, r *http.Request, data map[string]any) error

AuthFunc applies authentication to an outgoing HTTP/SSE request before it reaches the upstream MCP server.

AuthFunc is invoked once per outbound HTTP request — including each retry attempt performed by the underlying retryable HTTP client. Implementations whose token derivation is expensive (for example OAuth2 client-credentials with refresh) should cache the result internally; the library does not memoise across attempts.

The library calls fn on a *cloned* *http.Request so concurrent callers of the same connection do not race on Header / Body mutations. Mutate r in place; mutating r.Header is the common case but adjusting r.Body or r.URL is also allowed.

data is ServerConfig.Auth — the parsed JSON "auth" block, opaque to the library. The function defines its own shape; missing or malformed fields should yield a descriptive error (the library does not validate it).

Returning a non-nil error aborts the request: the upstream server is NOT contacted; the library wraps the error as `mcpx: auth <server>: <err>` and propagates it to the caller of CallTool.

AuthFunc applies only to HTTP and SSE transports. Stdio transports ignore it because they have no HTTP layer.

Register an AuthFunc with WithAuthFunc. See subpackage github.com/inhuman/mcp-multiplexer/auth for ready-made implementations covering the most common cases (Bearer, custom-header).

type BeforeCallHook

type BeforeCallHook func(ctx context.Context, server, tool string, info ToolInfo, args json.RawMessage) (context.Context, *CallResult, error)

BeforeCallHook runs before a tool call is dispatched to the upstream MCP server. Hooks chain in registration order; the first non-nil result or non-nil error stops the chain.

Return semantics (in priority order):

  • (_, _, err) where err != nil — abort; both non-nil means error wins.
  • (_, result, nil) where result != nil — short-circuit: upstream and ResultTransform are skipped; AfterCall still fires.
  • (newCtx, nil, nil) — continue; newCtx replaces ctx if non-nil.

args is the JSON-encoded payload AFTER all transformers and field maps.

type Cache added in v0.4.0

type Cache interface {
	// Get returns a deep copy of the cached result and true on hit.
	// Returns nil and false on miss or expiry.
	Get(ctx context.Context, key string) (*CallResult, bool)
	// Set stores a deep copy of value with the given TTL.
	// Set with nil value or zero TTL is a no-op.
	Set(ctx context.Context, key string, value *CallResult, ttl time.Duration)
}

Cache is the storage backend for CallTool response caching. Implementations MUST be goroutine-safe and MUST store/return deep copies.

type CallResult

type CallResult struct {
	Text    string
	Parts   []ContentPart
	IsError bool
}

CallResult is the structured outcome of a tool call.

Text is the joined text content of the result. Parts preserves the original content blocks (text, image, etc.) so callers can format them however they want without losing structure.

func (*CallResult) Clone added in v0.4.0

func (r *CallResult) Clone() *CallResult

Clone returns a deep copy of r. Each ContentPart is individually copied; Data and Raw byte slices are cloned to independent allocations. Clone on a nil receiver returns nil.

type ContentKind

type ContentKind string

ContentKind identifies the type of content in a CallResult part.

const (
	ContentText  ContentKind = "text"
	ContentImage ContentKind = "image"
	ContentOther ContentKind = "other"
)

type ContentPart

type ContentPart struct {
	Kind     ContentKind
	Text     string // populated when Kind == ContentText
	MIMEType string // populated when Kind == ContentImage
	Data     []byte // raw image bytes when Kind == ContentImage
	Raw      []byte // JSON-encoded original for ContentOther
}

ContentPart is one block returned from an MCP tool call.

type CustomTransformer

type CustomTransformer func(args map[string]any) map[string]any

CustomTransformer is a user-defined argument transformer registered via WithArgsTransformer and selectable from ServerConfig.ArgsTransformers by the same name.

type ErrInvalidArgs

type ErrInvalidArgs struct {
	BadFields    []string
	SchemaErrors []string
}

ErrInvalidArgs is returned by CallTool when arguments fail validation. BadFields lists argument paths that contain unresolved placeholder values. SchemaErrors lists JSON Schema violations; populated only when WithSchemaValidation is enabled and args do not conform to the tool schema.

func (*ErrInvalidArgs) Error

func (e *ErrInvalidArgs) Error() string

type Field

type Field struct {
	Key   string
	Value any
}

Field is a structured key/value pair logged alongside a message.

func F

func F(key string, value any) Field

F is a convenience constructor for Field.

type KeyFunc added in v0.4.0

type KeyFunc func(ctx context.Context, server, tool string, args json.RawMessage) string

KeyFunc computes the cache key for a call. When registered via WithCacheKey it replaces the built-in canonicalisation entirely.

type KindGroup

type KindGroup struct {
	Kind    string
	Servers []string
	Tools   []string
	Hints   []string
}

KindGroup groups servers of the same kind for prompt generation or routing.

type KindSettings

type KindSettings struct {
	ArgsTransformers ArgsTransformers  `json:"args_transformer,omitempty"`
	FieldMap         map[string]string `json:"field_map,omitempty"`
}

KindSettings holds shared defaults for all servers of a given kind.

type Logger

type Logger interface {
	Debug(msg string, fields ...Field)
	Info(msg string, fields ...Field)
	Warn(msg string, fields ...Field)
	Error(msg string, fields ...Field)
}

Logger is the minimal logging interface mcpx uses. Implement it once and pass via WithLogger. Adapters for zap and log/slog ship in subpackages log/zaplog and log/sloglog respectively.

func NopLogger

func NopLogger() Logger

NopLogger returns a Logger that discards all messages.

type MetaEnricher

type MetaEnricher func(ctx context.Context, server string, info ToolInfo) ToolInfo

MetaEnricher runs once per tool right after the multiplexer fetches the tool list from a server. It can return an updated ToolInfo with extra labels in Custom or adjusted boolean flags. Original input is never nil.

type Metrics added in v0.3.0

type Metrics interface {
	// RecordCall is invoked after every [Multiplexer.CallTool] invocation.
	// dur is the wall-clock time of the upstream MCP call only (argument
	// validation and hook overhead are excluded).
	// err is nil on success and matches the error returned to the caller.
	RecordCall(server, tool string, dur time.Duration, err error)

	// RecordToolList is invoked after every successful tool-list fetch —
	// both on initial connect and after a live notifications/tools/list_changed
	// refresh. count is the number of tools the server currently exposes.
	RecordToolList(server string, count int)
}

Metrics is an optional observability sink for the multiplexer. Implement this interface to receive call-level and tool-list events and forward them to any backend (Prometheus, OpenTelemetry, statsd, …). Register via WithMetrics.

Implementations must be safe for concurrent use. Panics inside any method are recovered by the library and do not propagate to callers.

type Multiplexer

type Multiplexer struct {
	// contains filtered or unexported fields
}

Multiplexer connects to multiple MCP servers and exposes a unified API for listing and invoking tools across them.

func New

func New(ctx context.Context, cfg MultiplexerConfig, opts ...Option) (*Multiplexer, error)

New connects to all servers in cfg, caches their tool lists, and returns a ready Multiplexer. Errors from individual servers are logged but do not prevent the rest from initialising. Inspect ServerNames() to see which servers are live.

func NewFromSessions

func NewFromSessions(ctx context.Context, sessions map[string]*mcp.ClientSession, opts ...Option) *Multiplexer

NewFromSessions builds a Multiplexer from already-connected MCP sessions. Useful in tests and integration harnesses where sessions are established before the multiplexer is constructed.

Each session is queried for its tool list. The constructed Multiplexer's Close() will close the supplied sessions.

func (*Multiplexer) AllTools

func (mx *Multiplexer) AllTools() []ToolInfo

AllTools returns every (server, tool) pair across all connected servers. Order is non-deterministic.

func (*Multiplexer) CallTool

func (mx *Multiplexer) CallTool(ctx context.Context, server, toolName string, argsJSON json.RawMessage) (*CallResult, error)

CallTool invokes the named tool on the named server with the given JSON arguments. It runs all configured BeforeCall/AfterCall/ResultTransform hooks and consults the response cache when the tool is cacheable. The argsJSON parameter must be a JSON object (or empty/nil).

Errors returned:

  • ErrServerNotFound, ErrToolNotFound, ErrServerDown — caller mistake.
  • *ErrInvalidArgs — args contain unresolved placeholders or schema violations.
  • errors from BeforeCallHook are propagated as-is.
  • upstream MCP errors are wrapped via fmt.Errorf("server %s: %w", ...).

func (*Multiplexer) Close

func (mx *Multiplexer) Close()

Close shuts down all MCP sessions and stops underlying subprocesses.

func (*Multiplexer) ConfigHints

func (mx *Multiplexer) ConfigHints() map[string][]string

ConfigHints returns the kind_hints map from MultiplexerConfig (may be nil).

func (*Multiplexer) FilterByNames

func (mx *Multiplexer) FilterByNames(names []string) (*View, error)

FilterByNames returns a view (View) of the multiplexer that exposes only the requested servers. The view shares all sessions and configuration with the parent — no new connections are opened. Closing the view is a no-op; only the parent Multiplexer's Close() shuts down sessions.

Returns an error if any requested server name is unknown.

func (*Multiplexer) KindGroups

func (mx *Multiplexer) KindGroups() []KindGroup

KindGroups returns servers grouped by Kind (or by name if Kind is empty), with deduplicated tool names per group. Groups are sorted by Kind.

func (*Multiplexer) KindsForServers

func (mx *Multiplexer) KindsForServers(names []string) []string

KindsForServers returns unique kinds for the given server names, preserving input order. If a server has no Kind set, its name is used as the kind.

func (*Multiplexer) ServerNames

func (mx *Multiplexer) ServerNames() []string

ServerNames returns the sorted list of live (connected) MCP server names. Servers that are currently down (initial connect failed or lost) are excluded.

func (*Multiplexer) ServerStatus added in v0.2.0

func (mx *Multiplexer) ServerStatus() map[string]ServerState

ServerStatus returns a snapshot of the liveness state of every registered server. When health-check is disabled (WithHealthCheck not called), all values are ServerStateConnected.

func (*Multiplexer) ToolsForServers

func (mx *Multiplexer) ToolsForServers(names []string) []ToolInfo

ToolsForServers returns ToolInfo for tools across the given server names, in input order with stable per-server ordering. Each (server, tool) pair appears once.

type MultiplexerConfig

type MultiplexerConfig struct {
	Servers []ServerConfig `json:"servers"`
	// KindSettings provides shared transformer/field-map defaults for every
	// server with the matching Kind. Per-server settings take precedence.
	KindSettings map[string]KindSettings `json:"kind_settings,omitempty"`
	// KindHints is opaque metadata returned via KindGroup.Hints — useful for
	// downstream prompt generation, e.g. "kubernetes" -> ["use kubectl_logs for logs"].
	KindHints map[string][]string `json:"kind_hints,omitempty"`
}

MultiplexerConfig is the top-level config for New().

type OnConnectFunc added in v0.4.0

type OnConnectFunc func(server string, tools []ToolInfo)

OnConnectFunc is called once per server after the initial successful connection, before New returns. tools is the post-MetaEnricher tool list. Panics are recovered.

type OnReconnectFunc added in v0.2.0

type OnReconnectFunc func(server string, err error)

OnReconnectFunc is called by the health-check supervisor on every reconnect attempt. err is nil when the attempt succeeded, non-nil on failure. It is invoked synchronously from the supervisor goroutine and must not block for extended periods.

type OnRejectedCallFunc added in v0.4.0

type OnRejectedCallFunc func(ctx context.Context, server, tool string, reason RejectReason, err error)

OnRejectedCallFunc is called when CallTool is rejected before dispatch. It fires before AfterCall on rejection paths. Panics are recovered. reason identifies which rejection path was taken.

type OnToolsChangedFunc added in v0.2.0

type OnToolsChangedFunc func(server string, before, after []ToolInfo)

OnToolsChangedFunc is called after a successful tool-list refresh that produces a different set of tools than the previously cached list. server is the name of the server whose tool list changed. before is a snapshot of the tool list prior to the refresh. after is the updated tool list.

The callback runs synchronously from the per-server drain goroutine and must not block for extended periods. Panics inside the callback are recovered by the library; the multiplexer continues operating normally.

type Option

type Option func(*options)

Option configures a Multiplexer at construction time.

func WithAfterCall

func WithAfterCall(h AfterCallHook) Option

WithAfterCall registers a hook that runs after every tool call. Multiple hooks run in registration order; their errors are ignored.

func WithArgsTransformer

func WithArgsTransformer(name string, fn CustomTransformer) Option

WithArgsTransformer registers a custom transformer under the given name. Reference it from ServerConfig.ArgsTransformers (or KindSettings) by name.

func WithAuthFunc added in v0.1.0

func WithAuthFunc(fn AuthFunc) Option

WithAuthFunc registers the global AuthFunc applied to every server whose [ServerConfig.Auth] is non-nil. It is REQUIRED whenever any server has Auth set; otherwise New returns an error before opening any connection.

There is no per-server registry — dispatch on data["scheme"] (or the server name) inside the function if multiple schemes coexist.

Calling WithAuthFunc more than once overwrites the previous value; no chaining is provided.

func WithBeforeCall

func WithBeforeCall(h BeforeCallHook) Option

WithBeforeCall registers a hook that runs before every tool call. Multiple hooks run in registration order and any error aborts the call.

func WithCache added in v0.4.0

func WithCache(c Cache) Option

WithCache replaces the built-in LRU with the given Cache implementation. Passing nil keeps the built-in LRU. WithCache and WithoutCache are mutually exclusive — last one registered wins.

func WithCacheKey added in v0.4.0

func WithCacheKey(fn KeyFunc) Option

WithCacheKey replaces the built-in cache key function. The KeyFunc receives the call context (with scope), server, tool, and canonicalised args.

func WithCacheSize added in v0.4.0

func WithCacheSize(n int) Option

WithCacheSize sets the maximum number of entries in the built-in LRU. Default: 256. Ignored when WithCache is used.

func WithCacheTTL added in v0.4.0

func WithCacheTTL(d time.Duration) Option

WithCacheTTL sets the default TTL for cached results. Default: 30s. Per-tool overrides can be set via ToolInfo.Custom["cache_ttl"].

func WithCallTimeout

func WithCallTimeout(d time.Duration) Option

WithCallTimeout overrides the per-call timeout. Default: 30s.

func WithClientIdentity

func WithClientIdentity(name, version string) Option

WithClientIdentity overrides the MCP client name/version sent during handshake. Default: "mcpx" / library version.

func WithHTTPRetryMax

func WithHTTPRetryMax(n int) Option

WithHTTPRetryMax overrides the maximum retries for HTTP/SSE transports. Default: 5.

func WithHealthCheck added in v0.2.0

func WithHealthCheck(interval time.Duration) Option

WithHealthCheck enables the liveness supervisor. The supervisor probes each server at the given interval using a ListTools call and reconnects with exponential backoff (1 s → 2 s → … → 60 s) on failure. interval must be positive; zero or negative values cause New to return an error. Without this option the supervisor does not start and Multiplexer.ServerStatus always returns ServerStateConnected for every registered server.

func WithLogger

func WithLogger(l Logger) Option

WithLogger attaches a Logger. Default: NopLogger.

func WithMetaEnricher

func WithMetaEnricher(h MetaEnricher) Option

WithMetaEnricher registers a hook that adjusts ToolInfo metadata after the initial fetch. Multiple enrichers chain in registration order.

func WithMetrics added in v0.3.0

func WithMetrics(m Metrics) Option

WithMetrics registers a Metrics implementation that receives call-level and tool-list events. Passing nil is a no-op (leaves the default no-op implementation in place). Calling WithMetrics more than once overwrites the previous value.

func WithOnConnect added in v0.4.0

func WithOnConnect(fn OnConnectFunc) Option

WithOnConnect registers a callback invoked once per server after the initial successful connection, before New returns. tools is the post-MetaEnricher list. Panics recovered.

func WithOnReconnect added in v0.2.0

func WithOnReconnect(fn OnReconnectFunc) Option

WithOnReconnect registers a callback invoked on every reconnect attempt. err is nil on success, non-nil on failure. Registering more than once overwrites the previous value. The callback runs synchronously from the supervisor goroutine and must not block for extended periods.

func WithOnRejectedCall added in v0.4.0

func WithOnRejectedCall(fn OnRejectedCallFunc) Option

WithOnRejectedCall registers an observer called when CallTool is rejected before dispatching to upstream. It fires before AfterCall. Panics recovered.

func WithOnToolsChanged added in v0.2.0

func WithOnToolsChanged(fn OnToolsChangedFunc) Option

WithOnToolsChanged registers a callback invoked after each successful tool-list refresh that changes the cached tool list for a server. The callback receives the server name and before/after snapshots. Registering more than once overwrites the previous value; passing nil clears any previously registered callback.

func WithResourceSingular added in v0.3.0

func WithResourceSingular(m map[string]string) Option

WithResourceSingular merges m into the global custom singular map used by the "singularResourceType" argument transformer. Entries in m override built-in entries with the same key. Passing nil or an empty map is a no-op. Call multiple times to accumulate entries (each call merges, not replaces).

func WithResultTransform

func WithResultTransform(h ResultTransformHook) Option

WithResultTransform registers a hook that may rewrite a successful result's text. Hooks chain in registration order; the first error short-circuits.

func WithSchemaValidation added in v0.4.0

func WithSchemaValidation() Option

WithSchemaValidation enables JSON Schema validation of tool arguments against each tool's InputSchema before the call is dispatched. When a tool declares no InputSchema the check is skipped. Violations are returned as *ErrInvalidArgs with SchemaErrors populated.

func WithoutCache added in v0.4.0

func WithoutCache() Option

WithoutCache disables the response cache entirely. Takes priority over WithCache if called after it.

type RejectReason added in v0.4.0

type RejectReason string

RejectReason identifies why a CallTool request was rejected before reaching the upstream MCP server.

const (
	RejectUnknownServer   RejectReason = "unknown_server"
	RejectUnknownTool     RejectReason = "unknown_tool"
	RejectServerDown      RejectReason = "server_down"
	RejectBeforeHookAbort RejectReason = "before_hook_abort"
	RejectInvalidArgs     RejectReason = "invalid_args"
)

type ResultTransformHook

type ResultTransformHook func(ctx context.Context, server, tool string, info ToolInfo, result *CallResult) error

ResultTransformHook runs after a successful upstream tool call and mutates *CallResult in place. It can modify Text, Parts, and IsError (e.g. PII redaction, prompt-injection filtering across image parts). Returning an error aborts the call; AfterCall still fires with the error.

type ServerConfig

type ServerConfig struct {
	Name string `json:"name"`
	// Kind is an optional semantic label grouping servers of the same type
	// (e.g. "kubernetes", "gitlab"). Empty kind is treated as a unique kind
	// equal to the server name.
	Kind      string        `json:"kind,omitempty"`
	Transport TransportType `json:"transport"`

	// Stdio only.
	Binary string   `json:"binary,omitempty"`
	Args   []string `json:"args,omitempty"`
	Env    []string `json:"env,omitempty"` // additional env vars for the subprocess

	// HTTP/SSE only.
	URL string `json:"url,omitempty"`

	// CallTimeout is the maximum duration allowed for a single tool call to
	// this server. A zero or negative value inherits the multiplexer-wide
	// default set via [WithCallTimeout] (default 30 s).
	//
	// Use a shorter value for local stdio servers and a longer value for HTTP
	// servers that may need retries.
	CallTimeout time.Duration `json:"call_timeout,omitempty"`

	// Auth is an opaque parameter block read verbatim from the JSON "auth"
	// field. The library does not interpret its shape — it is forwarded
	// as-is to the AuthFunc registered via [WithAuthFunc].
	//
	// When non-nil and no AuthFunc is registered, [New] returns an error
	// before opening any connection. Auth applies only to HTTP and SSE
	// transports; stdio servers ignore it.
	//
	// Helpers covering the two most common shapes
	// ({"token":"..."} and {"tokenName":"...","value":"..."}) ship in
	// subpackage github.com/inhuman/mcp-multiplexer/auth.
	Auth map[string]any `json:"auth,omitempty"`

	// ArgsTransformers is an ordered list of transformations applied to tool
	// arguments before sending. Built-in names: "camelCase", "joinArrays",
	// "singularResourceType". Custom names registered via
	// WithArgsTransformer are also resolved here.
	ArgsTransformers ArgsTransformers `json:"args_transformer,omitempty"`
	// FieldMap renames argument keys (top-level only) before sending.
	FieldMap map[string]string `json:"field_map,omitempty"`
	// ResourceSingular is an optional per-server override map for the
	// "singularResourceType" transformer. Entries here win over both the
	// global custom map (WithResourceSingular) and the built-in map.
	// A nil value means no per-server override.
	ResourceSingular map[string]string `json:"resource_singular,omitempty"`
}

ServerConfig describes one MCP server.

type ServerState added in v0.2.0

type ServerState string

ServerState reports the observed liveness of a single MCP server. It is set to ServerStateConnected at construction and updated by the health-check supervisor when WithHealthCheck is used.

const (
	// ServerStateConnected means the server is reachable and calls proceed normally.
	ServerStateConnected ServerState = "connected"
	// ServerStateDown means the last health probe failed; [Multiplexer.CallTool]
	// returns [ErrServerDown] immediately for this server.
	ServerStateDown ServerState = "down"
)

type ToolInfo

type ToolInfo struct {
	Server      string
	Name        string
	Description string
	InputSchema []byte // raw JSON schema

	ReadOnly    bool
	Write       bool
	Destructive bool
	Idempotent  bool
	OpenWorld   bool

	Custom map[string]string
}

ToolInfo holds cached metadata about a single tool from an MCP server.

The boolean flags map to MCP tool annotations (ReadOnlyHint, DestructiveHint, IdempotentHint, OpenWorldHint). Write is a derived flag — true when the tool is not read-only and not destructive (i.e. a non-destructive mutation).

Custom is an open extension point for user-supplied labels added by a MetaEnricher hook (e.g. "category=database", "owner=team-a").

type TransportType

type TransportType string

TransportType selects the underlying MCP transport.

const (
	TransportStdio TransportType = "stdio"
	TransportHTTP  TransportType = "http"
	TransportSSE   TransportType = "sse"
)

type View

type View struct {
	// contains filtered or unexported fields
}

View is a subset of a Multiplexer scoped to a fixed list of server names. Methods mirror the parent's read API so a view can be passed to code that expects a Multiplexer-like surface for a restricted server set.

func (*View) CallTool

func (v *View) CallTool(ctx context.Context, server, tool string, argsJSON json.RawMessage) (*CallResult, error)

CallTool delegates to the parent multiplexer after verifying the server is part of this view. Returns ErrServerNotFound if the server is hidden.

func (*View) ServerNames

func (v *View) ServerNames() []string

ServerNames returns the names visible through this view, in input order.

func (*View) Tools

func (v *View) Tools() []ToolInfo

Tools returns ToolInfo for all tools across the view's servers.

Directories

Path Synopsis
Package auth provides ready-made [mcpx.AuthFunc] implementations for the two most common authentication shapes carried in [mcpx.ServerConfig.Auth].
Package auth provides ready-made [mcpx.AuthFunc] implementations for the two most common authentication shapes carried in [mcpx.ServerConfig.Auth].
examples
basic command
Package main demonstrates basic mcpx usage: connecting to multiple MCP servers and calling a tool.
Package main demonstrates basic mcpx usage: connecting to multiple MCP servers and calling a tool.
policy command
Package main demonstrates using BeforeCallHook as a policy gate: any tool marked Destructive is blocked before the call reaches the upstream server.
Package main demonstrates using BeforeCallHook as a policy gate: any tool marked Destructive is blocked before the call reaches the upstream server.
redact command
Package main demonstrates using ResultTransformHook for PII redaction: any SSN pattern in tool result text is replaced with [REDACTED].
Package main demonstrates using ResultTransformHook for PII redaction: any SSN pattern in tool result text is replaced with [REDACTED].
internal
testutil/capturelog
Package capturelog provides an in-memory implementation of mcpx.Logger that records every event for later assertion.
Package capturelog provides an in-memory implementation of mcpx.Logger that records every event for later assertion.
testutil/dockertarget command
Command dockertarget is a minimal stdio MCP server used for the transport_integration_test (real subprocess lifecycle) and for the integration_docker test set.
Command dockertarget is a minimal stdio MCP server used for the transport_integration_test (real subprocess lifecycle) and for the integration_docker test set.
testutil/mcptest
Package mcptest provides an in-process MCP server factory for use from tests inside the mcp-multiplexer module.
Package mcptest provides an in-process MCP server factory for use from tests inside the mcp-multiplexer module.
log
sloglog
Package sloglog adapts a *log/slog.Logger to the mcpx.Logger interface.
Package sloglog adapts a *log/slog.Logger to the mcpx.Logger interface.
zaplog
Package zaplog adapts a *go.uber.org/zap.Logger to the mcpx.Logger interface.
Package zaplog adapts a *go.uber.org/zap.Logger to the mcpx.Logger interface.
Package policy provides ready-made BeforeCallHook and AfterCallHook builders for common call-control patterns.
Package policy provides ready-made BeforeCallHook and AfterCallHook builders for common call-control patterns.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL