zoukankan      html  css  js  c++  java
  • SpringBoot整合MQTT( EMQ)

    1.下载EMQ安装包,配置EMQ环境

    下载地址:https://www.emqx.cn/downloads#broker

    下载压缩包解压,cmd进入bin文件夹

    输入 emqx start 启动服务,打开浏览器输入 本地ip:18083 进入登录页面,默认用户名/密码为 admin/public

    2.配置application.yml文件(下面的配置为YAML格式),设置EMQ参数,并在pom.xml中添加依赖

    #MQTT Config
      mqtt:
      #MQTT-服务器连接地址,如果有多个,用逗号隔开,如:tcp://127.0.0.1:61613,tcp://192.168.2.133:61613
        host: tcp://127.0.0.1:11883
        #MQTT-连接服务器默认客户端ID
        clientid: mqtt_id
        #MQTT-用户名
        username: admin
        #MQTT-密码
        password: admin
        #MQTT-默认的消息推送主题,实际可在调用接口时指定
        topic: test
        #连接超时(单位:秒)
        timeout: 1000
        #设置会话心跳时间(单位:秒)
        keepalive: 100

    <!-- mqtt -->
    <dependency>
       <groupId>org.springframework.integration</groupId>
       <artifactId>spring-integration-stream</artifactId>
    </dependency>
    <dependency>
       <groupId>org.springframework.integration</groupId>
       <artifactId>spring-integration-mqtt</artifactId>
    </dependency>
    <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>

    <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.4</version>
    </dependency>

    3.创建工具类

    1.配置文件

    package com.st.modules.pump.mqtt;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.stereotype.Component;
    
    /**
     * Binds the {@code mqtt.*} configuration properties and exposes the
     * {@link MqttPushClient} as a bean after connecting it and subscribing it to
     * {@code test/#}.
     *
     * <p>{@code @Configuration} is itself a component stereotype, so no separate
     * {@code @Component} annotation is needed for this class to be picked up.
     */
    @Configuration
    @ConfigurationProperties(MqttConfiguration.PREFIX)
    public class MqttConfiguration {

        /** Prefix of the configuration properties bound onto this class. */
        public static final String PREFIX = "mqtt";

        /** Topic filter subscribed right after connecting; a trailing {@code /#} matches every topic under {@code test}. */
        private static final String SUBSCRIBE_FILTER = "test/#";

        /** QoS level used for the start-up subscription. */
        private static final int SUBSCRIBE_QOS = 0;

        @Autowired
        private MqttPushClient mqttPushClient;

        /** Broker address handed to {@link MqttPushClient#connect}, e.g. {@code tcp://127.0.0.1:11883}. */
        private String host;
        /** Client id handed to {@link MqttPushClient#connect}. */
        private String clientid;
        /** User name handed to {@link MqttPushClient#connect}. */
        private String username;
        /** Password handed to {@link MqttPushClient#connect}. */
        private String password;
        /** Bound from {@code mqtt.topic}; not read by this class itself. */
        private String topic;
        /** Connection timeout handed to {@link MqttPushClient#connect}. */
        private int timeout;
        /** Keep-alive interval handed to {@link MqttPushClient#connect}. */
        private int keepalive;

        /**
         * Connects the injected push client with the bound properties, subscribes
         * it to {@code test/#} at QoS 0 and returns it.
         *
         * @return the injected {@link MqttPushClient} instance
         */
        @Bean
        public MqttPushClient getMqttPushClient() {
            mqttPushClient.connect(host, clientid, username, password, timeout, keepalive);
            mqttPushClient.subscribe(SUBSCRIBE_FILTER, SUBSCRIBE_QOS);
            return mqttPushClient;
        }

        /** @return the configured broker address */
        public String getHost() {
            return host;
        }

        /** @param host the broker address to use */
        public void setHost(String host) {
            this.host = host;
        }

        /** @return the configured client id */
        public String getClientid() {
            return clientid;
        }

        /** @param clientid the client id to use */
        public void setClientid(String clientid) {
            this.clientid = clientid;
        }

        /** @return the configured user name */
        public String getUsername() {
            return username;
        }

        /** @param username the user name to use */
        public void setUsername(String username) {
            this.username = username;
        }

        /** @return the configured password */
        public String getPassword() {
            return password;
        }

        /** @param password the password to use */
        public void setPassword(String password) {
            this.password = password;
        }

        /** @return the configured topic */
        public String getTopic() {
            return topic;
        }

        /** @param topic the topic to store */
        public void setTopic(String topic) {
            this.topic = topic;
        }

        /** @return the configured connection timeout */
        public int getTimeout() {
            return timeout;
        }

        /** @param timeout the connection timeout to use */
        public void setTimeout(int timeout) {
            this.timeout = timeout;
        }

        /** @return the configured keep-alive interval */
        public int getKeepalive() {
            return keepalive;
        }

        /** @param keepalive the keep-alive interval to use */
        public void setKeepalive(int keepalive) {
            this.keepalive = keepalive;
        }
    }

    2.发布者

    package com.st.modules.pump.mqtt;
    
    import lombok.extern.slf4j.Slf4j;
    import org.eclipse.paho.client.mqttv3.*;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Slf4j
    @Component
    public class MqttPushClient {
    
        @Autowired
        private PushCallback pushCallback;
    
    
        private static MqttClient client;
    
        public  static MqttClient getClient(){
            return  client;
        }
    
        public static void setClient(MqttClient client){
            MqttPushClient.client=client;
        }
    
        /**
         * 客户端连接
         *
         * @param host      ip+端口
         * @param clientID  客户端Id
         * @param username  用户名
         * @param password  密码
         * @param timeout   超时时间
         * @param keeplive 保留数
         */
        public void connect(String host,String clientID,String username,String password,int timeout,int keeplive){
            MqttClient client;
    
    
    
            try {
    
                client=new MqttClient(host,clientID,new MemoryPersistence());
                MqttConnectOptions options=new MqttConnectOptions();
                options.setCleanSession(true);
                options.setUserName(username);
                options.setPassword(password.toCharArray());
                options.setConnectionTimeout(timeout);
                options.setKeepAliveInterval(keeplive);
                MqttPushClient.setClient(client);
                try {
                    client.setCallback(pushCallback);
                    client.connect(options);
                }catch (Exception e){
                    e.printStackTrace();
                }
            }catch (Exception e){
                e.printStackTrace();
            }
    
        }
    
        /**
         * 发布,默认qos为0,非持久化
         * @param topic
         * @param pushMessage
         */
    
        public void pushlish(String topic,String pushMessage){
            pushlish(0,false,topic,pushMessage);
        }
    
        /**
         * 发布
         *
         * @param qos         连接方式
         * @param retained    是否保留
         * @param topic       主题
         * @param pushMessage 消息体
         */
    
        public void pushlish(int qos,boolean retained,String topic,String pushMessage){
            MqttMessage message=new MqttMessage();
            message.setQos(qos);
            message.setRetained(retained);
            message.setPayload(pushMessage.getBytes());
            MqttTopic mqttTopic=MqttPushClient.getClient().getTopic(topic);
            if(null== mqttTopic){
                log.error("topic not exist");
            }
            MqttDeliveryToken token;
            try {
                token=mqttTopic.publish(message);
                token.waitForCompletion();
            }catch (MqttPersistenceException e){
                e.printStackTrace();
            }catch (MqttException e){
                e.printStackTrace();
            }
    
        }
    
        /**
         * 订阅某个主题,qos默认为0
         * @param topic
         */
        public void subscribe(String topic){
            log.error("开始订阅主题" + topic);
            subscribe(topic,0);
        }
    
    
        public void subscribe(String topic,int qos){
            try {
                MqttPushClient.getClient().subscribe(topic,qos);
            }catch (MqttException e){
                e.printStackTrace();
            }
        }
    }

    3.消费监听类

    package com.st.modules.pump.mqtt;
    
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallback;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * MQTT callback that prints received messages and delivery confirmations to
     * standard output and asks the configuration for the push client again when
     * the connection is lost.
     */
    @Component
    public class PushCallback implements MqttCallback {

        @Autowired
        private MqttConfiguration mqttConfiguration;

        /**
         * Handles a lost connection by requesting the push client from the
         * configuration when the shared client is missing or disconnected.
         *
         * @param throwable the cause reported by the MQTT library
         */
        @Override
        public void connectionLost(Throwable throwable) {
            // Check the client that is actually in use; a private copy held here would never be set.
            MqttClient current = MqttPushClient.getClient();
            if (current == null || !current.isConnected()) {
                System.out.println("连接断开,正在重连....");
                // NOTE(review): if MqttConfiguration is a proxied @Configuration class, this
                // call may return the already-created bean without running connect/subscribe
                // again. Confirm that a reconnect really happens.
                mqttConfiguration.getMqttPushClient();
            }
        }

        /**
         * Prints the topic, QoS and payload of a received message.
         *
         * @param topic   topic the message arrived on
         * @param message the received message
         * @throws Exception declared by the callback interface; not thrown explicitly here
         */
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            System.out.println("接收消息主题 : " + topic);
            System.out.println("接收消息Qos : " + message.getQos());
            // NOTE(review): the payload is decoded with the platform default charset.
            System.out.println("接收消息内容 : " + new String(message.getPayload()));
        }

        /**
         * Prints whether delivery of a published message has completed.
         *
         * @param token delivery token of the published message
         */
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            System.out.println("deliveryComplete---------" + token.isComplete());
        }
    }

    ~~~~~~~~~~~~~Over~~~~~~~~~~~~~~

    本文来自博客园,作者:Forever丶随风,转载请注明原文链接:https://www.cnblogs.com/Forever-wind/p/14894597.html

  • 相关阅读:
    nagios安装配置
    Nagios:企业级系统监控方案
    使用Maven搭建Struts2+Spring3+Hibernate4的整合开发环境
    SecureCRT最佳配色方法+直接修改默认配置方法
    highcharts插件使用总结和开发中遇到的问题及解决办法
    关于Highcharts图表组件动态修改属性的方法(API)总结之Series
    Linux中环境变量文件及配置
    使用正则表达式匹配任意字符包括空格和换行符
    设置mysql远程连接root权限
    java读取文件夹下所有文件并替换文件每一行中指定的字符串
  • 原文地址:https://www.cnblogs.com/Forever-wind/p/14894597.html
Copyright © 2011-2022 走看看