[源码解析] 从TimeoutException看Flink的心跳机制

文章目录

  • [源码解析] 从TimeoutException看Flink的心跳机制
    • 0x00 摘要
    • 0x01 缘由
    • 0x02 背景概念
      • 2.1 四大模块
      • 2.2 Akka
      • 2.3 RPC机制
        • 2.3.1 RpcEndpoint:RPC的基类
        • RpcService:RPC服务提供者
        • RpcGateway:RPC调用的网关
      • 2.4 常见心跳机制
    • 0x03 Flink心跳机制
      • 3.1 代码和机制
      • 3.2 静态架构
        • 3.2.1 HeartbeatTarget :监控目标抽象
        • 3.2.2 HeartbeatMonitor : 管理heartbeat target的心跳状态
        • 3.2.3 HeartbeatManager :心跳管理者
        • 3.2.4 HearbeatListener 处理心跳结果
      • 3.3 动态运行机制
        • 3.3.1 HearbeatManagerImpl : Receiver
        • 3.3.2 HeartbeatManagerSenderImpl : Sender
        • 3.3.3 HeartbeatMonitorImpl
        • 3.3.3 HeartbeatServices
    • 0x04 初始化
      • 4.1 心跳服务创建
    • 0x05 Flink中具体应用
      • 5.1 总述
        • 5.1.1 RM, JM, TM之间关系
        • 5.1.2 三者间心跳机制
      • 5.2 初始化过程
        • 5.2.1 TaskExecutor初始化
        • 5.2.2 JobMaster的初始化
        • 5.2.3 ResourceManager初始化
      • 5.3 注册过程
        • 5.3.1 TM注册到RM中
          • 5.3.1.1 TM的操作
          • 5.3.1.2 RM的操作
          • 5.3.1.3 返回到TM
        • 5.3.2 TM注册到 JM
      • 5.4 心跳过程
        • 5.4.1 ResourceManager主动发起
          • 5.4.1.1 Sender遍历所有监控的Monitor(Target)
          • 5.4.1.2 Target进行具体操作
          • 5.4.1.3 RPC调用
        • 5.4.2 RM通过RPC调用TM
        • 5.4.3 TM 通过RPC回到 RM
      • 5.5 超时处理
        • 5.5.1 TaskManager
        • 5.5.2 ResourceManager
    • 0x06 解决问题
    • 0x07 参考

0x00 摘要

本文从一个调试时候常见的异常 "TimeoutException: Heartbeat of TaskManager timed out"切入,为大家剖析Flink的心跳机制。文中代码基于Flink 1.10。

0x01 缘由

大家如果经常调试Flink,当进入断点看到了堆栈和变量内容之后,你容易陷入了沉思。当你发现了问题可能所在,高兴的让程序Resume的时候,你发现程序无法运行,有如下提示:

Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id 93aa1740-cd2c-4032-b74a-5f256edb3217 timed out.

这实在是很郁闷的事情。作为程序猿不能忍啊,既然异常提示中有 Heartbeat 字样,于是我们就来一起看看Flink的心跳机制,看看有没有可以修改的途径。

0x02 背景概念

2.1 四大模块

Flink有核心四大组件:Dispatcher,JobMaster,ResourceManager,TaskExecutor。

  • Dispatcher(Application Master)用于接收client提交的任务和启动相应的JobManager。其提供REST接口来接收client的application提交,负责启动JM和提交application,同时运行Web UI。
  • ResourceManager:主要用于资源的申请和分配。当TM有空闲的slot就会告诉JM,没有足够的slot也会启动新的TM。kill掉长时间空闲的TM。
  • JobMaster :功能主要包括(旧版本中JobManager的功能在新版本中以JobMaster形式出现,可能本文中会混淆这两个词,请大家谅解):
    • 将JobGraph转化为ExecutionGraph(physical dataflow graph,并行化)。
    • 向RM申请资源、schedule tasks、保存作业的元数据。
  • TaskManager:类似Spark的executor,会跑多个线程的task、数据缓存与交换。Flink 架构遵循 Master - Slave 架构设计原则,JobMaster 为 Master 节点,TaskManager 为Slave节点。

这四大组件彼此之间的通信需要依赖RPC实现

2.2 Akka

Flink底层RPC基于Akka实现。Akka是一个开发并发、容错和可伸缩应用的框架。它是Actor Model的一个实现,和Erlang的并发模型很像。在Actor模型中,所有的实体被认为是独立的actors。actors和其他actors通过发送异步消息通信。

Actor模型的强大来自于异步。它也可以显式等待响应,这使得可以执行同步操作。但是强烈不建议同步消息,因为它们限制了系统的伸缩性。

2.3 RPC机制

RPC作用是:让异步调用看起来像同步调用。

Flink基于Akka构建了其底层通信系统,引入了RPC调用,各节点通过GateWay方式回调,隐藏通信组件的细节,实现解耦。Flink整个通信框架的组件主要由RpcEndpoint、RpcService、RpcServer、AkkaInvocationHandler、AkkaRpcActor等构成。

RPC相关的主要接口如下:

  • RpcEndpoint
  • RpcService
  • RpcGateway

2.3.1 RpcEndpoint:RPC的基类

RpcEndpoint是Flink RPC终端的基类,所有提供远程过程调用的分布式组件必须扩展RpcEndpoint,其功能由RpcService支持。

RpcEndpoint的子类只有四类组件:Dispatcher,JobMaster,ResourceManager,TaskExecutor,即Flink中只有这四个组件有RPC的能力,换句话说只有这四个组件有RPC的这个需求。

每个RpcEndpoint对应了一个路径(endpointId和actorSystem共同确定),每个路径对应一个Actor,其实现了RpcGateway接口,

RpcService:RPC服务提供者

RpcServer是RpcEndpoint的成员变量,为RpcService提供RPC服务/连接远程Server,其只有一个子类实现:AkkaRpcService(可见目前Flink的通信方式依然是Akka)。

RpcServer用于启动和连接到RpcEndpoint, 连接到rpc服务器将返回一个RpcGateway,可用于调用远程过程。

Flink四大组件Dispatcher,JobMaster,ResourceManager,TaskExecutor,都是RpcEndpoint的实现,所以构建四大组件时,同步需要初始化RpcServer。如JobManager的构造方式,第一个参数就是需要知道RpcService。

RpcGateway:RPC调用的网关

Flink的RPC协议通过RpcGateway来定义;由前面可知,若想与远端Actor通信,则必须提供地址(ip和port),如在Flink-on-Yarn模式下,JobMaster会先启动ActorSystem,此时TaskExecutor的Container还未分配,后面与TaskExecutor通信时,必须让其提供对应地址。

