关于RabbitMQ你了解多少?

关于RabbitMQ你了解多少?

文章目录

  • 关于RabbitMQ你了解多少?
    • 基础篇
      • 同步和异步
      • MQ技术选型
      • 介绍和安装
      • 数据隔离
      • SpringAMQP
      • 快速入门
      • Work queues
      • 交换机
        • Fanout交换机
        • Direct交换机
        • Topic交换机
      • 声明队列和交换机
      • MQ消息转换器
    • 高级篇
      • 消息可靠性问题
      • 发送者的可靠性
        • 生产者重连
        • 生产者确认
        • SpringAMQP实现生产者确认

RabbitMQ是目前企业中应用非常广泛的高性能的异步通讯组件

基础篇

同步和异步

同步调用:

  • 优势:时效性强,等待到结果后才返回
  • 不足:拓展性差、性能下降、级联失败问题

异步调用:

  • 异步调用方式其实就是基于消息通知的方式,一般包含三个角色
    • 消息发送者:投递消息的人,就是原来的调用方
    • 消息代理:管理、暂存、转发消息
    • 消息接收者:接收和处理消息的人,就是原来的服务提供方
  • 优势:解除耦合,拓展性强、无需等待,性能好、故障隔离、缓存消息,流量削峰填谷
  • 不足:不能立刻得到调用结果,时效性差、不确定下游业务执行是否成功、业务安全依赖于Broker的可靠性

MQ技术选型

MQ(MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是异步调用中的Broker。

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP、XMPP、SMTP、STOMPOpenWire、STOMP、REST、XMPP、AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

介绍和安装

RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址

同样基于Docker来安装RabbitMQ,使用下面的命令即可:

docker run \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=admin123 \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \--network hmall \-d \rabbitmq:3.8-management
  • 15672:RabbitMQ提供的管理控制台的端口

  • 5672:RabbitMQ的消息发送处理接口

  • 安装完成后访问管理控制台。首次访问需要登录,默认的用户名和密码在配置文件中已经指定了。 登录后即可看到管理控制台总览页面:

    在这里插入图片描述

RabbitMQ对应的整体架构核心概念:

  • publisher:生产者,也就是发送消息的一方
  • consumer:消费者,也就是消费消息的一方
  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
  • exchange:交换机,负责消息路由、转发消息。生产者发送的消息由交换机决定投递到哪个队列,无存储能力
  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue

在这里插入图片描述

数据隔离

  • 每个Virtual Host提供了一个独立的消息代理,它们之间完全隔离,相互之间不共享任何资源。通过将不同的应用程序或服务分配到不同的Virtual Host,可以实现数据的隔离。
  • 每个Virtual Host都有自己独立的交换机、队列和绑定规则。这样,消息只能在同一个Virtual Host内进行路由和传递,不会跨越Virtual Host。
  • 通过使用Virtual Hosts,可以将不同的应用程序或服务的消息进行逻辑隔离,确保它们之间不会互相干扰或访问彼此的数据。这在多租户环境下尤为有用,可以确保不同的租户之间的消息数据完全隔离,提高安全性和隐私性。

SpringAMQP

SpringAMQP的官方地址

  • AMQP:Advanced Message Queuing Protocol,是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。
  • Spring AMQP:Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

快速入门

  1. 引入spring-amqp依赖,这样publisher和consumer服务都可以使用:

    <!——AMQP依赖,包含RabbitMQ-->
    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  2. 配置RabbitMQ服务端信息

    spring:rabbitmq:host: 192.168.100.101 #主机名port: 5672 # 端口virtual-host: /liner #虚拟主机username: admin #用户名password: admin123 #密码
    
  3. 发送消息:SpringAMQP提供了RabbitTemplate工具类,方便我们发送消息。

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSimpleQueue() {//队列名称String queueName = "simple.queue";//消息String message = "hello,spring amqp!";//发送消息rabbitTemplate.convertAndSend(queueName,message);
    }
    
  4. 接收消息:SpringAMQP提供声明式的消息监听,只需通过注解在方法上声明要监听的队列名称,将来SpringAMQP就会把消息传递给当前方法

    @slf4j
    @Component
    public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {log.info("spring消费者接收到消息:【"+ msg + "】");if (true) {throw new MessageconversionException("故意的");}log.info("消息处理完成");}
    }
    

