深入浅出消息队列----【核心之消息的发送】

深入浅出消息队列----【核心之消息的发送】

  • 普通消息
    • 同步消息
    • 异步消息
  • 单向消息
  • 顺序消息
  • 延迟消息
  • 批量消息

本文仅是文章笔记,整理了原文章中重要的知识点、记录了个人的看法
文章来源:编程导航-鱼皮【yes哥深入浅出消息队列专栏】

根据 RocketMQ 官方,消息的类型可分为 5 大类,分别是:

  1. 普通消息
  2. 顺序消息
  3. 延迟消息
  4. 批量消息
  5. 事务消息

普通消息

普通消息,就是普普通通的消息…

同步消息

同步消息指的是:生产者发送一条消息给 Broker,需要等待 Broker 返回响应(类似返回我接收到啦的消息确认消息),然后才会继续发送后续的消息。

producer.send(msg)

这样就保证了发送的消息一定被成功接收之后,才继续处理后面的业务。

请添加图片描述

从图上看,消息 B 的发送,需要 Broker 反馈消息 A 已经收到了,不然生产者就会一直等着。

假设 Broker 没反馈消息 A 咋办?因为网络是不稳定的,很有可能 Broker 反馈了,但是生产者没收到。

此时,生产者就会进行重试。

请添加图片描述

默认重试三次,如果三次后还是失败就会抛出异常,这时候我们需要捕获这个异常,进行日志记录或其他兜底操作。

通过这个方式,就可以确保发送成功的消息一定是被接收的了,这样一来一回就类似 TCP 的三次握手,一次请求过去,一个 ack 回来。

重试机制会有一个弊端:消息的重复发送

请添加图片描述

其实 Broker 已经存储了消息 A,并且通知生产者收到消息了,但是因为网络的不稳定,生产者没有收到这个响应,然后超时后重新发送消息 A,这样一来 Broker 就存储了两条消息 A,后面消费者消费时也会拿到两条消息 A。

虽然这样看起来,拿到两条相同的消息无关紧要,但是带入真实场景,假设消息 A 是银行卡扣钱的消息,那我们卡里的钱是不是可能会被扣 2 次了?这不损失大了!

所以要对这方面做好消息的防重,或者幂等。

面试题:如何保证消息一定发送成功?

答:Broker 接收成功后,返回 ACK (类似接收到了的响应)给生产者,没接收到 ACK 则认为消息发送失败,进行重试。

异步消息

与同步消息对应的就是异步消息了。

发送异步消息时,生产者不需要阻塞等待着上一条消息的返回,它可以紧接着发送后续的消息。

请添加图片描述

那么问题来了,假设前面的消息发送失败怎么办?

发送异步消息其实需要提供一个方法,方法里面定义了 onSuccess、onException 两个方法。

在这里插入图片描述

会有另外的线程来处理 Broker 的响应,如果接受到成功的响应就会执行 onSuccess 的逻辑。

如果发送失败,会执行 onException 的逻辑,我们可以在这个逻辑里面实现失败的记录数据,然后进行后续的人工处理或定时处理或报警等等。

当然,异步消息也可以设置重试的次数,有个参数 RetryTimesWhenSendAsyncFailed,调整这个参数我们可以定义异步发送失败重试的次数。

它跟同步消息的区别主要在于场景的应用,同步消息需要等到前一条消息的响应才能继续发后面的消息,而异步消息不需要等待。

因此,在对响应时间敏感的场景下,异步消息比较合适,因为生产者不需要等待消息的响应可以直接处理后续的消息发送。

还有,因为异步消息也可以设置重试,因此也会出现和同步消息一样的消息重复问题。

单向消息

前面无论是同步消息还是异步消息,我们都会关注发送完消息 Broker 的响应,但是有些场景压根就不关心这个结果。

即生产者只管发送消息,至于 Broker 有没有收到消息,生产者不关心,不需要等待响应。

请添加图片描述

比如日志的收集,日志的量级很大,但可靠性的要求不高(丢几条日志没关系),因此单向消息在这个场景就非常合适。

因为不需要等待响应,发完就完事,发送的耗时会很短,且不需要异步线程来等待 Broker,这样一来系统能同时承载更多的消息发送,性能会比较好。

缺点就是 Broker 不一定会收到消息(会丢失消息)。

因此适用于对消息可靠性要求不高的场景。

