详解5个最需要了解的AI多智能体编排框架【5】:事件驱动的LlamaIndex Workflows

点击上方蓝色字体,关注我们

今天来到这个系列学习的最后一个框架,来自同样是大名鼎鼎的LlamaIndex框架的最新特性:Workflows。相对于AutoGen的天然面向多智能体系统设计,LlamaIndx Workflows则更接近LangGraph,是一个更通用且强大的开发框架。本文内容:

  • LlamaIndex Workflows快速入门

  • 实现多智能体Demo

  • LlamaIndex Workflows特点总结


前四篇回顾:


01

LlamaIndex Workflows快速入门

LlamaIndex本身是一个强大的LLM应用底层开发框架,相对于LangChain,其更侧重在构建数据密集型的复杂RAG类应用,且更容易上手。

如果你期望构建一个强大的RAG应用,我们会更推荐LlamaIndex而非LangChain,当然两者也可以结合使用。

LlamaIndex在2024年下半年推出了一项重要特性,即LlamaIndex Workflows,显然这是一个与LangGraph针锋相对,用来构建与编排复杂AI工作流应用的重要更新,也是本文的主角。

【基本思想】

LlamaIndex Workflows是一个类似LangGraph的复杂AI工作流的编排与开发框架,包括高级的RAG流,以及多步骤的AI智能体与多智能体系统。其最大的特点是:

它采用事件驱动的设计模式来编排工作流。即每个任务步骤只需关心自身的输入事件与输出事件(Event),当一个步骤的输出事件与另一个步骤的输入事件匹配,这两个步骤就被自然的连接。因此你无需像LangGraph一样定义连接节点间的边(Edge)。

【核心概念】

  • Workflow(工作流)类似LangGraph的Graph。代表一个复杂的RAG或Agent工作流,实例化后可以直接运行(run)。


  • Step(步骤)类似LangGraph的Node。代表一个AI工作流的任务步骤,接收输入事件,并返回输出事件。


  • Event(事件)任务步骤的输入与输出对象,步骤间交换信息的载体。事件类型可以自定义,还有StartEvent与StopEvent两个系统事件类型。


  • Context(上下文)类似LangGraph的State。用来保存在工作流运行期间,多个任务步骤之间共享的上下文状态信息。


下面仍然以实现这个多智能体系统的Demo为例,来演示Workflows的使用:


02

实现多智能体Demo

在LlamaIndex中构建一个Agent,实际上有两种途径:

  • 直接借助Agent组件快速构建简单Agent

  • 借助Workflows构建复杂工作流Agent


现在我们需要构建一个多Agent系统,采用如下方案:

  • 使用ReActAgent构建两个简单的Agent

  • 使用Workflows来编排多Agent的协作


步骤如下:

1. 构建两个ReAct工作Agent

使用ReActAgent抽象创建两个Agent,这里模拟了两个简单工具(搜索天气与邮件发送)供Agent使用。

......
#模拟搜索
def search_weather(query: str) -> str:
    """用于搜索天气情况"""
    # Perform search logic here
    search_results = f"明天天气晴转多云,最高温度30度,最低温度23度。天气炎热,注意防晒哦。"
    return search_results

tool_search = FunctionTool.from_defaults(fn=search_weather)
agent_search = ReActAgent.from_tools([tool_search], llm=llm, verbose=True)

#模拟发送邮件
def send_email(subject: str, recipient: str, message: str) -> None:
    """用于发送电子邮件"""
    # Send email logic here
    print(f"邮件已发送至 {recipient},主题为 {subject},内容为 {message}")

tool_send_mail = FunctionTool.from_defaults(fn=send_email)
agent_send_mail = ReActAgent.from_tools([tool_send_mail], llm=llm, verbose=True)

2. 管理Agent提示词

Demo中的supervisor负责管理与分配任务,首先设计好提示词:

class TransferToAgent(BaseModel):
    """用来把特定的任务分配给特定的Agent"""

    agent_name: str
    agent_task: str

