下图是通过 HDFS NameNode 的 Web 界面(http://localhost:50070/explorer.html#/user/hadoop)查看到的 HDFS 目录 /user/hadoop 下的所有文件信息:
对于该目录下的所有文件,我们将执行以下操作:
首先,过滤掉该目录中所有后缀名为“.abc”的文件,只保留其余的文件。
然后,依次读取过滤后保留下来的每个文件。
最后,将这些文件的内容合并到文件“hdfs://localhost:9000/user/hadoop/merge.txt”中。
代码如下:
package mergeFile;

import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

/**
 * A {@link PathFilter} that rejects every path whose full string form matches a regular
 * expression and accepts every other path.
 */
class myPathFilter implements PathFilter {
    /** Regular expression identifying the paths to exclude (matched against the whole path string). */
    String reg = null;

    myPathFilter(String reg) {
        this.reg = reg;
    }

    /**
     * Decides whether a path is kept.
     *
     * @param path the candidate path
     * @return {@code true} (keep) when {@code path.toString()} does NOT match {@link #reg}
     */
    @Override
    public boolean accept(Path path) {
        return !path.toString().matches(reg);
    }
}

/**
 * Concatenates the files of one HDFS directory into a single output file. Files whose path ends
 * with ".abc" are excluded. Each file's metadata and content are echoed to standard output.
 */
public class merge {
    Path inputPath = null; // directory containing the files to merge
    Path outputPath = null; // path of the merged output file

    /**
     * @param input  URI of the directory whose files are merged
     * @param output URI of the file the merged content is written to (overwritten if present)
     */
    public merge(String input, String output) {
        this.inputPath = new Path(input);
        this.outputPath = new Path(output);
    }

    /**
     * Lists {@link #inputPath}, drops entries ending in ".abc", and appends the bytes of every
     * remaining regular file to {@link #outputPath}.
     *
     * <p>Subdirectories are skipped because they cannot be opened as files. The output file itself
     * is also skipped, since it may live inside the input directory (it does with the defaults in
     * {@link #main}). Without this, a re-run would read a file that is being truncated and
     * rewritten.
     *
     * @throws IOException if listing, reading, or writing on HDFS fails
     */
    public void doMerge() throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");

        FileSystem fsSource = FileSystem.get(URI.create(inputPath.toString()), conf);
        FileSystem fsDst = FileSystem.get(URI.create(outputPath.toString()), conf);

        // Exclude files whose path ends in ".abc". The dot must be escaped for the regex, and the
        // backslash must itself be escaped in a Java string literal ("\." does not compile).
        FileStatus[] sourceStatus = fsSource.listStatus(inputPath, new myPathFilter(".*\\.abc"));
        // listStatus returns fully qualified paths; qualify the output path so they compare equal.
        Path qualifiedOutput = fsDst.makeQualified(outputPath);

        PrintStream console = System.out;
        byte[] data = new byte[1024];
        // try-with-resources closes the output stream even if a read or write fails midway.
        try (FSDataOutputStream fsdos = fsDst.create(outputPath)) {
            // Read each remaining file in turn and append its bytes to the single output file.
            for (FileStatus sta : sourceStatus) {
                if (sta.isDirectory() || sta.getPath().equals(qualifiedOutput)) {
                    continue;
                }
                console.println("路径: " + sta.getPath() + "    文件大小: " + sta.getLen()
                        + "   权限: " + sta.getPermission() + "   内容: ");
                // Close every input stream; the original leaked one per file.
                try (FSDataInputStream fsdis = fsSource.open(sta.getPath())) {
                    int read;
                    while ((read = fsdis.read(data)) > 0) {
                        console.write(data, 0, read);
                        fsdos.write(data, 0, read);
                    }
                }
            }
        }
    }

    /**
     * Entry point.
     *
     * @param args optional {@code <inputDir> <outputFile>}; when fewer than two arguments are
     *             given, the original hard-coded HDFS paths are used
     * @throws IOException if the merge fails
     */
    public static void main(String[] args) throws IOException {
        String input = "hdfs://localhost:9000/user/hadoop/";
        String output = "hdfs://localhost:9000/user/hadoop/merge.txt";
        if (args.length >= 2) {
            input = args[0];
            output = args[1];
        }
        merge merge = new merge(input, output);
        merge.doMerge();
    }
}
执行结果: