zoukankan      html  css  js  c++  java
  • YARN的 AM与RM通信,申请资源分配过程

    AppMaster向RM请求资源

    MRAppMaster :serviceinit
      // service to allocate containers from RM (if non-uber) or to fake it (uber)
          containerAllocator = createContainerAllocator(null, context);
          addIfService(containerAllocator);
          dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
          
            protected ContainerAllocator createContainerAllocator(
          final ClientService clientService, final AppContext context) {
        return new ContainerAllocatorRouter(clientService, context);  //
      }
      
      
        private final class ContainerAllocatorRouter extends AbstractService
          implements ContainerAllocator, RMHeartbeatHandler {
        private final ClientService clientService;
        private final AppContext context;
        private ContainerAllocator containerAllocator;
    
       .....
        @Override
        protected void serviceStart() throws Exception {
          if (job.isUber()) {
            this.containerAllocator = new LocalContainerAllocator(
                this.clientService, this.context, nmHost, nmPort, nmHttpPort
                , containerID);
          } else {
            this.containerAllocator = new RMContainerAllocator(              ///
                this.clientService, this.context);
          }
          ((Service)this.containerAllocator).init(getConfig());
          ((Service)this.containerAllocator).start();
          super.serviceStart();
          
          
         org.apache.hadoop.mapreduce.v2.app.rm; RMContainerAllocator类有该方法
          
           protected synchronized void heartbeat() throws Exception {
        scheduleStats.updateAndLogIfChanged("Before Scheduling: ");
        List<Container> allocatedContainers = getResources();  //发远程RM发送心跳信息,注意心跳里可能没有新的资源请求信息
        //只是告诉RM自己还活着,或者只是从RM取得分配资源
        if (allocatedContainers.size() > 0) {
          scheduledRequests.assign(allocatedContainers); //获得的container具体分配到任务task (应该是重排序)
        }
        
        资源请求包括的字段:
        优先级,期望在的host,内存大小等 (默认三路复制,可能会有7个资源请求,3个local,3个 rack,1个随机)
        }
        
        RMContainerAllocator父类RMCommunicator的方法
      protected void startAllocatorThread() {
        allocatorThread = new Thread(new Runnable() {
          @Override
          public void run() {
            while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
              try {
                Thread.sleep(rmPollInterval);  //默认每秒
                try {
                  heartbeat();   //发送心跳
                  ...
        
        private List<Container> getResources() throws Exception {
        int headRoom = getAvailableResources() != null
            ? getAvailableResources().getMemory() : 0;//first time it would be null
        AllocateResponse response;
        /*
         * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
         * milliseconds before aborting. During this interval, AM will still try
         * to contact the RM.
         */
        try {
          response = makeRemoteRequest();  //关键
          
          makeRemoteRequest方法为其父类RMContainerRequestor定义的方法
          
            protected AllocateResponse makeRemoteRequest() throws IOException {
        ResourceBlacklistRequest blacklistRequest =
            ResourceBlacklistRequest.newInstance(new ArrayList<String>(blacklistAdditions),
                new ArrayList<String>(blacklistRemovals));
        AllocateRequest allocateRequest =   //新建个资源请求
            AllocateRequest.newInstance(lastResponseID,
              super.getApplicationProgress(), new ArrayList<ResourceRequest>(ask),  //这个ask是集合类,存ResourceRequest实例,
              //只有个新建方法,在哪赋值的呢
              new ArrayList<ContainerId>(release), blacklistRequest);
        AllocateResponse allocateResponse;
        try {
          allocateResponse = scheduler.allocate(allocateRequest);   //关键,分配资源,此处的scheduler 并非是调度器
          //而是ApplicationMasterProtocol,他会终调用到调度器
          
          scheduler为其父类RMCommunicator新建
           protected ApplicationMasterProtocol scheduler;
           ...
             protected void serviceStart() throws Exception {
        scheduler= createSchedulerProxy();
        ..
        
        
      protected ApplicationMasterProtocol createSchedulerProxy() {
        final Configuration conf = getConfig();
    
        try {
          return ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class);  //ApplicationMasterProtocol协议是关键
          //通过他远程调用ApplicationMasterService中的方法
        } catch (IOException e) {
          throw new YarnRuntimeException(e);
        }
      }
      
    //后面追踪ask的的赋值最终是在哪里调用
    //ask的赋值方法,最后是由  addContainerReq方法,该方法在RMContainerAllocator调用
        private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
        // because objects inside the resource map can be deleted ask can end up 
        // containing an object that matches new resource object but with different
        // numContainers. So exisintg values must be replaced explicitly
        if(ask.contains(remoteRequest)) {
          ask.remove(remoteRequest);
        }
        ask.add(remoteRequest);    
      }
      
        protected void addContainerReq(ContainerRequest req) {
        // Create resource requests
        for (String host : req.hosts) {
          // Data-local
          if (!isNodeBlacklisted(host)) {
            addResourceRequest(req.priority, host, req.capability);
          }      
        }
    
        // Nothing Rack-local for now
        for (String rack : req.racks) {
          addResourceRequest(req.priority, rack, req.capability);
        }
    
        // Off-switch
        addResourceRequest(req.priority, ResourceRequest.ANY, req.capability);
      }
      
    
    RMContainerAllocator内
    void addMap(ContainerRequestEvent event) {  //addMap方法
          ContainerRequest request = null;
          
          if (event.getEarlierAttemptFailed()) {
            earlierFailedMaps.add(event.getAttemptID());
            request = new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP);
            LOG.info("Added "+event.getAttemptID()+" to list of failed maps");
          } else {
            for (String host : event.getHosts()) {
              LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
              if (list == null) {
                list = new LinkedList<TaskAttemptId>();
                mapsHostMapping.put(host, list);
              }
              list.add(event.getAttemptID());
              if (LOG.isDebugEnabled()) {
                LOG.debug("Added attempt req to host " + host);
              }
           }
           for (String rack: event.getRacks()) {
             LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
             if (list == null) {
               list = new LinkedList<TaskAttemptId>();
               mapsRackMapping.put(rack, list);
             }
             list.add(event.getAttemptID());
             if (LOG.isDebugEnabled()) {
                LOG.debug("Added attempt req to rack " + rack);
             }
           }
           request = new ContainerRequest(event, PRIORITY_MAP);
          }
          maps.put(event.getAttemptID(), request);
          addContainerReq(request);           //调用
          
          
          //addMap在该方法内被调用
        protected synchronized void handleEvent(ContainerAllocatorEvent event) {
        recalculateReduceSchedule = true;
        ..................
            scheduledRequests.addMap(reqEvent);//maps are immediately scheduled
            
            
            
            
              protected void serviceStart() throws Exception {
        this.eventHandlingThread = new Thread() {
          @SuppressWarnings("unchecked")
          @Override
          public void run() {
    
            ContainerAllocatorEvent event;
    
            while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
              try {
                event = RMContainerAllocator.this.eventQueue.take();  //取出事件
              } catch (InterruptedException e) {
                if (!stopped.get()) {
                  LOG.error("Returning, interrupted : " + e);
                }
                return;
              }
    
              try {
                handleEvent(event);   //调用
                
               // 事件加入在MRAppMaster内,加入的事件在上面的方法被处理,该方法在哪里调用了呢?
                  public void handle(ContainerAllocatorEvent event) {
          this.containerAllocator.handle(event);
        }
       

    RM端接受AppMaster心跳请求

     //总结,applicationmaster最终通过ApplicationMasterProtocol#allocate向RM汇报资源需求,RM端的ApplicationMasterService提供服务,并最终调用调度器的allocate 
     //将新的资源需求写入内存结构,并返回已经分配的资源
      public class ApplicationMasterService extends AbstractService implements
        ApplicationMasterProtocol {
      public AllocateResponse allocate(AllocateRequest request)
          throws YarnException, IOException {
              ..
                 // Allow only one thread in AM to do heartbeat at a time.
        synchronized (lastResponse) {
    
          // Send the status update to the appAttempt.
          this.rmContext.getDispatcher().getEventHandler().handle(
              new RMAppAttemptStatusupdateEvent(appAttemptId, request
                  .getProgress()));
    
          List<ResourceRequest> ask = request.getAskList();  //ask,release为封装的请求
          List<ContainerId> release = request.getReleaseList(
          
          
             
          // Send new requests to appAttempt.
          Allocation allocation =
              this.rScheduler.allocate(appAttemptId, ask, release, 
                  blacklistAdditions, blacklistRemovals);  //调有RM端的调度器 rScheduler
                  ..
                    allocateResponse.setUpdatedNodes(updatedNodeReports);
          }
    
    //封装一个response返回
          allocateResponse.setAllocatedContainers(allocation.getContainers());
          allocateResponse.setCompletedContainersStatuses(appAttempt
              .pullJustFinishedContainers());
          allocateResponse.setResponseId(lastResponse.getResponseId() + 1);
          allocateResponse.setAvailableResources(allocation.getResourceLimit());
          
          allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
       
          // add preemption to the allocateResponse message (if any)
          allocateResponse.setPreemptionMessage(generatePreemptionMessage(allocation));
    
          // Adding NMTokens for allocated containers.
          if (!allocation.getContainers().isEmpty()) {
            allocateResponse.setNMTokens(rmContext.getNMTokenSecretManager()
                .createAndGetNMTokens(app.getUser(), appAttemptId,
    
    //FIFO Scheduler的allocate方法
    ...
        // Update application requests
            application.updateResourceRequests(ask);  //将此次资源请求写入application的请求内存结构,等待nm发送心跳分配完后,写入application的分配内存结构,
            //最终要更新到这样的一个内存结构 final Map<Priority, Map<String, ResourceRequest>> requests = 
       // new HashMap<Priority, Map<String, ResourceRequest>>();
    ...
          return new Allocation(
              application.pullNewlyAllocatedContainers(),    //application内部的集合类,从分配好的内存结构里取
              application.getHeadroom());
         
         
    //application为FiCaSchedulerApp类
      synchronized public List<Container> pullNewlyAllocatedContainers() {
        List<Container> returnContainerList = new ArrayList<Container>(
            newlyAllocatedContainers.size());
        for (RMContainer rmContainer : newlyAllocatedContainers) {   //只是从newlyAllocatedContainers里面取,newlyAllocatedContainers的赋值是NM发送心跳后调用assignContainer后赋值的
          rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
              RMContainerEventType.ACQUIRED));
          returnContainerList.add(rmContainer.getContainer());
        }
        newlyAllocatedContainers.clear();
        return returnContainerList;
      }
    
      synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
          Priority priority, ResourceRequest request, 
          Container container) {
    
       ....
        // Add it to allContainers list.
        newlyAllocatedContainers.add(rmContainer);  //给其赋值
        
        
        
        //FIFO scheduler类调用上面方法,该方法是NM发送心跳最终调用的方法
         private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application, 
          Priority priority, int assignableContainers, 
          ResourceRequest request, NodeType type) {
      ....
            }
    
            // Create the container
            Container container =
                BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
                  .getHttpAddress(), capability, priority, containerToken);
            
            // Allocate!
            
            // Inform the application
            RMContainer rmContainer =
                application.allocate(type, node, priority, request, container);
    //总结以上看到的,也就是appmaster向RM发送请求,是从当前内存结构返回资源请求,这个过程是异步的,当nm发送心跳,会根据appmaster的资源请求分配资源
    //写到内存结构,等appmaster来取 (发送的资源请求,要先保存下来,资源请求的内存结构里,保存在application FiCaSchedulerApp里application.showRequests()
  • 相关阅读:
    SQL Server 2005 镜像构建说明(转载)
    表变量 vs. 临时表
    SQL Server日志
    MDX常见计算方法(百分比/分配/平均值/基于时间的计算)
    MDX中一些边界问题的处理
    MDX中的Where vs. Subselect/Subcube
    MDX优化Set操作
    SSIS处理导入数据时, 存在的更新, 不存在的插入(转载)
    MDX实现同比和环比
    iPhone4S出现应用无法打开时的解决方案
  • 原文地址:https://www.cnblogs.com/joqk/p/3966416.html
Copyright © 2011-2022 走看看