zoukankan      html  css  js  c++  java
  • MQTT——编写连接报文

    笔者在上一章对连接报文进行了相关的讲解。这一章笔者想写一个连接报文的例子来加深理解。本来这一章也应该在上一章出现的。可是笔者怕太长了。不好方便阅。所以决定分俩章来。正如笔者上一章所讲的。笔者会用Netty通信框架进行编写。主要因为Netty已经为我们集成了相关MQTT功能。

    开发环境

    开发工具:intellij idea.(以前我一直在eclipse。最近新版的老报错。所以就放弃了)

    Netty包:netty-all-4.1.16.Final.jar。下载网站:http://netty.io/downloads.html

    JDK:JAVA 8

    第三包:commons-lang3-3.6.jar。下载网站:http://commons.apache.org/proper/commons-lang/download_lang.cgi

    MQTT编写

    在这里笔者并不打包把客户端的代码一起编写出。事实上关于客户端的开源的代码是非常多的。笔者这里只会略微的编写一下服务端的代码。当然这里代码只是为方更了解MQTT协议。并非企业级的编蜜枣这一点希望读者见谅。为了实现连接报文。笔者定义了三个类。

    Main类:用于启动服务。

    BrokerHandler类:处理接受来的信息。

    BrokerSessionHelper:用于发送信息给客户。

    Main类的源码

     1 public static void main(String[] args) throws Exception  {
     2         EventLoopGroup bossGroup = new NioEventLoopGroup();
     3         EventLoopGroup workerGroup = new NioEventLoopGroup();
     4 
     5         Runtime.getRuntime().addShutdownHook(new Thread() {
     6             public void run() {
     7                 workerGroup.shutdownGracefully();
     8                 bossGroup.shutdownGracefully();
     9             }
    10         });
    11 
    12 
    13         ServerBootstrap b = new ServerBootstrap();
    14         b.group(bossGroup, workerGroup)
    15                 .channel(NioServerSocketChannel.class)
    16                 .handler(new LoggingHandler(LogLevel.INFO))
    17                 .childHandler(new ChannelInitializer<SocketChannel>() {
    18                     @Override
    19                     public void initChannel(SocketChannel ch) throws Exception {
    20 
    21                         ChannelPipeline p = ch.pipeline();
    22 
    23                         p.addFirst("idleHandler", new IdleStateHandler(0, 0, 120));
    24                         p.addLast("encoder", MqttEncoder.INSTANCE);
    25                         p.addLast("decoder", new MqttDecoder());
    26                         p.addLast("logicHandler", new BrokerHandler(65535));
    27 
    28                     }
    29                 })
    30                 .option(ChannelOption.SO_BACKLOG, 511)
    31                 .childOption(ChannelOption.SO_KEEPALIVE, true);
    32 
    33         ChannelFuture f = b.bind("0.0.0.0", 1883).sync();
    34 
    35         f.channel().closeFuture().sync();
    36     }

    上面的1、2俩行表是Netty里面俩个线程组。事实上也就是Reactor线程组。bossGroup 用于处理接受来自客户端的连接。workerGroup 用于处理接受客户端的读取信息。13行的ServerBootstrap可以理解为启动服务的一个引导类。主要关键是他的group方法。这样子就可以把俩个线程组关系在一起了。重点就在17行这里。childHandler用于处理IO事件。比如读取客户端进行。然后自己编码。你们可以看到24行的MqttEncoder.INSTANCE和25行的MqttDecoder吧。他们就是用于处理MQTT协议传来的信息进行处理。而26行BrokerHandler类就是笔者来处理每一个报文对应的响应。笔者就不在过多的说了。如果你们不懂的话,可以去看一下Netty框架的知识在过看的话会比较好。

    BrokerHandler类的源码

      1 public class BrokerHandler extends SimpleChannelInboundHandler<MqttMessage> {
      2     private MqttVersion version;
      3     private String clientId;
      4     private String userName;
      5     private String brokerId;
      6     private boolean connected;
      7     private boolean cleanSession;
      8     private int keepAlive;
      9     private int keepAliveMax;
     10     private MqttPublishMessage willMessage;
     11 
     12     public BrokerHandler(int keepAliveMax) {
     13 
     14         this.keepAliveMax = keepAliveMax;
     15     }
     16 
     17     @Override
     18     @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
     19     protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
     20 
     21         if (msg.decoderResult().isFailure()) {
     22 
     23             Throwable cause = msg.decoderResult().cause();
     24 
     25             if (cause instanceof MqttUnacceptableProtocolVersionException) {
     26 
     27                 BrokerSessionHelper.sendMessage(
     28                         ctx,
     29                         MqttMessageFactory.newMessage(
     30                                 new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
     31                                 new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, false),
     32                                 null),
     33                         "INVALID",
     34                         null,
     35                         true);
     36 
     37             } else if (cause instanceof MqttIdentifierRejectedException) {
     38 
     39                 BrokerSessionHelper.sendMessage(
     40                         ctx,
     41                         MqttMessageFactory.newMessage(
     42                                 new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
     43                                 new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false),
     44                                 null),
     45                         "INVALID",
     46                         null,
     47                         true);
     48             }
     49 
     50             ctx.close();
     51 
     52             return;
     53         }
     54 
     55         switch (msg.fixedHeader().messageType()) {
     56             case CONNECT:
     57                 onConnect(ctx, (MqttConnectMessage) msg);
     58                 break;
     59             case PUBLISH:
     60                 onPublish(ctx, (MqttPublishMessage) msg);
     61                 break;
     62             case PUBACK:
     63                 onPubAck(ctx, msg);
     64                 break;
     65             case PUBREC:
     66                 onPubRec(ctx, msg);
     67                 break;
     68             case PUBREL:
     69                 onPubRel(ctx, msg);
     70                 break;
     71             case PUBCOMP:
     72                 onPubComp(ctx, msg);
     73                 break;
     74             case SUBSCRIBE:
     75                 onSubscribe(ctx, (MqttSubscribeMessage) msg);
     76                 break;
     77             case UNSUBSCRIBE:
     78                 onUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
     79                 break;
     80             case PINGREQ:
     81                 onPingReq(ctx);
     82                 break;
     83             case DISCONNECT:
     84                 onDisconnect(ctx);
     85                 break;
     86         }
     87 
     88     }
     89 
     90     private void onConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
     91 
     92         this.version = MqttVersion.fromProtocolNameAndLevel(msg.variableHeader().name(), (byte) msg.variableHeader().version());
     93         this.clientId = msg.payload().clientIdentifier();
     94         this.cleanSession = msg.variableHeader().isCleanSession();
     95 
     96         if (msg.variableHeader().keepAliveTimeSeconds() > 0 && msg.variableHeader().keepAliveTimeSeconds() <= this.keepAliveMax) {
     97             this.keepAlive = msg.variableHeader().keepAliveTimeSeconds();
     98         }
     99 
    100         //MQTT 3.1之后可能存在为空的客户ID。所以要进行处理。如果客户ID是空,而且还在保存处理相关的信息。这样子是不行。
    101         //必须有客户ID我们才能存保相关信息。
    102         if (StringUtils.isBlank(this.clientId)) {
    103             if (!this.cleanSession) {
    104 
    105                 BrokerSessionHelper.sendMessage(
    106                         ctx,
    107                         MqttMessageFactory.newMessage(
    108                                 new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
    109                                 new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false),
    110                                 null),
    111                         "INVALID",
    112                         null,
    113                         true);
    114 
    115                 ctx.close();
    116 
    117                 return;
    118 
    119             } else {
    120                 this.clientId =  java.util.UUID.randomUUID().toString();
    121             }
    122         }
    123 
    124         //有可能发送俩次的连接包。如果已经存在连接就是关闭当前的连接。
    125         if (this.connected) {
    126             ctx.close();
    127             return;
    128         }
    129 
    130 
    131         boolean userNameFlag = msg.variableHeader().hasUserName();
    132         boolean passwordFlag = msg.variableHeader().hasPassword();
    133         this.userName = msg.payload().userName();
    134 
    135         String password = "" ;
    136         if( msg.payload().passwordInBytes() != null  && msg.payload().passwordInBytes().length > 0)
    137             password =   new String(msg.payload().passwordInBytes());
    138 
    139         boolean mistake = false;
    140 
    141         //如果有用户名标示,那么就必须有密码标示。
    142         //当有用户名标的时候,用户不能为空。
    143         //当有密码标示的时候,密码不能为空。
    144         if (userNameFlag) {
    145             if (StringUtils.isBlank(this.userName))
    146                 mistake = true;
    147         } else {
    148             if (StringUtils.isNotBlank(this.userName) || passwordFlag) mistake = true;
    149         }
    150 
    151 
    152         if (passwordFlag) {
    153 
    154             if (StringUtils.isBlank(password)) mistake = true;
    155         } else {
    156             if (StringUtils.isNotBlank(password)) mistake = true;
    157         }
    158 
    159         if (mistake) {
    160             BrokerSessionHelper.sendMessage(
    161                     ctx,
    162                     MqttMessageFactory.newMessage(
    163                             new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
    164                             new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false),
    165                             null),
    166                     this.clientId,
    167                     null,
    168                     true);
    169             ctx.close();
    170             return;
    171         }
    172 
    173         BrokerSessionHelper.sendMessage(
    174                 ctx,
    175                 MqttMessageFactory.newMessage(
    176                         new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
    177                         new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, !this.cleanSession),
    178                         null),
    179                 this.clientId,
    180                 null,
    181                 true);
    182 
    183         ChannelHandlerContext lastSession = BrokerSessionHelper.removeSession(this.clientId);
    184         if (lastSession != null) {
    185             lastSession.close();
    186         }
    187 
    188         String willTopic = msg.payload().willTopic();
    189         String willMessage = "";
    190         if(msg.payload().willMessageInBytes() != null && msg.payload().willMessageInBytes().length > 0)
    191             willMessage =  new String(msg.payload().willMessageInBytes());
    192 
    193         if (msg.variableHeader().isWillFlag() && StringUtils.isNotEmpty(willTopic) && StringUtils.isNotEmpty(willMessage)) {
    194 
    195             this.willMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
    196                     new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.valueOf(msg.variableHeader().willQos()), msg.variableHeader().isWillRetain(), 0),
    197                     new MqttPublishVariableHeader(willTopic, 0),
    198                     Unpooled.wrappedBuffer(willMessage.getBytes())
    199             );
    200         }
    201 
    202         this.connected = true;
    203         BrokerSessionHelper.saveSession(this.clientId, ctx);
    204     }
    205 
    206     private void onSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) {
    207     }
    208     
    209     private void onUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage msg) {
    210     }
    211 
    212     private void onPingReq(ChannelHandlerContext ctx) {
    213     }
    214 
    215     private void onDisconnect(ChannelHandlerContext ctx) {
    216 
    217         if (!this.connected) {
    218             ctx.close();
    219             return;
    220         }
    221 
    222         BrokerSessionHelper.removeSession(this.clientId, ctx);
    223 
    224         this.willMessage = null;
    225 
    226         this.connected = false;
    227 
    228         ctx.close();
    229 
    230     }
    231 
    232     private void onPubComp(ChannelHandlerContext ctx, MqttMessage msg) {
    233 
    234     }
    235     
    236     private void onPubRel(ChannelHandlerContext ctx, MqttMessage msg) {
    237     }
    238 
    239     private void onPubRec(ChannelHandlerContext ctx, MqttMessage msg) {
    240     }
    241     
    242     private void onPubAck(ChannelHandlerContext ctx, MqttMessage msg) {
    243     }
    244 
    245     private void onPublish(ChannelHandlerContext ctx, MqttPublishMessage msg) {
    246     }
    247 }
    BrokerHandler类

     19 行中的channelRead0方法中有俩个参数。一个为ChannelHandlerContext(通首的上下文)。一个是MqttMessage(客户端来的MQTT报文)。我们接下来动作都是跟MqttMessage来做相关的逻辑处理。这一点从55行就可以看出来。我们可以判断他是什么类型的报文。笔者这里只实现连接报文的处理。21行的代码msg.decoderResult().isFailure()是用来判断传过来的报文是不是正确的。事实上是Netty框架帮我们做了第一层的验证。23行就是获得发生的异常。

    从第99行onConnect方法开始就是处理连接报文的处理。笔者这里只做下面相关的处理。

    1.验证保持连接(Keep Alive)的有效性。代码如下

    1      if (msg.variableHeader().keepAliveTimeSeconds() > 0 && msg.variableHeader().keepAliveTimeSeconds() <= this.keepAliveMax) {
    2             this.keepAlive = msg.variableHeader().keepAliveTimeSeconds();
    3         }

    2.验证客户ID为空的时候,还要求保存会话状。这是不合理的。因为我的会话状态是跟根客户ID来保存。否则的话,随更给一个。反正后面还是清除会话状态。那么为什么会有空的呢?主要是在MQTT 3.1.1里面指出客户ID可以为空了。

     1  if (StringUtils.isBlank(this.clientId)) {
     2             if (!this.cleanSession) {
     3 
     4                 BrokerSessionHelper.sendMessage(
     5                         ctx,
     6                         MqttMessageFactory.newMessage(
     7                                 new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
     8                                 new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false),
     9                                 null),
    10                         "INVALID",
    11                         null,
    12                         true);
    13 
    14                 ctx.close();
    15 
    16                 return;
    17 
    18             } else {
    19                 this.clientId =  java.util.UUID.randomUUID().toString();
    20             }
    21         }

    3.判断是否是第二次连接报文。如果是的话,就要断开了。

    1  if (this.connected) {
    2             ctx.close();
    3             return;
    4         }

    4.判断用户和密码是否合法性。比如上一章出讲到的只有在用户名标志为1的时候,密码才可以出现。

       boolean userNameFlag = msg.variableHeader().hasUserName();
            boolean passwordFlag = msg.variableHeader().hasPassword();
            this.userName = msg.payload().userName();
    
            String password = "" ;
            if( msg.payload().passwordInBytes() != null  && msg.payload().passwordInBytes().length > 0)
                password =   new String(msg.payload().passwordInBytes());
    
            boolean mistake = false;
    
            //如果有用户名标示,那么就必须有密码标示。
            //当有用户名标的时候,用户不能为空。
            //当有密码标示的时候,密码不能为空。
            if (userNameFlag) {
                if (StringUtils.isBlank(this.userName))
                    mistake = true;
            } else {
                if (StringUtils.isNotBlank(this.userName) || passwordFlag) mistake = true;
            }
    
    
            if (passwordFlag) {
    
                if (StringUtils.isBlank(password)) mistake = true;
            } else {
                if (StringUtils.isNotBlank(password)) mistake = true;
            }
    
            if (mistake) {
                BrokerSessionHelper.sendMessage(
                        ctx,
                        MqttMessageFactory.newMessage(
                                new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
                                new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false),
                                null),
                        this.clientId,
                        null,
                        true);
                ctx.close();
                return;
            }

    6.接受客户端了。事实上笔者还有很多没有做的事情。比如保存会状态的处理。因为主要是为学习所以就没有讲出来。在加上会话状态存保就要思考保存在哪里。同时还有一个就是用户的合法性验证没有处理。

    1   BrokerSessionHelper.sendMessage(
    2                 ctx,
    3                 MqttMessageFactory.newMessage(
    4                         new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
    5                         new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, !this.cleanSession),
    6                         null),
    7                 this.clientId,
    8                 null,
    9                 true);

    7.处理当前报文的遗嘱。

     1    String willTopic = msg.payload().willTopic();
     2         String willMessage = "";
     3         if(msg.payload().willMessageInBytes() != null && msg.payload().willMessageInBytes().length > 0)
     4             willMessage =  new String(msg.payload().willMessageInBytes());
     5 
     6         if (msg.variableHeader().isWillFlag() && StringUtils.isNotEmpty(willTopic) && StringUtils.isNotEmpty(willMessage)) {
     7 
     8             this.willMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
     9                     new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.valueOf(msg.variableHeader().willQos()), msg.variableHeader().isWillRetain(), 0),
    10                     new MqttPublishVariableHeader(willTopic, 0),
    11                     Unpooled.wrappedBuffer(willMessage.getBytes())
    12             );
    13         }

     如果你看到这个类的最后代码的时候,会发现笔者也写了相关的ACNNACK响应。他的内容比较简单。大家看代码吧。

     1 private void onDisconnect(ChannelHandlerContext ctx) {
     2 
     3         if (!this.connected) {
     4             ctx.close();
     5             return;
     6         }
     7 
     8         BrokerSessionHelper.removeSession(this.clientId, ctx);
     9 
    10         this.willMessage = null;
    11 
    12         this.connected = false;
    13 
    14         ctx.close();
    15 
    16     }

    BrokerSessionHelper类的源码

    public class BrokerSessionHelper {
    
        private static final Map<String, ChannelHandlerContext> sessionRepository = new ConcurrentHashMap<>();
    
        public static void saveSession(String clientId, ChannelHandlerContext session) {
            sessionRepository.put(clientId, session);
        }
    
    
        public static ChannelHandlerContext getSession(String clientId) {
    
            return sessionRepository.get(clientId);
        }
    
        public static ChannelHandlerContext removeSession(String clientId) {
    
            return sessionRepository.remove(clientId);
        }
    
        public  static boolean removeSession(String clientId, ChannelHandlerContext session) {
            return sessionRepository.remove(clientId, session);
        }
    
        /**
         * 发送信息
         *
         * @param msg
         * @param clientId
         * @param packetId
         * @param flush
         */
        public static void sendMessage(MqttMessage msg, String clientId, Integer packetId, boolean flush) {
            ChannelHandlerContext ctx = getSession(clientId);
            if (ctx == null) {
                String pid = packetId == null || packetId <= 0 ? "" : String.valueOf(packetId);
                return;
            }
            sendMessage(ctx, msg, clientId, packetId, flush);
        }
    
    
        /**
         * 发送信息
         *
         * @param ctx
         * @param msg
         * @param clientId
         * @param packetId
         * @param flush
         */
        public static void sendMessage(ChannelHandlerContext ctx, MqttMessage msg, String clientId, Integer packetId, boolean flush) {
            String pid = packetId == null || packetId <= 0 ? "" : String.valueOf(packetId);
            ChannelFuture future = flush ? ctx.writeAndFlush(msg) : ctx.write(msg);
            future.addListener(f -> {
                if (f.isSuccess()) {
    
                } else {
    
                }
            });
        }
    }
    View Code

    BrokerSessionHelper类就是用于存放当前服务器上相关通道信息。同时用于发送返回的相关报文。读者们可以进行看代码吧。

    这个时候就你们只按照以前面讲的去做。就可以抓到报文了。客户端的话。笔者只用前面说的MQTTLens来测试。

  • 相关阅读:
    python基础
    c# String ,String[] 和 List<String>之间的转换
    c#上位机与三菱PLC(FX3U)串口通讯
    Convert.ToInt32()和int.Parse()区别
    代码走查25条疑问
    前后端分离-django主机管理开发二
    前后端分离-django主机管理开发一
    django图书管理系统一
    力扣题目练习一
    ELK实战-kibana安装使用
  • 原文地址:https://www.cnblogs.com/hayasi/p/7782780.html
Copyright © 2011-2022 走看看