Dynamiq框架深度解析:构建可编排的AI智能体与RAG应用
2026/5/9 0:46:33 网站建设 项目流程

1. Dynamiq:一个面向AI应用开发者的编排框架深度解析

如果你正在构建基于大语言模型(LLM)的智能应用,无论是简单的聊天机器人,还是复杂的多智能体协作系统,你大概率会遇到一个共同的挑战:如何高效、优雅地组织和管理这些“智能体”之间的交互、数据流和状态?是写一堆胶水代码,还是自己造一个轮子?今天我想和你深入聊聊一个让我眼前一亮的开源框架——Dynamiq。它不是一个简单的SDK,而是一个专门为“智能体”和RAG应用设计的编排框架。简单来说,它帮你把LLM、工具、数据流和业务流程,像搭积木一样组合起来,让你能专注于业务逻辑本身,而不是底层的通信和调度。我花了些时间研究它的源码和设计理念,发现它在解决智能体应用“工程化”问题上,确实有不少独到之处。

2. 核心设计理念与架构拆解

2.1 为什么需要“编排框架”?

在深入Dynamiq之前,我们先明确一个概念:什么是编排?在传统的微服务架构里,编排指的是协调多个独立服务来完成一个业务流程,比如一个订单创建流程可能涉及库存服务、支付服务和物流服务。在AI应用里,这个“服务”变成了LLM调用、工具函数、数据检索、条件判断等。编排框架的核心价值,就是定义这些组件如何连接、数据如何流动、错误如何传递、状态如何管理。

没有编排框架时,我们通常的写法是线性的、命令式的。比如,先调用一个LLM生成计划,再根据计划调用搜索工具,最后把结果喂给另一个LLM总结。代码会很快变得冗长,且逻辑和流程耦合紧密,难以复用和测试。Dynamiq的核心理念是将这种“流程”声明式地定义为一个有向无环图,每个节点执行一个原子操作,节点之间通过清晰的输入输出依赖连接。这种设计带来了几个显著优势:可视化与可调试性(整个流程一目了然)、模块化与复用性(节点可以像乐高一样拼装)、并发与异步优化(框架可以自动识别可并行执行的节点)。

2.2 Dynamiq的核心抽象:节点、流与工作流

Dynamiq的架构围绕几个核心抽象构建,理解它们就理解了整个框架。

节点:这是最基本的执行单元。一个节点可以是一个LLM模型(如OpenAI的GPT)、一个工具(如代码解释器、搜索引擎)、一个数据转换器(如PDF解析、文本分割),甚至是一个自定义的Python函数。每个节点都有明确的输入和输出接口。Dynamiq内置了丰富的节点类型,覆盖了AI应用开发的常见需求。

:流是一个有向无环图,由多个节点和它们之间的依赖关系构成。它定义了数据从输入到最终输出的完整路径。在Dynamiq中,你可以通过.depends_on()方法显式声明节点间的依赖,也可以通过.inputs()方法将上游节点的输出映射到当前节点的输入。这种声明式的方式让复杂的多步骤流程变得清晰。

工作流:工作流是流的执行器。它负责接收初始输入,按照流定义的图结构调度各个节点的执行,处理节点间的数据传递,并最终返回结果。Workflow类是这个过程的封装,提供了run方法来触发整个流程。

这种分层设计非常清晰:你定义节点(做什么),用流组织节点(怎么做),最后用工作流来运行它(执行)。这种分离使得单元测试(测试单个节点)、集成测试(测试一个流)和系统部署都变得更加容易。

3. 从零开始:基础安装与核心概念上手

3.1 环境准备与安装

Dynamiq是一个Python框架,因此你需要一个Python环境。官方要求Python 3.10+,我建议使用3.10或更高版本以获得最佳兼容性。安装非常简单,直接使用pip即可:

pip install dynamiq

如果你想从源码安装或参与贡献,项目使用Poetry进行依赖管理。克隆仓库后运行poetry install即可搭建开发环境。

git clone https://github.com/dynamiq-ai/dynamiq.git cd dynamiq poetry install

安装完成后,一个重要的准备工作是配置你的API密钥。Dynamiq本身不提供模型服务,它需要连接外部的服务,比如OpenAI、Pinecone、E2B等。通常的做法是通过环境变量来管理这些密钥,这样既安全又方便。例如,在终端中设置:

export OPENAI_API_KEY='your-openai-api-key-here' export PINECONE_API_KEY='your-pinecone-api-key-here'

在你的代码中,Dynamiq的Connection对象会去读取这些环境变量,或者你也可以在初始化时直接传入。

注意:在实际生产部署中,切勿将API密钥硬编码在代码中。务必使用环境变量、密钥管理服务(如AWS Secrets Manager、HashiCorp Vault)或配置文件(并确保.gitignore排除了该配置文件)。

3.2 第一个示例:理解最简单的LLM调用

让我们从一个最简单的例子开始,看看Dynamiq是如何工作的。这个例子实现了一个文本翻译功能。

from dynamiq.nodes.llms.openai import OpenAI from dynamiq.connections import OpenAI as OpenAIConnection from dynamiq.prompts import Prompt, Message # 1. 定义提示词模板 prompt_template = """ Translate the following text into English: {{ text }} """ prompt = Prompt(messages=[Message(content=prompt_template, role="user")]) # 2. 创建LLM节点 llm = OpenAI( id="openai_translator", # 节点唯一标识符,用于在复杂流程中引用 connection=OpenAIConnection(api_key="OPENAI_API_KEY"), # 连接配置,这里传入了密钥 model="gpt-4o", # 指定使用的模型 temperature=0.3, # 控制输出的随机性,0.3比较确定 max_tokens=1000, # 限制生成的最大token数 prompt=prompt # 绑定我们定义的提示词 ) # 3. 运行节点 result = llm.run( input_data={ "text": "Hola Mundo!" # 这是传递给模板中 `{{ text }}` 变量的值 } ) # 4. 获取结果 print(result.output) # 输出: Hello World!

这个例子虽然简单,但包含了Dynamiq的几个关键概念:

  • 节点创建OpenAI类是一个节点。我们配置了它的参数(模型、温度等)。
  • 连接OpenAIConnection封装了与OpenAI API通信的细节,包括认证。
  • 提示词管理PromptMessage对象帮助结构化你的提示词,支持模板变量({{ text }})。
  • 运行与输出:调用节点的run方法,传入输入数据。输出是一个结构化的对象,通过.output属性获取LLM生成的内容。

你可能觉得这比直接调用openai.ChatCompletion.create多了几步。确实,对于单一调用,优势不明显。但请记住,这只是构建复杂流程的“原子”。当多个这样的原子需要以特定顺序和逻辑组合时,Dynamiq的价值才会真正体现。

4. 智能体构建:从单智能体到复杂编排

4.1 构建一个具备工具使用能力的ReAct智能体

智能体的核心能力之一是使用工具。Dynamiq让为智能体装备工具变得非常简单。下面我们构建一个能使用E2B代码解释器工具来解决数学问题的智能体。

import asyncio from dynamiq.nodes.llms.openai import OpenAI from dynamiq.connections import OpenAI as OpenAIConnection, E2B as E2BConnection from dynamiq.nodes.agents import Agent from dynamiq.nodes.tools.e2b_sandbox import E2BInterpreterTool # 1. 初始化工具节点 e2b_tool = E2BInterpreterTool( connection=E2BConnection(api_key="E2B_API_KEY") # 需要E2B的API密钥 ) # 2. 配置LLM(智能体的大脑) llm = OpenAI( id="openai_agent_brain", connection=OpenAIConnection(api_key="OPENAI_API_KEY"), model="gpt-4o", temperature=0.3, max_tokens=1000, ) # 3. 创建智能体,并赋予它工具 agent = Agent( name="math-problem-solver", llm=llm, # 指定使用的LLM tools=[e2b_tool], # 将工具列表传给智能体 role="Senior Data Scientist skilled in mathematical reasoning and Python coding.", # 定义智能体角色 max_loops=10, # 限制最大思考循环次数,防止无限循环 ) async def run_async_agent(): # 4. 异步运行智能体 result = await agent.run( input_data={ "input": "Add the first 10 natural numbers and tell me if the sum is a prime number.", } ) # 5. 解析并打印结果 print(result.output.get("content")) if __name__ == "__main__": asyncio.run(run_async_agent())

