zoukankan      html  css  js  c++  java
  • 52.RocketMQ 事务

    今天的博客有点多,因为前几天一直用笔记录,今天都补上了。后面的博客先停一段时间,后面还有dubbo、storm、kafka、solor、nginx、keepalived、fastdfs等内容,只是因为最近准备跳槽,停更一段时间,等到新公司后再继续更新。

    场景1:支付宝转1w到余额宝,支付宝扣了1w,服务挂了怎么办?余额还没有加上

    场景2:订单系统和库存系统如何保持一致

    如果是本地的话很好解决

    • begin transaction
        update 支付宝 - 1w;
        update 余额宝 + 1W;
      end transaction
    • 用Spring的话,方法上加 @Transaction注释

    那如果是跨系统的呢?该如何解决?

    有一种思路是这样的:

    1.  client发送转账请求给事务协调器
    2. 事务协调器先发送扣款请求给支付宝,返回执行结果(这时并没有提交)
    3. 事务协调器在发送加款请求给余额宝,返回执行结果(这时也没有提交)
    4. 事务协调器看两个执行结果都返回OK 就执行第四步,提交2和3没有提交的更新请求。

    但是这个有个问题,那就是性能很受影响,主要卡在事务协调器这里。

    RocketMQ的实现方式如下(图片来自网络):

    支付宝先生成 扣款信息 --> 消息队列 --> 余额宝消费消息

    发送消息:

     1 import com.alibaba.rocketmq.client.exception.MQClientException;
     2 import com.alibaba.rocketmq.client.producer.SendResult;
     3 import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
     4 import com.alibaba.rocketmq.client.producer.TransactionMQProducer;
     5 import com.alibaba.rocketmq.common.message.Message;
     6 
     7 
     8 /**
     9  * 发送事务消息例子
    10  * 
    11  */
    12 public class TransactionProducer {
    13     public static void main(String[] args) throws MQClientException, InterruptedException {
    14 
    15         TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
    16         TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
    17         // 事务回查最小并发数
    18         producer.setCheckThreadPoolMinSize(2);
    19         // 事务回查最大并发数
    20         producer.setCheckThreadPoolMaxSize(2);
    21         // 队列数
    22         producer.setCheckRequestHoldMax(2000);
    23         producer.setTransactionCheckListener(transactionCheckListener);
    24         producer.start();
    25 
    26         String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", "TagE" };
    27         TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
    28         for (int i = 0; i < 100; i++) {
    29             try {
    30                 Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i,
    31                     ("Hello RocketMQ " + i).getBytes());
    32                 SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
    33                 System.out.println(sendResult);
    34 
    35                 Thread.sleep(10);
    36             }
    37             catch (MQClientException e) {
    38                 e.printStackTrace();
    39             }
    40         }
    41 
    42         for (int i = 0; i < 100000; i++) {
    43             Thread.sleep(1000);
    44         }
    45 
    46         producer.shutdown();
    47 
    48     }
    49 }

    执行本地事务

     1 import java.util.concurrent.atomic.AtomicInteger;
     2 
     3 import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;
     4 import com.alibaba.rocketmq.client.producer.LocalTransactionState;
     5 import com.alibaba.rocketmq.common.message.Message;
     6 
     7 
     8 /**
     9  * 执行本地事务
    10  */
    11 public class TransactionExecuterImpl implements LocalTransactionExecuter {
    12     private AtomicInteger transactionIndex = new AtomicInteger(1);
    13 
    14 
    15     @Override
    16     public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
    17         int value = transactionIndex.getAndIncrement();
    18 
    19         if (value == 0) {
    20             throw new RuntimeException("Could not find db");
    21         }
    22         else if ((value % 5) == 0) {
    23             return LocalTransactionState.ROLLBACK_MESSAGE;
    24         }
    25         else if ((value % 4) == 0) {
    26             return LocalTransactionState.COMMIT_MESSAGE;
    27         }
    28 
    29         return LocalTransactionState.UNKNOW;
    30     }
    31 }

    服务器回查客户端(这个功能在开源版本中已经被咔掉了,但是我们还是要写,不然报错)

     1 import java.util.concurrent.atomic.AtomicInteger;
     2 
     3 import com.alibaba.rocketmq.client.producer.LocalTransactionState;
     4 import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
     5 import com.alibaba.rocketmq.common.message.MessageExt;
     6 
     7 
     8 /**
     9  * 未决事务,服务器回查客户端
    10  */
    11 public class TransactionCheckListenerImpl implements TransactionCheckListener {
    12     private AtomicInteger transactionIndex = new AtomicInteger(0);
    13 
    14 
    15     @Override
    16     public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
    17         System.out.println("server checking TrMsg " + msg.toString());
    18 
    19         int value = transactionIndex.getAndIncrement();
    20         if ((value % 6) == 0) {
    21             throw new RuntimeException("Could not find db");
    22         }
    23         else if ((value % 5) == 0) {
    24             return LocalTransactionState.ROLLBACK_MESSAGE;
    25         }
    26         else if ((value % 4) == 0) {
    27             return LocalTransactionState.COMMIT_MESSAGE;
    28         }
    29 
    30         return LocalTransactionState.UNKNOW;
    31     }
    32 }

    到这就完了,为什么只介绍发送不介绍接收呢?因为一旦消息提交到MQ就不用管了, 要相信MQ会把消息送达consumer,如果消息未能被成功消费的话,那么Producer也会回滚

    如何保证分布式系统的全局性事务?

    因为阿里在3.2.6版本后,砍掉了消息回查的功能,也就是consumer端是否成功消费,Producer端并不知道,所以如果要保证全局性事务,我们要有自己的实现机制:

  • 相关阅读:
    在VMware中为Red Hat配置静态ip并可访问网络-Windows下的VMware
    03-nginx虚拟主机配置
    解决nginx: [emerg] bind() to [::]:80 failed (98: Address already in use)
    02-nginx信号量
    RedHat Linux设置yum软件源为本地ISO
    01-nginx介绍及编译安装
    Linux.负载均衡
    01-MySQL优化大的思路
    10 华电内部文档搜索系统 search02
    10 华电内部文档搜索系统 search03
  • 原文地址:https://www.cnblogs.com/sigm/p/6720499.html
Copyright © 2011-2022 走看看