zoukankan      html  css  js  c++  java
  • Spark实战

    背景

    业务上有一份行车轨迹的数据 carRecord.csv 如下:

    id;carNum;orgId;capTime
    1;粤A321;0002;20200512 102010
    2;云A321;0001;20200512 102010
    3;粤A321;0001;20200512 103010
    4;云A321;0002;20200512 103010
    5;粤A321;0003;20200512 114010
    6;京A321;0003;20200512 114011
    

    其中各字段含义分别为记录id,车牌号,抓拍卡口,抓拍时间。现在需要筛选出所有车辆最后出现的一条记录,得到每辆车最后经过的抓拍点信息,也就是要将其他日期的数据过滤掉,我们可以使用选择去重。下面分别展示通过 dataframe 和 rdd 如果实现。

    DataFrame实现

    具体实现:

    1. 导入行车数据;
    2. 首先使用 withColumn() 添加 num 字段,num 字段是由 row_number() + Window() + orderBy() 实现的:开窗函数中进行去重,先对车牌carNum 进行分组,倒序排序,然后取窗口内排在第一位的则为最后的行车记录,使用 where 做过滤,最后drop掉不再使用的 num 字段;
    3. 通过 explain 打印 dataFrame 的物理执行过程,show() 作为 action算子触发了以上的系列运算。
    val carDF = spark.read.format("csv")
          .option("sep", ";")
          .option("inferSchema", "true")
          .option("header", "true")
          .csv(basePath + "/car.csv")
    
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions.Window
    // This import is needed to use the $-notation
    import spark.implicits._
    
    val lastPassCar = carDF.withColumn("num",
       row_number().over(
         Window.partitionBy($"carNum")
               .orderBy($"capTime" desc)
       )
    ).where($"num" === 1).drop($"num")
    lastPassCar.explain()
    lastPassCar.show()
    

    执行计划如下:

    == Physical Plan ==
    *(3) Project [id#10, carNum#11, orgId#12, capTime#13]
    +- *(3) Filter (isnotnull(num#19) && (num#19 = 1))
       +- Window [row_number() windowspecdefinition(carNum#11, capTime#13 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS num#19], [carNum#11], [capTime#13 DESC NULLS LAST]
          +- *(2) Sort [carNum#11 ASC NULLS FIRST, capTime#13 DESC NULLS LAST], false, 0
             +- Exchange hashpartitioning(carNum#11, 200)
                +- *(1) FileScan csv [id#10,carNum#11,orgId#12,capTime#13]
    

    结果如下:

    // 获得其中每辆车最后经过的卡口等信息
    +---+------+-----+---------------+
    | id|carNum|orgId|        capTime|
    +---+------+-----+---------------+
    |  5|粤A321|    3|20200512 114010|
    |  6|京A321|    3|20200512 114011|
    |  4|云A321|    2|20200512 103010|
    +---+------+-----+---------------+
    

    RDD实现

    思路:

    1. 加载源数据并封装到 CarRecord 样例类中,生成RDD;
    2. 首先通过 groupBy 对 数据做分组后生成 RDD[(String, Iterable[CarRecord])]对象,随即使用 map 对每个 key 对应的多组记录(Iterable[CarRecord])进行reduce操作(maxBy),最后在 maxBy 算子传入一个字面量函数(也可写为x=>x.capTime),即提取该carNum下每条记录中的 capTime 进行比对,然后选出最新时间记录(maxBy 为高阶函数,依赖 reduceLeft 实现);
    case class CarRecord(id: Int, carNum: String, orgId: Int, capTime: String)
    
    // 构造 schema RDD
    val carRDD: RDD[CarRecord] =
        carDF.rdd.map(x => 
            CarRecord(x.getInt(0), x.getString(1), x.getInt(2), x.getString(3)))
    val res = carRDD.groupBy(_.carNum).map{
        x => {
            // x._2 是 iter,取其中 capTime 最大的记录
            x._2.maxBy { _.capTime }
        }
    }
    res.toDebugString
    res.collect.foreach(x => println(x))
    

    总结

    实现选择去重的两种常用方法:

    1. 通过开窗函数 row_number+window+orderBy 进行聚合后去重;
    2. 通过 groupby + maxBy 等算子进行聚合后去重。

    扩展

    Hive 又如何实现选择去重呢?与上文两种方法一样,请自行实现。

  • 相关阅读:
    sql优化-mysql的慢查询
    LInux服务器防火墙-开放端口
    vim打开文件中文乱码解决方法总结
    查看指定文件夹或文件总的大小,文件夹下各个文件的大小
    grep -v 反选匹配内容(not操作)以及grep -E(or操作)
    查看Liunx服务器的磁盘使用情况df命令,以及查看磁盘分区lsblk命令
    top发现僵尸进程
    查看linux服务器内存使用情况
    GitHub 和 GitLab对比
    git与svn
  • 原文地址:https://www.cnblogs.com/stillcoolme/p/12885197.html
Copyright © 2011-2022 走看看