zoukankan      html  css  js  c++  java
  • 数据库路由中间件MyCat

    此文已由作者张镐薪授权网易云社区发布。

    欢迎访问网易云社区,了解更多网易技术产品运营经验。

    5. 路由模块

    真正取得RouteResultset的步骤:AbstractRouteStrategy的route方法:这里写图片描述对应源代码:

    public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema, int sqlType, String origSQL,
                String charset, ServerConnection sc, LayerCachePool cachePool) throws SQLNonTransientException {    /**
         * 处理一些路由之前的逻辑
         * 全局序列号,父子表插入
         */
        if ( beforeRouteProcess(schema, sqlType, origSQL, sc) )        return null;    /**
         * SQL 语句拦截
         */
        String stmt = MycatServer.getInstance().getSqlInterceptor().interceptSQL(origSQL, sqlType);    if (origSQL != stmt && LOGGER.isDebugEnabled()) {
            LOGGER.debug("sql intercepted to " + stmt + " from " + origSQL);
        }    //对应schema标签checkSQLschema属性,把表示schema的字符去掉
        if (schema.isCheckSQLSchema()) {
            stmt = RouterUtil.removeSchema(stmt, schema.getName());
        }
    
        RouteResultset rrs = new RouteResultset(stmt, sqlType);    /**
         * 优化debug loaddata输出cache的日志会极大降低性能
         */
        if (LOGGER.isDebugEnabled() && origSQL.startsWith(LoadData.loadDataHint)) {
            rrs.setCacheAble(false);
        }       /**
            * rrs携带ServerConnection的autocommit状态用于在sql解析的时候遇到
            * select ... for update的时候动态设定RouteResultsetNode的canRunInReadDB属性
            */
        if (sc != null ) {
            rrs.setAutocommit(sc.isAutocommit());
        }    /**
         * DDL 语句的路由
         */
        if (ServerParse.DDL == sqlType) {        return RouterUtil.routeToDDLNode(rrs, sqlType, stmt, schema);
        }    /**
         * 检查是否有分片
         */
        if (schema.isNoSharding() && ServerParse.SHOW != sqlType) {
            rrs = RouterUtil.routeToSingleNode(rrs, schema.getDataNode(), stmt);
        } else {
            RouteResultset returnedSet = routeSystemInfo(schema, sqlType, stmt, rrs);        if (returnedSet == null) {
                rrs = routeNormalSqlWithAST(schema, stmt, rrs, charset, cachePool);
            }
        }    return rrs;
    }


    5.3 路由之前的逻辑 - 判断子表插入以及全局序列号的生成:


    AbstractRouteStrategy.java

    /**
     * 路由之前必要的处理
     * 主要是全局序列号插入,还有子表插入
     */private boolean beforeRouteProcess(SchemaConfig schema, int sqlType, String origSQL, ServerConnection sc)
            throws SQLNonTransientException {    return RouterUtil.processWithMycatSeq(schema, sqlType, origSQL, sc)
                || (sqlType == ServerParse.INSERT && RouterUtil.processERChildTable(schema, origSQL, sc))
                || (sqlType == ServerParse.INSERT && RouterUtil.processInsert(schema, sqlType, origSQL, sc));
    }


    这里利用了Java的一个特性,||表达式,前半部分如果为真,则后半部分不会被执行。首先执行RouterUtil.processWithMycatSeq(schema, sqlType, origSQL, sc),这个方法是判断是否是显示使用全局序列号的sql语句,比如像:insert into table1(id,name) values(next value for MYCATSEQ_GLOBAL,‘test’);对于这样的语句处理是先将改写next value for MYCATSEQ_GLOBAL 为调用全局ID生成的ID,之后进入AST语句解析路由

    如果不是,则执行(sqlType == ServerParse.INSERT && RouterUtil.processERChildTable(schema, origSQL, sc)),这个方法判断是否是子表插入:部分代码:

    String tableName = StringUtil.getTableName(origSQL).toUpperCase();final TableConfig tc = schema.getTables().get(tableName);//判断是否为子表,如果不是,只会返回falseif (null != tc && tc.isChildTable()) {final RouteResultset rrs = new RouteResultset(origSQL, ServerParse.INSERT);
    String joinKey = tc.getJoinKey();//因为是Insert语句,用MySqlInsertStatement进行parseMySqlInsertStatement insertStmt = (MySqlInsertStatement) (new MySqlStatementParser(origSQL)).parseInsert();
    ......


    这里注意,所有类型的SQL语句都有druid对应的SQLparser,比如说这里的插入语句就用MySqlInsertStatement解析。druidparser在这节先不讲,会在 AST语义解析路由中详细讲述。

    接上面代码:

    //判断条件完整性,取得解析后语句列中的joinkey列的index
        int joinKeyIndex = getJoinKeyIndex(insertStmt.getColumns(), joinKey);    if (joinKeyIndex == -1) {
            String inf = "joinKey not provided :" + tc.getJoinKey() + "," + insertStmt;
            LOGGER.warn(inf);        throw new SQLNonTransientException(inf);
        }    //子表不支持批量插入
        if (isMultiInsert(insertStmt)) {
            String msg = "ChildTable multi insert not provided";
            LOGGER.warn(msg);        throw new SQLNonTransientException(msg);
        }    //取得joinkey的值
        String joinKeyVal = insertStmt.getValues().getValues().get(joinKeyIndex).toString();
    
        String sql = insertStmt.toString();    // try to route by ER parent partion key
        //如果是二级子表(父表不再有父表),并且分片字段正好是joinkey字段,调用routeByERParentKey
        RouteResultset theRrs = RouterUtil.routeByERParentKey(sc, schema, ServerParse.INSERT, sql, rrs, tc, joinKeyVal);    if (theRrs != null) {        boolean processedInsert=false;        //判断是否需要全局序列号
                  if ( sc!=null && tc.isAutoIncrement()) {
                      String primaryKey = tc.getPrimaryKey();
                      processedInsert=processInsert(sc,schema,ServerParse.INSERT,sql,tc.getName(),primaryKey);
                  }              if(processedInsert==false){
                      rrs.setFinishedRoute(true);
                      sc.getSession2().execute(rrs, ServerParse.INSERT);
                  }        return true;
        }    // route by sql query root parent's datanode
        //如果不是二级子表或者分片字段不是joinKey字段结果为空,则启动异步线程去后台分片查询出datanode
        //只要查询出上一级表的parentkey字段的对应值在哪个分片即可
        final String findRootTBSql = tc.getLocateRTableKeySql().toLowerCase() + joinKeyVal;    if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("find root parent's node sql " + findRootTBSql);
        }
    
        ListenableFuture<String> listenableFuture = MycatServer.getInstance().
                getListeningExecutorService().submit(new Callable<String>() {        @Override
            public String call() throws Exception {
                FetchStoreNodeOfChildTableHandler fetchHandler = new FetchStoreNodeOfChildTableHandler();            return fetchHandler.execute(schema.getName(), findRootTBSql, tc.getRootParent().getDataNodes());
            }
        });
    
    
        Futures.addCallback(listenableFuture, new FutureCallback<String>() {        @Override
            public void onSuccess(String result) {            //结果为空,证明上一级表中不存在那条记录,失败
                if (Strings.isNullOrEmpty(result)) {
                    StringBuilder s = new StringBuilder();
                    LOGGER.warn(s.append(sc.getSession2()).append(origSQL).toString() +                        " err:" + "can't find (root) parent sharding node for sql:" + origSQL);
                    sc.writeErrMessage(ErrorCode.ER_PARSE_ERROR, "can't find (root) parent sharding node for sql:" + origSQL);                return;
                }            if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("found partion node for child table to insert " + result + " sql :" + origSQL);
                }            //找到分片,进行插入(和其他的一样,需要判断是否需要全局自增ID)
                boolean processedInsert=false;                  if ( sc!=null && tc.isAutoIncrement()) {                      try {
                              String primaryKey = tc.getPrimaryKey();
                        processedInsert=processInsert(sc,schema,ServerParse.INSERT,origSQL,tc.getName(),primaryKey);
                    } catch (SQLNonTransientException e) {
                        LOGGER.warn("sequence processInsert error,",e);
                        sc.writeErrMessage(ErrorCode.ER_PARSE_ERROR , "sequence processInsert error," + e.getMessage());
                    }
                      }                  if(processedInsert==false){
                          RouteResultset executeRrs = RouterUtil.routeToSingleNode(rrs, result, origSQL);
                          sc.getSession2().execute(executeRrs, ServerParse.INSERT);
                      }
    
            }        @Override
            public void onFailure(Throwable t) {
                StringBuilder s = new StringBuilder();
                LOGGER.warn(s.append(sc.getSession2()).append(origSQL).toString() +                    " err:" + t.getMessage());
                sc.writeErrMessage(ErrorCode.ER_PARSE_ERROR, t.getMessage() + " " + s.toString());
            }
        }, MycatServer.getInstance().
                getListeningExecutorService());    return true;
    }return false;

    如果返回false,则继续执行(sqlType == ServerParse.INSERT && RouterUtil.processInsert(schema, sqlType, origSQL, sc)) 这个是处理一般的SQL插入语句,将其中的自增主键字段的值改写成内置的全局ID生成器生成的id。RouterUtil.java:

    public static boolean processInsert(SchemaConfig schema, int sqlType,
                                            String origSQL, ServerConnection sc) throws SQLNonTransientException {
        String tableName = StringUtil.getTableName(origSQL).toUpperCase();
        TableConfig tableConfig = schema.getTables().get(tableName);    boolean processedInsert=false;    //判断是有自增字段
        if (null != tableConfig && tableConfig.isAutoIncrement()) {
            String primaryKey = tableConfig.getPrimaryKey();
            processedInsert=processInsert(sc,schema,sqlType,origSQL,tableName,primaryKey);
        }    return processedInsert;
    }


    免费体验云安全(易盾)内容安全、验证码等服务

    更多网易技术、产品、运营经验分享请点击




    相关文章:
    【推荐】 Spring Boot + Mybatis 多数据源配置实现读写分离
    【推荐】 Puppeteer入门初探
    【推荐】 敏捷初体验--没有最好只有最合适

  • 相关阅读:
    Django_jinja2
    css画太极
    python 自己实现map
    python 比赛 组合问题
    python 找素数
    如何快速掌握一门新技术/语言/框架
    jQuery常用事件-思维导图
    jQuery常用函数-思维导图
    jQuery选择器汇总-思维导图
    3.git版本控制-管理修改、撤销、删除
  • 原文地址:https://www.cnblogs.com/163yun/p/9895352.html
Copyright © 2011-2022 走看看