zoukankan      html  css  js  c++  java
  • Spark中对DataFrame的基础操作:列增加,列删除,行增加,列名更换,排序等等

    可以使用select和selectExpr来操作DataFrame中的列

    例如查询:DEST_COUNTRY,ORIGIN_COUNTRY

        val path="/Volumes/Data/BigData_code/data/flight-data/json/2015-summary.json"
        val dataDF = spark.read.format("json").load(path)
        val dataSelect = dataDF.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME")
        dataSelect.show(2)
    

    新增一列

    判断目的国家和起飞国家是否是同一个。

        //创建一个新的列,用来表示目的国家和源国家是否是同一国家
        dataDF.selectExpr(
          "*", "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as inCountry"
        ).show(30)
    

    使用withColumn添加列

        //添加1列
        dataDF.withColumn("numberOne", lit(1)).show(10)
    

    删除一列

        //删除列
        //方法1:
        dataDF.selectExpr("DEST_COUNTRY_NAME", "count").show(2)
        //方法2:
        dataDF.drop("ORIGIN_COUNTRY_NAME").show(2)
    

    连接和追加行(联合操作)

    注意:DataFrame是不可变的,这意味着用户不能追加行,只能将想要添加的行生成ROW对象,然后再生成DataFrame,再将两个DataFrame进行拼接

        dataDF.printSchema()
        //将需要连接的Schema赋值给需要创建的DataFrame中(因为两个DataFrame连接需要Schema模式相同)
        val mySchema = dataDF.schema
        //创建Row对象的list
        val rowList = List(Row("NewCountry", "OtherCountry", 2L), Row("NewCountry2", "OtherCountry", 5L))
        //创建RDD
        val myrdd = sc.makeRDD(rowList)
        //创建新的DataFrame
        val newDF = spark.createDataFrame(myrdd, mySchema)
        //将两者进行连接
        newDF.union(dataDF).show(20)
    

    会用select语句,我们还可以使用系统预定义好的聚合函数来指定在整个DataFrame上的聚合操作。

        //使用系统已经有的函数,求所有数据的count的平均值,并求出有多少不同的目的国家
        dataDF.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2)
    

    字面量

    dataDF.select(expr("*"), lit(1).as("one")).show(3)
    

    注意:无法一次性添加多个列。一次性添加多个列,可以先将新列生成一个DataFrame,然后再进行连接即可。

    修改列名:

       //修改列的名字:
        //方法1:
        dataDF.select(expr("DEST_COUNTRY_NAME as dest"), expr("ORIGIN_COUNTRY_NAME"), expr("count")).show(2)
        //方法2:
        dataDF.selectExpr("DEST_COUNTRY_NAME as dect", "ORIGIN_COUNTRY_NAME", "count").show(2)
        //方法3:
        dataDF.withColumnRenamed("DEST_COUNTRY_NAME", "dest").show(2)
    

    行操作

    过滤行

    在DataFrame上实现过滤有两种方法:

    • 创建一个字符串表达式:使用where
    • 通过列操作来构建表达式:使用filter
      下面我们只要count<2的所有行
        val dataDF = spark.read.format("json").load(path)
        dataDF.where("count < 2").show(20)
    

    多个条件之间的关系为‘and’时

    dataDF.where("count > 2").where("count < 4").show(20)
    

    去重操作

    dataDF.selectExpr("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").distinct().show(20)
    

    排序

        //排序
        //方法1:
        dataDF.sort("count", "DEST_COUNTRY_NAME").show(2)   //默认升序
        //方法2:
        dataDF.sort(asc("count"), desc("DEST_COUNTRY_NAME")).show(2)  //asc指定升序,desc指定降序
        //方法3:
        dataDF.sort(expr("count asc"), expr("DEST_COUNTRY_NAME desc")).show(2)
        //方法4:
        dataDF.orderBy(desc("count"), asc("DEST_COUNTRY_NAME")).show(2)
        //方法5:
        dataDF.orderBy(expr("count asc"), expr("DEST_COUNTRY_NAME desc")).show(2)
    

    一个高级技巧是你可以指定空值在排序列表中的位置,使用asc_nulls_first指示空值安排在升序排列的前面,使用desc_nulls_first指示空值安排在降序排列的前面,使用asc_nulls_last指示空值安排在升序排列的后面,使用desc_nulls_last指示空值安排在降序排列的后面。
    还有一种排序是分区内进行排序。这样做能够大大提高性能。使用的函数是sortWithinPartitions

  • 相关阅读:
    TCP 协议如何保证可靠传输
    mysql 优化
    Navicat 导入导出
    Hibernate的优缺点
    寒假学习日报(十八)
    《机器学习十讲》第二讲总结
    寒假学习日报(十七)
    《设计原本》阅读笔记(二)
    《机器学习十讲》第一讲总结
    寒假学习日报(十六)
  • 原文地址:https://www.cnblogs.com/ALINGMAOMAO/p/14447643.html
Copyright © 2011-2022 走看看