JMeter压测SSE接口避坑指南:5大常见错误与解决方案
2026/6/30 18:46:12 网站建设 项目流程

1. 项目概述:当JMeter遇上SSE,一场“持久”的较量

如果你正在用JMeter测试一个实时数据推送的接口,比如股票行情、聊天消息或者后台任务进度,那么你很可能已经接触过SSE(Server-Sent Events,服务器发送事件)。这玩意儿用起来是真方便,一个HTTP长连接,服务器就能源源不断地把数据“流”过来。但当你兴冲冲地打开JMeter,准备给它来个压力测试时,十有八九会掉进坑里。你会发现,脚本要么收不到完整数据,要么内存暴涨把机器搞崩,要么结果树里一片混乱,根本没法断言。这感觉就像你开着一辆F1赛车去跑越野赛道,引擎轰鸣,但寸步难行。

“避坑指南:JMeter处理SSE响应时常见的5个错误及解决方案”这个标题,精准地戳中了性能测试工程师和开发者在做SSE接口压测时的痛点。它不是一个泛泛而谈的教程,而是直指那些让你调试到怀疑人生的具体问题。SSE协议本身并不复杂,但JMeter作为一个基于HTTP/1.1短连接模型设计的工具,其默认的请求-响应生命周期与SSE的长连接、流式响应特性存在根本性的冲突。这篇文章的目的,就是帮你捋清这些冲突点,把常见的五个大坑——从连接建立、数据接收、资源管理到结果分析——一个个填平,让你手里的JMeter能从“越野模式”切换回“赛道模式”,真正驾驭SSE这种流式接口的测试。

2. 核心原理拆解:为什么JMeter“看不懂”SSE?

在开始填坑之前,我们必须先搞清楚JMeter和SSE之间到底哪里“不对付”。不理解这个,所有的解决方案都只是死记硬背的步骤,换个场景可能又失灵了。

2.1 SSE协议的核心机制

SSE本质上是一个基于HTTP的长连接。客户端发起一个普通的GET请求,但服务器在响应时,会做两件关键的事:

  1. 设置响应头Content-Type: text/event-streamCache-Control: no-cache。这是告诉浏览器或客户端:“嘿,这不是一个一次性文档,而是一个事件流,别缓存,一直听着。”
  2. 保持连接并流式发送数据:服务器不会立即关闭连接,而是会以特定的格式(data: {消息内容}\n\n)持续地向客户端发送数据块,每个数据块以两个换行符\n\n分隔。连接会一直保持,直到服务器主动关闭或客户端断开。

2.2 JMeter默认行为与SSE的冲突

JMeter的线程模型是为模拟大量独立、短暂的HTTP请求而优化的。一个HTTP Request采样器(Sampler)的生命周期是:建立连接 -> 发送请求 -> 接收响应(直到服务器关闭输出流)-> 断开连接 -> 记录结果。这里就产生了几个根本矛盾:

  • 冲突一:响应结束的判定。JMeter认为服务器关闭输出流(TCP连接)才代表“响应结束”。但SSE连接是常开的,服务器永远不会主动关闭(除非出错或任务完成)。这导致JMeter的采样器会一直“挂起”,等待一个永远不会到来的结束信号,最终超时。
  • 冲突二:响应数据的处理。JMeter默认将整个响应体作为一个完整的字符串,在请求结束后一次性交给后置处理器(如正则表达式提取器)或断言。但对于SSE,数据是片段化、持续到达的。JMeter的默认机制无法实时处理这些片段。
  • 冲突三:资源与线程占用。一个挂起的采样器会独占一个JMeter线程。如果你模拟100个并发用户,就意味着100个线程被无限期挂起,无法进行后续迭代,这完全违背了压测的初衷。

简单来说,JMeter期待的是“一问一答,答完就走”的对话,而SSE提供的是“一问多答,答个不停”的广播。不解决这个根本矛盾,任何测试都无法进行。

注意:市面上有些文章会教你用“循环控制器+短超时”来模拟,这本质上是轮询(Polling),不是真正的SSE长连接测试,无法模拟真实的长连接资源占用和服务器推送行为,测试结果会失真。

3. 错误一:采样器无限挂起与超时

这是你遇到的第一个,也是最明显的错误。脚本一运行,JMeter就卡住了,视图里一堆采样器一直显示绿色(运行中),直到你设置的超时时间(比如60秒)到了,它们才批量失败,报告Response timeout

