您的位置:首页 > 大数据 > 人工智能

Facebook的公平份额调度器FairScheduler

2014-04-27 22:23 417 查看
FairScheduler是由Facebook公司提出的,为了解决Facebook要处理生产型作业(数据分析、Hive)、大型批处理作业(数据挖掘、机器学习)、小型交互作业(Hive查询)的问题。同时满足不同用户提交的作业在计算时间、存储空间、数据流量和响应时间都有不同需求的情况下,使用Hadoop mapreduce框架能够应对多种类型作业并行执行,使得用户具有良好的体验,所以Facebook提出了该算法。
对于FairScheduler的设计思想,大家普遍都认为它尽可能保证所有的作业都能够获得等量的资源份额。即但系统中只有一个作业执行时,它将独占集群所有资源;而当有其它作业被提交时就会有TaskTracker被释放并分配给新提交的作业,以保证所有的作业都能够获得大体相同的计算资源。笔者在分析了Hadoop-0.20.2.0版本的FairScheduler调度器源码之后,发现这个对FairScheduler的描述有较大的出入,同时还发现它的源码实现有相当大的问题。所以,本文将从源码的级别上来详细的分析Facebook的这个公平份额调度器(FairScheduler)。

FairScheduler设计的相当灵活,用户可以根据自己的具体需求来对该任务调度器进行扩展,为了更好的理解它的工作原理及扩展性,有必要先分析一下它的相关类。



从这个类图中可以看出,FairScheduler任务调度器主要由5大组件构成:作业池管理器、负载均衡器、任务选择器、权重调整器、作业调度更新线程。其中,作业池管理器(PoolManager)主要负责以池的单位来管理用户提交的作业,这是因为每一个作业池中每次参与调度的作业的数量是由限制的,所以每一个作业必须对应一个唯一的作业池;负载均衡器(LoadManager)会根据当前集群的负载以及当前TaskTracker节点的负载情况来决定是否应该给该TaskTracker节点分配Map/Reduce任务;任务选择器(TaskSelector)负责从一个作业中选取一个Map/Reduce任务给TaskTracker节点;作业调度更新线程(UpdateThread)会每隔500ms更新一次可调度的作业集,在更新的过程中,它会调用权重调整器(WeightAdjuster)来更新每一个作业的权重。这样,FairScheduler的整个调度框架如下:



1.FairScheduler的启动

公平份额调度器FairScheduler的start()方法主要负载创建/启动器内部的组件,同时它还提供了一个基于Web的可视化管理界面,来对FairScheduler进行简单的管理与动态配置。关于这个可视化的管理不会在本文讨论。这个启动过程的主要步骤如下:

1). 创建/启动作业初始化器EagerTaskInitializationListener;

2). 将作业初始化器和作业接收器(JobListener)注册到JobTracker;

3). 创建作业池管理器(PoolManager);

4). 创建/启动负载均衡器(LoadManager)和任务选择器(TaskSelector);

5). 创建权重调整器(WeightAdjuster);

6). 根据配置来初始化assignMultiple和sizeBasedWeight的值;

7). 创建/启动作业调度更新线程(UpdateThread);

8). 启动基于Web的可视化管理器;

之所以说FairScheduler的设计具有较好的可扩展性,是因为用户可以根据自己的具体应用场景来自定义负载均衡器、任务选择器以及权重调整器,然后配置到FairScheduler中即可。这个配置的方法就是在JobTracker节点的配置文件中配置对应的实现类全路径名,而他们对应的配置项分别为:

[java]
view plaincopyprint?

负载均衡器: mapred.fairscheduler.loadmanager 任务选择器: mapred.fairscheduler.taskselector 权重调整器: mapred.fairscheduler.weightadjuster

负载均衡器: mapred.fairscheduler.loadmanager
任务选择器: mapred.fairscheduler.taskselector
权重调整器: mapred.fairscheduler.weightadjuster
另外,FairScheduler的属性assignMultiple被用来控制给一个TaskTracker节点分配任务的数量,如果该值配置为true,则最多可以给一个TaskTracker节点分配一个Map任务和一个Reduce任务,否则最多只为其分配一个Map任务或者一个Reduce任务。该值通过mapred.fairscheduler.assignmultiple来设置;属性sizeBasedWeight被用来在更新作业的权重的时候是否应该考虑该作业尚未完成任务的大小,可通过mapred.fairscheduler.sizebasedweight来设置。

