现有如下数据文件需要处理
格式:CSV
位置:hdfs://myhdfs/input.csv
大小:100GB
字段:用户ID,位置ID,开始时间,停留时长(分钟)
4行样例:
UserA,LocationA,2018-01-01 08:00:00,60
UserA,LocationA,2018-01-01 09:00:00,60
UserA,LocationB,2018-01-01 10:00:00,60
UserA,LocationA,2018-01-01 11:00:00,60
解读:
样例数据中的数据含义是:
用户UserA,在LocationA位置,从8点开始,停留了60分钟
用户UserA,在LocationA位置,从9点开始,停留了60分钟
用户UserA,在LocationB位置,从10点开始,停留了60分钟
用户UserA,在LocationA位置,从11点开始,停留了60分钟
该样例期待输出:
UserA,LocationA,2018-01-01 08:00:00,120
UserA,LocationB,2018-01-01 10:00:00,60
UserA,LocationA,2018-01-01 11:00:00,60
处理逻辑:
1 对同一个用户,在同一个位置,连续的多条记录进行合并
2 合并原则:开始时间取最早时间,停留时长加和
要求:请使用Spark、MapReduce或其他分布式计算引擎处理
思路:按照按照用户ID和位置ID分组,分组之后按照时间列排序,由于数据之间的存在依赖关系,并且依赖关系比较连续,满足某种关系的数据要进行合并操作,因此使用sql部分的代码很难实现。在这使用的是将Dataset转化为RDD之后使用基于分区进行操作的方法处理数据。拿到相关的数据,按照时间顺序读取,判断,累加等进行处理。
1 package com.zhf.streaming 2 3 import java.text.SimpleDateFormat 4 5 import org.apache.spark.Partitioner 6 import org.apache.spark.rdd.RDD 7 import org.apache.spark.sql.{Dataset, SparkSession} 8 9 import scala.collection.mutable.ArrayBuffer 10 case class ResultData(userID:String,locationID:String,startTime:String,endTime:String,stayTime:Long) 11 object Test { 12 def main(args: Array[String]): Unit = { 13 val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate() 14 import spark.implicits._ 15 import org.apache.spark.sql.functions._ 16 val info = spark.read 17 .format("csv") 18 .option("path", "src/data/user.csv") 19 .load() 20 .toDF("userID", "locationID", "startTimes", "stayMinutes") 21 .as[(String, String, String, String)] 22 23 val ds: Dataset[((String, String, String), ResultData)] = info.map { 24 case (userID, locationID, startTimes, stayMinutes) => 25 //让起始时间+停留时间=结束时间 26 val sd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") 27 val date = sd.parse(startTimes) 28 val endTime = sd.format(date.getTime + (stayMinutes.trim.toInt * 60 * 1000)) 29 ((userID, locationID, startTimes), ResultData(userID, locationID, startTimes, endTime, stayMinutes.trim.toLong)) 30 }.as[((String, String, String), ResultData)] 31 32 //按照用户ID和位置ID分组,分组之后按照时间列排序 33 val newDS: RDD[((String, String, String), ResultData)] = ds.rdd.repartitionAndSortWithinPartitions(new Partitioner { 34 override def numPartitions: Int = 4 35 36 override def getPartition(key: Any): Int = key match { 37 case (userID, locationID, _) => (userID.hashCode + locationID.hashCode) % numPartitions 38 case _ => 0 39 } 40 }) 41 val result = newDS.mapPartitions(iter => { 42 val listBuffer = iter.toBuffer 43 val buffer = ArrayBuffer.empty[ResultData] 44 var resultData: ResultData = null; 45 //分区内只有一个元素的情况 46 if (listBuffer.size == 1) { 47 resultData = listBuffer(0)._2; 48 buffer += resultData 49 } else { 50 //分区内有多个元素 51 listBuffer.foreach { 52 case ((userID, locationID, startTimes), currentData) => 53 //初始化赋值 54 if (resultData == null) { 55 resultData = ResultData(userID, locationID, startTimes, currentData.endTime, currentData.stayTime) 56 } else { 57 //如果当前行的起始时间与上一行的结束时间相同 58 if (currentData.startTime == resultData.endTime) { 59 //合并 修改初始值 60 resultData = ResultData(currentData.userID, currentData.locationID, resultData.startTime, currentData.endTime, resultData.stayTime + currentData.stayTime) 61 } else { 62 //不相同的情况下,将上一行结果添加到结果集,并修改初始值 63 buffer += resultData 64 resultData = currentData 65 } 66 } 67 } 68 //最后一个元素对象 69 if (resultData != null) { 70 buffer += resultData 71 } 72 } 73 buffer.toIterator 74 }) 75 result.collect() 76 .sortBy(_.startTime) 77 .foreach(println) 78 } 79 }