Hadoop Source Code Series: Reading the Mapper Output in the Source Code

I. Preface

Last time we finished MapReduce input; this time we move on to MapReduce output. Keep the MapReduce primitive in mind, because it matters:

Keys that are "the same" form one group; the reduce method is called once per group, and inside that method you iterate over the group's data to do the computation.
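To make the primitive concrete, here is a minimal reducer sketch (hypothetical word-count style code, not taken from the MapTask source): the framework calls reduce() once per key group, and the values iterator walks exactly that group.

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // Minimal sketch: one reduce() call per key group, iterating that group's values.
    public class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
      @Override
      protected void reduce(Text key, Iterable<IntWritable> values, Context context)
          throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) { // all values that share this key
          sum += v.get();
        }
        context.write(key, new IntWritable(sum));
      }
    }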

II. Code

Let's continue with the MapTask.

    private <INKEY,INVALUE,OUTKEY,OUTVALUE>
      void runNewMapper(final JobConf job,
                        final TaskSplitIndex splitIndex,
                        final TaskUmbilicalProtocol umbilical,
                        TaskReporter reporter
                        ) throws IOException, ClassNotFoundException,
                                 InterruptedException {
        // make a task context so we can get the classes
        org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
          new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, 
                                                                      getTaskID(),
                                                                      reporter);
        // make a mapper
        org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
          (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
            ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
        // make the input format
        org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
          (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
            ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
        // rebuild the input split
        org.apache.hadoop.mapreduce.InputSplit split = null;
        split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
            splitIndex.getStartOffset());
        LOG.info("Processing split: " + split);
    
        org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
          new NewTrackingRecordReader<INKEY,INVALUE>
            (split, inputFormat, reporter, taskContext);
        
        job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
        org.apache.hadoop.mapreduce.RecordWriter output = null;
        
        // get an output object
        if (job.getNumReduceTasks() == 0) {
          output = 
            new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
        } else {
      output = new NewOutputCollector(taskContext, job, umbilical, reporter); // source analysis 1
        }
    
        org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
        mapContext = 
          new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), 
              input, output, 
              committer, 
              reporter, split);
    
        org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
            mapperContext = 
              new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
                  mapContext);
    
        try {
          input.initialize(split, mapperContext);
          mapper.run(mapperContext);
          mapPhase.complete();
          setPhase(TaskStatus.Phase.SORT);
          statusUpdate(umbilical);
          input.close();
          input = null;
          output.close(mapperContext);
          output = null;
        } finally {
          closeQuietly(input);
          closeQuietly(output, mapperContext);
        }
      }

Analysis 1: constructing the output object.

     NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                           JobConf job,
                           TaskUmbilicalProtocol umbilical,
                           TaskReporter reporter
                           ) throws IOException, ClassNotFoundException {
      collector = createSortingCollector(job, reporter); // see source analysis 1.2
      partitions = jobContext.getNumReduceTasks(); // the number of partitions equals the number of reduce tasks; a partition is a broader notion than a group
          if (partitions > 1) {
            partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
          ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job); // see source analysis 1.1
          } else {
            partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
              @Override
              public int getPartition(K key, V value, int numPartitions) {
            return partitions - 1; // when the user sets nothing, the framework defaults to a single reduce task, so the partition number is always 0
              }
            };
          }
        }
      @Override
        public void write(K key, V value) throws IOException, InterruptedException {
          collector.collect(key, value,
                        partitioner.getPartition(key, value, partitions)); // values written through the context are placed into the collect buffer
        }


Analysis 1.1

    public Class<? extends Partitioner<?,?>> getPartitionerClass()
       throws ClassNotFoundException {
      return (Class<? extends Partitioner<?,?>>)
        conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class); // use the user's partitioner if one is set, otherwise default to HashPartitioner; see source analysis 1.1.1
    }
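As a hedged illustration of the user-set branch: a job can plug in its own Partitioner through job.setPartitionerClass. The class name and the "err_" routing rule below are hypothetical.

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    // Hypothetical custom partitioner: route keys starting with "err_" to partition 0,
    // everything else falls back to the HashPartitioner formula.
    public class PrefixPartitioner extends Partitioner<Text, IntWritable> {
      @Override
      public int getPartition(Text key, IntWritable value, int numPartitions) {
        if (key.toString().startsWith("err_")) {
          return 0;
        }
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
      }
    }

    // In the driver: job.setPartitionerClass(PrefixPartitioner.class);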

Source analysis 1.2: the implementation of createSortingCollector

     private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
              createSortingCollector(JobConf job, TaskReporter reporter)
        throws IOException, ClassNotFoundException {
        MapOutputCollector.Context context =
          new MapOutputCollector.Context(this, job, reporter);
    
        Class<?>[] collectorClasses = job.getClasses(
          JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);
        int remainingCollectors = collectorClasses.length;
        for (Class clazz : collectorClasses) {
          try {
            if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
              throw new IOException("Invalid output collector class: " + clazz.getName() +
                " (does not implement MapOutputCollector)");
            }
            Class<? extends MapOutputCollector> subclazz =
              clazz.asSubclass(MapOutputCollector.class);
            LOG.debug("Trying map output collector class: " + subclazz.getName());
            MapOutputCollector<KEY, VALUE> collector =
              ReflectionUtils.newInstance(subclazz, job);
            collector.init(context); // see source analysis 1.2.1
            LOG.info("Map output collector class = " + collector.getClass().getName());
            return collector;
          } catch (Exception e) {
            String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
            if (--remainingCollectors > 0) {
              msg += " (" + remainingCollectors + " more collector(s) to try)";
            }
            LOG.warn(msg, e);
          }
        }
        throw new IOException("Unable to initialize any output collector");
      }

Source analysis 1.2.1: initialization of the collect buffer

     public void init(MapOutputCollector.Context context
                        ) throws IOException, ClassNotFoundException {
          job = context.getJobConf();
          reporter = context.getReporter();
          mapTask = context.getMapTask();
          mapOutputFile = mapTask.getMapOutputFile();
          sortPhase = mapTask.getSortPhase();
          spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
          partitions = job.getNumReduceTasks();
          rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();
    
          //sanity checks
          final float spillper =
            job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8); // spill threshold of the buffer, default 0.8
          final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100); // buffer size, default 100 MB
          indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                             INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
          if (spillper > (float)1.0 || spillper <= (float)0.0) {
            throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
                "\": " + spillper);
          }
          if ((sortmb & 0x7FF) != sortmb) {
            throw new IOException(
                "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
          }
          sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
                QuickSort.class, IndexedSorter.class), job); // when the map side spills the buffer to disk it must sort first; QuickSort is the default
          // buffers and accounting
          int maxMemUsage = sortmb << 20;
          maxMemUsage -= maxMemUsage % METASIZE;
          kvbuffer = new byte[maxMemUsage];
          bufvoid = kvbuffer.length;
          kvmeta = ByteBuffer.wrap(kvbuffer)
             .order(ByteOrder.nativeOrder())
             .asIntBuffer();
          setEquator(0);
          bufstart = bufend = bufindex = equator;
          kvstart = kvend = kvindex;
    
          maxRec = kvmeta.capacity() / NMETA;
          softLimit = (int)(kvbuffer.length * spillper);
          bufferRemaining = softLimit;
          LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);
          LOG.info("soft limit at " + softLimit);
          LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
          LOG.info("kvstart = " + kvstart + "; length = " + maxRec);
          comparator = job.getOutputKeyComparator(); // the comparator used for sorting; see source analysis 1.2.1.1
          keyClass = (Class<K>)job.getMapOutputKeyClass();
          valClass = (Class<V>)job.getMapOutputValueClass();
          serializationFactory = new SerializationFactory(job);
          keySerializer = serializationFactory.getSerializer(keyClass);
          keySerializer.open(bb);
          valSerializer = serializationFactory.getSerializer(valClass);
          valSerializer.open(bb);
          // combiner
          final Counters.Counter combineInputCounter =
            reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
          combinerRunner = CombinerRunner.create(job, getTaskID(), // the map-side combine
                                                 combineInputCounter,
                                                 reporter, null);
          if (combinerRunner != null) {
            final Counters.Counter combineOutputCounter =
              reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
            combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
          } else {
            combineCollector = null;
          }

          spillInProgress = false;
          minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3); // the combiner runs again during the merge of spill files only when there are at least this many spills (default 3)
          spillThread.setDaemon(true); // spilling is handled by a separate daemon thread; see source analysis 1.2.1.2
          spillThread.setName("SpillThread");
          spillLock.lock();
          // ... remainder of init() omitted here; it starts the SpillThread and waits for it to come up

Summary: the Mapper writes its output into a buffer that defaults to 100 MB; when the buffer reaches 80% full, it spills to disk. This threshold can be tuned, typically by bisecting back and forth: for example, first try 50%, then move back toward 80%, then try 70%, then 60%, narrowing the range each time.
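A hedged sketch of how those two knobs are set in a driver; the class name and the concrete values below are illustrative assumptions, not recommendations.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class SpillTuningDriver {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Buffer size: JobContext.IO_SORT_MB ("mapreduce.task.io.sort.mb"), default 100 MB.
        conf.setInt("mapreduce.task.io.sort.mb", 200);
        // Spill threshold: JobContext.MAP_SORT_SPILL_PERCENT ("mapreduce.map.sort.spill.percent"), default 0.8.
        conf.setFloat("mapreduce.map.sort.spill.percent", 0.7f);
        Job job = Job.getInstance(conf, "spill-tuning-example");
        // ... set mapper, reducer, input and output paths as usual ...
      }
    }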

Be careful with the combiner. Averaging is the classic example: an average of partial averages is not the overall average, so a combiner cannot compute averages early; it has to carry (sum, count) pairs, and only the reducer divides at the end.
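A hedged sketch of that safe pattern; the class name is hypothetical, and values are encoded as "sum,count" strings purely to keep the example short.

    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // Hypothetical combiner for an averaging job: keep (sum, count) instead of averaging early.
    public class SumCountCombiner extends Reducer<Text, Text, Text, Text> {
      @Override
      protected void reduce(Text key, Iterable<Text> values, Context context)
          throws IOException, InterruptedException {
        long sum = 0;
        long count = 0;
        for (Text v : values) {
          String[] parts = v.toString().split(",");
          sum += Long.parseLong(parts[0]);
          count += Long.parseLong(parts[1]);
        }
        // Still (sum, count); only the reducer divides sum by count to get the average.
        context.write(key, new Text(sum + "," + count));
      }
    }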

Source analysis 1.2.1.1: the implementation of the sort comparator

     public RawComparator getOutputKeyComparator() {
        Class<? extends RawComparator> theClass = getClass(
      JobContext.KEY_COMPARATOR, null, RawComparator.class); // dictionary order is the default
        if (theClass != null)
          return ReflectionUtils.newInstance(theClass, this);
    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this); // if the user did not set a sort comparator, the key type's own comparator is used, so the key class must support serialization, deserialization, and comparison
      }

Summary: by default the framework uses the key's own comparator, which gives dictionary order; the user can also override it with a custom comparator.
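A hedged sketch of overriding the sort comparator; the class name is hypothetical. A WritableComparator subclass is registered with job.setSortComparatorClass, here reversing the default order of IntWritable map-output keys.

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    // Hypothetical comparator: sort IntWritable map-output keys in descending order.
    public class DescendingIntComparator extends WritableComparator {
      public DescendingIntComparator() {
        super(IntWritable.class, true); // true = create key instances for compare()
      }

      @Override
      @SuppressWarnings("rawtypes")
      public int compare(WritableComparable a, WritableComparable b) {
        return -((IntWritable) a).compareTo((IntWritable) b);
      }
    }

    // In the driver: job.setSortComparatorClass(DescendingIntComparator.class);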

Source analysis 1.2.1.2: what the spill thread does
    protected class SpillThread extends Thread {
    
          @Override
          public void run() {
            spillLock.lock();
            spillThreadRunning = true;
            try {
              while (true) {
                spillDone.signal();
                while (!spillInProgress) {
                  spillReady.await();
                }
                try {
                  spillLock.unlock();
              sortAndSpill(); // sort, then spill to disk
                } catch (Throwable t) {
                  sortSpillException = t;
                } finally {
                  spillLock.lock();
                  if (bufend < bufstart) {
                    bufvoid = kvbuffer.length;
                  }
                  kvstart = kvend;
                  bufstart = bufend;
                  spillInProgress = false;
                }
              }
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            } finally {
              spillLock.unlock();
              spillThreadRunning = false;
            }
          }
        }

Summary: the map side writes into the buffer while a separate thread spills the buffer contents, sorting first; the spill sort uses QuickSort. The combiner, the buffer, and the sort all run in memory, just like the map method itself. The first combine happens when the buffer produces a batch of spill files; a second combine can happen when those small files are merged, and that pass is conditional (it runs only when the number of spill files is at least 3).
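A hedged driver fragment showing where those two combine passes are controlled: job.setCombinerClass registers the map-side combiner, and the mapreduce.map.combine.minspills property (JobContext.MAP_COMBINE_MIN_SPILLS, default 3) decides whether the combiner runs again while spill files are merged. The class names are hypothetical (the combiner is the SumCountCombiner sketch above).

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class CombinerDriver {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // JobContext.MAP_COMBINE_MIN_SPILLS: the merge-time combine only runs
        // when at least this many spill files exist (default 3).
        conf.setInt("mapreduce.map.combine.minspills", 3);
        Job job = Job.getInstance(conf, "combiner-example");
        job.setCombinerClass(SumCountCombiner.class); // map-side combine; hypothetical class from the sketch above
      }
    }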

Source analysis 1.1.1

    public class HashPartitioner<K, V> extends Partitioner<K, V> {
    
      /** Use {@link Object#hashCode()} to partition. */
      public int getPartition(K key, V value,
                              int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; // the standard way to derive the partition number
      }
    }

Note the formula (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks, the important part of how the partition is computed: masking with Integer.MAX_VALUE keeps the hash non-negative before taking the modulo by the number of reduce tasks.

Summary 1: all of the code above stems from output = new NewOutputCollector(taskContext, job, umbilical, reporter); so when the output object is constructed, a partitioner is constructed as well: either the fixed partition 0, the user's partitioner, or the default HashPartitioner.
Summary 2: the output construction also sets up the in-memory buffer.
Summary 3: everything above is the initialization of the output.
Summary 4: the map output K,V becomes K,V,P and is written into the circular in-memory buffer; at 80% full the buffer spills with a sort (first by partition, then by key within each partition), producing small files; those small files are then merged with merge sort. Because each small file is already internally sorted, the merge can be done in a single I/O pass.

More updates to come... feel free to follow my WeChat official account, LHWorld.

Original article: https://www.cnblogs.com/LHWorldBlog/p/8252953.html