MCP框架搭建详解:三种通信方式与实时推送实战


在现代分布式系统架构中,灵活可靠的通信机制是系统设计的核心要素。MCP(Model Context Protocol)框架通过支持stdio、HTTP+SSE等多种通信方式,为不同场景下的客户端-服务器交互提供了完整的解决方案。本文将深入探讨这三种通信模式的实现原理和实战应用。


## MCP框架架构概述


MCP框架采用分层设计,将通信协议与业务逻辑分离,使得开发者能够根据具体需求选择合适的通信方式。


```python

# mcp_core/architecture.py

from abc import ABC, abstractmethod

from enum import Enum

from typing import Any, Dict, List, Optional

import asyncio

import json


class CommunicationMode(Enum):

    """通信模式枚举"""

    STDIO = "stdio"

    HTTP = "http"

    HTTP_SSE = "http_sse"

<"www.jiaxing.gov.cn.felli.cn">

<"www.tonghua.gov.cn.felli.cn">

<"www.baishan.gov.cn.felli.cn">


class BaseMCPServer(ABC):

    """MCP服务器基类"""

    

    def __init__(self, mode: CommunicationMode):

        self.mode = mode

        self.clients = {}

        self.message_handlers = {}

        self._setup_handlers()

    

    def _setup_handlers(self):

        """设置消息处理器"""

        self.message_handlers = {

            "initialize": self._handle_initialize,

            "tools/list": self._handle_tools_list,

            "tools/call": self._handle_tools_call,

            "notifications": self._handle_notifications

        }

    

    @abstractmethod

    async def start(self, **kwargs):

        """启动服务器"""

        pass

    

    @abstractmethod

    async def stop(self):

        """停止服务器"""

        pass

    

    async def _handle_initialize(self, params: Dict[str, Any]) -> Dict[str, Any]:

        """处理初始化请求"""

        return {

            "protocolVersion": "2024-11-07",

            "capabilities": {

                "tools": {"listChanged": True},

                "resources": {"listChanged": True}

            },

<"www.songyuan.gov.cn.felli.cn">

<"www.baiquan.gov.cn.felli.cn">

<"www.jinhua.gov.cn.felli.cn">

            "serverInfo": {

                "name": "MCP Server",

                "version": "1.0.0"

            }

        }

    

    async def _handle_tools_list(self, params: Dict[str, Any]) -> Dict[str, Any]:

        """处理工具列表请求"""

        return {

            "tools": [

                {

                    "name": "echo",

                    "description": "回显输入文本",

                    "inputSchema": {

                        "type": "object",

                        "properties": {

                            "message": {"type": "string"}

                        }

                    }

                }

            ]

        }

    

    async def _handle_tools_call(self, params: Dict[str, Any]) -> Dict[str, Any]:

        """处理工具调用请求"""

        tool_name = params["name"]

        arguments = params.get("arguments", {})

        

        if tool_name == "echo":

            return {

                "content": [{

                    "type": "text",

                    "text": f"回显: {arguments.get('message', '')}"

                }]

            }

        

        raise ValueError(f"未知工具: {tool_name}")

```


## STDIO通信方式实现


STDIO(标准输入输出)通信适用于本地进程间通信,具有低延迟、简单可靠的特性。


