RabbitMQ 实现延迟消息
- 1、使用死信交换机实现延迟消息
- 2、使用延迟消息插件实现延迟消息
1、使用死信交换机实现延迟消息
当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):
- 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
- 消息过期
- 设置了消息上限的队列消息堆积满了,最早的消息可能成为死信
发送消息给队列A并设置消息过期时间,使用 dead-letter-exchange
属性给队列A指定死信交换机,该死信交换机将消息路由到队列B,消费者监听消费队列B中的消息。
2、使用延迟消息插件实现延迟消息
这个插件可以将普通交换机改造为支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。
- 下载并安装延迟消息插件
将插件放在 RabbitMQ
的 plugins
目录下,执行以下命令启用插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- 声明延迟交换机
方式一:
@Bean
public DirectExchange delayExchange(){return ExchangeBuilder.directExchange("delay.direct").delayed() // 设置delay的属性为true.durable(true) // 持久化.build();
}
方式二:
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "delay.direct", delayed = "true"),key = "delay"
))
public void listenDelayMessage(String msg){log.info("接收到delay.queue的延迟消息:{}", msg);
}
- 发送消息时通过消息头
x-delay
设置延迟时间
@Test
void testPublisherDelayMessage() {// 1.创建消息 String message = "hello, delayed message";// 2.发送消息,利用消息后置处理器添加消息头rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 添加延迟消息属性message.getMessageProperties().setDelay(5000);return message;}});
}