    Hadoop Source Code Analysis 34: Map in Child

    Submitting the job:

    hadoop  jar /opt/hadoop-1.0.0/hadoop-examples-1.0.0.jar wordcount /user/admin/in/yellow2.txt /user/admin/out/128

     

    This generates 2 Map tasks and 2 Reduce tasks.
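
    The two map tasks come straight from the input splits: with the 64 MB split size seen later in this trace (0+67108864), two splits cover yellow2.txt, so two map tasks are launched; the two reduces come from the job/cluster configuration, not from the input size. A minimal standalone sketch of that arithmetic, assuming yellow2.txt is about 128 MB (an assumption based on this trace, not stated in the original):

    public class SplitCountSketch {
        public static void main(String[] args) {
            long fileLength = 2L * 67108864L;   // assumed ~128 MB input file
            long splitSize  = 67108864L;        // 64 MB, matching the split "0+67108864" in this trace
            long numSplits  = (fileLength + splitSize - 1) / splitSize;
            System.out.println("map tasks = " + numSplits);   // 2
        }
    }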

     

    Executing Maps[0]:

    args=[127.0.0.1, 40996, attempt_201404282305_0001_m_000000_0,/opt/hadoop-1.0.0/logs/userlogs/job_201404282305_0001/attempt_201404282305_0001_m_000000_0,-518526792]

     

    cwd =/tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/attempt_201404282305_0001_m_000000_0/work

     

    jobTokenFile= /tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/jobToken

    jvmContext=JvmContext(jvmId=jvm_201404282305_0001_m_,pid=29184)

     

    myTask=JvmTask{shouldDie=false,t=MapTask{taskId=attempt_201404282305_0001_m_000000_0,jobFile="/tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/job.xml" ,jobSetup=false, jobCleanup=false, taskCleanup     =false, taskStatus=MapTaskStatus { UNASSIGNED},splitMetaInfo=JobSplit$TaskSplitIndex={splitLocation="hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404282305_0001/job.split",startOffset=7     }}

     

    job=JobConf{Configuration:core-default.xml, core-site.xml, mapred-default.xml,mapred-site.xml,/tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/job.xml}

     

    Task.jobContext = JobContext{job=Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, hdfs-default.xml, hdfs-site.xml, /tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/job.xml, id=job_201404282305_0001}

     

    Task.taskContext = TaskAttemptContext(job=Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, hdfs-default.xml, hdfs-site.xml, /tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/job.xml, id=job_201404282305_0001, taskId=attempt_201404282305_0001_m_000000_0, reporter=org.apache.hadoop.mapred.Task$TaskReporter@67323b17)

     

    Task.taskStatus = RUNNING

     

    Task.outputFormat = org.apache.hadoop.mapreduce.lib.output.TextOutputFormat@a27b2d9

     

    Task.committer = FileOutputCommitter{outputFileSystem=DFS[DFSClient[clientName=DFSClient_attempt_201404282305_0001_m_000000_0,ugi=admin]], outputPath=/user/admin/out/128, workPath=hdfs://server1:9000/user/admin/out/128/_temporary/_attempt_201404282305_0001_m_000000_0}

     

    Variables inside runNewMapper(job, splitMetaInfo, umbilical, reporter):

     

    taskContext = TaskAttemptContext{conf=JobConf{Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, hdfs-default.xml, hdfs-site.xml, /tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/job.xml}, jobId=job_201404282305_0001, taskId=attempt_201404282305_0001_m_000000_0}

     

    mapper=org.apache.hadoop.examples.WordCount$TokenizerMapper@3e4a762a

     

    inputFormat=org.apache.hadoop.mapreduce.lib.input.TextInputFormat@21833d8a

     

    split=hdfs://server1:9000/user/admin/in/yellow2.txt:0+67108864

     

    input=NewTrackingRecordReader{  

    inputSplit=hdfs://server1:9000/user/admin/in/yellow2.txt:0+67108864

    job= JobConf{Configuration:core-default.xml, core-site.xml, mapred-default.xml,mapred-site.xml, hdfs-default.xml, hdfs-site.xml,/tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/job.xml}

    real= org.apache.hadoop.mapreduce.lib.input.LineRecordReader@5afbee67

    }

     

    output= NewOutputCollector{

    collector = new MapOutputBuffer{...}

    partitions=2

    partitioner=org.apache.hadoop.mapreduce.lib.partition.HashPartitioner@5c66b7ea

    }

     

    Fields of output's collector member (output is a NewOutputCollector; collector is the MapOutputBuffer):

    job= JobConf{Configuration:core-default.xml, core-site.xml, mapred-default.xml,mapred-site.xml, hdfs-default.xml, hdfs-site.xml,/tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/job.xml}

    localFs= LocalFileSystem@fdcb254

    partitions =2

    rfs= rawLocalFileSystem

    PARTITION = 0;  // partition offset in acct
    KEYSTART = 1;   // key offset in acct
    VALSTART = 2;   // val offset in acct
    ACCTSIZE = 3;   // total #fields in acct
    RECSIZE = (ACCTSIZE + 1) * 4;  // = 16, acct bytes per record

     

    spillper=0.8

    recper=0.05

    sortmb=100

    sorter =org.apache.hadoop.util.QuickSort@aa9502d

    maxMemUsage = sortmb << 20;  // 104857600
    recordCapacity = (int)(maxMemUsage * recper);  // 5242880
    recordCapacity -= recordCapacity % RECSIZE;  // 5242880
    kvbuffer = new byte[maxMemUsage - recordCapacity];  // byte[99614720]
    bufvoid = kvbuffer.length;  // 99614720
    recordCapacity /= RECSIZE;  // 327680
    kvoffsets = new int[recordCapacity];  // int[327680]
    kvindices = new int[recordCapacity * ACCTSIZE];  // int[983040]
    softBufferLimit = (int)(kvbuffer.length * spillper);  // 79691776
    softRecordLimit = (int)(kvoffsets.length * spillper);  // 262144

    comparator =job.getOutputKeyComparator();  // Text$Comparator@42b5e6a1

    keyClass= job.getMapOutputKeyClass(); //org.apache.hadoop.io.Text

    valClass= job.getMapOutputValueClass(); //org.apache.hadoop.io.IntWritable
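
    The buffer sizes above follow directly from the Hadoop 1.0 defaults io.sort.mb=100, io.sort.record.percent=0.05 and io.sort.spill.percent=0.8. A small standalone sketch that reproduces the traced numbers (it only does the arithmetic; it does not allocate the real buffers):

    public class SortBufferSizing {
        public static void main(String[] args) {
            final int ACCTSIZE = 3;                    // fields per accounting record
            final int RECSIZE = (ACCTSIZE + 1) * 4;    // 16 bytes of accounting per record

            int sortmb = 100;        // io.sort.mb
            float recper = 0.05f;    // io.sort.record.percent
            float spillper = 0.8f;   // io.sort.spill.percent

            int maxMemUsage = sortmb << 20;                        // 104857600
            int recordCapacity = (int) (maxMemUsage * recper);     // 5242880
            recordCapacity -= recordCapacity % RECSIZE;            // 5242880
            int kvbufferLen = maxMemUsage - recordCapacity;        // 99614720
            recordCapacity /= RECSIZE;                             // 327680
            int kvoffsetsLen = recordCapacity;                     // 327680
            int kvindicesLen = recordCapacity * ACCTSIZE;          // 983040
            int softBufferLimit = (int) (kvbufferLen * spillper);  // 79691776
            int softRecordLimit = (int) (kvoffsetsLen * spillper); // 262144

            System.out.println(kvbufferLen + " " + kvoffsetsLen + " " + kvindicesLen);
            System.out.println(softBufferLimit + " " + softRecordLimit);
        }
    }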

     

    combinerRunner=CombinerRunner.create(job, getTaskID(),

                                                combineInputCounter,

                                                reporter, null);

    //NewCombinerRunner{reducerClass=org.apache.hadoop.examples.WordCount$IntSumReducer,taskId=attempt_201404282305_0001_m_000000_0,keyClass=org.apache.hadoop.io.Text,valueClass=org.apache.hadoop.io.IntWritable,comparator=org.apache.hadoop.io.Text$Comparator@42b5e6a1,committer=null}

    combineCollector = new CombineOutputCollector(combineOutputCounter, reporter, conf);

    minSpillsForCombine=job.getInt("min.num.spills.for.combine",3); //3
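
    The combiner here is WordCount's IntSumReducer because the example's driver registers it on the job. A condensed driver in the style of the shipped Hadoop 1.0 WordCount example (the explicit setNumReduceTasks(2) and the paths are taken from this trace and are not part of the shipped example, which leaves the reduce count to the cluster configuration):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.examples.WordCount;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WordCountDriverSketch {
        public static void main(String[] args) throws Exception {
            Job job = new Job(new Configuration(), "word count");
            job.setJarByClass(WordCountDriverSketch.class);
            job.setMapperClass(WordCount.TokenizerMapper.class);
            job.setCombinerClass(WordCount.IntSumReducer.class);   // why combinerRunner uses IntSumReducer
            job.setReducerClass(WordCount.IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            job.setNumReduceTasks(2);                              // matches partitions=2 in this trace
            FileInputFormat.addInputPath(job, new Path("/user/admin/in/yellow2.txt"));
            FileOutputFormat.setOutputPath(job, new Path("/user/admin/out/128"));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }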

     

    spillThread = new SpillThread();
    spillLock = new ReentrantLock();
    spillDone = spillLock.newCondition();
    spillReady = spillLock.newCondition();

     

     

    mapperContext = Mapper$Context{taskId=attempt_201404282305_0001_m_000000_0, status="", split=hdfs://server1:9000/user/admin/in/yellow2.txt:0+67108864, jobId=job_201404282305_0001, committer=FileOutputCommitter{outputFileSystem=DFS[DFSClient[clientName=DFSClient_attempt_201404282305_0001_m_000000_0,ugi=admin]], outputPath=/user/admin/out/128, workPath=hdfs://server1:9000/user/admin/out/128/_temporary/_attempt_201404282305_0001_m_000000_0}, output=MapTask$NewOutputCollector{...same as output above...}, reader=MapTask$NewTrackingRecordReader{...same as input above...}

    }

     

    input.initialize(split, mapperContext)
    // Sets up the LineRecordReader's fields and opens the file:
    start = split.getStart();
    end = start + split.getLength();
    pos = start;
    FSDataInputStream fileIn = fs.open(split.getPath());
    in = new LineReader(fileIn, job);
    bufferSize = 65536;
    buffer = new byte[this.bufferSize];
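
    One detail worth noting: the second map's split starts at byte 67108864, which almost certainly falls in the middle of a line. LineRecordReader handles this with a simple rule: a reader whose split does not start at offset 0 discards the first (partial) line, and every reader keeps reading past its nominal end until it finishes the line it is in. A simplified, self-contained model of that rule (not the Hadoop class itself; the data and boundary are made up):

    import java.util.ArrayList;
    import java.util.List;

    public class SplitBoundarySketch {
        // Reads the lines belonging to the split [start, start+length) under the rule above.
        static List<String> readSplit(byte[] data, int start, int length) {
            int end = start + length;
            int pos = start;
            List<String> lines = new ArrayList<String>();
            if (start != 0) {                            // skip the partial first line
                while (pos < data.length && data[pos++] != '\n') { }
            }
            while (pos < end && pos < data.length) {     // may read past 'end' to finish a line
                StringBuilder line = new StringBuilder();
                while (pos < data.length && data[pos] != '\n') {
                    line.append((char) data[pos++]);
                }
                pos++;                                   // consume the '\n'
                lines.add(line.toString());
            }
            return lines;
        }

        public static void main(String[] args) {
            byte[] data = "Yellow\nLook at the stars\nAnd everything you do\n".getBytes();
            int boundary = 20;                           // an arbitrary offset inside the second line
            System.out.println(readSplit(data, 0, boundary));                      // [Yellow, Look at the stars]
            System.out.println(readSplit(data, boundary, data.length - boundary)); // [And everything you do]
        }
    }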

     

    mapper.run(mapperContext)

    // Runs the Map program:
    public void run(Context context) throws IOException, InterruptedException {
      setup(context);
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
      cleanup(context);
    }

    // The nextKeyValue() method reads one line. Before the read pos=0; after it key=0, value="Yellow", pos=7.

    ......

    // Before the read pos=20; after it key=20, value="Look at the stars; look how they shine for you", pos=67.

    ......

    // Before the read pos=68; after it key=68, value="And everything you do", pos=90.

    ......

    The map method:

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
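
    For reference, word and one in the map method are fields of TokenizerMapper that are reused for every record; context.write() serializes their contents into kvbuffer immediately, so the reuse is safe. Reproduced here as a standalone class matching the shipped example in substance:

    import java.io.IOException;
    import java.util.StringTokenizer;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class TokenizerMapperSketch extends Mapper<Object, Text, Text, IntWritable> {
        // Reused across calls to map() to avoid one allocation per token.
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }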

    Mapper$Context.write(word, one)
    --> TaskInputOutputContext.write(word, one)
    --> MapTask$NewOutputCollector.write(word, one)
    --> (MapTask.MapOutputCollector)collector.collect(key, value, partitioner.getPartition(key, value, partitions))

     

     

    HashPartitioner.getPartition() is defined as:

     

    public int getPartition(K key, V value,
                            int numReduceTasks) {
      return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    That is, the key's hashCode modulo the number of Reduces determines which Reduce the record belongs to.
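
    A quick standalone way to check which reduce a given word goes to under this rule (the map output key type here is Text, and this job runs 2 reduces; the sample words are illustrative):

    import org.apache.hadoop.io.Text;

    public class PartitionCheck {
        public static void main(String[] args) {
            int numReduceTasks = 2;                   // this job runs 2 reduces
            for (String w : new String[] {"Yellow", "stars", "you"}) {
                Text key = new Text(w);
                int partition = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
                System.out.println(w + " -> reduce " + partition);
            }
        }
    }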

     

     

    The processing flow of collect is:

     

    keySerializer.serialize(key);    // write the key into the BlockingBuffer (kvbuffer)
    valSerializer.serialize(value);  // write the value into the BlockingBuffer (kvbuffer)

     

     

    int ind = kvindex * ACCTSIZE;
    kvoffsets[kvindex] = ind;                 // primary index: this record's slot in kvindices

    kvindices[ind + PARTITION] = partition;   // secondary index: which Reduce the record belongs to
    kvindices[ind + KEYSTART] = keystart;     // secondary index: key offset in kvbuffer
    kvindices[ind + VALSTART] = valstart;     // secondary index: value offset in kvbuffer

    kvindex = kvnext;
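
    A tiny standalone model of this two-level index (the offsets are illustrative, not taken from the trace): kvoffsets holds one int per record pointing at that record's slot in kvindices, and each slot stores the (partition, keystart, valstart) triple, while the serialized bytes themselves stay in kvbuffer.

    public class KvIndexSketch {
        static final int PARTITION = 0, KEYSTART = 1, VALSTART = 2, ACCTSIZE = 3;

        public static void main(String[] args) {
            int[] kvoffsets = new int[4];               // primary index, one slot per record
            int[] kvindices = new int[4 * ACCTSIZE];    // secondary index, 3 ints per record
            int kvindex = 0;

            // Account for one collected (key, value) pair; keystart/valstart would be
            // byte offsets into kvbuffer in the real MapOutputBuffer.
            int partition = 1, keystart = 0, valstart = 7;
            int ind = kvindex * ACCTSIZE;
            kvoffsets[kvindex] = ind;
            kvindices[ind + PARTITION] = partition;
            kvindices[ind + KEYSTART] = keystart;
            kvindices[ind + VALSTART] = valstart;
            kvindex = kvindex + 1;                      // kvnext in the real code

            System.out.println("record 0 -> partition " + kvindices[kvoffsets[0] + PARTITION]);
        }
    }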

     

     

    The SpillThread thread sorts the in-memory data and writes it to the local disk.

     

    Processing in MapTask.MapOutputBuffer.sortAndSpill():

     

    size=2276473, partitions=2, numSpills=0

    filename=/tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/attempt_201404282305_0001_m_000000_0/output/spill0.out

     

    endPosition=262144

     

     

    Sort: sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter)
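
    The sort only rearranges the small index entries: QuickSort calls back into MapOutputBuffer, which compares two records by (partition, key bytes) but swaps nothing except their 4-byte kvoffsets entries, so the serialized data in kvbuffer never moves. A simplified standalone model of that ordering (the records are illustrative, and String comparison stands in for the raw-byte key comparator):

    import java.util.Arrays;
    import java.util.Comparator;

    public class SpillSortSketch {
        public static void main(String[] args) {
            // Per-record (partition, key); in MapTask these live in kvindices/kvbuffer.
            final int[] partition = {1, 0, 1, 0};
            final String[] key    = {"you", "Yellow", "stars", "And"};

            Integer[] order = {0, 1, 2, 3};             // plays the role of kvoffsets
            Arrays.sort(order, new Comparator<Integer>() {
                public int compare(Integer a, Integer b) {
                    if (partition[a] != partition[b]) {
                        return partition[a] - partition[b];   // group records by reduce
                    }
                    return key[a].compareTo(key[b]);          // then order by key
                }
            });

            for (int i : order) {
                System.out.println("partition " + partition[i] + "  key " + key[i]);
            }
        }
    }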

     

    Combine: combinerRunner.combine(kvIter, combineCollector);

     

    Run the Combine.

     

    The resulting keys and values are written to spill0.out, spill1.out, spill2.out, ...

     

    keySerializer.serialize(key);

    valueSerializer.serialize(value);

     

    mergeParts() merges the results.

     

    After the merge completes, the Combine runs once more:

    combinerRunner.combine(kvIter,combineCollector);

     

    Finally, two files are produced:

    file.out

    file.out.index

     

     

    The sortAndSpill and mergeParts process:

     

    Map phase: the main thread writes all map output into the in-memory kvbuffer, while the SpillThread thread sorts (quick sort) and combines the contents of kvbuffer block by block and writes them to the files spill0.out, spill1.out, spill2.out, ..., spill51.out; each file contains the data for both Reduces.

     

    output.close() phase: the main thread sorts (quick sort) and combines the last block left in memory and writes it to spill52.out; this file, too, contains the data for both Reduces.

     

    mergeParts phase: the main thread puts the files spill0.out, spill1.out, ..., spill52.out into a priority queue, MergeQueue (actually a min-heap), for merging. At most 10 files are merged at a time, producing temporary files intermediate.0, intermediate.1, intermediate.2, intermediate.3, ..., which are themselves added back into the MergeQueue. Because spill0.out, spill1.out, ..., spill52.out are each internally sorted (locally ordered), the merge repeatedly takes the element pointed to by the root segment of the min-heap MergeQueue (i.e. the smallest element), advances that segment's pointer, appends the element to the target file, and then re-adjusts the min-heap. The merged file is therefore also sorted.

    The Combine then runs again, and the result is written to file.out, which contains the data for both Reduces.
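
    The MergeQueue behavior can be modeled with a small min-heap over the spill segments: repeatedly take the segment whose next key is smallest, emit that key, advance the segment, and put it back so the heap re-adjusts. A standalone sketch using java.util.PriorityQueue (the real merge runs once per partition and reads spill files through segment readers; the spill contents below are illustrative):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.PriorityQueue;

    public class MergeQueueSketch {
        // One already-sorted spill segment plus a cursor, standing in for a spill*.out reader.
        static class Segment implements Comparable<Segment> {
            final String[] keys;
            int pos = 0;
            Segment(String... keys) { this.keys = keys; }
            String head() { return keys[pos]; }
            boolean advance() { return ++pos < keys.length; }
            public int compareTo(Segment other) { return head().compareTo(other.head()); }
        }

        public static void main(String[] args) {
            PriorityQueue<Segment> heap = new PriorityQueue<Segment>();
            heap.add(new Segment("And", "stars", "you"));      // one partition's slice of spill0.out
            heap.add(new Segment("Look", "Yellow"));           // ... of spill1.out
            heap.add(new Segment("at", "do", "everything"));   // ... of spill2.out

            List<String> merged = new ArrayList<String>();
            while (!heap.isEmpty()) {
                Segment s = heap.poll();   // segment whose next key is the smallest
                merged.add(s.head());      // emit the smallest key
                if (s.advance()) {
                    heap.add(s);           // re-insert; the heap re-adjusts around the new head
                }
            }
            System.out.println(merged);    // one globally sorted stream
        }
    }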


    Note that quick sort and a min-heap are the key data structures and algorithms used here.

    Note that i << 1 is equivalent to 2*i, and i >>> 1 is equivalent to i/2.
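
    These shifts show up in the sort and heap code as index arithmetic; for instance, the child/parent steps of a 0-based binary heap (a generic illustration, not a copy of Hadoop's code):

    public class ShiftSketch {
        public static void main(String[] args) {
            int i = 5;
            int left   = (i << 1) + 1;    // i*2 + 1 = 11, left child
            int right  = (i << 1) + 2;    // i*2 + 2 = 12, right child
            int parent = (i - 1) >>> 1;   // (i-1)/2 = 2, parent
            System.out.println(left + " " + right + " " + parent);
        }
    }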


      
