zoukankan      html  css  js  c++  java
  • hadoop Yarn 编程API

    客户端编程库:
    所在类: org.apache.hadoop.yarn.client.api.YarnClient(位于 hadoop-yarn-client jar 包中)
    
    使用方法:
    1 定义一个YarnClient实例:
       private YarnClient yarnClient;
    2 构造一个Yarn客户端句柄并初始化
    this.yarnClient = YarnClient.createYarnClient();
    yarnClient.init(conf);
    3 启动YarnClient服务
    yarnClient.start();
    4 获取一个新的application id
    YarnClientApplication app = yarnClient.createApplication(); 注解:application id 封装在YarnClientApplication里面了。
    5 构造ApplicationSubmissionContext, 用以提交作业
    ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
    ###################注解:以下会有很多设置属性的操作,如程序名称、优先级、所在的队列等###################################
    ApplicationId appId = appContext.getApplicationId();
    appContext.setApplicationName(appName);
    ......
    yarnClient.submitApplication(appContext);//将应用程序提交到ResourceManager上,ResourceManager的地址来自步骤 2 中传入的conf(读取yarn-site.xml)

    
    


    ApplicationMaster编程库

    设计思路:
    为用户暴露“回调函数”:用户只需要实现这些回调函数,当某种事件发生时,会调用对应的(用户实现的)回调函数。
    回调机制:
    1 定义一个回调函数
    2 提供函数实现的一方在初始化的时候,将回调函数的函数指针注册给调用者
    3 当特殊条件或事件发生的时候,调用者使用函数指针调用回调函数对事件进行处理
    回调机制好处:
    可以把调用者和被调用者分开:调用者不关心谁是被调用者,它只需知道存在一个具有特定原型和限制条件的被调用函数。




    
    
    /**
    * Licensed to the Apache Software Foundation (ASF) under one
    * or more contributor license agreements.  See the NOTICE file
    * distributed with this work for additional information
    * regarding copyright ownership.  The ASF licenses this file
    * to you under the Apache License, Version 2.0 (the
    * "License"); you may not use this file except in compliance
    * with the License.  You may obtain a copy of the License at
    *
    *     http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */
    
    package org.apache.hadoop.yarn.client.api.async;
    
    import java.io.IOException;
    import java.util.Collection;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;
    
    import org.apache.hadoop.classification.InterfaceAudience.Private;
    import org.apache.hadoop.classification.InterfaceAudience.Public;
    import org.apache.hadoop.classification.InterfaceStability.Stable;
    import org.apache.hadoop.service.AbstractService;
    import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
    import org.apache.hadoop.yarn.api.records.Container;
    import org.apache.hadoop.yarn.api.records.ContainerId;
    import org.apache.hadoop.yarn.api.records.ContainerStatus;
    import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
    import org.apache.hadoop.yarn.api.records.NodeReport;
    import org.apache.hadoop.yarn.api.records.Priority;
    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.client.api.AMRMClient;
    import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
    import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
    import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
    import org.apache.hadoop.yarn.exceptions.YarnException;
    
    import com.google.common.annotations.VisibleForTesting;
    
    /**
     * <code>AMRMClientAsync</code> handles communication with the ResourceManager
     * and provides asynchronous updates on events such as container allocations and
     * completions.  It contains a thread that sends periodic heartbeats to the
     * ResourceManager.
     * 
     * It should be used by implementing a CallbackHandler:
     * <pre>
     * {@code
     * class MyCallbackHandler implements AMRMClientAsync.CallbackHandler {
     *   public void onContainersAllocated(List<Container> containers) {
     *     [run tasks on the containers]
     *   }
     *   
     *   public void onContainersCompleted(List<ContainerStatus> statuses) {
     *     [update progress, check whether app is done]
     *   }
     *   
     *   public void onNodesUpdated(List<NodeReport> updated) {}
     *   
     *   public void onReboot() {}
     * }
     * }
     * </pre>
     * 
     * The client's lifecycle should be managed similarly to the following:
     * 
     * <pre>
     * {@code
     * AMRMClientAsync asyncClient = 
     *     createAMRMClientAsync(appAttId, 1000, new MyCallbackhandler());
     * asyncClient.init(conf);
     * asyncClient.start();
     * RegisterApplicationMasterResponse response = asyncClient
     *    .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
     *       appMasterTrackingUrl);
     * asyncClient.addContainerRequest(containerRequest);
     * [... wait for application to complete]
     * asyncClient.unregisterApplicationMaster(status, appMsg, trackingUrl);
     * asyncClient.stop();
     * }
     * </pre>
     */
    @Public
    @Stable
    public abstract class AMRMClientAsync<T extends ContainerRequest> 
    extends AbstractService {
      
      protected final AMRMClient<T> client;
      protected final CallbackHandler handler;
      protected final AtomicInteger heartbeatIntervalMs = new AtomicInteger();
    
      public static <T extends ContainerRequest> AMRMClientAsync<T>
          createAMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) {
        return new AMRMClientAsyncImpl<T>(intervalMs, callbackHandler);
      }
      
      public static <T extends ContainerRequest> AMRMClientAsync<T>
          createAMRMClientAsync(AMRMClient<T> client, int intervalMs,
              CallbackHandler callbackHandler) {
        return new AMRMClientAsyncImpl<T>(client, intervalMs, callbackHandler);
      }
      
      protected AMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) {
        this(new AMRMClientImpl<T>(), intervalMs, callbackHandler);
      }
      
      @Private
      @VisibleForTesting
      protected AMRMClientAsync(AMRMClient<T> client, int intervalMs,
          CallbackHandler callbackHandler) {
        super(AMRMClientAsync.class.getName());
        this.client = client;
        this.heartbeatIntervalMs.set(intervalMs);
        this.handler = callbackHandler;
      }
        
      public void setHeartbeatInterval(int interval) {
        heartbeatIntervalMs.set(interval);
      }
      
      public abstract List<? extends Collection<T>> getMatchingRequests(
                                                       Priority priority, 
                                                       String resourceName, 
                                                       Resource capability);
      
      /**
       * Registers this application master with the resource manager. On successful
       * registration, starts the heartbeating thread.
       * @throws YarnException
       * @throws IOException
       */
      public abstract RegisterApplicationMasterResponse registerApplicationMaster(
          String appHostName, int appHostPort, String appTrackingUrl)
          throws YarnException, IOException;
    
      /**
       * Unregister the application master. This must be called in the end.
       * @param appStatus Success/Failure status of the master
       * @param appMessage Diagnostics message on failure
       * @param appTrackingUrl New URL to get master info
       * @throws YarnException
       * @throws IOException
       */
      public abstract void unregisterApplicationMaster(
          FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl) 
      throws YarnException, IOException;
    
      /**
       * Request containers for resources before calling <code>allocate</code>
       * @param req Resource request
       */
      public abstract void addContainerRequest(T req);
    
      /**
       * Remove previous container request. The previous container request may have 
       * already been sent to the ResourceManager. So even after the remove request 
       * the app must be prepared to receive an allocation for the previous request 
       * even after the remove request
       * @param req Resource request
       */
      public abstract void removeContainerRequest(T req);
    
      /**
       * Release containers assigned by the Resource Manager. If the app cannot use
       * the container or wants to give up the container then it can release them.
       * The app needs to make new requests for the released resource capability if
       * it still needs it. eg. it released non-local resources
       * @param containerId
       */
      public abstract void releaseAssignedContainer(ContainerId containerId);
    
      /**
       * Get the currently available resources in the cluster.
       * A valid value is available after a call to allocate has been made
       * @return Currently available resources
       */
      public abstract Resource getAvailableResources();
    
      /**
       * Get the current number of nodes in the cluster.
       * A valid values is available after a call to allocate has been made
       * @return Current number of nodes in the cluster
       */
      public abstract int getClusterNodeCount();
    
      public interface CallbackHandler {   注视:这里有一系列的回调函数
        
        /**
         * Called when the ResourceManager responds to a heartbeat with completed
         * containers. If the response contains both completed containers and
         * allocated containers, this will be called before containersAllocated.
         */
        public void onContainersCompleted(List<ContainerStatus> statuses);
        
        /**
         * Called when the ResourceManager responds to a heartbeat with allocated
         * containers. If the response containers both completed containers and
         * allocated containers, this will be called after containersCompleted.
         */
        public void onContainersAllocated(List<Container> containers);
        
        /**
         * Called when the ResourceManager wants the ApplicationMaster to shutdown
         * for being out of sync etc. The ApplicationMaster should not unregister
         * with the RM unless the ApplicationMaster wants to be the last attempt.
         */
        public void onShutdownRequest();
        
        /**
         * Called when nodes tracked by the ResourceManager have changed in health,
         * availability etc.
         */
        public void onNodesUpdated(List<NodeReport> updatedNodes);
        
        public float getProgress();
        
        /**
         * Called when error comes from RM communications as well as from errors in
         * the callback itself from the app. Calling
         * stop() is the recommended action.
         *
         * @param e
         */
        public void onError(Throwable e);
      }
    }

    用户实现一个MyCallbackHandler,实现AMRMClientAsync.CallbackHandler接口:

    class MyCallbackHandler implements AMRMClientAsync.CallbackHandler{

    .....................

    }

    ApplicationMaster编程---AM 与 RM交互
    引入类: org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
    流程:
    1 构造一个MyCallbackHandler对象
    AMRMClientAsync.CallbackHandler allocListener = new MyCallbackHandler();
    2 构造一个AMRMClientAsync句柄
    AMRMClientAsync<ContainerRequest> asyncClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); // 1000 为心跳间隔(毫秒)
    3 初始化并启动AMRMClientAsync
    asyncClient.init(conf);//通过传入一个YarnConfiguration对象并进行初始化
    asyncClient.start(); //启动asyncClient


    4 ApplicationMaster向ResourceManager注册
    RegisterApplicationMasterResponse response = asyncClient.registerApplicationMaster(appMasterHostname, appMasterRpcPort, appMasterTrackingUrl);
    5 添加Container请求
    asyncClient.addContainerRequest(containerRequest);
    6 等待应用程序运行结束,然后向RM反注册并停止客户端
    asyncClient.unregisterApplicationMaster(status, appMsg, null); [反注册]
    asyncClient.stop();



    /**
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package org.apache.hadoop.yarn.client.api.async;
    
    import java.nio.ByteBuffer;
    import java.util.Map;
    import java.util.concurrent.ConcurrentMap;
    
    import org.apache.hadoop.classification.InterfaceAudience.Private;
    import org.apache.hadoop.classification.InterfaceAudience.Public;
    import org.apache.hadoop.classification.InterfaceStability.Stable;
    import org.apache.hadoop.service.AbstractService;
    import org.apache.hadoop.yarn.api.records.Container;
    import org.apache.hadoop.yarn.api.records.ContainerId;
    import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
    import org.apache.hadoop.yarn.api.records.ContainerStatus;
    import org.apache.hadoop.yarn.api.records.NodeId;
    import org.apache.hadoop.yarn.api.records.Token;
    import org.apache.hadoop.yarn.client.api.NMClient;
    import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
    import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    
    import com.google.common.annotations.VisibleForTesting;
    
    /**
     * <code>NMClientAsync</code> handles communication with all the NodeManagers
     * and provides asynchronous updates on getting responses from them. It
     * maintains a thread pool to communicate with individual NMs where a number of
     * worker threads process requests to NMs by using {@link NMClientImpl}. The max
     * size of the thread pool is configurable through
     * {@link YarnConfiguration#NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE}.
     *
     * It should be used in conjunction with a CallbackHandler. For example
     *
     * <pre>
     * {@code
     * class MyCallbackHandler implements NMClientAsync.CallbackHandler {
     *   public void onContainerStarted(ContainerId containerId,
     *       Map<String, ByteBuffer> allServiceResponse) {
     *     [post process after the container is started, process the response]
     *   }
     *
     *   public void onContainerStatusReceived(ContainerId containerId,
     *       ContainerStatus containerStatus) {
     *     [make use of the status of the container]
     *   }
     *
     *   public void onContainerStopped(ContainerId containerId) {
     *     [post process after the container is stopped]
     *   }
     *
     *   public void onStartContainerError(
     *       ContainerId containerId, Throwable t) {
     *     [handle the raised exception]
     *   }
     *
     *   public void onGetContainerStatusError(
     *       ContainerId containerId, Throwable t) {
     *     [handle the raised exception]
     *   }
     *
     *   public void onStopContainerError(
     *       ContainerId containerId, Throwable t) {
     *     [handle the raised exception]
     *   }
     * }
     * }
     * </pre>
     *
     * The client's life-cycle should be managed like the following:
     *
     * <pre>
     * {@code
     * NMClientAsync asyncClient = 
     *     NMClientAsync.createNMClientAsync(new MyCallbackHandler());
     * asyncClient.init(conf);
     * asyncClient.start();
     * asyncClient.startContainerAsync(container, containerLaunchContext);
     * [... wait for container being started]
     * asyncClient.getContainerStatusAsync(container.getId(), container.getNodeId());
     * [... handle the status in the callback instance]
     * asyncClient.stopContainerAsync(container.getId(), container.getNodeId());
     * [... wait for container being stopped]
     * asyncClient.stop();
     * }
     * </pre>
     */
    @Public
    @Stable
    public abstract class NMClientAsync extends AbstractService {
    
      /** Client used by the implementation to issue requests to NodeManagers. */
      protected NMClient client;

      /** User-supplied handler that receives the asynchronous responses. */
      protected CallbackHandler callbackHandler;
    
      /**
       * Creates an {@link NMClientAsyncImpl} that reports to the given handler.
       * The caller must still call {@code init(conf)} and {@code start()} as shown
       * in the class example.
       *
       * @param callbackHandler handler that receives the asynchronous responses
       * @return a new client
       */
      public static NMClientAsync createNMClientAsync(
          CallbackHandler callbackHandler) {
        return new NMClientAsyncImpl(callbackHandler);
      }
      
      /**
       * Builds a client named after this class, backed by a fresh
       * {@link NMClientImpl}.
       *
       * @param callbackHandler handler that receives the asynchronous responses
       */
      protected NMClientAsync(CallbackHandler callbackHandler) {
        this (NMClientAsync.class.getName(), callbackHandler);
      }
    
      /**
       * Builds a client with the given service name, backed by a fresh
       * {@link NMClientImpl}.
       *
       * @param name service name
       * @param callbackHandler handler that receives the asynchronous responses
       */
      protected NMClientAsync(String name, CallbackHandler callbackHandler) {
        this (name, new NMClientImpl(), callbackHandler);
      }
    
      /**
       * Builds a client around the supplied {@link NMClient}; exposed mainly so
       * tests can inject a client.
       *
       * @param name service name
       * @param client the NodeManager client to use
       * @param callbackHandler handler that receives the asynchronous responses
       */
      @Private
      @VisibleForTesting
      protected NMClientAsync(String name, NMClient client,
          CallbackHandler callbackHandler) {
        super(name);
        // NOTE(review): these setters are public and non-final, so a subclass
        // overriding them runs against a partially constructed object.
        this.setClient(client);
        this.setCallbackHandler(callbackHandler);
      }
    
      /**
       * Asynchronously starts the given container with the given launch context.
       * The outcome is reported through
       * {@link CallbackHandler#onContainerStarted} or
       * {@link CallbackHandler#onStartContainerError}.
       *
       * @param container the container to start
       * @param containerLaunchContext launch context for the container
       */
      public abstract void startContainerAsync(
          Container container, ContainerLaunchContext containerLaunchContext);
    
      /**
       * Asynchronously stops the given container. The outcome is reported through
       * {@link CallbackHandler#onContainerStopped} or
       * {@link CallbackHandler#onStopContainerError}.
       *
       * @param containerId id of the container to stop
       * @param nodeId id of the node running the container
       */
      public abstract void stopContainerAsync(
          ContainerId containerId, NodeId nodeId);
    
      /**
       * Asynchronously queries the status of the given container. The outcome is
       * reported through {@link CallbackHandler#onContainerStatusReceived} or
       * {@link CallbackHandler#onGetContainerStatusError}.
       *
       * @param containerId id of the container to query
       * @param nodeId id of the node running the container
       */
      public abstract void getContainerStatusAsync(
          ContainerId containerId, NodeId nodeId);
      
      /** @return the NodeManager client currently in use */
      public NMClient getClient() {
        return client;
      }
    
      /** @param client the NodeManager client to use */
      public void setClient(NMClient client) {
        this.client = client;
      }
    
      /** @return the handler that receives the asynchronous responses */
      public CallbackHandler getCallbackHandler() {
        return callbackHandler;
      }
    
      /** @param callbackHandler the handler that receives the asynchronous responses */
      public void setCallbackHandler(CallbackHandler callbackHandler) {
        this.callbackHandler = callbackHandler;
      }
    
      /**
       * <p>
       * The callback interface needs to be implemented by {@link NMClientAsync}
       * users. The APIs are called when responses from <code>NodeManager</code> are
       * available.
       * </p>
       *
       * <p>
       * Once a callback happens, the users can choose to act on it in blocking or
       * non-blocking manner. If the action on callback is done in a blocking
       * manner, some of the threads performing requests on NodeManagers may get
       * blocked depending on how many threads in the pool are busy.
       * </p>
       *
       * <p>
       * The implementation of the callback function should not throw the
       * unexpected exception. Otherwise, {@link NMClientAsync} will just
       * catch, log and then ignore it.
       * </p>
       */
      public static interface CallbackHandler {
        /**
         * The API is called when <code>NodeManager</code> responds to indicate its
         * acceptance of the starting container request
         * @param containerId the Id of the container
         * @param allServiceResponse a Map between the auxiliary service names and
         *                           their outputs
         */
        void onContainerStarted(ContainerId containerId,
            Map<String, ByteBuffer> allServiceResponse);
    
        /**
         * The API is called when <code>NodeManager</code> responds with the status
         * of the container
         * @param containerId the Id of the container
         * @param containerStatus the status of the container
         */
        void onContainerStatusReceived(ContainerId containerId,
            ContainerStatus containerStatus);
    
        /**
         * The API is called when <code>NodeManager</code> responds to indicate the
         * container is stopped.
         * @param containerId the Id of the container
         */
        void onContainerStopped(ContainerId containerId);
    
        /**
         * The API is called when an exception is raised in the process of
         * starting a container
         *
         * @param containerId the Id of the container
         * @param t the raised exception
         */
        void onStartContainerError(ContainerId containerId, Throwable t);
    
        /**
         * The API is called when an exception is raised in the process of
         * querying the status of a container
         *
         * @param containerId the Id of the container
         * @param t the raised exception
         */
        void onGetContainerStatusError(ContainerId containerId, Throwable t);
    
        /**
         * The API is called when an exception is raised in the process of
         * stopping a container
         *
         * @param containerId the Id of the container
         * @param t the raised exception
         */
        void onStopContainerError(ContainerId containerId, Throwable t);
    
      }
    
    }


    ApplicationMaster编程库:AM与NM交互
    用户实现一个MyCallbackHandler,实现NMClientAsync.CallbackHandler接口

    class MyCallbackHandler implements NMClientAsync.CallbackHandler{
    ............
    }


    引入类:
    org.apache.hadoop.yarn.client.api.async.NMClientAsync;
    org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;

    流程:
    1 构造一个NMClientAsync句柄
    NMClientAsync asyncClient = new NMClientAsyncImpl(new MyCallbackHandler());
    2 初始化并启动 NMClientAsync
    asyncClient.init(conf);//初始化asyncClient
    asyncClient.start(); //启动asyncClient
    3 构造Container启动上下文(ContainerLaunchContext)
    ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
    ...//设置ctx变量
    4 启动Container
    asyncClient.startContainerAsync(container, ctx);
    5 获取container状态
    asyncClient.getContainerStatusAsync(container.getId(), container.getNodeId());
    6 停止Container
    asyncClient.stopContainerAsync(container.getId(), container.getNodeId());
    asyncClient.stop();





    
    
    

























  • 相关阅读:
    [每日一题2020.06.23]leetcode #16 双指针
    typora+picgo+jsdeliver+github打造免费高效的博客图床
    [javaSE笔记5]String
    [javaSE笔记4] ArrayList
    [javaSE笔记3] JAVA的继承---多态 抽象
    [每日一题2020.06.22]leetcode #11 双指针
    操作系统---设备管理
    [每日一题2020.06.21]leetcode #124 DFS二叉树
    操作系统---磁盘
    PC实用工具推荐
  • 原文地址:https://www.cnblogs.com/i80386/p/3641093.html
Copyright © 2011-2022 走看看