zoukankan      html  css  js  c++  java
  • MQTT的学习研究(十二) MQTT moquette 的 Future API 消息发布订阅的实现

     MQTT moquette 的Server发布主题

    Java代码  收藏代码
    package com.etrip.mqtt.future;

    import java.net.URISyntaxException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeUnit;

    import org.fusesource.mqtt.client.FutureConnection;
    import org.fusesource.mqtt.client.MQTT;
    import org.fusesource.mqtt.client.QoS;
    import org.fusesource.mqtt.client.Topic;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    /**
     * Publishes messages to an MQTT topic using the Future-style API of the
     * fusesource mqtt-client.
     *
     * <p>Despite its name this class is an MQTT <em>client</em> that plays the
     * publishing role; the broker itself is expected to be running at
     * {@link #CONNECTION_STRING}.
     *
     * @author longgangbai
     */
    public class MQTTFutureServer {
        private static final Logger LOG = LoggerFactory.getLogger(MQTTFutureServer.class);

        /** Broker address (scheme, host and port). */
        private final static String CONNECTION_STRING = "tcp://192.168.208.46:1883";
        /** Discard any previous session state when connecting. */
        private final static boolean CLEAN_START = true;
        /** Heartbeat of 30s: low network usage while still getting data in time. */
        private final static short KEEP_ALIVE = 30;
        /** Topics used by this demo; not referenced by the publisher itself. */
        public  static Topic[] topics = {
                new Topic("china/beijing", QoS.EXACTLY_ONCE),
                new Topic("china/tianjin", QoS.AT_LEAST_ONCE),
                new Topic("china/henan", QoS.AT_MOST_ONCE)};
        /** Value passed to {@code MQTT.setReconnectAttemptsMax}. */
        public final  static long RECONNECTION_ATTEMPT_MAX = 6;
        /** Value passed to {@code MQTT.setReconnectDelay}. */
        public final  static long RECONNECTION_DELAY = 2000;

        /** Send buffer size: 2 MB. */
        public final static int SEND_BUFFER_SIZE = 2 * 1024 * 1024;

        /** Upper bound on how long shutdown waits for the disconnect to finish. */
        private static final long DISCONNECT_TIMEOUT_SECONDS = 5;

        /**
         * Connects to the broker and publishes an endless stream of numbered
         * messages to {@code china/beijing}.
         *
         * @param args ignored
         */
        public static void main(String[] args) {
            FutureConnection connection = null;
            try {
                connection = createClient().futureConnection();
                // connect() is asynchronous; wait so that a failed connection is
                // reported here instead of messages silently piling up.
                connection.connect().await();
                publishForever(connection);
            } catch (URISyntaxException e) {
                LOG.error("Invalid broker URI: {}", CONNECTION_STRING, e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.warn("Publisher interrupted, shutting down", e);
            } catch (Exception e) {
                LOG.error("Publishing to {} failed", CONNECTION_STRING, e);
            } finally {
                disconnectQuietly(connection);
            }
        }

        /** Builds an {@link MQTT} client configured from the constants of this class. */
        private static MQTT createClient() throws URISyntaxException {
            MQTT mqtt = new MQTT();
            // Broker address
            mqtt.setHost(CONNECTION_STRING);
            // Clear session information before connecting
            mqtt.setCleanSession(CLEAN_START);
            // Maximum number of reconnect attempts
            mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);
            // Delay between reconnect attempts
            mqtt.setReconnectDelay(RECONNECTION_DELAY);
            // Heartbeat interval
            mqtt.setKeepAlive(KEEP_ALIVE);
            // Send buffer size
            mqtt.setSendBufferSize(SEND_BUFFER_SIZE);
            return mqtt;
        }

        /**
         * Publishes numbered messages until an error occurs.
         *
         * <p>Each publish is awaited before the next one is issued. Without that
         * wait the loop spins without bound and queues requests faster than they
         * can be sent.
         */
        private static void publishForever(FutureConnection connection) throws Exception {
            final String topic = "china/beijing";
            int count = 1;
            while (true) {
                count++;
                // Message body
                String message = "hello " + count + "chinese people !";
                byte[] payload = message.getBytes(StandardCharsets.UTF_8);
                connection.publish(topic, payload, QoS.AT_LEAST_ONCE, false).await();
                System.out.println("MQTTFutureServer.publish Message "+"Topic Title :"+topic+" context :"+message);
            }
        }

        /** Disconnects if a connection was created; failures are logged, not thrown. */
        private static void disconnectQuietly(FutureConnection connection) {
            if (connection == null) {
                return;
            }
            try {
                connection.disconnect().await(DISCONNECT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            } catch (Exception e) {
                LOG.warn("Failed to disconnect cleanly from {}", CONNECTION_STRING, e);
            }
        }
    }

     MQTT moquette 的Client接收主题

    Java代码  收藏代码
    package com.etrip.mqtt.future;

    import java.net.URISyntaxException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeUnit;

    import org.fusesource.mqtt.client.Future;
    import org.fusesource.mqtt.client.FutureConnection;
    import org.fusesource.mqtt.client.MQTT;
    import org.fusesource.mqtt.client.Message;
    import org.fusesource.mqtt.client.QoS;
    import org.fusesource.mqtt.client.Topic;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    /**
     * Subscribes to a set of MQTT topics and prints every message received,
     * using the Future-style API of the fusesource mqtt-client.
     *
     * @author longgangbai
     */
    public class MQTTFutureClient {
        private static final Logger LOG = LoggerFactory.getLogger(MQTTFutureClient.class);

        /** Broker address (scheme, host and port). */
        private final static String CONNECTION_STRING = "tcp://192.168.208.46:1883";
        /** Discard any previous session state when connecting. */
        private final static boolean CLEAN_START = true;
        /** Heartbeat of 30s: low network usage while still getting data in time. */
        private final static short KEEP_ALIVE = 30;
        // NOTE(review): never passed to mqtt.setClientId(...), so it has no effect.
        // Kept as in the original; wire it up only if a fixed client identity is wanted.
        private final static String CLIENT_ID = "publishService";
        /** Topics (with requested QoS) this client subscribes to. */
        public  static Topic[] topics = {
                new Topic("china/beijing", QoS.EXACTLY_ONCE),
                new Topic("china/tianjin", QoS.AT_LEAST_ONCE),
                new Topic("china/henan", QoS.AT_MOST_ONCE)};
        /** Value passed to {@code MQTT.setReconnectAttemptsMax}. */
        public final  static long RECONNECTION_ATTEMPT_MAX = 6;
        /** Value passed to {@code MQTT.setReconnectDelay}. */
        public final  static long RECONNECTION_DELAY = 2000;

        /** Send buffer size: 2 MB. */
        public final static int SEND_BUFFER_SIZE = 2 * 1024 * 1024;

        /** Upper bound on how long shutdown waits for the disconnect to finish. */
        private static final long DISCONNECT_TIMEOUT_SECONDS = 5;

        /**
         * Connects to the broker, subscribes to {@link #topics} and prints each
         * received message until an error occurs.
         *
         * @param args ignored
         */
        public static void main(String[] args) {
            FutureConnection connection = null;
            try {
                connection = createClient().futureConnection();
                // Both calls are asynchronous; wait so that a failed connect or
                // subscribe surfaces here instead of being silently dropped.
                connection.connect().await();
                connection.subscribe(topics).await();
                receiveForever(connection);
            } catch (URISyntaxException e) {
                LOG.error("Invalid broker URI: {}", CONNECTION_STRING, e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.warn("Subscriber interrupted, shutting down", e);
            } catch (Exception e) {
                LOG.error("Receiving from {} failed", CONNECTION_STRING, e);
            } finally {
                disconnectQuietly(connection);
            }
        }

        /** Builds an {@link MQTT} client configured from the constants of this class. */
        private static MQTT createClient() throws URISyntaxException {
            // Create the MQTT object
            MQTT mqtt = new MQTT();
            // Broker address and port
            mqtt.setHost(CONNECTION_STRING);
            // Clear session information before connecting
            mqtt.setCleanSession(CLEAN_START);
            // Maximum number of reconnect attempts
            mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);
            // Delay between reconnect attempts
            mqtt.setReconnectDelay(RECONNECTION_DELAY);
            // Heartbeat interval
            mqtt.setKeepAlive(KEEP_ALIVE);
            // Send buffer size
            mqtt.setSendBufferSize(SEND_BUFFER_SIZE);
            return mqtt;
        }

        /**
         * Blocks on each incoming message, prints its topic and payload, then
         * acknowledges it.
         */
        private static void receiveForever(FutureConnection connection) throws Exception {
            while (true) {
                Future<Message> pending = connection.receive();
                Message message = pending.await();

                // Decode the raw bytes; printing the payload Buffer object itself
                // would show its debug representation rather than the text.
                String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
                System.out.println("MQTTFutureClient.Receive Message "+ "Topic Title :"+message.getTopic()+" context :"+payload);

                // The library leaves acknowledgement to the application; without
                // it QoS 1/2 deliveries are never confirmed to the broker.
                message.ack();
            }
        }

        /** Disconnects if a connection was created; failures are logged, not thrown. */
        private static void disconnectQuietly(FutureConnection connection) {
            if (connection == null) {
                return;
            }
            try {
                connection.disconnect().await(DISCONNECT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            } catch (Exception e) {
                LOG.warn("Failed to disconnect cleanly from {}", CONNECTION_STRING, e);
            }
        }
    }
  • 相关阅读:
    Android 自定义标题栏 并进行事件处理
    java synchronized详解
    Java中LinkedList与ArrayList有什么区别
    android动态全屏切换
    java线程机制介绍
    设置导航栏背景和文字属性
    Dictionary的用法
    bundle
    解析Json
    Copy与MutableCopy
  • 原文地址:https://www.cnblogs.com/yudar/p/4615695.html
Copyright © 2011-2022 走看看