背景痛点:快递客服的“三座大山”
快递行业日均单量早已破亿,客服中心却常年处于“三高一低”的困境:
- 咨询高峰:大促凌晨 0-3 点仍保持 3k+ 并发,人工坐席无法覆盖
- 重复问题:物流状态、改址、催件三类 query 占总量 68%,却仍需人工逐条回复
- 情绪投诉:延误、丢件等场景易升级为投诉,需快速识别并流转至工单系统
传统 IVR 或关键词机器人只能解决 30% 左右,剩余 70% 仍需人工兜底,导致人力成本随单量线性上涨。AI 辅助开发的目标,是把 70% 里的“可标准化”部分再砍掉一半,同时保证体验不降级。
技术选型:规则、传统 NLP 与深度学习的三角权衡
| 维度 | 规则引擎 | 传统 NLP(CRF/SVM) | Rasa+Transformer |
|---|---|---|---|
| 意图扩展成本 | 高,每新增意图需写正则 | 中,需重新标注数据 | 低,finetune 30 样本即可 |
| 实体识别精度 | 低于 80% | 85% 左右 | 92%+ |
| 上下文管理 | 无 | 需手工维护槽位 | 内置 TEDPolicy |
| 运维复杂度 | 低 | 中 | 高(需 GPU) |
| 离线部署 | 任意 | 可 | 可(ONNX 量化后 200 MB) |
综合“精度-迭代速度-运维成本”三角,最终采用Rasa 3.x + 轻量 Transformer(Chinese-RoBERTa-wwm-ext)作为核心架构:Rasa 负责对话管理,Transformer 负责意图/实体,二者通过 HTTP 服务解耦,方便独立灰度。
核心实现
1. 意图识别与实体提取模块
以下代码为独立微服务,供 Rasa NLU pipeline 通过custom_component调用。已跑通 100 W 单卡 T4 推理 800 QPS,P99 延迟 120 ms。
# intent_entity_service.py # -*- coding: utf-8 -*- """ 快递场景意图识别服务 POST /parse 输入: {"text": "我的快递到哪儿了"} 返回: {"intent": "query_logistics", "entities": [{"value":"SF123456", "entity":"bill_no"}]} """ import json import torch from flask import Flask, request, jsonify from transformers import AutoTokenizer, AutoModelForSequenceClassification from seqeval.metrics.sequence_labeling import get_entities app = Flask(__name__) MODEL_DIR = "/models/express_roberta_intent_entity" tokenizer = AutoTokenizer.from_pretrained(MODEL_DIR) intent_model = AutoModelForSequenceClassification.from_pretrained(MODEL_DIR) token_model = torch.load(f"{MODEL_DIR}/token_classifier.pt") # 实体头 intent_model.eval() token_model.eval() device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") intent_model.to(device) token_model.to(device) ID2LABEL = {0: "query_logistics", 1: "change_address", 2: "complaint", 3: "greet"} ENTITY_LABELS = ["O", "B-BILL_NO", "I-BILL_NO", "B-PHONE", "I-PHONE"] @app.route("/parse", methods=["POST"]) def parse(): text = request.json["text"] tokens = tokenizer(text, return_tensors="pt", truncation=True, max_length=128) input_ids = tokens["input_ids_ids"].to(device) attention_mask = tokens["attention_mask"].to(device) # 意图 with torch.no_grad(): logits = intent_model(input_ids, attention_mask).logits intent_id = int(logits.argmax(-1)[0]) intent = ID2LABEL[intent_id] # 实体 with torch.no_grad(): logits = token_model(input_ids, attention_mask) preds = torch.argmax(logits, dim=-1) preds = [ENTITY_LABELS[i] for i in preds[0].cpu().numpy()] entities = [] for ent in get_entities(preds): label, start, end = ent value = tokenizer.decode(input_ids[0][start: end+1]) entities.append({"entity": label.lower(), "value": value}) return jsonify({"intent": intent, "entities": entities}) if __name__ == "__main__": app.run(host="0.0.0.0", port=5000, threaded=False) # 单线程,后续用 gunicorn 多 worker2. 上下文保持:Rasa Slot 与 TEDPolicy 配置片段
# config.yml language: zh pipeline: - name: custom_components.ExpressNLU # 上述服务封装 - name: RegexEntityExtractor # 兜底正则 policies: - name: TEDPolicy max_history: 5 constrain_similarities: true - name: RulePolicy core_fallback_threshold: 0.3 core_fallback_action_name: action_default_fallback3. 工单自动生成状态机
当意图为complaint且情绪置信度>0.6 时,触发工单状态机。采用Python 3.11+enum+asyncio实现,保证单线程内状态安全。
# ticket_state_machine.py import asyncio from enum import Enum, auto class State(Enum): INIT = auto() COLLECT_BILL = auto() COLLECT_EVIDENCE = auto CONFIRM_PRIORITY = auto() DONE = auto() class TicketFlow: def __init__(self, session_id): self.session_id = session_id self.state = State.INIT self.data = {} async def trigger(self, intent, entities, text): if self.state == State.INIT and intent == "complaint": self.state = State.COLLECT_BILL return "请提供快递单号,方便我们核查" if self.state == State.COLLECT_BILL: bill = next((e["value"] for e in entities if e["entity"]=="bill_no"), None) if bill: self.data["bill_no"] = bill self.state = State.COLLECT_EVIDENCE return "请上传破损或延迟凭证(图片/文字)" return "未识别到单号,请重新输入" if self.state == State.COLLECT_EVIDENCE: self.data["evidence"] = text self.state = State.CONFIRM_PRIORITY return "已收到凭证,正在评估优先级,请稍候" if self.state == State.CONFIRM_PRIORITY: # 调用内部 SLA 接口 priority = await self._calc_priority() await self._create_ticket(priority) self.state = State.DONE return "工单已生成,预计 2 小时内专员联系您" async def _calc_priority(self): await asyncio.sleep(0) # 模拟 IO return "high" if "破损" in self.data.get("evidence", "") else "medium" async def _create_ticket(self, priority): # 调用工单中心 REST pass状态机流程图(文字版)
[INIT] ──complaint──> [COLLECT_BILL] ──bill_no──> [COLLECT_EVIDENCE] ──evidence──> [CONFIRM_PRIORITY] ──create──> [DONE]生产考量
1. 对话服务的幂等性设计
- 使用 Redis 记录
session_id+message_id去重表,TTL 300 s - Rasa 自定义
action_default_fallback时,先查重放键,存在即直接返回空包,避免重复工单
2. 敏感词过滤异步化
敏感词过滤模型(BERT+CRF)延迟 60 ms,若同步放在对话链路,P99 增加 40%。采用Sidecar模式:
- 对话线程把文本写入 Kafka
audit_topic - 敏感词服务消费后,若命中,通过
conversation_id调用 Rasa HTTP API 推送action_revoke收回不当回复
3. 冷启动降级策略
- 意图置信度 < 0.5 且无规则命中时,触发“人工客服排队”回复
- 同时把该句及后续 3 轮日志写入“待标注池”,凌晨低峰期自动微调模型,实现24 h 内闭环
避坑指南
会话未隔离导致并发串台
现象:用户 A 输入单号,用户 B 收到“您的包裹已签收”。
根因:多 worker 共用内存 Slot 字典。
解决:Rasa 3.x 默认使用 RedisTrackerStore,务必关闭InMemoryTrackerStore,并为session_id加 UUID4 前缀。GPU 推理未做 batch 合并,CPU fallback 时雪崩
现象:晚高峰 GPU 打满,fallback 到 CPU,延迟 3 s,队列堆积。
解决:在intent_entity_service层增加动态 batch=8的队列,超时 20 ms 即部分 batch 推理;同时用 ONNXRuntime+Quantization 把 GPU 模型压到 1/3 大小,CPU 推理 250 ms 以内。未考虑多租户灰度,版本回退困难
现象:新模型误召回“投诉”意图,导致工单量暴增。
解决:在 HTTP Header 带上X-Canary=5%,NGINX 根据用户尾号分流;回退时改权重即可秒级生效,无需重新打包镜像。
延伸思考
引入知识图谱,提升多轮对话能力
把“网点-配送员-包裹状态”三元组写入 NebulaGraph,当用户问“为什么卡在徐汇网点”时,系统可返回“徐汇网点今日件量 1.2 万,配送员 12 人,预计 18:00 前完成派送”,实现可解释性回复。强化学习优化回复策略
当前采用监督学习,无法感知用户满意度。后续可把“是否转人工”“是否差评”作为 reward,使用Offline RL(Decision Transformer)训练对话策略,目标函数 = 最大化解决率 - 0.3×对话轮数,预计再降 15% 人力介入。
把代码跑通只是第一步,真正的挑战是让模型在 1 亿次对话后依旧稳定。愿这份避坑笔记能帮你少熬几个通宵,把精力留给更酷的创新。