zoukankan      html  css  js  c++  java
  • Hadoop MapReduce编程 API入门系列之小文件合并(二十九)

      不多说,直接上代码。

         Hadoop 自身提供了几种机制来解决相关的问题,包括HAR,SequeueFile和CombineFileInputFormat。

    Hadoop 自身提供的几种小文件合并机制

    Hadoop HAR

            将众多小文件打包成一个大文件进行存储,并且打包后原来的文件仍然可以通过Map-reduce进行操作,打包后的文件由索引和存储两大部分组成

            缺点:一旦创建就不能修改,也不支持追加操作,还不支持文档压缩,当有新文件进来以后,需要重新打包。
     
     
     

    SequeuesFile

            Sequence file由一系列的二进制key/value组成,如果key为小文件名,value为文件内容,则可以将大批小文件合并成一个大文件。

            优缺点:对小文件的存取都比较自由,也不限制用户和文件的多少,但是该方法不能使用append方法,所以适合一次性写入大量小文件的场景。
     
     
     

    CombineFileInputFormat

            CombineFileInputFormat是一种新的inputformat,用于将多个文件合并成一个单独的split作为输入,而不是通常使用一个文件作为输入。另外,它会考虑数据的存储位置。

         目前很多公司采用的方法就是在数据进入 Hadoop 的 HDFS 系统之前进行合并(也是本博文这方法),一般效果较上述三种方法明显。

     

     代码版本1

    MergeSmallFilesToHDFS .java
    package zhouls.bigdata.myMapReduce.MergeSmallFiles;
    
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;
    import org.apache.hadoop.io.IOUtils;
    /**
    * function 合并小文件至 HDFS 
    * 
    *
    */
    public class MergeSmallFilesToHDFS {
    private static FileSystem fs = null;
    private static FileSystem local = null;
    /**
    * @function main 
    * @param args
    * @throws IOException
    * @throws URISyntaxException
    */
    public static void main(String[] args) throws IOException,
    URISyntaxException {
    list();
    }
    
    /**
    * 
    * @throws IOException
    * @throws URISyntaxException
    */
    public static void list() throws IOException, URISyntaxException {
    // 读取hadoop文件系统的配置
    Configuration conf = new Configuration();
    //文件系统访问接口
    URI uri = new URI("hdfs://HadoopMaster:9000");
    //创建FileSystem对象aa
    fs = FileSystem.get(uri, conf);
    // 获得本地文件系统
    local = FileSystem.getLocal(conf);
    //过滤目录下的 svn 文件
    FileStatus[] dirstatus = local.globStatus(new Path("./data/mergeSmallFiles/*"),new RegexExcludePathFilter("^.*svn$"));
    //获取73目录下的所有文件路径
    Path[] dirs = FileUtil.stat2Paths(dirstatus);
    FSDataOutputStream out = null;
    FSDataInputStream in = null;
    for (Path dir : dirs) {
    String fileName = dir.getName().replace("-", "");//文件名称
    //只接受日期目录下的.txt文件a
    FileStatus[] localStatus = local.globStatus(new Path(dir+"/*"),new RegexAcceptPathFilter("^.*txt$"));
    // 获得日期目录下的所有文件
    Path[] listedPaths = FileUtil.stat2Paths(localStatus);
    //输出路径
    Path block = new Path("hdfs://HadoopMaster:9000/tv/"+ fileName + ".txt");
    // 打开输出流
    out = fs.create(block);    
    for (Path p : listedPaths) {
    in = local.open(p);// 打开输入流
    IOUtils.copyBytes(in, out, 4096, false); // 复制数据
    // 关闭输入流
    in.close();
    }
    if (out != null) {
    // 关闭输出流a
    out.close();
    }
    }
    
    }
    
    /**
    * 
    * @function 过滤 regex 格式的文件
    *
    */
    public static class RegexExcludePathFilter implements PathFilter {
    private final String regex;
    
    public RegexExcludePathFilter(String regex) {
    this.regex = regex;
    }
    
    @Override
    public boolean accept(Path path) {
    // TODO Auto-generated method stub
    boolean flag = path.toString().matches(regex);
    return !flag;
    }
    
    }
    
    /**
    * 
    * @function 接受 regex 格式的文件
    *
    */
    public static class RegexAcceptPathFilter implements PathFilter {
    private final String regex;
    
    public RegexAcceptPathFilter(String regex) {
    this.regex = regex;
    }
    
    @Override
    public boolean accept(Path path) {
    // TODO Auto-generated method stub
    boolean flag = path.toString().matches(regex);
    return flag;
    }
    
    }
    }

     代码版本2

    MergeSmallFilesToHDFS .java
    package zhouls.bigdata.myMapReduce.MergeSmallFiles;
    
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;
    import org.apache.hadoop.io.IOUtils;
    /**
     * function 合并小文件至 HDFS     ,  文件与块大小(比如128M)来比,小的话,称为小文件。是一个相对概念!相对于数据块而言的!
     * @author 小讲
     *
     */
    public class MergeSmallFilesToHDFS {
        private static FileSystem fs = null;
        private static FileSystem local = null;
        /**
         * @function main 
         * @param args
         * @throws IOException
         * @throws URISyntaxException
         */
        public static void main(String[] args) throws IOException,
                URISyntaxException {
            list();
        }
    
        /**
         * 
         * @throws IOException
         * @throws URISyntaxException
         */
        public static void list() throws IOException, URISyntaxException {
            // 读取hadoop文件系统的配置
            Configuration conf = new Configuration();
            //文件系统访问接口
            URI uri = new URI("hdfs://master:9000");
            
    //        URL、URI与Path三者的区别
    //        Hadoop文件系统中通过Hadoop Path对象来代表一个文件    
    //        URL(相当于绝对路径)    ->   (文件) ->    URI(相当于相对路径,即代表URL前面的那一部分)
    //        URI:如hdfs://master:9000
    //        如,URL.openStream
            
            
            
            //获得FileSystem实例,即HDFS
            fs = FileSystem.get(uri, conf);
                
            
            //获得FileSystem实例,即Local
            local = FileSystem.getLocal(conf);
    //            为什么要获取到Local呢,因为,我们要把本地D盘下data/73目录下的文件要合并后,上传到HDFS里,所以,我们需先获取到Local,再来做合并工作啦!
            
            
            //过滤目录下的 svn 文件,globStatus从第一个参数通配符合到文件,剔除满足第二个参数到结果,因为PathFilter中accept是return!  
            FileStatus[] dirstatus = local.globStatus(new Path("D://data/73/*"),new RegexExcludePathFilter("^.*svn$"));//一般这是隐藏文件,所以得排除
    //        ^表示匹配我们字符串开始的位置               *代表0到多个字符                        $代表字符串结束的位置
    //        RegexExcludePathFilter来只排除我们不需要的,即svn格式
    //        RegexExcludePathFilter这个方法我们自己写
            
    //        但是我们,最终是要处理文件里的东西,最终是要转成Path类型,因为Path对象f,它对应着一个文件。
            
            //获取73目录下的所有文件路径,注意FIleUtil中stat2Paths()的使用,它将一个FileStatus对象数组转换为Path对象数组。
            Path[] dirs = FileUtil.stat2Paths(dirstatus);//dirstatus是FileStatus数组类型
            
            
            FSDataOutputStream out = null;//输出流
            FSDataInputStream in = null;//输入流
            
    //        很多人搞不清输入流和输出流,!!!!
    //        其实啊,输入流、输出流都是针对内存的
    //        往内存里写,是输入流。
    //        内存往文件里写,是输出Luis。
    //        
    //        比如一个文件A复制到另一文件B,那么,先写到内存里,再写到文件B。
    //           =>   则文件A写到内存里,叫输入流。
    //           =>    则内存里写到文件B,叫输出流    
            
            
            for (Path dir : dirs) {//for星型循环,即将dirs是Path对象数组,一一传给Path dir
                
                String fileName = dir.getName().replace("-", "");//文件名称
    //                        即获取到如2012-09-17,然后经过replace("-", ""),得到20120917
                
                
                //只接受日期目录下的.txt文件,^匹配输入字符串的开始位置,$匹配输入字符串的结束位置,*匹配0个或多个字符。
                FileStatus[] localStatus = local.globStatus(new Path(dir+"/*"),new RegexAcceptPathFilter("^.*txt$"));
    //            FileStatus[] localStatus = local.listStatus(new Path(dir+"/*"),new RegexAcceptPathFilter("^.*txt$"));//试试,看有什么区别?出现错误的!为什么?
    
        
                        //RegexAcceptPathFilter这个方法,我们自己写
    //            RegexAcceptPathFilter来只接收我们需要,即txt格式
    //            这里,我们还可以只接收别的格式,自己去改,一定要锻炼学会改别人的代码
                
                
                // 获得如2012-09-17日期目录下的所有文件
                Path[] listedPaths = FileUtil.stat2Paths(localStatus);
    //            同样,但是我们,最终是要处理文件里的东西,最终是要转成Path类型,因为Path对象f,它对应着一个文件。
                
                
                
                //输出路径
                Path block = new Path("hdfs://master:9000/outData/MergeSmallFilesToHDFS/"+ fileName + ".txt");
                
                
                // 打开输出流
                out = fs.create(block);//因为,合并小文件之后,比如这是,合并2012-09-17日期目录下的所有小文件,之后,要上传到HDFS里。
    //                类似于,文件A写到内存里,再内存里写到文件B。而这行代码out = fs.create(block);是相当于是,内存里写到文件B。所以是输出流,即是从内存里输出的,所以叫输出流。
    //                            这里,文件A是Local                文件B是HDFS
    
    //                                        文件与块大小(比如128M)来比,小的话,称为小文件。是一个相对概念!相对于数据块而言的!
                
    //            很多人搞不清输入流和输出流,!!!!
    //            其实啊,输入流、输出流都是针对内存的
    //            往内存里写,是输入流。
    //            内存往文件里写,是输出Luis。
    //            
    //            比如一个文件A复制到另一文件B,那么,先写到内存里,再写到文件B。
    //               =>   则文件A写到内存里,叫输入流。
    //               =>    则内存里写到文件B,叫输出流    
                
                
                for (Path p : listedPaths) {//for星型循环,即将listedPaths的值一一传给Path p
                    in = local.open(p);// 打开输入流in
    //                类似于,文件A写到内存里,再内存里写到文件B。而这行代码in = local.open(p);是相当于是,文件A写到内存里。所以是输如流,即是写到内存里的,所以叫输入流。
    //                    这里,文件A是Local                文件B是HDFS
                    
                    IOUtils.copyBytes(in, out, 4096, false); // 复制数据,IOUtils.copyBytes可以方便地将数据写入到文件,不需要自己去控制缓冲区,也不用自己去循环读取输入源。false表示不自动关闭数据流,那么就手动关闭。
    //                IOUtils.copyBytes这个方法很重要
                                    //是否自动关闭输入流和输出流,若是false,就要单独去关闭。则不在这个方法体里关闭输入和输出流了。
    //                                                     若是true,则在这个方法里关闭输入和输出流。不需单独去关闭了
                    
                    
    //                明白,IOUtils类的copyBytes将hdfs数据流拷贝到标准输出流System.out中,
    //                copyBytes前两个参数好理解,一个输入,一个输出,第三个是缓存大小,第四个指定拷贝完毕后是否关闭流。
    //                要设置为false,标准输出流不关闭,我们要手动关闭输入流。即,设置为false表示关闭输入流
                    
    //                主要是把最后的这个参数定义好, 就可以了。 定义为true还是false,则决定着是否在这个方法体里关闭
    //                若定义为true,则在这个方法体里直接关闭输入流、输出流。不需单独去关闭了
    //                若定义为false,则不在这个方法体里直接关闭输入流、输出流。需单独去关闭了
                    
                    
                    // 关闭输入流
                    in.close();//若定义为false,则不在这个方法体里直接关闭输入流、输出流。需单独去关闭了。这就是单独在关闭输入流!!!懂了吗
                }
                if (out != null) {//这里为什么不为空,空指针,则说明里面还有资源。
                    // 关闭输出流
                    out.close();//若定义为false,则不在这个方法体里直接关闭输入流、输出流。需单独去关闭了。这就是单独在关闭输出流!!!懂了吗
                }
            }
            
        }
    
        /**
         * 
         * @function 过滤 regex 格式的文件
         *
         */
        public static class RegexExcludePathFilter implements PathFilter {
            private final String regex;//变量
    
            public RegexExcludePathFilter(String regex) {//这个是上面的那个,正在表达式
                this.regex = regex;//将String regex的值,赋给RegexExcludePathFilter类里的private final String regex的值
            }
    
            public boolean accept(Path path) {//主要是实现accept方法
                // TODO Auto-generated method stub
                boolean flag = path.toString().matches(regex);//匹配正则表达式,这里是^.*svn$
                return !flag;//如果要接收 regex 格式的文件,则accept()方法就return flag; 如果想要过滤掉regex格式的文件,则accept()方法就return !flag。
            }
    
        }
    
        /**
         * 
         * @function 接受 regex 格式的文件
         *
         */
        public static class RegexAcceptPathFilter implements PathFilter {
            private final String regex;//变量
    
            public RegexAcceptPathFilter(String regex) {//这个是上面的那个,正在表达式
                this.regex = regex;//将String regex的值,赋给RegexAcceptPathFilter类里的private final String regex的值
            }
    
            public boolean accept(Path path) {//主要是实现accept方法
                // TODO Auto-generated method stub
                boolean flag = path.toString().matches(regex);//匹配正则表达式,这里是^.*txt$
                return flag;//如果要接收 regex 格式的文件,则accept()方法就return flag; 如果想要过滤掉regex格式的文件,则accept()方法就return !flag。
            }
    
        }
    }
  • 相关阅读:
    30条MySQL优化总结
    安装了最新的pycharm不能自动补全的解决方法。路径中输入变成¥的解决方法
    You should consider upgrading via the 'python -m pip install --upgrade pip' command解决方法
    B站MySQL学习之笔记
    B站MySQL学习之job_grades表数据
    B站MySQL学习之student数据
    Faker 造相对真实的测试数据
    浏览器地址栏输入url后的过程
    cookie、session及token的区别
    Get和Post的区别
  • 原文地址:https://www.cnblogs.com/zlslch/p/6171460.html
Copyright © 2011-2022 走看看