zoukankan      html  css  js  c++  java
  • mqtt使用二(集成到java代码中)

    1.我采用的是springboot,首先pom文件中添加mqtt需要用到的依赖

     1  <dependency>
     2             <groupId>org.springframework.boot</groupId>
     3             <artifactId>spring-boot-starter-integration</artifactId>
     4         </dependency>
     5         <dependency>
     6             <groupId>org.springframework.integration</groupId>
     7             <artifactId>spring-integration-stream</artifactId>
     8         </dependency>
     9         <dependency>
    10             <groupId>org.springframework.integration</groupId>
    11             <artifactId>spring-integration-mqtt</artifactId>
    12         </dependency>
    13 
    14 
    15         <dependency>
    16             <groupId>org.eclipse.paho</groupId>
    17             <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    18             <version>1.2.0</version>
    19         </dependency>

    2.书写客户端,客户端中包括了连接服务器,订阅主题,向主题发布消息的基本操作

    import org.eclipse.paho.client.mqttv3.*;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * Description:
     * mqtt客户端
     * @author Victor
     */
    @Component
    public class MqttPushClient {
    
        @Autowired
        private PushCallback pushCallback;
    
        private static final Logger LOGGER = LoggerFactory.getLogger(MqttPushClient.class);
    
        private static MqttClient client;
    
        public static MqttClient getClient() {
            return client;
        }
    
        public static void setClient(MqttClient client) {
            MqttPushClient.client = client;
        }
    
        /**
         * 连接
         * @param host .
         * @param clientID .
         * @param username .
         * @param password .
         */
        public void connect(String host, String clientID, String username, String password) {
            MqttClient client;
            try {
                client = new MqttClient(host, clientID, new MemoryPersistence());
                MqttConnectOptions options = new MqttConnectOptions();
                options.setCleanSession(false);
                options.setUserName(username);
                options.setPassword(password.toCharArray());
                options.setConnectionTimeout(30);
                options.setKeepAliveInterval(20);
                MqttPushClient.setClient(client);
                try {
                    client.setCallback(pushCallback);
                    client.connect(options);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 发布,默认qos为0,非持久化
         *
         * @param topic       .
         * @param pushMessage .
         */
        public void publish(String topic, String pushMessage) {
            publish(0, false, topic, pushMessage);
        }
    
        /**
         * 发布主题和消息队列
         *
         * @param qos         .
         * @param retained    .
         * @param topic       .
         * @param pushMessage .
         */
        public void publish(int qos, boolean retained, String topic, String pushMessage) {
            MqttMessage message = new MqttMessage();
            message.setQos(qos);
            message.setRetained(retained);
            message.setPayload(pushMessage.getBytes());
            MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);
            if (null == mTopic) {
                LOGGER.error("topic not exist");
            }
            MqttDeliveryToken token;
            try {
                token = mTopic.publish(message);
                token.waitForCompletion();
            } catch (MqttPersistenceException e) {
                e.printStackTrace();
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 订阅某个主题,qos默认为0
         *
         * @param topic .
         */
        public void subscribe(String topic) {
            subscribe(topic, 0);
        }
    
        /**
         * 订阅某个主题
         *
         * @param topic .
         * @param qos .
         */
        public void subscribe(String topic, int qos) {
            try {
                MqttPushClient.getClient().subscribe(topic, qos);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    
    
    }

    还有对应回调函数,实现MqttCallback 接口之后要实现三个方法,如果出现需要断开重连操作就在connectionLost方法中操作,如果需要从我们订阅主题拿到数据保存到数据库,那么就在messageArrived方法里操作。SystemConstants是我写的一个

    配置连接信息的类,大家改成自己的信息就好了。

    import com.alibaba.fastjson.JSONObject;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallback;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.EnableScheduling;
    import org.springframework.util.StringUtils;
    
    import java.math.BigDecimal;
    import java.util.Date;
    import java.util.UUID;
    
    /**
     * mqtt回调类
     *
     * @author Victor
     */
    @Configuration
    public class PushCallback implements MqttCallback {
    
        @Autowired
        private MqttPushClient mqttPushClient;
    
        @Autowired
        private DeviceDao deviceDao;
    
        @Override
        public void connectionLost(Throwable cause) {
            // 连接丢失后,一般在这里面进行重连
            System.out.println("连接断开,可以做重连");
            mqttPushClient.connect(SystemConstants.HOST_URL + ":" + SystemConstants.PORT, "consumer" + SystemConstants.CLIENT_ID, SystemConstants.USERNAME, SystemConstants.PASSWORD);
            mqttPushClient.subscribe("aaa");
        }
    
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            System.out.println("deliveryComplete---------" + token.isComplete());
        }
    
        @Override
        public void messageArrived(String topic, MqttMessage message) {
            // subscribe后得到的消息会执行到这里面
            System.out.println("接收消息主题 : " + topic);
            System.out.println("接收消息Qos : " + message.getQos());
            System.out.println("接收消息内容 : " + new String(message.getPayload()));
        }
    }

    如果想要把mqtt配到application中启动只需要在application启动类中添加PostConstruct注解即可

    /**
         * 接受订阅的接口和消息,mqtt消费端
         */
        @PostConstruct
        public void consumeMqttClient() {
            mqttPushClient.connect(SystemConstants.HOST_URL + ":" + SystemConstants.PORT, "consumer" + SystemConstants.CLIENT_ID, SystemConstants.USERNAME, SystemConstants.PASSWORD);
            mqttPushClient.subscribe("aaa");
        }

    到此为止,mqtt集成spriongboot就完成了。

    新手写博,如有不对请多指教

  • 相关阅读:
    基于协程实现并发的套接字通信
    基于tcp协议的套接字通信:远程执行命令
    Java开发中的23种设计模式详解(转)
    SonarLint实践总结
    Java代码规范与质量检测插件SonarLint
    ES的基本介绍和使用
    ES基本介绍(简介)
    弗洛伊德追悼会 事发地市长跪在灵柩前大哭
    阿里云部署Web项目
    SpringBoot上传图片无法走复制流
  • 原文地址:https://www.cnblogs.com/reject-ant/p/9669116.html
Copyright © 2011-2022 走看看