zoukankan      html  css  js  c++  java
  • EMQ X:集群

    简介

    Erlang/OTP 最初是爱立信为开发电信设备系统设计的编程语言平台,电信设备 (路由器、接入网关...) 典型设计是通过背板连接主控板卡与多块业务板卡的分布式系统。

    Erlang/OTP 语言平台的分布式程序,由分布互联的 Erlang 运行时系统组成,每个 Erlang 运行时系统被称为节点(Node),节点间通过 TCP 两两互联,组成一个网状结构。

    image-20210728151402948

    Erlang 节点由唯一的节点名称标识,节点名称由 @ 分隔的两部分组成:

    <name>@<ip-address>
    

    Erlang 节点间通过 cookie 进行互连认证。cookie 是一个字符串,只有 cookie 相同的两个节点才能建立连接。

    cookie的配置在emqx.conf配置文件中,默认配置如下:

    ## Node name.
    ##
    ## See: http://erlang.org/doc/reference_manual/distributed.html
    ##
    ## Value: <name>@<host>
    ##
    ## Default: emqx@127.0.0.1
    node.name = emqx@127.0.0.1
    
    ## Cookie for distributed node communication.
    ##
    ## Value: String
    node.cookie = emqxsecretcookie
    

    EMQ X 集群协议设置

    Erlang 集群中各节点可通过 TCPv4、TCPv6 或 TLS 方式连接,可在 etc/emqx.conf 中配置连接方式:

    配置名 类型 默认值 描述
    cluster.proto_dist enum inet_tcp 分布式协议,可选值:
    - inet_tcp: 使用 TCP IPv4
    - inet6_tcp: 使用 TCP IPv6
    - inet_tls: 使用 TLS
    node.ssl_dist_optfile 文件路径 etc/ssl_dist.conf cluster.proto_dist 选定为 inet_tls 时,需要配置 etc/ssl_dist.conf 文件,指定 TLS 证书等

    EMQ X 分布式集群设计

    EMQ X 分布式的基本功能是将消息转发和投递给各节点上的订阅者,如下图所示:

    image-20210728152832286

    为实现此过程,EMQ X 维护了几个与之相关的数据结构:订阅表,路由表,主题树。

    订阅表

    主题 - 订阅者

    MQTT 客户端订阅主题时,EMQ X 会维护主题(Topic) -> 订阅者(Subscriber) 映射的订阅表。订阅表只存在于订阅者所在的 EMQ X 节点上,例如:

    node1:
    
        topic1 -> client1, client2
        topic2 -> client3
    
    node2:
    
        topic1 -> client4
    

    路由表

    主题 - 节点

    而同一集群的所有节点,都会复制一份主题(Topic) -> 节点(Node) 映射的路由表,例如:

    topic1 -> node1, node2
    topic2 -> node3
    topic3 -> node2, node4
    

    主题树

    带统配符的主题匹配

    除路由表之外,EMQ X 集群中的每个节点也会维护一份主题树(Topic Trie) 的备份。

    例如下述主题订阅关系:

    客户端 节点 订阅主题
    client1 node1 t/+/x, t/+/y
    client2 node2 t/#
    client3 node3 t/+/x, t/a

    在所有订阅完成时,EMQ X 中会维护如下主题树 (Topic Trie) 和路由表 (Route Table):

    image-20210728153542314

    消息派发过程

    当 MQTT 客户端发布消息时,所在节点会根据消息主题,检索路由表并转发消息到相关节点,再由相关节点检索本地的订阅表并将消息发送给相关订阅者。

    例如 client1 向主题 t/a 发布消息,消息在节点间的路由与派发流程:

    1. client1 发布主题为 t/a 的消息到节点 node1
    2. node1 通过查询主题树,得知 t/a 可匹配到现有的 t/at/# 这两个主题。
    3. node1 通过查询路由表,得知主题 t/a 只在 node3 上有订阅者,而主题 t/# 只在 node2 上有订阅者。故 node1 将消息转发给 node2 和 node3。
    4. node2 收到转发来的 t/a 消息后,查询本地订阅表,获取本节点上订阅了 t/# 的订阅者,并把消息投递给他们。
    5. node3 收到转发来的 t/a 消息后,查询本地订阅表,获取本节点上订阅了 t/a 的订阅者,并把消息投递给他们。
    6. 消息转发和投递结束。

    集群策略

    EMQ X 支持基于 Ekka 库的集群自动发现 (Autocluster)。Ekka 是为 Erlang/OTP 应用开发的集群管理库,支持 Erlang 节点自动发现 (Service Discovery)、自动集群 (Autocluster)、脑裂自动愈合 (Network Partition Autoheal)、自动删除宕机节点 (Autoclean)。

    EMQ X 支持多种节点发现策略:

    策略 说明
    manual 手动命令创建集群
    static 静态节点列表自动集群
    mcast UDP 组播方式自动集群
    dns DNS A 记录自动集群
    etcd 通过 etcd 自动集群
    k8s Kubernetes 服务自动集群

    节点发现策略配置:在/etc/emqx/emqx.confcluster.discovery配置项中

    默认是manual

    [root@localhost emqx]# cat /etc/emqx/emqx.conf | grep cluster.discovery
    cluster.discovery = manual
    

    manual 手动创建集群

    默认配置为手动创建集群,节点须通过 emqx_ctl join 命令加入:

    cluster.discovery = manual
    

    基于 static 节点列表自动集群

    配置固定的节点列表,自动发现并创建集群:

    cluster.discovery = static
    cluster.static.seeds = emqx1@127.0.0.1,emqx2@127.0.0.1  
    

    基于 mcast 组播自动集群

    基于 UDP 组播自动发现并创建集群:

    cluster.discovery = mcast
    cluster.mcast.addr = 239.192.0.1
    cluster.mcast.ports = 4369,4370
    cluster.mcast.iface = 0.0.0.0
    cluster.mcast.ttl = 255
    cluster.mcast.loop = on
    

    基于 DNS A 记录自动集群

    基于 DNS A 记录自动发现并创建集群:

    cluster.discovery = dns
    cluster.dns.name = localhost
    cluster.dns.app  = ekka
    

    基于 etcd 自动集群

    基于 etcd (opens new window)自动发现并创建集群:

    cluster.discovery = etcd
    cluster.etcd.server = http://127.0.0.1:2379
    cluster.etcd.prefix = emqcl
    cluster.etcd.node_ttl = 1m
    

    基于 kubernetes 自动集群

    Kubernetes (opens new window)下自动发现并创建集群:

    cluster.discovery = k8s
    cluster.k8s.apiserver = http://10.110.111.204:8080
    cluster.k8s.service_name = ekka
    cluster.k8s.address_type = ip
    cluster.k8s.app_name = ekka
        
    

    Kubernetes 不建议使用 Fannel 网络插件,推荐使用 Calico 网络插件。

    manual 集群搭建

    环境:

    节点名 ip地址
    emqx128@192.168.40.128 192.168.40.128
    emqx129@192.168.40.129 192.168.40.129

    配置节点1

    修改配置文件emqx.conf

    使用yum安装方式,默认的集群策略就是manual,不需要修改

    node1节点修改emqx.conf:

    node.name = emqx128@192.168.40.128
    

    防火墙

    如果集群节点间存在防火墙,防火墙需要开启 4369 端口和一个 TCP 端口段。4369 由 epmd 端口映射服务使用,TCP 端口段用于节点间建立连接与通信。

    防火墙设置后,需要在 /etc/emqx/emqx.conf 中配置相同的端口段:

    ## Distributed node port range
    node.dist_listen_min = 6369
    node.dist_listen_max = 7369
    

    启动节点1:

    emqx start
    

    配置节点2

    修改配置文件emqx.conf

    node2节点修改emqx.conf:

    node.name = emqx129@192.168.40.129
    

    启动node2:

    emqx start
    

    组成集群

    在node1上操作:

    [root@localhost emqx]# emqx_ctl cluster join emqx129@192.168.40.129
    =CRITICAL REPORT==== 28-Jul-2021::16:15:39.528620 ===
    [EMQ X] emqx shutdown for join
    Join the cluster successfully.
    Cluster status: #{running_nodes =>
                          ['emqx128@192.168.40.128','emqx129@192.168.40.129'],
                      stopped_nodes => []}
    

    集群搭建成功后,dashboard也可以发现节点的相关信息:

    image-20210728161644837

    集群状态

    [root@localhost emqx]# emqx_ctl cluster status
    Cluster status: #{running_nodes =>
                          ['emqx128@192.168.40.128','emqx129@192.168.40.129'],
                      stopped_nodes => []}
    

    测试集群收发

    这个连接的是node1,并发送消息到testtopic/123主题中

    image-20210728165840815

    这个连接的是node2,并监听testtopic/#,接收到node1发送过来的消息:

    image-20210728165951650

    退出集群

    节点退出集群,两种方式:

    • leave: 让本节点退出集群
    • force-leave: 从集群删除其他节点

    让节点2主动退出集群:

    #在node2上执行
    emqx_ctl cluster leave
    

    或节点1上,从集群删除节点2:

    emqx_ctl cluster force-leave emqx129@192.168.40.129
    

    image-20210728162051574

    集群节点自动清除

    EMQ X 支持从集群自动删除宕机节点 (Autoclean),可在 emqx.conf 中配置:

    cluster.autoclean = 5m
    

    集群脑裂与自动愈合

    EMQ X 支持集群脑裂自动恢复(Network Partition Autoheal),可在 emqx.conf 中配置:

    cluster.autoheal = on
    

    集群脑裂自动恢复流程:

    1. 节点收到 Mnesia 的 inconsistent_database 事件 3 秒后进行集群脑裂确认;
    2. 节点确认集群脑裂发生后,向 Leader 节点 (集群中最早启动节点) 上报脑裂消息;
    3. Leader 节点延迟一段时间后,在全部节点在线状态下创建脑裂视图 (SplitView);
    4. Leader 节点在多数派 (majority) 分区选择集群自愈的 Coordinator 节点;
    5. Coordinator 节点重启少数派 (minority) 分区节点恢复集群。
  • 相关阅读:
    C# 获取存储过程 返回的参数Output
    Blog数据库设计之Tags表设计
    FLASH 加载进度 JS代码
    Asp.net清除数据缓存及页面缓存
    access数据库用sql语句添加字段,修改字段,删除字段,类型转换
    获取资源文件
    C# 如何获取当前项目运行路径的父目录?
    深入理解 __doPostBack
    LookUpEdit How update binding source immediately after selection?
    C#动态加载DLL
  • 原文地址:https://www.cnblogs.com/wwjj4811/p/15071416.html
Copyright © 2011-2022 走看看