【RabbitMQ】⾼级特性

RabbitMQ ⾼级特性

  • 1. 消息确认
    • 1.1 消息确认机制
    • 1.2 代码示例
  • 2. 持久化
    • 2.1 交换机持久化
    • 2.2 队列持久化
    • 2.3 消息持久化
  • 3. 发送⽅确认
    • 3.1 confirm确认模式
    • 3.2 return退回模式
    • 3.3 问题: 如何保证RabbitMQ消息的可靠传输?
  • 4. 重试机制
  • 5. TTL
    • 5.1 设置消息的TTL
    • 5.2 设置队列的TTL
    • 5.3 两者区别
  • 6. 死信队列
    • 6.1 死信的概念
    • 6.2 代码示例
    • 6.3 常⻅问题
  • 7. 延迟队列
    • 7.1 概念
    • 7.2 应⽤场景
    • 7.3 TTL+死信队列实现
      • 存在问题
    • 7.4 延迟队列插件
    • 7.5 常⻅问题
  • 8. 事务
  • 9. 消息分发
    • 9.1 概念
    • 9.2 应⽤场景
      • 9.2.1 限流
      • 9.2.2 负载均衡
  • 代码演示获取

1. 消息确认

1.1 消息确认机制

⽣产者发送消息之后, 到达消费端之后, 可能会有以下情况:

a. 消息处理成功
b. 消息处理异常

在这里插入图片描述

RabbitMQ向消费者发送消息之后, 就会把这条消息删掉, 那么第两种情况, 就会造成消息丢失

那么如何确保消费端已经成功接收了, 并正确处理了呢?

为了保证消息从队列可靠地到达消费者, RabbitMQ提供了消息确认机制(message acknowledgement)。

消费者在订阅队列时,可以指定 autoAck 参数, 根据这个参数设置, 消息确认机制分为以下两种:

  • ⾃动确认: 当autoAck 等于true时, RabbitMQ 会⾃动把发送出去的消息置为确认, 然后从内存(或者磁盘)中删除, ⽽不管消费者是否真正地消费到了这些消息. ⾃动确认模式适合对于消息可靠性要求不⾼的场景.
  • ⼿动确认: 当autoAck等于false时,RabbitMQ会等待消费者显式地调⽤Basic.Ack命令, 回复确认信号后才从内存(或者磁盘) 中移去消息. 这种模式适合对消息可靠性要求⽐较⾼的场景.

当autoAck参数置为false, 对于RabbitMQ服务端⽽⾔, 队列中的消息分成了两个部分:

⼀是等待投递给消费者的消息。
⼆是已经投递给消费者, 但是还没有收到消费者确认信号的消息。

如果RabbitMQ⼀直没有收到消费者的确认信号, 并且消费此消息的消费者已经断开连接, 则RabbitMQ会安排该消息重新进⼊队列,等待投递给下⼀个消费者,当然也有可能还是原来的那个消费者.

在这里插入图片描述
从RabbitMQ的Web管理平台上, 也可以看到当前队列中Ready状态和Unacked状态的消息数

在这里插入图片描述
Ready: 等待投递给消费者的消息数

Unacked: 已经投递给消费者, 但是未收到消费者确认信号的消息数

1.2 代码示例

链接

2. 持久化

我们在前⾯讲了消费端处理消息时, 消息如何不丢失, 但是如何保证当RabbitMQ服务停掉以后, ⽣产者发送的消息不丢失呢. 默认情况下, RabbitMQ 退出或者由于某种原因崩溃时, 会忽视队列和消息, 除⾮告知他不要这么做.

RabbitMQ的持久化分为三个部分:交换器的持久化、队列的持久化和消息的持久化.

2.1 交换机持久化

交换器的持久化是通过在声明交换机时是将durable参数置为true实现的.相当于将交换机的属性在服务器内部保存,当MQ的服务器发⽣意外或关闭之后,重启 RabbitMQ 时不需要重新去建⽴交换机, 交换机会⾃动建⽴,相当于⼀直存在.

