zoukankan      html  css  js  c++  java
  • spark textfile rdd 日记

    批量处理模板方法, 核心处理方法为内部方法

      def batchProces(sc: SparkContext, locationFlag: Int, minid: Int, maxid: Int, numPartitions: Int) = {
    
        //自定义RDD,此处为demo
        val dataRDD = sc.makeRDD(List(1, 2), numPartitions)
        dataRDD.mapPartitions(iterator => {
    
          val rawData = iterator.toList
          var lstT = new ListBuffer[(Int, Int)]()
    
          rawData.foreach(v => {
            if (lstT.size < 50) {
              lstT.append((v, 1))
            } else {
              //每50处理一次
              procesData()
            }
          })
    
          //剩余的继续处理
          procesData()
    
          //批量处理逻辑
          def procesData() = {
    
            //核心处理逻辑
            // doProcess
            //很重要
            lstT.clear()
          }
    
          lstT.iterator
    
        }).map((_, 1)).reduceByKey(_ + _).sortByKey().saveAsTextFile("hdfs://hdfscluster/tmp/logs/")
      }

    批量处理模板方法, 核心处理方法为外部方法

      def process_outer(lst: List[(Int, Int)]) = {
        //外部核心处理逻辑,如Request请求等
        RequestUtil.postJson("http://xxx", "{paraData}", 1000)
      }
    
      def batchProces_processOuter(sc: SparkContext, locationFlag: Int, minid: Int, maxid: Int, numPartitions: Int) = {
        val fooCount = sc.longAccumulator("fooCount")
        //自定义RDD,此处为demo
        val dataRDD = sc.makeRDD(List(1, 2), numPartitions)
        dataRDD.foreachPartition(iterator => {
    
    
          val rawData = iterator.toList
          var lstT = new ListBuffer[(Int, Int)]()
    
          rawData.foreach(v => {
            if (lstT.size < 50) {
              lstT.append((v, 1))
            } else {
              //每50处理一次
              process_outer(lstT.toList)
              fooCount.add(lstT.size)
              lstT.clear()
            }
          })
    
          //剩余的继续处理
          if (lstT.size > 0) {
            process_outer(lstT.toList)
            fooCount.add(lstT.size)
            lstT.clear()
          }
        });
        println("total =>" + fooCount.value)
      }

     

    针对文本文件RDD的一些处理逻辑:

      //针对单个文件,每行数据超长的情况, 先对行进行拆分,再重新分区,将数据交给多个executor去执行
      def bigLine(sc: SparkContext, locationFlag: Int, minid: Int, maxid: Int, numPartitions: Int) = {
        val fileRDD = sc.textFile("hdfs://hdfscluster/tmp/logs/abc.txt", numPartitions)
    
        //对于长文本, 先拆分,然后重新分区,提高并发机器利用率, 减少job执行时间
        fileRDD.flatMap(_.split(",")).repartition(24).foreach(println(_))
      }
    
      //针对无规律零散路径,循环内部使用sc
      def handlerPath_lingsan(sc: SparkContext, locationFlag: Int, minid: Int, maxid: Int, numPartitions: Int, filep: String) = {
        val rawPath: List[String] = List("hdfs://hdfscluster/tmp1/path1", "hdfs://hdfscluster/tmp2/path2", "hdfs://hdfscluster/tmp3/path3")
        val lsResult = rawPath.flatMap(v => {
          sc.textFile(v).map((_, 1)).collect().toList
        }).toList.foreach(println(_))
      }
    
      //针对文件夹, 
      def handlerPath_directroy(sc: SparkContext, locationFlag: Int, minid: Int, maxid: Int, numPartitions: Int, filep: String) = {
        //按行输出指定文件夹下所有文件,分区有效
        val txtRDD = sc.textFile("hdfs://hdfscluster/tmp1/*", numPartitions)
        //重新分区,便于输出结果
        txtRDD.map((_, 1)).repartition(1)
          .saveAsTextFile("hdfs://hdfscluster/tmp/logs/ssoHot3")
      }
    
      //针对文件夹,且路径下文件数量比较多且比较小的情况 
      def handlerPath_directroy(sc: SparkContext, locationFlag: Int, minid: Int, maxid: Int, numPartitions: Int, filep: String) = {
    
        //返回结果key=文件路径,val=文件内容, 如果content太大的话,容易造成OOM
        val dirRDD = sc.wholeTextFiles("hdfs://hdfscluster/tmp1/*", numPartitions)
        dirRDD.flatMap(v => {
          v._2.split(System.lineSeparator()).map((_, 1))
        }).repartition(1).saveAsTextFile("hdfs://hdfscluster/tmp/logs/ssoHot3")
    
      }

    //java scala转换

      def java_scala_collection_convert = {
        var lstT = new ListBuffer[Int]()
        //注意java,scala转换
        import scala.collection.JavaConverters._
        val lstBack = SensitiveDevice.batchDecrypt(lstT.toList.asJava).asScala
      }
  • 相关阅读:
    SQL强化
    JSP深入
    会话Session和cookie
    SQL快速入门
    小程序 + node koa2 session存储验证码碰到最大的坑,(喜极而泣 /狗头)
    mongoose 数据如果存在更新 不存在插入
    vue 相同路由不同参数跳转时,不刷新
    因为自己的小服务器读取太慢,弄一个本地的mysql
    git 提交代码不想包括那个文件,比如node_modules
    js中 ! 和 !!
  • 原文地址:https://www.cnblogs.com/snow-man/p/13686099.html
Copyright © 2011-2022 走看看