3.1 错误现象与根因分析

  • 现象:在“查看结果树”中,采样器状态长时间为“正在运行”,无响应数据。最终因超时而失败。
  • 根因:如上所述,JMeter在等待响应结束,但SSE连接永不结束。HTTP Request采样器的默认行为不适应流式响应。

3.2 解决方案:使用SSE Sampler插件

最正统、最有效的解决方案是使用专为SSE设计的第三方插件。SSE Sampler插件(通常指jmeter-sseWebSocket/SSE Sampler)改变了采样器的行为逻辑。

实操步骤:

  1. 安装插件

    • 访问 JMeter 插件管理器网站(如https://jmeter-plugins.org/),搜索 “SSE” 或 “WebSocket”。
    • 找到合适的SSE采样器插件(例如WebSocket Samplers by Peter Doornbosch这个插件包通常包含SSE支持)。下载plugins-manager.jar并放入JMeter的lib/ext目录,重启JMeter后通过插件管理器安装。
    • 或者,直接下载插件的JAR文件,放入lib/ext目录后重启JMeter。
  2. 配置SSE采样器

    • 在测试计划中,右键添加 -> 取样器 -> 你应该能看到新增的SSE Sampler或类似名称的采样器。
    • 关键配置项
      • Server URL:你的SSE接口地址,例如http://localhost:8080/events
      • Read Timeout:读取超时。这不再是等待连接关闭的超时,而是等待下一条消息的超时。如果服务器超过这个时间没有发送新消息,采样器会认为本次“读取”结束,但连接可能仍保持。可以设置得较长(如30000毫秒)。
      • Connection Timeout:连接建立超时。
      • Implementation:选择HTTP
      • Streaming Connection:确保勾选(通常是默认的),这告诉采样器这是一个流式连接。
  3. 处理持续响应

    • SSE采样器会持续读取消息,直到达到Read Timeout或通过后置处理器主动停止。
    • 你可以在采样器下添加后置处理器(如BeanShell或JSR223)来实时处理每条收到的消息,并进行计数、断言或存储。

避坑心得

  • 插件兼容性:注意JMeter版本与插件版本的兼容性。建议在测试环境先验证插件的基本功能。
  • 超时理解:区分Read TimeoutResponse Timeout。前者是消息间等待,后者是整个请求等待。使用SSE采样器后,你主要关注Read Timeout
  • 资源清理:在测试计划最后,确保添加一个测试活动->SSE End采样器(如果插件提供)或在Teardown线程组中强制断开连接,防止连接泄漏。

4. 错误二:响应数据不完整或截断

即使用上了SSE采样器,或者你通过一些“黑魔法”让普通HTTP采样器收到了数据,你可能会发现“查看结果树”里显示的响应数据只有第一条,或者只有一部分,后面的消息不见了。

4.1 错误现象与根因分析

  • 现象:结果树中采样器的响应数据只显示了SSE流中的第一条或前几条消息,后续消息丢失。
  • 根因
    1. 普通HTTP采样器:如前所述,它只记录“响应结束时”缓冲区里的数据。对于SSE,它可能只捕获了连接建立后服务器立即发送的第一条欢迎消息或初始数据。
    2. SSE采样器配置不当:即使使用SSE采样器,如果其内部的消息缓冲区大小有限,或者在采样器“结束”前没有正确读取并存储所有消息,也会导致数据丢失。
    3. JMeter的默认数据存储机制:JMeter默认只保存采样器的“响应数据”,对于持续到达的数据流,它没有内置的、持续的追加存储逻辑。

4.2 解决方案:实时监听与存储

我们需要改变思路:不从采样器的“结果”里拿完整数据,而是在数据到达的瞬间就把它捕获并存下来

实操步骤(以SSE采样器为例):

  1. 添加JSR223监听器或后置处理器

    • 在SSE采样器下,添加一个 -> 监听器 ->JSR223 Listener。选择语言(推荐Groovy,性能好)。
    • 这个监听器会在每次收到消息时被触发。
  2. 编写消息处理脚本

    // 获取当前采样器实例 def sampler = sampler // 获取本次读取到的响应数据(一条SSE消息) def response = prev.getResponseDataAsString() // 简单的日志输出,可在jmeter.log中查看 log.info("收到SSE消息: " + response) // 将消息存储到JMeter变量中,供后续采样器使用(例如,存储到数组或列表) vars.putObject("lastSseMessage", response) // 或者,更常见的做法:将消息追加到一个文件或内存列表中 def messageList = vars.getObject("sseMessageList") if (messageList == null) { messageList = new java.util.ArrayList() vars.putObject("sseMessageList", messageList) } messageList.add(response) // 示例:对消息内容进行断言(检查是否包含特定关键字) if (!response.contains("expected_keyword")) { prev.setSuccessful(false) prev.setResponseMessage("SSE消息未包含预期关键字") } // 如果你想在收到特定消息后停止这个SSE连接,可以操作采样器 // if (response.contains("STOP")) { // sampler.setStopThread(true) // 谨慎使用,会停止当前线程 // // 或者调用采样器提供的断开方法(如果插件有提供API) // }
  3. 使用BeanShell PostProcessor或自定义Java代码

    • 原理类似,但JSR223(Groovy)是更现代、性能更好的选择。BeanShell在复杂逻辑和高并发下可能成为瓶颈。

避坑心得

  • 性能影响:在每条消息到达时都执行脚本(尤其是文件写入操作)会对测试性能产生较大影响。对于高性能压测,考虑将消息先存储在内存列表(如ArrayList)中,在测试结束后或定时批量写入文件。
  • 变量作用域vars.putObject存储的对象是线程独立的。如果你需要跨线程聚合所有消息,需要使用props(JMeter属性),但要注意线程安全。
  • 消息去重与格式:SSE消息可能包含idevent等字段。你的处理脚本需要能够解析data: actual message这种格式,并处理可能的重连机制(retry字段)。

5. 错误三:内存消耗暴涨(OOM)

这是最危险的一个错误,可能直接导致JMeter客户端崩溃。现象是JMeter进程的内存使用率(在监控工具或任务管理器中可见)持续快速上升,最终抛出java.lang.OutOfMemoryError: Java heap space错误。

5.1 错误现象与根因分析

  • 现象:JMeter运行一段时间后变卡,最终崩溃。日志中报堆内存溢出错误。
  • 根因
    1. 无限增长的响应缓冲区:如果错误地使用了普通HTTP采样器,并且没有设置超时或超时很长,JMeter会试图将整个永不结束的响应流读入内存,直到内存耗尽。
    2. 不当的消息累积:即使用了SSE采样器,但如果你在JSR223脚本中将所有消息无限制地添加到一个全局的ArrayListStringBuilder中,内存也会随着测试时间线性增长。
    3. JMeter自身报告收集:如果开启了“保存响应数据”的选项,JMeter会为每个采样器存储完整的响应数据。对于SSE长连接,这相当于在内存中保存一个不断增大的字符串,非常危险。

5.2 解决方案:精细化内存管理

实操步骤:

  1. 调整JMeter启动参数

    • 这是基础。编辑JMeter启动脚本(jmeter.batjmeter),找到HEAP设置。
    • 根据你的测试机器内存,适当增加堆大小,例如:set HEAP=-Xms2g -Xmx4g -XX:MaxMetaspaceSize=512m。但这不是根本解决办法,只是扩大了“水池”,如果漏水太快,还是会满。
  2. 禁用不必要的响应数据保存

    • HTTP RequestSSE Sampler的配置中,找到 “Save response as MD5 hash?” 或类似的选项。务必勾选这个。这样JMeter只保存一个固定长度的哈希值,而不是庞大的响应体。
    • 在“查看结果树”等监听器中,也避免配置保存完整的响应数据。
  3. 设计智能的消息处理逻辑

    • 抽样存储:不是每条消息都存。例如,每收到10条消息,才处理或存储1条。
    // 在JSR223 Listener中 def counter = vars.getObject("msgCounter") ?: 0 counter++ vars.putObject("msgCounter", counter) if (counter % 10 == 0) { // 只处理第10, 20, 30...条消息 def messageList = vars.getObject("sseMessageSampleList") ?: new java.util.ArrayList() messageList.add(prev.getResponseDataAsString()) vars.putObject("sseMessageSampleList", messageList) } // 其他消息直接忽略,或只做极轻量的检查(如计数)
    • 滑动窗口:只保留最近N条消息。
    def windowSize = 100 def messageWindow = vars.getObject("sseMessageWindow") ?: new java.util.LinkedList() messageWindow.add(prev.getResponseDataAsString()) if (messageWindow.size() > windowSize) { messageWindow.removeFirst() } vars.putObject("sseMessageWindow", messageWindow)
    • 及时清理:在收到特定结束事件或经过一定时间后,主动清理存储的消息集合。
    • 写入磁盘:对于需要完整日志的场景,尽快将消息写入文件,并清空内存中的列表。使用带缓冲的写入器,减少IO次数。
  4. 控制连接时长与并发数

    • 不要让单个SSE连接无限制地运行。在测试计划中,使用定时器(如Runtime Controller)来控制SSE采样器的执行时长。
    • 合理设置并发用户数(线程数)。每个SSE长连接都会占用服务器和客户端资源。过高的并发本身就是内存溢出的主要原因。

避坑心得

  • 监控是关键:运行测试时,打开JConsole或VisualVM连接到JMeter进程,实时观察堆内存使用情况和GC活动。看到内存曲线只升不降,就要警惕了。
  • 理解插件行为:不同的SSE插件实现不同,了解你用的插件是如何管理连接和缓冲区的。有些插件可能需要在采样器级别配置缓冲区大小。
  • 分离控制与压测机:对于大规模SSE压测,强烈建议使用分布式模式,让多个JMeter从机(Slave)分担连接压力,主机(Master)只负责收集聚合结果,这能有效分散单机内存压力。

6. 错误四:断言与后置处理器失效

在SSE流测试中,你可能会发现精心配置的“响应断言”根本不触发,或者“正则表达式提取器”什么也提取不到。这是因为这些元件的工作时机不对。

6.1 错误现象与根因分析

  • 现象:在SSE采样器下添加的响应断言,即使服务器返回了错误格式的消息,断言也显示通过。正则表达式提取器提取的变量为空。
  • 根因:JMeter的标准后置处理器(断言、提取器等)是在采样器执行完毕后才执行的。对于SSE采样器,其“执行完毕”可能是在Read Timeout超时之后。因此,这些处理器只能对超时前最后一条收到的消息(或者整个缓冲区的内容)进行操作,无法对流转过程中的每一条消息进行实时断言和提取。

6.2 解决方案:实时断言与提取

我们需要将断言和提取的逻辑,从采样器的“后置”阶段,移动到消息到达的“即时”阶段

实操步骤:

  1. 使用JSR223 Assertion进行实时断言

    • 不要用标准的“响应断言”。在SSE采样器下,添加一个 -> 断言 ->JSR223 Assertion
    • 它的执行时机可以配置(默认为“在采样器之后”),但关键是,我们可以把它和JSR223 Listener结合,或者在其脚本中访问当前响应。
    • 更常见的模式是:将断言逻辑直接写在JSR223 Listener脚本里(如前面示例中的if (!response.contains(...)))。这样,每条消息到达时都会进行校验。
  2. 在消息处理器中提取变量

    • 同样,放弃使用“正则表达式提取器”。在JSR223 Listener中,使用Groovy/Java的字符串处理功能(如split(),substring(), 或正则表达式)来解析消息,并将需要的值存入JMeter变量。
    def message = prev.getResponseDataAsString() // 假设消息格式为: event: price\ndata: {"symbol":"BTC","price":50000}\n\n def lines = message.split("\\n") for (line in lines) { if (line.startsWith("data: ")) { def jsonStr = line.substring(6) // 去掉"data: "前缀 // 简单提取,实际可用JsonSlurper解析 def matcher = jsonStr =~ /"price":(\d+)/ if (matcher.find()) { vars.put("latestPrice", matcher.group(1)) log.info("提取到价格: " + vars.get("latestPrice")) } } }
    • 这样提取出的变量(如${latestPrice})可以被同一个线程后续的采样器(比如一个下单接口)使用。
  3. 使用自定义采样器或插件的高级功能

    • 一些高级的SSE插件可能内置了按事件(Event)进行断言和字段提取的功能。仔细阅读插件的文档。

避坑心得

  • 断言粒度:对于SSE流,断言的目标通常是“流中是否出现了符合/不符合预期的消息”,而不是“整个响应的最终状态”。设计断言时要考虑这种持续性。
  • 变量生命周期:在JSR223 Listener中设置的变量,默认对该线程后续的采样器可见。确保你理解变量的作用域(vars是线程局部变量)。
  • 性能考量:对每条消息都进行复杂的正则表达式匹配或JSON解析,会增加CPU负担。确保你的测试机器能承受。

7. 错误五:测试结果分析与报告困难

即使你成功发起了SSE连接,收到了数据,控制了内存,也做了断言,最后却发现在“聚合报告”或“汇总报告”里,数据完全失真了。TPS(每秒事务数)高得离谱或者低得可怜,响应时间毫无意义。

7.1 错误现象与根因分析

  • 现象
    • TPS异常高:因为一个长连接可能收到了成百上千条消息,JMeter可能将每条消息都错误地记录为一个独立的“事务”。
    • 响应时间异常长:采样器的响应时间被记录为整个长连接的持续时间(从连接到超时或手动停止),可能长达几分钟甚至几小时,这完全扭曲了服务器处理单个请求的真实性能。
    • 成功率失真:连接建立成功就算成功,即使流中后续消息全是错误的。
  • 根因:JMeter的标准事务模型(一个采样器=一次请求=一个事务)与SSE的流式交互模型不匹配。我们需要重新定义什么是SSE测试中的“事务”和“响应时间”。

7.2 解决方案:重新定义事务与指标

我们需要跳出JMeter默认的度量体系,建立适合SSE的定制化指标收集和分析方法。

实操步骤:

  1. 禁用或忽略默认的采样器计时

    • 对于SSE连接采样器本身,在监听器(如聚合报告)中过滤掉它,或者接受它的响应时间就是连接时长这个事实,但在分析时明确知道这个数据不代表服务端处理性能。
    • 更关键的是,我们要自己定义和测量业务指标
  2. 测量“消息到达延迟”

    • 这是SSE测试的核心指标之一:从服务器发出消息,到客户端(JMeter)收到消息的时间差。
    • 实现方法:需要服务器端支持,在发送的每条消息里带一个高精度的时间戳(如data: {"ts": 1640995200000, "payload": "..."})。
    • 在JMeter的JSR223 Listener中,收到消息后,立即获取当前系统时间,与消息中的时间戳相减,得到延迟。
    import groovy.json.JsonSlurper def message = prev.getResponseDataAsString() def lines = message.split("\\n") for (line in lines) { if (line.startsWith("data: ")) { def jsonStr = line.substring(6) def json = new JsonSlurper().parseText(jsonStr) long serverTs = json.ts long clientTs = System.currentTimeMillis() long latency = clientTs - serverTs // 将延迟存储起来,供后续分析 def latencyList = vars.getObject("latencyList") ?: new java.util.ArrayList() latencyList.add(latency) vars.putObject("latencyList", latencyList) // 也可以直接使用SampleResult记录一个自定义的“虚拟事务” // 但这需要更复杂的操作,通常建议将latencyList在测试结束后输出到文件,用其他工具分析 } }
  3. 测量“连接建立成功率”与“消息丢失率”

    • 连接成功率:SSE采样器自身的成功/失败状态可以很好地反映这一点。
    • 消息丢失率:这需要业务逻辑配合。例如,服务器发送的消息带有连续递增的ID。JMeter端统计收到的ID,检查是否有间隔。
    // 假设消息格式为: data: {"id": 123, ...} // 在JSR223 Listener中 def currentId = json.id def lastId = vars.getObject("lastReceivedId") ?: 0 if (currentId != lastId + 1) { // 发现丢包!记录日志或增加计数器 def lossCounter = vars.getObject("lossCounter") ?: 0 lossCounter += (currentId - lastId - 1) vars.putObject("lossCounter", lossCounter) log.warn("消息丢失!期望ID: ${lastId + 1}, 实际收到ID: $currentId") } vars.putObject("lastReceivedId", currentId)
  4. 使用SampleResult记录自定义事务(高级):

    • 如果你想在JMeter的报告中生成更直观的图表(如消息延迟的分布),可以创建自定义的SampleResult对象。
    • 这通常在JSR223 PostProcessorBeanShell PostProcessor中完成,但需要小心处理线程同步和资源管理,复杂度较高。对于大多数场景,将原始数据(如每条消息的延迟)写入CSV或JSON文件,然后用专业的数据分析工具(如Grafana, Python Pandas)进行后期处理,是更简单可靠的选择。

避坑心得

  • 明确测试目标:在开始SSE压测前,就要想清楚你到底要衡量什么?是服务器维持大量长连接的能力?是消息推送的延迟?还是消息的吞吐量?不同的目标决定了不同的脚本设计和指标收集方式。
  • 结果可视化:不要只盯着JMeter的聚合报告。将自定义指标(延迟、丢包数)写入文件,用Excel、Grafana或自定义脚本生成图表,才能获得更深入的洞察。
  • 时钟同步:测量“消息到达延迟”要求JMeter测试机与服务器时钟高度同步,最好使用NTP服务。否则,测量值可能包含巨大的系统时间差,失去意义。

8. 实战配置与脚本框架示例

理论说再多,不如一个可运行的例子来得实在。下面我给出一个基于WebSocket/SSE Sampler插件(假设已安装)的完整测试计划框架,它涵盖了连接、接收、处理、断言和资源清理的基本流程。

测试计划结构:

测试计划 (Test Plan) ├─ 用户定义的变量 (User Defined Variables) │ ├─ SERVER_URL=http://localhost:8080/sse/stream │ ├─ READ_TIMEOUT=30000 │ └─ TEST_DURATION=60 (seconds) ├─ 线程组 (Thread Group) │ ├─ 线程数: 10 │ ├─ Ramp-up: 5 │ └─ 循环次数: 1 (由Runtime Controller控制时长) │ │ │ ├─ 运行时控制器 (Runtime Controller) -> 运行时: ${__P(TEST_DURATION,60)} │ │ │ │ │ ├─ SSE Sampler (名称: Connect to SSE Stream) │ │ │ ├─ Server URL: ${SERVER_URL} │ │ │ ├─ Read Timeout: ${READ_TIMEOUT} │ │ │ ├─ Implementation: HTTP │ │ │ └─ (其他参数默认) │ │ │ │ │ ├─ JSR223 Listener (名称: Process SSE Messages) [语言: Groovy] │ │ │ └─ 脚本内容: (见下方) │ │ │ │ │ └─ 响应断言 (JSR223 Assertion) [可选,或直接在Listener中断言] │ │ │ └─ 查看结果树 (View Results Tree) [仅调试用,正式压测建议禁用或使用简单数据写入器] │ └─ tearDown线程组 (tearDown Thread Group) [确保连接关闭] └─ SSE End Sampler (名称: Close SSE Connections) └─ (配置为关闭所有连接)

核心JSR223 Listener脚本示例:

这个脚本实现了消息计数、延迟计算(需服务器时间戳)、简单断言和抽样存储。

import groovy.json.JsonSlurper import java.util.concurrent.atomic.AtomicLong // 获取当前采样器结果 def sampleResult = prev // 获取当前响应数据(一条SSE消息) def rawMessage = sampleResult.getResponseDataAsString() if (rawMessage == null || rawMessage.trim().isEmpty()) { return // 忽略空消息 } // 1. 消息计数(线程安全,跨线程统计需用props) def threadCounter = vars.getObject("msgCount") ?: new AtomicLong(0) def currentCount = threadCounter.incrementAndGet() vars.putObject("msgCount", threadCounter) log.debug("线程 ${ctx.getThreadNum()} 收到第 ${currentCount} 条消息") // 2. 解析SSE消息格式 (简单处理,寻找data行) def lines = rawMessage.split("\\n") def eventName = "message" def dataContent = "" def messageId = null def retryTime = null for (line in lines) { if (line.startsWith("event:")) { eventName = line.substring(6).trim() } else if (line.startsWith("data:")) { dataContent = line.substring(5).trim() } else if (line.startsWith("id:")) { messageId = line.substring(3).trim() } else if (line.startsWith("retry:")) { retryTime = line.substring(6).trim() } // 忽略以冒号开头的行(注释)和空行 } if (dataContent.isEmpty()) { return // 没有data字段,可能是注释或心跳,跳过处理 } // 3. 业务逻辑处理 (示例:假设dataContent是JSON) try { def jsonSlurper = new JsonSlurper() def data = jsonSlurper.parseText(dataContent) // 3.1 计算延迟(假设服务器返回了timestamp字段,单位毫秒) if (data.timestamp) { long serverTime = data.timestamp as Long long clientTime = System.currentTimeMillis() long latency = clientTime - serverTime // 存储延迟样本(使用线程局部列表,避免竞争) def latencyList = vars.getObject("latencyList") ?: [] latencyList.add(latency) if (latencyList.size() > 1000) { latencyList.remove(0) } // 滑动窗口,防止内存溢出 vars.putObject("latencyList", latencyList) // 可以在这里设置阈值断言 if (latency > 1000) { // 延迟超过1秒视为警告 // sampleResult.setSuccessful(false) // 谨慎使用,这会影响整个采样器的成功状态 log.warn("高延迟消息: ID=${messageId}, latency=${latency}ms") } } // 3.2 检查消息内容断言 if (data.status && data.status == "error") { // 业务逻辑错误,可以记录失败计数 def errorCounter = vars.getObject("errorCount") ?: new AtomicLong(0) errorCounter.incrementAndGet() vars.putObject("errorCount", errorCounter) log.error("收到错误消息: " + dataContent) } // 3.3 抽样存储(每100条存1条到文件,减少IO压力) if (currentCount % 100 == 0) { def sampleList = vars.getObject("messageSamples") ?: [] sampleList.add([timestamp: System.currentTimeMillis(), id: messageId, data: data]) vars.putObject("messageSamples", sampleList) // 实际写入文件操作最好在测试结束后统一进行,或使用异步写入器 } // 3.4 提取变量供后续采样器使用(例如最新价格) if (data.price) { vars.put("latestPrice", data.price.toString()) } } catch (Exception e) { log.error("解析SSE消息JSON失败: " + rawMessage, e) // 可以考虑将解析失败视为测试失败 def parseFailCounter = vars.getObject("parseFailCount") ?: new AtomicLong(0) parseFailCounter.incrementAndGet() vars.putObject("parseFailCount", parseFailCounter) } // 4. 模拟业务操作(例如,每收到10条消息,发一个HTTP请求) if (currentCount % 10 == 0) { // 这里可以添加一个HTTP Request采样器,但更优雅的方式是用if控制器和模块控制器 // 简单示例:记录日志表示应该触发业务操作 log.info("已收到 ${currentCount} 条消息,触发模拟业务操作...") // vars.put("triggerOperation", "true") // 设置标志,由后续逻辑判断 }

后处理与结果收集:

在测试计划的最后,添加一个JSR223 PostProcessor(或使用BeanShell PostProcessor)在测试结束后执行,将各个线程收集的指标(如平均延迟、消息总数、错误数)汇总并输出到文件。

// 此脚本可在测试计划的“tearDown线程组”或最后一个线程组中添加一个仅一次执行的采样器后执行 import java.util.concurrent.atomic.AtomicLong def totalMessages = 0L def totalErrors = 0L def allLatencies = [] // 注意:这里简化了跨线程收集,实际高并发时props.getObject可能返回副本,需要更精细的同步。 // 对于生产压测,建议每个线程独立写文件,或使用JMeter的聚合报告配合自定义指标。 ctx.getThreadGroup().getThreads().each { thread -> def threadVars = thread.getVariables() def msgCountObj = threadVars.getObject("msgCount") if (msgCountObj instanceof AtomicLong) { totalMessages += msgCountObj.get() } def errorCountObj = threadVars.getObject("errorCount") if (errorCountObj instanceof AtomicLong) { totalErrors += errorCountObj.get() } def latencies = threadVars.getObject("latencyList") if (latencies instanceof List) { allLatencies.addAll(latencies) } } def avgLatency = allLatencies ? (allLatencies.sum() / allLatencies.size()) : 0 def maxLatency = allLatencies ? Collections.max(allLatencies) : 0 def minLatency = allLatencies ? Collections.min(allLatencies) : 0 def summary = """ SSE压力测试结果汇总: ==================== 总连接线程数: ${ctx.getThreadGroup().getNumThreads()} 总运行时间: ${props.get("TEST_DURATION")} 秒 总接收消息数: ${totalMessages} 总业务错误数: ${totalErrors} 消息错误率: ${totalMessages > 0 ? String.format("%.2f", (totalErrors / totalMessages * 100)) : 0}% 延迟统计 (ms): 平均: ${String.format("%.2f", avgLatency)} 最大: ${maxLatency} 最小: ${minLatency} 样本数: ${allLatencies.size()} """ log.info(summary) // 将结果写入文件 def resultFile = new File("sse_test_result_${System.currentTimeMillis()}.txt") resultFile.write(summary)

这个框架提供了一个坚实的起点。你需要根据自己服务器的SSE接口的具体格式(消息结构、事件类型)和你的测试目标(压测连接数、消息延迟、吞吐量)来调整消息解析逻辑、断言条件和指标收集方式。记住,测试SSE接口更像是在测试一个持续的服务,而不是离散的API调用,你的测试脚本也需要具备这种“持续性”和“状态性”的思维。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询