rabbitMQ

官网:https://www.rabbitmq.com/

一  介绍与安装

1  安装

我们同样基于Docker来安装RabbitMQ,使用下面的命令即可:

docker run \-e RABBITMQ_DEFAULT_USER=itheima \-e RABBITMQ_DEFAULT_PASS=123321 \-v mq-plugins:/plugins \--name rabbitmq \--hostname mq \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3.8-management

可以看到在安装命令中有两个映射的端口:

- 15672:RabbitMQ提供的管理控制台的端口

- 5672:RabbitMQ的消息发送处理接口

2  访问控制台

192.168.6.134:15672

3  基本概念

- `**publisher**`:生产者,也就是发送消息的一方

- `**consumer**`:消费者,也就是消费消息的一方

- `**queue**`:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理

- `**exchange**`:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。

- `**virtual host**`:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue。我们多个项目有时候为了节约成本,会共用一个rabbitmq集群,所以需要区分不同的交换机和队列

二  基于操作面板的快速入门

1  需求

需求:

在RabbitMO的控制台完成下列操作:

1  新建队列hello.queue1和hello.queue2

2  向默认的amp.fanout交换机发送一条消息
3  查看消息是否到达hello.queue1和hello.queue2
4  总结规律

2  创建队列

3  交换机绑定队列并发送消息

3.1  选择指定的交换机

3.2  根据需求绑定

绑定hello.queue1  and   hello.queue2

此时查看队列情况,队列中也有一个绑定选项,可以看到已经绑定了交换机

3.3  交换机模拟发送消息

3.4  查看队列的消息

4  总结

1  交换机发消息的时候必须绑定相应的队列

2  绑定了的队列都能收到交换机发送的消息

三  数据隔离

1  需求

需求:

在RabbitMQ的控制台完成下列操作:
1  新建一个用户hmall
2  为hmall用户创建一个virtual host
3  测试不同virtualhost之间的数据隔离现象

2  新建一个用户

新建一个叫shuaiqicjx的用户,密码123,权限是超级管理员admin。

创建好了用户之后,可以看到新用户是没有虚拟机的,他可以看到别的用户的队列,虚拟机等,但是没有权限去操作他们

3  为新用户创建一个新的虚拟机

左上角退出默认账号,登陆新创建好的账号,然后创建虚拟机

右上角切换虚拟机之后,可以看到没有队列,因为是一个新的虚拟机。

四  java客户端操作

1  基于客户端的快速入门

1.1  基本了解

SpringAmqp的官方地址:    https://spring.io/projects/spring-amqp

1.2  需求

需求如下:
1  利用控制台创建队列simple.queue
2  在publisher服务中,利用SpringAMOP直接向simple.queue发送消息
3  在consumer服务中,利用SpringAMOP编写消费者,监听simple.queue队列

1.3  引入依赖

注意boot是2.7.12版本,还引入了Spring-AMQP依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.itcast.demo</groupId><artifactId>mq-demo</artifactId><version>1.0-SNAPSHOT</version><modules><module>publisher</module><module>consumer</module></modules><packaging>pom</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.12</version><relativePath/></parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><!--Jackson--><dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId></dependency></dependencies>
</project>

1.4  在发送的微服务配置mq

在每个微服务中引入MQ服务端信息,这样微服务才能连接到RabbitMQ

spring:rabbitmq:host: 192.168.150.101port: 5672virtual-host: /hmallusername: hmallpassword: 123

1.5  发送消息

消息发送成功

1.6  获取消息

获取消息的微服务也要去配置mq的消息

将监听类加载到spring中,然后在接受消息的方法中去填写监听哪一个队列,参数类型是发送消息的类型。

运行后打印了消息

2  Work Queue模型

2.1  简介

2.2  需求

模拟WorkQueue,实现一个队列绑定多个消费者
基本思路如下:
1.在RabbitMQ的控制台创建一个队列,名为work.queue

2.在publisher服务中定义测试方法,在1秒内产生50条消息,发送到work.queue

3.在consumer服务中定义两个消息监听者,都监听work.queue队列

