大模型应用落地:基于 Agent 拓扑设计模式的 LangChain 工程实践
Agent 拓扑设计模式:让大模型应用不再"一团乱麻"
前言
做大模型 Agent 应用最痛苦的是什么?代码耦合严重、流程混乱、调试困难。一个功能改了,好几个地方都要改。
后来本文用 Agent 拓扑设计模式重构了整个架构,思路是完全不同的——每个 Agent 只做一件事,自由编排。今天来聊聊。
一、 底层原理
1.1 什么是 Agent 拓扑设计模式
Agent 拓扑设计模式就是把一个复杂任务拆成多个独立的 Agent,通过拓扑结构编排执行流程:
graph TD A["用户输入"] --> B["理解 Agent"] B --> C["规划 Agent"] C --> D["工具 Agent"] C --> E["推理 Agent"] D --> F["结果验证"] E --> F F --> G{"验证结果"} G -->|通过| H["输出 Agent"] G -->|不通过| I["修复 Agent"] I --> C设计优势:
- 每个 Agent 单一职责
- 拓扑结构灵活可扩展
- 便于调试和测试
- 支持重试和回滚
1.2 与传统架构对比
| 维度 | 传统架构 | Agent 拓扑 |
|---|---|---|
| 耦合度 | 高 | 低 |
| 可扩展性 | 低 | 高 |
| 可测试性 | 低 | 高 |
| 可维护性 | 低 | 高 |
| 可观测性 | 低 | 高 |
二、 快速上手
2.1 定义 Agent 拓扑
from typing import Dict, Any, Callable, Optional class AgentNode: def __init__(self, name: str, process: Callable): self.name = name self.process = process self.next_nodes: Dict[str, str] = {} def add_next(self, condition: str, node_name: str): self.next_nodes[condition] = node_name class AgentTopology: def __init__(self): self.nodes: Dict[str, AgentNode] = {} def add_node(self, node: AgentNode): self.nodes[node.name] = node def execute(self, start: str, data: Any) -> Any: current = start while current: node = self.nodes.get(current) if not node: break result = node.process(data) if node.next_nodes: status = result.get("status", "default") current = node.next_nodes.get(status) data = result else: return result return data2.2 使用示例
def understand(input_data): return {"intent": "search", "query": input_data, "status": "ok"} def search(data): return {"result": f"搜索 {data['query']} 完成", "status": "ok"} topo = AgentTopology() understand_node = AgentNode("understand", understand) search_node = AgentNode("search", search) understand_node.add_next("ok", "search") topo.add_node(understand_node) topo.add_node(search_node) result = topo.execute("understand", "今天天气怎么样") print(result)三、 核心 API 与深水区
3.1 拓扑组件速查
| 组件 | 职责 | 示例 |
|---|---|---|
| 理解 Agent | 分析输入 | 意图识别 |
| 规划 Agent | 制定计划 | 任务分解 |
| 工具 Agent | 调用工具 | 搜索/计算 |
| 验证 Agent | 检查结果 | 质量检测 |
| 修复 Agent | 修正错误 | 重试机制 |
3.2 带上下文的拓扑
class Context: def __init__(self): self.data = {} self.history = [] def set(self, key, value): self.data[key] = value def get(self, key): return self.data.get(key) class ContextNode(AgentNode): def process(self, ctx: Context) -> Context: result = self.process_func(ctx) ctx.history.append({"node": self.name, "result": result}) return ctx3.3 条件分支
def decide_route(data): if "搜索" in data: return "search" elif "计算" in data: return "calculate" return "reply" decide_node = AgentNode("decide", decide_route) decide_node.add_next("search", "search_agent") decide_node.add_next("calculate", "calculate_agent")四、 实战演练
4.1 完整的多 Agent 系统
from typing import Dict, Any, List class IntentAnalyzer: def process(self, data: Dict) -> Dict: intent = self._detect_intent(data.get("input", "")) return {"intent": intent, "input": data["input"], "status": "ok"} def _detect_intent(self, text: str) -> str: if "订单" in text: return "order" elif "客服" in text: return "service" return "general" class OrderProcessor: def process(self, data: Dict) -> Dict: return {"result": f"处理订单: {data['input']}", "status": "ok"} class ServiceProcessor: def process(self, data: Dict) -> Dict: return {"result": f"客服回复: {data['input']}", "status": "ok"} class Orchestrator: def __init__(self): self.topology = AgentTopology() self._build() def _build(self): analyzer = AgentNode("analyzer", IntentAnalyzer().process) order = AgentNode("order", OrderProcessor().process) service = AgentNode("service", ServiceProcessor().process) analyzer.add_next("order", "order") analyzer.add_next("service", "service") analyzer.add_next("general", "order") self.topology.add_node(analyzer) self.topology.add_node(order) self.topology.add_node(service) def process(self, user_input: str) -> str: result = self.topology.execute("analyzer", {"input": user_input}) return result.get("result", "无法处理") orchestrator = Orchestrator() print(orchestrator.process("本文的订单怎么了")) print(orchestrator.process("本文要找客服"))五、 避坑指南与最佳实践
💡技巧:每个 Agent 只做一件事
单一职责原则,调试方便。
⚠️警告:注意拓扑环路
验证失败回退,最多 3 次,防止死循环。
✅推荐:加日志追踪
每个 Agent 的处理记录下来,方便排查。
六、 综合实战演示
6.1 生产级拓扑框架
from typing import Dict, Any, List, Optional from dataclasses import dataclass import json import time @dataclass class StepLog: node: str input: Any output: Any duration: float class ProductionTopology: def __init__(self, max_retries=3): self.nodes = {} self.max_retries = max_retries self.step_logs: List[StepLog] = [] def add_node(self, name: str, process_func, next_map: Dict[str, str] = None): node = AgentNode(name, process_func) if next_map: for condition, next_name in next_map.items(): node.add_next(condition, next_name) self.nodes[name] = node def execute(self, start: str, data: Any) -> Any: current = start retries = 0 while current and retries < self.max_retries: node = self.nodes.get(current) if not node: break start_time = time.time() result = node.process(data) duration = time.time() - start_time self.step_logs.append(StepLog( node=current, input=data, output=result, duration=duration )) if isinstance(result, dict) and result.get("status") == "fail": retries += 1 if node.next_nodes: status = result.get("status", "default") current = node.next_nodes.get(status) else: return result return {"status": "error", "message": "处理失败"} def get_report(self) -> str: report = [] for log in self.step_logs: report.append({ "node": log.node, "duration_ms": round(log.duration * 1000, 2) }) return json.dumps(report, ensure_ascii=False, indent=2) topo = ProductionTopology() topo.add_node("step1", lambda x: {"value": x * 2, "status": "ok"}, {"ok": "step2"}) topo.add_node("step2", lambda x: {"result": f"最终: {x['value']}", "status": "ok"}) result = topo.execute("step1", 5) print(result) print(topo.get_report())总结
Agent 拓扑设计模式的核心:
- 拆解成独立 Agent
- 拓扑结构编排
- 灵活扩展
- 易于调试