Mesos Source Code Analysis (5): Mesos Master Startup, Part 4

5. Create an instance of allocator.





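Mesos schedules resources across the data center and has to serve heterogeneous resource demands, so allocating resources is harder than in an ordinary scheduler. Mesos therefore uses DRF (Dominant Resource Fairness).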



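Consider a system with 9 CPUs and 18 GB of RAM shared by two users. Each task of user A has the demand vector {1 CPU, 4 GB}, and each task of user B has the demand vector {3 CPU, 1 GB}. Each user may run as many tasks as the system's resources allow.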

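In this scenario, each task of A consumes 1/9 of the total CPU and 2/9 of the total memory, so A's dominant resource is memory. Each task of B consumes 1/3 of the total CPU and 1/18 of the total memory, so B's dominant resource is CPU. DRF equalizes the users' dominant shares by running three tasks for A and two tasks for B. The three tasks of A consume {3 CPU, 12 GB} in total, and the two tasks of B consume {6 CPU, 2 GB}. In this allocation the dominant shares are equal: A gets 2/3 of the RAM and B gets 2/3 of the CPU.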

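This allocation can be computed as follows. Let x and y be the number of tasks allocated to user A and user B. Then A consumes {x CPU, 4x GB} and B consumes {3y CPU, y GB}. A's dominant share is 4x/18 = 2x/9 and B's dominant share is 3y/9 = y/3, and DRF requires the two to be equal. The DRF allocation is therefore the solution of the following optimization problem: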

max(x,y) #(Maximize allocations)

subject to

x + 3y <= 9 #(CPU constraint)

4x + y <= 18 #(Memory Constraint)

2x/9 = y/3 #(Equalize dominant shares)

Solving gives x = 3 and y = 2, so user A gets {3 CPU, 12 GB} and user B gets {6 CPU, 2 GB}.
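The same result can be reproduced by "progressive filling": repeatedly hand one more task to the user with the lowest dominant share until that user's next task no longer fits. The following is only a minimal sketch of that idea for the two-resource example above, not Mesos's allocator code. The names `Vec`, `dominantShare` and `drfAllocate` are made up for illustration, and the sketch assumes every demand vector is positive in at least one resource.

```cpp
#include <algorithm>
#include <array>
#include <cassert>
#include <cstddef>
#include <vector>

// Resource vector: index 0 is CPUs, index 1 is memory in GB.
using Vec = std::array<double, 2>;

// Dominant share: the largest fraction of any single resource a user holds.
double dominantShare(const Vec& allocated, const Vec& total) {
  return std::max(allocated[0] / total[0], allocated[1] / total[1]);
}

// Progressive filling. Returns the number of tasks granted to each user.
std::vector<int> drfAllocate(const Vec& total, const std::vector<Vec>& demands) {
  assert(!demands.empty());
  for (const Vec& d : demands) {
    // Assumption: a task always needs something, otherwise the loop
    // below would never terminate.
    assert(d[0] > 0.0 || d[1] > 0.0);
  }

  std::vector<int> tasks(demands.size(), 0);
  std::vector<Vec> allocated(demands.size(), Vec{0.0, 0.0});
  Vec used{0.0, 0.0};

  while (true) {
    // Pick the user with the lowest dominant share (ties: lowest index).
    std::size_t next = 0;
    for (std::size_t i = 1; i < demands.size(); ++i) {
      if (dominantShare(allocated[i], total) <
          dominantShare(allocated[next], total)) {
        next = i;
      }
    }

    // Stop as soon as the chosen user's next task does not fit.
    const Vec& d = demands[next];
    if (used[0] + d[0] > total[0] || used[1] + d[1] > total[1]) {
      break;
    }

    for (std::size_t r = 0; r < 2; ++r) {
      used[r] += d[r];
      allocated[next][r] += d[r];
    }
    ++tasks[next];
  }

  return tasks;
}
```

Calling `drfAllocate` with a total of {9, 18} and the demands {1, 4} and {3, 1} grants three tasks to user A and two to user B, matching the solution of the optimization problem.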




void HierarchicalAllocatorProcess::allocate(
    const hashset<SlaveID>& slaveIds_)
{



// Compute the offerable resources, per framework:

// (1) For reserved resources on the slave, allocate these to a

// framework having the corresponding role.

// (2) For unreserved resources on the slave, allocate these

// to a framework of any role.

hashmap<FrameworkID, hashmap<SlaveID, Resources>> offerable;

// NOTE: This function can operate on a small subset of slaves, we have to

// make sure that we don't assume cluster knowledge when summing resources

// from that set.

vector<SlaveID> slaveIds;


// Filter out non-whitelisted and deactivated slaves in order not to send

// offers for them.

foreach (const SlaveID& slaveId, slaveIds_) {
  if (isWhitelisted(slaveId) && slaves[slaveId].activated) {
    slaveIds.push_back(slaveId);
  }
}


// Randomize the order in which slaves' resources are allocated.


// TODO(vinod): Implement a smarter sorting algorithm.

std::random_shuffle(slaveIds.begin(), slaveIds.end());

// Returns the __quantity__ of resources allocated to a quota role. Since we

// account for reservations and persistent volumes toward quota, we strip

// reservation and persistent volume related information for comparability.

// The result is used to determine whether a role's quota is satisfied, and

// also to determine how many resources the role would need in order to meet

// its quota.


// NOTE: Revocable resources are excluded in `quotaRoleSorter`.

auto getQuotaRoleAllocatedResources = [this](const string& role) {
  // NOTE: `allocationScalarQuantities` omits dynamic reservation and
  // persistent volume info, but we additionally strip `role` here.
  Resources resources;

  foreach (Resource resource,
           quotaRoleSorter->allocationScalarQuantities(role)) {
    resource.clear_role();
    resources += resource;
  }

  return resources;
};


// Quota comes first and fair share second. Here we process only those

// roles, for which quota is set (quota'ed roles). Such roles form a

// special allocation group with a dedicated sorter.

foreach (const SlaveID& slaveId, slaveIds) {

foreach (const string& role, quotaRoleSorter->sort()) {


// If there are no active frameworks in this role, we do not

// need to do any allocations for this role.

if (!activeRoles.contains(role)) {
  continue;
}


// Get the total quantity of resources allocated to a quota role. The

// value omits role, reservation, and persistence info.

Resources roleConsumedResources = getQuotaRoleAllocatedResources(role);

// If quota for the role is satisfied, we do not need to do any further

// allocations for this role, at least at this stage.


// TODO(alexr): Skipping satisfied roles is pessimistic. Better

// alternatives are:

// * A custom sorter that is aware of quotas and sorts accordingly.

// * Removing satisfied roles from the sorter.

if (roleConsumedResources.contains(quotas[role].info.guarantee())) {
  continue;
}


// Fetch frameworks according to their fair share.

foreach (const string& frameworkId_, frameworkSorters[role]->sort()) {

FrameworkID frameworkId;
frameworkId.set_value(frameworkId_);

// If the framework has suppressed offers, ignore. The unallocated

// part of the quota will not be allocated to other roles.

if (frameworks[frameworkId].suppressed) {
  continue;
}


// Only offer resources from slaves that have GPUs to

// frameworks that are capable of receiving GPUs.

// See MESOS-5634.

if (!frameworks[frameworkId].gpuAware &&
    slaves[slaveId].total.gpus().getOrElse(0) > 0) {
  continue;
}


// Calculate the currently available resources on the slave.

Resources available = slaves[slaveId].total - slaves[slaveId].allocated;

// The resources we offer are the unreserved resources as well as the

// reserved resources for this particular role. This is necessary to

// ensure that we don't offer resources that are reserved for another

// role.


// NOTE: Currently, frameworks are allowed to have '*' role.

// Calling reserved('*') returns an empty Resources object.


// Quota is satisfied from the available non-revocable resources on the

// agent. It's important that we include reserved resources here since

// reserved resources are accounted towards the quota guarantee. If we

// were to rely on stage 2 to offer them out, they would not be checked

// against the quota guarantee.

Resources resources =

(available.unreserved() + available.reserved(role)).nonRevocable();

// It is safe to break here, because all frameworks under a role would

// consider the same resources, so in case we don't have allocatable

// resources, we don't have to check for other frameworks under the

// same role. We only break out of the innermost loop, so the next step

// will use the same `slaveId`, but a different role.


// NOTE: The resources may not be allocatable here, but they can be

// accepted by one of the frameworks during the second allocation

// stage.

if (!allocatable(resources)) {
  break;
}


// If the framework filters these resources, ignore. The unallocated

// part of the quota will not be allocated to other roles.

if (isFiltered(frameworkId, slaveId, resources)) {
  continue;
}


VLOG(2) << "Allocating " << resources << " on agent " << slaveId
        << " to framework " << frameworkId
        << " as part of its role quota";

// NOTE: We perform "coarse-grained" allocation for quota'ed

// resources, which may lead to overcommitment of resources beyond

// quota. This is fine since quota currently represents a guarantee.

offerable[frameworkId][slaveId] += resources;

slaves[slaveId].allocated += resources;

// Resources allocated as part of the quota count towards the

// role's and the framework's fair share.


// NOTE: Revocable resources have already been excluded.

frameworkSorters[role]->add(slaveId, resources);

frameworkSorters[role]->allocated(frameworkId_, slaveId, resources);

roleSorter->allocated(role, slaveId, resources);

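quotaRoleSorter->allocated(role, slaveId, resources);
} // End of the loop over frameworks in the role.
} // End of the loop over quota'ed roles.
} // End of the loop over slaves (stage 1).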

// Calculate the total quantity of scalar resources (including revocable

// and reserved) that are available for allocation in the next round. We

// need this in order to ensure we do not over-allocate resources during

// the second stage.


// For performance reasons (MESOS-4833), this omits information about

// dynamic reservations or persistent volumes in the resources.


// NOTE: We use total cluster resources, and not just those based on the

// agents participating in the current allocation (i.e. provided as an

// argument to the `allocate()` call) so that frameworks in roles without

// quota are not unnecessarily deprived of resources.

Resources remainingClusterResources = roleSorter->totalScalarQuantities();

foreachkey (const string& role, activeRoles) {
  remainingClusterResources -= roleSorter->allocationScalarQuantities(role);
}

// Frameworks in a quota'ed role may temporarily reject resources by

// filtering or suppressing offers. Hence quotas may not be fully allocated.

Resources unallocatedQuotaResources;

foreachpair (const string& name, const Quota& quota, quotas) {

// Compute the amount of quota that the role does not have allocated.


// NOTE: Revocable resources are excluded in `quotaRoleSorter`.

// NOTE: Only scalars are considered for quota.

Resources allocated = getQuotaRoleAllocatedResources(name);

const Resources required = quota.info.guarantee();

unallocatedQuotaResources += (required - allocated);
}

// Determine how many resources we may allocate during the next stage.


// NOTE: Resources for quota allocations are already accounted in

// `remainingClusterResources`.

remainingClusterResources -= unallocatedQuotaResources;

// To ensure we do not over-allocate resources during the second stage

// with all frameworks, we use 2 stopping criteria:

// * No available resources for the second stage left, i.e.

// `remainingClusterResources` - `allocatedStage2` is empty.

// * A potential offer will force the second stage to use more resources

// than available, i.e. `remainingClusterResources` does not contain

// (`allocatedStage2` + potential offer). In this case we skip this

// agent and continue to the next one.


// NOTE: Like `remainingClusterResources`, `allocatedStage2` omits

// information about dynamic reservations and persistent volumes for

// performance reasons. This invariant is preserved because we only add

// resources to it that have also had this metadata stripped from them

// (typically by using `Resources::createStrippedScalarQuantity`).

Resources allocatedStage2;

// At this point resources for quotas are allocated or accounted for.

// Proceed with allocating the remaining free pool.

foreach (const SlaveID& slaveId, slaveIds) {

// If there are no resources available for the second stage, stop.
if (!allocatable(remainingClusterResources - allocatedStage2)) {
  break;
}


foreach (const string& role, roleSorter->sort()) {

foreach (const string& frameworkId_,
         frameworkSorters[role]->sort()) {

FrameworkID frameworkId;
frameworkId.set_value(frameworkId_);

// If the framework has suppressed offers, ignore.

if (frameworks[frameworkId].suppressed) {
  continue;
}


// Only offer resources from slaves that have GPUs to

// frameworks that are capable of receiving GPUs.

// See MESOS-5634.

if (!frameworks[frameworkId].gpuAware &&
    slaves[slaveId].total.gpus().getOrElse(0) > 0) {
  continue;
}


// Calculate the currently available resources on the slave.

Resources available = slaves[slaveId].total - slaves[slaveId].allocated;

// The resources we offer are the unreserved resources as well as the

// reserved resources for this particular role. This is necessary to

// ensure that we don't offer resources that are reserved for another

// role.


// NOTE: Currently, frameworks are allowed to have '*' role.

// Calling reserved('*') returns an empty Resources object.


// NOTE: We do not offer roles with quota any more non-revocable

// resources once their quota is satisfied. However, note that this is

// not strictly true due to the coarse-grained nature (per agent) of the

// allocation algorithm in stage 1.


// TODO(mpark): Offer unreserved resources as revocable beyond quota.

Resources resources = available.reserved(role);

if (!quotas.contains(role)) {
  resources += available.unreserved();
}

// It is safe to break here, because all frameworks under a role would

// consider the same resources, so in case we don't have allocatable

// resources, we don't have to check for other frameworks under the

// same role. We only break out of the innermost loop, so the next step

// will use the same slaveId, but a different role.


// The difference to the second `allocatable` check is that here we also

// check for revocable resources, which can be disabled on a per-framework

// basis, which requires us to go through all frameworks in case we

// have allocatable revocable resources.

if (!allocatable(resources)) {
  break;
}


// Remove revocable resources if the framework has not opted for them.

if (!frameworks[frameworkId].revocable) {
  resources = resources.nonRevocable();
}

// If the resources are not allocatable, ignore. We can not break

// here, because another framework under the same role could accept

// revocable resources and breaking would skip all other frameworks.

if (!allocatable(resources)) {
  continue;
}


// If the framework filters these resources, ignore.

if (isFiltered(frameworkId, slaveId, resources)) {
  continue;
}


// If the offer generated by `resources` would force the second

// stage to use more than `remainingClusterResources`, move along.

// We do not terminate early, as offers generated further in the

// loop may be small enough to fit within `remainingClusterResources`.

const Resources scalarQuantity =
  resources.createStrippedScalarQuantity();

if (!remainingClusterResources.contains(
        allocatedStage2 + scalarQuantity)) {
  continue;
}


VLOG(2) << "Allocating " << resources << " on agent " << slaveId
        << " to framework " << frameworkId;

// NOTE: We perform "coarse-grained" allocation, meaning that we always

// allocate the entire remaining slave resources to a single framework.


// NOTE: We may have already allocated some resources on the current

// agent as part of quota.

offerable[frameworkId][slaveId] += resources;

allocatedStage2 += scalarQuantity;

slaves[slaveId].allocated += resources;

frameworkSorters[role]->add(slaveId, resources);

frameworkSorters[role]->allocated(frameworkId_, slaveId, resources);

roleSorter->allocated(role, slaveId, resources);

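if (quotas.contains(role)) {
  // See comment at `quotaRoleSorter` declaration regarding
  // non-revocable.
  quotaRoleSorter->allocated(role, slaveId, resources.nonRevocable());
}
} // End of the loop over frameworks in the role.
} // End of the loop over roles.
} // End of the loop over slaves (stage 2).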


if (offerable.empty()) {
  VLOG(1) << "No allocations performed";
} else {
  // Now offer the resources to each framework.
  foreachkey (const FrameworkID& frameworkId, offerable) {
    offerCallback(frameworkId, offerable[frameworkId]);
  }
}



// NOTE: For now, we implement maintenance inverse offers within the
// allocator. We leverage the existing timer/cycle of offers to also do any
// "deallocation" (inverse offers) necessary to satisfy maintenance needs.
deallocate(slaveIds_);
}
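The budget that stage 2 is allowed to spend can be easier to follow in scalar form. The sketch below models a single resource (for example CPUs) with plain `double`s instead of Mesos's `Resources` class. The function names `stage2Budget` and `fitsStage2` are hypothetical, one map stands in for both sorters' allocations, and clamping a role's shortfall at zero is a simplifying assumption of this sketch.

```cpp
#include <algorithm>
#include <cassert>
#include <map>
#include <string>

// Budget for stage 2:
//   total - (already allocated to every role) - (quota still owed).
double stage2Budget(
    double total,
    const std::map<std::string, double>& allocatedByRole,
    const std::map<std::string, double>& guaranteeByRole) {
  double remaining = total;

  // Subtract everything that is already allocated, stage 1 included.
  for (const auto& entry : allocatedByRole) {
    remaining -= entry.second;
  }

  // Set aside the part of each guarantee that is not yet allocated.
  for (const auto& entry : guaranteeByRole) {
    auto it = allocatedByRole.find(entry.first);
    const double allocated = (it == allocatedByRole.end()) ? 0.0 : it->second;
    // Assumption: a role above its guarantee owes nothing (clamp at zero).
    remaining -= std::max(0.0, entry.second - allocated);
  }

  return std::max(0.0, remaining);
}

// Second stopping criterion: an offer is skipped if it would push the
// total handed out in stage 2 past the budget.
bool fitsStage2(double budget, double allocatedStage2, double offer) {
  assert(offer >= 0.0);
  return allocatedStage2 + offer <= budget;
}
```

For instance, with 100 CPUs in total, 30 allocated to a quota'ed role whose guarantee is 40, 10 allocated to a role without quota, and a second quota'ed role with a guarantee of 20 and nothing allocated, the budget is 100 - 40 - (10 + 20) = 30 CPUs.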











double DRFSorter::calculateShare(const string& name)
{
double share = 0.0;

// TODO(benh): This implementation of "dominant resource fairness"

// currently does not take into account resources that are not

// scalars.

foreach (const string& scalar, total_.scalarQuantities.names()) {

// Filter out the resources excluded from fair sharing.
if (fairnessExcludeResourceNames.isSome() &&
    fairnessExcludeResourceNames->count(scalar) > 0) {
  continue;
}



// We collect the scalar accumulated total value from the

// `Resources` object.


// NOTE: Although in principle scalar resources may be spread

// across multiple `Resource` objects (e.g., persistent volumes),

// we currently strip persistence and reservation metadata from

// the resources in `scalarQuantities`.

Option<Value::Scalar> __total =
  total_.scalarQuantities.get<Value::Scalar>(scalar);

const double _total = __total.get().value();

if (_total > 0.0) {

double allocation = 0.0;

// We collect the scalar accumulated allocation value from the

// `Resources` object.


// NOTE: Although in principle scalar resources may be spread

// across multiple `Resource` objects (e.g., persistent volumes),

// we currently strip persistence and reservation metadata from

// the resources in `scalarQuantities`.

Option<Value::Scalar> _allocation =
  allocations[name].scalarQuantities.get<Value::Scalar>(scalar);

if (_allocation.isSome()) {
  allocation = _allocation.get().value();
}

share = std::max(share, allocation / _total);
}
}

return share / weights[name];
}
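Stripped of the Mesos types, the function computes the largest allocated-to-total ratio over all scalar resources and divides it by the client's weight. Here is a minimal standalone sketch of that calculation. It uses `std::map` in place of `Resources`, and the name `weightedDominantShare` and its parameters are illustrative rather than part of Mesos.

```cpp
#include <algorithm>
#include <cassert>
#include <map>
#include <set>
#include <string>

// Weighted dominant share of one client.
//   total:      cluster-wide quantity of each scalar resource
//   allocation: quantity of each resource held by the client
//   weight:     the client's weight (a larger weight lowers the share)
//   excluded:   resource names left out of fair sharing
double weightedDominantShare(
    const std::map<std::string, double>& total,
    const std::map<std::string, double>& allocation,
    double weight,
    const std::set<std::string>& excluded = {}) {
  assert(weight > 0.0);

  double share = 0.0;
  for (const auto& entry : total) {
    if (excluded.count(entry.first) > 0) {
      continue;  // Excluded from fair sharing.
    }
    if (entry.second <= 0.0) {
      continue;  // Nothing of this resource in the cluster.
    }

    auto it = allocation.find(entry.first);
    const double allocated = (it == allocation.end()) ? 0.0 : it->second;

    // The dominant share is the maximum ratio over all resources.
    share = std::max(share, allocated / entry.second);
  }

  return share / weight;
}
```

With the earlier example (9 CPUs and 18 GB in total, user A holding 3 CPUs and 12 GB), the share is 2/3 at weight 1 and 1/3 at weight 2. Because the sorter orders clients by ascending share, a client with a higher weight is served earlier.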


Quota, Reservation, Role, Weight








6. flags.registry == "in_memory" or flags.registry == "replicated_log": the registry information is stored in memory, in ZooKeeper, or in a local directory

7. Initialize the objects that contend for leadership and detect who the leader is

Try<MasterContender*> contender_ = MasterContender::create(zk, flags.master_contender);

Try<MasterDetector*> detector_ = MasterDetector::create(zk, flags.master_detector);

8. Create the Master object and start the Master thread

