消息队列-RabbitMQ(二)

接上文《消息队列-RabbitMQ(一)》

在这里插入图片描述

@Configuration
public class RabbitMqConfig {// 消息的消费方json数据的反序列化@Beanpublic RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());return factory;}// 定义使用json的方式转换数据@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate amqpTemplate = new RabbitTemplate();amqpTemplate.setConnectionFactory(connectionFactory);amqpTemplate.setMessageConverter(new Jackson2JsonMessageConverter());return amqpTemplate;}}
@Service
public class TestConsumer {@RabbitListener(queuesToDeclare = {@Queue("simpleQueue")})public void simpleModel(User user) {System.out.println("接收消息message=" + user);}// value=@Queue 创建临时队列// exchange创建交换机@RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(value = "fanout-ex", type = ExchangeTypes.FANOUT))})public void receiveMessage1(User user) {System.out.println(String.format("消费者 【one】: %s", user));}@RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(value = "fanout-ex", type = ExchangeTypes.FANOUT))})public void receiveMessage2(User user) {System.out.println(String.format("消费者 【two】: %s", user));}}
@RestController
@RequestMapping("/rabbitMqReq")
public class RabbitMqController {@Autowiredprivate TestProducer testProducer;@PostMapping("/simple")public void simple(@RequestBody User user) {testProducer.simpleMessageSend();}@PostMapping("/fanoutMessageSend")public void fanoutMessageSend(@RequestBody User user) {testProducer.fanoutMessageSend();}
}
@Data
public class User {private String userId; //用户编号private String userName; //用户姓名
}
@Service
public class TestProducer {@Resourceprivate RabbitTemplate rabbitTemplate;public void simpleMessageSend() {System.out.println("simpleMessageSend");User user = new User();user.setUserId("1001");user.setUserName("张三");rabbitTemplate.convertAndSend("simpleQueue", user);}// fanout模型public void fanoutMessageSend() {for (int i = 0; i < 5; i++) {User user = new User();user.setUserId("1001");user.setUserName("张三");rabbitTemplate.convertAndSend("fanout-ex", "", user);}try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}}}
@SpringBootApplication(exclude = { DataSourceAutoConfiguration.class })
public class App
{public static void main(String[] args){SpringApplication.run(App.class, args);System.out.println("(♥◠‿◠)ノ゙  启动成功   ლ(´ڡ`ლ)゙ ");}
}
# 开发环境配置
server:# 服务器的HTTP端口,默认为8080port: 8899servlet:# 应用的访问路径context-path: /tomcat:# tomcat的URI编码uri-encoding: UTF-8# tomcat最大线程数,默认为200max-threads: 800# Tomcat启动初始化的线程数,默认值25min-spare-threads: 30
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>RabbitMQDemo</artifactId><version>1.0-SNAPSHOT</version><name>RabbitMQDemo</name><!-- FIXME change it to the project's website --><url>http://www.example.com</url><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.1.RELEASE</version><relativePath /></parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version><fastjson.version>1.2.70</fastjson.version></properties><dependencies><!-- SpringBoot 核心包 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- AMQP客户端 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- 阿里JSON解析器 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.10</version></dependency></dependencies><repositories><!--阿里云镜像仓库--><repository><id>public</id><name>aliyun nexus</name><url>http://maven.aliyun.com/nexus/content/groups/public/</url><releases><enabled>true</enabled></releases></repository><!--oracle驱动没有发布到中央仓库,只能从此仓库下载--><repository><id>jeecg</id><name>jeecg Repository</name><url>http://maven.jeewx.com/nexus/content/repositories/jeecg</url><snapshots><enabled>false</enabled></snapshots></repository></repositories></project>

什么是事务?
数据库事务。原子性、一致性、隔离性、持久性。

本地事务?
整个服务操作只能涉及一个单一的数据库资源。

分布式事务?
分布式事务指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。分布式事务就是为了保证不同数据库的数据一致性。

分布式事务应用架构?
微服务架构的分布式应用环境下,越来越多的应用要求对多个数据库资源,多个服务的访问都能纳入到同一个事务当中,分布式事务应运而生。

分布式事务有几种解决方案: 两阶段提交/XA MQ
第一阶段(prepare):即所有的参与者RM准备执行事务并锁住需要的资源。参与者ready时,向TM报告已准备就绪。
第二阶段 (commit/rollback):当事务管理者™确认所有参与者(RM)都ready后,向所有参与者发送commit命令。

https://blog.csdn.net/qq_43410878/article/details/123656765 RabbitMQ使用详解

RabbitMQ与springboot整合
1、添加依赖 spring-boot-starter-amqp
2、配置RabbitConfig,包含RabbitListenerContainerFactory、RabbitTemplate

简单模型
@SpringBootTest
class SpringbootRabbitmqApplicationTests {
@Resource
private RabbitTemplate rabbitTemplate;

@Test
public void simpleMessageSend() {rabbitTemplate.convertAndSend("simpleQueue", new User(1, "张"));
}

}

@RabbitListener(queuesToDeclare = {@Queue(“simpleQueue”)})
public void simpleModel(User user) {
log.info(“message: {}”, user);
}

发布订阅模型
// fanout模型
@Test
public void fanoutMessageSend() {
for (int i = 0; i < 5; i++) {
rabbitTemplate.convertAndSend(“fanout-ex”, “”, new User(i, “张三”));
}
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

// value=@Queue 创建临时队列
// exchange创建交换机
@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
exchange = @Exchange(value = “fanout-ex”, type = ExchangeTypes.FANOUT))
})
public void receiveMessage1(User user) {
System.out.println(String.format(“消费者 【one】: %s”, user));
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
exchange = @Exchange(value = “fanout-ex”, type = ExchangeTypes.FANOUT))
})
public void receiveMessage2(User user) {
System.out.println(String.format(“消费者 【two】: %s”, user));
}

