zoukankan      html  css  js  c++  java
  • 【源码解析】Sharding-Jdbc的执行过程(一)

    一、ShardingContext

    在Sharding-Jdbc中,我们其实需要抓住一个核心类,也就是ShardingContext,分片上下文,里面定义了下面几个内容:

    @RequiredArgsConstructor
    @Getter
    public final class ShardingContext {
        //分片规则
        private final ShardingRule shardingRule;
        //数据库类型
        private final DatabaseType databaseType;
        //执行引擎
        private final ExecutorEngine executorEngine;
        //是否要在log文件中展示sql语句
        private final boolean showSQL;
    }
    

    里面的几个参数很简单易懂,但是里面包含的内容也是整个中间件的核心内容。

    二、preparedStatement

    这块对应于代码中的jdbc/core/statement,里面包括两部分,一部分是statement,一部分是prepareStatement。考虑到性能问题(prepareStatement可以进行缓存)和代码的优雅性(变量的设置),以及考虑到通常JDBC使用的都是prepareStatement,所以我们着重看下prepareStatement的两部分,即MasterSlavePreparedStatement和ShardingPreparedStatement。

    2.1 MasterSlavePreparedStatement

    2.1.1 获取connection

    支持读写分离。这块可以先看一些他的构造方法,构造方法中一般都有这样的方法:

    connection.getConnections(sqlStatement.getType())
    

    这块其实是根据sql的类型,来获取不同的连接

    • 如果是DDL类型,会获取所有的连接,包括master和slave,也就是说对于表结构的修改,会修改包括master和slave
    • 如果是DML类型,也就是写操作,获取的是master的连接
    • 最后,如果是DQL类型,也就是读操作,是根据读写分离的策略获取某个slave连接

    后面两种是有本地缓存的,可以避免每次都进行connect构建,提高效率。

    当然,这个sqlType是怎么来的?这就需要用到sql解析的模块了。

    SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge();
    

    这块先提一下,后续再详细讲解。

    2.1.2 执行sql

    主要有三个方法,

    • executeQuery
    public ResultSet executeQuery() throws SQLException {
        Preconditions.checkArgument(1 == routedStatements.size(), "Cannot support executeQuery for DDL");
        return routedStatements.iterator().next().executeQuery();
    }
    

    首先说明下,这里的routedStatements是待执行sql的一个集合,这块首先确保只有一个查询语句。然后直接调用PreparedStatement.executeQuery()方法,比较直观。

    • executeUpdate
    public int executeUpdate() throws SQLException {
        int result = 0;
        for (PreparedStatement each : routedStatements) {
            result += each.executeUpdate();
        }
        return result;
    }
    

    这块逻辑与上面一致,不过不一样的是,可以传入多个update的语句,然后顺序执行,最后调用的是PreparedStatement.executeUpdate()方法。

    • execute
    public boolean execute() throws SQLException {
        boolean result = false;
        for (PreparedStatement each : routedStatements) {
            result = each.execute();
        }
        return result;
    }
    

    最后就是一些其他的方法的执行了。

    总的来说,读写分离这块的内容比较简单,首先sql语句不要怎么变化,再者数据库连接connection基本上都是确定的,所以不需要路由什么的,可以直接运行。

    2.2 ShardingPreparedStatement

    下面我们看下分库分表的情况下,来执行我们的sql,这块就稍微有些复杂了。

    2.2.1 构造方法

    首先看一下他的构造方法,构造方法中有一些很奇怪的常量,比如TYPE_FORWARD_ONLY、CONCUR_READ_ONLY等等,这块可以参考这篇博客。其实就是在读写过程中的指针的方向等内容。

    2.2.2 sql执行

    2.2.2.1 查询

    查询的逻辑大概如下:

    • sql路由
    • sql改写
    • sql执行
    • 结果合并

    下面看下代码:

    public ResultSet executeQuery() throws SQLException {
        ResultSet result;
        try {
            Collection<PreparedStatementUnit> preparedStatementUnits = route();
            List<ResultSet> resultSets = new PreparedStatementExecutor(
                    getConnection().getShardingContext().getExecutorEngine(), routeResult.getSqlStatement().getType(), preparedStatementUnits, getParameters()).executeQuery();
            result = new ShardingResultSet(resultSets, new MergeEngine(resultSets, (SelectStatement) routeResult.getSqlStatement()).merge());
        } finally {
            clearBatch();
        }
        currentResultSet = result;
        return result;
    }
    

    首先我们需要看几个类:

    • PreparedStatementUnit
      • SQLExecutionUnit:包括dataSource和sql
      • PreparedStatement

    这个类,会用于最终我们到具体的数据库上执行sql,调用route方法,其实就是路由到具体的服务器上面。

    private Collection<PreparedStatementUnit> route() throws SQLException {
        Collection<PreparedStatementUnit> result = new LinkedList<>();
        routeResult = routingEngine.route(getParameters());
        for (SQLExecutionUnit each : routeResult.getExecutionUnits()) {
            SQLType sqlType = routeResult.getSqlStatement().getType();
            Collection<PreparedStatement> preparedStatements;
            if (SQLType.DDL == sqlType) {
                preparedStatements = generatePreparedStatementForDDL(each);
            } else {
                preparedStatements = Collections.singletonList(generatePreparedStatement(each));
            }
            routedStatements.addAll(preparedStatements);
            for (PreparedStatement preparedStatement : preparedStatements) {
                replaySetParameter(preparedStatement);
                result.add(new PreparedStatementUnit(each, preparedStatement));
            }
        }
        return result;
    }
    

    这里面有个routeResult,其实就是根据入参路由到的数据库列表。这边有一个路由引擎,看下这边是怎么路由的:

    public SQLRouteResult route(final List<Object> parameters) {
        if (null == sqlStatement) {
            sqlStatement = sqlRouter.parse(logicSQL, parameters.size());
        }
        return sqlRouter.route(logicSQL, parameters, sqlStatement);
    }
    

    这边有几个参数,

    • logicSQL:表示逻辑上的sql,因为业务使用时,写的sql其实是逻辑上的sql,用到的数据库和表也是逻辑库和逻辑表
    • parameters:传入的参数
    • sqlStatement:最终的sql语句

    首先解析sql,然后路由。这边的解析还是老套路,但是如果解析出来的sql是insert,而且配置了自动生成key,那么会调用自动生成key的方法,生成key,放到对应的column下。解析这块后续再分析。下面我们看下路由:

    @Override
    public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) {
        SQLRouteResult result = new SQLRouteResult(sqlStatement);
        if (sqlStatement instanceof InsertStatement && null != ((InsertStatement) sqlStatement).getGeneratedKey()) {
            processGeneratedKey(parameters, (InsertStatement) sqlStatement, result);
        }
        //路由
        RoutingResult routingResult = route(parameters, sqlStatement);
        //重写sql
        SQLRewriteEngine rewriteEngine = new SQLRewriteEngine(shardingRule, logicSQL, databaseType, sqlStatement);
        boolean isSingleRouting = routingResult.isSingleRouting();
        if (sqlStatement instanceof SelectStatement && null != ((SelectStatement) sqlStatement).getLimit()) {
            processLimit(parameters, (SelectStatement) sqlStatement, isSingleRouting);
        }
        SQLBuilder sqlBuilder = rewriteEngine.rewrite(!isSingleRouting);
        if (routingResult instanceof CartesianRoutingResult) {
            for (CartesianDataSource cartesianDataSource : ((CartesianRoutingResult) routingResult).getRoutingDataSources()) {
                for (CartesianTableReference cartesianTableReference : cartesianDataSource.getRoutingTableReferences()) {
                    result.getExecutionUnits().add(new SQLExecutionUnit(cartesianDataSource.getDataSource(), rewriteEngine.generateSQL(cartesianTableReference, sqlBuilder)));
                }
            }
        } else {
            for (TableUnit each : routingResult.getTableUnits().getTableUnits()) {
                result.getExecutionUnits().add(new SQLExecutionUnit(each.getDataSourceName(), rewriteEngine.generateSQL(each, sqlBuilder)));
            }
        }
        if (showSQL) {
            SQLLogger.logSQL(logicSQL, sqlStatement, result.getExecutionUnits(), parameters);
        }
        return result;
    }
    

    先看路由:

    private RoutingResult route(final List<Object> parameters, final SQLStatement sqlStatement) {
        Collection<String> tableNames = sqlStatement.getTables().getTableNames();
        RoutingEngine routingEngine;
        //不涉及到具体的表,全路由
        if (tableNames.isEmpty()) {
            routingEngine = new DatabaseAllRoutingEngine(shardingRule.getDataSourceMap());
        } else if (1 == tableNames.size() || shardingRule.isAllBindingTables(tableNames) || shardingRule.isAllInDefaultDataSource(tableNames)) {
            routingEngine = new SimpleRoutingEngine(shardingRule, parameters, tableNames.iterator().next(), sqlStatement);
        } else {
            // TODO config for cartesian set
            routingEngine = new ComplexRoutingEngine(shardingRule, parameters, tableNames, sqlStatement);
        }
        return routingEngine.route();
    }
    

    这里涉及到一个bindingTable的概念,概念如下:指在任何场景下分片规则均一致的主表和子表。例:订单表和订单项表,均按照订单ID分片,则此两张表互为BindingTable关系。BindingTable关系的多表关联查询不会出现笛卡尔积关联,关联查询效率将大大提升。

    这里最终返回的结果RoutingResult类似如下:
    image

    这里面有几种路由类,后续再详细分析。

    获取到数据库表的路由信息后,就到了sql改写的过程。这里也有一个sql改写的引擎:++SQLRewriteEngine++。中间插入了一个判断,routingResult.isSingleRouting(),判断是否路由之后的DB只有一个。这里有个小的优化:如果是select语句,用到了limit,而且最终落到了单片上,那么sql语句是不会被重写的,通过rewrite方法也可以看出来,是和isSingleRouting取反的。

    public SQLBuilder rewrite(final boolean isRewriteLimit) {
        SQLBuilder result = new SQLBuilder();
        if (sqlTokens.isEmpty()) {
            result.appendLiterals(originalSQL);
            return result;
        }
        int count = 0;
        sortByBeginPosition();
        for (SQLToken each : sqlTokens) {
            if (0 == count) {
                result.appendLiterals(originalSQL.substring(0, each.getBeginPosition()));
            }
            if (each instanceof TableToken) {
                appendTableToken(result, (TableToken) each, count, sqlTokens);
            } else if (each instanceof ItemsToken) {
                appendItemsToken(result, (ItemsToken) each, count, sqlTokens);
            } else if (each instanceof RowCountToken) {
                appendLimitRowCount(result, (RowCountToken) each, count, sqlTokens, isRewriteLimit);
            } else if (each instanceof OffsetToken) {
                appendLimitOffsetToken(result, (OffsetToken) each, count, sqlTokens, isRewriteLimit);
            } else if (each instanceof OrderByToken) {
                appendOrderByToken(result, count, sqlTokens);
            }
            count++;
        }
        return result;
    }
    

    执行完成这步之后,sql语句重写基本完成。如果路由结果涉及到笛卡尔积,还需要对sql进行进一步的重写,因为涉及到关联表。下面涉及到真正转化sql这部分,使用的方法是:rewriteEngine.generateSQL,涉及到sql中的逻辑库表替换为实际的库表,形成SQLExecutionUnit,添加到待执行的sql列表中,最终得到SQLRouteResult。至此,sql路由完成。回到ShardingPreparedStatement的route方法。遍历routeResult,形成预执行的statement集合。

    再回到ShardingPreparedStatement的executeQuery()方法,route()得到了Collection,下面就到了真正要执行sql的步骤。

  • 相关阅读:
    python3 pyinstaller
    python3 random
    python3 turtle
    产生一个序列的所有排列组合
    蒙特卡洛算法
    lightoj 1014
    UVA11426
    nginx 配置本地https(免费证书)
    ElementUI
    Airbnb 代码规范
  • 原文地址:https://www.cnblogs.com/f-zhao/p/7877470.html
Copyright © 2011-2022 走看看