zoukankan      html  css  js  c++  java
  • MQTT的学习研究(十三) IBM MQTTV3 简单发布订阅实例

    使用IBM MQTTv3实现相关的发布订阅功能

    MQTTv3的发布消息的实现:

    Java代码  收藏代码
    1. package com.etrip.mqttv3;  
    2.   
    3. import com.ibm.micro.client.mqttv3.MqttClient;  
    4. import com.ibm.micro.client.mqttv3.MqttDeliveryToken;  
    5. import com.ibm.micro.client.mqttv3.MqttMessage;  
    6. import com.ibm.micro.client.mqttv3.MqttTopic;  
    7. /** 
    8.  * MQTTV3的发布消息类 
    9.  *  
    10.  * @author longgangbai 
    11.  */  
    12. public class MQTTPub {   
    13.     public static void doTest(){   
    14.         try {   
    15.             MqttClient client = new MqttClient("tcp://192.168.208.46:1883","mqttserver-pub");   
    16.             MqttTopic topic = client.getTopic("tokudu/china");   
    17.             MqttMessage message = new MqttMessage("Hello World. Hello IBM".getBytes());   
    18.             message.setQos(1);   
    19.             client.connect();  
    20.             while(true){  
    21.                 MqttDeliveryToken token = topic.publish(message);   
    22.                 while (!token.isComplete()){   
    23.                     token.waitForCompletion(1000);   
    24.                 }   
    25.             }  
    26.         } catch (Exception e) {   
    27.             e.printStackTrace();   
    28.         }   
    29.     }   
    30. }   

     MQTTV3的订阅消息类

    Java代码  收藏代码
    1. package com.etrip.mqttv3;  
    2. import com.ibm.micro.client.mqttv3.MqttClient;  
    3. import com.ibm.micro.client.mqttv3.MqttConnectOptions;  
    4. /** 
    5.  * MQTTV3的订阅消息类 
    6.  *  
    7.  * @author longgangbai 
    8.  */  
    9. public class MQTTSubsribe {   
    10.     public static String doTest() {   
    11.         try {   
    12.             //创建MqttClient  
    13.             MqttClient client = new MqttClient("tcp://192.168.208.46:1883", "java_client0000000000");   
    14.             //回调处理类  
    15.             CallBack callback = new CallBack();   
    16.             client.setCallback(callback);   
    17.             //创建连接可选项信息  
    18.             MqttConnectOptions conOptions = new MqttConnectOptions();   
    19.             //  
    20.             conOptions.setCleanSession(false);   
    21.             //连接broker  
    22.             client.connect(conOptions);   
    23.             //发布相关的订阅  
    24.             client.subscribe("tokudu/china", 1);   
    25.             //client.disconnect();   
    26.         } catch (Exception e) {   
    27.             e.printStackTrace();   
    28.             return "failed";   
    29.         }   
    30.         return "success";   
    31.     }   
    32. }   

     回调处理类处理订阅的消息类

    Java代码  收藏代码
    1. package com.etrip.mqttv3;  
    2.   
    3. import com.ibm.micro.client.mqttv3.MqttCallback;  
    4. import com.ibm.micro.client.mqttv3.MqttDeliveryToken;  
    5. import com.ibm.micro.client.mqttv3.MqttMessage;  
    6. import com.ibm.micro.client.mqttv3.MqttTopic;  
    7. /** 
    8.  * 回调处理类 
    9.  * 处理订阅的消息类 
    10.  *  
    11.  * @author longgangbai 
    12.  */  
    13. public class CallBack implements MqttCallback {   
    14.       
    15.     public CallBack() {   
    16.     }   
    17.     /** 
    18.      * 接收到信息的处理 
    19.      */  
    20.     public void messageArrived(MqttTopic topic, MqttMessage message) {   
    21.         try {   
    22.             System.out.println(" MQTTSubsribe  message.toString()"+message.toString());  
    23.         } catch (Exception e) {   
    24.             e.printStackTrace();   
    25.         }   
    26.     }   
    27.     public void connectionLost(Throwable cause) {  
    28.           
    29.     }   
    30.     public void deliveryComplete(MqttDeliveryToken token) {  
    31.           
    32.     }   
    33. }   

    测试类:

    Java代码  收藏代码
    1. package com.etrip.mqttv3;  
    2. /** 
    3.  * MQTTV3的测试类 
    4.  *  
    5.  * @author longgangbai 
    6.  */  
    7. public class MQTTMain {  
    8.     public static void main(String[] args) {  
    9.         //订阅消息的方法  
    10.         MQTTSubsribe.doTest();  
    11.         //发布消息的类  
    12.         MQTTPub.doTest();  
    13.           
    14.     }  
    15. }  
  • 相关阅读:
    MySQL数据库:数据完整性及约束的应用
    MySQL数据库:运算符
    MySQL数据库:合并结果集
    MySQL数据库:聚合函数的使用
    spring web mvc环境搭建
    golang中type常用用法
    有些事情,你真的要早点明白
    一个小事例,了解golang通道阻塞模式
    golang中,slice的几个易混淆点
    作为面试官的一点小感想
  • 原文地址:https://www.cnblogs.com/yudar/p/4615697.html
Copyright © 2011-2022 走看看