【RabbitMQ】死信队列、延迟队列

死信队列

死信,简单理解就是因为种种原因,无法被消费的消息。

有死信,自然就有死信队列。当一个消息在一个队列中变成死信消息之后,就会被重新发送到另一个交换器中,这个交换器就是DLX(Dead Letter Exchange),绑定该交换器的队列,就被称为死信队列DLQ(Dead Letter Queue)。

消息变成死信消息一般是由于以下几条:

  • 队列达到最大长度
  • 消息过期
  • 消息被拒绝,即消息确认机中手动确认的两种拒绝情况,并且不允许重新入队

队列达到最大长度

spring:rabbitmq:host: 43.138.108.125port: 5672username: adminpassword: adminvirtual-host: mq-springboot-test
@Configuration
public class DeadConfig {// 正常队列,当正常队列中的消息出现一些不确定情况时,消息就会进入死信交换机中@Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(Constants.NORMAL_QUEUE).deadLetterExchange(Constants.DEAD_EXCHANGE) // 设置死信交换机.deadLetterRoutingKey("dead") // 设置死信队列的路由键为dead.maxLength(3) // 设置队列的最大长度为3.build();}@Bean("normalExchange")public Exchange normalExchange() {return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).durable(true).build();}@Bean("normalQueueBind")public Binding normalQueueBind(@Qualifier("normalQueue") Queue queue,@Qualifier("normalExchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();}// 死信队列@Bean("deadQueue")public Queue deadQueue() {return QueueBuilder.durable(Constants.DEAD_QUEUE).build();}@Bean("deadExchange")public Exchange deadExchange() {return ExchangeBuilder.directExchange(Constants.DEAD_EXCHANGE).durable(true).build();}@Bean("deadQueueBind")public Binding deadQueueBind(@Qualifier("deadQueue") Queue queue,@Qualifier("deadExchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("dead").noargs();}}
@RestController
@RequestMapping("/dead")
public class DeadController {@Resourceprivate RabbitTemplate rabbitTemplate;@RequestMappingpublic void deadQueue() {this.rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "hello 死信");System.out.println("正常队列发送消息成功");}}
@Configuration
public class DeadListener {@RabbitListener(queues = Constants.DEAD_QUEUE)public void deadListener(String msg) {System.out.println("死信队列接收到消息:" + msg);}}

在上述代码中,主要内容是声明了正常队列、交换机和绑定关系以及声明死信队列、死信交换机以及其绑定关系、正常队列的生产者代码、死亡队列的消费者代码。

队列达到最大长度和死信消息要转发到的DLX和路由键都是由正常队列在声明时进行绑定的。

启动上述程序之后,当正常队列存在三条消息之时,假设再来消息,那么消息就要进入死信交换机,从而路由到死信队列了。如下图可以看出,当发送第四条消息之后,死信队列的消费者就消费了一条消息:

在上述图片中,D表示队列是持久化的,Lim表示队列有最大长度,DLX表示队列存在死信交换机、DLK表示队列存在路由键。把鼠标放在这些字母上方,详细的消息都会表示。

在下述代码中,主要是对上述代码改进之后地方的指出,并没有把所有的代码全部给出。

消息过期

消息过期分为两种,一种是设置队列过期时间让消息过期,另一种是设置消息过期时间让消息过期,都可以进行测试。

设置队列过期时间

    @Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(Constants.NORMAL_QUEUE).deadLetterExchange(Constants.DEAD_EXCHANGE) // 设置死信交换机.deadLetterRoutingKey("dead") // 设置死信队列的路由键为dead
//                .maxLength(3) // 设置队列的最大长度为3.ttl(5 * 1000) // 设置队列的过期时间为5秒.build();}

 由上图以及结合代码可以看出,将消息由正常生产者发送给Broker之后,大概5秒钟之后,消息过期。此时消息就会发送给死信交换机,从而交给其对应的消费者消费。

设置消息的过期时间

