基于FastMCP的智能RAG系统构建:从检索到自主决策的完整实现


在当今人工智能应用领域,检索增强生成(RAG)系统已成为连接大语言模型与专业知识库的重要桥梁。然而,传统RAG系统往往缺乏自主决策和动态调整能力。基于FastMCP构建的Agentic RAG系统通过引入智能体概念,让RAG具备了自主决策、工具使用和动态优化的能力。


## Agentic RAG架构设计


Agentic RAG与传统RAG的核心区别在于引入了自主决策层,系统能够根据查询内容自主选择检索策略、调整生成参数并执行后续操作。


```python

# agentic_rag/architecture.py

from typing import List, Dict, Any, Optional

from pydantic import BaseModel

from enum import Enum

import asyncio


class QueryType(Enum):

    """查询类型枚举"""

    FACTUAL_QUERY = "factual"

    ANALYTICAL_QUERY = "analytical"

    OPERATIONAL_QUERY = "operational"

    COMPLEX_REASONING = "complex"


class RetrievalStrategy(Enum):

    """检索策略枚举"""

    SEMANTIC_SEARCH = "semantic"

    KEYWORD_SEARCH = "keyword"

    HYBRID_SEARCH = "hybrid"

    MULTI_HOP = "multi_hop"


class AgenticRAGConfig(BaseModel):

    """Agentic RAG配置"""

    retrieval_strategy: RetrievalStrategy = RetrievalStrategy.HYBRID_SEARCH

    max_retrieval_docs: int = 5

    enable_self_reflection: bool = True

    max_iterations: int = 3

    confidence_threshold: float = 0.7

<"www.pinggu.gov.cn.felli.cn">

<"www.miyun.gov.cn.felli.cn">

<"www.yanqing.gov.cn.felli.cn">


class QueryAnalysis(BaseModel):

    """查询分析结果"""

    query_type: QueryType

    required_depth: str

    needs_multiple_sources: bool

    suggested_strategy: RetrievalStrategy

    confidence: float


class AgenticRAG:

    """Agentic RAG核心类"""

    

    def __init__(self, config: AgenticRAGConfig):

        self.config = config

        self.retrieval_agent = RetrievalAgent()

        self.generation_agent = GenerationAgent()

        self.reflection_agent = ReflectionAgent()

        self.tool_agent = ToolAgent()

        

    async def process_query(self, query: str, context: Dict[str, Any] = None) -> Dict[str, Any]:

        """处理用户查询的完整流程"""

        # 阶段1: 查询分析与策略制定

        query_analysis = await self.analyze_query(query, context)

        

        # 阶段2: 自主检索与信息收集

        retrieval_result = await self.retrieve_information(query, query_analysis)

        

        # 阶段3: 生成与验证

        generation_result = await self.generate_response(query, retrieval_result, query_analysis)

        

        # 阶段4: 反思与优化

        if self.config.enable_self_reflection:

            reflection_result = await self.reflect_on_response(

                query, generation_result, retrieval_result

            )

            if reflection_result.needs_improvement:

                generation_result = await self.improve_response(

                    generation_result, reflection_result

                )

        

        return {

            "query_analysis": query_analysis,

            "retrieval_result": retrieval_result,

            "generation_result": generation_result,

            "reflection_result": reflection_result if self.config.enable_self_reflection else None

        }

<"www.heping.gov.cn.felli.cn">

<"www.hedong.gov.cn.felli.cn">

<"www.hexi.gov.cn.felli.cn">

    

    async def analyze_query(self, query: str, context: Dict[str, Any]) -> QueryAnalysis:

        """分析查询意图和需求"""

        # 使用FastMCP进行查询分析

        analysis_prompt = f"""

        分析以下查询,确定最佳处理策略:

        

        查询:{query}

        上下文:{context}

        

        请分析:

        1. 查询类型(事实查询、分析查询、操作查询、复杂推理)

        2. 需要的信息深度

        3. 是否需要多源信息

        4. 建议的检索策略

        5. 分析置信度

        

        以JSON格式返回分析结果。

        """

        

        # 通过FastMCP调用分析模型

        analysis_result = await self.fastmcp_client.analyze(analysis_prompt)

        return QueryAnalysis(**analysis_result)

```


## FastMCP集成与工具定义


FastMCP为Agentic RAG提供了标准化的工具调用和模型交互接口。


