MQTT Client源码分析

MQTT Client源码分析

目录

  • MQTT Client源码分析
    • 1. mqttclient架构
      • 1.1 API
      • 1.2 mqtt_client_t结构体
      • 1.3 mqtt_yield_thread内部线程
      • 1.4 keepalive
      • 1.5 ack链表
    • 2. mqttclient流程
      • 2.1 MQTT CONNECT
      • 2.2 MQTT SUBSCRIBE
      • 2.3 MQTT PUBLISH
      • 2.4 接收服务器PUBLISH消息

之前基于杰杰的mqttclient代码和韦东山老师的教程,把MQTTClient程序移植到STM32F103开发板,F103的开发板串口连接ESP8266模组实现终端连接到MQTT服务器的功能,仅仅是对着韦老师的教程移植和使用杰杰的mqttclient代码,简单的将mqttclient\platform\FreeRTOS\platform_net_socket.c文件中的接口绑定到ESP8266的TCP AT命令,使用ESP8266的Socket,对于杰杰的mqttclient代码并没有深入分析和理解。

1. mqttclient架构

如下图:

在这里插入图片描述

1.1 API

mqttclient的API接口:

int mqtt_init(mqtt_client_t* c, client_init_params_t* init);
int mqtt_release(mqtt_client_t* c);
int mqtt_connect(mqtt_client_t* c);
int mqtt_disconnect(mqtt_client_t* c);
int mqtt_subscribe(mqtt_client_t* c, const char* topic_filter, mqtt_qos_t qos, message_handler_t msg_handler);
int mqtt_unsubscribe(mqtt_client_t* c, const char* topic_filter);
int mqtt_publish(mqtt_client_t* c, const char* topic_filter, mqtt_message_t* msg);int mqtt_keep_alive(mqtt_client_t* c);
int mqtt_yield(mqtt_client_t* c, int timeout_ms);

1.2 mqtt_client_t结构体

typedef struct mqtt_client {char                        *mqtt_client_id; //由用户传入,在连接MQTT时(mqtt_connect_with_results)序列化连接报文char                        *mqtt_user_name; //由用户传入,在连接MQTT时(mqtt_connect_with_results)序列化连接报文char                        *mqtt_password; //由用户传入,在连接MQTT时(mqtt_connect_with_results)序列化连接报文char                        *mqtt_host; //由用户传入,在连接MQTT时(mqtt_connect_with_results)序列化连接报文char                        *mqtt_port; //由用户传入,在连接MQTT时(mqtt_connect_with_results)序列化连接报文char                        *mqtt_ca; //TLS才会用到,暂时不分析void                        *mqtt_reconnect_data; //MQTT需要重连服务器时用到uint8_t                     *mqtt_read_buf; //读数据缓冲区uint8_t                     *mqtt_write_buf; //写数据缓冲区uint16_t                    mqtt_keep_alive_interval; //MQTT保活超时时间uint16_t                    mqtt_packet_id; //报文标识符uint32_t                    mqtt_will_flag          : 1; //遗嘱标志uint32_t                    mqtt_clean_session      : 1; //清理会话标志uint32_t                    mqtt_ping_outstanding   : 2; //PINGREQ后是否正在等待PINGRESP标志uint32_t                    mqtt_version            : 4; //MQTT协议版本uint32_t                    mqtt_ack_handler_number : 24; //用于QOS1和QOS2中ACK记录uint32_t                    mqtt_cmd_timeout; //命令超时时间(主要是读写阻塞时间、等待响应的时间、重连等待时间)uint32_t                    mqtt_read_buf_size; //读数据缓冲区大小uint32_t                    mqtt_write_buf_size; //写数据缓冲区大小uint32_t                    mqtt_reconnect_try_duration; //客户端在尝试重新连接到MQTT服务器时所允许的最大尝试时间size_t                      mqtt_client_id_len; //clientID最大长度size_t                      mqtt_user_name_len; //userName最大长度size_t                      mqtt_password_len; //password最大长度mqtt_will_options_t         *mqtt_will_options; //遗嘱消息配置client_state_t              mqtt_client_state; //客户端状态(INVALID、INITIALIZED、CONNECTED、DISCONNECTED、CLEAN_SESSION)platform_mutex_t            mqtt_write_lock; //写数据锁platform_mutex_t            mqtt_global_lock; //全局锁,比如转换客户端状态时需要先锁mqtt_list_t                 mqtt_msg_handler_list; //所有来自服务器的publish报文都会被处理(前提是订阅了对应的消息),mqtt协议必须实现的内容mqtt_list_t                 mqtt_ack_handler_list; //所有等待响应的报文都会被挂载到这个链表上,异步实现的核心network_t                   *mqtt_network; //网卡接口,保存网络相关信息(host、port、socket)platform_thread_t           *mqtt_thread; //内部线程mqtt_yield_thread,所有来自服务器的mqtt包都会在这里被处理platform_timer_t            mqtt_reconnect_timer; //掉线重连定时器platform_timer_t            mqtt_last_sent; //用于保活定时器platform_timer_t            mqtt_last_received; //保活定时器reconnect_handler_t         mqtt_reconnect_handler; //mqtt重连处理interceptor_handler_t       mqtt_interceptor_handler; //publish数据时调用,个人理解是要发送的数据与client绑定
} mqtt_client_t;

