消息队列高级

目录

 消息可靠性

生产者消息确认 

 第一步:修改application.yml配置文件信息

 第二步:定义发送者确认confirm回调方法

 第三步:创建消息发送者回执return回调方法(确保消息从交换机到消息队列)

总结:

消息持久化 

消费者消息确认 

SpringAMQP则允许配置三种确认模式:

auto问题

 消费者是失败重试

本地重试

失败策略

总结


消息队列在使用时,有以下的问题需要考虑

  • 消息可靠性问题(一个消息至少被消费一次)
  • 延迟消息问题
  • 高可用问题
  • 消息堆积问题

 消息可靠性

消息从发送,到消费者接收,会经理多个过程:

 

其中的每一步都可能导致消息丢失,常见的丢失原因包括:

  • 发送时丢失:
    • 生产者发送的消息未送达exchange
    • 消息到达exchange后未到达queue
  • MQ宕机,queue将消息丢失
  • consumer接收到消息后未消费就宕机

针对这些问题,RabbitMQ分别给出了解决方案:

  • 生产者确认机制
  • mq持久化
  • 消费者确认机制
  • 失败重试机制

生产者消息确认 

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。 

这个可以保证消息发送者成功发送消息到交换机以及消息队列中 

有两种返回结果:

第一种:确认消息是否发送到交换机--》publisher-confirm

  • publisher-confirm,发送者确认
    • 消息成功投递到交换机,返回ack
    • 消息未投递到交换机,返回nack

第二种:确认消息是否成功从交换机路由到对应的消息队列中--》publisher-return

  • publisher-return,发送者回执
    • 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。

 

 第一步:修改application.yml配置文件信息

spring:rabbitmq:host: 192.168.230.100 # rabbitMQ的ip地址port: 5672 # 端口username: hhhpassword: 1234virtual-host: /publisher-confirm-type: correlatedpublisher-returns: truetemplate:mandatory: true

说明:

  • publish-confirm-type:开启publisher-confirm,这里支持两种类型:
    • simple:同步等待confirm结果,直到超时
    • correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
  • publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
  • template.mandatory定义消息路由失败时的策略。true,则调用ReturnCallback回调方法;false:则直接丢弃消息

 第二步:定义发送者确认confirm回调方法

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {//获取rabbitTemplateRabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);//设置发送者确认回调方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/**** @param correlationData 自定义的消息* @param ack ack确认,true为发送到交换机成功* @param s 失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String s) {if(ack){log.info("消息到达交换机");}else {log.error("消息没有到交换机,原因为:{}",s);}}});}
}

发送消息:

    @Testpublic void test01() throws InterruptedException {String routingKey="simple";String message="hello spring";rabbitTemplate.convertAndSend("mqAd.topic",routingKey,message);Thread.sleep(2000); //让主线程休眠2s,返回回调方法还没执行,主线程就关闭}

发送失败触发消息确认的回调方法,因为我现在没有创建maAd.topic交换机,所以无法发送消息到交换机中

创建交换机 

 @Beanpublic TopicExchange topicExchange(){return new TopicExchange("mqAd.topic");}

 

 第三步:创建消息发送者回执return回调方法(确保消息从交换机到消息队列)

 //设置消息发送者回执回调方法,交换机路由消息队列错误才会回调rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
//            *
//             *
//             * @param message 返回的信息
//             * @param i 回复的状态码
//             * @param s 回复内容
//             * @param s1 交换机
//             * @param s2 路由@Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {log.error("路由定位失败,{},{},{},{},{}",message,i,s,s1,s2);}});

这时候没有对应的消息队列,会触发消息回执return 回调方法

路由定位失败,(Body:'hello spring' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),312,NO_ROUTE,mqAd.topic,simple

创建消息队列并绑定 

    @Beanpublic TopicExchange topicExchange(){return new TopicExchange("mqAd.topic");}@Beanpublic Queue simpleQueue(){return new Queue("mqAd.simple.queue");}@Beanpublic Binding binding(){return BindingBuilder.bind(simpleQueue()).to(topicExchange()).with("simple");}

总结:

1.消息确认confirm回调方法触发:不论消息是否成功到达交换机都会触发,成功返回的是ack,失败返回的是nack

2.消息回执return回调方法触发:只有交换机路由消息队列失败时才会触发

消息持久化 

防止mq宕机,然后消息队列里面的消息全部丢失

生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果突然宕机,也可能导致消息丢失。

要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制。

  • 交换机持久化
  • 队列持久化
  • 消息持久化

 实际上默认交换机和队列,消息都是持久化的

    @Beanpublic TopicExchange topicExchange(){return new TopicExchange("mqAd.topic",true,false);}@Beanpublic Queue simpleQueue(){return new Queue("mqAd.simple.queue",true);

持久化标识: 

消费者消息确认 

RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。

而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。

设想这样的场景:

  • 1)RabbitMQ投递消息给消费者
  • 2)消费者获取消息后,返回ACK给RabbitMQ
  • 3)RabbitMQ删除消息
  • 4)消费者宕机,消息尚未处理

