zoukankan      html  css  js  c++  java
  • SQLFlow深度解析——含源码分析,商用存疑点:(1)RPC获取sql db数据,然后在引擎中计算AI部分(2)无监督算法(聚类、IForest)不支持,当前仅autoencoder

    SQLFlow深度解析

    SQLFLow介绍

    SQLFlow是阿里巴巴蚂蚁金服开源的一个AI On SQL的项目, 目标是SQL 引擎和 AI 引擎连接起来,让用户仅需几行 SQL 代码就能描述整个应用或者产品背后的数据流和 AI 构造

    SQLFlow 最早的初衷,就是希望解决分析师既要操作数据又要使用 AI、往往需要在两个甚至更多的系统之间切换、工作效率低的窘境。

    目前业界已有的AI ON SQL的方案:

    • Microsoft SQL Server:Microsoft SQL Server 支持机器学习服务,可以将 R 或 Python 编写的机器学习程序作为外部脚本运行.缺点: 需要编写R或者Python程序代码
    • Teradata SQL for DL:Teradata 也提供了 RESTful 服务,可以通过扩展的 SQL SELECT 语法调用. 缺点: 语法耦合了它的Rest服务
    • Google BigQuery:Google BigQuery 通过引入 CREATE MODEL 语句让用 SQL 实现机器学习成为可能. 缺点: 支持的模型有点少, 深度学习还不支持.

    针对以上三个方案的缺点, SQLFlow的定义了自己的三个设计目标:

    • 这一解决方案应与许多 SQL 引擎都兼容,而不是只能兼容特定版本或类型的 SQL 引擎。
    • 它应该支持复杂的机器学习模型,包括用于深度学习的 TensorFlow 和用于树模型的 XGBoost。
    • 能够灵活地配置和运行前沿机器学习算法,包括指定特征交叉,无需在 SQL 语句中嵌入 Python 或 R 代码,以及完全集成超参数估计等。

    目前阶段来说, SQLFlow已经支持MySQL/MaxCompute/Hive, 机器学习框架已经支持TF和XGBoost.

    资源:

    宣传文章

    Github官网

    官方文档

    Spark社区既有MLib这样的机器学习框架, 也有一些基于深度学习的扩展, 例如Uber的horovod还有Yahoo的TensorFlowOnSpark, 但是这些框架都是基于的是Spark DataSet的接口联通的, 你可以在DataSet API上使用SQL, 也可以使用AI接口, 你可以认为是AI + SQL的模式, 而不是AI ON SQL的模式

    SQLFlow试用

    社区提供了一个官方的试用的Docker镜像, 只要键入以下命令启动容器即可:

    1
    docker run -d -p 8888:8888 sqlflow/sqlflow:latest

    试用浏览器打开容器机器所在的8888端口, 可以看到一个notebook的页面, 默认已经有一个example.ipynb的样例文件了

    打开这个example.ipynb文件, 这个镜像里面已经安装了mysql, 同时也把部分测试数据导入到数据库里面, 你不需要做任何数据处理的工作, 就可以直接运行Notebook里面的Cell. 蚂蚁的开发人员真的很贴心啊

    执行推理过程, 使用DNNClassifier算法模型, 并将结果写入到sqlflow_models.my_dnn_model表中, my_dnn_model只有两个字段: ID和BLOB(存放模型序列化后的字节流)

    执行对于测试集(iris.test)进行推理, 并写入到预测结果表之中(iris.predict.class)

    最后展示推理结果集iris.predict

    架构解析

    这个AI ON SQL系统里面, 首先要回到的一个问题是, AI系统的计算层和SQL系统的计算层是什么关系?

    例如Spark(BigQuery大概率也是, 但是因为闭源不能确定)AI引擎代码是内嵌于SQL计算系统之中的, 并行执行的能力由Spark管理, AI系统就像代码库一样被Spark系统所调用而已.

    但SQLFlow明显不是这种模式的:

    从SQLFlow的架构图上看的出来, AI Engine和SQL Engine之间是独立的, 两者通过RPC交互数据和模型

    1. AI Engine训练或者推理计算的时候, 从SQLEngine获取数据
    2. AI Engine完成训练过程, 将模型写入到SQL Engine; 推理过程从SQL Engine读取模型

    整个SQLFlow的流程大致如下(图上红色部分为SQLFlow):

    1. Notebook输入SQL之后, 送入到Parser之内, 这儿的语法解析借用了Hive/MaxCompute等引擎
    2. 解析完SQL语法后, 进行Schema Verification
    3. 然后根据SQL语法, 产生对应的Code(根据不同模型和不同引擎产生不同的Code)
    4. 最后执行Code

    设计文档

    源码解析

    最后我们来简单过一遍SQLFlow的代码, 提炼一下找代码框架的思路:

    SQLFlow代码量目测在1-1.5万行左右, 半天就能看懂基础流程了

    找到入口

    首先, 找到整个项目的Dockerfile, 就在根目录下

    如果项目有Docker镜像, Dockerfile就是个个进程的入口, 比main函数还在前面

    1
    2
    ADD scripts/start.sh /
    CMD ["bash", "/start.sh"]

    看到最后一行启动的命令为start.sh, 源码文件在scripts/start.sh

    找到启动文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    function print_usage() {
    echo "Usage: /bin/bash start.sh [OPTION] "
    echo " populate-example-dataset-mysql: populate an existing mysql instance with the example dataset."
    echo " mysql: setup the mysql server with the example dataset initialized."
    echo " sqlflow_server: setup the sqlflow gRPC server."
    echo " sqlflow_notebook: setup the Jupyter Notebook server."
    echo " all(default): setup a MySQL server instance, a sqlflow gRPC server and a Jupyter Notebook server sequentially."
    }

    function main() {
    ARG=${1:-all}
    case $ARG in
    all)
    echo "setup all-in-one"
    setup_mysql
    setup_sqlflow_server &
    setup_sqlflow_notebook
    ;;
    *)
    print_usage
    ;;
    esac
    }

    可以看到启动文件启动了三个内容:

    1. mysql: 默认的SQL Engine, 已经初始化了数据

    2. sqlflow_server: 我们要找的程序, 找到里面真正的执行命令sqlflowserver --datasource=${DS}

    3. sqlflow_notebook: Notebook交互式界面

    找到main函数入口

    全局搜索function main(), 发现只有cmd/sqlflowserver/main.go有.

    start函数里面找到关键的proto.RegisterSQLFlowServer(s, server.NewServer(sql.Run, nil, modelDir, enableSession))服务启动代码, 其中的sql.Run就整个SQL处理代码.

    不太熟悉go语言是怎么出包的

    SQL.Run

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    func Run(slct string, db *DB, modelDir string, session *pb.Session) *PipeReader {
    splittedSQL, err := splitExtendedSQL(slct)
    if err != nil {
    rd, wr := Pipe()
    // return the lexer error message to client side
    go func() {
    defer wr.Close()
    wr.Write(err)
    }()
    return rd
    }
    if len(splittedSQL) == 2 {
    return runExtendedSQL(slct, db, modelDir, session)
    }
    return runStandardSQL(slct, db)
    }

    如果是SQL语句, 走入到runStandardSQL分支, 如果有训练或者推理语法, 就会走入runExtendedSQL分支

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    func runExtendedSQL(slct string, db *DB, modelDir string, session *pb.Session) *PipeReader {
    rd, wr := Pipe()
    go func() {
    defer wr.Close()

    err := func() error {
    defer func(startAt time.Time) {
    log.Debugf("runExtendedSQL %v finished, elapsed:%v", slct, time.Since(startAt))
    }(time.Now())
    pr, e := newParser().Parse(slct) // 语言解析
    if e != nil {
    return e
    }
    cwd, e := ioutil.TempDir("/tmp", "sqlflow")
    if e != nil {
    return e
    }
    defer os.RemoveAll(cwd)

    if pr.train {
    ds, e := newTrainAndValDataset(db, pr.standardSelect.String(), pr.standardSelect.tables[0], 0.8)
    if e != nil {
    return e
    }

    return train(wr, pr, db, cwd, modelDir, slct, ds) // 调用训练
    }
    return pred(wr, pr, db, cwd, modelDir) // 调用推理部分
    }()

    if err != nil {
    log.Errorf("runExtendedSQL error:%v", err)
    if err != ErrClosedPipe {
    if err := wr.Write(err); err != nil {
    log.Errorf("runExtendedSQL error(piping):%v", err)
    }
    }
    }
    }()
    return rd
    }

    可以看到, 先进行语法解析, 然后进行训练或者推理逻辑, 我们简单看一眼训练过程:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    func train(wr *PipeWriter, tr *extendedSelect, db *DB, cwd string, modelDir string, slct string, ds *trainAndValDataset) error {
    fts, e := verify(tr, db) // 语法校验
    if e != nil {
    return e
    }
    var program bytes.Buffer
    if e := genTF(&program, tr, ds, fts, db); e != nil { // 生成代码
    return fmt.Errorf("genTF %v", e)
    }

    cw := &logChanWriter{wr: wr}
    defer cw.Close()
    cmd := tensorflowCmd(cwd, db.driverName) // 执行命令
    cmd.Stdin = &program
    cmd.Stdout = cw
    cmd.Stderr = cw
    if e := cmd.Run(); e != nil {
    return fmt.Errorf("training failed %v", e)
    }
    m := model{workDir: cwd, TrainSelect: slct}
    if modelDir != "" {
    return m.saveTar(modelDir, tr.save) // 保存模型到本地文件夹
    }
    return m.save(db, tr.save) // 保存模型到数据库
    }

    可以看到, 进入训练过程之后, 先做了语法校验, 然后生成的对应的TF的代码, 然后调用tensorflowCmd执行命令, 最后将模型保存完毕, 完成训练过程.

    语法校验先略过, 代码生成过程比较复杂后面再介绍, 我们先关注于命令执行过程:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    func tensorflowCmd(cwd, driverName string) (cmd *exec.Cmd) {
    if hasPython() && hasTensorFlow() && hasDatabaseConnector(driverName) {
    log.Printf("tensorflowCmd: run locally")
    cmd = exec.Command("python", "-u")
    cmd.Dir = cwd
    } else if hasDocker() {
    log.Printf("tensorflowCmd: run in Docker container")
    const tfImg = "sqlflow/sqlflow"
    if !hasDockerImage(tfImg) {
    log.Printf("No local Docker image %s. It will take a long time to pull.", tfImg)
    }
    cmd = exec.Command("docker", "run", "--rm",
    fmt.Sprintf("-v%s:/work", cwd),
    "-w/work", "--network=host", "-i", tfImg, "python")
    } else {
    log.Fatalf("No local TensorFlow or Docker. No way to run TensorFlow programs")
    }
    return cmd
    }

    tensorflowCmd有两种执行模式: 本地执行和容器执行, 目前这两种方式都是单机执行模型, 实际上这儿就印证了AI Engine和SQL Engine分离的架构

    未来这儿可以很方便的将这个扩展为分布式任务, 例如Kubeflow的TFJob,这个这块需要跟代码生成那儿一起修改.

    代码生成

    让我们在回到genTF这个函数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    func genTF(w io.Writer, pr *extendedSelect, ds *trainAndValDataset, fts fieldTypes, db *DB) error {
    r, e := newFiller(pr, ds, fts, db) // 应该是按照字段的过滤吧, 没仔细看, 可能会出错
    if e != nil {
    return e
    }
    if pr.train {
    return tfTrainTemplate.Execute(w, r) // 根据训练模板生成code
    }
    return tfPredTemplate.Execute(w, r) // 根据推理模板生成code
    }

    var tfTrainTemplate = template.Must(template.New("codegenTfTrain").Parse(tfTrainTemplateText)) // 训练模板
    var tfPredTemplate = template.Must(template.New("codegenTfPred").Parse(tfPredTemplateText)) // 推理模板

    这儿实际上最关键的是两个训练模板, 这两个模块在template_tf.go里面定义

    除了TF的模板, 还有template_alpstemplate_elasticdl这两个

    模板里面就是Python代码了, 截取里面的一部分说明一下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    import os
    # Disable Tensorflow INFO and WARNING logs
    os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

    import sys, json
    import tensorflow as tf
    import functools
    import sqlflow_models # 注意点1

    from sqlflow_submitter.db import connect, db_generator # 注意点2

    # 忽略一部分代码
    def input_fn(datasetStr):
    feature_types = []
    for name in feature_column_names:
    if feature_metas[name]["is_sparse"]:
    feature_types.append((tf.int64, tf.int32, tf.int64))
    else:
    feature_types.append(get_dtype(feature_metas[name]["dtype"]))

    gen = db_generator(driver, conn, datasetStr, feature_column_names, "{{.Y.FeatureName}}", feature_metas)
    dataset = tf.data.Dataset.from_generator(gen, (tuple(feature_types), tf.{{.Y.Dtype}})) # 注意点3
    ds_mapper = functools.partial(_parse_sparse_feature, feature_metas=feature_metas)
    return dataset.map(ds_mapper)

    注意点1是sqlflow_models, 这个是定义在同组织的Model里面, 目前只实现了2个实现dnnclassifierlstmclassifier, 这里这个dnnclassifier就是试用里面DNNClassifier算法模型的定义所在

    注意点2是sqlflow_submitter这个定义在sql/python/sqlflow_submmiter包目录下, 在这儿你可以执行本地的Python文件, 也可以定义自己的submit将CodeGen的代码当做客户端代码, 给远程的深度学习服务提交自己的学习任务. 同时这儿也定义了与SQL Engine的交互代码逻辑, 就是里面的connectdb_generator

    注意点3关注于TensorFlow框架是如何读取从数据库里面的数据的, 使用的接口为tf.data.Dataset.from_generator

    至此代码分析已经完成, 主流程已经明确了.

    社区动态

    蚂蚁对于这个项目的投入还是很大的, 应该由专门的人在投入这个项目, 更新频率还是相当快的

    但是贡献者还是比较少的, 应该目前看只有21贡献者, 基本上是蚂蚁金服内部员工.

    之前看到他们2019年的路标, 2019年的目标预定是支持各种框架, 例如Calcite支持或者GPU TF支持

    总体来说2019年主要在完善功能, 但是后来这个ISSUE关闭了, 不确定19年能否完成这些内容.

    总结

    SQLFlow目前看还处于原型阶段, 整体支持的能力还非常欠缺: GPU的支持, 模型的定义等功能目前好像都不具备.

    就目前整体的设计, 我认为以下三点未来需要加强:

    1. 模型定义接口太复杂了: 1. 实现一个模板;2. 实现一个模型算法;3. 实现一个submitter. 这套逻辑对于工程师可能比较简单(实际上定义地方太多, 也麻烦), 但是对于AI算法的人, 肯定不是用
    2. AI EngineSQL Engine分离带来的性能问题, 这套架构的问题就是AI系统离数据远了一点, 所有数据都是通过SQL Engine计算而来, 而且是通过RPC获取的数据, 比起Spark这种直接在内存中获取数据, 这里会成为正式商用时候的大瓶颈点
    3. 数据分布式化工程量太大, 这其实是问题二的引申版, 未来肯定要实现数据分布化, AI计算分布式化, 这钟模式我还没有想到如何分布式化(回去再好好想想)

    另外对于这个项目的商业前景如何, 这个确实存疑的, 也许在阿里内部可能有这部分需求, 但对我们来说却不是.

    2018年8月BigQuery出了ML之后, 我们也有计划跟进, 调研了XGBoost On Spark, 但最后还是没决定要做, 最主要的原因是没有客户明确需要这个能力.

    值得表扬的是: 蚂蚁的文档和demo做的是真好, 做技术调研能遇到这样的项目, 确实让我省了不少功夫.

    附录: Go语言环境安装

    SQLFlow的主编程语言为Go语言, 安装部署也相对方便, 不记录过程了, 只放置一些安装资源链接

    Go安装包

    GoLand下载地址

    GoLand破解

    IDEA License Server搭建

  • 相关阅读:
    hdu 1823 Luck and Love 二维线段树
    UVA 12299 RMQ with Shifts 线段树
    HDU 4578 Transformation 线段树
    FZU 2105 Digits Count 线段树
    UVA 1513 Movie collection 树状数组
    UVA 1292 Strategic game 树形DP
    【ACM】hdu_zs2_1003_Problem C_201308031012
    qsort快速排序
    【ACM】nyoj_7_街区最短路径问题_201308051737
    【ACM】nyoj_540_奇怪的排序_201308050951
  • 原文地址:https://www.cnblogs.com/bonelee/p/15348852.html
Copyright © 2011-2022 走看看