The reflection mechanism
1. RDD[Person] ---(case class + reflection)---> DataFrame[Row] ---> DataSet[Person]

   RDD        DF                           DS
   Person     ["name","age","address"]     {Person: ("name","age","address")}
   Person     ["name","age","address"]     {Person: ("name","age","address")}
   Person     ["name","age","address"]     {Person: ("name","age","address")}
   Person     ["name","age","address"]     {Person: ("name","age","address")}
   Person     ["name","age","address"]     {Person: ("name","age","address")}

   (Each row is one record: the RDD holds typed Person objects, the DataFrame holds untyped Rows described by a schema, and the DataSet holds records that are still typed as Person.)
2. RDD --> DataFrame --> DataSet
a. RDD --> DataFrame: sparkSession.createDataFrame
b. RDD --> DataSet: sparkSession.createDataset
c. DF, DS --> RDD: df.rdd --> RDD[Row]; ds.rdd --> RDD[Person]
d. DataFrame --> DataSet: df.as[Person] (see the sketch after this list)
e. DataSet --> DataFrame: ds.toDF()
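A hedged sketch of conversions c, d, and e, reusing the spark, df, and ds values from the reflection sketch above:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import spark.implicits._                     // supplies the Person encoder for as[Person]

val rowRdd: RDD[Row] = df.rdd                // c. DataFrame --> RDD[Row]
val personRdd: RDD[Person] = ds.rdd          // c. Dataset[Person] --> RDD[Person]
val ds2: Dataset[Person] = df.as[Person]     // d. DataFrame --> Dataset, via an encoder
val df2: DataFrame = ds.toDF()               // e. Dataset --> DataFrame (typed -> Row)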
Creating a DataFrame with a custom schema

It takes three steps in total:
1. Create an RDD of Rows from the original RDD
2. Create a StructType matching the structure of the Rows in the RDD; this StructType is the schema that describes the RDD
3. Create the DataFrame via the createDataFrame method provided by SparkSession, passing it the Row RDD and the schema
Example:
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

def main(args: Array[String]): Unit = {
  val sparkSession = SparkSession.builder().appName("sparkSQL").master("local").getOrCreate()
  val rdd = sparkSession.sparkContext.textFile("file:///d:/测试数据/users.txt")

  // Step 1: create an RDD of Rows from the original RDD
  val rdd_row = rdd.map(_.split(" ")).map(x => Row(x(0), x(1).toInt, x(2)))

  // Step 2: create a StructType matching the structure of the Rows;
  // it serves as the schema describing the RDD.
  // (Alternatively, generate the fields from a schema string such as
  // "name age address", mapping each name to a StructField.)
  val fields = List(
    StructField("name", StringType, nullable = true),
    StructField("age", IntegerType, nullable = true),
    StructField("address", StringType, nullable = true)
  )
  val schema = StructType(fields)

  // Step 3: create the DataFrame via createDataFrame, passing the Row RDD and the schema
  val rdd_df = sparkSession.createDataFrame(rdd_row, schema)
  rdd_df.show()
}
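The programmatic route is the one to reach for when the columns are only known at runtime (for example, when the schema arrives as a string); when a fixed case class is available, the reflection route shown earlier is shorter and additionally yields a typed DataSet.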
The SparkSQL execution flow

1. The SQL execution process
select f1,f2,f3 from table_name where condition
Step 1 - Parse:
First, scan the statement for SQL keywords (select, from, where, group by, and so on) according to the SQL grammar, and mark out the projection, the DataSource, and the filter.
Step 2 - Bind:
Using the output of the parse phase (projection, DataSource, filter), validate that the DataSource and fields are legal; if validation fails, throw an exception.
Step 3 - Optimize:
Based on the statistics the database has gathered about the current DataSource, apply the corresponding optimizations.
Step 4 - Execute:
Start physical execution: translate the logical plan into the corresponding tasks.
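These four phases can be inspected directly in Spark: explain(true) prints the parsed, analyzed (bound), optimized, and physical plans for a query. A minimal sketch (the view name and sample rows are assumptions):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("sparkSQL").master("local").getOrCreate()
import spark.implicits._

Seq(("tom", 20, "beijing"), ("jerry", 17, "shanghai"))
  .toDF("name", "age", "address")
  .createOrReplaceTempView("table_name")

// Prints four sections, one per phase:
// Parsed Logical Plan (Parse), Analyzed Logical Plan (Bind),
// Optimized Logical Plan (Optimize), Physical Plan (Execute)
spark.sql("select name, age from table_name where age > 18").explain(true)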
2. The essence of an execution plan: it can be viewed as a tree; node information is kept on the tree nodes, which are manipulated through Rule objects.
SparkSQL tree nodes fall into several categories:
a. Unary nodes: filter, count, etc.
b. Binary nodes: join, etc.
c. Leaf nodes: loading external data, etc.
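A quick way to see all three node categories in one plan; for the hypothetical query below, the printed physical plan typically contains a table scan (leaf node), a Filter (unary node), and a join (binary node):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("sparkSQL").master("local").getOrCreate()
import spark.implicits._

val users  = Seq(("tom", 20), ("jerry", 17)).toDF("name", "age")
val orders = Seq(("tom", 100)).toDF("name", "amount")

// Leaf: LocalTableScan (loads the data); unary: Filter; binary: the join operator
users.filter($"age" > 18).join(orders, "name").explain()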