MQTT+Disruptor 提高物联网高并发

基于springboot2.5.7

废话不多说,直接上干货:

@Slf4j
@Configuration
@EnableConfigurationProperties(MqttProperties.class)
@IntegrationComponentScan(basePackages = {"扫描包路径","扫描包路径"})
public class MqttAutoConfig {@Autowiredprivate MqttProperties mqttProperties;@Autowiredprivate ApplicationContext applicationContext;@Autowiredprivate DisruptorProperties disruptorProperties;@RefreshScope@Bean(value = "mqttParallelQueueHandler",initMethod = "start",destroyMethod = "shutDown")@ConditionalOnProperty(prefix = "custom-config.mqtt", name = "disruptor", havingValue = "true")public ParallelQueueHandler mqttParallelQueueHandler(){log.info("初始化Disruptor...");return new ParallelQueueHandler.Builder<DisruptorEventData>().setDisruptorProperties(disruptorProperties).setWaitStrategy(new BlockingWaitStrategy()).setListener(new MQTTMsgListener()).build();}@Bean@ConditionalOnProperty(prefix = "custom-config.mqtt", value = {"username","password", "host-url"})public MqttConnectOptions getReceiverMqttConnectOptionsForSub(){MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();mqttConnectOptions.setUserName(mqttProperties.getUsername());mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray());List<String> hostList = Arrays.asList(mqttProperties.getHostUrl().trim().split(","));String[] serverURIs = new String[hostList.size()];hostList.toArray(serverURIs);mqttConnectOptions.setServerURIs(serverURIs);mqttConnectOptions.setKeepAliveInterval(2);mqttConnectOptions.setAutomaticReconnect(true);return mqttConnectOptions;}/***  MQTT 连接工厂* @return MqttPahoClientFactory*/@Bean@ConditionalOnMissingBeanpublic MqttPahoClientFactory receiverMqttClientFactoryForSub(MqttConnectOptions mqttConnectOptions) {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setConnectionOptions(mqttConnectOptions);log.info("【MQTT】-初始化连接工厂...");return factory;}/*** 出站通道* @return MessageChannel*/@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}/***  MQTT 消息发送处理器* @return MessageHandler*/@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound(MqttPahoClientFactory factory) {MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientId()+"out", factory);messageHandler.setDefaultQos(1);//开启异步messageHandler.setAsync(true);messageHandler.setDefaultTopic("test");return messageHandler;}/*** 此处可以使用其他消息通道* Spring Integration默认的消息通道,它允许将消息发送给一个订阅者,然后阻碍发送直到消息被接收。** @return MessageChannel*/@Beanpublic MessageChannel mqttInBoundChannel() {return new DirectChannel();}/*** 适配器, 多个topic共用一个adapter* 客户端作为消费者,订阅主题,消费消息*/@Bean@ConditionalOnMissingBeanpublic MqttPahoMessageDrivenChannelAdapter mqttInbound(MqttPahoClientFactory factory) {List<String> topics = mqttProperties.getSubscribeTopics();String[] topicArray = new String[topics.size()];for (int i = 0; i < topics.size(); i++) {topicArray[i] = "$queue/"+ topics.get(i);}log.info("【MQTT】-订阅TOPIC:{}", Arrays.toString(topicArray));MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientId(), factory, topicArray);adapter.setCompletionTimeout(mqttProperties.getCompletionTimeout());adapter.setConverter(new DefaultPahoMessageConverter());adapter.setRecoveryInterval(10000);adapter.setQos(1);adapter.setOutputChannel(mqttInBoundChannel());return adapter;}@Autowired(required = false)@Qualifier("mqttParallelQueueHandler")private ParallelQueueHandler<DisruptorEventData> parallelQueueHandler;/*** mqtt入站消息处理工具,对于指定消息入站通道接收到生产者生产的消息后处理消息的工具。* @return MessageHandler*/@Bean@RefreshScope@ServiceActivator(inputChannel = "mqttInBoundChannel")public MessageHandler mqttMessageHandler() {// 获取配置中的设备品牌MqttProperties.DeviceBrand deviceBrand = mqttProperties.getDeviceBrand();boolean disruptor = mqttProperties.isDisruptor();// 获取所有实现了 CustomMqttMessageHandler 接口的 Beanreturn message -> {log.info("【MQTT】-收到MQTT消息,Topic: {}, Payload: {}",message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC),message.getPayload());String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);Map<String, CustomMqttMessageReceiverHandler> handlers = applicationContext.getBeansOfType(CustomMqttMessageReceiverHandler.class);boolean handled = false;if (Objects.nonNull(deviceBrand)){CustomMqttMessageReceiverHandler handler = handlers.get(deviceBrand.getServiceName());if (Objects.nonNull(handler)){if (handler.supportsTopic(topic)) {handled = this.run(handler,message,topic,disruptor);}}else {log.error("【MQTT】-未找到设备品牌消息接收处理器,deviceBrand->{}",deviceBrand);}}else {for (CustomMqttMessageReceiverHandler handler : handlers.values()) {if (handler.supportsTopic(topic)) {handled = this.run(handler,message,topic,disruptor);}}}if (!handled) {log.warn("【MQTT】-未找到匹配的处理器来处理Topic {} 的消息", topic);}};}@Bean@ConditionalOnProperty(prefix = "custom-config.mqtt", value = {"username","password", "host-url"})public MqttMessageSender mqttMessageSender(){return new MqttMessageSender();}private boolean run(CustomMqttMessageReceiverHandler handler,Message<?> message,String topic,boolean disruptor){try {String traceId = MDC.get("traceId");if (!StringUtils.hasText(traceId)){traceId = UUID.randomUUID().toString().replaceAll("-", "");MDC.put("traceId",traceId);}if (disruptor && Objects.nonNull(parallelQueueHandler)){log.info("【MQTT】-使用Disruptor处理...");DisruptorEventData data = new DisruptorEventData();Map<String,Object> map = new HashMap<>();map.put("data",message);map.put("handler",handler);map.put("traceId",traceId);data.setMessage(map);parallelQueueHandler.add(data);}else {handler.handleMessage(message);}return true;} catch (Exception e) {log.error("【MQTT】-Handler {} 处理Topic {} 的消息时出错", handler.getClass().getSimpleName(), topic, e);return false;}finally {MDC.clear();}}