Dispatcher,JobMaster,ResourceManager,TaskExecutor 这四大组件通过各种方式实现了Gateway。以JobMaster为例,JobMaster实现JobMasterGateway接口。各组件类的成员变量都有需要通信的其他组件的GateWay实现类,这样可通过各自的Gateway实现RPC调用。

2.4 常见心跳机制

常见的心跳检测有两种:

  • socket 套接字SO_KEEPALIVE本身带有的心跳机制,定期向对方发送心跳包,对方收到心跳包后会自动回复;
  • 应用自身实现心跳机制,同样也是使用定期发送请求的方式;

Flink实现的是第二种方案。

0x03 Flink心跳机制

3.1 代码和机制

Flink的心跳机制代码在:

Flink-master/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat

四个接口:

HeartbeatListener.java          HeartbeatManager.java      HeartbeatTarget.java  HeartbeatMonitor.java

以及如下几个类:

HeartbeatManagerImpl.java   HeartbeatManagerSenderImpl.java   HeartbeatMonitorImpl.java
HeartbeatServices.java    NoOpHeartbeatManager.java

Flink集群有多种业务流程,比如Resource Manager, Task Manager, Job Manager。每种业务流程都有自己的心跳机制。Flink的心跳机制只是提供接口和基本功能,具体业务功能由各业务流程自己实现

我们首先设定 心跳系统中有两种节点:sender和receiver。心跳机制是sender和receivers彼此相互检测。但是检测动作是Sender主动发起,即Sender主动发送请求探测receiver是否存活,因为Sender已经发送过来了探测心跳请求,所以这样receiver同时也知道Sender是存活的,然后Reciver给Sender回应一个心跳表示自己也是活着的。

因为Flink的几个名词和我们常见概念有所差别,所以流程上需要大家仔细甄别,即:

  • Flink Sender 主动发送Request请求给Receiver,要求Receiver回应一个心跳;
  • Flink Receiver 收到Request之后,通过Receive函数回应一个心跳请求给Sender;

3.2 静态架构

3.2.1 HeartbeatTarget :监控目标抽象

HeartbeatTarget是对监控目标的抽象。心跳机制在行为上而言有两种动作:

  • 向某个节点发送请求。
  • 处理某个节点发来的请求。

HeartbeatTarget的函数就是这两个动作:

  • receiveHeartbeat :向某个节点(Sender)发送心跳回应,其参数heartbeatOrigin 就是 Receiver。
  • requestHeartbeat :向某个节点(Receiver)要求其回应一个心跳,其参数requestOrigin 就是 Sender。requestHeartbeat这个函数是Sender的函数,其中Sender通过RPC直接调用到Receiver

这两个函数的参数也很简单:分别是请求的发送放和接收方,还有Payload载荷。对于一个确定节点而言,接收的和发送的载荷是同一类型的。

public interface HeartbeatTarget<I> {/*** Sends a heartbeat response to the target.* @param heartbeatOrigin Resource ID identifying the machine for which a heartbeat shall be reported.*/// heartbeatOrigin 就是 Receivervoid receiveHeartbeat(ResourceID heartbeatOrigin, I heartbeatPayload);/*** Requests a heartbeat from the target. * @param requestOrigin Resource ID identifying the machine issuing the heartbeat request.*/// requestOrigin 就是 Sendervoid requestHeartbeat(ResourceID requestOrigin, I heartbeatPayload);
}

3.2.2 HeartbeatMonitor : 管理heartbeat target的心跳状态

对HeartbeatTarget的封装,这样Manager对Target的操作是通过对Monitor完成,后续会在其继承类中详细说明。

public interface HeartbeatMonitor<O> {// Gets heartbeat target.HeartbeatTarget<O> getHeartbeatTarget();// Gets heartbeat target id.ResourceID getHeartbeatTargetId();// Report heartbeat from the monitored target.void reportHeartbeat();//Cancel this monitor.void cancel();//Gets the last heartbeat.long getLastHeartbeat();
}

3.2.3 HeartbeatManager :心跳管理者

HeartbeatManager负责管理心跳机制,比如启动/停止/报告一个HeartbeatTarget。此接口继承HeartbeatTarget。

除了HeartbeatTarget的函数之外,这接口有4个函数:

  • monitorTarget,把和某资源对应的节点加入到心跳监控列表;
  • unmonitorTarget,从心跳监控列表删除某资源对应的节点;
  • stop,停止心跳管理服务,释放资源;
  • getLastHeartbeatFrom,获取某节点的最后一次心跳数据。
public interface HeartbeatManager<I, O> extends HeartbeatTarget<I> {void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget);void unmonitorTarget(ResourceID resourceID);void stop();long getLastHeartbeatFrom(ResourceID resourceId);
}

3.2.4 HearbeatListener 处理心跳结果

用户业务逻辑需要继承这个接口以处理心跳结果。其可以看做服务的输出,实现了三个回调函数

  • notifyHeartbeatTimeout,处理节点心跳超时
  • reportPayload,处理节点发来的Payload载荷
  • retrievePayLoad。获取对某节点发下一次心跳请求的Payload载荷
public interface HeartbeatListener<I, O> {void notifyHeartbeatTimeout(ResourceID resourceID);void reportPayload(ResourceID resourceID, I payload);O retrievePayload(ResourceID resourceID);
}

3.3 动态运行机制

之前提到Sender和Receiver,下面两个类就对应上述概念。

  • HeartbeatManagerImpl :Receiver,存在于JobMaster与TaskExecutor中;
  • HeartbeatManagerSenderImpl :Sender,继承 HeartbeatManagerImpl类,用于周期发送心跳要求,存在于JobMaster、ResourceManager中。

几个关键问题:

  1. 如何判定心跳超时?

    心跳服务启动后,Flink在Monitor中通过 ScheduledFuture 会启动一个线程来处理心跳超时事件。在设定的心跳超时时间到达后才执行线程。

    如果在设定的心跳超时时间内接收到组件的心跳消息,会先将该线程取消而后重新开启,重置心跳超时事件的触发。

    如果在设定的心跳超时时间内没有收到组件的心跳,则会通知组件:你超时了。

  2. 何时"调用双方"发起心跳检查?

    心跳检查是双向的,一方(Sender)会主动发起心跳请求,而另一方(Receiver)则是对心跳做出响应,两者通过RPC相互调用,重置对方的 Monitor 超时线程。

    以JobMaster和TaskManager为例,JM在启动时会开启周期调度,向已经注册到JM中的TM发起心跳检查,通过RPC调用TM的requestHeartbeat方法,重置TM中对JM超时线程的调用,表示当前JM状态正常。在TM的requestHeartbeat方法被调用后,通过RPC调用JM的receiveHeartbeat,重置 JM 中对TM超时线程的调用,表示TM状态正常。

  3. 如何处理心跳超时?

    心跳服务依赖 HeartbeatListener,当在timeout时间范围内未接收到心跳响应,则会触发超时处理线程,该线程通过调用HeartbeatListener.notifyHeartbeatTimeout方法做后续重连操作或者直接断开。