```python

# fastmcp_integration/mcp_server.py

from mcp import MCPServer

from typing import Dict, Any, List

import json

<"www.nankai.gov.cn.felli.cn">

<"www.hebei.gov.cn.felli.cn">

<"www.hongqiao.gov.cn.felli.cn">

import asyncio


class RAGMCPServer(MCPServer):

    """RAG专用MCP服务器"""

    

    def __init__(self):

        super().__init__()

        self.tools = {

            "semantic_search": {

                "description": "语义相似度搜索",

                "parameters": {

                    "query": {"type": "string", "description": "搜索查询"},

                    "top_k": {"type": "number", "description": "返回结果数量"},

                    "filters": {"type": "object", "description": "过滤条件"}

                }

            },

            "multi_hop_retrieval": {

                "description": "多跳检索",

                "parameters": {

                    "initial_query": {"type": "string", "description": "初始查询"},

                    "max_hops": {"type": "number", "description": "最大跳数"}

                }

            },

            "knowledge_graph_query": {

                "description": "知识图谱查询",

                "parameters": {

                    "entities": {"type": "array", "description": "实体列表"},

                    "relationships": {"type": "array", "description": "关系类型"}

                }

            },

            "answer_validation": {

                "description": "答案验证",

                "parameters": {

                    "answer": {"type": "string", "description": "待验证答案"},

                    "source_documents": {"type": "array", "description": "源文档"}

                }

            }

        }

    

    async def handle_tool_call(self, tool_name: str, arguments: Dict[str, Any]) -> str:

        """处理工具调用"""

        if tool_name == "semantic_search":

            return await self.semantic_search(

                arguments["query"], 

                arguments.get("top_k", 5),

                arguments.get("filters", {})

            )

        elif tool_name == "multi_hop_retrieval":

            return await self.multi_hop_retrieval(

                arguments["initial_query"],

                arguments.get("max_hops", 3)

            )

<"www.dongli.gov.cn.felli.cn">

<"www.xiqing.gov.cn.felli.cn">

<"www.jinnan.gov.cn.felli.cn">

        elif tool_name == "knowledge_graph_query":

            return await self.knowledge_graph_query(

                arguments["entities"],

                arguments.get("relationships", [])

            )

        elif tool_name == "answer_validation":

            return await self.answer_validation(

                arguments["answer"],

                arguments["source_documents"]

            )

        

        return f"未知工具: {tool_name}"

    

    async def semantic_search(self, query: str, top_k: int, filters: Dict) -> str:

        """语义搜索实现"""

        # 这里集成向量数据库搜索

        vector_results = await self.vector_db.similarity_search(query, top_k, filters)

        

        # 应用重排序

        reranked_results = await self.rerank_results(vector_results, query)

        

        return json.dumps({

            "results": reranked_results,

            "query": query,

            "count": len(reranked_results)

        })

    

    async def multi_hop_retrieval(self, initial_query: str, max_hops: int) -> str:

        """多跳检索实现"""

        current_query = initial_query

        collected_documents = []

        

        for hop in range(max_hops):

            # 检索当前查询的相关文档

            results = await self.semantic_search(current_query, 3, {})

            results_data = json.loads(results)

            

            if not results_data["results"]:

                break

                

            collected_documents.extend(results_data["results"])

            

            # 判断是否需要继续检索

            should_continue = await self.decide_continuation(

                initial_query, collected_documents, hop, max_hops

            )

            

            if not should_continue:

                break

                

            # 生成下一跳查询

            current_query = await self.generate_next_query(

                initial_query, collected_documents

            )

        

        return json.dumps({

            "documents": collected_documents,

            "hops_performed": min(hop + 1, max_hops),

            "final_query": current_query

        })

<"www.beichen.gov.cn.felli.cn">

<"www.wuqing.gov.cn.felli.cn">

<"www.baodi.gov.cn.felli.cn">


# FastMCP客户端封装

class FastMCPRAGClient:

    """FastMCP RAG客户端"""

    

    def __init__(self, mcp_server_url: str):

        self.server_url = mcp_server_url

        self.session = None

    

    async def initialize(self):

        """初始化客户端"""

        self.session = await self.create_mcp_session(self.server_url)

    

    async def execute_retrieval_plan(self, query: str, analysis: QueryAnalysis) -> Dict[str, Any]:

        """执行检索计划"""

        retrieval_plan = self._create_retrieval_plan(query, analysis)

        results = {}

        

        for step in retrieval_plan.steps:

            tool_result = await self.session.call_tool(

                step.tool_name, step.parameters

            )

            results[step.step_name] = tool_result

            

            # 动态调整后续步骤

            if await self.should_adjust_plan(step, tool_result, retrieval_plan):

                retrieval_plan = self._adjust_retrieval_plan(retrieval_plan, step, tool_result)

        

        return results

    

    def _create_retrieval_plan(self, query: str, analysis: QueryAnalysis) -> 'RetrievalPlan':

        """创建检索计划"""

        if analysis.query_type == QueryType.FACTUAL_QUERY:

            return RetrievalPlan([

                RetrievalStep("semantic_search", {"query": query, "top_k": 3})

            ])

        elif analysis.query_type == QueryType.COMPLEX_REASONING:

            return RetrievalPlan([

                RetrievalStep("multi_hop_retrieval", {

                    "initial_query": query, 

                    "max_hops": 3

                }),

                RetrievalStep("knowledge_graph_query", {

                    "entities": ["extracted_from_previous_step"],

                    "relationships": ["related_to", "part_of"]

                })

            ])

        else:

            return RetrievalPlan([

                RetrievalStep("semantic_search", {"query": query, "top_k": 5}),

                RetrievalStep("keyword_search", {"query": query, "top_k": 3})

            ])

<"www.binhaixin.gov.cn.felli.cn">

<"www.ninghe.gov.cn.felli.cn">

<"www.jinghai.gov.cn.felli.cn">

```


