消息不丢失需生产者、Broker、消费者三端协同保障:开启confirm机制并配异步监听、队列与消息均持久化、消费者手动ACK,金融级系统还需本地消息表+定时对账。

消息不丢失不是靠单个配置或一次确认就能解决的,而是生产者、Broker、消费者三端协同保障的结果。漏掉任意一环,都可能在高并发或异常场景下丢消息。
开启 RabbitMQ 生产者确认机制:channel.confirmSelect()
这是防止消息“发出去但没到交换机”的第一道防线。不启用它,basicPublish 调用成功只代表 TCP 写入成功,不代表 Broker 已接收。
- 必须在
channel创建后、声明队列前调用channel.confirmSelect() - 推荐搭配异步确认(
addConfirmListener),而非阻塞式waitForConfirms(),否则吞吐暴跌 - 注意:仅开启 confirm 不够,还要配合
publisher-return监听路由失败(比如 routingKey 错误导致消息被丢弃) - 常见错误:只加了
confirmSelect,却没注册监听器,失败时完全无感知
消息与队列都得持久化:durable=true 和 MessageProperties.PERSISTENT_TEXT_PLAIN
持久化是防 Broker 重启丢消息的关键。但很多人只设了队列持久化,忘了消息本身也要标记为持久化。
- 声明队列时传
true:channel.queueDeclare("q1", true, false, false, null) - 发送消息时必须用持久化属性:
channel.basicPublish("", "q1", MessageProperties.PERSISTENT_TEXT_PLAIN, body) - 交换机也建议声明为持久化(
channel.exchangeDeclare("ex", "direct", true)),否则重启后绑定关系丢失,新消息路由失败 - 坑点:Kafka 中对应的是
acks=all+min.insync.replicas=2,不是简单设个retention.ms
消费者必须关闭自动 ACK,改用手动 channel.basicAck()
这是最容易被跳过的环节。开 autoAck=true 意味着只要消息一推过去,RabbitMQ 就认为消费成功,哪怕你的业务逻辑还没跑完就崩了。
立即学习“Java免费学习笔记(深入)”;
- 创建消费者时指定
autoAck=false:channel.basicConsume("q1", false, consumer) - 在业务处理完成、数据库事务提交之后,再调用
channel.basicAck(deliveryTag, false) - 如果处理失败,应明确拒绝:
channel.basicNack(deliveryTag, false, true)(重入队)或转发至死信队列 - 别在 try-catch 外层直接 ack——异常吞掉、没日志、没重试,等于静默丢消息
兜底方案:本地消息表 + 定时对账
上面三步能覆盖 95% 场景,但遇到网络分区、磁盘损坏、回调丢失等极端情况,仍可能出问题。金融/订单类系统建议加一层业务级保障。
- 发消息前,先插入一条记录到
msg_log表,字段至少含:msg_id(UUID)、content、exchange、routing_key、status('sending')、next_retry_time - 发送成功后更新状态为
'sent';消费方处理完,通过回调或反向 MQ 通知发送方,将其改为'confirmed' - 定时任务每 30 秒扫描
status = 'sent' AND next_retry_time 的记录,重新投递并更新重试时间 - 注意:表主键必须是
msg_id,且所有写操作走同一 DB 实例,避免分布式事务复杂度
真正难的不是写几行确认代码,而是在每个环节都预设“它会失败”——网络会断、磁盘会坏、回调会丢、人会配错。所谓可靠性,就是把所有“应该发生”的事,都变成“必须验证发生”。










