【RocketMQ】(五)消息的消费

消费者从Broker拉取到消息之后,会将消息提交到线程池中进行消费,RocketMQ消息消费是批量进行的,如果一批消息的个数小于预先设置的批量消费大小,直接构建消费请求ConsumeRequest将消费请求提交到线程池处理,否则需要分批构建进行提交。

消息消费

在消息被提交到线程池后进行处理时,会调用消息监听器的consumeMessage进行消息消费,它返回消息的消费结果状态,状态有两种分别为CONSUME_SUCCESSRECONSUME_LATER

  • CONSUME_SUCCESS:表示消息消费成功。
  • RECONSUME_LATER:表示消费失败,稍后延迟重新进行消费。

处理消息消费结果

设置ackIndex

在消息消费完毕之后,会根据consumeMessage方法返回的结果状态进行处理,对ackIndex的值进行设置,ackIndex的值用于在下一步中处理消费失败的消息。

前面可知消费结果状态有以下两种:

  • CONSUME_SUCCESS:消息消费成功,此时ackIndex设置为消费的总消息个数 - 1,表示消息都消费成功。
  • RECONSUME_LATER:消息消费失败,延迟进行消费,此时ackIndex值为-1。

二、处理消费失败的消息

广播模式

广播模式下,如果消息消费失败,只将失败的消息打印出来不做其他处理。

集群模式

开启for循环,初始值为i = ackIndex + 1,结束条件为i < consumeRequest.getMsgs().size(),上面可知ackIndex有两种情况:

消费成功:ackIndex值为消息大小-1,此时ackIndex + 1的值等于消息的个数大小,不满足for循环的执行条件,相当于消息都消费成功,不需要进行失败的消息处理。
延迟消费:ackIndex值为-1,此时ackIndex+1为0,满足for循环的执行条件,从第一条消息开始遍历到最后一条消息,向Broker发送CONSUMER_SEND_MSG_BACK请求,如果发送成功Broker会根据延迟等级,放入不同的延迟队列中,到达延迟时间后,消费者将会重新进行拉取,如果发送失败,消费次数加1,并加入到失败消息列表中,稍后重新提交到消息消费线程池进行处理。

发送CONSUMER_SEND_MSG_BACK请求

延迟级别

RocketMQ的延迟级别对应的延迟时间常量定义如下:

public class MessageStoreConfig {private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
}

延迟级别与延迟时间对应关系:
延迟级别0 —> 对应延迟时间1s,也就是延迟1秒后消费者重新从Broker拉取进行消费
延迟级别1 —> 延迟时间5s
延迟级别2 —> 延迟时间10s

以此类推,最大的延迟时间为2h。

在向Broker发送CONSUMER_SEND_MSG_BACK请求的时候,会从上下文中获取设置的延迟级别(默认为0,也就是延迟1s),然后设置以下信息,向Broker发送请求:

  • 设置请求类型,请求类型为CONSUMER_SEND_MSG_BACK
  • 设置消费者组名称;
  • 设置消息在CommitLog中的偏移量;
  • 设置延迟级别;
  • 设置消息的ID;
  • 设置该消息的最大消费次数;
Broker对CONSUMER_SEND_MSG_BACK请求处理

Broker对CONSUMER_SEND_MSG_BACK类型的请求处理逻辑如下:

  1. 根据消费组获取该消费者组的订阅信息配置;
  2. 根据消费者组名称获取对应的重试主题;
  3. 从该消费者组的重试队列中随机选取一个队列;
  4. 根据消息在CommitLog中的偏移量从commitLog文件中获取消息内容;
  5. 判断消息的消费次数是否大于等于最大消费次数 或者 延迟等级小于0:
    • 如果条件满足,表示需要把消息放入到死信队列DLQ中,此时从死信队列中随机选取一个队列;
    • 如果条件不满足,判断延迟级别是否为0,如果为0的话,会使用消息的消费次数作 + 3为新的延迟级别进行延迟消费;
  6. 新建消息对象MessageExtBrokerInner,设置消息的相关信息,此时相当于生成了一个全新的消息(会设置之前消息的ID),重新添加到CommitLog中,消息主题的设置有两种情况:
    • 达到了加入DLQ队列的条件,此时主题为DLQ主题(%DLQ% + 消费组名称),消息之后会添加到选取的DLQ队列中;
    • 未达到DLQ队列的条件,设置延迟级别,使用重试主题(%RETRY% + 消费组名称),之后将消息投递到此主题下的队列中;
  7. 调用asyncPutMessage存储消息;

