概要:

0. 问题背景

1. Stream Job的切分

2. 计算资源的调度 & 任务的执行

3. 最后的总结

0. 问题背景:

开始用flink处理流式作业的时候,用yarn-cluster模式提交作业的时候,脚本如下:

$FLINK_BIN run -m yarn-cluster -yqu root.profile -yn 20 -yjm 4096 -ytm 8192 -ynm RecentViewApp -ys 5 ./profileStreaming-0.1.jar

(程序中设置的并行度都是5)

脚本参数的解释:

-yn         要分配的YARN container数(Task Managers个数)

-yjm       JobManager内存

-ytm      每个TaskManager内存

-ys         每个TaskManager的slot数

比较奇怪的是,作业webUI的资源配置跟脚本上的资源分配不符合,而且启动的时候会有动态变化,下面两个截图,一个是最大资源的时候(图1.),一个是最终稳定资源的时候(图2.),而且经过测试调整yn参数,发现并没有发生变化,这里TaskManager和Task Slots是根据什么进行分配的似乎让人捉摸不定。

图1

图2

1. Stream Job的切分

跟了下源码,分析这现象的原因(基于1.6.0版本的)

首先,要知道flink on yarn是怎么做资源分配之前,必须先要了解一个Stream job提交到flink是怎么进行job的切分的。首先程序构建成StreamGraph --> JobGraph,在生成JobGraph的时候,有几个重要的地方,见图4红色标记部分,然后提交到flink集群上,flink再对JobGraph转换ExecutionGraph,其实到ExecutionGraph这步就是为了后续的调度任务存在的,可以发现,ExecutionGraph(准确来说是有ExecutionJobVertex这个结构,而ExecutionJobVertex下又套有ExecutionVertex, Execution)下有这几种数据结构(图3):

图3

ExecutionGraph这里就已经可以进行任务调度了:

1. 一个task任务对应一个executor,后面会看到execution调用deploy就起一个任务;

2.IntermediateResultPartition对应ResultPartition,这两个本质上是相等的,只不过IntermediateResultPartition是ExecutionGraph调度阶段的概念,而ResultPartition是具体TaskManager底层数据交换时候的概念(这部分内容是flink底层数据底层数据如何处理,这里只需要知道ResultPartition(或者说ResultSubPartition)是存放序列化后的数据的,数据节点之间通过netty来传输,每个节点都会初始化一个netty server, netty client,每个节点既是server,又是client,而ResultPartition就是这里上游节点的数据结构,下游节点对应的数据结构是InputGate)。

图4

2.计算资源的调度 & 任务执行

接下来看看是如何进行调度的:

这里先说一点ExecutionJobVertex和ExecutionVertex的关系,如果说在程序里用flatMap这个算子,然后并行度设置为5,ExecutionJobVertex下的ExecutionVertex数组内容为:flatMap(1/5), flatMap(2/5), flatMap(3/5), flatMap(4/5), flatMap(5/5);

直接从Execution::scheduleForExecution()开始,有两个重要的方法:

1. allocateAndAssignSlotForExecution() ;

2. deploy();

Execution::allocateAndAssignSlotForExecution 里面重要的代码:

这里就是对slot的申请分配,也就是当当前状态是“CREATED"的时候才能往下进行操作,把"CREATED"状态转换成"SCHEDULED",这里就是针对executionVertext进行slot的申请分配,根据优先偏好设置进行分配,指定slot要分配在哪个TaskManager,具体的优先策略是怎么分配的,这里不做展开,简单的说,就是先确定输入源source的slot的分配,然后该源source的所在的TaskManager,肯定是下游slot分配的第一选择,也就是先把TaskManager所在的slot填满再说。

通过SlotPoolGateway进行转换SlotPool,

在SlotPool中(SlotPool是对Slot的池化,就是把slot资源放到一个池中,这个还是比较常见的),如果有slot共享,就多个task共享slot。

如果没有slot共享,就要申请新的slot槽位

从resourceManager申请一个新的slot,如果没有resourceManager的链接(也是通过resourceManagerGateway转发),就保持到一个map中,等待拿到新的链接再发起request;如果有,直接发起申请。

保存到一个map里,

如果没有ResourceManager连接的话,把请求放到waitingForResourceManager这个map里。SlotPool一旦拿到ResourceManager连接,遍历waitingForResourceManager发送请求:

拿到resourceManager的链接,发起的请求,

