首页 > 运维 > linux运维 > 正文

RocketMQ消息为什么会被重复消费?

星夢妙者
发布: 2025-07-12 09:40:01
原创
533人浏览过

在使用rocketmq时,消息可能会被重复消费。让我们从全局视角探讨消息发送和消费的过程。rocketmq-dashboard是一个非常实用的图形化界面工具

RocketMQ消息为什么会被重复消费?

首先,我们在RocketMQ-Dashboard上创建一个topic,每个topic下有4个队列。

每个topic是一类消息的集合,topic下再细分queue是为了提高消息消费的并发度。

RocketMQ消息为什么会被重复消费?

「当producer发送topic消息时,应该发送到topic下的哪个queue呢?」

producer会采用轮询策略来发送消息。

「那么consumer应该消费哪个queue下的消息呢?」

当只有一个消费者时,它会消费所有queue中的消息。

RocketMQ消息为什么会被重复消费?

「如果有多个消费者呢?」

只需根据各种负载均衡策略将队列分配给消费者即可,如下图展示了两种负载均衡方式。

RocketMQ消息为什么会被重复消费?RocketMQ消息为什么会被重复消费?RocketMQ消息为什么会被重复消费?

你问我这两种负载策略是如何实现的?去查看源码吧,我就不详细分析了。

「如果消费者数量超过队列的数量会发生什么?」

多余的消费者将不会消费任何队列。

RocketMQ消息为什么会被重复消费?

为什么一个consumer只能消费一个queue呢?」

多个消费者消费一个queue肯定会有并发问题,所以需要加锁,这样还不如将topic下的队列数量设置得更多一些。

「我在运行过程中可以设置topic下queue的数量吗?」

当然可以。不仅可以重新设置queue的数量,还可以实时增减consumer,以应对不同流量的场景。

「那这样说当queue或者consumer的数量发生变化的时候,需要重新执行负载均衡吧?」

是的,大家一般把这个过程称为重平衡。

下面我们来分享一下详细的细节。

消息发送流程主要有三种方式:单向发送(只发送,不管结果)、同步发送和异步发送。

RocketMQ消息为什么会被重复消费?

消息消费流程基于推还是拉?消息消费的模式有两种方式:

拉取:Consumer不断从Broker拉取。 推送:Broker向Consumer推送。

这两种方式都有各自的缺点:

拉取:拉取的间隔不好确定,间隔太短没消息时会造成带宽浪费,间隔太长又会造成消息不能及时被消费。 推送:「推送和速率难以适配消费速率」,推的太快,消费者消费不过来怎么办?推的太慢消息不能及时被消费。

「看起来拉取和推送难以抉择。」

然后就有大佬把拉取模式改了一下,即不会造成带宽浪费,也能基于消费的速率来决定拉取的频率!

「你猜怎么改的?」

其实很简单,Consumer发送拉取请求到Broker端,如果Broker有数据则返回,Consumer端再次拉取。如果Broker端没有数据,不立即返回,而是等待一段时间(例如5s)。

如果在等待的这段时间,有要拉取的消息,则将消息返回,Consumer端再次拉取。如果等待超时,也会直接返回,不会将这个请求一直hold住,Consumer端再次拉取。「对了,这种策略就叫做长轮询。」

「RocketMQ中有拉和推两种消费方式,但是推是基于长轮询做的。」

具体消费流程如下图所示。

RocketMQ消息为什么会被重复消费?

「拉取到消息后是怎么处理的呢?」

PullRequest类的成员变量如下图所示。

RocketMQ消息为什么会被重复消费?

当拉取到消息后,消息会被放入msgTreeMap,其中key为消息的offset,value为消息实体。

「另外还有一个重要的属性dropped,和重平衡相关,重平衡的时候会造成消息的重复消费,具体机制不分析了,看专栏吧。」

msgCount(未消费消息总数)和msgSize(未消费消息大小)是和流控相关的。

「什么是流控呢?」

就是流量控制,当消费者消费的比较慢时,减缓拉取的速度。如下图所示。

RocketMQ消息为什么会被重复消费?

当从阻塞队列中获取PullRequest时,并不会直接发起网络请求,而是先看看是否触发流控的规则,比如未消费的消息总数超过一定值,未消费的消息大小超过一定值等。

RocketMQ消息为什么会被重复消费?

接着就是收到响应,处理消息,并将PullRequest再次放入阻塞队列。

「是不是落了一个步骤?就是Consumer告诉Broker这部分消息我消费了?」

嗯嗯,你是不是以为提交offset的过程是同步的?其实并不是,「是异步的。」

Consumer怎么提交offset?

RocketMQ消息为什么会被重复消费?

当consumer消费完消息只是将offset存在本地,通过定时任务将offset提交到broker,另外broker收到提交offset的请求后,也仅仅是将offset存在map中,通过定时任务持久化到文件中。

「这样就会造成消息的重复消费。」

Consumer消费完消息并不是实时同步到Broker的,而是将offset先保存在本地map中,通过定时任务持久化上去。这就导致消息被消费了,但是此时消费者宕机了导致offset没提交,下次没提交offset的这部分消息会被再次消费。

即使offset被提交到了Broker,在还没来得及持久化的时候Broker宕机了,当重启的时候Broker会读取consumerOffset.json中保存的offset信息,这就会导致没持久化offset的这部分消息会被再次消费。

以上就是RocketMQ消息为什么会被重复消费?的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习
PHP中文网抖音号
发现有趣的

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号