MCP上下文协议完全指南:构建智能模型通信的实践路径

# MCP上下文协议完全指南:构建智能模型通信的实践路径


Model Context Protocol (MCP) 作为连接大语言模型与外部工具的标准协议,正在重新定义AI系统的可扩展性和互操作性。本文将深入解析MCP的核心原理,并提供从基础到实践的完整实现方案。


## MCP协议核心架构解析


MCP基于JSON-RPC 2.0协议,为模型提供了标准化的工具调用和资源访问接口。


```python

# mcp_core/protocol.py

from typing import Dict, List, Any, Optional

from dataclasses import dataclass

from enum import Enum

import json


class MCPMessageType(Enum):

    """MCP消息类型枚举"""

    INITIALIZE = "initialize"

    TOOLS_LIST = "tools/list"

    TOOLS_CALL = "tools/call"

    RESOURCES_LIST = "resources/list"

    RESOURCES_READ = "resources/read"

    NOTIFICATIONS = "notifications"


@dataclass

class MCPRequest:

    """MCP请求数据结构"""

<"share.m.hbgufen.com">

<"tv.share.hbgufen.com">

<"read.share.hbgufen.com">

    jsonrpc: str = "2.0"

    id: Optional[str] = None

    method: str = ""

    params: Optional[Dict[str, Any]] = None

    

    def to_dict(self) -> Dict[str, Any]:

        """转换为字典格式"""

        data = {

            "jsonrpc": self.jsonrpc,

            "method": self.method

        }

        if self.id:

            data["id"] = self.id

        if self.params:

            data["params"] = self.params

        return data


@dataclass

class MCPResponse:

    """MCP响应数据结构"""

    jsonrpc: str

    id: Optional[str]

    result: Optional[Dict[str, Any]] = None

    error: Optional[Dict[str, Any]] = None

    

    @classmethod

    def from_dict(cls, data: Dict[str, Any]) -> 'MCPResponse':

        """从字典创建响应对象"""

        return cls(

            jsonrpc=data.get("jsonrpc", "2.0"),

            id=data.get("id"),

            result=data.get("result"),

            error=data.get("error")

        )

    

    def is_success(self) -> bool:

        """判断请求是否成功"""

        return self.error is None


class MCPErrorCode(Enum):

    """MCP错误码枚举"""

    PARSE_ERROR = -32700

    INVALID_REQUEST = -32600

    METHOD_NOT_FOUND = -32601

    INVALID_PARAMS = -32602

    INTERNAL_ERROR = -32603

<"share.tv.hbgufen.com">

<"live.share.hbgufen.com">

<"wap.share.hbgufen.com">

```


## MCP服务器基础实现


构建符合MCP标准的服务器需要实现核心协议处理方法。