如果交换器不设置持久化, 那么在 RabbitMQ 服务重启之后, 相关的交换机元数据会丢失, 对⼀个⻓期使⽤的交换器来说,建议将其置为持久化的.

ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build()

2.2 队列持久化

队列的持久化是通过在声明队列时将 durable 参数置为 true实现的.

如果队列不设置持久化, 那么在RabbitMQ服务重启之后,该队列就会被删掉, 此时数据也会丢失. (队列没有了, 消息也⽆处可存了)

队列的持久化能保证该队列本⾝的元数据不会因异常情况⽽丢失, 但是并不能保证内部所存储的消息不会丢失. 要确保消息不会丢失, 需要将消息设置为持久化.

咱们前⾯⽤的创建队列的⽅式都是持久化的

QueueBuilder.durable(Constant.ACK_QUEUE).build();

点进去看源码会发现,该⽅法默认durable 是true

通过下⾯代码,可以创建⾮持久化的队列

QueueBuilder.nonDurable(Constant.ACK_QUEUE).build();在这里插入图片描述

2.3 消息持久化

消息实现持久化, 需要把消息的投递模式( MessageProperties 中的 deliveryMode )设置为2,也就是 MessageDeliveryMode.PERSISTENT

在这里插入图片描述

设置了队列和消息的持久化, 当 RabbitMQ 服务重启之后, 消息依旧存在. 如果只设置队列持久化, 重启之后消息会丢失. 如果只设置消息的持久化, 重启之后队列消失, 继⽽消息也丢失. 所以单单设置消息持久化⽽不设置队列的持久化显得毫⽆意义.

在这里插入图片描述

MessageProperties.PERSISTENT_TEXT_PLAIN 实际就是封装了这个属性

在这里插入图片描述
如果使⽤RabbitTemplate 发送持久化消息, 代码如下:

在这里插入图片描述

RabbitMQ默认情况下会将消息视为持久化的,除⾮队列被声明为⾮持久化,或者消息在发送时被标记为⾮持久化

在这里插入图片描述
将交换器、队列、消息都设置了持久化之后就能百分之百保证数据不丢失了吗? 答案是否定的.

  1. 从消费者来说, 如果在订阅消费队列时将autoAck参数设置为true, 那么当消费者接收到相关消息之后, 还没来得及处理就宕机了, 这样也算数据居丢失. 这种情况很好解决, 将autoAck参数设置为false, 并进⾏⼿动确认,详细可以参考[消息确认]章节.
  2. 在持久化的消息正确存⼊RabbitMQ之后,还需要有⼀段时间(虽然很短,但是不可忽视)才能存⼊磁盘中.RabbitMQ并不会为每条消息都进⾏同步存盘(调⽤内核的fsync⽅法)的处理, 可能仅仅保存到操作系统缓存之中⽽不是物理磁盘之中. 如果在这段时间内RabbitMQ服务节点发⽣了宕机、重启等异常情况, 消息保存还没来得及落盘, 那么这些消息将会丢失.

这个问题怎么解决呢?

  1. 引⼊RabbitMQ的仲裁队列, 如果主节点(master)在此特殊时间内挂掉, 可以⾃动切换到从节点(slave),这样有效地保证了⾼可⽤性, 除⾮整个集群都挂掉(此⽅法也不能保证100%可靠, 但是配置了仲裁队列要⽐没有配置仲裁队列的可靠性要⾼很多, 实际⽣产环境中的关键业务队列⼀般都会设置仲裁队列).
  2. 还可以在发送端引⼊事务机制或者发送⽅确认机制来保证消息已经正确地发送并存储⾄RabbitMQ中

3. 发送⽅确认

在使⽤ RabbitMQ的时候, 可以通过消息持久化来解决因为服务器的异常崩溃⽽导致的消息丢失, 但是还有⼀个问题, 当消息的⽣产者将消息发送出去之后, 消息到底有没有正确地到达服务器呢? 如果在消息到达服务器之前已经丢失(⽐如RabbitMQ重启, 那么RabbitMQ重启期间⽣产者消息投递失败), 持久化操作也解决不了这个问题,因为消息根本没有到达服务器,何谈持久化?

