1.概述

转载:Flink 基于 MailBox 实现的 StreamTask 线程模型

先来给介绍一下目前 StreamTask 中基于 MailBox 实现的线程模型,这个模型从 1.9 开始实现,在目前发布的 1.10 版本中,基本上已经改造完成,具体 issue 见 FLINK-12477: Change threading-model in StreamTask to a mailbox-based approach,其设计文档见 Change threading-model in StreamTask to a mailbox-based approach,去年,vinoyang 也写了一篇关于它的介绍,见 重磅!Flink 将重构其核心线程模型。因为 Flink 1.10 已经发布,本篇关于 MailBox 实现的介绍会基于 1.10 最新的代码来讲述(系列的其他篇,没有说明的话,默认还是以 1.9 的代码为例),这个功能在 1.9 中还并没有完全完成,所以本文以 1.10 代码为例讲述。

2.Motivation

先来看下这个改造/改进最初的动机,在之前 Flink 的线程模型中,会有多个潜在的线程去并发访问其内部的状态,比如 event-processing 和 checkpoint triggering,它们都是通过一个全局锁(checkpoint lock)来保证线程安全,这种实现方案带来的问题是:

  • 锁对象会在多个类中传递,代码的可读性比较差;
  • 而且锁对象还暴露给了面向用户的 API(见 SourceFunction#getCheckpointLock());
  • 在使用时,如果没有获取锁,可能会造成很多问题,使得问题难以定位;

基于上面的这些问题,关于线程模型,提出了一个全新的解决方案 —— MailBox 模型,它可以让 StreamTask 中所有状态的改变都会像在单线程中实现得一样简单。方案借鉴了 Actor 模型的 MailbBox 设计理念,它会让这些 action 操作(需要获取 checkpoint lock 的操作)先加入到一个 阻塞队列,然后主线程再从队列取相应的 mail task 去执行。

3.设计方案

这里先看下,之前的实现方案中,StreamTask 中 checkpoint lock 都主要用在什么地方:

  • Event-processing: events、watermarks、barriers、latency markers 等的发送和处理;
  • Checkpoints: 通过 RPC 向 TaskExecutor 发送 Checkpoint trigger 和 completeness 的通知,以及 Checkpoint 的 trigger 和 cancel 在 event 处理期间也可以通过 barrier 接收到;
  • Processing Time Timers: 目前 SystemProcessingTimeService 是使用 ScheduledExecutor 异步地处理 processing time timer(而 event time timer 依赖于 Watermark 的处理,并且它同步触发的)。
    另外,设计方案不但要能达到排它锁的效果,还要对一些核心环节(比如:event processing)能够做到原子性处理。

下面来看下 MailBox 模型 最初设计文档中的设计(方案方案见:Change threading-model in StreamTask to a mailbox-based approach)。

4.StreamTask 中要做的改变

这里会在 StreamTask 中引入一个 MailBox 变量,最初的一个想法是将 MailBox 设计为一个 ArrayBlockingQueue(实际上在 1.9 的实现中,使用的是一个 ring buffer,1.10 对这部分又做了重构,后面会介绍)。MailBox 将会取代 StreamTask#run() 方法的角色,而且它还可以处理 Checkpoint eventprocessing timer event,这些 event 都会被封装为一个 task 添加到 MailBox 的队列中,而 MailBox 的主线程(单线程)将会消费这个队列中的 task 进行顺序处理。StreamTask 实现的伪代码如下:

BlockingQueue<Runnable> mailbox = ...
void runMailboxProcessing() {//TODO: can become a cancel-event through mailbox eventuallyRunnable letter;while (isRunning()) {while ((letter = mailbox.poll()) != null) { letter.run();letter.run();}defaultAction();}
}
void defaultAction() {// e.g. event-processing from an input
}

上面的代码实现只是核心代码大概实现,在真正的实现中还可以做很多优化,队列的公平性也是我们考虑的一个点,之前的抢锁操作是完全没有任何公平性而言的。

4.1 client 代码需要做的改变

之前的实现中,Checkpoint lock 通过 getter 暴露给相关的 actor(Checkpoint、processing timer、event processing),而在 MailBox 的实现中,将会把 mailbox 隐藏在 queue 接口后面,仅仅向上层暴露 queue 的 getter 接口。

4.2 event 的产生与处理

MailBox 的实现将会极大简化代码的实现,MailBox 模型可以确保这些改变都是由单线程来操作,之前很多需要加锁的代码在新的实现中可以被移除。而为了实现MailBox 模型,需要将之前 run() 方法中 event processing 循环调用处理改为一个 event 有界流处理,举个例子:

One/TwoInputStreamTask 中的下面代码

while (running && inputProcessor.processInput())

可以修改为

inputProcessor.processInput() // 每次触发,都相当于处理一个有限流

在实现中,会先检查 MailBox 有没有 mail(即加入到队列里的 task 任务)需要处理,有的话,就进行处理,如果没有的话,就执行上面的操作,进行 event processing

这里有一个问题:就是 SourceStreamTask,会有一个兼容性的问题,因为在流的 source 端,它的 event prcessing 是来专门产生一个无限流数据,在这个处理中,并不能穿插 MailBox 中的 mail 检测,也就是说,如果只有一个 MailBox 线程处理的话,当这个线程去产生数据的话,它一直运行下去,就无法再去检测 MailBox 中是否有新的 mail 到来(在 Source 未来的版本中,可以完美兼容 MailBox 线程设计,见 FLIP-27,但现在的版本还不兼容)。

为了兼容 Source 端,目前的解决方案是:两个线程操作,一个专门用产生无限流,另一个是 MailBox 线程(处理 Checkpoint、timer 等),这两个线程为了保证线程安全,还是使用 Checkpoint Lock 做排它锁,如下图所示(图片来自设计文档):

4.3 Checkpoint 和 timer 的 trigger

对于 Checkpoint 和 timer 的 trigger,这里会发现,目前的这个设计是完全可以满足需求的,Checkpoint 和 Timer 的触发事件都会以一个 Runnable 的形式添加到 MailBox 的队列中,等待 MailBox 主线程去处理。

5.具体实现

介绍完其设计方案,这里注重看下在 Apache Flink 1.10 的代码中,基于 MailBox 模型 的 StreamTask 是如何实现的。

5.1 StreamTask 处理流程

在 Flink 中,当一个作业被调度起来后,对于流计算来说,作业中的 Task 最终会以 StreamTask 的形式去执行,在 1.10 的实现中,一个 StreamTask 的核心处理流程如下:


StreamTask 中 invoke()runMailboxLoop() 方法的实现如下:

// org.apache.flink.streaming.runtime.tasks.StreamTask
public final void invoke() throws Exception {try {beforeInvoke();// final check to exit early before starting to runif (canceled) {throw new CancelTaskException();}// let the task do its workisRunning = true;runMailboxLoop();// if this left the run() method cleanly despite the fact that this was canceled,// make sure the "clean shutdown" is not attemptedif (canceled) {throw new CancelTaskException();}afterInvoke();}finally {cleanUpInvoke();}
}private void runMailboxLoop() throws Exception {//note: mailbox 处理try {mailboxProcessor.runMailboxLoop();}catch (Exception e) {Optional<InterruptedException> interruption = ExceptionUtils.findThrowable(e, InterruptedException.class);if (interruption.isPresent()) {if (!canceled) {Thread.currentThread().interrupt();throw interruption.get();}} else if (canceled) {LOG.warn("Error while canceling task.", e);}else {throw e;}}
}

最后真正执行的是 MailboxProcessor 中的 runMailboxLoop() 方法,也就是上面说的 MailBox 主线程,StreamTask 运行的核心流程也是在这个方法中,其实现如下:

//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
/*** Runs the mailbox processing loop. This is where the main work is done.* note: mailbox 处理核心流程*/
public void runMailboxLoop() throws Exception {final TaskMailbox localMailbox = mailbox;Preconditions.checkState(localMailbox.isMailboxThread(),"Method must be executed by declared mailbox thread!");//note: MailBox 的状态必须是 OPEN,才能继续循环assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";final MailboxController defaultActionContext = new MailboxController(this);while (processMail(localMailbox)) { //note: 如果有 mail 需要处理,这里会进行相应的处理,处理完才会进行下面的 event processing//note: 进行 task 的 default action,也就是调用 processInput()mailboxDefaultAction.runDefaultAction(defaultActionContext); // lock is acquired inside default action as needed}
}

上面的方法中,最关键的有两个地方:

  • processMail(): 它会检测 MailBox 中是否有 mail 需要处理,如果有的话,就做相应的处理,一直将全部的 mail 处理完才会返回,只要 loop 还在进行,这里就会返回 true,否则会返回 false;
  • runDefaultAction(): 这个最终调用的 StreamTask 的 processInput() 方法,event-processing 的处理就是在这个方法中进行的。

5.2 event-processing 处理

对于 StreamTask 来说,event-processing 现在是在 processInput() 方法中实现的:

//org.apache.flink.streaming.runtime.tasks.StreamTask
/*** This method implements the default action of the task (e.g. processing one event from the input). Implementations* should (in general) be non-blocking.* note: 这个方法执行这个 task 默认的 action** @param controller controller object for collaborative interaction between the action and the stream task.* @throws Exception on any problems in the action.*/
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {InputStatus status = inputProcessor.processInput(); //note: event 处理if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) {//note: 如果输入还有数据,并且 writer 是可用的,这里就直接返回了return;}if (status == InputStatus.END_OF_INPUT) {//note: 输入已经处理完了,会调用这个方法controller.allActionsCompleted();return;}CompletableFuture<?> jointFuture = getInputOutputJointFuture(status);//note: 告诉 MailBox 先暂停 loopMailboxDefaultAction.Suspension suspendedDefaultAction = controller.suspendDefaultAction();//note: 等待 future 完成后,继续 mailbox loop(等待 input 和 output 可用后,才会继续)jointFuture.thenRun(suspendedDefaultAction::resume);
}
  1. 首先通过 processMail() 方法处理 MailBox 中的 mail

    • 如果没有 mail 要处理,这里直接返回;
    • 先将 MailBox 中当前现存的 mail 全部处理完;
    • 通过 isDefaultActionUnavailable() 做一个状态检查(目的是提供一个接口方便上层控制调用,这里把这个看作一个状态检查方便讲述),如果是 true 的话,会在这里一直处理 mail 事件,不会返回,除非状态改变;
  2. 然后再调用 StreamTask 的 processInput() 方法来处理 event:
    • 先调用 InputProcessor 的 processInput() 方法来处理 event;
    • 如果上面处理结果返回的状态是 MORE_AVAILABLE(表示还有可用的数据等待处理)并且 recordWriter 可用(之前的异步操作已经处理完成),就会立马返回;
    • 如果上面处理结果返回的状态是 END_OF_INPUT,它表示数据处理完成,这里就会告诉 MailBox 数据已经处理完成了;
    • 否则的话,这里会等待,直到有可用的数据到来及 recordWriter 可用。

5.3 checkpoint trigger 处理

接着来看下 Checkpoint Trigger 是怎么处理的,要先看下 Streamtask 的 triggerCheckpointAsync() 实现:

//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
@Override
public Future<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData,CheckpointOptions checkpointOptions,boolean advanceToEndOfEventTime) {//note: checkpoint 触发时,提交相应的 taskreturn mailboxProcessor.getMainMailboxExecutor().submit(() -> triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime),"checkpoint %s with %s",checkpointMetaData,checkpointOptions);
}

这里可以看到,其实现跟方案设计中的是一致,Checkpoint trigger 这里的操作就是向 MailBox 提交一个 Task,等待 MailBox 去处理。

5.4 SourceStreamTask 如何兼容

在设计文档中,有个重要的、特别要注意的点就是 SourceStreamTask 的兼容问题,开始的设计方案是在 SourceStreamTask 中专门启动两个线程来保持兼容性问题,而且虽然使用了 MailBox 模型,但还是会继续使用 checkpoint lock 来保证线程安全,这里看下其是如何实现的。

//org.apache.flink.streaming.runtime.tasks.SourceStreamTask
@Override
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {//note: 告诉 MailBox 先暂停 loopcontroller.suspendDefaultAction();// Against the usual contract of this method, this implementation is not step-wise but blocking instead for// compatibility reasons with the current source interface (source functions run as a loop, not in steps).sourceThread.setTaskDescription(getName());sourceThread.start();sourceThread.getCompletionFuture().whenComplete((Void ignore, Throwable sourceThreadThrowable) -> {if (sourceThreadThrowable == null || isFinished) {//note: sourceThread 完成后,没有抛出异常或 task 完成的情况下mailboxProcessor.allActionsCompleted();} else {//note: 没有完成但结束了或者抛出异常的情况下mailboxProcessor.reportThrowable(sourceThreadThrowable);}});
}/*** Runnable that executes the the source function in the head operator.* note: source 产生 data 的一个线程*/
private class LegacySourceFunctionThread extends Thread {private final CompletableFuture<Void> completionFuture;LegacySourceFunctionThread() {this.completionFuture = new CompletableFuture<>();}@Overridepublic void run() {try {//note: 调用 source Operator 的 runheadOperator.run(getCheckpointLock(), getStreamStatusMaintainer(), operatorChain);completionFuture.complete(null);} catch (Throwable t) {// Note, t can be also an InterruptedExceptioncompletionFuture.completeExceptionally(t);}}public void setTaskDescription(final String taskDescription) {setName("Legacy Source Thread - " + taskDescription);}CompletableFuture<Void> getCompletionFuture() {return completionFuture;}
}

可以看到:

  • LegacySourceFunctionThread 线程在启动时,会先通知一下 MailBox,这个就是上面说的那个状态检查,收到这个信号之后,MailBox 就会在 processMail() 中一直等待并且处理 mail,不会返回(也就是 MailBox 主线程一直在处理 mail 事件);
  • LegacySourceFunctionThread 线程就是专门生产数据的,跟 MailBox 这两个线程都在运行。

那么两个线程如何保证线程安全呢?如果仔细看上面的代码就会发现,在 SourceStreamTask 中还继续使用了 getCheckpointLock(),虽然这个方法现在已经被标注了将要被废弃,但 Source 没有改造完成之前,Source 的实现还是会继续依赖 checkpoint lock。

6.总结

这里,总结一下 Flink 1.10 中 MailBox 模型的核心设计,如下图所示:

  • MailboxExecutor: 它负责向 MailBox 提交 task 任务;
  • TaskMailbox: 负责存储相应 task 任务(也就是 mail),它支持多写单读,单线程读取并处理;
  • MailboxProcessor: MailBox 的核心处理线程,MailboxDefaultAction 是其默认的 action 实现,可以理解为 StreamTask 的 event 处理逻辑就是基于 MailboxDefaultAction 接口实现的。

Flink MailBox 这块的设计还是非常不错的,无论是从代码的可读性上还是后续维护性上都是要比之前的设计好很多,也值得我们学习借鉴。

M.参考:

FLINK-12477: Change threading-model in StreamTask to a mailbox-based approach;

Change threading-model in StreamTask to a mailbox-based approach;

重磅!Flink 将重构其核心线程模型;

flink线程模型源码分析1之前篇将StreamTask中的线程模型更改为基于Mailbox的方法

【Flink】Flink 基于 MailBox 实现的 StreamTask 线程模型相关推荐

  1. 【Flink】Flink 源码阅读笔记(20)- Flink 基于 Mailbox 的线程模型

    1.概述 转载:Flink 源码阅读笔记(20)- Flink 基于 Mailbox 的线程模型 相似文章:[Flink]Flink 基于 MailBox 实现的 StreamTask 线程模型 Fl ...

  2. Redis为什么这么快?Redis的线程模型与Redis多线程

    一.Redis有多快? Redis是基于内存运行的高性能 K-V 数据库,官方提供的测试报告是单机可以支持约10w/s的QPS 二.Redis为什么这么快? (1)完全基于内存,数据存在内存中,绝大部 ...

  3. 【Flink】Flink Table 基于Processing Time、Event Time的多种Window实现

    Flink Table 基于Processing Time.Event Time的多种Window实现 Flink 提供了Table Api,用来统一批流入口,使用Flink Table Api,直接 ...

  4. 凌波微步Flink——Flink的技术逻辑与编程步骤剖析

    转载请注明出处:http://blog.csdn.net/dongdong9223/article/details/95459606 本文出自[我是干勾鱼的博客] Ingredients: Java: ...

  5. 大数据计算引擎之Flink Flink CEP复杂事件编程

    原文地址:大数据计算引擎之Flink Flink CEP复杂事件编程 复杂事件编程(CEP)是一种基于流处理的技术,将系统数据看作不同类型的事件,通过分析事件之间的关系,建立不同的时事件系序列库,并利 ...

  6. 基于应用层自身反远程线程注入的研究

    基于应用层自身反远程线程注入的研究 现状:目前所有已知的反远程注入方式:r0层hook 句柄的获取,返回失败,让应用层注入者拿不到目标进程的句柄,如hook ntopenprocess ntdubli ...

  7. 【Flink】Flink 1.10之改进的TaskManager内存模型与配置

    1.概述 转载:Flink 1.10之改进的TaskManager内存模型与配置

  8. 用Linux / C实现基于自动扩/减容线程池+epoll反应堆检测沉寂用户模型的服务器框架(含源码)

    用Linux/ C实现基于自动扩/减容线程池+epoll反应堆模型的服务器框架 前言 服务器端源码 客户端源码 自定义库 helper.c 和 helper.h helper.c helper.h M ...

  9. 【SemiDrive源码分析】【MailBox核间通信】42 - 基于Mailbox 实现的 mailbox_demo 应用程序(RTOS Android侧通信实现)

    [SemiDrive源码分析][MailBox核间通信]42 - 基于Mailbox 实现的 mailbox_demo 应用程序(RTOS & Android侧通信实现) 一.编写RTOS侧 ...

最新文章

  1. WIRW:淡水分子微生物生态学综述
  2. nginx访问日志,错误日志参数说明
  3. 2020蓝桥杯省赛---java---B---8(数字三角形)
  4. 配置struts.xml时extends=struts-default会报错,原因和解决
  5. ubuntu18安装vnpyv1.9.2之二
  6. java 银行管理系统怎么储存账户信息_银行管理系统 实现用户注册 登录 存、取款 交易记录查询和修改用户信息等功能...
  7. 锐起无盘安装图文教程
  8. LaTeX 安装及环境配置
  9. 如何查看Spark日志与排查报错问题
  10. LambdaQueryWrapper构建查询条件、模糊查询、范围查询、排序
  11. 芯片春秋: ARM前世今生
  12. 第8章 卷积神经网络
  13. 三明计算机动漫与游戏制作,福建中小学电脑制作-福建中等职业教育与终身教育网.DOC...
  14. 谈谈我了解的那些在线it学习网站
  15. 老程序员到40、50岁该怎么办?是继续留在软件行业还是转行?是默默死去还是向中层管理者蜕变?美国在老程序员的职业发展上的经验?...
  16. 仿酒仙网的一款jQuery侧栏弹出导航栏特效
  17. 跨境电商属于外贸吗,Starday跨境电商靠谱吗?
  18. 迭代器生成器思维导图
  19. 【Java 并发编程】【05】线程安全问题与线程同步
  20. Android Support库百分比布局

热门文章

  1. 微博预期12月8日登陆港交所 最终发售价定为272.8港元
  2. iPhone 14处理器曝光:万众期待的最硬核升级凉了
  3. 美国五家科技巨头十年并购616家小型公司,引发并购审查
  4. 华为回应“发射卫星抢占6G”:假消息
  5. 马斯克称特斯拉Model Y今年产量有限 明年会大规模生产
  6. 中芯国际:公司客户需求强劲 订单饱满
  7. 来做网课老师不?年薪两百万,上不封顶...
  8. 2020后半年iPhone取消附赠耳机?分析师上调AirPods出货量预估
  9. 新款iPhone SE预购好于预期,新款iPhone SE Plus可能要因此延迟了
  10. 华为P40系列国行版来了:价格成最大悬念!