在现代分布式系统架构中,灵活可靠的通信机制是系统设计的核心要素。MCP(Model Context Protocol)框架通过支持STDIO、HTTP、HTTP+SSE三种通信方式,为不同场景下的客户端-服务器交互提供了完整的解决方案。本文将深入探讨这三种通信模式的实现原理和实战应用。
## MCP框架架构概述
MCP框架采用分层设计,将通信协议与业务逻辑分离,使得开发者能够根据具体需求选择合适的通信方式。
```python
# mcp_core/architecture.py
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Dict, List, Optional
import asyncio
import json
class CommunicationMode(Enum):
    """Transports an MCP server can be started with.

    The values are the strings used in configuration files, so
    ``CommunicationMode("http_sse")`` resolves a config entry to a member.
    """

    STDIO = "stdio"        # local process pipes (stdin/stdout)
    HTTP = "http"          # plain request/response over HTTP
    HTTP_SSE = "http_sse"  # HTTP requests plus Server-Sent Events push


class BaseMCPServer(ABC):
    """Transport-agnostic base class for MCP servers.

    Subclasses implement ``start``/``stop`` for a concrete transport and feed
    decoded JSON-RPC requests to ``_process_message``, which routes them to the
    handlers registered in ``message_handlers``.
    """

    # "2024-11-05" is the first published MCP protocol revision; the previous
    # value "2024-11-07" is not a revision that clients can negotiate.
    PROTOCOL_VERSION = "2024-11-05"

    def __init__(self, mode: CommunicationMode):
        """Remember the transport mode and register the built-in handlers."""
        self.mode = mode
        self.clients = {}
        self.message_handlers = {}
        self._setup_handlers()

    def _setup_handlers(self):
        """Populate ``message_handlers`` (JSON-RPC method name -> coroutine)."""
        self.message_handlers = {
            "initialize": self._handle_initialize,
            "tools/list": self._handle_tools_list,
            "tools/call": self._handle_tools_call,
            "notifications": self._handle_notifications,
        }

    @abstractmethod
    async def start(self, **kwargs):
        """Start serving on the concrete transport."""

    @abstractmethod
    async def stop(self):
        """Stop serving and release transport resources."""

    async def _process_message(self, message: Dict[str, Any]) -> Dict[str, Any]:
        """Dispatch one JSON-RPC request and build its response envelope.

        Returns a ``result`` envelope on success or a ``-32601`` error envelope
        when the method is not registered. Exceptions raised by a handler
        propagate to the caller, which decides how to report them.
        """
        method = message.get("method")
        message_id = message.get("id")
        handler = self.message_handlers.get(method)
        if handler is None:
            return {
                "jsonrpc": "2.0",
                "id": message_id,
                "error": {
                    "code": -32601,
                    "message": f"方法未找到: {method}",
                },
            }
        result = await handler(message.get("params", {}))
        return {"jsonrpc": "2.0", "id": message_id, "result": result}

    async def _handle_initialize(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Answer ``initialize`` with protocol version, capabilities and server info."""
        return {
            "protocolVersion": self.PROTOCOL_VERSION,
            "capabilities": {
                "tools": {"listChanged": True},
                "resources": {"listChanged": True},
            },
            "serverInfo": {
                "name": "MCP Server",
                "version": "1.0.0",
            },
        }

    async def _handle_tools_list(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Answer ``tools/list`` with the single built-in ``echo`` tool."""
        return {
            "tools": [
                {
                    "name": "echo",
                    "description": "回显输入文本",
                    "inputSchema": {
                        "type": "object",
                        "properties": {
                            "message": {"type": "string"},
                        },
                    },
                }
            ]
        }

    async def _handle_tools_call(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Execute the tool named by ``params["name"]``.

        Raises:
            KeyError: ``params`` has no ``"name"`` entry.
            ValueError: the named tool is not known.
        """
        tool_name = params["name"]
        arguments = params.get("arguments", {})
        if tool_name == "echo":
            return {
                "content": [{
                    "type": "text",
                    "text": f"回显: {arguments.get('message', '')}",
                }]
            }
        raise ValueError(f"未知工具: {tool_name}")

    async def _handle_notifications(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Accept a ``notifications`` message.

        This handler is registered in ``_setup_handlers``; without it the
        constructor fails with AttributeError. The base implementation takes
        no action and returns an empty result; subclasses may override it.
        """
        return {}
```
## STDIO通信方式实现
STDIO(标准输入输出)通信适用于本地进程间通信,具有低延迟、简单可靠的特性。
```python
# mcp_servers/stdio_server.py
import asyncio
import json
import sys
from typing import Any, Dict, Optional

from mcp_core.architecture import BaseMCPServer, CommunicationMode
class StdioMCPServer(BaseMCPServer):
    """MCP server speaking length-prefixed JSON-RPC over stdin/stdout.

    Wire format in both directions: the body's byte length as ASCII decimal,
    a newline, then exactly that many bytes of UTF-8 encoded JSON.
    Diagnostics go to stderr so they never corrupt the message stream.
    """

    def __init__(self):
        super().__init__(CommunicationMode.STDIO)
        self.reader: Optional[asyncio.StreamReader] = None
        self.writer: Optional[asyncio.StreamWriter] = None

    async def start(self):
        """Attach asyncio streams to stdin/stdout and serve until EOF.

        This coroutine only returns once the peer closes stdin (or the
        framing becomes unrecoverable).
        """
        # We are inside a coroutine, so the running loop is the right one.
        loop = asyncio.get_running_loop()

        self.reader = asyncio.StreamReader()
        reader_protocol = asyncio.StreamReaderProtocol(self.reader)
        await loop.connect_read_pipe(lambda: reader_protocol, sys.stdin)

        transport, writer_protocol = await loop.connect_write_pipe(
            asyncio.streams.FlowControlMixin, sys.stdout
        )
        self.writer = asyncio.StreamWriter(transport, writer_protocol, None, loop)

        print("MCP STDIO服务器已启动", file=sys.stderr)
        await self._message_loop()

    @staticmethod
    def _error_response(message_id: Any, code: int, text: str) -> Dict[str, Any]:
        """Build a JSON-RPC error envelope."""
        return {
            "jsonrpc": "2.0",
            "id": message_id,
            "error": {"code": code, "message": text},
        }

    async def _message_loop(self):
        """Read framed requests, dispatch them and write framed responses."""
        while True:
            # Reset per iteration so an error reply never carries the id of
            # an earlier, unrelated request.
            message_id = None

            try:
                header = await self.reader.readuntil(b"\n")
                content_length = int(header.decode("utf-8").strip())
                body_data = await self.reader.readexactly(content_length)
            except asyncio.IncompleteReadError:
                # EOF: the peer closed the pipe.
                break
            except asyncio.LimitOverrunError as exc:
                # readuntil() leaves the oversized data buffered, so retrying
                # would fail the same way forever; report once and stop.
                await self._send_message(
                    self._error_response(None, -32700, f"解析错误: {exc}")
                )
                break
            except ValueError as exc:
                # Non-numeric/negative length or undecodable header bytes.
                await self._send_message(
                    self._error_response(None, -32700, f"解析错误: {exc}")
                )
                continue

            try:
                # JSONDecodeError and UnicodeDecodeError both derive from ValueError.
                message = json.loads(body_data.decode("utf-8"))
            except ValueError as exc:
                await self._send_message(
                    self._error_response(None, -32700, f"解析错误: {exc}")
                )
                continue

            try:
                if isinstance(message, dict):
                    message_id = message.get("id")
                response = await self._process_message(message)
            except Exception as e:
                # Transport boundary: a failing handler must not end the loop.
                response = self._error_response(
                    message_id, -32603, f"内部错误: {str(e)}"
                )
            await self._send_message(response)

    async def _process_message(self, message: Dict[str, Any]) -> Dict[str, Any]:
        """Dispatch one request to its registered handler and wrap the result."""
        method = message.get("method")
        message_id = message.get("id")
        handler = self.message_handlers.get(method)
        if handler is None:
            return self._error_response(message_id, -32601, f"方法未找到: {method}")
        result = await handler(message.get("params", {}))
        return {"jsonrpc": "2.0", "id": message_id, "result": result}

    async def _send_message(self, message: Dict[str, Any]):
        """Serialize ``message`` and write it as length header plus body."""
        message_bytes = json.dumps(message, ensure_ascii=False).encode("utf-8")
        # The header counts bytes, not characters (non-ASCII text is multi-byte).
        self.writer.write(f"{len(message_bytes)}\n".encode("utf-8"))
        self.writer.write(message_bytes)
        await self.writer.drain()

    async def stop(self):
        """Close the stdout writer if the server was started."""
        if self.writer is None:
            return
        self.writer.close()
        try:
            await self.writer.wait_closed()
        except NotImplementedError:
            # The writer was built on a bare FlowControlMixin protocol, which
            # does not provide the close waiter wait_closed() relies on.
            pass
# STDIO客户端实现
class StdioMCPClient:
    """MCP client speaking length-prefixed JSON-RPC over this process's stdio.

    Uses the same framing as the stdio server: byte length as ASCII decimal,
    a newline, then that many bytes of UTF-8 JSON.
    """

    def __init__(self):
        self.reader = None
        self.writer = None
        self.request_id = 0

    async def connect(self):
        """Wrap this process's stdin/stdout in asyncio streams."""
        # We are inside a coroutine, so the running loop is the right one.
        loop = asyncio.get_running_loop()

        self.reader = asyncio.StreamReader()
        reader_protocol = asyncio.StreamReaderProtocol(self.reader)
        await loop.connect_read_pipe(lambda: reader_protocol, sys.stdin)

        transport, writer_protocol = await loop.connect_write_pipe(
            asyncio.streams.FlowControlMixin, sys.stdout
        )
        self.writer = asyncio.StreamWriter(transport, writer_protocol, None, loop)

    async def send_request(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """Send one JSON-RPC request and return the next message received.

        Request ids increase by one per call, starting at 1.
        """
        self.request_id += 1
        request = {
            "jsonrpc": "2.0",
            "id": self.request_id,
            "method": method,
            "params": params,
        }
        await self._send_message(request)
        return await self._receive_message()

    async def _send_message(self, message: Dict[str, Any]):
        """Write ``message`` as length header plus UTF-8 JSON body."""
        message_bytes = json.dumps(message, ensure_ascii=False).encode("utf-8")
        # The header counts bytes, not characters.
        self.writer.write(f"{len(message_bytes)}\n".encode("utf-8"))
        self.writer.write(message_bytes)
        await self.writer.drain()

    async def _receive_message(self) -> Dict[str, Any]:
        """Read one framed message and return the decoded JSON value."""
        header = await self.reader.readuntil(b"\n")
        content_length = int(header.decode("utf-8").strip())
        body_data = await self.reader.readexactly(content_length)
        return json.loads(body_data.decode("utf-8"))
```
## HTTP通信方式实现
HTTP通信支持远程客户端连接,适合分布式部署场景。
```python
# mcp_servers/http_server.py
from aiohttp import web
import json
from typing import Any, Dict
from mcp_core.architecture import BaseMCPServer, CommunicationMode


class HttpMCPServer(BaseMCPServer):
    """MCP server exposing JSON-RPC over HTTP.

    Routes:
        POST /mcp     JSON-RPC request in, JSON-RPC response out.
        GET  /health  liveness probe.
    """

    def __init__(self, host: str = "localhost", port: int = 8080):
        super().__init__(CommunicationMode.HTTP)
        self.host = host
        self.port = port
        self.app = web.Application()
        # Kept so stop() can release the listening socket.
        self._runner = None
        self._setup_routes()

    def _setup_routes(self):
        """Register the HTTP routes on the aiohttp application."""
        self.app.router.add_post('/mcp', self._handle_mcp_request)
        self.app.router.add_get('/health', self._handle_health_check)

    async def start(self):
        """Bind to ``host:port`` and start accepting requests (returns once listening)."""
        self._runner = web.AppRunner(self.app)
        await self._runner.setup()
        site = web.TCPSite(self._runner, self.host, self.port)
        await site.start()
        print(f"HTTP MCP服务器运行在 http://{self.host}:{self.port}")

    @staticmethod
    def _json_response(payload: Dict[str, Any], status: int = 200) -> web.Response:
        """Serialize ``payload`` as an ``application/json`` response."""
        return web.Response(
            text=json.dumps(payload, ensure_ascii=False),
            status=status,
            content_type='application/json',
        )

    async def _process_message(self, message: Dict[str, Any]) -> Dict[str, Any]:
        """Dispatch one JSON-RPC request to its registered handler.

        Defined on this class so the HTTP transport does not rely on dispatch
        code that lives in another transport's class.
        """
        method = message.get("method")
        message_id = message.get("id")
        handler = self.message_handlers.get(method)
        if handler is None:
            return {
                "jsonrpc": "2.0",
                "id": message_id,
                "error": {"code": -32601, "message": f"方法未找到: {method}"},
            }
        result = await handler(message.get("params", {}))
        return {"jsonrpc": "2.0", "id": message_id, "result": result}

    async def _handle_mcp_request(self, request: web.Request) -> web.Response:
        """Handle ``POST /mcp``.

        Malformed JSON yields HTTP 400 with JSON-RPC code -32700; any other
        failure yields HTTP 500 with code -32603. Error envelopes carry an
        ``id`` (null when the request id could not be determined).
        """
        message_id = None
        try:
            data = await request.json()
            if isinstance(data, dict):
                message_id = data.get("id")
            return self._json_response(await self._process_message(data))
        except json.JSONDecodeError:
            return self._json_response(
                {
                    "jsonrpc": "2.0",
                    "id": None,
                    "error": {"code": -32700, "message": "解析错误"},
                },
                status=400,
            )
        except Exception as e:
            # Request boundary: report the failure instead of dropping the connection.
            return self._json_response(
                {
                    "jsonrpc": "2.0",
                    "id": message_id,
                    "error": {"code": -32603, "message": f"内部错误: {str(e)}"},
                },
                status=500,
            )

    async def _handle_health_check(self, request: web.Request) -> web.Response:
        """Handle ``GET /health``."""
        return self._json_response({"status": "healthy", "mode": "http"})

    async def stop(self):
        """Shut down the listener started by ``start``; a no-op if not started."""
        if self._runner is not None:
            await self._runner.cleanup()
            self._runner = None
# HTTP客户端实现
import aiohttp
import asyncio
class HttpMCPClient:
    """MCP client sending JSON-RPC requests to ``{base_url}/mcp`` over HTTP."""

    def __init__(self, base_url: str):
        self.base_url = base_url
        self.session = None
        self.request_id = 0

    async def connect(self):
        """Create the underlying aiohttp session."""
        self.session = aiohttp.ClientSession()

    async def send_request(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """POST one JSON-RPC request and return the decoded JSON response.

        Request ids increase by one per call, starting at 1. If ``connect``
        has not been called yet, a session is created on first use.

        Raises:
            Exception: the server answered with a status other than 200.
        """
        if self.session is None:
            # Avoid dereferencing None when the caller skipped connect().
            await self.connect()

        self.request_id += 1
        request_data = {
            "jsonrpc": "2.0",
            "id": self.request_id,
            "method": method,
            "params": params,
        }
        async with self.session.post(f"{self.base_url}/mcp",
                                     json=request_data) as response:
            if response.status != 200:
                raise Exception(f"HTTP错误: {response.status}")
            return await response.json()

    async def close(self):
        """Close the session if one was created."""
        if self.session:
            await self.session.close()
```
## HTTP+SSE实时通信实现
HTTP+SSE(Server-Sent Events)结合了HTTP的简单性和实时推送能力,适合需要服务器主动推送的场景。
```python
# mcp_servers/http_sse_server.py
# NOTE: aiohttp does not export an `SSEResponse` class, so importing it fails.
# Event streams are written on a plain `web.StreamResponse` instead.
from aiohttp import web
import asyncio
import json
import time
from typing import Any, Dict, Set
from mcp_core.architecture import BaseMCPServer, CommunicationMode


class HttpSSEMCPServer(BaseMCPServer):
    """MCP server combining JSON-RPC over HTTP with Server-Sent Events push.

    Routes:
        POST /mcp     JSON-RPC request in, JSON-RPC response out.
        GET  /events  long-lived ``text/event-stream`` of notifications.
        POST /notify  broadcast ``{"type", "message"}`` to every stream.
        GET  /health  liveness probe with connection counts.
    """

    # Seconds between SSE comment frames; they keep intermediaries from
    # timing the stream out and let the server notice dead peers.
    HEARTBEAT_SECONDS = 15.0

    def __init__(self, host: str = "localhost", port: int = 8081):
        super().__init__(CommunicationMode.HTTP_SSE)
        self.host = host
        self.port = port
        self.app = web.Application()
        self.sse_connections: Set[web.StreamResponse] = set()
        # Kept so stop() can release the listening socket.
        self._runner = None
        self._setup_routes()

    def _setup_routes(self):
        """Register the HTTP and SSE routes on the aiohttp application."""
        self.app.router.add_post('/mcp', self._handle_mcp_request)
        self.app.router.add_get('/events', self._handle_sse_connection)
        self.app.router.add_post('/notify', self._handle_notification)
        self.app.router.add_get('/health', self._handle_health_check)

    async def start(self):
        """Bind to ``host:port`` and start accepting requests (returns once listening)."""
        self._runner = web.AppRunner(self.app)
        await self._runner.setup()
        site = web.TCPSite(self._runner, self.host, self.port)
        await site.start()
        print(f"HTTP+SSE MCP服务器运行在 http://{self.host}:{self.port}")

    @staticmethod
    def _json_response(payload: Dict[str, Any], status: int = 200) -> web.Response:
        """Serialize ``payload`` as an ``application/json`` response."""
        return web.Response(
            text=json.dumps(payload, ensure_ascii=False),
            status=status,
            content_type='application/json',
        )

    @staticmethod
    async def _send_event(response: web.StreamResponse, payload: Dict[str, Any]):
        """Write ``payload`` as one SSE ``data:`` frame.

        json.dumps escapes newlines, so the payload always fits on a single
        ``data:`` line; the blank line terminates the event.
        """
        await response.write(f"data: {json.dumps(payload)}\n\n".encode("utf-8"))

    async def _process_message(self, message: Dict[str, Any]) -> Dict[str, Any]:
        """Dispatch one JSON-RPC request to its registered handler.

        Defined on this class so the HTTP transport does not rely on dispatch
        code that lives in another transport's class.
        """
        method = message.get("method")
        message_id = message.get("id")
        handler = self.message_handlers.get(method)
        if handler is None:
            return {
                "jsonrpc": "2.0",
                "id": message_id,
                "error": {"code": -32601, "message": f"方法未找到: {method}"},
            }
        result = await handler(message.get("params", {}))
        return {"jsonrpc": "2.0", "id": message_id, "result": result}

    async def _handle_sse_connection(self, request: web.Request) -> web.StreamResponse:
        """Handle ``GET /events``: open an event stream and keep it alive.

        The stream stays open until the client disconnects or ``stop``
        removes it from ``sse_connections``.
        """
        response = web.StreamResponse(headers={
            "Content-Type": "text/event-stream",
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        })
        await response.prepare(request)
        self.sse_connections.add(response)
        try:
            await self._send_event(response, {
                "type": "connected",
                "message": "SSE连接已建立",
            })
            while True:
                await asyncio.sleep(self.HEARTBEAT_SECONDS)
                # Re-check after sleeping: stop()/broadcast may have retired
                # this stream, and writing after write_eof() is an error.
                if response not in self.sse_connections:
                    break
                # Lines starting with ':' are SSE comments; clients ignore them.
                await response.write(b": keep-alive\n\n")
        except ConnectionError:
            print("SSE连接断开")
        finally:
            # discard(): the broadcaster may already have removed it.
            self.sse_connections.discard(response)
        return response

    async def _handle_mcp_request(self, request: web.Request) -> web.Response:
        """Handle ``POST /mcp``.

        Malformed JSON yields HTTP 400 with JSON-RPC code -32700; any other
        failure yields HTTP 500 with code -32603.
        """
        message_id = None
        try:
            data = await request.json()
            if isinstance(data, dict):
                message_id = data.get("id")
            return self._json_response(await self._process_message(data))
        except json.JSONDecodeError:
            return self._json_response(
                {
                    "jsonrpc": "2.0",
                    "id": None,
                    "error": {"code": -32700, "message": "解析错误"},
                },
                status=400,
            )
        except Exception as e:
            # Request boundary: report the failure instead of dropping the connection.
            return self._json_response(
                {
                    "jsonrpc": "2.0",
                    "id": message_id,
                    "error": {"code": -32603, "message": f"内部错误: {str(e)}"},
                },
                status=500,
            )

    async def _handle_notification(self, request: web.Request) -> web.Response:
        """Handle ``POST /notify``: broadcast the posted notification."""
        try:
            data = await request.json()
            await self._broadcast_notification(data.get("type"), data.get("message"))
            return self._json_response({"status": "notification_sent"})
        except Exception as e:
            return self._json_response({"error": str(e)}, status=500)

    async def _broadcast_notification(self, notification_type: str, message: str):
        """Send one notification to every open event stream.

        Streams whose write fails with a connection error are dropped.
        """
        notification = {
            "type": notification_type,
            # Wall-clock epoch seconds; the event loop's clock is monotonic
            # and has no meaning outside this process.
            "timestamp": time.time(),
            "message": message,
        }
        # Iterate over a snapshot: streams can connect or disconnect while we
        # are suspended in write(), which would otherwise break set iteration.
        for connection in list(self.sse_connections):
            try:
                await self._send_event(connection, notification)
            except ConnectionError:
                self.sse_connections.discard(connection)

    async def _handle_health_check(self, request: web.Request) -> web.Response:
        """Handle ``GET /health``."""
        return self._json_response({
            "status": "healthy",
            "mode": "http_sse",
            "connected_clients": len(self.sse_connections),
            "active_handlers": len(self.message_handlers),
        })

    async def stop(self):
        """End every event stream and shut down the listener."""
        open_streams = list(self.sse_connections)
        # Clear first so the per-stream keep-alive loops stop writing.
        self.sse_connections.clear()
        for connection in open_streams:
            try:
                await connection.write_eof()
            except ConnectionError:
                # Peer already gone; nothing left to close.
                pass
        if self._runner is not None:
            await self._runner.cleanup()
            self._runner = None
# HTTP+SSE客户端实现
import aiohttp
import asyncio
class HttpSSEMCPClient:
    """MCP client: JSON-RPC over HTTP plus an SSE listener for server pushes.

    Event handlers are registered per event ``type`` with ``on_event`` and are
    awaited in registration order when a matching event arrives.
    """

    def __init__(self, base_url: str):
        self.base_url = base_url
        self.http_session = None
        self.event_handlers = {}
        self.is_listening = False
        self.request_id = 0

    async def connect(self):
        """Create the underlying aiohttp session."""
        self.http_session = aiohttp.ClientSession()

    async def send_request(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """POST one JSON-RPC request to ``/mcp`` and return the decoded reply.

        Each request carries an increasing ``id`` (starting at 1); without an
        id the message would be a JSON-RPC notification, which has no reply
        to correlate.
        """
        self.request_id += 1
        request_data = {
            "jsonrpc": "2.0",
            "id": self.request_id,
            "method": method,
            "params": params,
        }
        async with self.http_session.post(f"{self.base_url}/mcp",
                                          json=request_data) as response:
            return await response.json()

    async def listen_events(self):
        """Consume the ``/events`` stream and dispatch each ``data:`` line.

        Runs until the server ends the stream or ``is_listening`` is set to
        False (``close`` does that); ``is_listening`` is False again on exit.
        Lines that are not ``data: `` frames, or whose payload is not valid
        JSON, are skipped.
        """
        self.is_listening = True
        try:
            # An event stream is open-ended; the session's default total
            # timeout would abort it after a few minutes.
            no_total_timeout = aiohttp.ClientTimeout(total=None)
            async with self.http_session.get(f"{self.base_url}/events",
                                             timeout=no_total_timeout) as response:
                async for line in response.content:
                    if not self.is_listening:
                        break
                    if not line.startswith(b'data: '):
                        continue
                    try:
                        event_data = json.loads(line[6:].decode('utf-8'))
                    except json.JSONDecodeError:
                        continue
                    # Outside the try: a handler's own JSON error must not be
                    # mistaken for a malformed frame and silently dropped.
                    await self._handle_event(event_data)
        finally:
            self.is_listening = False

    async def _handle_event(self, event_data: Dict[str, Any]):
        """Await every handler registered for ``event_data["type"]``."""
        for handler in self.event_handlers.get(event_data.get("type"), ()):
            await handler(event_data)

    def on_event(self, event_type: str):
        """Return a decorator registering a coroutine handler for ``event_type``."""
        def decorator(handler):
            self.event_handlers.setdefault(event_type, []).append(handler)
            return handler
        return decorator

    async def close(self):
        """Stop listening and close the session if one was created."""
        self.is_listening = False
        if self.http_session:
            await self.http_session.close()
```
## 统一服务器工厂与配置管理
为了简化不同通信方式下服务器的创建和管理,本节实现一个统一的服务器工厂,并配套一个配置管理器。
```python
# mcp_core/server_factory.py
from typing import Dict, Type, Any
from mcp_servers.stdio_server import StdioMCPServer
from mcp_servers.http_server import HttpMCPServer
from mcp_servers.http_sse_server import HttpSSEMCPServer
from mcp_core.architecture import CommunicationMode, BaseMCPServer
class MCPServerFactory:
    """Factory mapping a ``CommunicationMode`` to its server implementation."""

    _server_classes: Dict[CommunicationMode, Type[BaseMCPServer]] = {
        CommunicationMode.STDIO: StdioMCPServer,
        CommunicationMode.HTTP: HttpMCPServer,
        CommunicationMode.HTTP_SSE: HttpSSEMCPServer,
    }

    @classmethod
    def create_server(cls, mode: CommunicationMode, **kwargs) -> BaseMCPServer:
        """Instantiate (without starting) the server class registered for ``mode``.

        Args:
            mode: transport to serve on.
            **kwargs: forwarded unchanged to the server class constructor.

        Raises:
            ValueError: no server class is registered for ``mode``.
        """
        server_class = cls._server_classes.get(mode)
        if server_class is None:
            raise ValueError(f"不支持的通信模式: {mode}")
        return server_class(**kwargs)

    @classmethod
    async def start_server(cls, config: Dict[str, Any]) -> BaseMCPServer:
        """Create a server from one config entry, await its ``start`` and return it.

        ``config["mode"]`` defaults to ``"stdio"`` and ``config["server_config"]``
        (constructor keyword arguments) defaults to ``{}``. An unknown mode
        string raises ValueError from the ``CommunicationMode`` lookup.
        """
        mode = CommunicationMode(config.get("mode", "stdio"))
        server = cls.create_server(mode, **config.get("server_config", {}))
        await server.start()
        return server
# 配置管理
import yaml
from pathlib import Path
class MCPConfigManager:
    """Loads MCP server configuration: built-in defaults overlaid by a YAML file."""

    def __init__(self, config_path: str = "mcp_config.yaml"):
        self.config_path = Path(config_path)
        self.config = self._load_default_config()

    def _load_default_config(self) -> Dict[str, Any]:
        """Return a fresh copy of the built-in default configuration."""
        return {
            "servers": {
                "stdio": {
                    "mode": "stdio",
                    "enabled": True,
                },
                "http": {
                    "mode": "http",
                    "enabled": False,
                    "server_config": {
                        "host": "localhost",
                        "port": 8080,
                    },
                },
                "http_sse": {
                    "mode": "http_sse",
                    "enabled": True,
                    "server_config": {
                        "host": "localhost",
                        "port": 8081,
                    },
                },
            },
            "logging": {
                "level": "INFO",
                "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
            },
        }

    def load_config(self) -> Dict[str, Any]:
        """Merge the YAML file at ``config_path`` (if present) over the defaults.

        Returns:
            The merged configuration (the same dict held in ``self.config``).

        Raises:
            ValueError: the file's top-level YAML value is not a mapping.
        """
        if self.config_path.exists():
            with open(self.config_path, 'r', encoding='utf-8') as f:
                user_config = yaml.safe_load(f)
            # safe_load() returns None for an empty or comment-only file.
            if user_config is not None:
                if not isinstance(user_config, dict):
                    raise ValueError(
                        f"配置文件顶层必须是映射: {self.config_path}"
                    )
                self._merge_configs(self.config, user_config)
        return self.config

    def _merge_configs(self, base: Dict[str, Any], update: Dict[str, Any]):
        """Recursively merge ``update`` into ``base`` in place.

        Where both sides hold a dict the merge recurses; otherwise the value
        from ``update`` replaces the one in ``base``.
        """
        for key, value in update.items():
            if isinstance(value, dict) and isinstance(base.get(key), dict):
                self._merge_configs(base[key], value)
            else:
                base[key] = value
# 使用示例
async def main():
    """Start every enabled server from the configuration and run until cancelled.

    Servers are started concurrently because a stdio server's ``start`` only
    returns at EOF and would otherwise keep later servers from starting.
    All created servers are stopped on the way out, including on Ctrl-C.
    """
    # Local import: this module's import block does not bring asyncio into scope.
    import asyncio

    config = MCPConfigManager().load_config()
    servers = []
    start_tasks = []
    try:
        for server_name, server_config in config["servers"].items():
            # Entries added by a user config may omit "enabled"; treat as off.
            if not server_config.get("enabled", False):
                continue
            print(f"启动 {server_name} 服务器...")
            mode = CommunicationMode(server_config.get("mode", "stdio"))
            server = MCPServerFactory.create_server(
                mode, **server_config.get("server_config", {})
            )
            servers.append(server)
            start_tasks.append(asyncio.create_task(server.start()))

        # Surfaces start-up failures; completes once every start() has returned.
        await asyncio.gather(*start_tasks)
        # Keep the process alive until the task is cancelled.
        await asyncio.Future()
    finally:
        # Ctrl-C reaches a coroutine as cancellation, not KeyboardInterrupt,
        # so cleanup has to live in `finally` to run at all.
        print("正在停止服务器...")
        for task in start_tasks:
            task.cancel()
        for server in servers:
            await server.stop()


if __name__ == "__main__":
    import asyncio

    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # asyncio.run() re-raises the interrupt after main() has cleaned up.
        pass
```
通过本文的详细讲解和代码示例,我们完整展示了MCP框架支持的三种通信方式:STDIO适用于本地进程间通信,HTTP支持远程客户端访问,HTTP+SSE则提供了实时推送能力。这种灵活的架构设计使得MCP框架能够适应各种不同的应用场景,从简单的本地工具到复杂的分布式系统都能得到良好支持。