引言:Model Context Protocol 的崛起
说实话,当 Anthropic 在 2024 年末开源 Model Context Protocol (MCP) 的时候,我并没有太当回事。又一个协议?又一个标准?但事实证明,这次真的不一样。到 2025 年 12 月,MCP 被正式捐赠给 Linux 基金会的 AI 与数据基金会 (AAIF),OpenAI、Google、Microsoft 和 AWS 纷纷作为联合创始成员加入——这个阵容本身就说明了一切。MCP 从一个单一厂商的协议,真正演变成了行业标准。
MCP 的核心理念其实很好理解。你可以把它想象成 AI 应用的"USB-C 接口"。就像 USB-C 统一了各种设备的连接标准,MCP 提供了一个通用的协议,让 Claude、ChatGPT、Gemini 这些 AI 应用能标准化地连接到各种数据源和工具。截至 2026 年初,MCP SDK 的月下载量已经突破了 9700 万次,社区贡献的 MCP 服务器超过 1000 个,覆盖数据库、API、文件系统、DevOps 工具等等。
这组数据还是蛮惊人的。
在企业环境中,MCP 解决了一个大家都深有体会的痛点:每次集成新的 AI 能力或数据源,都得编写一大堆定制化的集成代码。有了 MCP,企业只需部署标准的 MCP 服务器,任何支持 MCP 的 AI 应用都能即插即用。开发成本大幅降低不说,AI 能力的横向扩展也变得前所未有地简单。
MCP 核心架构解析
三层架构模型
MCP 采用了清晰的三层架构:Host → Client → Server。简单来说,Host 就是你直接交互的应用程序(比如 Claude Desktop、Cursor、Zed 编辑器),它内置了 MCP Client。Client 负责跟一个或多个 MCP Server 建立连接并通信。Server 则封装了具体的业务逻辑、数据源或工具能力。
这种架构的好处在于解耦——Host 完全不需要知道 Server 的具体实现细节,只要通过 Client 发送标准化的 JSON-RPC 请求就行。Server 想用什么语言实现都可以(Python、TypeScript、Go、Rust 等),遵循 MCP 协议规范即可。
三大核心原语
MCP 定义了三种核心原语,它们构成了协议的基础能力:
- Tools(工具):这是最常用的原语。可执行的函数,AI 可以调用它们来执行特定操作——数据库查询、发邮件、调外部 API 等。说白了,Tools 让 AI 从"只能对话"变成了"能够行动"。
- Resources(资源):结构化的数据源,AI 可以读取它们来获取上下文信息。文件内容、数据库记录、API 响应都可以是 Resources。跟 Tools 不同的是,Resources 是只读的,主要用来给 AI 提供背景知识。
- Prompts(提示模板):可复用的提示词模板,支持动态参数。团队可以用 Prompts 来标准化常用的 AI 交互模式,确保一致的输出质量。
传输层机制
MCP 支持两种传输机制,适用于不同场景:
stdio 传输适合本地开发和单机部署。Client 通过标准输入输出(stdin/stdout)与 Server 进程通信,简单直接,无需网络配置,开发调试的时候特别方便。Claude Desktop 默认就是用 stdio 传输来连接本地 MCP 服务器的。
Streamable HTTP 传输则是生产环境的首选。这是 2025 年 3 月规范更新中引入的新传输层,替代了之前的 SSE (Server-Sent Events) 传输。它用单个 HTTP 端点处理所有请求和响应,支持双向流式通信。最关键的是,它完全基于标准 HTTP,能轻松集成到现有的负载均衡器、API 网关、CDN 等基础设施中。
JSON-RPC 协议通信
MCP 的所有通信都基于 JSON-RPC 2.0 协议。来看一个典型的工具调用请求:
{
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "query_database",
"arguments": {
"sql": "SELECT * FROM users WHERE status = 'active'"
}
}
}
Server 返回的响应也是标准的 JSON-RPC 格式:
{
"jsonrpc": "2.0",
"id": 1,
"result": {
"content": [
{
"type": "text",
"text": "找到 42 个活跃用户:[用户数据...]"
}
]
}
}
这种标准化的设计让 MCP 具备了极强的互操作性和可扩展性。
快速入门:构建你的第一个 MCP Server
好了,理论讲够了。让我们动手用 Python 构建一个实用的 MCP 服务器,它提供两个工具:数据库查询和网页搜索。
环境准备
首先安装必要的依赖。MCP Python SDK 要求 Python 3.10 或更高版本:
pip install mcp>=1.2.0 httpx sqlite3
完整的 MCP Server 实现
创建文件 my_first_server.py:
import asyncio
import sqlite3
import httpx
from mcp.server import Server
from mcp.types import Tool, TextContent
from mcp.server.stdio import stdio_server
# 创建 MCP 服务器实例
mcp = Server("my-first-mcp-server")
# 初始化示例数据库
def init_database():
conn = sqlite3.connect("example.db")
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY,
name TEXT,
price REAL,
stock INTEGER
)
""")
cursor.execute("INSERT OR IGNORE INTO products VALUES (1, 'Laptop', 999.99, 15)")
cursor.execute("INSERT OR IGNORE INTO products VALUES (2, 'Mouse', 29.99, 50)")
cursor.execute("INSERT OR IGNORE INTO products VALUES (3, 'Keyboard', 79.99, 30)")
conn.commit()
conn.close()
init_database()
@mcp.list_tools()
async def list_tools() -> list[Tool]:
"""列出所有可用工具"""
return [
Tool(
name="query_database",
description="执行 SQLite 数据库查询。返回查询结果的 JSON 格式数据。",
inputSchema={
"type": "object",
"properties": {
"sql": {
"type": "string",
"description": "要执行的 SQL 查询语句"
}
},
"required": ["sql"]
}
),
Tool(
name="search_web",
description="搜索网页内容。返回搜索结果的摘要信息。",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索关键词"
},
"limit": {
"type": "integer",
"description": "返回结果数量限制",
"default": 5
}
},
"required": ["query"]
}
)
]
@mcp.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
"""处理工具调用"""
if name == "query_database":
sql = arguments["sql"]
# 安全检查:只允许 SELECT 语句
if not sql.strip().upper().startswith("SELECT"):
return [TextContent(
type="text",
text="错误:出于安全考虑,只允许执行 SELECT 查询"
)]
try:
conn = sqlite3.connect("example.db")
cursor = conn.cursor()
cursor.execute(sql)
results = cursor.fetchall()
columns = [desc[0] for desc in cursor.description]
conn.close()
# 格式化结果
formatted_results = []
for row in results:
formatted_results.append(dict(zip(columns, row)))
return [TextContent(
type="text",
text=f"查询成功,返回 {len(formatted_results)} 条记录:\n{formatted_results}"
)]
except Exception as e:
return [TextContent(
type="text",
text=f"数据库查询错误:{str(e)}"
)]
elif name == "search_web":
query = arguments["query"]
limit = arguments.get("limit", 5)
try:
async with httpx.AsyncClient() as client:
response = await client.get(
"https://api.duckduckgo.com/",
params={"q": query, "format": "json"},
timeout=10.0
)
data = response.json()
results_text = f"搜索 '{query}' 的结果:\n\n"
results_text += data.get("AbstractText", "未找到相关结果")
return [TextContent(
type="text",
text=results_text
)]
except Exception as e:
return [TextContent(
type="text",
text=f"网页搜索错误:{str(e)}"
)]
else:
return [TextContent(
type="text",
text=f"未知工具:{name}"
)]
async def main():
"""使用 stdio 传输运行服务器"""
async with stdio_server() as (read_stream, write_stream):
await mcp.run(
read_stream,
write_stream,
mcp.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())
运行和测试
运行服务器:
python my_first_server.py
要在 Claude Desktop 中使用这个服务器,需要在配置文件中添加以下内容:
{
"mcpServers": {
"my-first-server": {
"command": "python",
"args": ["/absolute/path/to/my_first_server.py"]
}
}
}
重启 Claude Desktop 后,你就可以直接对话了——试试说"请查询数据库中价格低于 50 的产品",Claude 会自动调用 query_database 工具并返回结果。第一次看到它工作的时候,确实有种"哇"的感觉。
TypeScript MCP Server 开发
TypeScript 是 MCP 生态中另一个主流的开发语言。Anthropic 官方提供的 @modelcontextprotocol/sdk 包(v1.x 系列)提供了完整的 TypeScript 支持,v2 版本预计在 2026 年 Q2 发布,届时将带来更好的类型推断和性能优化。
环境准备
npm install @modelcontextprotocol/sdk@^1.0.0
完整的 TypeScript MCP Server
创建文件 server.ts:
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
CallToolRequestSchema,
ListToolsRequestSchema,
ListResourcesRequestSchema,
ReadResourceRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import axios from "axios";
// 创建 MCP 服务器
const server = new Server(
{
name: "typescript-demo-server",
version: "1.0.0",
},
{
capabilities: {
tools: {},
resources: {},
},
}
);
// 定义工具列表
server.setRequestHandler(ListToolsRequestSchema, async () => {
return {
tools: [
{
name: "fetch_api_data",
description: "从指定的 REST API 端点获取数据",
inputSchema: {
type: "object",
properties: {
url: {
type: "string",
description: "API 端点 URL",
},
method: {
type: "string",
enum: ["GET", "POST"],
description: "HTTP 方法",
default: "GET",
},
},
required: ["url"],
},
},
{
name: "calculate",
description: "执行数学计算表达式",
inputSchema: {
type: "object",
properties: {
expression: {
type: "string",
description: "数学表达式,例如:(10 + 5) * 2",
},
},
required: ["expression"],
},
},
],
};
});
// 处理工具调用
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
try {
if (name === "fetch_api_data") {
const { url, method = "GET" } = args as { url: string; method?: string };
const response = await axios({
method,
url,
timeout: 5000,
});
return {
content: [
{
type: "text",
text: JSON.stringify(response.data, null, 2),
},
],
};
}
if (name === "calculate") {
const { expression } = args as { expression: string };
// 简单的安全检查
if (!/^[\d+\-*/().\s]+$/.test(expression)) {
throw new Error("表达式包含非法字符");
}
const result = Function('"return " + expression')();
return {
content: [
{
type: "text",
text: `计算结果:${expression} = ${result}`,
},
],
};
}
throw new Error(`未知工具:${name}`);
} catch (error) {
return {
content: [
{
type: "text",
text: `错误:${error instanceof Error ? error.message : String(error)}`,
},
],
isError: true,
};
}
});
// 定义资源列表
server.setRequestHandler(ListResourcesRequestSchema, async () => {
return {
resources: [
{
uri: "file:///config/settings.json",
name: "应用配置",
description: "应用程序的配置文件",
mimeType: "application/json",
},
{
uri: "file:///logs/latest.log",
name: "最新日志",
description: "应用程序的最新日志文件",
mimeType: "text/plain",
},
],
};
});
// 处理资源读取
server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
const { uri } = request.params;
if (uri === "file:///config/settings.json") {
return {
contents: [
{
uri,
mimeType: "application/json",
text: JSON.stringify(
{
appName: "MCP Demo",
version: "1.0.0",
environment: "production",
},
null,
2
),
},
],
};
}
if (uri === "file:///logs/latest.log") {
return {
contents: [
{
uri,
mimeType: "text/plain",
text: "[2026-02-16 10:30:15] INFO: Server started\n[2026-02-16 10:30:20] INFO: Connection established",
},
],
};
}
throw new Error(`未知资源:${uri}`);
});
// 启动服务器
async function main() {
const transport = new StdioServerTransport();
await server.connect(transport);
console.error("TypeScript MCP Server 已启动");
}
main().catch((error) => {
console.error("服务器错误:", error);
process.exit(1);
});
编译和运行
# 编译 TypeScript
npx tsc server.ts --module nodenext --moduleResolution nodenext --target es2022
# 运行服务器
node server.js
这个例子同时展示了 Tools 和 Resources 两种原语。Tools 提供了 API 调用和计算能力,Resources 暴露了配置文件和日志——AI 可以读取这些资源来获取上下文,然后再用 Tools 来执行操作。两者配合起来,威力相当大。
MCP Client 开发与集成
除了开发 MCP Server,你也可以构建自己的 MCP Client,把 MCP 服务器集成到自定义应用中。这部分其实很多人忽略了,但它的灵活性非常高。
Python MCP Client 基础
import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
async def main():
# 连接到 MCP 服务器
server_params = StdioServerParameters(
command="python",
args=["my_first_server.py"]
)
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
# 初始化连接
await session.initialize()
# 列出可用工具
tools_result = await session.list_tools()
print(f"可用工具:{[tool.name for tool in tools_result.tools]}")
# 调用工具
result = await session.call_tool(
"query_database",
arguments={"sql": "SELECT * FROM products WHERE price < 100"}
)
print(f"查询结果:{result.content[0].text}")
if __name__ == "__main__":
asyncio.run(main())
与 LangChain 集成
MCP 可以无缝集成到 LangChain 工作流中。这一点对于已经在用 LangChain 的团队来说是个好消息:
from langchain.tools import Tool
from langchain.agents import initialize_agent, AgentType
from langchain_openai import ChatOpenAI
import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
class MCPToolWrapper:
"""将 MCP 工具包装为 LangChain Tool"""
def __init__(self, session: ClientSession, tool_name: str):
self.session = session
self.tool_name = tool_name
async def _arun(self, **kwargs):
result = await self.session.call_tool(self.tool_name, arguments=kwargs)
return result.content[0].text
def _run(self, **kwargs):
return asyncio.run(self._arun(**kwargs))
async def create_langchain_agent():
server_params = StdioServerParameters(
command="python",
args=["my_first_server.py"]
)
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
# 获取 MCP 工具列表
tools_result = await session.list_tools()
# 转换为 LangChain Tools
langchain_tools = []
for mcp_tool in tools_result.tools:
wrapper = MCPToolWrapper(session, mcp_tool.name)
langchain_tool = Tool(
name=mcp_tool.name,
func=wrapper._run,
description=mcp_tool.description
)
langchain_tools.append(langchain_tool)
# 创建 Agent
llm = ChatOpenAI(model="gpt-4", temperature=0)
agent = initialize_agent(
langchain_tools,
llm,
agent=AgentType.OPENAI_FUNCTIONS,
verbose=True
)
# 执行任务
result = agent.run("查询数据库中所有库存少于 20 的产品")
print(result)
asyncio.run(create_langchain_agent())
与 OpenAI Agents SDK 集成
OpenAI 的 Agents SDK 同样可以轻松集成 MCP 工具:
from openai import OpenAI
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
import asyncio
import json
client = OpenAI()
async def call_mcp_tool(tool_name: str, arguments: dict) -> str:
"""调用 MCP 工具的辅助函数"""
server_params = StdioServerParameters(
command="python",
args=["my_first_server.py"]
)
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
result = await session.call_tool(tool_name, arguments=arguments)
return result.content[0].text
def run_agent():
messages = [
{"role": "user", "content": "查询价格低于 50 的产品有哪些"}
]
# 定义工具
tools = [
{
"type": "function",
"function": {
"name": "query_database",
"description": "执行 SQLite 数据库查询",
"parameters": {
"type": "object",
"properties": {
"sql": {"type": "string", "description": "SQL 查询语句"}
},
"required": ["sql"]
}
}
}
]
response = client.chat.completions.create(
model="gpt-4-turbo-preview",
messages=messages,
tools=tools,
tool_choice="auto"
)
# 处理工具调用
if response.choices[0].message.tool_calls:
for tool_call in response.choices[0].message.tool_calls:
function_name = tool_call.function.name
function_args = json.loads(tool_call.function.arguments)
# 调用 MCP 工具
result = asyncio.run(call_mcp_tool(function_name, function_args))
print(f"查询结果:{result}")
run_agent()
传输层深度解析
stdio 传输的优势与局限
stdio 是 MCP 最简单的传输方式。Client 启动 Server 进程,然后通过 stdin 发送请求、从 stdout 读取响应。它的优势很明显:
- 零配置:无需网络设置、端口分配或防火墙配置
- 进程隔离:每个 Server 运行在独立进程中,一个崩了不影响其他的
- 开发便利:适合本地开发和调试,可以直接用 IDE 断点调试 Server 代码
但 stdio 的局限也很明显:
- 无法远程访问——Server 必须和 Client 在同一台机器上
- 没法做负载均衡、多实例部署
- 每个连接都得启动一个新进程,资源管理比较麻烦
为什么 Streamable HTTP 替代了 SSE
在 2025 年 3 月之前,MCP 的远程传输方案用的是 SSE (Server-Sent Events)。SSE 采用双端点架构:一个 HTTP POST 端点用于 Client 向 Server 发消息,一个 SSE 端点用于 Server 向 Client 推送消息。听起来还行对吧?但实际用起来问题不少:
- 架构复杂:两个独立的连接,状态同步很头疼
- 负载均衡困难:两个端点可能被路由到不同的 Server 实例(踩过这个坑的人都知道有多痛苦)
- 防火墙和代理问题:SSE 的长连接经常被中间件意外关闭
- 错误恢复复杂:任一端点失败都需要复杂的重连逻辑
Streamable HTTP 彻底解决了这些问题。单个 HTTP POST 端点处理所有通信,用 NDJSON(换行符分隔的 JSON)格式做流式传输。每个 HTTP 请求可以包含多个 JSON-RPC 消息,Server 的响应也是流式返回。简洁、干净。
Streamable HTTP 架构详解
来看一个典型的通信流程:
POST /mcp HTTP/1.1
Host: mcp-server.example.com
Content-Type: application/json
Transfer-Encoding: chunked
{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-03-26","clientInfo":{"name":"my-client","version":"1.0"}}}
{"jsonrpc":"2.0","id":2,"method":"tools/list","params":{}}
Server 响应:
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
{"jsonrpc":"2.0","id":1,"result":{"protocolVersion":"2025-03-26","serverInfo":{"name":"my-server","version":"1.0"}}}
{"jsonrpc":"2.0","id":2,"result":{"tools":[{"name":"tool1","description":"..."}]}}
这种设计的优势一目了然:
- 简单:一个端点搞定一切,状态管理轻松
- 标准化:完全基于标准 HTTP,兼容所有现代基础设施
- 可扩展:负载均衡、缓存、CDN 加速都不在话下
- 可靠:HTTP 语义清晰,错误处理成熟
Python 实现 Streamable HTTP Server
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from mcp.server import Server
import json
app = FastAPI()
mcp = Server("streamable-http-server")
# ... 定义 tools 和 handlers(与前面的示例相同)...
@app.post("/mcp")
async def mcp_endpoint(request: Request):
"""处理 Streamable HTTP 请求"""
async def message_generator():
# 读取请求体中的所有消息
body = await request.body()
messages = [json.loads(line) for line in body.decode().split('\n') if line.strip()]
# 处理每个消息并流式返回响应
for message in messages:
response = await mcp.handle_message(message)
yield json.dumps(response) + '\n'
return StreamingResponse(
message_generator(),
media_type="application/json",
headers={
"X-Protocol-Version": "2025-03-26",
"Cache-Control": "no-cache"
}
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
生产环境部署最佳实践
Docker 容器化部署
在生产环境中,容器化是部署 MCP Server 的首选方案。Docker 提供了隔离性、可复现性和可扩展性——说实话,如果你现在还没用 Docker 部署,那真的该考虑一下了。
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 创建非 root 用户
RUN useradd -m -u 1000 mcpuser && chown -R mcpuser:mcpuser /app
USER mcpuser
# 健康检查
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
CMD python -c "import requests; requests.get('http://localhost:8000/health')"
EXPOSE 8000
CMD ["python", "server.py"]
构建和运行:
docker build -t mcp-server:latest .
docker run -d -p 8000:8000 --name mcp-server mcp-server:latest
Docker Compose 多服务编排
实际项目中你通常需要部署多个 MCP Server。Docker Compose 可以优雅地管理这些服务:
# docker-compose.yml
version: '3.8'
services:
database-server:
build: ./servers/database
environment:
- DB_HOST=postgres
- DB_PASSWORD_FILE=/run/secrets/db_password
secrets:
- db_password
networks:
- mcp-network
deploy:
resources:
limits:
cpus: '0.5'
memory: 512M
api-server:
build: ./servers/api
environment:
- API_KEY_FILE=/run/secrets/api_key
secrets:
- api_key
networks:
- mcp-network
deploy:
resources:
limits:
cpus: '0.5'
memory: 512M
gateway:
image: nginx:alpine
ports:
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf:ro
- ./certs:/etc/nginx/certs:ro
depends_on:
- database-server
- api-server
networks:
- mcp-network
postgres:
image: postgres:15-alpine
environment:
- POSTGRES_PASSWORD_FILE=/run/secrets/db_password
secrets:
- db_password
volumes:
- postgres-data:/var/lib/postgresql/data
networks:
- mcp-network
networks:
mcp-network:
driver: bridge
secrets:
db_password:
file: ./secrets/db_password.txt
api_key:
file: ./secrets/api_key.txt
volumes:
postgres-data:
启动整个服务栈:
docker-compose up -d
OAuth 2.1 认证实现
生产环境的 MCP Server 必须有安全的认证机制,这一点没商量。MCP 规范推荐使用 OAuth 2.1 with PKCE (Proof Key for Code Exchange):
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2AuthorizationCodeBearer
import secrets
import hashlib
import base64
app = FastAPI()
# PKCE 实现
class PKCEManager:
def __init__(self):
self.challenges = {}
def create_challenge(self, client_id: str) -> tuple[str, str]:
"""创建 PKCE 挑战"""
verifier = base64.urlsafe_b64encode(secrets.token_bytes(32)).decode().rstrip('=')
challenge = base64.urlsafe_b64encode(
hashlib.sha256(verifier.encode()).digest()
).decode().rstrip('=')
self.challenges[client_id] = verifier
return verifier, challenge
def verify_challenge(self, client_id: str, verifier: str) -> bool:
"""验证 PKCE 挑战"""
expected = self.challenges.get(client_id)
if not expected:
return False
return secrets.compare_digest(expected, verifier)
pkce_manager = PKCEManager()
async def verify_token(token: str = Depends(OAuth2AuthorizationCodeBearer(
authorizationUrl="https://auth.example.com/authorize",
tokenUrl="https://auth.example.com/token"
))):
"""验证访问令牌"""
try:
claims = verify_jwt_token(token)
required_scopes = ["mcp:tools:execute"]
if not all(scope in claims.get("scope", []) for scope in required_scopes):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="权限不足"
)
return claims
except Exception:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无效的访问令牌"
)
@app.post("/mcp")
async def mcp_endpoint(request: Request, claims: dict = Depends(verify_token)):
"""受保护的 MCP 端点"""
pass
令牌管理最佳实践
MCP 规范在这方面有几个硬性要求,值得特别注意:
- 每个 Server 使用独立令牌:千万不要在多个 Server 之间共享访问令牌,即使它们属于同一个应用
- 最小权限原则:每个令牌只包含 Server 需要的最小权限范围
- 令牌轮换:定期刷新访问令牌,建议使用短期令牌(通常 1 小时)配合刷新令牌
- 安全存储:令牌要存储在加密的密钥管理服务中(比如 AWS Secrets Manager、HashiCorp Vault)
HTTPS 强制执行
所有生产环境的 Streamable HTTP 传输必须使用 HTTPS——这是底线。以下是 Nginx 的配置示例:
server {
listen 443 ssl http2;
server_name mcp-server.example.com;
ssl_certificate /etc/nginx/certs/fullchain.pem;
ssl_certificate_key /etc/nginx/certs/privkey.pem;
ssl_protocols TLSv1.3;
ssl_ciphers HIGH:!aNULL:!MD5;
ssl_prefer_server_ciphers on;
# HSTS
add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;
# 速率限制
limit_req_zone $binary_remote_addr zone=mcp_limit:10m rate=10r/s;
limit_req zone=mcp_limit burst=20 nodelay;
location /mcp {
proxy_pass http://mcp-server:8000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# 流式传输配置
proxy_buffering off;
proxy_cache off;
chunked_transfer_encoding on;
}
}
# HTTP 重定向到 HTTPS
server {
listen 80;
server_name mcp-server.example.com;
return 301 https://$server_name$request_uri;
}
MCP Gateway 模式
如果你的部署规模比较大,可以考虑使用 MCP Gateway 来统一管理多个 Server:
- Docker MCP Gateway:开源项目,提供容器化的 Gateway 方案,支持自动服务发现、负载均衡和健康检查
- Lunar.dev MCPX:企业级 MCP Gateway,提供请求重试、熔断、监控、审计日志等高级功能
Gateway 的核心价值在于把一堆分散的关注点集中处理:
- 统一的认证和授权
- 请求路由和负载均衡
- 集中式监控和日志
- API 版本管理
- 速率限制和配额管理
安全性与企业级考量
输入验证与清洗
安全这块真的怎么强调都不为过。所有进入 MCP Server 的输入都必须经过严格验证:
from pydantic import BaseModel, validator, Field
class DatabaseQueryInput(BaseModel):
sql: str = Field(..., max_length=1000)
@validator('sql')
def validate_sql(cls, v):
# 只允许 SELECT 语句
if not v.strip().upper().startswith('SELECT'):
raise ValueError('只允许 SELECT 查询')
# 禁止危险的 SQL 关键字
dangerous_keywords = ['DROP', 'DELETE', 'UPDATE', 'INSERT', 'ALTER', 'EXEC']
if any(keyword in v.upper() for keyword in dangerous_keywords):
raise ValueError('查询包含禁止的关键字')
# 禁止注释符号(防止 SQL 注入)
if '--' in v or '/*' in v or '*/' in v:
raise ValueError('查询包含非法字符')
return v
@mcp.call_tool()
async def call_tool(name: str, arguments: dict):
if name == "query_database":
try:
validated_input = DatabaseQueryInput(**arguments)
conn = sqlite3.connect("example.db")
cursor = conn.cursor()
cursor.execute(validated_input.sql)
# ...
except ValueError as e:
return [TextContent(type="text", text=f"输入验证失败:{str(e)}")]
最小权限访问控制
实现细粒度的权限控制是企业级部署的必备要求:
from enum import Enum
class Permission(Enum):
READ_DATABASE = "db:read"
WRITE_DATABASE = "db:write"
CALL_API = "api:call"
READ_FILES = "files:read"
class RBACManager:
def __init__(self):
self.role_permissions = {
"viewer": {Permission.READ_DATABASE, Permission.READ_FILES},
"operator": {Permission.READ_DATABASE, Permission.CALL_API},
"admin": {Permission.READ_DATABASE, Permission.WRITE_DATABASE,
Permission.CALL_API, Permission.READ_FILES}
}
def check_permission(self, user_role: str, required_permission: Permission) -> bool:
return required_permission in self.role_permissions.get(user_role, set())
rbac = RBACManager()
@mcp.call_tool()
async def call_tool(name: str, arguments: dict, user_context: dict):
user_role = user_context.get("role", "viewer")
if name == "query_database":
if not rbac.check_permission(user_role, Permission.READ_DATABASE):
return [TextContent(type="text", text="权限不足:需要数据库读取权限")]
# 执行查询...
审计日志
记录所有关键操作,这不光是为了合规,排查问题的时候也能救命:
import logging
import json
from datetime import datetime
# 配置结构化日志
logging.basicConfig(
level=logging.INFO,
format='%(message)s',
handlers=[
logging.FileHandler('/var/log/mcp/audit.log'),
logging.StreamHandler()
]
)
audit_logger = logging.getLogger('mcp.audit')
def log_tool_call(user_id: str, tool_name: str, arguments: dict, result: str, success: bool):
"""记录工具调用审计日志"""
audit_log = {
"timestamp": datetime.utcnow().isoformat(),
"event_type": "tool_call",
"user_id": user_id,
"tool_name": tool_name,
"arguments": arguments,
"result_summary": result[:200],
"success": success
}
audit_logger.info(json.dumps(audit_log))
容器镜像签名
在供应链安全日益受到重视的今天,对容器镜像进行签名验证也是很有必要的。可以使用 Docker Content Trust 或 Sigstore:
# 启用 Docker Content Trust
export DOCKER_CONTENT_TRUST=1
# 构建和推送签名镜像
docker build -t myregistry.com/mcp-server:1.0.0 .
docker push myregistry.com/mcp-server:1.0.0
# 使用 Cosign 签名(Sigstore)
cosign sign --key cosign.key myregistry.com/mcp-server:1.0.0
# 在部署时验证签名
cosign verify --key cosign.pub myregistry.com/mcp-server:1.0.0
实际应用场景
DevOps 自动化
MCP 可以把 Terraform、CI/CD 等 DevOps 工具暴露给 AI,让运维人员用自然语言操作基础设施。比如直接说:"在 staging 环境执行 terraform plan,变量 instance_type 设置为 t3.medium"——AI 就会帮你搞定。
@mcp.tool()
async def terraform_plan(workspace: str, variables: dict) -> str:
"""执行 Terraform plan"""
import subprocess
subprocess.run(["terraform", "workspace", "select", workspace], check=True)
var_args = []
for key, value in variables.items():
var_args.extend(["-var", f"{key}={value}"])
result = subprocess.run(
["terraform", "plan", *var_args],
capture_output=True,
text=True,
check=True
)
return result.stdout
知识库与 RAG 集成
把企业知识库作为 MCP 工具暴露出来,AI 就能基于内部文档回答问题了。这对于客服、技术支持这类场景来说简直是刚需:
from qdrant_client import QdrantClient
from sentence_transformers import SentenceTransformer
qdrant = QdrantClient(url="http://qdrant:6333")
encoder = SentenceTransformer("all-MiniLM-L6-v2")
@mcp.tool()
async def semantic_search(collection: str, query: str, limit: int = 5) -> str:
"""在知识库中进行语义搜索"""
query_vector = encoder.encode(query).tolist()
results = qdrant.search(
collection_name=collection,
query_vector=query_vector,
limit=limit
)
formatted_results = []
for i, hit in enumerate(results, 1):
formatted_results.append(
f"{i}. [{hit.score:.3f}] {hit.payload['title']}\n"
f" {hit.payload['text'][:200]}...\n"
)
return "\n".join(formatted_results)
多智能体编排
这是 MCP 最令人兴奋的应用方向之一。用 MCP 构建多智能体系统,每个智能体就是一个独立的 MCP Server,由编排器来协调它们的工作:
class AgentOrchestrator:
"""多智能体编排器"""
def __init__(self):
self.agents = {
"researcher": MCPClient("researcher-server"),
"analyzer": MCPClient("analyzer-server"),
"writer": MCPClient("writer-server")
}
async def execute_workflow(self, task: str) -> str:
"""执行多智能体工作流"""
research_result = await self.agents["researcher"].call_tool(
"web_search",
{"query": task, "depth": "comprehensive"}
)
analysis_result = await self.agents["analyzer"].call_tool(
"analyze_data",
{"data": research_result, "focus": "key_insights"}
)
report = await self.agents["writer"].call_tool(
"generate_report",
{
"research": research_result,
"analysis": analysis_result,
"style": "executive_summary"
}
)
return report
企业工作流自动化
把 MCP 集成到企业工作流中,可以实现端到端的业务流程自动化。来看一个发票处理的例子:
@mcp.tool()
async def process_invoice(image_url: str) -> str:
"""处理发票自动化流程"""
# 1. OCR 提取发票信息
ocr_result = await call_external_mcp_tool(
"ocr-server",
"extract_invoice_data",
{"image_url": image_url}
)
# 2. 验证发票信息
validation_result = validate_invoice(ocr_result)
if not validation_result["valid"]:
return f"发票验证失败:{validation_result['errors']}"
# 3. 记录到 ERP 系统
erp_response = await post_to_erp({
"vendor": ocr_result["vendor"],
"amount": ocr_result["amount"],
"date": ocr_result["date"]
})
# 4. 触发审批流程
approval_id = await trigger_approval_workflow({
"invoice_id": erp_response["id"],
"approver": determine_approver(ocr_result["amount"])
})
return f"发票已录入系统,审批单号:{approval_id}"
总结与展望
Model Context Protocol 代表了 AI 应用集成领域的一次重大范式转变。通过提供标准化的协议,MCP 让 AI 应用能够轻松连接到各种数据源和工具,大幅降低了集成的复杂度。从 Anthropic 的开源到 Linux 基金会的治理,从 stdio 本地传输到 Streamable HTTP 远程部署,MCP 正在快速走向成熟。
回顾一下,本文覆盖了 MCP 的核心架构、Python/TypeScript 开发实践、传输机制对比、安全性考量和生产部署最佳实践。我们还探讨了如何在 DevOps、知识库 RAG、多智能体编排等实际场景中应用 MCP。
展望未来,MCP 生态还会继续快速发展:
- 更丰富的 SDK:预计 2026 年将看到 Go、Rust、Java 等更多语言的官方 SDK
- 企业级功能:更完善的认证、授权、审计和监控能力
- 性能优化:更高效的传输协议、连接池、缓存机制
- 标准化工具库:社区将涌现更多高质量的 MCP Server
- AI 编排能力:MCP 将成为构建复杂多智能体系统的基础设施
对于开发者和企业来说,现在确实是拥抱 MCP 的好时机。通过标准化的接口,我们可以构建更灵活、更可维护的 AI 应用架构。MCP 不仅仅是一个技术协议——它正在成为 AI 时代应用架构的新基石。如果你还没开始用,那就从本文的入门示例开始吧。