SpringBoot(Java)实现MQTT连接(本地Mosquitto)通讯调试

1.工作及使用背景

        工作中需要跟收集各种硬件或传感器数据用于Web展示及统计计算分析,如电表、流量计、泵、控制器等物联网设备。

        目前的思路及解决策略是使用力控或者杰控等组态软件实现数据的转储(也会涉及收费问题),通过组态软件自带的转储工具将数据转储到关系型数据库,如MySQL、sqlLite、Postgresql等。然后在BS架构后台程序中通过定时刷数据或者查询时计算的方式进行统计分析计算。

        但上述解决方案实际上是实现简单,但是数据统计时机有潜在的偏差风险,且逻辑设计非常别扭,数据库压力大等问题,理论上应该通过消息队列来接收实时数据参与计算的方式,Web系统只负责展示计算统计之后的结果,这样无论是时效还是数据准确性更容易保证,实时数据存储的数据库压力也不存在(可做数据校验用,也可不用),逻辑也不显别扭。

2.开发环境及工具

JDK1.8、maven、Mosquitto、IDEA、postman

3.框架结构及文件声明

因为我用的现成的框架,所以启动模块和业务模块分开了。实际开发调试中完全可以放一起也没关系。

MqttClientConnectorPool对外提供一个初始化的Mqtt客户端,在服务启动时初始化
MqttMsgSender对外提供一个可以执行消息发送的方法
MqttMsgSubscriber初始化一个Mqtt客户端,并根据配置订阅topic
TestController接收web请求的调用消息发送,用于测试
BusinessApplicationStartup服务启动时执行,调用MqttClientConnectorPool初始化一个客户端并调起MqttMsgSubscriber的监听等待
BusinessApplicationShutdown服务正常终止时调用,关闭服务启动默认创建的Mqtt客户端
MqttBrokerServerSpringBoot服务启动类

4.具体实现逻辑及代码

4.1 maven依赖

<properties><MQTTv3.version>1.2.5</MQTTv3.version>
</properties><dependencyManagement><dependencies><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.MQTTv3</artifactId><version>${MQTTv3.version}</version></dependency></dependencies>
</dependencyManagement>或者直接使用
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.MQTTv3</artifactId><version>1.2.5</version>
</dependency>

