zoukankan      html  css  js  c++  java
  • JMS的组成

    1.JMS是什么

    JMS即Java消息服务(Java Message Service)应用程序接口,是一个java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

    2.JMS的主要组成部分

    • jms provider    消息中间件/消息服务器也就是MQ
    • jms producer   消息生产者
    • jms consumer  消息消费者
    • jms message   消息

    3.消息结构(jms message)

    消息头:

    红色的为常用的属性

    1.JMSDestination:消息发送的目的地:主要指Queue和Topic,自动分配。
    2.JMSDeliveryMode:传送模式。有两种:持久模式和非持久模式。一条持久性的消息应该被传送“一次仅仅一次”,这就意味着如果JMS提供者出现故障,该消息并不会丢失,它会在服务器恢复之后再次传递。一条非持久的消息最多会传送一次,这意味着服务器出现故障,该消息将永久丢失。自动分配。
    3.JMSExpiration:消息过期时间,等于Destination的send方法中的timeToLive值加上发送时刻的GMT时间值。如果timeToLive值等于零,则JMSExpiration被设为零,表示该消息永不过期。如果发送后,在消息过期时间之后消息还没有被发送到目的地,则该消息被清除。自动分配。
    4.JMSPriority:消息优先级,从0-9十个级别,0-4是普通消息,5-9是加急消息。JMS不要求JMS Provider严格按照这十个优先级发送消息,但必须保证加急消息要先于普通消息到达。默认是4级。自动分配。
    5.JMSMessageID:唯一识别每个消息的标识,由JMS Provider产生。自动分配。
    6.JMSTimestamp:一个JMS Provider在调用send()方法时自动设置的。它是消息被发送和消费者实际接收的时间差。自动分配。
    7.JMSCorrelationID:用来连接到另外一个消息,典型的应用是在回复消息中连接到原消息。在大多数情况下,JMSCorrelationID
    8.JMSReplyTo:提供本消息回复消息的目的地址。由开发者设置。
    9.JMSType
    10.JMSRedelivered

    消息体:

    JMS API定义了5种消息体格式,也叫消息类型,可以使用不同形式发送接收数据,并可以兼容现有的消息格式

    1. Text message : javax.jms.TextMessage,表示一个文本对象。
    2. Map message : javax.jms.MapMessage,表示键值对。
    3. Object message : javax.jms.ObjectMessage,表示一个JAVA对象。
    4. Bytes message : javax.jms.BytesMessage,表示字节数据。
    5. Stream message :javax.jms.StreamMessage,表示java原始值数据流。

    生产者生产不同类型的信息

              //通过session创建消息,
                    TextMessage textMessage = session.createTextMessage("this is msg	"+i);
                    MapMessage mapMessage = session.createMapMessage();
                    mapMessage.setString("key","mapMessage");
                    //用producer将消息发送至MQ的队列里
                    producer.send(textMessage);
                    producer.send(mapMessage);

    消费者消费不同类型的信息

          consumer.setMessageListener(new MessageListener(){
               @Override
               public void onMessage(Message message) {
                    //获取字符串类型的消息
                   if (message!=null&&message instanceof TextMessage) {
                       TextMessage message1 =(TextMessage)message;
                       try {
                           String text = message1.getText();
                           System.out.println(text);
                       } catch (JMSException e) {
                           e.printStackTrace();
                       }
                   }
                   //获取map类型的消息
                   if (message!=null&&message instanceof MapMessage) {
                       MapMessage mapMessage =(MapMessage)message;
                       try {
                           String text = mapMessage.getString("key");
                           System.out.println(text);
                       } catch (JMSException e) {
                           e.printStackTrace();
                       }
                   }
    
               }
           });

    消息属性

    我们可以给消息设置自定义属性,这些属性主要是提供给应用程序的。对于实现消息过滤功能,消息属性非常有用,JMS API定义了一些标准属性,JMS服务提供者可以选择性的提供部分标准属性。

    一系列的setXXXProperty的方法,参数类型为键值对的方式,设消息附加一些属性

    生产者生产消息时给消息附带消息属性,可以用于区别消息。

    TextMessage textMessage = session.createTextMessage("this is msg	"+i);
    textMessage.setStringProperty("key1","消息属性");
    producer.send(textMessage);

    消费者获取消息时也可以通过key获取到消息的属性

      consumer.setMessageListener(new MessageListener(){
               @Override
               public void onMessage(Message message) {
                    //获取字符串类型的消息
                   if (message!=null&&message instanceof TextMessage) {
                       TextMessage message1 =(TextMessage)message;
                       try {
                           String text = message1.getText();
                           String key1 = message1.getStringProperty("key1");
                           System.out.println(key1);
                           System.out.println(text);
                       } catch (JMSException e) {
                           e.printStackTrace();
                       }
                   }
    }

    4.消息的可靠性

    • 持久化
    • 签收
    • 事务

    持久化:当服务器突然宕机的时候,队列中未被消费的的信息,是否会消失。

    可靠性的一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。

    在队列模式中默认是持久化的

    //.创建消息的生产者
    MessageProducer producer = session.createProducer(queue);
    //持久化,当服务器宕机的时候,消息依然存在
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    //非持久化,当服务器宕机的时候消息不存在
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

     持久化Topic

    1. 一定要先运行次消费者,等于应MQ注册, 类似我订阅了这个主题,
    2. 然后再运行生产者发送信息,
    3. 此时,无论消费者是否在线,都会接收到,不在我的话,下次还接的时候, 公把没有收过的消息都接收下来。

    生产者(start的时机要在设置好DeliveryMode)之后,这样的话就可以实现持久化了

    package com.yjc.activemq;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    public class JMSProducer_Topic {
        //ActiveMQ的url
        private static final String ACTIVEMQ_URL="tcp://192.168.118.3:61616";
        //Topic的名称
        private  static final  String TOPIC_NAME="topic001";
    
        public static void main(String[] args) throws JMSException {
            //1.创建ActiveMQ的连接工程,给定ActiveMQ的url,使用默认账号和密码
            ActiveMQConnectionFactory activeFactory=new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            //2.创建active的连接对象,抛出或捕获异常,并启动connection
            Connection connection = activeFactory.createConnection();
            //3.创建会话对象,两个参数前者为事务,后者为签收
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //4.创建目的地,目的地的类型为队列(Queue),参数为队列的名称
            //Queue和Topic都继承自Destination接口
            Topic topic = session.createTopic(TOPIC_NAME);
            //5.创建消息的生产者
            MessageProducer producer = session.createProducer(topic);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            connection.start();//设置完持久化再start之后就可以实现持久化了
            for (int i=1;i<=3;i++){
                //6.通过session创建消息,
                TextMessage textMessage = session.createTextMessage("this is msg	"+i);
                textMessage.setStringProperty("key1","消息属性");
                //7.用producer将消息发送至MQ的队列里
                producer.send(textMessage);
            }
            //8.释放资源,先开后关
            producer.close();
            session.close();
            connection.close();
            System.out.println("将消息发送至MQ队列中成功!!!");
    
        }
    }

    消费者

    start()方法的调用时机为durableSubscriber 对象创建完毕之后

    package com.yjc.activemq;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    import java.io.IOException;
    
    public class JMSConsumer_Topic {
        //ActiveMQ的url
        private static final String ACTIVEMQ_URL="tcp://192.168.118.3:61616";
        //Topic的名称
        private  static final  String TOPIC_NAME="topic001";
        public static void main(String[] args) throws JMSException, IOException {
            //1.创建ActiveMQ的连接工程,给定ActiveMQ的url,使用默认账号和密码
            ActiveMQConnectionFactory activeFactory=new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            //2.创建active的连接对象,抛出或捕获异常,并启动connection
            Connection connection = activeFactory.createConnection();
            //设置客户端的id
            connection.setClientID("yjc");
            //3.创建会话对象,两个参数前者为事务,后者为签收
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //4.创建目的地,目的地的类型为队列(Queue),参数为队列的名称
            //Queue和Topic都继承自Destination接口
               Topic topic=session.createTopic(TOPIC_NAME);
            //----------------------上面的步骤和Producer的一样------------------------------------------------
            //5.创建订阅者
            TopicSubscriber durableSubscriber = session.createDurableSubscriber(topic, "remake.....");
            connection.start();//此处启动
            Message message = durableSubscriber.receive();
            while (message!=null){
                TextMessage textMessage=(TextMessage)message;
                System.out.println("消息内容为"+textMessage.getText());
                message=durableSubscriber.receive(1000L);
            }
    
            //7.释放资源先开后关
            durableSubscriber.close();
            session.close();
            connection.close();
            System.out.println("消费完毕!!!");
        }
    }

    5.消息的事务

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//代表生产者不开启事务

     生产者

      false:代表生产者执行完send()方法之后,该信息就会被提交到队列之中

      true:代表生产者做的操作不会被立即提交,需要使用session对象调用commit()方法,之前的操作才会被真正提交,当然也可以回滚

     消费者

      false:代表消费者消费完一条信息之后,消息会立马从消息队列中出列

      true:代表消费者消费完信息之后,如果不commit的话,所消费的消息就不会出列,这样的话就会有重复消费的可能性

    6.签收

    事务偏生产者,签收偏消费者

    非事务下的签收

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//代表不开事务,默认为自动签收

    即使消费了消息,不签收成功的话,消息也是不会出列的

    [ Session.AUTO_ACKNOWLEDGE ] 自动签收(默认)

    当客户端从receiver或onMessage成功返回时,Session自动签收客户端的这条消息的收条。

    [ Session.CLIENT_ACKNOWLEDGE ]手动签收

    客户端通过调用消息(Message)的acknowledge方法签收消息。在这种情况下,签收发生在Session层面:签收一个已经消费的消息会自动地签收这个Session所有已消费的收条。

    [ Session.DUPS_OK_ACKNOWLEDGE ]允许重复消息

    Session不必确保对传送消息的签收,这个模式可能会引起消息的重复,但是降低了Session的开销,所以只有客户端能容忍重复的消息,才可使用。不常用

    事务下的签收

    事务性会话中,当一个事务被成功提交则消息被自动签收。也就是说,在事务的开启的情况下,无论是自动签收或者是手动签收,只要事务成功提交了,消息就会被自动签收,在手动签收的情况下

    不调用消息(Message)的acknowledge方法消息也会被签收,消息也会出列。而如果你调用消息(Message)的acknowledge方法进行签收,但是没有commit的话也是不会签收的。

    如果事务回滚,则消息会被再次传送。

    非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgementmode)

  • 相关阅读:
    linux下模拟CISCO设备输出NetFlow数据 规格严格
    国际:十个习惯让你精通新的开发技术
    国人眼中的Scrum
    NetBeans 时事通讯(刊号 # 2 Apr 08, 2008)
    Ruby语言的发展趋势和启示
    离开几天
    Ruby语言的发展趋势和启示
    だ い が く せ い か つ
    NetBeans 时事通讯(刊号 # 2 Apr 08, 2008)
    离开几天
  • 原文地址:https://www.cnblogs.com/yjc1605961523/p/11979189.html
Copyright © 2011-2022 走看看