zoukankan      html  css  js  c++  java
  • 转:EMQ(emqttd) 2.x 安装和使用(物联网传输控制协议的Broker)

    支持下国产开源。

    MQTT物联网传输控制协议:《MQTT-3.1.1-CN.pdf

    下载:emqttd-centos64-v2.0-rc.2-20161019.zip

    安装:

    $ unzip emqttd-centos64-v2.0-rc.2-20161019.zip -d /data/

    $ mv /data/emqttd /data/emqttd-centos64-v2.0-rc.2-20161019

    $ ln -s /data/emqttd-centos64-v2.0-rc.2-20161019 /data/emqttd

    系统优化配置:

    # ulimit -n 1048576

    $ sudo sysctl -w fs.file-max=2097152

    $ sudo sysctl -w fs.nr_open=2097152

    $ sudo sysctl -w net.core.somaxconn=65535

    $ sudo sysctl -p

    修改配置文件(node.cookie必须每台都要一样,node.name的@后面必须是ip地址或者fqdn方式的主机名):

    $ cd /data/emqttd

    $ vi etc/emq.conf

    ## Node name

    node.name = emqttd@192.168.60.58

    ## Cookie for distributed node

    node.cookie = emq_dist_cookie_533d99ckd9ji475

    ## Erlang Process Limit

    node.process_limit = 2000000

    ## Sets the maximum number of simultaneously existing ports for this system

    node.max_ports = 1000000

    ## Size of acceptor pool

    mqtt.listener.tcp.acceptors = 64

    ## Maximum number of concurrent clients

    mqtt.listener.tcp.max_clients = 1000000

    ## Rate Limit. Format is 'burst,rate', Unit is KB/Sec

    ## mqtt.listener.tcp.rate_limit = 100,10

    ## TCP Socket Options

    mqtt.listener.tcp.backlog = 262144

    ## Distributed node port range

    node.dist_listen_min = 6000

    node.dist_listen_max = 6999

    ## 如果需要启用防火墙,则上面两行去掉注释,注意下面的防火墙端口设置,要打开该段端口。

    ## Expired after 1 day:

    ## w - week

    ## d - day

    ## h - hour

    ## m - minute

    ## s - second

    mqtt.session.expired_after = 2w

    # 上面为持久会话到期时间,从客户端断开算起,超时后客户端没有收到的消息会丢弃(不想丢失消息的话,该值就要设置的很大)。

    ## Console log. Enum: off, file, console, both

    log.console = both

    ## Console log level. Enum: debug, info, notice, warning, error, critical, alert, emergency

    log.console.level = info

    ## Console log file

    log.console.file = log/console.log

    ## Error log file

    log.error.file = log/error.log

    ## Enable the crash log. Enum: on, off

    log.crash = on

    log.crash.file = log/crash.log

    修改下启动脚本(在stop处增加一行):

    $ vi bin/emqttd

    ...

        stop)

            # Wait for the node to completely stop...

            PID="$(relx_get_pid)"

            if ! relx_nodetool "stop"; then

                exit 1

            fi

            while $(kill -s 0 "$PID" 2>/dev/null);

            do

                sleep 1

            done

            killall epmd

            ;;

    启动:

    $ cd emqttd

    直接进入控制台模式:

    $ ./bin/emqttd console

    后台运行模式:

    $ ./bin/emqttd start

    $ ./bin/emqttd_ctl status

    $ ./bin/emqttd stop

    开启防火墙:

    端口:1883:MQTT协议tcp端口,8883:MQTT(SSL) tcp端口,8083:MTQQ(websocket)、HTTP API端口,18083:dashboard管理控制WEB端口,4369:集群处理epmd端口,6000-6999由上面配置文件定义的epmd需要的端口范围。

    sudo firewall-cmd                      --zone=public --add-port=1883/tcp

    sudo firewall-cmd --permanent --zone=public --add-port=1883/tcp

    sudo firewall-cmd                      --zone=public --add-port=8883/tcp

    sudo firewall-cmd --permanent --zone=public --add-port=8883/tcp

    sudo firewall-cmd                      --zone=public --add-port=8083/tcp

    sudo firewall-cmd --permanent --zone=public --add-port=8083/tcp

    sudo firewall-cmd                      --zone=public --add-port=18083/tcp

    sudo firewall-cmd --permanent --zone=public --add-port=18083/tcp

    sudo firewall-cmd                      --zone=public --add-port=4369/tcp

    sudo firewall-cmd --permanent --zone=public --add-port=4369/tcp

    sudo firewall-cmd                      --zone=public --add-port=6000-6999/tcp

    sudo firewall-cmd --permanent --zone=public --add-port=6000-6999/tcp

    sudo firewall-cmd                      --zone=public --list-all

    sudo firewall-cmd --permanent --zone=public --list-all

    查看各台的启动状态:

    http://192.168.60.55:8083/status

    http://192.168.60.55:18083/      用户名/密码: admin / public

    $ vi data/configs/vm.xxx.args

    $ vi data/configs/app.xxx.conf

    LOG位置:

    $ vi log/

    把节点加入集群:

    在各个节点上执行(重复执行也没关系,其中192.168.60.55这台会提示错误:cannot_join_with_self,这个没关系,自己不用加入自己):

    $ ./bin/emqttd_ctl cluster join emqttd@192.168.60.55

    $ ./bin/emqttd_ctl cluster status

    Cluster status: [{running_nodes,['emqttd@192.168.60.58',

                                     'emqttd@192.168.60.56',

                                     'emqttd@192.168.60.57',

                                     'emqttd@192.168.60.55']}]

    把节点退出集群:

    本机退出集群:

    $ ./bin/emqttd_ctl cluster leave

    把某节点退出集群:

    $ ./bin/emqttd_ctl cluster remove emqttd@192.168.60.56

    测试一下:

    下载安装MTQQ协议的客户端:https://mosquitto.org/download/

    $ sudo rpm -ivh libmosquitto1-1.4.10-1.1.x86_64.rpm

    $ sudo rpm -ivh mosquitto-clients-1.4.10-1.1.x86_64.rpm

    订阅:

    $ mosquitto_sub -h 192.168.60.56 -p 1883 -t test_topic_1 -q 1 -c -i 1111

    发布:另开一个ssh或者另外机器上执行:

    $ mosquitto_pub -h 192.168.60.57 -p 1883 -t test_topic_1 -q 1 -m "hello mqtt 7"

    参数:

    -h: 连接到哪台broker。

    -p: 连接端口。

    -t: topic名字,topic类似文件系统的组织方式,以"/"分隔符来分层,订阅者订阅时可以使用通配符"+"和"#",发布者不能使用通配符。

    -q: qos级别,默认为0,共三个值:0:至多一次,不保证到达订阅者;1:至少一次,保证到达订阅者,但不保证不重复;2:正好一次,保证到达,又保证不重复;非钱类的应用使用qos1就可以了。

    -c: 如果订阅者退出,在broker保留所有订阅的消息,一旦重新连接上,则把所有消息发给订阅者,就是持久性订阅(clean_sess=false)。

    -i: 设置client id,在即时通信时可以设置为用户id或者用户名等具有唯一性的字段。

    -r: 发布方使用的参数,保留消息(每个topic只保留最后一个这种消息,之前的会覆盖 ),即使该消息被一个订阅者读取了,还会一直保留在broker,如果有新的订阅者订阅该topic,则马上会收到该消息,类似qq里面的公告性消息,这类信息的删除,发送一个payload为空的null消息即可:$ mosquitto_pub -h 192.168.60.56 -p 1883 -t test_topic_1 -q 1 -r -n  -u user -P 123456。

    -m: 该topic的一条消息内容。

    加入MariaDB的认证:

    修改配置:

    $ vi ./etc/emq.conf

    ## Allow Anonymous authentication

    mqtt.allow_anonymous = false

    ## Default ACL File

    mqtt.acl_file = etc/acl.conf

    $ vi ./data/loaded_plugins

    emq_dashboard.

    emq_auth_mysql.

    修改plugins配置:

    $ vi etc/plugins/emq_auth_mysql.conf

    ## Mysql Server

    auth.mysql.server = 192.168.60.60:3306

    ## Mysql Pool Size

    auth.mysql.pool = 8

    ## Mysql Username

    auth.mysql.username = root

    ## Mysql Password

    auth.mysql.password = 123456

    ## Mysql Database

    auth.mysql.database = mqtt

    ## Variables: %u = username, %c = clientid

    ## Authentication Query: select password only

    auth.mysql.auth_query = select password from mqtt_user where username = '%u' limit 1

    ## Password hash: plain, md5, sha, sha256, pbkdf2

    auth.mysql.password_hash = plain

    ## %% Superuser Query

    auth.mysql.super_query = select is_superuser from mqtt_user where username = '%u' limit 1

    ## ACL Query Command

    auth.mysql.acl_query = select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'

    ## ACL nomatch

    auth.mysql.acl_nomatch = deny

    往MariaDB插入初始化数据库和表(引擎需要改成InnDB):

    认证的用户表也可以共用其他系统的用户表,由上面的emq_auth_mysql.conf来配置:auth.mysql.auth_query、auth.mysql.super_query、auth.mysql.acl_query:

    建立mqtt数据库:

    $ mysql -uroot -p123456 -hxxxx

    MariaDB [test]> create database mqtt;

    用户表:

    MariaDB [test]> CREATE TABLE `mqtt_user` (

      `id` int(11) unsigned NOT NULL AUTO_INCREMENT,

      `username` varchar(100) DEFAULT NULL,

      `password` varchar(100) DEFAULT NULL,

      `salt` varchar(20) DEFAULT NULL,

      `is_superuser` tinyint(1) DEFAULT 0,

      `created` datetime DEFAULT NULL,

      PRIMARY KEY (`id`),

      UNIQUE KEY `mqtt_username` (`username`)

    ) DEFAULT CHARSET=utf8;

    用户表插入测试数据:

    MariaDB [mqtt]> INSERT INTO mqtt_user (id, username, password, salt, is_superuser, created)

    VALUES

        (1,'superuser','123456','123456',True,'2016-10-26 10:00:00'),

        (2,'user','123456','123456',False,'2016-10-26 10:01:00');

    acl表:

    MariaDB [mqtt]> CREATE TABLE `mqtt_acl` (

      `id` int(11) unsigned NOT NULL AUTO_INCREMENT,

      `allow` int(1) DEFAULT NULL COMMENT '0: deny, 1: allow',

      `ipaddr` varchar(60) DEFAULT NULL COMMENT 'IpAddress',

      `username` varchar(100) DEFAULT NULL COMMENT 'Username',

      `clientid` varchar(100) DEFAULT NULL COMMENT 'ClientId',

      `access` int(2) NOT NULL COMMENT '1: subscribe, 2: publish, 3: pubsub',

      `topic` varchar(100) NOT NULL DEFAULT '' COMMENT 'Topic Filter',

      PRIMARY KEY (`id`)

    ) DEFAULT CHARSET=utf8;

    acl表插入默认数据(注意带$SYS为broker保留的特殊topic用来统计使用,username为$all表示所有在user表中的用户):

    MariaDB [mqtt]> INSERT INTO mqtt_acl (id, allow, ipaddr, username, clientid, access, topic)

    VALUES

        (1,1,NULL,'$all',NULL,2,'#'),

        (2,1,NULL,'$all',NULL,1,'#'),

        (3,1,NULL,'$all',NULL,3,'#'),

        (4,1,'127.0.0.1',NULL,NULL,2,'$SYS/#'),

        (5,1,'127.0.0.1',NULL,NULL,2,'#'),

        (6,1,NULL,'dashboard',NULL,1,'$SYS/#');

    测试,重启每台服务器后执行:

    $ mosquitto_sub -h 192.168.60.55 -p 1883 -t test_topic_1 -q 1 -c -i 1112  -u user -P 123456

    $ mosquitto_pub -h 192.168.60.55 -p 1883 -t test_topic_1 -q 1 -m "hello mqtt 1" -u user -P 123456

    重新初始化emq:

    $ bin/emqttd stop

    $ rm -rf data/mnesia/*

    $ rm -rf data/configs/*

    $ rm -rf log/*

    $ bin/emqttd start

    $ bin/emqttd_ctl cluster join emqttd@192.168.60.55

    使用Haproxy来实现负载分担:

    因为emq的topic和消息在集群的各台服务器上一致,所以数据不能以增加机器的方式扩容,只能增加每台的内存,和客户端的连接则可以以增加机器方式扩容。

    $ vi /etc/haproxy/haproxy.cfg

    ################## EMQ ######################

    listen emq_emqttd *:1883

        mode tcp

        balance source

        log global

        timeout connect         25s

        timeout client          0

        timeout server          0

        option tcpka

        option tcplog

        option tcp-check

        server emq1 192.168.60.55:1883 check inter 25000 port 1883 rise 2 fall 2 weight 25

        server emq2 192.168.60.56:1883 check inter 25000 port 1883 rise 2 fall 2 weight 25

        server emq3 192.168.60.57:1883 check inter 25000 port 1883 rise 2 fall 2 weight 25

        server emq4 192.168.60.58:1883 check inter 25000 port 1883 rise 2 fall 2 weight 25

    开启haproxy上的防火墙:

    sudo firewall-cmd                      --zone=public --add-port=1883/tcp

    sudo firewall-cmd --permanent --zone=public --add-port=1883/tcp

    sudo firewall-cmd                      --zone=public --list-all

    sudo firewall-cmd --permanent --zone=public --list-all

    使用:

    broker操作:

    显示broker状态:

    $ ./bin/emqttd_ctl status

    显示broker版本、启用时间等:

    $ ./bin/emqttd_ctl broker

    显示broker的pubsub进程状态、内存、队列长度、规约数等:

    $ ./bin/emqttd_ctl broker  pubsub

    显示broker的统计信息:客户端连接数、会话数、主题数、订阅数、路由数等:

    $ ./bin/emqttd_ctl broker stats

    显示broker的测量信息:底层流量、MQTT报文数、消息数等:

    $ ./bin/emqttd_ctl broker metrics

    Cluster操作:

    $ ./bin/emqttd_ctl cluster

    cluster join <Node>                             # Join the cluster

    cluster leave                                   # Leave the cluster

    cluster remove <Node>                           # Remove the node from cluster

    cluster status                                  # Cluster status

    Client操作:

    $ ./bin/emqttd_ctl clients

    clients list                                    # List all clients

    clients show <ClientId>                         # Show a client

    clients kick <ClientId>                         # Kick out a client

    Sessions操作:

    $ ./bin/emqttd_ctl sessions

    sessions list                                   # List all sessions

    sessions list persistent                        # List all persistent sessions ,桥接Bridge的时候才会用到(即clean_sess=false的类型)

    sessions list transient                         # List all transient sessions

    sessions show <ClientId>                        # Show a session

    Routes操作:

    $ ./bin/emqttd_ctl routes

    routes list                                     # List all routes

    routes show <Topic>                             # Show a route

    Topics操作:

    $ ./bin/emqttd_ctl topics

    topics list                                     # List all topics

    topics show <Topic>                             # Show a topic

    Subscription订阅者操作:

    $ ./bin/emqttd_ctl subscriptions

    subscriptions list                              # List all subscriptions

    subscriptions show <ClientId>                   # Show subscriptions of a client

    subscriptions add <ClientId> <Topic> <QoS>      # Add a static subscription manually

    subscriptions del <ClientId>                    # Delete static subscriptions manually

    subscriptions del <ClientId> <Topic>            # Delete a static subscription manually

    Plugins插件操作:

    $ ./bin/emqttd_ctl plugins

    plugins list                                    # Show loaded plugins

    plugins load <Plugin>                           # Load plugin

    plugins unload <Plugin>                         # Unload plugin

    $ ./bin/emqttd_ctl plugins list

    Plugin(emq_auth_clientid, version=2.0, description=Authentication with ClientId/Password, active=false)

    Plugin(emq_auth_http, version=2.0, description=Authentication/ACL with HTTP API, active=false)

    Plugin(emq_auth_ldap, version=2.0, description=Authentication/ACL with LDAP, active=false)

    Plugin(emq_auth_mongo, version=2.0, description=Authentication/ACL with MongoDB, active=false)

    Plugin(emq_auth_mysql, version=2.0, description=Authentication/ACL with MySQL, active=false)

    Plugin(emq_auth_pgsql, version=2.0, description=Authentication/ACL with PostgreSQL, active=false)

    Plugin(emq_auth_redis, version=2.0, description=Authentication/ACL with Redis, active=false)

    Plugin(emq_auth_username, version=2.0, description=Authentication with Username/Password, active=false)

    Plugin(emq_coap, version=0.2, description=CoAP Gateway, active=false)

    Plugin(emq_dashboard, version=2.0, description=Dashboard, active=true)

    Plugin(emq_mod_rewrite, version=2.0, description=Rewrite Module, active=false)

    Plugin(emq_plugin_template, version=2.0, description=EMQ Plugin Template, active=false)

    Plugin(emq_recon, version=2.0, description=Recon Plugin, active=false)

    Plugin(emq_reloader, version=2.0, description=Reloader Plugin, active=false)

    Plugin(emq_sn, version=0.2, description=MQTT-SN Gateway, active=false)

    Plugin(emq_stomp, version=2.0, description=Stomp Protocol Plugin, active=false)

    Bridges桥接操作:

    $ ./bin/emqttd_ctl bridges

    bridges list                                    # List bridges

    bridges options                                 # Bridge options

    bridges start <Node> <Topic>                    # Start a bridge

    bridges start <Node> <Topic> <Options>          # Start a bridge with options

    bridges stop <Node> <Topic>                     # Stop a bridge

    vm虚机(erlang虚机)性能查看:

    $ ./bin/emqttd_ctl vm all

    cpu/load1               : 0.05

    cpu/load5               : 0.05

    cpu/load15              : 0.07

    memory/total            : 166404232

    memory/processes        : 32564328

    memory/processes_used   : 32563952

    memory/system           : 133839904

    memory/atom             : 959633

    memory/atom_used        : 954350

    memory/binary           : 46088

    memory/code             : 28250535

    memory/ets              : 5741416

    process/limit           : 2097152

    process/count           : 288

    io/max_fds              : 1000000

    io/active_fds           : 1

    ports/count           : 25

    ports/limit           : 1048576

    端口使用情况查看:

    $ ./bin/emqttd_ctl listeners

    mnesia数据库信息查看:

    $ bin/emqttd_ctl mnesia

    管理dashboard用户:

    $ ./bin/emqttd_ctl admins

    admins add <Username> <Password> <Tags>         # Add dashboard user

    admins passwd <Username> <Password>             # Reset dashboard user password

    admins del <Username>                           # Delete dashboard user

    追踪 ,EMQ消息服务器支持追踪来自某个客户端(Client)的全部报文,或者发布到某个主题(Topic)的全部消息。

    追踪客户端(Client):

    ./bin/emqttd_ctl trace client "clientid(mosqsub/23058-vm6)" "trace_clientid.log"

    追踪主题(Topic):

    ./bin/emqttd_ctl trace topic "topic1" "trace_topic.log"

    查询追踪:

    ./bin/emqttd_ctl trace list

    停止追踪:

    ./bin/emqttd_ctl trace client "clientid(mosqsub/23058-vm6)" off

    ./bin/emqttd_ctl trace topic "topic1" off

    钩子(Hook)扩展,EMQ消息服务器在客户端上下线、主题订阅、消息收发位置设计了扩展钩子(Hook):

    钩子                                说明

    client.connected             客户端上线

    client.subscribe               客户端订阅主题前

    client.unsubscribe           客户端取消订阅主题

    session.subscribed          客户端订阅主题后

    session.unsubscribed      客户端取消订阅主题后

    message.publish             MQTT消息发布

    message.delivered          MQTT消息送达

    message.acked               MQTT消息回执

    client.disconnected        客户端连接断开

    钩子使用例子:

    -module(emqttd_plugin_template).

    -export([load/1, unload/0]).

    -export([on_message_publish/2, on_message_delivered/4, on_message_acked/4]).

    load(Env) ->

        emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),

        emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/4, [Env]),

        emqttd:hook('message.acked', fun ?MODULE:on_message_acked/4, [Env]).

    on_message_publish(Message, _Env) ->

        io:format("publish ~s~n", [emqttd_message:format(Message)]),

        {ok, Message}.

    on_message_delivered(ClientId, _Username, Message, _Env) ->

        io:format("delivered to client ~s: ~s~n", [ClientId, emqttd_message:format(Message)]),

        {ok, Message}.

    on_message_acked(ClientId, _Username, Message, _Env) ->

        io:format("client ~s acked: ~s~n", [ClientId, emqttd_message:format(Message)]),

        {ok, Message}.

    unload() ->

        emqttd:unhook('message.publish', fun ?MODULE:on_message_publish/2),

        emqttd:unhook('message.acked', fun ?MODULE:on_message_acked/4),

        emqttd:unhook('message.delivered', fun ?MODULE:on_message_delivered/4).

    MQTT协议的客户端库:

    https://github.com/mqtt/mqtt.github.io/wiki/libraries

    http://www.eclipse.org/paho/downloads.php

    做IM即时通信协议时的考虑:

    MQTT作为发布、订阅系统,作为消息推送是很好的。

    如果作为IM即时通信,则可以考虑把每个用户id做成一个topic,每个用户订阅名为自己id的topic(比如"USER/1111"),发送方如果需要发布消息给某一个用户,则发布该用户id的topic消息,对方自然就会收到。另外每个用户还要订阅几个系统类的topic(比如"ADMIN/broadcast","ADMIN/1111"),以便后台系统发布各类消息。至于群组消息,每加入一个群,则增加订阅一个名为群id的topic(比如"GROUP/1111")。

    --------------------- 本文来自 王树民ITDATA 的CSDN 博客 ,全文地址请点击:https://blog.csdn.net/wangshuminjava/article/details/80589899?utm_source=copy 

     

  • 相关阅读:
    poj 3321 Apple Tree
    hdu 1520 Anniversary party
    Light OJ 1089 Points in Segments (II)
    Timus 1018 Binary Apple Tree
    zoj 3299 Fall the Brick
    HFUT 1287 法默尔的农场
    Codeforces 159C String Manipulation 1.0
    GraphQL + React Apollo + React Hook 大型项目实战(32 个视频)
    使用 TypeScript & mocha & chai 写测试代码实战(17 个视频)
    GraphQL + React Apollo + React Hook + Express + Mongodb 大型前后端分离项目实战之后端(19 个视频)
  • 原文地址:https://www.cnblogs.com/saryli/p/9739256.html
Copyright © 2011-2022 走看看