diff --git a/apps/sim/app/api/mcp/copilot/route.ts b/apps/sim/app/api/mcp/copilot/route.ts index 6ae73c4126d..93e24c23086 100644 --- a/apps/sim/app/api/mcp/copilot/route.ts +++ b/apps/sim/app/api/mcp/copilot/route.ts @@ -1,5 +1,5 @@ import { Server } from '@modelcontextprotocol/sdk/server/index.js' -import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js' +import { WebStandardStreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/webStandardStreamableHttp.js' import { CallToolRequestSchema, type CallToolResult, @@ -166,16 +166,6 @@ function createError(id: RequestId, code: ErrorCode | number, message: string): } } -function normalizeRequestHeaders(request: NextRequest): HeaderMap { - const headers: HeaderMap = {} - - request.headers.forEach((value, key) => { - headers[key.toLowerCase()] = value - }) - - return headers -} - function readHeader(headers: HeaderMap | undefined, name: string): string | undefined { if (!headers) return undefined const value = headers[name.toLowerCase()] @@ -185,190 +175,6 @@ function readHeader(headers: HeaderMap | undefined, name: string): string | unde return value } -class NextResponseCapture { - private _status = 200 - private _headers = new Headers() - private _controller: ReadableStreamDefaultController | null = null - private _pendingChunks: Uint8Array[] = [] - private _closeHandlers: Array<() => void> = [] - private _errorHandlers: Array<(error: Error) => void> = [] - private _headersWritten = false - private _ended = false - private _headersPromise: Promise - private _resolveHeaders: (() => void) | null = null - private _endedPromise: Promise - private _resolveEnded: (() => void) | null = null - readonly readable: ReadableStream - - constructor() { - this._headersPromise = new Promise((resolve) => { - this._resolveHeaders = resolve - }) - - this._endedPromise = new Promise((resolve) => { - this._resolveEnded = resolve - }) - - this.readable = new ReadableStream({ - start: (controller) => { - this._controller = controller - if (this._pendingChunks.length > 0) { - for (const chunk of this._pendingChunks) { - controller.enqueue(chunk) - } - this._pendingChunks = [] - } - }, - cancel: () => { - this._ended = true - this._resolveEnded?.() - this.triggerCloseHandlers() - }, - }) - } - - private markHeadersWritten(): void { - if (this._headersWritten) return - this._headersWritten = true - this._resolveHeaders?.() - } - - private triggerCloseHandlers(): void { - for (const handler of this._closeHandlers) { - try { - handler() - } catch (error) { - this.triggerErrorHandlers(toError(error)) - } - } - } - - private triggerErrorHandlers(error: Error): void { - for (const errorHandler of this._errorHandlers) { - errorHandler(error) - } - } - - private normalizeChunk(chunk: unknown): Uint8Array | null { - if (typeof chunk === 'string') { - return new TextEncoder().encode(chunk) - } - - if (chunk instanceof Uint8Array) { - return chunk - } - - if (chunk === undefined || chunk === null) { - return null - } - - return new TextEncoder().encode(String(chunk)) - } - - writeHead(status: number, headers?: Record): this { - this._status = status - - if (headers) { - Object.entries(headers).forEach(([key, value]) => { - if (Array.isArray(value)) { - this._headers.set(key, value.join(', ')) - } else { - this._headers.set(key, String(value)) - } - }) - } - - this.markHeadersWritten() - return this - } - - flushHeaders(): this { - this.markHeadersWritten() - return this - } - - write(chunk: unknown): boolean { - const normalized = this.normalizeChunk(chunk) - if (!normalized) return true - - this.markHeadersWritten() - - if (this._controller) { - try { - this._controller.enqueue(normalized) - } catch (error) { - this.triggerErrorHandlers(toError(error)) - } - } else { - this._pendingChunks.push(normalized) - } - - return true - } - - end(chunk?: unknown): this { - if (chunk !== undefined) this.write(chunk) - this.markHeadersWritten() - if (this._ended) return this - - this._ended = true - this._resolveEnded?.() - - if (this._controller) { - try { - this._controller.close() - } catch (error) { - this.triggerErrorHandlers(toError(error)) - } - } - - this.triggerCloseHandlers() - - return this - } - - async waitForHeaders(timeoutMs = 30000): Promise { - if (this._headersWritten) return - - await Promise.race([ - this._headersPromise, - new Promise((resolve) => { - setTimeout(resolve, timeoutMs) - }), - ]) - } - - async waitForEnd(timeoutMs = 30000): Promise { - if (this._ended) return - - await Promise.race([ - this._endedPromise, - new Promise((resolve) => { - setTimeout(resolve, timeoutMs) - }), - ]) - } - - on(event: 'close' | 'error', handler: (() => void) | ((error: Error) => void)): this { - if (event === 'close') { - this._closeHandlers.push(handler as () => void) - } - - if (event === 'error') { - this._errorHandlers.push(handler as (error: Error) => void) - } - - return this - } - - toNextResponse(): NextResponse { - return new NextResponse(this.readable, { - status: this._status, - headers: this._headers, - }) - } -} - function buildMcpServer(abortSignal?: AbortSignal): Server { const server = new Server( { @@ -503,29 +309,17 @@ function buildMcpServer(abortSignal?: AbortSignal): Server { async function handleMcpRequestWithSdk( request: NextRequest, parsedBody: unknown -): Promise { +): Promise { const server = buildMcpServer(request.signal) - const transport = new StreamableHTTPServerTransport({ + const transport = new WebStandardStreamableHTTPServerTransport({ sessionIdGenerator: undefined, enableJsonResponse: true, }) - const responseCapture = new NextResponseCapture() - const requestAdapter = { - method: request.method, - headers: normalizeRequestHeaders(request), - } - await server.connect(transport) try { - await transport.handleRequest(requestAdapter as any, responseCapture as any, parsedBody) - await responseCapture.waitForHeaders() - // Must exceed the longest possible tool execution. - // Using ORCHESTRATION_TIMEOUT_MS + 60 s buffer so the orchestrator can - // finish or time-out on its own before the transport is torn down. - await responseCapture.waitForEnd(ORCHESTRATION_TIMEOUT_MS + 60_000) - return responseCapture.toNextResponse() + return await transport.handleRequest(request, { parsedBody }) } finally { await server.close().catch(() => {}) await transport.close().catch(() => {}) @@ -567,6 +361,13 @@ export const POST = withRouteHandler(async (request: NextRequest) => { return await handleMcpRequestWithSdk(request, parsedBody) } catch (error) { + if (request.signal.aborted || (error as Error)?.name === 'AbortError') { + return NextResponse.json( + createError(0, ErrorCode.ConnectionClosed, 'Client cancelled request'), + { status: 499 } + ) + } + logger.error('Error handling MCP request', { error }) return NextResponse.json(createError(0, ErrorCode.InternalError, 'Internal error'), { status: 500,