zoukankan      html  css  js  c++  java
  • Hadoop学习之路(二十二)MapReduce的输入和输出

    MapReduce的输入

    作为一个会编写MR程序的人来说,知道map方法的参数是默认的数据读取组件读取到的一行数据

    1、是谁在读取? 是谁在调用这个map方法?

    查看源码Mapper.java知道是run方法在调用map方法。

     1 /**
     2      * 
     3      * 找出谁在调用Run方法
     4      * 
     5      * 
     6      * 有一个组件叫做:MapTask
     7      * 
     8      * 就会有对应的方法在调用mapper.run(context);
     9      * 
    10      * 
    11      * context.nextKeyValue() ====== lineRecordReader.nextKeyValue();
    12      */
    13     public void run(Context context) throws IOException, InterruptedException {
    14 
    15         /**
    16          * 在每一个mapTask被初始化出来的时候,就会被调用一次
    17          */
    18         setup(context);
    19         try {
    20 
    21             /**
    22              * 数据读取组件每次读取到一行,都交给map方法执行一次
    23              * 
    24              * 
    25              * context.nextKeyValue()的意义有连点:
    26              * 
    27              * 1、读取一个key-value到该context对象中的两个属性中:key-value
    28              * 2、方法的返回值并不是读取到的key-value,是标志有没有读取到key_value的布尔值
    29              * 
    30              * 
    31              * context.getCurrentKey() ==== key
    32              * context.getCurrentValue() ==== value
    33              * 
    34              * 
    35              * 
    36              * 依赖于最底层的 LineRecordReader的实现
    37              * 
    38              * 你的nextKeyValue方法的返回结果中,一定要包含 false
    39              */
    40             while (context.nextKeyValue()) {
    41                 map(context.getCurrentKey(), context.getCurrentValue(), context);
    42             }
    43 
    44         } finally {
    45 
    46             /**
    47              * 当前这个mapTask在执行完毕所有的该切片数据之后,会执行
    48              */
    49             cleanup(context);
    50         }
    51     }

    此处map方法中有四个重要的方法:

    1、context.nextKeyValue(); //负责读取数据,但是方法的返回值却不是读取到的key-value,而是返回了一个标识有没有读取到数据的布尔值

    2、context.getCurrentKey(); //负责获取context.nextKeyValue() 读取到的key

    3、context.getCurrentValue(); //负责获取context.nextKeyValue() 读取到的value

    4、context.write(key,value); //负责输出mapper阶段输出的数据

    2、谁在调用run方法?context参数怎么来的,是什么?

    共同答案:找到了谁在调用run方法,那么就能知道这个谁就会给run方法传入一个参数叫做:context

    最开始,mapper.run(context)是由mapTask实例对象进行调用

    查看源码MapTask.java

     1 @Override
     2     public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
     3             throws IOException, ClassNotFoundException, InterruptedException {
     4         this.umbilical = umbilical;
     5 
     6         if (isMapTask()) {
     7             // If there are no reducers then there won't be any sort. Hence the
     8             // map
     9             // phase will govern the entire attempt's progress.
    10             if (conf.getNumReduceTasks() == 0) {
    11                 mapPhase = getProgress().addPhase("map", 1.0f);
    12             } else {
    13                 // If there are reducers then the entire attempt's progress will
    14                 // be
    15                 // split between the map phase (67%) and the sort phase (33%).
    16                 mapPhase = getProgress().addPhase("map", 0.667f);
    17                 sortPhase = getProgress().addPhase("sort", 0.333f);
    18             }
    19         }
    20         TaskReporter reporter = startReporter(umbilical);
    21 
    22         boolean useNewApi = job.getUseNewMapper();
    23         initialize(job, getJobID(), reporter, useNewApi);
    24 
    25         // check if it is a cleanupJobTask
    26         if (jobCleanup) {
    27             runJobCleanupTask(umbilical, reporter);
    28             return;
    29         }
    30         if (jobSetup) {
    31             runJobSetupTask(umbilical, reporter);
    32             return;
    33         }
    34         if (taskCleanup) {
    35             runTaskCleanupTask(umbilical, reporter);
    36             return;
    37         }
    38 
    39         /**
    40          * run方法的核心:
    41          * 
    42          * 新的API
    43          */
    44 
    45         if (useNewApi) {
    46             /**
    47              * jobConf对象, splitMetaInfo 切片信息 umbilical 通信协议
    48              * reporter就是包含了各种计数器的一个对象
    49              */
    50             runNewMapper(job, splitMetaInfo, umbilical, reporter);
    51         } else {
    52             runOldMapper(job, splitMetaInfo, umbilical, reporter);
    53         }
    54 
    55         done(umbilical, reporter);
    56     }

    得出伪代码调动新的API

    1        mapTask.run(){
    2                 runNewMapper(){
    3                     mapper.run(mapperContext);
    4                 }
    5             }

    3、查看runNewMapper方法

    发现此方法还是在MapTask.java中

      1 /**
      2      * 这就是具体的调用逻辑的核心;
      3      * 
      4      * 
      5      * mapper.run(context);
      6      * 
      7      * 
      8      * 
      9      * @param job
     10      * @param splitIndex
     11      * @param umbilical
     12      * @param reporter
     13      * @throws IOException
     14      * @throws ClassNotFoundException
     15      * @throws InterruptedException
     16      */
     17     @SuppressWarnings("unchecked")
     18     private <INKEY, INVALUE, OUTKEY, OUTVALUE> void runNewMapper(final JobConf job, final TaskSplitIndex splitIndex,
     19             final TaskUmbilicalProtocol umbilical, TaskReporter reporter)
     20             throws IOException, ClassNotFoundException, InterruptedException {
     21         // make a task context so we can get the classes
     22         org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(
     23                 job, getTaskID(), reporter);
     24         // make a mapper
     25         org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE> mapper = (org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE>) ReflectionUtils
     26                 .newInstance(taskContext.getMapperClass(), job);
     27         
     28         
     29         
     30         
     31         /**
     32          * inputFormat.createRecordReader() === RecordReader real
     33          * 
     34          * 
     35          * inputFormat就是TextInputFormat类的实例对象
     36          * 
     37          * TextInputFormat组件中的createRecordReader方法的返回值就是  LineRecordReader的实例对象
     38          */
     39         // make the input format
     40         org.apache.hadoop.mapreduce.InputFormat<INKEY, INVALUE> inputFormat = 
     41                 (org.apache.hadoop.mapreduce.InputFormat<INKEY, INVALUE>) 
     42                 ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
     43         
     44         
     45         
     46         
     47         
     48         // rebuild the input split
     49         org.apache.hadoop.mapreduce.InputSplit split = null;
     50         split = getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset());
     51         LOG.info("Processing split: " + split);
     52 
     53         /**
     54          * NewTrackingRecordReader这个类中一定有三个方法:
     55          * 
     56          * nextKeyValue
     57          * getCurrentKey
     58          * getCurrentValue
     59          * 
     60          * NewTrackingRecordReader的里面的三个方法的实现
     61          * 其实是依赖于于inputFormat对象的createRecordReader方法的返回值的  三个方法的实现
     62          * 
     63          * 默认的InputFormat: TextInputFormat
     64          * 默认的RecordReader:LineRecordReader
     65          * 
     66          * 
     67          * 最终:NewTrackingRecordReader的三个方法的实现是依赖于:LineRecordReader这个类中的三个同名方法的实现
     68          */
     69         org.apache.hadoop.mapreduce.RecordReader<INKEY, INVALUE> input = 
     70                 new NewTrackingRecordReader<INKEY, INVALUE>(
     71                 split, inputFormat, reporter, taskContext);
     72 
     73         job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
     74         
     75         
     76         
     77         
     78         
     79         /**
     80          * 声明一个Output对象用来给mapper的key-value进行输出
     81          */
     82         org.apache.hadoop.mapreduce.RecordWriter output = null;
     83         // get an output object
     84         if (job.getNumReduceTasks() == 0) {
     85             
     86             /**
     87              * NewDirectOutputCollector  直接输出的一个收集器,  这个类中一定有一个方法 叫做  write
     88              */
     89             output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
     90         } else {
     91             
     92             
     93             /**
     94              * 有reducer阶段了。
     95              * 
     96              *         1、能确定,一定会排序
     97              * 
     98              *         2、能否确定一定会使用Parititioner,  不一定。     在逻辑上可以任务没有起作用。
     99              * 
    100              * NewOutputCollector 这个类当中,一定有一个方法:write方法
    101              */
    102             output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    103         }
    104         
    105         
    106         
    107         
    108 
    109         /**
    110          *  mapContext对象中一定包含三个方法
    111          *  
    112          *  找到了之前第一查看源码实现的方法的问题的答案:
    113          *  
    114          *      问题:找到谁调用MapContextImpl这个类的构造方法
    115          *  
    116          *      mapContext就是MapContextImpl的实例对象
    117          *      
    118          *      MapContextImpl类中一定有三个方法:
    119          *      
    120          *      input  ===  NewTrackingRecordReader
    121          *      
    122          *      
    123          *      
    124          *      确定的知识:
    125          *      
    126          *      1、mapContext对象中,一定有write方法
    127          *      
    128          *      2、通过观看MapContextImpl的组成,发现其实没有write方法
    129          *      
    130          *      解决:
    131          *      
    132          *      其实mapContext.write方法的调用是来自于MapContextImpl这个类的父类
    133          *      
    134          *      
    135          *      
    136          *      最底层的write方法:  output.write();
    137          */
    138         org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> mapContext = 
    139                 new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(
    140                 job, getTaskID(), input, output, committer, reporter, split);
    141 
    142         /**
    143          * mapperContext的内部一定包含是三个犯法:
    144          * 
    145          * nextKeyValue
    146          * getCurrentKey
    147          * getCurrentValue
    148          * 
    149          * mapperContext的具体实现是依赖于new Context(context);
    150          * context = mapContext
    151          * 
    152          * 结论:
    153          * 
    154          * mapContext对象的内部一定包含以下三个方法:
    155          * 
    156          * nextKeyValue
    157          * getCurrentKey
    158          * getCurrentValue
    159          * 
    160          * 
    161          * mapContext 中 也有一个方法叫做:write(key,value)
    162          */
    163         org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE>.Context mapperContext = 
    164                 new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>()
    165                 .getMapContext(mapContext);
    166 
    167         try {
    168             
    169             
    170             
    171             
    172             input.initialize(split, mapperContext);
    173             
    174             
    175             
    176             /**
    177              * 复杂调用整个mapTask执行的入口
    178              * 
    179              * 方法的逻辑构成:
    180              * 
    181              *     1、重点方法在最后,或者在try中
    182              *  2、其他的代码,几乎只有两个任务:一个用来记录记日志或者完善流程。。 一个准备核心方法的参数
    183              */
    184             mapper.run(mapperContext);
    185             
    186             
    187             
    188             mapPhase.complete();
    189             setPhase(TaskStatus.Phase.SORT);
    190             statusUpdate(umbilical);
    191             input.close();
    192             input = null;
    193             output.close(mapperContext);
    194             output = null;
    195             
    196             
    197             
    198         } finally {
    199             closeQuietly(input);
    200             closeQuietly(output, mapperContext);
    201         }
    202     }

    能确定的是:mapperContext一定有上面说的那四个重要的方法,往上继续查找mapperContext

     /**
    143          * mapperContext的内部一定包含是三个犯法:
    144          * 
    145          * nextKeyValue
    146          * getCurrentKey
    147          * getCurrentValue
    148          * 
    149          * mapperContext的具体实现是依赖于new Context(context);
    150          * context = mapContext
    151          * 
    152          * 结论:
    153          * 
    154          * mapContext对象的内部一定包含以下三个方法:
    155          * 
    156          * nextKeyValue
    157          * getCurrentKey
    158          * getCurrentValue
    159          * 
    160          * 
    161          * mapContext 中 也有一个方法叫做:write(key,value)
    162          */
    163         org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE>.Context mapperContext = 
    164                 new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>()
    165                 .getMapContext(mapContext);

    查看WrappedMapper.java

      1 /**
      2  * Licensed to the Apache Software Foundation (ASF) under one
      3  * or more contributor license agreements.  See the NOTICE file
      4  * distributed with this work for additional information
      5  * regarding copyright ownership.  The ASF licenses this file
      6  * to you under the Apache License, Version 2.0 (the
      7  * "License"); you may not use this file except in compliance
      8  * with the License.  You may obtain a copy of the License at
      9  *
     10  *     http://www.apache.org/licenses/LICENSE-2.0
     11  *
     12  * Unless required by applicable law or agreed to in writing, software
     13  * distributed under the License is distributed on an "AS IS" BASIS,
     14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     15  * See the License for the specific language governing permissions and
     16  * limitations under the License.
     17  */
     18 
     19 package org.apache.hadoop.mapreduce.lib.map;
     20 
     21 import java.io.IOException;
     22 import java.net.URI;
     23 
     24 import org.apache.hadoop.classification.InterfaceAudience;
     25 import org.apache.hadoop.classification.InterfaceStability;
     26 import org.apache.hadoop.conf.Configuration;
     27 import org.apache.hadoop.conf.Configuration.IntegerRanges;
     28 import org.apache.hadoop.fs.Path;
     29 import org.apache.hadoop.io.RawComparator;
     30 import org.apache.hadoop.mapreduce.Counter;
     31 import org.apache.hadoop.mapreduce.InputFormat;
     32 import org.apache.hadoop.mapreduce.InputSplit;
     33 import org.apache.hadoop.mapreduce.JobID;
     34 import org.apache.hadoop.mapreduce.MapContext;
     35 import org.apache.hadoop.mapreduce.Mapper;
     36 import org.apache.hadoop.mapreduce.OutputCommitter;
     37 import org.apache.hadoop.mapreduce.OutputFormat;
     38 import org.apache.hadoop.mapreduce.Partitioner;
     39 import org.apache.hadoop.mapreduce.Reducer;
     40 import org.apache.hadoop.mapreduce.TaskAttemptID;
     41 import org.apache.hadoop.security.Credentials;
     42 
     43 /**
     44  * A {@link Mapper} which wraps a given one to allow custom
     45  * {@link Mapper.Context} implementations.
     46  */
     47 @InterfaceAudience.Public
     48 @InterfaceStability.Evolving
     49 public class WrappedMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
     50 
     51     /**
     52      * Get a wrapped {@link Mapper.Context} for custom implementations.
     53      * 
     54      * @param mapContext
     55      *            <code>MapContext</code> to be wrapped
     56      * @return a wrapped <code>Mapper.Context</code> for custom implementations
     57      */
     58     public Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context getMapContext(
     59             MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext) {
     60         return new Context(mapContext);
     61     }
     62 
     63     @InterfaceStability.Evolving
     64     public class Context extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context {
     65 
     66         protected MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext;
     67 
     68         public Context(MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext) {
     69             this.mapContext = mapContext;
     70         }
     71 
     72         /**
     73          * Get the input split for this map.
     74          */
     75         public InputSplit getInputSplit() {
     76             return mapContext.getInputSplit();
     77         }
     78 
     79         @Override
     80         public KEYIN getCurrentKey() throws IOException, InterruptedException {
     81             return mapContext.getCurrentKey();
     82         }
     83 
     84         @Override
     85         public VALUEIN getCurrentValue() throws IOException, InterruptedException {
     86             return mapContext.getCurrentValue();
     87         }
     88 
     89         @Override
     90         public boolean nextKeyValue() throws IOException, InterruptedException {
     91             return mapContext.nextKeyValue();
     92         }
     93 
     94         @Override
     95         public Counter getCounter(Enum<?> counterName) {
     96             return mapContext.getCounter(counterName);
     97         }
     98 
     99         @Override
    100         public Counter getCounter(String groupName, String counterName) {
    101             return mapContext.getCounter(groupName, counterName);
    102         }
    103 
    104         @Override
    105         public OutputCommitter getOutputCommitter() {
    106             return mapContext.getOutputCommitter();
    107         }
    108 
    109         @Override
    110         public void write(KEYOUT key, VALUEOUT value) throws IOException, InterruptedException {
    111             mapContext.write(key, value);
    112         }
    113 
    114         @Override
    115         public String getStatus() {
    116             return mapContext.getStatus();
    117         }
    118 
    119         @Override
    120         public TaskAttemptID getTaskAttemptID() {
    121             return mapContext.getTaskAttemptID();
    122         }
    123 
    124         @Override
    125         public void setStatus(String msg) {
    126             mapContext.setStatus(msg);
    127         }
    128 
    129         @Override
    130         public Path[] getArchiveClassPaths() {
    131             return mapContext.getArchiveClassPaths();
    132         }
    133 
    134         @Override
    135         public String[] getArchiveTimestamps() {
    136             return mapContext.getArchiveTimestamps();
    137         }
    138 
    139         @Override
    140         public URI[] getCacheArchives() throws IOException {
    141             return mapContext.getCacheArchives();
    142         }
    143 
    144         @Override
    145         public URI[] getCacheFiles() throws IOException {
    146             return mapContext.getCacheFiles();
    147         }
    148 
    149         @Override
    150         public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass() throws ClassNotFoundException {
    151             return mapContext.getCombinerClass();
    152         }
    153 
    154         @Override
    155         public Configuration getConfiguration() {
    156             return mapContext.getConfiguration();
    157         }
    158 
    159         @Override
    160         public Path[] getFileClassPaths() {
    161             return mapContext.getFileClassPaths();
    162         }
    163 
    164         @Override
    165         public String[] getFileTimestamps() {
    166             return mapContext.getFileTimestamps();
    167         }
    168 
    169         @Override
    170         public RawComparator<?> getCombinerKeyGroupingComparator() {
    171             return mapContext.getCombinerKeyGroupingComparator();
    172         }
    173 
    174         @Override
    175         public RawComparator<?> getGroupingComparator() {
    176             return mapContext.getGroupingComparator();
    177         }
    178 
    179         @Override
    180         public Class<? extends InputFormat<?, ?>> getInputFormatClass() throws ClassNotFoundException {
    181             return mapContext.getInputFormatClass();
    182         }
    183 
    184         @Override
    185         public String getJar() {
    186             return mapContext.getJar();
    187         }
    188 
    189         @Override
    190         public JobID getJobID() {
    191             return mapContext.getJobID();
    192         }
    193 
    194         @Override
    195         public String getJobName() {
    196             return mapContext.getJobName();
    197         }
    198 
    199         @Override
    200         public boolean getJobSetupCleanupNeeded() {
    201             return mapContext.getJobSetupCleanupNeeded();
    202         }
    203 
    204         @Override
    205         public boolean getTaskCleanupNeeded() {
    206             return mapContext.getTaskCleanupNeeded();
    207         }
    208 
    209         @Override
    210         public Path[] getLocalCacheArchives() throws IOException {
    211             return mapContext.getLocalCacheArchives();
    212         }
    213 
    214         @Override
    215         public Path[] getLocalCacheFiles() throws IOException {
    216             return mapContext.getLocalCacheFiles();
    217         }
    218 
    219         @Override
    220         public Class<?> getMapOutputKeyClass() {
    221             return mapContext.getMapOutputKeyClass();
    222         }
    223 
    224         @Override
    225         public Class<?> getMapOutputValueClass() {
    226             return mapContext.getMapOutputValueClass();
    227         }
    228 
    229         @Override
    230         public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass() throws ClassNotFoundException {
    231             return mapContext.getMapperClass();
    232         }
    233 
    234         @Override
    235         public int getMaxMapAttempts() {
    236             return mapContext.getMaxMapAttempts();
    237         }
    238 
    239         @Override
    240         public int getMaxReduceAttempts() {
    241             return mapContext.getMaxReduceAttempts();
    242         }
    243 
    244         @Override
    245         public int getNumReduceTasks() {
    246             return mapContext.getNumReduceTasks();
    247         }
    248 
    249         @Override
    250         public Class<? extends OutputFormat<?, ?>> getOutputFormatClass() throws ClassNotFoundException {
    251             return mapContext.getOutputFormatClass();
    252         }
    253 
    254         @Override
    255         public Class<?> getOutputKeyClass() {
    256             return mapContext.getOutputKeyClass();
    257         }
    258 
    259         @Override
    260         public Class<?> getOutputValueClass() {
    261             return mapContext.getOutputValueClass();
    262         }
    263 
    264         @Override
    265         public Class<? extends Partitioner<?, ?>> getPartitionerClass() throws ClassNotFoundException {
    266             return mapContext.getPartitionerClass();
    267         }
    268 
    269         @Override
    270         public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass() throws ClassNotFoundException {
    271             return mapContext.getReducerClass();
    272         }
    273 
    274         @Override
    275         public RawComparator<?> getSortComparator() {
    276             return mapContext.getSortComparator();
    277         }
    278 
    279         @Override
    280         public boolean getSymlink() {
    281             return mapContext.getSymlink();
    282         }
    283 
    284         @Override
    285         public Path getWorkingDirectory() throws IOException {
    286             return mapContext.getWorkingDirectory();
    287         }
    288 
    289         @Override
    290         public void progress() {
    291             mapContext.progress();
    292         }
    293 
    294         @Override
    295         public boolean getProfileEnabled() {
    296             return mapContext.getProfileEnabled();
    297         }
    298 
    299         @Override
    300         public String getProfileParams() {
    301             return mapContext.getProfileParams();
    302         }
    303 
    304         @Override
    305         public IntegerRanges getProfileTaskRange(boolean isMap) {
    306             return mapContext.getProfileTaskRange(isMap);
    307         }
    308 
    309         @Override
    310         public String getUser() {
    311             return mapContext.getUser();
    312         }
    313 
    314         @Override
    315         public Credentials getCredentials() {
    316             return mapContext.getCredentials();
    317         }
    318 
    319         @Override
    320         public float getProgress() {
    321             return mapContext.getProgress();
    322         }
    323     }
    324 }
    View Code

    此类里面一定有那4个重要的方法,发现调用了mapContext,继续往上找

    /**
    110          *  mapContext对象中一定包含三个方法
    111          *  
    112          *  找到了之前第一查看源码实现的方法的问题的答案:
    113          *  
    114          *      问题:找到谁调用MapContextImpl这个类的构造方法
    115          *  
    116          *      mapContext就是MapContextImpl的实例对象
    117          *      
    118          *      MapContextImpl类中一定有三个方法:
    119          *      
    120          *      input  ===  NewTrackingRecordReader
    121          *      
    122          *      
    123          *      
    124          *      确定的知识:
    125          *      
    126          *      1、mapContext对象中,一定有write方法
    127          *      
    128          *      2、通过观看MapContextImpl的组成,发现其实没有write方法
    129          *      
    130          *      解决:
    131          *      
    132          *      其实mapContext.write方法的调用是来自于MapContextImpl这个类的父类
    133          *      
    134          *      
    135          *      
    136          *      最底层的write方法:  output.write();
    137          */
    138         org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> mapContext = 
    139                 new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(
    140                 job, getTaskID(), input, output, committer, reporter, split);

    mapConext就是这个类MapContextImpl的实例对象

    继续确定:

    mapConext = new MapContextImpl(input)
    mapConext.nextKeyVlaue(){
    
    LineRecordReader real = input.createRecordReader();
    
    real.nextKeyValue();
    }

    查看MapContextImpl.java源码

     1 public class MapContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     2         extends TaskInputOutputContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     3         implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
     4     
     5     
     6     private RecordReader<KEYIN, VALUEIN> reader;
     7     private InputSplit split;
     8 
     9     public MapContextImpl(Configuration conf, 
    10             TaskAttemptID taskid, 
    11             RecordReader<KEYIN, VALUEIN> reader,
    12             RecordWriter<KEYOUT, VALUEOUT> writer, 
    13             OutputCommitter committer, 
    14             StatusReporter reporter,
    15             InputSplit split) {
    16         
    17         
    18         
    19         // 通过super调用父类的构造方法
    20         super(conf, taskid, writer, committer, reporter);
    21         
    22         
    23         
    24         this.reader = reader;
    25         this.split = split;
    26     }
    27 
    28     /**
    29      * Get the input split for this map.
    30      */
    31     public InputSplit getInputSplit() {
    32         return split;
    33     }
    
    40     @Override
    41     public KEYIN getCurrentKey() throws IOException, InterruptedException {
    42         return reader.getCurrentKey();
    43     }
    44 
    45     @Override
    46     public VALUEIN getCurrentValue() throws IOException, InterruptedException {
    47         return reader.getCurrentValue();
    48     }
    49 
    50     @Override
    51     public boolean nextKeyValue() throws IOException, InterruptedException {
    52         return reader.nextKeyValue();
    53     }
    54     
    55     
    56     
    57 
    58 }
  • 相关阅读:
    SpringBoot: 2.SpringBoot整合servlet(转)
    SpringBoot: 1.创建第一个SpringBoot项目(转)
    SqlServer:SqlServer(服务器磁盘监控,创建管理员账号分配权,添加链接服务器,查询CPU,查询内存)
    Log parser工具使用
    目录爆破工具
    安装腾讯QQ问题记录
    Sql Server 2017 安装问题记录
    reGeorg+Proxifier使用
    Swaks
    python2.7 安装pycrypto库报错
  • 原文地址:https://www.cnblogs.com/qingyunzong/p/8615013.html
Copyright © 2011-2022 走看看