4.2 MqttClientConnectorPool

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;@Slf4j
public class MqttClientConnectorPool {public static MqttClient mqttClient;/*** 连接MQTT客户端* @return 获取MQTT连队对象*/public static MqttClient connectMQTT() {if (mqttClient != null){log.info("已存在,我深深的脑海!");return mqttClient;}try {// broker及连接信息String broker = "tcp://127.0.0.1:1883";String username = "admin";String password = "123456";String clientId = System.currentTimeMillis() + "";//创建MQTT客户端(指定broker、客户端id、消息持久策略)mqttClient = new MqttClient(broker, clientId, new MemoryPersistence());//创建连接参数配置MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());//是否清除会话options.setCleanSession(true);//连接超时时间options.setKeepAliveInterval(20);//是否自动重连options.setAutomaticReconnect(true);mqttClient.connect(options);log.info("MqttClient 服务启动broker初始化!");} catch (MqttException e){log.error("MqttClient connect Error:{}", e.getMessage());e.printStackTrace();}return mqttClient;}/*** 关闭MQTT客户端* @param client client*/public static void closeClient(MqttClient client){try {// 断开连接client.disconnect();// 关闭客户端client.close();} catch (MqttException e){log.error("MqttClient disconnect or close Error:{}", e.getMessage());e.printStackTrace();}}/*** 关闭MQTT客户端*/public static void closeStaticClient(){try {if (mqttClient != null){// 断开连接mqttClient.disconnect();// 关闭客户端mqttClient.close();}} catch (MqttException e){log.error("MqttClient disconnect or close Error:{}", e.getMessage());e.printStackTrace();}}
}

4.3 MqttMsgSender

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;@Slf4j
public class MqttMsgSender {public void sendMessage(MqttClient client,String topic,String content,int qos){MqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);try{client.publish(topic,message);} catch (MqttException e){log.error("MqttClient publish text info Error:{}!", e.getMessage());e.printStackTrace();}}
}

4.4 MqttMsgSubscriber

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;@Slf4j
public class MqttMsgSubscriber {String broker = "tcp://127.0.0.1:1883";String topic = "/deviceUp";String username = "admin";String password = "123456";String clientId = System.currentTimeMillis() + "";int qos = 1;public void readSubscribeTopicMessage(){try {MqttClient client = new MqttClient(broker, clientId, new MemoryPersistence());// 连接参数MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());//是否清除会话options.setCleanSession(true);options.setConnectionTimeout(60);options.setKeepAliveInterval(60);client.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable throwable) {log.error("连接丢失");}@Overridepublic void messageArrived(String s, MqttMessage mqttMessage) throws Exception {log.info("topic为: " + topic);log.info("qos为: " + mqttMessage.getQos());log.info("消息内容为: " + new String(mqttMessage.getPayload()));}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {// 当消息被完全传送出去后调用log.info("交付完成 ---Delivery complete!");// 可以在这里处理一些发送完成后的清理工作}});client.connect(options);client.subscribe(topic, qos);} catch (MqttException e){log.error("MqttMsgSubscriber 连接启动异常:{}", e.getMessage());} catch (Exception e){log.error("MqttMsgSubscriber 读取消息异常:{}", e.getMessage());}}}

4.5 TestController

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.springframework.web.bind.annotation.*;import java.util.List;@Slf4j
@RestController
@RequestMapping()
public class TestController {@GetMapping("/test/mqtt/{msg}")public String testSendMqttMsg(@PathVariable("msg") String msg){log.info("消息内容:{}.", msg);MqttClient mqttClient = MqttClientConnectorPool.connectMQTT();MqttMsgSender sender = new MqttMsgSender();String content = "{" + " \"deviceNo\": \"" + msg + "\"," + " \"val\": 232.5" + "}";String topic = "/deviceUp";int qos = 1;if (null != mqttClient){sender.sendMessage(mqttClient, topic, content, qos);} else {log.info("MqttClient为空,无法发送!");return "失败!";}return "成功!";}}

4.6 BusinessApplicationStartup

import 包路径(可以删掉这一行手动导入).MqttClientConnectorPool;
import 包路径(可以删掉这一行手动导入).MqttMsgSubscriber;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;@Slf4j
@Order(10)
@Component
public class BusinessApplicationStartup implements ApplicationRunner {@Overridepublic void run(ApplicationArguments args) throws Exception {log.info("MqttClientConnectorPool ===================== Startup");MqttClientConnectorPool.connectMQTT();log.info("MqttClientConnectorPool ===================== recoveryAllJob Over !");log.info("MqttMsgSubscriber ===================== Startup");// 先订阅等待MqttMsgSubscriber subscriber = new MqttMsgSubscriber();subscriber.readSubscribeTopicMessage();}
}

4.7 BusinessApplicationShutdown

import 包路径(可以删掉这一行手动导入).MqttClientConnectorPool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class BusinessApplicationShutdown implements ApplicationListener<ContextClosedEvent> {@Overridepublic void onApplicationEvent(ContextClosedEvent contextClosedEvent) {log.info("服务终止! shutdown hook, ContextClosedEvent");MqttClientConnectorPool.closeStaticClient();}}

4.8 MqttBrokerServer

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;@EnableScheduling
@SpringBootApplication
public class MqttBrokerServer {public static void main(String[] args) {SpringApplication.run(MqttBrokerServer.class, args);}}

5.其他备注

5.1 需要Mqtt(Broker)服务器

