  • hadoop: using a map job to merge small files into a SequenceFile

    The previous example used SequenceFile's createWriter directly; this one performs the same merge as a MapReduce job.
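
    (For reference only, a minimal hedged sketch of the direct SequenceFile.createWriter approach mentioned above; this is not the code from the previous post, and the output path and sample record are placeholders.)

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    public class DirectSequenceFileWriter {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Path out = new Path(args[0]); // placeholder output path
            try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                    SequenceFile.Writer.file(out),
                    SequenceFile.Writer.keyClass(Text.class),
                    SequenceFile.Writer.valueClass(BytesWritable.class))) {
                // in the direct approach, each small file becomes one appended record
                byte[] bytes = "placeholder file contents".getBytes("UTF-8");
                writer.append(new Text("example.txt"), new BytesWritable(bytes));
            }
        }
    }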

    1. To read each small file as a whole, a custom InputFormat is needed, and that InputFormat in turn needs its own RecordReader; to read a file in one piece, the RecordReader loads all of its bytes at once.

    1.1 Extend the generic RecordReader class and override its methods.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    import java.io.IOException;
    
    public class WholeFileRecordReader extends RecordReader<NullWritable,BytesWritable> {
        private FileSplit fileSplit;
        private Configuration conf;
        private BytesWritable value = new BytesWritable();
        private boolean processed = false;
        /**
         * Called once at initialization.
         *
         * @param split   the split that defines the range of records to read
         * @param context the information about the task
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            this.fileSplit = (FileSplit) split;
            this.conf = context.getConfiguration();
        }
    
        /**
         * Read the next key, value pair.
         *
         * @return true if a key/value pair was read
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if(!processed){
                byte[] contents = new byte[(int)fileSplit.getLength()];
                Path file = fileSplit.getPath();
                FileSystem fs = file.getFileSystem(conf);
                FSDataInputStream in = null;
                try {
                    in = fs.open(file);
                    IOUtils.readFully(in,contents,0,contents.length); // read the whole file in one pass
                    value.set(contents,0,contents.length);
                }finally {
                    IOUtils.closeStream(in);
                }
                processed = true;
                return true;
            }
    
            return false;
        }
    
        /**
         * Get the current key
         *
         * @return the current key or null if there is no current key
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        public NullWritable getCurrentKey() throws IOException, InterruptedException {
            return NullWritable.get();
        }
    
        /**
         * Get the current value.
         *
         * @return the object that was read
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        public BytesWritable getCurrentValue() throws IOException, InterruptedException {
            return value;
        }
    
        /**
         * The current progress of the record reader through its data.
         *
         * @return a number between 0.0 and 1.0 that is the fraction of the data read
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        public float getProgress() throws IOException, InterruptedException {
            return processed ? 1.0f:0.0f;
        }
    
        /**
         * Close the record reader.
         */
        @Override
        public void close() throws IOException {
    
        }
    }
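
    To sanity-check the reader outside of a full job, a hedged sketch (not part of the original post; the class name WholeFileRecordReaderSmokeTest is hypothetical) could drive it directly against a single file:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

    public class WholeFileRecordReaderSmokeTest {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Path file = new Path(args[0]); // any small local or HDFS file
            long length = FileSystem.get(conf).getFileStatus(file).getLen();
            FileSplit split = new FileSplit(file, 0, length, new String[0]);

            WholeFileRecordReader reader = new WholeFileRecordReader();
            reader.initialize(split, new TaskAttemptContextImpl(conf, new TaskAttemptID()));
            // the reader emits exactly one record: the whole file as a BytesWritable
            while (reader.nextKeyValue()) {
                BytesWritable value = reader.getCurrentValue();
                System.out.println(file + " read as one record of " + value.getLength() + " bytes");
            }
            reader.close();
        }
    }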

    1.2 Extend the generic FileInputFormat class to define the whole-file input format.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    
    import java.io.IOException;
    
    
    public class WholeFileInputFormat extends FileInputFormat<NullWritable,BytesWritable> {
        /**
         * Is the given filename splittable? Usually, true, but if the file is
         * stream compressed, it will not be.
         * <p>
         * The default implementation in <code>FileInputFormat</code> always returns
         * true. Implementations that may deal with non-splittable files <i>must</i>
         * override this method.
         * <p>
         * <code>FileInputFormat</code> implementations can override this and return
         * <code>false</code> to ensure that individual input files are never split-up
         * so that mappers process entire files.
         *
         * @param context  the job context
         * @param filename the file name to check
         * @return is this file splitable?
         */
        @Override
        protected boolean isSplitable(JobContext context, Path filename) {
            return false; // do not split the file, so it can be read as a whole
        }
    
        /**
         * Create a record reader for a given split. The framework will call
         * {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before
         * the split is used.
         *
         * @param split   the split to be read
         * @param context the information about the task
         * @return a new record reader
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            WholeFileRecordReader recordReader = new WholeFileRecordReader();
            recordReader.initialize(split,context);
            return recordReader;
        }
    }

    2. The Mapper. No reducer is written here, since this example only merges files; the default identity reducer simply passes the (filename, bytes) pairs through to the output.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    import java.io.IOException;

    public class SequenceFileMapper extends Mapper<NullWritable,BytesWritable,Text,BytesWritable> {
        enum FileCounter {
            FILENUM
        }
        private Text filenameKey;
        /**
         * Called once at the beginning of the task.
         *
         * @param context
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            InputSplit split = context.getInputSplit();
            Path path = ((FileSplit)split).getPath();
            filenameKey = new Text(path.toString());
        }
    
        /**
         * Called once for each key/value pair in the input split. Most applications
         * should override this, but the default is the identity function.
         *
         * @param key
         * @param value
         * @param context
         */
        @Override
        protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
            context.write(filenameKey,value);
            // custom enum-based counter
            context.getCounter(FileCounter.FILENUM).increment(1);
            // dynamic counter, grouped under "FileNameList"
            context.getCounter("FileNameList",filenameKey.toString()).increment(1);
        }
    }
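
    The map method increments two counters: an enum-based custom counter and a dynamic counter keyed by file name. As a hedged sketch (not from the original post; the helper class CounterPrinter is hypothetical and assumed to live in the same package as SequenceFileMapper), the driver could print them after job.waitForCompletion(true) returns:

    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.Counters;
    import org.apache.hadoop.mapreduce.Job;

    import java.io.IOException;

    public class CounterPrinter {
        // call only after the job has completed
        static void printCounters(Job job) throws IOException {
            Counters counters = job.getCounters();
            // enum-based custom counter
            long fileNum = counters.findCounter(SequenceFileMapper.FileCounter.FILENUM).getValue();
            System.out.println("files merged: " + fileNum);
            // dynamic counters are collected under the "FileNameList" group
            for (Counter c : counters.getGroup("FileNameList")) {
                System.out.println(c.getName() + " = " + c.getValue());
            }
        }
    }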

    3. Run the job. The Tool helper class is used here, but it is optional; the job could also be configured and submitted directly (see the sketch after the driver code).

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class SmallFilesToSequenceFileConverter extends Configured implements Tool {
    
        /**
         * Execute the command with the given arguments.
         *
         * @param args command specific arguments.
         * @return exit code.
         * @throws Exception
         */
        @Override
        public int run(String[] args) throws Exception {
            Configuration conf = getConf();
            if(conf==null){
                return -1;
            }
    
            Path outPath = new Path(args[1]);
            FileSystem fileSystem = outPath.getFileSystem(conf);
        // delete the output path if it already exists
            if(fileSystem.exists(outPath))
            {
                fileSystem.delete(outPath,true);
            }
    
            Job job = Job.getInstance(conf,"SmallFilesToSequenceFile");
            job.setJarByClass(SmallFilesToSequenceFileConverter.class);
    
            job.setMapperClass(SequenceFileMapper.class);
    
            job.setInputFormatClass(WholeFileInputFormat.class);
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(BytesWritable.class);
    
    
    
            FileInputFormat.addInputPath(job,new Path(args[0]));
            FileOutputFormat.setOutputPath(job,new Path(args[1]));
    
    
            return job.waitForCompletion(true) ? 0:1;
        }
    
        public static void main(String[] args) throws Exception{
            long startTime = System.currentTimeMillis();

            int exitCode = ToolRunner.run(new SmallFilesToSequenceFileConverter(),args);

            long endTime = System.currentTimeMillis();
            long timeSpan = endTime - startTime;
            System.out.println("Elapsed time: " + timeSpan + " ms");

            // exit only after printing the elapsed time; calling System.exit earlier
            // would skip the timing output
            System.exit(exitCode);
        }
    }
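
    As noted in step 3, the Tool/ToolRunner helper is optional. A minimal hedged sketch of submitting the same job without it (the class name PlainDriver is hypothetical; generic options such as -D would then no longer be parsed automatically):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

    public class PlainDriver {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "SmallFilesToSequenceFile");
            job.setJarByClass(PlainDriver.class);
            job.setMapperClass(SequenceFileMapper.class);
            job.setInputFormatClass(WholeFileInputFormat.class);
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(BytesWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }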

    4. Upload to the cluster and run. When building the jar, put the META-INF directory at the same level as the src directory so that the job's entry point (main class) can be found.

    # Manually set the number of reducers to 2; the job then produces two part files
    [hadoop@bigdata-senior01 ~]$ hadoop jar SmallFilesToSequenceFileConverter.jar -D mapreduce.job.reduces=2 /demo /output3


    ...
    [hadoop@bigdata-senior01 ~]$ hadoop fs -ls /output3
    Found 3 items
    -rw-r--r--   1 hadoop supergroup          0 2019-02-18 16:17 /output3/_SUCCESS
    -rw-r--r--   1 hadoop supergroup      60072 2019-02-18 16:17 /output3/part-r-00000
    -rw-r--r--   1 hadoop supergroup      28520 2019-02-18 16:17 /output3/part-r-00001
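
    To verify the merged result, a hedged sketch of reading one part file back with SequenceFile.Reader (the path below is taken from the listing above; the class name SequenceFileDumper is hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    public class SequenceFileDumper {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Path path = new Path("/output3/part-r-00000"); // one of the merged outputs
            try (SequenceFile.Reader reader = new SequenceFile.Reader(conf,
                    SequenceFile.Reader.file(path))) {
                Text key = new Text();
                BytesWritable value = new BytesWritable();
                // each record is (original file path, whole file contents)
                while (reader.next(key, value)) {
                    System.out.println(key + " : " + value.getLength() + " bytes");
                }
            }
        }
    }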

  • Original article: https://www.cnblogs.com/asker009/p/10396364.html