zoukankan      html  css  js  c++  java
  • MyCat源码分析系列之——SQL下发

    更多MyCat源码分析,请戳MyCat源码分析系列


     SQL下发

    SQL下发指的是MyCat将解析并改造完成的SQL语句依次发送至相应的MySQL节点(datanode)的过程,该执行过程由NonBlockingSession.execute()触发:

    public void execute(RouteResultset rrs, int type) {
            // clear prev execute resources
            clearHandlesResources();
            if (LOGGER.isDebugEnabled()) {
                StringBuilder s = new StringBuilder();
                LOGGER.debug(s.append(source).append(rrs).toString() + " rrs ");
            }
    
            // 检查路由结果是否为空
            RouteResultsetNode[] nodes = rrs.getNodes();
            if (nodes == null || nodes.length == 0 || nodes[0].getName() == null
                    || nodes[0].getName().equals("")) {
                source.writeErrMessage(ErrorCode.ER_NO_DB_ERROR,
                        "No dataNode found ,please check tables defined in schema:"
                                + source.getSchema());
                return;
            }
            if (nodes.length == 1) {
                singleNodeHandler = new SingleNodeHandler(rrs, this);
                try {
                    singleNodeHandler.execute();
                } catch (Exception e) {
                    LOGGER.warn(new StringBuilder().append(source).append(rrs), e);
                    source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString());
                }
            } else {
                boolean autocommit = source.isAutocommit();
                SystemConfig sysConfig = MycatServer.getInstance().getConfig()
                        .getSystem();
                int mutiNodeLimitType = sysConfig.getMutiNodeLimitType();
                multiNodeHandler = new MultiNodeQueryHandler(type, rrs, autocommit,
                        this);
    
                try {
                    multiNodeHandler.execute();
                } catch (Exception e) {
                    LOGGER.warn(new StringBuilder().append(source).append(rrs), e);
                    source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString());
                }
            }
    }

    从代码中可以看到,首先对于路由节点信息RouteResultsetNode[]进行了判断,如果不存在任何需要派发的节点则直接返回;如果是单节点操作,则创建SingleNodeHandler实例,并调用其execute();如果是多节点操作,则创建MultiNodeQueryHandler实例,并调用其execute()

    下面先来看单节点操作的SQL下发过程,以下是SingleNodeHandler的execute()方法:

    public void execute() throws Exception {
            startTime=System.currentTimeMillis();
            ServerConnection sc = session.getSource();
            this.isRunning = true;
            this.packetId = 0;
            final BackendConnection conn = session.getTarget(node);
            if (session.tryExistsCon(conn, node)) {
                _execute(conn);
            } else {
                // create new connection
    
                MycatConfig conf = MycatServer.getInstance().getConfig();
                PhysicalDBNode dn = conf.getDataNodes().get(node.getName());
                dn.getConnection(dn.getDatabase(), sc.isAutocommit(), node, this,
                        node);
            }
    }

    如果session已经有该datanode关联的后端连接(session.tryExistsCon(conn, node)返回true),则调用_execute()方法下发SQL指令;反之,则调用dn.getConnection()方法从连接池中获取一个可用连接或新建一个连接,并且由于第4个参数将this作为ResponseHandler对象传入,获取到连接后会在PhysicalDatasource.takeCon()中调用handler.connectionAcquired(conn)完成回调,即SingleNodeHandler.connectionAcquired()

    public void connectionAcquired(final BackendConnection conn) {
        session.bindConnection(node, conn);
        _execute(conn);
    }

    该方法先将获取到的后端连接关联到本session中,随后同样调用_execute()方法下发SQL指令。_execute()方法的实现如下:

    private void _execute(BackendConnection conn) {
            if (session.closed()) {
                endRunning();
                session.clearResources(true);
                return;
            }
            conn.setResponseHandler(this);
            try {
                conn.execute(node, session.getSource(), session.getSource()
                        .isAutocommit());
            } catch (Exception e1) {
                executeException(conn, e1);
                return;
            }
    }

    首先,很重要的是通过conn.setResponseHandler(this)将SingleNodeHandler与当前后端连接(MySQLConnection)以及连接中包含的MySQLConnectionHandler实例关联起来,这样做的目的是当结果返回的时候可以回调SingleNodeHandler相应的方法处理。随后调用MySQLConnection.execute()

    public void execute(RouteResultsetNode rrn, ServerConnection sc,
                boolean autocommit) throws UnsupportedEncodingException {
            if (!modifiedSQLExecuted && rrn.isModifySQL()) {
                modifiedSQLExecuted = true;
            }
            String xaTXID = sc.getSession2().getXaTXID();
            synAndDoExecute(xaTXID, rrn, sc.getCharsetIndex(), sc.getTxIsolation(),
                    autocommit);
        }
    
        private void synAndDoExecute(String xaTxID, RouteResultsetNode rrn,
                int clientCharSetIndex, int clientTxIsoLation,
                boolean clientAutoCommit) {
            String xaCmd = null;
    
            boolean conAutoComit = this.autocommit;
            String conSchema = this.schema;
            // never executed modify sql,so auto commit
            boolean expectAutocommit = !modifiedSQLExecuted || isFromSlaveDB()
                    || clientAutoCommit;
            if (expectAutocommit == false && xaTxID != null && xaStatus == 0) {
                clientTxIsoLation = Isolations.SERIALIZABLE;
                xaCmd = "XA START " + xaTxID + ';';
    
            }
            int schemaSyn = conSchema.equals(oldSchema) ? 0 : 1;
            int charsetSyn = (this.charsetIndex == clientCharSetIndex) ? 0 : 1;
            int txIsoLationSyn = (txIsolation == clientTxIsoLation) ? 0 : 1;
            int autoCommitSyn = (conAutoComit == expectAutocommit) ? 0 : 1;
            int synCount = schemaSyn + charsetSyn + txIsoLationSyn + autoCommitSyn;
            if (synCount == 0) {
                // not need syn connection
                sendQueryCmd(rrn.getStatement());
                return;
            }
            CommandPacket schemaCmd = null;
            StringBuilder sb = new StringBuilder();
            if (schemaSyn == 1) {
                schemaCmd = getChangeSchemaCommand(conSchema);
                // getChangeSchemaCommand(sb, conSchema);
            }
    
            if (charsetSyn == 1) {
                getCharsetCommand(sb, clientCharSetIndex);
            }
            if (txIsoLationSyn == 1) {
                getTxIsolationCommand(sb, clientTxIsoLation);
            }
            if (autoCommitSyn == 1) {
                getAutocommitCommand(sb, expectAutocommit);
            }
            if (xaCmd != null) {
                sb.append(xaCmd);
            }
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("con need syn ,total syn cmd " + synCount
                        + " commands " + sb.toString() + "schema change:"
                        + (schemaCmd != null) + " con:" + this);
            }
            metaDataSyned = false;
            statusSync = new StatusSync(xaCmd != null, conSchema,
                    clientCharSetIndex, clientTxIsoLation, expectAutocommit,
                    synCount);
            // syn schema
            if (schemaCmd != null) {
                schemaCmd.write(this);
            }
            // and our query sql to multi command at last
            sb.append(rrn.getStatement());
            // syn and execute others
            this.sendQueryCmd(sb.toString());
            // waiting syn result...
    }

    其中又会调用synAndDoExecute()方法,顾名思义是同步并执行,同步的目的在于之前获取到的后端连接可能在自动提交模式、数据库名、事务隔离级别和字符集上与当前要求可能不同,因此在真正执行SQL语句之前需要检查并同步相应如上设置。

    如果synCount==0,则说明不需要同步,直接调用sendQuery()发送指令即可;反之,将相应的设置语句依次append到sb中(数据库切换是个例外,直接发送了COM_INIT_DB包进行设置),并创建一个StatusSync对象,最后添加待执行的SQL语句,随后调用sendQuery()发送指令。到这里,大家可能会有疑问,在此将需更改的相关设置(数据库名、字符集等)与SQL语句一起发送(并不等待其设置成功与否),万一之前的更改失败怎么办?MyCat对此就是依靠之前创建的StatusSync对象来处理的,在结果合并的流程介绍中会具体解释。

    到此为止,SingleNodeHandler的SQL语句下发过程就算是结束了,当然底层真正的下发是由负责处理一个连接读写事件的NIOSocketWR对象来执行的。

     接下来,看多节点操作SQL语句下发过程,与单节点极其类似,以下是MultiNodeQueryHandler的execute()方法:

    public void execute() throws Exception {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                this.reset(rrs.getNodes().length);
                this.fieldsReturned = false;
                this.affectedRows = 0L;
                this.insertId = 0L;
            } finally {
                lock.unlock();
            }
            MycatConfig conf = MycatServer.getInstance().getConfig();
            startTime = System.currentTimeMillis();
            for (final RouteResultsetNode node : rrs.getNodes()) {
                BackendConnection conn = session.getTarget(node);
                if (session.tryExistsCon(conn, node)) {
                    _execute(conn, node);
                } else {
                    // create new connection
                    PhysicalDBNode dn = conf.getDataNodes().get(node.getName());
                    dn.getConnection(dn.getDatabase(), autocommit, node, this, node);
                }
            }
    }

     不难发现,与单节点的执行过程基本是一致的,无非是打了一层循环,对每个datanode分别进行了同样的操作而已。

     


    为尊重原创成果,如需转载烦请注明本文出处:

    http://www.cnblogs.com/fernandolee24/p/5236237.html,特此感谢

  • 相关阅读:
    ubuntu 安装 redis desktop manager
    ubuntu 升级内核
    Ubuntu 内核升级,导致无法正常启动
    spring mvc 上传文件,但是接收到文件后发现文件变大,且文件打不开(multipartfile)
    angular5 open modal
    POJ 1426 Find the Multiple(二维DP)
    POJ 3093 Margritas
    POJ 3260 The Fewest Coins
    POJ 1837 Balance(二维DP)
    POJ 1337 A Lazy Worker
  • 原文地址:https://www.cnblogs.com/fernandolee24/p/5236237.html
Copyright © 2011-2022 走看看