批量处理模板方法, 核心处理方法为内部方法
def batchProces(sc: SparkContext, locationFlag: Int, minid: Int, maxid: Int, numPartitions: Int) = { //自定义RDD,此处为demo val dataRDD = sc.makeRDD(List(1, 2), numPartitions) dataRDD.mapPartitions(iterator => { val rawData = iterator.toList var lstT = new ListBuffer[(Int, Int)]() rawData.foreach(v => { if (lstT.size < 50) { lstT.append((v, 1)) } else { //每50处理一次 procesData() } }) //剩余的继续处理 procesData() //批量处理逻辑 def procesData() = { //核心处理逻辑 // doProcess //很重要 lstT.clear() } lstT.iterator }).map((_, 1)).reduceByKey(_ + _).sortByKey().saveAsTextFile("hdfs://hdfscluster/tmp/logs/") }
批量处理模板方法, 核心处理方法为外部方法
def process_outer(lst: List[(Int, Int)]) = { //外部核心处理逻辑,如Request请求等 RequestUtil.postJson("http://xxx", "{paraData}", 1000) } def batchProces_processOuter(sc: SparkContext, locationFlag: Int, minid: Int, maxid: Int, numPartitions: Int) = { val fooCount = sc.longAccumulator("fooCount") //自定义RDD,此处为demo val dataRDD = sc.makeRDD(List(1, 2), numPartitions) dataRDD.foreachPartition(iterator => { val rawData = iterator.toList var lstT = new ListBuffer[(Int, Int)]() rawData.foreach(v => { if (lstT.size < 50) { lstT.append((v, 1)) } else { //每50处理一次 process_outer(lstT.toList) fooCount.add(lstT.size) lstT.clear() } }) //剩余的继续处理 if (lstT.size > 0) { process_outer(lstT.toList) fooCount.add(lstT.size) lstT.clear() } }); println("total =>" + fooCount.value) }
针对文本文件RDD的一些处理逻辑:
//针对单个文件,每行数据超长的情况, 先对行进行拆分,再重新分区,将数据交给多个executor去执行 def bigLine(sc: SparkContext, locationFlag: Int, minid: Int, maxid: Int, numPartitions: Int) = { val fileRDD = sc.textFile("hdfs://hdfscluster/tmp/logs/abc.txt", numPartitions) //对于长文本, 先拆分,然后重新分区,提高并发机器利用率, 减少job执行时间 fileRDD.flatMap(_.split(",")).repartition(24).foreach(println(_)) } //针对无规律零散路径,循环内部使用sc def handlerPath_lingsan(sc: SparkContext, locationFlag: Int, minid: Int, maxid: Int, numPartitions: Int, filep: String) = { val rawPath: List[String] = List("hdfs://hdfscluster/tmp1/path1", "hdfs://hdfscluster/tmp2/path2", "hdfs://hdfscluster/tmp3/path3") val lsResult = rawPath.flatMap(v => { sc.textFile(v).map((_, 1)).collect().toList }).toList.foreach(println(_)) } //针对文件夹, def handlerPath_directroy(sc: SparkContext, locationFlag: Int, minid: Int, maxid: Int, numPartitions: Int, filep: String) = { //按行输出指定文件夹下所有文件,分区有效 val txtRDD = sc.textFile("hdfs://hdfscluster/tmp1/*", numPartitions) //重新分区,便于输出结果 txtRDD.map((_, 1)).repartition(1) .saveAsTextFile("hdfs://hdfscluster/tmp/logs/ssoHot3") } //针对文件夹,且路径下文件数量比较多且比较小的情况 def handlerPath_directroy(sc: SparkContext, locationFlag: Int, minid: Int, maxid: Int, numPartitions: Int, filep: String) = { //返回结果key=文件路径,val=文件内容, 如果content太大的话,容易造成OOM val dirRDD = sc.wholeTextFiles("hdfs://hdfscluster/tmp1/*", numPartitions) dirRDD.flatMap(v => { v._2.split(System.lineSeparator()).map((_, 1)) }).repartition(1).saveAsTextFile("hdfs://hdfscluster/tmp/logs/ssoHot3") }
//java scala转换
def java_scala_collection_convert = { var lstT = new ListBuffer[Int]() //注意java,scala转换 import scala.collection.JavaConverters._ val lstBack = SensitiveDevice.batchDecrypt(lstT.toList.asJava).asScala }