zoukankan      html  css  js  c++  java
  • EMQ学习 ---集群

    emqttd集群设置管理

    一、先来看EMQ的文档定义:http://emqtt.com/docs/v1/cluster.html

    emqttd集群设置管理

    假设部署两台服务器s1.emqtt.io, s2.emqtt.io上部署集群:

    节点名 主机名(FQDN) IP地址

    emqttd@s1.emqtt.io 或emqttd@192.168.0.10	s1.emqtt.io	192.168.0.10
    emqttd@s2.emqtt.io 或emqttd@192.168.0.20	s2.emqtt.io	192.168.0.20

    Warning

    节点名格式: Name@Host, Host必须是IP地址或FQDN(主机名.域名)

    emqttd@s1.emqtt.io节点设置

    emqttd/etc/vm.args:

    -name emqttd@s1.emqtt.io

    -name emqttd@192.168.0.10
    

    节点加入集群

    启动两台节点后,emqttd@s2.emqtt.io上执行:

    $ ./bin/emqttd_ctl cluster join emqttd@s1.emqtt.io

    Join the cluster successfully.
    Cluster status: [{running_nodes,['emqttd@s1.emqtt.io','emqttd@s2.emqtt.io']}]

    或,emqttd@s1.emqtt.io上执行:

    $ ./bin/emqttd_ctl cluster join emqttd@s2.emqtt.io
    
    Join the cluster successfully.
    Cluster status: [{running_nodes,['emqttd@s1.emqtt.io','emqttd@s2.emqtt.io']}]
    

      

    任意节点上查询集群状态:

    $ ./bin/emqttd_ctl cluster status
    
    Cluster status: [{running_nodes,['emqttd@s1.emqtt.io','emqttd@s2.emqtt.io']}]
    

    节点退出集群

    节点退出集群,两种方式:

    • leave: 本节点退出集群
    • remove: 从集群删除其他节点

    emqttd@s2.emqtt.io主动退出集群:

    $ ./bin/emqttd_ctl cluster leave

    或emqttd@s1.emqtt.io节点上,从集群删除emqttd@s2.emqtt.io节点:

    $ ./bin/emqttd_ctl cluster remove emqttd@s2.emqtt.io
    

    emqttd_ctl是怎么使用的?

    -module(emqttd_cli).有定义要加载的命令

    -export([status/1, broker/1, cluster/1, users/1, clients/1, sessions/1,
             routes/1, topics/1, subscriptions/1, plugins/1, bridges/1,
             listeners/1, vm/1, mnesia/1, trace/1]).
    load() ->
        Cmds = [Fun || {Fun, _} <- ?MODULE:module_info(exports), is_cmd(Fun)],
        [emqttd_ctl:register_cmd(Cmd, {?MODULE, Cmd}, []) || Cmd <- Cmds].
    

    加入集群后,子节点mnesia数据库怎么办?

    mnesia数据库天然支持分布式集群。子节点加入之后就类似MySQL数据库主从备份一样,主节点和子节点mnesia会保持同步。来看源码:

    -module(emqttd_mnesia).

    %% @doc Join the mnesia cluster
    -spec(join_cluster(node()) -> ok).
    join_cluster(Node) when Node =/= node() ->
        %% Stop mnesia and delete schema first
        ensure_ok(ensure_stopped()),
        ensure_ok(delete_schema()),
        %% Start mnesia and cluster to node
        ensure_ok(ensure_started()),
        ensure_ok(connect(Node)),
        ensure_ok(copy_schema(node())),
        %% Copy tables
        copy_tables(),
        ensure_ok(wait_for(tables)).
    

    子节点加入之后,会先删除自己的mnesia数据库和各个表,然后copy一份主节点的库,再copy各个表数据。

    %% @doc Cluster with node.
    -spec(connect(node()) -> ok | {error, any()}).
    connect(Node) ->
        case mnesia:change_config(extra_db_nodes, [Node]) of
            {ok, [Node]} -> ok;
            {ok, []}     -> {error, {failed_to_connect_node, Node}};
            Error        -> Error
        end.
    
    %% @doc Copy schema.
    copy_schema(Node) ->
        case mnesia:change_table_copy_type(schema, Node, disc_copies) of
            {atomic, ok} ->
                ok;
            {aborted, {already_exists, schema, Node, disc_copies}} ->
                ok;
            {aborted, Error} ->
                {error, Error}
        end.
    
    %% @doc Copy mnesia tables.
    copy_tables() ->
        emqttd_boot:apply_module_attributes(copy_mnesia).
    

    函数copy_tables(),会检索和执行emq工程目录下所有erl模块里面的mnesia(copy)函数,模块要求含有"-copy_mnesia({mnesia, [copy]})."关键字

    例如:

    -module(emqttd_backend).

    mnesia(copy) ->
        ok = emqttd_mnesia:copy_table(retained_message),
        ok = emqttd_mnesia:copy_table(backend_subscription).
    

    -module(emqttd_router).

    -copy_mnesia({mnesia, [copy]}).
    
    mnesia(copy) ->
        ok = emqttd_mnesia:copy_table(route, ram_copies).
    

    EMQ工程目录下,有关键字-boot_mnesia({mnesia, [boot]}).和-copy_mnesia({mnesia, [copy]}).的模块是:

    -module(emqttd_backend).
    -module(emqttd_pubsub).
    -module(emqttd_router).
    -module(emqttd_server).
    -module(emqttd_sm).
    -module(emqttd_trie).

    其中,emqttd_backend模块新建的数据库retained_message和backend_subscription是disc_copies类型的,其他模块是ram_copies类型的。

    emqttd的mnesia初始化

    1、-module(emqttd_app).

    start(_StartType, _StartArgs) ->
    print_banner(),
    emqttd_mnesia:start(),
    

      


    2、-module(emqttd_mnesia)

    start() ->
    ensure_ok(ensure_data_dir()),
    ensure_ok(init_schema()),
    ok = mnesia:start(),
    init_tables(),
    wait_for(tables).
    
    %% @doc Init mnesia schema or tables.
    init_schema() ->
    case mnesia:system_info(extra_db_nodes) of
    [] -> mnesia:create_schema([node()]);
    [_|_] -> ok
    end.
    
    %% @private
    %% @doc Init mnesia tables.
    init_tables() ->
    case mnesia:system_info(extra_db_nodes) of
    [] -> create_tables();
    [_|_] -> copy_tables()
    end.
    

    3、数据库启动和拷贝的例子-module(emqttd_backend).

    -boot_mnesia({mnesia, [boot]}).
    -copy_mnesia({mnesia, [copy]}).
    
    %% Mnesia callbacks
    %%--------------------------------------------------------------------
    
    mnesia(boot) ->
    ok = emqttd_mnesia:create_table(retained_message, [
    {type, ordered_set},
    {disc_copies, [node()]},
    {record_name, retained_message},
    {attributes, record_info(fields, retained_message)},
    {storage_properties, [{ets, [compressed]},
    {dets, [{auto_save, 1000}]}]}]),
    ok = emqttd_mnesia:create_table(backend_subscription, [
    {type, bag},
    {disc_copies, [node()]},
    {record_name, mqtt_subscription},
    {attributes, record_info(fields, mqtt_subscription)},
    {storage_properties, [{ets, [compressed]},
    {dets, [{auto_save, 5000}]}]}]);
    
    mnesia(copy) ->
    ok = emqttd_mnesia:copy_table(retained_message),
    ok = emqttd_mnesia:copy_table(backend_subscription).
    4、数据库启动和拷贝的例子-module(emqttd_router).
    
    -boot_mnesia({mnesia, [boot]}).
    -copy_mnesia({mnesia, [copy]}).
    
    mnesia(boot) ->
    ok = emqttd_mnesia:create_table(route, [
    {type, bag},
    {ram_copies, [node()]},
    {record_name, mqtt_route},
    {attributes, record_info(fields, mqtt_route)}]);
    
    mnesia(copy) ->
    ok = emqttd_mnesia:copy_table(route, ram_copies).
    

    4、数据库启动和拷贝的例子-module(emqttd_router).

    -boot_mnesia({mnesia, [boot]}).
    -copy_mnesia({mnesia, [copy]}).
    
    mnesia(boot) ->
        ok = emqttd_mnesia:create_table(route, [
                    {type, bag},
                    {ram_copies, [node()]},
                    {record_name, mqtt_route},
                    {attributes, record_info(fields, mqtt_route)}]);
    
    mnesia(copy) ->
        ok = emqttd_mnesia:copy_table(route, ram_copies).
    

    注意事项

    1、如果EMQ所在服务器的IP地址是192.168.0.10

    那么节点名称A:emqttd@192.168.0.10和节点名称B:emqttd@127.0.0.1是相同的意思,如果EMQ以节点A启动服务器,那么再以节点B启动是会失败的。

    此时只能把A或B其中一个更名一下。即节点名格式: Name@Host里面的Name要加以区分。

    2、集群的信息会记录在工程目录下,/rel/emqttd/data/mnesia/emqttd@172.16.6.161/schema.DAT

    即,当子节点A连接了主节点B,集群信息会分别记录在schema.DAT。如果子节点A没有主动断开集群,下次重启时,仍然会主动连接主节点B。

    ★有几个遗留问题待确认,不知道EMQ V2版本有无修正:

    问题(1)如果子节点A没有主动断开集群,下次重启时,如果B不存在,那么A就会启动失败!好可怕!

    问题(2)A连接上B之后。A目录下的文件/rel/emqttd/data/mnesia/emqttd@172.16.6.161/retained_message.DCD和backend_subscription.DCD就自我删除了。以后也见不着了,彻底消失了,奇怪!请注意,这两个Mnesia数据库表类型是持久化,disc_copies。

    ★2018/05/17实测emq2.3.7,A主B从,结论如下:

    (1)B join A之后,会主动删除B的Mnesia表,然后从A拷贝一份过来。B leave A之后,也会删除B的Mnesia表。

    (2)B join A之后,A和B的Mnesia表会始终保持一致性。

    添加或删除或更新A的表数据,B会同步。

    添加或删除或更新B的表数据,A会同步。

    (3)集群的信息会记录在工程目录下,/rel/emqttd/data/mnesia/emqttd@172.16.6.161/schema.DAT。A或B进程退出后,再次启动时,仍然保持集群状态。

    3、常用命令

    ./emqttd console
    ./emqttd start
    ./emqttd stop
    ./emqttd_ctl cluster join emqttd@172.16.6.161
    ./emqttd_ctl cluster status

    ./emqttd_ctl cluster leave

    werl -name firecat@127.0.0.1 -setcookie emqsecretcookie
    observer:start().
    ./_rel/emqttd/bin/emqttd console
    ./_rel/emqttd/bin/emqttd start
    ./_rel/emqttd/bin/emqttd_ctl status
    ./_rel/emqttd/bin/emqttd stop
    ./_rel/emqttd/bin/emqttd_ctl cluster join emq@192.168.0.116
    ./_rel/emqttd/bin/emqttd_ctl cluster status
    ./_rel/emqttd/bin/emqttd_ctl cluster leave



  • 相关阅读:
    Thinkphp框架下对某个字段查询数据的时候进行唯一过滤,返回唯一不同的值
    Thinkphp框架下(同服务器下)不同二级域名之间session互通共享设置
    CentOS 6.8下Apache绑定多个域名的方法
    CentOS 6.8下更改Apache默认网站安装目录
    Ubuntu 16.04系统下安装PHP5.6*
    Ubuntu 16.04系统下解决Vim乱码问题
    jQuery 核心
    Ubuntu 16.04系统下安装Discuz出现“HTTP ERROR 500”目前无法处理此请求
    BCB6 重装后的项目编译莫名问题
    LR6 碱性电池才能带动微软鼠标
  • 原文地址:https://www.cnblogs.com/saryli/p/9783594.html
Copyright © 2011-2022 走看看