zoukankan      html  css  js  c++  java
  • 大数据之路week07--day04 (YARN,Hadoop的优化,combline,join思想,)

    hadoop 的计算特点:将计算任务向数据靠拢,而不是将数据向计算靠拢。

    特点:数据本地化,减少网络io。

    首先需要知道,hadoop数据本地化是指的map任务,reduce任务并不具备数据本地化特征。
          通常输入的数据首先在逻辑上注意这里不是真正物理上划分)将会分片split,每个分片上构建一个map任务,由该任务执行执行用户自定义的map函数,从而处理分片中的每条记录。
          那么切片的大小一般是趋向一个HDFS的block块的大小。为什么最佳的分片大小是趋向block块的大小呢?是因为这样能够确保单节点上最大输入块的大小,如果分片跨越两个数据块,没有一个block能够同时存储这两块数据,因此需要通过网络传输将部分数据传输到map任务节点上。这样明显比使用本地数据的map效率更低。
           注意,map任务执行后的结果并没有写到HDFS中,而是作为中间结果存储到本地硬盘,那为什么没有存储到HDFS呢?因为,该中间结果会被reduce处理后产生最终结果后,该中间数据会被删除,如果存储到HDFS中,他会进行备份,这样明显没有意义。如果map将中间结果传输到reduce过程中出现了错误,Hadoop会在另一个节点上重新执行map产生中间结果。
           那么为什么reduce没有数据本地化的特点呢?对于单个reduce任务来说,他的输入通常是所有mapper经过排序输出,这些输出通过网络传输到reduce节点,数据在reduce节点合并然后由reduce函数进行处理。最终结果输出到HDFS上。当多个有reduce任务的时候,map会针对输出进行分区partition,也就是为每个reduce构建一个分区,分区是由用户指定的partition函数,效率很高。
         同时为了高效传输可以指定combiner函数,他的作用就是,减少网络传输和本地传输

    假设文件是500mb

    long bytesRemaining = length; 500mb

              while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {

                int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining  );

                splits.add(makeSplit(path, length-bytesRemaining 256 , splitSize 128,

                            blkLocations[blkIndex].getHosts(),

                            blkLocations[blkIndex].getCachedHosts()));

                bytesRemaining = bytesRemaining-splitSize;116

              }

      

      

    优化1 为什么默认切片是128MB和blk大小一致?(优化)

    1 切片大小默认一致,是为了数据本地化,减少数据拉取消耗网络io

    2 并不是越大越好,也不是越小越好。根据集群的资源情况而定。

     当集群计算资源充足的情况下:将切片的大小调小,增加map数量,提高读取效率。

     当集群计算资源紧张的情况下:将切片的大小调大,减少资源占用,让任务正常运转。

     mapred.min.split.size、mapred.max.split.size、blockSize

    优化2:可以设置yarn资源和队列。

     yarn的webui:http://192.168.73.90:8088/cluster

     调整计算资源:https://blog.csdn.net/qq_36753550/article/details/83065546

     设置队列:https://blog.csdn.net/weixin_30607029/article/details/96507281

    mr运行日志信息:百分比是按照完成的m或r的任务的个数/m或r的总个数。

    map和reduce任务是同时的。

    MRv1/MRv2/YARN MRv1:

      对于经典的MRv1它由三部分组成 :

        编程模型、 数据处理引擎和运行时环境。

        编程模型由新旧 API 两部分组成,新旧api只是代码封装上略有变化,性能没变化。

        数据处理引擎由 MapTask 和 ReduceTask 组成。 运行时环境由 JobTracker 和 TaskTracker 两类服务组成。

      MRv2:

        由于MRv1对JobTracker的功能过多造成负载过重在扩展性、 资源利用率和多框架支持等方面存在不足,因此MRv2框架 的基本设计思想是将MRv1中的JobTracker包含的资源管理和应用管理两部分功能进行拆分,分别交给两个进程实现。 资源管理进程与具体应用程序无关,它负责整个集群的资源管理(内存、 CPU、 磁盘)。 应用管理进程负责管理应用程序,并且每个应用管理进程只管理一个作业。 由于资源管理可以共享给其他框架使用,因此MRv2将其做成了一个通用的系统YARN,YARN系统使得MRv2计算框架在可扩展性,资源利用率,多框架支持方面得到了很大改进。

      YARN:yarn由4部分组成。

        1. ResourceManager主要功能是:

          (1)接收用户请求

          (2)管理调度资源

          (3)启动管理am    

          (4)管理所有nm,处理nm的状态汇报,向nm下达命令。

        2.Container:yarn的应用都是运行在容器上的,容器包含cpu,内存等信息。

        3.NodeManager:NM是每个节点上的资源和任务管理器,它会定时地向RM汇报本节点上的资源使用情况和各个容器的运行状态;同时负责对容器的启动和停止。

        4. ApplicationMaster:管理应用程序。向RM获取资源、为应用程序分配任务、 监控所有任务运行状态。

    1. 作业提交

      首先我们将任务提交给JobClient,JobClient会向RM获取一个appId。 然后我们的JobClient会对作业进行处理, 切分InputSplit, 将作业的Jar包, 配置文件和拷贝InputSplit信息拷贝到HDFS。 最后, 通过调用RM的submitApplication()来提交作业。

    2. 作业初始化

      当RM收到submitApplciation()的请求时, 就将该请求发给调度器, 调度器分配第一个容器, 然后RM在该容器内启动ApplicationMaster进程。该进程上运行着一个MRAppMaster的Java应用。其通过创造一些bookkeeping对象来监控作业的进度。 然后通过hdfs得到由JobClient已经处理好的作业信息。为每个Inputsplit创建一个map任务, 并创建相应的reduce任务。然后ApplicationMaster会对整个作业量进行判断,如果作业量很小, ApplicationMaster会选择在其自己的JVM中运行任务, 种作业称作是uber task的方式。在任务运行之前, 作业的setup方法被调用来创建输出路径。

    3. 任务分配

      如果不是小作业, 那么ApplicationMaster向RM请求更多的容器来运行所有的map和reduce任务,每个容器只能对应一个任务。这些请求是通过心跳来传输的, 包括每个map任务的数据位置, 比如Inputsplit的主机名和机架。调度器利用这些信息来调度任务, 尽量将任务分配给有存储数据的节点, 或者分配给和存放Inputsplit的节点相同机架的节点。

    4. 任务运行

      当一个任务由RM的调度器分配了一个容器后, ApplicationMaster与NM通信来启动容器。任务由一个为YarnChild的Java应用执行。在运行任务之前首先本地化任务需要的资源, 比如作业配置, JAR文件, 以及hdfs中保存的任务所需的所有文件。最后, map任务或者reduce运行在一个叫YarnChild的进程当中。

    5. 进度和状态更新

      每个NM会想applicationmaster汇报自己的工作状态,JobClient会每秒轮训检测applicationmaster,这样就能随时收到更新信息。

    6. 作业完成

      除了向applicationmaster请求作业进度外, JobClient每5分钟都会通过调用waitForCompletion()来检查作业是否完成。作业完成之后,applicationmaster和NM会清理工作状态, OutputCommiter的作业清理方法也会被调用. 作业的信息会被作业历史服务器存储以备之后用户核查.

    yarn对异常task的处理(推测执行)?

      推测执行是在分布式环境下,因为某种原因造成同一个job的多个task运行速度不一致,有的task运行速度明显慢于其他task,则这些task拖慢了整个job的执行进度,为了避免这种情况发生,Hadoop会为该task启动备份任务,让该speculative task与原始task同时处理一份数据,哪个先运行完,则将谁的结果作为最终结果。推测执行优化机制采用了典型的以空间换时间的优化策略,它同时启动多个相同task(备份任务)处理相同的数据块,哪个完成的早,则采用哪个task的结果,这样可防止拖后腿Task任务出现,进而提高作业计算速度,但是,这样却会占用更多的资源。

    yarn调度器的策略?

      yarn默认是计算能力调度 FifoScheduler:根据先进先出排队,最简单的调度器。 CapacityScheduler(计算能力调度)、FairScheduler(公平调度):

      相同点:

        (1)都是多队列。

        (2)都有资源最大最小上线限制。

        (3)都是资源共享,每个队列剩余的资源可以给其他队列使用。

      不同点:

        (1)队列排序算法不同:计算能力调度资源使用量小的优先。公平调度根据公平排序算法排序。

        (2)应该用选择算法不同:计算能力调度是先进先出。公平调度先进先出或者公平排序算法。

        (3)资源抢占:公平调度如果当前队列有新应用提交后,会把共享出去的资源抢夺回来。

    优化3 将reduce端的聚合操作,放到map 进行执行。适合求和,计数,等一些等幂操作。

    原理:减少的了reduce 从map拉取数据的过程,提高计算效率。

    代码举例:(计数)

      1 package com.wyh.shujia006;
      2 
      3 import java.io.IOException;
      4 
      5 import org.apache.hadoop.conf.Configuration;
      6 import org.apache.hadoop.fs.Path;
      7 import org.apache.hadoop.io.LongWritable;
      8 import org.apache.hadoop.io.Text;
      9 import org.apache.hadoop.mapreduce.Job;
     10 import org.apache.hadoop.mapreduce.Mapper;
     11 import org.apache.hadoop.mapreduce.Reducer;
     12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     14 
     15 /**
     16 
     17  * 创建时间:2019年12月17日 下午3:14:11
     18 
     19  * 项目名称:shujia006
     20 
     21  * @author WYH
     22 
     23  * @version 1.0
     24 
     25  * @since JDK 1.8.0
     26 
     27  * 文件名称:WordCount.java
     28 
     29  * 类说明:
     30 
     31  */
     32 
     33 public class WordCount {
     34     //创建内部类 MyMap
     35     public static class MyMap extends Mapper<LongWritable, Text, Text, LongWritable>{
     36         @Override
     37         protected void map(LongWritable K1, Text V1,
     38                 Mapper<LongWritable, Text, Text, LongWritable>.Context context)
     39                 throws IOException, InterruptedException {
     40             String s1 = V1.toString();
     41             String[] words = s1.split(",");
     42             for(String word1 : words){
     43                 Text word = new Text(word1);
     44                 context.write(word, new LongWritable(1l));
     45             }
     46         }
     47     }
     48     
     49     //创建内部类MyReduce
     50     public static class MyReduce extends Reducer<Text, LongWritable, Text, LongWritable>{
     51         @Override
     52         protected void reduce(Text K2, Iterable<LongWritable> V2s,
     53                 Reducer<Text, LongWritable, Text, LongWritable>.Context context)
     54                 throws IOException, InterruptedException {
     55             Long sum = 0l;
     56             for(LongWritable V2 : V2s){
     57                 sum += V2.get();
     58             }
     59             context.write(K2, new LongWritable(sum));
     60         }
     61     }
     62     
     63     //主体函数
     64     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
     65         //加载hadoop的配置参数
     66         Configuration conf = new Configuration();
     67         //创建任务的对象
     68         Job job = Job.getInstance(conf, WordCount.class.getSimpleName());
     69     //=========================================================================    
     70         //设置打包的类
     71         job.setJarByClass(WordCount.class);
     72     //=========================================================================    
     73         //设置读取文件的hdfs路径
     74         FileInputFormat.addInputPath(job, new Path(args[0]));
     75     //=========================================================================        
     76         //指定需要执行的map类
     77         job.setMapperClass(MyMap.class);
     78         //指定map输出的序列化类
     79         job.setMapOutputKeyClass(Text.class);
     80         job.setMapOutputValueClass(LongWritable.class);
     81     //=========================================================================        
     82         //指定需要执行的reduce类
     83         job.setReducerClass(MyReduce.class);
     84         //指定reduce的序列化类
     85         job.setOutputKeyClass(Text.class);
     86         job.setOutputValueClass(LongWritable.class);
     87         
     88         job.setCombinerClass(MyReduce.class);
     89     //=========================================================================        
     90         //指定输出的hdfs路径
     91         FileOutputFormat.setOutputPath(job, new Path(args[1]));
     92     //=========================================================================        
     93         //提交任务,等待执行完成,并打印执行日志
     94         job.waitForCompletion(true);
     95         
     96         
     97     }
     98     
     99     
    100     
    101     
    102     
    103     
    104 
    105 }

    优化4 Join

    MapReduce中的join

      其实就是类似于关系型数据库中的连接查询一样。需要计算的数据可能存储在不同的文件中或不同表中,两个文件又有一些相同的字段可以相互关联,这时候我们就可以通过这些关联字段将两个文件中的数据组合到一起进行计算了。

      我知道的mr有三种join方式。Map join、SemiJoin、reduce join。

    Reduce Join

    思路:

      分为两个阶段

       (1)map函数主要是对不同文件中的数据打标签。

      (2)reduce函数获取key相同的value list,进行笛卡尔积。

    Map Join思路:

      比如有两个表,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task内存中保存一个hash map,将小表数据放入这个hash map中,key是小表与大表的内个连接字段,value是小表一条记录,然后只扫描大表:对于大表中的每一条记录key/value,在hash map中查找是否有相同的key的记录,如果有,则连接输出即可。

    Semi Join 这个SemiJoin其实就是对reduce join的一种优化。

      就是在map端过滤掉不参加join操作的数据,则可以大大减少数据量,提高网络传输速度。

    这三种join方式适用于不同的场景:

      Reduce join要考虑数据量过大时的网络传输问题。

      Map join和SemiJoin则要考虑数据量过大时的内存问题。 如果只考虑网络传输,忽略内存问题则。

      Map join效率最高,其次是SemiJoin,最低的是reduce join。

    DistributedCache DistributedCache是Hadoop提供的文件缓存工具,它能够自动将指定的文件分发到各个节点上,缓存到本地,供用户程序读取使用。一般用户数据字典的分发,和map join使用。一般缓存的文件都是只读。

    关联两份数据的代码展示:

      1 package com.wyh.shujia006;
      2 
      3 import java.io.IOException;
      4 import java.util.Vector;
      5 
      6 import org.apache.hadoop.conf.Configuration;
      7 import org.apache.hadoop.fs.Path;
      8 import org.apache.hadoop.io.LongWritable;
      9 import org.apache.hadoop.io.Text;
     10 import org.apache.hadoop.mapreduce.Job;
     11 import org.apache.hadoop.mapreduce.Mapper;
     12 import org.apache.hadoop.mapreduce.Reducer;
     13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     14 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
     15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     16 
     17 /**
     18 
     19  * 创建时间:2019年12月19日 下午4:42:42
     20 
     21  * 项目名称:shujia006
     22 
     23  * @author WYH
     24 
     25  * @version 1.0
     26 
     27  * @since JDK 1.8.0
     28 
     29  * 文件名称:dianxin_join.java
     30 
     31  * 类说明:
     32 
     33  */
     34 
     35 public class dianxin_join {
     36     public static class joinMap extends Mapper<LongWritable, Text, Text, Text>{
     37         @Override
     38         protected void map(LongWritable k1, Text v1,
     39                 Mapper<LongWritable, Text, Text, Text>.Context context)
     40                 throws IOException, InterruptedException {
     41                 FileSplit split = (FileSplit)context.getInputSplit();
     42                 Path path = split.getPath();
     43                 String line = path.toString();
     44                 if(line.contains("dianxin_data")){
     45                     String s1 = v1.toString();
     46                     String[] sp1 = s1.split("	");
     47                     /*Text t1 = new Text(sp1[2]);
     48                     String s2 = sp1[1];
     49                     String s3 = sp1[4];
     50                     String newString = "dianxin"+s2+s3;
     51                     context.write(t1, new Text(newString));*/
     52                     context.write(new Text(sp1[2]), new Text("xin"+sp1[1]+sp1[4]));
     53                 }else if(line.contains("city_id")){
     54                     String s1 = v1.toString();
     55                     String[] sp1 = s1.split(",");
     56                     /*Text t1 = new Text(sp1[0]);
     57                     String s2 = sp1[1];
     58                     String newString = "city_id"+sp1[1];
     59                     context.write(t1, new Text(newString));*/
     60                     context.write(new Text(sp1[0]), new Text("city_id"+sp1[1]));
     61                 }
     62         }    
     63     }
     64     
     65     public static class joinReduce extends Reducer<Text, Text, Text, Text>{
     66         @Override
     67         protected void reduce(Text k2, Iterable<Text> v2s,
     68                 Reducer<Text, Text, Text, Text>.Context context)
     69                 throws IOException, InterruptedException {
     70             //先定义两个集合来分别添加两个表里的内容
     71             Vector dianxin = new Vector();
     72             Vector city = new Vector();
     73             for(Text name : v2s){
     74                 String s2 = name.toString();
     75                 if(s2.startsWith("xin")){
     76                     s2 = s2.substring(3);
     77                     dianxin.add(s2);        
     78                 }else if(s2.startsWith("city_id")){
     79                     s2 = s2.substring(7);
     80                     city.add(s2);
     81                 }
     82             }
     83             
     84             for(Object dianxin1 : dianxin){
     85                 for(Object city1 : city){
     86                     context.write(k2, new Text(dianxin1+","+city1));
     87                 }
     88             }
     89         }    
     90     }
     91     
     92     public static void main(String[] args) throws Exception {
     93         Configuration conf = new Configuration();
     94         Job job = Job.getInstance(conf, dianxin_join.class.getSimpleName());
     95         job.setJarByClass(dianxin_join.class);
     96         FileInputFormat.addInputPath(job, new Path(args[0]));
     97         FileInputFormat.addInputPath(job, new Path(args[1]));
     98         
     99         job.setMapperClass(joinMap.class);
    100         job.setMapOutputKeyClass(Text.class);
    101         job.setMapOutputValueClass(Text.class);
    102         
    103         job.setReducerClass(joinReduce.class);
    104         job.setOutputKeyClass(Text.class);
    105         job.setOutputValueClass(Text.class);
    106         
    107         FileOutputFormat.setOutputPath(job, new Path(args[2]));
    108         job.waitForCompletion(true);
    109     }
    110 }

    1,通过修改map的切片大小控制map数据量(尽量和block大小保持一致) 并不是map越多越好,根据集群资源 set mapred.max.split.size=256000000

    2,合并小文件。因为一个文件会至少生成一个map

    3,避免数据倾斜(重点)

    4,combine操作

    5,mapjoin操作(重点,需掌握思想)

    6,适当备份,因为备份多可以本地化生成map任务

  • 相关阅读:
    [多线程学习笔记]条件变量
    [多线程学习笔记]互斥量
    [多线程学习笔记]线程生命周期
    多定时器队列
    双向环形链表
    多目录,多可执行文件的Makfile的编写
    大工匠
    从零开始打造我的计算机系统【运行效果】
    从零开始打造我的计算机系统【交叉汇编器】
    C中的回调函数
  • 原文地址:https://www.cnblogs.com/wyh-study/p/12070025.html
Copyright © 2011-2022 走看看