在这个例子中,智能体接收到问题后,会遵循ReAct(推理-行动)模式:先思考(Reasoning),决定需要调用代码解释器工具,然后行动(Act)调用工具执行Python代码(计算1到10的和为55,并判断是否为质数),最后观察(Observe)工具返回的结果,并给出最终答案。max_loops参数至关重要,它限制了智能体“思考-行动”循环的最大次数,是一个重要的安全护栏,防止智能体在无法解决问题时陷入死循环。

4.2 工作流编排:并行与串行执行

单个智能体能力有限,真实的业务场景往往需要多个智能体或处理节点协作。Dynamiq的WorkflowFlow就是为这种协作而生的。它允许你以图形化的思维来设计流程。

场景一:并行处理。假设我们有一个问题,既需要专业的解答,又需要诗意的改写。我们可以让两个智能体同时工作。

from dynamiq import Workflow from dynamiq.nodes.llms import OpenAI from dynamiq.connections import OpenAI as OpenAIConnection from dynamiq.nodes.agents import Agent # 共享的LLM配置 llm = OpenAI( connection=OpenAIConnection(api_key="OPENAI_API_KEY"), model="gpt-4o", temperature=0.1, # 对于确定性任务,温度可以设低一些 ) # 专家智能体 expert_agent = Agent( name="Expert Agent", llm=llm, role="A physics professor who explains concepts clearly and accurately.", id="expert", max_loops=3 ) # 诗人智能体 poet_agent = Agent( name="Poetic Rewriter Agent", llm=llm, role="A poet who can transform scientific explanations into beautiful verses without losing the core meaning.", id="poet", max_loops=3 ) # 创建工作流,并添加节点。默认情况下,没有依赖关系的节点会被并行执行。 wf = Workflow() wf.flow.add_nodes([expert_agent, poet_agent]) # 同时添加两个节点 # 运行工作流,两个智能体同时处理同一个输入 result = wf.run( input_data={"input": "How are sin(x) and cos(x) connected in electrodynamics?"}, ) # 分别获取两个智能体的输出 expert_answer = result.output[expert_agent.id].get("output").get('content') poetic_answer = result.output[poet_agent.id].get("output").get('content') print(f"Expert Answer:\n{expert_answer}\n") print(f"Poetic Version:\n{poetic_answer}\n")

场景二:串行处理(管道)。更常见的是,一个任务的输出是下一个任务的输入。比如,先让一个智能体做研究,再让另一个智能体根据研究结果写报告。

from dynamiq import Workflow from dynamiq.nodes.llms import OpenAI from dynamiq.connections import OpenAI as OpenAIConnection from dynamiq.nodes.agents import Agent from dynamiq.nodes.node import InputTransformer, NodeDependency llm = OpenAI( connection=OpenAIConnection(api_key="OPENAI_API_KEY"), model="gpt-4o", temperature=0.1, ) # 研究员智能体 researcher = Agent( name="Researcher", llm=llm, role="Find and summarize key information from the given topic.", id="researcher", max_loops=5 ) # 报告员智能体,它依赖于研究员 reporter = Agent( name="Reporter", llm=llm, role="Write a formal report based on the provided research summary.", id="reporter", depends=[NodeDependency(researcher)], # 关键:声明依赖关系 # 关键:定义输入转换器,将研究员的输出内容作为本节点的输入 input_transformer=InputTransformer( selector={"input": f"${[researcher.id]}.output.content"} ), max_loops=5 ) wf = Workflow() wf.flow.add_nodes([researcher, reporter]) # 由于声明了依赖,工作流会自动先执行researcher,将其输出处理后,再作为输入传递给reporter。 result = wf.run( input_data={"input": "Recent advancements in fusion energy."}, ) final_report = result.output[reporter.id].get("output").get('content') print(final_report)

这里有两个关键机制:

  1. depends:明确告诉Dynamiq,reporter节点必须在researcher节点执行完成后才能开始。
  2. input_transformer:定义了如何将上游节点的输出,映射到当前节点期望的输入格式。f"${[researcher.id]}.output.content"是一个模板字符串,意思是“取researcher节点输出对象中output.content字段的值”。这种声明式数据绑定极大地简化了节点间的数据传递。

4.3 高级编排:多智能体协作与图编排器

