hadoop cgroup源码解读

版本:hadoop2.7.2 + centos 6.5


想要理解hadoop cgroup,首先需要对linux cgroup有一定的了解。yarn目前的调度基于内存和cpu,但是cpu资源并没有像内存资源那样受到严格的限制,一个container有可能占据很多的cpu资源。

启用hadoop cgroup,可以参照我的博客hadoop启用cgroup


当前版本hadoop想要限制cpu资源,是基于cgroup cpu子系统的资源控制来实现的,支持严格和非严格两种模式。非严格模式下,一个nodemanager节点上所有container占用的cpu资源总和不会超过限定的cpu资源总量。严格模式下,每个container最多只能使用为其设定的cpu资源,而不管节点整体的cpu资源是否空闲。


hadoop cgroup需要启用LinuxContainerExecutor,并且executor的资源控制类需要配置为org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler


// Configuration passed to NodeManagerHardwareUtils.getContainersCores() in init().
private Configuration conf;
// Second argument of LinuxContainerExecutor.mountCgroups() (e.g. hadoop-yarn).
private String cgroupPrefix;
// init() mounts the cpu controller only when this is true and cgroupMountPath is set.
private boolean cgroupMount;
// Parent directory under which the cpu controller is mounted (e.g. /cgroup).
private String cgroupMountPath;

// NOTE(review): presumably what isCpuWeightEnabled() reports — confirm against the accessor.
private boolean cpuWeightEnabled = true;
// When true, setupLimits() also writes a per-container cfs period/quota.
private boolean strictResourceUsageMode = false;

private final String MTAB_FILE = "/proc/mounts";
private final String CGROUPS_FSTYPE = "cgroup";
// Controller name; also used as the prefix of every parameter file (cpu.<param>).
private final String CONTROLLER_CPU = "cpu";
private final String CPU_PERIOD_US = "cfs_period_us";
private final String CPU_QUOTA_US = "cfs_quota_us";
// Multiplied by a container's vcores to obtain its cpu.shares value.
private final int CPU_DEFAULT_WEIGHT = 1024; // set by kernel

// In the Linux cgroup cpu subsystem, cfs_period_us ranges from 1000 to
// 1000*1000, expressed in microseconds (us).
private final int MAX_QUOTA_US = 1000 * 1000;
private final int MIN_PERIOD_US = 1000;

private final Map<String, String> controllerPaths; // Controller -> path

// NOTE(review): not used in the excerpts shown here; presumably tune the
// retry behaviour of deleteCgroup() — confirm.
private long deleteCgroupTimeout;
private long deleteCgroupDelay;
// package private for testing purposes
Clock clock;

// Number of cores YARN containers may use; assigned in init().
private float yarnProcessors;


