// Goal: for every emp_id, keep only the row(s) whose year_joined equals the latest
// year_joined recorded for that emp_id.
//
// Written as a spark-shell session: `spark` is the shell's SparkSession. The tables
// kept in comments below are the outputs recorded from the original session.

// spark-shell normally has this in scope already; the explicit import keeps the
// snippet self-contained when it is pasted into a notebook or an application.
import org.apache.spark.sql.functions.max

val emp = Seq(
  (1, "Smith", -1, "2018", "10", "M", 3000),
  (2, "Rose", 1, "2010", "20", "M", 4000),
  (1, "Williams", 1, "2020", "10", "M", 1000),
  (2, "Jones", 2, "2005", "10", "F", 2000),
  (1, "Brown", 2, "2020", "40", "", -1),
  (6, "Brown", 2, "2010", "50", "", -1)
)

val empColumns = Seq(
  "emp_id", "name", "superior_emp_id", "year_joined", "emp_dept_id", "gender", "salary"
)

// Brings `toDF` and the `$"col"` column syntax into scope.
import spark.sqlContext.implicits._

val empDF = emp.toDF(empColumns: _*)
empDF.show(false)

val b = empDF
b.show
// +------+--------+---------------+-----------+-----------+------+------+
// |emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
// +------+--------+---------------+-----------+-----------+------+------+
// |     1|   Smith|             -1|       2018|         10|     M|  3000|
// |     2|    Rose|              1|       2010|         20|     M|  4000|
// |     1|Williams|              1|       2020|         10|     M|  1000|
// |     2|   Jones|              2|       2005|         10|     F|  2000|
// |     1|   Brown|              2|       2020|         40|      |    -1|
// |     6|   Brown|              2|       2010|         50|      |    -1|
// +------+--------+---------------+-----------+-----------+------+------+

// Latest year_joined per emp_id.
// NOTE: year_joined is a string column, so max() compares lexicographically. That
// matches numeric order here only because every value is a four-digit year.
val a = empDF.groupBy("emp_id").agg(max("year_joined").alias("max"))
// a: org.apache.spark.sql.DataFrame = [emp_id: int, max: string]
a.show
// +------+----+
// |emp_id| max|
// +------+----+
// |     1|2020|
// |     6|2010|
// |     2|2010|
// +------+----+

// Attach each emp_id's latest year to every one of its rows. Built once and reused
// for both the inspection below and the final filter.
val joined = b.join(a, Seq("emp_id"), "left")
joined.show
// +------+--------+---------------+-----------+-----------+------+------+----+
// |emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary| max|
// +------+--------+---------------+-----------+-----------+------+------+----+
// |     1|   Smith|             -1|       2018|         10|     M|  3000|2020|
// |     2|    Rose|              1|       2010|         20|     M|  4000|2010|
// |     1|Williams|              1|       2020|         10|     M|  1000|2020|
// |     2|   Jones|              2|       2005|         10|     F|  2000|2010|
// |     1|   Brown|              2|       2020|         40|      |    -1|2020|
// |     6|   Brown|              2|       2010|         50|      |    -1|2010|
// +------+--------+---------------+-----------+-----------+------+------+----+

// Keep only the rows that carry the latest year for their emp_id. Ties are kept:
// emp_id 1 has two rows with 2020. The comparison is written with Column
// expressions instead of the SQL string "year_joined = max", so the column named
// `max` cannot be confused with the aggregate function of the same name.
joined.where($"year_joined" === $"max").show
// +------+--------+---------------+-----------+-----------+------+------+----+
// |emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary| max|
// +------+--------+---------------+-----------+-----------+------+------+----+
// |     2|    Rose|              1|       2010|         20|     M|  4000|2010|
// |     1|Williams|              1|       2020|         10|     M|  1000|2020|
// |     1|   Brown|              2|       2020|         40|      |    -1|2020|
// |     6|   Brown|              2|       2010|         50|      |    -1|2010|
// +------+--------+---------------+-----------+-----------+------+------+----+
参考:
https://sparkbyexamples.com/spark/spark-sql-dataframe-join/
https://stackoverflow.com/questions/39699495/spark-2-0-groupby-column-and-then-get-maxdate-on-a-datetype-column?rq=1