zoukankan      html  css  js  c++  java
  • [转]数据库中间件 MyCAT源码分析——跨库两表Join


    • 1. 概述

    • 2. 主流程

    • 3. ShareJoin

      • 3.1 JoinParser

      • 3.2 ShareJoin.processSQL(...)

      • 3.3 BatchSQLJob

      • 3.4 ShareDBJoinHandler

      • 3.5 ShareRowOutPutDataHandler

    • 4. 彩蛋


    1. 概述

    MyCAT 支持跨库表 Join,目前版本仅支持跨库表 Join。虽然如此,已经能够满足我们大部分的业务场景。况且,Join 过多的表可能带来的性能问题也是很麻烦的。

    本文主要分享:

    1. 整体流程、调用顺序图

    2. 核心代码的分析

    前置阅读:《MyCAT 源码分析 —— 【单库单表】查询》。

    2. 主流程

    当执行跨库两表 Join SQL 时,经历的大体流程如下:

    SQL 上,需要添加注解 /*!mycat:catlet=io.mycat.catlets.ShareJoin */${SQL}RouteService#route(...) 解析注解 mycat:catlet 后,路由给 HintCatletHandler 作进一步处理。

    HintCatletHandler 获取注解对应的 Catlet 实现类, io.mycat.catlets.ShareJoin 就是其中一种实现(目前也只有这一种实现),提供了跨库两表 Join 的功能。从类命名上看, ShareJoin 很大可能性后续会提供完整的跨库多表的 Join 功能。

    核心代码如下:

    1. // HintCatletHandler.java

    2. public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema,

    3.                           int sqlType, String realSQL, String charset, ServerConnection sc,

    4.                           LayerCachePool cachePool, String hintSQLValue, int hintSqlType, Map hintMap)

    5.       throws SQLNonTransientException {

    6.   String cateletClass = hintSQLValue;

    7.   if (LOGGER.isDebugEnabled()) {

    8.       LOGGER.debug("load catelet class:" + hintSQLValue + " to run sql " + realSQL);

    9.   }

    10.   try {

    11.       Catlet catlet = (Catlet) MycatServer.getInstance().getCatletClassLoader().getInstanceofClass(cateletClass);

    12.       catlet.route(sysConfig, schema, sqlType, realSQL, charset, sc, cachePool);

    13.       catlet.processSQL(realSQL, new EngineCtx(sc.getSession2()));

    14.   } catch (Exception e) {

    15.       LOGGER.warn("catlet error " + e);

    16.       throw new SQLNonTransientException(e);

    17.   }

    18.   return null;

    19. }

    3. ShareJoin

    目前支持跨库表 Join。 ShareJoin 将 SQL 拆分成左表 SQL 和 右表 SQL,发送给各数据节点执行,汇总数据结果进行合后返回。

    伪代码如下:

    1. // SELECT u.id, o.id FROM t_order o

    2. // INNER JOIN t_user u ON o.uid = u.id

    3. // 【顺序】查询左表

    4. String leftSQL = "SELECT o.id, u.id FROM t_order o";

    5. List leftList = dn[0].select(leftSQL) + dn[1].select(leftSQL) + ... + dn[n].select(leftsql);

    6. // 【并行】查询右表

    7. String rightSQL = "SELECT u.id FROM t_user u WHERE u.id IN (${leftList.uid})";

    8. for (dn : dns) { // 此处是并行执行,使用回调逻辑

    9.    for (rightRecord : dn.select(rightSQL)) { // 查询右表

    10.        // 合并结果

    11.        for (leftRecord : leftList) {

    12.            if (leftRecord.uid == rightRecord.id) {

    13.                write(leftRecord + leftRecord.uid 拼接结果);

    14.            }

    15.        }

    16.    }

    17. }

    实际情况会更加复杂,我们接下来一点点往下看。

    3.1 JoinParser

    JoinParser 负责对 SQL 进行解析。整体流程如下:

    举个例子, /*!mycat:catlet=io.mycat.catlets.ShareJoin */SELECT o.id,u.usernamefromt_order o join t_user u on o.uid=u.id; 解析后, TableFilter 结果如下:

    • tName :表名

    • tAlia :表自定义命名

    • where :过滤条件

    • order :排序条件

    • parenTable :左连接的 Join 的表名。 t_user表 在 join属性 的 parenTable 为 "o",即 t_order

    • joinParentkey :左连接的 Join 字段

    • joinKey :join 字段。 t_user表 在 join属性 为 id

    • join :子 tableFilter。即,该表连接的右边的表。

    • parent :和 join属性 相对。

    看到此处,大家可能有疑问,为什么要把 SQL 解析成 TableFilterJoinParser 根据 TableFilter 生成数据节点执行 SQL。代码如下:

    1. // TableFilter.java

    2. public String getSQL() {

    3.   String sql = "";

    4.   // fields

    5.   for (Entry<String, String> entry : fieldAliasMap.entrySet()) {

    6.       String key = entry.getKey();

    7.       String val = entry.getValue();

    8.       if (val == null) {

    9.           sql = unionsql(sql, getFieldfrom(key), ",");

    10.       } else {

    11.           sql = unionsql(sql, getFieldfrom(key) + " as " + val, ",");

    12.       }

    13.   }

    14.   // where

    15.   if (parent == null) {    // on/where 等于号左边的表

    16.       String parentJoinKey = getJoinKey(true);

    17.       // fix sharejoin bug:

    18.       // (AbstractConnection.java:458) -close connection,reason:program err:java.lang.IndexOutOfBoundsException:

    19.       // 原因是左表的select列没有包含 join 列,在获取结果时报上面的错误

    20.       if (sql != null && parentJoinKey != null &&

    21.               !sql.toUpperCase().contains(parentJoinKey.trim().toUpperCase())) {

    22.           sql += ", " + parentJoinKey;

    23.       }

    24.       sql = "select " + sql + " from " + tName;

    25.       if (!(where.trim().equals(""))) {

    26.           sql += " where " + where.trim();

    27.       }

    28.   } else {    // on/where 等于号右边边的表

    29.       if (allField) {

    30.           sql = "select " + sql + " from " + tName;

    31.       } else {

    32.           sql = unionField("select " + joinKey, sql, ",");

    33.           sql = sql + " from " + tName;

    34.           //sql="select "+joinKey+","+sql+" from "+tName;

    35.       }

    36.       if (!(where.trim().equals(""))) {

    37.           sql += " where " + where.trim() + " and (" + joinKey + " in %s )";

    38.       } else {

    39.           sql += " where " + joinKey + " in %s ";

    40.       }

    41.   }

    42.   // order

    43.   if (!(order.trim().equals(""))) {

    44.       sql += " order by " + order.trim();

    45.   }

    46.   // limit

    47.   if (parent == null) {

    48.       if ((rowCount > 0) && (offset > 0)) {

    49.           sql += " limit" + offset + "," + rowCount;

    50.       } else {

    51.           if (rowCount > 0) {

    52.               sql += " limit " + rowCount;

    53.           }

    54.       }

    55.   }

    56.   return sql;

    57. }

    • 当 parent 为空时,即on/where 等于号左边的表。例如: selectid,uidfromt_order

    • 当 parent 不为空时,即on/where 等于号右边的表。例如: selectid,usernamefromt_userwhereidin(1,2,3)

    3.2 ShareJoin.processSQL(...)

    当 SQL 解析完后,生成左边的表执行的 SQL,发送给对应的数据节点查询数据。大体流程如下:

    当 SQL 为 /*!mycat:catlet=io.mycat.catlets.ShareJoin */SELECT o.id,u.usernamefromt_order o join t_user u on o.uid=u.id; 时, sql=getSql() 的返回结果为 selectid,uidfromt_order

    生成左边的表执行的 SQL 后,顺序顺序顺序发送给对应的数据节点查询数据。具体顺序查询是怎么实现的,我们来看下章 BatchSQLJob

    3.3 BatchSQLJob

    EngineCtxBatchSQLJob 封装,提供上层两个方法:

    1. executeNativeSQLSequnceJob :顺序(非并发)在每个数据节点执行SQL任务

    2. executeNativeSQLParallJob :并发在每个数据节点执行SQL任务

    核心代码如下:

    1. // EngineCtx.java

    2. public void executeNativeSQLSequnceJob(String[] dataNodes, String sql,

    3.        SQLJobHandler jobHandler) {

    4.    for (String dataNode : dataNodes) {

    5.        SQLJob job = new SQLJob(jobId.incrementAndGet(), sql, dataNode,

    6.                jobHandler, this);

    7.        bachJob.addJob(job, false);

    8.    }

    9. }

    10. public void executeNativeSQLParallJob(String[] dataNodes, String sql,

    11.        SQLJobHandler jobHandler) {

    12.    for (String dataNode : dataNodes) {

    13.        SQLJob job = new SQLJob(jobId.incrementAndGet(), sql, dataNode,

    14.                jobHandler, this);

    15.        bachJob.addJob(job, true);

    16.    }

    17. }    


    BatchSQLJob 通过执行中任务列表待执行任务列表来实现顺序/并发执行任务。核心代码如下:

    1. // BatchSQLJob.java

    2. /**

    3. * 执行中任务列表

    4. */

    5. private ConcurrentHashMap<Integer, SQLJob> runningJobs = new ConcurrentHashMap<Integer, SQLJob>();

    6. /**

    7. * 待执行任务列表

    8. */

    9. private ConcurrentLinkedQueue<SQLJob> waitingJobs = new ConcurrentLinkedQueue<SQLJob>();

    10. public void addJob(SQLJob newJob, boolean parallExecute) {

    11.   if (parallExecute) {

    12.       runJob(newJob);

    13.   } else {

    14.       waitingJobs.offer(newJob);

    15.       if (runningJobs.isEmpty()) { // 若无正在执行中的任务,则从等待队列里获取任务进行执行。

    16.           SQLJob job = waitingJobs.poll();

    17.           if (job != null) {

    18.               runJob(job);

    19.           }

    20.       }

    21.   }

    22. }

    23. public boolean jobFinished(SQLJob sqlJob) {

    24.    runningJobs.remove(sqlJob.getId());

    25.    SQLJob job = waitingJobs.poll();

    26.    if (job != null) {

    27.        runJob(job);

    28.        return false;

    29.    } else {

    30.        if (noMoreJobInput) {

    31.            return runningJobs.isEmpty() && waitingJobs.isEmpty();

    32.        } else {

    33.            return false;

    34.        }

    35.    }

    36. }

    • 顺序执行时,当 runningJobs 存在执行中的任务时, #addJob(...) 时,不立即执行,添加到 waitingJobs。当 SQLJob 完成时,顺序调用下一个任务。

    • 并发执行时, #addJob(...) 时,立即执行。


    SQLJob SQL 异步执行任务。其 jobHandler(SQLJobHandler) 属性,在 SQL 执行有返回结果时,会进行回调,从而实现异步执行。

    ShareJoin 里, SQLJobHandler 有两个实现: ShareDBJoinHandlerShareRowOutPutDataHandler。前者,左边的表执行的 SQL 回调;后者,右边的表执行的 SQL 回调。

    3.4 ShareDBJoinHandler

    ShareDBJoinHandler左边的表执行的 SQL 回调。流程如下:

    • #fieldEofResponse(...) :接收数据节点返回的 fields,放入内存。

    • #rowResponse(...) :接收数据节点返回的 row,放入内存。

    • #rowEofResponse(...) :接收完一个数据节点返回所有的 row。当所有数据节点都完成 SQL 执行时,提交右边的表执行的 SQL 任务,并行执行,即图中#createQryJob(...)

    当 SQL 为 /*!mycat:catlet=io.mycat.catlets.ShareJoin */SELECT o.id,u.usernamefromt_order o join t_user u on o.uid=u.id; 时, sql=getChildSQL() 的返回结果为selectid,usernamefromt_userwhereidin(1,2,3)

    核心代码如下:

    1. // ShareJoin.java

    2. private void createQryJob(int batchSize) {

    3.   int count = 0;

    4.   Map<String, byte[]> batchRows = new ConcurrentHashMap<String, byte[]>();

    5.   String theId = null;

    6.   StringBuilder sb = new StringBuilder().append('(');

    7.   String svalue = "";

    8.   for (Map.Entry<String, String> e : ids.entrySet()) {

    9.       theId = e.getKey();

    10.       byte[] rowbyte = rows.remove(theId);

    11.       if (rowbyte != null) {

    12.           batchRows.put(theId, rowbyte);

    13.       }

    14.       if (!svalue.equals(e.getValue())) {

    15.           if (joinKeyType == Fields.FIELD_TYPE_VAR_STRING

    16.                   || joinKeyType == Fields.FIELD_TYPE_STRING) { // joinkey 为varchar

    17.               sb.append("'").append(e.getValue()).append("'").append(','); // ('digdeep','yuanfang')

    18.           } else { // 默认joinkey为int/long

    19.               sb.append(e.getValue()).append(','); // (1,2,3)

    20.           }

    21.       }

    22.       svalue = e.getValue();

    23.       if (count++ > batchSize) {

    24.           break;

    25.       }

    26.   }

    27.   if (count == 0) {

    28.       return;

    29.   }

    30.   jointTableIsData = true;

    31.   sb.deleteCharAt(sb.length() - 1).append(')');

    32.   String sql = String.format(joinParser.getChildSQL(), sb);

    33.   getRoute(sql);

    34.   ctx.executeNativeSQLParallJob(getDataNodes(), sql, new ShareRowOutPutDataHandler(this, fields, joinindex, joinParser.getJoinRkey(), batchRows, ctx.getSession()));

    35. }

    3.5 ShareRowOutPutDataHandler

    ShareRowOutPutDataHandler右边的表执行的 SQL 回调。流程如下:

    • #fieldEofResponse(...) :接收数据节点返回的 fields,返回 header 给 MySQL Client。

    • #rowResponse(...) :接收数据节点返回的 row,匹配左表的记录,返回合并后返回的 row 给 MySQL Client。

    • #rowEofResponse(...) :当所有 row 都返回完后,返回 eof 给 MySQL Client。

    核心代码如下:

    1. // ShareRowOutPutDataHandler.java

    2. public boolean onRowData(String dataNode, byte[] rowData) {

    3.   RowDataPacket rowDataPkgold = ResultSetUtil.parseRowData(rowData, bfields);

    4.   //拷贝一份batchRows

    5.   Map<String, byte[]> batchRowsCopy = new ConcurrentHashMap<String, byte[]>();

    6.   batchRowsCopy.putAll(arows);

    7.   // 获取Id字段,

    8.   String id = ByteUtil.getString(rowDataPkgold.fieldValues.get(joinR));

    9.   // 查找ID对应的A表的记录

    10.   byte[] arow = getRow(batchRowsCopy, id, joinL);

    11.   while (arow != null) {

    12.       RowDataPacket rowDataPkg = ResultSetUtil.parseRowData(arow, afields);//ctx.getAllFields());

    13.       for (int i = 1; i < rowDataPkgold.fieldCount; i++) {

    14.           // 设置b.name 字段

    15.           byte[] bname = rowDataPkgold.fieldValues.get(i);

    16.           rowDataPkg.add(bname);

    17.           rowDataPkg.addFieldCount(1);

    18.       }

    19.       // huangyiming add

    20.       MiddlerResultHandler middlerResultHandler = session.getMiddlerResultHandler();

    21.       if (null == middlerResultHandler) {

    22.           ctx.writeRow(rowDataPkg);

    23.       } else {

    24.           if (middlerResultHandler instanceof MiddlerQueryResultHandler) {

    25.               byte[] columnData = rowDataPkg.fieldValues.get(0);

    26.               if (columnData != null && columnData.length > 0) {

    27.                   String rowValue = new String(columnData);

    28.                   middlerResultHandler.add(rowValue);

    29.               }

    30.               //}

    31.           }

    32.       }

    33.       arow = getRow(batchRowsCopy, id, joinL);

    34.   }

    35.   return false;

    36. }

    4. 彩蛋

    如下是本文涉及到的核心类,有兴趣的同学可以翻一翻。

    ShareJoin 另外不支持的功能:

    1. 只支持 inner join,不支持 left join、right join 等等连接。

    2. 不支持 order by。

    3. 不支持 group by 以及 相关聚合函数。

    4. 即使 join 左表的字段未声明为返回 fields 也会返回。

    恩,MyCAT 弱XA 源码继续走起!

  • 相关阅读:
    现代软件工程 第一章 概论 第3题——韩婧
    现代软件工程 第一章 概论 第2题——韩婧
    小组成员邓琨、白文俊、张星星、韩婧
    UVa 10892 LCM的个数 (GCD和LCM 质因数分解)
    UVa 10780 幂和阶乘 求n!中某个因子的个数
    UVa 11859 除法游戏(Nim游戏,质因子)
    Codeforces 703C Chris and Road 二分、思考
    Codeforces 703D Mishka and Interesting sum 树状数组
    hdu 5795 A Simple Nim SG函数(多校)
    hdu 5793 A Boring Question 推公式(多校)
  • 原文地址:https://www.cnblogs.com/janehoo/p/8565464.html
Copyright © 2011-2022 走看看