zoukankan      html  css  js  c++  java
  • MQTT的学习研究(五) MQTT moquette 的 Blocking API 发布消息服务端使用

      参看官方文档:

    http://publib.boulder.ibm.com/infocenter/wmqv7/v7r0/index.jsp?topic=/com.ibm.mq.amqtat.doc/tt00000_.htm

     *  Java 为 MQ Telemetry Transport 创建异步发布程序
     *在此任务中,您将遵循教程来修改第一个发布程序。通过修改,
     *使应用程序能够发送发布而不等待传递确认信息。传递确认
     *信息由您创建的回调类来接收。
     *
     *
     *
     *4.使客户机断开连接
     *  a.除去其中包含 token.waitForCompletion 表达式的语句。 主线程将继续执行,而不等待传递发布。
     *  b.测试客户机是否已断开连接。 将错误返回到 MqttCallback 中的 lostConnection 方法之后,MQTT 客户机将断开连接,客户机应用程序也可能断开连接。测试是否有打开的连接。
     *  c.使用常量 Example.quiesceTimeout 来设置使客户机停顿的最长时间。
     *  if (client.isConnected())
     *      client.disconnect(Example.quiesceTimeout);
     *当满足下面三种情况的组合形式时,客户机就完成了: 
     *  a.已经对在此会话中(如果重新启动了会话,则是在先前会话中)已发布的所有消息调用了回调。
     *  b.消息未完成,然而停顿时间间隔已到期。缺省情况下,停顿时间间隔为 30 秒。通过将要等待的毫秒数作为 client.disconnect 的一个参数来传递,即可更改停顿超时。
     *  c.在发布了某些消息并由客户机进行排队之后,但是在发送这些消息之前调用了 client.disconnect。已排队的消息尚未处于“未完成”状态。如果会话可重新启动,那么重新启动会话时就会重新发送消息。
     *  缺省情况下,停顿时间间隔为 30 秒。

    MQTT的消息发布代码:

    Java代码  收藏代码
    1. package com.etrip.wsmqtt.server;  
    2.   
    3. import com.ibm.micro.client.mqttv3.MqttClient;  
    4. import com.ibm.micro.client.mqttv3.MqttDeliveryToken;  
    5. import com.ibm.micro.client.mqttv3.MqttMessage;  
    6. import com.ibm.micro.client.mqttv3.MqttTopic;  
    7. /** 
    8.  * 使用 Java 为 MQ Telemetry Transport 创建异步发布程序 
    9.  *  
    10.  *  
    11.  *  
    12.  * 
    13.  * 消息发布的类的具体的实现 
    14.  *  
    15.  * @author longgangbai 
    16.  *  
    17.  */  
    18. public class WSMQTTServerPubAsync {  
    19.       public static void main(String[] args) {  
    20.             try {  
    21.                   //创建MqttClient对象  
    22.                   MqttClient client = new MqttClient(WSMQTTServerCommon.TCPAddress, WSMQTTServerCommon.clientId);  
    23.                    
    24.                   //创建MQTT相关的主题  
    25.                   MqttTopic topic = client.getTopic(WSMQTTServerCommon.topicString);  
    26.                     
    27.                   //创建MQTT的消息体  
    28.                   MqttMessage message = new MqttMessage();  
    29.                   //设置消息传输的类型  
    30.                   message.setQos(2);  
    31.                     
    32.                   //设置是否在服务器中保存消息体  
    33.                   message.setRetained(false);  
    34.                     
    35.                   //设置消息的内容  
    36.                   message.setPayload(WSMQTTServerCommon.publication.getBytes());  
    37.                     
    38.                   //创建一个MQTT的回调类  
    39.                   WSMQTTServerCallBack callback = new WSMQTTServerCallBack(WSMQTTServerCommon.clientId);  
    40.                     
    41.                   //MqttClient绑定  
    42.                   client.setCallback(callback);  
    43.                     
    44.                   //MqttClient连接  
    45.                   client.connect();  
    46.                     
    47.                   System.out.println("Publishing "" + message.toString()  
    48.                       + "" on topic "" + topic.getName() + "" with QoS = "  
    49.                       + message.getQos());  
    50.                   System.out.println("For client instance "" + client.getClientId()  
    51.                       + "" on address " + client.getServerURI() + """);  
    52.                     
    53.                   //发送消息并获取回执  
    54.                   MqttDeliveryToken token = topic.publish(message);  
    55.                     
    56.                   System.out.println("With delivery token "" + token.hashCode()  
    57.                       + " delivered: " + token.isComplete());  
    58.                   Thread.sleep(100000000000000l);  
    59.                     
    60.                   //关闭连接  
    61.                   if (client.isConnected())  
    62.                       client.disconnect(WSMQTTServerCommon.quiesceTimeout);  
    63.                   System.out.println("Disconnected: delivery token "" + token.hashCode()  
    64.                       + "" received: " + token.isComplete());  
    65.             } catch (Exception e) {  
    66.               e.printStackTrace();  
    67.             }  
    68.       }  
    69. }  

    MQTT消息发布回调代码:

    Java代码  收藏代码
    1. package com.etrip.wsmqtt.server;  
    2.   
    3. import com.ibm.micro.client.mqttv3.*;  
    4. /** 
    5.  * 发布消息的回调类 
    6.  *  
    7.  * 必须实现MqttCallback的接口并实现对应的相关接口方法 
    8.  *      ◦CallBack 类将实现 MqttCallBack。每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。在回调中,将它用来标识已经启动了该回调的哪个实例。 
    9.  *  ◦必须在回调类中实现三个方法: 
    10.  *  
    11.  *  public void messageArrived(MqttTopic topic, MqttMessage message) 
    12.  *  接收已经预订的发布。 
    13.  *  
    14.  *  public void connectionLost(Throwable cause) 
    15.  *  在断开连接时调用。 
    16.  *  
    17.  *  public void deliveryComplete(MqttDeliveryToken token)) 
    18.  *      接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。 
    19.  *  
    20.  *  
    21.  *  ◦由 MqttClient.connect 激活此回调。 
    22.  *  
    23.  * @author longgangbai 
    24.  */  
    25. public class WSMQTTServerCallBack implements MqttCallback {  
    26.       private String instanceData = "";  
    27.       public WSMQTTServerCallBack(String instance) {  
    28.         instanceData = instance;  
    29.       }  
    30.       /** 
    31.        * 接收到消息的回调的方法 
    32.        */  
    33.       public void messageArrived(MqttTopic topic, MqttMessage message) {  
    34.         try {  
    35.           System.out.println("Message arrived: "" + message.toString()  
    36.               + "" on topic "" + topic.toString() + "" for instance ""  
    37.               + instanceData + """);  
    38.         } catch (Exception e) {  
    39.           e.printStackTrace();  
    40.         }  
    41.       }  
    42.       /** 
    43.        * 消息连接丢失 
    44.        */  
    45.       public void connectionLost(Throwable cause) {  
    46.         System.out.println("Connection lost on instance "" + instanceData  
    47.             + "" with cause "" + cause.getMessage() + "" Reason code "   
    48.             + ((MqttException)cause).getReasonCode() + "" Cause ""   
    49.             + ((MqttException)cause).getCause() +  """);      
    50.         cause.printStackTrace();  
    51.       }  
    52.       /** 
    53.        *  
    54.        */  
    55.       public void deliveryComplete(MqttDeliveryToken token) {  
    56.         try {  
    57.           System.out.println("Delivery token "" + token.hashCode()  
    58.               + "" received by instance "" + instanceData + """);  
    59.         } catch (Exception e) {  
    60.           e.printStackTrace();  
    61.         }  
    62.       }  
    63. }  

    常量类:

    Java代码  收藏代码
    1. package com.etrip.wsmqtt.server;  
    2.   
    3. import java.util.UUID;  
    4. /** 
    5.  *  
    6.  * 消息发布消息的常量字段 
    7.  *  
    8.  * @author longgangbai 
    9.  */  
    10. public final class WSMQTTServerCommon {  
    11.   //发布broker的ip和端口  
    12.   public static final String  TCPAddress =System.getProperty("TCPAddress", "tcp://192.168.208.46:1883");  
    13.   //客户端的Id  
    14.   public static String clientId =String.format("%-23.23s",  System.getProperty("clientId", (UUID.randomUUID().toString())).trim()).replace('-', '_');  
    15.   //发布消息的主题  
    16.   public static final String topicString = System.getProperty("topicString", "china/beijing");  
    17.   //发布的消息  
    18.   public static final String publication =System.getProperty("publication", "Hello World " + String.format("%tc", System.currentTimeMillis()));  
    19.   //超时时间  
    20.   public static final int quiesceTimeout = Integer.parseInt(System.getProperty("timeout", "10000"));  
    21.     
    22.   public static final int  sleepTimeout = Integer.parseInt(System.getProperty("timeout", "10000"));  
    23.     
    24.   public static final boolean cleanSession =Boolean.parseBoolean(System.getProperty("cleanSession", "false"));  
    25.     
    26.   public static final int QoS =Integer.parseInt(System.getProperty("QoS", "1"));  
    27.     
    28.   public static final boolean retained =Boolean.parseBoolean(System.getProperty("retained", "false"));  
    29. }  
  • 相关阅读:
    数据结构问题总结
    基础dp问题总结
    搜索问题总结
    二分+贪心check问题总结
    基础图论问题总结
    数学问题总结
    匹配与网络流学习笔记(在学习中)
    我的第一篇题解
    python+Sqlite+Dataframe打造金融股票数据结构
    用Pandas Dataframe来抓取重构金融股票的各种业务&数据形态
  • 原文地址:https://www.cnblogs.com/yudar/p/4613738.html
Copyright © 2011-2022 走看看