```python

# mcp_servers/stdio_server.py

import sys

import asyncio

import json

from typing import Any, Dict

from mcp_core.architecture import BaseMCPServer, CommunicationMode


class StdioMCPServer(BaseMCPServer):

    """基于STDIO的MCP服务器"""

    

    def __init__(self):

        super().__init__(CommunicationMode.STDIO)

        self.reader: Optional[asyncio.StreamReader] = None

        self.writer: Optional[asyncio.StreamWriter] = None

    

    async def start(self):

        """启动STDIO服务器"""

        loop = asyncio.get_event_loop()

        

        # 使用标准输入输出创建流

        self.reader = asyncio.StreamReader()

        protocol = asyncio.StreamReaderProtocol(self.reader)

        

        await loop.connect_read_pipe(lambda: protocol, sys.stdin)

        

        transport, protocol = await loop.connect_write_pipe(

            asyncio.streams.FlowControlMixin, sys.stdout

        )

        self.writer = asyncio.StreamWriter(transport, protocol, None, loop)

        

        print("MCP STDIO服务器已启动", file=sys.stderr)

        await self._message_loop()

    

    async def _message_loop(self):

        """消息处理循环"""

        while True:

<"www.taizhou.gov.cn.felli.cn">

<"www.qiqihar.gov.cn.felli.cn">

<"www.jixi.gov.cn.felli.cn">

            try:

                # 读取消息长度头

                header = await self.reader.readuntil(b'\n')

                if not header:

                    break

                    

                content_length = int(header.decode('utf-8').strip())

                

                # 读取消息体

                body_data = await self.reader.readexactly(content_length)

                message = json.loads(body_data.decode('utf-8'))

                

                # 处理消息

                response = await self._process_message(message)

                

                # 发送响应

                await self._send_message(response)

                

            except asyncio.IncompleteReadError:

                break

            except Exception as e:

                error_response = {

                    "jsonrpc": "2.0",

                    "id": message.get("id") if 'message' in locals() else None,

                    "error": {

                        "code": -32603,

                        "message": f"内部错误: {str(e)}"

                    }

                }

                await self._send_message(error_response)

    

    async def _process_message(self, message: Dict[str, Any]) -> Dict[str, Any]:

        """处理单个消息"""

<"www.hegang.gov.cn.felli.cn">

<"www.shuangyashan.gov.cn.felli.cn">

<"www.jiamusi.gov.cn.felli.cn">

        method = message.get("method")

        message_id = message.get("id")

        

        if method in self.message_handlers:

            params = message.get("params", {})

            result = await self.message_handlers[method](params)

            

            return {

                "jsonrpc": "2.0",

                "id": message_id,

                "result": result

            }

        else:

            return {

                "jsonrpc": "2.0",

                "id": message_id,

                "error": {

                    "code": -32601,

                    "message": f"方法未找到: {method}"

                }

            }

    

    async def _send_message(self, message: Dict[str, Any]):

        """发送消息到客户端"""

        message_str = json.dumps(message, ensure_ascii=False)

        message_bytes = message_str.encode('utf-8')

        

        # 发送消息长度头

        header = f"{len(message_bytes)}\n".encode('utf-8')

        self.writer.write(header)

        

        # 发送消息体

        self.writer.write(message_bytes)

        await self.writer.drain()

    

    async def stop(self):

        """停止服务器"""

        if self.writer:

            self.writer.close()

            await self.writer.wait_closed()


# STDIO客户端实现

class StdioMCPClient:

    """基于STDIO的MCP客户端"""

    

    def __init__(self):

        self.reader = None

        self.writer = None

        self.request_id = 0

    

    async def connect(self):

        """连接到STDIO服务器"""

        loop = asyncio.get_event_loop()

        

        # 创建标准输入输出流

        self.reader = asyncio.StreamReader()

        reader_protocol = asyncio.StreamReaderProtocol(self.reader)

        await loop.connect_read_pipe(lambda: reader_protocol, sys.stdin)

        

        transport, protocol = await loop.connect_write_pipe(

            asyncio.streams.FlowControlMixin, sys.stdout

        )

        self.writer = asyncio.StreamWriter(transport, protocol, None, loop)

    

    async def send_request(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]:

        """发送请求到服务器"""

        self.request_id += 1

<"www.qitaihe.gov.cn.felli.cn">

<"www.heihe.gov.cn.felli.cn">

<"www.suihua.gov.cn.felli.cn">

        request = {

            "jsonrpc": "2.0",

            "id": self.request_id,

            "method": method,

            "params": params

        }

        

        await self._send_message(request)

        response = await self._receive_message()

        return response

    

    async def _send_message(self, message: Dict[str, Any]):

        """发送消息"""

        message_str = json.dumps(message, ensure_ascii=False)

        message_bytes = message_str.encode('utf-8')

        

        header = f"{len(message_bytes)}\n".encode('utf-8')

        self.writer.write(header)

        self.writer.write(message_bytes)

        await self.writer.drain()

    

    async def _receive_message(self) -> Dict[str, Any]:

        """接收消息"""

        header = await self.reader.readuntil(b'\n')

        content_length = int(header.decode('utf-8').strip())

        

        body_data = await self.reader.readexactly(content_length)

        return json.loads(body_data.decode('utf-8'))

```


## HTTP通信方式实现


HTTP通信支持远程客户端连接,适合分布式部署场景。