1.3 mqtt_yield_thread内部线程

mqtt_yield_thread线程中主要执行mqtt_yield(c, c->mqtt_cmd_timeout)函数

在这里插入图片描述

mqtt_yield中主要执行mqtt_packet_handle(c, &timer)处理MQTT接收到的消息

在这里插入图片描述

static int mqtt_packet_handle(mqtt_client_t* c, platform_timer_t* timer)
{int rc = MQTT_SUCCESS_ERROR;int packet_type = 0;rc = mqtt_read_packet(c, &packet_type, timer);//printf("read packet rc = %d, packet_type = %d, g_remain_len = %d\r\n", rc, packet_type, g_remain_len);switch (packet_type) {case 0: /* timed out reading packet */break;case CONNACK: /* has been processed */goto exit;case PUBACK:case PUBCOMP:rc = mqtt_puback_and_pubcomp_packet_handle(c, timer);break;case SUBACK:rc = mqtt_suback_packet_handle(c, timer);break;case UNSUBACK:rc = mqtt_unsuback_packet_handle(c, timer);break;case PUBLISH:rc = mqtt_publish_packet_handle(c, timer);break;case PUBREC:case PUBREL:rc = mqtt_pubrec_and_pubrel_packet_handle(c, timer);break;case PINGRESP:c->mqtt_ping_outstanding = 0;    /* keep alive ping success */break;default:goto exit;}rc = mqtt_keep_alive(c);	/* Keep the treatment alive */exit:if (rc == MQTT_SUCCESS_ERROR)rc = packet_type;RETURN_ERROR(rc);
}

1.4 keepalive

当第一次发生超时时,会在mqtt_keep_alive中序列还一个心跳包发送给服务器,并将mqtt_ping_outstanding加1,当第二次超时时会设置client状态为CLIENT_STATE_DISCONNECTED尝试重连,若重连成功后需要重新订阅

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

1.5 ack链表

需要等待服务器应答消息时会加入ack链表(参考2.2中SUBACK),每次接收到服务器消息时,会对ack链表进行扫描,超时后会销毁链表节点,如果是PUBACK、PUBREC、PUBREL、PUBCOMP则需要重发

在这里插入图片描述

在这里插入图片描述

2. mqttclient流程

2.1 MQTT CONNECT

  • 用户初始化mqtt_client_t参数后,调用int mqtt_connect(mqtt_client_t* c);连接MQTT服务器

在这里插入图片描述

  • 调用到底层rc = network_connect(c->mqtt_network);连接MQTT服务器

在这里插入图片描述

  • 最终调用到平台层的n->socket = platform_net_socket_connect(n->host, n->port, PLATFORM_NET_PROTO_TCP);发送连接请求,其中实现了与ESP8266 TCP AT SOCKET绑定

