  • Xamarin MQTT

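    1 What is MQTT?

      MQTT (Message Queuing Telemetry Transport) is a messaging protocol originally developed at IBM, and it is likely to become an important building block of the Internet of Things. It is a binary, publish/subscribe message protocol and is now an OASIS standard. Because the specification is simple, it is a good fit for IoT scenarios that call for low power consumption and have limited network bandwidth.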
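
      To make "binary message" concrete: every MQTT packet begins with a fixed header, and its Remaining Length field is a variable-length integer that stores 7 bits per byte and uses the high bit as a continuation flag. The sketch below shows that encoding. The class and method names are illustrative assumptions and are not part of MQTTnet or any other library.

```csharp
using System;
using System.Collections.Generic;

// Illustrative helper (hypothetical, not part of MQTTnet): encodes and decodes
// the "Remaining Length" field of the MQTT fixed header.
public static class RemainingLength
{
    // Largest value that fits in the maximum of four encoded bytes.
    public const int Max = 268435455;

    public static byte[] Encode(int value)
    {
        if (value < 0 || value > Max)
            throw new ArgumentOutOfRangeException(nameof(value));

        var bytes = new List<byte>();
        do
        {
            int digit = value % 128;        // low 7 bits
            value /= 128;
            if (value > 0) digit |= 0x80;   // more bytes follow
            bytes.Add((byte)digit);
        } while (value > 0);
        return bytes.ToArray();
    }

    public static int Decode(byte[] bytes)
    {
        int value = 0;
        int multiplier = 1;
        for (int i = 0; i < bytes.Length && i < 4; i++)
        {
            value += (bytes[i] & 0x7F) * multiplier;
            if ((bytes[i] & 0x80) == 0) return value; // last byte reached
            multiplier *= 128;
        }
        throw new FormatException("Malformed Remaining Length field.");
    }
}
```

      Small packets therefore pay only one byte for the length, which is part of why the protocol suits constrained links.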

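    2 MQTTnet

      MQTTnet is a highly specialized .NET library for the MQTT protocol. It provides both an MQTT client and an MQTT server (broker), and it supports versions 3.1.0, 3.1.1 and 5.0.0 of the MQTT standard.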

    3 Platforms supported by MQTTnet

      .NET Standard 1.3+

      .NET Core 1.1+

      .NET Core App 1.1+

      .NET Framework 4.5.2+ (x86, x64, AnyCPU)

      Mono 5.2+

      Universal Windows Platform (UWP) 10.0.1024+ (x86, x64, ARM, AnyCPU, Windows 10 IoT Core)

      Xamarin.Android 7.5+

      Xamarin.iOS 10.14+

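    4 Creating the server

      The MQTT server is also called a "message broker" (Broker). It can be an application or a device. It sits between message publishers and subscribers, and it can:

      (1) accept network connections from clients;

      (2) accept application messages published by clients;

      (3) process subscribe and unsubscribe requests from clients;

      (4) forward application messages to the clients that subscribed to them.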
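
      Responsibilities (2) to (4) can be sketched without any networking as a small in-memory broker. This is only an illustration of the bookkeeping: TinyBroker and its members are hypothetical names, it matches topics by exact string, and it is not how MQTTnet is implemented.

```csharp
using System;
using System.Collections.Generic;

// Illustrative in-memory stand-in for a broker (hypothetical type, not MQTTnet).
public class TinyBroker
{
    // topic -> (clientId -> delivery callback)
    private readonly Dictionary<string, Dictionary<string, Action<string, string>>> _subscriptions
        = new Dictionary<string, Dictionary<string, Action<string, string>>>();

    // (3) Process a subscribe request.
    public void Subscribe(string clientId, string topic, Action<string, string> onMessage)
    {
        if (!_subscriptions.TryGetValue(topic, out var clients))
        {
            clients = new Dictionary<string, Action<string, string>>();
            _subscriptions[topic] = clients;
        }
        clients[clientId] = onMessage;
    }

    // (3) Process an unsubscribe request; returns false if there was nothing to remove.
    public bool Unsubscribe(string clientId, string topic)
    {
        return _subscriptions.TryGetValue(topic, out var clients) && clients.Remove(clientId);
    }

    // (2) Accept a published message and (4) forward it to every subscriber of the topic.
    // Returns the number of clients the message was delivered to.
    public int Publish(string topic, string payload)
    {
        if (!_subscriptions.TryGetValue(topic, out var clients)) return 0;

        // Copy first so that a callback may unsubscribe while we iterate.
        var targets = new List<Action<string, string>>(clients.Values);
        foreach (var deliver in targets) deliver(topic, payload);
        return targets.Count;
    }
}
```

      A real broker adds the network layer, wildcard matching, QoS handling and retained messages on top of this basic routing table.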

      For the server, create a console application: choose Console App (.NET Core), create a new project named MqttNetServer, and use the following code:

      

      using MQTTnet;
      using MQTTnet.Protocol;
      using MQTTnet.Server;
      using Newtonsoft.Json;
      using System;
      using System.Collections.Generic;
      using System.IO;
      using System.Reflection;
      using System.Security.Cryptography.X509Certificates;
      using System.Text;
      using System.Threading.Tasks;

      namespace MqttServerTest
      {
          class Program
          {
              public static IMqttServer mqttServer;

              static void Main(string[] args)
              {
                  mqttServer = new MQTTnet.MqttFactory().CreateMqttServer();

                  // Log every client that connects or disconnects.
                  mqttServer.UseClientConnectedHandler(e =>
                  {
                      Console.WriteLine("***new connect:" + e.ClientId);
                  });
                  mqttServer.UseClientDisconnectedHandler(e =>
                  {
                      Console.WriteLine("*** disconnect:" + e.ClientId);
                  });

                  //var options = new MqttServerOptions();
                  //await mqttServer.StartAsync(options);

                  //var currentPath = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location);
                  //var certificate = new X509Certificate2(Path.Combine(currentPath, "certificate.pfx"), "yourPassword", X509KeyStorageFlags.Exportable);

                  var optionsBuilder = new MqttServerOptionsBuilder()
                      .WithConnectionBacklog(100)
                      .WithDefaultEndpointPort(1884)
                      // Called for every CONNECT packet; decides whether the client is accepted.
                      .WithConnectionValidator(c =>
                      {
                          //c.SessionItems.
                          //if (c.ClientId.Length < 10)
                          //{
                          //    c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
                          //    //c.ReasonCode = MqttConnectReasonCode.ClientIdentifierNotValid;
                          //    return;
                          //}
                          //if (c.Username != "mySecretUser")
                          //{
                          //    c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
                          //    return;
                          //}

                          //if (c.Password != "mySecretPassword")
                          //{
                          //    c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
                          //    return;
                          //}

                          c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
                          Console.WriteLine("***connect validator:" + c.ClientId);
                      })
                      //.WithEncryptionCertificate(certificate.Export(X509ContentType.Pfx))
                      //.WithEncryptionSslProtocol(SslProtocols.Tls12)
                      // Called for every published message before it is forwarded.
                      .WithApplicationMessageInterceptor(context =>
                      {
                          //if (context.ApplicationMessage.Topic == "my/custom/topic")
                          //{
                          //    context.ApplicationMessage.Payload = Encoding.UTF8.GetBytes("The server injected payload.");
                          //}
                          //// It is possible to disallow the sending of messages for a certain client id like this:
                          //if (context.ClientId != "Someone")
                          //{
                          //    context.AcceptPublish = false;
                          //    return;
                          //}
                          // It is also possible to read the payload and extend it. For example by adding a timestamp in a JSON document.
                          // This is useful when the IoT device has no own clock and the creation time of the message might be important.

                          context.AcceptPublish = true;

                          // The payload is a byte array; decode it, otherwise only the type name is printed.
                          var payload = context.ApplicationMessage.Payload;
                          Console.WriteLine("***Message:" + (payload == null ? string.Empty : Encoding.UTF8.GetString(payload)));
                      })
                      // Called for every subscribe request.
                      .WithSubscriptionInterceptor(context =>
                      {
                          //if (context.TopicFilter.Topic.StartsWith("admin/foo/bar") && context.ClientId != "theAdmin")
                          //{
                          //    context.AcceptSubscription = false;
                          //}

                          //if (context.TopicFilter.Topic.StartsWith("the/secret/stuff") && context.ClientId != "Imperator")
                          //{
                          //    context.AcceptSubscription = false;
                          //    context.CloseConnection = true;
                          //}

                          context.AcceptSubscription = true;
                          Console.WriteLine("***Subscript:" + context.TopicFilter.Topic);
                      })
                      //.WithStorage(new RetainedMessageHandler())
                      ;
                  var options = optionsBuilder.Build();

                  //// Setting the options
                  //options.Storage=new RetainedMessageHandler();

                  // Block until the server has started so that startup errors
                  // (for example a port that is already in use) surface here.
                  StartServer(options).GetAwaiter().GetResult();

                  Console.WriteLine("Press Enter to exit.");
                  Console.ReadLine();

                  mqttServer.StopAsync().GetAwaiter().GetResult();
              }

              public static Task StartServer(IMqttServerOptions options)
              {
                  return mqttServer.StartAsync(options);
              }
          }

          // The implementation of the storage:
          // This code uses the JSON library "Newtonsoft.Json".
          public class RetainedMessageHandler : IMqttServerStorage
          {
              // Verbatim string, so the backslashes in the path are not treated as escape sequences.
              private const string Filename = @"C:\MQTT\RetainedMessages.json";

              public Task SaveRetainedMessagesAsync(IList<MqttApplicationMessage> messages)
              {
                  File.WriteAllText(Filename, JsonConvert.SerializeObject(messages));
                  return Task.FromResult(0);
              }

              public Task<IList<MqttApplicationMessage>> LoadRetainedMessagesAsync()
              {
                  IList<MqttApplicationMessage> retainedMessages;
                  if (File.Exists(Filename))
                  {
                      var json = File.ReadAllText(Filename);
                      retainedMessages = JsonConvert.DeserializeObject<List<MqttApplicationMessage>>(json);
                  }
                  else
                  {
                      retainedMessages = new List<MqttApplicationMessage>();
                  }

                  return Task.FromResult(retainedMessages);
              }
          }
      }

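    Run the code as it is and you have a simple MQTT server.

    5 Creating the Xamarin app

      A client is an application or device that uses the MQTT protocol. It always opens the network connection to the server. A client can:

      (1) publish messages that other clients may subscribe to;

      (2) subscribe to messages published by other clients;

      (3) unsubscribe, or delete application messages;

      (4) disconnect from the server.

      In VS, create a new Xamarin.Forms mobile app. Once it is created, search NuGet for mqttnet and add a reference to the MQTTnet package. Then change the code as follows: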

      

      XAML for CatShell.MainPage:

      <?xml version="1.0" encoding="utf-8" ?>
      <ContentPage xmlns="http://xamarin.com/schemas/2014/forms"
                   xmlns:x="http://schemas.microsoft.com/winfx/2009/xaml"
                   xmlns:local="clr-namespace:CatShell"
                   x:Class="CatShell.MainPage">

          <StackLayout>
              <!-- Place new controls here -->
              <Label Text="SubscribeTopic"/>
              <Entry x:Name="txtSubTopic" Placeholder="Subscribe Topic" />
              <Button Text="BtnSubscribe" Clicked="SubButton_Clicked"/>
              <Entry x:Name="txtReceiveMessage"/>
              <Label Text="PublishTopic"/>
              <Entry x:Name="txtPubTopic"/>
              <Entry x:Name="txtSendMessage" />
              <Button Text="Publish" Clicked="PubButton_Clicked"/>
              <Editor>

              </Editor>

          </StackLayout>

      </ContentPage>

      Code-behind for CatShell.MainPage:

      using MQTTnet;
      using MQTTnet.Client;
      using MQTTnet.Client.Options;
      using System;
      using System.Collections.Generic;
      using System.Linq;
      using System.Text;
      using System.Threading;
      using System.Threading.Tasks;
      using Xamarin.Forms;

      namespace CatShell
      {
          public partial class MainPage : ContentPage
          {
              public IMqttClient mqttClient;
              public IMqttClientOptions options;

              public MainPage()
              {
                  InitializeComponent();
                  InitMqttClient();
                  ConnectMqttServer();
              }

              public void InitMqttClient()
              {
                  // Create a new MQTT client.
                  var factory = new MqttFactory();
                  mqttClient = factory.CreateMqttClient();

                  // The handlers run on a background thread, so UI updates are
                  // marshalled to the main thread.
                  mqttClient.UseConnectedHandler(e =>
                  {
                      Device.BeginInvokeOnMainThread(() =>
                      {
                          txtReceiveMessage.Text = txtReceiveMessage.Text + ">> connect success." + Environment.NewLine;
                      });
                  });
                  mqttClient.UseDisconnectedHandler(e =>
                  {
                      Device.BeginInvokeOnMainThread(() =>
                      {
                          txtReceiveMessage.Text = txtReceiveMessage.Text + ">> Disconnect." + Environment.NewLine;
                      });
                  });
                  mqttClient.UseApplicationMessageReceivedHandler(e =>
                  {
                      // Decode the payload bytes; an empty message may carry no payload at all.
                      var payload = e.ApplicationMessage.Payload;
                      string text = payload == null ? string.Empty : Encoding.UTF8.GetString(payload);

                      Device.BeginInvokeOnMainThread(() =>
                      {
                          // Append, like the other handlers, instead of overwriting the log.
                          txtReceiveMessage.Text = txtReceiveMessage.Text + $">> {text}" + Environment.NewLine;
                      });
                  });

                  // Create TCP based options using the builder.
                  options = new MqttClientOptionsBuilder()
                      .WithClientId("Client4")
                      .WithTcpServer("10.100.1.247", 1884) // Use TCP connection, port is optional
                      //.WithWebSocketServer("broker.hivemq.com:8000/mqtt") // Use WebSocket connection.
                      //.WithCredentials("bud", "%spencer%")
                      //.WithTls()
                      //.WithTls(new MqttClientOptionsBuilderTlsParameters
                      //{
                      //    UseTls = true,
                      //    CertificateValidationCallback = (X509Certificate x, X509Chain y, SslPolicyErrors z, IMqttClientOptions o) =>
                      //    {
                      //        // TODO: Check conditions of certificate by using above parameters.
                      //        return true;
                      //    }
                      //})
                      .WithCleanSession()
                      .Build();
              }

              public async void ConnectMqttServer()
              {
                  try
                  {
                      await mqttClient.ConnectAsync(options, CancellationToken.None); // Since 3.0.5 with CancellationToken
                  }
                  catch (Exception ex)
                  {
                      // An unhandled exception in an async void method would crash the app, so report it instead.
                      Device.BeginInvokeOnMainThread(() =>
                      {
                          txtReceiveMessage.Text = txtReceiveMessage.Text + $">> connect failed: {ex.Message}" + Environment.NewLine;
                      });
                  }
              }

              private async void SubButton_Clicked(object sender, EventArgs e)
              {
                  // Entry.Text can be null while the field is empty.
                  string topic = txtSubTopic.Text?.Trim();

                  if (string.IsNullOrEmpty(topic))
                  {
                      //MessageBox.Show("The subscribe topic must not be empty!");
                      return;
                  }

                  if (!mqttClient.IsConnected)
                  {
                      //MessageBox.Show("The MQTT client is not connected yet!");
                      return;
                  }

                  // Subscribe to a topic
                  await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).Build());

                  txtReceiveMessage.Text = txtReceiveMessage.Text + $"Subscribed to topic [{topic}]" + Environment.NewLine;
                  // Lock the topic field once subscribed (the WinForms client disables it in the same place).
                  txtSubTopic.IsReadOnly = true;
                  //BtnSubscribe.Enabled = false;
              }

              private async void PubButton_Clicked(object sender, EventArgs e)
              {
                  string topic = txtPubTopic.Text?.Trim();

                  if (string.IsNullOrEmpty(topic))
                  {
                      //MessageBox.Show("The publish topic must not be empty!");
                      return;
                  }

                  if (!mqttClient.IsConnected)
                  {
                      return;
                  }

                  string inputString = txtSendMessage.Text?.Trim() ?? string.Empty;

                  await PublishMessages(topic, inputString);
              }

              public async Task PublishMessages(string topicMsg, string payloadMsg)
              {
                  var message = new MqttApplicationMessageBuilder()
                      .WithTopic(topicMsg)
                      .WithPayload(payloadMsg)
                      .WithExactlyOnceQoS()
                      .WithRetainFlag()
                      .Build();

                  await mqttClient.PublishAsync(message);
              }
          }
      }

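    Run the code and you can send messages directly from the app.

    6 Creating a WinForms client (optional)

      You can create a WinForms client to exchange messages with the app. In VS, create a new Windows Forms App (.NET Framework) and design a form with the controls the code-behind refers to: the text boxes txtSubTopic, txtReceiveMessage, txtPubTopic and txtSendMessage, a button named BtnSubscribe wired to BtnSubscribe_Click, and a publish button wired to BtnPublish_Click.

      The code-behind is as follows: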

      

      using MQTTnet;
      using MQTTnet.Client.Options;
      using MQTTnet.Client;
      using System;
      using System.Collections.Generic;
      using System.ComponentModel;
      using System.Data;
      using System.Drawing;
      using System.Linq;
      using System.Net.Security;
      using System.Security.Cryptography.X509Certificates;
      using System.Text;
      using System.Threading;
      using System.Threading.Tasks;
      using System.Windows.Forms;

      namespace MqttClientWin
      {
          public partial class Form1 : Form
          {
              public IMqttClient mqttClient;
              public IMqttClientOptions options;

              public Form1()
              {
                  InitializeComponent();
                  InitMqttClient();
                  ConnectMqttServer();
              }

              public void InitMqttClient()
              {
                  // Create a new MQTT client.
                  var factory = new MqttFactory();
                  mqttClient = factory.CreateMqttClient();

                  mqttClient.UseApplicationMessageReceivedHandler(e =>
                  {
                      // Decode the payload bytes once; an empty message may carry no payload at all.
                      var payload = e.ApplicationMessage.Payload;
                      string text = payload == null ? string.Empty : Encoding.UTF8.GetString(payload);

                      Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
                      Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}");
                      Console.WriteLine($"+ Payload = {text}");
                      Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
                      Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}");
                      Console.WriteLine();

                      // The handler runs on a background thread; switch to the UI thread to touch controls.
                      this.Invoke(new Action(() =>
                      {
                          txtReceiveMessage.AppendText($">> {text}{Environment.NewLine}");
                      }));

                      //Task.Run(() => mqttClient.PublishAsync("hello/world"));
                  });

                  mqttClient.UseConnectedHandler(e =>
                  {
                      Console.WriteLine("### CONNECTED WITH SERVER ###");

                      //// Subscribe to a topic
                      //await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic("my/topic").Build());

                      //Console.WriteLine("### SUBSCRIBED ###");

                      this.Invoke(new Action(() =>
                      {
                          txtReceiveMessage.AppendText($">> connect success.{Environment.NewLine}");
                      }));
                  });

                  mqttClient.UseDisconnectedHandler(e =>
                  {
                      this.Invoke(new Action(() =>
                      {
                          txtReceiveMessage.AppendText($">> Disconnect .{Environment.NewLine}");
                      }));
                  });

                  // Create TCP based options using the builder.
                  options = new MqttClientOptionsBuilder()
                      .WithClientId("Client5")
                      .WithTcpServer("10.100.1.247", 1884) // Use TCP connection, port is optional
                      //.WithWebSocketServer("broker.hivemq.com:8000/mqtt") // Use WebSocket connection.
                      //.WithCredentials("bud", "%spencer%")
                      //.WithTls()
                      //.WithTls(new MqttClientOptionsBuilderTlsParameters
                      //{
                      //    UseTls = true,
                      //    CertificateValidationCallback = (X509Certificate x, X509Chain y, SslPolicyErrors z, IMqttClientOptions o) =>
                      //    {
                      //        // TODO: Check conditions of certificate by using above parameters.
                      //        return true;
                      //    }
                      //})
                      .WithCleanSession()
                      .Build();
              }

              public async void ConnectMqttServer()
              {
                  try
                  {
                      await mqttClient.ConnectAsync(options, CancellationToken.None); // Since 3.0.5 with CancellationToken
                  }
                  catch (Exception ex)
                  {
                      // An unhandled exception in an async void method would crash the app, so report it instead.
                      MessageBox.Show("Could not connect to the MQTT server: " + ex.Message);
                  }
              }

              // Optional: call this once to retry the connection 5 seconds after every disconnect.
              // Note that it replaces the disconnected handler registered in InitMqttClient.
              public void ReconnectMqttServer()
              {
                  mqttClient.UseDisconnectedHandler(async e =>
                  {
                      Console.WriteLine("### DISCONNECTED FROM SERVER ###");
                      await Task.Delay(TimeSpan.FromSeconds(5));

                      try
                      {
                          await mqttClient.ConnectAsync(options, CancellationToken.None); // Since 3.0.5 with CancellationToken
                      }
                      catch
                      {
                          Console.WriteLine("### RECONNECTING FAILED ###");
                      }
                  });
              }

              public async Task PublishMessages(string topicMsg, string payloadMsg)
              {
                  var message = new MqttApplicationMessageBuilder()
                      .WithTopic(topicMsg)
                      .WithPayload(payloadMsg)
                      .WithExactlyOnceQoS()
                      .WithRetainFlag()
                      .Build();

                  await mqttClient.PublishAsync(message);
              }

              private async void BtnSubscribe_Click(object sender, EventArgs e)
              {
                  string topic = txtSubTopic.Text.Trim();

                  if (string.IsNullOrEmpty(topic))
                  {
                      MessageBox.Show("The subscribe topic must not be empty!");
                      return;
                  }

                  if (!mqttClient.IsConnected)
                  {
                      MessageBox.Show("The MQTT client is not connected yet!");
                      return;
                  }

                  // Subscribe to a topic
                  await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).Build());

                  txtReceiveMessage.AppendText($"Subscribed to topic [{topic}]" + Environment.NewLine);
                  txtSubTopic.Enabled = false;
                  BtnSubscribe.Enabled = false;
              }

              private async void BtnPublish_Click(object sender, EventArgs e)
              {
                  string topic = txtPubTopic.Text.Trim();

                  if (string.IsNullOrEmpty(topic))
                  {
                      MessageBox.Show("The publish topic must not be empty!");
                      return;
                  }

                  if (!mqttClient.IsConnected)
                  {
                      MessageBox.Show("The MQTT client is not connected yet!");
                      return;
                  }

                  string inputString = txtSendMessage.Text.Trim();

                  await PublishMessages(topic, inputString);
              }
          }
      }

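    7 Subscriptions, topics and sessions in the MQTT protocol

      I. Subscription

      A subscription consists of a topic filter and a maximum quality of service (QoS). A subscription is associated with one session. A session can contain several subscriptions, and every subscription within a session has a different topic filter.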
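
      The "maximum QoS" of a subscription caps the delivery guarantee: a message is forwarded to a subscriber at the lower of the QoS it was published with and the maximum QoS of the matching subscription. A minimal sketch of that rule follows. The enum and helper are illustrative names, not MQTTnet types.

```csharp
using System;

// Illustrative types (hypothetical, not from MQTTnet).
public enum QoS
{
    AtMostOnce = 0,
    AtLeastOnce = 1,
    ExactlyOnce = 2
}

public static class QosRules
{
    // QoS used when forwarding a message to one subscriber:
    // the minimum of the published QoS and the subscription's maximum QoS.
    public static QoS Effective(QoS published, QoS subscriptionMax)
    {
        return (QoS)Math.Min((int)published, (int)subscriptionMax);
    }
}
```

      So a message published with WithExactlyOnceQoS(), as in the clients above, still reaches a subscriber at QoS 0 if that subscriber only asked for QoS 0.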

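      II. Session

      Once a client has connected to the server there is a session: a stateful interaction between the client and the server. A session may last only as long as one network connection, or it may span several consecutive network connections between the client and the server.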
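
      Whether a session ends with the connection or spans several connections is decided by the clean-session flag that the client sends when it connects (the clients in this article request a clean session with WithCleanSession()). The sketch below models only that decision on the server side. Session and SessionStore are hypothetical names for illustration, and a real broker stores more state than the subscription list.

```csharp
using System.Collections.Generic;

// Illustrative session state (hypothetical type): just the subscribed topic filters.
public class Session
{
    public string ClientId { get; }
    public HashSet<string> TopicFilters { get; } = new HashSet<string>();

    public Session(string clientId)
    {
        ClientId = clientId;
    }
}

// Illustrative server-side session bookkeeping (hypothetical type, not MQTTnet).
public class SessionStore
{
    private readonly Dictionary<string, Session> _sessions = new Dictionary<string, Session>();

    public Session Connect(string clientId, bool cleanSession)
    {
        // Without the clean-session flag an existing session is resumed.
        if (!cleanSession && _sessions.TryGetValue(clientId, out var existing))
            return existing;

        // Otherwise any stored state is discarded and a fresh session starts.
        var fresh = new Session(clientId);
        _sessions[clientId] = fresh;
        return fresh;
    }

    public void Disconnect(string clientId, bool cleanSession)
    {
        // A clean session lives only as long as its network connection.
        if (cleanSession) _sessions.Remove(clientId);
    }
}
```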

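      III. Topic name

      The label attached to an application message. The server matches it against the subscriptions it knows about and sends a copy of the message to every client with a matching subscription.

      IV. Topic filter

      An expression used in a subscription to select one or more topics. A topic filter can contain wildcard characters, so that a single subscription matches several topic names.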
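
      MQTT defines two wildcards for topic filters: "+" matches exactly one topic level and "#" matches any number of remaining levels and must be the last level of the filter. The following sketch shows how a topic name can be checked against a filter. TopicMatcher is an illustrative name, and the special rule for topic names that start with "$" is deliberately left out.

```csharp
using System;

// Illustrative matcher (hypothetical helper, not the MQTTnet implementation).
public static class TopicMatcher
{
    public static bool IsMatch(string topicName, string topicFilter)
    {
        string[] name = topicName.Split('/');
        string[] filter = topicFilter.Split('/');

        for (int i = 0; i < filter.Length; i++)
        {
            // '#' matches the parent level and everything below it,
            // but is only valid as the last level of the filter.
            if (filter[i] == "#")
                return i == filter.Length - 1;

            // The filter has more levels than the topic name.
            if (i >= name.Length)
                return false;

            // '+' matches any single level; anything else must match exactly.
            if (filter[i] != "+" && filter[i] != name[i])
                return false;
        }

        // No '#' was seen, so both must have the same number of levels.
        return name.Length == filter.Length;
    }
}
```

      For example, the filter "sport/+" matches "sport/tennis" but not "sport/tennis/player1", while "sport/#" matches both.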

      V. Payload

      The actual content that the subscriber of a message receives.
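
      On the wire the payload is just a sequence of bytes; MQTT does not prescribe an encoding. The clients in this article send text and decode it with Encoding.UTF8.GetString. A small sketch of that round trip, with a null check for messages that carry no payload, is shown here. PayloadText is a hypothetical helper name.

```csharp
using System;
using System.Text;

// Illustrative helper (hypothetical): converts between text and payload bytes.
public static class PayloadText
{
    // Encode text as UTF-8 bytes; null is treated as an empty payload.
    public static byte[] ToPayload(string text)
    {
        return Encoding.UTF8.GetBytes(text ?? string.Empty);
    }

    // Decode UTF-8 bytes back to text; a missing payload becomes an empty string.
    public static string FromPayload(byte[] payload)
    {
        return payload == null ? string.Empty : Encoding.UTF8.GetString(payload);
    }
}
```

      Publisher and subscriber have to agree on the encoding themselves, since the broker forwards the bytes unchanged unless an interceptor modifies them.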

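    8 Methods in the MQTT protocol

      The MQTT protocol defines a number of methods (also called actions) that indicate the operation to perform on a given resource. The resource may represent pre-existing data or dynamically generated data, depending on the server implementation. Generally speaking, a resource is a file or an output on the server. The main methods are:

      (1) Connect. Waits for a connection to the server to be established.

      (2) Disconnect. Waits for the MQTT client to finish its work and closes the TCP/IP session with the server.

      (3) Subscribe. Waits for the subscription to complete.

      (4) UnSubscribe. Waits for the server to cancel the client's subscription to one or more topics.

      (5) Publish. The MQTT client sends a message request and returns to the application thread once sending has completed.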

  • Original article: https://www.cnblogs.com/zuimengaitianya/p/11707490.html