摘要:本文介绍字节跳动在过去一段时间里做的两个主要的 Feature,一是在 Network 层的单点恢复的功能,二是 Checkpoint 层的 Regional Checkpoint。内容包括:

  1. 单点恢复机制

  2. Regional Checkpoint

  3. 在 Checkpoint 的其它优化

  4. 挑战 & 未来规划

Tips:点击文末「阅读原文」即可回顾作者原版分享视频~

一、单点恢复机制

在字节跳动的实时推荐场景中,我们使用 Flink 将用户特征与用户行为进行实时拼接,拼接样本作为实时模型的输入。拼接服务的时延和稳定性直接影响了线上产品对用户的推荐效果,而这种拼接服务在 Flink 中是一个类似双流 Join 的实现,Job 中的任何一个 Task 或节点出现故障,都会导致整个 Job 发生 Failover,影响对应业务的实时推荐效果。

在介绍单点恢复之前,我们回顾一下 Flink 的 Failover 策略。

  • Individual-Failover:

只重启出错的 Task,适用于 Task 间无连接的情况,应用场景有限。

  • Region-Failover:

该策略会将作业中的所有 Task 划分为数个 Region。当有 Task 发生故障时,它会尝试找出进行故障恢复需要重启的最小 Region 集合。相比于全局重启故障恢复策略,这种策略在一些场景下的故障恢复需要重启的 Task 会更少。

如果使用 Region-Failover 策略,但因为 Job 是一个全连接的拓扑,本身就是一个大 region。重启 region 相当于重启整个 Job,所以我们考虑是否可以用 Flink Individual-task-failover 策略去替代 Region-failover 策略?而 Individual-task-failover 的策略在这种拓扑下是完全不适用的。所以我们对于以下特征的场景,需要设计开发一个新的 Failover 策略:

  • 多流 Join 拓扑

  • 流量大(30M QPS)、高并发度(16K*16K)

  • 允许短时间内小量部分数据丢失

  • 对数据持续输出型要求高

在讲述技术方案之前,看一下 Flink 现有的数据传输机制。

从左往右看(SubTaskA):

  1. 当数据流入时会先被 RecordWriter 接收

  2. RecordWriter 根据数据的信息,例如 key,将数据进行 shuffle 选择对应的 channel

  3. 将数据装载到 buffer 中,并放到 channel 对应的 buffer 队列里

  4. 通过 Netty Server 向下游发送

  5. 下游 Netty Client 接收数据

  6. 根据 buffer 中的分区信息,转发发到下游对应的 channel 中

  7. 由 InputProcessor 将数据从 buffer 中取出,执行 operator 逻辑

根据上面提出的思路我们要解决以下几个问题:

  • 如何让上游 Task 感知下游 Failure

  • 下游 Task 失败后,如何让上游 Task 向正常的 Task 发送数据

  • 上游 Task 失败后,如何让下游 Task 继续消费 buffer 中的数据

  • 上下游中不完整的数据如何处理

  • 如何建立新的连接

针对上述问题提出解决方案。

■ 如何让上游 Task 感知下游 Failure

下游 SubTask 主动将失败信息传递给上游,或者 TM 被关闭上游 Netty Server 也可以感知到。图中用 X 表示不可用的 SubPartition。

首先将 SubPartition1 和对应的 view (Netty Server 用来取 SubPartition 数据的一个结构)置为不可用。

之后当 Record Writer 接收到新数据需要向 SubPartition1 发送数据,此时需要进行一个可用性判断,当 SubPartition 状态可用则正常发送,不可用直接丢弃数据。

■ 上游 Task 接收到下游 Task 新的连接

下游 subTask 被重新调度启动后,向上游发送 Partition Request,上游 Netty Server 收到 Partition Request 后重新给下游 SubTask 创建对用的 View, 此时上游 Record Writer 就可以正常写数据。

■ 下游 Task 感知上游 Task 失败

同样的下游 Netty Client 能感知到上游有 subTask 失败了,这时找出对应的 channel ,在末尾插入一个不可用的事件(这里用感叹号来表示事件)。我们的目的是想要尽可能的少丢数据,此时 channel 中的 buffer 任可以被 InputProcessor 正常消费,直到读取到“不可用事件”。再进行 channel 不可用标记和对应的 buffer 队列清理。

■ Buffer 中有不完整的数据

首先要知道不完整的数据存放在哪里,它存在于 input process 的内部,input process 会给每一个 channel 维护一个小的 buffer 队列。当收到一个 buffer ,它是不完整的数据,那么等到接收到下一个 buffer 后再拼接成一条完整的数据发往 operator。

■ 下游 Task 和上游 Task 重新连接

