zoukankan      html  css  js  c++  java
  • 将存储在本地的大量分散的小文件,合并并保存在hdfs文件系统中

    import java.io.BufferedInputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.URI;
     
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
     
    public class PutMerge {
     
        private InputStream in = null;
        private OutputStream out = null;
        private String localPath;
        private String hdfsPath;
     
        public PutMerge(String localPath, String hdfsPath) {
            this.localPath = localPath;
            this.hdfsPath = hdfsPath;
        }
     
        public void start() throws Exception {
            File file = new File(localPath);
            Configuration conf = new Configuration();
     
            FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
            out = fs.create(new Path(hdfsPath));
     
            work(file);
     
            close();
        }
     
        /*
         * 1.根据用户定义的参数设置本地目录和HDFS的目标文件
         * 
         * 2.创建一个输出流写入到HDFS文件
         * 
         * 3.遍历本地目录中的每个文件,打开文件,并读取文件内容,将文件的内容写到HDFS文件中。
         */
        private void work(File file) throws Exception {
            if (file.isFile()) {
                int byteRead = 0;
                byte[] buffer = new byte[256];
                while ((byteRead = in.read(buffer)) > 0) {
                    out.write(buffer, 0, byteRead);
                }
            } else if (file.isDirectory()) {
                File[] files = file.listFiles();
                for (int i = 0; i < files.length; i++) {
                    if (files[i].isFile()) {
                        in = new BufferedInputStream(new FileInputStream(files[i]));
                        int byteRead = 0;
                        byte[] buffer = new byte[256];
                        while ((byteRead = in.read(buffer)) > 0) {
                            out.write(buffer, 0, byteRead);
                        }
                    } else {
                        work(files[i]);
                    }
                }
            }
        }
     
        private void close() throws IOException {
            if (in != null) {
                in.close();
            }
     
            if (out != null) {
                out.close();
            }
        }
     
        /**
         * @param args
         * @throws IOException
         */
        public static void main(String[] args) throws Exception {
            if (args.length < 2) {
                System.out.println("Usage:
    	 " + PutMerge.class.getName()
                        + " [LocalPath] [HDFSPath]");
                System.exit(1);
            }
            new PutMerge(args[0], args[1]).start();
        }
    }
    

      

  • 相关阅读:
    [大山中学模拟赛] 2016.9.17
    [DP优化方法之斜率DP]
    Gengxin讲STL系列——String
    小班讲课之动态规划基础背包问题
    ubuntu安装体验
    小班出题之字符串基础检测
    G
    B
    小项目--反eclass
    树--天平问题
  • 原文地址:https://www.cnblogs.com/xiaodf/p/5027175.html
Copyright © 2011-2022 走看看