下面是一个概要(以RM & TM为例):

  • RM : 实现了ResourceManagerGateway (可以直接被RPC调用)

  • TM : 实现了TaskExecutorGateway (可以直接被RPC调用)

  • RM :有一个Sender HM : taskManagerHeartbeatManager,Sender HM 拥有用户定义的 TaskManagerHeartbeatListener

  • TM :有一个Receiver HM :resourceManagerHeartbeatManager,Receiver HM 拥有用户定义的ResourceManagerHeartbeatListener。

  • HeartbeatManager 有一个ConcurrentHashMap<ResourceID, HeartbeatMonitor> heartbeatTargets,这个Map是它监控的所有Target。

  • 对于RM的每一个需要监控的TM, 其生成一个HeartbeatTarget,进而被构造成一个HeartbeatMonitor,放置到ResourceManager.taskManagerHeartbeatManager中。

  • 每一个Target对应的Monitor中,有自己的异步任务ScheduledFuture,这个ScheduledFuture不停的被取消/重新生成。如果在某个期间内没有被取消,则通知用户定义的listener出现了timeout。

3.3.1 HearbeatManagerImpl : Receiver

HearbeatManagerImpl是receiver的具体实现。它由 心跳 被发起方(就是Receiver,例如TM) 创建,接收 发起方(就是Sender,例如 JM)的心跳发送请求。心跳超时 会触发 heartbeatListener.notifyHeartbeatTimeout方法。

注意:被发起方监控线程(Monitor)的开启是在接收到请求心跳(requestHeartbeat被调用后)以后才触发的,属于被动触发。

HearbeatManagerImpl主要维护了

  • 一个心跳监控列表 map : <ResourceID, HeartbeatMonitor<O>> heartbeatTargets;。这是一个KV关联。

    key代表要发送心跳组件(例如:TM)的ID,value则是为当前组件创建的触发心跳超时的线程HeartbeatMonitor,两者一一对应。

    当一个从所联系的machine发过来的心跳被收到时候,对应的monitor的状态会被更新(重启一个新ScheduledFuture)。当一个monitor发现了一个 heartbeat timed out,它会通知自己的HeartbeatListener。

  • 一个 ScheduledExecutor mainThreadExecutor 负责heartbeat timeout notifications。

  • heartbeatListener :处理心跳结果。

HearbeatManagerImpl 数据结构如下:

@ThreadSafe
public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {/** Heartbeat timeout interval in milli seconds. */private final long heartbeatTimeoutIntervalMs;/** Resource ID which is used to mark one own's heartbeat signals. */private final ResourceID ownResourceID;/** Heartbeat listener with which the heartbeat manager has been associated. */private final HeartbeatListener<I, O> heartbeatListener;/** Executor service used to run heartbeat timeout notifications. */private final ScheduledExecutor mainThreadExecutor;/** Map containing the heartbeat monitors associated with the respective resource ID. */private final ConcurrentHashMap<ResourceID, HeartbeatMonitor<O>> heartbeatTargets;/** Running state of the heartbeat manager. */protected volatile boolean stopped;
}

HearbeatManagerImpl实现的主要函数有:

  • monitorTarget :把一个节点加入到心跳监控列表。

    • 传入参数有:ResourceId和HearbeatTarget,monitorTarget根据这两个参数,生成一个HeartbeatMonitor对象,然后把这个对象跟ResrouceId做kv关联,存入到heartbeatTargets。 一个节点可能参与多个业务流程,因此一个节点参与多个心跳流程,一个节点上运行多个不同类型的HearbeatTarget。所以一个ResourceID可能会跟不同类型的HearbeatTarget对象关联,分别加入到多个HeartbeatManager,进行不同类型的心跳监控。也因此这个函数入参是两个参数。
  • requestHeartbeat :Sender通过RPC异步调用到Receiver的这个函数 以要求receiver向requestOrigin节点(就是Sender)发起一次心跳响应,载荷是heartbeatPayLoad。其内部流程如下:
    • 首先会调用reportHeartbeat函数,作用是 通过Monitor 记录发起请求的这个时间点,然后创建一个ScheduleFuture。如果到期后,requestOrigin没有作出响应,那么就将requestOrigin节点对应的HeartbeatMonitor的state设置成TIMEOUT状态,如果到期内requestOrigin响应了,ScheduleFuture会被取消,HeartbeatMonitor的state仍然是RUNNING。
    • 其次调用reportPayload函数,把requestOrigin节点的最新的heartbeatPayload通知给heartbeatListener。heartbeatListener是外部传入的,它根据所有节点的心跳记录做监听管理。
    • 最后调用receiveHearbeat函数,响应一个心跳给Sender。

3.3.2 HeartbeatManagerSenderImpl : Sender

继承HearbeatManagerImpl,由心跳管理的一方(例如JM)创建,实现了run函数(即它可以作为一个单独线程运行),创建后立即开启周期调度线程,每次遍历自己管理的heartbeatTarget,触发heartbeatTarget.requestHeartbeat,要求 Target 返回一个心跳响应。属于主动触发心跳请求。

public class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I, O> implements Runnable {public void run() {if (!stopped) {for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {requestHeartbeat(heartbeatMonitor);}// 周期调度getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);}}//  主动发起心跳检查private void requestHeartbeat(HeartbeatMonitor<O> heartbeatMonitor) {O payload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();// 调用 Target 的 requestHeartbeat 函数heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);}
}

3.3.3 HeartbeatMonitorImpl

Heartbeat monitor管理心跳目标,它启动一个ScheduledExecutor。

  • 如果在timeout时间内没有接收到心跳信号,则判定心跳超时,通知给HeartbeatListener。
  • 如果在timeout时间内接收到心跳信号,则重置当前ScheduledExecutor。
public class HeartbeatMonitorImpl<O> implements HeartbeatMonitor<O>, Runnable {/** Resource ID of the monitored heartbeat target. */private final ResourceID resourceID;   // 被监控的resource ID/** Associated heartbeat target. */private final HeartbeatTarget<O> heartbeatTarget; //心跳目标private final ScheduledExecutor scheduledExecutor;/** Listener which is notified about heartbeat timeouts. */private final HeartbeatListener<?, ?> heartbeatListener; // 心跳监听器/** Maximum heartbeat timeout interval. */private final long heartbeatTimeoutIntervalMs;private volatile ScheduledFuture<?> futureTimeout;//  AtomicReference  使用private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);//  最近一次接收到心跳的时间private volatile long lastHeartbeat;// 报告心跳public void reportHeartbeat() {// 保留最近一次接收心跳时间lastHeartbeat = System.currentTimeMillis();// 接收心跳后,重置timeout线程resetHeartbeatTimeout(heartbeatTimeoutIntervalMs);}// 心跳超时,触发lister的notifyHeartbeatTimeoutpublic void run() {// The heartbeat has timed out if we're in state runningif (state.compareAndSet(State.RUNNING, State.TIMEOUT)) {heartbeatListener.notifyHeartbeatTimeout(resourceID);}}//  重置TIMEOUTvoid resetHeartbeatTimeout(long heartbeatTimeout) {if (state.get() == State.RUNNING) {//先取消线程,在重新开启cancelTimeout();// 启动超时线程futureTimeout = scheduledExecutor.schedule(this, heartbeatTimeout, TimeUnit.MILLISECONDS);// Double check for concurrent accesses (e.g. a firing of the scheduled future)if (state.get() != State.RUNNING) {cancelTimeout();}}}

3.3.3 HeartbeatServices

建立heartbeat receivers and heartbeat senders,主要是对外提供服务。这里我们可以看到:

  • HeartbeatManagerImpl就是receivers。
  • HeartbeatManagerSenderImpl就是senders。
public class HeartbeatServices {// Creates a heartbeat manager which does not actively send heartbeats.public <I, O> HeartbeatManager<I, O> createHeartbeatManager(...) {return new HeartbeatManagerImpl<>(...);}// Creates a heartbeat manager which actively sends heartbeats to monitoring targets.public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(...) {return new HeartbeatManagerSenderImpl<>(...);}
}

0x04 初始化

4.1 心跳服务创建

心跳管理服务在Cluster入口创建。因为我们是调试,所以在MiniCluster.start调用。

public void start() throws Exception {......heartbeatServices = HeartbeatServices.fromConfiguration(configuration);......
}

HeartbeatServices.fromConfiguration会从Configuration中获取配置信息:

  • 心跳间隔 heartbeat.interval
  • 心跳超时时间 heartbeat.timeout

个就是我们解决最开始问题的思路:从配置信息入手,扩大心跳间隔

public HeartbeatServices(long heartbeatInterval, long heartbeatTimeout) {this.heartbeatInterval = heartbeatInterval;this.heartbeatTimeout = heartbeatTimeout;
}public static HeartbeatServices fromConfiguration(Configuration configuration) {long heartbeatInterval = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL);long heartbeatTimeout = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT);return new HeartbeatServices(heartbeatInterval, heartbeatTimeout);
}

0x05 Flink中具体应用

5.1 总述

5.1.1 RM, JM, TM之间关系

系统中有几个ResourceManager?整个 Flink 集群中只有一个 ResourceManager。

系统中有几个JobManager?JobManager 负责管理作业的执行。默认情况下,每个 Flink 集群只有一个 JobManager 实例。JobManager 相当于整个集群的 Master 节点,负责整个集群的任务管理和资源管理。

系统中有几个TaskManager?这个由具体启动方式决定。比如Flink on Yarn,Session模式能够指定拉起多少个TaskManager。 Per job模式中TaskManager数量是在提交作业时根据并发度动态计算,即Number of TM = Parallelism/numberOfTaskSlots。比如:有一个作业,Parallelism为10,numberOfTaskSlots为1,则TaskManager为10。

5.1.2 三者间心跳机制

Flink中ResourceManager、JobMaster、TaskExecutor三者之间存在相互检测的心跳机制:

  • ResourceManager会主动发送请求探测JobMaster、TaskExecutor是否存活。
  • JobMaster也会主动发送请求探测TaskExecutor是否存活,以便进行任务重启或者失败处理。

我们之前讲过,HeartbeatManagerSenderImpl属于Sender,HeartbeatManagerImpl属于Receiver。

  1. HeartbeatManagerImpl所处位置可以理解为client,存在于JobMaster与TaskExecutor中;
  2. HeartbeatManagerSenderImpl类,继承 HeartbeatManagerImpl类,用于周期发送心跳请求,所处位置可以理解为server, 存在于JobMaster、ResourceManager中。

ResourceManager 级别最高,所以两个HM都是Sender,监控taskManager和jobManager

public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>extends FencedRpcEndpoint<ResourceManagerId>implements ResourceManagerGateway, LeaderContender {taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSenderjobManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender
}

JobMaster级别中等,一个Sender, 一个Receiver,受到ResourceManager的监控,监控taskManager。

public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSenderresourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager
}

TaskExecutor级别最低,两个Receiver,分别被JM和RM疾控。

public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {this.jobManagerHeartbeatManager = return heartbeatServices.createHeartbeatManagerthis.resourceManagerHeartbeatManager = return heartbeatServices.createHeartbeatManager
}

以JobManager和TaskManager为例。JM在启动时会开启周期调度,向已经注册到JM中的TM发起心跳检查,通过RPC调用TM的requestHeartbeat方法,重置对JM超时线程的调用,表示当前JM状态正常。在TM的requestHeartbeat方法被调用后,通过RPC调用JM的receiveHeartbeat,重置对TM超时线程的调用,表示TM状态正常。

5.2 初始化过程

5.2.1 TaskExecutor初始化

TM初始化生成了两个Receiver HM。

public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {/** The heartbeat manager for job manager in the task manager. */private final HeartbeatManager<AllocatedSlotReport, AccumulatorReport> jobManagerHeartbeatManager;/** The heartbeat manager for resource manager in the task manager. */private final HeartbeatManager<Void, TaskExecutorHeartbeatPayload> resourceManagerHeartbeatManager;//初始化函数this.jobManagerHeartbeatManager = createJobManagerHeartbeatManager(heartbeatServices, resourceId);this.resourceManagerHeartbeatManager = createResourceManagerHeartbeatManager(heartbeatServices, resourceId);
}

生成HeartbeatManager时,就注册了ResourceManagerHeartbeatListener和JobManagerHeartbeatListener。

此时,两个HeartbeatManagerImpl中已经创建好对应monitor线程,只有在JM或者RM执行requestHeartbeat后,才会触发该线程的执行。

5.2.2 JobMaster的初始化

JM生成了一个Sender HM,一个Receiver HM。这里会注册 TaskManagerHeartbeatListener 和 ResourceManagerHeartbeatListener

public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {private HeartbeatManager<AccumulatorReport, AllocatedSlotReport> taskManagerHeartbeatManager;private HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;private void startHeartbeatServices() {taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(resourceId,new TaskManagerHeartbeatListener(),getMainThreadExecutor(),log);resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(resourceId,new ResourceManagerHeartbeatListener(),getMainThreadExecutor(),log);}
}

5.2.3 ResourceManager初始化

JobMaster在启动时候,会在startHeartbeatServices函数中生成两个Sender HeartbeatManager。

taskManagerHeartbeatManager :HeartbeatManagerSenderImpl对象,会反复启动一个定时器,定时扫描需要探测的对象并且发送心跳请求。

