Mesos Source Code Analysis (5): Mesos Master Startup, Part 4
2016-07-24 23:05
5. Create an instance of allocator.
The default allocator in the Mesos source tree, HierarchicalDRFAllocator, lives in $MESOS_HOME/src/master/allocator/mesos/hierarchical.hpp, and the Sorter that DRF uses to order frameworks lives in $MESOS_HOME/src/master/allocator/sorter/drf/sorter.cpp. Reading these two files is the best way to understand how allocation works.
How HierarchicalDRF works
The decision of which offers to make is implemented by the resource allocation module (the allocator), which lives inside the Master. The allocator determines the order in which frameworks receive offers while ensuring that resources are shared fairly and utilization stays high. Because Mesos schedules resources across a datacenter and must handle heterogeneous resource demands, allocation is harder than ordinary single-resource scheduling, so Mesos adopts DRF (Dominant Resource Fairness).
A framework's dominant share is the highest fraction it holds of any single resource type. DRF computes the dominant share of every registered framework and equalizes these shares, so that each framework receives a fair share of its dominant resource.
An example
Consider a system with 9 CPUs and 18 GB of RAM and two users: each of user A's tasks requires {1 CPU, 4 GB}, and each of user B's tasks requires {3 CPUs, 1 GB}. Each user runs as many tasks as the system can hold.
Each of A's tasks consumes 1/9 of the total CPU and 2/9 of the total memory, so A's dominant resource is memory. Each of B's tasks consumes 1/3 of the total CPU and 1/18 of the total memory, so B's dominant resource is CPU. DRF equalizes the users' dominant shares by running 3 of A's tasks and 2 of B's: A's three tasks consume {3 CPUs, 12 GB} in total and B's two tasks consume {6 CPUs, 2 GB}. Under this allocation each user's dominant share is equal: A holds 2/3 of the RAM and B holds 2/3 of the CPUs.
This allocation can be computed directly. Let x and y be the number of tasks allocated to users A and B, so A consumes {x CPUs, 4x GB} and B consumes {3y CPUs, y GB}. A's dominant share is then 4x/18 = 2x/9 and B's is 3y/9 = y/3. DRF equalizes the two, so the allocation is the solution of the optimization problem:
max(x, y)          (Maximize allocations)
subject to
  x + 3y <= 9      (CPU constraint)
  4x + y <= 18     (Memory constraint)
  2x/9 = y/3       (Equalize dominant shares)
Solving gives x = 3 and y = 2, so user A receives {3 CPUs, 12 GB} and user B receives {6 CPUs, 2 GB}.
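The same result can be reached by "progressive filling": repeatedly launch one task for whichever user currently has the lowest dominant share, until no user's next task fits. The following is a toy simulation written for this article (not Mesos code) of exactly the 9 CPU / 18 GB example above:

```cpp
#include <utility>
#include <vector>

struct User {
  double cpuPerTask, memPerTask;
  int tasks = 0;
};

// Dominant share: the user's largest fraction of any single resource.
double dominantShare(const User& u, double totalCpu, double totalMem) {
  double cpuShare = u.cpuPerTask * u.tasks / totalCpu;
  double memShare = u.memPerTask * u.tasks / totalMem;
  return cpuShare > memShare ? cpuShare : memShare;
}

// Progressive filling: always give the next task to the user with the
// lowest dominant share, as long as that task still fits.
std::pair<int, int> drfExample() {
  const double totalCpu = 9, totalMem = 18;
  std::vector<User> users = {{1, 4}, {3, 1}};  // user A, user B
  double usedCpu = 0, usedMem = 0;

  while (true) {
    User* next = nullptr;
    for (User& u : users) {
      if (usedCpu + u.cpuPerTask > totalCpu ||
          usedMem + u.memPerTask > totalMem) {
        continue;  // this user's next task does not fit
      }
      if (next == nullptr ||
          dominantShare(u, totalCpu, totalMem) <
              dominantShare(*next, totalCpu, totalMem)) {
        next = &u;
      }
    }
    if (next == nullptr) break;  // nothing fits any more

    usedCpu += next->cpuPerTask;
    usedMem += next->memPerTask;
    ++next->tasks;
  }

  return {users[0].tasks, users[1].tasks};  // {A's tasks, B's tasks}
}
```

Running the simulation yields 3 tasks for A and 2 for B, matching the analytical solution.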
Implementation of the HierarchicalDRF core algorithm
The implementation of HierarchicalDRF is in src/master/allocator/mesos/hierarchical.cpp. Rather than offering every resource to every framework on each cycle, the allocation algorithm decides how much each framework receives:
```cpp
void HierarchicalAllocatorProcess::allocate(
    const hashset<SlaveID>& slaveIds_)
{
  ++metrics.allocation_runs;

  // Compute the offerable resources, per framework:
  //   (1) For reserved resources on the slave, allocate these to a
  //       framework having the corresponding role.
  //   (2) For unreserved resources on the slave, allocate these
  //       to a framework of any role.
  hashmap<FrameworkID, hashmap<SlaveID, Resources>> offerable;

  // NOTE: This function can operate on a small subset of slaves, we have to
  // make sure that we don't assume cluster knowledge when summing resources
  // from that set.

  vector<SlaveID> slaveIds;
  slaveIds.reserve(slaveIds_.size());

  // Filter out non-whitelisted and deactivated slaves in order not to send
  // offers for them.
  foreach (const SlaveID& slaveId, slaveIds_) {
    if (isWhitelisted(slaveId) && slaves[slaveId].activated) {
      slaveIds.push_back(slaveId);
    }
  }

  // Randomize the order in which slaves' resources are allocated.
  //
  // TODO(vinod): Implement a smarter sorting algorithm.
  std::random_shuffle(slaveIds.begin(), slaveIds.end());

  // Returns the __quantity__ of resources allocated to a quota role. Since we
  // account for reservations and persistent volumes toward quota, we strip
  // reservation and persistent volume related information for comparability.
  // The result is used to determine whether a role's quota is satisfied, and
  // also to determine how many resources the role would need in order to meet
  // its quota.
  //
  // NOTE: Revocable resources are excluded in `quotaRoleSorter`.
  auto getQuotaRoleAllocatedResources = [this](const string& role) {
    CHECK(quotas.contains(role));

    // NOTE: `allocationScalarQuantities` omits dynamic reservation and
    // persistent volume info, but we additionally strip `role` here.
    Resources resources;

    foreach (Resource resource,
             quotaRoleSorter->allocationScalarQuantities(role)) {
      CHECK(!resource.has_reservation());
      CHECK(!resource.has_disk());

      resource.set_role("*");
      resources += resource;
    }

    return resources;
  };

  // Quota comes first and fair share second. Here we process only those
  // roles, for which quota is set (quota'ed roles). Such roles form a
  // special allocation group with a dedicated sorter.
  foreach (const SlaveID& slaveId, slaveIds) {
    foreach (const string& role, quotaRoleSorter->sort()) {
      CHECK(quotas.contains(role));

      // If there are no active frameworks in this role, we do not
      // need to do any allocations for this role.
      if (!activeRoles.contains(role)) {
        continue;
      }

      // Get the total quantity of resources allocated to a quota role. The
      // value omits role, reservation, and persistence info.
      Resources roleConsumedResources = getQuotaRoleAllocatedResources(role);

      // If quota for the role is satisfied, we do not need to do any further
      // allocations for this role, at least at this stage.
      //
      // TODO(alexr): Skipping satisfied roles is pessimistic. Better
      // alternatives are:
      //   * A custom sorter that is aware of quotas and sorts accordingly.
      //   * Removing satisfied roles from the sorter.
      if (roleConsumedResources.contains(quotas[role].info.guarantee())) {
        continue;
      }

      // Fetch frameworks according to their fair share.
      foreach (const string& frameworkId_, frameworkSorters[role]->sort()) {
        FrameworkID frameworkId;
        frameworkId.set_value(frameworkId_);

        // If the framework has suppressed offers, ignore. The unallocated
        // part of the quota will not be allocated to other roles.
        if (frameworks[frameworkId].suppressed) {
          continue;
        }

        // Only offer resources from slaves that have GPUs to
        // frameworks that are capable of receiving GPUs.
        // See MESOS-5634.
        if (!frameworks[frameworkId].gpuAware &&
            slaves[slaveId].total.gpus().getOrElse(0) > 0) {
          continue;
        }

        // Calculate the currently available resources on the slave.
        Resources available =
          slaves[slaveId].total - slaves[slaveId].allocated;

        // The resources we offer are the unreserved resources as well as the
        // reserved resources for this particular role. This is necessary to
        // ensure that we don't offer resources that are reserved for another
        // role.
        //
        // NOTE: Currently, frameworks are allowed to have '*' role.
        // Calling reserved('*') returns an empty Resources object.
        //
        // Quota is satisfied from the available non-revocable resources on the
        // agent. It's important that we include reserved resources here since
        // reserved resources are accounted towards the quota guarantee. If we
        // were to rely on stage 2 to offer them out, they would not be checked
        // against the quota guarantee.
        Resources resources =
          (available.unreserved() + available.reserved(role)).nonRevocable();

        // It is safe to break here, because all frameworks under a role would
        // consider the same resources, so in case we don't have allocatable
        // resources, we don't have to check for other frameworks under the
        // same role. We only break out of the innermost loop, so the next step
        // will use the same `slaveId`, but a different role.
        //
        // NOTE: The resources may not be allocatable here, but they can be
        // accepted by one of the frameworks during the second allocation
        // stage.
        if (!allocatable(resources)) {
          break;
        }

        // If the framework filters these resources, ignore. The unallocated
        // part of the quota will not be allocated to other roles.
        if (isFiltered(frameworkId, slaveId, resources)) {
          continue;
        }

        VLOG(2) << "Allocating " << resources << " on agent " << slaveId
                << " to framework " << frameworkId
                << " as part of its role quota";

        // NOTE: We perform "coarse-grained" allocation for quota'ed
        // resources, which may lead to overcommitment of resources beyond
        // quota. This is fine since quota currently represents a guarantee.
        offerable[frameworkId][slaveId] += resources;
        slaves[slaveId].allocated += resources;

        // Resources allocated as part of the quota count towards the
        // role's and the framework's fair share.
        //
        // NOTE: Revocable resources have already been excluded.
        frameworkSorters[role]->add(slaveId, resources);
        frameworkSorters[role]->allocated(frameworkId_, slaveId, resources);
        roleSorter->allocated(role, slaveId, resources);
        quotaRoleSorter->allocated(role, slaveId, resources);
      }
    }
  }

  // Calculate the total quantity of scalar resources (including revocable
  // and reserved) that are available for allocation in the next round. We
  // need this in order to ensure we do not over-allocate resources during
  // the second stage.
  //
  // For performance reasons (MESOS-4833), this omits information about
  // dynamic reservations or persistent volumes in the resources.
  //
  // NOTE: We use total cluster resources, and not just those based on the
  // agents participating in the current allocation (i.e. provided as an
  // argument to the `allocate()` call) so that frameworks in roles without
  // quota are not unnecessarily deprived of resources.
  Resources remainingClusterResources = roleSorter->totalScalarQuantities();
  foreachkey (const string& role, activeRoles) {
    remainingClusterResources -=
      roleSorter->allocationScalarQuantities(role);
  }

  // Frameworks in a quota'ed role may temporarily reject resources by
  // filtering or suppressing offers. Hence quotas may not be fully allocated.
  Resources unallocatedQuotaResources;
  foreachpair (const string& name, const Quota& quota, quotas) {
    // Compute the amount of quota that the role does not have allocated.
    //
    // NOTE: Revocable resources are excluded in `quotaRoleSorter`.
    // NOTE: Only scalars are considered for quota.
    Resources allocated = getQuotaRoleAllocatedResources(name);
    const Resources required = quota.info.guarantee();
    unallocatedQuotaResources += (required - allocated);
  }

  // Determine how many resources we may allocate during the next stage.
  //
  // NOTE: Resources for quota allocations are already accounted in
  // `remainingClusterResources`.
  remainingClusterResources -= unallocatedQuotaResources;

  // To ensure we do not over-allocate resources during the second stage
  // with all frameworks, we use 2 stopping criteria:
  //   * No available resources for the second stage left, i.e.
  //     `remainingClusterResources` - `allocatedStage2` is empty.
  //   * A potential offer will force the second stage to use more resources
  //     than available, i.e. `remainingClusterResources` does not contain
  //     (`allocatedStage2` + potential offer). In this case we skip this
  //     agent and continue to the next one.
  //
  // NOTE: Like `remainingClusterResources`, `allocatedStage2` omits
  // information about dynamic reservations and persistent volumes for
  // performance reasons. This invariant is preserved because we only add
  // resources to it that have also had this metadata stripped from them
  // (typically by using `Resources::createStrippedScalarQuantity`).
  Resources allocatedStage2;

  // At this point resources for quotas are allocated or accounted for.
  // Proceed with allocating the remaining free pool.
  foreach (const SlaveID& slaveId, slaveIds) {
    // If there are no resources available for the second stage, stop.
    if (!allocatable(remainingClusterResources - allocatedStage2)) {
      break;
    }

    foreach (const string& role, roleSorter->sort()) {
      foreach (const string& frameworkId_, frameworkSorters[role]->sort()) {
        FrameworkID frameworkId;
        frameworkId.set_value(frameworkId_);

        // If the framework has suppressed offers, ignore.
        if (frameworks[frameworkId].suppressed) {
          continue;
        }

        // Only offer resources from slaves that have GPUs to
        // frameworks that are capable of receiving GPUs.
        // See MESOS-5634.
        if (!frameworks[frameworkId].gpuAware &&
            slaves[slaveId].total.gpus().getOrElse(0) > 0) {
          continue;
        }

        // Calculate the currently available resources on the slave.
        Resources available =
          slaves[slaveId].total - slaves[slaveId].allocated;

        // The resources we offer are the unreserved resources as well as the
        // reserved resources for this particular role. This is necessary to
        // ensure that we don't offer resources that are reserved for another
        // role.
        //
        // NOTE: Currently, frameworks are allowed to have '*' role.
        // Calling reserved('*') returns an empty Resources object.
        //
        // NOTE: We do not offer roles with quota any more non-revocable
        // resources once their quota is satisfied. However, note that this is
        // not strictly true due to the coarse-grained nature (per agent) of the
        // allocation algorithm in stage 1.
        //
        // TODO(mpark): Offer unreserved resources as revocable beyond quota.
        Resources resources = available.reserved(role);
        if (!quotas.contains(role)) {
          resources += available.unreserved();
        }

        // It is safe to break here, because all frameworks under a role would
        // consider the same resources, so in case we don't have allocatable
        // resources, we don't have to check for other frameworks under the
        // same role. We only break out of the innermost loop, so the next step
        // will use the same slaveId, but a different role.
        //
        // The difference to the second `allocatable` check is that here we also
        // check for revocable resources, which can be disabled on a per
        // framework basis, which requires us to go through all frameworks in
        // case we have allocatable revocable resources.
        if (!allocatable(resources)) {
          break;
        }

        // Remove revocable resources if the framework has not opted for them.
        if (!frameworks[frameworkId].revocable) {
          resources = resources.nonRevocable();
        }

        // If the resources are not allocatable, ignore. We can not break
        // here, because another framework under the same role could accept
        // revocable resources and breaking would skip all other frameworks.
        if (!allocatable(resources)) {
          continue;
        }

        // If the framework filters these resources, ignore.
        if (isFiltered(frameworkId, slaveId, resources)) {
          continue;
        }

        // If the offer generated by `resources` would force the second
        // stage to use more than `remainingClusterResources`, move along.
        // We do not terminate early, as offers generated further in the
        // loop may be small enough to fit within `remainingClusterResources`.
        const Resources scalarQuantity =
          resources.createStrippedScalarQuantity();
        if (!remainingClusterResources.contains(
                allocatedStage2 + scalarQuantity)) {
          continue;
        }

        VLOG(2) << "Allocating " << resources << " on agent " << slaveId
                << " to framework " << frameworkId;

        // NOTE: We perform "coarse-grained" allocation, meaning that we always
        // allocate the entire remaining slave resources to a single framework.
        //
        // NOTE: We may have already allocated some resources on the current
        // agent as part of quota.
        offerable[frameworkId][slaveId] += resources;
        allocatedStage2 += scalarQuantity;
        slaves[slaveId].allocated += resources;

        frameworkSorters[role]->add(slaveId, resources);
        frameworkSorters[role]->allocated(frameworkId_, slaveId, resources);
        roleSorter->allocated(role, slaveId, resources);

        if (quotas.contains(role)) {
          // See comment at `quotaRoleSorter` declaration regarding
          // non-revocable.
          quotaRoleSorter->allocated(role, slaveId, resources.nonRevocable());
        }
      }
    }
  }

  if (offerable.empty()) {
    VLOG(1) << "No allocations performed";
  } else {
    // Now offer the resources to each framework.
    foreachkey (const FrameworkID& frameworkId, offerable) {
      offerCallback(frameworkId, offerable[frameworkId]);
    }
  }

  // NOTE: For now, we implement maintenance inverse offers within the
  // allocator. We leverage the existing timer/cycle of offers to also do any
  // "deallocation" (inverse offers) necessary to satisfy maintenance needs.
  deallocate(slaveIds_);
}
```
In src/master/allocator/mesos/hierarchical.hpp, the declarations and comments for the three sorters (roleSorter, quotaRoleSorter, and the per-role frameworkSorters) are a good reference for understanding the sorter logic.
Overall, allocation happens in two big stages:
- first satisfy the roles that have a quota;
- then distribute the remaining, non-quota resources.
Within each stage, "hierarchical" means a two-level sort:
- the first level sorts roles;
- the second level sorts the frameworks within each role.
At each level, entries are ordered by their computed share, which determines who is offered resources first.
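The two-level ordering can be sketched with a toy example. This is illustration-only code written for this article: the role and framework names and the share values are made up, and a plain `std::map` plus `std::sort` stands in for the real Sorter classes.

```cpp
#include <algorithm>
#include <map>
#include <string>
#include <vector>

// Order names by their share, lowest first (lower share is offered first).
std::vector<std::string> sortByShare(
    const std::map<std::string, double>& shares) {
  std::vector<std::string> names;
  for (const auto& kv : shares) names.push_back(kv.first);
  std::sort(names.begin(), names.end(),
            [&](const std::string& a, const std::string& b) {
              return shares.at(a) < shares.at(b);
            });
  return names;
}

// Flattened visiting order: roles sorted first, then the frameworks
// within each role. This mirrors the nested loops in allocate().
std::vector<std::string> allocationOrder(
    const std::map<std::string, double>& roleShares,
    const std::map<std::string, std::map<std::string, double>>& fwShares) {
  std::vector<std::string> order;
  for (const std::string& role : sortByShare(roleShares)) {
    for (const std::string& fw : sortByShare(fwShares.at(role))) {
      order.push_back(fw);  // this is where resources would be offered
    }
  }
  return order;
}
```

With a "web" role at share 0.2 and an "analytics" role at share 0.5, every framework under "web" is visited before any framework under "analytics", and within each role the framework with the lowest share goes first.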
The share computation is in src/master/allocator/sorter/drf/sorter.cpp:
```cpp
double DRFSorter::calculateShare(const string& name)
{
  double share = 0.0;

  // TODO(benh): This implementation of "dominant resource fairness"
  // currently does not take into account resources that are not
  // scalars.
  foreach (const string& scalar, total_.scalarQuantities.names()) {
    // Filter out the resources excluded from fair sharing.
    if (fairnessExcludeResourceNames.isSome() &&
        fairnessExcludeResourceNames->count(scalar) > 0) {
      continue;
    }

    // We collect the scalar accumulated total value from the
    // `Resources` object.
    //
    // NOTE: Although in principle scalar resources may be spread
    // across multiple `Resource` objects (e.g., persistent volumes),
    // we currently strip persistence and reservation metadata from
    // the resources in `scalarQuantities`.
    Option<Value::Scalar> __total =
      total_.scalarQuantities.get<Value::Scalar>(scalar);
    CHECK_SOME(__total);
    const double _total = __total.get().value();

    if (_total > 0.0) {
      double allocation = 0.0;

      // We collect the scalar accumulated allocation value from the
      // `Resources` object.
      //
      // NOTE: Although in principle scalar resources may be spread
      // across multiple `Resource` objects (e.g., persistent volumes),
      // we currently strip persistence and reservation metadata from
      // the resources in `scalarQuantities`.
      Option<Value::Scalar> _allocation =
        allocations[name].scalarQuantities.get<Value::Scalar>(scalar);

      if (_allocation.isSome()) {
        allocation = _allocation.get().value();
      }

      share = std::max(share, allocation / _total);
    }
  }

  return share / weights[name];
}
```
Quota, Reservation, Role, Weight
Each framework can have a role, which is used both for access control and for resource allocation. When accepting offered resources, a framework can reply with Offer::Operation::RESERVE to reserve resources on a particular slave for its role. A reservation is concrete: it ties a specific amount of resources on a specific machine to a specific role.
A quota is a minimum guarantee for a role. Unlike a reservation it is not tied to any particular node; it only guarantees that the role can obtain that amount somewhere in the cluster.
Reserved resources also count toward a role's quota.
Different roles can additionally be given different weights.
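As the last line of calculateShare() above shows, a role's dominant share is divided by its weight before sorting, so a heavier role sorts as if it held proportionally fewer resources and is offered resources sooner. A tiny hypothetical sketch (the role names and numbers are made up):

```cpp
#include <string>

// Weighted DRF: the effective share used for sorting is the dominant
// share divided by the role's weight.
double effectiveShare(double dominantShare, double weight) {
  return dominantShare / weight;
}

// The role with the lower effective share is allocated to first.
std::string nextRole() {
  // Both roles currently hold 40% of their dominant resource...
  double prodShare = 0.4, devShare = 0.4;
  // ...but "prod" has weight 2 and "dev" has weight 1, so "prod"
  // sorts as 0.2 versus "dev" at 0.4 and is offered resources first.
  return effectiveShare(prodShare, 2.0) < effectiveShare(devShare, 1.0)
      ? "prod" : "dev";
}
```

In other words, a role with weight 2 can hold roughly twice the resources of a weight-1 role before the two sort equally.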
Finally, the resources are handed to each framework
At the end of the allocate() function, offerCallback is invoked once per framework to hand it its allocated resources.
So when was offerCallback registered?
In the allocator's initialize() function, the offer callback is registered and scheduled to run periodically: once every allocationInterval. So offerCallback is a function registered from outside the allocator; keep that in mind.
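The register-once, fire-periodically pattern can be sketched with a plain std::thread. Note this is only an illustration written for this article: Mesos itself schedules the recurring allocation through libprocess delays, not a raw thread, and the class name here is made up.

```cpp
#include <atomic>
#include <chrono>
#include <functional>
#include <thread>

// Runs a registered callback every `interval` until destroyed,
// loosely analogous to the allocator re-running allocate() every
// allocationInterval.
class PeriodicRunner {
public:
  PeriodicRunner(std::function<void()> cb, std::chrono::milliseconds interval)
    : cb_(std::move(cb)),
      interval_(interval),
      running_(true),
      thread_([this] {
        while (running_) {
          std::this_thread::sleep_for(interval_);
          if (running_) cb_();  // fire the registered callback
        }
      }) {}

  ~PeriodicRunner() {
    running_ = false;   // ask the loop to stop...
    thread_.join();     // ...and wait for it to finish
  }

private:
  std::function<void()> cb_;
  std::chrono::milliseconds interval_;
  std::atomic<bool> running_;
  std::thread thread_;
};
```

The callback is passed in exactly once at construction, after which the runner invokes it on every tick without the caller's involvement.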
6. flags.registry == "in_memory" or flags.registry == "replicated_log": the registry keeps master state either in memory or in a replicated log, backed by ZooKeeper or a local directory.
7. Initialize the objects that handle leader election and leader detection:
```cpp
Try<MasterContender*> contender_ =
  MasterContender::create(zk, flags.master_contender);
Try<MasterDetector*> detector_ =
  MasterDetector::create(zk, flags.master_detector);
```
8. Create the Master object and start the Master.