MCP Client
Source: src/plugins/mcp/, src/helpers/mcp.ts.
Purpose and responsibilities
A client-side Model Context Protocol (MCP) integration. Connects to MCP servers, lists
their tools, and exposes each as a normal AgentTool so the model calls them like any
function tool and the AgentLoop executes them via tools/call. Works uniformly across
all five providers — the model never sees provider-specific MCP surfaces.
Three transports: stdio (spawn a local process; Node/Bun only), Streamable HTTP
(POST-based, cross-env including browser), WebSocket (wss://, cross-env).
Zero runtime dependencies. JSON-RPC 2.0 and all transports are hand-rolled. The reference
SDK (official-sdks/mcp-ts v1.29.0) is the correctness oracle.
Does NOT implement the MCP server role — this is a client-only implementation.
Architecture

    src/plugins/mcp/
      types.ts              MCP wire types + McpServerConfig (HttpConfig / StdioConfig)
      jsonrpc.ts            McpErrorCode constants + McpError class
      transport.ts          McpTransport interface + IncomingMcpHandlers
      base-transport.ts     BaseJsonRpcTransport (shared pending-map, routing, timeout)
      transport-stdio.ts    StdioTransport: node:child_process, NDJSON, browser-guarded
      transport-http.ts     HttpTransport: Streamable HTTP via engine.fetch (cross-env)
      transport-ws.ts       WsTransport: WebSocket via engine.connect (cross-env)
      client.ts             McpClient: connect/listTools/callTool/close + P2/P4 methods
      tools.ts              MCP tool -> AgentTool adapter; mcpContentToResult; mcpPromptToMessages
      sampling.ts           samplingHandler: fulfill server's sampling/createMessage
      oauth.ts              McpOAuth: OAuth 2.1 + PKCE + DCR + SSRF guard
      url-guard.ts          assertSafeAuthUrl + McpSsrfError (SSRF protection)
      win-spawn.ts          windowsSpawnPlan: Windows .cmd/.bat spawn resolution
    src/helpers/mcp.ts      connectMcp() / mcpToolset() / finishMcpAuth(): public API

Protocol constants (src/plugins/mcp/types.ts)

    const MCP_PROTOCOL_VERSION = '2025-11-25';

Advertised in initialize. The client accepts any version returned by the server in the
initialize result (no strict version gating on responses).
JSON-RPC 2.0 response shape:

    interface JsonRpcResponse {
      jsonrpc: '2.0';
      id: number | string | null;
      result?: unknown;
      error?: JsonRpcError;
    }
    interface JsonRpcError { code: number; message: string; data?: unknown; }

McpError and error codes (src/plugins/mcp/jsonrpc.ts)

    const McpErrorCode = {
      ConnectionClosed: -32000,
      RequestTimeout: -32001,
      ParseError: -32700,
      InvalidRequest: -32600,
      MethodNotFound: -32601,
      InvalidParams: -32602,
      InternalError: -32603,
    } as const;

    class McpError extends Error {
      readonly code: number;
      readonly data?: unknown;
      constructor(err: JsonRpcError)
    }

McpError is thrown for protocol-level failures (server unreachable, JSON-RPC error
response, timeout). Tool execution failures arrive as a normal result with isError: true
and are NOT thrown — mcpContentToResult surfaces them as error text to the model.
Transport interface (src/plugins/mcp/transport.ts)

    interface McpTransport {
      start(): Promise<void>;
      request(method: string, params?: unknown): Promise<unknown>;
      notify(method: string, params?: unknown): Promise<void>;
      setHandlers(handlers: IncomingMcpHandlers): void;
      setProtocolVersion?(version: string): void;
      listen?(): Promise<void> | void;
      close(): Promise<void>;
    }

    interface IncomingMcpHandlers {
      onRequest?: (method: string, params: unknown) => Promise<unknown>;
      onNotification?: (method: string, params: unknown) => void;
    }

listen() opens the server-to-client channel. For stdio it is a no-op (the process
stdout is already duplex). For HTTP it opens a GET SSE stream. McpClient.connect calls
transport.listen?.() after the initialize handshake.
BaseJsonRpcTransport (src/plugins/mcp/base-transport.ts)
Abstract base class shared by all three transports. Owns:
- nextId: number, the monotonic JSON-RPC request id counter.
- pending: Map<number, Pending>, the in-flight requests waiting for a response.
- handlers: IncomingMcpHandlers, set via setHandlers().

    interface Pending {
      resolve: (v: unknown) => void;
      reject: (e: unknown) => void;
      timer: ReturnType<typeof setTimeout>;
    }

routeIncoming(msg) dispatches a parsed message:
- msg.id present, msg.method absent → response: resolveResponse(msg).
- msg.id and msg.method both present → server-initiated request: handleRequest(msg).
- msg.method present, msg.id absent → notification: handlers.onNotification?.().
registerPending(id, resolve, reject, timeoutMs, method) arms a timer. On timeout,
removes from pending and rejects with McpError({ code: RequestTimeout }).
resolveResponse(msg) looks up msg.id in pending, clears the timer, and either
resolves with msg.result or rejects with new McpError(msg.error).
handleRequest(msg) calls handlers.onRequest and sends back { jsonrpc, id, result } or { jsonrpc, id, error } via the abstract sendMessage(obj). If no handler
is registered, responds with MethodNotFound.
failAll(err) rejects all in-flight pending requests (called on transport close / process
exit).
Concrete transports implement:
- protected abstract sendMessage(obj: unknown): void | Promise<void>, which writes a serialized JSON-RPC object to the peer.
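The routing and pending-map rules above can be sketched in a few lines. This is a minimal illustration, not the library's code: classify, PendingMap, settle, and the plain-object timeout error are hypothetical names and shapes chosen for the example, and treating a null id as "absent" is an assumption.

```typescript
// Illustrative sketch only; names and shapes are assumptions, not the real BaseJsonRpcTransport.
type JsonRpcMessage = {
  jsonrpc: '2.0';
  id?: number | string | null;
  method?: string;
  params?: unknown;
  result?: unknown;
  error?: { code: number; message: string; data?: unknown };
};

type MessageKind = 'response' | 'request' | 'notification' | 'invalid';

// Mirrors the three routeIncoming cases: id only, id + method, method only.
function classify(msg: JsonRpcMessage): MessageKind {
  const hasId = msg.id !== undefined && msg.id !== null; // assumption: null counts as absent
  const hasMethod = typeof msg.method === 'string';
  if (hasId && !hasMethod) return 'response';
  if (hasId && hasMethod) return 'request';
  if (hasMethod) return 'notification';
  return 'invalid';
}

interface Pending {
  resolve: (v: unknown) => void;
  reject: (e: unknown) => void;
  timer: ReturnType<typeof setTimeout>;
}

const REQUEST_TIMEOUT = -32001; // McpErrorCode.RequestTimeout

class PendingMap {
  private readonly pending = new Map<number, Pending>();

  // Arms a timer; on expiry the entry is removed and the caller is rejected.
  register(id: number, resolve: Pending['resolve'], reject: Pending['reject'], timeoutMs: number): void {
    const timer = setTimeout(() => {
      this.pending.delete(id);
      reject({ code: REQUEST_TIMEOUT, message: `request ${id} timed out` });
    }, timeoutMs);
    this.pending.set(id, { resolve, reject, timer });
  }

  // Looks up the id, clears the timer, then resolves or rejects. Returns false for unknown ids.
  settle(msg: JsonRpcMessage): boolean {
    if (typeof msg.id !== 'number') return false;
    const entry = this.pending.get(msg.id);
    if (!entry) return false;
    clearTimeout(entry.timer);
    this.pending.delete(msg.id);
    if (msg.error) entry.reject(msg.error);
    else entry.resolve(msg.result);
    return true;
  }

  // Rejects everything in flight, as on transport close or process exit.
  failAll(err: unknown): void {
    for (const entry of this.pending.values()) {
      clearTimeout(entry.timer);
      entry.reject(err);
    }
    this.pending.clear();
  }

  get size(): number {
    return this.pending.size;
  }
}
```

Clearing the timer before settling matters in both paths: a stale timer would otherwise reject a request that has already completed and keep the process alive until it fires.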
StdioTransport (src/plugins/mcp/transport-stdio.ts)
Node/Bun only. Spawns the MCP server as a child process.
Startup (start())
- Lazy-loads node:child_process via nodeChildProcess() (browser guard).
- On Windows: calls windowsSpawnPlan(file, args, env, existsSync) from src/plugins/mcp/win-spawn.ts to resolve .cmd/.bat paths and route through cmd.exe with windowsVerbatimArguments: true.
- Calls cp.spawn(file, args, { env: safeEnv() + config.env, stdio: ['pipe', 'pipe', 'inherit'], windowsHide: true }). safeEnv() (module-level function) copies only essential env keys (PATH, HOME, etc.), platform-branched between Windows and POSIX. This prevents leaking PYTHONHOME, NODE_PATH, and other version-manager vars into the child.
- Registers a 'data' handler on proc.stdout for NDJSON parsing.
- Registers 'exit' and 'error' handlers to call failAll().
NDJSON parsing (onData, parseAndRoute)
Accumulates chunks in this.buffer. Splits on \n, strips \r, skips empty lines.
Each complete line is JSON.parsed and dispatched to routeIncoming. Malformed JSON
lines are silently dropped — server stderr is inherited so diagnostics appear in the
parent’s stderr.
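A minimal sketch of the buffering described above, assuming chunks arrive as already-decoded strings. NdjsonBuffer and its onMessage callback are hypothetical names for illustration; the real transport routes parsed messages to routeIncoming.

```typescript
// Illustrative NDJSON line buffer; not the actual StdioTransport implementation.
class NdjsonBuffer {
  private buffer = '';
  private readonly onMessage: (msg: unknown) => void;

  constructor(onMessage: (msg: unknown) => void) {
    this.onMessage = onMessage;
  }

  push(chunk: string): void {
    this.buffer += chunk;
    const lines = this.buffer.split('\n');
    // The last element is an incomplete line (or '' when the chunk ended on '\n'); keep it for later.
    this.buffer = lines.pop() ?? '';
    for (const raw of lines) {
      const line = raw.replace(/\r$/, ''); // tolerate CRLF line endings
      if (line === '') continue;
      let msg: unknown;
      try {
        msg = JSON.parse(line);
      } catch {
        continue; // malformed JSON lines are dropped silently
      }
      this.onMessage(msg);
    }
  }
}
```

Parsing happens inside the try block but dispatch happens outside it, so an exception thrown by the message handler is not mistaken for a malformed line.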
Shutdown (close())
- failAll() to reject all in-flight requests.
- proc.stdin.end().
- Waits up to 500 ms, then sends SIGTERM.
- Waits up to 2500 ms, then sends SIGKILL.
- Resolves on the proc 'exit' event.
setProtocolVersion() is a no-op for stdio (no per-message headers).
sendMessage(obj) writes JSON.stringify(obj) + '\n' to proc.stdin.
HttpTransport (src/plugins/mcp/transport-http.ts)
Cross-env. Every request is a POST through engine.fetch. Session state is tracked via
the mcp-session-id response header.
Per-request headers

    private headers(): Record<string, string> {
      return {
        'content-type': 'application/json',
        accept: 'application/json, text/event-stream',
        ...this.config.headers,   // consumer-supplied (Bearer, etc.)
        ...(sessionId ? { 'mcp-session-id': sessionId } : {}),
        ...(protocolVersion ? { 'mcp-protocol-version': protocolVersion } : {}),
      };
    }

Auth headers from deps.getAuthHeaders() are merged in authedHeaders() on every call.
request(method, params) flow
- Build { jsonrpc: '2.0', id, method, params } with a monotonic nextHttpId.
- POST via this.post(message), which uses engine.fetch with responseType: 'text', provider: 'mcp', model: config.name ?? 'server', and an optional queueName.
- If mcp-session-id appears in response headers, store it for future requests.
- On 401 with deps.onUnauthorized: call the hook; if it returns true, retry once (OAuth re-auth path).
- On status >= 400: throw McpError({ code: ConnectionClosed }).
- pickResponse(contentType, text, id) extracts the matching JSON-RPC response:
  - text/event-stream: split on \r?\n\r?\n, extract data: lines from each frame, JSON.parse each, find the response with msg.id === id.
  - application/json: parse the body as a single object or array, find by id.
- Throw McpError on error field; return msg.result.
notify(method, params) POSTs without waiting for a meaningful response body.
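The response-selection step in the flow above might look roughly like the following. This is a sketch under assumptions: pickResponseSketch and RpcResponse are hypothetical names, and the handling of multi-line data: fields follows the general SSE convention (join with a newline, strip one leading space) rather than anything the source states.

```typescript
// Illustrative only; the real pickResponse may differ in details.
type RpcResponse = { jsonrpc: '2.0'; id: number | string | null; result?: unknown; error?: unknown };

function isObject(v: unknown): v is Record<string, unknown> {
  return typeof v === 'object' && v !== null;
}

function pickResponseSketch(contentType: string, text: string, id: number): RpcResponse | undefined {
  const candidates: unknown[] = [];
  if (contentType.includes('text/event-stream')) {
    // Frames are separated by a blank line; each frame may carry one or more data: lines.
    for (const frame of text.split(/\r?\n\r?\n/)) {
      const data = frame
        .split(/\r?\n/)
        .filter((l) => l.startsWith('data:'))
        .map((l) => l.slice(5).replace(/^ /, ''))
        .join('\n');
      if (data === '') continue;
      try {
        candidates.push(JSON.parse(data));
      } catch {
        // skip frames whose payload is not JSON
      }
    }
  } else {
    // application/json: a single message or a batch array
    const body: unknown = JSON.parse(text);
    if (Array.isArray(body)) candidates.push(...body);
    else candidates.push(body);
  }
  return candidates.find((m): m is RpcResponse => isObject(m) && m.id === id);
}
```

Notifications interleaved on the same SSE body carry no id, so they are skipped naturally by the id comparison.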
Server-to-client GET SSE stream (listen())
Opens an AbortController-controlled background loop that calls deps.fetchStream (the
streaming-capable engine fetch) with method: 'GET' and accept: text/event-stream. If
the server returns 405 on the first attempt, the loop stops (request/response-only server
mode). On connection drops, reconnects with exponential backoff up to 5 retries. Tracks
last-event-id for resumption. Each SSE frame is JSON.parsed and routed via
this.routeIncoming(msg) (using BaseJsonRpcTransport.routeIncoming).
close() aborts the event loop and sends DELETE {url} with the mcp-session-id header
(best-effort: 405 is silently ignored).
sendMessage(obj) (abstract impl) POSTs the JSON-RPC object (used by BaseJsonRpcTransport.handleRequest to reply to server-initiated requests).
McpClient (src/plugins/mcp/client.ts)

    class McpClient {
      constructor(transport: McpTransport, opts: McpClientOptions = {})

      get info(): McpInitializeResult | null   // null before connect()
      async connect(): Promise<McpInitializeResult>
      async listTools(): Promise<McpToolDef[]>
      async callTool(name: string, args?: {}, trace?: TraceContext): Promise<McpCallResult>
      // P2: listResources, readResource, subscribeResource, listPrompts, getPrompt, setLogLevel
      // P4: callToolTask, getTask, awaitTask, cancelTask
      async close(): Promise<void>
    }

    interface McpClientOptions {
      clientInfo?: { name: string; version: string };
      capabilities?: Record<string, unknown>;
      onNotification?: (method: string, params: unknown) => void;
      onServerRequest?: (method: string, params: unknown) => Promise<unknown>;
      hooks?: HookBus;
      server?: string;                                  // for onMcpToolCall/onMcpError
      telemetry?: { hooks: HookBus; server: string };   // @deprecated
      keepAliveMs?: number;
    }

connect() lifecycle
- transport.setHandlers({ onRequest, onNotification }):
  - onRequest → handleServerRequest(method, params).
  - onNotification → opts.onNotification?.(method, params).
- transport.start().
- transport.request('initialize', { protocolVersion, capabilities, clientInfo }) → McpInitializeResult. Stores in this.serverInfo.
- transport.setProtocolVersion?.(result.protocolVersion).
- transport.notify('notifications/initialized').
- transport.listen?.().
- If keepAliveMs > 0: start setInterval(() => transport.request('ping')). The timer is unref()d so it does not hold the Node/Bun process open.
listTools() and pagination
paginate<T>(method, field) (private) follows cursor pagination:

    do {
      res = await transport.request(method, cursor ? { cursor } : {});
      out.push(...res[field]);
      cursor = res.nextCursor;
    } while (cursor);

callTool(name, args, trace?)
- Record t0 = performance.now().
- transport.request('tools/call', { name, arguments: args }).
- On success: hooks?.emitSync('onMcpToolCall', { server, tool: name, latencyMs, isError: res.isError ?? false, trace }).
- On error: hooks?.emitSync('onMcpError', { server, phase: 'request', error, trace }), then rethrow.
trace is the TraceContext from the AgentLoop run (sessionId + requestId + callId).
When present, onMcpToolCall and onMcpError carry the full trace so MCP tool calls
appear in telemetry correlated with the agent run that triggered them.
Server-initiated request handling (handleServerRequest)
ping is handled internally (returns {}). All other methods are routed to
opts.onServerRequest. If no handler is registered, throws McpError({ code: MethodNotFound }).
connectMcp wires onServerRequest to handle sampling/createMessage,
elicitation/create, and roots/list based on which ConnectMcpOptions are provided.
Tool adapter (src/plugins/mcp/tools.ts)
mcpToolToAgentTool(client, tool, namespace, opts)
Returns an AgentTool:

    {
      definition: {
        type: 'function',
        name: `${namespace}__${tool.name}`,   // double-underscore namespace separator
        description: tool.description ?? tool.title ?? tool.name,
        parameters: tool.inputSchema ?? { type: 'object', properties: {} },
      },
      execute: async (args, ctx) => {
        const res = await client.callTool(tool.name, args, ctx.trace);
        // optional outputSchema validation via validateJsonSchema
        return mcpContentToResult(res);
      },
    }

The namespaced name <ns>__<tool> avoids collisions across servers. client.callTool is
called with the un-namespaced name (what the server registered).
ctx.trace from ToolExecutionContext is threaded into callTool so that telemetry
hooks carry the run’s sessionId, requestId, and callId.
If opts.validateOutput is true and tool.outputSchema is present: validate
res.structuredContent via validateJsonSchema (src/util/json-schema.ts); on
mismatch return an error string to the model.
mcpContentToResult(res)
Maps McpCallResult to string | ContentPart[]:
- If all content is text (no media): returns joined text string. If res.isError, wraps with "Tool error: " prefix.
- If any block is image or audio: returns ContentPart[] with base64 sources.
- Unknown block types are silently dropped (null from blockToPart).
- resource blocks: if resource.text is set, returns text; if resource.uri, returns "[resource {uri}]".
- resource_link blocks: returns "[resource {uri}]".
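The mapping rules above can be sketched as follows. The Block and Part shapes, the newline join separator, and the function names blockToPartSketch and contentToResultSketch are assumptions made for illustration; the library's ContentPart type is not shown in this document and may differ.

```typescript
// Illustrative sketch; Block/Part shapes and the '\n' join are assumptions.
interface Block {
  type: string;
  text?: string;
  data?: string;
  mimeType?: string;
  uri?: string;
  resource?: { uri?: string; text?: string };
}

type Part =
  | { type: 'text'; text: string }
  | { type: 'image' | 'audio'; source: { kind: 'base64'; data: string; mimeType: string } };

function blockToPartSketch(b: Block): Part | null {
  if (b.type === 'text') return { type: 'text', text: b.text ?? '' };
  if (b.type === 'image' || b.type === 'audio') {
    const kind: 'image' | 'audio' = b.type === 'image' ? 'image' : 'audio';
    return { type: kind, source: { kind: 'base64', data: b.data ?? '', mimeType: b.mimeType ?? '' } };
  }
  if (b.type === 'resource') {
    if (b.resource?.text !== undefined) return { type: 'text', text: b.resource.text };
    if (b.resource?.uri) return { type: 'text', text: `[resource ${b.resource.uri}]` };
    return null;
  }
  if (b.type === 'resource_link') return { type: 'text', text: `[resource ${b.uri ?? ''}]` };
  return null; // unknown block types are dropped
}

function contentToResultSketch(res: { content: Block[]; isError?: boolean }): string | Part[] {
  const parts = res.content.map(blockToPartSketch).filter((p): p is Part => p !== null);
  // Any media block switches the result to the structured form.
  if (parts.some((p) => p.type !== 'text')) return parts;
  const text = parts.map((p) => (p.type === 'text' ? p.text : '')).join('\n');
  return res.isError ? `Tool error: ${text}` : text;
}
```

Returning a plain string in the text-only case keeps the common path cheap for providers that accept string tool results, while the structured form is used only when media is present.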
mcpPromptToMessages(result)
Converts McpGetPromptResult.messages to Message[] for direct injection into a request.
Sampling (src/plugins/mcp/sampling.ts)
When the server sends sampling/createMessage, it asks the client to run an LLM call on
its behalf. samplingHandler(config) builds an McpSamplingHandler:
- If config is a function: use it directly.
- If config is { model, provider?, engine? }: auto-wire to complete() from src/helpers/one-shot.ts. Maps McpSamplingMessage[] → Message[] and maps finish reason ('stop' → 'endTurn', 'length' → 'maxTokens').
Enabled by passing sampling: config to connectMcp. The sampling capability is
declared in the initialize call when a handler is configured.
OAuth 2.1 + PKCE (src/plugins/mcp/oauth.ts)
HTTP-only. Zero-dependency implementation of:
- Metadata discovery at /.well-known/oauth-authorization-server or /.well-known/openid-configuration.
- Dynamic Client Registration (RFC 7591) via registration_endpoint.
- Authorization code flow with PKCE (S256 code challenge via crypto.subtle.digest).
- CSRF state token generation and constant-time comparison (safeEqual).
- Token refresh via refresh_token grant.
- 60-second expiry buffer in isExpired.
All HTTP goes through engine.fetch on queue 'mcp/oauth'.
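Two of the items above, the constant-time state comparison and the 60-second expiry buffer, are small enough to sketch. The function names carry a Sketch suffix because they are illustrations, not the library's safeEqual or isExpired; the epoch-millisecond expiresAtMs parameter is an assumed token shape.

```typescript
// Illustrative sketches; not the actual oauth.ts implementations.

// Compares two strings without returning early on the first mismatch,
// so timing does not reveal how many leading characters matched.
function safeEqualSketch(a: string, b: string): boolean {
  const enc = new TextEncoder();
  const x = enc.encode(a);
  const y = enc.encode(b);
  let diff = x.length ^ y.length; // a length mismatch makes the result false
  const n = Math.max(x.length, y.length);
  for (let i = 0; i < n; i++) {
    diff |= (x[i] ?? 0) ^ (y[i] ?? 0);
  }
  return diff === 0;
}

const EXPIRY_BUFFER_MS = 60_000;

// Treats a token as expired 60 seconds before its nominal expiry,
// so a request started now does not arrive with a token that has just lapsed.
function isExpiredSketch(expiresAtMs: number, nowMs: number = Date.now()): boolean {
  return nowMs >= expiresAtMs - EXPIRY_BUFFER_MS;
}
```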

    class McpOAuth {
      async authorize(): Promise<'authorized' | 'redirect'>
      async authHeader(): Promise<Record<string, string>>
      async reauthorize(): Promise<boolean>   // 401 handler: refresh or redirect
      async finish(code: string, returnedState: string): Promise<void>
    }

McpAuthProvider is the consumer-implemented interface for token/verifier/state storage
and user redirect. The library handles all non-interactive machinery.
McpUnauthorizedError is thrown by connectMcp when authorize() returns 'redirect'
(the user must complete the browser flow). Catch it, handle the redirect, then call
finishMcpAuth and reconnect.
SSRF guard (src/plugins/mcp/url-guard.ts)
assertSafeAuthUrl(url, issuerUrl, opts) validates any server-controlled URL before it is
fetched. Three checks:
- Scheme: must be https:. Allow http: only when opts.allowInsecureHttp === true.
- Host: must not be loopback, link-local, or private. Blocked by default; allow via opts.allowLoopback.
- Origin: hostname must match issuerUrl’s hostname or be in opts.allowedHosts. When allowLoopback is true and the host is a private address, the origin check is skipped (explicit local-dev mode).
parseCanonicalIpv4 (exported) handles 1-, 2-, 3-, and 4-part IPv4 literals in decimal,
octal (0-prefix), and hex (0x-prefix) — covering all forms inet_aton accepts. Private
range checks in isPrivateIpv4 cover RFC-1918, loopback, link-local, CGNAT, and reserved
blocks. IPv6 private prefixes: fc/fd (ULA), fe80 (link-local), ff (multicast),
and IPv4-mapped ranges.
Throws McpSsrfError with url and reason properties.
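To show why the short and non-decimal IPv4 forms matter for an SSRF guard, here is a rough sketch of inet_aton-style parsing. It is not the exported parseCanonicalIpv4: the names parseIpv4Sketch and isPrivateIpv4Sketch, the tuple return type, and the exact set of reserved blocks checked are assumptions for the example.

```typescript
// Illustrative sketch of inet_aton-style parsing; not the library's parseCanonicalIpv4.
function parsePart(s: string): number | null {
  let value: number;
  if (/^0[xX][0-9a-fA-F]+$/.test(s)) value = parseInt(s.slice(2), 16); // hex
  else if (/^0[0-7]*$/.test(s)) value = parseInt(s, 8);                 // octal (and plain "0")
  else if (/^[1-9][0-9]*$/.test(s)) value = parseInt(s, 10);            // decimal
  else return null;
  return Number.isSafeInteger(value) ? value : null;
}

type Ipv4 = [number, number, number, number];

function parseIpv4Sketch(host: string): Ipv4 | null {
  const parts = host.split('.');
  if (parts.length > 4) return null;
  const nums: number[] = [];
  for (const p of parts) {
    const n = parsePart(p);
    if (n === null) return null;
    nums.push(n);
  }
  const last = nums[nums.length - 1];
  const lead = nums.slice(0, -1);
  if (lead.some((n) => n > 255)) return null;
  // The final part fills all remaining bytes: 4, 3, 2, or 1 of them.
  const tailBytes = 4 - lead.length;
  if (last >= 2 ** (8 * tailBytes)) return null;
  let value = last;
  lead.forEach((n, i) => {
    value += n * 2 ** (8 * (3 - i));
  });
  return [
    Math.floor(value / 2 ** 24) % 256,
    Math.floor(value / 2 ** 16) % 256,
    Math.floor(value / 2 ** 8) % 256,
    value % 256,
  ];
}

// Assumed block list: RFC 1918, loopback, link-local, CGNAT, 0.0.0.0/8, and 224.0.0.0 upward.
function isPrivateIpv4Sketch([a, b]: Ipv4): boolean {
  return (
    a === 0 || a === 10 || a === 127 ||
    (a === 172 && b >= 16 && b <= 31) ||
    (a === 192 && b === 168) ||
    (a === 169 && b === 254) ||
    (a === 100 && b >= 64 && b <= 127) ||
    a >= 224
  );
}
```

Under this sketch, "127.1", "0x7f000001", "017700000001", and "2130706433" all resolve to 127.0.0.1, which is why a guard that only pattern-matches dotted-quad decimal strings can be bypassed.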
Public API (src/helpers/mcp.ts)
connectMcp(config, opts?): Promise<McpConnection>

    type McpServerConfig = McpHttpConfig | McpStdioConfig;

    interface McpHttpConfig {
      url: string;
      headers?: Record<string, string>;
      name?: string;
    }
    interface McpStdioConfig {
      command: string;
      args?: string[];
      env?: Record<string, string>;
      cwd?: string;
      name?: string;
    }

    interface McpConnection {
      readonly serverInfo: McpInitializeResult | null;
      readonly tools: AgentTool[];   // stable array, mutated in place on refresh
      listTools(): Promise<AgentTool[]>;
      readonly client: McpClient;
      close(): Promise<void>;
    }

Transport selection: isHttpConfig(config) checks for typeof url === 'string'. WebSocket
is detected by /^wss?:/i.test(config.url). Engine is required for HTTP/WS transports
and is sourced from opts.engine ?? coreRegistry.get().
The tools array is stable — the same array reference is mutated in place on tool
refresh. An AgentLoop holding a reference to connection.tools automatically sees tool
list changes after autoRefreshTools triggers a refresh.
Namespace: derived from config.name, URL hostname first label, or command basename
(with extension stripped). Special chars are replaced by _ via sanitizeNs.
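The namespace rule can be sketched as below. The precedence (name, then URL, then command) follows the sentence above; the exact character class replaced by sanitizeNs is not given in this document, so the "anything outside letters, digits, and underscore" rule here is an assumption, as are the function names.

```typescript
// Illustrative sketch; the real sanitizeNs character set may differ.
type NsConfig = { name?: string; url?: string; command?: string };

function sanitizeNsSketch(s: string): string {
  return s.replace(/[^A-Za-z0-9_]/g, '_'); // assumption: everything else becomes "_"
}

function deriveNamespaceSketch(cfg: NsConfig): string {
  if (cfg.name) return sanitizeNsSketch(cfg.name);
  if (cfg.url) {
    // First hostname label, e.g. "api" for api.example.com
    return sanitizeNsSketch(new URL(cfg.url).hostname.split('.')[0]);
  }
  // Command basename with its extension stripped; handles both / and \ separators.
  const base = (cfg.command ?? '').split(/[\\/]/).pop() ?? '';
  return sanitizeNsSketch(base.replace(/\.[^.]+$/, ''));
}

// The model-facing tool name joins namespace and tool with a double underscore.
function namespacedToolName(ns: string, tool: string): string {
  return `${ns}__${tool}`;
}
```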
OAuth flow (HTTP, non-WS): McpOAuth.authorize() is called before client.connect().
On 'redirect' result, throws McpUnauthorizedError. Engine hooks receive onMcpError
with phase: 'connect'.
Capability declaration: sampling/elicitation/roots capabilities are added to
the initialize request only when the corresponding options are present.
Telemetry: emits onMcpConnect { server, transport, serverName, serverVersion, toolCount } on success; onMcpError { server, phase: 'connect', error } on failure.
mcpToolset(configs, opts?)
Calls connectMcp for each config in parallel, returns { tools, connections, close() }.
tools is a flat array of all servers’ tools (namespaces prevent collisions).
finishMcpAuth(serverUrl, code, state, opts)
Exchanges an OAuth authorization code for tokens. Must be called after catching
McpUnauthorizedError. Validates state against the persisted value (constant-time
comparison). After this call succeeds, call connectMcp again.
Data flow: tool call through the stack

    AgentLoop calls tool.execute(args, toolCtx)          [toolCtx.trace set by loop]
      -> mcpToolToAgentTool.execute(args, ctx)
        -> McpClient.callTool(tool.name, args, ctx.trace)
          -> transport.request('tools/call', { name, arguments: args })
               [StdioTransport]: write NDJSON to child stdin; await pending[id]
               [HttpTransport]:  engine.fetch POST; pickResponse from JSON/SSE body
          -> emitSync('onMcpToolCall', { server, tool, latencyMs, isError, trace })
          <- McpCallResult
        -> mcpContentToResult(res)                       [map blocks -> string | ContentPart[]]
      <- tool result string or ContentPart[] returned to AgentLoop
      -> AgentLoop appends tool_result message, continues loop

Extension points
Custom transport: implement McpTransport (extend BaseJsonRpcTransport for free
pending-map, routing, and timeout logic). Pass to new McpClient(transport, opts).
Custom tool adapter: call mcpToolToAgentTool directly with a custom McpClient and
McpToolDef. Pass validateOutput: true to enforce outputSchema validation.
Sampling: implement McpSamplingHandler or pass { model: 'provider/model' } to
auto-wire. Handler receives the full McpCreateMessageParams including modelPreferences.
OAuth: implement McpAuthProvider for custom token/verifier storage and redirect
mechanism.
Gotchas and edge cases
- StdioTransport is browser-guarded via the lazy nodeChildProcess() loader. Calling connectMcp({ command: '...' }) in a browser throws at start() time with a clear error. Check isHttpConfig(config) before calling in cross-env code.
- HttpTransport request IDs (nextHttpId) are separate from BaseJsonRpcTransport’s nextId. Only nextHttpId is used for HTTP requests; nextId would be used by server-initiated response correlation in sendMessage. Do not conflate the two counters.
- HttpTransport.listen() runs in the background and is not awaited by McpClient.connect(). Errors in the event loop are swallowed (the loop reconnects). To detect persistent GET stream failures, subscribe to onMcpError hooks.
- The tools array returned by connectMcp is mutated in place on listTools() and on autoRefreshTools notification. Code that copies the array's contents at connect time (for example with [...connection.tools] or connection.tools.slice()) will NOT see updates. Always hold the connection.tools reference itself.
- McpClient.connect() does not retry. A transport-level failure during initialize throws and the connection is left in an unusable state. Call close() and reconnect.
- McpOAuth.finish() validates the returned state with safeEqual (constant-time). If the McpAuthProvider does not persist the state between redirect and callback (e.g. on a page reload), provider.state() returns undefined and finish throws.
- assertSafeAuthUrl uses hostname-exact comparison for origin checks (not same-site/public-suffix matching). An MCP server at api.example.com whose auth server is at auth.example.com requires allowedHosts: ['auth.example.com'] in security options.
- callTool is called with the un-namespaced tool name. The namespace prefix is only in the AgentTool.definition.name seen by the model. If you call client.callTool directly (bypassing the adapter), pass the raw server tool name, not the namespaced one.
- keepAliveMs timer is unref()d when the runtime supports it. On runtimes that do not expose unref (e.g. non-Bun/Node environments), the timer fires normally and is silently ignored on error.
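The stable-array behavior is easy to get wrong, so here is a small self-contained demonstration. It uses a stand-in Tool type and a hypothetical refreshInPlace helper rather than the real connection object; the point is only the difference between holding the array reference and holding a copy of its contents.

```typescript
// Illustrative demonstration with stand-in types; not the library's refresh code.
type Tool = { name: string };

// Replaces every element while keeping the same array object.
function refreshInPlace(target: Tool[], fresh: Tool[]): void {
  target.splice(0, target.length, ...fresh);
}

const tools: Tool[] = [{ name: 'fs__read' }];

const heldReference = tools; // same array object: sees later refreshes
const snapshot = [...tools]; // shallow copy: frozen at the moment of copying

refreshInPlace(tools, [{ name: 'fs__read' }, { name: 'fs__write' }]);

console.log(heldReference.length); // 2
console.log(snapshot.length);      // 1
```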