解决RabbitMQ设置x-max-length队列最大长度后不进入死信队列

解决RabbitMQ设置x-max-length队列最大长度后不进入死信队列

  • 问题发现
  • 问题解决
    • 方法一:只监听死信队列,在死信队列里面处理业务逻辑
    • 方法二:修改预取值

问题发现

最近再学习RabbitMQ过程中,看到关于死信队列内容:

来自队列的消息可以是 “死信”,这意味着当以下四个事件中的任何一个发生时,这些消息将被重新发布到 Exchange

  1. 使用 basic.rejectbasic.nackrequeue 参数设置为 false 的使用者否定该消息
  2. 消息由于每条消息的 TTL 而过期
  3. 队列超出了长度限制

之前的文章里面有讲解过TTL过期后不进入死信队列的疑惑和解决办法,然后上手去实践另一个死信队列的方法,结果又是一道坑等着我,示例代码如下:

默认自动应答模式

@Configuration
public class MQConfig {/*** 死信队列* @return*/@Beanpublic Queue deadQueue(){return new Queue("dead_queue");}/*** 死信队列交换机* @return*/@Beanpublic DirectExchange deadExchange(){return new DirectExchange("dead.exchange");}/*** 死信队列和死信交换机绑定* @return*/@Beanpublic Binding deadBinding(){return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");}/*** 普通队列* @return*/@Beanpublic Queue queue(){
//          方法一
//        Queue normalQueue = new Queue("normal_queue");
//        normalQueue.addArgument("x-dead-letter-exchange", "dead.exchange"); // 死信队列
//        normalQueue.addArgument("x-dead-letter-routing-key", "dead"); // 死信队列routingKey
//        normalQueue.addArgument("x-max-length", 5);//设置队列最大长度
//        normalQueue.addArgument("x-overflow","reject-publish");//最近发布的消息将被丢弃
//        方法二return  QueueBuilder.durable("normal_queue").deadLetterExchange("dead.exchange").deadLetterRoutingKey("dead").maxLength(5) // 设置队列最大长度为5.build();}/*** 普通交换机* @return*/@Beanpublic DirectExchange normalExchange(){return new DirectExchange("normal.exchange");}/*** 普通队列和普通交换机绑定* @return*/@Beanpublic Binding binding(){return BindingBuilder.bind(queue()).to(normalExchange()).with("normal");}
}

监听普通队列消费方,示例代码如下:

@Component
@RabbitListener(queues = "normal_queue")
public class MQReceiver {private static final Logger log = LoggerFactory.getLogger(MQReceiver.class);@RabbitHandlerpublic void receive(String msg) {log.info("收到消息:"+msg);}
}

监听死信队列消费方,示例代码如下:

@Component
@RabbitListener(queues = "dead_queue")
public class MQReceiver2 {private static final Logger log = LoggerFactory.getLogger(MQReceiver2.class);@RabbitHandlerpublic void receive(String msg) {log.info("死信队列收到消息:{}",msg);}
}

发送方发送10条消息,示例代码如下:

@Component
public class MQSender {@Autowiredprivate RabbitTemplate template;public void send() {for (int i = 0; i < 10; i++) {String msg = "hello world_"+i;template.convertAndSend("normal.exchange", "normal", msg);}}
}

然后调用send()方法,执行结果如图:

在这里插入图片描述

按道理会将队列前面的5条消息进入死信队列,然后剩下的五条消息正常消费才对,我们检查一下队列是否设置成功,如图所示

在这里插入图片描述
设置没有问题,那就很懵逼了。。。

问题解决

我们先在页面上向普通队交换机发送10条消息,然后查看它的状态,如图所示:
在这里插入图片描述
超过5条消息就会放入死信队列中,如图所示:

.
然后再看一下,默认行为是从队列前面删除或死信消息,如图所示:

在这里插入图片描述
我们可以看到普通队列存放的是最后5条消息,前面的5条消息进入死信队列。也就是说,再没有进入普通消费者之前会将队列前面删除或死信消息(进入消费者之前将消息进行分配)。

方法一:只监听死信队列,在死信队列里面处理业务逻辑

这种做法也是网上大多数文章的一种处理方法(另外一种情况就是进入普通消费者,还没被消费完的情况下,消费者挂了,然后队列就会重新分配,将从队列前面删除或者进入死信队列),如图所示:

在这里插入图片描述

但是这些做法都是基于没有普通消费者监听的情况下进行的,感觉和我理解的略有偏差(应该是再有普通消费者监听和死信队列监听的情况下,发送消息时会对消息进行分配处理)。

发送方代码和配置的代码就不重复展示了(参考之前示例),监听死信队列(自动确认模式和手动确认模式都一样),示例代码如下:

@Component
@RabbitListener(queues = "dead_queue")
public class MQReceiver2 {private static final Logger log = LoggerFactory.getLogger(MQReceiver2.class);@RabbitHandlerpublic void receive(String msg,Message message,Channel channel) throws IOException {log.info("死信队列收到消息:{}",msg);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}
}

调用send()方法,执行结果如图:

在这里插入图片描述
然后再死信队列里面处理业务逻辑即可。

方法二:修改预取值

再监听普通队列时抛异常或者手动拒绝重新进入队列,这两种方式都不能达到我想要的效果,我发现只要有普通消费方监听就会进入消费,然后我就在想是不是因为预期值的问题导致,可能我消息数量太少了,然后默认预期值太高导致消息直接进入消费,然后将预取值改为1自动确认模式,示例代码如下:

spring.rabbitmq.listener.simple.prefetch=1

发送方代码和配置的代码就不重复展示了(参考之前示例),监听普通队列,示例代码如下:

@Component
@RabbitListener(queues = "normal_queue")
public class MQReceiver {private static final Logger log = LoggerFactory.getLogger(MQReceiver.class);@RabbitHandlerpublic void receive(String msg) throws InterruptedException {log.info("收到消息:"+msg);// 模拟业务处理队列等待场景Thread.sleep(10000);}
}

监听死信队列,示例代码如下:

@Component
@RabbitListener(queues = "dead_queue")
public class MQReceiver2 {private static final Logger log = LoggerFactory.getLogger(MQReceiver2.class);@RabbitHandlerpublic void receive(String msg) {log.info("死信队列收到消息:{}",msg);}
}

调用send()方法,执行结果如图:

在这里插入图片描述
当然有的时候也不一定完全按照你设置的最大长度进入死信队列,有的时候消费速度太快(队列的第一个已经被消费了的情况),得看实际情况,至少可以确保再设置了大于队列最大长度时是可以正常进入死信队列的。归根结底:消息数量太少了

另外我们再来介绍一下溢出方式,一般默认情况下溢出方式为:drop-head(从队列前面删除或者进入死信队列),除此之外还有两种:

  • reject-publishreject-publish-dlx:最近发布的消息将被丢弃。reject-publishreject-publish-dlx 之间的区别在于 reject-publish-dlx 也是死信拒绝消息。

将配置的溢出模式改为reject-publish,示例代码如下:

@Configuration
public class MQConfig {/*** 死信队列* @return*/@Beanpublic Queue deadQueue(){return new Queue("dead_queue");}/*** 死信队列交换机* @return*/@Beanpublic DirectExchange deadExchange(){return new DirectExchange("dead.exchange");}/*** 死信队列和死信交换机绑定* @return*/@Beanpublic Binding deadBinding(){return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");}/*** 普通队列* @return*/@Beanpublic Queue queue() {
//          方法一
//        Queue normalQueue = new Queue("normal_queue");
//        normalQueue.addArgument("x-dead-letter-exchange", "dead.exchange"); // 死信队列
//        normalQueue.addArgument("x-dead-letter-routing-key", "dead"); // 死信队列routingKey
//        normalQueue.addArgument("x-max-length", 5);//设置队列最大长度
//        normalQueue.addArgument("x-overflow","reject-publish");//最近发布的消息将被丢弃
//        方法二return  QueueBuilder.durable("normal_queue").deadLetterExchange("dead.exchange").deadLetterRoutingKey("dead").maxLength(5) // 设置队列最大长度为5.overflow(QueueBuilder.Overflow.dropHead).build();}/*** 普通交换机* @return*/@Beanpublic DirectExchange normalExchange(){return new DirectExchange("normal.exchange");}/*** 普通队列和普通交换机绑定* @return*/@Beanpublic Binding binding(){return BindingBuilder.bind(queue()).to(normalExchange()).with("normal");}
}

重新创建队列,调用send()方法,如图所示:

在这里插入图片描述
由图可以,死信队列并不会受到消息。

然后我们再来看看将溢出模式设置为reject-publish-dlxQueueBuilder.Overflow没有该参数,手动定义),示例代码如下:

@Configuration
public class MQConfig {//忽略死信队列创建和绑定过程,参考前面示例.../*** 普通队列* @return*/@Beanpublic Queue queue() {Queue normalQueue = new Queue("normal_queue");normalQueue.addArgument("x-dead-letter-exchange", "dead.exchange"); // 死信队列normalQueue.addArgument("x-dead-letter-routing-key", "dead"); // 死信队列routingKeynormalQueue.addArgument("x-max-length", 5);//设置队列最大长度normalQueue.addArgument("x-overflow","reject-publish-dlx"); //最近发布的消息进入死信队列return normalQueue;}//忽略普通队列创建过程,参考前面示例...
}

重新创建队列,调用send()方法,如图所示:

在这里插入图片描述

如果你有更好的方法或者不同的理解,欢迎评论区交流。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/147408.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

Docker 容器技术:颠覆传统,重塑软件世界的新势力

一、Docker简介 什么是docker Docker 是一种开源的容器化平台&#xff0c;它可以让开发者将应用程序及其所有的依赖项打包成一个标准化的容器&#xff0c;从而实现快速部署、可移植性和一致性。 从功能角度来看&#xff0c;Docker 主要有以下几个重要特点&#xff1a; 轻量…

[Redis][数据类型]详细讲解

1.Redis 特殊数据结构 1.Streams 应用场景&#xff1a;主要用为队列(阻塞队列) 2.Geospatial 应用场景&#xff1a;用来存储坐标(经纬度) 3.HyperLogLog 应用场景&#xff1a;估算集合中的元素个数注意&#xff1a; HyperLogLog不存储元素的内容&#xff0c;但是能够记录“…

计算机毕设设计推荐-基于python+Djanog大数据的电影数据可视化分析

精彩专栏推荐订阅&#xff1a;在下方主页&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; &#x1f496;&#x1f525;作者主页&#xff1a;计算机毕设木哥&#x1f525; &#x1f496; 文章目录 一、电影数据可视…

JavaWeb--纯小白笔记04:Tomcat整合IDEA

IDEA整合Tomcat 1.点击Idea的导航栏里的Run&#xff0c;选择Edit Configurations 2.点击左上角的""&#xff0c;向下翻找到Tomcat Server 选择里面的Local 3.创建一个web工程&#xff0c;点击IDEA的File-->new-->project 然后选择Java Enterprise&#xff0c;…

crosscrossover24支持的游戏有那些

CrossOver刚刚更新了24版本&#xff0c;支持《地平线零之曙光》、《以撒的结合&#xff1a;重生》等游戏。一起来看看它有哪些更新吧&#xff01;之前买过23版的用户可以在1年之内免费升级哦&#xff0c;点击这里查看升级教程。 一、功能优化 - 更新 Wine 至最新的稳定版 Wine …

七彩云南文化旅游网站设计与实现

摘 要 传统办法管理信息首先需要花费的时间比较多&#xff0c;其次数据出错率比较高&#xff0c;而且对错误的数据进行更改也比较困难&#xff0c;最后&#xff0c;检索数据费事费力。因此&#xff0c;在计算机上安装七彩云南文化旅游网站软件来发挥其高效地信息处理的作用&am…

[机器学习]04-逻辑回归(python)-03-API与癌症分类案例讲解

逻辑回归&#xff08;Logistic Regression&#xff09; 的一API 介绍 关于如何配置模型中的优化器、正则化和惩罚项。 1. 逻辑回归 API 在 Scikit-learn 中&#xff0c;逻辑回归可以通过如下方式定义&#xff1a; from sklearn.linear_model import LogisticRegression ​ …

Web 安全(Web Security)

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:Linux运维老纪的首页…

Renesas R7FA8D1BH (Cortex®-M85)的 General PWM的应用实践

目录 概述 1 General PWM介绍 1.1 特性 1.2 定时器选择注意点 2 时钟配置 3 应用案例 3.1 基本定时器应用 3.2 定时器回调函数案例 3.3 输入捕捉功能案例 3.4 更新周期案例 3.5 更新占空比案例 3.6 单次触发脉冲案例 4 测试 4.1 代码介绍 4.2 验证 概述 本文主…

算法练习题25——leetcode3279统计重新排列后包含另一个字符串的子字符串的数目(滑动窗口 双指针 哈希)

题目描述 解题思路 本题用到了滑动窗口 双指针 哈希 刚开始我是没读懂题的因为我笨 我想把我的思路说一下 左端不轻易缩小 只有找到跟word2匹配了 比如说abbcdd 遍历到c的时候才能匹配这个word2 对吧 那么之后加上以一个d或者俩d 都符合了 然后我们算完了 才能缩小左端 扩大…

python爬虫案例——异步加载网站数据抓取,post请求(6)

文章目录 前言1、任务目标2、抓取流程2.1 分析网页2.2 编写代码2.3 思路分析前言 本篇案例主要讲解异步加载网站如何分析网页接口,以及如何观察post请求URL的参数,网站数据并不难抓取,主要是将要抓取的数据接口分析清楚,才能根据需求编写想要的代码。 1、任务目标 目标网…

STM32篇:按键点亮LED灯

输入&#xff08;按键&#xff09;&#xff1a;KEY1---PA0 KEY2---PA1 输出&#xff08;LED灯&#xff09;&#xff1a;LED1---PB8 LED2---PB9

【M-LOAM学习】

M-LOAM(INITIALIZATION) Article Analysis Scan-Based Motion Estimation 通过在consecutive frame (each LiDAR)&#xff08;因为omp parallel&#xff09;中寻找correspondences然后通过最小化所有考虑feature之间residual error的transformation between frame to frame 针…

(done) 声音信号处理基础知识(7) (Understanding Time Domain Audio Features)

参考&#xff1a;https://www.youtube.com/watch?vSRrQ_v-OOSg&t1s 时域特征包括&#xff1a; 1.幅度包络 2.均方根能量 3.过零率 振幅包络的定义&#xff1a;一个 frame 里&#xff0c;所有采样点中最大的振幅值 一个形象的关于振幅包络的可视化解释如下&#xff1a;…

MateBook 16s 2023在Deepin下开启性能模式,调节风扇转速到最大,全网首发!

方法 在Deepin下按住Fnp快捷键&#xff0c;开启性能模式。 验证 首先去debian下载acpi-call-dkms https://packages.debian.org/sid/all/acpi-call-dkms/download 然后使用root用户执行&#xff1a; apt install --simulate ./acpi-call-dkms_1.2.2-2.1_all.deb apt inst…

C++入门——(类的默认成员函数)析构函数

文章目录 一、析构函数二、析构函数的特点总结 一、析构函数 析构函数与构造函数功能相反&#xff0c;析构函数不是完成对对象本⾝的销毁&#xff0c;⽐如局部对象是存在栈帧的&#xff0c;函数结束栈帧销毁&#xff0c;他就释放了&#xff0c;不需要我们管&#xff0c;C规定对…

【ChatGPT】提示词助力广告文案、PPT制作与书籍推荐的高效新模式

博客主页&#xff1a; [小ᶻZ࿆] 本文专栏: AIGC | ChatGPT 文章目录 &#x1f4af;前言&#x1f4af;高效广告推销文案提示词使用方法 &#x1f4af;AI自动生成PPT全流程提示词使用方法 &#x1f4af;精选书籍推荐爆款文案提示词使用方法 &#x1f4af;小结 &#x1f4af;…

第一个NDK项目

新建项目 选择Native C的项目&#xff0c;我这里给项目的命名是NDKTest。 目录分析 新增了一个cpp目录&#xff0c;里面有一个CMakeLists和.cpp文件。 CMakeLists 文件是用来配置C编译过程的。 # Sets the minimum CMake version required for this project. cmake_minimum_…

【解密 Kotlin 扩展函数】命名参数和默认值(十三)

导读大纲 1.0.1 命名参数1.0.2 默认参数值 上一节讲述如何自定义 joinToString 函数来代替集合的默认字符串表示 文末遗留下几个待优化问题–传送门 1.0.1 命名参数 我们要解决的第一个问题涉及函数调用的可读性 例如,请看下面的joinToString调用: joinToString(collection,&…

循环中用sleep

echo <pre>;for ($i0;$i<10000000;$i){var_dump($i);} 没有用sleep,快速消耗cpu和内存 使用sleep后效果 echo <pre>;for ($i0;$i<10000000;$i){var_dump($i);usleep(1000);//php 暂停0.001秒} 总结&#xff1a;sleep能释放资源(cpu和内存)&#xff0c;但是运…