zoukankan      html  css  js  c++  java
  • sharding-jdbc-core 源码分析

    Sharding-Jdbc 源码分析

    Apache Sharding-Sphere 系列目录(https://www.cnblogs.com/binarylei/p/12217637.html)

    在看 Sharding-Jdbc 源码之前,强烈建议先阅读一直官网的文章:

    1. Apache Sharding-Jdbc 数据分片

    JDBC 调用过程如下:APP -> ORM -> JDBC -> PROXY -> MySQL。如果要完成数据的分库分表,可以在这五层任意地方进行,Sharding-Jdbc 是在 JDBC 层进行分库分表,Sharding-Proxy 是在 PROXY 进行分库分表。

    Sharding-Jdbc 是一个轻量级的分库分表框架,使用时最关键的是配制分库分表策略,其余的和使用普通的 MySQL 驱动一样,几乎不用改代码。具体使用方法参考:Apache Sharding-Jdbc 使用示例

    try(DataSource dataSource =  ShardingDataSourceFactory.createDataSource(
        createDataSourceMap(), shardingRuleConfig, new Properties()) {
        connection Connection = dataSource.getConnection();
        ...
    }
    

    1. Sharding-Jdbc 包结构

    sharding-jdbc  
        ├── sharding-jdbc-core      重写DataSource/Connection/Statement/ResultSet四大对象
        └── sharding-jdbc-orchestration        配置中心
    sharding-core
        ├── sharding-core-api       接口和配置类	
        ├── sharding-core-common    通用分片策略实现...
        ├── sharding-core-entry     SQL解析、路由、改写,核心类BaseShardingEngine
        ├── sharding-core-route     SQL路由,核心类StatementRoutingEngine
        ├── sharding-core-rewrite   SQL改写,核心类ShardingSQLRewriteEngine
        ├── sharding-core-execute   SQL执行,核心类ShardingExecuteEngine
        └── sharding-core-merge     结果合并,核心类MergeEngine
    shardingsphere-sql-parser 
        ├── shardingsphere-sql-parser-spi       SQLParserEntry,用于初始化SQLParser
        ├── shardingsphere-sql-parser-engine    SQL解析,核心类SQLParseEngine
        ├── shardingsphere-sql-parser-relation
        └── shardingsphere-sql-parser-mysql     MySQL解析器,核心类MySQLParserEntry和MySQLParser
    shardingsphere-underlying           基础接口和api
        ├── shardingsphere-rewrite      SQLRewriteEngine接口
        ├── shardingsphere-execute      QueryResult查询结果
        └── shardingsphere-merge        MergeEngine接口
    shardingsphere-spi                  SPI加载工具类
    sharding-transaction
        ├── sharding-transaction-core   接口ShardingTransactionManager,SPI加载		
        ├── sharding-transaction-2pc    实现类XAShardingTransactionManager
        └── sharding-transaction-base   实现类SeataATShardingTransactionManager
    

    2. JDBC 四大对象

    所有的一切都从 ShardingDataSourceFactory 开始的,创建了一个 ShardingDataSource 的分片数据源。除了 ShardingDataSource(分片数据源),在 Sharding-Sphere 中还有 MasterSlaveDataSourceFactory(主从数据源)、EncryptDataSourceFactory(脱敏数据源)。

    public static DataSource createDataSource(
            final Map<String, DataSource> dataSourceMap,
            final ShardingRuleConfiguration shardingRuleConfig,
            final Properties props) throws SQLException {
        return new ShardingDataSource(dataSourceMap,
                   new ShardingRule(shardingRuleConfig, dataSourceMap.keySet()), props);
    }
    

    说明: 本文主要以 ShardingDataSource 为切入点分析 Sharding-Sphere 是如何对 JDBC 四大对象 DataSource、Connection、Statement、ResultSet 进行封装的。

    2.1 DataSource

    DataSource、Connection

    DataSource 和 Connection 都比较简单,没有处理过多的逻辑,只是 dataSourceMap, shardingRule 进行简单的封装。

    ShardingDataSource 持有对数据源和分片规则,可以通过 getConnection 方法获取 ShardingConnection 连接。

    private final ShardingRuntimeContext runtimeContext = new ShardingRuntimeContext(
                    dataSourceMap, shardingRule, props, getDatabaseType());
    @Override
    public final ShardingConnection getConnection() {
        return new ShardingConnection(getDataSourceMap(), runtimeContext,
                TransactionTypeHolder.get());
    }
    

    ShardingDataSource 的功能非常简单,就不多说了。

    2.2 Connection

    ShardingConnection 可以创建 Statement 和 PrepareStatement 两种运行方式:

    @Override
    public Statement createStatement(final int resultSetType,
            final int resultSetConcurrency, final int resultSetHoldability) {
        return new ShardingStatement(this, resultSetType,
                resultSetConcurrency, resultSetHoldability);
    }
    
    @Override
    public PreparedStatement prepareStatement(final String sql, final int resultSetType,
            final int resultSetConcurrency, final int resultSetHoldability)
            throws SQLException {
        return new ShardingPreparedStatement(this, sql, resultSetType,
                resultSetConcurrency, resultSetHoldability);
    }
    

    说明: ShardingConnection 主要是将创建 ShardingStatement 和 ShardingPreparedStatement 两个对象,主要的执行逻辑都在 Statement 对象中。当然 ShardingConnection 还有两个重要的功能,一个是获取真正的数据库连接,一个是事务提交功能,事务不在本文讨论范围中。

    protected Connection createConnection(final String dataSourceName,
            final DataSource dataSource) throws SQLException {
        return isInShardingTransaction()
                ? shardingTransactionManager.getConnection(dataSourceName)
                : dataSource.getConnection();
    }
    

    说明: 如果有事务,则需要通过事务管理器获取连接,关于事务不在本文讨论范围中。

    2.3 Statement

    Statement 相对来说比较复杂,因为它都是 JDBC 的真正执行器,所有逻辑都封装在 Statement 中。

    Statement

    说明: Statement 分为 ShardingStatement 和 ShardingPrepareStatement 两种情况。本文以 ShardingStatement 为例分析 Sharding-Jdbc 执行过程。下一节会重点为分析 ShardingStatement 的执行流程。

    2.4 ResultSet

    ResultSet

    说明: ShardingResultSet 只是对 MergedResult 的简单封装。

    private final MergedResult mergeResultSet;
    @Override
    public boolean next() throws SQLException {
        return mergeResultSet.next();
    }
    

    3. Sharding-Jdbc 执行流程分析

    ShardingStatement 执行时序图

    总结: ShardingStatement 执行过程如下:

    1. SimpleQueryShardingEngine(或 PreparedQueryShardingEngine):完成 SQL 解析、路由、改写,位于 sharding-jdbc-core 工程中。SimpleQueryShardingEngine 直接将路由的功能委托给 StatementRoutingEngine(或 PreparedQueryShardingEngine),本质是对 StatementRoutingEngine、SQLParseEngine、ShardingSQLRewriteEngine 的封装。
    2. StatementExecutor(或 PreparedStatementExecutor): 提供 SQL 执行的操作,位于 sharding-jdbc-core 工程中。本质是对 ShardingExecuteEngine 的封装。
    3. StatementRoutingEngine:SQL 路由引擎,位于 sharding-core-route 工程中。路由引擎包装了 SQL 解析、路由、改写三点。SQL 路由分两步,先进行数据分片路由(ShardingRouter),再进行主从路由(ShardingMasterSlaveRouter)。
    4. SQLParseEngine:SQL 解析引擎,位于 shardingsphere-sql-parser 工程中。目前有 MySQL和 PostgreSQL 两种。
    5. ShardingSQLRewriteEngine:SQL 改写引擎,位于 sharding-core-rewrite 工程中。
    6. ShardingExecuteEngine:执行引擎,位于 sharding-core-execute 工程中。StatementExecutor 对
    7. MergeEngine:结果合并引擎,位于 sharding-core-merge 工程中。

    接下来一下会对 ShardingStatement 深入分析,之后会对 StatementRoutingEngine、SQLParseEngine、ShardingSQLRewriteEngine、ShardingExecuteEngine、MergeEngine 一个引擎进行分析。

    4. sharding-jdbc-core 任务执行分析

    ShardingStatement 内部有三个核心的类,一是 SimpleQueryShardingEngine 完成 SQL 解析、路由、改写;一是 StatementExecutor 进行 SQL 执行;最后调用 MergeEngine 对结果进行合并处理。

    4.1 ShardingStatement

    4.1.2 初始化

    private final ShardingConnection connection;
    private final StatementExecutor statementExecutor;
    
    public ShardingStatement(final ShardingConnection connection) {
        this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,
                ResultSet.HOLD_CURSORS_OVER_COMMIT);
    }
    
    public ShardingStatement(final ShardingConnection connection, final int resultSetType,
            final int resultSetConcurrency, final int resultSetHoldability) {
        super(Statement.class);
        this.connection = connection;
        statementExecutor = new StatementExecutor(resultSetType, resultSetConcurrency,
                resultSetHoldability, connection);
    }
    

    说明: ShardingStatement 内部执行 SQL 委托给了 statementExecutor。关于 ResultSet.CONCUR_READ_ONLY 等参考这里

    4.1.2 执行

    (1)executeQuery 执行过程

    @Override
    public ResultSet executeQuery(final String sql) throws SQLException {
        ResultSet result;
        try {
            clearPrevious();
            // 1. SQL 解析、路由、改写,最终生成 SQLRouteResult
            shard(sql);
            // 2. 生成执行计划 SQLRouteResult -> StatementExecuteUnit
            initStatementExecutor();
            // 3. statementExecutor.executeQuery() 执行任务
            MergeEngine mergeEngine = MergeEngineFactory.newInstance(
                    connection.getRuntimeContext().getDatabaseType(),
                    connection.getRuntimeContext().getRule(), sqlRouteResult,
                    connection.getRuntimeContext().getMetaData().getRelationMetas(),
                    statementExecutor.executeQuery());
            // 4. 结果合并
            result = getResultSet(mergeEngine);
        } finally {
            currentResultSet = null;
        }
        currentResultSet = result;
        return result;
    }
    

    (2)SQL 路由(包括 SQL 解析、路由、改写)

    private SQLRouteResult sqlRouteResult;
    private void shard(final String sql) {
        ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
        SimpleQueryShardingEngine shardingEngine = new SimpleQueryShardingEngine(
                runtimeContext.getRule(), runtimeContext.getProps(),
                runtimeContext.getMetaData(), runtimeContext.getParseEngine());
        sqlRouteResult = shardingEngine.shard(sql, Collections.emptyList());
    }
    

    说明: SimpleQueryShardingEngine 进行 SQL 路由(包括 SQL 解析、路由、改写),生成 SQLRouteResult。之后会有一节专门分析 SQL 路由过程。

    当 ShardingStatement 完成 SQL 的路由,生成 SQLRouteResult 后,剩下的执行任务就全部交给 StatementExecutor 完成。

    4.2 StatementExecutor

    StatementExecutor 内部封装了 SQL 任务的执行过程,包括:SqlExecutePrepareTemplate 类生成执行计划 StatementExecuteUnit,以及 SQLExecuteTemplate 用于执行 StatementExecuteUnit。

    4.2.1 类结构

    StatementExecutor 类图

    4.2.2 重要属性

    AbstractStatementExecutor 类中重要的属性:

    // SQLExecutePrepareTemplate用于生成执行计划StatementExecuteUnit
    private final SQLExecutePrepareTemplate sqlExecutePrepareTemplate;
    // 保存生成的执行计划StatementExecuteUnit
    private final Collection<ShardingExecuteGroup<StatementExecuteUnit>> executeGroups =
                new LinkedList<>();
    
    // SQLExecuteTemplate用于执行StatementExecuteUnit
    private final SQLExecuteTemplate sqlExecuteTemplate;
    // 保存查询结果
    private final List<ResultSet> resultSets = new CopyOnWriteArrayList<>();
    

    4.2.3 生成执行计划

    // 执行前清理状态
    private void clearPrevious() throws SQLException {
        statementExecutor.clear();
    }
    // 执行时初始化
    private void initStatementExecutor() throws SQLException {
        statementExecutor.init(sqlRouteResult);
        replayMethodForStatements();
    }
    

    说明: StatementExecutor 是有状态的,每次执行前都要调用 statementExecutor.clear() 清理上一次执行的状态,并调用 statementExecutor.init() 重新初始化。下面我们看一下 init 主要做了些什么事。

    statementExecutor.init() 初始化主要是生成执行计划 StatementExecuteUnit。

    public void init(final SQLRouteResult routeResult) throws SQLException {
        setSqlStatementContext(routeResult.getSqlStatementContext());
        getExecuteGroups().addAll(obtainExecuteGroups(routeResult.getRouteUnits()));
        cacheStatements();
    }
    
    private Collection<ShardingExecuteGroup<StatementExecuteUnit>> obtainExecuteGroups(
            final Collection<RouteUnit> routeUnits) throws SQLException {
        return getSqlExecutePrepareTemplate().getExecuteUnitGroups(
                routeUnits, new SQLExecutePrepareCallback() {
                    // 获取连接
                    @Override
                    public List<Connection> getConnections(
                            final ConnectionMode connectionMode,
                            final String dataSourceName, final int connectionSize)
                            throws SQLException {
                        return StatementExecutor.super.getConnection().getConnections(
                                connectionMode, dataSourceName, connectionSize);
                    }
    
                    // 生成执行计划RouteUnit -> StatementExecuteUnit
                    @Override
                    public StatementExecuteUnit createStatementExecuteUnit(
                            final Connection connection, final RouteUnit routeUnit,
                            final ConnectionMode connectionMode) throws SQLException {
                        return new StatementExecuteUnit(
                                routeUnit, connection.createStatement(
                                getResultSetType(), getResultSetConcurrency(),
                                getResultSetHoldability()), connectionMode);
                    }
                });
    }
    

    说明: SqlExecutePrepareTemplate 是 sharding-core-execute 工程中提供的一个工具类,专门用于生成执行计划,将 RouteUnit 转化为 StatementExecuteUnit。同时还提供了另一个工具类 SQLExecuteTemplate 用于执行 StatementExecuteUnit,在任务执行时我们会看到这个类。

    4.2.4 任务执行

    public List<QueryResult> executeQuery() throws SQLException {
        final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
        SQLExecuteCallback<QueryResult> executeCallback = 
            new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {
            @Override
            protected QueryResult executeSQL(final String sql, final Statement statement,
                    final ConnectionMode connectionMode) throws SQLException {
                return getQueryResult(sql, statement, connectionMode);
            }
        };
        // 执行StatementExecuteUnit
        return executeCallback(executeCallback);
    }
    
    // sqlExecuteTemplate 执行 executeGroups(即StatementExecuteUnit)
    protected final <T> List<T> executeCallback(
            final SQLExecuteCallback<T> executeCallback) throws SQLException {
        // 执行所有的任务 StatementExecuteUnit
        List<T> result = sqlExecuteTemplate.executeGroup(
                (Collection) executeGroups, executeCallback);
        refreshMetaDataIfNeeded(connection.getRuntimeContext(), sqlStatementContext);
        return result;
    }
    

    说明: SqlExecuteTemplate 执行 StatementExecuteUnit 会回调 SQLExecuteCallback#executeSQL 方法,最终调用 getQueryResult 方法。

    private QueryResult getQueryResult(final String sql, final Statement statement,
            final ConnectionMode connectionMode) throws SQLException {
        ResultSet resultSet = statement.executeQuery(sql);
        getResultSets().add(resultSet);
        return ConnectionMode.MEMORY_STRICTLY == connectionMode
                ? new StreamQueryResult(resultSet)
                : new MemoryQueryResult(resultSet);
    }
    

    说明: ConnectionMode 有两种模式:内存限制(MEMORY_STRICTLY)和连接限制(CONNECTION_STRICTLY),本质是一种资源隔离,保护服务器资源不被消耗殆尽。

    如果一个连接执行多个 StatementExecuteUnit 则为内存限制(MEMORY_STRICTLY),采用流式处理,即 StreamQueryResult ,反之则为连接限制(CONNECTION_STRICTLY),此时会将所有从 MySQL 服务器返回的数据都加载到内存中。特别是在 Sharding-Proxy 中特别有用,避免将代理服务器撑爆,见 Apache Sharding-Proxy 核心原理


    每天用心记录一点点。内容也许不重要,但习惯很重要!

  • 相关阅读:
    想要在控件里面使用触发器,只需要将下面代码填入控件中间即可
    WPF有关控件和模板样式设计的微软官方文档
    XDG0062 XAML 与XDG0008 XAML 错误的解决办法
    在WPF中一种较好的绑定Enums数据方法
    FrameworkElementFactory中的SetBinding与SetValue
    线性表结构:栈
    关于链表的一些问题
    使用Mybatis-Plus的一个坑
    双向链表和双向循环链表
    上下文切换的确切含义了解下
  • 原文地址:https://www.cnblogs.com/binarylei/p/12234545.html
Copyright © 2011-2022 走看看