zoukankan      html  css  js  c++  java
  • [转]数据库中间件 MyCAT源码分析——跨库两表Join


    • 1. 概述

    • 2. 主流程

    • 3. ShareJoin

      • 3.1 JoinParser

      • 3.2 ShareJoin.processSQL(...)

      • 3.3 BatchSQLJob

      • 3.4 ShareDBJoinHandler

      • 3.5 ShareRowOutPutDataHandler

    • 4. 彩蛋


    1. 概述

    MyCAT 支持跨库表 Join,目前版本仅支持跨库表 Join。虽然如此,已经能够满足我们大部分的业务场景。况且,Join 过多的表可能带来的性能问题也是很麻烦的。

    本文主要分享:

    1. 整体流程、调用顺序图

    2. 核心代码的分析

    前置阅读:《MyCAT 源码分析 —— 【单库单表】查询》。

    2. 主流程

    当执行跨库两表 Join SQL 时,经历的大体流程如下:

    SQL 上,需要添加注解 /*!mycat:catlet=io.mycat.catlets.ShareJoin */${SQL}RouteService#route(...) 解析注解 mycat:catlet 后,路由给 HintCatletHandler 作进一步处理。

    HintCatletHandler 获取注解对应的 Catlet 实现类, io.mycat.catlets.ShareJoin 就是其中一种实现(目前也只有这一种实现),提供了跨库两表 Join 的功能。从类命名上看, ShareJoin 很大可能性后续会提供完整的跨库多表的 Join 功能。

    核心代码如下:

    1. // HintCatletHandler.java

    2. public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema,

    3.                           int sqlType, String realSQL, String charset, ServerConnection sc,

    4.                           LayerCachePool cachePool, String hintSQLValue, int hintSqlType, Map hintMap)

    5.       throws SQLNonTransientException {

    6.   String cateletClass = hintSQLValue;

    7.   if (LOGGER.isDebugEnabled()) {

    8.       LOGGER.debug("load catelet class:" + hintSQLValue + " to run sql " + realSQL);

    9.   }

    10.   try {

    11.       Catlet catlet = (Catlet) MycatServer.getInstance().getCatletClassLoader().getInstanceofClass(cateletClass);

    12.       catlet.route(sysConfig, schema, sqlType, realSQL, charset, sc, cachePool);

    13.       catlet.processSQL(realSQL, new EngineCtx(sc.getSession2()));

    14.   } catch (Exception e) {

    15.       LOGGER.warn("catlet error " + e);

    16.       throw new SQLNonTransientException(e);

    17.   }

    18.   return null;

    19. }

    3. ShareJoin

    目前支持跨库表 Join。 ShareJoin 将 SQL 拆分成左表 SQL 和 右表 SQL,发送给各数据节点执行,汇总数据结果进行合后返回。

    伪代码如下:

    1. // SELECT u.id, o.id FROM t_order o

    2. // INNER JOIN t_user u ON o.uid = u.id

    3. // 【顺序】查询左表

    4. String leftSQL = "SELECT o.id, u.id FROM t_order o";

    5. List leftList = dn[0].select(leftSQL) + dn[1].select(leftSQL) + ... + dn[n].select(leftsql);

    6. // 【并行】查询右表

    7. String rightSQL = "SELECT u.id FROM t_user u WHERE u.id IN (${leftList.uid})";

    8. for (dn : dns) { // 此处是并行执行,使用回调逻辑

    9.    for (rightRecord : dn.select(rightSQL)) { // 查询右表

    10.        // 合并结果

    11.        for (leftRecord : leftList) {

    12.            if (leftRecord.uid == rightRecord.id) {

    13.                write(leftRecord + leftRecord.uid 拼接结果);

    14.            }

    15.        }

    16.    }

    17. }

    实际情况会更加复杂,我们接下来一点点往下看。

    3.1 JoinParser

    JoinParser 负责对 SQL 进行解析。整体流程如下:

    举个例子, /*!mycat:catlet=io.mycat.catlets.ShareJoin */SELECT o.id,u.usernamefromt_order o join t_user u on o.uid=u.id; 解析后, TableFilter 结果如下:

    • tName :表名

    • tAlia :表自定义命名

    • where :过滤条件

    • order :排序条件

    • parenTable :左连接的 Join 的表名。 t_user表 在 join属性 的 parenTable 为 "o",即 t_order

    • joinParentkey :左连接的 Join 字段

    • joinKey :join 字段。 t_user表 在 join属性 为 id

    • join :子 tableFilter。即,该表连接的右边的表。

    • parent :和 join属性 相对。

    看到此处,大家可能有疑问,为什么要把 SQL 解析成 TableFilterJoinParser 根据 TableFilter 生成数据节点执行 SQL。代码如下:

    1. // TableFilter.java

    2. public String getSQL() {

    3.   String sql = "";

    4.   // fields

    5.   for (Entry<String, String> entry : fieldAliasMap.entrySet()) {

    6.       String key = entry.getKey();

    7.       String val = entry.getValue();

    8.       if (val == null) {

    9.           sql = unionsql(sql, getFieldfrom(key), ",");

    10.       } else {

    11.           sql = unionsql(sql, getFieldfrom(key) + " as " + val, ",");

    12.       }

    13.   }

    14.   // where

    15.   if (parent == null) {    // on/where 等于号左边的表

    16.       String parentJoinKey = getJoinKey(true);

    17.       // fix sharejoin bug:

    18.       // (AbstractConnection.java:458) -close connection,reason:program err:java.lang.IndexOutOfBoundsException:

    19.       // 原因是左表的select列没有包含 join 列,在获取结果时报上面的错误

    20.       if (sql != null && parentJoinKey != null &&

    21.               !sql.toUpperCase().contains(parentJoinKey.trim().toUpperCase())) {

    22.           sql += ", " + parentJoinKey;

    23.       }

    24.       sql = "select " + sql + " from " + tName;

    25.       if (!(where.trim().equals(""))) {

    26.           sql += " where " + where.trim();

    27.       }

    28.   } else {    // on/where 等于号右边边的表

    29.       if (allField) {

    30.           sql = "select " + sql + " from " + tName;

    31.       } else {

    32.           sql = unionField("select " + joinKey, sql, ",");

    33.           sql = sql + " from " + tName;

    34.           //sql="select "+joinKey+","+sql+" from "+tName;

    35.       }

    36.       if (!(where.trim().equals(""))) {

    37.           sql += " where " + where.trim() + " and (" + joinKey + " in %s )";

    38.       } else {

    39.           sql += " where " + joinKey + " in %s ";

    40.       }

    41.   }

    42.   // order

    43.   if (!(order.trim().equals(""))) {

    44.       sql += " order by " + order.trim();

    45.   }

    46.   // limit

    47.   if (parent == null) {

    48.       if ((rowCount > 0) && (offset > 0)) {

    49.           sql += " limit" + offset + "," + rowCount;

    50.       } else {

    51.           if (rowCount > 0) {

    52.               sql += " limit " + rowCount;

    53.           }

    54.       }

    55.   }

    56.   return sql;

    57. }

    • 当 parent 为空时,即on/where 等于号左边的表。例如: selectid,uidfromt_order

    • 当 parent 不为空时,即on/where 等于号右边的表。例如: selectid,usernamefromt_userwhereidin(1,2,3)

    3.2 ShareJoin.processSQL(...)

    当 SQL 解析完后,生成左边的表执行的 SQL,发送给对应的数据节点查询数据。大体流程如下:

    当 SQL 为 /*!mycat:catlet=io.mycat.catlets.ShareJoin */SELECT o.id,u.usernamefromt_order o join t_user u on o.uid=u.id; 时, sql=getSql() 的返回结果为 selectid,uidfromt_order

    生成左边的表执行的 SQL 后,顺序顺序顺序发送给对应的数据节点查询数据。具体顺序查询是怎么实现的,我们来看下章 BatchSQLJob

    3.3 BatchSQLJob

    EngineCtxBatchSQLJob 封装,提供上层两个方法:

    1. executeNativeSQLSequnceJob :顺序(非并发)在每个数据节点执行SQL任务

    2. executeNativeSQLParallJob :并发在每个数据节点执行SQL任务

    核心代码如下:

    1. // EngineCtx.java

    2. public void executeNativeSQLSequnceJob(String[] dataNodes, String sql,

    3.        SQLJobHandler jobHandler) {

    4.    for (String dataNode : dataNodes) {

    5.        SQLJob job = new SQLJob(jobId.incrementAndGet(), sql, dataNode,

    6.                jobHandler, this);

    7.        bachJob.addJob(job, false);

    8.    }

    9. }

    10. public void executeNativeSQLParallJob(String[] dataNodes, String sql,

    11.        SQLJobHandler jobHandler) {

    12.    for (String dataNode : dataNodes) {

    13.        SQLJob job = new SQLJob(jobId.incrementAndGet(), sql, dataNode,

    14.                jobHandler, this);

    15.        bachJob.addJob(job, true);

    16.    }

    17. }    


    BatchSQLJob 通过执行中任务列表待执行任务列表来实现顺序/并发执行任务。核心代码如下:

    1. // BatchSQLJob.java

    2. /**

    3. * 执行中任务列表

    4. */

    5. private ConcurrentHashMap<Integer, SQLJob> runningJobs = new ConcurrentHashMap<Integer, SQLJob>();

    6. /**

    7. * 待执行任务列表

    8. */

    9. private ConcurrentLinkedQueue<SQLJob> waitingJobs = new ConcurrentLinkedQueue<SQLJob>();

    10. public void addJob(SQLJob newJob, boolean parallExecute) {

    11.   if (parallExecute) {

    12.       runJob(newJob);

    13.   } else {

    14.       waitingJobs.offer(newJob);

    15.       if (runningJobs.isEmpty()) { // 若无正在执行中的任务,则从等待队列里获取任务进行执行。

    16.           SQLJob job = waitingJobs.poll();

    17.           if (job != null) {

    18.               runJob(job);

    19.           }

    20.       }

    21.   }

    22. }

    23. public boolean jobFinished(SQLJob sqlJob) {

    24.    runningJobs.remove(sqlJob.getId());

    25.    SQLJob job = waitingJobs.poll();

    26.    if (job != null) {

    27.        runJob(job);

    28.        return false;

    29.    } else {

    30.        if (noMoreJobInput) {

    31.            return runningJobs.isEmpty() && waitingJobs.isEmpty();

    32.        } else {

    33.            return false;

    34.        }

    35.    }

    36. }

    • 顺序执行时,当 runningJobs 存在执行中的任务时, #addJob(...) 时,不立即执行,添加到 waitingJobs。当 SQLJob 完成时,顺序调用下一个任务。

    • 并发执行时, #addJob(...) 时,立即执行。


    SQLJob SQL 异步执行任务。其 jobHandler(SQLJobHandler) 属性,在 SQL 执行有返回结果时,会进行回调,从而实现异步执行。

    ShareJoin 里, SQLJobHandler 有两个实现: ShareDBJoinHandlerShareRowOutPutDataHandler。前者,左边的表执行的 SQL 回调;后者,右边的表执行的 SQL 回调。

    3.4 ShareDBJoinHandler

    ShareDBJoinHandler左边的表执行的 SQL 回调。流程如下:

    • #fieldEofResponse(...) :接收数据节点返回的 fields,放入内存。

    • #rowResponse(...) :接收数据节点返回的 row,放入内存。

    • #rowEofResponse(...) :接收完一个数据节点返回所有的 row。当所有数据节点都完成 SQL 执行时,提交右边的表执行的 SQL 任务,并行执行,即图中#createQryJob(...)

    当 SQL 为 /*!mycat:catlet=io.mycat.catlets.ShareJoin */SELECT o.id,u.usernamefromt_order o join t_user u on o.uid=u.id; 时, sql=getChildSQL() 的返回结果为selectid,usernamefromt_userwhereidin(1,2,3)

    核心代码如下:

    1. // ShareJoin.java

    2. private void createQryJob(int batchSize) {

    3.   int count = 0;

    4.   Map<String, byte[]> batchRows = new ConcurrentHashMap<String, byte[]>();

    5.   String theId = null;

    6.   StringBuilder sb = new StringBuilder().append('(');

    7.   String svalue = "";

    8.   for (Map.Entry<String, String> e : ids.entrySet()) {

    9.       theId = e.getKey();

    10.       byte[] rowbyte = rows.remove(theId);

    11.       if (rowbyte != null) {

    12.           batchRows.put(theId, rowbyte);

    13.       }

    14.       if (!svalue.equals(e.getValue())) {

    15.           if (joinKeyType == Fields.FIELD_TYPE_VAR_STRING

    16.                   || joinKeyType == Fields.FIELD_TYPE_STRING) { // joinkey 为varchar

    17.               sb.append("'").append(e.getValue()).append("'").append(','); // ('digdeep','yuanfang')

    18.           } else { // 默认joinkey为int/long

    19.               sb.append(e.getValue()).append(','); // (1,2,3)

    20.           }

    21.       }

    22.       svalue = e.getValue();

    23.       if (count++ > batchSize) {

    24.           break;

    25.       }

    26.   }

    27.   if (count == 0) {

    28.       return;

    29.   }

    30.   jointTableIsData = true;

    31.   sb.deleteCharAt(sb.length() - 1).append(')');

    32.   String sql = String.format(joinParser.getChildSQL(), sb);

    33.   getRoute(sql);

    34.   ctx.executeNativeSQLParallJob(getDataNodes(), sql, new ShareRowOutPutDataHandler(this, fields, joinindex, joinParser.getJoinRkey(), batchRows, ctx.getSession()));

    35. }

    3.5 ShareRowOutPutDataHandler

    ShareRowOutPutDataHandler右边的表执行的 SQL 回调。流程如下:

    • #fieldEofResponse(...) :接收数据节点返回的 fields,返回 header 给 MySQL Client。

    • #rowResponse(...) :接收数据节点返回的 row,匹配左表的记录,返回合并后返回的 row 给 MySQL Client。

    • #rowEofResponse(...) :当所有 row 都返回完后,返回 eof 给 MySQL Client。

    核心代码如下:

    1. // ShareRowOutPutDataHandler.java

    2. public boolean onRowData(String dataNode, byte[] rowData) {

    3.   RowDataPacket rowDataPkgold = ResultSetUtil.parseRowData(rowData, bfields);

    4.   //拷贝一份batchRows

    5.   Map<String, byte[]> batchRowsCopy = new ConcurrentHashMap<String, byte[]>();

    6.   batchRowsCopy.putAll(arows);

    7.   // 获取Id字段,

    8.   String id = ByteUtil.getString(rowDataPkgold.fieldValues.get(joinR));

    9.   // 查找ID对应的A表的记录

    10.   byte[] arow = getRow(batchRowsCopy, id, joinL);

    11.   while (arow != null) {

    12.       RowDataPacket rowDataPkg = ResultSetUtil.parseRowData(arow, afields);//ctx.getAllFields());

    13.       for (int i = 1; i < rowDataPkgold.fieldCount; i++) {

    14.           // 设置b.name 字段

    15.           byte[] bname = rowDataPkgold.fieldValues.get(i);

    16.           rowDataPkg.add(bname);

    17.           rowDataPkg.addFieldCount(1);

    18.       }

    19.       // huangyiming add

    20.       MiddlerResultHandler middlerResultHandler = session.getMiddlerResultHandler();

    21.       if (null == middlerResultHandler) {

    22.           ctx.writeRow(rowDataPkg);

    23.       } else {

    24.           if (middlerResultHandler instanceof MiddlerQueryResultHandler) {

    25.               byte[] columnData = rowDataPkg.fieldValues.get(0);

    26.               if (columnData != null && columnData.length > 0) {

    27.                   String rowValue = new String(columnData);

    28.                   middlerResultHandler.add(rowValue);

    29.               }

    30.               //}

    31.           }

    32.       }

    33.       arow = getRow(batchRowsCopy, id, joinL);

    34.   }

    35.   return false;

    36. }

    4. 彩蛋

    如下是本文涉及到的核心类,有兴趣的同学可以翻一翻。

    ShareJoin 另外不支持的功能:

    1. 只支持 inner join,不支持 left join、right join 等等连接。

    2. 不支持 order by。

    3. 不支持 group by 以及 相关聚合函数。

    4. 即使 join 左表的字段未声明为返回 fields 也会返回。

    恩,MyCAT 弱XA 源码继续走起!

  • 相关阅读:
    Nginx常用日志分割方法
    nginx的 CPU参数worker_processes和worker_cpu_affinity使用说明
    js中的“==”和“===”的区别
    学习JS
    svg
    用户界面设计
    bootstrap和easyui
    axure—日期函数
    axure--轮播图
    字符串属性和函数的使用
  • 原文地址:https://www.cnblogs.com/janehoo/p/8565464.html
Copyright © 2011-2022 走看看