RabbitMq死信队列延迟交换机

架构图

配置

package com.example.demo.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DeadLetterConfig {public String Nomarl_Exchange = "normal_exchange";public String Normal_Queue = "normal_queue";public String Normal_RoutingKey = "normal.#";public String Dead_Exchange = "dead_exchange";public String Dead_Queue = "dead_queue";public String Dead_RoutingKey = "dead.#";@Beanpublic Exchange normalExchange(){return ExchangeBuilder.topicExchange(Nomarl_Exchange).build();}@Beanpublic Queue normalQueue(){return QueueBuilder.durable(Normal_Queue).deadLetterExchange(Dead_Exchange).deadLetterRoutingKey("dead.sss.a").build();}@Beanpublic Binding bindingNormalKey(Exchange normalExchange,Queue normalQueue){return BindingBuilder.bind(normalQueue).to(normalExchange).with(Normal_RoutingKey).noargs();}@Beanpublic Exchange deadExchange(){return ExchangeBuilder.topicExchange(Dead_Exchange).build();}@Beanpublic Queue deadQueue(){return QueueBuilder.durable(Dead_Queue).build();}@Beanpublic Binding bindingDeadKey(Exchange deadExchange,Queue deadQueue){return BindingBuilder.bind(deadQueue).to(deadExchange).with(Dead_RoutingKey).noargs();}}

1.被消费者拒绝,并且requeue值设置为false

package com.example.demo.consumer;import com.example.demo.config.DeadLetterConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
public class RejectConsumer {@RabbitListener(queues = DeadLetterConfig.Normal_Queue)public void rejectOrBicNack(String str, Channel channel, Message message) throws IOException {System.out.println("接收到消息"+str);//1.进行channel进行basicNack,记得将requeue设置为false
//        channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);//记得在配置文件配置 acknowledge-mode: manual #开启手动ACK
//        channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);//以上方式二选一即可}
}

2.消息过期或者队列存储消息过期