2.作业池管理器(PoolManager)

这里之所以要重点介绍作业池管理器,是因为它与作业的调度顺序休戚相关,确切的来说是它的配置在很到程度上决定作业的调度。这是因为,FairScheduler从两个层面上来考虑调度作业,它首先会根据User和Pool的限制条件来选取一定量的作业作为当前可调度的作业集;然后对这个可调度的作业集进行基于公平度的排序,进而优先调度那些公平度低的作业。这个公平度反映了作业已占用计算资源与它应该分得的计算资源之间的落差,每一个作业的公平亏欠度不仅取决于这个落差,还取决于作业处于这种资源分配不公平状态的时间。同时,每一个作业应该分得的计算资源与它的权重以及它所属Pool的权重有关,也就是说,作业的权重及它所属Pool的权重越大,那么它所占用的计算资源也应该越多。一个作业的权重计算方法如下:

[java]
view plaincopyprint?

private double calculateRawWeight(JobInProgress job, TaskType taskType) {

if (!isRunnable(job)) {//作业是否在当前可调度的作业集中

return 0;

} else {
double weight =
1.0;
if (sizeBasedWeight) {

// 作业还未完成的任务数量
weight = Math.log1p(runnableTasks(job, taskType)) / Math.log(2);

}
weight *= getPriorityFactor(job.getPriority());//作业的优先级

if (weightAdjuster !=
null) {
//用户来调整作业的权重
weight = weightAdjuster.adjustWeight(job, taskType, weight);

}

return weight;
}
}

private double calculateRawWeight(JobInProgress job, TaskType taskType) {
if (!isRunnable(job)) {//作业是否在当前可调度的作业集中
return 0;
} else {
double weight = 1.0;
if (sizeBasedWeight) {
// 作业还未完成的任务数量
weight = Math.log1p(runnableTasks(job, taskType)) / Math.log(2);
}
weight *= getPriorityFactor(job.getPriority());//作业的优先级
if (weightAdjuster != null) {
//用户来调整作业的权重
weight = weightAdjuster.adjustWeight(job, taskType, weight);
}

return weight;
}
}
刚才说过,FairScheduler会先基于FIFO的策略从User和Pool的限制层面上选择一批作业作为当前可调度作业集,这里的User限制是指在这个可调度作业集中属于该User的作业数量不能超过他的上限,Pool限制则指在这个可调度作业集中属于该Pool的作业数量不能超过它的上限,各个User、Pool的限制都保存在PoolManager中,而PoolManager是通过加载配置文件来得到这些限制信息的。而这个配置文件的路径又可以通过JobTracker节点的配置文件来设置,对应的配置项为:mapred.fairscheduler.allocation.file,同时在这个配置文件中,还可以指定一个Poll至少可分得集群中多少Map/Reduce计算资源。关于如何配置User、Pool的限制及Pool的计算资源,感兴趣的同学可以参考Hadoop的官网。另外,对于如何指定一个作业属于哪一个Pool(请注意,这里的Pool不同于作业所属的队列,但可以通过配置让Pool等价于作业队列),可以通过作业的配置文件来执行,对应的配置项名则又是由JobTracker节点的配置文件中的mapred.fairscheduler.poolnameproperty项所决定。

3.可调度作业集及其状态的更新

可调度作业集及其状态的更新主要由两个事件来触发,一是用户新提交了一个作业并添加到调度器中;二是作业调度更新线程(UpdateThread)的定时(500ms)操作。这个操作过程主要包含以下几个步骤:

1). 基于FIFO的策略从所有已初始化未完成的作业中选取一批作业作为新的可调度作业集,User和Pool的限制条件即是选择结束的条件;

2). 更新每一个作业的running以及非running的Map/Reduce任务数量,但对于非可调度的作业,其非running的Map/Reduce任务数量都为0;

3). 更新每一个可调度作业的全局权重(这个计算方法在稍后会详细谈到);

