RabbitMQ08_保证消息可靠性

保证消息可靠性

        • 一、生产者可靠性
          • 1、生产者重连机制(防止网络波动)
          • 2、生产者确认机制
            • Publisher Return 确认机制
            • Publisher Confirm 确认机制
        • 二、MQ 可靠性
          • 1、数据持久化
            • 交换机、队列持久化
            • 消息持久化
          • 2、Lazy Queue 惰性队列
        • 三、消费者可靠性
          • 1、消费者确认机制
          • 2、失败重试机制
          • 3、业务幂等性

一、生产者可靠性
1、生产者重连机制(防止网络波动)
spring:rabbitmq:connection-timeout: 1s #设置MQ的连接超时时间template:retry:enabled: true #开启超时重试机制(默认是false)initial-interval: 1000ms #失败后的初始等待时间multiplier: 1 #失败后下次的等待时长倍数,下次等待时长= initial-interval * multipliermax-attempts: 3 #最大重试次数
2、生产者确认机制
Publisher Return 确认机制

消息投递到MQ但是MQ路由失败,MQ返回路由失败原因

spring:rabbitmq:publisher-returns: true # 开启publisher return机制
@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("触发return callback,");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});}
}
Publisher Confirm 确认机制

临时消息投递到了MQ且入队成功,返回ACK
持久消息投递到了MQ且入队完成持久化,返回ACK
消息投递异常,返回NACK

spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型

publisher-confirm-type 的三种类型

  • none 关闭confirm机制
  • simple 同步阻塞等待MQ回执消息
  • correlated MQ异步回调返回回执消息
