zoukankan      html  css  js  c++  java
  • 单系统下的分布式数据库事务方案(拓展spring的事务管理器)AgileBPM多数据的解决方案

    先推荐一下码云上的一个GVP(最有价值的开源项目) AgileBPM(下面简称ab),我下面讲解的方案也是它的Bo支持多数据源操作事务管理器,友情链接:http://doc.agilebpm.cn/

    目前是解决的是处理单系统内的多数据源问题,简单来说就是在单系统中的一个线程内,保护多个数据源事务,这也是ab项目所需要的场景。

    参考了码云上的开源的lcn分布式事务解决方案,觉得再拓展一下也是可以解决微服务间的分布式事务处理,利用redis放一个事务处理的共同空间,然后在共同空间内来统筹事务,不过它处理commit异常的问题也是用通用方式(commit失败很多项目都是采取tcc的方式处理)。

    ps:之前本人试过使用jta事务管理器,这个性能真看不下去。一会就卡。。所以就想着自己定义个管理器,自己来释放资源。

    1 用AbstractRoutingDataSource让系统支持多数据源

    动态数据源配置:

    真正的数据源(druid数据源):

    展示一下DynamicDataSource是继承了AbstractRoutingDataSource的实现,这里不是重点。

    2 实现支持这种路由数据源的事务管理器

    先继承AbstractPlatformTransactionManager(事务管理器的抽象类,我们很常用的DataSourceTransactionManager就是继承它的)

    里面需要实现几个关键点就行(笔者只考虑了事务传播性为PROPAGATION_REQUIRED的情况,这也是项目最常用的,其他我没支持,毕竟是定制化的事务管理器)

    package com.dstz.bus.service.impl;
    
    import java.sql.Connection;
    import java.sql.SQLException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Map.Entry;
    
    import javax.sql.DataSource;
    
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.jdbc.datasource.ConnectionHolder;
    import org.springframework.jdbc.datasource.DataSourceUtils;
    import org.springframework.transaction.CannotCreateTransactionException;
    import org.springframework.transaction.TransactionDefinition;
    import org.springframework.transaction.TransactionException;
    import org.springframework.transaction.TransactionSystemException;
    import org.springframework.transaction.support.AbstractPlatformTransactionManager;
    import org.springframework.transaction.support.DefaultTransactionStatus;
    import org.springframework.transaction.support.ResourceTransactionManager;
    import org.springframework.transaction.support.TransactionSynchronizationManager;
    
    import com.dstz.base.core.util.AppUtil;
    import com.dstz.base.core.util.ThreadMapUtil;
    import com.dstz.base.db.datasource.DataSourceUtil;
    import com.dstz.base.db.datasource.DbContextHolder;
    import com.dstz.base.db.datasource.DynamicDataSource;
    
    /**
     * <pre>
     * 描述:ab 结合sys多数据源操作 专门为bo db实例化做的事务管理器
     * 它只保护系统数据源(包含dataSourceDefault),不会保护datasource
     * 其实可以做到,但是这个事务管理器目前只为bo多数据源的保护,所以我没支持
     * 作者:aschs
     * 邮箱:aschs@qq.com
     * 日期:2018年10月10日
     * 版权:summer
     * </pre>
     */
    public class AbDataSourceTransactionManager extends AbstractPlatformTransactionManager implements ResourceTransactionManager, InitializingBean {
        private int i = 0;
        
        @Override
        public void afterPropertiesSet() throws Exception {
            logger.debug("ab的事务管理器已就绪");
        }
    
        @Override
        public Object getResourceFactory() {
            return DataSourceUtil.getDataSourceByAlias(DataSourceUtil.GLOBAL_DATASOURCE);
        }
    
        /**
         * <pre>
         * 生成一个在整个事务处理都用到的资源
         * 这里我放了在过程中的所有连接 Map<数据源别名,连接>
         * </pre>
         */
        @Override
        protected Object doGetTransaction() {
            return new HashMap<String, Connection>();
        }
        
        /**
         * 判断是否已存在事务
         */
        @Override
        protected boolean isExistingTransaction(Object transaction) {
            return (boolean) ThreadMapUtil.getOrDefault("abTransactionManagerExist", false);
        }
        
        /**
         * <pre>
         * 必须实现的一个方法,设置线程内的事务为回滚状态。
         * 这里其实是为了预防传播性设置为 让线程内可以多次管理器操作的情况下,用来通知大家不要只做回滚,别commit了。
         * 在该事务管理器只支持PROPAGATION_REQUIRED 的情况下(线程只有一个管理器操作),没多大用,只是必须要实现这个
         * 不然抽象类那里会有报错代码。
         * </pre>
         */
        @Override
        protected void doSetRollbackOnly(DefaultTransactionStatus status) {
            ThreadMapUtil.put("abTransactionManagerRollbackOnly", true);//标记ab事务管理器在线程内已准备要回滚了
        }
        
        /**
         * <pre>
         * 准备事务,获取链接
         * </pre>
         */
        @Override
        protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException {
            logger.info("分布式事务开始:"+i);
            
            Map<String, Connection> conMap = (Map<String, Connection>) transaction;
            Map<String, DataSource> dsMap = DataSourceUtil.getDataSources();
            // 遍历系统中的所有数据源,打开连接
            for (Entry<String, DataSource> entry : dsMap.entrySet()) {
                Connection con = null;
                try {
                    ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(entry.getValue());
                    if (conHolder == null) {
                        con = entry.getValue().getConnection();
                        con.setAutoCommit(false);
                        // 缓存链接
                        TransactionSynchronizationManager.bindResource(entry.getValue(), new ConnectionHolder(con));
                    } else {
                        con = conHolder.getConnection();
                    }
                    
                    //系统数据源放进资源里
                    if(DbContextHolder.getDataSource().equals(entry.getKey())) {
                        DynamicDataSource dynamicDataSource = (DynamicDataSource) AppUtil.getBean(DataSourceUtil.GLOBAL_DATASOURCE);
                        TransactionSynchronizationManager.bindResource(dynamicDataSource, new ConnectionHolder(con));
                    }
                    
                    conMap.put(entry.getKey(), con);
                    logger.debug("数据源别名[" + entry.getKey() + "]打开连接成功");
                } catch (Throwable ex) {
                    doCleanupAfterCompletion(conMap);
                    throw new CannotCreateTransactionException("数据源别名[" + entry.getKey() + "]打开连接错误", ex);
                }
            }
            
            ThreadMapUtil.put("abTransactionManagerExist", true);//标记ab事务管理器已经在线程内启动了
        }
    
        @Override
        protected void doCommit(DefaultTransactionStatus status) {
            Map<String, Connection> conMap = (Map<String, Connection>) status.getTransaction();
            for (Entry<String, Connection> entry : conMap.entrySet()) {
                try {
                    entry.getValue().commit();
                    logger.debug("数据源别名[" + entry.getKey() + "]提交事务成功");
                } catch (SQLException ex) {
                    doCleanupAfterCompletion(conMap);
                    throw new TransactionSystemException("数据源别名[" + entry.getKey() + "]提交事务失败", ex);
                }
            }
            logger.info("分布式事务提交:"+i);
        }
        
        /**
         * 回滚
         */
        @Override
        protected void doRollback(DefaultTransactionStatus status) throws TransactionException {
            Map<String, Connection> conMap = (Map<String, Connection>) status.getTransaction();
            for (Entry<String, Connection> entry : conMap.entrySet()) {
                try {
                    entry.getValue().rollback();
                    logger.debug("数据源别名[" + entry.getKey() + "]回滚事务成功");
                } catch (SQLException ex) {
                    doCleanupAfterCompletion(conMap);
                    throw new TransactionSystemException("数据源别名[" + entry.getKey() + "]回滚事务失败", ex);
                }
            }
            logger.info("分布式事务回滚:"+i);
        }
        
        /**
         * 回收链接
         */
        @Override
        protected void doCleanupAfterCompletion(Object transaction) {
            Map<String, Connection> conMap = (Map<String, Connection>) transaction;
            for (Entry<String, Connection> entry : conMap.entrySet()) {
                DataSource dataSource = DataSourceUtil.getDataSourceByAlias(entry.getKey());
                TransactionSynchronizationManager.unbindResource(dataSource);
                DataSourceUtils.releaseConnection(entry.getValue(), dataSource);
                logger.debug("数据源别名[" + entry.getKey() + "]关闭链接成功");
            }
            
            //最后把本地资源也释放了
            DynamicDataSource dynamicDataSource = (DynamicDataSource) AppUtil.getBean(DataSourceUtil.GLOBAL_DATASOURCE);
            TransactionSynchronizationManager.unbindResource(dynamicDataSource);
            
            ThreadMapUtil.remove("abTransactionManagerExist");
            ThreadMapUtil.remove("abTransactionManagerRollbackOnly");
            ThreadMapUtil.remove();
            
            logger.info("分布式事务释放:"+(i++));
        }
    }

     事务管理器的方法调用顺序和时机大概说一下:

    1 doGetTransaction方法:来初始化事务处理过程中的公共资源,后面调用的其他方法都是以它为媒介的。

    2 doBegin方法:开始事务操作,主要是打开数据源的链接,记得要放到事务资源管理服务中TransactionSynchronizationManager,非常重要,因为这个过程中用到的jdbc操作是从这里面拿的。

    3 doCommit(doRollback):如题,把获取的链接提交或者回滚操作。

    4 doCleanupAfterCompletion:回收链接资源

    至此,事务管理器的逻辑已经结束了~

    最后,实现务必实现isExistingTransaction,用来处理重复线程内多次触发了事务切面的逻辑

    这里笔者用简单的线程变量来标记是否线程内已存在了事务管理,因为我只支持PROPAGATION_REQUIRED传播性,所以没考虑内部嵌入的其他情况,其实也是内部commit一下,资源肯定是最后统一释放的。

    3 使用自定义事务管理器

    先提一下,这里笔者只保护会使用到多数据源的模块,其实大部分系统逻辑还是用DataSourceTransactionManager就够,不需要保护太多数据源(因为释放和打开链接是有性能损耗的)。

    可以看出,主要的逻辑系统还是使用传统管理器,然后在特定地方声明特殊管理器则可:

    5 到这里,整个分布式事务管理已完成了,主要是利用了路由数据源AbstractRoutingDataSource和自定义事务管理器实现的~

    6 AgileBPM的多数据源结合展示(可跳过)

    这里展示一下,这个开源的流程系统的强大可配置性(让人发指的灵活性-。-)的数据源管理功能。

    数据源模板,在这里你可以使用定义不同的数据源实现类,只要在项目import就行

    这里有一个内置的阿里的数据源,后面有需要你可以增加其他模板,例如BasicDataSOurce这个,最常用的数据源。

    有了模板,就可以新建数据源了:

    这里的是特殊默认数据源,系统本地数据源,用户可以随便添加。

    然后,我就基于AgileBPM的强大数据源管理下,进行了分布式数据源测试。测试逻辑很简单,就是在一个线程内,操作多个数据源,然后看一下会不会一起回滚。

     这里展示了一下,AgileBPM中使用数据源的便捷性,根据配置的别名,直接拿来代码开发则可,测试代码比较随意了,能保证一致性。

     7 这样的实现在压测中的表现

    本人用的是jmetter来压测事务处理,它的表现跟传统的DataSourceTransactionManager表现是一样的!!!!(虽然过程遇到了线程变量的坑,但已修复)。

    配置,400进程同时施压:

    这是压测结果

     这是日志输出,我故意输出了每一次获取链接,提交,和释放的事务处理过程

     8 挫败:原来行业内的问题主要卡在commit上。

    由于对分布式事务产生极大兴趣,所以专研了一下,这么简单的实现为啥别人都觉得是打问题呢?原来是因为commit会出错的情况,第一个链接commit成功后,第二个链接commit失败,那么第一个链接已不能回滚了!!!!所以行业内大部分方案都在处理这种情况,虽然到了commit阶段,数据库已经对相关资源产生了写锁,数据也写入磁盘,就等commit刷进去了,产生错误的概率是极少了。作为行业内的大难题,很多方案在处理这个问题。什么2pc原则。。等等,有空我整理一下。大部分主流项目解决方案还是tcc为主,毕竟这个最通用直接。

    8.1顿悟!!

    顿悟!!其实我以上这种实现方案就是2pc的实现方案,在jdbc都操作了sql没问题后,再一并提交的方案就是2pc。但是2pc有这个commit提交存在的设计缺陷(这种时机是存在很少可能性的),所以别人就提出tcc和消息队列的解决commit异常的更可靠的方案(但是,只要是串行逻辑就没有百分百可靠的方案,只是降低了可能性罢了)。所以,ab项目关于分布式是采用了2pc的解决方案,顺带提一下jta事务也是类似的逻辑,不过他们的性能主要卡在消息通知上。例如所有链接操作sql都成功了,我需要通知AB链接去提交,我通知了A,A提交成功,然后我通知B,B没收到消息,那么AB资源都会卡住不释放,然后B会超时导致回滚了。所以,jta在消息通知上比较损耗性能……关于2pc的友情链接:

    落寞的流月城(632266504) 14:13:42
    https://cloud.tencent.com/developer/article/1355859

    9关于AB项目的多数据bo场景方案

    在经历挫败之后,理性分析了一下,其实当前这种方案已经满足了ab的分布式事务处理的需求了。首先,其实commit失败的场景是少之又少,笔者调整了逻辑,后面把重要的系统数据源放在最后提交,保证了系统数据源的强一致性,也就是说保证了流程数据的一致性。

    原因分析,这里细想一个场景,我先把业务数据从1改成2,然后驱动流程流转,假如我的bo是其他数据源A,A先提交成功,但是系统的本地数据源B提交失败了,那么导致B操作的流程数据会回滚,但是A的数据已提交无法回滚。结果是,流程没有流转,但是业务数据已更新了为2了。这种场景在ab中,相当于,我操作了一下业务数据的保存,因为流程没有变,只是保存了一下数据,对于流程系统本身来说,有时候还是好事,因为虽然流程流转失败了,但是业务数据不想再填写一次。所以我说这种方案已经满足ab项目的多数据源下的分布式场景的需求了。

    当然,如果用户还是执着于所有数据源的强一致性,在ab项目中可以在bo保存前,先备份一下bo数据,然后在doCommit时恢复备份数据则可以,ab里有很多时机插件,定义了一些时机插件列表,然后你多实现了插件则会运行,ab的插件代码展示,如下:

    ab作为面向技术人员的流程系统,里面内嵌提供了丰富的便捷开发的写法和实现。

  • 相关阅读:
    HDU 2433 Travel (最短路,BFS,变形)
    HDU 2544 最短路 (最短路,spfa)
    HDU 2063 过山车 (最大匹配,匈牙利算法)
    HDU 1150 Machine Schedule (最小覆盖,匈牙利算法)
    290 Word Pattern 单词模式
    289 Game of Life 生命的游戏
    287 Find the Duplicate Number 寻找重复数
    283 Move Zeroes 移动零
    282 Expression Add Operators 给表达式添加运算符
    279 Perfect Squares 完美平方数
  • 原文地址:https://www.cnblogs.com/aschs/p/9771302.html
Copyright © 2011-2022 走看看