RabbitMQ为我们提供了两种解决⽅案:

a. 通过事务机制实现
b. 通过发送⽅确认(publisher confirm) 机制实现

事务机制⽐较消耗性能, 在实际⼯作中使⽤也不多, 咱们主要介绍confirm机制来实现发送⽅的确认.

RabbitMQ为我们提供了两个⽅式来控制消息的可靠性投递

  1. confirm确认模式
  2. return退回模式

3.1 confirm确认模式

Producer 在发送消息的时候, 对发送端设置⼀个ConfirmCallback的监听, ⽆论消息是否到达Exchange, 这个监听都会被执⾏, 如果Exchange成功收到, ACK( Acknowledge character , 确认字符)为true, 如果没收到消息, ACK就为false.

具体代码

关键代码
在这里插入图片描述

3.2 return退回模式

消息到达Exchange之后, 会根据路由规则匹配, 把消息放⼊Queue中. Exchange到Queue的过程, 如果⼀条消息⽆法被任何队列消费(即没有队列与消息的路由键匹配或队列不存在等), 可以选择把消息退回给发送者. 消息退回给发送者时, 我们可以设置⼀个返回回调⽅法, 对消息进⾏处理

在配置 RabbitTemplate 的时候要设置这两个属性

        // 设置 return 模式rabbitTemplate.setMandatory(true); // 只有设置为true, 才会回调ReturnCallbackrabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println("消息被退回: " + returned.getMessage());}});

回调函数中有⼀个参数: ReturnedMessage, 包含以下属性:

在这里插入图片描述

3.3 问题: 如何保证RabbitMQ消息的可靠传输?

先放⼀张RabbitMQ消息传递图

在这里插入图片描述

从这个图中, 可以看出, 消息可能丢失的场景以及解决⽅案:

  1. ⽣产者将消息发送到 RabbitMQ失败
    a. 可能原因: ⽹络问题等
    b. 解决办法: 参考本章节[发送⽅确认-confirm确认模式]
  2. 消息在交换机中⽆法路由到指定队列
    a. 可能原因: 代码或者配置层⾯错误, 导致消息路由失败
    b. 解决办法: 参考本章节[发送⽅确认-return模式]
  3. 消息队列⾃⾝数据丢失
    a. 可能原因: 消息到达RabbitMQ之后, RabbitMQ Server 宕机导致消息丢失
    b. 解决办法: 参考本章节[持久性]. 开启 RabbitMQ持久化, 就是消息写⼊之后会持久化到磁盘, 如果RabbitMQ 挂了, 恢复之后会⾃动读取之前存储的数据. (极端情况下, RabbitMQ还未持久化就挂了, 可能导致少量数据丢失, 这个概率极低, 也可以通过集群的⽅式提⾼可靠性)
  4. 消费者异常, 导致消息丢失
    a. 可能原因: 消息到达消费者, 还没来得及消费, 消费者宕机. 消费者逻辑有问题.
    b. 解决办法: 参考本章节[消息确认]. RabbitMQ 提供了 消费者应答机制 来使 RabbitMQ 能够感知到消费者是否消费成功消息. 默认情况下消费者应答机制是⾃动应答的, 可以开启⼿动确认, 当消费者确认消费成功后才会删除消息, 从⽽避免消息丢失. 除此之外, 也可以配置重试机制(参考下⼀章节), 当消息消费异常时, 通过消息重试确保消息的可靠性

4. 重试机制

在消息传递过程中, 可能会遇到各种问题, 如⽹络故障, 服务不可⽤, 资源不⾜等, 这些问题可能导致消息处理失败. 为了解决这些问题, RabbitMQ 提供了重试机制, 允许消息在处理失败后重新发送.

但如果是程序逻辑引起的错误, 那么多次重试也是没有⽤的, 可以设置重试次数

在这里插入图片描述

5. TTL

TTL(Time to Live, 过期时间), 即过期时间. RabbitMQ可以对消息和队列设置TTL.

当消息到达存活时间之后, 还没有被消费, 就会被⾃动清除

在这里插入图片描述

5.1 设置消息的TTL

