大模型应用落地:基于 Agent 拓扑设计模式的 LangChain 工程实践
2026/6/5 13:37:10 网站建设 项目流程

大模型应用落地:基于 Agent 拓扑设计模式的 LangChain 工程实践

Agent 拓扑设计模式:让大模型应用不再"一团乱麻"

前言

做大模型 Agent 应用最痛苦的是什么?代码耦合严重、流程混乱、调试困难。一个功能改了,好几个地方都要改。

后来本文用 Agent 拓扑设计模式重构了整个架构,思路是完全不同的——每个 Agent 只做一件事,自由编排。今天来聊聊。

一、 底层原理

1.1 什么是 Agent 拓扑设计模式

Agent 拓扑设计模式就是把一个复杂任务拆成多个独立的 Agent,通过拓扑结构编排执行流程:

graph TD A["用户输入"] --> B["理解 Agent"] B --> C["规划 Agent"] C --> D["工具 Agent"] C --> E["推理 Agent"] D --> F["结果验证"] E --> F F --> G{"验证结果"} G -->|通过| H["输出 Agent"] G -->|不通过| I["修复 Agent"] I --> C

设计优势:

  • 每个 Agent 单一职责
  • 拓扑结构灵活可扩展
  • 便于调试和测试
  • 支持重试和回滚

1.2 与传统架构对比

维度传统架构Agent 拓扑
耦合度
可扩展性
可测试性
可维护性
可观测性

二、 快速上手

2.1 定义 Agent 拓扑

from typing import Dict, Any, Callable, Optional class AgentNode: def __init__(self, name: str, process: Callable): self.name = name self.process = process self.next_nodes: Dict[str, str] = {} def add_next(self, condition: str, node_name: str): self.next_nodes[condition] = node_name class AgentTopology: def __init__(self): self.nodes: Dict[str, AgentNode] = {} def add_node(self, node: AgentNode): self.nodes[node.name] = node def execute(self, start: str, data: Any) -> Any: current = start while current: node = self.nodes.get(current) if not node: break result = node.process(data) if node.next_nodes: status = result.get("status", "default") current = node.next_nodes.get(status) data = result else: return result return data

2.2 使用示例

def understand(input_data): return {"intent": "search", "query": input_data, "status": "ok"} def search(data): return {"result": f"搜索 {data['query']} 完成", "status": "ok"} topo = AgentTopology() understand_node = AgentNode("understand", understand) search_node = AgentNode("search", search) understand_node.add_next("ok", "search") topo.add_node(understand_node) topo.add_node(search_node) result = topo.execute("understand", "今天天气怎么样") print(result)

三、 核心 API 与深水区

3.1 拓扑组件速查

组件职责示例
理解 Agent分析输入意图识别
规划 Agent制定计划任务分解
工具 Agent调用工具搜索/计算
验证 Agent检查结果质量检测
修复 Agent修正错误重试机制

3.2 带上下文的拓扑

class Context: def __init__(self): self.data = {} self.history = [] def set(self, key, value): self.data[key] = value def get(self, key): return self.data.get(key) class ContextNode(AgentNode): def process(self, ctx: Context) -> Context: result = self.process_func(ctx) ctx.history.append({"node": self.name, "result": result}) return ctx

3.3 条件分支

def decide_route(data): if "搜索" in data: return "search" elif "计算" in data: return "calculate" return "reply" decide_node = AgentNode("decide", decide_route) decide_node.add_next("search", "search_agent") decide_node.add_next("calculate", "calculate_agent")

四、 实战演练

4.1 完整的多 Agent 系统

from typing import Dict, Any, List class IntentAnalyzer: def process(self, data: Dict) -> Dict: intent = self._detect_intent(data.get("input", "")) return {"intent": intent, "input": data["input"], "status": "ok"} def _detect_intent(self, text: str) -> str: if "订单" in text: return "order" elif "客服" in text: return "service" return "general" class OrderProcessor: def process(self, data: Dict) -> Dict: return {"result": f"处理订单: {data['input']}", "status": "ok"} class ServiceProcessor: def process(self, data: Dict) -> Dict: return {"result": f"客服回复: {data['input']}", "status": "ok"} class Orchestrator: def __init__(self): self.topology = AgentTopology() self._build() def _build(self): analyzer = AgentNode("analyzer", IntentAnalyzer().process) order = AgentNode("order", OrderProcessor().process) service = AgentNode("service", ServiceProcessor().process) analyzer.add_next("order", "order") analyzer.add_next("service", "service") analyzer.add_next("general", "order") self.topology.add_node(analyzer) self.topology.add_node(order) self.topology.add_node(service) def process(self, user_input: str) -> str: result = self.topology.execute("analyzer", {"input": user_input}) return result.get("result", "无法处理") orchestrator = Orchestrator() print(orchestrator.process("本文的订单怎么了")) print(orchestrator.process("本文要找客服"))

五、 避坑指南与最佳实践

💡技巧:每个 Agent 只做一件事
单一职责原则,调试方便。

⚠️警告:注意拓扑环路
验证失败回退,最多 3 次,防止死循环。

推荐:加日志追踪
每个 Agent 的处理记录下来,方便排查。

六、 综合实战演示

6.1 生产级拓扑框架

from typing import Dict, Any, List, Optional from dataclasses import dataclass import json import time @dataclass class StepLog: node: str input: Any output: Any duration: float class ProductionTopology: def __init__(self, max_retries=3): self.nodes = {} self.max_retries = max_retries self.step_logs: List[StepLog] = [] def add_node(self, name: str, process_func, next_map: Dict[str, str] = None): node = AgentNode(name, process_func) if next_map: for condition, next_name in next_map.items(): node.add_next(condition, next_name) self.nodes[name] = node def execute(self, start: str, data: Any) -> Any: current = start retries = 0 while current and retries < self.max_retries: node = self.nodes.get(current) if not node: break start_time = time.time() result = node.process(data) duration = time.time() - start_time self.step_logs.append(StepLog( node=current, input=data, output=result, duration=duration )) if isinstance(result, dict) and result.get("status") == "fail": retries += 1 if node.next_nodes: status = result.get("status", "default") current = node.next_nodes.get(status) else: return result return {"status": "error", "message": "处理失败"} def get_report(self) -> str: report = [] for log in self.step_logs: report.append({ "node": log.node, "duration_ms": round(log.duration * 1000, 2) }) return json.dumps(report, ensure_ascii=False, indent=2) topo = ProductionTopology() topo.add_node("step1", lambda x: {"value": x * 2, "status": "ok"}, {"ok": "step2"}) topo.add_node("step2", lambda x: {"result": f"最终: {x['value']}", "status": "ok"}) result = topo.execute("step1", 5) print(result) print(topo.get_report())

总结

Agent 拓扑设计模式的核心:

  • 拆解成独立 Agent
  • 拓扑结构编排
  • 灵活扩展
  • 易于调试

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询