MQ基础:RabbitMQ真面目

同步调用方式,指的是发送方直接发送给接收方的形式。而这种方式在某些情况下可能出现问题,比如当业务逻辑变得复杂,同步的方式需要等待上一条指令被接收后才会继续,对性能的影响很大。

异步的方式,增加了一个消息代理的角色,可以管理、暂存、转发消息。这样消息提供者只需要把消息发送给消息代理,就能够继续进行下一个业务。而消息代理只需要把消息发送给消息接受者,就完成了它的任务。

异步的方式,有很多优点:

  • 能够解除消息发送者和消息接受者之间的耦合关系,拓展性强
  • 消息发送者无需等待消息是否发送给消息接受者,性能好
  • 当消息接受者出现故障时,只需要消息代理把消息发送给修复好的消息接受者就代表完成
  • 当消息短时间突然增多时,消息代理可以缓存消息,削峰填谷,减少服务器的压力

MQ(MessageQueue),中文是消息队列,就是存放消息的队列,是异步调用中的Broke。

目录

安装RabbitMQ

Java客户端

Work模型

交换机

Fanout

Direct

Topic

声明队列和交换机

消息转换器


安装RabbitMQ

基于docker安装:

把mq.tar镜像拷贝到linux中,再读取:

docker load -i mq.tar 

通过一串命令就可以把RabbitMQ启动起来:

docker run \-e RABBITMQ_DEFAULT_USER=root \-e RABBITMQ_DEFAULT_PASS=123 \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \--network demo_network \-d \rabbitmq:3.8-management

RabbitMQ整体架构:

  • publisher:消息发送者
  • consumer:消息消费者
  • queue:队列,存储消息
  • exchange:交换机,负责路由消息

在这其中,交换机必须要和队列绑定,才能够把消息路由给队列。

数据隔离

在Rabbitmq的网页中,有Virtual host这一栏。这代表着虚拟主机,不同的虚拟主机,数据是不同的,队列什么的也不通用。

想要创建虚拟主机,只需要在Virtual中新建一个就可以了。这样两个虚拟主机数据是分离开的。

Java客户端

由于Rabbitmq官方提供的API、比较复杂,于是我们使用AMQP来处理MQ。

Spring AMQP是基于AMQP协议定义的一套API规范,提供了板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。我们就通过Spring AMQP来完成对于Rabbitmq的应用。

引入AMQP的依赖:

SpringAMQP如何收发消息?

  1. 引入spring-boot-starter-amqp依赖
  2. 配置rabbitmq服务端信息
  3. 利用RabbitTemplate发送消息
  4. 利用@RabbitListener注解声明要监听的队列,监听消息

利用控制台创建队列simole.queue:

在publisher服务中,利用SpringAMQP直接向simple.queue发送消息:

配置好配置文件,

为了方便测试,我们在test中定义好方法:

在consumer服务中,利用SpringAMQP编写消费者,监听simple.queue队列

Work模型

实现一个队列绑定多个消费者:

  1. 在RabbitMQ的控制台创建一个队列,名为work.queue
  2. 在publisher服务中定义测试方法,在1秒内产生50条消息,发送到work.queue
  3. 在consumer服务中定义两个消息监听者,都监听work.queue队列
  4. 消费者1每秒处理50条消息,消费者2每秒处理5条消息

先新建一个队列作为work.queue

此时再在代码中修改两个消息监听者:

默认情况下,多个RabbitListener接收消息是通过轮询的方式,并且分配很均匀。当50条消息分配给两个消费者,会被均匀的分配给这两个消费者。

但是要是消费者1和消费者2的消息处理能力不同,会发生什么呢?

我们让消费者1每隔20ms才接受一次消息,消费者2每隔200ms才接收一次消息:

这显然不符合我们的要求。既然消费者1的性能要好一些,就应该让它多接受一些消息,而不是像现在这样仍然是每个消费者接收25条消息。

消费者消息推送限制
默认情况下,RabbitMQ的会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。

因此我们需要修改application.yml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息:

spring:rabbitmq:listener:simple:prefetch: 1

这样的分配机制,就充分利用了消费者1的性能,实现“能者多劳”。

交换机

生产环境中都会经过exchange交换机,通过交换机路由到队列。交换机的类型有三种:

  • Fanout  广播
  • Direct    定向
  • Topic     话题

Fanout

这个交换机会把消息广播到每一个跟其绑定的queue。

实现思路:

  1. 在RabbitMQ控制台中,声明队列fanout.queue1和fanout.queue2
  2. 在RabbitMQ控制台中,声明交换机fanout,将两个队列与其绑定
  3. 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
  4. 在publisher中编写测试方法,向fanout发送消息

测试结果是两个消费者都可以接收到消息

Direct

实际生产环境中,交换机和消费者可能会存在复杂的关系,如果按照Fanout的方式来路由,可能会出现精细度不够的情况。Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