Work queues

Work queues,任务模型。让多个消费者绑定到一个队列,共同消费队列中的消息

publisher
queue
consumer1
consumer2

消费者消息推送限制

默认情况下,RabbitMQ会将消息依次轮询投递给绑定在队列上的每一个消费者。这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。
因此需要修改application.yml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息

spring:rabbitmq:listener:simple:prefetch: 1 #每次只能获取一条消息,处理完成才能获取下一个消息

work模型的使用:

  • 多个消费者绑定到一个队列,可以加快消息处理速度
  • 同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳

交换机

真正生产环境都会经过exchange来发送消息,而不是直接发送到队列,交换机的类型有以下三种:

  • Fanout:广播
  • Direct:定向
  • Topic:话题
publisher
exchange
queue1
queue2
consumer1
consumer2
consumer3

交换机的作用:

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
Fanout交换机

Fanout Exchange:会将接收到的消息广播到每一个跟其绑定的queue,所以也叫广播模式

在这里插入图片描述

Direct交换机

Direct Exchange :会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

在这里插入图片描述

Topic交换机

TopicExchange:与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并且以 “.” 分割。

Queue与Exchange指定BIndingKey时可以使用通配符:

  • #:代指0个或多个单词
  • *:代指一个单词

在这里插入图片描述

声明队列和交换机

SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:

  • Queue:用于声明队列,可以用工厂类QueueBuilder构建
  • Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
  • Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建

在这里插入图片描述

@Configuration
public class FanoutConfiguration{@Beanpublic FanoutExchange fanoutExchange(){//ExchangeBuilder.fanoutExchange("").build();return new FanoutExchange("liner.fanout");}@Beanpublic Queue fanoutQueue(){//QueueBuilder.durable("").build();return new Queue("fanout.queue");}@Beanpublic Binding fanoutBinding(Queue fanoutQueue,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);}@Beanpublic Binding fanoutBinding2(){return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());}}

SpringAMQP还提供了基于 @RabbitListener 注解来声明队列和交换机的方式:

@RabbitListener(bindings = @QueueBinding(value = Queue (name = "direct.queue",durable = "true"),exchange = @Exchange(name = "liner.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"}
))
public void listenDirectQueue(String msg){System.out.println("消费者1接收到Direct消息:【"+msg+"】");
}

MQ消息转换器

Spring的消息发送代码接收的消息体是一个Object:在这里插入图片描述

而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。 默认情况下Spring采用的序列化方式是JDK序列化。存在问题:数据体积过大、有安全漏洞、可读性差

在这里插入图片描述

因此建议采用JSON序列化代替默认的JDK序列化

  • publisherconsumer两个服务中都引入依赖:
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>

注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。

  • 配置消息转换器,在publisherconsumer两个服务的启动类中添加Bean:
@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}

消息转换器中添加的messageId可以便于我们将来做幂等性判断

在这里插入图片描述

高级篇

消息可靠性问题

发送者的可靠性

生产者重连

有的时候由于网络波动,可能会出现客户端连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制:

spring:rabbitmq:connection-timeout: 1s #设置MQ的连接超时时间template:retry:enabled: true #开启超时重试机制initial-interval: 1000ms #失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 #最大重试次数

注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

生产者确认

RabbitMQ有Publisher ConfirmPublisher Return两种确认机制。开启确机制认后,在MQ成功收到消息后会返回确认消息给生产者。返回的结果有以下几种情况:

  • 消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功
  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
  • 其它情况都会返回NACK,告知投递失败
SpringAMQP实现生产者确认
  1. 在publisher这个微服务的application.yml中添加配置:

    spring:rabbitmq:publisher-confirm-type: correlated #开启publisher confirm机制,并设置confirm类型publisher-returns: true #开启publisher return机制#这里publisher-confirm-type有三种模式可选:
    #  none:  关闭confirm机制
    #  simple:  同步阻塞等待MQ的回执消息
    #  correlated:  MQ异步回调方式返回回执消息
    
  2. 每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:

    @Slf4j
    @Configuration
    public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {//获取RabbitTemplateRabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);//设置ReturnCallbackrabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {log.info("消息发送失败,应答码{{},原因{},交换机{},路由键{},消息{}",replyCode,replyText,exchange,routingKey,message.toString());});}
    }
    
