一、状态简述

Flink的状态分为三种:
MemoryStateBackend:默认的方式,即基于JVM的堆内存进行存储,主要适用于本地开发和调试;
FsStateBackend:基于文件系统进行存储,可以是本地文件系统,也可以是HDFS等分布式文件凶系统。需要注意,虽然选择使用 FsStateBackend,但是正在进行的数据仍然存储在 TaskManager的内存中,只有在 checkpoint 时,才会将状态快照写入到指定文件系统上;
RocksDBStateBackend:Flink内置的第三方状态管理器,采用嵌入式的 key-value 型数据库RocksDB 来存储正在进行的数据。等到 checkpoint 时,再讲其中的数据持久化到指定的文件系统中,所以采用 RocksDBStateBackend 时,也需要配置持久化存储的文件系统。之所以这样做,是因为 RocksDB 作为嵌入式数据库安全性比较低,但是比起全文件系统的方式,其读取速度更快,比起全内存的方式,其存储空间更大,因此是一种比较均衡的方案。

在状态比较大的情况下,推荐使用 RocksDB,除了上面提到的特点,可以更方便地进行优化。

下面的介绍和调优,主要以 RocksDB 为例。

二、RocksDB 大状态调优

2.1 开启State访问性能监控

Flink1.13中引入了 State 访问的性能监控,即 latency tracking state。此功能不局限于 State Backend 的类型,自定义实现的 State Backend 也可以复用次功能。

当然,State 访问的性能监控会产生一定的性能影响,所以,默认每 100 次做一次取样(sample),对不同的 State Backend 性能损失影响不同:
对于 RocksDB State Backend,性能损失大概在 1%左右
对于 Heap State Backend,性能损失最多可达 10%

开启监控的方式,可以在指令中加入

state.backend.latency-track.keyed-state-enabled: true  #启用访问状态的性能监控
state.backend.latency-track.sample-interval: 100  #采样间隔
state.backend.latency-track.history-size: 128  #保留的采样数据个数,越大越精确
state.backend.latency-track.state-name-as-viriable: true  #将状态名作为变量

正常情况下,开启第一个参数即可。

2.2 开启增量检查点和本地恢复

2.2.1 开启增量检查点

RocksDB 是目前唯一可用于支持有状态流处理应用程序增量检查点的状态后端,可以修改参数开启增量检查点:

state.backend.incremental: true    #默认false,改为true
或代码中指定
new EmbeddedRocksDBStateBackend(true)

增量检查点,表示在 checkpoint 时,只备份和上个检查点相比,发生变化的检查点。在状态比较大的情况下,是否开启增量检查点,对性能的影响会非常大。比如,在程序运行很长时间之后,总的状态量达到1G,每次变化的状态只有100M甚至更低,那么在不开启增量备份的情况下,每次备份都要全量备份,也就是1G的状态量;如果开启了增量备份,每次只需要备份100M甚至更低;两者相比,增量备份检查点,可以大大节省备份的时间。

在项目的实际使用过程中,曾经经理过一次大的性能问题,在没有开启增量备份检查点的情况下,每次备份需要消耗几十秒的时间,这对于实时计算来说,简直是个灾难;在使用 RocksDBStateBackend,并开启增量备份检查点之后,每次备份只需要几秒甚至几十毫秒就可以完成,大大节省了状态备份的时间。

2.2.2 开启本地恢复

当Flink任务失败时,可以基于本地的状态信息进行恢复任务,可能不需要从 hdfs 拉取数据。本地恢复目前仅涵盖监控类型的状态后端(RocksDB),MemoryStateBackend 不支持本地恢复并忽略此选项。

开启指令

state.backend.local-recovery: true

2.3 调整预定义选项

Flink 为 RocksDB 提供了一些预定义的选项集合,比如 DEFAULT、SPINNING_DISK_OPTIMIZED、SPINING_DISK_OPTIMIZED_HIGH_MEM 或 FLASH_SSD_OPTIMIZED。