⽬前有两种⽅法可以设置消息的TTL.

⼀是设置队列的TTL, 队列中所有消息都有相同的过期时间. ⼆是对消息本⾝进⾏单独设置, 每条消息的TTL可以不同. 如果两种⽅法⼀起使⽤, 则消息的TTL以两者之间较⼩的那个数值为准.

5.2 设置队列的TTL

在这里插入图片描述

5.3 两者区别

设置队列TTL属性的⽅法, ⼀旦消息过期, 就会从队列中删除

设置消息TTL的⽅法, 即使消息过期, 也不会⻢上从队列中删除, ⽽是在即将投递到消费者之前进⾏判定的

为什么这两种⽅法处理的⽅式不⼀样?

因为设置队列过期时间, 队列中已过期的消息肯定在队列头部, RabbitMQ只要定期从队头开始扫描是否有过期的消息即可

⽽设置消息TTL的⽅式, 每条消息的过期时间不同, 如果要删除所有过期消息需要扫描整个队列, 所以不如等到此消息即将被消费时再判定是否过期, 如果过期再进⾏删除即可.

6. 死信队列

6.1 死信的概念

死信(dead message) 简单理解就是因为种种原因, ⽆法被消费的信息, 就是死信.

有死信, ⾃然就有死信队列. 当消息在⼀个队列中变成死信之后,它能被重新被发送到另⼀个交换器中,这个交换器就是DLX( Dead Letter Exchange ), 绑定DLX的队列, 就称为死信队列(DeadLetter Queue,简称DLQ).

在这里插入图片描述

消息变成死信⼀般是由于以下⼏种情况:

  1. 消息被拒绝( Basic.Reject/Basic.Nack ),并且设置 requeue 参数为 false.
  2. 消息过期.
  3. 队列达到最⼤⻓度

6.2 代码示例

队列和交换机的配置

@Configuration
public class DLXConfig {// 死信交换机@Bean("dlxExchange")public Exchange dlxExchange() {return ExchangeBuilder.directExchange(Constants.DLX_EXCHANGE_NAME).durable(true).build();}// 死信队列@Bean("dlxQueue")public Queue dlxQueue() {return QueueBuilder.durable(Constants.DLX_QUEUE_NAME).build();}// 绑定@Bean("dlxBind")public Binding dlxBind(@Qualifier("dlxQueue") Queue queue, @Qualifier("dlxExchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(Constants.DLX_ROUTING_KEY).noargs();}// 正常交换机@Bean("normalExchange")public Exchange normalExchange() {return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE_NAME).durable(true).build();}// 正常队列@Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(Constants.NORMAL_QUEUE_NAME).deadLetterExchange(Constants.DLX_EXCHANGE_NAME).deadLetterRoutingKey(Constants.DLX_ROUTING_KEY) // 正常队列绑定死信交换机.ttl(10 * 1000).maxLength(10L) // 制造死信产⽣的条件: 10s后消息变成死信, 队列最多存10条消息.build();}// 绑定@Bean("normalBind")public Binding normalBind(@Qualifier("normalQueue") Queue queue, @Qualifier("normalExchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(Constants.NORMAL_ROUTING_KEY).noargs();}
}

6.3 常⻅问题

  1. 死信队列的概念

死信(Dead Letter)是消息队列中的⼀种特殊消息, 它指的是那些⽆法被正常消费或处理的消息. 在消息队列系统中, 如RabbitMQ, 死信队列⽤于存储这些死信消息

  1. 死信的来源
  1. 消息过期: 消息在队列中存活的时间超过了设定的TTL

  2. 消息被拒绝: 消费者在处理消息时, 可能因为消息内容错误, 处理逻辑异常等原因拒绝处理该消息. 如果拒绝时指定不重新⼊队(requeue=false), 消息也会成为死信.

  3. 队列满了: 当队列达到最⼤⻓度, ⽆法再容纳新的消息时, 新来的消息会被处理为死信.

  1. 死信队列的应⽤场景

