zoukankan      html  css  js  c++  java
  • RabbitMQ介绍2

    这一节介绍RabbitMQ的一些概念,当然也是AMQP协议的概念。官方网站也有详细解释,包括协议的命令: http://www.rabbitmq.com/tutorials/amqp-concepts.html

    消息通信拓扑结构概念:生产者producer,消费者consumer,队列queue,交换器exchange,路由键routing key,绑定键binding key。

    消息传播过程:producer发布消息给交换器,交换器根据路由规则,将消息放到绑定的队列,消费者从队列中得到消息。

    clip_image001_thumb

    Queue。队列是唯一存储消息的地方,一条消息路由后要么进入队列,要么被丢弃。

    RabbitMQ中的消息都只能存储在Queue中,生产者(下图中的P)生产消息并最终投递到Queue中,消费者(下图中的C)可以从Queue中获取消息并消费。

    clip_image002[3]

    多个消费者可以订阅同一个Queue,这时Queue中的消息会被按照round-robin分摊给多个消费者,而不是每个消费者都收到所有的消息。

    clip_image003[3]

    Prefetch count。消费者从队列中订阅消息的时候,RabbitMQ会把队列中的全部消息推送到消费者,然后等待确认。通过设置prefetch count可以限定消息确认前最多发送给消费者的消息数量。在有多个消费者订阅一个队列的时候,设置这个参数可以帮助实现负载平衡。注意这里提到的订阅是持续订阅,通过basic.get进行的单次订阅不在此列。

    创建队列。生产者和消费者都可以使用queue.declare创建队列,如果消费者在同一条信道上订阅了另一个队列的话,就无法再创建队列了。创建队列时如果不指定队列的名称,RabbitMQ会分配一个随机名称,并在queue.declare的结果中返回。创建队列如果已经存在,只要声明参数和现存队列完全一样,RabbitMQ什么都不做,创建过程成功返回,如果参数不匹配,会抛出异常。如果想检查队列是否存在,创建时设置参数passive为true,如果队列存在,创建成功返回,否则返回错误。

    队列属性:exclusive,如果为true的话,只有你的程序才能消费队列。Auto-delete,如果为true的话,在最后一个消费者取消订阅的时候,队列会自动删除。

    Exchange。生成者将消息发送到exchange,exchange根据消息的routing key,路由消息到相应的队列。不同的交换器类型exchange type提供不同的路由方案,exchange type有4种:direct,fanout,topic和headers。

    1. direct。消息的routing key需要精确匹配binding key。

    clip_image004_thumb

    当生产者(P)发送的消息Rotuing key=booking时,发现Queue1和Queue2都符合,就会将消息传送给这两个队列,如果以Rotuing key=create或Rotuing key=confirm发送消息时,这时消息只会被推送到Queue2队列中,其他Routing Key的消息将会被丢弃。

    注意RabbitMQ有一个默认Exchange,类型是direct,没有办法绑定一个queue到这个exchange,但是通过指定routing key,可以发送消息到同名的queue。

    clip_image005_thumb

    2. fanout。这个类型的exchange会发送消息到所有绑定在上面的queue,routing key和binding key不起作用。

    clip_image006_thumb

    生产者(P)生产消息1将消息1推送到Exchange,Exchange将消息推送到所有与它绑定Queue,最后两个消费者都会收到消息。

    3. topic。Binding key可以使用*和#来对routing key进行模糊匹配。

    • 点号“. ”分隔的每一段字符串称为一个单词,如“quick.orange.rabbit”包含3个单词quick,orange,rabbit。点分割的部分可以为空,比如bindingkey=*.*的可以匹配routingkey是"."的消息,但是bindingkey=“*”无法匹配routingkey是空的消息。可以简单理解,先将routingkey按照stringsplit方法用"."分割,保留空白字符,*表示匹配分割后的一个位置,#表示匹配0…n个位置。其余字符需要精确匹配。
    • *表示一个单词,#表示0或多个单词。
    • binding key使用“*”与“#”来模糊匹配routing key。routing key中的“*”或“#”当做普通字符处理。

      clip_image007_thumb

      当生产者发送消息Routing Key=F.C.E时,只会被路由到Queue1中,如果Routing Key=A.C.E这时候会被同时路由到Queue1和Queue2中,如果Routing Key=A.F.B时,这里只会发送一条消息到Queue2中

      Will "*" binding catch a message sent with an empty routing key? 不会

      Will "#.*" catch a message with a string ".." as a key? Will it catch a message with a single word key? 都会

      How different is "a.*.#" from "a.#"? 前者routingkey要有"a.",后者只要有"a"就能匹配。

    4. headers。headers交换器允许你匹配AMQP消息的header而非路由键,除此之外,和direct交换器完全一样,但是性能会差很多,因此并不太实用,而且几乎再也用不到了。使用方法:在绑定的时候提供一组键值对,如果消息的header(也是一组键值对)和其完全匹配,则路由消息到队列。

    网络连接概念:ConnectionFactory, connection, channel信道。connectionFactory用来建立连接,connection表示一个TCP连接,channel是一个建立在connection上的虚拟连接。每条信道通过一个唯一的ID标记,发布消息、绑定、消费消息这些操作都是在信道上完成。connection和channel的关系类似于进程和线程。

    问题:在同一个channel能订阅多个queue吗?生产者和消费者能使用同一个channel吗?同一个channel能给多个exchange发消息吗?

    消息包括有效荷载payload和标签label。payload是需要传输的数据,可以是任何东西(C#编程的时候就是一个byte数组),label是传输数据的一些描述,比如routing key,持久化delivery mode。

    消息持久化。exchange,queue,message都可以设置是否持久化durable,如果是持久化的,服务重启后会重建exchange、queue,消息会放到原来的queue里,重新发送给消费者。要注意的是,只有这3个都是持久化的,消息才会在RabbitMQ重启后还保留,任何一个环节不是持久化的,消息都不会恢复,也就是丢失了。持久化会可能会使吞吐量降低10倍(采用SSD存储可以大大缓解),而且在集群环境持久化队列不会在其它节点上重建。如果集群中一个节点崩溃,其上的持久化队列便从集群中消失了,在这个节点重启前,所有发送到这些队列的消息都会丢失。作为对比,非持久化队列在节点崩溃后,会在其它节点重建,消息便能发送到重建的队列上。所有,如果不是必须保留消息,不要用持久化。

    消息确认。消费者接收到的每一条消息都必须进行确认。消费者必须通过AMQP的basic.ack命令显式地向RabbitMQ发送一个确认,或者在订阅到队列的时候设置auto_ack为true。auto_ack为true时,一旦消费者接收消息,RabbitMQ会自动认为其进行了确认。只有确认过的消息,才会从队列中删除。如果消费者接收了消息,但没有进行确认,消息会进入等待确认状态,这时候如果消费者的连接断开,RabbitMQ会清除这些消息的状态,并发送给队列的其它消费者。如果消费者一直不进行确认,消息会堆积在队列中,不会超时。消费者也可以明确拒绝一个消息(basic.reject命令),拒绝的时候如果requeue参数为true,消息会重新发送给下一个订阅的消费者,requeue为false时会从队列中删除消息,这时候的效果和发送确认类似,不同的是2.0以后版本的RabbitMQ会维护被拒绝的消息队列,以便后续调查。

    AMQP事务transaction。由于发布消息不返回任何信息给生产者,你怎么知道服务器是否已经保存了持久化消息到硬盘呢?事务用来确认发送者的消息已经进入了队列(或者被丢弃)。在把信道设置为事务模式后,这条信道上的消息将按照串行方式发送,只有前面的消息确认成功了,才会执行后面的,直到提交事务,结束信道的事务模式。事务会极大的降低RabbitMQ的吞吐量(2~10倍)。

    发送方确认模式。这是RabbitMQ的概念,不属于AMQP。类似AMQP事务的功能,但是不需要串行,极大的提高了吞吐量。使用方法:将信道设置为confirm模式,只有重新创建信道才能关闭该设置。所有信道上发送的消息都会指派一个唯一的ID(从1开始),一旦消息投递成功,信道会发送一个确认给生产者。生产者通过回调接收信道的确认消息(如果消息丢失,会收到nack),与此同时,生产者也可以继续发送消息。由于没有事务的消息回滚,发送方确认模式更加轻量,对RabbitMQ服务器的影响也可以忽略不计。

    我们没有办法得到信道指派的这个ID(basic_publish不会返回ID信息),那怎么确认信道确认消息中的ID具体指的是哪个消息呢?方法是发送者自己维护消息和ID的关系。一个信道一般只有一个线程使用,ID从1开始,每次递增1,发送者完全可以自己算出消息的ID。

  • 相关阅读:
    随机森林算法参数调优
    BAYES和朴素BAYES
    阿里云 金融接口 token PHP
    PHP mysql 按时间分组 表格table 跨度 rowspan
    MySql按周,按月,按日分组统计数据
    PHP 获取今日、昨日、本周、上周、本月的等等常用的起始时间戳和结束时间戳的时间处理类
    thinkphp5 tp5 会话控制 session 登录 退出 检查检验登录 判断是否应该跳转到上次url
    微信 模板消息
    php 腾讯 地图 api 计算 坐标 两点 距离 微信 网页 WebService API
    php添加http头禁止浏览器缓存
  • 原文地址:https://www.cnblogs.com/lingshf/p/5192825.html
Copyright © 2011-2022 走看看