AI Agent 编排系统:从线性流程到事件驱动架构的演进
2026/6/6 23:16:29 网站建设 项目流程

AI Agent 编排系统:从线性流程到事件驱动架构的演进

一、从单 Agent 到多 Agent 协作:复杂任务的编排困境

在 AI 产品的早期阶段,我们使用简单的单 Agent 架构就能满足需求。用户提出一个问题,Agent 调用一次 LLM 就给出答案。但随着功能不断丰富,我们发现越来越多的任务需要多个步骤协同完成:数据分析需要先查询数据库,再进行计算,最后生成图表;客户支持需要先理解意图,检索知识库,再调用 CRM 系统。

最初我们用简单的 Python 脚本串联这些步骤,但很快就遇到了问题:流程硬编码在代码中,修改需要重新部署;错误处理逻辑分散在各个步骤中,难以维护;没有可视化的流程设计工具,产品经理无法参与流程设计。

这些问题并非我们独有,几乎所有从简单对话机器人向复杂工作流演进的团队都会遇到。技术如果不服务于真实的业务灵活性,那只是僵硬的代码堆砌。我们需要一套可编排、可监控、可调试的 Agent 协作框架。

二、事件驱动的 Agent 编排架构设计:解耦与弹性的核心

flowchart TB subgraph 接入层 A[用户请求] --> B[API Gateway] B --> C[事件总线] end subgraph 编排层 C --> D[Workflow Engine] D --> E[状态管理器] E --> F[任务调度器] end subgraph 执行层 F --> G[Agent 1] F --> H[Agent 2] F --> I[Agent 3] G --> J[工具调用] H --> J I --> J end subgraph 持久化层 K[状态存储] L[执行日志] M[监控指标] end E --> K G --> L H --> L I --> L G --> M H --> M I --> M

2.1 工作流定义与 DSL 设计

我们设计了一套简单的领域特定语言(DSL)来定义 Agent 工作流,让产品经理和开发者都能参与流程设计。工作流由任务、条件分支、循环等基本元素组成:

from dataclasses import dataclass, field from typing import List, Dict, Any, Optional, Callable from enum import Enum import json class TaskType(Enum): AGENT = "agent" TOOL = "tool" CONDITION = "condition" PARALLEL = "parallel" LOOP = "loop" @dataclass class Task: id: str type: TaskType name: str config: Dict[str, Any] = field(default_factory=dict) dependencies: List[str] = field(default_factory=list) on_error: Optional[str] = None # 错误处理策略 @dataclass class Workflow: id: str name: str tasks: List[Task] entry_point: str version: str = "1.0" class WorkflowDefinitionLoader: """工作流定义加载器""" @staticmethod def from_dict(data: Dict[str, Any]) -> Workflow: """从字典加载工作流定义""" tasks = [ Task( id=t["id"], type=TaskType(t["type"]), name=t["name"], config=t.get("config", {}), dependencies=t.get("dependencies", []), on_error=t.get("on_error") ) for t in data["tasks"] ] return Workflow( id=data["id"], name=data["name"], tasks=tasks, entry_point=data["entry_point"], version=data.get("version", "1.0") ) @staticmethod def from_json(json_str: str) -> Workflow: """从 JSON 字符串加载""" return WorkflowDefinitionLoader.from_dict(json.loads(json_str)) # 工作流定义示例 workflow_def = { "id": "data_analysis", "name": "数据分析工作流", "entry_point": "query_data", "tasks": [ { "id": "query_data", "type": "tool", "name": "查询数据库", "config": {"tool": "database_query"}, "dependencies": [] }, { "id": "analyze_data", "type": "agent", "name": "数据分析 Agent", "config": {"agent_id": "data_analyst"}, "dependencies": ["query_data"] }, { "id": "generate_chart", "type": "tool", "name": "生成图表", "config": {"tool": "chart_generator"}, "dependencies": ["analyze_data"] }, { "id": "generate_report", "type": "agent", "name": "报告生成 Agent", "config": {"agent_id": "report_writer"}, "dependencies": ["generate_chart"] } ] } workflow = WorkflowDefinitionLoader.from_dict(workflow_def)

