Hadoop cgroup Source Code Walkthrough
2017-07-26 18:23
Version: Hadoop 2.7.2 + CentOS 6.5
Current state
To understand Hadoop's cgroup support you first need some familiarity with Linux cgroups. YARN currently schedules on memory and CPU, but CPU is not enforced as strictly as memory is: a single container can end up occupying far more CPU than it was allocated. To enable cgroups in Hadoop, see my blog post on enabling cgroups for Hadoop.
There is also a separate post introducing cgroups themselves.
In this version, Hadoop limits CPU through the resource controls of the cgroup cpu subsystem, and supports a strict and a non-strict mode. In non-strict mode, the containers on a NodeManager collectively will not exceed the node's total CPU allowance. In strict mode, each container is hard-capped at its own configured CPU share, regardless of whether the rest of the CPU is idle.
Hadoop's cgroup support requires the LinuxContainerExecutor, with the executor's resource handler class set to org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler.
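To switch this code path on, yarn-site.xml must select the LinuxContainerExecutor and this handler class. A minimal sketch of the relevant properties (values here are illustrative; adjust paths and the strict-mode flag to your cluster):

```xml
<!-- yarn-site.xml (excerpt) -->
<property>
  <name>yarn.nodemanager.container-executor.class</name>
  <value>org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor</value>
</property>
<property>
  <name>yarn.nodemanager.linux-container-executor.resources-handler.class</name>
  <value>org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler</value>
</property>
<property>
  <!-- strict mode: hard-cap each container's CPU even when the node is idle -->
  <name>yarn.nodemanager.linux-container-executor.cgroups.strict-resource-usage</name>
  <value>true</value>
</property>
```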
The fields of this class:
```java
private Configuration conf;
private String cgroupPrefix;
private boolean cgroupMount;
private String cgroupMountPath;

private boolean cpuWeightEnabled = true;
private boolean strictResourceUsageMode = false;

private final String MTAB_FILE = "/proc/mounts";
private final String CGROUPS_FSTYPE = "cgroup";
private final String CONTROLLER_CPU = "cpu";
private final String CPU_PERIOD_US = "cfs_period_us";
private final String CPU_QUOTA_US = "cfs_quota_us";
// default CPU weight per vcore
private final int CPU_DEFAULT_WEIGHT = 1024; // set by kernel
// in the Linux cgroup subsystem, cfs_period_us ranges from 1000 to
// 1000*1000, in microseconds (us)
private final int MAX_QUOTA_US = 1000 * 1000;
private final int MIN_PERIOD_US = 1000;

// mounted cgroup controllers and their groups
private final Map<String, String> controllerPaths; // Controller -> path

private long deleteCgroupTimeout;
private long deleteCgroupDelay;
// package private for testing purposes
Clock clock;

// set to the actual maximum number of cores the CPU control group may use
private float yarnProcessors;
```
1. The init method loads the Configuration, reads the parameter values, mounts the cgroup hierarchy, and creates the control group (hadoop-yarn by default). It then computes the cpu.cfs_quota_us and cpu.cfs_period_us limits and writes them to the corresponding files.
```java
@VisibleForTesting
void init(LinuxContainerExecutor lce, ResourceCalculatorPlugin plugin)
    throws IOException {
  // load the configuration
  initConfig();

  // mount cgroups if requested
  if (cgroupMount && cgroupMountPath != null) {
    ArrayList<String> cgroupKVs = new ArrayList<String>();
    // the value is cpu=/cgroup/cpu
    cgroupKVs.add(CONTROLLER_CPU + "=" + cgroupMountPath + "/" +
        CONTROLLER_CPU);
    // the two arguments are cpu=/cgroup/cpu and hadoop-yarn
    lce.mountCgroups(cgroupKVs, cgroupPrefix);
  }

  // create the /cgroup/cpu/hadoop-yarn directory
  initializeControllerPaths();

  // cap overall usage to the number of cores allocated to YARN,
  // i.e. the configured maximum number of CPU cores the NodeManager may use
  yarnProcessors = NodeManagerHardwareUtils.getContainersCores(plugin, conf);
  int systemProcessors = plugin.getNumProcessors();
  if (systemProcessors != (int) yarnProcessors) {
    LOG.info("YARN containers restricted to " + yarnProcessors + " cores");
    // compute the cpu.cfs_period_us and cpu.cfs_quota_us values
    int[] limits = getOverallLimits(yarnProcessors);
    // update cpu.cfs_period_us
    updateCgroup(CONTROLLER_CPU, "", CPU_PERIOD_US, String.valueOf(limits[0]));
    // update cpu.cfs_quota_us
    updateCgroup(CONTROLLER_CPU, "", CPU_QUOTA_US, String.valueOf(limits[1]));
  } else if (cpuLimitsExist()) {
    LOG.info("Removing CPU constraints for YARN containers.");
    updateCgroup(CONTROLLER_CPU, "", CPU_QUOTA_US, String.valueOf(-1));
  }
}
```
2. The getOverallLimits method computes the limit[] pair, cpu.cfs_period_us and cpu.cfs_quota_us, from the number of cores.
In Linux cgroups, cfs_period_us ranges from 1000 to 1000*1000, in microseconds (us); together with cpu.cfs_quota_us it bounds CPU usage.
In this method quotaUS defaults to 1000*1000, and cfs_period_us is derived from it.
The yarnProcessors parameter is the number of CPU cores available to YARN.
```java
@VisibleForTesting
int[] getOverallLimits(float yarnProcessors) {
  int[] ret = new int[2];

  if (yarnProcessors < 0.01f) {
    throw new IllegalArgumentException("Number of processors can't be <= 0.");
  }

  int quotaUS = MAX_QUOTA_US;
  int periodUS = (int) (MAX_QUOTA_US / yarnProcessors);
  if (yarnProcessors < 1.0f) {
    periodUS = MAX_QUOTA_US;
    quotaUS = (int) (periodUS * yarnProcessors);
    if (quotaUS < MIN_PERIOD_US) {
      LOG.warn("The quota calculated for the cgroup was too low. The minimum value is "
          + MIN_PERIOD_US + ", calculated value is " + quotaUS
          + ". Setting quota to minimum value.");
      quotaUS = MIN_PERIOD_US;
    }
  }

  // cfs_period_us can't be less than 1000 microseconds
  // if the value of periodUS is less than 1000, we can't really use cgroups
  // to limit cpu
  if (periodUS < MIN_PERIOD_US) {
    LOG.warn("The period calculated for the cgroup was too low. The minimum value is "
        + MIN_PERIOD_US + ", calculated value is " + periodUS
        + ". Using all available CPU.");
    periodUS = MAX_QUOTA_US;
    quotaUS = -1;
  }

  ret[0] = periodUS;
  ret[1] = quotaUS;
  return ret;
}
```
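To make the arithmetic concrete, here is a standalone re-implementation of the same period/quota calculation. The class and method names are mine, not part of the Hadoop source, and the sub-1-core clamp is folded into a `Math.max` with the same effect as the original's warning branch:

```java
// Standalone sketch of the cfs_period_us / cfs_quota_us math above.
// Constants mirror the handler's fields; for illustration only.
public class CfsLimits {
    static final int MAX_QUOTA_US = 1000 * 1000; // 1 s
    static final int MIN_PERIOD_US = 1000;       // 1 ms, kernel minimum

    /** Returns {periodUS, quotaUS} for the given number of cores. */
    public static int[] overallLimits(float yarnProcessors) {
        if (yarnProcessors < 0.01f) {
            throw new IllegalArgumentException("Number of processors can't be <= 0.");
        }
        int quotaUS = MAX_QUOTA_US;
        int periodUS = (int) (MAX_QUOTA_US / yarnProcessors);
        if (yarnProcessors < 1.0f) {
            // Less than one core: keep the period at 1 s and shrink the quota,
            // clamped to the kernel minimum.
            periodUS = MAX_QUOTA_US;
            quotaUS = Math.max((int) (periodUS * yarnProcessors), MIN_PERIOD_US);
        }
        if (periodUS < MIN_PERIOD_US) {
            // More than 1000 cores: the period would fall below the kernel
            // minimum, so fall back to "no limit" (quota = -1).
            periodUS = MAX_QUOTA_US;
            quotaUS = -1;
        }
        return new int[] { periodUS, quotaUS };
    }
}
```

With 8 cores this yields period = 125000 us and quota = 1000000 us: in every 125 ms window the group may consume 1 s of CPU time, i.e. 8 cores' worth.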
3. updateCgroup updates the file /controller/groupName/param to the given value.
```java
private void updateCgroup(String controller, String groupName, String param,
                          String value) throws IOException {
  String path = pathForCgroup(controller, groupName);
  param = controller + "." + param;

  if (LOG.isDebugEnabled()) {
    LOG.debug("updateCgroup: " + path + ": " + param + "=" + value);
  }

  PrintWriter pw = null;
  try {
    File file = new File(path + "/" + param);
    Writer w = new OutputStreamWriter(new FileOutputStream(file), "UTF-8");
    pw = new PrintWriter(w);
    pw.write(value);
  } catch (IOException e) {
    throw new IOException("Unable to set " + param + "=" + value
        + " for cgroup at: " + path, e);
  } finally {
    if (pw != null) {
      boolean hasError = pw.checkError();
      pw.close();
      if (hasError) {
        throw new IOException("Unable to set " + param + "=" + value
            + " for cgroup at: " + path);
      }
      if (pw.checkError()) {
        throw new IOException("Error while closing cgroup file " + path);
      }
    }
  }
}
```
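The path convention is worth spelling out: the file written is `<mount>/<controller>/<prefix>[/<group>]/<controller>.<param>`. A small sketch of that construction, hardcoding the default mount path and prefix assumed throughout this article (the real handler resolves them from configuration via pathForCgroup):

```java
// Sketch of the cgroup parameter-file path convention used by updateCgroup.
// MOUNT and PREFIX are assumed defaults, not read from configuration.
public class CgroupPath {
    static final String MOUNT = "/cgroup";
    static final String PREFIX = "hadoop-yarn";

    /** Path of the file holding <controller>.<param> for the given group. */
    public static String paramFile(String controller, String group, String param) {
        String dir = MOUNT + "/" + controller + "/" + PREFIX
                + (group.isEmpty() ? "" : "/" + group);
        return dir + "/" + controller + "." + param;
    }
}
```

For example, `paramFile("cpu", "", "cfs_quota_us")` is the node-wide file `/cgroup/cpu/hadoop-yarn/cpu.cfs_quota_us`, while a non-empty group name addresses a per-container file.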
4. The class implements the LCEResourcesHandler interface.
```java
public interface LCEResourcesHandler extends Configurable {

  void init(LinuxContainerExecutor lce) throws IOException;

  /**
   * Called by the LinuxContainerExecutor before launching the executable
   * inside the container.
   * @param containerId the id of the container being launched
   * @param containerResource the node resources the container will be using
   */
  void preExecute(ContainerId containerId, Resource containerResource)
      throws IOException;

  /**
   * Called by the LinuxContainerExecutor after the executable inside the
   * container has exited (successfully or not).
   * @param containerId the id of the container which was launched
   */
  void postExecute(ContainerId containerId);

  String getResourcesOption(ContainerId containerId);
}
```
The implementations of these methods in CgroupsLCEResourcesHandler:
```java
/*
 * LCE Resources Handler interface
 */
public void preExecute(ContainerId containerId, Resource containerResource)
    throws IOException {
  setupLimits(containerId, containerResource);
}

public void postExecute(ContainerId containerId) {
  clearLimits(containerId);
}

public String getResourcesOption(ContainerId containerId) {
  String containerName = containerId.toString();
  StringBuilder sb = new StringBuilder("cgroups=");

  if (isCpuWeightEnabled()) {
    sb.append(pathForCgroup(CONTROLLER_CPU, containerName) + "/tasks");
    sb.append(",");
  }

  if (sb.charAt(sb.length() - 1) == ',') {
    sb.deleteCharAt(sb.length() - 1);
  }
  return sb.toString();
}
```
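getResourcesOption produces the string handed to the native container-executor: a comma-separated list of cgroup "tasks" files the launched process should write its PID into. A minimal sketch of the resulting format, with the default /cgroup/cpu mount and hadoop-yarn prefix hardcoded (the real method resolves the path through pathForCgroup):

```java
// Sketch of the "cgroups=" option string built by getResourcesOption,
// assuming the default /cgroup/cpu/hadoop-yarn tree.
public class ResourcesOption {
    public static String forContainer(String containerName) {
        StringBuilder sb = new StringBuilder("cgroups=");
        // one entry per enabled controller; only cpu is used in 2.7.2
        sb.append("/cgroup/cpu/hadoop-yarn/").append(containerName).append("/tasks");
        return sb.toString();
    }
}
```

Writing the container process's PID into that tasks file is what actually places the process under the control group's limits.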
5. setupLimits creates a control group for each container and, if strict mode is enabled, also updates that container's CPU limits.
Hadoop creates a control group under /cgroup/cpu/hadoop-yarn for every container; when the task finishes, clearLimits() is called to remove it.
```java
private void setupLimits(ContainerId containerId, Resource containerResource)
    throws IOException {
  String containerName = containerId.toString();

  if (isCpuWeightEnabled()) {
    int containerVCores = containerResource.getVirtualCores();
    createCgroup(CONTROLLER_CPU, containerName);
    int cpuShares = CPU_DEFAULT_WEIGHT * containerVCores;
    updateCgroup(CONTROLLER_CPU, containerName, "shares",
        String.valueOf(cpuShares));
    // if strict mode is enabled, also update cfs_quota_us and
    // cfs_period_us inside each container's control group
    if (strictResourceUsageMode) {
      int nodeVCores = conf.getInt(YarnConfiguration.NM_VCORES,
          YarnConfiguration.DEFAULT_NM_VCORES);
      if (nodeVCores != containerVCores) {
        float containerCPU =
            (containerVCores * yarnProcessors) / (float) nodeVCores;
        int[] limits = getOverallLimits(containerCPU);
        updateCgroup(CONTROLLER_CPU, containerName, CPU_PERIOD_US,
            String.valueOf(limits[0]));
        updateCgroup(CONTROLLER_CPU, containerName, CPU_QUOTA_US,
            String.valueOf(limits[1]));
      }
    }
  }
}
```
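The two per-container numbers are easy to check by hand. cpu.shares scales linearly with vcores (a soft, relative weight), while in strict mode the container's core budget is its vcore fraction of the node's YARN cores. A sketch of just that arithmetic (class and method names are mine):

```java
// Illustration of the per-container values setupLimits derives.
public class ContainerCpuMath {
    static final int CPU_DEFAULT_WEIGHT = 1024;

    /** cpu.shares: a relative weight, 1024 per vcore. */
    public static int cpuShares(int containerVCores) {
        return CPU_DEFAULT_WEIGHT * containerVCores;
    }

    /** Cores the container may use in strict mode (fed to getOverallLimits). */
    public static float strictContainerCpu(int containerVCores, int nodeVCores,
                                           float yarnProcessors) {
        return (containerVCores * yarnProcessors) / (float) nodeVCores;
    }
}
```

For example, a 2-vcore container on a node configured with 8 vcores and 8 physical cores gets cpu.shares = 2048 and, in strict mode, a hard 2-core quota.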
6. clearLimits() deletes a container's control group after the container finishes.
```java
private void clearLimits(ContainerId containerId) {
  if (isCpuWeightEnabled()) {
    deleteCgroup(pathForCgroup(CONTROLLER_CPU, containerId.toString()));
  }
}
```
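Note that the kernel only allows rmdir on a cgroup directory once every task has left it, which is why the class carries deleteCgroupTimeout and deleteCgroupDelay fields: deletion is retried for a while rather than attempted once. A hypothetical sketch of that retry loop (constants and names are mine; the real deleteCgroup reads its timeout and delay from configuration and uses the injected Clock):

```java
// Hypothetical sketch of deleting a per-container cgroup with retries.
import java.io.File;

public class CgroupCleanupSketch {
    static final long DELETE_TIMEOUT_MS = 1000; // assumed value
    static final long DELETE_DELAY_MS = 20;     // assumed value

    /** Retries the delete until it succeeds or the timeout elapses. */
    public static boolean deleteCgroupWithRetry(File cgroupDir) {
        long deadline = System.currentTimeMillis() + DELETE_TIMEOUT_MS;
        while (System.currentTimeMillis() < deadline) {
            // rmdir on a cgroup succeeds only once it holds no tasks
            if (cgroupDir.delete() || !cgroupDir.exists()) {
                return true;
            }
            try {
                Thread.sleep(DELETE_DELAY_MS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }
}
```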