4 一个消费者处理的快,一个慢一点

2.3  创建队列

创建队列work.queue

2.4  创建消费者

每20毫秒发送一次,一秒发送50条

2.5  创建消费者1和2

可以发现

第一点  每条消息只会消费一次

第二点  消息以轮训的方式给消费者

所以,轮训没有利用到不同消费者处理的性能,我们需要去使用到性能

2.6  修改配置

2.7  测试

设置了睡眠时间,模拟消费延迟

可以看到这个时候可以充分使用性能

2.8  总结

Work模型的使用:
1  多个消费者绑定到一个队列,可以加快消息处理速度
2  同一条消息只会被一个消费者处理
3  通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳

3  Fanout交换机

真正生产环境都会经过exchange来发送消息,而不是直接发送到队列,交换机的类型有以下三种:
Fanout:广播
Direct:定向
Topic:话题

3.1  介绍

3.2  需求

实现思路如下:
1  在RabbitMQ控制台中,声明队列fanout.queue1和fanout.queue2

2  在RabbitMO控制台中,声明交换机hmall.fanout,将两个队列与其绑定
3. 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

4  在publisher中编写测试方法,向hmall.fanout发送消息

3.3  创建队列

声明队列fanout.queue1和fanout.queue2

3.4  声明交换机hmall.fanout并绑定队列

声明交换机,和他的fanout模式

绑定两个交换机

3.5  编写两个消费者方法

3.6  发送消息

这个消息是要发送到交换机中的,要传入的参数有三个,第一个是交换机的名字,第二个是队列(也可以是绑定的属性),第三个是消息

如果不需要指定发送到的队列,就在队列的参数传null

发送成功

3.7  总结

交换机的作用是什么?
1  接收publisher发送的消息
2  将消息按照规则路由到与之绑定的队列

3  FanoutExchange的会将消息路由到每个绑定的队列

4  Direct交换机

4.1  介绍

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。
1  每一个Queue都与Exchange设置一个BindingKey
2  发布者发送消息时,指定消息的RoutingKey
3  Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

4.2  需求


1  在RabbitMQ控制台中,声明队列direct.queue1和direct.queue2

2  在RabbitMO控制台中,声明交换机hmall.direct,将两个队列与其绑定
3  在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2

4  在publisher中编写测试方法,利用不同的RoutingKey向hmall.direct发送消息

4.3  声明队列

声明队列direct.queue1和direct.queue2

4.4  创建交换机

声明交换机hmall.direct,将两个队列与其绑定

绑定队列的时候记得绑定属性,队列1绑定蓝色和红色,队列2绑定红色和黄色

4.5  编写两个消费者方法

4.6  编写发送方法

第二个属性这时候指定的是消息绑定的特定的属性

因为绑定的是红色,两个队列都可以收到

如果绑定的是蓝色,就只有一个队列可以收到,只有队列1可以收到

5  Topic交换机

5.1  介绍

TopicExchange与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并且以.分割。

Queue与Exchange指定BindingKey时可以使用通配符:
◆#:代指0个或多个单词
◆*:代指一个单词

5.2  需求

需求如下:
1.在RabbitMQ控制台中,声明队列topic.queue1和topic.queue2
2.在RabbitMQ控制台中,声明交换机hmall.topic,将两个队列与其绑定

3.在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2

4.在publisher中编写测试方法,利用不同的RoutingKey向hmall.topic发送消息

5.3  声明队列

5.4  声明交换机绑定队列

注意这个时候的绑定的属性

5.5  编写消费者

5.6  生产者

5.7  测试

只有队列1可以收到消息,测试成功

6  在代码中声明队列,交换机,和绑定关系

6.1  基于配置类的方式

上述1-5的操作都是我们手动在操作上去声明队列,交换机,和绑定队列与交换机的关系。为了提高效率,我们要在代码中完成这些操作

SpringAMOP提供了几个类,用来声明队列、交换机及其绑定关系:
1  Queue:用于声明队列,可以用工厂类QueueBuilder构建
2  Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建

3  Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建

