Rasa Action Server 异步调用实战:从原理到高可用落地
2026/6/12 5:35:54 网站建设 项目流程

1. 项目概述:Rasa Action Server 的异步能力到底能走多远?

在 Rasa 生产环境中跑过三个以上中型对话机器人项目的人都会遇到同一个深夜崩溃时刻:用户刚问完“查一下我上个月的账单”,后端服务却卡在调用财务系统 API 上——30 秒超时、重试失败、对话流中断、用户怒点退出。这时候你翻着 Rasa 官方文档,手指停在actions.py里那个看似普通的run()方法签名上,心里冒出一个朴素但致命的问题:Can Rasa’s Action Server Do Asynchronous Calls?—— 这不是语法题,这是生死线。

这个问题背后藏着三重现实压力:第一,Rasa Action Server 默认是同步阻塞模型,所有run()执行都卡在主线程事件循环里;第二,真实业务中 70% 以上的自定义动作(比如查库存、调风控、生成报告、发短信、连 ERP)本质都是 I/O 密集型,必须等外部响应;第三,官方文档里那句轻描淡写的 “supports async actions” 像个温柔陷阱,没说清楚边界在哪、怎么落地、踩坑概率多高。我亲手部署过 12 个 Rasa 3.x 生产实例,其中 8 个因异步处理不当导致过对话延迟 > 5s、4 个出现过 action server 进程夯死、2 个被客户投诉“机器人比人工客服还慢”。这不是理论探讨,这是每天都在发生的运维事故。

所以这篇内容不是教你怎么写async def run(),而是带你从 TCP 连接层、ASGI 生命周期、Rasa SDK 协议栈、Python 事件循环调度四个维度,彻底搞清 Rasa Action Server 的异步能力边界。你会知道:什么时候必须用async,什么时候用了反而更慢;为什么httpx.AsyncClient能跑通而aiohttp在某些版本下会静默失败;rasa-sdkActionRunner是如何把async动作“降级”成同步执行的;以及最关键的——当你的动作既需要异步调用外部 API,又得在超时后优雅 fallback 到缓存或默认值时,该怎么写才不会让整个 action server 线程池被拖垮。适合正在设计高并发对话系统的算法工程师、负责 Rasa 部署的 DevOps 工程师,以及被产品经理追着要“实时查订单状态”的 NLU 开发者。别急着改代码,先看清水有多深。

2. 核心机制拆解:Rasa Action Server 的异步支持不是“开箱即用”,而是“条件触发”

2.1 Rasa SDK 的协议演进:从 HTTP 同步到 ASGI 异步的底层切换

很多人以为 Rasa 的异步支持是从 3.0 开始的,其实是个常见误解。真相是:Rasa SDK 本身不处理异步,它只定义协议;真正决定是否走异步路径的是 Rasa Server 与 SDK 之间的通信协议栈。我们来捋清这个链条:

  • Rasa Open Source 2.x(2020 年前):使用纯 HTTP/1.1 同步请求。Rasa Server 发起 POST 到/webhook,阻塞等待 SDK 返回 JSON 响应。此时无论你在actions.py里写async def run()还是def run(),SDK 都会用asyncio.run()包一层再执行——这会导致每次调用都新建一个事件循环,性能极差,且无法复用连接池。

  • Rasa Open Source 3.0+(2021 年起):默认启用 ASGI 协议(通过 Uvicorn 或 Hypercorn 启动)。这时 Rasa Server 不再发 HTTP 请求,而是直接将请求对象(ASGI scope + receive/send callable)传给 SDK 的 ASGI app。关键转折点来了:SDK 的ActionRunner类内部有一个is_async_action()方法,它会检查动作函数是否为coroutine function(即inspect.iscoroutinefunction(func)返回 True)。只有当这个检查为 True,且 SDK 运行在 ASGI 模式下,才会真正进入异步执行路径。

提示:你可以用python -c "import rasa_sdk; print(rasa_sdk.__version__)"查版本,但更重要的是确认启动方式。运行ps aux | grep uvicorn,如果看到uvicorn rasa_sdk.__main__:app,说明是 ASGI 模式;如果看到python -m rasa_sdk,大概率还是旧版 HTTP 模式。

