zoukankan      html  css  js  c++  java
  • 单系统下的分布式数据库事务方案(拓展spring的事务管理器)AgileBPM多数据的解决方案

    先推荐一下码云上的一个GVP(最有价值的开源项目) AgileBPM(下面简称ab),我下面讲解的方案也是它的Bo支持多数据源操作事务管理器,友情链接:http://doc.agilebpm.cn/

    目前是解决的是处理单系统内的多数据源问题,简单来说就是在单系统中的一个线程内,保护多个数据源事务,这也是ab项目所需要的场景。

    参考了码云上的开源的lcn分布式事务解决方案,觉得再拓展一下也是可以解决微服务间的分布式事务处理,利用redis放一个事务处理的共同空间,然后在共同空间内来统筹事务,不过它处理commit异常的问题也是用通用方式(commit失败很多项目都是采取tcc的方式处理)。

    ps:之前本人试过使用jta事务管理器,这个性能真看不下去。一会就卡。。所以就想着自己定义个管理器,自己来释放资源。

    1 用AbstractRoutingDataSource让系统支持多数据源

    动态数据源配置:

    真正的数据源(druid数据源):

    展示一下DynamicDataSource是继承了AbstractRoutingDataSource的实现,这里不是重点。

    2 实现支持这种路由数据源的事务管理器

    先继承AbstractPlatformTransactionManager(事务管理器的抽象类,我们很常用的DataSourceTransactionManager就是继承它的)

    里面需要实现几个关键点就行(笔者只考虑了事务传播性为PROPAGATION_REQUIRED的情况,这也是项目最常用的,其他我没支持,毕竟是定制化的事务管理器)

    package com.dstz.bus.service.impl;
    
    import java.sql.Connection;
    import java.sql.SQLException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Map.Entry;
    
    import javax.sql.DataSource;
    
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.jdbc.datasource.ConnectionHolder;
    import org.springframework.jdbc.datasource.DataSourceUtils;
    import org.springframework.transaction.CannotCreateTransactionException;
    import org.springframework.transaction.TransactionDefinition;
    import org.springframework.transaction.TransactionException;
    import org.springframework.transaction.TransactionSystemException;
    import org.springframework.transaction.support.AbstractPlatformTransactionManager;
    import org.springframework.transaction.support.DefaultTransactionStatus;
    import org.springframework.transaction.support.ResourceTransactionManager;
    import org.springframework.transaction.support.TransactionSynchronizationManager;
    
    import com.dstz.base.core.util.AppUtil;
    import com.dstz.base.core.util.ThreadMapUtil;
    import com.dstz.base.db.datasource.DataSourceUtil;
    import com.dstz.base.db.datasource.DbContextHolder;
    import com.dstz.base.db.datasource.DynamicDataSource;
    
    /**
     * <pre>
     * 描述:ab 结合sys多数据源操作 专门为bo db实例化做的事务管理器
     * 它只保护系统数据源(包含dataSourceDefault),不会保护datasource
     * 其实可以做到,但是这个事务管理器目前只为bo多数据源的保护,所以我没支持
     * 作者:aschs
     * 邮箱:aschs@qq.com
     * 日期:2018年10月10日
     * 版权:summer
     * </pre>
     */
    public class AbDataSourceTransactionManager extends AbstractPlatformTransactionManager implements ResourceTransactionManager, InitializingBean {
        private int i = 0;
        
        @Override
        public void afterPropertiesSet() throws Exception {
            logger.debug("ab的事务管理器已就绪");
        }
    
        @Override
        public Object getResourceFactory() {
            return DataSourceUtil.getDataSourceByAlias(DataSourceUtil.GLOBAL_DATASOURCE);
        }
    
        /**
         * <pre>
         * 生成一个在整个事务处理都用到的资源
         * 这里我放了在过程中的所有连接 Map<数据源别名,连接>
         * </pre>
         */
        @Override
        protected Object doGetTransaction() {
            return new HashMap<String, Connection>();
        }
        
        /**
         * 判断是否已存在事务
         */
        @Override
        protected boolean isExistingTransaction(Object transaction) {
            return (boolean) ThreadMapUtil.getOrDefault("abTransactionManagerExist", false);
        }
        
        /**
         * <pre>
         * 必须实现的一个方法,设置线程内的事务为回滚状态。
         * 这里其实是为了预防传播性设置为 让线程内可以多次管理器操作的情况下,用来通知大家不要只做回滚,别commit了。
         * 在该事务管理器只支持PROPAGATION_REQUIRED 的情况下(线程只有一个管理器操作),没多大用,只是必须要实现这个
         * 不然抽象类那里会有报错代码。
         * </pre>
         */
        @Override
        protected void doSetRollbackOnly(DefaultTransactionStatus status) {
            ThreadMapUtil.put("abTransactionManagerRollbackOnly", true);//标记ab事务管理器在线程内已准备要回滚了
        }
        
        /**
         * <pre>
         * 准备事务,获取链接
         * </pre>
         */
        @Override
        protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException {
            logger.info("分布式事务开始:"+i);
            
            Map<String, Connection> conMap = (Map<String, Connection>) transaction;
            Map<String, DataSource> dsMap = DataSourceUtil.getDataSources();
            // 遍历系统中的所有数据源,打开连接
            for (Entry<String, DataSource> entry : dsMap.entrySet()) {
                Connection con = null;
                try {
                    ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(entry.getValue());
                    if (conHolder == null) {
                        con = entry.getValue().getConnection();
                        con.setAutoCommit(false);
                        // 缓存链接
                        TransactionSynchronizationManager.bindResource(entry.getValue(), new ConnectionHolder(con));
                    } else {
                        con = conHolder.getConnection();
                    }
                    
                    //系统数据源放进资源里
                    if(DbContextHolder.getDataSource().equals(entry.getKey())) {
                        DynamicDataSource dynamicDataSource = (DynamicDataSource) AppUtil.getBean(DataSourceUtil.GLOBAL_DATASOURCE);
                        TransactionSynchronizationManager.bindResource(dynamicDataSource, new ConnectionHolder(con));
                    }
                    
                    conMap.put(entry.getKey(), con);
                    logger.debug("数据源别名[" + entry.getKey() + "]打开连接成功");
                } catch (Throwable ex) {
                    doCleanupAfterCompletion(conMap);
                    throw new CannotCreateTransactionException("数据源别名[" + entry.getKey() + "]打开连接错误", ex);
                }
            }
            
            ThreadMapUtil.put("abTransactionManagerExist", true);//标记ab事务管理器已经在线程内启动了
        }
    
        @Override
        protected void doCommit(DefaultTransactionStatus status) {
            Map<String, Connection> conMap = (Map<String, Connection>) status.getTransaction();
            for (Entry<String, Connection> entry : conMap.entrySet()) {
                try {
                    entry.getValue().commit();
                    logger.debug("数据源别名[" + entry.getKey() + "]提交事务成功");
                } catch (SQLException ex) {
                    doCleanupAfterCompletion(conMap);
                    throw new TransactionSystemException("数据源别名[" + entry.getKey() + "]提交事务失败", ex);
                }
            }
            logger.info("分布式事务提交:"+i);
        }
        
        /**
         * 回滚
         */
        @Override
        protected void doRollback(DefaultTransactionStatus status) throws TransactionException {
            Map<String, Connection> conMap = (Map<String, Connection>) status.getTransaction();
            for (Entry<String, Connection> entry : conMap.entrySet()) {
                try {
                    entry.getValue().rollback();
                    logger.debug("数据源别名[" + entry.getKey() + "]回滚事务成功");
                } catch (SQLException ex) {
                    doCleanupAfterCompletion(conMap);
                    throw new TransactionSystemException("数据源别名[" + entry.getKey() + "]回滚事务失败", ex);
                }
            }
            logger.info("分布式事务回滚:"+i);
        }
        
        /**
         * 回收链接
         */
        @Override
        protected void doCleanupAfterCompletion(Object transaction) {
            Map<String, Connection> conMap = (Map<String, Connection>) transaction;
            for (Entry<String, Connection> entry : conMap.entrySet()) {
                DataSource dataSource = DataSourceUtil.getDataSourceByAlias(entry.getKey());
                TransactionSynchronizationManager.unbindResource(dataSource);
                DataSourceUtils.releaseConnection(entry.getValue(), dataSource);
                logger.debug("数据源别名[" + entry.getKey() + "]关闭链接成功");
            }
            
            //最后把本地资源也释放了
            DynamicDataSource dynamicDataSource = (DynamicDataSource) AppUtil.getBean(DataSourceUtil.GLOBAL_DATASOURCE);
            TransactionSynchronizationManager.unbindResource(dynamicDataSource);
            
            ThreadMapUtil.remove("abTransactionManagerExist");
            ThreadMapUtil.remove("abTransactionManagerRollbackOnly");
            ThreadMapUtil.remove();
            
            logger.info("分布式事务释放:"+(i++));
        }
    }

     事务管理器的方法调用顺序和时机大概说一下:

    1 doGetTransaction方法:来初始化事务处理过程中的公共资源,后面调用的其他方法都是以它为媒介的。

    2 doBegin方法:开始事务操作,主要是打开数据源的链接,记得要放到事务资源管理服务中TransactionSynchronizationManager,非常重要,因为这个过程中用到的jdbc操作是从这里面拿的。

    3 doCommit(doRollback):如题,把获取的链接提交或者回滚操作。

    4 doCleanupAfterCompletion:回收链接资源

    至此,事务管理器的逻辑已经结束了~

    最后,实现务必实现isExistingTransaction,用来处理重复线程内多次触发了事务切面的逻辑

    这里笔者用简单的线程变量来标记是否线程内已存在了事务管理,因为我只支持PROPAGATION_REQUIRED传播性,所以没考虑内部嵌入的其他情况,其实也是内部commit一下,资源肯定是最后统一释放的。

    3 使用自定义事务管理器

    先提一下,这里笔者只保护会使用到多数据源的模块,其实大部分系统逻辑还是用DataSourceTransactionManager就够,不需要保护太多数据源(因为释放和打开链接是有性能损耗的)。

    可以看出,主要的逻辑系统还是使用传统管理器,然后在特定地方声明特殊管理器则可:

    5 到这里,整个分布式事务管理已完成了,主要是利用了路由数据源AbstractRoutingDataSource和自定义事务管理器实现的~

    6 AgileBPM的多数据源结合展示(可跳过)

    这里展示一下,这个开源的流程系统的强大可配置性(让人发指的灵活性-。-)的数据源管理功能。

    数据源模板,在这里你可以使用定义不同的数据源实现类,只要在项目import就行

    这里有一个内置的阿里的数据源,后面有需要你可以增加其他模板,例如BasicDataSOurce这个,最常用的数据源。

    有了模板,就可以新建数据源了:

    这里的是特殊默认数据源,系统本地数据源,用户可以随便添加。

    然后,我就基于AgileBPM的强大数据源管理下,进行了分布式数据源测试。测试逻辑很简单,就是在一个线程内,操作多个数据源,然后看一下会不会一起回滚。

     这里展示了一下,AgileBPM中使用数据源的便捷性,根据配置的别名,直接拿来代码开发则可,测试代码比较随意了,能保证一致性。

     7 这样的实现在压测中的表现

    本人用的是jmetter来压测事务处理,它的表现跟传统的DataSourceTransactionManager表现是一样的!!!!(虽然过程遇到了线程变量的坑,但已修复)。

    配置,400进程同时施压:

    这是压测结果

     这是日志输出,我故意输出了每一次获取链接,提交,和释放的事务处理过程

     8 挫败:原来行业内的问题主要卡在commit上。

    由于对分布式事务产生极大兴趣,所以专研了一下,这么简单的实现为啥别人都觉得是打问题呢?原来是因为commit会出错的情况,第一个链接commit成功后,第二个链接commit失败,那么第一个链接已不能回滚了!!!!所以行业内大部分方案都在处理这种情况,虽然到了commit阶段,数据库已经对相关资源产生了写锁,数据也写入磁盘,就等commit刷进去了,产生错误的概率是极少了。作为行业内的大难题,很多方案在处理这个问题。什么2pc原则。。等等,有空我整理一下。大部分主流项目解决方案还是tcc为主,毕竟这个最通用直接。

    8.1顿悟!!

    顿悟!!其实我以上这种实现方案就是2pc的实现方案,在jdbc都操作了sql没问题后,再一并提交的方案就是2pc。但是2pc有这个commit提交存在的设计缺陷(这种时机是存在很少可能性的),所以别人就提出tcc和消息队列的解决commit异常的更可靠的方案(但是,只要是串行逻辑就没有百分百可靠的方案,只是降低了可能性罢了)。所以,ab项目关于分布式是采用了2pc的解决方案,顺带提一下jta事务也是类似的逻辑,不过他们的性能主要卡在消息通知上。例如所有链接操作sql都成功了,我需要通知AB链接去提交,我通知了A,A提交成功,然后我通知B,B没收到消息,那么AB资源都会卡住不释放,然后B会超时导致回滚了。所以,jta在消息通知上比较损耗性能……关于2pc的友情链接:

    落寞的流月城(632266504) 14:13:42
    https://cloud.tencent.com/developer/article/1355859

    9关于AB项目的多数据bo场景方案

    在经历挫败之后,理性分析了一下,其实当前这种方案已经满足了ab的分布式事务处理的需求了。首先,其实commit失败的场景是少之又少,笔者调整了逻辑,后面把重要的系统数据源放在最后提交,保证了系统数据源的强一致性,也就是说保证了流程数据的一致性。

    原因分析,这里细想一个场景,我先把业务数据从1改成2,然后驱动流程流转,假如我的bo是其他数据源A,A先提交成功,但是系统的本地数据源B提交失败了,那么导致B操作的流程数据会回滚,但是A的数据已提交无法回滚。结果是,流程没有流转,但是业务数据已更新了为2了。这种场景在ab中,相当于,我操作了一下业务数据的保存,因为流程没有变,只是保存了一下数据,对于流程系统本身来说,有时候还是好事,因为虽然流程流转失败了,但是业务数据不想再填写一次。所以我说这种方案已经满足ab项目的多数据源下的分布式场景的需求了。

    当然,如果用户还是执着于所有数据源的强一致性,在ab项目中可以在bo保存前,先备份一下bo数据,然后在doCommit时恢复备份数据则可以,ab里有很多时机插件,定义了一些时机插件列表,然后你多实现了插件则会运行,ab的插件代码展示,如下:

    ab作为面向技术人员的流程系统,里面内嵌提供了丰富的便捷开发的写法和实现。

  • 相关阅读:
    find the first t.py file through traversal C: and D:
    gevent simple server
    【python实例】判断是否是回文数
    【python实例】要求输出字符串中最少一个最多八个的所有字符串组合(连续)
    【python实例】统计字符串里大写字母,小写字母和非字母的个数
    【python基础】字符串方法汇总
    【python实例】判断质数:for-break-else
    【python实例】while&for 九九乘法表
    【python实例】水仙花数:每个位上的数值的三次方求和,刚好是这个三位数本身
    【python实例】判断输入年份是否是闰年
  • 原文地址:https://www.cnblogs.com/aschs/p/9771302.html
Copyright © 2011-2022 走看看