```python

# mcp_server/base_server.py

from abc import ABC, abstractmethod

from typing import Dict, List, Any

import asyncio


class MCPServer(ABC):

    """MCP服务器基类"""

    

    def __init__(self, server_name: str, version: str = "1.0.0"):

        self.server_name = server_name

        self.version = version

        self.tools = {}

        self.resources = {}

        self._initialize_capabilities()

    

    def _initialize_capabilities(self):

        """初始化服务器能力"""

        self.capabilities = {

            "tools": {"listChanged": True},

            "resources": {"listChanged": True},

            "roots": {"listChanged": True}

        }

    

    async def handle_request(self, request_data: Dict[str, Any]) -> Dict[str, Any]:

        """处理MCP请求"""

        try:

            request = MCPRequest(

                id=request_data.get("id"),

                method=request_data.get("method"),

                params=request_data.get("params")

            )

            

            # 路由到对应的处理方法

            if request.method == "initialize":

                response = await self._handle_initialize(request)

            elif request.method == "tools/list":

                response = await self._handle_tools_list(request)

            elif request.method == "tools/call":

                response = await self._handle_tools_call(request)

            elif request.method == "resources/list":

                response = await self._handle_resources_list(request)

            elif request.method == "resources/read":

                response = await self._handle_resources_read(request)

            else:

                response = MCPResponse(

                    jsonrpc="2.0",

                    id=request.id,

                    error={

                        "code": -32601,

                        "message": f"Method not found: {request.method}"

                    }

                )

            

            return response.to_dict() if hasattr(response, 'to_dict') else {

                "jsonrpc": "2.0",

                "id": response.id,

                "result": response.result,

                "error": response.error

            }

            

        except Exception as e:

            return {

                "jsonrpc": "2.0",

                "id": request_data.get("id"),

                "error": {

                    "code": -32603,

                    "message": f"Internal error: {str(e)}"

<"share.share.hbgufen.com">

<"gov.cn.hbgufen.com">

<"nba.share.hbgufen.com">

                }

            }

    

    async def _handle_initialize(self, request: MCPRequest) -> MCPResponse:

        """处理初始化请求"""

        return MCPResponse(

            jsonrpc="2.0",

            id=request.id,

            result={

                "protocolVersion": "2024-11-07",

                "capabilities": self.capabilities,

                "serverInfo": {

                    "name": self.server_name,

                    "version": self.version

                }

            }

        )

    

    async def _handle_tools_list(self, request: MCPRequest) -> MCPResponse:

        """处理工具列表请求"""

        tools_list = []

        for tool_name, tool_info in self.tools.items():

            tools_list.append({

                "name": tool_name,

                "description": tool_info.get("description", ""),

                "inputSchema": tool_info.get("inputSchema", {})

            })

        

        return MCPResponse(

            jsonrpc="2.0",

            id=request.id,

            result={"tools": tools_list}

        )

    

    async def _handle_tools_call(self, request: MCPRequest) -> MCPResponse:

        """处理工具调用请求"""

        tool_name = request.params.get("name")

        arguments = request.params.get("arguments", {})

        

        if tool_name not in self.tools:

            return MCPResponse(

                jsonrpc="2.0",

                id=request.id,

                error={

                    "code": -32601,

                    "message": f"Tool not found: {tool_name}"

                }

            )

        

        try:

            result = await self._execute_tool(tool_name, arguments)

            return MCPResponse(

                jsonrpc="2.0",

                id=request.id,

                result={"content": [{"type": "text", "text": str(result)}]}

            )

        except Exception as e:

            return MCPResponse(

                jsonrpc="2.0",

                id=request.id,

                error={

                    "code": -32603,

                    "message": f"Tool execution failed: {str(e)}"

                }

<"www.share.okjiabao.com">

<"share.m.okjiabao.com">

<"tv.share.okjiabao.com">

            )

    

    @abstractmethod

    async def _execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Any:

        """执行具体工具(由子类实现)"""

        pass

    

    async def _handle_resources_list(self, request: MCPRequest) -> MCPResponse:

        """处理资源列表请求"""

        resources_list = []

        for resource_uri, resource_info in self.resources.items():

            resources_list.append({

                "uri": resource_uri,

                "name": resource_info.get("name", ""),

                "description": resource_info.get("description", ""),

                "mimeType": resource_info.get("mimeType", "text/plain")

            })

        

        return MCPResponse(

            jsonrpc="2.0",

            id=request.id,

            result={"resources": resources_list}

        )

    

    async def _handle_resources_read(self, request: MCPRequest) -> MCPResponse:

        """处理资源读取请求"""

        resource_uri = request.params.get("uri")

        

        if resource_uri not in self.resources:

            return MCPResponse(

                jsonrpc="2.0",

                id=request.id,

                error={

                    "code": -32601,

                    "message": f"Resource not found: {resource_uri}"

                }

            )

        

        try:

            content = await self._read_resource(resource_uri)

            return MCPResponse(

                jsonrpc="2.0",

                id=request.id,

                result={"contents": [{"uri": resource_uri, "text": content}]}

            )

        except Exception as e:

            return MCPResponse(

                jsonrpc="2.0",

                id=request.id,

                error={

                    "code": -32603,

                    "message": f"Resource read failed: {str(e)}"

                }

            )

    

    @abstractmethod

    async def _read_resource(self, resource_uri: str) -> str:

        """读取具体资源(由子类实现)"""

        pass

<"read.share.okjiabao.com">

<"share.tv.okjiabao.com">

<"live.share.okjiabao.com">

```