jobManagerHeartbeatManager :HeartbeatManagerSenderImpl,会反复启动一个定时器,定时扫描需要探测的对象并且发送心跳请求。

taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(resourceId,new TaskManagerHeartbeatListener(),getMainThreadExecutor(),log);jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(resourceId,new JobManagerHeartbeatListener(),getMainThreadExecutor(),log);

5.3 注册过程

我们以TM与RM交互为例。TaskExecutor启动之后,需要注册到RM和JM中。

流程图如下:

 * 1. Run in Task Manager**    TaskExecutor.onStart //Life cycle*        |*        +----> startTaskExecutorServices@TaskExecutor*        |     //开始TM服务*        |     *        +----> resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());*        |     // 开始连接到RM*        |     // start by connecting to the ResourceManager*        |    *        +----> notifyLeaderAddress@ResourceManagerLeaderListener  *        |     // 当RM状态变化之后,将回调到这里*        |     // The listener for leader changes of the resource manager.*        |        *        +----> reconnectToResourceManager@TaskExecutor   *        |     // 以下三步调用是渐进的,就是与RM联系。*        |    *        +----> tryConnectToResourceManager@TaskExecutor *        |     *        +----> connectToResourceManager()@TaskExecutor*        |     // 主要作用是生成了 TaskExecutorToResourceManagerConnection*        |  *        +----> start@TaskExecutorToResourceManagerConnection*        |     // 开始RPC调用,将会调用到其基类RegisteredRpcConnection的start*        |   *        +----> start@RegisteredRpcConnection*        |     // RegisteredRpcConnection实现了组件之间注册联系的基本RPC*        |      * ~~~~~~~~ 这里是 Akka RPC* 2. Run in Resource Manager   * 现在程序执行序列到达了RM, 主要是添加一个Target到RM 的 Sender HM;**    registerTaskExecutor@ResourceManager*        |*        +----> taskExecutorGatewayFuture.handleAsync*        |     // 异步调用到这里*        |     *        +----> registerTaskExecutorInternal@ResourceManager*        |     // RM的内部实现,将把TM注册到RM自己这里*        |      *        +----> taskManagerHeartbeatManager.monitorTarget*        |     // 生成HeartbeatMonitor,*        |      *        +---->  heartbeatTargets.put(resourceID,heartbeatMonitor);*        |     // 把Monitor放到 HM in TM之中,就是说TM开始监控了RM*        |        * ~~~~~~~~ 这里是 Akka RPC* 3. Run in Task Manager* 现在程序回到了TM, 主要是添加一个Target到 TM 的 Receiver HM;**    onRegistrationSuccess@TaskExecutorToResourceManagerConnection*        |   *        |   *        +---->  onRegistrationSuccess@ResourceManagerRegistrationListener *        |       // 回调函数*        |  *        +---->  runAsync(establishResourceManagerConnection) *        |       // 异步执行*        |  *        +---->  establishResourceManagerConnection@TaskExecutor *        |       // 说明已经和RM建立了联系,所以可以开始监控RM了*        |  *        +---->  resourceManagerHeartbeatManager.monitorTarget *        |     // 生成HeartbeatMonitor,*        |      *        +---->  heartbeatTargets.put(resourceID,heartbeatMonitor);  *        |       // 把 RM 也注册到 TM了*        |       // monitor the resource manager as heartbeat target

下面是具体文字描述。

5.3.1 TM注册到RM中

5.3.1.1 TM的操作
  • TaskExecutor启动之后,调用onStart,开始其生命周期。
  • onStart直接调用startTaskExecutorServices。
  • 启动服务的第一步就是与ResourceManager取得联系,这里注册了一个ResourceManagerLeaderListener(),用来监听RM Leader的变化。
private final LeaderRetrievalService resourceManagerLeaderRetriever;
// resourceManagerLeaderRetriever其实是EmbeddedLeaderService的实现,A simple leader election service, which selects a leader among contenders and notifies listeners.resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
  • 当得到RM Leader的地址之后,会调用到回调函数notifyLeaderAddress@ResourceManagerLeaderListener,然后调用notifyOfNewResourceManagerLeader。
  • notifyOfNewResourceManagerLeader中获取到RM地址后,就通过reconnectToResourceManager与RM联系。
  • reconnectToResourceManager中间接调用到TaskExecutorToResourceManagerConnection。其作用是建立TaskExecutor 和 ResourceManager之间的联系。因为知道 ResourceManagerGateway所以才能进行RPC操作。
  • 然后在 TaskExecutorToResourceManagerConnection中,就通过RPC与RM联系。
5.3.1.2 RM的操作
  • RPC调用后,程序就来到了RM中,RM做如下操作:
  • 会注册一个新的TaskExecutor到自己的taskManagerHeartbeatManager中。
  • registerTaskExecutor@ResourceManager会通过异步调用到registerTaskExecutorInternal。
  • registerTaskExecutorInternal中首先看看是否这个TaskExecutor的ResourceID之前注册过,如果注册过就移除再添加一个新的TaskExecutor。
  • 通过 taskManagerHeartbeatManager.monitorTarget 开始进行心跳机制的注册。
taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() {public void receiveHeartbeat(ResourceID resourceID, Void payload) {// the ResourceManager will always send heartbeat requests to the// TaskManager}public void requestHeartbeat(ResourceID resourceID, Void payload) {taskExecutorGateway.heartbeatFromResourceManager(resourceID);}
});

当注册完成后,RM中的Sender HM内部结构如下,能看出来多了一个Target:

taskManagerHeartbeatManager = {HeartbeatManagerSenderImpl@8866} heartbeatPeriod = 10000heartbeatTimeoutIntervalMs = 50000ownResourceID = {ResourceID@8871} "040709f36ebf38f309fed518a88946af"heartbeatListener = {ResourceManager$TaskManagerHeartbeatListener@8872} mainThreadExecutor = {RpcEndpoint$MainThreadExecutor@8873} heartbeatTargets = {ConcurrentHashMap@8875}  size = 1{ResourceID@8867} "630c15c9-4861-4b41-9c95-92504f458b71" -> {HeartbeatMonitorImpl@9448} key = {ResourceID@8867} "630c15c9-4861-4b41-9c95-92504f458b71"value = {HeartbeatMonitorImpl@9448} resourceID = {ResourceID@8867} "630c15c9-4861-4b41-9c95-92504f458b71"heartbeatTarget = {ResourceManager$2@8868} scheduledExecutor = {RpcEndpoint$MainThreadExecutor@8873} heartbeatListener = {ResourceManager$TaskManagerHeartbeatListener@8872} heartbeatTimeoutIntervalMs = 50000futureTimeout = {ScheduledFutureAdapter@10140} state = {AtomicReference@9786} "RUNNING"lastHeartbeat = 0
5.3.1.3 返回到TM

