zoukankan      html  css  js  c++  java
  • DataFrames与RDDs的相互转换

    Spark SQL支持两种RDDs转换为DataFrames的方式
    使用反射获取RDD内的Schema
        当已知类的Schema的时候,使用这种基于反射的方法会让代码更加简洁而且效果也很好。
    通过编程接口指定Schema
        通过Spark SQL的接口创建RDD的Schema,这种方式会让代码比较冗长。
        这种方法的好处是,在运行时才知道数据的列以及列的类型的情况下,可以动态生成Schema。
    使用反射获取Schema(Inferring the Schema Using Reflection)
    import org.apache.spark.sql.{DataFrameReader, SQLContext}
    import org.apache.spark.{SparkConf, SparkContext}
    
    object InferringSchema {
      def main(args: Array[String]) {
    
        //创建SparkConf()并设置App名称
        val conf = new SparkConf().setAppName("SQL-intsmaze")
    
        //SQLContext要依赖SparkContext
        val sc = new SparkContext(conf)
        //创建SQLContext
        val sqlContext = new SQLContext(sc)
    
        //从指定的地址创建RDD
        val lineRDD = sc.textFile("hdfs://192.168.19.131:9000/person.tzt").map(_.split(","))
    
        //创建case class
        //将RDD和case class关联
        val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
    
        //导入隐式转换,如果不导入无法将RDD转换成DataFrame
        //将RDD转换成DataFrame
        import sqlContext.implicits._
        val personDF = personRDD.toDF
    
        //注册表
        personDF.registerTempTable("intsmaze")
        //传入SQL
        val df = sqlContext.sql("select * from intsmaze order by age desc limit 2")
    
        //将结果以JSON的方式存储到指定位置
        df.write.json("hdfs://192.168.19.131:9000/personresult")
    
        //停止Spark Context
        sc.stop()
      }
    }
    //case class一定要放到外面
    case class Person(id: Int, name: String, age: Int)

    spark shell中不需要导入sqlContext.implicits._是因为spark shell默认已经自动导入了。

    打包提交到yarn集群:

    /home/hadoop/app/spark/bin/spark-submit --class InferringSchema 
    --master yarn 
    --deploy-mode cluster 
    --driver-memory 512m 
    --executor-memory 512m 
    --executor-cores 2 
    --queue default 
    /home/hadoop/sparksql-1.0-SNAPSHOT.jar

    通过编程接口指定Schema(Programmatically Specifying the Schema)

    当JavaBean不能被预先定义的时候,编程创建DataFrame分为三步:

    从原来的RDD创建一个Row格式的RDD.

    创建与RDD中Rows结构匹配的StructType,通过该StructType创建表示RDD的Schema.

    通过SQLContext提供的createDataFrame方法创建DataFrame,方法参数为RDD的Schema.

    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.types._
    import org.apache.spark.{SparkContext, SparkConf}
    
    object SpecifyingSchema {
      def main(args: Array[String]) {
        //创建SparkConf()并设置App名称
        val conf = new SparkConf().setAppName("SQL-intsmaze")
        //SQLContext要依赖SparkContext
        val sc = new SparkContext(conf)
        //创建SQLContext
        val sqlContext = new SQLContext(sc)
    
        //从指定的地址创建RDD
        val personRDD = sc.textFile(args(0)).map(_.split(","))
    
        //通过StructType直接指定每个字段的schema
        val schema = StructType(
          List(
            StructField("id", IntegerType, true),
            StructField("name", StringType, true),
            StructField("age", IntegerType, true)
          )
        )
    
        //将RDD映射到rowRDD
        val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
    
        //将schema信息应用到rowRDD上
        val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    
        //注册表
        personDataFrame.registerTempTable("intsmaze")
        //执行SQL
        val df = sqlContext.sql("select * from intsmaze order by age desc ")
        //将结果以JSON的方式存储到指定位置
        df.write.json(args(1))
        //停止Spark Context
        sc.stop()
      }
    }

    将程序打成jar包,上传到spark集群,提交Spark任务

    /home/hadoop/app/spark/bin/spark-submit --class SpecifyingSchema 
    --master yarn 
    --deploy-mode cluster 
    --driver-memory 512m 
    --executor-memory 512m 
    --executor-cores 2 
    --queue default 
    /home/hadoop/sparksql-1.0-SNAPSHOT.jar 
    hdfs://192.168.19.131:9000/person.txt hdfs://192.168.19.131:9000/intsmazeresult
    /home/hadoop/app/spark/bin/spark-submit --class SpecifyingSchema 
    --master yarn 
    --deploy-mode client 
    --driver-memory 512m 
    --executor-memory 512m 
    --executor-cores 2 
    --queue default 
    /home/hadoop/sparksql-1.0-SNAPSHOT.jar 
    hdfs://192.168.19.131:9000/person.txt hdfs://192.168.19.131:9000/intsmazeresult

    在maven项目的pom.xml中添加Spark SQL的依赖

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.10</artifactId>
      <version>1.6.2</version>
    </dependency>
  • 相关阅读:
    12.22冲刺总结
    Android远程服务
    短信电话监听
    Android本地服务
    意图
    多线程下载
    异步HTTP请求
    提交数据到服务器
    通过HTTP访问网络资源
    观察者
  • 原文地址:https://www.cnblogs.com/itboys/p/9179984.html
Copyright © 2011-2022 走看看