  • Spark Notes: DataSet and DataFrame

    DataSet: object-oriented and strongly typed; built from JVM objects, or converted from other formats.

    DataFrame: oriented toward SQL-style queries; built from a wide range of data sources, or converted from other formats.
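    In Spark 2.x the two APIs are unified: DataFrame is just a type alias for Dataset[Row], an untyped view over the same engine. A minimal sketch of the typed vs. untyped views (assuming a SparkSession named spark with spark.implicits._ imported, and the Person case class used later in these notes):

    import org.apache.spark.sql.{DataFrame, Dataset, Row}

    case class Person(name: String, age: Long)

    val ds: Dataset[Person] = Seq(Person("Andy", 32)).toDS() // typed: fields checked at compile time
    val df: DataFrame = ds.toDF()                            // untyped Rows plus a schema
    val back: Dataset[Person] = df.as[Person]                // recover the typed view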

    Converting between RDD, DataSet, and DataFrame

    1.RDD -> Dataset 
    val ds = rdd.toDS()
    
    2.RDD -> DataFrame
    val df = rdd.toDF()    // for an RDD of case classes or tuples
    // or, for an RDD[String] of JSON: val df = spark.read.json(rdd)
    
    3.Dataset -> RDD
    val rdd = ds.rdd
    
    4.Dataset -> DataFrame
    val df = ds.toDF()
    
    5.DataFrame -> RDD
    val rdd = df.rdd       // RDD[Row]; df.toJSON.rdd would give an RDD of JSON strings instead
    
    6.DataFrame -> Dataset
    val ds = df.as[Person] // typed Dataset[Person]; df.toJSON gives only a Dataset[String]
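
    A runnable sketch tying the six conversions together (the Person case class and local master are assumptions; note that spark.read.json(rdd: RDD[String]) expects an RDD of JSON strings and has been deprecated since Spark 2.2 in favor of json(Dataset[String])):

    import org.apache.spark.sql.SparkSession

    object Conversions {
      case class Person(name: String, age: Long)

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("conversions").master("local[*]").getOrCreate()
        import spark.implicits._

        val rdd = spark.sparkContext.makeRDD(Seq(Person("Andy", 32)))

        val ds  = rdd.toDS()        // 1. RDD -> Dataset[Person]
        val df  = rdd.toDF()        // 2. RDD -> DataFrame
        val r1  = ds.rdd            // 3. Dataset -> RDD[Person]
        val df2 = ds.toDF()         // 4. Dataset -> DataFrame
        val r2  = df.rdd            // 5. DataFrame -> RDD[Row]
        val ds2 = df.as[Person]     // 6. DataFrame -> Dataset[Person]

        spark.stop()
      }
    }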

    DataFrameTest1.scala

    package com.spark.dataframe
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    class DataFrameTest1 {
    }
    
    object DataFrameTest1{
    
      def main(args : Array[String]): Unit ={
        System.setProperty("hadoop.home.dir", "E:\\spark\\hadoophome\\hadoop-common-2.2.0-bin")
        val logFile = "e:/temp.txt"
        val conf = new SparkConf().setAppName("test").setMaster("local[4]")
        val sc = new SparkContext(conf)
        val logData = sc.textFile(logFile,2).cache()
    
        val numAs = logData.filter(line => line.contains("a")).count()
        val numBs = logData.filter(line => line.contains("b")).count()
    
        println(s"Lines with a: $numAs , Line with b : $numBs")
    
        sc.stop()
      }
    }

    DataFrameTest2.scala

    package com.spark.dataframe
    
    import org.apache.spark.sql.SparkSession
    
    class DataFrameTest2 {
    }
    
    object DataFrameTest2{
    
      def main(args : Array[String]): Unit ={
        System.setProperty("hadoop.home.dir", "E:\\spark\\hadoophome\\hadoop-common-2.2.0-bin")
        val spark = SparkSession
          .builder()
          .appName("Spark SQL basic example")
          .master("local[4]")
          .getOrCreate()
    
        val df = spark.read.json("E:\\spark\\datatemp\\people.json")
        df.show()
    
        // This import is needed to use the $-notation
        import spark.implicits._
        df.printSchema()
        df.select("name").show()
        df.filter("age>21").show()
        df.select($"name",$"age"+1).show()
    
        df.groupBy("age").count().show()
    
      }
    }
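
    The people.json used throughout is not reproduced in the original notes; the queries are consistent with the sample file shipped with Spark under examples/src/main/resources (assumed here), one JSON object per line:

    {"name":"Michael"}
    {"name":"Andy", "age":30}
    {"name":"Justin", "age":19}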

    DataFrameTest3.scala

    package com.spark.dataframe
    
    import org.apache.spark.sql.SparkSession
    
    class DataFrameTest3 {
    }
    
    object DataFrameTest3{
    
      def main(args : Array[String]): Unit ={
          System.setProperty("hadoop.home.dir", "E:\\spark\\hadoophome\\hadoop-common-2.2.0-bin")
          val spark = SparkSession
            .builder()
            .appName("Spark SQL basic example")
            .master("local[4]")
            .getOrCreate()
    
          val df = spark.read.json("E:\\spark\\datatemp\\people.json")
          // Register the DataFrame as a SQL temporary view
          df.createOrReplaceTempView("people")
    
          val sqlDF = spark.sql("select * from people")
          sqlDF.show()
          //spark.sql("select * from global_temp.people").show()
    
        }
    }
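
    The commented-out query above refers to global_temp.people, but the code only creates a session-scoped temporary view. A global temporary view is registered differently and lives in the system database global_temp; a minimal sketch (the view name follows the example above):

    df.createGlobalTempView("people")
    spark.sql("select * from global_temp.people").show()
    // Global temp views survive across sessions until the application ends:
    spark.newSession().sql("select * from global_temp.people").show()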

    DataSetTest1.scala

    package com.spark.dataframe
    
    import org.apache.spark.sql.SparkSession
    
    class DataSetTest1 {
    }
    
    case class Person(name: String, age: Long)
    
    object DataSetTest1 {
      def main(args : Array[String]): Unit ={
    
        System.setProperty("hadoop.home.dir", "E:\\spark\\hadoophome\\hadoop-common-2.2.0-bin")
        val spark = SparkSession
          .builder()
          .appName("Spark SQL basic example")
          .master("local[4]")
          .getOrCreate()
    
        // This import is needed to use the $-notation
        import spark.implicits._
    
        val caseClassDS = Seq(Person("Andy", 32)).toDS()
        caseClassDS.show()
    
        val ds = spark.read.json("E:\\spark\\datatemp\\people.json").as[Person]
        ds.show()
    
      }
    }

     

    RDDToDataFrame.scala

    package com.spark.dataframe
    
    import org.apache.spark.sql.{Row, SparkSession}
    
    class RDDToDataFrame {
    }
    
    // Demonstrates two ways to convert an RDD into a DataFrame
    object RDDToDataFrame{
      def main(args : Array[String]): Unit ={
        System.setProperty("hadoop.home.dir", "E:\\spark\\hadoophome\\hadoop-common-2.2.0-bin")
        val spark = SparkSession
          .builder()
          .appName("Rdd to DataFrame")
          .master("local[4]")
          .getOrCreate()
    
        // This import is needed to use the $-notation
        import spark.implicits._
    
        // Approach 1: the record class (Person, defined earlier) is known ahead of time
        val peopleDF =spark.sparkContext
          .textFile("E:\\spark\\datatemp\\people.txt")
          .map(_.split(","))
          .map(attribute => Person(attribute(0), attribute(1).trim.toLong))
          .toDF()
    
        peopleDF.createOrReplaceTempView("people")
    
        val teenagerDF = spark.sql("select name, age from people where age between 13 and 19")
        teenagerDF.map(teenager=> "name:"+teenager(0)).show()
        teenagerDF.map(teenager => "Name: "+teenager.getAs[String]("name")).show()
    
        // No pre-defined encoders for Dataset[Map[K,V]], define explicitly
    
        // Implicit value: picked up automatically wherever a parameter of type Encoder is required later
        implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String,Any]]
        // Primitive types and case classes can be also defined as
        // implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()
    
        // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
        teenagerDF.map(teenager => teenager.getValuesMap[Any](List("name","age"))).collect().foreach(println(_))
        // Array(Map("name" -> "Justin", "age" -> 19))
    
    
        //////////////////////////////////////////
        // Approach 2: when case classes cannot be defined ahead of time
        /*
        * When case classes cannot be defined ahead of time
        * (for example, the structure of records is encoded in a string,
        * or a text dataset will be parsed and fields will be projected differently for different users),
        * a DataFrame can be created programmatically with three steps.
        * 1. Create an RDD of Rows from the original RDD;
        * 2. Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.
        * 3. Apply the schema to the RDD of Rows via createDataFrame method provided by SparkSession.
        * */
        import org.apache.spark.sql.types._
    
        // 1. Create an RDD from the text file
        val peopleRDD = spark.sparkContext.textFile("e:\\spark\\datatemp\\people.txt")
        // 2.1 Build a schema matching the structure of the RDD
        val schemaString = "name age"
        val fields = schemaString.split(" ")
          .map(fieldName => StructField(fieldName, StringType, nullable = true))
        val schema = StructType(fields)
    
        // 2.2 Shape the RDD into an RDD[Row] matching the schema
        val rowRDD = peopleRDD
          .map(_.split(","))
          .map(attributes => Row(attributes(0),attributes(1).trim))
    
        // 3. Apply the schema to the RDD of Rows to get a DataFrame
        val peopleDF2 = spark.createDataFrame(rowRDD, schema)
        peopleDF2.createOrReplaceTempView("people")
        val results = spark.sql("select name from people")
    
        results.show()
    
      }
    }
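
    Likewise, the people.txt parsed above matches the sample file from the Spark distribution (assumed here; the teenager query's Justin/19 result is consistent with it), plain comma-separated lines:

    Michael, 29
    Andy, 30
    Justin, 19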

     GenericLoadAndSave.scala

    package com.spark.dataframe
    import org.apache.spark.sql.{SaveMode, SparkSession}
    class GenericLoadAndSave {
    }

    object GenericLoadAndSave{

      def main(args: Array[String]): Unit ={
        System.setProperty("hadoop.home.dir", "E:\\spark\\hadoophome\\hadoop-common-2.2.0-bin")
        val spark = SparkSession
          .builder()
          .appName("Generic load and save")
          .master("local[4]")
          .getOrCreate()

        // This import is needed to use the $-notation
        import spark.implicits._

        // Save data in parquet format (the default)
        val userDF = spark.read.json("e:\\spark\\datatemp\\people.json")
        //userDF.select("name","age").write.save("e:\\spark\\datasave\\nameAndAge.parquet")
        // Save mode set to overwrite
        userDF.select("name","age").write.mode(SaveMode.Overwrite).save("e:\\spark\\datasave\\nameAndAge.parquet")

        // The source format can be specified explicitly: json, parquet, jdbc, orc, libsvm, csv, text
        val peopleDF = spark.read.format("json").load("e:\\spark\\datatemp\\people.json")
        //peopleDF.select("name","age").write.format("json").save("e:\\spark\\datasave\\peopleNameAndAge.json")
        // Save mode set to overwrite
        peopleDF.select("name","age").write.mode(SaveMode.Overwrite).format("json").save("e:\\spark\\datasave\\peopleNameAndAge.json")

        // Read parquet data back into a DataFrame
        val peopleDF2 = spark.read.format("parquet").load("E:\\spark\\datasave\\nameAndAge.parquet")
        //+"part-00000-*.snappy.parquet") // appending this pins down a single file; in fact parquet can discover and infer partition info from the directory path on its own
        System.out.println("------------------")
        peopleDF2.select("name","age").show()

        //userDF.select("name","age").write.saveAsTable("e:\\spark\\datasave\\peopleSaveAsTable") // fails: saveAsTable expects a table name, not a path
        //val sqlDF = spark.sql("SELECT * FROM parquet.'E:\\spark\\datasave\\nameAndAge.parquet\\part-00000-c8740fc5-cba8-4ebe-a7a8-9cec3da7dfa2.snappy.parquet'") // fails: the path must be quoted with backticks, not single quotes
        //sqlDF.show()
      }
    }
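
    The two commented-out lines at the end fail for identifiable reasons: saveAsTable expects a metastore table name rather than a file path (the data lands under spark.sql.warehouse.dir), and SQL-on-files requires backticks, not single quotes, around the path. A sketch of the corrected calls (the table name is illustrative):

    // Register as a managed table instead of writing to an explicit path
    userDF.select("name", "age").write.mode(SaveMode.Overwrite).saveAsTable("people_name_age")

    // Query a parquet file directly: format keyword, then the path in backticks
    spark.sql("SELECT * FROM parquet.`E:/spark/datasave/nameAndAge.parquet`").show()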

    ReadFromParquet.scala

    package com.spark.dataframe
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    class ReadFromParquet {
    }
    
    object  ReadFromParquet{
      def main(args: Array[String]): Unit ={
        System.setProperty("hadoop.home.dir", "E:\\spark\\hadoophome\\hadoop-common-2.2.0-bin")
        val spark = SparkSession
          .builder()
          .appName("Rdd to DataFrame")
          .master("local[4]")
          .getOrCreate()
    
        // This import is needed to use the $-notation
        import spark.implicits._
        // Read data from a partitioned parquet source into a DataFrame
        val peopleDF2 = spark.read.format("parquet").load("E:\\spark\\datasave\\people")
        /*
         * Directory layout:
         * people
         *  |- country=china
         *  |    |- data.parquet
         *  |- country=us
         *  |    |- data.parquet
         *
         * data.parquet holds each person's name and age. Combined with the
         * country value encoded in the file path, the resulting table is:
         * +-------+----+-------+
         * |   name| age|country|
         * +-------+----+-------+
         */
        peopleDF2.show()
      }
    }
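
    The partitioned layout described in the comments is what partitionBy produces on write; a minimal sketch of how such a directory could have been generated (the sample rows are illustrative, and spark.implicits._ is assumed in scope as above):

    val peopleWithCountry = Seq(
      ("Michael", 29L, "us"),
      ("Andy", 30L, "china")
    ).toDF("name", "age", "country")

    peopleWithCountry.write
      .mode(SaveMode.Overwrite)
      .partitionBy("country")   // creates country=us/ and country=china/ subdirectories
      .parquet("E:\\spark\\datasave\\people")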

     SchemaMerge.scala

    package com.spark.dataframe
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    class SchemaMerge {
    }
    
    object SchemaMerge{
      def main(args: Array[String]) {
        System.setProperty("hadoop.home.dir", "E:\\spark\\hadoophome\\hadoop-common-2.2.0-bin")
        val spark = SparkSession
          .builder()
          .appName("Rdd to DataFrame")
          .master("local[4]")
          .getOrCreate()
    
        // This import is needed to use the $-notation
        import spark.implicits._
    
        val squaresDF = spark.sparkContext.makeRDD(1 to 5)
          .map(i=>(i,i*i))
          .toDF("value","square")
    
        squaresDF.write.mode(SaveMode.Overwrite).parquet("E:\\spark\\datasave\\schemamerge\\test_table\\key=1")
    
        val cubesDF = spark.sparkContext.makeRDD(1 to 5)
          .map(i => (i,i*i*i))
          .toDF("value","cube")
        cubesDF.write.mode(SaveMode.Overwrite).parquet("E:\\spark\\datasave\\schemamerge\\test_table\\key=2")
    
        val mergedDF = spark.read.option("mergeSchema","true")
          .parquet("E:\\spark\\datasave\\schemamerge\\test_table")
    
        mergedDF.printSchema()
        mergedDF.show()
      }
    }

    Result: the output screenshot from the original post did not survive. With mergeSchema enabled the two parquet schemas are unioned and the partition column key is appended, so printSchema() should report roughly the following (a reconstruction, not captured output):
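
    root
     |-- value: integer (nullable = true)
     |-- square: integer (nullable = true)
     |-- cube: integer (nullable = true)
     |-- key: integer (nullable = true)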

  • Original post: https://www.cnblogs.com/gnivor/p/6644790.html