zoukankan      html  css  js  c++  java
  • RabbitMQ实战:理解消息通信

    RabbitMQ是一个开源的消息代理和队列服务器,可以通过基本协议在完全不同的应用之间共享数据,可以将作业排队以便让分布式服务进行处理。

    本篇介绍下消息通信,首先介绍基础概念,将这些概念映射到AMQP协议,然后介绍消息持久化、发送方确认模式等消息可靠性保证。

    通过本篇介绍,你会了解到:

    • 消息通信概念:消费者、生产者和代理
    • AMQP元素:队列、交换器、绑定
    • 虚拟主机
    • 消息持久化
    • 发送方确认模式

    消息通信概念

    此部分的介绍,会牵涉到AMQP的元素,如果之前没接触过的,可以结合下面的「AMQP元素」进行理解。

    消息

    消息是传输的主体,消息包括两部分:有效载荷(payload)和标签(label);有效载荷是要传输的数据,可以是任何内容,比如JSON串、二进制、自定义的数据协议等;标签描述了有效载荷,并且Rabbit用它来决定谁将获得消息的投递。

    可以与HTTP协议类比,HTTP消息头部描述了消息体的类型、大小等,HTTP消息体是要传输的数据,HTTP服务端通过消息头部决定如何处理请求和数据。

    生产者和消费者

    生产者创建消息,然后发送到代理服务器(RabbitMQ Server),AMQP只会用标签表述这条消息(一个交换器名称和可选的主题标记),Rabbit服务器会根据标签把消息发送给订阅的消费者。

    消费者消费消息,它会订阅到队列(queue)上,每当有消息到达RabbitMQ服务器时,会发送给消费者,消费者收到消息时,会进行处理。

    注意:消费者收到的消息只包括有效载荷,所有不会知道是从哪里发来的。

    连接和信道

    要想发布或消费消息,必须先与RabbitMQ Server建立一条TCP连接,建立TCP连接之后,要创建一条信道,信道是建立在真实TCP连接的虚拟连接。

    AMQP命令都是通过信道发送出去的,每条信道会被指派一个唯一的ID,为什么不直接通过TCP连接发送AMQP命令呢? 因为操作系统建立和销毁TCP会话是很昂贵的,而且创建的连接数也有限。 通过引入通道,可以在连接上建立通道,而且通道是私密的,相互不受影响。

    通道的概念还是有点抽象,后面专门写一篇文章进行分析介绍,这里简单理解下吧。

    AMQP元素

    AMQP消息路由有三部分组成:队列、交换器和绑定,队列是存放消息的地方,交换器是决定不同的分发策略,绑定是队列和交换器的桥梁,定义匹配规则。

    生产者发送消息到交换器,交换器根据自身类型和绑定规则,将消息存放在对应队列中,然后将消息发送到监听队列的消费者。

    AMQP基本模型

    如上图:P为生产者,X为交换器,交换器类型为direct,根据不同的绑定规则(orange、black、green),分发给不同的队列,C为消费者,从不同的队列介绍消息。

    队列

    消费者通过两种方式从特定的队列接收消息:

    • basic.consume,这样会将信道置为接收模式,直到取消对队列的订阅;
    • basic.get,主动让消费者接收队列中的下一条消息;

    basic.get会影响性能,推荐使用basic.consume来实现高吞吐量,因为其处理过程是先订阅消息,获取单条消息,再取取消订阅。

    如果队列拥有多个消费者时,队列收到的消息将以循环的方式发给消费者,即多个消费者平均消费这些消息。

    另外,消费者接收到的每一条消息都要进行确认,必须通过basic.ack命令向rabbitmq服务端发送一个确认。 也可以设置auto_ack为true,只要消费者接收到消息,就自动视为确认,不过不建议这样,因为接收到不代表业务逻辑处理成功。 服务端接收到确认后,会从队列中删除对应消息。

    还有一种场景,在接收到消息后,如果不想处理,可以通过下面方式处理:

    • 把消费者从RabbitMQ服务器断开连接,,这样RabbitMQ会自动将消息入队并发送给另外一个消费者;
    • 如果不想发送给其他消费者处理,就是想忽略这个消息,可以发送basic.reject命令;

    最后来介绍下如何创建队列,首先明确下是生成者还是消费者创建,关键点是:生产者能否承担起丢失消息,因为发出去的消息如果路由到了不存在的队列,Rabbit会忽略它们。所以,建议生成者和消费者都尝试去创建队列,可以通过设置queue.declare的passive选项设置为ture来判断队列是否存在,如果不存在会返回一个错误。

    通过queue.declare命令来创建队列,有一些选项说明下:

    • exclusive:如果设置true的化,队列将变成私有的,只有创建队列的应用程序才能够消费队列消息;
    • auto-delete:当最后一个消费者取消订阅的时候,队列会自动移除;
    • durable:是否要持久化;
    queueDeclare(String queue, 
                boolean durable, 
                boolean exclusive, 
                Map<String, Object> arguments);
    

      

    交换器和绑定

    交换器有四种类型:direct、fanout、topic、headers,其中headers匹配消息的header而非路由键,不太实用,就不详细介绍了。

    第一种:direct交换器

    direct交换器比较简单,如果和路由键 完全匹配 的话,就会投递到对应的队列:

    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
    

      

    服务器默认包含一个空白字符串名称的默认路由器,当声明一个队列时,会自定绑定到默认交换器,并以队列名称作为路由键。

    第二种:fanout交换器

    fanout交换器,不处理路由键,只需要简单的将队列绑定到交换机上,为会每个消费者自动生成一个随机队列,所有的消费者都会收到所有消息。

    fanout交换器

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    

      

    第三种:topic交换器

    topic交换器,将路由键和某模式进行匹配,此时队列需要绑定要一个模式上。

    tipic交换器

    channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
    

      

    关于模式,符号#匹配一个或多个词,符号匹配一个词,因此kfs.#能够匹配到kfs.session.message,但是audit.只会匹配到audit.session。

    虚拟主机

    每个RabbitMQ服务器都能创建虚拟消息服务器,称为虚拟主机(vhost),每个RabbitMQ本质上是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定,还有自己的权限机制。

    连接时,必须制定vhost,rabbitmq包含了默认的vhost:”/”。当创建一个用户时,会被指派给至少一个vhsot,并且相互隔离。

    vhost不能通过AMQP协议创建,需要使用rabbitmqctl工具创建。

    消息持久化和发送方确认模式

    如果没有持久化,重启rabbitmq后,队列、交换器都会消失,RabbitMQ提供了持久化的功能,需要满足以下三个条件:

    • 交换器设置为持久化,通过durable属性;
    • 队列设置为持久化,通过durable属性;
    • 消息投递模式delivery设置为2;

    当发布一条持久化消息到持久化交换器上时,rabbit会在消息提交到日志文件后才会发送响应,所有会损失性能,所以,只对重要数据持久化即可。

    考虑这种情况:由于发布消息后,不返回任何信息给生产者,如何只对服务器已经持久化到硬盘了呢,可能在传输过程中丢失,或者持久化前服务器宕机,导致消息丢失。

    RabbitMQ通过「发送方确认模式」来解决上面的问题。首先,需要将信道设置成confirm模式,这样所有在信道上发布的消息都会被指派一个唯一的ID号,一旦消息被投递到所有匹配的队列或持久化到磁盘,会发送一个确认消息给生产者。

    通过本篇的介绍,对Rabbit的消息模型有了整体了解

  • 相关阅读:
    idea 连接 hive
    css img自适应
    测试视频文件
    ubuntu不显示ipv4地址的解决办法
    nginx path捕获
    union all两个结果集报ORA-12704: character set mismatch错误
    润乾报表试用指南
    报表工具对比之润乾报表与锐浪报表对比
    项目微管理36
    docker远程调用
  • 原文地址:https://www.cnblogs.com/saryli/p/9732379.html
Copyright © 2011-2022 走看看