一 前言
MQTT的相关理论内容这里不做过多介绍,请看下面两篇文章:
Introduction · MQTT协议中文版
MQTT协议-CSDN博客
这篇文章只做代码实现,文章中使用MQTTnet作为MQTT开发的组件。
MQTT分为服务端和客户端,一个服务端对应多个客户端。其中服务端相当于是一台服务器,它是MQTT消息传输的枢纽,负责将MQTT客户端发送来的消息传递给另一个客户端;MQTT服务端还负责管理客户端,确保客户端之间的通讯顺畅,保证MQTT消息得以正确接收和准确投递。
MQTT客户端可以向服务端发布信息,也可以从服务端接受信息,我们把客户端向服务端发送消息的行为称为“发布”消息,客户端也可以“订阅”消息。
二 服务端
服务端可以不用自己开发,有几个常用的第三方服务端,比如EMQ,EMQ怎么使用的,可以查看官网:物联网实时消息引擎 | EMQ
这里不介绍第三方服务,这里具体介绍如何自己动手开发服务端。
1、添加MQTTnet引用
新建一个控制台应用程序,打开NuGet程序包,添加MQTTnet,版本选择3.0.13,选择版本这里要注意一下,不同的版本实现方式不同,下面的实现代码中,如果选择高版本,可能会有异常。
2、代码实现
不啰嗦,直接上代码
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Receiving;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System.Text;namespace ConsoleApp2
{internal class Program{static void Main(string[] args){MqttServerClass serverClass = new MqttServerClass();serverClass.StartMqttServer().Wait();Console.ReadLine();}}public static class Config{public static int Port { get; set; } = 1883;public static string UserName { get; set; } = "Username";public static string Password { get; set; } = "Password";}public class UserInstance{public string ClientId { get; set; }public string UserName { get; set; }public string Password { get; set; }}public class MqttServerClass{private IMqttServer mqttServer;private List<MqttApplicationMessage> messages = new List<MqttApplicationMessage>();public async Task StartMqttServer(){try{if (mqttServer == null){var optionsBuilder = new MqttServerOptionsBuilder().WithDefaultEndpoint().WithDefaultEndpointPort(Config.Port)//连接拦截器.WithConnectionValidator(c =>{//var flag = c.Username == Config.UserName && c.Password == Config.Password;//if (!flag)//{// c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;// return;//}//设置代码为 Successc.ReasonCode = MqttConnectReasonCode.Success;//instances.Add(new UserInstance() //缓存到内存的List集合当中//{// ClientId = c.ClientId,// UserName = c.Username,// Password = c.Password//});})//订阅拦截器.WithSubscriptionInterceptor(c =>{if (c == null) return;c.AcceptSubscription = true;})//应用程序消息拦截器.WithApplicationMessageInterceptor(c =>{if (c == null) return;c.AcceptPublish = true;})//clean session是否生效.WithPersistentSessions();mqttServer = new MqttFactory().CreateMqttServer();//客户端断开连接拦截器//mqttServer.UseClientDisconnectedHandler(c =>//{// //var user = instances.FirstOrDefault(t => t.ClientId == c.ClientId);// //if (user != null)// //{// // instances.Remove(user);// //}//});//服务开始mqttServer.StartedHandler = new MqttServerStartedHandlerDelegate(OnMqttServerStarted);//服务停止mqttServer.StoppedHandler = new MqttServerStoppedHandlerDelegate(OnMqttServerStopped);//客户端连接mqttServer.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(OnMqttServerClientConnected);//客户端断开连接(此事件会覆盖拦截器)mqttServer.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(OnMqttServerClientDisconnected);//客户端订阅mqttServer.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(OnMqttServerClientSubscribedTopic);//客户端取消订阅mqttServer.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(OnMqttServerClientUnsubscribedTopic);//服务端收到消息mqttServer.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(OnMqttServerApplicationMessageReceived);await mqttServer.StartAsync(optionsBuilder.Build());//主动发送消息到客户端//await mqttServer.PublishAsync(new// MqttApplicationMessage//{// Topic = "testtopic",// Payload = Encoding.UTF8.GetBytes("dsdsd")//});//mqttServer.GetClientStatusAsync();//mqttServer.GetRetainedApplicationMessagesAsync();//mqttServer.GetSessionStatusAsync();}}catch (Exception ex){Console.WriteLine($"MQTT Server start fail.>{ex.Message}");}}private void OnMqttServerStarted(EventArgs e){if (mqttServer.IsStarted){Console.WriteLine("MQTT服务启动完成!");}}private void OnMqttServerStopped(EventArgs e){if (!mqttServer.IsStarted){Console.WriteLine("MQTT服务停止完成!");}}private void OnMqttServerClientConnected(MqttServerClientConnectedEventArgs e){Console.WriteLine($"客户端[{e.ClientId}]已连接");}private void OnMqttServerClientDisconnected(MqttServerClientDisconnectedEventArgs e){Console.WriteLine($"客户端[{e.ClientId}]已断开连接!");}private void OnMqttServerClientSubscribedTopic(MqttServerClientSubscribedTopicEventArgs e){Console.WriteLine($"客户端[{e.ClientId}]已成功订阅主题[{e.TopicFilter}]!");}private void OnMqttServerClientUnsubscribedTopic(MqttServerClientUnsubscribedTopicEventArgs e){Console.WriteLine($"客户端[{e.ClientId}]已成功取消订阅主题[{e.TopicFilter}]!");}private void OnMqttServerApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e){messages.Add(e.ApplicationMessage);Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff"));Console.WriteLine($"客户端[{e.ClientId}]>> Topic[{e.ApplicationMessage.Topic}] Payload[{Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[] { })}] Qos[{e.ApplicationMessage.QualityOfServiceLevel}] Retain[{e.ApplicationMessage.Retain}]");}}}
三 客户端
客户端可以做成一个独立的项目,如果有什么地方需要调用比如发送消息的方法,可以直接引用MQTT服务直接进行调用,当然这是其中一个思路,我目前把MQTT服务放在了Application项目中,具体的实现思路就不细聊了,看官方文档吧,我这里直接上代码,主打一个拿来就用。
注意:
1、先启动mqtt服务端,在启动客户端,同时客户端配置文件appsetting.json中MqttHost配置要加上,节点MqttHost中无值,MQTT客户端不启用,目前主题是固定的testTopic
2、项目即可发送消息,又可接收消息
先看项目结构
1、appsettings.json增加配置节点
HttpApi.Host项目中的appsettings.json增加MQTT相关配置节点
"MqttSettingsProvider": {"BrokerHostSettings": {"MqttHost": "", //localhost //服务端ip"MqttPort": 1883 //服务端端口},"ClientSettings": {"ClientId": "5eb020f043ba8930506acbdd2","UserName": "","Password": ""},"TopicName": "testTopic"
}
2、添加MQTTnet引用
在Application项目中通过NuGet包添加MQTTnet引用,版本与服务端保持一致3.0.13
3、代码示例
下面的代码示例不是按代码书写顺序来的,为了方便写文档,直接按文件顺序粘贴代码
Application→MqttServer→AspCoreMqttClientOptionBuilder.cs
using MQTTnet.Client.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace WMSInterface.MqttServer
{public class AspCoreMqttClientOptionBuilder : MqttClientOptionsBuilder{public IServiceProvider ServiceProvider { get; }public AspCoreMqttClientOptionBuilder(IServiceProvider serviceProvider){ServiceProvider = serviceProvider;}}
}
Application→MqttServer→BrokerHostSettings.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace WMSInterface.MqttServer
{public class BrokerHostSettings{public string MqttHost { get; set; }public int MqttPort { get; set; }}
}
Application→MqttServer→ClientSettings.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace WMSInterface.MqttServer
{public class ClientSettings{public string ClientId { get; set; }public string UserName { get; set; }public string Password { get; set; }}
}
Application→MqttServer→IMessageSendService.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace WMSInterface.MqttServer
{public interface IMessageSendService{int Order { get; }Task SendMessage(MessageContext context);}
}
Application→MqttServer→IMqttClientService.cs
using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Receiving;namespace WMSInterface.MqttServer
{public interface IMqttClientService : IHostedService, IMqttClientConnectedHandler, IMqttClientDisconnectedHandler, IMqttApplicationMessageReceivedHandler{Task Publish(string topicName, string message);}
}
Application→MqttServer→MessageContext.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace WMSInterface.MqttServer
{public class MessageContext{public string TopicName { get; set; }public string Title { get; set; }public string Content { get; set; }//public IList<UserModel> UserList { get; set; } = new List<UserModel>();public string[] Users { get; set; }public string ObjId { get; set; }}
}
Application→MqttServer→MqttClientService.cs
using MQTTnet.Client;
using MQTTnet.Protocol;
using MQTTnet;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Microsoft.Extensions.Options;
using MQTTnet.Server;
using System.Threading;
using Microsoft.Extensions.Hosting;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Receiving;
using Microsoft.Extensions.DependencyInjection;
using MQTTnet.Client.Options;
using WMSInterface.Server;namespace WMSInterface.MqttServer
{public class MqttClientService : IMqttClientService, IHostedService, IMqttClientConnectedHandler, IMqttClientDisconnectedHandler, IMqttApplicationMessageReceivedHandler{private IMqttClient mqttClient;private IMqttClientOptions options;private readonly IServiceProvider _serviceProvider;public MqttClientService(IMqttClientOptions options, IServiceProvider serviceProvider){this.options = options;_serviceProvider = serviceProvider;mqttClient = new MqttFactory().CreateMqttClient();ConfigureMqttClient();}private void ConfigureMqttClient(){mqttClient.ConnectedHandler = this;mqttClient.DisconnectedHandler = this;mqttClient.ApplicationMessageReceivedHandler = this;}public async Task HandleApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e){IEnumerable<IMqttMessageHandler> handlers = _serviceProvider.GetServices<IMqttMessageHandler>();foreach (IMqttMessageHandler handler in handlers){await handler.HandleMessage(e.ApplicationMessage.Topic, e.ApplicationMessage.Payload);}}public async Task Publish(string topicName, string message){string topic = topicName.Trim();string msg = message.Trim();if (string.IsNullOrEmpty(topic)){Console.Write("主题不能为空!");}else if (!mqttClient.IsConnected){Console.Write("MQTT客户端尚未连接!");}else{await MqttClientExtensions.PublishAsync(applicationMessage: new MqttApplicationMessageBuilder().WithTopic(topic).WithPayload(msg).WithAtMostOnceQoS().WithRetainFlag(value: false).Build(), client: mqttClient);}}/// <summary>/// 订阅连接成功事件/// </summary>/// <param name="eventArgs"></param>/// <returns></returns>public async Task HandleConnectedAsync(MqttClientConnectedEventArgs eventArgs){await mqttClient.SubscribeAsync("testTopic");//...可订阅多个主题}/// <summary>/// 订阅断开连接事件/// </summary>/// <param name="eventArgs"></param>/// <returns></returns>public async Task HandleDisconnectedAsync(MqttClientDisconnectedEventArgs eventArgs){await mqttClient.UnsubscribeAsync("testTopic");//尝试重新连接//await mqttClient.ConnectAsync(options);}public async Task StartAsync(CancellationToken cancellationToken){await mqttClient.ConnectAsync(options);if (!mqttClient.IsConnected){await mqttClient.ReconnectAsync();}}public async Task StopAsync(CancellationToken cancellationToken){if (cancellationToken.IsCancellationRequested){MqttClientDisconnectOptions disconnectOption = new MqttClientDisconnectOptions{ReasonCode = MqttClientDisconnectReason.NormalDisconnection,ReasonString = "NormalDiconnection"};await mqttClient.DisconnectAsync(disconnectOption, cancellationToken);}await mqttClient.DisconnectAsync();}}
}
Application→MqttServer→MqttClientServiceProvider.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace WMSInterface.MqttServer
{public class MqttClientServiceProvider{public readonly IMqttClientService MqttClientService;public MqttClientServiceProvider(IMqttClientService mqttClientService){MqttClientService = mqttClientService;}}
}
Application→MqttServer→MqttMessageService.cs
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace WMSInterface.MqttServer
{public class MqttMessageService : IMessageSendService{private string _topicName = string.Empty;private readonly IMqttClientService _mqttClientService;private readonly ILogger<MqttMessageService> _logger;private readonly IConfiguration _configuration;public int Order => 0;public MqttMessageService(MqttClientServiceProvider mqttClientServiceProvider, ILogger<MqttMessageService> logger, IConfiguration configuration){_mqttClientService = mqttClientServiceProvider.MqttClientService;_logger = logger;_configuration = configuration;_topicName = configuration["MqttSettingsProvider:TopicName"];}public Task SendMessage(MessageContext context){try{if (!string.IsNullOrEmpty(context.Content)){var content = new{content = context.Content,//users = context.UserList.Select((UserModel m) => m.Id.ToString()).ToList()};_mqttClientService.Publish(_topicName, JsonConvert.SerializeObject(content));}}catch (Exception e){_logger.LogError(e, "MQTT发送消息错误。");}return Task.CompletedTask;}}
}
Application→MqttServer→MqttServerModule.cs
注意:这里跟配置文件appsettings.json中的节点MqttHost有关联,MqttHost为空,不启动MQTT客户端服务,MqttHost不为空,会客户端会连接服务端。
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.Modularity;namespace WMSInterface.MqttServer
{public class MqttServerModule : AbpModule{public override void ConfigureServices(ServiceConfigurationContext context){IConfiguration configuration = context.Services.GetConfiguration();MqttSettingsProvider mqttSettingsProvider = configuration.GetSection("MqttSettingsProvider").Get<MqttSettingsProvider>();if (!string.IsNullOrEmpty(mqttSettingsProvider.BrokerHostSettings?.MqttHost)){context.Services.AddMqttClientHostedService(mqttSettingsProvider);context.Services.TryAddEnumerable(ServiceDescriptor.Transient<IMessageSendService, MqttMessageService>());}}}
}
Application→MqttServer→MqttServiceCollectionExtension.cs
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace WMSInterface.MqttServer
{public static class MqttServiceCollectionExtension{public static IServiceCollection AddMqttClientHostedService(this IServiceCollection services, MqttSettingsProvider mqttSettingsProvider){AddMqttClientServiceWithConfig(services, delegate (AspCoreMqttClientOptionBuilder aspOptionBuilder){IConfiguration configuration = services.GetConfiguration();aspOptionBuilder.WithCredentials(mqttSettingsProvider.ClientSettings.UserName, mqttSettingsProvider.ClientSettings.Password).WithClientId(mqttSettingsProvider.ClientSettings.ClientId).WithTcpServer(mqttSettingsProvider.BrokerHostSettings.MqttHost);});return services;}private static IServiceCollection AddMqttClientServiceWithConfig(this IServiceCollection services, Action<AspCoreMqttClientOptionBuilder> configure){services.AddSingleton(delegate (IServiceProvider serviceProvider){AspCoreMqttClientOptionBuilder aspCoreMqttClientOptionBuilder = new AspCoreMqttClientOptionBuilder(serviceProvider);configure(aspCoreMqttClientOptionBuilder);return aspCoreMqttClientOptionBuilder.Build();});services.AddSingleton<MqttClientService>();services.AddSingleton((Func<IServiceProvider, IHostedService>)((IServiceProvider serviceProvider) => serviceProvider.GetService<MqttClientService>()));services.AddSingleton(delegate (IServiceProvider serviceProvider){MqttClientService service = serviceProvider.GetService<MqttClientService>();return new MqttClientServiceProvider(service);});return services;}}
}
Application→MqttServer→MqttSettingsProvider.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace WMSInterface.MqttServer
{public class MqttSettingsProvider{public BrokerHostSettings BrokerHostSettings { get; set; }public ClientSettings ClientSettings { get; set; }}
}
Application→Server→IMqttMessageHandler.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace WMSInterface.Server
{public interface IMqttMessageHandler{Task HandleMessage(string topic, byte[] data);}
}
Application→Server→MqttMessageHandler.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;namespace WMSInterface.Server
{public class MqttMessageHandler: IMqttMessageHandler,ITransientDependency{public async Task HandleMessage(string topic, byte[] data){if (!string.IsNullOrEmpty(topic) || data != null){Console.WriteLine($"接收到的主题:{topic},消息:{Encoding.UTF8.GetString(data)}");}}}
}
xx.HttpApi.Host→xxHttpApiHostModule.cs
typeof(MqttServerModule)
MQTT客户端发送消息代码示例
xx.HttpApi→Controllers→MqttController.cs
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Configuration;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.AspNetCore.Mvc;
using WMSInterface.MqttServer;namespace WMSInterface.Controllers
{/// <summary>/// mqtt/// </summary>[ApiController][Route("api/[controller]/[action]")]public class MqttController : AbpController{private readonly IConfiguration _configuration;private readonly IMessageSendService _messageSendService;public MqttController(IConfiguration configuration, IMessageSendService messageSendService){_configuration = configuration;_messageSendService = messageSendService;}#region 发送消息/// <summary>/// 发送消息/// </summary>/// <param name="body"></param>/// <returns></returns>[HttpPost]public async Task<IActionResult> SendAsync(MessageContext body){await _messageSendService.SendMessage(body);return Ok("ok");}#endregion}
}
四 MQTT收发消息测试
下载MQTTX工具进行MQTT消息的测试,使用方法就不具体介绍了,基本上是拿来就用
MQTTX:全功能 MQTT 客户端工具
发送消息入口
接收消息示例
五 结尾
本文章不做MQTT科普,默认具有一定的MQTT认知,主要目的是让大家可以直接在abp框架中快速集成MQTT