6.1.1  示例

可以用构造器或者直接new

package com.itheima.consumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfiguration {//@Beanpublic Queue createQueue1(){
//        Queue queue = QueueBuilder.durable("direct.queue1").build();return new Queue("direct.queue1");}@Beanpublic Queue createQueue2(){
//        Queue queue = QueueBuilder.durable("direct.queue2").build();return new Queue("direct.queue2");}@Beanpublic DirectExchange createDiretExchange(){
//        ExchangeBuilder.directExchange("hmall.direct").build();return new DirectExchange("hmall.direct");}@Beanpublic Binding createBinding(){return BindingBuilder.bind(createQueue2()).to(createDiretExchange()).with("blue");}
}

6.2  基于注解的方式

在监听方法上使用注解来创建绑定关系,队列和交换机。可以点进去看可以加哪些参数。

7  消息转换器

7.1  介绍


Spring的对消息对象的处理是出org.springframework.amgp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于IDK的ObjectOutputStream完成序列化。存在下列问题:
1  JDK的序列化有安全风险
2  JDK序列化的消息太大
3  JDK序列化的消息可读性差

7.2  解决

建议采用JSON序列化代替默认的JDK序列化,

在publisher和consumer中都要引入jackson依赖:

<!--Jackson--><dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId></dependency>

在publisher和consumer中都要配置Messageconverter:

@Beanpublic MessageConverter jacksonMessageConvertor(){return new Jackson2JsonMessageConverter();}

五  消息的可靠性

1  发送者的可靠性

1.1  生产者重连

这个更多解决的是方式发送消息因为发送者与rabbitmq连接的原因发送失败

有的时候由于网络波动,可能会出现客户端连接MO失败的情况。通过配置我们可以开启连接失败后的重连机制:

spring:rabbitmq:connection-timeout: 1template:retry:enabled: true #开启超时重试机制initial-interval: 1000ms #失败后的初始等待时间multiplier: 1 #失败后下次的等待时长倍数,下次等待时长=initial-interval * multipliermax-attempts: 3 #最大重试次数

注意!!!

当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

1.2  消息确认原理

这个更多解决的是消息在发送过程中失败

RabbitMO了Publisher ConfirmPublisher Return两种确认机制。

开启确机制认后,在MQ成功收到消息后会返回消息给生产者

返回的结果有以下几种情况:

1  消息投递到了MQ,但是路由失败。此时会通过Publisher Return返回路由异常原因,然后返回ACK,告知投递成功

2  临时消息投递到了MQ(交换机),并且入队成功,返回ACK,告知投递成功
3  持久消息投递到了MO(交换机),并且入队完成持久化,返回ACK,告知投递成功

4  其它情况都会返回NACK,告知投递失败

1.3  消息确认实现方式

配置回调
spring:rabbitmq:publisher-confirm-type: correlated   #  开启publisher confirm机制,并设置confirm类型publisher-returns: true  #  开启publisherreturn机制

配置说明:
这里publisher-confirm-type有三种模式可选:
none:                关闭confirm机制
simple:             同步阻塞等待MQ的回执消息
correlated:        MQ异步回调方式返回回执消息

编写Publisher Return
package com.itheima.publisher.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;@Configuration
@Slf4j
public class MqConfirmConfig implements ApplicationContextAware {// 实现ApplicationContextAware接口的setApplicationContext方法,// 该方法会在Spring容器加载上下文时被调用。@Overridepublic void setApplicationContext(ApplicationContext applicationContext)throws BeansException {// 从Spring应用上下文中获取RabbitTemplate的bean实例。RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 设置RabbitTemplate的消息返回回调,当消息无法被路由到任何队列时,会触发这个回调。rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {// 实现ReturnsCallback接口的returnedMessage方法,处理返回的消息。@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {// 使用log对象记录调试信息,包括消息内容、交换机、回复码、回复文本和路由键。log.debug("回调函数调用, Message : #{}, Exchange : #{}, Code : #{}, " +"text : #{}, rountingkey : #{} ",// returnedMessage对象包含返回消息的详细信息。returnedMessage.getMessage(), // 返回的消息对象returnedMessage.getExchange(), // 返回消息的交换机名称returnedMessage.getReplyCode(), // 服务器返回的状态码returnedMessage.getReplyText(), // 服务器返回的状态文本returnedMessage.getRoutingKey() // 返回消息的路由键);}});}
}