  3. 发送消息,指定消息ID、消息ConfirmCallback

    @Test
    void testPublisherConfirm() throws InterruptedException {//1.创建CorrelationDataCorrelationData cd = new CorrelationData();//2.给Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {// 2.1.Future发生异常时的处理逻辑,基本不会触发log.error("handle message ack fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){	 // result.isAck(), boolean类型, true代表ack回执,false代表nack回执log.debug("发送消息成功,收到ack!");}else{	// result.getReason(), String类型,返回nack时的异常描述log.error("发送消息失败,收到nack,reason:{}",result.getReason());}}});//3.发送消息rabbitTemplate.convertAndSend("liner.direct","red","hello",cd);
    }
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/145772.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

妙不可言的Python之旅----(一)

初识Python python的起源 1989年&#xff0c;为了打发圣诞节假期&#xff0c;Gudio van Rossum吉多 范罗苏姆&#xff08;龟叔&#xff09;决心开发一个新的解释程序&#xff08;Python雏形&#xff09; 1991年&#xff0c;第一个Python解释器诞生 Python这个名字&#xff…

Golang中的包和模块设计

Go&#xff0c;也被称为Golang&#xff0c;是一种静态类型、编译型语言&#xff0c;因其简洁性和对并发编程的强大支持而受到开发者们的喜爱。Go编程的一个关键方面是其包和模块系统&#xff0c;它允许创建可重用、可维护和高效的代码。本博客文章将深入探讨在Go中设计包和模块…

Servlet操作与用法(保姆式教学)

Servlet介绍 什么是servlet Servlet&#xff08;Servlet Applet的缩写&#xff0c;全称 Java Servlet&#xff09;&#xff1a;适用于Java编写的服务器程序&#xff0c;其主要功能是在于交互式的浏览和修改数据&#xff0c;生成动态Web内容。狭义的Servlet是指Java语言实现的…

MySQL学习笔记27

MySQL主从复制的核心思路&#xff1a; 1、slave必须安装相同版本的mysql数据库软件。 2、master端必须开启二进制日志&#xff0c;slave端必须开启relay log 日志。 3、master主服务器和slave从服务器的server-id号不能一致。 4、slave端配置向master端来同步数据。 master…

云安全之访问控制的常见攻击及防御

访问控制攻击概述 访问控制漏洞即应用程序允许攻击者执行或者访问某种攻击者不具备相应权限的功能或资源。 常见的访问控制可以分为垂直访问控制、水平访问控制及多阶段访问控制 (上下文相关访问控制)&#xff0c;与其相应的访问控制漏洞为也垂直越权漏洞(普通用户可以访问或…

【QT】使用toBase64方法将.txt文件的明文变为非明文(类似加密)

目录 0.环境 1.背景 2.详细代码 2.1 .h主要代码 2.2 .cpp主要代码&#xff0c;主要实现上述的四个方法 0.环境 windows 11 64位 Qt Creator 4.13.1 1.背景 项目需求&#xff1a;我们项目中有配置文件&#xff08;类似.txt&#xff0c;但不是这个格式&#xff0c;本文以…

计算机网络之传输层

计算机网络 - 传输层 计算机网络 - 传输层 UDP 和 TCP 的特点UDP 首部格式TCP 首部格式TCP 的三次握手TCP 的四次挥手TCP 可靠传输TCP 滑动窗口TCP 流量控制TCP 拥塞控制 1. 慢开始与拥塞避免2. 快重传与快恢复 网络层只把分组发送到目的主机&#xff0c;但是真正通信的并不是…

leetcode-----二叉树习题

目录 前言 1. 二叉树的中序遍历 2. 相同的树 3. 二叉树的最大深度 4. 二叉树的最小深度 5.二叉树的前序遍历 6. 二叉树的后序遍历 7. 对称二叉树 前言 前面我们学习过了二叉树的相关知识点&#xff0c;那么今天我们就做做练习&#xff0c;下面我会介绍几道关于二叉树的…

JUnit介绍

JUnit是用于编写和运行可重复的自动化测试的开源测试框架&#xff0c; 这样可以保证我们的代码按预期工作。JUnit可广泛用于工业和作为支架(从命令行)或IDE(如Eclipse)内单独的Java程序。 JUnit提供&#xff1a; 断言测试预期结果。 测试功能共享通用的测试数据。 测试套件轻…

Java实现word excel ppt模板渲染与导出及预览 LibreOffice jodconverter

Java Office 一、文档格式转换 文档格式转换是office操作中经常需要进行一个操作&#xff0c;例如将docx文档转换成pdf格式。 java在这方面有许多的操作方式&#xff0c;大致可以分为内部调用&#xff08;无需要安装额外软件&#xff09;&#xff0c;外部调用&#xff08;需…

STM32三种开发方式及标准库和HAL库的编程差异

三种开发方式 STM32基于标准库函数和HAL库编程差异_stm32库函数和hal库-CSDN博客本文目的是以串口通信来简要分析STM32使用标准库函数和HAL库函数编程的差异。目录&#xff08;一&#xff09;开发方式1.配置寄存器2.库函数3.HAL库&#xff08;二&#xff09;库函数与HAL库对比…

RDMA技术(解决主从数据库数据不一致问题)

优质博文&#xff1a;IT-BLOG-CN 一、简介 RDMA(remote direct memory access)即远端直接内存访问&#xff0c;是一种高性能网络通信技术&#xff0c;具有高带宽、低延迟、无CPU消耗等优点。 主要解决网络传输中服务器端数据处理的延迟问题。 Remote&#xff1a;数据通过网络…

Arduino PLC IDE

Arduino PLC IDE MCU单片机进入全新的PLC领域概述需要的硬件和软件下一步操作1. Arduino PLC IDE Tool Setup2. Arduino PLC IDE Setup3. Project Setup4. Download the Runtime5. Connect to the Device6. License Activation with Product Key (Portenta Machine Control) 结…

MySQL 索引介绍和最佳实践

目录 一、前言二、索引类型1.1 主键索引&#xff08;PRIMARY KEY&#xff09;1.2 唯一索引&#xff08;UNIQUE&#xff09;1.3 普通索引&#xff08;NORMAL&#xff09;1.3.1 单列普通索引1.3.2 单列前缀普通索引1.3.3 多列普通索引1.3.4 多列前缀普通索引 1.4 空间索引&#x…

力扣 -- 10. 正则表达式匹配

解题步骤&#xff1a; 参考代码&#xff1a; class Solution { public:bool isMatch(string s, string p) {int ms.size();int np.size();//处理后续映射关系s s;//处理后续映射关系p p;vector<vector<bool>> dp(m1,vector<bool>(n1));//初始化dp[0][0]true…

WPF 03

staticResource和dynamicResource的区别 首先看一个案例 MainWindow.xaml <Window x:Class"WpfDay03.MainWindow"xmlns"http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:x"http://schemas.microsoft.com/winfx/2006/xaml&quo…

led灯什么牌子的质量好?Led护眼台灯排行榜

现在我们很多家长对自己孩子的视力十分关心&#xff0c;生怕自己的孩子是近视、远视、弱视等等。对于父母而言&#xff0c;在孩子读书压力大课业重的关键时期&#xff0c;为孩子选择合适的桌椅&#xff0c;保护灯具从而保护孩子的眼睛是非常重要的事情!那么买给孩子读书做功课的…

驱动插入中断门示例代码

驱动插入中断描述符示例代码 最近做实验&#xff0c;每次在应用层代码写测试代码的时候都要手动挂一个中断描述符&#xff0c;很不方便所以就想着写个驱动挂一个中断门比较省事 驱动测试效果如下&#xff1a; 下面的代码是个架子&#xff0c;用的时候找个驱动历程传递你要插…

聊聊并发编程——并发容器和阻塞队列

目录 一.ConcurrentHashMap 1.为什么要使用ConcurrentHashMap&#xff1f; 2.ConcurrentHashMap的类图 3.ConcurrentHashMap的结构图 二.阻塞队列 Java中的7个阻塞队列 ArrayBlockingQueue&#xff1a;一个由数组结构组成的有界阻塞队列。 LinkedBlockingQueue&#xf…

如何快速搭建app自动化环境编写用例?

使用Airtest 作为测试开发工程师&#xff0c;快速搭建app自动化环境并编写用例可以使用Airtest解决方案来实现。Airtest是一款基于Python的全平台UI自动化测试框架&#xff0c;支持多种移动设备和模拟器&#xff0c;同时集成了丰富的图像识别和手势操作功能。 以下是使用Airt…