class TaskRecord(BaseModel):
    """用来记录每一次任务记录,包括执行的agent,输入与输出""

    agent: str
    input: str
    result: str

agent_context_str = """
agent_search: 仅用于网络搜索天气情况。
agent_send_mail: 仅用于发送电子邮件。
"""


DEFAULT_ORCHESTRATOR_PROMPT = """
你是一个任务编排专家。
你的工作是:
1. 根据用户的输入问题与任务执行历史决定下一步是否需要AI助手
2. 根据任务历史决定下一步AI助手的任务内容与输入信息
3. 如果用户任务已经完成,则结束任务
4. 如果无需借助AI助手,则自行处理任务并输出
以下是任务描述:
{task_description}
以下是你可以选择的AI助手:
{agent_context_str}
以下是任务历史:
{task_history_str}
请决定下一步动作。
"""

3. 编排workflow

现在来到核心环节,使用LlamaIndex Workflows来编排工作流程,这个工作流程设计图如下(蓝色为step,红色为event):

根据这里的workflow设计,流程中需要的事件定义如下

#触发supervisor
class PrepEvent(Event):
    pass

#触发搜索agent
class SearchEvent(Event):
    input: str
    pass

#触发邮件agent
class SendMailEvent(Event):
    input: str
    pass

现在来实现完整的Workflow,包括所有的Step及其执行逻辑:

class MultiAgentWorkflow(Workflow):

    def __init__(
        self,
        orchestrator_prompt: str | None = None,
        **kwargs: Any,
    )
-> None:

        super().__init__(**kwargs)
        self.orchestrator_prompt = orchestrator_prompt or DEFAULT_ORCHESTRATOR_PROMPT
        self.agent_search = agent_search
        self.agent_send_mail = agent_send_mail
        
    @step
    async def new_user_msg(self, ctx: Context, ev: StartEvent) -> PrepEvent:       
        await ctx.set("task_description", ev.input)
        return PrepEvent()

    @step
    async def orchestrator(
        self, ctx: Context, ev: PrepEvent
    )
 -> StopEvent | SearchEvent | SendMailEvent:

        
        print(Fore.GREEN + '\033[1m选择下一个任务Agent...\033[0m' + Fore.RESET)

        task_description=await ctx.get("task_description")
        task_history = await ctx.get("task_history", default=[])
        task_history_str = "\n".join([f"AI助手:{task_record.agent},输入:{task_record.input},输出:{task_record.result}" for task_record in task_history])

        try:
            system_prompt = self.orchestrator_prompt.format(
                task_description=task_description,
                agent_context_str=agent_context_str,
                task_history_str=task_history_str
            )

            llm_input = [ChatMessage(role="system", content=system_prompt)]
            tools = [get_function_tool(TransferToAgent)]

            response = await llm.achat_with_tools(tools, chat_history=llm_input)
            tool_calls = llm.get_tool_calls_from_response(
                response, error_on_no_tool_call=False
            )

            if len(tool_calls) == 0:
                print(Fore.GREEN + '\033[1m任务结束。\033[0m' + Fore.RESET)
                print(response.message.content)
                return StopEvent()
            
            tool_call = tool_calls[0]
            agent_name = tool_call.tool_kwargs["agent_name"]
            agent_task = tool_call.tool_kwargs["agent_task"]
    
            if agent_name == 'agent_search':
                return SearchEvent(input=agent_task)
            elif agent_name == 'agent_send_mail':
                return SendMailEvent(input=agent_task)
            else:
                return StopEvent()
            
        except Exception as e:
            print(e)
            return StopEvent()
        
    @step
    async def call_agent_search(
        self, ctx: Context, ev: SearchEvent
    )
 -> PrepEvent:

        
        print(Fore.GREEN + '\033[1m调用Search Agent...\033[0m' + Fore.RESET)

        response = self.agent_search.chat(ev.input)
        task_history = await ctx.get("task_history", default=[])
        task_history.append(TaskRecord(agent="agent_search", input=ev.input, result=response.response))
        await ctx.set("task_history", task_history)
        return PrepEvent()

    @step
    async def call_agent_send_mail(
        self, ctx: Context, ev: SendMailEvent
    )
 -> PrepEvent:

        
        print(Fore.GREEN + '\033[1m调用Email Agent...\033[0m' + Fore.RESET)

        response = self.agent_send_mail.chat(ev.input)
        task_history = await ctx.get("task_history", default=[])
        task_history.append(TaskRecord(agent="agent_send_mail", input=ev.input, result=response.response))
        await ctx.set("task_history", task_history)
        return PrepEvent()

基本过程解释如下:

1). 接收新输入(new_user_msg),并把任务放到Context。

2). 对任务做调度(orchestrator),将提示词、任务信息、可用的Agent信息、以及任务历史作为输入,由LLM来判断下一步分配的Agent及其任务。

这里为了能够获得结构化输出,使用了一个技巧:构建一个“转交”函数(使用get_function_tool)给LLM,利用LLM的函数调用能力来输出“转交”时需要的参数(agent_name,agent_task)。

3).两个具体的工作Agent对事件进行响应(call_agent_xx),并把任务执行信息放到task_history,以帮助下一次任务推理。

4). 循环推理,直至任务完成(orchestrator返回StopEvent)。

这里的例子中对单个工作Agent做了简化处理。但实际应用中,工作Agent可以是使用多个工具完成更复杂任务的Agent。

这里代码还可以进一步优化,比如可以把不同工作Agent的调用做成动态化而非硬编码,甚至可以动态加入新的工作Agent“成员”等,留给读者们自行扩展。

4. 执行Workflow

现在可以来运行这个workflow,并输入不同的任务信息。代码如下:

workflow = MultiAgentWorkflow(timeout=None)

async def main():
    res = await workflow.run(
            input="查询明天北京的天气情况,发送给ycp@test.com",
        )

if __name__ == "__main__":
    
    import asyncio
    asyncio.run(main())

看下这个任务的输出(注意观察绿色部分):

这个过程符合期望:管理Agent决策并选择工作Agent,工作Agent借助ReAct思维范式推理并完成具体任务;整个过程多次迭代直到任务结束。

如果把这里的input换成“创作一首中国春节的祝福小诗”,则输出如下,可以看到管理Agent会自行输出,这取决于之前设计的提示词。

至此,这个Demo的Workflow就开发完成。


03

LlamaIndex Workflows特点总结

如果从前面的四个多智能体框架(Swam、LangGraph、CrewAI、AutoGen)中选择一个最类似Workflows的,无疑是LangGraph。因此其也具备类似LangGraph的优缺点。两者的共同点是:

  • 通用LLM开发底层框架,并非仅面向多智能体

  • 都提供了基础Agent与复杂工作流Agent的不同构建组件

  • 都有强大的组件库与第三方兼容性

  • 灵活可定制,几乎适应任何应用场景

  • 学习曲线较陡峭(与CrewAI/AutoGen相比)

  • 都有配合的商业在线服务,如LlamaCloud,LlamaDeploy,Langsmith等


区别在于:

  • LlamaIndex相对更容易上手

  • LlamaIndex的基础框架对RAG管道更友好

  • 一个基于事件驱动工作流,一个采用Graph定义工作流


LlamaIndex是我们比较推荐的一个框架,其具备了强大的功能与可定制性,同时相比LangChain复杂性有所降低。因此如果你具备一定的应用开发能力,同时希望给开发的智能体保留较大的灵活性与扩展定制能力,可以考虑使用LlamaIndex Workflows,目前这个框架仍然在不断的完善之中。

福利时间


为了帮助LLM开发人员更好的、更系统性的学习RAG应用,特别是企业级的RAG应用场景下,当前主流的优化方法与技术实现,我们编写了《基于大模型的RAG应用开发与优化 — 构建企业级LLM应用》这本500页的指南,与大家一起来深入到LLM应用开发的全新世界。

更多细节,点击如下链接了解

此处购买享50%折扣

END




交流请识别以下名片




请使用浏览器的分享功能分享到微信等