RabbitMQ延迟队列
- 1、延迟队列
- 1.1、延迟队列使用场景
- 1.2、延迟队列实现原理
- 2、使用rabbitmq-delayed-message-exchange 延迟插件
- 2.1、下载
- 2.2、安装
- 2.2.1、解压
- 2.2.2、启用插件
- 2.2.3、查询安装情况
- 2.4、示例
- 2.4.1、RabbitConfig配置类(关键代码)
- 2.4.2、发送消息(关键代码)
- 2.4.3、application.yml配置类
- 2.4.4、接收消息
- 2.4.5、pom.xml
- 2.4.6、测试
1、延迟队列
https://blog.csdn.net/weixin_42942786/article/details/139940269
1.1、延迟队列使用场景
场景:有一个订单,15分钟内如果不支付,就把该订单设置为交易关闭,那么就不能支付了,这类实现延迟任务的场景就可以采用延迟队列来实现,当然除了延迟队列来实现,也可以有一些其他办法实现;
1.2、延迟队列实现原理
RabbitMQ本身不支持延迟队列,可以使用TTL(过期消息)结合DLX(死信交换机)的方式来实现消息的延迟投递,即把DLX(死信交换机)跟某个队列绑定,到了指定时间,消息过期后,就会从DLX(死信交换机)路由到这个队列,消费者可以从这个队列取走消息。
2、使用rabbitmq-delayed-message-exchange 延迟插件
2.1、下载
选择对应的版本下载 rabbitmq-delayed-message-exchange 插件,
下载地址:http://www.rabbitmq.com/community-plugins.html
2.2、安装
插件拷贝到 RabbitMQ 服务器plugins目录下
2.2.1、解压
unzip rabbitmq_delayed_message_exchange-3.10.2.ez
如果unzip 没有安装,先安装一下
yum install unzip -y
2.2.2、启用插件
./rabbitmq-plugins enable rabbitmq_delayed_message_exchange 开启插件;
2.2.3、查询安装情况
./rabbitmq-plugins list 查询安装的所有插件;
重启rabbitmq使其生效;(此处也可以不重启)
消息发送后不会直接投递到队列,
而是存储到 Mnesia(嵌入式数据库),检查 x-delay 时间(消息头部);
延迟插件在 RabbitMQ 3.5.7 及以上的版本才支持,依赖 Erlang/OPT 18.0 及以上运行环境;
Mnesia 是一个小型数据库,不适合于大量延迟消息的实现
解决了消息过期时间不一致出现的问题。
2.4、示例
2.4.1、RabbitConfig配置类(关键代码)
package com.power.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {@Value("${my.exchangeName}")private String exchangeName;@Value("${my.queueDelayName}")private String queueDelayName;//创建自定义交换机@Beanpublic CustomExchange customExchange() {Map<String, Object> arguments = new HashMap<>();arguments.put("x-delayed-type","direct");return new CustomExchange(exchangeName,"x-delayed-message",true,false,arguments);}//创建队列@Beanpublic Queue queueNormal() {//建造者模式创建队列return QueueBuilder.durable(queueDelayName)//队列名称.build();}//队列绑定交换机@Beanpublic Binding binding(CustomExchange customExchange, Queue queueNormal) {return BindingBuilder.bind(queueNormal).to(customExchange).with("plugin").noargs();}}
2.4.2、发送消息(关键代码)
package com.power.service;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.util.Date;@Service
@Slf4j
public class MessageService {@Resourceprivate RabbitTemplate rabbitTemplate;@Beanpublic void sendMsg(){{MessageProperties messageProperties = new MessageProperties();messageProperties.setHeader("x-delay",25000);//第一条消息设置过期时间25秒Message message = MessageBuilder.withBody("hello world 01".getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend("exchange.delay.04","plugin",message);log.info("消息order发送完毕,发送时间是:{}",new Date());}{MessageProperties messageProperties = new MessageProperties();messageProperties.setHeader("x-delay",15000);//第二条消息设置过期时间15秒Message message = MessageBuilder.withBody("hello world 02".getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend("exchange.delay.04","plugin",message);log.info("消息pay发送完毕,发送时间是:{}",new Date());}}
}
2.4.3、application.yml配置类
server:port: 8080
spring:application:name: delay-plugins-test01rabbitmq:host: 你的服务器IPport: 5672username: 你的账号password: 你的密码virtual-host: powermy:exchangeName: exchange.delay.04 # 交换机queueDelayName: queue.delay.04 # 正常队列
2.4.4、接收消息
package com.power.message;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;@Component
@Slf4j
public class ReceiveMessage {@RabbitListener(queues = "queue.delay.04")public void receiveMsg(Message message){String body = new String(message.getBody());log.info("接收到的消息为:{},接收时间为:{}",body,new Date());}
}
2.4.5、pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.power</groupId><artifactId>rabbit_07_delay04_plugins</artifactId><version>1.0-SNAPSHOT</version><name>rabbit_07_delay04_plugins</name><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.6.13</version><relativePath/></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>