zoukankan      html  css  js  c++  java
  • .NET Core MQTT DEMO

    什么是MQTT

      MQTT(message queuing telemetry transport)是IBM开发的即时通讯协议,是一种发布/订阅极其轻量级的消息传输协议,专门为网络受限设备、低宽带以及高延迟和不可靠的网络而设计的。由于以上轻量级的特点,是实现智能家居的首选传输协议,相比于XMPP,更加轻量级而且占用宽带低。简单来说HQTT是一种通信协议,要实现发布/订阅就必须遵循这个协议。

    二、实现MQTT通讯协议.NET开源库有哪些?

      MQTTnet、MqttDotNet、nMQTT、M2MQTT等,这里我们使用MQTTnet(但MQTTnet搜到的教程基本都是2.7及以下版本的,我们使用的是3.0.9版本)
      官网项目网址:https://github.com/chkr1011/MQTTnet

    三、展示MQTT实现效果图

      

      例:客户端1只要订阅了positon主题,客户端2、客户端3、客户端4.....同样订阅了position主题则他们之间就能共享position主题的所发的内容了。

      如果客户端1订阅了position主题,客户端2订阅了beautiful主题,1发给消息2是收不到的。

    四、创建.NETCore项目(Server和Client)

      

    五、服务器

      添加Nuget包:安装MQTTnet

      

    class Program { public static IMqttServer mqttServer; static void Main(string[] args) { StartMqttServer(); } //启动Mqtt服务器 private static async void StartMqttServer() { try { //验证客户端信息 var options = new MqttServerOptions { //连接验证 ConnectionValidator = new MqttServerConnectionValidatorDelegate(p => { if (p.ClientId == "SpecialClient") { if (p.Username != "USER" || p.Password != "PASS") { p.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; } } }) }; //设置端口号 options.DefaultEndpointOptions.Port = 8031; //创建Mqtt服务器 mqttServer = new MqttFactory().CreateMqttServer(); //开启订阅事件 mqttServer.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(MqttNetServer_SubscribedTopic); //取消订阅事件 mqttServer.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(MqttNetServer_UnSubscribedTopic); //客户端消息事件 mqttServer.UseApplicationMessageReceivedHandler(MqttServe_ApplicationMessageReceived); //客户端连接事件 mqttServer.UseClientConnectedHandler(MqttNetServer_ClientConnected); //客户端断开事件 mqttServer.UseClientDisconnectedHandler(MqttNetServer_ClientDisConnected); //启动服务器 await mqttServer.StartAsync(options); Console.WriteLine("服务器启动成功!输入任意内容并回车停止服务!"); Console.ReadLine(); await mqttServer.StopAsync(); } catch (Exception e) { Console.Write($"服务器启动失败 Msg:{e}"); } } /// <summary> /// 客户订阅 /// </summary> private static void MqttNetServer_SubscribedTopic(MqttServerClientSubscribedTopicEventArgs e) { //客户端Id var ClientId = e.ClientId; var Topic = e.TopicFilter.Topic; Console.WriteLine($"客户端[{ClientId}]已订阅主题:{Topic}"); } /// <summary> /// 客户取消订阅 /// </summary> private static void MqttNetServer_UnSubscribedTopic(MqttServerClientUnsubscribedTopicEventArgs e) { //客户端Id var ClientId = e.ClientId; var Topic = e.TopicFilter; Console.WriteLine($"客户端[{ClientId}]已取消订阅主题:{Topic}"); } /// <summary> /// 接收消息 /// </summary> private static void MqttServe_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e) { var ClientId = e.ClientId; var Topic = e.ApplicationMessage.Topic; var Payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload); var Qos = e.ApplicationMessage.QualityOfServiceLevel; var Retain = e.ApplicationMessage.Retain; Console.WriteLine($"客户端[{ClientId}]>> 主题:[{Topic}] 负载:[{Payload}] Qos:[{Qos}] 保留:[{Retain}]"); } /// <summary> /// 客户连接 /// </summary> private static void MqttNetServer_ClientConnected(MqttServerClientConnectedEventArgs e) { var ClientId = e.ClientId; Console.WriteLine($"客户端[{ClientId}]已连接"); } /// <summary> /// 客户连接断开 /// </summary> private static void MqttNetServer_ClientDisConnected(MqttServerClientDisconnectedEventArgs e) { var ClientId = e.ClientId; Console.WriteLine($"客户端[{ClientId}]已断开连接"); } }

    六、客户端

      也要添加Nuget包:安装MQTTnet

    public static IMqttClient mqttClient; static void Main(string[] args) { ConnectMqttServerAsync(); ImportData(); } private static async void ConnectMqttServerAsync() { try { var factory = new MqttFactory(); mqttClient = factory.CreateMqttClient(); var options = new MqttClientOptionsBuilder() .WithTcpServer("127.0.0.1", 8031) .WithCredentials("test", "test") .WithClientId(Guid.NewGuid().ToString().Substring(0, 5)) .Build(); //消息 mqttClient.UseApplicationMessageReceivedHandler(e => { Console.WriteLine("### 收到的信息 ###"); Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}");//主题 Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");//页面信息 Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");//消息等级 Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}");//是否保留 Console.WriteLine(); }); //重连机制 mqttClient.UseDisconnectedHandler(async e => { Console.WriteLine("与服务器断开连接!"); await Task.Delay(TimeSpan.FromSeconds(5)); try { await mqttClient.ConnectAsync(options); } catch (Exception exp) { Console.Write($"重新连接服务器失败 Msg:{exp}"); } }); await mqttClient.ConnectAsync(options); Console.Write("连接服务器成功!输入任意内容并回车进入菜单页面!"); } catch (Exception exp) { Console.Write($"连接服务器失败 Msg:{exp}"); } } private static void ImportData() { Console.ReadLine(); bool isExit = false; while (!isExit) { Console.WriteLine(@"请输入 1.订阅主题 2.取消订阅 3.发送消息 4.退出"); var input = Console.ReadLine(); switch (input) { case "1": Console.WriteLine(@"请输入主题名称:"); var topicName = Console.ReadLine(); Subscribe(topicName); break; case "2": Console.WriteLine(@"请输入需要取消订阅主题名称:"); topicName = Console.ReadLine(); Unsubscribe(topicName); break; case "3": Console.WriteLine("请输入需要发送的主题名称"); topicName = Console.ReadLine(); Console.WriteLine("请输入需要发送的消息"); var message = Console.ReadLine(); Publish(topicName, message); break; case "4": isExit = true; break; default: Console.WriteLine("请输入正确指令!"); break; } } } /// <summary> /// 订阅 /// </summary> /// <param name="topicName"></param> private static async void Subscribe(string topicName) { string topic = topicName.Trim(); if (string.IsNullOrEmpty(topic)) { Console.Write("订阅主题不能为空!"); return; } if (!mqttClient.IsConnected) { Console.Write("MQTT客户端尚未连接!"); return; } await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).Build()); } /// <summary> /// 取消订阅 /// </summary> /// <param name="topicName"></param> private static async void Unsubscribe(string topicName) { string topic = topicName.Trim(); if (string.IsNullOrEmpty(topic)) { Console.Write("订阅主题不能为空!"); return; } if (!mqttClient.IsConnected) { Console.Write("MQTT客户端尚未连接!"); return; } await mqttClient.UnsubscribeAsync(topic); } /// <summary> /// 发送消息 /// </summary> /// <param name="message"></param> private static async void Publish(string topicName, string message) { string topic = topicName.Trim(); string msg = message.Trim(); if (string.IsNullOrEmpty(topic)) { Console.Write("主题不能为空!"); return; } if (!mqttClient.IsConnected) { Console.Write("MQTT客户端尚未连接!"); return; } var MessageBuilder = new MqttApplicationMessageBuilder() .WithTopic(topic) .WithPayload(msg) .WithExactlyOnceQoS() .WithRetainFlag() .Build(); await mqttClient.PublishAsync(MessageBuilder); }

      源代码:

      链接:https://pan.baidu.com/s/1rxTBZHHAmkDVcO6XmJPXng
      提取码:05qr

  • 相关阅读:
    Java面向对象
    JBCD技术
    初识数据库(其他数据库对象)
    初识数据库(TCL语句)
    初识数据库(分组函数)
    初识数据库(函数)
    初识数据库(数据类型)
    Java中的IO流
    Java中的线程
    Java中的集合
  • 原文地址:https://www.cnblogs.com/lhxsoft/p/14266161.html
Copyright © 2011-2022 走看看