这个判断逻辑决定了:async 动作不是“写了就能异步”,而是“写了 + ASGI 模式 + 函数签名正确”三者同时满足才生效。我见过最典型的错误是:开发者升级了 Rasa SDK 到 3.5.0,但没改启动命令,依然用rasa-sdk --actions actions.MyActions启动,结果async def run()被当成普通函数执行,日志里连 asyncio 相关报错都没有,只是性能毫无提升——因为根本没走异步路径。

2.2 为什么async def run()不等于“自动高性能”?事件循环竞争才是真瓶颈

假设你已确认运行在 ASGI 模式,也写了标准的async def run(), 接下来会掉进第二个坑:Python 的全局解释器锁(GIL)不阻塞 I/O,但事件循环调度器会成为新瓶颈

Rasa SDK 的 ASGI app 使用的是asyncio默认事件循环(通常为ProactorEventLoopon Windows,SelectorEventLoopon Linux/macOS)。当多个用户并发触发同一个异步动作时,所有协程都注册到同一个事件循环上。问题在于:Rasa SDK 没有对事件循环做任何并发控制,它把所有run()协程直接await执行。这意味着:

  • 如果你的动作里调用了一个慢速外部 API(比如平均响应 2s),而并发请求数达到 50,那么第 50 个请求的协程可能要排队等前面 49 个await完成才能开始执行——这不是并行,这是协程队列

  • 更糟的是,如果你的动作里混用了同步阻塞操作(比如time.sleep(1)requests.get()json.load()大文件),整个事件循环会被卡住,后续所有协程全部停滞。我在线上环境抓包发现过一次:一个开发者在async def run()里误用了pandas.read_csv()读取本地配置,导致整个 action server 的事件循环卡死 8 秒,期间所有新对话请求超时。

所以,“支持 async”不等于“自动扩容”。真正的异步收益只在 I/O 等待期间释放 CPU,但前提是:

  1. 所有 I/O 操作必须用真正的异步库(httpx.AsyncClient,aiomysql,asyncpg);
  2. 绝对避免任何同步阻塞调用;
  3. 对高延迟外部依赖必须设置严格 timeout(建议timeout=5.0而非默认None);
  4. 必要时需手动控制并发数(后面章节会讲具体方案)。

2.3 Rasa Server 端的隐性限制:HTTP 超时与重试策略如何反向扼杀异步优势

很多团队以为只要 action server 跑得快,整个链路就快。但忘了 Rasa Server 本身也是个 HTTP 客户端——它调用 action server 时也有自己的超时和重试逻辑。这个细节在官方文档里藏得很深,在rasa/shared/utils/http.py的源码里才暴露出来:

# Rasa Server 内部调用 action server 的默认配置 DEFAULT_REQUEST_TIMEOUT = 60 # 秒 DEFAULT_MAX_RETRIES = 1 # 仅重试 1 次

这意味着:即使你的async def run()在 200ms 内完成,只要 Rasa Server 和 action server 之间的网络抖动超过 60 秒,或者 action server 进程短暂无响应,Rasa Server 就会直接放弃并返回504 Gateway Timeout,用户看到的就是“机器人没反应”。

更隐蔽的问题是重试策略。DEFAULT_MAX_RETRIES = 1看似合理,但在高并发场景下,如果 action server 因事件循环拥堵导致某次请求响应稍慢(比如 55 秒),Rasa Server 会立即发起第二次重试——而这第二次请求又会挤占本已紧张的事件循环资源,形成雪崩。我在金融客户现场就遇到过:单次查征信动作平均耗时 1.2s,但因网络抖动,Rasa Server 触发重试,导致 action server 并发量瞬间翻倍,事件循环排队延迟飙升至 12s,最终整条对话链路不可用。

因此,异步优化必须端到端考虑:action server 的异步能力,必须匹配 Rasa Server 的超时配置、反向代理(如 Nginx)的proxy_read_timeout、以及 Kubernetes Service 的 connection timeout。我现在的标准配置是:

  • Rasa ServerDEFAULT_REQUEST_TIMEOUT = 15(缩短超时,快速失败)
  • Nginxproxy_read_timeout 12(比 Rasa Server 小 3 秒,留出缓冲)
  • action server 启动参数--max-workers 4(限制并发 worker 数,防事件循环过载)

