在现代企业会议场景中,从语音记录到结构化会议纪要的自动转换已成为提升工作效率的关键技术。本文将详细介绍如何基于Trae、Whisper、FFmpeg与Knowledge Graph MCP技术栈,构建智能语音会议纪要生成系统。
## 系统架构设计
该系统采用模块化设计,将语音处理、文本分析、知识提取和纪要生成有机结合,形成完整的处理流水线。
```python
# system_architecture.py
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from enum import Enum
import asyncio
class ProcessingStage(Enum):
    """Pipeline stages of meeting processing, in execution order."""

    AUDIO_PREPROCESSING = "audio_preprocessing"
    SPEECH_TO_TEXT = "speech_to_text"
    SPEAKER_DIARIZATION = "speaker_diarization"
    TEXT_ANALYSIS = "text_analysis"
    KNOWLEDGE_EXTRACTION = "knowledge_extraction"
    SUMMARY_GENERATION = "summary_generation"
@dataclass
class AudioMetadata:
    """Basic metadata describing one audio file."""

    duration: float      # length in seconds
    sample_rate: int     # samples per second (Hz)
    channels: int        # channel count (1 = mono)
    file_size: int       # size on disk, bytes
    format: str          # container/format name (shadows builtin; kept for API compat)
@dataclass
class SpeakerSegment:
    """One diarized slice of the transcript attributed to a single speaker."""

    speaker_id: str      # e.g. "speaker_0"
    start_time: float    # segment start, seconds
    end_time: float      # segment end, seconds
    text: str            # transcribed text for this slice
    confidence: float    # combined ASR/diarization confidence in [0, 1]
@dataclass
class MeetingSummary:
    """Structured meeting minutes produced by the pipeline.

    Fix: removed three injected spam-link lines that sat inside the class
    body and made the dataclass definition a syntax error.
    """

    title: str                              # meeting title
    participants: List[str]                 # attendee names / speaker ids
    key_decisions: List[str]                # decisions reached
    action_items: List[Dict[str, str]]      # {"task", "assignee", "deadline"}
    discussion_topics: List[Dict[str, Any]] # {"topic", "summary", ...}
    next_steps: List[str]                   # follow-up plan
    metadata: Dict[str, Any]                # processing metadata
class MeetingProcessor:
    """Core orchestrator: runs the full audio -> structured-minutes pipeline.

    Fix: removed three injected spam-link lines that sat in the middle of
    ``process_meeting`` and broke the method body.
    """

    def __init__(self):
        self.audio_processor = AudioProcessor()
        self.transcriber = WhisperTranscriber()
        self.speaker_identifier = SpeakerIdentifier()
        self.knowledge_graph = KnowledgeGraphMCP()
        self.summary_generator = SummaryGenerator()

    async def process_meeting(self, audio_file_path: str) -> MeetingSummary:
        """Process one meeting recording and return structured minutes.

        Stages: FFmpeg preprocessing -> Whisper transcription -> speaker
        diarization -> content analysis -> knowledge graph -> summary.
        """
        # Stage 1: audio preprocessing (normalization for ASR)
        processed_audio = await self.audio_processor.preprocess_audio(audio_file_path)
        # Stage 2: speech-to-text
        transcription = await self.transcriber.transcribe(processed_audio)
        # Stage 3: speaker diarization
        speaker_segments = await self.speaker_identifier.identify_speakers(
            processed_audio, transcription
        )
        # Stage 4: text analysis / knowledge extraction
        analyzed_content = await self.analyze_content(speaker_segments)
        # Stage 5: knowledge-graph construction
        knowledge_graph = await self.knowledge_graph.build_graph(analyzed_content)
        # Stage 6: minutes generation
        meeting_summary = await self.summary_generator.generate_summary(
            analyzed_content, knowledge_graph
        )
        return meeting_summary

    async def analyze_content(self, speaker_segments: List[SpeakerSegment]) -> Dict[str, Any]:
        """Aggregate per-segment analyses into a single result dict.

        NOTE(review): relies on ``self.analyze_segment`` and
        ``self.merge_analysis``, which are not defined anywhere in this
        file — confirm they exist elsewhere in the project.
        """
        analysis_results = {
            "topics": [],
            "decisions": [],
            "action_items": [],
            "questions": [],
            "participants_engagement": {}
        }
        for segment in speaker_segments:
            # Per-segment content analysis performed via Trae.
            segment_analysis = await self.analyze_segment(segment)
            analysis_results = self.merge_analysis(analysis_results, segment_analysis)
        return analysis_results
```
## 音频处理与语音识别模块
基于FFmpeg和Whisper的音频处理流水线,确保高质量的语音转文本效果。
```python
# audio_processing.py
import asyncio
import json
import subprocess
from pathlib import Path
from typing import Any, Dict, List, Tuple

import whisper
class AudioProcessor:
    """FFmpeg-based audio preprocessing for speech recognition.

    Fixes: removed injected spam-link lines inside ``_get_output_path``;
    ``get_audio_metadata`` now checks the ffprobe exit code and guards
    against a missing audio stream instead of failing with an opaque
    JSON/KeyError.
    """

    def __init__(self):
        # Input formats this pipeline is expected to accept.
        self.supported_formats = ['.wav', '.mp3', '.m4a', '.flac']

    async def preprocess_audio(self, audio_path: str) -> str:
        """Normalize *audio_path* for ASR and return the output file path.

        Converts to mono 16 kHz 16-bit PCM with band-pass filtering and
        compression to emphasize speech. Raises Exception on failure.
        """
        output_path = self._get_output_path(audio_path)
        ffmpeg_cmd = [
            'ffmpeg', '-i', audio_path,
            '-ac', '1',              # mono
            '-ar', '16000',          # 16 kHz — Whisper's native sample rate
            '-acodec', 'pcm_s16le',  # 16-bit little-endian PCM
            # Speech-band filtering + dynamic-range compression.
            '-af', 'highpass=f=300,lowpass=f=3000,compand=attacks=0:decays=0.3:points=-80/-80|-30/-10|-20/-20|-0/-0',
            '-y',                    # overwrite existing output
            output_path
        ]
        try:
            process = await asyncio.create_subprocess_exec(
                *ffmpeg_cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            await process.communicate()
            if process.returncode == 0:
                return output_path
            else:
                raise Exception(f"FFmpeg处理失败: {process.returncode}")
        except Exception as e:
            raise Exception(f"音频预处理错误: {str(e)}") from e

    def _get_output_path(self, input_path: str) -> str:
        """Return a sibling path ``processed_<stem>.wav`` for the input file."""
        path = Path(input_path)
        return str(path.parent / f"processed_{path.stem}.wav")

    # NOTE(review): AudioMetadata is defined in system_architecture.py, not in
    # this module — the annotation is quoted so the class can be defined; an
    # import is needed for the real project file. TODO confirm.
    async def get_audio_metadata(self, audio_path: str) -> "AudioMetadata":
        """Probe *audio_path* with ffprobe and return its AudioMetadata.

        Raises Exception when ffprobe fails or no audio stream is present.
        """
        ffprobe_cmd = [
            'ffprobe', '-v', 'quiet', '-print_format', 'json',
            '-show_format', '-show_streams', audio_path
        ]
        process = await asyncio.create_subprocess_exec(
            *ffprobe_cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, _ = await process.communicate()
        # Previously an ffprobe failure surfaced as a confusing JSONDecodeError
        # on empty output; fail explicitly instead.
        if process.returncode != 0:
            raise Exception(f"ffprobe失败: {process.returncode}")
        metadata = json.loads(stdout)
        audio_stream = next(
            (stream for stream in metadata['streams'] if stream['codec_type'] == 'audio'),
            None
        )
        if audio_stream is None:
            raise Exception(f"未找到音频流: {audio_path}")
        return AudioMetadata(
            duration=float(metadata['format']['duration']),
            sample_rate=int(audio_stream['sample_rate']),
            channels=int(audio_stream['channels']),
            file_size=int(metadata['format']['size']),
            format=metadata['format']['format_name']
        )
class WhisperTranscriber:
    """Speech-to-text built on OpenAI Whisper.

    Fix: removed injected spam-link lines inside
    ``transcribe_with_timestamps`` that broke the method body. (``Dict`` /
    ``Any`` used in signatures are added to this module's typing import.)
    """

    def __init__(self, model_size: str = "base"):
        # Model loading is slow and blocking; done once at construction.
        self.model = whisper.load_model(model_size)

    async def transcribe(self, audio_path: str) -> Dict[str, Any]:
        """Transcribe *audio_path* (Chinese) and return text/segments/language.

        Raises Exception when Whisper fails.
        """
        try:
            # NOTE(review): model.transcribe is synchronous and CPU-bound;
            # consider loop.run_in_executor so it doesn't block the event loop.
            result = self.model.transcribe(
                audio_path,
                language='zh',     # Chinese
                task='transcribe',
                fp16=False         # FP32 for CPU compatibility
            )
            return {
                "text": result["text"],
                "segments": result["segments"],
                "language": result["language"]
            }
        except Exception as e:
            raise Exception(f"语音识别失败: {str(e)}") from e

    async def transcribe_with_timestamps(self, audio_path: str) -> List[Dict[str, Any]]:
        """Transcribe and return one dict per segment with start/end times."""
        result = await self.transcribe(audio_path)
        return [
            {
                "start": segment["start"],
                "end": segment["end"],
                "text": segment["text"],
                # Whisper segments may omit confidence; default to 0.0.
                "confidence": segment.get("confidence", 0.0)
            }
            for segment in result["segments"]
        ]
```
## 说话人识别与语音分割
结合语音特征和上下文信息进行说话人识别和语音分割。
```python
# speaker_diarization.py
import numpy as np
from sklearn.cluster import KMeans
from typing import List, Dict
import asyncio
class SpeakerIdentifier:
    """Attributes transcript segments to speakers via feature clustering.

    Fixes: removed injected spam-link lines from the class body;
    ``align_with_transcription`` no longer raises IndexError when the
    cluster list is empty (the original computed index 0 and indexed the
    empty list anyway). Annotations using ``Any`` / ``SpeakerSegment`` are
    quoted because this module imports neither name.
    """

    def __init__(self):
        # Reserved for persistent per-speaker voice profiles.
        self.speaker_profiles = {}

    async def identify_speakers(self, audio_path: str,
                                transcription: "Dict[str, Any]") -> "List[SpeakerSegment]":
        """Diarize *audio_path* and align speakers with the transcription."""
        # Acoustic feature extraction.
        audio_features = await self.extract_audio_features(audio_path)
        # Unsupervised speaker clustering.
        speaker_clusters = await self.cluster_speakers(audio_features)
        # Map clusters onto transcript segments.
        return await self.align_with_transcription(speaker_clusters, transcription)

    async def extract_audio_features(self, audio_path: str) -> "List[Dict[str, Any]]":
        """Extract per-frame acoustic features.

        NOTE(review): placeholder — returns random values; a real
        implementation would use pyAudioAnalysis or similar.
        """
        features = []
        feature_batch = {
            "mfcc": np.random.random((13,)),      # MFCC vector
            "spectral_centroid": np.random.random(),
            "pitch": np.random.random(),
            "energy": np.random.random()
        }
        features.append(feature_batch)
        return features

    async def cluster_speakers(self, features: "List[Dict[str, Any]]") -> "List[Dict[str, Any]]":
        """K-means-cluster feature batches into at most five speakers."""
        if not features:
            return []
        # Flatten each feature batch into one vector.
        feature_vectors = []
        for feature in features:
            vector = np.concatenate([
                feature["mfcc"],
                [feature["spectral_centroid"]],
                [feature["pitch"]],
                [feature["energy"]]
            ])
            feature_vectors.append(vector)
        n_speakers = min(5, len(feature_vectors))  # cap at 5 speakers
        kmeans = KMeans(n_clusters=n_speakers, random_state=42)
        labels = kmeans.fit_predict(feature_vectors)
        clusters = []
        for i, label in enumerate(labels):
            clusters.append({
                "cluster_id": int(label),
                "features": feature_vectors[i],
                "confidence": 0.8  # simplified fixed confidence
            })
        return clusters

    async def align_with_transcription(self, speaker_clusters: "List[Dict[str, Any]]",
                                       transcription: "Dict[str, Any]") -> "List[SpeakerSegment]":
        """Assign a speaker to each transcript segment (round-robin placeholder)."""
        segments = []
        for i, segment in enumerate(transcription["segments"]):
            # Simplistic alignment — production code needs time-overlap matching.
            if speaker_clusters:
                cluster = speaker_clusters[i % len(speaker_clusters)]
                speaker_id = f"speaker_{cluster['cluster_id']}"
                confidence = min(segment.get("confidence", 0.5), cluster["confidence"])
            else:
                # No clusters available: fall back to a single unknown speaker
                # (original code indexed the empty list here -> IndexError).
                speaker_id = "speaker_unknown"
                confidence = segment.get("confidence", 0.5)
            segments.append(SpeakerSegment(
                speaker_id=speaker_id,
                start_time=segment["start"],
                end_time=segment["end"],
                text=segment["text"],
                confidence=confidence
            ))
        return segments
```
## Knowledge Graph MCP集成
基于MCP协议构建知识图谱,实现会议内容的深度理解和关系提取。
```python
# knowledge_graph_mcp.py
from mcp import MCPServer
import json
from typing import Dict, List, Any, Tuple
import networkx as nx
from datetime import datetime
class KnowledgeGraphMCP(MCPServer):
    """MCP server that builds and queries a meeting knowledge graph.

    Fixes: removed injected spam-link lines from
    ``extract_entities_from_meeting`` and ``_build_graph_structure`` that
    broke those method bodies; Person entities are now aggregated per
    speaker instead of emitted once per segment with ``segment_count``
    hard-coded to 1 (duplicates previously overwrote each other's node
    attributes and multiplied 'discussed' relations).
    """

    def __init__(self):
        super().__init__()
        # MultiDiGraph: parallel edges allowed, so one pair of nodes can be
        # linked by several relation types.
        self.graph = nx.MultiDiGraph()
        self.entity_cache = {}
        # MCP tool schema advertised to clients.
        self.tools = {
            "extract_entities": {
                "description": "从文本中提取实体",
                "parameters": {
                    "text": {"type": "string", "description": "输入文本"}
                }
            },
            "extract_relations": {
                "description": "提取实体关系",
                "parameters": {
                    "entities": {"type": "array", "description": "实体列表"},
                    "context": {"type": "string", "description": "上下文信息"}
                }
            },
            "query_graph": {
                "description": "查询知识图谱",
                "parameters": {
                    "query": {"type": "string", "description": "查询语句"},
                    "entity_types": {"type": "array", "description": "实体类型过滤"}
                }
            }
        }

    async def build_graph(self, analyzed_content: Dict[str, Any]) -> nx.MultiDiGraph:
        """Rebuild the knowledge graph from one meeting's analyzed content."""
        # Start fresh for every meeting.
        self.graph.clear()
        entities = await self.extract_entities_from_meeting(analyzed_content)
        relations = await self.extract_relations(entities, analyzed_content)
        await self._build_graph_structure(entities, relations)
        return self.graph

    async def extract_entities_from_meeting(self, content: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Extract Person / Topic / Decision entities from meeting content."""
        entities = []
        # Participants: one entity per speaker, aggregating duration and
        # segment count across all of that speaker's segments.
        persons: Dict[str, Dict[str, Any]] = {}
        for speaker_segment in content.get("speaker_segments", []):
            duration = speaker_segment.end_time - speaker_segment.start_time
            person = persons.get(speaker_segment.speaker_id)
            if person is None:
                persons[speaker_segment.speaker_id] = {
                    "type": "Person",
                    "name": speaker_segment.speaker_id,
                    "properties": {
                        "speech_duration": duration,
                        "segment_count": 1
                    }
                }
            else:
                person["properties"]["speech_duration"] += duration
                person["properties"]["segment_count"] += 1
        entities.extend(persons.values())
        # Topic entities.
        for topic in content.get("topics", []):
            entities.append({
                "type": "Topic",
                "name": topic["name"],
                "properties": {
                    "mention_count": topic.get("mention_count", 1),
                    "importance": topic.get("importance", 0.5)
                }
            })
        # Decision entities.
        for decision in content.get("decisions", []):
            entities.append({
                "type": "Decision",
                "name": decision["content"],
                "properties": {
                    "maker": decision.get("maker", "unknown"),
                    "timestamp": decision.get("timestamp", datetime.now().isoformat())
                }
            })
        return entities

    async def extract_relations(self, entities: List[Dict[str, Any]],
                                content: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Derive edges between extracted entities."""
        relations = []
        # Participant -> Topic "discussed" edges.
        # NOTE(review): links every speaker to every topic — a refinement
        # would check who actually mentioned each topic.
        for topic in content.get("topics", []):
            for speaker in [e for e in entities if e["type"] == "Person"]:
                relations.append({
                    "source": speaker["name"],
                    "target": topic["name"],
                    "type": "discussed",
                    "properties": {
                        "mention_count": 1,
                        "first_mention": datetime.now().isoformat()
                    }
                })
        # Maker -> Decision "made_decision" edges.
        for decision in [e for e in entities if e["type"] == "Decision"]:
            maker = decision["properties"].get("maker")
            if maker:
                relations.append({
                    "source": maker,
                    "target": decision["name"],
                    "type": "made_decision",
                    "properties": {
                        "timestamp": decision["properties"]["timestamp"]
                    }
                })
        return relations

    async def _build_graph_structure(self, entities: List[Dict[str, Any]],
                                     relations: List[Dict[str, Any]]):
        """Materialize entities as nodes and relations as edges."""
        for entity in entities:
            self.graph.add_node(
                entity["name"],
                type=entity["type"],
                **entity["properties"]
            )
        for relation in relations:
            # networkx auto-creates endpoint nodes that don't exist yet.
            self.graph.add_edge(
                relation["source"],
                relation["target"],
                relation_type=relation["type"],
                **relation["properties"]
            )

    async def handle_tool_call(self, tool_name: str, arguments: Dict[str, Any]) -> str:
        """Dispatch one MCP tool invocation to its handler."""
        if tool_name == "extract_entities":
            return await self._extract_entities_tool(arguments["text"])
        elif tool_name == "extract_relations":
            return await self._extract_relations_tool(
                arguments["entities"], arguments["context"]
            )
        elif tool_name == "query_graph":
            return await self._query_graph_tool(
                arguments["query"], arguments.get("entity_types", [])
            )
        return f"未知工具: {tool_name}"

    async def _extract_entities_tool(self, text: str) -> str:
        """Entity-extraction tool backed by Trae.

        NOTE(review): placeholder — the Trae call is not implemented and an
        empty entity list is returned.
        """
        extraction_prompt = f"""
        从以下会议文本中提取实体:
        {text}
        提取以下类型的实体:
        - 人物 (Person)
        - 组织 (Organization)
        - 项目 (Project)
        - 时间 (Time)
        - 地点 (Location)
        - 决策 (Decision)
        - 任务 (Task)
        以JSON格式返回提取结果。
        """
        entities = []  # simulated extraction result
        return json.dumps({"entities": entities})
```
## 智能纪要生成与Trae集成
基于知识图谱和会议内容,使用Trae生成结构化的会议纪要。
```python
# summary_generation.py
from typing import List, Dict, Any
import asyncio
import json
class SummaryGenerator:
    """Generates structured meeting minutes via Trae.

    Fixes: removed injected spam-link lines from inside the
    ``post_process_summary`` prompt string (they were being sent to the
    model); ``trae_client`` is now optional so ``MeetingProcessor`` can
    construct this with no arguments and have the client injected later
    (see ``MeetingMinutesSystem.initialize``); added minimal
    implementations for the four methods the original called but never
    defined (``format_summary`` and the three graph-analysis helpers),
    which previously raised AttributeError at runtime.
    """

    def __init__(self, trae_client=None):
        # May be None at construction; injected before use.
        self.trae_client = trae_client

    async def generate_summary(self, analyzed_content: Dict[str, Any],
                               knowledge_graph: Any) -> "MeetingSummary":
        """Produce a MeetingSummary from analyzed content plus the graph."""
        # Enrich the analysis with graph-derived structure.
        enhanced_content = await self.enhance_with_knowledge_graph(
            analyzed_content, knowledge_graph
        )
        # Have Trae draft the structured minutes.
        summary_data = await self.generate_structured_summary(enhanced_content)
        # Convert the raw dict into the MeetingSummary dataclass.
        return await self.format_summary(summary_data)

    async def enhance_with_knowledge_graph(self, content: Dict[str, Any],
                                           knowledge_graph: Any) -> Dict[str, Any]:
        """Augment *content* with relations mined from the knowledge graph."""
        enhanced_content = content.copy()
        enhanced_content["participant_relations"] = \
            await self.analyze_participant_relations(knowledge_graph)
        enhanced_content["decision_paths"] = \
            await self.identify_decision_paths(knowledge_graph)
        enhanced_content["topic_evolution"] = \
            await self.analyze_topic_evolution(knowledge_graph)
        return enhanced_content

    async def analyze_participant_relations(self, knowledge_graph: Any) -> List[Dict[str, Any]]:
        """Collect participant->topic 'discussed' edges.

        NOTE(review): the original called this without defining it; this is a
        minimal implementation over a networkx-like graph — confirm intent.
        """
        relations = []
        if hasattr(knowledge_graph, "edges"):
            for source, target, data in knowledge_graph.edges(data=True):
                if data.get("relation_type") == "discussed":
                    relations.append({"source": source, "target": target})
        return relations

    async def identify_decision_paths(self, knowledge_graph: Any) -> List[Dict[str, Any]]:
        """Collect 'made_decision' edges (who decided what).

        NOTE(review): minimal implementation; original never defined this.
        """
        paths = []
        if hasattr(knowledge_graph, "edges"):
            for source, target, data in knowledge_graph.edges(data=True):
                if data.get("relation_type") == "made_decision":
                    paths.append({"maker": source, "decision": target})
        return paths

    async def analyze_topic_evolution(self, knowledge_graph: Any) -> List[Dict[str, Any]]:
        """List Topic nodes with their stored attributes.

        NOTE(review): minimal implementation; original never defined this.
        """
        topics = []
        if hasattr(knowledge_graph, "nodes"):
            for node, data in knowledge_graph.nodes(data=True):
                if data.get("type") == "Topic":
                    topics.append({"topic": node, "properties": data})
        return topics

    async def generate_structured_summary(self, enhanced_content: Dict[str, Any]) -> Dict[str, Any]:
        """Ask Trae for a JSON meeting summary; fall back to post-processing."""
        summary_prompt = f"""
        基于以下会议分析结果生成结构化会议纪要:
        会议内容分析:{json.dumps(enhanced_content, ensure_ascii=False, indent=2)}
        请生成包含以下部分的会议纪要:
        1. 会议标题
        2. 参会人员列表
        3. 主要讨论话题
        4. 关键决策
        5. 行动项(包含负责人和截止时间)
        6. 下一步计划
        7. 其他重要信息
        以JSON格式返回结果。
        """
        summary_result = await self.trae_client.generate(summary_prompt)
        try:
            return json.loads(summary_result)
        except json.JSONDecodeError:
            # Model returned free text — ask for a structured re-write.
            return await self.post_process_summary(summary_result)

    async def post_process_summary(self, raw_summary: str) -> Dict[str, Any]:
        """Convert free-text minutes into the required JSON structure via Trae."""
        structuring_prompt = f"""
        将以下会议纪要文本转换为结构化JSON格式:
        {raw_summary}
        要求的结构:
        {{
            "title": "会议标题",
            "participants": ["参会人1", "参会人2"],
            "key_decisions": ["决策1", "决策2"],
            "action_items": [
                {{
                    "task": "任务描述",
                    "assignee": "负责人",
                    "deadline": "截止时间"
                }}
            ],
            "discussion_topics": [
                {{
                    "topic": "话题名称",
                    "summary": "讨论摘要"
                }}
            ],
            "next_steps": ["下一步1", "下一步2"]
        }}
        """
        structured_result = await self.trae_client.generate(structuring_prompt)
        return json.loads(structured_result)

    async def format_summary(self, summary_data: Dict[str, Any]) -> "MeetingSummary":
        """Build a MeetingSummary dataclass from the raw summary dict.

        NOTE(review): original called this without defining it; MeetingSummary
        is declared in system_architecture.py — confirm the import path.
        """
        return MeetingSummary(
            title=summary_data.get("title", ""),
            participants=summary_data.get("participants", []),
            key_decisions=summary_data.get("key_decisions", []),
            action_items=summary_data.get("action_items", []),
            discussion_topics=summary_data.get("discussion_topics", []),
            next_steps=summary_data.get("next_steps", []),
            metadata=summary_data.get("metadata", {})
        )
class TraeClient:
    """Thin async wrapper around the Trae text-generation HTTP API.

    Currently a placeholder: it builds the request but returns canned text.
    """

    def __init__(self, api_key: str, base_url: str = "https://api.trae.ai/v1"):
        self.api_key = api_key
        self.base_url = base_url

    async def generate(self, prompt: str, **kwargs) -> str:
        """Generate text for *prompt*; supports max_tokens / temperature kwargs."""
        request_headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        payload = {
            "prompt": prompt,
            "max_tokens": kwargs.get("max_tokens", 1000),
            "temperature": kwargs.get("temperature", 0.7)
        }
        # A real implementation would POST asynchronously with aiohttp:
        #   async with aiohttp.ClientSession() as session:
        #       async with session.post(f"{self.base_url}/generate",
        #                               json=payload, headers=request_headers) as resp:
        #           return (await resp.json())["choices"][0]["text"]
        # Simulated response for now.
        return "模拟生成的会议纪要内容"
```
## 完整系统集成
将各个模块集成为完整的语音会议纪要生成系统。
```python
# main_system.py
import asyncio
from typing import Dict, Any
import json
from datetime import datetime
class MeetingMinutesSystem:
    """Top-level facade wiring the whole meeting-minutes pipeline together."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.processor = MeetingProcessor()
        self.trae_client = TraeClient(config.get("trae_api_key"))

    async def initialize(self):
        """Inject the Trae client and start the Knowledge Graph MCP server."""
        self.processor.summary_generator.trae_client = self.trae_client
        await self.processor.knowledge_graph.start()
        print("会议纪要系统初始化完成")

    async def process_meeting_audio(self, audio_file_path: str,
                                    meeting_context: Dict[str, Any] = None) -> Dict[str, Any]:
        """Run the full pipeline on one recording.

        Returns {"success": True, "meeting_summary": ..., "processing_metadata": ...}
        or {"success": False, "error": ..., "processing_metadata": ...} on failure.
        """
        start_time = datetime.now()
        try:
            meeting_summary = await self.processor.process_meeting(audio_file_path)
        except Exception as exc:
            # Failure path: report the error with timing metadata.
            return {
                "success": False,
                "error": str(exc),
                "processing_metadata": {
                    "processing_time": (datetime.now() - start_time).total_seconds(),
                    "timestamp": datetime.now().isoformat()
                }
            }
        # Success path: attach full processing metadata.
        return {
            "success": True,
            "meeting_summary": meeting_summary,
            "processing_metadata": {
                "processing_time": (datetime.now() - start_time).total_seconds(),
                "audio_file": audio_file_path,
                "context_provided": meeting_context is not None,
                "timestamp": datetime.now().isoformat()
            }
        }

    async def shutdown(self):
        """Stop the Knowledge Graph MCP server."""
        await self.processor.knowledge_graph.stop()
# Usage example
async def main():
    """Example entry point: process one meeting recording end to end.

    Fix: removed three injected spam-link lines that sat inside the
    ``config`` dict literal and made it a syntax error.
    """
    config = {
        "trae_api_key": "your_trae_api_key",
        "whisper_model": "base",
        "enable_speaker_diarization": True,
        "max_speakers": 5
    }
    system = MeetingMinutesSystem(config)
    try:
        await system.initialize()
        # Process a sample meeting recording.
        audio_file = "meeting_recording.wav"
        meeting_context = {
            "meeting_topic": "季度项目规划",
            "expected_participants": ["张三", "李四", "王五"],
            "scheduled_duration": 60
        }
        result = await system.process_meeting_audio(audio_file, meeting_context)
        if result["success"]:
            summary = result["meeting_summary"]
            print(f"会议标题: {summary.title}")
            print(f"参会人员: {', '.join(summary.participants)}")
            print(f"关键决策: {summary.key_decisions}")
            print(f"行动项: {summary.action_items}")
        else:
            print(f"处理失败: {result['error']}")
    finally:
        # Always release the MCP server, even on failure.
        await system.shutdown()

if __name__ == "__main__":
    asyncio.run(main())
```
通过Trae、Whisper、FFmpeg与Knowledge Graph MCP技术的深度集成,我们构建了一个能够从原始语音自动生成结构化会议纪要的智能系统。该系统不仅实现了高精度的语音转文本,还通过知识图谱技术深入理解会议内容,生成具有实际价值的会议纪要,显著提升了会议效率和信息管理能力。