zoukankan      html  css  js  c++  java
  • DataFrame与RDD互操作

    DataFrame与RDD互操作之一: 反射方式

      使用反射来推断包含了特定数据类型的RDD的元数据信息

      使用DataFrame API或者sql方式编程

    代码如下

    import org.apache.spark.sql.SparkSession
    
    object DataFrameRDDApp {
    
      def main(args: Array[String]): Unit = {
    
        //初始化
        val sparkSession = SparkSession.builder().appName("DataFrameRDDApp").master("local[2]").getOrCreate()
    
        //RDD => DataFrame
        val rdd = sparkSession.sparkContext.textFile("G:\people.txt")
    
        //需要导入隐式转换
        import sparkSession.implicits._
        val infoDF = rdd.map(_.split(",")).map(line => Info(line(0).toInt,line(1),line(2).toInt)).toDF()
    
        infoDF.show()
    
        infoDF.filter(infoDF.col("age")>10).show
    
      //将DataFrame转化成sql表 infoDF.createOrReplaceTempView(
    "infos")   //利用spark.sql 进行api操作 sparkSession.sql("select * from infos where age > 10").show() sparkSession.close() } //采用反射的方式,获取字段的类型和名字 使用反射来推断包含了特定数据类型的RDD的元数据信息 case class Info(id : Int,name: String,age:Int) }

    操作文件如下

    1,zhangsan,9
    2,lish,14
    3,zhangwu,17

    DataFrame与RDD互操作之二: 编程方式

    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    import org.apache.spark.sql.{Row, SparkSession}
    
    object DataFrameRDDApp {
    
      def main(args: Array[String]): Unit = {
    
        //初始化
        val sparkSession = SparkSession.builder().appName("DataFrameRDDApp").master("local[2]").getOrCreate()
    
        program(sparkSession)
    
        sparkSession.close()
      }
      def program(sparkSession: SparkSession): Unit = {
    
        //RDD => DataFrame
        val rdd = sparkSession.sparkContext.textFile("G:\people.txt")
    
        val infoRDD = rdd.map(_.split(",")).map(line => Row(line(0).toInt,line(1),line(2).toInt))
    
        val structType = StructType(Array(StructField("id",IntegerType,true),
          StructField("name", StringType,true),
          StructField("age",IntegerType,true)))
    
        val people = sparkSession.createDataFrame(infoRDD,structType)
    
        people.printSchema()
    
      //通过df的api方式进行操作 people.
    filter(people.col("age")>8).show people.show() } }

    DataFrame 和RDD互操作的两种方式

    1反射 case class  前提:事先知道你的字段、字段类型(推荐)

    2编程 row            前提: 事先不知道你的字段类型

    例子:对日志文件进行即席查询

    1 上传文件到hdfs上,这里随便上传了一个datanode的日志文件

    hdfs dfs -put /var/log/hadoop-hdfs/hadoop-cmf-hdfs-DATANODE-udap69a166.log.out  /test

    2 将文件加载成rdd

    scala> val logrdd = sc.textFile("hdfs://hadoop1/test/hadoop-cmf-hdfs-DATANODE-udap69a166.log.out")

    3 对原来的rdd进行map操作 这里直接用一列

    import org.apache.spark.sql.Row
    
    val masterrdd = logrdd.map(line => Row(line))

    4 定义schema信息

      加载隐式转换
    import spark.implicits._

    val schemaString = "line"

    val filed = schemaString.split(" ").map(fieldName => StructField(fieldName ,StringType,nullable = true))

    val schema = StructType(filed)

    val masterDF = spark.createDataFrame(masterrdd,schema)

     masterDF.createOrReplaceTempView("logs")

     spark.sql("select * from logs").show

    这个地方会报错

     error: not found: value StructField

    解决方法:
    stackoverflow上 给出的解决方法是导入相应的类型

     import org.apache.spark.sql.types._
  • 相关阅读:
    Linux下搭建PHP环境的参考文章小记
    jQuery遇到问题的小记
    小程序 login
    小程序编辑器vscode
    弹性布局详解——5个div让你学会弹性布局
    vue在页面嵌入别的页面或者是视频2
    VUE设置浏览器icon图标
    遮罩层出现后不能滚动 添加事件@touchmove.prevent
    vue 在script里写页面跳转
    axios post、get 请求参数和headers配置
  • 原文地址:https://www.cnblogs.com/erlou96/p/12913440.html
Copyright © 2011-2022 走看看