RabbitMQ是什么?RabbitMQ简介

一:技术背景

        假如我们有一个支付服务,支付服务的业务逻辑是:首先支付扣减余额,更新支付单状态,更新订单状态,发短信,给这个用户增加积分。在这个场景下,如果我们使用同步调用通信,那么调用支付这个接口是不是得等到给这个用户增加积分的业务执行完后,这个调用链条依次返回到支付接口时,支付接口的业务才算完成,可见这个情况明显很不合理。首先支付接口等待时间太长了,我支付业务的核心逻辑做完后,还要兼顾着非核心的业务,那如果有一个非核心业务down掉了,或者因为网络问题或者硬件问题或者代码问题使这个接口不能响应或者响应时间很长,那么下面的业务也会做不了,这条线程就出了大问题。堆积在tomcat内,持久占用tomcat的资源,再有用户下单,还是一样的占用tomcat的资源,现在一个服务失败了,业务中远程调用的接口及其的多,依次类推,就会造成服务级联失败,雪崩现象产生。假如我们来做限流,或者熔断,那肯定不行啊,这可是支付业务,非常重要,如果熔断,那么业务不可以,就可能造成数据不一致性等问题,这是非常严重的。

        所以同步通信在有些场景下使不适用的,我们应该把业务的核心逻辑来做同步通信,(也就是,我们必须要拿到业务处理完成返回的响应,从而必须做下一步的操作的这种场景下),在同步通信下,可做限流熔断等服务的保护操作;而非核心业务我们就用异步通信来做。非常常见的秒杀场景,它是一种流量激增的业务,激增的这段时间,qps及其高,如果采用同步调用,后果可想而知,轻则响应时间非常慢,用户体验差,重则服务器直接崩溃。异步调用就相当于把一个很长的链条拆开,留其重要的部分做同步调用,不太重要的做异步调用,就类似于把山峰削开,用来填山谷,让它变得平整。

二:安装部署

官网:https://www.rabbitmq.com/tutorials/tutorial-one-java

基于docker安装(前提你已经在官网下载好了rabbitmq)

2-1:加载rabbitmq.tar

docker load -i mq.tar

2-2: 运行rabbitmq

docker run \-e RABBITMQ_DEFAULT_USER=rabbitmq \-e RABBITMQ_DEFAULT_PASS=rabbitmq \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \--network zzb-network\-d \rabbitmq:3.8-management

三:初识RabbltMQ

整体架构:

  • virtual-host:虚拟主机,起到数据隔离的作用
  • publisher:消息发送者
  • consumer:消息的消费者
  • queue:队列,存储消息
  • exchange:交换机,负责路由消息

注意:交换机只能路由消息,无法存储消息;交换机只会路由消息给其绑定的队列,因此队列必须与交换机绑定

2-3 Java简单示例 (基于AMQP协议)

引入amqp-starter依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置MQ地址,在发送者服务和接收者服务的application.yml中添加配置(最好配置在nacos中,方便管理):

spring:rabbitmq:host: 192.168.203.130 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hellomq # 虚拟主机username: zzb # 用户名password: 123456 # 密码

发送者:

@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void send(){//队列名称String queue = "simple.queue";//消息String message = "hello RabbitMQ";//发送消息rabbitTemplate.convertAndSend(queue, message);
}

接受者

// 把接受消息的类的对象放进ioc容器
@Component
@Slf4j
public class Listener {@RabbitListener(queues = "simple.queue")public void receive(String message) {log.info("接收到消息:{}", message);}
}

三:SpringAMQP

3-1:Work queues,任务模型:

        Work queues,就是多个消费者监听同一个队列。

        在实际开发中,并不是在一个服务里编写多个消费者监听同一个队列,而是部署多个实例,形成集群,监听那个队列。

        1:Work queues的特点

        队列中的一条消息只会被一个消费者监听执行,如果有其他的消费者,是拿不到这个消息的;基于这个特点,如果有其他消费者,其他消费者就会监听执行这个队列后面的消息,所以对于一个队列中所有的消息,多消费者是可以更快的处理这些消息,所以加消费者是在高并发场景下一个不错的处理方案。

        我们要知道,部署多个实例,每个机器的性能可能有好有坏,如果有台机器性能比较差一些,那它处理消息的速度肯定比不过其他的消费者,那队列如何分配消息给消费者呢?答案默认是轮询,每个消费者处理一个消息。显然这个方案和实际情况背道而驰,所以我们需要配置,让每一个消费者只能处理一条消息,等当前消息处理完毕后,才能获取下一条消息。这样完美解决了消息积压问题。在application.yaml中配置:

spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

        3-2:交换机

        交换机的作用主要是接受发送者发送的消息,并将消息路由到预期绑定的队列,常见的交换机类型有三种:

3-2-1:Fauout:广播rabbitTemplate.convertAndSend(x,x,x)三个参数

每个监听不同队列的消费者都可以接受到来自同一个交换机转发来的消息。

3-2-2:Direct:定向

Direct Exchange会将接受到的消息根据规则路由到指定的queue,因此称为定向路由

  • 每一个queue都会erexchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
3-2-3:Topic:话题

TopicExchange也是基于RoutingKey做消息路由,然是routingKey通常是多个单词的组合,并且以   .  分割,比如                 user.#                  item.#                     #.cn 

BindingKey可以使用通配符:#代表0个或者多个单词,*代表一个单词

        3-3 声明队列交换机

1. 基于Bean声明队列交换机

SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:

  • Queue:用于声明队别,可以用工厂类QueueBuilder构建
  • Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
  • Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建

fanout示例:

package com.itheima.consumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfig {/*** 声明交换机* @return Fanout类型交换机*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("a.fanout");}/*** 第1个队列*/@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/*** 第2个队列*/@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}

direct示例:

package com.itheima.consumer.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DirectConfig {/*** 声明交换机* @return Direct类型交换机*/@Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange("a.direct").build();}/*** 第1个队列*/@Beanpublic Queue directQueue1(){return new Queue("direct.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}/*** 第2个队列*/@Beanpublic Queue directQueue2(){return new Queue("direct.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}
}

可以看到,声明direct交换机与队列绑定关系的代码显得非常臃肿,所以一般我们使用注解来声明

//在监听消息时,指定绑定关系
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "a.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "a.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

3-4 消息转换器

发送消息时,默认的把消息转换是jdk来做的,把对象序列化为字节进行传输,这种方式不推荐

建议采用别的消息转换器,如json序列化

步骤一:publisherconsumer两个服务中都引入依赖:

<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>

注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。

步骤二:配置消息转换器,在publisherconsumer两个服务的启动类中添加一个Bean即可:

@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/149666.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

vscode将c++项目打包exe进行反汇编练习

vscode将c&c项目打包成控制台exe全过程&#xff0c;进行c反汇编练习&#xff0c;反汇编只有不断的练习才能巩固、积累经验。 一、打包exe 创建新项目&#xff0c;选择c&#xff0c;Windows桌面向导 直接点击创建 直接点确定 直接点击运行即可&#xff0c;可以看到我的exe…

15 跨组件通信依赖注入provide和inject

Provide / Inject 通常&#xff0c;当我们需要从父组件向子组件传递数据时&#xff0c;我们使用 props。想象一下这样的结构&#xff1a;有一些深度嵌套的组件&#xff0c;而深层的子组件只需要父组件的部分内容。在这种情况下&#xff0c;如果仍然将 prop 沿着组件链逐级传递…

ROS2 技术及分布式介绍

PC端开发环境搭建 WSL环境搭建 https://www.guyuehome.com/46574 In Windows 11 builds that support wslg: 1. Open up powershell and enter wsl --install ROS2系统安装 方法一 • 设置编码 Bash $ sudo apt update && sudo apt install loca…

EffcientNetV2(2021):更快、更强、效率更高的EffcientNet!

EffcientNetV2: Smaller Models and Faster Training EfficientNetV2&#xff1a;更小的模型和更快的训练 论文地址&#xff1a; https://arxiv.org/abs/2104.00298 本文介绍了 EfficientNetV2&#xff0c;这是一个新的卷积网络系列&#xff0c;与以前的模型相比&#xff0c;它…

HDFS_API文件和文件夹

代码&#xff1a; Beforepublic void init() throws URISyntaxException, IOException {URI uri new URI("hdfs://master:9000");// 创建一个配置文件Configuration entries new Configuration();// 获取到了客户端对象 // entries.set("dfs.replicat…

【嵌入式linux开发】SPI设备文件读取ICM-40609D传感器

【嵌入式linux开发】SPI设备文件操作ICM-40609D传感器 前言一、数据手册浅读二、linux系统下使用SPI设备文件操作ICM-40609-D三、ros1发布imu数据3.1、创建ros1工作空间3.2、数据发布节点代码 前言 在本篇博客中&#xff0c;将从ICM-40609-D传感器的数据手册出发&#xff0c;简…

公安局软件管理平台建设方案和必要性,论文-3-———未来之窗行业应用跨平台架构

三、平台功能设计 四、技术架构 1. 前端界面 - 采用简洁、易用的设计风格&#xff0c;适应不同终端设备的访问。 - 基于 HTML5、CSS3 和 JavaScript 构建。 2. 后端服务 - 选择主流的 Web 开发框架&#xff0c;如 未来之窗跨平台架构&#xff0c;VUE。 - 数据库…

Github Webhook触发Jenkins自动构建

1.功能说明 Github Webhook可以触发Jenkins自动构建&#xff0c;通过配置Github Webhook&#xff0c;每次代码变更之后&#xff08;例如push操作&#xff09;&#xff0c;Webhook会自动通知Jenkins服务器&#xff0c;Jenkins会自动执行预定义的构建任务&#xff08;如Jenkins …

Redis-认识与应用(从ChatGpt的角度看Redis)

