【Flink】Flink 源码阅读笔记(20)- Flink 基于 Mailbox 的线程模型
1.概述
转载:Flink 源码阅读笔记(20)- Flink 基于 Mailbox 的线程模型
相似文章:【Flink】Flink 基于 MailBox 实现的 StreamTask 线程模型
Flink 1.10 对内部事件处理的线程模型做了一个大的改进,采用了类似 Actor 的信箱模型。这篇文章我们将深入 Flink 内部 Mailbox 线程模型的设计即实现。
2.背景
在之前的线程模型中,StreamTask
中可能存在多个潜在的线程会修改内部的状态,因此需要通过加锁
的方式来确保线程安全的状态,这个全局的锁就是著名的 checkpointLock
。通过 checkpointLock
控制线程间的并发会让程序代码变得很复杂,并且锁对象还通过一些 API 暴露给了用户(例如 SourceFunction#getCheckpointLock()),如果没有正确加锁很容易引发线程安全问题。
为了解决这个问题,社区提出了基于 Mailbox 的线程模型,见 FLINK-12477
。Mailbox 机制借鉴了 Actor 模型,通过单个 Mailbox 线程配合阻塞队列的方式,将内部状态的修改交由单个线程完成,从而避免多线程的问题。相比于使用 checkpointLock
,Mailbox 模型另一个好处是方便控制事件处理的优先级,通过锁竞争很难达到类似的效果。
在原始的线程模型中,checkpointLock 主要用在三个地方:
事件处理:包括 events, watermarks, barriers, latency markers 的处理和发送
checkpoint
触发:通过 RPC 调用触发checkpoint
(在 Source 中)、通知checkpoint
的完成情况,(注:对下游来说,checkpoint 触发和取消是通过 barrier 触发的,归为第一种情况)Processing Time Timers: 处理时间定时器是通过
ScheduledExecutor
异步执行的(事件事件定时器触发是通过 watermark 触发的,归为第一种情况)
在新的改进方案中,对锁的替换不仅仅要做到排他的效果,对于事件处理还需要保证原子性。
3.改进方案
Mailbox 模型的核心思想其实比较简单,其底层就是 FIFO 的队列 + 一个单线程的循环事件处理
。所有需要处理的事件都封装成一个 Mail 投递到 Mailbox 中,然后按先后顺序由单线程加以处理
,从而简化了并发访问问题。
在使用 Mailbox
以前,StreamTask
的核心逻辑是在 StreamTask#run()
中,内部是一个循环的事件处理。除此以外,checkpoint trigger
和 processing time timer
在其它线程中运行。
在改进方案中,StreamTask 的基础逻辑大致如下(伪代码,来自设计文档):
BlockingQueue<Runnable> mailbox = ...void runMailboxProcessing() {//TODO: can become a cancel-event through mailbox eventuallyRunnable letter;while (isRunning()) { while ((letter = mailbox.poll()) != null) {letter.run();}defaultAction();}
}void defaultAction() {// e.g. event-processing from an input
}
上面只是核心代码的大致逻辑,具体的实现还有一些优化,比如队列的公平性
。之前的抢锁
操作是完全没有任何公平性
而言的。
在这个模型下,事件处理的循环被移到了 Mailbox
处理线程中,因此以往在 StreamTask#run()
中的循环逻辑就不再需要了。但这里会有个问题,因为历史原因,Flink Source Function
的核心逻辑是一个循环,这个循环不能和 Mailbox
的事件循环穿插执行,因此需要进行兼容性处理。在 FLIP-27
提出的新的 Source 接口中,已经可以比较好地和 Mailbox 模型进行兼容了。
对于 checkpoint trigger 和 processing time timer,只需要将对应的操作封装为 Mail 投递到 Mailbox 中,等待 Mailbox 线程进行处理即可
。
4.具体实现
4.1 整体设计
下面这张图展示了 Mailbox 线程模型中的核心抽象。
Mail 中封装了需要处理的消息和相应的动作
,checkpoint trigger 和 processing time timer 就是通过 Mail 触发的
;TaskMailbox
用于存储 Mail(需要处理的消息);MailboxProcessor
负责从 TaskMailbox 中取出信件并处理;其它的调用方通过 MailboxExecutor 向 TaskMailbox 中投递信件。
MailboxDefaultAction
则是 MailboxProcessor
的默认动作,如前所述,MailboxDefaultAction
主要负责处理基础的 stream event、barrier、watermark
等。在 Mailbox
主线程的循环中,处理完新的 Mail 后就会执行该动作。MailboxDefaultAction
通过一个 MailboxController
和 Mailbox
进行交互,可以借此获悉所有的事件都处理完毕,或者临时暂停 MailboxDefaultAction
。
4.2 Mailbox
TaskMailbox
的内部使用了一个普通的 Deque
存储写入的 Mail
,对 Deque
读写通过一个 ReentrantLock
来加以保护。Mailbox
的一个主要特性是可以做优先级控制
,每一个 Mail
都有其优先级
,从 TaskMailbox
获取 Mail
时可以指定优先级
,实际实现时就是通过遍历队列元素比较优先级
。
为了减少读取队列时的同步开销
,TaskMailbox
支持创建一个 batch
后续消费,相当于把队列中的元素存入一个额外的队列,后续消费时就避免了加锁的操作。
4.3 MailboxProcessor
MailboxProcessot
核心就是前面提过的事件循环
,在这个事件循环中,除了处理 TaskMailbox
中的事件外,还有一个 MailboxDefaultAction
用做默认的行为
。
MailboxDefaultAction
和 TaskMailbox
内部的 Mail 的区别在于,Mail 通常用于一些控制类的消息处理
,例如 checkpoint 触发,而 MailboxDefaultAction
则用于数据流上的普通消息处理
(如正常的数据记录,barrier)等。数据流上的消息数据量比较大,通过邮箱内部队列进行处理显然开销比较大。
public class MailboxProcessor implements Closeable {//邮箱protected final TaskMailbox mailbox;// 默认行为,用于普通的数据流上的消息数据处理protected final MailboxDefaultAction mailboxDefaultAction;/*** Runs the mailbox processing loop. This is where the main work is done. This loop can be* suspended at any time by calling {@link #suspend()}. For resuming the loop this method should* be called again.** // 运行邮箱处理循环。 这是完成主要工作的地方。*/public void runMailboxLoop() throws Exception {suspended = !mailboxLoopRunning;final TaskMailbox localMailbox = mailbox;// 检查当前运行线程是否是 mailbox 线程,只有 mailbox 线程能运行该方法//确保当前调用必须发生在 Mailbox 的事件处理线程中checkState(localMailbox.isMailboxThread(),"Method must be executed by declared mailbox thread!");// mailbox 状态必须是 OPENassert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";final MailboxController defaultActionContext = new MailboxController(this);// TODO:邮箱里有邮件,就进行处理. 邮件就是类似map之类的任务...while (isNextLoopPossible()) {// The blocking `processMail` call will not return until default action is available.// 在默认操作可用之前,阻塞的`processMail`调用将不会返回。// 处理事件,这是一个阻塞方法,如果默认行为不可用,方法不会返回processMail(localMailbox, false);// 再做一次检查,因为上面的 mail 处理可能会改变运行状态if (isNextLoopPossible()) {// TODO: 执行一个默认的动作 邮箱默认操作在StreamTask构造器中指定,为 processInputmailboxDefaultAction.runDefaultAction(// 根据需要在默认操作中获取锁defaultActionContext); // lock is acquired inside default action as needed}}}
4.4 MailboxExecutor
MailboxExecutor
的主要作用是向 TaskMailbox
中投递 Mail
,这个接口被设计为类似 java.util.concurrent.Executor
接口。提交 Mail
的行为可以在任意线程中进行,因为 TaskMailbox 内部有基于锁的同步控制
。
除了提交 Mail 外,MailboxExecutor
还有一个比较重要的作用体现在 MailboxExecutor#yield
方法中。yield 这个词在程序设计语言中非常常见,但其含义往往又让人摸不着头脑。从字面解释来看,yield 有“让出”,“屈服”之意,在一些场景下也有“生成”的意思。这里我们不纠结这个,还是来看看这个方法设计的意图的什么。
Mailbox 模型中所有的事件都是在单个事件处理线程中处理的,排除掉优先级的因素,所有的事件按照 FIFO 的顺序加以处理
。正常情况下,这种处理顺序是没有问题的。但是考虑到一种特殊的情况,如果要完成对事件A的处理需要等待一个条件,只有在处理完事件B之后这个条件才能满足,但是事件B在队列里的顺序是在事件A之后的,这样某种程度上来说就造成了一种 “死锁”。
yield 方法就是为了解决上面的问题,yield 会从队列中取出下一个事件进行处理,看上去像是暂时“让出”了对当前事件的处理。
说起来有点抽象,看一个示例:
MailboxExecutor mailboxExecutor = ....mailboxExecutor.executr(() -> {// ...// 当前事件处理的逻辑,要完成,需要依赖后面某个事件的处理while (resource not available) {// 取出下一个事件处理mailboxExecutor.yield();}// 继续处理当前事件// ...
})
注意,为了不破坏 Mailbox
模型单线程执行的特性,这个方法必须在 Mailbox
事件处理线程中调用。这是一个阻塞方法,因此可能会阻塞事件处理线程。有些场景下可能还需要依赖事件处理线程来提交新的事件,因此也提供了非阻塞的 tryYield
方法。
5.StreamTask 如何应用 Mailbox 模型
StreamTask 的核心是处理消息流中的 StreamRecord,这个处理逻辑是 MailboxProcessor 的默认行为,即:
class StreamTask {protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {InputStatus status = inputProcessor.processInput(); //处理输入if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) {return;}if (status == InputStatus.END_OF_INPUT) {// 没有后续的输入了,告知 MailboxDefaultAction.Controller controller.allActionsCompleted();return;}// 暂时没有输入的情况TaskIOMetricGroup ioMetrics = getEnvironment().getMetricGroup().getIOMetricGroup();TimerGauge timer;CompletableFuture<?> resumeFuture;if (!recordWriter.isAvailable()) {timer = ioMetrics.getBackPressuredTimePerSecond();resumeFuture = recordWriter.getAvailableFuture();} else {timer = ioMetrics.getIdleTimeMsPerSecond();resumeFuture = inputProcessor.getAvailableFuture();}// 一旦有输入了,就告知 controller 要恢复 MailboxDefaultAction 的处理assertNoException(resumeFuture.thenRun(// 首先会暂停 MailboxDefaultAction 的处理new ResumeWrapper(controller.suspendDefaultAction(timer), timer)));}private static class ResumeWrapper implements Runnable {private final Suspension suspendedDefaultAction;private final TimerGauge timer;public ResumeWrapper(Suspension suspendedDefaultAction, TimerGauge timer) {this.suspendedDefaultAction = suspendedDefaultAction;timer.markStart();this.timer = timer;}@Overridepublic void run() {timer.markEnd();suspendedDefaultAction.resume();}}
}
对于 checkpoint 的触发,是通过 MailboxExecutor 提交一个 Mail 来实现的:
@Overridepublic Future<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {CompletableFuture<Boolean> result = new CompletableFuture<>();mainMailboxExecutor.execute(() -> {try {// 触发Checkpoint操作 这里可以看到,其实现跟方案设计中的是一致,Checkpoint trigger// 这里的操作就是向 MailBox 提交一个 Task,等待 MailBox 去处理。result.complete(triggerCheckpointAsyncInMailbox(checkpointMetaData, checkpointOptions));} catch (Exception ex) {// Report the failure both via the Future result but also to the mailboxresult.completeExceptionally(ex);throw ex;}},"checkpoint %s with %s",checkpointMetaData,checkpointOptions);return result;}
checkpoint 完成或者放弃的通知也是提交到 Mailbox 中运行的:
class StreamTask {// checkpoint 完成或者失败的回调通知操作private Future<Void> notifyCheckpointOperation(RunnableWithException runnable, String description) {CompletableFuture<Void> result = new CompletableFuture<>();mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).execute(() -> {try {runnable.run();} catch (Exception ex) {result.completeExceptionally(ex);throw ex;}result.complete(null);},description);return result;}
}
对于 processing time timer 的触发也是类似的:
class streamTask {public ProcessingTimeServiceFactory getProcessingTimeServiceFactory() {return mailboxExecutor ->new ProcessingTimeServiceImpl(timerService,callback -> deferCallbackToMailbox(mailboxExecutor, callback));}ProcessingTimeCallback deferCallbackToMailbox(MailboxExecutor mailboxExecutor, ProcessingTimeCallback callback) {return timestamp -> {// 提交到 mailbox 中运行mailboxExecutor.execute(() -> invokeProcessingTimeCallback(callback, timestamp),"Timer callback for %s @ %d",callback,timestamp);};}
}
6.Legacy Source 的兼容处理
前面提到,因为历史遗留的问题,SourceFunction
被设计成一个无限的循环,这个循环不能和 Mailbox
的事件循环穿插执行,因此需要进行兼容性处理。
SourceStreamTask
被设计为 StreamTask
的子类,会启动另外一个独立的线程 LegacySourceFunctionThread
运行 SourceFunction
中的循环。这样相当于有两个线程在同时运行
- 一个是
SourceFunction
中生成数据流中的数据 - 另一个是
Mailbox
中的事件处理线程。
为了防止这两个线程发生冲突,在 SourceStreamTask 中保留了 checkpoint lock,用于在这两个线程间进行并发控制。
为了达到这样的效果,Flink 提供了一个 StreamTaskActionExecutor
的封装,用来运行 Runnable
。正常情况下,StreamTaskActionExecutor
的实现就是直接去运行 Runnable
;同时也提供了一个 SynchronizedStreamTaskActionExecutor
的实现,在运行 Runnable
的时候会进行加锁控制,这样就把获取锁的操作引入到 Mailbox
处理线程中了:
class SynchronizedStreamTaskActionExecutor implements StreamTaskActionExecutor {private final Object mutex;public SynchronizedStreamTaskActionExecutor(Object mutex) {this.mutex = mutex;}@Overridepublic void run(RunnableWithException runnable) throws Exception {synchronized (mutex) {runnable.run();}}
}
7.小结
Mailbox 模型是常见的用来控制并发的一种设计,通过引入 Mailbox 的线程模型,Flink 简化了 StreamTask 的代码逻辑,规避了多线程竞争带来的并发问题。
通过对 Mailbox、 MailboxProcessor、MailboxExecutor 这几个接口的设计进行分析,可以看出 Flink 的 Mailbox 模型设计还是比较优雅的,在使用方面也比较简单,很值得我们在开发其它项目的时候参考。
【Flink】Flink 源码阅读笔记(20)- Flink 基于 Mailbox 的线程模型相关推荐
- 【Flink】Flink 源码阅读笔记(18)- Flink SQL 中的流和动态表
1.概述 转载:Flink 源码阅读笔记(18)- Flink SQL 中的流和动态表
- 【Flink】Flink 源码阅读笔记(16)- Flink SQL 的元数据管理
1.概述 转载:Flink 源码阅读笔记(16)- Flink SQL 的元数据管理 Flink 源码阅读笔记(17)- Flink SQL 中的时间属
- 【Flink】Flink 源码阅读笔记(15)- Flink SQL 整体执行框架
1.概述 转载:Flink 源码阅读笔记(15)- Flink SQL 整体执行框架 在数据处理领域,无论是实时数据处理还是离线数据处理,使用 SQL 简化开发将会是未来的整体发展趋势.尽管 SQL ...
- syzkaller 源码阅读笔记1(syz-extract syz-sysgen)
文章目录 1. syz-extract 1-0 总结 1-1. `main()` 1-2 `archList()` - `1-1 (3)` 获取架构 name list 1-3 `createArch ...
- Transformers包tokenizer.encode()方法源码阅读笔记
Transformers包tokenizer.encode()方法源码阅读笔记_天才小呵呵的博客-CSDN博客_tokenizer.encode
- 源码阅读笔记 BiLSTM+CRF做NER任务 流程图
源码阅读笔记 BiLSTM+CRF做NER任务(二) 源码地址:https://github.com/ZhixiuYe/NER-pytorch 本篇正式进入源码的阅读,按照流程顺序,一一解剖. 一.流 ...
- 代码分析:NASM源码阅读笔记
NASM源码阅读笔记 NASM(Netwide Assembler)的使用文档和代码间的注释相当齐全,这给阅读源码 提供了很大的方便.按作者的说法,这是一个模块化的,可重用的x86汇编器, 而且能够被 ...
- CI框架源码阅读笔记4 引导文件CodeIgniter.php
到了这里,终于进入CI框架的核心了.既然是"引导"文件,那么就是对用户的请求.参数等做相应的导向,让用户请求和数据流按照正确的线路各就各位.例如,用户的请求url: http:// ...
- Yii源码阅读笔记 - 日志组件
2015-03-09 一 By youngsterxyf 使用 Yii框架为开发者提供两个静态方法进行日志记录: Yii::log($message, $level, $category); Yii: ...
最新文章
- android框架连接mysql_Android:ROOM数据库框架
- 数据备份软件,BackBone,NetVault,网络存储备份,系统集成
- Leetcode1685. 有序数组中差绝对值之和[C++题解]:前缀和和差的绝对值之和
- 人和计算机在时间管理方面的相似性
- RocketMQ简介、环境搭建
- NG客制项目下的I18n国际化标准方案
- django安装_技术大牛详解:Django框架之环境安装
- 详解:JVM内存调优参数
- 固件 日立 硬盘_最强性价比储存方案体验:这硬盘盒,真香!
- 七种武器——.NET工程师求职面试必杀技(转)
- 沉迷Link-Cut tree无法自拔之:[BZOJ2594][Wc2006]水管局长数据加强版
- Java加密方式(AES,DES,RSA,DSA,MD5)
- 7本软书,助你打破职场天花板
- netd模块工作流程
- python视频补帧_视频补帧软件(DAIN APP)
- 腾讯地图 周边 poi 搜索及参数配置
- 基于Ant Design vue框架登录demo
- 安卓转战React-Native之签名打包成Apk并极速多渠道打包
- Auto-Rig Pro文档翻译:安装
- 适用于Android设备的十大应用程序锁
热门文章
- 中国电信:张志勇辞任公司执行副总裁
- 启科量子加速商业化:量子通信为「盾」,量子计算为「矛」
- iPhone 13系列将首发A15芯片:采用增强版5nm工艺 性能提升20%
- 专为中国车主开发,特斯拉计划今年推出数据平台
- 美团关联公司公开“无人车及无人配送系统”相关专利
- 2699元起!格力首款5G手机悄然上架:骁龙765G处理器
- 飞书上线“程序员友好”功能 迎接1024程序员节
- 苹果:iPhone 12定价很合适,首批预订秒光说明用户认可
- 9月安卓机性能榜单公布:华为未进前十,第一名有点意外
- iPhone 12性能首曝:6GB内存、A14“挤牙膏”