## 具体MCP服务器实现


实现具有实际功能的MCP服务器,展示协议的实际应用。


```python

# mcp_server/file_system_server.py

import os

import json

from typing import Dict, Any

from .base_server import MCPServer


class FileSystemMCPServer(MCPServer):

    """文件系统MCP服务器"""

    

    def __init__(self, base_directory: str = "."):

        super().__init__("file-system-server", "1.0.0")

        self.base_directory = os.path.abspath(base_directory)

        self._register_tools()

        self._register_resources()

    

    def _register_tools(self):

        """注册文件系统工具"""

        self.tools = {

            "read_file": {

                "description": "读取文件内容",

                "inputSchema": {

                    "type": "object",

                    "properties": {

                        "filepath": {

                            "type": "string",

                            "description": "文件路径"

                        }

                    },

                    "required": ["filepath"]

                }

            },

            "write_file": {

                "description": "写入文件内容",

                "inputSchema": {

                    "type": "object",

                    "properties": {

                        "filepath": {

                            "type": "string",

                            "description": "文件路径"

                        },

                        "content": {

                            "type": "string",

                            "description": "文件内容"

                        }

                    },

                    "required": ["filepath", "content"]

                }

            },

            "list_directory": {

                "description": "列出目录内容",

                "inputSchema": {

                    "type": "object",

                    "properties": {

                        "directory": {

                            "type": "string",

                            "description": "目录路径"

                        }

                    },

                    "required": ["directory"]

                }

            },

            "file_info": {

                "description": "获取文件信息",

                "inputSchema": {

                    "type": "object",

                    "properties": {

                        "filepath": {

                            "type": "string",

                            "description": "文件路径"

                        }

                    },

                    "required": ["filepath"]

                }

            }

        }

    

    def _register_resources(self):

        """注册文件系统资源"""

        self.resources = {

            "file:///README": {

                "name": "README文件",

                "description": "项目说明文件",

                "mimeType": "text/plain"

<"wap.share.okjiabao.com">

<"share.share.okjiabao.com">

<"gov.cn.okjiabao.com">

            },

            "file:///config": {

                "name": "配置文件",

                "description": "系统配置文件",

                "mimeType": "application/json"

            }

        }

    

    def _get_absolute_path(self, filepath: str) -> str:

        """获取绝对路径"""

        absolute_path = os.path.join(self.base_directory, filepath)

        # 安全检查:确保路径在基础目录内

        if not absolute_path.startswith(self.base_directory):

            raise ValueError("访问路径超出允许范围")

        return absolute_path

    

    async def _execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Any:

        """执行文件系统工具"""

        if tool_name == "read_file":

            return await self._read_file(arguments["filepath"])

        elif tool_name == "write_file":

            return await self._write_file(arguments["filepath"], arguments["content"])

        elif tool_name == "list_directory":

            return await self._list_directory(arguments["directory"])

        elif tool_name == "file_info":

            return await self._get_file_info(arguments["filepath"])

        else:

            raise ValueError(f"未知工具: {tool_name}")

    

    async def _read_file(self, filepath: str) -> str:

        """读取文件"""

        absolute_path = self._get_absolute_path(filepath)

        with open(absolute_path, 'r', encoding='utf-8') as f:

            return f.read()

    

    async def _write_file(self, filepath: str, content: str) -> str:

        """写入文件"""

        absolute_path = self._get_absolute_path(filepath)

        os.makedirs(os.path.dirname(absolute_path), exist_ok=True)

        with open(absolute_path, 'w', encoding='utf-8') as f:

            f.write(content)

        return f"文件已写入: {filepath}"

<"nba.share.okjiabao.com">

<"hbgufen.com">

<"okjiabao.com">

    

    async def _list_directory(self, directory: str) -> List[Dict[str, Any]]:

        """列出目录内容"""

        absolute_path = self._get_absolute_path(directory)

        items = []

        

        for item in os.listdir(absolute_path):

            item_path = os.path.join(absolute_path, item)

            stat = os.stat(item_path)

            

            items.append({

                "name": item,

                "type": "directory" if os.path.isdir(item_path) else "file",

                "size": stat.st_size,

                "modified": stat.st_mtime

            })

        

        return items

    

    async def _get_file_info(self, filepath: str) -> Dict[str, Any]:

        """获取文件信息"""

        absolute_path = self._get_absolute_path(filepath)

        stat = os.stat(absolute_path)

        

        return {

            "path": filepath,

            "type": "directory" if os.path.isdir(absolute_path) else "file",

            "size": stat.st_size,

            "created": stat.st_ctime,

            "modified": stat.st_mtime,

            "accessed": stat.st_atime

        }

    

    async def _read_resource(self, resource_uri: str) -> str:

        """读取资源"""

        if resource_uri == "file:///README":

            return "# MCP文件系统服务器\n\n这是一个MCP协议的文件系统服务器实现。"

        elif resource_uri == "file:///config":

            return json.dumps({

                "server": {

                    "name": self.server_name,

                    "version": self.version,

                    "base_directory": self.base_directory

                }

            }, indent=2)

        else:

            raise ValueError(f"资源不存在: {resource_uri}")

```


