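1. Overview

Reposted from: "Flink 1.12.2 Source Code Walkthrough: StreamTask"

In the doRun method of the Task class, a RuntimeEnvironment is built first to hold the task's runtime environment. Then loadAndInstantiateInvokable is called to load and instantiate the task's executable code.

loadAndInstantiateInvokable takes the class loader (userCodeClassLoader.asClassLoader()), the name of the class to instantiate (nameOfInvokableClass), and the RuntimeEnvironment that carries the environment information the task needs, and uses them to create the invokable instance.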

org.apache.flink.runtime.taskmanager.Task#doRun

private void doRun() {
    // State initialization code ...
    // Load everything the task needs in order to run ...

    // Request and initialize the user code and methods.
    // Build the environment the code needs in order to run.
    Environment env =
            new RuntimeEnvironment(
                    jobId, vertexId, executionId, executionConfig, taskInfo,
                    jobConfiguration, taskConfiguration, userCodeClassLoader,
                    memoryManager, ioManager, broadcastVariableManager, taskStateManager,
                    aggregateManager, accumulatorRegistry, kvStateRegistry, inputSplitProvider,
                    distributedCacheEntries, consumableNotifyingPartitionWriters, inputGates,
                    taskEventDispatcher, checkpointResponder, operatorCoordinatorEventGateway,
                    taskManagerConfig, metrics, this, externalResourceInfoProvider);

    // now load and instantiate the task's invokable code
    invokable =
            loadAndInstantiateInvokable(
                    userCodeClassLoader.asClassLoader(), nameOfInvokableClass, env);

    // Run the code.
    invokable.invoke();

    // Remaining code omitted ...
}
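The load-and-instantiate step can be pictured with plain Java reflection. The following is a minimal sketch, not Flink's implementation: SketchEnvironment, SketchInvokable, EchoTask and loadAndInstantiate are hypothetical stand-ins, and the only assumption carried over from the text is that the task class is resolved by name through a class loader and built through a constructor that takes the environment.

```java
import java.lang.reflect.Constructor;

// Hypothetical stand-ins for Flink's Environment and AbstractInvokable.
interface SketchEnvironment {
    String taskName();
}

abstract class SketchInvokable {
    protected final SketchEnvironment environment;

    protected SketchInvokable(SketchEnvironment environment) {
        this.environment = environment;
    }

    abstract String invoke();
}

class InvokableLoadingSketch {

    /** A concrete task; it exposes a public constructor that takes the environment. */
    public static class EchoTask extends SketchInvokable {
        public EchoTask(SketchEnvironment environment) {
            super(environment);
        }

        @Override
        String invoke() {
            return "running " + environment.taskName();
        }
    }

    /** Resolves the class by name, checks its type, and calls the one-argument constructor. */
    static SketchInvokable loadAndInstantiate(
            ClassLoader classLoader, String className, SketchEnvironment env) {
        try {
            Class<? extends SketchInvokable> clazz =
                    Class.forName(className, true, classLoader)
                            .asSubclass(SketchInvokable.class);
            Constructor<? extends SketchInvokable> ctor =
                    clazz.getConstructor(SketchEnvironment.class);
            return ctor.newInstance(env);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        SketchEnvironment env = () -> "demo-task";
        SketchInvokable task =
                loadAndInstantiate(
                        InvokableLoadingSketch.class.getClassLoader(),
                        EchoTask.class.getName(),
                        env);
        System.out.println(task.invoke());
    }
}
```

Because the class is only known by name at runtime, a missing or mismatched constructor surfaces as a runtime error rather than a compile error, which is why the sketch wraps the reflective failure in one descriptive exception.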

Here are the main classes that nameOfInvokableClass can refer to.

Name                                                               Description
org.apache.flink.streaming.runtime.tasks.SourceStreamTask          StreamTask for sources
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask        StreamTask with one input
org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask        StreamTask with two inputs
org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTask   StreamTask with multiple inputs
org.apache.flink.streaming.runtime.tasks.StreamIterationHead       A special {@link StreamTask} that is used for executing feedback edges.
org.apache.flink.streaming.runtime.tasks.StreamIterationTail       A special {@link StreamTask} that is used for executing feedback edges.

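2. AbstractInvokable

This is the abstract base class for every task that the TaskManager can execute.
Concrete tasks, such as streaming and batch tasks, extend this class.
The TaskManager calls the {@link #invoke()} method when executing a task.
All operations of the task happen in this method (setting up the input and output stream readers and writers as well as the task's core operation).
All classes that extend it must offer a constructor {@code MyTask(Environment, TaskStateSnapshot)}.
For convenience, tasks that are always stateless may implement only the constructor {@code MyTask(Environment)}.

Developer note:
Although constructors cannot be enforced at compile time, we did not yet venture on introducing factories (it is only an internal API after all, and with Java 8 one can use {@code Class::new} almost like a factory lambda).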
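The remark about {@code Class::new} can be illustrated with a constructor reference used where a factory function is expected. This is a minimal sketch with hypothetical Env and StatelessTask types; it only shows the language feature the note refers to, not how Flink itself instantiates tasks.

```java
import java.util.function.Function;

class ConstructorReferenceSketch {

    // Hypothetical stand-ins for Environment and a stateless task.
    static class Env {
        final String name;

        Env(String name) {
            this.name = name;
        }
    }

    static class StatelessTask {
        final Env env;

        StatelessTask(Env env) {
            this.env = env;
        }
    }

    /** Creates a task through whatever factory the caller supplies. */
    static <T> T create(Function<Env, T> factory, Env env) {
        return factory.apply(env);
    }

    public static void main(String[] args) {
        // The constructor reference acts as the factory lambda.
        StatelessTask task = create(StatelessTask::new, new Env("demo"));
        System.out.println(task.env.name);
    }
}
```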

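Note:
There is no constructor that accepts an initial task state snapshot and stores it in a variable.
This is on purpose: AbstractInvokable itself does not need the state snapshot (only subclasses such as StreamTask need the state), and we do not want to hold a reference indefinitely, which would prevent the garbage collector from cleaning up the initial state structure.

Any subclass that supports recoverable state and participates in checkpointing needs to override
{@link #triggerCheckpointAsync(CheckpointMetaData, CheckpointOptions, boolean)},
{@link #triggerCheckpointOnBarrier(CheckpointMetaData, CheckpointOptions, CheckpointMetricsBuilder)},
{@link #abortCheckpointOnBarrier(long, Throwable)} and {@link #notifyCheckpointCompleteAsync(long)}.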

2.1. Fields and initialization

The AbstractInvokable abstract class has only two fields: Environment environment and shouldInterruptOnCancel = true.

Fields

  /** The environment assigned to this invokable. */
  private final Environment environment;

  /** Flag whether cancellation should interrupt the executing thread. */
  private volatile boolean shouldInterruptOnCancel = true;

Constructor
The constructor simply takes an Environment object.

/**
 * Create an Invokable task and set its environment.
 *
 * @param environment The environment assigned to this invokable.
 */
public AbstractInvokable(Environment environment) {
    this.environment = checkNotNull(environment);
}

2.2. Environment

Environment is the constructor argument of the AbstractInvokable abstract class (and of its subclasses). When the Task starts running (in doRun), the environment information is wrapped in an Environment implementation, RuntimeEnvironment,
and handed to the concrete task class (for example SourceStreamTask or OneInputStreamTask).

There is not much to say here: it simply bundles a set of references to the task's environment.

private final JobID jobId;
private final JobVertexID jobVertexId;
private final ExecutionAttemptID executionId;
private final TaskInfo taskInfo;
private final Configuration jobConfiguration;
private final Configuration taskConfiguration;
private final ExecutionConfig executionConfig;
private final UserCodeClassLoader userCodeClassLoader;
private final MemoryManager memManager;
private final IOManager ioManager;
private final BroadcastVariableManager bcVarManager;
private final TaskStateManager taskStateManager;
private final GlobalAggregateManager aggregateManager;
private final InputSplitProvider splitProvider;
private final ExternalResourceInfoProvider externalResourceInfoProvider;
private final Map<String, Future<Path>> distCacheEntries;
private final ResultPartitionWriter[] writers;
private final IndexedInputGate[] inputGates;
private final TaskEventDispatcher taskEventDispatcher;
private final CheckpointResponder checkpointResponder;
private final TaskOperatorEventGateway operatorEventGateway;
private final AccumulatorRegistry accumulatorRegistry;
private final TaskKvStateRegistry kvStateRegistry;
private final TaskManagerRuntimeInfo taskManagerInfo;
private final TaskMetricGroup metrics;
private final Task containingTask;

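2.3. Methods

Core methods

Name                         Description
invoke                       Starts the execution. Must be overridden by the concrete task implementation. The TaskManager calls this method when the actual execution of the task starts.
cancel                       Called when a task is canceled, either because the user aborted it or because execution failed. It can be overridden to shut down the user code properly.
setShouldInterruptOnCancel   Sets whether the thread that executes the {@link #invoke()} method should be interrupted during cancellation. This method sets the flag for both the initial interrupt and the repeated interrupt.
dispatchOperatorEvent        Entry point for external influence on the task's execution: operator events.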
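How the cancellation flag interacts with the executing thread can be sketched as follows. BlockingTask and the static cancel helper are hypothetical; the sketch assumes only what the table states, namely that a flag decides whether the thread running invoke is interrupted when the task is canceled.

```java
import java.util.concurrent.CountDownLatch;

class CancelInterruptSketch {

    /** Hypothetical task that blocks in invoke() until it is interrupted. */
    static class BlockingTask {
        private volatile boolean shouldInterruptOnCancel = true;
        volatile boolean sawInterrupt = false;
        final CountDownLatch started = new CountDownLatch(1);

        void invoke() {
            started.countDown();
            try {
                new CountDownLatch(1).await(); // never counted down: blocks until interrupted
            } catch (InterruptedException e) {
                sawInterrupt = true;
            }
        }

        void setShouldInterruptOnCancel(boolean value) {
            shouldInterruptOnCancel = value;
        }

        boolean shouldInterruptOnCancel() {
            return shouldInterruptOnCancel;
        }
    }

    /** Interrupts the executing thread only if the flag allows it; returns whether it did. */
    static boolean cancel(BlockingTask task, Thread executor) {
        if (task.shouldInterruptOnCancel()) {
            executor.interrupt();
            return true;
        }
        return false;
    }

    /** Starts the task on its own thread, cancels it, and reports whether it saw the interrupt. */
    static boolean runAndCancel() {
        BlockingTask task = new BlockingTask();
        Thread executor = new Thread(task::invoke, "task-thread");
        executor.start();
        try {
            task.started.await();
            cancel(task, executor);
            executor.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return task.sawInterrupt;
    }
}
```

Turning the flag off before running cleanup code, as cleanUpInvoke does later in this article, keeps a late cancel from interrupting the shutdown logic.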

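Checkpoint-related methods

Name                            Description
triggerCheckpointAsync          Called asynchronously by the checkpoint coordinator to trigger a checkpoint.
triggerCheckpointOnBarrier      Called when a checkpoint is triggered by receiving checkpoint barriers on all input streams.
abortCheckpointOnBarrier        Aborts a checkpoint as the result of receiving some checkpoint barriers …
notifyCheckpointCompleteAsync   Notifies the task that a checkpoint has completed.
notifyCheckpointAbortAsync      Notifies the task that a checkpoint has been aborted.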

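3. StreamTask

Base class for all streaming tasks.
A task is the unit of local processing that is deployed and executed by the TaskManager.
Each task runs one or more {@link StreamOperator}s, which form the task's operator chain.
Operators that are chained together execute synchronously in the same thread and hence on the same stream partition.
A common case for these chains is a sequence of map/flatmap/filter tasks.

The task chain contains one "head" operator and multiple chained operators.

The StreamTask is specialized for the type of the head operator:

one-input : OneInputStreamTask
two-input tasks : TwoInputStreamTask
sources : SourceStreamTask
iteration heads : StreamIterationHead
iteration tails : StreamIterationTail

The Task class handles the setup of the streams read by the head operator and of the streams produced by the operators at the ends of the operator chain.

Note that the chain may fork and thus have multiple ends.

The life cycle of the task is set up as follows:

 <pre>{@code
  -- setInitialState -> provides the state of all operators in the chain

  -- invoke()
        |
        +----> Create basic utils (config, etc) and load the chain of operators
        +----> operators.setup()
        +----> task specific init()
        +----> initialize-operator-states()
        +----> open-operators()
        +----> run()
        +----> close-operators()
        +----> dispose-operators()
        +----> common cleanup
        +----> task specific cleanup()
 }</pre>

The {@code StreamTask} has a lock object called {@code lock}.
All calls to methods on a {@code StreamOperator} must be synchronized on this lock object to ensure that no methods are called concurrently.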
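The locking rule can be sketched with two threads that both call into an operator that is not thread-safe by itself. CountingOperator and runConcurrently are hypothetical names; the sketch assumes only the rule stated above, that every call on the operator is made while holding one shared lock object.

```java
class OperatorLockSketch {

    /** Hypothetical operator whose methods are not thread-safe on their own. */
    static class CountingOperator {
        private long count; // plain field, no synchronization inside the operator

        void processElement() {
            count++;
        }

        void onTimer() {
            count++;
        }

        long count() {
            return count;
        }
    }

    /** Runs a "task" thread and a "timer" thread that both call the operator under one lock. */
    static long runConcurrently(int callsPerThread) {
        final Object lock = new Object();
        final CountingOperator operator = new CountingOperator();

        Thread taskThread = new Thread(() -> {
            for (int i = 0; i < callsPerThread; i++) {
                synchronized (lock) {
                    operator.processElement();
                }
            }
        });
        Thread timerThread = new Thread(() -> {
            for (int i = 0; i < callsPerThread; i++) {
                synchronized (lock) {
                    operator.onTimer();
                }
            }
        });

        taskThread.start();
        timerThread.start();
        try {
            taskThread.join();
            timerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        synchronized (lock) {
            return operator.count();
        }
    }
}
```

Without the synchronized blocks the two unsynchronized increments could interleave and lose updates; with them, the final count always equals the total number of calls.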

3.1. Fields and constructor

Fields

  /** The thread group that holds all trigger timer threads. */
  public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");

  /** The logger used by the StreamTask and its subclasses. */
  protected static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);

  // ------------------------------------------------------------------------

  /**
   * All actions outside of the task {@link #mailboxProcessor mailbox} (i.e. performed by another
   * thread) must be executed through this executor to ensure that we don't have concurrent method
   * calls that void consistent checkpoints.
   *
   * <p>CheckpointLock is superseded by {@link MailboxExecutor}, with {@link
   * StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor
   * SynchronizedStreamTaskActionExecutor} to provide lock to {@link SourceStreamTask}.
   */
  private final StreamTaskActionExecutor actionExecutor;

  /** The input processor. Initialized in {@link #init()} method. */
  @Nullable protected StreamInputProcessor inputProcessor;

  /** [Important] The main operator that consumes the input streams of this task. */
  protected OP mainOperator;

  /** The chain of operators executed by this task. */
  protected OperatorChain<OUT, OP> operatorChain;

  /** The configuration of this streaming task. */
  protected final StreamConfig configuration;

  /** Our state backend. We use this to create checkpoint streams and a keyed state backend. */
  protected final StateBackend stateBackend;

  /** The checkpoint coordinator of this subtask. */
  private final SubtaskCheckpointCoordinator subtaskCheckpointCoordinator;

  /**
   * The internal {@link TimerService} used to define the current processing time (default =
   * {@code System.currentTimeMillis()}) and register timers for tasks to be executed in the
   * future.
   */
  protected final TimerService timerService;

  /** The currently active background materialization threads. */
  private final CloseableRegistry cancelables = new CloseableRegistry();

  /** Exception handling. */
  private final StreamTaskAsyncExceptionHandler asyncExceptionHandler;

  /**
   * Flag to mark the task "in operation", in which case check needs to be initialized to true, so
   * that early cancel() before invoke() behaves correctly.
   */
  private volatile boolean isRunning;

  /** Flag to mark this task as canceled. */
  private volatile boolean canceled;

  /**
   * Flag to mark this task as failing, i.e. if an exception has occurred inside {@link
   * #invoke()}.
   */
  private volatile boolean failing;

  /** Unclear what this is for (the original note just says "????"). */
  private boolean disposedOperators;

  /** Thread pool for async snapshot workers. */
  private final ExecutorService asyncOperationsThreadPool;

  private final RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriter;

  protected final MailboxProcessor mailboxProcessor;

  final MailboxExecutor mainMailboxExecutor;

  /** TODO it might be replaced by the global IO executor on TaskManager level future. */
  private final ExecutorService channelIOExecutor;

  private Long syncSavepointId = null;
  private Long activeSyncSavepointId = null;

  private long latestAsyncCheckpointStartDelayNanos;

The constructor is mostly plain assignments. The line to note is this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);

   protected StreamTask(
           Environment environment,
           @Nullable TimerService timerService,
           Thread.UncaughtExceptionHandler uncaughtExceptionHandler,
           StreamTaskActionExecutor actionExecutor,
           TaskMailbox mailbox)
           throws Exception {

       super(environment);

       this.configuration = new StreamConfig(getTaskConfiguration());
       this.recordWriter = createRecordWriterDelegate(configuration, environment);
       this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
       this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);
       this.mailboxProcessor.initMetric(environment.getMetricGroup());
       this.mainMailboxExecutor = mailboxProcessor.getMainMailboxExecutor();
       this.asyncExceptionHandler = new StreamTaskAsyncExceptionHandler(environment);
       this.asyncOperationsThreadPool =
               Executors.newCachedThreadPool(
                       new ExecutorThreadFactory("AsyncOperations", uncaughtExceptionHandler));

       this.stateBackend = createStateBackend();

       // (marked "????" in the original notes)
       this.subtaskCheckpointCoordinator =
               new SubtaskCheckpointCoordinatorImpl(
                       stateBackend.createCheckpointStorage(getEnvironment().getJobID()),
                       getName(),
                       actionExecutor,
                       getCancelables(),
                       getAsyncOperationsThreadPool(),
                       getEnvironment(),
                       this,
                       configuration.isUnalignedCheckpointsEnabled(),
                       this::prepareInputSnapshot);

       // if the clock is not already set, then assign a default TimeServiceProvider
       if (timerService == null) {
           ThreadFactory timerThreadFactory =
                   new DispatcherThreadFactory(
                           TRIGGER_THREAD_GROUP, "Time Trigger for " + getName());
           this.timerService =
                   new SystemProcessingTimeService(this::handleTimerException, timerThreadFactory);
       } else {
           this.timerService = timerService;
       }

       this.channelIOExecutor =
               Executors.newSingleThreadExecutor(
                       new ExecutorThreadFactory("channel-state-unspilling"));

       injectChannelStateWriterIntoChannels();
   }

3.2. invoke

invoke is the core method of the task. It runs in four steps:

Before invoke: beforeInvoke();
Invoke: runMailboxLoop();
After invoke: afterInvoke();
Cleanup: cleanUpInvoke();

// Operators such as map ...
@Override
public final void invoke() throws Exception {
    try {
        // Initialization ...
        beforeInvoke();

        // final check to exit early before starting to run
        if (canceled) {
            throw new CancelTaskException();
        }

        // [Core] Run the task.
        // let the task do its work
        runMailboxLoop();

        // if this left the run() method cleanly despite the fact that this was canceled,
        // make sure the "clean shutdown" is not attempted
        if (canceled) {
            throw new CancelTaskException();
        }

        afterInvoke();
    } catch (Throwable invokeException) {
        failing = !canceled;
        try {
            cleanUpInvoke();
        }
        // TODO: investigate why Throwable instead of Exception is used here.
        catch (Throwable cleanUpException) {
            Throwable throwable =
                    ExceptionUtils.firstOrSuppressed(cleanUpException, invokeException);
            ExceptionUtils.rethrowException(throwable);
        }
        ExceptionUtils.rethrowException(invokeException);
    }
    cleanUpInvoke();
}
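The control flow of invoke (cleanup runs on both the success path and the failure path, and a cleanup failure must not hide the original failure) can be sketched in plain Java. Step, invoke and runAndCapture are hypothetical names, and the sketch assumes that the original exception stays primary while the cleanup exception is attached to it as suppressed; it uses Throwable.addSuppressed directly rather than Flink's ExceptionUtils.

```java
class InvokeFlowSketch {

    /** A step that may fail with a checked exception. */
    interface Step {
        void run() throws Exception;
    }

    /** Runs the work, then the cleanup; the cleanup also runs when the work fails. */
    static void invoke(Step work, Step cleanUp) throws Exception {
        try {
            work.run();
        } catch (Throwable invokeException) {
            try {
                cleanUp.run();
            } catch (Throwable cleanUpException) {
                // Keep the original failure as primary and attach the cleanup failure to it.
                invokeException.addSuppressed(cleanUpException);
            }
            throw invokeException;
        }
        cleanUp.run();
    }

    /** Test helper: returns the failure thrown by invoke, or null if it succeeded. */
    static Throwable runAndCapture(Step work, Step cleanUp) {
        try {
            invoke(work, cleanUp);
            return null;
        } catch (Throwable t) {
            return t;
        }
    }
}
```

With this shape, a caller that inspects the thrown exception sees the root cause first and can still find the cleanup problem through getSuppressed().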

3.2.1. beforeInvoke

Builds the OperatorChain and runs the init method of the concrete task class ...

protected void beforeInvoke() throws Exception {
    disposedOperators = false;

    // Initializing Source: Socket Stream -> Flat Map (1/1)#0.
    // Initializing Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out (1/1)#0.
    LOG.debug("Initializing {}.", getName());

    operatorChain = new OperatorChain<>(this, recordWriter);

    //    mainOperator = {StreamSource@6752}
    //        ctx = null
    //        canceledOrStopped = false
    //        hasSentMaxWatermark = false
    //        userFunction = {SocketTextStreamFunction@6754}
    //        functionsClosed = false
    //        chainingStrategy = {ChainingStrategy@6755} "HEAD"
    //        container = {SourceStreamTask@6554} "Source: Socket Stream -> Flat Map (1/1)#0"
    //        config = {StreamConfig@6756} "\n=======================Stream Config=======================\nNumber of non-chained inputs: 0\nNumber of non-chained outputs: 0\nOutput names: []\nPartitioning:\nChained subtasks: [(Source: Socket Stream-1 -> Flat Map-2, typeNumber=0, outputPartitioner=FORWARD, bufferTimeout=-1, outputTag=null)]\nOperator: SimpleUdfStreamOperatorFactory\nState Monitoring: false\n\n\n---------------------\nChained task configs\n---------------------\n{2=\n=======================Stream Config=======================\nNumber of non-chained inputs: 0\nNumber of non-chained outputs: 1\nOutput names: [(Flat Map-2 -> Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction)-4, typeNumber=0, outputPartitioner=HASH, bufferTimeout=-1, outputTag=null)]\nPartitioning:\n\t4: HASH\nChained subtasks: []\nOperator: SimpleUdfStreamOperatorFactory\nState Monitoring: false}"
    //        output = {CountingOutput@6757}
    //        runtimeContext = {StreamingRuntimeContext@6758}
    //        stateKeySelector1 = null
    //        stateKeySelector2 = null
    //        stateHandler = null
    //        timeServiceManager = null
    //        metrics = {OperatorMetricGroup@6759}
    //        latencyStats = {LatencyStats@6760}
    //        processingTimeService = {ProcessingTimeServiceImpl@6761}
    //        combinedWatermark = -9223372036854775808
    //        input1Watermark = -9223372036854775808
    //        input2Watermark = -9223372036854775808
    mainOperator = operatorChain.getMainOperator();

    // task specific initialization
    init();

    // save the work of reloading state, etc, if the task is already canceled
    if (canceled) {
        throw new CancelTaskException();
    }

    // -------- Invoke --------
    // Invoking Source: Socket Stream -> Flat Map (1/1)#0
    LOG.debug("Invoking {}", getName());

    // we need to make sure that any triggers scheduled in open() cannot be
    // executed before all operators are opened
    actionExecutor.runThrowing(
            () -> {
                SequentialChannelStateReader reader =
                        getEnvironment().getTaskStateManager().getSequentialChannelStateReader();
                // TODO: for UC rescaling, reenable notifyAndBlockOnCompletion for non-iterative
                // jobs
                reader.readOutputData(getEnvironment().getAllWriters(), false);

                operatorChain.initializeStateAndOpenOperators(
                        createStreamTaskStateInitializer());

                channelIOExecutor.execute(
                        () -> {
                            try {
                                reader.readInputData(getEnvironment().getAllInputGates());
                            } catch (Exception e) {
                                asyncExceptionHandler.handleAsyncException(
                                        "Unable to read channel state", e);
                            }
                        });

                for (InputGate inputGate : getEnvironment().getAllInputGates()) {
                    inputGate
                            .getStateConsumedFuture()
                            .thenRun(
                                    () ->
                                            mainMailboxExecutor.execute(
                                                    inputGate::requestPartitions,
                                                    "Input gate request partitions"));
                }
            });

    isRunning = true;
}

3.2.2. runMailboxLoop

 // StreamTask#runMailboxLoop delegates to the MailboxProcessor.
 public void runMailboxLoop() throws Exception {
     mailboxProcessor.runMailboxLoop();
 }

 // MailboxProcessor#runMailboxLoop
 /** Runs the mailbox processing loop. This is where the main work is done. */
 public void runMailboxLoop() throws Exception {

     final TaskMailbox localMailbox = mailbox;

     Preconditions.checkState(
             localMailbox.isMailboxThread(),
             "Method must be executed by declared mailbox thread!");

     assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";

     final MailboxController defaultActionContext = new MailboxController(this);

     // Process any mail in the mailbox first. Mail is an action enqueued for the task
     // thread (for example a checkpoint trigger); processing records is the default action.
     while (isMailboxLoopRunning()) {
         // The blocking `processMail` call will not return until default action is available.
         processMail(localMailbox, false);
         if (isMailboxLoopRunning()) {
             // The default action is set in the StreamTask constructor: processInput.
             // lock is acquired inside default action as needed
             mailboxDefaultAction.runDefaultAction(defaultActionContext);
         }
     }
 }
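The shape of the loop (drain pending mail, then run the default action once, repeat until the default action reports completion) can be sketched in a few lines. This is a deliberately simplified, single-threaded model with hypothetical names; unlike the real MailboxProcessor it is not thread-safe and never blocks waiting for mail.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class MailboxLoopSketch {

    /** Lets the default action tell the loop that no more work remains. */
    interface Controller {
        void allActionsCompleted();
    }

    interface DefaultAction {
        void run(Controller controller);
    }

    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private final DefaultAction defaultAction;
    private boolean running = true;

    MailboxLoopSketch(DefaultAction defaultAction) {
        this.defaultAction = defaultAction;
    }

    void send(Runnable mail) {
        mailbox.add(mail);
    }

    void runMailboxLoop() {
        Controller controller = () -> running = false;
        while (running) {
            // Pending mail takes priority over the default action.
            Runnable mail;
            while (running && (mail = mailbox.poll()) != null) {
                mail.run();
            }
            if (running) {
                defaultAction.run(controller);
            }
        }
    }

    /** Processes three "records" and two mails and returns the order in which they ran. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        int[] processed = {0};
        MailboxLoopSketch[] holder = new MailboxLoopSketch[1];
        holder[0] = new MailboxLoopSketch(controller -> {
            processed[0]++;
            log.add("record-" + processed[0]);
            if (processed[0] == 1) {
                holder[0].send(() -> log.add("mail-B")); // mail that arrives mid-stream
            }
            if (processed[0] == 3) {
                controller.allActionsCompleted();
            }
        });
        holder[0].send(() -> log.add("mail-A"));
        holder[0].runMailboxLoop();
        return log;
    }
}
```

In the demo, the mail enqueued while the first record is processed runs before the second record, which is the interleaving that lets control actions such as checkpoints slot in between records.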

3.2.3. afterInvoke

   protected void afterInvoke() throws Exception {
       LOG.debug("Finished task {}", getName());
       getCompletionFuture().exceptionally(unused -> null).join();

       final CompletableFuture<Void> timersFinishedFuture = new CompletableFuture<>();

       // close all operators in a chain effect way
       operatorChain.closeOperators(actionExecutor);

       // make sure no further checkpoint and notification actions happen.
       // at the same time, this makes sure that during any "regular" exit where still
       actionExecutor.runThrowing(
               () -> {
                   // make sure no new timers can come
                   FutureUtils.forward(timerService.quiesce(), timersFinishedFuture);

                   // let mailbox execution reject all new letters from this point
                   mailboxProcessor.prepareClose();

                   // only set the StreamTask to not running after all operators have been closed!
                   // See FLINK-7430
                   isRunning = false;
               });

       // processes the remaining mails; no new mails can be enqueued
       mailboxProcessor.drain();

       // make sure all timers finish
       timersFinishedFuture.get();

       LOG.debug("Closed operators for task {}", getName());

       // make sure all buffered data is flushed
       operatorChain.flushOutputs();

       // make an attempt to dispose the operators such that failures in the dispose call
       // still let the computation fail
       disposeAllOperators();
   }

3.2.4. cleanUpInvoke

Releases the various resources ...

protected void cleanUpInvoke() throws Exception {
    getCompletionFuture().exceptionally(unused -> null).join();
    // clean up everything we initialized
    isRunning = false;

    // Now that we are outside the user code, we do not want to be interrupted further
    // upon cancellation. The shutdown logic below needs to make sure it does not issue calls
    // that block and stall shutdown.
    // Additionally, the cancellation watch dog will issue a hard-cancel (kill the TaskManager
    // process) as a backup in case some shutdown procedure blocks outside our control.
    setShouldInterruptOnCancel(false);

    // clear any previously issued interrupt for a more graceful shutdown
    Thread.interrupted();

    // stop all timers and threads
    Exception suppressedException =
            runAndSuppressThrowable(this::tryShutdownTimerService, null);

    // stop all asynchronous checkpoint threads
    suppressedException = runAndSuppressThrowable(cancelables::close, suppressedException);
    suppressedException =
            runAndSuppressThrowable(this::shutdownAsyncThreads, suppressedException);

    // we must! perform this cleanup
    suppressedException = runAndSuppressThrowable(this::cleanup, suppressedException);

    // if the operators were not disposed before, do a hard dispose
    suppressedException =
            runAndSuppressThrowable(this::disposeAllOperators, suppressedException);

    // release the output resources. this method should never fail.
    suppressedException =
            runAndSuppressThrowable(this::releaseOutputResources, suppressedException);

    suppressedException =
            runAndSuppressThrowable(channelIOExecutor::shutdown, suppressedException);

    suppressedException = runAndSuppressThrowable(mailboxProcessor::close, suppressedException);

    if (suppressedException != null) {
        throw suppressedException;
    }
}
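The cleanup pattern above, where every step runs even if an earlier one failed and all failures are reported together at the end, can be sketched as follows. ThrowingRunnable, runAndSuppress and cleanUp are hypothetical names; the sketch catches Exception only, whereas the method name in the original suggests that Flink's helper handles any Throwable.

```java
class SuppressingCleanupSketch {

    /** A cleanup step that may fail with a checked exception. */
    interface ThrowingRunnable {
        void run() throws Exception;
    }

    /** Runs one cleanup step; a failure is merged into the exception collected so far. */
    static Exception runAndSuppress(ThrowingRunnable step, Exception collected) {
        try {
            step.run();
        } catch (Exception e) {
            if (collected == null) {
                return e; // first failure becomes the primary exception
            }
            collected.addSuppressed(e); // later failures are attached to the first
        }
        return collected;
    }

    /** Runs every step even if earlier ones fail, then returns the combined failure or null. */
    static Exception cleanUp(ThrowingRunnable... steps) {
        Exception collected = null;
        for (ThrowingRunnable step : steps) {
            collected = runAndSuppress(step, collected);
        }
        return collected;
    }
}
```

The design choice here is to favor releasing as many resources as possible over failing fast: the caller throws only once, after the last step, and still sees every failure.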

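3.3. Checkpointing

Checkpoint-related methods

Name                            Description
triggerCheckpointAsync          Called asynchronously by the checkpoint coordinator to trigger a checkpoint.
triggerCheckpointOnBarrier      Called when a checkpoint is triggered by receiving checkpoint barriers on all input streams.
abortCheckpointOnBarrier        Aborts a checkpoint as the result of receiving some checkpoint barriers …
notifyCheckpointCompleteAsync   Notifies the task that a checkpoint has completed.
notifyCheckpointAbortAsync      Notifies the task that a checkpoint has been aborted.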

3.3.1. triggerCheckpointAsync

The checkpoint is triggered by a notification that arrives over Akka.

@Override
public Future<Boolean> triggerCheckpointAsync(
        CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {

    CompletableFuture<Boolean> result = new CompletableFuture<>();
    mainMailboxExecutor.execute(
            () -> {
                latestAsyncCheckpointStartDelayNanos =
                        1_000_000
                                * Math.max(
                                        0,
                                        System.currentTimeMillis()
                                                - checkpointMetaData.getTimestamp());
                try {
                    // Trigger the checkpoint.
                    result.complete(triggerCheckpoint(checkpointMetaData, checkpointOptions));
                } catch (Exception ex) {
                    // Report the failure both via the Future result but also to the mailbox
                    result.completeExceptionally(ex);
                    throw ex;
                }
            },
            "checkpoint %s with %s",
            checkpointMetaData,
            checkpointOptions);
    return result;
}
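The hand-off in triggerCheckpointAsync, where the caller gets a future immediately and the real work runs later on the task's own thread, can be sketched with a single-thread executor standing in for the mailbox executor. All names here are hypothetical, and unlike the original the sketch does not rethrow the failure to the executor after completing the future.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncTriggerSketch {

    /** Delay between the trigger timestamp and now, in nanoseconds, never negative. */
    static long startDelayNanos(long nowMillis, long triggerTimestampMillis) {
        return 1_000_000L * Math.max(0, nowMillis - triggerTimestampMillis);
    }

    /** Hands the checkpoint work to the task thread and reports the outcome via a future. */
    static CompletableFuture<Boolean> triggerAsync(
            ExecutorService taskThread, Callable<Boolean> checkpoint) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        taskThread.execute(() -> {
            try {
                result.complete(checkpoint.call());
            } catch (Exception ex) {
                result.completeExceptionally(ex);
            }
        });
        return result;
    }

    /** Triggers one always-successful checkpoint and waits for its result. */
    static boolean demo() {
        ExecutorService taskThread = Executors.newSingleThreadExecutor();
        try {
            return triggerAsync(taskThread, () -> true).join();
        } finally {
            taskThread.shutdown();
        }
    }
}
```

Clamping the delay at zero guards against clock differences between the machine that stamped the checkpoint and the machine that runs the task.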

triggerCheckpoint
triggerCheckpoint calls performCheckpoint to start the checkpoint.

private boolean triggerCheckpoint(
        CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions)
        throws Exception {
    try {
        // No alignment if we inject a checkpoint
        CheckpointMetricsBuilder checkpointMetrics =
                new CheckpointMetricsBuilder()
                        .setAlignmentDurationNanos(0L)
                        .setBytesProcessedDuringAlignment(0L);

        // Initialize the checkpoint.
        subtaskCheckpointCoordinator.initCheckpoint(
                checkpointMetaData.getCheckpointId(), checkpointOptions);

        // Perform the checkpoint ...
        boolean success =
                performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
        if (!success) {
            declineCheckpoint(checkpointMetaData.getCheckpointId());
        }
        return success;
    } catch (Exception e) {
        // propagate exceptions only if the task is still in "running" state
        if (isRunning) {
            throw new Exception(
                    "Could not perform checkpoint "
                            + checkpointMetaData.getCheckpointId()
                            + " for operator "
                            + getName()
                            + '.',
                    e);
        } else {
            LOG.debug(
                    "Could not perform checkpoint {} for operator {} while the "
                            + "invokable was not in state running.",
                    checkpointMetaData.getCheckpointId(),
                    getName(),
                    e);
            return false;
        }
    }
}

private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetricsBuilder checkpointMetrics)
        throws Exception {

    LOG.debug(
            "Starting checkpoint ({}) {} on task {}",
            checkpointMetaData.getCheckpointId(),
            checkpointOptions.getCheckpointType(),
            getName());

    if (isRunning) {
        actionExecutor.runThrowing(
                () -> {
                    if (checkpointOptions.getCheckpointType().isSynchronous()) {
                        setSynchronousSavepointId(
                                checkpointMetaData.getCheckpointId(),
                                checkpointOptions.getCheckpointType().shouldIgnoreEndOfInput());

                        if (checkpointOptions.getCheckpointType().shouldAdvanceToEndOfTime()) {
                            advanceToEndOfEventTime();
                        }
                    } else if (activeSyncSavepointId != null
                            && activeSyncSavepointId < checkpointMetaData.getCheckpointId()) {
                        activeSyncSavepointId = null;
                        operatorChain.setIgnoreEndOfInput(false);
                    }

                    // Hand over to subtaskCheckpointCoordinator for checkpointState ...
                    subtaskCheckpointCoordinator.checkpointState(
                            checkpointMetaData,
                            checkpointOptions,
                            checkpointMetrics,
                            operatorChain,
                            this::isRunning);
                });

        return true;
    } else {
        actionExecutor.runThrowing(
                () -> {
                    // we cannot perform our checkpoint - let the downstream operators know that
                    // they should not wait for any input from this operator

                    // we cannot broadcast the cancellation markers on the 'operator chain',
                    // because it may not yet be created
                    final CancelCheckpointMarker message =
                            new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
                    recordWriter.broadcastEvent(message);
                });

        return false;
    }
}

3.3.2. triggerCheckpointOnBarrier

@Override
public void triggerCheckpointOnBarrier(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetricsBuilder checkpointMetrics)
        throws IOException {

    try {
        // Perform the checkpoint.
        if (performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics)) {
            if (isSynchronousSavepointId(checkpointMetaData.getCheckpointId())) {
                runSynchronousSavepointMailboxLoop();
            }
        }
    } catch (CancelTaskException e) {
        LOG.info(
                "Operator {} was cancelled while performing checkpoint {}.",
                getName(),
                checkpointMetaData.getCheckpointId());
        throw e;
    } catch (Exception e) {
        throw new IOException(
                "Could not perform checkpoint "
                        + checkpointMetaData.getCheckpointId()
                        + " for operator "
                        + getName()
                        + '.',
                e);
    }
}

3.3.3. abortCheckpointOnBarrier

Aborts a checkpoint that was triggered on a barrier.

  @Override
  public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws IOException {
      resetSynchronousSavepointId(checkpointId, false);
      subtaskCheckpointCoordinator.abortCheckpointOnBarrier(checkpointId, cause, operatorChain);
  }

3.3.4. notifyCheckpointCompleteAsync

@Override
public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
    return notifyCheckpointOperation(
            () -> notifyCheckpointComplete(checkpointId),
            String.format("checkpoint %d complete", checkpointId));
}

3.3.5. notifyCheckpointAbortAsync

  @Override
  public Future<Void> notifyCheckpointAbortAsync(long checkpointId) {
      return notifyCheckpointOperation(
              () -> {
                  resetSynchronousSavepointId(checkpointId, false);
                  subtaskCheckpointCoordinator.notifyCheckpointAborted(
                          checkpointId, operatorChain, this::isRunning);
              },
              String.format("checkpoint %d aborted", checkpointId));
  }

3.4. dispatchOperatorEvent

@Override
public void dispatchOperatorEvent(OperatorID operator, SerializedValue<OperatorEvent> event)
        throws FlinkException {
    try {
        mainMailboxExecutor.execute(
                () -> operatorChain.dispatchOperatorEvent(operator, event),
                "dispatch operator event");
    } catch (RejectedExecutionException e) {
        // this happens during shutdown, we can swallow this
    }
}
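Swallowing RejectedExecutionException is a common way to tolerate events that arrive while the executor is shutting down. A minimal sketch with a standard ExecutorService standing in for the mailbox executor follows; dispatch and demo are hypothetical names, and the boolean return value exists only to make the outcome visible (the original method returns nothing).

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

class DispatchDuringShutdownSketch {

    /** Returns true if the event was accepted, false if the executor had already shut down. */
    static boolean dispatch(ExecutorService mailboxExecutor, Runnable event) {
        try {
            mailboxExecutor.execute(event);
            return true;
        } catch (RejectedExecutionException e) {
            // Expected during shutdown; the event is dropped on purpose.
            return false;
        }
    }

    /** Dispatches once before and once after shutdown and returns both outcomes. */
    static boolean[] demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        boolean beforeShutdown = dispatch(executor, () -> {});
        executor.shutdown();
        boolean afterShutdown = dispatch(executor, () -> {});
        return new boolean[] {beforeShutdown, afterShutdown};
    }
}
```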

3.5. injectChannelStateWriterIntoChannels

Injects the ChannelStateWriter obtained from the subtaskCheckpointCoordinator into every input gate and into every result partition writer that implements ChannelStateHolder, so that the input side and the output side share the same writer.

 private void injectChannelStateWriterIntoChannels() {
     final Environment env = getEnvironment();
     final ChannelStateWriter channelStateWriter =
             subtaskCheckpointCoordinator.getChannelStateWriter();
     for (final InputGate gate : env.getAllInputGates()) {
         gate.setChannelStateWriter(channelStateWriter);
     }
     for (ResultPartitionWriter writer : env.getAllWriters()) {
         if (writer instanceof ChannelStateHolder) {
             ((ChannelStateHolder) writer).setChannelStateWriter(channelStateWriter);
         }
     }
 }

4. Processing data

The runMailboxLoop call in the invoke method delegates to mailboxProcessor.runMailboxLoop(),
which takes the default mailboxDefaultAction and calls runDefaultAction on it.

 /** Runs the mailbox processing loop. This is where the main work is done. */
 public void runMailboxLoop() throws Exception {

     final TaskMailbox localMailbox = mailbox;

     Preconditions.checkState(
             localMailbox.isMailboxThread(),
             "Method must be executed by declared mailbox thread!");

     assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";

     final MailboxController defaultActionContext = new MailboxController(this);

     // Process any mail in the mailbox first. Mail is an action enqueued for the task
     // thread (for example a checkpoint trigger); processing records is the default action.
     while (isMailboxLoopRunning()) {
         // The blocking `processMail` call will not return until default action is available.
         processMail(localMailbox, false);
         if (isMailboxLoopRunning()) {
             // The default action is set in the StreamTask constructor: processInput.
             // lock is acquired inside default action as needed
             mailboxDefaultAction.runDefaultAction(defaultActionContext);
         }
     }
 }

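This raises a question: what does runDefaultAction actually run?
When the StreamTask constructor builds the MailboxProcessor, it passes in the default action:

// todo [key point]
this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);

So the default action is processInput.

The processInput method implements the default action of the task (e.g. processing one event from the input). Implementations should (in general) be non-blocking.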

   /**
    * This method implements the default action of the task (e.g. processing one event from the
    * input). Implementations should (in general) be non-blocking.
    *
    * @param controller controller object for collaborative interaction between the action and
    *     the stream task.
    * @throws Exception on any problems in the action.
    */
   protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
       // Ask the input processor for the next step. There are three kinds:
       //   StreamOneInputProcessor
       //   StreamTwoInputProcessor
       //   StreamMultipleInputProcessor
       InputStatus status = inputProcessor.processInput();
       if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) {
           return;
       }
       if (status == InputStatus.END_OF_INPUT) {
           controller.allActionsCompleted();
           return;
       }
       CompletableFuture<?> jointFuture = getInputOutputJointFuture(status);
       MailboxDefaultAction.Suspension suspendedDefaultAction = controller.suspendDefaultAction();
       assertNoException(jointFuture.thenRun(suspendedDefaultAction::resume));
   }

org.apache.flink.streaming.runtime.io.StreamOneInputProcessor#processInput

   @Override
   public InputStatus processInput() throws Exception {
       // StreamTaskInput#emitNext: the input sends data directly to the output.
       // For network input this is
       // org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput#emitNext
       InputStatus status = input.emitNext(output);

       if (status == InputStatus.END_OF_INPUT) {
           endOfInputAware.endInput(input.getInputIndex() + 1);
       }

       return status;
   }

org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput#emitNext

   @Override
   public InputStatus emitNext(DataOutput<T> output) throws Exception {

       while (true) {
           // get the stream element from the deserializer
           if (currentRecordDeserializer != null) {
               DeserializationResult result =
                       currentRecordDeserializer.getNextRecord(deserializationDelegate);
               if (result.isBufferConsumed()) {
                   currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                   currentRecordDeserializer = null;
               }

               if (result.isFullRecord()) {
                   // todo processElement
                   processElement(deserializationDelegate.getInstance(), output);
                   return InputStatus.MORE_AVAILABLE;
               }
           }

           Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
           if (bufferOrEvent.isPresent()) {
               // return to the mailbox after receiving a checkpoint barrier to avoid processing of
               // data after the barrier before checkpoint is performed for unaligned checkpoint mode
               if (bufferOrEvent.get().isBuffer()) {
                   processBuffer(bufferOrEvent.get());
               } else {
                   processEvent(bufferOrEvent.get());
                   return InputStatus.MORE_AVAILABLE;
               }
           } else {
               if (checkpointedInputGate.isFinished()) {
                   checkState(
                           checkpointedInputGate.getAvailableFuture().isDone(),
                           "Finished BarrierHandler should be available");
                   return InputStatus.END_OF_INPUT;
               }
               return InputStatus.NOTHING_AVAILABLE;
           }
       }
   }

org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput#processElement

   // Dispatches one stream element according to its kind.
   private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
       if (recordOrMark.isRecord()) {
           // [Key point] A data record:
           // OneInputStreamTask $ StreamTaskNetworkOutput#emitRecord
           output.emitRecord(recordOrMark.asRecord());
       } else if (recordOrMark.isWatermark()) {
           // A watermark ...
           statusWatermarkValve.inputWatermark(
                   recordOrMark.asWatermark(), flattenedChannelIndices.get(lastChannel), output);
       } else if (recordOrMark.isLatencyMarker()) {
           // A latency marker (used for latency tracking; this is not late data)
           output.emitLatencyMarker(recordOrMark.asLatencyMarker());
       } else if (recordOrMark.isStreamStatus()) {
           // A StreamStatus
           statusWatermarkValve.inputStreamStatus(
                   recordOrMark.asStreamStatus(),
                   flattenedChannelIndices.get(lastChannel),
                   output);
       } else {
           throw new UnsupportedOperationException("Unknown type of StreamElement");
       }
   }
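The three input statuses drive what the task does next: keep going, wait for more data, or finish. The following sketch models that contract with a hypothetical in-memory QueueInput. Note one deliberate simplification: the drain helper loops until the input runs dry, whereas processInput above handles a single step per call and returns to the mailbox loop in between.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

class InputStatusSketch {

    enum InputStatus {
        MORE_AVAILABLE,
        NOTHING_AVAILABLE,
        END_OF_INPUT
    }

    /** Hypothetical input: emits buffered records one at a time to the output list. */
    static class QueueInput {
        private final Queue<String> buffered = new ArrayDeque<>();
        private boolean finished;

        void add(String record) {
            buffered.add(record);
        }

        void finish() {
            finished = true;
        }

        InputStatus emitNext(List<String> output) {
            String record = buffered.poll();
            if (record != null) {
                output.add(record);
                return InputStatus.MORE_AVAILABLE;
            }
            return finished ? InputStatus.END_OF_INPUT : InputStatus.NOTHING_AVAILABLE;
        }
    }

    /** Calls emitNext until the input is temporarily empty or permanently finished. */
    static InputStatus drain(QueueInput input, List<String> output) {
        InputStatus status;
        do {
            status = input.emitNext(output);
        } while (status == InputStatus.MORE_AVAILABLE);
        return status;
    }
}
```

A caller would typically suspend on NOTHING_AVAILABLE and resume when new data arrives, and signal completion on END_OF_INPUT, which mirrors the two branches at the end of processInput.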

For a map operator, the emitRecord that gets called is the one in OneInputStreamTask.java:

@Override
public void emitRecord(StreamRecord<IN> record) throws Exception {
    numRecordsIn.inc();
    operator.setKeyContextElement1(record);
    // The transformation step.
    // For operators such as map, processElement is the one in StreamMap.java.
    operator.processElement(record);
}

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    // userFunction.map(element.getValue()) is the map method of the user-defined MapFunction.
    // element.getValue() is transformed by whatever the user's map method does ...
    output.collect(element.replace(userFunction.map(element.getValue())));
}
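The call chain from emitRecord through processElement to the user's map function can be condensed into a small model. MapOperator and TaskOutput are hypothetical stand-ins built on java.util.function types; they only mirror the order of calls shown above and leave out keys, timestamps and metrics groups.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class MapOperatorSketch {

    /** Hypothetical one-input operator: applies the user function and forwards the result. */
    static class MapOperator<IN, OUT> {
        private final Function<IN, OUT> userFunction;
        private final Consumer<OUT> output;

        MapOperator(Function<IN, OUT> userFunction, Consumer<OUT> output) {
            this.userFunction = userFunction;
            this.output = output;
        }

        void processElement(IN element) {
            output.accept(userFunction.apply(element));
        }
    }

    /** Hypothetical task output: counts incoming records, then hands them to the operator. */
    static class TaskOutput<IN> {
        private final MapOperator<IN, ?> operator;
        long numRecordsIn;

        TaskOutput(MapOperator<IN, ?> operator) {
            this.operator = operator;
        }

        void emitRecord(IN record) {
            numRecordsIn++;
            operator.processElement(record);
        }
    }

    /** Pushes two strings through a length-computing map and returns what was collected. */
    static List<Integer> demo() {
        List<Integer> collected = new ArrayList<>();
        MapOperator<String, Integer> lengthOf = new MapOperator<>(String::length, collected::add);
        TaskOutput<String> output = new TaskOutput<>(lengthOf);
        output.emitRecord("flink");
        output.emitRecord("task");
        return collected;
    }
}
```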