resourceManagerGateway.requestSlot(),从resourceManagerGateway,转到resourceManager(YarnResourceManager)

=====================Next0=====================

1.YarnResourceManager:

这里就是开始申请Yarn Container, 配置taskManager<8192 vcores:5>,也就是每个Container cpu Core是5,内存8192M,这里一共会调用5次,所以numPendingContainerRequests最终累加结果是5。8192>

numPendingContainerRequests 是用来记request次数的。

2.YarnResourceManager:

在YarnResourceManager的生命周期函数中,onContainersAllocated是请求container后的回调函数,这里的逻辑就是根据numPendingContainerRequests次数启动container

ContainerLaunchContext taskExecutorLaunchContext= createTaskExecutorLaunchContext

Utils::createTaskExecutorContext(), 就是封装了要启动的Container的相关信息。Starting TaskManagers......起了5个。

YarnResourceManager::onContainersAllocated(),返回到这个方法里,

这下我们终于知道5个TaskManager是怎么起起来的。

接下去为啥TaskManager会慢慢减少呢?

SlotManager::start() slotManager在启动的时候,会有个定时任务,监控TaskManager的心跳,如果没有心跳,就释放掉container资源。代码比较简单,不做赘述。

ResourceManager(YarnResourceManager)

YarnResourceManager:

通过nodeManagerClient,停止相应Container,并把container所在的worker节点从workerNodeMap里移除。

2.deploy():

重新回到Execution::scheduleForExecution()方法里,可以看到是遍历execution,然后每个execution调用一次deploy(),也就是说如果程序source--flatMap--sink的话,并行度是5,调用结果应该是,Execution(ExecutionJobVertxt) 5次[source-flatMap]+5次[sink] :

封装成TaskDeloymentDescriptor,根据taskManager网关提交任务,就是把封装的任务描述器提交过去,TaskManager就是任务执行管理器,提供了内存,IO,网络通信等功能,TaskManager收到taskDeloymentDescriptor,转换成task执行,包括序列化的算子,数据传输过程中的ResultParitition, InputGate等结构。task本身就是一个线程,这里就不做赘述了。

3. 最后的结论

回过头来看看,如果在程序中最大并行度是5,ys=5, 程序在启动的时候最大分配TaskManager=5, Task slots=5*5(最大并行度5,5个taskManager, 每个taskManager分配slot=5, 所以一共25个slot,用去5个slot, 剩下20个slot空闲)。

我们可以先假设如果设置并行度是6呢,ys=5,这时候启动最大TaskManager=6, Task slots=5*6=30,因为并行度是6,所以最高峰会是6个TaskManager, Task slots=30,剩下可用24个,由于每个TaskManager设置slot=5, 所以需要2个TaskManager,用去6个slot,所以剩余可用4,经测试验证,确实如此。

最终会是:

我们可以看到, flink有两处的优化:

1. operator chain.(上图红色部分),需满足一些严格的条件

为了更高效地分布式执行,Flink会尽可能地将operator的subtask链接(chain)在一起形成task。每个task在一个线程中执行。将operators链接成task是非常有效的优化:它能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换,减少了延迟的同时提高整体的吞吐量。(这里解释一下为啥sink没有做operator chain,因为sink前程序做了keyBy,keyBy不是operator,只是一个数据的分发策略,所以这里不满足operator chain的条件)

2. SlotSharingGroup共用slot (如上图中,[source1,flatmap1]和sink1共用一个slot)

对计算资源的利用率更高

最后最后的结论:yn的设置确实是不生效的,资源的分配是根据job最大并行度来设置的,先按照最大并行度来起TaskManager,如果有剩余,再进行释放,由于是per-job模式,就算是有剩余的TaskManager也没法给其他任务使用,所以这里又进行了回收。

