zoukankan      html  css  js  c++  java
  • MQTT的学习研究(二)moquette-mqtt 的使用之mqtt broker的启动

       在MQTT 官网 (http://mqtt.org/software)中有众多MQTT的实现方式。具体参看官网,Moquette是基于Apache Mina 的模型的一个Java MQTT broker。使用过Mina的同学发现其实broker的启动过程就是一个Mina应用的启动。

     在MQTT moquette 中采用MINA作为底层消息的传递方式 
     
    本类的目的启动MQTT moquette Broker 的方式,
    本文的源代码来自  moquette-broker-0.1-jar-with-dependencies.jar 中的server类
    如果想直接启动 moquette-broker-0.1-jar-with-dependencies.jar的jar文件方式
     可以执行一些命令实现 
            java -jar moquette-broker-0.1-jar-with-dependencies.jar
     
     
    google code 下载MQTT moquette Broker 地址:
        http://code.google.com/p/moquette-mqtt/
        
    GIT 下载MQTT moquette client 地址:
        https://github.com/fusesource/mqtt-client

    在应用程序中使用MQTT的应用:

    MQTT moquette 的broker服务启动代码如下:

    Java代码  收藏代码
    1. package com.etrip.mqtt;  
    2.   
    3. import java.io.File;  
    4. import java.io.IOException;  
    5. import java.net.InetAddress;  
    6. import java.net.InetSocketAddress;  
    7.   
    8. import org.apache.mina.core.service.IoAcceptor;  
    9. import org.apache.mina.core.service.IoServiceStatistics;  
    10. import org.apache.mina.core.session.IdleStatus;  
    11. import org.apache.mina.core.session.IoSession;  
    12. import org.apache.mina.filter.codec.ProtocolCodecFilter;  
    13. import org.apache.mina.filter.codec.demux.DemuxingProtocolDecoder;  
    14. import org.apache.mina.filter.codec.demux.DemuxingProtocolEncoder;  
    15. import org.apache.mina.transport.socket.nio.NioSocketAcceptor;  
    16. import org.dna.mqtt.moquette.messaging.spi.impl.SimpleMessaging;  
    17. import org.dna.mqtt.moquette.proto.ConnAckEncoder;  
    18. import org.dna.mqtt.moquette.proto.ConnectDecoder;  
    19. import org.dna.mqtt.moquette.proto.DisconnectDecoder;  
    20. import org.dna.mqtt.moquette.proto.DisconnectEncoder;  
    21. import org.dna.mqtt.moquette.proto.MQTTLoggingFilter;  
    22. import org.dna.mqtt.moquette.proto.PingReqDecoder;  
    23. import org.dna.mqtt.moquette.proto.PingRespEncoder;  
    24. import org.dna.mqtt.moquette.proto.PubAckDecoder;  
    25. import org.dna.mqtt.moquette.proto.PubAckEncoder;  
    26. import org.dna.mqtt.moquette.proto.PubCompDecoder;  
    27. import org.dna.mqtt.moquette.proto.PubCompEncoder;  
    28. import org.dna.mqtt.moquette.proto.PubCompMessage;  
    29. import org.dna.mqtt.moquette.proto.PubRecDecoder;  
    30. import org.dna.mqtt.moquette.proto.PubRecEncoder;  
    31. import org.dna.mqtt.moquette.proto.PubRelDecoder;  
    32. import org.dna.mqtt.moquette.proto.PubRelEncoder;  
    33. import org.dna.mqtt.moquette.proto.PublishDecoder;  
    34. import org.dna.mqtt.moquette.proto.PublishEncoder;  
    35. import org.dna.mqtt.moquette.proto.SubAckEncoder;  
    36. import org.dna.mqtt.moquette.proto.SubscribeDecoder;  
    37. import org.dna.mqtt.moquette.proto.UnsubAckEncoder;  
    38. import org.dna.mqtt.moquette.proto.UnsubscribeDecoder;  
    39. import org.dna.mqtt.moquette.proto.messages.ConnAckMessage;  
    40. import org.dna.mqtt.moquette.proto.messages.DisconnectMessage;  
    41. import org.dna.mqtt.moquette.proto.messages.PingRespMessage;  
    42. import org.dna.mqtt.moquette.proto.messages.PubAckMessage;  
    43. import org.dna.mqtt.moquette.proto.messages.PubRecMessage;  
    44. import org.dna.mqtt.moquette.proto.messages.PubRelMessage;  
    45. import org.dna.mqtt.moquette.proto.messages.PublishMessage;  
    46. import org.dna.mqtt.moquette.proto.messages.SubAckMessage;  
    47. import org.dna.mqtt.moquette.proto.messages.UnsubAckMessage;  
    48. import org.dna.mqtt.moquette.server.MQTTHandler;  
    49. import org.slf4j.Logger;  
    50. import org.slf4j.LoggerFactory;  
    51. /** 
    52.  *  
    53.  * 在MQTT moquette 中采用MINA作为底层消息的传递方式  
    54.  *  
    55.  * 本类的目的启动MQTT moquette Broker 的方式, 
    56.  *本文的源代码来自  moquette-broker-0.1-jar-with-dependencies.jar 中的server类 
    57.  * 如果想直接启动 moquette-broker-0.1-jar-with-dependencies.jar的jar文件方式 
    58.  * 可以执行一些命令实现  
    59.  *        java -jar moquette-broker-0.1-jar-with-dependencies.jar 
    60.  *  
    61.  *  
    62.  * google code 下载MQTT moquette Broker 地址: 
    63.  *    http://code.google.com/p/moquette-mqtt/ 
    64.  *     
    65.  * GIT 下载MQTT moquette client 地址: 
    66.  *  https://github.com/fusesource/mqtt-client  
    67.  *     
    68.  * @author longgangbai 
    69.  *  
    70.  *  
    71.  */  
    72. public class MQTTBrokerProxyServer {  
    73.       private static final Logger LOG = LoggerFactory.getLogger(MQTTBrokerProxyServer.class);  
    74.   
    75.       public static final String STORAGE_FILE_PATH = System.getProperty("user.home") + File.separator + "moquette_store.hawtdb";  
    76.       private IoAcceptor m_acceptor;  
    77.       SimpleMessaging messaging;  
    78.   
    79.       public static void main(String[] args)  
    80.         throws IOException  
    81.       {  
    82.         new MQTTBrokerProxyServer().startServer();  
    83.       }  
    84.   
    85.       protected void startServer() throws IOException  
    86.       {  
    87.         //编码协议类编码器  
    88.         DemuxingProtocolDecoder decoder = new DemuxingProtocolDecoder();  
    89.         decoder.addMessageDecoder(new ConnectDecoder());//连接编码  
    90.         decoder.addMessageDecoder(new PublishDecoder());//发布编码  
    91.         decoder.addMessageDecoder(new PubAckDecoder());//发布回执编码  
    92.         decoder.addMessageDecoder(new PubRelDecoder());  
    93.         decoder.addMessageDecoder(new PubRecDecoder());//接收编码  
    94.         decoder.addMessageDecoder(new PubCompDecoder());  
    95.         decoder.addMessageDecoder(new SubscribeDecoder());//订阅编码  
    96.         decoder.addMessageDecoder(new UnsubscribeDecoder());//取消订阅编码  
    97.         decoder.addMessageDecoder(new DisconnectDecoder());//断开连接编码  
    98.         decoder.addMessageDecoder(new PingReqDecoder());//心跳ping请求编码  
    99.           
    100.         //解码协议类解码器  
    101.         DemuxingProtocolEncoder encoder = new DemuxingProtocolEncoder();  
    102.   
    103.         encoder.addMessageEncoder(ConnAckMessage.class, new ConnAckEncoder());//连接解码  
    104.         encoder.addMessageEncoder(SubAckMessage.class, new SubAckEncoder());//订阅通知解码  
    105.         encoder.addMessageEncoder(UnsubAckMessage.class, new UnsubAckEncoder());//取消订阅解码  
    106.         encoder.addMessageEncoder(PubAckMessage.class, new PubAckEncoder());//发布回执解码  
    107.         encoder.addMessageEncoder(PubRecMessage.class, new PubRecEncoder());//接收解码  
    108.         encoder.addMessageEncoder(PubCompMessage.class, new PubCompEncoder());  
    109.         encoder.addMessageEncoder(PubRelMessage.class, new PubRelEncoder());  
    110.         encoder.addMessageEncoder(PublishMessage.class, new PublishEncoder());//发布解码  
    111.         encoder.addMessageEncoder(PingRespMessage.class, new PingRespEncoder());//心跳ping相应解码  
    112.         encoder.addMessageEncoder(DisconnectMessage.class,new DisconnectEncoder());//断开连接解码  
    113.           
    114.         this.m_acceptor = new NioSocketAcceptor();  
    115.         //设置日志的过滤链  
    116.         this.m_acceptor.getFilterChain().addLast("logger", new MQTTLoggingFilter("SERVER LOG"));  
    117.         //设置编码的过滤链  
    118.         this.m_acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(encoder, decoder));  
    119.         //创建业务处理器类  
    120.         MQTTHandler handler = new MQTTHandler();  
    121.         //创建一个处理消息体的消息  
    122.         this.messaging = SimpleMessaging.getInstance();  
    123.         this.messaging.init();  
    124.         //设置消息体  
    125.         handler.setMessaging(this.messaging);  
    126.         //设置业务处理器类  
    127.         this.m_acceptor.setHandler(handler);  
    128.           
    129.         ((NioSocketAcceptor)this.m_acceptor).setReuseAddress(true);  
    130.         ((NioSocketAcceptor)this.m_acceptor).getSessionConfig().setReuseAddress(true);  
    131.         this.m_acceptor.getSessionConfig().setReadBufferSize(2048);  
    132.         this.m_acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);  
    133.         this.m_acceptor.getStatistics().setThroughputCalculationInterval(10);  
    134.         this.m_acceptor.getStatistics().updateThroughput(System.currentTimeMillis());  
    135.         //设置端口号  
    136.         this.m_acceptor.bind(new InetSocketAddress(1883));  
    137.         //获取绑定的本地的ip地址   
    138.         LOG.info("Server binded"+InetAddress.getLocalHost().getHostAddress());  
    139.         try {  
    140.             Thread.sleep(100000000000000L);  
    141.         } catch (InterruptedException e) {  
    142.             // TODO Auto-generated catch block  
    143.             e.printStackTrace();  
    144.         }  
    145.         //销毁broker对象的各种信息  
    146.         Runtime.getRuntime().addShutdownHook(new Thread()  
    147.         {  
    148.           public void run() {  
    149.               MQTTBrokerProxyServer.this.stopServer();  
    150.           }  
    151.         });  
    152.       }  
    153.   
    154.       protected void stopServer() {  
    155.         LOG.info("Server stopping...");  
    156.           
    157.         this.messaging.stop();  
    158.         //Mina  IO 统计类  
    159.         IoServiceStatistics statistics = this.m_acceptor.getStatistics();  
    160.         statistics.updateThroughput(System.currentTimeMillis());  
    161.         System.out.println(String.format("Total read bytes: %d, read throughtput: %f (b/s)", new Object[] { Long.valueOf(statistics.getReadBytes()), Double.valueOf(statistics.getReadBytesThroughput()) }));  
    162.         System.out.println(String.format("Total read msgs: %d, read msg throughtput: %f (msg/s)", new Object[] { Long.valueOf(statistics.getReadMessages()), Double.valueOf(statistics.getReadMessagesThroughput()) }));  
    163.         //关闭相关的会话  
    164.         for (IoSession session : this.m_acceptor.getManagedSessions().values()) {  
    165.           if ((session.isConnected()) && (!session.isClosing())) {  
    166.             session.close(false);  
    167.           }  
    168.         }  
    169.         //销毁本地IoAcceptor对象  
    170.         this.m_acceptor.unbind();  
    171.         this.m_acceptor.dispose();  
    172.         LOG.info("Server stopped");  
    173.       }  
    174.     }  

          由 以上代码可以看出,在发布订阅,心跳检测,连接断开,连接时候都需要创建相关的协议编码器对象类中添加相关的编码器对象。

            MQTTHandler类为主要broker处理发布和订阅消息的业务处理器类。

            IoServiceStatistics类信息统计类。主要统计在mina应用中读写信息的统计。

    上面代码主要讲解MQTT moquette的启动下面主要讲述服务段发布消息和客户端订阅接收信息的实现。

  • 相关阅读:
    从dotNet到VB6之模仿构造OleDbDataAdapter与dataset结合
    编程的偷懒艺术与美感
    给您参考,现在开发数据库项目用.net 2005成熟吗?还是用.net2003比较有保证
    access数据库版权及容量问题
    中天股票数据格式
    受伤与药油的最佳搭配
    关于发布各种股票软件数据格式
    .net 与flash8传递(互传)数组的技巧
    我的C语言合集
    ZOJ 1205 Martian Addition 解题报告
  • 原文地址:https://www.cnblogs.com/yudar/p/4613721.html
Copyright © 2011-2022 走看看