问题&#xff1a;您好&#xff0c;我的项目是在线教育平台&#xff0c;用springboot3搭建&#xff0c;我现在想学redis&#xff0c;请问redis能在我的项目中有什么应用场景呢 问题&#xff1a;就是我项目能应用上具体什么场景&#xff0c;请给我例子&#xff0c;并给我具体代码…

springboot整合openfeign

文章目录 准备一、引入必要依赖二、写一个feign client并暴露到注册中心2.1 client2.2 开启Feign客户端功能 三、别的服务引入IProductClient并调用方法3.1 建一个order-service&#xff0c;引入IProductClient所在模块3.2 注入IProductClient&#xff0c;并调用方法 四、启动服…

JAVA基本简介(期末)

1、JDK JRE JVM &#xff08;1&#xff09;JDK JAVA标准开发包&#xff0c;提供了编译、运行JAVA程序所需的各种工具和资源&#xff0c;包括JAVA编译器、JAVA运行时的环境&#xff0c;及常用的JAVA类库等 &#xff08;2&#xff09;JRE JAVA运行环境&#xff0c;用于解释执行JA…

JW01二氧化碳传感器(串行通信 STM32)

目录 一、介绍 二、传感器原理 1.工作原理介绍 2.串口数据流格式 三、程序设计 main.c文件 usart3.h文件 usart3.c文件 四、实验效果 五、资料获取 项目分享 一、介绍 JW01-CO2检测模块是一种用于检测空气中二氧化碳浓度的传感器模块。它可以广泛应用于室内空气质量…

美畅物联丨GB/T 28181系列之TCP/UDP被动模式和TCP主动模式

GB/T 28181《安全防范视频监控联网系统信息传输、交换、控制技术要求》作为我国安防领域的重要标准&#xff0c;为视频监控系统的建设提供了全面的技术指导和规范。该标准详细规定了视频监控系统的信息传输、交换和控制技术要求&#xff0c;在视频流传输方面&#xff0c;GB/T 2…

【Day20240924】05git 两人协作 冲突

git 两人协作 冲突 命令行解决 两个人修改同一文件时 的冲突可视化解决 两个人修改同一文件时 的冲突参考 命令行解决 两个人修改同一文件时 的冲突 假设kerwin.js是项目的路由文件。tiechui文件夹是组员铁锤的工作目录&#xff1b;test2008文件夹是组长的工作目录。此时&…

Redis 优化

目录 优雅的 key 删除 Bigkey 恰当的数据类型 批处理优化 Pipeline 集群下的批处理 服务端优化 持久化配置 慢查询 命令以及安全配置 内存安全和配置 内存缓冲区配置 集群最佳实践 集群带宽问题 集群还是主从 优雅的 key 删除 Bigkey Bigkey 内存占用较多&…

ubuntu 安裝 Poetry 示例

ubuntu 安裝 Poetry 示例 一、前言 poetry 是一个命令行工具&#xff0c;安装之后就可以使用 poetry 指令。可以将其安装全局环境或者是虚拟环境&#xff0c;我推荐安装在全局环境&#xff0c;这样在后面使用时不需要单独激活虚拟环境。 &#xff08;1&#xff09;安装 Poet…

【Linux】组管理权限管理任务调度【更详细,带实操】

Linux全套讲解系列&#xff0c;参考视频-B站韩顺平&#xff0c;本文的讲解更为详细 一、组管理 1、linux组的介绍 linux对文件的管理机制 linux中的文件有三个概念&#xff1a; 1、文件所有者是谁&#xff0c;谁创建了文件&#xff0c;当然文件所有者也可以修改2、文件属于…

基于内容的推荐算法

算法原理概述 首先推荐算法的作用是给用户推荐其可能喜欢的物品。此算法所依赖的数据大概分为两部分&#xff1a;&#xff08;1&#xff09;用户过去喜欢的物品&#xff1b;&#xff08;2&#xff09;每个物品的标签。 算法步骤 &#xff08;1&#xff09;根据用户过去喜欢的…

IIS HTTPS 网页可能暂时无法连接,或者它已永久性地移动到了新网址 ERR_HTTP2_INADEQUATE_TRANSPORT_SECURITY

问题描述&#xff1a;站点突然无法访问&#xff0c;经排查发现&#xff0c;HTTP协议的网址可以继续访问&#xff0c;HTTPS的网址不可以访问。 问题分析&#xff1a;在Windows更新和滚动之后&#xff0c;由于 HTTP/2&#xff0c;当站点启动了 HTTP/2 连接&#xff0c;会出现一个…

手机如何快速切换ip?探索多种方法

在当今这个信息高速流通的时代&#xff0c;IP地址作为网络世界中的独特标识&#xff0c;其重要性日益凸显。对于手机用户而言&#xff0c;无论是出于保护个人隐私、访问地域限制内容&#xff0c;还是进行网络测试等需求&#xff0c;快速切换IP地址都显得尤为关键。本文将深入探…