zoukankan      html  css  js  c++  java
  • mqtt client api: 阻塞API

    fusesource版本:mqtt-client-1.11.jar
    下载地址:https://github.com/fusesource/mqtt-client

    fusesource提供三种mqtt client api: 阻塞API,基于Futur的API和回调API。其中,回调API是最复杂的也是性能最好的,另外两种均是对回调API的封装。 我们下面就简单介绍一下回调API的使用方法。

      1 import org.fusesource.hawtbuf.Buffer;
      2 import org.fusesource.hawtbuf.UTF8Buffer;
      3 import org.fusesource.hawtdispatch.Dispatch;
      4 import org.fusesource.hawtdispatch.DispatchQueue;
      5 import org.fusesource.mqtt.client.BlockingConnection;
      6 import org.fusesource.mqtt.client.Callback;
      7 import org.fusesource.mqtt.client.CallbackConnection;
      8 import org.fusesource.mqtt.client.FutureConnection;
      9 import org.fusesource.mqtt.client.Listener;
     10 import org.fusesource.mqtt.client.MQTT;
     11 import org.fusesource.mqtt.client.Message;
     12 import org.fusesource.mqtt.client.QoS;
     13 import org.fusesource.mqtt.client.Topic;
     14 import org.fusesource.mqtt.client.Tracer;
     15 import org.fusesource.mqtt.codec.MQTTFrame; 
     16 public class MqttClient {
     17 public static void main(String[] args)
     18 {
     19  try {
     20    MQTT mqtt=new MQTT();
     21    
     22    //MQTT设置说明
     23    mqtt.setHost("tcp://10.1.58.191:1883");
     24    mqtt.setClientId("876543210"); //用于设置客户端会话的ID。在setCleanSession(false);被调用时,MQTT服务器利用该ID获得相应的会话。此ID应少于23个字符,默认根据本机地址、端口和时间自动生成
     25    mqtt.setCleanSession(false); //若设为false,MQTT服务器将持久化客户端会话的主体订阅和ACK位置,默认为true
     26    mqtt.setKeepAlive((short) 60);//定义客户端传来消息的最大时间间隔秒数,服务器可以据此判断与客户端的连接是否已经断开,从而避免TCP/IP超时的长时间等待
     27    mqtt.setUserName("admin");//服务器认证用户名
     28    mqtt.setPassword("admin");//服务器认证密码
     29    
     30    mqtt.setWillTopic("willTopic");//设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息
     31    mqtt.setWillMessage("willMessage");//设置“遗嘱”消息的内容,默认是长度为零的消息
     32    mqtt.setWillQos(QoS.AT_LEAST_ONCE);//设置“遗嘱”消息的QoS,默认为QoS.ATMOSTONCE
     33    mqtt.setWillRetain(true);//若想要在发布“遗嘱”消息时拥有retain选项,则为true
     34    mqtt.setVersion("3.1.1");
     35    
     36    //失败重连接设置说明
     37    mqtt.setConnectAttemptsMax(10L);//客户端首次连接到服务器时,连接的最大重试次数,超出该次数客户端将返回错误。-1意为无重试上限,默认为-1
     38    mqtt.setReconnectAttemptsMax(3L);//客户端已经连接到服务器,但因某种原因连接断开时的最大重试次数,超出该次数客户端将返回错误。-1意为无重试上限,默认为-1
     39    mqtt.setReconnectDelay(10L);//首次重连接间隔毫秒数,默认为10ms
     40    mqtt.setReconnectDelayMax(30000L);//重连接间隔毫秒数,默认为30000ms
     41    mqtt.setReconnectBackOffMultiplier(2);//设置重连接指数回归。设置为1则停用指数回归,默认为2
     42    
     43    //Socket设置说明
     44          mqtt.setReceiveBufferSize(65536);//设置socket接收缓冲区大小,默认为65536(64k)
     45          mqtt.setSendBufferSize(65536);//设置socket发送缓冲区大小,默认为65536(64k)
     46          mqtt.setTrafficClass(8);//设置发送数据包头的流量类型或服务类型字段,默认为8,意为吞吐量最大化传输
     47          
     48          //带宽限制设置说明
     49          mqtt.setMaxReadRate(0);//设置连接的最大接收速率,单位为bytes/s。默认为0,即无限制
     50          mqtt.setMaxWriteRate(0);//设置连接的最大发送速率,单位为bytes/s。默认为0,即无限制
     51          
     52          //选择消息分发队列
     53          mqtt.setDispatchQueue(Dispatch.createQueue("foo"));//若没有调用方法setDispatchQueue,客户端将为连接新建一个队列。如果想实现多个连接使用公用的队列,显式地指定队列是一个非常方便的实现方法
     54    
     55          //设置跟踪器
     56    mqtt.setTracer(new Tracer(){
     57              @Override
     58              public void onReceive(MQTTFrame frame) {
     59                  System.out.println("recv: "+frame);
     60              } 
     61              @Override
     62              public void onSend(MQTTFrame frame) {
     63                  System.out.println("send: "+frame);
     64              } 
     65              @Override
     66              public void debug(String message, Object... args) {
     67                  System.out.println(String.format("debug: "+message, args));
     68              }
     69          });
     70    
     71    
     72    
     73          //使用回调式API
     74    final CallbackConnection callbackConnection=mqtt.callbackConnection();
     75    
     76    //连接监听
     77    callbackConnection.listener(new Listener() {
     78    
     79    //接收订阅话题发布的消息
     80    @Override
     81    public void onPublish(UTF8Buffer topic, Buffer payload, Runnable onComplete) {
     82     System.out.println("=============receive msg================"+new String(payload.toByteArray()));
     83      onComplete.run();
     84    }
     85    
     86    //连接失败
     87    @Override
     88    public void onFailure(Throwable value) {
     89     System.out.println("===========connect failure===========");
     90     callbackConnection.disconnect(null);
     91    }
     92    
     93       //连接断开
     94    @Override
     95    public void onDisconnected() {
     96     System.out.println("====mqtt disconnected=====");
     97     
     98    }
     99    
    100    //连接成功
    101    @Override
    102    public void onConnected() {
    103     System.out.println("====mqtt connected=====");
    104     
    105    }
    106   });
    107    
    108    
    109    
    110    //连接
    111    callbackConnection.connect(new Callback() {
    112     
    113      //连接失败
    114        public void onFailure(Throwable value) {
    115            System.out.println("============连接失败:"+value.getLocalizedMessage()+"============");
    116        } 
    117        // 连接成功
    118        public void onSuccess(Void v) { 
    119            //订阅主题
    120            Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)};
    121            callbackConnection.subscribe(topics, new Callback<byte[]>() {
    122                //订阅主题成功
    123             public void onSuccess(byte[] qoses) {
    124                    System.out.println("========订阅成功=======");
    125                }
    126             //订阅主题失败
    127                public void onFailure(Throwable value) {
    128                 System.out.println("========订阅失败=======");
    129                 callbackConnection.disconnect(null);
    130                }
    131            });
    132            
    133            
    134             //发布消息
    135            callbackConnection.publish("foo", ("Hello ").getBytes(), QoS.AT_LEAST_ONCE, true, new Callback() {
    136                public void onSuccess(Void v) {
    137                  System.out.println("===========消息发布成功============");
    138                }
    139                public void onFailure(Throwable value) {
    140                 System.out.println("========消息发布失败=======");
    141                 callbackConnection.disconnect(null);
    142                }
    143            }); 
    144   
    145        }
    146    });
    147    
    148    
    149    
    150    while(true)
    151    {
    152     
    153    }
    154  
    155   
    156  } catch (Exception e) {
    157   e.printStackTrace();
    158  } 
    159   
    160 }
    161 }
  • 相关阅读:
    线程池
    单例设计模式
    String,StringBuffer,StringBuilder
    马踏棋盘算法
    最短路径问题 (迪杰斯特拉算法,弗洛伊德算法)
    最小生成树 修路问题(普里姆算法,克鲁斯卡尔算法)
    贪心算法 求解集合覆盖问题
    Stream 数组转换
    unittest与pytest对比
    条件编译
  • 原文地址:https://www.cnblogs.com/endv/p/11037424.html
Copyright © 2011-2022 走看看