RabbitMQ 高级特性——延迟队列

在这里插入图片描述

文章目录

  • 前言
  • 延迟队列
    • 延迟队列的概念
    • TTL + 死信队列模拟延迟队列
      • 设置队列的 TTL
      • 设置消息的 TTL
    • 延迟队列插件
      • 安装并且启动插件服务
      • 使用插件实现延迟功能

前言

前面我们学习了 TTL 和死信队列,当队列中的消息达到了过期时间之后,那么这个消息就会被死信交换机投递到死信队列中,然后由专门的消费者进行消费,这篇文章将为大家介绍 RabbitMQ 的另一高级特性——延迟队列。

延迟队列

延迟队列的概念

延迟队列通常指的是一种队列,其中的消息在发送后不会立即被消费,而是会等待一段时间(即延迟时间)后才变得可消费。

RabbitMQ 本身不直接提供名为“延迟队列”的内置功能,但我们可以利用 RabbitMQ 的一些特性和机制来模拟或实现延迟队列的行为。就比如 TTL + 死信队列方式模拟出延迟队列的功能。

TTL + 死信队列模拟延迟队列

在这里插入图片描述

我们的消费者不去消费正常队列中的消息,而是去消费死信队列中的消息,为什么这样能实现延迟对立的功能呢?当生产者生产消息投递给正常交换机之后,交换机会将这个消息根据 routing key 和 binding key 路由到指定的队列中,因为这个正常队列没有消费者可以消费消息,所以到达了了这个队列中的消息,就会存储在这个队列中,因为这个队列设置了 TTL,所以消息到达了过期时间之后就会被投递给死信交换机 DLX,然后死信交换机就会将这个消息投递给 DLQ,然后消费者从这个 DLQ 中去消费消息,这样消费者就会在生产者生产的消息到达队列之后不会立即去消费,而是会等待一段时间,这样就通过 TTL + 死信队列的方式模拟出了延迟队列的功能。

设置队列的 TTL

那么我们看看通过代码如何实现:

public class Constants {public static final String NORMAL_EXCHANGE = "normal.exchange";public static final String NORMAL_QUEUE = "normal.queue";public static final String DL_EXCHANGE = "dl.exchange";public static final String DL_QUEUE = "dl.queue";
}

声明交换机、队列和绑定关系:

@Configuration
public class DLConfig {@Bean("normalExchange")public Exchange normalExchange() {return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();}@Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(Constants.NORMAL_QUEUE).ttl(1000).deadLetterExchange(Constants.DL_EXCHANGE).deadLetterRoutingKey("dl").build();}@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalExchange") Exchange exchange, @Qualifier("normalQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();}@Bean("DLExchange")public Exchange DLExchange() {return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();}@Bean("DLQueue")public Queue DLQueue() {return QueueBuilder.durable(Constants.DL_QUEUE).build();}@Bean("DLBinding")public Binding DLBinding(@Qualifier("DLExchange") Exchange exchange,@Qualifier("DLQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("dl").noargs();}
}

生产者代码:

@RequestMapping("producer")
@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("delay")public String delay() {rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","rabbitmq delay,发送时间:" + new Date());return "消息发送成功";}
}

消费者:

@Component
public class DelayListener {@RabbitListener(queues = Constants.DL_QUEUE)public void listener(String message, Channel channel) {System.out.println("时间:" + new Date() + ",接收到消息:" + message + channel);}
}

在这里插入图片描述

可以看到,通过 TTL + 死信队列的方式是可以实现延迟队列的功能的,虽然可能从消息发送的时间到消费者消费到这个消息的时间可能会比预想的时间多一点(中间消息传输花费了一点时间)。

这是一次发送一条消息的情况,如果我们一次发多条信息看看什么效果:

@RequestMapping("delay")
public String delay() throws InterruptedException {rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","rabbitmq delay1,发送时间:" + new Date());Thread.sleep(1000);rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","rabbitmq delay2,发送时间:" + new Date());Thread.sleep(1000);rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","rabbitmq delay3,发送时间:" + new Date());return "消息发送成功";
}

在这里插入图片描述

当为队列设置 TTL 的时候,消息会按照进队列的先后顺序进行消费,这是为队列设置 TTL 的情况,为队列设置 TTL 意味着进入该队列中的所有消息都具有过期时间,且过期时间如果没有单独设置消息的过期时间的话,那么该队列中的消息的过期时间都是相同的,那么如果我们想要每个消息的过期时间都不相同的话就需要单独给消息设置过期时间,接下来我们看看给消息单独设置过期时间的情况。