SpringAMQP则允许配置三种确认模式:

•manual:手动ack,需要在业务代码结束后,调用api发送ack。

•auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack

•none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

由此可知:

  • none模式下,消息投递是不可靠的,可能丢失
  • auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
  • manual:自己根据业务情况,判断什么时候该ack

一般,我们都是使用默认的auto即可。

logging:pattern:dateformat: HH:mm:ss:SSSlevel:cn.itcast: debug
spring:rabbitmq:host: 192.168.230.100 # rabbitMQ的ip地址port: 5672 # 端口#addresses: 192.168.150.101:8071, 192.168.150.101:8072, 192.168.150.101:8073username: hhhpassword: 1234virtual-host: /listener:simple:prefetch: 1acknowledge-mode: auto

auto问题

 auto会有一种问题,就会监听方法出现异常,消息队列的消息就无法正常处理,那么spring会自动发消息到mq说明这个消息没有被正常消费,不要删除,然后mq又会把消息重新返回队列,而监听方法又会监听到这个消息队列的信息,然后监听方法又出现异常,消息又回到消息队列,这就在项目中进行无限次的循环重试

 消费者是失败重试

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力: 

本地重试

我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

修改consumer服务的application.yml文件,添加内容:

spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000 # 初识的失败等待时长为1秒multiplier: 2 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 4 # 执行次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

失败-->1s后重试1次-->2秒后重试第二次-->4秒后重试第三次

每次失败的等待时长是之前的2倍数 ,执行次数包括第一次失败

注意:重试次数结束之后还没有成功,消息队列的消息就会被删除 

失败策略

在之前的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

声明处理错误消息的交换机和消息队列,并且声明一个RepublishMessageRecoverer bean,告诉spring使用的是这种失败策略

@Configuration
public class ErrorMessageConfig {@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue");}@Beanpublic Binding errorMessageBinding(){return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");}//设置消息失败处理策略@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

重复处理三次之后还是错误,就把这个消息发送到 消息错误交换机中

 

 

总结

如何确保RabbitMQ消息的可靠性?

  • 开启生产者确认,回执机制,确保生产者的消息能到达队列
  • 开启持久化功能,确保消息未消费前在队列中不会丢失
  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/11964.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

乐鑫USB方案助力设备互联和数据传输,启明云端乐鑫一级代理商

USB USB 是一种通用的总线标准,用于连接主机和外部设备。 乐鑫 USB 方案为用户提供了方便快捷的设备互联和数据传输方式。乐鑫 SoC 通过将 USB 作为标配外设之一,提供 USB 2.0 OTG 或 USB-Serial-JTAG 接口,支持主机 (Host) 和设备 (Device…

linux详解,基本网络枚举

基本网络枚举 一、基本网络工具 ifconfig ifconfig是一个用于配置和显示网络接口信息的命令行工具。它可以显示网络接口的P地址、子网掩码、MC地址等信息,还可以用于启动、停止或配置网络接口。 ip ip也是用于查看和管理网络接口的命令。 它提供了比ifconfig更…

✬宁波TISAX:✬信息安全管理、✬风险评估与✬数据保护✬的集成宝典✬

😀宁波TISAX:🕵️‍♀️信息安全管理、👩‍💻风险评估与🤷🏻‍♂️数据保护的集成宝典👨🏻‍🎓 🐲在当今数字化时代,💻信息…

【软考】系统架构设计师-计算机系统基础(1):计算机硬件

知识点汇总 1、指令集 精简指令集RISC:寄存器,硬布线,效率高;复杂指令集CISC:微程序控制技术,效率低; 2、奇偶校验码:码距是2(出错位校验位),只…

关于分治法左右区间单调遍历应该如何设计

阅读以下文章,首先至少要求通过一道分治法的题目或听过一道该类型的讲解。 对于分治的题目,想必你应该知道,通常我们是对于一个区间拆分两个部分,而最小子问题通常是只包含一个元素的区间数组。为了后续方便处理更大范围的区间&am…

Mybatis的分页插件的使用方式

插件介绍: 使用mabatis中一个名为PageHelper的插件,会把我们后面的一条SQL进行一个动态的拼接,通过拦截器对sql动态的添加limit,从而实现分页的效果 使用方式: 1.先导入相关的依赖 2.在项目中的Mapper层中对应的Mapper.xml中写动态SQL 3.在项目中的Serviceimpl层通过PageHel…

计算机信息处理技术

信息技术基础知识 数据和信息 数据 “数据是对事实、概念或指令的一种特殊表达形式,这种特殊表达形式可以用人工的方式或者用自动化的装置进行通信,翻译转换或者进行加工处理。”根据这个定义,数字、文字、图形、图像、声音等都是数据。数…

基于Python的膳食健康系统

作者:计算机学姐 开发技术:SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等,“文末源码”。 专栏推荐:前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码、微信小程序源码 精品专栏:…

Kafka面试题(三)

1、kafka是如何做到高效读写 1)Kafka 本身是分布式集群,可以采用分区技术,并行度高。 2)读数据采用稀疏索引,可以快速定位要消费的数据。(mysql中索引多了之后,写入速度就慢了)。 …