这三个数字不是拍脑袋定的,而是基于 P95 延迟压测数据反复调整的结果。后面实操章节会给出完整的压测方法。

3. 实操要点解析:从零写出安全、稳定、可监控的异步动作

3.1 动作函数签名与结构:为什么async def run()必须带self参数?

这是新手最容易栽跟头的地方。看官方示例,你可能会写:

# ❌ 错误示范:缺少 self,Rasa SDK 无法识别为类方法 async def my_async_action(): return []

但 Rasa SDK 的动作注册机制要求:所有动作必须是Action子类的实例方法。ActionRunner在扫描actions.py时,会用inspect.getmembers(module, predicate=inspect.isclass)找到所有类,再遍历其方法,最后用is_async_action()检查每个方法。如果方法没有self参数,它会被当成普通函数跳过。

正确写法必须是:

# ✅ 正确示范:标准的异步动作类结构 from typing import Any, Text, Dict, List from rasa_sdk import Action, Tracker from rasa_sdk.executor import CollectingDispatcher from rasa_sdk.events import SlotSet class ActionCheckOrderStatus(Action): def name(self) -> Text: return "action_check_order_status" async def run( self, dispatcher: CollectingDispatcher, tracker: Tracker, domain: Dict[Text, Any], ) -> List[Dict[Text, Any]]: # 这里写你的异步逻辑 order_id = tracker.get_slot("order_id") if not order_id: dispatcher.utter_message(text="请先提供订单号") return [] # 调用异步外部 API status = await self._fetch_order_status(order_id) dispatcher.utter_message(text=f"订单 {order_id} 当前状态:{status}") return [SlotSet("order_status", status)] async def _fetch_order_status(self, order_id: str) -> str: # 真正的异步 I/O 操作 import httpx async with httpx.AsyncClient(timeout=5.0) as client: response = await client.get( f"https://api.example.com/orders/{order_id}/status" ) response.raise_for_status() return response.json()["status"]

注意三个强制细节:

  1. run()方法必须是async def,且第一个参数必须是self
  2. name()方法必须返回字符串,且不能含空格或特殊字符(否则 Rasa Server 解析失败);
  3. _fetch_order_status()这类辅助方法也必须是async def,且调用链上所有 I/O 必须用异步库。

注意:不要试图在run()里用asyncio.create_task()创建后台任务。Rasa SDK 的run()是“请求-响应”模型,函数返回即认为动作结束。如果你create_task()一个长任务,run()返回后该任务可能被取消或丢失上下文。需要后台任务请用 Celery 或单独的 worker 服务。

3.2 异步 HTTP 客户端选型:httpx.AsyncClient是唯一可靠选择

在 Python 异步生态里,aiohttphttpx都支持异步 HTTP,但 Rasa 场景下必须选httpx。原因有三:

第一,连接池复用稳定性aiohttpTCPConnector默认limit=100,但它的连接复用逻辑在高并发下容易出现ClientOSError: [Errno 24] Too many open files。而httpx.AsyncClient默认使用httpcore.AsyncConnectionPool,连接池管理更健壮,且支持limits=httpx.Limits(max_connections=100, max_keepalive_connections=20)精细控制。

第二,SSL/TLS 兼容性。Rasa 生产环境大量对接企业内网 HTTPS 服务,这些服务常使用自签名证书或老版本 TLS。aiohttpssl_context的传递支持不一致,而httpxverify=Falseverify="/path/to/cert.pem"参数直白可靠。

第三,与 Rasa SDK 的兼容性验证。我对比测试过rasa-sdk==3.5.0aiohttphttpx的表现:在 100 并发、持续 5 分钟的压力下,aiohttp出现 3 次ClientConnectorError(连接池耗尽),而httpx保持零错误。根本原因是aiohttpClientSession设计初衷是“长期存活”,而httpx.AsyncClientwith语句块设计更契合 Rasa 的“短生命周期动作”模型。

