zoukankan      html  css  js  c++  java
  • Sharding-Jdbc 3.1.0遇到的问题与处理

    读前请注意

    本文涉及的源码修改并没有经过严格的验证或大量测试,谨慎参考.

    提前configMapContext的注入便于shardingAlgorithm使用

    • 总的来说
      ShardingDataSource初始化时会读取"sharding.jdbc.config.config-map"配置放入configMapContext类中.
      但是用户自定义的shardingAlgorithm初始化先于configMapContext. 所以我给他提前了.
    • 具体来说
      下面这是读取配置的部分
    @ConfigurationProperties(prefix = "sharding.jdbc.config")
    public class SpringBootConfigMapConfigurationProperties {
        private Map<String, Object> configMap = new LinkedHashMap<>();
    }
    

    下面是修改部分.

    @EnableConfigurationProperties({
            SpringBootShardingRuleConfigurationProperties.class, SpringBootMasterSlaveRuleConfigurationProperties.class,
            SpringBootConfigMapConfigurationProperties.class, SpringBootPropertiesConfigurationProperties.class
    })
    public class SpringBootConfiguration implements EnvironmentAware {
    	//此处为spring读取的sharding.jdbc.config配置.
    	private final SpringBootConfigMapConfigurationProperties configMapProperties;
    	@Bean
        public DataSource dataSource() throws SQLException {
        	// ------------------以下为手动添加的部分------------------
            if (!configMapProperties.getConfigMap().isEmpty()) {
                ConfigMapContext.getInstance().getConfigMap().putAll(configMapProperties.getConfigMap());
            }
            // ------------------以上为手动添加的部分------------------
    		//shardingProperties.getShardingRuleConfiguration()会对ShardingAlgorithm进行实例化,此时configMapContetx尚未初始化.
            return null == masterSlaveProperties.getMasterDataSourceName()
                    ? ShardingDataSourceFactory
                    .createDataSource(dataSourceMap, shardingProperties.getShardingRuleConfiguration(), configMapProperties.getConfigMap(), propMapProperties.getProps())
                    : MasterSlaveDataSourceFactory.createDataSource(
                    dataSourceMap, masterSlaveProperties.getMasterSlaveRuleConfiguration(), configMapProperties.getConfigMap(), propMapProperties.getProps());
        }
    }
    /**
     *下面是ShardingDataSource的构造函数
     */
    public class ShardingDataSource extends AbstractDataSourceAdapter {
    	public ShardingDataSource(final Map<String, DataSource> dataSourceMap, final ShardingRule shardingRule, final Map<String, Object> configMap, final Properties props) throws SQLException {
            super(dataSourceMap);
            checkDataSourceType(dataSourceMap);
            //原来是在此处进行configMapContext的初始化.
            if (!configMap.isEmpty()) {
                ConfigMapContext.getInstance().getConfigMap().putAll(configMap);
            }
            shardingContext = new ShardingContext(getDataSourceMap(), shardingRule, getDatabaseType(), props);
        }
    }
    

    解释:

    原本ConfigMapContext读取"sharding.jdbc.config"的配置是在ShradingDatasource初始化的时候注入的.但是ShardingDatasource初始化的时候先进行了ShardingRule的初始化(此时进行了ShardingAlgorithm的实例化),故在ShardingAlgorithm中初始化获取ConfigMapContext的时候还没有获取到配置文件中的配置.

    莫名的NullPointerException

    • 总的来说
      执行报错,抛空指针异常,发现真实异常被finally代码块中的异常覆盖了.然后改代码放出异常以便排查真正的报错原因.
    • 具体来说
      追踪下去发现了ShardingPreparedStatement中sqlRoute()方法抛了异常,routeResult就会为NULL,结果在finally的时候又抛了一次异常覆盖掉了前一次的异常,导致看到的都是nullPointerException
      病源:
    @Override
    public boolean execute() throws SQLException {
        try {
            clearPrevious();
            //抛出异常
            sqlRoute();
            initPreparedStatementExecutor();
            return preparedStatementExecutor.execute();
        } finally {
            //若sqlRoute()存在异常,则routeResult的值为Null.便会再抛一次NullpointerException
            refreshTableMetaData(connection.getShardingContext(), routeResult.getSqlStatement());
            clearBatch();
        }
    }
    
    private void sqlRoute() {
        routeResult = routingEngine.route(new ArrayList<>(getParameters()));
    }
    

    临时处理方案: 由于我们只有查询(query)语句,故直接在这里加上一个非空判断.

    if (routeResult != null) {
        refreshTableMetaData(connection.getShardingContext(), routeResult.getSqlStatement());
    }
    

    refreshTableMetaData()方法只有在如下几种场景时才会有作用,其中不包含我的查询场景.故直接跳过,具体作用没看

    protected final void refreshTableMetaData(final ShardingContext shardingContext, final SQLStatement sqlStatement) {
        if (sqlStatement instanceof CreateTableStatement) {
            refreshTableMetaData(shardingContext, (CreateTableStatement) sqlStatement);
        } else if (sqlStatement instanceof AlterTableStatement) {
            refreshTableMetaData(shardingContext, (AlterTableStatement) sqlStatement);
        } else if (sqlStatement instanceof DropTableStatement) {
            refreshTableMetaData(shardingContext, (DropTableStatement) sqlStatement);
        }
    }
    

    基于暗示(Hint)的路由,表级别的分片策略不生效,只走默认分片策略.

    • 总的来说
      仔细看官方文档.忘了就多看看.
      addDatabaseShardingValue不会跳过SQL解析引擎,setDatabaseShardingValue则会并且从此无视为单表配置好的分片策略,只会走默认策略
    • 具体来说
      一开始没有仔细看官方文档,导致我一直不明白为什么给表配置了shardingAlgorithm却走不进去.
      下面是官方文档内容:

    添加分片键值

    • 使用hintManager.addDatabaseShardingValue来添加数据源分片键值。
    • 使用hintManager.addTableShardingValue来添加表分片键值。

    分库不分表情况下,强制路由至某一个分库时,可使用hintManager.setDatabaseShardingValue方式添加分片。通过此方式添加分片键值后,将跳过SQL解析和改写阶段,从而提高整体执行效率。


    若是想分片逻辑精确控制到表单位得使用HintManager.getInstance().addDatabaseShardingValue()而不是setDatabaseShardingValue(),而我没仔细看,就踩了进去.

    若使用了setDatabaseShardingValue,
    将会使用DatabaseHintSQLRouter路由器并跳过sql解析直接指定目标库(兼容性最好,可以无视sharding-jdbc解析引擎不支持的sql语法报错,比如CTE这种解析引擎无法识别的语法再或者UNION这种解析引擎主动过滤的语法(后者可以强制开启,见三))

    public final class PreparedStatementRoutingEngine {
    
        public PreparedStatementRoutingEngine(final String logicSQL, final ShardingRule shardingRule, final ShardingMetaData shardingMetaData, final DatabaseType databaseType, final boolean showSQL) {
            this.logicSQL = logicSQL;
            //创建路由实例, 当使用了HintManager的setDatabaseShardingValue时,此处会生成DatabaseHintSQLRouter并跳过sql解析.
            //普通情况下,使用ParsingSQLRouter路由并解析sql只有解析了sql才知道表名,才能从配置中找到表对应的逻辑.
            shardingRouter = ShardingRouterFactory.newInstance(shardingRule, shardingMetaData, databaseType, showSQL);
            masterSlaveRouter = new ShardingMasterSlaveRouter(shardingRule.getMasterSlaveRules());
        }
        //路由入口,由shardingRouter.route进行路由. shardingRouter的生成逻辑见上
        public SQLRouteResult route(final List<Object> parameters) {
            if (null == sqlStatement) {
                sqlStatement = shardingRouter.parse(logicSQL, true);
            }
            return masterSlaveRouter.route(shardingRouter.route(logicSQL, parameters, sqlStatement));
        }
    }
    

    之所以使用setDatabaseShardingValue会使用DatabaseHintSQLRouter是因为set中databaseShardingOnly = true了

    public void setDatabaseShardingValue(final Comparable<?> value) {
        databaseShardingValues.clear();
        addDatabaseShardingValue(HintManagerHolder.DB_TABLE_NAME, value);
        databaseShardingOnly = true;
    }
    

    而ShardingRouterFactory创建实例的时候又会根据这个databaseShardingOnly字段是否为true来决定创建哪一个ShardingRouter.见下:

    public static ShardingRouter newInstance(final ShardingRule shardingRule, final ShardingMetaData shardingMetaData, final DatabaseType databaseType, final boolean showSQL) {
        return HintManagerHolder.isDatabaseShardingOnly() ? new DatabaseHintSQLRouter(shardingRule, showSQL) : new ParsingSQLRouter(shardingRule, shardingMetaData, databaseType, showSQL);
    }
    

    sharding-jdbc不支持union语句,强制放行.

    • 总的来说
      sharding-jdbc不让union语句执行,如果是项目初期可以考虑规避这个语法(强改也可以),如果是后来加入的又不想改SQL那么就很有必要改代码强制放行了,但会引入新的疑惑:1.会不会对SQL解析造成影响,2.出现多表查询时的疑惑(见具体最后的推测).
    • 具体来说
      刚加入sharidng-jdbc时就报了不支持UNION的错误,从逻辑上推理可能性大概就两种:一种是其使用的SQL解析引擎不支持UNION语法.另一种就是出于一些业务上的思考使其拒绝接受UNION语法.而前者我认为高概率不太可能,所以在领导的建议下我决定去看后者.
      最终一路追踪下去,可以看到UNION是硬编码写死的不支持,于是我强制去掉了DefaultKeyword.UNION
    public abstract class SelectRestClauseParser implements SQLClauseParser {
        private final LexerEngine lexerEngine;
        /**
         * Parse select rest.
         */
        public final void parse() {
            Collection<Keyword> unsupportedRestKeywords = new LinkedList<>();
            //可以强制去掉UNION
            unsupportedRestKeywords.addAll(Arrays.asList(DefaultKeyword.UNION, DefaultKeyword.INTERSECT, DefaultKeyword.EXCEPT, DefaultKeyword.MINUS));
            unsupportedRestKeywords.addAll(Arrays.asList(getUnsupportedKeywordsRest()));
            lexerEngine.unsupportedIfEqual(unsupportedRestKeywords.toArray(new Keyword[unsupportedRestKeywords.size()]));
        }
        protected abstract Keyword[] getUnsupportedKeywordsRest();
    }
    

    我推测是sharding-jdbc不想(也不该)做多个库的结果集合并时的去重逻辑,故直接ban掉了这个语法.比如下面这个sql路由到了多个库,然后在多个库执行完返回并聚合的时候,将会有两个2333 :
    select 2333 union select xxxx from xxxx

    addDatabaseShardingValue需要填写表名,我不想填.

    • 总的来说
      我想统一调用addDatabaseShardingValue()接口,不用再每个持久层调用处加上,但是这个API又需要写入表名.所以我就改了下代码不用填表名了.
    • 具体来说
      我用HintManager的addDatabaseShardingValue((String logicTable,Comparable<?> value) 来进行分库又不想填表名,因为我不是在每个持久层调用前手写加的addDatabaseShardingValue的,而是统一拦截了SQL的查询参数并add进去的(具体见下注),故此时没有获取到表名(可以获取SQL自己解析),而若想进入表的hint分片策略,则需要为此表add过shardingValue才行,故需要改造来适配我的需求.

    具体点说: 用的mybatis interceptor拦截Executor接口,因为sharding-jdbc的路由在prepareStatement创建时便决定好了,没办法通过拦截Executor更下层的接口add了.

    我就直接使用一个常量作为这个addDatabaseShardingValue的表名参数(logicTable),这里我使用的是HintManagerHolder.DB_TABLE_NAME.
    如果不作任何修改,那么结果将是不会进入我配置的表分片逻辑class里,而是将SQL在表所配的所有数据节点里跑一次.

    sharding.jdbc.config.sharding.tables.表名.database-strategy.hint.algorithm-class-name=分片逻辑class
    

    为了让其适配这种使用方式,
    追踪源码,一路找到了RoutingEngineFactory路由引擎工厂.
    ParsingSQLRouter通过这个引擎工厂创造RouteEngine然后执行其Route()方法决定路由目标库.
    我们为表配置了路由策略,工厂会为我们生产StandardRoutingEngine来为我们分析得出目标数据节点.

    //若有为表配置路由策略且查的是单表,那么将会使用StandardRoutingEngine. 若是没有配置过的表,使用DefaultDatabaseRoutingEngine
    RoutingResult routingResult = RoutingEngineFactory.newInstance(shardingRule, shardingMetaData.getDataSource(), sqlStatement, shardingConditions).route();
    

    接下来看StandardRoutingEngine.route()方法:

    public final class StandardRoutingEngine implements RoutingEngine {
    
        private final ShardingRule shardingRule;
        private final String logicTableName;
    
        @Override
        public RoutingResult route() {
            //重点是getDataNodes()方法
            return generateRoutingResult(getDataNodes(shardingRule.getTableRuleByLogicTableName(logicTableName)));
        }
    
        private Collection<DataNode> getDataNodes(final TableRule tableRule) {
            //若database-strategy和table-strategy都配置成了HintAlgorithm,则会走这个.
            if (isRoutingByHint(tableRule)) {
                return routeByHint(tableRule);
            }
            if (isRoutingByShardingConditions(tableRule)) {
                return routeByShardingConditions(tableRule);
            }
            //当我只配了其中一种的时候,走的是这个.上面的Condition目前不知道干啥的,没有进入过.
            return routeByMixedConditions(tableRule);
        }
    
        
        //无论routeByHint还是routeByMixedConditions最终都会进入这个routeDataSources方法决定路由库结果.
        private Collection<String> routeDataSources(final TableRule tableRule, final List<ShardingValue> databaseShardingValues) {
            Collection<String> availableTargetDatabases = tableRule.getActualDatasourceNames();
            //此处将会判断databaseShardingValue是否为空(即是否通过HintManager添加了分库值)
            if (databaseShardingValues.isEmpty()) {
                return availableTargetDatabases;
            }
            //此处获取我们的分库逻辑ShardingAlgorithm然后执行我们的doSharding方法.
            Collection<String> result = new LinkedHashSet<>(shardingRule.getDatabaseShardingStrategy(tableRule).doSharding(availableTargetDatabases, databaseShardingValues));
            Preconditions.checkState(!result.isEmpty(), "no database route info");
            return result;
        }
    
        //上述传入的databaseShardingValues就来自于此方法
        private List<ShardingValue> getDatabaseShardingValuesFromHint() {
            //HintManagerHolder中存的是一个Map,其存储了<表名-->对应的add进去的分库值>
            //可以将此处的logicTableName改为我们的HintManagerHolder.DB_TABLE_NAME适配上述的操作
            Optional<ShardingValue> shardingValueOptional = HintManagerHolder.getDatabaseShardingValue(logicTableName);
            return shardingValueOptional.isPresent() ? Collections.singletonList(shardingValueOptional.get()) : Collections.<ShardingValue>emptyList();
        }
    }
    

    最终都会进入routeDataSources再调用我们写好的分库逻辑dosharding进行分库.而此时提供给我们的List<ShardingValue> databaseShardingValues来自于
    getDatabaseShardingValuesFromHint方法(若为空,将会返回所有可用库,即拿着我们的sql去所有库都跑一次).
    因为我们往HintManager中add的值的key是HintManagerHolder.DB_TABLE_NAME,
    所以此处可以将HintManagerHolder.getDatabaseShardingValue(logicTableName)中的logicTableName参数改成我们期望的HintManagerHolder.DB_TABLE_NAME来完成此次需求.

    强制路由(set)时不知道表名是啥.

    • 总的来说
      最后选择了set方式强制路由,跳过SQL解析了.但我想在分片逻辑时知道表名(用于垂直分库)且不用改动原来的那份逻辑.那么答案就只有一个了,动手改吧. 跳过SQL解析就给它加上SQL解析,接口没有表名传递就给加上.

    其实也可以在持久层拦截处通过解析SQL方式获取到表名再作为分片键值传递给分片逻辑接口中.

    • 具体来说
      这个问题得先从我用了HintManager的setDatabaseShardingValue()之后开始讲起.
      起初我是用addDatabaseShardingValue添加分片键值,但是这种方式会走sql解析,不是说sql解析不好,而是我们的项目中存在使用CTE语法的情况,而这个语法不被sharding-jdbc 3.1.0的SQL解析引擎所支持.于是我便改用了setDatabaseShardingValue方式强制路由(PS:所幸我们在sharding-jdbc侧分库不分表(PS:我们分表使用PG自带的分区表,且我们只有查询的场景.)).
      通过此方式添加分片键值后,将跳过SQL解析和改写阶段(即不解析SQL内容,直接根据添加的分片键值判断目标库. 起初只有水平分库,又因为我是拦截mybatis参数的形式将分库字段扔进分库逻辑HintShardingAlgorithm接口中去(默认不带表名参数,所以这里改了原实现),故实现方便,直接在分库逻辑(即shardingAlgorithm接口)中判断分片键值然后给出水平库即可.
      后来又考虑加上垂直分库,其中几张表需要放入其他垂直库中去,最快的方式便是在ShardingAlgorithm里面再加一级判断表名的垂直分库,故我改了原来提供的HintShardingAlgorithm的接口,往里面追加了一各tableName的参数.
    public interface HintShardingAlgorithm extends ShardingAlgorithm {
        //起初的接口
        Collection<String> doSharding(Collection<String> availableTargetNames, ShardingValue shardingValue);
        //修改后的.
        Collection<String> doSharding(Collection<String> availableTargetNames, Collection<String> logicalTableNames, ShardingValue shardingValue);
    }
    

    接口改了,顺带也改了使用到接口的几个相关类.然后重头是这个logicalTableNames的参数从哪儿来. 既然非set方式会进行sql解析,我便也给set方式加上了sql解析,然后从其解析完的sqlStatement中获取表名.

    //路由引擎,负责调用路由器进行SQL解析和路由并返回路由结果
    public final class PreparedStatementRoutingEngine {
        //其他略...    
    
            //当未使用set时是ParsingSQLRouter,反之则是DatabaseHintSQLRouter,后者默认不走强制路由
            private final ShardingRouter shardingRouter;
    
        public SQLRouteResult route(final List<Object> parameters) {
            if (null == sqlStatement) {
                           // parse解析完后返回的sqlStatement中包含表名(如果进行了正确的SQL解析).
                sqlStatement = shardingRouter.parse(logicSQL, true);
            }
            return masterSlaveRouter.route(shardingRouter.route(logicSQL, parameters, sqlStatement));
        }
    }
    //会进行SQL解析的路由器
    public final class ParsingSQLRouter implements ShardingRouter {
        //其他略....
    
        @Override
        public SQLStatement parse(final String logicSQL, final boolean useCache) {
            parsingHook.start(logicSQL);
            try {
                // 此处创建了SQL解析引擎并进行解析返回解析完成的SQLStatement
                SQLStatement result = new SQLParsingEngine(databaseType, logicSQL, shardingRule, shardingMetaData.getTable()).parse(useCache);
                parsingHook.finishSuccess();
                return result;
            } catch (final Exception ex) {
                parsingHook.finishFailure(ex);
                throw ex;
            }
        }
    }
    // 强制路由时使用的路由器
    public final class DatabaseHintSQLRouter implements ShardingRouter {
            //其他略......    
    
            //默认的解析逻辑,强制路由的路由器不会进行SQL解析,返回的sqlStatement中不包含表名.
        @Override
        public SQLStatement parse(final String logicSQL, final boolean useCache) {
            return new SQLJudgeEngine(logicSQL).judge();
        }
        //修改后的路由器解析方法,参照了ParsingSQLRouter解析SQL的方式.
        @Override
        public SQLStatement parse(final String logicSQL, final boolean useCache) {
            try {
                //与ParsingSQLRouter一样的逻辑
                return new SQLParsingEngine(databaseType, logicSQL, shardingRule, shardingMetaData.getTable()).parse(useCache);
            } catch (final Exception ex) {
                //留下了解析不了SQL就继续用原来的策略,为了应对解析不了的语法比如CTE.
                return new SQLJudgeEngine(logicSQL).judge();
            }
        }
    }
    

    都改完了,看起来都完善了之后又遇到了一个问题. 那便是实际跑起来,并没有拿到表名.
    无论我怎么跑,sqlStatement中的table永远都是空的.
    一路debug追踪SQL解析引擎发现了猫腻

    String tableName = SQLUtil.getExactlyValue(literals);
    if (Strings.isNullOrEmpty(tableName)) {
        return;
    }
    //1.是否单表(但是这个值从入口看发现是写死的false,原因不明)
    //2.是否有为此表配置分库分表逻辑(我走统一的强制路由,不进行单表逻辑配置)
    //3.是否广播表
    //4.绑定表是否有分库逻辑.
    //5.可用数据源(统一的分库逻辑,故此处是全部数据源)中是否包含默认数据源.
    if (isSingleTableOnly || shardingRule.findTableRuleByLogicTable(tableName).isPresent()
            || shardingRule.isBroadcastTable(tableName) || shardingRule.findBindingTableRule(tableName).isPresent()
            || shardingRule.getShardingDataSourceNames().getDataSourceNames().contains(shardingRule.getShardingDataSourceNames().getDefaultDataSourceName())) {
        sqlStatement.addSQLToken(new TableToken(beginPosition, skippedSchemaNameLength, literals));
        sqlStatement.getTables().add(new Table(tableName, aliasExpressionParser.parseTableAlias(sqlStatement, true, tableName)));
    } else {
        aliasExpressionParser.parseTableAlias();
    }
    

    追踪到这段代码的时候,表名实际上是解析获取成功了,但是下面的判断最终导致了tableName没有被添加进sqlStatement中去.
    而这当中,我最快能实现的便是添加上如下配置

    sharding.jdbc.config.sharding.default-data-source-name=默认数据源名
    

    最后终于可以愉快的在HintShardingAlgorithm中使用表名了.

  • 相关阅读:
    Flink集群模式部署及案例执行
    Solr查询解析及内核剖析
    Spark Streaming流计算核心概念
    Kaldi语音识别CVTE模型实战
    Kaldi基础代码库及建模
    Kaldi样例实战
    Solr文本分析剖析【文本分析、分词器详解、自定义文本分析字段及分词器】
    Flink场景分析与比较【事件驱动、数据分析、数据管道】
    什么是Apache Flink实时流计算框架?
    基于Tesseract实现图片文字识别
  • 原文地址:https://www.cnblogs.com/jiangxiewei/p/12970952.html
Copyright © 2011-2022 走看看