对于更复杂的、动态的流程,比如需要根据中间结果决定下一步走向,Dynamiq提供了更强大的GraphOrchestrator。它允许你定义状态和状态之间的转移条件,实现真正的业务流程编排。

让我们看一个邮件起草与反馈循环的例子:

from typing import Any from dynamiq.connections import OpenAI as OpenAIConnection from dynamiq.nodes.agents.orchestrators.graph import END, START, GraphOrchestrator from dynamiq.nodes.agents.orchestrators.graph_manager import GraphAgentManager from dynamiq.nodes.agents import Agent from dynamiq.nodes.llms import OpenAI llm = OpenAI( connection=OpenAIConnection(api_key="OPENAI_API_KEY"), model="gpt-4o", temperature=0.1, ) # 定义邮件撰写智能体 email_writer = Agent( name="email-writer-agent", llm=llm, role="Write personalized emails taking into account feedback.", ) # 定义一个自定义函数节点,用于收集用户反馈 def gather_feedback(context: dict[str, Any], **kwargs): """收集关于邮件草稿的反馈。""" # 从上下文中获取最新的邮件草稿 latest_draft = context.get('history', [{}])[-1].get('content', 'No draft') feedback = input( f"Email draft:\n{latest_draft}\n" f"Type 'SEND' to send, 'CANCEL' to exit, or provide feedback to refine: \n" ) reiterate = True result = f"Gathered feedback: {feedback}" feedback = feedback.strip().lower() if feedback == "send": print("####### Email was sent! #######") result = "Email was sent!" reiterate = False elif feedback == "cancel": print("####### Email was canceled! #######") result = "Email was canceled!" reiterate = False # 返回结果和是否迭代的标志 return {"result": result, "reiterate": reiterate} # 定义一个路由函数,根据反馈决定下一个状态 def router(context: dict[str, Any], **kwargs): """根据提供的反馈决定下一个状态。""" if context.get("reiterate", False): return "generate_sketch" # 需要重写,返回起草状态 return END # 结束流程 # 创建图编排器 orchestrator = GraphOrchestrator( name="Email Drafting Orchestrator", manager=GraphAgentManager(llm=llm), # 管理器,可以处理智能体调用 ) # 添加状态:每个状态绑定一个或多个任务(节点) orchestrator.add_state_by_tasks("generate_sketch", [email_writer]) # 状态1:生成草稿 orchestrator.add_state_by_tasks("gather_feedback", [gather_feedback]) # 状态2:收集反馈 # 定义状态转移边 orchestrator.add_edge(START, "generate_sketch") # 从开始到生成草稿 orchestrator.add_edge("generate_sketch", "gather_feedback") # 从草稿到收集反馈 # 条件边:根据`router`函数的返回值,决定从“收集反馈”状态是跳回“生成草稿”还是结束 orchestrator.add_conditional_edge("gather_feedback", ["generate_sketch", END], router) if __name__ == "__main__": print("Welcome to the email writer.") email_details = input("Provide email details: ") orchestrator.run(input_data={"input": f"Write an email: {email_details}"})

这个例子展示了一个交互式、带循环的流程:

  1. 流程从START开始,进入generate_sketch状态,执行email_writer智能体生成第一版邮件。
  2. 自动进入gather_feedback状态,执行gather_feedback函数,与用户交互。
  3. 根据用户输入(SEND, CANCEL, 或其他反馈),router函数决定下一步:
    • 如果是“其他反馈”,返回generate_sketch,流程跳回第1步,智能体根据反馈重写邮件。
    • 如果是SEND或CANCEL,返回END,流程结束。 这种基于图的编排方式非常适合实现复杂的、有分支和循环的业务逻辑,比如客服对话流、多轮审核流程等。

5. 构建生产级RAG应用:从文档索引到智能检索

检索增强生成是当前LLM应用的核心模式之一。Dynamiq为RAG提供了端到端的支持,将文档加载、解析、分块、向量化、存储、检索和生成等多个步骤封装成可编排的节点。

5.1 文档索引流程:构建你的知识库

首先,我们需要将原始文档(如PDF、Word)处理成向量并存入向量数据库。这是一个典型的预处理流水线。

