【Redis从入门到精通】第06篇:Redis管道(Pipeline)与批量操作——一次网络往返顶十次
2026/5/30 10:02:49 网站建设 项目流程

上一篇【第05篇】Redis客户端操作指南——Jedis/Lettuce/Spring Data Redis对比
下一篇【第07篇】Redis命令速查手册——工作中最常用的80条命令


摘要

redis以单线程处理命令著称,但真正的性能瓶颈往往不在redis服务端,而在客户端与服务端之间的网络往返时延(rtt)。当你需要执行1000条命令时,如果逐条发送,就要付出1000次网络往返的代价。本文深入剖析redis pipeline技术的核心原理,通过ascii时序图直观展示rtt概念与pipeline优化效果,对比pipeline与multi/exec事务的本质区别,并提供jedis和lettuce的实战代码示例。同时探讨mget/mset等原生批量命令与pipeline的选择策略,帮你真正理解"一次网络往返顶十次"的含义。


一、rtt:看不见的性能杀手

1.1 什么是rtt?

rtt(round trip time,往返时延)是指数据包从客户端发出到收到服务端响应之间的总耗时。在redis的使用场景中,rtt = 客户端发送命令的时间 + 服务端处理命令的时间 + 服务端返回结果的时间。

用一张ascii时序图来直观感受一下:

逐条命令模式(without pipeline): 客户端 redis服务端 │ │ │──── set key1 value1 ────────────────>│ │ │ (处理) │<──────── ok ─────────────────────────│ │ │ │──── set key2 value2 ────────────────>│ │ │ (处理) │<──────── ok ─────────────────────────│ │ │ │──── set key3 value3 ────────────────>│ │ │ (处理) │<──────── ok ─────────────────────────│ │ │ │ ... 998次往返 ... │ │ │ │──── set key1000 value1000 ──────────>│ │ │ (处理) │<──────── ok ─────────────────────────│ │ │ 总耗时 ≈ 1000 × rtt + 1000 × 处理时间

从这个图可以清楚看到:时间主要消耗在客户端和服务端之间的网络往返上,而不是redis处理命令本身。redis处理一条set命令只需要微秒级别,而一次网络往返通常是毫秒级别的。假设rtt是1ms,光网络等待就要花掉1秒钟,而redis实际处理1000条set可能只需几十毫秒。

1.2 rtt的实际测量

我们可以用redis-cli--latency选项来测量本机到redis的延迟:

# 测量redis延迟redis-cli--latency-h127.0.0.1-p6379# 输出示例:# min: 0, max: 1, avg: 0.18 (826 samples)

本地回环的延迟通常在0.1-0.3ms之间。但如果是跨机房甚至是跨地域的部署呢?延迟可能会飙升到10ms甚至50ms以上。假设rtt为10ms,执行1000条命令的网络开销就是10秒!这显然不能接受。

1.3 直观性能对比

模拟一下不带pipeline的逐条操作:

# 逐条写入10000个key(极慢)timeforiin$(seq110000);doredis-clisettest:$i$i>/dev/nulldone# 手动中断前测试结果(1000条):# real 0m3.847s# 10000条完整测试估算需要 38秒+

用pipeline执行同样操作:

# 使用pipeline批量写入time(foriin$(seq110000);doecho"set test:$i$i"done|redis-cli--pipe)# 结果:# all data transferred. waiting for the last reply...# last reply received from server.# real 0m0.123s

⚠️ 注意:redis-cli --pipe使用的是redis的resp协议批量发送模式,不是真正的pipeline api,但效果类似。生产环境请使用客户端sdk的pipeline api。

性能差距一目了然:同样的操作,逐条执行需要38+秒,pipeline方式只需要0.123秒,性能提升了300倍以上


二、pipeline工作原理详解

2.1 pipeline的执行流程

pipeline的核心思想很简单:把多条命令打包在一个tcp报文中一次发送,redis也把结果打包在一个报文中一次性返回

