zoukankan      html  css  js  c++  java
  • 一个Spark SQL和DataFrames的故事

    package com.lin.spark
    
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    
    /**
      * Walkthrough of the Spark SQL APIs: untyped DataFrames, typed Datasets,
      * reflection-based schema inference and programmatically built schemas.
      *
      * `main` creates one local SparkSession, runs every example against it in
      * sequence, and always stops the session afterwards.
      *
      * Input files are read from `src/main/resources` relative to the working
      * directory (`person.json`, `people.txt`).
      *
      * Created by Yaooo on 2019/6/8.
      */
    object SparkSQLExample {

      /**
        * Record type for the typed examples.
        *
        * `age` is a `Long` so that `.as[Person]` lines up with the integral column
        * Spark infers from JSON. NOTE(review): assumes `person.json` stores `age`
        * as an integer — confirm against the resource file.
        */
      case class Person(name: String, age: Long)

      /**
        * Entry point: builds a local SparkSession and runs all examples.
        *
        * @param args unused
        */
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("Spark SQL")
          .config("spark.some.config.option", "some-value")
          .master("local[2]")
          .getOrCreate()

        // Release the session (local executors, UI port) even if an example fails.
        try {
          runBasicDataFrameExample(spark)
          runDatasetCreationExample(spark)
          runInferSchemaExample(spark)
          runProgrammaticSchemaExample(spark)
        } finally {
          spark.stop()
        }
      }

      /**
        * Builds a DataFrame from an RDD[Row] plus a StructType assembled at runtime
        * from a space-separated list of column names (all columns are nullable strings).
        *
        * @param spark active session
        */
      private def runProgrammaticSchemaExample(spark: SparkSession): Unit = {
        // Needed for the implicit Encoder[String] used by `results.map` below.
        import spark.implicits._
        val personRDD = spark.sparkContext.textFile("src/main/resources/people.txt")

        val schemaString = "name age"

        val fields = schemaString.split(" ")
          .map(fieldName => StructField(fieldName, StringType, nullable = true))

        val schema = StructType(fields)

        // Each line is expected to look like "<name>,<age>"; age is kept as a string
        // because the schema above declares every column as StringType.
        val rowRDD = personRDD
          .map(_.split(","))
          .map(att => Row(att(0), att(1).trim))

        val peopleDF = spark.createDataFrame(rowRDD, schema)

        peopleDF.createOrReplaceTempView("people")

        val results = spark.sql("select * from people")

        results.map(att => "Name : " + att(0)).show()
      }

      /**
        * Infers a DataFrame schema by reflection from the `Person` case class,
        * then queries it with SQL and shows several ways of reading row fields.
        *
        * @param spark active session
        */
      private def runInferSchemaExample(spark: SparkSession): Unit = {
        import spark.implicits._
        val personDF = spark.sparkContext
          .textFile("src/main/resources/people.txt")
          .map(_.split(","))
          // Parse straight to Long to match Person.age (no Int round-trip / overflow).
          .map(attributes => Person(attributes(0), attributes(1).trim.toLong))
          .toDF()

        personDF.createOrReplaceTempView("people")

        val teenagersDF = spark.sql("select * from people where age between 13 and 19")
        teenagersDF.show()
        // Field access by ordinal...
        teenagersDF.map(teenager => "name: " + teenager(0)).show()
        // ...and by column name.
        teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()

        // No built-in encoder exists for Map[String, Any], so fall back to Kryo.
        implicit val mapEncoder: org.apache.spark.sql.Encoder[Map[String, Any]] =
          org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
        teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
          .foreach(println)
      }

      /**
        * Creates typed Datasets from a local case-class sequence, from primitives,
        * and from a JSON file viewed as `Person`.
        *
        * @param spark active session
        */
      private def runDatasetCreationExample(spark: SparkSession): Unit = {
        import spark.implicits._
        // toDS (not toDF) so this really is a Dataset[Person], as the name says.
        val caseClassDS = Seq(Person("Andy", 18)).toDS()
        caseClassDS.show()

        val primitiveDS = Seq(1, 2, 3).toDS()
        primitiveDS.map(_ + 1).collect().foreach(println)

        val path = "src/main/resources/person.json"
        val personDS = spark.read.json(path).as[Person]
        personDS.show()
      }

      /**
        * Basic untyped DataFrame operations (select, filter, groupBy) on a JSON
        * source, followed by a query through a global temporary view.
        *
        * @param spark active session
        */
      private def runBasicDataFrameExample(spark: SparkSession): Unit = {
        // Needed for the `$"col"` column syntax.
        import spark.implicits._
        val df = spark.read.json("src/main/resources/person.json")
        df.show()
        df.printSchema()
        df.select("name").show()
        df.select($"name", $"age" + 1).show()
        df.filter($"age" > 21).show()
        df.groupBy($"age").count().show()

        // Session-scoped alternative, kept for reference:
        /*df.createOrReplaceTempView("people")
        val sqlDF = spark.sql("select * from people")
        sqlDF.show()*/

        // Global temp views live in the `global_temp` database and must be qualified.
        df.createOrReplaceGlobalTempView("people")
        spark.sql("select * from global_temp.people").show()
      }
    }
  • 相关阅读:
    lrzsz踩坑记
    《西安游记》
    《这世界那么多人》
    Go 日常开发常备第三方库和工具
    Go 里的超时控制
    菜鸟轻松拿offer: 软件测试工程师面试秘笈
    Django 练习教程
    JasperReports入门教程(五):分组打印
    并发的特性和锁的原理,分类
    面试高频算法
  • 原文地址:https://www.cnblogs.com/linkmust/p/10992643.html
Copyright © 2011-2022 走看看