## 智能检索代理实现


检索代理负责根据查询分析结果自主选择和执行检索策略。


```python

# agents/retrieval_agent.py

from typing import List, Dict, Any

import asyncio

from dataclasses import dataclass


@dataclass

class RetrievalStep:

    """检索步骤"""

    tool_name: str

    parameters: Dict[str, Any]

    step_name: str


@dataclass

class RetrievalPlan:

    """检索计划"""

    steps: List[RetrievalStep]

    current_step: int = 0

    

    def get_next_step(self) -> Optional[RetrievalStep]:

        """获取下一个步骤"""

        if self.current_step < len(self.steps):

            step = self.steps[self.current_step]

            self.current_step += 1

            return step

        return None


class RetrievalAgent:

    """检索代理"""

    

    def __init__(self, fastmcp_client: FastMCPRAGClient):

        self.client = fastmcp_client

        self.retrieval_history = []

    

    async def execute_adaptive_retrieval(self, query: str, analysis: QueryAnalysis) -> Dict[str, Any]:

        """执行自适应检索"""

        initial_plan = self._create_initial_plan(query, analysis)

        executed_steps = []

        collected_documents = []

        

        current_plan = initial_plan

        while step := current_plan.get_next_step():

            # 执行检索步骤

            step_result = await self.client.session.call_tool(

                step.tool_name, step.parameters

            )

            

            executed_steps.append({

                "step": step,

                "result": step_result

            })

            

            # 收集文档

            documents = self._extract_documents(step_result)

            collected_documents.extend(documents)

            

            # 评估检索质量

            quality_score = await self.evaluate_retrieval_quality(

                query, collected_documents, analysis

            )

            

            # 动态调整计划

            if quality_score < 0.6 and len(executed_steps) < 5:

                new_steps = await self.generate_additional_steps(

                    query, collected_documents, analysis

                )

                if new_steps:

<"www.jizhou.gov.cn.felli.cn">

<"www.xuanwei.gov.cn.felli.cn">

<"www.chengjiang.gov.cn.felli.cn">

                    current_plan.steps.extend(new_steps)

            

            # 如果质量足够高,提前结束

            if quality_score > 0.85:

                break

        

        return {

            "documents": collected_documents,

            "executed_steps": executed_steps,

            "final_quality_score": quality_score,

            "retrieval_strategy": analysis.suggested_strategy.value

        }

    

    async def evaluate_retrieval_quality(self, query: str, documents: List[Dict], 

                                       analysis: QueryAnalysis) -> float:

        """评估检索质量"""

        evaluation_prompt = f"""

        评估以下检索结果对于回答查询的质量:

        

        查询:{query}

        查询类型:{analysis.query_type.value}

        检索到的文档数量:{len(documents)}

        文档摘要:{[doc.get('content', '')[:200] + '...' for doc in documents[:3]]}

        

        请从以下维度评估:

        1. 相关性(0-1)

        2. 覆盖度(0-1) 

        3. 信息新鲜度(0-1)

        4. 权威性(0-1)

        

        返回综合质量分数(0-1)。

        """

        

        quality_score = await self.client.session.analyze(evaluation_prompt)

        return float(quality_score)

    

    async def generate_additional_steps(self, query: str, current_documents: List[Dict],

                                      analysis: QueryAnalysis) -> List[RetrievalStep]:

        """生成额外的检索步骤"""

        gap_analysis = await self.analyze_information_gaps(query, current_documents)

        

        additional_steps = []

        for gap in gap_analysis.gaps:

            if gap.gap_type == "missing_entities":

                additional_steps.append(

                    RetrievalStep("knowledge_graph_query", {

                        "entities": gap.missing_entities,

                        "relationships": ["related_to"]

                    }, f"fill_entity_gap_{len(additional_steps)}")

                )

            elif gap.gap_type == "temporal_gap":

                additional_steps.append(

                    RetrievalStep("semantic_search", {

                        "query": f"{query} 最新信息",

                        "top_k": 3,

                        "filters": {"time_range": "recent"}

                    }, f"fill_temporal_gap_{len(additional_steps)}")

                )

<"www.luoyang.gov.cn.felli.cn">

<"www.kaifeng.gov.cn.felli.cn">

<"www.anyang.gov.cn.felli.cn">

        

        return additional_steps

```


