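PAI's frameworkLauncher further refines node allocation: it can choose the host of the node a task runs on. Starting with version 2.6, Hadoop supports NodeLabel: a NodeManager can be given labels when it is configured, and a container request can carry a label as well, so the container is allocated on a node that has that label. Reference: http://dongxicheng.org/mapreduce-nextgen/hadoop-yarn-label-based-scheduling/

However, the node label feature does not support locality, so frameworkLauncher uses the hostName as a kind of node label.

1. Hadoop's ContainerRequest() constructor takes six parameters that describe what a container needs: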

    /**
     * Instantiates a {@link ContainerRequest} with the given constraints.
     *
     * @param capability
     *          The {@link Resource} to be requested for each container.
     *          Note: Resource describes the basic resources; in PAI these are cpu, mem and gpu.
     * @param nodes
     *          Any hosts to request that the containers are placed on.
     *          Note: the hosts the container should run on.
     * @param racks
     *          Any racks to request that the containers are placed on. The
     *          racks corresponding to any hosts requested will be automatically
     *          added to this list.
     *          Note: the racks the container should run on; filled in automatically when hosts are given.
     * @param priority
     *          The priority at which to request the containers. Higher
     *          priorities have lower numerical values.
     *          Note: the priority of the container.
     * @param relaxLocality
     *          If true, containers for this request may be assigned on hosts
     *          and racks other than the ones explicitly requested.
     *          Note: whether the container may be placed on a host other than the requested ones.
     * @param nodeLabelsExpression
     *          Set node labels to allocate resource, now we only support
     *          asking for only a single node label
     *          Note: the node label expression.
     */
    public ContainerRequest(Resource capability, String[] nodes,
        String[] racks, Priority priority, boolean relaxLocality,
        String nodeLabelsExpression) {
      // Validate request
      Preconditions.checkArgument(capability != null,
          "The Resource to be requested for each container " +
              "should not be null ");
      Preconditions.checkArgument(priority != null,
          "The priority at which to request containers should not be null ");
      Preconditions.checkArgument(
          !(!relaxLocality && (racks == null || racks.length == 0)
              && (nodes == null || nodes.length == 0)),
          "Can't turn off locality relaxation on a " +
              "request with no location constraints");
      this.capability = capability;
      this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null);
      this.racks = (racks != null ? ImmutableList.copyOf(racks) : null);
      this.priority = priority;
      this.relaxLocality = relaxLocality;
      this.nodeLabelsExpression = nodeLabelsExpression;
    }


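2. The HadoopUtils.toContainerRequest() method in frameworkLauncher takes four parameters to describe the container's needs. If hostName is set (non-null and not the "any" location), it overrides nodeLabel. hostName corresponds to the nodes parameter above.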

    /*
     * If hostName is set, it overrides nodeLabel.
     */
    public static ContainerRequest toContainerRequest(
        ResourceDescriptor resource, Priority priority, String nodeLabel, String hostName) throws Exception {
      if (hostName != null && !ResourceRequest.isAnyLocation(hostName)) {
        return new ContainerRequest(
            resource.toResource(), new String[]{hostName}, new String[]{}, priority, false, null);
      } else {
        return new ContainerRequest(
            resource.toResource(), new String[]{}, new String[]{}, priority, true, nodeLabel);
      }
    }
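
The override rule can be looked at in isolation with a small stand-alone sketch. It does not use the Hadoop classes: `SimpleRequest` and `HostOverrideSketch` are hypothetical stand-ins introduced only for illustration, racks are left out, and the `"*"` constant assumes that `ResourceRequest.isAnyLocation` compares against YARN's `ResourceRequest.ANY` wildcard.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for AMRMClient.ContainerRequest, reduced to the
// three fields that toContainerRequest() actually decides on (racks omitted).
final class SimpleRequest {
  final List<String> nodes;
  final boolean relaxLocality;
  final String nodeLabel;

  SimpleRequest(List<String> nodes, boolean relaxLocality, String nodeLabel) {
    // Same idea as the Hadoop precondition: strict locality needs a location.
    if (!relaxLocality && nodes.isEmpty()) {
      throw new IllegalArgumentException(
          "Can't turn off locality relaxation on a request with no location constraints");
    }
    this.nodes = nodes;
    this.relaxLocality = relaxLocality;
    this.nodeLabel = nodeLabel;
  }
}

final class HostOverrideSketch {
  // Assumed to match ResourceRequest.ANY, the "any host" wildcard.
  static final String ANY = "*";

  static SimpleRequest toRequest(String nodeLabel, String hostName) {
    if (hostName != null && !ANY.equals(hostName)) {
      // A concrete host wins: pin to it strictly and drop the label.
      return new SimpleRequest(Collections.singletonList(hostName), false, null);
    }
    // No concrete host: stay relaxed and let the node label constrain placement.
    return new SimpleRequest(Collections.<String>emptyList(), true, nodeLabel);
  }

  public static void main(String[] args) {
    SimpleRequest pinned = toRequest("gpu", "node-01");
    SimpleRequest labeled = toRequest("gpu", null);
    System.out.println(pinned.nodes + " relax=" + pinned.relaxLocality + " label=" + pinned.nodeLabel);
    System.out.println(labeled.nodes + " relax=" + labeled.relaxLocality + " label=" + labeled.nodeLabel);
  }
}
```

Note that the host branch is the only one that sets relaxLocality to false, which is consistent with the constructor's rule that strict locality requires at least one location.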


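3. The setupContainerRequest() method in frameworkLauncher builds the concrete request for one container.

It follows three principles when requesting resources for a Task:

1) If the request exactly matches the Task's requirement, the task simply waits for allocation, with containerRequestTimeoutSec = -1.

2) If the request is too relaxed, request again.

3) If the request is too strict, let this request time out and then request again.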

    private ContainerRequest setupContainerRequest(TaskStatus taskStatus) throws Exception {
      String taskRoleName = taskStatus.getTaskRoleName();
      Priority requestPriority = statusManager.getNextContainerRequestPriority();
      String requestNodeLabel = requestManager.getTaskRolePlatParams(taskRoleName).getTaskNodeLabel();
      ResourceDescriptor requestResource = requestManager.getTaskResource(taskRoleName);
      ResourceDescriptor maxResource = conf.getMaxResource();

      // Check whether the cluster's maximum resource can satisfy the request.
      if (!ResourceDescriptor.fitsIn(requestResource, maxResource)) {
        LOGGER.logWarning(
            "Request Resource does not fit in the Max Resource configured in current cluster, " +
                "request may fail or never get satisfied: " +
                "Request Resource: [%s], Max Resource: [%s]",
            requestResource, maxResource);
      }

      // Check whether GPUs or ports are requested.
      if (requestResource.getGpuNumber() > 0 || requestResource.getPortNumber() > 0) {
        updateNodeReports(yarnClient.getNodeReports(NodeState.RUNNING));
        SelectionResult selectionResult = selectionManager.selectSingleNode(taskRoleName);
        ResourceDescriptor optimizedRequestResource = selectionResult.getOptimizedResource();
        if (selectionResult.getNodeHosts().size() > 0) {
          return HadoopUtils.toContainerRequest(
              optimizedRequestResource, requestPriority, null, selectionResult.getNodeHosts().get(0));
        }
        return HadoopUtils.toContainerRequest(optimizedRequestResource, requestPriority, requestNodeLabel, null);
      }
      return HadoopUtils.toContainerRequest(requestResource, requestPriority, requestNodeLabel, null);
    }


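The code above only checks whether GPU resources or ports are requested; if so, it uses selectionManager to help build the request. It does not appear to implement the three principles listed above, which remains an open question.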

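4. SelectionManager helps choose nodes.

In SelectionManager, the following variables in the "REGION StateVariable" block record the resources available in the cluster and the resources that taskRoles have already requested, as a reference for tasks that request resources later.
  • Map<String, Node> allNodes: the key is the hostName
  • Map<String, ResourceDescriptor> localTriedResource:
  • Map<String, List<ValueRange>> previousRequestedPorts: the key is the taskRoleName; records which ports the previous task of that taskRole used
  • List<String> filteredNodes: each entry is a hostName; used by the select operation below.
  • int reusedPortsTimes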

The entry point used in the previous step is selectSingleNode().

    public synchronized SelectionResult selectSingleNode(String taskRoleName) throws NotAvailableException {
      SelectionResult results = select(taskRoleName);
      if (results.getNodeHosts().size() > 0) {
        // Random pick a host from the result set to avoid conflicted requests for concurrent container requests from different jobs
        ResourceDescriptor optimizedRequestResource = results.getOptimizedResource();
        String candidateNode = results.getNodeHosts().get(CommonUtils.getRandomNumber(0, results.getNodeHosts().size() - 1));
        optimizedRequestResource.setGpuAttribute(results.getGpuAttribute(candidateNode));

        // re-create single node result object.
        SelectionResult result = new SelectionResult();
        result.addSelection(candidateNode, results.getGpuAttribute(candidateNode), results.getOverlapPorts());
        result.setOptimizedResource(optimizedRequestResource);
        LOGGER.logDebug("selectSingleNode: Selected: " + candidateNode + " optimizedRequestResource:" + optimizedRequestResource);
        return result;
      }
      return results;
    }


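It first calls select() to obtain a SelectionResult that holds a list of candidate hosts, then randomly picks one host from that list as the final selection.

Below are the two select overloads. The first one prepares the port information; the second one does the real work of filtering the nodes against the requirements and performing the selection.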
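
A minimal sketch of that random pick, using only the JDK. `pickRandomHost` is a hypothetical helper, and the sketch assumes that `CommonUtils.getRandomNumber(min, max)` treats both bounds as inclusive (the `size() - 1` upper bound in the call suggests this, but its source is not shown in this post).

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

final class RandomHostPick {
  // Hypothetical helper: pick one host uniformly at random from the candidates.
  // Returns null when there is no candidate, standing in for the empty-result case.
  static String pickRandomHost(List<String> candidateHosts) {
    if (candidateHosts.isEmpty()) {
      return null;
    }
    // nextInt(bound) excludes bound, so every index 0..size-1 can be chosen.
    int index = ThreadLocalRandom.current().nextInt(candidateHosts.size());
    return candidateHosts.get(index);
  }

  public static void main(String[] args) {
    List<String> hosts = Arrays.asList("node-01", "node-02", "node-03");
    System.out.println("picked: " + pickRandomHost(hosts));
  }
}
```

As the comment in selectSingleNode() says, picking at random rather than always taking the first candidate makes it less likely that concurrent requests from different jobs all target the same node.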

    public synchronized SelectionResult select(String taskRoleName)
        throws NotAvailableException {
      ResourceDescriptor requestResource = requestManager.getTaskResource(taskRoleName);
      LOGGER.logInfo(
          "select: TaskRole: [%s] Resource: [%s]", taskRoleName, requestResource);
      String requestNodeLabel = requestManager.getTaskRolePlatParams(taskRoleName).getTaskNodeLabel();
      String requestNodeGpuType = requestManager.getTaskRolePlatParams(taskRoleName).getTaskNodeGpuType();
      Map<String, NodeConfiguration> configuredNodes = requestManager.getClusterConfiguration().getNodes();
      Boolean samePortAllocation = requestManager.getTaskRolePlatParams(taskRoleName).getSamePortAllocation();
      int startStatesTaskCount = statusManager.getTaskStatus(TaskStateDefinition.START_STATES, taskRoleName).size();

      /* Prefer to use previous successfully associated ports. if no associated ports, try to reuse the "Requesting" ports.
       * Open question: what exactly does reusing the "Requesting" ports mean?
       */
      List<ValueRange> reusedPorts = new ArrayList<>();
      if (samePortAllocation) {
        reusedPorts = statusManager.getAnyLiveAssociatedContainerPorts(taskRoleName);
        /* Condition 1: ValueRangeUtils.getValueNumber(reusedPorts) <= 0 means no live associated container
         * provided ports to reuse.
         * Condition 2: previousRequestedPorts.containsKey(taskRoleName) means the taskRole has requested ports
         * before. SelectionManager keeps this record, and only for the most recent request.
         */
        if (ValueRangeUtils.getValueNumber(reusedPorts) <= 0 && previousRequestedPorts.containsKey(taskRoleName)) {
          reusedPorts = previousRequestedPorts.get(taskRoleName);
          // the cache only guide the next task to use previous requesting port.
          previousRequestedPorts.remove(taskRoleName);
        }
      }
      /* The real select takes six parameters:
       * 1. ResourceDescriptor requestResource: the requested resource (cpu, mem, gpu)
       * 2. String requestNodeLabel: the requested node label
       * 3. String requestNodeGpuType: the requested GPU type
       * 4. int startStatesTaskCount: number of tasks of this taskRole that are in a start state
       * 5. List<ValueRange> reusedPorts: the ports that can be reused
       * 6. Map<String, NodeConfiguration> configuredNodes: the configured cluster nodes (hostname, gpuType)
       */
      SelectionResult result = select(requestResource, requestNodeLabel, requestNodeGpuType, startStatesTaskCount, reusedPorts, configuredNodes);

      // If the current task is not the last one, record its ports in previousRequestedPorts.
      if (samePortAllocation) {
        // This startStatesTaskCount also count current task. StartStatesTaskCount == 1 means current task is the last task.
        // reusedPortsTimes is used to avoid startStatesTaskCount not decrease in the situation of timeout tasks back to startStates.
        if (startStatesTaskCount > 1) {
          if (reusedPortsTimes == 0) {
            reusedPortsTimes = startStatesTaskCount;
          }
          // If there has other tasks waiting, push current ports to previousRequestedPorts.
          if (reusedPortsTimes > 1) {
            previousRequestedPorts.put(taskRoleName, result.getOptimizedResource().getPortRanges());
          }
          reusedPortsTimes--;
        }
      }
      return result;
    }
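
The port bookkeeping around the inner select call is easier to follow when separated from the rest. The sketch below is a simplified model under stated assumptions, not the launcher's code: `PortReuseSketch` and its two methods are hypothetical names, ports are plain `Integer` values instead of `ValueRange` objects, and the caller passes in what `getAnyLiveAssociatedContainerPorts` would have returned.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of the port bookkeeping in select(taskRoleName).
final class PortReuseSketch {
  private final Map<String, List<Integer>> previousRequestedPorts = new HashMap<>();
  private int reusedPortsTimes = 0;

  // Before selection: decide which ports, if any, the task should reuse.
  List<Integer> resolveReusedPorts(String taskRoleName, List<Integer> liveContainerPorts) {
    if (!liveContainerPorts.isEmpty()) {
      return liveContainerPorts; // ports of a live container take precedence
    }
    // remove() hands back the cached entry and clears it, so it guides one task only.
    List<Integer> cached = previousRequestedPorts.remove(taskRoleName);
    return cached != null ? cached : new ArrayList<Integer>();
  }

  // After selection: remember the chosen ports while other tasks are still waiting.
  void recordSelectedPorts(String taskRoleName, List<Integer> selectedPorts, int startStatesTaskCount) {
    if (startStatesTaskCount > 1) {
      if (reusedPortsTimes == 0) {
        reusedPortsTimes = startStatesTaskCount; // start a new countdown
      }
      if (reusedPortsTimes > 1) {
        previousRequestedPorts.put(taskRoleName, selectedPorts);
      }
      reusedPortsTimes--; // bounds the reuse even if the task count does not shrink
    }
  }

  public static void main(String[] args) {
    PortReuseSketch sketch = new PortReuseSketch();
    List<Integer> none = new ArrayList<>();
    System.out.println("task 1 reuses " + sketch.resolveReusedPorts("worker", none));
    sketch.recordSelectedPorts("worker", Arrays.asList(8080), 3);
    System.out.println("task 2 reuses " + sketch.resolveReusedPorts("worker", none));
  }
}
```

In this model the first task finds nothing to reuse, its selected ports are cached because more tasks are waiting, and the second task picks them up exactly once.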


    public synchronized SelectionResult select(ResourceDescriptor requestResource, String requestNodeLabel, String requestNodeGpuType,
        int startStatesTaskCount, List<ValueRange> reusedPorts, Map<String, NodeConfiguration> configuredNodes) throws NotAvailableException {

      LOGGER.logInfo(
          "select: Request: Resource: [%s], NodeLabel: [%s], NodeGpuType: [%s], StartStatesTaskCount: [%d], ReusedPorts: [%s]",
          requestResource, requestNodeLabel, requestNodeGpuType, startStatesTaskCount, CommonExts.toString(reusedPorts));

      // Add all available nodes to filteredNodes.
      initFilteredNodes();
      // Filter out nodes whose nodeLabel does not match.
      filterNodesByNodeLabel(requestNodeLabel);
      // Filter out unsuitable GPU nodes: those missing from the configured ClusterConfiguration
      // and those that do not match requestNodeGpuType.
      filterNodesByGpuType(configuredNodes, requestNodeGpuType);
      // Whether a job that needs no GPU may run on a GPU node.
      if (!conf.getAmAllowNoneGpuJobOnGpuNode()) {
        int jobTotalRequestGpu = requestManager.getTotalGpuCount();
        filterNodesForNoneGpuJob(jobTotalRequestGpu);
      }

      // Use snakeyaml to create a new ResourceDescriptor that is a deep copy of the requestResource passed in.
      ResourceDescriptor optimizedRequestResource = YamlUtils.deepCopy(requestResource, ResourceDescriptor.class);
      /* The requested port number is greater than zero, but the number computed from the port ranges is <= 0,
       * i.e. ports are requested but none has been allocated yet and no range was specified.
       * Open question: why can reusedPorts be set directly here without checking them against filteredNodes?
       * Is there some mechanism that guarantees they are valid?
       */
      if (optimizedRequestResource.getPortNumber() > 0 && ValueRangeUtils.getValueNumber(optimizedRequestResource.getPortRanges()) <= 0) {
        if (ValueRangeUtils.getValueNumber(reusedPorts) > 0) {
          LOGGER.logInfo(
              "select: reuse pre-selected ports: %s", CommonExts.toString(reusedPorts));
          optimizedRequestResource.setPortRanges(reusedPorts);
        } else {
          List<ValueRange> portRanges = selectPortsFromFilteredNodes(optimizedRequestResource);
          LOGGER.logInfo(
              "select: select ports from all filteredNodes: %s", CommonExts.toString(portRanges));
          // selectPortsFromFilteredNodes may return an empty ArrayList<ValueRange>. There is no else branch,
          // so in that case the port ranges of optimizedRequestResource stay unset.
          if (ValueRangeUtils.getValueNumber(portRanges) == optimizedRequestResource.getPortNumber()) {
            optimizedRequestResource.setPortRanges(portRanges);
          }
        }
      }
      /* Filter out nodes whose resources (cpu, mem, gpu) do not fit the request.
       * Open question: what is the skipLocalTriedResource flag for?
       */
      filterNodesByResource(optimizedRequestResource, requestManager.getPlatParams().getSkipLocalTriedResource());

      filterNodesByRackSelectionPolicy(optimizedRequestResource, startStatesTaskCount);

      // Finally, check whether any node is left in filteredNodes.
      if (filteredNodes.size() < 1) {
        // Don't have candidate nodes for this request.
        if (requestNodeGpuType != null) {
          // GpuType relax is not supported in yarn, the gpuType is specified, abort this request and try later.
          throw new NotAvailableException(String.format(
              "Don't have enough nodes to meet GpuType request: optimizedRequestResource: [%s], NodeGpuType: [%s], NodeLabel: [%s]",
              optimizedRequestResource, requestNodeGpuType, requestNodeLabel));
        }
        if (optimizedRequestResource.getPortNumber() > 0 && ValueRangeUtils.getValueNumber(optimizedRequestResource.getPortRanges()) <= 0) {
          // Port relax is not supported in yarn, The portNumber is specified, but the port range is not selected, abort this request and try later.
          throw new NotAvailableException(String.format(
              "Don't have enough nodes to meet Port request: optimizedRequestResource: [%s], NodeGpuType: [%s], NodeLabel: [%s]",
              optimizedRequestResource, requestNodeGpuType, requestNodeLabel));
        }
      }

      /* This is where nodes are actually picked from filteredNodes.
       * The method leaves room for implementing more selection rules;
       * the current implementation adds every qualifying node to the SelectionResult.
       */
      SelectionResult selectionResult = selectNodes(optimizedRequestResource, startStatesTaskCount);
      //If port is not previous selected, select ports from the selectionResult.
      List<ValueRange> portRanges = selectPorts(selectionResult, optimizedRequestResource);
      optimizedRequestResource.setPortRanges(portRanges);
      selectionResult.setOptimizedResource(optimizedRequestResource);
      return selectionResult;
    }
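
The overall shape of this method is a chain of filters that successively narrow one candidate list. The sketch below illustrates that pattern only. `NodeFilterSketch`, its `Node` model and the three predicates are hypothetical, and treating a null constraint as "not requested" is an assumption of the sketch: the real filterNodesBy* methods are not shown in this post and may handle a missing label or GPU type differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

final class NodeFilterSketch {
  // Hypothetical, reduced node model: only the attributes the filters below inspect.
  static final class Node {
    final String host;
    final String label;
    final String gpuType;
    final int freeGpus;

    Node(String host, String label, String gpuType, int freeGpus) {
      this.host = host;
      this.label = label;
      this.gpuType = gpuType;
      this.freeGpus = freeGpus;
    }
  }

  // Start from every node and let each filter narrow the candidate list in turn.
  static List<String> select(List<Node> allNodes, String nodeLabel, String gpuType, int gpuNumber) {
    List<Predicate<Node>> filters = new ArrayList<>();
    // Assumption: a null constraint means "not requested" and lets every node through.
    filters.add(n -> nodeLabel == null || nodeLabel.equals(n.label));
    filters.add(n -> gpuType == null || gpuType.equals(n.gpuType));
    filters.add(n -> n.freeGpus >= gpuNumber);

    List<Node> filtered = new ArrayList<>(allNodes); // plays the role of initFilteredNodes()
    for (Predicate<Node> filter : filters) {
      filtered.removeIf(filter.negate()); // drop the nodes this filter rejects
    }

    List<String> hosts = new ArrayList<>();
    for (Node node : filtered) {
      hosts.add(node.host);
    }
    return hosts;
  }

  public static void main(String[] args) {
    List<Node> nodes = Arrays.asList(
        new Node("node-01", "gpu", "K80", 4),
        new Node("node-02", "gpu", "P100", 0),
        new Node("node-03", null, null, 0));
    System.out.println(select(nodes, "gpu", "K80", 2));
  }
}
```

An empty result here corresponds to the `filteredNodes.size() < 1` branch above, where the launcher throws NotAvailableException for GPU type or port requests that cannot be relaxed.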


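Once select finishes, its purpose has been served: it yields a ResourceDescriptor (the optimized resource carried by the SelectionResult), which goes back to step 3 to build and submit the ContainerRequest.

5. P.S.

This part of the code is large and messy, and for much of the logic it is unclear why it was implemented this way.

Reposted from: https://www.cnblogs.com/chenqy930/p/9584552.html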
