zoukankan      html  css  js  c++  java
  • Mesos源码分析(10): MesosSchedulerDriver的启动及运行一个Task

     

    MesosSchedulerDriver的代码在src/sched/sched.cpp里面实现。

     

     

    Driver->run()会调用start()。

     

     

    首先检测Mesos-Master的leader

     

     

     

    创建一个线程。

     

    SchedulerProcess的initialize()函数

     

    里面主要注册消息处理函数。

     

    1. virtual void initialize()  // Registers one handler per incoming message type, then starts master detection.
    2.  {
    3.    install<Event>(&SchedulerProcess::receive);  // Event messages go to receive().
    4.    // Each install<M>(handler, &M::field...) below routes message type M to `handler`; presumably the handler is called with the sender's UPID followed by the listed fields (cf. resourceOffers(from, offers, pids)) — confirm against libprocess ProtobufProcess::install.
    5.    // TODO(benh): Get access to flags so that we can decide whether
    6.    // or not to make ZooKeeper verbose.
    7.    install<FrameworkRegisteredMessage>(
    8.        &SchedulerProcess::registered,
    9.        &FrameworkRegisteredMessage::framework_id,
    10.        &FrameworkRegisteredMessage::master_info);
    11.  
    12.    install<FrameworkReregisteredMessage>(
    13.        &SchedulerProcess::reregistered,
    14.        &FrameworkReregisteredMessage::framework_id,
    15.        &FrameworkReregisteredMessage::master_info);
    16.    // Resource offers sent by Master::offer are delivered to resourceOffers(from, offers, pids).
    17.    install<ResourceOffersMessage>(
    18.        &SchedulerProcess::resourceOffers,
    19.        &ResourceOffersMessage::offers,
    20.        &ResourceOffersMessage::pids);
    21.  
    22.    install<RescindResourceOfferMessage>(
    23.        &SchedulerProcess::rescindOffer,
    24.        &RescindResourceOfferMessage::offer_id);
    25.    // statusUpdate() is also called directly by acceptOffers() to report locally generated TASK_LOST updates.
    26.    install<StatusUpdateMessage>(
    27.        &SchedulerProcess::statusUpdate,
    28.        &StatusUpdateMessage::update,
    29.        &StatusUpdateMessage::pid);
    30.  
    31.    install<LostSlaveMessage>(
    32.        &SchedulerProcess::lostSlave,
    33.        &LostSlaveMessage::slave_id);
    34.  
    35.    install<ExitedExecutorMessage>(
    36.        &SchedulerProcess::lostExecutor,
    37.        &ExitedExecutorMessage::executor_id,
    38.        &ExitedExecutorMessage::slave_id,
    39.        &ExitedExecutorMessage::status);
    40.  
    41.    install<ExecutorToFrameworkMessage>(
    42.        &SchedulerProcess::frameworkMessage,
    43.        &ExecutorToFrameworkMessage::slave_id,
    44.        &ExecutorToFrameworkMessage::executor_id,
    45.        &ExecutorToFrameworkMessage::data);
    46.  
    47.    install<FrameworkErrorMessage>(
    48.        &SchedulerProcess::error,
    49.        &FrameworkErrorMessage::message);
    50.  
    51.    // Start detecting masters.
    52.    detector->detect()
    53.      .onAny(defer(self(), &SchedulerProcess::detected, lambda::_1));  // Call detected() with the detection result whatever the future's outcome; defer(self(), ...) presumably runs it in this process's context.
    54.  }

     

    在前面的文章《Mesos源码分析(6): Mesos Master的初始化》中讲到,

    在Allocator的initialize函数中,传入的OfferCallback是Master::offer。

    每过一个allocation_interval,Allocator都会计算每个framework的offer,然后依次调用Master::offer,将资源offer给相应的framework。

    在Master::offer函数中,会生成如下的ResourceOffersMessage,并发送给Framework。

     

    对应到这里,当Driver收到ResourceOffersMessage消息时,会调用SchedulerProcess::resourceOffers:

     

    1.   void resourceOffers(  // Installed in initialize() as the ResourceOffersMessage handler; forwards the offers to the framework's Scheduler.
    2.       const UPID& from,  // presumably the PID of the sending master — confirm
    3.       const vector<Offer>& offers,  // ResourceOffersMessage::offers
    4.       const vector<string>& pids)  // ResourceOffersMessage::pids
    5.   {
    6. ……  // (part of the body is elided in this excerpt)
    7.     VLOG(2) << "Received " << offers.size() << " offers";
    8. ……  // (part of the body is elided in this excerpt)
    9.     scheduler->resourceOffers(driver, offers);  // Hand the offers to the framework's Scheduler::resourceOffers callback.
    10.  
    11.     VLOG(1) << "Scheduler::resourceOffers took " << stopwatch.elapsed();  // stopwatch comes from the elided code; presumably it times the callback above.
    12.   }

     

    最终调用了Framework的resourceOffers。

     

    Test Framework的resourceOffers函数会根据收到的offers创建一系列task,然后调用driver的launchTasks函数:

    1. virtual void resourceOffers(SchedulerDriver* driver,  // For each offer, packs fixed-size tasks into it (until totalTasks are launched) and launches them.
    2.                             const vector<Offer>& offers)
    3. {  // tasksLaunched, totalTasks, executor and role are not declared here — presumably scheduler members.
    4.   foreach (const Offer& offer, offers) {
    5.     cout << "Received offer " << offer.id() << " with " << offer.resources()
    6.          << endl;
    7.     // Resources needed by one task; a function-local static, so it is parsed only once.
    8.     static const Resources TASK_RESOURCES = Resources::parse(
    9.         "cpus:" + stringify(CPUS_PER_TASK) +
    10.         ";mem:" + stringify(MEM_PER_TASK)).get();
    11.  
    12.     Resources remaining = offer.resources();  // Working copy; shrinks as resources are assigned to tasks.
    13.  
    14.     // Launch tasks.
    15.     vector<TaskInfo> tasks;
    16.     while (tasksLaunched < totalTasks &&  // Stop once the overall task quota is reached...
    17.            remaining.flatten().contains(TASK_RESOURCES)) {  // ...or when this offer can no longer fit one more task.
    18.       int taskId = tasksLaunched++;  // Post-increment: this task uses the counter's previous value.
    19.  
    20.       cout << "Launching task " << taskId << " using offer "
    21.            << offer.id() << endl;
    22.  
    23.       TaskInfo task;
    24.       task.set_name("Task " + lexical_cast<string>(taskId));
    25.       task.mutable_task_id()->set_value(lexical_cast<string>(taskId));
    26.       task.mutable_slave_id()->MergeFrom(offer.slave_id());  // Must match the offer's slave; acceptOffers warns on a wrong slave id.
    27.       task.mutable_executor()->MergeFrom(executor);
    28.       // Select concrete resources for this task (TASK_RESOURCES flattened to `role`) out of what remains.
    29.       Option<Resources> resources =
    30.         remaining.find(TASK_RESOURCES.flatten(role));
    31.  
    32.       CHECK_SOME(resources);  // Asserts a match was found; the loop condition is presumably meant to guarantee this.
    33.       task.mutable_resources()->MergeFrom(resources.get());
    34.       remaining -= resources.get();  // Consume them so the next iteration only sees what is left.
    35.  
    36.       tasks.push_back(task);
    37.     }
    38.  
    39.     driver->launchTasks(offer.id(), tasks);  // Called even when tasks is empty; NOTE(review): presumably that just declines the offer — confirm.
    40.   }
    41. }

     

    SchedulerProcess的launchTasks函数实现如下:

    1. void launchTasks(const vector<OfferID>& offerIds,  // Wraps `tasks` in a single LAUNCH operation and delegates to acceptOffers().
    2.                  const vector<TaskInfo>& tasks,
    3.                  const Filters& filters)
    4. {
    5.   Offer::Operation operation;
    6.   operation.set_type(Offer::Operation::LAUNCH);
    7.  
    8.   Offer::Operation::Launch* launch = operation.mutable_launch();
    9.   foreach (const TaskInfo& task, tasks) {
    10.     launch->add_task_infos()->CopyFrom(task);
    11.   }
    12.  
    13.   acceptOffers(offerIds, {operation}, filters);
    14. }
    15.  
    16. void acceptOffers(  // Sends a Call::ACCEPT for the given offers and operations to the master; if disconnected, reports TASK_LOST locally instead.
    17.     const vector<OfferID>& offerIds,
    18.     const vector<Offer::Operation>& operations,
    19.     const Filters& filters)
    20. {
    21.   // TODO(jieyu): Move all driver side verification to master since
    22.   // we are moving towards supporting pure language scheduler.
    23.  
    24.   if (!connected) {
    25.     VLOG(1) << "Ignoring accept offers message as master is disconnected";
    26.  
    27.     // NOTE: Reply to the framework with TASK_LOST messages for each
    28.     // task launch. See details from notes in launchTasks.
    29.     foreach (const Offer::Operation& operation, operations) {
    30.       if (operation.type() != Offer::Operation::LAUNCH) {
    31.         continue;
    32.       }
    33.  
    34.       foreach (const TaskInfo& task, operation.launch().task_infos()) {
    35.         StatusUpdate update = protobuf::createStatusUpdate(
    36.             framework.id(),
    37.             None(),
    38.             task.task_id(),
    39.             TASK_LOST,
    40.             TaskStatus::SOURCE_MASTER,
    41.             None(),
    42.             "Master disconnected",
    43.             TaskStatus::REASON_MASTER_DISCONNECTED);
    44.  
    45.         statusUpdate(UPID(), update, UPID());  // Fed to this driver's own StatusUpdateMessage handler, with default-constructed UPIDs since no remote process sent it.
    46.       }
    47.     }
    48.     return;
    49.   }
    50.   // Connected: build an ACCEPT call carrying the operations, offer IDs and filters.
    51.   Call call;
    52.   CHECK(framework.has_id());
    53.   call.mutable_framework_id()->CopyFrom(framework.id());
    54.   call.set_type(Call::ACCEPT);
    55.  
    56.   Call::Accept* accept = call.mutable_accept();
    57.  
    58.   // Setting accept.operations.
    59.   foreach (const Offer::Operation& _operation, operations) {
    60.     Offer::Operation* operation = accept->add_operations();
    61.     operation->CopyFrom(_operation);
    62.   }
    63.  
    64.   // Setting accept.offer_ids.
    65.   foreach (const OfferID& offerId, offerIds) {
    66.     accept->add_offer_ids()->CopyFrom(offerId);
    67.  
    68.     if (!savedOffers.contains(offerId)) {
    69.       // TODO(jieyu): A duplicated offer ID could also cause this
    70.       // warning being printed. Consider refining this message here
    71.       // and in launchTasks as well.
    72.       LOG(WARNING) << "Attempting to accept an unknown offer " << offerId;
    73.     } else {
    74.       // Keep only the slave PIDs where we run tasks so we can send
    75.       // framework messages directly.
    76.       foreach (const Offer::Operation& operation, operations) {
    77.         if (operation.type() != Offer::Operation::LAUNCH) {
    78.           continue;
    79.         }
    80.  
    81.         foreach (const TaskInfo& task, operation.launch().task_infos()) {
    82.           const SlaveID& slaveId = task.slave_id();
    83.  
    84.           if (savedOffers[offerId].contains(slaveId)) {  // Judging by this lookup, savedOffers maps offer ID -> (slave ID -> slave PID).
    85.             savedSlavePids[slaveId] = savedOffers[offerId][slaveId];
    86.           } else {
    87.             LOG(WARNING) << "Attempting to launch task " << task.task_id()
    88.                          << " with the wrong slave id " << slaveId;
    89.           }
    90.         }
    91.       }
    92.     }
    93.  
    94.     // Remove the offer since we saved all the PIDs we might use.
    95.     savedOffers.erase(offerId);
    96.   }
    97.  
    98.   // Setting accept.filters.
    99.   accept->mutable_filters()->CopyFrom(filters);
    100.  
    101.   CHECK_SOME(master);  // A master is asserted to be known; presumably guaranteed whenever `connected` is true.
    102.   send(master.get().pid(), call);  // Deliver the ACCEPT call to the current master.
    103. }

     

    最终向Mesos-Master的leader发送一个类型为Call::ACCEPT的消息,其中携带了包含这些task的LAUNCH操作。

  • 相关阅读:
    构建之法 读书笔记01
    团队合作第一次会议有感
    浪潮之巅 读书笔记 03
    浪潮之巅 读书笔记 02
    团队介绍
    疫情查询app 开发
    世界疫情信息爬取 及开发查询爬取数据的app
    浪潮之巅 读书笔记 01
    支付宝沙箱配置
    定时器定时执行redis操作定时器执行几次自动停止的问题
  • 原文地址:https://www.cnblogs.com/popsuper1982/p/5724095.html
Copyright © 2011-2022 走看看