一:RabbitMQ
组件原理图:
生产者发送消息到 broker server(RabbitMQ), 在 Broker 内部,用户创建 Exchange/Queue,通过 Binding 规则将两者联系在一起,Exchange 分发消息, 根据类型/binding 的不同分发策略有区别,消息最后来到 Queue 中,等待消费者取走。
组件介绍:
Broker: 接收和分发消息的应用,RabbitMQ Server 就是 Message Broker Virtual host: 出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念,当多个不同的用户使用同一个RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost创建 exchange/queue 等。 Connection: publisher/consumer 和 broker 之间的 TCP 连接。 Channel: 如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker识别 channel,
所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection极大减少了操作系统建立 TCP connection 的开销。 Exchange: message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish subscribe) and fanout (multicast)。 Queue: 消息最终被送到这里等待 consumer 取走。 Binding: exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据。
https://www.rabbitmq.com/download.html #官网下载地址
https://github.com/rabbitmq/rabbitmq-server/releases #github 下载地址
二、单机部署
安装包:rabbitmq-server_3.7.22-1_all.deb
要求:
添加主机解析:hostname,运行之后不能修改域名
快速安装:https://www.rabbitmq.com/install-debian.html
2.1:登陆 web 管理界面
默认不允许:
允许方式:
#vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.3/ebin/rabbit.app
39 {loopback_users, []}, #删除被禁止登陆的 guest 账户 # systemctl restart rabbitmq-server.service #重启 rabbitmq 服务
三、RabbitMQ 集群部署
普通模式:创建好 RabbitMQ 之后的默认模式。
(HA)镜像模式:把需要的队列做成镜像队列。
消息实体会主动在镜像节点间同步,而不是在 consumer 取数据时临时拉取。该模式带来的副作用也很明显,除了降低系统性能外,如果镜像队列数量过多,加之大量的消息进入,集群内部的网络带宽将会被这种同步通讯大大消耗掉。
3.1 修改hosts主机名
192.168.134.192 rabbitmq-server-1 192.168.134.193 rabbitmq-server-2 192.168.134.194 rabbitmq-server-3
3.2 安装rabbitmq
sudo apt-get update -y ## Install prerequisites sudo apt-get install curl gnupg -y ## Install RabbitMQ signing key curl -fsSL https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc | sudo apt-key add - ## Install apt HTTPS transport sudo apt-get install apt-transport-https ## Add Bintray repositories that provision latest RabbitMQ and Erlang 21.x releases sudo tee /etc/apt/sources.list.d/bintray.rabbitmq.list <<EOF ## Installs the latest Erlang 22.x release. ## Change component to "erlang-21.x" to install the latest 21.x version. ## "bionic" as distribution name should work for any later Ubuntu or Debian release. ## See the release to distribution mapping table in RabbitMQ doc guides to learn more. deb https://dl.bintray.com/rabbitmq-erlang/debian bionic erlang deb https://dl.bintray.com/rabbitmq/debian bionic main EOF ## Update package indices sudo apt-get update -y ## Install rabbitmq-server and its dependencies sudo apt-get install rabbitmq-server -y --fix-missing
3.3:同步cookie文件
https://www.rabbitmq.com/clustering.html
scp /var/lib/rabbitmq/.erlang.cookie 192.168.134.193:/var/lib/rabbitmq/.erlang.cookie scp /var/lib/rabbitmq/.erlang.cookie 192.168.134.194:/var/lib/rabbitmq/.erlang.cookie
3.4:开启插件管理
# vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.22/ebin/rabbit.app 39 {loopback_users, []}, #删除被禁止登陆的 guest 账户 # systemctl restart rabbitmq-server.service #重启 rabbitmq 服务
集群中有两种节点类型:
内存节点:只将数据保存到内存
磁盘节点:保存数据到内存和磁盘。
内存节点虽然不写入磁盘,但是它执行比磁盘节点要好,集群中,只需要一个 磁盘节点来保存数据就足够了如果集群中只有内存节点,那么不能全部停止它
们,否则所有数据消息在服务器全部停机之后都会丢失。
3.5 添加节点
在一个 rabbitmq 集群里,有 3 台。其中 1 台使用磁盘模式,其它节 点使用内存模式,内存节点无访问速度更快,由于磁盘 IO 相对较慢,因此可作 为数据备份使用。
在 rabbitmq-server-1 作为内存节点连接起来,并作为内存节点,在 rabbitmq-server-1 执行以下命令:
rabbitmqctl stop_app #停止 app 服务 rabbitmqctl reset #清空元数据 #将rabbitmq-server-1 添加到集群当中,并成为内存节点,不加--ram 默认是磁盘节点 root@rabbitmq-server-1:~# rabbitmqctl join_cluster rabbit@rabbitmq-server-3 --ram Clustering node rabbit@rabbitmq-server-1 with rabbit@rabbitmq-server-3 rabbitmqctl start_app #启动 app 服务
在 rabbitmq-server-2 作为内存节点连接起来,并作为内存节点,在 rabbitmq-server-2 执行以下命令:
root@rabbitmq-server-2:~# rabbitmqctl stop_app Stopping rabbit application on node rabbit@rabbitmq-server-2 ... root@rabbitmq-server-2:~# rabbitmqctl reset Resetting node rabbit@rabbitmq-server-2 ... root@rabbitmq-server-2:~# rabbitmqctl join_cluster rabbit@rabbitmq-server-3 --ram Clustering node rabbit@rabbitmq-server-2 with rabbit@rabbitmq-server-3 root@rabbitmq-server-2:~# rabbitmqctl start_app Starting node rabbit@rabbitmq-server-2 ... completed with 3 plugins.
验证
3.6 将集群设置为镜像模式:
只要在其中一台节点执行以下命令即可:
root@rabbitmq-server-1:~# rabbitmqctl set_policy ha-all "#" '{"ha-mode":"all"}'
四、RabbitMQ 常用命令
rabbitmqctl add_vhost xxx #创建 vhost rabbitmqctl list_vhosts #列出所有 vhost rabbitmqctl list_queues #列出所有队列 rabbitmqctl add_user jack 123456 #添加账户 jack 密码为 123456 rabbitmqctl change_password jack 654321 Changing #更改用户密码
五、使用python处理rabbitmq
监控rabbitmq是否正常
import subprocess running_list = [] error_list = [] false = "false" true = "true" def get_status(): obj = subprocess.Popen(("curl -s -u guest:guest http://localhost:15672/api/nodes &> /dev/null"), shell=True, stdout=subprocess.PIPE) data = obj.stdout.read() data1 = eval(data) #print(data1) for i in data1: if i.get("running") == "true": running_list.append(i.get("name")) else: error_list.append(i.get("name")) def count_server(): if len(running_list) < 3: # 可以判断错误列表大于0或者运行列表小于3,3未总计的节点数量 print(100) # 100就是集群内有节点运行不正常了 else: print(50) # 50为所有节点全部运行正常 def main(): get_status() count_server() if __name__ == "__main__": main()