顺序消息

顺序消息,顾名思义就是按顺序发布消息,并且按发布消息的顺序来消费消息。

请添加图片描述

一个很常见的例子就是订单场景:

  1. 创建订单
  2. 支付
  3. 发货
  4. 完结订单

我们肯定需要先创建订单,才能支付订单,且支付完才能发货,最后收到货完结订单。

这看起来天经地义,但是我们已经知晓消费队列的实际实现:一个 Topic 是分多个队列的,每个队列都有消费者并行消费。

假设订单相关的 Topic 叫 Topic-Order,那么创建订单消息发送到了队列1,紧接着客户支付了,支付消息发送到了队列2,商家立马发货,发货消息发到了队列3。

请添加图片描述

从图上来看,消息的顺序确实是有先后关系的,但是每个消费者消费的速度是不一样的,我们保证不了他们的消费速度!

很可能消费者-2消费了支付的消息,而消费者-1还没有消费完创建订单的消息,这样一来业务的顺序就错乱了,处理就报错了!

订单都没生成支付啥呀?

所以普通消息不能保证先发送的消息一定被先消费,分析可知,本质原因是因为多队列的实现。

如何解决呢?

请添加图片描述

把这几个消息都发送到一个队列不就完事了吗?

这就是顺序消息。

如果报创建订单、支付、发货、完结订单,这几类消息全都发往一个队列,这叫全局顺序消息

如果把同一笔订单的创建、支付、发货、完结发往一个队列,不同的订单可以发往不同队列,这叫分区顺序消息

请添加图片描述

理论上分区顺序消息够用了,并且分区顺序消息的并发度更好,从上图来看,消费者1、2能同时处理订单-1和订单-2,如果是全局顺序消息,那么只有消费者-1一个人在干活。

然后具体将消息发送到哪个队列是生产者指定的。

假设要实现全局顺序消息,那么生产者将这几条消息都指定往队列1发送,即可实现全局顺序消息。

如果是分区顺序消息,生产者只需要依靠一个叫 sharding key 的东西来分区即可,比如订单的场景可以将订单号作为 sharding key。

那么我们仅需在发送消息的时候,根据 sharding key(订单号:orderId)来选择队列即可,我用伪代码来实现下:

int queueIndex = orderId % queue.size();

producer.send(msg, queueIndex);

这样我们就能保证一笔订单相关的消息都发往一个分区!默认普通消息是轮询选择队列,比如上一次消息发送的是队列1,后面就是队列2这样的轮询。

所以在顺序要求的场景下,我们需要采用顺序消息来实现,并且最好是分区顺序消息,这样能提供并发度,加快消息的消费速率。

延迟消息

生产者发送了消息,但是并不想立马被消费者消费,希望延迟一段时间后才能被消费。

比如订单取消场景,一般我们下单后,如果 15 分支没有支付,这笔订单就需要被取消。

这个场景下我们就可以在下单时同时发送一个订单取消的延迟消息,时间是 15分支,这样 15分支后消费者就能收到这个消息,然后看看此时的订单有没有被值,如果没有被支付, 那么就执行订单取消的逻辑。

RocketMQ 中具体是如何实现延迟消息的你?

很容易想到的一个方式是将消息正常发送给 Broker,然后消费者消费的时候来判断是不是延迟消息,看看是否已经到时间了,到了就消费,没到就不消费。

实际上并没有这么简单,还记得我们之前文章提到的消息点位?消费者每消费完一条消息,需要更新消息点位来。

如当前消费的点位是 100,第 101 条消息消费后,点位 +1,即 101。

但延迟消息没到时间无法被消费,那不就使得正常点位卡着了,加不上去了,这样一来排在后面的正常消息不就不能被消费了?

请添加图片描述

所以延迟消息不能这样实现,那 RocketMQ 是怎么做的呢?

实际上延迟消息一开始不放在正常的 Topic 中,RocketMQ 专门搞了个 Topic 叫 SCHEDULE_TOPIC_XXXX,将所有延迟消息都放在这个 Topic 下。

然后有个定时任务来扫描遍历消息的延迟时间到了没,如果到了,那么再把延迟消息发往它本身的 Topic 队列中。

这样就保证了延迟消息到时间之前,消费者不会消费到这个消息(因为消费者根本就没有订阅 SCHEDULE_TOPIC_XXXX),然后一到时间,消息就被投递到原来的 Topic 上,这样消费者就能消费到了。

