OpenAI Agents SDK 深度解析(三):执行层——Agent 的“幕后指挥部”
2026/5/1 18:51:25 网站建设 项目流程

开发一个智能体,就像训练一名士兵。你给他下达一个任务,他最终会交回一个结果。

但是,如果这名士兵在执行任务的途中开了几枪、呼叫了几次炮火支援、又换了几次频道联络后方基地——你却完全不知道。你只知道“任务完成”或者“任务失败”。

这样的士兵,你敢重用吗?

在真实的应用场景中,我们需要的 Agent 不是只会说“是”或“不是”的黑箱子,而是每一步都可以被观察、被记录、被审计的透明系统。

用户问“北京天气怎么样”,Agent 到底是靠猜的,还是真的去查了天气 API?
如果查了,传的是什么参数?
API 返回了什么原始数据?
模型在拿到数据之后又做了哪些推理才给出最终答案?

这些问题,仅靠 final_output 永远无法回答。

这正是 OpenAI Agents SDK 设计“执行层”的初衷——为每一次 Agent 的执行安装一个黑匣子记录仪,同时在驾驶舱里装上一块实时直播屏幕。

在这篇文章中,我会用最通俗的语言、最完整的代码,带你彻底吃透执行层的四个核心概念:输出项、事件流、历史会话,以及贯穿它们的一次执行的生命周期——Run。

一、执行层到底是什么

在 OpenAI Agents SDK 中,所有 Agent 的执行都被包装成一个叫做Run的对象。一次 Run,就是一个独立的任务执行生命周期。

当你写下这行代码时:

await Runner.run(agent, "帮我查一下天气")

SDK 会在背后做以下几件事:

  • 创建一条空的消息记录,准备接收用户输入。
  • 调用大模型,让模型决定要不要使用工具。
  • 如果模型决定用工具,SDK 会执行对应的 Python 函数,拿到结果再送回给模型。
  • 模型基于工具结果生成最终的语言回复。
  • 最后,把所有中间产物打包成一个 Run 对象返回给你。

执行层,就是负责管理 Run 内部所有这些状态的组件。

具体来说,执行层管理了四个最重要的状态:

第一,历史消息。
也就是整个对话过程中用户说了什么、模型回复了什么、工具返回了什么。这些消息按时间顺序排列,构成了对话的完整时间线。

第二,工具调用状态。
Agent 是否正在等待某个工具执行完成?工具执行是成功了还是失败了?这些状态信息在执行层被完整记录。

第三,Agent 路由轨迹。
在多 Agent 系统中,任务可能会从一个 Agent 转交给另一个 Agent,比如从“通用助手”转给“天气专家”。执行层会记录每一次这样的交接,包括从谁转给谁,以及交接时的上下文。

第四,流式状态。
如果用户要求实时输出,那执行层会维护一个流式缓冲区,记录已经输出了哪些字符,还要输出哪些字符。

理解了执行层是什么,我们就可以分别深入它的四个核心产出物:输出项、事件流和历史会话。

二、输出项:Agent 的每一处“脚印”

输出项是执行层最基础、最完整的数据产物。如果说 Run 是一场完整的演出,那么输出项就是这场演出的分镜头剧本——每一幕、每一句台词、每一个动作都被写成了条目。

1. 什么是输出项

输出项,英文叫Output Items。它的定义非常简单:

在一次 Run 的执行过程中,所有“已经发生的事实”被抽象成的结构化对象。

我们用一个具体的例子来理解。假设你问 Agent:“北京今天的天气怎么样?”

在没有输出项的黑盒模式下,你只能看到最终的回答:“北京今天晴天,25度。”

而在有输出项的白盒模式下,SDK 会生成一条又一条的记录,就像这样:

  • 第一条记录:模型打算调用一个名为 get_weather 的工具,参数是 {"city": "北京"}。
    这条记录的类型叫做 ToolCallItem。
  • 第二条记录:工具 get_weather 执行结束,返回值是字符串 "北京,晴天,25度"。
    这条记录的类型叫做 ToolCallOutputItem。
  • 第三条记录:模型基于工具返回的数据,生成了最终的文本回答。
    这条记录的类型叫做 MessageOutputItem。