@Slf4j
@RestController
@RequestMapping("/dead")
public class DeadController {@Resourceprivate RabbitTemplate rabbitTemplate;@RequestMappingpublic void deadQueue() {// 设置消息的过期时间MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("5000");return message;}};this.rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "hello 死信", messagePostProcessor);log.info("死信队列发送成功");}}

同样,结合上图和代码来说,19秒的时候消息发送功,24秒的时候死信消费者消费消息成功。

消息被拒绝

spring:rabbitmq:host: 43.138.108.125port: 5672username: adminpassword: adminvirtual-host: mq-springboot-testlistener:simple:acknowledge-mode: manual # 消息确认机制,手动确认
@Slf4j
@Configuration
public class DeadListener {// 正常队列接收消息@RabbitListener(queues = Constants.NORMAL_QUEUE)public void normalListener(Channel channel, String msg, Message message) throws IOException {try {log.info("正常队列监听器接收消息:{}", msg);int num = 3 / 0;channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);} catch (Exception e) {log.error("正常队列监听器接收消息异常:{}", e.getMessage());channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);}}// 死信队列接收消息@RabbitListener(queues = Constants.DEAD_QUEUE)public void deadListener(String msg, Channel channel, Message message) throws IOException {try {log.info("死信队列监听器接收消息:{}", msg);channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);} catch (Exception e) {channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);}}}

由上图以及代码可以看到,当消息的确认机制是手动确认时,当出现异常并且拒绝消息重新入队以后,消息就会来到死信队列中。

使用场景

用户支付订单之后,支付系统会给订单系统返回当前订单的支付状态。为了保证支付信息不丢失,需要使用到死信队列机制。当消息消费异常时,将消息投入到死信队列,由订单系统的其他消费者来监听这个队列,并对数据进行处理(比如发送工单等,进行人工确认)。

消息重试:将死信消息发送到原队列或另一个队列进行重试处理。

消息丢弃:直接丢弃这些无法处理的消息,避免占用系统资源。

日志收集:将死信消息做为日志收集起来,用户后续分析和问题定位。

延迟队列

概念

延迟队列就是消息发送之后,并不想让消费者立即拿到消息,而是在等待特定时间之后,消费者才能拿到消息进行消费

应用场景

  1. 用户发起退款后,24小时内商家未处理,默认退款
  2. 用户注册成功后,三天后发送短信,提高用户活跃度
  3. 预定会议后,在会议开始前15分钟提醒众人参加会议
  4. 用户通过手机远程遥控家里的智能设备在指定时间进行工作,这就可以使用延迟队列。用户发送消息到延迟队列,当指定时间到了再将指令推送到智能设备。

实现方法

  1. RabbitMQ本身并没有实现延迟队列,因此可以使用TTL + 死信队列的方式来实现延迟队列。
  2. 安装延迟队列插件来实现延迟队列。

TTL + 死信队列

@Configuration
public class MockDelayConfig {@Bean("mockDelayNormalQueue")public Queue mockDelayNormalQueue() {return QueueBuilder.durable(Constants.MOCk_DELAY_NORMAL_QUEUE).ttl(5000 * 10) // 设置消息过期时间为50秒.deadLetterExchange(Constants.MOCK_DELAY_DEAD_EXCHANGE) // 设置死信交换机.deadLetterRoutingKey("mock.delay.dead") // 设置死信路由键.build();}@Bean("mockDelayNormalExchange")public Exchange mockDelayNormalExchange() {return ExchangeBuilder.directExchange(Constants.MOCk_DELAY_NORMAL_EXCHANGE).durable(true).build();}@Bean("mockDelayNormalQueueBind")public Binding mockDelayNormalQueueBind(@Qualifier("mockDelayNormalQueue") Queue queue,@Qualifier("mockDelayNormalExchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("mock.delay.normal").noargs();}@Bean("mockDelayDeadQueue")public Queue mockDelayDeadQueue() {return QueueBuilder.durable(Constants.MOCK_DELAY_DEAD_QUEUE).build();}@Bean("mockDelayDeadExchange")public Exchange mockDelayDeadExchange() {return ExchangeBuilder.directExchange(Constants.MOCK_DELAY_DEAD_EXCHANGE).durable(true).build();}@Bean("mockDelayDeadQueueBind")public Binding mockDelayDeadQueueBind(@Qualifier("mockDelayDeadQueue") Queue queue,@Qualifier("mockDelayDeadExchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("mock.delay.dead").noargs();}}
@Slf4j
@RestController
@RequestMapping("/mockDelay")
public class MockDelayController {@Resourceprivate RabbitTemplate rabbitTemplate;@RequestMappingpublic void mockDelayQueue() {this.rabbitTemplate.convertAndSend(Constants.MOCk_DELAY_NORMAL_EXCHANGE,"mock.delay.normal", "hello 延迟队列");log.info("延迟队列生产者发送成功");}}
@Slf4j
@Configuration
public class MockDelayListener {@RabbitListener(queues = Constants.MOCK_DELAY_DEAD_QUEUE)public void mockDelayListener(String msg) {log.info("模拟延迟队列消费者接收到消息:" + msg);}}

在上述代码中,实现的功能是生产者发送消息后,消费者在50秒之后获得消息,对消息进行消费:

在TTL一文中,已经说明了RabbitMQ只会检查队首消息是否过期,不会扫描整个队列。因此如果想要放在模拟延迟队列中的消息过期时间不一致,那就会出现死信消息无法被及时处理的情况。因此,我们想要模拟实现延迟队列,就要确保队列中所有消息的过期时间是一致的。如果存在时间不一致的情况,我们就可以使用不同的模拟延迟队列来实现。

延迟队列插件

下载插件:官方网站进行下载(注意版本对应关系)

启动插件

rabbitma-plusins list // 查看插件列表rabbitmq-plugins enable rabbitmq_delayed_message_exchange // 启动插件service rabbitmq-server restart # 重启服务

如下图,当交换机中有了x-delayed-message就表示延迟插件安装成功 

代码测试

@Configuration
public class DelayConfig {@Bean("delayQueue")public Queue delayQueue() {return QueueBuilder.durable(Constants.DELAY_QUEUE).build();}@Bean("delayExchange")public Exchange delayExchange() {return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).delayed() // 延迟交换机.durable(true) // 持久化.build();}@Bean("delayQueueBind")public Binding delayQueueBind(@Qualifier("delayQueue") Queue delayQueue,@Qualifier("delayExchange") Exchange delayExchange) {return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay").noargs();}}
@Slf4j
@RestController
@RequestMapping("/delay")
public class DelayController {@Resourceprivate RabbitTemplate rabbitTemplate;@RequestMappingpublic void delayQueue() {for(int i = 0; i < 5; i++) {// 随机生成延迟时间Random random = new Random();int time = random.nextInt(20);// 消息处理器,设置延迟时间MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelayLong((long) (time * 1000)); // 设置延迟时间return message;}};// 发送消息到延迟队列this.rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "hello 延迟队列 " + i, messagePostProcessor);log.info("发送延迟队列第" + i + "消息成功,延迟时间为:" + time);}}}
@Slf4j
@Configuration
public class DelayListener {@RabbitListener(queues = Constants.DELAY_QUEUE)public void delayListener(String msg) {log.info("延迟队列监听器,接收到的消息:{}", msg);}}

本质上,延迟插件就是让消息停留在交换机中,等到延迟时间结束之后,再发送到对应的队列中去。 

两者对比

使用TTL + 死信队列的好处是不需要额外安装插件。缺点是受消息的延迟时间影响,同一个队列中的消息必须延迟时间相同。

使用延迟队列插件的好处是不受延迟时间影响,同一队列中的所有消息延迟时间可以不同,额外的插件使得延迟队列的实现比较容易。缺点是需要依赖特定的插件,并且插件的版本必须和对应的RabbitMQ相对应。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1540586.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

对于C++继承中子类与父类对象同时定义其析构顺序的探究

思考这样一串代码的运行结果&#xff1a; #include <iostream> using namespace std; class Person { public:~Person() { cout << "~Person()" << endl; } }; class Student:public Person { public:~Student() { cout << "~Student(…

谷歌做外链的文章一定要原创吗?

在谷歌上做外链发布时&#xff0c;原创文章是必须的。虽然你可能会想到用一篇文章群发到很多网站&#xff0c;但这种做法并不会带来太大的SEO效果。谷歌非常重视内容的独特性和相关性。如果同样的文章重复发布到多个网站&#xff0c;搜索引擎很快就会识别出这种策略&#xff0c…

开发谷歌插件之GA埋点

目录 一、背景 二、踩坑 三、谷歌插件开发的GA埋点的实现方式 一、背景 开发了一个谷歌插件&#xff0c;领导需要对用户的一些行为进行分析&#xff0c;于是让我在代码里面加上GA埋点。由于我们的PC端的项目一直都有进行GA埋点&#xff0c;当时就想着&#xff0c;这不就是把…

页面关键路径渲染详解

关键路径渲染 浏览器不会等待全部资源都下载完后才进行渲染&#xff0c;而是采用渐进式的渲染方式&#xff0c;本文就介绍一下这种渐进式的渲染方式。 当浏览器获取到用于呈现网页的资源后&#xff0c;通常就会开始渲染网页。那么究竟是在什么时候就会开始渲染&#xff1f; …

LeetCode讲解篇之220. 存在重复元素 III

文章目录 题目描述题解思路题解代码 题目描述 题解思路 我们可以考虑存储数组中连续indexDiff个数字&#xff0c;这样我们只需要在这连续的indexDiff个数字中查找相差小于等于valueDiff的两个数字的问题 对于该查找问题&#xff0c;我们可以考虑使用以valueDiff大小为一个桶&a…

大厂程序员的健身之路

大厂程序员的健身之路 基本信息饮食正餐营养补剂 睡眠训练计划 基本信息 健身时间&#xff1a;2023.03 -> 2024.09体重变化&#xff1a;52kg -> 67kg 饮食 正餐 早餐&#xff1a;不吃午餐&#xff1a;两碗米饭 鱼/鸡肉 蔬菜 酸奶晚餐&#xff1a;两碗米饭 鱼/鸡肉…

简单题35-搜索插入位置(Java and Python)20240919

问题描述&#xff1a; Java&#xff1a; class Solution {public int searchInsert(int[] nums, int target) {int k 0;int i 0;while(i<nums.length){if(nums[i]target){return i;}if(nums[i]<target){k i1;}i;}return k;}}class Solution(object):def searchInsert(…

6.C_数据结构_查询_哈希表

概述 哈希表的查询是通过计算的方式获取数据的地址&#xff0c;而不是依次比较。在哈希表中&#xff0c;有一个键值key&#xff0c;通过一些函数转换为哈希表的索引值。 其中&#xff1a;这个函数被称为哈希函数、散列函数、杂凑函数&#xff0c;记为&#xff1a;H(key) 哈希…

NFT Insider #148:The Sandbox 推出 SHIBUYA Y3K 时尚系列,Azuki 进军动漫 NFT 领域

市场数据 加密艺术及收藏品新闻 Infinex 新推 NFT 系列首四日销售额破4000万美元 尽管顶级 NFT 系列表现不佳&#xff0c;Infinex 的最新 NFT 系列在首四日内销售额已超过 4000 万美元。Infinex 是一个非托管平台&#xff0c;提供轻松访问链上协议和 dApp。 Infinex Core 的…

189 轮转数组

解题思路&#xff1a; \qquad 首先要理解轮转的含义&#xff0c;轮转 将数组末尾元素移动至首位。轮转k不为负数&#xff0c;那如果k大于数组长度时会发生什么&#xff1f;定义n为数组长度&#xff0c;当k n时&#xff0c;数组元素的顺序又恢复成初始状态&#xff0c;下一次…

翻唱技巧:AU和Cubase翻唱录制对轨技巧

分享和记录一下个人翻唱的经验和技巧&#xff01;防止后续自己忘了&#xff01;同时如果有大佬看到&#xff0c;希望可以帮我指出其中的错误&#xff01;个人推荐用Cubase12录制翻唱&#xff0c;因为Cubase12可以做乐段的标记&#xff0c;翻唱时有助于学习一些歌曲的层次设计。…

opengl-redbook环境搭建(静态库)

所需库下载 gl3w(github地址)https://github.com/skaslev/gl3w 使用python3执行根目录下的gen脚本&#xff0c;会生成头文件include文件夹和src下gl3w.c文件。 glfw(github地址)https://github.com/glfw/glfw 本文项目结构 本文如红宝书一致&#xff0c;将glfw和gl3w引入…

【C高级】有关shell脚本的一些练习

目录 1、写一个shell脚本&#xff0c;将以下内容放到脚本中&#xff1a; 2、写一个脚本&#xff0c;包含以下内容&#xff1a; 1、写一个shell脚本&#xff0c;将以下内容放到脚本中&#xff1a; 1、在家目录下创建目录文件&#xff0c;dir 2、dir下创建dir1和dir2 …

【JAVA入门】Day48 - 线程池

【JAVA入门】Day48 - 线程池 文章目录 【JAVA入门】Day48 - 线程池一、线程池的主要核心原理二、自定义线程池三、线程池的大小 我们之前写的代码都是&#xff0c;用到线程的时候再创建&#xff0c;用完之后线程也就消失了&#xff0c;实际上这是不对的&#xff0c;它会浪费计算…

网络流之最大流(EK 模板)

EK的时间复杂度是O( )。 EK 算法 和 dinic 算法的区别是 &#xff1a;EK是通过 bfs 找到一条增广流&#xff0c;然后累加&#xff0c;循环此步骤直到 bfs 找不到增广流&#xff1b;而 dinic 算法 是通过 bfs 分层找到一条增广流&#xff0c;然后通过 dfs 跑完 当前分层图中所…

基于SpringBoot的中小医院管理系统

系列文章目录 1.基于SSM的洗衣房管理系统原生微信小程序LW参考示例 2.基于SpringBoot的宠物摄影网站管理系统LW参考示例 3.基于SpringBootVue的企业人事管理系统LW参考示例 4.基于SSM的高校实验室管理系统LW参考示例 5.基于SpringBoot的二手数码回收系统原生微信小程序LW参考示…

温故--javaproject

nginx反向代理和负载均衡 nginx 反向代理&#xff0c;就是将前端发送的动态请求由 nginx 转发到后端服务器 提高访问速度 因为nginx本身可以进行缓存&#xff0c;如果访问的同一接口&#xff0c;并且做了数据缓存&#xff0c;nginx就直接可把数据返回&#xff0c;不需要真正…

C++_21_模板

模板 简介&#xff1a; 一种用于实现通用编程的机制。 通过使用模板我们可以编写可复用的代码&#xff0c;可以适用于多种数据类型。 C模板的语法使用角括号 < > 来表示泛型类型&#xff0c;并使用关键字 template 来定义和声明模板 概念&#xff1a; c范式编程 特点&…

Telephony VOWIFI

1、VOWIFI框架 参考3GPP 23402文档, VOWIFI有如下相关架构设置。 1、S2a信任的WIFI热点 2、S2b非信任WIF热点 3、S2c直联核心WIF热点 目前使用比较多的为S2b非信任WIF热点。 2、EPDG建立过程 //Telephony Log IWLAN拨号 08-30 21:36:34.702857 1347 5131 D ConnectivityS…

基于YOLOv5的教室人数检测统计系统

基于YOLOv5的教室人数检测统计系统可以有效地用于监控教室内的学生数量&#xff0c;适用于多种应用场景&#xff0c;比如 自动考勤、安全监控或空间利用分析 以下是如何构建这样一个系统的概述&#xff0c;包括环境准备、数据集创建、模型训练以及如何处理不同类型的媒体输入…