RocketMQ-订阅一致及解决方案

背景

这里借用Rocketmq官方的一句话来描述订阅关系一致: 

订阅关系一致指的是同一个消费者分组Group ID下,所有Consumer实例所订阅的Topic和Tag必须完全一致。如果订阅关系不一致,可能导致消息消费逻辑混乱,消息被重复消费或遗漏。

具体的问题和实例请看阿里云关于Rocketmq订阅关系一致的说明 ,里面写的非常详细,这边主要是讨论一下关于经典的会出现的一个订阅不一致问题。

当前问题

我司由于历史问题,java侧服务mq使用泛滥,每多一个topic订阅就伴随着新建一个group,导致维护成本越来越高,所以我们在2.0 sdk第一版即支持 【一个消费group消费多个topic】,也就是如下面这张图的预期:
在这里插入图片描述
看起来没有问题,RocketMQ官方也支持多topic的订阅逻辑,我们也是这么去推动大家升级的。但是随着对MQ的深入了解,逐渐发现一个很可怕的问题: 如果一个正在使用的group我希望去对它进行订阅关系的变更(添加/删除topic订阅),这个是绝对没有办法走灰度发布的!因为它会直接出现

RocketMQ领域经典的订阅不一致问题,详情见下图(模拟了一个使用中的group变更订阅关系时的灰度发布过程)
在这里插入图片描述
由图中可知,当前sdk虽然支持了一个group监听多个topic,但是这仅限于新业务,一个全新的group才可以在一开始用这种方式去升级,但却没有办法支持后续的订阅关系变更,看起来之前的sdk升级没什么用,可扩展性太差。如果消息的收发都是新业务还好一点,假如是订阅一个发送量非常大的现有topic,一发版就会喜提告警,严重的会存在消息丢失的风险,并且无法回放。

解决方案

其实问题的关键在于: 每个客户端虽然知道其他客户端的存在,但是并不知道大家的订阅关系,就导致了在实际平衡的时候产生【我觉得他应该去消费这些队列】的错觉,所以解决问题的关键就是我们只要让每个客户端都知道整个group集群中所有客户端的订阅关系就行了。参考之前发表的rocketmq灰度方案,可以利用ClientId的特性,将当前客户端的订阅关系加密追加在ID后面。