RM会通过RPC再次回到TaskExecutor,其新执行序列如下:

  • 首先RPC调用到了 onRegistrationSuccess@TaskExecutorToResourceManagerConnection。
  • 然后onRegistrationSuccess@ResourceManagerRegistrationListener中通过异步执行调用到了establishResourceManagerConnection。这说明TM已经和RM建立了联系,所以可以开始监控RM了。
  • 然后和RM操作类似,通过resourceManagerHeartbeatManager.monitorTarget 来把RM注册到自己这里。
HeartbeatMonitor<O> heartbeatMonitor = heartbeatMonitorFactory.createHeartbeatMonitor
heartbeatTargets.put(resourceID, heartbeatMonitor);

当注册完成后,其Receiver HM结构如下:

resourceManagerHeartbeatManager = {HeartbeatManagerImpl@10163} heartbeatTimeoutIntervalMs = 50000ownResourceID = {ResourceID@8882} "96a9b80c-dd97-4b63-9049-afb6662ea3e2"heartbeatListener = {TaskExecutor$ResourceManagerHeartbeatListener@10425} mainThreadExecutor = {RpcEndpoint$MainThreadExecutor@10426} heartbeatTargets = {ConcurrentHashMap@10427}  size = 1{ResourceID@8886} "122fa66685133b11ea26ee1b1a6cef75" -> {HeartbeatMonitorImpl@10666} key = {ResourceID@8886} "122fa66685133b11ea26ee1b1a6cef75"value = {HeartbeatMonitorImpl@10666} resourceID = {ResourceID@8886} "122fa66685133b11ea26ee1b1a6cef75"heartbeatTarget = {TaskExecutor$1@10668} scheduledExecutor = {RpcEndpoint$MainThreadExecutor@10426} heartbeatListener = {TaskExecutor$ResourceManagerHeartbeatListener@10425} heartbeatTimeoutIntervalMs = 50000futureTimeout = {ScheduledFutureAdapter@10992} state = {AtomicReference@10667} "RUNNING"lastHeartbeat = 0

5.3.2 TM注册到 JM

其调用基本思路与之前相同,就是TM和JM之间互相注册一个代表对方的monitor:

JobLeaderListenerImpl ----> establishJobManagerConnection

消息到了JM中,做如下操作。

registerTaskManager ----> taskManagerHeartbeatManager.monitorTarget// monitor the task manager as heartbeat target

5.4 心跳过程

在任务提交之后,我们就进入了正常的心跳监控流程。我们依然用 TM 和 RM进行演示。

我们先给出一个流程图。

 * 1. Run in Resouce Manager**    HeartbeatManagerSender in RM*        |*        +----> run@HeartbeatManagerSenderImpl*        |     //遍历所有监控的Monitor(Target),逐一在Target上调用requestHeartbeat*        |     *        +----> requestHeartbeat@HeartbeatManagerSenderImpl *        |     // 将调用具体监控对象的自定义函数*        |     // heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);*        |      *        +----> getHeartbeatListener().retrievePayload   *        |     // 调用到TaskManagerHeartbeatListener@ResourceManager*        |     // 这里是return null;,因为RM不会是任何人的Receiver*        |        *        +----> requestHeartbeat@HeartbeatTarget   *        |     // 调用到Target这里,代码在ResourceManager这里,就是生成Target时候赋值的*        |    *        +----> taskExecutorGateway.heartbeatFromResourceManager*        |     // 会通过gateway RPC 调用到TM,这就是主动对TM发起了心跳请求*        |      * ~~~~~~~~ 这里是 Akka RPC* 2. Run in Task Manager   * 现在程序执行序列到达了TM, 主要是 1. 重置TM的Monitor线程; 2.返回一些负载信息;**    heartbeatFromResourceManager@TaskExecutor*        |*        +----> resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);*        |     //开始要调用到 Receiver HM in Task Manager*        |     *        +----> requestHeartbeat@HeartbeatManager in TM *        |     // 在Receiver HM in Task Manager 这里运行*        |      *        +----> reportHeartbeat@HeartbeatMonitor   *        |     //reportHeartbeat : 记录发起请求的这个时间点,然后resetHeartbeatTimeout*        |      *        +----> resetHeartbeatTimeout@HeartbeatMonitor*        |     // 如果Monitor状态依然是RUNNING,则取消之前设置的ScheduledFuture。*        |     // 重新创建一个ScheduleFuture。因为如果不取消,则之前那个ScheduleFuture运行时*        |     // 会调用HeartbeatMonitorImpl.run函数,run直接compareAndSet后,通知目标函数*        |     // 目前已经超时,即调用heartbeatListener.notifyHeartbeatTimeout。*        |     // 这里代表 JM 状态正常。*        |      *        +---->  heartbeatListener.reportPayload*        |     // 把Target节点的最新的heartbeatPayload通知给heartbeatListener。*        |     // heartbeatListerner是外部传入的,它根据所拥有的节点的心跳记录做监听管理。*        |        *        +---->  heartbeatTarget.receiveHeartbeat(getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin));*        |      *        |     *        +---->  retrievePayload@ResourceManagerHeartbeatListener in TM   *        |       // heartbeatTarget.receiveHeartbeat参数调用的*        |     *        +---->  return new TaskExecutorHeartbeatPayload*        |   *        |     *        +---->  receiveHeartbeat in TM *        |       // 回到 heartbeatTarget.receiveHeartbeat,这就是TM生成Target的时候的自定义函数*        |       // 就是响应一个心跳消息回给RM*        |     *        +----> resourceManagerGateway.heartbeatFromTaskManager  *        |     // 会通过gateway RPC 调用到 ResourcManager*        |      * ~~~~~~~~ 这里是 Akka RPC* 3. Run in Resouce Manager* 现在程序回到了RM, 主要是 1.重置RM的Monitor线程;2. 上报收到TaskExecutor的负载信息  **    heartbeatFromTaskManager in RM*        |   *        |   *        +---->  taskManagerHeartbeatManager.receiveHeartbeat *        |       // 这是个Sender HM*        |  *        +---->  HeartbeatManagerImpl.receiveHeartbeat  *        |  *        |     *        +---->  HeartbeatManagerImpl.reportHeartbeat(heartbeatOrigin);*        | *        |    *        +---->  heartbeatMonitor.reportHeartbeat(); *        |      // 这里就是重置RM 这里对应的Monitor。在reportHeartbeat重置 JM monitor线程的触发,即cancelTimeout取消注册时候的超时定时任务,并且注册下一个超时检测futureTimeout;这代表TM正常执行。*        |     *        +----> heartbeatListener.reportPayload    *        |      //把Target节点的最新的heartbeatPayload通知给 TaskManagerHeartbeatListener。heartbeatListerner是外部传入的,它根据所拥有的节点的心跳记录做监听管理。*        |      *        +----> slotManager.reportSlotStatus(instanceId, payload.getSlotReport());*        |     // TaskManagerHeartbeatListener中调用,上报收到TaskExecutor的负载信息*        |

