zoukankan      html  css  js  c++  java
  • mr原理简单分析

    背景

         又是一个周末一天一天的过的好快,今天的任务干啥呢,索引总结一些mr吧,因为前两天有面试问过我?我当时也是简单说了一下,毕竟现在写mr程序的应该很少很少了,废话不说了,结合官网和自己理解写起。

    官网 https://hadoop.apache.org/docs/r3.3.0/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html

    简单分析

    一个mr作业通常数据会被切割成多个数据块通过map任务来并行处理,就是说我们在处理文件的时候,首次我们写入文件会被分割成多个块,hdfs文件设计支持的语义 write-once-read-moreblock块是128m默认 https://hadoop.apache.org/docs/r3.3.0/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html Data Blocks 位置。然后在我们读取的时候也就是提交mr 作业后namenode会根据元数据管理然后从不同的datanode得知数据位置,从而进行读取。mr框架包含一个单独的master ResouceManager,每个集群结点一个工作NodeManager,每个应用一个MRAppMaster

    Input output,接受《keyvalue》最后落地《keyvalue》形式,需要实现Writetable接口,还要实现WritebaleCompare接口因为要对比排序。

    public class WordCount {

     

      public static class TokenizerMapper

           extends Mapper<Object, Text, Text, IntWritable>{

     

        private final static IntWritable one = new IntWritable(1);

        private Text word = new Text();

     

        public void map(Object key, Text value, Context context

                        ) throws IOException, InterruptedException {

          StringTokenizer itr = new StringTokenizer(value.toString());

          while (itr.hasMoreTokens()) {

            word.set(itr.nextToken());

            context.write(word, one);

          }

        }

      }

     

      public static class IntSumReducer

           extends Reducer<Text,IntWritable,Text,IntWritable> {

        private IntWritable result = new IntWritable();

     

        public void reduce(Text key, Iterable<IntWritable> values,

                           Context context

                           ) throws IOException, InterruptedException {

          int sum = 0;

          for (IntWritable val : values) {

            sum += val.get();

          }

          result.set(sum);

          context.write(key, result);

        }

      }

     

      public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "word count");

        job.setJarByClass(WordCount.class);

        job.setMapperClass(TokenizerMapper.class);

        job.setCombinerClass(IntSumReducer.class);

        job.setReducerClass(IntSumReducer.class);

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);

      }

    }

    Mapper

    我们会接受<key,value>对,hdfs生产map任务对每一个Inputsplit通过Inputformat在我们的作业中。

    mapper设置通过Job.setMapperClass(Class) 方法,我们实现mapper接口对应就可以在map中写内容了。如果向清理内容可以实现cleanUp方法。这个方法 备注  Called once at the end of the task.也就是说在task结束后会触发一次。

    输出对并不一样要跟输入对类型保持一致,一个输入对可能会产生一个或者多个输出对。输出通过contextwrite方法收集。

    应用可以通过Counter 来上报统计,上下文中可以找到Counter counter = context.getCounter(MRJobConfig.JOB_NAME, MRJobConfig.JOB_NAME);

    所有的结果通过key来进行发放传递给reduce然后落地结果。用户也可以通过比较器来进行指定分组。

    Job.setGroupingComparatorClass(Class).

    mapper的输出会分区到reduce中,然后数量与reduce的任务数量一致,也可以通过实现自定义Partitioner来进行分配。

    combiner也是中间优化的一部分,通过ob.setCombinerClass(Class) 来设置combiner类,在发送到reduce之前在减少数量从而提高性能。数据结果如何存储什么格式,可以通过设置压缩形式来存储通过配置。

    多少个Mapper

    mapper的个数通常是通过输入文件大小block数量来决定的。正常的并行度水平对于每个结点来看是10-100之间,离线跑hql脚本一般设置过大会导致占用资源过多,其他任务排队情况导致任务从而延迟,或者浪费资源。例子:如果你的输入文件大小是10T,那么hdfs默认块是128M,那么你就会有82,000map数量,这个就需要通过配置文件来设置map 的大小了。

    Reducer

    通过mapper处理完以后发送到reduce端的对,reducer会对这些对再进行处理使得这个以key为分组的对集合更小。

    Job.setNumReduceTasks(int) 设置recude个数Job.setReducerClass(Class) 设置reduce的执行类

    reduce有三个阶段:shuffle, sort and reduce.翻译出来感觉不太好。

    Shuffle

    这个阶段就是通过mapper的输出结果数据进行一次分组partition

    Sort

    怎么排序呢,就是mapper阶段的task,输出会是以key分组的,然后相同的key再进行merge 合并排序,在这个阶段。

    Secondary Sort

    二次排序,可能用户对于之前的key排序不满了,希望再次修改进行重新排序分组那么通过设置Job.setSortComparatorClass(Class).Job.setGroupingComparatorClass(Class)以达到目的。

    Reduce

    最后一个阶段reduce,减少?这么翻译总感觉不够精准那就直接叫reduce吧。以key分组到达一个reduce,拿到数据Context.write(WritableComparable, Writable)写到文件系统搞定完毕。reduce的输出是没有排序的。

    多少个reduce

    一个合理的数量应该是0.95或者是1.75 乘以 结点数量 * 每个结点最大的容量数。

    0.95启动更快,1.75负载更好。reduce数量多了对集群是个开销,但是对于提升成功率更好。因子小于整数的目的也是为了能够留有余地。

    Reducer NONE

    reducer设置为空的或者0都可以的。有些任务不要进行对map结果进行排序等操作,就可以直接写入到文件系统。FileOutputFormat.setOutputPath(Job, Path)

    Partitioner

    分区以key为主键,对map结果partiton,按照哈希的函数方式,数量与reduce任务一样。

    https://hadoop.apache.org/docs/r3.3.0/api/org/apache/hadoop/mapreduce/lib/partition/HashPartitioner.html HashPartitioner是默认的哈希分区方式。

    Counter

    一个统计工具,上文也说过了,可以通过上下文拿到。

    Task Execution & Environment

    任务的执行和环境,MRAppMaster执行 mr任务的时候 map reduce都是作为一个进程在一个分开的jvm中执行。

    然后我们可以通过配置设置一些jvm的参数,堆栈大小,gc日志,这些我们可以观察到任务的运行情况等。

    Job Submission and Monitoring

    任务提交和监控说一下,步骤:

    1、输入输出检查

    2、计算输入文件大小

    3、设置分布式缓存信息如果有必要

    4、然后就是上传jar 和配置 到mr 的执行目录下

    5、提交到ResourceManager上然后监控它的状态

    Job.submit() : 作业提交到集群立即返回.

    Job.waitForCompletion(boolean) : 作业提交到集群然后等待完成

      Memory Management Map Parameters Shuffle/Reduce Parameters Configured Parameters不介绍上了,主要是mr的运行原理说一下,剩下的可以通过开头的mr连接了解到,官网这一篇还是有很多东西的,后面没有说到的输入输出文件类,split类,分布式缓存,提交debug脚本查看日志等等,大家都可以去看一下,最后是个wordcount案例应用了这些特点。

      看到的小伙伴有什么工作机会可以跟我联系目前在考虑新机会,多谢!

                Growth depends on cycles

  • 相关阅读:
    windows2012 永激活及配置
    Fiddler2 英文版翻译
    你知道using的用法吗?
    你会利用css写下拉列表框吗?
    完美解决.net2.0和.net4.0在同一个iis中共同运行
    深入剖析new override和virtual关键字
    思科防火墙,h3c三层交换机配置笔记
    c# 笔记 数据类型转换 数组 函数
    Silverlight 完美征程 笔记1 (控件模型)
    C#笔记(流程控制)
  • 原文地址:https://www.cnblogs.com/hackerxiaoyon/p/13549707.html
Copyright © 2011-2022 走看看