zoukankan      html  css  js  c++  java
  • ActiveMQ消息的事务和签收机制

    一、消息的事务性

    1、生产者  

      一、生产者不开启事务

    public class JmsQueueProducer {
        public static final String BROKER_URL = "tcp://192.168.229.129:61616";
        public static final String QUEUE_NAME = "queue01";
        public static final String TEXT_MESSAGE_NAME = "textMessage";
    
        public static void main(String[] args) throws JMSException {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
            Connection connection = connectionFactory.createConnection();
            connection.start();
    		// 两个参数 transacted=事务,acknowledgeMode=签收模式
    		// 如果生产者不开启事务,也就是transacted=false,那么第二个参数只要是符合传参的规范,就能将消息发送至MQ服务器中
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    	
            Queue queue = session.createQueue(QUEUE_NAME);
            MessageProducer producer = session.createProducer(queue);     
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            for (int i = 1; i < 4; i++) {
                TextMessage textMessage = session.createTextMessage(TEXT_MESSAGE_NAME + i);
                producer.send(queue, textMessage);
            }
            // 释放资源
            producer.close();
            session.close();
            connection.close();
            System.out.println("生产者发送消息至MQ QUEUE......");
        }
    }

      可以看出第二个参数是int类型,它的规范的值有下面这一些:

        Session.SESSION_TRANSACTED

        Session.ATUO_ACKNOWLEDGE

        Session.CLIENT_ACKNOWLEDGE

        Session.DUPS_KO_ACKNOWKEDGE

      如果我们给acknowledge设置一个int类型的值5,看看会有什么现象

    // 给第二个参数设置int类型的数字5
    Session session = connection.createSession(false, 5);  

      运行程序发现报错了,报错的意思就是int类型的数字 5 不是一个有效的参数,能使用的参数下面列举出来了.

    Exception in thread "main" javax.jms.JMSException: invalid acknowledgeMode: 5.
    Valid values are 
    Session.AUTO_ACKNOWLEDGE (1), 
    Session.CLIENT_ACKNOWLEDGE (2), 
    Session.DUPS_OK_ACKNOWLEDGE (3), 
    ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4)
    or for transacted sessions Session.SESSION_TRANSACTED (0)
    	at org.apache.activemq.ActiveMQConnection.createSession(ActiveMQConnection.java:334)
    	at queue.JmsQueueProducer.main(JmsQueueProducer.java:23)
    

      二、生产者开启事务

    public class JmsQueueProducer {
        public static final String BROKER_URL = "tcp://192.168.229.129:61616";
        public static final String QUEUE_NAME = "queue01";
        public static final String TEXT_MESSAGE_NAME = "textMessage";
    
        public static void main(String[] args) throws JMSException {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
            Connection connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(QUEUE_NAME);
            MessageProducer producer = session.createProducer(queue);
    
            try {
                for (int i = 1; i < 11; i++) {
                    TextMessage textMessage = session.createTextMessage(TEXT_MESSAGE_NAME + i);
                    producer.send(queue, textMessage);
                   if(i<=5){
                       session.commit();
                   }else{
                       session.rollback();
                   }
                }
                System.out.println("生产者成功发送消息至MQ QUEUE......");
            } catch (Exception e) {
                System.out.println("如果出现异常,消息回滚");
                session.rollback();
            } finally {
                // 释放资源
                producer.close();
                session.close();
                connection.close();
            }
        }
    }
    

      小结:

      1、生产者不开启事务的情况下,只要执行send()方法,消息就可以发送至队列,它会默认帮我们commit(),第二个参数只要是符合语法规范的有效值就可以了,理论上是任意的,但是我们比较喜欢使用ATUO_ACKNOWLEDGE

      2、生产者开启事务的情况下,执行send()方法并不会把消息发送至MQ服务器,还需要手动commit()进行提交,如果出现异常的情况下可以使用session.rollback进行回滚,由于对于生产者来说,事务的级别高于消息的应答,所以第二个参数可以忽略,一般ATUO_ACKNOWLEDGE就可以了.

      3、对于生产者来说,生产者偏向于事务,生产者发送消息到MQ队列中,不关心签收机制是什么,所以不管开启不开启事务,第二个参数一般只要给一个AUTO_ACKNOWLEDGE就可以了

    2、消费者

      、消费者不开启事务,自动签收的情况下,只能消费一次消息,不会重复消费消息

    // 消费者不开启事务,自动应答
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

      、消费者开启事务、自动签收、并且手动commit(),那么消息就只能被消费一次,这样就可以保证一定的可靠性,否则消息会被重复消费

    // 消费者开启事务,自动签收
    Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
    // 在session关闭之前,使用session.commit()手动提交
    session.commit();
    

      

    二、消息的签收机制 

      般签收机制只对于消费者来说,对于生产者来说,它只负责发送消息,消息的签收机制对它来说没有太大意义

      1、消费者不开启事务,自动签收的情况下,消费者可以消费消息,并且只能消费一次消息,不会存在重复消费的情况

    // 消费者不开启事务,自动签收
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    

      2、消费者开启事务,自动签收的情况下,必须使用commit手动提交,否则重复消费消息

    // 消费者开启事务,自动签收
    Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
    // 如果没有手动提交,消费者可以重复消费消息,并且队列中的消息不会出队列
    session.commit();
    

      3、消费者不开启事务,手动签收,必须调用acknowledge()方法来确认签收,否则会重复消费消息

    // 消费者不开启事务,手动签收
    Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    // 在自动签收的情况下使用message.acknowledge()方法进行签收,如果不手动调用ackonwledge()方法,那么消息会被重复消费
    textMessage.acknowledge();
    

      4、消费者开启事务,手动签收,只调用acknowledge()方法,消息会被重复消费

    // 消费者开启事务,手动签收
    Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
    // 如果只加上message.acknowledge()方法,消息会被重复消费
    textMessage.acknowledge();
    

      5、消费者开启事务,手动签收,只调用commit()方法,消息不会被重复消费(事务的决定性>签收机制???)

    // 消费者开启事务,手动签收
    Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
    // 如果只加上session.commit()方法,消息不会重复消费
    session.commit();
    

      6、消费者开启事务,手动签收,调用commit()方法和acknowledge()方法,消息不会重复消费

    // 消费者开启事务,手动签收
    Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
    // 加上message.acknowledge()方法
    textMessage.acknowledge();
    // 加上session.commit()方法
    session.commit();
    

      

    三、总结

      1、生产者

        一、对于生产者来说,它关心的仅仅只是事务,生产者只负责发送消息,消息是否签收,以及消息的签收机制对他来说没有很大意义

        二、如果生产者不开启事务,我们为了符合规范,消息签收机制那里我们给一个Session.AUTO_ACKONWLEDGE就可以了

        三、如果生产者开启事务,我们为了符合规范,消息签收机制那里我们给一个AUTO_ACKONWLEDGE,并且需要手动调用session.commit(),否则消息不会发送至MQ服务器

      2、消费者

        一、消费者一般关心的是签收方式,我们一般不开启事务,如果是自动签收,我们什么都不需要做,如果是手动签收,那么我们需要手动调用message.acknowledge()方法

        二、消费者特殊情况下如果使用事务,就需要调用session.commit()方法.

        三、特殊情况下,如果既开启了事务,又是手动签收,单单开启事务就可以避免消费重复消息

        四、为了避免出错,消费者消费消息的时候,如果开启了事务,就调用session.commit(),自动签收就调用acknowledge,使它们成对的出现,这样可以避免不必要的错误.

  • 相关阅读:
    元数据管理
    sqoop 安装
    postgres 索引
    postgres 表和库等信息大小统计
    Perl基础语法
    Perl 认识简介
    Oracle层次查询start with connect by
    jquery.cookie.js 的使用指南
    JavaScript中cookie使用
    CSS实现垂直居中的4种思路
  • 原文地址:https://www.cnblogs.com/xiaomaomao/p/13715488.html
Copyright © 2011-2022 走看看