  public void publishExpire(){String msg = "dead dlx test expire";rabbitTemplate.convertAndSend(DeadLetterConfig.Nomarl_Exchange, "normal.211313",msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("5000");//设置过期时间return message;}});}

消息过期

  @Beanpublic Queue normalQueue(){return QueueBuilder.durable(Normal_Queue).deadLetterExchange(Dead_Exchange).deadLetterRoutingKey("dead.sss.a").ttl(10000).build();}

给队列存储消息设置最大时间,超过这个时间,消息将会通过设置的这个routingkey从死信交换机转发给对应的死信队列。

3.队列消息达到最大长度 

   @Beanpublic Queue normalQueue(){return QueueBuilder.durable(Normal_Queue).deadLetterExchange(Dead_Exchange).deadLetterRoutingKey("dead.sss.a")//.ttl(10000).maxLength(1).build();}

通过maxLength属性设置最大数量,这里设置属性最大为1

4.设置延迟交换机

延迟交换机下载地址

package com.example.demo.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class DelayedConfig {public static final String Delayed_Exchange = "delayed_exchange";public static final String Delayed_Queue = "delayed_queue";public static final String Delayed_RoutingKey = "delayed_routingLey";@Beanpublic Exchange buildDealayedExchange(){Map<String,Object>arguments =new HashMap<>();arguments.put("x-delayed-type","topic");String type = "x-delayed-type";Exchange exchange =new CustomExchange(Delayed_Exchange,type,true,false,arguments);return exchange;}@Beanpublic Queue buildDealyedQueue(){return QueueBuilder.durable(Delayed_Queue).build();}@Beanpublic Binding bindingDelayed(Exchange buildDealayedExchange,Queue buildDealyedQueue){return BindingBuilder.bind(buildDealyedQueue).to(buildDealayedExchange).with(Delayed_RoutingKey).noargs();}
}

这样设置之后,发送的消息会在交换机中待够设置的过期时间后才会到相应的队列。

如果消息过期时间一致,可以只不设置延迟交换机,当过期时间类型过多的时候,就可以通过设置延迟交换机来满足不同过期时间的类型。 

注意,这里有个参数,arguments 类型为Map<String,Object> 注意要在这个参数里面设置交换机类型,并且放入CustomExchange的构造函数中,不然交换机会创建失败。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/34280.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

Sringboot项目实现文件上传至linux指定目录

本篇文章讲述一个springboot项目如何实现一个文件上传接口&#xff0c;涉及vsftpd服务、SSH协议以及对linux系统的一些配置。 一、springboot工程部分 本篇文章略过springboot创建过程&#xff0c;具体见之前发过的文章 1.1在pom.xml中添加SFTP&#xff08;SSH 文件传输协议…

电气自动化 基于PLC工业机器人视觉定位及自动码垛系统的设计

摘要 随着我国经济的不断发展&#xff0c;工业机器人将会得到更多的应用&#xff0c;从而达到整个行业的自动化和高速度。由于生产效率的不断提升&#xff0c;对成品进行检验、加工、分级等工作尤为关键。工业机器人是一种高科技的机械设备&#xff0c;它被广泛地运用于焊接、…

云数据库 OceanBase

OceanBase 是阿里巴巴集团自主研发的一款分布式关系型数据库。它采用了分布式架构&#xff0c;能够在大规模、复杂环境下处理海量数据。OceanBase 旨在解决传统数据库在高并发、大规模数据和高可用性场景下的瓶颈&#xff0c;尤其适用于金融、电商、物流等需要高性能、高可靠的…

数据库性能诊断工具DBdoctor 产品介绍

基本信息 DBdoctor是一款专注于数据库性能的生态软件&#xff0c;致力于解决一切数据库性能问题&#xff0c;实现DB AGI。行业首次将eBPF技术聚焦在数据库领域&#xff0c;创新性实现性能可观测。 功能介绍 1.核心功能 SQL审核&#xff0c;性能评估&#xff1a; 独家SQL性能…

AIGC与医学统计学的完美融合:打造智能医疗新时代

文章目录 一、理解统计学基础概念二、掌握描述性统计方法三、学习假设检验方法四、掌握回归分析方法五、学习生存分析方法六、利用现代技术和工具七、注重实践和应用《医学统计学从入门到精通》亮点内容简介作者简介目录获取方式 在AIGC&#xff08;人工智能生成内容&#xff0…

【git reset】本地下载特定历史提交哈希值的github文件【未联网服务器】进行git reset操作

本地电脑下载git文件&#xff0c;并进行git reset操作 问题描述&#xff1a;解决方法&#xff1a;方法1&#xff1a;直接下载特定版本的github压缩包。方法二&#xff1a; 在本地windows电脑上安装git工具进行git reset版本回退&#xff0c;之后上传相应版本的压缩包到服务器上…

emacs 折腾日记(一)——序言

初次知道emacs这个东西是在《程序员的呐喊》这本书。书中的作者提倡学习编译原理&#xff0c;推崇emacs。现在距离我知道emacs已经过去了快8年&#xff0c;期间不断的重复学习——放弃——学习的路子。与过去学习vim类似&#xff0c;vim我也经历过放弃到学习&#xff0c;最后有…

Django基础cookie和session

1.会话跟踪 ​ 什么是会话&#xff01;可以把会话理解为客户端与服务器之间的一次会晤&#xff0c;在一次会晤中可能会包含多次请求和响应。例如给10086打个电话&#xff0c;你就是客户端&#xff0c;而10086服务人员就是服务器。从双方接通电话那一刻起&#xff0c;会话就开始…

EMC测试——RE、CE、ESD

①辐射发射测试(RE)&#xff1a;评估电子、电气产品或系统在工作状态下产生的电磁辐射干扰程度&#xff0c;确保其不会干扰其他电子设备&#xff0c;同时可以确保产品的电磁辐射水平在安全范围内&#xff0c;从而保护用户免受电磁辐射的危害。消费类常见测试标准&#xff1a;EN…

iOS平台接入Facebook登录

1、FB开发者后台注册账户 2、完善App信息 3、git clone库文件代码接入 4、印尼手机卡开热点调试 备注&#xff1a; 可能遇到的问题&#xff1a; 1、Cocos2dx新建的项目要更改xcode的git设置&#xff0c;不然卡在clone&#xff0c;无法在线获取FBSDK 2、动态库链接 需要在…

解决 PyTorch 中的 AttributeError: ‘NoneType‘ object has no attribute ‘reshape‘ 错误

这里写目录标题 一、错误分析二、错误原因三、解决方案1. 检查损失函数2. 检查前向传播3. 检查 backward 函数4. 检查梯度传递 四、前向传播与反向传播1. 前向传播2. 反向传播3. 自定义 backward 函数示例反向传播过程&#xff1a;常见的错误&#xff1a;1&#xff1a;损失函数…

PT8M2102 触控型 8Bit MCU

1 产品概述 ● PT8M2102 是一款基于 RISC 内核的8位 MTP 单片机&#xff0c;内部集成了电容式触摸感应模块、TIMER&#xff0c;PWM、LVR、LVD、WDT等外设&#xff0c;其主要用作触摸按键开关&#xff0c;广泛适用于触控调光、电子玩具、消费电子、家用电器等领域&#xff0c;具…

工业—使用Flink处理Kafka中的数据_EnvironmentData2

使用Flink 消费 Kafka 中 EnvironmentData 主题的数据 , 监控各环境检测设备数据,当温度 ( Temperature 字段)持续 3 分钟高于

如何通过 Windows 自带的启动管理功能优化电脑启动程序

在日常使用电脑的过程中&#xff0c;您可能注意到开机后某些程序会自动运行。这些程序被称为“自启动”或“启动项”&#xff0c;它们可以在系统启动时自动加载并开始运行&#xff0c;有时甚至在后台默默工作。虽然一些启动项可能是必要的&#xff08;如杀毒软件&#xff09;&a…

javaScript13DOM获取

3.1、DOM初相识 3.1.1、DOM简介 文档对象模型&#xff08;Document Object Model &#xff0c;简称DOM&#xff09;&#xff0c;它就是一些系列编程接口&#xff0c;有了这些接口&#xff0c;就可以改变页面内容&#xff0c;结构和样式 DOM树&#xff1a; 文档&#xff1a;一…

【深度学习基础之Scikit-learn库3】Scikit-learn 库提供了丰富的功能,包括数据预处理、特征选择、模型训练与评估....

【深度学习基础之Scikit-learn库3】Scikit-learn 库提供了丰富的功能&#xff0c;包括数据预处理、特征选择、模型训练与评估… 【深度学习基础之Scikit-learn库3】Scikit-learn 库提供了丰富的功能&#xff0c;包括数据预处理、特征选择、模型训练与评估… 文章目录 【深度学…

【Calibre-Web】Calibre-Web服务器安装详细步骤(个人搭建自用的电子书网站,docker-compose安装)

文章目录 一、Calibre-Web和Calibre的区别是什么&#xff1f;使用场景分别是什么&#xff1f;二、服务器安装docker和docker-compose三、服务器安装Calibre-Web步骤1、安装完成后的目录结构2、安装步骤3、初始配置4、启动上传 四、安装Calibre五、docker-compose常用命令 最近想…

【Canvas与图标】乡土风金属铝边立方红黄底黑字图像处理图标

【成图】 120*120图标&#xff1a; 大小图&#xff1a; 【代码】 <!DOCTYPE html> <html lang"utf-8"> <meta http-equiv"Content-Type" content"text/html; charsetutf-8"/> <head><title>金属铝边立方红黄底黑…

vxe-table 键盘操作,设置按键编辑方式,支持覆盖方式与追加方式

vxe-table 全键盘操作&#xff0c;按键编辑方式设置&#xff0c;覆盖方式与追加方式&#xff1b; 通过 keyboard-config.editMode 设置按键编辑方式&#xff1b;支持覆盖方式编辑和追加方式编辑 安装 npm install vxe-pc-ui4.3.15 vxe-table4.9.15// ... import VxeUI from v…

ros2人脸检测

第一步&#xff1a; 首先在工作空间/src下创建数据结构目录service_interfaces ros2 pkg create service_interfaces --build-type ament_cmake 然后再创建一个srv目录 在里面创建FaceDetect.srv&#xff08;注意&#xff0c;首字母要大写&#xff09; sensor_msgs/Image …