你看,仅仅一个简单的天气查询,背后就有至少三条输出项。

如果 Agent 做了更复杂的事,比如连续调用三个工具、在两个 Agent 之间来回交接,输出项的数量会成倍增加。

官方文档中有一句话非常值得记住:

输出项不是模型输出本身,不是 Agent 的决策逻辑,也不是 Tool 的执行逻辑——它是 Run 对“执行过程”的结构化快照。

也就是说,它记录的是“发生了什么事实”。对于调试和审计来说,“发生了什么”往往比“为什么发生”更关键。

2. 五类最核心的输出项

在 Agents SDK 中,虽然输出项的种类很多,但日常开发中最常用的只有五类。我们用文字逐一解释清楚,不列表格。

第一类:MessageOutputItem
它代表“模型生成了一段对话消息”这个事实。注意,它不是消息本身的内容,而是对“生成消息”这个事件的记录。当你看到一条 MessageOutputItem,就意味着 Agent 已经产生了一段可以直接展示给用户的文本。通常这就是最终输出的一部分。

第二类:ToolCallItem
它代表“Agent 决定调用某个工具”这个事实。里面记录了工具的名字,以及调用时传入的参数(JSON 格式)。你可以通过 ToolCallItem 精确地知道 Agent 在什么时候、基于什么理由、调用了什么工具。

第三类:ToolCallOutputItem
它代表“某个工具已经执行完成并返回了结果”这个事实。里面包含了工具的执行结果,通常是一个字符串或者 JSON 对象。将 ToolCallItem 和 ToolCallOutputItem 对照着看,你就能完整还原一次工具调用的全过程:传入了什么参数,得到了什么结果。

第四类:HandoffOutputItem
它代表“当前 Agent 把控制权转交给了另一个 Agent”这个事实。在多 Agent 架构中非常有用。比如,你的主 Agent 负责意图识别,发现用户想问天气,就立即将任务转交给专门的天气 Agent。这个交接动作就会被记录成一条 HandoffOutputItem,包括源 Agent 的名字和目标 Agent 的名字。

第五类:ReasoningItem
它代表“模型进行了一次内部推理”这个事实。有些高级模型(比如 OpenAI o1 系列)会在输出最终答案之前进行一段“思维链”式的内部推导。这段推导不会显示给最终用户,但对开发者调试 Agent 的行为非常宝贵。ReasoningItem 就是用来捕获这段推导过程的。

3. 一个容易混淆的问题:ResponseXXX 和 XXXItem 的区别

很多刚接触 SDK 的开发者会被 ResponseOutputMessage、ResponseReasoningItem、MessageOutputItem、ReasoningItem 这样的命名搞得头晕。

这里只需要记住一条原则:

所有以 Response 开头的类型,都来自大模型厂商(比如 OpenAI)的原始 API 协议。
所有以 Item 结尾的类型,才是 Agents SDK 对外统一暴露的执行层接口。

为什么 SDK 要多做一层包装?因为不同大模型厂商的响应格式不一样。OpenAI 的格式和 Anthropic 的格式、和国内通义千问的格式都有差异。

SDK 把这些差异全部屏蔽掉,对外统一暴露 MessageOutputItem、ToolCallItem 等接口。这样一来,你写的代码可以无缝切换底层模型提供商,而不用修改任何输出项的处理逻辑。

所以,在你的业务代码中,请优先使用 XXXItem 这一类执行层类型。只有当你需要访问某些非常底层的、厂家特定的字段时,才通过 .raw_item 属性去拿到原始的 ResponseXXX 对象。

4. 完整的代码示例:如何获取并打印输出项

下面是一段完整可运行的代码。它会创建一个带天气查询工具的 Agent,执行一次询问,然后逐个打印出这次 Run 产生的所有输出项。代码中的每一处关键逻辑都有详细的注释。

