zoukankan      html  css  js  c++  java
  • MQTT介绍与使用

      物联网是新一代信息技术的重要组成部分,也是“信息化”时代的重要发展阶段。其英文名称是:“Internet of things(IoT)”。顾名思义,物联网就是物物相连的互联网。这有两层意思:其一,物联网的核心和基础仍然是互联网,是在互联网基础上的延伸和扩展的网络;其二,其用户端延伸和扩展到了任何物品与物品之间,进行信息交换和通信,也就是物物相息。物联网通过智能感知、识别技术与普适计算等通信感知技术,广泛应用于网络的融合中,也因此被称为继计算机、互联网之后世界信息产业发展的第三次浪潮。

      而在物联网的应用上,对于信息传输,MQTT是一种再合适不过的协议工具了。

    一、MQTT简介

      MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的轻量级协议,该协议构建于TCP/IP协议之上,MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。

      MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。  

    二、特性

      MQTT协议工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:

      (1)使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。

      (2)对负载内容屏蔽的消息传输。

      (3)使用TCP/IP提供网络连接。

      主流的MQTT是基于TCP连接进行数据推送的,但是同样有基于UDP的版本,叫做MQTT-SN。这两种版本由于基于不同的连接方式,优缺点自然也就各有不同了。

      (4)有三种消息发布服务质量:

      “至多一次”,消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。这一种方式主要普通APP的推送,倘若你的智能设备在消息推送时未联网,推送过去没收到,再次联网也就收不到了。

      “至少一次”,确保消息到达,但消息重复可能会发生。

      “只有一次”,确保消息到达一次。在一些要求比较严格的计费系统中,可以使用此级别。在计费系统中,消息重复或丢失会导致不正确的结果。这种最高质量的消息发布服务还可以用于即时通讯类的APP的推送,确保用户收到且只会收到一次。 

      (5)小型传输,开销很小(固定长度的头部是2字节),协议交换最小化,以降低网络流量。

      这就是为什么在介绍里说它非常适合“在物联网领域,传感器与服务器的通信,信息的收集”,要知道嵌入式设备的运算能力和带宽都相对薄弱,使用这种协议来传递消息再适合不过了。

    三、实现方式  

      实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

      MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:

      (1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);

      (2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

    四、MQTT的搭建(ubuntu)

      1、apt-get安装mqtt相关包

      

      

      2、测试mosquitto是否正确运行

      

      3、本机终端测试mqtt

      打开一个终端,订阅主题

    mosquitto_sub -h 192.168.1.102 -t "mqtt" -v

      【-h】指定要连接的MQTT服务器 
      【-t】订阅主题,此处为mqtt 
      【-v】打印更多的调试信息

      再打开一个终端,发布主题

    mosquitto_pub -h 192.168.1.102 -t "mqtt" -m "Hello Stonegeek"

      【-h】指定要连接的MQTT服务器 
      【-t】向指定主题推送消息 
      【-m】指定消息内容

      结果展示

      

    五、MQTT权限配置

      前面我们基于Mosquitto服务器已经搭建成功了,但是默认是允许匿名用户登录,对于正式上线的项目则是需要进行用户认证(当然,用户一般都会与数据库映射,不过在这里我们就会直接将用户写入配置文件中)

      1、Mosquitto服务器的配置文件为/etc/mosquitto/mosquitto.conf,关于用户认证的方式和读取的配置都在这个文件中进行

      配置文件参数说明:

    ID allow_anonymous password_file  acl_file result
    1 True(默认)     允许匿名方式登录
    2 False password_file   开启用户验证机制
    3 False password_file acl_file 开启用户验证机制,但访问控制不起作用
    4 True password_file acl_file 用户名及密码不为空,将自动进行用户验证且受到访问控制的限制;用户名及密码为空,将不进行用户验证且受到访问控制的限制
    5 False     无法启动服务

      allow_anonymous允许匿名

      password-file密码文件

      acl_file访问控制列表

      2、修改配置文件

      命令:sudo vi /etc/mosquitto/mosquitto.conf

      

      3、添加用户信息

      

      命令解释: -c 创建一个用户、/etc/mosquitto/pwfile.example 是将用户创建到 pwfile.example  文件中、admin 是用户名。 

            同样连续会提示连续输入两次密码。注意第二次创建用户时不用加 -c 如果加 -c 会把第一次创建的用户覆盖。

            至此两个用户创建成功,此时如果查看 pwfile.example 文件会发现其中多了两个用户。

      4、添加Topic和用户的关系

      

      5、用户认证测试

      (1)重启Mosquitto步骤    

      查看mosquitto的进程

      命令:ps -aux|grep mosquitto

      

      (2)杀死进程

      命令:sudo kill -9 pid

      

      (3)启动

      命令:mosquitto -c /etc/mosquitto/mosquitto.conf 

      (4)订阅端启动(不加用户)

      

      订阅端启动(加用户)

      

      (5)发布端启动

      

    六、MQTT实现(Java语言)

      注意:由于我们在上面配置了MQTT的用户权限控制,所以下面的用户只能使用stonegeek登录,否则项目会运行报错,而且我们在上面设置的访问控制列表中只有mtopic主题,所以我们必须使用此主题,否则,订阅者会收不到已发布的主题内容(已经测试过了)

      

      下面是我们Java语言实现的MQTT服务的发布/订阅

      1、添加Maven依赖

        <dependency>
          <groupId>org.eclipse.paho</groupId>
          <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
          <version>1.1.1</version>
        </dependency>

      2、ServerMQTT.class

    package com.stonegeek;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
    import org.eclipse.paho.client.mqttv3.MqttTopic;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    /**
     * Created by StoneGeek on 2018/6/5.
     * 博客地址:http://www.cnblogs.com/sxkgeek
     * 服务器向多个客户端推送主题,即不同客户端可向服务端订阅相同的主题
     */
    public class ServerMQTT {
        //tcp://MQTT安装的服务器地址:MQTT定义的端口号
        public static final String HOST = "tcp://192.168.1.102:1883";
        //定义一个主题
        public static final String TOPIC = "mtopic";
        //定义MQTT的ID,可以在MQTT服务配置中指定
        private static final String clientid = "server11";
    
        private MqttClient client;
        private MqttTopic topic11;
        private String userName = "stonegeek";
        private String passWord = "123456";
    
        private MqttMessage message;
    
        /**
         * 构造函数
         * @throws MqttException
         */
        public ServerMQTT() throws MqttException {
            // MemoryPersistence设置clientid的保存形式,默认为以内存保存
            client = new MqttClient(HOST, clientid, new MemoryPersistence());
            connect();
        }
    
        /**
         *  用来连接服务器
         */
        private void connect() {
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(false);
            options.setUserName(userName);
            options.setPassword(passWord.toCharArray());
            // 设置超时时间
            options.setConnectionTimeout(10);
            // 设置会话心跳时间
            options.setKeepAliveInterval(20);
            try {
                client.setCallback(new PushCallback());
                client.connect(options);
    
                topic11 = client.getTopic(TOPIC);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        /**
         *
         * @param topic
         * @param message
         * @throws MqttPersistenceException
         * @throws MqttException
         */
        public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,
                MqttException {
            MqttDeliveryToken token = topic.publish(message);
            token.waitForCompletion();
            System.out.println("message is published completely! "
                    + token.isComplete());
        }
    
        /**
         *  启动入口
         * @param args
         * @throws MqttException
         */
        public static void main(String[] args) throws MqttException {
            ServerMQTT server = new ServerMQTT();
    
            server.message = new MqttMessage();
            server.message.setQos(1);
            server.message.setRetained(true);
            server.message.setPayload("hello,topic11".getBytes());
            server.publish(server.topic11 , server.message);
            System.out.println(server.message.isRetained() + "------ratained状态");
        }
    }

      3、ClientMQTT.class

    package com.stonegeek;
    import java.util.concurrent.ScheduledExecutorService;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttTopic;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    /**
     * Created by StoneGeek on 2018/6/5.
     * 博客地址:http://www.cnblogs.com/sxkgeek
     */
    public class ClientMQTT {
        public static final String HOST = "tcp://192.168.1.102:1883";
        public static final String TOPIC = "mtopic";
        private static final String clientid = "client11";
        private MqttClient client;
        private MqttConnectOptions options;
        private String userName = "stonegeek";
        private String passWord = "123456";
    
        private ScheduledExecutorService scheduler;
    
        private void start() {
            try {
                // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
                client = new MqttClient(HOST, clientid, new MemoryPersistence());
                // MQTT的连接设置
                options = new MqttConnectOptions();
                // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
                options.setCleanSession(true);
                // 设置连接的用户名
                options.setUserName(userName);
                // 设置连接的密码
                options.setPassword(passWord.toCharArray());
                // 设置超时时间 单位为秒
                options.setConnectionTimeout(10);
                // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
                options.setKeepAliveInterval(20);
                // 设置回调
                client.setCallback(new PushCallback());
                MqttTopic topic = client.getTopic(TOPIC);
                //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
                options.setWill(topic, "close".getBytes(), 2, true);
    
                client.connect(options);
                //订阅消息
                int[] Qos  = {1};
                String[] topic1 = {TOPIC};
                client.subscribe(topic1, Qos);
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) throws MqttException {
            ClientMQTT client = new ClientMQTT();
            client.start();
        }
    }

      4、PushCallback.class

    package com.stonegeek;

    import org.eclipse.paho.client.mqttv3.MqttCallback;
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttMessage;

    /**
    * Created by StoneGeek on 2018/6/5.
    * 博客地址:http://www.cnblogs.com/sxkgeek
    * 发布消息的回调类
    *
    * 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。
    * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。
    * 在回调中,将它用来标识已经启动了该回调的哪个实例。
    * 必须在回调类中实现三个方法:
    *
    * public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。
    *
    * public void connectionLost(Throwable cause)在断开连接时调用。
    *
    * public void deliveryComplete(MqttDeliveryToken token))
    * 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。
    * 由 MqttClient.connect 激活此回调。
    */
    public class PushCallback implements MqttCallback{
    public void connectionLost(Throwable cause) {
    // 连接丢失后,一般在这里面进行重连
    System.out.println("连接断开,可以做重连");
    }

    public void deliveryComplete(IMqttDeliveryToken token) {
    System.out.println("deliveryComplete---------" + token.isComplete());
    }

    public void messageArrived(String topic, MqttMessage message) throws Exception {
    // subscribe后得到的消息会执行到这里面
    System.out.println("接收消息主题 : " + topic);
    System.out.println("接收消息Qos : " + message.getQos());
    System.out.println("接收消息内容 : " + new String(message.getPayload()));
    }
    }

      5、结果展示

      

      

      以上我们MQTT的简单搭建与应用就结束了,之后我们会深入了解MQTT协议的!!!

      

      

      

      

      

  • 相关阅读:
    mysql 函数 存储过程 事件(event) job 模板
    protobuf 无proto 解码 decode 语言 java python
    mitmproxy fiddler 抓包 填坑
    android adb 常用命令
    android机器人 模拟 踩坑过程
    RabbitMQ添加新用户并支持远程访问
    Windows下RabbitMQ安装及配置
    Java mybatis mysql 常用数据类型对应关系
    easyExcel 踩坑
    linux防火墙查看状态firewall、iptable
  • 原文地址:https://www.cnblogs.com/sxkgeek/p/9140180.html
Copyright © 2011-2022 走看看