当上游有问题的 Task 被重新调度后,通过调用 TaskManager API 来通知下游。下游 Shuffle Environment 收到通知后判断对应的 channel 状态,如果是不可,用直接生成新的 channel 并释放掉老的。如果是可用状态,说明 channel 的 buffer 没有消费完,需要等待 buffer 消费完再进行替换操作。

业务收益

上图是以 4000 并行度的作业为例做了对比测试。业务是将一个用户展现流和一个用户行为流的进行 Join,整个作业共有 12000个 Task。

上图中 单点恢复(预留资源)是使用调度组做的一个 feature,在申请资源的时,选择额外多申请一些资源,当发生 failover 时省去了从 YARN 去申请资源的时间开销。

最后做到了作业的输出减少千分之一,恢复时间约 5 秒。因为整个恢复过程时间较短,可以基本做到下游无感知。

二、Regional Checkpoint

在一个比较经典的数据集成场景,数据导入导出。比如从 Kafka 导入到 Hive,满足下面几个特征。

  • 拓扑中没有 All-to-All 的连接

  • 强依赖 Checkpoint 来实现 Exactly-Once 语义下的数据输出

  • Checkpoint 间隔长,对成功率要求高

在这种情况下,数据没有任何的 shuffle 。

在数据集成的场景中遇到哪些问题?

  • 单个 Task Checkpoint 失败会影响全局的 Checkpoint 输出

  • 网络抖动、写入超时/失败、存储环境抖动对作业的影响过于明显

  • 2000并行以上的作业成功率明显下降,低于业务预期

在这里,我们想到作业会根据 region-failover 策略将作业的拓扑划分为多个 region。那么 Checkpoint 是否可以采取类似的思路,将 checkpoint 以 region 的单位来管理?答案是肯定的。

在这种情况下不需要等到所有 Task checkpoint 完成后才去做分区归档操作(例如 HDFS 文件 rename)。而是当某个 region 完成后即可进行 region 级别的 checkpoint 归档操作。

介绍方案之前先简单回顾 Flink 现有的 checkpoint 机制。相信大家都比较熟悉。

现有 ckp

上图中是一个 Kafka source 和 Hive sink 算子的拓扑,并行度为 4 的例子。

首先 checkpoint coordinator 触发 triggerCheckpoint 的操作,发送到各个 source task。在 Task 收到请求之后,触发 Task 内的 operator 进行 snapshot 操作。例子中有 8 个 operator 状态。

现有 ckp1

在各 operator 完成 snapshot 后,Task 发送 ACK 消息给 checkpoint coordinator 表示当前 Task 已经完成了 Checkpoint。

之后当 coordinator 收到所有 Task 成功的 ACK 消息,那么 checkpont 可以认为是成功了。最后触发 finalize 操作,保存对应的 metadata。通知所有 Task checkpoint 完成。

当我们使用 Region 方式去管理 checkpoint 时会遇到什么问题?

  • 如何划分 Checkpoint Region

把彼此没有连接的 Task 集合,划分为 1 个 region。显而易见例子中有四个  Region。

  • 失败 Region 的 Checkpoint 结果如何处理

假设第一次 checkpoint 能正常完成,每个 operator 对应的状态都成功写入 HDFS checkpoint1 目录中,并通过逻辑映射,将 8 个operator 映射到 4 个 checkpoint region。注意仅仅是逻辑映射,并没有对物理文件做出任何移动和修改。

现有 ckp1

第二次进行 checkpoint 时 region-4-data(Kafka-4,Hive-4)checkpoint 失败。checkpoint2 (job/chk_2)目录中没有对应 Kafka-4-state 和 Hive-4-state 文件,当前 checkpoint2 是不完整的。为了保证完整,从上一次或之前成功的 checkpoint 文件中寻找 region-4-data 成功的 state 文件,并进行逻辑映射。这样当前 checkpoint 每个 region 状态文件就完整了,可以认为 checkpoint 完成。

此时如果发生大部分或所有 region 都失败,如果都引用前一次 checkpoint 那么当前这个 checkpoint 和上一个 checkpoint 相同也就没有意义了。

通过配置 region 最大失败比例, 比如 50%,例子中 4 个 region ,最多能接受两个 region 失败。

  • 如何避免在文件系统上存储过多的 Checkpoint 历史数据

如果有某个 region 一直失败(遇到脏数据或代码逻辑问题),当前的机制会导致把所有历史 checkpoint 文件都保留下来,显然这是不合理的。

通过配置支持 region 最大连续失败次数。例如2表示 region 最多能引用前两次的  checkpoint 成功的 region 结果。

工程实现难点

  1. 如何处理 Task Fail 和 checkpoint timeout

  2. 同一 region 内已经 snapshot 成功的 subTask 状态如何处理

  3. 如何保证和 checkpoint Coordinator 的兼容性

来看目前 Flink 是如何做的。

