zoukankan      html  css  js  c++  java
  • Seata AT模式全局锁源码分析

    源码版本为1.3.0

    分支事务执行前,需要把分支注册到seata server

    代码 AbstractCore # branchRegister

    @Override
        public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
                                   String applicationData, String lockKeys) throws TransactionException {
            GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
            return SessionHolder.lockAndExecute(globalSession, () -> {
                globalSessionStatusCheck(globalSession);
                globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
                BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
                        applicationData, lockKeys, clientId);
                branchSessionLock(globalSession, branchSession);
                try {
                    globalSession.addBranch(branchSession);

      最重要的一步就是获取该分支事务的全局锁

      如果使用mysql来做分布式锁就是这样的 代码在 DataBaseLocker # acquireLock

      LockStoreDataBaseDAO # acquireLock(List<LockDO> lockDOs)

    @Override
        public boolean acquireLock(List<LockDO> lockDOs) {
            Connection conn = null;
            PreparedStatement ps = null;
            ResultSet rs = null;
            Set<String> dbExistedRowKeys = new HashSet<>();
            boolean originalAutoCommit = true;
            if (lockDOs.size() > 1) {
                lockDOs = lockDOs.stream().filter(LambdaUtils.distinctByKey(LockDO::getRowKey)).collect(Collectors.toList());
            }
            try {
                conn = lockStoreDataSource.getConnection();
                if (originalAutoCommit = conn.getAutoCommit()) {
                    conn.setAutoCommit(false);
                }
                //check lock
                StringJoiner sj = new StringJoiner(",");
                for (int i = 0; i < lockDOs.size(); i++) {
                    sj.add("?");
                }
                boolean canLock = true;
                //query
                String checkLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getCheckLockableSql(lockTable, sj.toString());
                ps = conn.prepareStatement(checkLockSQL);
                for (int i = 0; i < lockDOs.size(); i++) {
                    ps.setString(i + 1, lockDOs.get(i).getRowKey());
                }
                rs = ps.executeQuery();
                String currentXID = lockDOs.get(0).getXid();
                while (rs.next()) {
                    String dbXID = rs.getString(ServerTableColumnsName.LOCK_TABLE_XID);
                    if (!StringUtils.equals(dbXID, currentXID)) {//如果存在行记录而且还不是一个全局事务ID,获取锁失败
                        if (LOGGER.isInfoEnabled()) {
                            String dbPk = rs.getString(ServerTableColumnsName.LOCK_TABLE_PK);
                            String dbTableName = rs.getString(ServerTableColumnsName.LOCK_TABLE_TABLE_NAME);
                            Long dbBranchId = rs.getLong(ServerTableColumnsName.LOCK_TABLE_BRANCH_ID);
                            LOGGER.info("Global lock on [{}:{}] is holding by xid {} branchId {}", dbTableName, dbPk, dbXID,
                                dbBranchId);
                        }
                        canLock &= false;
                        break;
                    }
                    dbExistedRowKeys.add(rs.getString(ServerTableColumnsName.LOCK_TABLE_ROW_KEY));
                }
    
                if (!canLock) {
                    conn.rollback();
                    return false;
                }
                List<LockDO> unrepeatedLockDOs = null;
                if (CollectionUtils.isNotEmpty(dbExistedRowKeys)) {
                    unrepeatedLockDOs = lockDOs.stream().filter(lockDO -> !dbExistedRowKeys.contains(lockDO.getRowKey()))
                        .collect(Collectors.toList());
                } else {
                    unrepeatedLockDOs = lockDOs;
                }
                if (CollectionUtils.isEmpty(unrepeatedLockDOs)) {
                    conn.rollback();
                    return true;
                }
                //lock
                if (unrepeatedLockDOs.size() == 1) {
                    LockDO lockDO = unrepeatedLockDOs.get(0);
                    if (!doAcquireLock(conn, lockDO)) {
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info("Global lock acquire failed, xid {} branchId {} pk {}", lockDO.getXid(), lockDO.getBranchId(), lockDO.getPk());
                        }
                        conn.rollback();
                        return false;
                    }
                } else {
                    if (!doAcquireLocks(conn, unrepeatedLockDOs)) {
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info("Global lock batch acquire failed, xid {} branchId {} pks {}", unrepeatedLockDOs.get(0).getXid(),
                                unrepeatedLockDOs.get(0).getBranchId(), unrepeatedLockDOs.stream().map(lockDO -> lockDO.getPk()).collect(Collectors.toList()));
                        }
                        conn.rollback();
                        return false;
                    }
                }
                conn.commit();
                return true;

      来看看语句 

    /**
         * The constant CHECK_LOCK_SQL.
         */
        private static final String CHECK_LOCK_SQL = "select " + ALL_COLUMNS + " from " + LOCK_TABLE_PLACE_HOLD
            + " where " + ServerTableColumnsName.LOCK_TABLE_ROW_KEY + " in (" + IN_PARAMS_PLACE_HOLD + ")";

      LOCK_TABLE的表模型如下

    CREATE TABLE IF NOT EXISTS `lock_table`
    (
        `row_key`        VARCHAR(128) NOT NULL,
        `xid`            VARCHAR(96),
        `transaction_id` BIGINT,
        `branch_id`      BIGINT       NOT NULL,
        `resource_id`    VARCHAR(256),
        `table_name`     VARCHAR(32),
        `pk`             VARCHAR(36),
        `gmt_create`     DATETIME,
        `gmt_modified`   DATETIME,
        PRIMARY KEY (`row_key`),
        KEY `idx_branch_id` (`branch_id`)
    ) ENGINE = InnoDB
      DEFAULT CHARSET = utf8;

      row_key就是要锁定的行的主键值

      查询的逻辑就是当前表里是不是有这行主键值等于入参的行数据,如果有全局事务id还不一样那说明另外的事务锁定该行,获取全局锁失败。

  • 相关阅读:
    spring源码分析之cache注解
    Full Gc经历分析
    spring源码分析之context
    spring源码分析之freemarker整合
    publishing failed with multiple errors resource is out of sync with the file system--转
    真正解决问题:maven eclipse tomcat java.lang.ClassNotFoundException: org.springframework.web.context.ContextLoaderListener--转
    pyspark kafka createDirectStream和createStream 区别
    kafka 0.11 spark 2.11 streaming例子
    蜜罐技术——通过布置一些作为诱饵的主机、网络服务或者信息,诱使攻击方对它们实施攻击,从而可以对攻击行为进行捕获和分析
    安装和使用访问暗网
  • 原文地址:https://www.cnblogs.com/juniorMa/p/15167158.html
Copyright © 2011-2022 走看看