编写publisher confirm

要注意的是,这里的失败是future的操作失败,也就是spring失败,而不是rabbitmq失败。成功也不是消息发送成功,而是future回调成功。

@Testvoid testConfirm() throws InterruptedException {// 创建CorrelationData对象,用于存储消息确认的相关数据。CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());// 为CorrelationData对象的Future添加回调,当消息确认或否定确认时会被调用。cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {// 当Future操作失败时调用此方法。@Overridepublic void onFailure(Throwable ex) {// 使用log对象记录错误日志,包括失败的原因。log.error("失败原因:#{}", ex);}// 当Future操作成功时调用此方法。@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 记录调试信息,表示收到了确认响应。log.debug("收到Confirm");// 检查确认结果是否为Ack(确认)。if(result.isAck()){// 如果是Ack,记录调试信息,表示消息成功被确认。log.debug("成功回调ack");}else{// 如果不是Ack,记录调试信息,表示消息被否定确认,并记录原因。log.debug("成功回调,nack ,原因 : #{}", result.getReason());}}});// 使用rabbitTemplate发送消息到RabbitMQ,指定交换机类型为direct,路由键为"red",消息内容为"hello"。// CorrelationData对象用于关联发送的消息和其确认响应。rabbitTemplate.convertAndSend("hmall.direct","red", "hello", cd);// 让当前线程休眠10000毫秒(10秒),以等待消息确认的结果。// 这个休眠时间可能过长,实际应用中应根据业务需求和消息确认机制的响应时间来调整。Thread.sleep(10000);}

演示

情况1:当路由错误,也就是没有发送到队列,会返回ack并且调用return机制

情况2:当交换机错误,也就是消息没有传到mq中,会返回nack

1.4  总结

SpringAMOP中生产者消息确认的几种返回值情况:


1  消息投递到了MQ,但是路由失败。会return路由异常原因,返回ACK
2  临时消息投递到了MQ,并且入队成功,返回ACK

3  持久消息投递到了MO,并且入队完成持久化,返回ACK
4  其它情况都会返回NACK,告知投递失败

如何处理生产者的确认消息?
1  生产者确认需要额外的网络和系统资源开销,尽量不要使用如果一定要使用,无需开启Publisher-Return机制,因为一般路由失败是自己业务问题
2  对于nack消息可以有限次数重试,依然失败则记录异常消息

2  MQ的可靠性

2.1  问题


在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。

这样会导致两个问题:

1  一旦MO宕机,内存中的消息会丢失
2  内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞。mq在消息满了的时候,有新的消息进来,mq会把老的消息存到磁盘上,新的消息放到队列里面。这个过程mq是阻塞的。(也称为Paged out)

2.2  消息持久化

默认情况下,我们通过spring创建的交换机,队列和消息都是持久化的,不需要我们特意地去配置持久化。在操作面板中我们要去选择持久化的选项

这样发消息超过mq内存,mq清除消息的时候,就不会去阻塞。但是性能也会减小,因为要写到磁盘中

2.3  Lazy Queue(优先选择)


从RabbitMQ的3.6.0版本开始,就增加了LazyQueue的概念,也就是惰性队列。

惰性队列的特征如下:
1  接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条)

2  消费者要消费消息时才会从磁盘中读取并加载到内存
3  支持数百万条的消息存储
4  在3.12版本后,所有队列都是LazyQueue模式,无法更改

看起来和持久化差不多,但是这个会在写操作的时候,也就是磁盘io的时候做优化,性能会高很多

如何实现?

基于控制台

点击lazy queue后会自动加上参数

基于注解

加上arguments参数

2.4  总结

3  消费者的可靠性

3.1  介绍

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)

