zoukankan      html  css  js  c++  java
  • Mesos源码分析(11): Mesos-Master接收到launchTasks消息

    根据Mesos源码分析(6): Mesos Master的初始化中的代码分析,当Mesos-Master接收到launchTask消息的时候,会调用Master::launchTasks函数。

     

    1. void Master::launchTasks(
    2.     const UPID& from,
    3.     const FrameworkID& frameworkId,
    4.     const vector<TaskInfo>& tasks,
    5.     const Filters& filters,
    6.     const vector<OfferID>& offerIds)
    7. {
    8.   Framework* framework = getFramework(frameworkId);
    9.  
    10.   if (framework == NULL) {
    11.     LOG(WARNING)
    12.       << "Ignoring launch tasks message for offers " << stringify(offerIds)
    13.       << " of framework " << frameworkId
    14.       << " because the framework cannot be found";
    15.  
    16.     return;
    17.   }
    18.  
    19.   if (framework->pid != from) {
    20.     LOG(WARNING)
    21.       << "Ignoring launch tasks message for offers " << stringify(offerIds)
    22.       << " from '" << from << "' because it is not from the"
    23.       << " registered framework " << *framework;
    24.  
    25.     return;
    26.   }
    27.  
    28.   // Currently when no tasks are specified in the launchTasks message
    29.   // it is implicitly considered a decline of the offers.
    30.   if (!tasks.empty()) {
    31.     scheduler::Call::Accept message;
    32.     message.mutable_filters()->CopyFrom(filters);
    33.  
    34.     Offer::Operation* operation = message.add_operations();
    35.     operation->set_type(Offer::Operation::LAUNCH);
    36.  
    37.     foreach (const TaskInfo& task, tasks) {
    38.       operation->mutable_launch()->add_task_infos()->CopyFrom(task);
    39.     }
    40.  
    41.     foreach (const OfferID& offerId, offerIds) {
    42.       message.add_offer_ids()->CopyFrom(offerId);
    43.     }
    44.  
    45.     accept(framework, message);
    46.   } else {
    47.     scheduler::Call::Decline message;
    48.     message.mutable_filters()->CopyFrom(filters);
    49.  
    50.     foreach (const OfferID& offerId, offerIds) {
    51.       message.add_offer_ids()->CopyFrom(offerId);
    52.     }
    53.  
    54.     decline(framework, message);
    55.   }
    56. }

     

    它会进一步调用accept函数

    1. void Master::accept(
    2.     Framework* framework,
    3.     const scheduler::Call::Accept& accept)
    4. {
    5.   CHECK_NOTNULL(framework);
    6.  
    7.   foreach (const Offer::Operation& operation, accept.operations()) {
    8.     if (operation.type() == Offer::Operation::LAUNCH) {
    9.       if (operation.launch().task_infos().size() > 0) {
    10.         ++metrics->messages_launch_tasks;
    11.       } else {
    12.         ++metrics->messages_decline_offers;
    13.       }
    14.     }
    15.  
    16.     // TODO(jieyu): Add metrics for non launch operations.
    17.   }
    18.  
    19.   // TODO(bmahler): We currently only support using multiple offers
    20.   // for a single slave.
    21.   Resources offeredResources;
    22.   Option<SlaveID> slaveId = None();
    23.   Option<Error> error = None();
    24.  
    25.   if (accept.offer_ids().size() == 0) {
    26.     error = Error("No offers specified");
    27.   } else {
    28.     // Validate the offers.
    29.     error = validation::offer::validate(accept.offer_ids(), this, framework);
    30.  
    31.     // Compute offered resources and remove the offers. If the
    32.     // validation failed, return resources to the allocator.
    33.     foreach (const OfferID& offerId, accept.offer_ids()) {
    34.       Offer* offer = getOffer(offerId);
    35.  
    36.       // Since we re-use `OfferID`s, it is possible to arrive here with either
    37.       // a resource offer, or an inverse offer. We first try as a resource offer
    38.       // and if that fails, then we assume it is an inverse offer. This is
    39.       // correct as those are currently the only 2 ways to get an `OfferID`.
    40.       if (offer != NULL) {
    41.         slaveId = offer->slave_id();
    42.         offeredResources += offer->resources();
    43.  
    44.         if (error.isSome()) {
    45.           allocator->recoverResources(
    46.               offer->framework_id(),
    47.               offer->slave_id(),
    48.               offer->resources(),
    49.               None());
    50.         }
    51.         removeOffer(offer);
    52.         continue;
    53.       }
    54.  
    55.       // Try it as an inverse offer. If this fails then the offer is no longer
    56.       // valid.
    57.       InverseOffer* inverseOffer = getInverseOffer(offerId);
    58.       if (inverseOffer != NULL) {
    59.         mesos::master::InverseOfferStatus status;
    60.         status.set_status(mesos::master::InverseOfferStatus::ACCEPT);
    61.         status.mutable_framework_id()->CopyFrom(inverseOffer->framework_id());
    62.         status.mutable_timestamp()->CopyFrom(protobuf::getCurrentTime());
    63.  
    64.         allocator->updateInverseOffer(
    65.             inverseOffer->slave_id(),
    66.             inverseOffer->framework_id(),
    67.             UnavailableResources{
    68.                 inverseOffer->resources(),
    69.                 inverseOffer->unavailability()},
    70.             status,
    71.             accept.filters());
    72.  
    73.         removeInverseOffer(inverseOffer);
    74.         continue;
    75.       }
    76.  
    77.       // If the offer was neither in our offer or inverse offer sets, then this
    78.       // offer is no longer valid.
    79.       LOG(WARNING) << "Ignoring accept of offer " << offerId
    80.                    << " since it is no longer valid";
    81.     }
    82.   }
    83.  
    84.   // If invalid, send TASK_LOST for the launch attempts.
    85.   // TODO(jieyu): Consider adding a 'drop' overload for ACCEPT call to
    86.   // consistently handle message dropping. It would be ideal if the
    87.   // 'drop' overload can handle both resource recovery and lost task
    88.   // notifications.
    89.   if (error.isSome()) {
    90.     LOG(WARNING) << "ACCEPT call used invalid offers '" << accept.offer_ids()
    91.                  << "': " << error.get().message;
    92.  
    93.     foreach (const Offer::Operation& operation, accept.operations()) {
    94.       if (operation.type() != Offer::Operation::LAUNCH) {
    95.         continue;
    96.       }
    97.  
    98.       foreach (const TaskInfo& task, operation.launch().task_infos()) {
    99.         const StatusUpdate& update = protobuf::createStatusUpdate(
    100.             framework->id(),
    101.             task.slave_id(),
    102.             task.task_id(),
    103.             TASK_LOST,
    104.             TaskStatus::SOURCE_MASTER,
    105.             None(),
    106.             "Task launched with invalid offers: " + error.get().message,
    107.             TaskStatus::REASON_INVALID_OFFERS);
    108.  
    109.         metrics->tasks_lost++;
    110.  
    111.         metrics->incrementTasksStates(
    112.             TASK_LOST,
    113.             TaskStatus::SOURCE_MASTER,
    114.             TaskStatus::REASON_INVALID_OFFERS);
    115.  
    116.         forward(update, UPID(), framework);
    117.       }
    118.     }
    119.  
    120.     return;
    121.   }
    122.  
    123.   CHECK_SOME(slaveId);
    124.   Slave* slave = slaves.registered.get(slaveId.get());
    125.   CHECK_NOTNULL(slave);
    126.  
    127.   LOG(INFO) << "Processing ACCEPT call for offers: " << accept.offer_ids()
    128.             << " on slave " << *slave << " for framework " << *framework;
    129.  
    130.   list<Future<bool>> futures;
    131.   foreach (const Offer::Operation& operation, accept.operations()) {
    132.     switch (operation.type()) {
    133.       case Offer::Operation::LAUNCH: {
    134.         // Authorize the tasks. A task is in 'framework->pendingTasks'
    135.         // before it is authorized.
    136.         foreach (const TaskInfo& task, operation.launch().task_infos()) {
    137.           futures.push_back(authorizeTask(task, framework));
    138.  
    139.           // Add to pending tasks.
    140.           //
    141.           // NOTE: The task ID here hasn't been validated yet, but it
    142.           // doesn't matter. If the task ID is not valid, the task won't
    143.           // be launched anyway. If two tasks have the same ID, the second
    144.           // one will not be put into 'framework->pendingTasks', therefore
    145.           // will not be launched.
    146.           if (!framework->pendingTasks.contains(task.task_id())) {
    147.             framework->pendingTasks[task.task_id()] = task;
    148.           }
    149.         }
    150.         break;
    151.       }
    152.  
    153.       // NOTE: When handling RESERVE and UNRESERVE operations, authorization
    154.       // will proceed even if no principal is specified, although currently
    155.       // resources cannot be reserved or unreserved unless a principal is
    156.       // provided. Any RESERVE/UNRESERVE operation with no associated principal
    157.       // will be found invalid when `validate()` is called in `_accept()` below.
    158.  
    159.       // The RESERVE operation allows a principal to reserve resources.
    160.       case Offer::Operation::RESERVE: {
    161.         Option<string> principal = framework->info.has_principal()
    162.           ? framework->info.principal()
    163.           : Option<string>::none();
    164.  
    165.         futures.push_back(
    166.             authorizeReserveResources(
    167.                 operation.reserve(), principal));
    168.  
    169.         break;
    170.       }
    171.  
    172.       // The UNRESERVE operation allows a principal to unreserve resources.
    173.       case Offer::Operation::UNRESERVE: {
    174.         Option<string> principal = framework->info.has_principal()
    175.           ? framework->info.principal()
    176.           : Option<string>::none();
    177.  
    178.         futures.push_back(
    179.             authorizeUnreserveResources(
    180.                 operation.unreserve(), principal));
    181.  
    182.         break;
    183.       }
    184.  
    185.       // The CREATE operation allows the creation of a persistent volume.
    186.       case Offer::Operation::CREATE: {
    187.         Option<string> principal = framework->info.has_principal()
    188.           ? framework->info.principal()
    189.           : Option<string>::none();
    190.  
    191.         futures.push_back(
    192.             authorizeCreateVolume(
    193.                 operation.create(), principal));
    194.  
    195.         break;
    196.       }
    197.  
    198.       // The DESTROY operation allows the destruction of a persistent volume.
    199.       case Offer::Operation::DESTROY: {
    200.         Option<string> principal = framework->info.has_principal()
    201.           ? framework->info.principal()
    202.           : Option<string>::none();
    203.  
    204.         futures.push_back(
    205.             authorizeDestroyVolume(
    206.                 operation.destroy(), principal));
    207.  
    208.         break;
    209.       }
    210.     }
    211.   }
    212.  
    213.   // Wait for all the tasks to be authorized.
    214.   await(futures)
    215.     .onAny(defer(self(),
    216.                  &Master::_accept,
    217.                  framework->id(),
    218.                  slaveId.get(),
    219.                  offeredResources,
    220.                  accept,
    221.                  lambda::_1));
    222. }

     

    如果鉴权通过,则调用Master::_accept

    1. void Master::_accept(
    2.     const FrameworkID& frameworkId,
    3.     const SlaveID& slaveId,
    4.     const Resources& offeredResources,
    5.     const scheduler::Call::Accept& accept,
    6.     const Future<list<Future<bool>>>& _authorizations)
    7. {
    8.   Framework* framework = getFramework(frameworkId);
    9. ……
    10.  
    11.   Slave* slave = slaves.registered.get(slaveId);
    12.  
    13.   if (slave == NULL || !slave->connected) {
    14.     foreach (const Offer::Operation& operation, accept.operations()) {
    15.       if (operation.type() != Offer::Operation::LAUNCH) {
    16.         continue;
    17.       }
    18.  
    19.       foreach (const TaskInfo& task, operation.launch().task_infos()) {
    20.         const TaskStatus::Reason reason =
    21.             slave == NULL ? TaskStatus::REASON_SLAVE_REMOVED
    22.                           : TaskStatus::REASON_SLAVE_DISCONNECTED;
    23.         const StatusUpdate& update = protobuf::createStatusUpdate(
    24.             framework->id(),
    25.             task.slave_id(),
    26.             task.task_id(),
    27.             TASK_LOST,
    28.             TaskStatus::SOURCE_MASTER,
    29.             None(),
    30.             slave == NULL ? "Slave removed" : "Slave disconnected",
    31.             reason);
    32.  
    33.         metrics->tasks_lost++;
    34.  
    35.         metrics->incrementTasksStates(
    36.             TASK_LOST,
    37.             TaskStatus::SOURCE_MASTER,
    38.             reason);
    39.  
    40.         forward(update, UPID(), framework);
    41.       }
    42.     }
    43.  
    44.     // Tell the allocator about the recovered resources.
    45.     allocator->recoverResources(
    46.         frameworkId,
    47.         slaveId,
    48.         offeredResources,
    49.         None());
    50.  
    51.     return;
    52.   }
    53.  
    54.   // Some offer operations update the offered resources. We keep
    55.   // updated offered resources here. When a task is successfully
    56.   // launched, we remove its resource from offered resources.
    57.   Resources _offeredResources = offeredResources;
    58.  
    59.   // The order of `authorizations` must match the order of the operations in
    60.   // `accept.operations()`, as they are iterated through simultaneously.
    61.   CHECK_READY(_authorizations);
    62.   list<Future<bool>> authorizations = _authorizations.get();
    63.  
    64.   foreach (const Offer::Operation& operation, accept.operations()) {
    65.     switch (operation.type()) {
    66.       // The RESERVE operation allows a principal to reserve resources.
    67.       case Offer::Operation::RESERVE: {
    68.         Future<bool> authorization = authorizations.front();
    69.         authorizations.pop_front();
    70.  
    71.         CHECK(!authorization.isDiscarded());
    72.  
    73.         if (authorization.isFailed()) {
    74.           // TODO(greggomann): We may want to retry this failed authorization
    75.           // request rather than dropping it immediately.
    76.           drop(framework,
    77.                operation,
    78.                "Authorization of principal '" + framework->info.principal() +
    79.                "' to reserve resources failed: " +
    80.                authorization.failure());
    81.  
    82.           continue;
    83.         } else if (!authorization.get()) {
    84.           drop(framework,
    85.                operation,
    86.                "Not authorized to reserve resources as '" +
    87.                  framework->info.principal() + "'");
    88.  
    89.           continue;
    90.         }
    91.  
    92.         Option<string> principal = framework->info.has_principal()
    93.           ? framework->info.principal()
    94.           : Option<string>::none();
    95.  
    96.         // Make sure this reserve operation is valid.
    97.         Option<Error> error = validation::operation::validate(
    98.             operation.reserve(), principal);
    99.  
    100.         if (error.isSome()) {
    101.           drop(framework, operation, error.get().message);
    102.           continue;
    103.         }
    104.  
    105.         // Test the given operation on the included resources.
    106.         Try<Resources> resources = _offeredResources.apply(operation);
    107.         if (resources.isError()) {
    108.           drop(framework, operation, resources.error());
    109.           continue;
    110.         }
    111.  
    112.         _offeredResources = resources.get();
    113.  
    114.         LOG(INFO) << "Applying RESERVE operation for resources "
    115.                   << operation.reserve().resources() << " from framework "
    116.                   << *framework << " to slave " << *slave;
    117.  
    118.         apply(framework, slave, operation);
    119.         break;
    120.       }
    121.  
    122.       // The UNRESERVE operation allows a principal to unreserve resources.
    123.       case Offer::Operation::UNRESERVE: {
    124.         Future<bool> authorization = authorizations.front();
    125.         authorizations.pop_front();
    126.  
    127.         CHECK(!authorization.isDiscarded());
    128.  
    129.         if (authorization.isFailed()) {
    130.           // TODO(greggomann): We may want to retry this failed authorization
    131.           // request rather than dropping it immediately.
    132.           drop(framework,
    133.                operation,
    134.                "Authorization of principal '" + framework->info.principal() +
    135.                "' to unreserve resources failed: " +
    136.                authorization.failure());
    137.  
    138.           continue;
    139.         } else if (!authorization.get()) {
    140.           drop(framework,
    141.                operation,
    142.                "Not authorized to unreserve resources as '" +
    143.                  framework->info.principal() + "'");
    144.  
    145.           continue;
    146.         }
    147.  
    148.         // Make sure this unreserve operation is valid.
    149.         Option<Error> error = validation::operation::validate(
    150.             operation.unreserve());
    151.  
    152.         if (error.isSome()) {
    153.           drop(framework, operation, error.get().message);
    154.           continue;
    155.         }
    156.  
    157.         // Test the given operation on the included resources.
    158.         Try<Resources> resources = _offeredResources.apply(operation);
    159.         if (resources.isError()) {
    160.           drop(framework, operation, resources.error());
    161.           continue;
    162.         }
    163.  
    164.         _offeredResources = resources.get();
    165.  
    166.         LOG(INFO) << "Applying UNRESERVE operation for resources "
    167.                   << operation.unreserve().resources() << " from framework "
    168.                   << *framework << " to slave " << *slave;
    169.  
    170.         apply(framework, slave, operation);
    171.         break;
    172.       }
    173.  
    174.       case Offer::Operation::CREATE: {
    175.         Future<bool> authorization = authorizations.front();
    176.         authorizations.pop_front();
    177.  
    178.         CHECK(!authorization.isDiscarded());
    179.  
    180.         if (authorization.isFailed()) {
    181.           // TODO(greggomann): We may want to retry this failed authorization
    182.           // request rather than dropping it immediately.
    183.           drop(framework,
    184.                operation,
    185.                "Authorization of principal '" + framework->info.principal() +
    186.                "' to create persistent volumes failed: " +
    187.                authorization.failure());
    188.  
    189.           continue;
    190.         } else if (!authorization.get()) {
    191.           drop(framework,
    192.                operation,
    193.                "Not authorized to create persistent volumes as '" +
    194.                  framework->info.principal() + "'");
    195.  
    196.           continue;
    197.         }
    198.  
    199.         // Make sure this create operation is valid.
    200.         Option<Error> error = validation::operation::validate(
    201.             operation.create(), slave->checkpointedResources);
    202.  
    203.         if (error.isSome()) {
    204.           drop(framework, operation, error.get().message);
    205.           continue;
    206.         }
    207.  
    208.         Try<Resources> resources = _offeredResources.apply(operation);
    209.         if (resources.isError()) {
    210.           drop(framework, operation, resources.error());
    211.           continue;
    212.         }
    213.  
    214.         _offeredResources = resources.get();
    215.  
    216.         LOG(INFO) << "Applying CREATE operation for volumes "
    217.                   << operation.create().volumes() << " from framework "
    218.                   << *framework << " to slave " << *slave;
    219.  
    220.         apply(framework, slave, operation);
    221.         break;
    222.       }
    223.  
    224.       case Offer::Operation::DESTROY: {
    225.         Future<bool> authorization = authorizations.front();
    226.         authorizations.pop_front();
    227.  
    228.         CHECK(!authorization.isDiscarded());
    229.  
    230.         if (authorization.isFailed()) {
    231.           // TODO(greggomann): We may want to retry this failed authorization
    232.           // request rather than dropping it immediately.
    233.           drop(framework,
    234.                operation,
    235.                "Authorization of principal '" + framework->info.principal() +
    236.                "' to destroy persistent volumes failed: " +
    237.                authorization.failure());
    238.  
    239.           continue;
    240.         } else if (!authorization.get()) {
    241.           drop(framework,
    242.                operation,
    243.                "Not authorized to destroy persistent volumes as '" +
    244.                  framework->info.principal() + "'");
    245.  
    246.           continue;
    247.         }
    248.  
    249.         // Make sure this destroy operation is valid.
    250.         Option<Error> error = validation::operation::validate(
    251.             operation.destroy(), slave->checkpointedResources);
    252.  
    253.         if (error.isSome()) {
    254.           drop(framework, operation, error.get().message);
    255.           continue;
    256.         }
    257.  
    258.         Try<Resources> resources = _offeredResources.apply(operation);
    259.         if (resources.isError()) {
    260.           drop(framework, operation, resources.error());
    261.           continue;
    262.         }
    263.  
    264.         _offeredResources = resources.get();
    265.  
    266.         LOG(INFO) << "Applying DESTROY operation for volumes "
    267.                   << operation.create().volumes() << " from framework "
    268.                   << *framework << " to slave " << *slave;
    269.  
    270.         apply(framework, slave, operation);
    271.         break;
    272.       }
    273.  
    274.       case Offer::Operation::LAUNCH: {
    275.         foreach (const TaskInfo& task, operation.launch().task_infos()) {
    276.           Future<bool> authorization = authorizations.front();
    277.           authorizations.pop_front();
    278.  
    279.           // NOTE: The task will not be in 'pendingTasks' if
    280.           // 'killTask()' for the task was called before we are here.
    281.           // No need to launch the task if it's no longer pending.
    282.           // However, we still need to check the authorization result
    283.           // and do the validation so that we can send status update
    284.           // in case the task has duplicated ID.
    285.           bool pending = framework->pendingTasks.contains(task.task_id());
    286.  
    287.           // Remove from pending tasks.
    288.           framework->pendingTasks.erase(task.task_id());
    289.  
    290.           CHECK(!authorization.isDiscarded());
    291.  
    292.           if (authorization.isFailed() || !authorization.get()) {
    293.             string user = framework->info.user(); // Default user.
    294.             if (task.has_command() && task.command().has_user()) {
    295.               user = task.command().user();
    296.             } else if (task.has_executor() &&
    297.                        task.executor().command().has_user()) {
    298.               user = task.executor().command().user();
    299.             }
    300.  
    301.             const StatusUpdate& update = protobuf::createStatusUpdate(
    302.                 framework->id(),
    303.                 task.slave_id(),
    304.                 task.task_id(),
    305.                 TASK_ERROR,
    306.                 TaskStatus::SOURCE_MASTER,
    307.                 None(),
    308.                 authorization.isFailed() ?
    309.                     "Authorization failure: " + authorization.failure() :
    310.                     "Not authorized to launch as user '" + user + "'",
    311.                 TaskStatus::REASON_TASK_UNAUTHORIZED);
    312.  
    313.             metrics->tasks_error++;
    314.  
    315.             metrics->incrementTasksStates(
    316.                 TASK_ERROR,
    317.                 TaskStatus::SOURCE_MASTER,
    318.                 TaskStatus::REASON_TASK_UNAUTHORIZED);
    319.  
    320.             forward(update, UPID(), framework);
    321.  
    322.             continue;
    323.           }
    324.  
    325.           // Validate the task.
    326.  
    327.           // Make a copy of the original task so that we can
    328.           // fill the missing `framework_id` in ExecutorInfo
    329.           // if needed. This field was added to the API later
    330.           // and thus was made optional.
    331.           TaskInfo task_(task);
    332.           if (task.has_executor() && !task.executor().has_framework_id()) {
    333.             task_.mutable_executor()
    334.                 ->mutable_framework_id()->CopyFrom(framework->id());
    335.           }
    336.  
    337.           const Option<Error>& validationError = validation::task::validate(
    338.               task_,
    339.               framework,
    340.               slave,
    341.               _offeredResources);
    342.  
    343.           if (validationError.isSome()) {
    344.             const StatusUpdate& update = protobuf::createStatusUpdate(
    345.                 framework->id(),
    346.                 task_.slave_id(),
    347.                 task_.task_id(),
    348.                 TASK_ERROR,
    349.                 TaskStatus::SOURCE_MASTER,
    350.                 None(),
    351.                 validationError.get().message,
    352.                 TaskStatus::REASON_TASK_INVALID);
    353.  
    354.             metrics->tasks_error++;
    355.  
    356.             metrics->incrementTasksStates(
    357.                 TASK_ERROR,
    358.                 TaskStatus::SOURCE_MASTER,
    359.                 TaskStatus::REASON_TASK_INVALID);
    360.  
    361.             forward(update, UPID(), framework);
    362.  
    363.             continue;
    364.           }
    365.  
    366.           // Add task.
    367.           if (pending) {
    368.             _offeredResources -= addTask(task_, framework, slave);
    369.  
    370.             // TODO(bmahler): Consider updating this log message to
    371.             // indicate when the executor is also being launched.
    372.             LOG(INFO) << "Launching task " << task_.task_id()
    373.                       << " of framework " << *framework
    374.                       << " with resources " << task_.resources()
    375.                       << " on slave " << *slave;
    376.  
    377.             RunTaskMessage message;
    378.             message.mutable_framework()->MergeFrom(framework->info);
    379.  
    380.             // TODO(anand): We set 'pid' to UPID() for http frameworks
    381.             // as 'pid' was made optional in 0.24.0. In 0.25.0, we
    382.             // no longer have to set pid here for http frameworks.
    383.             message.set_pid(framework->pid.getOrElse(UPID()));
    384.             message.mutable_task()->MergeFrom(task_);
    385.  
    386.             if (HookManager::hooksAvailable()) {
    387.               // Set labels retrieved from label-decorator hooks.
    388.               message.mutable_task()->mutable_labels()->CopyFrom(
    389.                   HookManager::masterLaunchTaskLabelDecorator(
    390.                       task_,
    391.                       framework->info,
    392.                       slave->info));
    393.             }
    394.  
    395.             send(slave->pid, message);
    396.           }
    397.         }
    398.         break;
    399.       }
    400.  
    401.       default:
    402.         LOG(ERROR) << "Unsupported offer operation " << operation.type();
    403.         break;
    404.     }
    405.   }
    406.  
    407.   if (!_offeredResources.empty()) {
    408.     // Tell the allocator about the unused (e.g., refused) resources.
    409.     allocator->recoverResources(
    410.         frameworkId,
    411.         slaveId,
    412.         _offeredResources,
    413.         accept.filters());
    414.   }
    415. }

     

    Mesos-Master将RunTaskMessage消息发送给Mesos-Slave

  • 相关阅读:
    php解析XML的两种方法
    Windows下xampp搭配php环境以及mysql的设置和php连接Mysql数据库
    管道
    Ubuntu文件系统
    Ubuntu如何使用Vscode写C++代码
    Ubuntu如何百度云盘下载
    Debug程序的使用
    寄存器
    汇编指令
    汇编第二章_寄存器
  • 原文地址:https://www.cnblogs.com/popsuper1982/p/5724166.html
Copyright © 2011-2022 走看看