Version: Hadoop 2.7.2 + CentOS 6.5

Current state

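To understand Hadoop's cgroup support you first need some familiarity with Linux cgroups. YARN currently schedules on memory and CPU, but CPU is not limited as strictly as memory: a single container can occupy far more CPU than it was allocated.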

To enable cgroups in Hadoop, see my blog post "hadoop启用cgroup" (Enabling cgroups in Hadoop).

There is also a separate post introducing cgroups.

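In this Hadoop version, CPU limiting is built on the resource control of the cgroup cpu subsystem, and it supports a strict and a non-strict mode. In non-strict mode, all containers on a NodeManager node together cannot use more than the total CPU configured as the limit, although a single container may still use idle CPU beyond its own share. In strict mode, each container is capped at the CPU it was configured with, regardless of whether the node as a whole has idle CPU.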


Hadoop cgroup support requires the LinuxContainerExecutor, with the executor's resources handler class set to org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler.
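As a rough illustration of that requirement, the sketch below prints the yarn-site.xml entries that would select the LinuxContainerExecutor and the cgroups resources handler. The property names are the standard YARN keys; the values for the hierarchy, mount flag and mount path (/hadoop-yarn, true, /cgroup) are assumptions chosen to match the paths used in this post, and the class CgroupYarnSiteSketch is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: builds and prints the yarn-site.xml properties
// that turn on cgroup-based CPU control in the NodeManager.
class CgroupYarnSiteSketch {

    static Map<String, String> cgroupProperties(boolean strict) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("yarn.nodemanager.container-executor.class",
                "org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor");
        props.put("yarn.nodemanager.linux-container-executor.resources-handler.class",
                "org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler");
        // Assumed example values; adjust them to the local cgroup layout.
        props.put("yarn.nodemanager.linux-container-executor.cgroups.hierarchy", "/hadoop-yarn");
        props.put("yarn.nodemanager.linux-container-executor.cgroups.mount", "true");
        props.put("yarn.nodemanager.linux-container-executor.cgroups.mount-path", "/cgroup");
        // false = non-strict mode, true = strict mode.
        props.put("yarn.nodemanager.linux-container-executor.cgroups.strict-resource-usage",
                String.valueOf(strict));
        return props;
    }

    static String toXml(Map<String, String> props) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : props.entrySet()) {
            sb.append("<property>\n")
              .append("  <name>").append(e.getKey()).append("</name>\n")
              .append("  <value>").append(e.getValue()).append("</value>\n")
              .append("</property>\n");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(toXml(cgroupProperties(false)));
    }
}
```

Passing true to cgroupProperties switches the last property to strict mode; everything else stays the same.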

Fields of this class:

    private Configuration conf;
    private String cgroupPrefix;
    private boolean cgroupMount;
    private String cgroupMountPath;

    private boolean cpuWeightEnabled = true;
    private boolean strictResourceUsageMode = false;

    private final String MTAB_FILE = "/proc/mounts";
    private final String CGROUPS_FSTYPE = "cgroup";
    private final String CONTROLLER_CPU = "cpu";
    private final String CPU_PERIOD_US = "cfs_period_us";
    private final String CPU_QUOTA_US = "cfs_quota_us";
    // CPU weight value
    private final int CPU_DEFAULT_WEIGHT = 1024; // set by kernel
    // In Linux cgroups, cfs_period_us ranges from 1000 to 1000*1000, in microseconds (us)
    private final int MAX_QUOTA_US = 1000 * 1000;
    private final int MIN_PERIOD_US = 1000;
    // Mounted cgroup controllers and their paths
    private final Map<String, String> controllerPaths; // Controller -> path

    private long deleteCgroupTimeout;
    private long deleteCgroupDelay;
    // package private for testing purposes
    Clock clock;

    // Set to the maximum number of cores the YARN CPU control group is actually allowed to use
    private float yarnProcessors;

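1. The init method loads the Configuration and reads the parameter values, mounts the cgroup, and sets up the control group (hadoop-yarn by default). When YARN is restricted to fewer cores than the machine has, it then computes the cpu.cfs_quota_us and cpu.cfs_period_us limits and writes them to the corresponding files.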

    @VisibleForTesting
    void init(LinuxContainerExecutor lce, ResourceCalculatorPlugin plugin)
        throws IOException {
      // load the configuration
      initConfig();

      // mount cgroups if requested
      if (cgroupMount && cgroupMountPath != null) {
        ArrayList<String> cgroupKVs = new ArrayList<String>();
        // the entry is, e.g., cpu=/cgroup/cpu
        cgroupKVs.add(CONTROLLER_CPU + "=" + cgroupMountPath + "/" +
            CONTROLLER_CPU);
        // the two arguments are, e.g., cpu=/cgroup/cpu and hadoop-yarn
        lce.mountCgroups(cgroupKVs, cgroupPrefix);
      }

      // resolve the controller path, e.g. /cgroup/cpu/hadoop-yarn
      initializeControllerPaths();

      // cap overall usage to the number of cores allocated to YARN,
      // i.e. the maximum number of cores the NodeManager is configured to use
      yarnProcessors = NodeManagerHardwareUtils.getContainersCores(plugin, conf);
      int systemProcessors = plugin.getNumProcessors();
      if (systemProcessors != (int) yarnProcessors) {
        LOG.info("YARN containers restricted to " + yarnProcessors + " cores");
        // compute the values of cpu.cfs_period_us and cpu.cfs_quota_us
        int[] limits = getOverallLimits(yarnProcessors);
        // update cpu.cfs_period_us
        updateCgroup(CONTROLLER_CPU, "", CPU_PERIOD_US, String.valueOf(limits[0]));
        // update cpu.cfs_quota_us
        updateCgroup(CONTROLLER_CPU, "", CPU_QUOTA_US, String.valueOf(limits[1]));
      } else if (cpuLimitsExist()) {
        LOG.info("Removing CPU constraints for YARN containers.");
        updateCgroup(CONTROLLER_CPU, "", CPU_QUOTA_US, String.valueOf(-1));
      }
    }

2. The getOverallLimits method computes the two limits[] values from the number of cores: cpu.cfs_period_us and cpu.cfs_quota_us.

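In Linux cgroups, cfs_period_us takes values from 1000 to 1000*1000, in microseconds (us). It works together with cpu.cfs_quota_us to limit CPU: a group may consume at most quota microseconds of CPU time in each period, so quota / period is the number of cores it can use.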

In this method quotaUS defaults to 1000*1000, and the value of cfs_period_us is then derived from it.
The parameter yarnProcessors is the number of CPU cores available.

    @VisibleForTesting
    int[] getOverallLimits(float yarnProcessors) {

      int[] ret = new int[2];

      if (yarnProcessors < 0.01f) {
        throw new IllegalArgumentException("Number of processors can't be <= 0.");
      }

      int quotaUS = MAX_QUOTA_US;
      int periodUS = (int) (MAX_QUOTA_US / yarnProcessors);
      if (yarnProcessors < 1.0f) {
        periodUS = MAX_QUOTA_US;
        quotaUS = (int) (periodUS * yarnProcessors);
        if (quotaUS < MIN_PERIOD_US) {
          LOG.warn("The quota calculated for the cgroup was too low. The minimum value is "
              + MIN_PERIOD_US + ", calculated value is " + quotaUS
              + ". Setting quota to minimum value.");
          quotaUS = MIN_PERIOD_US;
        }
      }

      // cfs_period_us can't be less than 1000 microseconds
      // if the value of periodUS is less than 1000, we can't really use cgroups
      // to limit cpu
      if (periodUS < MIN_PERIOD_US) {
        LOG.warn("The period calculated for the cgroup was too low. The minimum value is "
            + MIN_PERIOD_US + ", calculated value is " + periodUS
            + ". Using all available CPU.");
        periodUS = MAX_QUOTA_US;
        quotaUS = -1;
      }

      ret[0] = periodUS;
      ret[1] = quotaUS;
      return ret;
    }
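To make the arithmetic concrete, here is a small standalone sketch that repeats the same calculation outside Hadoop and prints the result for a few core counts. The class CfsLimitsSketch and the method name overallLimits are hypothetical, logging is omitted, and the sample core counts are arbitrary.

```java
// Standalone re-implementation of the getOverallLimits arithmetic,
// for illustration only (no Hadoop classes, no logging).
class CfsLimitsSketch {
    static final int MAX_QUOTA_US = 1000 * 1000;
    static final int MIN_PERIOD_US = 1000;

    /** Returns {cfs_period_us, cfs_quota_us} for the given number of cores. */
    static int[] overallLimits(float cores) {
        if (cores < 0.01f) {
            throw new IllegalArgumentException("Number of processors can't be <= 0.");
        }
        int quotaUs = MAX_QUOTA_US;
        int periodUs = (int) (MAX_QUOTA_US / cores);
        if (cores < 1.0f) {
            // Less than one core: keep the period at its maximum and shrink the quota.
            periodUs = MAX_QUOTA_US;
            quotaUs = Math.max((int) (periodUs * cores), MIN_PERIOD_US);
        }
        if (periodUs < MIN_PERIOD_US) {
            // Period would fall below the kernel minimum: give up limiting.
            periodUs = MAX_QUOTA_US;
            quotaUs = -1;
        }
        return new int[] {periodUs, quotaUs};
    }

    public static void main(String[] args) {
        for (float cores : new float[] {0.5f, 1f, 4f, 2000f}) {
            int[] l = overallLimits(cores);
            System.out.println(cores + " cores -> period=" + l[0] + " quota=" + l[1]);
        }
    }
}
```

For 4 cores the period is 250000 and the quota 1000000 (a ratio of 4). For 0.5 cores the period stays at 1000000 and the quota drops to 500000. For 2000 cores the computed period of 500 is below the minimum, so the quota becomes -1, which means no limit.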

3. updateCgroup writes value to the param file of the group groupName under the given controller, for example /cgroup/cpu/hadoop-yarn/cpu.cfs_quota_us.

    private void updateCgroup(String controller, String groupName, String param,
                              String value) throws IOException {
      String path = pathForCgroup(controller, groupName);
      param = controller + "." + param;

      if (LOG.isDebugEnabled()) {
        LOG.debug("updateCgroup: " + path + ": " + param + "=" + value);
      }

      PrintWriter pw = null;
      try {
        File file = new File(path + "/" + param);
        Writer w = new OutputStreamWriter(new FileOutputStream(file), "UTF-8");
        pw = new PrintWriter(w);
        pw.write(value);
      } catch (IOException e) {
        throw new IOException("Unable to set " + param + "=" + value +
            " for cgroup at: " + path, e);
      } finally {
        if (pw != null) {
          boolean hasError = pw.checkError();
          pw.close();
          if(hasError) {
            throw new IOException("Unable to set " + param + "=" + value +
                " for cgroup at: " + path);
          }
          if(pw.checkError()) {
            throw new IOException("Error while closing cgroup file " + path);
          }
        }
      }
    }
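The file naming is easy to try without a real cgroup mount. The sketch below writes a value to <dir>/<controller>.<param> in a temporary directory that stands in for a cgroup directory. CgroupWriteSketch and its methods are hypothetical, it uses java.nio instead of the PrintWriter in the source, and it throws unchecked exceptions only to keep the example short.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Illustration of how a cgroup parameter file is named and written.
class CgroupWriteSketch {

    /** Writes value to <cgroupDir>/<controller>.<param> and returns the file path. */
    static Path writeParam(Path cgroupDir, String controller, String param, String value) {
        Path file = cgroupDir.resolve(controller + "." + param);
        try {
            return Files.write(file, value.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException("Unable to set " + file.getFileName()
                    + "=" + value + " for cgroup at: " + cgroupDir, e);
        }
    }

    /** Reads a parameter file back as a string. */
    static String readParam(Path file) {
        try {
            return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Creates a scratch directory that stands in for a real cgroup directory. */
    static Path scratchGroup() {
        try {
            return Files.createTempDirectory("hadoop-yarn");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path written = writeParam(scratchGroup(), "cpu", "cfs_quota_us", "500000");
        System.out.println(written.getFileName() + " = " + readParam(written));
    }
}
```

On a real node the directory would be the cgroup path itself, and the kernel, not the file system, decides whether the written value is accepted.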

4. This class implements the LCEResourcesHandler interface.

    public interface LCEResourcesHandler extends Configurable {

      void init(LinuxContainerExecutor lce) throws IOException;

      /**
       * Called by the LinuxContainerExecutor before launching the executable
       * inside the container.
       * @param containerId the id of the container being launched
       * @param containerResource the node resources the container will be using
       */
      void preExecute(ContainerId containerId, Resource containerResource)
          throws IOException;

      /**
       * Called by the LinuxContainerExecutor after the executable inside the
       * container has exited (successfully or not).
       * @param containerId the id of the container which was launched
       */
      void postExecute(ContainerId containerId);

      String getResourcesOption(ContainerId containerId);
    }

The methods as implemented by CgroupsLCEResourcesHandler:

    /*
     * LCE Resources Handler interface
     */

    public void preExecute(ContainerId containerId, Resource containerResource)
        throws IOException {
      setupLimits(containerId, containerResource);
    }

    public void postExecute(ContainerId containerId) {
      clearLimits(containerId);
    }

    public String getResourcesOption(ContainerId containerId) {
      String containerName = containerId.toString();

      StringBuilder sb = new StringBuilder("cgroups=");

      if (isCpuWeightEnabled()) {
        sb.append(pathForCgroup(CONTROLLER_CPU, containerName) + "/tasks");
        sb.append(",");
      }

      if (sb.charAt(sb.length() - 1) == ',') {
        sb.deleteCharAt(sb.length() - 1);
      }

      return sb.toString();
    }
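For a sense of what getResourcesOption returns, the sketch below builds the same kind of string from plain values. It assumes the cgroup path is composed as controller path + "/" + prefix + "/" + container name, and that only the cpu controller is enabled. The class name, the /cgroup/cpu mount point and the container id are made-up examples.

```java
// Illustration of the "cgroups=" option string handed to the container executor.
class ResourcesOptionSketch {

    /** Builds cgroups=<controllerPath>/<prefix>/<container>/tasks for a single controller. */
    static String resourcesOption(String controllerPath, String cgroupPrefix, String containerName) {
        return "cgroups=" + controllerPath + "/" + cgroupPrefix + "/" + containerName + "/tasks";
    }

    public static void main(String[] args) {
        // Hypothetical mount point and container id.
        System.out.println(resourcesOption(
                "/cgroup/cpu", "hadoop-yarn", "container_1460000000000_0001_01_000002"));
    }
}
```

The tasks file is the cgroup file that lists the processes belonging to the group, so passing its path lets the launcher place the container process into the group that setupLimits created.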

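5. The setupLimits method creates a control group for each container and, in strict mode, also updates that container's CPU limits.

Hadoop creates a control group for every container under /cgroup/cpu/hadoop-yarn, and once the task finishes it calls clearLimits() to delete that group.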

    private void setupLimits(ContainerId containerId,
                             Resource containerResource) throws IOException {
      String containerName = containerId.toString();

      if (isCpuWeightEnabled()) {
        int containerVCores = containerResource.getVirtualCores();
        createCgroup(CONTROLLER_CPU, containerName);
        int cpuShares = CPU_DEFAULT_WEIGHT * containerVCores;
        updateCgroup(CONTROLLER_CPU, containerName, "shares",
            String.valueOf(cpuShares));
        // In strict mode, also set cfs_quota_us and cfs_period_us
        // inside each container's control group
        if (strictResourceUsageMode) {
          int nodeVCores =
              conf.getInt(YarnConfiguration.NM_VCORES,
                YarnConfiguration.DEFAULT_NM_VCORES);
          if (nodeVCores != containerVCores) {
            float containerCPU =
                (containerVCores * yarnProcessors) / (float) nodeVCores;
            int[] limits = getOverallLimits(containerCPU);
            updateCgroup(CONTROLLER_CPU, containerName, CPU_PERIOD_US,
                String.valueOf(limits[0]));
            updateCgroup(CONTROLLER_CPU, containerName, CPU_QUOTA_US,
                String.valueOf(limits[1]));
          }
        }
      }
    }
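The two numbers computed here can be checked by hand. The sketch below repeats the cpu.shares and containerCPU formulas from setupLimits for a made-up node: 16 vcores configured for the NodeManager and 8 physical cores available to YARN. ContainerCpuSketch and the sample numbers are assumptions for illustration.

```java
// Worked example of the per-container arithmetic in setupLimits.
class ContainerCpuSketch {
    static final int CPU_DEFAULT_WEIGHT = 1024;

    /** cpu.shares written for a container: 1024 per vcore. */
    static int cpuShares(int containerVCores) {
        return CPU_DEFAULT_WEIGHT * containerVCores;
    }

    /** Physical cores a container may use in strict mode. */
    static float containerCpu(int containerVCores, float yarnProcessors, int nodeVCores) {
        return (containerVCores * yarnProcessors) / (float) nodeVCores;
    }

    public static void main(String[] args) {
        int nodeVCores = 16;       // assumed NodeManager vcore setting
        float yarnProcessors = 8f; // assumed physical cores available to YARN
        for (int vcores : new int[] {1, 2, 4}) {
            System.out.println(vcores + " vcores -> shares=" + cpuShares(vcores)
                    + ", cores=" + containerCpu(vcores, yarnProcessors, nodeVCores));
        }
    }
}
```

With these numbers a 2-vcore container gets cpu.shares of 2048 and a strict limit of 1.0 physical core, and a 4-vcore container gets 4096 shares and 2.0 cores. The core count is then what getOverallLimits turns into a period and quota pair.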

6. The clearLimits() method deletes the container's control group after the container finishes.

    private void clearLimits(ContainerId containerId) {
      if (isCpuWeightEnabled()) {
        deleteCgroup(pathForCgroup(CONTROLLER_CPU, containerId.toString()));
      }
    }