## MCP客户端实现


构建能够与MCP服务器通信的客户端,完成完整的协议交互。


```python

# mcp_client/client.py

import aiohttp

import asyncio

from typing import Dict, List, Any, Optional

from dataclasses import dataclass


@dataclass

class ToolInfo:

    """工具信息"""

    name: str

    description: str

    input_schema: Dict[str, Any]


@dataclass

class ResourceInfo:

    """资源信息"""

    uri: str

    name: str

    description: str

    mime_type: str


class MCPClient:

    """MCP客户端"""

    

    def __init__(self, server_url: str):

        self.server_url = server_url

        self.session: Optional[aiohttp.ClientSession] = None

        self.initialized = False

        self.request_id = 0

    

    async def connect(self):

        """连接到MCP服务器"""

        self.session = aiohttp.ClientSession()

        

        # 初始化连接

        init_result = await self.initialize()

        if not init_result.get("error"):

            self.initialized = True

            print("✅ MCP客户端连接成功")

        else:

            raise ConnectionError(f"初始化失败: {init_result.get('error')}")

    

    async def close(self):

        """关闭连接"""

        if self.session:

            await self.session.close()

        self.initialized = False

    

    async def _send_request(self, method: str, params: Dict[str, Any] = None) -> Dict[str, Any]:

        """发送MCP请求"""

        if not self.session:

            raise RuntimeError("客户端未连接")

        

        self.request_id += 1

        request = {

            "jsonrpc": "2.0",

            "id": str(self.request_id),

            "method": method,

            "params": params or {}

        }

        

        try:

            async with self.session.post(

                self.server_url,

                json=request,

                headers={"Content-Type": "application/json"}

            ) as response:

                return await response.json()

        except Exception as e:

            return {

                "jsonrpc": "2.0",

                "id": str(self.request_id),

                "error": {

                    "code": -32000,

                    "message": f"请求失败: {str(e)}"

                }

            }

    

    async def initialize(self) -> Dict[str, Any]:

        """初始化连接"""

        params = {

            "protocolVersion": "2024-11-07",

            "capabilities": {

                "roots": {"listChanged": True},

                "tools": {"listChanged": True}

            },

            "clientInfo": {

                "name": "python-mcp-client",

                "version": "1.0.0"

<"www.hbgufen.com">

<"www.okjiabao.com">

            }

        }

        return await self._send_request("initialize", params)

    

    async def list_tools(self) -> List[ToolInfo]:

        """获取工具列表"""

        response = await self._send_request("tools/list")

        

        if "error" in response:

            raise RuntimeError(f"获取工具列表失败: {response['error']}")

        

        tools = []

        for tool_data in response["result"]["tools"]:

            tools.append(ToolInfo(

                name=tool_data["name"],

                description=tool_data["description"],

                input_schema=tool_data["inputSchema"]

            ))

        

        return tools

    

    async def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Any:

        """调用工具"""

        params = {

            "name": tool_name,

            "arguments": arguments

        }

        

        response = await self._send_request("tools/call", params)

        

        if "error" in response:

            raise RuntimeError(f"工具调用失败: {response['error']}")

        

        # 提取文本内容

        content = response["result"]["content"]

        for item in content:

            if item["type"] == "text":

                return item["text"]

        

        return content

    

    async def list_resources(self) -> List[ResourceInfo]:

        """获取资源列表"""

        response = await self._send_request("resources/list")

        

        if "error" in response:

            raise RuntimeError(f"获取资源列表失败: {response['error']}")

        

        resources = []

        for resource_data in response["result"]["resources"]:

            resources.append(ResourceInfo(

                uri=resource_data["uri"],

                name=resource_data["name"],

                description=resource_data["description"],

                mime_type=resource_data["mimeType"]

            ))

        

        return resources

    

    async def read_resource(self, resource_uri: str) -> str:

        """读取资源"""

        params = {

            "uri": resource_uri

        }

        

        response = await self._send_request("resources/read", params)

        

        if "error" in response:

            raise RuntimeError(f"资源读取失败: {response['error']}")

        

        # 提取资源内容

        contents = response["result"]["contents"]

        for content in contents:

            if content["uri"] == resource_uri:

                return content["text"]

        

        raise ValueError(f"资源内容未找到: {resource_uri}")

```


