百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分类 > 正文

RocketMQ源码分析之消息消费 轮询机制 PullRequestHoldService

ztj100 2024-10-27 18:33 24 浏览 0 评论

#头条创作挑战赛#

一、前言

RocketMQ 消费过程中的轮询机制是啥?

1.1 消息消费方式

RocketMQ 支持多种消费方式,包括 Push 模式和 Pull 模式

  • Pull 模式:用户自己进行消息的拉取和消费进度的更新
  • Push 模式:Broker 将新的消息自动发送给用户进行消费

1.2 Push 消费模式

我们一般使用 RocketMQ 使用的是 Push 模式,因为比较方便,不需要手动拉取消息和更新消费进度。

那么你有没有想过 Push 模式是如何做到能够立即消费新的消息?

1.2.1 Push 模式原理

实际上,在 Push 消费时,消费者是在不断轮询 Broker,询问是否有新消息可供消费。一旦有新消息到达,马上拉取该消息。也就是说 Push 模式内部也用了 Pull 消息的模式,这样就可以立即消费到最新的消息。

1.3 如何进行轮询?

那么 Push 模式或 Pull 模式如何进行消息的查询?

能够想到的比较笨的方法是,每隔一定的时间(如1ms)就向 Broker 发送一个查询请求,如果没有新消息则立刻返回。可想而知这种方法非常浪费网络资源。

RocketMQ 为了提高网络性能,在拉取消息时如果没有新消息,不会马上返回,而是会将该查询请求挂起一段时间,然后再重试查询。如果一直没有新消息,直到轮询时间超过设定的阈值才会返回。

根据轮询设定的超时阈值大小的不同,RocketMQ 有两种轮询方式,分别为长轮询(默认)和短轮询。

1.4 长轮询和短轮询

RocketMQ 的 Broker 端参数 longPollingEnable 可以配置轮询方式,默认为 true

  • 短轮询:longPollingEnable=false,轮询时间为 shortPollingTimeMills ,默认为 1s
  • 长轮询:longPollingEnable=true,轮询时间为 5s。拉取请求挂起时间:受 DefaultMQPullConsumer 的 brokerSuspendMaxTimeMillis 控制,默认push模式固定15s,pull模式固定20s。

二、源码分析

  1. 挂起拉取请求;
  2. 线程不断轮询判断该offset是否有新的消息达到;
  3. 调用PullMessageProcessor的executeRequestWhenWakeup方法向PullMessageExecutor拉取线程池提交任务;
  4. PullMessageProcessor线程处理任务交给processRequest方法进行处理;
  5. NotifyMessageArrivingListener监听消息到达;

1、挂起拉取请求

// 挂起拉取请求
// 根据主题名称 + 队列id, 获取 ManyPullRequest, 对于同一个 topic + 队列的拉取请求用 ManyPullRequest包装,
// 然后将 pullRequest 添加到 ManyPullRequest 中
public void suspendPullRequest(
        final String topic,
        final int queueId,
        final PullRequest pullRequest) {
    String key = this.buildKey(topic, queueId);
    ManyPullRequest mpr = this.pullRequestTable.get(key);
    if (null == mpr) {
        mpr = new ManyPullRequest();
        ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
        if (prev != null) {
            mpr = prev;
        }
    }
    mpr.addPullRequest(pullRequest);
}
private String buildKey(final String topic, final int queueId) {
    StringBuilder sb = new StringBuilder(topic.length() + 5);
    sb.append(topic);
    sb.append(TOPIC_QUEUEID_SEPARATOR);
    sb.append(queueId);
    return sb.toString();
}

2、线程不断轮询判断该offset是否有新的消息达到

