TaskExecutor
那么都准备好了,就要开始真正的执行了
初始化的时候
增加TaskRunner线程

TaskRunner
核心就是不断的从waitingSplits中获取split,然后process

到这会创建driver,


CreateDriver

先使用之前的operatorFactory,创建出Operator对象,然后创建Driver
这里看下,ExchangeOperator的例子


对于ScanFilterAndProjectOperator

需要封装成,WorkProcessorSourceOperatorAdapter
这里会生成sourceOperator,ScanFilterAndProjectOperator
这里pages是workProcessor,如果要获取page,需要调用process接口实际获取

processFor
调用driver的processFor


processInternal
核心就是数据在pipeline中move的过程,可以看出在pipeline中数据是以push的方式进行的,在pipeline之间是pull的方式

Current.getOutput

pages的定义,

所以按照这个反向执行,直到SplitToPages,

process,
先是生成source,这里就是访问TPCH的链接,从这里可以get到cursor,这是要作为参数用于get数据的
然后调用processColumnSource,

processColumnSource
返回一个processor,RecordCursorToPages,

RecordCursorToPages被调用时,process
这里就用到前面生成的cursorProcessor来获取数据
cursorProcessor是CodeGen动态生成的,

代码生成
在生成PhysicalOperation的同时,做代码生成,

Visit,不停的迭代source,直到Filter

调用到visitScanFilterAndProject,几个参数需要注意
sourceNode, 是TableScanNode
FilterExpression,代表过滤条件
outputSymbols,代表projects

这里逻辑,将filter和project生成,RowExpression

分别通过Complier生成CursorProcessor和PageProcessor类
这两个应该是对等的,只会使用一个

对于PageProcessor
会把filter,project,CodeGen成class,传入PageProcessor

所有processor都是通过process来执行,
这个processor的功能,执行filter得到selectedPosition,然后生成新的processor

这个ProjectSelectedPositions,再被process
调用,ProcessBatchResult result = processBatch(batchSize);
逻辑就是对于filter后的结果,执行project,最终返回ProcessResult

所以可以看出,PageProcess的目的就是对于page进行filter和project的操作,由于这里的filter和project是CodeGen的,所以整个部分都需要codeGen出来
对于CursorProcessor,更彻底,整个class都是动态生成的
expressionCompiler.compileCursorProcessor


这里核心逻辑是产生method,
可以看到主要生成,3中method,核心就是process,其他的filter和projec都是在process中需要调用到的函数

Process,

createProjectIfStatement,

Blocked
每个split执行时,如果上游数据不ready,会怎么处理?
在TaskExecutor.TaskRunner中,会根据返回的blocked来判断

那么这个blocked怎么来的呢?
在Driver.processInternal中,如果没有movedPage,即没有数据被处理
那么就会从operator中获取blocked

getBlockedFuture
这里取名有问题,isBlocked,不应该得到一个future,应该是一个bool

实现isBlocked的operator都是可能会出现block case的,基本都是和IO相关,大部分operator是不会block的
比如对于,WorkProcessorSourceOperatorAdapter

这里的firstFinishedFuture的挺有意思,
如果一个driver中有多个blocked点,那么需要找到最先完成的,那我怎么知道谁先完成了?
答案是,不知道;所以这里用SettableFuture,对于每个blocked加上listener,这样完成的时候就会把自己set到result,从而返回。
