zoukankan      html  css  js  c++  java
  • 基于API和SQL的基本操作【DataFrame】

    写在前面:

    当得到一个DataFrame对象之后,可以使用对象提供的各种API方法进行直接调用,进行数据的处理。

      // =====基于dataframe的API=======之后的就都是DataFrame 的操作了==============
        infoDF.show()
        infoDF.filter(infoDF.col("age") > 30).show()
    

    另,也可以将DataFrame对象通过createOrReplaceTempView()方法,将其转为一张表,从而使用SQL来进行数据处理。

        // ======基于SQL的API===========DataFrame 创建为一张表================
        infoDF.createOrReplaceTempView("infos")
        spark.sql("select * from infos where age > 30").show()
    

    主要介绍一下API的基本操作,因为SQL的话,写法和传统的基本没差。

    DEMO1

    package february.sql
    
    import org.apache.spark.sql.SparkSession
    
    /**
      * Description: DataFrame API基本操作  直接读取Json文件为DataFrame对象
      *
      * @Author: 留歌36
      * @Date: 2019/2/24 17:54
      */
    object DataFrameApp {
      def main(args: Array[String]): Unit = {
        // Spark SQL的入口点是:SparkSession
       val spark = SparkSession.builder()
                                .appName(this.getClass.getSimpleName)
                                .master("local[2]")
                                .getOrCreate()
        // 将json文件加载成一个DataFrame
       val DF = spark.read.format("json").load("f:\user.json")
    
        // 输出dataframe对应的schema的信息
        DF.printSchema()
    
        // 输出数据集的前20条数据
        DF.show()
    
        // 查询某几列所有的数据:select name from table
        DF.select("name").show()
    
        // 查询某几列的所有数据,并对列进行计算:select name, age+10 from table;
        DF.select(DF.col("name"), (DF.col("age")+10).as("age2")).show()
    
        // 根据某一列的值,进行过滤。select * from table where age > 21
        DF.filter(DF.col("age") > 21).show()
    
        // 根据某一列进行分组,然后再进行聚合 select age,count(1) from table group by age
        DF.groupBy("age").count().show()
    
        spark.stop()
    
    
      }
    
    }
    
    

    DEMO2

    package february.sql
    
    import february.sql.DataFrameRDDApp.Info
    import org.apache.spark.sql.SparkSession
    
    /**
      * Description: DataFrame中的其他操作  读取TXT文件为RDD,再反射隐式转换为DataFrame对象
      *
      * @Author: 留歌36
      * @Date: 2019/2/25 19:31
      */
    object DataFrameCase {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[2]").getOrCreate()
    
        // RDD => DataFrame
        val textFile = spark.sparkContext.textFile("f:\users.txt")
        textFile.foreach(println)
        // 注意这里的隐私转换,split("\|") 竖线需要转义
        import spark.implicits._
        val DF = textFile.map(_.split("\|")).map(line => Student(line(0).toInt, line(1), line(2), line(3))).toDF()
    
        DF.show()
        // show 方法默认只显示前20条记录,show()在这里被重载了很多次
        DF.show(30)
        DF.show(30,false) //不隐藏其余的
        // 返回前10条记录
        DF.take(10).foreach(println)
        DF.first()
        DF.head(5)
        DF.select("name","phone").show(30,false)
        //name字段为空或NULL的过滤出来
        DF.filter("name='' OR name='NULL'").show()
        // name 以M开头的
        DF.filter("SUBSTR(name,0,1)='留'").show()
    
        // 排序的使用,几种传递参数的方式
        DF.sort(DF.col("name").desc).show() //降序
        DF.sort(DF("name")).show()
        DF.sort("name").show()
        // 排序升级操作
        DF.sort("name", "id").show()
        DF.sort(DF("name").asc, DF("id").desc).show() //name的升序,id的降序
        // 修改查询的列名(别名的使用)
        DF.select(DF("name").as("stuent_name")).show()
    
       // join 操作
        val DF2 = textFile.map(_.split("\|")).map(line => Student(line(0).toInt, line(1), line(2), line(3))).toDF()
       // 默认内连接
        val joinDF = DF.join(DF2, DF("id") === DF2("id"))
    
        joinDF.show()
    
       //查看所有的内置函数
    //    spark.sql("show functions").show(1000)
        spark.stop()
    
      }
      case class Student(id: Int, name: String, phone: String,email: String)
    
    
    }
    
    

    更多相关小demo:每天一个程序:https://blog.csdn.net/liuge36/column/info/34094

  • 相关阅读:
    双11实时物流订单最佳实践
    一文理解 K8s 容器网络虚拟化
    新能源汽车太猛了,这些卡脖子技术你了解吗?
    龙蜥社区成立系统运维SIG,重磅开源sysAK系统运维工具集
    sysAK(青囊)系统运维工具集:如何实现高效自动化运维?| 龙蜥技术
    零信任策略下云上安全信息与事件管理最佳实践
    MySQL 处理重复数据
    JavaScript toFixed()、toExponential、toPrecision方法
    Nginx 安装
    spring定时任务执行两次的原因与解决方法
  • 原文地址:https://www.cnblogs.com/liuge36/p/12614758.html
Copyright © 2011-2022 走看看