3349 lines
116 KiB
TypeScript
3349 lines
116 KiB
TypeScript
import { feature } from 'bun:bundle'
|
|
import type {
|
|
Base64ImageSource,
|
|
ContentBlockParam,
|
|
MessageParam,
|
|
} from '@anthropic-ai/sdk/resources/index.mjs'
|
|
import { Client } from '@modelcontextprotocol/sdk/client/index.js'
|
|
import {
|
|
SSEClientTransport,
|
|
type SSEClientTransportOptions,
|
|
} from '@modelcontextprotocol/sdk/client/sse.js'
|
|
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js'
|
|
import {
|
|
StreamableHTTPClientTransport,
|
|
type StreamableHTTPClientTransportOptions,
|
|
} from '@modelcontextprotocol/sdk/client/streamableHttp.js'
|
|
import {
|
|
createFetchWithInit,
|
|
type FetchLike,
|
|
type Transport,
|
|
} from '@modelcontextprotocol/sdk/shared/transport.js'
|
|
import {
|
|
CallToolResultSchema,
|
|
ElicitRequestSchema,
|
|
type ElicitRequestURLParams,
|
|
type ElicitResult,
|
|
ErrorCode,
|
|
type JSONRPCMessage,
|
|
type ListPromptsResult,
|
|
ListPromptsResultSchema,
|
|
ListResourcesResultSchema,
|
|
ListRootsRequestSchema,
|
|
type ListToolsResult,
|
|
ListToolsResultSchema,
|
|
McpError,
|
|
type PromptMessage,
|
|
type ResourceLink,
|
|
} from '@modelcontextprotocol/sdk/types.js'
|
|
import mapValues from 'lodash-es/mapValues.js'
|
|
import memoize from 'lodash-es/memoize.js'
|
|
import zipObject from 'lodash-es/zipObject.js'
|
|
import pMap from 'p-map'
|
|
import { getOriginalCwd, getSessionId } from '../../bootstrap/state.js'
|
|
import type { Command } from '../../commands.js'
|
|
import { getOauthConfig } from '../../constants/oauth.js'
|
|
import { PRODUCT_URL } from '../../constants/product.js'
|
|
import type { AppState } from '../../state/AppState.js'
|
|
import {
|
|
type Tool,
|
|
type ToolCallProgress,
|
|
toolMatchesName,
|
|
} from '../../Tool.js'
|
|
import { ListMcpResourcesTool } from '../../tools/ListMcpResourcesTool/ListMcpResourcesTool.js'
|
|
import { type MCPProgress, MCPTool } from '../../tools/MCPTool/MCPTool.js'
|
|
import { createMcpAuthTool } from '../../tools/McpAuthTool/McpAuthTool.js'
|
|
import { ReadMcpResourceTool } from '../../tools/ReadMcpResourceTool/ReadMcpResourceTool.js'
|
|
import { createAbortController } from '../../utils/abortController.js'
|
|
import { count } from '../../utils/array.js'
|
|
import {
|
|
checkAndRefreshOAuthTokenIfNeeded,
|
|
getClaudeAIOAuthTokens,
|
|
handleOAuth401Error,
|
|
} from '../../utils/auth.js'
|
|
import { registerCleanup } from '../../utils/cleanupRegistry.js'
|
|
import { detectCodeIndexingFromMcpServerName } from '../../utils/codeIndexing.js'
|
|
import { logForDebugging } from '../../utils/debug.js'
|
|
import { isEnvDefinedFalsy, isEnvTruthy } from '../../utils/envUtils.js'
|
|
import {
|
|
errorMessage,
|
|
TelemetrySafeError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
|
|
} from '../../utils/errors.js'
|
|
import { getMCPUserAgent } from '../../utils/http.js'
|
|
import { maybeNotifyIDEConnected } from '../../utils/ide.js'
|
|
import { maybeResizeAndDownsampleImageBuffer } from '../../utils/imageResizer.js'
|
|
import { logMCPDebug, logMCPError } from '../../utils/log.js'
|
|
import {
|
|
getBinaryBlobSavedMessage,
|
|
getFormatDescription,
|
|
getLargeOutputInstructions,
|
|
persistBinaryContent,
|
|
} from '../../utils/mcpOutputStorage.js'
|
|
import {
|
|
getContentSizeEstimate,
|
|
type MCPToolResult,
|
|
mcpContentNeedsTruncation,
|
|
truncateMcpContentIfNeeded,
|
|
} from '../../utils/mcpValidation.js'
|
|
import { WebSocketTransport } from '../../utils/mcpWebSocketTransport.js'
|
|
import { memoizeWithLRU } from '../../utils/memoize.js'
|
|
import { getWebSocketTLSOptions } from '../../utils/mtls.js'
|
|
import {
|
|
getProxyFetchOptions,
|
|
getWebSocketProxyAgent,
|
|
getWebSocketProxyUrl,
|
|
} from '../../utils/proxy.js'
|
|
import { recursivelySanitizeUnicode } from '../../utils/sanitization.js'
|
|
import { getSessionIngressAuthToken } from '../../utils/sessionIngressAuth.js'
|
|
import { subprocessEnv } from '../../utils/subprocessEnv.js'
|
|
import {
|
|
isPersistError,
|
|
persistToolResult,
|
|
} from '../../utils/toolResultStorage.js'
|
|
import {
|
|
type AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
|
|
logEvent,
|
|
} from '../analytics/index.js'
|
|
import {
|
|
type ElicitationWaitingState,
|
|
runElicitationHooks,
|
|
runElicitationResultHooks,
|
|
} from './elicitationHandler.js'
|
|
import { buildMcpToolName } from './mcpStringUtils.js'
|
|
import { normalizeNameForMCP } from './normalization.js'
|
|
import { getLoggingSafeMcpBaseUrl } from './utils.js'
|
|
|
|
/* eslint-disable @typescript-eslint/no-require-imports */
|
|
const fetchMcpSkillsForClient = feature('MCP_SKILLS')
|
|
? (
|
|
require('../../skills/mcpSkills.js') as typeof import('../../skills/mcpSkills.js')
|
|
).fetchMcpSkillsForClient
|
|
: null
|
|
|
|
import { UnauthorizedError } from '@modelcontextprotocol/sdk/client/auth.js'
|
|
import type { AssistantMessage } from 'src/types/message.js'
|
|
/* eslint-enable @typescript-eslint/no-require-imports */
|
|
import { classifyMcpToolForCollapse } from '../../tools/MCPTool/classifyForCollapse.js'
|
|
import { clearKeychainCache } from '../../utils/secureStorage/macOsKeychainHelpers.js'
|
|
import { sleep } from '../../utils/sleep.js'
|
|
import {
|
|
ClaudeAuthProvider,
|
|
hasMcpDiscoveryButNoToken,
|
|
wrapFetchWithStepUpDetection,
|
|
} from './auth.js'
|
|
import { markClaudeAiMcpConnected } from './claudeai.js'
|
|
import { getAllMcpConfigs, isMcpServerDisabled } from './config.js'
|
|
import { getMcpServerHeaders } from './headersHelper.js'
|
|
import { SdkControlClientTransport } from './SdkControlTransport.js'
|
|
import type {
|
|
ConnectedMCPServer,
|
|
MCPServerConnection,
|
|
McpSdkServerConfig,
|
|
ScopedMcpServerConfig,
|
|
ServerResource,
|
|
} from './types.js'
|
|
|
|
/**
|
|
* Custom error class to indicate that an MCP tool call failed due to
|
|
* authentication issues (e.g., expired OAuth token returning 401).
|
|
* This error should be caught at the tool execution layer to update
|
|
* the client's status to 'needs-auth'.
|
|
*/
|
|
export class McpAuthError extends Error {
|
|
serverName: string
|
|
constructor(serverName: string, message: string) {
|
|
super(message)
|
|
this.name = 'McpAuthError'
|
|
this.serverName = serverName
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Thrown when an MCP session has expired and the connection cache has been cleared.
|
|
* The caller should get a fresh client via ensureConnectedClient and retry.
|
|
*/
|
|
class McpSessionExpiredError extends Error {
|
|
constructor(serverName: string) {
|
|
super(`MCP server "${serverName}" session expired`)
|
|
this.name = 'McpSessionExpiredError'
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Thrown when an MCP tool returns `isError: true`. Carries the result's `_meta`
|
|
* so SDK consumers can still receive it — per the MCP spec, `_meta` is on the
|
|
* base Result type and is valid on error results.
|
|
*/
|
|
export class McpToolCallError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS extends TelemetrySafeError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS {
|
|
constructor(
|
|
message: string,
|
|
telemetryMessage: string,
|
|
readonly mcpMeta?: { _meta?: Record<string, unknown> },
|
|
) {
|
|
super(message, telemetryMessage)
|
|
this.name = 'McpToolCallError'
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Detects whether an error is an MCP "Session not found" error (HTTP 404 + JSON-RPC code -32001).
|
|
* Per the MCP spec, servers return 404 when a session ID is no longer valid.
|
|
* We check both signals to avoid false positives from generic 404s (wrong URL, server gone, etc.).
|
|
*/
|
|
export function isMcpSessionExpiredError(error: Error): boolean {
|
|
const httpStatus =
|
|
'code' in error ? (error as Error & { code?: number }).code : undefined
|
|
if (httpStatus !== 404) {
|
|
return false
|
|
}
|
|
// The SDK embeds the response body text in the error message.
|
|
// MCP servers return: {"error":{"code":-32001,"message":"Session not found"},...}
|
|
// Check for the JSON-RPC error code to distinguish from generic web server 404s.
|
|
return (
|
|
error.message.includes('"code":-32001') ||
|
|
error.message.includes('"code": -32001')
|
|
)
|
|
}
|
|
|
|
/**
|
|
* Default timeout for MCP tool calls (effectively infinite - ~27.8 hours).
|
|
*/
|
|
const DEFAULT_MCP_TOOL_TIMEOUT_MS = 100_000_000
|
|
|
|
/**
|
|
* Cap on MCP tool descriptions and server instructions sent to the model.
|
|
* OpenAPI-generated MCP servers have been observed dumping 15-60KB of endpoint
|
|
* docs into tool.description; this caps the p95 tail without losing the intent.
|
|
*/
|
|
const MAX_MCP_DESCRIPTION_LENGTH = 2048
|
|
|
|
/**
|
|
* Gets the timeout for MCP tool calls in milliseconds.
|
|
* Uses MCP_TOOL_TIMEOUT environment variable if set, otherwise defaults to ~27.8 hours.
|
|
*/
|
|
function getMcpToolTimeoutMs(): number {
|
|
return (
|
|
parseInt(process.env.MCP_TOOL_TIMEOUT || '', 10) ||
|
|
DEFAULT_MCP_TOOL_TIMEOUT_MS
|
|
)
|
|
}
|
|
|
|
import { isClaudeInChromeMCPServer } from '../../utils/claudeInChrome/common.js'
|
|
|
|
// Lazy: toolRendering.tsx pulls React/ink; only needed when Claude-in-Chrome MCP server is connected
|
|
/* eslint-disable @typescript-eslint/no-require-imports */
|
|
const claudeInChromeToolRendering =
|
|
(): typeof import('../../utils/claudeInChrome/toolRendering.js') =>
|
|
require('../../utils/claudeInChrome/toolRendering.js')
|
|
// Lazy: wrapper.tsx → hostAdapter.ts → executor.ts pulls both native modules
|
|
// (@ant/computer-use-input + @ant/computer-use-swift). Runtime-gated by
|
|
// GrowthBook tengu_malort_pedway (see gates.ts).
|
|
const computerUseWrapper = feature('CHICAGO_MCP')
|
|
? (): typeof import('../../utils/computerUse/wrapper.js') =>
|
|
require('../../utils/computerUse/wrapper.js')
|
|
: undefined
|
|
const isComputerUseMCPServer = feature('CHICAGO_MCP')
|
|
? (
|
|
require('../../utils/computerUse/common.js') as typeof import('../../utils/computerUse/common.js')
|
|
).isComputerUseMCPServer
|
|
: undefined
|
|
|
|
import { mkdir, readFile, unlink, writeFile } from 'fs/promises'
|
|
import { dirname, join } from 'path'
|
|
import { getClaudeConfigHomeDir } from '../../utils/envUtils.js'
|
|
/* eslint-enable @typescript-eslint/no-require-imports */
|
|
import { jsonParse, jsonStringify } from '../../utils/slowOperations.js'
|
|
|
|
const MCP_AUTH_CACHE_TTL_MS = 15 * 60 * 1000 // 15 min
|
|
|
|
type McpAuthCacheData = Record<string, { timestamp: number }>
|
|
|
|
function getMcpAuthCachePath(): string {
|
|
return join(getClaudeConfigHomeDir(), 'mcp-needs-auth-cache.json')
|
|
}
|
|
|
|
// Memoized so N concurrent isMcpAuthCached() calls during batched connection
|
|
// share a single file read instead of N reads of the same file. Invalidated
|
|
// on write (setMcpAuthCacheEntry) and clear (clearMcpAuthCache). Not using
|
|
// lodash memoize because we need to null out the cache, not delete by key.
|
|
let authCachePromise: Promise<McpAuthCacheData> | null = null
|
|
|
|
function getMcpAuthCache(): Promise<McpAuthCacheData> {
|
|
if (!authCachePromise) {
|
|
authCachePromise = readFile(getMcpAuthCachePath(), 'utf-8')
|
|
.then(data => jsonParse(data) as McpAuthCacheData)
|
|
.catch(() => ({}))
|
|
}
|
|
return authCachePromise
|
|
}
|
|
|
|
async function isMcpAuthCached(serverId: string): Promise<boolean> {
|
|
const cache = await getMcpAuthCache()
|
|
const entry = cache[serverId]
|
|
if (!entry) {
|
|
return false
|
|
}
|
|
return Date.now() - entry.timestamp < MCP_AUTH_CACHE_TTL_MS
|
|
}
|
|
|
|
// Serialize cache writes through a promise chain to prevent concurrent
|
|
// read-modify-write races when multiple servers return 401 in the same batch
|
|
let writeChain = Promise.resolve()
|
|
|
|
function setMcpAuthCacheEntry(serverId: string): void {
|
|
writeChain = writeChain
|
|
.then(async () => {
|
|
const cache = await getMcpAuthCache()
|
|
cache[serverId] = { timestamp: Date.now() }
|
|
const cachePath = getMcpAuthCachePath()
|
|
await mkdir(dirname(cachePath), { recursive: true })
|
|
await writeFile(cachePath, jsonStringify(cache))
|
|
// Invalidate the read cache so subsequent reads see the new entry.
|
|
// Safe because writeChain serializes writes: the next write's
|
|
// getMcpAuthCache() call will re-read the file with this entry present.
|
|
authCachePromise = null
|
|
})
|
|
.catch(() => {
|
|
// Best-effort cache write
|
|
})
|
|
}
|
|
|
|
export function clearMcpAuthCache(): void {
|
|
authCachePromise = null
|
|
void unlink(getMcpAuthCachePath()).catch(() => {
|
|
// Cache file may not exist
|
|
})
|
|
}
|
|
|
|
/**
|
|
* Spread-ready analytics field for the server's base URL. Calls
|
|
* getLoggingSafeMcpBaseUrl once (not twice like the inline ternary it replaces).
|
|
* Typed as AnalyticsMetadata since the URL is query-stripped and safe to log.
|
|
*/
|
|
function mcpBaseUrlAnalytics(serverRef: ScopedMcpServerConfig): {
|
|
mcpServerBaseUrl?: AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS
|
|
} {
|
|
const url = getLoggingSafeMcpBaseUrl(serverRef)
|
|
return url
|
|
? {
|
|
mcpServerBaseUrl:
|
|
url as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
|
|
}
|
|
: {}
|
|
}
|
|
|
|
/**
|
|
* Shared handler for sse/http/claudeai-proxy auth failures during connect:
|
|
* emits tengu_mcp_server_needs_auth, caches the needs-auth entry, and returns
|
|
* the needs-auth connection result.
|
|
*/
|
|
function handleRemoteAuthFailure(
|
|
name: string,
|
|
serverRef: ScopedMcpServerConfig,
|
|
transportType: 'sse' | 'http' | 'claudeai-proxy',
|
|
): MCPServerConnection {
|
|
logEvent('tengu_mcp_server_needs_auth', {
|
|
transportType:
|
|
transportType as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
|
|
...mcpBaseUrlAnalytics(serverRef),
|
|
})
|
|
const label: Record<typeof transportType, string> = {
|
|
sse: 'SSE',
|
|
http: 'HTTP',
|
|
'claudeai-proxy': 'claude.ai proxy',
|
|
}
|
|
logMCPDebug(
|
|
name,
|
|
`Authentication required for ${label[transportType]} server`,
|
|
)
|
|
setMcpAuthCacheEntry(name)
|
|
return { name, type: 'needs-auth', config: serverRef }
|
|
}
|
|
|
|
/**
|
|
* Fetch wrapper for claude.ai proxy connections. Attaches the OAuth bearer
|
|
* token and retries once on 401 via handleOAuth401Error (force-refresh).
|
|
*
|
|
* The Anthropic API path has this retry (withRetry.ts, grove.ts) to handle
|
|
* memoize-cache staleness and clock drift. Without the same here, a single
|
|
* stale token mass-401s every claude.ai connector and sticks them all in the
|
|
* 15-min needs-auth cache.
|
|
*/
|
|
export function createClaudeAiProxyFetch(innerFetch: FetchLike): FetchLike {
|
|
return async (url, init) => {
|
|
const doRequest = async () => {
|
|
await checkAndRefreshOAuthTokenIfNeeded()
|
|
const currentTokens = getClaudeAIOAuthTokens()
|
|
if (!currentTokens) {
|
|
throw new Error('No claude.ai OAuth token available')
|
|
}
|
|
// eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
|
|
const headers = new Headers(init?.headers)
|
|
headers.set('Authorization', `Bearer ${currentTokens.accessToken}`)
|
|
const response = await innerFetch(url, { ...init, headers })
|
|
// Return the exact token that was sent. Reading getClaudeAIOAuthTokens()
|
|
// again after the request is wrong under concurrent 401s: another
|
|
// connector's handleOAuth401Error clears the memoize cache, so we'd read
|
|
// the NEW token from keychain, pass it to handleOAuth401Error, which
|
|
// finds same-as-keychain → returns false → skips retry. Same pattern as
|
|
// bridgeApi.ts withOAuthRetry (token passed as fn param).
|
|
return { response, sentToken: currentTokens.accessToken }
|
|
}
|
|
|
|
const { response, sentToken } = await doRequest()
|
|
if (response.status !== 401) {
|
|
return response
|
|
}
|
|
// handleOAuth401Error returns true only if the token actually changed
|
|
// (keychain had a newer one, or force-refresh succeeded). Gate retry on
|
|
// that — otherwise we double round-trip time for every connector whose
|
|
// downstream service genuinely needs auth (the common case: 30+ servers
|
|
// with "MCP server requires authentication but no OAuth token configured").
|
|
const tokenChanged = await handleOAuth401Error(sentToken).catch(() => false)
|
|
logEvent('tengu_mcp_claudeai_proxy_401', {
|
|
tokenChanged:
|
|
tokenChanged as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
|
|
})
|
|
if (!tokenChanged) {
|
|
// ELOCKED contention: another connector may have won the lockfile and refreshed — check if token changed underneath us
|
|
const now = getClaudeAIOAuthTokens()?.accessToken
|
|
if (!now || now === sentToken) {
|
|
return response
|
|
}
|
|
}
|
|
try {
|
|
return (await doRequest()).response
|
|
} catch {
|
|
// Retry itself failed (network error). Return the original 401 so the
|
|
// outer handler can classify it.
|
|
return response
|
|
}
|
|
}
|
|
}
|
|
|
|
// Minimal interface for WebSocket instances passed to mcpWebSocketTransport
|
|
type WsClientLike = {
|
|
readonly readyState: number
|
|
close(): void
|
|
send(data: string): void
|
|
}
|
|
|
|
/**
|
|
* Create a ws.WebSocket client with the MCP protocol.
|
|
* Bun's ws shim types lack the 3-arg constructor (url, protocols, options)
|
|
* that the real ws package supports, so we cast the constructor here.
|
|
*/
|
|
async function createNodeWsClient(
|
|
url: string,
|
|
options: Record<string, unknown>,
|
|
): Promise<WsClientLike> {
|
|
const wsModule = await import('ws')
|
|
const WS = wsModule.default as unknown as new (
|
|
url: string,
|
|
protocols: string[],
|
|
options: Record<string, unknown>,
|
|
) => WsClientLike
|
|
return new WS(url, ['mcp'], options)
|
|
}
|
|
|
|
const IMAGE_MIME_TYPES = new Set([
|
|
'image/jpeg',
|
|
'image/png',
|
|
'image/gif',
|
|
'image/webp',
|
|
])
|
|
|
|
function getConnectionTimeoutMs(): number {
|
|
return parseInt(process.env.MCP_TIMEOUT || '', 10) || 30000
|
|
}
|
|
|
|
/**
|
|
* Default timeout for individual MCP requests (auth, tool calls, etc.)
|
|
*/
|
|
const MCP_REQUEST_TIMEOUT_MS = 60000
|
|
|
|
/**
|
|
* MCP Streamable HTTP spec requires clients to advertise acceptance of both
|
|
* JSON and SSE on every POST. Servers that enforce this strictly reject
|
|
* requests without it (HTTP 406).
|
|
* https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#sending-messages-to-the-server
|
|
*/
|
|
const MCP_STREAMABLE_HTTP_ACCEPT = 'application/json, text/event-stream'
|
|
|
|
/**
|
|
* Wraps a fetch function to apply a fresh timeout signal to each request.
|
|
* This avoids the bug where a single AbortSignal.timeout() created at connection
|
|
* time becomes stale after 60 seconds, causing all subsequent requests to fail
|
|
* immediately with "The operation timed out." Uses a 60-second timeout.
|
|
*
|
|
* Also ensures the Accept header required by the MCP Streamable HTTP spec is
|
|
* present on POSTs. The MCP SDK sets this inside StreamableHTTPClientTransport.send(),
|
|
* but it is attached to a Headers instance that passes through an object spread here,
|
|
* and some runtimes/agents have been observed dropping it before it reaches the wire.
|
|
* See https://github.com/anthropics/claude-agent-sdk-typescript/issues/202.
|
|
* Normalizing here (the last wrapper before fetch()) guarantees it is sent.
|
|
*
|
|
* GET requests are excluded from the timeout since, for MCP transports, they are
|
|
* long-lived SSE streams meant to stay open indefinitely. (Auth-related GETs use
|
|
* a separate fetch wrapper with its own timeout in auth.ts.)
|
|
*
|
|
* @param baseFetch - The fetch function to wrap
|
|
*/
|
|
export function wrapFetchWithTimeout(baseFetch: FetchLike): FetchLike {
|
|
return async (url: string | URL, init?: RequestInit) => {
|
|
const method = (init?.method ?? 'GET').toUpperCase()
|
|
|
|
// Skip timeout for GET requests - in MCP transports, these are long-lived SSE streams.
|
|
// (OAuth discovery GETs in auth.ts use a separate createAuthFetch() with its own timeout.)
|
|
if (method === 'GET') {
|
|
return baseFetch(url, init)
|
|
}
|
|
|
|
// Normalize headers and guarantee the Streamable-HTTP Accept value. new Headers()
|
|
// accepts HeadersInit | undefined and copies from plain objects, tuple arrays,
|
|
// and existing Headers instances — so whatever shape the SDK handed us, the
|
|
// Accept value survives the spread below as an own property of a concrete object.
|
|
// eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
|
|
const headers = new Headers(init?.headers)
|
|
if (!headers.has('accept')) {
|
|
headers.set('accept', MCP_STREAMABLE_HTTP_ACCEPT)
|
|
}
|
|
|
|
// Use setTimeout instead of AbortSignal.timeout() so we can clearTimeout on
|
|
// completion. AbortSignal.timeout's internal timer is only released when the
|
|
// signal is GC'd, which in Bun is lazy — ~2.4KB of native memory per request
|
|
// lingers for the full 60s even when the request completes in milliseconds.
|
|
const controller = new AbortController()
|
|
const timer = setTimeout(
|
|
c =>
|
|
c.abort(new DOMException('The operation timed out.', 'TimeoutError')),
|
|
MCP_REQUEST_TIMEOUT_MS,
|
|
controller,
|
|
)
|
|
timer.unref?.()
|
|
|
|
const parentSignal = init?.signal
|
|
const abort = () => controller.abort(parentSignal?.reason)
|
|
parentSignal?.addEventListener('abort', abort)
|
|
if (parentSignal?.aborted) {
|
|
controller.abort(parentSignal.reason)
|
|
}
|
|
|
|
const cleanup = () => {
|
|
clearTimeout(timer)
|
|
parentSignal?.removeEventListener('abort', abort)
|
|
}
|
|
|
|
try {
|
|
const response = await baseFetch(url, {
|
|
...init,
|
|
headers,
|
|
signal: controller.signal,
|
|
})
|
|
cleanup()
|
|
return response
|
|
} catch (error) {
|
|
cleanup()
|
|
throw error
|
|
}
|
|
}
|
|
}
|
|
|
|
export function getMcpServerConnectionBatchSize(): number {
|
|
return parseInt(process.env.MCP_SERVER_CONNECTION_BATCH_SIZE || '', 10) || 3
|
|
}
|
|
|
|
function getRemoteMcpServerConnectionBatchSize(): number {
|
|
return (
|
|
parseInt(process.env.MCP_REMOTE_SERVER_CONNECTION_BATCH_SIZE || '', 10) ||
|
|
20
|
|
)
|
|
}
|
|
|
|
function isLocalMcpServer(config: ScopedMcpServerConfig): boolean {
|
|
return !config.type || config.type === 'stdio' || config.type === 'sdk'
|
|
}
|
|
|
|
// For the IDE MCP servers, we only include specific tools
|
|
const ALLOWED_IDE_TOOLS = ['mcp__ide__executeCode', 'mcp__ide__getDiagnostics']
|
|
function isIncludedMcpTool(tool: Tool): boolean {
|
|
return (
|
|
!tool.name.startsWith('mcp__ide__') || ALLOWED_IDE_TOOLS.includes(tool.name)
|
|
)
|
|
}
|
|
|
|
/**
|
|
* Generates the cache key for a server connection
|
|
* @param name Server name
|
|
* @param serverRef Server configuration
|
|
* @returns Cache key string
|
|
*/
|
|
export function getServerCacheKey(
|
|
name: string,
|
|
serverRef: ScopedMcpServerConfig,
|
|
): string {
|
|
return `${name}-${jsonStringify(serverRef)}`
|
|
}
|
|
|
|
/**
|
|
* TODO (ollie): The memoization here increases complexity by a lot, and im not sure it really improves performance
|
|
* Attempts to connect to a single MCP server
|
|
* @param name Server name
|
|
* @param serverRef Scoped server configuration
|
|
* @returns A wrapped client (either connected or failed)
|
|
*/
|
|
export const connectToServer = memoize(
|
|
async (
|
|
name: string,
|
|
serverRef: ScopedMcpServerConfig,
|
|
serverStats?: {
|
|
totalServers: number
|
|
stdioCount: number
|
|
sseCount: number
|
|
httpCount: number
|
|
sseIdeCount: number
|
|
wsIdeCount: number
|
|
},
|
|
): Promise<MCPServerConnection> => {
|
|
const connectStartTime = Date.now()
|
|
let inProcessServer:
|
|
| { connect(t: Transport): Promise<void>; close(): Promise<void> }
|
|
| undefined
|
|
try {
|
|
let transport
|
|
|
|
// If we have the session ingress JWT, we will connect via the session ingress rather than
|
|
// to remote MCP's directly.
|
|
const sessionIngressToken = getSessionIngressAuthToken()
|
|
|
|
if (serverRef.type === 'sse') {
|
|
// Create an auth provider for this server
|
|
const authProvider = new ClaudeAuthProvider(name, serverRef)
|
|
|
|
// Get combined headers (static + dynamic)
|
|
const combinedHeaders = await getMcpServerHeaders(name, serverRef)
|
|
|
|
// Use the auth provider with SSEClientTransport
|
|
const transportOptions: SSEClientTransportOptions = {
|
|
authProvider,
|
|
// Use fresh timeout per request to avoid stale AbortSignal bug.
|
|
// Step-up detection wraps innermost so the 403 is seen before the
|
|
// SDK's handler calls auth() → tokens().
|
|
fetch: wrapFetchWithTimeout(
|
|
wrapFetchWithStepUpDetection(createFetchWithInit(), authProvider),
|
|
),
|
|
requestInit: {
|
|
headers: {
|
|
'User-Agent': getMCPUserAgent(),
|
|
...combinedHeaders,
|
|
},
|
|
},
|
|
}
|
|
|
|
// IMPORTANT: Always set eventSourceInit with a fetch that does NOT use the
|
|
// timeout wrapper. The EventSource connection is long-lived (stays open indefinitely
|
|
// to receive server-sent events), so applying a 60-second timeout would kill it.
|
|
// The timeout is only meant for individual API requests (POST, auth refresh), not
|
|
// the persistent SSE stream.
|
|
transportOptions.eventSourceInit = {
|
|
fetch: async (url: string | URL, init?: RequestInit) => {
|
|
// Get auth headers from the auth provider
|
|
const authHeaders: Record<string, string> = {}
|
|
const tokens = await authProvider.tokens()
|
|
if (tokens) {
|
|
authHeaders.Authorization = `Bearer ${tokens.access_token}`
|
|
}
|
|
|
|
const proxyOptions = getProxyFetchOptions()
|
|
// eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
|
|
return fetch(url, {
|
|
...init,
|
|
...proxyOptions,
|
|
headers: {
|
|
'User-Agent': getMCPUserAgent(),
|
|
...authHeaders,
|
|
...init?.headers,
|
|
...combinedHeaders,
|
|
Accept: 'text/event-stream',
|
|
},
|
|
})
|
|
},
|
|
}
|
|
|
|
transport = new SSEClientTransport(
|
|
new URL(serverRef.url),
|
|
transportOptions,
|
|
)
|
|
logMCPDebug(name, `SSE transport initialized, awaiting connection`)
|
|
} else if (serverRef.type === 'sse-ide') {
|
|
logMCPDebug(name, `Setting up SSE-IDE transport to ${serverRef.url}`)
|
|
// IDE servers don't need authentication
|
|
// TODO: Use the auth token provided in the lockfile
|
|
const proxyOptions = getProxyFetchOptions()
|
|
const transportOptions: SSEClientTransportOptions =
|
|
proxyOptions.dispatcher
|
|
? {
|
|
eventSourceInit: {
|
|
fetch: async (url: string | URL, init?: RequestInit) => {
|
|
// eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
|
|
return fetch(url, {
|
|
...init,
|
|
...proxyOptions,
|
|
headers: {
|
|
'User-Agent': getMCPUserAgent(),
|
|
...init?.headers,
|
|
},
|
|
})
|
|
},
|
|
},
|
|
}
|
|
: {}
|
|
|
|
transport = new SSEClientTransport(
|
|
new URL(serverRef.url),
|
|
Object.keys(transportOptions).length > 0
|
|
? transportOptions
|
|
: undefined,
|
|
)
|
|
} else if (serverRef.type === 'ws-ide') {
|
|
const tlsOptions = getWebSocketTLSOptions()
|
|
const wsHeaders = {
|
|
'User-Agent': getMCPUserAgent(),
|
|
...(serverRef.authToken && {
|
|
'X-Claude-Code-Ide-Authorization': serverRef.authToken,
|
|
}),
|
|
}
|
|
|
|
let wsClient: WsClientLike
|
|
if (typeof Bun !== 'undefined') {
|
|
// Bun's WebSocket supports headers/proxy/tls options but the DOM typings don't
|
|
// eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
|
|
wsClient = new globalThis.WebSocket(serverRef.url, {
|
|
protocols: ['mcp'],
|
|
headers: wsHeaders,
|
|
proxy: getWebSocketProxyUrl(serverRef.url),
|
|
tls: tlsOptions || undefined,
|
|
} as unknown as string[])
|
|
} else {
|
|
wsClient = await createNodeWsClient(serverRef.url, {
|
|
headers: wsHeaders,
|
|
agent: getWebSocketProxyAgent(serverRef.url),
|
|
...(tlsOptions || {}),
|
|
})
|
|
}
|
|
transport = new WebSocketTransport(wsClient)
|
|
} else if (serverRef.type === 'ws') {
|
|
logMCPDebug(
|
|
name,
|
|
`Initializing WebSocket transport to ${serverRef.url}`,
|
|
)
|
|
|
|
const combinedHeaders = await getMcpServerHeaders(name, serverRef)
|
|
|
|
const tlsOptions = getWebSocketTLSOptions()
|
|
const wsHeaders = {
|
|
'User-Agent': getMCPUserAgent(),
|
|
...(sessionIngressToken && {
|
|
Authorization: `Bearer ${sessionIngressToken}`,
|
|
}),
|
|
...combinedHeaders,
|
|
}
|
|
|
|
// Redact sensitive headers before logging
|
|
const wsHeadersForLogging = mapValues(wsHeaders, (value, key) =>
|
|
key.toLowerCase() === 'authorization' ? '[REDACTED]' : value,
|
|
)
|
|
|
|
logMCPDebug(
|
|
name,
|
|
`WebSocket transport options: ${jsonStringify({
|
|
url: serverRef.url,
|
|
headers: wsHeadersForLogging,
|
|
hasSessionAuth: !!sessionIngressToken,
|
|
})}`,
|
|
)
|
|
|
|
let wsClient: WsClientLike
|
|
if (typeof Bun !== 'undefined') {
|
|
// Bun's WebSocket supports headers/proxy/tls options but the DOM typings don't
|
|
// eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
|
|
wsClient = new globalThis.WebSocket(serverRef.url, {
|
|
protocols: ['mcp'],
|
|
headers: wsHeaders,
|
|
proxy: getWebSocketProxyUrl(serverRef.url),
|
|
tls: tlsOptions || undefined,
|
|
} as unknown as string[])
|
|
} else {
|
|
wsClient = await createNodeWsClient(serverRef.url, {
|
|
headers: wsHeaders,
|
|
agent: getWebSocketProxyAgent(serverRef.url),
|
|
...(tlsOptions || {}),
|
|
})
|
|
}
|
|
transport = new WebSocketTransport(wsClient)
|
|
} else if (serverRef.type === 'http') {
|
|
logMCPDebug(name, `Initializing HTTP transport to ${serverRef.url}`)
|
|
logMCPDebug(
|
|
name,
|
|
`Node version: ${process.version}, Platform: ${process.platform}`,
|
|
)
|
|
logMCPDebug(
|
|
name,
|
|
`Environment: ${jsonStringify({
|
|
NODE_OPTIONS: process.env.NODE_OPTIONS || 'not set',
|
|
UV_THREADPOOL_SIZE: process.env.UV_THREADPOOL_SIZE || 'default',
|
|
HTTP_PROXY: process.env.HTTP_PROXY || 'not set',
|
|
HTTPS_PROXY: process.env.HTTPS_PROXY || 'not set',
|
|
NO_PROXY: process.env.NO_PROXY || 'not set',
|
|
})}`,
|
|
)
|
|
|
|
// Create an auth provider for this server
|
|
const authProvider = new ClaudeAuthProvider(name, serverRef)
|
|
|
|
// Get combined headers (static + dynamic)
|
|
const combinedHeaders = await getMcpServerHeaders(name, serverRef)
|
|
|
|
// Check if this server has stored OAuth tokens. If so, the SDK's
|
|
// authProvider will set Authorization — don't override with the
|
|
// session ingress token (SDK merges requestInit AFTER authProvider).
|
|
// CCR proxy URLs (ccr_shttp_mcp) have no stored OAuth, so they still
|
|
// get the ingress token. See PR #24454 discussion.
|
|
const hasOAuthTokens = !!(await authProvider.tokens())
|
|
|
|
// Use the auth provider with StreamableHTTPClientTransport
|
|
const proxyOptions = getProxyFetchOptions()
|
|
logMCPDebug(
|
|
name,
|
|
`Proxy options: ${proxyOptions.dispatcher ? 'custom dispatcher' : 'default'}`,
|
|
)
|
|
|
|
const transportOptions: StreamableHTTPClientTransportOptions = {
|
|
authProvider,
|
|
// Use fresh timeout per request to avoid stale AbortSignal bug.
|
|
// Step-up detection wraps innermost so the 403 is seen before the
|
|
// SDK's handler calls auth() → tokens().
|
|
fetch: wrapFetchWithTimeout(
|
|
wrapFetchWithStepUpDetection(createFetchWithInit(), authProvider),
|
|
),
|
|
requestInit: {
|
|
...proxyOptions,
|
|
headers: {
|
|
'User-Agent': getMCPUserAgent(),
|
|
...(sessionIngressToken &&
|
|
!hasOAuthTokens && {
|
|
Authorization: `Bearer ${sessionIngressToken}`,
|
|
}),
|
|
...combinedHeaders,
|
|
},
|
|
},
|
|
}
|
|
|
|
// Redact sensitive headers before logging
|
|
const headersForLogging = transportOptions.requestInit?.headers
|
|
? mapValues(
|
|
transportOptions.requestInit.headers as Record<string, string>,
|
|
(value, key) =>
|
|
key.toLowerCase() === 'authorization' ? '[REDACTED]' : value,
|
|
)
|
|
: undefined
|
|
|
|
logMCPDebug(
|
|
name,
|
|
`HTTP transport options: ${jsonStringify({
|
|
url: serverRef.url,
|
|
headers: headersForLogging,
|
|
hasAuthProvider: !!authProvider,
|
|
timeoutMs: MCP_REQUEST_TIMEOUT_MS,
|
|
})}`,
|
|
)
|
|
|
|
transport = new StreamableHTTPClientTransport(
|
|
new URL(serverRef.url),
|
|
transportOptions,
|
|
)
|
|
logMCPDebug(name, `HTTP transport created successfully`)
|
|
} else if (serverRef.type === 'sdk') {
|
|
throw new Error('SDK servers should be handled in print.ts')
|
|
} else if (serverRef.type === 'claudeai-proxy') {
|
|
logMCPDebug(
|
|
name,
|
|
`Initializing claude.ai proxy transport for server ${serverRef.id}`,
|
|
)
|
|
|
|
const tokens = getClaudeAIOAuthTokens()
|
|
if (!tokens) {
|
|
throw new Error('No claude.ai OAuth token found')
|
|
}
|
|
|
|
const oauthConfig = getOauthConfig()
|
|
const proxyUrl = `${oauthConfig.MCP_PROXY_URL}${oauthConfig.MCP_PROXY_PATH.replace('{server_id}', serverRef.id)}`
|
|
|
|
logMCPDebug(name, `Using claude.ai proxy at ${proxyUrl}`)
|
|
|
|
// eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
|
|
const fetchWithAuth = createClaudeAiProxyFetch(globalThis.fetch)
|
|
|
|
const proxyOptions = getProxyFetchOptions()
|
|
const transportOptions: StreamableHTTPClientTransportOptions = {
|
|
// Wrap fetchWithAuth with fresh timeout per request
|
|
fetch: wrapFetchWithTimeout(fetchWithAuth),
|
|
requestInit: {
|
|
...proxyOptions,
|
|
headers: {
|
|
'User-Agent': getMCPUserAgent(),
|
|
'X-Mcp-Client-Session-Id': getSessionId(),
|
|
},
|
|
},
|
|
}
|
|
|
|
transport = new StreamableHTTPClientTransport(
|
|
new URL(proxyUrl),
|
|
transportOptions,
|
|
)
|
|
logMCPDebug(name, `claude.ai proxy transport created successfully`)
|
|
} else if (
|
|
(serverRef.type === 'stdio' || !serverRef.type) &&
|
|
isClaudeInChromeMCPServer(name)
|
|
) {
|
|
// Run the Chrome MCP server in-process to avoid spawning a ~325 MB subprocess
|
|
const { createChromeContext } = await import(
|
|
'../../utils/claudeInChrome/mcpServer.js'
|
|
)
|
|
const { createClaudeForChromeMcpServer } = await import(
|
|
'@ant/claude-for-chrome-mcp'
|
|
)
|
|
const { createLinkedTransportPair } = await import(
|
|
'./InProcessTransport.js'
|
|
)
|
|
const context = createChromeContext(serverRef.env)
|
|
inProcessServer = createClaudeForChromeMcpServer(context)
|
|
const [clientTransport, serverTransport] = createLinkedTransportPair()
|
|
await inProcessServer.connect(serverTransport)
|
|
transport = clientTransport
|
|
logMCPDebug(name, `In-process Chrome MCP server started`)
|
|
} else if (
|
|
feature('CHICAGO_MCP') &&
|
|
(serverRef.type === 'stdio' || !serverRef.type) &&
|
|
isComputerUseMCPServer!(name)
|
|
) {
|
|
// Run the Computer Use MCP server in-process — same rationale as
|
|
// Chrome above. The package's CallTool handler is a stub; real
|
|
// dispatch goes through wrapper.tsx's .call() override.
|
|
const { createComputerUseMcpServerForCli } = await import(
|
|
'../../utils/computerUse/mcpServer.js'
|
|
)
|
|
const { createLinkedTransportPair } = await import(
|
|
'./InProcessTransport.js'
|
|
)
|
|
inProcessServer = await createComputerUseMcpServerForCli()
|
|
const [clientTransport, serverTransport] = createLinkedTransportPair()
|
|
await inProcessServer.connect(serverTransport)
|
|
transport = clientTransport
|
|
logMCPDebug(name, `In-process Computer Use MCP server started`)
|
|
} else if (serverRef.type === 'stdio' || !serverRef.type) {
|
|
const finalCommand =
|
|
process.env.CLAUDE_CODE_SHELL_PREFIX || serverRef.command
|
|
const finalArgs = process.env.CLAUDE_CODE_SHELL_PREFIX
|
|
? [[serverRef.command, ...serverRef.args].join(' ')]
|
|
: serverRef.args
|
|
transport = new StdioClientTransport({
|
|
command: finalCommand,
|
|
args: finalArgs,
|
|
env: {
|
|
...subprocessEnv(),
|
|
...serverRef.env,
|
|
} as Record<string, string>,
|
|
stderr: 'pipe', // prevents error output from the MCP server from printing to the UI
|
|
})
|
|
} else {
|
|
throw new Error(`Unsupported server type: ${serverRef.type}`)
|
|
}
|
|
|
|
// Set up stderr logging for stdio transport before connecting in case there are any stderr
|
|
// outputs emitted during the connection start (this can be useful for debugging failed connections).
|
|
// Store handler reference for cleanup to prevent memory leaks
|
|
let stderrHandler: ((data: Buffer) => void) | undefined
|
|
let stderrOutput = ''
|
|
if (serverRef.type === 'stdio' || !serverRef.type) {
|
|
const stdioTransport = transport as StdioClientTransport
|
|
if (stdioTransport.stderr) {
|
|
stderrHandler = (data: Buffer) => {
|
|
// Cap stderr accumulation to prevent unbounded memory growth
|
|
if (stderrOutput.length < 64 * 1024 * 1024) {
|
|
try {
|
|
stderrOutput += data.toString()
|
|
} catch {
|
|
// Ignore errors from exceeding max string length
|
|
}
|
|
}
|
|
}
|
|
stdioTransport.stderr.on('data', stderrHandler)
|
|
}
|
|
}
|
|
|
|
const client = new Client(
|
|
{
|
|
name: 'claude-code',
|
|
title: 'Claude Code',
|
|
version: MACRO.VERSION ?? 'unknown',
|
|
description: "Anthropic's agentic coding tool",
|
|
websiteUrl: PRODUCT_URL,
|
|
},
|
|
{
|
|
capabilities: {
|
|
roots: {},
|
|
// Empty object declares the capability. Sending {form:{},url:{}}
|
|
// breaks Java MCP SDK servers (Spring AI) whose Elicitation class
|
|
// has zero fields and fails on unknown properties.
|
|
elicitation: {},
|
|
},
|
|
},
|
|
)
|
|
|
|
// Add debug logging for client events if available
|
|
if (serverRef.type === 'http') {
|
|
logMCPDebug(name, `Client created, setting up request handler`)
|
|
}
|
|
|
|
client.setRequestHandler(ListRootsRequestSchema, async () => {
|
|
logMCPDebug(name, `Received ListRoots request from server`)
|
|
return {
|
|
roots: [
|
|
{
|
|
uri: `file://${getOriginalCwd()}`,
|
|
},
|
|
],
|
|
}
|
|
})
|
|
|
|
// Add a timeout to connection attempts to prevent tests from hanging indefinitely
|
|
logMCPDebug(
|
|
name,
|
|
`Starting connection with timeout of ${getConnectionTimeoutMs()}ms`,
|
|
)
|
|
|
|
// For HTTP transport, try a basic connectivity test first
|
|
if (serverRef.type === 'http') {
|
|
logMCPDebug(name, `Testing basic HTTP connectivity to ${serverRef.url}`)
|
|
try {
|
|
const testUrl = new URL(serverRef.url)
|
|
logMCPDebug(
|
|
name,
|
|
`Parsed URL: host=${testUrl.hostname}, port=${testUrl.port || 'default'}, protocol=${testUrl.protocol}`,
|
|
)
|
|
|
|
// Log DNS resolution attempt
|
|
if (
|
|
testUrl.hostname === '127.0.0.1' ||
|
|
testUrl.hostname === 'localhost'
|
|
) {
|
|
logMCPDebug(name, `Using loopback address: ${testUrl.hostname}`)
|
|
}
|
|
} catch (urlError) {
|
|
logMCPDebug(name, `Failed to parse URL: ${urlError}`)
|
|
}
|
|
}
|
|
|
|
const connectPromise = client.connect(transport)
|
|
const timeoutPromise = new Promise<never>((_, reject) => {
|
|
const timeoutId = setTimeout(() => {
|
|
const elapsed = Date.now() - connectStartTime
|
|
logMCPDebug(
|
|
name,
|
|
`Connection timeout triggered after ${elapsed}ms (limit: ${getConnectionTimeoutMs()}ms)`,
|
|
)
|
|
if (inProcessServer) {
|
|
inProcessServer.close().catch(() => {})
|
|
}
|
|
transport.close().catch(() => {})
|
|
reject(
|
|
new TelemetrySafeError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS(
|
|
`MCP server "${name}" connection timed out after ${getConnectionTimeoutMs()}ms`,
|
|
'MCP connection timeout',
|
|
),
|
|
)
|
|
}, getConnectionTimeoutMs())
|
|
|
|
// Clean up timeout if connect resolves or rejects
|
|
connectPromise.then(
|
|
() => {
|
|
clearTimeout(timeoutId)
|
|
},
|
|
_error => {
|
|
clearTimeout(timeoutId)
|
|
},
|
|
)
|
|
})
|
|
|
|
try {
|
|
await Promise.race([connectPromise, timeoutPromise])
|
|
if (stderrOutput) {
|
|
logMCPError(name, `Server stderr: ${stderrOutput}`)
|
|
stderrOutput = '' // Release accumulated string to prevent memory growth
|
|
}
|
|
const elapsed = Date.now() - connectStartTime
|
|
logMCPDebug(
|
|
name,
|
|
`Successfully connected (transport: ${serverRef.type || 'stdio'}) in ${elapsed}ms`,
|
|
)
|
|
} catch (error) {
|
|
const elapsed = Date.now() - connectStartTime
|
|
// SSE-specific error logging
|
|
if (serverRef.type === 'sse' && error instanceof Error) {
|
|
logMCPDebug(
|
|
name,
|
|
`SSE Connection failed after ${elapsed}ms: ${jsonStringify({
|
|
url: serverRef.url,
|
|
error: error.message,
|
|
errorType: error.constructor.name,
|
|
stack: error.stack,
|
|
})}`,
|
|
)
|
|
logMCPError(name, error)
|
|
|
|
if (error instanceof UnauthorizedError) {
|
|
return handleRemoteAuthFailure(name, serverRef, 'sse')
|
|
}
|
|
} else if (serverRef.type === 'http' && error instanceof Error) {
|
|
const errorObj = error as Error & {
|
|
cause?: unknown
|
|
code?: string
|
|
errno?: string | number
|
|
syscall?: string
|
|
}
|
|
logMCPDebug(
|
|
name,
|
|
`HTTP Connection failed after ${elapsed}ms: ${error.message} (code: ${errorObj.code || 'none'}, errno: ${errorObj.errno || 'none'})`,
|
|
)
|
|
logMCPError(name, error)
|
|
|
|
if (error instanceof UnauthorizedError) {
|
|
return handleRemoteAuthFailure(name, serverRef, 'http')
|
|
}
|
|
} else if (
|
|
serverRef.type === 'claudeai-proxy' &&
|
|
error instanceof Error
|
|
) {
|
|
logMCPDebug(
|
|
name,
|
|
`claude.ai proxy connection failed after ${elapsed}ms: ${error.message}`,
|
|
)
|
|
logMCPError(name, error)
|
|
|
|
// StreamableHTTPError has a `code` property with the HTTP status
|
|
const errorCode = (error as Error & { code?: number }).code
|
|
if (errorCode === 401) {
|
|
return handleRemoteAuthFailure(name, serverRef, 'claudeai-proxy')
|
|
}
|
|
} else if (
|
|
serverRef.type === 'sse-ide' ||
|
|
serverRef.type === 'ws-ide'
|
|
) {
|
|
logEvent('tengu_mcp_ide_server_connection_failed', {
|
|
connectionDurationMs: elapsed,
|
|
})
|
|
}
|
|
if (inProcessServer) {
|
|
inProcessServer.close().catch(() => {})
|
|
}
|
|
transport.close().catch(() => {})
|
|
if (stderrOutput) {
|
|
logMCPError(name, `Server stderr: ${stderrOutput}`)
|
|
}
|
|
throw error
|
|
}
|
|
|
|
const capabilities = client.getServerCapabilities()
|
|
const serverVersion = client.getServerVersion()
|
|
const rawInstructions = client.getInstructions()
|
|
let instructions = rawInstructions
|
|
if (
|
|
rawInstructions &&
|
|
rawInstructions.length > MAX_MCP_DESCRIPTION_LENGTH
|
|
) {
|
|
instructions =
|
|
rawInstructions.slice(0, MAX_MCP_DESCRIPTION_LENGTH) + '… [truncated]'
|
|
logMCPDebug(
|
|
name,
|
|
`Server instructions truncated from ${rawInstructions.length} to ${MAX_MCP_DESCRIPTION_LENGTH} chars`,
|
|
)
|
|
}
|
|
|
|
// Log successful connection details
|
|
logMCPDebug(
|
|
name,
|
|
`Connection established with capabilities: ${jsonStringify({
|
|
hasTools: !!capabilities?.tools,
|
|
hasPrompts: !!capabilities?.prompts,
|
|
hasResources: !!capabilities?.resources,
|
|
hasResourceSubscribe: !!capabilities?.resources?.subscribe,
|
|
serverVersion: serverVersion || 'unknown',
|
|
})}`,
|
|
)
|
|
logForDebugging(
|
|
`[MCP] Server "${name}" connected with subscribe=${!!capabilities?.resources?.subscribe}`,
|
|
)
|
|
|
|
// Register default elicitation handler that returns cancel during the
|
|
// window before registerElicitationHandler overwrites it in
|
|
// onConnectionAttempt (useManageMCPConnections).
|
|
client.setRequestHandler(ElicitRequestSchema, async request => {
|
|
logMCPDebug(
|
|
name,
|
|
`Elicitation request received during initialization: ${jsonStringify(request)}`,
|
|
)
|
|
return { action: 'cancel' as const }
|
|
})
|
|
|
|
if (serverRef.type === 'sse-ide' || serverRef.type === 'ws-ide') {
|
|
const ideConnectionDurationMs = Date.now() - connectStartTime
|
|
logEvent('tengu_mcp_ide_server_connection_succeeded', {
|
|
connectionDurationMs: ideConnectionDurationMs,
|
|
serverVersion:
|
|
serverVersion as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
|
|
})
|
|
try {
|
|
void maybeNotifyIDEConnected(client)
|
|
} catch (error) {
|
|
logMCPError(
|
|
name,
|
|
`Failed to send ide_connected notification: ${error}`,
|
|
)
|
|
}
|
|
}
|
|
|
|
// Enhanced connection drop detection and logging for all transport types
|
|
const connectionStartTime = Date.now()
|
|
let hasErrorOccurred = false
|
|
|
|
// Store original handlers
|
|
const originalOnerror = client.onerror
|
|
const originalOnclose = client.onclose
|
|
|
|
// The SDK's transport calls onerror on connection failures but doesn't call onclose,
|
|
// which CC uses to trigger reconnection. We bridge this gap by tracking consecutive
|
|
// terminal errors and manually closing after MAX_ERRORS_BEFORE_RECONNECT failures.
|
|
let consecutiveConnectionErrors = 0
|
|
const MAX_ERRORS_BEFORE_RECONNECT = 3
|
|
|
|
// Guard against re-entry: close() aborts in-flight streams which may fire
|
|
// onerror again before the close chain completes.
|
|
let hasTriggeredClose = false
|
|
|
|
// client.close() → transport.close() → transport.onclose → SDK's _onclose():
|
|
// rejects all pending request handlers (so hung callTool() promises fail with
|
|
// McpError -32000 "Connection closed") and then invokes our client.onclose
|
|
// handler below (which clears the memo cache so the next call reconnects).
|
|
// Calling client.onclose?.() directly would only clear the cache — pending
|
|
// tool calls would stay hung.
|
|
const closeTransportAndRejectPending = (reason: string) => {
|
|
if (hasTriggeredClose) return
|
|
hasTriggeredClose = true
|
|
logMCPDebug(name, `Closing transport (${reason})`)
|
|
void client.close().catch(e => {
|
|
logMCPDebug(name, `Error during close: ${errorMessage(e)}`)
|
|
})
|
|
}
|
|
|
|
const isTerminalConnectionError = (msg: string): boolean => {
|
|
return (
|
|
msg.includes('ECONNRESET') ||
|
|
msg.includes('ETIMEDOUT') ||
|
|
msg.includes('EPIPE') ||
|
|
msg.includes('EHOSTUNREACH') ||
|
|
msg.includes('ECONNREFUSED') ||
|
|
msg.includes('Body Timeout Error') ||
|
|
msg.includes('terminated') ||
|
|
// SDK SSE reconnection intermediate errors — may be wrapped around the
|
|
// actual network error, so the substrings above won't match
|
|
msg.includes('SSE stream disconnected') ||
|
|
msg.includes('Failed to reconnect SSE stream')
|
|
)
|
|
}
|
|
|
|
// Enhanced error handler with detailed logging
|
|
client.onerror = (error: Error) => {
|
|
const uptime = Date.now() - connectionStartTime
|
|
hasErrorOccurred = true
|
|
const transportType = serverRef.type || 'stdio'
|
|
|
|
// Log the connection drop with context
|
|
logMCPDebug(
|
|
name,
|
|
`${transportType.toUpperCase()} connection dropped after ${Math.floor(uptime / 1000)}s uptime`,
|
|
)
|
|
|
|
// Log specific error details for debugging
|
|
if (error.message) {
|
|
if (error.message.includes('ECONNRESET')) {
|
|
logMCPDebug(
|
|
name,
|
|
`Connection reset - server may have crashed or restarted`,
|
|
)
|
|
} else if (error.message.includes('ETIMEDOUT')) {
|
|
logMCPDebug(
|
|
name,
|
|
`Connection timeout - network issue or server unresponsive`,
|
|
)
|
|
} else if (error.message.includes('ECONNREFUSED')) {
|
|
logMCPDebug(name, `Connection refused - server may be down`)
|
|
} else if (error.message.includes('EPIPE')) {
|
|
logMCPDebug(
|
|
name,
|
|
`Broken pipe - server closed connection unexpectedly`,
|
|
)
|
|
} else if (error.message.includes('EHOSTUNREACH')) {
|
|
logMCPDebug(name, `Host unreachable - network connectivity issue`)
|
|
} else if (error.message.includes('ESRCH')) {
|
|
logMCPDebug(
|
|
name,
|
|
`Process not found - stdio server process terminated`,
|
|
)
|
|
} else if (error.message.includes('spawn')) {
|
|
logMCPDebug(
|
|
name,
|
|
`Failed to spawn process - check command and permissions`,
|
|
)
|
|
} else {
|
|
logMCPDebug(name, `Connection error: ${error.message}`)
|
|
}
|
|
}
|
|
|
|
// For HTTP transports, detect session expiry (404 + JSON-RPC -32001)
|
|
// and close the transport so pending tool calls reject and the next
|
|
// call reconnects with a fresh session ID.
|
|
if (
|
|
(transportType === 'http' || transportType === 'claudeai-proxy') &&
|
|
isMcpSessionExpiredError(error)
|
|
) {
|
|
logMCPDebug(
|
|
name,
|
|
`MCP session expired (server returned 404 with session-not-found), triggering reconnection`,
|
|
)
|
|
closeTransportAndRejectPending('session expired')
|
|
if (originalOnerror) {
|
|
originalOnerror(error)
|
|
}
|
|
return
|
|
}
|
|
|
|
// For remote transports (SSE/HTTP), track terminal connection errors
|
|
// and trigger reconnection via close if we see repeated failures.
|
|
if (
|
|
transportType === 'sse' ||
|
|
transportType === 'http' ||
|
|
transportType === 'claudeai-proxy'
|
|
) {
|
|
// The SDK's StreamableHTTP transport fires this after exhausting its
|
|
// own SSE reconnect attempts (default maxRetries: 2) — but it never
|
|
// calls onclose, so pending callTool() promises hang indefinitely.
|
|
// This is the definitive "transport gave up" signal.
|
|
if (error.message.includes('Maximum reconnection attempts')) {
|
|
closeTransportAndRejectPending('SSE reconnection exhausted')
|
|
if (originalOnerror) {
|
|
originalOnerror(error)
|
|
}
|
|
return
|
|
}
|
|
|
|
if (isTerminalConnectionError(error.message)) {
|
|
consecutiveConnectionErrors++
|
|
logMCPDebug(
|
|
name,
|
|
`Terminal connection error ${consecutiveConnectionErrors}/${MAX_ERRORS_BEFORE_RECONNECT}`,
|
|
)
|
|
|
|
if (consecutiveConnectionErrors >= MAX_ERRORS_BEFORE_RECONNECT) {
|
|
consecutiveConnectionErrors = 0
|
|
closeTransportAndRejectPending('max consecutive terminal errors')
|
|
}
|
|
} else {
|
|
// Non-terminal error (e.g., transient issue), reset counter
|
|
consecutiveConnectionErrors = 0
|
|
}
|
|
}
|
|
|
|
// Call original handler
|
|
if (originalOnerror) {
|
|
originalOnerror(error)
|
|
}
|
|
}
|
|
|
|
// Enhanced close handler with connection drop context
|
|
client.onclose = () => {
|
|
const uptime = Date.now() - connectionStartTime
|
|
const transportType = serverRef.type ?? 'unknown'
|
|
|
|
logMCPDebug(
|
|
name,
|
|
`${transportType.toUpperCase()} connection closed after ${Math.floor(uptime / 1000)}s (${hasErrorOccurred ? 'with errors' : 'cleanly'})`,
|
|
)
|
|
|
|
// Clear the memoization cache so next operation reconnects
|
|
const key = getServerCacheKey(name, serverRef)
|
|
|
|
// Also clear fetch caches (keyed by server name). Reconnection
|
|
// creates a new connection object; without clearing, the next
|
|
// fetch would return stale tools/resources from the old connection.
|
|
fetchToolsForClient.cache.delete(name)
|
|
fetchResourcesForClient.cache.delete(name)
|
|
fetchCommandsForClient.cache.delete(name)
|
|
if (feature('MCP_SKILLS')) {
|
|
fetchMcpSkillsForClient!.cache.delete(name)
|
|
}
|
|
|
|
connectToServer.cache.delete(key)
|
|
logMCPDebug(name, `Cleared connection cache for reconnection`)
|
|
|
|
if (originalOnclose) {
|
|
originalOnclose()
|
|
}
|
|
}
|
|
|
|
const cleanup = async () => {
|
|
// In-process servers (e.g. Chrome MCP) don't have child processes or stderr
|
|
if (inProcessServer) {
|
|
try {
|
|
await inProcessServer.close()
|
|
} catch (error) {
|
|
logMCPDebug(name, `Error closing in-process server: ${error}`)
|
|
}
|
|
try {
|
|
await client.close()
|
|
} catch (error) {
|
|
logMCPDebug(name, `Error closing client: ${error}`)
|
|
}
|
|
return
|
|
}
|
|
|
|
// Remove stderr event listener to prevent memory leaks
|
|
if (stderrHandler && (serverRef.type === 'stdio' || !serverRef.type)) {
|
|
const stdioTransport = transport as StdioClientTransport
|
|
stdioTransport.stderr?.off('data', stderrHandler)
|
|
}
|
|
|
|
// For stdio transports, explicitly terminate the child process with proper signals
|
|
// NOTE: StdioClientTransport.close() only sends an abort signal, but many MCP servers
|
|
// (especially Docker containers) need explicit SIGINT/SIGTERM signals to trigger graceful shutdown
|
|
if (serverRef.type === 'stdio') {
|
|
try {
|
|
const stdioTransport = transport as StdioClientTransport
|
|
const childPid = stdioTransport.pid
|
|
|
|
if (childPid) {
|
|
logMCPDebug(name, 'Sending SIGINT to MCP server process')
|
|
|
|
// First try SIGINT (like Ctrl+C)
|
|
try {
|
|
process.kill(childPid, 'SIGINT')
|
|
} catch (error) {
|
|
logMCPDebug(name, `Error sending SIGINT: ${error}`)
|
|
return
|
|
}
|
|
|
|
// Wait for graceful shutdown with rapid escalation (total 500ms to keep CLI responsive)
|
|
await new Promise<void>(async resolve => {
|
|
let resolved = false
|
|
|
|
// Set up a timer to check if process still exists
|
|
const checkInterval = setInterval(() => {
|
|
try {
|
|
// process.kill(pid, 0) checks if process exists without killing it
|
|
process.kill(childPid, 0)
|
|
} catch {
|
|
// Process no longer exists
|
|
if (!resolved) {
|
|
resolved = true
|
|
clearInterval(checkInterval)
|
|
clearTimeout(failsafeTimeout)
|
|
logMCPDebug(name, 'MCP server process exited cleanly')
|
|
resolve()
|
|
}
|
|
}
|
|
}, 50)
|
|
|
|
// Absolute failsafe: clear interval after 600ms no matter what
|
|
const failsafeTimeout = setTimeout(() => {
|
|
if (!resolved) {
|
|
resolved = true
|
|
clearInterval(checkInterval)
|
|
logMCPDebug(
|
|
name,
|
|
'Cleanup timeout reached, stopping process monitoring',
|
|
)
|
|
resolve()
|
|
}
|
|
}, 600)
|
|
|
|
try {
|
|
// Wait 100ms for SIGINT to work (usually much faster)
|
|
await sleep(100)
|
|
|
|
if (!resolved) {
|
|
// Check if process still exists
|
|
try {
|
|
process.kill(childPid, 0)
|
|
// Process still exists, SIGINT failed, try SIGTERM
|
|
logMCPDebug(
|
|
name,
|
|
'SIGINT failed, sending SIGTERM to MCP server process',
|
|
)
|
|
try {
|
|
process.kill(childPid, 'SIGTERM')
|
|
} catch (termError) {
|
|
logMCPDebug(name, `Error sending SIGTERM: ${termError}`)
|
|
resolved = true
|
|
clearInterval(checkInterval)
|
|
clearTimeout(failsafeTimeout)
|
|
resolve()
|
|
return
|
|
}
|
|
} catch {
|
|
// Process already exited
|
|
resolved = true
|
|
clearInterval(checkInterval)
|
|
clearTimeout(failsafeTimeout)
|
|
resolve()
|
|
return
|
|
}
|
|
|
|
// Wait 400ms for SIGTERM to work (slower than SIGINT, often used for cleanup)
|
|
await sleep(400)
|
|
|
|
if (!resolved) {
|
|
// Check if process still exists
|
|
try {
|
|
process.kill(childPid, 0)
|
|
// Process still exists, SIGTERM failed, force kill with SIGKILL
|
|
logMCPDebug(
|
|
name,
|
|
'SIGTERM failed, sending SIGKILL to MCP server process',
|
|
)
|
|
try {
|
|
process.kill(childPid, 'SIGKILL')
|
|
} catch (killError) {
|
|
logMCPDebug(
|
|
name,
|
|
`Error sending SIGKILL: ${killError}`,
|
|
)
|
|
}
|
|
} catch {
|
|
// Process already exited
|
|
resolved = true
|
|
clearInterval(checkInterval)
|
|
clearTimeout(failsafeTimeout)
|
|
resolve()
|
|
}
|
|
}
|
|
}
|
|
|
|
// Final timeout - always resolve after 500ms max (total cleanup time)
|
|
if (!resolved) {
|
|
resolved = true
|
|
clearInterval(checkInterval)
|
|
clearTimeout(failsafeTimeout)
|
|
resolve()
|
|
}
|
|
} catch {
|
|
// Handle any errors in the escalation sequence
|
|
if (!resolved) {
|
|
resolved = true
|
|
clearInterval(checkInterval)
|
|
clearTimeout(failsafeTimeout)
|
|
resolve()
|
|
}
|
|
}
|
|
})
|
|
}
|
|
} catch (processError) {
|
|
logMCPDebug(name, `Error terminating process: ${processError}`)
|
|
}
|
|
}
|
|
|
|
// Close the client connection (which also closes the transport)
|
|
try {
|
|
await client.close()
|
|
} catch (error) {
|
|
logMCPDebug(name, `Error closing client: ${error}`)
|
|
}
|
|
}
|
|
|
|
// Register cleanup for all transport types - even network transports might need cleanup
|
|
// This ensures all MCP servers get properly terminated, not just stdio ones
|
|
const cleanupUnregister = registerCleanup(cleanup)
|
|
|
|
// Create the wrapped cleanup that includes unregistering
|
|
const wrappedCleanup = async () => {
|
|
cleanupUnregister?.()
|
|
await cleanup()
|
|
}
|
|
|
|
const connectionDurationMs = Date.now() - connectStartTime
|
|
logEvent('tengu_mcp_server_connection_succeeded', {
|
|
connectionDurationMs,
|
|
transportType: (serverRef.type ??
|
|
'stdio') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
|
|
totalServers: serverStats?.totalServers,
|
|
stdioCount: serverStats?.stdioCount,
|
|
sseCount: serverStats?.sseCount,
|
|
httpCount: serverStats?.httpCount,
|
|
sseIdeCount: serverStats?.sseIdeCount,
|
|
wsIdeCount: serverStats?.wsIdeCount,
|
|
...mcpBaseUrlAnalytics(serverRef),
|
|
})
|
|
return {
|
|
name,
|
|
client,
|
|
type: 'connected' as const,
|
|
capabilities: capabilities ?? {},
|
|
serverInfo: serverVersion,
|
|
instructions,
|
|
config: serverRef,
|
|
cleanup: wrappedCleanup,
|
|
}
|
|
} catch (error) {
|
|
const connectionDurationMs = Date.now() - connectStartTime
|
|
logEvent('tengu_mcp_server_connection_failed', {
|
|
connectionDurationMs,
|
|
totalServers: serverStats?.totalServers || 1,
|
|
stdioCount:
|
|
serverStats?.stdioCount || (serverRef.type === 'stdio' ? 1 : 0),
|
|
sseCount: serverStats?.sseCount || (serverRef.type === 'sse' ? 1 : 0),
|
|
httpCount:
|
|
serverStats?.httpCount || (serverRef.type === 'http' ? 1 : 0),
|
|
sseIdeCount:
|
|
serverStats?.sseIdeCount || (serverRef.type === 'sse-ide' ? 1 : 0),
|
|
wsIdeCount:
|
|
serverStats?.wsIdeCount || (serverRef.type === 'ws-ide' ? 1 : 0),
|
|
transportType: (serverRef.type ??
|
|
'stdio') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
|
|
...mcpBaseUrlAnalytics(serverRef),
|
|
})
|
|
logMCPDebug(
|
|
name,
|
|
`Connection failed after ${connectionDurationMs}ms: ${errorMessage(error)}`,
|
|
)
|
|
logMCPError(name, `Connection failed: ${errorMessage(error)}`)
|
|
|
|
if (inProcessServer) {
|
|
inProcessServer.close().catch(() => {})
|
|
}
|
|
return {
|
|
name,
|
|
type: 'failed' as const,
|
|
config: serverRef,
|
|
error: errorMessage(error),
|
|
}
|
|
}
|
|
},
|
|
getServerCacheKey,
|
|
)
|
|
|
|
/**
|
|
* Clears the memoize cache for a specific server
|
|
* @param name Server name
|
|
* @param serverRef Server configuration
|
|
*/
|
|
export async function clearServerCache(
|
|
name: string,
|
|
serverRef: ScopedMcpServerConfig,
|
|
): Promise<void> {
|
|
const key = getServerCacheKey(name, serverRef)
|
|
|
|
try {
|
|
const wrappedClient = await connectToServer(name, serverRef)
|
|
|
|
if (wrappedClient.type === 'connected') {
|
|
await wrappedClient.cleanup()
|
|
}
|
|
} catch {
|
|
// Ignore errors - server might have failed to connect
|
|
}
|
|
|
|
// Clear from cache (both connection and fetch caches so reconnect
|
|
// fetches fresh tools/resources/commands instead of stale ones)
|
|
connectToServer.cache.delete(key)
|
|
fetchToolsForClient.cache.delete(name)
|
|
fetchResourcesForClient.cache.delete(name)
|
|
fetchCommandsForClient.cache.delete(name)
|
|
if (feature('MCP_SKILLS')) {
|
|
fetchMcpSkillsForClient!.cache.delete(name)
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Ensures a valid connected client for an MCP server.
|
|
* For most server types, uses the memoization cache if available, or reconnects
|
|
* if the cache was cleared (e.g., after onclose). This ensures tool/resource
|
|
* calls always use a valid connection.
|
|
*
|
|
* SDK MCP servers run in-process and are handled separately via setupSdkMcpClients,
|
|
* so they are returned as-is without going through connectToServer.
|
|
*
|
|
* @param client The connected MCP server client
|
|
* @returns Connected MCP server client (same or reconnected)
|
|
* @throws Error if server cannot be connected
|
|
*/
|
|
export async function ensureConnectedClient(
|
|
client: ConnectedMCPServer,
|
|
): Promise<ConnectedMCPServer> {
|
|
// SDK MCP servers run in-process and are handled separately via setupSdkMcpClients
|
|
if (client.config.type === 'sdk') {
|
|
return client
|
|
}
|
|
|
|
const connectedClient = await connectToServer(client.name, client.config)
|
|
if (connectedClient.type !== 'connected') {
|
|
throw new TelemetrySafeError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS(
|
|
`MCP server "${client.name}" is not connected`,
|
|
'MCP server not connected',
|
|
)
|
|
}
|
|
return connectedClient
|
|
}
|
|
|
|
/**
|
|
* Compares two MCP server configurations to determine if they are equivalent.
|
|
* Used to detect when a server needs to be reconnected due to config changes.
|
|
*/
|
|
export function areMcpConfigsEqual(
|
|
a: ScopedMcpServerConfig,
|
|
b: ScopedMcpServerConfig,
|
|
): boolean {
|
|
// Quick type check first
|
|
if (a.type !== b.type) return false
|
|
|
|
// Compare by serializing - this handles all config variations
|
|
// We exclude 'scope' from comparison since it's metadata, not connection config
|
|
const { scope: _scopeA, ...configA } = a
|
|
const { scope: _scopeB, ...configB } = b
|
|
return jsonStringify(configA) === jsonStringify(configB)
|
|
}
|
|
|
|
// Max cache size for fetch* caches. Keyed by server name (stable across
|
|
// reconnects), bounded to prevent unbounded growth with many MCP servers.
|
|
const MCP_FETCH_CACHE_SIZE = 20
|
|
|
|
/**
|
|
* Encode MCP tool input for the auto-mode security classifier.
|
|
* Exported so the auto-mode eval scripts can mirror production encoding
|
|
* for `mcp__*` tool stubs without duplicating this logic.
|
|
*/
|
|
export function mcpToolInputToAutoClassifierInput(
|
|
input: Record<string, unknown>,
|
|
toolName: string,
|
|
): string {
|
|
const keys = Object.keys(input)
|
|
return keys.length > 0
|
|
? keys.map(k => `${k}=${String(input[k])}`).join(' ')
|
|
: toolName
|
|
}
|
|
|
|
export const fetchToolsForClient = memoizeWithLRU(
|
|
async (client: MCPServerConnection): Promise<Tool[]> => {
|
|
if (client.type !== 'connected') return []
|
|
|
|
try {
|
|
if (!client.capabilities?.tools) {
|
|
return []
|
|
}
|
|
|
|
const result = (await client.client.request(
|
|
{ method: 'tools/list' },
|
|
ListToolsResultSchema,
|
|
)) as ListToolsResult
|
|
|
|
// Sanitize tool data from MCP server
|
|
const toolsToProcess = recursivelySanitizeUnicode(result.tools)
|
|
|
|
// Check if we should skip the mcp__ prefix for SDK MCP servers
|
|
const skipPrefix =
|
|
client.config.type === 'sdk' &&
|
|
isEnvTruthy(process.env.CLAUDE_AGENT_SDK_MCP_NO_PREFIX)
|
|
|
|
// Convert MCP tools to our Tool format
|
|
return toolsToProcess
|
|
.map((tool): Tool => {
|
|
const fullyQualifiedName = buildMcpToolName(client.name, tool.name)
|
|
return {
|
|
...MCPTool,
|
|
// In skip-prefix mode, use the original name for model invocation so MCP tools
|
|
// can override builtins by name. mcpInfo is used for permission checking.
|
|
name: skipPrefix ? tool.name : fullyQualifiedName,
|
|
mcpInfo: { serverName: client.name, toolName: tool.name },
|
|
isMcp: true,
|
|
// Collapse whitespace: _meta is open to external MCP servers, and
|
|
// a newline here would inject orphan lines into the deferred-tool
|
|
// list (formatDeferredToolLine joins on '\n').
|
|
searchHint:
|
|
typeof tool._meta?.['anthropic/searchHint'] === 'string'
|
|
? tool._meta['anthropic/searchHint']
|
|
.replace(/\s+/g, ' ')
|
|
.trim() || undefined
|
|
: undefined,
|
|
alwaysLoad: tool._meta?.['anthropic/alwaysLoad'] === true,
|
|
async description() {
|
|
return tool.description ?? ''
|
|
},
|
|
async prompt() {
|
|
const desc = tool.description ?? ''
|
|
return desc.length > MAX_MCP_DESCRIPTION_LENGTH
|
|
? desc.slice(0, MAX_MCP_DESCRIPTION_LENGTH) + '… [truncated]'
|
|
: desc
|
|
},
|
|
isConcurrencySafe() {
|
|
return tool.annotations?.readOnlyHint ?? false
|
|
},
|
|
isReadOnly() {
|
|
return tool.annotations?.readOnlyHint ?? false
|
|
},
|
|
toAutoClassifierInput(input) {
|
|
return mcpToolInputToAutoClassifierInput(input, tool.name)
|
|
},
|
|
isDestructive() {
|
|
return tool.annotations?.destructiveHint ?? false
|
|
},
|
|
isOpenWorld() {
|
|
return tool.annotations?.openWorldHint ?? false
|
|
},
|
|
isSearchOrReadCommand() {
|
|
return classifyMcpToolForCollapse(client.name, tool.name)
|
|
},
|
|
inputJSONSchema: tool.inputSchema as Tool['inputJSONSchema'],
|
|
async checkPermissions() {
|
|
return {
|
|
behavior: 'passthrough' as const,
|
|
message: 'MCPTool requires permission.',
|
|
suggestions: [
|
|
{
|
|
type: 'addRules' as const,
|
|
rules: [
|
|
{
|
|
toolName: fullyQualifiedName,
|
|
ruleContent: undefined,
|
|
},
|
|
],
|
|
behavior: 'allow' as const,
|
|
destination: 'localSettings' as const,
|
|
},
|
|
],
|
|
}
|
|
},
|
|
async call(
|
|
args: Record<string, unknown>,
|
|
context,
|
|
_canUseTool,
|
|
parentMessage,
|
|
onProgress?: ToolCallProgress<MCPProgress>,
|
|
) {
|
|
const toolUseId = extractToolUseId(parentMessage)
|
|
const meta = toolUseId
|
|
? { 'claudecode/toolUseId': toolUseId }
|
|
: {}
|
|
|
|
// Emit progress when tool starts
|
|
if (onProgress && toolUseId) {
|
|
onProgress({
|
|
toolUseID: toolUseId,
|
|
data: {
|
|
type: 'mcp_progress',
|
|
status: 'started',
|
|
serverName: client.name,
|
|
toolName: tool.name,
|
|
},
|
|
})
|
|
}
|
|
|
|
const startTime = Date.now()
|
|
const MAX_SESSION_RETRIES = 1
|
|
for (let attempt = 0; ; attempt++) {
|
|
try {
|
|
const connectedClient = await ensureConnectedClient(client)
|
|
const mcpResult = await callMCPToolWithUrlElicitationRetry({
|
|
client: connectedClient,
|
|
clientConnection: client,
|
|
tool: tool.name,
|
|
args,
|
|
meta,
|
|
signal: context.abortController.signal,
|
|
setAppState: context.setAppState,
|
|
onProgress:
|
|
onProgress && toolUseId
|
|
? progressData => {
|
|
onProgress({
|
|
toolUseID: toolUseId,
|
|
data: progressData,
|
|
})
|
|
}
|
|
: undefined,
|
|
handleElicitation: context.handleElicitation,
|
|
})
|
|
|
|
// Emit progress when tool completes successfully
|
|
if (onProgress && toolUseId) {
|
|
onProgress({
|
|
toolUseID: toolUseId,
|
|
data: {
|
|
type: 'mcp_progress',
|
|
status: 'completed',
|
|
serverName: client.name,
|
|
toolName: tool.name,
|
|
elapsedTimeMs: Date.now() - startTime,
|
|
},
|
|
})
|
|
}
|
|
|
|
return {
|
|
data: mcpResult.content,
|
|
...((mcpResult._meta || mcpResult.structuredContent) && {
|
|
mcpMeta: {
|
|
...(mcpResult._meta && {
|
|
_meta: mcpResult._meta,
|
|
}),
|
|
...(mcpResult.structuredContent && {
|
|
structuredContent: mcpResult.structuredContent,
|
|
}),
|
|
},
|
|
}),
|
|
}
|
|
} catch (error) {
|
|
// Session expired — the connection cache has been
|
|
// cleared, so retry with a fresh client.
|
|
if (
|
|
error instanceof McpSessionExpiredError &&
|
|
attempt < MAX_SESSION_RETRIES
|
|
) {
|
|
logMCPDebug(
|
|
client.name,
|
|
`Retrying tool '${tool.name}' after session recovery`,
|
|
)
|
|
continue
|
|
}
|
|
|
|
// Emit progress when tool fails
|
|
if (onProgress && toolUseId) {
|
|
onProgress({
|
|
toolUseID: toolUseId,
|
|
data: {
|
|
type: 'mcp_progress',
|
|
status: 'failed',
|
|
serverName: client.name,
|
|
toolName: tool.name,
|
|
elapsedTimeMs: Date.now() - startTime,
|
|
},
|
|
})
|
|
}
|
|
// Wrap MCP SDK errors so telemetry gets useful context
|
|
// instead of just "Error" or "McpError" (the constructor
|
|
// name). MCP SDK errors are protocol-level messages and
|
|
// don't contain user file paths or code.
|
|
if (
|
|
error instanceof Error &&
|
|
!(
|
|
error instanceof
|
|
TelemetrySafeError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS
|
|
)
|
|
) {
|
|
const name = error.constructor.name
|
|
if (name === 'Error') {
|
|
throw new TelemetrySafeError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS(
|
|
error.message,
|
|
error.message.slice(0, 200),
|
|
)
|
|
}
|
|
// McpError has a numeric `code` with the JSON-RPC error
|
|
// code (e.g. -32000 ConnectionClosed, -32001 RequestTimeout)
|
|
if (
|
|
name === 'McpError' &&
|
|
'code' in error &&
|
|
typeof error.code === 'number'
|
|
) {
|
|
throw new TelemetrySafeError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS(
|
|
error.message,
|
|
`McpError ${error.code}`,
|
|
)
|
|
}
|
|
}
|
|
throw error
|
|
}
|
|
}
|
|
},
|
|
userFacingName() {
|
|
// Prefer title annotation if available, otherwise use tool name
|
|
const displayName = tool.annotations?.title || tool.name
|
|
return `${client.name} - ${displayName} (MCP)`
|
|
},
|
|
...(isClaudeInChromeMCPServer(client.name) &&
|
|
(client.config.type === 'stdio' || !client.config.type)
|
|
? claudeInChromeToolRendering().getClaudeInChromeMCPToolOverrides(
|
|
tool.name,
|
|
)
|
|
: {}),
|
|
...(feature('CHICAGO_MCP') &&
|
|
(client.config.type === 'stdio' || !client.config.type) &&
|
|
isComputerUseMCPServer!(client.name)
|
|
? computerUseWrapper!().getComputerUseMCPToolOverrides(tool.name)
|
|
: {}),
|
|
}
|
|
})
|
|
.filter(isIncludedMcpTool)
|
|
} catch (error) {
|
|
logMCPError(client.name, `Failed to fetch tools: ${errorMessage(error)}`)
|
|
return []
|
|
}
|
|
},
|
|
(client: MCPServerConnection) => client.name,
|
|
MCP_FETCH_CACHE_SIZE,
|
|
)
|
|
|
|
export const fetchResourcesForClient = memoizeWithLRU(
|
|
async (client: MCPServerConnection): Promise<ServerResource[]> => {
|
|
if (client.type !== 'connected') return []
|
|
|
|
try {
|
|
if (!client.capabilities?.resources) {
|
|
return []
|
|
}
|
|
|
|
const result = await client.client.request(
|
|
{ method: 'resources/list' },
|
|
ListResourcesResultSchema,
|
|
)
|
|
|
|
if (!result.resources) return []
|
|
|
|
// Add server name to each resource
|
|
return result.resources.map(resource => ({
|
|
...resource,
|
|
server: client.name,
|
|
}))
|
|
} catch (error) {
|
|
logMCPError(
|
|
client.name,
|
|
`Failed to fetch resources: ${errorMessage(error)}`,
|
|
)
|
|
return []
|
|
}
|
|
},
|
|
(client: MCPServerConnection) => client.name,
|
|
MCP_FETCH_CACHE_SIZE,
|
|
)
|
|
|
|
export const fetchCommandsForClient = memoizeWithLRU(
|
|
async (client: MCPServerConnection): Promise<Command[]> => {
|
|
if (client.type !== 'connected') return []
|
|
|
|
try {
|
|
if (!client.capabilities?.prompts) {
|
|
return []
|
|
}
|
|
|
|
// Request prompts list from client
|
|
const result = (await client.client.request(
|
|
{ method: 'prompts/list' },
|
|
ListPromptsResultSchema,
|
|
)) as ListPromptsResult
|
|
|
|
if (!result.prompts) return []
|
|
|
|
// Sanitize prompt data from MCP server
|
|
const promptsToProcess = recursivelySanitizeUnicode(result.prompts)
|
|
|
|
// Convert MCP prompts to our Command format
|
|
return promptsToProcess.map(prompt => {
|
|
const argNames = Object.values(prompt.arguments ?? {}).map(k => k.name)
|
|
return {
|
|
type: 'prompt' as const,
|
|
name: 'mcp__' + normalizeNameForMCP(client.name) + '__' + prompt.name,
|
|
description: prompt.description ?? '',
|
|
hasUserSpecifiedDescription: !!prompt.description,
|
|
contentLength: 0, // Dynamic MCP content
|
|
isEnabled: () => true,
|
|
isHidden: false,
|
|
isMcp: true,
|
|
progressMessage: 'running',
|
|
userFacingName() {
|
|
// Use prompt.name (programmatic identifier) not prompt.title (display name)
|
|
// to avoid spaces breaking slash command parsing
|
|
return `${client.name}:${prompt.name} (MCP)`
|
|
},
|
|
argNames,
|
|
source: 'mcp',
|
|
async getPromptForCommand(args: string) {
|
|
const argsArray = args.split(' ')
|
|
try {
|
|
const connectedClient = await ensureConnectedClient(client)
|
|
const result = await connectedClient.client.getPrompt({
|
|
name: prompt.name,
|
|
arguments: zipObject(argNames, argsArray),
|
|
})
|
|
const transformed = await Promise.all(
|
|
result.messages.map(message =>
|
|
transformResultContent(message.content, connectedClient.name),
|
|
),
|
|
)
|
|
return transformed.flat()
|
|
} catch (error) {
|
|
logMCPError(
|
|
client.name,
|
|
`Error running command '${prompt.name}': ${errorMessage(error)}`,
|
|
)
|
|
throw error
|
|
}
|
|
},
|
|
}
|
|
})
|
|
} catch (error) {
|
|
logMCPError(
|
|
client.name,
|
|
`Failed to fetch commands: ${errorMessage(error)}`,
|
|
)
|
|
return []
|
|
}
|
|
},
|
|
(client: MCPServerConnection) => client.name,
|
|
MCP_FETCH_CACHE_SIZE,
|
|
)
|
|
|
|
/**
|
|
* Call an IDE tool directly as an RPC
|
|
* @param toolName The name of the tool to call
|
|
* @param args The arguments to pass to the tool
|
|
* @param client The IDE client to use for the RPC call
|
|
* @returns The result of the tool call
|
|
*/
|
|
export async function callIdeRpc(
|
|
toolName: string,
|
|
args: Record<string, unknown>,
|
|
client: ConnectedMCPServer,
|
|
): Promise<string | ContentBlockParam[] | undefined> {
|
|
const result = await callMCPTool({
|
|
client,
|
|
tool: toolName,
|
|
args,
|
|
signal: createAbortController().signal,
|
|
})
|
|
return result.content
|
|
}
|
|
|
|
/**
|
|
* Note: This should not be called by UI components directly, they should use the reconnectMcpServer
|
|
* function from useManageMcpConnections.
|
|
* @param name Server name
|
|
* @param config Server configuration
|
|
* @returns Object containing the client connection and its resources
|
|
*/
|
|
export async function reconnectMcpServerImpl(
|
|
name: string,
|
|
config: ScopedMcpServerConfig,
|
|
): Promise<{
|
|
client: MCPServerConnection
|
|
tools: Tool[]
|
|
commands: Command[]
|
|
resources?: ServerResource[]
|
|
}> {
|
|
try {
|
|
// Invalidate the keychain cache so we read fresh credentials from disk.
|
|
// This is necessary when another process (e.g. the VS Code extension host)
|
|
// has modified stored tokens (cleared auth, saved new OAuth tokens) and then
|
|
// asks the CLI subprocess to reconnect. Without this, the subprocess would
|
|
// use stale cached data and never notice the tokens were removed.
|
|
clearKeychainCache()
|
|
|
|
await clearServerCache(name, config)
|
|
const client = await connectToServer(name, config)
|
|
|
|
if (client.type !== 'connected') {
|
|
return {
|
|
client,
|
|
tools: [],
|
|
commands: [],
|
|
}
|
|
}
|
|
|
|
if (config.type === 'claudeai-proxy') {
|
|
markClaudeAiMcpConnected(name)
|
|
}
|
|
|
|
const supportsResources = !!client.capabilities?.resources
|
|
|
|
const [tools, mcpCommands, mcpSkills, resources] = await Promise.all([
|
|
fetchToolsForClient(client),
|
|
fetchCommandsForClient(client),
|
|
feature('MCP_SKILLS') && supportsResources
|
|
? fetchMcpSkillsForClient!(client)
|
|
: Promise.resolve([]),
|
|
supportsResources ? fetchResourcesForClient(client) : Promise.resolve([]),
|
|
])
|
|
const commands = [...mcpCommands, ...mcpSkills]
|
|
|
|
// Check if we need to add resource tools
|
|
const resourceTools: Tool[] = []
|
|
if (supportsResources) {
|
|
// Only add resource tools if no other server has them
|
|
const hasResourceTools = [ListMcpResourcesTool, ReadMcpResourceTool].some(
|
|
tool => tools.some(t => toolMatchesName(t, tool.name)),
|
|
)
|
|
if (!hasResourceTools) {
|
|
resourceTools.push(ListMcpResourcesTool, ReadMcpResourceTool)
|
|
}
|
|
}
|
|
|
|
return {
|
|
client,
|
|
tools: [...tools, ...resourceTools],
|
|
commands,
|
|
resources: resources.length > 0 ? resources : undefined,
|
|
}
|
|
} catch (error) {
|
|
// Handle errors gracefully - connection might have closed during fetch
|
|
logMCPError(name, `Error during reconnection: ${errorMessage(error)}`)
|
|
|
|
// Return with failed status
|
|
return {
|
|
client: { name, type: 'failed' as const, config },
|
|
tools: [],
|
|
commands: [],
|
|
}
|
|
}
|
|
}
|
|
|
|
// Replaced 2026-03: previous implementation ran fixed-size sequential batches
|
|
// (await batch 1 fully, then start batch 2). That meant one slow server in
|
|
// batch N held up ALL servers in batch N+1, even if the other 19 slots were
|
|
// idle. pMap frees each slot as soon as its server completes, so a single
|
|
// slow server only occupies one slot instead of blocking an entire batch
|
|
// boundary. Same concurrency ceiling, same results, better scheduling.
|
|
async function processBatched<T>(
|
|
items: T[],
|
|
concurrency: number,
|
|
processor: (item: T) => Promise<void>,
|
|
): Promise<void> {
|
|
await pMap(items, processor, { concurrency })
|
|
}
|
|
|
|
export async function getMcpToolsCommandsAndResources(
|
|
onConnectionAttempt: (params: {
|
|
client: MCPServerConnection
|
|
tools: Tool[]
|
|
commands: Command[]
|
|
resources?: ServerResource[]
|
|
}) => void,
|
|
mcpConfigs?: Record<string, ScopedMcpServerConfig>,
|
|
): Promise<void> {
|
|
let resourceToolsAdded = false
|
|
|
|
const allConfigEntries = Object.entries(
|
|
mcpConfigs ?? (await getAllMcpConfigs()).servers,
|
|
)
|
|
|
|
// Partition into disabled and active entries — disabled servers should
|
|
// never generate HTTP connections or flow through batch processing
|
|
const configEntries: typeof allConfigEntries = []
|
|
for (const entry of allConfigEntries) {
|
|
if (isMcpServerDisabled(entry[0])) {
|
|
onConnectionAttempt({
|
|
client: { name: entry[0], type: 'disabled', config: entry[1] },
|
|
tools: [],
|
|
commands: [],
|
|
})
|
|
} else {
|
|
configEntries.push(entry)
|
|
}
|
|
}
|
|
|
|
// Calculate transport counts for logging
|
|
const totalServers = configEntries.length
|
|
const stdioCount = count(configEntries, ([_, c]) => c.type === 'stdio')
|
|
const sseCount = count(configEntries, ([_, c]) => c.type === 'sse')
|
|
const httpCount = count(configEntries, ([_, c]) => c.type === 'http')
|
|
const sseIdeCount = count(configEntries, ([_, c]) => c.type === 'sse-ide')
|
|
const wsIdeCount = count(configEntries, ([_, c]) => c.type === 'ws-ide')
|
|
|
|
// Split servers by type: local (stdio/sdk) need lower concurrency due to
|
|
// process spawning, remote servers can connect with higher concurrency
|
|
const localServers = configEntries.filter(([_, config]) =>
|
|
isLocalMcpServer(config),
|
|
)
|
|
const remoteServers = configEntries.filter(
|
|
([_, config]) => !isLocalMcpServer(config),
|
|
)
|
|
|
|
const serverStats = {
|
|
totalServers,
|
|
stdioCount,
|
|
sseCount,
|
|
httpCount,
|
|
sseIdeCount,
|
|
wsIdeCount,
|
|
}
|
|
|
|
const processServer = async ([name, config]: [
|
|
string,
|
|
ScopedMcpServerConfig,
|
|
]): Promise<void> => {
|
|
try {
|
|
// Check if server is disabled - if so, just add it to state without connecting
|
|
if (isMcpServerDisabled(name)) {
|
|
onConnectionAttempt({
|
|
client: {
|
|
name,
|
|
type: 'disabled',
|
|
config,
|
|
},
|
|
tools: [],
|
|
commands: [],
|
|
})
|
|
return
|
|
}
|
|
|
|
// Skip connection for servers that recently returned 401 (15min TTL),
|
|
// or that we have probed before but hold no token for. The second
|
|
// check closes the gap the TTL leaves open: without it, every 15min
|
|
// we re-probe servers that cannot succeed until the user runs /mcp.
|
|
// Each probe is a network round-trip for connect-401 plus OAuth
|
|
// discovery, and print mode awaits the whole batch (main.tsx:3503).
|
|
if (
|
|
(config.type === 'claudeai-proxy' ||
|
|
config.type === 'http' ||
|
|
config.type === 'sse') &&
|
|
((await isMcpAuthCached(name)) ||
|
|
((config.type === 'http' || config.type === 'sse') &&
|
|
hasMcpDiscoveryButNoToken(name, config)))
|
|
) {
|
|
logMCPDebug(name, `Skipping connection (cached needs-auth)`)
|
|
onConnectionAttempt({
|
|
client: { name, type: 'needs-auth' as const, config },
|
|
tools: [createMcpAuthTool(name, config)],
|
|
commands: [],
|
|
})
|
|
return
|
|
}
|
|
|
|
const client = await connectToServer(name, config, serverStats)
|
|
|
|
if (client.type !== 'connected') {
|
|
onConnectionAttempt({
|
|
client,
|
|
tools:
|
|
client.type === 'needs-auth'
|
|
? [createMcpAuthTool(name, config)]
|
|
: [],
|
|
commands: [],
|
|
})
|
|
return
|
|
}
|
|
|
|
if (config.type === 'claudeai-proxy') {
|
|
markClaudeAiMcpConnected(name)
|
|
}
|
|
|
|
const supportsResources = !!client.capabilities?.resources
|
|
|
|
const [tools, mcpCommands, mcpSkills, resources] = await Promise.all([
|
|
fetchToolsForClient(client),
|
|
fetchCommandsForClient(client),
|
|
// Discover skills from skill:// resources
|
|
feature('MCP_SKILLS') && supportsResources
|
|
? fetchMcpSkillsForClient!(client)
|
|
: Promise.resolve([]),
|
|
// Fetch resources if supported
|
|
supportsResources
|
|
? fetchResourcesForClient(client)
|
|
: Promise.resolve([]),
|
|
])
|
|
const commands = [...mcpCommands, ...mcpSkills]
|
|
|
|
// If this server resources and we haven't added resource tools yet,
|
|
// include our resource tools with this client's tools
|
|
const resourceTools: Tool[] = []
|
|
if (supportsResources && !resourceToolsAdded) {
|
|
resourceToolsAdded = true
|
|
resourceTools.push(ListMcpResourcesTool, ReadMcpResourceTool)
|
|
}
|
|
|
|
onConnectionAttempt({
|
|
client,
|
|
tools: [...tools, ...resourceTools],
|
|
commands,
|
|
resources: resources.length > 0 ? resources : undefined,
|
|
})
|
|
} catch (error) {
|
|
// Handle errors gracefully - connection might have closed during fetch
|
|
logMCPError(
|
|
name,
|
|
`Error fetching tools/commands/resources: ${errorMessage(error)}`,
|
|
)
|
|
|
|
// Still update with the client but no tools/commands
|
|
onConnectionAttempt({
|
|
client: { name, type: 'failed' as const, config },
|
|
tools: [],
|
|
commands: [],
|
|
})
|
|
}
|
|
}
|
|
|
|
// Process both groups concurrently, each with their own concurrency limits:
|
|
// - Local servers (stdio/sdk): lower concurrency to avoid process spawning resource contention
|
|
// - Remote servers: higher concurrency since they're just network connections
|
|
await Promise.all([
|
|
processBatched(
|
|
localServers,
|
|
getMcpServerConnectionBatchSize(),
|
|
processServer,
|
|
),
|
|
processBatched(
|
|
remoteServers,
|
|
getRemoteMcpServerConnectionBatchSize(),
|
|
processServer,
|
|
),
|
|
])
|
|
}
|
|
|
|
// Not memoized: called only 2-3 times at startup/reconfig. The inner work
|
|
// (connectToServer, fetch*ForClient) is already cached. Memoizing here by
|
|
// mcpConfigs object ref leaked — main.tsx creates fresh config objects each call.
|
|
export function prefetchAllMcpResources(
|
|
mcpConfigs: Record<string, ScopedMcpServerConfig>,
|
|
): Promise<{
|
|
clients: MCPServerConnection[]
|
|
tools: Tool[]
|
|
commands: Command[]
|
|
}> {
|
|
return new Promise(resolve => {
|
|
let pendingCount = 0
|
|
let completedCount = 0
|
|
|
|
pendingCount = Object.keys(mcpConfigs).length
|
|
|
|
if (pendingCount === 0) {
|
|
void resolve({
|
|
clients: [],
|
|
tools: [],
|
|
commands: [],
|
|
})
|
|
return
|
|
}
|
|
|
|
const clients: MCPServerConnection[] = []
|
|
const tools: Tool[] = []
|
|
const commands: Command[] = []
|
|
|
|
getMcpToolsCommandsAndResources(result => {
|
|
clients.push(result.client)
|
|
tools.push(...result.tools)
|
|
commands.push(...result.commands)
|
|
|
|
completedCount++
|
|
if (completedCount >= pendingCount) {
|
|
const commandsMetadataLength = commands.reduce((sum, command) => {
|
|
const commandMetadataLength =
|
|
command.name.length +
|
|
(command.description ?? '').length +
|
|
(command.argumentHint ?? '').length
|
|
return sum + commandMetadataLength
|
|
}, 0)
|
|
logEvent('tengu_mcp_tools_commands_loaded', {
|
|
tools_count: tools.length,
|
|
commands_count: commands.length,
|
|
commands_metadata_length: commandsMetadataLength,
|
|
})
|
|
|
|
void resolve({
|
|
clients,
|
|
tools,
|
|
commands,
|
|
})
|
|
}
|
|
}, mcpConfigs).catch(error => {
|
|
logMCPError(
|
|
'prefetchAllMcpResources',
|
|
`Failed to get MCP resources: ${errorMessage(error)}`,
|
|
)
|
|
// Still resolve with empty results
|
|
void resolve({
|
|
clients: [],
|
|
tools: [],
|
|
commands: [],
|
|
})
|
|
})
|
|
})
|
|
}
|
|
|
|
/**
|
|
* Transform result content from an MCP tool or MCP prompt into message blocks
|
|
*/
|
|
export async function transformResultContent(
|
|
resultContent: PromptMessage['content'],
|
|
serverName: string,
|
|
): Promise<Array<ContentBlockParam>> {
|
|
switch (resultContent.type) {
|
|
case 'text':
|
|
return [
|
|
{
|
|
type: 'text',
|
|
text: resultContent.text,
|
|
},
|
|
]
|
|
case 'audio': {
|
|
const audioData = resultContent as {
|
|
type: 'audio'
|
|
data: string
|
|
mimeType?: string
|
|
}
|
|
return await persistBlobToTextBlock(
|
|
Buffer.from(audioData.data, 'base64'),
|
|
audioData.mimeType,
|
|
serverName,
|
|
`[Audio from ${serverName}] `,
|
|
)
|
|
}
|
|
case 'image': {
|
|
// Resize and compress image data, enforcing API dimension limits
|
|
const imageBuffer = Buffer.from(String(resultContent.data), 'base64')
|
|
const ext = resultContent.mimeType?.split('/')[1] || 'png'
|
|
const resized = await maybeResizeAndDownsampleImageBuffer(
|
|
imageBuffer,
|
|
imageBuffer.length,
|
|
ext,
|
|
)
|
|
return [
|
|
{
|
|
type: 'image',
|
|
source: {
|
|
data: resized.buffer.toString('base64'),
|
|
media_type:
|
|
`image/${resized.mediaType}` as Base64ImageSource['media_type'],
|
|
type: 'base64',
|
|
},
|
|
},
|
|
]
|
|
}
|
|
case 'resource': {
|
|
const resource = resultContent.resource
|
|
const prefix = `[Resource from ${serverName} at ${resource.uri}] `
|
|
|
|
if ('text' in resource) {
|
|
return [
|
|
{
|
|
type: 'text',
|
|
text: `${prefix}${resource.text}`,
|
|
},
|
|
]
|
|
} else if ('blob' in resource) {
|
|
const isImage = IMAGE_MIME_TYPES.has(resource.mimeType ?? '')
|
|
|
|
if (isImage) {
|
|
// Resize and compress image blob, enforcing API dimension limits
|
|
const imageBuffer = Buffer.from(resource.blob, 'base64')
|
|
const ext = resource.mimeType?.split('/')[1] || 'png'
|
|
const resized = await maybeResizeAndDownsampleImageBuffer(
|
|
imageBuffer,
|
|
imageBuffer.length,
|
|
ext,
|
|
)
|
|
const content: MessageParam['content'] = []
|
|
if (prefix) {
|
|
content.push({
|
|
type: 'text',
|
|
text: prefix,
|
|
})
|
|
}
|
|
content.push({
|
|
type: 'image',
|
|
source: {
|
|
data: resized.buffer.toString('base64'),
|
|
media_type:
|
|
`image/${resized.mediaType}` as Base64ImageSource['media_type'],
|
|
type: 'base64',
|
|
},
|
|
})
|
|
return content
|
|
} else {
|
|
return await persistBlobToTextBlock(
|
|
Buffer.from(resource.blob, 'base64'),
|
|
resource.mimeType,
|
|
serverName,
|
|
prefix,
|
|
)
|
|
}
|
|
}
|
|
return []
|
|
}
|
|
case 'resource_link': {
|
|
const resourceLink = resultContent as ResourceLink
|
|
let text = `[Resource link: ${resourceLink.name}] ${resourceLink.uri}`
|
|
if (resourceLink.description) {
|
|
text += ` (${resourceLink.description})`
|
|
}
|
|
return [
|
|
{
|
|
type: 'text',
|
|
text,
|
|
},
|
|
]
|
|
}
|
|
default:
|
|
return []
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Decode base64 binary content, write it to disk with the proper extension,
|
|
* and return a small text block with the file path. Replaces the old behavior
|
|
* of dumping raw base64 into the context.
|
|
*/
|
|
async function persistBlobToTextBlock(
|
|
bytes: Buffer,
|
|
mimeType: string | undefined,
|
|
serverName: string,
|
|
sourceDescription: string,
|
|
): Promise<Array<ContentBlockParam>> {
|
|
const persistId = `mcp-${normalizeNameForMCP(serverName)}-blob-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`
|
|
const result = await persistBinaryContent(bytes, mimeType, persistId)
|
|
|
|
if ('error' in result) {
|
|
return [
|
|
{
|
|
type: 'text',
|
|
text: `${sourceDescription}Binary content (${mimeType || 'unknown type'}, ${bytes.length} bytes) could not be saved to disk: ${result.error}`,
|
|
},
|
|
]
|
|
}
|
|
|
|
return [
|
|
{
|
|
type: 'text',
|
|
text: getBinaryBlobSavedMessage(
|
|
result.filepath,
|
|
mimeType,
|
|
result.size,
|
|
sourceDescription,
|
|
),
|
|
},
|
|
]
|
|
}
|
|
|
|
/**
|
|
* Processes MCP tool result into a normalized format.
|
|
*/
|
|
export type MCPResultType = 'toolResult' | 'structuredContent' | 'contentArray'
|
|
|
|
export type TransformedMCPResult = {
|
|
content: MCPToolResult
|
|
type: MCPResultType
|
|
schema?: string
|
|
}
|
|
|
|
/**
|
|
* Generates a compact, jq-friendly type signature for a value.
|
|
* e.g. "{title: string, items: [{id: number, name: string}]}"
|
|
*/
|
|
export function inferCompactSchema(value: unknown, depth = 2): string {
|
|
if (value === null) return 'null'
|
|
if (Array.isArray(value)) {
|
|
if (value.length === 0) return '[]'
|
|
return `[${inferCompactSchema(value[0], depth - 1)}]`
|
|
}
|
|
if (typeof value === 'object') {
|
|
if (depth <= 0) return '{...}'
|
|
const entries = Object.entries(value).slice(0, 10)
|
|
const props = entries.map(
|
|
([k, v]) => `${k}: ${inferCompactSchema(v, depth - 1)}`,
|
|
)
|
|
const suffix = Object.keys(value).length > 10 ? ', ...' : ''
|
|
return `{${props.join(', ')}${suffix}}`
|
|
}
|
|
return typeof value
|
|
}
|
|
|
|
export async function transformMCPResult(
|
|
result: unknown,
|
|
tool: string, // Tool name for validation (e.g., "search")
|
|
name: string, // Server name for transformation (e.g., "slack")
|
|
): Promise<TransformedMCPResult> {
|
|
if (result && typeof result === 'object') {
|
|
if ('toolResult' in result) {
|
|
return {
|
|
content: String(result.toolResult),
|
|
type: 'toolResult',
|
|
}
|
|
}
|
|
|
|
if (
|
|
'structuredContent' in result &&
|
|
result.structuredContent !== undefined
|
|
) {
|
|
return {
|
|
content: jsonStringify(result.structuredContent),
|
|
type: 'structuredContent',
|
|
schema: inferCompactSchema(result.structuredContent),
|
|
}
|
|
}
|
|
|
|
if ('content' in result && Array.isArray(result.content)) {
|
|
const transformedContent = (
|
|
await Promise.all(
|
|
result.content.map(item => transformResultContent(item, name)),
|
|
)
|
|
).flat()
|
|
return {
|
|
content: transformedContent,
|
|
type: 'contentArray',
|
|
schema: inferCompactSchema(transformedContent),
|
|
}
|
|
}
|
|
}
|
|
|
|
const errorMessage = `MCP server "${name}" tool "${tool}": unexpected response format`
|
|
logMCPError(name, errorMessage)
|
|
throw new TelemetrySafeError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS(
|
|
errorMessage,
|
|
'MCP tool unexpected response format',
|
|
)
|
|
}
|
|
|
|
/**
|
|
* Check if MCP content contains any image blocks.
|
|
* Used to decide whether to persist to file (images should use truncation instead
|
|
* to preserve image compression and viewability).
|
|
*/
|
|
function contentContainsImages(content: MCPToolResult): boolean {
|
|
if (!content || typeof content === 'string') {
|
|
return false
|
|
}
|
|
return content.some(block => block.type === 'image')
|
|
}
|
|
|
|
export async function processMCPResult(
|
|
result: unknown,
|
|
tool: string, // Tool name for validation (e.g., "search")
|
|
name: string, // Server name for IDE check and transformation (e.g., "slack")
|
|
): Promise<MCPToolResult> {
|
|
const { content, type, schema } = await transformMCPResult(result, tool, name)
|
|
|
|
// IDE tools are not going to the model directly, so we don't need to
|
|
// handle large output.
|
|
if (name === 'ide') {
|
|
return content
|
|
}
|
|
|
|
// Check if content needs truncation (i.e., is too large)
|
|
if (!(await mcpContentNeedsTruncation(content))) {
|
|
return content
|
|
}
|
|
|
|
const sizeEstimateTokens = getContentSizeEstimate(content)
|
|
|
|
// If large output files feature is disabled, fall back to old truncation behavior
|
|
if (isEnvDefinedFalsy(process.env.ENABLE_MCP_LARGE_OUTPUT_FILES)) {
|
|
logEvent('tengu_mcp_large_result_handled', {
|
|
outcome: 'truncated',
|
|
reason: 'env_disabled',
|
|
sizeEstimateTokens,
|
|
} as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)
|
|
return await truncateMcpContentIfNeeded(content)
|
|
}
|
|
|
|
// Save large output to file and return instructions for reading it
|
|
// Content is guaranteed to exist at this point (we checked mcpContentNeedsTruncation)
|
|
if (!content) {
|
|
return content
|
|
}
|
|
|
|
// If content contains images, fall back to truncation - persisting images as JSON
|
|
// defeats the image compression logic and makes them non-viewable
|
|
if (contentContainsImages(content)) {
|
|
logEvent('tengu_mcp_large_result_handled', {
|
|
outcome: 'truncated',
|
|
reason: 'contains_images',
|
|
sizeEstimateTokens,
|
|
} as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)
|
|
return await truncateMcpContentIfNeeded(content)
|
|
}
|
|
|
|
// Generate a unique ID for the persisted file (server__tool-timestamp)
|
|
const timestamp = Date.now()
|
|
const persistId = `mcp-${normalizeNameForMCP(name)}-${normalizeNameForMCP(tool)}-${timestamp}`
|
|
// Convert to string for persistence (persistToolResult expects string or specific block types)
|
|
const contentStr =
|
|
typeof content === 'string' ? content : jsonStringify(content, null, 2)
|
|
const persistResult = await persistToolResult(contentStr, persistId)
|
|
|
|
if (isPersistError(persistResult)) {
|
|
// If file save failed, fall back to returning truncated content info
|
|
const contentLength = contentStr.length
|
|
logEvent('tengu_mcp_large_result_handled', {
|
|
outcome: 'truncated',
|
|
reason: 'persist_failed',
|
|
sizeEstimateTokens,
|
|
} as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)
|
|
return `Error: result (${contentLength.toLocaleString()} characters) exceeds maximum allowed tokens. Failed to save output to file: ${persistResult.error}. If this MCP server provides pagination or filtering tools, use them to retrieve specific portions of the data.`
|
|
}
|
|
|
|
logEvent('tengu_mcp_large_result_handled', {
|
|
outcome: 'persisted',
|
|
reason: 'file_saved',
|
|
sizeEstimateTokens,
|
|
persistedSizeChars: persistResult.originalSize,
|
|
} as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)
|
|
|
|
const formatDescription = getFormatDescription(type, schema)
|
|
return getLargeOutputInstructions(
|
|
persistResult.filepath,
|
|
persistResult.originalSize,
|
|
formatDescription,
|
|
)
|
|
}
|
|
|
|
/**
|
|
* Call an MCP tool, handling UrlElicitationRequiredError (-32042) by
|
|
* displaying the URL elicitation to the user, waiting for the completion
|
|
* notification, and retrying the tool call.
|
|
*/
|
|
type MCPToolCallResult = {
|
|
content: MCPToolResult
|
|
_meta?: Record<string, unknown>
|
|
structuredContent?: Record<string, unknown>
|
|
}
|
|
|
|
/** @internal Exported for testing. */
|
|
export async function callMCPToolWithUrlElicitationRetry({
|
|
client: connectedClient,
|
|
clientConnection,
|
|
tool,
|
|
args,
|
|
meta,
|
|
signal,
|
|
setAppState,
|
|
onProgress,
|
|
callToolFn = callMCPTool,
|
|
handleElicitation,
|
|
}: {
|
|
client: ConnectedMCPServer
|
|
clientConnection: MCPServerConnection
|
|
tool: string
|
|
args: Record<string, unknown>
|
|
meta?: Record<string, unknown>
|
|
signal: AbortSignal
|
|
setAppState: (f: (prev: AppState) => AppState) => void
|
|
onProgress?: (data: MCPProgress) => void
|
|
/** Injectable for testing. Defaults to callMCPTool. */
|
|
callToolFn?: (opts: {
|
|
client: ConnectedMCPServer
|
|
tool: string
|
|
args: Record<string, unknown>
|
|
meta?: Record<string, unknown>
|
|
signal: AbortSignal
|
|
onProgress?: (data: MCPProgress) => void
|
|
}) => Promise<MCPToolCallResult>
|
|
/** Handler for URL elicitations when no hook handles them.
|
|
* In print/SDK mode, delegates to structuredIO. In REPL, falls back to queue. */
|
|
handleElicitation?: (
|
|
serverName: string,
|
|
params: ElicitRequestURLParams,
|
|
signal: AbortSignal,
|
|
) => Promise<ElicitResult>
|
|
}): Promise<MCPToolCallResult> {
|
|
const MAX_URL_ELICITATION_RETRIES = 3
|
|
for (let attempt = 0; ; attempt++) {
|
|
try {
|
|
return await callToolFn({
|
|
client: connectedClient,
|
|
tool,
|
|
args,
|
|
meta,
|
|
signal,
|
|
onProgress,
|
|
})
|
|
} catch (error) {
|
|
// The MCP SDK's Protocol creates plain McpError (not UrlElicitationRequiredError)
|
|
// for error responses, so we check the error code instead of instanceof.
|
|
if (
|
|
!(error instanceof McpError) ||
|
|
error.code !== ErrorCode.UrlElicitationRequired
|
|
) {
|
|
throw error
|
|
}
|
|
|
|
// Limit the number of URL elicitation retries
|
|
if (attempt >= MAX_URL_ELICITATION_RETRIES) {
|
|
throw error
|
|
}
|
|
|
|
const errorData = error.data
|
|
const rawElicitations =
|
|
errorData != null &&
|
|
typeof errorData === 'object' &&
|
|
'elicitations' in errorData &&
|
|
Array.isArray(errorData.elicitations)
|
|
? (errorData.elicitations as unknown[])
|
|
: []
|
|
|
|
// Validate each element has the required fields for ElicitRequestURLParams
|
|
const elicitations = rawElicitations.filter(
|
|
(e): e is ElicitRequestURLParams => {
|
|
if (e == null || typeof e !== 'object') return false
|
|
const obj = e as Record<string, unknown>
|
|
return (
|
|
obj.mode === 'url' &&
|
|
typeof obj.url === 'string' &&
|
|
typeof obj.elicitationId === 'string' &&
|
|
typeof obj.message === 'string'
|
|
)
|
|
},
|
|
)
|
|
|
|
const serverName =
|
|
clientConnection.type === 'connected'
|
|
? clientConnection.name
|
|
: 'unknown'
|
|
|
|
if (elicitations.length === 0) {
|
|
logMCPDebug(
|
|
serverName,
|
|
`Tool '${tool}' returned -32042 but no valid elicitations in error data`,
|
|
)
|
|
throw error
|
|
}
|
|
|
|
logMCPDebug(
|
|
serverName,
|
|
`Tool '${tool}' requires URL elicitation (error -32042, attempt ${attempt + 1}), processing ${elicitations.length} elicitation(s)`,
|
|
)
|
|
|
|
// Process each URL elicitation from the error.
|
|
// The completion notification handler (in registerElicitationHandler) sets
|
|
// `completed: true` on the matching queue event; the dialog reacts to this flag.
|
|
for (const elicitation of elicitations) {
|
|
const { elicitationId } = elicitation
|
|
|
|
// Run elicitation hooks — they can resolve URL elicitations programmatically
|
|
const hookResponse = await runElicitationHooks(
|
|
serverName,
|
|
elicitation,
|
|
signal,
|
|
)
|
|
if (hookResponse) {
|
|
logMCPDebug(
|
|
serverName,
|
|
`URL elicitation ${elicitationId} resolved by hook: ${jsonStringify(hookResponse)}`,
|
|
)
|
|
if (hookResponse.action !== 'accept') {
|
|
return {
|
|
content: `URL elicitation was ${hookResponse.action === 'decline' ? 'declined' : hookResponse.action + 'ed'} by a hook. The tool "${tool}" could not complete because it requires the user to open a URL.`,
|
|
}
|
|
}
|
|
// Hook accepted — skip the UI and proceed to retry
|
|
continue
|
|
}
|
|
|
|
// Resolve the URL elicitation via callback (print/SDK mode) or queue (REPL mode).
|
|
let userResult: ElicitResult
|
|
if (handleElicitation) {
|
|
// Print/SDK mode: delegate to structuredIO which sends a control request
|
|
userResult = await handleElicitation(serverName, elicitation, signal)
|
|
} else {
|
|
// REPL mode: queue for ElicitationDialog with two-phase consent/waiting flow
|
|
const waitingState: ElicitationWaitingState = {
|
|
actionLabel: 'Retry now',
|
|
showCancel: true,
|
|
}
|
|
userResult = await new Promise<ElicitResult>(resolve => {
|
|
const onAbort = () => {
|
|
void resolve({ action: 'cancel' })
|
|
}
|
|
if (signal.aborted) {
|
|
onAbort()
|
|
return
|
|
}
|
|
signal.addEventListener('abort', onAbort, { once: true })
|
|
|
|
setAppState(prev => ({
|
|
...prev,
|
|
elicitation: {
|
|
queue: [
|
|
...prev.elicitation.queue,
|
|
{
|
|
serverName,
|
|
requestId: `error-elicit-${elicitationId}`,
|
|
params: elicitation,
|
|
signal,
|
|
waitingState,
|
|
respond: result => {
|
|
// Phase 1 consent: accept is a no-op (doesn't resolve retry Promise)
|
|
if (result.action === 'accept') {
|
|
return
|
|
}
|
|
// Decline or cancel: resolve the retry Promise
|
|
signal.removeEventListener('abort', onAbort)
|
|
void resolve(result)
|
|
},
|
|
onWaitingDismiss: action => {
|
|
signal.removeEventListener('abort', onAbort)
|
|
if (action === 'retry') {
|
|
void resolve({ action: 'accept' })
|
|
} else {
|
|
void resolve({ action: 'cancel' })
|
|
}
|
|
},
|
|
},
|
|
],
|
|
},
|
|
}))
|
|
})
|
|
}
|
|
|
|
// Run ElicitationResult hooks — they can modify or block the response
|
|
const finalResult = await runElicitationResultHooks(
|
|
serverName,
|
|
userResult,
|
|
signal,
|
|
'url',
|
|
elicitationId,
|
|
)
|
|
|
|
if (finalResult.action !== 'accept') {
|
|
logMCPDebug(
|
|
serverName,
|
|
`User ${finalResult.action === 'decline' ? 'declined' : finalResult.action + 'ed'} URL elicitation ${elicitationId}`,
|
|
)
|
|
return {
|
|
content: `URL elicitation was ${finalResult.action === 'decline' ? 'declined' : finalResult.action + 'ed'} by the user. The tool "${tool}" could not complete because it requires the user to open a URL.`,
|
|
}
|
|
}
|
|
|
|
logMCPDebug(
|
|
serverName,
|
|
`Elicitation ${elicitationId} completed, retrying tool call`,
|
|
)
|
|
}
|
|
|
|
// Loop back to retry the tool call
|
|
}
|
|
}
|
|
}
|
|
|
|
async function callMCPTool({
|
|
client: { client, name, config },
|
|
tool,
|
|
args,
|
|
meta,
|
|
signal,
|
|
onProgress,
|
|
}: {
|
|
client: ConnectedMCPServer
|
|
tool: string
|
|
args: Record<string, unknown>
|
|
meta?: Record<string, unknown>
|
|
signal: AbortSignal
|
|
onProgress?: (data: MCPProgress) => void
|
|
}): Promise<{
|
|
content: MCPToolResult
|
|
_meta?: Record<string, unknown>
|
|
structuredContent?: Record<string, unknown>
|
|
}> {
|
|
const toolStartTime = Date.now()
|
|
let progressInterval: NodeJS.Timeout | undefined
|
|
|
|
try {
|
|
logMCPDebug(name, `Calling MCP tool: ${tool}`)
|
|
|
|
// Set up progress logging for long-running tools (every 30 seconds)
|
|
progressInterval = setInterval(
|
|
(startTime, name, tool) => {
|
|
const elapsed = Date.now() - startTime
|
|
const elapsedSeconds = Math.floor(elapsed / 1000)
|
|
const duration = `${elapsedSeconds}s`
|
|
logMCPDebug(name, `Tool '${tool}' still running (${duration} elapsed)`)
|
|
},
|
|
30000, // Log every 30 seconds
|
|
toolStartTime,
|
|
name,
|
|
tool,
|
|
)
|
|
|
|
// Use Promise.race with our own timeout to handle cases where SDK's
|
|
// internal timeout doesn't work (e.g., SSE stream breaks mid-request)
|
|
const timeoutMs = getMcpToolTimeoutMs()
|
|
let timeoutId: NodeJS.Timeout | undefined
|
|
|
|
const timeoutPromise = new Promise<never>((_, reject) => {
|
|
timeoutId = setTimeout(
|
|
(reject, name, tool, timeoutMs) => {
|
|
reject(
|
|
new TelemetrySafeError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS(
|
|
`MCP server "${name}" tool "${tool}" timed out after ${Math.floor(timeoutMs / 1000)}s`,
|
|
'MCP tool timeout',
|
|
),
|
|
)
|
|
},
|
|
timeoutMs,
|
|
reject,
|
|
name,
|
|
tool,
|
|
timeoutMs,
|
|
)
|
|
})
|
|
|
|
const result = await Promise.race([
|
|
client.callTool(
|
|
{
|
|
name: tool,
|
|
arguments: args,
|
|
_meta: meta,
|
|
},
|
|
CallToolResultSchema,
|
|
{
|
|
signal,
|
|
timeout: timeoutMs,
|
|
onprogress: onProgress
|
|
? sdkProgress => {
|
|
onProgress({
|
|
type: 'mcp_progress',
|
|
status: 'progress',
|
|
serverName: name,
|
|
toolName: tool,
|
|
progress: sdkProgress.progress,
|
|
total: sdkProgress.total,
|
|
progressMessage: sdkProgress.message,
|
|
})
|
|
}
|
|
: undefined,
|
|
},
|
|
),
|
|
timeoutPromise,
|
|
]).finally(() => {
|
|
if (timeoutId) {
|
|
clearTimeout(timeoutId)
|
|
}
|
|
})
|
|
|
|
if ('isError' in result && result.isError) {
|
|
let errorDetails = 'Unknown error'
|
|
if (
|
|
'content' in result &&
|
|
Array.isArray(result.content) &&
|
|
result.content.length > 0
|
|
) {
|
|
const firstContent = result.content[0]
|
|
if (
|
|
firstContent &&
|
|
typeof firstContent === 'object' &&
|
|
'text' in firstContent
|
|
) {
|
|
errorDetails = firstContent.text
|
|
}
|
|
} else if ('error' in result) {
|
|
// Fallback for legacy error format
|
|
errorDetails = String(result.error)
|
|
}
|
|
logMCPError(name, errorDetails)
|
|
throw new McpToolCallError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS(
|
|
errorDetails,
|
|
'MCP tool returned error',
|
|
'_meta' in result && result._meta ? { _meta: result._meta } : undefined,
|
|
)
|
|
}
|
|
const elapsed = Date.now() - toolStartTime
|
|
const duration =
|
|
elapsed < 1000
|
|
? `${elapsed}ms`
|
|
: elapsed < 60000
|
|
? `${Math.floor(elapsed / 1000)}s`
|
|
: `${Math.floor(elapsed / 60000)}m ${Math.floor((elapsed % 60000) / 1000)}s`
|
|
|
|
logMCPDebug(name, `Tool '${tool}' completed successfully in ${duration}`)
|
|
|
|
// Log code indexing tool usage
|
|
const codeIndexingTool = detectCodeIndexingFromMcpServerName(name)
|
|
if (codeIndexingTool) {
|
|
logEvent('tengu_code_indexing_tool_used', {
|
|
tool: codeIndexingTool as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
|
|
source:
|
|
'mcp' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
|
|
success: true,
|
|
})
|
|
}
|
|
|
|
const content = await processMCPResult(result, tool, name)
|
|
return {
|
|
content,
|
|
_meta: result._meta as Record<string, unknown> | undefined,
|
|
structuredContent: result.structuredContent as
|
|
| Record<string, unknown>
|
|
| undefined,
|
|
}
|
|
} catch (e) {
|
|
// Clear intervals on error
|
|
if (progressInterval !== undefined) {
|
|
clearInterval(progressInterval)
|
|
}
|
|
|
|
const elapsed = Date.now() - toolStartTime
|
|
|
|
if (e instanceof Error && e.name !== 'AbortError') {
|
|
logMCPDebug(
|
|
name,
|
|
`Tool '${tool}' failed after ${Math.floor(elapsed / 1000)}s: ${e.message}`,
|
|
)
|
|
}
|
|
|
|
// Check for 401 errors indicating expired/invalid OAuth tokens
|
|
// The MCP SDK's StreamableHTTPError has a `code` property with the HTTP status
|
|
if (e instanceof Error) {
|
|
const errorCode = 'code' in e ? (e.code as number | undefined) : undefined
|
|
if (errorCode === 401 || e instanceof UnauthorizedError) {
|
|
logMCPDebug(
|
|
name,
|
|
`Tool call returned 401 Unauthorized - token may have expired`,
|
|
)
|
|
logEvent('tengu_mcp_tool_call_auth_error', {})
|
|
throw new McpAuthError(
|
|
name,
|
|
`MCP server "${name}" requires re-authorization (token expired)`,
|
|
)
|
|
}
|
|
|
|
// Check for session expiry — two error shapes can surface here:
|
|
// 1. Direct 404 + JSON-RPC -32001 from the server (StreamableHTTPError)
|
|
// 2. -32000 "Connection closed" (McpError) — the SDK closes the transport
|
|
// after the onerror handler fires, so the pending callTool() rejects
|
|
// with this derived error instead of the original 404.
|
|
// In both cases, clear the connection cache so the next tool call
|
|
// creates a fresh session.
|
|
const isSessionExpired = isMcpSessionExpiredError(e)
|
|
const isConnectionClosedOnHttp =
|
|
'code' in e &&
|
|
(e as Error & { code?: number }).code === -32000 &&
|
|
e.message.includes('Connection closed') &&
|
|
(config.type === 'http' || config.type === 'claudeai-proxy')
|
|
if (isSessionExpired || isConnectionClosedOnHttp) {
|
|
logMCPDebug(
|
|
name,
|
|
`MCP session expired during tool call (${isSessionExpired ? '404/-32001' : 'connection closed'}), clearing connection cache for re-initialization`,
|
|
)
|
|
logEvent('tengu_mcp_session_expired', {})
|
|
await clearServerCache(name, config)
|
|
throw new McpSessionExpiredError(name)
|
|
}
|
|
}
|
|
|
|
// When the users hits esc, avoid logspew
|
|
if (!(e instanceof Error) || e.name !== 'AbortError') {
|
|
throw e
|
|
}
|
|
return { content: undefined }
|
|
} finally {
|
|
// Always clear intervals
|
|
if (progressInterval !== undefined) {
|
|
clearInterval(progressInterval)
|
|
}
|
|
}
|
|
}
|
|
|
|
function extractToolUseId(message: AssistantMessage): string | undefined {
|
|
if (message.message.content[0]?.type !== 'tool_use') {
|
|
return undefined
|
|
}
|
|
return message.message.content[0].id
|
|
}
|
|
|
|
/**
|
|
* Sets up SDK MCP clients by creating transports and connecting them.
|
|
* This is used for SDK MCP servers that run in the same process as the SDK.
|
|
*
|
|
* @param sdkMcpConfigs - The SDK MCP server configurations
|
|
* @param sendMcpMessage - Callback to send MCP messages through the control channel
|
|
* @returns Connected clients, their tools, and transport map for message routing
|
|
*/
|
|
export async function setupSdkMcpClients(
|
|
sdkMcpConfigs: Record<string, McpSdkServerConfig>,
|
|
sendMcpMessage: (
|
|
serverName: string,
|
|
message: JSONRPCMessage,
|
|
) => Promise<JSONRPCMessage>,
|
|
): Promise<{
|
|
clients: MCPServerConnection[]
|
|
tools: Tool[]
|
|
}> {
|
|
const clients: MCPServerConnection[] = []
|
|
const tools: Tool[] = []
|
|
|
|
// Connect to all servers in parallel
|
|
const results = await Promise.allSettled(
|
|
Object.entries(sdkMcpConfigs).map(async ([name, config]) => {
|
|
const transport = new SdkControlClientTransport(name, sendMcpMessage)
|
|
|
|
const client = new Client(
|
|
{
|
|
name: 'claude-code',
|
|
title: 'Claude Code',
|
|
version: MACRO.VERSION ?? 'unknown',
|
|
description: "Anthropic's agentic coding tool",
|
|
websiteUrl: PRODUCT_URL,
|
|
},
|
|
{
|
|
capabilities: {},
|
|
},
|
|
)
|
|
|
|
try {
|
|
// Connect the client
|
|
await client.connect(transport)
|
|
|
|
// Get capabilities from the server
|
|
const capabilities = client.getServerCapabilities()
|
|
|
|
// Create the connected client object
|
|
const connectedClient: MCPServerConnection = {
|
|
type: 'connected',
|
|
name,
|
|
capabilities: capabilities || {},
|
|
client,
|
|
config: { ...config, scope: 'dynamic' as const },
|
|
cleanup: async () => {
|
|
await client.close()
|
|
},
|
|
}
|
|
|
|
// Fetch tools if the server has them
|
|
const serverTools: Tool[] = []
|
|
if (capabilities?.tools) {
|
|
const sdkTools = await fetchToolsForClient(connectedClient)
|
|
serverTools.push(...sdkTools)
|
|
}
|
|
|
|
return {
|
|
client: connectedClient,
|
|
tools: serverTools,
|
|
}
|
|
} catch (error) {
|
|
// If connection fails, return failed server
|
|
logMCPError(name, `Failed to connect SDK MCP server: ${error}`)
|
|
return {
|
|
client: {
|
|
type: 'failed' as const,
|
|
name,
|
|
config: { ...config, scope: 'user' as const },
|
|
},
|
|
tools: [],
|
|
}
|
|
}
|
|
}),
|
|
)
|
|
|
|
// Process results and collect clients and tools
|
|
for (const result of results) {
|
|
if (result.status === 'fulfilled') {
|
|
clients.push(result.value.client)
|
|
tools.push(...result.value.tools)
|
|
}
|
|
// If rejected (unexpected), the error was already logged inside the promise
|
|
}
|
|
|
|
return { clients, tools }
|
|
}
|