1.概述

转载:Flink 源码阅读笔记(20)- Flink 基于 Mailbox 的线程模型

相似文章:【Flink】Flink 基于 MailBox 实现的 StreamTask 线程模型

Flink 1.10 对内部事件处理的线程模型做了一个大的改进,采用了类似 Actor 的信箱模型。这篇文章我们将深入 Flink 内部 Mailbox 线程模型的设计即实现。

2.背景

在之前的线程模型中,StreamTask 中可能存在多个潜在的线程会修改内部的状态,因此需要通过加锁的方式来确保线程安全的状态,这个全局的锁就是著名的 checkpointLock。通过 checkpointLock 控制线程间的并发会让程序代码变得很复杂,并且锁对象还通过一些 API 暴露给了用户(例如 SourceFunction#getCheckpointLock()),如果没有正确加锁很容易引发线程安全问题。

为了解决这个问题,社区提出了基于 Mailbox 的线程模型,见 FLINK-12477。Mailbox 机制借鉴了 Actor 模型,通过单个 Mailbox 线程配合阻塞队列的方式,将内部状态的修改交由单个线程完成,从而避免多线程的问题。相比于使用 checkpointLock,Mailbox 模型另一个好处是方便控制事件处理的优先级,通过锁竞争很难达到类似的效果。

在原始的线程模型中,checkpointLock 主要用在三个地方:

  • 事件处理:包括 events, watermarks, barriers, latency markers 的处理和发送

  • checkpoint 触发:通过 RPC 调用触发 checkpoint(在 Source 中)、通知 checkpoint 的完成情况,(注:对下游来说,checkpoint 触发和取消是通过 barrier 触发的,归为第一种情况)

  • Processing Time Timers: 处理时间定时器是通过 ScheduledExecutor 异步执行的(事件事件定时器触发是通过 watermark 触发的,归为第一种情况)

在新的改进方案中,对锁的替换不仅仅要做到排他的效果,对于事件处理还需要保证原子性。

3.改进方案

Mailbox 模型的核心思想其实比较简单,其底层就是 FIFO 的队列 + 一个单线程的循环事件处理。所有需要处理的事件都封装成一个 Mail 投递到 Mailbox 中,然后按先后顺序由单线程加以处理,从而简化了并发访问问题。

在使用 Mailbox 以前,StreamTask 的核心逻辑是在 StreamTask#run() 中,内部是一个循环的事件处理。除此以外,checkpoint triggerprocessing time timer 在其它线程中运行。

在改进方案中,StreamTask 的基础逻辑大致如下(伪代码,来自设计文档):

BlockingQueue<Runnable> mailbox = ...void runMailboxProcessing() {//TODO: can become a cancel-event through mailbox eventuallyRunnable letter;while (isRunning()) { while ((letter = mailbox.poll()) != null) {letter.run();}defaultAction();}
}void defaultAction() {// e.g. event-processing from an input
}

上面只是核心代码的大致逻辑,具体的实现还有一些优化,比如队列的公平性。之前的抢锁操作是完全没有任何公平性而言的。

在这个模型下,事件处理的循环被移到了 Mailbox 处理线程中,因此以往在 StreamTask#run() 中的循环逻辑就不再需要了。但这里会有个问题,因为历史原因,Flink Source Function 的核心逻辑是一个循环,这个循环不能和 Mailbox 的事件循环穿插执行,因此需要进行兼容性处理。在 FLIP-27 提出的新的 Source 接口中,已经可以比较好地和 Mailbox 模型进行兼容了。

对于 checkpoint trigger 和 processing time timer,只需要将对应的操作封装为 Mail 投递到 Mailbox 中,等待 Mailbox 线程进行处理即可

4.具体实现

4.1 整体设计

下面这张图展示了 Mailbox 线程模型中的核心抽象。

Mail 中封装了需要处理的消息和相应的动作,checkpoint trigger 和 processing time timer 就是通过 Mail 触发的TaskMailbox 用于存储 Mail(需要处理的消息);MailboxProcessor 负责从 TaskMailbox 中取出信件并处理;其它的调用方通过 MailboxExecutor 向 TaskMailbox 中投递信件。

MailboxDefaultAction 则是 MailboxProcessor 的默认动作,如前所述,MailboxDefaultAction 主要负责处理基础的 stream event、barrier、watermark 等。在 Mailbox 主线程的循环中,处理完新的 Mail 后就会执行该动作。MailboxDefaultAction 通过一个 MailboxControllerMailbox 进行交互,可以借此获悉所有的事件都处理完毕,或者临时暂停 MailboxDefaultAction

4.2 Mailbox

TaskMailbox 的内部使用了一个普通的 Deque 存储写入的 Mail,对 Deque 读写通过一个 ReentrantLock 来加以保护。Mailbox 的一个主要特性是可以做优先级控制,每一个 Mail 都有其优先级,从 TaskMailbox 获取 Mail 时可以指定优先级,实际实现时就是通过遍历队列元素比较优先级

为了减少读取队列时的同步开销TaskMailbox 支持创建一个 batch 后续消费,相当于把队列中的元素存入一个额外的队列,后续消费时就避免了加锁的操作。

4.3 MailboxProcessor

MailboxProcessot 核心就是前面提过的事件循环,在这个事件循环中,除了处理 TaskMailbox 中的事件外,还有一个 MailboxDefaultAction 用做默认的行为

MailboxDefaultActionTaskMailbox 内部的 Mail 的区别在于,Mail 通常用于一些控制类的消息处理,例如 checkpoint 触发,而 MailboxDefaultAction用于数据流上的普通消息处理(如正常的数据记录,barrier)等。数据流上的消息数据量比较大,通过邮箱内部队列进行处理显然开销比较大。

public class MailboxProcessor implements Closeable {//邮箱protected final TaskMailbox mailbox;// 默认行为,用于普通的数据流上的消息数据处理protected final MailboxDefaultAction mailboxDefaultAction;/*** Runs the mailbox processing loop. This is where the main work is done. This loop can be* suspended at any time by calling {@link #suspend()}. For resuming the loop this method should* be called again.**  // 运行邮箱处理循环。 这是完成主要工作的地方。*/public void runMailboxLoop() throws Exception {suspended = !mailboxLoopRunning;final TaskMailbox localMailbox = mailbox;// 检查当前运行线程是否是 mailbox 线程,只有 mailbox 线程能运行该方法//确保当前调用必须发生在 Mailbox 的事件处理线程中checkState(localMailbox.isMailboxThread(),"Method must be executed by declared mailbox thread!");// mailbox 状态必须是 OPENassert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";final MailboxController defaultActionContext = new MailboxController(this);// TODO:邮箱里有邮件,就进行处理. 邮件就是类似map之类的任务...while (isNextLoopPossible()) {// The blocking `processMail` call will not return until default action is available.// 在默认操作可用之前,阻塞的`processMail`调用将不会返回。// 处理事件,这是一个阻塞方法,如果默认行为不可用,方法不会返回processMail(localMailbox, false);// 再做一次检查,因为上面的 mail 处理可能会改变运行状态if (isNextLoopPossible()) {// TODO: 执行一个默认的动作 邮箱默认操作在StreamTask构造器中指定,为 processInputmailboxDefaultAction.runDefaultAction(// 根据需要在默认操作中获取锁defaultActionContext); // lock is acquired inside default action as needed}}}

4.4 MailboxExecutor

MailboxExecutor 的主要作用是向 TaskMailbox 中投递 Mail,这个接口被设计为类似 java.util.concurrent.Executor 接口。提交 Mail 的行为可以在任意线程中进行,因为 TaskMailbox 内部有基于锁的同步控制

除了提交 Mail 外,MailboxExecutor 还有一个比较重要的作用体现在 MailboxExecutor#yield 方法中。yield 这个词在程序设计语言中非常常见,但其含义往往又让人摸不着头脑。从字面解释来看,yield 有“让出”,“屈服”之意,在一些场景下也有“生成”的意思。这里我们不纠结这个,还是来看看这个方法设计的意图的什么。

Mailbox 模型中所有的事件都是在单个事件处理线程中处理的,排除掉优先级的因素,所有的事件按照 FIFO 的顺序加以处理。正常情况下,这种处理顺序是没有问题的。但是考虑到一种特殊的情况,如果要完成对事件A的处理需要等待一个条件,只有在处理完事件B之后这个条件才能满足,但是事件B在队列里的顺序是在事件A之后的,这样某种程度上来说就造成了一种 “死锁”。

yield 方法就是为了解决上面的问题,yield 会从队列中取出下一个事件进行处理,看上去像是暂时“让出”了对当前事件的处理。

说起来有点抽象,看一个示例:

MailboxExecutor mailboxExecutor = ....mailboxExecutor.executr(() -> {// ...// 当前事件处理的逻辑,要完成,需要依赖后面某个事件的处理while (resource not available) {// 取出下一个事件处理mailboxExecutor.yield();}// 继续处理当前事件// ...
})

注意,为了不破坏 Mailbox 模型单线程执行的特性,这个方法必须在 Mailbox 事件处理线程中调用。这是一个阻塞方法,因此可能会阻塞事件处理线程。有些场景下可能还需要依赖事件处理线程来提交新的事件,因此也提供了非阻塞的 tryYield 方法。

5.StreamTask 如何应用 Mailbox 模型

StreamTask 的核心是处理消息流中的 StreamRecord,这个处理逻辑是 MailboxProcessor 的默认行为,即:

class StreamTask {protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {InputStatus status = inputProcessor.processInput(); //处理输入if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) {return;}if (status == InputStatus.END_OF_INPUT) {// 没有后续的输入了,告知 MailboxDefaultAction.Controller controller.allActionsCompleted();return;}// 暂时没有输入的情况TaskIOMetricGroup ioMetrics = getEnvironment().getMetricGroup().getIOMetricGroup();TimerGauge timer;CompletableFuture<?> resumeFuture;if (!recordWriter.isAvailable()) {timer = ioMetrics.getBackPressuredTimePerSecond();resumeFuture = recordWriter.getAvailableFuture();} else {timer = ioMetrics.getIdleTimeMsPerSecond();resumeFuture = inputProcessor.getAvailableFuture();}// 一旦有输入了,就告知 controller 要恢复 MailboxDefaultAction 的处理assertNoException(resumeFuture.thenRun(// 首先会暂停 MailboxDefaultAction 的处理new ResumeWrapper(controller.suspendDefaultAction(timer), timer)));}private static class ResumeWrapper implements Runnable {private final Suspension suspendedDefaultAction;private final TimerGauge timer;public ResumeWrapper(Suspension suspendedDefaultAction, TimerGauge timer) {this.suspendedDefaultAction = suspendedDefaultAction;timer.markStart();this.timer = timer;}@Overridepublic void run() {timer.markEnd();suspendedDefaultAction.resume();}}
}

对于 checkpoint 的触发,是通过 MailboxExecutor 提交一个 Mail 来实现的:

@Overridepublic Future<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {CompletableFuture<Boolean> result = new CompletableFuture<>();mainMailboxExecutor.execute(() -> {try {// 触发Checkpoint操作 这里可以看到,其实现跟方案设计中的是一致,Checkpoint trigger// 这里的操作就是向 MailBox 提交一个 Task,等待 MailBox 去处理。result.complete(triggerCheckpointAsyncInMailbox(checkpointMetaData, checkpointOptions));} catch (Exception ex) {// Report the failure both via the Future result but also to the mailboxresult.completeExceptionally(ex);throw ex;}},"checkpoint %s with %s",checkpointMetaData,checkpointOptions);return result;}

checkpoint 完成或者放弃的通知也是提交到 Mailbox 中运行的:

class StreamTask {// checkpoint 完成或者失败的回调通知操作private Future<Void> notifyCheckpointOperation(RunnableWithException runnable, String description) {CompletableFuture<Void> result = new CompletableFuture<>();mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).execute(() -> {try {runnable.run();} catch (Exception ex) {result.completeExceptionally(ex);throw ex;}result.complete(null);},description);return result;}
}

对于 processing time timer 的触发也是类似的:

class streamTask {public ProcessingTimeServiceFactory getProcessingTimeServiceFactory() {return mailboxExecutor ->new ProcessingTimeServiceImpl(timerService,callback -> deferCallbackToMailbox(mailboxExecutor, callback));}ProcessingTimeCallback deferCallbackToMailbox(MailboxExecutor mailboxExecutor, ProcessingTimeCallback callback) {return timestamp -> {// 提交到 mailbox 中运行mailboxExecutor.execute(() -> invokeProcessingTimeCallback(callback, timestamp),"Timer callback for %s @ %d",callback,timestamp);};}
}

6.Legacy Source 的兼容处理

前面提到,因为历史遗留的问题,SourceFunction 被设计成一个无限的循环,这个循环不能和 Mailbox 的事件循环穿插执行,因此需要进行兼容性处理。

SourceStreamTask 被设计为 StreamTask 的子类,会启动另外一个独立的线程 LegacySourceFunctionThread 运行 SourceFunction 中的循环。这样相当于有两个线程在同时运行

  1. 一个是 SourceFunction 中生成数据流中的数据
  2. 另一个是 Mailbox 中的事件处理线程。

为了防止这两个线程发生冲突,在 SourceStreamTask 中保留了 checkpoint lock,用于在这两个线程间进行并发控制。

为了达到这样的效果,Flink 提供了一个 StreamTaskActionExecutor 的封装,用来运行 Runnable。正常情况下,StreamTaskActionExecutor 的实现就是直接去运行 Runnable;同时也提供了一个 SynchronizedStreamTaskActionExecutor 的实现,在运行 Runnable 的时候会进行加锁控制,这样就把获取锁的操作引入到 Mailbox 处理线程中了:

class SynchronizedStreamTaskActionExecutor implements StreamTaskActionExecutor {private final Object mutex;public SynchronizedStreamTaskActionExecutor(Object mutex) {this.mutex = mutex;}@Overridepublic void run(RunnableWithException runnable) throws Exception {synchronized (mutex) {runnable.run();}}
}

7.小结

Mailbox 模型是常见的用来控制并发的一种设计,通过引入 Mailbox 的线程模型,Flink 简化了 StreamTask 的代码逻辑,规避了多线程竞争带来的并发问题。

通过对 Mailbox、 MailboxProcessor、MailboxExecutor 这几个接口的设计进行分析,可以看出 Flink 的 Mailbox 模型设计还是比较优雅的,在使用方面也比较简单,很值得我们在开发其它项目的时候参考。

【Flink】Flink 源码阅读笔记(20)- Flink 基于 Mailbox 的线程模型相关推荐

  1. 【Flink】Flink 源码阅读笔记(18)- Flink SQL 中的流和动态表

    1.概述 转载:Flink 源码阅读笔记(18)- Flink SQL 中的流和动态表

  2. 【Flink】Flink 源码阅读笔记(16)- Flink SQL 的元数据管理

    1.概述 转载:Flink 源码阅读笔记(16)- Flink SQL 的元数据管理 Flink 源码阅读笔记(17)- Flink SQL 中的时间属

  3. 【Flink】Flink 源码阅读笔记(15)- Flink SQL 整体执行框架

    1.概述 转载:Flink 源码阅读笔记(15)- Flink SQL 整体执行框架 在数据处理领域,无论是实时数据处理还是离线数据处理,使用 SQL 简化开发将会是未来的整体发展趋势.尽管 SQL ...

  4. syzkaller 源码阅读笔记1(syz-extract syz-sysgen)

    文章目录 1. syz-extract 1-0 总结 1-1. `main()` 1-2 `archList()` - `1-1 (3)` 获取架构 name list 1-3 `createArch ...

  5. Transformers包tokenizer.encode()方法源码阅读笔记

    Transformers包tokenizer.encode()方法源码阅读笔记_天才小呵呵的博客-CSDN博客_tokenizer.encode

  6. 源码阅读笔记 BiLSTM+CRF做NER任务 流程图

    源码阅读笔记 BiLSTM+CRF做NER任务(二) 源码地址:https://github.com/ZhixiuYe/NER-pytorch 本篇正式进入源码的阅读,按照流程顺序,一一解剖. 一.流 ...

  7. 代码分析:NASM源码阅读笔记

    NASM源码阅读笔记 NASM(Netwide Assembler)的使用文档和代码间的注释相当齐全,这给阅读源码 提供了很大的方便.按作者的说法,这是一个模块化的,可重用的x86汇编器, 而且能够被 ...

  8. CI框架源码阅读笔记4 引导文件CodeIgniter.php

    到了这里,终于进入CI框架的核心了.既然是"引导"文件,那么就是对用户的请求.参数等做相应的导向,让用户请求和数据流按照正确的线路各就各位.例如,用户的请求url: http:// ...

  9. Yii源码阅读笔记 - 日志组件

    2015-03-09 一 By youngsterxyf 使用 Yii框架为开发者提供两个静态方法进行日志记录: Yii::log($message, $level, $category); Yii: ...

最新文章

  1. android框架连接mysql_Android:ROOM数据库框架
  2. 数据备份软件,BackBone,NetVault,网络存储备份,系统集成
  3. Leetcode1685. 有序数组中差绝对值之和[C++题解]:前缀和和差的绝对值之和
  4. 人和计算机在时间管理方面的相似性
  5. RocketMQ简介、环境搭建
  6. NG客制项目下的I18n国际化标准方案
  7. django安装_技术大牛详解:Django框架之环境安装
  8. 详解:JVM内存调优参数
  9. 固件 日立 硬盘_最强性价比储存方案体验:这硬盘盒,真香!
  10. 七种武器——.NET工程师求职面试必杀技(转)
  11. 沉迷Link-Cut tree无法自拔之:[BZOJ2594][Wc2006]水管局长数据加强版
  12. Java加密方式(AES,DES,RSA,DSA,MD5)
  13. 7本软书,助你打破职场天花板
  14. netd模块工作流程
  15. python视频补帧_视频补帧软件(DAIN APP)
  16. 腾讯地图 周边 poi 搜索及参数配置
  17. 基于Ant Design vue框架登录demo
  18. 安卓转战React-Native之签名打包成Apk并极速多渠道打包
  19. Auto-Rig Pro文档翻译:安装
  20. 适用于Android设备的十大应用程序锁

热门文章

  1. 中国电信:张志勇辞任公司执行副总裁
  2. 启科量子加速商业化:量子通信为「盾」,量子计算为「矛」
  3. iPhone 13系列将首发A15芯片:采用增强版5nm工艺 性能提升20%
  4. 专为中国车主开发,特斯拉计划今年推出数据平台
  5. 美团关联公司公开“无人车及无人配送系统”相关专利
  6. 2699元起!格力首款5G手机悄然上架:骁龙765G处理器
  7. 飞书上线“程序员友好”功能 迎接1024程序员节
  8. 苹果:iPhone 12定价很合适,首批预订秒光说明用户认可
  9. 9月安卓机性能榜单公布:华为未进前十,第一名有点意外
  10. iPhone 12性能首曝:6GB内存、A14“挤牙膏”