## 完整工作流示例


展示MCP协议在实际应用中的完整工作流程。


```python

# examples/complete_workflow.py

import asyncio

import sys

import os


# 添加模块路径

sys.path.append(os.path.join(os.path.dirname(__file__), '..'))


from mcp_server.file_system_server import FileSystemMCPServer

from mcp_client.client import MCPClient


class MCPWorkflowDemo:

    """MCP工作流演示"""

    

    def __init__(self, server_url: str = "http://localhost:8080/mcp"):

        self.server_url = server_url

        self.client = MCPClient(server_url)

    

    async def run_demo(self):

        """运行完整演示"""

        print("? 开始MCP协议完整工作流演示")

        print("=" * 50)

        

        try:

            # 连接服务器

            await self.client.connect()

            

            # 演示1: 查看可用工具

            await self.demo_tools_list()

            

            # 演示2: 工具调用

            await self.demo_tool_calls()

            

            # 演示3: 资源访问

            await self.demo_resource_access()

            

            # 演示4: 错误处理

            await self.demo_error_handling()

            

        finally:

            await self.client.close()

    

    async def demo_tools_list(self):

        """演示工具列表功能"""

        print("\n? 演示1: 查看可用工具")

        print("-" * 30)

        

        tools = await self.client.list_tools()

        print(f"发现 {len(tools)} 个可用工具:")

        

        for tool in tools:

            print(f"  • {tool.name}: {tool.description}")

    

    async def demo_tool_calls(self):

        """演示工具调用功能"""

        print("\n? 演示2: 工具调用")

        print("-" * 30)

        

        # 创建测试文件

        test_content = "这是通过MCP客户端创建的文件内容\n创建时间戳"

        

        try:

            # 写入文件

            write_result = await self.client.call_tool("write_file", {

                "filepath": "test_demo.txt",

                "content": test_content

            })

            print(f"✅ 文件写入: {write_result}")

            

            # 读取文件

            read_result = await self.client.call_tool("read_file", {

                "filepath": "test_demo.txt"

            })

            print(f"? 文件内容: {read_result}")

            

            # 列出目录

            list_result = await self.client.call_tool("list_directory", {

                "directory": "."

            })

            print(f"? 目录内容: {len(list_result)} 个项目")

            

        except Exception as e:

            print(f"❌ 工具调用失败: {e}")

    

    async def demo_resource_access(self):

        """演示资源访问功能"""

        print("\n? 演示3: 资源访问")

        print("-" * 30)

        

        try:

            resources = await self.client.list_resources()

            print(f"发现 {len(resources)} 个可用资源:")

            

            for resource in resources:

                print(f"  • {resource.name} ({resource.uri})")

                

                # 读取资源内容

                content = await self.client.read_resource(resource.uri)

                print(f"    内容预览: {content[:50]}...")

                

        except Exception as e:

            print(f"❌ 资源访问失败: {e}")

    

    async def demo_error_handling(self):

        """演示错误处理"""

        print("\n⚠️  演示4: 错误处理")

        print("-" * 30)

        

        # 测试不存在的工具

        try:

            await self.client.call_tool("non_existent_tool", {})

        except Exception as e:

            print(f"✅ 正确捕获错误: {e}")

        

        # 测试无效参数

        try:

            await self.client.call_tool("read_file", {

                "invalid_param": "value"

            })

        except Exception as e:

            print(f"✅ 正确捕获参数错误: {e}")


# HTTP服务器包装

from aiohttp import web

import json


class MCPServerWrapper:

    """MCP服务器HTTP包装器"""

    

    def __init__(self, mcp_server: MCPServer):

        self.mcp_server = mcp_server

        self.app = web.Application()

        self.app.router.add_post('/mcp', self.handle_mcp_request)

        self.app.router.add_get('/health', self.handle_health_check)

    

    async def handle_mcp_request(self, request: web.Request) -> web.Response:

        """处理MCP请求"""

        try:

            data = await request.json()

            result = await self.mcp_server.handle_request(data)

            return web.Response(

                text=json.dumps(result),

                content_type='application/json'

            )

        except Exception as e:

            return web.Response(

                text=json.dumps({

                    "jsonrpc": "2.0",

                    "id": None,

                    "error": {

                        "code": -32700,

                        "message": f"Parse error: {str(e)}"

                    }

                }),

                content_type='application/json',

                status=400

            )

    

    async def handle_health_check(self, request: web.Request) -> web.Response:

        """健康检查"""

        return web.Response(

            text=json.dumps({"status": "healthy", "service": "mcp-server"}),

            content_type='application/json'

        )

    

    async def start(self, host: str = 'localhost', port: int = 8080):

        """启动服务器"""

        runner = web.AppRunner(self.app)

        await runner.setup()

        

        site = web.TCPSite(runner, host, port)

        await site.start()

        

        print(f"? MCP服务器运行在 http://{host}:{port}")

        return runner


async def main():

    """主函数"""

    # 创建文件系统MCP服务器

    file_server = FileSystemMCPServer()

    server_wrapper = MCPServerWrapper(file_server)

    

    # 启动服务器

    runner = await server_wrapper.start()

    

    try:

        # 运行客户端演示

        demo = MCPWorkflowDemo()

        await demo.run_demo()

        

        # 保持服务器运行

        print("\n? 服务器运行中,按 Ctrl+C 停止...")

        await asyncio.Future()

        

    except KeyboardInterrupt:

        print("\n? 停止服务器")

    finally:

        await runner.cleanup()


if __name__ == "__main__":

    asyncio.run(main())

```


