您的位置:首页 > 运维架构

hadoop cgroup源码解读

2017-07-26 18:23 459 查看
版本:hadoop2.7.2 + centos 6.5

现状

想要理解hadoop cgroup,首先需要对linux cgroup有一定的了解。yarn目前的调度基于内存和cpu,但是cpu资源并没有像内存资源那样进行严格的限制,一个container有可能占据很多的cpu资源。

启用hadoop cgroup,可以参照我的博客hadoop启用cgroup

还有一篇关于cgroup的介绍

当前版本hadoop想要限制cpu资源,是基于cgroup cpu子系统的资源控制来实现的,支持严格和非严格两种模式。非严格模式下,一个nodemanager节点上,所有的container占用的cpu资源不会超过限制的总的cpu资源。严格模式下,将会严格限制每个container最多不会超过设定的cpu资源,而不管cpu整体资源是否空闲。

#

hadoop cgroup需要启用LinuxContainerExecutor,并且executor资源控制类为org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler

该类的变量:

// Configuration read by this handler (e.g. the node vcore setting).
private Configuration conf;
// Passed to the executor as the second argument when mounting cgroups.
private String cgroupPrefix;
// Cgroups are mounted during init only when this flag is set and
// cgroupMountPath is non-null.
private boolean cgroupMount;
private String cgroupMountPath;

// NOTE(review): presumably backs isCpuWeightEnabled() - confirm.
private boolean cpuWeightEnabled = true;
// When true, each container additionally gets its own period/quota limits.
private boolean strictResourceUsageMode = false;

private final String MTAB_FILE = "/proc/mounts";
private final String CGROUPS_FSTYPE = "cgroup";
private final String CONTROLLER_CPU = "cpu";
private final String CPU_PERIOD_US = "cfs_period_us";
private final String CPU_QUOTA_US = "cfs_quota_us";
// CPU weight value; multiplied by a container's vcores to get its shares
private final int CPU_DEFAULT_WEIGHT = 1024; // set by kernel

// In the Linux cgroup system, cfs_period_us ranges from 1000 to 1000*1000,
// expressed in microseconds (us)
private final int MAX_QUOTA_US = 1000 * 1000;
private final int MIN_PERIOD_US = 1000;

// Mounted cgroup controllers and their groups
private final Map<String, String> controllerPaths; // Controller -> path

private long deleteCgroupTimeout;
private long deleteCgroupDelay;
// package private for testing purposes
Clock clock;

// Assigned during init to the maximum number of cores actually allowed for
// the CPU control group
private float yarnProcessors;


1.init方法,加载Configuration,获取参数值,挂载cgroup,创建控制组,默认是hadoop-yarn,然后计算出要限制的cpu.cfs_quota_us和cpu.cfs_period_us的值,写入相应的文件中。

/**
 * Initializes the handler: loads the configuration, optionally mounts the
 * cpu controller, initializes the controller paths and then applies (or
 * removes) the node-wide CPU cap for YARN containers.
 *
 * @param lce the executor used to mount the cgroups when mounting is enabled
 * @param plugin source of the system processor count and, together with the
 *        configuration, of the number of cores YARN may use
 * @throws IOException if a cgroup parameter file cannot be written
 */
@VisibleForTesting
void init(LinuxContainerExecutor lce, ResourceCalculatorPlugin plugin)
    throws IOException {
  // load the configuration
  initConfig();

  // mount cgroups if requested
  if (cgroupMount && cgroupMountPath != null) {
    ArrayList<String> cgroupKVs = new ArrayList<String>();

    // entry has the form <controller>=<mountPath>/<controller>,
    // e.g. cpu=/cgroup/cpu
    cgroupKVs.add(CONTROLLER_CPU + "=" + cgroupMountPath + "/" +
        CONTROLLER_CPU);
    // e.g. ["cpu=/cgroup/cpu"] together with the prefix "hadoop-yarn"
    lce.mountCgroups(cgroupKVs, cgroupPrefix);
  }
  // NOTE(review): presumably resolves/creates the YARN group directory
  // (e.g. /cgroup/cpu/hadoop-yarn) - confirm in initializeControllerPaths()
  initializeControllerPaths();

  // cap overall usage to the number of cores allocated to YARN
  yarnProcessors = NodeManagerHardwareUtils.getContainersCores(plugin, conf);
  int systemProcessors = plugin.getNumProcessors();
  if (systemProcessors != (int) yarnProcessors) {
    LOG.info("YARN containers restricted to " + yarnProcessors + " cores");
    // limits[0] is cpu.cfs_period_us, limits[1] is cpu.cfs_quota_us
    int[] limits = getOverallLimits(yarnProcessors);
    updateCgroup(CONTROLLER_CPU, "", CPU_PERIOD_US, String.valueOf(limits[0]));
    updateCgroup(CONTROLLER_CPU, "", CPU_QUOTA_US, String.valueOf(limits[1]));
  } else if (cpuLimitsExist()) {
    // YARN may use every core: drop a previously configured quota
    LOG.info("Removing CPU constraints for YARN containers.");
    updateCgroup(CONTROLLER_CPU, "", CPU_QUOTA_US, String.valueOf(-1));
  }
}


