数据库中间件MyCAT源码分析——跨库两表Join

1. 概述

MyCAT 支持跨库表 Join,目前版本仅支持跨库两表 Join。虽然如此,已经能够满足我们大部分的业务场景。况且,Join 过多的表可能带来的性能问题也是很麻烦的。

本文主要分享:

  1. 整体流程、调用顺序图
  2. 核心代码的分析

前置阅读:《MyCAT 源码分析 —— 【单库单表】查询》。

OK,Let's Go。

2. 主流程

当执行跨库两表 Join SQL 时,经历的大体流程如下:

SQL 上,需要添加注解 /*!mycat:catlet=io.mycat.catlets.ShareJoin */ ${SQL} 。RouteService#route(...) 解析注解 mycat:catlet 后,路由给 HintCatletHandler 作进一步处理。

HintCatletHandler 获取注解对应的 Catlet 实现类,io.mycat.catlets.ShareJoin 就是其中一种实现(目前也只有这一种实现),提供了跨库两表 Join 的功能。从类命名上看,ShareJoin 很大可能性后续会提供完整的跨库多表的 Join 功能。

核心代码如下:

 
 
 
 
  1. // HintCatletHandler.java
  2. public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema,
  3.                            int sqlType, String realSQL, String charset, ServerConnection sc,
  4.                            LayerCachePool cachePool, String hintSQLValue, int hintSqlType, Map hintMap)
  5.        throws SQLNonTransientException {
  6.    String cateletClass = hintSQLValue;
  7.    if (LOGGER.isDebugEnabled()) {
  8.        LOGGER.debug("load catelet class:" + hintSQLValue + " to run sql " + realSQL);
  9.    }
  10.    try {
  11.        Catlet catlet = (Catlet) MycatServer.getInstance().getCatletClassLoader().getInstanceofClass(cateletClass);
  12.        catlet.route(sysConfig, schema, sqlType, realSQL, charset, sc, cachePool);
  13.        catlet.processSQL(realSQL, new EngineCtx(sc.getSession2()));
  14.    } catch (Exception e) {
  15.        LOGGER.warn("catlet error " + e);
  16.        throw new SQLNonTransientException(e);
  17.    }
  18.    return null;

3. ShareJoin

目前支持跨库两表 Join。ShareJoin 将 SQL 拆分成左表 SQL 和 右表 SQL,发送给各数据节点执行,汇总数据结果进行合后返回。

伪代码如下:

 
 
 
 
  1. // SELECT u.id, o.id FROM t_order o 
  2. // INNER JOIN t_user u ON o.uid = u.id
  3. // 【顺序】查询左表
  4. String leftSQL = "SELECT o.id, u.id FROM t_order o";
  5. List leftList = dn[0].select(leftSQL) + dn[1].select(leftSQL) + ... + dn[n].select(leftsql);
  6. // 【并行】查询右表
  7. String rightSQL = "SELECT u.id FROM t_user u WHERE u.id IN (${leftList.uid})";
  8. for (dn : dns) { // 此处是并行执行,使用回调逻辑
  9.     for (rightRecord : dn.select(rightSQL)) { // 查询右表
  10.         // 合并结果
  11.         for (leftRecord : leftList) {
  12.             if (leftRecord.uid == rightRecord.id) {
  13.                 write(leftRecord + leftRecord.uid 拼接结果);
  14.             }
  15.         }
  16.     }

实际情况会更加复杂,我们接下来一点点往下看。

3.1 JoinParser

JoinParser 负责对 SQL 进行解析。整体流程如下:

举个例子,/*!mycat:catlet=io.mycat.catlets.ShareJoin */ SELECT o.id, u.username from t_order o join t_user u on o.uid = u.id; 解析后,TableFilter 结果如下:

  • tName :表名
  • tAlia :表自定义命名
  • where :过滤条件
  • order :排序条件
  • parenTable :左连接的 Join 的表名。t_user表 在 join属性 的 parenTable 为 "o",即 t_order。
  • joinParentkey :左连接的 Join 字段
  • joinKey :join 字段。t_user表 在 join属性 为 id。
  • join :子 tableFilter。即,该表连接的右边的表。
  • parent :和 join属性 相对。

看到此处,大家可能有疑问,为什么要把 SQL 解析成 TableFilter。JoinParser 根据 TableFilter 生成数据节点执行 SQL。代码如下:

 
 
 
 
  1. // TableFilter.java
  2. public String getSQL() {
  3.    String sql = "";
  4.    // fields
  5.    for (Entry entry : fieldAliasMap.entrySet()) {
  6.        String key = entry.getKey();
  7.        String val = entry.getValue();
  8.        if (val == null) {
  9.            sql = unionsql(sql, getFieldfrom(key), ",");
  10.        } else {
  11.            sql = unionsql(sql, getFieldfrom(key) + " as " + val, ",");
  12.        }
  13.    }
  14.    // where
  15.    if (parent == null) {    // on/where 等于号左边的表
  16.        String parentJoinKey = getJoinKey(true);
  17.        // fix sharejoin bug:
  18.        // (AbstractConnection.java:458) -close connection,reason:program err:java.lang.IndexOutOfBoundsException:
  19.        // 原因是左表的select列没有包含 join 列,在获取结果时报上面的错误
  20.        if (sql != null && parentJoinKey != null &&
  21.                !sql.toUpperCase().contains(parentJoinKey.trim().toUpperCase())) {
  22.            sql += ", " + parentJoinKey;
  23.        }
  24.        sql = "select " + sql + " from " + tName;
  25.        if (!(where.trim().equals(""))) {
  26.            sql += " where " + where.trim();
  27.        }
  28.    } else {    // on/where 等于号右边边的表
  29.        if (allField) {
  30.            sql = "select " + sql + " from " + tName;
  31.        } else {
  32.            sql = unionField("select " + joinKey, sql, ",");
  33.            sql = sql + " from " + tName;
  34.            //sql="select "+joinKey+","+sql+" from "+tName;
  35.        }
  36.        if (!(where.trim().equals(""))) {
  37.            sql += " where " + where.trim() + " and (" + joinKey + " in %s )";
  38.        } else {
  39.            sql += " where " + joinKey + " in %s ";
  40.        }
  41.    }
  42.    // order
  43.    if (!(order.trim().equals(""))) {
  44.        sql += " order by " + order.trim();
  45.    }
  46.    // limit
  47.    if (parent == null) {
  48.        if ((rowCount > 0) && (offset > 0)) {
  49.            sql += " limit" + offset + "," + rowCount;
  50.        } else {
  51.            if (rowCount > 0) {
  52.                sql += " limit " + rowCount;
  53.            }
  54.        }
  55.    }
  56.    return sql;
  • 当 parent 为空时,即on/where 等于号左边的表。例如:select id, uid from t_order。
  • 当 parent 不为空时,即on/where 等于号右边的表。例如:select id, username from t_user where id in (1, 2, 3)。

3.2 ShareJoin.processSQL(...)

当 SQL 解析完后,生成左边的表执行的 SQL,发送给对应的数据节点查询数据。大体流程如下:

当 SQL 为 /*!mycat:catlet=io.mycat.catlets.ShareJoin */ SELECT o.id, u.username from t_order o join t_user u on o.uid = u.id; 时, sql = getSql() 的返回结果为 select id, uid from t_order。

生成左边的表执行的 SQL 后,顺序顺序顺序发送给对应的数据节点查询数据。具体顺序查询是怎么实现的,我们来看下章 BatchSQLJob。

3.3 BatchSQLJob

EngineCtx 对 BatchSQLJob 封装,提供上层两个方法:

  • executeNativeSQLSequnceJob :顺序(非并发)在每个数据节点执行SQL任务
  • executeNativeSQLParallJob :并发在每个数据节点执行SQL任务

核心代码如下:

 
 
 
 
  1. // EngineCtx.java
  2. public void executeNativeSQLSequnceJob(String[] dataNodes, String sql,
  3.         SQLJobHandler jobHandler) {
  4.     for (String dataNode : dataNodes) {
  5.         SQLJob job = new SQLJob(jobId.incrementAndGet(), sql, dataNode,
  6.                 jobHandler, this);
  7.         bachJob.addJob(job, false);
  8.     }
  9. }
  10. public void executeNativeSQLParallJob(String[] dataNodes, String sql,
  11.         SQLJobHandler jobHandler) {
  12.     for (String dataNode : dataNodes) {
  13.         SQLJob job = new SQLJob(jobId.incrementAndGet(), sql, dataNode,
  14.                 jobHandler, this);
  15.         bachJob.addJob(job, true);
  16.     }

BatchSQLJob 通过执行中任务列表、待执行任务列表来实现顺序/并发执行任务。核心代码如下:

 
 
 
 
  1. // BatchSQLJob.java
  2. /**
  3. * 执行中任务列表
  4. */
  5. private ConcurrentHashMap runningJobs = new ConcurrentHashMap();
  6. /**
  7. * 待执行任务列表
  8. */
  9. private ConcurrentLinkedQueue waitingJobs = new ConcurrentLinkedQueue();
  10. public void addJob(SQLJob newJob, boolean parallExecute) {
  11.    if (parallExecute) {
  12.        runJob(newJob);
  13.    } else {
  14.        waitingJobs.offer(newJob);
  15.        if (runningJobs.isEmpty()) { // 若无正在执行中的任务,则从等待队列里获取任务进行执行。
  16.            SQLJob job = waitingJobs.poll();
  17.            if (job != null) {
  18.                runJob(job);
  19.            }
  20.        }
  21.    }
  22. }
  23. public boolean jobFinished(SQLJob sqlJob) {
  24.     runningJobs.remove(sqlJob.getId());
  25.     SQLJob job = waitingJobs.poll();
  26.     if (job != null) {
  27.         runJob(job);
  28.         return false;
  29.     } else {
  30.         if (noMoreJobInput) {
  31.             return runningJobs.isEmpty() && waitingJobs.isEmpty();
  32.         } else {
  33.             return false;
  34.         }
  35.     }
  • 顺序执行时,当 runningJobs 存在执行中的任务时,#addJob(...) 时,不立即执行,添加到 waitingJobs。当 SQLJob 完成时,顺序调用下一个任务。
  • 并发执行时,#addJob(...) 时,立即执行。

SQLJob SQL 异步执行任务。其 jobHandler(SQLJobHandler) 属性,在 SQL 执行有返回结果时,会进行回调,从而实现异步执行。

在 ShareJoin 里,SQLJobHandler 有两个实现:ShareDBJoinHandler、ShareRowOutPutDataHandler。前者,左边的表执行的 SQL 回调;后者,右边的表执行的 SQL 回调。

3.4 ShareDBJoinHandler

ShareDBJoinHandler,左边的表执行的 SQL 回调。流程如下:

  • #fieldEofResponse(...) :接收数据节点返回的 fields,放入内存。
  • #rowResponse(...) :接收数据节点返回的 row,放入内存。
  • #rowEofResponse(...) :接收完一个数据节点返回所有的 row。当所有数据节点都完成 SQL 执行时,提交右边的表执行的 SQL 任务,并行执行,即图中#createQryJob(...)。

当 SQL 为 /*!mycat:catlet=io.mycat.catlets.ShareJoin */ SELECT o.id, u.username from t_order o join t_user u on o.uid = u.id; 时, sql = getChildSQL() 的返回结果为 select id, username from t_user where id in (1, 2, 3)。

核心代码如下:

 
 
 
 
  1. // ShareJoin.java
  2. private void createQryJob(int batchSize) {
  3.    int count = 0;
  4.    Map batchRows = new ConcurrentHashMap();
  5.    String theId = null;
  6.    StringBuilder sb = new StringBuilder().append('(');
  7.    String svalue = "";
  8.    for (Map.Entry e : ids.entrySet()) {
  9.        theId = e.getKey();
  10.        byte[] rowbyte = rows.remove(theId);
  11.        if (rowbyte != null) {
  12.            batchRows.put(theId, rowbyte);
  13.        }
  14.        if (!svalue.equals(e.getValue())) {
  15.            if (joinKeyType == Fields.FIELD_TYPE_VAR_STRING
  16.                    || joinKeyType == Fields.FIELD_TYPE_STRING) { // joinkey 为varchar
  17.                sb.append("'").append(e.getValue()).append("'").append(','); // ('digdeep','yuanfang')
  18.            } else { // 默认joinkey为int/long
  19.                sb.append(e.getValue()).append(','); // (1,2,3)
  20.            }
  21.        }
  22.        svalue = e.getValue();
  23.        if (count++ > batchSize) {
  24.            break;
  25.        }
  26.    }
  27.    if (count == 0) {
  28.        return;
  29.    }
  30.    jointTableIsData = true;
  31.    sb.deleteCharAt(sb.length() - 1).append(')');
  32.    String sql = String.format(joinParser.getChildSQL(), sb);
  33.    getRoute(sql);
  34.    ctx.executeNativeSQLParallJob(getDataNodes(), sql, new ShareRowOutPutDataHandler(this, fields, joinindex, joinParser.getJoinRkey(), batchRows, ctx.getSession()));

3.5 ShareRowOutPutDataHandler

ShareRowOutPutDataHandler,右边的表执行的 SQL 回调。流程如下:

  • #fieldEofResponse(...) :接收数据节点返回的 fields,返回 header 给 MySQL Client。
  • #rowResponse(...) :接收数据节点返回的 row,匹配左表的记录,返回合并后返回的 row 给 MySQL Client。
  • #rowEofResponse(...) :当所有 row 都返回完后,返回 eof 给 MySQL Client。

核心代码如下:

 
 
 
 
  1. // ShareRowOutPutDataHandler.java
  2. public boolean onRowData(String dataNode, byte[] rowData) {
  3.    RowDataPacket rowDataPkgold = ResultSetUtil.parseRowData(rowData, bfields);
  4.    //拷贝一份batchRows
  5.    Map batchRowsCopy = new ConcurrentHashMap();
  6.    batchRowsCopy.putAll(arows);
  7.    // 获取Id字段,
  8.    String id = ByteUtil.getString(rowDataPkgold.fieldValues.get(joinR));
  9.    // 查找ID对应的A表的记录
  10.    byte[] arow = getRow(batchRowsCopy, id, joinL);
  11.    while (arow != null) {
  12.        RowDataPacket rowDataPkg = ResultSetUtil.parseRowData(arow, afields);//ctx.getAllFields());
  13.        for (int i = 1; i < rowDataPkgold.fieldCount; i++) {
  14.            // 设置b.name 字段
  15.            byte[] bname = rowDataPkgold.fieldValues.get(i);
  16.            rowDataPkg.add(bname);
  17.            rowDataPkg.addFieldCount(1);
  18.        }
  19.        // huangyiming add
  20.        MiddlerResultHandler middlerResultHandler = session.getMiddlerResultHandler();
  21.        if (null == middlerResultHandler) {
  22.            ctx.writeRow(rowDataPkg);
  23.        } else {
  24.            if (middlerResultHandler instanceof MiddlerQueryResultHandler) {
  25.                byte[] columnData = rowDataPkg.fieldValues.get(0);
  26.                if (columnData != null && columnData.length > 0) {
  27.                    String rowValue = new String(columnData);
  28.                    middlerResultHandler.add(rowValue);
  29.                }
  30.                //}
  31.            }
  32.        }
  33.        arow = getRow(batchRowsCopy, id, joinL);
  34.    }
  35.    return false;

4. 彩蛋

如下是本文涉及到的核心类,有兴趣的同学可以翻一翻。

ShareJoin 另外不支持的功能:

  1. 只支持 inner join,不支持 left join、right join 等等连接。
  2. 不支持 order by。
  3. 不支持 group by 以及 相关聚合函数。
  4. 即使 join 左表的字段未声明为返回 fields 也会返回。

恩,MyCAT 弱XA 源码继续走起!

当前文章:数据库中间件MyCAT源码分析——跨库两表Join
标题网址:http://www.csdahua.cn/qtweb/news20/367270.html

网站建设、网络推广公司-快上网,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 快上网