对于RabbitMQ来说, 死信队列是⼀个⾮常有⽤的特性. 它可以处理异常情况下,消息不能够被消费者正确消费⽽被置⼊死信队列中的情况, 应⽤程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况, 进⽽可以改善和优化系统.

⽐如: ⽤⼾⽀付订单之后, ⽀付系统会给订单系统返回当前订单的⽀付状态

为了保证⽀付信息不丢失, 需要使⽤到死信队列机制. 当消息消费异常时, 将消息投⼊到死信队列中, 由订单系统的其他消费者来监听这个队列, 并对数据进⾏处理(⽐如发送⼯单等,进⾏⼈⼯确认).

场景的应⽤场景还有:

  • 消息重试:将死信消息重新发送到原队列或另⼀个队列进⾏重试处理.
  • 消息丢弃:直接丢弃这些⽆法处理的消息,以避免它们占⽤系统资源.
  • ⽇志收集:将死信消息作为⽇志收集起来,⽤于后续分析和问题定位.

7. 延迟队列

7.1 概念

延迟队列(Delayed Queue),即消息被发送以后, 并不想让消费者⽴刻拿到消息, ⽽是等待特定时间后,消费者才能拿到这个消息进⾏消费.

7.2 应⽤场景

延迟队列的使⽤场景有很多, ⽐如:

  1. 智能家居: ⽤⼾希望通过⼿机远程遥控家⾥的智能设备在指定的时间进⾏⼯作. 这时候就可以将⽤⼾指令发送到延迟队列, 当指令设定的时间到了再将指令推送到智能设备.
  2. ⽇常管理: 预定会议后,需要在会议开始前⼗五分钟提醒参会⼈参加会议
  3. ⽤⼾注册成功后, 7天后发送短信, 提⾼⽤⼾活跃度等

RabbitMQ本⾝没有直接⽀持延迟队列的的功能, 但是可以通过前⾯所介绍的TTL+死信队列的⽅式组合模拟出延迟队列的功能.

假设⼀个应⽤中需要将每条消息都设置为10秒的延迟, ⽣产者通过 normal_exchange 这个交换器将发送的消息存储在 normal_queue 这个队列中. 消费者订阅的并⾮是 normal_queue 这个队列, ⽽是 dlx_queue 这个队列. 当消息从 normal_queue 这个队列中过期之后被存⼊ dlx_queue 这个
队列中,消费者就恰巧消费到了延迟10秒的这条消息.

在这里插入图片描述

7.3 TTL+死信队列实现

生产者

    @RequestMapping("/delay")public String delay() {rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE_NAME,Constants.NORMAL_ROUTING_KEY, "delay test 5s... " + new Date(), message -> {message.getMessageProperties().setExpiration("5000");return message;});rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE_NAME,Constants.NORMAL_ROUTING_KEY, "delay test 10s... " + new Date(), message -> {message.getMessageProperties().setExpiration("10000");return message;});return "delay";}

