Junki
Junki
Published on 2025-03-20 / 34 Visits
0
0

MCP 客户端访问远程服务器:Transports 详解

模型上下文协议 (MCP) 中的 Transports(传输)为客户端和服务器之间的通信提供了基础。传输处理消息发送和接收方式的基本机制。

一、消息格式

Transports 底层的消息消息格式,一般情况下无需开发者关心。我们简单了解一下。

MCP 使用 JSON-RPC 2.0 作为其传输格式。传输层负责将 MCP 协议消息转换为 JSON-RPC 格式进行传输,并将收到的 JSON-RPC 消息转换回 MCP 协议消息。

涉及三种类型的 JSON-RPC 消息:

Requests 请求

{
  jsonrpc: "2.0",
  id: number | string,
  method: string,
  params?: object
}

Responses 响应

{
  jsonrpc: "2.0",
  id: number | string,
  result?: object,
  error?: {
    code: number,
    message: string,
    data?: unknown
  }
}

Notifications 通知

{
  jsonrpc: "2.0",
  method: string,
  params?: object
}

二、传输方式

MCP 包括两个标准传输实现,对于开发者来说比较重要。

标准输入/输出(stdio)

stdio 传输支持通过标准输入和输出流进行通信。这对于本地集成和命令行工具特别有用。

我们可以在以下情况下使用 stdio:

  • 构建命令行工具
  • 实施本地集成
  • 需要简单的过程通信
  • 使用 shell 脚本

服务器代码示例:

const server = new Server({
  name: "example-server",
  version: "1.0.0"
}, {
  capabilities: {}
});

const transport = new StdioServerTransport();
await server.connect(transport);

客户端代码示例:

const client = new Client({
  name: "example-client",
  version: "1.0.0"
}, {
  capabilities: {}
});

const transport = new StdioClientTransport({
  command: "/usr/local/bin/node",
  args: ["example-server/build/index.js"]
});
await client.connect(transport);

简单理解就是 MCP Client 和 MCP Server 在一个机器上运行的场景。

例如:《MCP 开发快速入门:构建你的专属 Client 和 Server》 一文中所选用的传输方式。

服务器发送的事件(SSE)

SSE 传输支持使用 HTTP POST 请求进行服务器到客户端流式处理,以实现客户端到服务器的通信。

我们可以在以下情况下使用 SSE:

  • 只需要服务器到客户端流式传输
  • 使用受限网络
  • 实施简单更新

服务器代码示例:

import express from "express";

const app = express();

const server = new McpClient({
  name: "example-server",
  version: "1.0.0"
}, {
  capabilities: {}
});

let transport: SSEServerTransport | null = null;

app.get("/sse", (req, res) => {
  transport = new SSEServerTransport("/messages", res);
  server.connect(transport);
});

app.post("/messages", (req, res) => {
  if (transport) {
    transport.handlePostMessage(req, res);
  }
});

app.listen(3000);

客户端代码示例:

const client = new McpClient({
  name: "example-client",
  version: "1.0.0"
}, {
  capabilities: {}
});

const transport = new SSEClientTransport(
  new URL("http://localhost:3000/sse")
);
await client.connect(transport);

可以看到服务器提供了两个接口,客户端通过 SSE 接口链接服务器。

下文针对 SSE 传输方式进行实战。

三、使用 SSE 传输方式实现客户端访问远程服务器

具体工程构建见源码。

案例源码仓库:https://gitee.com/fangjunjie/mcp-sse-demo

MCP Server 代码

import express from "express";
import {SSEServerTransport} from "@modelcontextprotocol/sdk/server/sse.js";
import {McpServer} from "@modelcontextprotocol/sdk/server/mcp.js";
import {z} from "zod";

const app = express();

const server = new McpServer({
    name: "example-server",
    version: "1.0.0"
});

// 注册一个提供加密功能的工具
server.tool(
    // 工具名称
    "pdx-encryption-tool",
    // 工具描述
    "这是自定义加密算法“派大星算法”的加密函数",
    // 工具参数
    {
        // 待加密的文本参数
        textContent: z.string().max(200).describe("待加密的文本(不超过200字)")
    },
    // 工具的异步处理函数
    async ({textContent}) => {
        // 这里只做演示,简单将文本字符倒序,作为加密规则
        const resultContent = textContent.split('').reverse().join('')

        return {
            content: [
                {
                    type: "text",
                    text: resultContent,
                },
            ]
        }
    }
)

// 注册一个提供解密功能的工具
server.tool(
    // 工具名称
    "pdx-decryption-tool",
    // 工具描述
    "这是自定义加密算法“派大星算法”的解密函数",
    // 工具参数
    {
        // 待解密的文本参数
        textContent: z.string().max(200).describe("待解密的文本(不超过200字)")
    },
    // 工具的异步处理函数
    async ({textContent}) => {
        // 这里只做演示,简单将文本字符倒序,作为解密规则
        const resultContent = textContent.split('').reverse().join('')

        return {
            content: [
                {
                    type: "text",
                    text: resultContent,
                },
            ]
        }
    }
)

let transport: SSEServerTransport | null = null;

app.get("/sse", (req, res) => {
    transport = new SSEServerTransport("/messages", res);
    server.connect(transport);
});

app.post("/messages", (req, res) => {
    if (transport) {
        transport.handlePostMessage(req, res);
    }
});

