一、模仿 hadoop fs -put 和 -copyFromLocal 命令,实现从本地复制文件到HDFS:

二、写一个与 hadoop fs -getmerge 相对应的简单程序:putMerge(把本地目录下的多个文件合并成HDFS上的一个文件)。
1 import java.io.IOException; 2 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.fs.FSDataInputStream; 5 import org.apache.hadoop.fs.FSDataOutputStream; 6 import org.apache.hadoop.fs.FileStatus; 7 import org.apache.hadoop.fs.FileSystem; 8 import org.apache.hadoop.fs.Path; 9 10 public class putMerge { 11 public static void main(String[] args) throws IOException { 12 Configuration conf = new Configuration(); 13 FileSystem hdfs = FileSystem.get(conf); 14 FileSystem local = FileSystem.getLocal(conf); 15 16 Path inputDir = new Path(args[0]); 17 Path hdfsFile = new Path(args[1]); 18 19 try { 20 FileStatus[] inputFiles = local.listStatus(inputDir); 21 FSDataOutputStream out = hdfs.create(hdfsFile); 22 23 for (int i = 0; i < inputFiles.length; i++) { 24 System.out.println(inputFiles[i].getPath().getName()); 25 FSDataInputStream in = local.open(inputFiles[i].getPath()); 26 byte buffer[] = new byte[256]; 27 int bytesRead = 0; 28 while ((bytesRead = in.read(buffer)) > 0) { 29 out.write(buffer, 0, bytesRead); 30 } 31 in.close(); 32 } 33 out.close(); 34 } catch (IOException e) { 35 e.printStackTrace(); 36 } 38 } 39 }
[root@JueFan pconline]# hadoop jar putMerge.jar putMerge /home/juefan/(本地目录名,不要写成/home/juefan/*,否则shell会把通配符展开成多个参数) /user/juefan/merged.txt(HDFS上合并后的目标文件,必须是文件路径而不是已存在的目录)
1 import java.io.IOException; 2 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.fs.FSDataInputStream; 5 import org.apache.hadoop.fs.FSDataOutputStream; 6 import org.apache.hadoop.fs.FileStatus; 7 import org.apache.hadoop.fs.FileSystem; 8 import org.apache.hadoop.fs.Path; 9 10 public class filesmerge { 11 public static boolean isRecur = false; 12 13 public static void merge(Path inputDir, Path hdfsFile, FileSystem hdfs,FSDataOutputStream out) { 14 try { 15 FileStatus[] inputFiles = hdfs.listStatus(inputDir); 16 for (int i = 0; i < inputFiles.length; i++) { 17 if (!hdfs.isFile(inputFiles[i].getPath())) { 18 if (isRecur){ 19 merge(inputFiles[i].getPath(), hdfsFile, hdfs,out); 20 return ; 21 } 22 else { 23 System.out.println(inputFiles[i].getPath().getName() 24 + "is not file and not allow recursion, skip!"); 25 continue; 26 } 27 } 28 System.out.println(inputFiles[i].getPath().getName()); 29 FSDataInputStream in = hdfs.open(inputFiles[i].getPath()); 30 byte buffer[] = new byte[256]; 31 int bytesRead = 0; 32 while ((bytesRead = in.read(buffer)) > 0) { 33 out.write(buffer, 0, bytesRead); 34 } 35 in.close(); 36 } 37 out.close(); 38 } catch (IOException e) { 39 e.printStackTrace(); 40 } 41 } 42 43 public static void errorMessage(String str) { 44 System.out.println("Error Message: " + str); 45 System.exit(1); 46 } 47 48 public static void main(String[] args) throws IOException { 49 if (args.length == 0) 50 errorMessage("filesmerge [-r|-R] <hdfsTargetDir> <hdfsFileName>"); 51 if (args[0].matches("^-[rR]$")) { 52 isRecur = true; 53 } 54 if ((isRecur && args.length != 3) || ( !isRecur && args.length != 2)) { 55 errorMessage("filesmerge [-r|-R] <hdfsTargetDir> <hdfsFileName>"); 56 } 57 58 Configuration conf = new Configuration(); 59 FileSystem hdfs = FileSystem.get(conf); 60 61 Path inputDir; 62 Path hdfsFile; 63 if(isRecur){ 64 inputDir = new Path(args[1]); 65 hdfsFile = new Path(args[2]); 66 } 67 else{ 68 inputDir = new Path(args[0]); 69 hdfsFile = 
new Path(args[1]); 70 } 71 72 if (!hdfs.exists(inputDir)) { 73 errorMessage("hdfsTargetDir not exist!"); 74 } 75 if (hdfs.exists(hdfsFile)) { 76 errorMessage("hdfsFileName exist!"); 77 } 78 79 FSDataOutputStream out = hdfs.create(hdfsFile); 80 merge(inputDir, hdfsFile, hdfs,out); 81 System.exit(0); 82 } 83 }
先把zip文件拿到本地解压、再加载回HDFS,这种做法的弊端主要有:1、可操作性差,需要在HDFS与本地之间来回搬运文件;2、zip文件解压后再加载回HDFS,占用的空间较大。
1 import java.io.File; 2 import java.io.IOException; 3 import java.util.zip.GZIPOutputStream; 4 import java.util.zip.ZipEntry; 5 import java.util.zip.ZipInputStream; 6 7 import org.apache.hadoop.conf.Configuration; 8 import org.apache.hadoop.fs.FSDataInputStream; 9 import org.apache.hadoop.fs.FSDataOutputStream; 10 import org.apache.hadoop.fs.FileStatus; 11 import org.apache.hadoop.fs.FileSystem; 12 import org.apache.hadoop.fs.Path; 13 import org.apache.hadoop.io.Text; 14 15 public class filesmerge { 16 //判断是否递归执行 17 public static boolean isRecur = false; 18 19 /** 20 * @author JueFan 21 * @param inputDir zip文件的存储地址 22 * @param hdfsFile 解压结果的存储地址 23 * @param hdfs 分布式文件系统数据流 24 * @param pcgroupText 需要解压缩的文件关键名 25 */ 26 public static void merge(Path inputDir, Path hdfsFile, 27 FileSystem hdfs, Text pcgroupText) { 28 try { 29 //文件系统地址inputDir下的FileStatus 30 FileStatus[] inputFiles = hdfs.listStatus(inputDir); 31 for (int i = 0; i < inputFiles.length; i++) { 32 if (!hdfs.isFile(inputFiles[i].getPath())) { 33 if (isRecur){ 34 merge(inputFiles[i].getPath(), hdfsFile, hdfs,pcgroupText); 35 return ; 36 } 37 else { 38 System.out.println(inputFiles[i].getPath().getName() 39 + "is not file and not allow recursion, skip!"); 40 continue; 41 } 42 } 43 //判断文件名是否在需要解压缩的关键名内 44 if(inputFiles[i].getPath().getName().contains(pcgroupText.toString()) == true){ 45 //输出待解压的文件名 46 System.out.println(inputFiles[i].getPath().getName()); 47 //将数据流指向待解压文件 48 FSDataInputStream in = hdfs.open(inputFiles[i].getPath()); 49 /** 50 *数据的解压执行过程 51 */ 52 ZipInputStream zipInputStream = null; 53 try{ 54 zipInputStream = new ZipInputStream(in); 55 ZipEntry entry; 56 //解压后有多个文件一并解压出来并实现合并 57 //合并后的地址 58 FSDataOutputStream mergerout = hdfs.create(new Path(hdfsFile + File.separator + 59 inputFiles[i].getPath().getName().substring(0, inputFiles[i].getPath().getName().indexOf(".")))); 60 while((entry = zipInputStream.getNextEntry()) != null){ 61 byte[] buffer1 = new byte[2048]; 62 int nNumber; 63 
while((nNumber = zipInputStream.read(buffer1, 64 0, buffer1.length)) != -1) 65 mergerout.write(buffer1, 0, nNumber); 66 } 67 mergerout.flush(); 68 mergerout.close(); 69 zipInputStream.close(); 70 }catch(IOException e){ 71 continue; 72 } 73 in.close(); 74 /** 75 *将解压合并后的数据压缩成gzip格式 76 */ 77 GZIPOutputStream gzipOutputStream = null; 78 try{ 79 FSDataOutputStream outputStream = null; 80 outputStream = hdfs.create(new Path(hdfsFile + File.separator + 81 inputFiles[i].getPath().getName().substring(0, inputFiles[i].getPath().getName().indexOf(".")) + ".gz")); 82 FSDataInputStream inputStream = null; 83 gzipOutputStream = new GZIPOutputStream(outputStream); 84 inputStream = hdfs.open(new Path(hdfsFile + File.separator + inputFiles[i].getPath().getName().substring(0, inputFiles[i].getPath().getName().indexOf(".")))); 85 byte[] buffer = new byte[1024]; 86 int len; 87 while((len = inputStream.read(buffer)) > 0){ 88 gzipOutputStream.write(buffer, 0, len); 89 } 90 inputStream.close(); 91 gzipOutputStream.finish(); 92 gzipOutputStream.flush(); 93 outputStream.close(); 94 }catch (Exception exception){ 95 exception.printStackTrace(); 96 } 97 gzipOutputStream.close(); 98 //删除zip文件解压合并后的临时文件 99 String tempfiles = hdfsFile + File.separator + inputFiles[i].getPath().getName().substring(0, inputFiles[i].getPath().getName().indexOf(".")); 100 try{ 101 if(hdfs.exists(new Path(tempfiles))){ 102 hdfs.delete(new Path(tempfiles), true); 103 } 104 }catch(IOException ie){ 105 ie.printStackTrace(); 106 } 107 } 108 } 109 }catch (IOException e) { 110 e.printStackTrace(); 111 } 112 } 113 114 public static void errorMessage(String str) { 115 System.out.println("Error Message: " + str); 116 System.exit(1); 117 } 118 119 @SuppressWarnings("null") 120 public static void main(String[] args) throws IOException { 121 if (args.length == 0) 122 errorMessage("filesmerge [-r|-R] <hdfsTargetDir> <hdfsFileName>"); 123 if (args[0].matches("^-[rR]$")) { 124 isRecur = true; 125 } 126 if ((isRecur && args.length 
!= 4) || ( !isRecur && args.length != 3)) { 127 errorMessage("filesmerge [-r|-R] <hdfsTargetDir> <hdfsFileName>"); 128 } 129 130 Configuration conf = new Configuration(); 131 FileSystem hdfs = FileSystem.get(conf); 132 133 Path inputDir; 134 Path hdfsFile; 135 Text pcgroupText; 136 if(isRecur){ 137 inputDir = new Path(args[1]); 138 hdfsFile = new Path(args[2]); 139 pcgroupText = new Text(args[3]); 140 } 141 else{ 142 inputDir = new Path(args[0]); 143 hdfsFile = new Path(args[1]); 144 pcgroupText = new Text(args[2]); 145 } 146 147 if (!hdfs.exists(inputDir)) { 148 errorMessage("hdfsTargetDir not exist!"); 149 } 150 if (hdfs.exists(hdfsFile)) { 151 errorMessage("hdfsFileName exist!"); 152 } 153 merge(inputDir, hdfsFile, hdfs, pcgroupText); 154 System.exit(0); 155 } 156 }
[root@JueFan pconline]# hadoop jar zip_to_gzip.jar filesmerge /zip/(待转换文件所在目录,在HDFS上) /user/juefan/pconline/(转换完成后的文件存储目录,也在HDFS上,运行前必须尚不存在) pconline(待转换的文件名所包含的字符)
如需递归处理子目录,可以在 filesmerge 后面加上 -r(或 -R)参数。