zoukankan      html  css  js  c++  java
  • MQTT 发布者订阅者

    添加依赖:

    <dependency>
      <groupId>org.eclipse.paho</groupId>
      <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
      <version>1.1.1</version>
    </dependency>
    

    MQ发布者:

    package com.ra.car.utils;
    
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
    import org.eclipse.paho.client.mqttv3.MqttTopic;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * MQ发布者
     * 
     * @author lanhao
     * 
     */
    public class MQTTPubMsg {
        protected static final Logger logger = LoggerFactory
                .getLogger(MQTTPubMsg.class);
        public static final String HOST = "tcp://10.74.20.43:1883";
        public static final String TOPIC = "toclient/124";
    //    public static final String TOPIC125 = "toclient/125";
        private static final String clientid = "server";
        private MqttClient client;
        private MqttTopic topic;
    //    private MqttTopic topic125;
        private String userName = "root";
        private String passWord = "Passw0rd";
        private MqttMessage message;
    
        public MQTTPubMsg() throws MqttException {
            // MemoryPersistence设置clientid的保存形式,默认为以内存保存
            client = new MqttClient(HOST, clientid, new MemoryPersistence());
            connect();
        }
    
        private void connect() {
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(false);
            options.setUserName(userName);
            options.setPassword(passWord.toCharArray());
            // 设置超时时间
            options.setConnectionTimeout(10);
            // 设置会话心跳时间
            options.setKeepAliveInterval(20);
            try {
                client.setCallback(new MQTTPushCallback());
                client.connect(options);
                topic = client.getTopic(TOPIC);
    //            topic125 = client.getTopic(TOPIC125);
            } catch (Exception e) {
                logger.error("MQ发布者connect异常......",e);
            }
        }
    
        public void publish(MqttTopic topic, MqttMessage message)
                throws MqttPersistenceException, MqttException {
            MqttDeliveryToken token = topic.publish(message);
            token.waitForCompletion();
            logger.info("message is published completely! "
                    + token.isComplete());
        }
    
        public static void main(String[] args) throws MqttException {
            MQTTPubMsg server = new MQTTPubMsg();
            String s="这是第一条消息";
            server.message = new MqttMessage();
            server.message.setQos(2);
            server.message.setRetained(true);
            server.message.setPayload(s.getBytes());
            server.publish(server.topic, server.message);
            
            try {
                Thread.sleep(20000);
            } catch (InterruptedException e) {
            }
            for (int i = 0; i < 500; i++) {
                String str = "这是后续循环发送的消息" + i;
                server.message = new MqttMessage();
                server.message.setQos(2);
                server.message.setRetained(true);
                server.message.setPayload(str.getBytes());
                server.publish(server.topic, server.message);
    
                // 给另外一个topic发送消息
                
                 * server.message = new MqttMessage(); server.message.setQos(2);
                 * server.message.setRetained(true);
                 * server.message.setPayload("给客户端125推送的信息".getBytes());
                 * server.publish(server.topic125, server.message);
                 
    
                logger.info(server.message.isRetained() + "------ratained状态");
            }
        }
    
    }

    MQ订阅者:

    package com.ra.car.utils;
    
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttTopic;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * MQ订阅者
     * 
     * @author lanhao
     * 
     */
    public class MQTTSubMsg {
        protected static final Logger logger = LoggerFactory
                .getLogger(MQTTSubMsg.class);
        public static final String HOST = "tcp://10.74.20.43:1883";
        public static final String TOPIC = "toclient/124";
        private static final String clientid = "client124";
        private MqttClient client;
        private MqttConnectOptions options;
        private String userName = "root";
        private String passWord = "Passw0rd";
    
        public void start() {
            try {
                logger.info("SubMsg----->start");
                // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
                client = new MqttClient(HOST, clientid, new MemoryPersistence());
                // MQTT的连接设置
                options = new MqttConnectOptions();
                // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
                options.setCleanSession(true);
                // 设置连接的用户名
                options.setUserName(userName);
                // 设置连接的密码
                options.setPassword(passWord.toCharArray());
                // 设置超时时间 单位为秒
                options.setConnectionTimeout(10);
                // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
                options.setKeepAliveInterval(20);
                // 设置回调
                client.setCallback(new MQTTPushCallback());
                MqttTopic topic = client.getTopic(TOPIC);
                // setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
                options.setWill(topic, "close".getBytes(), 2, true);
                client.connect(options);
                // 订阅消息
                int[] Qos = { 2 };
                String[] topic1 = { TOPIC };
                client.subscribe(topic1, Qos);
            } catch (Exception e) {
                logger.error("MQ订阅者异常",e);
            }
        }
    
        public static void main(String[] args) throws MqttException {
            MQTTSubMsg client = new MQTTSubMsg();
            client.start();
        }
    
    }

    回调类:

    package com.ra.car.utils;
    
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallback;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * 发布消息的回调类
     * 
     * 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。
     * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。 在回调中,将它用来标识已经启动了该回调的哪个实例。
     * 必须在回调类中实现三个方法
     */
    public class MQTTPushCallback implements MqttCallback{
        protected static final Logger logger = LoggerFactory
                .getLogger(MQTTPushCallback.class);
        public void connectionLost(Throwable cause) {
            // 在断开连接时调用,连接丢失后,一般在这里面进行重连.
            logger.info("连接断开,可以做重连");
        }
    
        public void deliveryComplete(IMqttDeliveryToken token) {
            //接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用. 由 MqttClient.connect 激活此回调.
            logger.info("deliveryComplete---------" + token.isComplete());
        }
    
        public void messageArrived(String topic, MqttMessage message)
                throws Exception {
            // subscribe后得到的消息会执行到这里面
            logger.info("接收消息主题 : " + topic);
            logger.info("接收消息Qos : " + message.getQos());
            logger.info("接收消息内容 : " + new String(message.getPayload()));
        }
    }
  • 相关阅读:
    228. Summary Ranges
    227. Basic Calculator II
    224. Basic Calculator
    222. Count Complete Tree Nodes
    223. Rectangle Area
    221. Maximal Square
    220. Contains Duplicate III
    219. Contains Duplicate II
    217. Contains Duplicate
    Java编程思想 4th 第4章 控制执行流程
  • 原文地址:https://www.cnblogs.com/lazyInsects/p/7999786.html
Copyright © 2011-2022 走看看