import asyncio from openai import AsyncOpenAI from agents import ( Agent, OpenAIChatCompletionsModel, Runner, function_tool, set_tracing_disabled ) # 配置你的 API 地址和密钥 # 这里以阿里云百炼平台的兼容接口为例,你可以换成任何兼容 OpenAI 格式的服务 BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1" API_KEY = "sk-your-api-key-here" # 请替换成你自己的真实 API Key MODEL_NAME = "qwen-plus" # 创建一个异步的 HTTP 客户端 client = AsyncOpenAI(base_url=BASE_URL, api_key=API_KEY) # 关闭 SDK 自带的追踪功能(仅为了简化输出,生产环境建议保留) set_tracing_disabled(disabled=True) # 定义一个工具函数:获取天气 # @function_tool 装饰器会把普通 Python 函数自动转换成 Agent 可调用的工具 @function_tool def get_weather(city: str) -> str: """根据城市名返回天气信息(这里为了演示,返回静态数据)""" # 在实际项目中,你可以在这里调用真实的天气 API return f"天气信息: {city} 是晴天,温度 28 度,湿度 50%。" async def main(): # 创建一个 Agent # name: Agent 的名字,用于调试和交接 # instructions: 系统指令,告诉 Agent 应该遵循什么行为准则 # model: 指定使用的大模型(这里用 OpenAIChatCompletionsModel 包装第三方接口) # tools: 绑定工具列表 agent = Agent( name="天气助手", instructions=( "你是一个专业的天气助手。每当用户询问某个城市的天气时," "你必须调用 get_weather 工具来获取真实数据。" "绝对禁止自己编造天气信息。" ), model=OpenAIChatCompletionsModel(model=MODEL_NAME, openai_client=client), tools=[get_weather], ) # 执行一次 Run # 注意:这里使用的是 Runner.run 方法,它会等待整个 Run 执行完毕, # 然后将所有输出项一次性整理到 result.new_items 中返回。 # 如果你需要实时获取输出项,请使用后面的 run_streamed 版本。 result = await Runner.run(agent, "武汉今天的天气怎么样?") # 遍历并打印所有输出项 print("=" * 50) print("开始输出 Agent 的执行足迹(输出项)") print("=" * 50) for idx, item in enumerate(result.new_items, start=1): print(f"\n第 {idx} 个输出项:") print(f" 类型名称: {type(item).__name__}") # 根据不同的输出项类型,打印更有意义的信息 # 注意:.raw_item 是原始厂商对象,这里我们用最简单的方式打印 print(f" 原始内容: {item.raw_item}") print("\n" + "=" * 50) print(f"Agent 最终的返回内容(final_output):") print(result.final_output) print("=" * 50) if __name__ == "__main__": asyncio.run(main())

当你运行这段代码时,控制台会依次打印出类似下面的内容:

================================================== 开始输出 Agent 的执行足迹(输出项) ================================================== 第 1 个输出项: 类型名称: ToolCallItem 原始内容: ResponseFunctionToolCall(id='call_abc123', name='get_weather', arguments='{"city":"武汉"}') 第 2 个输出项: 类型名称: ToolCallOutputItem 原始内容: ResponseFunctionToolOutput(id='call_abc123', output='天气信息: 武汉是晴天,温度 28 度,湿度 50%。') 第 3 个输出项: 类型名称: MessageOutputItem 原始内容: ResponseOutputMessage(role='assistant', content=[ResponseOutputText(text='武汉今天天气晴朗,温度28度,湿度50%。')]) ================================================== Agent 最终的返回内容(final_output): 武汉今天天气晴朗,温度28度,湿度50%。 ==================================================

通过这个输出,你可以清晰地看到 Agent 的三步走:

  1. 先决定调用工具(ToolCallItem)
  2. 然后拿到工具结果(ToolCallOutputItem)
  3. 最后生成自然语言回复(MessageOutputItem)

这就是输出项的威力——让 Agent 的每一次思考与行动都无处遁形。

三、事件流:Agent 的现场直播

输出项非常详细,但它们有一个“缺点”:它们是事后一次性交付的。

也就是说,你必须等到整个 Run 彻底执行完毕,才能从 result.new_items 里拿到所有的输出项。

这种模式对于批处理、离线分析来说没有问题,但对于需要实时反馈的交互场景(比如一个聊天机器人),用户显然无法接受等 10 秒钟然后一次性蹦出一大段话。

