zoukankan      html  css  js  c++  java
  • 物联网时代-跟着Thingsboard学IOT架构-MQTT设备协议

    Thingsboard的MQTT设备协议

    thingsboard官网: https://thingsboard.io/

    thingsboard GitHub: https://github.com/thingsboard/thingsboard

    thingsboard提供的体验地址: http://demo.thingsboard.io/

     

    BY Thingsboard team

    以下内容是在原文基础上演绎的译文。除非另行注明,页面上所有内容采用知识共享-署名(CC BY 2.5 AU)协议共享。

    原文地址: ThingsBoard API参考:MQTT设备API


     

    MQTT基础知识

    MQTT是一种轻量级的发布 - 订阅消息传递协议,可能使其最适合各种物联网设备。您可以在此处找到有关MQTT的更多信息。

    ThingsBoard服务器节点充当MQTT Broker,支持QoS级别0(最多一次)和1(至少一次)以及一组预定义主题。


     

    客户端库设置

    您可以在Web上找到大量MQTT客户端库。本文中的示例将基于Mosquitto,MQTT.jsPaho,要设置其中一个工具。

     

    键值格式

    默认情况下,ThingsBoard支持JSON中的键值内容。Key始终是一个字符串,而value可以是string,boolean,double或long。也可以使用自定义二进制格式或某些序列化框架。有关详细信息,请参阅物模型。例如:

     
    {"stringKey":"value1", "booleanKey":true, "doubleKey":42.0, "longKey":73}

     

    遥测上传API

    为了将遥测数据发布到ThingsBoard服务器节点,请将PUBLISH消息发送到以下主题:

     
    v1/devices/me/telemetry

    最简单的支持数据格式是:

     
    {"key1":"value1", "key2":"value2"}

    要么

     [{"key1":"value1"}, {"key2":"value2"}]

    请注意,在这种情况下,服务器端时间戳将分配给上传的数据!

    如果您的设备能够获取客户端时间戳,您可以使用以下格式:

     {"ts":1451649600512, "values":{"key1":"value1", "key2":"value2"}}

    在上面的示例中,我们假设“1451649600512”是具有毫秒精度的unix时间戳。例如,值'1451649600512'对应于'Fri,2016年1月1日12:00:00.512 GMT'

     

    属性API

    ThingsBoard属性API允许设备

    • 客户端设备属性上载到服务器。

    将属性更新发布到服务器

    要将客户端设备属性发布到ThingsBoard服务器节点,请将PUBLISH消息发送到以下主题:

     v1/devices/me/attributes

     

    更多请看上文给出的连接。


     

    Thingsboard的MQTT传输协议架构

    因为Thingsboard最新release,是基于微服务架构,不利用单独理解代码。

    Thingsboard源代码: https://github.com/thingsboard/thingsboard/tree/release-2.0/transport/mqtt

    本文基于上面源代码后,剔除相关的安全验证和处理之后搭建简易的讲解项目:

    https://github.com/sanshengshui/IOT-Technical-Guide/tree/master/IOT-Guide-MQTT

     


     

    MQTT框架

    因为Thingsboard是一个JVM技术栈的PaaS平台,所以使用的是基于Java通讯框架的Netty,如果有对Netty不太熟悉的同学,可以参考我之前搭建的Netty实践学习案例: https://github.com/sanshengshui/netty-learning-example

     

    项目结构

     .
     ├── IOT-Guide-MQTT.iml
     ├── pom.xml
     └── src
         └── main
             └── java
                 └── com
                     └── sanshengshui
                         └── mqtt
                             ├── adapter
                             │   └── JsonMqttAdaptor.java // MQTT json转换器,在跟Thingsboard学习IOT-物模型有所讲解
                             ├── IOTMqttServer.java // MQTT服务
                             ├── MqttTopicMatcher.java
                             ├── MqttTopics.java 
                             ├── MqttTransportHandler.java //MQTT处理类
                             └── MqttTransportServerInitializer.java

     

     

    项目代码讲解

     

    IOTMqttServer

     
     1 private static final int PORT = 1884;
     2      private static final String leakDetectorLevel = "DISABLED";
     3      private static final Integer bossGroupThreadCount = 1;
     4      private static final Integer workerGroupThreadCount = 12;
     5      private static final Integer maxPayloadSize = 65536;
     6  7      public static void main(String[] args) throws Exception {
     8          ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));
     9 10          EventLoopGroup bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
    11          EventLoopGroup workerGroup = new NioEventLoopGroup(workerGroupThreadCount);
    12 13          try {
    14 15              ServerBootstrap b = new ServerBootstrap();
    16              b.group(bossGroup,workerGroup)
    17                      .channel(NioServerSocketChannel.class)
    18                      .handler(new LoggingHandler(LogLevel.INFO))
    19                      .childHandler(new MqttTransportServerInitializer(maxPayloadSize));
    20              ChannelFuture f = b.bind(PORT);
    21              f.channel().closeFuture().sync();
    22          } finally {
    23              bossGroup.shutdownGracefully();
    24              workerGroup.shutdownGracefully();
    25          }
    26      }

    第8行,设置服务端Netty内存读写泄漏级别,缺省条件下为:DISABLED

    第10行和第11行,设置boss线程组和work线程组的线程数量。默认情况下,boss线程组的线程数量为1,work线程组的数量为运行服务机器内核数量的2倍。

    第15行,通过创建ServerBootstrap对象,在第16行设置使用EventLoopGroup

    在17和19行,设置要被实例化的NioServerSockerChannel类,并设置最大的负载内容数量。

    最后我们通过shutdowGracefully()函数优雅的关闭bossGroup和workGroup。


     

    MqttTransportHandler#processMqttMsg()

     
     1 private void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
     2          address = (InetSocketAddress) ctx.channel().remoteAddress();
     3          if (msg.fixedHeader() == null) {
     4              processDisconnect(ctx);
     5              return;
     6          }
     7  8          switch (msg.fixedHeader().messageType()) {
     9              case CONNECT:
    10                  processConnect(ctx, (MqttConnectMessage) msg);
    11                  break;
    12              case PUBLISH:
    13                  processPublish(ctx, (MqttPublishMessage) msg);
    14                  break;
    15              case SUBSCRIBE:
    16                  processSubscribe(ctx, (MqttSubscribeMessage) msg);
    17                  break;
    18              case UNSUBSCRIBE:
    19                  processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
    20                  break;
    21              case PINGREQ:
    22                  if (checkConnected(ctx)) {
    23                      ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP,false,AT_MOST_ONCE, false, 0)));
    24                  }
    25                  break;
    26              case DISCONNECT:
    27                  if (checkConnected(ctx)) {
    28                      processDisconnect(ctx);
    29                  }
    30                  break;
    31              default:
    32                  break;
    33 34          }
    35      }
    36

    第3行,通过判断消息的固定头部是否为空,如果空;则通过processDisconnect(ctx)将设备连接关闭。

    processDisconnect(channelHandlerContext ctx)

     
    private void processDisconnect(ChannelHandlerContext ctx) {
             ctx.close();  // 关闭socket通道
         }
     ​

    第8行,通过判断固定头部的MQTT消息类型,针对不同消息做相应的处理。


     

    MqttTransportHandler#PublishDevicePublish

    以下是对发布消息进行相关的解读,更多消息类型的处理类,大家请参考我上面的IOT-Guide-MQTT进行阅读。

     
    private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
             try {
                 if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_TOPIC)) { //如果主题为v1/devices/me/attributes
                     JsonMqttAdaptor.convertToMsg(POST_TELEMETRY_REQUEST, mqttMsg);
                 } else if(topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
                     JsonMqttAdaptor.convertToMsg(POST_ATTRIBUTES_REQUEST, mqttMsg);
                 } else if(topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
                     JsonMqttAdaptor.convertToMsg(GET_ATTRIBUTES_REQUEST, mqttMsg);
                 }
             } catch (AdaptorException e) {
     ​
             }
     ​
         }
     ​

    我上面的代码仅是对消息的主题进行判断,然后对主题内的内容进行物模型的解析,得到相关属性或者遥测数据的获得。


     

    演示效果

    我们通过Paho或者MQTT.js和服务进行连接,发布消息到以下主题:

     v1/devices/me/telemetry
     

    简易的数据格式如下:

     {"key1":"value1", "key2":"value2"}

     

    Paho图示:

    服务器控制台打印数据:

     七月 24, 2019 1:37:18 下午 io.netty.handler.logging.LoggingHandler channelRegistered
     信息: [id: 0xf2bfb3a8] REGISTERED
     七月 24, 2019 1:37:18 下午 io.netty.handler.logging.LoggingHandler bind
     信息: [id: 0xf2bfb3a8] BIND: 0.0.0.0/0.0.0.0:1884
     七月 24, 2019 1:37:18 下午 io.netty.handler.logging.LoggingHandler channelActive
     信息: [id: 0xf2bfb3a8, L:/0:0:0:0:0:0:0:0:1884] ACTIVE
     七月 24, 2019 1:37:22 下午 io.netty.handler.logging.LoggingHandler channelRead
     信息: [id: 0xf2bfb3a8, L:/0:0:0:0:0:0:0:0:1884] RECEIVED: [id: 0xe08abd12, L:/127.0.0.1:1884 - R:/127.0.0.1:48816]
     key= 1563946708305
     属性名=temperature 属性值=38
     属性名=humidity 属性值=60

    如上所示,希望大家对Thingsboard的IOT架构-MQTT设备协议这块有所了解!

  • 相关阅读:
    关于git修改和查看用户名邮箱
    Spring深入理解(三)
    Spring深入理解(二)
    Spring深入理解(一)
    jeecg开源快速开发
    关于面向对象的三大特性和五大基本原则
    关于Excel导入导出POI工具类
    关于Hanoi算法
    LOJ 530 最小倍数(数论)
    BZOJ 4242 水壶(BFS建图+最小生成树+树上倍增)
  • 原文地址:https://www.cnblogs.com/sanshengshui/p/11237695.html
Copyright © 2011-2022 走看看