  • Resource Isolation for Critical Hadoop Jobs

    Preface

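    In a typical Hadoop cluster today, every user job is treated the same way: nothing is turned away. Once the average number of running jobs climbs, though, resource abuse is hard to avoid. I have written a few related articles before, but they were about monitoring rather than solutions, for example the custom Hive SQL job analysis tool and the article on finding and analyzing abnormal Hadoop tasks. Back to the topic: once a cluster reaches a certain scale, so-called "critical jobs" tend to appear, and they share a few traits: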

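    1. They usually run in the early hours of the next day, starting at midnight and finishing around 8 or 9 a.m., so the results are ready when people arrive at work.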

    2. They process the previous day's data, and the volume is much larger than that of an ordinary job.

    3. The data is usually sensitive or business-critical, such as financial analysis, PV, UV, GMV, and similar key metrics.

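    These jobs must finish by the next morning, because many colleagues in operations rely on the numbers for that day's work. That is why they are called "critical jobs". There is really only one way to solve this kind of problem: resource isolation. With YARN as it stands, the obvious approach is to give these jobs their own queue with a dedicated share of resources. The drawback is that once a queue is carved out, it keeps holding its theoretical maximum share. If you turn on preemption, queues start competing with each other, and resource contention between jobs inevitably hurts execution efficiency. So, thinking it over: can we allow only certain critical jobs to run during a given time window and simply reject jobs submitted by other users? The answer is yes.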


    Proposed approach

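    The idea above looks ideal. Say my critical jobs run between 0:00 and 9:00 and must produce results before 9:00. During that window I reject jobs submitted by ordinary users (Zhang San, Li Si, and so on) and give resources only to the critical users, so preemption no longer needs to be considered at all. How should the restriction be enforced? Trying to do it inside YARN is unlikely to yield a complete solution within a few days. That is no slight on anyone's ability; YARN's internal logic simply is not that simple. So I went the other way and enforced the limit on the job-client side, in the job submission path. A job that does not meet the conditions is rejected outright and never enters the system. With the overall idea clear, the remaining question is which conditions to check for this requirement: one is the user, the other is the time.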


    Implementation

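    First, locate the job-client code: the Job class in hadoop-mapreduce-client-core. The method to change is the one we usually call when writing an MR job, Job.waitForCompletion(). Before changing it, define a few new configuration properties. This is a new feature, so the restricted users and hours should of course be configurable rather than hard-coded.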

    @InterfaceAudience.Private
    public interface MRConfig {
      ...
    
      public static final String MAPREDUCE_LIMIT_EXECUTED_ENABLED =
          "mapreduce.limit-executed.enabled";
      public static final String DEFAULT_MAPREDUCE_LIMIT_EXECUTED_ENABLED =
          "false";
    
      public static final String MAPREDUCE_LIMIT_EXECUTED_USERS =
          "mapreduce.limit-executed.users";
      public static final String MAPREDUCE_LIMIT_EXECUTED_HOURS =
          "mapreduce.limit-executed.hours";
    }
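    As the names suggest, one property enables the feature, one lists the users allowed to execute jobs, and one lists the hours in which execution is allowed. The user and hour values are comma-separated lists. Now back to the Job class. First add a flag field that records whether this job may be executed: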
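    Because the values are comma-separated, whitespace around an item matters if the raw value is split with a plain String.split(","): " finance" would not equal "finance". The following is a minimal sketch of a more forgiving parser, not part of the patch itself. The class name CommaSeparatedValues and the example user names are hypothetical.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical helper; not part of Hadoop or of the patch described here. */
final class CommaSeparatedValues {
  private CommaSeparatedValues() {}

  /** Splits a raw property value on commas, trimming whitespace and dropping empty items. */
  static Set<String> parse(String raw) {
    if (raw == null) {
      // An unset property yields an empty set rather than null.
      return Collections.emptySet();
    }
    Set<String> values = new LinkedHashSet<>();
    for (String item : raw.split(",")) {
      String trimmed = item.trim();
      if (!trimmed.isEmpty()) {
        values.add(trimmed);
      }
    }
    return values;
  }

  public static void main(String[] args) {
    // Example values one might set for mapreduce.limit-executed.users
    // and mapreduce.limit-executed.hours (user names are made up).
    System.out.println(parse("etl, finance ,report"));
    System.out.println(parse("0,1,2,3,4,5,6,7,8"));
  }
}
```

    With a set in hand, the membership test becomes a single contains() call instead of a loop over an array.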

    private boolean canExecuted;
    Then go to the job's waitForCompletion() method:

    /**
       * Submit the job to the cluster and wait for it to finish.
       * @param verbose print the progress to the user
       * @return true if the job succeeded
       * @throws IOException thrown if the communication with the 
       *         <code>JobTracker</code> is lost
       */
      public boolean waitForCompletion(boolean verbose
                                       ) throws IOException, InterruptedException,
                                                ClassNotFoundException {
        if (state == JobState.DEFINE) {
          submit();
        }
        //Added: check whether the job may be executed
        if (!canExecuted) {
          this.status = new JobStatus();
          this.status.setState(State.FAILED);
          return false;
        }
    
        if (verbose) {
          monitorAndPrintJob();
        } else {
          // get the completion poll interval from the client.
          int completionPollIntervalMillis = 
            Job.getCompletionPollInterval(cluster.getConf());
          while (!isComplete()) {
            try {
              Thread.sleep(completionPollIntervalMillis);
            } catch (InterruptedException ie) {
            }
          }
        }
        return isSuccessful();
      }
    If the job is judged not executable, a FAILED status is returned directly. The actual check happens in submit().

    /**
       * Submit the job to the cluster and return immediately.
       * @throws IOException
       */
      public void submit() 
             throws IOException, InterruptedException, ClassNotFoundException {
        //Decide here whether the job may be executed
        canExecuted = jobCanBeExecuted();
        if (!canExecuted) {
          //Not executable: return immediately without submitting
          return;
        }
    
        ensureState(JobState.DEFINE);
        setUseNewAPI();
        connect();
        final JobSubmitter submitter = 
            getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
        status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
          public JobStatus run() throws IOException, InterruptedException, 
          ClassNotFoundException {
            return submitter.submitJobInternal(Job.this, cluster);
          }
        });
        state = JobState.RUNNING;
        LOG.info("The url to track the job: " + getTrackingURL());
       }
    That brings us to the key method, jobCanBeExecuted().

      private boolean jobCanBeExecuted() {
        boolean isLimitExecutedEnabled;
        boolean isAcceptedUser;
        boolean isAcceptedHour;
        String usersConfValue;
        String hoursConfValue;
        String curHour;
        String[] acceptedUsers;
        String[] acceptedHours;
    
        isLimitExecutedEnabled =
            Boolean.parseBoolean(conf.get(
                MRConfig.MAPREDUCE_LIMIT_EXECUTED_ENABLED,
                MRConfig.DEFAULT_MAPREDUCE_LIMIT_EXECUTED_ENABLED));
        usersConfValue = conf.get(MRConfig.MAPREDUCE_LIMIT_EXECUTED_USERS);
        hoursConfValue = conf.get(MRConfig.MAPREDUCE_LIMIT_EXECUTED_HOURS);
    
        if (!isLimitExecutedEnabled) {
          //Feature disabled: accept every user and every hour by default
          isAcceptedUser = true;
          isAcceptedHour = true;
        } else if (usersConfValue != null) {
          //A user list is configured: reject by default until a match is found
          isAcceptedUser = false;
    
          acceptedUsers = usersConfValue.split(",");
          for (String s : acceptedUsers) {
            if (s.equals(conf.get(JobContext.USER_NAME))) {
              //Compare the current user with each accepted user
              isAcceptedUser = true;
              break;
            }
          }
    
          //The hour check works the same way
          if (hoursConfValue != null) {
            isAcceptedHour = false;
    
            acceptedHours = hoursConfValue.split(",");
            curHour = getCurrentHoure();
            for (String s : acceptedHours) {
              if (s.equals(curHour)) {
                isAcceptedHour = true;
                break;
              }
            }
          } else {
            isAcceptedHour = true;
          }
        } else {
          isAcceptedUser = true;
          isAcceptedHour = true;
        }
    
        //Return the AND of the two flags: the job runs only if both are true
        return (isAcceptedUser && isAcceptedHour);
      }
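    jobCanBeExecuted() calls getCurrentHoure(), whose body is not shown in this post. The following is a minimal sketch of what such a helper could look like, assuming the hour is compared as an unpadded string ("0" to "23"), which matches the hour list used in the test further down. The class name CurrentHour and the use of java.time.Clock are my assumptions, not the original implementation.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.LocalTime;
import java.time.ZoneOffset;

/** Hypothetical stand-in for the getCurrentHoure() helper that the post does not show. */
final class CurrentHour {
  private CurrentHour() {}

  /** Returns the hour of day (0-23) as a string without zero padding, e.g. "0" or "13". */
  static String of(Clock clock) {
    return String.valueOf(LocalTime.now(clock).getHour());
  }

  public static void main(String[] args) {
    // Normal use: the hour in the JVM's default time zone.
    System.out.println(of(Clock.systemDefaultZone()));
    // Deterministic use, e.g. in tests: a clock fixed at 03:15 UTC.
    Clock fixed = Clock.fixed(Instant.parse("2015-11-17T03:15:00Z"), ZoneOffset.UTC);
    System.out.println(of(fixed));
  }
}
```

    Passing a Clock in makes the hour check testable at any time of day. Note that the hour is taken from the client machine, so the client's time zone decides which window applies.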
    The logic of jobCanBeExecuted() is explained in its comments, so I will not go through it again here. One more place needs to change:

      /**
       * Returns the current state of the Job.
       * 
       * @return JobStatus#State
       * @throws IOException
       * @throws InterruptedException
       */
      public JobStatus.State getJobState() 
          throws IOException, InterruptedException {
        if (canExecuted) {
          ensureState(JobState.RUNNING);
          updateStatus();
        }
    
        return status.getState();
      }
    The canExecuted check must be added here; otherwise an exception is thrown, because for a normal job the state must already be JobState.RUNNING at this point.
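    To see why the guard is needed, here is a simplified model of the state handling, assuming that ensureState() throws IllegalStateException when the state does not match. GatedJobModel is a hypothetical stand-in, not the real Job class. In the post the FAILED status is set in waitForCompletion(); the model folds that into submit() for brevity.

```java
/** Simplified stand-in for the state handling in Job; this is not the real Hadoop class. */
final class GatedJobModel {
  enum JobState { DEFINE, RUNNING }
  enum State { RUNNING, FAILED }

  private JobState state = JobState.DEFINE;
  private State status;
  private final boolean canExecuted;

  GatedJobModel(boolean canExecuted) {
    this.canExecuted = canExecuted;
  }

  private void ensureState(JobState expected) {
    if (state != expected) {
      throw new IllegalStateException("Job in state " + state + " instead of " + expected);
    }
  }

  /** Gated submit: a rejected job returns early and stays in DEFINE. */
  void submit() {
    if (!canExecuted) {
      status = State.FAILED;
      return;
    }
    ensureState(JobState.DEFINE);
    state = JobState.RUNNING;
    status = State.RUNNING;
  }

  /** Without the guard, a rejected job fails the RUNNING check. */
  State getJobStateUnguarded() {
    ensureState(JobState.RUNNING);
    return status;
  }

  /** With the guard, a rejected job skips the check and reports FAILED. */
  State getJobStateGuarded() {
    if (canExecuted) {
      ensureState(JobState.RUNNING);
    }
    return status;
  }

  public static void main(String[] args) {
    GatedJobModel rejected = new GatedJobModel(false);
    rejected.submit();
    System.out.println(rejected.getJobStateGuarded());
    try {
      rejected.getJobStateUnguarded();
    } catch (IllegalStateException e) {
      System.out.println("unguarded call failed: " + e.getMessage());
    }
  }
}
```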


    Testing

    For lack of time I did not run the new feature on a test cluster. Instead I wrote one test case that covers four scenarios:

    1. Limit-executed disabled: an ordinary user passes, and the job state is SUCCEEDED.

    2. Limit-executed enabled with accepted users set, job owned by an ordinary user: the job fails.

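    3. Limit-executed enabled with accepted users set and the hours set to -1 (so the job is certain to be rejected on the hour check), job owned by an accepted user: the job fails.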

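    4. Limit-executed enabled with accepted users set and the hours set to 0-23 (so the job is certain to pass the hour check), job owned by an accepted user: the job succeeds.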
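    The four scenarios above can also be checked without a cluster by lifting the decision into a pure function. The sketch below mirrors the branch structure of jobCanBeExecuted(), including the fact that the hour list is only consulted when a user list is configured. The class name LimitExecutedDecision and its parameter list are hypothetical.

```java
import java.util.Arrays;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Hypothetical pure-function version of the decision, for checks without a cluster. */
final class LimitExecutedDecision {
  private LimitExecutedDecision() {}

  /**
   * Mirrors jobCanBeExecuted(): everything is accepted when the feature is
   * disabled or no user list is set; otherwise both user and hour must match.
   */
  static boolean canExecute(boolean enabled, String users, String hours,
                            String currentUser, String currentHour) {
    if (!enabled || users == null) {
      return true;
    }
    boolean userOk = Arrays.asList(users.split(",")).contains(currentUser);
    // A missing hour list means no restriction on the hour.
    boolean hourOk = hours == null
        || Arrays.asList(hours.split(",")).contains(currentHour);
    return userOk && hourOk;
  }

  public static void main(String[] args) {
    String allHours = IntStream.range(0, 24)
        .mapToObj(String::valueOf)
        .collect(Collectors.joining(","));
    String hour = "3"; // any real hour of the day works for these checks
    // Scenario 1: feature disabled, ordinary user.
    System.out.println(canExecute(false, null, null, "normalUser", hour));
    // Scenario 2: feature enabled, ordinary user not in the list.
    System.out.println(canExecute(true, "acceptedUser", null, "normalUser", hour));
    // Scenario 3: accepted user, but the hour list can never match.
    System.out.println(canExecute(true, "acceptedUser", "-1", "acceptedUser", hour));
    // Scenario 4: accepted user, every hour allowed.
    System.out.println(canExecute(true, "acceptedUser", allHours, "acceptedUser", hour));
  }
}
```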

    The test case:

    @Test(timeout = 300000)
      public void testSleepJobWithLimitExecuted() throws Exception {
        boolean exitCode;
        String acceptedUser;
        String normalUser;
        Job job;
        Configuration sleepConf;
    
        if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
          LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
              + " not found. Not running test.");
          return;
        }
    
        acceptedUser = "acceptedUser";
        normalUser = "normalUser";
        sleepConf = new Configuration(mrCluster.getConfig());
        // set master address to local to test that local mode applied iff framework
        // == local
        sleepConf.set(MRConfig.MASTER_ADDRESS, "local");
        SleepJob sleepJob = new SleepJob();
        sleepJob.setConf(sleepConf);
    
        // don't enable limit-executed function, the normal user can be allowed to
        // execute job.
        sleepJob = new SleepJob();
        sleepJob.setConf(sleepConf);
        // job with 3 maps (1s) and numReduces reduces (5s), 1 "record" each:
        job = sleepJob.createJob(3, numSleepReducers, 1000, 1, 5000, 1);
        job.setUser(normalUser);
        job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
        job.setJarByClass(SleepJob.class);
        job.setMaxMapAttempts(1);
        job.submit();
        exitCode = job.waitForCompletion(true);
        Assert.assertTrue(exitCode);
        Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
    
        // enable limit-executed and set the accepted users; the job of the
        // normal user will fail.
        sleepConf.set(MRConfig.MAPREDUCE_LIMIT_EXECUTED_ENABLED, "true");
        sleepConf.set(MRConfig.MAPREDUCE_LIMIT_EXECUTED_USERS, acceptedUser);
        sleepJob = new SleepJob();
        sleepJob.setConf(sleepConf);
        // job with 3 maps (1s) and numReduces reduces (5s), 1 "record" each:
        job = sleepJob.createJob(3, numSleepReducers, 1000, 1, 5000, 1);
        job.setUser(normalUser);
        job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
        job.setJarByClass(SleepJob.class);
        job.setMaxMapAttempts(1);
        job.submit();
        exitCode = job.waitForCompletion(true);
        Assert.assertFalse(exitCode);
        Assert.assertEquals(JobStatus.State.FAILED, job.getJobState());
    
        // change the job user to acceptedUser; the job will succeed.
        sleepConf.set(MRConfig.MAPREDUCE_LIMIT_EXECUTED_ENABLED, "true");
        sleepConf.set(MRConfig.MAPREDUCE_LIMIT_EXECUTED_USERS, acceptedUser);
        sleepJob = new SleepJob();
        sleepJob.setConf(sleepConf);
        // job with 3 maps (1s) and numReduces reduces (5s), 1 "record" each:
        job = sleepJob.createJob(3, numSleepReducers, 1000, 1, 5000, 1);
        job.setUser(acceptedUser);
        job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
        job.setJarByClass(SleepJob.class);
        job.setMaxMapAttempts(1);
        job.submit();
        exitCode = job.waitForCompletion(true);
        Assert.assertTrue(exitCode);
        Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
    
        // set limit-executed hours to -1, so the job will fail again
        sleepConf.set(MRConfig.MAPREDUCE_LIMIT_EXECUTED_ENABLED, "true");
        sleepConf.set(MRConfig.MAPREDUCE_LIMIT_EXECUTED_USERS, acceptedUser);
        sleepConf.set(MRConfig.MAPREDUCE_LIMIT_EXECUTED_HOURS, "-1");
        sleepJob = new SleepJob();
        sleepJob.setConf(sleepConf);
        // job with 3 maps (1s) and numReduces reduces (5s), 1 "record" each:
        job = sleepJob.createJob(3, numSleepReducers, 1000, 1, 5000, 1);
        job.setUser(acceptedUser);
        job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
        job.setJarByClass(SleepJob.class);
        job.setMaxMapAttempts(1);
        job.submit();
        exitCode = job.waitForCompletion(true);
        Assert.assertFalse(exitCode);
        Assert.assertEquals(JobStatus.State.FAILED, job.getJobState());
    
        // set the limit hours to every hour of the day; the job will succeed
        sleepConf.set(MRConfig.MAPREDUCE_LIMIT_EXECUTED_ENABLED, "true");
        sleepConf.set(MRConfig.MAPREDUCE_LIMIT_EXECUTED_USERS, acceptedUser);
        sleepConf.set(MRConfig.MAPREDUCE_LIMIT_EXECUTED_HOURS,
            "0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23");
        sleepJob = new SleepJob();
        sleepJob.setConf(sleepConf);
        // job with 3 maps (1s) and numReduces reduces (5s), 1 "record" each:
        job = sleepJob.createJob(3, numSleepReducers, 1000, 1, 5000, 1);
        job.setUser(acceptedUser);
        job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
        job.setJarByClass(SleepJob.class);
        job.setMaxMapAttempts(1);
        job.submit();
        exitCode = job.waitForCompletion(true);
        Assert.assertTrue(exitCode);
        Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
      }
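    This test passes, but coverage is still incomplete: I am not yet sure whether there are other ways of submitting a job that do not go through waitForCompletion(), so some paths may be untested.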


    Open source community

    I have submitted this feature to the open source community. Issue link: https://issues.apache.org/jira/browse/MAPREDUCE-6548

    Another issue related to this topic: https://issues.apache.org/jira/browse/YARN-1051


    Other

    My monitoring and analysis toolkit: https://github.com/linyiqun/yarn-jobhistory-crawler

  • Original article: https://www.cnblogs.com/bianqi/p/12183833.html