2.getOverallLimits方法,根据值计算出相应的limit[]值:cpu.cfs_period_us 和cpu.cfs_quota_us。

在linux cgroup系统中,cfs_period_us的取值范围为1000到1000*1000,单位为微秒(us)。它与cpu.cfs_quota_us配合,来限制cpu资源。

这个方法默认将quotaUS取为1000*1000,然后据此计算出cfs_period_us的值;当核数小于1时,则将periodUS固定为1000*1000,反过来计算quotaUS。

参数yarnProcessors为可用的cpu的核数

/**
 * Translates a number of usable processors into a CFS period/quota pair.
 *
 * @param yarnProcessors number of cores that may be used; must be at least
 *        0.01
 * @return a two-element array: index 0 is the period and index 1 the quota,
 *         both in microseconds; a quota of -1 means "no limit"
 * @throws IllegalArgumentException if {@code yarnProcessors} is below 0.01
 */
@VisibleForTesting
int[] getOverallLimits(float yarnProcessors) {
  if (yarnProcessors < 0.01f) {
    throw new IllegalArgumentException("Number of processors can't be <= 0.");
  }

  int periodUS;
  int quotaUS;
  if (yarnProcessors < 1.0f) {
    // Less than one core: keep the longest period and shrink the quota.
    periodUS = MAX_QUOTA_US;
    quotaUS = (int) (periodUS * yarnProcessors);
    if (quotaUS < MIN_PERIOD_US) {
      LOG.warn("The quota calculated for the cgroup was too low. The minimum value is "
          + MIN_PERIOD_US + ", calculated value is " + quotaUS
          + ". Setting quota to minimum value.");
      quotaUS = MIN_PERIOD_US;
    }
  } else {
    // One core or more: keep the largest quota and shrink the period.
    quotaUS = MAX_QUOTA_US;
    periodUS = (int) (MAX_QUOTA_US / yarnProcessors);
  }

  // cfs_period_us can't be less than 1000 microseconds; when the computed
  // period falls below that, cgroups can't express the limit, so the quota
  // is lifted instead.
  if (periodUS < MIN_PERIOD_US) {
    LOG.warn("The period calculated for the cgroup was too low. The minimum value is "
        + MIN_PERIOD_US + ", calculated value is " + periodUS
        + ". Using all available CPU.");
    periodUS = MAX_QUOTA_US;
    quotaUS = -1;
  }

  return new int[] {periodUS, quotaUS};
}


3.updateCgroup,将/controller/groupName/param文件的值更新为value。

/**
 * Writes {@code value} into the parameter file
 * {@code <cgroup path>/<controller>.<param>}.
 *
 * @param controller the cgroup controller name; also used as the file-name
 *        prefix of the parameter
 * @param groupName the group whose parameter is updated
 * @param param the parameter name without the controller prefix
 * @param value the text to write
 * @throws IOException if the file cannot be opened, or the writer reports an
 *         error after writing or after closing
 */
private void updateCgroup(String controller, String groupName, String param,
    String value) throws IOException {
  final String cgroupDir = pathForCgroup(controller, groupName);
  final String qualifiedParam = controller + "." + param;
  final String setFailure = "Unable to set " + qualifiedParam + "=" + value
      + " for cgroup at: " + cgroupDir;

  if (LOG.isDebugEnabled()) {
    LOG.debug("updateCgroup: " + cgroupDir + ": " + qualifiedParam + "=" + value);
  }

  PrintWriter out = null;
  try {
    File target = new File(cgroupDir + "/" + qualifiedParam);
    out = new PrintWriter(
        new OutputStreamWriter(new FileOutputStream(target), "UTF-8"));
    out.write(value);
  } catch (IOException e) {
    throw new IOException(setFailure, e);
  } finally {
    if (out != null) {
      // PrintWriter swallows write errors; query them before and after close.
      boolean writeFailed = out.checkError();
      out.close();
      if (writeFailed) {
        throw new IOException(setFailure);
      }
      if (out.checkError()) {
        throw new IOException("Error while closing cgroup file " + cgroupDir);
      }
    }
  }
}


4.该类实现了LCEResourcesHandler接口

