zoukankan      html  css  js  c++  java
  • 03.JMS深入


    1.JMS API

        JMS API可以分为3个主要部分:公共API、点对点API和发布/订阅API。在JMS1.1中,公共API可被用于向一个队列或主题发送消息,或从其中接收消息。点对点API专门用于使用队列的消息传送,而发布/订阅API则专门用于使用主题的消息传送。
    1.JMS公共API
    在JMS公共API内部,和发送与接收JMS消息有关的JMS API接口主要有7个:
    1. ConnectionFactory - 一个创建连接的连接工厂
    2. Destination - 消息目的地,可以是queue(队列)或者是topic(主题)
    3. Connection - 一个连接,用来创建Session
    4. Session - 客户端Session,可以创建Message、MessageProducer和MessageConsumer
    5. Message - 消息接口
    6. MessageProducer - 消息生产者,用来创建消息
    7. MessageConsumer - 消息消费者,用来读取消息
        注意:在这些公共接口中,ConnectionFactory和Destination必须使用JNDI(遵照JMS规范要求)从提供者处获得。其它接口则可以通过工厂方法在不同的API接口中创建。举例来说,一旦有了一个ConnectionFactory,就可以创建一个Connection。一旦有了一个Connection,就可以创建一个Session。而一旦有了一个Session,就可以创建一个Message、MessageProducer和MessageConsumer。
    2.点对点模型API
    点对点消息传送模型API特指JMS API内基于队列的接口。下面是用于向一个队列发送和从一个队列接收消息的接口:
    1. QueueConnectionFactory - 继承ConnectionFactory
    2. Queue - 继承BaseDestination
    3. QueueConnection - 继承Connection
    4. QueueSession - 继承Session
    5. QueueSender - 继承MessageProducer
    6. QueueReceiver - 继承MessageConsumer
        注意:大多数接口名称仅仅是在公共API接口名称之前添加Queue一词而已。不同之处在于,这里称为Queue的Destination接口,而MessageProducer和MessageConsumer接口则分别称为QueueSender和QueueReceiver。一般来说,使用点对点消息传送模型的应用程序将使用基于队列的API,而不是使用公共API
    3.发布/订阅模型API
    由于基于主题的JMS API类似于基于队列的API,因此在大多数情况下,Queue这个词会由Topic取代。发布/订阅消息传送模型内部使用的接口如下:
    1. TopicConnectionFactory - 继承ConnectionFactory
    2. Topic - 继承BaseDestination
    3. TopicConnection - 继承Connection
    4. TopicSession - 继承Session
    5. TopicPublisher - 继承MessageProducer
    6. TopicSubscriber - 继承MessageConsumer
        注意:除了TopicPublisher和TopicSubscriber不同以外,发布/订阅模型中的接口和p2p模型中的那些接口名称基本类似。
    4.JMS API实例的创建
    (1)创建连接工厂和连接
    1. QueueConnectionFactory queueConnectionFactory = new ActiveMQConnectionFactory(user, password, url);
    2. QueueConnection queueConnection = queueConnectionFactory.createQueueConnection("admin", "admin");
        可以为客户端提供了一个用户名/密码认证凭证,用于连接认证。我们在代码中也可以使用无参数方法,在创建连接时,该方法将使用默认用户身份。
        Connection有start()、stop()和close()方法用来管理连接。start()方法将到达的消息流变为“on”状态,允许这些消息被该客户端接收。stop()方法阻塞了到达的消息流,直到再次调用start()方法时为止。close()方法用于关闭连接消息服务器的Connection。关闭Connection将关闭和该连接有关的所有对象,包括:Session, MessageProducerMessageConsumer等。
    (2)创建连接的Session
    1. QueueSession queueSession = queueConnection.createQueueSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
    一个客户端可以创建多个Session对象,创建Session有两个参数:(boolean transacted, int acknowledgeMode)
    • boolean transacted - 用来表明Session对象是不是事务性的
    • int acknowledgeMode - 设置Session收到消息后的消息签收(确认)模式
    JMS定义了四种消息签收模式:
    1. Session.AUTO_ACKNOWLEDGE - 消息自动签收
    2. Session.CLIENT_ACKNOWLEDGE - 客户端调用Message.acknowledge()方法手动签收
    3. Session.DUPS_OK_ACKNOWLEDGE - 自动批量确认。不必必须签收,消息可能会重复发送。在第二次重新传递消息的时候,消息头的JmsDelivered会被置为true标示当前消息已经传送过一次,客户端需要进行消息的重复处理控制
    4. Session.SESSION_TRANSACTED  - 事务提交并确认
    ActiveMQ还定义了以下消息签收模式(MessageAck类):
    1. MessageAck.DELIVERED_ACK_TYPE - 消息"已接收",但尚未处理结束
    2. MessageAck.STANDARD_ACK_TYPE - "标准"类型,通常表示为消息"处理成功",broker端可以删除消息了
    3. MessageAck.POSION_ACK_TYPE - 消息"错误",通常表示"抛弃"此消息,比如消息重发多次后,都无法正确处理时,消息将会被删除或者DLQ(死信队列)
    4. MessageAck.REDELIVERED_ACK_TYPE - 消息需"重发",比如consumer处理消息时抛出了异常,broker稍后会重新发送此消息
    5. MessageAck.INDIVIDUAL_ACK_TYPE - 表示只确认"单条消息",无论在任何ACK_MODE下
    6. MessageAck.UNMATCHED_ACK_TYPE - BROKER间转发消息时,接收端"拒绝"消息
    (3)创建Destination、Message、MessageProducer、MessageConsumer
    1. Queue queue = queueSession.createQueue("TOOL.DEFAULT");//创建 Destination
    2. QueueSender queueSender = queueSession.createSender(queue);//创建 MessageProducer
    3. TextMessage textMessage = queueSession.createTextMessage("测试消息");//创建 Message
    4. QueueReceiver queueReceiver = queueSession.createReceiver(queue);//创建 MessageConsumer
    (4)创建Browser浏览消息
    使用QueueBrowser可以遍历一个队列中的所有消息,与QueueReceiver接受消息不同的是QueueBrowser不会自动签收消息,所以使用QueueBrowser遍历的消息并没有被消费。
    1. QueueBrowser queueBrowser = queueSession.createBrowser(queue);
    2. Enumeration<TextMessage> enumeration = queueBrowser.getEnumeration();
    3. while (enumeration.hasMoreElements())
    4. {
    5. TextMessage message = enumeration.nextElement();
    6. System.out.println("QueueBrowser - " + message.getText());
    7. }
    8. System.out.println("QueueBrowser 完成!");

    2.JMS消息基本结构

        一条JMS消息对象分为三部分:消息头(Headers),属性(Properties)和消息体(Payload)。对于StreamMessage和MapMessage,消息本身就有特定的结构,而对于TextMessage,ObjectMessage和BytesMessage是无结构的。一个消息可以包含一些重要的数据或者仅仅是一个事件的通知
    1.JMS消息头
        消息的Headers部分通常包含一些消息的描述信息,它们都是标准的描述信息。包含下面一些值:
    1. JMSDestination - 消息的目的地,Topic或者是Queue
    2. JMSDeliveryMode - 消息的发送模式:persistent或nonpersistent,设置是否存储消息到硬盘
    3. JMSTimestamp - 当调用send()方法的时候,JMSTimestamp会被自动设置为当前事件。
    4. JMSExpiration - 表示一个消息的有效期。只有在这个有效期内,消息消费者才可以消费这个消息。默认值为0,表示消息永不过期。
    5. JMSPriority - 消息的优先级。0-4为正常的优先级,5-9为高优先级。
    6. JMSMessageID - 一个字符串用来唯一标示一个消息。
    7. JMSReplyTo - 有时消息生产者希望消费者回复一个消息,JMSReplyTo为一个Destination,表示需要回复的目的地。当然消费者可以不理会它。
    8. JMSCorrelationID - 通常用来关联多个Message。例如需要回复一个消息,可以把JMSCorrelationID设置为所收到的消息的JMSMessageID。
    9. JMSType - 表示消息体的结构,和JMS提供者有关。
    10. JMSRedelivered - 如果这个值为true,表示消息是被重新发送了。因为有时消费者没有确认他已经收到消息或者JMS提供者不确定消费者是否已经收到。
        JMS消息头可以分为两大类:自动分配的消息头和开发者分配的消息头
    • 自动分配的消息头:大多数JMS消息头是自动分配的,在传送消息时,消息头的值由JMS提供者来设置,因此开发者使用setJMS<HEADER>()方法分配的值就被忽略了。换句话说,对于大多数自动分配的消息头来说,使用赋值函数方法显然是徒劳的。不过,这并非意味着开发者无法控制这些消息头的值。一些自动分配的消息头可以在创建Session和MessageProducer(也就是TopicPublisher)时,由开发者通过编程方式来设置。
    • 发者分配的消息头:开发人员通过赋值方式设置的消息头信息。
    2.自动分配的消息头详解
    (1)JMSDeliveryMode
        在JMS中,传送模式有两种类型:持久性模式和非持久性模式。一条持久性消息应该被传送“一次而且仅仅一次”,这就意味着如果JMS提供者出现故障,该消息并不会丢失; 它会在服务器恢复正常之后再次传送。一条非持久性消息最多只会传送一次,这意味着如果JMS提供者出现故障,该消息可能会永久丢失。在持久性和非持久性这两种传送模式中,消息服务器都不会将一条消息向同一消息消费者发送一次以上。
        传送模式可以使用生产者(也就是 TopicPublisher 或 QueueSender)上的setJMSDeliveryMode()方法来设定。一旦为MessageProducer设置了传送模式,它就会应用到使用该生产者传送的所有消息上。默认设置为PERSISTENT(持久性),如下:
    1. //在消息生产者上设置JMS传送模式
    2. TopicPublisher topicPublisher = topicSession.createPublisher(topic);
    3. topicPubiisher.setDeliveryMode(DeliverMode.NON_PERSISTENT);
    (2)JMSMessageID
        JMSMessageID是一个String类型值,它唯一地标识了一条消息。
    (3)JMSTimestamp
        JMSTimestamp由MessageProducer在调用send()操作时自动设置。它包含的是JMS提供者接收消息的时间,而不是该消息实际传送的时间。这条消息头用于确定发送消息和它被消费者实际接收的时间间隔。时间戳是一个以毫秒来计算的long类型时间值(自1970年1月1日算起)。
    (4)JMSExpiration
        一个Message对象的有效期用来防止把过期的消息传送给消费者。这对于那些数据仅在某一个时间段内有效的消息来说,是非常有用的,消息的有效期以毫秒为单位,使用setTimeToLive()方法在生产者(也就是 TopicPublisher)上设置:
    1. TopicPublisher topicPublisher = topicSession.createPublisher(topic);
    2. //将生存时间设置为1小时(1000毫秒 *60 *60)
    3. topicPublisher.setTimeToLive(3600000);
    注意:在消息发送出去之后,任何直接通过编程方式来调用setJMSExpiration()方法都会被忽略。
    (5)JMSRedelivered
        JMSRedelivered消息头表示该消息将被重新传送给消费者。如果该消息被重新传送,JMSRedelivered消息头就为true,否则为false。如果一个消费者未能确认先前传送的消息,或者JMS提供者并不确定消费者是否已经接收到该消息时,就可以将这条消息标记为重新传送。
    (6)JMSPriority
        在传送一条消息时,消息生产者能够为该消息分配一个优先级。消息优先级共有两类:0~4级是普通优先级,而5~9级则是加急优先级。消息服务器能够利用一条消息的优先级,按优先次序将该消息传送给消息者:加急优先级的消息要比普通优先级的消息优先传送。消息的优先级可以通过JMS客户端在生产者上使用setPriority()方法进行声明:
    1. TopicPublisher topicPublisher = TopicSession.createPublisher(someTopic);
    2. topicPublisher.setPriority(9);
    注意:在消息发送出去之后,任何直接通过编程方式调用setJMSPriority()方法都将被忽略。
    3.开发者分配的消息头详解
    (1)JMSReplyTo
        有些情况下,一个JMS消息生产者可能会要求消费者对一条消息做出应答。JMSReplyTo消息头包含了一个javax.jms.Destination,标明了JMS消费者应该应答的地址。在使用请求/应答场景时,通过这条消息头属性可以进一步实现消息生产者和消息消费者之间的去耦。
    (2)JMSCorrelationID
        JMSCorrelationID提供了一个消息头,用于将当前的消息和先前的某些消息或应用程序特定的ID关联起来。在大多数情况下,JMSCorrelationID用于将一条消息标记为对JMSMessageID标识的上一条消息的应答,不过,JMSCorrelationID可以是任何值,而不仅仅是JMSMessageID。
    (3)JMSType
        JMSType是由JMS客户端设置的一个可选消息头。它的主要作用是标识消息结构和有效负载的类型。请注意,这个消息头并未指明被发送的消息类型(MapMessge,TextMessage等),而是JMS提供者使用的内部消息仓库中的一个条目。一些MOM系统(比如IBM的WebSphere MO)将消息体视为连续的字节流。这些系统通常为应用程序提供了一种消息类型,作为标记消息体的一种简单方式。有些非JMS的客户端需要使用某种类型的信息来处理有效负载。因此,在和这类非JMS客户端进行消息交换时,消息类型会非常有用。
    4.JMS消息属性
        消息的属性就像可以分配给一条消息的附加消息头一样。它允许开发者添加有关消息的不透明附加信息。它们还用于暴露消息选择器在消息过滤时使用的数据。Message接口为读取和写入属性提供了若干个取值函数和赋值函数方法。消息的属性值可以是String, boolean , byte,short, double, int ,long或float型。消息属性有3种基本类型:应用程序特定的属性,JMS定义的属性和提供者特定的属性。应用程序属性由应用程序开发者定义并应用到Message对象上;JMS扩展和提供者特定的属性大多是由JMS提供者自动添加的附加消息头。
    (1)应用程序特定的属性
        由应用程序开发者定义的所有属性都可以作为一个应用程序特定的属性。应用程序属性在消息传送之前进行设置。并不存在预先定义的应用程序属性,开发者可以自由定义能够满足它们需要的任何属性。例如,在聊天示例中,可以添加一个特定的属性,该属性标识了正在发送消息的用户:
    1. TextMessage message = pubSession.createTextMessage();
    2. message.setText(text);
    3. message.setStringProperty("username",username); //自定义属性
    4. publisher.publish(message);
        注意:作为一个应用程序的特定属性,username一旦离开Chat应用程序就变得毫无意义;它专门用于应用程序根据发布者身份对消息进行过滤。
        一旦一条消息发布或发送以后,它就变成了只读属性;消费者或生产者都无法修改它的属性。如果消费者试图设置某个属性,该方法就会抛出一个javax.jms.MessageNotWriteableException。不过,通过调用clearProperties()方法,就可以修改消息的属性,该方法将删除一条消息的所有属性,以便能够添加进新的属性。
    (2)JMS定义的属性
        JMS定义的属性具有和应用程序属性相同的特性,除了前者大多数在消息发送时由JMS提供者来设置之外。JMS定义的属性可以作为可选的JMS消息头;下面是JMS定义的9个属性清单:
    1. JMSXUserID
    2. JMSXAppID
    3. JMSXProducerTXID
    4. JMSXConsumerTXID
    5. JMSXRcvTimestamp
    6. JMSXDeliveryCount
    7. JMSXState
    8. JMSXGroupID
    9. JMSXGroupSeq
        在这份清单中,只有JMSXGroupID和JMSXGroupSeq需要所有JMS提供者的支持。这些可选属性用于聚合消息。请注意:在Message接口中,您将无法找到对应的setJMSX<PROPERTY>()和getJMSX<PROPERTY>()方法定义;在使用这些方法时,必须使用和应用程序特定属性相同的方法来设置它们:
    1. message.setStringProperty("JMSXGroupID","ERF-001");
    2. message.setIntProperty("JMSXGroupSeq",3);
    (3)提供者特定的属性
        每个JMS提供者都可以定义一组私有属性,这些属性可以由客户端或提供者自动设置。提供者特定的属性必须以前缀JMS开头,后面紧接着是属性名称(JMS<vendor-property-name>)。提供者特定的属性,其作用就是支持厂商的私有特性。

    3.JMS消息类型

    1.JMS的五种消息类型
    1. Message - 这种类型不含有效负载。它可以用于简单的事件通知。
    2. TextMessage - 这种类型携带了一个java.lang.String作为有效负载。它可以用于简单的文本消息交换,还可以用于更复杂的字符数据交换,比如XML文档等。
    3. ObjectMessage - 这种类型携带了一个可序列化Java对象作为有效负载。它可以用于Java对象交换。
    4. BytesMessage - 这种类型携带了一组原始类型字节流作为有效负载。
    5. StreamMessage - 这种类型携带了一个Java原始数据类型流(int ,double,char等)作为有效负载。它提供了一套将格式化字节流映射为java原始数据类型的简便方法。
    6. MapMessage - 这种类型携带了一组名/值对作为有效负载。有效负载类似于一个java.util.Properties对象,除了有效负载值必须是java原始类型或它们的包装器之外。
    2.使用示例
    1. TextMessage textMessage = queueSession.createTextMessage("测试消息-TextMessage");
    2. queueSender.send(textMessage);
    3. User object = new User("LZW", 20, true);
    4. ObjectMessage objectMessage = queueSession.createObjectMessage(object);
    5. queueSender.send(objectMessage);
    6. BytesMessage bytesMessage = queueSession.createBytesMessage();
    7. bytesMessage.writeBytes("测试消息-BytesMessage".getBytes());
    8. queueSender.send(bytesMessage);
    9. QueueReceiver queueReceiver = queueSession.createReceiver(queue);
    10. for (int i = 0; i < 3; i++)
    11. {
    12. Message message = queueReceiver.receive(1000 * 1);
    13. if (message instanceof TextMessage)
    14. {
    15. System.out.println(((TextMessage)message).getText());
    16. }
    17. if (message instanceof ObjectMessage)
    18. {
    19. System.out.println(((ObjectMessage)message).getObject());
    20. }
    21. if (message instanceof BytesMessage)
    22. {
    23. bytesMessage = (BytesMessage)message;
    24. byte[] data = new byte[(int)bytesMessage.getBodyLength()];
    25. bytesMessage.readBytes(data);
    26. System.out.println(new String(data));
    27. }
    28. }
    -------------------------------------------------------------------------------------------------------------------------------



  • 相关阅读:
    SQL Server2016 AlwaysOn无域高可用
    Windows Server 2016 无域故障转移群集
    SQL Server高可用实现方案
    oracle11g RMAN catalog的基本使用
    Oracle_Windows server ORA-01031: insufficient privileges
    MySQL MGR 单主模式下master角色切换规则
    SQL Server AlwaysOn原理简介
    DB2创建视图并授权给其他用户
    Oracle数据库用户的密码过期问题处理
    访问GitLab的PostgreSQL数据库
  • 原文地址:https://www.cnblogs.com/LiZhiW/p/4966577.html
Copyright © 2011-2022 走看看