        如果是直接使用示例代码的Mqtt服务器(Broker)配置,需要在自己电脑上安装Mqtt服务器,如mosquitto、EMQX等,具体自行搜索,或者使用公用的Mqtt服务器(我没测试试过

// 📢注意,当前Broker本人未测试
String broker = "tcp://broker.emqx.io:1883";
String topic = "mqtt/test";
String username = "emqx";
String password = "public";

5.2 调试地址

如果配置文件没配置[server.servlet.context-path],就不需要我自己/backend这一段

6.参考文章

MQTT协议介绍及Java教程

https://baijiahao.baidu.com/s?id=1801542244354727565&wfr=spider&for=pc

7.喜欢作者

暂无

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1547748.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

NDI多画面系统(Multiview Pro)

NDI多画面系统(Multiview Pro)是千视以Multiview Player为基础打造的一款全新的多画面监看切换系统,支持自定义多画面/多窗口显示,单窗口可监看20路视频流,可实现多窗口、多屏幕预监+切换,且兼容NDI High Bandwidth和NDI HX/NDI HX3。同时Multiview Pro基于WebRTC技术,可…

用 Git Absorb 轻松管理 commit,告别频繁 fixup,效率提升 10 倍!

你是不是经常在使用 Git 的时候被频繁的 commit --fixup 弄得头疼?尤其是在修复代码时,一个小改动就得新建一个 commit,搞得整个 commit 历史乱七八糟,不仅影响工作效率,后来要查找问题时也变得更复杂。如果你对这个问题深有感触,那么这篇文章就是为你写的。 今天我想…

JVM 类加载机制2

扩展类加载器&#xff08;Extension ClassLoader&#xff09;&#xff0c;该类加载器是由 ExtClassLoader&#xff08;sun.misc.Launcher$ExtClassLoader&#xff09;实现&#xff0c;负责将 <JRE_HOME>/lib/ext 或者被 java.ext.dir 系统变量所指定路径中的所有类库加载…

open-resty 服务安装kafka插件

从github下载 作者&#xff1a;程序那点事儿 日期&#xff1a;2023/11/16 22:01 lua-resty-kafka 插件安装 下载代码后直接解压 mkdir -p /usr/local/openresty/modules/ #创建一个目录&#xff0c;存放lua插件cd /usr/local/openresty/modules/ #进入目录rz -y #上传lua插件…

SOMEIP_ETS_136: SD_Option_Length_shorter_GT_0_as_specified_for_type

测试目的&#xff1a; 验证DUT能够处理一个UDP选项长度小于其类型所指定长度的SubscribeEventgroup消息&#xff0c;并以SubscribeEventgroupNAck作为响应或完全忽略该请求。 描述 本测试用例旨在确保DUT遵循SOME/IP协议&#xff0c;当接收到一个UDP选项长度小于其类型所指定…

WPS在表格中填写材料时,内容过多导致表格不换页,其余内容无法正常显示 以及 内容过多,导致表格换页——解决方法

一、现象 1&#xff0c;内容过多导致表格不换页&#xff0c;其余内容无法正常显示 2&#xff0c;内容过多&#xff0c;导致表格换页 二、解决方法 在表格内右击&#xff0c;选择表格属性 在菜单栏选择行&#xff0c;勾选允许跨页断行&#xff0c;点击确定即可 1&#xff0…

windows安装Redis以后配置远程访问

修改配置文件&#xff1a; 第一个地方&#xff1a; 第二个地方&#xff1a; 启动服务&#xff1a; redis-server .\redis.windows.conf 可能需要重启计算机 经过实测&#xff0c;这个配置文件也得改&#xff1a; 如果不想重启计算机&#xff0c;可以执行下面的命令重启…

代码随想录冲冲冲 Day56 图论Part8

117. 软件构建 这道题是使用拓扑排序的方法 看多个任务有优先级的情况下 怎么排序 对应到这道题就是文件排序 首先要记录每一个点的入度&#xff0c;当一个点的入度为0时&#xff0c;就说明这个点是顶点 然后记录每一个点向那些点相连 之后建立一个queue 寻找一个入度为0的…

基于quill2.0的富文本编辑器,Fluent Editor,支持表格,图片,表情等

官网&#xff1a;Fluent Editor | 基于 Quill 2.0 的富文本编辑器 安装 npm i opentiny/fluent-editor quill 使用案例 <template><div class"publish-form-container"><!-- TODO --><div ref"quillEditorRef" class"quill…

python的逻辑控制

python逻辑执行 python条件控制if嵌套循环语句内置函数range()Pass语句 python迭代器与生成器在python中创建一个迭代器生成器yield python条件控制 # 在python中&#xff0c;不使用括号将条件固定 if exp: # 条件满足则执行&#xff0c;后续语句不执行# 不使用{}将执行语句固…

Java框架学习(mybatis)(01)

简介&#xff1a;以本片记录在尚硅谷学习ssm-mybatis时遇到的小知识 详情移步&#xff1a;想参考的朋友建议全部打开相互配合学习&#xff01; 官方文档&#xff1a; MyBatis中文网https://mybatis.net.cn/index.html 学习视频&#xff1a; 067-mybatis-介绍和对比_哔哩哔…

《向量数据库指南》——Zilliz迁移服务:一键解锁跨平台数据迁移新纪元

在数据驱动的时代背景下&#xff0c;非结构化数据的处理与迁移已成为企业数字化转型中不可或缺的一环。随着向量数据库技术的飞速发展&#xff0c;尤其是像Milvus这样的高性能向量数据库系统的广泛应用&#xff0c;如何高效、安全、准确地实现数据在不同系统间的迁移&#xff0…

Arthas mc(Memory Compiler/内存编译器 )

文章目录 二、命令列表2.2 class/classloader相关命令2.2.2 mc &#xff08;Memory Compiler/内存编译器 &#xff09;举例1&#xff1a;可以通过-d命令指定输出目录&#xff1a;mc -d /temporary/tmp /temporary/tmp/AccountController.java举例2&#xff1a;通过--classLoade…

书生大模型实战营学习[7] InternLM + LlamaIndex RAG 实践

环境配置 选择30%A100做本次任务 conda create -n llamaindex python3.10 conda activate llamaindex conda install pytorch2.0.1 torchvision0.15.2 torchaudio2.0.2 pytorch-cuda11.7 -c pytorch -c nvidia pip install einops pip install protobuf安装Llamaindex cond…

2024年9月27日历史上的今天大事件早读

1540年09月27日罗马教皇正式批准耶稣会 1605年09月27日吉尔霍尔姆战役爆发 1825年09月27日世界第一条铁路在英国正式通车 1840年09月27日美国海军战略思想家马汉出生 1858年09月27日天地会起义&#xff0c;建立大成国 1910年09月27日橡胶股灾亡国录 1913年09月27日孙中山…

隆道供应商四大类服务升级全面速览!

隆道供应商服务升级&#xff0c;全流程在线业务协同场景&#xff0c;支持系统性交易协同服务和企业营销&#xff0c;通过持续深入洞察供应商的核心需求&#xff0c;对营销类、协同类、风控类及数据类四大关键服务进行了全面升级。推出一系列创新应用&#xff0c;致力于帮助供应…

项目启动错误

说明&#xff1a;记录一次项目启动&#xff0c;报数据库访问错误&#xff0c;如下&#xff1a; 错误信息&#xff1a;Invalid default&#xff1a;public abstract java.lang.Class tk.mybatis.spring.annotation.MapperScan.fatoryBean() 解决 没有引入mybatis依赖&#xff…

AlphaFold3 | 详解 AlphaFold3 的模型结构及其在不同类型的预测实验中的表现

Jumper 本文将介绍 24 年 5 月发布的 Alaphafold3&#xff0c;其以“使用 AlphaFold 3 进行生物分子相互作用的精确结构预测”为标题发表在《nature》上&#xff0c;通讯作者为 Jumper。 Jumper 具有物理、化学、生物和计算方面的丰富背景。Jumper 本科学的是物理和数学&#…

Python中的数据处理与分析:从基础到高级

在数据科学和数据分析领域&#xff0c;Python凭借其丰富的库和强大的生态系统&#xff0c;成为了最受欢迎的语言之一。本文将从基础到高级&#xff0c;详细介绍如何使用Python进行数据处理和分析&#xff0c;涵盖数据清洗、数据转换、数据可视化等多个方面。 1. 数据导入与导出…

网络安全专业,在校大学生如何赚外快,实现财富自由?零基础入门到精通,收藏这一篇就够了

如今&#xff0c;计算机行业内卷严重&#xff0c;我们不找点赚外快的路子这么行呢&#xff1f; 今天就来说说网络安全专业平时都怎么赚外快。 一、安全众测 国内有很多成熟的src众测平台&#xff0c;如漏洞盒子、火线众测、补天、CNVD、漏洞银行等。一些大厂也有自己的src&a…