直连模式(direct)
@Test
public void directMessageSend() {
//rabbitTemplate.convertAndSend(“direct-ex”, “success”, new User(2, “张三”));
rabbitTemplate.convertAndSend(“direct-ex”, “error”, new User(2, “张三”));
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
key = {“error”, “success”},
exchange = @Exchange(value = “direct-ex”, type = ExchangeTypes.DIRECT))
})
public void receiveMessage1(User user) {
System.out.println(String.format(“消费者 【one】: %s”, user));
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
key = {“error”},
exchange = @Exchange(value = “direct-ex”, type = ExchangeTypes.DIRECT))
})
public void receiveMessage2(User user) {
System.out.println(String.format(“消费者 【two】: %s”, user));
}

topic模型
@Test
public void topicMessageSend() {
//rabbitTemplate.convertAndSend(“topic-ex”, “company”, new User(2, “张三”));
rabbitTemplate.convertAndSend(“topic-ex”, “company.java”, new User(2, “张三”));
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
key = {“company.#”},
exchange = @Exchange(value = “topic-ex”, type = ExchangeTypes.TOPIC))
})
public void receiveMessage1(User user) {
System.out.println(String.format(“消费者 【one】: %s”, user));
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
key = {“company.java.#”},
exchange = @Exchange(value = “topic-ex”, type = ExchangeTypes.TOPIC))
})
public void receiveMessage2(User user) {
System.out.println(String.format(“消费者 【two】: %s”, user));
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
key = {“company.html.*”},
exchange = @Exchange(value = “topic-ex”, type = ExchangeTypes.TOPIC))
})
public void receiveMessage3(User user) {
System.out.println(String.format(“消费者 【three】: %s”, user));
}

消息的手动确认
spring:
rabbitmq:
listener:
simple:
# 提交方式为手动
acknowledge-mode: MANUAL

factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

面试:a. 如何保证消息不丢失?

​ b. 如何保证消息的不重复消费?

​ c. 如何使用mq来是实现分布式事务?