在这里插入图片描述

  • 之后使用MQTTSerialize_connect(c->mqtt_write_buf, c->mqtt_write_buf_size, &connect_data)序列化mqtt的CONNECT报文,并使用mqtt_send_packet(c, len, &connect_timer)发送出去

在这里插入图片描述

  • mqtt_send_packet也是调用网络接口network_write(c->mqtt_network, &c->mqtt_write_buf[sent], length, platform_timer_remain(timer)最终调用平台接口platform_net_socket_write_timeout(n->socket, write_buf, len, timeout)

在这里插入图片描述

在这里插入图片描述

  • 发送完毕后在mqtt_wait_packet(c, CONNACK, &connect_timer) == CONNACK)等待服务器回复的CONNACK报文

在这里插入图片描述

  • 连接服务器成功后创建一个MQTT内部线程platform_thread_init("mqtt_yield_thread", mqtt_yield_thread, c, MQTT_THREAD_STACK_SIZE, MQTT_THREAD_PRIO, MQTT_THREAD_TICK)并启动,所有来自服务器的mqtt包都会在这里被处理

在这里插入图片描述

  • 当需要进行重连时,不需要重新创建MQTT内部线程,只需要改变MQTT Client的状态即可

在这里插入图片描述

在这里插入图片描述

2.2 MQTT SUBSCRIBE

  • 连接MQTT服务器后,用户可以直接调用mqtt_subscribe(client, "home", QOS0, smarthome_msg_handler);订阅主题

在这里插入图片描述

  • mqtt_subscribe会调用MQTTSerialize_subscribe(c->mqtt_write_buf, c->mqtt_write_buf_size, 0, packet_id, 1, &topic, (int*)&qos)序列化订阅报文,并调用mqtt_send_packet(c, len, &timer)发送订阅消息

在这里插入图片描述

  • 然后使用mqtt_msg_handler_create(topic_filter, qos, handler)创建对应的消息处理节点,这个消息节点在收到服务器的SUBACK订阅应答报文后会挂在到消息处理列表msg_handler上

在这里插入图片描述

  • mqtt_ack_list_record(c, SUBACK, packet_id, len, msg_handler)中记录等待服务器响应的SUBACK

在这里插入图片描述

  • 收到服务器响应的SUBACK回复后,在mqtt_suback_packet_handle(c, timer)中会取消ack_list里的SUBACK记录,并安装到msg_handler_list

在这里插入图片描述

在这里插入图片描述

MQTT UNSUBSCRIBE的流程与MQTT SUBSCRIBE的流程差不多。

2.3 MQTT PUBLISH

  • 连接服务器后,用户可以直接调用mqtt_publish(client, "home", &msg)向某主题发布消息

在这里插入图片描述

  • 与订阅流程类似,先使用MQTTSerialize_publish(c->mqtt_write_buf, c->mqtt_write_buf_size, 0, msg->qos, msg->retained, msg->id, topic, (uint8_t*)msg->payload, msg->payloadlen)序列化发布报文,在使用mqtt_send_packet(c, len, &timer)并发布到服务器

在这里插入图片描述

  • 对于QOS1或QOS2需要将PUBACK或PUBREC加入到ack_list中,与SUBACK类似,并提前设置了DUP重发标志位

在这里插入图片描述

在这里插入图片描述

2.4 接收服务器PUBLISH消息

  • 服务器发送的PUBLISH消息会在client的内部线程mqtt_yield_thread中的mqtt_packet_handle中处理

在这里插入图片描述

  • 先对收到的消息进行反序列化,QOS1和QOS2类型需要回复ACK,让后处理收到的消息,注意QOS还需要先等待服务器的PUBREL ACK后再处理接收的消息

在这里插入图片描述

  • mqtt_get_msg_handler(c, topic_name)中获取当前主题的处理函数,并将接收的数据在数据处理函数msg_handler中处理,该处理函数在订阅主题时定义

在这里插入图片描述

在这里插入图片描述

以上是杰杰mqttclient代码分析,后期如果能够有更深的认识再继续补充,欢迎各位大佬补充和指正!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1523360.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