设置消息的 TTL

public static final String NORMAL_QUEUE2 = "normal.queue2";

创建出一个没有设置过期时间的普通队列:

@Bean("normalQueue2")
public Queue normalQueue2() {return QueueBuilder.durable(Constants.NORMAL_QUEUE2).deadLetterExchange(Constants.DL_EXCHANGE).deadLetterRoutingKey("dl").build();
}@Bean("normalBinding2")
public Binding normalBinding2(@Qualifier("normalExchange") Exchange exchange,@Qualifier("normalQueue2") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("normal2").noargs();
}

设置消息的过期时间:

@RequestMapping("delay2")
public String delay2(){rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal2","rabbitmq delay,发送时间" + new Date(),message -> {message.getMessageProperties().setExpiration("10000");return message;});rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal2","rabbitmq delay,发送时间" + new Date(),message -> {message.getMessageProperties().setExpiration("20000");return message;});return "消息发送成功";
}

消费者的代码就不需要改动:

在这里插入图片描述

为消息设置 TTL 就可以时间每个消息的过期时间都不相同,但是这里会有一个问题,就是如果先投递的消息的过期时间如果晚于后面投递的消息的过期时间就会出现问题:

@RequestMapping("delay2")
public String delay2(){rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal2","rabbitmq delay,发送时间" + new Date(),message -> {message.getMessageProperties().setExpiration("20000");return message;});rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal2","rabbitmq delay,发送时间" + new Date(),message -> {message.getMessageProperties().setExpiration("10000");return message;});return "消息发送成功";
}

在这里插入图片描述

可以发现设置了过期时间为 10s 的消息也是过了 20s 之后才进入到死信队列然后才被消费。

消息过期之后,不一定会被马上丢弃。因为RabbitMQ只会检查队首消息是否过期,如果过期则丢到死信队列。
此时就会造成一个问题,如果第一个消息的延时时间很长,第二个消息的延时时间很短,那第二个消息并不会优先得到执行。因为我们的普通队列并没有消费者消费其中的消息,我们都知道对消息设置过期时间,只有在消息被快要使用的之前,才会判断它是否过期,但是这里由于队列没有消费者,所以消息也就不会被使用到,那么 RabbitMQ 就只会检查队首的消息是否过期,这样就导致了队首的消息没有过期,那么之后的消息也不会被检查到过期投递到死信队列中。

所以在考虑使用TTL+死信队列实现延迟任务队列的时候,需要确认业务上每个任务的延迟时间是一致的,如果遇到不同的任务类型需要不同的延迟的话,需要为每一种不同延迟时间的消息建立单独的消息队列。

延迟队列插件

虽然 RabbitMQ 没有直接实现延迟队列,但是 RabbitMQ 提供了一个延迟的插件来实现延迟的功能。

具体的可以看看官方文档 https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq#delaying-messages

安装并且启动插件服务

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
也可以点击这里:

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
我们下载的插件的版本需要和 RabbitMQ 的版本兼容:

在这里插入图片描述

版本对应关系:

在这里插入图片描述

下载完成插件之后,可以在这里查看如何配置:https://www.rabbitmq.com/docs/installing-plugins

在这里插入图片描述
我们将下载完成的文件放在 RabbitMQ 的插件目录下:

我们使用的是 Linux,所以需要将插件放在 /usr/lib/rabbitmq/plugins 或者 /usr/lib/rabbitmq/lib/rabbitmq_server-version/plugins文件下,如果没有这个路径可以自己创建:

在这里插入图片描述
下载完成插件并且将其放在指定目录下之后,我们就可以启动插件了:

使用 rabbitmq-plugins list 查看 rabbitmq 的插件:
在这里插入图片描述

如果我们之前启动了一个错误版本的插件,可以使用 rabbitmq-plugins disbale 插件名称 来禁用插件:

在这里插入图片描述

使用 rabbitmq-plugins enable 插件名称 来启动插件服务:

在这里插入图片描述
如何验证插件已经安装并且启动成功,我们在 rabbitmq 的管理页面新建一个交换机的时候,看看交换机的类型是否有 x-delayed-message 这个类型:

我们刷新一下管理页面,然后在新建交换机的时候查看交换机的类型:

在这里插入图片描述
这就说明我们的延迟插件启动成功了。

使用插件实现延迟功能

安装并且启动延迟插件之后,我们来看看代码如何实现一个延迟功能的队列:

public static final String DELAY_EXCHANGE = "delay.exchange";
public static final String DELAY_QUEUE = "delay.queue";

声明交换机、对立和绑定关系:

@Configuration
public class DelayConfig {@Bean("delayExchange")public Exchange delayExchange() {return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).delayed().build();}@Bean("delayQueue")public Queue delayQueue() {return QueueBuilder.durable(Constants.DELAY_QUEUE).build();}@Bean("delayBinding")public Binding delayBinding(@Qualifier("delayExchange") Exchange exchange, @Qualifier("delayQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("delay").noargs();}
}

生产者代码:

@RequestMapping("delay3")
public String delay3(){rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE,"delay","rabbitmq delay,发送时间" + new Date(),message -> {message.getMessageProperties().setDelay(20000);return message;});rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE,"delay","rabbitmq delay,发送时间" + new Date(),message -> {message.getMessageProperties().setDelay(10000);return message;});return "消息发送成功";
}

消费者代码:

@RabbitListener(queues = Constants.DELAY_QUEUE)
public void listener2(String message, Channel channel) {System.out.println("时间:" + new Date() + ",接收到消息:" + message + channel);
}

启动项目之后,会创建一个 x-delay-message 类型的交换机:

在这里插入图片描述
在这里插入图片描述
使用延迟插件的话,就可以使得队列中的消息可以按照延迟时间到达消费者。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/17667.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

腾讯IM uniapp微信小程序版本实现迅飞语音听写(流式版)

在之前文章《腾讯IM web版本实现迅飞语音听写(流式版)》实现了腾讯IM web版本实现迅飞语音听写,本文将基于uniapp vue2/vue3(cli 脚手架)的Demo项目集成迅飞语音听写(流式版): 主要代…

实现高效运行管理:如何使用 NSSM 工具将 IoTDB 注册为 Windows 系统服务

后台自动无干扰运行 IoTDB 的“指导手册”! IoTDB 是一个专为工业物联网领域设计的高性能时间序列数据库。在生产环境中,确保 IoTDB 的稳定运行对于业务连续性至关重要。通常情况下,通过批处理(bat)脚本运行 IoTDB 会始…

Windows配置域名映射IP

一、找到 hosts 文件 打开 C:\Windows\System32\drivers\etc 二、添加hosts文件修改、写入权限 右击hosts文件,点击属性 -> 安全 -> Users -> 编辑 -> Users -> 添加修改、写入权限 -> 确定 -> 确定 进入常规,将只读属性关闭 三、…

专题二十_动态规划_简单多状态dp问题_买卖股票系列问题_算法专题详细总结

目录 动态规划 1. 按摩师(easy) 解析: 1.状态表达式: 2.状态转移方程 3.初始化 4.填表方向 5.返回值: 代码编写: 总结: 2. 打家劫舍II (medium) 解析&#xf…

多模态简述

多模态学习概念 【多模态简述-哔哩哔哩】 https://b23.tv/UrUyfln 定义: 模态:事物表达或感知的方式 多模态:研究异构和相互连接数据的科学,涵盖了从原始的器官信号到抽象概念的多种模态 语音和语言是理解人物交互的关键模态&am…

mac2019环境 Airflow+hive+spark+hadoop本地环境安装

1 环境介绍 本地安装可分为两个部分,mac软件环境, python开发环境 ps: 安装过程参考chatgpt、csdn文章 1.1 mac软件环境 目标安装的的软件是hive、apache-spark、hadoop,但是这三个软件又依赖java(spark依赖)、ssh&#xff08…

HarmonyOS4+NEXT星河版入门与项目实战--------开发工具与环境准备

文章目录 1、熟悉鸿蒙官网1、打开官网2、下载 DevEco Studio3、HarmonyOS 资源库4、开发指南与API 2、安装 DevEco Studio1、软件安装2、配置开发工具 1、熟悉鸿蒙官网 1、打开官网 百度搜索 鸿蒙开发者官网 点击进入开发者官网,点击开发,可以看到各种…

11.16 JavaScript