当消费者处理消息结束后,应该向RabbitMO发送一个回执,告知RabbitM0自己消息处理状态。回执有三种可选值:

ack:成功处理消息,RabbitMO从队列中删除该消息
nack:消息处理失败,通常是执行的时候出错误,可以解决的错误,RabbitMO需要再次投递消息
reject:消息处理失败并拒绝该消息,通常是消息本身有错误,不能解决,比如参数异常等。RabbitMO从队列中删除该消息

SpringAMOP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种方式:

1  none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用

2  manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活

3  auto:自动模式。SpringAMOP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack当业务出现异常时,根据异常判断返回不同结果:
        如果是业务异常,会自动返回nack
        如果是消息处理或校验异常,自动返回reject

3.2  实现

需要在配置中加上        acknowledge-mode  :  auto    这个选项,默认是   none

spring:rabbitmq:listener:simple:acknowledge-mode: auto

3.3  失败重试机制

配置重试机制


当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue无限循环,导致mq的消息处理飙升,带来不必要的压力。我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列

spring:rabbitmq:listener:simple:prefetch: 1retry:enabled: true #开启消费者失败重试initial-interval: 1000ms #初始的失败等待时长为1秒multiplier: 1 # 下次失败的等待时长倍数,下次等待时长=multiplier *last-intervalmax-attempts: 3 #最大重试次数stateless: true #true无状态:false有状态。如果业务中包含事务,这里改为false

选择处理策略

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(使用)

我们一般使用第三种方式

为选择的策略创建指定交换机,队列,和编写指定投送到交换机的动作

package com.itheima.consumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 配置类,用于定义错误处理相关的Bean。*/
@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled",havingValue = "true")
public class ErrorConfiguration {/*** 创建一个名为"error.queue"的队列。* 这个队列用于存储处理失败的消息。* @return 创建的队列对象*/@Beanpublic Queue createErrorQueue(){return new Queue("error.queue");}/*** 创建一个名为"error.direct"的直连交换机。* 这个交换机用于将消息路由到错误处理队列。* @return 创建的交换机对象*/@Beanpublic DirectExchange createErrorExchange(){return new DirectExchange("error.direct");}/*** 创建一个绑定,将"error.queue"队列绑定到"error.direct"交换机。* 并设置路由键为"error",这样标记为"error"的消息会被路由到错误处理队列。* @return 创建的绑定对象*/@Beanpublic Binding binding(){return BindingBuilder.bind(createErrorQueue()).to(createErrorExchange()).with("error");}/*** 创建一个消息恢复器,用于处理消息的重试。* 当消息处理失败时,这个恢复器会将消息重新发布到"error.direct"交换机。* @param rabbitTemplate RabbitTemplate对象,用于操作消息* @return 创建的消息恢复器对象*/@Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct","error");}
}

发送消息之后会重试三次然后抛出异常,消息转发给error队列

查看错误队列,我们可以看到消息的信息

业务幂等性

上面两个重试措施已经可以保证消息至少消费一次了,但是不能保证消息只消费一次。

什么是幂等性?

方案一:唯一消息id

方案一,是给每个消息都设置一个唯一id,利用id区分是否是重复消息:


1  每一条消息都生成一个唯一的id,与消息一起投递给消费者
2  消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库

3  如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

方案一实现:

在消息转换器设置,内部会自动生成一个uuid,伴随消息发出

@Beanpublic MessageConverter jacksonMessageConvertor(){Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();jjmc.setCreateMessageIds(true);return jjmc;}

可以看到消息带有一个id

方案二:根据业务逻辑

可以根据业务逻辑 进行处理。根据业务中是否含有某些字段来处理。比如说我要执行修改操作,根据状态值来判断修改的数据。下面就是课程中的案例,可以学习一下。

六  延迟消息

1  是什么

假如我有一个业务,我下单之后就立马扣去库存,但是没有支付。这种情况下别人是抢不了这个商品的。这时候我们可以使用延迟消息,延迟任务。

就比如一个下单业务,顾客下单之后会立即发送下单请求给mq,然后mq在30分钟后(延迟)去检查这个下单任务是否已经支付,如果支付就扣去库存。