from io import BytesIO from dynamiq import Workflow from dynamiq.connections import OpenAI as OpenAIConnection, Pinecone as PineconeConnection from dynamiq.nodes.converters import PyPDFConverter from dynamiq.nodes.splitters.document import DocumentSplitter from dynamiq.nodes.embedders import OpenAIDocumentEmbedder from dynamiq.nodes.writers import PineconeDocumentWriter # 1. 创建工作流 rag_wf = Workflow() # 2. PDF转换节点:将PDF文件按页转换为文档对象 converter = PyPDFConverter(document_creation_mode="one-doc-per-page") rag_wf.flow.add_nodes(converter) # 3. 文档分割节点:将长文档切成适合嵌入的小块 document_splitter = ( DocumentSplitter( split_by="sentence", # 按句子分割 split_length=10, # 每块包含10个句子 split_overlap=1, # 块之间重叠1个句子,保持上下文连贯 ) .inputs(documents=converter.outputs.documents) # 输入来自转换器 .depends_on(converter) # 依赖于转换器 ) rag_wf.flow.add_nodes(document_splitter) # 4. 文本嵌入节点:将文本块转换为向量 embedder = ( OpenAIDocumentEmbedder( connection=OpenAIConnection(api_key="OPENAI_API_KEY"), model="text-embedding-3-small", # 使用OpenAI的嵌入模型 ) .inputs(documents=document_splitter.outputs.documents) .depends_on(document_splitter) ) rag_wf.flow.add_nodes(embedder) # 5. 向量存储节点:将向量和元数据写入Pinecone vector_store = ( PineconeDocumentWriter( connection=PineconeConnection(api_key="PINECONE_API_KEY"), index_name="my_knowledge_base", # 你的Pinecone索引名称 dimension=1536, # 必须与嵌入模型维度匹配,text-embedding-3-small是1536维 ) .inputs(documents=embedder.outputs.documents) # 输入是已嵌入的文档 .depends_on(embedder) ) rag_wf.flow.add_nodes(vector_store) # 6. 准备输入数据 file_paths = ["financial_report_2023.pdf", "product_manual.pdf"] input_data = { "files": [ BytesIO(open(path, "rb").read()) for path in file_paths ], "metadata": [ # 为每个文件附加元数据,便于后续过滤检索 {"filename": path, "source": "internal_wiki", "year": 2023} for path in file_paths ], } # 7. 执行索引流程 rag_wf.run(input_data=input_data) print("Document indexing completed!")

这个流程清晰地展示了Dynamiq的管道式编程风格。每个节点职责单一,通过.inputs().depends_on()连接,形成了一个从原始文件到向量存储的完整数据流。如果你想增加新的处理步骤,比如文本清洗、关键词提取,只需要在中间插入相应的节点即可。

实操心得:分块策略的选择split_bysplit_length对RAG效果影响巨大。对于技术文档,按句子或段落分割效果较好。split_overlap设置为1或2可以避免答案恰好被切分到两个块边界的问题。最佳参数需要根据你的文档类型和查询特点进行实验。

5.2 检索与问答流程:从知识库获取答案

索引完成后,我们就可以构建检索问答链了。