事件流(Stream Event)就是为了解决这个问题而生的。它把输出项拆解成一个个实时的事件,每发生一件事就立即向外推送。

1. 事件流的本质

从技术的角度看,事件流就是 Run 在执行过程中,对自身状态变化的一种实时广播机制。

官方文档里有一句非常精辟的概括:

事件流是执行层的实时可观测接口。

翻译成大白话就是:如果你把 Run 想象成一部正在拍摄的电影,那么:

  • 输出项就是最终剪辑好的成片(每一帧都在,但你要等全片拍完才能看到)。
  • 事件流就是现场的导播信号(摄像机拍到一个画面,就立刻把这个画面传送到监视器上)。

由于是实时广播,事件流特别适合用来驱动 UI 的进度条、实时日志、打字机效果等需要立即反馈的场景。

2. 三种事件类型及其用途

Agents SDK 的事件流只定义了三种事件类型。不要被“只有三种”迷惑,实际上它们已经覆盖了绝大多数实时监控的需求。

第一种事件类型:RawResponsesStreamEvent
这是最底层、最原始的事件。它直接封装了大模型厂商在流式模式下推送的原始数据包。

最常见的子类型有两个:

  • ResponseTextDeltaEvent:代表模型刚刚产出了一小段文本增量——可能只有几个字符。你在网页上看到的“打字机效果”,一个字符一个字符往外蹦的效果,就是靠捕获这种事件然后逐字追加到页面上实现的。
  • ResponseReasoningDeltaEvent:代表模型在推理阶段产出了一小段思维链增量。这类事件不会显示给最终用户,但你可以把它收集起来,用于调试或者内部审计。

第二种事件类型:RunItemStreamEvent
这种事件的粒度比 RawResponsesStreamEvent 粗一些,但语义更丰富。它不是推送一个字符或一个 token,而是推送“一个完整的输出项已经生成完毕”这个事实。

也就是说,每当 SDK 内部新产生了一个 ToolCallItem、ToolCallOutputItem 或 MessageOutputItem,就会触发一个 RunItemStreamEvent。

通过监听 RunItemStreamEvent,你可以实现更高级的 UI 反馈:

  • 当收到一个 tool_called 事件时,你在界面上显示“正在查询天气接口...”。
  • 当收到 tool_output 事件时,更新为“天气接口返回数据,正在分析...”。
  • 当收到 message_output_created 事件时,切换回正常的文本输出模式。

第三种事件类型:AgentUpdatedStreamEvent
这种事件只在多 Agent 系统中出现,单 Agent 模式下基本不会触发。

当一个 Agent 主动将任务交接给另一个 Agent 时,就会触发此事件,事件中会携带新 Agent 的完整对象。你可以利用这个事件在 UI 上切换头像、改变提示语,让用户清楚地知道当前正在由哪个 Agent 处理他的问题。

3. 什么时候应该用哪种事件

为了帮助你决策,下面用纯文字列举几个典型场景,以及对应的最佳事件选择。

  • 场景一:实现逐字输出的打字机效果
    使用 RawResponsesStreamEvent,并进一步判断是否为 ResponseTextDeltaEvent。拿到 delta 字段后,将内容实时追加到 UI 上。
  • 场景二:在界面上显示 Agent 当前的执行阶段
    使用 RunItemStreamEvent。检查 name 字段是否为 tool_called、tool_output 或 message_output_created,然后在 UI 中更新对应的状态标签。
  • 场景三:调试或记录模型内部的推理过程(思维链)
    使用 RawResponsesStreamEvent 中的 ResponseReasoningDeltaEvent。将所有的 delta 片段拼接起来,就得到了完整的推理摘要。
  • 场景四:多 Agent 应用中显示当前工作的 Agent
    使用 AgentUpdatedStreamEvent,从 event.new_agent.name 中读取新 Agent 的名字并刷新界面。

4. 完整的事件流代码示例

下面这段代码演示了如何用 run_streamed 启动流式执行,并完整处理三类事件。你可以直接复制运行,代码中逐行添加了注释。