/**
* Hooks that the LinuxContainerExecutor calls around a container's
* executable to set up and tear down resource handling.
*/
public interface LCEResourcesHandler extends Configurable {

/**
* Initializes the handler.
* NOTE(review): presumably called once before any container is launched -
* confirm against the caller.
* @param lce the executor this handler works with
* @throws IOException if initialization fails
*/
void init(LinuxContainerExecutor lce) throws IOException;

/**
* Called by the LinuxContainerExecutor before launching the executable
* inside the container.
* @param containerId the id of the container being launched
* @param containerResource the node resources the container will be using
*/
void preExecute(ContainerId containerId, Resource containerResource)
throws IOException;

/**
* Called by the LinuxContainerExecutor after the executable inside the
* container has exited (successfully or not).
* @param containerId the id of the container which was launched
*/
void postExecute(ContainerId containerId);

/**
* Returns the resources option string for the given container.
* NOTE(review): how the executor consumes this value is not visible here.
* @param containerId the id of the container
* @return the option string for that container
*/
String getResourcesOption(ContainerId containerId);
}


CgroupsLCEResourcesHandler的实现方法:

/*
* LCE Resources Handler interface
*/

/**
* Sets up the limits for the container before it is launched by delegating
* to setupLimits.
* @param containerId the id of the container being launched
* @param containerResource the resources allotted to the container
* @throws IOException if setting up the limits fails
*/
public void preExecute(ContainerId containerId, Resource containerResource)
throws IOException {
setupLimits(containerId, containerResource);
}

/**
* Clears the limits of the container by delegating to clearLimits.
* @param containerId the id of the container whose limits are cleared
*/
public void postExecute(ContainerId containerId) {
clearLimits(containerId);
}

/**
 * Builds the {@code cgroups=} option string for a container.
 *
 * @param containerId the id of the container
 * @return {@code "cgroups="} followed by the container's cpu cgroup
 *         {@code tasks} file when CPU weighting is enabled, otherwise just
 *         {@code "cgroups="}
 */
public String getResourcesOption(ContainerId containerId) {
  final String optionPrefix = "cgroups=";
  String containerName = containerId.toString();

  if (!isCpuWeightEnabled()) {
    return optionPrefix;
  }

  String tasksFile = pathForCgroup(CONTROLLER_CPU, containerName) + "/tasks";
  return optionPrefix + tasksFile;
}


5.setupLimits方法,为每一个container创建控制组;如果启用了严格模式,还会更新每个container的cpu限制。

hadoop cgroup会在/cgroup/cpu/hadoop-yarn下为每一个container创建一个控制组,等任务结束后再调用clearLimits()方法来删除这个控制组。

/**
 * Creates the cpu cgroup for a container and writes its CPU settings.
 *
 * <p>Does nothing beyond resolving the container name when CPU weighting is
 * disabled. Otherwise the container's shares are set to
 * {@code CPU_DEFAULT_WEIGHT * vcores}. In strict resource usage mode, and
 * only when the container does not own all of the node's vcores, a
 * period/quota pair is written as well.
 *
 * @param containerId the id of the container
 * @param containerResource supplies the container's virtual core count
 * @throws IOException if a cgroup parameter cannot be written
 */
private void setupLimits(ContainerId containerId,
    Resource containerResource) throws IOException {
  String containerName = containerId.toString();
  if (!isCpuWeightEnabled()) {
    return;
  }

  int containerVCores = containerResource.getVirtualCores();
  createCgroup(CONTROLLER_CPU, containerName);
  updateCgroup(CONTROLLER_CPU, containerName, "shares",
      String.valueOf(CPU_DEFAULT_WEIGHT * containerVCores));

  // Only strict mode writes per-container cfs_period_us / cfs_quota_us.
  if (!strictResourceUsageMode) {
    return;
  }

  int nodeVCores = conf.getInt(YarnConfiguration.NM_VCORES,
      YarnConfiguration.DEFAULT_NM_VCORES);
  if (nodeVCores == containerVCores) {
    return;
  }

  // The container's share of the cores YARN may use on this node.
  float containerCPU = (containerVCores * yarnProcessors) / (float) nodeVCores;
  int[] periodAndQuota = getOverallLimits(containerCPU);
  updateCgroup(CONTROLLER_CPU, containerName, CPU_PERIOD_US,
      String.valueOf(periodAndQuota[0]));
  updateCgroup(CONTROLLER_CPU, containerName, CPU_QUOTA_US,
      String.valueOf(periodAndQuota[1]));
}


6.clearLimits()方法,container结束后删除这个container的控制组

/**
 * Deletes the container's cpu cgroup when CPU weighting is enabled.
 *
 * @param containerId the id of the container whose cgroup is removed
 */
private void clearLimits(ContainerId containerId) {
  if (!isCpuWeightEnabled()) {
    return;
  }
  String containerCgroup = pathForCgroup(CONTROLLER_CPU, containerId.toString());
  deleteCgroup(containerCgroup);
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  hadoop cgroup