2  实现一: 死信交换机

2.1  死信是什么

当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):

1  消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
2  消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
3  要投递的队列消息堆积满了,最早的消息可能成为死信

2.2  实现:基于操作面板

创建simple.direct

创建dlx.direct

创建dlx,queue

创建simple.queue并设置死信交换机

队列与交换机之间的绑定

simple.direct  AND   simple.queue

dlx.direct  AND  dlx.queue

测试

用代码发送消息,并设置过期时间10秒

@Testvoid sendTTLMessage(){rabbitTemplate.convertAndSend("simple.direct", "red", "hello",new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("10000");return message;}});log.info("发送完成!");}
@RabbitListener(queues = "dlx.queue")public void listemTTL(String msg){log.info("收到死信消息 :{}" , msg);}

相隔10秒,确认成功

2.3  实现:基于java代码

思路和上面操作面板的一样

发送消息
@Testvoid sendTTLMessage(){rabbitTemplate.convertAndSend("simple.direct", "red", "hello",new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("10000");return message;}});log.info("发送完成!");}
消费者

这里不能直接使用注解直接设置两个队列和交换机,必须使用配置类来创建一组队列和交换机

package com.itheima.consumer.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** RabbitMQ 配置类,用于定义消息队列、交换器和绑定规则。*/
@Configuration
public class DiedConfiguration {/*** 定义一个直连交换器(DirectExchange),用于将消息路由到指定的队列。* * @return 创建的直连交换器实例。*/@Beanpublic DirectExchange simpledirec() {return new DirectExchange("simple.direct");}/*** 定义一个队列,用于存储消息。* 该队列被设置为持久化的,以确保在RabbitMQ服务重启后消息不会丢失。* 同时,配置了死信交换器,用于处理无法正常路由的消息。* * @return 创建的队列实例。*/@Beanpublic Queue simplequeue() {return QueueBuilder.durable("simple.queue").deadLetterExchange("dlx.direct") // 设置死信交换器.build();}/*** 定义一个绑定规则,将队列与交换器绑定,并指定路由键。* 消息将根据路由键被路由到对应的队列。* * @return 创建的绑定规则实例。*/@Beanpublic Binding binding() {return BindingBuilder.bind(simplequeue()) // 绑定到队列.to(simpledirec()) // 绑定到直连交换器.with("red"); // 指定路由键}
}
/*** 监听死信队列的方法。* 当死信队列接收到消息时,该方法会被调用。* * @param msg 从死信队列接收到的消息内容。*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "dlx.queue"), // 指定死信队列的名称exchange = @Exchange(name = "dlx.direct", type = ExchangeTypes.DIRECT), // 指定死信交换器的名称和类型key = {"red"} // 指定路由键,用于将消息路由到对应的队列))public void listemTTL(@Payload String msg) { // 使用@Payload注解指定方法参数为消息的负载log.info("收到死信消息 :{}", msg); // 记录日志,输出收到的死信消息内容}

3  实现二:延迟消息插件

3.1  是什么  

RabbitMO的官方也推出了一个插件,原生支持延迟消息功能。该插件的原理是设计了一种支持延迟消息功能的交换机当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。

3.2  实现

消费者

在定义交换机的时候开启delay

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "hmall.delay", type = ExchangeTypes.DIRECT, delayed = "true"),key = {"red", "blue"}))public void listemlazyQueue(String msg) {log.info(msg);}
生产者

设置延迟时间的方法是setDelay()

@Testvoid sendDelayMessage(){rabbitTemplate.convertAndSend("hmall.delay", "red", "hello",new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(10000);return message;}});log.info("发送完成!");}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/5418.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

reg注册表研究与物理Hack

reg注册表研究与物理Hack 声明&#xff1a;内容的只是方便各位师傅学习知识&#xff0c;以下网站只涉及学习内容&#xff0c;其他的都与本人无关&#xff0c;切莫逾越法律红线&#xff0c;否则后果自负。 目录 reg注册表研究与物理HackWindows注册表修改注册表实现应用程序开机…

【黑盒测试】等价类划分法及实例

本文主要介绍黑盒测试之等价类划分法&#xff0c;如什么是等价类划分法&#xff0c;以及如何划分&#xff0c;设计等价类表。以及关于三角形案例的等价类划分法。 文章目录 一、什么是等价类划分法 二、划分等价类和列出等价类表 三、确定等价类的原则 四、建立等价类表 …

适用于个人或团队的文档管理和知识库系统,NAS快速部署『BookStack』

适用于个人或团队的文档管理和知识库系统&#xff0c;NAS快速部署『BookStack』 哈喽小伙伴们好&#xff0c;我是Stark-C~ 知识库对于很多需要和文字打交道的个人或者团队都不陌生对吧&#xff1f;对于我们个人来说&#xff0c;它可以将常用的学习资料、工作笔记、项目计划和…

delphi fmx android 自动更新(一)

12.2 android10测试通过 一,安卓权限设置 1,REQUEST_INSTALL_PACKAGES 权限 2,INTERNET 权限 3,READ_EXTERNAL_STORAGE 权限 4,WRITE_EXTERNAL_STORAGE 权限 5,READ_PHONE_STATE 二,安卓下载过程 一般是从http下载安装包 apk 所以,如果是http 则,manife…

《JVM第7课》堆区

文章目录 1.概念2.指定堆大小3.新生代和老年代3.1 新生代3.2 老年代3.3 动画演示 4.分代收集理念 1.概念 堆是JVM中最重要的一块区域&#xff0c;JVM规范中规定所有的对象和数组都应该存放在堆中&#xff0c;在执行字节码指令时&#xff0c;会把创建的对象存入堆中&#xff0c…

【笔记】自动驾驶预测与决策规划_Part6_不确定性感知的决策过程

文章目录 0. 前言1. 部分观测的马尔可夫决策过程1.1 POMDP的思想以及与MDP的联系1.1.1 MDP的过程回顾1.1.2 POMDP定义1.1.3 与MDP的联系及区别POMDP 视角MDP 视角决策次数对最优解的影响 1.2 POMDP的3种常规解法1.2.1 连续状态的“Belief MDP”方法1. 信念状态的定义2. Belief …

Spring Boot框架下的知识管理与多维分类

4 系统设计 系统分析接下来的操作步骤就是系统的设计&#xff0c;这部分内容也是不能马虎对待的。因为生活都是在不断产生变化&#xff0c;人们需求也是在不断改变&#xff0c;开发技术也是在不断升级&#xff0c;所以程序也需要考虑在今后可以方便进行功能扩展&#xff0c;完成…

LeetCode17. 电话号码的字母组合(2024秋季每日一题 59)

给定一个仅包含数字 2-9 的字符串&#xff0c;返回所有它能表示的字母组合。答案可以按 任意顺序 返回。 给出数字到字母的映射如下&#xff08;与电话按键相同&#xff09;。注意 1 不对应任何字母。 示例 1&#xff1a; 输入&#xff1a;digits “23” 输出&#xff1a;[“…

Nature Methods | 基于流形约束的RNA速度推断精准解析细胞周期动态调节规律

生信碱移 VeloCycle算法 VeloCycle&#xff1a;基于流形约束的RNA速度推断在细胞周期动态中的精准解析 今天给各位老铁们分享一篇于2024年10月31号发表在 Nature Methods [IF: 36.1] 的文章&#xff1a;"Statistical inference with a manifold-constrained RNA velocity…

Spring挖掘:(AOP篇)

学习AOP时,我们首先来了解一下何为AOP 一. 概念 AOP&#xff08;面向切面编程&#xff0c;Aspect Oriented Programming&#xff09;是一种编程技术&#xff0c;旨在通过预编译方式或运行期动态代理实现程序功能的统一管理和增强。AOP的主要目标是在不改变原有业务逻辑代码的…

【机器学习】k最近邻分类

&#x1f4dd;本文介绍 本文为作者阅读鸢尾花书籍以及一些其他资料的k最近邻分类后&#xff0c;所作笔记 &#x1f44b;作者简介&#xff1a;一个正在积极探索的本科生 &#x1f4f1;联系方式&#xff1a;943641266(QQ) &#x1f6aa;Github地址&#xff1a;https://github.com…

《深度学习》bert自然语言处理框架

目录 一&#xff0c;关于bert框架 1、什么是bert 2、模型结构 自注意力机制&#xff1a; 3、预训练任务 4、双向性 5、微调&#xff08;Fine-tuning&#xff09; 6、表现与影响 二、Transformer 1、传统RNN网络计算时存在的问题 1&#xff09;串联 2&#xff09;并行…

开源 - Ideal库 - 常用时间转换扩展方法(一)

从事软件开发这么多年&#xff0c;平时也积累了一些方便自己快速开发的帮助类&#xff0c;一直在想着以什么方式分享出来&#xff0c;因此有了这个系列文章&#xff0c;后面我将以《开源-Ideal库》系列文章分享一些我认为比较成熟、比较方便、比较好的代码&#xff0c;如果感觉…

网络安全漏洞管理十大度量指标

前言 当前&#xff0c;网络安全漏洞所带来的风险及产生的后果&#xff0c;影响到网络空间乃至现实世界的方方面面&#xff0c;通信、金融、能源、电力、铁路、医院、水务、航空、制造业等行业各类勒索、数据泄露、供应链、钓鱼等网络安全攻击事件层出不穷。因此&#xff0c;加…

R语言*号标识显著性差异判断组间差异是否具有统计意义

前言 该R代码用于对Iris数据集进行多组比较分析&#xff0c;探讨不同鸢尾花品种在不同测量变量&#xff08;花萼和花瓣长度与宽度&#xff09;上的显著性差异。通过将数据转换为长格式&#xff0c;并利用ANOVA和Tukey检验&#xff0c;代码生成了不同品种间的显著性标记&#x…

Web前端PC端开发者工具详细介绍(约10000字保姆级讲解)

1.Elements部分 首先按下F12键即可进入开发者工具页面&#xff0c;以CSDN博客页面为例&#xff0c;如下可以看到右侧是该页面所对应的前端代码。 在Elements部分的Styles模块下可以看页面的各个类别的样式等。 &#xff08;1&#xff09;点击.cls可以开启动态修改元素的class&a…

SQL Server 日志记录

SQL Server是一个关系数据库管理系统&#xff08;RDBMS&#xff09;&#xff0c;旨在有效地存储、组织、检索和操作大量结构化数据。SQL Server日志是监控数据库活动、排查问题和确保数据一致性的基础&#xff0c;这些日志记录了SQL Server实例中发生的事件的时间顺序。它们充当…

Qt QCustomplot 在采集信号领域的应用

文章目录 一、常用的几种开源库:1、QCustomPlot:2、QChart:3、Qwt:QCustomplot 在采集信号领域的应用1、应用实例时域分析频谱分析2.数据筛选和处理其他参考自然界中的物理过程、传感器和传感器网络、电路和电子设备、通信系统等都是模拟信号的来源。通过可视化模拟信号,可以…

【数据结构】堆和二叉树(2)

文章目录 前言一、建堆和堆排序1.堆排序 二、二叉树链式结构的实现1.二叉树的遍历 三、链式二叉树的功能函数1.二叉树结点个数2.二叉树叶子结点个数3.二叉树的高度4.二叉树第k层结点个数5. 二叉树查找值为x的结点6.二叉树销毁 总结 前言 接着上一篇博客&#xff0c;我们继续分…

PDF多功能工具箱 PDF Shaper v14.6

如今对PDF处理的软件很多都是只是单一的功能。PDF Shaper给你完全不同的体验&#xff0c;因为PDF Shaper是一款免费的PDF工具集合的软件。有了PDF Shaper&#xff0c;你以后再也不用下载其他处理PDF的软件了。PDF Shaper的功能有&#xff1a;合并&#xff0c;分割&#xff0c;加…