zoukankan      html  css  js  c++  java
  • MapReduce源码分析之新API作业提交(二):连接集群

    MapReduce作业提交时连接集群是通过Job的connect()方法实现的,它实际上是构造集群Cluster实例cluster,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1.  private synchronized void connect()  
    2.          throws IOException, InterruptedException, ClassNotFoundException {  
    3.      
    4. // 如果cluster为null,构造Cluster实例cluster,  
    5. // Cluster为连接MapReduce集群的一种工具,提供了一种获取MapReduce集群信息的方法  
    6. if (cluster == null) {  
    7.      cluster =   
    8.        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {  
    9.                   public Cluster run()  
    10.                          throws IOException, InterruptedException,   
    11.                                 ClassNotFoundException {  
    12.                     return new Cluster(getConfiguration());  
    13.                   }  
    14.                 });  
    15.    }  
    16.  }  

            这个方法用synchronized关键字标识,处理逻辑为:如果cluster为null,构造Cluster实例cluster。

            Cluster为连接MapReduce集群的一种工具,提供了一种获取MapReduce集群信息的方法,我们看下它的成员变量,如下所示:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 客户端通信协议提供者  
    2. private ClientProtocolProvider clientProtocolProvider;  
    3. // 客户端通信协议实例  
    4. private ClientProtocol client;  
    5.   
    6. // 用户信息  
    7. private UserGroupInformation ugi;  
    8.   
    9. // 配置信息  
    10. private Configuration conf;  
    11.   
    12. // 文件系统实例  
    13. private FileSystem fs = null;  
    14.   
    15. // 系统路径  
    16. private Path sysDir = null;  
    17.   
    18. // 阶段区域路径  
    19. private Path stagingAreaDir = null;  
    20.   
    21. // 作业历史路径  
    22. private Path jobHistoryDir = null;  
    23.   
    24. // 日志  
    25. private static final Log LOG = LogFactory.getLog(Cluster.class);  
    26.   
    27. // 客户端通信协议提供者加载器  
    28. private static ServiceLoader<ClientProtocolProvider> frameworkLoader =  
    29.     ServiceLoader.load(ClientProtocolProvider.class);  

            Cluster最重要的两个成员变量是客户端通信协议提供者ClientProtocolProvider实例clientProtocolProvider,客户端通信协议ClientProtocol实例client,而后者是依托前者的create()方法生成的。

            Cluster提供了两个构造函数,如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1.   public Cluster(Configuration conf) throws IOException {  
    2.     this(null, conf);  
    3.   }  
    4.   
    5.   public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)   
    6.       throws IOException {  
    7. <span style="white-space:pre">    </span>// 设置配置信息  
    8.     this.conf = conf;  
    9.       
    10.     // 获取当前用户  
    11.     this.ugi = UserGroupInformation.getCurrentUser();  
    12.       
    13.     // 调用initialize()方法完成初始化  
    14.     initialize(jobTrackAddr, conf);  
    15.   }  

            最终会调用initialize()方法完成初始化,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 确定客户端ClientProtocol实例client  
    2. private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)  
    3.     throws IOException {  
    4.   
    5.   synchronized (frameworkLoader) {  
    6.       
    7.     // 取出每个ClientProtocolProvider实例provider,通过其create()方法,  
    8.     // 构造ClientProtocol实例clientProtocol,  
    9.     // 并将两者赋值给对类应成员变量,退出循环  
    10.     for (ClientProtocolProvider provider : frameworkLoader) {  
    11.         
    12.     LOG.debug("Trying ClientProtocolProvider : "  
    13.           + provider.getClass().getName());  
    14.         
    15.     ClientProtocol clientProtocol = null;   
    16.         
    17.     try {  
    18.           
    19.         // 通过ClientProtocolProvider的create()方法,获取客户端与集群通讯ClientProtocol实例clientProtocol  
    20.         if (jobTrackAddr == null) {  
    21.           clientProtocol = provider.create(conf);  
    22.         } else {  
    23.           clientProtocol = provider.create(jobTrackAddr, conf);  
    24.         }  
    25.   
    26.         // 设置类成员变量clientProtocolProvider、client,并退出循环  
    27.         if (clientProtocol != null) {  
    28.           clientProtocolProvider = provider;  
    29.           client = clientProtocol;  
    30.             
    31.           // 记录debug级别日志信息  
    32.           LOG.debug("Picked " + provider.getClass().getName()  
    33.               + " as the ClientProtocolProvider");  
    34.           break;  
    35.         }  
    36.         else {  
    37.             
    38.         // 记录debug级别日志信息  
    39.           LOG.debug("Cannot pick " + provider.getClass().getName()  
    40.               + " as the ClientProtocolProvider - returned null protocol");  
    41.         }  
    42.       }   
    43.       catch (Exception e) {  
    44.         LOG.info("Failed to use " + provider.getClass().getName()  
    45.             + " due to error: " + e.getMessage());  
    46.       }  
    47.     }  
    48.   }  
    49.   
    50.   // 如果clientProtocolProvider、client任一为空,直接抛出IO异常  
    51.   if (null == clientProtocolProvider || null == client) {  
    52.     throw new IOException(  
    53.         "Cannot initialize Cluster. Please check your configuration for "  
    54.             + MRConfig.FRAMEWORK_NAME  
    55.             + " and the correspond server addresses.");  
    56.   }  
    57. }  

            initialize()方法唯一的一个任务就是确定客户端通信协议提供者clientProtocolProvider,并通过其create()方法构造客户端通信协议ClientProtocol实例client。

            MapReduce中,ClientProtocolProvider抽象类的实现共有YarnClientProtocolProvider、LocalClientProtocolProvider两种,前者为Yarn模式,而后者为Local模式。

            我们先看下Yarn模式,看下YarnClientProtocolProvider的create()方法,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1.  @Override  
    2.  public ClientProtocol create(Configuration conf) throws IOException {  
    3.     
    4. // 如果参数mapreduce.framework.name配置的为yarn,构造一个YARNRunner实例并返回,否则返回null  
    5.    if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {  
    6.      return new YARNRunner(conf);  
    7.    }  
    8.    return null;  
    9.  }  

            Yarn模式下,如果参数mapreduce.framework.name配置的为yarn,构造一个YARNRunner实例并返回,否则返回null,关于YARNRunner,我们待会再讲,我们接着再看下Local模式,LocalClientProtocolProvider的create()方法,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1.  @Override  
    2.  public ClientProtocol create(Configuration conf) throws IOException {  
    3.      
    4. // 初始化framework:取参数mapreduce.framework.name,参数未配置默认为local  
    5. String framework =  
    6.        conf.get(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);  
    7.   
    8. // 如果framework是local,,则返回LocalJobRunner实例,并设置map任务数量为1,否则返回null  
    9.    if (!MRConfig.LOCAL_FRAMEWORK_NAME.equals(framework)) {  
    10.      return null;  
    11.    }  
    12.    conf.setInt(JobContext.NUM_MAPS, 1);  
    13.   
    14.    return new LocalJobRunner(conf);  
    15.  }  

            Local模式也是需要看参数mapreduce.framework.name的配置是否为local,是的话,返回LocalJobRunner实例,并设置map任务数量为1,否则返回null,值得一提的是,这里参数mapreduce.framework.name未配置的话,默认为local,也就是说,MapReduce需要看参数mapreduce.framework.name确定连接模式,但默认是Local模式的。

            到了这里,我们就能够知道一个很重要的信息,Cluster中客户端通信协议ClientProtocol实例,要么是Yarn模式下的YARNRunner,要么就是Local模式下的LocalJobRunner,记住这点,对透彻了解MapReduce作业提交的整体流程非常重要。

            好了,我们继续以Yarn模式来分析MapReduce集群连接,看下YARNRunner的实现,先看下它的成员变量,如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 记录工厂RecordFactory实例  
    2. private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);  
    3.   
    4. // ResourceManager代理ResourceMgrDelegate实例  
    5. private ResourceMgrDelegate resMgrDelegate;  
    6.   
    7. // 客户端缓存ClientCache实例  
    8. private ClientCache clientCache;  
    9.   
    10. // 配置信息Configuration实例  
    11. private Configuration conf;  
    12.   
    13. // 文件上下文FileContext实例  
    14. private final FileContext defaultFileContext;  

            其中,最重要的一个变量就是ResourceManager代理ResourceMgrDelegate实例resMgrDelegate,Yarn模式下整个MapReduce客户端就是由它负责与Yarn集群进行通信,完成诸如作业提交、作业状态查询等过程,通过它获取集群的信息,其内部有一个YarnClient实例YarnClient,负责与Yarn进行通信,还有ApplicationId、ApplicationSubmissionContext等与特定应用程序相关的成员变量。关于ResourceMgrDelegate的详细介绍,请阅读《MapReduce源码分析ResourceMgrDelegate》一文,这里不再做详细介绍。

            另外一个比较重要的变量就是客户端缓存ClientCache实例clientCache,

            接下来,我们看下YARNRunner的构造函数,如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1.  /** 
    2.   * Yarn runner incapsulates the client interface of 
    3.   * yarn 
    4.   * @param conf the configuration object for the client 
    5.   */  
    6.  public YARNRunner(Configuration conf) {  
    7.     
    8.   // 先构造ResourceManager代理ResourceMgrDelegate实例,然后再调用两个参数的构造函数  
    9.   this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf)));  
    10.  }  
    11.   
    12.  /** 
    13.   * Similar to {@link #YARNRunner(Configuration)} but allowing injecting 
    14.   * {@link ResourceMgrDelegate}. Enables mocking and testing. 
    15.   * @param conf the configuration object for the client 
    16.   * @param resMgrDelegate the resourcemanager client handle. 
    17.   */  
    18.  public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) {  
    19.   // 先构造客户端缓存ClientCache实例,然后再调用三个参数的构造函数  
    20.   this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate));  
    21.  }  
    22.   
    23.  /** 
    24.   * Similar to {@link YARNRunner#YARNRunner(Configuration, ResourceMgrDelegate)} 
    25.   * but allowing injecting {@link ClientCache}. Enable mocking and testing. 
    26.   * @param conf the configuration object 
    27.   * @param resMgrDelegate the resource manager delegate 
    28.   * @param clientCache the client cache object. 
    29.   */  
    30.  public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,  
    31.      ClientCache clientCache) {  
    32.      
    33. // 成员变量赋值    
    34. this.conf = conf;  
    35.    try {  
    36.      this.resMgrDelegate = resMgrDelegate;  
    37.      this.clientCache = clientCache;  
    38.        
    39.      // 获取文件山下文FileContext实例defaultFileContext  
    40.      this.defaultFileContext = FileContext.getFileContext(this.conf);  
    41.    } catch (UnsupportedFileSystemException ufe) {  
    42.      throw new RuntimeException("Error in instantiating YarnClient", ufe);  
    43.    }  
    44.  }  

            YARNRunner一共提供了三个构造函数,而我们之前说的WordCount作业提交时,其内部调用的是YARNRunner带有一个参数的构造函数,它会先构造ResourceManager代理ResourceMgrDelegate实例,然后再调用两个参数的构造函数,继而构造客户端缓存ClientCache实例,然后再调用三个参数的构造函数,而最终的构造函数只是进行简单的类成员变量赋值,然后通过FileContext的静态getFileContext()方法获取文件山下文FileContext实例defaultFileContext。

            总结

            MapReduce作业提交时连接集群是通过Job的connect()方法实现的,它实际上是构造集群Cluster实例cluster。Cluster为连接MapReduce集群的一种工具,提供了一种获取MapReduce集群信息的方法。在Cluster内部,有一个与集群进行通信的客户端通信协议ClientProtocol实例client,它由ClientProtocolProvider的静态create()方法构造,而Hadoop2.6.0中提供了两种模式的ClientProtocol,分别为Yarn模式的YARNRunner和Local模式的LocalJobRunner,Cluster实际上是由它们负责与集群进行通信的,而Yarn模式下,ClientProtocol实例YARNRunner对象内部有一个ResourceManager代理ResourceMgrDelegate实例resMgrDelegate,Yarn模式下整个MapReduce客户端就是由它负责与Yarn集群进行通信,完成诸如作业提交、作业状态查询等过程,通过它获取集群的信息。

  • 相关阅读:
    Windows下IntelliJ IDEA中调试Spark Standalone
    Java中final修饰符深入研究
    Java对象创建过程补遗
    SpringMVC项目中获取所有URL到Controller Method的映射
    简述Java中Http/Https请求监听方法
    WPF浏览器应用程序与JS的互调用(不用WebBrowser)
    通讯协议序列化解读(一) Protobuf详解教程
    Protobuf3语法详解
    ReflectASM-invoke,高效率java反射机制原理
    FFMPEG指令
  • 原文地址:https://www.cnblogs.com/jirimutu01/p/5556361.html
Copyright © 2011-2022 走看看