MQ 架构设计原理与消息中间件详解(二)

### 一、RabbitMQ 如何保证消息不丢失?

消息不丢失可以分为三个方面进行保障:**生产者投递消息**、**消费者消费消息** 和 **MQ 服务器持久化**。

#### 1.1 生产者角色的消息确认机制

RabbitMQ 提供了两种方式来确保生产者投递的消息能够被成功接收:
1. **消息确认机制(Confirms)**:
   - 生产者在投递消息后,RabbitMQ 服务器会返回确认消息。生产者可以选择同步或异步接收确认消息,确保消息已被 MQ 服务器成功接收。
   
   例如,生产者投递消息并等待 RabbitMQ 服务器的确认:
 

   channel.confirmSelect();  // 开启确认机制channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());boolean result = channel.waitForConfirms();  // 等待确认if (result) {System.out.println("消息投递成功");}

2. **事务消息**:
   - 生产者可以通过事务来确保消息的投递可靠性。在事务模式下,RabbitMQ 保证消息要么全部成功投递,要么不投递。

   代码示例:
 

   channel.txSelect();  // 开启事务channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());channel.txCommit();  // 提交事务

#### 1.2 消费者角色的消息确认机制

在 RabbitMQ 中,消费者需要手动确认消息已经成功消费,只有当 RabbitMQ 接收到消费者的确认(ACK)后,消息才会从队列中移除。如果没有接收到确认,RabbitMQ 会重新投递该消息,确保不会丢失。

手动确认消息示例:
 

channel.basicConsume(QUEUE_NAME, false, defaultConsumer);  // 关闭自动应答
channel.basicAck(envelope.getDeliveryTag(), false);  // 手动确认消息已处理完毕

#### 1.3 RabbitMQ 服务器端的消息持久化

RabbitMQ 默认会将队列中的消息持久化到硬盘中,以确保系统宕机时消息不会丢失。生产者在创建队列时可以设置 `durable=true` 来确保消息持久化。

channel.queueDeclare("durable_queue", true, false, false, null);  // durable 设置为 true

持久化机制可以有效防止服务器端故障导致消息丢失。

---

### 二、如何通过代码实现消息可靠性?

下面我们通过代码示例展示如何实现生产者与消费者的消息确认机制和持久化设置。

#### 2.1 生产者代码示例

生产者通过开启 **Confirm机制** 来确保消息被成功投递到 RabbitMQ 服务器端。生产者在发送消息后等待服务器的确认,若消息投递失败,生产者可以重新发送该消息。```java

public class Producer {private static final String QUEUE_NAME = "reliable_queue";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {Connection connection = RabbitMQConnection.getConnection();Channel channel = connection.createChannel();// 开启消息确认机制channel.confirmSelect();// 发送消息String message = "Reliable Message!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes());// 等待服务器确认消息是否投递成功boolean result = channel.waitForConfirms();if (result) {System.out.println("消息投递成功");} else {System.out.println("消息投递失败");}channel.close();connection.close();}
}


```

#### 2.2 消费者代码示例

消费者使用手动消息确认机制,确保每条消息在成功处理后才从队列中移除,避免消息丢失。

public class Consumer {private static final String QUEUE_NAME = "reliable_queue";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitMQConnection.getConnection();Channel channel = connection.createChannel();// 设置手动确认模式channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("消费消息:" + message);// 手动确认消息channel.basicAck(envelope.getDeliveryTag(), false);}});}
}

---

### 三、RabbitMQ 五种消息模式

RabbitMQ 提供了多种消息模式,用户可以根据业务场景选择合适的消息分发方式。

#### 3.1 工作队列模式


- **工作队列模式** 主要用于多个消费者并行处理任务的场景。通过设置 `basicQos`,可以控制每次只发送一条消息给消费者,避免消息不均匀的分发。
 

channel.basicQos(1);

#### 3.2 交换机类型


- **Direct exchange(直连交换机)**:根据完全匹配的路由键将消息分发到指定队列。
- **Fanout exchange(扇型交换机)**:将消息广播到所有绑定到交换机的队列中。
- **Topic exchange(主题交换机)**:根据模式匹配的路由键进行消息分发。
- **Headers exchange(头交换机)**:通过消息头部的键值对来路由消息。

#### 3.3 发布订阅模式(Fanout Exchange)

