zoukankan      html  css  js  c++  java
  • 架构设计:系统间通信(20)——MQ:消息协议(下)

    (接上文《架构设计:系统间通信(19)——MQ:消息协议(上)》)

    上篇文章中我们重点讨论了“协议”的重要性。并为各位读者介绍了Stomp协议和XMPP协议。

    这两种协议是消息队列中两种不同使用场景下的典型代表。

    本文主要接续上文的篇幅,继续讨论消息队列中还有一种典型协议:AMQP协议。

    3-3、AMQP协议

    AMQP协议的全称是:Advanced Message Queuing Protocol(高级消息队列协议)。眼下AMQP协议的版本号为 Version 1.0。这个协议标准在2014年通过了国际标准组织 (ISO) 和国际电工委员会 (IEC) 的投票,成为了新的 ISO 和 IEC 国际化标准。眼下支持AMQP的软件厂商包含:

    这里写图片描写叙述

    3-3-1、协议概览

    在网络上解说AQMP协议的文章已经有非常多了。您能够在百度、Google、必应上搜索关键字‘AMQP’,就会出现非常多相关文章。尽管文章数量比較多,可是却鲜有质量过硬的文章(当然除了AMQP官网 http://www.amqp.org/ 的协议说明文档)。本小节的内容我试图在能力所及的范围内。为各位读者将AMQP协议的核心要点讲清楚。

    为了达到这个目的。首先将AMQP协议的原理用下图进行一个全面呈现。然后在具体解说图中的每个要点:

    这里写图片描写叙述

    从上图我们能够看到AMQP协议的各个组成部分:

    • AMQP协议中的元素包含:Message(消息体)、Producer(消息生产者)、Consumer(消息消费者)、Virtual Host(虚拟节点)、Exchange(交换机)、Queue(队列)等

    • 由Producer(消息生产者)和Consumer(消息消费者)构成了AMQP的client。他们是发送消息和接收消息的主体。AMQP服务端称为Broker,一个Broker中一定包含完整的Virtual Host(虚拟主机)、 Exchange(交换机)、Queue(队列)定义。

    • 一个Broker能够创建多个Virtual Host(虚拟主机),我们将讨论的Exchange和Queue都是虚拟机中的工作元素(还有User元素)。

      注意,假设AMQP是由多个Broker构成的集群提供服务。那么一个Virtual Host也能够由多个Broker共同构成。

    • Connection是由Producer(消息生产者)和Consumer(消息消费者)创建的连接,连接到Broker物理节点上。可是有了Connection后client还不能和server通信,在Connection之上client会创建Channel。连接到Virtual Host或者Queue上。这样client才干向Exchange发送消息或者从Queue接受消息。一个Connection上同意存在多个Channel,仅仅有Channel中能够发送/接受消息

    • Exchange元素是AMQP协议中的交换机,Exchange能够绑定多个Queue也能够同一时候绑定其它Exchange。消息通过Exchange时,会依照Exchange中设置的Routing(路由)规则,将消息发送到符合的Queue或者Exchange中

    那么AMQP消息在这个结构中是怎样通过Producer发出,又经过Broker最后到达Consumer的呢?请看下图:

    这里写图片描写叙述

    1. 在Producer(消息生产者)client建立了Channel后,就建立了到Broker上Virtual Host的连接。

      接下来Producer就能够向这个Virtual Host中的Exchange发送消息了。

    2. Exchange(交换机)能够处理消息的前提是:它至少已经和某个Queue或者另外的Exchange形成了绑定关系。并设置好了到这些Queue和Excahnge的Routing(路由规则)。Excahnge中的Routing有三种模式。我们随后会讲到。在Exchange收到消息后。会依据设置的Routing(路由规则),将消息发送到符合要求的Queue或者Exchange中(路由规则还会和Message中的Routing Key属性配合使用)。

    3. Queue收到消息后,可能会进行例如以下的处理:假设当前没有Consumer的Channel连接到这个Queue,那么Queue将会把这条消息进行存储直到有Channel被创建(AMQP协议的不同实现产品中,存储方式又不尽同样);假设已经有Channel连接到这个Queue,那么消息将会按顺序被发送给这个Channel。

    4. Consumer收到消息后,就能够进行消息的处理了。

      可是整个消息传递的过程还没有完毕:视设置情况。Consumer在完毕某一条消息的处理后,将须要手动的发送一条ACK消息给相应的Queue(当然您能够设置为自己主动发送,或者无需发送)。Queue在收到这条ACK信息后,才会觉得这条消息处理成功,并将这条消息从Queue中移除。假设在相应的Channel断开后,Queue都没有这条消息的ACK信息,这条消息将会又一次被发送给另外的Channel。当然,您还能够发送NACK信息,这样这条消息将会马上归队。并发送给另外的Channel

    3-3-2、Message(消息体)

    通过上一小节的描写叙述,我们能够看到AMQP协议中消息的处理规则和Stomp协议中消息的处理规则有相似之处。比方对ACK、NACK的使用。

    但明显不同的地方还是非常多。比如AMQP中Exchange元素提供的Routing路由规则,这显然比Stomp协议中直接发送给Queue要灵活得多。

    为了支持AMQP协议中的这些规则,AMQP协议中的消息也必须有特定的格式,实际上AMQP协议要比Stomp协议复杂得多。以下我们就依据ISO/IEC公布的AMQP Version 1.0标准文档。来讨论一下AMQP协议中的消息格式。

    首先要说明的是眼下国内多个技术网站。具体介绍AMQP消息格式的文章本来就不多(不包含那些聊聊几笔的转发)。并且基本上都没有具体解说格式本身,仅仅是粗略说明了AMQP消息採用二进制格式(不论什么应用层协议在网络上进行传输,都是使用二进制流进行的,所以这个说法当然没错)。

    有的文章还向读者传递了错误的信息。

    比如说AMQP消息格式包含两部分:消息头和消息正文。 这是全然错误的,尽管AMQP消息格式确实包含Header和Body部分。可是绝对不止这两个部分。(假设真是这样,ISO/IEC组织就不须要使用125页的文档篇幅来进行说明了)

    首先我们须要说明的是,作为一种网络通讯协议,AMQP工作在七层/五层网络模型的应用层。是一个典型的应用层协议;另外。因为AMQP协议存在多种元素定义,且这些元素定义工作在不同的领域。比如Channel的定义是为了基于网络连接记录会话状态;Queue等元素帮助AMQP完毕路由规则,这些元素在Message消息记录中都须要有所体现。

    所以AMQP协议首先要记录网络状态和会话状态,格式例如以下(AMQP帧的定义在《OASIS Advanced Message Queueing Protocol
    (AMQP) Version 1.0》文档的第38页):

    这里写图片描写叙述

    当中非PAYLOAD部分。在网络协议的应用层说明Channel的工作状态(当然还有说明整个AMQP消息的长度区域:SIZE),我们真正须要的内容存在PAYLOAD区域。PAYLOAD区域(译文称为‘交付区’)的格式例如以下(能够在《OASIS Advanced Message Queueing Protocol
    (AMQP) Version 1.0》文档的第3部分:messaging第82页找到具体说明):

    这里写图片描写叙述

    在PAYLAOD区域一共包含7个数据区域:header、delivery-annotations、message-annotations、properties、application-properties、application-data、footer。这些元素的作用例如以下:

    • header:header部分记录了AMQP消息的在‘支持AMQP的中间件’中的交互状态。

      比如该条消息在节点间被交互的总次数、优先级、TTL(Time To Live)值等信息。

    • delivery-annotations:在header部分仅仅能传递规范的、标准的、经过ISO/IEC组织定义的属性

      那么假设须要在header部分传递一些非标准信息怎么办呢?这就是delivery-annotations数据区域存在的意义:用来记录那些’非标’的header信息。

    • message-annotations:这个数据区域,用于存储一些自己定义的辅助属性。和delivery-annotations区域的非标准信息不同。这里的自己定义属性主要用于消息的转换。

      比如AMQP-JMS信息转换过程中将依据这个数据区域的“x-opt-jms-type”、“x-opt-to-type”、“x-opt-reply-type”和“name”属性进行JMS规范中相应的“JMSType”、“Type of the JMSDestination”、“Type of the JMSReplyTo”和“JMS_AMQP_MA_name”属相的转换。

    • properties:从整个AMQP消息的properties属性開始,到AMQP消息的application-data部分结束,才是AMQP消息的正文内容(译文称为‘裸消息’)。Properties属性记录了AMQP消息正文的那些‘不可变’属性。在properties部分仅仅能传递规范的、标准的、经过ISO/IEC组织定义的属性

      比如:消息id、分组id、发送者id、内容编码等。以下是AMQP协议文档中对Properties部分属性的描写叙述(仅仅能包含这些信息):

    <type name="properties" class="composite" source="list" provides="section">
        <descriptor name="amqp:properties:list" code="0x00000000:0x00000073"/>
        <field name="message-id" type="*" requires="message-id"/>
        <field name="user-id" type="binary"/>
        <field name="to" type="*" requires="address"/>
        <field name="subject" type="string"/>
        <field name="reply-to" type="*" requires="address"/>
        <field name="correlation-id" type="*" requires="message-id"/>
        <field name="content-type" type="symbol"/>
        <field name="content-encoding" type="symbol"/>
        <field name="absolute-expiry-time" type="timestamp"/>
        <field name="creation-time" type="timestamp"/>
        <field name="group-id" type="string"/>
        <field name="group-sequence" type="sequence-no"/>
        <field name="reply-to-group-id" type="string"/>
    </type>
    • application-properties:‘应用数据’属性。在这部分数据中主要记录和应用有关的数据,AMQP的实现产品(比如RabbitMQ)须要用这部分数据决定其处理逻辑。比如:送入哪一个Exchange、消息的Routing值是什么、是否进行持久化等。

    • application-data:使用二进制格式描写叙述的AMQP消息的用户部分内容。既是我们发送出去的真实内容

    • footer:一般在这个数据区域存储辅助内容。比如消息的哈希值。HMAC。签名或者加密细节。

    以上才是一个AMQP消息的完整结构。当然因为篇幅限制。在某一个数据区域的‘标准’属性就没有再细讲了,比如Properties数据区域定义的creation-time属性、Header数据区域定义的durable属性。有兴趣的朋友能够參考《OASIS Advanced Message Queueing Protocol
    (AMQP) Version 1.0》,这个文档我已经上传到我的下载列表中。供大家免费下载^_^(http://download.csdn.net/detail/yinwenjie/9460653)。

    3-3-3、Exchange(交换机)路由规则

    Exchange交换机在AMQP协议中主要负责依照一定的规则,将收到的消息转发到已经和它事先绑定好的Queue或者另外的Exchange中。

    Excahnge交换机的这个处理过程称之为Routing(路由)。眼下流行的AMQP路由实现一共同拥有三种:各自是Direct Exchange、Fanout Exchange和Topic Exchange。

    须要特别注意的是:Exhange须要具备怎样的‘路由’规则,并没有在AMQP标准协议进行强行规定,眼下流行的AMQP转发规则都是AMQP实现产品自行开发的(这也是为什么AMQP消息中和路由、过滤规则相关的属性是存放在application-properties区域的原因)。

    A、Direct路由

    direct模式从字面上的理解应该是‘引导’、‘直接’的含义。direct模式下Exchange将使用AMQP消息中所携带的Routing-Key和Queue中的Routing Key进行比較。

    假设两者全然匹配。就会将这条消息发送到这个Queue中。

    例如以下图所看到的:

    这里写图片描写叙述

    B、Fanout路由

    Fanout路由模式不须要Routing Key。当设置为Fanout模式的Exchange收到AMQP消息后。将会将这个AMQP消息复制多份。分别发送到和自己绑定的各个Queue中。

    例如以下图所看到的:

    这里写图片描写叙述

    C、Topic路由

    Topic模式是Routing Key的匹配模式。Exchange将支持使用‘#’和‘ * ’通配符进行Routing Key的匹配查找(‘#’表示0个或若干个关键词,‘ * ’表示一个关键词,注意是关键词不是字母)。例如以下图所看到的:

    这里写图片描写叙述

    为了方便各位读者的理解,这里我们再举几个通配符匹配的演示样例:

    • “param.#”。能够匹配“param”、“param.test”、“param.value”、“param.test.child”等等AMQP消息的Routing Key;可是不能匹配诸如“param1.test”、“param2.test”、“param3.test”。

      以为param这个关键词和param1这个关键词不同样。

    • “param.*.* ”,能够匹配“param.test.test”、“param.test.value”、“param.test.child”等等AMQP消息的Routing Key;可是不能匹配诸如“param”、“param.test”、“parm.child”等等Routing Key。

    • “param.*.value”。能够匹配“param.value.value”、“param.test.value”等Routing Key;可是不能匹配诸如“param.value”、“param.value.child”等Routing Key。

    注意,以上介绍的Direct 路由模式和Topic 路由模式中,假设Exchange交换机没有找到不论什么匹配Routing Key的Queue,那么这条AMQP消息会被丢弃。

    (仅仅有Queue有保存消息的功能,可是Exchange并不负责保存消息)

    4、不得不提的JMS规范

    JMS不是消息队列,更不是某种消息队列协议。**JMS是Java消息服务接口,是一套规范的JAVA API 接口。这套规范接口由SUN提出,并在2002年公布JMS规范的Version 1.1版本号。**JMS和消息中间件厂商无关。既然是一套接口规范,就代表这它须要各个厂商进行实现。好消息是,大部分消息中间件产品都支持JMS 接口规范。也就是说。您能够使用JMS API来连接Stomp协议的产品(比如ActiveMQ)。就像您能够使用JDBC API来连接ORACLE或者MYSQL一样。

    部分网络上的资料都介绍JMS是一个消息队列,这个说法是错误的,会误导读者。难道你能说JDBC是数据库?

    当然。这些具体实现JMS规范的JAVA API都是由具体的中间件厂商提供的。

    以下一段代码演示了怎样使用JMS建立与ActiveMQ的连接:

    package com.yinwenjie.test.testActivemq.jms;
    
    import javax.jms.Connection;
    import javax.jms.Destination;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
    
    /**
     * 測试使用JMS API连接ActiveMQ
     * @author yinwenjie
     */
    public class JMSProducer {
        /**
         * 因为是測试代码,这里忽略了异常处理。

    * 正是代码可不能这样做 * @param args * @throws RuntimeException */ public static void main (String[] args) throws Exception { // 定义JMS-ActiveMQ连接信息 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616", "username", "password"); Session session = null; Destination sendQueue; Connection connection = null; //进行连接 connection = connectionFactory.createConnection(); connection.start(); //建立会话 session = connection.createSession(true, Session.SESSION_TRANSACTED); //建立queue(当然假设有了就不会反复建立) sendQueue = session.createQueue(""); //建立消息发送者对象 MessageProducer sender = session.createProducer(sendQueue); TextMessage outMessage = session.createTextMessage(); outMessage.setText("这是发送的消息内容"); //发送(JMS是支持事务的) sender.send(outMessage); session.commit(); //关闭 sender.close(); connection.close(); connectionFactory.close(); } }

    这里,再给各位读者一个官方文档。

    这个官方文档用于描写叙述ActiveMQ消息中间件中实现的AMQP协议信息转换为JMS服务接口能够识别的数据信息(请细致理解这句话黑体字部分的描写叙述)。http://activemq.apache.org/amqp.html

    5、后文介绍

    通过两篇文章的篇幅。我们介绍了典型的消息队列协议。当然还有非常多具体的消息队列协议没有讲到,可是通过介绍XMPP、AMQP、Stomp协议能够起到一个管中窥豹可见一斑的效果。另外我们还说明了JMS规范的具体含义。以便帮助读者纠正一些不对的观点。

    下一篇文章開始,我们将解说两个典型的消息队列中间件:ActiveMQ和RabbitMQ。最后我们还会列举一个实际场景。然后通过消息队列技术一起搭建一个高性能的业务处理方案。

  • 相关阅读:
    vim复制
    嵌入式Linux学习(二)
    (Java实现) 洛谷 P1042 乒乓球
    (Java实现) 洛谷 P1042 乒乓球
    (Java实现) 洛谷 P1071 潜伏者
    (Java实现) 洛谷 P1071 潜伏者
    (Java实现) 洛谷 P1025 数的划分
    (Java实现)洛谷 P1093 奖学金
    (Java实现)洛谷 P1093 奖学金
    Java实现 洛谷 P1064 金明的预算方案
  • 原文地址:https://www.cnblogs.com/cxchanpin/p/7301584.html
Copyright © 2011-2022 走看看