想起来之前被问到了一个问题,如果Flink中的Task是一直不停的运行的话,那么拉取Kafka数据的Source端是不是会一直不停的拉取数据,如果消费速度不及时,内存不就很快会被撑爆了么?一开始对这个问题是一脸闷逼,后来随着对Flink使用的逐渐深入,对Flink的内部也有了一定的了解,所以本文就来了解下Flink内部的反压机制,看下反压机制是如何解决该问题的。

什么是反压以及反压所带来的影响?

在流式处理系统中,如果出现下游消费的速度跟不上上游生产数据的速度,就种现象就叫做反压。出现反压时,理所应当限制上游生产者的速度,使得下游的速度跟得上上游的速度。

反压会导致流处理作业数据延迟的增加,同时还会影响到Checkpoint。由于Flink的Checkpoint机制需要进行Barrier对齐,如果此时某个Task出现了反压,Barrier流动的速度就会变慢,导致Checkpoint整体时间变长,如果反压很严重,还有可能导致Checkpoint超时失败。这部分内容在《Flink中的状态一致性(再细说下Checkpoint)》里面有详细的说明,同时里面还分析了Flink1.11版本中新提供的Unaligned Checkpoints机制来解耦反压和 Checkpoint)。

长期或者频繁出现反压才需要处理,如果只是由于网络波动或者正常GC出现的偶尔反压可以不必处理。

如何定位反压:

可以在Web界面,从Sink到Source这样反向逐个Task排查,找到第一个出现反压的Task,一般上Task出现反压会出现如下现象:

当 Web 页面切换到某个 Task 的 BackPressure 页面时,才会对这个Task触发反压检测。BackPressure界面会周期性的对Task线程栈信息采样,通过线程被阻塞在请求Buffer的频率来判断节点是否处于反压状态(反压就是因为Buffer不够用了,大白话就是内存不够用了,所以Task暂时性的阻塞住了)。默认情况下,这个频率在 0.1 以下显示为 OK,0.1 至 0.5 显示 LOW,而超过 0.5 显示为 HIGH。

通过反压状态可以大致锁定反压可能存在的算子,但具体反压是由于当前Task自身处理速度慢还是由于下游Task处理慢导致的,需要通过metric监控进一步判断。因为反压存在两种可能性:

  1. 当前Task发送的速度跟不上它产生数据的速度。一般发生在一条输入多条输出这种情况,导致当前Task发送端申请不到足够的内存,例如flatmap或者collect多次。
  2. 当前Task处理数据的速度比较慢,比如每条数据都要进行算法调用之类的,而上游Task处理数据较快,从而导致上游发送端申请不到足够的内存。

因为BackPressure面板监控的是发送端,所以如果我们找到出现反压的节点,那么反压根源要么是就这个节点(对应情况1),要么是它紧接着的下游节点(对应情况2)。

当不太好判断是当前Task还是下游Task出现反压时,需要利用Flink Metrics 定位产生反压的真正位置。可以参考如下几个指标:

outPoolUsage:    发送端Buffer使用率

inPoolUsage:    接收端Buffer使用率

floatingBuffersUsage(1.9以上): 接收端Floating Buffer使用率

exclusiveBuffersBuffersUsage(1.9以上):    接收端Exclusive Buffer使用率

outPoolUsage代表发送端Buffer使用率,也就是ResultPartition。发送端共用一个LocalBufferPool,只有一个指标。

inPoolUsage代表接收端Buffer使用率,也就是InputGate。接收端也共用一个LocalBufferPool,但是接收端每个Channel在初始化阶段都会分配固定数量的Buffer(Exclusive Buffer)。如果某一时刻接收端接受到的数量太多,Exclusive Buffer就会耗尽,此时就会向BufferPool申请剩余的Floating Buffer(除了Exclusive Buffer,其他的都是Floating Buffer,备用Buffer)。其中inPoolUsage = floatingBuffersUsage + exclusiveBuffersUsage。

并且一个LocalBufferPool所能够使用的最大资源是有限的,即NetworkBufferPool 不会把自己的 Buffer 全分配给一个Task,因为 TaskManager 上一般会运行了多个 Task,这个受taskmanager.network.memory.max-buffers-per-channel参数控制。

总觉得在大内存情况下,这些参数设置了会导致内存的浪费…Task数量少,LocalBufferPool能申请的最大的Buffer数量又是固定的。所以个人觉得如果TaskManager内存比较大的话,这个参数是不是调大会好一点…这个后续有时间验证下。

