zoukankan      html  css  js  c++  java
  • Activemq的消息事务

    消息事务
    消息事务,是保证消息传递原子性的一个重要特征,和JDBC的事务特征类似。
    一个事务性发送,其中一组消息要么能够全部保证到达服务器,要么都不到达服务器。
    生产者、消费者与消息服务器直接都支持事务性;
    ActionMQ的事务主要偏向在生产者的应用。

    ActionMQ 消息事务流程图:

    image-20201016100112087

    一、生产者事务:

    没有加入事务的时候,会有部分信息过去,结果如图:

    image-20201016101148802

    方式一:

     /**
         * 事务性发送--方案一
         */
        @Test
        public void sendMessageTx(){
            //获取连接工厂
            ConnectionFactory connectionFactory = jmsMessagingTemplate.getConnectionFactory();
    
            Session session = null;
            try {
                //创建连接
                Connection connection = connectionFactory.createConnection();
    
                /**
                 * 参数一:是否开启消息事务
                 */
                session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
    
                //创建生产者
                MessageProducer producer = session.createProducer(session.createQueue(name));
    
                for(int i=1;i<=10;i++){
                    //模拟异常
                    if(i==4){
                        int a = 10/0;
                    }
    
                    TextMessage textMessage = session.createTextMessage("消息--" + i);
                    producer.send(textMessage);
                }
    
                //注意:一旦开启事务发送,那么就必须使用commit方法进行事务提交,否则消息无法到达MQ服务器
                session.commit();
            } catch (JMSException e) {
                e.printStackTrace();
                //消息事务回滚
                try {
                    session.rollback();
                } catch (JMSException e1) {
                    e1.printStackTrace();
                }
            }
    
    
        }
    

    结果,没有发送出去

    方式二:

    /**
     * ActiveMQ配置类
     */
    @Configuration
    public class ActiveMQConfig {
    
        /**
         * 添加Jms事务管理器
         */
        @Bean
        public PlatformTransactionManager createTransactionManager(ConnectionFactory connectionFactory){
            return new JmsTransactionManager(connectionFactory);
        }
    
    }
    
    /**
     * 消息发送的业务类
     */
    @Service
    public class MessageService {
    
        @Autowired
        private JmsMessagingTemplate jmsMessagingTemplate;
        @Value("${activemq.name}")
        private String name;
    
        @Transactional // 对消息发送加入事务管理(同时也对JDBC数据库的事务生效)
        public void sendMessage(){
            for(int i=1;i<=10;i++) {
                //模拟异常
                if(i==4){
                    int a = 10/0;
                }
    
                jmsMessagingTemplate.convertAndSend(name, "消息---"+i);
            }
        }
    
    }
    
    

    二、消费者事务

    /**
     * 用于监听消息类(既可以用于队列的监听,也可以用于主题监听)
     */
    @Component // 放入IOC容器
    public class MsgListener {
    
        /**
         * 接收TextMessage的方法
         */
        @JmsListener(destination = "${activemq.name}")
        public void receiveMessage(Message message,Session session){
            if(message instanceof TextMessage){
                TextMessage textMessage = (TextMessage)message;
    
                try {
                    System.out.println("接收消息:"+textMessage.getText());
    
    
                    int i=10/0;
    
                    //提交事务
                    session.commit();
                } catch (JMSException e) {
                    e.printStackTrace();
                    //回滚事务
                    try {
                        session.rollback();//一旦事务回滚,MQ会重发消息,一共重发6次
                    } catch (JMSException e1) {
                        e1.printStackTrace();
                    }
                }
    
            }
        }
    
    }
    

    注意如果在消费者异常了,会收到消息,然后重发6次,要是期间还是异常,就会到私信队列中

    image-20201016101936691

  • 相关阅读:
    项目Beta冲刺(团队4/7)
    项目Beta冲刺(团队3/7)
    修!咻咻!团队Beta作业博客汇总
    修!咻咻!团队作业博客汇总
    用户使用调查报告
    Beta冲刺总结
    修咻咻对追光的人、云打印团队的Beta产品测试报告
    Beta冲刺(9/7)——2019.5.31
    Beta冲刺(8/7)——2019.5.30
    Beta冲刺(7/7)——2019.5.29
  • 原文地址:https://www.cnblogs.com/dalianpai/p/13825122.html
Copyright © 2011-2022 走看看