```python

# mcp_servers/http_server.py

from aiohttp import web

import json

<"www.hulunbeier.gov.cn.felli.cn">

<"www.qiqihaer.gov.cn.felli.cn">

<"www.liyang.gov.cn.felli.cn">

from typing import Any, Dict

from mcp_core.architecture import BaseMCPServer, CommunicationMode


class HttpMCPServer(BaseMCPServer):

    """基于HTTP的MCP服务器"""

    

    def __init__(self, host: str = "localhost", port: int = 8080):

        super().__init__(CommunicationMode.HTTP)

        self.host = host

        self.port = port

        self.app = web.Application()

        self._setup_routes()

    

    def _setup_routes(self):

        """设置HTTP路由"""

        self.app.router.add_post('/mcp', self._handle_mcp_request)

        self.app.router.add_get('/health', self._handle_health_check)

    

    async def start(self):

        """启动HTTP服务器"""

        runner = web.AppRunner(self.app)

        await runner.setup()

        

        site = web.TCPSite(runner, self.host, self.port)

        await site.start()

        

        print(f"HTTP MCP服务器运行在 http://{self.host}:{self.port}")

    

    async def _handle_mcp_request(self, request: web.Request) -> web.Response:

        """处理MCP请求"""

        try:

            # 解析JSON请求

            data = await request.json()

            

            # 处理消息

            response_data = await self._process_message(data)

            

            return web.Response(

                text=json.dumps(response_data, ensure_ascii=False),

                content_type='application/json'

            )

            

        except json.JSONDecodeError:

            return web.Response(

                text=json.dumps({

                    "jsonrpc": "2.0",

                    "error": {

                        "code": -32700,

                        "message": "解析错误"

                    }

                }),

                status=400,

                content_type='application/json'

            )

        except Exception as e:

            return web.Response(

                text=json.dumps({

                    "jsonrpc": "2.0",

                    "error": {

                        "code": -32603,

                        "message": f"内部错误: {str(e)}"

                    }

                }),

                status=500,

                content_type='application/json'

            )

    

    async def _handle_health_check(self, request: web.Request) -> web.Response:

        """健康检查端点"""

        return web.Response(

            text=json.dumps({"status": "healthy", "mode": "http"}),

            content_type='application/json'

        )

<"www.dongcheng.gov.cn.felli.cn">

<"www.xicheng.gov.cn.felli.cn">

<"www.chaoyang.gov.cn.felli.cn">

    

    async def stop(self):

        """停止HTTP服务器"""

        # 在aiohttp中,停止通过外部控制实现

        pass


# HTTP客户端实现

import aiohttp

import asyncio


class HttpMCPClient:

    """基于HTTP的MCP客户端"""

    

    def __init__(self, base_url: str):

        self.base_url = base_url

        self.session = None

        self.request_id = 0

    

    async def connect(self):

        """创建HTTP会话"""

        self.session = aiohttp.ClientSession()

    

    async def send_request(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]:

        """发送HTTP请求"""

        self.request_id += 1

        request_data = {

            "jsonrpc": "2.0",

            "id": self.request_id,

            "method": method,

            "params": params

        }

        

        async with self.session.post(f"{self.base_url}/mcp", 

                                   json=request_data) as response:

            if response.status == 200:

                return await response.json()

            else:

                raise Exception(f"HTTP错误: {response.status}")

    

    async def close(self):

        """关闭客户端"""

        if self.session:

            await self.session.close()

```


## HTTP+SSE实时通信实现


HTTP+SSE(Server-Sent Events)结合了HTTP的简单性和实时推送能力,适合需要服务器主动推送的场景。