由于涉及隐私,其余代码可以留言

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/2744.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

JAVA:常见 JSON 库的技术详解

1、简述 在现代应用开发中&#xff0c;JSON&#xff08;JavaScript Object Notation&#xff09;已成为数据交换的标准格式。Java 提供了多种方式将对象转换为 JSON 或从 JSON 转换为对象&#xff0c;常见的库包括 Jackson、Gson 和 org.json。本文将介绍几种常用的 JSON 处理…

【贪心】【可行范围内最大边界】SCNU习题 P25.跳跃游戏

算法思想&#xff1a; 每次迭代更新可行至的最大范围r(r必保证>原位置&#xff09;&#xff0c;至到迭代结束&#xff0c;若r>length of array则说明可以跳跃至此 #include <iostream> #include <vector> #include <string> #include <sstre…

Redis 组网方式入门

文章目录 一、组网方式1. 单实例模式描述优点缺点适用场景 2. 主从复制模式&#xff08;Master-Slave Replication&#xff09;描述优点缺点适用场景基于docker的redis主从复制1. 配置主节点2. 配置从节点3. 查看节点状态4. 验证主从数据同步5. 查看同步进度 3. 哨兵模式&#…

【系统集成项目管理工程师教程】第5章 软件工程

软件工程是一门研究用工程化方法构建和维护有效、实用和高质量软件的学科&#xff0c;涵盖软件需求、设计、实现、测试、部署交付、质量管理和过程能力成熟度等方面&#xff0c;旨在提高软件生产率、质量并降低成本&#xff0c;确保软件项目的成功开发与维护。 5.1软件工程定义…

PowerDesigner使用教程:设置注释、默认值属性

使用场景: 进行表设计时&#xff0c;我们需要对字段增加注释、设置默认值 解决方案&#xff1a; 如下图设置即可实现

如果 MySQL 主库出现了问题,从库该何去何从呢?

🚀 博主介绍:大家好,我是无休居士!一枚任职于一线Top3互联网大厂的Java开发工程师! 🚀 🌟 在这里,你将找到通往Java技术大门的钥匙。作为一个爱敲代码技术人,我不仅热衷于探索一些框架源码和算法技巧奥秘,还乐于分享这些宝贵的知识和经验。 💡 无论你是刚刚踏…

C# 日志框架 NLog、log4net 和 Serilog对比

文章目录 前言NLog、log4net 和 Serilog 三个框架的详细对比:一、NLog优点:缺点:二、 log4net优点缺点三、Serilog优点缺点四、Serilog使用举例总结前言 NLog、log4net 和 Serilog 三个框架的详细对比: NLog、log4net 和 Serilog 是三个非常流行的 .NET 日志框架,它们各自…

本地缓存库分析(四):fastcache

文章目录 本系列前言设计索引和数组怎么判断是否被覆盖其他问题 源码走读数据结构setget 总结 本系列 本地缓存库分析&#xff08;一&#xff09;&#xff1a;golang-lru本地缓存库分析&#xff08;二&#xff09;&#xff1a;bigcache本地缓存库分析&#xff08;三&#xff0…

安科瑞5G基站直流叠光监控系统-安科瑞黄安南

