zoukankan      html  css  js  c++  java
  • TX-LCN分布式事务之LCN模式

    什么是LCN模式

    LCN模式是TX-LCN分布式事务模式的一种,L-lock-锁定事务单元、C-confirm-确认事务模块状态、 notify-通知事务单元

    原理

    LCN模式是通过Spring AOP的方式代理Connection的方式实现对本地事务的操作,然后在由TxManager统一协调控制事务。 当本地事务提交回滚或者关闭连接时将会执行假操作,该代理的连接将由LCN连接池管理。

    模式特点

    • 该模式对代码的嵌入性为低。
    • 该模式仅限于本地存在连接对象且可通过连接对象控制事务的模块。
    • 该模式下的事务提交与回滚是由本地事务方控制,对于数据一致性上有较高的保障。
    • 该模式缺陷在于代理的连接需要随事务发起方一共释放连接,增加了连接占用的时间。

    源码解读

    首先我们来看几个关键的类DataSourceAspect-数据源切面类、TransactionAspect事务切面类、LcnConnectionProxylcn 连接代理类、DTXLogicWeaver分布式事务调度器、DTXServiceExecutor分布式事务执行器

    DataSourceAspect的作用

    • 源码
    @Aspect
    @Component
    public class DataSourceAspect implements Ordered {
        private static final Logger log = LoggerFactory.getLogger(DataSourceAspect.class);
        private final TxClientConfig txClientConfig;
        private final DTXResourceWeaver dtxResourceWeaver;

        public DataSourceAspect(TxClientConfig txClientConfig, DTXResourceWeaver dtxResourceWeaver) {
            this.txClientConfig = txClientConfig;
            this.dtxResourceWeaver = dtxResourceWeaver;
        }

        @Around("execution(* javax.sql.DataSource.getConnection(..))")
        public Object around(ProceedingJoinPoint point) throws Throwable {
            return this.dtxResourceWeaver.getConnection(() -> {
                return (Connection)point.proceed();
            });
        }

        public int getOrder() {
            return this.txClientConfig.getResourceOrder();
        }
    }

    由该类的源码,我们能够知道,lcn模式主要对数据库的连接进行了拦截代理。获取到数据库的连接交由lcn 来进行代理。

    TransactionAspect 作用

    • 源码
    @Aspect
    @Component
    public class TransactionAspect implements Ordered {
        private static final Logger log = LoggerFactory.getLogger(TransactionAspect.class);
        private final TxClientConfig txClientConfig;
        private final DTXLogicWeaver dtxLogicWeaver;

        public TransactionAspect(TxClientConfig txClientConfig, DTXLogicWeaver dtxLogicWeaver) {
            this.txClientConfig = txClientConfig;
            this.dtxLogicWeaver = dtxLogicWeaver;
        }
        
        @Pointcut("@annotation(com.codingapi.txlcn.tc.annotation.LcnTransaction)")
        public void lcnTransactionPointcut() {
        }

        @Around("lcnTransactionPointcut() && !txcTransactionPointcut()&& !tccTransactionPointcut() && !txTransactionPointcut()")
        public Object runWithLcnTransaction(ProceedingJoinPoint point) throws Throwable {
            DTXInfo dtxInfo = DTXInfo.getFromCache(point);
            LcnTransaction lcnTransaction = (LcnTransaction)dtxInfo.getBusinessMethod().getAnnotation(LcnTransaction.class);
            dtxInfo.setTransactionType("lcn");
            dtxInfo.setTransactionPropagation(lcnTransaction.propagation());
            DTXLogicWeaver var10000 = this.dtxLogicWeaver;
            point.getClass();
            return var10000.runTransaction(dtxInfo, point::proceed);
        }

        public int getOrder() {
            return this.txClientConfig.getDtxAspectOrder();
        }
    }

    由该类的源码,我们能够明白,通过解析@LcnTransaction注解进行相应的操作。代码会调用到DTXLogicWeaver

    DTXLogicWeaver 作用

        public Object runTransaction(DTXInfo dtxInfo, BusinessCallback business) throws Throwable {
            if (Objects.isNull(DTXLocalContext.cur())) {
                DTXLocalContext.getOrNew();
                log.debug("<---- TxLcn start ---->");
                DTXLocalContext dtxLocalContext = DTXLocalContext.getOrNew();
                TxContext txContext;
                if (this.globalContext.hasTxContext()) {
                    txContext = this.globalContext.txContext();
                    dtxLocalContext.setInGroup(true);
                    log.debug("Unit[{}] used parent's TxContext[{}].", dtxInfo.getUnitId(), txContext.getGroupId());
                } else {
                    txContext = this.globalContext.startTx();
                }

                if (Objects.nonNull(dtxLocalContext.getGroupId())) {
                    dtxLocalContext.setDestroy(false);
                }

                dtxLocalContext.setUnitId(dtxInfo.getUnitId());
                dtxLocalContext.setGroupId(txContext.getGroupId());
                dtxLocalContext.setTransactionType(dtxInfo.getTransactionType());
                TxTransactionInfo info = new TxTransactionInfo();
                info.setBusinessCallback(business);
                info.setGroupId(txContext.getGroupId());
                info.setUnitId(dtxInfo.getUnitId());
                info.setPointMethod(dtxInfo.getBusinessMethod());
                info.setPropagation(dtxInfo.getTransactionPropagation());
                info.setTransactionInfo(dtxInfo.getTransactionInfo());
                info.setTransactionType(dtxInfo.getTransactionType());
                info.setTransactionStart(txContext.isDtxStart());
                boolean var15 = false;

                Object var6;
                try {
                    var15 = true;
                    var6 = this.transactionServiceExecutor.transactionRunning(info);
                    var15 = false;
                } finally {
                    if (var15) {
                        if (dtxLocalContext.isDestroy()) {
                            synchronized(txContext.getLock()) {
                                txContext.getLock().notifyAll();
                            }

                            if (!dtxLocalContext.isInGroup()) {
                                this.globalContext.destroyTx();
                            }

                            DTXLocalContext.makeNeverAppeared();
                            TracingContext.tracing().destroy();
                        }

                        log.debug("<---- TxLcn end ---->");
                    }
                }

                if (dtxLocalContext.isDestroy()) {
                    synchronized(txContext.getLock()) {
                        txContext.getLock().notifyAll();
                    }

                    if (!dtxLocalContext.isInGroup()) {
                        this.globalContext.destroyTx();
                    }

                    DTXLocalContext.makeNeverAppeared();
                    TracingContext.tracing().destroy();
                }

                log.debug("<---- TxLcn end ---->");
                return var6;
            } else {
                return business.call();
            }
        }

    以上代码是该类的核心逻辑,可以看出来TX-LCN事务的处理全部都是走的这个类的该方法,最终会调用到DTXServiceExecutor分布式事务执行器

    DTXServiceExecutor 作用


        /**
         * 事务业务执行
         *
         * @param info info
         * @return Object
         * @throws Throwable Throwable
         */

        public Object transactionRunning(TxTransactionInfo info) throws Throwable {

            // 1. 获取事务类型
            String transactionType = info.getTransactionType();

            // 2. 获取事务传播状态
            DTXPropagationState propagationState = propagationResolver.resolvePropagationState(info);

            // 2.1 如果不参与分布式事务立即终止
            if (propagationState.isIgnored()) {
                return info.getBusinessCallback().call();
            }

            // 3. 获取本地分布式事务控制器
            DTXLocalControl dtxLocalControl = txLcnBeanHelper.loadDTXLocalControl(transactionType, propagationState);

            // 4. 织入事务操作
            try {
                // 4.1 记录事务类型到事务上下文
                Set<String> transactionTypeSet = globalContext.txContext(info.getGroupId()).getTransactionTypes();
                transactionTypeSet.add(transactionType);

                dtxLocalControl.preBusinessCode(info);

                // 4.2 业务执行前
                txLogger.txTrace(
                        info.getGroupId(), info.getUnitId(), "pre business code, unit type: {}", transactionType);

                // 4.3 执行业务
                Object result = dtxLocalControl.doBusinessCode(info);

                // 4.4 业务执行成功
                txLogger.txTrace(info.getGroupId(), info.getUnitId(), "business success");
                dtxLocalControl.onBusinessCodeSuccess(info, result);
                return result;
            } catch (TransactionException e) {
                txLogger.error(info.getGroupId(), info.getUnitId(), "before business code error");
                throw e;
            } catch (Throwable e) {
                // 4.5 业务执行失败
                txLogger.error(info.getGroupId(), info.getUnitId(), Transactions.TAG_TRANSACTION,
                        "business code error");
                dtxLocalControl.onBusinessCodeError(info, e);
                throw e;
            } finally {
                // 4.6 业务执行完毕
                dtxLocalControl.postBusinessCode(info);
            }
        }

    通过以上代码可以看出,该类是整个事务执行关键类。

    以上就是LCN模式比较核心的代码,其他的分支代码就不一一赘述了

    实战

    由上一篇分布式事务之TX-LCN 我们规划了俩个TC分别是lcn-order 服务和lcn-pay服务,我们的思路是订单服务调用支付服务,分别在订单服务表t_order和支付服务表t_pay中插入插入数据。

    订单服务核心代码和数据表脚本

    • 代码
    /**
     * @author:triumphxx
     * @Date:2021/10/24
     * @Time:2:13 下午
     * @微信公众号:北漂码农有话说
     * @网站:http://blog.triumphxx.com.cn
     * @GitHub https://github.com/triumphxx
     * @Desc:
     **/

    @RestController
    public class LcnOrderController {

        @Autowired
        TOrderDao tOrderDao;

        @Autowired
        private RestTemplate restTemplate;

        @PostMapping("/add-order")
        @Transactional(rollbackFor = Exception.class)
        @LcnTransaction
        public String add()
    {
            TOrder bean = new TOrder();
            bean.setTId(1);
            bean.setTName("order");
            restTemplate.postForEntity("http://lcn-pay/add-pay","",String.class);
    //        int i = 1/0;
            tOrderDao.insert(bean);
            return "新增订单成功";
        }
    }
    • 脚本
    CREATE TABLE `t_order` (
       `t_id` int(11NOT NULL,
       `t_name` varchar(45DEFAULT NULL
    ENGINE=InnoDB DEFAULT CHARSET=latin1

    支付服务核心代码和数据表脚本

    • 代码
    /**
     * @author:triumphxx
     * @Date:2021/10/24
     * @Time:2:26 下午
     * @微信公众号:北漂码农有话说
     * @网站:http://blog.triumphxx.com.cn
     * @GitHub https://github.com/triumphxx
     * @Desc:
     **/

    @RestController
    public class LcnPayController {
        @Autowired
        TPayDao tPayDao;

        @PostMapping("/add-pay")
        @Transactional(rollbackFor = Exception.class)
        @LcnTransaction
        public String addPay()
    {
            TPay tPay = new TPay();
            tPay.setTId(1);
            tPay.setTName("t_pay");
            int i = tPayDao.insertSelective(tPay);
            return "新增支付成功";

        }
    }
    • 脚本
    CREATE TABLE `t_pay` (
         `t_id` int(11NOT NULL,
         `t_name` varchar(45DEFAULT NULL
    ENGINE=InnoDB DEFAULT CHARSET=latin1

    测试流程

    • 启动Redis
    • 启动TM
    • 启动注册中心eureka-server
    • 启动服务lcn-order
    • 启动服务lcn-pay
    • 请求接口http://localhost:8001/add-order
    • 代码创造异常看数据是否进行回滚

    小结

    本篇我们分析了TX-LCN分布式事务的lcn模式的原理及相关源码,以及搭建服务的进行测试。希望能对大家有所帮助。 源码地址源码传送门

  • 相关阅读:
    Homebrew替换源
    Tensorboard on Server
    locale.Error: unsupported locale setting(ViZDoom执行错误)
    Python之从numpy.array保存图片
    医院汇总
    Ubuntu上CUDA环境搭建
    图片
    高斯分布与Gamma分布关系
    Python之2维list转置、旋转及其简单应用
    如何让 zend studio 10 识别 Phalcon语法并且进行语法提示
  • 原文地址:https://www.cnblogs.com/triumph-wyp-com/p/15467654.html
Copyright © 2011-2022 走看看