请添加图片描述

这样的设计就复用了本身关于 Topic、队列还有消费者消费消息的逻辑。

对了,在 RocketMQ 中,延迟的时间是无法自定义的,是有固定的阶梯型限制,我们在发送消息的时候,只能设定投递等级,不同等级固定对应一个延迟时间:

请添加图片描述

在商业版,如阿里云上的服务时支持自定义时间的。

批量消息

批量消息就是一次性打包发送多条消息,在对吞吐量敏感的场景,批量消息非常合适

正常消息是一条一条的发送,然后一条一条的等待响应。

而批量消息是一批一批的发送,比如 100 条消息,本来需要调用 100 次发生接口,且需要等待 100 次响应。

现在将这 100 条消息打包成 1 条消息发送,这样是不是仅需要调用 1 次发生接口,且等待一次响应?

这样处理的效率(吞吐量)肯定是变高了。

当然,如果其中一条数据出错,可能需要一批重来了,处理起来也会比较麻烦。

关于批量消息使用起来也很简单:

在这里插入图片描述

这样传入一个 list,RocketMQ 自然就知道这是一个批量消息了,它内部会有一个 batch 操作来打包这个列表:

请添加图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1486984.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

工程视角:数据结构驱动的应用开发--字典(dictionary),列表(list)与实体

这里写目录标题 业务业务场景流程分析 实现数据访问层(DAL)业务逻辑层(BLL)用户界面层(UI)工具类 设计思路为什么抽出工具类关于U层使用字典的好处工程视角 业务 业务场景 在一个金融应用系统中&#xff0c…

【JavaScript】虚拟 DOM

虚拟 DOM 是⼀层对真实DOM的抽象,以JavaScript 对象 (VNode 节点) 作为基础的树,⽤对象的属性来描述节点,最终可以通过⼀系列操作使这棵树映射到真实环境上。 虚拟DOM 表现为⼀个 Object对象。并且最少包含标签名 (tag)、属性 (attrs) 和⼦元…

多路复用IO、TCP并发模型

时分复用 CPU单核在同一时刻只能做一件事情,一种解决办法是对CPU进行时分复用(多个事件流将CPU切割成多个时间片,不同事件流的时间片交替进行)。在计算机系统中,我们用线程或者进程来表示一条执行流,通过不同的线程或进程在操作系…

HarmonyOS 本地真机运行

目录 官网地址 1.开发工具设置签名 2.手机开启开发者模式 3.使用USB连接方式 4.使用无线调试连接方式 5.常见的问题 官网地址 使用真机运行应用 使用本地真机运行应用/服务 1.开发工具设置签名 官网应用/服务签名 1.左上角文件--项目结构-勾选自动生成签名-Sign in登录 2…

WEB前端08-综合案例(动态表格)

使用 HTML、CSS 和 JavaScript 创建动态表格 在本教程中,我们将创建一个动态表格,允许用户添加行、选择项目,并执行批量操作,如全选或删除选中的行。我们将通过 HTML、CSS 和 JavaScript 来实现这一功能。让我们逐步了解每个部分…

Vue前端工程化 安装Vue-Cli与node.js 最详细步骤(带图展示)

一、安装NodeJS 1.官网下载 https://nodejs.org/zh-cn 2.直接从百度网盘中提取安装 链接:https://pan.baidu.com/s/1OKhHZUwPCLamvd_08Vxx0g 提取码:61rw 3.开始安装 二、验证NodeJS环境变量 1.Win R 输入cmd打开控制面板 2.输入 node -v 如果出…

NVIDIA 全面转向开源 GPU 内核模块

NVIDIA 全面转向开源 GPU 内核模块 文章目录 NVIDIA 全面转向开源 GPU 内核模块支持的 GPU安装程序更改使用带有 CUDA 元包的包管理器 使用运行文件使用安装帮助脚本包管理器详细信息dnf:Red Hat Enterprise Linux、Fedora、Kylin、Amazon Linux 或 Rocky Linuxzypp…

程序员信息差,这个工具你必须知道

身为程序员,你是否也曾遇到过这样的情况:费尽心思搭建好服务器,开发好接口API,结果上线后却发现用户体验并不好,甚至还因为各种BUG忙得焦头烂额?别担心,你不是一个人。事实上,很多开…

