zoukankan      html  css  js  c++  java
  • Spark读取HDFS中的Zip文件

    1. 任务背景

    近日有个项目任务,要求读取压缩在Zip中的百科HTML文件,经分析发现,提供的Zip文件有如下特点(=>指代对应解决方案):

    (1) 压缩为分卷文件 => 只需将解压缩在同一目录中的一个分卷zip即可解压缩出整个文件

    (2) 压缩文件中又包含不同的两个文件夹,且各包含n个小zip文件,小zip文件中包含目录及对应的HTML文本文件

    采用第一方案:依次解压缩各小zip文件,存放在一个目录中,然后上传到HDFS中

    存在问题:每个小zip都包含上万个小文件,按照第一方案解压缩,耗费的时间太太太多了

    (3) 解析的zip存在多文件的情况

    (4) 数据总量共计50W

    2. 优化方案

    直接上传小zip文件,然后让Spark直接从zip文件中读取HTML文本,再使用jsoup解析,并存储至elasticsearch中。

    实现过程中有一处需要注意! => 解析zip会遍历的ZipEntry,会识别文件夹和文件夹下的文件,即文件夹和文件在ZipEntry中被当成同类对象来对待。

    例1:本地解析zip压缩文件demo

    import java.io.{BufferedInputStream, BufferedReader, FileInputStream, InputStreamReader}
    import java.util.zip.{ZipFile, ZipInputStream}
    
    import net.sf.json.JSONObject
    import org.jsoup.Jsoup
    
    import scala.collection.mutable
    
    object Test {
    
      def testZip(): Unit = {
        val baseDir = "part2/"
        val path = s"$baseDir\06.zip"
        val zf = new ZipFile(path)
        val in = new BufferedInputStream(new FileInputStream(path))
        val zin = new ZipInputStream(in)
        var zipEn = zin.getNextEntry
        var count = 0
        try {
          while (zipEn != null) {
            if (!zipEn.isDirectory) {
              val buff = new BufferedReader(new InputStreamReader(zf.getInputStream(zipEn)))
              val sb = new StringBuilder()
              var line = buff.readLine()
              while (line != null) {
                count = count + 1
                if (line.nonEmpty) {
                  sb.append(line.trim)
                }
                line = buff.readLine()
              }
              val id = zipEn.getName.substring(zipEn.getName.indexOf("/") + 1, zipEn.getName.indexOf("."))
              val doc = Jsoup.parse(sb.toString())
    
              val title = doc.select(".lemmaWgt-lemmaTitle-title h1").text()
              val sb1 = new mutable.StringBuilder()
              val eles = doc.select(".para")
              for (i <- 0 until eles.size()) {
                sb1.append(eles.get(i).text().trim).append("	")
              }
    
              val json = new JSONObject()
              json.put("id", id)
              json.put("title", title)
              json.put("content", sb1.toString().trim)
              println(json)
              buff.close()
            }
            zipEn = zin.getNextEntry
          }
          zin.closeEntry()
        } catch {
          case _ =>
        }
        println(count)
      }
    
    }
    View Code

    例2:Spark读取HDFS中的含有多文件的zip文件

    def parseBaike(): Unit ={
        val baseDir = "/work/ws/temp/baike"
        val sc = new SparkContext(new SparkConf().setAppName("parseBaike"))
        val rdd = sc.binaryFiles(s"$baseDir/data/*.zip", 40)
            .flatMap{
              case (zipFilePath: String, content: PortableDataStream) => {
                val zis = new ZipInputStream(content.open())
                Stream.continually(zis.getNextEntry)
                  .takeWhile(_ != null)
                  .flatMap(zipEn => {
                    if(zipEn.isDirectory) None
                    else{
                      // 基于文件名获取百科词条的id信息
                      val id = zipEn.getName.substring(zipEn.getName.indexOf("/")+1, zipEn.getName.indexOf("."))
                      val html = scala.io.Source.fromInputStream(zis, "UTF-8").getLines.mkString("")
                      if(html.nonEmpty){
                        val doc = Jsoup.parse(html)
                        // 解析百科中的词条名称
                        val title = doc.select(".lemmaWgt-lemmaTitle-title h1").text()
                        // 获取词条HTML中的全部正文内容
                        val sb = new mutable.StringBuilder()
                        val eles = doc.select(".para")
                        for(i <- 0 until eles.size()){
                          sb.append(eles.get(i).text().trim).append("	")
                        }
                        if(title.trim.nonEmpty && sb.toString.trim.nonEmpty){
                          val json = new JSONObject()
                          json.put("id", id)
                          json.put("title", title)
                          json.put("content", sb.toString().trim)
                          Some(json)
                        }else None
                      }else None
                    }
                  })
              }
            }
        rdd.cache()
        rdd.saveAsTextFile(HDFSFileUtil.clean(s"$baseDir/result/json"))
        rdd.foreach(f => {
          // 保存在Es中
          ESHelper.saveToEs("baike", "baike", f, "id")
        })
        rdd.unpersist()
        sc.stop()
      }
    View Code

      注意:如上代码仅供参考,并隐去了部分业务相关代码,如HDFS和Es工具类,如若需要,可留言沟通交流!

    3. 参考

    (1)  https://stackoverflow.com/questions/28569788/how-to-open-stream-zip-files-through-spark

    (2) https://stackoverflow.com/questions/32080475/how-to-read-a-zip-containing-multiple-files-in-apache-spark?r=SearchResults

     

     

     

  • 相关阅读:
    js_浏览器对象模型BOM---通过对象来抽象浏览器功能
    js_dom 之事件注册、移除 、pageX
    js组成之dom_dom对象样式操作及运用
    js_组成之DOM_dom对象的注册事件及属性操作
    js_字符串、数组常用方法及应用
    js_内置对象Date Math
    Caffe入门学习(代码实践)
    char和uchar区别
    c/c++中过滤文件路经 后缀
    shell中$(( )) 、 $( ) 、${ }的区别
  • 原文地址:https://www.cnblogs.com/mengrennwpu/p/10859584.html
Copyright © 2011-2022 走看看