消费者

    @RabbitListener(queues = Constants.DLX_QUEUE_NAME)public void listenerDLXQueue(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("%tc 死信队列接收到消息: %s, deliveryTag: %d%n",new Date(), new String(message.getBody(), StandardCharsets.UTF_8),message.getMessageProperties().getDeliveryTag());// 手动确认channel.basicAck(deliveryTag, true);}

运行结果

在这里插入图片描述

存在问题

接下来把⽣产消息的顺序修改⼀下
先发送20s过期数据, 再发送10s过期数据

在这里插入图片描述

运行结果

在这里插入图片描述
这时会发现: 10s过期的消息, 也是在20s后才进⼊到死信队列.

消息过期之后, 不⼀定会被⻢上丢弃. 因为RabbitMQ只会检查队⾸消息是否过期, 如果过期则丢到死信队列. 此时就会造成⼀个问题, 如果第⼀个消息的延时时间很⻓, 第⼆个消息的延时时间很短, 那第⼆个消息并不会优先得到执⾏.

所以在考虑使⽤TTL+死信队列实现延迟任务队列的时候, 需要确认业务上每个任务的延迟时间是⼀致的, 如果遇到不同的任务类型需要不同的延迟的话, 需要为每⼀种不同延迟时间的消息建⽴单独的消息队列.

7.4 延迟队列插件

安装

docker compose 安装队列插件的 rabbitmq

交换机和队列声明并绑定

在这里插入图片描述

生产者

在这里插入图片描述
消费者

在这里插入图片描述
运行结果

在这里插入图片描述
从结果可以看出, 使⽤延迟队列, 可以保证消息按照延迟时间到达消费者.

7.5 常⻅问题

介绍下RabbitMQ的延迟队列

延迟队列是⼀个特殊的队列, 消息发送之后, 并不⽴即给消费者, ⽽是等待特定的时间, 才发送给消费者.

延迟队列的应⽤场景有很多, ⽐如:

  1. 订单在⼗分钟内未⽀付⾃动取消
  2. ⽤⼾注册成功后, 3天后发调查问卷
  3. ⽤⼾发起退款, 24⼩时后商家未处理, 则默认同意, ⾃动退款

但RabbitMQ本⾝并没直接实现延迟队列, 通常有两种⽅法:

  1. TTL+死信队列组合的⽅式
  2. 使⽤官⽅提供的延迟插件实现延迟功能

⼆者对⽐:

  1. 基于死信实现的延迟队列
    a. 优点: 1) 灵活不需要额外的插件⽀持
    b. 缺点: 1) 存在消息顺序问题 2) 需要额外的逻辑来处理死信队列的消息, 增加了系统的复杂性
  2. 基于插件实现的延迟队列
    a. 优点: 1) 通过插件可以直接创建延迟队列, 简化延迟消息的实现. 2) 避免了DLX的时序问题
    b. 缺点: 1) 需要依赖特定的插件, 有运维⼯作 2) 只适⽤特定版本

8. 事务

RabbitMQ是基于AMQP协议实现的, 该协议实现了事务机制, 因此RabbitMQ也⽀持事务机制. Spring AMQP也提供了对事务相关的操作. RabbitMQ事务允许开发者确保消息的发送和接收是原⼦性的, 要么全部成功, 要么全部失败.

配置类

在这里插入图片描述
生产者

在这里插入图片描述

9. 消息分发

9.1 概念

RabbitMQ队列拥有多个消费者时, 队列会把收到的消息分派给不同的消费者. 每条消息只会发送给订阅列表⾥的⼀个消费者. 这种⽅式⾮常适合扩展, 如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可.

默认情况下, RabbitMQ是以轮询的⽅法进⾏分发的, ⽽不管消费者是否已经消费并已经确认了消息. 这种⽅式是不太合理的, 试想⼀下, 如果某些消费者消费速度慢, ⽽某些消费者消费速度快, 就可能会导致某些消费者消息积压, 某些消费者空闲, 进⽽应⽤整体的吞吐量下降.

如何处理呢? 我们可以使⽤前⾯章节讲到的channel.basicQos(int prefetchCount) ⽅法, 来限制当前信道上的消费者所能保持的最⼤未确认消息的数量

⽐如: 消费端调⽤了 channelbasicQos(5) , RabbitMQ会为该消费者计数, 发送⼀条消息计数+1, 消费⼀条消息计数-1, 当达到了设定的上限, RabbitMQ就不会再向它发送消息了,直到消费者确认了某条消息.类似TCP/IP中的"滑动窗⼝".

prefetchCount 设置为0时表⽰没有上限.
basicQos 对拉模式的消费⽆效

9.2 应⽤场景

消息分发的常⻅应⽤场景有如下:

  1. 限流
  2. ⾮公平分发

9.2.1 限流

如下使⽤场景:

订单系统每秒最多处理5000请求, 正常情况下, 订单系统可以正常满⾜需求

但是在秒杀时间点, 请求瞬间增多, 每秒1万个请求, 如果这些请求全部通过MQ发送到订单系统, ⽆疑会把订单系统压垮.

在这里插入图片描述
RabbitMQ提供了限流机制, 可以控制消费端⼀次只拉取N个请求

通过设置prefetchCount参数, 同时也必须要设置消息应答⽅式为⼿动应答