用神经网络求解微分方程

微分方程是物理科学的主角之一,在工程、生物、经济甚至社会科学中都有广泛的应用。粗略地说,它们告诉我们一个量如何随时间变化(或其他参数,但通常我们对时间变化感兴趣)。我们可以了解人口、股票价格,甚至…

Python 使用TCP\UDP协议创建一个聊天室

server端代码: #encodingutf-8 # 服务端代码 import socketdef server():server_socket socket.socket(socket.AF_INET, socket.SOCK_STREAM)host socket.gethostname()port 12345server_socket.bind((host, port))server_socket.listen(5)print(等待客户端连接…

使用Gradle构建编译Spring boot 2.7.x

一、环境准备 JDK 1.8Spring boot 2.7.xGradle 7.5.1 (安装参考:win11安装Gradle)Idea 2023.1 二、源码导入gitee(可选) 按需导入。如果能科学上网,可跳过这一步。 为避免github访问不稳定问题,建议将对应的代码导入到gitee。然后通过git管…

内存泄漏详解

文章目录 什么是内存泄漏内存泄漏的原因排查及解决内存泄漏避免内存泄漏及时释放资源设置合理的变量作用域及时清理不需要的对象避免无限增长避免内部类持有外部类引用使用弱引用 什么是内存泄漏 内存泄漏是指不使用的对象持续占有内存使得内存得不到释放,从而造成…

【Java语法基础】1、变量、运算符、输入输出

1.变量、运算符、输入输出 跟C一样,先把必须写的框架写出来: package org.example; public class Main{public static void main(String[] args){//在里面写实际的代码} }变量 必须先定义,才能使用。与C、C差不多。 没有赋初值的变量无法…

windows网络应急排查

一、系统排查 msinfo32 #GUI显示的系统信息systeminfo #简单了解系统信息用户信息排查 排查恶意账号: 黑客喜欢建立相关账号用作远控: 1.建立新账号2.激活默认账号3.建立隐藏账号(windows中账号名$)cmd方法 net user #打印用户账号信息 ---看不到$结尾的隐藏账…

Linux - 进程的概念、状态、僵尸进程、孤儿进程及进程优先级

目录 进程基本概念 描述进程-PCB task_struct-PCB的一种 task_struct内容分类 查看进程 通过系统目录查看 通过ps命令查看 通过系统调用获取进程的PID和PPID 通过系统调用创建进程- fork初始 fork函数创建子进程 使用if进行分流 Linux进程状态 运行状态-R 浅度睡眠状态-S…

Apache Filnk----入门

文章目录 Flink 概述Flink 是什么有界流和无界流有状态流处理Flink 特点Flink vs SparkStreamingFlink 分层API Flink 快速上手WordCount 代码编写批处理流处理读取socket文本流 Flink 概述 Flink 是什么 有界流和无界流 无界数据流: 有定义流的开始,但没有定义流…

ts一些解决vscode飘红的方法

1、查看是否有些ts的数据类型定义问题,属性缺少或者属性类型不对 把对应属性加上即可 2、在飘红的代码前面设置// ts-ignore忽略此行校验(不过一般不建议用这个方法) 3、移除高版本不用的属性(版本属性兼容问题) 原因…

PP-Human行为识别(RTSP协议视频流实时检测)

基于PaddleDetection本地实现PP-Human行为识别模块(RTSP协议视频流实时检测) 项目介绍环境准备1. Anaconda 创建环境2. 获取 PaddleDetection3. 获取 [MediaMTX](https://github.com/bluenviron/mediamtx/releases/tag/v1.8.4)4. FFmpeg 获取5. VLC 获取…

.NET开源、简单、实用的数据库文档生成工具

前言 今天大姚给大家分享一款.NET开源(MIT License)、免费、简单、实用的数据库文档(字典)生成工具,该工具支持CHM、Word、Excel、PDF、Html、XML、Markdown等多文档格式的导出:DBCHM。 支持的数据库 Sq…

IEEE官方列表会议 | 第三届能源与环境工程国际会议(CFEEE 2024)

会议简介 Brief Introduction 2024年第三届能源与环境工程国际会议(CFEEE 2024) 会议时间:2024年12月2日-4日 召开地点:澳大利亚凯恩斯 大会官网:CFEEE 2024-2024 International Conference on Frontiers of Energy and Environment Engineer…