2.2 状态机与持久化设计

工作流执行是一个状态机,每个任务都有明确的生命周期:待执行、执行中、成功、失败、已取消。我们使用 Redis 缓存当前状态,使用 PostgreSQL 持久化完整的执行历史:

import uuid from datetime import datetime from enum import Enum from dataclasses import dataclass, field from typing import Dict, Any, Optional class TaskStatus(Enum): PENDING = "pending" RUNNING = "running" SUCCESS = "success" FAILED = "failed" CANCELLED = "cancelled" @dataclass class TaskExecution: id: str task_id: str workflow_id: str execution_id: str status: TaskStatus input_data: Dict[str, Any] output_data: Optional[Dict[str, Any]] = None error: Optional[str] = None started_at: Optional[datetime] = None completed_at: Optional[datetime] = None @dataclass class WorkflowExecution: id: str workflow_id: str status: TaskStatus tasks: Dict[str, TaskExecution] = field(default_factory=dict) input_data: Dict[str, Any] = field(default_factory=dict) output_data: Optional[Dict[str, Any]] = None started_at: Optional[datetime] = None completed_at: Optional[datetime] = None class StateManager: """状态管理器,负责工作流和任务状态的持久化""" def __init__(self, redis_client, db_connection): self.redis = redis_client self.db = db_connection def create_workflow_execution(self, workflow_id: str, input_data: Dict[str, Any]) -> str: """创建工作流执行实例""" execution_id = str(uuid.uuid4()) execution = WorkflowExecution( id=execution_id, workflow_id=workflow_id, status=TaskStatus.PENDING, input_data=input_data, started_at=datetime.now() ) # 保存到 Redis 缓存 cache_key = f"workflow:{execution_id}" self.redis.setex(cache_key, 3600, json.dumps({ "id": execution.id, "workflow_id": execution.workflow_id, "status": execution.status.value, "started_at": execution.started_at.isoformat() })) return execution_id def update_task_status(self, execution_id: str, task_id: str, status: TaskStatus, output: Optional[Dict] = None, error: Optional[str] = None): """更新任务状态""" # 更新缓存 cache_key = f"workflow:{execution_id}:task:{task_id}" update_data = { "status": status.value, "updated_at": datetime.now().isoformat() } if output: update_data["output"] = output if error: update_data["error"] = error self.redis.hset(cache_key, mapping=update_data) def get_ready_tasks(self, execution_id: str, workflow: Workflow) -> List[Task]: """获取可以执行的任务(所有依赖已完成)""" ready_tasks = [] for task in workflow.tasks: # 检查任务是否已处理过 cache_key = f"workflow:{execution_id}:task:{task.id}" if self.redis.exists(cache_key): task_data = self.redis.hgetall(cache_key) status = TaskStatus(task_data.get(b"status", b"pending").decode()) if status != TaskStatus.PENDING: continue # 检查所有依赖是否已成功完成 all_deps_ready = True for dep_id in task.dependencies: dep_key = f"workflow:{execution_id}:task:{dep_id}" if not self.redis.exists(dep_key): all_deps_ready = False break dep_data = self.redis.hgetall(dep_key) dep_status = TaskStatus(dep_data.get(b"status", b"pending").decode()) if dep_status != TaskStatus.SUCCESS: all_deps_ready = False break if all_deps_ready: ready_tasks.append(task) return ready_tasks

2.3 事件总线与任务调度

我们使用事件总线来解耦工作流引擎和执行器:

import asyncio from typing import Dict, Any, Callable from abc import ABC, abstractmethod class Event(ABC): """事件基类""" @abstractmethod def to_dict(self) -> Dict[str, Any]: pass class TaskScheduledEvent(Event): """任务调度事件""" def __init__(self, execution_id: str, task: Task, input_data: Dict[str, Any]): self.execution_id = execution_id self.task = task self.input_data = input_data def to_dict(self) -> Dict[str, Any]: return { "type": "task_scheduled", "execution_id": self.execution_id, "task_id": self.task.id, "task_type": self.task.type.value, "input_data": self.input_data } class TaskCompletedEvent(Event): """任务完成事件""" def __init__(self, execution_id: str, task_id: str, output_data: Dict[str, Any]): self.execution_id = execution_id self.task_id = task_id self.output_data = output_data def to_dict(self) -> Dict[str, Any]: return { "type": "task_completed", "execution_id": self.execution_id, "task_id": self.task_id, "output_data": self.output_data } class EventBus: """事件总线""" def __init__(self): self.subscribers: Dict[str, List[Callable]] = {} def subscribe(self, event_type: str, handler: Callable): """订阅事件""" if event_type not in self.subscribers: self.subscribers[event_type] = [] self.subscribers[event_type].append(handler) async def publish(self, event: Event): """发布事件""" event_dict = event.to_dict() event_type = event_dict["type"] if event_type in self.subscribers: for handler in self.subscribers[event_type]: await handler(event_dict) class WorkflowEngine: """工作流引擎""" def __init__(self, event_bus: EventBus, state_manager: StateManager): self.event_bus = event_bus self.state_manager = state_manager # 订阅任务完成事件 self.event_bus.subscribe("task_completed", self.on_task_completed) async def start_workflow(self, workflow: Workflow, input_data: Dict[str, Any]) -> str: """启动工作流""" execution_id = self.state_manager.create_workflow_execution( workflow.id, input_data ) # 调度初始任务 await self.schedule_ready_tasks(execution_id, workflow, input_data) return execution_id async def schedule_ready_tasks(self, execution_id: str, workflow: Workflow, context: Dict[str, Any]): """调度可执行的任务""" ready_tasks = self.state_manager.get_ready_tasks(execution_id, workflow) for task in ready_tasks: # 更新任务状态为运行中 self.state_manager.update_task_status( execution_id, task.id, TaskStatus.RUNNING ) # 发布任务调度事件 event = TaskScheduledEvent(execution_id, task, context) await self.event_bus.publish(event) async def on_task_completed(self, event_dict: Dict[str, Any]): """处理任务完成事件""" execution_id = event_dict["execution_id"] task_id = event_dict["task_id"] output_data = event_dict["output_data"] # 更新任务状态 self.state_manager.update_task_status( execution_id, task_id, TaskStatus.SUCCESS, output=output_data ) # 继续调度后续任务 # 这里需要加载工作流定义并获取上下文 # (省略具体实现)

三、条件分支与并行执行:复杂流程的灵活控制

3.1 条件分支实现

条件分支让工作流可以根据中间结果选择不同的执行路径:

class ConditionEvaluator: """条件评估器""" @staticmethod def evaluate(condition: str, context: Dict[str, Any]) -> bool: """ 评估条件表达式 支持简单的比较运算: ==, !=, >, <, >=, <= 支持逻辑运算: and, or, not """ # 这里使用安全的表达式评估 # 生产环境中应该使用更严格的解析器 try: # 创建安全的全局命名空间 safe_globals = { "__builtins__": {}, "True": True, "False": False, "None": None } # 将上下文注入局部命名空间 safe_locals = context.copy() # 安全评估 result = eval(condition, safe_globals, safe_locals) return bool(result) except Exception as e: print(f"条件评估失败: {e}") return False # 使用示例 context = {"data_count": 1500, "has_error": False} condition = "data_count > 1000 and not has_error" result = ConditionEvaluator.evaluate(condition, context) print(f"条件 '{condition}' 评估结果: {result}")

3.2 并行任务执行

对于可以并行处理的任务,我们支持并行执行以提高效率:

class ParallelTaskExecutor: """并行任务执行器""" def __init__(self, max_concurrency: int = 5): self.max_concurrency = max_concurrency self.semaphore = asyncio.Semaphore(max_concurrency) async def execute_task(self, task: Task, input_data: Dict[str, Any]) -> Dict[str, Any]: """执行单个任务(带并发控制)""" async with self.semaphore: # 实际任务执行逻辑 print(f"执行任务: {task.name}") await asyncio.sleep(1) # 模拟执行时间 return {"result": f"{task.name} 执行完成"} async def execute_parallel(self, tasks: List[Task], input_data: Dict[str, Any]) -> Dict[str, Dict[str, Any]]: """并行执行多个任务""" coroutines = [ self.execute_task(task, input_data) for task in tasks ] results = await asyncio.gather(*coroutines, return_exceptions=True) # 整理结果 output = {} for task, result in zip(tasks, results): if isinstance(result, Exception): output[task.id] = {"error": str(result)} else: output[task.id] = result return output

