通过TpchQueryRunner可以跑起来一个测试服务
仍然使用‘SELECT SUPPKEY, sum(QUANTITY) from lineitem where QUANTITY > 5 group by SUPPKEY limit 10’
Mac M1, Java CLI有bug,可以用python替代
conn = trino.dbapi.connect( host='localhost', port=8080, user='test', catalog='tpch', schema='tiny', request_timeout=30000 ) cur = conn.cursor() cur.execute('SELECT SUPPKEY, sum(QUANTITY) from lineitem where QUANTITY > 5 group by SUPPKEY limit 10') rows = cur.fetchmany()
SqlQueryExecution
前面的流程忽略,直接到SqlQueryExecution
start的核心逻辑,
planQuery -> doPlanQuery
plan,优化并生成plan
Presto的优化器不太具有参考价值,简单看下数据的变化,
analysis是AST,语法树,
转化成初始的逻辑计划,已经从语法树变成布尔的逻辑算子,
PlanNode root = planStatement(analysis, analysis.getStatement())
优化完的结果,最大的不同是加上ExchangeNode
createSubPlans
首先是,用Fragmenter,Visit整个plan,根据ExchangeNode生成fragment,
产生的效果如下,SimplePlanRewriter.rewriteWith
RemoteSourceNode,被替换成,Fragment“1”,
在FragmentProperties可以找到所有的Fragments,这里生成出两个fragments,
buildRootFragment
将上面的fragments,封装成SubPlan,
这里会将root封装成fragment“0”,代表OutputNode
planDistribution
将Fragment封装成StageExecutionPlan,
doPlan
封装StageExecutionPlan,这里除了Fragment,
还多出3个东西,SplitSource,dependencies,tables;
其中SplitSource和tables,只有包含tableScan的Stage会有,这里就是Fragment2
dependences包含当前stage所依赖的stages,比如对于Fragment1,
splitSources
获取存储输入的splits信息,依赖于存储的实现,这里是tpch
获取逻辑参考调用栈,
对于3个Fragment递归调用doPlan,在visit中,只有TableScan算子会触发getSplits,其他的算子都是传递visit
Tpch的getSplits,只是根据节点数,每个节点splits数目,创建一堆TpchSplit
这里TpchSplit的组成很简单,仅仅是partNum,node地址
最终doPlan得到的结果,
scheduler.start()
产生Scheduler,
SqlQueryExecution.start -> SqlQueryExecution.schedule
可以看到这里schedule是异步调用的,
对于每个stage,调动schedule
Schedule的过程,首先会选取一个Scheduler,
可以看到stage0和1,没有source,所以选的是FixedCountScheduler,
对于不同Scheduler的区别,详细参考,https://github.com/prestodb/presto/wiki/Stage-and-Source-Scheduler-and-Grouped-Execution
这个调度逻辑就是,对于每个node调度一个task
对于stage2,选择FixedSourcePartitionedScheduler
逻辑是先类似FixedCountScheduler去创建task,然后再调用SourcePartitionedScheduler的逻辑(SourcePartitionedScheduler会为一批splits动态调度一个新的task,而FixedSourcePartitionedScheduler是使用先前调度好的task)
SourcePartitionedScheduler
调用栈,
SourcePartitionedScheduler,把splits分配到各个node上,
SqlStageExecution,把对应的splits,加入到task中,这里如果没有事先生成的Task,会动态的生成一个新的task
最终Scheduler生成的调度结果是ScheduleResult
对于Fragment2对应的stage,生成了3个Task,平均分配了包含的24个splits
scheduleTask
在scheduleTask中,创建RemoteTask,并且start
继续调用到,HttpRemoteTask的scheduleUpdate,
用线程池去调用,executor.execute(this::sendUpdate)
最终,通过HttpClient,将json化的task request发出到worker。