跟Fanout大体上一样,需要在Routing key中绑定key:

代码中,在生产者方只需要指定routingkey就完成了。

Topic

TopicExchange与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并且以 . 分割。

并且Topic还支持通配符, # 代表0个或者多个单词, * 代表一个单词。这就意味着Topic可以完成Fanout和Direct的功能,更加精细化的控制交换机。

代码中,也只需要更改routingkey即可完成匹配。

声明队列和交换机

在之前演示的过程中,就已经手动创建了很多个交换机和队列。实际生产环境中,只会比这个更多,所以用代码来创建队列和交换机就显得尤为重要。

SpringAMOP提供了几个类,用来声明队列、交换机及其绑定关系:

  • Queue:用于声明队列,可以用工厂类QueueBuilder构建
  • Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
  • Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建

fanout是最简单的,如果是direct和topic,都需要在绑定关系中加上routingkey参数:

public Binding directQueue1Binding(Queue directQueue1, DirectExchange directExchange) {return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
}

除了这种方式,还可以用注解的方式。基于@RabbitListener注解来声明队列和交换机的方式:

注意,都是在消费者中创建。

消息转换器

当我们发送的消息是一个Map类型时

接收端接收到的是这样子的:

Payload采用的是JDK的序列化方式,明明msg中只有简单的jack,但是却占用了这么大的空间,并且这样的序列化方式还不安全,容易被篡改。

解决方式就是,我们采用Json序列化。

在publisher和consumer中都引入jackson依赖,并且配置MessageConverter:

<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId>
</dependency>
@Bean
public MessageConverter jacksonMessageConverter() {return new Jackson2JsonMessageConverter();
}
//消费者生产者都需要配置这个,配置到启动类即可

此处就可以看到消息转换器发挥了作用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1549780.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

微信小程序操作蓝牙

主要流程&#xff1a; 1.初始化蓝牙适配器openBluetoothAdapter&#xff0c;如果不成功就onBluetoothAdapterStateChange监听蓝牙适配器状态变化事件 2.startBluetoothDevicesDiscovery开始搜寻附近的蓝牙外围设备 3.onBluetoothDeviceFound监听寻找到新设备的事件&#xff0c;…

PHP爬虫淘宝商品SKU详细信息获取指南

在电子商务领域&#xff0c;获取商品的SKU&#xff08;Stock Keeping Unit&#xff0c;库存单位&#xff09;详细信息对于商家进行库存管理、订单处理和客户服务至关重要。淘宝作为中国最大的电商平台之一&#xff0c;提供了丰富的API接口&#xff0c;使得开发者能够通过PHP爬虫…

前端学习笔记-JS进阶篇-02

构造函数&数据常用函数 1、深入对象 1.1、创建对象三种方式 1. 利用对象字面量创建对象 2. 利用new Object 创建对象 3. 利用构造函数创建对象 1.2、构造函数 构造函数&#xff1a;是一种特殊的函数&#xff0c;主要用来初始化对象 使用场景&#xff1a;常规的{...} 语…

springboot购物网站源码分享

开头&#xff1a;springboot购物网站源码分享 题目&#xff1a;springboot购物网站源码分享 主要内容&#xff1a;毕业设计(Javaweb项目|小程序|Mysql|大数据|SSM|SpringBoot|Vue|Jsp|MYSQL等)、学习资料、JAVA源码、技术咨询 文末联系获取 感兴趣可以先收藏起来&#xff…

报数游戏 - 华为OD统一考试(E卷)

2024华为OD机试&#xff08;E卷D卷C卷&#xff09;最新题库【超值优惠】Java/Python/C合集 题目描述 100个人围成一圈&#xff0c;每个人有一个编号&#xff0c;编号从1开始到100。他们从1开始依次报数&#xff0c;报到为M的人自动退出圈圈&#xff0c;然后下一个人接着从1开始…

基于SpringBoot+Vue的茶园茶农文化交流平台

作者&#xff1a;计算机学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等&#xff0c;“文末源码”。 专栏推荐&#xff1a;前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码 精品专栏&#xff1a;Java精选实战项目…

【MySQL实战45讲4-5】索引

文章目录 索引的定义索引的常见模型哈希表有序数组二叉搜索树 InnoDB的索引模型索引维护页分裂页合并页分裂和页合并的影响避免页分裂 覆盖索引最左前缀原则索引下推 索引的定义 索引的出现其实就是为了提高数据查询的效率&#xff0c;就像书的目录一样。一本500页的书&#x…

tee命令:轻松同步输出到屏幕与文件

一、命令简介 ​tee​ 命令在 Linux 和 Unix 系统中用于读取标准输入的数据&#xff0c;并将其同时输出到标准输出和文件中。简单来说&#xff0c;tee​ 命令可以用来分割数据流&#xff0c;使其既能够被输出到屏幕&#xff0c;也能够被写入到文件中。 ​​ ‍ 二、命令参数…

