在当今人工智能应用领域,检索增强生成(RAG)系统已成为连接大语言模型与专业知识库的重要桥梁。然而,传统RAG系统往往缺乏自主决策和动态调整能力。基于FastMCP构建的Agentic RAG系统通过引入智能体概念,让RAG具备了自主决策、工具使用和动态优化的能力。
## Agentic RAG架构设计
Agentic RAG与传统RAG的核心区别在于引入了自主决策层,系统能够根据查询内容自主选择检索策略、调整生成参数并执行后续操作。
```python
# agentic_rag/architecture.py
from typing import List, Dict, Any, Optional
from pydantic import BaseModel
from enum import Enum
import asyncio
class QueryType(Enum):
    """Kinds of user query the pipeline distinguishes between."""

    FACTUAL_QUERY = "factual"
    ANALYTICAL_QUERY = "analytical"
    OPERATIONAL_QUERY = "operational"
    COMPLEX_REASONING = "complex"
class RetrievalStrategy(Enum):
    """Retrieval strategies a query can be routed to."""

    SEMANTIC_SEARCH = "semantic"
    KEYWORD_SEARCH = "keyword"
    HYBRID_SEARCH = "hybrid"
    MULTI_HOP = "multi_hop"
class AgenticRAGConfig(BaseModel):
    """Settings for an Agentic RAG pipeline."""

    # Strategy used when nothing else is chosen; defaults to hybrid search.
    retrieval_strategy: RetrievalStrategy = RetrievalStrategy.HYBRID_SEARCH
    # NOTE(review): not read by any code shown next to this class;
    # presumably an upper bound on retrieved documents - confirm.
    max_retrieval_docs: int = 5
    # Read by AgenticRAG.process_query to decide whether the reflection
    # stage runs.
    enable_self_reflection: bool = True
    # NOTE(review): not read by any code shown next to this class - confirm
    # where the iteration limit is meant to apply.
    max_iterations: int = 3
    # NOTE(review): not read by any code shown next to this class - confirm
    # which score this threshold is compared against.
    confidence_threshold: float = 0.7
class QueryAnalysis(BaseModel):
    """Structured result of analysing a query.

    The fields correspond to the five items that the prompt built in
    ``AgenticRAG.analyze_query`` asks the model to return.
    """

    query_type: QueryType
    # Free-form text; the set of allowed values is not defined here.
    required_depth: str
    needs_multiple_sources: bool
    suggested_strategy: RetrievalStrategy
    # Presumably in the range 0-1 - TODO confirm against the analysis model.
    confidence: float
class AgenticRAG:
    """Coordinate the analyse -> retrieve -> generate -> reflect pipeline.

    Collaborators can be passed to the constructor or assigned to the
    attributes of the same name afterwards. A stage raises ``RuntimeError``
    when the collaborator it needs is still ``None``.
    """

    def __init__(
        self,
        config: "AgenticRAGConfig",
        retrieval_agent: Optional[Any] = None,
        generation_agent: Optional[Any] = None,
        reflection_agent: Optional[Any] = None,
        tool_agent: Optional[Any] = None,
        fastmcp_client: Optional[Any] = None,
    ):
        """Store the configuration and any collaborators supplied up front.

        Args:
            config: Pipeline settings; ``enable_self_reflection`` is read by
                ``process_query``.
            retrieval_agent: Object providing ``execute_adaptive_retrieval``.
            generation_agent: Object providing
                ``generate_context_aware_response``.
            reflection_agent: Object providing ``reflect_on_response``.
            tool_agent: Stored as-is; nothing in this class calls it.
            fastmcp_client: Client used for query analysis (see
                ``analyze_query``).
        """
        self.config = config
        self.retrieval_agent = retrieval_agent
        self.generation_agent = generation_agent
        self.reflection_agent = reflection_agent
        self.tool_agent = tool_agent
        self.fastmcp_client = fastmcp_client

    def _require(self, attribute: str) -> Any:
        """Return the named collaborator, or raise if it was never set."""
        collaborator = getattr(self, attribute, None)
        if collaborator is None:
            raise RuntimeError(f"AgenticRAG.{attribute} has not been set")
        return collaborator

    async def process_query(
        self, query: str, context: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Run the full pipeline for one user query.

        Args:
            query: The user's question.
            context: Optional extra context forwarded to query analysis.

        Returns:
            A dict with the keys ``query_analysis``, ``retrieval_result``,
            ``generation_result`` and ``reflection_result``; the last one is
            ``None`` when self-reflection is disabled in the config.
        """
        # Stage 1: analyse the query and settle on a strategy.
        query_analysis = await self.analyze_query(query, context)
        # Stage 2: retrieve supporting information.
        retrieval_result = await self.retrieve_information(query, query_analysis)
        # Stage 3: generate the answer.
        generation_result = await self.generate_response(
            query, retrieval_result, query_analysis
        )
        # Stage 4: optional reflection pass.
        reflection_result = None
        if self.config.enable_self_reflection:
            reflection_result = await self.reflect_on_response(
                query, generation_result, retrieval_result
            )
            # The reflection result is a dict, so it is indexed by key.
            if reflection_result["needs_improvement"]:
                # NOTE(review): improve_response is not defined anywhere in
                # the visible source; it must be supplied before this branch
                # can succeed.
                generation_result = await self.improve_response(
                    generation_result, reflection_result
                )
        return {
            "query_analysis": query_analysis,
            "retrieval_result": retrieval_result,
            "generation_result": generation_result,
            "reflection_result": reflection_result,
        }

    async def retrieve_information(
        self, query: str, query_analysis: "QueryAnalysis"
    ) -> Dict[str, Any]:
        """Delegate retrieval to ``retrieval_agent.execute_adaptive_retrieval``."""
        agent = self._require("retrieval_agent")
        return await agent.execute_adaptive_retrieval(query, query_analysis)

    async def generate_response(
        self,
        query: str,
        retrieval_result: Dict[str, Any],
        query_analysis: "QueryAnalysis",
    ) -> Dict[str, Any]:
        """Delegate to ``generation_agent.generate_context_aware_response``."""
        agent = self._require("generation_agent")
        return await agent.generate_context_aware_response(
            query, retrieval_result, query_analysis
        )

    async def reflect_on_response(
        self,
        query: str,
        generation_result: Dict[str, Any],
        retrieval_result: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Delegate to ``reflection_agent.reflect_on_response``."""
        agent = self._require("reflection_agent")
        return await agent.reflect_on_response(
            query, generation_result, retrieval_result
        )

    async def analyze_query(
        self, query: str, context: Optional[Dict[str, Any]]
    ) -> "QueryAnalysis":
        """Ask the analysis model to classify the query.

        Args:
            query: The user's question.
            context: Extra context interpolated into the prompt as-is.

        Returns:
            A ``QueryAnalysis`` built from the model's reply.

        Raises:
            RuntimeError: If ``fastmcp_client`` has not been set.
        """
        import json

        analysis_prompt = (
            "\n"
            "分析以下查询,确定最佳处理策略:\n"
            f"查询:{query}\n"
            f"上下文:{context}\n"
            "请分析:\n"
            "1. 查询类型(事实查询、分析查询、操作查询、复杂推理)\n"
            "2. 需要的信息深度\n"
            "3. 是否需要多源信息\n"
            "4. 建议的检索策略\n"
            "5. 分析置信度\n"
            "以JSON格式返回分析结果。\n"
        )
        client = self._require("fastmcp_client")
        # Use the client's own analyze() when it has one; otherwise go
        # through client.session.analyze(), as the agents do.
        analyze = getattr(client, "analyze", None) or client.session.analyze
        analysis_result = await analyze(analysis_prompt)
        # The prompt asks for JSON, so a raw string reply is decoded first.
        if isinstance(analysis_result, str):
            analysis_result = json.loads(analysis_result)
        return QueryAnalysis(**analysis_result)
```
## FastMCP集成与工具定义
FastMCP为Agentic RAG提供了标准化的工具调用和模型交互接口。
```python
# fastmcp_integration/mcp_server.py
from mcp import MCPServer
from typing import Dict, Any, List
import json
import asyncio
class RAGMCPServer(MCPServer):
    """MCP server exposing the retrieval tools used by the RAG agents."""

    def __init__(self):
        """Register the tool catalogue (name -> description and parameters)."""
        super().__init__()
        self.tools = {
            "semantic_search": {
                "description": "语义相似度搜索",
                "parameters": {
                    "query": {"type": "string", "description": "搜索查询"},
                    "top_k": {"type": "number", "description": "返回结果数量"},
                    "filters": {"type": "object", "description": "过滤条件"},
                },
            },
            "multi_hop_retrieval": {
                "description": "多跳检索",
                "parameters": {
                    "initial_query": {"type": "string", "description": "初始查询"},
                    "max_hops": {"type": "number", "description": "最大跳数"},
                },
            },
            "knowledge_graph_query": {
                "description": "知识图谱查询",
                "parameters": {
                    "entities": {"type": "array", "description": "实体列表"},
                    "relationships": {"type": "array", "description": "关系类型"},
                },
            },
            "answer_validation": {
                "description": "答案验证",
                "parameters": {
                    "answer": {"type": "string", "description": "待验证答案"},
                    "source_documents": {"type": "array", "description": "源文档"},
                },
            },
        }

    async def handle_tool_call(self, tool_name: str, arguments: Dict[str, Any]) -> str:
        """Dispatch a tool call to the matching coroutine.

        Args:
            tool_name: One of the keys registered in ``self.tools``.
            arguments: Tool arguments; optional ones fall back to defaults.

        Returns:
            The tool's result, or an "unknown tool" message when
            ``tool_name`` matches none of the handled tools.

        Raises:
            KeyError: If a required argument is missing from ``arguments``.
        """
        if tool_name == "semantic_search":
            return await self.semantic_search(
                arguments["query"],
                arguments.get("top_k", 5),
                arguments.get("filters", {}),
            )
        if tool_name == "multi_hop_retrieval":
            return await self.multi_hop_retrieval(
                arguments["initial_query"],
                arguments.get("max_hops", 3),
            )
        if tool_name == "knowledge_graph_query":
            # NOTE(review): knowledge_graph_query is not defined in the
            # visible part of this class.
            return await self.knowledge_graph_query(
                arguments["entities"],
                arguments.get("relationships", []),
            )
        if tool_name == "answer_validation":
            # NOTE(review): answer_validation is not defined in the visible
            # part of this class.
            return await self.answer_validation(
                arguments["answer"],
                arguments["source_documents"],
            )
        # NOTE(review): "keyword_search" is requested by the client's default
        # retrieval plan but is neither registered nor handled here, so it
        # ends up on this branch.
        return f"未知工具: {tool_name}"

    async def semantic_search(self, query: str, top_k: int, filters: Dict) -> str:
        """Run a vector search, rerank the hits and return them as JSON.

        Returns:
            A JSON string with the keys ``results``, ``query`` and ``count``.
        """
        # NOTE(review): self.vector_db and self.rerank_results are not set up
        # in the visible part of this class - confirm where they come from.
        vector_results = await self.vector_db.similarity_search(query, top_k, filters)
        reranked_results = await self.rerank_results(vector_results, query)
        return json.dumps({
            "results": reranked_results,
            "query": query,
            "count": len(reranked_results),
        })

    async def multi_hop_retrieval(self, initial_query: str, max_hops: int) -> str:
        """Retrieve iteratively, generating a follow-up query after each hop.

        Args:
            initial_query: Query used for the first hop.
            max_hops: Upper bound on the number of search hops; a value of
                zero or less performs no search.

        Returns:
            A JSON string with the keys ``documents``, ``hops_performed``
            and ``final_query``.
        """
        current_query = initial_query
        collected_documents = []
        # Counted explicitly so the value exists even when the loop body
        # never runs (max_hops <= 0).
        hops_performed = 0
        for hop in range(max_hops):
            results = await self.semantic_search(current_query, 3, {})
            hops_performed = hop + 1
            results_data = json.loads(results)
            if not results_data["results"]:
                break
            collected_documents.extend(results_data["results"])
            # NOTE(review): decide_continuation and generate_next_query are
            # not defined in the visible part of this class.
            should_continue = await self.decide_continuation(
                initial_query, collected_documents, hop, max_hops
            )
            if not should_continue:
                break
            current_query = await self.generate_next_query(
                initial_query, collected_documents
            )
        return json.dumps({
            "documents": collected_documents,
            "hops_performed": hops_performed,
            "final_query": current_query,
        })
# FastMCP client wrapper
class FastMCPRAGClient:
    """Client-side wrapper around an MCP session used by the RAG agents."""

    def __init__(self, mcp_server_url: str):
        """Remember the server URL; the session is opened by ``initialize``."""
        self.server_url = mcp_server_url
        self.session = None

    async def initialize(self):
        """Open the MCP session for ``self.server_url``."""
        # NOTE(review): create_mcp_session is not defined in the visible
        # part of this class.
        self.session = await self.create_mcp_session(self.server_url)

    async def close(self):
        """Drop the session, closing it first when it exposes ``close()``.

        Safe to call when no session was ever opened.
        """
        import inspect

        session, self.session = self.session, None
        if session is None:
            return
        # NOTE(review): the session API is not visible here, so close() is
        # only invoked when present and only awaited when it is awaitable.
        closer = getattr(session, "close", None)
        if closer is None:
            return
        outcome = closer()
        if inspect.isawaitable(outcome):
            await outcome

    async def execute_retrieval_plan(
        self, query: str, analysis: "QueryAnalysis"
    ) -> Dict[str, Any]:
        """Run every step of the plan built for this query.

        Args:
            query: The user's question.
            analysis: Query analysis that decides which plan is built.

        Returns:
            A dict mapping each step's ``step_name`` to its tool result.

        Raises:
            RuntimeError: If ``initialize`` has not been called.
        """
        if self.session is None:
            raise RuntimeError("FastMCP客户端未初始化")
        retrieval_plan = self._create_retrieval_plan(query, analysis)
        results = {}
        for step in retrieval_plan.steps:
            tool_result = await self.session.call_tool(
                step.tool_name, step.parameters
            )
            results[step.step_name] = tool_result
            # NOTE(review): rebinding retrieval_plan does not affect the list
            # this loop is already iterating, so steps of an adjusted plan
            # are never executed here. should_adjust_plan and
            # _adjust_retrieval_plan are not defined in the visible source,
            # so their intended contract needs confirming before this loop
            # can be reworked.
            if await self.should_adjust_plan(step, tool_result, retrieval_plan):
                retrieval_plan = self._adjust_retrieval_plan(
                    retrieval_plan, step, tool_result
                )
        return results

    def _create_retrieval_plan(
        self, query: str, analysis: "QueryAnalysis"
    ) -> "RetrievalPlan":
        """Build the initial retrieval plan for the analysed query type.

        Factual queries get a single semantic search, complex-reasoning
        queries a multi-hop retrieval followed by a knowledge-graph query,
        and every other type a semantic search plus a keyword search.
        """
        # Imported here because this module has no top-level import for
        # these names; function scope also avoids an import cycle with
        # agents.retrieval_agent.
        from agentic_rag.architecture import QueryType
        from agents.retrieval_agent import RetrievalPlan, RetrievalStep

        # Each step is given an explicit step_name (its tool name), which is
        # the key its result is stored under.
        if analysis.query_type == QueryType.FACTUAL_QUERY:
            return RetrievalPlan([
                RetrievalStep(
                    "semantic_search",
                    {"query": query, "top_k": 3},
                    "semantic_search",
                ),
            ])
        if analysis.query_type == QueryType.COMPLEX_REASONING:
            return RetrievalPlan([
                RetrievalStep(
                    "multi_hop_retrieval",
                    {"initial_query": query, "max_hops": 3},
                    "multi_hop_retrieval",
                ),
                # NOTE(review): the entity list is a literal placeholder;
                # nothing visible replaces it with entities taken from the
                # previous step's output.
                RetrievalStep(
                    "knowledge_graph_query",
                    {
                        "entities": ["extracted_from_previous_step"],
                        "relationships": ["related_to", "part_of"],
                    },
                    "knowledge_graph_query",
                ),
            ])
        return RetrievalPlan([
            RetrievalStep(
                "semantic_search",
                {"query": query, "top_k": 5},
                "semantic_search",
            ),
            # NOTE(review): RAGMCPServer.handle_tool_call has no
            # "keyword_search" branch - confirm the tool exists server-side.
            RetrievalStep(
                "keyword_search",
                {"query": query, "top_k": 3},
                "keyword_search",
            ),
        ])
```
## 智能检索代理实现
检索代理负责根据查询分析结果自主选择和执行检索策略。
```python
# agents/retrieval_agent.py
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
@dataclass
class RetrievalStep:
    """One tool invocation inside a retrieval plan.

    Attributes:
        tool_name: Name of the tool to call.
        parameters: Arguments passed to the tool.
        step_name: Label for the step; falls back to ``tool_name`` when
            omitted or empty, so two-argument construction works.
    """

    tool_name: str
    parameters: Dict[str, Any]
    step_name: str = ""

    def __post_init__(self) -> None:
        """Default the step label to the tool name."""
        if not self.step_name:
            self.step_name = self.tool_name
@dataclass
class RetrievalPlan:
    """Ordered list of retrieval steps plus a cursor into it.

    Attributes:
        steps: Steps in execution order; more may be appended while the plan
            is being consumed.
        current_step: Index of the next step to hand out.
    """

    steps: List["RetrievalStep"]
    current_step: int = 0

    def get_next_step(self) -> Optional["RetrievalStep"]:
        """Return the next pending step and advance, or ``None`` when done."""
        if self.current_step >= len(self.steps):
            return None
        step = self.steps[self.current_step]
        self.current_step += 1
        return step
class RetrievalAgent:
"""检索代理"""
def __init__(self, fastmcp_client: FastMCPRAGClient):
self.client = fastmcp_client
self.retrieval_history = []
async def execute_adaptive_retrieval(self, query: str, analysis: QueryAnalysis) -> Dict[str, Any]:
"""执行自适应检索"""
initial_plan = self._create_initial_plan(query, analysis)
executed_steps = []
collected_documents = []
current_plan = initial_plan
while step := current_plan.get_next_step():
# 执行检索步骤
step_result = await self.client.session.call_tool(
step.tool_name, step.parameters
)
executed_steps.append({
"step": step,
"result": step_result
})
# 收集文档
documents = self._extract_documents(step_result)
collected_documents.extend(documents)
# 评估检索质量
quality_score = await self.evaluate_retrieval_quality(
query, collected_documents, analysis
)
# 动态调整计划
if quality_score < 0.6 and len(executed_steps) < 5:
new_steps = await self.generate_additional_steps(
query, collected_documents, analysis
)
if new_steps:
<"www.jizhou.gov.cn.felli.cn">
<"www.xuanwei.gov.cn.felli.cn">
<"www.chengjiang.gov.cn.felli.cn">
current_plan.steps.extend(new_steps)
# 如果质量足够高,提前结束
if quality_score > 0.85:
break
return {
"documents": collected_documents,
"executed_steps": executed_steps,
"final_quality_score": quality_score,
"retrieval_strategy": analysis.suggested_strategy.value
}
async def evaluate_retrieval_quality(self, query: str, documents: List[Dict],
analysis: QueryAnalysis) -> float:
"""评估检索质量"""
evaluation_prompt = f"""
评估以下检索结果对于回答查询的质量:
查询:{query}
查询类型:{analysis.query_type.value}
检索到的文档数量:{len(documents)}
文档摘要:{[doc.get('content', '')[:200] + '...' for doc in documents[:3]]}
请从以下维度评估:
1. 相关性(0-1)
2. 覆盖度(0-1)
3. 信息新鲜度(0-1)
4. 权威性(0-1)
返回综合质量分数(0-1)。
"""
quality_score = await self.client.session.analyze(evaluation_prompt)
return float(quality_score)
async def generate_additional_steps(self, query: str, current_documents: List[Dict],
analysis: QueryAnalysis) -> List[RetrievalStep]:
"""生成额外的检索步骤"""
gap_analysis = await self.analyze_information_gaps(query, current_documents)
additional_steps = []
for gap in gap_analysis.gaps:
if gap.gap_type == "missing_entities":
additional_steps.append(
RetrievalStep("knowledge_graph_query", {
"entities": gap.missing_entities,
"relationships": ["related_to"]
}, f"fill_entity_gap_{len(additional_steps)}")
)
elif gap.gap_type == "temporal_gap":
additional_steps.append(
RetrievalStep("semantic_search", {
"query": f"{query} 最新信息",
"top_k": 3,
"filters": {"time_range": "recent"}
}, f"fill_temporal_gap_{len(additional_steps)}")
)
<"www.luoyang.gov.cn.felli.cn">
<"www.kaifeng.gov.cn.felli.cn">
<"www.anyang.gov.cn.felli.cn">
return additional_steps
```
## 生成与反思代理
生成代理负责基于检索结果生成回答,反思代理则对生成结果进行质量评估和改进。
```python
# agents/generation_agent.py
from typing import Any, Dict


class GenerationAgent:
    """Generate an answer from retrieved documents."""

    def __init__(self, fastmcp_client: "FastMCPRAGClient"):
        """Keep the client whose ``session`` performs generation."""
        self.client = fastmcp_client

    async def generate_context_aware_response(
        self,
        query: str,
        retrieval_result: Dict[str, Any],
        analysis: "QueryAnalysis",
    ) -> Dict[str, Any]:
        """Generate an answer using the strategy matching the query type.

        Args:
            query: The user's question.
            retrieval_result: Output of the retrieval stage.
            analysis: Query analysis; its ``query_type`` selects the
                generation strategy.

        Returns:
            A dict with the keys ``response``, ``generation_strategy``,
            ``context_used`` and ``confidence_score``.
        """
        # Imported here because this module has no top-level import of the
        # enum.
        from agentic_rag.architecture import QueryType

        # NOTE(review): _prepare_generation_context, the factual /
        # analytical / general generators, _add_citations and
        # _calculate_confidence are not defined in the visible part of this
        # class.
        context = self._prepare_generation_context(query, retrieval_result, analysis)

        # Types without a dedicated branch use the general generator.
        if analysis.query_type == QueryType.FACTUAL_QUERY:
            response = await self._generate_factual_response(context)
        elif analysis.query_type == QueryType.ANALYTICAL_QUERY:
            response = await self._generate_analytical_response(context)
        elif analysis.query_type == QueryType.COMPLEX_REASONING:
            response = await self._generate_complex_reasoning_response(context)
        else:
            response = await self._generate_general_response(context)

        response_with_citations = await self._add_citations(response, retrieval_result)
        confidence_score = await self._calculate_confidence(
            response_with_citations, context
        )
        return {
            "response": response_with_citations,
            "generation_strategy": analysis.query_type.value,
            "context_used": context.get("used_documents", []),
            "confidence_score": confidence_score,
        }

    async def _generate_complex_reasoning_response(self, context: Dict) -> str:
        """Prompt the model for a step-by-step reasoning answer.

        Args:
            context: Must contain the keys ``query`` and ``documents``.

        Returns:
            Whatever ``session.generate`` returns for the reasoning prompt.
        """
        reasoning_prompt = (
            "\n"
            "基于以下信息进行复杂推理:\n"
            f"问题:{context['query']}\n"
            f"相关信息:{context['documents']}\n"
            "请按照以下步骤推理:\n"
            "1. 提取关键事实和关系\n"
            "2. 识别信息间的逻辑联系\n"
            "3. 进行多步推理\n"
            "4. 得出结论并验证\n"
            "输出格式:\n"
            "- 推理过程\n"
            "- 最终结论\n"
            "- 不确定性说明\n"
        )
        return await self.client.session.generate(reasoning_prompt)
# agents/reflection_agent.py
from typing import Any, Dict, Tuple


class ReflectionAgent:
    """Score a generated answer against fixed criteria."""

    def __init__(
        self, fastmcp_client: "FastMCPRAGClient", improvement_threshold: float = 0.7
    ):
        """Keep the client and the score below which improvement is requested.

        Args:
            fastmcp_client: Client whose ``session.analyze`` scores answers.
            improvement_threshold: An overall score strictly below this
                value marks the answer as needing improvement.
        """
        self.client = fastmcp_client
        self.improvement_threshold = improvement_threshold

    async def reflect_on_response(
        self,
        query: str,
        generation_result: Dict[str, Any],
        retrieval_result: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Evaluate the answer on every criterion and average the scores.

        Returns:
            A dict with the keys ``overall_score``, ``criterion_scores``,
            ``feedback``, ``needs_improvement`` and
            ``improvement_suggestions`` (empty unless improvement is
            needed).
        """
        reflection_criteria = {
            "accuracy": "答案是否基于提供的证据且准确",
            "completeness": "是否完整回答了查询的所有方面",
            "coherence": "回答是否逻辑连贯且易于理解",
            "citation_quality": "引用是否准确支持相关陈述",
        }
        scores = {}
        feedback = {}
        for criterion, description in reflection_criteria.items():
            # The description is forwarded so the evaluator sees what the
            # criterion means, not only its English key.
            score, comments = await self._evaluate_criterion(
                criterion, query, generation_result, retrieval_result, description
            )
            scores[criterion] = score
            feedback[criterion] = comments

        overall_score = sum(scores.values()) / len(scores)
        needs_improvement = overall_score < self.improvement_threshold

        improvement_suggestions = []
        if needs_improvement:
            # NOTE(review): _generate_improvement_suggestions is not defined
            # in the visible part of this class.
            improvement_suggestions = await self._generate_improvement_suggestions(
                query, generation_result, retrieval_result, scores, feedback
            )
        return {
            "overall_score": overall_score,
            "criterion_scores": scores,
            "feedback": feedback,
            "needs_improvement": needs_improvement,
            "improvement_suggestions": improvement_suggestions,
        }

    async def _evaluate_criterion(
        self,
        criterion: str,
        query: str,
        generation_result: Dict,
        retrieval_result: Dict,
        description: str = "",
    ) -> Tuple[float, str]:
        """Score the answer on one criterion.

        Args:
            criterion: Criterion key placed in the prompt.
            query: The user's question.
            generation_result: Must contain the key ``response``.
            retrieval_result: Must contain the key ``documents``.
            description: Optional explanation of the criterion; when given
                it is added to the prompt as an extra line.

        Returns:
            ``(score, feedback)``; the score defaults to ``0.5`` and the
            feedback to an empty string when the reply lacks them.
        """
        criterion_line = f"评估标准:{description}\n" if description else ""
        evaluation_prompt = (
            "\n"
            f"评估以下回答的{criterion}:\n"
            f"{criterion_line}"
            f"查询:{query}\n"
            f"生成回答:{generation_result['response']}\n"
            f"源文档:{retrieval_result['documents']}\n"
            "请给出0-1的分数和具体反馈。\n"
        )
        result = await self.client.session.analyze(evaluation_prompt)
        # Coerced so a numeric string in the reply still averages correctly.
        score = float(result.get("score", 0.5))
        feedback = result.get("feedback", "")
        return score, feedback
```
## 完整系统集成与实战
将各个组件集成为完整的Agentic RAG系统。
```python
# system_integration.py
import asyncio
from typing import Dict, Any
from fastmcp_integration.mcp_server import RAGMCPServer, FastMCPRAGClient
from agentic_rag.architecture import AgenticRAG, AgenticRAGConfig
from agents.retrieval_agent import RetrievalAgent
from agents.generation_agent import GenerationAgent
from agents.reflection_agent import ReflectionAgent
class AgenticRAGSystem:
    """Assemble the MCP server, the client and the agents into one system."""

    def __init__(
        self, config: Dict[str, Any], server_url: str = "http://localhost:8080"
    ):
        """Validate the configuration; nothing is started until ``initialize``.

        Args:
            config: Keyword arguments for ``AgenticRAGConfig``.
            server_url: URL handed to the FastMCP client.
        """
        self.config = AgenticRAGConfig(**config)
        self.server_url = server_url
        self.mcp_server = None
        self.fastmcp_client = None
        self.rag_system = None

    async def initialize(self):
        """Start the server, connect the client and wire up the agents."""
        # Start the MCP server.
        self.mcp_server = RAGMCPServer()
        await self.mcp_server.start()

        # Connect the FastMCP client.
        self.fastmcp_client = FastMCPRAGClient(self.server_url)
        await self.fastmcp_client.initialize()

        # Build the agents around the shared client.
        retrieval_agent = RetrievalAgent(self.fastmcp_client)
        generation_agent = GenerationAgent(self.fastmcp_client)
        reflection_agent = ReflectionAgent(self.fastmcp_client)

        # Create the pipeline and inject its collaborators.
        self.rag_system = AgenticRAG(self.config)
        self.rag_system.retrieval_agent = retrieval_agent
        self.rag_system.generation_agent = generation_agent
        self.rag_system.reflection_agent = reflection_agent
        self.rag_system.fastmcp_client = self.fastmcp_client
        print("Agentic RAG系统初始化完成")

    async def process_query(
        self, query: str, context: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Run one query through the pipeline.

        Raises:
            RuntimeError: If ``initialize`` has not completed.
        """
        if not self.rag_system:
            raise RuntimeError("系统未初始化")
        return await self.rag_system.process_query(query, context)

    async def shutdown(self):
        """Stop the server and close the client.

        The client is closed even when stopping the server raises.
        """
        try:
            if self.mcp_server:
                await self.mcp_server.stop()
        finally:
            if self.fastmcp_client:
                # NOTE(review): relies on FastMCPRAGClient exposing an async
                # close() - confirm.
                await self.fastmcp_client.close()
# Usage example
async def main():
    """Initialise the system, run three sample queries and shut it down."""
    config = {
        "retrieval_strategy": "hybrid",
        "max_retrieval_docs": 5,
        "enable_self_reflection": True,
        "max_iterations": 3,
        "confidence_threshold": 0.7,
    }
    system = AgenticRAGSystem(config)
    try:
        await system.initialize()
        # Sample queries.
        test_queries = [
            "解释深度学习中的注意力机制及其在Transformer中的应用",
            "比较Python和JavaScript在Web开发中的优缺点",
            "如何构建一个能够自主学习和改进的AI系统?",
        ]
        for query in test_queries:
            print(f"\n处理查询: {query}")
            result = await system.process_query(query)
            print(f"查询分析: {result['query_analysis']}")
            print(f"检索文档数: {len(result['retrieval_result']['documents'])}")
            print(f"生成回答: {result['generation_result']['response'][:200]}...")
            # reflection_result is None when self-reflection is disabled.
            reflection = result["reflection_result"]
            if reflection is not None:
                print(f"反思分数: {reflection['overall_score']}")
    finally:
        # Always release the server and the client, even after a failure.
        await system.shutdown()
# Run the example only when this module is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
```
基于FastMCP构建的Agentic RAG系统通过引入智能体概念,使传统的检索增强生成系统具备了自主决策、动态调整和持续优化的能力。这种架构不仅提高了系统的准确性和可靠性,还为复杂查询处理提供了更加智能的解决方案。随着技术的不断发展,Agentic RAG将在知识密集型应用中发挥越来越重要的作用。