slot没有毁灭的问题_解析flink之perjob模式下yn参数不生效问题相关推荐

  1. 技术解析:openstack vlan模式下的隔离和数据流向(转)

    一.隔离 计算机网络,是分层实现的,不同协议工作在不同层,按着OSI的分层模型,共有七个层,我们一般所说的隔离,通常指的是第2层,也叫"数据链路层";数据链路层的网络包,也叫&qu ...

  2. 【FLINK 】 Flink on YARN模式下TaskManager的内存分配

    解决背景: 总的ytm分配的不变的情况下怎么划分给堆内内存JVM 一个更大的内存空间 对于心急的同学来说,我们直接先给一个解决方案,后面想去了解的再往下看: 原来的命令,-ytm 8192,分配给ta ...

  3. java开闭原则 例子_解析Java编程中设计模式的开闭原则的运用

    开闭原则(Open Closed Principle)是Java世界里最基础的设计原则,它指导我们如何建立一个稳定的.灵活的系统. 定义: 一个软件实体如类.模块和函数应该对扩展开放,对修改关闭. S ...

  4. flink on yarn模式下释放flink占用yarn的资源

    除了关闭session.sh启动的进程以外, kill YarnJobClusterEntrypoint所在的jps进程 完成上述操作后,再次前往yarn界面,就可以看到队列中占用的资源都被释放了.

  5. iphone双卡_辟谣!iPhone12双卡模式下不支持5G?国行可正常使用

    5G已经兴起多时,然而苹果却姗姗来迟.直到今年年底,首款支持双模5G网络的iPhone12系列新机才终于发布,目前已经确认所搭载的正是高通5G基带,但苹果尚未明确表示是X55还是X60,前者是目前安卓 ...

  6. chrome 打印布局_在打印预览模式下使用Chrome的Element Inspector?

    慕容大雪花 Chrome v52 +:打开开发人员工具(Windows:F12或Ctrl+ Shift+ I,Mac:Cmd+ Opt+ I)单击自定义并控制DevTools汉堡包菜单按钮,然后选择更 ...

  7. 极速模式下java无法加载_谷歌和360急速模式 下的XMLHttpRequest 的onprogress事件失效...

    场景描述 上传excel后遍历处理每一行的数据.想在页面上展示进度条,提示目前已经处理到第几条了. 使用XMLHttpRequest2来发送请求,在程序服务器端设置HttpServletRespons ...

  8. 2021年大数据Flink(六):Flink On Yarn模式

    目录 Flink On Yarn模式 原理 为什么使用Flink On Yarn? Flink如何和Yarn进行交互? 两种方式 操作 1.关闭yarn的内存检查 2.同步 3.重启yarn 测试 S ...

  9. Flink On Yarn模式,为什么使用Flink On Yarn?Session模式、Per-Job模式、关闭yarn的内存检查,由Yarn模式切换回standalone模式时需要注意的点

    Flink On Yarn模式 原理 为什么使用Flink On Yarn? 在实际开发中,使用Flink时,更多的使用方式是Flink On Yarn模式,原因如下: -1.Yarn的资源可以按需使 ...

最新文章

  1. PHP 手册 参考文档
  2. 嵌入式论文3000字_SCI英文论文一般多少字
  3. C++unique函数应用举例
  4. esb 和mq_使用保险丝结构管理MQ和ESB的大型部署,第一部分
  5. 【渝粤教育】 国家开放大学2020年春季 2773特种动物养殖 参考试题
  6. android的开始时对bug的定位和处理
  7. 1.c++模式设计-简单工厂模式
  8. 视觉SLAM——D435i运行ORB-SLAM2-RGB-D(依赖ros版)
  9. [转载] numpy功能快速查找
  10. FineUI分组显示弹框最新的在最上边
  11. atitit.导航的实现最佳实践and声明式编程
  12. Java_接口练习题
  13. excel求回归直线方程的公式_“如何在excel中求解任意直线的方程“excel绘制线性回归方程...
  14. android singletask启动模式,android:Activity启动模式之singleTask(一)(示例代码)
  15. javascript定时器的计时事件
  16. 电子元器件篇---三极管
  17. oracle显示上午下午,如何把时间转换成带有上下午字样的格式?
  18. 日元负利率和美元暴跌,是对人民币的夹击
  19. 数字内容安全实验二:数字图像复制粘贴检测
  20. 搜狗输入法Mac版下载

热门文章

  1. ES6基础之Array.fill函数
  2. matlab处理txt文件数据
  3. IE8下不识别indexOf的问题
  4. 自己亲自写的两本linux资料,免费下载,pdf文档
  5. jquery中ajax的dataType属性包括哪几项
  6. ajax实现自动刷新页面实例
  7. hadoop HDFS常用文件操作命令
  8. 《QTP自动化测试进阶》(1)
  9. 深入理解JavaScript系列(10):JavaScript核心(晋级高手必读篇)
  10. QTP的那些事---页面弹出框的处理,页面等待加载的处理