zoukankan      html  css  js  c++  java
  • Hadoop对小文件的解决方式

    小文件指的是那些size比HDFS的block size(默认64M)小的多的文件。不论什么一个文件,文件夹和block,在HDFS中都会被表示为一个object存储在namenode的内存中, 每一个object占用150 bytes的内存空间。

    所以,假设有10million个文件, 每一个文件相应一个block,那么就将要消耗namenode 3G的内存来保存这些block的信息。

    假设规模再大一些,那么将会超出现阶段计算机硬件所能满足的极限。


    控制小文件的方法有:

    1、应用程序自己控制
    2、archive
    3、Sequence File / Map File
    4、CombineFileInputFormat***
    5、合并小文件,如HBase部分的compact

    1、应用程序自己控制

    final Path path = new Path("/combinedfile");
    final FSDataOutputStream create = fs.create(path);
    final File dir = new File("C:\Windows\System32\drivers\etc");
    for(File fileName : dir.listFiles()) 
    {
        System.out.println(fileName.getAbsolutePath());
        final FileInputStream fileInputStream = new
    FileInputStream(fileName.getAbsolutePath());
        final List<String> readLines = IOUtils.readLines(fileInputStream);
        for (String line : readLines) 
        {
            create.write(line.getBytes());
        }
        fileInputStream.close();
    }
    create.close();

    2、archive 命令行操作

    详细參考例如以下:
    http://blog.csdn.net/scgaliguodong123_/article/details/46341587

    3、Sequence File/Map File

    Sequence File
    通常对于”the small files problem”的回应会是:使用SequenceFile。
    这样的方法是说,使用filename作为key,而且file contents作为value。实践中这样的方式非常管用。
    假设有10000个100KB的文件,能够写一个程序来将这些小文件写入到一个单独的 SequenceFile中去,然后就能够在一个streaming fashion(directly or using mapreduce)中来使用这个sequenceFile

    不仅如此,SequenceFiles也是splittable的。所以mapreduce 能够break them into chunks,而且分别的被独立的处理。

    和HAR不同的是,这样的方式还支持压缩。 block的压缩在很多情况下都是最好的选择,由于它将多个 records压缩到一起,而不是一个record一个压缩。

    在存储结构上, SequenceFile主要由一个Header后跟多条Record组成。
    Header主要包括了Key classname, Value classname。存储压缩算法。用户自己定义元数据等信息,此外,还包括了一些同步标识,用于高速定位到记录的边界。

    每条Record以键值对的方式进行存储。用来表示它的字符数组可依次解析成:记录的长度、 Key的长度、 Key值和Value值。而且Value值的结构取决于该记录是否被压缩。

    数据压缩有利于节省磁盘空间和加快网络传输, SeqeunceFile支持两种格式的数据压缩。各自是: record compression和block compression。


    record compression是对每条记录的value进行压缩
    block compression是将一连串的record组织到一起。统一压缩成一个block。
    block信息主要存储了:块所包括的记录数、每条记录Key长度的集合、每条记录Key值的集合、每条记录Value长度的集合和每条记录Value值的集合
    注:每一个block的大小是可通过io.seqfile.compress.blocksize属性来指定的。

    Configuration conf=new Configuration();
    FileSystem fs=FileSystem.get(conf);
    Path seqFile=new Path("seqFile.seq");
    //Reader内部类用于文件的读取操作
    SequenceFile.Reader reader=new SequenceFile.Reader(fs,seqFile,conf);
    //Writer内部类用于文件的写操作,假设Key和Value都为Text类型
    SequenceFile.Writer writer=new SequenceFile.Writer(fs,conf,seqFile,Text.class,Text.class);
    //通过writer向文档中写入记录
    writer.append(new Text("key"),new Text("value"));
    IOUtils.closeStream(writer);//关闭write流
    //通过reader从文档中读取记录
    Text key=new Text();
    Text value=new Text();
    while(reader.next(key,value))
    {
        System.out.println(key);
        System.out.println(value);
    }
    IOUtils.closeStream(reader);//关闭read流

    详细可參考:
    http://blog.csdn.net/scgaliguodong123_/article/details/46391061

    MapFile
    MapFile是排序后的SequenceFile,通过观察其文件夹结构能够看到
    MapFile由两部分组成。各自是data和index。
    index作为文件的数据索引。主要记录了每一个Record的key值,以及
    该Record在文件里的偏移位置。
    在MapFile被訪问的时候,索引文件会被载入到内存,通过索引映射关系可迅速定位到指定Record所在文件位置。因此,相对SequenceFile而言, MapFile的检索效率是高效的,缺点是会消耗一部分内存来存储index数据。
    注意的是。 MapFile并不会把全部Record都记录到index中去,默认情况下每隔128条记录存储一个索引映射。当然,记录间隔可人为改动,通过MapFIle.Writer的setIndexInterval()方法,或改动io.map.index.interval属性;
    另外,与SequenceFile不同的是。 MapFile的KeyClass一定要实现
    WritableComparable接口 ,即Key值是可比較的。

    Configuration conf=new Configuration();
    FileSystem fs=FileSystem.get(conf);
    Path mapFile=new Path("mapFile.map");
    //Writer内部类用于文件的写操作,假设Key和Value都为Text类型
    MapFile.Writer writer=new MapFile.Writer(conf,fs,mapFile.toString(),Text.class,Text.class);
    //通过writer向文档中写入记录
    writer.append(new Text("key"),new Text("value"));
    IOUtils.closeStream(writer);//关闭write流
    //Reader内部类用于文件的读取操作
    MapFile.Reader reader=new MapFile.Reader(fs,mapFile.toString(),conf);
    //通过reader从文档中读取记录
    Text key=new Text();
    Text value=new Text();
    while(reader.next(key,value))
    {
        System.out.println(key);
        System.out.println(value);
    }
    IOUtils.closeStream(reader);//关闭read流

    5、CombineFileInputFormat

    相对于大量的小文件来说。hadoop更合适处理少量的大文件。
    CombineFileInputFormat能够缓解这个问题,它是针对小文件而设计的。


    **注:**CombineFileInputFormat是一个抽象类。须要编写一个继承类。
    使用CombineFileInputFormat作为Map任务的输入规格描写叙述,首先须要实现一个自己定义的RecordReader。

    CombineFileInputFormat的大致原理
    它会将输入多个数据文件(小文件)的元数据全部包装到CombineFileSplit类里面。也就是说,由于小文件的情况下,在HDFS中都是单Block的文件,即一个文件一个Block,一个CombineFileSplit包括了一组文件Block。包括每一个文件的起始偏移(offset),长度(length)。Block位置(localtions)等元数据。

    假设想要处理一个 CombineFileSplit。非常easy想到。对其包括的每一个InputSplit(实际上这里面没有这个,你须要读取一个小文件块的时候,须要构造一 个FileInputSplit对象)。
    在运行MapReduce任务的时候,须要读取文件的文本行(简单一点是文本行。也可能是其它格式数据)。
    那么对于CombineFileSplit来说,你须要处理其包括的小文件Block,就要相应设置一个RecordReader,才干正确读取文件数据内容。
    通常情况下,我们有一批小文件,格式一般是同样的,仅仅须要在CombineFileSplit实现一个RecordReader的时候,
    内置还有一个用来读取小文件Block的RecordReader,这样就能保证读取CombineFileSplit内部聚积的小文件。

    我们基于Hadoop内置的CombineFileInputFormat来实现处理海量小文件,须要做的工作,例如以下所看到的:
    1、实现一个RecordReader来读取CombineFileSplit包装的文件Block
    2、继承自CombineFileInputFormat实现一个使用我们自己定义的RecordReader的输入规格说明类。
    3、处理数据的Mapper实现类
    4、配置用来处理海量小文件的MapReduce Job

    package SmallFile;
    
    import java.io.IOException;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
    
    public class CombineSmallfileInputFormat extends
                CombineFileInputFormat<LongWritable,BytesWritable>
    {
        @Override
        public RecordReader<LongWritable, BytesWritable> createRecordReader(
                InputSplit split, TaskAttemptContext context) throws IOException
        {
            CombineFileSplit combineFileSplit = (CombineFileSplit)(split);
            CombineFileRecordReader<LongWritable,BytesWritable> recordReader = 
                new CombineFileRecordReader<LongWritable,BytesWritable>
                (combineFileSplit, context,CombineSmallfileRecordReader.class);
            try
            {
                recordReader.initialize(combineFileSplit, context);
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
            return recordReader;
        }
    }
    
    
    class CombineSmallfileRecordReader extends RecordReader<LongWritable,BytesWritable>
    {
        private CombineFileSplit combineFileSplit;
        private LineRecordReader lineRecordReader = new LineRecordReader();
        private Path[] paths;
        private int totalLength;
        private int currentIndex;
        private float currentProgress = 0;
        private LongWritable currentKey;
        private BytesWritable currentValue;
    
        public CombineSmallfileRecordReader(CombineFileSplit combineFileSplit,TaskAttemptContext context,Integer index)
        {
            super();
            this.combineFileSplit = combineFileSplit;
            this.currentIndex = index;
        }
    
         @Override
         public void initialize(InputSplit split, TaskAttemptContext context)
           throws IOException, InterruptedException
         {
              FileSplit fileSplit = new FileSplit(combineFileSplit.getPath(currentIndex),
                      combineFileSplit.getOffset(currentIndex),combineFileSplit.getLength(currentIndex),
                      combineFileSplit.getLocations());
              lineRecordReader.initialize(fileSplit, context);
              this.paths = combineFileSplit.getPaths(); //分区所在的全部地址
              context.getConfiguration().set("map.input.file.name", 
                      combineFileSplit.getPath(currentIndex).getName()); //设置输入文件名称
         }
    
         @Override
         public boolean nextKeyValue() throws IOException, InterruptedException
         {
             if(currentIndex>=0 && currentIndex<totalLength)
             {
                 return lineRecordReader.nextKeyValue();
             }
             return false;
         }
    
         @Override
         public LongWritable getCurrentKey() throws IOException, InterruptedException
         {
              currentKey = lineRecordReader.getCurrentKey();
              return currentKey;
         }
    
         @Override
         public BytesWritable getCurrentValue() throws IOException, InterruptedException
         {
              byte[]value = lineRecordReader.getCurrentValue().getBytes();
              currentValue.set(value, 0, value.length);
              return currentValue;
         }
    
         @Override
         public float getProgress() throws IOException, InterruptedException
         {
              if(currentIndex>=0 && currentIndex<totalLength)
              {
                   currentProgress = currentIndex/totalLength;
                   return currentProgress;
              }
              return currentProgress;
         }
    
         @Override
         public void close() throws IOException
         {
             lineRecordReader.close();
         }
    }
  • 相关阅读:
    c 语言 运算符 优先级
    回文字符串个数
    最小操作数
    将一个二叉树转化为双向链表,不开辟新空间
    两个整数集合的交集 ———— 腾讯2014软件开发笔试题目
    python download
    Spring5.2.X源代码编译-问题-Unable to locate Spring NamespaceHandler for XML schema namespace [http://www.springframework.org/schema/context]
    Spring5.2.X源代码编译-问题-找不到CoroutinesUtils
    Spring5.2.X源代码编译
    入行四年的思考
  • 原文地址:https://www.cnblogs.com/mengfanrong/p/5337060.html
Copyright © 2011-2022 走看看