什么是JavaScript? JavaScript(简称:js)是一门跨平台,面向对象的脚本语言,是用来控制网页行为的,它能使网页可交互。JavaScript和java是完全不同的语言,不论是概念还是设计。但是基…

【网络安全面经】技术性问题

1.SQL注入原理 主要基于Web应用程序对用户输入数据的合法性缺乏严格的判断或过滤 2.windows上提权的方式和linux提权方式 windows:本地溢出漏洞提权,AT(计划任务提权),SC(创建服务提权),PS(微软官方工具pstool),数据…

20241116下载中科创达的TurboX D660核心板的Android11的SDK的详细LOG

20241116下载中科创达的TurboX D660核心板的Android11的SDK的详细LOG 2024/11/16 15:28 下载速度,工作日:20MBps/周末30MBps。 【实际情况,取决于您的实际网络环境】 https://docs.thundercomm.com/turbox_doc/products/smart-modules/turbox…

计算机网络 (6)物理层的基本概念

前言 计算机网络物理层是OSI模型(开放式系统互联模型)中的第一层,也是七层中的最底层,它涉及到计算机网络中数据的物理传输。 一、物理层的主要任务和功能 物理层的主要任务是处理物理传输介质上的原始比特流,确保数据…

大模型(LLMs)微调篇

大模型(LLMs)微调篇 一、如果想要在某个模型基础上做全参数微调,究竟需要多少显存? 一般 n B的模型,最低需要 16-20 n G的显存。(cpu offload基本不开的情况下) 二、为什么SFT之后感觉LLM傻了…

企业网络链路聚合、数据抓包、远程连接访问实验

前言: 随着信息技术的飞速发展和企业业务的不断扩大,企业网络面临着越来越多的挑战。其中,网络带宽、数据安全和远程访问等问题尤为突出。为了解决这些问题,我们进行了本次企业网络链路聚合、数据抓包和远程连接访问的实验。 链路…

移除元素(leetcode 27)

给定一个数组,在数组中删除等于这个目标值的元素,然后返回新数组的大小 数组理论: 数组是一个连续的类型相近的元素的一个集合,数组上的删除是覆盖,只能由后面的元素进行覆盖,而不能进行真正意义上的地理位…

前端面试笔试(三)

目录 一、数据结构算法等综合篇 二、代码输出篇 1.yield与生成器函数 2.this指向有关 3.instanceof 与Array.isArray 4.继承class cls extends Array,调用里面的sum方法 三、css、html、JavaScript篇 1.哪项不能提高dom元素操作效率? 2.contente…

7.高可用集群架构Keepalived双主热备原理

一. 高可用集群架构Keepalived双主热备原理 (1)主机+备机keepalived配置(192.168.1.171) ! Configuration File for keepalivedglobal_defs {# 路由id:当前安装keepalived节点主机的标识符,全局唯一router_id keep_101 } #计算机节点(主机配置) vrrp_instance VI_1 {</

IntelliJ IDEA 2023.2x——图文配置

IntelliJ IDEA 2023.2——配置说明 界面如下图所示 : 绿泡泡查找 “码猿趣事” 查找【idea99】 IntelliJ IDEA 的官方下载地址 IntelliJ IDEA 官网下载地址 一路上NEXT 到结尾&#xff1a; 继续NEXT 下一步:

Linux网络:守护进程

Linux网络&#xff1a;守护进程 会话进程组会话终端 守护进程setsiddaemon 在创建一个网络服务后&#xff0c;往往这个服务进程是一直运行的。但是对于大部分进程来说&#xff0c;如果退出终端&#xff0c;这个终端上创建的所有进程都会退出&#xff0c;这就导致进程的生命周期…

Linux Android 正点原子RK3568替换开机Logo完整教程

0.这CSDN是有BUG吗?大家注意:表示路径的2个点号全都变成3个点号啦! 接下来的后文中,应该是2个点都被CSDN变成了3个点: 1.将这两个 bmp 图片文件720x1280_8bit拷贝到内核源码目录下,替换内核源码中默认的 logo 图片。注意:此时还缺少电量显示图片 2.编译内核 make d…

安卓开发作业

整体效果: 安卓小作业 [TOC](页面配置) 整体框架有4个fragment页面,聊天,朋友,发现,设置. 配置如下: bash <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas.android.com/apk/res/android" xm…