zoukankan      html  css  js  c++  java
  • MQTTnet 3.0.5学习笔记

    MQTTnet 3.0.5学习笔记

    段时间在使用MQTTnet,都说这个东西比较好,可是翻了翻网上没有例子给参考一下。

    今天算是找到了,给高手的帖子做个宣传吧.

    原网址如下:https://blog.csdn.net/chenlu5201314/article/details/94740765

    由于GitHub上介绍的东西比较少,以我的水平真是不知道怎么用,先照葫芦画瓢,再看看怎么回事吧:

    功能:

    把订阅与发布做成一个类,还带有自动重连的功能

    复制代码
    using System.Threading;    
    using System.Threading.Tasks;
    using MQTTnet;          
    using MQTTnet.Client;      //客户端需要用到
    using MQTTnet.Client.Options;  //具体连接时需要用到的属性,ID的名称,要连接Server的名称,接入时用到的账号和密码,掉线时是否重新清除原有名称,还有许多...
    using MQTTnet.Packets;    //这个没用上
    using MQTTnet.Protocol;    //这个也没用上
    using MQTTnet.Client.Receiving;    //接收
    using MQTTnet.Client.Disconnecting;  //断线
    using MQTTnet.Client.Connecting;    //连接
    复制代码

    新建一个类:先写一下变量和一些字段

    复制代码
    class HOSMQTT   
    {        
    private static MqttClient mqttClient = null;        
    private static IMqttClientOptions options = null;        
    private static bool runState = false;        
    private static bool running = false;        
     /// <summary>        
    /// 服务器IP        
    /// </summary>        
    private static string ServerUrl = "182.61.51.85";        
    /// <summary>        
    /// 服务器端口        
    /// </summary>        
    private static int Port = 61613;        
    /// <summary>        
    /// 选项 - 开启登录 - 密码        
    /// </summary>        
    private static string Password = "ruichi8888";        
    /// <summary>        
    /// 选项 - 开启登录 - 用户名        
    /// </summary>        
    private static string UserId = "admin";        
    /// <summary>        
    /// 主题        
    /// <para>China/Hunan/Yiyang/Nanxian</para>        
    /// <para>Hotel/Room01/Tv</para>        
    /// <para>Hospital/Dept01/Room001/Bed001</para>        
    /// <para>Hospital/#</para>        
    /// </summary>        
    private static string Topic = "China/Hunan/Yiyang/Nanxian";        
    /// <summary>        
    /// 保留        
    /// </summary>        
    private static bool Retained = false;       
     /// <summary>       
     /// 服务质量        
    /// <para>0 - 至多一次</para>        
    /// <para>1 - 至少一次</para>        
    /// <para>2 - 刚好一次</para>        
    /// </summary>        
    private static int QualityOfServiceLevel = 0;
    }
    复制代码

    先看一下Start方法

    复制代码
    public static void Start()
            {
                try
                {
                    runState = true;
                    Thread thread = new Thread(Work);    //原帖中是这样写的 Thread thread = new Thread(new ThreadStart( Work));
                    thread.IsBackground = true;
                    thread.Start();
                }
                catch (Exception ex)
                {
                    Console.WriteLine( "启动客户端出现问题:" + ex.ToString());
                }
            }
    复制代码

    没进入正题之前,先普及一下基本知识 

    C#的ThreadStart 和 Thread  多线程,new Thread(t1);和new Thread(new ThreadStart(t1));没有什么区别.前者是.net的写法,后者是C#的写法

    具体请看下面的连接

    https://www.cnblogs.com/rosesmall/p/8358348.html

    进入整体,介绍连接方法 Work

    复制代码
    private static void Work()
            {
                running = true;
                Console.WriteLine("Work >>Begin");
                try
                {
                    var factory = new MqttFactory();        //声明一个MQTT客户端的标准步骤 的第一步
                    mqttClient = factory.CreateMqttClient() as MqttClient;  //factory.CreateMqttClient()实际是一个接口类型(IMqttClient),这里是把他的类型变了一下
                    options = new MqttClientOptionsBuilder()    //实例化一个MqttClientOptionsBulider
    .WithTcpServer(ServerUrl, Port) .WithCredentials(UserId, Password) .WithClientId("XMan") .Build();                   mqttClient.ConnectAsync(options);      //连接服务器
            
              //下面这些东西是什么,为什么要这么写,直到刚才我还是不懂,不过在GitHub的网址我发现了出处. mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(new Func<MqttClientConnectedEventArgs, Task>(Connected)); mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(new Func<MqttClientDisconnectedEventArgs, Task>(Disconnected)); mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(new Action<MqttApplicationMessageReceivedEventArgs>(MqttApplicationMessageReceived)); while (runState) { Thread.Sleep(100); } } catch(Exception exp) { Console.WriteLine(exp); } Console.WriteLine("Work >>End"); running = false; runState = false; }
    复制代码

    先来看看MqttClient 类里面都有什么东西

    需要实现的接口,如何实现,说重点!

    在GitHub上有个地方进去看看就知道了‘

     这个页面的最下方写着如何实现    https://github.com/chkr1011/MQTTnet/wiki/Upgrading-guide

    复制代码
    private void Something()
    {
        mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(OnAppMessage);
        mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(OnConnected);
        mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(OnDisconnected);
    }
    
    private async void OnAppMessage(MqttApplicationMessageReceivedEventArgs e)
    {
    }
    
    private async void OnConnected(MqttClientConnectedEventArgs e)
    {
    }
    
    private async void OnDisconnected(MqttClientDisconnectedEventArgs e)
    {
    }
    复制代码

    在开始Connected方法之前有必要看一下关于同步和异步的知识,

    现学现卖简单说一下:

    Task就是异步的调用,就在不影响主线程运行的另一个线程,但是他能像线程池一样更高效的利用现有的空闲线程

    async必须用来修饰Task ,void,或者Task<TResult>, await是等待异步线程Task.Run()开始的后台线程执行完毕。

    记住要是Task 实现异步功能,必须用 async 修饰,且async 与await成对出现。

    详见下面大神写的大作:https://www.cnblogs.com/doforfuture/p/6293926.html

    下面是什么意思?

    mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(new Func<MqttClientConnectedEventArgs, Task>(Connected));

    MqttClientConnectedHandlerDelegate 这个实例实现了mqttClient.ConnectedHandler接口

    new Func<MqttClientConnectedEventArgs, Task>(Connected) ,

    使用Func委托传入MqttClientConnectedEventArgs类型的参数,返回的类型是Task,Task是一个类,这个类没有返回值,如果有返回值就是Task<TResult>。

    是委托就要带一个方法取实现,这个方法就是Connected。

    这句话的意思是,用MqttClientConnectedHandlerDelegate实现接口,同时使用委托取调用Connected的方法,并且给这个方法传入一个MqttClientConnectedEventArgs参数,

    这个委托的返回值是Task(就是不需要返回类型的异步调用),这也就定义了Connected的类型必须是async Task。

    好了来看下 Connected,这个函数什么意思

    就是与服务器连接之后要干什么,订阅一个Topic,或几个Topic。连接之前已经连接了Connectasync(),如果断线还会重连,后面会提到。

    这个就连接之后需要做的事----订阅!

    复制代码
            private static async Task Connected(MqttClientConnectedEventArgs e)
            {
                try
                {
                    List<TopicFilter> listTopic = new List<TopicFilter>();
                    if (listTopic.Count() <= 0)
                    {
                        var topicFilterBulder = new TopicFilterBuilder().WithTopic(Topic).Build();
                        listTopic.Add(topicFilterBulder);
                        Console.WriteLine("Connected >>Subscribe " + Topic);
                    }                await mqttClient.SubscribeAsync(listTopic.ToArray());
                    Console.WriteLine("Connected >>Subscribe Success");
                }
                catch (Exception exp)
                {
                    Console.WriteLine(exp.Message);
                }
            }
    复制代码

    TopicFilter是一个Topic详细信息的类

     掉线的发生时会执行这个函数

    复制代码
    private static async Task Disconnected(MqttClientDisconnectedEventArgs e)
            {
                try
                {
                    Console.WriteLine("Disconnected >>Disconnected Server");
                    await Task.Delay(TimeSpan.FromSeconds(5));
                    try
                    {
                        await mqttClient.ConnectAsync(options);
                    }
                    catch (Exception exp)
                    {
                        Console.WriteLine("Disconnected >>Exception " + exp.Message);
                    }
                }
                catch (Exception exp)
                {
                    Console.WriteLine(exp.Message);
                }
            }
    复制代码

    越写问题越多,这个为什么断线的时候会执行这个方法,这不是事件,只是接口!

    怎么实现的?看了一下源码,一时只看了大概,这些功能的绑定都是在ConnectAsync的时候就完成了!

    下面接收到消息的时候

    复制代码
            /// <summary>
            /// 接收消息触发事件
            /// </summary>
            /// <param name="e"></param>
            private static void MqttApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
            {
                try
                {
                    string text = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
                    string Topic = e.ApplicationMessage.Topic; string QoS = e.ApplicationMessage.QualityOfServiceLevel.ToString();
                    string Retained = e.ApplicationMessage.Retain.ToString();
                    Console.WriteLine("MessageReceived >>Topic:" + Topic + "; QoS: " + QoS + "; Retained: " + Retained + ";");
                    Console.WriteLine("MessageReceived >>Msg: " + text);
                }
                catch (Exception exp)
                {
                    Console.WriteLine(exp.Message);
                }
            }
    复制代码

     最后就是发布:一般会选择0,如果选择其他的情况在订阅端不在的时候,服务器可能会崩溃

    复制代码
    /// <summary>       
            /// /// 发布        
            /// <paramref name="QoS"/>        
            /// <para>0 - 最多一次</para>        
            /// <para>1 - 至少一次</para>        
            /// <para>2 - 仅一次</para>        
            /// </summary>       
            /// <param name="Topic">发布主题</param>        
            /// <param name="Message">发布内容</param>        
            /// <returns></returns>        
            public static void Publish( string Topic,string Message)
            {
                try
                {
                    if (mqttClient == null)
                        return;
                    if (mqttClient.IsConnected == false)
                        mqttClient.ConnectAsync(options);
                    if (mqttClient.IsConnected == false)
                    {
                        Console.WriteLine("Publish >>Connected Failed! ");
                        return;
                    }
                    Console.WriteLine("Publish >>Topic: " + Topic + "; QoS: " + QualityOfServiceLevel + "; Retained: " + Retained + ";");
                    Console.WriteLine("Publish >>Message: " + Message);
                    MqttApplicationMessageBuilder mamb = new MqttApplicationMessageBuilder()                 
                        .WithTopic(Topic)                 
                        .WithPayload(Message).WithRetainFlag(Retained);
                    if (QualityOfServiceLevel == 0)
                    {
                        mamb = mamb.WithAtMostOnceQoS();
                    }
                    else if (QualityOfServiceLevel == 1)
                    {
                        mamb = mamb.WithAtLeastOnceQoS();
                    }
                    else if (QualityOfServiceLevel == 2)
                    {
                        mamb = mamb.WithExactlyOnceQoS();
                    }
                    mqttClient.PublishAsync(mamb.Build());
                }
                catch (Exception exp)
                {
                    Console.WriteLine("Publish >>" + exp.Message);
                }
            }
    复制代码

     重点补充一下:

    这两天验证的时候发现,对于MQTT服务器来说客户端的用户名必须是唯一的,

    举例:同一台电脑上,两个程序同时发布(publish)到一个MQTT服务器,必须设置两个不同的ClientId,否则只有一个能连接上。

    Paho的使用:

    具体说明也可以看这位高手的:

    https://blog.csdn.net/weixin_42133779/article/details/80226633

    只是有一点需要强调以下:

    Paho的目录不要太深,之前我就是三层文件夹下面,结果无法运行。一直提示 error code=13

    没办法把他放在D盘的根目录下就可以了

     纸上得来终觉浅,要改造成自己想要的些东西,还要花些功夫!不过这已经很好了!谢谢各位高手的贡献

  • 相关阅读:
    算法市场 Algorithmia
    Cucumber
    TeamCity 持续集成工具
    自动化测试
    Selenium
    Android耗时操作
    RecyclerView
    Android报错
    Lakeshore
    BaaS_后端即服务 RESTful
  • 原文地址:https://www.cnblogs.com/li-sx/p/15148981.html
Copyright © 2011-2022 走看看