在发布订阅模式中,生产者将消息发布到交换机中,交换机会将消息广播到所有绑定的队列上,多个消费者可以并行处理同一消息。生产者代码示例:

public class ProducerFanout {private static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitMQConnection.getConnection();Channel channel = connection.createChannel();// 声明扇型交换机channel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 发送消息String message = "Fanout Message!";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());channel.close();connection.close();}
}

### 四、Spring Boot 整合 RabbitMQ

在 Spring Boot 中,整合 RabbitMQ 变得非常简单,通过少量的配置和注解即可完成消息队列的集成与使用。下面介绍如何使用 Spring Boot 来整合 RabbitMQ,并提供完整的代码示例。

#### 4.1 Maven 依赖

首先,在 `pom.xml` 文件中添加必要的依赖:

<dependencies><!-- Spring Boot AMQP (RabbitMQ) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- JSON 解析工具 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.49</version></dependency><!-- Spring Boot Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Lombok (可选,简化代码) --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
</dependencies>

#### 4.2 配置 RabbitMQ

创建一个 `RabbitMQConfig` 配置类,定义交换机、队列及它们之间的绑定关系。

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// 定义扇型交换机名称private static final String EXCHANGE_NAME = "fanout_exchange";// 短信队列private static final String SMS_QUEUE = "fanout_sms_queue";// 邮件队列private static final String EMAIL_QUEUE = "fanout_email_queue";/*** 定义短信队列*/@Beanpublic Queue smsQueue() {return new Queue(SMS_QUEUE);}/*** 定义邮件队列*/@Beanpublic Queue emailQueue() {return new Queue(EMAIL_QUEUE);}/*** 定义扇型交换机*/@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange(EXCHANGE_NAME);}/*** 绑定短信队列到扇型交换机*/@Beanpublic Binding bindingSmsQueue(Queue smsQueue, FanoutExchange fanoutExchange) {return BindingBuilder.bind(smsQueue).to(fanoutExchange);}/*** 绑定邮件队列到扇型交换机*/@Beanpublic Binding bindingEmailQueue(Queue emailQueue, FanoutExchange fanoutExchange) {return BindingBuilder.bind(emailQueue).to(fanoutExchange);}
}

#### 4.3 配置文件 `application.yml`

接下来,需要在 `application.yml` 文件中配置 RabbitMQ 连接信息:

spring:rabbitmq:host: 127.0.0.1        # RabbitMQ 服务器地址port: 5672             # RabbitMQ 端口号username: guest        # RabbitMQ 用户名password: guest        # RabbitMQ 密码virtual-host: /        # RabbitMQ 虚拟主机

#### 4.4 生产者代码

