zoukankan      html  css  js  c++  java
  • Seata尝试及源码解读

    官网

    https://seata.io/zh-cn/docs/overview/what-is-seata.html

    seata建表语句
    https://gitee.com/dhb414/seata/blob/master/script/server/db/mysql.sql

    undo_log https://gitee.com/dhb414/seata/blob/master/script/client/at/db/mysql.sql

    配置:

    #阿里分布式事务配置
    seata:
      service:
        vgroup-mapping:
          #这里的组名my_test_tx_group就是上面已经配置过的
          # seata-server 对应的就是register.conf里的application选项的内容
          my_test_tx_group1: seata-server
        grouplist:
          #这里对应的就是上面的seata-server,后面的蚕食seata服务的IP地址和端口号
          seata-server: l27.0.0.1:9091
        enable-degrade: false
        disable-global-transaction: false

    Demo模型

    排坑

    mysql-connector-java不能用5.1.47,这个版本有bug,以前不会触发,使用seata后会触发

    undo_log滞留的问题

    AT模式自测用例

     

    异步调用的4种情况:

    1、主事务调用分支事务之前;2、主事务和分支事务均未结束;3、分支事务结束而主事务没结束;4、主事务结束而分支事务没结束;

    异步调用出问题的两种情况:

    1、主事务成功,分支事务失败;

    2、主事务失败,分支事务成功;

    一TM一RM,Feign方式(同步)

    同步调用的3个阶段:

    1、主事务调用分支事务之前;

    2、分支事务结束并返回之前;

    3、分支事务提交后,且主事务提交前

    同步调用出问题的情况:

    1、主事务成功,分支事务失败

    2、主事务失败,分支事务成功

    正常执行

    观察每个阶段的undo_log和TC的记录

    阶段一:只有TC全局事务注册

    阶段二:只有TC的全局事务表有记录

    阶段二(主事务没有使用@Transactional(rollbackFor = Exception.class)):主事务提交,主事务undo存在;分支事务啥都没有;TC全局事务有,锁了主事务,分支事务表注册了主事务

    阶段二附加:接阶段二,分支一直断点,主事务会超时而失败,此时TC的全局事务也没有了。等到分支断点放开,分支提交也会因为没有全局事务而失败

    阶段二附加(主事务没有使用@Transactional(rollbackFor = Exception.class)):主事务超时失败了,并且发生回滚,全局事务也会回滚,但此时分支事务处于断点,放开断点后分支事务提交时,发现没有全局事务,分支事务也会回滚

    阶段三:主事务已提交有undolog;分支事务已提交有undolog;TC全局事务一个,全局锁两个,分支事务注册两个

    观察最终结果是否正确

    全部符合预期

     

    主事务成功,分支事务失败(主事务捕获掉异常)

    观察每个阶段的undo_log和TC的记录

    观察最终效果是否正确

    seata认为,主事务捕获异常代表不关心分支事务,所以主事务正常提交

    分支事务由于发生异常,没有正常提交

    主事务失败,分支事务成功

    观察每个阶段的undo_log和TC的记录

    处于阶段三时:跟正常执行的阶段三相同

    观察最终效果是否正确

    所有分支均回滚

     

    源码解读

    Seata 中有三大模块,分别是 TM、RM 和 TC。其中 TM 和 RM 是作为 Seata 的客户端与业务系统集成在一起,TC 作为 Seata 的服务端独立部署。

    角色划分:
    TM: 事务管理,开启、提交、回滚分布式事务
    RM: 资源管理,注册、汇报、执资源,负责接收TC发过来的提交、回滚消息,并作出提交,回滚操作
    TC: 事务管理器服务功能,存储事务日志、补偿异常事务等、集中管理事务全局锁(全局行锁)


    事务执行整体流程:
    • TM 开启分布式事务(TM 向 TC 注册全局事务记录);
    • 按业务场景,编排数据库、服务等事务内资源(RM 向 TC 汇报资源准备状态 );
    • TM 结束分布式事务,事务一阶段结束(TM 通知 TC 提交/回滚分布式事务);
    • TC 汇总事务信息,决定分布式事务是提交还是回滚;
    • TC 通知所有 RM 提交/回滚 资源,事务二阶段结束;

    1、springboot-starter 启动 

    SeataAutoConfiguration

    1 @Bean
    2 @DependsOn({"springUtils"})
    3 @ConditionalOnMissingBean(GlobalTransactionScanner.class) //在容器加载它作用的bean时,检查容器中是否存在目标类型的bean,如果存在这跳过原始bean的BeanDefinition加载动作。
    4 public GlobalTransactionScanner globalTransactionScanner() {
    5 if (LOGGER.isInfoEnabled()) {
    6 LOGGER.info("Automatically configure Seata");
    7 }
    8 return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup());
    9 }

    2、初始化

    1、、通过Spring的InitializingBean初始化

     1 public class GlobalTransactionScanner extends AbstractAutoProxyCreator
     2     implements InitializingBean, ApplicationContextAware,
     3     DisposableBean{
     4 
     5 //InitializingBean实现方法,spring自动调用
     6 @Override
     7 public void afterPropertiesSet() {
     8 if (disableGlobalTransaction) {
     9 return;
    10 }
    11 //初始化
    12 initClient();
    13 }
    14 
    15 private void initClient() {
    16 //init TM register TM success
    17 TMClient.init(applicationId, txServiceGroup);
    18 //init RM register RM success
    19 RMClient.init(applicationId, txServiceGroup);
    20 //注册钩子事件,封装销毁操作
    21 registerSpringShutdownHook();
    22 }
    23 }

    2.1 Rm netty Channel 启动

    1)启动ScheduledExecutorService定时执行器,每5秒尝试进行一次重连TC

    2)重连时,先从file.conf中根据分组名称(service_group)找到集群名称(cluster_name)

    3)再根据集群名称找到fescar-server集群ip端口列表

    4)从ip列表中选择一个用netty进行连接

     

    public abstract class AbstractRpcRemotingClient extends AbstractRpcRemoting
        implements RegisterMsgListener, ClientMessageSender {
        @Override
        public void init() {
            clientBootstrap.start();
            //启动ScheduledExecutorService定时执行器,每5秒尝试进行一次重连TC
            timerExecutor.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    clientChannelManager.reconnect(getTransactionServiceGroup());
                }
            }, SCHEDULE_INTERVAL_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.SECONDS);
            if (NettyClientConfig.isEnableClientBatchSendRequest()) {
            //用于多数据合并,减少通信次数
                mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
                    MAX_MERGE_SEND_THREAD,
                    KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(),
                    new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
                mergeSendExecutorService.submit(new MergedSendRunnable());
            }
            super.init();
        }
        }

    2.2 Tm netty Channel 启动

     1 public class RMClient {
     2 
     3     public static void init(String applicationId, String transactionServiceGroup) {
     4         RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup);
     5         //资源管理器ResourceManager
     6         rmRpcClient.setResourceManager(DefaultResourceManager.get());
     7         //消息回调监听器,rmHandler用于接收TC在二阶段发出的提交或者回滚请求
     8         rmRpcClient.setClientMessageListener(new RmMessageListener(DefaultRMHandler.get()));
     9         rmRpcClient.init();
    10     }
    11 }

    注:此处用到了Java Spi拓展机制,可插拔
    1
    public class DefaultResourceManager implements ResourceManager { 2 protected void initResourceManagers() { 3 //init all resource managers 4 List<ResourceManager> allResourceManagers = EnhancedServiceLoader.loadAll(ResourceManager.class); 5 if (CollectionUtils.isNotEmpty(allResourceManagers)) { 6 for (ResourceManager rm : allResourceManagers) { 7 resourceManagers.put(rm.getBranchType(), rm); 8 } 9 } 10 } 11 }

    2.3 

    根据注解开启 aop切面

    根据@GlobalTransactional注释的方法,通过GlobalTransactionalInterceptor过滤器加入cglib切面,并new TransactionalTemplate开启事务

     1 //BeanPostProcessor后置处理器
     2     @Override
     3     public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
     4         if (bean instanceof DataSource && !(bean instanceof DataSourceProxy) && ConfigurationFactory.getInstance().getBoolean(DATASOURCE_AUTOPROXY, false)) {
     5             DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) bean);
     6             Class<?>[] interfaces = SpringProxyUtils.getAllInterfaces(bean);
     7             return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvocationHandler() {
     8                 @Override
     9                 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    10                     Method m = BeanUtils.findDeclaredMethod(DataSourceProxy.class, method.getName(), method.getParameterTypes());
    11                     if (null != m) {
    12                         return m.invoke(dataSourceProxy, args);
    13                     } else {
    14                         boolean oldAccessible = method.isAccessible();
    15                         try {
    16                             method.setAccessible(true);
    17                             return method.invoke(bean, args);
    18                         } finally {
    19                             //recover the original accessible for security reason
    20                             method.setAccessible(oldAccessible);
    21                         }
    22                     }
    23                 }
    24             });
    25         }
    26         return super.postProcessAfterInitialization(bean, beanName);
    27     }

    3、事务一阶段

    3.1拦截器

     1 public class GlobalTransactionalInterceptor implements ConfigurationChangeListener,MethodInterceptor {
     2        
     3     @Override
     4     public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
     5         Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis())
     6             : null;
     7         Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
     8         final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
     9 
    10         final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
    11         final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
    12         if (!disable && globalTransactionalAnnotation != null) {
    13             //全局事务开始
    14             return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
    15         } else if (!disable && globalLockAnnotation != null) {
    16             //全局锁
    17             return handleGlobalLock(methodInvocation);
    18         } else {
    19             return methodInvocation.proceed();
    20         }
    21     }

    3.2开始事务

    TransactionalTemplate

     1 public Object execute(TransactionalExecutor business) throws Throwable {
     2 
     3 // 1. 根据xid判断是否新建事务
     4 GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
     5 
     6 // 1.1 get transactionInfo
     7 TransactionInfo txInfo = business.getTransactionInfo();
     8 if (txInfo == null) {
     9 throw new ShouldNeverHappenException("transactionInfo does not exist");
    10 }
    11 try {
    12 
    13 // 2. begin transaction
    14 try {
    15 //暂时无用
    16 triggerBeforeBegin();
    17 //开启事务
    18 tx.begin(txInfo.getTimeOut(), txInfo.getName());
    19 //暂时无用
    20 triggerAfterBegin();
    21 } catch (TransactionException txe) {
    22 throw new TransactionalExecutor.ExecutionException(tx, txe,
    23 TransactionalExecutor.Code.BeginFailure);
    24 
    25 }
    26 
    27 Object rs = null;
    28 try {
    29 
    30 // Do Your Business
    31 rs = business.execute();
    32 
    33 } catch (Throwable ex) {
    34 
    35 // 3.the needed business exception to rollback.
    36 completeTransactionAfterThrowing(txInfo,tx,ex);
    37 throw ex;
    38 }
    39 
    40 // 4. everything is fine, commit.
    41 commitTransaction(tx);
    42 
    43 return rs;
    44 } finally {
    45 //5. clear
    46 triggerAfterCompletion();
    47 cleanUp();
    48 }
    49 }

    真正执行事务开始的地方

     1 public class DefaultGlobalTransaction implements GlobalTransaction {
     2     
     3     @Override
     4     public void begin(int timeout, String name) throws TransactionException {
     5         //此处的角色判断有关键的作用
     6         //表明当前是全局事务的发起者(Launcher)还是参与者(Participant)
     7         //如果在分布式事务的下游系统方法中也加上GlobalTransactional注解
     8         //那么它的角色就是Participant,即会忽略后面的begin就退出了
     9         //而判断是发起者(Launcher)还是参与者(Participant)是根据当前上下文是否已存在XID来判断
    10         //没有XID的就是Launcher,已经存在XID的就是Participant
    11         if (role != GlobalTransactionRole.Launcher) {
    12             check();
    13             if (LOGGER.isDebugEnabled()) {
    14                 LOGGER.debug("Ignore Begin(): just involved in global transaction [" + xid + "]");
    15             }
    16             return;
    17         }
    18         if (xid != null) {
    19             throw new IllegalStateException();
    20         }
    21         if (RootContext.getXID() != null) {
    22             throw new IllegalStateException();
    23         }
    24         //具体开启事务的方法,获取TC返回的XID,具体由DefaultTransactionManager操作
    25         xid = transactionManager.begin(null, null, name, timeout);
    26         status = GlobalStatus.Begin;
    27         RootContext.bind(xid);
    28         if (LOGGER.isDebugEnabled()) {
    29             LOGGER.debug("Begin a NEW global transaction [" + xid + "]");
    30         }
    31     }
    32 }

    3.3 数据源代理

      1 public class StatementProxy<T extends Statement> extends AbstractStatementProxy<T> {
      2 
      3         @Override
      4     public boolean execute(String sql) throws SQLException {
      5         this.targetSQL = sql;
      6         return ExecuteTemplate.execute(this, new StatementCallback<Boolean, T>() {
      7             @Override
      8             public Boolean execute(T statement, Object... args) throws SQLException {
      9                 return statement.execute((String) args[0]);
     10             }
     11         }, sql);
     12     }
     13 }
     14 
     15 public class ExecuteTemplate{
     16     
     17    public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer,
     18                                                      StatementProxy<S> statementProxy,
     19                                                      StatementCallback<T, S> statementCallback,
     20                                                      Object... args) throws SQLException {
     21         if (!RootContext.inGlobalTransaction() && !RootContext.requireGlobalLock()) {
     22         // 未开启全局事务时,正常执行
     23             return statementCallback.execute(statementProxy.getTargetStatement(), args);
     24         }
     25         //解析SQL
     26         if (sqlRecognizer == null) {
     27             sqlRecognizer = SQLVisitorFactory.get(
     28                     statementProxy.getTargetSQL(),
     29                     statementProxy.getConnectionProxy().getDbType());
     30         }
     31         Executor<T> executor = null;
     32         if (sqlRecognizer == null) {
     33             executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
     34         } else {
     35             //对不同的SQL类型特殊处理
     36             switch (sqlRecognizer.getSQLType()) {
     37                 case INSERT:
     38                     executor = new InsertExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
     39                     break;
     40                 case UPDATE:
     41                     executor = new UpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
     42                     break;
     43                 case DELETE:
     44                     executor = new DeleteExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
     45                     break;
     46                 case SELECT_FOR_UPDATE:
     47                     executor = new SelectForUpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
     48                     break;
     49                 default:
     50                     executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
     51                     break;
     52             }
     53         }
     54         T rs = null;
     55         try {
     56             //真正执行业务逻辑
     57             rs = executor.execute(args);
     58         } catch (Throwable ex) {
     59             if (!(ex instanceof SQLException)) {
     60                 // Turn other exception into SQLException
     61                 ex = new SQLException(ex);
     62             }
     63             throw (SQLException)ex;
     64         }
     65         return rs;
     66     }
     67 }
     68 
     69 
     70 public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
     71 
     72     //接下来执行到这里
     73       @Override
     74     public T doExecute(Object... args) throws Throwable {
     75         AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
     76         if (connectionProxy.getAutoCommit()) {
     77             return executeAutoCommitTrue(args);
     78         } else {
     79             return executeAutoCommitFalse(args);
     80         }
     81     }
     82 
     83     protected T executeAutoCommitFalse(Object[] args) throws Exception {
     84         //业务SQL执行前快照
     85         TableRecords beforeImage = beforeImage();
     86         //真正执行业务SQL
     87         T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
     88         //业务SQL执行后快照
     89         TableRecords afterImage = afterImage(beforeImage);
     90         //准备快照
     91         prepareUndoLog(beforeImage, afterImage);
     92         return result;
     93     }
     94     
     95     
     96     protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
     97         if (beforeImage.getRows().size() == 0 && afterImage.getRows().size() == 0) {
     98             return;
     99         }
    100         ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    101         TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
    102         String lockKeys = buildLockKey(lockKeyRecords);
    103         connectionProxy.appendLockKey(lockKeys);
    104         SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
    105         connectionProxy.appendUndoLog(sqlUndoLog);
    106     }
    107 }

     

    3.4 分支事务注册与事务提交

     1 public class ConnectionProxy extends AbstractConnectionProxy {
     2     
     3     private void doCommit() throws SQLException {
     4         //全局事务提交
     5         if (context.inGlobalTransaction()) {
     6             processGlobalTransactionCommit();
     7         //全局锁提交
     8         } else if (context.isGlobalLockRequire()) {
     9             processLocalCommitWithGlobalLocks();
    10         //正常提交
    11         } else {
    12             targetConnection.commit();
    13         }
    14     }
    15     
    16         //全局事务提交
    17     private void processGlobalTransactionCommit() throws SQLException {
    18         try {
    19             //注册branchId,并保存到上下文
    20             register();
    21         } catch (TransactionException e) {
    22             recognizeLockKeyConflictException(e, context.buildLockKeys());
    23         }
    24         try {
    25             if (context.hasUndoLog()) {
    26                 //如果包含undolog,则将之前绑定到上下文中的undolog进行入库
    27                 UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
    28             }
    29             //本地事务提交
    30             targetConnection.commit();
    31         } catch (Throwable ex) {
    32             LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
    33             //通过RMClient汇报TC结果
    34             report(false);
    35             throw new SQLException(ex);
    36         }
    37         if (IS_REPORT_SUCCESS_ENABLE) {
    38             //通过RmRpcClient汇报TC结果
    39             report(true);
    40         }
    41         context.reset();
    42     }
    43     //注册branchId,并保存到上下文
    44     private void register() throws TransactionException {
    45         Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
    46         null, context.getXid(), null, context.buildLockKeys());
    47         context.setBranchId(branchId);
    48     }
    49 }

     

    至此一阶段事务完成

    4、事务二阶段

    在RMClient初始化时,启动了RMHandlerAT接收TC在二阶段发出的提交或者回滚请求

     1 public abstract class AbstractRMHandler extends AbstractExceptionHandler
     2     implements RMInboundHandler, TransactionMessageHandler {
     3 
     4     private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRMHandler.class);
     5 
     6     @Override
     7     public BranchCommitResponse handle(BranchCommitRequest request) {
     8         BranchCommitResponse response = new BranchCommitResponse();
     9         exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
    10             @Override
    11             public void execute(BranchCommitRequest request, BranchCommitResponse response)
    12                 throws TransactionException {
    13                 doBranchCommit(request, response);
    14             }
    15         }, request, response);
    16         return response;
    17     }
    18 
    19     @Override
    20     public BranchRollbackResponse handle(BranchRollbackRequest request) {
    21         BranchRollbackResponse response = new BranchRollbackResponse();
    22         exceptionHandleTemplate(new AbstractCallback<BranchRollbackRequest, BranchRollbackResponse>() {
    23             @Override
    24             public void execute(BranchRollbackRequest request, BranchRollbackResponse response)
    25                 throws TransactionException {
    26                 doBranchRollback(request, response);
    27             }
    28         }, request, response);
    29         return response;
    30     }
    31 }

    全局提交

     交时,RM只需删除Undo_log表

      1 //AT模式下,最终是由AsyncWorker执行提交
      2 public class AsyncWorker implements ResourceManagerInbound { 
      3 @Override
      4     public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
      5                                      String applicationData) throws TransactionException {
      6         //加入BlockingQueue
      7         if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {
      8             LOGGER.warn("Async commit buffer is FULL. Rejected branch [" + branchId + "/" + xid
      9                 + "] will be handled by housekeeping later.");
     10         }
     11         return BranchStatus.PhaseTwo_Committed;
     12     }
     13 
     14     public synchronized void init() {
     15         LOGGER.info("Async Commit Buffer Limit: " + ASYNC_COMMIT_BUFFER_LIMIT);
     16         timerExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AsyncWorker", 1, true));
     17         //每秒执行
     18         timerExecutor.scheduleAtFixedRate(new Runnable() {
     19             @Override
     20             public void run() {
     21                 try {
     22                     //提交
     23                     doBranchCommits();
     24                 } catch (Throwable e) {
     25                     LOGGER.info("Failed at async committing ... " + e.getMessage());
     26 
     27                 }
     28             }
     29         }, 10, 1000 * 1, TimeUnit.MILLISECONDS);
     30     }
     31 
     32     private void doBranchCommits() {
     33         if (ASYNC_COMMIT_BUFFER.size() == 0) {
     34             return;
     35         }
     36         Map<String, List<Phase2Context>> mappedContexts = new HashMap<>(DEFAULT_RESOURCE_SIZE);
     37         while (!ASYNC_COMMIT_BUFFER.isEmpty()) {
     38             Phase2Context commitContext = ASYNC_COMMIT_BUFFER.poll();
     39             List<Phase2Context> contextsGroupedByResourceId = mappedContexts.get(commitContext.resourceId);
     40            //根据resourceId分组
     41             if (contextsGroupedByResourceId == null) {
     42                 contextsGroupedByResourceId = new ArrayList<>();
     43                 mappedContexts.put(commitContext.resourceId, contextsGroupedByResourceId);
     44             }
     45             contextsGroupedByResourceId.add(commitContext);
     46         }
     47         for (Map.Entry<String, List<Phase2Context>> entry : mappedContexts.entrySet()) {
     48             Connection conn = null;
     49             DataSourceProxy dataSourceProxy;
     50             try {
     51                 try {
     52                     DataSourceManager resourceManager = (DataSourceManager)DefaultResourceManager.get()
     53                         .getResourceManager(BranchType.AT);
     54                     //根据resourceId查找对应dataSourceProxy
     55                     dataSourceProxy = resourceManager.get(entry.getKey());
     56                     if (dataSourceProxy == null) {
     57                         throw new ShouldNeverHappenException("Failed to find resource on " + entry.getKey());
     58                     }
     59                     conn = dataSourceProxy.getPlainConnection();
     60                 } catch (SQLException sqle) {
     61                     LOGGER.warn("Failed to get connection for async committing on " + entry.getKey(), sqle);
     62                     continue;
     63                 }
     64                 List<Phase2Context> contextsGroupedByResourceId = entry.getValue();
     65                 Set<String> xids = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
     66                 Set<Long> branchIds = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
     67                 for (Phase2Context commitContext : contextsGroupedByResourceId) {
     68                     xids.add(commitContext.xid);
     69                     branchIds.add(commitContext.branchId);
     70                     int maxSize = xids.size() > branchIds.size() ? xids.size() : branchIds.size();
     71                     //1000个一起执行
     72                     if (maxSize == UNDOLOG_DELETE_LIMIT_SIZE) {
     73                         try {
     74                             //删除undo_log
     75                             UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(
     76                                 xids, branchIds, conn);
     77                         } catch (Exception ex) {
     78                             LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
     79                         }
     80                         xids.clear();
     81                         branchIds.clear();
     82                     }
     83                 }
     84                 if (CollectionUtils.isEmpty(xids) || CollectionUtils.isEmpty(branchIds)) {
     85                     return;
     86                 }
     87                 //剩余未满1000的,在执行一次
     88                 try {
     89                     //删除undo_log
     90                     UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(xids,
     91                         branchIds, conn);
     92                 } catch (Exception ex) {
     93                     LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
     94                 }
     95             } finally {
     96                 if (conn != null) {
     97                     try {
     98                         conn.close();
     99                     } catch (SQLException closeEx) {
    100                         LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", closeEx);
    101                     }
    102                 }
    103             }
    104         }
    105     }
    106 }

    4.2 全局回滚

    全局回滚时,根据undo_log回滚

     

    public abstract class AbstractUndoLogManager implements UndoLogManager {
        
            protected static final String SELECT_UNDO_LOG_SQL = "SELECT * FROM " + UNDO_LOG_TABLE_NAME + " WHERE "
            + ClientTableColumnsName.UNDO_LOG_BRANCH_XID + " = ? AND " + ClientTableColumnsName.UNDO_LOG_XID
            + " = ? FOR UPDATE";
    
         @Override
        public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
            Connection conn = null;
            ResultSet rs = null;
            PreparedStatement selectPST = null;
            boolean originalAutoCommit = true;
    
            for (; ; ) {
                try {
                    conn = dataSourceProxy.getPlainConnection();
    
                    // The entire undo process should run in a local transaction.
                    if (originalAutoCommit = conn.getAutoCommit()) {
                        conn.setAutoCommit(false);
                    }
    
                    //根据Xid查询出数据
                    selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
                    selectPST.setLong(1, branchId);
                    selectPST.setString(2, xid);
                    rs = selectPST.executeQuery();
    
                    boolean exists = false;
                    while (rs.next()) {
                        exists = true;
    
                        //防重复提交
                        int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
                        if (!canUndo(state)) {
                            if (LOGGER.isInfoEnabled()) {
                                LOGGER.info("xid {} branch {}, ignore {} undo_log", xid, branchId, state);
                            }
                            return;
                        }
    
                        String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
                        Map<String, String> context = parseContext(contextString);
                        Blob b = rs.getBlob(ClientTableColumnsName.UNDO_LOG_ROLLBACK_INFO);
                        byte[] rollbackInfo = BlobUtils.blob2Bytes(b);
    
                        String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
                        UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
                            : UndoLogParserFactory.getInstance(serializer);
                        BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);
    
                        try {
                            // put serializer name to local
                            setCurrentSerializer(parser.getName());
                            List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                            if (sqlUndoLogs.size() > 1) {
                                Collections.reverse(sqlUndoLogs);
                            }
                            //反解析出回滚SQL并执行
                            for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                                TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy).getTableMeta(
                                    conn, sqlUndoLog.getTableName(),dataSourceProxy.getResourceId());
                                sqlUndoLog.setTableMeta(tableMeta);
                                AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                                    dataSourceProxy.getDbType(), sqlUndoLog);
                                undoExecutor.executeOn(conn);
                            }
                        } finally {
                            // remove serializer name
                            removeCurrentSerializer();
                        }
                    }
         ....
        
    }

    知识点:

    BeanPostProcessor的作用 https://www.jianshu.com/p/6a48675ef7a3

    Bean初始化执行顺序 https://www.cnblogs.com/twelve-eleven/p/8080038.html

    JAVA虚拟机关闭钩子 https://www.jianshu.com/p/7de5ee9418f8

    JAVA中的SPI机制 https://www.jianshu.com/p/46b42f7f593c

  • 相关阅读:
    10 个你需要了解的 Linux 网络和监控命令
    U盘安装 bt5
    SpringCloud RabbitMQ 使用
    两个大数相乘笔试题目
    activemq 话题模式(三)
    activemq 队列模式(二)
    activemq 安装 (一)
    安装mysql5.7时缺少my.ini文件
    linux 远程rsa 登录配置 文件 /etc/ssh/sshd_config
    java -jar 解决占用终端问题
  • 原文地址:https://www.cnblogs.com/anhaogoon/p/13033986.html
Copyright © 2011-2022 走看看