```python

# mcp_servers/http_sse_server.py

from aiohttp import web, SSEResponse

import asyncio

import json

from typing import Any, Dict, Set

from mcp_core.architecture import BaseMCPServer, CommunicationMode


class HttpSSEMCPServer(BaseMCPServer):

    """基于HTTP+SSE的MCP服务器"""

    

    def __init__(self, host: str = "localhost", port: int = 8081):

        super().__init__(CommunicationMode.HTTP_SSE)

        self.host = host

        self.port = port

        self.app = web.Application()

        self.sse_connections: Set[SSEResponse] = set()

        self._setup_routes()

    

    def _setup_routes(self):

        """设置HTTP+SSE路由"""

        self.app.router.add_post('/mcp', self._handle_mcp_request)

        self.app.router.add_get('/events', self._handle_sse_connection)

        self.app.router.add_post('/notify', self._handle_notification)

        self.app.router.add_get('/health', self._handle_health_check)

    

    async def start(self):

        """启动HTTP+SSE服务器"""

        runner = web.AppRunner(self.app)

        await runner.setup()

        

        site = web.TCPSite(runner, self.host, self.port)

        await site.start()

<"www.fengtai.gov.cn.felli.cn">

<"www.shijingshan.gov.cn.felli.cn">

<"www.haidian.gov.cn.felli.cn">

        

        print(f"HTTP+SSE MCP服务器运行在 http://{self.host}:{self.port}")

    

    async def _handle_sse_connection(self, request: web.Request) -> SSEResponse:

        """处理SSE连接"""

        response = SSEResponse()

        await response.prepare(request)

        

        self.sse_connections.add(response)

        

        try:

            # 发送连接确认

            await response.send(json.dumps({

                "type": "connected",

                "message": "SSE连接已建立"

            }))

            

            # 保持连接

            async for _ in response:

                await asyncio.sleep(1)

                

        except ConnectionError:

            print("SSE连接断开")

        finally:

            self.sse_connections.remove(response)

        

        return response

    

    async def _handle_mcp_request(self, request: web.Request) -> web.Response:

        """处理MCP请求"""

        try:

            data = await request.json()

            response_data = await self._process_message(data)

            

            return web.Response(

                text=json.dumps(response_data, ensure_ascii=False),

                content_type='application/json'

            )

            

        except Exception as e:

            return web.Response(

                text=json.dumps({

                    "jsonrpc": "2.0",

                    "error": {

                        "code": -32603,

                        "message": f"内部错误: {str(e)}"

                    }

                }),

                status=500,

                content_type='application/json'

            )

    

    async def _handle_notification(self, request: web.Request) -> web.Response:

        """处理通知请求"""

        try:

            data = await request.json()

            notification_type = data.get("type")

            message = data.get("message")

            

            # 广播通知到所有SSE客户端

            await self._broadcast_notification(notification_type, message)

            

            return web.Response(

                text=json.dumps({"status": "notification_sent"}),

                content_type='application/json'

            )

            

        except Exception as e:

            return web.Response(

                text=json.dumps({"error": str(e)}),

                status=500,

                content_type='application/json'

            )

    

    async def _broadcast_notification(self, notification_type: str, message: str):

        """广播通知到所有SSE客户端"""

        notification = {

            "type": notification_type,

            "timestamp": asyncio.get_event_loop().time(),

            "message": message

        }

        

        disconnected = set()

        for connection in self.sse_connections:

            try:

                await connection.send(json.dumps(notification))

            except ConnectionError:

                disconnected.add(connection)

        

        # 清理断开的连接

        for connection in disconnected:

            self.sse_connections.remove(connection)

    

    async def _handle_health_check(self, request: web.Request) -> web.Response:

        """健康检查端点"""

        health_info = {

            "status": "healthy",

            "mode": "http_sse",

            "connected_clients": len(self.sse_connections),

            "active_handlers": len(self.message_handlers)

        }

        return web.Response(

            text=json.dumps(health_info),

            content_type='application/json'

        )

<"www.mentougou.gov.cn.felli.cn">

<"www.fangshan.gov.cn.felli.cn">

<"www.tongzhou.gov.cn.felli.cn">

    

    async def stop(self):

        """停止服务器"""

        # 关闭所有SSE连接

        for connection in list(self.sse_connections):

            await connection._close()

        self.sse_connections.clear()


# HTTP+SSE客户端实现

import aiohttp

import asyncio


class HttpSSEMCPClient:

    """基于HTTP+SSE的MCP客户端"""

    

    def __init__(self, base_url: str):

        self.base_url = base_url

        self.http_session = None

        self.event_handlers = {}

        self.is_listening = False

    

    async def connect(self):

        """连接到服务器"""

        self.http_session = aiohttp.ClientSession()

    

    async def send_request(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]:

        """发送HTTP请求"""

        request_data = {

            "jsonrpc": "2.0",

            "method": method,

            "params": params

        }

        

        async with self.http_session.post(f"{self.base_url}/mcp", 

                                        json=request_data) as response:

            return await response.json()

    

    async def listen_events(self):

        """监听服务器事件"""

        self.is_listening = True

        

        async with self.http_session.get(f"{self.base_url}/events") as response:

            async for line in response.content:

                if line.startswith(b'data: '):

                    try:

                        event_data = json.loads(line[6:].decode('utf-8'))

                        await self._handle_event(event_data)

                    except json.JSONDecodeError:

                        continue

    

    async def _handle_event(self, event_data: Dict[str, Any]):

        """处理服务器事件"""

        event_type = event_data.get("type")

        if event_type in self.event_handlers:

            for handler in self.event_handlers[event_type]:

                await handler(event_data)

    

    def on_event(self, event_type: str):

        """事件处理器装饰器"""

        def decorator(handler):

            if event_type not in self.event_handlers:

                self.event_handlers[event_type] = []

            self.event_handlers[event_type].append(handler)

            return handler

        return decorator

    

    async def close(self):

        """关闭客户端"""

        self.is_listening = False

        if self.http_session:

            await self.http_session.close()

```


