1.概述

转载并且补充:Flink控制任务调度:作业链与处理槽共享组(slot-sharing-group)

为了实现并行执行,Flink应用会将算子划分为不同任务,然后将这些任务分配到集群中的不同进程上去执行。和很多其他分布式系统一样,Flink应用的性能很大程度上取决于任务的调度方式。任务被分配到的工作进程、任务间的共存情况以及工作进程中的任务数都会对应用的性能产生显著影响。本节中我们就讨论一下如何通过调整默认行为以及控制作业链与作业分配(处理槽共享组)来提高应用的性能。

其实这两个概念我们可以看作:资源共享链与资源共享组。当我们编写完一个Flink程序,从Client开始执行——>JobManager——>TaskManager——>Slot启动并执行Task的过程中,会对我们提交的执行计划进行优化,其中有两个比较重要的优化过程是:任务链与处理槽共享组,前者是对执行效率的优化,后者是对内存资源的优化

2.执行过程


Chain:Flink会尽可能地将多个operator链接(chain)在一起形成一个task pipline。每个task pipline在一个线程中执行

优点:它能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换(即降低本地数据交换成本),减少了延迟的同时提高整体的吞吐量

这一点请参考:【Flink】 collector 非常慢 一次尴尬的 排查错误方向 chain 与 不chain 的 区别

概述:在StreamGraph转换为JobGraph过程中,关键在于将多个 StreamNode 优化为一个 JobVertex,对应的 StreamEdge 则转化为 JobEdge,并且 JobVertex 和 JobEdge 之间通过 IntermediateDataSet (中间数据集)形成一个生产者和消费者的连接关系。每个JobVertex就是JobManger的一个任务调度单位(任务Task)。为了避免在这个过程中将关联性很强的几个StreamNode(算子)放到不同JobVertex(Task)中,从而导致因为Task执行产生的效率问题(数据交换(网络传输)、线程上下文切换),Flink会在StreamGraph转换为JobGraph过程中将可以优化的算子合并为一个算子链(也就是形成一个Task)。这样就可以把这条链上的算子放到一个线程中去执行,这样就提高了任务执行效率。

可见,StreamGraph转换为JobGraph过程中,实际上是逐条审查每一个StreamEdge和该SteamEdge两头连接的两个StreamNode的特性,来决定该StreamEdge两头的StreamNode是不是可以合并在一起形成算子链。这个判断过程flink给出了明确的规则,我们看一下StreamingJobGraphGenerator中的isChainable()方法:

public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {/** 获取StreamEdge的源和目标StreamNode */StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);/** 获取源和目标StreamNode中的StreamOperator */StreamOperatorFactory<?> headOperator = upStreamVertex.getOperatorFactory();StreamOperatorFactory<?> outOperator = downStreamVertex.getOperatorFactory();/*** 1、下游节点只有一个输入* 2、下游节点的操作符不为null* 3、上游节点的操作符不为null* 4、上下游节点在一个槽位共享组内* 5、下游节点的连接策略是 ALWAYS* 6、上游节点的连接策略是 HEAD 或者 ALWAYS* 7、edge 的分区函数是 ForwardPartitioner 的实例* 8、上下游节点的并行度相等* 9、可以进行节点连接操作*///如果边的下游流节点的入边数目为1(也即其为单输入算子)return downStreamVertex.getInEdges().size() == 1//边的下游节点对应的算子不为null&& outOperator != null//边的上游节点对应的算子不为null&& headOperator != null//边两端节点有相同的槽共享组名称&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)//边下游算子的链接策略为ALWAYS&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||//上游算子的链接策略为HEAD或者ALWAYSheadOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)//边的分区器类型是ForwardPartitioner&& (edge.getPartitioner() instanceof ForwardPartitioner)&& edge.getShuffleMode() != ShuffleMode.BATCH//上下游节点的并行度相等&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()//当前的streamGraph允许链接的&& streamGraph.isChainingEnabled();}

3.处理槽共享组

处理槽共享组(出于某中目的将多个Task放到同一个slot中执行)

3.1 Task Slot

TaskManager 是一个 JVM 进程,并会以独立的线程来执行一个task。为了控制一个 TaskManager 能接受多少个 task,Flink 提出了 Task Slot 的概念,通过 Task Slot 来定义Flink 中的计算资源。solt 对TaskManager内存进行平均分配,每个solt内存都相同,加起来和等于TaskManager可用内存,但是仅仅对内存做了隔离,并没有对cpu进行隔离将资源 slot 化意味着来自不同job的task不会为了内存而竞争,而是每个task都拥有一定数量的内存储备

通过调整 task slot 的数量,用户可以定义task之间是如何相互隔离的。每个 TaskManager 有一个slot,也就意味着每个task运行在独立的 JVM 中。每个 TaskManager 有多个slot的话,也就是说多个task运行在同一个JVM中。而在同一个JVM进程中的task,可以共享TCP连接(基于多路复用)和心跳消息,可以减少数据的网络传输。也能共享一些数据结构,一定程度上减少了每个task的消耗。