public void run() {
    log.info("{} service started", this.getServiceName());
    while (!this.isStopped()) {
        try {
            // 当前是否启用long polling,长轮询,一个请求过来了挂起,每次只挂起 5s,然后就去尝试拉取。
            if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                this.waitForRunning(5 * 1000); // 长轮询一个等待周期是5s
            }
            // 如果不开启长轮询模式,则只挂起一次,挂起时间为 shortPollingTimeMills,然后去尝试查找消息
            else {
                this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
            }

            long beginLockTimestamp = this.systemClock.now();
            // 检查当前挂起的请求,把拉取的topic+queue的最大offset查一下,通知一下新消息来了,你不用挂起了
            // 你可以拿到消息走了
            this.checkHoldRequest();
            long costTime = this.systemClock.now() - beginLockTimestamp;
            if (costTime > 5 * 1000) {
                log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
            }
        } catch (Throwable e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    log.info("{} service end", this.getServiceName());
}
// 你可以在线程体里进入等待的状态,还可以设置超时时间
protected void waitForRunning(long interval) {
    // 需要进行等待的时候,必须把已经通过过这个标识从true修改为false
    // 默认就是false,所以说一般这个逻辑是不跑的
    if (hasNotified.compareAndSet(true, false)) {
        this.onWaitEnd();
        return;
    }

    // entry to wait
    // 无论此时你的countDownLatch.value是多少,此时都需要复位成1,让我们可以去进行一个等待
    waitPoint.reset();

    try {
        // 他可以等待一定的时间,可以去等待别人进行一个countDown操作
        // 开始来等待别人,指定我们的一个超时时间
        waitPoint.await(interval, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        log.error("Interrupted", e);
    } finally {
        hasNotified.set(false); // 他一定会把hasNotified设置为false
        this.onWaitEnd();
    }
}
protected void checkHoldRequest() {
    for (String key : this.pullRequestTable.keySet()) {
        String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
        if (2 == kArray.length) {
            String topic = kArray[0];
            int queueId = Integer.parseInt(kArray[1]);
            // 根据主题,消费队列ID查找队列的最大偏移量。
            final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
            try {
                // 根据该offset,判断是否有新的消息达到。
                this.notifyMessageArriving(topic, queueId, offset);
            } catch (Throwable e) {
                log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
            }
        }
    }
}
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) {
    notifyMessageArriving(topic, queueId, maxOffset, null, 0, null, null);
}
/**
 * 通知消息到达
 *
 * @param topic        主题名称
 * @param queueId      队列id
 * @param maxOffset    消费队列当前最大偏移量
 * @param tagsCode     消息tag hashcode,基于tag消息过滤
 * @param msgStoreTime 消息存储时间
 * @param filterBitMap 过滤位图
 * @param properties   消息属性,基于属性的消息过滤
 */
public void notifyMessageArriving(
        final String topic,
        final int queueId,
        final long maxOffset, // 最大的offset已经到了哪儿去了
        final Long tagsCode,
        long msgStoreTime,
        byte[] filterBitMap,
        Map<String, String> properties) {

    String key = this.buildKey(topic, queueId);
    // 拿到当前挂起等待topic@queue里的消息
    ManyPullRequest mpr = this.pullRequestTable.get(key);

    if (mpr != null) {
        // 获取主题与队列的所有 PullRequest 并清除内部 pullRequest 集合,避免重复拉取
        List<PullRequest> requestList = mpr.cloneListAndClear();
        if (requestList != null) {
            List<PullRequest> replayList = new ArrayList<PullRequest>();

            for (PullRequest request : requestList) {
                long newestOffset = maxOffset; // topic@queue里最新到达的消息最大的offset,新消息的offset
                if (newestOffset <= request.getPullFromThisOffset()) { // 如果说最新的消息是小于等于拉取请求里要拉取的起始offset
                    // 查询消息存储组件里的topic@queue最大的offset,查询出来了以后设置为topic@queue里的最大offset
                    newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(
                            topic, queueId
                    );
                }

                // 如果队列最大偏移量大于 pullFromThisOffset 说明有新的消息到达,
                // 先简单对消息根据 tag,属性进行一次消息过滤,
                // 如果 tag,属性为空,则消息过滤器会返回true,然后 executeRequestWhenWakeup进行消息拉取,结束长轮询
                if (newestOffset > request.getPullFromThisOffset()) {
                    // 根据我们的消费队列判断一下是否匹配,新到达的消息tags是否跟拉取请求里的tags是匹配的
                    boolean match = request.getMessageFilter().isMatchedByConsumeQueue(
                            tagsCode,
                            new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap)
                    );

                    // match by bit map, need eval again when properties is not null.
                    if (match && properties != null) {
                        // 继续判断一下跟commitlog想比是否匹配
                        match = request.getMessageFilter().isMatchedByCommitLog(
                                null, properties);
                    }

                    if (match) {
                        try {
                            // 调用PullMessageProcessor进行消息拉取
                            this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(
                                    request.getClientChannel(),
                                    request.getRequestCommand()
                            );
                        } catch (Throwable e) {
                            log.error("execute request when wakeup failed.", e);
                        }
                        continue;
                    }
                }
                // 如果挂起时间超过 suspendTimeoutMillisLong,则超时,结束长轮询,调用executeRequestWhenWakeup 进行消息拉取,并返回结果到客户端
                if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                    try {
                        this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(
                                request.getClientChannel(),
                                request.getRequestCommand()
                        );
                    } catch (Throwable e) {
                        log.error("execute request when wakeup failed.", e);
                    }
                    continue;
                }

                replayList.add(request);
            }

            if (!replayList.isEmpty()) {
                // 如果待拉取偏移量大于消息消费队列最大偏移量,并且未超时,调用 mpr.addPullRequest(replayList) 将拉取任务重新放入,待下一次检测
                mpr.addPullRequest(replayList);
            }
        }
    }
}