from dynamiq import Workflow from dynamiq.connections import OpenAI as OpenAIConnection, Pinecone as PineconeConnection from dynamiq.nodes.embedders import OpenAITextEmbedder from dynamiq.nodes.retrievers import PineconeDocumentRetriever from dynamiq.nodes.llms import OpenAI from dynamiq.prompts import Message, Prompt # 1. 创建检索工作流 retrieval_wf = Workflow() # 2. 共享OpenAI连接(用于嵌入和生成) openai_connection = OpenAIConnection(api_key="OPENAI_API_KEY") # 3. 查询嵌入节点:将用户问题转换为向量 query_embedder = OpenAITextEmbedder( connection=openai_connection, model="text-embedding-3-small", ) retrieval_wf.flow.add_nodes(query_embedder) # 4. 向量检索节点:在Pinecone中查找相似文档 document_retriever = ( PineconeDocumentRetriever( connection=PineconeConnection(api_key="PINECONE_API_KEY"), index_name="my_knowledge_base", dimension=1536, top_k=5, # 返回最相似的5个文档块 # 可选:添加元数据过滤,例如只检索特定来源的文档 # filter={"source": "internal_wiki"} ) .inputs(embedding=query_embedder.outputs.embedding) # 输入是查询向量 .depends_on(query_embedder) ) retrieval_wf.flow.add_nodes(document_retriever) # 5. 定义提示词模板,用于将检索到的上下文和问题组合 prompt_template = """ You are a helpful assistant. Answer the question based ONLY on the provided context. If the context does not contain the answer, say "I cannot find the answer in the provided documents." Question: {{ query }} Context: {% for document in documents %} - {{ document.content }} {% endfor %} Answer: """ prompt = Prompt(messages=[Message(content=prompt_template, role="user")]) # 6. 答案生成节点:LLM根据上下文生成最终答案 answer_generator = ( OpenAI( connection=openai_connection, model="gpt-4o", prompt=prompt, temperature=0.1, # 对于事实性问答,温度宜低 ) .inputs( documents=document_retriever.outputs.documents, # 检索到的文档 query=query_embedder.outputs.query, # 原始问题 ) .depends_on([document_retriever, query_embedder]) ) retrieval_wf.flow.add_nodes(answer_generator) # 7. 运行问答流程 question = "What were the key financial highlights mentioned in the 2023 report?" result = retrieval_wf.run(input_data={"query": question}) # 8. 提取并打印答案 answer = result.output.get(answer_generator.id).get("output", {}).get("content") print(f"Q: {question}") print(f"A: {answer}")

这个流程就是RAG的经典模式:问句嵌入 -> 向量检索 -> 提示词构建 -> LLM生成。Dynamiq将其标准化、模块化了。你可以轻松地替换其中的组件,比如把Pinecone换成Weaviate或Chroma,把OpenAI Embedding换成BGE或Jina的模型,只需更换对应的节点即可,流程主体不变。

6. 状态管理与高级特性

6.1 为智能体添加记忆

对于聊天机器人等交互式应用,记忆(上下文管理)是必不可少的。Dynamiq提供了Memory模块来管理对话历史。