DEFAULT:啥都不配;
SPINNING_DISK_OPTIMIZED:基于磁盘的优化;
SPINING_DISK_OPTIMIZED_HIGH_MEM:基于磁盘和内存的优化;
FLASH_SSD_OPTIMIZED:基于固态硬盘的优化;

一般使用 SPINING_DISK_OPTIMIZED_HIGH_MEM 即可,如果条件充足,可以指定 FLASH_SSD_OPTIMIZED。

2.3.1 配置方式

1)代码指定

EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);
embeddedRocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
env.setStateBackend(embeddedRocksDBStateBackend);

2)启动指令指定

-Dstate.backend.rocksdb.predefined-options: SPINING_DISK_OPTIMIZED_HIGH_MEM #机械硬盘+内存

2.4 其他高阶配置

该部分属于管理内存,即 flink 内存的0.4倍。

2.4.1 增大block缓存

整个 RocksDB 共享一个 block cache(对应上图的Read Only Block Cache,最近读取的数据会放到 block cache 中),读取数据时内存的 cache 大小,直接影响数据读取效率;读取数据时,优先从内存读取,读取不到时,再从磁盘加载,所以,内存 cache 越大,缓存命中率越高。默认大小Wie 8MB,建议设置到 64~256MB,根据自身资源而定。

state.backend.rocksdb.block.cache-size: 64m #默认 8m

2.4.2 增大 write buffer 和 level 阈值大小

RocksDB 中,每个 State 使用一个 Column Family,每个 Column Family 使用单独的 write buffer,默认是64MB,建议调大。

调整 write buffer 时,通常要适当增加 L1 层的大小阈值 max-size-level-base,默认是 256 MB。该值太小,会导致能存档的 SST 文件过少,层级变多造成查找困难,需要更多层索引,才能命中需要的文件;值太大,造成文件过大,合并困难。建议设置为 target_file_size_base(默认64MB)的倍数,且不能太小,建议 5~10 倍,即 320~640MB。

state.backend.rocksdb.writebuffer.size: 128m
state.backend.rocksdb.compaction.level.max-size-level-base: 320m

2.4.3 增大 write buffer 数量

每个 Column Family 对应的 write buffer 最大数量,实际上是内存中“ReadOnly MemTable(只读内存表)”的最大数量,默认值是2。对于机械磁盘来说,如果内存足够大,可以调大到 5 左右,人多力量大!

state.backend.rocksdb.writebuffer.count: 5

2.4.4 增大后台线程数和 write buffer 合并数

1)增大后台线程数

用于后台 flush 和合并 sst 文件的线程数,默认为 1,建议调大,机械硬盘用户可以改为 4 等更大的值,人多力量大!

state.backend.rocksdb.thread.num: 4

2)增大 write buffer 最小合并数

将数据从 write buffer 中 flush 到磁盘时,需要合并的 write buffer 最小数量,默认值为 1,可以调到 3,减小合并次数。

state.backend.rocksdb.writebuffer.number-to-merge: 3

2.4.5 开启分区索引功能

Flink 1.13 中对 RocksDB 增加了分区索引功能,复用了 RocksDB 的 partitioned Index & filter 功能,简单来说就是对 RocksDB 的partitioned Index 做了多级索引。也就是将内存中的最上层常驻,下层根据需要再 load 回来,这样就大大降低了数据 Swap 竞争。线上测试中,相对于内存较小的场景中,性能提升10倍左右。如果在内存管控下 RocksDB 性能不如预期的话,这个也能作为一个性能优化点。

state.backend.rocksdb.memory.partitioned-index-filters:true #默认 false

2.5 参数设定案例

