zoukankan      html  css  js  c++  java
  • MQTT介绍(3)java模拟MQTT的发布,订阅


    MQTT目录:

    1.   MQTT简单介绍

    2.       window安装MQTT服务器和client

    3.       java模拟MQTT的发布,订阅


    在此强调一下mqtt的使用场景:

      1、不可靠、网络带宽小的网络

      2、运行的设备CPU、内存非常有限


    在idea中简单模拟测试代码:

      第一步:添加mqtt-client的依赖

      

    <!--验证mqtt协议-->
            <!-- https://mvnrepository.com/artifact/org.eclipse.paho/mqtt-client -->
            <dependency>
                <groupId>org.eclipse.paho</groupId>
                <artifactId>mqtt-client</artifactId>
                <version>0.4.0</version>
            </dependency>

      第二步:具体代码实现:

        分三部分:

          第一:服务端:

        

      1 package com.huhy.web.common.mqtt;
      2 
      3 import org.eclipse.paho.client.mqttv3.MqttClient;
      4 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
      5 import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
      6 import org.eclipse.paho.client.mqttv3.MqttException;
      7 import org.eclipse.paho.client.mqttv3.MqttMessage;
      8 import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
      9 import org.eclipse.paho.client.mqttv3.MqttTopic;
     10 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
     11 /**
     12  * @Author:huhy
     13  * @DATE:Created on 2017/12/1 14:29
     14  * @Modified By:
     15  * @Class Description:
     16  */
     17 public class ServerMQTT {
     18 
     19     //tcp://MQTT安装的服务器地址:MQTT定义的端口号
     20     public static final String HOST = "tcp://localhost:61613";
     21     //定义一个主题
     22     public static final String TOPIC = "huhy";
     23     //定义MQTT的ID,可以在MQTT服务配置中指定
     24     private static final String clientid = "server";
     25 
     26     private MqttClient client;
     27     private MqttTopic topic11;
     28     private String userName = "admin";  //非必须
     29     private String passWord = "password";  //非必须
     30 
     31     private MqttMessage message;
     32 
     33     /**
     34      * 构造函数
     35      * @throws MqttException
     36      */
     37     public ServerMQTT() throws MqttException {
     38         // MemoryPersistence设置clientid的保存形式,默认为以内存保存
     39         client = new MqttClient(HOST, clientid, new MemoryPersistence());
     40         connect();
     41     }
     42 
     43     /**
     44      *  用来连接服务器
     45      */
     46     private void connect() {
     47         MqttConnectOptions options = new MqttConnectOptions();
     48         options.setCleanSession(false);
     49         options.setUserName(userName);
     50         options.setPassword(passWord.toCharArray());
     51         // 设置超时时间
     52         options.setConnectionTimeout(10);
     53         // 设置会话心跳时间
     54         options.setKeepAliveInterval(20);
     55         try {
     56             client.setCallback(new PushCallback());
     57             client.connect(options);
     58 
     59             topic11 = client.getTopic(TOPIC);
     60         } catch (Exception e) {
     61             e.printStackTrace();
     62         }
     63     }
     64 
     65     /**
     66      *
     67      * @param topic
     68      * @param message
     69      * @throws MqttPersistenceException
     70      * @throws MqttException
     71      */
     72     public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,
     73             MqttException {
     74         MqttDeliveryToken token = topic.publish(message);
     75         token.waitForCompletion();
     76         System.out.println("message is published completely! "
     77                                    + token.isComplete());
     78     }
     79 
     80     /**
     81      *  启动入口
     82      * @param args
     83      * @throws MqttException
     84      */
     85     public static void main(String[] args) throws MqttException, InterruptedException {
     86         ServerMQTT server = new ServerMQTT();
     87         server.message = new MqttMessage();
     88         server.message.setQos(1);  //保证消息能到达一次
     89         server.message.setRetained(true);
     90         server.message.setPayload("abcde1".getBytes());
     91         server.publish(server.topic11 , server.message);
     92         Thread.sleep(2000);
     93         server.message.setPayload("abcde2".getBytes());
     94         server.publish(server.topic11 , server.message);
     95         Thread.sleep(2000);
     96         server.message.setPayload("abcde3".getBytes());
     97         server.publish(server.topic11 , server.message);
     98         System.out.println(server.message.isRetained() + "------ratained状态");
     99     }
    100 }

          第二部分:客户端代码:

        

     1 package com.huhy.web.common.mqtt;
     2 
     3 import org.eclipse.paho.client.mqttv3.MqttClient;
     4 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
     5 import org.eclipse.paho.client.mqttv3.MqttException;
     6 import org.eclipse.paho.client.mqttv3.MqttTopic;
     7 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
     8 
     9 import java.util.concurrent.ScheduledExecutorService;
    10 
    11 /**
    12  * @Author:huhy
    13  * @DATE:Created on 2017/12/1 14:34
    14  * @Modified By:
    15  * @Class Description:
    16  */
    17 public class ClientMQTT {
    18 
    19     public static final String HOST = "tcp://localhost:61613";
    20     public static final String TOPIC1 = "huhy";
    21     private static final String clientid = "client";
    22     private MqttClient client;
    23     private MqttConnectOptions options;
    24     private String userName = "admin";    //非必须
    25     private String passWord = "password";  //非必须
    26     @SuppressWarnings("unused")
    27     private ScheduledExecutorService scheduler;
    28 
    29     private void start() {
    30         try {
    31             // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
    32             client = new MqttClient(HOST, clientid, new MemoryPersistence());
    33             // MQTT的连接设置
    34             options = new MqttConnectOptions();
    35             // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
    36             options.setCleanSession(false);
    37             // 设置连接的用户名
    38             options.setUserName(userName);
    39             // 设置连接的密码
    40             options.setPassword(passWord.toCharArray());
    41             // 设置超时时间 单位为秒
    42             options.setConnectionTimeout(10);
    43             // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
    44             options.setKeepAliveInterval(20);
    45             // 设置回调
    46             client.setCallback(new PushCallback());
    47             MqttTopic topic = client.getTopic(TOPIC1);
    48             //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
    49 //遗嘱        options.setWill(topic, "close".getBytes(), 2, true);
    50             client.connect(options);
    51             //订阅消息
    52             int[] Qos  = {1};
    53             String[] topic1 = {TOPIC1};
    54             client.subscribe(topic1, Qos);
    55 
    56         } catch (Exception e) {
    57             e.printStackTrace();
    58         }
    59     }
    60 
    61     public static void main(String[] args) throws MqttException {
    62         ClientMQTT client = new ClientMQTT();
    63         client.start();
    64     }
    65 }

        第三部分:关于客户端和服务端的回调函数(注意,我这说客户端和服务端是不准确的,准确的来说是叫发布和订阅,为了好理解就这样了

     1 package com.huhy.web.common.mqtt;
     2 
     3 /**
     4  * @Author:huhy
     5  * @DATE:Created on 2017/12/1 14:33
     6  * @Modified By:
     7  * @Class Description:
     8  */
     9 
    10 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    11 import org.eclipse.paho.client.mqttv3.MqttCallback;
    12 import org.eclipse.paho.client.mqttv3.MqttMessage;
    13 
    14 /**
    15  * 发布消息的回调类
    16  *
    17  * 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。
    18  * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。
    19  * 在回调中,将它用来标识已经启动了该回调的哪个实例。
    20  * 必须在回调类中实现三个方法:
    21  *
    22  *  public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。
    23  *
    24  *  public void connectionLost(Throwable cause)在断开连接时调用。
    25  *
    26  *  public void deliveryComplete(MqttDeliveryToken token))
    27  *  接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。
    28  *  由 MqttClient.connect 激活此回调。
    29  *
    30  */
    31 public class PushCallback implements MqttCallback {
    32     @Override
    33     public void connectionLost(Throwable cause) {
    34         // 连接丢失后,一般在这里面进行重连
    35         System.out.println("连接断开,可以做重连");
    36     }
    37     @Override
    38     public void deliveryComplete(IMqttDeliveryToken token) {
    39         System.out.println("deliveryComplete---------" + token.isComplete());
    40     }
    41     @Override
    42     public void messageArrived(String topic, MqttMessage message) throws Exception {
    43         // subscribe后得到的消息会执行到这里面
    44         System.out.println("接收消息主题 : " + topic);
    45         System.out.println("接收消息Qos : " + message.getQos());
    46         System.out.println("接收消息内容 : " + new String(message.getPayload()));
    47     }
    48 }

    关于运行分别启动就可以了,要先代理服务器启动起来,关于怎么启动看我上节的讲解

      

  • 相关阅读:
    python爬虫学习
    Java基础知识11--Optional类
    07 Windows访问远程共享文件夹---利用\IP地址
    Springcloud 学习笔记15-打开postman console控制台,查看接口测试打印log日志信息
    Springcloud 学习笔记14-IaaS, PaaS和SaaS的区别
    Springcloud 学习笔记13-使用PostMan上传/下载文件,前后端联合测试
    Java基础知识10--Stream API详解02
    Java基础知识09--Stream API详解01
    洛谷 P2587 [ZJOI2008]泡泡堂(贪心)
    洛谷 P3199 [HNOI2009]最小圈(01分数规划,spfa判负环)
  • 原文地址:https://www.cnblogs.com/huhongy/p/7953571.html
Copyright © 2011-2022 走看看