zoukankan      html  css  js  c++  java
  • RabbitMQ

     

    官方网址:

    建议读懂六种场景:http://www.rabbitmq.com/getstarted.html

    一、基本概念

    1、名词解释

    消息队列服务(Message Queue Service,MQS)是一种分布式、高可用的在线消息队列服务。它能为分布式应用系统提供异步解耦,具有高吞吐量,低延迟等优点。MQS服务基于RabbitMQ实现,不仅提供所有RabbitMQ的原生功能,还提供实例自动化部署、实例的维护、实例状态的监控与告警、账号和vHost的相关管理等功能。同时,MQS提供多种实例规格,方便用户按需选择。

    RabbitMQ:AMQP的一种实现方式,服务器用Erlang语言编写,支持多种客户端,主要用于在分布式系统中存储转发消息。

    节点:一个云主机实例。

    节点组:具有相同配置的云主机实例的集合。

    组件:可在操作系统上独立安装的软件系统包,如RabbitMQ等。

    user:使用IOP平台和RabbitMQ的用户。

    vhost:针对queues、exchanges、channels等资源的逻辑分组,用户可以通过创建vHost的方式来实现上述资源的分组。

    镜像模式:RabbitMQ的集群模式包括普通模式和镜像模式两种,镜像模式可以将指定队列在其它节点上进行备份,当某一节点出现故障时,该队列在其它节点上的备份仍可以为用户正常提供服务。

    connection:通过身份验证完成的AMQP连接。

    channel:共享一个TCP连接的通道。

    exchange:从channel接收消息,并将消息路由到零或多个队列中。

    queue:用来存储未被消费的消息。

    binding:交换机路由消息到队列中的路由规则。

    producer:发送消息到RabbitMQ的生产者。

    consumer:从RabbitMQ接收并消费消息的消费者。

    2、运行原理

    利用RabbitMQ进行消息传递前,需要在生产者、RabbitMQ和消费者之间建立完整的连接通道。

    系统结构图:

    图中:

    两个生产者发送的消息经过RabbitMQ中间件后,传递给了三个消费者;

    RabbitMQ中存在交换机和消息队列,交换机用来接收消息,并按照一定的规则路由消息到指定的队列;消息队列用来存储未被消费者消费的消息。

    3、Work Queues

    创建工作队列(Work queues)的主要目的是:执行相对比较耗时的任务时,可以将该任务分给多个消费者共同完成。

    生产者发送的任务,被封装成消息存储到队列中,如果该任务属于密集型任务,可以利用两个消费者去处理同一任务的不同部分,当然消费者可以不止两个。

    3.1、生产者发送任务

    这个例子,

    首先利用for循环生成一个资源相对比较丰富的任务,然后将其封装成消息存储在声明的队列中并发送;

    同时为了防止在RabbitMQ出现宕机或重启的状况下,创建的消息队列和消息丢失,将队列和消息都设置为持久化。

    3.2、消费者接收任务

    这个例子相比hello world中1.2【消费者接收任务】,有以下不同:

    • 利用channel.basicQos(1)语句实现消息的公平分发:假设只有两个消费者,并且RabbitMQ发送的所有奇数消息比较耗时,偶数消息却较易处理,那么处理奇数消息的消费者会非常忙碌,同时处理偶数消息的消费者大部分时间内无事可做。尽管如此,RabbitMQ依然会均匀的分发任务。为了避免类似状况的发生,利用channel.basicQos(1)语句实现在前一个消息没有被处理并发回确认之前,消费者不会收到RabbitMQ发送的下一条消息的方法;或者将消息发送给其它消费者。
    • 用手动方式进行消息的确认:该确认方法用于相对耗时的任务中。首先利用channel.basicConsume(queue,false,consumer)语句关闭消息的自动确认机制,然后在工作完成后,利用channel.basicAck(envelope.getDeliveryTag(), false)命令手动发回确认消息。
    • 消费者处理消息的过程中使用了Thread.sleep()函数。Thread.sleep()函数可以模拟消费者始终处于一个忙碌的场景中

    3.3、公平分发验证

    公平分发验证即验证利用channel.basicQos(1)语句进行消息分发后的结果。下图RabbitMQ把消息同时分发给三个消费者(即打开三个控制台)的结果,效果表明实现了公平分发。

     3.4、消息确认验证

    采用手动方式确认消息的验证结果:

    有消息确认机制时,消费者对生产者发送的消息不能重复接收,无消息确认机制时,消费者对生产者发送的消息可以重复接收。

    有消费确认机制时:消费者每次接收到消息时会向RabbitMQ发送确认消息,之后RabbitMQ就不会再发送同样的消息给消费者;

    无消息确认机制时,消费者接收消息之后不给RabbitMQ反馈确认信息,RabbitMQ没有收到确认信息,就不会从队列中删除该消息,因此消息仍然能从队列中获取。

    3.5、消息持久化验证

     

    消息持久化被设置后,RabbitMQ将需要发送的消息写入到了磁盘中,而非内存中;

    下次RabbitMQ启动后,即使不运行发送者,消费者也可以从指定的队列中取出存储在磁盘中的消息。

    4、hello world

    假设一个生产者只向一个已命名的队列中发送消息,发送的消息为Hello World,一个消费者从该队列中接收消息并打印,交换机为默认类型。

    消息传递的流程图如5-2所示。在图5-2中P为生产者,C为消费者。

    4.1、生产者和消费者

    4.2、代码分析 - 创建链接

    生产者还是消费者,在处理消息之前都需要和RabbitMQ中间件建立完整的连接。RabbitMQ连接的建立方式。

        程序中利用channel.queueDeclare命令声明队列,同时在声明的队列中设置了相关属性值。

    • Queue: 队列名。
    • Durable: 队列是否持久化,即RabbitMQ宕机或重启后,队列和队列中存储的消息是否依然存在。
    • Exclusive:队列是否排他。如果一个队列被声明为排他队列,同一连接的不同通道可以同时访问该队列,并在连接断开时自动删除;这种队列适用于只限一个客户端发送读取消息的实例。
    • Auto-delete: 队列是否自动删除,如果该队列没有任何订阅的消费者,则会被自动删除。这种队列适用于临时队列。
    • Arguments: 队列的其它属性。

    注意:

            如果只有消费者声明队列,则队列被声明之前,生产者发送的消息存在被丢弃的可能性,所以需要生产者首先声明队列;

            为了防止程序先运行消费者,消费者也需再次声明队列,目的为在RabbitMQ中创建队列。

    4.3、代码分析 - 生产者发送消息

    生产者使用channel.basicPublish命令向RabbitMQ发送消息,设置了相关属性值。

    • Exchange: 交换机。此处用空字符串表示默认类型的交换机。
    • RoutingKey: 路由关键字。
    • Props: 消息的其它属性。此处设置为null。
    • Body: 生产者发送的消息体。

    4.4、代码分析 - 消费者接收消息

    消费者被创建之后,在接收消息的过程中,使用了方法名为handleDelivery的方法,并在该方法中传入了各种相关的参数。

    • ConsumerTag: 第i个消费者的标签。
    • Envelope: 利用AMQP基本方法封装的一些参数,包括:表示接收到第几条消息的DeliveryTag;若消息确认失败,决定是否重发消息的Redeliver;选用的Exchange类型;路由关键字RoutingKey。
    • Properties: AMQP协议中预先确定的14条消息属性。大部分属性并不常用,常用的属性会在之后的某些实例中进行详细说明,此处的消息属性全部为null。
    • Body: 消费者获得的消息体。在该程序中使用UTF-8编码将消息体转换为英文的形式进行打印并输出。

    消息被接收并打印之后,一般需要在RabbitMQ中进行确认。

    • 如果消费者发送确认信号,RabbitMQ便知道消息被成功接收,然后将其从队里中删除;
    • 如果消费者没有发送确认信号,RabbitMQ会认为这个消息没有处理成功,然后把它传递给其它消费者。

    消息确认的方法有两种。

    • 一种只采用channel.basicConsume(queue,true,consumer)语句,使消息被接收后立即确认;
    • 另一种针对相对耗时的工作,先利用channel.basicConsume(queue,false,consumer)语句关闭消息的自动确认,在工作完成后,通过channel.basicACK命令手动发回确认。

    4.5、代码分析 - 关闭连接

    消息被处理完后可以利用channel.close()和connection.close()语句将整个通道和连接关闭。如果连接没有关闭,会导致RabbitMQ的资源无法释放。

    二、发布订阅的三种消息路由机制

    1、Publish/Subscribe 《---》fanout 广播

    1.1、概念

    在工作队列的实例中,RabbitMQ将同一任务的不同部分发送给不同的消费者,即每个消费者接收到的消息都是唯一的。但在该部分,我们将同一条消息发送给多个消费者,也就是说发布的消息将会被广播给所有的接收者,这就是“发布/订阅模式(Publish/Subscribe)”。

    举例:为了描述这种模式,需要建立一个简单的日志系统,该日志系统包括发送日志信息和接收并打印信息两部分。一个生产者P发送消息到RabbitMQ的交换机中,然后交换机将消息处理后,发送消息到两个不同的队列,最后消费者C1和C2分别去接收相应队列的消息。显然,两个消费者接收到的消息是相同的。

    相比于前面的例子,这个例子加入了对交换机的表示。因为生产者发送的消息并不会直接发送到队列中,而是先发送给交换机;

    交换机对接收到的消息进行处理,比如决定将消息发送给一个队列还是多个队列,或者选择是否抛弃该消息;

    交换机对消息处理完后,如果继续将消息发送给队列,对应的消费者才可以接收此消息。前两种实例采用了默认类型的交换机,所以没有加入对交换机的表示。

    1.2、faout交换机与临时队列

    常见的交换机类型有direct、topic、headers和fanout。

    fanout类型的交换机可以把接收到的消息路由到所有与它绑定的队列中。声明交换机类型并与队列绑定的方法如图:

    因为我们既想要监听所有的消息,又只对当前的消息感兴趣,所以放弃了使用已命名队列,选则了临时队列。

    临时队列可以实现:任何时候连接RabbitMQ,队列都是新的、空的;一旦消费者和队列不再连接,那么队列可以自动删除。

    临时队列一般随机命名,虽然它可以自行创建,但是利用RabbitMQ为我们选择一个随机分配的队列却是更好的方式。

    1.3、消息的发送和接收结果

    2、Routing 《---》direct

    2.1、bindingkey

    此处的绑定关键字(BindingKey)就是路由关键字(RoutingKey),但因发送消息的basicPublish命令中也含有路由关键字参数,所以为了避免混淆,将路由关键字又命名为绑定关键字。

    在发布和订阅模式中,尽管fanout类型的交换机接收到的消息,总能被路由到所有与之绑定的队列中,但我们有时并不会对生产者发送的所有消息感兴趣,而且希望消费者只接收所有消息中的某个子集。比如生产者发送的任务中包含error、warning和info类型的消息,某个消费者可以只输出并打印error类型的消息,当然其它消费者也可以只输出并打印warning类型的消息或info类型的消息,甚至所有类型的消息。

    若果满足上述需求,首先需要将交换机和队列之间通过关键字进行绑定,同时利用该绑定关键字决定输出并打印哪种类型的消息。图5-16中交换机为direct型,交换机和队列之间的绑定关键字分别为error和warning。

    2.2、direct类型的交换机

     因为生产者发送的消息只路由到路由关键字和绑定关键字完全匹配的队列中,所以相对fanout类型的交换机,direct类型的交换机功能更灵活。

    direct类型交换机的声明和与该交换机有关的关键字绑定方式如图:

    2.3、消费者获取绑定关键字

     消费者在接收消息的过程中,首先需要获得绑定关键字;

    如果绑定关键字和生产者发送的路由关键字完全匹配,则输出并打印消息。消费者获取关键字的方法如图:

    在envelope中含有利用AMQP基本方法封装的一些参数,其中一项即为绑定关键字。如果消费者希望在控制台输出并打印该关键字,只需要利用getRoutingKey()命令。

    2.4、消息的发送和接收结果

    3、topics 《---》正则匹配路由

    3.1、概念

    尽管direct交换机相对fanout交换机来说已经足够灵活,但它依然无法依据多重准则将消息路由到队列中,而topic类型的交换机却实现了这一功能。

    Topic类型的交换机可以将接收到的消息路由到绑定关键字与路由关键字完全匹配的队列中,但需要满足以下条件:

    (1)路由关键字必须由一组通过点“.”分隔的单词组成 ;

    (2)绑定关键字也必须由一组通过点“.”分隔的单词组成;

    (3)绑定关键字可以利用“*”和“#”两种比较特殊的字符来进行模糊匹配,其中“*”可以匹配一个单词,“#”可以匹配零个或多个单词。

    对比fanout、direct和topic类型的交换机发现:

    如果绑定关键字为“#”,那么topic类型的交换机实质是fanout类型;

    如果绑定关键字为“*”,那么topic类型的交换机实质则是direct类型。接下来利用图5-20的具体实例流程对topic类型进行说明。

    图中:生产者P发送用来描述动物的信息;路由关键字由依次用来形容速度、颜色和物种的三个单词和两个点组成,比如路由关键字为"quick.orange.rabbit"和"lazy.brown.fox"等。

    交换机和队列1的绑定关键字为“*.orange.*”,和队列2的绑定关键字分别为“lazy.#”、“*.*.rabbit”。我们可以形象的将此理解为:队列1对橘色的动物更感兴趣;队列2更喜欢比较懒惰的动物,同时会监听关于兔子的一切特征。依据topic交换机的条件进行分析,如果路由关键字为"quick.orange.rabbit",消息会被路由到两个队列中;如果路由关键字为"quick.orange.rabbit",消息只能被路由到队列2中;如果生产者发送了“quick.orange.male.rabbit”的路由关键字,消息因无法匹配而被抛弃。

    3.2、消息的发送和接收结果举例

    三、rabbitmq 远程过程调用

    1、RPC的基础知识

    远程过程调用(Remote Procedure Call,RPC)是一种进程间的通信方式。

    rabbitmq 的RPC使用client/server模型:

    • 首先客户端发送请求消息到RabbitMQ,
    • 然后服务器从RabbitMQ中获取消息并进行处理,
    • 最后处理的消息重新通过RabbitMQ传递给客户端。

    消息传递的流程图:

    为了更形象地理解RPC,这里进一步说明:

    (1)客户端将请求消息发送到RabbitMQ的队列中进行存储,此时客户端和生产者类似;

    (2)服务器从RabbitMQ中取出消息并处理,此时服务器和消费者类似;

    (3)服务器将消息的处理结果发送到RabbitMQ的队列中进行存储,此时服务器类似于生产者;

    (4)客户端从RabbitMQ的队列中获取消息结果后,判断该消息的响应和请求是否匹配,此时客户端类似消费者。

    2、reply-to和correlation-id

    客户端在发送消息给RabbitMQ时,会同时发送消息的reply-to和correlation-id属性;获取消息的处理结果时,客户端也会接收到correlation-id属性。下面对两种属性的功能进行说明。

    reply-to即对消息的回调(callback)队列进行命名。服务器对客户端发送的请求消息接收并处理后,将结果反馈给客户端,而客户端为了接收该结果,需要在发送请求消息的同时,为服务器提供一个回调的队列地址。

    为每个RPC请求创建单独的回调队列,其效率非常低。比如客户端发送大量的请求消息给服务器,服务器为了获取消息的处理结果,必须为每条请求消息创建回调队列,这样不仅花费时间,而且占用了RabbitMQ的资源。因此为避免类似状况,选择只创建一个回调队列,但这又造成客户端接收到的结果无法确定是属于哪条请求消息的问题。比如客户端发送了请求消息A和请求消息B,就会分别接收到处理后的消息结果A和消息结果B,但是之前这些结果存储在唯一的回调队列中,我们当然不希望请求消息A和消息结果B匹配,同样不希望请求消息B和消息结果A匹配。

    为解决上述问题,在消息中加入correlation-id,用它与相关联的请求RPC响应。客户端发送请求消息时,同时发送唯一的correlation-id;如果接收消息的处理结果时,也接收到相同的correlation-id,则认为请求和响应是匹配的,否则抛弃该结果。图5-23为生成并发送reply-to和correlation-id的过程。

    3、RPC系统工作流程及结果

    结合之前的分析,RPC的工作流程可以总结为:

    (1)客户端启动时,会创建一个匿名的回调队列;
    (2)客户端在发送的请求消息中设置两个属性:reply-to和correlation-id;

    (3)服务器等候队列中的请求,当请求出现,服务器处理请求并将请求结果发送到客户端,使用的队列名即消息属性reply-to;

    (4)客户端等待回调队列中的结果,当消息结果出现,便检查correlation-id属性,若correlation-id和请求中的correlation-id一致,则接收该结果。

     客户端发送斐波纳契数列fib(30)的请求消息,服务器处理后,客户端接收到了正确的处理结果。

  • 相关阅读:
    dotnet 控制台读写 Sqlite 提示 no such table 找不到文件
    dotnet 控制台读写 Sqlite 提示 no such table 找不到文件
    dotnet 控制台 Hangfire 后台定时任务
    dotnet 控制台 Hangfire 后台定时任务
    dotnet 获取指定进程的输入命令行
    dotnet 获取指定进程的输入命令行
    PHP sqrt() 函数
    PHP sinh() 函数
    PHP sin() 函数
    PHP round() 函数
  • 原文地址:https://www.cnblogs.com/lexiaofei/p/8359680.html
Copyright © 2011-2022 走看看