zoukankan      html  css  js  c++  java
  • SparkSQL的反射机制和自定义创建DataFrame

    反射机制

    1.RDD[Person]-----(case:反射机制)------>DataFrameF[ROW]---->DataSet[Person]
      RDD DF DS
        Person ["name","age","address"] {Person:("name","age","address")}
        Person ["name","age","address"] {Person:("name","age","address")}
        Person ["name","age","address"] {Person:("name","age","address")}
        Person ["name","age","address"] {Person:("name","age","address")}
        Person ["name","age","address"] {Person:("name","age","address")}
    2.RDD-->DataFrame-->DataSet
      a.RDD-->DataFrame: sparksession.createDataFrame
      b.RDD-->DataSet: sparksession.createDataSet
      c.DF,DS-->RDD: DF.rdd-->RDD[ROW];DS.rdd-->RDD[Person]
      d.DataFrame-->DataSet: sparksession.createDataSet(df.rdd)
      e.DataSet-->Datafrmae: DS.toDF()

    自定义创建DataFrame

      总共分3步:

        1.从原来的RDD创建一个Row格式的RDD

        2.创建与RDD中Rows结构匹配的StructType,通过该StructType创建表示RDD的Schema
        3.通过SparkSession提供的createDataFrame方法创建DataFrame,方法参数为RDD的Schema

      案例:

    def main(args: Array[String]): Unit = {
            val sparksession = SparkSession.builder().appName("sparkSQL").master("local").getOrCreate()
            import sparksession.implicits._
            val rdd = sparksession.sparkContext.textFile("file:///d:/测试数据/users.txt")
            //step1:从原来的RDD创建一个Row格式的RDD
            val rdd_row = rdd.map(x=>x.split(" ")).map(x=>Row(x(0),x(1).toInt,x(2)))
            //step2:创建与RDD中Rows结构匹配的StructType,通过该StructType创建表示RDD的Schema
        //    val schemaString = "name age address"
        //    // Generate the schema based on the string of schema
        //    val fields = schemaString.split(" ")
        //      .map(fieldName => StructField(fieldName, StringType, nullable = true))
            val fields = List(
                    StructField("name", StringType, nullable = true),
                    StructField("age", IntegerType, nullable = true),
                    StructField("address", StringType, nullable = true)
                     )
            val schema = StructType(fields)
            //step3.通过SparkSession提供的createDataFrame方法创建DataFrame,方法参数为RDD的Schema
            val rdd_df = sparksession.createDataFrame(rdd_row,schema)
            rdd_df.show
          }
    SparkSQL的执行流程

      1.SQL执行过程

        select f1,f2,f3 from table_name where condition

        Step1-Parse(解析):
          首先,根据SQL语法搜素关键字(select、from、where、group by等等),标志出projection、DataSource、filter
        Step2-Bind(绑定):
          通过解析阶段的相关内容(projection、DataSource、filter),校验DataSource、filed合法性;如果校验失败,抛异常。
        Step3-optimize(优化):
          通过数据库对当前DataSource进行的统计数据分析,执行相应的优化措施。
        Step3-Execute(执行):
          开启物理执行,将逻辑计划转化为相对应的Task。

      2.执行计划实质:看做成tree(树),树节点上通过Rule对象保存节点信息。

          SparkSQL tree节点分一个几类:

            a.一元节点:filter、count等
            b.二元节点:join等
            c.叶子节点:加载外部数据等;

  • 相关阅读:
    spark 程序 TopN FileSort SecondarySort 的出错解决办法
    预报温度和体感温度不是一回事
    搜索引擎 搜索技巧
    scrapy 爬虫框架
    scala-sbt
    英语削笔机
    php 一句话木马
    [CS充实之路] CS50 WEEK 1
    UBUNTU 16.04 编译 OPENJDK8
    使用logrotate分割Tomcat的catalina日志
  • 原文地址:https://www.cnblogs.com/lyr999736/p/10224676.html
Copyright © 2011-2022 走看看