【Pikachu】任意文件上传实战

将过去和羁绊全部丢弃,不要吝惜那为了梦想流下的泪水。 1.不安全的文件上传漏洞概述 不安全的文件上传漏洞概述 文件上传功能在web应用系统很常见,比如很多网站注册的时候需要上传头像、上传附件等等。当用户点击上传按钮后,后台会对上传的…

C++【STL容器系列(二)】vector的模拟实现

文章目录 1. vector的结构2. vector的默认成员函数2.1构造函数2.1.1 默认构造2.1.2 迭代器构造2.1.3 用n个val初始化构造 2.2 拷贝构造2.3 析构函数2.4 operator 3. vector iterator函数3.1 begin 和 cbegin函数3.2 end() 和 cend()函数 4. vector的小函数4.1 size函数4.2 capa…

边缘检测的100种方法

文章目录 什么是边缘检测 ?一、边缘检测算子:Sobel算子、Scharr算子、Laplacian算子、Canny算子二、梯度计算 顶帽 黑帽 拉普拉斯金字塔三、相位一致性(Phase Congruency,PC)3.1、底层代码(2D)3.2、ski…

【Linux探索学习】第十二弹——初识进程:进程的定义、描述和一些简单的相关操作

Linux学习笔记: https://blog.csdn.net/2301_80220607/category_12805278.html?spm1001.2014.3001.5482 前言: 在前面经过那么多篇的铺垫后,今天我们正式进入Linux学习的第一个重难点——进程,理解进程对于我们学习操作系统的其…

Java项目实战II基于微信小程序的订餐系统(开发文档+数据库+源码)

目录 一、前言 二、技术介绍 三、系统实现 四、文档参考 五、核心代码 六、源码获取 全栈码农以及毕业设计实战开发,CSDN平台Java领域新星创作者,专注于大学生项目实战开发、讲解和毕业答疑辅导 一、前言 随着移动互联网技术的飞速发展&#xff0…

触想染织厂MES产线终端工位机,打造数字化高效车间

一、行业发展背景 在纺织细分领域中,印染行业一直是整个产业链的效率短板,因其涉及染色、定型及后整理加工等多个复杂工艺、上百个参数变量,质量波动较大,依赖个人经验和手工操作,常常陷入高成本、低效率发展困境。 △…

CSS查缺补漏 two

11.6~11.11查缺补漏 一、熟记1.结构伪类选择器2.伪元素选择器3.盒子模型4.居中对齐(重中之重!!!)5.清除默认样式6.元素溢出(滚动条)7.行内元素 – 内外边距问题8.圆角9 .盒子阴影(拓…

Taro React-Native IOS 打包发布

http网络请求不到 配置 fix react-native facebook::flipper::SocketCertificateProvider‘ (aka ‘int‘) is not a function or func_rn运行debug提示flipper-CSDN博客 Xcode 15(iOS17)编译适配报错_no template named function in namespace std-CS…

本地搭建你的私有网盘:在Ubuntu上使用Portainer CE安装NextCloud

文章目录 前言1. 在PortainerCE中创建NextCloud容器2. 公网远程访问本地NextCloud容器2.1 内网穿透工具安装3.2 创建远程连接公网地址 3. 固定NextCloud私有云盘公网地址 前言 本篇文章介绍如何在本地使用Portainer CE可视化图形界面创建NextCloud私有网盘容器,并结…

超好用shell脚本NuShell mac安装

利用管道控制任意系统 Nu 可以在 Linux、macOS 和 Windows 上运行。一次学习,处处可用。 一切皆数据 Nu 管道使用结构化数据,你可以用同样的方式安全地选择,过滤和排序。停止解析字符串,开始解决问题。 强大的插件系统 具备强…

游戏引擎中LOD渲染技术

一.LOD(Level Of Detail) 为了降低GPU渲染压力,根据摄像机距离模型距离将面数较高的模型替换为面数较低的模型. LOD LOD0(distance<10) LOD1(distance<20) LOD2(distance<30) 故通常引擎中MetaMesh是由一个或多个LOD模型构成. MetaMesh mesh mesh.lod1 mesh.lod…