下面是具体文字描述。

5.4.1 ResourceManager主动发起

5.4.1.1 Sender遍历所有监控的Monitor(Target)

心跳机制是由Sender主动发起的。这里就是 ResourceManager 的HeartbeatManagerSenderImpl中定时schedual调用,这里会遍历所有监控的Monitor(Target),逐一在Target上调用requestHeartbeat。

// HeartbeatManagerSenderImpl中的代码@Overridepublic void run() {if (!stopped) {for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {// 这里向被监控对象节点发起一次心跳请求,载荷是heartbeatPayLoad,要求被监控对象回应心跳requestHeartbeat(heartbeatMonitor);}getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);}}
}// 运行时候的变量
this = {HeartbeatManagerSenderImpl@9037} heartbeatPeriod = 10000heartbeatTimeoutIntervalMs = 50000ownResourceID = {ResourceID@8788} "d349506cae32cadbe99b9f9c49a01c95"heartbeatListener = {ResourceManager$TaskManagerHeartbeatListener@8789} mainThreadExecutor = {RpcEndpoint$MainThreadExecutor@8790} // 调用栈如下
requestHeartbeat:711, ResourceManager$2 (org.apache.flink.runtime.resourcemanager)
requestHeartbeat:702, ResourceManager$2 (org.apache.flink.runtime.resourcemanager)
requestHeartbeat:92, HeartbeatManagerSenderImpl (org.apache.flink.runtime.heartbeat)
run:81, HeartbeatManagerSenderImpl (org.apache.flink.runtime.heartbeat)
call:511, Executors$RunnableAdapter (java.util.concurrent)
run$$$capture:266, FutureTask (java.util.concurrent)
run:-1, FutureTask (java.util.concurrent)
5.4.1.2 Target进行具体操作

具体监控对象 Target 会调用自定义的requestHeartbeat。

HeartbeatManagerSenderImplprivate void requestHeartbeat(HeartbeatMonitor<O> heartbeatMonitor) {O payload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();// 这里就是具体监控对象heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);}heartbeatTarget = {ResourceManager$2@10688} taskExecutorGateway = {$Proxy42@9459} "org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler@6d0c8334"this$0 = {StandaloneResourceManager@9458}

请注意,每一个Target都是由ResourceManager生成的。ResourceManager之前注册成为Monitor时候就注册了这个HeartbeatTarget。

这个HeartbeatTarget的定义如下,两个函数是:

  • receiveHeartbeat :这个是空,因为RM没有自己的Sender。

  • requestHeartbeat :这个针对TM,就是调用TM的heartbeatFromResourceManager,当然是通过RPC调用。

5.4.1.3 RPC调用

会调用到ResourceManager定义的函数requestHeartbeat,而requestHeartbeat会通过gateway调用到TM,这就是主动对TM发起了心跳请求。

taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() {@Overridepublic void receiveHeartbeat(ResourceID resourceID, Void payload) {// the ResourceManager will always send heartbeat requests to the TaskManager}@Overridepublic void requestHeartbeat(ResourceID resourceID, Void payload) {//就是调用到这里taskExecutorGateway.heartbeatFromResourceManager(resourceID); }
});

5.4.2 RM通过RPC调用TM

通过taskExecutorGateway。心跳程序执行就通过RPC从RM跳跃到了TM。

taskExecutorGateway.heartbeatFromResourceManager 的意义就是:通过RPC调用回到TaskExecutor。这个是在TaskExecutorGateway就定义好的。

// TaskExecutor RPC gateway interface.
public interface TaskExecutorGateway extends RpcGateway

TaskExecutor实现了TaskExecutorGateway,所以具体在TaskExecutor内部实现了接口函数。

@Override
public void heartbeatFromResourceManager(ResourceID resourceID) {//调用到了这里 ...........resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
}

TM中,resourceManagerHeartbeatManager 定义如下。

/** The heartbeat manager for resource manager in the task manager. */
private final HeartbeatManager<Void, TaskExecutorHeartbeatPayload> resourceManagerHeartbeatManager;

所以下面就是执行TM中的Receiver HM。在这个过程中有两个处理步骤:

  1. 调用对应HeartbeatMonitor的reportHeartbeat方法,cancelTimeout取消注册时候的超时定时任务,并且注册下一个超时检测futureTimeout;
  2. 调用monitorTarget的receiveHeartbeat方法,也就是会通过rpc调用JobMaster的heartbeatFromTaskManager方法返回一些负载信息;

具体是调用 requestHeartbeat@HeartbeatManager。在其中会

  • 调用reportHeartbeat@HeartbeatMonitor,记录发起请求的这个时间点,然后resetHeartbeatTimeout。
  • 在resetHeartbeatTimeout@HeartbeatMonitor之中,如果Monitor状态依然是RUNNING,则取消之前设置的ScheduledFuture。重新创建一个ScheduleFuture。因为如果不取消,则之前那个ScheduleFuture运行时会调用HeartbeatMonitorImpl.run函数,run直接compareAndSet后,通知目标函数目前已经超时,即调用heartbeatListener.notifyHeartbeatTimeout。
  • 调用 heartbeatListener.reportPayload,把Target节点的最新的heartbeatPayload通知给heartbeatListener。
  • 调用 heartbeatTarget.receiveHeartbeat(getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin)); 就是响应一个心跳消息回给RM。
 @Overridepublic void requestHeartbeat(final ResourceID requestOrigin, I heartbeatPayload) {if (!stopped) {log.debug("Received heartbeat request from {}.", requestOrigin);final HeartbeatTarget<O> heartbeatTarget = reportHeartbeat(requestOrigin);if (heartbeatTarget != null) {if (heartbeatPayload != null) {heartbeatListener.reportPayload(requestOrigin, heartbeatPayload);}heartbeatTarget.receiveHeartbeat(getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin));}}}

最后会通过resourceManagerGateway.heartbeatFromTaskManager 调用到 ResourcManager。

5.4.3 TM 通过RPC回到 RM

JobMaster在接收到rpc请求后调用其heartbeatFromTaskManager方法,会调用taskManagerHeartbeatManager的receiveHeartbeat方法,在这个过程中同样有两个处理步骤:

  1. 调用对应HeartbeatMonitor的reportHeartbeat方法,cancelTimeout取消注册时候的超时定时任务,并且注册下一个超时检测futureTimeout;
  2. 调用TaskManagerHeartbeatListener的reportPayload方法,上报收到TaskExecutor的负载信息

至此一次完成心跳过程已经完成,会根据heartbeatInterval执行下一次心跳。

5.5 超时处理

5.5.1 TaskManager

首先,在HeartbeatMonitorImpl中,如果超时,会调用Listener。

