当前位置: 首页 > news >正文

利用EMQX实现单片机和PyQt的数据MQTT互联

https://www.dong-blog.fun/post/2050

基于MQTT的设备监控与控制系统设计

引言

物联网(IoT)设备的远程监控与控制是现代智能系统的基础需求。本文将介绍一个基于MQTT协议的设备监控与控制系统,该系统由两部分组成:模拟单片机设备和PyQt客户端。我们将详细讨论系统的设计思路、代码实现以及实际应用场景。

系统架构

该系统基于发布-订阅模式,采用MQTT协议实现设备与客户端之间的双向通信。系统结构如下:

  1. 设备模拟器:模拟单片机设备,定期发送GPS位置数据,并接收控制命令
  2. PyQt客户端:显示设备GPS数据,并提供用户界面发送控制命令
  3. MQTT服务器:充当消息代理,负责转发消息

我在之前已经介绍了如何使用EMQT,这里不再重复,有关信息查看这里:

https://www.dong-blog.fun/post/1963

https://www.dong-blog.fun/post/2049

通信协议设计

系统使用两个主题进行通信:

  1. 设备数据主题(device/gps):设备向此主题发布GPS数据

    {"device_id": "device_001","timestamp": 1692345678,"latitude": 30.056789,"longitude": 120.123456
    }
    
  2. 控制指令主题(device/control):客户端向此主题发布控制命令

    • 按钮控制命令:
      {"command": "UP","timestamp": 1692345700
      }
      
    • 自定义文本命令:
      {"text": "自定义指令内容","timestamp": 1692345720
      }
      

通过区分commandtext键,系统能够清晰地区分不同类型的控制信号。

设备模拟器实现

设备模拟器(device_simulator.py)主要功能是:

  1. 模拟GPS数据采集并发布到MQTT服务器
  2. 接收并处理控制命令
import paho.mqtt.client as mqtt
import json
import time
import random# MQTT broker configuration
broker = "10.100.80.98"
port = 1883
topic = "device/gps"
control_topic = "device/control"# Create MQTT client with Version 2 API
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)def on_connect(client, userdata, flags, rc, properties=None):print(f"Connected with result code {rc}")# Subscribe to control topicclient.subscribe(control_topic)print(f"已订阅控制主题: {control_topic}")def on_message(client, userdata, msg, properties=None):try:data = json.loads(msg.payload.decode())print(f"收到控制数据: {data}")# 处理不同类型的消息if "command" in data:command = data["command"]print(f"执行命令: {command}")# 这里可以添加对命令的具体响应逻辑if command == "UP":print("设备向上移动")elif command == "DOWN":print("设备向下移动")elif command == "LEFT":print("设备向左移动")elif command == "RIGHT":print("设备向右移动")elif command == "CONFIRM":print("设备确认操作")elif command == "CANCEL":print("设备取消操作")# 处理自定义文本消息elif "text" in data:text = data["text"]print(f"收到自定义文本: {text}")# 处理文本命令的逻辑print(f"设备执行文本命令: {text}")except Exception as e:print(f"处理消息时出错: {e}")# Set callback functions
client.on_connect = on_connect
client.on_message = on_message# Connect to broker
try:client.connect(broker, port, 60)# Start the network loopclient.loop_start()print(f"已连接到MQTT服务器: {broker}:{port}")
except Exception as e:print(f"连接MQTT服务器失败: {e}")exit(1)try:device_id = "device_001"print(f"设备 {device_id} 模拟器开始运行")print("按Ctrl+C停止程序")while True:# Generate random GPS coordinates around a specific location# Base coordinates: 30.0°N, 120.0°Elatitude = 30.0 + random.uniform(-0.1, 0.1)longitude = 120.0 + random.uniform(-0.1, 0.1)# Create message payloadmessage = {"device_id": device_id,"timestamp": int(time.time()),"latitude": round(latitude, 6),"longitude": round(longitude, 6)}# Publish messageclient.publish(topic, json.dumps(message))print(f"已发送GPS数据: 纬度={message['latitude']}, 经度={message['longitude']}")# Wait for 2 seconds before sending next messagetime.sleep(2)except KeyboardInterrupt:print("\n接收到停止信号,正在停止设备模拟器...")client.loop_stop()client.disconnect()print("设备模拟器已停止")
except Exception as e:print(f"运行时错误: {e}")client.loop_stop()client.disconnect() 

