zoukankan      html  css  js  c++  java
  • spark sql 源码学习Dataset(三)structField、structType、schame

    1、structField

    源码结构:

        case class StructField(
            name: String,
            dataType: DataType,
            nullable: Boolean = true,
            metadata: Metadata = Metadata.empty) {}

    -----A field inside a StructType
    name:The name of this field.
    dataType:The data type of this field.
    nullable:Indicates if values of this field can be null values.  //指示这个字段的指是否可以为空值
    metadata:The metadata of this field. The metadata should be preserved during transformation if the content of the column is not modified, e.g, in selection.
    //此字段的元数据。如果不修改列的内容,则在转换期间应保存元数据,例如。g,在选择。
    一个结构体内部的 一个StructField就像一个SQL中的一个字段一样,它包含了這个字段的具体信息,可以看如下列子:

    def schema_StructField()={
    /**
      * StructField 是 一个 case class ,其中是否可以为空,默认是 true,初始元信息是为空
      * 它是作为描述 StructType中的一个字段
      */
        val sf = new StructField("b",IntegerType)
        println(sf.name)//b
        println(sf.dataType)//IntegerType
        println(sf.nullable)//true
        println(sf.metadata)//{}
    }
    

    2、structType
    A StructType object can be constructed by

    StructType(fields: Seq[StructField])
    一个StructType对象,可以有多个StructField,同时也可以用名字(name)来提取,就想当于Map可以用key来提取value,但是他StructType提取的是整条字段的信息
    在源码中structType是一个case class,如下:
    case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField] {}

    它是继承Seq的,也就是说Seq的操作,它都拥有,但是从形式上来说,每个元素是用  StructField包住的。

    package Dataset
     
    import org.apache.spark.sql.types._
     
     
    /**
      * Created by root on 9/21/16.
      */
    object schemaAnalysis {
      //--------------------------------------------------StructType analysis---------------------------------------
      val struct = StructType(
        StructField("a", IntegerType) ::
          StructField("b", LongType, false) ::
          StructField("c", BooleanType, false) :: Nil)
     
      def schema_StructType()={
        /**
          * 一个scheme是
          */
        import org.apache.spark.sql.types.StructType
        val schemaTyped = new StructType()
          .add("a","int").add("b","string")
        schemaTyped.foreach(println)
        /**
          * StructField(a,IntegerType,true)
          * StructField(b,StringType,true)
          */
      }
      def structType_extracted()={
     
        // Extract a single StructField.
        val singleField_a = struct("a")
        println(singleField_a)
        //省却的清空下表示:可以为空的,
        //StructField(a,IntegerType,true)
        val singleField_b = struct("b")
        println(singleField_b)
        //StructField(b,LongType,false)
     
        //val nonExisting = struct("d")
        //println(nonExisting)
        //java.lang.IllegalArgumentException: Field "d" does not exist.
     
        // Extract multiple StructFields. Field names are provided in a set.
        // A StructType object will be returned.
        val twoFields = struct(Set("b", "c"))
        println(twoFields)
     
     
        //StructType(StructField(b,LongType,false), StructField(c,BooleanType,false))
        // Any names without matching fields will be ignored.
        // For the case shown below, "d" will be ignored and
        // it is treated as struct(Set("b", "c")).
        val ignoreNonExisting = struct(Set("b", "c", "d"))
        println(ignoreNonExisting)
        // ignoreNonExisting: StructType =
        //   StructType(List(StructField(b,LongType,false), StructField(c,BooleanType,false)))
     
        //值得注意的是:当没有存在的字段的时候,官方文档说:单个返回的是null,多个返回的是当没有那个字段
        //但是实验的时候,报错---Field d does not exist
        //源码调用的是apply方法,确实还没有处理好这部分功能
        //我是用的是spark2.0初始版本
     
      }
      def structType_opration()={
     
        /**
          * 源码:case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField] {
          * 它是继承与Seq的,也就是说 Seq的操作,StructType都有
          * 可以查看scala的Seq的操作:http://www.scala-lang.org/api/current/#scala.collection.Seq
          */
        val tmpStruct = StructType(StructField("d", IntegerType)::Nil)
        //集合与集合的操作
        println(struct++tmpStruct)
        // println(struct++:tmpStruct)
        //List(StructField(a,IntegerType,true), StructField(b,LongType,false), StructField(c,BooleanType,false), StructField(d,IntegerType,true))
     
        //集合与元素的操作
        println(struct :+ StructField("d", IntegerType))
     
        //可以用add来进行
     
        println(struct.add("e",IntegerType))
        //StructType(StructField(a,IntegerType,true), StructField(b,LongType,false), StructField(c,BooleanType,false), StructField(e,IntegerType,true))
     
        //head 部分的元素
        println(struct.head)
        //StructField(a,IntegerType,true)
     
     
        //last 部分的元素
        println(struct.last)
        //StructField(c,BooleanType,false)
     
        println(struct.apply("a"))
        //StructField(a,IntegerType,true)
     
        println(struct.treeString)
     
        /**
          * root
           |-- a: integer (nullable = true)
           |-- b: long (nullable = false)
           |-- c: boolean (nullable = false)
          */
     
        println(struct.contains(StructField("f", IntegerType)))
        //false
     
        println(struct.mkString)
        //StructField(a,IntegerType,true)StructField(b,LongType,false)StructField(c,BooleanType,false)
     
        println(struct.prettyJson)
     
        /**
          * {
              "type" : "struct",
              "fields" : [ {
                "name" : "a",
                "type" : "integer",
                "nullable" : true,
                "metadata" : { }
              }, {
                "name" : "b",
                "type" : "long",
                "nullable" : false,
                "metadata" : { }
              }, {
                "name" : "c",
                "type" : "boolean",
                "nullable" : false,
                "metadata" : { }
              } ]
            }
          */
        //更多操作可以查看API:http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.types.StructType
      }
     
     
     
      def main(args: Array[String]) {
        //schema_StructType()
        //structType_extracted()
        structType_opration()
      }
    }

    3、Schema
    ---------Schema就是我们数据的数据结构描述,
    一个Schema是一个数据结构的描述(比如描述一个Json文件),它可以是在运行的时候隐式导入,或者在编译的时候就导入。它是用一个StructField集合对象的StructType描述(用一个三元tuple,内部是:name,type.nullability),本来有四个信息的为什么会说是三元数组?其实metadata,你是可以调出来。

    def schema_op()={
      case class Person(name: String, age: Long)
      val sparkSession = SparkSession.builder().appName("data set example")
        .master("local").getOrCreate()
      import sparkSession.implicits._
      val rdd = sparkSession.sparkContext.textFile("hdfs://master:9000/src/main/resources/people.txt")
      val dataSet = rdd.map(_.split(",")).map(p =>Person(p(0),p(1).trim.toLong)).toDS()
      println(dataSet.schema)
      //StructType(StructField(name,StringType,true), StructField(age,LongType,false))
     
     
      /**
        * def schema: StructType = queryExecution.analyzed.schema
        *
        * def apply(name: String): StructField = {
        * nameToField.getOrElse(name,
        * throw new IllegalArgumentException(s"""Field "$name" does not exist."""))
        * }
        */
      val tmp: StructField = dataSet.schema("name")
      println(tmp)
      //StructField(name,StringType,true)
     
     
      println(tmp.name)//name
      println(tmp.dataType)//StringType
      println(tmp.nullable)//true
      println(tmp.metadata)//{}
    --------------------- 



  • 相关阅读:
    CentOS7中安装Mysql5.7
    CentOS7安装JDK
    设计模式之策略模式
    jmeter:文件下载连接请求保存文件
    pytest框架
    jmeter:设置全局默认请求
    jmeter:全局设置变量参数
    Badboy报错:不支持XXX属性、方法
    jmeter配置元器件:CSV Data Set Config
    jmeter报错:java.lang.IllegalArgumentException: Filename must not be null or empty
  • 原文地址:https://www.cnblogs.com/wcgstudy/p/11052173.html
Copyright © 2011-2022 走看看