​ d. 在工作中mq用在哪里?支付回调。

RabbitMQ如何做到消息不丢失?

1.持久化

发送消息时设置delivery_mode属性为2,使消息被持久化保存到磁盘,即使RabbitMQ服务器宕机也能保证消息不丢失。同时,创建队列时设置durable属性为True,以确保队列也被持久化保存。

2.确认机制

消费者通过basic.ack命令向RabbitMQ服务器确认已经消费了消息。如果消费者处理消息时发生错误或宕机,RabbitMQ会重新将消息发送给其他消费者。RabbitMQ在接收到消费者确认消息前会将消息保存在内存中,在确认后才会删除消息。

3.发布者确认

RabbitMQ支持发布者确认机制,即发布者在将消息发送到队列后,等待RabbitMQ服务器的确认消息。成功保存到队列的消息会返回确认消息给发布者,如果无法保存则返回Nack(Negative Acknowledgement)消息。通过发布者确认机制,可以确保消息成功发送到RabbitMQ服务器。

4.备份队列

RabbitMQ支持备份队列(Alternate Exchange)机制,即在消息发送到队列之前,先将消息发送到备份队列。如果主队列无法接收消息,RabbitMQ会将消息发送到备份队列中。备份队列通常是一个交换机,在创建队列时可以通过x-dead-letter-exchange属性指定备份队列。

三、注意事项 在使用RabbitMQ消息确认机制时,需注意以下几点:

1、确认模式的选择: 根据具体业务需求选择合适的确认模式。如果对消息传递的可靠性要求较高,建议使用手动确认模式或批量确认模式。

2、设置消息持久化: 在发布消息时,通过设置消息的持久化属性,可以确保即使RabbitMQ服务器意外关闭或重启,消息仍然能够被保存下来。

3、处理未确认消息: 如果消费者在处理消息时发生异常,导致无法发送确认信号给RabbitMQ,那么这些消息将保持在队列中,并可能被重新投递。需要设定适当的重试策略和错误处理机制来处理这些未确认的消息。

4、监听确认超时: 在使用批量确认模式时,如果长时间没有收到确认信号,需要设置合理的超时时间,并采取相应措施来处理超时的情况。

RabbitMQ消息确认机制详解:确保交付成功

消息确认机制是通过发布者(Producer)和消费者(Consumer)之间的交互来实现的。
当发布者发送消息到RabbitMQ后,会等待确认结果。如果消息成功被消费者接收并处理,消费者会发送一个确认信号给RabbitMQ,告知消息已经处理完成。而RabbitMQ则会根据接收到的确认信号,判断消息是否成功交付。
消息确认机制一般有两种模式:简单模式和批量模式。在简单模式中,每发送一条消息后就等待确认;而在批量模式中,可以一次发送多条消息,然后等待批量确认。无论是简单模式还是批量模式,都可以提高消息传递的可靠性。

https://blog.csdn.net/weixin_52278591/article/details/128841155 RabbitMq 消息确认机制详解

https://blog.csdn.net/u011942456/article/details/128198956 总结RabbitMq消息丢失和消息重复消费问题

https://www.zhihu.com/question/54756562?utm_id=0 rabbitmq是什么,主要用于哪些方面?

https://zhuanlan.zhihu.com/p/583520436?utm_id=0 用了8年MQ!聊聊消息队列的技术选型,哪个最香!

https://blog.csdn.net/qq_43410878/article/details/123656765 RabbitMQ使用详解

https://zhuanlan.zhihu.com/p/590948541 绝对详细的 RabbitMQ入门,看完本系列就够了

MQ的优势和劣势

https://cloud.tencent.com/developer/article/2113514 面试百问:使用MQ的优势、劣势以及问题

rabbitmq basicAck

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/146536.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

数据结构-哈希表