## 统一服务器工厂与配置管理


为了简化不同通信方式的服务器的创建和管理,实现统一的服务器工厂。


```python

# mcp_core/server_factory.py

from typing import Dict, Type, Any

from mcp_servers.stdio_server import StdioMCPServer

from mcp_servers.http_server import HttpMCPServer

from mcp_servers.http_sse_server import HttpSSEMCPServer

from mcp_core.architecture import CommunicationMode, BaseMCPServer


class MCPServerFactory:

<"www.shunyi.gov.cn.felli.cn">

<"www.changping.gov.cn.felli.cn">

<"www.daxing.gov.cn.felli.cn">

    """MCP服务器工厂"""

    

    _server_classes: Dict[CommunicationMode, Type[BaseMCPServer]] = {

        CommunicationMode.STDIO: StdioMCPServer,

        CommunicationMode.HTTP: HttpMCPServer,

        CommunicationMode.HTTP_SSE: HttpSSEMCPServer

    }

    

    @classmethod

    def create_server(cls, mode: CommunicationMode, **kwargs) -> BaseMCPServer:

        """创建指定模式的服务器"""

        if mode not in cls._server_classes:

            raise ValueError(f"不支持的通信模式: {mode}")

        

        server_class = cls._server_classes[mode]

        return server_class(**kwargs)

    

    @classmethod

    async def start_server(cls, config: Dict[str, Any]) -> BaseMCPServer:

        """根据配置启动服务器"""

        mode = CommunicationMode(config.get("mode", "stdio"))

        server_config = config.get("server_config", {})

        

        server = cls.create_server(mode, **server_config)

        await server.start()

        

        return server


# 配置管理

import yaml

from pathlib import Path


class MCPConfigManager:

    """MCP配置管理器"""

    

    def __init__(self, config_path: str = "mcp_config.yaml"):

        self.config_path = Path(config_path)

        self.config = self._load_default_config()

    

    def _load_default_config(self) -> Dict[str, Any]:

        """加载默认配置"""

        return {

            "servers": {

                "stdio": {

                    "mode": "stdio",

                    "enabled": True

                },

                "http": {

                    "mode": "http",

                    "enabled": False,

                    "server_config": {

                        "host": "localhost",

                        "port": 8080

                    }

                },

                "http_sse": {

                    "mode": "http_sse", 

                    "enabled": True,

                    "server_config": {

                        "host": "localhost",

                        "port": 8081

                    }

                }

            },

            "logging": {

                "level": "INFO",

                "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

            }

        }

    

    def load_config(self) -> Dict[str, Any]:

        """加载配置文件"""

        if self.config_path.exists():

            with open(self.config_path, 'r', encoding='utf-8') as f:

                user_config = yaml.safe_load(f)

                self._merge_configs(self.config, user_config)

        

        return self.config

    

    def _merge_configs(self, base: Dict[str, Any], update: Dict[str, Any]):

        """合并配置"""

        for key, value in update.items():

            if isinstance(value, dict) and key in base and isinstance(base[key], dict):

                self._merge_configs(base[key], value)

            else:

                base[key] = value


# 使用示例

async def main():

    """主函数示例"""

    config_manager = MCPConfigManager()

    config = config_manager.load_config()

    

    servers = []

    

    # 启动所有启用的服务器

    for server_name, server_config in config["servers"].items():

        if server_config["enabled"]:

            print(f"启动 {server_name} 服务器...")

            server = await MCPServerFactory.start_server(server_config)

            servers.append(server)<"www.huairou.gov.cn.felli.cn">

    

    try:

        # 保持服务器运行

        await asyncio.Future()

    except KeyboardInterrupt:

        print("正在停止服务器...")

        for server in servers:

            await server.stop()


if __name__ == "__main__":

    asyncio.run(main())

```


通过本文的详细讲解和代码示例,我们完整展示了MCP框架支持的三种通信方式:STDIO适用于本地进程间通信,HTTP支持远程客户端访问,HTTP+SSE则提供了实时推送能力。这种灵活的架构设计使得MCP框架能够适应各种不同的应用场景,从简单的本地工具到复杂的分布式系统都能得到良好支持。


请使用浏览器的分享功能分享到微信等