## 生成与反思代理


生成代理负责基于检索结果生成回答,反思代理则对生成结果进行质量评估和改进。


```python

# agents/generation_agent.py

class GenerationAgent:

    """生成代理"""

    

    def __init__(self, fastmcp_client: FastMCPRAGClient):

        self.client = fastmcp_client

    

    async def generate_context_aware_response(self, query: str, retrieval_result: Dict[str, Any],

                                            analysis: QueryAnalysis) -> Dict[str, Any]:

        """生成上下文感知的回答"""

        

        # 准备生成上下文

        context = self._prepare_generation_context(query, retrieval_result, analysis)

        

        # 根据查询类型选择生成策略

        if analysis.query_type == QueryType.FACTUAL_QUERY:

            response = await self._generate_factual_response(context)

        elif analysis.query_type == QueryType.ANALYTICAL_QUERY:

            response = await self._generate_analytical_response(context)

        elif analysis.query_type == QueryType.COMPLEX_REASONING:

            response = await self._generate_complex_reasoning_response(context)

        else:

            response = await self._generate_general_response(context)

        

        # 添加引用和来源

        response_with_citations = await self._add_citations(response, retrieval_result)

        

        return {

            "response": response_with_citations,

            "generation_strategy": analysis.query_type.value,

            "context_used": context.get("used_documents", []),

            "confidence_score": await self._calculate_confidence(response_with_citations, context)

        }

    

    async def _generate_complex_reasoning_response(self, context: Dict) -> str:

        """生成复杂推理回答"""

        reasoning_prompt = f"""

        基于以下信息进行复杂推理:

        

        问题:{context['query']}

        相关信息:{context['documents']}

        

        请按照以下步骤推理:

        1. 提取关键事实和关系

        2. 识别信息间的逻辑联系

        3. 进行多步推理

        4. 得出结论并验证

        

        输出格式:

        - 推理过程

        - 最终结论

        - 不确定性说明

        """

        

        return await self.client.session.generate(reasoning_prompt)


# agents/reflection_agent.py

class ReflectionAgent:

    """反思代理"""

    

    def __init__(self, fastmcp_client: FastMCPRAGClient):

        self.client = fastmcp_client

    

    async def reflect_on_response(self, query: str, generation_result: Dict[str, Any],

                                retrieval_result: Dict[str, Any]) -> Dict[str, Any]:

        """对生成结果进行反思"""

        

        reflection_criteria = {

            "accuracy": "答案是否基于提供的证据且准确",

            "completeness": "是否完整回答了查询的所有方面",

            "coherence": "回答是否逻辑连贯且易于理解",

            "citation_quality": "引用是否准确支持相关陈述"

        }

        

        scores = {}

        feedback = {}

        

        for criterion, description in reflection_criteria.items():

            score, comments = await self._evaluate_criterion(

                criterion, query, generation_result, retrieval_result

            )

            scores[criterion] = score

            feedback[criterion] = comments

        

        overall_score = sum(scores.values()) / len(scores)

        needs_improvement = overall_score < 0.7

        

        improvement_suggestions = []

        if needs_improvement:

            improvement_suggestions = await self._generate_improvement_suggestions(

                query, generation_result, retrieval_result, scores, feedback

            )

        

        return {

            "overall_score": overall_score,

            "criterion_scores": scores,

            "feedback": feedback,

            "needs_improvement": needs_improvement,

            "improvement_suggestions": improvement_suggestions

        }

    

    async def _evaluate_criterion(self, criterion: str, query: str,

                                generation_result: Dict, retrieval_result: Dict) -> tuple:

        """评估特定标准"""

<"www.xinxiang.gov.cn.felli.cn">

<"www.jiaozuo.gov.cn.felli.cn">

<"www.nanyang.gov.cn.felli.cn">

        evaluation_prompt = f"""

        评估以下回答的{criterion}:

        

        查询:{query}

        生成回答:{generation_result['response']}

        源文档:{retrieval_result['documents']}

        

        请给出0-1的分数和具体反馈。

        """

        

        result = await self.client.session.analyze(evaluation_prompt)

        score = result.get("score", 0.5)

        feedback = result.get("feedback", "")

        

        return score, feedback

```