3、调用PullMessageProcessor的executeRequestWhenWakeup方法向PullMessageExecutor拉取线程池提交任务

public void executeRequestWhenWakeup(final Channel channel,
    final RemotingCommand request) throws RemotingCommandException {
    Runnable run = new Runnable() {
        @Override
        public void run() {
            try {
                final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);

                if (response != null) {
                    response.setOpaque(request.getOpaque());
                    response.markResponseType();
                    try {
                        channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
                            @Override
                            public void operationComplete(ChannelFuture future) throws Exception {
                                if (!future.isSuccess()) {
                                    log.error("processRequestWrapper response to {} failed",
                                        future.channel().remoteAddress(), future.cause());
                                    log.error(request.toString());
                                    log.error(response.toString());
                                }
                            }
                        });
                    } catch (Throwable e) {
                        log.error("processRequestWrapper process request over, but response failed", e);
                        log.error(request.toString());
                        log.error(response.toString());
                    }
                }
            } catch (RemotingCommandException e1) {
                log.error("excuteRequestWhenWakeup run", e1);
            }
        }
    };
		// 最终将线程封装成一个RequestTask提交至线程池执行
    this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request));
}

4、PullMessageProcessor线程处理任务交给processRequest方法进行处理

线程run方法逻辑,processRequest方法上文有分析过;

5、NotifyMessageArrivingListener监听消息到达

public class NotifyMessageArrivingListener implements MessageArrivingListener {

    private final PullRequestHoldService pullRequestHoldService;

    public NotifyMessageArrivingListener(final PullRequestHoldService pullRequestHoldService) {
        this.pullRequestHoldService = pullRequestHoldService;
    }

    @Override
    public void arriving(
            String topic,
            int queueId,
            long logicOffset,
            long tagsCode,
            long msgStoreTime,
            byte[] filterBitMap,
            Map<String, String> properties) {
        this.pullRequestHoldService.notifyMessageArriving(
                topic,
                queueId,
                logicOffset,
                tagsCode,
                msgStoreTime,
                filterBitMap,
                properties
        );
    }
}

也就是说当provider消息推送到broker后,broker会通知NotifyMessageArrivingListener消息到达,从而及时的结束挂起的轮询的链接。

三、总结

要开启长轮询, 在 broker 配置文件中 longPollingEnable=true, 默认是开启的。

??消息拉取为了提高网络性能,在消息服务端根据拉取偏移量去物理文件查找消息时没有找到,并不立即返回消息未找到,而是会将该线程挂起一段时间,然后再次重试,直到重试。挂起分为长轮询或短轮询,在broker 端可以通过 longPollingEnable=true 来开启长轮询。

短轮询:longPollingEnable=false,第一次未拉取到消息后等待 shortPollingTimeMills时间后再试。shortPollingTimeMills默认为1S。

长轮询:longPollingEnable=true,会根据消费者端设置的挂起超时时间,受DefaultMQPullConsumer 的brokerSuspendMaxTimeMillis控制,默认20s,(brokerSuspendMaxTimeMillis),长轮询有两个线程来相互实现。

PullRequestHoldService:每隔5s重试一次。

DefaultMessageStore#ReputMessageService,每当有消息到达后,转发消息,然后调用PullRequestHoldService 线程中的拉取任务,尝试拉取,每处理一次,Thread.sleep(1), 继续下一次检查。

相关推荐

