zoukankan      html  css  js  c++  java
  • WholeFileInputFormat 和WholeFileRecordReader合并小文件

     

    如果不希望文件被切分,例如判断文件中记录是否有序,可以让minimumSize值大于最大文件的大小,但是文件的大小不能超过blockSize,或者重写FileInputFormat方法isSplitable()返回为false。下面介绍将多个小文件合成一个大的序列文件的例子:

    1)自定义完整文件输入处理类如下:

    Public class WholeFileInputFormat extends FileInputFormat<NullWritable, ByteWritable>

    {

    @override//不得分片

    protected boolean isSplitable(JobContext context,Path file){return false;}

           @override

           public RecordReader<NullWritable,BytesWritable> createRecordReader ( InputSplit split,TaskAttemptContext context )throws IOException,InterruptedException

    {

      WholeFileRecordReader reader=new WholeFileRecordReader();

      reader.initialize(split,context);

      return reader;

    }

    }

    2)自定义完整文件读取类WholeFileRecordReader

    WholeFileRecordReader类通过initialize()方法传入文件信息,然后调用nextKeyValue()方法一次性读取整个文件的内容,通过布尔值processed判断是否读取执行过。其他函数都是返回值。将FileSplit转为一条记录,键为null,值为文件内容。

    package org.edu.bupt.xiaoye.hadooptest;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.fs.FSDataInputStream;

    import org.apache.hadoop.fs.FileSystem;

    import org.apache.hadoop.fs.Path;

    import org.apache.hadoop.io.BytesWritable;

    import org.apache.hadoop.io.IOUtils;

    import org.apache.hadoop.io.NullWritable;

    import org.apache.hadoop.mapreduce.InputSplit;

    import org.apache.hadoop.mapreduce.RecordReader;

    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    /**

     * 继承RecordReader

     * 该类用来将分片分割成记录,从而生成key和value。例如TextInputFormat中的key和value就是RecordReader的子类产生的。

     * 在这里,我们继承了这个类,将重写了key和value的生成算法。对一个分片来说,只生成一个key-value对。其中key为空,value为该分片

     * 的所有内容

     * @author Xiaoye

     */

    public class WholeFileRecordReader extends

                  RecordReader<NullWritable, BytesWritable> {

           // 用来盛放传递过来的分片

           private FileSplit fileSplit;

           private Configuration conf;

           //将作为key-value中的value值返回

           private BytesWritable value = new BytesWritable();

           // 因为只生成一条记录,所以只需要调用一次。因此第一次调用过后将processed赋值为true,从而结束key和value的生成

           private boolean processed = false;

           /**

            * 设置RecordReader的分片和配置对象。

            */

           @Override

           public void initialize(InputSplit split, TaskAttemptContext context)

                         throws IOException, InterruptedException {

                  this.fileSplit = (FileSplit) split;

                  this.conf = context.getConfiguration();

           }

           /**

            * 核心算法

            * 用来产生key-value值

            * 一次读取整个文件内容注入value对象

            */

           @Override

           public boolean nextKeyValue() throws IOException, InterruptedException {

                  if (!processed) {

                         /*

                          * 注意这儿,fileSplit中只是存放着待处理内容的位置 大小等信息,并没有实际的内容

                          * 因此这里会通过fileSplit找到待处理文件,然后再读入内容到value中

                          */

                         byte[] contents = new byte[(int) fileSplit.getLength()];

                         Path file = fileSplit.getPath();

                         FileSystem fs = file.getFileSystem(conf);

                         FSDataInputStream in = null;

                         try {

                                in = fs.open(file);

                                IOUtils.readFully(in, contents, 0, contents.length);

                                value.set(contents, 0, contents.length);

                         } finally {

                                IOUtils.closeStream(in);

                         }

                         processed = true;

                         return true;

                  }

                  return false;

           }

           @Override

           public NullWritable getCurrentKey() throws IOException,

                         InterruptedException {

                  return NullWritable.get();

           }

           @Override

           public BytesWritable getCurrentValue() throws IOException,

                         InterruptedException {

                  return value;

           }

           @Override

           public float getProgress() throws IOException, InterruptedException {

                  return processed ? 1.0f : 0.0f;

           }

           @Override

           public void close() throws IOException {

                  //do nothing

           }

    3)将若干个小文件打包成顺序文件的mapreduce作业

    通过WholeFileRecordReader类读取所有小文件的内容,以文件名称为输出键,以内容未一条记录,然后合并成一个大的顺序文件。

    public class SmallFilesToSequenceFileConverter extends configured implement Tool

    {

           package com.pzoom.mr.sequence;

    import java.io.IOException;

    import java.util.Random;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.fs.Path;

    import org.apache.hadoop.io.BytesWritable;

    import org.apache.hadoop.io.NullWritable;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.InputSplit;

    import org.apache.hadoop.mapreduce.Job;

    import org.apache.hadoop.mapreduce.Mapper;

    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

    public class SmallFilesToSequenceFileConverter {

    ///定义map函数

           static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {

                  private Text filenameKey;

                  //定义设置文件函数

                  @Override

                  protected void setup(Context context) throws IOException,

                  InterruptedException {

                         InputSplit split = context.getInputSplit();

                         Path path = ((FileSplit)split).getPath();

                         filenameKey = new Text(path.toString());

                  }

     //定义map函数

                  @Override

                  protected void map(NullWritable key, BytesWritable value,

                                Context context) throws IOException, InterruptedException {

                         context.write(filenameKey, value);

                  }

    //定义run函数

    @Override

    public int run (String[] args)throws IOException {

           Configuration conf = getConf();

           if(conf==null)

    {

           return -1;

    }

           Job job=JobBuilder.parseInputAndOutput(this,conf,args);

                  job.setInputFormatClass(WholeFileInputFormat.class);

                  job.setOutputFormatClass(SequenceFileOutputFormat.class);输出序列file

                  job.setOutputKeyClass(Text.class);

                  job.setOutputValueClass(BytesWritable.class);

                  job.setMapperClass(SequenceFileMapper.class);

                 

                  return job.waitForCompletion(true)? 0:1;}

    //args传入输入输出路径

     public static void main(String[] args) throws IOException{

                  int exitCode=ToolRunner.run(new SmallFilesToSequenceFileConverter(),args);

    System.exit(exitCode);

                         }

    }

    }

    4)执行小文件合并为大文件的命令

    各参数含义:采用本地配置文件,两个reduces任务,输入文件夹,输出文件夹

    %hadoop jar job.jar SmallFilesToSequenceFileConverter –conf conf/Hadoop-localhost.xml –D mapreduece.job.reduces-2 input/smallfiles output

    5)通过命令来查看输出结果

    %hadoop fs –conf conf/Hadoop-localhost.xml –text output/part-r-00000

     

    输出结果是以小文件路径为键,以内容为值的合并序列文件

     

    自己开发了一个股票智能分析软件,功能很强大,需要的点击下面的链接获取:

    https://www.cnblogs.com/bclshuai/p/11380657.html

  • 相关阅读:
    TF中的自定义正则项
    [工具]多线程下载 axel
    [算法]kv-memory 表示dense特征
    [code]tensorflow分桶
    [code]Keras API的用法记录
    vim显示下划线不高亮问题
    [代码] kv2sparse
    [代码]并发执行python的例子
    AI算法手册
    [工具]soundflower
  • 原文地址:https://www.cnblogs.com/bclshuai/p/12255279.html
Copyright © 2011-2022 走看看