zoukankan      html  css  js  c++  java
  • Rabbitmq.md

    RabbitMQ介绍

    什么是RabbitMQ

    RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
    RabbitMQ主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。
    例如一个日志系统,很容易使用RabbitMQ简化工作量,一个Consumer可以进行消息的正常处理,另一个Consumer负责对消息进行日志记录,只要在程序中指定两个Consumer所监听的queue以相同的方式绑定到同一exchange即可,剩下的消息分发工作由RabbitMQ完成。

    单向解耦

    “Producer”--
               |
               |----->"RabbitMQ Clusters" ---> “Consumer”
    "Producer"--
    

    双向解耦(如:RPC)

    “Producer1”-->
               |
               |<----->"RabbitMQ Clusters" <---> “Consumer2&Producer2”
    "Consumer1"<--
    

    概念和特性

    交换机(exchange)

    1.接收消息,转发消息到绑定的队列。四种类型:direct, topic, headers and fanout

    • direct:转发消息到routigKey指定的队列
    • topic:按规则转发消息(最灵活)
    • headers:基于首部进行路由
    • fanout:转发消息到所有绑定队列

    2.如果交换机上(Exchange)和(Queue)是多对多的关系。

    3.topic类型交换器通过模式匹配分析消息的routing-key属性。它将routing-key和binding-key的字符串切分成单词。这些单词之间用点隔开。、

    支持表达式:
        *.1.geewu  #只要包含1.geewu就可以匹配相关信息。这个是topic,性能最慢
        hello   #这个是direct,性能最好。
    

    4.因为交换器是在RabbitMQ是一个实际存在的实体,不能被改变。只能删除之后,重新创建。

    5.交换器的属性:

    • 持久性:如果启用,交换器将会在server重启前都有效。(对应Duration属性,持久化)
    • 自动删除:如果启用,那么交换器将会在其绑定的队列都被删除掉之后自动删除掉自身。(创建时候设置,如果不设置不会自动删除)。
    • 惰性:如果没有声明交换器,那么在执行到使用的时候会导致异常,并不会主动声明。(不会自动创建)

    队列(queue)

    1.队列是RabbitMQ内部对象,存储消息。相同属性的queue可以重复定义。

    2.临时队列。channel.queueDeclare(),有时不需要指定队列的名字,并希望断开连接时删除队列。

    队列的属性:

    • 持久性:如果启用,队列将会在server重启前都有效。
    • 自动删除:如果启用,那么队列将会在所有的消费者停止使用之后自动删除掉自身。
    • 惰性:如果没有声明队列,那么在执行到使用的时候会导致异常,并不会主动声明。
    • 排他性:如果启用,队列只能被声明它的消费者使用。

    消息传递

    • 消息在队列中保存,以轮询的方式将消息发送给监听消息队列的消费者,可以动态的增加消费者以提高消息的处理能力。
    • 为了实现负载均衡,可以在消费者端通知RabbitMQ,一个消息处理完之后才会接受下一个消息。 channel.basic_qos(prefetch_count=1) 注意:效率非常低,不能使用客户端缓存。

    消息有14个属性,最常用的几种:

    • deliveryMode:持久化属性
    • contentType:编码
    • replyTo:指定一个回调队列
    • correlationId:消息id

    在client代码中,send方法时候,可以设置mandatory和immediate。设置mandatory:发送到交换器并且还未投递到队列(没有绑定器存在)得到通知。设置immediate:没有消费者能够立即处理的时候得到通知。这些投递保障机制,保证了消息可靠性。

    在client代码中,send方法时候persistent属性为true。数据就会被保存到队列中,但是必须Exchange,Queue,Client三者都设置为存储状态。

    高可用性(HA)

    1.消息ACK,通知RabbitMQ消息已被处理,可以从内存删除。如果消费者因宕机或链接失败等原因没有发送ACK(不同于ActiveMQ,在RabbitMQ里,消息没有过期的概念),则RabbitMQ会将消息重新发送给其他监听在队列的下一个消费者。

    channel.basicConsume(queuename, noAck=false, consumer);
    

    2.消息和队列的持久化。定义队列时可以指定队列的持久化属性(问:持久化队列如何删除?) channel.queueDeclare(queuename, durable=true, false, false, null); 发送消息时可以指定消息持久化属性:这样,即使RabbitMQ服务器重启,也不会丢失队列和消息。

    channel.basicPublish(exchangeName, routingKey,
    MessageProperties.PERSISTENT_TEXT_PLAIN,
    message.getBytes());
    

    3.publisher confirms 提供批量确认消息的方法。

    4.master/slave机制,配合Mirrored Queue。Mirrored Queue通过policy和rabbitmqctl设置可以实现。具体可以参考Rabbitmq官方文档。在Mirrored Queue下,无论Producer和Consumer连接那个RabbitMq服务器,都跟连接同一个RabbitMQ上,消费和生产数据会被同步。注意:Mirrored Queue会严重的消耗性能,性能会下降到原来的1/5。当一个slave重新加入mirrored-queue时,如果queue是durable的,则会被清空。
    (通过命令行或管理插件可以查看哪个slave是同步的:

    rabbitmqctl list_queues name slave_pids synchronised_slave_pids)
    

    集群(cluster)

    1.不支持跨网段,因为RabbitMQ底层是Erlang,会导致脑裂(Slave Node感觉Master Node死掉了,主Master Node觉得Slave2 Node死掉了,结果数据无法复制,系统逻辑出现问题)(如需支持,需要shovel或federation插件)
    2.可以随意的动态增加或减少、启动或停止节点,允许节点故障。(但是数据同步会造成Queue服务暂停,所有的Producer和Consumer都被终止)
    3.集群分为RAM节点和DISK节点,一个集群最好至少有一个DISK节点保存集群的状态。
    4.集群的配置可以通过命令行,也可以通过配置文件,命令行优先。

    安装与使用

    安装服务

    由于RabbitMQ主要依赖Erlang所以如果手动安装的话这个需要注意,在下面的的实验中是用CentOS 7系统的yum来进行安装。在安装前需要配置好epel源。

    yum install -y rabbitmq-server
    

    然而安装后需要注意一个问题就是需要做下主机名的解析,如果不做则会导致RabbitMQ无法启动,配置如下:

    主机1:

    echo 192.168.100.51 study-1 >> /etc/hosts
    

    主机2:

    echo 192.168.100.52 study-2 >> /etc/hosts
    

    启动服务:

    systemctl start rabbitmq-server.service
    

    启动图形界面

    # rabbitmq-plugins enable rabbitmq_management
    # systemctl restart rabbitmq-server.service
    

    通过上面的操作后在浏览器输入主机的IP:15672,例如:http://192.168.100.51:15672/默认用户为guest密码是guest。

    用户管理

    [root@study-1 ~]# rabbitmqctl add_user admin password
    Creating user "admin" ...
    ...done.
    [root@study-1 ~]# rabbitmqctl list_users
    Listing users ...
    admin	[]
    guest	[administrator]
    ...done.
    [root@study-1 ~]# rabbitmqctl set_user_tags admin administrator
    Setting tags for user "admin" to [administrator] ...
    ...done.
    

    权限管理

    [root@study-1 ~]# rabbitmqctl list_user_permissions admin
    Listing permissions for user "admin" ...
    ...done.
    [root@study-1 ~]# rabbitmqctl list_user_permissions guest
    Listing permissions for user "guest" ...
    /	.*	.*	.*
    ...done.
    [root@study-1 ~]# rabbitmqctl list_permissions 
    Listing permissions in vhost "/" ...
    guest	.*	.*	.*
    ...done.
    [root@study-1 ~]# rabbitmqctl set_permissions -p /study1 admin ".*" ".*" ".*"
    Setting permissions for user "admin" in vhost "/study1" ...
    ...done.
    [root@study-1 ~]# rabbitmqctl list_permissions -p /study1
    Listing permissions in vhost "/study1" ...
    admin	.*	.*	.*
    ...done.
    

    虚拟主机管理

    [root@study-1 ~]# rabbitmqctl add_vhost /study1
    Creating vhost "/study1" ...
    ...done.
    [root@study-1 ~]# rabbitmqctl add_vhost /study1/lesson1
    Creating vhost "/study1/lesson1" ...
    ...done.
    [root@study-1 ~]# rabbitmqctl list_vhosts
    Listing vhosts ...
    /
    /study1
    /study1/lesson1
    ...done.
    [root@study-1 ~]# rabbitmqctl delete_vhost /study1/lesson1
    Deleting vhost "/study1/lesson1" ...
    ...done.
    [root@study-1 ~]# rabbitmqctl list_vhosts
    Listing vhosts ...
    /
    /study1
    ...done.
    

    其他

    broker查看

    ]# rabbitmqctl status
    

    环境变量查看

    ]# rabbitmqctl environment
    

    设定内存的高水位标记:

    ]# rabbitmqctl set_vm_memory_high_watermark <fraction>
    

    关闭指定的连接

    ]# rabbitmqctl close_connection <connectionpid> <explanation>
    

    HA环境搭建

    配置之前需要将study-1主机上的/var/lib/rabbitmq/.erlang.cookie文件复制到study-2上。

    [root@study-2 ~]# rabbitmqctl stop_app
    Stopping node 'rabbit@study-2' ...
    ...done.
    [root@study-2 ~]# rabbitmqctl join_cluster rabbit@study-1
    Clustering node 'rabbit@study-2' with 'rabbit@study-1' ...
    ...done.
    [root@study-2 ~]# rabbitmqctl cluster_status
    Cluster status of node 'rabbit@study-2' ...
    [{nodes,[{disc,['rabbit@study-1','rabbit@study-2']}]}]
    ...done.
    [root@study-2 ~]# rabbitmqctl start_app
    Starting node 'rabbit@study-2' ...
    ...done.
    [root@study-2 ~]# rabbitmqctl cluster_status
    Cluster status of node 'rabbit@study-2' ...
    [{nodes,[{disc,['rabbit@study-1','rabbit@study-2']}]},
     {running_nodes,['rabbit@study-1','rabbit@study-2']},
     {cluster_name,<<"rabbit@study-1">>},
     {partitions,[]}]
    ...done.
    

    通过上面的步骤就已经将study-2加入到study-1中,然后我们还可以在前端使用HAProxy设置一个调度器,将请求调度至后端的RabbitMQ集群中。

  • 相关阅读:
    Dart Learn Notes 04
    Dart Learn Notes 03
    Dart Learn Notes 02
    一介书生,仅此而已
    计算机技术的演进及编程语言的多样
    C#方法(用法,参数)
    C#数组--(Array类的属性和方法)
    C#数组--(一维数组,二维数组的声明,使用及遍历)
    程序设计的编程方法
    C#流程控制语句--跳转语句(break,continue,goto,return,)
  • 原文地址:https://www.cnblogs.com/cuchadanfan/p/6791751.html
Copyright © 2011-2022 走看看