public void run() {// The heartbeat has timed out if we're in state runningif (state.compareAndSet(State.RUNNING, State.TIMEOUT)) {heartbeatListener.notifyHeartbeatTimeout(resourceID);}
}

这就来到了ResourceManagerHeartbeatListener,会尝试再次连接RM。

private class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, TaskExecutorHeartbeatPayload> {@Overridepublic void notifyHeartbeatTimeout(final ResourceID resourceId) {validateRunsInMainThread();// first check whether the timeout is still validif (establishedResourceManagerConnection != null && establishedResourceManagerConnection.getResourceManagerResourceId().equals(resourceId)) {reconnectToResourceManager(new TaskManagerException(String.format("The heartbeat of ResourceManager with id %s timed out.", resourceId)));} else {.....}}

5.5.2 ResourceManager

RM就直接简单粗暴,关闭连接。

private class TaskManagerHeartbeatListener implements HeartbeatListener<TaskExecutorHeartbeatPayload, Void> {@Overridepublic void notifyHeartbeatTimeout(final ResourceID resourceID) {validateRunsInMainThread();closeTaskManagerConnection(resourceID,new TimeoutException("The heartbeat of TaskManager with id " + resourceID + "  timed out."));}
}

0x06 解决问题

心跳机制我们讲解完了,但是我们最初提到的异常应该如何解决呢?在程序最开始生成环境变量时候,通过设置环境变量的配置即可搞定:

Configuration conf = new Configuration();
conf.setString("heartbeat.timeout", "18000000");
final LocalEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);

0x07 参考

[flink-001]flink的心跳机制

Flink中心跳机制

flink1.8 心跳服务

你有必要了解一下Flink底层RPC使用的框架和原理

flink RPC(akka)

弄清Flink1.8的远程过程调用(RPC)

Apache Flink源码解析 (七)Flink RPC的底层实现

flink源码阅读第一篇—入口

flink-on-yarn 基础架构和启动流程

★★★★★★关于生活和技术的思考★★★★★★
微信公众账号:罗西的思考
如果您想及时得到个人撰写文章的消息推送,或者想看看个人推荐的技术资料,可以扫描下面二维码(或者长按识别二维码)关注个人公众号)。

[源码解析] 从TimeoutException看Flink的心跳机制相关推荐

  1. HashSet源码解析(最好先看HashMap的源码解析)

    HashMap的源码解析:https://mp.csdn.net/console/editor/html/106188425 HashSet:Java中的一个集合类,该容器不允许包含重复的数值 pub ...

  2. 震撼来袭,阿里高工的源码解析笔记手抄本,看完去怼面试官

    很多程序员一开始在学习上找不到方向,但我想在渡过了一段时间的新手期之后这类问题大多都会变得不再那么明显,工作的方向也会逐渐变得清晰起来. 但是没过多久,能了解到的资料就开始超过每天学习的能力,像是买了 ...

  3. 【Flink图计算源码解析】开篇:Flink图计算总览

    文章目录 1. 图计算的作用 2. 本专题的写作目的 3. Flink Gelly引擎总览 3.1. Gelly的源码结构 1. Graph的存储数据结构 2. 图的类别 3. 图的验证以及指标 4. ...

  4. python3 线程池源码解析_5分钟看懂系列:Python 线程池原理及实现

    概述 传统多线程方案会使用"即时创建, 即时销毁"的策略.尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器 ...

  5. Fabric v2.0 源码解析——排序节点(Orderer)运行机制

    本文的内容还需进一步丰富,有时间会继续完善. 文章目录 1. Orderer在Fabric网络中的作用 Handle函数 2. Orderer接收的交易类型 ProcessMessage函数 3. 共 ...

  6. Flink 源码解析 —— 源码编译运行

    更新一篇知识星球里面的源码分析文章,去年写的,周末自己录了个视频,大家看下效果好吗?如果好的话,后面补录发在知识星球里面的其他源码解析文章. 前言 之前自己本地 clone 了 Flink 的源码,编 ...

  7. JVM-白话聊一聊JVM类加载和双亲委派机制源码解析

    文章目录 Java 执行代码的大致流程 类加载loadClass的步骤 类加载器和双亲委派机制 sun.misc.Launcher源码解析 Launcher实例化 Launcher 构造函数 双亲委派 ...

  8. QT源码解析(一) QT创建窗口程序、消息循环和WinMain函数

    版权声明 请尊重原创作品.转载请保持文章完整性,并以超链接形式注明原始作者"tingsking18"和主站点地址,方便其他朋友提问和指正. QT源码解析(一) QT创建窗口程序.消 ...

  9. Dubbo第三讲:Dubbo的可扩展机制SPI源码解析

    本文是Dubbo第三讲:Dubbo的可扩展机制SPI源码解析 文章目录 1.Dubbo SPI机制 1.1.Dubbo具有良好拓展性的原因 1.2.Dubbo SPI和Java SPI的区别? 1.3 ...

最新文章

  1. reduce实现filter,map 数组扁平化等
  2. 12.PDE与PTE
  3. springboot 引入jdbc驱动_Spring Boot:企业常用的 Starter以及实现
  4. Java 进程占用 VIRT 虚拟内存超高的问题研究
  5. dm368ipnc 重写架构中的swosd 实现中文osd
  6. 产品经理对企业的价值
  7. 浏览器请求静态资源的流程
  8. 紧急!Log4j2 再再爆雷:刚升级,又连爆 “核弹级” 远程数据泄露 ! v2.17.0 横空出世。。。...
  9. Android入门笔记11
  10. mysql省市区县街道
  11. 微软tfs项目管理工具_研究Project2010+TFS2010项目需求管理功能
  12. 【OpenCV】计算两幅图片视觉差
  13. js将月份转换为英文简写的形式
  14. shiro权限框架中五张基本数据表
  15. Maven五分钟入门
  16. 解决ubuntu20.04播放b站视频,提示“May need to install the required video codecs”问题
  17. salesforce 和 salesforce platform 的License的区别
  18. gloo pytorch_使用Solo Gloo等微服务/ API网关公开在AWS EKS中运行的微服务
  19. 对XP系统中Autorun.inf Autorun.exe以及RECYCLER文件夹的认识
  20. [笔记]SSH 端口转发

热门文章

  1. CSS——空间转换 和 动画
  2. 自适应参数估计+滑模变结构控制高超声速飞行器(源代码)
  3. 正则表达式初级(记忆)
  4. 正确设置IE以使用陕西信合网银
  5. window10安装配置Intel SGX
  6. 什么是集中采购 集中采购管理软件介绍
  7. 多元微积分_stokes定理
  8. 【数模国奖作品解析之一】加拿大各地区温度的时空变化趋势分析
  9. 如何打破迷茫,找到适合自己的职业方向?
  10. 设计软件调节CPU使用率及更改优先级