Hadoop MapReduce InputFormat/OutputFormat

    InputFormat

    package org.apache.hadoop.mapreduce;

    import java.io.IOException;
    import java.util.List;
    
    /**
     * InputFormat describes the input-specification for a Map-Reduce job.
     * 
     * The Map-Reduce framework relies on the InputFormat of the job to:
     * 
     * Validate the input-specification of the job.
     * 
     * Split-up the input file(s) into logical InputSplits, each of which is then
     * assigned to an individual Mapper.
     * 
     * Provide the RecordReader implementation to be used to glean input records
     * from the logical InputSplit for processing by the Mapper.
     * 
     * The default behavior of file-based InputFormats, typically sub-classes of
     * FileInputFormat, is to split the input into logical InputSplits based on the
     * total size, in bytes, of the input files. However, the FileSystem blocksize
     * of the input files is treated as an upper bound for input splits. A lower
     * bound on the split size can be set via mapred.min.split.size.
     * 
     * Clearly, logical splits based on input size are insufficient for many
     * applications, since record boundaries must be respected. In such cases,
     * the application also has to implement a RecordReader, on which lies the
     * responsibility of respecting record boundaries and presenting a
     * record-oriented view of the logical InputSplit to the individual task.
     * 
     */
    public abstract class InputFormat<K, V> {
    
        /**
         * Logically split the set of input files for the job.
         * 
         * <p>
         * Each {@link InputSplit} is then assigned to an individual {@link Mapper}
         * for processing.
         * </p>
         * 
         * <p>
         * <i>Note</i>: The split is a <i>logical</i> split of the inputs and the
          * input files are not physically split into chunks. For example, a split
          * could be an <i>&lt;input-file-path, start, offset&gt;</i> tuple. The InputFormat
         * also creates the {@link RecordReader} to read the {@link InputSplit}.
         * 
         * @param context
         *            job configuration.
          * @return a list of {@link InputSplit}s for the job.
         */
        public abstract List<InputSplit> getSplits(JobContext context)
                throws IOException, InterruptedException;
    
        /**
         * Create a record reader for a given split. The framework will call
         * {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before
         * the split is used.
         * 
         * @param split
         *            the split to be read
         * @param context
         *            the information about the task
         * @return a new record reader
         * @throws IOException
         * @throws InterruptedException
         */
        public abstract RecordReader<K, V> createRecordReader(InputSplit split,
                TaskAttemptContext context) throws IOException,
                InterruptedException;
    
    }
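
    As a concrete illustration, here is a minimal sketch (not part of the Hadoop
    sources) of a file-based InputFormat that treats each file as a single,
    unsplittable record. The names WholeFileInputFormat and WholeFileRecordReader
    are hypothetical; the RecordReader half is sketched after the RecordReader
    section below.

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class WholeFileInputFormat
            extends FileInputFormat<NullWritable, BytesWritable> {

        // Returning false keeps each file in one split, so record boundaries
        // (here, whole files) are never broken; getSplits() is inherited from
        // FileInputFormat.
        @Override
        protected boolean isSplitable(JobContext context, Path file) {
            return false;
        }

        @Override
        public RecordReader<NullWritable, BytesWritable> createRecordReader(
                InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            return new WholeFileRecordReader();
        }
    }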

    InputSplit

    package org.apache.hadoop.mapreduce;

    import java.io.IOException;
    
    import org.apache.hadoop.mapreduce.InputFormat;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.RecordReader;
    
    /**
     * <code>InputSplit</code> represents the data to be processed by an individual
     * {@link Mapper}.
     * 
     * <p>
     * Typically, it presents a byte-oriented view of the input, and it is the
     * responsibility of the {@link RecordReader} of the job to process this and
     * present a record-oriented view.
     * 
     * @see InputFormat
     * @see RecordReader
     */
    public abstract class InputSplit {
    
        /**
         * Get the size of the split, so that the input splits can be sorted by
         * size.
         * 
         * @return the number of bytes in the split
         * @throws IOException
         * @throws InterruptedException
         */
        public abstract long getLength() throws IOException, InterruptedException;
    
        /**
         * Get the list of nodes by name where the data for the split would be
         * local. The locations do not need to be serialized.
         * 
          * @return a new array of node names.
         * @throws IOException
         * @throws InterruptedException
         */
        public abstract String[] getLocations() throws IOException,
                InterruptedException;
    
    }
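
    The standard file-based implementation of InputSplit is FileSplit, which is
    exactly the <input-file-path, start, length> tuple mentioned earlier, plus
    the hosts on which the bytes are local. A small sketch with hypothetical
    paths and host names:

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    public class FileSplitDemo {
        public static void main(String[] args)
                throws IOException, InterruptedException {
            // A 128 MB logical split starting at byte 0 of the input file.
            FileSplit split = new FileSplit(
                    new Path("hdfs:///data/input.txt"),
                    0L,
                    128L * 1024 * 1024,
                    new String[] { "node1", "node2" });

            System.out.println(split.getLength());      // 134217728
            for (String host : split.getLocations()) {  // where the bytes are local
                System.out.println(host);
            }
        }
    }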

    RecordReader

    package org.apache.hadoop.mapreduce;

    import java.io.Closeable;
    import java.io.IOException;
    
    /**
     * The record reader breaks the data into key/value pairs for input to the
     * {@link Mapper}.
     * 
     * @param <KEYIN> the type of the input keys
     * @param <VALUEIN> the type of the input values
     */
    public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
    
        /**
         * Called once at initialization.
         * 
         * @param split
         *            the split that defines the range of records to read
         * @param context
         *            the information about the task
         * @throws IOException
         * @throws InterruptedException
         */
        public abstract void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException;
    
        /**
         * Read the next key, value pair.
         * 
         * @return true if a key/value pair was read
         * @throws IOException
         * @throws InterruptedException
         */
        public abstract boolean nextKeyValue() throws IOException,
                InterruptedException;
    
        /**
          * Get the current key.
         * 
         * @return the current key or null if there is no current key
         * @throws IOException
         * @throws InterruptedException
         */
        public abstract KEYIN getCurrentKey() throws IOException,
                InterruptedException;
    
        /**
         * Get the current value.
         * 
         * @return the object that was read
         * @throws IOException
         * @throws InterruptedException
         */
        public abstract VALUEIN getCurrentValue() throws IOException,
                InterruptedException;
    
        /**
         * The current progress of the record reader through its data.
         * 
         * @return a number between 0.0 and 1.0 that is the fraction of the data
         *         read
         * @throws IOException
         * @throws InterruptedException
         */
        public abstract float getProgress() throws IOException,
                InterruptedException;
    
        /**
         * Close the record reader.
         */
        public abstract void close() throws IOException;
    
    }
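
    Continuing the hypothetical WholeFileInputFormat from above, a matching
    RecordReader can present the whole file as a single key/value pair. This is
    a sketch, not Hadoop source:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    public class WholeFileRecordReader
            extends RecordReader<NullWritable, BytesWritable> {

        private FileSplit fileSplit;
        private Configuration conf;
        private final BytesWritable value = new BytesWritable();
        private boolean processed = false;

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            this.fileSplit = (FileSplit) split;
            this.conf = context.getConfiguration();
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (processed) {
                return false; // each file yields exactly one record
            }
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }

        @Override
        public NullWritable getCurrentKey() {
            return NullWritable.get();
        }

        @Override
        public BytesWritable getCurrentValue() {
            return value;
        }

        @Override
        public float getProgress() {
            return processed ? 1.0f : 0.0f;
        }

        @Override
        public void close() throws IOException {
            // nothing to close; the stream is closed in nextKeyValue()
        }
    }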

    OutputFormat

    package org.apache.hadoop.mapreduce;

    import java.io.IOException;
    
    import org.apache.hadoop.fs.FileSystem;
    
    /**
     * <code>OutputFormat</code> describes the output-specification for a Map-Reduce
     * job.
     * 
     * <p>
     * The Map-Reduce framework relies on the <code>OutputFormat</code> of the job
     * to:
     * <p>
     * <ol>
     * <li>
     * Validate the output-specification of the job. For example, check that the output
     * directory doesn't already exist.
     * <li>
     * Provide the {@link RecordWriter} implementation to be used to write out the
     * output files of the job. Output files are stored in a {@link FileSystem}.</li>
     * </ol>
     * 
     * @see RecordWriter
     */
    public abstract class OutputFormat<K, V> {
    
        /**
         * Get the {@link RecordWriter} for the given task.
         * 
         * @param context
         *            the information about the current task.
         * @return a {@link RecordWriter} to write the output for the job.
         * @throws IOException
         */
        public abstract RecordWriter<K, V> getRecordWriter(
                TaskAttemptContext context) throws IOException,
                InterruptedException;
    
        /**
         * Check for validity of the output-specification for the job.
         * 
         * <p>
          * This is to validate the output specification for the job when it is
          * submitted. Typically this checks that the output does not already
          * exist, throwing an exception if it does, so that output is not
          * overwritten.
         * </p>
         * 
         * @param context
         *            information about the job
         * @throws IOException
         *             when output should not be attempted
         */
        public abstract void checkOutputSpecs(JobContext context)
                throws IOException, InterruptedException;
    
        /**
         * Get the output committer for this output format. This is responsible for
         * ensuring the output is committed correctly.
         * 
         * @param context
         *            the task context
         * @return an output committer
         * @throws IOException
         * @throws InterruptedException
         */
        public abstract OutputCommitter getOutputCommitter(
                TaskAttemptContext context) throws IOException,
                InterruptedException;
    
    }
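
    In a driver, the input and output formats are wired up on the Job; the
    framework then calls getSplits(), createRecordReader(), checkOutputSpecs()
    and getRecordWriter() on your behalf. A minimal sketch with placeholder
    paths:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class FormatWiring {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration(), "format-demo");
            job.setJarByClass(FormatWiring.class);

            // InputFormat: validates the input, computes splits, supplies the
            // RecordReader. setMinInputSplitSize is the programmatic form of
            // the mapred.min.split.size lower bound mentioned earlier.
            job.setInputFormatClass(TextInputFormat.class);
            FileInputFormat.addInputPath(job, new Path("/demo/in"));
            FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024);

            // OutputFormat: checkOutputSpecs() fails fast if /demo/out exists.
            job.setOutputFormatClass(TextOutputFormat.class);
            FileOutputFormat.setOutputPath(job, new Path("/demo/out"));

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }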

    RecordWriter

    package org.apache.hadoop.mapreduce;

    import java.io.IOException;
    
    import org.apache.hadoop.fs.FileSystem;
    
    /**
     * <code>RecordWriter</code> writes the output &lt;key, value&gt; pairs to an
     * output file.
     * 
     * <p>
     * <code>RecordWriter</code> implementations write the job outputs to the
     * {@link FileSystem}.
     * 
     * @see OutputFormat
     */
    public abstract class RecordWriter<K, V> {
    
        /**
         * Writes a key/value pair.
         * 
         * @param key
         *            the key to write.
         * @param value
         *            the value to write.
         * @throws IOException
         */
        public abstract void write(K key, V value) throws IOException,
                InterruptedException;
    
        /**
         * Close this <code>RecordWriter</code> to future operations.
         * 
         * @param context
         *            the context of the task
         * @throws IOException
         */
        public abstract void close(TaskAttemptContext context) throws IOException,
                InterruptedException;
    
    }
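
    For illustration, a minimal RecordWriter in the spirit of TextOutputFormat's
    line writer, emitting "key<TAB>value" lines. This is a sketch; a real
    OutputFormat would construct it around a stream obtained from
    FileSystem.create():

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public class TabSeparatedRecordWriter<K, V> extends RecordWriter<K, V> {

        private final DataOutputStream out;

        public TabSeparatedRecordWriter(DataOutputStream out) {
            this.out = out;
        }

        @Override
        public void write(K key, V value) throws IOException,
                InterruptedException {
            out.write(key.toString().getBytes(StandardCharsets.UTF_8));
            out.write('\t');
            out.write(value.toString().getBytes(StandardCharsets.UTF_8));
            out.write('\n');
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException,
                InterruptedException {
            out.close();
        }
    }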

    OutputCommitter

    package org.apache.hadoop.mapreduce;

    import java.io.IOException;
    
    /**
     * <code>OutputCommitter</code> describes the commit of task output for a
     * Map-Reduce job.
     * 
     * <p>
     * The Map-Reduce framework relies on the <code>OutputCommitter</code> of the
     * job to:
     * <p>
     * <ol>
     * <li>
     * Setup the job during initialization. For example, create the temporary output
     * directory for the job during the initialization of the job.</li>
     * <li>
     * Cleanup the job after the job completion. For example, remove the temporary
     * output directory after the job completion.</li>
     * <li>
     * Setup the task temporary output.</li>
     * <li>
     * Check whether a task needs a commit. This is to avoid the commit procedure if
     * a task does not need commit.</li>
     * <li>
     * Commit of the task output.</li>
     * <li>
     * Discard the task commit.</li>
     * </ol>
     * 
     * @see org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
     * @see JobContext
     * @see TaskAttemptContext
     * 
     */
    public abstract class OutputCommitter {
    
        /**
         * For the framework to set up the job output during initialization.
         * 
         * @param jobContext
         *            Context of the job whose output is being written.
         * @throws IOException
         *             if temporary output could not be created
         */
        public abstract void setupJob(JobContext jobContext) throws IOException;
    
        /**
         * For cleaning up the job's output after job completion
         * 
         * @param jobContext
         *            Context of the job whose output is being written.
         * @throws IOException
         */
        public abstract void cleanupJob(JobContext jobContext) throws IOException;
    
        /**
         * Sets up output for the task.
         * 
         * @param taskContext
         *            Context of the task whose output is being written.
         * @throws IOException
         */
        public abstract void setupTask(TaskAttemptContext taskContext)
                throws IOException;
    
        /**
         * Check whether the task needs a commit.
         * 
         * @param taskContext
         *            Context of the task whose output is being written.
         * @return true if the task needs a commit, false otherwise
         * @throws IOException
         */
        public abstract boolean needsTaskCommit(TaskAttemptContext taskContext)
                throws IOException;
    
        /**
         * To promote the task's temporary output to the final output location.
         * 
         * The task's output is moved to the job's output directory.
         * 
         * @param taskContext
         *            Context of the task whose output is being written.
         * @throws IOException
         *             if the commit is not successful
         */
        public abstract void commitTask(TaskAttemptContext taskContext)
                throws IOException;
    
        /**
         * Discard the task output.
         * 
         * @param taskContext
         *            Context of the task whose output is being written.
         * @throws IOException
         */
        public abstract void abortTask(TaskAttemptContext taskContext)
                throws IOException;
    
    }
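
    To make the lifecycle above concrete, this simplified sketch shows the
    order in which the framework drives an OutputCommitter (in reality these
    calls are spread across different processes and nodes, and abortTask() is
    invoked by the framework when an attempt fails):

    import java.io.IOException;

    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public class CommitterLifecycle {
        static void run(OutputCommitter committer, JobContext job,
                TaskAttemptContext task) throws IOException {
            committer.setupJob(job);    // once per job, e.g. create temp output dir
            committer.setupTask(task);  // once per task attempt

            // ... the task writes its output through its RecordWriter ...

            if (committer.needsTaskCommit(task)) {
                committer.commitTask(task); // promote temp output to final location
            }
            committer.cleanupJob(job);  // once, after all tasks, e.g. remove temp dir
        }
    }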

Original article: https://www.cnblogs.com/yurunmiao/p/3558511.html