zoukankan      html  css  js  c++  java
  • spark sql 之 RDD与DataFrame互相转化

    一、RDD转DataFrame

      方法一:通过 case class 创建 DataFrames

      

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext
    
    
    object TestDataFrame {
      
      def main(args: Array[String]): Unit = {
        
        /**
         * 1、初始化 spark config
         */
        val conf = new SparkConf().setAppName("TestDataFrame").setMaster("local");    
        /**
         * 2、初始化spark context
         */
        val sc = new SparkContext(conf);
        
        /**
         * 3、初始化spark sql context
         */
        val ssc = new SQLContext(sc);
        
        /**
         * 4、做spark sql 的df获取工作
         */
        val PeopleRDD = sc.textFile("F:\input.txt").map(line => People(line.split(" ")(0),line.split(" ")(1).trim.toInt))
        
        import ssc.implicits._
        
        var df = PeopleRDD.toDF
        
        //将DataFrame注册成临时的一张表,这张表相当于临时注册到内存中,是逻辑上的表,不会物化到磁盘  这种方式用的比较多
        df.registerTempTable("peopel")
        
        var df2 =ssc.sql("select * from peopel where age > 23")show()
        
        /**
         * 5、spark context 结束工作
         */
        sc.stop();
        
      }
    }
    case class People(var name:String ,var age : Int)

      方法二:通过 structType创建 DataFrames

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType}
    
    object TestDataFrame2{
      def test2(): Unit = {
        /**
         * 1、初始化 spark config
         */
        val conf = new SparkConf().setAppName("TestDataFrame").setMaster("local");    
        /**
         * 2、初始化spark context
         */
        val sc = new SparkContext(conf);
        
        /**
         * 3、初始化spark sql context
         */
        val ssc = new SQLContext(sc);
        
        /**
         * 4、做spark sql 的df获取工作
         */
        val peopleRDD = sc.textFile("F:\input.txt")map(line => 
          Row(line.split(" ")(0),line.split(" ")(1).trim().toInt))
       
        // 创建 StructType 来定义结构
        val structType : StructType = StructType(
            StructField("name",StringType,true)::
            StructField("age",IntegerType,true) ::Nil       
        );
        
        val df : DataFrame = ssc.createDataFrame(peopleRDD, structType);
        df.registerTempTable("peopel");
        
        ssc.sql("select * from peopel").show();
        
         /**
         * 5、spark context 结束工作
         */
        sc.stop();
      }  
    }

      方法三:通过json创建 DataFream

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType}
    import org.apache.spark.sql.DataFrame
    
    object TestDataFrame2{
      def test3() : Unit={
        /**
         * 1、初始化 spark config
         */
        val conf = new SparkConf().setAppName("TestDataFrame").setMaster("local");    
        /**
         * 2、初始化spark context
         */
        val sc = new SparkContext(conf);
        
        /**
         * 3、初始化spark sql context
         */
        val ssc = new SQLContext(sc);
        
        /**
         * 4、做spark sql 的df获取工作
         */
        val df :DataFrame = ssc.read.json("F:\json.json")
        df.registerTempTable("people")
        ssc.sql("select * from people").show();
        
         /**
         * 5、spark context 结束工作
         */
        sc.stop();
      }
    }

    二、RDD转DataFrame

    df.rdd

  • 相关阅读:
    sql2005事务的使用
    [原]using的另一种用法
    [原]Cache的简单用法
    [原] Js动态删除行(支持FireFox)
    [原]为什么文本框高度不一样?
    [原]如何把object解析为int,double,float?
    压缩SQL SERVER日志文件
    [原]替换的更新(Update)查询
    [原]让链接点击过后无虚线
    [原]取得Access表中表的名字
  • 原文地址:https://www.cnblogs.com/ddaifenxiang/p/11488025.html
Copyright © 2011-2022 走看看