  • Mesos Source Code Analysis (5): Mesos Master Startup, Part 4

     

    5. Create an instance of allocator.

     

    The code is as follows.

     

    The default allocator in the Mesos source, HierarchicalDRFAllocator, is located at $MESOS_HOME/src/master/allocator/mesos/hierarchical.hpp, and the Sorter that DRF uses to rank each framework is at $MESOS_HOME/src/master/allocator/sorter/drf/sorter.cpp; reading their source shows how they work.

     

    Basic principles of Hierarchical DRF

     

    The decision of how to allocate offers is made by the resource allocation module, the Allocator, which lives inside the Master. The allocator determines the order in which frameworks receive offers, while at the same time ensuring that resources are shared fairly under maximal utilization.

    Because Mesos schedules resources across data centers and must handle heterogeneous resource demands, allocation is considerably harder than ordinary scheduling. Mesos therefore adopts DRF (Dominant Resource Fairness).

    A framework's dominant share is the highest percentage it holds among all resource types. The DRF algorithm computes the dominant share of every registered framework to ensure that each framework receives a fair share of its dominant resource.

     

    An example

     

    Consider a system with 9 CPUs and 18 GB of RAM and two users: user A runs tasks with demand vector {1 CPU, 4 GB} and user B runs tasks with demand vector {3 CPU, 1 GB}; each user launches as many tasks as the system's resources allow.

    In this scenario, each of A's tasks consumes 1/9 of the total CPU and 2/9 of the total memory, so A's dominant resource is memory; each of B's tasks consumes 1/3 of the total CPU and 1/18 of the total memory, so B's dominant resource is CPU. DRF equalizes the users' dominant shares by running 3 of user A's tasks and 2 of user B's tasks. A's three tasks consume {3 CPU, 12 GB} in total and B's two tasks consume {6 CPU, 2 GB}; under this allocation each user's dominant share is equal: user A holds 2/3 of the RAM and user B holds 2/3 of the CPUs.

    This allocation can be computed as follows. Let x and y be the numbers of tasks allocated to users A and B. User A then consumes {x CPU, 4x GB} and user B consumes {3y CPU, y GB}, and the two users consume equal amounts of their dominant resources (Figure 3). User A's dominant share is 4x/18 and user B's is 3y/9, so the DRF allocation is obtained by solving the following optimization problem:

     

    max(x,y)     #(Maximize allocations)

        subject to

            x + 3y <= 9         #(CPU constraint)

            4x + y <= 18         #(Memory Constraint)

                2x/9 = y/3     #(Equalize dominant shares)

     

    Solving yields x = 3 and y = 2, so user A receives {3 CPU, 12 GB} and user B receives {6 CPU, 2 GB}.

     

    Core implementation of the Hierarchical DRF algorithm

     

    HierarchicalDRF is implemented in src/master/allocator/mesos/hierarchical.cpp.

     

     

    Not all resources are handed to every framework on each allocation cycle; instead, the resource allocation algorithm decides how much each framework receives.

     

    void HierarchicalAllocatorProcess::allocate(
        const hashset<SlaveID>& slaveIds_)
    {
      ++metrics.allocation_runs;

      // Compute the offerable resources, per framework:
      // (1) For reserved resources on the slave, allocate these to a
      // framework having the corresponding role.
      // (2) For unreserved resources on the slave, allocate these
      // to a framework of any role.
      hashmap<FrameworkID, hashmap<SlaveID, Resources>> offerable;

      // NOTE: This function can operate on a small subset of slaves, we have to
      // make sure that we don't assume cluster knowledge when summing resources
      // from that set.

      vector<SlaveID> slaveIds;
      slaveIds.reserve(slaveIds_.size());

      // Filter out non-whitelisted and deactivated slaves in order not to send
      // offers for them.
      foreach (const SlaveID& slaveId, slaveIds_) {
        if (isWhitelisted(slaveId) && slaves[slaveId].activated) {
          slaveIds.push_back(slaveId);
        }
      }

      // Randomize the order in which slaves' resources are allocated.
      //
      // TODO(vinod): Implement a smarter sorting algorithm.
      std::random_shuffle(slaveIds.begin(), slaveIds.end());

      // Returns the __quantity__ of resources allocated to a quota role. Since we
      // account for reservations and persistent volumes toward quota, we strip
      // reservation and persistent volume related information for comparability.
      // The result is used to determine whether a role's quota is satisfied, and
      // also to determine how many resources the role would need in order to meet
      // its quota.
      //
      // NOTE: Revocable resources are excluded in `quotaRoleSorter`.
      auto getQuotaRoleAllocatedResources = [this](const string& role) {
        CHECK(quotas.contains(role));

        // NOTE: `allocationScalarQuantities` omits dynamic reservation and
        // persistent volume info, but we additionally strip `role` here.
        Resources resources;

        foreach (Resource resource,
                 quotaRoleSorter->allocationScalarQuantities(role)) {
          CHECK(!resource.has_reservation());
          CHECK(!resource.has_disk());

          resource.set_role("*");
          resources += resource;
        }

        return resources;
      };

      // Quota comes first and fair share second. Here we process only those
      // roles, for which quota is set (quota'ed roles). Such roles form a
      // special allocation group with a dedicated sorter.
      foreach (const SlaveID& slaveId, slaveIds) {
        foreach (const string& role, quotaRoleSorter->sort()) {
          CHECK(quotas.contains(role));

          // If there are no active frameworks in this role, we do not
          // need to do any allocations for this role.
          if (!activeRoles.contains(role)) {
            continue;
          }

          // Get the total quantity of resources allocated to a quota role. The
          // value omits role, reservation, and persistence info.
          Resources roleConsumedResources = getQuotaRoleAllocatedResources(role);

          // If quota for the role is satisfied, we do not need to do any further
          // allocations for this role, at least at this stage.
          //
          // TODO(alexr): Skipping satisfied roles is pessimistic. Better
          // alternatives are:
          // * A custom sorter that is aware of quotas and sorts accordingly.
          // * Removing satisfied roles from the sorter.
          if (roleConsumedResources.contains(quotas[role].info.guarantee())) {
            continue;
          }

          // Fetch frameworks according to their fair share.
          foreach (const string& frameworkId_, frameworkSorters[role]->sort()) {
            FrameworkID frameworkId;
            frameworkId.set_value(frameworkId_);

            // If the framework has suppressed offers, ignore. The unallocated
            // part of the quota will not be allocated to other roles.
            if (frameworks[frameworkId].suppressed) {
              continue;
            }

            // Only offer resources from slaves that have GPUs to
            // frameworks that are capable of receiving GPUs.
            // See MESOS-5634.
            if (!frameworks[frameworkId].gpuAware &&
                slaves[slaveId].total.gpus().getOrElse(0) > 0) {
              continue;
            }

            // Calculate the currently available resources on the slave.
            Resources available = slaves[slaveId].total - slaves[slaveId].allocated;

            // The resources we offer are the unreserved resources as well as the
            // reserved resources for this particular role. This is necessary to
            // ensure that we don't offer resources that are reserved for another
            // role.
            //
            // NOTE: Currently, frameworks are allowed to have '*' role.
            // Calling reserved('*') returns an empty Resources object.
            //
            // Quota is satisfied from the available non-revocable resources on the
            // agent. It's important that we include reserved resources here since
            // reserved resources are accounted towards the quota guarantee. If we
            // were to rely on stage 2 to offer them out, they would not be checked
            // against the quota guarantee.
            Resources resources =
              (available.unreserved() + available.reserved(role)).nonRevocable();

            // It is safe to break here, because all frameworks under a role would
            // consider the same resources, so in case we don't have allocatable
            // resources, we don't have to check for other frameworks under the
            // same role. We only break out of the innermost loop, so the next step
            // will use the same `slaveId`, but a different role.
            //
            // NOTE: The resources may not be allocatable here, but they can be
            // accepted by one of the frameworks during the second allocation
            // stage.
            if (!allocatable(resources)) {
              break;
            }

            // If the framework filters these resources, ignore. The unallocated
            // part of the quota will not be allocated to other roles.
            if (isFiltered(frameworkId, slaveId, resources)) {
              continue;
            }

            VLOG(2) << "Allocating " << resources << " on agent " << slaveId
                    << " to framework " << frameworkId
                    << " as part of its role quota";

            // NOTE: We perform "coarse-grained" allocation for quota'ed
            // resources, which may lead to overcommitment of resources beyond
            // quota. This is fine since quota currently represents a guarantee.
            offerable[frameworkId][slaveId] += resources;
            slaves[slaveId].allocated += resources;

            // Resources allocated as part of the quota count towards the
            // role's and the framework's fair share.
            //
            // NOTE: Revocable resources have already been excluded.
            frameworkSorters[role]->add(slaveId, resources);
            frameworkSorters[role]->allocated(frameworkId_, slaveId, resources);
            roleSorter->allocated(role, slaveId, resources);
            quotaRoleSorter->allocated(role, slaveId, resources);
          }
        }
      }

      // Calculate the total quantity of scalar resources (including revocable
      // and reserved) that are available for allocation in the next round. We
      // need this in order to ensure we do not over-allocate resources during
      // the second stage.
      //
      // For performance reasons (MESOS-4833), this omits information about
      // dynamic reservations or persistent volumes in the resources.
      //
      // NOTE: We use total cluster resources, and not just those based on the
      // agents participating in the current allocation (i.e. provided as an
      // argument to the `allocate()` call) so that frameworks in roles without
      // quota are not unnecessarily deprived of resources.
      Resources remainingClusterResources = roleSorter->totalScalarQuantities();
      foreachkey (const string& role, activeRoles) {
        remainingClusterResources -= roleSorter->allocationScalarQuantities(role);
      }

      // Frameworks in a quota'ed role may temporarily reject resources by
      // filtering or suppressing offers. Hence quotas may not be fully allocated.
      Resources unallocatedQuotaResources;
      foreachpair (const string& name, const Quota& quota, quotas) {
        // Compute the amount of quota that the role does not have allocated.
        //
        // NOTE: Revocable resources are excluded in `quotaRoleSorter`.
        // NOTE: Only scalars are considered for quota.
        Resources allocated = getQuotaRoleAllocatedResources(name);
        const Resources required = quota.info.guarantee();
        unallocatedQuotaResources += (required - allocated);
      }

      // Determine how many resources we may allocate during the next stage.
      //
      // NOTE: Resources for quota allocations are already accounted in
      // `remainingClusterResources`.
      remainingClusterResources -= unallocatedQuotaResources;

      // To ensure we do not over-allocate resources during the second stage
      // with all frameworks, we use 2 stopping criteria:
      // * No available resources for the second stage left, i.e.
      // `remainingClusterResources` - `allocatedStage2` is empty.
      // * A potential offer will force the second stage to use more resources
      // than available, i.e. `remainingClusterResources` does not contain
      // (`allocatedStage2` + potential offer). In this case we skip this
      // agent and continue to the next one.
      //
      // NOTE: Like `remainingClusterResources`, `allocatedStage2` omits
      // information about dynamic reservations and persistent volumes for
      // performance reasons. This invariant is preserved because we only add
      // resources to it that have also had this metadata stripped from them
      // (typically by using `Resources::createStrippedScalarQuantity`).
      Resources allocatedStage2;

      // At this point resources for quotas are allocated or accounted for.
      // Proceed with allocating the remaining free pool.
      foreach (const SlaveID& slaveId, slaveIds) {
        // If there are no resources available for the second stage, stop.
        if (!allocatable(remainingClusterResources - allocatedStage2)) {
          break;
        }

        foreach (const string& role, roleSorter->sort()) {
          foreach (const string& frameworkId_,
                   frameworkSorters[role]->sort()) {
            FrameworkID frameworkId;
            frameworkId.set_value(frameworkId_);

            // If the framework has suppressed offers, ignore.
            if (frameworks[frameworkId].suppressed) {
              continue;
            }

            // Only offer resources from slaves that have GPUs to
            // frameworks that are capable of receiving GPUs.
            // See MESOS-5634.
            if (!frameworks[frameworkId].gpuAware &&
                slaves[slaveId].total.gpus().getOrElse(0) > 0) {
              continue;
            }

            // Calculate the currently available resources on the slave.
            Resources available = slaves[slaveId].total - slaves[slaveId].allocated;

            // The resources we offer are the unreserved resources as well as the
            // reserved resources for this particular role. This is necessary to
            // ensure that we don't offer resources that are reserved for another
            // role.
            //
            // NOTE: Currently, frameworks are allowed to have '*' role.
            // Calling reserved('*') returns an empty Resources object.
            //
            // NOTE: We do not offer roles with quota any more non-revocable
            // resources once their quota is satisfied. However, note that this is
            // not strictly true due to the coarse-grained nature (per agent) of the
            // allocation algorithm in stage 1.
            //
            // TODO(mpark): Offer unreserved resources as revocable beyond quota.
            Resources resources = available.reserved(role);
            if (!quotas.contains(role)) {
              resources += available.unreserved();
            }

            // It is safe to break here, because all frameworks under a role would
            // consider the same resources, so in case we don't have allocatable
            // resources, we don't have to check for other frameworks under the
            // same role. We only break out of the innermost loop, so the next step
            // will use the same slaveId, but a different role.
            //
            // The difference to the second `allocatable` check is that here we also
            // check for revocable resources, which can be disabled on a per frame-
            // work basis, which requires us to go through all frameworks in case we
            // have allocatable revocable resources.
            if (!allocatable(resources)) {
              break;
            }

            // Remove revocable resources if the framework has not opted for them.
            if (!frameworks[frameworkId].revocable) {
              resources = resources.nonRevocable();
            }

            // If the resources are not allocatable, ignore. We can not break
            // here, because another framework under the same role could accept
            // revocable resources and breaking would skip all other frameworks.
            if (!allocatable(resources)) {
              continue;
            }

            // If the framework filters these resources, ignore.
            if (isFiltered(frameworkId, slaveId, resources)) {
              continue;
            }

            // If the offer generated by `resources` would force the second
            // stage to use more than `remainingClusterResources`, move along.
            // We do not terminate early, as offers generated further in the
            // loop may be small enough to fit within `remainingClusterResources`.
            const Resources scalarQuantity =
              resources.createStrippedScalarQuantity();

            if (!remainingClusterResources.contains(
                    allocatedStage2 + scalarQuantity)) {
              continue;
            }

            VLOG(2) << "Allocating " << resources << " on agent " << slaveId
                    << " to framework " << frameworkId;

            // NOTE: We perform "coarse-grained" allocation, meaning that we always
            // allocate the entire remaining slave resources to a single framework.
            //
            // NOTE: We may have already allocated some resources on the current
            // agent as part of quota.
            offerable[frameworkId][slaveId] += resources;
            allocatedStage2 += scalarQuantity;
            slaves[slaveId].allocated += resources;

            frameworkSorters[role]->add(slaveId, resources);
            frameworkSorters[role]->allocated(frameworkId_, slaveId, resources);
            roleSorter->allocated(role, slaveId, resources);

            if (quotas.contains(role)) {
              // See comment at `quotaRoleSorter` declaration regarding
              // non-revocable.
              quotaRoleSorter->allocated(role, slaveId, resources.nonRevocable());
            }
          }
        }
      }

      if (offerable.empty()) {
        VLOG(1) << "No allocations performed";
      } else {
        // Now offer the resources to each framework.
        foreachkey (const FrameworkID& frameworkId, offerable) {
          offerCallback(frameworkId, offerable[frameworkId]);
        }
      }

      // NOTE: For now, we implement maintenance inverse offers within the
      // allocator. We leverage the existing timer/cycle of offers to also do any
      // "deallocation" (inverse offers) necessary to satisfy maintenance needs.
      deallocate(slaveIds_);
    }

     

    The algorithm above is quite complex. In short, it invokes three sorters to rank all frameworks, deciding which of them receive resources first and which later.

     

    In src/master/allocator/mesos/hierarchical.hpp, the definitions of and comments on the three important sorters help explain the sorter logic.

     

     

     

     

     

    Overall, allocation proceeds in two big stages:

    • First, satisfy the roles that have a quota
    • Then distribute the remaining resources to roles without a quota

    Within each stage, "hierarchical" means two levels of sorting:

    • The first level sorts the roles
    • The second level sorts the different frameworks within the same role

    At each level the ordering is by the computed share, which determines who is offered resources first.

     

    In src/master/allocator/sorter/drf/sorter.cpp:

     

     

    double DRFSorter::calculateShare(const string& name)
    {
      double share = 0.0;

      // TODO(benh): This implementation of "dominant resource fairness"
      // currently does not take into account resources that are not
      // scalars.

      foreach (const string& scalar, total_.scalarQuantities.names()) {
        // Filter out the resources excluded from fair sharing.
        if (fairnessExcludeResourceNames.isSome() &&
            fairnessExcludeResourceNames->count(scalar) > 0) {
          continue;
        }

        // We collect the scalar accumulated total value from the
        // `Resources` object.
        //
        // NOTE: Although in principle scalar resources may be spread
        // across multiple `Resource` objects (e.g., persistent volumes),
        // we currently strip persistence and reservation metadata from
        // the resources in `scalarQuantities`.
        Option<Value::Scalar> __total =
          total_.scalarQuantities.get<Value::Scalar>(scalar);

        CHECK_SOME(__total);
        const double _total = __total.get().value();

        if (_total > 0.0) {
          double allocation = 0.0;

          // We collect the scalar accumulated allocation value from the
          // `Resources` object.
          //
          // NOTE: Although in principle scalar resources may be spread
          // across multiple `Resource` objects (e.g., persistent volumes),
          // we currently strip persistence and reservation metadata from
          // the resources in `scalarQuantities`.
          Option<Value::Scalar> _allocation =
            allocations[name].scalarQuantities.get<Value::Scalar>(scalar);

          if (_allocation.isSome()) {
            allocation = _allocation.get().value();
          }

          share = std::max(share, allocation / _total);
        }
      }

      return share / weights[name];
    }

     

     

    Quota, Reservation, Role, Weight

     

    • Each framework can have a role, used both for permissions and for resource allocation.
    • When responding to offered resources, a role can reply with Offer::Operation::RESERVE to reserve resources on a particular slave. A reservation is very concrete: a specific amount of resources on a specific machine belongs to a specific role.
    • A quota is the minimum guarantee for a role; it is not tied to a particular node but only guarantees that amount across the whole cluster.
    • Reserved resources also count toward the quota.
    • Different roles can have weights.

     

    Finally, the resources are handed to each framework.

    At the end of the allocate function, offerCallback is invoked in turn to hand the resources to each framework.

     

     

    So when is the offerCallback function registered?

     

     

     

    In the Allocator's initialize function, offerCallback is registered and is then executed periodically.

     

    At the end of the Allocator's initialization, it is arranged to run once every allocationInterval.

    offerCallback is the function registered here; keep that in mind.

     

    6. flags.registry == "in_memory" or flags.registry == "replicated_log": the registry information is stored in memory, or in ZooKeeper / a local directory.

     

    7. Initialize the objects that run leader election and detect who the leader is.

     

    Try<MasterContender*> contender_ = MasterContender::create(zk, flags.master_contender);

    Try<MasterDetector*> detector_ = MasterDetector::create(zk, flags.master_detector);

     

    8. Create the Master object and start the Master thread.

     

  • Original article: https://www.cnblogs.com/popsuper1982/p/5701505.html