zoukankan      html  css  js  c++  java
  • Spark——DataFrame与RDD互操作方式

    一.引言

           Spark SQL支持两种不同的方法将现有RDD转换为数据集。

      1.第一种方法使用反射来推断包含特定类型对象的RDD的模式。这种基于反射的方法可以使代码更简洁,并且在编写Spark应用程序时已经了解了模式,因此可以很好地工作。详细资料参考  DataFrame与RDD互操作之反射

    在开始之前现在项目的根路径下创建一个infos.txt文件,里面插入下面这种数据,以便进行学习
    1,李国辉,26
    2,华华,23
    3,ligh,24

    package com.spark
    
    import org.apache.spark.sql.SparkSession
    
    object DataFrameRDDAPP {
    
      def main(args: Array[String]): Unit = {
    
        val path="E:\data\infos.txt"
        val spark =SparkSession.builder().appName("DataFrameRDDAPP").master("local[2]").getOrCreate()
    
        //RDD==>DataFrame
        val rdd=spark.sparkContext.textFile(path)
    
    
        //注意导入隐式转换
        import spark.implicits._
        val infoDF=rdd.map(_.split(",")).map(line=>Info(line(0).toInt,line(1),line(2).toInt)).toDF()
    
        infoDF.show()
        infoDF.filter(infoDF.col("age")>19).show()
    
        infoDF.createOrReplaceTempView("infos")
        spark.sql("select *from infos where age>10").show()
    
        spark.stop()
      }
    
      case class Info(id:Int ,name:String ,age:Int)
    }

      2.创建数据集的第二种方法是通过编程接口,允许您构建模式,然后将其应用于现有RDD。虽然此方法更详细,但它允许您在直到运行时才知道列及其类型时构造数据集。
      DataFrame则可以通过三个步骤以编程方式创建。

      1)Row从原始RDD 创建s的RDD;
      2)创建由StructType匹配Row步骤1中创建的RDD中的s 结构 表示的模式。
      3)Row通过createDataFrame提供的方法将模式应用于s 的RDD SparkSession。

    package com.spark
    
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    
    /**
      * convert rdd to dataframe 2
      *
      * @param spark
      */
    
    object DataFrameRDDAPP2 {
    
      val spark =SparkSession.builder().appName("DataFrameRDDAPP").master("local[2]").getOrCreate()
      // 1.转成RDD
      val rdd = spark.sparkContext.textFile("E:\data\spark\infos.txt")
    
      // 2.定义schema,带有StructType的
      // 定义schema信息
      val schemaString = "name age"
      // 对schema信息按空格进行分割
      // 最终fileds里包含了2个StructField
      val fields = schemaString.split(" ")
        // 字段类型,字段名称判断是不是为空
        .map(fieldName => StructField(fieldName, StringType, nullable = true))
      val schema = StructType(fields)
    
      // 3.把我们的schema信息作用到RDD上
      //   这个RDD里面包含了一些行
      // 形成Row类型的RDD
      val rowRDD = rdd.map(_.split(","))
        .map(x => Row(x(1), x(2).trim))
      // 通过SparkSession创建一个DataFrame
      // 传进来一个rowRDD和schema,将schema作用到rowRDD上
      val peopleDF = spark.createDataFrame(rowRDD, schema)
    
      peopleDF.show()
    
    }
  • 相关阅读:
    Shell中判断语句if中-z至-d的意思
    每日英语-20171129
    THINK PHP 学习笔记20171115
    每日英语-20171115
    git bash安装和基本设置
    Centos6.8搭建Git服务(git版本可选)
    一键安装lamp环境出现的问题
    用PHP实现反向代理服务器
    动词的过去式、过去分词、现在分词
    树莓派 中文
  • 原文地址:https://www.cnblogs.com/aishanyishi/p/10317950.html
Copyright © 2011-2022 走看看