## 高级特性与最佳实践


实现MCP协议的高级特性和生产环境最佳实践。


```python

# mcp_server/advanced_features.py

import asyncio

from typing import Dict, Any, List

from contextlib import asynccontextmanager


class AdvancedMCPServer(FileSystemMCPServer):

    """高级MCP服务器特性"""

    

    def __init__(self, base_directory: str = "."):

        super().__init__(base_directory)

        self.session_storage = {}

        self.request_log = []

        

        # 注册高级工具

        self._register_advanced_tools()

    

    def _register_advanced_tools(self):

        """注册高级工具"""

        self.tools.update({

            "batch_operations": {

                "description": "批量文件操作",

                "inputSchema": {

                    "type": "object",

                    "properties": {

                        "operations": {

                            "type": "array",

                            "items": {

                                "type": "object",

                                "properties": {

                                    "type": {"type": "string", "enum": ["read", "write", "delete"]},

                                    "filepath": {"type": "string"},

                                    "content": {"type": "string"}

                                },

                                "required": ["type", "filepath"]

                            }

                        }

                    },

                    "required": ["operations"]

                }

            },

            "search_files": {

                "description": "搜索文件内容",

                "inputSchema": {

                    "type": "object",

                    "properties": {

                        "query": {"type": "string"},

                        "directory": {"type": "string"},

                        "file_pattern": {"type": "string"}

                    },

                    "required": ["query"]

                }

            }

        })

    

    async def _execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Any:

        """执行工具(包含高级特性)"""

        # 记录请求日志

        self._log_request(tool_name, arguments)

        

        if tool_name == "batch_operations":

            return await self._batch_operations(arguments["operations"])

        elif tool_name == "search_files":

            return await self._search_files(

                arguments["query"],

                arguments.get("directory", "."),

                arguments.get("file_pattern", "*")

            )

        else:

            return await super()._execute_tool(tool_name, arguments)

    

    async def _batch_operations(self, operations: List[Dict[str, Any]]) -> List[Any]:

        """批量操作"""

        results = []

        

        for op in operations:

            try:

                if op["type"] == "read":

                    result = await self._read_file(op["filepath"])

                elif op["type"] == "write":

                    result = await self._write_file(op["filepath"], op["content"])

                elif op["type"] == "delete":

                    result = await self._delete_file(op["filepath"])

                else:

                    result = f"未知操作类型: {op['type']}"

                

                results.append({"operation": op, "result": result, "success": True})

                

            except Exception as e:

                results.append({"operation": op, "error": str(e), "success": False})

        

        return results

    

    async def _search_files(self, query: str, directory: str, file_pattern: str) -> List[Dict[str, Any]]:

        """搜索文件内容"""

        import glob

        import re

        

        search_results = []

        absolute_dir = self._get_absolute_path(directory)

        pattern = os.path.join(absolute_dir, "**", file_pattern)

        

        for filepath in glob.glob(pattern, recursive=True):

            if os.path.isfile(filepath):

                try:

                    content = await self._read_file(os.path.relpath(filepath, self.base_directory))

                    

                    # 搜索匹配行

                    matches = []

                    for line_num, line in enumerate(content.split('\n'), 1):

                        if re.search(query, line, re.IGNORECASE):

                            matches.append({

                                "line": line_num,

                                "content": line.strip()

                            })

                    

                    if matches:

                        search_results.append({

                            "filepath": os.path.relpath(filepath, self.base_directory),

                            "matches": matches

                        })

                        

                except Exception:

                    continue  # 跳过无法读取的文件

        

        return search_results

    

    async def _delete_file(self, filepath: str) -> str:

        """删除文件"""

        absolute_path = self._get_absolute_path(filepath)

        

        if os.path.exists(absolute_path):

            os.remove(absolute_path)

            return f"文件已删除: {filepath}"

        else:

            raise FileNotFoundError(f"文件不存在: {filepath}")

    

    def _log_request(self, tool_name: str, arguments: Dict[str, Any]):

        """记录请求日志"""

        log_entry = {

            "timestamp": asyncio.get_event_loop().time(),

            "tool": tool_name,

            "arguments": arguments

        }

        self.request_log.append(log_entry)

        

        # 限制日志大小

        if len(self.request_log) > 1000:

            self.request_log = self.request_log[-1000:]

    

    def get_usage_statistics(self) -> Dict[str, Any]:

        """获取使用统计"""

        tool_usage = {}

        for log in self.request_log:

            tool = log["tool"]

            tool_usage[tool] = tool_usage.get(tool, 0) + 1

        

        return {

            "total_requests": len(self.request_log),

            "tool_usage": tool_usage,

            "server_uptime": asyncio.get_event_loop().time()

        }

```


MCP上下文协议通过标准化的工具调用和资源访问机制,为构建可扩展的AI应用提供了坚实基础。从基础协议实现到高级特性开发,MCP正在成为连接大语言模型与现实世界的重要桥梁。


请使用浏览器的分享功能分享到微信等