sudo -u hdfs $FLINK_HOME/bin/flink run-application -t yarn-application \
-Djobmanager.memory.process.size=1024m \
-Dtaskmanager.memory.process.size=1024m \
-Dtaskmanager.numberOfTaskSlots=1 \
-Dparallelism.default=12 \
-Dstate.backend.incremental=true \
-Dstate.backend.local-recovery=true \
-Dstate.backend.rocksdb.predefined-options=SPINNING_DISK_OPTIMIZED_HIGH_MEM \
-Dstate.backend.rocksdb.block.cache-size=64m \
-Dstate.backend.rocksdb.writebuffer.size=128m \
-Dstate.backend.rocksdb.compaction.level.max-size-level-base=320m \
-Dstate.backend.rocksdb.writebuffer.count=5 \
-Dstate.backend.rocksdb.thread.num=4 \
-Dstate.backend.rocksdb.writebuffer.number-to-merge=3 \
-Dstate.backend.rocksdb.memory.partitioned-index-filters=true \
-Dstate.backend.latency-track.keyed-state-enabled=true \
-Dyarn.application.name="TestDemo" \
/tmp/****-jar-with-dependencies.jar

三、Checkpoint设置

Checkpoint 时间间隔,需要根据业务场景对时效性的要求而定。如果时效性要求不高,可以设置到分钟级别,比如5分钟、10分钟;如果对时效性要求很高,结合 flink  控制页面 Checkpoints 的Summary 中的 End to End Duration,通过最大值、最小值和平均值,合理设置时间间隔。注意,时间间隔需要比 End to End Duration 的时间要长,否则,可能会导致上一个 checkpoint 没结束,下一个 checkpoint 已经开始。为了避免这一情况的发生,除了设置时间间隔,两次 checkpoint 的最小时间间隔也可以起到作用,该配置决定在上一次 checkpoint 结束之后,至少等待多长时间开始下一次的 checkpoint。

具体配置,可以参考下面的代码:

// 使⽤ RocksDBStateBackend 做为状态后端,并开启增量 Checkpoint
RocksDBStateBackend rocksDBStateBackend = new
RocksDBStateBackend("hdfs://hadoop01:8020/flink/checkpoints", true);
env.setStateBackend(rocksDBStateBackend);
// 开启 Checkpoint,间隔为 1 分钟
env.enableCheckpointing(TimeUnit.MINUTES.toMillis(1));
// 配置 Checkpoint
CheckpointConfig checkpointConf = env.getCheckpointConfig();
checkpointConf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 最小间隔 2 分钟
checkpointConf.setMinPauseBetweenCheckpoints(TimeUnit.MINUTES.toMillis(2))
// 超时时间 10 分钟
checkpointConf.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));
// 保存 checkpoint
checkpointConf.enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

Flink优化02--状态及Checkpoint调优相关推荐

  1. Flink 第2章 状态及Checkpoint调优

    RocksDB大状态调优 RocksDB是基于LSM Tree实现的,写数据都是先写入到内存中,所有RocksDB的写请求效率较高.RocksDB使用内存结合磁盘的方式来存储数据,每次获取数据时,先从 ...

  2. Java性能优化,操作系统内核性能调优,JYM优化,Tomcat调优

    文章目录 Java性能优化 尽量在合适的场合使用单例 尽量避免随意使用静态变量 尽量避免过多过常地创建Java对象 尽量使用final修饰符 尽量使用局部变量 尽量处理好包装类型和基本类型两者的使用场 ...

  3. 性能优化专题 - JVM 性能优化 - 04 - GC算法与调优

    目录导航 前言 Garbage Collect(垃圾回收) 如何确定一个对象是垃圾? 引用计数法 可达性分析 垃圾收集算法 标记-清除(Mark-Sweep) 复制(Copying) 标记-整理(Ma ...

  4. sql优化的方法及思路_合理的sql优化思路--如何缩短SQL调优时间?

    概述 当生产环境发生故障或者系统特别慢的时候,这时候你从awr报告拿到有问题的sql,但是优化的时候却优化了很久还没解决,这时候在领导或者客户面前就不太好了...那么我们怎么去缩短sql调优的时间,一 ...

  5. 华为诺亚开源贝叶斯优化库:超参数调优河伯、组合优化器CompBO

    ©作者 | 陈萍.杜伟 来源 | 机器之心 华为诺亚开源了一个贝叶斯优化的库,该库包含三个部分:河伯.T-LBO.CompBO. 贝叶斯优化可以说是一种黑盒优化算法,该算法用于求解表达式未知函数的极值 ...

  6. Android性能优化(31)---虚拟机调优

    Android性能优化之虚拟机调优 众所周知,我们的Android App运行在Java虚拟机之上,而Java是一门带GC的语言.在虚拟机进行垃圾回收的时候,要做一件很形象的事叫做STW(stop t ...

  7. mysql的从头到脚优化之服务器参数的调优

    一. 说到mysql的调优,有许多的点可以让我们去做,因此梳理下,一些调优的策略,今天只是总结下服务器参数的调优  其实说到,参数的调优,我的理解就是无非两点: 如果是Innodb的数据库,innod ...

  8. Day794.如何用协程来优化多线程业务 -Java 性能调优实战

    如何用协程来优化多线程业务 Hi,我是阿昌,今天学习记录的是关于如何用协程来优化多线程业务. 近一两年,国内很多互联网公司开始使用或转型 Go 语言,其中一个很重要的原因就是 Go 语言优越的性能表现 ...

  9. Day814.电商系统表设计优化案例分析 -Java 性能调优实战

    电商系统表设计优化案例分析 Hi,我是阿昌,今天学习记录的是关于电商系统表设计优化案例分析. 如果在业务架构设计初期,表结构没有设计好,那么后期随着业务以及数据量的增多,系统就很容易出现瓶颈. 如果表 ...

最新文章

  1. 应用安全与微软SDL-IT流程
  2. iOS 调用h5页面 视频不自动播放的问题
  3. 全国计算机等级考试题库二级C操作题100套(第64套)
  4. ElasticSearch核心基础之映射
  5. c语言学习进阶-C语言带命令行参数的文件数据批量计算
  6. 新秀发挥云17号:RHEL改变以太网地址克隆虚拟机后,
  7. [Asp.Net WebApi]WebApi入门
  8. Day 11 - 视频转换成图片
  9. py thon画一个实心五角星
  10. lbj学习日记 05 一维数组和二维数组
  11. 计算机用word做贺卡,运用Word制作电子贺卡教学设计
  12. 新生宝宝取名大全:带梓字寓意大气的男孩名字
  13. 没赶上互联网,也没赶上移动互联网,微软到底赶上了什么??
  14. 《Windows 8 权威指南》——2.9 轻松为Windows 8 Metro开始屏幕增加关机/重启等应用...
  15. 给到工作的你诗和远方~
  16. 走进区块链企业 | 区块链应用商店 BeeStore
  17. 数据分析:基于Pandas的全球自然灾害分析与可视化
  18. 神奇的口袋【北京大学】
  19. 为什么建议大家使用 Linux 开发?有那么爽吗?
  20. 被问麻了,Spring 如何处理循环依赖?

热门文章

  1. 【车辆计数】基于matlab GUI背景差分法道路行驶多车辆检测【含Matlab源码 1911期】
  2. html5 驾考 答题样式,驾考科目一答题规律
  3. 百度地图的POI帮助文件
  4. 魅族手机突然显示无服务器,魅族Flyme6是悟空请来的?Bug竟然有这么多?
  5. knex 单表查询_SQL查询构建器 knex.js
  6. foxit 福昕阅读器 点击书签跳转时,保持 页面 缩放比例
  7. 启用openvpn后网络问题
  8. 安全基线规范之Cisco核心交换机
  9. 远程桌面用administrator登录的原因简介
  10. 基于51单片机的DS12C887电子钟万年历带农历温度