zoukankan      html  css  js  c++  java
  • Spark读取HDFS中的Zip文件

    1. 任务背景

    近日有个项目任务,要求读取压缩在Zip中的百科HTML文件,经分析发现,提供的Zip文件有如下特点(=>指代对应解决方案):

    (1) 压缩为分卷文件 => 只需将解压缩在同一目录中的一个分卷zip即可解压缩出整个文件

    (2) 压缩文件中又包含不同的两个文件夹,且各包含n个小zip文件,小zip文件中包含目录及对应的HTML文本文件

    采用第一方案:依次解压缩各小zip文件,存放在一个目录中,然后上传到HDFS中

    存在问题:每个小zip都包含上万个小文件,按照第一方案解压缩,耗费的时间太太太多了

    (3) 解析的zip存在多文件的情况

    (4) 数据总量共计50W

    2. 优化方案

    直接上传小zip文件,然后让Spark直接从zip文件中读取HTML文本,再使用jsoup解析,并存储至elasticsearch中。

    实现过程中有一处需要注意! => 解析zip会遍历的ZipEntry,会识别文件夹和文件夹下的文件,即文件夹和文件在ZipEntry中被当成同类对象来对待。

    例1:本地解析zip压缩文件demo

    import java.io.{BufferedInputStream, BufferedReader, FileInputStream, InputStreamReader}
    import java.util.zip.{ZipFile, ZipInputStream}
    
    import net.sf.json.JSONObject
    import org.jsoup.Jsoup
    
    import scala.collection.mutable
    
    object Test {
    
      def testZip(): Unit = {
        val baseDir = "part2/"
        val path = s"$baseDir\06.zip"
        val zf = new ZipFile(path)
        val in = new BufferedInputStream(new FileInputStream(path))
        val zin = new ZipInputStream(in)
        var zipEn = zin.getNextEntry
        var count = 0
        try {
          while (zipEn != null) {
            if (!zipEn.isDirectory) {
              val buff = new BufferedReader(new InputStreamReader(zf.getInputStream(zipEn)))
              val sb = new StringBuilder()
              var line = buff.readLine()
              while (line != null) {
                count = count + 1
                if (line.nonEmpty) {
                  sb.append(line.trim)
                }
                line = buff.readLine()
              }
              val id = zipEn.getName.substring(zipEn.getName.indexOf("/") + 1, zipEn.getName.indexOf("."))
              val doc = Jsoup.parse(sb.toString())
    
              val title = doc.select(".lemmaWgt-lemmaTitle-title h1").text()
              val sb1 = new mutable.StringBuilder()
              val eles = doc.select(".para")
              for (i <- 0 until eles.size()) {
                sb1.append(eles.get(i).text().trim).append("	")
              }
    
              val json = new JSONObject()
              json.put("id", id)
              json.put("title", title)
              json.put("content", sb1.toString().trim)
              println(json)
              buff.close()
            }
            zipEn = zin.getNextEntry
          }
          zin.closeEntry()
        } catch {
          case _ =>
        }
        println(count)
      }
    
    }
    View Code

    例2:Spark读取HDFS中的含有多文件的zip文件

    def parseBaike(): Unit ={
        val baseDir = "/work/ws/temp/baike"
        val sc = new SparkContext(new SparkConf().setAppName("parseBaike"))
        val rdd = sc.binaryFiles(s"$baseDir/data/*.zip", 40)
            .flatMap{
              case (zipFilePath: String, content: PortableDataStream) => {
                val zis = new ZipInputStream(content.open())
                Stream.continually(zis.getNextEntry)
                  .takeWhile(_ != null)
                  .flatMap(zipEn => {
                    if(zipEn.isDirectory) None
                    else{
                      // 基于文件名获取百科词条的id信息
                      val id = zipEn.getName.substring(zipEn.getName.indexOf("/")+1, zipEn.getName.indexOf("."))
                      val html = scala.io.Source.fromInputStream(zis, "UTF-8").getLines.mkString("")
                      if(html.nonEmpty){
                        val doc = Jsoup.parse(html)
                        // 解析百科中的词条名称
                        val title = doc.select(".lemmaWgt-lemmaTitle-title h1").text()
                        // 获取词条HTML中的全部正文内容
                        val sb = new mutable.StringBuilder()
                        val eles = doc.select(".para")
                        for(i <- 0 until eles.size()){
                          sb.append(eles.get(i).text().trim).append("	")
                        }
                        if(title.trim.nonEmpty && sb.toString.trim.nonEmpty){
                          val json = new JSONObject()
                          json.put("id", id)
                          json.put("title", title)
                          json.put("content", sb.toString().trim)
                          Some(json)
                        }else None
                      }else None
                    }
                  })
              }
            }
        rdd.cache()
        rdd.saveAsTextFile(HDFSFileUtil.clean(s"$baseDir/result/json"))
        rdd.foreach(f => {
          // 保存在Es中
          ESHelper.saveToEs("baike", "baike", f, "id")
        })
        rdd.unpersist()
        sc.stop()
      }
    View Code

      注意:如上代码仅供参考,并隐去了部分业务相关代码,如HDFS和Es工具类,如若需要,可留言沟通交流!

    3. 参考

    (1)  https://stackoverflow.com/questions/28569788/how-to-open-stream-zip-files-through-spark

    (2) https://stackoverflow.com/questions/32080475/how-to-read-a-zip-containing-multiple-files-in-apache-spark?r=SearchResults

     

     

     

  • 相关阅读:
    Java实现 LeetCode 136 只出现一次的数字
    Java实现 LeetCode 136 只出现一次的数字
    Java实现 LeetCode 136 只出现一次的数字
    Java实现 LeetCode 135 分发糖果
    Java实现 LeetCode 135 分发糖果
    Java实现 LeetCode 135 分发糖果
    Java实现 LeetCode 134 加油站
    Java实现 LeetCode 134 加油站
    Java实现 LeetCode 134 加油站
    Java实现 LeetCode 133 克隆图
  • 原文地址:https://www.cnblogs.com/mengrennwpu/p/10859584.html
Copyright © 2011-2022 走看看