设备模拟器使用随机生成的GPS坐标数据,模拟设备移动的场景,同时能够对不同类型的命令做出响应。

日志信息:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

PyQt客户端实现

PyQt客户端(gps_monitor.py)提供了图形界面,主要功能包括:

  1. 显示设备GPS数据
  2. 提供方向控制按钮
  3. 支持自定义文本命令输入和发送

客户端的核心在于MQTT与PyQt的集成,我们使用信号-槽机制来解决多线程问题:

import sys
import json
import time
import paho.mqtt.client as mqtt
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QPushButton, QTextEdit, QLabel)
from PyQt5.QtCore import Qt, QTimer, pyqtSignal, QObject# 创建一个MQTT信号处理类
class MQTTSignals(QObject):message_received = pyqtSignal(dict)class GPSMonitor(QMainWindow):def __init__(self):super().__init__()self.setWindowTitle("GPS Monitor and Controller")self.setGeometry(100, 100, 800, 600)# MQTT setupself.broker = "10.100.80.98"self.port = 1883self.gps_topic = "device/gps"self.control_topic = "device/control"# 创建信号对象self.mqtt_signals = MQTTSignals()self.mqtt_signals.message_received.connect(self.update_display)self.setup_mqtt()self.setup_ui()def setup_mqtt(self):# 使用MQTT V5客户端self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)self.client.on_connect = self.on_connectself.client.on_message = self.on_messagetry:self.client.connect(self.broker, self.port, 60)self.client.loop_start()except Exception as e:print(f"MQTT连接错误: {e}")def on_connect(self, client, userdata, flags, rc, properties=None):print(f"Connected with result code {rc}")self.client.subscribe(self.gps_topic)def on_message(self, client, userdata, msg, properties=None):try:data = json.loads(msg.payload.decode())# 使用信号发送数据,不直接更新UIself.mqtt_signals.message_received.emit(data)except Exception as e:print(f"Error processing message: {e}")def setup_ui(self):# Main widget and layoutmain_widget = QWidget()self.setCentralWidget(main_widget)layout = QVBoxLayout(main_widget)# GPS display areaself.gps_display = QTextEdit()self.gps_display.setReadOnly(True)layout.addWidget(self.gps_display)# Control buttons layoutbutton_layout = QHBoxLayout()# Direction buttonsself.btn_up = QPushButton("上")self.btn_down = QPushButton("下")self.btn_left = QPushButton("左")self.btn_right = QPushButton("右")self.btn_confirm = QPushButton("确认")self.btn_cancel = QPushButton("取消")# Add buttons to layoutbutton_layout.addWidget(self.btn_up)button_layout.addWidget(self.btn_down)button_layout.addWidget(self.btn_left)button_layout.addWidget(self.btn_right)button_layout.addWidget(self.btn_confirm)button_layout.addWidget(self.btn_cancel)# Command inputself.command_input = QTextEdit()self.command_input.setMaximumHeight(100)layout.addWidget(QLabel("命令输入:"))layout.addWidget(self.command_input)# Send buttonself.btn_send = QPushButton("发送")layout.addWidget(self.btn_send)# Add button layout to main layoutlayout.addLayout(button_layout)# Connect button signalsself.btn_up.clicked.connect(lambda: self.send_command("UP"))self.btn_down.clicked.connect(lambda: self.send_command("DOWN"))self.btn_left.clicked.connect(lambda: self.send_command("LEFT"))self.btn_right.clicked.connect(lambda: self.send_command("RIGHT"))self.btn_confirm.clicked.connect(lambda: self.send_command("CONFIRM"))self.btn_cancel.clicked.connect(lambda: self.send_command("CANCEL"))self.btn_send.clicked.connect(self.send_custom_command)def update_display(self, data):display_text = f"时间戳: {data['timestamp']}\n"display_text += f"设备ID: {data['device_id']}\n"display_text += f"纬度: {data['latitude']}\n"display_text += f"经度: {data['longitude']}\n"display_text += "-" * 50 + "\n"current_text = self.gps_display.toPlainText()self.gps_display.setText(display_text + current_text)def send_command(self, command):message = {"command": command,"timestamp": int(time.time())}self.client.publish(self.control_topic, json.dumps(message))print(f"已发送命令: {command}")def send_custom_command(self):command = self.command_input.toPlainText().strip()if command:# 使用"text"作为键名而不是"command"message = {"text": command,"timestamp": int(time.time())}self.client.publish(self.control_topic, json.dumps(message))self.command_input.clear()print(f"已发送自定义文本: {command}")def closeEvent(self, event):"""窗口关闭时的处理"""self.client.loop_stop()self.client.disconnect()event.accept()if __name__ == "__main__":app = QApplication(sys.argv)window = GPSMonitor()window.show()sys.exit(app.exec_()) 

