zoukankan      html  css  js  c++  java
  • emqtt 分布集群及节点桥接搭建

    目录

    分布集群

    emq@s1.emqtt.io 节点设置

    emq@s2.emqtt.io 节点设置

    节点加入集群

    节点退出集群

    节点发现与自动集群

    manual 手动创建集群

    基于 static 节点列表自动集群

    基于 mcast 组播自动集群

    基于 DNS A 记录自动集群

    基于 etcd 自动集群

    基于 Kubernetes 自动集群

    集群脑裂与自动愈合

    集群节点自动清除

    跨节点会话(Session)

    防火墙设置

    一致性 Hash 与 DHT

     

    负载均衡

    HAProxy -> EMQ 集群

    NGINX Plus -> EMQ 集群

     

    节点桥接 (Bridge)

    EMQ 节点桥接配置


    分布集群

    假设部署两台服务器 s1.emqtt.io, s2.emqtt.io 上部署集群:

    节点名 主机名(FQDN) IP 地址
    emq@s1.emqtt.io 或emq@192.168.0.10 s1.emqtt.io 192.168.0.10
    emq@s2.emqtt.io 或emq@192.168.0.20 s2.emqtt.io 192.168.0.20

    警告

    节点名格式: Name@Host, Host必须是IP地址或FQDN(主机名.域名)

    emq@s1.emqtt.io 节点设置

    emqttd/etc/emq.conf:

    node.name = emq@s1.emqtt.io
    或
    node.name = emq@192.168.0.10

    也可通过环境变量:

    export EMQ_NODE_NAME=emq@s1.emqtt.io && ./bin/emqttd start

    警告

    节点启动加入集群后,节点名称不能变更。

    emq@s2.emqtt.io 节点设置

    emqttd/etc/emq.conf:

    node.name = emq@s2.emqtt.io
    或
    node.name = emq@192.168.0.20
    

    节点加入集群

    启动两台节点后,

    emq@s2.emqtt.io 上执行:

    $ ./bin/emqttd_ctl cluster join emq@s1.emqtt.io
    
    Join the cluster successfully.
    Cluster status: [{running_nodes,['emq@s1.emqtt.io','emq@s2.emqtt.io']}]
    

    或,emq@s1.emqtt.io 上执行:

    $ ./bin/emqttd_ctl cluster join emq@s2.emqtt.io
    
    Join the cluster successfully.
    Cluster status: [{running_nodes,['emq@s1.emqtt.io','emq@s2.emqtt.io']}]
    

    任意节点上查询集群状态:

    $ ./bin/emqttd_ctl cluster status
    
    Cluster status: [{running_nodes,['emq@s1.emqtt.io','emq@s2.emqtt.io']}]
    

    节点退出集群

    节点退出集群,两种方式:

    1. leave: 本节点退出集群
    2. remove: 从集群删除其他节点

    emq@s2.emqtt.io 主动退出集群:

    $ ./bin/emqttd_ctl cluster leave
    

    或 emq@s1.emqtt.io 节点上,从集群删除 emq@s2.emqtt.io 节点:

    $ ./bin/emqttd_ctl cluster remove emq@s2.emqtt.io
    

    节点发现与自动集群

    EMQ R2.3 版本支持基于 Ekka 库的集群自动发现(Autocluster)。Ekka 是为 Erlang/OTP 应用开发的集群管理库,支持 Erlang 节点自动发现(Discovery)、自动集群(Autocluster)、脑裂自动愈合(Network Partition Autoheal)、自动删除宕机节点(Autoclean)。

    EMQ R2.3 支持多种策略自动发现节点创建集群:

    策略 说明
    manual 手工命令创建集群
    static 静态节点列表自动集群
    mcast UDP 组播方式自动集群
    dns DNS A 记录自动集群
    etcd 通过 etcd 自动集群
    k8s Kubernetes 服务自动集群

    manual 手动创建集群

    默认配置为手动创建集群,节点通过 ./bin/emqttd_ctl join <Node> 命令加入:

    cluster.discovery = manual
    

    基于 static 节点列表自动集群

    配置固定的节点列表,自动发现并创建集群:

    cluster.discovery = static
    
    ##--------------------------------------------------------------------
    ## Cluster with static node list
    
    cluster.static.seeds = emq1@127.0.0.1,ekka2@127.0.0.1
    

    基于 mcast 组播自动集群

    基于 UDP 组播自动发现并创建集群:

    cluster.discovery = mcast
    
    ##--------------------------------------------------------------------
    ## Cluster with multicast
    
    cluster.mcast.addr = 239.192.0.1
    
    cluster.mcast.ports = 4369,4370
    
    cluster.mcast.iface = 0.0.0.0
    
    cluster.mcast.ttl = 255
    
    cluster.mcast.loop = on
    

    基于 DNS A 记录自动集群

    基于 DNS A 记录自动发现并创建集群:

    cluster.discovery = dns
    
    ##--------------------------------------------------------------------
    ## Cluster with DNS
    
    cluster.dns.name = localhost
    
    cluster.dns.app  = ekka
    

    基于 etcd 自动集群

    基于 etcd 自动发现并创建集群:

    cluster.discovery = etcd
    
    ##--------------------------------------------------------------------
    ## Cluster with Etcd
    
    cluster.etcd.server = http://127.0.0.1:2379
    
    cluster.etcd.prefix = emqcl
    
    cluster.etcd.node_ttl = 1m
    

    基于 Kubernetes 自动集群

    Kubernetes 下自动发现并创建集群:

    cluster.discovery = k8s
    
    ##--------------------------------------------------------------------
    ## Cluster with k8s
    
    cluster.k8s.apiserver = http://10.110.111.204:8080
    
    cluster.k8s.service_name = ekka
    
    ## Address Type: ip | dns
    cluster.k8s.address_type = ip
    
    ## The Erlang application name
    cluster.k8s.app_name = ekka
    

    集群脑裂与自动愈合

    EMQ R2.3 版本正式支持集群脑裂自动恢复(Network Partition Autoheal):

    cluster.autoheal = on
    

    集群脑裂自动恢复流程:

    1. 节点收到 Mnesia库 的 inconsistent_database 事件3秒后进行集群脑裂确认;
    2. 节点确认集群脑裂发生后,向 Leader 节点(集群中最早启动节点)上报脑裂消息;
    3. Leader 节点延迟一段时间后,在全部节点在线状态下创建脑裂视图(SplitView);
    4. Leader 节点在多数派(majority)分区选择集群自愈的 Coordinator 节点;
    5. Coordinator 节点重启少数派(minority)分区节点恢复集群。

    集群节点自动清除

    EMQ R2.3 版本支持从集群自动删除宕机节点(Autoclean):

    cluster.autoclean = 5m
    

    跨节点会话(Session)

    EMQ 消息服务器集群模式下,MQTT 连接的持久会话(Session)跨节点。

    例如负载均衡的两台集群节点: node1 与 node2,同一 MQTT 客户端先连接 node1,node1 节点会创建持久会话;客户端断线重连到 node2 时,MQTT 的连接在 node2 节点,持久会话仍在 node1 节点:

                                     node1
                                  -----------
                              |-->| session |
                              |   -----------
                node2         |
             --------------   |
    client-->| connection |<--|
             --------------
    

    防火墙设置

    如果集群节点间存在防火墙,防火墙需要开启 4369 端口和一个 TCP 端口段。4369 由 epmd 端口映射服务使用,TCP 端口段用于节点间建立连接与通信。

    防火墙设置后,EMQ 需要配置相同的端口段,emqttd/etc/emq.conf 文件:

    ## Distributed node port range
    node.dist_listen_min = 6369
    node.dist_listen_max = 7369
    

    一致性 Hash 与 DHT

    NoSQL 数据库领域分布式设计,大多会采用一致性 Hash 或 DHT。EMQ 消息服务器集群架构可支持千万级的路由,更大级别的集群可采用一致性 Hash、DHT 或 Shard 方式切分路由表。

     

    负载均衡

    HAProxy -> EMQ 集群

    HAProxy 作为 LB 部署 EMQ 集群,并终结 SSL 连接:

    1. 创建 EMQ 集群节点,例如:
    节点 IP 地址
    emq1 192.168.0.2
    emq2 192.168.0.3
    1. 配置 /etc/haproxy/haproxy.cfg,示例:

      listen mqtt-ssl
          bind *:8883 ssl crt /etc/ssl/emqttd/emq.pem no-sslv3
          mode tcp
          maxconn 50000
          timeout client 600s
          default_backend emq_cluster
      
      backend emq_cluster
          mode tcp
          balance source
          timeout server 50s
          timeout check 5000
          server emq1 192.168.0.2:1883 check inter 10000 fall 2 rise 5 weight 1
          server emq2 192.168.0.3:1883 check inter 10000 fall 2 rise 5 weight 1
          source 0.0.0.0 usesrc clientip
      

    官方文档:http://cbonte.github.io/haproxy-dconv/1.8/intro.html

    NGINX Plus -> EMQ 集群

    NGINX Plus 产品作为 EMQ 集群 LB,并终结 SSL 连接:

    1. 注册 NGINX Plus 试用版,Ubuntu 下安装: https://cs.nginx.com/repo_setup
    2. 创建 EMQ 节点集群,例如:
    节点 IP 地址
    emq1 192.168.0.2
    emq2 192.168.0.3
    1. 配置 /etc/nginx/nginx.conf,示例:

      stream {
          # Example configuration for TCP load balancing
      
          upstream stream_backend {
              zone tcp_servers 64k;
              hash $remote_addr;
              server 192.168.0.2:1883 max_fails=2 fail_timeout=30s;
              server 192.168.0.3:1883 max_fails=2 fail_timeout=30s;
          }
      
          server {
              listen 8883 ssl;
              status_zone tcp_server;
              proxy_pass stream_backend;
              proxy_buffer_size 4k;
              ssl_handshake_timeout 15s;
              ssl_certificate     /etc/emqttd/certs/cert.pem;
              ssl_certificate_key /etc/emqttd/certs/key.pem;
          }
      }

    官方文档:https://cs.nginx.com/repo_setup

    节点桥接 (Bridge)

    EMQ 消息服务器支持多节点桥接模式互联:

                  ---------                     ---------                     ---------
    Publisher --> | Node1 | --Bridge Forward--> | Node2 | --Bridge Forward--> | Node3 | --> Subscriber
                  ---------                     ---------                     ---------
    

    节点间桥接与集群不同,不复制主题树与路由表,只按桥接规则转发 MQTT 消息。

    EMQ 节点桥接配置

    假设在本机创建两个 EMQ 节点,并创建一条桥接转发全部传感器(sensor)主题消息:

    目录 节点 MQTT 端口
    emqttd1 emqttd1@127.0.0.1 1883
    emqttd2 emqttd2@127.0.0.1 2883

    启动 emqttd1, emqttd2 节点:

    cd emqttd1/ && ./bin/emqttd start
    cd emqttd2/ && ./bin/emqttd start
    

    emqttd1 节点上创建到 emqttd2 桥接:

    $ ./bin/emqttd_ctl bridges start emqttd2@127.0.0.1 sensor/#
    
    bridge is started.
    
    $ ./bin/emqttd_ctl bridges list
    
    bridge: emqttd1@127.0.0.1--sensor/#-->emqttd2@127.0.0.1
    

    测试 emqttd1–sensor/#–>emqttd2 的桥接: 

    #A 连接emqttd2节点,订阅 sensor/# 主题
    
    #B 连接emqttd1节点,发布 sensor/test 主题
    
    #A 接受到sensor/test的消息
    

    删除桥接:

    ./bin/emqttd_ctl bridges stop emqttd2@127.0.0.1 sensor/#
  • 相关阅读:
    C# 利用DataTable批处理数据导入数据库
    人员基础信息一体化采集系统建设方案
    定时调用WebService方法同步数据
    进程间通信
    信号
    Linux进程基础
    来自硬件男的《信号与系统》整理笔记
    shell脚本编程(ubantu)
    Linux系统c语言开发环境
    Linux系统用户管理及VIM配置
  • 原文地址:https://www.cnblogs.com/gmhappy/p/11864113.html
Copyright © 2011-2022 走看看