asyncPutMessage方法中,会对延迟级别进行判断,如果延迟时间级别大于0,说明消息需要延迟消费,此时做如下处理:

  1. 获取延迟消息的主题名称,RocketMQ对延迟消息有一个默认的主题名称SCHEDULE_TOPIC_XXXX;
  2. 根据消息设置的延迟级别,获取对应的延迟队列,SCHEDULE_TOPIC_XXXX主题下,会根据延迟级别创建对应的消息队列,所以这一步会根据消息的延迟级别投递到对应的队列中;
  3. 在消息属性中,设置消息原本的主题名称和消息队列,然后将消息当前的Topic改成RMQ_SYS_SCHEDULE_TOPIC

总结
消费者在消息消费失败的时候,会向Broker发送CONSUMER_SEND_MSG_BACK请求,在请求处理中会判断消息的消费次数是否大于最大的消费次数,如果超过最大消费次数,会将消息投递到死信队列中。
如果未达到最大的消费次数,会根据请求中设置的延迟级别,重新生成一条消息,使用重试主题(%RETRY% + 消费组名称),并随机选取一个队列投递消息,延迟进行消费,不过消息不会立刻投递到队列中,在消息存储之前会对延迟级别进行判断,如果需要延迟消费,会使用RocketMQ默认创建的SCHEDULE_TOPIC_XXXX主题,先根据延迟级别将消息投递到对应的延迟队列中,然后由一个定时任务去检测这个主题下的消息,当消息到达延迟的时间后,再将消息取出投递到原本主题下的消息队列中,之后的流程就与普通消息的存储一致,将消息存入CommitLog中,再创建对应的ConsumeQueue数据,消费者就可以拉取到消息重新进行消费。

消费者在启动的时候,会处理订阅的Topic数据,如果是集群模式,会自动添加重试主题的订阅(%RETRY% + 消费组名称),然后就可以从重试主题中拉取到对应的重试消息进行消费。

更新拉取偏移量

以上步骤处理完毕后,首先调用removeMessage从处理队列中移除消息并返回拉取消息的偏移量,然后更新拉取偏移量。

RocketMQ消费模式分为广播模式和集群模式,广播模式下消费进度保存在每个消费者端,集群模式下消费进度保存在Broker端。

广播模式
广播模式对应的OffSetStore实现类为LocalFileOffsetStore,使用了一个ConcurrentMap类型的变量offsetTable存储每个消息队列对应的拉取偏移量,KEY为消息队列,value为该消息队列对应的拉取偏移量。
在更新拉取进度的时候,对offsetTable中的值进行更新,需要注意这里只是更新了offsetTable中的数据,并没有持久化到磁盘。

集群模式
集群模式对应的实现类为RemoteBrokerOffsetStore,更新进度与广播模式下的更新类似,都是只更新了offsetTable中的数据。

持久化的触发
消费者在启动的时候注册了定时任务,定时将消息拉取进度进行持久化,对于广播模式,将每个消息队列对应的拉取偏移量持久化到本地文件即可,对于集群模式,由于拉取进度保存在Broker端,所以需要向Broker发送请求进行持久化,在RocketMQ的存储目录中有一个对应的文件,叫consumerOffset.json,里面的offsetTable中保存了每个消息队列的消费进度,持久化时会将消费进度写入这个文件:

{"offsetTable":{"TestTopic@TestTopicGroup":{ // 主题名称@消费者组名称0:0, // 每个消息队列对应的消费进度,Key中的0表示队列0,value中的0表示消息在ConsumeQueue中的逻辑偏移量1:1,2:1,3:0  }}
}

RocketMQ消息的消费相关源码可参考:【RocketMQ】【源码】消息的消费

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/140319.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

OpenMesh 网格平滑

文章目录 一、简介二、相关参数二、实现代码三、实现效果参考资料一、简介 由于物理采样过程固有的局限性,三维扫描仪获得的网格通常是有噪声的。为了消除这种噪声,所谓的平滑算法被开发出来。这类方法有很多,OpenMesh主要为我们提供了两种平滑算法,一种是较为经典的Laplac…

火山引擎 ByteHouse:ClickHouse 如何保证海量数据一致性

背景 ClickHouse是一个开源的OLAP引擎&#xff0c;不仅被全球开发者广泛使用&#xff0c;在字节各个应用场景中也可以看到它的身影。基于高性能、分布式特点&#xff0c;ClickHouse可以满足大规模数据的分析和查询需求&#xff0c;因此字节研发团队以开源ClickHouse为基础&…

【【萌新的FPGA学习之实战流水灯】】

萌新的FPGA学习之实战流水灯 实验任务 本节的实验任务是使用领航者底板上的两个 PL LED 灯顺序点亮并熄灭&#xff0c;循环往复产生流水灯的效 果&#xff0c;流水间隔时间为 0.5s。 1MHz&#xff1d;1000000Hz 10的6次方 1ns&#xff1d;10的-9次方秒 开发板晶振50Mhz 计算得…

NIO简单介绍

一、什么是NIO 1、Java NIO全称java non-blocking IO&#xff0c; 是指JDK提供的新API。从JDK1.4开始&#xff0c;Java提供了一系列改进的输入/输出的新特性&#xff0c;被统称为NIO(即New IO)&#xff0c;是同步非阻塞的 2、NIO有三大核心部分: Channel(通道)&#xff0c; Buf…

Goland设置头注释

package ${GO_PACKAGE_NAME} * Author: 坐公交也用券 * HomePage: https://liumou.site * File: ${NAME}.go * Date: ${DATE} ${TIME} * Des: 文件作用

点分治维护dp+连通块上新型dp思路+乘积方面进行根号dp:0922T4

首先连通块&#xff0c;所以点分治肯定是 Trick1 钦定选根的连通块dp 对于钦定选根的连通块dp&#xff0c;有一种常见思路 先对原树求其dfn序&#xff0c;按dfn序倒序求解 具体的&#xff0c;对于当前点 i i i&#xff08;注意这里都是指dfn序&#xff09;&#xff0c;我们…

企业电子招标采购系统源码之从供应商管理到采购招投标、采购合同、采购执行的全过程数字化管理

功能描述 1、门户管理&#xff1a;所有用户可在门户页面查看所有的公告信息及相关的通知信息。主要板块包含&#xff1a;招标公告、非招标公告、系统通知、政策法规。 2、立项管理&#xff1a;企业用户可对需要采购的项目进行立项申请&#xff0c;并提交审批&#xff0c;查看所…

【智慧工地源码】智慧工地助力数字建造、智慧建造、安全建造、绿色建造

智慧工地围绕建设过程管理&#xff0c;建设项目与智能生产、科学管理建设项目信息生态系统集成在一起&#xff0c;该数据在虚拟现实环境中&#xff0c;将物联网收集的工程信息用于数据挖掘和分析&#xff0c;提供过程趋势预测和专家计划&#xff0c;实现工程建设的智能化管理&a…

[Linux]线程概念

[Linux]线程概念 文章目录 [Linux]线程概念什么是线程Linux系统下的线程实现线程是CPU调度的基本单位进程是系统分配资源的基本实体二级页表 线程的优点线程的缺点线程异常线程用途线程资源 什么是线程 线程是进程内部的一个执行分支&#xff0c;执行粒度比进程更细&#xff0…

【Java 基础篇】Java网络编程:下载进度监控实现详解

文件下载是许多应用程序的重要功能&#xff0c;而下载进度监控是提高用户体验的关键。在本文中&#xff0c;我们将详细介绍如何使用Java实现文件下载进度监控&#xff0c;以便用户可以实时了解文件下载的进度。 什么是下载进度监控 下载进度监控是一种用户界面元素或功能&…

113双周赛

题目列表 2855. 使数组成为递增数组的最少右移次数 2856. 删除数对后的最小数组长度 2857. 统计距离为 k 的点对 2858. 可以到达每一个节点的最少边反转次数 一、使数组成为递增数组的最少右移次数 这题可以直接暴力求解&#xff0c;枚举出每种右移后的数组&#xff0c;将…

什么是UWB定位技术?UWB定位的应用场景及功能介绍

说到定位我们并不陌生&#xff0c;定位技术一直与我们的生活密不可分&#xff0c;比如最常见的车辆导航。 根据使用场景&#xff0c;定位技术分为室内定位和室外定位。 室外定位主要依靠GPS&#xff0c;北斗&#xff0c;GLONASS&#xff0c;伽利略等全球卫星定位导航系统。室内…

2023年“羊城杯”网络安全大赛 决赛 AWDP [Break+Fix] Web方向题解wp 全

终于迎来了我的第一百篇文章。 这次决赛赛制是AWDP。BreakFix&#xff0c;其实就是CTFFix&#xff0c;Fix规则有点难崩。Break和Fix题目是一样的。 总结一下&#xff1a;败北&#xff0c;还是太菜了得继续修炼一下。 一、Break ezSSTI 看到是SSTI&#xff0c;焚靖直接一把梭…

AI人体行为分析:玩手机/打电话/摔倒/攀爬/扭打检测及TSINGSEE场景解决方案

一、AI人体行为分析技术概述及场景 人体姿态分析/行为分析/动作识别AI算法&#xff0c;是一种利用人工智能技术对人体行为进行检测、跟踪和分析的方法。通过计算机视觉、深度学习和模式识别等技术&#xff0c;可以实现对人体姿态、动作和行为的自动化识别与分析。 在场景应用…

005-第一代光电小工具(一)

第一代光电小工具(一) 文章目录 第一代光电小工具(一)项目介绍大致原理描述核心控件QCustomPlot关于QCustomPlot 播放音频软件截图 关键字&#xff1a; Qt、 Qml、 QCustomPlot、 曲线、 SQLite 项目介绍 欢迎来到我们的 QML & C 项目&#xff01;这个项目结合了 QML&…

解决因为修改SELINUX配置文件出错导致Faild to load SELinux poilcy无法进入CentOS7系统的问题

一、问题 最近学习Kubernetes&#xff0c;需要设置永久关闭SELINUX,结果修改错了一个SELINUX配置参数&#xff0c;关机重新启动后导致无法进入CentOS7系统&#xff0c;卡在启动进度条界面。 二、解决 多次重启后&#xff0c;在启动日志中发现 Faild to load SELinux poilcy…

VirtualBox解决VERR_SUPDRV_COMPONENT_NOT_FOUND错误

简述 最近使用VirtualBox时发现其增强功能不能用了&#xff0c;也就是不能双向拖拉文件&#xff0c;整了很久不知所以&#xff1b;看到有网友说跟新其VBoxGuestAdditions.ios文件&#xff0c;所以直接把我的VirtualBox从6.x升级到了7.x&#xff0c;然后就发生了眼前的一幕&…

IDEA2023新UI回退老UI

idea2023年发布了新UI&#xff0c;如下所示 但是用起来真心不好用&#xff0c;各种位置也是错乱&#xff0c;用下面方法可以回退老UI

【C++入门指南】C如何过渡到C++?祖师爷究竟对C++做了什么?

【C入门指南】C如何过渡到C&#xff1f;祖师爷究竟对C做了什么&#xff1f; 前言一、命名空间1.1 命名空间的定义1.2 命名空间使用 二、C输入、输出2.1 std命名空间的使用惯例 三、缺省参数3.1 缺省参数的定义3.2 缺省参数分类 四、函数重载4.1 函数重载概念4.2 C支持函数重载的…

如何防止商业秘密泄露(洞察眼MIT系统商业机密防泄密解决方案)

在当今的商业环境中&#xff0c;保护公司的商业秘密是至关重要的。商业秘密可能包括独特的业务流程、客户列表、研发成果、市场策略等&#xff0c;这些都是公司的核心竞争力。一旦这些信息被泄露&#xff0c;可能会对公司的生存和发展产生重大影响。本文将探讨如何通过使用洞察…