zoukankan      html  css  js  c++  java
  • MyCat源码分析系列之——SQL下发

    更多MyCat源码分析,请戳MyCat源码分析系列


     SQL下发

    SQL下发指的是MyCat将解析并改造完成的SQL语句依次发送至相应的MySQL节点(datanode)的过程,该执行过程由NonBlockingSession.execute()触发:

    public void execute(RouteResultset rrs, int type) {
            // clear prev execute resources
            clearHandlesResources();
            if (LOGGER.isDebugEnabled()) {
                StringBuilder s = new StringBuilder();
                LOGGER.debug(s.append(source).append(rrs).toString() + " rrs ");
            }
    
            // 检查路由结果是否为空
            RouteResultsetNode[] nodes = rrs.getNodes();
            if (nodes == null || nodes.length == 0 || nodes[0].getName() == null
                    || nodes[0].getName().equals("")) {
                source.writeErrMessage(ErrorCode.ER_NO_DB_ERROR,
                        "No dataNode found ,please check tables defined in schema:"
                                + source.getSchema());
                return;
            }
            if (nodes.length == 1) {
                singleNodeHandler = new SingleNodeHandler(rrs, this);
                try {
                    singleNodeHandler.execute();
                } catch (Exception e) {
                    LOGGER.warn(new StringBuilder().append(source).append(rrs), e);
                    source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString());
                }
            } else {
                boolean autocommit = source.isAutocommit();
                SystemConfig sysConfig = MycatServer.getInstance().getConfig()
                        .getSystem();
                int mutiNodeLimitType = sysConfig.getMutiNodeLimitType();
                multiNodeHandler = new MultiNodeQueryHandler(type, rrs, autocommit,
                        this);
    
                try {
                    multiNodeHandler.execute();
                } catch (Exception e) {
                    LOGGER.warn(new StringBuilder().append(source).append(rrs), e);
                    source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString());
                }
            }
    }

    从代码中可以看到,首先对于路由节点信息RouteResultsetNode[]进行了判断,如果不存在任何需要派发的节点则直接返回;如果是单节点操作,则创建SingleNodeHandler实例,并调用其execute();如果是多节点操作,则创建MultiNodeQueryHandler实例,并调用其execute()

    下面先来看单节点操作的SQL下发过程,以下是SingleNodeHandler的execute()方法:

    public void execute() throws Exception {
            startTime=System.currentTimeMillis();
            ServerConnection sc = session.getSource();
            this.isRunning = true;
            this.packetId = 0;
            final BackendConnection conn = session.getTarget(node);
            if (session.tryExistsCon(conn, node)) {
                _execute(conn);
            } else {
                // create new connection
    
                MycatConfig conf = MycatServer.getInstance().getConfig();
                PhysicalDBNode dn = conf.getDataNodes().get(node.getName());
                dn.getConnection(dn.getDatabase(), sc.isAutocommit(), node, this,
                        node);
            }
    }

    如果session已经有该datanode关联的后端连接(session.tryExistsCon(conn, node)返回true),则调用_execute()方法下发SQL指令;反之,则调用dn.getConnection()方法从连接池中获取一个可用连接或新建一个连接,并且由于第4个参数将this作为ResponseHandler对象传入,获取到连接后会在PhysicalDatasource.takeCon()中调用handler.connectionAcquired(conn)完成回调,即SingleNodeHandler.connectionAcquired()

    public void connectionAcquired(final BackendConnection conn) {
        session.bindConnection(node, conn);
        _execute(conn);
    }

    该方法先将获取到的后端连接关联到本session中,随后同样调用_execute()方法下发SQL指令。_execute()方法的实现如下:

    private void _execute(BackendConnection conn) {
            if (session.closed()) {
                endRunning();
                session.clearResources(true);
                return;
            }
            conn.setResponseHandler(this);
            try {
                conn.execute(node, session.getSource(), session.getSource()
                        .isAutocommit());
            } catch (Exception e1) {
                executeException(conn, e1);
                return;
            }
    }

    首先,很重要的是通过conn.setResponseHandler(this)将SingleNodeHandler与当前后端连接(MySQLConnection)以及连接中包含的MySQLConnectionHandler实例关联起来,这样做的目的是当结果返回的时候可以回调SingleNodeHandler相应的方法处理。随后调用MySQLConnection.execute()

    public void execute(RouteResultsetNode rrn, ServerConnection sc,
                boolean autocommit) throws UnsupportedEncodingException {
            if (!modifiedSQLExecuted && rrn.isModifySQL()) {
                modifiedSQLExecuted = true;
            }
            String xaTXID = sc.getSession2().getXaTXID();
            synAndDoExecute(xaTXID, rrn, sc.getCharsetIndex(), sc.getTxIsolation(),
                    autocommit);
        }
    
        private void synAndDoExecute(String xaTxID, RouteResultsetNode rrn,
                int clientCharSetIndex, int clientTxIsoLation,
                boolean clientAutoCommit) {
            String xaCmd = null;
    
            boolean conAutoComit = this.autocommit;
            String conSchema = this.schema;
            // never executed modify sql,so auto commit
            boolean expectAutocommit = !modifiedSQLExecuted || isFromSlaveDB()
                    || clientAutoCommit;
            if (expectAutocommit == false && xaTxID != null && xaStatus == 0) {
                clientTxIsoLation = Isolations.SERIALIZABLE;
                xaCmd = "XA START " + xaTxID + ';';
    
            }
            int schemaSyn = conSchema.equals(oldSchema) ? 0 : 1;
            int charsetSyn = (this.charsetIndex == clientCharSetIndex) ? 0 : 1;
            int txIsoLationSyn = (txIsolation == clientTxIsoLation) ? 0 : 1;
            int autoCommitSyn = (conAutoComit == expectAutocommit) ? 0 : 1;
            int synCount = schemaSyn + charsetSyn + txIsoLationSyn + autoCommitSyn;
            if (synCount == 0) {
                // not need syn connection
                sendQueryCmd(rrn.getStatement());
                return;
            }
            CommandPacket schemaCmd = null;
            StringBuilder sb = new StringBuilder();
            if (schemaSyn == 1) {
                schemaCmd = getChangeSchemaCommand(conSchema);
                // getChangeSchemaCommand(sb, conSchema);
            }
    
            if (charsetSyn == 1) {
                getCharsetCommand(sb, clientCharSetIndex);
            }
            if (txIsoLationSyn == 1) {
                getTxIsolationCommand(sb, clientTxIsoLation);
            }
            if (autoCommitSyn == 1) {
                getAutocommitCommand(sb, expectAutocommit);
            }
            if (xaCmd != null) {
                sb.append(xaCmd);
            }
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("con need syn ,total syn cmd " + synCount
                        + " commands " + sb.toString() + "schema change:"
                        + (schemaCmd != null) + " con:" + this);
            }
            metaDataSyned = false;
            statusSync = new StatusSync(xaCmd != null, conSchema,
                    clientCharSetIndex, clientTxIsoLation, expectAutocommit,
                    synCount);
            // syn schema
            if (schemaCmd != null) {
                schemaCmd.write(this);
            }
            // and our query sql to multi command at last
            sb.append(rrn.getStatement());
            // syn and execute others
            this.sendQueryCmd(sb.toString());
            // waiting syn result...
    }

    其中又会调用synAndDoExecute()方法,顾名思义是同步并执行,同步的目的在于之前获取到的后端连接可能在自动提交模式、数据库名、事务隔离级别和字符集上与当前要求可能不同,因此在真正执行SQL语句之前需要检查并同步相应如上设置。

    如果synCount==0,则说明不需要同步,直接调用sendQuery()发送指令即可;反之,将相应的设置语句依次append到sb中(数据库切换是个例外,直接发送了COM_INIT_DB包进行设置),并创建一个StatusSync对象,最后添加待执行的SQL语句,随后调用sendQuery()发送指令。到这里,大家可能会有疑问,在此将需更改的相关设置(数据库名、字符集等)与SQL语句一起发送(并不等待其设置成功与否),万一之前的更改失败怎么办?MyCat对此就是依靠之前创建的StatusSync对象来处理的,在结果合并的流程介绍中会具体解释。

    到此为止,SingleNodeHandler的SQL语句下发过程就算是结束了,当然底层真正的下发是由负责处理一个连接读写事件的NIOSocketWR对象来执行的。

     接下来,看多节点操作SQL语句下发过程,与单节点极其类似,以下是MultiNodeQueryHandler的execute()方法:

    public void execute() throws Exception {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                this.reset(rrs.getNodes().length);
                this.fieldsReturned = false;
                this.affectedRows = 0L;
                this.insertId = 0L;
            } finally {
                lock.unlock();
            }
            MycatConfig conf = MycatServer.getInstance().getConfig();
            startTime = System.currentTimeMillis();
            for (final RouteResultsetNode node : rrs.getNodes()) {
                BackendConnection conn = session.getTarget(node);
                if (session.tryExistsCon(conn, node)) {
                    _execute(conn, node);
                } else {
                    // create new connection
                    PhysicalDBNode dn = conf.getDataNodes().get(node.getName());
                    dn.getConnection(dn.getDatabase(), autocommit, node, this, node);
                }
            }
    }

     不难发现,与单节点的执行过程基本是一致的,无非是打了一层循环,对每个datanode分别进行了同样的操作而已。

     


    为尊重原创成果,如需转载烦请注明本文出处:

    http://www.cnblogs.com/fernandolee24/p/5236237.html,特此感谢

  • 相关阅读:
    Codeforces Round #693 (Div. 3) G. Moving to the Capital (图,dp)
    Educational Codeforces Round 102 (Rated for Div. 2) B. String LCM (构造,思维)
    Hadoop离线计算——环境搭建(一)
    大数据项目开发进度(实时更新)
    【迭代式开发】v1架构设计文档——大数据开发实战项目(三)
    【迭代式开发】V1软件需求规格说明书——大数据开发实战项目(二)
    Flume安装——环境搭建(二)
    【中英双语】Spark官方文档解读(一)——Spark概述
    TortoiseSVN使用教程【多图超详细】——大数据开发实习day1
    【深度学习TPU+Keras+Tensorflow+EfficientNetB7】kaggle竞赛 使用TPU对104种花朵进行分类 第十八次尝试 99.9%准确率
  • 原文地址:https://www.cnblogs.com/fernandolee24/p/5236237.html
Copyright © 2011-2022 走看看