zoukankan      html  css  js  c++  java
  • MQTT研究之EMQ:【EMQ之HTTP认证/访问控制】

    今天进行验证的逻辑是EMQ的http的Auth以及ACL的逻辑。

    首先,参照HTTP插件认证配置的说明文档进行基本的配置, 我的配置内容如下:

    ##--------------------------------------------------------------------
    ## HTTP Auth/ACL Plugin
    ##--------------------------------------------------------------------
    
    ##--------------------------------------------------------------------
    ## Authentication request.
    ##
    ## Variables:
    ##  - %u: username
    ##  - %c: clientid
    ##  - %a: ipaddress
    ##  - %P: password
    ##
    ## Value: URL
    auth.http.auth_req = http://10.95.177.137:8899/scc/mqtt/auth
    ## Value: post | get | put
    auth.http.auth_req.method = post
    ## Value: Params
    auth.http.auth_req.params = clientid=%c,username=%u,password=%P
    
    ##--------------------------------------------------------------------
    ## Superuser request.
    ##
    ## Variables:
    ##  - %u: username
    ##  - %c: clientid
    ##  - %a: ipaddress
    ##
    ## Value: URL
    auth.http.super_req = http://10.95.177.137:8899/scc/mqtt/superuser
    ## Value: post | get | put
    auth.http.super_req.method = post
    ## Value: Params
    auth.http.super_req.params = clientid=%c,username=%u
    
    ##--------------------------------------------------------------------
    ## ACL request.
    ##
    ## Variables:
    ##  - %A: 1 | 2, 1 = sub, 2 = pub
    ##  - %u: username
    ##  - %c: clientid
    ##  - %a: ipaddress
    ##  - %t: topic
    ##
    ## Value: URL
    auth.http.acl_req = http://10.95.177.137:8899/scc/mqtt/acl
    ## Value: post | get | put
    auth.http.acl_req.method = get
    ## Value: Params
    auth.http.acl_req.params = access=%A,username=%u,clientid=%c,ipaddr=%a,topic=%t

    这里,非常需要值得注意的是,这个http(包括其他的,例如mysql)的auth以及acl控制,都是基于插件的逻辑实现的,即依赖其他服务进行实现,基于这个服务系统的返回值,EMQ决定auth以及acl的控制。这个理解清楚了,所有的插件相关的auth和acl都好理解了。

    我这里,将auth和acl的服务实现在一个springboot的web项目scc下了,为了验证逻辑,我将真实的auth或者acl控制逻辑都简化了,主要验证流程。

    a) 启动auth、acl的服务scc

    package com.taikang.iot.scc.loadbalance.user.controller;
    
    import org.apache.log4j.Logger;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestMapping;
    
    import javax.servlet.http.HttpServletResponse;
    
    /**
     * @Author: chengsh05
     * @Date: 2019/4/9 19:40
     */
    @Controller
    @RequestMapping("/mqtt")
    public class EmqAuthHttpController {
    
        private Logger logger = Logger.getLogger(EmqAuthHttpController.class);
    
        @RequestMapping("/auth")
        public void mqttAuth(String clientid, String username, String password, HttpServletResponse response) {
            //auth.http.auth_req.params = clientid=%c,username=%u,password=%P
            logger.info("普通用户;clientid:" + clientid + ";username:" + username + ";password:" + password);
            /**
             * TODO 添加认证的逻辑,控制http的返回码, 这里的用户是否存在,通常是基于数据库做的。
             * HTTP 认证/鉴权 API
             * 认证/ACL 成功,API 返回200
             * 认证/ACL 失败,API 返回4xx
             */
            response.setStatus(401);
        }
    
        @RequestMapping("/superuser")
        public void mqttSuperuser(String clientid, String username, HttpServletResponse response) {
            //auth.http.super_req.params = clientid=%c,username=%u
            logger.info("超级用户;clientid:" + clientid + ";username:" + username);
            response.setStatus(401);
        }
    
        @RequestMapping("/acl")
        public void mqttAcl(String access, String username, String clientid, String ipaddr, String topic, HttpServletResponse response) {
            //auth.http.acl_req.params = access=%A,username=%u,clientid=%c,ipaddr=%a,topic=%t
            logger.info("access: " + access + ";username: " + username + ";clientid: " + clientid + "; ipaddr: " + ipaddr + ";topic: " + topic);
            response.setStatus(401);
        }
    }

    b) 首先启动emq服务端

    当然要emqttd_ctl plugins load emq_auth_http这个插件(服务节点 10.95.200.12).

    [tkiot@tkwh-kfcs-app2 plugins]$ emqttd_ctl plugins list
    Plugin(emq_auth_clientid, version=2.3.11, description=Authentication with ClientId/Password, active=false)
    Plugin(emq_auth_http, version=2.3.11, description=Authentication/ACL with HTTP API, active=true)
    Plugin(emq_auth_jwt, version=2.3.11, description=Authentication with JWT, active=false)
    Plugin(emq_auth_ldap, version=2.3.11, description=Authentication/ACL with LDAP, active=false)
    Plugin(emq_auth_mongo, version=2.3.11, description=Authentication/ACL with MongoDB, active=false)
    Plugin(emq_auth_mysql, version=2.3.11, description=Authentication/ACL with MySQL, active=false)
    Plugin(emq_auth_pgsql, version=2.3.11, description=Authentication/ACL with PostgreSQL, active=false)
    Plugin(emq_auth_redis, version=2.3.11, description=Authentication/ACL with Redis, active=false)
    Plugin(emq_auth_username, version=2.3.11, description=Authentication with Username/Password, active=false)
    Plugin(emq_coap, version=2.3.11, description=CoAP Gateway, active=false)
    Plugin(emq_dashboard, version=2.3.11, description=EMQ Web Dashboard, active=true)
    Plugin(emq_lua_hook, version=2.3.11, description=EMQ Hooks in lua, active=false)
    Plugin(emq_modules, version=2.3.11, description=EMQ Modules, active=true)
    Plugin(emq_plugin_template, version=2.3.11, description=EMQ Plugin Template, active=false)
    Plugin(emq_recon, version=2.3.11, description=Recon Plugin, active=true)
    Plugin(emq_reloader, version=2.3.11, description=Reloader Plugin, active=false)
    Plugin(emq_retainer, version=2.3.11, description=EMQ Retainer, active=true)
    Plugin(emq_sn, version=2.3.11, description=MQTT-SN Gateway, active=false)
    Plugin(emq_stomp, version=2.3.11, description=Stomp Protocol Plugin, active=false)
    Plugin(emq_web_hook, version=2.3.11, description=EMQ Webhook Plugin, active=false)

    c) 然后启动一个基于mqtt的客户端

    我这里是用基于paho的一个消费者(subscriber)。

    package com.taikang.iot.rulee.security;
    
    import com.taikang.iot.rulee.paho.PushCallback;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttTopic;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    import javax.net.ssl.SSLSocketFactory;
    import java.util.concurrent.ScheduledExecutorService;
    
    public class MQTTSSLConsumer {
    //    public static final String HOST = "tcp://127.0.0.1:61613";
    //    public static final String TOPIC1 = "pos_message_all";
    //    private static final String clientid = "client11";
    //    public static final String HOST = "tcp://10.95.197.1:1883";
        public static final String HOST = "ssl://10.95.200.12:8883";
        public static final String TOPIC1 = "taikang/rulee";
        private static final String clientid = "client11";
        private MqttClient client;
        private MqttConnectOptions options;
        private String userName = "water";    //非必须
        private String passWord = "water";  //非必须
        @SuppressWarnings("unused")
        private ScheduledExecutorService scheduler;
        private String sslPemPath = "E:\2018\IOT\MQTT\javassl\java\";
    
        private void start() {
            try {
                // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
                client = new MqttClient(HOST, clientid, new MemoryPersistence());
                // MQTT的连接设置
                options = new MqttConnectOptions();
                //-----------SSL begin--------------
                SSLSocketFactory factory = OpensslHelper.getSSLSocktet(sslPemPath + "sccCA0.crt",sslPemPath +"sccDevSMP.crt",sslPemPath + "sccDevSMP.key","shihuc");
                options.setSocketFactory(factory);
                //-----------end of SSL ------------
                // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
                options.setCleanSession(false);
                // 设置连接的用户名
                options.setUserName(userName);
                // 设置连接的密码
                options.setPassword(passWord.toCharArray());
                // 设置超时时间 单位为秒
                options.setConnectionTimeout(10);
                // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
                options.setKeepAliveInterval(20);
                // 设置重连机制
                options.setAutomaticReconnect(true);
                // 设置回调
                client.setCallback(new PushCallback());
                MqttTopic topic = client.getTopic(TOPIC1);
                //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
                //options.setWill(topic, "close".getBytes(), 2, true);//遗嘱
                client.connect(options);
                //订阅消息
                int[] Qos  = {1};
                String[] topic1 = {TOPIC1};
                client.subscribe(topic1, Qos);
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) throws MqttException {
            System.setProperty("javax.net.debug", "ssl,handshake");
            MQTTSSLConsumer client = new MQTTSSLConsumer();
            client.start();
        }
    }

    其实,这里用什么方式不是很重要,可以是paho的客户端,也可以是mqtt.fx工具(参照我之前的博文MQTT研究之EMQ:【SSL双向验证】

     d) 结果分析

    按照上述的代码进行测试,会发现,c)步骤的代码会遇到错误,表明客户端订阅接入的时候鉴权不通过。

    。。。。。。
    verify_data:  { 58, 47, 6, 13, 206, 237, 24, 135, 49, 56, 87, 57 }
    ***
    MQTT Con: client11, WRITE: TLSv1 Change Cipher Spec, length = 1
    *** Finished
    verify_data:  { 177, 80, 48, 65, 115, 27, 64, 57, 75, 119, 104, 26 }
    ***
    MQTT Con: client11, WRITE: TLSv1 Handshake, length = 48
    MQTT Con: client11, setSoTimeout(0) called
    MQTT Snd: client11, WRITE: TLSv1 Application Data, length = 64
    MQTT Rec: client11, READ: TLSv1 Application Data, length = 32
    MQTT Rec: client11, READ: TLSv1 Application Data, length = 32
    错误的用户名或密码 (4)
        at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:28)
        at org.eclipse.paho.client.mqttv3.internal.ClientState.notifyReceivedAck(ClientState.java:988)
        at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:145)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

    将a)中的response.setStatus(401);代码调整为response.setStatus(200);再次运行c)的客户端代码,认证通过,代码执行到acl权限控制。

    scc服务的输出日志(表明emq执行了auth接口,200的返回值表明成功,然后校验是否超级用户,最后acl校验,因为acl的返回值是4xx,emq认为acl失败):

    2019-04-09 20:45:05.479  INFO 5644 --- [nio-8899-exec-1] c.t.i.s.l.u.c.EmqAuthHttpController      : 普通用户;clientid:client11;username:water;password:water
    2019-04-09 20:45:05.510  INFO 5644 --- [nio-8899-exec-2] c.t.i.s.l.u.c.EmqAuthHttpController      : 超级用户;clientid:client11;username:water
    2019-04-09 20:45:10.362  INFO 5644 --- [nio-8899-exec-3] c.t.i.s.l.u.c.EmqAuthHttpController      : access: 1;username: water;clientid: client11; ipaddr: 10.95.177.137;topic: taikang/rulee

    subscriber的客户端日志(acl鉴权时,emq调用scc的服务,得到401,认为订阅者没有操控权限,所以报错):

    MQTT Snd: client11, WRITE: TLSv1 Application Data, length = 32
    MQTT Snd: client11, WRITE: TLSv1 Application Data, length = 48
    MqttException (128)
        at org.eclipse.paho.client.mqttv3.MqttClient.subscribe(MqttClient.java:438)
        at com.taikang.iot.rulee.security.MQTTSSLConsumer.start(MQTTSSLConsumer.java:60)
        at com.taikang.iot.rulee.security.MQTTSSLConsumer.main(MQTTSSLConsumer.java:70)
    MQTT Rec: client11, READ: TLSv1 Application Data, length = 32

    补充验证:

    1. 超级用户返回为200,看看acl的逻辑

    package com.taikang.iot.scc.loadbalance.user.controller;
    
    import org.apache.log4j.Logger;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestMapping;
    
    import javax.servlet.http.HttpServletResponse;
    
    /**
     * @Author: chengsh05
     * @Date: 2019/4/9 19:40
     */
    @Controller
    @RequestMapping("/mqtt")
    public class EmqAuthHttpController {
    
        private Logger logger = Logger.getLogger(EmqAuthHttpController.class);
    
        @RequestMapping("/auth")
        public void mqttAuth(String clientid, String username, String password, HttpServletResponse response) {
            //auth.http.auth_req.params = clientid=%c,username=%u,password=%P
            logger.info("普通用户;clientid:" + clientid + ";username:" + username + ";password:" + password);
            /**
             * TODO 添加认证的逻辑,控制http的返回码, 这里的用户是否存在,通常是基于数据库做的。
             * HTTP 认证/鉴权 API
             * 认证/ACL 成功,API 返回200
             * 认证/ACL 失败,API 返回4xx
             */
            response.setStatus(200);
        }
    
        @RequestMapping("/superuser")
        public void mqttSuperuser(String clientid, String username, HttpServletResponse response) {
            //auth.http.super_req.params = clientid=%c,username=%u
            logger.info("超级用户;clientid:" + clientid + ";username:" + username);
            response.setStatus(200);
        }
    
        @RequestMapping("/acl")
        public void mqttAcl(String access, String username, String clientid, String ipaddr, String topic, HttpServletResponse response) {
            //auth.http.acl_req.params = access=%A,username=%u,clientid=%c,ipaddr=%a,topic=%t
            logger.info("access: " + access + ";username: " + username + ";clientid: " + clientid + "; ipaddr: " + ipaddr + ";topic: " + topic);
            response.setStatus(401);
        }
    }

    然后,再次启动subscriber程序,看看scc服务的日志输出。得到下面的结果:

    2019-04-09 20:50:52.816  INFO 8412 --- [nio-8899-exec-1] c.t.i.s.l.u.c.EmqAuthHttpController      : 普通用户;clientid:client11;username:water;password:water
    2019-04-09 20:50:52.832  INFO 8412 --- [nio-8899-exec-1] c.t.i.s.l.u.c.EmqAuthHttpController      : 超级用户;clientid:client11;username:water

    发现什么没有呢?和上面的验证(auth接口返回200,superuser接口返回401,acl返回401)对比,很显然的发现,当superuser接口返回200后,acl接口不再调用,无条件认为acl是通过的,即为有权限。 普通用户通过auth后,需要校验acl

    2. 在emq_auth_http已加载的基础上再加载emq_auth_mysql

    [tkiot@tkwh-kfcs-app2 log]$ emqttd_ctl plugins list
    Plugin(emq_auth_clientid, version=2.3.11, description=Authentication with ClientId/Password, active=false)
    Plugin(emq_auth_http, version=2.3.11, description=Authentication/ACL with HTTP API, active=true)
    Plugin(emq_auth_jwt, version=2.3.11, description=Authentication with JWT, active=false)
    Plugin(emq_auth_ldap, version=2.3.11, description=Authentication/ACL with LDAP, active=false)
    Plugin(emq_auth_mongo, version=2.3.11, description=Authentication/ACL with MongoDB, active=false)
    Plugin(emq_auth_mysql, version=2.3.11, description=Authentication/ACL with MySQL, active=true)
    Plugin(emq_auth_pgsql, version=2.3.11, description=Authentication/ACL with PostgreSQL, active=false)
    Plugin(emq_auth_redis, version=2.3.11, description=Authentication/ACL with Redis, active=false)
    Plugin(emq_auth_username, version=2.3.11, description=Authentication with Username/Password, active=false)
    Plugin(emq_coap, version=2.3.11, description=CoAP Gateway, active=false)
    Plugin(emq_dashboard, version=2.3.11, description=EMQ Web Dashboard, active=true)
    Plugin(emq_lua_hook, version=2.3.11, description=EMQ Hooks in lua, active=false)
    Plugin(emq_modules, version=2.3.11, description=EMQ Modules, active=true)
    Plugin(emq_plugin_template, version=2.3.11, description=EMQ Plugin Template, active=false)
    Plugin(emq_recon, version=2.3.11, description=Recon Plugin, active=true)
    Plugin(emq_reloader, version=2.3.11, description=Reloader Plugin, active=false)
    Plugin(emq_retainer, version=2.3.11, description=EMQ Retainer, active=true)
    Plugin(emq_sn, version=2.3.11, description=MQTT-SN Gateway, active=false)
    Plugin(emq_stomp, version=2.3.11, description=Stomp Protocol Plugin, active=false)
    Plugin(emq_web_hook, version=2.3.11, description=EMQ Webhook Plugin, active=false)

    @@@###》》》1) subscriber程序中的username和password配置成mysql数据库中已经存在的,会发现,http插件认证和acl服务将不会被调用。

    @@@###》》》 2)若将subscriber程序中的username和password配置成mysql数据库中不存在的,会发现,http插件认证和acl服务将会被调用。

    能否说明,若MySQL/Redis等基础服务认证和acl控制器和Http认证/ACL控制器同时配置的时候,EMQ优先查询MySQL/Redis服务???

    实验了MySQL,是这个现象,不知其他是否如我的猜测,没有在EMQ的官方文档看到这个说明。

  • 相关阅读:
    L9,a cold welcome
    别说你不知道java中的包装类,wrapper type,以及容易在自动拆箱中出现的问题
    java导出和读取excel数据
    简单实用句型更新
    PAT1027
    生成英语单词
    c# 操作Word总结【转】
    压缩分卷
    VS2010中,无法嵌入互操作类型“……”,请改用适用的接口的解决方法
    HOW TO:使用 Visual C# .NET 打印 RichTextBox 控件的内容
  • 原文地址:https://www.cnblogs.com/shihuc/p/10679800.html
Copyright © 2011-2022 走看看