zoukankan      html  css  js  c++  java
  • mqtt安装和使用

    linux下 下载:

    wget https://www.emqx.io/downloads/broker/v3.2.1/emqx-centos7-v3.2.1.zip

    解压:unzip emqx-centos7-v3.2.1.zip

    启动:./emqx start

    配置文件修改:

    /usr/local/mqtt/emqx/etc/emqx.conf

    修改账号密码:

    设置密码认证:

    allow_anonymous = false

    导入插件:

    cd /usr/lib/emqx/bin
    sh emqx_ctl plugins load emqx_auth_username

    设置账号密码

    emqx_ctl users add admin public

    springboot集成:

    pom:

    <!--mqtt-->
    <dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.0</version>
    </dependency>
    <dependency>
    <groupId>org.fusesource.mqtt-client</groupId>
    <artifactId>mqtt-client</artifactId>
    <version>1.14</version>
    </dependency>

    yml配置:

    spring:
      mqtt:
    username: xxx
    password: xxx
    url: tcp://www.xxx.cn:1883
    client:
    id: xxx
    topic: xxx
    completionTimeout: 3000




    package com.cw.common.mqtt;
    /**
    * @Description:
    * @Auther: CW
    * @Date: 2021/8/5 16:40
    */
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttTopic;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.ApplicationArguments;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.stereotype.Component;
    /**
    * MQTT客户端订阅消息类
    * @author zhongyulin
    *
    */
    @Component
    public class MqttConsumer implements ApplicationRunner {

    private static Logger logger = LoggerFactory.getLogger(MqttConsumer.class);

    private static MqttClient client;

    private static MqttTopic mqttTopic;

    /**
    * MQTT连接属性配置对象
    */
    @Autowired
    public MqttCofigBean mqttCofigBean;

    /**
    * 初始化参数配置
    */
    @Override
    public void run(ApplicationArguments args) throws Exception {
    logger.info("初始化启动MQTT连接");
    this.connect();
    }


    /**
    * 用来连接服务器
    */
    private void connect() throws Exception {
    client = new MqttClient(mqttCofigBean.getHostUrl(), mqttCofigBean.getClientId(), new MemoryPersistence());
    MqttConnectOptions options = new MqttConnectOptions();
    options.setCleanSession(false);
    options.setUserName(mqttCofigBean.getUsername());
    options.setPassword(mqttCofigBean.getPassword().toCharArray());
    options.setCleanSession(false); //是否清除session
    // 设置超时时间
    options.setConnectionTimeout(30);
    // 设置会话心跳时间
    options.setKeepAliveInterval(20);
    try {
    String[] msgtopic = mqttCofigBean.getMsgTopic();
    //订阅消息
    int[] qos = new int[msgtopic.length];
    for (int i = 0; i < msgtopic.length; i++) {
    qos[i] = 0;
    }
    client.setCallback(new TopMsgCallback(client, options, msgtopic, qos));
    client.connect(options);
    client.subscribe(msgtopic, qos);
    logger.info("MQTT连接成功:" + mqttCofigBean.getClientId() + ":" + client);
    } catch (Exception e) {
    logger.error("MQTT连接异常:" + e);
    }
    }


    /**
    * 重连
    *
    * @throws Exception
    */
    public void reConnect() throws Exception {
    if (null != client) {
    this.connect();
    }
    }

    /**
    * 订阅某个主题
    *
    * @param topic
    * @param qos
    */
    public void subscribe(String topic, int qos) {
    try {
    logger.info("topic:" + topic);
    client.subscribe(topic, qos);
    } catch (MqttException e) {
    e.printStackTrace();
    }
    }

    public MqttClient getClient() {
    return client;
    }

    public void setClient(MqttClient client) {
    this.client = client;
    }

    public MqttTopic getMqttTopic() {
    return mqttTopic;
    }

    public void setMqttTopic(MqttTopic mqttTopic) {
    this.mqttTopic = mqttTopic;
    }
    }









    package com.cw.common.mqtt;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    @Component
    public class MqttCofigBean {

    @Value("${spring.mqtt.username}")
    private String username;

    @Value("${spring.mqtt.password}")
    private String password;

    @Value("${spring.mqtt.url}")
    private String hostUrl;

    @Value("${spring.mqtt.client.id}")
    private String clientId;

    @Value("${spring.mqtt.topic}")
    private String msgTopic;

    @Value("${spring.mqtt.completionTimeout}")
    private int completionTimeout; //连接超时
    /**
    * 获取用户名
    *
    * @return
    */
    public String getUsername() {
    return this.username;
    }

    /**
    * 获取密码
    *
    * @return
    */
    public String getPassword() {
    return this.password;
    }

    /**
    * 获取服务器连接地址
    *
    * @return
    */
    public String getHostUrl() {
    return this.hostUrl;
    }

    /**
    * 获取客户端ID
    *
    * @return
    */
    public String getClientId() {
    return this.clientId;
    }

    /**
    * 获取默认主题
    *
    * @return
    */
    public String[] getMsgTopic() {
    String[] topic = msgTopic.split(",");
    return topic;
    }

    /***
    * 获取连接超时时间
    * @return
    */
    public int getCompletionTimeout() {
    return this.completionTimeout;
    }
    }










    package com.cw.common.mqtt;

    /**
    * @Description:
    * @Auther: CW
    * @Date: 2021/8/5 16:41
    */

    import org.eclipse.paho.client.mqttv3.*;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    /**
    * MQTT消息处理类
    * @author zhongyulin
    * */
    public class TopMsgCallback implements MqttCallback {

    private static Logger logger = LoggerFactory.getLogger(TopMsgCallback.class);

    private MqttClient client;
    private MqttConnectOptions options;
    private String[] topic;
    private int[] qos;

    public TopMsgCallback() {
    }

    public TopMsgCallback(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) {
    this.client = client;
    this.options = options;
    this.topic = topic;
    this.qos = qos;
    }

    /**
    * 断开重连
    */
    public void connectionLost(Throwable cause) {
    logger.info("MQTT连接断开,发起重连");
    while (true) {
    try {
    Thread.sleep(30000);
    client.connect(options);
    //订阅消息
    client.subscribe(topic, qos);
    logger.info("MQTT重新连接成功:" + client);
    break;
    } catch (Exception e) {
    e.printStackTrace();
    continue;
    }
    }

    }

    /**
    * 接收到消息调用令牌中调用
    */
    public void deliveryComplete(IMqttDeliveryToken token) {
    }

    /**
    * 消息处理
    */
    public void messageArrived(String topic, MqttMessage message) throws Exception {
    System.out.println();
    //订阅消息字符
    String msg = new String(message.getPayload());
    // byte[] bymsg = getBytesFromObject(msg);
    logger.info("topic:" + topic);
    logger.info("msg:" + msg);
    }

    //对象转化为字节码
    // public byte[] getBytesFromObject(Serializable obj) throws Exception {
    // if (obj == null) {
    // return null;
    // }
    // ByteArrayOutputStream bo = new ByteArrayOutputStream();
    // ObjectOutputStream oo = new ObjectOutputStream(bo);
    // oo.writeObject(obj);
    // return bo.toByteArray();
    // }
    }


  • 相关阅读:
    <转载> diff 和 patch 命令
    JZ2440开发板之LCD
    发音篇--第一章
    JZ2440开发板之系统始终和串口
    【还是用回csdn了,这个blog就不更新了】
    MyBatis 汉字作为查询条件查询不到 MySQL 中的结果
    LeetCode 617. 合并二叉树 Merge Two Binary Tree
    LeetCode 454. 四数相加 II 4Sum II
    LeetCode 441. 排列硬币 Arranging Coins
    leetcode ——从排序数组中删除重复项 II
  • 原文地址:https://www.cnblogs.com/cw828/p/15104271.html
Copyright © 2011-2022 走看看