从IDEA开始,迈进GO语言之门(idea got)

前言笔者在学习GO语言编程的时候,GO语言在国内还没有像JAVA/Php/Python那样普及,绕了不少的弯路,要开始入门学习一门编程语言,最好就先从选择一个好的编程语言的开发环境开始,有了这个开发环...

基于SpringBoot+MyBatis的私人影院java网上购票jsp源代码Mysql

本项目为前几天收费帮学妹做的一个项目,JavaEEJSP项目,在工作环境中基本使用不到,但是很多学校把这个当作编程入门的项目来做,故分享出本项目供初学者参考。一、项目介绍基于SpringBoot...

基于springboot的个人服装管理系统java网上商城jsp源代码mysql

本项目为前几天收费帮学妹做的一个项目,JavaEEJSP项目,在工作环境中基本使用不到,但是很多学校把这个当作编程入门的项目来做,故分享出本项目供初学者参考。一、项目介绍基于springboot...

基于springboot的美食网站Java食品销售jsp源代码Mysql

本项目为前几天收费帮学妹做的一个项目,JavaEEJSP项目,在工作环境中基本使用不到,但是很多学校把这个当作编程入门的项目来做,故分享出本项目供初学者参考。一、项目介绍基于springboot...

贸易管理进销存springboot云管货管账分析java jsp源代码mysql

本项目为前几天收费帮学妹做的一个项目,JavaEEJSP项目,在工作环境中基本使用不到,但是很多学校把这个当作编程入门的项目来做,故分享出本项目供初学者参考。一、项目描述贸易管理进销存spring...

SpringBoot+VUE员工信息管理系统Java人员管理jsp源代码Mysql

本项目为前几天收费帮学妹做的一个项目,JavaEEJSP项目,在工作环境中基本使用不到,但是很多学校把这个当作编程入门的项目来做,故分享出本项目供初学者参考。一、项目介绍SpringBoot+V...

目前见过最牛的一个SpringBoot商城项目(附源码)还有人没用过吗

帮粉丝找了一个基于SpringBoot的天猫商城项目,快速部署运行,所用技术:MySQL,Druid,Log4j2,Maven,Echarts,Bootstrap...免费给大家分享出来前台演示...

SpringBoot+Mysql实现的手机商城附带源码演示导入视频

今天为大家带来的是基于SpringBoot+JPA+Thymeleaf框架的手机商城管理系统,商城系统分为前台和后台、前台用的是Bootstrap框架后台用的是SpringBoot+JPA都是现在主...

全网首发!马士兵内部共享—1658页《Java面试突击核心讲》

又是一年一度的“金九银十”秋招大热门,为助力广大程序员朋友“面试造火箭”,小编今天给大家分享的便是这份马士兵内部的面试神技——1658页《Java面试突击核心讲》!...

SpringBoot数据库操作的应用(springboot与数据库交互)

1.JDBC+HikariDataSource...

SpringBoot 整合 Flink 实时同步 MySQL

1、需求在Flink发布SpringBoot打包的jar包能够实时同步MySQL表,做到原表进行新增、修改、删除的时候目标表都能对应同步。...

SpringBoot + Mybatis + Shiro + mysql + redis智能平台源码分享

后端技术栈基于SpringBoot+Mybatis+Shiro+mysql+redis构建的智慧云智能教育平台基于数据驱动视图的理念封装element-ui,即使没有vue的使...

Springboot+Mysql舞蹈课程在线预约系统源码附带视频运行教程

今天发布的是由【猿来入此】的优秀学员独立做的一个基于springboot脚手架的Springboot+Mysql舞蹈课程在线预约系统,系统项目源代码在【猿来入此】获取!https://www.yuan...

SpringBoot+Mysql在线众筹系统源码+讲解视频+开发文档(参考论文

今天发布的是由【猿来入此】的优秀学员独立做的一个基于springboot脚手架的在线众筹管理系统,主要实现了普通用户在线参与众筹基本操作流程的全部功能,系统分普通用户、超级管理员等角色,除基础脚手架外...

Docker一键部署 SpringBoot 应用的方法,贼快贼好用

这两天发现个Gradle插件,支持一键打包、推送Docker镜像。今天我们来讲讲这个插件,希望对大家有所帮助!GradleDockerPlugin简介...

取消回复欢迎 发表评论: