CoAP调试记录
2026/6/5 11:06:16
**流式传输(Streaming)**是一种数据传输方式,数据以连续流的形式逐步发送和接收,而不是一次性传输全部数据。
流式传输需要发送方和接收方都实现才能正常工作:
| 组合 | 结果 |
|---|---|
| ✅ 后端流式 + 前端流式 | 正常工作,实时显示进度 |
| ❌ 后端流式 + 前端非流式 | 无法正常工作或体验差 |
| ⚠️ 后端非流式 + 前端流式 | 可以工作,但无实时效果 |
| ✅ 后端非流式 + 前端非流式 | 传统方式,等待全部完成 |
# Flask 流式响应defgenerate():yieldf"data:{json.dumps({'type':'info','message':'开始处理...'},ensure_ascii=False)}\n\n"# 处理逻辑...yieldf"data:{json.dumps({'type':'token','token':'文本片段'},ensure_ascii=False)}\n\n"returnResponse(stream_with_context(generate()),mimetype='text/event-stream',headers={'Cache-Control':'no-cache','Connection':'keep-alive'})// Fetch API 流式读取constreader=response.body.getReader();constdecoder=newTextDecoder();letbuffer='';while(true){const{done,value}=awaitreader.read();if(done)break;buffer+=decoder.decode(value,{stream:true});constlines=buffer.split('\n');buffer=lines.pop();for(constlineoflines){if(line.startsWith('data: ')){constmsg=JSON.parse(line.substring(6));// 处理消息...}}}流式传输的实现涉及三个不同的层面,每个层面都有不同的格式要求:
Server-Sent Events (SSE)- Web 标准协议
# 后端:设置 SSE 响应头mimetype='text/event-stream'headers={'Cache-Control':'no-cache','Connection':'keep-alive',...}SSE 格式规范:
data:开头\n\n结尾(两个换行符)自定义 JSON 消息格式
# 后端发送的消息格式{"type":"token",# 消息类型标识"token":"文本片段"# 实际的文本内容}消息类型:
info: 状态信息error: 错误信息results: 搜索结果token: 答案片段(流式生成)answer: 完整答案final: 最终数据不同大模型库有不同的流式格式:
| 模型库 | 流式格式 |
|---|---|
| Ollama | {delta: "文本"}或{text: "文本"} |
| LlamaIndex | {delta: "文本"}或{response: Object} |
| OpenAI | {delta: "文本"} |
这是流式传输的核心技术:如何统一不同格式。
大模型库(Ollama/LlamaIndex) ↓ {delta: "文本"} / {text: "文本"} / {response: Object} ↓ ┌─────────────────────────────────────────┐ │ 🔧 适配层(Adapter Layer) │ │ rag_generate_with_llamaindex_stream() │ │ │ │ 检查格式 → 提取文本 → 统一为字符串 │ └─────────────────────────────────────────┘ ↓ "文本片段" (字符串) ↓ ┌─────────────────────────────────────────┐ │ 📦 包装层(Wrapper Layer) │ │ search_alarm() 路由函数 │ │ │ │ 字符串 → JSON对象 → JSON字符串 → SSE │ └─────────────────────────────────────────┘ ↓ data: {"type":"token","token":"文本片段"}\n\n ↓ 前端接收并显示位置:rag_generate_with_llamaindex_stream()函数
作用:将不同大模型库的格式统一为字符串
# 调用 Ollama 的流式 APIresponse_stream=Settings.llm.stream_complete(prompt)# 适配层 - 处理 Ollama 的不同返回格式fortokeninresponse_stream:# 情况1:Ollama 返回 {delta: "文本片段"}ifhasattr(token,'delta')andtoken.delta:yieldtoken.delta# ← 提取 delta 属性# 情况2:Ollama 返回 {text: "文本片段"}elifhasattr(token,'text'):yieldtoken.text# ← 提取 text 属性# 情况3:Ollama 返回其他格式(字符串或对象)else:token_str=str(token)# ← 转换为字符串iftoken_str:yieldtoken_str适配逻辑:
delta属性 → 提取文本text属性 → 提取文本统一输出:所有格式最终都 yield 字符串
# 调用 LlamaIndex ChatEngine 的流式 APIresponse_stream=chat_engine.stream_chat(query)# 适配层 - 处理 LlamaIndex 的不同返回格式forresponse_chunkinresponse_stream:# 情况1:LlamaIndex 返回 {delta: "文本片段"}ifhasattr(response_chunk,'delta')andresponse_chunk.delta:yieldresponse_chunk.delta# 情况2:LlamaIndex 返回 {response: ResponseObject}elifhasattr(response_chunk,'response'):response_text=str(response_chunk.response)ifresponse_text:yieldresponse_text# 情况3:直接是字符串或其他格式else:chunk_str=str(response_chunk)ifchunk_str:yieldchunk_str# 调用 QueryEngine 的流式 APIstreaming_response=query_engine.query(enhanced_query)# 适配层 - 处理 QueryEngine 的不同返回格式ifhasattr(streaming_response,'response_gen'):# 情况1:返回 StreamingResponse 对象,有 response_gen 生成器fortext_chunkinstreaming_response.response_gen:iftext_chunk:yieldtext_chunkelifhasattr(streaming_response,'__iter__')andnotisinstance(streaming_response,str):# 情况2:直接是生成器forchunkinstreaming_response:ifhasattr(chunk,'delta')andchunk.delta:yieldchunk.deltaelse:chunk_str=str(chunk)ifchunk_str:yieldchunk_strelse:# 情况3:非流式,返回完整响应response_text=str(streaming_response)yieldresponse_text位置:search_alarm()路由函数
作用:将适配层输出的字符串包装成自定义 JSON 格式,并封装为 SSE 格式
# 调用适配层,获取字符串流answer_stream=rag_generate_with_llamaindex_stream(query,results,chat_history_list=current_history)# 包装层 - 将字符串包装成 JSON + SSE 格式fortokeninanswer_stream:# ← token 是字符串(来自适配层)iftokenisNone:continue# 确保是字符串类型token_str=str(token)ifnotisinstance(token,str)elsetokeniftoken_str:# 包装成自定义 JSON 格式json_data={'type':'token',# ← 消息类型'token':token_str# ← 文本片段}# 转换为 JSON 字符串json_str=json.dumps(json_data,ensure_ascii=False)# 包装成 SSE 格式:data: {...}\n\nyieldf"data:{json_str}\n\n"{"type":"token",// 消息类型标识"token":"文本片段"// 实际的文本内容}data: {"type":"token","token":"文本片段"}\n\n这就是适配器模式 + 包装器模式的组合应用。
后端根据流程阶段和状态决定发送哪种类型的消息:
| 消息类型 | 发送时机 | 发送条件 | 数据内容 |
|---|---|---|---|
| info | 流程关键节点 | 搜索开始/完成、RAG开始 | message字符串 |
| error | 出现错误 | 验证失败、配置错误、执行异常 | message错误信息 |
| results | 搜索完成 | 搜索完成后(无论是否有结果) | data.results结果列表 |
| token | RAG流式生成 | 每个文本片段生成时 | token文本片段 |
| answer | RAG生成完成 | 生成成功且有答案 | answer完整答案 |
| final | 流程结束 | 所有情况(成功/失败) | data包含所有最终数据 |
info- 状态信息发送时机:流程中的关键节点
# 搜索开始时yieldf"data:{json.dumps({'type':'info','message':'开始搜索知识库...'},ensure_ascii=False)}\n\n"# 找到精确匹配结果时yieldf"data:{json.dumps({'type':'info','message':f'找到{len(results)}个精确匹配结果'},ensure_ascii=False)}\n\n"# 开始向量搜索时yieldf"data:{json.dumps({'type':'info','message':'正在进行向量搜索...'},ensure_ascii=False)}\n\n"# 搜索完成时yieldf"data:{json.dumps({'type':'info','message':f'找到{len(results)}个相关结果'},ensure_ascii=False)}\n\n"# 开始生成RAG回答时yieldf"data:{json.dumps({'type':'info','message':'开始生成智能回答...'},ensure_ascii=False)}\n\n"分类逻辑:
error- 错误信息发送时机:出现错误时
# 查询为空ifnotquery:yieldf"data:{json.dumps({'type':'error','message':'查询内容不能为空'},ensure_ascii=False)}\n\n"# LLM未初始化ifSettings.llmisNone:error_msg="LlamaIndex LLM 未初始化,无法生成回答。请检查 Ollama 服务是否运行。"yieldf"data:{json.dumps({'type':'error','message':error_msg},ensure_ascii=False)}\n\n"# 流式生成失败ifnotfull_answer:error_msg="流式生成失败,未收到有效回答"yieldf"data:{json.dumps({'type':'error','message':error_msg},ensure_ascii=False)}\n\n"# 异常捕获exceptExceptionase:error_msg=f"流式生成过程出错:{str(e)}"yieldf"data:{json.dumps({'type':'error','message':error_msg},ensure_ascii=False)}\n\n"分类逻辑:
results- 搜索结果发送时机:搜索完成后
# 搜索完成后发送结果yieldf"data:{json.dumps({'type':'results','data':{'results':results,# 搜索结果列表(包含 images 字段)'count':len(results),# 结果数量'search_mode':search_mode# 搜索模式}},ensure_ascii=False)}\n\n"分类逻辑:
images字段包含图片路径数组token- 答案片段(流式生成)发送时机:RAG流式生成过程中
# 流式生成答案片段fortokeninanswer_stream:iftokenisNone:continuetoken_str=str(token)ifnotisinstance(token,str)elsetokeniftoken_str:full_answer+=token_str# 每个文本片段都发送一次 token 消息yieldf"data:{json.dumps({'type':'token','token':token_str},ensure_ascii=False)}\n\n"分类逻辑:
answer- 完整答案发送时机:RAG生成完成后
# 生成完成后发送完整答案iffull_answer:yieldf"data:{json.dumps({'type':'answer','answer':full_answer,# 完整答案'references':results[:5]ifresultselse[]# 参考来源(包含 images)},ensure_ascii=False)}\n\n"分类逻辑:
references中每个对象的images字段final- 最终数据发送时机:整个流程结束时
# RAG成功时(有答案)yieldf"data:{json.dumps({'type':'final','data':{'success':True,'answer':full_answer,'results':results,# 所有结果(包含 images)'references':results[:5],# 参考来源(包含 images)'session_id':session_id}},ensure_ascii=False)}\n\n"# 不使用RAG时(只有搜索结果)yieldf"data:{json.dumps({'type':'final','data':{'success':True,'results':results,# 所有结果(包含 images)'session_id':session_id}},ensure_ascii=False)}\n\n"分类逻辑:
results和references中都包含images字段1. info: "开始搜索知识库..." ↓ 2. info: "找到 5 个相关结果" 或 "正在进行向量搜索..." ↓ 3. results: { results: [ {id: 1, images: ["img1.jpg"], ...}, {id: 2, images: ["img2.jpg"], ...}, ... ], count: 5 } ↓ 4. final: { success: true, results: [{images: [...]}, ...], session_id: "..." }图片返回时机:
results消息:所有搜索结果的图片final消息:所有搜索结果的图片(再次返回)1. info: "开始搜索知识库..." ↓ 2. info: "找到 5 个相关结果" ↓ 3. results: { results: [{images: [...]}, ...], count: 5 } ↓ 4. info: "开始生成智能回答..." ↓ 5. token: "根据" ↓ 6. token: "知识库" ↓ 7. token: "中的信息" ... (多个token,逐字显示) ↓ 8. answer: { answer: "完整答案...", references: [{images: [...]}, ...] // 前5条结果的图片 } ↓ 9. final: { success: true, answer: "完整答案...", results: [{images: [...]}, ...], // 所有结果的图片 references: [{images: [...]}, ...], // 参考来源的图片 session_id: "..." }图片返回时机:
results消息:所有搜索结果的图片(第一次)answer消息:前5条参考结果的图片(第二次)final消息:所有结果和参考来源的图片(第三次和第四次)1. info: "开始搜索知识库..." ↓ 2. results: {results: [{images: [...]}, ...], count: 5} ↓ 3. info: "开始生成智能回答..." ↓ 4. error: "LlamaIndex LLM 未初始化..." ↓ 5. final: { success: true, results: [{images: [...]}, ...], rag_error: "...", session_id: "..." }图片数据不是单独返回,而是作为搜索结果的一部分,每个结果对象都有一个images字段(数组)。
| 返回时机 | 消息类型 | 包含字段 | 说明 |
|---|---|---|---|
| 搜索完成后 | results | data.results[].images | 所有搜索结果的图片 |
| RAG生成完成后 | answer | references[].images | 前5条参考结果的图片 |
| 流程结束时 | final | data.results[].imagesdata.references[].images | 所有结果和参考来源的图片 |
{"id":1,"alarm_code":"E001","alarm_message":"温度传感器故障","solution":"检查传感器连接...","images":["uploads/images/sensor1.jpg","uploads/images/sensor2.jpg"],...}三个层面分离
适配器模式
消息类型设计
统一定义消息类型
# 使用常量定义classMessageType:INFO='info'ERROR='error'RESULTS='results'TOKEN='token'ANSWER='answer'FINAL='final'错误处理
error消息final消息(包含错误信息)性能优化
用户体验
流式传输是一个涉及多个层面的复杂技术,需要:
通过这种设计,我们实现了:
希望这篇文章能帮助你理解流式传输的完整实现过程!