zoukankan      html  css  js  c++  java
  • mqttnet3.0用法

    .net常用的mqtt类库有2个,m2mqtt和mqttnet两个类库

    当然了,这两个库的教程网上一搜一大把

    但mqttnet搜到的教程全是2.7及以下版本的,但3.0版语法却不再兼容,升级版本会导致很多问题,今天进行了成功的升级,现记录下来

    参考文档地址:https://github.com/chkr1011/MQTTnet/wiki/Client

    上代码:

    ///开源库地址:https://github.com/chkr1011/MQTTnet
    ///对应文档:https://github.com/chkr1011/MQTTnet/wiki/Client
    
    using MQTTnet;
    using MQTTnet.Client;
    using MQTTnet.Client.Options;
    using System;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Windows.Forms;
    
    namespace MqttServerTest
    {
        public partial class mqtt测试工具 : Form
        {
            private IMqttClient mqttClient = null;
            private bool isReconnect = true;
    
            public mqtt测试工具()
            {
                InitializeComponent();
            }
    
            private void Form1_Load(object sender, EventArgs e)
            {
    
            }
    
            private async void BtnPublish_Click(object sender, EventArgs e)
            {
                await Publish();
            }
    
            private async void BtnSubscribe_ClickAsync(object sender, EventArgs e)
            {
                await Subscribe();
            }
    
            private async Task Publish()
            {
                string topic = txtPubTopic.Text.Trim();
    
                if (string.IsNullOrEmpty(topic))
                {
                    MessageBox.Show("发布主题不能为空!");
                    return;
                }
    
                string inputString = txtSendMessage.Text.Trim();
                try
                {
    
                    var message = new MqttApplicationMessageBuilder()
            .WithTopic(topic)
            .WithPayload(inputString)
            .WithExactlyOnceQoS()
            .WithRetainFlag()
            .Build();
    
                    await mqttClient.PublishAsync(message);
                }
                catch (Exception ex)
                {
    
                    Invoke((new Action(() =>
                    {
                        txtReceiveMessage.AppendText($"发布主题失败!" + Environment.NewLine + ex.Message + Environment.NewLine);
                    })));
                }
    
    
    
    
            }
    
            private async Task Subscribe()
            {
                string topic = txtSubTopic.Text.Trim();
    
                if (string.IsNullOrEmpty(topic))
                {
                    MessageBox.Show("订阅主题不能为空!");
                    return;
                }
    
                if (!mqttClient.IsConnected)
                {
                    MessageBox.Show("MQTT客户端尚未连接!");
                    return;
                }
    
                // Subscribe to a topic
                await mqttClient.SubscribeAsync(new TopicFilterBuilder()
                    .WithTopic(topic)
                    .WithAtMostOnceQoS()
                    .Build()
                    );
                Invoke((new Action(() =>
                {
                    txtReceiveMessage.AppendText($"已订阅[{topic}]主题{Environment.NewLine}");
                })));
    
            }
    
            private async Task ConnectMqttServerAsync()
            {
                // Create a new MQTT client.
    
                if (mqttClient == null)
                {
                    try
                    {
                        var factory = new MqttFactory();
                        mqttClient = factory.CreateMqttClient();
    
                        var options = new MqttClientOptionsBuilder()
                            .WithTcpServer(txtIp.Text, Convert.ToInt32(txtPort.Text)).WithCredentials(txtUsername.Text, txtPsw.Text).WithClientId(txtClientId.Text) // Port is optional
                            .Build();
    
    
                        await mqttClient.ConnectAsync(options, CancellationToken.None);
                        Invoke((new Action(() =>
                        {
                            txtReceiveMessage.AppendText($"连接到MQTT服务器成功!" + txtIp.Text);
                        })));
                        mqttClient.UseApplicationMessageReceivedHandler(e =>
                        {
    
                            Invoke((new Action(() =>
                            {
                                txtReceiveMessage.AppendText($"收到订阅消息!" + Encoding.UTF8.GetString(e.ApplicationMessage.Payload));
                            })));
                       
                        });
                    }
                    catch (Exception ex)
                    {
    
                        Invoke((new Action(() =>
                        {
                            txtReceiveMessage.AppendText($"连接到MQTT服务器失败!" + Environment.NewLine + ex.Message + Environment.NewLine);
                        })));
                    }
                }
            }
    
            private void MqttClient_Connected(object sender, EventArgs e)
            {
                Invoke((new Action(() =>
                {
                    txtReceiveMessage.Clear();
                    txtReceiveMessage.AppendText("已连接到MQTT服务器!" + Environment.NewLine);
                })));
            }
    
            private void MqttClient_Disconnected(object sender, EventArgs e)
            {
                Invoke((new Action(() =>
                {
                    txtReceiveMessage.Clear();
                    DateTime curTime = new DateTime();
                    curTime = DateTime.UtcNow;
                    txtReceiveMessage.AppendText($">> [{curTime.ToLongTimeString()}]");
                    txtReceiveMessage.AppendText("已断开MQTT连接!" + Environment.NewLine);
                })));
    
                //Reconnecting
                if (isReconnect)
                {
                    Invoke((new Action(() =>
                    {
                        txtReceiveMessage.AppendText("正在尝试重新连接" + Environment.NewLine);
                    })));
    
                    var options = new MqttClientOptionsBuilder()
                        .WithClientId(txtClientId.Text)
                        .WithTcpServer(txtIp.Text, Convert.ToInt32(txtPort.Text))
                        .WithCredentials(txtUsername.Text, txtPsw.Text)
                        //.WithTls()
                        .WithCleanSession()
                        .Build();
                    Invoke((new Action(async () =>
                    {
                        await Task.Delay(TimeSpan.FromSeconds(5));
                        try
                        {
                            await mqttClient.ConnectAsync(options);
                        }
                        catch
                        {
                            txtReceiveMessage.AppendText("### RECONNECTING FAILED ###" + Environment.NewLine);
                        }
                    })));
                }
                else
                {
                    Invoke((new Action(() =>
                    {
                        txtReceiveMessage.AppendText("已下线!" + Environment.NewLine);
                    })));
                }
            }
    
            private void MqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
            {
                Invoke((new Action(() =>
                {
                    txtReceiveMessage.AppendText($">> {"### RECEIVED APPLICATION MESSAGE ###"}{Environment.NewLine}");
                })));
                Invoke((new Action(() =>
                {
                    txtReceiveMessage.AppendText($">> Topic = {e.ApplicationMessage.Topic}{Environment.NewLine}");
                })));
                Invoke((new Action(() =>
                {
                    txtReceiveMessage.AppendText($">> Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}{Environment.NewLine}");
                })));
                Invoke((new Action(() =>
                {
                    txtReceiveMessage.AppendText($">> QoS = {e.ApplicationMessage.QualityOfServiceLevel}{Environment.NewLine}");
                })));
                Invoke((new Action(() =>
                {
                    txtReceiveMessage.AppendText($">> Retain = {e.ApplicationMessage.Retain}{Environment.NewLine}");
                })));
            }
    
            private void btnLogIn_Click(object sender, EventArgs e)
            {
                isReconnect = true;
                Task.Run(async () => { await ConnectMqttServerAsync(); });
            }
    
            private void btnLogout_Click(object sender, EventArgs e)
            {
                isReconnect = false;
                Task.Run(async () => { await mqttClient.DisconnectAsync(); });
            }
    
        }
    }

    参考:https://www.cnblogs.com/bjjjunjie/p/mqtt.html

  • 相关阅读:
    linux解压分卷压缩的zip文件
    centos关闭sudo的ldap认证
    IT词汇表
    IT博客汇
    os.waitpid()无法获取sys.exit()退出时的status code
    github下fork后如何同步源的新更新
    git 撤销commit
    g++编译问题:skipping incompatible /usr/lib//libboost_system.so when searching for -lboost_system
    g++动态库静态库混合链接
    thread
  • 原文地址:https://www.cnblogs.com/huandong963/p/12591813.html
Copyright © 2011-2022 走看看