public String buildMQClientId() {StringBuilder sb = new StringBuilder();sb.append(this.getClientIP());sb.append("@");sb.append(this.getInstanceName());if (!UtilAll.isBlank(this.unitName)) {sb.append("@");sb.append(this.unitName);}if (enableStreamRequestType) {sb.append("@");sb.append(RequestType.STREAM);}# 关键在于下面这几行代码MessageInstance instance = MessageStorage.getInstance(this.getInstanceName());if (instance != null) {sb.append("#");sb.append(MessageStorage.generateInstanceSubInfoEncode(instance));} else {sb.append("#[]");}return sb.toString();}

关于instance、group、topic的关系可以看下面这张图:

在这里插入图片描述

每个服务进程使用binder可以收发不同实例下的消息,因此在SDK中ClientId是以订阅的实例为维度创建的,在RocketMQ源码中是单例模式。

然后可以自己实现一个负载均衡策略:

/*** 消息队列分配策略增强--保证不出现订阅不一致的情况** @author mobai* @since 2024/6/9 12:57 AM*/
@Slf4j
public class EnhanceAllocateMessageQueueStrategyImpl extends AllocateMessageQueueAveragely {/*** 保证订阅一致的分配算法* 如果有任意客户端sdk版本低于当前版本,则降为默认的平均分配算法* <p> 1.如果是重试topic,则使用平均分配策略(重试的topic走的是内部回传broker,写到哪一个队列是随机的)* <p> 2.通过clientId获取每个client的订阅信息,然后获取客户端中对应当前group的topic监听列表,判断当前需要平衡的topic是否在监听列表中,* 如果不在则认为订阅不一致,让所有订阅了当前topic的客户端去分配所有的队列* <p> 3.如果订阅一致,则使用平均分配策略* 同时提供了一个允许覆盖的分配方法,默认是平均分配。子类可以根据实际情况自行覆盖,该方法会传入当前的订阅是否不出现不一致** @param consumerGroup 当前消费者组* @param currentCID    当前客户端id* @param mqAll         当前topic下的所有队列* @param cidAll        当前group的云端所有客户端实例* @return 分配结果*/@Overridepublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {if (!check(consumerGroup, currentCID, mqAll, cidAll)) {return Collections.emptyList();}if (mqAll.stream().anyMatch(mq -> mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX))) {return super.allocate(consumerGroup, currentCID, mqAll, cidAll);}String topic = mqAll.get(0).getTopic();boolean isSomeClientVersionLower = cidAll.stream().anyMatch(c -> c.lastIndexOf(MqConstant.GROUP_ENHANCE_TAG) == -1);if (isSomeClientVersionLower) {//避免当前这个增强sdk版本在灰度的时候,出现低版本客户端log.warn("[enhance allocate]: group:{}sub topic:{} has lower version client,use the default avg strategy", consumerGroup, topic);return super.allocate(consumerGroup, currentCID, mqAll, cidAll);}if (log.isDebugEnabled()) {log.info("[enhance allocate]: group:{} start topic rebalance:{},current client num:{},current queues num:{}", consumerGroup, topic, cidAll.size(), mqAll.size());}Map<String, List<MessageConsumer>> allClientsSubInfo = MessageStorage.getDecodeSubInfo(cidAll);Map<String, MessageConsumer> eachClientGroup = new HashMap<>(allClientsSubInfo.size());allClientsSubInfo.forEach((k, v) -> {for (MessageConsumer messageConsumer : v) {if (messageConsumer.getActualGroup().equals(consumerGroup)) {eachClientGroup.put(k, messageConsumer);break;}}});List<String> validCids = new ArrayList<>(eachClientGroup.size());for (Map.Entry<String, MessageConsumer> consumerEntry : eachClientGroup.entrySet()) {List<MessageConsumer.ListenTopic> currentConsumerSubTopics = consumerEntry.getValue().getTopics();if (currentConsumerSubTopics.stream().anyMatch(listenTopic ->listenTopic.getActualTopic().equals(topic)|| listenTopic.getTopic().equals(topic)|| listenTopic.getSourceTopic().equals(topic))) {validCids.add(consumerEntry.getKey());}}//如果存在订阅不一致的情况,则让所有订阅了当前topic的客户端去分配所有的队列,并且此逻辑不允许扩展,优先保证消息安全不丢失、不堆积if (validCids.size() != cidAll.size()) {List<MessageQueue> messageQueues = balanceAllocate(consumerGroup, currentCID, mqAll, validCids);log.warn("[enhance allocate]: group:{}sub topic:{} has not-balance-sub condition,sdk start enhance,clients {} complete {} queues rebalance,currentId:{},\n allocate result:{}", consumerGroup, topic,MessageStorage.getClientsIp(validCids), mqAll.size(), currentCID, MessageStorage.joinMessageQueue(messageQueues));return messageQueues;} else {return doAllocate(consumerGroup, currentCID, mqAll, cidAll);}}/*** 可扩展的分配算法,默认是平均分配** @param consumerGroup 消费组* @param currentCID    当前消费者* @param mqAll         所有消息队列* @param cidAll        所有消费者* @param isSubBalance  是否订阅均衡* @return 分配结果*/public List<MessageQueue> doAllocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {return balanceAllocate(consumerGroup, currentCID, mqAll, cidAll);}/*** 平均分配算法** @param consumerGroup 消费组* @param currentCID    当前消费者* @param mqAll         所有消息队列* @param cidAll        所有消费者* @return 消息队列*/public final List<MessageQueue> balanceAllocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {return super.allocate(consumerGroup, currentCID, mqAll, cidAll);}@Overridepublic String getName() {return "Enhance";}

策略继承于平均分配策略,大概的思路如下:

  1. 排除掉重试topic
  2. 通过clientId判断是否存在不同版本的SDK,这点也很重要,当这个增强的策略在发布时,因为线上的服务并没有该ClientId标识,所以此时退化成标准的平均分配是最安全的。
  3. 通过将所有客户端Id进行信息提取和解密,判断当前balance的topic有哪一些客户端在监听(当前group肯定会监听,不然这个方法链路进不来)
  4. 如果发现过滤出来的客户端个数和云上记录的所有客户端个数不同,即认定为订阅不一致,此时让有当前topic订阅关系的客户端分配所有队列,这个逻辑禁止覆盖
  5. 在保证订阅一致的前提下,提供了一个允许扩展的分配算法,默认使用平均分配(灰度消息就是通过继承此类,扩展该方法实现的保证一致性的前提下做的灰度)
  6. 那些没有订阅当前topic的客户端进程不会进到这个topic的平衡方法

升级了SDK之后,以下是对应的交互变更效果图(只讨论新增订阅关系的场景,删除订阅关系也是一个道理)

在这里插入图片描述

验证

接下来通过一个服务来验证此逻辑的可行性(包含了灰度消息逻辑),首先准备了一个订阅了一个topic的group,sdk版本是2.0.8(没有该增强逻辑)

已知:topic有64个队列,存在8个broker上,消费已做好幂等。
在这里插入图片描述

升级该服务的sdk版本到2.1.0(当前增强版本),订阅关系不变,发布灰度

在这里插入图片描述在这里插入图片描述

sdk判断当前客户端存在版本不一致,因此降级为默认平均分配算法,发送10条消息测试一下

在这里插入图片描述
消费正常。

升级该服务SDK到2.1.0,直接发布上线,无订阅关系变更

在这里插入图片描述
在这里插入图片描述
队列分配正确,再发送10条消息:

在这里插入图片描述
消费正常。

新增加一个topic的订阅关系,发布灰度(新topic48个队列,分布在6个brokder上)

控制台提示订阅不一致

在这里插入图片描述
灰度pod日志: 独自接管了新topic全部队列,旧topic获取到每个brokder最后一个分区

在这里插入图片描述
在这里插入图片描述
正常pod日志:不受影响,只和消费之前的topic(灰度pod消费每个broker最后一个分区),所以只分配到到56个队列

在这里插入图片描述
在这里插入图片描述
此时发送10条消息到新的topic上,结果消息全部被灰度也就是新加订阅关系的客户端全部消费

在这里插入图片描述
再发送10条老消息到旧topic上,9条在正常的pod,1条在灰度的pod,也符合灰度只负载1/10分区的策略

在这里插入图片描述
验证通过,灰度验证通过

在这里插入图片描述
订阅一致了

减少其中一个topic的订阅关系,再次发布灰度

控制台订阅不一致

在这里插入图片描述
灰度pod(减少订阅的客户端)日志:只参与旧topic的分配,且是灰度分区,其他无影响

在这里插入图片描述
在这里插入图片描述
正常pod(完整订阅关系)日志: 新topic提示不一致,进入增强逻辑,分配到全部的48个队列,旧topic分配正常

在这里插入图片描述
在这里插入图片描述
发送10条消息到被删除订阅关系的新topic: 全部被有订阅关系的正常客户端消费

在这里插入图片描述
发送10条消息到老的共有的老topic: 9比1的比例被俩客户端平均消费

在这里插入图片描述
验证通过。

结论

该方案被验证是安全可行的,但是在实际接入时需要注意:

  1. 不要在首次升级sdk时就变更订阅关系发灰度,这样的话还是会出现订阅不一致,无解,一个比较好的做法是先将SDK版本全部升级(允许灰度),等后续版本迭代再做订阅关系的变更,就可以正常发灰度验证。
  2. 生产环境永远不要使用公网接入点,除了安全问题之外,阿里云公网接入点架构模式是服务端负载,该策略会失效,而且原则上生产也不应该开放公网接入点。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1473517.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

关于GIS的概念方面在前端编程中的理解

关于GIS的概念方面在前端编程中的理解 一. 什么是gis二. 关于地球的建模(了解)三. GIS坐标系表现形式四.GIS的数据4.1 矢量数据4.2 栅格数据4.3 矢量数据和栅格数据的不同 一. 什么是gis 地理坐标系统&#xff0c;其目的就是通过地理坐标系可以确定地球上任何一点的位置。 二. …

jenkins配置gitee源码地址连接不上

报错信息如下&#xff1a; 网上找了好多都没说具体原因&#xff0c;最后还是看jenkins控制台输出日志发现&#xff1a; ssh命令执行失败&#xff08;git环境有问题&#xff0c;可能插件没安装成功等其他问题&#xff09; 后面发现是jenkins配置git的地方git安装路径错了。新手…

215. 数组中的第K个最大元素(中等)

215. 数组中的第K个最大元素 1. 题目描述2.详细题解3.代码实现3.1 Python3.2 Java 1. 题目描述 题目中转&#xff1a;215. 数组中的第K个最大元素 2.详细题解 快速排序算法在每一轮排序中&#xff0c;随机选择一个数字 x x x&#xff0c;根据与 x x x的大小关系将要排序的数…

设计小能手必备!CorelDRAW2024新功能大揭秘

&#x1f389; 设计小能手必备&#xff01;CorelDRAW 2024新功能大揭秘 嗨&#xff0c;亲爱的小红书的朋友们&#xff5e;&#x1f44b; 今天我要和大家安利一款让设计师们疯狂打call的设计软件——CorelDRAW 2024&#xff01;&#x1f31f; 作为一名资深的设计师&#xff0c;我…

VBA初学:零件成本统计之三(获取材料外协的金额)

第三步&#xff0c;从K3的数据库中获取金额 我这里是使用循环&#xff0c;通过任务单号将金额汇总出来&#xff0c;如果使用数组的话&#xff0c;还要按任务单写GROUP&#xff0c;还要去对应&#xff0c;不如循环直接一点 获取材料和外协金额的表格Sub getje()Dim rowcount A…

ctfshow-web入门-文件包含(web88、web116、web117)

目录 1、web88 2、web116 3、web117 1、web88 没有过滤冒号 : &#xff0c;可以使用 data 协议&#xff0c;但是过滤了括号和等号&#xff0c;因此需要编码绕过一下。 这里有点问题&#xff0c;我 (ls) 后加上分号发现不行&#xff0c;可能是编码结果有加号&#xff0c;题目…

Qwen1.5-1.8b部署

仿照ChatGLM3部署&#xff0c;参考了Qwen模型的文档&#xff0c;模型地址https://modelscope.cn/models/qwen/Qwen1.5-1.8B-Chat/summary http接口 服务端代码api.py from fastapi import FastAPI, Request from transformers import AutoTokenizer, AutoModelForCausalLM, …

Docker:Docker网络

Docker Network 是 Docker 平台中的一项功能&#xff0c;允许容器相互通信以及与外界通信。它提供了一种在 Docker 环境中创建和管理虚拟网络的方法。Docker 网络使容器能够连接到一个或多个网络&#xff0c;从而使它们能够安全地共享信息和资源。 预备知识 推荐先看视频先有…

多功能实用工具箱,实用工具箱提供了从日常,图片,查询、设备、特色、提取等多方面的功能,操作简单,即点即用,避免您下载超多应用的难题,应用体积轻巧,界面简洁。

今天给大家分享手机工具软件合集&#xff0c;明天想看什么软件&#xff0c;在评论区留言吧&#xff01; 软件链接&#xff1a;4款万能玩机工具&#xff0c;一网打尽&#xff0c;快来看看&#xff01; 实用工具箱 这是一款多功能实用工具箱&#xff0c;实用工具箱提供了从日常…

前端面试题7(单点登录)

如何实现单点登录 单点登录&#xff08;Single Sign-On&#xff0c;简称SSO&#xff09;是一种允许用户在多个应用系统中只需登录一次&#xff0c;就可以访问所有相互信任的应用系统的认证技术。实现前端单点登录主要依赖于后端的支持和一些特定的协议&#xff0c;如OAuth、Ope…

Elasticsearch 实现 Word、PDF,TXT 文件的全文内容提取与检索

文章目录 一、安装软件:1.通过docker安装好Es、kibana安装kibana:2.安装原文检索与分词插件:之后我们可以通过doc命令查看下载的镜像以及运行的状态:二、创建管道pipeline名称为attachment二、创建索引映射:用于存放上传文件的信息三、SpringBoot整合对于原文检索1、导入依赖…

论文学习——基于小生境预测策略的动态多目标进化算法

论文题目&#xff1a;A dynamic multi-objective evolutionary algorithm based on Niche prediction strategy 基于决策变量分类的动态多目标优化算法&#xff08;Jinhua Zheng a,b, Bo Zhang a,b,∗, Juan Zou a,b, Shengxiang Yang a,d, Yaru Hu&#xff09;Applied Soft C…

昇思第10天

RNN实现情感分类 二分类问题&#xff1a;Positive和Negative两类 步骤&#xff1a; 1.加载IMDB数据集 2.加载预训练词向量:预训练词向量是对输入单词的数值化表示&#xff0c;通过nn.Embedding层&#xff0c;采用查表的方式&#xff0c;输入单词对应词表中的index&#xff0c;…

深度学习基础以及vgg16讲解

一 什么是卷积 上图所示&#xff0c;为图像边缘提取得一个卷积过程&#xff0c;卷积核就是计算当前像素左右两边得像素差&#xff0c;这个差值越大代表越可能是图像边缘。因此当实现其它功能时&#xff0c;只需要调整卷积核得参数即可。深度学习的训练其实就是在确定这些参数。…

惕佫酰假托品合酶的发现-文献精读28

Discovering a mitochondrion-localized BAHD acyltransferase involved in calystegine biosynthesis and engineering the production of 3β-tigloyloxytropane 发现一个定位于线粒体的BAHD酰基转移酶&#xff0c;参与打碗花精生物合成&#xff0c;并工程化生产惕佫酰假托品…

C # @逐字字符串

逐字字符串 代码 namespace TestAppConsole {class program{static void Main(string[] args){int a 0;int b 9;string c "2ui923i9023";//Console.Write(sizeof(int));string d "\t8282jjksk";string e "\t8282jjksk";Console.WriteLine(…

Tkinter布局助手

免费的功能基本可以满足快速开发布局&#xff0c; https://pytk.net/ iamxcd/tkinter-helper: 为tkinter打造的可视化拖拽布局界面设计小工具 (github.com) 作者也把项目开源了&#xff0c;有兴趣可以玩玩

每周算法:无向图的双连通分量

题目链接 冗余路径, Redundant Paths G 题目描述 为了从 F F F 个草场中的一个走到另一个&#xff0c;奶牛们有时不得不路过一些她们讨厌的可怕的树。 奶牛们已经厌倦了被迫走某一条路&#xff0c;所以她们想建一些新路&#xff0c;使每一对草场之间都会至少有两条相互分离…

对BSV区块链的曼达拉网络通俗易懂的解释

​​发表时间&#xff1a;2023年6月15日 BSV区块链正在引入“曼达拉”升级&#xff0c;使BSV区块链网络的拓扑结构能够适配Teranode&#xff0c;适配这个可以大幅扩容的节点软件。BSV区块链上曼达拉网络的概念并不会改变整个系统的核心规则&#xff1b;相反&#xff0c;它能够引…

I2C接口+高度集成的电源管理芯片(PMIC)-iML1942

电源管理芯片 - iML1942是一个高度集成的电源管理IC为TFT液晶面板。它具有完整的I2C接口来编程各种参数。该设备包括一个针对AVDD的电流模式升压调节器&#xff0c;一个针对VBK1的同步升压转换器。VGL可选的反相转换器或负电荷泵调节器&#xff0c;VSS1负线性调节器&#xff0c…