【RabbitMQ】消息分发、事务

消息分发

概念

RabbitMQ队列拥有多个消费者时,队列会把收到的消息分派给不同的消费者。每条消息只会发送给订阅该队列订阅列表里的一个消费者。这种方式非常适合扩展,如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可。

默认情况下,RabbitMQ是以轮询的方法进行分发的,而不管消费者是否已经消费并且已经确认了该消息。这种方式是不大合理的。试想一下,如果某些消费者消费速度慢,而某些消费者消费速度快,就可能导致某些消费者消息积压,某些消费者空闲,进而应用整体的吞吐量下降。

在工作模式一文中,书写RPC模式的代码时,已经写了一行代码channel.basicQos(1),来限制当前信道上的消费者所能保持的最大未确认消息的数量是1。所以,我们只需要使用此方法来限制每一个消费者的消息数量就可以避免上述情况发生。

比如,消费端调用了channel.basicQos(5),RabbitMQ就会为该消费者计数,发送一条消息计数加一,消费一条消息计数减一。当到达了设定的上限之后,RabbitMQ就不会再向该消费者发送消息了,知道消费者确认了某条消息之后,才会继续发送。

当channel.basicQos(int prefetchCount)中的形参个数为0时,表示的是没有上限。

应用场景

  1. 限流
  2. 非公平分发(负载均衡)

限流

在学习消息分发之前,当消息到达队列之后,如果有对应的消费者存在,那么队列就会一股脑把所有消息全部发送过去,从而造成瞬间压力,进而可能造成服务宕机,产生严重的影响。因此我们就要进行限流,限制消费者接收消息的数量。

限流通过设置prefetchCount参数,同时也必须要设置消息应答方式为手动应答。

