zoukankan      html  css  js  c++  java
  • 数据库路由中间件MyCat

    此文已由作者张镐薪授权网易云社区发布。

    欢迎访问网易云社区,了解更多网易技术产品运营经验。


    调用processInsert(sc,schema,sqlType,origSQL,tableName,primaryKey):


    public static boolean processInsert(ServerConnection sc,SchemaConfig schema,            int sqlType,String origSQL,String tableName,String primaryKey) throws SQLNonTransientException {    int firstLeftBracketIndex = origSQL.indexOf("(");    int firstRightBracketIndex = origSQL.indexOf(")");
        String upperSql = origSQL.toUpperCase();    int valuesIndex = upperSql.indexOf("VALUES");    int selectIndex = upperSql.indexOf("SELECT");    int fromIndex = upperSql.indexOf("FROM");    //屏蔽insert into table1 select * from table2语句
        if(firstLeftBracketIndex < 0) {
            String msg = "invalid sql:" + origSQL;
            LOGGER.warn(msg);        throw new SQLNonTransientException(msg);
        }    //屏蔽批量插入
        if(selectIndex > 0 &&fromIndex>0&&selectIndex>firstRightBracketIndex&&valuesIndex<0) {
            String msg = "multi insert not provided" ;
            LOGGER.warn(msg);        throw new SQLNonTransientException(msg);
        }    //插入语句必须提供列结构,因为MyCat默认对于表结构无感知
        if(valuesIndex + "VALUES".length() <= firstLeftBracketIndex) {        throw new SQLSyntaxErrorException("insert must provide ColumnList");
        }    //如果主键不在插入语句的fields中,则需要进一步处理
        boolean processedInsert=!isPKInFields(origSQL,primaryKey,firstLeftBracketIndex,firstRightBracketIndex);    if(processedInsert){
            processInsert(sc,schema,sqlType,origSQL,tableName,primaryKey,firstLeftBracketIndex+1,origSQL.indexOf('(',firstRightBracketIndex)+1);
        }    return processedInsert;
    }复制代码


    对于主键不在插入语句的fields中的SQL,需要改写。比如hotnews主键为id,插入语句为:

    insert into hotnews(title) values('aaa');复制代码

    需要改写成:

    insert into hotnews(id, title) values(next value for MYCATSEQ_hotnews,'aaa');复制代码

    这个在下面这个函数实现:

    private static void processInsert(ServerConnection sc, SchemaConfig schema, int sqlType, String origSQL,
                String tableName, String primaryKey, int afterFirstLeftBracketIndex, int afterLastLeftBracketIndex) {    int primaryKeyLength = primaryKey.length();    int insertSegOffset = afterFirstLeftBracketIndex;
        String mycatSeqPrefix = "next value for MYCATSEQ_";    int mycatSeqPrefixLength = mycatSeqPrefix.length();    int tableNameLength = tableName.length();    char[] newSQLBuf = new char[origSQL.length() + primaryKeyLength + mycatSeqPrefixLength + tableNameLength + 2];
        origSQL.getChars(0, afterFirstLeftBracketIndex, newSQLBuf, 0);
        primaryKey.getChars(0, primaryKeyLength, newSQLBuf, insertSegOffset);
        insertSegOffset += primaryKeyLength;
        newSQLBuf[insertSegOffset] = ',';
        insertSegOffset++;
        origSQL.getChars(afterFirstLeftBracketIndex, afterLastLeftBracketIndex, newSQLBuf, insertSegOffset);
        insertSegOffset += afterLastLeftBracketIndex - afterFirstLeftBracketIndex;
        mycatSeqPrefix.getChars(0, mycatSeqPrefixLength, newSQLBuf, insertSegOffset);
        insertSegOffset += mycatSeqPrefixLength;
        tableName.getChars(0, tableNameLength, newSQLBuf, insertSegOffset);
        insertSegOffset += tableNameLength;
        newSQLBuf[insertSegOffset] = ',';
        insertSegOffset++;
        origSQL.getChars(afterLastLeftBracketIndex, origSQL.length(), newSQLBuf, insertSegOffset);
        processSQL(sc, schema, new String(newSQLBuf), sqlType);
    }复制代码

    最后的processSQL(sc, schema, new String(newSQLBuf), sqlType);是将语句放入执行队列: 这里MyCat考虑NIO线程吞吐量以及全局ID生成线程安全的问题,使用如下流程执行需要全局ID的SQL insert语句。 processSQL(sc, schema, new String(newSQLBuf), sqlType):

    SessionSQLPair sessionSQLPair = new SessionSQLPair(sc.getSession2(), schema, sql, sqlType);
    MycatServer.getInstance().getSequnceProcessor().addNewSql(sessionSQLPair);复制代码


    5.4 DDL语句路由

    可以分为两步,整体源代码:

    public static RouteResultset routeToDDLNode(RouteResultset rrs, int sqlType, String stmt,SchemaConfig schema) throws SQLSyntaxErrorException {
        stmt = getFixedSql(stmt);
        String tablename = "";        
        final String upStmt = stmt.toUpperCase();    if(upStmt.startsWith("CREATE")){        if (upStmt.contains("CREATE INDEX ")){
                tablename = RouterUtil.getTableName(stmt, RouterUtil.getCreateIndexPos(upStmt, 0));
            }else tablename = RouterUtil.getTableName(stmt, RouterUtil.getCreateTablePos(upStmt, 0));
        }else if(upStmt.startsWith("DROP")){        if (upStmt.contains("DROP INDEX ")){
                tablename = RouterUtil.getTableName(stmt, RouterUtil.getDropIndexPos(upStmt, 0));
            }else tablename = RouterUtil.getTableName(stmt, RouterUtil.getDropTablePos(upStmt, 0));
        }else if(upStmt.startsWith("ALTER")){
            tablename = RouterUtil.getTableName(stmt, RouterUtil.getAlterTablePos(upStmt, 0));
        }else if (upStmt.startsWith("TRUNCATE")){
            tablename = RouterUtil.getTableName(stmt, RouterUtil.getTruncateTablePos(upStmt, 0));
        }
        tablename = tablename.toUpperCase();    if (schema.getTables().containsKey(tablename)){        if(ServerParse.DDL==sqlType){
                List<String> dataNodes = new ArrayList<>();
                Map<String, TableConfig> tables = schema.getTables();
                TableConfig tc;            if (tables != null && (tc = tables.get(tablename)) != null) {
                    dataNodes = tc.getDataNodes();
                }
                Iterator<String> iterator1 = dataNodes.iterator();            int nodeSize = dataNodes.size();
                RouteResultsetNode[] nodes = new RouteResultsetNode[nodeSize];            for(int i=0;i<nodeSize;i++){
                    String name = iterator1.next();
                    nodes[i] = new RouteResultsetNode(name, sqlType, stmt);
                }
                rrs.setNodes(nodes);
            }        return rrs;
        }else if(schema.getDataNode()!=null){        //默认节点ddl
            RouteResultsetNode[] nodes = new RouteResultsetNode[1];
            nodes[0] = new RouteResultsetNode(schema.getDataNode(), sqlType, stmt);
            rrs.setNodes(nodes);        return rrs;
        }    //both tablename and defaultnode null
        LOGGER.error("table not in schema----"+tablename);    throw new SQLSyntaxErrorException("op table not in schema----"+tablename);
    }复制代码

    首先,获取表名,步骤如下:

    拿一个获取表名的函数举例:

    /**
     * 获取语句中前关键字位置和占位个数表名位置
     *
     * @param upStmt
     *            执行语句
     * @param start
     *            开始位置
     * @return int[]关键字位置和占位个数
     * @author aStoneGod
     */public static int[] getCreateIndexPos(String upStmt, int start) {
        String token1 = "CREATE ";
        String token2 = " INDEX ";
        String token3 = " ON ";    int createInd = upStmt.indexOf(token1, start);    int idxInd = upStmt.indexOf(token2, start);    int onInd = upStmt.indexOf(token3, start);    // 既包含CREATE又包含INDEX,且CREATE关键字在INDEX关键字之前, 且包含ON...
        if (createInd >= 0 && idxInd > 0 && idxInd > createInd && onInd > 0 && onInd > idxInd) {        return new int[] {onInd , token3.length() };
        } else {        return new int[] { -1, token2.length() };// 不满足条件时,只关注第一个返回值为-1,第二个任意
        }
    }复制代码

    然后,根据表名获取配置进行路由:

    默认语句路由

    对于有默认节点的schema,且不是show, describe, select @@之类的语句,则路由到默认的节点上。 对于show, describe, select @@之类的语句,利用查询信息路由方法算出路由。接下来,取一个举例,对于Show语句:analyseShowSQL(schema, rrs, stmt)方法

    5.5 AST语义解析路由

    首先我们看一下MySQL的SQL解析步骤(硬解析和软解析):MyCat的机制,仿照MySQL的,可以总结为:这里我们可以总结一个优化思路,就是通过仿照MySQL物理优化原理(定时更新表配置,报表信息),来做进一步MyCat查询的优化。语义解析基本过程:

    1.词法分析(一般抽象都叫Lexer):不同的关键词有不同的含义

    select concat(id,'_',name),value from student where value>60 order by value复制代码

    词法分析的输出,就是一句带上词义的语句:

    (select: Keyword)  (concat: Keyword)((: LB)…… (from: keyword) (student: identifier)复制代码

    2.语法分析:

    • 分析关键词之间的联系,生成表达式(expression)

    • 基本语法正确性判断(比如from这个keyword之后必须紧跟一个表名(就是一个identifier))

    3.生成AST语意树(完整解析的statement)根据MyCat权威指南,DruidParser比其他Parser快很多很多。 快在哪里呢?主要是抽象静态化的粒度,拿jsqlparser和druidparser对比。 这两个parser都遵从了上面的步骤,对于词(lexer),表达式(expression)和语句AST(statement)都有抽象。 但是对于语句AST(statement)的抽象, DruidParser做的粒度更细。如下图对于Alter语句的对比:所以,不难推测为啥DruidParser快了


    免费体验云安全(易盾)内容安全、验证码等服务

    更多网易技术、产品、运营经验分享请点击




    相关文章:
    【推荐】 Android 标题栏(1)
    【推荐】 virtualenv简介以及一个比较折腾的scrapy安装方法
    【推荐】 企业建立私有云的N个理由


  • 相关阅读:
    Benchmarking Apache Kafka, Apache Pulsar, and RabbitMQ: Which is the Fastest?
    Kafka实战:集群SSL加密认证和配置(最新版kafka-2.7.0)
    Postgresql 编译安装教程
    CentOS在线和离线安装PostgreSQL
    ubuntu apt-get update连不上dl.google.com解决方法
    ubuntu E: Sub-process /usr/bin/dpkg returned an error code (1)解决办法
    ubuntu apt-get更新出现W: GPG error: http://repo.mysql.com trusty InRelease
    hadoop3.2.2 ERROR: but there is no HDFS_NAMENODE_USER defined. Aborting operation. Starting datanodes
    Hudi on flink v0.7.0 使用遇到的问题及解决办法
    RocksDB in Flink官方答疑:Using RocksDB State Backend in Apache Flink: When and How
  • 原文地址:https://www.cnblogs.com/twodog/p/12135707.html
Copyright © 2011-2022 走看看