zoukankan      html  css  js  c++  java
  • ShardingSphere~8

    SQL简要流程

    调用链:ShardingSphereDataSource -> ShardingSphereConnection -> ShardingSphereStatement/ShardingSpherePreparedStatement,下面以 executeQuery 查询为例

        /**
         * Runs the given SQL as a query and returns a result set over the merged results.
         *
         * <p>The steps are: build the execution context for the SQL, execute it, merge the
         * resulting query results, and wrap them in a {@code ShardingSphereResultSet}.
         * {@code currentResultSet} is reset to {@code null} once the attempt ends (whether or
         * not it failed) and is assigned the new result set only on success.
         *
         * @param sql SQL text to execute; must be neither null nor empty
         * @return result set backed by the merged query results
         * @throws SQLException if {@code sql} is null or empty, or if a step raises one
         */
        @Override
        public ResultSet executeQuery(final String sql) throws SQLException {
            if (Strings.isNullOrEmpty(sql)) {
                throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
            }
            ResultSet mergedResultSet;
            try {
                executionContext = createExecutionContext(sql);
                MergedResult merged = mergeQuery(executeQuery0());
                mergedResultSet = new ShardingSphereResultSet(getResultSetsForShardingSphereResultSet(), merged, this, executionContext);
            } finally {
                // Always drop the previous result set reference, even when the query fails.
                currentResultSet = null;
            }
            currentResultSet = mergedResultSet;
            return mergedResultSet;
        }

    创建执行单元

        /**
         * Builds the execution context for a SQL string.
         *
         * <p>Clears the statements held so far, turns the SQL into a {@code LogicSQL}, runs it
         * through {@code SQLCheckEngine.check}, and finally delegates to the kernel processor.
         *
         * @param sql SQL text to process
         * @return execution context produced by the kernel processor
         * @throws SQLException if any of the invoked steps raises one
         */
        private ExecutionContext createExecutionContext(final String sql) throws SQLException {
            clearStatements();
            LogicSQL parsedSQL = createLogicSQL(sql);
            SQLCheckEngine.check(
                    parsedSQL.getSqlStatementContext().getSqlStatement(),
                    parsedSQL.getParameters(),
                    metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules(),
                    DefaultSchema.LOGIC_NAME,
                    metaDataContexts.getMetaDataMap(),
                    null);
            ExecutionContext context = kernelProcessor.generateExecutionContext(
                    parsedSQL, metaDataContexts.getDefaultMetaData(), metaDataContexts.getProps());
            return context;
        }

    路由、改写、生成执行上下文

        /**
         * Generates the execution context for a logic SQL.
         *
         * <p>Routes the SQL first, rewrites it using the route context, assembles the execution
         * context from both, and passes the outcome to {@code logSQL} before returning it.
         *
         * @param logicSQL logic SQL to process
         * @param metaData meta data used for routing, rewriting and context creation
         * @param props configuration properties
         * @return generated execution context
         */
        public ExecutionContext generateExecutionContext(final LogicSQL logicSQL, final ShardingSphereMetaData metaData, final ConfigurationProperties props) {
            RouteContext routed = route(logicSQL, metaData, props);
            // The rewrite is evaluated as the last argument, i.e. still after routing has completed.
            ExecutionContext context = createExecutionContext(logicSQL, metaData, routed, rewrite(logicSQL, metaData, props, routed));
            logSQL(logicSQL, props, context);
            return context;
        }

    结果合并

        /**
         * Merges the given query results into a single merged result.
         *
         * <p>A {@code MergeEngine} is created from the default meta data (database type, schema,
         * rules) and the configured properties, then applied to the query results together
         * with the SQL statement context of the current execution context.
         *
         * @param queryResults query results to merge
         * @return merged result
         * @throws SQLException if the merge raises one
         */
        private MergedResult mergeQuery(final List<QueryResult> queryResults) throws SQLException {
            ShardingSphereMetaData defaultMetaData = metaDataContexts.getDefaultMetaData();
            return new MergeEngine(
                    metaDataContexts.getDefaultMetaData().getResource().getDatabaseType(),
                    defaultMetaData.getSchema(),
                    metaDataContexts.getProps(),
                    defaultMetaData.getRuleMetaData().getRules())
                    .merge(queryResults, executionContext.getSqlStatementContext());
        }
        /**
         * Picks the merged result implementation for a select statement.
         *
         * <p>Group-by processing takes precedence. A distinct-row query is handled as group-by
         * after {@code setGroupByForDistinctRow} has been applied. Otherwise an order-by stream
         * merge is used when ordering is needed, and a plain iterator stream merge when it is not.
         *
         * @param queryResults query results to merge
         * @param selectStatementContext select statement context
         * @param columnLabelIndexMap column label to index map
         * @param schema schema
         * @return merged result
         * @throws SQLException if creating the merged result raises one
         */
        private MergedResult build(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext,
                                   final Map<String, Integer> columnLabelIndexMap, final ShardingSphereSchema schema) throws SQLException {
            boolean mergeAsGroupBy = isNeedProcessGroupBy(selectStatementContext);
            // The distinct-row check only runs when the statement is not already a group-by query.
            if (!mergeAsGroupBy && isNeedProcessDistinctRow(selectStatementContext)) {
                setGroupByForDistinctRow(selectStatementContext);
                mergeAsGroupBy = true;
            }
            if (mergeAsGroupBy) {
                return getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schema);
            }
            if (!isNeedProcessOrderBy(selectStatementContext)) {
                return new IteratorStreamMergedResult(queryResults);
            }
            return new OrderByStreamMergedResult(queryResults, selectStatementContext, schema);
        }

    结果合并有两种方式:连接数受限时,先把各数据源的结果读入内存再合并(内存合并);内存受限时,可以使用流式合并,即通过各数据源连接的结果集迭代器以游标方式逐行遍历,并结合优先级队列完成排序、分组等

    GroupByStreamMergedResult、GroupByMemoryMergedResult

    /**
     * Memory merged result for group-by queries.
     *
     * <p>Every row of every query result is read eagerly in {@code init}, bucketed by its
     * group-by values, aggregated per bucket, and finally sorted with {@code GroupByRowComparator}.
     */
    public final class GroupByMemoryMergedResult extends MemoryMergedResult<ShardingRule> {
        
        /**
         * Creates the merged result; the first argument handed to the parent constructor is {@code null}.
         *
         * @param queryResults query results to merge
         * @param selectStatementContext select statement context
         * @param schema schema
         * @throws SQLException if the parent constructor raises one
         */
        public GroupByMemoryMergedResult(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final ShardingSphereSchema schema) throws SQLException {
            super(null, schema, selectStatementContext, queryResults);
        }
        
        /**
         * Reads all rows, groups and aggregates them, and returns the sorted in-memory rows.
         *
         * <p>{@code shardingRule} and {@code mergedResult} are not used by this implementation.
         *
         * @param shardingRule sharding rule (unused here)
         * @param schema schema used to look up column case sensitivity
         * @param sqlStatementContext statement context; cast to {@code SelectStatementContext}
         * @param queryResults query results to consume
         * @param mergedResult merged result (unused here)
         * @return one memory row per distinct group-by value, sorted; or a single placeholder row when no row was read
         * @throws SQLException if reading a query result raises one
         */
        @Override
        protected List<MemoryQueryResultRow> init(final ShardingRule shardingRule, final ShardingSphereSchema schema, 
                                                  final SQLStatementContext sqlStatementContext, final List<QueryResult> queryResults, final MergedResult mergedResult) throws SQLException {
            SelectStatementContext selectStatementContext = (SelectStatementContext) sqlStatementContext;
            Map<GroupByValue, MemoryQueryResultRow> dataMap = new HashMap<>(1024);
            Map<GroupByValue, Map<AggregationProjection, AggregationUnit>> aggregationMap = new HashMap<>(1024);
            for (QueryResult each : queryResults) {
                while (each.next()) {
                    GroupByValue groupByValue = new GroupByValue(each, selectStatementContext.getGroupByContext().getItems());
                    initForFirstGroupByValue(selectStatementContext, each, groupByValue, dataMap, aggregationMap);
                    aggregate(selectStatementContext, each, groupByValue, aggregationMap);
                }
            }
            setAggregationValueToMemoryRow(selectStatementContext, dataMap, aggregationMap);
            List<Boolean> valueCaseSensitive = queryResults.isEmpty() ? Collections.emptyList() : getValueCaseSensitive(queryResults.iterator().next(), selectStatementContext, schema);
            return getMemoryResultSetRows(selectStatementContext, dataMap, valueCaseSensitive);
        }
        
        /**
         * Registers a group-by value the first time it is seen: stores the current row as the
         * group's representative row and creates one aggregation unit per aggregation projection.
         */
        private void initForFirstGroupByValue(final SelectStatementContext selectStatementContext, final QueryResult queryResult,
                                              final GroupByValue groupByValue, final Map<GroupByValue, MemoryQueryResultRow> dataMap,
                                              final Map<GroupByValue, Map<AggregationProjection, AggregationUnit>> aggregationMap) throws SQLException {
            if (!dataMap.containsKey(groupByValue)) {
                dataMap.put(groupByValue, new MemoryQueryResultRow(queryResult));
            }
            if (!aggregationMap.containsKey(groupByValue)) {
                Map<AggregationProjection, AggregationUnit> map = Maps.toMap(selectStatementContext.getProjectionsContext().getAggregationProjections(), 
                    input -> AggregationUnitFactory.create(input.getType(), input instanceof AggregationDistinctProjection));
                aggregationMap.put(groupByValue, map);
            }
        }
        
        /**
         * Merges the current row's aggregation values into the units of its group.
         *
         * <p>When a projection has derived aggregation projections, the derived projections'
         * values are merged instead of the projection's own value.
         */
        private void aggregate(final SelectStatementContext selectStatementContext, final QueryResult queryResult,
                               final GroupByValue groupByValue, final Map<GroupByValue, Map<AggregationProjection, AggregationUnit>> aggregationMap) throws SQLException {
            for (AggregationProjection each : selectStatementContext.getProjectionsContext().getAggregationProjections()) {
                List<Comparable<?>> values = new ArrayList<>(2);
                if (each.getDerivedAggregationProjections().isEmpty()) {
                    values.add(getAggregationValue(queryResult, each));
                } else {
                    for (AggregationProjection derived : each.getDerivedAggregationProjections()) {
                        values.add(getAggregationValue(queryResult, derived));
                    }
                }
                aggregationMap.get(groupByValue).get(each).merge(values);
            }
        }
        
        /**
         * Reads the value at the projection's index from the current row.
         *
         * @throws IllegalStateException if the value is non-null and not {@code Comparable}
         */
        private Comparable<?> getAggregationValue(final QueryResult queryResult, final AggregationProjection aggregationProjection) throws SQLException {
            Object result = queryResult.getValue(aggregationProjection.getIndex(), Object.class);
            Preconditions.checkState(null == result || result instanceof Comparable, "Aggregation value must implements Comparable");
            return (Comparable<?>) result;
        }
        
        /**
         * Writes each group's final aggregation results into the group's representative row.
         */
        private void setAggregationValueToMemoryRow(final SelectStatementContext selectStatementContext, 
                                                    final Map<GroupByValue, MemoryQueryResultRow> dataMap, final Map<GroupByValue, Map<AggregationProjection, AggregationUnit>> aggregationMap) {
            for (Entry<GroupByValue, MemoryQueryResultRow> entry : dataMap.entrySet()) {
                for (AggregationProjection each : selectStatementContext.getProjectionsContext().getAggregationProjections()) {
                    entry.getValue().setCell(each.getIndex(), aggregationMap.get(entry.getKey()).get(each).getResult());
                }
            }
        }
        
        /**
         * Collects a case-sensitivity flag per result column.
         *
         * <p>The list starts with a {@code false} entry followed by one entry per column;
         * presumably the leading entry aligns list positions with 1-based column indexes (confirm
         * against {@code GroupByRowComparator}).
         */
        private List<Boolean> getValueCaseSensitive(final QueryResult queryResult, final SelectStatementContext selectStatementContext, final ShardingSphereSchema schema) throws SQLException {
            List<Boolean> result = Lists.newArrayList(false);
            for (int columnIndex = 1; columnIndex <= queryResult.getMetaData().getColumnCount(); columnIndex++) {
                result.add(getValueCaseSensitiveFromTables(queryResult, selectStatementContext, schema, columnIndex));
            }
            return result;
        }
        
        /**
         * Looks up whether the column at {@code columnIndex} is case sensitive.
         *
         * <p>The statement's tables are checked in order and the first table that declares a
         * column with the result column's name decides. Tables the schema has no meta data for
         * are skipped instead of failing with a {@code NullPointerException}.
         *
         * @return the column's case-sensitivity flag, or {@code false} when no table declares the column
         */
        private boolean getValueCaseSensitiveFromTables(final QueryResult queryResult, 
                                                        final SelectStatementContext selectStatementContext, final ShardingSphereSchema schema, final int columnIndex) throws SQLException {
            // The column name does not depend on the table being inspected, so resolve it once.
            String columnName = queryResult.getMetaData().getColumnName(columnIndex);
            for (SimpleTableSegment each : selectStatementContext.getAllSimpleTableSegments()) {
                String tableName = each.getTableName().getIdentifier().getValue();
                TableMetaData tableMetaData = schema.get(tableName);
                if (null == tableMetaData) {
                    continue;
                }
                ColumnMetaData columnMetaData = tableMetaData.getColumns().get(columnName);
                if (null != columnMetaData) {
                    return columnMetaData.isCaseSensitive();
                }
            }
            return false;
        }
        
        /**
         * Produces the final row list: the grouped rows sorted by {@code GroupByRowComparator},
         * or a single placeholder row when no group was collected.
         */
        private List<MemoryQueryResultRow> getMemoryResultSetRows(final SelectStatementContext selectStatementContext,
                                                                  final Map<GroupByValue, MemoryQueryResultRow> dataMap, final List<Boolean> valueCaseSensitive) {
            if (dataMap.isEmpty()) {
                Object[] data = generateReturnData(selectStatementContext);
                return Collections.singletonList(new MemoryQueryResultRow(data));
            }
            List<MemoryQueryResultRow> result = new ArrayList<>(dataMap.values());
            result.sort(new GroupByRowComparator(selectStatementContext, valueCaseSensitive));
            return result;
        }
        
        /**
         * Builds the placeholder row for an empty result: {@code COUNT} projections are set to
         * {@code 0}, every other cell stays {@code null}.
         */
        private Object[] generateReturnData(final SelectStatementContext selectStatementContext) {
            // ArrayList gives constant-time indexed access; indexing a LinkedList inside the loop is quadratic.
            List<Projection> projections = new ArrayList<>(selectStatementContext.getProjectionsContext().getProjections());
            Object[] result = new Object[projections.size()];
            for (int i = 0; i < result.length; i++) {
                Projection projection = projections.get(i);
                if (projection instanceof AggregationProjection && AggregationType.COUNT == ((AggregationProjection) projection).getType()) {
                    result[i] = 0;
                }
            }
            return result;
        }
    }
    View Code
    /**
     * Stream merged result for group-by queries.
     *
     * <p>Rows are pulled one at a time through the parent {@code OrderByStreamMergedResult};
     * consecutive rows whose group-by values equal the current group are folded into one output row.
     * NOTE(review): this relies on rows of the same group being adjacent in the parent's
     * iteration order - presumably guaranteed by the parent's ordering; confirm.
     */
    public final class GroupByStreamMergedResult extends OrderByStreamMergedResult {
        
        private final SelectStatementContext selectStatementContext;
        
        // Cells of the output row currently exposed through getValue/getCalendarValue (0-based list, 1-based column index).
        private final List<Object> currentRow;
        
        // Group-by values of the group that the next call to next() will aggregate.
        private List<?> currentGroupByValues;
        
        /**
         * Creates the merged result and captures the group-by values of the first available row, if any.
         *
         * @param labelAndIndexMap column label to index map; only its size is used here, as the initial capacity of the row buffer
         * @param queryResults query results to merge
         * @param selectStatementContext select statement context
         * @param schema schema
         * @throws SQLException if the parent constructor or reading the first row raises one
         */
        public GroupByStreamMergedResult(final Map<String, Integer> labelAndIndexMap, final List<QueryResult> queryResults,
                                         final SelectStatementContext selectStatementContext, final ShardingSphereSchema schema) throws SQLException {
            super(queryResults, selectStatementContext, schema);
            this.selectStatementContext = selectStatementContext;
            currentRow = new ArrayList<>(labelAndIndexMap.size());
            currentGroupByValues = getOrderByValuesQueue().isEmpty()
                    ? Collections.emptyList() : new GroupByValue(getCurrentQueryResult(), selectStatementContext.getGroupByContext().getItems()).getGroupValues();
        }
        
        /**
         * Advances to the next group and materialises its aggregated row into {@code currentRow}.
         *
         * @return {@code false} when the parent's order-by queue is empty, {@code true} otherwise
         * @throws SQLException if reading from the underlying query results raises one
         */
        @Override
        public boolean next() throws SQLException {
            currentRow.clear();
            if (getOrderByValuesQueue().isEmpty()) {
                return false;
            }
            // On the very first call the parent cursor is advanced once before aggregating.
            if (isFirstNext()) {
                super.next();
            }
            // When rows remain after the group, remember the following group's values for the next call.
            if (aggregateCurrentGroupByRowAndNext()) {
                currentGroupByValues = new GroupByValue(getCurrentQueryResult(), selectStatementContext.getGroupByContext().getItems()).getGroupValues();
            }
            return true;
        }
        
        /**
         * Consumes all consecutive rows belonging to the current group, aggregating them and
         * caching the first of them as the output row, then writes the aggregation results into it.
         *
         * @return the result of the last {@code super.next()} call, or {@code false} if the loop body never ran
         */
        private boolean aggregateCurrentGroupByRowAndNext() throws SQLException {
            boolean result = false;
            boolean cachedRow = false;
            // Fresh aggregation units for this group, one per aggregation projection.
            Map<AggregationProjection, AggregationUnit> aggregationUnitMap = Maps.toMap(
                    selectStatementContext.getProjectionsContext().getAggregationProjections(), input -> AggregationUnitFactory.create(input.getType(), input instanceof AggregationDistinctProjection));
            while (currentGroupByValues.equals(new GroupByValue(getCurrentQueryResult(), selectStatementContext.getGroupByContext().getItems()).getGroupValues())) {
                aggregate(aggregationUnitMap);
                // Only the first row of the group supplies the non-aggregated cells.
                if (!cachedRow) {
                    cacheCurrentRow();
                    cachedRow = true;
                }
                result = super.next();
                if (!result) {
                    break;
                }
            }
            setAggregationValueToCurrentRow(aggregationUnitMap);
            return result;
        }
        
        /**
         * Merges the current row's aggregation values into the given units; for a projection with
         * derived aggregation projections, the derived projections' values are merged instead.
         */
        private void aggregate(final Map<AggregationProjection, AggregationUnit> aggregationUnitMap) throws SQLException {
            for (Entry<AggregationProjection, AggregationUnit> entry : aggregationUnitMap.entrySet()) {
                List<Comparable<?>> values = new ArrayList<>(2);
                if (entry.getKey().getDerivedAggregationProjections().isEmpty()) {
                    values.add(getAggregationValue(entry.getKey()));
                } else {
                    for (AggregationProjection each : entry.getKey().getDerivedAggregationProjections()) {
                        values.add(getAggregationValue(each));
                    }
                }
                entry.getValue().merge(values);
            }
        }
        
        /**
         * Appends every column value of the current query result row to {@code currentRow}.
         */
        private void cacheCurrentRow() throws SQLException {
            for (int i = 0; i < getCurrentQueryResult().getMetaData().getColumnCount(); i++) {
                // Query result columns are addressed 1-based, hence i + 1.
                currentRow.add(getCurrentQueryResult().getValue(i + 1, Object.class));
            }
        }
        
        /**
         * Reads the value at the projection's index from the current query result row.
         *
         * @throws IllegalStateException if the value is non-null and not {@code Comparable}
         */
        private Comparable<?> getAggregationValue(final AggregationProjection aggregationProjection) throws SQLException {
            Object result = getCurrentQueryResult().getValue(aggregationProjection.getIndex(), Object.class);
            Preconditions.checkState(null == result || result instanceof Comparable, "Aggregation value must implements Comparable");
            return (Comparable<?>) result;
        }
        
        /**
         * Overwrites the cached row's aggregation cells with the units' final results.
         */
        private void setAggregationValueToCurrentRow(final Map<AggregationProjection, AggregationUnit> aggregationUnitMap) {
            for (Entry<AggregationProjection, AggregationUnit> entry : aggregationUnitMap.entrySet()) {
                // Projection indexes are 1-based while currentRow is a 0-based list.
                currentRow.set(entry.getKey().getIndex() - 1, entry.getValue().getResult());
            }
        }
        
        /**
         * Returns the cached cell at the 1-based {@code columnIndex} and records whether it was null.
         * The {@code type} argument is not used.
         */
        @Override
        public Object getValue(final int columnIndex, final Class<?> type) {
            Object result = currentRow.get(columnIndex - 1);
            setWasNull(null == result);
            return result;
        }
        
        /**
         * Returns the cached cell at the 1-based {@code columnIndex} and records whether it was null.
         * The {@code type} and {@code calendar} arguments are not used.
         */
        @Override
        public Object getCalendarValue(final int columnIndex, final Class<?> type, final Calendar calendar) {
            Object result = currentRow.get(columnIndex - 1);
            setWasNull(null == result);
            return result;
        }
    }
    View Code
  • 相关阅读:
    手动为php安装memcached扩展模块
    Linux下程序包管理工具RPM
    【Hibernate】一级、二级缓冲
    MySQL的用户和权限介绍
    一步一步教你安装openstack
    LVS负载平衡集群(没成型)
    SQL Server等待事件—PAGEIOLATCH_EX
    ORACLE ANALYZE使用小结
    为什么你SQL Server中SQL日期转换出错了呢?
    SQL Server误设置max server memory处理小结
  • 原文地址:https://www.cnblogs.com/it-worker365/p/15000293.html
Copyright © 2011-2022 走看看