from dynamiq.connections import OpenAI as OpenAIConnection from dynamiq.memory import Memory from dynamiq.memory.backends.in_memory import InMemory from dynamiq.nodes.agents import Agent from dynamiq.nodes.llms import OpenAI AGENT_ROLE = "You are a helpful and knowledgeable assistant." llm = OpenAI( connection=OpenAIConnection(api_key="OPENAI_API_KEY"), model="gpt-4o", temperature=0.7, # 对话可以稍具创造性 ) # 创建内存后端。这里使用内存存储,生产环境可用Redis、PostgreSQL等持久化后端。 memory = Memory(backend=InMemory()) agent = Agent( name="Chatbot", llm=llm, role=AGENT_ROLE, id="chat_agent", memory=memory, # 为智能体装配记忆模块 ) def chat(): print("Chat started. Type 'exit' to end.") user_id = "user_123" # 用户ID,用于区分不同用户的记忆 session_id = "session_001" # 会话ID,用于区分同一用户的不同对话 while True: user_input = input("You: ") if user_input.lower() == 'exit': break # 运行智能体,传入用户ID和会话ID,记忆模块会自动关联历史 response = agent.run({ "input": user_input, "user_id": user_id, "session_id": session_id }) ai_response = response.output.get("content") print(f"AI: {ai_response}") if __name__ == "__main__": chat()

Memory模块会自动将当前对话的上下文(包括历史消息)添加到发给LLM的提示词中,从而实现多轮对话。后端是可插拔的,InMemory后端只保存在进程内存中,重启即丢失。对于生产环境,可以考虑使用RedisMemoryBackend或实现自定义的后端连接到数据库。

6.2 工具调用与多智能体调度

在复杂的多智能体系统中,一个智能体可以将任务“外包”给其他智能体,形成管理者-工作者模式。Dynamiq通过将智能体本身作为工具暴露给其他智能体来实现这一点。

from dynamiq import Workflow from dynamiq.connections import OpenAI as OpenAIConnection, ScaleSerp as ScaleSerpConnection from dynamiq.flows import Flow from dynamiq.nodes.agents import Agent from dynamiq.nodes.llms import OpenAI from dynamiq.nodes.tools.scale_serp import ScaleSerpTool from dynamiq.nodes.types import Behavior, InferenceMode llm = OpenAI( connection=OpenAIConnection(api_key="OPENAI_API_KEY"), model="gpt-4o", temperature=0.1, ) # 工具:网络搜索 search_tool = ScaleSerpTool(connection=ScaleSerpConnection(api_key="SCALESERP_API_KEY")) # 研究员智能体:负责搜索和整理信息 research_agent = Agent( name="Research Analyst", role="Find recent market news and provide referenced highlights.", llm=llm, tools=[search_tool], # 可以使用搜索工具 inference_mode=InferenceMode.XML, # 使用XML模式进行结构化推理 max_loops=6, behaviour_on_max_loops=Behavior.RETURN, # 达到最大循环后返回当前结果,而不是报错 ) # 写作智能体:负责润色报告 writer_agent = Agent( name="Brief Writer", role="Turn research highlights into a concise executive brief.", llm=llm, inference_mode=InferenceMode.XML, max_loops=4, behaviour_on_max_loops=Behavior.RETURN, ) # 经理智能体:协调下属智能体工作 manager_agent = Agent( name="Manager", role=( "You are a manager. Delegate research and writing tasks to your sub-agents.\n" "First, ask the research_agent to find information.\n" "Then, ask the writer_agent to write a brief based on the research results.\n" "Finally, compile the final answer." ), llm=llm, tools=[research_agent, writer_agent], # 关键:将其他智能体作为工具! inference_mode=InferenceMode.XML, parallel_tool_calls_enabled=True, # 允许并行调用工具,提高效率 max_loops=8, behaviour_on_max_loops=Behavior.RETURN, ) # 创建工作流,只需运行经理智能体即可 workflow = Workflow(flow=Flow(nodes=[manager_agent])) result = workflow.run( input_data={"input": "Summarize the latest developments in battery technology for investors."}, ) # 经理智能体的输出包含了最终整合的报告 print(result.output[manager_agent.id]["output"]["content"])

在这个架构中,manager_agent并不直接做研究或写作,而是将research_agentwriter_agent作为工具来调用。它根据任务决定调用哪个子智能体,并整合它们的结果。这种模式非常强大,可以构建出层次清晰、职责分明的多智能体系统。parallel_tool_calls_enabled=True是一个性能优化选项,如果任务间没有依赖,经理可以同时下达指令,让研究和写作智能体并行工作。

7. 实战经验、避坑指南与性能优化

经过一段时间的实际项目使用,我积累了一些关于Dynamiq的实战经验和需要注意的地方。

7.1 错误处理与调试

在复杂的流程中,错误可能发生在任何节点。Dynamiq会捕获节点运行时的异常,但如何优雅地处理和调试是关键。

1. 启用详细日志:在开发阶段,建议设置详细的日志记录,以查看每个节点的输入、输出和执行状态。

import logging logging.basicConfig(level=logging.DEBUG)

这会在控制台输出大量信息,帮助你追踪数据流和定位问题节点。

2. 结果检查workflow.run()返回的result对象包含每个节点的执行状态和输出。即使整体流程成功,也应检查关键节点的输出是否符合预期。

result = wf.run(input_data=...) for node_id, node_result in result.output.items(): if node_result.get(“status”) == “error”: print(f“Node {node_id} failed: {node_result.get(‘error’)}”) # 也可以检查中间数据 # print(f“Node {node_id} output: {node_result.get(‘output’)}”)

3. 使用断点与单元测试:由于每个节点都是相对独立的,你可以单独实例化并运行一个节点进行测试,这比调试整个工作流要简单得多。为关键节点编写单元测试是保证稳定性的好方法。

7.2 性能优化要点

1. 并发执行:Dynamiq会自动分析节点依赖图,对没有依赖关系的节点进行并发执行。在定义工作流时,要仔细规划节点间的依赖,将可以并行的任务拆分开,以充分利用多核CPU或异步IO的优势。例如,在RAG流程中,如果需要对多个独立的文档源进行索引,可以为每个源创建独立的“转换-分割-嵌入”子链,然后将它们的结果同时输入到同一个向量存储节点。

2. 连接复用:像OpenAIConnection这样的连接对象,如果多个节点使用相同的配置(如API密钥、Base URL),应该复用同一个实例,而不是为每个节点都创建一个。这有助于内部连接池的管理。

3. 缓存中间结果:对于计算成本高且输入不变的操作,例如文档嵌入,可以考虑引入缓存机制。Dynamiq本身不提供内置缓存,但你可以通过包装节点或使用外部缓存(如diskcacheredis)来实现。例如,可以创建一个自定义的缓存嵌入节点,先检查哈希后的文本是否在缓存中,有则直接返回,无则调用OpenAI并存储结果。

4. 批量处理:对于嵌入和LLM调用,尽可能采用批量处理。Dynamiq的一些节点(如OpenAIDocumentEmbedder)内部可能已经做了批量优化。在构建自己的自定义节点时,如果涉及网络请求,也应考虑批量处理以减少请求次数。

7.3 常见问题与解决方案

问题1:节点依赖循环导致死锁。

  • 现象:工作流无法启动或卡住。
  • 原因:在定义.depends_on()时,不小心创建了循环依赖(A依赖B,B又依赖A)。
  • 解决:在设计流程时,确保依赖关系是有向无环的。画一个简单的节点图有助于理清关系。Dynamiq在构建流时会进行检查,但自己提前规划更重要。

问题2:智能体陷入无限循环。

  • 现象:智能体不停地调用工具或思考,不输出最终答案。
  • 原因max_loops设置过高,或者提示词/角色设定未能引导智能体达成“最终答案”状态。
  • 解决:首先,合理设置max_loops(通常5-10次足够)。其次,在智能体的role描述中明确其目标和终止条件,例如“当你得到最终答案时,请以‘最终答案是:’开头输出”。

问题3:向量检索结果不相关。

  • 现象:RAG回答质量差,经常“胡言乱语”或回答“未找到”。
  • 原因:文档分块策略不当、嵌入模型不匹配、或检索top_k参数不合适。
  • 解决
    • 分块:尝试不同的split_by(字符、句子、段落)和split_length。技术文档可能适合按段落,代码可能适合按函数。
    • 嵌入模型:确保索引和检索时使用完全相同的嵌入模型。
    • 检索参数:调整top_k。太小可能遗漏关键信息,太大可能引入噪声。可以尝试使用“重排序”技术,先用较大的top_k召回,再用一个更小的模型对结果进行相关性重排。
    • 元数据过滤:在检索时使用filter参数,根据文档来源、类型、时间等进行筛选,提高精度。

问题4:提示词模板变量渲染错误。

  • 现象:运行时报错,提示变量未定义或模板语法错误。
  • 原因:在Prompt模板中使用了{{ variable }},但在运行节点时,input_data中没有提供对应的键值对。
  • 解决:仔细检查节点run方法传入的input_data字典,确保其包含了模板中所有需要的变量。使用调试器或打印语句查看input_data的内容。

7.4 生产部署考量

1. 配置管理:将所有配置(API密钥、模型名称、温度、最大token数等)外部化,使用配置文件(如YAML、JSON)或环境变量管理。避免在代码中硬编码。

2. 可观测性:在生产环境中,需要监控工作流的执行耗时、成功率、各节点性能以及API调用成本。考虑集成像Prometheus、OpenTelemetry这样的监控工具,或在关键节点添加日志记录和指标上报。

3. 版本控制:你的工作流定义(即Python代码)应该进行版本控制。同时,考虑对提示词模板、智能体角色描述等也进行独立版本管理,便于回滚和A/B测试。

4. 弹性与重试:网络调用和外部API服务可能失败。为可能失败的节点(特别是LLM调用和外部工具调用)实现重试机制。Dynamiq节点本身可以包装在具有重试逻辑的装饰器中,或者使用像tenacity这样的重试库。

5. 成本控制:LLM API调用是主要成本。在开发阶段,可以使用更便宜的模型(如gpt-3.5-turbo)进行流程验证。在生产环境,对于不同的任务选择合适的模型(例如,嵌入用text-embedding-3-small,简单分类用gpt-3.5-turbo,复杂推理用gpt-4o)。同时,监控token使用量,设置预算和告警。

Dynamiq作为一个快速发展的框架,其生态和最佳实践也在不断演进。将它引入项目时,建议从小而具体的流程开始,逐步验证其稳定性和效果,再扩展到更复杂的业务场景中。它的声明式、模块化设计,确实能显著提升AI应用开发的效率和可维护性。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询