3.2 共享槽

问题:

一个TaskManager中至少有一个插槽slot,每个插槽均分内存并且之间是内存隔离的,但是共享CPU。算子根据计算复杂度可以分为资源密集型与非资源密集型算子(可以认为有的算子计算时内存需求大,有些算子内存需求小)。现在有这么个情况:某个Job下的Tasks中既有资源密集型Task(A),又有非资源密集型Task(B),他们被分到不同的slot上,这就会产生问题:

  • 有的slot内存使用率大,有的slot内存使用率小,这样就很不公平,一个槽资源没有得到充分的利用;
  • 对于槽资源有限的情况,任务并行度也不高。

解决方案

默认情况下,Flink 允许subtasks共享slot,条件是它们都来自同一个Job的不同task的subtask。结果可能一个slot持有该job的整个pipeline。允许槽共享,会有以下两个方面的好处:

  • 对于slot有限的场景,我们可以增大每个task的并行度。比如如果不设置SlotSharingGroup,默认所有task在同一个共享组(可以共享所有slot),那么Flink集群需要的任务槽与作业中使用的最高并行度正好相同。但是如上图所示,如果我们强制指定了map的slot共享组为test,那么map和map下游的组为test,map的上游source的共享组为默认的default,此时default组中最大并行度为10,test组中最大并行度为20,那么需要的Slot=10+20=30;

  • 能更好的利用资源:如果没有slot共享,那些资源需求不大的map/source/flatmap子任务将和资源需求更大的window/sink占用相同的资源,槽资源没有充分利用(内存没有充分利用)。

3.2.1 具体共享机制实现

Flink决定哪些任务需要共享slot 以及哪些任务必须放入特定slot。虽然task共享Slot提升资源利用率,但是如果一个Slot中容纳过多task反而会造成资源低下(比如极端情况下所有task都分布在一个Slot内)。所以在Flink中task需要按照一定规则共享Slot ,主要通过SlotSharingGroup和CoLocationGroup定义:

  • CoLocationGroup:强制将subTasksk放到同一个slot中,是一种硬约束:

    • 保证把JobVertices的第n个运行实例和其他相同组内的JobVertices第n个实例运作在相同的slot中(所有的并行度相同的subTasks运行在同一个slot );
    • 主要用于迭代流(训练机器学习模型) ,用来保证迭代头与迭代尾的第i个subtask能被调度到同一个TaskManager上。
  • SlotSharingGroup: 它是Flink中用来实现slot共享的类,尽可能的允许不同的JobVertices部署在相同的Slot中,但这是一种宽约束,只是尽量做到不能完全保证。

    • 算子的默认group为default,所有任务可以共享同一个slot;
    • 要想确定一个未做SlotSharingGroup设置的算子的group是什么,可以根据上游算子的 group 和自身是否设置 group共同确定(也就是说如果下游算子没有设置分组,它继承上游算子的分组);
    • 为了防止不合理的共享,用户可以通过提供的API强制指定operator的共享组。因为不合理的共享槽资源(比如默认情况下所有任务共享所有的slot)会导致每个槽中运行的线程述增多,增加了机器负载。所以适当设置可以减少每个slot运行的线程数,从而整体上减少机器的负载。比如: someStream.filter(...).slotSharingGroup("group1")就强制指定了filter的slot共享组为group1。

3.2.2 案例

@Testpublic void  slotSharingGroupTest() throws Exception {Configuration configuration = new Configuration();configuration.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER,true);configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,7088);StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1,configuration);env.getConfig().enableObjectReuse();DataStream<String> text = env.socketTextStream("localhost", 9992, "\n");SingleOutputStreamOperator<String> map = text.map(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {return value;}});SingleOutputStreamOperator<String> filter = map.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String value) throws Exception {return true;}}).slotSharingGroup("group_03");SingleOutputStreamOperator<String> bb = filter.map(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {return value;}});SingleOutputStreamOperator<String> cc = bb.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String value) throws Exception {return true;}}).slotSharingGroup("group_04");SingleOutputStreamOperator<String> dd = cc.map(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {return value;}});dd.print();env.execute("xxx");}

可以看到运行结果如下

每个slotSharingGroup不能互相chain在一起,每个slotSharingGroup内部的算子可以chain在一起。

但是有些疑问:

  1. 分开做性能很好嘛?

  2. 我们一般都是默认的,正确所有的chain在一起。这样效率更高吗?