4). 基于Pool的计算资源和该Pool中可调度作业的全局权重来计算作业应该分配的计算资源(作业最小资源量);

5). 基于集群的计算资源和所有可调度作业的全局权重来计算作业应该分配的计算资源(作业公平份额量);

不过在用户新提交的一个作业添加到调度器的处理过程中除了上述操作之外,还有2个额外操作,一是删除作业集中以完成的作业,二是更新每一个作业的公平亏欠度,它的计算方法如下:

[java]
view plaincopyprint?

//应该分得计算资源(*FairShare)与实际得到的资源(running*s)之间的差乘以处于这种"不公平"状态的时间timeDelta

private void updateDeficits(long timeDelta) {

for (JobInfo info: infos.values()) {

info.mapDeficit += (info.mapFairShare - info.runningMaps) * timeDelta;

info.reduceDeficit += (info.reduceFairShare - info.runningReduces) * timeDelta;

}
}

//应该分得计算资源(*FairShare)与实际得到的资源(running*s)之间的差乘以处于这种"不公平"状态的时间timeDelta
private void updateDeficits(long timeDelta) {
for (JobInfo info: infos.values()) {
info.mapDeficit += (info.mapFairShare - info.runningMaps) * timeDelta;
info.reduceDeficit += (info.reduceFairShare - info.runningReduces) * timeDelta;
}
}
每一个作业池都配置有固定的计算资源(如果在配置文件中没有明确配置,那么该Poo的计算资源就默认为0),因此就需要将该作业池的计算资源分配给该Pool中的当前可调度作业。而Pool中的每一个可调度作业到底要被分配多少个计算资源主要依赖于该作业的全局权重。这种基于Pool的计算资源和作业的全局权重来分配计算资源的方法如下:

[java]
view plaincopyprint?

private void updateMinSlots() {

//Clear old minSlots
for (JobInfo info: infos.values()) {

info.minMaps = 0;
info.minReduces = 0;

}
// 为每一个Pool中的可调度作业分配计算资源.

PoolManager poolMgr = getPoolManager();
for (Pool pool: poolMgr.getPools()) {

for (final TaskType type: TaskType.values()) {

Set<JobInProgress> jobs = new HashSet<JobInProgress>(pool.getJobs());

//该Pool的计算资源总量
int slotsLeft = poolMgr.getAllocation(pool.getName(), type);

//给该Pool中所有可调度的作业分配计算资源

while (slotsLeft > 0) {

// Figure out total weight of jobs that still need slots

double totalWeight = 0;

for (Iterator<JobInProgress> it = jobs.iterator(); it.hasNext();) {

JobInProgress job = it.next();
//选择该Pool中还需要计算资源的可调度作业并统计它们的权重和

if (isRunnable(job) && runnableTasks(job, type) > minTasks(job, type)) {

totalWeight += weight(job, type);
} else {
it.remove();
}
}

if (totalWeight ==
0) break;

//对于还需要计算资源的可调度作业,根据它们的权重比重把该Pool中剩余的计算资源分配给他们

int oldSlots = slotsLeft;

for (JobInProgress job: jobs) {

double weight = weight(job, type);

int share = (int) Math.floor(oldSlots * weight / totalWeight);

slotsLeft = giveMinSlots(job, type, slotsLeft, share);
}

if (slotsLeft == oldSlots) {

// No tasks were assigned; do another pass using ceil, giving the

// extra slots to jobs in order of weight then deficit

List<JobInProgress> sortedJobs = new ArrayList<JobInProgress>(jobs);

Collections.sort(sortedJobs, new Comparator<JobInProgress>() {

public int compare(JobInProgress j1, JobInProgress j2) {

double dif = weight(j2, type) - weight(j1, type);

if (dif == 0)
// Weights are equal, compare by deficit
dif = deficit(j2, type) - deficit(j1, type);
return (int) Math.signum(dif);

}
});
for (JobInProgress job: sortedJobs) {

double weight = weight(job, type);

int share = (int) Math.ceil(oldSlots * weight / totalWeight);

slotsLeft = giveMinSlots(job, type, slotsLeft, share);
}
if (slotsLeft > 0) {

LOG.warn("Had slotsLeft = " + slotsLeft +
" after the final loop in updateMinSlots. This probably means some fair scheduler weights are being set to NaN or Infinity.");

}
break;
}

}//while

}//for

}//for

}