pipeline模式: 客户端 redis服务端 │ │ │──── cmd1 ─┐ │ │──── cmd2 ─┤ │ │──── cmd3 ─┼── 批量发送 ──────────────>│ │ ... │ (一次tcp报文) │ │──── cmdn ─┘ │ │ │ (依次处理cmd1,cmd2,...,cmdn) │ │ │<─── result1 ─┐ │ │<─── result2 ─┤ │ │<─── result3 ─┼── 批量返回 ───────────│ │ ... │ (一次tcp报文) │ │<─── resultn ─┘ │ │ │ 总耗时 ≈ 1 × rtt + n × 处理时间

关键点:

  • 客户端不会等待单条命令的返回结果,而是把n条命令全部发送完毕后,再一起接收所有结果
  • 服务端仍然逐条处理命令(单线程模型没变),只是把处理结果集中在一起返回
  • 网络往返次数从n次降为1次

2.2 pipeline的底层机制

pipeline利用的是tcp的全双工通信能力。具体来说:

  1. 写缓冲区(write buffer):客户端将多条命令写入tcp发送缓冲区,但不下发flush指令
  2. 批量发送:当缓冲区满或客户端主动调用flush()时,将缓冲区数据一次性发出
  3. 读缓冲区(read buffer):服务端返回的结果先存在客户端的tcp接收缓冲区中
  4. 批量读取:客户端从读缓冲区中按顺序解析n条命令的响应

从redis协议层面来看,pipeline发送的数据格式如下:

*3\r\n$3\r\nset\r\n$4\r\nkey1\r\n$6\r\nvalue1\r\n *3\r\n$3\r\nset\r\n$4\r\nkey2\r\n$6\r\nvalue2\r\n *3\r\n$3\r\nset\r\n$4\r\nkey3\r\n$6\r\nvalue3\r\n

redis同样返回:

+ok\r\n +ok\r\n +ok\r\n

2.3 pipeline的使用限制

pipeline虽然强大,但有几个硬伤:

限制项说明
命令间不能有依赖因为客户端在发送全部命令之前不会读取任何响应,所以后一条命令不能依赖前一条命令的结果
内存压力客户端和服务端都需要缓存全部命令和响应,如果pipeline中包含海量命令,可能导致内存骤增
错误处理复杂某条命令失败不会影响其他命令执行,但客户端需要自己遍历所有返回结果来检查错误
原子性不保证pipeline中的命令之间可能穿插其他客户端的命令,不是原子操作
不支持事务语义不能像multi/exec那样提供"全部执行或不执行"的保证

⚠️ 注意:pipeline中的命令虽然有先后顺序(服务端按接收顺序处理),但无法保证命令之间不被其他客户端插入。比如你pipeline了set key1 v1和get key1,理论上读到的是v1,但redis作为单线程模型确实保证了命令顺序处理,只是pipeline命令之间可能被插入——这里我做个精确表述:redis单线程保证了pipeline内部的命令会连续执行,不会插入其他客户端的命令。pipeline不保证的是跨客户端的原子性,即你无法保证"我pipeline了一组命令后,key的状态等于这组命令执行完的状态"——其他客户端的命令可能在这组命令之前或之后执行。


三、java客户端pipeline实战

3.1 jedis pipeline示例

importredis.clients.jedis.jedis;importredis.clients.jedis.pipeline;importredis.clients.jedis.response;publicclassjedispipelinedemo{publicstaticvoidmain(string[]args){jedis jedis=newjedis("127.0.0.1",6379);// 方法一:普通pipeline(不返回结果,只执行)pipeline pipeline=jedis.pipelined();for(inti=0;i<10000;i++){pipeline.set("user:"+i,"value"+i);}pipeline.sync();// 一次性发送并等待全部完成system.out.println("写入10000条数据完成");// 方法二:需要返回结果的pipelinepipeline pipeline2=jedis.pipelined();response<string>resp1=pipeline2.get("user:1");response<string>resp2=pipeline2.get("user:9999");pipeline2.sync();system.out.println("user:1 = "+resp1.get());system.out.println("user:9999 = "+resp2.get());jedis.close();}}

jedis pipeline关键点

  • jedis.pipelined()返回pipeline对象,后续所有通过pipeline对象调用的命令都会被缓存
  • pipeline.sync()一次性发送所有缓存命令并同步等待结果
  • 如果需要返回值,response<t>必须在sync()之后才能调用.get()

3.2 lettuce pipeline示例

importio.lettuce.core.redisclient;importio.lettuce.core.redisfuture;importio.lettuce.core.api.statefulredisconnection;importio.lettuce.core.api.async.redisasynccommands;importjava.util.arraylist;importjava.util.list;publicclasslettucepipelinedemo{publicstaticvoidmain(string[]args){redisclient client=redisclient.create("redis://127.0.0.1:6379");statefulredisconnection<string,string>connection=client.connect();// lettuce使用异步命令 + flushcommands实现pipelineredisasynccommands<string,string>async=connection.async();// 设置autoflushcommands为false,命令会被缓存async.setautoflushcommands(false);list<redisfuture<?>>futures=newarraylist<>();for(inti=0;i<10000;i++){redisfuture<string>future=async.set("user:"+i,"value"+i);futures.add(future);}// 手动flush一次性发送所有命令async.flushcommands();// 等待所有结果返回lettucefutures.awaitall(30,timeunit.seconds,futures.toarray(newredisfuture[0]));system.out.println("写入10000条数据完成");// 恢复自动flushasync.setautoflushcommands(true);connection.close();client.shutdown();}}

lettuce pipeline关键点

  • lettuce基于netty,天然支持异步,pipeline通过setautoflushcommands(false)实现
  • 需要手动调用flushcommands()来发送缓冲区中的命令
  • 完成后记得恢复setautoflushcommands(true)

3.3 spring data redis的pipeline

如果你用spring boot,可以通过redistemplate执行pipeline:

@autowiredprivatestringredistemplate redistemplate;publicvoidpipelineexample(){list<object>results=redistemplate.executepipelined(newrediscallback<object>(){@overridepublicobjectdoinredis(redisconnection connection)throwsdataaccessexception{for(inti=0;i<10000;i++){connection.set(("user:"+i).getbytes(),("value"+i).getbytes());}returnnull;// pipeline模式下返回值无意义}});system.out.println("pipeline执行完毕,共"+results.size()+"条结果");}

四、mget/mset等原生批量命令 vs pipeline

redis本身提供了一些原生批量操作的命令,它们和pipeline是什么关系?怎么选?

4.1 原生批量命令

命令功能使用场景
mget一次获取多个string类型的key的值批量读取多个已知key
mset一次设置多个key-value对批量写入多个key
del key1 key2 …一次删除多个key批量删除
hmget一次获取一个hash的多个field值批量读取hash字段
hmset一次设置一个hash的多个field值批量写入hash字段
sunion/sdiff多个集合的交集/差集集合运算

4.2 原生批量命令 vs pipeline 对比

原生批量命令 (mget/mset) pipeline (管道) ────────────────────── ────────────── 执行流程: 客户端 ─── mget key1 key2... ──> redis 客户端 ─── cmd1..cmdn ──> redis <─────── [v1, v2, ...] ──── <──── [r1, ..., rn] ── 网络往返: 1次 rtt 1次 rtt 原子性: 某些命令有原子性保证 无原子性保证 (mget是原子操作) (只是一组命令批量发送) 灵活性: 只能做特定操作 可以混合任意命令 (只能读多个string key) 服务端实现: 服务端有专门优化 服务端仍是逐条处理 (减少内部开销) 错误处理: 要么全成功要么全失败 各命令独立,需要单独检查 选择建议: ✅ 纯string批量读/写用mget/mset ✅ 需要混合多种命令时用pipeline ✅ 需要原子性时用原生命令 ✅ 需要跨数据结构操作时用pipeline ❌ 不能混合set和incr ❌ 不需要原子性保证的场景

4.3 选择策略总结

场景推荐方案原因
批量写入100个string keymset更简洁,服务端有优化
批量读取100个string keymget一条命令搞定
先set再incr再getpipeline混合不同命令
对多个hash做不同操作pipeline混合hset/hget等命令
需要原子性(全部成功或全部失败)原生命令或事务pipeline不保证原子性
万级以上大量命令pipeline + 分批避免内存压力过大

⚠️ 注意:千万不要把原生批量命令简单理解成"加了原子性的pipeline"。它们是两种不同的机制。mget/mset是在服务端一条命令内完成,pipeline是在传输层批量打包。当你需要往一个set里加元素的同时又往一个hash里设字段,mget/mset也帮不了你,这时候必须用pipeline。


五、pipeline vs multi/exec 事务

这是面试和实际工作中非常容易混淆的两个概念。先看一张对比表:

对比维度pipelinemulti/exec事务
本质网络优化手段,减少rtt命令执行机制,保证原子性
原子性不保证。命令间可能被其他客户端插入保证。事务内命令要么全部执行要么全部不执行
错误处理某条命令失败不影响其他命令继续执行exec前有命令语法错误则全部放弃;exec后某条命令执行失败则其他命令仍然执行
是否阻塞不阻塞其他客户端事务期间不阻塞,exec执行时才短暂阻塞
命令依赖命令间不能有依赖(要依赖前一条的结果则必须先sync)事务内命令也按序执行,但也不能依赖前一条的结果(watch可以解决部分问题)
执行时机sync/flush时立即发送并执行客户端缓存命令,exec时批量执行
使用场景批量插入数据、批量读取、大批量无依赖操作银行转账、库存扣减、需要原子性保证的多步操作
性能极高,主要是因为减少了网络rtt较高,主要是保证原子性
乐观锁支持不支持支持(watch命令配合)

5.1 执行流程对比(ascii)

pipeline 执行流程: ┌──────┐ ┌────────┐ ┌──────────────┐ │ app │ │ client │ │ redis server │ └──┬───┘ └───┬────┘ └──────┬───────┘ │ 调用pipeline│ │ │────────────>│ │ │ │ 缓存cmd1,cmd2 │ │ │──────┐ │ │ │ │ │ │ sync() │<────┘ │ │────────────>│ 批量发送 ───>│ │ │ (一条tcp包) │ │ │ │ 依次执行cmd1,cmd2 │ │ <─── 批量返回 │ │ │ result1,res2 │ │ <──返回结果 │ │ multi/exec 事务流程: ┌──────┐ ┌────────┐ ┌──────────────┐ │ app │ │ client │ │ redis server │ └──┬───┘ └───┬────┘ └──────┬───────┘ │ multi() │ │ │────────────>│ multi ───────>│ 进入事务状态 │ │<── +ok ────────│ │ 命令入队 │ │ │────────────>│ cmd1 ────────>│ queued (未执行) │ │<── queued ─────│ │────────────>│ cmd2 ────────>│ queued (未执行) │ │<── queued ─────│ │ exec() │ │ │────────────>│ exec ────────>│ 原子执行cmd1,cmd2 │ │<── [ok, ok] ───│

关键区别一目了然:pipeline中的命令在服务端即时执行,事务中的命令在exec之前都是排队不执行


六、pipeline最佳实践与性能调优

6.1 分批发送策略

pipeline不是万能的。如果一次性发送100万条命令:

┌─────────────────────────────────────────────────┐ │ 内存爆炸警告 │ ├─────────────────────────────────────────────────┤ │ 客户端: 100万条命令 × 平均50字节 = 50mb │ │ 服务端: 100万条响应 × 平均10字节 = 10mb │ │ 关键: 发送/接收期间网络可能超时 │ │ 建议: 每批控制在10000条以内 │ └─────────────────────────────────────────────────┘
// 分批pipeline示例publicvoidbatchpipeline(jedis jedis,list<string>keys,list<string>values){intbatchsize=5000;// 每批5000条inttotal=keys.size();for(inti=0;i<total;i+=batchsize){intend=math.min(i+batchsize,total);pipeline pipeline=jedis.pipelined();for(intj=i;j<end;j++){pipeline.set(keys.get(j),values.get(j));}pipeline.sync();system.out.printf("进度: %d/%d (%.1f%%)\n",end,total,end*100.0/total);}}

6.2 pipeline + 多线程

如果数据量特别大,可以结合多线程进一步提升吞吐:

// 多线程 + pipeline 极致性能方案executorservice executor=executors.newfixedthreadpool(4);inttotal=1000000;intperthread=total/4;for(intt=0;t<4;t++){finalintthreadid=t;finalintstart=t*perthread;finalintend=(t==3)?total:start+perthread;executor.submit(()->{try(jedis jedis=pool.getresource()){pipeline pipeline=jedis.pipelined();for(inti=start;i<end;i++){pipeline.set("key:"+i,"value"+i);}pipeline.sync();system.out.println("线程"+threadid+"完成");}});}executor.shutdown();executor.awaittermination(5,timeunit.minutes);

⚠️ 注意:多线程+pipeline虽然快,但也要小心。连接池大小要跟上,最好用jedispool管理连接。另外注意redis是单线程的,多客户端发送的命令最终还是在服务端排队处理,所以瓶颈最终还是redis的cpu。

6.3 实操命令汇总

# 1. 测量redis延迟redis-cli--latency-h127.0.0.1-p6379# 2. 使用redis-cli的pipe模式批量写入(foriin$(seq110000);doecho"set pipe:$ivalue$i";done)|redis-cli--pipe# 3. 使用redis-cli的pipe模式批量读取(foriin$(seq110000);doecho"get pipe:$i";done)|redis-cli--pipe>output.txt# 4. 测试pipeline吞吐量redis-benchmark-tset,get-p16-n100000# 5. 查看redis处理了多少命令redis-cli info stats|greptotal_commands_processed# 6. 使用mget批量读取(原生批量命令)redis-cli mget user:1 user:2 user:3 user:4 user:5

七、踩坑记录

坑1:pipeline中混用有依赖的命令

// 错误示范pipeline pipeline=jedis.pipelined();pipeline.incr("counter");// counter变成多少?不知道pipeline.set("score",counter值);// 无法获取counter的新值!pipeline.sync();

正确做法:有依赖的命令拆成两次pipeline:

response<long>counter1=pipeline1.incr("counter");pipeline1.sync();longnewval=counter1.get();pipeline pipeline2=jedis.pipelined();pipeline2.set("score",string.valueof(newval));pipeline2.sync();

坑2:忘记sync()导致命令丢失

pipeline pipeline=jedis.pipelined();pipeline.set("key","value");// 忘了调sync()!命令永远不会发送到redis!// 然后过很久才发现key根本没有被设置

坑3:lettuce忘记恢复autoflushcommands

async.setautoflushcommands(false);// ... 各种操作...async.flushcommands();// 忘记恢复 autoflushcommands!// 后续所有操作都不会自动发送!

⚠️ 注意:setautoflushcommands(false)后一定要用try-finally保证恢复:

async.setautoflushcommands(false);try{// pipeline操作async.flushcommands();}finally{async.setautoflushcommands(true);// 必须恢复!}

总结

pipeline是redis性能优化的标配工具,核心就一句话:把n次网络往返变成1次。但它不是银弹——不保证原子性、命令间不能有依赖、内存压力需要控制。掌握pipeline的关键是区分它与multi/exec事务、原生批量命令的适用场景:

  • pipeline:批量操作、减少rtt、不关心原子性
  • multi/exec:需要原子性保证的业务场景
  • mget/mset:纯string批量读写、简洁高效有服务端优化

三者配合使用,才能把redis的性能发挥到极致。


上一篇【第05篇】Redis客户端操作指南——Jedis/Lettuce/Spring Data Redis对比
下一篇【第07篇】Redis命令速查手册——工作中最常用的80条命令


需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询