zoukankan      html  css  js  c++  java
  • Hadoop Mapreduce运行流程

    Mapreduce的运算过程为两个阶段:

      第一个阶段的map task相互独立,完全并行;

      第二个阶段的reduce task也是相互独立,但依赖于上一阶段所有map task并发实例的输出;

    这些task任务分布在多台机器运行,它的运行管理是有一个master负责,这个master由yarn负责启动,那么yarn如何知道启动多少个map task进程去计算呢?

    下面概述一下Mapreduce的执行流程:

    1、客户端首先会访问hdfs的namenode获取待处理数据的信息(文件数及文件大小),形成一个任务分配计划(会写入配置文件);

    2、这个任务分配计划以及配置文件都会交给yarn,yarn根据自己所掌握的各机器资源情况,去启动mr appmaster;

    3、mr appmaster根据配置文件负责启动map task任务进程;

    4、map task去datanode分行读取数据,交给自定义的mapper,输出的context.write(key,value),

      再交给outputcollecter输出到本机的一个分区文件(后面有几个reduce task就有几个分区);

    5、所有的map task执行完,mr appmaster再去启动reduce task;

    6、reduce task进程对每一组key相同的<key,value>调用一次自定义的reducer;

    7、reduce task的计算结果会不断追加写入设置好的hdfs的路径中;

    8、整个程序需要一个driver来提交,提交的是一个描述了各种必要信息的job对象。

    代码示例:

     WordCountMap.java

    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    /**
     * Mapper的第一个参数:KEYIN: 默认情况下,是mr框架所读到的一行文本的起始偏移量,Long,
     * 但是在hadoop中有自己的更精简的序列化接口,所以不直接用Long,而用LongWritable
     * Mapper的第二个参数:VALUEIN:默认情况下,是mr框架所读到的一行文本的内容,String,同上,用Text
     * Mapper第三个参数:KEYOUT:是用户自定义逻辑处理完成之后输出数据中的key,在此处是单词,String,同上,用Text
     * Mapper第四个参数:VALUEOUT:是用户自定义逻辑处理完成之后输出数据中的value,在此处是单词次数,Integer,同上,用IntWritable
     */
    public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable>{
    
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            //将maptask传给我们的文本内容先转换成String
            String line = value.toString();
            //根据空格将这一行切分成单词
            String[] words = line.split(" ");
            
            //将单词输出为<单词,1>
            for(String word:words) {
                //将单词作为key,将次数1作为value,以便于后续的数据分发,可以根据单词分发,以便于相同单词会到相同的reduce task
                context.write(new Text(word), new IntWritable(1));
            }
        }
    }

     

    WordCountReduce.java

    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.WordCount.Reduce;
    import org.apache.hadoop.mapreduce.Reducer;
    
    /**
     * KEYIN, VALUEIN 对应  mapper输出的KEYOUT,VALUEOUT类型对应
     * KEYOUT, VALUEOUT 是自定义reduce逻辑处理结果的输出数据类型
     * KEYOUT是单词
     * VLAUEOUT是总次数
     */
    
    public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    
        /**
         * <hello,1><hello,1><hello,1><hello,1><hello,1><hello,1>
         * <banana,1><banana,1><banana,1><banana,1><banana,1><banana,1>
         * 入参key,是一组相同单词kv对的key
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
            int count=0;
            
            for(IntWritable value:values) {
                count += value.get();
            }
            
            context.write(key, new IntWritable(count));
        }
    }

     

    WordcountDriver.java

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * 相当于一个yarn集群的客户端
     * 需要在此封装我们的mr程序的相关运行参数,指定jar包
     * 最后提交给yarn
     */
    public class WordcountDriver {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            //是否运行为本地模式,就是看这个参数值是否为local,默认就是local
            /*conf.set("mapreduce.framework.name", "local");*/
            
            //本地模式运行mr程序时,输入输出的数据可以在本地,也可以在hdfs上
            //到底在哪里,就看以下两行配置你用哪行,默认就是file:///
            /*conf.set("fs.defaultFS", "hdfs://192.168.1.110:9000/");*/
            /*conf.set("fs.defaultFS", "file:///");*/
                    
            //运行集群模式,就是把程序提交到yarn中去运行
            //要想运行为集群模式,以下3个参数要指定为集群上的值
            /*conf.set("mapreduce.framework.name", "yarn");
            conf.set("yarn.resourcemanager.hostname", "192.168.1.110");
            conf.set("fs.defaultFS", "hdfs://192.168.1.110:9000/");*/
            
            Job job = Job.getInstance(conf);//指定本程序的jar包所在的本地路径
            job.setJarByClass(WordcountDriver.class);
            
            //指定本业务job要使用的mapper/Reducer业务类
            job.setMapperClass(WordCountMap.class);
            job.setReducerClass(WordCountReduce.class);
            
            //指定mapper输出数据的kv类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            
            //指定最终输出的数据的kv类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            
            //指定需要使用combiner,以及用哪个类作为combiner的逻辑
            job.setCombinerClass(WordCountReduce.class);
            
            //如果不设置InputFormat,它默认用的是TextInputformat.class
            job.setInputFormatClass(CombineTextInputFormat.class);
            CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
            CombineTextInputFormat.setMinInputSplitSize(job, 2097152);
            
            //指定job的输入原始文件所在目录
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            //指定job的输出结果所在目录
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            //将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn去运行
            //job.submit();  //如果submit的话,提交完任务客户端就退出了,而不知道任务在集群上的运行情况
            boolean res = job.waitForCompletion(true);  //所以我们这里使用wait提交,参数true表示将集群情况返回客户端
            System.exit(res?0:1);
        }
    }
  • 相关阅读:
    查找一段信号的累加峰值---verilog
    AXI_stream接口时序温习
    QAM调制---Verilog代码
    数据交织模块---Verilog代码
    卷积编码后的删余模块---Verilog代码
    数据发送模块---基于地址的检测(verilog代码)
    短训练序列---Verilog代码
    长训练序列---verilog代码
    数据扰码器---Verilog代码
    卷积编码器---Verilog代码
  • 原文地址:https://www.cnblogs.com/chaofan-/p/9774430.html
Copyright © 2011-2022 走看看