反压分析可以参考下图:

Flink 1.9 and above: If inPoolUsage is constantly around 100%, this is a strong indicator for exercising backpressure upstream.

The following table summarises all combinations and their interpretation. Bear in mind, though, that backpressure may be minor or temporary (no need to look into it), on particular channels only, or caused by other JVM processes on a particular TaskManager, such as GC, synchronisation, I/O, resource shortage, instead of a specific subtask.

分析反压的大致思路是:反压肯定是某个SubTask的处理能力跟不上导致的,所以反压的这个SubTask inPoolUsage使用率肯定是100%反压了。如果一个SubTask的outPoolUsage占用率很高,大概率表示它是被下游Task反压限速了。

反压如何解决:

定位到反压的Task之后,可以根据Task中具体执行的内容来进行相应的处理。大部分情况反压是由于用户代码的执行效率问题(用户代码对一条数据的处理太慢了,例如每条数据都要调用算法,而调用算法运算耗时又很长) 或者数据倾斜引起的。如果是用户代码的执行效率引起的,可以通过增加并发度或者其他资源的方式来缓解反压。如果是数据倾斜引起的,可以对数据进行一次KeyBy之类的操作来解决。

从源码中了解下反压机制:

Flink1.5之前的反压机制就不详细介绍了(1.5之前是基于TCP协议进行的反压,该机制有一个问题,如果同一个Task的不同SubTask分配到了同一个TaskManager内部,那么他们与),因为项目上使用的是最新的Flink1.11,所以这里只是探究下Flink1.11中的反压机制,也就是Credit-based流量控制机制。反压的图文描述可以见参考中的《一文彻底搞懂 Flink 网络流控与反压机制》,里面图文描述已经讲的非常详细了,这里我就不做图文的搬运工了,大概做一个说明:

  1. 接收端(InputGate)向发送端(ResultPartition)声明可用的Credit(一个可用的buffer对应一点Credit),表明它有多少个空闲的buffer可以接受数据。只有在Credit>0的情况下发送端才发送buffer;发送端每发送一个buffer,Credit也相应地减少一点。当Credit值为0时,停止向下游发送数据,下游有空闲的credit是会通知上游有credit可用,这样就可以控制上游发送给下游的速率。如果是同一个Task内部,那么它的RecordWrite会被Block,而RecordReader、Operator、RecordWrite都是属于同一个线程,所以大家都Block住了,反压也就生效了。
  2. 当发送端发送buffer的时候,它同样把当前堆积的buffer数量(backlog)告知接收端;接收端根据发送端堆积的数量来申请floatingbuffer
  3. PS:Event事件是直接处理的,无需使用Credit

接收端:

发送端:

总结:

Flink不需要一个特殊的机制来处理反压, 因为Flink中的数据传输机制相当于已经提供了反压机制。Flink程序的最大吞吐量由程序中运行最慢的那个Task所决定。

在进行反压测试时,可以使用Flink作业禁用全局chain,然后看下究竟是哪个步骤的速度最慢,命令如下:

$FLINK_HOME/bin/flink run -e remote -p 10 -D pipeline.operator-chaining=false -D execution.checkpointing.interval=60000 -c com.***.mainClass -d  /***/***-shaded.jar

--------------------------------------------------分割线--------------------------------------------------

最近刚换了工作,又要熟悉新的东西了,会有一段时间暂时不会更新了,祝自己还有大家都好运^_^!!!

参考:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/monitoring/back_pressure.html(监控反压)

https://flink.apache.org/2019/07/23/flink-network-stack-2.html (通过Flink Metric进行反压判断)

https://zhuanlan.zhihu.com/p/92743373(如何分析及处理 Flink 反压?)

https://developer.aliyun.com/article/57815 (Flink MemorySegment内存管理机制)

https://blog.csdn.net/lvwenyuan_1/article/details/93490297(Flink旧版本和新版本的反压机制图示)

https://zhuanlan.zhihu.com/p/149706396(Flink Network Buffer相关知识)

https://cwiki.apache.org/confluence/display/FLINK/FLIP-102%3A+Add+More+Metrics+to+TaskManager(Flink Improvement Proposals)

https://www.jianshu.com/p/2779e73abcb8(一文彻底搞懂 Flink 网络流控与反压机制)

https://blog.csdn.net/u012151684/article/details/109479588(Flink反压机制分析)