大数据-118 - Flink DataSet 基本介绍 核心特性 创建、转换、输出等

点一下关注吧!!!非常感谢!!持续更新!!! 目前已经更新到了: Hadoop(已更完)HDFS(已更完)MapReduce(已更完&am…

手机怎么把wmv转换成mp4格式?视频格式这样做,让你的视频更加通用

“我最近想在手机上编辑视频,但遇到一个问题,就是我有一些wmv格式的视频,想把它们转换成mp4格式,好把它们发布到平台上。但是我不会转格式。请问手机怎么把wmv转换成mp4格式呢?请大家帮帮我。” 格式转换对于没怎么接…

JAVA 二维码生成

1.pom依赖 <dependency><groupId>com.google.zxing</groupId><artifactId>core</artifactId><version>3.3.3</version></dependency><dependency><groupId>com.google.zxing</groupId><artifactId>ja…

四川财谷通抖音小店创新引领新风尚

在数字化浪潮的推动下&#xff0c;电商行业蓬勃发展&#xff0c;抖音小店作为新兴的电商平台&#xff0c;凭借其独特的社交属性和便捷的购物体验&#xff0c;迅速赢得了广大消费者的青睐。在众多抖音小店中&#xff0c;四川财谷通抖音小店以其精准定位、高质量内容、一站式服务…

智慧公厕:城市公共卫生管理的新篇章‌@卓振思众

在快节奏的现代生活中&#xff0c;公共厕所作为城市基础设施的重要组成部分&#xff0c;其使用体验和管理效率直接影响着市民的生活质量与城市形象。随着科技的飞速发展&#xff0c;智慧公厕应运而生&#xff0c;它以一种全新的姿态&#xff0c;为城市公共卫生管理带来了前所未…

鸿蒙Harmony--状态变量更改通知--@Watch装饰器详解

风雨飘摇中&#xff0c;我心起伏&#xff0c; 万丈雄心&#xff0c;却难以施展。 天高地远&#xff0c;路途迷茫&#xff0c; 理想如星&#xff0c;却遥不可及。 千百次跌倒&#xff0c;千百次爬起&#xff0c; 在命运的手掌中&#xff0c;挣扎前行。 谁知我心中的热血滚烫&…

向 ADC 模型和 DAC 建模添加低通滤波器

与单音测试信号相比&#xff0c;双音测试信号可提供更多有关 ADC 性能的信息。您的作者的模型与特定 ADC 的制造商模型非常匹配&#xff0c;因此可以方便地运行误码率模拟。该 ADC 恰好具有非常宽的输入带宽。 对于带宽较低的 ADC&#xff0c;添加如图 1 所示的低通滤波器将提…

用亚马逊AI代码开发助手Amazon Q Developer开发小游戏(中篇)

快用人工智能帮程序员写代码、开发游戏&#xff01;今天小李哥就来介绍亚马逊推出的国际前沿人工智能AI代码开发助手Amazon Q Developer。目前该代码助手在Hugging Face代码生成权威测试集SWE-bench中排名第一&#xff0c;可以根据我们的需求生成整个代码项目&#xff0c;并可以…

IDEA莫名奇妙自动选择光标所在行 -罪魁祸首居然是钉钉

请看受害者视角 作为开发者&#xff0c;工作时基本都会运行钉钉吧。最近&#xff0c;钉钉更新了AI功能&#xff0c;但不知道是不是开发团队平时不使用IDE&#xff0c;竟然让这个AI功能影响到了其他软件&#xff0c;简直让人无语。不仅仅是IDEA受影响&#xff0c;就连WebStorm也…

QQ聊天记录删除了怎么恢复?学会这3个方法,简单又有效

QQ作为我们日常沟通的重要工具之一&#xff0c;其聊天记录往往承载着许多珍贵的记忆和重要的信息。但在操作中我们会不小心删除或丢失这些聊天记录&#xff0c;那么QQ聊天记录删除了怎么恢复就成为我们急切需要解决的问题。先别急&#xff0c;本文就为你介绍3种简单又有效的QQ聊…

【Qt笔记】QListWidget控件详解

目录 引言 一、基本概念和特性 二、基本用法 2.1 创建和初始化 2.2 添加和删除项 2.3 选择和遍历项 三、信号与槽 3.1 itemClicked 3.2 itemDoubleClicked 3.3 itemSelectionChanged 四、自定义项 五、排序和查找 六、代码示例 6.1 头文件 6.2 源文件 6.3 主…

腾讯云TRTC无UI集成——分享屏幕主流、辅流(Vue2+JS+TRTC无UI集成)

先阐述一下问题&#xff0c;在项目中用到腾讯云的TRTC&#xff0c;A端发布A1、A2两个视频源&#xff0c;在B端订阅A1、A2使用两个view进行播放渲染 问题主流视频源和辅流视频源渲染在同一view上&#xff0c;控制台报错 // 播放远端视频 TRTCService.js; setRemoteVideo(view)…

【数据结构入门】排序算法之插入排序与选择排序

目录 前言 一、排序的概念及运用 1.排序的概念 2.排序的运用 3.常见排序算法 二、插入排序与选择排序 2.1插入排序 2.1.1直接插入排序 1&#xff09;基本思想 2&#xff09;具体步骤 3&#xff09;算法特性 4&#xff09;算法实现 2.1.2希尔排序 1) 基本思想 2&…

草原灭火车的功能与性能_鼎跃安全

在内蒙古的草原火灾中&#xff0c;水陆两栖全地形草原灭火车曾多次用于紧急救援。其强大的越野能力和高速反应&#xff0c;使其在广袤的草原上能够迅速到达火场&#xff0c;并使用集成的多功能灭火设备进行灭火作业&#xff0c;有效防止了火灾的进一步蔓延。 水陆两栖全地形草原…

React学习-hooks

官方文档&#xff1a;https://zh-hans.react.dev/reference/react/useActionState 1.useEffect(setup, dependencies?) 1.1 基础使用 //hooks import { useEffect } from "react"; import "./App.css";function App(){useEffect(()>{console.log(us…

redis的共享session应用

项目背景&#xff1a; 该项目背景就是黑马的黑马点评项目。 一&#xff1a;基于Session实现验证码登录流程 基本的登录流程我们做了很多了。这个是短信登录流程 其实和普通的登录流程就多了一个生成验证码&#xff0c;并将验证码保存在session中&#xff0c;并且呢&#xf…

vue3中使用supermap icilent3d for cesium

记录从头开始学习supermap icilent3d fro cesium 1.新建vue3项目 npm create vitelatest 添加这个&#xff0c;自动打开浏览器 2.使用supermap icilent3d for Cesium 复制这个Cesium&#xff0c;放到pulibc目录下面 然后分别引入css和js 然后就可以使用了&#xff0c;但是会…

Oracle 客户端 PL/SQL Developer 15.0.4 安装与使用

目录 官网下载与安装 切换中文与注册 连接Oracle数据库 tnsnames.ora 文件使用 Oracle 客户端 PL/SQL Developer 12.0.7 安装、数据导出、Oracle 执行/解释计划、for update。 官网下载与安装 1、官网&#xff1a;https://www.allroundautomations.com/products/pl-sql-d…

【STM32】通用定时器TIM(输出比较)

本篇博客重点在于标准库函数的理解与使用&#xff0c;搭建一个框架便于快速开发 目录 前言 输出比较简介 PWM简介 输出比较配置 初始化IO口 输出比较初始化 输出比较代码 PWM.h PWM.c main.c 应用案例 前言 建议先阅读这篇博客&#xff0c;理解时基单元的配置 【…

CDGA|数据治理:构建高效数据管理体系的实践路径

在当今数字化时代&#xff0c;数据已成为企业最宝贵的资产之一&#xff0c;其质量、安全性和有效利用率直接影响着企业的决策能力、运营效率和市场竞争力。因此&#xff0c;数据治理作为确保数据质量、促进数据价值最大化的关键环节&#xff0c;其重要性日益凸显。本文将从几个…