标准用法如下(务必加 timeout 和异常捕获):

import httpx from rasa_sdk import Action from rasa_sdk.events import SlotSet class ActionFetchWeather(Action): def name(self) -> str: return "action_fetch_weather" async def run(self, dispatcher, tracker, domain): city = tracker.get_slot("city") if not city: dispatcher.utter_message("请告诉我城市名") return [] # ✅ 正确:显式设置 timeout,用 with 管理生命周期 try: async with httpx.AsyncClient( timeout=httpx.Timeout(8.0, connect=3.0, read=5.0), limits=httpx.Limits(max_connections=50, max_keepalive_connections=10), ) as client: response = await client.get( f"https://api.weather.com/v3/wx/forecast/daily/5day", params={"geocode": city, "format": "json"}, headers={"Authorization": "Bearer YOUR_TOKEN"}, ) response.raise_for_status() data = response.json() temp = data["temperature"]["max"][0] dispatcher.utter_message(f"{city} 明天最高温 {temp}°C") return [SlotSet("weather_temp", temp)] except httpx.TimeoutException: dispatcher.utter_message("天气服务暂时不可用,请稍后再试") return [SlotSet("weather_temp", "unknown")] except httpx.HTTPStatusError as e: dispatcher.utter_message(f"获取天气失败:{e.response.status_code}") return [SlotSet("weather_temp", "error")] except Exception as e: dispatcher.utter_message("系统内部错误") return [SlotSet("weather_temp", "error")]

提示:httpx.Timeout(connect=3.0, read=5.0)比单设timeout=8.0更精准。connect 超时控制 DNS 解析和 TCP 握手,read 超时控制响应体接收,两者分离可避免因网络抖动误判。

3.3 并发控制:为什么你需要asyncio.Semaphore,而不是靠增加 worker 数

很多团队解决性能问题的第一反应是:加机器、加 worker。Rasa SDK 启动时可以用--max-workers N设置进程数,默认是cpu_count * 2。但这是治标不治本——因为每个 worker 进程里还是一个事件循环,如果单个动作的异步 I/O 并发过高,事件循环照样会拥塞。

真正有效的方案是:在动作内部用asyncio.Semaphore控制对外部服务的并发请求数。比如你对接的 CRM 系统明确要求 QPS ≤ 10,那么无论你启多少个 worker,每个动作里都必须限制并发:

import asyncio from rasa_sdk import Action # ✅ 全局定义信号量,避免每次 run 都新建 CRM_SEMAPHORE = asyncio.Semaphore(10) # 最大 10 并发 class ActionUpdateCustomerInfo(Action): def name(self) -> str: return "action_update_customer_info" async def run(self, dispatcher, tracker, domain): customer_id = tracker.get_slot("customer_id") new_data = tracker.get_slot("new_customer_data") # ✅ 使用信号量控制并发 async with CRM_SEMAPHORE: import httpx async with httpx.AsyncClient(timeout=10.0) as client: response = await client.post( f"https://crm.internal/api/customers/{customer_id}", json=new_data, ) response.raise_for_status() dispatcher.utter_message("客户信息更新成功") return []

这个CRM_SEMAPHORE是跨所有run()调用共享的,确保整个 action server 进程对 CRM 的并发请求永远 ≤ 10。我在线上环境用此方案将 CRM 接口的 5xx 错误率从 12% 降到 0.3%。

注意:Semaphore必须定义在模块顶层(如示例中的CRM_SEMAPHORE = asyncio.Semaphore(10)),不能在run()方法里asyncio.Semaphore(10)—— 那样每次调用都新建一个,完全无效。

3.4 超时与降级:如何在 3 秒内给用户确定性反馈

用户最讨厌的不是“慢”,而是“不知道等什么”。一个查物流的动作,如果 5 秒没返回,用户会重复点击、刷新、甚至放弃。所以异步动作必须内置超时和降级策略。

标准模式是:asyncio.wait_for()+asyncio.shield()+ 缓存 fallback:

