RocketMQ源码分析之消息消费 轮询机制 PullRequestHoldService
ztj100 2024-10-27 18:33 74 浏览 0 评论
一、前言
RocketMQ 消费过程中的轮询机制是啥?
1.1 消息消费方式
RocketMQ 支持多种消费方式,包括 Push 模式和 Pull 模式
- Pull 模式:用户自己进行消息的拉取和消费进度的更新
- Push 模式:Broker 将新的消息自动发送给用户进行消费
1.2 Push 消费模式
我们一般使用 RocketMQ 使用的是 Push 模式,因为比较方便,不需要手动拉取消息和更新消费进度。
那么你有没有想过 Push 模式是如何做到能够立即消费新的消息?
1.2.1 Push 模式原理
实际上,在 Push 消费时,消费者是在不断轮询 Broker,询问是否有新消息可供消费。一旦有新消息到达,马上拉取该消息。也就是说 Push 模式内部也用了 Pull 消息的模式,这样就可以立即消费到最新的消息。
1.3 如何进行轮询?
那么 Push 模式或 Pull 模式如何进行消息的查询?
能够想到的比较笨的方法是,每隔一定的时间(如1ms)就向 Broker 发送一个查询请求,如果没有新消息则立刻返回。可想而知这种方法非常浪费网络资源。
RocketMQ 为了提高网络性能,在拉取消息时如果没有新消息,不会马上返回,而是会将该查询请求挂起一段时间,然后再重试查询。如果一直没有新消息,直到轮询时间超过设定的阈值才会返回。
根据轮询设定的超时阈值大小的不同,RocketMQ 有两种轮询方式,分别为长轮询(默认)和短轮询。
1.4 长轮询和短轮询
RocketMQ 的 Broker 端参数 longPollingEnable 可以配置轮询方式,默认为 true
- 短轮询:longPollingEnable=false,轮询时间为 shortPollingTimeMills ,默认为 1s
- 长轮询:longPollingEnable=true,轮询时间为 5s。拉取请求挂起时间:受 DefaultMQPullConsumer 的 brokerSuspendMaxTimeMillis 控制,默认push模式固定15s,pull模式固定20s。
二、源码分析
- 挂起拉取请求;
- 线程不断轮询判断该offset是否有新的消息达到;
- 调用PullMessageProcessor的executeRequestWhenWakeup方法向PullMessageExecutor拉取线程池提交任务;
- PullMessageProcessor线程处理任务交给processRequest方法进行处理;
- NotifyMessageArrivingListener监听消息到达;
1、挂起拉取请求
// 挂起拉取请求
// 根据主题名称 + 队列id, 获取 ManyPullRequest, 对于同一个 topic + 队列的拉取请求用 ManyPullRequest包装,
// 然后将 pullRequest 添加到 ManyPullRequest 中
public void suspendPullRequest(
final String topic,
final int queueId,
final PullRequest pullRequest) {
String key = this.buildKey(topic, queueId);
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (null == mpr) {
mpr = new ManyPullRequest();
ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
if (prev != null) {
mpr = prev;
}
}
mpr.addPullRequest(pullRequest);
}
private String buildKey(final String topic, final int queueId) {
StringBuilder sb = new StringBuilder(topic.length() + 5);
sb.append(topic);
sb.append(TOPIC_QUEUEID_SEPARATOR);
sb.append(queueId);
return sb.toString();
}
2、线程不断轮询判断该offset是否有新的消息达到
public void run() {
log.info("{} service started", this.getServiceName());
while (!this.isStopped()) {
try {
// 当前是否启用long polling,长轮询,一个请求过来了挂起,每次只挂起 5s,然后就去尝试拉取。
if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
this.waitForRunning(5 * 1000); // 长轮询一个等待周期是5s
}
// 如果不开启长轮询模式,则只挂起一次,挂起时间为 shortPollingTimeMills,然后去尝试查找消息
else {
this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
}
long beginLockTimestamp = this.systemClock.now();
// 检查当前挂起的请求,把拉取的topic+queue的最大offset查一下,通知一下新消息来了,你不用挂起了
// 你可以拿到消息走了
this.checkHoldRequest();
long costTime = this.systemClock.now() - beginLockTimestamp;
if (costTime > 5 * 1000) {
log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
}
} catch (Throwable e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}
log.info("{} service end", this.getServiceName());
}
// 你可以在线程体里进入等待的状态,还可以设置超时时间
protected void waitForRunning(long interval) {
// 需要进行等待的时候,必须把已经通过过这个标识从true修改为false
// 默认就是false,所以说一般这个逻辑是不跑的
if (hasNotified.compareAndSet(true, false)) {
this.onWaitEnd();
return;
}
// entry to wait
// 无论此时你的countDownLatch.value是多少,此时都需要复位成1,让我们可以去进行一个等待
waitPoint.reset();
try {
// 他可以等待一定的时间,可以去等待别人进行一个countDown操作
// 开始来等待别人,指定我们的一个超时时间
waitPoint.await(interval, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("Interrupted", e);
} finally {
hasNotified.set(false); // 他一定会把hasNotified设置为false
this.onWaitEnd();
}
}
protected void checkHoldRequest() {
for (String key : this.pullRequestTable.keySet()) {
String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
if (2 == kArray.length) {
String topic = kArray[0];
int queueId = Integer.parseInt(kArray[1]);
// 根据主题,消费队列ID查找队列的最大偏移量。
final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
try {
// 根据该offset,判断是否有新的消息达到。
this.notifyMessageArriving(topic, queueId, offset);
} catch (Throwable e) {
log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
}
}
}
}
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) {
notifyMessageArriving(topic, queueId, maxOffset, null, 0, null, null);
}
/**
* 通知消息到达
*
* @param topic 主题名称
* @param queueId 队列id
* @param maxOffset 消费队列当前最大偏移量
* @param tagsCode 消息tag hashcode,基于tag消息过滤
* @param msgStoreTime 消息存储时间
* @param filterBitMap 过滤位图
* @param properties 消息属性,基于属性的消息过滤
*/
public void notifyMessageArriving(
final String topic,
final int queueId,
final long maxOffset, // 最大的offset已经到了哪儿去了
final Long tagsCode,
long msgStoreTime,
byte[] filterBitMap,
Map<String, String> properties) {
String key = this.buildKey(topic, queueId);
// 拿到当前挂起等待topic@queue里的消息
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (mpr != null) {
// 获取主题与队列的所有 PullRequest 并清除内部 pullRequest 集合,避免重复拉取
List<PullRequest> requestList = mpr.cloneListAndClear();
if (requestList != null) {
List<PullRequest> replayList = new ArrayList<PullRequest>();
for (PullRequest request : requestList) {
long newestOffset = maxOffset; // topic@queue里最新到达的消息最大的offset,新消息的offset
if (newestOffset <= request.getPullFromThisOffset()) { // 如果说最新的消息是小于等于拉取请求里要拉取的起始offset
// 查询消息存储组件里的topic@queue最大的offset,查询出来了以后设置为topic@queue里的最大offset
newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(
topic, queueId
);
}
// 如果队列最大偏移量大于 pullFromThisOffset 说明有新的消息到达,
// 先简单对消息根据 tag,属性进行一次消息过滤,
// 如果 tag,属性为空,则消息过滤器会返回true,然后 executeRequestWhenWakeup进行消息拉取,结束长轮询
if (newestOffset > request.getPullFromThisOffset()) {
// 根据我们的消费队列判断一下是否匹配,新到达的消息tags是否跟拉取请求里的tags是匹配的
boolean match = request.getMessageFilter().isMatchedByConsumeQueue(
tagsCode,
new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap)
);
// match by bit map, need eval again when properties is not null.
if (match && properties != null) {
// 继续判断一下跟commitlog想比是否匹配
match = request.getMessageFilter().isMatchedByCommitLog(
null, properties);
}
if (match) {
try {
// 调用PullMessageProcessor进行消息拉取
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(
request.getClientChannel(),
request.getRequestCommand()
);
} catch (Throwable e) {
log.error("execute request when wakeup failed.", e);
}
continue;
}
}
// 如果挂起时间超过 suspendTimeoutMillisLong,则超时,结束长轮询,调用executeRequestWhenWakeup 进行消息拉取,并返回结果到客户端
if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
try {
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(
request.getClientChannel(),
request.getRequestCommand()
);
} catch (Throwable e) {
log.error("execute request when wakeup failed.", e);
}
continue;
}
replayList.add(request);
}
if (!replayList.isEmpty()) {
// 如果待拉取偏移量大于消息消费队列最大偏移量,并且未超时,调用 mpr.addPullRequest(replayList) 将拉取任务重新放入,待下一次检测
mpr.addPullRequest(replayList);
}
}
}
}
3、调用PullMessageProcessor的executeRequestWhenWakeup方法向PullMessageExecutor拉取线程池提交任务
public void executeRequestWhenWakeup(final Channel channel,
final RemotingCommand request) throws RemotingCommandException {
Runnable run = new Runnable() {
@Override
public void run() {
try {
final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);
if (response != null) {
response.setOpaque(request.getOpaque());
response.markResponseType();
try {
channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
log.error("processRequestWrapper response to {} failed",
future.channel().remoteAddress(), future.cause());
log.error(request.toString());
log.error(response.toString());
}
}
});
} catch (Throwable e) {
log.error("processRequestWrapper process request over, but response failed", e);
log.error(request.toString());
log.error(response.toString());
}
}
} catch (RemotingCommandException e1) {
log.error("excuteRequestWhenWakeup run", e1);
}
}
};
// 最终将线程封装成一个RequestTask提交至线程池执行
this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request));
}
4、PullMessageProcessor线程处理任务交给processRequest方法进行处理
线程run方法逻辑,processRequest方法上文有分析过;
5、NotifyMessageArrivingListener监听消息到达
public class NotifyMessageArrivingListener implements MessageArrivingListener {
private final PullRequestHoldService pullRequestHoldService;
public NotifyMessageArrivingListener(final PullRequestHoldService pullRequestHoldService) {
this.pullRequestHoldService = pullRequestHoldService;
}
@Override
public void arriving(
String topic,
int queueId,
long logicOffset,
long tagsCode,
long msgStoreTime,
byte[] filterBitMap,
Map<String, String> properties) {
this.pullRequestHoldService.notifyMessageArriving(
topic,
queueId,
logicOffset,
tagsCode,
msgStoreTime,
filterBitMap,
properties
);
}
}
也就是说当provider消息推送到broker后,broker会通知NotifyMessageArrivingListener消息到达,从而及时的结束挂起的轮询的链接。
三、总结
要开启长轮询, 在 broker 配置文件中 longPollingEnable=true, 默认是开启的。
??消息拉取为了提高网络性能,在消息服务端根据拉取偏移量去物理文件查找消息时没有找到,并不立即返回消息未找到,而是会将该线程挂起一段时间,然后再次重试,直到重试。挂起分为长轮询或短轮询,在broker 端可以通过 longPollingEnable=true 来开启长轮询。
短轮询:longPollingEnable=false,第一次未拉取到消息后等待 shortPollingTimeMills时间后再试。shortPollingTimeMills默认为1S。
长轮询:longPollingEnable=true,会根据消费者端设置的挂起超时时间,受DefaultMQPullConsumer 的brokerSuspendMaxTimeMillis控制,默认20s,(brokerSuspendMaxTimeMillis),长轮询有两个线程来相互实现。
PullRequestHoldService:每隔5s重试一次。
DefaultMessageStore#ReputMessageService,每当有消息到达后,转发消息,然后调用PullRequestHoldService 线程中的拉取任务,尝试拉取,每处理一次,Thread.sleep(1), 继续下一次检查。
相关推荐
- Docker安全开放远程访问连接权限(docker 远程授权访问)
-
1、Docker完全开放远程访问Docker服务完全开放对外访问权限操作如下:#开启端口命令(--permanent永久生效,没有此参数重启后失效)firewall-cmd--zone=pu...
- SpringCloud系列——4OpenFeign简介及应用
-
学习目标什么是OpenFeign以及它的作用RPC到底怎么理解OpenFeign的应用第1章OpenFeign简介在前面的内容中,我们分析了基于RestTemplate实现http远程通信的方法。并...
- Spring Boot集成qwen:0.5b实现对话功能
-
1.什么是qwen:0.5b?模型介绍:Qwen1.5是阿里云推出的一系列大型语言模型。Qwen是阿里云推出的一系列基于Transformer的大型语言模型,在大量数据(包括网页文本、书籍、代码等)...
- JDK从8升级到21的问题集(jdk8升级到11)
-
一、背景与挑战1.升级动因oOracle长期支持策略o现代特性需求:协程、模式匹配、ZGC等o安全性与性能的需求oAI新技术引入的版本要求...
- 大白话详解Spring Cloud服务降级与熔断
-
1.Hystrix断路器概述1.1分布式系统面临的问题复杂分布式体系结构中的应用程序有数十个依赖关系,每个依赖关系在某些时候将不可避免地失败。这就造成有可能会发生...
- 面试突击43:lock、tryLock、lockInterruptibly有什么区别?
-
在Lock接口中,获取锁的方法有4个:lock()、tryLock()、tryLock(long,TimeUnit)、lockInterruptibly(),为什么需要这么多方法?这些方法都有...
- 了解网络编程 TCP/IP 协议与UDP 协议
-
因为iP地址比较难记忆,很多情况下可以使用域名代替iP地址。1.TCP/IP协议与UDP协议通过IP地址与端口号确定计算机在网络中的位置后,接下来考虑通讯的问题:因为不同计算机的软硬件平台...
- Semaphore与Exchanger的区别(semaphore和signal)
-
Semaphore和Exchanger是Java并发编程中两个常用的同步工具类,它们都可以用于协调多个线程之间的执行顺序和状态,但它们的作用和使用方式有所不同:Semaphore类表示一个...
- Java教程:什么是分布式任务调度?怎样实现任务调度?
-
通常任务调度的程序是集成在应用中的,比如:优惠卷服务中包括了定时发放优惠卷的的调度程序,结算服务中包括了定期生成报表的任务调度程序...
- java多线程—Runnable、Thread、Callable区别
-
多线程编程优点:进程之间不能共享内存,但线程之间共享内存非常容易。系统创建线程所分配的资源相对创建进程而言,代价非常小。Java中实现多线程有3种方法:继承Thread类实现Runnable...
- 工厂模式详解(工厂模式是啥意思)
-
工厂模式详解简单工厂简单工厂模式(SimpleFactoryPattern)是指由一个工厂对象决定创建出哪一种产品类的实例。简单工厂适用于工厂类负责创建的对象较少的场景,且客户端只需要传入工厂类的...
- 我们程序员眼中的母亲节(你眼中的程序员是什么样子的?程序员的薪酬如何?)
-
导语:对于我们成人来说,尤其是漂泊在外的程序员,陪伴父母的时间太少了。每逢佳节倍思亲,我们流浪外在的游子应该深有感触。母亲,是世界上最伟大的人,她承载着对我们的爱,更是负担和压力。我们作为子女,只会嫌...
- 死锁的 4 种排查工具(死锁检测方法要解决两个问题)
-
死锁(DeadLock)指的是两个或两个以上的运算单元(进程、线程或协程),都在等待对方停止执行,以取得系统资源,但是没有一方提前退出,就称为死锁。死锁示例接下来,我们先来演示一下Java中最简...
- 1. 工厂模式详解(工厂模式示例)
-
我们的项目代码也是由简而繁一步一步迭代而来的,但对于调用者来说却是越来越简单化。简单工厂模式简单工厂模式(SimpleFactoryPattern)是指由一个工厂对象决定创建出哪一种产品类的实例。...
- Jmeter(二十):jmeter对图片验证码的处理
-
jmeter对图片验证码的处理在web端的登录接口经常会有图片验证码的输入,而且每次登录时图片验证码都是随机的;当通过jmeter做接口登录的时候要对图片验证码进行识别出图片中的字段,然后再登录接口中...
你 发表评论:
欢迎- 一周热门
- 最近发表
-
- Docker安全开放远程访问连接权限(docker 远程授权访问)
- SpringCloud系列——4OpenFeign简介及应用
- Spring Boot集成qwen:0.5b实现对话功能
- JDK从8升级到21的问题集(jdk8升级到11)
- 大白话详解Spring Cloud服务降级与熔断
- 面试突击43:lock、tryLock、lockInterruptibly有什么区别?
- 了解网络编程 TCP/IP 协议与UDP 协议
- Semaphore与Exchanger的区别(semaphore和signal)
- Java教程:什么是分布式任务调度?怎样实现任务调度?
- java多线程—Runnable、Thread、Callable区别
- 标签列表
-
- idea eval reset (50)
- vue dispatch (70)
- update canceled (42)
- order by asc (53)
- spring gateway (67)
- 简单代码编程 贪吃蛇 (40)
- transforms.resize (33)
- redisson trylock (35)
- 卸载node (35)
- np.reshape (33)
- torch.arange (34)
- npm 源 (35)
- vue3 deep (35)
- win10 ssh (35)
- vue foreach (34)
- idea设置编码为utf8 (35)
- vue 数组添加元素 (34)
- std find (34)
- tablefield注解用途 (35)
- python str转json (34)
- java websocket客户端 (34)
- tensor.view (34)
- java jackson (34)
- vmware17pro最新密钥 (34)
- mysql单表最大数据量 (35)