RabbitMQ介绍
什么是RabbitMQ
RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
RabbitMQ主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。
例如一个日志系统,很容易使用RabbitMQ简化工作量,一个Consumer可以进行消息的正常处理,另一个Consumer负责对消息进行日志记录,只要在程序中指定两个Consumer所监听的queue以相同的方式绑定到同一exchange即可,剩下的消息分发工作由RabbitMQ完成。
单向解耦
“Producer”--
|
|----->"RabbitMQ Clusters" ---> “Consumer”
"Producer"--
双向解耦(如:RPC)
“Producer1”-->
|
|<----->"RabbitMQ Clusters" <---> “Consumer2&Producer2”
"Consumer1"<--
概念和特性
交换机(exchange)
1.接收消息,转发消息到绑定的队列。四种类型:direct, topic, headers and fanout
- direct:转发消息到routigKey指定的队列
- topic:按规则转发消息(最灵活)
- headers:基于首部进行路由
- fanout:转发消息到所有绑定队列
2.如果交换机上(Exchange)和(Queue)是多对多的关系。
3.topic类型交换器通过模式匹配分析消息的routing-key属性。它将routing-key和binding-key的字符串切分成单词。这些单词之间用点隔开。、
支持表达式:
*.1.geewu #只要包含1.geewu就可以匹配相关信息。这个是topic,性能最慢
hello #这个是direct,性能最好。
4.因为交换器是在RabbitMQ是一个实际存在的实体,不能被改变。只能删除之后,重新创建。
5.交换器的属性:
- 持久性:如果启用,交换器将会在server重启前都有效。(对应Duration属性,持久化)
- 自动删除:如果启用,那么交换器将会在其绑定的队列都被删除掉之后自动删除掉自身。(创建时候设置,如果不设置不会自动删除)。
- 惰性:如果没有声明交换器,那么在执行到使用的时候会导致异常,并不会主动声明。(不会自动创建)
队列(queue)
1.队列是RabbitMQ内部对象,存储消息。相同属性的queue可以重复定义。
2.临时队列。channel.queueDeclare(),有时不需要指定队列的名字,并希望断开连接时删除队列。
队列的属性:
- 持久性:如果启用,队列将会在server重启前都有效。
- 自动删除:如果启用,那么队列将会在所有的消费者停止使用之后自动删除掉自身。
- 惰性:如果没有声明队列,那么在执行到使用的时候会导致异常,并不会主动声明。
- 排他性:如果启用,队列只能被声明它的消费者使用。
消息传递
- 消息在队列中保存,以轮询的方式将消息发送给监听消息队列的消费者,可以动态的增加消费者以提高消息的处理能力。
- 为了实现负载均衡,可以在消费者端通知RabbitMQ,一个消息处理完之后才会接受下一个消息。 channel.basic_qos(prefetch_count=1) 注意:效率非常低,不能使用客户端缓存。
消息有14个属性,最常用的几种:
- deliveryMode:持久化属性
- contentType:编码
- replyTo:指定一个回调队列
- correlationId:消息id
在client代码中,send方法时候,可以设置mandatory和immediate。设置mandatory:发送到交换器并且还未投递到队列(没有绑定器存在)得到通知。设置immediate:没有消费者能够立即处理的时候得到通知。这些投递保障机制,保证了消息可靠性。
在client代码中,send方法时候persistent属性为true。数据就会被保存到队列中,但是必须Exchange,Queue,Client三者都设置为存储状态。
高可用性(HA)
1.消息ACK,通知RabbitMQ消息已被处理,可以从内存删除。如果消费者因宕机或链接失败等原因没有发送ACK(不同于ActiveMQ,在RabbitMQ里,消息没有过期的概念),则RabbitMQ会将消息重新发送给其他监听在队列的下一个消费者。
channel.basicConsume(queuename, noAck=false, consumer);
2.消息和队列的持久化。定义队列时可以指定队列的持久化属性(问:持久化队列如何删除?) channel.queueDeclare(queuename, durable=true, false, false, null); 发送消息时可以指定消息持久化属性:这样,即使RabbitMQ服务器重启,也不会丢失队列和消息。
channel.basicPublish(exchangeName, routingKey,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
3.publisher confirms 提供批量确认消息的方法。
4.master/slave机制,配合Mirrored Queue。Mirrored Queue通过policy和rabbitmqctl设置可以实现。具体可以参考Rabbitmq官方文档。在Mirrored Queue下,无论Producer和Consumer连接那个RabbitMq服务器,都跟连接同一个RabbitMQ上,消费和生产数据会被同步。注意:Mirrored Queue会严重的消耗性能,性能会下降到原来的1/5。当一个slave重新加入mirrored-queue时,如果queue是durable的,则会被清空。
(通过命令行或管理插件可以查看哪个slave是同步的:
rabbitmqctl list_queues name slave_pids synchronised_slave_pids)
集群(cluster)
1.不支持跨网段,因为RabbitMQ底层是Erlang,会导致脑裂(Slave Node感觉Master Node死掉了,主Master Node觉得Slave2 Node死掉了,结果数据无法复制,系统逻辑出现问题)(如需支持,需要shovel或federation插件)
2.可以随意的动态增加或减少、启动或停止节点,允许节点故障。(但是数据同步会造成Queue服务暂停,所有的Producer和Consumer都被终止)
3.集群分为RAM节点和DISK节点,一个集群最好至少有一个DISK节点保存集群的状态。
4.集群的配置可以通过命令行,也可以通过配置文件,命令行优先。
安装与使用
安装服务
由于RabbitMQ主要依赖Erlang所以如果手动安装的话这个需要注意,在下面的的实验中是用CentOS 7系统的yum来进行安装。在安装前需要配置好epel源。
yum install -y rabbitmq-server
然而安装后需要注意一个问题就是需要做下主机名的解析,如果不做则会导致RabbitMQ无法启动,配置如下:
主机1:
echo 192.168.100.51 study-1 >> /etc/hosts
主机2:
echo 192.168.100.52 study-2 >> /etc/hosts
启动服务:
systemctl start rabbitmq-server.service
启动图形界面
# rabbitmq-plugins enable rabbitmq_management
# systemctl restart rabbitmq-server.service
通过上面的操作后在浏览器输入主机的IP:15672,例如:http://192.168.100.51:15672/
默认用户为guest密码是guest。
用户管理
[root@study-1 ~]# rabbitmqctl add_user admin password
Creating user "admin" ...
...done.
[root@study-1 ~]# rabbitmqctl list_users
Listing users ...
admin []
guest [administrator]
...done.
[root@study-1 ~]# rabbitmqctl set_user_tags admin administrator
Setting tags for user "admin" to [administrator] ...
...done.
权限管理
[root@study-1 ~]# rabbitmqctl list_user_permissions admin
Listing permissions for user "admin" ...
...done.
[root@study-1 ~]# rabbitmqctl list_user_permissions guest
Listing permissions for user "guest" ...
/ .* .* .*
...done.
[root@study-1 ~]# rabbitmqctl list_permissions
Listing permissions in vhost "/" ...
guest .* .* .*
...done.
[root@study-1 ~]# rabbitmqctl set_permissions -p /study1 admin ".*" ".*" ".*"
Setting permissions for user "admin" in vhost "/study1" ...
...done.
[root@study-1 ~]# rabbitmqctl list_permissions -p /study1
Listing permissions in vhost "/study1" ...
admin .* .* .*
...done.
虚拟主机管理
[root@study-1 ~]# rabbitmqctl add_vhost /study1
Creating vhost "/study1" ...
...done.
[root@study-1 ~]# rabbitmqctl add_vhost /study1/lesson1
Creating vhost "/study1/lesson1" ...
...done.
[root@study-1 ~]# rabbitmqctl list_vhosts
Listing vhosts ...
/
/study1
/study1/lesson1
...done.
[root@study-1 ~]# rabbitmqctl delete_vhost /study1/lesson1
Deleting vhost "/study1/lesson1" ...
...done.
[root@study-1 ~]# rabbitmqctl list_vhosts
Listing vhosts ...
/
/study1
...done.
其他
broker查看
]# rabbitmqctl status
环境变量查看
]# rabbitmqctl environment
设定内存的高水位标记:
]# rabbitmqctl set_vm_memory_high_watermark <fraction>
关闭指定的连接
]# rabbitmqctl close_connection <connectionpid> <explanation>
HA环境搭建
配置之前需要将study-1主机上的/var/lib/rabbitmq/.erlang.cookie
文件复制到study-2上。
[root@study-2 ~]# rabbitmqctl stop_app
Stopping node 'rabbit@study-2' ...
...done.
[root@study-2 ~]# rabbitmqctl join_cluster rabbit@study-1
Clustering node 'rabbit@study-2' with 'rabbit@study-1' ...
...done.
[root@study-2 ~]# rabbitmqctl cluster_status
Cluster status of node 'rabbit@study-2' ...
[{nodes,[{disc,['rabbit@study-1','rabbit@study-2']}]}]
...done.
[root@study-2 ~]# rabbitmqctl start_app
Starting node 'rabbit@study-2' ...
...done.
[root@study-2 ~]# rabbitmqctl cluster_status
Cluster status of node 'rabbit@study-2' ...
[{nodes,[{disc,['rabbit@study-1','rabbit@study-2']}]},
{running_nodes,['rabbit@study-1','rabbit@study-2']},
{cluster_name,<<"rabbit@study-1">>},
{partitions,[]}]
...done.
通过上面的步骤就已经将study-2加入到study-1中,然后我们还可以在前端使用HAProxy设置一个调度器,将请求调度至后端的RabbitMQ集群中。