zoukankan      html  css  js  c++  java
  • JMS编程模型

    模型结构

    JMS编程模型由以下几个组成:

    • ConnectionFactory:连接工厂(创建连接)
    • Connection:连接(创建会话)
    • Session:会话(创建目的地、生产者、消费者、消息)
    • Destination:目的地(消息发送目标)
    • MessageProducer:消息生产者(发送消息)
    • MessageConsumer:消息消费者(消费消息)
    • Message:消息(内容主体)

    下面用一张图片展示几个组成部分是如何联系在一起的

    下面将逐个了解每个部分,并且以activeMQ的实现作为代码片段部分示例。

    ConnectionFactory

    顾名思义,一个ConnectionFactory是客户端用来创建Connection的接口。基于工厂模式,它简化了Connection的创建。除了ConnectionFactory接口,常见的还有QueueConnectionFactory和TopicConnectionFactory,它们都继承自ConnectionFactory。

    创建一个ConnectionFactory的代码片段如下:

    1 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");

    Connection

    有了ConnectionFactory我们就可以创建Connection了,Connection表示的是一个虚拟的连接,也就是代表着打开了一个由客户端到消息代理端的socket连接。Connection可以用来创建Session。

    下面我们看看ConnectionFactory来创建Connection的示例:

    1 Connection connection = connectionFactory.createConnection();

    在使用Connection之前,你必须先调用start方法开启连接

    1 connection.start();

    在使用完了之后,你必须调用close方法关闭资源。注意,close方法会关闭Connection创建的Session、MessageProducer、MessageConsumer。另外,如果close方法调用失败,那么将会导致资源未被释放的问题。

    但是,如果你只是想暂时停止一下消息的传送,那么可以调用stop方法,而不是将Connection进行close。如果要重新打开,那么再调用start方法。

    Session

    session是一个Message的生产和消费的上下文,我们称作会话,由Connection创建。session可以创建MessageProducer、MessageConsumer、Message、Destination。

    我们创建一个session

    1 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    createSession方法有两个入参

    • transacted:是否开启事务
      1. true:当前session开启事务,事务中的send和receive要么全部完成,要么全部回滚;需要显示调用commit方法或者rollback方法,开启事务的时候将忽略acknowledgeMode参数;
        如果在生产消息的时候发生了故障或者rollback,那么该消息会被丢弃,直到事务提交,该消息才会传给消费者

        如果在消费消息的时候发生了故障或者rollback,那么该消息会被重新发送,并且打上重新传送标记(JMSRedelivered=true)
      2. false:不开启事务
    • acknowledgeMode:该参数描述的是consumer和broker的消息确认方式,而不是producer和broker
      1. auto_acknowledge:自动确认模式,当同步方法receive方法返回message的时候立即确认,当异步方法onMessage正常返回的时候确认,异常的时候会要求broker重新发送
      2. client_acknowledge:客户端手动确认模式,客户端需要显示调用 Message的acknowledge()方法逐个确认消息。也可以处理多条消息后一次确认多条消息。
      3. dups_ok_acknowledge:类似auto_acknowledge,也是一种自动确认模式,为了自动批量确认而生。根据内部的算法,在收到一定数量的message以后就会自动确认。但是这种模式会出现消息重复发送,比如Consumer故障重启以后,原先已经处理过的,但是还未确认的消息会重新发送过来。
      4. session_transacted:通过createSession(int sessionMode)方法创建session的时候指定该值将开启事务,需要显示调用commit方法提交事务确认全部消息。

    如果JMS被整合到了项目的JTA当中,session事务将和其它操作(如数据库访问)在同一个事务当中,当前session的事务设置将都被忽略,session会加入JTA事务,随着jta事务的commit和rollback而提交或者回滚,而不是通过session的commit或rollback方法,而在这种情况下其实设置transacted参数和acknowledgeMode参数已经没意义了,你可以直接使用createSession()方法即可。

    Destination

    一个destination表示的是生产者的消息发送目的地,以及消费者消费消息的源头。在点对点模式中,destination又被称作queue(队列)。在发布订阅模式中,destination被称作topic(话题)。

    Destination由session创建,创建一个queue

    1 Destination destination = session.createQueue("queue1");

    创建一个topic

    1 Destination destination = session.createTopic("topic1");

    MessageProducer

    MessageProducer是由session创建的,用于发送Message到destination。我们使用session创建一个MessageProducer,如下

    1 MessageProducer producer = session.createProducer(destination);

    如果你创建了一个Message对象,你可以使用MessageProducer发送消息

    1 producer.send(message);

    MessageConsumer

    MessageConsumer是由session创建的,将会作为一个消费者消费destination中的Message。创建一个MessageConsumer

    1 MessageConsumer consumer = session.createConsumer(destination);

    创建了消费者,就可以消费消息了

    1 Message message = consumer.receive();

    receive方法是同步消费消息的方法,有时候我们不想等待那么久,所以采用异步监听的方式,如

    1 Listener listener = new Listener();
    2 consumer.setMessageListener(listener);

    这里假设Listener是实现了MessageListener接口的监听器,当消息到达的时候onMessage方法将被触发。

    Message

    Message又包含以下三个部分,其中header是必须存在的,后两者可选:

    1. message header:消息头(必须存在)
    2. message properties:附加属性(可选)
    3. message body:消息体(可选)

    message header

    和http协议类似,header包含了一系列的预设的键值对内容,这些内容是由客户端或者消息代理设置的,在消息传递过程中将被一同传递。

    下表是相关的header字段,以及谁才有权限设置该字段

    header各个字段 设置方 中文描述 可选值
    JMSDestination  send或publish方法   消息将要被发送的目的地 destination一般为queue或者topic
     JMSDeliveryMode send或publish方法 消息的传送模式  PERSISTENT(持久化)、NON_PERSISTENT(非持久化)
     JMSExpiration send或publish方法  消息的过期时间  时间戳
     JMSPriority send或publish方法  消息的优先级  0-9优先级由低到高
     JMSMessageID send或publish方法  消息的全局唯一标识  字符串
     JMSTimestamp send或publish方法  消息的时间 时间戳
     JMSCorrelationID client  消息关联的另外一个JMSMessageID 字符串
    JMSReplyTo  client  回复当前消息的JMSDestination  destination一般为queue或者topic
     JMSType client 消息的类型  字符串
     JMSRedelivered   provider 消息重发标记  布尔值

    message properties

    如果你希望消息体有一些自定义的键值对属性,但是这些属性又不在header当中,这时候你就可以设置properties附加属性来达到目的。

    message body

    Message表示具体的消息,JMS定义了五种消息格式,如:

    1. TextMessage:文本
    2. MapMessage:键值对
    3. BytesMessage:字节码
    4. StreamMessage:流
    5. ObjectMessage:对象

    以TextMessage为例,创建一个消息

    1 TextMessage message = session.createTextMessage();
    2 message.setText("text content");
    3 producer.send(message);

    如果是MessageConsumer调用receive消费消息

    1 Message message = consumer.receive();
    2 if (message instanceof TextMessage) {
    3     TextMessage textMessage = (TextMessage)message;
    4     System.out.println("receive message" + message.getText());
    5 }

    message selector

    consumer将消费queue上的消费,有时候不同的consumer消费同一个队列,但是我们又希望每个consumer只关心自己相关的内容,而过滤掉其它消息,这就可以用到selector。

    selector将会根据Message中的header和properties中设置的键值来过滤Message,例如,我们定义一个selector:

    1 "JMSMessageId = '1234567890' OR consumerName = 'lay'"

    那么在消费Message的时候将会根据header过滤出JMSMessageId='1234567890'的Message,或者consumerName='lay'的Message。

    那么上面的表达式如何应用在代码中呢,如

    1 MessageConsumer consumer = session.createConsumer(destination, "JMSMessageId = '1234567890' OR consumerName = 'lay'");

    完整代码

    生产消息

     1 // Create a ConnectionFactory
     2 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
     3 
     4 // Create a Connection
     5 Connection connection = connectionFactory.createConnection();
     6 connection.start();
     7 
     8 // Create a Session
     9 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    10 
    11 // Create the destination (Topic or Queue)
    12 Destination destination = session.createQueue("TEST.FOO");
    13 
    14 // Create a MessageProducer from the Session to the Topic or Queue
    15 MessageProducer producer = session.createProducer(destination);
    16 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    17 
    18 // Create a messages
    19 String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
    20 TextMessage message = session.createTextMessage(text);
    21 
    22 // Tell the producer to send the message
    23 System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName());
    24 producer.send(message);
    25 
    26 // Clean up
    27 session.close();
    28 connection.close();

    消费消息

     1 // Create a ConnectionFactory
     2 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
     3 
     4 // Create a Connection
     5 Connection connection = connectionFactory.createConnection();
     6 connection.start();
     7 
     8 connection.setExceptionListener(this);
     9 
    10 // Create a Session
    11 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    12 
    13 // Create the destination (Topic or Queue)
    14 Destination destination = session.createQueue("TEST.FOO");
    15 
    16 // Create a MessageConsumer from the Session to the Topic or Queue
    17 MessageConsumer consumer = session.createConsumer(destination);
    18 
    19 // Wait for a message
    20 Message message = consumer.receive(1000);
    21 
    22 if (message instanceof TextMessage) {
    23     TextMessage textMessage = (TextMessage) message;
    24     String text = textMessage.getText();
    25     System.out.println("Received: " + text);
    26 } else {
    27     System.out.println("Received: " + message);
    28 }
    29 
    30 consumer.close();
    31 session.close();
    32 connection.close();

    原文

    https://docs.oracle.com/javaee/1.4/tutorial/doc/JMS4.html#wp78884

    基本概念

    https://docs.oracle.com/javaee/1.4/tutorial/doc/JMS3.html#wp78636

    JavaDoc

    https://docs.oracle.com/javaee/7/api/javax/jms/package-frame.html

  • 相关阅读:
    leetcode bugfree note
    leetcode 419
    leetcode 165
    leetcode 155
    leetcode 204
    leetcode 28
    将二叉搜索树转为有序双向链表
    leetcode 397
    ABAP 动态内表创建/赋值
    ABAP 屏幕下拉框值根据选择框填值赋值
  • 原文地址:https://www.cnblogs.com/lay2017/p/11080107.html
Copyright © 2011-2022 走看看