prefetchCount: 控制消费者从队列中预取(prefetch)消息的数量, 以此来实现流控制和负载均衡.

9.2.2 负载均衡

我们也可以⽤此配置,来实现"负载均衡"

如下图所⽰, 在有两个消费者的情况下,⼀个消费者处理任务⾮常快, 另⼀个⾮常慢,就会造成⼀个消费者会⼀直很忙, ⽽另⼀个消费者很闲. 这是因为 RabbitMQ 只是在消息进⼊队列时分派消息. 它不考虑消费者未确认消息的数量.

在这里插入图片描述
我们可以使⽤设置prefetch=1 的⽅式, 告诉 RabbitMQ ⼀次只给⼀个消费者⼀条消息, 也就是说, 在处理并确认前⼀条消息之前, 不要向该消费者发送新消息. 相反, 它会将它分派给下⼀个不忙的消费者.

消费者

在这里插入图片描述

运行结果

在这里插入图片描述

deliveryTag 有重复是因为两个消费者使⽤的是不同的Channel, 每个 Channel 上的deliveryTag 是独⽴计数的.

代码演示获取

代码演示

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/146226.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

C++——模拟实现string

1.再谈string string为什么要被设计成模板?日常使用string好像都是char*,char*不够使用吗,为什么要设计成模板呢? 1.1 关于编码 //计算机的存储如何区分呢?int main() {//比如在C语言中,有整型//如果是有…

msvcp140.dll0丢失的解决方法,总结6种靠谱的解决方法

再使用计算机的过程中,我们经常会遇到一些错误提示,其中之一就是“msvcp140.dll丢失”。这个问题可能会影响到我们的正常使用,因此需要及时解决。经过一段时间的学习和实践,我总结了以下六种靠谱的解决方法,希望对大家…

Spring的任务调度

Spring的任务调度 1.概述 Spring框架为任务调度提供了专门的解决方案。在Spring框架的org.springframework.scheduling包中,通过对JDK 的ScheduledExecutorService接口的实例进行封装,对外提供了一些注解和接口,为开发者处理定时任务提供了…

STM32F407单片机编程入门(十二) FreeRTOS实时操作系统详解及实战含源码

文章目录 一.概要二.什么是实时操作系统三.FreeRTOS的特性四.FreeRTOS的任务详解1.任务函数定义2.任务的创建3.任务的调度原理 五.CubeMX配置一个FreeRTOS例程1.硬件准备2.创建工程3.调试FreeRTOS任务调度 六.CubeMX工程源代码下载七.小结 一.概要 FreeRTOS是一个迷你的实时操…

联想(lenovo) 小新Pro13锐龙版(新机整理、查看硬件配置和系统版本、无线网络问题、windows可选功能)

新机整理 小新pro13win10新机整理 查看硬件配置和系统版本 设置-》系统-》系统信息 无线网络问题 部分热点可以,部分不可以 问题:是因为自己修改了WLAN的IP分配方式为手动分配,导致只能在连接家里无线网的时候可以,连接其他…

统信UOS的「端侧模型」

统信UOS早有布局的「端侧模型」的相关信息。 文章目录 前言一、究其原因1. 从需求侧来看2. 从技术侧来看二、产品特色1. 更流畅的使用体验2. 更智能的加速框架3. 更包容的硬件策略前言 在苹果2024秋季新品发布会上,苹果发布了有史以来最大的iPhone。而同一天开发布会的华为,…

【Linux】POSIX信号量、基于环形队列实现的生产者消费者模型

目录 一、POSIX信号量概述 信号量的基本概念 信号量在临界区的作用 与互斥锁的比较 信号量的原理 信号量的优势 二、信号量的操作 1、初始化信号量:sem_init 2、信号量申请(P操作):sem_wait 3、信号量的释放&#xff08…

机器之心 | 阿里云Qwen2.5发布!再登开源大模型王座,Qwen-Max性能逼近GPT-4o

本文来源公众号“机器之心”,仅用于学术分享,侵权删,干货满满。 原文链接:阿里云Qwen2.5发布!再登开源大模型王座,Qwen-Max性能逼近GPT-4o 人工智能领域再度迎来重磅消息! 2023 年 8 月&#…

