zoukankan      html  css  js  c++  java
  • HDFS操作及小文件合并

     

    小文件合并是针对文件上传到HDFS之前

    这些文件夹里面都是小文件

    参考代码

    package com.gong.hadoop2;
    
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;
    import org.apache.hadoop.io.IOUtils;
    /**
     * function 合并小文件至 HDFS 
     * @author 小讲
     *
     */
    public class MergeSmallFilesToHDFS {
        private static FileSystem fs = null;
        private static FileSystem local = null;
        /**
         * @function main 
         * @param args
         * @throws IOException
         * @throws URISyntaxException
         */
        public static void main(String[] args) throws IOException,
                URISyntaxException {
            list();
        }
    
        /**
         * 
         * @throws IOException
         * @throws URISyntaxException
         */
        public static void list() throws IOException, URISyntaxException {
            // 读取hadoop文件系统的配置
            Configuration conf = new Configuration();
            //文件系统访问接口
            URI uri = new URI("hdfs://dajiangtai:9000");
            //创建FileSystem对象
            fs = FileSystem.get(uri, conf);
            // 获得本地文件系统
            local = FileSystem.getLocal(conf);
            //过滤目录下的 svn 文件,globStatus从第一个参数通配符合到文件,剔除满足第二个参数到结果,因为PathFilter中accept是return!  
            FileStatus[] dirstatus = local.globStatus(new Path("D://data/73/*"),new RegexExcludePathFilter("^.*svn$"));
            //获取73目录下的所有文件路径,注意FIleUtil中stat2Paths()的使用,它将一个FileStatus对象数组转换为Path对象数组。
            Path[] dirs = FileUtil.stat2Paths(dirstatus);
            FSDataOutputStream out = null;
            FSDataInputStream in = null;
            for (Path dir : dirs) {
                String fileName = dir.getName().replace("-", "");//文件名称
                //只接受日期目录下的.txt文件,^匹配输入字符串的开始位置,$匹配输入字符串的结束位置,*匹配0个或多个字符。
                FileStatus[] localStatus = local.globStatus(new Path(dir+"/*"),new RegexAcceptPathFilter("^.*txt$"));
                // 获得日期目录下的所有文件
                Path[] listedPaths = FileUtil.stat2Paths(localStatus);
                //输出路径
                Path block = new Path("hdfs://dajiangtai:9000/middle/tv/"+ fileName + ".txt");
                // 打开输出流
                out = fs.create(block);            
                for (Path p : listedPaths) {
                    in = local.open(p);// 打开输入流
                    IOUtils.copyBytes(in, out, 4096, false); // 复制数据,IOUtils.copyBytes可以方便地将数据写入到文件,不需要自己去控制缓冲区,也不用自己去循环读取输入源。false表示不自动关闭数据流,那么就手动关闭。
                    // 关闭输入流
                    in.close();
                }
                if (out != null) {
                    // 关闭输出流
                    out.close();
                }
            }
            
        }
    
        /**
         * 
         * @function 过滤 regex 格式的文件
         *
         */
        public static class RegexExcludePathFilter implements PathFilter {
            private final String regex;
    
            public RegexExcludePathFilter(String regex) {
                this.regex = regex;
            }
    
            @Override
            public boolean accept(Path path) {
                // TODO Auto-generated method stub
                boolean flag = path.toString().matches(regex);
                return !flag;
            }
    
        }
    
        /**
         * 
         * @function 接受 regex 格式的文件
         *
         */
        public static class RegexAcceptPathFilter implements PathFilter {
            private final String regex;
    
            public RegexAcceptPathFilter(String regex) {
                this.regex = regex;
            }
    
            @Override
            public boolean accept(Path path) {
                // TODO Auto-generated method stub
                boolean flag = path.toString().matches(regex);
                return flag;
            }
    
        }
    }

    最后一点,分清楚hadoop fs 和dfs的区别

    hadoop fs <args>

    FS涉及可以指向任何文件系统(如本地,HDFS等)的通用文件系统。因此,当您处理不同的文件系统(如本地FS,HFTP FS,S3 FS等)时,可以使用它

    hadoop dfs <args>

    dfs非常具体到HDFS。 将工作与HDFS有关。 这已被弃用,我们应该使用hdfs dfs。


    hdfs dfs <args>

    与第二个相同,即适用于与HDFS相关的所有操作,并且是推荐的命令,而不是hadoop dfs
  • 相关阅读:
    javascript简易下拉菜单效果
    精通javascript笔记(智能社)——简易tab选项卡及应用面向对象方法实现
    精通JS正则表达式(转)
    精通javascript笔记(智能社)——数字时钟
    ERROR: transport error 202: bind failed: Address already in use
    理解git对象
    InvocationHandler中invoke()方法的调用问题
    深入理解Java Proxy机制(转)
    IP、子网的详述 ——IP分类、网关地址,子网掩码、子网作用(转)
    getRequestDispatcher()与sendRedirect()的区别
  • 原文地址:https://www.cnblogs.com/braveym/p/7583218.html
Copyright © 2011-2022 走看看