系列文章目录 1.集合-Collection-CSDN博客​​​​​​ 2.集合-List集合-CSDN博客 3.集合-ArrayList源码分析(面试)_喜欢吃animal milk的博客-CSDN博客 4.数据结构-哈希表_喜欢吃animal milk的博客-CSDN博客 文章目录 目录 系列文章目录 文章目录 前言 一 . 什么是哈希表&a…

【教学类-38-02】20230724京剧脸谱2.0——竖版(小彩图 大面具)(Python 彩图彩照转素描线描稿)

结果展示 背景需求&#xff1a; 前文体运用Python颜色提取功能&#xff0c;将“京剧脸谱”彩色图片转化为线描图案。 【教学类-38】20230724京剧脸谱1.0——横版“彩图线图等大”&#xff08;Python 彩图彩照转素描线描稿&#xff09;_reasonsummer的博客-CSDN博客 存在问题&…

计算机图形学、贝塞尔曲线及绘制方法、反走样问题的解决(附完整代码)

贝塞尔曲线 1. 本次作业实现的函数及简单描述&#xff08;详细代码见后&#xff09;2. 与本次作业有关的基础知识整理3. 代码描述&#xff08;详细&#xff09;4. 完整代码5. 参考文献 &#xff08;本篇为作者学习计算机图形学时根据作业所撰写的笔记&#xff0c; 如有同课程请…

数字时代古文的传承———云南文化瑰宝“爨文化“(我为家乡发声)

文章目录 前言⭐ "爨"意味着什么&#xff0c;究竟何为"爨文化"&#xff1f;⭐ 爨文化鲜明的特点1.经济生活2.政治生活3.文化艺术 ⭐ 数字时代古文的传承与传播1.藏品数字化2.建立数据库3.传播大众化 前言 爨文化是继古滇文化之后崛起于珠江正源南盘江流域…

根据GWAS数据估算样本量N和使用千人基因组填充maf的参考文献

https://github.com/GenomicSEM/GenomicSEM/wiki/2.1-Calculating-Sum-of-Effective-Sample-Size-and-Preparing-GWAS-Summary-Statistics

【SQL】mysql创建定时任务执行存储过程--20230928

1.先设定时区 https://blog.csdn.net/m0_46629123/article/details/133382375 输入命令show variables like “%time_zone%”;&#xff08;注意分号结尾&#xff09;设置时区&#xff0c;输入 set global time_zone “8:00”; 回车,然后退出重启&#xff08;一定记得重启&am…

【C++】手撕vector(vector的模拟实现)

手撕vector目录&#xff1a; 一、基本实现思路方针 二、vector的构造函数剖析&#xff08;构造歧义拷贝构造&#xff09; 2.1构造函数使用的歧义问题 2.2 vector的拷贝构造和赋值重载&#xff08;赋值重载不是构造哦&#xff0c;为了方便写在一起&#xff09; 三、vector的…

Apollo自动驾驶系统概述(文末参与活动赠送百度周边)

前言 「作者主页」&#xff1a;雪碧有白泡泡 「个人网站」&#xff1a;雪碧的个人网站 「推荐专栏」&#xff1a; ★java一站式服务 ★ ★ React从入门到精通★ ★前端炫酷代码分享 ★ ★ 从0到英雄&#xff0c;vue成神之路★ ★ uniapp-从构建到提升★ ★ 从0到英雄&#xff…

算法基础课第二部分

