模型上下文协议 (MCP) 中的 Transports(传输)为客户端和服务器之间的通信提供了基础。传输处理消息发送和接收方式的基本机制。
一、消息格式
Transports 底层的消息消息格式,一般情况下无需开发者关心。我们简单了解一下。
MCP 使用 JSON-RPC 2.0 作为其传输格式。传输层负责将 MCP 协议消息转换为 JSON-RPC 格式进行传输,并将收到的 JSON-RPC 消息转换回 MCP 协议消息。
涉及三种类型的 JSON-RPC 消息:
Requests 请求
{
jsonrpc: "2.0",
id: number | string,
method: string,
params?: object
}
Responses 响应
{
jsonrpc: "2.0",
id: number | string,
result?: object,
error?: {
code: number,
message: string,
data?: unknown
}
}
Notifications 通知
{
jsonrpc: "2.0",
method: string,
params?: object
}
二、传输方式
MCP 包括两个标准传输实现,对于开发者来说比较重要。
标准输入/输出(stdio)
stdio 传输支持通过标准输入和输出流进行通信。这对于本地集成和命令行工具特别有用。
我们可以在以下情况下使用 stdio:
- 构建命令行工具
- 实施本地集成
- 需要简单的过程通信
- 使用 shell 脚本
服务器代码示例:
const server = new Server({
name: "example-server",
version: "1.0.0"
}, {
capabilities: {}
});
const transport = new StdioServerTransport();
await server.connect(transport);
客户端代码示例:
const client = new Client({
name: "example-client",
version: "1.0.0"
}, {
capabilities: {}
});
const transport = new StdioClientTransport({
command: "/usr/local/bin/node",
args: ["example-server/build/index.js"]
});
await client.connect(transport);
简单理解就是 MCP Client 和 MCP Server 在一个机器上运行的场景。
例如:《MCP 开发快速入门:构建你的专属 Client 和 Server》 一文中所选用的传输方式。
服务器发送的事件(SSE)
SSE 传输支持使用 HTTP POST 请求进行服务器到客户端流式处理,以实现客户端到服务器的通信。
我们可以在以下情况下使用 SSE:
- 只需要服务器到客户端流式传输
- 使用受限网络
- 实施简单更新
服务器代码示例:
import express from "express";
const app = express();
const server = new McpClient({
name: "example-server",
version: "1.0.0"
}, {
capabilities: {}
});
let transport: SSEServerTransport | null = null;
app.get("/sse", (req, res) => {
transport = new SSEServerTransport("/messages", res);
server.connect(transport);
});
app.post("/messages", (req, res) => {
if (transport) {
transport.handlePostMessage(req, res);
}
});
app.listen(3000);
客户端代码示例:
const client = new McpClient({
name: "example-client",
version: "1.0.0"
}, {
capabilities: {}
});
const transport = new SSEClientTransport(
new URL("http://localhost:3000/sse")
);
await client.connect(transport);
可以看到服务器提供了两个接口,客户端通过 SSE 接口链接服务器。
下文针对 SSE 传输方式进行实战。
三、使用 SSE 传输方式实现客户端访问远程服务器
具体工程构建见源码。
案例源码仓库:https://gitee.com/fangjunjie/mcp-sse-demo
MCP Server 代码
import express from "express";
import {SSEServerTransport} from "@modelcontextprotocol/sdk/server/sse.js";
import {McpServer} from "@modelcontextprotocol/sdk/server/mcp.js";
import {z} from "zod";
const app = express();
const server = new McpServer({
name: "example-server",
version: "1.0.0"
});
// 注册一个提供加密功能的工具
server.tool(
// 工具名称
"pdx-encryption-tool",
// 工具描述
"这是自定义加密算法“派大星算法”的加密函数",
// 工具参数
{
// 待加密的文本参数
textContent: z.string().max(200).describe("待加密的文本(不超过200字)")
},
// 工具的异步处理函数
async ({textContent}) => {
// 这里只做演示,简单将文本字符倒序,作为加密规则
const resultContent = textContent.split('').reverse().join('')
return {
content: [
{
type: "text",
text: resultContent,
},
]
}
}
)
// 注册一个提供解密功能的工具
server.tool(
// 工具名称
"pdx-decryption-tool",
// 工具描述
"这是自定义加密算法“派大星算法”的解密函数",
// 工具参数
{
// 待解密的文本参数
textContent: z.string().max(200).describe("待解密的文本(不超过200字)")
},
// 工具的异步处理函数
async ({textContent}) => {
// 这里只做演示,简单将文本字符倒序,作为解密规则
const resultContent = textContent.split('').reverse().join('')
return {
content: [
{
type: "text",
text: resultContent,
},
]
}
}
)
let transport: SSEServerTransport | null = null;
app.get("/sse", (req, res) => {
transport = new SSEServerTransport("/messages", res);
server.connect(transport);
});
app.post("/messages", (req, res) => {
if (transport) {
transport.handlePostMessage(req, res);
}
});
app.listen(3000);
除了基本的 SSE 传输以外,注册了两个工具用于测试。
MCP Client 代码
import {Client} from "@modelcontextprotocol/sdk/client/index.js";
import {SSEClientTransport} from "@modelcontextprotocol/sdk/client/sse.js";
import dotenv from "dotenv";
import OpenAI from "openai";
import readline from "readline/promises";
// 加载环境变量
dotenv.config();
const LLM_API_KEY = process.env.LLM_API_KEY;
const LLM_BASE_URL = process.env.LLM_BASE_URL;
const LLM_MODEL = process.env.LLM_MODEL;
// 创建客户端实例
const client = new Client({
name: "example-client",
version: "1.0.0"
}, {
capabilities: {}
});
// 初始化 sse 传输实例
const transport = new SSEClientTransport(
new URL("http://localhost:3000/sse")
);
// 连接到服务器
await client.connect(transport);
// 获取工具列表
const toolsResult = await client.listTools();
// 处理工具列表,转换为特定格式
const tools: OpenAI.ChatCompletionTool[] = toolsResult.tools.map((tool) => {
return {
type: "function",
function: {
name: tool.name,
description: tool.description,
parameters: tool.inputSchema,
}
};
});
// 打印连接成功信息和工具名称
console.log(
"成功链接服务器,获取到工具如下:",
tools.map((item) => item.function.name)
);
// 创建 readline 接口
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
});
// 获取用户输入
const query = await rl.question("\n请输入: ");
// 初始化 openai 实例,传入 API 密钥和基础 URL
const openai = new OpenAI({
apiKey: LLM_API_KEY,
baseURL: LLM_BASE_URL
});
// 初始化消息数组
const messages: OpenAI.ChatCompletionMessageParam[] = [
{
role: "user",
content: query,
},
];
// 调用 OpenAI 的聊天完成接口
const response = await openai.chat.completions.create({
model: LLM_MODEL!,
max_tokens: 1000,
messages,
tools: tools,
});
// openai 非流式对话,只有一个 choice
const resMessage = response.choices[0].message
// 如果没有工具调用
if (!resMessage.tool_calls || resMessage.tool_calls.length === 0) {
// 直接输出回答
console.log(resMessage.content)
} else {
// 大模型回答加入上下文
messages.push(resMessage)
// 获取工具ID
const toolId = resMessage.tool_calls[0].id
// 获取工具名称
const toolName = resMessage.tool_calls[0].function.name;
// 获取工具参数
const toolArgs = JSON.parse(resMessage.tool_calls[0].function.arguments);
// 调用工具
const toolResult = await client.callTool({
name: toolName,
arguments: toolArgs,
});
// 将工具调用信息添加到最终
console.log(`\n[调用工具 ${toolName} 参数为 ${JSON.stringify(toolArgs)} 结果为 ${JSON.stringify(toolResult)}]\n`)
// 将工具调用结果添加到消息数组
messages.push({
role: "tool",
content: JSON.stringify(toolResult.content),
tool_call_id: toolId
});
// 再次调用 OpenAI 的聊天完成接口
const response = await openai.chat.completions.create({
model: LLM_MODEL!,
max_tokens: 1000,
// 此时消息列表中有三条消息:1用户提问 2大模型返回工具调用 3工具调用结果
messages,
});
console.log(response.choices[0].message.content)
}
// 关闭客户端
client.close()
process.exit(0);
这里实现简单的非流式单轮对话,用于测试工具调用。
联调验证
启动 MCP Server
进入 MCP Server 项目,构建并启动脚本。
cd mcp-sse-server
npm run build
node build/index.js
启动 MCP Client
进入 MCP Client 项目,构建并启动脚本。
cd mcp-sse-client
npm run build
node build/index.js
启动后,可以看到工具列表获取成功:
成功链接服务器,获取到工具如下: [ 'abc-encryption-tool', 'abc-decryption-tool' ]
请输入:
验证工具调用
调用加密工具:
成功链接服务器,获取到工具如下: [ 'abc-encryption-tool', 'abc-decryption-tool' ]
请输入: 使用派大星算法加密这句话:大雁一日同风起,扶摇直上九万里
[调用工具 abc-encryption-tool 参数为 {"textContent":"大雁一日同风起,扶摇直上九万里"} 结果为 {"content":[{"type":"text","text":"里万九上直摇扶,起风同日一雁大"}]}]
使用派大星算法加密后的句子是:里万九上直摇扶,起风同日一雁大。
调用解密工具:
成功链接服务器,获取到工具如下: [ 'pdx-encryption-tool', 'pdx-decryption-tool' ]
请输入: 使用派大星算法解密这句话:里万九上直摇扶,起风同日一雁大
[调用工具 pdx-decryption-tool 参数为 {"textContent":"里万九上直摇扶,起风同日一雁大"} 结果为 {"content":[{"type":"text","text":"大雁一日同风起,扶摇直上九万里"}]}]
使用派大星算法解密后,这句话是:“大雁一日同风起,扶摇直上九万里”。