import java.io.BufferedInputStream; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class PutMerge { private InputStream in = null; private OutputStream out = null; private String localPath; private String hdfsPath; public PutMerge(String localPath, String hdfsPath) { this.localPath = localPath; this.hdfsPath = hdfsPath; } public void start() throws Exception { File file = new File(localPath); Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf); out = fs.create(new Path(hdfsPath)); work(file); close(); } /* * 1.根据用户定义的参数设置本地目录和HDFS的目标文件 * * 2.创建一个输出流写入到HDFS文件 * * 3.遍历本地目录中的每个文件,打开文件,并读取文件内容,将文件的内容写到HDFS文件中。 */ private void work(File file) throws Exception { if (file.isFile()) { int byteRead = 0; byte[] buffer = new byte[256]; while ((byteRead = in.read(buffer)) > 0) { out.write(buffer, 0, byteRead); } } else if (file.isDirectory()) { File[] files = file.listFiles(); for (int i = 0; i < files.length; i++) { if (files[i].isFile()) { in = new BufferedInputStream(new FileInputStream(files[i])); int byteRead = 0; byte[] buffer = new byte[256]; while ((byteRead = in.read(buffer)) > 0) { out.write(buffer, 0, byteRead); } } else { work(files[i]); } } } } private void close() throws IOException { if (in != null) { in.close(); } if (out != null) { out.close(); } } /** * @param args * @throws IOException */ public static void main(String[] args) throws Exception { if (args.length < 2) { System.out.println("Usage: " + PutMerge.class.getName() + " [LocalPath] [HDFSPath]"); System.exit(1); } new PutMerge(args[0], args[1]).start(); } }