【MQ篇】RabbitMQ之惰性队列!
目录
- 引言:当“生产”大于“消费”,队列就“胖”了!肥宅快乐队列?🤔
- 队列界的“躺平”大师:惰性队列(Lazy Queues)驾到!😴
- 如何“激活”你的队列的“惰性”属性?💤
- 惰性队列 vs 普通队列:实测见真章!(附代码)📊
- 惰性队列的“个性”总结:优缺点一览!📊
- 什么时候选择惰性队列?✅
🌟我的其他文章也讲解的比较有趣😁,如果喜欢博主的讲解方式,可以多多支持一下,感谢🤗!
🌟了解 MQ 请看 : 【MQ篇】初识MQ!
其他优质专栏: 【🎇SpringBoot】【🎉多线程】【🎨Redis】【✨设计模式专栏(已完结)】…等
如果喜欢作者的讲解方式,可以点赞收藏加关注,你的支持就是我的动力
✨更多文章请看个人主页: 码熔burning
引言:当“生产”大于“消费”,队列就“胖”了!肥宅快乐队列?🤔
朋友们!🤝 在高并发的系统里,生产者(负责发消息的)和消费者(负责处理消息的)就像两条生产线。理想情况是生产者发得快,消费者处理得也快,消息在队列里“嗖嗖嗖”地流动,来去如风!🌪️
但现实往往是残酷的!当生产者突然“打了鸡血”,疯狂发消息,而消费者却“打起了瞌睡”,处理不过来时,麻烦就来了!消息开始在队列里堆积!就像仓库的货物越积越多,库存爆满!
这就是咱们常遇到的 “消息堆积问题”!如果队列的消息数量达到了上限,新的消息再来,老的消息就可能被无情地丢弃,甚至变成咱们之前聊的“死信”了。这可不是闹着玩的,可能导致业务数据丢失!
解决消息堆积,有啥招儿? 文件里提了两个思路:
- 多请几个工人(增加消费者): 让更多消费者并行处理消息,提高整体消费速度!这对应咱们之前说的 Work Queue(工作队列)模式。
- 把仓库扩建一下(扩大队列容积): 让队列能存下更多消息,提高它“抗压”的上限!
第一种方法是治本,提高处理能力。但有时候处理能力不是说提就能提的,或者堆积是突发的、临时的。这时候,第二种方法就很重要了——让队列能存更多的消息。
问题来了,RabbitMQ 默认的队列模式,是尽量把消息保存在内存里的!内存快是快,但金贵啊!🐏 几百万、几千万条消息都放内存里?内存条会哭的!😭 而且,当内存实在放不下时,RabbitMQ 会被迫把一部分内存中的消息“分页”到磁盘上(page-out),这个过程会影响性能,而且当消息要被消费时,又得从磁盘“换页”回内存(page-in),一来一回,性能波动大。
怎么办?有没有一种队列,它不像默认队列那么“勤快”,非要把所有消息都往内存里搬,而是更“懒”一点,收到消息就先存盘,需要时再加载呢?
队列界的“躺平”大师:惰性队列(Lazy Queues)驾到!😴
从 RabbitMQ 的 3.6.0 版本开始,引入了一个新概念——Lazy Queues,也就是惰性队列!💡 它的出现,就是为了解决大量消息堆积时,默认队列模式的内存压力和性能抖动问题!
惰性队列的特点,正如它的名字一样——“惰性”!
- 消息直接存盘,不着急进内存: 默认队列收到消息,首先往内存里塞。惰性队列收到消息,二话不说,直接“啪!”存到磁盘上!💾 内存?等等再说!
- 消费者要了才从盘里捞: 只有当消费者来拉取消息时,惰性队列才会去磁盘读取消息,然后加载到内存中发送给消费者。
- 海量消息存储能力: 既然大部分消息都在磁盘上“躺着”,那它能存储的消息数量上限就大大提高了,号称支持数百万条消息存储!📦💯
惰性队列就像一个超大的“档案馆”,平时把文件(消息)都整理好存档(磁盘),只有需要调阅(消费)时,才去档案室取出文件,放到办公桌上(内存)处理。而默认队列就像一个“仓鼠”,总想把所有东西都塞到颊囊(内存)里,满了就抓狂(page-out)。🐿️➡️🧠➡️💥
如何“激活”你的队列的“惰性”属性?💤
让一个队列变得“惰性”,方法也很简单,就是在声明队列时,给它指定一个特殊的属性:x-queue-mode
,值为 lazy
。
你可以通过多种方式来设置:
-
命令行设置(给已有队列“变身”): 如果你的队列已经运行起来了,想把它改成惰性队列,可以使用 RabbitMQ 的命令行工具
rabbitmqctl set_policy
来设置一个策略。文件里提供了示例命令:rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
这条命令的意思是:
rabbitmqctl set_policy Lazy
:添加一个名为Lazy
的策略。"^lazy-queue$"
:这个策略应用到名字匹配正则表达式^lazy-queue$
的队列上(即名字叫lazy-queue
的队列)。'{"queue-mode":"lazy"}'
:策略的具体内容是设置queue-mode
属性为lazy
。--apply-to queues
:这个策略应用的对象是所有的队列(虽然前面已经用正则匹配了名字)。
通过这种策略方式,你可以给符合某些规则的队列批量设置属性,很方便!
-
基于
@Bean
声明队列时设置: 在 Spring Boot 中,使用@Bean
声明 Queue 时,可以通过QueueBuilder
的.lazy()
方法,或者更底层地通过withArguments
方法添加x-queue-mode
参数来设置。文件里提供了.lazy()
方法的示例:// 惰性队列 - 使用 QueueBuilder.lazy() 方法 @Bean public Queue lazyQueue(){System.out.println("🛠️ 声明一个惰性队列 lazy.queue"); // 添加日志return QueueBuilder.durable("lazy.queue") // 指定队列名称,并持久化 ✅.lazy() // ⭐ 就这一句!让队列变惰性!😴.build(); }// 也可以通过 arguments 来设置,效果一样: // @Bean // public Queue lazyQueueWithArgs(){ // Map<String, Object> args = new HashMap<>(); // args.put("x-queue-mode", "lazy"); // ⭐ 设置 x-queue-mode 为 lazy ⭐ // return new Queue("lazy.queue.with.args", true, false, false, args); // }
.lazy()
方法就是QueueBuilder
提供的一个便捷方法,底层就是帮你加了x-queue-mode: lazy
这个参数。 -
基于
@RabbitListener
声明队列时设置: 如果你习惯直接在消费者注解上声明队列,也可以通过@Queue
注解的arguments
属性来设置x-queue-mode
。文件里提供了示例:@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "lazy.listener.queue", durable = "true",arguments = {@Argument(name = "x-queue-mode", value = "lazy")} // ⭐ 在这里设置惰性属性 ⭐),exchange = @Exchange(name = "listener.exchange"),key = "lazy.key" )) public void listenLazyQueueFromListener(String msg){System.out.println("👂 接收到惰性队列消息:" + msg); // 添加日志 }
惰性队列 vs 普通队列:实测见真章!(附代码)📊
这是简单的测试思路:同时声明一个惰性队列和一个普通队列,然后往它们里面大量发送消息(比如 20 万条),看看有什么区别。
首先,声明这两个队列(通常绑定到同一个交换机或使用默认交换机):
// RabbitConfig.java 中定义队列
@Bean
public Queue lazyQueue(){System.out.println("🛠️ 声明惰性队列 lazy.queue");return QueueBuilder.durable("lazy.queue").lazy() // 设置为惰性队列.build();
}@Bean
public Queue normalQueue(){System.out.println("🛠️ 声明普通队列 normal.queue");return QueueBuilder.durable("normal.queue").build(); // 默认就是普通队列
}// 你还需要一个交换机(或者使用默认交换机 ""),并把这两个队列绑定上去
// @Bean public TopicExchange testExchange() { ... }
// @Bean public Binding lazyBinding(...) { ... bind(lazyQueue).to(testExchange).with("lazy.key"); }
// @Bean public Binding normalBinding(...) { ... bind(normalQueue).to(testExchange).with("normal.key"); }
然后,编写发送大量消息的测试代码:
// 你的测试类中
@Test
public void testSendManyMsg(){System.out.println("📨 正在批量发送消息...");long startTime = System.currentTimeMillis();for (int i = 0; i < 200000; i++) { // 发送 20 万条消息CorrelationData data = new CorrelationData(UUID.randomUUID().toString());// 往惰性队列发送rabbitTemplate.convertAndSend( "","lazy.queue", "message "+i,data);// 往普通队列发送// rabbitTemplate.convertAndSend( "","normal.queue", "message "+i,data); // 如果要测普通队列,取消注释,但要注意内存和性能}long endTime = System.currentTimeMillis();System.out.println("✅ 批量发送消息完成!");System.out.println("⏱️ 消耗时间: " + (endTime - startTime) + " ms");
}
当你运行这个测试,并且 RabbitMQ 中没有消费者快速消费时,消息就会在队列里堆积。
测试现象(预期):
- 发送到普通队列: 如果发送数量巨大(比如文件里的 20 万条)且超过了内存阈值,你会观察到 RabbitMQ 的内存使用率会很高,并且可能会出现 page-out(消息从内存换到磁盘)的现象,这个过程可能导致发送速度变慢,甚至 Blocking。如果你不设置队列长度上限 (
x-max-length
),它会一直往内存/磁盘里塞,直到资源耗尽。如果设置了上限,超过上限的消息就会被丢弃。 - 发送到惰性队列: 发送过程相对稳定,RabbitMQ 的内存使用率不会像普通队列那样暴增(因为它直接存盘了),但磁盘 I/O 会比较高。理论上,在资源允许的情况下,它能接收并存储比普通队列多得多的消息,而不会因为内存不足而开始 page-out 或丢弃消息(除非磁盘满了)。
这个测试的目的就是直观地告诉你,惰性队列在处理海量消息堆积时的优势——能存,而且存得比较“平稳”!
惰性队列的“个性”总结:优缺点一览!📊
解决消息堆积的方案:
- 增加更多消费者,提高消费速度(治本)。
- 使用惰性队列,提高队列的容积上限,能存更多消息(治标,但很重要!)。
惰性队列的优点(为啥要用它?):
- 基于磁盘存储,消息上限高: 这是它最突出的优点!能应对可怕的消息堆积,存下数百万条消息而不崩。📦💯
- 没有间歇性的 page-out,性能比较稳定: 默认队列在内存不足时换页操作会带来性能波动。惰性队列一开始就存盘,避免了这种不稳定的换页过程。📊
惰性队列的缺点(啥时候不适合?):
- 基于磁盘存储,消息时效性会降低: 从磁盘读消息比从内存慢是肯定的。如果你的业务对消息的“新鲜度”要求极高,需要消息一到就被秒级甚至毫秒级处理,那么惰性队列可能不是最佳选择。⏰
- 性能受限于磁盘的 IO: 既然消息主要在磁盘,那队列的吞吐量和延迟就很大程度上取决于你的磁盘性能(SSD 肯定比机械硬盘强)。如果你的磁盘 IO 是瓶颈,惰性队列的性能也会受影响。🐌
什么时候选择惰性队列?✅
- 预期会有大量消息堆积的场景: 这是惰性队列最适合发挥作用的时候,比如秒杀系统、日志收集、大数据量同步等。
- 对消息时效性要求不是极致苛刻的场景: 允许消息在队列里稍微多待几十毫秒甚至几百毫秒。
- 希望避免默认队列在内存满时带来的性能不稳定性。
如果你需要构建一个能“吞吐”海量消息,即使消费者处理不过来也能稳定存储,不丢消息(在磁盘空间允许前提下)的系统,那么惰性队列绝对是你应该考虑的利器!它不像默认队列那样“拼命”往内存里挤,而是选择一种更“躺平”、更稳健的方式来应对消息洪流!😴👍
希望这篇关于惰性队列的详细解析,能让你彻底理解它的“惰性”哲学和实际用途!如果你的系统正饱受消息堆积之苦,不妨试试这位“躺平”大师,也许会有意想不到的效果!🎉
了解RabbitMQ消息不丢的“三板斧”请看:
【MQ篇】RabbitMQ的生产者消息确认实战!
【MQ篇】RabbitMQ之消息持久化!
【MQ篇】RabbitMQ的消费者确认机制实战!
了解RabbitMQ消息失败重试请看:
【MQ篇】RabbitMQ之消费失败重试!
了解RabbitMQ死信交换机请看:
【MQ篇】RabbitMQ之死信交换机!