zoukankan      html  css  js  c++  java
  • [zz]Mesos的分析4 支持Hadoop任务级调度

    转载自: http://blog.sina.com.cn/s/blog_4a1f59bf0100qotf.html

    Hadoop的调度示意图如下所示:

    Mesos的分析4 <wbr>--- <wbr>支持Hadoop任务级调度

    在Mesos资源管理平台上,启动Hadoop计算框架的时候,只是启动了它的JobTracker,而并没有启动TaskTracker,这主要考虑到资源伸缩性的管理。下面将介绍,当在这种框架下提交一个Hadoop作业时,调度系统如何工作的。

    在Mesos启动的时候,只启动了JobTracker,并没有启动TaskTracker,那么提交的作业如何执行呢?
    在Mesos分析3的介绍中,ResourceOffer ---》ReplyToOffer ---》ResourceOffer ---》是一个不断执行的过程。ResourceOffer会给FrameworkScheduler提供SlaveOffer的信息,SlaveOffer消息包含了SlaveId和它能够提供的资源信息(CPU, Mem)。private TaskDescription findTask(String slaveId, String host, int cpus, int mem)方法会获取JobTracker维护的Collection<JobInProgress> jobs的信息,取出目前需要运行的MapTask 和 ReduceTask个数,然后使用一定的判断方法(涉及到DelayScheduling、mapToReduceRatio优先ReduceTask),使用MesosTask对Task进行封装,然后合并资源信息并入TaskDescription对象。由于Mesos-Master一个ResourceOffer会把所有的可用的SlaveOffer都发送过来,因此,要按照上面findTask的方法,在每一个提供资源的Slave上去查找资源和匹配Task。这些匹配好的TaskDescription组装成一个ArrayList作为ReplyToOffer的信息返回给Mesos-Master。
    Mesos-Master收到F2M_SLOT_OFFER_REPLY消息之后,会下面的操作,来处理包含Task执行请求的OfferReply,具体的函数如下:

    // Process a resource offer reply (for a non-cancelled offer) by launching
    // the desired tasks (if the offer contains a valid set of tasks) and
    // reporting any unused resources to the allocator
    void Master::processOfferReply(SlotOffer *offer,
        const vector<TaskDescription>& tasks, const Params& params)
    {
      LOG(INFO) << "Received reply for " << offer;

      Framework *framework = lookupFramework(offer->frameworkId);
      CHECK(framework != NULL);

      // Count resources in the offer
      unordered_map<Slave *, Resources> offerResources;
      foreach (SlaveResources &r, offer->resources) {
        offerResources[r.slave] = r.resources;
      }

      // Count resources in the response, and check that its tasks are valid
      unordered_map<Slave *, Resources> responseResources;
      foreach (const TaskDescription &t, tasks) {
        // Check whether this task size is valid
        Params params(t.params);
        Resources res(params.getInt32("cpus", -1),
                      params.getInt32("mem", -1));
        if (res.cpus < MIN_CPUS || res.mem < MIN_MEM ||
            res.cpus > MAX_CPUS || res.mem > MAX_MEM) {
          terminateFramework(framework, 0,
              "Invalid task size: " + lexical_cast<string>(res));
          return;
        }
        // Check whether the task is on a valid slave
        Slave *slave = lookupSlave(t.slaveId);
        if (!slave || offerResources.find(slave) == offerResources.end()) {
          terminateFramework(framework, 0, "Invalid slave in offer reply");
          return;
        }
        responseResources[slave] += res;
      }

      // Check that the total accepted on each slave isn't more than offered
      foreachpair (Slave *s, Resources& respRes, responseResources) {
        Resources &offRes = offerResources[s];
        if (respRes.cpus > offRes.cpus || respRes.mem > offRes.mem) {
          terminateFramework(framework, 0, "Too many resources accepted");
          return;
        }
      }

      // Check that there are no duplicate task IDs
      unordered_set<TaskID> idsInResponse;
      foreach (const TaskDescription &t, tasks) {
        if (framework->tasks.find(t.taskId) != framework->tasks.end() ||
            idsInResponse.find(t.taskId) != idsInResponse.end()) {
          terminateFramework(framework, 0,
              "Duplicate task ID: " + lexical_cast<string>(t.taskId));
          return;
        }
        idsInResponse.insert(t.taskId);
      }

      // Launch the tasks in the response
      foreach (const TaskDescription &t, tasks) {
        // Record the resources in event_history
        Params params(t.params);
        Resources res(params.getInt32("cpus", -1),
                      params.getInt64("mem", -1));

        Slave *slave = lookupSlave(t.slaveId);
        evLogger->logTaskCreated(t.taskId, framework->id, t.slaveId,
                                 slave->webUIUrl, res);

        // Launch the tasks in the response
        launchTask(framework, t);
      }

      // If there are resources left on some slaves, add filters for them
      vector<SlaveResources> resourcesLeft;
      int timeout = params.getInt32("timeout", DEFAULT_REFUSAL_TIMEOUT);
      double expiry = (timeout == -1) ? 0 : elapsed() + timeout;
      foreachpair (Slave *s, Resources offRes, offerResources) {
        Resources respRes = responseResources[s];
        Resources left = offRes - respRes;
        if (left.cpus > 0 || left.mem > 0) {
          resourcesLeft.push_back(SlaveResources(s, left));
        }
        if (timeout != 0 && respRes.cpus == 0 && respRes.mem == 0) {
          LOG(INFO) << "Adding filter on " << s << " to " << framework
                    << " for  " << timeout << " seconds";
          framework->slaveFilter[s] = expiry;
        }
      }
     
      // Return the resources left to the allocator
      removeSlotOffer(offer, ORR_FRAMEWORK_REPLIED, resourcesLeft);
    }

    在对提交的Tasks进行了安全检查之后,向framework提交任务。


    void Master::launchTask(Framework *framework, const TaskDescription& t)
    {
      Params params(t.params);
      Resources res(params.getInt32("cpus", -1),
                    params.getInt32("mem", -1));

      // The invariant right now is that launchTask is called only for
      // TaskDescriptions where the slave is still valid (see the code
      // above in processOfferReply).
      Slave *slave = lookupSlave(t.slaveId);
      CHECK(slave != NULL);

      Task *task = new Task(t.taskId, framework->id, res, TASK_STARTING,
                            t.name, "", slave->id);

      framework->addTask(task);
      slave->addTask(task);

      allocator->taskAdded(task);

      LOG(INFO) << "Launching " << task << " on " << slave;
      send(slave->pid, pack<M2S_RUN_TASK>(
            framework->id, t.taskId, framework->name, framework->user,
            framework->executorInfo, t.name, t.arg, t.params, framework->pid));//讲作业提交给Slave去执行
    }

    Slave接到M2S_RUN_TASK消息之后,
    如果这是第一次收到该消息的话,SlaveProcess会检查当前节点是否启动了Executor,如果没有启动则会执行下图1所示的流程:
    Mesos的分析4 <wbr>--- <wbr>支持Hadoop任务级调度

    图1 该Slave上注册Framework和TaskTracker的启动过程

    如果Slave上已经注册了Framework,则相当于ExecutorProcess在MesosExecutorDriver的干预下已经生成。可以用来与SlaveProcess进行通信。那么由ReplyToOffer传递给MasterProcess的Task就要被Launch到对应框架上。注意这里的Launch只是象征意义的,只是在FrameworkScheduler上记录选中Task的信息,选择的过程是使用findTask的方法找到可以提供SlaveOffer的Slave节点的Task。这个过程和JobTracker的assignTask的过程互斥,这里对于JobTracker实例对象进行了synchronized的说明,保证了Hadoop框架内调用assignTask在findTask之后进行,并且分配的task是之前对应Slave节点选中的task。Task的Launch过程如图2所示。
    Mesos的分析4 <wbr>--- <wbr>支持Hadoop任务级调度
    图2 launch task的过程

    在launch task之后,然后就要使得Mesos框架内task状态的更新与Hadoop task的更新的步调一致。在TaskTracker增加了MesosTaskTrackerInstrumentation来添加task状态改变所进行的处理。当状态改变之后,MesosTaskTrackerInstrumentation就会将task状态的改变通过FrameworkExecutor作为与Mesos框架沟通的桥梁,提交给Mesos框架。具体流程请看图3 task status update
    Mesos的分析4 <wbr>--- <wbr>支持Hadoop任务级调度
    图3 task status update


    经过上面流程的分析,Hadoop计算平台已经完全和Mesos兼容在一起。这种做法的好处有以下几点:
    1)Hadoop集群的资源变得有弹性,可以根据作业的需求动态的启动和关闭TaskTracker,这可以大大提高集群的使用效率。
    2)资源管理与调度策略的分离。Mesos可以同时支持不同的计算框架,通过Mesos框架的MesosSchedulerDriver和MesosExecutorDriver将框架内的自身的调度优势和执行方式与Mesos自身资源的管理隔离。这样就只需管理资源,而作业的执行的管理则交给计算框架去管理。
    3)对于Hadoop源码没有伤害。Mesos在支持Hadoop的过程中,支持提供了一个Hadoop-contrib-mesos来进行,没有修改任何关于hadoop MapReduce、Common、HDFS三个基本组件的代码,这样的策略更容易让Hadoop使用者接受。
    4)Hadoop提交作业方式没有变化。原有的MapReduce程序仍然可以执行。

    Mesos的不足之处,
    1)框架过于复杂,要想支持其它计算框架,不仅需要对于Mesos源码十分熟悉,而且还要对另外的计算框架非常精通。这明显增加了很大的人力成本。
    2)Mesos对于资源的管理还不够成熟,Slave节点资源信息只有Mem大小和CPU的个数,管理粒度过于粗糙。
    3)Mesos缺少资源隔离技术。
     

    请网友珍惜Klose的工作成果,如果需要转载请注明出处。谢谢
    Klose
  • 相关阅读:
    asp.net将图片转成二进制存入数据库
    ionic2打包生成APK报错 Error: Could not find gradle wrapper within Android SDK. Might need to update your Android SDK. Looked here: D:AndroidSDK ools emplatesgradlewrapper
    'ionic' 不是内部或外部命令,也不是可运行的程序或批处理文件。
    ABP-vs2017执行Add-Migration出现的问题
    关闭页面时,弹出JS提示框提示是否关闭
    C#生成Bar Code Image
    MemoryStream转imageSource
    RadControls RadGridView 显示加载数据时间
    RadGridView 分页控件
    CRM2011弹出asp.net模态窗口关闭的问题
  • 原文地址:https://www.cnblogs.com/zhangzhang/p/2850601.html
Copyright © 2011-2022 走看看