【Flink】Flink 的 slotSharingGroup 有什么用相关推荐

  1. 凌波微步Flink——Flink的技术逻辑与编程步骤剖析

    转载请注明出处:http://blog.csdn.net/dongdong9223/article/details/95459606 本文出自[我是干勾鱼的博客] Ingredients: Java: ...

  2. 大数据计算引擎之Flink Flink CEP复杂事件编程

    原文地址:大数据计算引擎之Flink Flink CEP复杂事件编程 复杂事件编程(CEP)是一种基于流处理的技术,将系统数据看作不同类型的事件,通过分析事件之间的关系,建立不同的时事件系序列库,并利 ...

  3. 凌波微步Flink——Flink API中的一些基础概念

    转载请注明出处:http://blog.csdn.net/dongdong9223/article/details/95355619 本文出自[我是干勾鱼的博客] Ingredients: Java: ...

  4. 【Flink】Flink Flink 1.14 新特性预览

    1.概述 转载:Flink 1.14 新特性预览 简介: 一文了解 Flink 1.14 版本新特性及最新进展 本文由社区志愿者陈政羽整理,内容源自阿里巴巴技术专家宋辛童 (五藏) 在 8 月 7 日 ...

  5. 大数据框架对比:Hadoop、Storm、Samza、Spark和Flink——flink支持SQL,待看

    简介 大数据是收集.整理.处理大容量数据集,并从中获得见解所需的非传统战略和技术的总称.虽然处理数据所需的计算能力或存储容量早已超过一台计算机的上限,但这种计算类型的普遍性.规模,以及价值在最近几年才 ...

  6. flink 不设置水印_从0到1学习Flink—— Flink parallelism 和 Slot 介绍

    前言 之所以写这个是因为前段时间自己的项目出现过这样的一个问题: 1Caused by: akka.pattern.AskTimeoutException: 2Ask timed out on [Ac ...

  7. [Flink] Flink运行报错The number of requested virtual cores for application master

    文章目录 1.概述 2.环境如下 1.概述 运行一个flink任务,启动的时候报错 flink提交到yarn 环境报错 IllegalConfigurationException: The numbe ...

  8. [Flink] Flink运行报错Container released on a *lost* node

    文章目录 1.背景 2.源码 2.1 onContainersAllocated 2.2 getContainersFromPreviousAttempts 3. 其他 M.扩展 本文为博主九师兄(Q ...

  9. Flink : Flink JobManager报错 akka.pattern.AskTimeoutException: Ask timed out on

    1.美图 2.背景 Flink 1.10 JobManager报错 错误信息如下 2020-04-02 14:38:26,867 INFO org.apache.flink.runtime.execu ...

  10. [Flink]Flink DataStream window join 和interval join

    目录 window join interval join window join 窗口连接把两个流中相同窗口通过一个键值连接起来.然后,两边的元素被传递到用户定义的JoinFunction或FlatJ ...

最新文章

  1. C语言关闭日志文件时忘了将日志文件全局变量指针置为NULL
  2. WhatsApp用户数突破10亿 每天发送消息420亿条
  3. 得到相对Plugin的路径
  4. tensorflow tf.ConfigProto() (配置tf.Session的运算方式)(allow_soft_placement、inter_op_parallelism_threads等)
  5. cdr怎么转换成psd转换为位图标准_动漫角色转换真人,飞屋环游记中的小罗竟酷似他……...
  6. excel取消隐藏_Excel技巧:批量删除隐藏数据及隐藏工作表
  7. html(1)基本组成
  8. Linux下通过设置PS1变量改变bash提示符颜色
  9. Drupal 关于节点(nodes)的理解
  10. mysql编译安装root密码_MySQL 5.7.11编译安装以及修改root密码小结
  11. JNI开发笔记(五)--JNI语法总结
  12. property内存管理策略
  13. MongoDB3.x中添加用户和权限控制
  14. Eclipse PHPEclipse 配置
  15. ccf 节日 java 思路
  16. ollydebug 调试上手基础
  17. sinr是什么意思_信噪比有负的吗?表示什么意思?
  18. LeetCode/LintCode 题解丨一周爆刷字符串:简化路径
  19. css3,background-clip/background-origin的使用场景,通俗讲解
  20. COM组件 ATL的创建和调用

热门文章

  1. 环球影城门票开售,开园日门票1分钟内售罄
  2. 外媒:英特尔未来10年可能投资950亿美元在欧洲新建8家芯片厂
  3. 2021款iPad Pro跑分曝光:远超安卓阵营产品
  4. iPhone 11降至3000元档,一顿降价猛如虎!
  5. 品质生活在于细节 8月6日张朝阳“做饭直播”带货厨房好物
  6. 钉钉终于崩了!小学生欢呼庆祝解放,没想到没高兴多久就...
  7. 尘埃落定!熊猫互娱近20亿元投资纠纷已解决
  8. 终于没刘海了!iPhone12 Pro 渲染图首曝,回归经典
  9. 三星Galaxy Note 10系列机模曝光:开孔全面屏实锤
  10. 苹果WWDC 2019最全剧透抢先看:iOS夜间模式要来了!