zoukankan      html  css  js  c++  java
  • sharding-jdbc-core 源码分析

    Sharding-Jdbc 源码分析

    Apache Sharding-Sphere 系列目录(https://www.cnblogs.com/binarylei/p/12217637.html)

    在看 Sharding-Jdbc 源码之前,强烈建议先阅读一直官网的文章:

    1. Apache Sharding-Jdbc 数据分片

    JDBC 调用过程如下:APP -> ORM -> JDBC -> PROXY -> MySQL。如果要完成数据的分库分表,可以在这五层任意地方进行,Sharding-Jdbc 是在 JDBC 层进行分库分表,Sharding-Proxy 是在 PROXY 进行分库分表。

    Sharding-Jdbc 是一个轻量级的分库分表框架,使用时最关键的是配制分库分表策略,其余的和使用普通的 MySQL 驱动一样,几乎不用改代码。具体使用方法参考:Apache Sharding-Jdbc 使用示例

    try(DataSource dataSource =  ShardingDataSourceFactory.createDataSource(
        createDataSourceMap(), shardingRuleConfig, new Properties()) {
        connection Connection = dataSource.getConnection();
        ...
    }
    

    1. Sharding-Jdbc 包结构

    sharding-jdbc  
        ├── sharding-jdbc-core      重写DataSource/Connection/Statement/ResultSet四大对象
        └── sharding-jdbc-orchestration        配置中心
    sharding-core
        ├── sharding-core-api       接口和配置类	
        ├── sharding-core-common    通用分片策略实现...
        ├── sharding-core-entry     SQL解析、路由、改写,核心类BaseShardingEngine
        ├── sharding-core-route     SQL路由,核心类StatementRoutingEngine
        ├── sharding-core-rewrite   SQL改写,核心类ShardingSQLRewriteEngine
        ├── sharding-core-execute   SQL执行,核心类ShardingExecuteEngine
        └── sharding-core-merge     结果合并,核心类MergeEngine
    shardingsphere-sql-parser 
        ├── shardingsphere-sql-parser-spi       SQLParserEntry,用于初始化SQLParser
        ├── shardingsphere-sql-parser-engine    SQL解析,核心类SQLParseEngine
        ├── shardingsphere-sql-parser-relation
        └── shardingsphere-sql-parser-mysql     MySQL解析器,核心类MySQLParserEntry和MySQLParser
    shardingsphere-underlying           基础接口和api
        ├── shardingsphere-rewrite      SQLRewriteEngine接口
        ├── shardingsphere-execute      QueryResult查询结果
        └── shardingsphere-merge        MergeEngine接口
    shardingsphere-spi                  SPI加载工具类
    sharding-transaction
        ├── sharding-transaction-core   接口ShardingTransactionManager,SPI加载		
        ├── sharding-transaction-2pc    实现类XAShardingTransactionManager
        └── sharding-transaction-base   实现类SeataATShardingTransactionManager
    

    2. JDBC 四大对象

    所有的一切都从 ShardingDataSourceFactory 开始的,创建了一个 ShardingDataSource 的分片数据源。除了 ShardingDataSource(分片数据源),在 Sharding-Sphere 中还有 MasterSlaveDataSourceFactory(主从数据源)、EncryptDataSourceFactory(脱敏数据源)。

    public static DataSource createDataSource(
            final Map<String, DataSource> dataSourceMap,
            final ShardingRuleConfiguration shardingRuleConfig,
            final Properties props) throws SQLException {
        return new ShardingDataSource(dataSourceMap,
                   new ShardingRule(shardingRuleConfig, dataSourceMap.keySet()), props);
    }
    

    说明: 本文主要以 ShardingDataSource 为切入点分析 Sharding-Sphere 是如何对 JDBC 四大对象 DataSource、Connection、Statement、ResultSet 进行封装的。

    2.1 DataSource

    DataSource、Connection

    DataSource 和 Connection 都比较简单,没有处理过多的逻辑,只是 dataSourceMap, shardingRule 进行简单的封装。

    ShardingDataSource 持有对数据源和分片规则,可以通过 getConnection 方法获取 ShardingConnection 连接。

    private final ShardingRuntimeContext runtimeContext = new ShardingRuntimeContext(
                    dataSourceMap, shardingRule, props, getDatabaseType());
    @Override
    public final ShardingConnection getConnection() {
        return new ShardingConnection(getDataSourceMap(), runtimeContext,
                TransactionTypeHolder.get());
    }
    

    ShardingDataSource 的功能非常简单,就不多说了。

    2.2 Connection

    ShardingConnection 可以创建 Statement 和 PrepareStatement 两种运行方式:

    @Override
    public Statement createStatement(final int resultSetType,
            final int resultSetConcurrency, final int resultSetHoldability) {
        return new ShardingStatement(this, resultSetType,
                resultSetConcurrency, resultSetHoldability);
    }
    
    @Override
    public PreparedStatement prepareStatement(final String sql, final int resultSetType,
            final int resultSetConcurrency, final int resultSetHoldability)
            throws SQLException {
        return new ShardingPreparedStatement(this, sql, resultSetType,
                resultSetConcurrency, resultSetHoldability);
    }
    

    说明: ShardingConnection 主要是将创建 ShardingStatement 和 ShardingPreparedStatement 两个对象,主要的执行逻辑都在 Statement 对象中。当然 ShardingConnection 还有两个重要的功能,一个是获取真正的数据库连接,一个是事务提交功能,事务不在本文讨论范围中。

    protected Connection createConnection(final String dataSourceName,
            final DataSource dataSource) throws SQLException {
        return isInShardingTransaction()
                ? shardingTransactionManager.getConnection(dataSourceName)
                : dataSource.getConnection();
    }
    

    说明: 如果有事务,则需要通过事务管理器获取连接,关于事务不在本文讨论范围中。

    2.3 Statement

    Statement 相对来说比较复杂,因为它都是 JDBC 的真正执行器,所有逻辑都封装在 Statement 中。

    Statement

    说明: Statement 分为 ShardingStatement 和 ShardingPrepareStatement 两种情况。本文以 ShardingStatement 为例分析 Sharding-Jdbc 执行过程。下一节会重点为分析 ShardingStatement 的执行流程。

    2.4 ResultSet

    ResultSet

    说明: ShardingResultSet 只是对 MergedResult 的简单封装。

    private final MergedResult mergeResultSet;
    @Override
    public boolean next() throws SQLException {
        return mergeResultSet.next();
    }
    

    3. Sharding-Jdbc 执行流程分析

    ShardingStatement 执行时序图

    总结: ShardingStatement 执行过程如下:

    1. SimpleQueryShardingEngine(或 PreparedQueryShardingEngine):完成 SQL 解析、路由、改写,位于 sharding-jdbc-core 工程中。SimpleQueryShardingEngine 直接将路由的功能委托给 StatementRoutingEngine(或 PreparedQueryShardingEngine),本质是对 StatementRoutingEngine、SQLParseEngine、ShardingSQLRewriteEngine 的封装。
    2. StatementExecutor(或 PreparedStatementExecutor): 提供 SQL 执行的操作,位于 sharding-jdbc-core 工程中。本质是对 ShardingExecuteEngine 的封装。
    3. StatementRoutingEngine:SQL 路由引擎,位于 sharding-core-route 工程中。路由引擎包装了 SQL 解析、路由、改写三点。SQL 路由分两步,先进行数据分片路由(ShardingRouter),再进行主从路由(ShardingMasterSlaveRouter)。
    4. SQLParseEngine:SQL 解析引擎,位于 shardingsphere-sql-parser 工程中。目前有 MySQL和 PostgreSQL 两种。
    5. ShardingSQLRewriteEngine:SQL 改写引擎,位于 sharding-core-rewrite 工程中。
    6. ShardingExecuteEngine:执行引擎,位于 sharding-core-execute 工程中。StatementExecutor 对
    7. MergeEngine:结果合并引擎,位于 sharding-core-merge 工程中。

    接下来一下会对 ShardingStatement 深入分析,之后会对 StatementRoutingEngine、SQLParseEngine、ShardingSQLRewriteEngine、ShardingExecuteEngine、MergeEngine 一个引擎进行分析。

    4. sharding-jdbc-core 任务执行分析

    ShardingStatement 内部有三个核心的类,一是 SimpleQueryShardingEngine 完成 SQL 解析、路由、改写;一是 StatementExecutor 进行 SQL 执行;最后调用 MergeEngine 对结果进行合并处理。

    4.1 ShardingStatement

    4.1.2 初始化

    private final ShardingConnection connection;
    private final StatementExecutor statementExecutor;
    
    public ShardingStatement(final ShardingConnection connection) {
        this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,
                ResultSet.HOLD_CURSORS_OVER_COMMIT);
    }
    
    public ShardingStatement(final ShardingConnection connection, final int resultSetType,
            final int resultSetConcurrency, final int resultSetHoldability) {
        super(Statement.class);
        this.connection = connection;
        statementExecutor = new StatementExecutor(resultSetType, resultSetConcurrency,
                resultSetHoldability, connection);
    }
    

    说明: ShardingStatement 内部执行 SQL 委托给了 statementExecutor。关于 ResultSet.CONCUR_READ_ONLY 等参考这里

    4.1.2 执行

    (1)executeQuery 执行过程

    @Override
    public ResultSet executeQuery(final String sql) throws SQLException {
        ResultSet result;
        try {
            clearPrevious();
            // 1. SQL 解析、路由、改写,最终生成 SQLRouteResult
            shard(sql);
            // 2. 生成执行计划 SQLRouteResult -> StatementExecuteUnit
            initStatementExecutor();
            // 3. statementExecutor.executeQuery() 执行任务
            MergeEngine mergeEngine = MergeEngineFactory.newInstance(
                    connection.getRuntimeContext().getDatabaseType(),
                    connection.getRuntimeContext().getRule(), sqlRouteResult,
                    connection.getRuntimeContext().getMetaData().getRelationMetas(),
                    statementExecutor.executeQuery());
            // 4. 结果合并
            result = getResultSet(mergeEngine);
        } finally {
            currentResultSet = null;
        }
        currentResultSet = result;
        return result;
    }
    

    (2)SQL 路由(包括 SQL 解析、路由、改写)

    private SQLRouteResult sqlRouteResult;
    private void shard(final String sql) {
        ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
        SimpleQueryShardingEngine shardingEngine = new SimpleQueryShardingEngine(
                runtimeContext.getRule(), runtimeContext.getProps(),
                runtimeContext.getMetaData(), runtimeContext.getParseEngine());
        sqlRouteResult = shardingEngine.shard(sql, Collections.emptyList());
    }
    

    说明: SimpleQueryShardingEngine 进行 SQL 路由(包括 SQL 解析、路由、改写),生成 SQLRouteResult。之后会有一节专门分析 SQL 路由过程。

    当 ShardingStatement 完成 SQL 的路由,生成 SQLRouteResult 后,剩下的执行任务就全部交给 StatementExecutor 完成。

    4.2 StatementExecutor

    StatementExecutor 内部封装了 SQL 任务的执行过程,包括:SqlExecutePrepareTemplate 类生成执行计划 StatementExecuteUnit,以及 SQLExecuteTemplate 用于执行 StatementExecuteUnit。

    4.2.1 类结构

    StatementExecutor 类图

    4.2.2 重要属性

    AbstractStatementExecutor 类中重要的属性:

    // SQLExecutePrepareTemplate用于生成执行计划StatementExecuteUnit
    private final SQLExecutePrepareTemplate sqlExecutePrepareTemplate;
    // 保存生成的执行计划StatementExecuteUnit
    private final Collection<ShardingExecuteGroup<StatementExecuteUnit>> executeGroups =
                new LinkedList<>();
    
    // SQLExecuteTemplate用于执行StatementExecuteUnit
    private final SQLExecuteTemplate sqlExecuteTemplate;
    // 保存查询结果
    private final List<ResultSet> resultSets = new CopyOnWriteArrayList<>();
    

    4.2.3 生成执行计划

    // 执行前清理状态
    private void clearPrevious() throws SQLException {
        statementExecutor.clear();
    }
    // 执行时初始化
    private void initStatementExecutor() throws SQLException {
        statementExecutor.init(sqlRouteResult);
        replayMethodForStatements();
    }
    

    说明: StatementExecutor 是有状态的,每次执行前都要调用 statementExecutor.clear() 清理上一次执行的状态,并调用 statementExecutor.init() 重新初始化。下面我们看一下 init 主要做了些什么事。

    statementExecutor.init() 初始化主要是生成执行计划 StatementExecuteUnit。

    public void init(final SQLRouteResult routeResult) throws SQLException {
        setSqlStatementContext(routeResult.getSqlStatementContext());
        getExecuteGroups().addAll(obtainExecuteGroups(routeResult.getRouteUnits()));
        cacheStatements();
    }
    
    private Collection<ShardingExecuteGroup<StatementExecuteUnit>> obtainExecuteGroups(
            final Collection<RouteUnit> routeUnits) throws SQLException {
        return getSqlExecutePrepareTemplate().getExecuteUnitGroups(
                routeUnits, new SQLExecutePrepareCallback() {
                    // 获取连接
                    @Override
                    public List<Connection> getConnections(
                            final ConnectionMode connectionMode,
                            final String dataSourceName, final int connectionSize)
                            throws SQLException {
                        return StatementExecutor.super.getConnection().getConnections(
                                connectionMode, dataSourceName, connectionSize);
                    }
    
                    // 生成执行计划RouteUnit -> StatementExecuteUnit
                    @Override
                    public StatementExecuteUnit createStatementExecuteUnit(
                            final Connection connection, final RouteUnit routeUnit,
                            final ConnectionMode connectionMode) throws SQLException {
                        return new StatementExecuteUnit(
                                routeUnit, connection.createStatement(
                                getResultSetType(), getResultSetConcurrency(),
                                getResultSetHoldability()), connectionMode);
                    }
                });
    }
    

    说明: SqlExecutePrepareTemplate 是 sharding-core-execute 工程中提供的一个工具类,专门用于生成执行计划,将 RouteUnit 转化为 StatementExecuteUnit。同时还提供了另一个工具类 SQLExecuteTemplate 用于执行 StatementExecuteUnit,在任务执行时我们会看到这个类。

    4.2.4 任务执行

    public List<QueryResult> executeQuery() throws SQLException {
        final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
        SQLExecuteCallback<QueryResult> executeCallback = 
            new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {
            @Override
            protected QueryResult executeSQL(final String sql, final Statement statement,
                    final ConnectionMode connectionMode) throws SQLException {
                return getQueryResult(sql, statement, connectionMode);
            }
        };
        // 执行StatementExecuteUnit
        return executeCallback(executeCallback);
    }
    
    // sqlExecuteTemplate 执行 executeGroups(即StatementExecuteUnit)
    protected final <T> List<T> executeCallback(
            final SQLExecuteCallback<T> executeCallback) throws SQLException {
        // 执行所有的任务 StatementExecuteUnit
        List<T> result = sqlExecuteTemplate.executeGroup(
                (Collection) executeGroups, executeCallback);
        refreshMetaDataIfNeeded(connection.getRuntimeContext(), sqlStatementContext);
        return result;
    }
    

    说明: SqlExecuteTemplate 执行 StatementExecuteUnit 会回调 SQLExecuteCallback#executeSQL 方法,最终调用 getQueryResult 方法。

    private QueryResult getQueryResult(final String sql, final Statement statement,
            final ConnectionMode connectionMode) throws SQLException {
        ResultSet resultSet = statement.executeQuery(sql);
        getResultSets().add(resultSet);
        return ConnectionMode.MEMORY_STRICTLY == connectionMode
                ? new StreamQueryResult(resultSet)
                : new MemoryQueryResult(resultSet);
    }
    

    说明: ConnectionMode 有两种模式:内存限制(MEMORY_STRICTLY)和连接限制(CONNECTION_STRICTLY),本质是一种资源隔离,保护服务器资源不被消耗殆尽。

    如果一个连接执行多个 StatementExecuteUnit 则为内存限制(MEMORY_STRICTLY),采用流式处理,即 StreamQueryResult ,反之则为连接限制(CONNECTION_STRICTLY),此时会将所有从 MySQL 服务器返回的数据都加载到内存中。特别是在 Sharding-Proxy 中特别有用,避免将代理服务器撑爆,见 Apache Sharding-Proxy 核心原理


    每天用心记录一点点。内容也许不重要,但习惯很重要!

  • 相关阅读:
    WRF WPS预处理
    CVS安装
    Linux窗口最小化消失,任务栏上无法找到的解决方法
    NCARG安装配置出现error while loading shared libraries: libg2c.so.0问题额解决办法
    Netcdf安装
    Embedding R-generated Interactive HTML pages in MS PowerPoint(转)
    The leaflet package for online mapping in R(转)
    Some 3D Graphics (rgl) for Classification with Splines and Logistic Regression (from The Elements of Statistical Learning)(转)
    What does a Bayes factor feel like?(转)
    Weka算法介绍
  • 原文地址:https://www.cnblogs.com/binarylei/p/12234545.html
Copyright © 2011-2022 走看看