生产者通过 `AmqpTemplate` 将消息发送到指定的交换机:

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class FanoutProducer {@Autowiredprivate AmqpTemplate amqpTemplate;/*** 发送消息到 RabbitMQ* @param msg 消息内容* @return 操作结果*/@RequestMapping("/sendMsg")public String sendMsg(String msg) {// 将消息发送到指定的交换机amqpTemplate.convertAndSend("fanout_exchange", "", msg);return "消息发送成功";}
}

#### 4.5 消费者代码

短信消费者和邮件消费者分别监听各自的队列,并处理接收到的消息。##### 短信消费者:
 

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
@RabbitListener(queues = "fanout_sms_queue")  // 监听短信队列
public class FanoutSmsConsumer {@RabbitHandlerpublic void process(String msg) {log.info(">> 短信消费者接收到消息: {} <<", msg);}
}

##### 邮件消费者:
 

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
@RabbitListener(queues = "fanout_email_queue")  // 监听邮件队列
public class FanoutEmailConsumer {@RabbitHandlerpublic void process(String msg) {log.info(">> 邮件消费者接收到消息: {} <<", msg);}
}

#### 4.6 测试消息发送

启动 Spring Boot 应用后,可以通过访问浏览器或 Postman 调用生产者的 `/sendMsg` 接口,发送消息到 RabbitMQ。

- 示例请求:

http://localhost:8080/sendMsg?msg=Hello RabbitMQ!

生产者会将消息广播到所有绑定到交换机的队列,短信消费者和邮件消费者将分别接收到并处理消息。

---

### 五、总结

通过本文,我们讲解了如何使用 Spring Boot 集成 RabbitMQ,详细展示了如何配置交换机、队列和生产者/消费者,并通过扇型交换机的发布/订阅模式实现了消息广播。RabbitMQ 与 Spring Boot 的集成非常简便且高效,能够帮助我们轻松实现消息队列的功能,确保消息的可靠传递。

希望这篇文章能帮助你更好地理解并掌握 RabbitMQ 的消息可靠性机制,并能成功将这些技术应用到你的项目中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1555549.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

Markdown实用语法汇总

说明&#xff1a; 本来只展示本人常用的、markdown特有优势的一些语法。表格输入markdown的弱项&#xff0c;不作介绍&#xff0c;借助软件创建即可。引用图片、音频、视频等&#xff0c;虽然很方便&#xff0c;但是内容集成度不高&#xff0c;需要上传发布的时候很不方便&…

学习C语言(23)

整理今天的学习内容 1.文件的概念 使用文件是为了将数据永久化地保存 按照文件功能&#xff0c;在程序设计中一般把文件分成两类&#xff1a; 每个文件都有一个唯一的文字标识&#xff0c;文字标识常被称为文件名&#xff0c;文件名包含文件路径&#xff0c;文件名主干和文件…

Apollo9.0 Planning2.0决策规划算法代码详细解析 (4): PlanningComponent::Proc()

&#x1f31f; 面向自动驾驶规划算法工程师的专属指南 &#x1f31f; 欢迎来到《Apollo9.0 Planning2.0决策规划算法代码详细解析》专栏&#xff01;本专栏专为自动驾驶规划算法工程师量身打造&#xff0c;旨在通过深入剖析Apollo9.0开源自动驾驶软件栈中的Planning2.0模块&am…

vAPI靶场

前言 自行去搭建vAPI靶场&#xff0c;配合postman使用 vapi1 创建用户 第一个用户 {"username": "shi","name": "shi1","course": "nihao","id": 10 } 第二个用户 {"username": "hui…

论文理解【LLM-CV】—— 【MAE】Masked Autoencoders Are Scalable Vision Learners

文章链接&#xff1a;Masked Autoencoders Are Scalable Vision Learners代码&#xff1a;GitHub - facebookresearch/mae发表&#xff1a;CVPR 2022领域&#xff1a;LLM CV一句话总结&#xff1a;本文提出的 MAE 是一种将 Transformer 模型用作 CV backbone 的方法&#xff0c…

制作一个流水灯,控制发光二极管由上至下再由下至上反复循环点亮显示,每次点亮一个发光二级管(Proteus 与Keil uVision联合仿真)

一、代码编写 &#xff08;1&#xff09;编写程序来控制发光二极管由上至下的反复循环流水点亮&#xff0c;每次点亮一个发光二极管。 #define uchar unsigned char // 定义uchar为unsigned char类型uchar tab[] {0xfe, 0xfd, 0xfb, 0xf7, 0xef, 0xdf, 0xbf, 0x7f, 0x7f, 0x…

一个不错的 SQL 编码风格的指南

前言 SQL语句的编写对于我们后端开发者而言是一个必备的技巧&#xff0c;在日常工作中&#xff0c;SQL语言编写的质量不仅仅会影响到团队的合作效率与项目的可维护性&#xff0c;还直接关系到数据库的性能优化与数据安全。今天大姚给大家分享一个不错的 SQL 编码风格的指南&am…

【Qt】控件概述(4)—— 输出类控件

输出类控件 1. QLineEdit——单行输入框2. QTextEdit——多行输入框3. QComboBox——下拉框4. QSpinBox——微调框5. QDateEdit && QTimeEdit && QDateTimeEdit6 QDial——旋钮7. QSlider——滑动条 1. QLineEdit——单行输入框 QLineEdit是一个单行的输入框&…

定时器实验(Proteus 与Keil uVision联合仿真)

一、 &#xff08;1&#xff09;设置TMOD寄存器 T0工作在方式1&#xff0c;应使TMOD寄存器的M1、M001&#xff1b;应设置C/T*0&#xff0c;为定时器模式&#xff1b;对T0的运行控制仅由TR0来控制&#xff0c;应使相应的GATE位为0。定时器T1不使用&#xff0c;各相关位均设为…

执行路径带空格的服务漏洞

原理 当系统管理员配置Windows服务时&#xff0c;必须指定要执行的命令&#xff0c;或者运行可执行文件的路径。 当Windows服务运行时&#xff0c;会发生以下两种情况之一。 1、如果给出了可执行文件&#xff0c;并且引用了完整路径&#xff0c;则系统会按字面解释它并执行 …

Listen1 0.8.2| 免费无广告,整合多平台音乐,界面简洁,操作便捷。

Listen 1 是一款开源且免费的跨平台音乐播放器&#xff0c;它能够整合多个主流音乐平台的资源&#xff0c;让你在一个应用中就能听到来自不同平台的歌曲。无论你是网易云音乐、QQ音乐还是虾米音乐的用户&#xff0c;你都可以通过 Listen 1 来享受无缝的音乐体验。它支持网易云音…

【每天学个新注解】Day 16 Lombok注解简解(十五)—@FieldNameConstants

FieldNameConstants 根据属性名生成常量类的常量。 1、如何使用 加在需要根据属性名生成常量的属性上。 2、代码示例 例&#xff1a; FieldNameConstants public class Test {private String iAmAField;private int andSoAmI;FieldNameConstants.Exclude private int asA…

【机器学习(十一)】糖尿病数据集分类预测案例分析—XGBoost分类算法—Sentosa_DSML社区版

文章目录 一、XGBoost算法二、Python代码和Sentosa_DSML社区版算法实现对比(一) 数据读入和统计分析(二)数据预处理(三)模型训练与评估(四)模型可视化 三、总结 一、XGBoost算法 关于集成学习中的XGBoost算法原理&#xff0c;已经进行了介绍与总结&#xff0c;相关内容可参考【…

openEuler 24.03 (LTS) 部署 K8s(v1.31.1) 高可用集群(Kubespray Ansible 方式)

写在前面 实验需要一个 CNI 为 flannel 的 K8s 集群之前有一个 calico 的版本有些旧了,所以国庆部署了一个v1.31.1 版本 3 * master 5 * work时间关系直接用的工具 kubespray博文内容为部署过程以及一些躺坑分享需要科学上网理解不足小伙伴帮忙指正 &#x1f603;,生活加油 99…

第二百六十九节 JPA教程 - JPA查询OrderBy两个属性示例

JPA教程 - JPA查询OrderBy两个属性示例 以下代码显示如何按两个属性排序&#xff0c;一个升序&#xff0c;另一个降序。 List l em.createQuery("SELECT e FROM Professor e " "JOIN e.department d ORDER BY d.name, e.name DESC").getResultList();例子…

数据结构实验二 顺序表的应用

数据结构实验二 顺序表的应用 一、实验目的 1、掌握建立顺序表的基本方法。 2、掌握顺序表的插入、删除算法的思想和实现&#xff0c;并能灵活运用 二、实验内容 用顺序表实现病历信息的管理与查询功能。具体要求如下: 1.利用教材中定义顺序表类型存储病人病历信息(病历号…

什么是高斯积分,以及如何求它的值(error function)

文章目录 什么是高斯积分高斯积分与误差函数的关系求值证明过程技巧1 两个相互独立的积分的乘积转为双重积分技巧2 富比尼定理技巧3 坐标系转换总结 什么是高斯积分 高斯积分的公式如下&#xff1a; 高斯积分与误差函数的关系 参考wiki&#xff0c;误差函数的定义如下&…

SQL自用小结

推荐一下这个知识点总结 《数据库系统概论》第五版 学习笔记总目录 1. SQL概述 SQL&#xff08;Structured Query Language&#xff0c;结构化查询语言&#xff09;是一种用于定义、查询、更新和控制关系数据库的标准化语言。 它包含了数据定义语言&#xff08;DDL&#xff0…

Unity MVC框架演示 1-1 理论分析

本文仅作学习笔记分享与交流&#xff0c;不做任何商业用途&#xff0c;该课程资源来源于唐老狮 1.一般的图解MVC 什么是MVC我就不说了&#xff0c;老生常谈&#xff0c;网上有大量的介绍&#xff0c;想看看这三层都起到什么职责&#xff1f;那就直接上图吧 2.我举一个栗子 我有…

“迷雾深渊”炮击图设计

python尝试C题目&#xff0c;ai查错审码还写“代码解读”和学习总结。 (笔记模板由python脚本于2024年09月29日 10:51:58创建&#xff0c;本篇笔记适合喜欢python&#xff0c;鼓捣算法的coder翻阅) 【学习的细节是欢悦的历程】 Python 官网&#xff1a;https://www.python.org/…