现有 coordinator

当发生 Task failure ,先会通知到 JobMaster FailoverStrategy,通过 FailoverStrategy 来通知 checkpoint coordinator 进行 checkpoint cancel 操作。

那么 checkpoint timeout 情况如何处理?当 coordinator 触发 checkpoint 时,会开启 checkpoint canceller。canceller 内有一个定时器,当超过预设时间并且 coordinator 还未完成 checkpoint,说明出现timeout,通知 coordinator cancel 本次 checkpoint。

无论是 Task fail 还是 timeout 最终都会指向 pendding checkpoint,并且当前指向的 checkpoint 就会被丢弃。

在做出相应修改前先梳理 checkpoint 相关的 Message,和 checkpoint coordinator 会做出的反应。

Global checkpoint 为 Flink 现有机制。

为了保持和 checkpoint Coordinator 兼容性,添加一个 CheckpointHandle 接口。并添加了两个实现分别是  GlobalCheckpointHandle 和 RegionalCheckpointHandle 通过过滤消息的方式实现 global checkpoint 和 region checkpoint 相关操作。

region checkpoint 提一点。如果 handler 接收到失败消息,将这个 region 置为失败,并尝试从之前的 successful checkpoint 进行 region 逻辑映射。同样 coordinator 发送 nofityComplate 消息也会先经过 handler 的过滤,过滤掉发送给失败 Task 的消息。

业务收益

测试在 5000 并行度下,假设单个 Task snapshot 的成功率为 99.99%。使用 Global checkpoint 的成功率为 60.65%, 而使用 Region checkpoint 任然能保持 99.99%。

三、Checkpoint 上的其它优化

并行化恢复 operator 状态

union state 是一种比较特殊的状态,在恢复时需要找到 job 所有的 Task state 再进行 union 恢复到单个 Task 中。如果 Job 并行度非常大,如 10000, 那么每个 task 的 union state 进行恢复时至少需要读取 10000 个文件。如果串行恢复这 10000 个文件里的状态,那么恢复的耗时可想而知是非常漫长的。

虽然 OperatorState 对应的数据结构是无法进行并行操作的,但是我们读取文件的过程是可以并行化的,在 OperatorStateBackend 的恢复过程中,我们将读取 HDFS 文件的过程并行化,等到所有状态文件解析到内存后,再用单线程去处理,这样我们可以将几十分钟的状态恢复时间减少到几分钟。

增强 CheckpointScheduler 并支持 Checkpoint 整点触发

Flink checkpoint 的 interval,timeout 在任务提交之后是无法修改的。但刚上线时只能根据经验值进行设置。而往往在作业高峰期时会发现 interval,timeout 等参数设置不合理。这时通常一个方法是修改参数重启任务,对业务影响比较大,显然这种方式是不合理的。

在这里,我们对 CheckpointCoordinator 内部的 Checkpoint 触发机制做了重构,将已有的 Checkpoint 触发流程给抽象出来,使得我们可以很快地基于抽象类对 Checkpoint 触发机制进行定制化。比如在支持数据导入的场景中,为了更快地形成 Hive 分区,我们实现了整点触发的机制,方便下游尽快地看到数据。

还有很多优化点就不一一列举了。

四、挑战 & 未来规划

目前字节内部的作业状态最大能达到 200TB 左右的水平,而对于这种大流量和大状态的作业,直接使用 RocksDB StateBackend 是无法支撑的。所以未来,我们会之后继续会在 state 和 checkpoint 性能优化和稳定性上做更多的工作,比如强化已有的 StateBackend、解决倾斜和反压下 Checkpoint 的速率问题、增强调试能力等。

字节跳动实时计算团队负责公司内部实时计算场景, 支撑了数仓/机器学习/推荐/搜索/广告/流媒体/安全和风控等众多核心业务,我们面临的挑战是超大单体作业(千万级别 QPS),超大集群规模(上万台机器)的应用场景,在 SQL, State, Runtime 上都有深度优化。公司仍处于高速发展阶段,欢迎有能力、有想法的同学来这里一起建设实时计算引擎。感兴趣的同学可联系邮箱:liaojiayi@bytedance.com

更多 Flink 相关技术问题,可扫码加入社区钉钉交流群~


▼ 关注「Flink 中文社区」,获取更多技术干货 ▼

戳我,回顾作者分享视频!

