zoukankan      html  css  js  c++  java
  • Yarn 日志聚合优化—摆脱 HDFS 依赖

    背景

    (1)问题背景

    线上集群 Container 日志上报的事务集群 namenode rpc 持续飙高,影响到了 Yarn 分配 Container 的性能,任务提交数下降,导致整个集群的吞吐量下降。

    (2)原因简介

    作业提交到 Yarn 集群时,每个 NM 节点都会对每个 app 作业进行日志聚合操作,该操作包括初始化日志聚合服务、检测和创建日志聚合的 HDFS 目录、创建日志聚合线程执行本地日志的上传,其中

    • 初始化日志聚合服务就是简单的对象创建,不和 HDFS 交互,基本无压力;
    • 检测和创建日志聚合的 HDFS 目录会执行 HDFS 读和写请求,并且是同步阻塞执行,依赖写 /tmp/logs/ 目录所在集群的 HDFS 服务。
    • 创建日志聚合线程上传本地日志,代码中该线程是通过线程池异步创建,不存在阻塞,但固定大小的线程池可能会出现线程创建阻塞。

    (3)解决方案

    根据以上分析,针对日志聚合依赖读写 HDFS 数据反向影响作业的提交问题,主要有两种解决方案:

    • 作业提交不依赖日志聚合对 HDFS 服务的读/写。(本文主要解决这一问题
    • 日志聚合写 HDFS 进行分流,写到多个 HDFS 集群。

    一、聚合日志介绍

    日志聚集是 Yarn 提供的日志中央化管理功能,它能将运行完成的 Container 任务日志上传到 HDFS 上,从而减轻 NodeManager 负载,且提供一个中央化存储和分析机制。默认情况下,Container 任务日志存在在各个 NodeManager 的本地磁盘上,保存在 yarn.nodemanager.log-dirs 参数配置的目录下,保存的时间由 yarn.nodemanager.log.retain-seconds 参数决定(默认时3小时)。若启用日志聚集功能,会将作业完成的日志上传到 HDFS 的 ${yarn.nodemanager.remote-app-log-dir}/${user}/${yarn.nodemanager.remote-app-log-dir-suffix} 下,要实现日志聚合功能,需要额外的配置。

    这里的日志存储的就是具体 Mapreduce 和 Spark 任务的日志,包括框架的和应用程序里自己打印的。这日志聚合是用来看日志的,而 job history server 则是用来看某个application 的大致统计信息的,包括作业启停时间,map 任务数,reduce 任务数以及各种计数器的值等等。job history server 是抽象概要性的统计信息,而聚合日志是该application 所有任务节点的详细日志集合。

    二、聚合日志生命周期

    Yarn 作业在运行过程中,聚合日志的生命周期如下:

    1. 作业运行过程中,日志将暂存于 yarn.nodemanager.log-dirs 配置项指定的本地路径下,默认为 /var/log/hadoop-yarn/container

    2. 作业运行结束后(无论正常结束与否),将持久化日志到 yarn.nodemanager.remote-app-log-diryarn.nodemanager.remote-app-log-dir-suffix 配置项指定的 HDFS 路径下,前者默认为 /tmp/logs,后者默认为 logs。对应 HDFS 的实际路径为 ${yarn.nodemanager.remote-app-log-dir}/${user}/${yarn.nodemanager.remote-app-log-dir-suffix}/${application_id},即 /tmp/logs/<user>/logs/。控制日志聚合操作的服务为 LogAggregationService,具体上传日志到 HDFS 的行为由 LogAggregationService 服务创建的 AppLogAggregator 线程执行。

    3. 日志持久化聚合到 HDFS 后,会删除本地的暂存日志。

    4. 聚合上传到 HDFS 的日志也是有保留周期的,保存周期由 yarn.log-aggregation.retain-seconds 参数控制,集群可配置。

    三、聚合日志参数

    参数:yarn.nodemanager.log-dirs
    参数解释:日志存放地址(可配置多个目录)。
    默认值:${yarn.log.dir}/userlogs
    
    参数:yarn.log-aggregation-enable
    参数解释:是否启用日志聚集功能。
    默认值:false
    
    参数:yarn.log-aggregation.retain-seconds
    参数解释:在HDFS上聚集的日志最多保存多长时间。
    默认值:-1
    
    参数:yarn.log-aggregation.retain-check-interval-seconds
    参数解释:多长时间检查一次日志,并将满足条件的删除,如果是0或者负数,则为上一个值的1/10。
    默认值:-1
    
    参数:yarn.nodemanager.remote-app-log-dir
    参数解释:当应用程序运行结束后,日志被转移到的HDFS目录(启用日志聚集功能时有效)。
    默认值:/tmp/logs
    
    参数:yarn.nodemanager.remote-app-log-dir-suffix
    参数解释:远程日志目录子目录名称(启用日志聚集功能时有效)。
    默认值:logs 日志将被转移到目录${yarn.nodemanager.remote-app-log-dir}/${user}/${thisParam}下
    
    参数:yarn.nodemanager.log.retain-seconds
    参数解释:NodeManager上日志最多存放时间(不启用日志聚集功能时有效)。
    默认值:10800(3小时)
    

    四、现有技术方案

    YARN 在字节跳动的优化与实践

    将 HDFS 做成弱依赖
    对于一般的离线批处理来说,如果 HDFS 服务不可用了,那么 YARN 也没必要继续运行了。但是在字节跳动内部由于 YARN 还同时承载流式作业和模型训练,因此不能容忍 HDFS 故障影响到 YARN。为此,我们通过将 NodeLabel 存储到 ZK 中,将 Container Log 在 HDFS 的目录初始化和上传都改为异步的方式,摆脱了对 HDFS 的强依赖。

    YARN 在快手的应用实践与技术演进之路

    HDFS是yarn非常底层的基础设施,ResourceManager事件处理逻辑中有一些HDFS操作,HDFS卡一下,会造成整个事件处理逻辑卡住,最终整个集群卡住。分析发现RM对HDFS的操作主要集中在失败APP的处理,不是非常核心的逻辑,解决方案也比较简单粗暴,把HDFS的操作从同步改成异步。我们还对整个yarn事件处理逻辑进行排查,发现有一些像DNS的操作,在某些情况下也会比较卡,我们就把这种比较重IO的操作进行相应的优化,确保事件处理逻辑中都是快速的CPU操作,保证事件处理的高效和稳定。

    基于 Hadoop 的 58 同城离线计算平台设计与实践

    虽然有 Fedoration 机制来均衡各个 NN 的压力,但是对于单个 NN 压力仍然非常大,各种问题时刻在挑战 HDFS 稳定性,比如:NN RPC 爆炸,我们线上最大的 NS 有 15 亿的 RPC 调用,4000+ 并发连接请求,如此高的连接请求对业务稳定影响很大。针对这个问题,我们使用"拆解+优化"的两种手段相结合的方式来改进。拆解就是说我们把一些大的访问,能不能拆解到不同的集群上,或者我们能不能做些控制,具体案例如下:
    1.Hive Scratch:我们经过分析 Hive Scratch 的临时目录在 RPC 调用占比中达到 20%,对于 Hive Scratch 实际上每个业务不需要集中到一个 NS 上,我们把它均衡到多个 NS 上。
    2.Yarn 日志聚合:Yarn 的日志聚合主要是给业务查看一些日志,实际上他没有必要那个聚合到 HDFS 上,只需要访问本地就可以了。ResourceLocalize:同样把它均衡到各个 NS 上。

    HDFS Federation在美团点评的应用与改进

    计算引擎(包括MapReduce和Spark)在提交作业时,会向NameNode发送RPC,获取HDFS Token。在ViewFileSystem中,会向所有namespace串行的申请Token,如果某个namespace的NameNode负载很高,或者发生故障,则任务无法提交,YARN的ResourceManager在renew Token时,也会受此影响。随着美团点评的发展YARN作业并发量也在逐渐提高,保存在HDFS上的YARN log由于QPS过高,被拆分为独立的namespace。但由于其并发和YARN container并发相同,NameNode读写压力还是非常大,经常导致其RPC队列打满,请求超时,进而影响了作业的提交。针对此问题,我们做出了一下改进:
    1.container日志由NodeManager通过impersonate写入HDFS,这样客户端在提交Job时,就不需要YARN log所在namespace的Token。
    2.ViewFileSystem在获取Token时,增加了参数,用于指定不获取哪些namespace的Token。
    3.由于作业并不总是需要所有namespace中的数据,因此当单个namespace故障时,不应当影响其他namespace数据的读写,否则会降低整个集群的分区容忍性和可用性,ViewFileSystem在获取Token时,即使失败,也不影响作业提交,而是在真正访问数据时作业失败,这样在不需要的Token获取失败时,不影响作业的运行。
    另外,客户端获取到的Token会以namespace为key,保存在一个自定义数据结构中(Credentials)。ResourceManager renew时,遍历这个数据结构。而NodeManager在拉取JAR包时,根据本地配置中的namespace名去该数据结构中获取对应Token。因此需要注意的是,虽然namespace配置和服务端不同不影响普通HDFS读写,但提交作业所使用的namespace配置需要与NodeManager相同,至少会用到的namespace配置需要是一致的。

    本文主要针对字节跳动的思路对日志聚合逻辑进行优化,将日志聚合读写 HDFS 集群改为弱依赖。

    五、Yarn日志聚合源码分析

    要弄清楚聚合日志如何工作的,就需要了解 Yarn 中处理聚合日志的服务在哪里创建的,根据 ApplicationMaster启动及资源申请源码分析 文章分析,我们知道 Yarn 的第一个 Container 启动是用于 AppAttmpt 角色,也就是我们通常在 Yarn UI 界面看到的 ApplicationMaster 服务。所以我们来看看一个作业的第一个 Container 是如何启动以及如何创建日志记录组件 LogHandler 的。

    ApplicationMaster 通过调用 RPC 函数 ContainerManagementProtocol#startContainers() 开始启动 Container,即 startContainerInternal() 方法,这部分逻辑做了两件事:

    • 发送 ApplicationEventType.INIT_APPLICATION 事件,对应用程序资源的初始化,主要是初始化各类必需的服务组件(如日志记录组件 LogHandler、资源状态追踪组件 LocalResourcesTrackerImpl等),供后续 Container 启动,通常来自 ApplicationMaster 的第一个 Container 完成,这里的 if 逻辑针对一个 NM 节点上运行作业的所有 Containers 只调用一次,后续的 Container 跳过这段 Application 初始化过程。
    • 发送 ApplicationEventType.INIT_CONTAINER 事件,对 Container 进行初始化操作。(这部分事件留在 Container 启动环节介绍)
    //位置:org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
      private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
          ContainerTokenIdentifier containerTokenIdentifier,
          StartContainerRequest request) throws YarnException, IOException {
     
        // 省略Token认证及ContainerLaunchContext上下文初始化
     
        this.readLock.lock();
        try {
          if (!serviceStopped) {
            // Create the application
            Application application =
                new ApplicationImpl(dispatcher, user, applicationID, credentials, context);
             
            // 应用程序的初始化,供后续Container使用,这个逻辑只调用一次,通常由来自ApplicationMaster的第一个Container完成
            if (null == context.getApplications().putIfAbsent(applicationID,
              application)) {
              LOG.info("Creating a new application reference for app " + applicationID);
              LogAggregationContext logAggregationContext =
                  containerTokenIdentifier.getLogAggregationContext();
              Map<ApplicationAccessType, String> appAcls =
                  container.getLaunchContext().getApplicationACLs();
              context.getNMStateStore().storeApplication(applicationID,
                  buildAppProto(applicationID, user, credentials, appAcls,
                    logAggregationContext));
     
     
              // 1.向 ApplicationImpl 发送 ApplicationEventType.INIT_APPLICATION 事件
              dispatcher.getEventHandler().handle(
                new ApplicationInitEvent(applicationID, appAcls,
                  logAggregationContext));
            }
     
            // 2.向 ApplicationImpl 发送 ApplicationEventType.INIT_CONTAINER 事件
            this.context.getNMStateStore().storeContainer(containerId, request);
            dispatcher.getEventHandler().handle(
              new ApplicationContainerInitEvent(container));
     
            this.context.getContainerTokenSecretManager().startContainerSuccessful(
              containerTokenIdentifier);
            NMAuditLogger.logSuccess(user, AuditConstants.START_CONTAINER,
              "ContainerManageImpl", applicationID, containerId);
            // TODO launchedContainer misplaced -> doesn't necessarily mean a container
            // launch. A finished Application will not launch containers.
            metrics.launchedContainer();
            metrics.allocateContainer(containerTokenIdentifier.getResource());
          } else {
            throw new YarnException(
                "Container start failed as the NodeManager is " +
                "in the process of shutting down");
          }
        } finally {
          this.readLock.unlock();
        }
      }
    

    这里主要看看第1件事情,即向 ApplicationImpl 发送 ApplicationEventType.INIT_APPLICATION 事件,事件对应的状态机为 AppInitTransition 状态机。

    //位置:org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
    // Transitions from NEW state
               .addTransition(ApplicationState.NEW, ApplicationState.INITING,
                   ApplicationEventType.INIT_APPLICATION, new AppInitTransition())
    

    AppInitTransition 状态机会对日志聚合组件服务进行初始化,关键行动是向调度器发送 LogHandlerEventType.APPLICATION_STARTED 事件。

    //位置:org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
      /**
       * Notify services of new application.
       * 
       * In particular, this initializes the {@link LogAggregationService}
       */
      @SuppressWarnings("unchecked")
      static class AppInitTransition implements
          SingleArcTransition<ApplicationImpl, ApplicationEvent> {
        @Override
        public void transition(ApplicationImpl app, ApplicationEvent event) {
          ApplicationInitEvent initEvent = (ApplicationInitEvent)event;
          app.applicationACLs = initEvent.getApplicationACLs();
          app.aclsManager.addApplication(app.getAppId(), app.applicationACLs);
    
          // 初始化日志聚合组件服务
          // Inform the logAggregator
          app.logAggregationContext = initEvent.getLogAggregationContext();
          // 向调度器发送 LogHandlerEventType.APPLICATION_STARTED 事件
          app.dispatcher.getEventHandler().handle(
              new LogHandlerAppStartedEvent(app.appId, app.user,
                  app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
                  app.applicationACLs, app.logAggregationContext)); 
        }
      }
    

    想要弄清楚 LogHandlerEventType.APPLICATION_STARTED 事件做了什么,就要知道 LogHandlerEventType 类注册的事件处理器是什么以及事件处理器做了什么事情。这里的 register 方法对 LogHandlerEventType 类进行了注册,对应的 logHandler 事件处理器为 LogAggregationService 服务。

    //位置:org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
      @Override
      public void serviceInit(Configuration conf) throws Exception {
        // 定义日志处理器
        LogHandler logHandler =
          createLogHandler(conf, this.context, this.deletionService);
        addIfService(logHandler);
        // 注册 LogHandlerEventType 事件,logHandler 为对应的处理器
        dispatcher.register(LogHandlerEventType.class, logHandler);
        
        waitForContainersOnShutdownMillis =
            conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
                YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) +
            conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS,
                YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS) +
            SHUTDOWN_CLEANUP_SLOP_MS;
    
        super.serviceInit(conf);
        recover();
      }
    

    具体创建 logHandler 对象的调用,由于集群开启了日志聚合功能(由参数 yarn.log-aggregation-enable 控制),这里返回 LogAggregationService 服务。

    //位置:org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
        protected LogHandler createLogHandler(Configuration conf, Context context,
          DeletionService deletionService) {
        if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
            YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
          // 判断是否启用了日志聚合,由于集群开启了日志聚合,这里初始化 LogAggregationService 服务
          return new LogAggregationService(this.dispatcher, context,
              deletionService, dirsHandler);
        } else {
          return new NonAggregatingLogHandler(this.dispatcher, deletionService,
                                              dirsHandler,
                                              context.getNMStateStore());
        }
      }
    

    弄清楚了 LogHandlerEventType 类注册的服务是 LogAggregationService,我们就进入 LogAggregationService 类的 handle() 方法,看看上面的 LogHandlerEventType.APPLICATION_STARTED 事件做了什么事。

    //位置:org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
      @Override
      public void handle(LogHandlerEvent event) {
        switch (event.getType()) {
          // APPLICATION_STARTED 事件处理流程
          case APPLICATION_STARTED:
            LogHandlerAppStartedEvent appStartEvent =
                (LogHandlerAppStartedEvent) event;
            initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(),
                appStartEvent.getCredentials(),
                appStartEvent.getLogRetentionPolicy(),
                appStartEvent.getApplicationAcls(),
                appStartEvent.getLogAggregationContext());
            break;
          case CONTAINER_FINISHED:
            // 省略
          case APPLICATION_FINISHED:
            //省略
          default:
            ; // Ignore
        }
      }
    

    LogHandlerEventType.APPLICATION_STARTED 事件的关键逻辑在 initApp() 方法的调用。这段逻辑主要做了三件事:

    1. 判断 HDFS 上日志聚合的根目录是否存在,即 /tmp/logs/ 目录(具体为 hdfs://nameservice/tmp/logs),由参数 yarn.nodemanager.remote-app-log-dir 控制。(注意:这里的请求会阻塞读 HDFS)
    2. 创建作业日志聚合的 HDFS 目录,并初始化 app 日志聚合实例,采用线程池的方式启动日志聚合进程。(重点,这里会有请求阻塞写 HDFS,并且通过有限大小的线程池异步创建日志聚合线程去做日志的聚合)
    3. 根据构建的 ApplicationEvent 事件,向发送 ApplicationEventType.APPLICATION_LOG_HANDLING_INITED 事件,告知处理器日志聚合服务初始化完成。
    //位置:org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
      private void initApp(final ApplicationId appId, String user,
          Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
          Map<ApplicationAccessType, String> appAcls,
          LogAggregationContext logAggregationContext) {
        ApplicationEvent eventResponse;
        try {
          // 1、 判断 HDFS 上日志聚合的根目录是否存在,即 `/tmp/logs/` 目录(具体为 `hdfs://nameservice/tmp/logs`),由参数 `yarn.nodemanager.remote-app-log-dir` 控制
          verifyAndCreateRemoteLogDir(getConfig());
          // 重点:2、创建作业日志聚合的 HDFS 目录,并初始化 app 日志聚合实例,采用线程池的方式启动日志聚合进程
          initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls,
              logAggregationContext);
          // 构建 ApplicationEvent 事件
          eventResponse = new ApplicationEvent(appId,
              ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
        } catch (YarnRuntimeException e) {
          LOG.warn("Application failed to init aggregation", e);
          eventResponse = new ApplicationEvent(appId,
              ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED);
        }
        // 3、根据构建的 ApplicationEvent 事件,向发送 ApplicationEventType.APPLICATION_LOG_HANDLING_INITED 事件,告知处理器日志聚合服务初始化完成
        this.dispatcher.getEventHandler().handle(eventResponse);
      }
    

    第1件事比较简单,主要是是判断 HDFS 聚合日志的根目录是否存在,由于目录一般都存在,这一块主要是读 HDFS 请求。我们主要来看看 initApp() 方法做的第2件事,可以看到第3件事是发送 ApplicationEventType.APPLICATION_LOG_HANDLING_INITED 表示日志聚合服务初始化完成,包括创建作业在 HDFS 的日志聚合目录和启动日志聚合线程。所以基本可以知道第2件事的 initAppAggregator() 是会创建作业日志聚合目录,并启动日志聚合线程,具体的我们来看代码。

    这段代码其实主要做了两件事:

    1. 调用 createAppDir() 方法执行 HDFS 写请求为作业创建日志聚合的目录,即 hdfs://nameservice/tmp/logs/<user>/logs/ 目录,这里的写逻辑如果成功则只调用一次,一般是由第一个 Container 创建(即作业的 ApplicationMaster Container),其他 Container 只执行 HDFS 读请求判断该目录是否存在即可。
    2. 通过 threadPool 线程池创建每个作业在 NM 节点的日志聚合线程,异步处理本地日志的上传,该线程池大小由参数 yarn.nodemanager.logaggregation.threadpool-size-max 控制,默认大小为 100.
    //位置:org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
      protected void initAppAggregator(final ApplicationId appId, String user,
          Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
          Map<ApplicationAccessType, String> appAcls,
          LogAggregationContext logAggregationContext) {
    
        // Get user's FileSystem credentials
        final UserGroupInformation userUgi =
            UserGroupInformation.createRemoteUser(user);
        if (credentials != null) {
          userUgi.addCredentials(credentials);
        }
    
        // New application
        final AppLogAggregator appLogAggregator =
            new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
                getConfig(), appId, userUgi, this.nodeId, dirsHandler,
                getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy,
                appAcls, logAggregationContext, this.context,
                getLocalFileContext(getConfig()));
        if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
          throw new YarnRuntimeException("Duplicate initApp for " + appId);
        }
        // wait until check for existing aggregator to create dirs
        YarnRuntimeException appDirException = null;
        try {
          // 创建作业日志聚合目录,即 hdfs://nameservice/tmp/logs/<user>/logs/ 目录
          // Create the app dir
          createAppDir(user, appId, userUgi);
        } catch (Exception e) {
          appLogAggregator.disableLogAggregation();
          if (!(e instanceof YarnRuntimeException)) {
            appDirException = new YarnRuntimeException(e);
          } else {
            appDirException = (YarnRuntimeException)e;
          }
          appLogAggregators.remove(appId);
          closeFileSystems(userUgi);
          throw appDirException;
        }
    
        // 创建作业的日志聚合线程,并通过线程池启动日志聚合线程,异步上传 NM 节点的日志
        // Schedule the aggregator.
        Runnable aggregatorWrapper = new Runnable() {
          public void run() {
            try {
              appLogAggregator.run();
            } finally {
              appLogAggregators.remove(appId);
              closeFileSystems(userUgi);
            }
          }
        };
        this.threadPool.execute(aggregatorWrapper);
      }
    

    至此,从日志聚合服务组件的创建,到为作业初始化 HDFS 聚合日志目录,到启动日志聚合线程,整个日志聚合的调用逻辑已介绍完毕,日志的具体上传逻辑在 AppLogAggregatorImpl 类的 run() 方法开始执行,具体上传这里不做详细介绍,感兴趣可以可以去看看上传行为是如何做的。

    六、代码修改方案

    在背景介绍中,提到了日志聚合操作存在风险的点主要在读/写 HDFS 请求所在的集群 namenode rpc 压力,和固定大小的线程池创建线程的阻塞,代码的修改逻辑也是结合这两个问题诞生的。

    • 针对读/写 HDFS 请求的 rpc 压力,代码将日志聚合逻辑中与 HDFS 交互的方式全部改为异步处理,不依赖日志聚合读写数据的 HDFS 集群。
    • 针对固定大小线程池创建线程可能出现的阻塞情况,代码将这一块修改为生产者-消费者模式,聚合日志线程的产生与线程的处理解耦。

    6.1 读/写 HDFS 请求异步

    日志聚合服务中与 HDFS 交互有两个地方,一个是读操作,判断 HDFS 上 /tmp/logs/ 目录是否存在,一个是写操作,创建作业的聚合日志目录 /tmp/logs/<user>/logs/<appid>/,这写操作每个作业只执行一次,后续都是读操作,判断该目录是否存在即可。

    //位置:org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
      private void asyncCreateAppDir(final String user, final ApplicationId appId, final UserGroupInformation userUgi) {
          new Thread() {
            @Override
            public void run() {
              synchronized(this) {
                try {
                  // check dir '/tmp/logs/' exists
                  verifyAndCreateRemoteLogDir(getConfig());
                  // create app log dir
                  createAppDir(user, appId, userUgi);
                } catch (Exception e) {
                  e.printStackTrace();
                }
              }
            }
          }.start();
      }
    

    将日志聚合读写 HDFS 请求改为异步后,可能会产生另外一个问题。由于作业日志聚合目录的创建是异步的,而执行日志上传操作也是异步进行的,这里存在着先后顺序,即必须作业的日志聚合目录已经创建完成,上传操作才能正常进行。因此,在具体执行上传操作时,我们对日志聚合目录是否存在添加一层校验,以确保上传前聚合目录必须存在。

    //位置:org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
      private void doAppLogAggregation() {
        // 判断作业是否完成,直到作业完成后才跳出 while 逻辑
        while (!this.appFinishing.get() && !this.aborted.get()) {
          synchronized(this) {
            try {
              waiting.set(true);
              if (this.rollingMonitorInterval > 0) {
                wait(this.rollingMonitorInterval * 1000);
                if (this.appFinishing.get() || this.aborted.get()) {
                  break;
                }
                uploadLogsForContainers(false);
              } else {
                wait(THREAD_SLEEP_TIME);
              }
            } catch (InterruptedException e) {
              LOG.warn("PendingContainers queue is interrupted");
              this.appFinishing.set(true);
            }
          }
        }
    
        if (this.aborted.get()) {
          return;
        }
    
        // 改造点:增加日志聚合目录是否存在的校验,如果不存在则创建改目录(具体改造见下面)
        //check remote app dir
        checkRemoteDir();
    
        // 关键:执行真正的日志上传动作
        // App is finished, upload the container logs.
        uploadLogsForContainers(true);
    
    
        // 删除作业在 NM 本地目录保存的日志,由 DeletionService 服务负责。
        doAppLogAggregationPostCleanUp();
    
        this.dispatcher.getEventHandler().handle(
            new ApplicationEvent(this.appId,
                ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED));
        this.appAggregationFinished.set(true);
      }
    
    //位置:org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
      // 改造的具体代码如下
      private  void checkRemoteDir() {
      try {
        userUgi.doAs(new PrivilegedExceptionAction<Object>() {
          @Override
          public Object run() throws Exception {
            FileSystem remoteFS = remoteNodeLogFileForApp.getFileSystem(conf);
            if (!remoteFS.exists(remoteNodeLogFileForApp.getParent())) {
              try {
                FsPermission dirPerm = new FsPermission(APP_DIR_PERMISSIONS);
                remoteFS.mkdirs(remoteNodeLogFileForApp.getParent(), dirPerm);
                FsPermission umask = FsPermission.getUMask(remoteFS.getConf());
                if (!dirPerm.equals(dirPerm.applyUMask(umask))) {
                  remoteFS.setPermission(remoteNodeLogFileForApp.getParent(), dirPerm);
                }
              } catch (IOException e) {
                LOG.error("Failed to setup application log directory for " + appId, e);
                throw e;
              }
            }
            return null;
          }});
        } catch (Exception e) {
          throw new YarnRuntimeException(e);
        }
      }
    

    6.2 聚合日志线程的创建和处理解耦

    这一块主要是通过生产者-消费者模式,将日志聚合线程的创建和处理解耦,生产的线程由阻塞队列 logAggregatorQueue 维护,具体的线程消费逻辑由独立线程 LauncherLogAggregatorThread 处理,具体代码如下。

    //位置:org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
      protected void initAppAggregator(final ApplicationId appId, String user,
          Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
          Map<ApplicationAccessType, String> appAcls,
          LogAggregationContext logAggregationContext) {
          //省略
    
          // 服务停止时阻塞新请求的接收
          if (blockNewLogAggr) {
          	return;
          }
    
          processed = false;
    
        // create the aggregator thread.
        Runnable aggregatorWrapper = new Runnable() {
          public void run() {
            try {
              appLogAggregator.run();
            } finally {
              appLogAggregators.remove(appId);
              closeFileSystems(userUgi);
            }
          }
        };
        // 改造点:将线程池直接创建线程改为生产-消费模式,这里负责生产日志聚合线程,添加到阻塞队列中
    	  logAggregatorQueue.add(aggregatorWrapper);
      }
    
      // logAggregatorQueue 对象的定义
      private BlockingQueue<Runnable> logAggregatorQueue = new LinkedBlockingQueue<Runnable>();
    
    //位置:org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
        // 相关变量定义
        // 消费线程停止标志 
        private volatile boolean stopped = false;
        // 是否阻塞新聚合日志的接收,默认不阻塞 false
        private volatile boolean blockNewLogAggr = false;
        // 消费队列中对象是否处理完成
        private volatile boolean processed = true;
        // 用于线程等待/通知的 syncronized 对象
        private Object waitForProcess = new Object();
     
      // 改造点:具体的线程消费由单独的线程类控制,实现线程创建和处理的解耦
      private class LauncherLogAggregatorThread implements Runnable {
     
        @Override
        public void run() {
          while (!stopped && !Thread.currentThread().isInterrupted()) {
            processed = logAggregatorQueue.isEmpty();
              if (blockNewLogAggr) {
                  synchronized (waitForProcess) {
                      if (processed) {
                          waitForProcess.notify();
                      }
                  }
              }
            Runnable toLaunch;
            try {
              // Schedule the aggregator.
              toLaunch = logAggregatorQueue.take();
              threadPool.execute(toLaunch);
            } catch (InterruptedException e) {
              LOG.warn(this.getClass().getName() + " interrupted. Returning.");
            }
          }
        }
      }
    

    服务停止时等待消费队列聚合事件处理完成,然后关闭消费线程和线程池。

    //位置:org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
     
      @Override
      protected void serviceStop() throws Exception {
        LOG.info(this.getName() + " waiting for pending aggregation during exit");
        blockNewLogAggr = true;
        synchronized (waitForProcess) {
          while (!processed && launcherLogAggregatorThread.isAlive()) {
            waitForProcess.wait(1000);
            LOG.info("Waiting for launcherLogAggregatorThread to process. Thread state is :" + launcherLogAggregatorThread.getState());
          }
        }
     
        this.stopped = true;
        if (launcherLogAggregatorThread != null) {
          launcherLogAggregatorThread.interrupt();
          try {
            launcherLogAggregatorThread.join();
          } catch (InterruptedException ie) {
            LOG.warn(launcherLogAggregatorThread.getName() + " interrupted during join ", ie);
          }
        }
        stopAggregators();
        super.serviceStop();
      }
    

    七、测试分析

    测试集群分为 hadoop-up1 集群和 hadoop-up2 集群,采用 viewfs 模式访问 HDFS,作业提交在 hadoop-up1 集群,日志聚合目录 /tmp/logs/ 挂载在 hadoop-up2 集群下,即 hdfs://hadoop-up2/tmp/logs/ 目录。

    7.1 NM日志聚合改造前(原始版本)

    作业提交命令:

    hadoop jar /opt/cloudera/parcels/CDH-5.14.4-1.cdh5.14.4.p0.3/jars/hadoop-mapreduce-examples-2.6.0-cdh5.14.4.jar pi -Dmapred.job.queue.name=root.exquery 50 50

    (1)开启 hadoop-up2 集群 HDFS 服务

    结论:

    作业正常提交,日志正常聚合。

    (2)关闭 hadoop-up2 集群 HDFS 服务

    结论:
    作业提交卡住,需等待请求 HDFS 服务超时,作业处于 Accepted 状态卡住,作业的 ApplicationMaster 处于 NEW 状态,该 Container 没有被分配(整个过程卡住大概 3min),直到抛异常触发日志聚合失败(即 ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED)事件,作业的 ApplicationMaster 分配到 Container,作业开始运行,并且日志不聚合。

    现象1:

    作业提交卡住,Yarn UI 作业的状态为 Accepted 状态,Elapsed 时间大概持续了 3min,说明作业在这段时间一直等待运行,并且用于启动 ApplicationMaster 的 Container 状态为 NEW,没有转换到 Submited 状态,表示 Container 没有运行,Yarn 认为该作业还未提交。这也是线上集群在日志聚合集群 rpc 压力大时会影响作业的提交数和 Container 分配性能下降的原因。

    现象2:

    由于 hadoop-up2 集群 HDFS 服务关闭,分析 NodeManager 执行日志,先打印 Application failed to init aggregation 信息,然后打印LogAggregationService.verifyAndCreateRemoteLogDir() 方法执行 HDFS 读请求的调用异常,读请求多次重试后抛出 YarnRuntimeException 异常,堆栈信息的调用栈和执行代码都和这一现象吻合。

    // NodeManager 日志:
    2021-03-10 09:56:43,444 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService: Application failed to init aggregation
    org.apache.hadoop.yarn.exceptions.YarnRuntimeException: Failed to check permissions for dir [/tmp/logs]
            at org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService.verifyAndCreateRemoteLogDir(LogAggregationService.java:205)
            at org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService.initApp(LogAggregationService.java:336)
            at org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService.handle(LogAggregationService.java:463)
            at org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService.handle(LogAggregationService.java:68)
            at org.apache.hadoop.yarn.event.AsyncDispatcher.dispatch(AsyncDispatcher.java:182)
            at org.apache.hadoop.yarn.event.AsyncDispatcher$1.run(AsyncDispatcher.java:109)
            at java.lang.Thread.run(Thread.java:748)
    Caused by: java.net.ConnectException: Call From 10-197-1-236/10.197.1.236 to 10-197-1-238:8020 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
            // 省略
            at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1261)
            at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:432)
            at org.apache.hadoop.fs.viewfs.ChRootedFileSystem.getFileStatus(ChRootedFileSystem.java:226)
            at org.apache.hadoop.fs.viewfs.ViewFileSystem.getFileStatus(ViewFileSystem.java:379)
            at org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService.verifyAndCreateRemoteLogDir(LogAggregationService.java:194)
    2021-03-10 09:56:43,445 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application: Log Aggregation service failed to initialize, there will be no logs for this application
    
    // verifyAndCreateRemoteLogDir() 方法调用代码块
    //位置:org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
      private void initApp(final ApplicationId appId, String user,
          Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
          Map<ApplicationAccessType, String> appAcls,
          LogAggregationContext logAggregationContext) {
        ApplicationEvent eventResponse;
        try {
    	    verifyAndCreateRemoteLogDir(getConfig());
          initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls,
              logAggregationContext);
          eventResponse = new ApplicationEvent(appId,
              ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
        } catch (YarnRuntimeException e) {
          LOG.warn("Application failed to init aggregation", e);
          eventResponse = new ApplicationEvent(appId,
              ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED);
        }
        this.dispatcher.getEventHandler().handle(eventResponse);
      }
    

    7.2 NM日志聚合改造后

    作业提交命令:

    hadoop jar /opt/cloudera/parcels/CDH-5.14.4-1.cdh5.14.4.p0.3/jars/hadoop-mapreduce-examples-2.6.0-cdh5.14.4.jar pi -Dmapred.job.queue.name=root.exquery 50 50

    (1)开启 hadoop-up2 集群 HDFS 服务

    结论:

    作业正常提交,日志正常聚合。

    (2)关闭 hadoop-up2 集群 HDFS 服务

    结论:
    作业正常提交,日志聚合失败,不影响作业提交和运行。

    现象:

    • 作业正常执行提交和执行。
    • 日志聚合读请求 HDFS 异常(和 7.1 中 NM 日志一致),但不影响作业执行。
    • 日志聚合失败,JobHistoryServer 无法查看聚合的日志。

    参考资料

    1. YARN的Log Aggregation原理
  • 相关阅读:
    BZOJ2208 [Jsoi2010]连通数[缩点/Floyd传递闭包+bitset优化]
    loj515 「LibreOJ β Round #2」贪心只能过样例[bitset+bool背包]
    BZOJ3331 [BeiJing2013]压力[圆方树+树上差分]
    BZOJ4010 [HNOI2015]菜肴制作[拓扑排序+贪心]
    BZOJ2140 稳定婚姻[强连通分量]
    hdu4612 Warm up[边双连通分量缩点+树的直径]
    BZOJ2730 [HNOI2012]矿场搭建[点双连通分量]
    BZOJ3887 [Usaco2015 Jan]Grass Cownoisseur[缩点]
    BZOJ1016 [JSOI2008]最小生成树计数[最小生成树+搜索]
    hdu4786 Fibonacci Tree[最小生成树]【结论题】
  • 原文地址:https://www.cnblogs.com/lemonu/p/14513315.html
Copyright © 2011-2022 走看看