zoukankan      html  css  js  c++  java
  • RocketMQ 分布式事务

    在RocketMQ中生产者有三种角色NormalProducer(普通)、OrderProducer(顺序)、TransactionProducer(事务),根据名字大概可以看出各个代表着什么作用,我们这里用TransactionProducer(事务)来解决分布式事务问题。

    说到分布式事务,就会谈到那个经典的”账户转账”问题:2个账户,分布处于2个不同的DB,或者说2个不同的子系统里面,A要扣钱,B要加钱,如何保证原子性?

    一般的思路都是通过消息中间件来实现“最终一致性”:A系统扣钱,然后发条消息给中间件,B系统接收此消息,进行加钱。

    但这里面有个问题:A是先update DB,后发送消息呢?还是先发送消息,后update DB?
    假设先update DB成功,发送消息网络失败,重发又失败,怎么办?
    假设先发送消息成功,update DB失败。消息已经发出去了,又不能撤回,怎么办?

    所以,这里下个结论:只要发送消息和update DB这2个操作不是原子的,无论谁先谁后,都是有问题的。

    那这个问题怎么解决呢??
    为了能解决该问题,同时又不和业务耦合,RocketMQ提出了“事务消息”的概念。

    具体来说,就是把消息的发送分成了2个阶段:Prepare阶段和确认阶段。

    具体来说,上面的2个步骤,被分解成3个步骤:
    (1) 发送Prepared消息
    (2) update DB
    (3) 根据update DB结果成功或失败,Confirm或者取消Prepared消息。

    可能有人会问了,前2步执行成功了,最后1步失败了怎么办?这里就涉及到了RocketMQ的关键点:RocketMQ会定期(默认是1分钟)扫描所有的Prepared消息,询问发送方,到底是要确认这条消息发出去?还是取消此条消息?

    (1) 执行业务逻辑的部分

    package com.lynch.simple.demo;
    
    import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;
    import com.alibaba.rocketmq.client.producer.LocalTransactionState;
    import com.alibaba.rocketmq.common.message.Message;
    
    /**
     * 执行业务逻辑的部分
     * 
     * @author jqlin
     *
     */
    public class TransactionExecuterImpl implements LocalTransactionExecuter {
    
        @Override
        public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
            System.out.println("执行本地事务msg = " + new String(msg.getBody()));  
            System.out.println("执行本地事务arg = " + arg);  
      
            //DB操作 应该带上事务 service -> dao
            //如果数据操作失败  需要回滚    同时返回RocketMQ一条失败消息  意味着消费者无法消费到这条失败的消息
            //如果成功 就要返回一条rocketMQ成功的消息,意味着消费者将读取到这条消息
            //o就是attachment
            String tags = msg.getTags();  
            if (tags.equals("transaction2")) {  
                System.out.println("===> 本地事务执行失败,进行MQ ROLLBACK");
                return LocalTransactionState.ROLLBACK_MESSAGE;  
            }  
            
            System.out.println("===> 本地事务执行成功,发送确认消息");
            // return LocalTransactionState.UNKNOW;  
            return LocalTransactionState.COMMIT_MESSAGE;  
        }
        
    }

    (2) 处理事务回查的代码部分

    package com.lynch.simple.demo;
    
    import com.alibaba.rocketmq.client.producer.LocalTransactionState;
    import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    /**
     * 处理事务回查的代码部分
     * 
     * @author jqlin
     *
     */
    public class TransactionCheckListenerImpl implements TransactionCheckListener {
    
        @Override
        public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
            System.out.println("未决事务,服务器回查客户端msg =" + new String(msg.getBody().toString()));
            
            //由于RocketMQ迟迟没有收到消息的确认消息,因此主动询问这条prepare消息,是否正常?
            //可以查询数据库看这条数据是否已经处理
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    
    }

    (3) 启动生产者

    package com.lynch.simple.demo;
    
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
    import com.alibaba.rocketmq.client.producer.TransactionMQProducer;
    import com.alibaba.rocketmq.common.message.Message;
    
    public class TransactionProducer {
        public static void main(String[] args) throws MQClientException, InterruptedException {
            // 也就是上文所说的,当RocketMQ发现`Prepared消息`时,会根据这个Listener实现的策略来决断事务
            TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
            // 构造事务消息的生产者
            TransactionMQProducer producer = new TransactionMQProducer("transactionProducer");
            producer.setNamesrvAddr("127.0.0.1:9876");
            // 事务回查最小并发数
            producer.setCheckThreadPoolMinSize(2);
            // 事务回查最大并发数
            producer.setCheckThreadPoolMaxSize(2);
            // 队列数
            producer.setCheckRequestHoldMax(2000);
            // 设置事务决断处理类
            producer.setTransactionCheckListener(transactionCheckListener);
            producer.start();
    
            // 本地事务的处理逻辑,相当于示例中检查Bob账户并扣钱的逻辑
            TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
            for (int i = 1; i <= 2; i++) {
                try {
                    String tags = "transaction" + i;
                    String keys = "KEY" + i;
                    byte[] body = ("Hello RocketMQ " + i).getBytes();
                    Message msg = new Message("topicTransaction", tags, keys, body);
                    SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
                    System.out.println(sendResult);
                } catch (MQClientException e) {
                    e.printStackTrace();
                }
            }
     
            producer.shutdown();
        }
    }

    (4) 启动消费消息

    package com.lynch.simple.demo;
    
    import java.util.List;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    public class TransactionConsumer {
        public static void main(String[] args) throws InterruptedException, MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transactionConsumer");
            consumer.setNamesrvAddr("127.0.0.1:9876");
            consumer.setConsumeMessageBatchMaxSize(10);
            /**
             * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
             * 如果非第一次启动,那么按照上次消费的位置继续消费
             */
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.subscribe("topicTransaction", "*");
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
    
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        try {
                            System.out.println(msg + ",内容:" + new String(msg.getBody()));
                        } catch (Exception e) {
                            e.printStackTrace();
                            return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试
                        }
                    }
                    
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
                }
            });
    
            consumer.start();
            System.out.println("transaction_Consumer Started.");
        }
    }

    重点来了:3.2.6之前的版本这样写就可以了,但是之后的版本被关于事务回查这个接口被阉割了,不会在进行事务回查操作。
    那么向MQ发送消息如果失败的话,会造成A银行扣款成功而B银行收款未成功的数据不一致的情况

    解决办法

  • 相关阅读:
    ASP.NET Core 问题排查:Request.EnableRewind 后第一次读取不到 Request.Body
    解决 AutoMapper ProjectTo 不起作用的问题
    解决 ASP.NET Core 自定义错误页面对 Middleware 异常无效的问题
    ASP.NET Core 从 gitlab-ci 环境变量读取配置
    终于解决 xUnit.net 测试中无法输出到控制台的问题
    ASP.NET Core 新建线程中使用依赖注入的问题
    前端回顾:2016年 JavaScript 之星
    前端工程师和设计师必读文章推荐【系列三十五】
    AsciiMorph
    Notyf
  • 原文地址:https://www.cnblogs.com/linjiqin/p/9561641.html
Copyright © 2011-2022 走看看