spring:rabbitmq:host: 43.138.108.125port: 5672username: adminpassword: adminvirtual-host: mq-springboot-testlistener:simple:acknowledge-mode: manual # 消息确认机制为手动确认prefetch: 5 # 最多拉取5条消息
@Configuration
public class QosConfig {@Bean("qosQueue")public Queue qosQueue() {return QueueBuilder.durable(Constants.QOS_QUEUE).build();}@Bean("qosExchange")public Exchange qosExchange() {return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).durable(true).build();}@Bean("qosQueueBind")public Binding qosQueueBind(@Qualifier("qosQueue") Queue queue, @Qualifier("qosExchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("qos").noargs();}}
@RestController
@RequestMapping("/qos")
public class QosController {@Resourceprivate RabbitTemplate rabbitTemplate;@RequestMappingpublic void qosQueue() {for (int i = 0; i < 10; i++) {this.rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE, "qos", "hello qos " + i);System.out.println("第" + i + "次发送消息成功!");}}}
@Configuration
public class QosListener {@RabbitListener(queues = Constants.QOS_QUEUE)public void qosListener(String msg, Channel channel, Message message) throws IOException {System.out.println("接收的消息为:" + msg);// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}

 

启动程序之后,可以看到出现如上结果,明显看到,我们发送了10条信息,但是由于限流的原因,当消费者接收了5条消息之后,并且没有去应答,因此程序就不再继续接收消息,而是等待这5条消息应答之后,才会去继续接收消息。

负载均衡

在有两个消费者的情况下,一个消费者处理任务非常快,一个消费者处理任务非常慢,就会造成一个消费者会一直很忙,而另一个消费者会很闲。这是因为RabbitMQ只是在消息进入队列时进行分派消息,他不考虑消费者未确认消息的数量。我们可以使用prefetch=1的方式来进行设置,告诉RabbitMQ一次只给一个消费者一条消息。在消费者处理并确认该消息之前,都不向其发送新的消息。这样做就可以使得有消息时,所有消费者都处理忙碌的状态。

实现负载均衡功能的代码和实现限流的代码类似,只需要将配置文件中的prefetch修改为1即可。

事务

RabbitMQ也实现了事务机制,允许开发者确保消息的接收和发送是原子性的,要么全部成功,要把全部失败。

@Component
public class RabbitTemplateConfig {@Bean("transactionRabbitTemplate")public RabbitTemplate transactionRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true); // 开启事务return rabbitTemplate;}}
@Configuration
public class TransactionConfig {@Bean("transactionQueue")public Queue transactionQueue() {return QueueBuilder.durable(Constants.TRANSACTION_QUEUE).build();}@Bean("transactionExchange")public Exchange transactionExchange() {return ExchangeBuilder.directExchange(Constants.TRANSACTION_EXCHANGE).durable(true).build();}@Bean("transactionQueueBind")public Binding transactionQueueBind(@Qualifier("transactionQueue") Queue queue,@Qualifier("transactionExchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("transaction").noargs();}}
@RestController
@RequestMapping("/transaction")
public class TransactionController {@Resource(name = "transactionRabbitTemplate")private RabbitTemplate rabbitTemplate;@Transactional@RequestMappingpublic void transactionQueue() {System.out.println("发送成功");this.rabbitTemplate.convertAndSend(Constants.TRANSACTION_EXCHANGE, "transaction", "hello transaction");int i = 1 / 0;this.rabbitTemplate.convertAndSend(Constants.TRANSACTION_EXCHANGE, "transaction", "hello transaction");}}

RabbitMQ和Redis中的事务相对来说,都是比较简单的,并不和MySQL,包含那么多的性质。因此,在对事务的介绍中,并没有大幅度进行介绍。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/147041.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

springboot实战学习(6)(用户模块的登录认证)(初识令牌)(JWT)

接着上篇博客学习。上篇博客是在基本完成用户模块的注册接口的开发以及注册时的参数合法性校验的基础上&#xff0c;基本完成用户模块的登录接口的主逻辑。具体往回看了解的链接如下。 springboot实战学习笔记&#xff08;5&#xff09;(用户登录接口的主逻辑)-CSDN博客文章浏览…

回归预测 | Matlab实现ReliefF-XGBoost多变量回归预测

回归预测 | Matlab实现ReliefF-XGBoost多变量回归预测 目录 回归预测 | Matlab实现ReliefF-XGBoost多变量回归预测效果一览基本介绍程序设计参考资料 效果一览 基本介绍 1.ReliefF-xgboost回归预测代码&#xff0c;对序列数据预测性能相对较高。首先通过ReleifF对输入特征计算权…

AWS 管理控制台

目录 控制台主页 AWS 账户信息 AWS 区域 AWS 服务选择器 AWS 搜索 AWS CloudShell AWS 控制面板小部件 控制台主页 注册新的 AWS 账户并登录后&#xff0c;您将看到控制台控制面板。这是与各种 AWS 服务以及其他重要控制台组件进行交互的起点。控制面板由页面顶部的导航…

【笔记】1.3 塑性变形

一、塑性变形的方式 DDWs&#xff08;Dislocation-Dipole Walls&#xff0c;位错偶极墙&#xff09;&#xff1a;指由两个位错构成的结构&#xff0c;它们以一种特定的方式排列在一起&#xff0c;形成一个稳定的结构单元。 DTs&#xff08;Dislocation Tangles&#xff0c;位错…

【软考】传输层协议TCP与UDP

目录 1. TCP1.1 说明1.2 三次握手 2. UDP3. 例题3.1 例题1 1. TCP 1.1 说明 1.TCP(Transmission Control Protocol&#xff0c;传输控制协议)是整个 TCP/IP 协议族中最重要的协议之一。2.它在IP提供的不可靠数据服务的基础上为应用程序提供了一个可靠的、面向连接的、全双工的…

【Geoserver使用】SRS处理选项

文章目录 前言一、Geoserver的三种SRS处理二、对Bounding Boxes计算的影响总结 前言 今天来看看Geoserver中发布图层时的坐标参考处理这一项。根据Geoserver官方文档&#xff0c;坐标参考系统 (CRS) 定义了地理参考空间数据与地球表面实际位置的关系。CRS 是更通用的模型&…

发布策略说明

发布策略说明 发布策略 区别 标准发布 在部署新版本应用时删除旧版本应用。发布过程中&#xff0c;您的服务会出现短暂中断。 蓝绿发布 应用更新时生成蓝绿两个版本&#xff0c;两个版本互相热备&#xff0c;通过切换路由权重的方式实现不同版本应用的上下线。 该发布策略具…

Apipost IDEA插件新升级,Apipost Helper上架IDEA插件市场

大家好&#xff01;今天向大家介绍一个非常方便的IDEA插件——Apipost Helper&#xff01;相信很多使用过Apipost的朋友在开发过程中都希望能够直接将编写好的API同步至Apipost&#xff0c;而无需手动填写。前段时间&#xff0c;Apipost推出了Apipost IDEA插件的内测版&#xf…

项目第三弹:基础工具类实现

项目第三弹&#xff1a;基础工具类实现 一、工具类的介绍1.生活例子2.专业术语 二、FileHelper1.判断文件是否存在1.C IO流2.stat &#xff1a;Linux系统调用 2.获取文件大小3.创建/删除文件4.创建/删除目录5.read6.write7.获取文件父级目录8.文件的重命名9.FileHelper完整代码…

华为摄像机/NVR主动注册协议接入SVMSP平台

华为摄像机/NVR主动注册协议接入SVMSPro平台 步骤一&#xff1a;进华为网页或者NVR界面进配置选项&#xff0c;左边选配置-网络-平台对接参数 勾选启用SDK注册开关&#xff1b;SDK主动注册 服务器地址&#xff1a;平台软件IP地址 端口&#xff1a;6060&#xff08;默认&#xf…

科研入门学习

学习视频链接 为什么要读论文 读哪些论文 论文的分类 论文质量 如何找论文 根据领域大牛的名字进行搜索查看高水平论文引用的论文&#xff0c;高水平论文引用的论文很大程度也是高水平的论文 如何整理论文 如何读论文 读论文的困境 不同人群阅读差异 读论文的方式 论文的结构…

【pyVista】在三维模型中的网格属性

一&#xff0c;什么是属性&#xff1f; 属性是存在于 一个网格。在 PyVista 中&#xff0c;我们同时使用点数据和单元数据&#xff0c;并且 允许轻松访问数据字典以保存属性数组 它们位于网格的所有点或所有单元上。 点数据 点数据是指值数组&#xff08;标量、向量等&#x…

mockito+junit搞定单元测试(2h)

一&#xff0c;简介 1.1 单元测试的特点 配合断言使用(杜绝 System.out )可重复执行不依赖环境不会对数据产生影响spring 的上下文环境不是必须的一般都需要配合 mock 类框架来实现 1.2 mock 类框架使用场景 要进行测试的方法存在外部依赖(如 db, redis, 第三方接口调用等)…

在Linux中运行flask项目

准备 这里我准备了一个GitHub上某个大佬写的留言板的Flask项目&#xff0c;就用这个来给大家做示范了。 查看留言板的目录结构 查看主程序所用的库函数 只有一个第三方库 Flask 安装pip sudo apt install python3-pip -y测试 pip 安装成功 修改pip镜像源 修改pip的默认下载…

Django学习实战之评论验证码功能(附A)

前言&#xff1a; 对于具有评论功能的博客来说&#xff0c;无论是否为知名博客&#xff0c;都会被恶意广告关注&#xff0c;上线没几天就会有人开始通过程序灌入广告评论&#xff0c;因此针对所有有用户输入的页面&#xff0c;验证码是必需品。 在Django系统中使用验证码非常简…

[Python数据可视化]探讨数据可视化的实际应用:三个案例分析

数据可视化是理解复杂数据集的重要工具&#xff0c;通过图形化的方法&#xff0c;可以直观地展示信息、趋势和模式。本文将深入探讨三个实际案例&#xff0c;包括健康数据分析、销售趋势分析、城市交通流量分析。每个案例将提供假设数据、详细注释的代码及分析结果。 案例 1: …

【每日刷题】Day128

【每日刷题】Day128 &#x1f955;个人主页&#xff1a;开敲&#x1f349; &#x1f525;所属专栏&#xff1a;每日刷题&#x1f34d; &#x1f33c;文章目录&#x1f33c; 1. 606. 根据二叉树创建字符串 - 力扣&#xff08;LeetCode&#xff09; 2. LCR 194. 二叉树的最近公…

Spring在不同类型之间也能相互拷贝?

场景还原 日常开发中&#xff0c;我们会定义非常多的实体&#xff0c;例如VO、DTO等&#xff0c;在涉及实体类的相互转换时&#xff0c;常使用Spring提供的BeanUtils.copyProperties&#xff0c;该类虽好&#xff0c;可不能贪用。 这不在使用过程中就遇到一个大坑&#xff0c…

逻辑分析仪看波形方法

一、串口波形讲解 异步串行数据的一般格式是&#xff1a;起始位数据位停止位&#xff0c;其中起始位1 位&#xff0c;数据位可以是5、6、7、8位&#xff0c;停止位可以是1、1.5、2位。 对于正逻辑的TTL电平&#xff0c; a.起始位是一个值为0的位&#xff0c;低电平&#xff…

leetcode练习 二叉树的最大深度

给定一个二叉树 root &#xff0c;返回其最大深度。 二叉树的 最大深度 是指从根节点到最远叶子节点的最长路径上的节点数。 示例 1&#xff1a; 输入&#xff1a;root [3,9,20,null,null,15,7] 输出&#xff1a;3提示&#xff1a; 树中节点的数量在 [0, 104] 区间内。-100 …