基站现状和趋势 5G基站是专门提供5G网络服务的公用移动通信基站。5G基站主要用于提供5G空口协议功能&#xff0c;支持与用户设备、核心网之间的通信。按照逻辑功能划分&#xff0c;5G基站可分为5G基带单元与5G射频单元&#xff0c;二者之间可通过CPRI或eCPRI接口连接。 2019年…

Pr 视频效果:过渡

效果面板/视频效果/过渡 Video Effects/Transition Adobe Premiere Pro 的视频效果中&#xff0c;过渡 Transition效果组用于创建单个剪辑内过渡效果的一组视频效果。这些效果可以增强视频的视觉连贯性&#xff0c;添加创意性的视觉转换&#xff0c;为观众提供流畅的观看体验。…

DataX 的安装配置和使用 (详细版)

1&#xff0c;上传解压 1&#xff0c;开始上传安装包到你虚拟机上放置安装包的文件夹 2&#xff0c;开始解压 ,配置环境变量 1、上传 /opt/modules 2、解压 tar -zxvf datax.tar.gz -C /opt/installs 3、修改 vi /etc/profile 配置环境变量&#xff1a; export DAT…

zookeeper安装

安装之前&#xff1a;先关闭三台服务器的防火墙&#xff01;&#xff01;&#xff01;&#xff01;&#xff01; systemctl stop firewalld systemctl disable firewalld 1)上传 /opt/modules下面 2&#xff09;解压 /opt/installs下面 tar -zxvf zookeeper-3.4.10.tar.gz …

Nature文章《deep learning》文章翻译

这篇文章是对Nature上《deep learning》文章的翻译。原作者 Yann LeCun, Yoshua Bengio& Geoffrey Hinton。 这篇文章的中心思想是深入探讨深度学习在机器学习中的革命性贡献&#xff0c;重点介绍其在特征学习、监督学习、无监督学习等方面的突破&#xff0c;并阐述其在图…

动态规划—整数拆分

class Solution {public int integerBreak(int n) {int[] dp new int[n1];dp[2] 1;for(int i 3; i< n; i){for(int j 1; j< i/2; j){//j拆i&#xff0c;只需要遍历到 i/2 就可以&#xff0c;后面没有必要遍历dp[i] Math.max(dp[i], Math.max(j*(i-j) , j*dp[i-j]));…

OceanBase V4.3.3,首个面向实时分析场景的GA版本发布

在10月23日举办的 OceanBase年度发布会 上&#xff0c;我们怀着激动之情&#xff0c;正式向大家宣布了 OceanBase 4.3.3 GA 版的正式发布&#xff0c;这也是OceanBase 为实时分析&#xff08;AP&#xff09;场景打造的首个GA版本。 2024 年初&#xff0c;我们推出了 4.3.0 版本…

儿童安全座椅行业全面深入分析

儿童安全座椅就是一种专为不同体重&#xff08;或年龄段&#xff09;的儿童设计&#xff0c;将孩子束缚在安全座椅内&#xff0c;能有效提高儿童乘车安全的座椅。欧洲强制性执行标准ECE R44/03的定义是&#xff1a;能够固定到机动车辆上&#xff0c;带有ISOFIX接口、LATCH接口的…

算法笔记:Day-09(初始动态规划)

509. 斐波那契数 斐波那契数 &#xff08;通常用 F(n) 表示&#xff09;形成的序列称为 斐波那契数列 。该数列由 0 和 1 开始&#xff0c;后面的每一项数字都是前面两项数字的和。也就是&#xff1a; F(0) 0&#xff0c;F(1) 1 F(n) F(n - 1) F(n - 2)&#xff0c;其中 …

HTTP和HTTPS 的作用和应用场景 (python 爬虫简单入门)

HTTP和HTTPS HTTP HTTP协议&#xff08;HyperText Transfer Protocol&#xff0c;超文本传输协议&#xff09;&#xff1a;是一种发布和接收 HTML页面的方法。 HTTP的端口号为80 HTTPS HTTPS&#xff08;Hypertext Transfer Protocol over Secure Socket Layer&#xff09;…

Java多线程编程(三)一>详解synchronized, 死锁,wait和notify

目录&#xff1a; 一.synchronized 的使用&#xff1a; 二. 常见死锁情况&#xff1a; 三 .如何避免死锁&#xff1a; 四.wait和notify 一.synchronized 的使用&#xff1a; 我们知道synchronized锁具有互斥的特点&#xff1a; synchronized 会起到互斥效果, 某个线程…

linux入门——“初识make”

make是linux中的自动化构建工具&#xff0c;一般来说系统会自带make&#xff0c;如果没有&#xff0c;那么可以使用命令“sudo apt install -y make”来安装。 1.初识make make使用的前提是维护makefile/Makefile文件&#xff0c;需要在自己的目录下自己创建。 我在此目录下创…