会员登录 - 用户注册 - 设为首页 - 加入收藏 - 网站地图 一文详解MCP传输机制!
当前位置:首页 > 男性 > 一文详解MCP传输机制 正文

一文详解MCP传输机制

时间:2025-05-22 07:47:19 来源:锐评时讯 作者:新闻 阅读:550次

来历:艾逗笔。

介绍 MCP 传输机制。

MCP 传输机制(Transport)是 MCP 客户端与 MCP 服务器。通讯。的一个桥梁,界说了客户端与服务器通讯的细节,协助客户端和服务器交流音讯。

MCP 协议运用 JSON-RPC 来编码音讯。JSON-RPC 音讯有必要运用 U。TF。-8 编码。

MCP 协议现在界说了三种传输机制用于客户端-服务器通讯:

stdio:经过规范输入和规范输出进行通讯。

SSE:经过 HTTP 进行通讯,支撑流式传输。(协议版别 2024-11-05 开端支撑,行将抛弃)。

Streamble HTTP:经过 HTTP 进行通讯,支撑流式传输。(协议版别 2025-03-26 开端支撑,用于代替 SSE)。

MCP 协议要求客户端应尽或许支撑 stdio。

MCP 协议的传输机制是可插拔的,也便是说,客户端和服务器不约束于 MCP 协议规范界说的这几种传输机制,也可以经过自界说的传输机制来完结通讯。

stdio 传输。

stdio 即 standard input & output(规范输入 / 输出)。

是 MCP 协议引荐运用的一种传输机制,首要用于本地进程通讯。

在 stdio 传输中:

客户端以子进程的办法发动 MCP 服务器。

服务器从其规范输入(stdin)读取 JSON-RPC 音讯,并将音讯发送到其规范输出(stdout)。

音讯或许是单个 JSON-RPC 恳求、告诉、呼应,或许包括多个恳求、告诉、呼应的 JSON-RPC 批处理。

音讯由换行符分隔,且不得包括嵌套的换行符。

服务器可以将其 UTF-8 字符串写入规范过错(stderr)以进行日志记载。客户端可以捕获、转发或疏忽此日志。

服务器不得向 stdout 写入无效的 MCP 音讯内容。

客户端不得向服务器的 stdin 写入无效的 MCP 音讯内容。

stdio 通讯流程。

客户端以子进程的办法发动服务器。

客户端往服务器的 stdin 写入音讯。

服务器从本身的 stdin 读取音讯。

服务端往本身的 stdout 写入音讯。

客户端从服务器的 stdout 读取音讯。

客户端停止子进程,封闭服务器的 stdin。

服务器封闭本身的 stdout。

stdio 传输完结。

参阅 MCP 官方的typescript-。sd。k来看 stdio 传输机制是怎么完结的:

发动 MCP 服务器。

以指令行的办法,在本地发动 MCP 服务器:

npx -y mcp-server-。ti。me。

创立 stdio 通讯管道。

MCP 服务器发动时,会创立 stdio 通讯管道(。pi。peline),用于跟 MCP 客户端进行音讯通讯。在 MCP 客户端发送封闭。信号。,或许 MCP 服务器反常退出之前,这个通讯管道会一向坚持,常驻进程。

exportclassStdioServerTransportimplementsTransport {priva。te。_re。ad。Buffer: ReadBuffer =newReadBuffer();private_started =false;constructor( private_stdin: Readable = process.stdin, private_stdout: Writable = process.stdout) {} onclose?:()=>void; onerror?:(error:Error) =>void; onmessage?:(message: JSONRPCMessage) =>void;}。

从 stdin 读取恳求音讯。

MCP 客户端把音讯发到通讯管道。MCP 服务器经过规范输入 stdin 读取客户端发送的音讯,以换行符: 作为读取完结标识。

MCP 服务器读取到的有用音讯,是 JSON-RPC 编码的结构体。

readMessage(): JSONRPCMessage |null{ if(!this._buffer) {  returnnull;  } constindex =this._buffer.indexOf(""); if(index ===-1) {  returnnull;  } constline =this._buffer.toString("utf8",0, index).repl。ac。e(/$/,''); this._buffer =this._buffer.。sub。array(index +1); returndeserializeMessage(line);}。

往 stdout 写入呼应音讯。

MCP 服务器运转完内部逻辑,需求给 MCP 客户端呼应音讯。

MCP 服务器先用 JSON-RPC 编码音讯,再把音讯写入规范输出 stdout。

send(message: JSONRPCMessage):Promise。{ returnnewPromise((resolve) =>{  constjson = serializeMessage(message);  if(this._stdout.write(json)) {    resolve();   }else{   this._stdout.once("drain", resolve);   }  }); }}。

MCP 客户端从 MCP 服务器的规范输出 stdout 读取音讯,取得 MCP 服务器的呼应内容。

封闭 stdio 通讯管道。

MCP 客户端退出,给 MCP 服务器发送封闭信号。

MCP 服务器经过 stdio 通讯管道读到客户端发送的停止信号,或许内部运转过错,自动封闭 stdio 通讯管道。

stdio 通讯管道封闭之后,MCP 客户端与 MCP 服务器之间不能再彼此发送音讯,直到再次树立 stdio 通讯管道。

