zoukankan      html  css  js  c++  java
  • Spark 扫描 HDFS lzo/gz/orc异常压缩文件

    一、问题背景

    考虑到 Hadoop 3.0.0 的新特性 EC 码,HDFS 在存储数据时能获得很好的压缩比,同时 Hadoop 2.6.0 集群 HDFS 存储压力较大,我们将 Hadoop 2.6.0 集群的数据冷备到 Hadoop 3.0.0,来缓解 HDFS 存储的压力,但在冷备操作进行了一段时间后,用户反馈数据读取存在异常报错,先花了一些时间根据异常信息从集群层面去排查问题,但都于事无补。后续根据对比数据在冷备前和冷备后的区别,发现文件的本身属性已经破坏的,也就是在 distcp 过程中文件被异常损坏了,但我们又没有进行文件的完整性校验(之前同版本集群冷备都是正常的),导致有一部分数据已经损坏,而且还补可修复,因此我们要做两件事,一件事是针对已冷备的数据,找出所有异常的文件,剔除掉保证用户任务正常执行,另一件事修改 distcp 拷贝数据的协议,不再依赖 hdfs 协议,而是改为 webhdfs 协议,摆脱不同集群版本之间差异的影响。

    而本文主要是解决第一件事,即扫描出所有已冷备的异常文件。在集群数据中,目前发现出错的文件主要是 orc、lzo 和 gz 三种格式的压缩文件,因此开发相应的程序来找出所有的异常文件(具体如何恢复数据还有待研究。下面是访问三类压缩文件报错的异常信息。

    # 读 orc 文件异常信息
    Caused by: java.lang.IllegalArgumentException: Buffer size too small. size = 262144 needed = 3043758
    
    # 读 lzo 文件异常信息
    Caused by: java.io.IOException: Invalid LZO header
    
    # 读 gz 文件异常信息
    Caused by: java.io.IOException: invalid stored block lengths

    二、Spark-shell 脚本方式扫描

    前期是尝试开发 MapReduce 程序去实现,但在代码编写中发现并不太好实现,后续尝试用 Spark 来实现,最直接的方式也就是通过 spark-shell 去读取不同格式文件,如下也是通过 spark-shell 去读取三类压缩文件(包括正常文件和异常文件)的命令,对于异常文件,均能正常复现出上面的三种异常信息,也就是说通过这种方式访问是可行的。

    # 读取 orc 文件(正常文件)
    sqlContext.read.orc("/user/11085245/orc/000124_0").count()
    
    # 读取 orc 文件(异常文件)
    sqlContext.read.orc("/user/11085245/orc/000097_0").count()
    
    
    # 读取 lzo 文件(正常文件)
    sc.newAPIHadoopFile("/user/11085245/lzo/part-00415.lzo", classOf[com.hadoop.mapreduce.LzoTextInputFormat],classOf[org.apache.hadoop.io.LongWritable],classOf[org.apache.hadoop.io.Text]).map(_._2.toString).count()
    
    # 读取 lzo 文件(异常文件)
    sc.newAPIHadoopFile("/user/11085245/lzo/part-00040.lzo", classOf[com.hadoop.mapreduce.LzoTextInputFormat],classOf[org.apache.hadoop.io.LongWritable],classOf[org.apache.hadoop.io.Text]).map(_._2.toString).count()
    
    
    # 读取 gz 文件(正常文件)
    sc.textFile("/user/11085245/gz/000096_0.gz").count()
    
    # 读取 gz 文件(异常文件)
    sc.textFile("/user/11085245/gz/000100_0.gz").count()

    三、Spark 项目方式扫描

    3.1 代码开发

    在章节二中用 spark-shell 能够复现出异常信息,对于要扫描给定 HDFS 目录下所有文件的完整性,顺理成章地也就想到用 Spark 工程化的代码(这里选择用 scala 语言)去实现这一逻辑。其实主要就是三个流程:一是创建 scala 工程,二是根据 HDFS 目录列出目录下所有文件,三个根据单个文件调用类似 spark-shell 的 api 去操作文件,并输出异常压缩的文件。

    idea 工具创建 scala 工程代码参考 https://blog.csdn.net/qq_32575047/article/details/103045641

    具体的实现代码如下:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext
    import java.net.URI
    import org.apache.hadoop.io.{LongWritable,Text}
    import com.hadoop.mapreduce.LzoTextInputFormat
    
    object FileScan {
      def main(args: Array[String]) {
        val sparkConf = new SparkConf()
        sparkConf.setAppName("SparkFileScan")
        val sc =  new SparkContext(sparkConf)
        val hiveContext = new HiveContext(sc)
        if (args.length != 1) {
          println("------Please type correct input-----------")
          sc.stop()
          sys.exit(-1)
        }
        // 获取给定的 HDFS 目录
        val path = args.head
    
        // 针对单个文件依次扫描,并输出异常文件
        listFile(path).foreach( patha =>
          try {
            if (patha.contains(".lzo")) {
              sc.newAPIHadoopFile[LongWritable, Text, LzoTextInputFormat](patha).map(_._2.toString).count()
            } else if (patha.contains(".gz")) {
              sc.textFile(patha).count()
            } else if (patha.contains("SUCCESS")) {
              // success 文件不作处理
            } else {
              hiveContext.read.orc(patha).count()
            }
          } catch {
            case e: Exception =>
              println(patha)
              e.printStackTrace()
          }
        )
        sc.stop()
      }
    
      /**
        * 生成 FileSystem 对象
        */
      def getHdfs(path: String): FileSystem = {
        val conf = new Configuration();
        FileSystem.newInstance(URI.create(path), conf)
      }
    
      /**
        * 获取目录下的一级文件
        */
      def listFile(path: String): Array[String] = {
        val hdfs = getHdfs(path)
        val fs = hdfs.listStatus(new Path(path))
        FileUtil.stat2Paths(fs).filter(hdfs.getFileStatus(_).isFile()).map(_.toString)
      }
    }

    对应的 xml 依赖文件:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.kwang.bigdata</groupId>
      <artifactId>SparkFileScan</artifactId>
      <version>1.0-SNAPSHOT</version>
      <name>${project.artifactId}</name>
    
      <properties>
        <spark.version>1.6.0</spark.version>
        <scala.version>2.10.5</scala.version>
        <hadoop.version>2.6.0-cdh5.14.0</hadoop.version>
      </properties>
    
      <dependencies>
          <!-- scala 依赖 -->
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>${scala.version}</version>
        </dependency>
    
        <!-- spark依赖 -->
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.10</artifactId>
          <version>${spark.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.10</artifactId>
          <version>${spark.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-hive_2.10</artifactId>
          <version>${spark.version}</version>
        </dependency>
    
        <!-- Hadoop 依赖 -->
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>${hadoop.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-common</artifactId>
          <version>${hadoop.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-hdfs</artifactId>
          <version>${hadoop.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-mapreduce-client-core</artifactId>
          <version>${hadoop.version}</version>
        </dependency>
    
        <!-- hadoop lzo 依赖 -->
        <dependency>
          <groupId>org.anarres.lzo</groupId>
          <artifactId>lzo-hadoop</artifactId>
          <version>1.0.6</version>
        </dependency>
    
      </dependencies>
    </project>

    至此,代码工作已开发完成,接下来就是如何编译打包运行项目了。

    3.2 项目运行

    运行项目执行当然是要把工程打包呀,打包方式如下:

    mvn clean package

    打包好了当然就是在 Hadoop 集群上去运行 Spark 项目咯,运行方式如下。

    spark-submit --master yarn --deploy-mode client --executor-memory 2g --queue root.exquery --class com.vivo.bigdata.FileScan SparkFileScan-1.0-SNAPSHOT.jar hdfs://nameservice/user/11085245/file/ >errfile
     
    参数说明:
    hdfs://nameservice/user/11085245/file/:要扫描的HDFS目录
    errfile:输出的异常文件列表

    至此,整个项目开发的工作基本完成了。第一次写项目开发代码,从如何创建 scala 项目,到写下 scala 的第一行代码,到编译运行项目,摸爬滚打,整个流程都打通了,功能不复杂,但还挺有意思也小有成就感的。^_^

    【参考资料】

    1. https://blog.csdn.net/qq_32575047/article/details/103045641

    2. https://stackoverflow.com/questions/62565953/pyspark-read-orc-files-with-new-schema (Spark 读取 orc 文件)

    3. http://apache-spark-user-list.1001560.n3.nabble.com/How-to-read-LZO-file-in-Spark-td29382.html  (Spark 读取 lzo 文件)

    4. https://blog.csdn.net/dkl12/article/details/84312307 (Spark 获取 HDFS 目录下文件)

  • 相关阅读:
    装饰器模式
    mockups 安装
    单例设计模式
    css优先级问题
    GPU的nvadiasmi解析
    conda配置镜像并安装gpu版本pytorch和tensorflow2
    索引的优化
    [转]必须掌握的八个【cmd 命令行】
    BDE莫名的不自动COMMIT问题
    窗体控件笔记
  • 原文地址:https://www.cnblogs.com/lemonu/p/14251968.html
Copyright © 2011-2022 走看看