zoukankan      html  css  js  c++  java
  • SparkSQL的反射机制和自定义创建DataFrame

    反射机制

    1.RDD[Person]-----(case:反射机制)------>DataFrameF[ROW]---->DataSet[Person]
      RDD DF DS
        Person ["name","age","address"] {Person:("name","age","address")}
        Person ["name","age","address"] {Person:("name","age","address")}
        Person ["name","age","address"] {Person:("name","age","address")}
        Person ["name","age","address"] {Person:("name","age","address")}
        Person ["name","age","address"] {Person:("name","age","address")}
    2.RDD-->DataFrame-->DataSet
      a.RDD-->DataFrame: sparksession.createDataFrame
      b.RDD-->DataSet: sparksession.createDataSet
      c.DF,DS-->RDD: DF.rdd-->RDD[ROW];DS.rdd-->RDD[Person]
      d.DataFrame-->DataSet: sparksession.createDataSet(df.rdd)
      e.DataSet-->Datafrmae: DS.toDF()

    自定义创建DataFrame

      总共分3步:

        1.从原来的RDD创建一个Row格式的RDD

        2.创建与RDD中Rows结构匹配的StructType,通过该StructType创建表示RDD的Schema
        3.通过SparkSession提供的createDataFrame方法创建DataFrame,方法参数为RDD的Schema

      案例:

    def main(args: Array[String]): Unit = {
            val sparksession = SparkSession.builder().appName("sparkSQL").master("local").getOrCreate()
            import sparksession.implicits._
            val rdd = sparksession.sparkContext.textFile("file:///d:/测试数据/users.txt")
            //step1:从原来的RDD创建一个Row格式的RDD
            val rdd_row = rdd.map(x=>x.split(" ")).map(x=>Row(x(0),x(1).toInt,x(2)))
            //step2:创建与RDD中Rows结构匹配的StructType,通过该StructType创建表示RDD的Schema
        //    val schemaString = "name age address"
        //    // Generate the schema based on the string of schema
        //    val fields = schemaString.split(" ")
        //      .map(fieldName => StructField(fieldName, StringType, nullable = true))
            val fields = List(
                    StructField("name", StringType, nullable = true),
                    StructField("age", IntegerType, nullable = true),
                    StructField("address", StringType, nullable = true)
                     )
            val schema = StructType(fields)
            //step3.通过SparkSession提供的createDataFrame方法创建DataFrame,方法参数为RDD的Schema
            val rdd_df = sparksession.createDataFrame(rdd_row,schema)
            rdd_df.show
          }
    SparkSQL的执行流程

      1.SQL执行过程

        select f1,f2,f3 from table_name where condition

        Step1-Parse(解析):
          首先,根据SQL语法搜素关键字(select、from、where、group by等等),标志出projection、DataSource、filter
        Step2-Bind(绑定):
          通过解析阶段的相关内容(projection、DataSource、filter),校验DataSource、filed合法性;如果校验失败,抛异常。
        Step3-optimize(优化):
          通过数据库对当前DataSource进行的统计数据分析,执行相应的优化措施。
        Step3-Execute(执行):
          开启物理执行,将逻辑计划转化为相对应的Task。

      2.执行计划实质:看做成tree(树),树节点上通过Rule对象保存节点信息。

          SparkSQL tree节点分一个几类:

            a.一元节点:filter、count等
            b.二元节点:join等
            c.叶子节点:加载外部数据等;

  • 相关阅读:
    简单的方法爬取b站dnf视频封面步骤解释
    ROS讲座 关于ROS2和Gazebo C++ in Open Source Robotics
    深圳3分钟完成港澳签注 24小时自助办证服务攻略
    如何建立数据平台?看上市公司的选择!
    从开发转型到技术总监的迷茫
    计算机控制技术课程解释与问题答疑
    深度剖析 | 基于大数据架构的BI应用
    Android系统开机启动流程及init进程浅析
    经验分享 | 如何搭建企业管理驾驶舱
    android 修改framework下资源文件后如何编译
  • 原文地址:https://www.cnblogs.com/lyr999736/p/10224676.html
Copyright © 2011-2022 走看看