zoukankan      html  css  js  c++  java
  • HDFS简单编程实例:文件合并

     下图显示了HDFS文件系统中路径为“localhost:50070/explorer.html#/user/hadoop”的目录中所有的文件信息:

    对于该目录下的所有文件,我们将执行以下操作:

    首先,从该目录中过滤出所有后缀名不为".abc"的文件。

    然后,对过滤之后的文件进行读取。

    最后,将这些文件的内容合并到文件“hdfs://localhost:9000/user/hadoop/merge.txt”中。

    代码如下:

     1 package mergeFile;
     2 
     3 import java.io.IOException;
     4 import java.io.PrintStream;
     5 import java.net.URI;
     6 
     7 import org.apache.hadoop.conf.Configuration;
     8 import org.apache.hadoop.fs.FSDataInputStream;
     9 import org.apache.hadoop.fs.FSDataOutputStream;
    10 import org.apache.hadoop.fs.FileStatus;
    11 import org.apache.hadoop.fs.FileSystem;
    12 import org.apache.hadoop.fs.Path;
    13 import org.apache.hadoop.fs.PathFilter;
    14 
    15 
    16 class myPathFilter implements PathFilter{  //过滤掉文件名满足特定条件的文件
    17     String reg = null;
    18     myPathFilter(String reg){
    19         this.reg = reg;
    20     }
    21     public boolean accept(Path path) {
    22         if(!(path.toString().matches(reg)))
    23             return true;
    24         return false;
    25     }
    26     
    27 }
    28 
    29 public class merge {
    30     Path inputPath = null;  //待合并的文件所在的目录的路径
    31     Path outputPath = null; //输出文件的路径
    32     public merge(String input, String output){
    33         this.inputPath = new Path(input);
    34         this.outputPath = new Path(output);
    35     }
    36     public void doMerge() throws IOException{
    37         Configuration conf = new Configuration();
    38         conf.set("fs.defaultFS","hdfs://localhost:9000" );
    39         conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
    40         
    41         FileSystem fsSource = FileSystem.get(URI.create(inputPath.toString()),conf);
    42         FileSystem fsDst = FileSystem.get(URI.create(outputPath.toString()),conf);
    43         
    44         FileStatus[] sourceStatus = fsSource.listStatus(inputPath, new myPathFilter(".*\.abc"));   //过滤掉目录中后缀为.abc的文件
    45         FSDataOutputStream fsdos = fsDst.create(outputPath);
    46         
    47         //下面分别读取过滤之后的每个文件的内容,并输出到同一个文件中
    48         for(FileStatus sta:sourceStatus){
    49             System.out.println("路径: " + sta.getPath() + "   文件大小: " + sta.getLen() + "   权限: " + sta.getPermission() + "  内容: ");
    50             FSDataInputStream fsdis = fsSource.open(sta.getPath());
    51             byte[] data = new byte[1024];
    52             int read = -1;
    53             PrintStream ps = new PrintStream(System.out);
    54             while((read = fsdis.read(data)) > 0){
    55                 ps.write(data, 0, read);
    56                 fsdos.write(data, 0, read);
    57             }
    58         }
    59         fsdos.close();
    60     }
    61     public static void main(String args[]) throws IOException{
    62         merge merge = new merge("hdfs://localhost:9000/user/hadoop/", "hdfs://localhost:9000/user/hadoop/merge.txt");
    63         merge.doMerge();
    64     }
    65 }

    执行结果:

  • 相关阅读:
    bat脚本运行py文件失败(一闪而过)
    python 将日期戳(五位数时间)转换为标准时间
    Pandas 如何通过获取双(多)重索引获取指定行DataFrame数据
    Pandas 横向合并DataFrame数据
    Pandas 删除指定列中为NaN的行
    git 解决push报错:[rejected] master -> master (fetch first) error: failed to push some refs to
    pandas删除包含指定内容的行
    python项目环境的导出、导入
    pandas 修改列名
    Javascript 异步编程的4种方法
  • 原文地址:https://www.cnblogs.com/zyb993963526/p/10222130.html
Copyright © 2011-2022 走看看