import asyncio import sys from openai import AsyncOpenAI from openai.types.responses import ( ResponseTextDeltaEvent, ResponseReasoningSummaryTextDeltaEvent ) from agents import ( Agent, OpenAIChatCompletionsModel, Runner, function_tool, set_tracing_disabled, ModelSettings, ) from agents.items import ToolCallItem, ToolCallOutputItem, MessageOutputItem # 配置 API(同样以阿里云百炼为例) BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1" API_KEY = "sk-your-api-key-here" MODEL_NAME = "qwen3-max" # 这个模型支持推理摘要输出 client = AsyncOpenAI(base_url=BASE_URL, api_key=API_KEY) set_tracing_disabled(True) # 定义一个简单的天气工具 @function_tool def get_weather(city: str) -> str: """模拟的天气查询工具""" return f"天气数据: {city} 地区多云转晴,气温 22 度,空气质量良好。" async def main(): # 创建 Agent # 注意:这里增加了 model_settings 参数,强制要求模型必须调用工具 agent = Agent( name="实时天气分析师", instructions=( "你是一个严谨的天气分析师。用户询问天气时," "你必须先调用 get_weather 工具获取数据," "然后再根据数据回答。" ), model=OpenAIChatCompletionsModel(model=MODEL_NAME, openai_client=client), tools=[get_weather], model_settings=ModelSettings(tool_choice="required"), # 强制使用工具 ) # 关键点:使用 run_streamed 而不是 run # 这样才能获得实时的事件流 result = Runner.run_streamed(agent, "麻烦查一下武汉明天的天气情况") print("开始接收实时事件流,正在处理...\n") # 异步迭代事件流 async for event in result.stream_events(): # 处理第一类事件:原始模型流事件 if event.type == "raw_response_event": # 如果是文本增量(打字机效果的核心) if isinstance(event.data, ResponseTextDeltaEvent): # 直接打印增量内容,不换行,实现连续输出效果 print(event.data.delta, end="", flush=True) # 如果是推理摘要增量(通常不会显示给用户,但可以用于调试) elif isinstance(event.data, ResponseReasoningSummaryTextDeltaEvent): # 为了演示,我们把它输出到标准错误,不与正常输出混在一起 # 在实际开发中,你可能把它写入日志文件 print(f"\n[推理中] {event.data.delta}", file=sys.stderr, end="") # 处理第二类事件:输出项级别的事件 elif event.type == "run_item_stream_event": # 获取事件的名称,例如 "tool_called", "tool_output", "message_output_created" name = getattr(event, "name", None) # 当工具被调用时触发 if name == "tool_called" and isinstance(event.item, ToolCallItem): tool_name = event.item.raw_item.name tool_args = event.item.raw_item.arguments print(f"\n[系统] 正在调用工具:{tool_name}") print(f"[系统] 调用参数:{tool_args}") # 当工具执行完成并返回结果时触发 elif name == "tool_output" and isinstance(event.item, ToolCallOutputItem): output_data = event.item.output print(f"\n[系统] 工具返回结果:{output_data}") # 当模型生成了一条完整的消息时触发 elif name == "message_output_created" and isinstance(event.item, MessageOutputItem): print("\n[系统] 最终回答生成完毕,实时输出如下:") # 处理第三类事件:Agent 切换事件(单 Agent 下很少触发,但为了完整性仍保留) elif event.type == "agent_updated_stream_event": new_agent_name = event.new_agent.name print(f"\n[系统] 当前工作的 Agent 切换为:{new_agent_name}") # 事件流结束后,可以获取最终的完整输出 # 注意:final_output 是所有文本增量的拼接结果,和实时输出的内容一致 print("\n\n" + "=" * 50) print("最终完整回答(final_output):") print(result.final_output) print("=" * 50) if __name__ == "__main__": asyncio.run(main())

运行这段代码,你会发现控制台上的文字是一个字一个字蹦出来的,同时在文字之间会穿插类似 [系统] 正在调用工具:get_weather 这样的状态信息。

这就是事件流的实时魅力。它让你不仅能展示最终结果,还能让用户看到 Agent 正在“努力思考”和“积极行动”的全过程,体验提升非常明显。

四、历史会话:Agent 的长期记忆