private void updateMinSlots() {
//Clear old minSlots
for (JobInfo info: infos.values()) {
info.minMaps = 0;
info.minReduces = 0;
}
// 为每一个Pool中的可调度作业分配计算资源.
PoolManager poolMgr = getPoolManager();
for (Pool pool: poolMgr.getPools()) {

for (final TaskType type: TaskType.values()) {
Set<JobInProgress> jobs = new HashSet<JobInProgress>(pool.getJobs());
//该Pool的计算资源总量
int slotsLeft = poolMgr.getAllocation(pool.getName(), type);
//给该Pool中所有可调度的作业分配计算资源
while (slotsLeft > 0) {
// Figure out total weight of jobs that still need slots
double totalWeight = 0;
for (Iterator<JobInProgress> it = jobs.iterator(); it.hasNext();) {
JobInProgress job = it.next();
//选择该Pool中还需要计算资源的可调度作业并统计它们的权重和
if (isRunnable(job) && runnableTasks(job, type) > minTasks(job, type)) {
totalWeight += weight(job, type);
} else {
it.remove();
}
}

if (totalWeight == 0)  break;

//对于还需要计算资源的可调度作业,根据它们的权重比重把该Pool中剩余的计算资源分配给他们
int oldSlots = slotsLeft;
for (JobInProgress job: jobs) {
double weight = weight(job, type);
int share = (int) Math.floor(oldSlots * weight / totalWeight);
slotsLeft = giveMinSlots(job, type, slotsLeft, share);
}

if (slotsLeft == oldSlots) {
// No tasks were assigned; do another pass using ceil, giving the
// extra slots to jobs in order of weight then deficit
List<JobInProgress> sortedJobs = new ArrayList<JobInProgress>(jobs);
Collections.sort(sortedJobs, new Comparator<JobInProgress>() {
public int compare(JobInProgress j1, JobInProgress j2) {
double dif = weight(j2, type) - weight(j1, type);
if (dif == 0) // Weights are equal, compare by deficit
dif = deficit(j2, type) - deficit(j1, type);
return (int) Math.signum(dif);
}
});
for (JobInProgress job: sortedJobs) {
double weight = weight(job, type);
int share = (int) Math.ceil(oldSlots * weight / totalWeight);
slotsLeft = giveMinSlots(job, type, slotsLeft, share);
}
if (slotsLeft > 0) {
LOG.warn("Had slotsLeft = " + slotsLeft + " after the final loop in updateMinSlots. This probably means some fair scheduler weights are being set to NaN or Infinity.");
}
break;
}

}//while

}//for

}//for

}
对于通过集群的计算资源和所有可调度作业的全局权重来最终确定该作业的应该分配的公平份额的算法,笔者认为这个算法可能存在某些问题而导致很难理解,所以本文不会详细讨论,有知道的博友可以@我。该算法的实现源码如下:

[java]
view plaincopyprint?

