为什么有了 RocketMQ 事务消息,我们还要自研本地消息表方案?
前言
最近在 Code Review 一个项目时,发现团队自研了一套完整的事务消息框架 4 张数据库表、定时补偿任务、分布式锁、衰减重试机制…一应俱全。
但项目依赖里明明引入了rocketmq-spring-boot-starter:2.3.1,RocketMQ 原生支持事务消息,为什么不直接用?
答案很有意思:RocketMQ 只被用作简单的实时消息推送通道,事务消息功能完全没用。
这引发了一个值得深思的问题:RocketMQ 事务消息已经很成熟了,为什么很多团队仍然选择自研本地消息表方案?
一、先搞清楚:我们要解决的是什么问题?
在分布式系统中,经常遇到这样的场景:
用户支付成功后,需要修改订单状态(写 MySQL),同时发消息给积分系统加积分(发 MQ)。
核心问题:数据库操作和 MQ 投递是两个独立的操作,无法放在同一个原子事务中。
- 强一致性:需要 2PC(XA 协议)或 Raft,性能极差,不适合高并发场景
- 最终一致性:BASE 理论,允许中间状态不一致,但保证最终一致
RocketMQ 事务消息和本地消息表,本质上都是实现最终一致性的方案。
二、RocketMQ 事务消息的理想与现实
理想的实现方式
RocketMQ 事务消息的设计非常优雅:
半消息 → 执行本地事务 → Commit/Rollback → 回查兜底很多人会这样实现:
@RocketMQTransactionListenerpublicclassOrderTransactionListenerimplementsRocketMQLocalTransactionListener{@OverridepublicRocketMQLocalTransactionStateexecuteLocalTransaction(Messagemsg,Objectarg){// 执行本地事务orderService.updateOrderStatus((OrderParam)arg);returnRocketMQLocalTransactionState.COMMIT;}@OverridepublicRocketMQLocalTransactionStatecheckLocalTransaction(Messagemsg){StringorderId=msg.getHeaders().get("orderId").toString();// 直接查业务表Orderorder=orderMapper.selectById(orderId);returnorder!=null?RocketMQLocalTransactionState.COMMIT:RocketMQLocalTransactionState.ROLLBACK;}}看起来很完美,但生产环境会遇到三个关键问题。
问题一:回查时的竞态条件
场景复现:
- 发半消息:成功
- 执行本地事务:数据库死锁,事务卡了 5 秒还没提交
- MQ 回查:Broker 发现消息未确认,发起回查
- 误判:查订单表 → 查不到(事务还没 Commit)
- 结果:返回 ROLLBACK,MQ 撤销消息
- 悲剧:下一秒本地事务提交成功,订单入库了,但消息没了
核心问题:回查时无法区分"事务还在执行中"和"事务执行失败"。
问题二:业务表旧数据的干扰
更隐蔽的问题是:如果业务表里已经有旧数据(比如同一个订单 ID 之前被取消过),回查时会误判。
时间线: T1: 用户下单 ORDER_123,事务成功 T2: 用户取消订单,业务表状态改为"已取消" T3: 用户重新下单 ORDER_123(同一订单ID) T4: 本次事务失败(回滚) T5: MQ 回查 → 查业务表 → 查到 ORDER_123(旧数据)→ COMMIT(误判!)核心问题:业务表存储的是数据的最终状态,可能被多次事务修改,回查时无法判断"这是本次事务写入的,还是之前遗留的旧数据"。
问题三:消费端的幂等陷阱
消费端的幂等检查和业务执行必须在同一个事务里,否则会出现部分成功的情况:
@TransactionalpublicvoidonMessage(Stringmessage){OrderMsgmsg=JSON.parseObject(message,OrderMsg.class);// 错误:幂等检查和业务执行分离if(idempotentMapper.exists(msg.getOrderId())){return;// 已消费}// 执行业务inventoryService.addPoints(msg.getUserId(),msg.getPoints());// 如果这里失败,业务执行了,但幂等记录没插进去idempotentMapper.insert(msg.getOrderId());}三、重要澄清:不一定需要事务表
上面的分析可能会让人误解:“RocketMQ 事务消息必须要有事务表才能可靠工作。”
实际上不是这样。要区分三个层次:
方案1:RocketMQ 官方推荐事务消息(最常见)
很多项目根本不会单独建事务表。
@OverridepublicRocketMQLocalTransactionStatecheckLocalTransaction(Messagemsg){StringorderId=msg.getHeaders().get("orderId").toString();Orderorder=orderMapper.selectById(orderId);if(order!=null){returnCOMMIT;}returnROLLBACK;}直接查业务表。
例如:
订单创建成功 → 订单表有数据 → COMMIT 订单创建失败 → 订单表没数据 → ROLLBACK这种写法非常普遍。
为什么很多项目敢这么干?
因为业务主键通常不会复用。
例如:
订单ID: 1001 1002 1003 ...永远唯一。
不会出现文章举的:
ORDER_123 删除 重新创建 还是ORDER_123这种情况。
所以:
查订单表 = 查事务状态成立。
什么时候需要事务表?
当你发现:
业务表不能准确反映事务状态时。
例如:
场景1:业务记录可能被删除
创建订单 ↓ 消息未确认 ↓ 订单被删除 ↓ Broker回查 ↓ 查不到订单这时就有问题。
场景2:一个业务对象会被反复修改
例如:
用户积分用户表:
id=1 score=100你无法通过:
select*fromuserwhereid=1判断:
本次加10分事务到底成功没有此时必须有:
t_transaction_log场景3:审计要求极高
金融系统:
支付 退款 结算通常都会留:
transaction_log用于追踪。
大厂实际情况
一般分两类:
简单业务
订单 商品 用户注册直接查业务表。
不建事务表。
核心交易链路
支付 资金 库存扣减会建:
transaction_log或者
outbox_message表。
你可以这么理解
RocketMQ 事务消息最核心要求只有一个:
Broker回查时必须能知道本地事务最终状态至于状态存哪里:
可以是
业务表也可以是
事务表RocketMQ 根本不关心。
所以:
事务消息 ≠ 必须有事务表而是:
事务消息 = 必须有一个可靠的事务状态来源这个来源可以是业务表,也可以是事务日志表。
很多互联网业务(订单、注册、发帖)直接查业务表就够了;只有复杂交易系统才会额外维护事务表。
四、RocketMQ 事务消息的修复方案(复杂场景)
如果你遇到了上面提到的复杂场景(业务记录可删除、反复修改、审计要求高),就需要引入事务日志表和时间窗口判断。
修复后的回查逻辑
@OverridepublicRocketMQLocalTransactionStatecheckLocalTransaction(Messagemsg){StringtxId=msg.getHeaders().get("txId").toString();// 本次事务唯一ID// 1. 查事务日志表(用事务ID,不是业务ID)intcount=logMapper.countByTxId(txId);if(count>0){// 这条日志是本次事务写入的,确认本次事务成功returnRocketMQLocalTransactionState.COMMIT;}// 2. 查不到时,需要时间窗口判断// 可能是事务还在执行中(卡住了),不能直接 RollbacklongbornTime=Long.parseLong(msg.getProperty("BORN_TIMESTAMP"));if(System.currentTimeMillis()-bornTime<5*60*1000){// 返回 UNKNOWN,告诉 MQ:我不确定,你过一会再来问returnRocketMQLocalTransactionState.UNKNOWN;}// 超过时间窗口,才判定为真正失败returnRocketMQLocalTransactionState.ROLLBACK;}事务日志表的设计
CREATETABLEt_transaction_log(tx_idVARCHAR(64)PRIMARYKEY,-- 本次事务唯一IDbiz_idVARCHAR(64),-- 业务ID(订单号等)statusVARCHAR(16),-- SUCCESS / FAILcreated_atTIMESTAMP,INDEXidx_biz_id(biz_id));关键点:
tx_id是本次事务的唯一标识,每次事务都不同biz_id是业务标识,可能重复(同一订单多次操作)- 回查时用
tx_id查询,确保查到的是"本次事务"的记录
为什么事务日志表能解决问题?
| 查询方式 | 查询条件 | 查到什么 | 代表什么 |
|---|---|---|---|
| 查业务表 | biz_id(业务ID) | 业务数据的最终状态 | “业务表里有数据”,但可能是旧数据 |
| 查事务日志表 | tx_id(事务ID) | 本次事务的执行记录 | “本次事务成功写入日志”,确定性 |
本质区别:
- 业务表:存储业务数据的最终状态,可能被多次事务修改
- 事务日志表:存储每次事务的执行记录,每次事务一条,互不干扰
重要补充:事务日志表并不能解决所有问题
关键点:不管是查事务日志表还是查业务表,"查不到"时都面临同样的困境。
| 场景 | 查业务表 | 查事务日志表 |
|---|---|---|
| 本次事务成功 | 查到 → COMMIT | 查到 → COMMIT |
| 本次事务失败,但业务表有旧数据 | 查到 → COMMIT(误判!) | 没查到 → 需时间窗口 |
| 本次事务还在执行中(卡住) | 没查到 → 需时间窗口 | 没查到 → 需时间窗口 |
| 本次事务失败(回滚) | 没查到 → 需时间窗口 | 没查到 → 需时间窗口 |
事务日志表解决的问题:
- 消除"查到时的误判"——避免业务表旧数据的干扰
- 确保"查到 = 本次事务成功"成为确定性判断
事务日志表无法解决的问题:
- “查不到时怎么判断”——仍然需要时间窗口机制
结论:
- 事务日志表:解决"查到时可能误判"的问题(应对重复下单用同一ID等情况)
- 时间窗口判断:解决"查不到时的歧义"问题(区分执行中 vs 失败)
两者配合才能彻底解决竞态条件问题。单靠事务日志表或单靠时间窗口都不够。
关键结论:修复后的复杂度并不比本地消息表低
看到上面的方案,你会发现一个关键问题:
为了解决 RocketMQ 事务消息的竞态条件问题,需要引入:
- 事务日志表:记录每次事务的执行状态
- 时间窗口判断逻辑:区分"执行中"和"失败"
- 回查代码:查表 + 时间窗口判断
- Service 层改造:业务数据和事务日志在同一事务中写入
这个复杂度……并不比本地消息表方案低。
| 方案 | 需要的组件 | 复杂度 |
|---|---|---|
| RocketMQ 事务消息(裸奔版) | 回查代码(查业务表) | 低(但不可靠) |
| RocketMQ 事务消息(修复版) | 事务日志表 + 时间窗口判断 + 回查逻辑 + Service改造 | 中 |
| 本地消息表 | 消息表 + 定时任务 + 幂等控制 | 高 |
本质上,修复版的 RocketMQ 事务消息和本地消息表方案复杂度相当。
两者都需要:
- 额外的数据库表(事务日志表 / 消息表)
- 和业务数据在同一事务中写入
- 额外的机制确保最终投递(回查 / 定时任务轮询)
唯一的区别:
- RocketMQ:实时投递(事务提交后立即 Commit 消息)
- 本地消息表:轮询投递(定时任务扫描后投递)
所以,如果 RocketMQ 事务消息需要这么多额外代码才能可靠工作,那选择本地消息表也未尝不可——至少它天生就包含了这些机制。
修复后的完整代码
@Component@RocketMQTransactionListenerpublicclassOrderTransactionListenerimplementsRocketMQLocalTransactionListener{@AutowiredprivateOrderServiceorderService;@AutowiredprivateTransactionLogMapperlogMapper;/** * 执行本地事务 * 核心原则:业务数据和事务日志,必须在同一个 @Transactional 下提交 */@OverridepublicRocketMQLocalTransactionStateexecuteLocalTransaction(Messagemsg,Objectarg){try{// 开启本地事务(包含:写业务表 + 写日志表)orderService.createOrderWithLog((OrderParam)arg);returnRocketMQLocalTransactionState.COMMIT;}catch(Exceptione){returnRocketMQLocalTransactionState.ROLLBACK;}}/** * 回查接口 * 解决竞态条件和旧数据干扰问题 */@OverridepublicRocketMQLocalTransactionStatecheckLocalTransaction(Messagemsg){StringtxId=msg.getHeaders().get("txId").toString();// 1. 查事务日志表intcount=logMapper.countByTxId(txId);if(count>0){returnRocketMQLocalTransactionState.COMMIT;}// 2. 时间窗口判断longbornTime=Long.parseLong(msg.getProperty("BORN_TIMESTAMP"));if(System.currentTimeMillis()-bornTime<5*60*1000){returnRocketMQLocalTransactionState.UNKNOWN;}returnRocketMQLocalTransactionState.ROLLBACK;}}// Service 层@TransactionalpublicvoidcreateOrderWithLog(OrderParamparam){StringtxId=param.getTxId();// 写业务表orderMapper.insert(order);// 写事务日志表(同一事务)logMapper.insert(newTransactionLog(txId:txId,bizId:order.getId(),status:"SUCCESS"));// 事务提交:要么两个都成功,要么两个都失败}五、看到这里你会发现一个问题
为了解决 RocketMQ 事务消息的竞态问题,我们引入了:
- 事务日志表:记录每次事务的执行状态
- 时间窗口判断:区分"执行中"和"失败"
这个复杂度…并不比本地消息表低。
六、本地消息表:经典但可靠的方案
核心原理
本地消息表的思路更直接:把"发消息"这个动作本身变成数据库操作,和业务数据放在同一个事务中。
@Transactionalpublicvoidregister(Useruser){// 1. 写入用户表(业务操作)userMapper.insert(user);// 2. 写入本地消息表(和业务数据在同一事务中)msgMapper.insert(newMsg(exchange:"user.welcome",routingKey:"user."+user.getId(),body:JSON.toJSONString(user),status:"INIT"));// 事务提交后,两个操作都已持久化}然后有一个后台任务轮询:
@Scheduled(fixedDelay=10000)// 每 10 秒扫描一次publicvoidretryFailedMessages(){List<Msg>msgs=msgMapper.selectByStatus("INIT");for(Msgmsg:msgs){try{mqProducer.send(msg.getExchange(),msg.getRoutingKey(),msg.getBody());msg.setStatus("SUCCESS");}catch(Exceptione){msg.setStatus("FAIL");msg.setFailCount(msg.getFailCount()+1);}msgMapper.update(msg);}}为什么能解决问题?
| 场景 | 结果 | 一致性 |
|---|---|---|
| 步骤 1 成功,步骤 2 失败 | 整个事务回滚,用户和消息都没写入 | 一致 |
| 步骤 1、2 都成功,但事务提交前宕机 | 事务回滚,数据都没写入 | 一致 |
| 事务成功,MQ 投递失败 | 后台任务不断重试,直到成功 | 最终一致 |
项目中的实际实现
我分析的项目中,自研框架的核心实现:
// DefaultMsgSender.javaif(hasTransaction){// 1. 消息先写入 t_msg 表(与业务数据在同一事务中)msgService.batchInsert(...);// 2. 注册事务同步回调 → 事务提交后才投递TransactionSynchronizationManager.registerSynchronization(newTransactionSynchronization(){@OverridepublicvoidafterCompletion(intstatus){if(status==STATUS_COMMITTED){// 事务提交后异步投递消息mqExecutor.execute(()->deliverMsg(msgPOList));}}});}关键设计:
- 消息和业务数据在同一事务中 → 原子性保证
- Spring
TransactionSynchronization→ 事务提交后才投递 - 补偿 Job 扫表重试 → 兜底机制
七、两种方案的对比
| 维度 | RocketMQ 事务消息(裸奔版) | RocketMQ 事务消息(修复版) | 本地消息表 |
|---|---|---|---|
| 实现原理 | 半消息 → 本地事务 → Commit/Rollback → 回查 | 半消息 + 事务日志表 + 时间窗口回查 | 业务数据 + 消息记录同事务 → 定时任务轮询投递 |
| 竞态条件 | 无法处理 | 通过 UNKNOWN 状态解决 | 不存在(消息先入库) |
| 实时性 | 高 | 高 | 低(取决于轮询频率) |
| 通用性 | 仅 RocketMQ | 仅 RocketMQ | 任何 MQ(RabbitMQ、Kafka 等) |
| 复杂度 | 低 | 中(需要事务日志表) | 高(需要定时任务、幂等控制) |
| 性能开销 | 低 | 中(多写一条日志) | 高(定时轮询数据库) |
| 可控性 | 低 | 低 | 高(自定义重试策略) |
| 可观测性 | 中 | 中 | 高(状态都在数据库) |
八、本质分析
看到对比表,你会发现一个有趣的结论:
RocketMQ 事务消息的"修复版",本质上就是把事务日志表换成了本地消息表。
两者都是:
- 在本地事务中写入一条记录(业务数据 + 事务日志/消息记录)
- 通过数据库事务保证原子性
- 通过额外的机制确保最终投递(回查 vs 轮询)
唯一的区别:
- RocketMQ:实时投递(事务提交后立即 Commit 消息)
- 本地消息表:轮询投递(定时任务扫描后投递)
九、什么时候选择本地消息表?
1. 已有其他 MQ(RabbitMQ、Kafka 等)
RabbitMQ原生不支持事务消息,只能通过本地消息表实现等价功能。
RocketMQ 是后期引入的?此时自研框架已经稳定运行,没必要迁移。
2. 需要精细控制重试策略
项目需要处理多个业务模块的异步事件:点赞、收藏、签到、红包、文章审核…每个场景对重试策略的要求不同。
本地消息表支持:
- 衰减重试:1s、5s、30s、1min、5min、10min…
- 自定义幂等键:每个消费者可以自定义
- 手动补偿:直接操作数据库,灵活干预
3. 需要高度可观测性
所有消息状态都在数据库里:
- 运维人员可以直接查询消息投递状态
- 监控系统可以基于数据库表做告警
- 出问题时可以手动修复或重试
十、消费端的原子幂等(通用)
无论用哪种方案,消费端都必须实现原子幂等。
错误做法
@TransactionalpublicvoidonMessage(Stringmessage){OrderMsgmsg=JSON.parseObject(message,OrderMsg.class);// 幂等检查和业务执行分离if(idempotentMapper.exists(msg.getOrderId())){return;}inventoryService.addPoints(msg.getUserId(),msg.getPoints());// 失败时,业务执行了,但幂等记录没插进去idempotentMapper.insert(msg.getOrderId());}正确做法
@Transactional(rollbackFor=Exception.class)publicvoidonMessage(Stringmessage){OrderMsgmsg=JSON.parseObject(message,OrderMsg.class);// 利用数据库唯一索引做原子幂等挡板try{idempotentMapper.insertBarrier(msg.getOrderId());}catch(DuplicateKeyExceptione){// 插入报错,说明消费过了,直接返回return;}// 执行业务// 如果报错,事务会回滚,幂等记录也撤销,下次重试能进来inventoryService.addPoints(msg.getUserId(),msg.getPoints());}十一、生产级兜底方案
无论选择哪种方案,都必须承认系统的不确定性,建立兜底机制。
1. 日志表/消息表爆炸了怎么办?
每天几百万条记录,一个月表就废了。
- 错误:
DELETE FROM log WHERE time < ...(间隙锁阻塞写入) - 正确:MySQL 分区表,按天分区,清理时直接
DROP PARTITION
2. 死信队列怎么处理?
- 告警:DLQ 进消息,立刻推送到开发群
- 重投平台:一键"原样重投"或"修改参数后重投"
- 最终底线:人工数据库修复
3. 消息积压怎么办?
- 监控告警:设置积压阈值
- 临时扩容:增加消费者实例
- 降级策略:非核心业务暂停投递
十二、总结
RocketMQ 事务消息 vs 本地消息表
| 选择 RocketMQ 事务消息 | 选择本地消息表 |
|---|---|
| 项目从零开始,没有历史包袱 | 已有 RabbitMQ 等其他 MQ |
| 团队熟悉 RocketMQ 生态 | 需要精细控制重试策略 |
| 追求实时性 | 需要高度可观测性 |
| 希望减少代码量 | 需要跨 MQ 通用方案 |
技术决策的关键点
- 定性:明确是最终一致性,不是强一致性
- 严谨:RocketMQ 需要事务日志表 + 时间窗口;本地消息表本身已包含这些机制
- 落地:消费端实现原子幂等
- 兜底:建立死信告警 + 批量重投机制
结论
没有最好的架构,只有最适合场景的方案。
RocketMQ 事务消息和本地消息表,本质上是两种不同的实现路径,最终效果等价–都是确保"业务操作和消息投递要么都成功,要么都不发生"。
如果 RocketMQ 事务消息需要引入事务日志表才能可靠工作,那复杂度已经和本地消息表相当了。这时候,选择的关键在于:
- 你的项目用什么 MQ?
- 你需要什么样的可控性和可观测性?
- 你的团队有什么技术积累?
选择适合自己场景的方案,比追求"最新最酷"的技术更重要。