zoukankan      html  css  js  c++  java
  • [zz]Mesos的分析4 支持Hadoop任务级调度

    转载自: http://blog.sina.com.cn/s/blog_4a1f59bf0100qotf.html

    Hadoop的调度示意图如下所示:

    Mesos的分析4 <wbr>--- <wbr>支持Hadoop任务级调度

    在Mesos资源管理平台上,启动Hadoop计算框架的时候,只是启动了它的JobTracker,而并没有启动TaskTracker,这主要考虑到资源伸缩性的管理。下面将介绍,当在这种框架下提交一个Hadoop作业时,调度系统如何工作的。

    在Mesos启动的时候,只启动了JobTracker,并没有启动TaskTracker,那么提交的作业如何执行呢?
    在Mesos分析3的介绍中,ResourceOffer ---》ReplyToOffer ---》ResourceOffer ---》是一个不断执行的过程。ResourceOffer会给FrameworkScheduler提供SlaveOffer的信息,SlaveOffer消息包含了SlaveId和它能够提供的资源信息(CPU, Mem)。private TaskDescription findTask(String slaveId, String host, int cpus, int mem)方法会获取JobTracker维护的Collection<JobInProgress> jobs的信息,取出目前需要运行的MapTask 和 ReduceTask个数,然后使用一定的判断方法(涉及到DelayScheduling、mapToReduceRatio优先ReduceTask),使用MesosTask对Task进行封装,然后合并资源信息并入TaskDescription对象。由于Mesos-Master一个ResourceOffer会把所有的可用的SlaveOffer都发送过来,因此,要按照上面findTask的方法,在每一个提供资源的Slave上去查找资源和匹配Task。这些匹配好的TaskDescription组装成一个ArrayList作为ReplyToOffer的信息返回给Mesos-Master。
    Mesos-Master收到F2M_SLOT_OFFER_REPLY消息之后,会下面的操作,来处理包含Task执行请求的OfferReply,具体的函数如下:

    // Process a resource offer reply (for a non-cancelled offer) by launching
    // the desired tasks (if the offer contains a valid set of tasks) and
    // reporting any unused resources to the allocator
    void Master::processOfferReply(SlotOffer *offer,
        const vector<TaskDescription>& tasks, const Params& params)
    {
      LOG(INFO) << "Received reply for " << offer;

      Framework *framework = lookupFramework(offer->frameworkId);
      CHECK(framework != NULL);

      // Count resources in the offer
      unordered_map<Slave *, Resources> offerResources;
      foreach (SlaveResources &r, offer->resources) {
        offerResources[r.slave] = r.resources;
      }

      // Count resources in the response, and check that its tasks are valid
      unordered_map<Slave *, Resources> responseResources;
      foreach (const TaskDescription &t, tasks) {
        // Check whether this task size is valid
        Params params(t.params);
        Resources res(params.getInt32("cpus", -1),
                      params.getInt32("mem", -1));
        if (res.cpus < MIN_CPUS || res.mem < MIN_MEM ||
            res.cpus > MAX_CPUS || res.mem > MAX_MEM) {
          terminateFramework(framework, 0,
              "Invalid task size: " + lexical_cast<string>(res));
          return;
        }
        // Check whether the task is on a valid slave
        Slave *slave = lookupSlave(t.slaveId);
        if (!slave || offerResources.find(slave) == offerResources.end()) {
          terminateFramework(framework, 0, "Invalid slave in offer reply");
          return;
        }
        responseResources[slave] += res;
      }

      // Check that the total accepted on each slave isn't more than offered
      foreachpair (Slave *s, Resources& respRes, responseResources) {
        Resources &offRes = offerResources[s];
        if (respRes.cpus > offRes.cpus || respRes.mem > offRes.mem) {
          terminateFramework(framework, 0, "Too many resources accepted");
          return;
        }
      }

      // Check that there are no duplicate task IDs
      unordered_set<TaskID> idsInResponse;
      foreach (const TaskDescription &t, tasks) {
        if (framework->tasks.find(t.taskId) != framework->tasks.end() ||
            idsInResponse.find(t.taskId) != idsInResponse.end()) {
          terminateFramework(framework, 0,
              "Duplicate task ID: " + lexical_cast<string>(t.taskId));
          return;
        }
        idsInResponse.insert(t.taskId);
      }

      // Launch the tasks in the response
      foreach (const TaskDescription &t, tasks) {
        // Record the resources in event_history
        Params params(t.params);
        Resources res(params.getInt32("cpus", -1),
                      params.getInt64("mem", -1));

        Slave *slave = lookupSlave(t.slaveId);
        evLogger->logTaskCreated(t.taskId, framework->id, t.slaveId,
                                 slave->webUIUrl, res);

        // Launch the tasks in the response
        launchTask(framework, t);
      }

      // If there are resources left on some slaves, add filters for them
      vector<SlaveResources> resourcesLeft;
      int timeout = params.getInt32("timeout", DEFAULT_REFUSAL_TIMEOUT);
      double expiry = (timeout == -1) ? 0 : elapsed() + timeout;
      foreachpair (Slave *s, Resources offRes, offerResources) {
        Resources respRes = responseResources[s];
        Resources left = offRes - respRes;
        if (left.cpus > 0 || left.mem > 0) {
          resourcesLeft.push_back(SlaveResources(s, left));
        }
        if (timeout != 0 && respRes.cpus == 0 && respRes.mem == 0) {
          LOG(INFO) << "Adding filter on " << s << " to " << framework
                    << " for  " << timeout << " seconds";
          framework->slaveFilter[s] = expiry;
        }
      }
     
      // Return the resources left to the allocator
      removeSlotOffer(offer, ORR_FRAMEWORK_REPLIED, resourcesLeft);
    }

    在对提交的Tasks进行了安全检查之后,向framework提交任务。


    void Master::launchTask(Framework *framework, const TaskDescription& t)
    {
      Params params(t.params);
      Resources res(params.getInt32("cpus", -1),
                    params.getInt32("mem", -1));

      // The invariant right now is that launchTask is called only for
      // TaskDescriptions where the slave is still valid (see the code
      // above in processOfferReply).
      Slave *slave = lookupSlave(t.slaveId);
      CHECK(slave != NULL);

      Task *task = new Task(t.taskId, framework->id, res, TASK_STARTING,
                            t.name, "", slave->id);

      framework->addTask(task);
      slave->addTask(task);

      allocator->taskAdded(task);

      LOG(INFO) << "Launching " << task << " on " << slave;
      send(slave->pid, pack<M2S_RUN_TASK>(
            framework->id, t.taskId, framework->name, framework->user,
            framework->executorInfo, t.name, t.arg, t.params, framework->pid));//讲作业提交给Slave去执行
    }

    Slave接到M2S_RUN_TASK消息之后,
    如果这是第一次收到该消息的话,SlaveProcess会检查当前节点是否启动了Executor,如果没有启动则会执行下图1所示的流程:
    Mesos的分析4 <wbr>--- <wbr>支持Hadoop任务级调度

    图1 该Slave上注册Framework和TaskTracker的启动过程

    如果Slave上已经注册了Framework,则相当于ExecutorProcess在MesosExecutorDriver的干预下已经生成。可以用来与SlaveProcess进行通信。那么由ReplyToOffer传递给MasterProcess的Task就要被Launch到对应框架上。注意这里的Launch只是象征意义的,只是在FrameworkScheduler上记录选中Task的信息,选择的过程是使用findTask的方法找到可以提供SlaveOffer的Slave节点的Task。这个过程和JobTracker的assignTask的过程互斥,这里对于JobTracker实例对象进行了synchronized的说明,保证了Hadoop框架内调用assignTask在findTask之后进行,并且分配的task是之前对应Slave节点选中的task。Task的Launch过程如图2所示。
    Mesos的分析4 <wbr>--- <wbr>支持Hadoop任务级调度
    图2 launch task的过程

    在launch task之后,然后就要使得Mesos框架内task状态的更新与Hadoop task的更新的步调一致。在TaskTracker增加了MesosTaskTrackerInstrumentation来添加task状态改变所进行的处理。当状态改变之后,MesosTaskTrackerInstrumentation就会将task状态的改变通过FrameworkExecutor作为与Mesos框架沟通的桥梁,提交给Mesos框架。具体流程请看图3 task status update
    Mesos的分析4 <wbr>--- <wbr>支持Hadoop任务级调度
    图3 task status update


    经过上面流程的分析,Hadoop计算平台已经完全和Mesos兼容在一起。这种做法的好处有以下几点:
    1)Hadoop集群的资源变得有弹性,可以根据作业的需求动态的启动和关闭TaskTracker,这可以大大提高集群的使用效率。
    2)资源管理与调度策略的分离。Mesos可以同时支持不同的计算框架,通过Mesos框架的MesosSchedulerDriver和MesosExecutorDriver将框架内的自身的调度优势和执行方式与Mesos自身资源的管理隔离。这样就只需管理资源,而作业的执行的管理则交给计算框架去管理。
    3)对于Hadoop源码没有伤害。Mesos在支持Hadoop的过程中,支持提供了一个Hadoop-contrib-mesos来进行,没有修改任何关于hadoop MapReduce、Common、HDFS三个基本组件的代码,这样的策略更容易让Hadoop使用者接受。
    4)Hadoop提交作业方式没有变化。原有的MapReduce程序仍然可以执行。

    Mesos的不足之处,
    1)框架过于复杂,要想支持其它计算框架,不仅需要对于Mesos源码十分熟悉,而且还要对另外的计算框架非常精通。这明显增加了很大的人力成本。
    2)Mesos对于资源的管理还不够成熟,Slave节点资源信息只有Mem大小和CPU的个数,管理粒度过于粗糙。
    3)Mesos缺少资源隔离技术。
     

    请网友珍惜Klose的工作成果,如果需要转载请注明出处。谢谢
    Klose
  • 相关阅读:
    Vue 2.x windows环境下安装
    VSCODE官网下载缓慢或下载失败 解决办法
    angular cli 降级
    Win10 VS2019 设置 以管理员身份运行
    XSHELL 连接 阿里云ECS实例
    Chrome浏览器跨域设置
    DBeaver 执行 mysql 多条语句报错
    DBeaver 连接MySql 8.0 报错 Public Key Retrieval is not allowed
    DBeaver 连接MySql 8.0报错 Unable to load authentication plugin 'caching_sha2_password'
    Linux系统分区
  • 原文地址:https://www.cnblogs.com/zhangzhang/p/2850601.html
Copyright © 2011-2022 走看看