private void updateFairShares(ClusterStatus clusterStatus) {

// Clear old fairShares
for (JobInfo info: infos.values()) {

info.mapFairShare = 0;

info.reduceFairShare = 0;

}

// 计算每一个可调度作业应该分得的每类计算资源.
for (TaskType type: TaskType.values()) {

//选择未完成并且可调度的作业
HashSet<JobInfo> jobsLeft = new HashSet<JobInfo>();

for (Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {

JobInProgress job = entry.getKey();
JobInfo info = entry.getValue();
if (isRunnable(job) && runnableTasks(job, type) >
0) {
jobsLeft.add(info);
}
}

//获取整个集群的计算资源
double slotsLeft = getTotalSlots(type, clusterStatus);

//计算每一个未完成并且可调度作业应该分得的某一类计算资源

while (!jobsLeft.isEmpty()) {

double totalWeight =
0;
//统计所有未完成并可调度作业的权重和
for (JobInfo info: jobsLeft) {

double weight = (type == TaskType.MAP ? info.mapWeight : info.reduceWeight);

totalWeight += weight;
}
boolean recomputeSlots =
false;
double oldSlots = slotsLeft;
// Copy slotsLeft so we can modify it
for (Iterator<JobInfo> iter = jobsLeft.iterator(); iter.hasNext();) {

JobInfo info = iter.next();
double minSlots = (type == TaskType.MAP ? info.minMaps : info.minReduces);

double weight = (type == TaskType.MAP ? info.mapWeight : info.reduceWeight);

//基于公平性计算该作业应该分配的计算资源
double fairShare = weight / totalWeight * oldSlots;

//对于以Pool的计算为准来更新作业的公平资源配额

if (minSlots > fairShare) {

if (type == TaskType.MAP) info.mapFairShare = minSlots;

else info.reduceFairShare = minSlots;

slotsLeft -= minSlots;
iter.remove();
recomputeSlots = true;

}
}//for

if (!recomputeSlots) {

// All minimums are met. Give each job its fair share of excess slots.

for (JobInfo info: jobsLeft) {

double weight = (type == TaskType.MAP ? info.mapWeight : info.reduceWeight);

double fairShare = weight / totalWeight * oldSlots;

if (type == TaskType.MAP)

info.mapFairShare = fairShare;
else
info.reduceFairShare = fairShare;
}
break;
}
}//while

}
}

private void updateFairShares(ClusterStatus clusterStatus) {
// Clear old fairShares
for (JobInfo info: infos.values()) {
info.mapFairShare = 0;
info.reduceFairShare = 0;
}

// 计算每一个可调度作业应该分得的每类计算资源.
for (TaskType type: TaskType.values()) {
//选择未完成并且可调度的作业
HashSet<JobInfo> jobsLeft = new HashSet<JobInfo>();
for (Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
JobInProgress job = entry.getKey();
JobInfo info = entry.getValue();
if (isRunnable(job) && runnableTasks(job, type) > 0) {
jobsLeft.add(info);
}
}

//获取整个集群的计算资源
double slotsLeft = getTotalSlots(type, clusterStatus);

//计算每一个未完成并且可调度作业应该分得的某一类计算资源
while (!jobsLeft.isEmpty()) {
double totalWeight = 0;
//统计所有未完成并可调度作业的权重和
for (JobInfo info: jobsLeft) {
double weight = (type == TaskType.MAP ? info.mapWeight : info.reduceWeight);
totalWeight += weight;
}
boolean recomputeSlots = false;
double oldSlots = slotsLeft; // Copy slotsLeft so we can modify it
for (Iterator<JobInfo> iter = jobsLeft.iterator(); iter.hasNext();) {
JobInfo info = iter.next();
double minSlots = (type == TaskType.MAP ? info.minMaps : info.minReduces);
double weight = (type == TaskType.MAP ? info.mapWeight : info.reduceWeight);
//基于公平性计算该作业应该分配的计算资源
double fairShare = weight / totalWeight * oldSlots;
//对于以Pool的计算为准来更新作业的公平资源配额
if (minSlots > fairShare) {
if (type == TaskType.MAP) info.mapFairShare = minSlots;
else info.reduceFairShare = minSlots;
slotsLeft -= minSlots;
iter.remove();
recomputeSlots = true;
}
}//for

if (!recomputeSlots) {
// All minimums are met. Give each job its fair share of excess slots.
for (JobInfo info: jobsLeft) {
double weight = (type == TaskType.MAP ? info.mapWeight : info.reduceWeight);
double fairShare = weight / totalWeight * oldSlots;
if (type == TaskType.MAP)
info.mapFairShare = fairShare;
else
info.reduceFairShare = fairShare;
}
break;
}
}//while

}
}


3.作业的全局权重及资源量计算方法

1). 作业原始权重:



2). 作业全局权重:



3). 作业最小资源量:



4). 作业公平份额量(简化计算方法):



4.作业的调度

当一个公平份额调度器FairScheduler给一个TaskTracker节点分配任务时,它只分配那些可调度作业的任务给当前的计算节点,同时这些可调度作业的优后顺序是按照作业的公平亏损度从高到低排序的,也即是说,FairScheduler优先调度那些公平亏损度高的作业,具体的排序算法实现如下:

[java]
view plaincopyprint?

<SPAN> private
class DeficitComparator implements Comparator<JobInProgress> {

private final TaskType taskType;

private DeficitComparator(TaskType taskType) {

this.taskType = taskType;

}

public int compare(JobInProgress j1, JobInProgress j2) {

JobInfo j1Info = infos.get(j1);
JobInfo j2Info = infos.get(j2);
long deficitDif;

boolean j1Needy, j2Needy;

if (taskType == TaskType.MAP) {

//检查作业实际占用的计算资源量是否小于它应该获得的最小计算资源量
j1Needy = j1.runningMaps() < Math.floor(j1Info.minMaps);

j2Needy = j2.runningMaps() < Math.floor(j2Info.minMaps);
//比较两个作业的公平亏欠度
deficitDif = j2Info.mapDeficit - j1Info.mapDeficit;
} else {
j1Needy = j1.runningReduces() < Math.floor(j1Info.minReduces);
j2Needy = j2.runningReduces() < Math.floor(j2Info.minReduces);

deficitDif = j2Info.reduceDeficit - j1Info.reduceDeficit;
}

if (j1Needy && !j2Needy)

return -1;

else if (j2Needy && !j1Needy)

return 1;

else // Both needy or both non-needy; compare by deficit

return (int) Math.signum(deficitDif);

}
}
</SPAN>

private class DeficitComparator implements Comparator<JobInProgress> {
private final TaskType taskType;

private DeficitComparator(TaskType taskType) {
this.taskType = taskType;
}

public int compare(JobInProgress j1, JobInProgress j2) {
JobInfo j1Info = infos.get(j1);
JobInfo j2Info = infos.get(j2);
long deficitDif;
boolean j1Needy, j2Needy;
if (taskType == TaskType.MAP) {
//检查作业实际占用的计算资源量是否小于它应该获得的最小计算资源量
j1Needy = j1.runningMaps() < Math.floor(j1Info.minMaps);
j2Needy = j2.runningMaps() < Math.floor(j2Info.minMaps);
//比较两个作业的公平亏欠度
deficitDif = j2Info.mapDeficit - j1Info.mapDeficit;
} else {
j1Needy = j1.runningReduces() < Math.floor(j1Info.minReduces);
j2Needy = j2.runningReduces() < Math.floor(j2Info.minReduces);
deficitDif = j2Info.reduceDeficit - j1Info.reduceDeficit;
}

if (j1Needy && !j2Needy)
return -1;
else if (j2Needy && !j1Needy)
return 1;
else // Both needy or both non-needy; compare by deficit
return (int) Math.signum(deficitDif);
}
}
FairScheduler在调度某一个具体的作业之前,还会先调用负载均衡器来判断是否应该给当前的TaskTracker节点分配任务。另外,对于任何一个TaskTracker节点,FairScheduler最多只为它分配2个任务,而且最多只有1个Map任务和1个Reduce任务,也就是给一个TaskTracker节点分配的任务只有4种组合情况:

1). 一个任务也没有;

2). 一个Map任务;

3). 一个Reduce任务;

4). 一个Map任务和一个Reduce任务;

FairScheduler总体的调度原则是,一是保证各个Pool及User先提交的作业先执行完,二是保证所有的作业享有与其权重对应的计算资源量。

何时使用各个调度器

这些调度算法各具针对性。如果正在运行一个大型Hadoop集群,它具有多个客户端和不同类型、不同优先级的作业,那么容量调度器是最好选择,它可以确保访问,并能重用未使用的容量并调整队列中作业的优先级。尽管不太复杂,但无论是小型还是大型集群,如果由同一个组织使用,工作负载数量有限,那么公平调度器也能运转得很好。公平调度可以将容量不均匀地分配给池(作业的),但是它较为简单且可配置性较低。公平调度在存在多种作业的情况下非常有用,因为它能为小作业和大作业混合的情况提供更快的响应时间(支持更具交互性的使用模型)。(该段落转载自网络)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: