zoukankan      html  css  js  c++  java
  • 消息队列-RabbitMQ

    一:RabbitMQ

    组件原理图:

    生产者发送消息到 broker server(RabbitMQ), 在 Broker 内部,用户创建 Exchange/Queue,通过 Binding 规则将两者联系在一起,Exchange 分发消息, 根据类型/binding 的不同分发策略有区别,消息最后来到 Queue 中,等待消费者取走。 

    组件介绍:

    Broker: 接收和分发消息的应用,RabbitMQ Server 就是 Message Broker
    Virtual host: 出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念,当多个不同的用户使用同一个RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost创建 exchange/queue 等。
    Connection: publisher/consumer 和 broker 之间的 TCP 连接。 
    Channel: 如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker识别 channel,
    所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection极大减少了操作系统建立 TCP connection 的开销。 Exchange: message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point
    -to-point), topic (publish subscribe) and fanout (multicast)。 Queue: 消息最终被送到这里等待 consumer 取走。 Binding: exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据。

    https://www.rabbitmq.com/download.html #官网下载地址

    https://github.com/rabbitmq/rabbitmq-server/releases #github 下载地址

    二、单机部署

    安装包:rabbitmq-server_3.7.22-1_all.deb

    要求:

    添加主机解析:hostname,运行之后不能修改域名

     快速安装:https://www.rabbitmq.com/install-debian.html

    2.1:登陆 web 管理界面

    默认不允许:

     允许方式: 

    #vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.3/ebin/rabbit.app
    39 {loopback_users, []}, #删除被禁止登陆的 guest 账户 # systemctl restart rabbitmq-server.service #重启 rabbitmq 服务

    三、RabbitMQ 集群部署

    普通模式:创建好 RabbitMQ 之后的默认模式。

    (HA)镜像模式:把需要的队列做成镜像队列。

    消息实体会主动在镜像节点间同步,而不是在 consumer 取数据时临时拉取。该模式带来的副作用也很明显,除了降低系统性能外,如果镜像队列数量过多,加之大量的消息进入,集群内部的网络带宽将会被这种同步通讯大大消耗掉。

    3.1 修改hosts主机名

    192.168.134.192 rabbitmq-server-1
    192.168.134.193 rabbitmq-server-2
    192.168.134.194 rabbitmq-server-3

    3.2 安装rabbitmq

    sudo apt-get update -y
    
    ## Install prerequisites
    sudo apt-get install curl gnupg -y
    
    ## Install RabbitMQ signing key
    curl -fsSL https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc | sudo apt-key add -
    
    ## Install apt HTTPS transport
    sudo apt-get install apt-transport-https
    
    ## Add Bintray repositories that provision latest RabbitMQ and Erlang 21.x releases
    sudo tee /etc/apt/sources.list.d/bintray.rabbitmq.list <<EOF
    ## Installs the latest Erlang 22.x release.
    ## Change component to "erlang-21.x" to install the latest 21.x version.
    ## "bionic" as distribution name should work for any later Ubuntu or Debian release.
    ## See the release to distribution mapping table in RabbitMQ doc guides to learn more.
    deb https://dl.bintray.com/rabbitmq-erlang/debian bionic erlang
    deb https://dl.bintray.com/rabbitmq/debian bionic main
    EOF
    
    ## Update package indices
    sudo apt-get update -y
    
    ## Install rabbitmq-server and its dependencies
    sudo apt-get install rabbitmq-server -y --fix-missing

    3.3:同步cookie文件

    https://www.rabbitmq.com/clustering.html

    scp /var/lib/rabbitmq/.erlang.cookie 192.168.134.193:/var/lib/rabbitmq/.erlang.cookie
    scp /var/lib/rabbitmq/.erlang.cookie 192.168.134.194:/var/lib/rabbitmq/.erlang.cookie

    3.4:开启插件管理

    # vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.22/ebin/rabbit.app 
    39   {loopback_users, []},  #删除被禁止登陆的 guest 账户 
    # systemctl restart rabbitmq-server.service #重启 rabbitmq 服务 

    集群中有两种节点类型:

    内存节点:只将数据保存到内存

    磁盘节点:保存数据到内存和磁盘。

    内存节点虽然不写入磁盘,但是它执行比磁盘节点要好,集群中,只需要一个 磁盘节点来保存数据就足够了如果集群中只有内存节点,那么不能全部停止它

    们,否则所有数据消息在服务器全部停机之后都会丢失。 

    3.5 添加节点

    在一个 rabbitmq 集群里,有 3 台。其中 1 台使用磁盘模式,其它节 点使用内存模式,内存节点无访问速度更快,由于磁盘 IO 相对较慢,因此可作 为数据备份使用。

    在 rabbitmq-server-1 作为内存节点连接起来,并作为内存节点,在 rabbitmq-server-1 执行以下命令:

    rabbitmqctl  stop_app    #停止 app 服务 
    rabbitmqctl   reset      #清空元数据 
    #将rabbitmq-server-1 添加到集群当中,并成为内存节点,不加--ram 默认是磁盘节点
    root@rabbitmq-server-1:~#  rabbitmqctl  join_cluster rabbit@rabbitmq-server-3 --ram 
    Clustering node rabbit@rabbitmq-server-1 with rabbit@rabbitmq-server-3
    rabbitmqctl  start_app   #启动 app 服务 

    在  rabbitmq-server-2 作为内存节点连接起来,并作为内存节点,在  rabbitmq-server-2 执行以下命令:

    root@rabbitmq-server-2:~#  rabbitmqctl  stop_app 
    Stopping rabbit application on node rabbit@rabbitmq-server-2 ...
    root@rabbitmq-server-2:~# rabbitmqctl  reset 
    Resetting node rabbit@rabbitmq-server-2 ...
    root@rabbitmq-server-2:~# rabbitmqctl  join_cluster rabbit@rabbitmq-server-3 --ram
    Clustering node rabbit@rabbitmq-server-2 with rabbit@rabbitmq-server-3
    root@rabbitmq-server-2:~# rabbitmqctl  start_app 
    Starting node rabbit@rabbitmq-server-2 ...
     completed with 3 plugins.

    验证

    3.6 将集群设置为镜像模式: 

    只要在其中一台节点执行以下命令即可:

    root@rabbitmq-server-1:~#  rabbitmqctl set_policy  ha-all "#"  '{"ha-mode":"all"}' 

     四、RabbitMQ 常用命令

    rabbitmqctl  add_vhost xxx   #创建 vhost
    rabbitmqctl  list_vhosts   #列出所有 vhost 
    rabbitmqctl  list_queues   #列出所有队列 
    rabbitmqctl  add_user jack 123456   #添加账户 jack 密码为 123456 
    rabbitmqctl  change_password jack 654321  Changing   #更改用户密码 

    五、使用python处理rabbitmq

    监控rabbitmq是否正常

    import subprocess
    
    running_list = []
    error_list = []
    false = "false"
    true = "true"
    
    
    def get_status():
        obj = subprocess.Popen(("curl -s -u guest:guest  http://localhost:15672/api/nodes &> /dev/null"), shell=True,
                               stdout=subprocess.PIPE)
        data = obj.stdout.read()
        data1 = eval(data)
        #print(data1)
        for i in data1:
            if i.get("running") == "true":
                running_list.append(i.get("name"))
            else:
                error_list.append(i.get("name"))
    
    
    def count_server():
        if len(running_list) < 3:  # 可以判断错误列表大于0或者运行列表小于3,3未总计的节点数量
            print(100)  # 100就是集群内有节点运行不正常了
        else:
            print(50)  # 50为所有节点全部运行正常
    
    
    def main():
        get_status()
        count_server()
    
    
    if __name__ == "__main__":
        main()

  • 相关阅读:
    二分查找
    Uva11464 Even Parity
    Uva10881 Piotr's Ants
    POJ3154 Graveyard
    [NOIP2015] 提高组 洛谷P2680 运输计划
    [NOIP2015] 提高组 洛谷P2679 子串
    [NOIP2015] 提高组 洛谷P2678 跳石头
    [NOIP2015] 提高组 洛谷P2668 斗地主
    [NOIP2015] 提高组 洛谷P2661 信息传递
    [NOIP2015] 提高组 洛谷P2615 神奇的幻方
  • 原文地址:https://www.cnblogs.com/lummg-DAY/p/12608333.html
Copyright © 2011-2022 走看看