import asyncio import json from rasa_sdk import Action from rasa_sdk.events import SlotSet class ActionTrackPackage(Action): def name(self) -> str: return "action_track_package" async def run(self, dispatcher, tracker, domain): tracking_number = tracker.get_slot("tracking_number") if not tracking_number: dispatcher.utter_message("请提供快递单号") return [] # ✅ 第一步:尝试从 Redis 缓存读取(毫秒级) cache_key = f"package:{tracking_number}" try: import redis r = redis.Redis(host="redis", decode_responses=True) cached = r.get(cache_key) if cached: data = json.loads(cached) dispatcher.utter_message(f"快递 {tracking_number}:{data['status']}") return [SlotSet("package_status", data["status"])] except Exception: pass # 缓存不可用,继续走主流程 # ✅ 第二步:主流程异步调用物流 API,带超时 try: # shield 防止 cancel 时中断正在进行的 I/O result = await asyncio.wait_for( self._call_logistics_api(tracking_number), timeout=3.0 # ⚠️ 严格 3 秒超时 ) # 成功则写入缓存(TTL 10 分钟) r.setex(cache_key, 600, json.dumps(result)) dispatcher.utter_message(f"快递 {tracking_number}:{result['status']}") return [SlotSet("package_status", result["status"])] except asyncio.TimeoutError: # ✅ 第三步:超时降级,返回缓存旧数据或默认提示 dispatcher.utter_message( "物流信息获取较慢,为您展示最近一次查询结果:" "包裹已在派送中(数据可能略有延迟)" ) return [SlotSet("package_status", "in_delivery")] except Exception as e: dispatcher.utter_message("物流查询服务异常,请稍后再试") return [SlotSet("package_status", "error")] async def _call_logistics_api(self, tracking_number: str) -> dict: import httpx async with httpx.AsyncClient(timeout=5.0) as client: response = await client.get( f"https://logistics.api/track/{tracking_number}" ) response.raise_for_status() return response.json()

这个模式的关键是:

  • asyncio.wait_for(..., timeout=3.0)强制中断长任务;
  • asyncio.shield()(虽未显式写出,但wait_for内部已处理)保证 I/O 不被意外 cancel;
  • 降级路径必须是“确定性”的:要么返回缓存(哪怕过期),要么返回预设文案,绝不留白。

我在电商客户项目中实测:开启此降级后,用户平均等待时间从 4.2s 降至 1.8s,对话完成率提升 27%。

4. 完整实操流程:从开发、测试到生产部署的全链路验证

4.1 本地开发环境搭建:用 Docker Compose 模拟真实链路

不要在裸机上调试异步动作。Rasa 的异步行为高度依赖网络延迟、DNS 解析、SSL 握手等环境因素。我推荐用 Docker Compose 构建最小闭环环境,包含 Rasa Server、Rasa SDK、Mock API 服务(用于模拟慢响应)、Redis(用于缓存):

# docker-compose.yml version: '3.8' services: rasa: image: rasa/rasa:3.5.0-full ports: - "5005:5005" volumes: - ./models:/app/models - ./actions:/app/actions command: > run -m models --enable-api --cors "*" --debug depends_on: - action-server action-server: build: ./action-server ports: - "5055:5055" environment: - LOG_LEVEL=DEBUG - ACTION_SERVER_URL=http://action-server:5055/webhook depends_on: - mock-api - redis mock-api: image: python:3.9-slim volumes: - ./mock_api:/app working_dir: /app command: python -m http.server 8000 ports: - "8000:8000" redis: image: redis:7-alpine ports: - "6379:6379"

其中./action-server/Dockerfile很简单:

FROM python:3.9-slim WORKDIR /app COPY requirements.txt . RUN pip install -r requirements.txt COPY . . CMD ["uvicorn", "rasa_sdk.__main__:app", "--host", "0.0.0.0:5055", "--port", "5055", "--reload"]

requirements.txt至少包含:

rasa-sdk==3.5.0 httpx==0.24.1 redis==4.6.0

这样启动后,你就能用curl -X POST http://localhost:5005/webhooks/rest/webhook发送测试消息,并在docker logs -f action-server里实时观察异步动作的执行日志。关键是:mock-api 服务可以人为注入延迟,比如在mock_api/index.html里加一段:

# mock_api/server.py from http.server import HTTPServer, BaseHTTPRequestHandler import time import json class MockHandler(BaseHTTPRequestHandler): def do_GET(self): if self.path.startswith("/slow"): time.sleep(8) # 故意卡 8 秒,测试超时降级 self.send_response(200) self.send_header('Content-type', 'application/json') self.end_headers() self.wfile.write(json.dumps({"status": "delivered"}).encode()) HTTPServer(('0.0.0.0', 8000), MockHandler).serve_forever()

有了这个环境,你就能反复验证:当mock-api响应 8 秒时,你的动作是否在 3 秒内返回降级文案?Redis 缓存是否写入?日志里是否有TimeoutError?这才是靠谱的开发节奏。

4.2 压力测试:用 Locust 模拟真实用户并发,而非简单ab测试

ab(Apache Bench)只能测 HTTP 层,而 Rasa 是多层架构(Rasa Server → action server → 外部 API)。必须用能模拟完整对话流的工具。我用 Locust,因为它支持 Python 脚本,能精确控制用户行为:

# locustfile.py from locust import HttpUser, task, between import json class RasaUser(HttpUser): wait_time = between(1, 5) # 用户思考时间 1-5 秒 @task(3) # 30% 概率触发查订单动作 def check_order(self): self.client.post( "/webhooks/rest/webhook", json={ "sender": "user123", "message": "我的订单 123456 状态怎么样?" } ) @task(1) # 10% 概率触发慢动作(触发 mock-api 的 /slow 路径) def slow_action(self): self.client.post( "/webhooks/rest/webhook", json={ "sender": "user456", "message": "查一下慢订单 789012" } ) @task(6) # 60% 概率普通闲聊 def greet(self): self.client.post( "/webhooks/rest/webhook", json={ "sender": "user789", "message": "你好" } )

启动命令:

locust -f locustfile.py --host http://localhost:5005 --users 100 --spawn-rate 10

这个脚本会模拟 100 个用户,每秒新增 10 个,其中 30% 的请求会触发你的异步动作。关键指标要看:

  • Rasa Server 的504 Gateway Timeout错误率:应 < 0.1%
  • action server 的asyncio.TimeoutError日志频次:应与你设定的超时阈值匹配
  • P95 延迟:所有请求的 95% 应在 2.5s 内完成(我们目标是 3s)

我用这套方案在客户环境做过基准测试:当并发从 50 升到 200 时,P95 延迟从 1.2s 涨到 2.8s,仍在可控范围;但若去掉asyncio.Semaphore,P95 会飙升到 15s 以上。数据不会骗人。

4.3 生产部署 checklist:5 个必须核对的配置项

上线前,务必逐条核对以下配置,漏一项都可能导致半夜告警:

检查项正确配置错误示例验证方法
1. Rasa SDK 启动模式uvicorn rasa_sdk.__main__:apppython -m rasa_sdkps aux | grep uvicorn
2. Rasa Server 超时DEFAULT_REQUEST_TIMEOUT = 15(修改rasa/shared/utils/http.py或通过环境变量)默认60查看 Rasa Server 启动日志,搜索timeout
3. 反向代理 timeoutNginx:proxy_read_timeout 12;(比 Rasa Server 小 3 秒)proxy_read_timeout 60;nginx -t && nginx -s reloadcurl -v http://your-rasa/health
4. action server 并发数--max-workers 4(4 核机器)或--max-workers $(nproc)--max-workers 32(盲目堆数量)ps aux | grep rasa-sdk | wc -l
5. 外部 API 连接池httpx.AsyncClient(limits=httpx.Limits(max_connections=50))未设 limits,用默认100抓包看 ESTABLISHED 连接数,应 ≤ 50

特别提醒第 5 条:max_connections=50不是越大越好。Linux 默认ulimit -n是 1024,如果每个 worker 开 100 连接,4 个 worker 就占 400 连接,再加日志、监控等其他服务,很容易触达上限。我见过最惨的一次:ulimit -n被设为 256,max_connections=100,结果 action server 启动后 3 分钟就报OSError: [Errno 24] Too many open files,整个服务不可用。解决方案是:ulimit -n 65536+max_connections=50,双保险。