字节跳动单点恢复功能及 Regional CheckPoint 优化实践相关推荐

  1. 字节跳动 Flink 单点恢复功能及 Regional CheckPoint 优化实践

    本文介绍了字节跳动在实时计算方面进行的两个实践优化,一是在 Network 层的单点恢复的功能,二是 Checkpoint 层的 Regional Checkpoint. 作者|廖嘉逸 一.单点恢复机 ...

  2. 字节跳动在Spark SQL上的核心优化实践

    字节跳动在Spark SQL上的核心优化实践 大数据架构 今天 以下文章来源于字节跳动技术团队 ,作者郭俊 字节跳动技术团队 字节跳动的技术实践分享 10月26日,字节跳动技术沙龙 | 大数据架构专场 ...

  3. 上海沙龙回顾 | ​字节跳动在Spark SQL上的核心优化实践

    10月26日,字节跳动技术沙龙 | 大数据架构专场 在上海字节跳动总部圆满结束.我们邀请到字节跳动数据仓库架构负责人郭俊,Kyligence 大数据研发工程师陶加涛,字节跳动存储工程师徐明敏,阿里云高 ...

  4. 字节跳动在 Spark SQL 上的核心优化实践

    作者 | 郭俊 封图 | BanburyTang 字节跳动数据仓库架构团队负责数据仓库领域架构设计,支持字节跳动几乎所有产品线(包含但不限于抖音.今日头条.西瓜视频.火山视频)数据仓库方向的需求,如 ...

  5. 字节跳动 Flink 单点恢复功能实践

    简介:在 Flink 现有的架构设计中,多流 Join 拓扑下单个 Task 失败会导致所有 Task 重新部署,耗时可能会持续几分钟,导致作业的输出断流,这对于线上业务来说是不可接受的.针对这一痛点 ...

  6. 字节跳动 录屏功能_一周盘点:Instagram打通视频购物功能 公开叫板TikTok;字节跳动要大力布局跨境出口电商?...

    周末刚结束,预祝各位小伙伴们在接下来的每一天都能大卖! 大师兄先带你盘点这一周都有哪一些跨境电商热点事件: 每周要点 Instagram打通视频购物功能 公开叫板TikTok 据外媒报道,Instag ...

  7. 字节跳动 录屏功能_视频激励-录屏分享参考规范

    背景 [视频录制及分享]是字节跳动系平台独有的也是最核心的内容展示能力及产品分发能力.我们希望通过平台提供的视频相关 api 接口能力,能让小游戏发行和平台能力充分结合,更好的利用,帮助小游戏在本平台 ...

  8. 字节跳动异构场景下的高可用建设实践

    本文首发于:火山引擎开发者社区: 作者:字节跳动基础架构团队系统治理方向负责人邵育亮 本文主要为大家介绍字节跳动在高可用建设上的一些思考和落地经验.先给大家简单介绍一下系统治理团队是做什么的.系统治理 ...

  9. 字节跳动在 RocksDB 存储引擎上的改进实践

    本文选自"字节跳动基础架构实践"系列文章. "字节跳动基础架构实践"系列文章是由字节跳动基础架构部门各技术团队及专家倾力打造的技术干货内容,和大家分享团队在基础 ...

  10. 字节跳动数据中台的 Data Catalog 系统搜索实践

    动手点关注 干货不迷路 1. 背景 Data Catalog 能够帮助大公司更好地梳理和管理自己的资产,是 Data-drvien 公司的重要平台.一个通用的 Data Catalog 平台通常包含元 ...

最新文章

  1. turtlebot雷达模块
  2. EduCoder Linux之文件打包和解压缩
  3. 寄存器地址和内存地址_3. 从0开始学ARMARM模式、寄存器、流水线
  4. curl txt批量_curl与wget高级用法
  5. IIS日志存入数据库之二:ETW
  6. 关于document.cookie的使用javascript
  7. 工科数学分析 MA_12 Vectors and the Geometry of Space (下篇)
  8. GIS空间分析(一)——空间分析与GIS
  9. springboot 文件下载 文件名乱码 特殊字符乱码
  10. 机器学习sklearn----支持向量机SVC模型评估指标
  11. 实用又方便电脑快捷键
  12. 解决 ajax 跨域
  13. 三角形公式 [重心, 内心, 外心, 垂心]
  14. 在线求助ing~ 急!!
  15. Java面向对象实例(双色球摇号篇)
  16. python,时间的四种格式
  17. 转发微雪课堂的STM32CubeMX系列教程
  18. 《北京爱情故事》:四个女人的爱情
  19. Python实现自动从中控考勤机软件中下载记录数据
  20. ansible playbook play task执行顺序

热门文章

  1. JavaScript:split() 方法和join() 方法
  2. tongweb php,TongWeb服务器部署
  3. 加密设备攻防(二)- 智能设备篇
  4. 其他学习笔记(一)——MySQL基础配置+可视化工具安装与破解
  5. redission分布式锁
  6. iOS线程之——NSCondition
  7. VMware下安装CentOS
  8. wenbao与cf整数直角三角形
  9. codeforces 463A Caisa and Sugar 解题报告
  10. [转载] Discrete Mathematics——04 一阶谓词逻辑基本概念