zoukankan      html  css  js  c++  java
  • MQTT的简单Demo

    MQTT是物联网收发数据的一种协议

    Elipse Paho是一个开源项目实现了MQTT的Clinent可以方便直接操纵数据的上传和下载

    The Eclipse Paho project provides open-source client implementations of MQTT and MQTT-SN messaging protocols aimed at new, existing, and emerging applications for the Internet of Things (IoT).

    发送数据:

     1 import org.eclipse.paho.client.mqttv3.MqttClient;
     2 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
     3 import org.eclipse.paho.client.mqttv3.MqttException;
     4 import org.eclipse.paho.client.mqttv3.MqttMessage;
     5 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
     6 
     7 /**
     8  * 使用同步接口发布一个消息的例子
     9  * @author Administrator
    10  * 得到的输出
    11 Connecting to broker: tcp://iot.eclipse.org:1883
    12 Connected
    13 Publishing message: Message from MqttPublishSample
    14 Message published
    15 Disconnected
    16  *
    17  */
    18 public class MQTT_Client_send {
    19 
    20     public static void main(String[] args) {
    21         // TODO Auto-generated method stub
    22 
    23         String topic        = "MQTT Examples";
    24         String content      = "Message from MqttPublishSample";
    25         int qos             = 2;
    26         String broker       = "tcp://iot.eclipse.org:1883";
    27         String clientId     = "JavaSample";
    28         MemoryPersistence persistence = new MemoryPersistence();
    29 
    30         try {
    31             //Create an MqttClient that can be used to communicate with an MQTT server.就是创建client
    32             MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
    33             //Constructs a new MqttConnectOptions object using the default values.就是创建连接属性
    34             MqttConnectOptions connOpts = new MqttConnectOptions();
    35             //Sets whether the client and server should remember state across restarts and reconnects.就是说这个连接是无记忆的
    36             connOpts.setCleanSession(true);
    37             System.out.println("Connecting to broker: "+broker);
    38             //使用连接属性,建立连接
    39             sampleClient.connect(connOpts);
    40             System.out.println("Connected");
    41             System.out.println("Publishing message: "+content);
    42             //得到一个操作系统默认的编码格式的字节数组,Constructs a message with the specified byte array as a payload, and all other values set to defaults.,就是创建一个mqtt消息
    43             MqttMessage message = new MqttMessage(content.getBytes());
    44             //Sets the quality of service for this message.Quality of Service 2 - indicates that a message should be delivered once.
    45             message.setQos(qos);
    46             //调用发布方法,发布一个主题和一个消息
    47             sampleClient.publish(topic, message);
    48             System.out.println("Message published");
    49             //断开连接
    50             sampleClient.disconnect();
    51             System.out.println("Disconnected");
    52             System.exit(0);
    53         } catch(MqttException me) {
    54             System.out.println("reason "+me.getReasonCode());
    55             System.out.println("msg "+me.getMessage());
    56             System.out.println("loc "+me.getLocalizedMessage());
    57             System.out.println("cause "+me.getCause());
    58             System.out.println("excep "+me);
    59             me.printStackTrace();
    60         }
    61     }
    62     
    63     
    64 
    65 }

    接收数据:

      1 import java.io.BufferedWriter;
      2 import java.io.FileWriter;
      3 import java.io.IOException;
      4 import java.util.ArrayList;
      5 import java.util.List;
      6 
      7 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
      8 import org.eclipse.paho.client.mqttv3.MqttCallback;
      9 import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
     10 import org.eclipse.paho.client.mqttv3.MqttClient;
     11 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
     12 import org.eclipse.paho.client.mqttv3.MqttException;
     13 import org.eclipse.paho.client.mqttv3.MqttMessage;
     14 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
     15 public class MQTT_Client_Receive {
     16     
     17     public static void Write2Files(String filePath,String text) throws IOException {
     18         FileWriter fileWriter=new FileWriter(filePath,true);
     19         fileWriter.write("");
     20         BufferedWriter bufferedWriter=new BufferedWriter(fileWriter);
     21         bufferedWriter.append(text+"
    ");
     22         bufferedWriter.close();
     23         fileWriter.close();
     24     }
     25     
     26     public static void Mqtt_Client_2_File(String Topic,String Path) throws MqttException, InterruptedException {
     27 //        String broker="tcp://iotdevrd.chinacloudapp.cn:1889";
     28 //        String clientID="5036cf062ade4a28bf74726e5bff895a";
     29 //        int qos = 2;
     30 //        String topic=Topic;
     31 //        String userName="121";
     32 //        String passWord="121";
     33         //内网测试数据
     34         String broker="tcp://3.1.2.244:1889";
     35         String clientID="5036cf062ade4a28bf74726e5bff895a";
     36         int qos = 2;
     37         String topic=Topic;
     38         String userName="121";
     39         String passWord="121";
     40         //内网决赛数据地址
     41 //        String broker="tcp://3.1.2.244:1889";
     42 //        String clientID="5036cf062ade4a28bf74726e5bff895a";
     43 //        int qos = 2;
     44 //        String topic=Topic;
     45 //        String userName="121";
     46 //        String passWord="121";
     47         
     48         MemoryPersistence persistence = new MemoryPersistence();
     49         
     50         MqttClient myClient=new MqttClient(broker, clientID, persistence);
     51         MqttConnectOptions connectOptions=new MqttConnectOptions();
     52         connectOptions.setUserName(userName);
     53         connectOptions.setPassword(passWord.toCharArray());
     54         connectOptions.setCleanSession(true);
     55         
     56         myClient.connect(connectOptions);
     57         MqttCallback callback=new MqttCallbackExtended() {
     58             @Override
     59             public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
     60                 //String filePath="C:\eclipse-workspace\Projects\IOTProjectPrepare\src\123.txt";
     61                 String filePath=Path;
     62                 Write2Files(filePath, arg1.toString());
     63                 System.out.println(new String(arg1.toString()));
     64             }
     65             @Override
     66             public void deliveryComplete(IMqttDeliveryToken arg0) {
     67             }
     68             @Override
     69             public void connectionLost(Throwable arg0) {
     70             }
     71             @Override
     72             public void connectComplete(boolean arg0, String arg1) {
     73             }
     74         };
     75         myClient.setCallback(callback);
     76         myClient.subscribe(topic, qos);
     77         //监听10秒钟最后断开连接
     78         Thread.sleep(20000);
     79         if(myClient.isConnected())
     80             myClient.disconnect();
     81         //myClient.disconnect();
     82         //System.exit(0);
     83         
     84     }
     85 
     86     
     87     public static void Topics2Files(int index, String test_final,long time) throws MqttException, InterruptedException {
     88         
     89         //循环接收的逻辑
     90 //        List<String> topicList=new ArrayList<>();
     91 ////        topicList.add("GPSLocation/test1/1");
     92 ////        topicList.add("GPSLocation/test1/2");
     93 ////        topicList.add("GPSLocation/test2/1");
     94 ////        topicList.add("GPSLocation/test2/2");
     95 //        topicList.add("GPSLocation/test3/1");
     96 //        topicList.add("GPSLocation/test3/2");
     97 ////        topicList.add("GPSLocation/test4/1");
     98 ////        topicList.add("GPSLocation/test4/2");
     99 //        topicList.add("GPSLocation/test6/1");
    100 //        topicList.add("GPSLocation/test6/2");
    101 //        for (int i = 0; i < topicList.size(); ++i) {
    102 //            String array_element = topicList.get(i);
    103 //            String Path="D:\"+(index)+".json";
    104 //            System.out.println(Path);
    105 //            Mqtt_Client_2_File(array_element, Path);
    106 //        }
    107         //正式比赛一个一个接收,使用下面的逻辑
    108         String topic="";
    109         if(test_final.equals("test")) {
    110             topic="GPSLocation/test"+index+"/2";
    111         }else if(test_final.equals("final")) {
    112             topic="GPULocation/"+index;
    113         }
    114         System.out.println("当前订阅路径是:"+topic);
    115         String path="D:\"+(index)+"_"+time+".json";
    116         System.out.println("当前订阅的保存路径是:"+path);
    117         Mqtt_Client_2_File(topic, path);
    118         
    119     }
    120     public static void main(String[] args) throws MqttException, InterruptedException  {
    121         //Topics2Files();
    122         long time=System.currentTimeMillis()/1000;
    123         Topics2Files(7, "test",time);
    124 
    125     }
    126 
    127 }

    万事走心 精益求美


  • 相关阅读:
    经济地理国情监测
    《城市轨道交通——产业关联理论与应用》读书笔记
    《区域经济学原理》读书笔记(上)
    《国家经济地理》杂志之第一期:探寻中国经济的“第四极”
    《地理空间分析——原理、技术与软件工具》读书笔记
    《国家经济地理》杂志第二期:再望万里海疆——走向海洋经济的中国“大航海时代”
    国家统计遥感项目、商业图盟与品牌地图的碎碎念
    关于城市规划中的投融资规划
    区域功能定位对北京人口总量及分布的影响
    《中国经济地理——经济体成因与地缘架构》读书笔记
  • 原文地址:https://www.cnblogs.com/kongchung/p/9914134.html
Copyright © 2011-2022 走看看