四、可观测性与调试工具:生产环境的必备设施

4.1 完整的执行日志与追踪

每个工作流执行都会生成详细的日志,包括任务的输入输出、执行时间、错误信息等:

import logging from datetime import datetime class WorkflowLogger: """工作流执行日志记录器""" def __init__(self, log_directory: str = "./workflow_logs"): self.log_directory = log_directory import os os.makedirs(log_directory, exist_ok=True) # 配置日志 self.logger = logging.getLogger("workflow_engine") self.logger.setLevel(logging.DEBUG) # 文件处理器 file_handler = logging.FileHandler( f"{log_directory}/workflow_{datetime.now().strftime('%Y%m%d')}.log" ) file_handler.setLevel(logging.DEBUG) # 控制台处理器 console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) # 格式化 formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) file_handler.setFormatter(formatter) console_handler.setFormatter(formatter) self.logger.addHandler(file_handler) self.logger.addHandler(console_handler) def log_task_start(self, execution_id: str, task_id: str, task_name: str, input_data: Dict): """记录任务开始""" self.logger.info( f"[Execution: {execution_id}] Task '{task_name}' ({task_id}) started" ) self.logger.debug(f"Input data: {json.dumps(input_data, ensure_ascii=False)}") def log_task_complete(self, execution_id: str, task_id: str, task_name: str, output_data: Dict, duration_seconds: float): """记录任务完成""" self.logger.info( f"[Execution: {execution_id}] Task '{task_name}' ({task_id}) " f"completed in {duration_seconds:.2f}s" ) self.logger.debug(f"Output data: {json.dumps(output_data, ensure_ascii=False)}") def log_task_error(self, execution_id: str, task_id: str, task_name: str, error: Exception): """记录任务错误""" self.logger.error( f"[Execution: {execution_id}] Task '{task_name}' ({task_id}) " f"failed: {str(error)}" )

4.2 可视化的执行追踪界面

为了帮助开发者调试工作流,我们构建了可视化界面,可以:

  • 查看工作流的执行进度
  • 检查每个任务的输入输出
  • 回放执行过程
  • 分析性能瓶颈

五、架构权衡与适用边界:编排系统的选型考量

架构模式优点缺点适用场景
线性脚本简单直接难以维护、扩展性差简单、固定流程
事件驱动解耦、高弹性复杂度高复杂、动态流程
状态机清晰、可控状态定义繁琐有明确状态转换的场景

4.1 何时需要编排系统

构建编排系统有一定的复杂度,不是所有场景都需要。建议在以下情况下考虑:

  • 工作流步骤超过 5 个
  • 需要支持条件分支和并行执行
  • 工作流需要频繁变更,希望非技术人员能参与
  • 需要完整的执行历史和监控

如果只是简单的 2-3 个步骤,用普通的函数调用可能更合适。

4.2 与现有框架的对比

目前有一些开源的 Agent 编排框架,如 LangGraph、AutoGen 等。我们最终选择自己构建,主要是因为:

  • 需要与现有的基础设施深度集成
  • 对性能和延迟有较高要求
  • 需要支持特定的业务规则

但对于大多数团队来说,先评估现有框架是否满足需求是更高效的选择。

五、总结

AI Agent 编排系统是复杂业务流程的核心基础设施。从简单的线性脚本到事件驱动架构,我们经历了多次迭代,最终构建了一套灵活、可靠、可观测的编排系统。

这套系统的核心要素包括:声明式的工作流定义 DSL、基于状态机的执行引擎、事件驱动的任务调度、完整的可观测性工具。在选择技术方案时,要根据业务复杂度和团队能力做出合适的权衡,不要过度设计。

对于 AI 创业公司来说,灵活的编排系统是快速迭代产品的关键。它让我们可以将复杂的 AI 能力封装成可重用的模块,通过可视化的方式组合出新的业务流程,大大加速了产品创新的速度。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询