zoukankan      html  css  js  c++  java
  • Spark-Sql之DataFrame实战详解

    1、DataFrame简介:

    在Spark中,DataFrame是一种以RDD为基础的分布式数据据集,类似于传统数据库听二维表格,DataFrame带有Schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。

    类似这样的

    root  
     |-- age: long (nullable = true)  
     |-- id: long (nullable = true)  
     |-- name: string (nullable = true)  

     

    2、准备测试结构化数据集

    people.json

     
    {"id":1, "name":"Ganymede", "age":32}  
    {"id":2, "name":"Lilei", "age":19}  
    {"id":3, "name":"Lily", "age":25}  
    {"id":4, "name":"Hanmeimei", "age":25}  
    {"id":5, "name":"Lucy", "age":37}  
    {"id":6, "name":"Tom", "age":27}  

    3、通过编程方式理解DataFrame

    1)  通过DataFrame的API来操作数据

    import org.apache.spark.sql.SQLContext  
    import org.apache.spark.SparkConf  
    import org.apache.spark.SparkContext  
    import org.apache.log4j.Level  
    import org.apache.log4j.Logger  
      
    object DataFrameTest {  
      def main(args: Array[String]): Unit = {  
        //日志显示级别  
        Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)  
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR)  
      
        //初始化  
        val conf = new SparkConf().setAppName("DataFrameTest")  
        val sc = new SparkContext(conf)  
        val sqlContext = new SQLContext(sc)  
        val df = sqlContext.read.json("people.json")  
      
        //查看df中的数据  
        df.show()  
        //查看Schema  
        df.printSchema()  
        //查看某个字段  
        df.select("name").show()  
        //查看多个字段,plus为加上某值  
        df.select(df.col("name"), df.col("age").plus(1)).show()  
        //过滤某个字段的值  
        df.filter(df.col("age").gt(25)).show()  
        //count group 某个字段的值  
        df.groupBy("age").count().show()  
      
        //foreach 处理各字段返回值  
        df.select(df.col("id"), df.col("name"), df.col("age")).foreach { x =>  
          {  
            //通过下标获取数据  
            println("col1: " + x.get(0) + ", col2: " + "name: " + x.get(2) + ", col3: " + x.get(2))  
          }  
        }  
      
        //foreachPartition 处理各字段返回值,生产中常用的方式  
        df.select(df.col("id"), df.col("name"), df.col("age")).foreachPartition { iterator =>  
          iterator.foreach(x => {  
            //通过字段名获取数据  
            println("id: " + x.getAs("id") + ", age: " + "name: " + x.getAs("name") + ", age: " + x.getAs("age"))  
      
          })  
        }  
      
      }  
    }  
     


    2)通过注册表,操作sql的方式来操作数据

    1. import org.apache.spark.sql.SQLContext  
      import org.apache.spark.SparkConf  
      import org.apache.spark.SparkContext  
      import org.apache.log4j.Level  
      import org.apache.log4j.Logger  
        
      /** 
       * @author Administrator 
       */  
      object DataFrameTest2 {  
        def main(args: Array[String]): Unit = {  
          Logger.getLogger("org.apache.spark").setLevel(Level.ERROR);  
          Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR);  
        
          val conf = new SparkConf().setAppName("DataFrameTest2")  
          val sc = new SparkContext(conf)  
          val sqlContext = new SQLContext(sc)  
          val df = sqlContext.read.json("people.json")  
        
          df.registerTempTable("people")  
        
          df.show();  
          df.printSchema();  
        
          //查看某个字段  
          sqlContext.sql("select name from people ").show()  
          //查看多个字段  
          sqlContext.sql("select name,age+1 from people ").show()  
          //过滤某个字段的值  
          sqlContext.sql("select age from people where age>=25").show()  
          //count group 某个字段的值  
          sqlContext.sql("select age,count(*) cnt from people group by age").show()  
        
          //foreach 处理各字段返回值  
          sqlContext.sql("select id,name,age  from people ").foreach { x =>  
            {  
              //通过下标获取数据  
              println("col1: " + x.get(0) + ", col2: " + "name: " + x.get(2) + ", col3: " + x.get(2))  
            }  
          }  
        
          //foreachPartition 处理各字段返回值,生产中常用的方式  
          sqlContext.sql("select id,name,age  from people ").foreachPartition { iterator =>  
            iterator.foreach(x => {  
              //通过字段名获取数据  
              println("id: " + x.getAs("id") + ", age: " + "name: " + x.getAs("name") + ", age: " + x.getAs("age"))  
        
            })  
          }  
        
        }  
      }  

    两种方式运行结果是一样的,第一种适合程序员,第二种适合熟悉sql的人员。

    4、对于非结构化的数据

    people.txt

    1. 1,Ganymede,32  
      2, Lilei, 19  
      3, Lily, 25  
      4, Hanmeimei, 25  
      5, Lucy, 37  
      6, wcc, 4  

    1)  通过字段反射来映射注册临时表


      

         import org.apache.spark.sql.SQLContext  
    
    import org.apache.spark.SparkConf  
    import org.apache.spark.SparkContext  
    import org.apache.log4j.Level  
    import org.apache.log4j.Logger  
    import org.apache.spark.sql.types.IntegerType  
    import org.apache.spark.sql.types.StructType  
    import org.apache.spark.sql.types.StringType  
    import org.apache.spark.sql.types.StructField  
    import org.apache.spark.sql.Row  
      
    /** 
     * @author Administrator 
     */  
    object DataFrameTest3 {  
      def main(args: Array[String]): Unit = {  
        Logger.getLogger("org.apache.spark").setLevel(Level.ERROR);  
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR);  
      
        val conf = new SparkConf().setAppName("DataFrameTest3")  
        val sc = new SparkContext(conf)  
        val sqlContext = new SQLContext(sc)  
        val people = sc.textFile("people.txt")  
      
        val peopleRowRDD = people.map { x => x.split(",") }.map { data =>  
          {  
            val id = data(0).trim().toInt  
            val name = data(1).trim()  
            val age = data(2).trim().toInt  
            Row(id, name, age)  
          }  
        }  
      
        val structType = StructType(Array(  
          StructField("id", IntegerType, true),  
          StructField("name", StringType, true),  
          StructField("age", IntegerType, true)));  
      
        val df = sqlContext.createDataFrame(peopleRowRDD, structType);  
      
        df.registerTempTable("people")  
      
        df.show()  
        df.printSchema()  
      
      }  
    }  

    2)   通过case class反射来映射注册临时表

     
    
    import org.apache.spark.sql.SQLContext  
    import org.apache.spark.SparkConf  
    import org.apache.spark.SparkContext  
    import org.apache.log4j.Level  
    import org.apache.log4j.Logger  
    import org.apache.spark.sql.types.IntegerType  
    import org.apache.spark.sql.types.StructType  
    import org.apache.spark.sql.types.StringType  
    import org.apache.spark.sql.types.StructField  
    import org.apache.spark.sql.Row  
      
    /** 
     * @author Administrator 
     */  
    object DataFrameTest4 {  
      case class People(id: Int, name: String, age: Int)  
      def main(args: Array[String]): Unit = {  
        Logger.getLogger("org.apache.spark").setLevel(Level.ERROR);  
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR);  
      
        val conf = new SparkConf().setAppName("DataFrameTest4")  
        val sc = new SparkContext(conf)  
        val sqlContext = new SQLContext(sc)  
        val people = sc.textFile("people.txt")  
      
        val peopleRDD = people.map { x => x.split(",") }.map { data =>  
          {  
            People(data(0).trim().toInt, data(1).trim(), data(2).trim().toInt)  
          }  
        }  
      
        //这里需要隐式转换一把  
        import sqlContext.implicits._  
        val df = peopleRDD.toDF()  
        df.registerTempTable("people")  
      
        df.show()  
        df.printSchema()  
          
      
      }  
    }  

    5、总结:

    Spark SQL是Spark中的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象,就是DataFrame。同时Spark SQL还可以作为分布式的SQL查询引擎。Spark SQL最重要的功能之一,就是从Hive中查询数据。

    DataFrame,可以理解为是,以列的形式组织的,分布式的数据集合。它其实和关系型数据库中的表非常类似,但是底层做了很多的优化。DataFrame可以通过很多来源进行构建,包括:结构化的数据文件,Hive中的表,外部的关系型数据库,以及RDD。

  • 相关阅读:
    scan design rules
    scan cell
    DFT basics
    测试性分析
    DFT设计绪论
    clock gate cell
    Linux命令
    Multi-voltage和power gating的实现
    Power Gating的设计(架构)
    Power Gating的设计(模块二)
  • 原文地址:https://www.cnblogs.com/ilinuxer/p/6851652.html
Copyright © 2011-2022 走看看