asyncclose():Promise。{ // Remove our event listene。rs。first this._stdin.off("data",this._ondata); this._stdin.off("error",this._onerror); // Check if we were the only data listener constremainingDataListeners =this._stdin.listenerCount('data'); if(remainingDataListeners ===0) {  // Only pause stdin if we were the only listener  // This prevents interfering with other parts of the application that might be using stdin  this._stdin.pause();  } // Clear the buffer and notify closure this._readBuffer.clear(); this.onclose?.();}。

stdio 传输的利害。

stdio 传输机制首要依托本地进程通讯完结。

首要的优势是:

无外部依托,完结简略。

无网络传输,通讯速度快。

本地通讯,安全性高。

也有一些约束性:

单进程通讯,无法并行处理多个客户端恳求。

进程通讯的资源开支大,很难在本地运转十分多的服务。

stdio 传输的适用场景。

stdio 传输适用于要操作的数据资源坐落本地计算机,且不期望露出外部拜访的场景。

比方,你期望经过一个谈天客户端,来总结你的微信音讯,微信音讯文件存储在你的本地。电脑。,外部拜访不了,也不应该拜访。

这种状况,你可以完结一个 MCP 服务器来读取你电脑上的微信音讯文件,经过 stdio 传输接纳 MCP 客户端的拜访恳求。

假如你要拜访的是一个长途服务器上的文件,也可以运用 stdio 传输,流程会杂乱一些:

先写一个 API 服务,布置在长途服务器,操作长途服务器上的资源,露出公网拜访。

写一个 MCP 服务器,对接长途 API,再经过 stdio 传输与客户端本地通讯。

已然 stdio 传输拜访长途资源这么费事,是不是应该有一种更合适长途资源拜访的传输机制?

当然有。可以运用 SSE 传输。

SSE 传输。

MCP 协议运用 SSE(Server-Sent Events) 传输来处理长途资源拜访的问题。底层是依据 HTTP 通讯,经过相似 API 的办法,让 MCP 客户端直接拜访长途资源,而不必经过 stdio 传输做中转。

在 SSE 传输中,服务器作为一个独立进程运转,可以处理多个客户端衔接。

服务器有必要供给两个端点:

一个 SSE 端点,供客户端树立衔接并从服务器接纳音讯。

一个惯例 HTTP POST 端点,供客户端向服务器发送音讯。

当客户端衔接时,服务器有必要发送一个包括客户端用于发送音讯的 URL 的端点事情。一切后续客户端音讯有必要作为 HTTP POST 恳求发送到该端点。

服务器音讯作为 SSE 音讯事情发送,音讯内容以 JSON 格局编码在事情数据中。

SSE 通讯流程。

客户端向服务器的 /sse 端点发送恳求(一般是 GET 恳求),树立 SSE 衔接。

服务器给客户端回来一个包括音讯端点地址的事情音讯。

客户端给音讯端点发送音讯。

服务器给客户端呼应音讯已接纳状况码。

服务器给两边树立的 SSE 衔接推送事情音讯。

客户端从 SSE 衔接读取服务器发送的事情音讯。

客户端封闭 SSE 衔接。

SSE 传输完结。

参阅 MCP 官方的typescript-sdk来看 SSE 传输机制是怎么完结的:

发动 MCP 服务器。

系统办理员在长途服务器(也可以是本地电脑)输入指令,发动 MCP 服务器,监听服务器。端口。,对外露出 HTTP。 接口。

constserver =newMcpServer({ name:"example-server", version:"1.0.0",});constapp = express();app.get("/sse",async(_: Request, res: Response) =>{consttransport =newSSEServerTransport("/messages", res);aw。ai。tserver.connect(transport);});app.post("/messages",async(req: Request, res: Response) =>{awaittransport.handlePostMessage(req, res);});app.listen(3001);

在这个示例中,运用了express结构,发动了一个 HTTP 服务,监听在 3001 端口,对外露出了两个端点:

/sse:GET 恳求,用于树立 SSE 衔接。

/messages:POST 恳求,用于接纳客户端发送的音讯。

解析一个 MCP 客户端可拜访的域名到 MCP 服务器。比方:abc.mcp.so。

树立 SSE 衔接。

MCP 客户端恳求 MCP 服务器的 URL 地址:https://abc.mcp.so:3001/sse与 MCP 服务器树立衔接,MCP 服务器需求给 MCP 客户端回来一个用于音讯通讯的地址:

res.writeHead(200, {"Content-Type":"text/event-stream","Cache-Control":"no-cache, no-transform", Connection:"keep-alive",});constmessagesUrl ="https://abc.mcp.so:3001/messages?sessionId=xxx";res.write(`event: endpointdata:${messagesUrl}`);

MCP 客户端从 MCP 服务器回来的 endpoint 事情中得到了音讯通讯的地址,与 MCP 服务器树立 SSE 衔接成功。

MCP 客户端经过 POST 恳求把音讯发到这个音讯通讯地址,与 MCP 服务器进行音讯交互。

音讯交互。

MCP 客户端在与 MCP 服务器树立 SSE 衔接之后,开端给 MCP 服务器回来的通讯地址发送音讯。

MCP 协议中的 SSE 传输是双通道呼应机制。也便是说,MCP 服务器在接纳到 MCP 客户端的音讯之后,既要给当时的恳求回复一个呼应,也要给之前树立的 SSE 衔接发送一条呼应音讯。(告诉类型的音讯,不需求给 SSE 衔接发音讯)。

举个比方,MCP 客户端与 MCP 服务器树立 SSE 衔接之后,给 MCP 服务器发送的第一条音讯,用于初始化阶段做才能洽谈。

MCP 客户端恳求示例:

curl -X POST https://abc.mcp.so/messages?sessionId=xxx -H"Content-Type: application/json"-d '{"jsonrpc":"2.0","id":"1","method":"initialize","pa。ram。s": { "protocolVersion":"1.0", "capabilities": {}, "clientInfo": {  "name":"mcp-client",  "version":"1.0.0"  } }}'。

MCP 服务器从 HTTP 恳求体里边读取 MCP 客户端发送的音讯:

asynchandlePostMessage( req: IncomingMessage, res: ServerResponse, parsedBody?: unknown,):Promise。{if(!this._sseResponse) { constmessage ="SSE connection not established";  res.writeHead(500).end(message); thrownewError(message); }letbody:string| unknown;try{ constct = contentType.parse(req.headers["content-type"] ??""); if(ct.type !=="application/json") {  thrownewError(`Unsupported content-type:${ct}`);  }  body = parsedBody ??awaitgetRawBody(req, {   limit:。 MAXIM。UM_MESSAGE_SIZE,   encoding: ct.parameters.charset ??"utf-8",  }); }catch(error) {  res.writeHead(400).end(String(error)); this.onerror?.(errorasError); return; }try{ awaitthis.handleMessage(typeofbody ==='string'?JSON.parse(body) : body); }catch{  res.writeHead(400).end(`Invalid message:${body}`); return; } res.writeHead(202).end("Accepted");}。

先给当时恳求,呼应一个 HTTP 202 状况码,奉告 MCP 客户端,恳求已收到。

然后 MCP 服务器运转内部逻辑,完结事务功用。

再给之前与 MCP 客户端树立的 SSE 衔接发送一个事情音讯,data 里边放 JSON-RPC 编码的音讯内容:

asyncsend(message: JSONRPCMessage):Promise。{if(!this._sseResponse) { thrownewError("Not connected"); }this._sseResponse.write( `event: messagedata:${JSON.stringify(message)}`, );}。

MCP 客户端依据 MCP 服务器同步呼应的 2** 状况码,判别 MCP 服务器现已接到恳求,并开端读取 MCP 服务器发到 SSE 衔接的音讯内容。

MCP 客户端从与 MCP 服务器树立的 SSE 衔接中读取event: message事情音讯,取得 MCP 服务器发送的事务数据data。

MCP 客户端与 MCP 服务器树立的 SSE 衔接,应该是 1:1 的。为了防止串数据的问题,在树立 SSE 衔接阶段,MCP 服务器回来的通讯地址,应该为当时衔接分配一个仅有标识,叫做sessionId,给 MCP 客户端回来的通讯地址带上这个标识,比方/messages?sessionId=xxx。

在音讯交互阶段,MCP 服务器依据 MCP 客户端恳求地址参数里边的 sessionId,找到之前树立的 SSE 衔接,并只给这个 SSE 衔接发送音讯。

断开 SSE 衔接。

MCP 服务器与 MCP 客户端两边都或许会自动断开 SSE 衔接。

还坚持衔接的一方,应该加上必要的衔接检测和超时封闭机制。

比方经过 SSE 衔接,给对方守时发送一条心跳检测音讯,假如屡次无呼应,可以认作对方已断开衔接,此刻可以自动封闭 SSE 衔接,防止资源走漏。

一个用 go 完结的心跳检测和超时封闭示例:

// Setup heartbeat tickerheartbeatInterval :=30* time.SecondheartbeatTicker := time.NewTicker(heartbeatInterval)deferheartbeatTicker.Stop()// Setup idle timeoutidleTimeout :=5* time.MinuteidleTimer := time.NewTimer(idleTimeout)deferidleTimer.Stop()gofunc(){for{ select{ case。<-session.Done():      return    case <-heartbeatTicker.C:      // Send heartbeat      if err := writer.SendHeartbeat(); err != nil {        session.Close()        return     }    case <-idleTimer.C:      // Close connection due to inactivity      session.Close()      return    }  }}()

SSE 安全防护。

当运用 SSE 传输时,服务器一方需求完结一些必要的安全防护办法:

服务器有必要验证一切传入衔接的 Origin 头,以防止 DNS 重绑定进犯。

在本地运转时,服务器应仅绑定到 localhost(127.0.0.1),而不是一切网络接口(0.0.0.0)。

服务器应对一切衔接施行恰当的身份验证。

假如没有这些保护办法,进犯者或许会运用 DNS 重绑定从长途网站与本地 MCP 服务器交互。

SSE 传输的适用场景。

SSE 传输适用于 MCP 客户端与 MCP 服务器不在同一个网络下的通讯场景。

比方,你期望在本地电脑,经过对话的办法,查询你云服务器上的数据库。你就可以在你的云服务器上布置一个 MCP 服务器,去读取数据库,再跟你本地电脑上的 MCP 客户端树立衔接通讯。

当然,一切用 SSE 传输完结的 MCP 服务器,理论上都可以经过 stdio 传输 + API 的办法完结。

差异在于:

用 SSE 传输,MCP 客户端直接与 MCP 服务器通讯,而不必经过本地的 stdio 传输调用 API 进行中转。

用 SSE 传输,在 MCP 客户端只需求一个 URL 即可接入,对本地环境无要求,也无需在本地运转 MCP 服务器,用户侧的运用门槛更低。

SSE 传输的利害。

SSE 传输首要处理长途资源拜访的问题,依托 HTTP 协议完结底层通讯。

SSE 传输的首要优势:

支撑长途资源拜访,让 MCP 客户端可以直接拜访长途服务,处理了 stdio 传输仅适用于本地资源的约束。

依据规范 HTTP 协议完结,兼容性好,便于与现有 Web 根底设施集成。

服务器可作为独立进程运转,支撑处理多个客户端衔接。

比较 WebSocket 完结简略,是一般 HTTP 的扩展,不需求协议晋级。

SSE 传输的首要下风与问题:

衔接不安稳:在无服务器(serverless)环境中,SSE 衔接会随机、频频断开,影响 AI 署理需求的牢靠耐久衔接。

扩展性应战:SSE 不是为云原生架构规划的,在扩展渠道时会遇到瓶颈。

浏览器衔接约束:每个浏览器和域名的最大翻开衔接数很低(6 个),当用户翻开多个。标签。页时会出现问题。

署理和防火墙问题:某些署理和防火墙会由于短少 Content-Length 头而阻挠 SSE 衔接,在企业环境布置时形成应战。

杂乱的双通道呼应机制:MCP 中的 SSE 完结要求服务器在接纳客户端音讯后,既要给当时恳求呼应,也要给之前树立的 SSE 衔接发送呼应音讯。

无法支撑长期的无服务器布置:无服务器架构一般自动扩缩容,不合适长期衔接,而 SSE 需求坚持耐久衔接。

需求很多会话办理:需求为每个 SSE 衔接分配仅有标识(sessionId)来防止数据混杂,增加了完结杂乱度。

需求额定的衔接检测和超时封闭机制:需求完结心跳检测和超时机制来防止资源走漏。

正由于这些问题,MCP 协议现已引进了新的 Streamable HTTP 传输机制(2025-03-26 版别)来代替 SSE,并计划抛弃 SSE 传输。新的传输机制保存了 HTTP 的根底,但支撑更灵敏的衔接办法,更合适现代云架构和无服务器环境。

Streamable HTTP 传输。

Streamable HTTP 传输是 MCP 协议在 2025-03-26 版别中引进的新传输机制,用于代替之前的 SSE 传输。

在 Streamable HTTP 传输中,服务器作为一个独立进程运转,可以处理多个客户端衔接。此传输运用 HTTP POST 和 GET 恳求,服务器可以挑选运用服务器发送事情(SSE)来流式传输多个服务器音讯。

服务器有必要供给一个一起支撑 POST 和 GET 办法的单个 HTTP 端点。例如:https://xyz.mcp.so/mcp。

Streamable HTTP 通讯流程。

客户端给服务器的通讯端点发音讯。

服务器给客户端呼应音讯。

客户端依据服务器的呼应类型,持续给服务器发音讯。

服务器持续呼应客户端音讯。

跟 SSE 传输不同的点在于,Streamable HTTP 传输中,客户端与服务器的音讯交互,基本上是“一来一回”的(单通道呼应)。而这个“一来一回”的音讯交互,或许会有很多种组合类型。

客户端发送 GET 恳求给服务器,服务器回来 SSE 衔接。

客户端 POST JSON-RPC 编码的音讯给服务器,服务器回来 JSON-RPC 编码的音讯呼应。

客户端 POST JSON-RPC 编码的音讯给服务器,服务器回来一个 SSE 衔接。

客户端给 SSE 衔接发音讯,服务器收到后给 SSE 衔接呼应音讯。

服务器呼应的音讯,或许包括状况标识:Mcp-Session-Id。

客户端发音讯时分需求带上状况标识:Mcp-Session-Id。

Streamable HTTP 传输完结。

参阅 MCP 官方的typescript-sdk来看 Streamable HTTP 传输机制是怎么完结的:

发动服务器。

跟 SSE 传输机制相同,Streamable HTTP 传输本质上也是依据 HTTP 协议通讯,需求先发动一个 HTTP 服务:

constserver =newMcpServer({ name:"example-server", version:"1.0.0",});constapp = express();app.all("/mcp",async(req: Request, res: Response) =>{consttransport =newStreamableHTTPServerTransport();awaitserver.connect(transport);awaittransport.handleMessage(req, res);});app.listen(3002);

跟 SSE 传输不相同的点在于,Streamable HTTP 传输只需求露出一个端点(endpoint),来接纳各种类型的客户端恳求(GET / POST / DELETE)。

比方在这个比方,运用express结构,发动了一个 HTTP 服务,监听在 3002 端口,对外露出了一个端点:

/mcp:接纳客户端树立衔接、交流音讯的恳求。

解析一个 MCP 客户端可拜访的域名到 MCP 服务器。比方:xyz.mcp.so。

音讯交互。

在 MCP 服务器发动成功之后,MCP 客户端可以直接给 MCP 服务器露出的地址发送音讯。

跟 SSE 传输不同,Streamable HTTP 传输机制中,MCP 客户端给服务器发送音讯,无需先跟一个端点树立 SSE 衔接,再给另一个端点发音讯。而是单通道方法,即客户端给服务器发音讯,直接取得服务器的呼应内容。

MCP 客户端可以运用 GET 或许 POST 恳求给服务器发音讯,每个恳求有必要设置恳求头Accept,传递以下两个值:

application/json 接纳服务器呼应的 JSON-RPC 编码音讯。

text/event-stream 由服务器敞开流式传输通道,客户端从这个流里边读取事情音讯。

MCP 客户端恳求示例:

curl -X POST https://xyz.mcp.so/mcp -H "Content-Type: application/json" -H "Accept: application/json, text/event-stream" -d '{ "jsonrpc": "2.0", "id": "1", "method": "initialize", "params": {  "protocolVersion": "1.0",  "capabilities": {},  "clientInfo": {   "name": "mcp-client",   "version": "1.0.0"  } }}'。

Streamable HTTP 传输机制下,MCP 客户端与服务器通讯的几个关键:

客户端可以给服务器发送不包括恳求体的 GET 恳求,用于树立 SSE 衔接,让服务器可以自动给客户端先发音讯。

客户端给服务器发送 JSON-RPC 音讯的状况,有必要运用 POST 恳求。

服务器接到客户端的 GET 恳求时,要么回来Contet-Type: text/event-stream敞开 SSE 衔接,要么回来 HTTP 405 状况码,表明不支撑 SSE 衔接。

服务器接到客户端的 POST 恳求时,从恳求体里边读取 JSON-RPC 音讯,假如是告诉音讯,就呼应 HTTP 202 状况码,表明音讯已收到。假如对错告诉音讯,服务器可以挑选回来Content-Type: text/event-stream敞开 SSE 传输,或许回来Content-Type: application/json同步呼应一条 JSON-RPC 音讯。

会话坚持。

Streamable HTTP 传输既支撑无状况的恳求:每一次恳求都是独立的,无需记载状况。

也支撑有状况的恳求:一次新的恳求,或许需求同步之前的恳求 / 呼应信息作为参阅。这种状况叫做:会话坚持。

假如需求坚持会话,MCP 服务器与 MCP 客户端之间的交互应该恪守以下准则:

运用 Streamable HTTP 传输的服务器可以在初始化时分配一个会话 ID,办法是在包括InitializeResult的 HTTP 呼应中包括它,放在Mcp-Session-Id头中。

假如服务器在初始化期间回来了Mcp-Session-Id,运用 Streamable HTTP 传输的客户端有必要在一切后续的 HTTP 恳求中在Mcp-Session-Id头中包括它。

服务器可以随时停止会话,之后它有必要运用 HTTP 404 Not Found 呼应包括该会话 ID 的恳求。

当客户端收到对包括Mcp-Session-Id的恳求的 HTTP 404 呼应时,它有必要经过发送一个不带会话 ID 的新InitializeRequest来发动一个新会话。

不再需求特定会话的客户端应该发送一个带有Mcp-Session-Id头的 HTTP DELETE 到 MCP 端点,以显式停止会话。

MCP 服务器验证会话的一个示例:

/*** Validates session ID for non-initialization requests* Returns true if the session is valid, false otherwise*/privatevalidateSession(req: IncomingMessage, res: ServerResponse):boolean{if(!this._initialized) { // If the server has not been initialized yet, reject all requests  res.writeHead(400).end(JSON.stringify({   jsonrpc:"2.0",   error: {    code:-32000,    message:"Bad Request: Server not initialized"   },   id:null  })); returnfalse; }if(this.sessionId ===undefined) { // If the session ID is not set, the session management is disabled // and we don't need to validate the session ID returntrue; }constsessionId = req.headers["mcp-session-id"];if(!sessionId) { // Non-initialization requests without a session ID should return 400 Bad Request  res.writeHead(400).end(JSON.stringify({   jsonrpc:"2.0",   error: {    code:-32000,    message:"Bad Request: Mcp-Session-Id header is required"   },   id:null  })); returnfalse; }elseif(Array.isArray(sessionId)) {  res.writeHead(400).end(JSON.stringify({   jsonrpc:"2.0",   error: {    code:-32000,    message:"Bad Request: Mcp-Session-Id header must be a single value"   },   id:null  })); returnfalse; }elseif(sessionId !==this.sessionId) { // Reject requests with invalid session ID with 404 Not Found  res.writeHead(404).end(JSON.stringify({   jsonrpc:"2.0",   error: {    code:-32001,    message:"Session not found"   },   id:null  })); returnfalse; }returntrue;}。

衔接断开与重连。

Streamable HTTP 传输,假如客户端与服务器运用 SSE 衔接通讯,断开衔接的办法跟 SSE 传输断开衔接的办法一起。

可以由衔接的恣意一方自动断开衔接。还坚持着衔接的一方,需求完结心跳检测和超时机制,以便能及时封闭衔接,防止资源走漏。

Streamable HTTP 传输比起 SSE 传输,做了一些改善,支撑康复已中止的衔接,从头发送或许丢掉的音讯:

服务器可以在其 SSE 事情中附加一个 ID 字段。假如存在,ID 有必要在一切会话一切流中大局仅有。

假如客户端期望在断开衔接后康复,它应该向服务器宣布 HTTP GET 恳求,并包括 Last-Event-ID 头,奉告服务器它接纳到的最终一个事情 ID。服务器可以重放在最终一个事情 ID 之后将发送的音讯,并从该点康复流。

支撑断点重连的 Streamable HTTP 传输,在音讯传输方面会比 SSE 传输愈加牢靠。

Streamable HTTP 传输的利害。

Streamable HTTP 传输机制结合了 SSE 传输的长途拜访才能和无状况 HTTP 的灵敏性,一起处理了 SSE 传输中的许多问题。

首要优势:

兼容无服务器环境,可以在短衔接方法下作业。

灵敏的衔接方法,支撑简略的恳求-呼应和流式传输。

会话办理愈加规范化和明晰。

支撑断开衔接康复和音讯重传。

保存了 SSE 的流式传输才能,一起处理了其安稳性问题。

向后兼容,可以支撑旧版客户端和服务器。

首要下风:

比较单纯的 stdio 传输完结杂乱度更高。

仍需处理网络衔接断开和康复的逻辑。

会话办理需求服务器引进额定的组件(比方用 Redis 来存储 Session)。

Streamable HTTP 传输的适用场景。

Streamable HTTP 传输适用于:

需求长途拜访服务的场景,特别是云环境和无服务器架构。

需求支撑流式输出的 AI 服务。

需求服务器自动推送音讯给客户端的场景。

大规模布置需求高牢靠性和可扩展性的服务。

需求在不安稳网络环境中坚持牢靠通讯的场景。

与 SSE 传输比较,Streamable HTTP 传输是一个更全面、更灵敏的处理计划,特别合适现代云原生运用和无服务器环境。

自界说传输。

MCP 客户端和 MCP 服务器可以完结额定的自界说传输机制以满意其特定需求。MCP 协议与传输无关,可以在支撑双向音讯交流的任何通讯通道上完结。

挑选支撑自界说传输的完结者有必要保证他们保存由 MCP 界说的 JSON-RPC 音讯格局和生命周期要求。自界说传输应记载其特定的衔接树立和音讯交流方法,以完结互操作性。

用一个实践的比方来阐明,怎么完结一个自界说传输,来满意特定的需求。

需求剖析。

现在市面上大部分的 MCP 服务器是运用 stdio 和 SSE 传输机制完结的,用户要运用这些 MCP 服务器,需求拉代码到本地运转,运用门槛有点高。

咱们期望完结一个 MCP 署理服务,在云上布置,让用户仅需求装备一个 URL 即可接入。由这个云端布置的 MCP 署理去对接第三方的 MCP 服务器。

大致的流程是:

这个 MCP 署理服务以 HTTP 的办法供给接入,需求支撑并发调用。这个 MCP 署理服务作为 stdio 传输或许 SSE 传输的客户端,发送音讯给后台的 MCP 服务器。

依照这个计划,MCP 署理服务跟后台布置的第三方服务器之间的交互存在着一些问题:

假如后台对接的 MCP 服务器是依据 stdio 传输完结的,MCP 署理每接到一个用户恳求,都需求调用 MCP 服务器创立一个独立的 stdio 通讯进程。服务器开支特别大(进程创立和毁掉、上下文切换,内存阻隔等操作,都很耗费服务器资源)。

假如后台对接的 MCP 服务器是依据 SSE 传输完结的,早年面临 SSE 传输的剖析咱们知道,MCP 署理需求跟后台布置的 MCP 服务器之间坚持一个 SSE 衔接。假如每个用户恳求都需求坚持一个 SSE 的长衔接,对服务器也是不小的压力。

为了处理这些问题,关于 MCP 署理和其背面衔接的 MCP 服务器,可以想到的一些优化计划:

用 k8s 集群分布式布置,代替单机布置,坚持 MCP 署理的弹性扩容才能,支撑更高的并发恳求。

修正第三方服务器的传输机制,假如是 stdio 进程通讯,可以改成轻量级的协程通讯,或许运用 HTTP 通讯,支撑多路复用。

假如第三方服务器的传输机制是 SSE,需求把双通道呼应改成单通道呼应,而且不应该运用长衔接。

分布式网络下,会话坚持需求引进一些额定的组件(比方 redis)或许额定的战略(比方用负载均衡的 IP Hash 战略把恳求路由到固定的机器),假如 MCP 服务器不是有必要要用到会话坚持,那就最好改成无状况的传输。

依据以上几点剖析,咱们可以规划一套自界说的传输机制,来改造在分布式集群布置的第三方 MCP 服务器。

这套自界说的传输机制应该具有的几个特性:

依据 HTTP 通讯。

非流式传输。

无状况,无会话坚持。

短衔接。

尽管 Streamable HTTP 传输经过装备参数也能完结这些功用,可是咱们仍是期望能自界说一套更简略,更直接,零装备的传输机制。

咱们把这个新的传输机制取名为:Restful HTTP Transport,简称 Rest Transport。

由于要改造的首要是 MCP 服务器,接下来首要解说怎么完结一个服务器运用的 Rest Server Transport.。

完结 Rest Server Transport。

参阅 MCP 官方完结的 Streamable HTTP Transport,咱们来完结这个自界说的 Rest Server Transport。

界说新的传输类,完结 MCP 协议的 Transport 接口。

/*** Server transport for Synchronous HTTP: a stateless implementation for direct HTTP responses.* It supports concurrent requests with no streaming, no SSE, and no persistent connections.*/exportclassRestServerTransportimplementsTransport {// ...}。

界说发动服务办法,用来发动一个 HTTP 服务器,处理客户端的恳求。

/*** Start the HTTP server*/asyncstartServer():Promise。{if(this._server) { thrownewError("Server is already running"); }this._server = express();this._server.post(this._endpoint,(req, res) =>{ this.handleRequest(req, res, req.body); });returnnewPromise((resolve, reject) =>{ try{  this._httpServer =this._server!.listen(this._port,()=>{   console.log(    `Server is running on http://localhost:${this._port}${this._endpoint}`    );    resolve();   });  this._httpServer.on("error",(error) =>{   console.error("Server error:", error);   this.onerror?.(error);   });  }catch(error) {   reject(error);  } });}。

跟 Streamable HTTP 传输相同,自界说的传输在发动 HTTP 服务器之后,咱们也只露出一个端点来接纳客户端恳求,这个端点和 HTTP 服务器监听的端口,都可以在发动服务器的时分自界说。

exportinterfaceRestServerTransportOptions { endpoint?:string; port?:string|number;}。

接纳客户端恳求。

跟 Streamable HTTP 传输最首要的差异,咱们自界说的这个传输,仅支撑客户端建议 POST 恳求,客户端也只会收到application/json呼应,不会收到text/event-stream呼应。

客户端与服务器运用短衔接通讯,不会有Connection: "keep-alive"。服务器呼应完,与客户端的衔接就会断开。

客户端与服务器之间的音讯交互是无状况的,所以客户端无需传递Mcp-Session-Id,服务器也不会判别客户端的会话有用性,不会保护任何 session 相关的数据。

Rest Server Transport 处理用户恳求的首要完结逻辑:

/*** Handles an incoming HTTP request*/asynchandleRequest( req: IncomingMessage, res: ServerResponse, parsedBody?: unknown):Promise。{if(req.method ==="POST") { awaitthis.handlePostRequest(req, res, parsedBody); }else{  res.writeHead(405).end(  JSON.stringify({    jsonrpc:"2.0",    error: {     code:-32000,     message:"Method not allowed",    },    id:null,   })  ); }}/*** Handles POST requests containing JSON-RPC messages*/privateasynchandlePostRequest( req: IncomingMessage, res: ServerResponse, parsedBody?: unknown):Promise。{try{ // validate the Accept header constacceptHeader = req.headers.accept; if(   acceptHeader &&   acceptHeader !=="*/*"&&   !acceptHeader.includes("application/json")  ) {   res.writeHead(406).end(   JSON.stringify({     jsonrpc:"2.0",     error: {      code:-32000,      message:"Not Acceptable: Client must accept application/json",     },     id:null,    })   );  return;  } constct = req.headers["content-type"]; if(!ct || !ct.includes("application/json")) {   res.writeHead(415).end(   JSON.stringify({     jsonrpc:"2.0",     error: {      code:-32000,      message:      "Unsupported Media Type: Content-Type must be application/json",     },     id:null,    })   );  return;  } letrawMessage; if(parsedBody !==undefined) {   rawMessage = parsedBody;  }else{  constparsedCt = contentType.parse(ct);  constbody =awaitgetRawBody(req, {    limit: MAXIMUM_MESSAGE_SIZE,    encoding: parsedCt.parameters.charset ??"utf-8",   });   rawMessage =JSON.parse(body.toString());  } letmessages: JSONRPCMessage[]; // handle batch and single messages if(Array.isArray(rawMessage)) {   messages = rawMessage.map((msg) =>JSONRPCMessageSchema.parse(msg));  }else{   messages = [JSONRPCMessageSchema.parse(rawMessage)];  } // check if it contains requests consthasRequests = messages.some(  (msg) =>"method"inmsg &&"id"inmsg  ); consthasOnlyNotifications = messages.every(  (msg) =>"method"inmsg && !("id"inmsg)  ); if(hasOnlyNotifications) {  // if it only contains notifications, return 202   res.writeHead(202).end();  // handle each message  for(constmessage of messages) {   this.onmessage?.(message);   }  }elseif(hasRequests) {  // Create a unique identifier for this request batch  constrequestBatchId = randomUUID();  // Extract the request IDs that we need to collect responses for  constrequestIds = messages    .filter((msg) =>"method"inmsg &&"id"inmsg)    .map((msg) =>String(msg.id));  // Set up a promise that will be resolved with all the responses  constresponsePromise =newPromise。((resolve) =>{   this._pendingRequests.set(requestBatchId, {     resolve,     responseMessages: [],     requestIds,    });   });   //Processallmessages  for(constmessage of messages) {   this.onmessage?.(message);   }   //Waitforresponsesandsendthem  constresponses=awaitPromise.race([    responsePromise,   // 30 second timeout   newPromise。((resolve) =>setTimeout(() =>resolve([]), 30000)   ),   ]);   //Cleanupthependingrequest  this._pendingRequests.delete(requestBatchId);   //Setresponseheaders  constheaders:Record。= {    "Content-Type": "application/json",   };  res.writeHead(200, headers);   //FormattheresponseaccordingtoJSON-RPCspec  constresponseBody=responses.length=== 1 ?responses[0] :responses;  res.end(JSON.stringify(responseBody));  } }catch(error) {  //returnJSON-RPCformattederror res.writeHead(400).end(  JSON.stringify({    jsonrpc: "2.0",    error: {     code: -32700,     message: "Parse error",     data:String(error),    },    id:null,   }) ); this.onerror?.(errorasError); }}。

服务器发送音讯。

MCP 服务器接到客户端恳求后,把恳求数据发到内部完结的各个功用函数内,得到呼应内容后,用 JSON-RPC 编码,呼应给 MCP 客户端。

Rest Server Transport 发送音讯的首要完结逻辑:

asyncsend(message: JSONRPCMessage):Promise。{// Only process response messagesif(!("id"inmessage) || !("result"inmessage ||"error"inmessage)) { return; }constmessageId =String(message.id);// Find the pending request that is waiting for this responsefor(const[batchId, pendingRequest] ofthis._pendingRequests.entries()) { if(pendingRequest.requestIds.includes(messageId)) {  // Add this response to the collection   pendingRequest.responseMessages.push(message);  // If we've collected all responses for this batch, resolve the promise  if(    pendingRequest.responseMessages.length ===    pendingRequest.requestIds.length   ) {    pendingRequest.resolve(pendingRequest.responseMessages);   }  break;  } }}。

封闭服务。

Streamable HTTP / SSE 以及咱们自界说的 Rest Transport,封闭服务的逻辑都是一起的。

由于服务发动时是发动了一个 HTTP 服务器,所以服务封闭时,只需求封闭这个 HTTP 服务器即可。

Rest Server Transport 完结的封闭服务的首要逻辑:

asyncstopServer():Promise。{if(this._httpServer) { returnnewPromise((resolve, reject) =>{  this._httpServer!.close((err) =>{   if(err) {     reject(err);    return;    }   this._server =null;   this._httpServer =null;    resolve();   });  }); }}。

在完结完这个自界说的传输后,咱们就可以把代码打包,发布到 npm 公共库房。这样第三方 MCP 服务器都可以运用这个 Transport 来改造自己的完结。

比方,我用 typescript 完结的Rest Server Transport封装在chatmcp/sdk这个 npm 包里边,第三方服务器可以这样引进:

import{ RestServerTransport }from"chatmcp/sdk/server/index.js";

改造 MCP 服务器。

为了把第三方 MCP 服务器布置到云端,支撑多租户并发调用,咱们需求对第三方 MCP 服务器做必定的改造。

以Perplexity Ask Server这个 MCP 服务器为例,来演示详细的改造流程:

装置包括 Rest Server Transport 的 SDK。

npm install chatmcp/sdk。

获取服务发动参数。

经过chatmcp/sdk供给的getParamValue办法,可以取得 MCP 服务器发动时分的参数,从指令行读取,或许从环境变量读取:

import { getParamValue } from "chatmcp/sdk/utils/index.js";const perplexityApiKey = getParamValue("perplexity_api_key") || "";const mode = getParamValue("mode") || "stdio";const port = getParamValue("port") || 9593;const endpoint = getParamValue("endpoint") || "/rest";

获取恳求参数。

MCP 服务器的规划初衷,是让用户运转在本地,跟私有数据打交道。

所以现在绝大部分的 MCP 服务器,都是不支撑多租户的(也便是不能让多个用户一起运用)。

首要的约束在于,在 MCP 服务器内部的完结逻辑里边,用到鉴权参数的当地,是在服务器发动时获取的,不是在恳求时动态获取的。

假如要在云端布置 MCP 服务器,支撑多租户运用,咱们需求修正 MCP 服务器内部的参数获取逻辑,改成从恳求参数里边动态获取。

MCP 协议答应在每个恳求的 Request 目标经过 _meta 字段传递自界说的数据。

那么就可以改造 MCP 服务器,从 request.params._meta 里边获取客户端传递的 auth 参数。

在chatmcp/sdk中,完结了一个getAuthValue办法,可以让 MCP 服务器获取 MCP 客户端传递的鉴权参数。

const auth: any = request.params?._meta?.auth;

以这个Perplexity Ask Server为例,咱们把读取发动时参数,改成读取恳求时参数:

// before: get params from env and set as global params// after: get params from env or command line, set as global paramsimport{ getParamValue, getAuthValue }from"chatmcp/sdk/utils/index.js";constperplexityApiKey = getParamValue("perplexity_api_key") ||"";constmode = getParamValue("mode") ||"stdio";constport = getParamValue("port") ||9593;constendpoint = getParamValue("endpoint") ||"/rest";server.setRequestHandler(CallToolRequestSchema,async(request) =>{try{ // before: use global params // after: get auth params from request, use global params if request params not set constapiKey =   getAuthValue(request,"PERPLEXITY_API_KEY") || perplexityApiKey; if(!apiKey) {  thrownewError("PERPLEXITY_API_KEY not set");  } const{ name,arguments: args } = request.params; if(!args) {  thrownewError("No arguments provided");  } switch(name) {  case"perplexity_ask": {   if(!Array.isArray(args.messages)) {    thrownewError(     "Invalid arguments for perplexity_ask: 'messages' must be an array"     );    }   constmessages = args.messages;   // before: use global params in every function   // const result = await pe。rf。ormChatCompletion(   //  messages,   //  "sonar-pro"   // );   // after: pass params to every function   constresult =awaitperformChatCompletion(     apiKey,     messages,    "sonar-pro"    );   return{     content: [{type:"text", text: result}],     isError:false,    };   }  // ...  } }catch(error) { // Return error details in the response return{   content: [    {    type:"text",     text:`Error:${      errorinstanceofError? error.message :String(error)     }`,    },   ],   isError:true,  }; }});

改造完之后,云端的 MCP 署理,就可以把用户设置的鉴权参数:perplexity_api_key,经过这种办法传给 MCP 服务器了:

request.params._meta.auth = { perplexity_api_key:"xxx",};

新增 Rest 传输。

修正这个 MCP 服务器,在本来仅支撑 stdio 传输的根底上,新增一个 rest http 传输:

import{ RestServerTransport }from"chatmcp/sdk/server/rest.js";import{ getParamValue }from"chatmcp/sdk/utils/index.js";constmode = getParamValue("mode") ||"stdio";constport = getParamValue("port") ||9593;constendpoint = getParamValue("endpoint") ||"/rest";asyncfunctionrunServer(){try{ // after: MCP Server run with rest transport and stdio transport if(mode ==="rest") {  consttransport =newRestServerTransport({    port,    endpoint,   });  awaitserver.connect(transport);  awaittransport.startServer();  return;  } // before: MCP Server only run with stdio transport consttransport =newStdioServerTransport(); awaitserver.connect(transport); console.error(  "Perplexity MCP Server running on stdio with Ask, Research, and Reason tools"  ); }catch(error) { console.error("Fatal error running server:", error);  process.exit(1); }}。

改造之后,这个 MCP 服务器就支撑两种传输机制了。在不同的场景下,运用不同的传输机制运转:

运用 MCP 服务器。

本地运用。

经过源代码运转:

export PERPLEXITY_API_KEY=xxx && node build/index.js。

或许运用二进制运转:

PERPLEXITY_API_KEY=xxx npx -y server-perplexity-ask。

或许运用 docker 运转:

docker run -i --rm -e PERPLEXITY_API_KEY=xxx mcp/perplexity-ask。

这三种办法,都会在本地发动 stdio 通讯,在服务器发动的时分传递 PERPLEXITY_API_KEY 参数。

云端调用。

改造完的 MCP 服务器在云端布置的发动指令:

node build/index.js --mode=rest --port=8081 --endpoint=/rest。

运用了自界说的 Rest 传输,发动 HTTP 服务器,监听在本地的 8081 端口。经过K8S Service创立一个内网拜访域名:perplexity-svc。

MCP 署理把客户端的恳求转发到http://perplexity-svc:8081/rest,就可以处理多个用户的并发恳求了。

MCP 客户端仅需求装备一个 MCP 署理的 URL,就可以运用 MCP 服务器了:

总结。

在本节中,咱们详细解说了 MCP 协议支撑的三种规范传输机制以及自界说传输的完结办法:

stdio 传输。

经过规范输入/输出进行本地进程间通讯。

优势在于完结简略、通讯速度快、安全性高。

首要适用于本地数据拜访场景。

约束于单进程通讯,资源开支较大。

SSE 传输。(行将抛弃):

依据 HTTP 协议,支撑长途资源拜访。

运用双通道呼应机制(SSE 衔接 + POST 端点)。

存在衔接不安稳、扩展性差、浏览器约束等问题。

不合适无服务器(serverless)环境和云原生架构。

Streamable HTTP 传输。

代替 SSE 的新传输机制,兼容现代云架构。

更灵敏的衔接方法,支撑简略恳求-呼应和流式传输。

规范化的会话办理和断点康复功用。

合适长途拜访、无服务器环境和大规模布置。

自界说传输机制。

MCP 协议支撑完结自界说传输以满意特定需求。

可以针对特定布置环境和运用场景进行优化。

示例中完结了一个 Rest Transport,合适无状况、短衔接场景。

传输机制的挑选应依据详细运用场景:

本地数据拜访优先挑选 stdio 传输。

长途资源拜访优先挑选 Streamable HTTP 传输。

特别需求场景可考虑完结自界说传输。

MCP 协议的可插拔传输架构使其可以灵敏习惯不同的布置环境和通讯需求,从简略的本地东西到杂乱的云服务均可支撑。跟着技术发展,传输机制也在不断优化,以供给更好的功能、牢靠性和可扩展性。

内容来源:https://fastrans.nhobethoi.com/app-1/baccarat tattoo,http://chatbotjud-teste.saude.mg.gov.br/app-1/ganhar-dinheiro-conversando-com-árabes

(责任编辑:最新热点)

    系统发生错误

    系统发生错误

    您可以选择 [ 重试 ] [ 返回 ] 或者 [ 回到首页 ]

    [ 错误信息 ]

    页面发生异常错误,系统设置开启调试模式后,刷新本页查看具体错误!