尚品汇-H5移动端整合系统(五十五)

目录: (1)运行前端页面 (2)启动前端页面 (3)添加搜索分类接口 (4)购物车模块修改 (5)登录模块 (6)订单模块 &#…

【巧用ddddocr破解算术运算验证码的经典示范】

计算型验证码 算术验证码,也叫计算型验证码, 计算型验证码其实是一种特殊的字符型验证码,只不过在它的基础上增加了数字运算。   计算型验证码在将人类视觉和计算机视觉的差异作为区分用户和电脑的依据的同时,还加上了逻辑运算&#xff0c…

基于SpringBoot+Vue的在线酒店预订系统

作者:计算机学姐 开发技术:SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等,“文末源码”。 专栏推荐:前后端分离项目源码、SpringBoot项目源码、SSM项目源码 系统展示 【2025最新】基于JavaSpringBootVueMySQL的…

人工智能开发实战常用分类算法归纳与解析

内容导读 决策树贝叶斯分类器最近邻分类器支持向量机神经网络 一、决策树 决策树(Decision Tree)是用于决策的一棵树,从根节点出发,通过决策节点对样本的不同特征属性进行划分,按照结果进入不同的分支,最终达到某一叶子节点&am…

计算机毕业设计 基于Python的校园个人闲置物品换购平台 闲置物品交易平台 Python+Django+Vue 前后端分离 附源码 讲解 文档

🍊作者:计算机编程-吉哥 🍊简介:专业从事JavaWeb程序开发,微信小程序开发,定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事,生活就是快乐的。 🍊心愿:点…

深耕电通二十年,崔光荣升电通中国首席执行官

电通今日宣布,任命拥有二十年深厚电通工作经验的杰出行业领袖崔光(Guang Cui)为电通中国首席执行官,该任命自2024年9月27日起生效。崔光自2004年加入电通以来,从策略规划岗位逐步成长为公司的核心领导者,这也是他职业生涯中的第9次…

篮球运动场景物体检测系统源码分享

篮球运动场景物体检测检测系统源码分享 [一条龙教学YOLOV8标注好的数据集一键训练_70全套改进创新点发刊_Web前端展示] 1.研究背景与意义 项目参考AAAI Association for the Advancement of Artificial Intelligence 项目来源AACV Association for the Advancement of Comp…

Linux基础---13三剑客及正则表达式

一.划水阶段 首先我们先来一个三剑客与正则表达式混合使用的简单示例,大致了解是个啥玩意儿。下面我来演示一下如何查询登录失败的ip地址及次数。 1.首先,进入到 /var/log目录下 cd /var/log效果如下 2.最后,输入如下指令即可查看&#xf…

OpenGL渲染管线(Rendering Pipeline)介绍

渲染管线 计算机图形学中,计算机图形管线(渲染管线 或简称 图形管线、流水线)是一个概念模型,它描述了t图像系统将 3D场景渲染到2D屏幕所需执行的一系列步骤。渲染管线大的可以分为三个阶段。 (一)应用阶段…

[UTCTF2020]sstv

用goldwave和010editor打开均未发现线索, 网上搜索sstv,豆包回答如下: 慢扫描电视(Slow Scan Television,简称 SSTV)是一种通过无线电传输和接收静态图像的技术。 一、工作原理 SSTV 通过将图像逐行扫描并…

【GMNER】Grounded Multimodal Named Entity Recognition on Social Media

Grounded Multimodal Named Entity Recognition on Social Media 动机解决方法特征抽取多模态索引设计索引生成框架EncoderDecoder 实体定位、实体-类型-区域三元组重建 出处:ACL2023 论文链接:https://aclanthology.org/2023.acl-long.508.pdf code链接…

[Linux] Linux操作系统 进程的状态

标题:[Linux] Linux操作系统 进程的状态 个人主页:水墨不写bug (图片来源于网络) 目录 一、前置概念的理解 1.并行和并发 2.时间片 3.进程间具有独立性 4.等待的本质 正文开始: 在校的时候,你一定学过《…