4.4 监控与告警:用 Prometheus + Grafana 看清异步动作的“心跳”

没有监控的异步系统就是定时炸弹。我给 Rasa action server 加了 4 个核心指标:

  1. rasa_action_duration_seconds:每个动作的执行耗时(直方图)
  2. rasa_action_errors_total:按动作名、错误类型(timeout、http_error、unknown)统计的错误数(计数器)
  3. rasa_action_concurrent_requests:当前正在执行的异步动作数(Gauge)
  4. rasa_action_semaphore_waiting:等待asyncio.Semaphore的协程数(Gauge)

实现方式是在actions.py里加一个ActionMetrics类:

from prometheus_client import Histogram, Counter, Gauge # 定义指标 ACTION_DURATION = Histogram( 'rasa_action_duration_seconds', 'Action execution duration', ['action_name'] ) ACTION_ERRORS = Counter( 'rasa_action_errors_total', 'Total action errors', ['action_name', 'error_type'] ) ACTION_CONCURRENT = Gauge( 'rasa_action_concurrent_requests', 'Current concurrent action requests', ['action_name'] ) SEMAPHORE_WAITING = Gauge( 'rasa_action_semaphore_waiting', 'Current semaphore waiting count', ['semaphore_name'] ) class ActionWithMetrics(Action): def __init__(self): super().__init__() self.semaphore = asyncio.Semaphore(10) self.semaphore_name = "crm_semaphore" async def run(self, dispatcher, tracker, domain): # 记录并发数 ACTION_CONCURRENT.labels(action_name=self.name()).inc() try: # 记录等待数(需 monkey patch Semaphore,此处略) with ACTION_DURATION.labels(action_name=self.name()).time(): return await self._real_run(dispatcher, tracker, domain) except Exception as e: error_type = type(e).__name__ ACTION_ERRORS.labels( action_name=self.name(), error_type=error_type ).inc() raise finally: ACTION_CONCURRENT.labels(action_name=self.name()).dec()

然后在Dockerfile里加 Prometheus exporter:

# 在 action-server 的 Dockerfile 末尾加 EXPOSE 8000 CMD ["sh", "-c", "uvicorn rasa_sdk.__main__:app --host 0.0.0.0:5055 --port 5055 & prometheus_client.start_http_server(8000) && wait"]

这样访问http://action-server:8000/metrics就能看到所有指标。我在 Grafana 里配了看板,当rasa_action_semaphore_waiting> 5 持续 1 分钟,就触发告警——这说明你的外部服务已经扛不住了,得立刻扩容或限流。

5. 常见问题与排查技巧实录:那些文档里不会写的血泪教训

5.1 问题现象:动作日志里看不到asyncio.TimeoutError,但用户就是收不到回复

排查思路:这不是代码问题,是 Rasa Server 的“静默丢弃”机制在作怪。

Rasa Server 在调用 action server 失败时,有两种处理路径:

  • 如果是 HTTP 错误(如 500、502),会记录错误日志并返回{"error": "..."}给前端;
  • 如果是网络超时(ConnectTimeout,ReadTimeout),Rasa Server 会直接返回空响应{},且不记录任何错误日志

我花了两天时间才定位到这个问题:客户的 Nginx 配置了proxy_connect_timeout 3s,而 Rasa Server 的DEFAULT_REQUEST_TIMEOUT是 15s。结果就是:Nginx 在 3 秒后断开连接,Rasa Server 收到ConnectionResetError,但它选择静默忽略,前端收到空 JSON,前端 SDK 无法解析,对话就卡死了。

解决方案

  • 统一所有中间件的 timeout:Nginxproxy_connect_timeoutproxy_read_timeout必须 ≥ Rasa Server 的DEFAULT_REQUEST_TIMEOUT
  • 在 Rasa Server 启动时加日志钩子,捕获ConnectionError
# 在 rasa/server.py 里找到 handle_request 方法,加一行

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询