zoukankan      html  css  js  c++  java
  • 分布式数据库中间件–(3) Cobar对简单select命令的处理过程

    友情提示:非原文链接可能会影响您的阅读体验,欢迎查看原文。(http://blog.geekcome.com)


    在上一篇中介绍了Cobar和client初次建立连接的过程,Cobar监听端口,client发起连接请求,Cobar发送握手数据包,client发送认证数据包最后依据认证的结果Cobar向client发送认证结果。

    在认证成功后Cobar会将该连接的回调处理函数由FrontendAuthenticator(前端认证处理器)设置成FrontendCommandHanler(前端命令处理器)。

    所以在client再次向Cobar发送请求报文的时候,前端命令处理器会处理该连接。以下详细分析一下简单select语句的运行过程。

    1、事件的产生

    NIOReactor的R线程一直在监听selector上的每一个连接的感兴趣事件是否发生,当client发送了一条select * from tb1,select函数会返回,然后获取到该连接SelectionKey,而且该SelectKey的兴趣事件是OP_READ。此时会调用read(NIOConnection)函数。

    01public void run() {
    02            final Selector selector = this.selector;
    03            for (;;) {
    04                ++reactCount;
    05                try {
    06                    int res = selector.select();
    07                    LOGGER.debug(reactCount + ">>NIOReactor接受连接数:" + res);
    08                    register(selector);
    09                    Set<SelectionKey> keys = selector.selectedKeys();
    10                    try {
    11                        for (SelectionKey key : keys) {
    12                            Object att = key.attachment();
    13                            if (att != null && key.isValid()) {
    14                                int readyOps = key.readyOps();
    15                                if ((readyOps & SelectionKey.OP_READ) != 0) {
    16                                    LOGGER.debug("select读事件");
    17                                    read((NIOConnection) att);
    18                               ..............................
    19                            }
    20                             ...........................
    21                        }
    22                    } ..................
    23                } ............
    24            }
    25  }

     2、调用该连接的read函数进行处理

    该函数在上一篇中提到过,该函数的实如今AbstractConnection中,实现从channel中读取数据到缓冲区,然后从缓冲区完整的取出整包数据交给FrontendConnection类的handle()函数处理。

    该函数交给processor进行异步处理。从processor中的线程池获取一个线程来运行该任务。这里调用详细的handler来进行处理。

    刚開始提到的,当认证成功后,Cobar将连接的回调处理函数设置为FrontendCommandHandler。所以这里会调用前端命令处理器的handler函数进行数据的处理。

    在这里须要先了解MySQL数据包的格式:

    MySQLclient命令请求报文

    MySQLclient命令请求报文

    该处理函数例如以下:

    01public void handle(byte[] data) {
    02    LOGGER.info("data[4]:"+data[4]);
    03    switch (data[4]) {
    04    case MySQLPacket.COM_INIT_DB:
    05        commands.doInitDB();
    06        source.initDB(data);
    07        break;
    08    case MySQLPacket.COM_QUERY:
    09        commands.doQuery();
    10        source.query(data);
    11        break;
    12    case MySQLPacket.COM_PING:
    13        commands.doPing();
    14        source.ping();
    15        break;
    16    case MySQLPacket.COM_QUIT:
    17        commands.doQuit();
    18        source.close();
    19        break;
    20    case MySQLPacket.COM_PROCESS_KILL:
    21        commands.doKill();
    22        source.kill(data);
    23        break;
    24    case MySQLPacket.COM_STMT_PREPARE:
    25        commands.doStmtPrepare();
    26        source.stmtPrepare(data);
    27        break;
    28    case MySQLPacket.COM_STMT_EXECUTE:
    29        commands.doStmtExecute();
    30        source.stmtExecute(data);
    31        break;
    32    case MySQLPacket.COM_STMT_CLOSE:
    33        commands.doStmtClose();
    34        source.stmtClose(data);
    35        break;
    36    case MySQLPacket.COM_HEARTBEAT:
    37        commands.doHeartbeat();
    38        source.heartbeat(data);
    39        break;
    40    default:
    41        commands.doOther();
    42        source.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Unknown command");
    43    }
    44}

     

    由于每一个报文都有消息头,消息头固定的是4个字节,前3个字节是消息长度,后面的一个字节是报文序号,例如以下所看到的

    mysql_protocol_struct

    所以data[4]是第五个字节。也就是消息体的第一个字节。client向Cobar端发送的是命令报文,第一个字节是详细的命令。

    假设是select语句,那么data[4]就是COM_QUERY,然后会调用详细连接的query成员函数,其定义在FrontendConnection类中。

    01public void query(byte[] data) {
    02    if (queryHandler != null) {
    03        // 取得语句
    04        MySQLMessage mm = new MySQLMessage(data);
    05        mm.position(5);
    06        String sql = null;
    07        try {
    08            sql = mm.readString(charset);
    09        catch (UnsupportedEncodingException e) {
    10            writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET, "Unknown charset '" + charset + "'");
    11            return;
    12        }
    13        if (sql == null || sql.length() == 0) {
    14            writeErrMessage(ErrorCode.ER_NOT_ALLOWED_COMMAND, "Empty SQL");
    15            return;
    16        }
    17        LOGGER.debug("解析的SQL语句:"+sql);
    18        // 运行查询
    19        queryHandler.query(sql);
    20    else {
    21        writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Query unsupported!");
    22    }
    23}

    首先新建一个MySQLMessage对象,将数据包的索引位置定位到第6个字节位置处。然后将后面的全部的字节读取成指定编码格式的SQL语句,这里就形成了完整的SQL语句。

    查询的时候Cobar控制台输出例如以下内容:

    11:35:33,392 INFO data[4]:3
    11:35:33,392 DEBUG 解析的SQL语句:select * from tb2

    解析出SQL语句后交给queryHandler处理。该对象是在新建连接的时候设置的ServerQueryHandler类,事实上现的query函数例如以下:

    01public void query(String sql) {
    02    //这里就得到了完整的SQL语句,接收自client
    03    ServerConnection c = this.source;
    04    if (LOGGER.isDebugEnabled()) {
    05        LOGGER.debug(new StringBuilder().append(c).append(sql).toString());
    06    }
    07    //该函数对SQL语句的语法和语义进行分析,并返回SQL语句的对于类型,运行对应的操作
    08    int rs = ServerParse.parse(sql);
    09    switch (rs & 0xff) {
    10    .......................
    11    case ServerParse.SELECT:
    12        //select操作运行
    13        SelectHandler.handle(sql, c, rs >>> 8);
    14        break;
    15    .......................
    16    }
    17}

    首先对SQL语句进程解析,通过parse函数对语句解析后返回语句类型的编号。

    假设语句没有语法错误,则直接交给SelectHandler进行处理。假设是一般的select语句,则直接调用ServerConnection的execute运行sql

    c.execute(stmt, ServerParse.SELECT);

    在ServerConnection中的execute函数中须要进行路由检查,由于select的数据不一定在一个数据库中,须要按拆分的规则进行路由的检查。

    1// 路由计算
    2RouteResultset rrs = null;
    3try {
    4    rrs = ServerRouter.route(schema, sql, this.charset, this);
    5    LOGGER.debug("路由计算结果:"+rrs.toString());
    6}

    详细的路由算法也是比較复杂,以后会专门分析。

    Cobar的DEBUG控制台输出路由的计算结果例如以下:

    11:35:33,392 DEBUG 路由计算结果:select * from tb2, route={
    1 -> dnTest2.default{select * from tb2}
    2 -> dnTest3.default{select * from tb2}
    }

    该条SQL语句的select内容分布在dnTset2和dnTest3中,所以要分别向这两个数据库进行查询。

    经过比較复杂的资源处理最后在每一个后端数据库上运行函数execute0。

    01private void execute0(RouteResultsetNode rrn, Channel c, boolean autocommit, BlockingSession ss, int flag) {
    02    ServerConnection sc = ss.getSource();
    03    .........................
    04    try {
    05        // 运行并等待返回
    06        BinaryPacket bin = ((MySQLChannel) c).execute(rrn, sc, autocommit);
    07        // 接收和处理数据,运行到这里就说明上面的运行已经得到运行结果的返回
    08        final ReentrantLock lock = MultiNodeExecutor.this.lock;
    09        lock.lock();
    10        try {
    11            switch (bin.data[0]) {
    12            case ErrorPacket.FIELD_COUNT:
    13                c.setRunning(false);
    14                handleFailure(ss, rrn, new BinaryErrInfo((MySQLChannel) c, bin, sc, rrn));
    15                break;
    16            case OkPacket.FIELD_COUNT:
    17                OkPacket ok = new OkPacket();
    18                ok.read(bin);
    19                affectedRows += ok.affectedRows;
    20                // set lastInsertId
    21                if (ok.insertId > 0) {
    22                    insertId = (insertId == 0) ? ok.insertId : Math.min(insertId, ok.insertId);
    23                }
    24                c.setRunning(false);
    25                handleSuccessOK(ss, rrn, autocommit, ok);
    26                break;
    27            default// HEADER|FIELDS|FIELD_EOF|ROWS|LAST_EOF
    28                final MySQLChannel mc = (MySQLChannel) c;
    29                if (fieldEOF) {
    30                    for (;;) {
    31                        bin = mc.receive();
    32                        switch (bin.data[0]) {
    33                        case ErrorPacket.FIELD_COUNT:
    34                            c.setRunning(false);
    35                            handleFailure(ss, rrn, new BinaryErrInfo(mc, bin, sc, rrn));
    36                            return;
    37                        case EOFPacket.FIELD_COUNT:
    38                            handleRowData(rrn, c, ss);
    39                            return;
    40                        default:
    41                            continue;
    42                        }
    43                    }
    44                else {
    45                    bin.packetId = ++packetId;// HEADER
    46                    List<MySQLPacket> headerList = new LinkedList<MySQLPacket>();
    47                    headerList.add(bin);
    48                    for (;;) {
    49                        bin = mc.receive();
    50                        switch (bin.data[0]) {
    51                        case ErrorPacket.FIELD_COUNT:
    52                            c.setRunning(false);
    53                            handleFailure(ss, rrn, new BinaryErrInfo(mc, bin, sc, rrn));
    54                            return;
    55                        case EOFPacket.FIELD_COUNT:
    56                            bin.packetId = ++packetId;// FIELD_EOF
    57                            for (MySQLPacket packet : headerList) {
    58                                buffer = packet.write(buffer, sc);
    59                            }
    60                            headerList = null;
    61                            buffer = bin.write(buffer, sc);
    62                            fieldEOF = true;
    63                            handleRowData(rrn, c, ss);
    64                            return;
    65                        default:
    66                            bin.packetId = ++packetId;// FIELDS
    67                            switch (flag) {
    68                            case RouteResultset.REWRITE_FIELD:
    69                                StringBuilder fieldName = new StringBuilder();
    70                                fieldName.append("Tables_in_").append(ss.getSource().getSchema());
    71                                FieldPacket field = PacketUtil.getField(bin, fieldName.toString());
    72                                headerList.add(field);
    73                                break;
    74                            default:
    75                                headerList.add(bin);
    76                            }
    77                        }
    78                    }
    79                }
    80            }
    81        finally {
    82            lock.unlock();
    83        }
    84    }//异常处理....................
    85}

    这里真正的运行SQL语句,然后等待后端运行语句的返回数据,在成功获取后端Mysql返回的结果后,该函数返回的数据包是结果集数据包。

    当client发起认证请求或命令请求后,server会返回对应的运行结果给client。client在收到响应报文后,须要首先检查第1个字节的值,来区分响应报文的类型。

    响应报文类型第1个字节取值范围
    OK 响应报文0×00
    Error 响应报文0xFF
    Result Set 报文0×01 – 0xFA
    Field 报文0×01 – 0xFA
    Row Data 报文0×01 – 0xFA
    EOF 报文0xFE

    注:响应报文的第1个字节在不同类型中含义不同,比方在OK报文中,该字节并没有实际意义,值恒为0×00;而在Result Set报文中,该字节又是长度编码的二进制数据结构(Length Coded Binary)中的第1字节。

    Result Set 消息分为五部分,结构例如以下:

    结构说明
    [Result Set Header]列数量
    [Field]列信息(多个)
    [EOF]列结束
    [Row Data]行数据(多个)
    [EOF]数据结束

    函数运行完毕后,返回的结果都放入LinkedList中,当读取结果完毕后放入多节点运行器的缓冲区。假设buffer满了,就通过前端连接写出给client。

    作者:Yong Man
    提示:本文版权归作者,欢迎转载,但未经作者允许必须保留此段声明,且在文章页面明显位置给出原文连接。
    假设对文章有不论什么问题,都能够在评论中留言,我会尽可能的答复您,谢谢你的阅读

  • 相关阅读:
    博客园如何运行代码
    视觉差
    h5 播放器 -3
    播放器 视频 音频 -1
    游戏 保卫萝卜
    跟踪算法
    走口字

    联动日历
    jq 抽奖
  • 原文地址:https://www.cnblogs.com/mfrbuaa/p/3788115.html
Copyright © 2011-2022 走看看