zoukankan      html  css  js  c++  java
  • Spark之 RDD转换成DataFrame的Scala实现

    依赖

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.3</version>
    </dependency>

    RDD转化成DataFrame:通过StructType指定schema

    package com.zy.sparksql
    
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}
    
    /**
      * RDD转化成DataFrame:通过StructType指定schema
      */
    object StructTypeSchema {
      def main(args: Array[String]): Unit = {
        //创建sparkSession对象
        val sparkSession: SparkSession = SparkSession.builder().appName("StructTypeSchema").master("local[2]").getOrCreate()
        //获取sparkContext
        val sc: SparkContext = sparkSession.sparkContext
        //设置日志级别
        sc.setLogLevel("WARN")
    
        //读取文件
        val textFile: RDD[String] = sc.textFile("D:\person.txt")
        //切分文件
        val lineArrayRDD: RDD[Array[String]] = textFile.map(_.split(","))
    
        //关联对象
        val rowRDD: RDD[Row] = lineArrayRDD.map(x => Row(x(0).toInt, x(1), x(2).toInt))
        //创建rdd的schema信息
        val schema: StructType = (new StructType)
          .add("id", IntegerType, true, "id")
          .add("name", StringType, false, "姓名")
          .add("age", IntegerType, true, "年龄")
        //根据rdd和schema信息创建DataFrame
        val personDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
    
        //DSL操作
        personDF.show()
    
        //sql 操作
        //将df注册成表
        personDF.createTempView("person")
    
        sparkSession.sql("select * from person where id =3").show()
    
        sparkSession.stop()
      }
    }

    RDD转化成DataFrame:利用反射机制推断schema

    package com.zy.sparksql
    
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    
    /**
      * RDD转化成DataFrame:利用反射机制推断schema
      */
    
    //todo 定义一个样例类
    case class Person(id: Int, name: String, age: Int)
    
    object CaseClassSchema {
      def main(args: Array[String]): Unit = {
        //构建sparkSession 指定appName和master地址(本地测试local)
        val sparkSession: SparkSession = SparkSession.builder().appName("CaseClassSchema").master("local[2]").getOrCreate()
        //获取sparkContext
        val sc: SparkContext = sparkSession.sparkContext
    
        //设置日志输出级别
        sc.setLogLevel("WARN")
    
        //加载数据
        val dataRDD: RDD[String] = sc.textFile("D:\person.txt")
        //切分数据
        val lineArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(","))
        //将rdd和person样例类关联
        val personRDD: RDD[Person] = lineArrayRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
    
        //将rdd转换成dataFrame 导入隐式转换
        import sparkSession.implicits._
        val personDF: DataFrame = personRDD.toDF
    
        //DSL语法
        personDF.show()
        personDF.printSchema()
        personDF.select("name").show()
        personDF.filter($"age" > 30).show()
    
        println("---------------------------------------------")
    
        //sql语法
        //首先要创建临时视图
        personDF.createTempView("person")
        sparkSession.sql("select * from person where id>1").show()
    
        sparkSession.stop()
      }
    }
  • 相关阅读:
    SysUtils.UpperCase、SysUtils.LowerCase 大小写转换
    Sql Server 2005 数据库备份还原后出现“受限制用户”问题的解决
    为什么要跪谢
    【基础】C#卸载快捷方式添加
    DataTable 复制 DataRow 出现 “该行已经属于另一个表”错误的解决办法
    将int型转化成五位字符串,前面用0填充
    C#嵌套类的使用方法及特性(转)
    net内存回收与Dispose﹐Close﹐Finalize方法(转)
    sqlhelper中文注释版(转)
    windows2003消息队列的安装
  • 原文地址:https://www.cnblogs.com/blazeZzz/p/9851058.html
Copyright © 2011-2022 走看看