zoukankan      html  css  js  c++  java
  • 物联网架构成长之路(32)-SpringBoot集成MQTT客户端

    一、前言
      这里虽然是说MQTT客户端。其实对于服务器来说,这里的一个具有超级权限的MQTT客户端,就可以做很多事情。比如手机APP或者网页或者第三方服务需要发送数据到设备,但是这些又不是设备,又不能让他们连到MQTT。那么就可以通过HTTP请求业务服务器。然后由业务服务器利用这个MQTT客户端进行发送数据。
      还有,之前好多人问我,怎么保存这些物联网数据,真的要像前面的博客那样,要自己写插件吗?特别麻烦的啊。这里给出的结论是不需要。保存数据,除了写EMQ插件,还可以在EMQ的规则引擎上进行配置Web消息转发【EMQ 3.x 版本】,还有就是这种通过业务服务器订阅根Topic来保存物联网原始数据。
      这篇博客这讨论如何把MQTT客户端集成到业务服务器上(基于SpringBoot 2.0)。下一篇博客会讲到数据保存到InfluxDB,然后如何通过Grafana进行可视化Dashboard看板模式展示。

    二、配置pom.xml,引入第三方库

     1         <!-- MQTT -->
     2         <dependency>
     3             <groupId>org.springframework.boot</groupId>
     4             <artifactId>spring-boot-starter-integration</artifactId>
     5         </dependency>
     6         <dependency>
     7             <groupId>org.springframework.integration</groupId>
     8             <artifactId>spring-integration-stream</artifactId>
     9         </dependency>
    10         <dependency>
    11             <groupId>org.springframework.integration</groupId>
    12             <artifactId>spring-integration-mqtt</artifactId>
    13         </dependency>

    三、MQTT客户端代码(Java)

      MqttDemoApplication.java

     1 package com.wunaozai.mqtt;
     2 
     3 import org.springframework.boot.SpringApplication;
     4 import org.springframework.boot.autoconfigure.SpringBootApplication;
     5 
     6 import com.wunaozai.mqtt.tools.MqttPushClient;
     7 
     8 @SpringBootApplication
     9 public class MqttDemoApplication {
    10 
    11     public static void main(String[] args) {
    12         SpringApplication.run(MqttDemoApplication.class, args);
    13         
    14         test();
    15     }
    16 
    17     
    18     private static void test(){
    19         MqttPushClient.MQTT_HOST = "tcp://mqtt.com:1883";
    20         MqttPushClient.MQTT_CLIENTID = "client";
    21         MqttPushClient.MQTT_USERNAME = "username";
    22         MqttPushClient.MQTT_PASSWORD = "password";
    23         MqttPushClient client = MqttPushClient.getInstance();
    24         client.subscribe("/#");
    25     }
    26 }

      MqttPushCallback.java

     1 package com.wunaozai.mqtt.tools;
     2 
     3 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
     4 import org.eclipse.paho.client.mqttv3.MqttCallback;
     5 import org.eclipse.paho.client.mqttv3.MqttMessage;
     6 import org.slf4j.Logger;
     7 import org.slf4j.LoggerFactory;
     8 
     9 /**
    10  * MQTT 推送回调
    11  * @author wunaozai
    12  * @date 2018-08-22
    13  */
    14 public class MqttPushCallback implements MqttCallback {
    15     
    16     private static final Logger log = LoggerFactory.getLogger(MqttPushCallback.class);
    17 
    18     @Override
    19     public void connectionLost(Throwable cause) {
    20         log.info("断开连接,建议重连" + this);
    21         //断开连接,建议重连
    22     }
    23 
    24     @Override
    25     public void deliveryComplete(IMqttDeliveryToken token) {
    26         //log.info(token.isComplete() + "");
    27     }
    28 
    29     @Override
    30     public void messageArrived(String topic, MqttMessage message) throws Exception {
    31         log.info("Topic: " + topic);
    32         log.info("Message: " + new String(message.getPayload()));
    33     }
    34 
    35 }

      MqttPushClient.java

      1 package com.wunaozai.mqtt.tools;
      2 
      3 import org.eclipse.paho.client.mqttv3.MqttClient;
      4 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
      5 import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
      6 import org.eclipse.paho.client.mqttv3.MqttMessage;
      7 import org.eclipse.paho.client.mqttv3.MqttTopic;
      8 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
      9 import org.slf4j.Logger;
     10 import org.slf4j.LoggerFactory;
     11 
     12 /**
     13  * 创建一个MQTT客户端
     14  * @author wunaozai
     15  * @date 2018-08-22
     16  */
     17 public class MqttPushClient {
     18     
     19     private static final Logger log = LoggerFactory.getLogger(MqttPushClient.class);
     20     public static String MQTT_HOST = "";
     21     public static String MQTT_CLIENTID = "";
     22     public static String MQTT_USERNAME = "";
     23     public static String MQTT_PASSWORD = "";
     24     public static int MQTT_TIMEOUT = 10;
     25     public static int MQTT_KEEPALIVE = 10;
     26     
     27     private MqttClient client;
     28     private static volatile MqttPushClient mqttClient = null;
     29     public static MqttPushClient getInstance() {
     30         if(mqttClient == null) {
     31             synchronized (MqttPushClient.class) {
     32                 if(mqttClient == null) {
     33                     mqttClient = new MqttPushClient();
     34                 }
     35             }
     36         }
     37         return mqttClient;
     38     }
     39     
     40     private MqttPushClient() {
     41         log.info("Connect MQTT: " + this);
     42         connect();
     43     }
     44     
     45     private void connect() {
     46         try {
     47             client = new MqttClient(MQTT_HOST, MQTT_CLIENTID, new MemoryPersistence());
     48             MqttConnectOptions option = new MqttConnectOptions();
     49             option.setCleanSession(true);
     50             option.setUserName(MQTT_USERNAME);
     51             option.setPassword(MQTT_PASSWORD.toCharArray());
     52             option.setConnectionTimeout(MQTT_TIMEOUT);
     53             option.setKeepAliveInterval(MQTT_KEEPALIVE);
     54             option.setAutomaticReconnect(true);
     55             try {
     56                 client.setCallback(new MqttPushCallback());
     57                 client.connect(option);
     58             } catch (Exception e) {
     59                 e.printStackTrace();
     60             }
     61         } catch (Exception e) {
     62             e.printStackTrace();
     63         }
     64     }
     65     /**
     66      * 发布主题,用于通知<br>
     67      * 默认qos为1 非持久化
     68      * @param topic
     69      * @param data
     70      */
     71     public void publish(String topic, String data) {
     72         publish(topic, data, 1, false);
     73     }
     74     /**
     75      * 发布
     76      * @param topic
     77      * @param data
     78      * @param qos
     79      * @param retained
     80      */
     81     public void publish(String topic, String data, int qos, boolean retained) {
     82         MqttMessage message = new MqttMessage();
     83         message.setQos(qos);
     84         message.setRetained(retained);
     85         message.setPayload(data.getBytes());
     86         MqttTopic mqttTopic = client.getTopic(topic);
     87         if(null == mqttTopic) {
     88             log.error("Topic Not Exist");
     89         }
     90         MqttDeliveryToken token;
     91         try {
     92             token = mqttTopic.publish(message);
     93             token.waitForCompletion();
     94         } catch (Exception e) {
     95             e.printStackTrace();
     96         }
     97     }
     98     /**
     99      * 订阅某个主题 qos默认为1
    100      * @param topic
    101      */
    102     public void subscribe(String topic) {
    103         subscribe(topic, 1);
    104     }
    105     /**
    106      * 订阅某个主题
    107      * @param topic
    108      * @param qos
    109      */
    110     public void subscribe(String topic, int qos) {
    111         try {
    112             client.subscribe(topic, qos);
    113         } catch (Exception e) {
    114             e.printStackTrace();
    115         }
    116     }
    117 }

    四、MQTT客户端代码(C#)
      为了下下篇博客Grafana有数据可以展示,我需要开发一个PC小工具【设备仿真】,用来模拟设备一直发送数据。这里就不对C#开发进行过多的说明了。通过nuget,引入第三方mqtt库。这个工具是我现在开发平台工具链的一个小工具。至于里面的Payload协议,可以不用管。读者可以根据自己的业务制定自己的通信协议。

      部分C#代码(连接服务器与发送数据)

      1 using MQTTClient.Model;
      2 using MQTTnet;
      3 using MQTTnet.Core;
      4 using MQTTnet.Core.Client;
      5 using Newtonsoft.Json;
      6 using System;
      7 using System.Collections.Generic;
      8 using System.Text;
      9 using System.Threading.Tasks;
     10 using System.Windows.Forms;
     11 
     12 namespace MQTTClient
     13 {
     14     public partial class MainPage : Form
     15     {
     16         public MainPage()
     17         {
     18             InitializeComponent();
     19             init();
     20         }
     21         private void init()
     22         {
     23             txtusername.Text = "";
     24             txtpassword.Text = "";
     25             txtclientid.Text = "";
     26             txttopic.Text = "iot/UUID/device/devicepub/update";
     27         }
     28 
     29         IMqttClient client = null;
     30         private async Task ConnectMqttServerAsync()
     31         {
     32             if(client == null)
     33             {
     34                 client = new MqttClientFactory().CreateMqttClient() as MqttClient;
     35                 client.ApplicationMessageReceived += mqttClientApplicationMessageReceived;
     36                 client.Connected += mqttClientConnected;
     37                 client.Disconnected += mqttClientDisconnected;
     38             }
     39             try
     40             {
     41                 await client.DisconnectAsync();
     42                 var option = getMQTTOption();
     43                 await client.ConnectAsync(option);
     44             }catch(Exception e)
     45             {
     46                 Invoke((new Action(() =>
     47                 {
     48                     lblStatus.Text = "连接服务器失败: " + e.Message;
     49                 })));
     50             }
     51         }
     52         private void mqttClientDisconnected(object sender, EventArgs e)
     53         {
     54             Invoke((new Action(() =>
     55             {
     56                 lblStatus.Text = "连接服务器失败: ERROR";
     57             })));
     58         }
     59         private void mqttClientConnected(object sender, EventArgs e)
     60         {
     61             Invoke((new Action(() =>
     62             {
     63                 lblStatus.Text = "连接服务器成功";
     64             })));
     65         }
     66         private void mqttClientApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
     67         {
     68             //本工具部收数据
     69             throw new NotImplementedException();
     70         }
     71 
     72         private void btnconnect_Click(object sender, EventArgs e)
     73         {
     74             Task.Run(async () => { await ConnectMqttServerAsync(); });
     75         }
     76         private void btndisconnect_Click(object sender, EventArgs e)
     77         {
     78             client.DisconnectAsync();
     79         }
     80         private void btnsendone_Click(object sender, EventArgs e)
     81         {
     82             sendPayload();
     83         }
     84         private void btnsendts_Click(object sender, EventArgs e)
     85         {
     86             timer1.Interval = Convert.ToInt32(txttime.Text);
     87             timer1.Enabled = true;
     88         }
     89         private void btnstopts_Click(object sender, EventArgs e)
     90         {
     91             timer1.Enabled = false;
     92         }
     93         private void timer1_Tick(object sender, EventArgs e)
     94         {
     95             sendPayload();
     96         }
     97         private int sendPayload()
     98         {
     99             if (client.IsConnected == false)
    100             {
    101                 return -1;
    102             }
    103             PayloadModel payload = getPayload();
    104             string json = JsonConvert.SerializeObject(payload, Formatting.Indented);
    105             txtview.Text = json;
    106             string topic = txttopic.Text;
    107             var msg = new MqttApplicationMessage(topic, Encoding.Default.GetBytes(json),
    108                 MQTTnet.Core.Protocol.MqttQualityOfServiceLevel.AtMostOnce, false);
    109             client.PublishAsync(msg);
    110             lblSendStatus.Text = "发送: " + DateTime.Now.ToLongTimeString();
    111             return 0;
    112         }
    113 
    114         private MqttClientTcpOptions getMQTTOption()
    115         {
    116             MqttClientTcpOptions option = new MqttClientTcpOptions();
    117             string hostname = txthostname.Text;
    118             string[] host_port = hostname.Split(':');
    119             int port = 1883;
    120             if(host_port.Length >= 2)
    121             {
    122                 hostname = host_port[0];
    123                 port = Convert.ToInt32(host_port[1]);
    124             }
    125             option.Server = hostname;
    126             option.ClientId = txtclientid.Text;
    127             option.UserName = txtusername.Text;
    128             option.Password = txtpassword.Text;
    129             option.Port = port;
    130             option.CleanSession = true;
    131             return option;
    132         }
    133 
    134         private PayloadModel getPayload()
    135         {
    136             PayloadModel payload = new PayloadModel();
    137             //
    138             return payload;
    139         }
    140 
    141         Random rand1 = new Random(System.DateTime.Now.Millisecond);
    142         private int getRandomNum()
    143         {
    144             int data = rand1.Next(0, 100);
    145             return data;
    146         }
    147 
    148         int linenum = 0;
    149         Random rand2 = new Random(System.DateTime.Now.Millisecond);
    150         private int getLineNum()
    151         {
    152             int f = rand2.Next(0, 100);
    153             int data = rand2.Next(0, 5);
    154             if(f % 2 == 1)
    155             {
    156                 linenum += data;
    157             }
    158             else
    159             {
    160                 linenum -= data;
    161             }
    162             return linenum;
    163         }
    164 
    165     }
    166 }

    本文地址: https://www.cnblogs.com/wunaozai/p/11147841.html

  • 相关阅读:
    node.js 安装后怎么打开 node.js 命令框
    thinkPHP5 where多条件查询
    网站title中的图标
    第一次写博客
    Solution to copy paste not working in Remote Desktop
    The operation could not be completed. (Microsoft.Dynamics.BusinessConnectorNet)
    The package failed to load due to error 0xC0011008
    VS2013常用快捷键
    微软Dynamics AX的三层架构
    怎样在TFS(Team Foundation Server)中链接团队项目
  • 原文地址:https://www.cnblogs.com/wunaozai/p/11147841.html
Copyright © 2011-2022 走看看