这种设计解决了MQTT回调线程与GUI线程的冲突问题,确保了应用程序的稳定性。

PyQt运行界面:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

线程安全设计

MQTT客户端的回调函数在非GUI线程中执行,而PyQt应用程序要求UI更新必须在主线程中进行。为了解决这个问题,我们采用了Qt的信号-槽机制:

  1. 创建一个MQTTSignals类,定义message_received信号
  2. 在MQTT回调函数中发射信号
  3. 将信号连接到GUI线程中的槽函数

这种方式确保了线程安全,避免了直接在回调函数中更新UI导致的崩溃问题。

系统扩展性

该系统具有良好的扩展性:

  1. 多设备支持:通过增加设备ID识别,可轻松扩展为多设备系统
  2. 协议扩展:可以增加新的消息类型和命令
  3. 功能增强:可以添加历史数据记录、轨迹显示、报警功能等

系统运行效果

系统运行后,设备模拟器会每2秒发送一次GPS数据,PyQt客户端会实时显示这些数据,并可通过按钮或自定义文本发送控制命令。设备模拟器收到命令后会打印相应的响应信息。

源码与使用方法

使用前需要安装必要的依赖:

pip install paho-mqtt PyQt5

要运行系统,请先启动设备模拟器,然后运行PyQt客户端:

python device_simulator.py
python gps_monitor.py
http://www.xdnf.cn/news/199081.html

相关文章:

  • 数据库系统概论|第三章:关系数据库标准语言SQL—课程笔记6
  • 计算机基础—(九道题)
  • 云上玩转DeepSeek系列之六:DeepSeek云端加速版发布,具备超高推理性能
  • AI图片跳舞生成视频,animate X本地部署。
  • 2025系统架构师---论企业集成平台的技术与应用
  • 永磁同步电机控制算法-反馈线性化滑模控制
  • Telephony VoiceMail
  • 数据库基础与核心操作:从概念到实战的全面解析
  • 嵌入式多功能浏览器系统设计详解
  • 使用双端队列deque模拟栈stack
  • 获得ecovadis徽章资格标准是什么?ecovadis评估失败的风险
  • sortablejs + antd-menu 拖拽出重复菜单
  • 【个人理解】MCP server和client二者各自的角色以及发挥的作用
  • 【TS入门笔记4---装饰器】
  • DPanel 一款更适合国人的 Docker 管理工具
  • linux 使用nginx部署vue、react项目
  • 结合大语言模型的机械臂抓取操作学习
  • Python 中支持函数式编程的 operator 与 functools 包
  • 第一节:Linux系统简介
  • Android显示学习笔记本
  • 打造即插即用的企业级云原生平台——KubeSphere 4.1 扩展组件在生产环境的价值全解
  • 解决跨域实现方案
  • 用vite动态导入vue的路由配置
  • 本地部署Qwen-7B实战 vLLM加速推理
  • 网络协议之为什么要分层
  • 论文分享 | 基于区块链和签名的去中心化跨域认证方案
  • 受限字符+环境变量RCE
  • vue3:v-model的原理示例
  • python练习:求数字的阶乘
  • 思科bsp社招面试