zoukankan      html  css  js  c++  java
  • MQTT协议之moquette 安装使用

    在MQTT 官网 (http://mqtt.org/software)中有众多MQTT的实现方式。具体参看官网,Moquette是基于netty(老版本使用的是mina) 的模型的一个Java MQTT broker,支持websocket,SSL。
    如果想直接启动 moquette-broker-0.4-jar-with-dependencies.jar的jar文件方式
     可以执行一些命令实现 
            java -jar moquette-broker-0.4-jar-with-dependencies.jar
    google code 下载MQTT moquette Broker 地址:

        https://code.google.com/p/moquette-mqtt/

       项目已迁移到github:https://github.com/andsel/moquette,有人对该项目进行改造,可以选择使用mina或netty)https://github.com/milliondreams/moquette-mqtt

        
    GIT 下载MQTT moquette client 地址:
        https://github.com/fusesource/mqtt-client
     
    在应用程序中使用MQTT的应用:

    MQTT moquette 的broker服务启动代码(启动类org.dna.mqtt.moquette.server.Server)如下:

    [java] view plain copy
     print?
    1. /* 
    2.  * Copyright (c) 2012-2014 The original author or authors 
    3.  * ------------------------------------------------------ 
    4.  * All rights reserved. This program and the accompanying materials 
    5.  * are made available under the terms of the Eclipse Public License v1.0 
    6.  * and Apache License v2.0 which accompanies this distribution. 
    7.  * 
    8.  * The Eclipse Public License is available at 
    9.  * http://www.eclipse.org/legal/epl-v10.html 
    10.  * 
    11.  * The Apache License v2.0 is available at 
    12.  * http://www.opensource.org/licenses/apache2.0.php 
    13.  * 
    14.  * You may elect to redistribute this code under either of these licenses. 
    15.  */  
    16. package org.dna.mqtt.moquette.server;  
    17.   
    18. import java.io.File;  
    19. import java.io.IOException;  
    20. import java.text.ParseException;  
    21. import java.util.Properties;  
    22. import org.dna.mqtt.moquette.messaging.spi.impl.SimpleMessaging;  
    23. import org.dna.mqtt.moquette.server.netty.NettyAcceptor;  
    24. import org.slf4j.Logger;  
    25. import org.slf4j.LoggerFactory;  
    26. /** 
    27.  * Launch a  configured version of the server. 
    28.  * @author andrea 
    29.  */  
    30. public class Server {  
    31.       
    32.     private static final Logger LOG = LoggerFactory.getLogger(Server.class);  
    33.       
    34.     //数据持久化数据目录,使用mapdb  
    35.     /*MapDB是一个快速、易用的嵌入式Java数据库引擎,它提供了基于磁盘或者堆外(off- heap允许Java直接操作内存空间,  
    36.      * 类似于C的malloc和free)存储的并发的Maps、Sets、Queues。MapDB的前身是JDBM,已经有15年的历史。 
    37.      * MapDB支持 ACID事务、MVCC隔离,它的jar包只有200KB,且无其它依赖,非常轻量。 
    38.      * 相对来说功能已经稳定,并有全职 的开发者支持开发。*/  
    39.       
    40.     public static final String STORAGE_FILE_PATH = System.getProperty("user.home") +  
    41.             File.separator + "moquette_store.mapdb";  
    42.   
    43.     private ServerAcceptor m_acceptor;  
    44.     SimpleMessaging messaging;  
    45.       
    46.     public static void main(String[] args) throws IOException {  
    47.         final Server server = new Server();  
    48.         server.startServer();  
    49.         System.out.println("Server started, version 0.7-SNAPSHOT");  
    50.         //进程关闭前,释放资源  
    51.         Runtime.getRuntime().addShutdownHook(new Thread() {  
    52.             @Override  
    53.             public void run() {  
    54.                 server.stopServer();  
    55.             }  
    56.         });  
    57.     }  
    58.       
    59.     /** 
    60.      * Starts Moquette bringing the configuration from the file  
    61.      * located at config/moquette.conf 
    62.      */  
    63.     public void startServer() throws IOException {  
    64.         String configPath = System.getProperty("moquette.path"null);  
    65.         startServer(new File(configPath, "config/moquette.conf"));  
    66.     }  
    67.   
    68.     /** 
    69.      * Starts Moquette bringing the configuration from the given file 
    70.      */  
    71.     public void startServer(File configFile) throws IOException {  
    72.         ConfigurationParser confParser = new ConfigurationParser();  
    73.         try {  
    74.             confParser.parse(configFile);  
    75.         } catch (ParseException pex) {  
    76.             LOG.warn("An error occurred in parsing configuration, fallback on default configuration", pex);  
    77.         }  
    78.         Properties configProps = confParser.getProperties();  
    79.         startServer(configProps);  
    80.     }  
    81.       
    82.     /** 
    83.      * Starts the server with the given properties. 
    84.      *  
    85.      * Its need at least the following properties: 
    86.      * <ul> 
    87.      *  <li>port</li> 
    88.      *  <li>password_file</li> 
    89.      * </ul> 
    90.      */  
    91.     public void startServer(Properties configProps) throws IOException {  
    92.         messaging = SimpleMessaging.getInstance();  
    93.         messaging.init(configProps);  
    94.           
    95.         m_acceptor = new NettyAcceptor();  
    96.         m_acceptor.initialize(messaging, configProps);  
    97.     }  
    98.       
    99.     public void stopServer() {  
    100.         System.out.println("Server stopping...");  
    101.         messaging.stop();  
    102.         m_acceptor.close();  
    103.         System.out.println("Server stopped");  
    104.     }  
    105. }  

    下载moquette-mqtt源码,导入eclipse中,运行启动类。默认端口:1883

    配置说明:config/moquette.conf

    [plain] view plain copy
     print?
    1. ##############################################  
    2. #  Moquette configuration file.   
    3. #  
    4. #  The synthax is equals to mosquitto.conf  
    5. #   
    6. ##############################################  
    7.   
    8. #启动服务端口  
    9. port 1883  
    10. #websocket 端口  
    11. websocket_port 8080  
    12. #启动主机的IP  
    13. host 0.0.0.0  
    14.   
    15. #密码文件  
    16. password_file password_file.conf  
    17.   
    18. ##支持SSL  
    19. #ssl_port 8883  
    20. #jks_path serverkeystore.jks  
    21. #key_store_password passw0rdsrv  
    22. #key_manager_password passw0rdsrv  
    密码文件password_file.conf,用户名密码采用冒号分割":"。

    该项目使用distribution打包成产品包。下面使用mqtt-client采用阻塞式实现消息的发布并接收。

    发送消息:

    [java] view plain copy
     print?
    1. package cn.smartslim.mqtt.demo.fusesource;  
    2.   
    3. import java.net.URISyntaxException;  
    4.   
    5. import org.fusesource.mqtt.client.BlockingConnection;  
    6. import org.fusesource.mqtt.client.MQTT;  
    7. import org.fusesource.mqtt.client.QoS;  
    8. /** 
    9.  * MQTT moquette 的Server  段用于发布主题,并发布主题信息 
    10.  * 采用阻塞式 发布主题  
    11.  */  
    12. public class MQTTServer {  
    13.         private final static String CONNECTION_STRING = "tcp://192.168.36.215:1883";  
    14.         private final static boolean CLEAN_START = true;  
    15.         private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s  
    16.           
    17.         public final  static long RECONNECTION_ATTEMPT_MAX=6;  
    18.         public final  static long RECONNECTION_DELAY=2000;  
    19.           
    20.         public final static int SEND_BUFFER_SIZE=2*1024*1024;//发送最大缓冲为2M  
    21.           
    22.         public static void main(String[] args)   {  
    23.             MQTT mqtt = new MQTT();  
    24.             try {  
    25.                 //设置服务端的ip  
    26.                 mqtt.setHost(CONNECTION_STRING);  
    27.                 //连接前清空会话信息  
    28.                 mqtt.setCleanSession(CLEAN_START);  
    29.                 //设置重新连接的次数  
    30.                 mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);  
    31.                 //设置重连的间隔时间  
    32.                 mqtt.setReconnectDelay(RECONNECTION_DELAY);  
    33.                 //设置心跳时间  
    34.                 mqtt.setKeepAlive(KEEP_ALIVE);  
    35.                 //设置缓冲的大小  
    36.                 mqtt.setSendBufferSize(SEND_BUFFER_SIZE);  
    37.       
    38.                 //创建连接 ,使用阻塞式  
    39.                 BlockingConnection connection = mqtt.blockingConnection();  
    40.                 //开始连接  
    41.                 connection.connect();  
    42.                 try {  
    43.                     int count=0;  
    44.                     while(true){  
    45.                         count++;  
    46.                         //订阅的主题  
    47.                         String topic="mqtt/test";  
    48.                         //主题的内容  
    49.                         String message="hello "+count+" mqtt!";  
    50.                         connection.publish(topic, message.getBytes(), QoS.AT_LEAST_ONCE, false);  
    51.                         System.out.println("MQTTServer Message  Topic="+topic+"  Content :"+message);  
    52.                         Thread.sleep(2000);  
    53.                     }  
    54.                 } catch (InterruptedException e) {  
    55.                     e.printStackTrace();  
    56.                 }  
    57.             } catch (URISyntaxException e) {  
    58.                 e.printStackTrace();  
    59.             } catch (Exception e) {  
    60.                 e.printStackTrace();  
    61.             }  
    62.         }  
    63. }  
    接收消息:

    [java] view plain copy
     print?
    1. package cn.smartslim.mqtt.demo.fusesource;  
    2.   
    3. import java.net.URISyntaxException;  
    4.   
    5. import org.fusesource.mqtt.client.BlockingConnection;  
    6. import org.fusesource.mqtt.client.MQTT;  
    7. import org.fusesource.mqtt.client.Message;  
    8. import org.fusesource.mqtt.client.QoS;  
    9. import org.fusesource.mqtt.client.Topic;  
    10. /** 
    11.  * MQTT moquette 的Client 段用于订阅主题,并接收主题信息 
    12.  * 采用阻塞式 订阅主题  
    13.  */  
    14. public class MQTTClient {  
    15.         private final static String CONNECTION_STRING = "tcp://192.168.36.215:1883";  
    16.         private final static boolean CLEAN_START = true;  
    17.         private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s  
    18.         public  static Topic[] topics = {  
    19.                         new Topic("china/beijing", QoS.EXACTLY_ONCE)};  
    20.           
    21.         public final  static long RECONNECTION_ATTEMPT_MAX=6;  
    22.         public final  static long RECONNECTION_DELAY=2000;  
    23.           
    24.         public final static int SEND_BUFFER_SIZE=2*1024*1024;//发送最大缓冲为2M  
    25.           
    26.           
    27.       public static void main(String[] args)   {  
    28.         //创建MQTT对象  
    29.         MQTT mqtt = new MQTT();  
    30.         BlockingConnection connection=null;  
    31.         try {  
    32.             //设置mqtt broker的ip和端口  
    33.             mqtt.setHost(CONNECTION_STRING);  
    34.             //连接前清空会话信息  
    35.             mqtt.setCleanSession(CLEAN_START);  
    36.             //设置重新连接的次数  
    37.             mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);  
    38.             //设置重连的间隔时间  
    39.             mqtt.setReconnectDelay(RECONNECTION_DELAY);  
    40.             //设置心跳时间  
    41.             mqtt.setKeepAlive(KEEP_ALIVE);  
    42.             //设置缓冲的大小  
    43.             mqtt.setSendBufferSize(SEND_BUFFER_SIZE);  
    44.               
    45.             //获取mqtt的连接对象BlockingConnection  
    46.             connection = mqtt.blockingConnection();  
    47.             //MQTT连接的创建   
    48.             connection.connect();  
    49.             //创建相关的MQTT 的主题列表   
    50.             Topic[] topics = {new Topic("mqtt/test", QoS.AT_LEAST_ONCE)};  
    51.             //订阅相关的主题信息   
    52.             byte[] qoses = connection.subscribe(topics);  
    53.             //  
    54.             while(true){  
    55.                 //接收订阅的消息内容  
    56.                 Message message = connection.receive();  
    57.                 //获取订阅的消息内容   
    58.                 byte[] payload = message.getPayload();  
    59.                 // process the message then:  
    60.                 System.out.println("MQTTClient Message  Topic="+message.getTopic()+" Content :"+new String(payload));  
    61.                 //签收消息的回执  
    62.                 message.ack();  
    63.                 Thread.sleep(2000);  
    64.             }  
    65.         } catch (URISyntaxException e) {  
    66.             e.printStackTrace();  
    67.         } catch (Exception e) {  
    68.             e.printStackTrace();  
    69.         }finally{  
    70.             try {  
    71.                 connection.disconnect();  
    72.             } catch (Exception e) {  
    73.                 e.printStackTrace();  
    74.             }  
    75.         }  
    76.     }  
    77. }  

  • 相关阅读:
    Expedition---POJ
    LIS的优化算法O(n log n)
    Super Jumping! Jumping! Jumping! ---HDU
    数据库连接判断
    android stuido控件
    sql查询语句
    c# 字符串操作
    windows操作
    C# sql操作
    datagridview
  • 原文地址:https://www.cnblogs.com/hzcya1995/p/13317665.html
Copyright © 2011-2022 走看看