  • Reading compressed files in Spark

    Read compressed archives with Spark and process the files inside each archive in a distributed way. Granularity: file level.

    Supported layouts (outer archive, then the files inside it):

    -| .tar.gz
       -| .gz
       -| .zip

    -| .zip
       -| .gz
       -| .zip
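The inner files can themselves be compressed (a .gz inside a .zip, for example). Below is a minimal sketch of reading such a nested .gz entry. It uses only java.util.zip rather than Apache Commons Compress, assumes the inner file is gzip-compressed text that fits in memory, and the names NestedGzInZip and readGzEntry are hypothetical:

```java
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

class NestedGzInZip {
    /**
     * Returns the decompressed UTF-8 text of the entry named entryName
     * (assumed to be a gzip file) inside a zip archive, or null if absent.
     */
    static String readGzEntry(InputStream zipBytes, String entryName) throws IOException {
        try (ZipInputStream zip = new ZipInputStream(zipBytes)) {
            ZipEntry entry;
            while ((entry = zip.getNextEntry()) != null) {
                if (!entry.isDirectory() && entry.getName().equals(entryName)) {
                    // Shield the outer zip stream: closing the gzip reader must not close it.
                    InputStream noClose = new FilterInputStream(zip) {
                        @Override
                        public void close() { }
                    };
                    // The zip stream reports end-of-stream at the end of the current entry,
                    // so the gzip reader sees exactly the inner file's bytes.
                    try (GZIPInputStream gz = new GZIPInputStream(noClose)) {
                        return new String(gz.readAllBytes(), StandardCharsets.UTF_8);
                    }
                }
            }
        }
        return null;
    }
}
```

The non-closing wrapper matters once you read more than one nested entry per pass: without it, closing the inner reader would close the whole archive stream.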

    Use sc.binaryFiles() on a JavaSparkContext to get a JavaPairRDD<String, PortableDataStream>, one record per archive file.

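    The key is the full path of each archive file, and the PortableDataStream value is a serializable handle that opens that archive's binary content on demand.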
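Because the value is the raw byte stream of the whole archive, its format can also be checked from the first bytes instead of trusting the extension in the key. This is a sketch under the assumption that only gzip and zip matter. ArchiveSniffer and sniff are hypothetical names, and the stream must support mark/reset, so in a Spark job you would wrap portableDataStream.open() in a BufferedInputStream:

```java
import java.io.BufferedInputStream;
import java.io.IOException;

class ArchiveSniffer {
    enum Kind { GZIP, ZIP, UNKNOWN }

    /** Peeks at the first four bytes and rewinds, so the caller can still read the whole stream. */
    static Kind sniff(BufferedInputStream in) throws IOException {
        in.mark(4);
        byte[] head = new byte[4];
        int n = in.readNBytes(head, 0, 4);
        in.reset();
        // gzip members start with 0x1f 0x8b (RFC 1952).
        if (n >= 2 && (head[0] & 0xff) == 0x1f && (head[1] & 0xff) == 0x8b) {
            return Kind.GZIP;
        }
        // A zip local file header starts with "PK\3\4".
        if (n == 4 && head[0] == 'P' && head[1] == 'K' && head[2] == 3 && head[3] == 4) {
            return Kind.ZIP;
        }
        return Kind.UNKNOWN;
    }
}
```

A GZIP result does not say whether a tar is inside. That requires looking at the decompressed data, where a POSIX ustar header carries its magic at byte offset 257.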

    Parallelization: split each archive into one record per inner file, giving a 1:n fan-out (one archive stream, n file records).
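The 1:n expansion is a plain flatMap. Here is a Spark-free sketch using Java streams. listEntries is a hypothetical stand-in for getFileNameList, and each output pair is (archive path, inner file name) rather than the (PortableDataStream, FilePropertyPojo) pair used in Spark:

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class FanOut {
    /** Expands each archive path into one (archivePath, entryName) pair per file it contains. */
    static List<Map.Entry<String, String>> fanOut(List<String> archivePaths,
                                                  Function<String, List<String>> listEntries) {
        return archivePaths.stream()
                // One input archive becomes n output records, mirroring flatMapToPair.
                .flatMap(path -> listEntries.apply(path).stream()
                        .map(name -> Map.entry(path, name)))
                .collect(Collectors.toList());
    }
}
```

In Spark, flatMapToPair is a narrow transformation. It keeps all n records in the partition their archive came from, so without a repartition(...) afterwards the files of one archive are still processed by a single task.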

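    // Imports assumed by the snippet (Spark Java API, Apache Commons Compress):
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Locale;
    import java.util.zip.GZIPInputStream;

    import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
    import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
    import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
    import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.input.PortableDataStream;

    import scala.Tuple2;

    // Methods of the enclosing class. FilePropertyPojo and getEachFileName(...)
    // are the author's own helpers and are not shown here.

    // One archive stream fans out to many records, one per file inside the archive.
    public static JavaPairRDD<PortableDataStream, FilePropertyPojo> getFileListRdd(
            JavaPairRDD<String, PortableDataStream> zipRdd) {
        return zipRdd.flatMapToPair(tuple2 -> {
            List<Tuple2<PortableDataStream, FilePropertyPojo>> targetList = new ArrayList<>();
            for (FilePropertyPojo filePropertyPojo : getFileNameList(tuple2._2)) {
                // All records share the archive's (serializable) stream; the task that
                // handles a record reopens it and reads only that record's file.
                targetList.add(new Tuple2<>(tuple2._2, filePropertyPojo));
            }
            return targetList.iterator();
        });
    }

    // Lists the files inside one archive, choosing the reader from the file suffix.
    private static List<FilePropertyPojo> getFileNameList(PortableDataStream portableDataStream) {
        List<FilePropertyPojo> mrPropertyPojoList = new ArrayList<>();
        String path = portableDataStream.getPath();
        String lowerPath = path.toLowerCase(Locale.ROOT);
        try {
            // Match whole suffixes: substring(lastIndexOf('.')) throws for names without a
            // dot and would send a plain .gz file to the tar reader.
            if (lowerPath.endsWith(".tar.gz") || lowerPath.endsWith(".tgz")) {
                getFileNameFromGz(portableDataStream, mrPropertyPojoList);
            } else if (lowerPath.endsWith(".zip")) {
                getFileNameFromZip(portableDataStream, mrPropertyPojoList);
            }
            // Any other suffix yields no records.
        } catch (IOException e) {
            // Report unreadable archives instead of dropping them silently.
            System.err.println("Cannot list entries of " + path + ": " + e);
        }
        return mrPropertyPojoList;
    }

    // .tar.gz: gunzip the stream, then walk the tar entries.
    private static void getFileNameFromGz(PortableDataStream portableDataStream,
            List<FilePropertyPojo> mrPropertyPojoList) throws IOException {
        try (TarArchiveInputStream inputStream = new TarArchiveInputStream(
                new GZIPInputStream(portableDataStream.open()))) {
            TarArchiveEntry tarArchiveEntry;
            while ((tarArchiveEntry = inputStream.getNextTarEntry()) != null) {
                if (tarArchiveEntry.isDirectory()) {
                    continue; // file-level granularity: directories produce no record
                }
                try {
                    getEachFileName(mrPropertyPojoList, tarArchiveEntry.getName(), tarArchiveEntry.getSize());
                } catch (Exception e) {
                    // Skip the bad entry but keep scanning the rest of the archive.
                    System.err.println("Skipping " + tarArchiveEntry.getName() + ": " + e);
                }
            }
        }
    }

    // .zip: walk the local file headers in stream order.
    private static void getFileNameFromZip(PortableDataStream portableDataStream,
            List<FilePropertyPojo> mrPropertyPojoList) throws IOException {
        try (ZipArchiveInputStream zipArchiveInputStream = new ZipArchiveInputStream(portableDataStream.open())) {
            ZipArchiveEntry nextZipEntry;
            while ((nextZipEntry = zipArchiveInputStream.getNextZipEntry()) != null) {
                if (nextZipEntry.isDirectory()) {
                    continue;
                }
                try {
                    // getSize() can be -1 (unknown) when the entry uses a data descriptor.
                    getEachFileName(mrPropertyPojoList, nextZipEntry.getName(), nextZipEntry.getSize());
                } catch (Exception e) {
                    System.err.println("Skipping " + nextZipEntry.getName() + ": " + e);
                }
            }
        }
    }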
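Downstream, each (PortableDataStream, FilePropertyPojo) record still points at the whole archive, so the task that processes it has to reopen the archive and skip to its own entry. Here is a sketch of that step for zip archives, using java.util.zip.ZipInputStream instead of ZipArchiveInputStream. EntryReader and readZipEntry are hypothetical. The entry is looked up by name because FilePropertyPojo's fields are not shown in the original, and in a Spark job `open` would be portableDataStream::open:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

class EntryReader {
    /**
     * Reopens the archive and returns the bytes of the file entry called entryName,
     * or Optional.empty() if no such file exists. The scan always starts at the
     * beginning of the archive and stops at the first matching entry.
     */
    static Optional<byte[]> readZipEntry(Supplier<InputStream> open, String entryName) {
        try (ZipInputStream zip = new ZipInputStream(open.get())) {
            ZipEntry entry;
            while ((entry = zip.getNextEntry()) != null) {
                if (!entry.isDirectory() && entry.getName().equals(entryName)) {
                    // Reads only the current entry; the stream ends at the entry boundary.
                    return Optional.of(zip.readAllBytes());
                }
            }
            return Optional.empty();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Each call scans the archive sequentially up to the requested entry. For archives with very many entries, this reread pattern can cost more than the parallelism gains.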
  • Original article: https://www.cnblogs.com/carsonwuu/p/14792548.html