到目前为止,我们讨论的所有内容都局限在单次 Run 内部。

一个 Run 从用户输入开始,到 Agent 输出结束,生命周期很短,通常只有几秒钟。对于需要多轮对话的应用来说,这显然不够——用户希望 Agent 记得他上一轮说了什么,甚至记得三天前说过什么。

历史会话(Conversation)模块就是为这个需求而生的。

1. 什么是 Conversation

在 Agents SDK 中,Conversation 被定义为一次 Run 的完整记忆时间线。更具体地说,它就是一个按照时间顺序排列的输出项列表。

比如,一个 Conversation 可能包含以下内容:

  • 第 1 条:用户说“帮我订一张去北京的机票”。
  • 第 2 条:Agent 调用了一个名为 search_flight 的工具。
  • 第 3 条:search_flight 工具返回了航班列表。
  • 第 4 条:Agent 给出了几个航班选项。
  • 第 5 条:用户回复“选第一个”。
  • 第 6 条:Agent 调用 book_flight 工具。
  • 第 7 条:book_flight 工具返回订票成功信息。
  • 第 8 条:Agent 说“订票成功,您的订单号是 xxx”。

你看,Conversation 实际上就是把一次完整的用户-Agent-工具三方交互的全部输出项串了起来。

多轮对话的本质,就是多个 Run 共享同一个 Conversation。

也就是说:

  • 第一轮 Run 结束后,Conversation 中已经存了若干条输出项。
  • 第二轮 Run 开始时,SDK 会自动把 Conversation 中的所有历史输出项作为上下文送给模型,这样模型就能“回忆”起之前说过的话。

2. 为什么 Conversation 至关重要

如果没有 Conversation,你的 Agent 将是一个“失忆的专家”。每一轮对话都要重新开始,用户必须反复提供背景信息。

举个例子:

  • 用户说“帮我查北京的天气”,Agent 查了气温是 25 度。
  • 用户接着问“那上海呢”。

如果 Agent 失忆了,它不知道这个“那”指的是什么,就必须反问“您指的是哪个城市的天气?”这种体验非常糟糕。

Conversation 让 Agent 拥有了跨轮次的记忆能力。

更妙的是,Agents SDK 会自动处理历史上下文的拼接和截断,你完全不需要手动将历史消息拼接到新的 Prompt 中。

你只需要在每次调用 Runner.run 时传入相同的 Session 对象,SDK 会自动从数据库中读出之前的 Conversation,然后自动拼接到本次的模型请求中。

Conversation 还带来了另外三个重要的能力:

  • 可回溯性:你可以在任何时候翻开某个 Session 的历史,看到每一次工具调用的参数和返回值。
  • 可调试性:当 Agent 行为异常时,你可以重放整个会话,定位问题是出在模型推理上还是工具执行上。
  • 可持久化:SDK 支持将会话写入 SQLite、Redis 等存储后端,数据不会因为程序重启而丢失。

3. 如何使用 Session 管理历史对话

在 SDK 中,Conversation 是通过 Session 对象来管理的。最常用的实现是 SQLiteSession,它将所有会话数据存储在本地的 SQLite 数据库文件中。

下面是完整的使用示例:

from agents import SQLiteSession # 创建一个 SQLiteSession # 第一个参数是 session_id,你可以用用户 ID 或对话 ID 来命名 # 第二个参数是数据库文件的路径 session = SQLiteSession("user_zhang_san", "conversations.db") # 第一轮对话:询问天气 result1 = await Runner.run( agent, "武汉今天天气怎么样?", session=session # 传入同一个 session ) # 第二轮对话:追问明天的天气 # Agent 会从 session 的历史中知道“明天”指的是“武汉的明天” result2 = await Runner.run( agent, "那明天呢?", session=session # 使用同一个 session,自动续写历史 ) # 第三轮对话:验证记忆 result3 = await Runner.run( agent, "我刚才问了哪几个城市?", session=session ) print(result3.final_output) # 输出会类似:“您刚才问了武汉今天的天气,然后问了武汉明天的天气。”

从代码中可以看出,开发者不需要做任何额外的事情来维护历史消息。

