  • Spark Notes: DataSet and DataFrame

    DataSet: strongly typed and object-oriented; built from JVM objects, or converted from other formats.

    DataFrame: geared toward SQL-style queries; built from a wide range of data sources, or converted from other formats.

    Converting between RDD, Dataset, and DataFrame (a runnable sketch follows the list):

    1.RDD -> Dataset 
    val ds = rdd.toDS()
    
    2.RDD -> DataFrame 
    val df = spark.read.json(rdd) // rdd must be an RDD[String] of JSON lines; for an RDD of case classes use rdd.toDF()
    
    3.Dataset -> RDD
    val rdd = ds.rdd
    
    4.Dataset -> DataFrame
    val df = ds.toDF()
    
    5.DataFrame -> RDD
    val rdd = df.rdd // RDD[Row]; df.toJSON.rdd would instead give an RDD of JSON strings
    
    6.DataFrame -> Dataset
    val ds = df.as[Person] // typed conversion via a case class; df.toJSON only yields a Dataset[String]

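    A minimal end-to-end sketch of these conversions, assuming a local SparkSession and reusing the Person case class defined later in DataSetTest1.scala (the sample rows are placeholders):

    package com.spark.dataframe

    import org.apache.spark.sql.SparkSession

    object ConversionSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("conversions").master("local[4]").getOrCreate()
        // Brings in the encoders behind toDS()/toDF()/as[T]
        import spark.implicits._

        val rdd = spark.sparkContext.parallelize(Seq(Person("Andy", 32), Person("Justin", 19)))

        val ds   = rdd.toDS()       // 1. RDD -> Dataset[Person]
        val df   = rdd.toDF()       // 2. RDD -> DataFrame
        val rdd1 = ds.rdd           // 3. Dataset -> RDD[Person]
        val df2  = ds.toDF()        // 4. Dataset -> DataFrame
        val rdd2 = df.rdd           // 5. DataFrame -> RDD[Row]
        val ds2  = df.as[Person]    // 6. DataFrame -> Dataset[Person]

        ds2.show()
        spark.stop()
      }
    }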
    DataFrameTest1.scala

    package com.spark.dataframe
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    class DataFrameTest1 {
    }
    
    object DataFrameTest1{
    
      def main(args : Array[String]): Unit ={
        System.setProperty("hadoop.home.dir", "E:\\spark\\hadoophome\\hadoop-common-2.2.0-bin")
        val logFile = "e://temp.txt"
        val conf = new SparkConf().setAppName("test").setMaster("local[4]")
        val sc = new SparkContext(conf)
        val logData = sc.textFile(logFile,2).cache()
    
        val numAs = logData.filter(line => line.contains("a")).count()
        val numBs = logData.filter(line => line.contains("b")).count()
    
        println(s"Lines with a: $numAs , Line with b : $numBs")
    
        sc.stop()
      }
    }

    DataFrameTest2.scala

    package com.spark.dataframe
    
    import org.apache.spark.sql.SparkSession
    
    class DataFrameTest2 {
    }
    
    object DataFrameTest2{
    
      def main(args : Array[String]): Unit ={
        System.setProperty("hadoop.home.dir", "E:\\spark\\hadoophome\\hadoop-common-2.2.0-bin")
        val spark = SparkSession
          .builder()
          .appName("Spark SQL basic example")
          .master("local[4]")
          .getOrCreate()
    
        val df = spark.read.json("E:\\spark\\datatemp\\people.json")
        df.show()
    
        // This import is needed to use the $-notation
        import spark.implicits._
        df.printSchema()
        df.select("name").show()
        df.filter("age>21").show()
        df.select($"name",$"age"+1).show()
    
        df.groupBy("age").count().show()
    
      }
    }
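    For reference, inside the same main method the string-based filter above and the $-notation Column expressions are interchangeable:

        // Equivalent ways to express the same untyped query
        df.filter("age>21").show()          // SQL-expression string
        df.filter($"age" > 21).show()       // Column expression via $-notation
        df.filter(df("age") > 21).show()    // Column expression via apply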

    DataFrameTest3.scala

    package com.spark.dataframe
    
    import org.apache.spark.sql.SparkSession
    
    class DataFrameTest3 {
    }
    
    object DataFrameTest3{
    
      def main(args : Array[String]): Unit ={
          System.setProperty("hadoop.home.dir", "E:\\spark\\hadoophome\\hadoop-common-2.2.0-bin")
          val spark = SparkSession
            .builder()
            .appName("Spark SQL basic example")
            .master("local[4]")
            .getOrCreate()
    
          val df = spark.read.json("E:\\spark\\datatemp\\people.json")
          // Register the DataFrame as a SQL temporary view
          df.createOrReplaceTempView("people")
    
          val sqlDF = spark.sql("select * from people")
          sqlDF.show()
          //spark.sql("select * from global_temp.people").show()
    
        }
    }
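    The commented-out global_temp query only applies to global temporary views, which live in the system-preserved global_temp database and are shared across sessions of the same application. A minimal sketch of that variant, assuming the same spark and df as in the listing above:

        // Register a global temporary view instead of a session-scoped one
        df.createGlobalTempView("people")

        // Global temporary views must be qualified with the global_temp database
        spark.sql("select * from global_temp.people").show()

        // They remain visible from other sessions of the same application
        spark.newSession().sql("select * from global_temp.people").show()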

    DataSetTest1.scala

    package com.spark.dataframe
    
    import org.apache.spark.sql.SparkSession
    
    class DataSetTest1 {
    }
    
    case class Person(name: String, age: Long)
    
    object DataSetTest1 {
      def main(args : Array[String]): Unit ={
    
        System.setProperty("hadoop.home.dir", "E:\\spark\\hadoophome\\hadoop-common-2.2.0-bin")
        val spark = SparkSession
          .builder()
          .appName("Spark SQL basic example")
          .master("local[4]")
          .getOrCreate()
    
        // This import is needed to use the $-notation
        import spark.implicits._
    
        val caseClassDS = Seq(Person("Andy", 32)).toDS()
        caseClassDS.show()
    
        val ds = spark.read.json("E:\\spark\\datatemp\\people.json").as[Person]
        ds.show()
    
      }
    }

     

    RDDToDataFrame.scala

    package com.spark.dataframe
    
    import org.apache.spark.sql.{Row, SparkSession}
    
    class RDDToDataFrame {
    }
    
    // Two ways of converting an RDD to a DataFrame
    object RDDToDataFrame{
      def main(args : Array[String]): Unit ={
        System.setProperty("hadoop.home.dir", "E:\\spark\\hadoophome\\hadoop-common-2.2.0-bin")
        val spark = SparkSession
          .builder()
          .appName("Rdd to DataFrame")
          .master("local[4]")
          .getOrCreate()
    
        // This import is needed to use the $-notation
        import spark.implicits._
    
        // The case class used to hold the records (Person) can be defined ahead of time
        val peopleDF = spark.sparkContext
          .textFile("E:\\spark\\datatemp\\people.txt")
          .map(_.split(","))
          .map(attribute => Person(attribute(0),attribute(1).trim.toInt))
          .toDF()
    
        peopleDF.createOrReplaceTempView("people")
    
        val teenagerDF = spark.sql("select name, age from people where age between 13 and 19")
        teenagerDF.map(teenager=> "name:"+teenager(0)).show()
        teenagerDF.map(teenager => "Name: "+teenager.getAs[String]("name")).show()
    
        // No pre-defined encoders for Dataset[Map[K,V]], define explicitly
    
        // Implicit value: picked up automatically wherever an Encoder parameter is needed below
        implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String,Any]]
        // Primitive types and case classes can be also defined as
        // implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()
    
        // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
        teenagerDF.map(teenager => teenager.getValuesMap[Any](List("name","age"))).collect().foreach(println(_))
        // Array(Map("name" -> "Justin", "age" -> 19))
    
    
        //////////////////////////////////////////
        // When case classes cannot be defined ahead of time
        /*
        * When case classes cannot be defined ahead of time
        * (for example, the structure of records is encoded in a string,
        * or a text dataset will be parsed and fields will be projected differently for different users),
        * a DataFrame can be created programmatically with three steps.
        * 1. Create an RDD of Rows from the original RDD;
        * 2. Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.
        * 3. Apply the schema to the RDD of Rows via createDataFrame method provided by SparkSession.
        * */
        import org.apache.spark.sql.types._
    
        // 1. Create an RDD
        val peopleRDD = spark.sparkContext.textFile("e:\\spark\\datatemp\\people.txt")
        // 2.1 Create a schema that matches the RDD
        val schemaString = "name age"
        val fields = schemaString.split(" ")
          .map(fieldName => StructField(fieldName, StringType, nullable = true))
        val schema = StructType(fields)
    
        // 2.2 Convert the RDD into an RDD of Rows
        val rowRDD = peopleRDD
          .map(_.split(","))
          .map(attributes => Row(attributes(0),attributes(1).trim))
    
        // 3. Apply the schema to the RDD of Rows to get a DataFrame
        val peopleDF2 = spark.createDataFrame(rowRDD, schema)
        peopleDF2.createOrReplaceTempView("people")
        val results = spark.sql("select name from people")
    
        results.show()
    
      }
    }

     GenericLoadAndSave.scala

    package com.spark.dataframe
    import org.apache.spark.sql.{SaveMode, SparkSession}
    class GenericLoadAndSave {
    }

    object GenericLoadAndSave{
      def main(args: Array[String]): Unit ={
        System.setProperty("hadoop.home.dir", "E:\\spark\\hadoophome\\hadoop-common-2.2.0-bin")
        val spark = SparkSession
          .builder()
          .appName("Rdd to DataFrame")
          .master("local[4]")
          .getOrCreate()

        // This import is needed to use the $-notation
        import spark.implicits._

        // Save data in parquet format
        val userDF = spark.read.json("e:\\spark\\datatemp\\people.json")
        //userDF.select("name","age").write.save("e:\\spark\\datasave\\nameAndAge.parquet")
        // Save mode set to overwrite
        userDF.select("name","age").write.mode(SaveMode.Overwrite).save("e:\\spark\\datasave\\nameAndAge.parquet")

        // The data source format can be given explicitly (json, parquet, jdbc, orc, libsvm, csv, text)
        val peopleDF = spark.read.format("json").load("e:\\spark\\datatemp\\people.json")
        //peopleDF.select("name","age").write.format("json").save("e:\\spark\\datasave\\peopleNameAndAge.json")
        // Save mode set to overwrite
        peopleDF.select("name","age").write.mode(SaveMode.Overwrite).format("json").save("e:\\spark\\datasave\\peopleNameAndAge.json")

        // Read the parquet data back into a DataFrame
        val peopleDF2 = spark.read.format("parquet").load("E:\\spark\\datasave\\nameAndAge.parquet\\")
        //+"part-00000-*.snappy.parquet") // appending this would pin down a single file; in fact parquet can discover and infer partitions from the directory path itself
        System.out.println("------------------")
        peopleDF2.select("name","age").show()

        //userDF.select("name","age").write.saveAsTable("e:\\spark\\datasave\\peopleSaveAsTable") // this line errors out; cause unknown for now
        //val sqlDF = spark.sql("SELECT * FROM parquet.'E:\\spark\\datasave\\nameAndAge.parquet\\part-00000-c8740fc5-cba8-4ebe-a7a8-9cec3da7dfa2.snappy.parquet'")
        //sqlDF.show()
      }
    }
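    Two plausible fixes for the commented-out failures at the end, sketched as an assumption rather than a confirmed cause (the table name people_saved is a placeholder): saveAsTable expects a table name rather than a filesystem path (a storage location can be supplied through the "path" option), and SQL run directly on files wants backticks around parquet.<path>, not single quotes; pointing at the directory also avoids hard-coding the generated part-file name. Reusing userDF, spark, and the imports from the listing above:

        // saveAsTable takes a table name; an explicit storage location goes in the "path" option
        userDF.select("name", "age")
          .write
          .mode(SaveMode.Overwrite)
          .option("path", "e:\\spark\\datasave\\peopleSaveAsTable")
          .saveAsTable("people_saved")

        // SQL directly on a file or directory uses backticks: format.`path`
        val sqlDF = spark.sql("SELECT * FROM parquet.`E:\\spark\\datasave\\nameAndAge.parquet`")
        sqlDF.show()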

    ReadFromParquet.scala

    package com.spark.dataframe
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    class ReadFromParquet {
    }
    
    object  ReadFromParquet{
      def main(args: Array[String]): Unit ={
        System.setProperty("hadoop.home.dir", "E:\\spark\\hadoophome\\hadoop-common-2.2.0-bin")
        val spark = SparkSession
          .builder()
          .appName("Rdd to DataFrame")
          .master("local[4]")
          .getOrCreate()
    
        // This import is needed to use the $-notation
        import spark.implicits._
        // Read the parquet data and build a DataFrame
        val peopleDF2 = spark.read.format("parquet").load("E:\\spark\\datasave\\people")
        /*
         * Directory layout:
         * people
         *   |- country=china
         *   |    |- data.parquet
         *   |- country=us
         *   |    |- data.parquet
         *
         * data.parquet holds each person's name and age. Together with the country
         * value encoded in the file path, the resulting table schema is:
         * +-------+----+-------+
         * |   name| age|country|
         * +-------+----+-------+
         */
        peopleDF2.show()
      }
    }
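    The country=... directory layout above is exactly what Spark's partition discovery expects, and it can be produced by writing with partitionBy. A small sketch under that assumption, reusing the SparkSession and import spark.implicits._ from the listing above (the sample rows are placeholders):

        // Writing partitioned by "country" creates people/country=<value>/ subdirectories,
        // which partition discovery later turns into the extra country column
        val samplePeople = Seq(
          ("Michael", 29L, "us"),
          ("Andy", 30L, "china")
        ).toDF("name", "age", "country")

        samplePeople.write
          .mode(SaveMode.Overwrite)
          .partitionBy("country")
          .parquet("E:\\spark\\datasave\\people")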

     SchemaMerge.scala

    package com.spark.dataframe
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    class SchemaMerge {
    }
    
    object SchemaMerge{
      def main(args: Array[String]) {
        System.setProperty("hadoop.home.dir", "E:\\spark\\hadoophome\\hadoop-common-2.2.0-bin")
        val spark = SparkSession
          .builder()
          .appName("Rdd to DataFrame")
          .master("local[4]")
          .getOrCreate()
    
        // This import is needed to use the $-notation
        import spark.implicits._
    
        val squaresDF = spark.sparkContext.makeRDD(1 to 5)
          .map(i=>(i,i*i))
          .toDF("value","square")
    
        squaresDF.write.mode(SaveMode.Overwrite).parquet("E:\\spark\\datasave\\schemamerge\\test_table\\key=1")
    
        val cubesDF = spark.sparkContext.makeRDD(1 to 5)
          .map(i => (i,i*i*i))
          .toDF("value","cube")
        cubesDF.write.mode(SaveMode.Overwrite).parquet("E:\\spark\\datasave\\schemamerge\\test_table\\key=2")
    
        val mergedDF = spark.read.option("mergeSchema","true")
          .parquet("E:\spark\datasave\schemamerge\test_table\")
    
        mergedDF.printSchema()
        mergedDF.show()
      }
    }

    Output:
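    With both partitions written, the merged schema reported by mergedDF.printSchema() should look roughly like this (square and cube are null for rows coming from the other partition, and key is inferred from the directory names):

        root
         |-- value: integer (nullable = true)
         |-- square: integer (nullable = true)
         |-- cube: integer (nullable = true)
         |-- key: integer (nullable = true)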

  • Original post: https://www.cnblogs.com/gnivor/p/6644790.html