基于PI控制器的车辆行驶控制系统simulink建模与仿真

目录 1.课题概述 2.系统仿真结果 3.核心程序与模型 4.系统原理简介 4.1 步骤一: 确定目标与测量 4.2 步骤二: 计算误差 4.3 步骤三: 设计PI控制器 4.4 步骤四: 应用控制信号 4.5 步骤五: 反馈循环 5.完整工程文件 1.课题概述 基于PI控制器的车辆行驶控制系统是一种常…

timedatectl命令:告别时间烦恼,一键同步系统时间

一、命令简介 ​timedatectl​ 命令用于查看和设置系统的时间和日期&#xff0c;以及配置时区和 NTP&#xff08;Network Time Protocol&#xff09;设置。 相关命令&#xff1a;cal ​显示日历、 date ​查看、设置日期 ‍ 二、命令参数 格式&#xff1a; timedatectl […

[Redis][集群][下]详细讲解

目录 1.集群搭建(基于 Docker)2.主节点宕机1.宕机后会发生什么&#xff1f;2.处理流程1.故障判定2.故障迁移 3.集群扩容0.前言1.把新的主节点加入到集群2.重新分配slots3.给新的主节点添加从节点 1.集群搭建(基于 Docker) 拓扑结构如下&#xff1a; 创建目录和配置&#xff1…

【Python】FeinCMS:轻量级且可扩展的Django内容管理系统

在互联网飞速发展的今天&#xff0c;内容管理系统&#xff08;CMS&#xff09;成为了网站开发中的核心工具&#xff0c;尤其对于需要频繁更新内容的企业和个人站点而言&#xff0c;CMS 提供了极大的便利。市场上有许多不同的 CMS 工具可供选择&#xff0c;其中基于 Django 框架…

在IDEA中构建Jar包,安装Jar包到Maven仓库并在Maven项目中使用

文章目录 0. 关于本文1. IDEA构建Jar包1.1 准备一份Java代码&#xff08;就是你要构建工件的代码&#xff09;1.2 进行如下步骤构建工件 2. 关于Maven3. 将Jar包安装到Maven仓库4. 使用安装的Jar包依赖 0. 关于本文 本文内容&#xff1a; 借助IDEA构建Jar包将Jar包安装到Mave…

甄选范文“论网络安全体系设计”,软考高级论文,系统架构设计师论文

论文真题 随着社会信息化的普及,计算机网络已经在各行各业得到了广泛的应用。目前,绝大多数业务处理几乎完全依赖计算机和网络执行,各种重要数据如政府文件、工资档案、财务账目和人事档案等均依赖计算机和网络进行存储与传输。另一方面,针对计算机和网络的攻击活动日益猖…

STM32-HAL库驱动DHT11温湿度传感器 --2024.9.28

目录 一、教程简介 二、驱动原理讲解 &#xff08;一&#xff09;通信4步骤 &#xff08;二&#xff09;传感器数据解析 三、CubeMX生成底层代码 &#xff08;一&#xff09;基础配置 &#xff08;二&#xff09;配置DHT11的驱动引脚 &#xff08;三&#xff09;配置串口 四…

uni-app在线预览pdf

这里推荐下载pdf.js 插件 PDF.js - Browse Files at SourceForge.net 特此注意 如果报 Promise.withResolvers is not a function 请去查看版本兼容问题 降低pdf.js版本提高node版本 下载完成后 在 static 文件夹下新建 pdf 文件夹&#xff0c;将解压文件放进 pdf 文件…

【Python】Django Grappelli:打造优雅且现代化的 Django 管理后台

在 Django 开发中&#xff0c;默认的 Django Admin 界面尽管功能强大且能满足大多数管理需求&#xff0c;但其界面设计相对基础&#xff0c;尤其在用户体验和视觉呈现上显得较为简约。在一些项目中&#xff0c;开发者可能需要更加现代化且美观的后台界面。这时&#xff0c;Djan…

[Linux#58][HTTP] 自己构建服务器 | 实现网页分离 | 设计思路

目录 一. 最简单的HTTP服务器 二.服务器 2.0 Protocol.hpp httpServer.hpp 子进程的创建和退出 子进程退出的意义 父进程关闭连接套接字 httpServer.cc argc (argument count) argv (argument vector) 三.服务器和网页分离 思考与补充&#xff1a; 一. 最简单的HTT…

Tomcat搭建zrlog

1.基础环境准备&#xff1a; 在开始进行软件安装和服务配置之前&#xff0c;必须对系统进行基础设置&#xff0c;以确保服务器的正常运行。这个任务要求你进行一些基础的系统配置操作&#xff0c;包括修改主机名、关闭防火墙以及临时关闭 SELinux等。通过这些步骤&#xff0c;…