zoukankan      html  css  js  c++  java
  • 物联网架构成长之路(32)-SpringBoot集成MQTT客户端

    一、前言
      这里虽然是说MQTT客户端。其实对于服务器来说,这里的一个具有超级权限的MQTT客户端,就可以做很多事情。比如手机APP或者网页或者第三方服务需要发送数据到设备,但是这些又不是设备,又不能让他们连到MQTT。那么就可以通过HTTP请求业务服务器。然后由业务服务器利用这个MQTT客户端进行发送数据。
      还有,之前好多人问我,怎么保存这些物联网数据,真的要像前面的博客那样,要自己写插件吗?特别麻烦的啊。这里给出的结论是不需要。保存数据,除了写EMQ插件,还可以在EMQ的规则引擎上进行配置Web消息转发【EMQ 3.x 版本】,还有就是这种通过业务服务器订阅根Topic来保存物联网原始数据。
      这篇博客只讨论如何把MQTT客户端集成到业务服务器上(基于SpringBoot 2.0)。下一篇博客会讲到数据保存到InfluxDB,然后如何通过Grafana进行可视化Dashboard看板模式展示。

    二、配置pom.xml,引入第三方库

     1         <!-- MQTT -->
     2         <dependency>
     3             <groupId>org.springframework.boot</groupId>
     4             <artifactId>spring-boot-starter-integration</artifactId>
     5         </dependency>
     6         <dependency>
     7             <groupId>org.springframework.integration</groupId>
     8             <artifactId>spring-integration-stream</artifactId>
     9         </dependency>
    10         <dependency>
    11             <groupId>org.springframework.integration</groupId>
    12             <artifactId>spring-integration-mqtt</artifactId>
    13         </dependency>

    三、MQTT客户端代码(Java)

      MqttDemoApplication.java

     1 package com.wunaozai.mqtt;
     2 
     3 import org.springframework.boot.SpringApplication;
     4 import org.springframework.boot.autoconfigure.SpringBootApplication;
     5 
     6 import com.wunaozai.mqtt.tools.MqttPushClient;
     7 
     8 @SpringBootApplication
     9 public class MqttDemoApplication {
    10 
    11     public static void main(String[] args) {
    12         SpringApplication.run(MqttDemoApplication.class, args);
    13         
    14         test();
    15     }
    16 
    17     
    18     private static void test(){
    19         MqttPushClient.MQTT_HOST = "tcp://mqtt.com:1883";
    20         MqttPushClient.MQTT_CLIENTID = "client";
    21         MqttPushClient.MQTT_USERNAME = "username";
    22         MqttPushClient.MQTT_PASSWORD = "password";
    23         MqttPushClient client = MqttPushClient.getInstance();
    24         client.subscribe("/#");
    25     }
    26 }

      MqttPushCallback.java

     1 package com.wunaozai.mqtt.tools;
     2 
     3 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
     4 import org.eclipse.paho.client.mqttv3.MqttCallback;
     5 import org.eclipse.paho.client.mqttv3.MqttMessage;
     6 import org.slf4j.Logger;
     7 import org.slf4j.LoggerFactory;
     8 
     9 /**
    10  * MQTT 推送回调
    11  * @author wunaozai
    12  * @date 2018-08-22
    13  */
    14 public class MqttPushCallback implements MqttCallback {
    15     
    16     private static final Logger log = LoggerFactory.getLogger(MqttPushCallback.class);
    17 
    18     @Override
    19     public void connectionLost(Throwable cause) {
    20         log.info("断开连接,建议重连" + this);
    21         //断开连接,建议重连
    22     }
    23 
    24     @Override
    25     public void deliveryComplete(IMqttDeliveryToken token) {
    26         //log.info(token.isComplete() + "");
    27     }
    28 
    29     @Override
    30     public void messageArrived(String topic, MqttMessage message) throws Exception {
    31         log.info("Topic: " + topic);
    32         log.info("Message: " + new String(message.getPayload()));
    33     }
    34 
    35 }

      MqttPushClient.java

      1 package com.wunaozai.mqtt.tools;
      2 
      3 import org.eclipse.paho.client.mqttv3.MqttClient;
      4 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
      5 import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
      6 import org.eclipse.paho.client.mqttv3.MqttMessage;
      7 import org.eclipse.paho.client.mqttv3.MqttTopic;
      8 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
      9 import org.slf4j.Logger;
     10 import org.slf4j.LoggerFactory;
     11 
     12 /**
     13  * 创建一个MQTT客户端
     14  * @author wunaozai
     15  * @date 2018-08-22
     16  */
     17 public class MqttPushClient {
     18     
     19     private static final Logger log = LoggerFactory.getLogger(MqttPushClient.class);
     20     public static String MQTT_HOST = "";
     21     public static String MQTT_CLIENTID = "";
     22     public static String MQTT_USERNAME = "";
     23     public static String MQTT_PASSWORD = "";
     24     public static int MQTT_TIMEOUT = 10;
     25     public static int MQTT_KEEPALIVE = 10;
     26     
     27     private MqttClient client;
     28     private static volatile MqttPushClient mqttClient = null;
     29     public static MqttPushClient getInstance() {
     30         if(mqttClient == null) {
     31             synchronized (MqttPushClient.class) {
     32                 if(mqttClient == null) {
     33                     mqttClient = new MqttPushClient();
     34                 }
     35             }
     36         }
     37         return mqttClient;
     38     }
     39     
     40     private MqttPushClient() {
     41         log.info("Connect MQTT: " + this);
     42         connect();
     43     }
     44     
     45     private void connect() {
     46         try {
     47             client = new MqttClient(MQTT_HOST, MQTT_CLIENTID, new MemoryPersistence());
     48             MqttConnectOptions option = new MqttConnectOptions();
     49             option.setCleanSession(true);
     50             option.setUserName(MQTT_USERNAME);
     51             option.setPassword(MQTT_PASSWORD.toCharArray());
     52             option.setConnectionTimeout(MQTT_TIMEOUT);
     53             option.setKeepAliveInterval(MQTT_KEEPALIVE);
     54             option.setAutomaticReconnect(true);
     55             try {
     56                 client.setCallback(new MqttPushCallback());
     57                 client.connect(option);
     58             } catch (Exception e) {
     59                 e.printStackTrace();
     60             }
     61         } catch (Exception e) {
     62             e.printStackTrace();
     63         }
     64     }
     65     /**
     66      * 发布主题,用于通知<br>
     67      * 默认qos为1 非持久化
     68      * @param topic
     69      * @param data
     70      */
     71     public void publish(String topic, String data) {
     72         publish(topic, data, 1, false);
     73     }
     74     /**
     75      * 发布
     76      * @param topic
     77      * @param data
     78      * @param qos
     79      * @param retained
     80      */
     81     public void publish(String topic, String data, int qos, boolean retained) {
     82         MqttMessage message = new MqttMessage();
     83         message.setQos(qos);
     84         message.setRetained(retained);
     85         message.setPayload(data.getBytes());
     86         MqttTopic mqttTopic = client.getTopic(topic);
     87         if(null == mqttTopic) {
     88             log.error("Topic Not Exist");
     89         }
     90         MqttDeliveryToken token;
     91         try {
     92             token = mqttTopic.publish(message);
     93             token.waitForCompletion();
     94         } catch (Exception e) {
     95             e.printStackTrace();
     96         }
     97     }
     98     /**
     99      * 订阅某个主题 qos默认为1
    100      * @param topic
    101      */
    102     public void subscribe(String topic) {
    103         subscribe(topic, 1);
    104     }
    105     /**
    106      * 订阅某个主题
    107      * @param topic
    108      * @param qos
    109      */
    110     public void subscribe(String topic, int qos) {
    111         try {
    112             client.subscribe(topic, qos);
    113         } catch (Exception e) {
    114             e.printStackTrace();
    115         }
    116     }
    117 }

    四、MQTT客户端代码(C#)
      为了下下篇博客Grafana有数据可以展示,我需要开发一个PC小工具【设备仿真】,用来模拟设备一直发送数据。这里就不对C#开发进行过多的说明了。通过nuget,引入第三方mqtt库。这个工具是我现在开发平台工具链的一个小工具。至于里面的Payload协议,可以不用管。读者可以根据自己的业务制定自己的通信协议。

      部分C#代码(连接服务器与发送数据)

      1 using MQTTClient.Model;
      2 using MQTTnet;
      3 using MQTTnet.Core;
      4 using MQTTnet.Core.Client;
      5 using Newtonsoft.Json;
      6 using System;
      7 using System.Collections.Generic;
      8 using System.Text;
      9 using System.Threading.Tasks;
     10 using System.Windows.Forms;
     11 
     namespace MQTTClient
     {
         /// <summary>
         /// Device simulator form: connects to an MQTT server and publishes a JSON
         /// payload either once or repeatedly on a timer.
         /// </summary>
         public partial class MainPage : Form
         {
             public MainPage()
             {
                 InitializeComponent();
                 init();
             }

             /// <summary>Fills the input fields with their initial values.</summary>
             private void init()
             {
                 txtusername.Text = "";
                 txtpassword.Text = "";
                 txtclientid.Text = "";
                 txttopic.Text = "iot/UUID/device/devicepub/update";
             }

             // Created on the first connect attempt; null until then.
             IMqttClient client = null;

             /// <summary>
             /// Creates the client on first use, then (re)connects it with the
             /// options currently entered in the form. Errors are shown in lblStatus.
             /// </summary>
             private async Task ConnectMqttServerAsync()
             {
                 if (client == null)
                 {
                     client = new MqttClientFactory().CreateMqttClient() as MqttClient;
                     client.ApplicationMessageReceived += mqttClientApplicationMessageReceived;
                     client.Connected += mqttClientConnected;
                     client.Disconnected += mqttClientDisconnected;
                 }
                 try
                 {
                     // Only disconnect an existing session, so that a failing
                     // disconnect on a fresh client cannot skip the connect below.
                     if (client.IsConnected)
                     {
                         await client.DisconnectAsync();
                     }
                     // This method is started via Task.Run, so read the form
                     // controls on the UI thread instead of the worker thread.
                     var option = (MqttClientTcpOptions)Invoke(
                         new Func<MqttClientTcpOptions>(getMQTTOption));
                     await client.ConnectAsync(option);
                 }
                 catch (Exception e)
                 {
                     Invoke((new Action(() =>
                     {
                         lblStatus.Text = "连接服务器失败: " + e.Message;
                     })));
                 }
             }

             /// <summary>Shows the failure status when the client disconnects.</summary>
             private void mqttClientDisconnected(object sender, EventArgs e)
             {
                 Invoke((new Action(() =>
                 {
                     lblStatus.Text = "连接服务器失败: ERROR";
                 })));
             }

             /// <summary>Shows the success status once the client is connected.</summary>
             private void mqttClientConnected(object sender, EventArgs e)
             {
                 Invoke((new Action(() =>
                 {
                     lblStatus.Text = "连接服务器成功";
                 })));
             }

             /// <summary>Handler for incoming messages; deliberately a no-op.</summary>
             private void mqttClientApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
             {
                 // This tool does not receive data. Ignore the message instead of
                 // throwing from the client's event handler.
             }

             private void btnconnect_Click(object sender, EventArgs e)
             {
                 Task.Run(async () => { await ConnectMqttServerAsync(); });
             }

             private void btndisconnect_Click(object sender, EventArgs e)
             {
                 // Nothing to disconnect if the user never pressed "connect".
                 if (client != null)
                 {
                     client.DisconnectAsync();
                 }
             }

             private void btnsendone_Click(object sender, EventArgs e)
             {
                 sendPayload();
             }

             /// <summary>Starts periodic sending with the interval typed into txttime.</summary>
             private void btnsendts_Click(object sender, EventArgs e)
             {
                 int interval;
                 // Reject non-numeric or non-positive input instead of letting the
                 // conversion or the Interval setter throw.
                 if (!int.TryParse(txttime.Text, out interval) || interval <= 0)
                 {
                     lblSendStatus.Text = "发送间隔无效";
                     return;
                 }
                 timer1.Interval = interval;
                 timer1.Enabled = true;
             }

             private void btnstopts_Click(object sender, EventArgs e)
             {
                 timer1.Enabled = false;
             }

             private void timer1_Tick(object sender, EventArgs e)
             {
                 sendPayload();
             }

             /// <summary>
             /// Serializes the current payload to JSON, shows it in txtview and
             /// publishes it to the topic entered in txttopic.
             /// </summary>
             /// <returns>0 after a publish was started, -1 when not connected.</returns>
             private int sendPayload()
             {
                 // client is null until the first connect attempt.
                 if (client == null || client.IsConnected == false)
                 {
                     return -1;
                 }
                 PayloadModel payload = getPayload();
                 string json = JsonConvert.SerializeObject(payload, Formatting.Indented);
                 txtview.Text = json;
                 string topic = txttopic.Text;
                 // Encode as UTF-8 rather than the machine-dependent default code page.
                 var msg = new MqttApplicationMessage(topic, Encoding.UTF8.GetBytes(json),
                     MQTTnet.Core.Protocol.MqttQualityOfServiceLevel.AtMostOnce, false);
                 client.PublishAsync(msg);
                 lblSendStatus.Text = "发送: " + DateTime.Now.ToLongTimeString();
                 return 0;
             }

             /// <summary>
             /// Builds the connection options from the form fields. The host field
             /// may be "host" or "host:port"; the port defaults to 1883.
             /// </summary>
             private MqttClientTcpOptions getMQTTOption()
             {
                 MqttClientTcpOptions option = new MqttClientTcpOptions();
                 string hostname = txthostname.Text;
                 string[] host_port = hostname.Split(':');
                 int port = 1883;
                 if (host_port.Length >= 2)
                 {
                     hostname = host_port[0];
                     port = Convert.ToInt32(host_port[1]);
                 }
                 option.Server = hostname;
                 option.ClientId = txtclientid.Text;
                 option.UserName = txtusername.Text;
                 option.Password = txtpassword.Text;
                 option.Port = port;
                 option.CleanSession = true;
                 return option;
             }

             /// <summary>Creates the payload to send; the fields are left at their defaults here.</summary>
             private PayloadModel getPayload()
             {
                 PayloadModel payload = new PayloadModel();
                 //
                 return payload;
             }

             Random rand1 = new Random(System.DateTime.Now.Millisecond);

             /// <summary>Returns a random integer in [0, 100).</summary>
             private int getRandomNum()
             {
                 int data = rand1.Next(0, 100);
                 return data;
             }

             // Running value for getLineNum(); moves up or down by a small random step.
             int linenum = 0;
             Random rand2 = new Random(System.DateTime.Now.Millisecond);

             /// <summary>
             /// Advances linenum by a random step in [0, 5), added or subtracted at
             /// random, and returns the new value.
             /// </summary>
             private int getLineNum()
             {
                 int f = rand2.Next(0, 100);
                 int data = rand2.Next(0, 5);
                 if (f % 2 == 1)
                 {
                     linenum += data;
                 }
                 else
                 {
                     linenum -= data;
                 }
                 return linenum;
             }

         }
     }

    本文地址: https://www.cnblogs.com/wunaozai/p/11147841.html

  • 相关阅读:
    mybatis-01-简单概述基础点
    04-书城缺少方法
    03-书城bean类中的id缺少get属性
    02-书城传参类型异常
    执行Oracle存储过程报权限不足的解决方法
    创建表空间及用户的SQL
    Oracle instr函数与SqlServer charindex的区别
    利用ExtJS导出Excel
    Java循环日期
    Oracle给不同组数据添加顺序
  • 原文地址:https://www.cnblogs.com/wunaozai/p/11147841.html
Copyright © 2011-2022 走看看