zoukankan      html  css  js  c++  java
  • Hadoop之MapReduce

    此文章已于 23:19:00 2015/8/4 重新发布到 农民阿姨

    Hadoop之MapReduce

    提交任务

    hadoop jar hadoopdemo-1.0-SNAPSHOT-jar-with-dependencies.jar /user/root/testmr/hello.txt /user/root/testmr/out1

    MapReduce原理***

    image

    MapReduce执行过程***

    -------------------
    hello    you
    hello    me
    -------------------
    MapReducer的执行过程
    1.map处理阶段
    1.1读取HDFS的输入文件中的内容,把每一行解析成一个<k1,v1>对。
    k1表示每一行的起始位置(单位byte),v1表示每一行的文本内容。每个键值对调用一次map函数。
    问:一共可以解析成多少个<k1,v1>?
    答:2个,分别是<0, hello you>,<10, hello me>,一共2次map函数
    1.2覆盖map函数,实现自己的业务逻辑。对输入的<k1,v1>做处理,转化为新的<k2,v2>输出。

    void map(k1,v1, context){
      String[] splited = v1.split('\t');
      for(String word : splited){
        context.write(<k2,v2>);
      }
    }
    分别是<hello,1><you,1><hello,1><me,1>

    1.3对map输出的<k2,v2>做分区。默认有1个分区。(分区的目的是让不同的reduce计算不同的数据)
    1.4每个分区中的<k2,v2>按照k2进行排序、分组。分组指的是把相同k的v放到一个集合中。相同key的value放到一个集合中。
    排序后<hello,1><hello,1><me,1><you,1>
    分组后<hello,{1,1}><me,{1}><you,{1}>
    1.5归约
    2.reduce处理阶段
    2.1对多个map的输出,按照不同的分区,通过网络copy到不同的reduce节点。
    2.2对多个map的输出,进行合并、排序。覆盖reduce函数,实现自己的业务逻辑,对输入的<k2,v2s>进行处理,转化为新的<k3,v3>输出。
    void reduce(k2,v2s, context){
      long sum=0L;
      for(long times : v2s){
        sum += times;
      }
      context.write(k2,sum);
    }
    2.3把<k3,v3>写入到hdfs中


    -------------------
    问:从源代码的角度分析map函数处理的<k1,v1>是如何从HDFS文件中获取的?
    答:
    1.从TextInputFormat入手分析,找到父类FileInputFormat,找到父类InputFormat。
      在InputFormat中找到2个方法,分别是getSplits(...)和createRecordReader(...)。
      通过注释知道getSplits(...)作用是把输入文件集合中的所有内容解析成一个个的InputSplits,每一个InputSplit对应一个mapper task。
      createRecordReader(...)作用是创建一个RecordReader的实现类。RecordReader作用是解析InputSplit产生一个个的<k,v>。
    2.在FileInputFormat中找到getSplits(...)的实现。
      通过实现,获知
      (1)每个SplitSize的大小和默认的block大小一致,好处是满足数据本地性。
      (2)每个输入文件都会产生一个InputSplit,即使是空白文件,也会产生InputSPlit;
      如果一个文件非常大,那么会按照InputSplit大小,切分产生多个InputSplit。
    3.在TextInputFormat中找到createRecordReader(...)的实现,在方法中找到了LineRecordReader。
      接下来分析LineRecordReader类。 
      在RecordReader类中,通过查看多个方法,知晓key、value作为类的属性存在的,且知道了nextKeyValue()方法的用法。 
      在LineRecordReader类中,重点分析了nextKeyValue(...)方法。在这个方法中,重点分析了newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
      在in.readLine(...)中,第一个形参存储被读取的行文本内容,返回值表示被读取内容的字节数。
      通过以上代码,分析了InputSplit中的内容是如何转化为一个个的<k,v>。
    4.从Mapper类中进行分析,发现了setup()、cleanup()、map()、run()。
      在run()方法中,通过while,调用context.nextKeyValue(...)。
      进一步分析Context的接口类是org.apache.hadoop.mapreduce.lib.map.WrappedMapper.MapContext,MapContext调用了nextKeyValue(...)。最终找到了MapContext的实现了MapContextImpl类org.apache.hadoop.mapreduce.task.MapContextImpl。
      在这个类的构造方法中,发现传入了RecordReader的实现类。 

    package mapreduce;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    /**
     * 假设有个目录结构
     * /dir1
     * /dir1/hello.txt
     * /dir1/dir11/hello.bat
     * 
     * 问:统计/dir1下面所有的文件中的单词技术
     *
     */
    public class WortCountTest {
    
        public static void main(String[] args) throws Exception{
            Configuration conf = new Configuration();
            String jobName = WortCountTest.class.getSimpleName();
            Job job = Job.getInstance(conf, jobName);
            
            //要把代码打包运行, 调用如下行
            job.setJarByClass(WortCountTest.class);
            
            //指定输入路径
            FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.1.177:9000/wuchao/hello"));
            //指定解析<k1,v1>的类
            //job.setInputFormatClass(TextInputFormat.class);
            //指定自定义的mapper类
            job.setMapperClass(MyMapper.class);
            //指定输出的<k2,v2>类型
            //当<k3,v3>的类型与<k2,v2>类型一致时,<k2,v2>类型可以不指定
            //job.setMapOutputKeyClass(Text.class);
            //job.setMapOutputValueClass(LongWritable.class);
            
            //指定自定义reducer类
            job.setReducerClass(MyReducer.class);
            //指定输出的<k3,v3>类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            //指定输出<k3,v3>的类
            //job.setOutputFormatClass(TextOutputFormat.class);
            //指定输出的路径
            FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.1.177:9000/out1"));
            
            job.waitForCompletion(true);
        }
    
        
        private static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
            Text k2 = new Text();
            LongWritable v2 = new LongWritable();
            
            @Override
            protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                String[] splited = line.split("\t");
                //word表示每一行中的每个单词
                for (String word : splited) {
                    k2.set(word);
                    v2.set(1L);
                    context.write(k2, v2);
                }
            }
        }
        
        private static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
            LongWritable v3 = new LongWritable();
            @Override
            protected void reduce(Text k2, Iterable<LongWritable> v2s,
                    Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
                
                long sum = 0L;
                for (LongWritable v2 : v2s) {
                    sum += v2.get();
                }
                v3.set(sum);
                context.write(k2, v3);
            }
        }
    }

    jar 包提交

    hadoop jar /home/lina/example.jar com/oss/maxtemperature/MaxTemperatureDriver /local/sample.txt /output/sampleout
    岁月里,寒暑交替。人世间,北来南往。铭心的,云烟的。都付往事,不念,不问。
  • 相关阅读:
    JavaScript知识回顾
    HTML和CSS相关知识回顾
    springmvc文件上传和下载
    jsp页面调试中的问题记录
    mybatis传参的几种方式
    ssm中调试遇到的坑
    idea新手日记
    Oracle安装
    mysql5安装
    Servlet 的原理----无脑笔记
  • 原文地址:https://www.cnblogs.com/chaoren399/p/2825510.html
Copyright © 2011-2022 走看看