## 完整系统集成与实战


将各个组件集成为完整的Agentic RAG系统。


```python

# system_integration.py

import asyncio

from typing import Dict, Any

from fastmcp_integration.mcp_server import RAGMCPServer, FastMCPRAGClient

from agentic_rag.architecture import AgenticRAG, AgenticRAGConfig

from agents.retrieval_agent import RetrievalAgent

from agents.generation_agent import GenerationAgent

from agents.reflection_agent import ReflectionAgent


class AgenticRAGSystem:

    """完整的Agentic RAG系统"""

    

    def __init__(self, config: Dict[str, Any]):

        self.config = AgenticRAGConfig(**config)

        self.mcp_server = None

        self.fastmcp_client = None

        self.rag_system = None

    

    async def initialize(self):

        """初始化系统"""

        # 启动MCP服务器

        self.mcp_server = RAGMCPServer()

        await self.mcp_server.start()

        

        # 初始化FastMCP客户端

        self.fastmcp_client = FastMCPRAGClient("http://localhost:8080")

        await self.fastmcp_client.initialize()

        

        # 初始化各个代理

        retrieval_agent = RetrievalAgent(self.fastmcp_client)

        generation_agent = GenerationAgent(self.fastmcp_client)

        reflection_agent = ReflectionAgent(self.fastmcp_client)

        

        # 创建Agentic RAG系统

        self.rag_system = AgenticRAG(self.config)

        self.rag_system.retrieval_agent = retrieval_agent

        self.rag_system.generation_agent = generation_agent

        self.rag_system.reflection_agent = reflection_agent

        self.rag_system.fastmcp_client = self.fastmcp_client

        

        print("Agentic RAG系统初始化完成")

    

    async def process_query(self, query: str, context: Dict[str, Any] = None) -> Dict[str, Any]:

        """处理查询"""

<"www.xuchang.gov.cn.felli.cn">

<"www.luohe.gov.cn.felli.cn">

<"www.wuhu.gov.cn.felli.cn">

        if not self.rag_system:

            raise RuntimeError("系统未初始化")

        

        return await self.rag_system.process_query(query, context)

    

    async def shutdown(self):

        """关闭系统"""

        if self.mcp_server:

            await self.mcp_server.stop()

        if self.fastmcp_client:

            await self.fastmcp_client.close()


# 使用示例

async def main():

    """主函数示例"""

    config = {

        "retrieval_strategy": "hybrid",

        "max_retrieval_docs": 5,

        "enable_self_reflection": True,

        "max_iterations": 3,

        "confidence_threshold": 0.7

    }

    

    system = AgenticRAGSystem(config)

    

    try:

        await system.initialize()

        

        # 测试查询

        test_queries = [

            "解释深度学习中的注意力机制及其在Transformer中的应用",

            "比较Python和JavaScript在Web开发中的优缺点",

            "如何构建一个能够自主学习和改进的AI系统?"

        ]

        

        for query in test_queries:

            print(f"\n处理查询: {query}")

            result = await system.process_query(query)

            

            print(f"查询分析: {result['query_analysis']}")

            print(f"检索文档数: {len(result['retrieval_result']['documents'])}")

            print(f"生成回答: {result['generation_result']['response'][:200]}...")

            print(f"反思分数: {result['reflection_result']['overall_score']}")

            

    finally:

        await system.shutdown()


if __name__ == "__main__":

    asyncio.run(main())

```


基于FastMCP构建的Agentic RAG系统通过引入智能体概念,使传统的检索增强生成系统具备了自主决策、动态调整和持续优化的能力。这种架构不仅提高了系统的准确性和可靠性,还为复杂查询处理提供了更加智能的解决方案。随着技术的不断发展,Agentic RAG将在知识密集型应用中发挥越来越重要的作用。


请使用浏览器的分享功能分享到微信等