算法基础课 第四讲 数学知识AcWing1381. 阶乘(同余&#xff0c;因式分解) 质数AcWing 866. 质数的判定---试除法AcWing 868. 质数的判定---埃氏筛AcWing867. 分解质因数---试除法AcWing 197. 阶乘---分解质因数---埃式筛 约数AcWing 869. 求约数---试除法AcWing 870. 约数个数-…

云可观测性安全平台——掌动智能

云可观测性安全平台是一个跨架构、跨平台的可观测性方案&#xff0c;实现对云环境下的细粒度数据可视化&#xff0c;满足安全部门对云内部安全领域的多场景诉求&#xff0c;包括敏感数据动态监管、云网攻击回溯分析、攻击横移风险监控、云异常流量分析。本文将介绍掌动智能云可…

腾讯云中使用ubuntu安装属于自己的overleaf

在自己的云服务器上安装overleaf的需求是从写论文开始的&#xff0c;总担心自己的论文放在一个网站上被泄露&#xff0c;所以想要在自己的服务器上安装自己的overleaf&#xff0c;正好手边有一个云服务器&#xff0c;现在开始。 配置腾讯云 因为使用overleaf的优势就是在不同…

SQLAlchemy常用数据类型

目录 SQLAlchemy常用数据类型 代码演示 代码分析 SQLAlchemy常用数据类型 SQLAlchemy 是一个Python的SQL工具库和对象关系映射(ORM)工具&#xff0c;它提供了一种在Python中操作数据库的高效方式。下面是SQLAlchemy中常用的一些数据类型&#xff1a; Integer&#xff1a;整形&…

【数据结构】选择排序 堆排序(二)

目录 一&#xff0c;选择排序 1&#xff0c;基本思想 2&#xff0c; 基本思路 3&#xff0c;思路实现 二&#xff0c;堆排序 1&#xff0c;直接选择排序的特性总结&#xff1a; 2&#xff0c;思路实现 3&#xff0c;源代码 最后祝大家国庆快乐&#xff01; 一&#xf…

数组和切⽚ - Go语言从入门到实战

数组和切⽚ - Go语言从入门到实战 数组的声明 package main import "fmt" func main() { var a [3]int //声明并初始化为默认零值 a[0] 1 fmt.Println("a:", a) // 输出: a: [1 0 0] b : [3]int{1, 2, 3} //声明同时初始化 fmt.Println("b:…

Python3数据科学包系列(二):数据分析实战

Python3中类的高级语法及实战 Python3(基础|高级)语法实战(|多线程|多进程|线程池|进程池技术)|多线程安全问题解决方案 Python3数据科学包系列(一):数据分析实战 Python3数据科学包系列(二):数据分析实战 一&#xff1a;通过read_table函数读取数据创建(DataFrame)数据框 #…

C++指针的使用

文章目录 1.C指针1.1 定义指针1.2 使用指针 2.空指针和野指针2.1 空指针2.2 野指针 3.指针所占空间4.使用const修饰指针4.1 const修饰指针4.2 const修饰常量4.3 const 既修饰指针也修饰常量 5.指针操作数组6.指针做函数参数7.使用指针知识实现冒泡排序 1.C指针 指针其实就是一…

mysql主从同步

原理 概述 将主库的数据变更同步到从库&#xff0c;从而保证主库和从库数据一致 数据备份&#xff0c;失败迁移&#xff0c;读写分离&#xff0c;降低单库读写压力 原理 主数据库设置 docker run --restartalways --name mysql-master -p 3306:3306 -v /home/apps/mysql-…

偏微分方程的人工智能

9 偏微分方程的人工智能 在本节中&#xff0c;我们详细介绍了用于解决偏微分方程&#xff08;Partial Differential Equations&#xff0c;PDEs&#xff09;的人工智能领域的进展。我们在第9.1节中概述了PDE建模的一般形式&#xff0c;并阐述了在这个背景下使用机器学习方法的…

竞赛 大数据房价预测分析与可视

0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 大数据房价预测分析与可视 &#x1f947;学长这里给一个题目综合评分(每项满分5分) 难度系数&#xff1a;3分工作量&#xff1a;3分创新点&#xff1a;4分 该项目较为新颖&#xff0c;适合…

JavaScript中如何确定this的值?如何指定this的值?

&#x1f380;JavaScript中的this 在绝大多数情况下&#xff0c;函数的调用方法决定了this的值&#xff08;运行时绑定&#xff09;。this不能在执行期间被赋值&#xff0c;并且在每次函数呗调用时this的值也可能会不同。 &#x1f37f;如何确定this的值&#xff1a; 在非严格…