# MCP(模型上下文协议)完全指南:构建智能模型通信的实践路径
Model Context Protocol(MCP)作为连接大语言模型与外部工具的标准协议,正在重新定义 AI 系统的可扩展性和互操作性。本文将深入解析 MCP 的核心原理,并提供从基础到实践的完整实现方案。
## MCP协议核心架构解析
MCP基于JSON-RPC 2.0协议,为模型提供了标准化的工具调用和资源访问接口。
```python
# mcp_core/protocol.py
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum
import json
class MCPMessageType(Enum):
    """JSON-RPC method names used by the MCP messages in this guide.

    Each member's value is the exact string placed in the ``method`` field
    of a request.
    """

    INITIALIZE = "initialize"
    TOOLS_LIST = "tools/list"
    TOOLS_CALL = "tools/call"
    RESOURCES_LIST = "resources/list"
    RESOURCES_READ = "resources/read"
    NOTIFICATIONS = "notifications"
@dataclass
class MCPRequest:
    """A JSON-RPC 2.0 request (or notification) addressed to an MCP server.

    Every field has a default so a request can be built from partial data;
    ``id`` stays ``None`` when no identifier was supplied.
    """

    jsonrpc: str = "2.0"
    id: Optional[str] = None
    method: str = ""
    params: Optional[Dict[str, Any]] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the request as a plain dict ready for JSON serialization.

        Returns:
            A dict that always contains ``jsonrpc`` and ``method``; ``id`` is
            included whenever it is not ``None`` and ``params`` whenever it
            is non-empty.
        """
        data: Dict[str, Any] = {
            "jsonrpc": self.jsonrpc,
            "method": self.method,
        }
        # Compare with None instead of relying on truthiness: a falsy id
        # such as 0 is still an id and must not be dropped.
        if self.id is not None:
            data["id"] = self.id
        # Empty params carry no information, so they are omitted.
        if self.params:
            data["params"] = self.params
        return data
@dataclass
class MCPResponse:
    """A JSON-RPC 2.0 response returned by an MCP server.

    Exactly one of ``result`` / ``error`` is expected to be set; a response
    whose ``error`` is ``None`` counts as successful.
    """

    jsonrpc: str
    id: Optional[str]
    result: Optional[Dict[str, Any]] = None
    error: Optional[Dict[str, Any]] = None

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'MCPResponse':
        """Build a response from a decoded JSON object.

        Missing keys fall back to ``"2.0"`` for ``jsonrpc`` and ``None`` for
        everything else.
        """
        return cls(
            jsonrpc=data.get("jsonrpc", "2.0"),
            id=data.get("id"),
            result=data.get("result"),
            error=data.get("error"),
        )

    def is_success(self) -> bool:
        """Return ``True`` when the response carries no error object."""
        return self.error is None

    def to_dict(self) -> Dict[str, Any]:
        """Return the response as a plain dict ready for JSON serialization.

        Only one of ``result`` / ``error`` is emitted, so receivers that test
        for the presence of an ``"error"`` key do not mistake a successful
        response for a failure.
        """
        payload: Dict[str, Any] = {"jsonrpc": self.jsonrpc, "id": self.id}
        if self.error is not None:
            payload["error"] = self.error
        else:
            payload["result"] = self.result
        return payload
class MCPErrorCode(Enum):
    """Numeric error codes placed in the ``error.code`` field of a response.

    The values are the pre-defined JSON-RPC 2.0 error codes.
    """

    PARSE_ERROR = -32700
    INVALID_REQUEST = -32600
    METHOD_NOT_FOUND = -32601
    INVALID_PARAMS = -32602
    INTERNAL_ERROR = -32603
```
## MCP服务器基础实现
构建符合MCP标准的服务器需要实现核心协议处理方法。
```python
# mcp_server/base_server.py
from abc import ABC, abstractmethod
from typing import Dict, List, Any
import asyncio
class MCPServer(ABC):
    """Abstract base class for a minimal MCP (JSON-RPC 2.0) server.

    Subclasses fill ``self.tools`` / ``self.resources`` and implement
    ``_execute_tool`` and ``_read_resource``. This class routes incoming
    requests to the matching handler and wraps results and failures in
    JSON-RPC response objects.
    """

    # Protocol revision reported by the ``initialize`` handler.
    PROTOCOL_VERSION = "2024-11-05"

    def __init__(self, server_name: str, version: str = "1.0.0"):
        """Store the server identity and start with empty registries."""
        self.server_name = server_name
        self.version = version
        self.tools = {}
        self.resources = {}
        self._initialize_capabilities()

    def _initialize_capabilities(self):
        """Populate ``self.capabilities`` with the advertised feature flags."""
        self.capabilities = {
            "tools": {"listChanged": True},
            "resources": {"listChanged": True},
            "roots": {"listChanged": True},
        }

    @staticmethod
    def _error_response(request_id: Any, code: int, message: str) -> "MCPResponse":
        """Build an error response for ``request_id``."""
        return MCPResponse(
            jsonrpc="2.0",
            id=request_id,
            error={"code": code, "message": message},
        )

    @staticmethod
    def _serialize_response(response: "MCPResponse") -> Dict[str, Any]:
        """Convert a response object into the dict sent back to the caller.

        Only one of ``result`` / ``error`` is emitted; a payload carrying
        ``"error": None`` would look like a failure to any receiver that
        checks for the presence of the key.
        """
        if hasattr(response, "to_dict"):
            return response.to_dict()
        payload: Dict[str, Any] = {"jsonrpc": "2.0", "id": response.id}
        if response.error is not None:
            payload["error"] = response.error
        else:
            payload["result"] = response.result
        return payload

    async def handle_request(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """Dispatch one decoded JSON-RPC request and return the response dict.

        Args:
            request_data: The decoded request object.

        Returns:
            A JSON-serializable response dict. Malformed requests produce
            ``-32600``, unknown methods ``-32601`` and any exception raised
            while handling the request ``-32603``.
        """
        is_mapping = isinstance(request_data, dict)
        request_id = request_data.get("id") if is_mapping else None
        try:
            if not is_mapping or not isinstance(request_data.get("method"), str):
                response = self._error_response(request_id, -32600, "Invalid Request")
            else:
                request = MCPRequest(
                    id=request_id,
                    method=request_data["method"],
                    params=request_data.get("params"),
                )
                # Route the method name to its handler.
                handlers = {
                    "initialize": self._handle_initialize,
                    "tools/list": self._handle_tools_list,
                    "tools/call": self._handle_tools_call,
                    "resources/list": self._handle_resources_list,
                    "resources/read": self._handle_resources_read,
                }
                handler = handlers.get(request.method)
                if handler is None:
                    response = self._error_response(
                        request.id, -32601, f"Method not found: {request.method}"
                    )
                else:
                    response = await handler(request)
            return self._serialize_response(response)
        except Exception as e:
            # Top-level boundary: never let a handler failure escape as an
            # exception; report it to the caller as an internal error.
            return {
                "jsonrpc": "2.0",
                "id": request_id,
                "error": {
                    "code": -32603,
                    "message": f"Internal error: {str(e)}",
                },
            }

    async def _handle_initialize(self, request: "MCPRequest") -> "MCPResponse":
        """Answer ``initialize`` with protocol version, capabilities and identity."""
        return MCPResponse(
            jsonrpc="2.0",
            id=request.id,
            result={
                "protocolVersion": self.PROTOCOL_VERSION,
                "capabilities": self.capabilities,
                "serverInfo": {
                    "name": self.server_name,
                    "version": self.version,
                },
            },
        )

    async def _handle_tools_list(self, request: "MCPRequest") -> "MCPResponse":
        """Answer ``tools/list`` with one descriptor per registered tool."""
        tools_list = [
            {
                "name": tool_name,
                "description": tool_info.get("description", ""),
                "inputSchema": tool_info.get("inputSchema", {}),
            }
            for tool_name, tool_info in self.tools.items()
        ]
        return MCPResponse(
            jsonrpc="2.0",
            id=request.id,
            result={"tools": tools_list},
        )

    async def _handle_tools_call(self, request: "MCPRequest") -> "MCPResponse":
        """Answer ``tools/call`` by running the named tool.

        The tool's return value is stringified and wrapped in a single text
        content item. Exceptions raised by the tool become ``-32603`` errors.
        """
        params = request.params
        if not isinstance(params, dict):
            # Without this guard a missing params object raises
            # AttributeError and is reported as an internal error.
            return self._error_response(
                request.id, -32602, "Invalid params: expected an object"
            )
        tool_name = params.get("name")
        arguments = params.get("arguments") or {}
        if tool_name not in self.tools:
            return self._error_response(
                request.id, -32601, f"Tool not found: {tool_name}"
            )
        try:
            result = await self._execute_tool(tool_name, arguments)
        except Exception as e:
            return self._error_response(
                request.id, -32603, f"Tool execution failed: {str(e)}"
            )
        return MCPResponse(
            jsonrpc="2.0",
            id=request.id,
            result={"content": [{"type": "text", "text": str(result)}]},
        )

    @abstractmethod
    async def _execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Any:
        """Run the tool ``tool_name`` with ``arguments`` (implemented by subclasses)."""
        pass

    async def _handle_resources_list(self, request: "MCPRequest") -> "MCPResponse":
        """Answer ``resources/list`` with one descriptor per registered resource."""
        resources_list = [
            {
                "uri": resource_uri,
                "name": resource_info.get("name", ""),
                "description": resource_info.get("description", ""),
                "mimeType": resource_info.get("mimeType", "text/plain"),
            }
            for resource_uri, resource_info in self.resources.items()
        ]
        return MCPResponse(
            jsonrpc="2.0",
            id=request.id,
            result={"resources": resources_list},
        )

    async def _handle_resources_read(self, request: "MCPRequest") -> "MCPResponse":
        """Answer ``resources/read`` with the text of the requested resource.

        Exceptions raised while reading become ``-32603`` errors.
        """
        params = request.params
        if not isinstance(params, dict):
            return self._error_response(
                request.id, -32602, "Invalid params: expected an object"
            )
        resource_uri = params.get("uri")
        if resource_uri not in self.resources:
            return self._error_response(
                request.id, -32601, f"Resource not found: {resource_uri}"
            )
        try:
            content = await self._read_resource(resource_uri)
        except Exception as e:
            return self._error_response(
                request.id, -32603, f"Resource read failed: {str(e)}"
            )
        return MCPResponse(
            jsonrpc="2.0",
            id=request.id,
            result={"contents": [{"uri": resource_uri, "text": content}]},
        )

    @abstractmethod
    async def _read_resource(self, resource_uri: str) -> str:
        """Return the text of ``resource_uri`` (implemented by subclasses)."""
        pass
```
## 具体MCP服务器实现
实现具有实际功能的MCP服务器,展示协议的实际应用。
```python
# mcp_server/file_system_server.py
import json
import os
from typing import Any, Dict, List

from .base_server import MCPServer
class FileSystemMCPServer(MCPServer):
    """MCP server exposing file operations confined to one base directory.

    Tools: ``read_file``, ``write_file``, ``list_directory``, ``file_info``.
    Resources: two fixed URIs, ``file:///README`` and ``file:///config``.
    """

    def __init__(self, base_directory: str = "."):
        """Register tools and resources rooted at ``base_directory``."""
        super().__init__("file-system-server", "1.0.0")
        self.base_directory = os.path.abspath(base_directory)
        self._register_tools()
        self._register_resources()

    @staticmethod
    def _string_args_schema(properties: Dict[str, str]) -> Dict[str, Any]:
        """Build an object schema whose properties are all required strings.

        Args:
            properties: Mapping of property name to its description.
        """
        return {
            "type": "object",
            "properties": {
                name: {"type": "string", "description": description}
                for name, description in properties.items()
            },
            "required": list(properties),
        }

    def _register_tools(self):
        """Populate ``self.tools`` with the four file-system tools."""
        self.tools = {
            "read_file": {
                "description": "读取文件内容",
                "inputSchema": self._string_args_schema({"filepath": "文件路径"}),
            },
            "write_file": {
                "description": "写入文件内容",
                "inputSchema": self._string_args_schema(
                    {"filepath": "文件路径", "content": "文件内容"}
                ),
            },
            "list_directory": {
                "description": "列出目录内容",
                "inputSchema": self._string_args_schema({"directory": "目录路径"}),
            },
            "file_info": {
                "description": "获取文件信息",
                "inputSchema": self._string_args_schema({"filepath": "文件路径"}),
            },
        }

    def _register_resources(self):
        """Populate ``self.resources`` with the two built-in resources."""
        self.resources = {
            "file:///README": {
                "name": "README文件",
                "description": "项目说明文件",
                "mimeType": "text/plain",
            },
            "file:///config": {
                "name": "配置文件",
                "description": "系统配置文件",
                "mimeType": "application/json",
            },
        }

    def _get_absolute_path(self, filepath: str) -> str:
        """Resolve ``filepath`` against the base directory and confine it there.

        Args:
            filepath: Path relative to the base directory.

        Returns:
            The resolved absolute path.

        Raises:
            ValueError: If the resolved path lies outside the base directory.
        """
        base = os.path.realpath(self.base_directory)
        # Resolve ".." segments and symlinks BEFORE checking containment. A
        # plain string-prefix test on the joined path accepts "../secret",
        # absolute paths and sibling directories such as "<base>-other".
        candidate = os.path.realpath(os.path.join(base, filepath))
        try:
            inside = os.path.commonpath([base, candidate]) == base
        except ValueError:
            # commonpath raises for paths on different drives.
            inside = False
        if not inside:
            raise ValueError("访问路径超出允许范围")
        return candidate

    async def _execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Any:
        """Run one of the file-system tools.

        Raises:
            ValueError: If the tool is unknown or a required argument is
                missing.
        """
        handlers = {
            "read_file": (self._read_file, ("filepath",)),
            "write_file": (self._write_file, ("filepath", "content")),
            "list_directory": (self._list_directory, ("directory",)),
            "file_info": (self._get_file_info, ("filepath",)),
        }
        if tool_name not in handlers:
            raise ValueError(f"未知工具: {tool_name}")
        handler, arg_names = handlers[tool_name]
        # Report missing arguments by name instead of a bare KeyError.
        missing = [name for name in arg_names if name not in arguments]
        if missing:
            raise ValueError(f"缺少必需参数: {', '.join(missing)}")
        return await handler(*(arguments[name] for name in arg_names))

    async def _read_file(self, filepath: str) -> str:
        """Return the UTF-8 text of ``filepath``."""
        absolute_path = self._get_absolute_path(filepath)
        with open(absolute_path, 'r', encoding='utf-8') as f:
            return f.read()

    async def _write_file(self, filepath: str, content: str) -> str:
        """Write ``content`` to ``filepath`` as UTF-8, creating parent directories."""
        absolute_path = self._get_absolute_path(filepath)
        os.makedirs(os.path.dirname(absolute_path), exist_ok=True)
        with open(absolute_path, 'w', encoding='utf-8') as f:
            f.write(content)
        return f"文件已写入: {filepath}"

    async def _list_directory(self, directory: str) -> List[Dict[str, Any]]:
        """Return name, type, size and mtime for every entry in ``directory``."""
        absolute_path = self._get_absolute_path(directory)
        items = []
        for item in os.listdir(absolute_path):
            item_path = os.path.join(absolute_path, item)
            stat = os.stat(item_path)
            items.append({
                "name": item,
                "type": "directory" if os.path.isdir(item_path) else "file",
                "size": stat.st_size,
                "modified": stat.st_mtime,
            })
        return items

    async def _get_file_info(self, filepath: str) -> Dict[str, Any]:
        """Return type, size and timestamps for ``filepath``."""
        absolute_path = self._get_absolute_path(filepath)
        stat = os.stat(absolute_path)
        return {
            "path": filepath,
            "type": "directory" if os.path.isdir(absolute_path) else "file",
            "size": stat.st_size,
            "created": stat.st_ctime,
            "modified": stat.st_mtime,
            "accessed": stat.st_atime,
        }

    async def _read_resource(self, resource_uri: str) -> str:
        """Return the text of one of the built-in resources.

        Raises:
            ValueError: If ``resource_uri`` is not a known resource.
        """
        if resource_uri == "file:///README":
            return "# MCP文件系统服务器\n\n这是一个MCP协议的文件系统服务器实现。"
        if resource_uri == "file:///config":
            return json.dumps({
                "server": {
                    "name": self.server_name,
                    "version": self.version,
                    "base_directory": self.base_directory,
                }
            }, indent=2)
        raise ValueError(f"资源不存在: {resource_uri}")
```
## MCP客户端实现
构建能够与MCP服务器通信的客户端,完成完整的协议交互。
```python
# mcp_client/client.py
import aiohttp
import asyncio
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
@dataclass
class ToolInfo:
    """Client-side description of one tool offered by a server."""

    name: str
    description: str
    input_schema: Dict[str, Any]
@dataclass
class ResourceInfo:
    """Client-side description of one resource offered by a server."""

    uri: str
    name: str
    description: str
    mime_type: str
class MCPClient:
    """HTTP client that talks JSON-RPC 2.0 to an MCP server endpoint."""

    # Protocol revision sent in the ``initialize`` request.
    PROTOCOL_VERSION = "2024-11-05"

    def __init__(self, server_url: str):
        """Remember the endpoint; no network activity happens here."""
        self.server_url = server_url
        self.session: Optional[aiohttp.ClientSession] = None
        self.initialized = False
        self.request_id = 0

    async def connect(self):
        """Open the HTTP session and perform the ``initialize`` handshake.

        Raises:
            ConnectionError: If the server answers ``initialize`` with an error.
        """
        self.session = aiohttp.ClientSession()
        init_result = await self.initialize()
        error = init_result.get("error")
        if error:
            # Release the session on a failed handshake so it does not leak.
            await self.close()
            raise ConnectionError(f"初始化失败: {error}")
        self.initialized = True
        print("✅ MCP客户端连接成功")

    async def close(self):
        """Close the HTTP session, if any, and mark the client uninitialized."""
        if self.session:
            await self.session.close()
            self.session = None
        self.initialized = False

    async def _send_request(self, method: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """POST one JSON-RPC request and return the decoded response.

        Transport or decoding failures are not raised; they are returned as a
        JSON-RPC error object with code ``-32000``.

        Raises:
            RuntimeError: If the client has no open session.
        """
        if not self.session:
            raise RuntimeError("客户端未连接")
        self.request_id += 1
        request = {
            "jsonrpc": "2.0",
            "id": str(self.request_id),
            "method": method,
            "params": params or {},
        }
        try:
            async with self.session.post(
                self.server_url,
                json=request,
                headers={"Content-Type": "application/json"},
            ) as response:
                return await response.json()
        except Exception as e:
            return {
                "jsonrpc": "2.0",
                "id": str(self.request_id),
                "error": {
                    "code": -32000,
                    "message": f"请求失败: {str(e)}",
                },
            }

    @staticmethod
    def _result_or_raise(response: Dict[str, Any], failure_prefix: str) -> Dict[str, Any]:
        """Return the ``result`` object, or raise if the response is an error.

        The error VALUE is checked rather than the presence of the key, so a
        response carrying ``"error": null`` counts as success.

        Raises:
            RuntimeError: If ``response`` carries a non-null error.
        """
        error = response.get("error")
        if error is not None:
            raise RuntimeError(f"{failure_prefix}: {error}")
        return response.get("result") or {}

    async def initialize(self) -> Dict[str, Any]:
        """Send ``initialize`` and return the raw response dict."""
        params = {
            "protocolVersion": self.PROTOCOL_VERSION,
            "capabilities": {
                "roots": {"listChanged": True},
                "tools": {"listChanged": True},
            },
            "clientInfo": {
                "name": "python-mcp-client",
                "version": "1.0.0",
            },
        }
        return await self._send_request("initialize", params)

    async def list_tools(self) -> List["ToolInfo"]:
        """Return the tools advertised by the server.

        Raises:
            RuntimeError: If the server answers with an error.
        """
        response = await self._send_request("tools/list")
        result = self._result_or_raise(response, "获取工具列表失败")
        return [
            ToolInfo(
                name=tool_data["name"],
                description=tool_data.get("description", ""),
                input_schema=tool_data.get("inputSchema", {}),
            )
            for tool_data in result.get("tools", [])
        ]

    async def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Any:
        """Call ``tool_name`` and return the first text item of the result.

        When the result holds no text item the raw content list is returned.

        Raises:
            RuntimeError: If the server answers with an error.
        """
        params = {
            "name": tool_name,
            "arguments": arguments,
        }
        response = await self._send_request("tools/call", params)
        result = self._result_or_raise(response, "工具调用失败")
        content = result.get("content", [])
        for item in content:
            if item["type"] == "text":
                return item["text"]
        return content

    async def list_resources(self) -> List["ResourceInfo"]:
        """Return the resources advertised by the server.

        Raises:
            RuntimeError: If the server answers with an error.
        """
        response = await self._send_request("resources/list")
        result = self._result_or_raise(response, "获取资源列表失败")
        return [
            ResourceInfo(
                uri=resource_data["uri"],
                name=resource_data.get("name", ""),
                description=resource_data.get("description", ""),
                mime_type=resource_data.get("mimeType", "text/plain"),
            )
            for resource_data in result.get("resources", [])
        ]

    async def read_resource(self, resource_uri: str) -> str:
        """Return the text content of ``resource_uri``.

        Raises:
            RuntimeError: If the server answers with an error.
            ValueError: If the response holds no content for ``resource_uri``.
        """
        params = {
            "uri": resource_uri,
        }
        response = await self._send_request("resources/read", params)
        result = self._result_or_raise(response, "资源读取失败")
        for content in result.get("contents", []):
            if content["uri"] == resource_uri:
                return content["text"]
        raise ValueError(f"资源内容未找到: {resource_uri}")
```
## 完整工作流示例
展示MCP协议在实际应用中的完整工作流程。
```python
# examples/complete_workflow.py
import asyncio
import sys
import os
# 添加模块路径
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from mcp_server.file_system_server import FileSystemMCPServer
from mcp_client.client import MCPClient
class MCPWorkflowDemo:
    """Walks through tool listing, tool calls, resources and error handling."""

    def __init__(self, server_url: str = "http://localhost:8080/mcp"):
        """Create a client for ``server_url``; the connection is opened later."""
        self.server_url = server_url
        self.client = MCPClient(server_url)

    async def run_demo(self):
        """Connect, run the four demo stages in order, then close the client."""
        print("? 开始MCP协议完整工作流演示")
        print("=" * 50)
        try:
            await self.client.connect()
            await self.demo_tools_list()
            await self.demo_tool_calls()
            await self.demo_resource_access()
            await self.demo_error_handling()
        finally:
            # Close the client even when a stage fails.
            await self.client.close()

    async def demo_tools_list(self):
        """Stage 1: print every tool the server advertises."""
        print("\n? 演示1: 查看可用工具")
        print("-" * 30)
        tools = await self.client.list_tools()
        print(f"发现 {len(tools)} 个可用工具:")
        for tool in tools:
            print(f" • {tool.name}: {tool.description}")

    @staticmethod
    def _count_entries(listing):
        """Return the number of entries in a ``list_directory`` result.

        The client hands back the listing as text (the stringified list), so
        taking ``len`` of it directly would count characters. The text is
        parsed as a Python literal first; if that fails the plain length of
        the value is returned.
        """
        import ast

        if isinstance(listing, str):
            try:
                parsed = ast.literal_eval(listing)
            except (ValueError, SyntaxError):
                return len(listing)
            if isinstance(parsed, (list, tuple, dict, set)):
                return len(parsed)
        return len(listing)

    async def demo_tool_calls(self):
        """Stage 2: write a file, read it back and list the directory."""
        print("\n? 演示2: 工具调用")
        print("-" * 30)
        test_content = "这是通过MCP客户端创建的文件内容\n创建时间戳"
        try:
            write_result = await self.client.call_tool("write_file", {
                "filepath": "test_demo.txt",
                "content": test_content,
            })
            print(f"✅ 文件写入: {write_result}")
            read_result = await self.client.call_tool("read_file", {
                "filepath": "test_demo.txt",
            })
            print(f"? 文件内容: {read_result}")
            list_result = await self.client.call_tool("list_directory", {
                "directory": ".",
            })
            print(f"? 目录内容: {self._count_entries(list_result)} 个项目")
        except Exception as e:
            print(f"❌ 工具调用失败: {e}")

    async def demo_resource_access(self):
        """Stage 3: list the resources and preview the content of each."""
        print("\n? 演示3: 资源访问")
        print("-" * 30)
        try:
            resources = await self.client.list_resources()
            print(f"发现 {len(resources)} 个可用资源:")
            for resource in resources:
                print(f" • {resource.name} ({resource.uri})")
                content = await self.client.read_resource(resource.uri)
                print(f" 内容预览: {content[:50]}...")
        except Exception as e:
            print(f"❌ 资源访问失败: {e}")

    async def demo_error_handling(self):
        """Stage 4: trigger two failing calls and show that they raise."""
        print("\n⚠️ 演示4: 错误处理")
        print("-" * 30)
        # A tool that does not exist.
        try:
            await self.client.call_tool("non_existent_tool", {})
        except Exception as e:
            print(f"✅ 正确捕获错误: {e}")
        # A known tool called with the wrong argument name.
        try:
            await self.client.call_tool("read_file", {
                "invalid_param": "value",
            })
        except Exception as e:
            print(f"✅ 正确捕获参数错误: {e}")
# HTTP服务器包装
from aiohttp import web
import json
class MCPServerWrapper:
    """Exposes an MCP server over HTTP: ``POST /mcp`` and ``GET /health``."""

    def __init__(self, mcp_server: "MCPServer"):
        """Create the aiohttp application and register both routes.

        The annotation is a string because this snippet does not import
        ``MCPServer`` itself.
        """
        self.mcp_server = mcp_server
        self.app = web.Application()
        self.app.router.add_post('/mcp', self.handle_mcp_request)
        self.app.router.add_get('/health', self.handle_health_check)

    @staticmethod
    def _error_body(code: int, message: str) -> str:
        """Serialize a JSON-RPC error object that has no request id."""
        return json.dumps({
            "jsonrpc": "2.0",
            "id": None,
            "error": {
                "code": code,
                "message": message,
            },
        })

    async def handle_mcp_request(self, request: web.Request) -> web.Response:
        """Decode the request body, dispatch it and return the JSON response.

        A body that cannot be decoded as JSON yields ``-32700`` with HTTP
        400. A failure while dispatching yields ``-32603`` with HTTP 500, so
        server-side faults are not reported as client parse errors.
        """
        try:
            data = await request.json()
        except ValueError as e:
            # json.JSONDecodeError and UnicodeDecodeError both derive from
            # ValueError.
            return web.Response(
                text=self._error_body(-32700, f"Parse error: {str(e)}"),
                content_type='application/json',
                status=400,
            )
        try:
            result = await self.mcp_server.handle_request(data)
            body = json.dumps(result)
        except Exception as e:
            return web.Response(
                text=self._error_body(-32603, f"Internal error: {str(e)}"),
                content_type='application/json',
                status=500,
            )
        return web.Response(
            text=body,
            content_type='application/json',
        )

    async def handle_health_check(self, request: web.Request) -> web.Response:
        """Return a static JSON health document."""
        return web.Response(
            text=json.dumps({"status": "healthy", "service": "mcp-server"}),
            content_type='application/json',
        )

    async def start(self, host: str = 'localhost', port: int = 8080):
        """Start serving on ``host:port`` and return the runner.

        The caller is responsible for calling ``cleanup()`` on the returned
        runner.
        """
        runner = web.AppRunner(self.app)
        await runner.setup()
        site = web.TCPSite(runner, host, port)
        await site.start()
        print(f"? MCP服务器运行在 http://{host}:{port}")
        return runner
async def main():
    """Start the file-system server, run the client demo, then serve until interrupted."""
    file_server = FileSystemMCPServer()
    server_wrapper = MCPServerWrapper(file_server)
    runner = await server_wrapper.start()
    try:
        demo = MCPWorkflowDemo()
        await demo.run_demo()
        print("\n? 服务器运行中,按 Ctrl+C 停止...")
        # Never completes on its own; keeps the server alive until the task
        # is interrupted.
        await asyncio.Future()
    except (KeyboardInterrupt, asyncio.CancelledError):
        # Under asyncio.run on Python 3.11+, Ctrl+C cancels the main task, so
        # the interruption arrives here as CancelledError.
        print("\n? 停止服务器")
    finally:
        await runner.cleanup()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Older interpreters re-raise the interrupt from asyncio.run after
        # cleanup; exit quietly instead of printing a traceback.
        pass
```
## 高级特性与最佳实践
实现MCP协议的高级特性和生产环境最佳实践。
```python
# mcp_server/advanced_features.py
import asyncio
from typing import Dict, Any, List
from contextlib import asynccontextmanager
class AdvancedMCPServer(FileSystemMCPServer):
    """File-system server extended with batch operations, search and usage logging."""

    # Upper bound on the number of retained request-log entries.
    MAX_LOG_ENTRIES = 1000

    def __init__(self, base_directory: str = "."):
        """Initialize the base server, the request log and the extra tools."""
        import time

        super().__init__(base_directory)
        self.session_storage = {}
        self.request_log = []
        # Monotonic start time, used to report uptime.
        self._started_at = time.monotonic()
        self._register_advanced_tools()

    def _register_advanced_tools(self):
        """Add ``batch_operations`` and ``search_files`` to ``self.tools``."""
        self.tools.update({
            "batch_operations": {
                "description": "批量文件操作",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "operations": {
                            "type": "array",
                            "items": {
                                "type": "object",
                                "properties": {
                                    "type": {"type": "string", "enum": ["read", "write", "delete"]},
                                    "filepath": {"type": "string"},
                                    "content": {"type": "string"},
                                },
                                "required": ["type", "filepath"],
                            },
                        },
                    },
                    "required": ["operations"],
                },
            },
            "search_files": {
                "description": "搜索文件内容",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "query": {"type": "string"},
                        "directory": {"type": "string"},
                        "file_pattern": {"type": "string"},
                    },
                    "required": ["query"],
                },
            },
        })

    async def _execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Any:
        """Log the call, then run an advanced tool or defer to the base class."""
        self._log_request(tool_name, arguments)
        if tool_name == "batch_operations":
            return await self._batch_operations(arguments["operations"])
        if tool_name == "search_files":
            return await self._search_files(
                arguments["query"],
                arguments.get("directory", "."),
                arguments.get("file_pattern", "*"),
            )
        return await super()._execute_tool(tool_name, arguments)

    async def _batch_operations(self, operations: List[Dict[str, Any]]) -> List[Any]:
        """Run each operation in order and report a per-operation outcome.

        One failing operation does not stop the batch. Each result entry
        holds the operation, a ``success`` flag and either ``result`` or
        ``error``.
        """
        results = []
        for op in operations:
            try:
                if op["type"] == "read":
                    result = await self._read_file(op["filepath"])
                elif op["type"] == "write":
                    result = await self._write_file(op["filepath"], op["content"])
                elif op["type"] == "delete":
                    result = await self._delete_file(op["filepath"])
                else:
                    # An unsupported type is a failure, not a success with a
                    # message as its result.
                    raise ValueError(f"未知操作类型: {op['type']}")
            except Exception as e:
                results.append({"operation": op, "error": str(e), "success": False})
            else:
                results.append({"operation": op, "result": result, "success": True})
        return results

    async def _search_files(self, query: str, directory: str, file_pattern: str) -> List[Dict[str, Any]]:
        """Search files under ``directory`` for lines matching ``query``.

        Args:
            query: Regular expression, matched case-insensitively per line.
            directory: Directory to search, relative to the base directory.
            file_pattern: Glob pattern for file names, searched recursively.

        Returns:
            One entry per file with at least one match, holding the relative
            path and the matching lines with 1-based line numbers.

        Raises:
            ValueError: If ``query`` is not a valid regular expression.
        """
        import glob
        import os
        import re

        # Compile once up front: an invalid pattern would otherwise fail on
        # every file and silently produce an empty result.
        try:
            matcher = re.compile(query, re.IGNORECASE)
        except re.error as exc:
            raise ValueError(f"无效的搜索表达式: {exc}") from exc

        search_results = []
        absolute_dir = self._get_absolute_path(directory)
        # Relative paths are computed against the resolved base so they can
        # be passed back to the path-checked readers.
        base_root = self._get_absolute_path(".")
        pattern = os.path.join(absolute_dir, "**", file_pattern)
        for filepath in glob.glob(pattern, recursive=True):
            if not os.path.isfile(filepath):
                continue
            relative_path = os.path.relpath(filepath, base_root)
            try:
                content = await self._read_file(relative_path)
            except (OSError, ValueError):
                # Unreadable, non-UTF-8 or out-of-bounds files are skipped.
                continue
            matches = [
                {"line": line_num, "content": line.strip()}
                for line_num, line in enumerate(content.split('\n'), 1)
                if matcher.search(line)
            ]
            if matches:
                search_results.append({
                    "filepath": relative_path,
                    "matches": matches,
                })
        return search_results

    async def _delete_file(self, filepath: str) -> str:
        """Delete ``filepath``.

        Raises:
            FileNotFoundError: If the file does not exist.
        """
        import os

        absolute_path = self._get_absolute_path(filepath)
        if not os.path.exists(absolute_path):
            raise FileNotFoundError(f"文件不存在: {filepath}")
        os.remove(absolute_path)
        return f"文件已删除: {filepath}"

    def _log_request(self, tool_name: str, arguments: Dict[str, Any]):
        """Append a log entry and keep only the newest ``MAX_LOG_ENTRIES``."""
        import time

        self.request_log.append({
            # Monotonic clock: usable from sync and async code alike.
            "timestamp": time.monotonic(),
            "tool": tool_name,
            "arguments": arguments,
        })
        if len(self.request_log) > self.MAX_LOG_ENTRIES:
            del self.request_log[:-self.MAX_LOG_ENTRIES]

    def get_usage_statistics(self) -> Dict[str, Any]:
        """Return request counts per tool and seconds since construction."""
        import time

        tool_usage: Dict[str, int] = {}
        for entry in self.request_log:
            tool = entry["tool"]
            tool_usage[tool] = tool_usage.get(tool, 0) + 1
        return {
            "total_requests": len(self.request_log),
            "tool_usage": tool_usage,
            "server_uptime": time.monotonic() - self._started_at,
        }
```
MCP(模型上下文协议)通过标准化的工具调用和资源访问机制,为构建可扩展的 AI 应用提供了坚实基础。从基础协议实现到高级特性开发,MCP 正在成为连接大语言模型与现实世界的重要桥梁。