void init(LinuxContainerExecutor lce, ResourceCalculatorPlugin plugin)
throws IOException {

// mount cgroups if requested
if (cgroupMount && cgroupMountPath != null) {
ArrayList<String> cgroupKVs = new ArrayList<String>();

//值为  cpu=/cgroup/cpu
cgroupKVs.add(CONTROLLER_CPU + "=" + cgroupMountPath + "/" +
//   两个值分别为  cpu=/cgroup/cpu   , hadoop-yarn
lce.mountCgroups(cgroupKVs, cgroupPrefix);

// cap overall usage to the number of cores allocated to
yarnProcessors = NodeManagerHardwareUtils.getContainersCores(plugin, conf);
int systemProcessors = plugin.getNumProcessors();
if (systemProcessors != (int) yarnProcessors) {
LOG.info("YARN containers restricted to " + yarnProcessors + " cores");
int[] limits = getOverallLimits(yarnProcessors);
updateCgroup(CONTROLLER_CPU, "", CPU_PERIOD_US, String.valueOf(limits[0]));
updateCgroup(CONTROLLER_CPU, "", CPU_QUOTA_US, String.valueOf(limits[1]));
} else if (cpuLimitsExist()) {
LOG.info("Removing CPU constraints for YARN containers.");
updateCgroup(CONTROLLER_CPU, "", CPU_QUOTA_US, String.valueOf(-1));

2.getOverallLimits方法,根据传入的yarnProcessors值计算出相应的limits[]值:limits[0]对应cpu.cfs_period_us,limits[1]对应cpu.cfs_quota_us。

在linux cgroup的cpu子系统中,cpu.cfs_period_us的取值范围为1000到1000*1000,单位为微秒(us)。它与cpu.cfs_quota_us配合,用来限制cpu资源。



int[] getOverallLimits(float yarnProcessors) {

int[] ret = new int[2];

if (yarnProcessors < 0.01f) {
throw new IllegalArgumentException("Number of processors can't be <= 0.");

int quotaUS = MAX_QUOTA_US;
int periodUS = (int) (MAX_QUOTA_US / yarnProcessors);
if (yarnProcessors < 1.0f) {
periodUS = MAX_QUOTA_US;
quotaUS = (int) (periodUS * yarnProcessors);
if (quotaUS < MIN_PERIOD_US) {
.warn("The quota calculated for the cgroup was too low. The minimum value is "
+ MIN_PERIOD_US + ", calculated value is " + quotaUS
+ ". Setting quota to minimum value.");

// cfs_period_us can't be less than 1000 microseconds
// if the value of periodUS is less than 1000, we can't really use cgroups
// to limit cpu
if (periodUS < MIN_PERIOD_US) {
.warn("The period calculated for the cgroup was too low. The minimum value is "
+ MIN_PERIOD_US + ", calculated value is " + periodUS
+ ". Using all available CPU.");
periodUS = MAX_QUOTA_US;
quotaUS = -1;

ret[0] = periodUS;
ret[1] = quotaUS;
return ret;


private void updateCgroup(String controller, String groupName, String param,
String value) throws IOException {
String path = pathForCgroup(controller, groupName);
param = controller + "." + param;

if (LOG.isDebugEnabled()) {
LOG.debug("updateCgroup: " + path + ": " + param + "=" + value);

PrintWriter pw = null;
try {
File file = new File(path + "/" + param);
Writer w = new OutputStreamWriter(new FileOutputStream(file), "UTF-8");
pw = new PrintWriter(w);
} catch (IOException e) {
throw new IOException("Unable to set " + param + "=" + value +
" for cgroup at: " + path, e);
} finally {
if (pw != null) {
boolean hasError = pw.checkError();
if(hasError) {
throw new IOException("Unable to set " + param + "=" + value +
" for cgroup at: " + path);
if(pw.checkError()) {
throw new IOException("Error while closing cgroup file " + path);


public interface LCEResourcesHandler extends Configurable {

void init(LinuxContainerExecutor lce) throws IOException;

* Called by the LinuxContainerExecutor before launching the executable
* inside the container.
* @param containerId the id of the container being launched
* @param containerResource the node resources the container will be using
void preExecute(ContainerId containerId, Resource containerResource)
throws IOException;

* Called by the LinuxContainerExecutor after the executable inside the
* container has exited (successfully or not).
* @param containerId the id of the container which was launched
void postExecute(ContainerId containerId);

String getResourcesOption(ContainerId containerId);


* LCE Resources Handler interface

public void preExecute(ContainerId containerId, Resource containerResource)
throws IOException {
setupLimits(containerId, containerResource);

public void postExecute(ContainerId containerId) {

public String getResourcesOption(ContainerId containerId) {
String containerName = containerId.toString();

StringBuilder sb = new StringBuilder("cgroups=");

if (isCpuWeightEnabled()) {
sb.append(pathForCgroup(CONTROLLER_CPU, containerName) + "/tasks");

if (sb.charAt(sb.length() - 1) == ',') {
sb.deleteCharAt(sb.length() - 1);

return sb.toString();


hadoop cgroup会在/cgroup/cpu/hadoop-yarn下为每一个container创建一个控制组,等任务结束后再调用clearLimits()方法来删除这个控制组。

private void setupLimits(ContainerId containerId,
Resource containerResource) throws IOException {
String containerName = containerId.toString();

if (isCpuWeightEnabled()) {
int containerVCores = containerResource.getVirtualCores();
createCgroup(CONTROLLER_CPU, containerName);
int cpuShares = CPU_DEFAULT_WEIGHT * containerVCores;
updateCgroup(CONTROLLER_CPU, containerName, "shares",
if (strictResourceUsageMode) {
int nodeVCores =
if (nodeVCores != containerVCores) {
float containerCPU =
(containerVCores * yarnProcessors) / (float) nodeVCores;
int[] limits = getOverallLimits(containerCPU);
updateCgroup(CONTROLLER_CPU, containerName, CPU_PERIOD_US,
updateCgroup(CONTROLLER_CPU, containerName, CPU_QUOTA_US,


private void clearLimits(ContainerId containerId) {
if (isCpuWeightEnabled()) {
deleteCgroup(pathForCgroup(CONTROLLER_CPU, containerId.toString()));