SDK 内部会在每次 Runner.run 调用时自动做以下三件事:

  1. 从 Session 中加载已有的 Conversation 历史。
  2. 将新的用户输入追加到 Conversation 中。
  3. 调用大模型时,自动将整个 Conversation 作为上下文。
  4. 将模型的新输出、工具调用、工具结果等全部追加到 Conversation 中,并写回 Session。

整个过程对开发者完全透明。

4. 如何读取历史会话

你还可以在任意时刻读取某个 Session 的完整历史,用于审计、调试或者迁移数据。

# 从 Session 中获取所有历史项 all_items = await session.get_items() print(f"当前会话共有 {len(all_items)} 个历史记录") for idx, item in enumerate(all_items): print(f"{idx}: {type(item)}") # 如果你需要更详细的内容,可以尝试打印 item.raw_item

历史项的类型包括但不限于:

  • MessageOutputItem
  • ToolCallItem
  • ToolCallOutputItem
  • ReasoningItem
  • HandoffOutputItem

通过分析这些历史项,你可以精确地还原出用户和 Agent 之间的每一次交互细节。

5. 持久化的意义

使用 SQLiteSession 或者自定义的 Session 后端(比如 RedisSession),意味着你的 Agent 拥有了长期记忆能力。

即使用户隔了一周再次访问,只要使用同一个 session_id,Agent 就能回忆起一周前的对话内容。

这对于需要持续服务的场景——比如私人助理、教育辅导、健康咨询——尤其重要。

当然,你也要注意隐私和合规问题。如果对话中包含敏感信息,你需要对数据库进行加密,或者定期清理过期的会话。

五、总结

文章写到这里,我们已经完整地走过了 OpenAI Agents SDK 执行层的每一个角落。现在,让我们回过头来,把四个核心概念放在一起做一个总体的回顾。

第一个概念:输出项(Output Items)
它是 Agent 执行过程中每一步事实的结构化记录。通过读取输出项,你可以知道 Agent 是否调用了工具、传入了什么参数、工具返回了什么结果、模型基于结果生成了什么文字。输出项让 Agent 从无法探测的黑盒变成了可审计、可调试的白盒。

第二个概念:事件流(Stream Event)
它是输出项的实时版本。输出项是事后一次性交付的,而事件流是在 Run 执行过程中实时广播的。事件流分为三类:

  • RawResponsesStreamEvent:用于构建打字机效果和捕获推理过程。
  • RunItemStreamEvent:用于构建高级 UI 状态反馈。
  • AgentUpdatedStreamEvent:用于多 Agent 场景下的界面更新。

通过事件流,你可以让用户实时看到 Agent 的一举一动,极大地提升交互体验。

第三个概念:历史会话(Conversation)
它是多个 Run 共享的记忆时间线,本质上就是输出项的有序集合。通过 Session 对象,SDK 自动管理历史消息的存储和加载,让 Agent 在多轮对话中保持连续的记忆。你可以将会话持久化到 SQLite、Redis 等数据库中,实现跨会话、跨设备的记忆共享。

第四个概念:Run 生命周期
它是执行层的容器,一个 Run 包含了从用户输入到最终输出的全部信息:历史消息、工具调用状态、Agent 路由轨迹、流式状态等。无论你是用 Runner.run 做普通的同步调用,还是用 Runner.run_streamed 做流式调用,底层都是一个 Run 对象在运转。

掌握了执行层,你就掌握了 Agent 开发的“仪表盘”。

  • 你不会再对着一个神秘的 final_output 惊慌失措,而是可以从输出项中轻松定位问题。
  • 你不会再苦恼于大模型的响应速度,而是可以用事件流优雅地实现打字机效果。
  • 你更不会受限于单轮对话,而是可以通过 Session 轻松构建多轮、有记忆的智能应用。

在下一篇文章中,我们将继续深入 Agents SDK 的更高阶特性——多 Agent 协作、交接协议、以及如何构建一个生产级的 Agent 系统。

如果你觉得这篇文章对你有帮助,欢迎分享给更多对 AI Agent 开发感兴趣的朋友。在 AI 工程化的道路上,唯有把黑盒拆成白盒,才能走得更稳、更远。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询