zoukankan      html  css  js  c++  java
  • PrestoSQL(trinodb)源码分析 执行(下)

    TaskExecutor

    那么都准备好了,就要开始真正的执行了

    初始化的时候

    增加TaskRunner线程

     

      

    TaskRunner

    核心就是不断的从waitingSplits中获取split,然后process

    到这会创建driver,

    CreateDriver

    先使用之前的operatorFactory,创建出Operator对象,然后创建Driver

    这里看下,ExchangeOperator的例子

      

    对于ScanFilterAndProjectOperator

    需要封装成,WorkProcessorSourceOperatorAdapter

    这里会生成sourceOperator,ScanFilterAndProjectOperator

    这里pages是workProcessor,如果要获取page,需要调用process接口实际获取

    processFor

    调用driver的processFor

    processInternal

    核心就是数据在pipeline中move的过程,可以看出在pipeline中数据是以push的方式进行的,在pipeline之间是pull的方式

    Current.getOutput

    pages的定义,

     所以按照这个反向执行,直到SplitToPages,

    process,

    先是生成source,这里就是访问TPCH的链接,从这里可以get到cursor,这是要作为参数用于get数据的

    然后调用processColumnSource,

     processColumnSource

    返回一个processor,RecordCursorToPages,

    RecordCursorToPages被调用时,process

    这里就用到前面生成的cursorProcessor来获取数据

    cursorProcessor是CodeGen动态生成的,

    代码生成

    在生成PhysicalOperation的同时,做代码生成,

    Visit,不停的迭代source,直到Filter

    调用到visitScanFilterAndProject,几个参数需要注意

    sourceNode, 是TableScanNode

    FilterExpression,代表过滤条件

    outputSymbols,代表projects

     这里逻辑,将filter和project生成,RowExpression

    分别通过Complier生成CursorProcessor和PageProcessor类

    这两个应该是对等的,只会使用一个

    对于PageProcessor

    会把filter,project,CodeGen成class,传入PageProcessor

    所有processor都是通过process来执行,

    这个processor的功能,执行filter得到selectedPosition,然后生成新的processor

    这个ProjectSelectedPositions,再被process

    调用,ProcessBatchResult result = processBatch(batchSize);

    逻辑就是对于filter后的结果,执行project,最终返回ProcessResult

    所以可以看出,PageProcess的目的就是对于page进行filter和project的操作,由于这里的filter和project是CodeGen的,所以整个部分都需要codeGen出来

    对于CursorProcessor,更彻底,整个class都是动态生成的

    expressionCompiler.compileCursorProcessor

     

    这里核心逻辑是产生method,

    可以看到主要生成,3中method,核心就是process,其他的filter和projec都是在process中需要调用到的函数

    Process,

     createProjectIfStatement,

    Blocked

    每个split执行时,如果上游数据不ready,会怎么处理?

    在TaskExecutor.TaskRunner中,会根据返回的blocked来判断

    那么这个blocked怎么来的呢?

    在Driver.processInternal中,如果没有movedPage,即没有数据被处理

    那么就会从operator中获取blocked

    getBlockedFuture 

    这里取名有问题,isBlocked,不应该得到一个future,应该是一个bool

    实现isBlocked的operator都是可能会出现block case的,基本都是和IO相关,大部分operator是不会block的

    比如对于,WorkProcessorSourceOperatorAdapter

    这里的firstFinishedFuture的挺有意思,

    如果一个driver中有多个blocked点,那么需要找到最先完成的,那我怎么知道谁先完成了?

    答案是,不知道;所以这里用SettableFuture,对于每个blocked加上listener,这样完成的时候就会把自己set到result,从而返回。

  • 相关阅读:
    LeetCode 623. Add One Row to Tree
    LeetCode 894. All Possible Full Binary Trees
    LeetCode 988. Smallest String Starting From Leaf
    LeetCode 979. Distribute Coins in Binary Tree
    LeetCode 814. Binary Tree Pruning
    LeetCode 951. Flip Equivalent Binary Trees
    LeetCode 426. Convert Binary Search Tree to Sorted Doubly Linked List
    LeetCode 889. Construct Binary Tree from Preorder and Postorder Traversal
    LeetCode 687. Longest Univalue Path
    LeetCode 428. Serialize and Deserialize N-ary Tree
  • 原文地址:https://www.cnblogs.com/fxjwind/p/15775298.html
Copyright © 2011-2022 走看看