Flink的背压机制相关推荐

  1. Flink 的背压机制(Back Pressure)

    什么是 Back Pressure 如果看到任务的背压警告(如 High 级别),这意味着 生成数据的速度比下游算子消费的的速度快.以一个简单的 Source -> Sink 作业为例.如果能看 ...

  2. Flink的容错机制

    文章目录 检查点 检查点的保存 从检查点恢复状态 检查点算法 检查点配置 1.启动检查点 2.检查点存储 3.其他高级配置 保存点 1.保存点的用途 2.使用保存点 状态一致性 一致性的概念和级别 端 ...

  3. Flink系列-背压(反压)

    目录 了解背压 什么是背压 背压产生的原因 背压导致的影响 定位背压 解决背压 了解背压 什么是背压 在流式处理系统中,如果出现下游消费的速度跟不上上游生产数据的速度,就种现象就叫做背压 (backp ...

  4. Flink 如何处理背压

    传送门:Flink 系统性学习笔记 Flink 1.4 版本 人们经常会问 Flink 是如何处理背压的.答案很简单:Flink 不使用任何复杂的机制,因为它不需要任何处理机制.只凭借数据流引擎,就可 ...

  5. Flink的背压问题产生原因和解决方法

    最近flink job出现了背压的问题, 后果是导致了checkpoint的生成超时, 影响了flink job的运行. 定位问题: 如下图: 1) flink的checkpoint生成超时, 失败: ...

  6. Flink的CheckPoint机制

    这里已经是Flink的第三篇原创啦.第一篇:Flink入门讲解了Flink的基础和相关概念,第二篇:压背原理,讲解了什么是背压,在Flink背压大概的流程是怎么样的. 这篇来讲Flink另一个比较重要 ...

  7. Flink中容错机制 完整使用 (第十章)

    Flink中容错机制 完整使用 一.容错机制 1.检查点(Checkpoint) 1. 检查点的保存 1. 周期性的触发保存 2. 保存的时间点 3. 保存的具体流程 2.从检查点恢复状态 (1)重启 ...

  8. 【Flink】 Flink JobManager HA 机制的扩展与实现

    1.概述 转载:Flink 源码阅读笔记(21)- Flink JobManager HA 机制的扩展与实现 在 Flink 1.12 中,Flink on Kubernetes 的 Native 部 ...

  9. Node.js stream模块(三)背压机制

    我们知道 可读流是作为数据生产者,而可写流作为数据消费者. 那么二者必然是可以结合使用的.即可读流生产出来的数据给可写流消费. 我们这里使用文件可读流和文件可写流来模拟这种情况: 实现很简单,可读流对 ...

最新文章

  1. java笔试题(3)
  2. 对接钉钉审批_低代码对接钉钉创建外部联系人
  3. 盲去卷积原理及在图像复原的应用
  4. llvm编译器实战教程第二版_LLVM编译器实战教程
  5. 还没吃透内存缓存LruCache实现原理的看这篇文章,面试必会
  6. 软件工程——个人课程总结
  7. 2021大“游”不同——百度旅游行业洞察
  8. 功能丰富的Perl:用Perl读写Excel文件
  9. html如何保存离线使用,如何完整保存离线网页
  10. 圆内接等边三角形的画法_如何画出圆的内接正三角形
  11. 用gambit学博弈论--完全信息动态博弈-参与者信息集、博弈树上虚线的解释(三)
  12. 如何制作龙芯系统安装U盘
  13. STM32 CubeMx(七)SPI串行同步通信与外部FLASH(W25Q128)的读写和TFT液晶屏
  14. HEGERLS供应定做层板仓储货架 防腐防锈库房立体层板货架
  15. Python OCR工具pytesseract详解
  16. 项目中分页查询得实现
  17. Android 常用RGB值及名称
  18. 分类信息网和织梦搬家后报错Fatal error: Uncaught ArgumentCountError: Too few arguments to function AddFilter(), 3
  19. adobe cs5全线作品
  20. PHP学习-3 端口开放

热门文章

  1. 综合设计一个OPPE主页--页面的售后服务
  2. Gin框架源码解析【建议收藏】
  3. Cf#741-C. Rings(构造)
  4. 写一个自己的前端手脚架(1)
  5. CRM系统能给企业带来什么? CRM系统推荐
  6. 如何做一个基于微信高校食堂就餐预约小程序序系统毕业设计毕设作品
  7. android8 Camera2 从 CameraService 到 HAL Service
  8. 免费午餐时代的结束 - Docker Hub 新的服务条款
  9. 麦肯锡深度解析:量子计算将拯救地球?
  10. 项目GIF斗图软件 总结概述