app.listen(3000);

除了基本的 SSE 传输以外,注册了两个工具用于测试。

MCP Client 代码

import {Client} from "@modelcontextprotocol/sdk/client/index.js";
import {SSEClientTransport} from "@modelcontextprotocol/sdk/client/sse.js";
import dotenv from "dotenv";
import OpenAI from "openai";
import readline from "readline/promises";

// 加载环境变量
dotenv.config();
const LLM_API_KEY = process.env.LLM_API_KEY;
const LLM_BASE_URL = process.env.LLM_BASE_URL;
const LLM_MODEL = process.env.LLM_MODEL;

// 创建客户端实例
const client = new Client({
    name: "example-client",
    version: "1.0.0"
}, {
    capabilities: {}
});

// 初始化 sse 传输实例
const transport = new SSEClientTransport(
    new URL("http://localhost:3000/sse")
);

// 连接到服务器
await client.connect(transport);

// 获取工具列表
const toolsResult = await client.listTools();
// 处理工具列表,转换为特定格式
const tools: OpenAI.ChatCompletionTool[] = toolsResult.tools.map((tool) => {
    return {
        type: "function",
        function: {
            name: tool.name,
            description: tool.description,
            parameters: tool.inputSchema,
        }
    };
});
// 打印连接成功信息和工具名称
console.log(
    "成功链接服务器,获取到工具如下:",
    tools.map((item) => item.function.name)
);

// 创建 readline 接口
const rl = readline.createInterface({
    input: process.stdin,
    output: process.stdout,
});
// 获取用户输入
const query = await rl.question("\n请输入: ");

// 初始化 openai 实例,传入 API 密钥和基础 URL
const openai = new OpenAI({
    apiKey: LLM_API_KEY,
    baseURL: LLM_BASE_URL
});
// 初始化消息数组
const messages: OpenAI.ChatCompletionMessageParam[] = [
    {
        role: "user",
        content: query,
    },
];
// 调用 OpenAI 的聊天完成接口
const response = await openai.chat.completions.create({
    model: LLM_MODEL!,
    max_tokens: 1000,
    messages,
    tools: tools,
});
// openai 非流式对话,只有一个 choice
const resMessage = response.choices[0].message
// 如果没有工具调用
if (!resMessage.tool_calls || resMessage.tool_calls.length === 0) {
    // 直接输出回答
    console.log(resMessage.content)
} else {
    // 大模型回答加入上下文
    messages.push(resMessage)

    // 获取工具ID
    const toolId = resMessage.tool_calls[0].id
    // 获取工具名称
    const toolName = resMessage.tool_calls[0].function.name;
    // 获取工具参数
    const toolArgs = JSON.parse(resMessage.tool_calls[0].function.arguments);

    // 调用工具
    const toolResult = await client.callTool({
        name: toolName,
        arguments: toolArgs,
    });
    // 将工具调用信息添加到最终
    console.log(`\n[调用工具 ${toolName} 参数为 ${JSON.stringify(toolArgs)} 结果为 ${JSON.stringify(toolResult)}]\n`)

    // 将工具调用结果添加到消息数组
    messages.push({
        role: "tool",
        content: JSON.stringify(toolResult.content),
        tool_call_id: toolId
    });

    // 再次调用 OpenAI 的聊天完成接口
    const response = await openai.chat.completions.create({
        model: LLM_MODEL!,
        max_tokens: 1000,
        // 此时消息列表中有三条消息:1用户提问 2大模型返回工具调用 3工具调用结果
        messages,
    });
    console.log(response.choices[0].message.content)

}

// 关闭客户端
client.close()
process.exit(0);

这里实现简单的非流式单轮对话,用于测试工具调用。

联调验证

启动 MCP Server

进入 MCP Server 项目,构建并启动脚本。

cd mcp-sse-server
npm run build
node build/index.js

启动 MCP Client

进入 MCP Client 项目,构建并启动脚本。

cd mcp-sse-client
npm run build
node build/index.js

启动后,可以看到工具列表获取成功:

成功链接服务器,获取到工具如下: [ 'abc-encryption-tool', 'abc-decryption-tool' ]

请输入: 

验证工具调用

调用加密工具:

成功链接服务器,获取到工具如下: [ 'abc-encryption-tool', 'abc-decryption-tool' ]

请输入: 使用派大星算法加密这句话:大雁一日同风起,扶摇直上九万里

[调用工具 abc-encryption-tool 参数为 {"textContent":"大雁一日同风起,扶摇直上九万里"} 结果为 {"content":[{"type":"text","text":"里万九上直摇扶,起风同日一雁大"}]}]

使用派大星算法加密后的句子是:里万九上直摇扶,起风同日一雁大。

调用解密工具:

成功链接服务器,获取到工具如下: [ 'pdx-encryption-tool', 'pdx-decryption-tool' ]

请输入: 使用派大星算法解密这句话:里万九上直摇扶,起风同日一雁大

[调用工具 pdx-decryption-tool 参数为 {"textContent":"里万九上直摇扶,起风同日一雁大"} 结果为 {"content":[{"type":"text","text":"大雁一日同风起,扶摇直上九万里"}]}]

使用派大星算法解密后,这句话是:“大雁一日同风起,扶摇直上九万里”。

Comment