@Test
void testPublisherConfirm() throws InterruptedException {CorrelationData cd = new CorrelationData();cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {        @Overridepublic void onFailure(Throwable ex) {// Future发生异常时的处理逻辑,基本不会触发log.error("handle message ack fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// Future接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){ log.debug("发送消息成功,收到 ack!");} else {log.error("发送消息失败,收到 nack, reason : {}", result.getReason());//TODO 重试发送}}});// 3.发送消息rabbitTemplate.convertAndSend("hmall.direct", "red1", "hello", cd);
}
二、MQ 可靠性
1、数据持久化
交换机、队列持久化

默认创建时就是持久化的(Durability = Durable)

在这里插入图片描述

消息持久化

RabbitTemplate 的 convertAndSend() 方法发送的消息默认就是持久化的(delivery mode = 2)

如果非要发送一个非持久化的消息,需要在调用 rabbitTemplate.convertAndSend() 方法时,显式地设置消息的 MessageProperties,并将 deliveryMode 设置为 1 (非持久化)

2、Lazy Queue 惰性队列

Lazy Queue是一种以惰性模式运行的队列,它尽可能地将消息存储在磁盘上,而不是内存中。只有当消费者需要消费消息时,这些消息才会被加载到内存中,效率比传统队列高。

3.12版本后,所有队列都是Lazy Queue模式,无法更改。

三、消费者可靠性
1、消费者确认机制

消费者回执消息类型

  • ack 消费者处理成功,RabbitMQ 将从队列中删除消息
  • nack 消费者处理失败,RabbitMQ 需再次投递消息
  • reject 消费者拒绝处理,RabbitMQ 将从队列中删除消息

SpringAMQP 消息监听器的三种确认模式

  • none 不处理。即消费者收到消息后立刻返回ack,消息会丢失,非常不安全。
  • manual 手动模式。业务代码手动调用api发送 ack 或 reject,存在业务入侵,但更灵活。
  • auto 自动模式(默认)。通过 AOP 对消息处理方法做环绕增强,正常返回ack,出现业务异常返回nack,出现消息处理或校验异常返回reject
spring:rabbitmq:listener:simple:acknowledge-mode: auto
2、失败重试机制

消费者处理消息出现异常时利用本地重试,而不是无限的requeue到mq,让mq重新投递给消费者

spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试(默认是关闭的)initial-interval: 1000ms # 初始的失败等待时长为1秒multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现。

失败消息处理策略

  • RejectAndDontRequeueRecoverer:默认实现,重试耗尽后直接reject,丢弃消息。
  • ImmediateRequeueMessageRecoverer:重试耗尽后返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后将失败消息投递到指定的交换机

以第三种失败消息处理策略为例,配置方式如下:

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
3、业务幂等性

由于存在各种确认和重试机制,消费者有重复消费消息的可能性,因此要保证业务的幂等性。
保证业务幂等性的方式如下:

  • 方案一:发送消息时生成唯一消息ID,投递给消费者,消费者接收到消息,业务处理成功后将消息ID保存到数据库,下次根据消息ID去数据库查询判断是否已处理,如果已处理则放弃处理。
@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jjmc.setCreateMessageIds(true);return jjmc;
}
  • 方案二:结合业务逻辑,基于业务本身做判断。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/145625.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

CLI示例(V2R8至V2R19C00版本):直连二层组网直接转发【AP+上层网络,增加AP下行口有线接入】

CLI示例(V2R8至V2R19C00版本):直连二层组网直接转发【AP+上层网络,增加AP下行口有线接入】 适用于:V200R008至V200R019C00版本的AC,以及有空闲以太网口的AP。 说明:本示例基于“直连二层组网直接转发【AP+AC+出口网关】”场景来介绍如何增加AP下行口有线接入。 业务需求…

SPI 详解

介绍 串行外设接口是微控制器用来与外设&#xff08;如 SRAM、SD 卡、移位寄存器、传感器等&#xff09;通信的最常见通信协议之一。它是一种同步、全双工、基于主从的协议。它支持高速数据传输&#xff0c;并且 SPI 协议中的数据速度 (bps) 和时钟频率 (Hz) 之间存在直接关系…

[Redis][Hash]详细讲解

目录 0.前言1.常见命令1.HSET2.HGET3.HEXISTS4.HDEL5.HKEYS6.HVALS7.HGETALL8.HMGET9.HLEN10.HSETNX11.HINCRBY12.HINCRBYFLOAT 2.内部编码1.ziplist(压缩链表)2.hashtable(哈希表) 3.使用场景4.缓存方式对比1.原⽣字符串类型2.序列化字符串类型3.哈希类型 0.前言 在Redis中&am…

帧率和丢帧分析理论

一、丢帧问题概述 应用丢帧通常指的是在应用程序的界面绘制过程中&#xff0c;由于某些原因导致界面绘制的帧率下降&#xff0c;从而造成界面卡顿、动画不流畅等问题。以60Hz刷新率为例子&#xff0c;想要达到每秒60帧&#xff08;即60fps&#xff09;的流畅体验&#xff0c;每…

鹏哥C语言复习——函数栈帧的创建和销毁

目录 演示用代码&#xff1a; 提示&#xff1a;后文讲解时后缀为h的指的是16进制表示 疑惑1&#xff1a;自定义函数、库函数都是在main函数内部调用&#xff0c;那么是什么调用了main函数呢&#xff1f; 疑惑2&#xff1a;如何观察ebp、esp等寄存器的运行&#xff1f; 疑惑…

提升效率的AI工具集 - 轻松实现自动化

在这个快节奏、高效率的社会中&#xff0c;我们每个人都渴望能够找到提升工作效率的捷径。幸运的是&#xff0c;随着人工智能&#xff08;AI&#xff09;技术的迅猛发展&#xff0c;越来越多的AI工具涌现出来&#xff0c;为我们提供了强大的支持。这些工具不仅能够帮助我们提高…

算法-分治和逆序

分治法&#xff08;Divide and Conquer&#xff09;是一种重要的算法设计范式&#xff0c;它通过将复杂的问题分解成更小、更易于管理和解决的子问题&#xff0c;然后递归地解决这些子问题&#xff0c;最后将子问题的解合并以得到原问题的解。分治法通常用于排序、搜索、数学计…

基于STM32F103C8T6单片机的DDS信号源设计

本设计能够输出三角波信号、方波信号和正弦波信号,主要由STM32F103C8T6最小核心板、电源供电电路模块、AD9833电路模块、矩阵按键电路模块、LCD1602液晶显示模块等组成。在设计中&#xff0c;使用STM32F103C8T6作为控制芯片&#xff0c;结合LCD1602液晶显示器&#xff0c;矩阵键…

稳了,搭建Docker国内源图文教程

大家好&#xff0c;之前分享了我的开源作品 Cloudflare Workers Proxy&#xff0c;它的作用是代理被屏蔽的地址&#xff0c;理论上支持代理任何被屏蔽的域名&#xff0c;使用方式也很简单&#xff0c;只需要设置环境变量 PROXY_HOSTNAME 为被屏蔽的域名&#xff0c;最后通过你的…

Unity多语言插件I2 Localization国际化应用

【就不收费了&#xff0c;要个关注不过分吧】 【图片来自插件官网&#xff0c;侵删】 前言 目前游戏往往都不会仅局限于国内语言&#xff0c;为了适应产品都要做国际化适配&#xff0c;因此会用到这个插件&#xff0c;这个插件要付费&#xff0c;因此请前往unity官网进行下载…

秒懂Linux之消息队列与信号量(了解)

目录 前言 消息队列原理 信号量理论 信号量原理 IPC资源 前言 消息队列与信息量目前已经不常用了&#xff0c;大家也可以参考共享内存去了解基本原理即可。 消息队列原理 消息队列提供了一个从一个进程向另外一个进程发送一块数据的方法 每个数据块都被认为是有一个类型&…

mfc140u.dll引发的软件故障怎么破?mfc140u.dll文件损坏的解决办法全知道!

当这个重要的 DLL 文件丢失或损坏时&#xff0c;用户可能会收到一个错误消息&#xff0c;提示 “程序无法启动&#xff0c;因为计算机中缺失 mfc140u.dll” 或类似的提示。这种情况不仅令人困扰&#xff0c;而且可以干扰正常的工作流程&#xff0c;尤其是当您依赖特定软件完成日…

KMP算法的实现

这是C算法基础-数据结构专栏的第二十六篇文章&#xff0c;专栏详情请见此处。 引入 KMP算法是一种可以快速查找某一字符串在一个文本中的所有出现的算法。 下面我们就来讲KMP算法的实现。 定义 Knuth–Morris–Pratt 算法&#xff0c;简称KMP算法&#xff0c;是由Knuth、Pratt…

基于VUE的教师教学质量网络评测评价统计分析系统

1、 选题的背景与意义 21世纪是信息化的世纪&#xff0c;我们的一些生活习惯因为计算机而发生改变&#xff0c;我们也逐渐习惯于通过计算机的各项功能来获得便利。这其中所带来的挑战和机遇为各行业的发展指明了一个方向。教学质量评测是一项琐碎而又十分细致的工作&#xf…

【永磁同步电机(PMSM)】 3. 基于Matlab 的仿真与控制

【永磁同步电机&#xff08;PMSM&#xff09;】 3. 基于Matlab 的仿真与控制 1. 电机的仿真与控制2. BLDC 电机与 PMSM 电机3. BLDC 的方波控制4. 磁场定向控制&#xff08;FOC&#xff09;5. 空间矢量调制 (SVM)6. PMSM 模型的频率响应估计 电机仿真和控制是能源生产、汽车、航…

Java对象一口气讲完!φ(* ̄0 ̄)

Java Object类 Java面向对象设计 - Java Object类 Java在java.lang包中有一个Object类。 所有Java类都直接或间接扩展Object类。 所有Java类都是Object类的子类Object类是所有类的超类。 Object类本身没有超类。 Object类的引用变量可以保存任何类的对象的引用。 以下代…

OSPFv3协议几类LSA介绍

OSPFv3协议介绍 与OSPFv2相比&#xff0c;OSPFv3在工作机制上与OSPFv2基本相同&#xff1b;但为了支持IPv6地址格式&#xff0c;OSPFv3对OSPFv2做了一些改动。OSPFv3基于OSPFv2基本原理增强&#xff0c;是一个独立的路由协议&#xff08;v3不兼容v2&#xff09;协议号仍然是89…

竹云赋能“中国·贵州”全省统一移动应用平台建设,打造政务服务“新引擎”

近日&#xff0c;2024中国国际大数据产业博览会在贵州贵阳圆满落幕。会上&#xff0c;由贵州省政府办公厅牵头建设的“中国贵州”全省统一移动应用平台正式发布&#xff0c;聚焦民生办事、政务公开、政民互动、扁平高效、数据赋能五大模块&#xff0c;旨在打造公平普惠的服务平…

Hbase操作手册

一&#xff1a;Hbase 创建数据库表 1.进入hbase shell 2.创建数据库表的命令&#xff1a;create 表名, 列族名1,列族名2,列族名N 3.如果想查看所有数据库表&#xff0c;可以使用list 命令&#xff1a; 4.可以看到&#xff0c;刚创建的数据库表user 已经在数据库表的列表中&…

单元格左边放文字右边放按钮

1 . 代码 /* 添加到你的CSS文件中 */ .switch-td { display: flex; justify-content: space-between; /* 两端对齐&#xff0c;这样文本和开关会分别靠左和靠右 */ align-items: left; /* 垂直居中 */ } /* 如果你不想改变其他<td>的默认左对齐&#xff0c;…