1. Overview

Reposted from: Flink Source Code: OperatorChain

Preface

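OperatorChain is an important optimization in Flink. It chains as many eligible data-processing operations as possible into a single task, which runs in one thread inside a slot. This minimizes thread context switches and network communication and improves the performance of the stream processing system.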

The logic that decides which operations may join the same chain lives in the JobGraph generation process. For details, see Flink Source Code: JobGraph Generation.

Terminology

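  • StreamEdge: an element of the StreamGraph topology. A StreamGraph is a DAG made up of StreamNodes and StreamEdges. For details, see Flink Source Code: StreamGraph Generation.
  • RecordWriterOutput: an operator Output type that writes data to a ResultPartition through a RecordWriter.
  • ChainingOutput: like RecordWriterOutput, this is an operator Output type, but it is used only inside an OperatorChain. It acts as a bridge that hands the data processed by the upstream operator to the downstream operator. A later section analyzes it in detail.
  • TypeSerializer: reads bytes from a DataInputView and deserializes them into type T, or serializes a T into a DataOutputView. DataInputView and DataOutputView operate directly on byte arrays, whose storage is backed by MemorySegments.
  • StreamOperatorWrapper: wraps a StreamOperator and is used only by OperatorChain. It holds two pointers, one to the previous operator and one to the next, which form a doubly linked list. This is where the "chain" in OperatorChain comes from.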

Next, we start the analysis from the OperatorChain constructor.

Constructor

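StreamTask builds the OperatorChain in its beforeInvoke method (see Flink Source Code: StreamTask). The entry point of the chained operators is the OperatorChain's mainOperator; how the chained operators are assembled is analyzed below. When data arrives, it is handed to mainOperator for processing.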

The OperatorChain constructor is shown below, with annotations:

public OperatorChain(
        StreamTask<OUT, OP> containingTask,
        RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate) {

    // Create the dispatcher that sends and receives OperatorEvents
    this.operatorEventDispatcher =
            new OperatorEventDispatcherImpl(
                    containingTask.getEnvironment().getUserCodeClassLoader().asClassLoader(),
                    containingTask.getEnvironment().getOperatorCoordinatorEventGateway());

    // Get the user code class loader
    final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
    // Get the task configuration
    final StreamConfig configuration = containingTask.getConfiguration();

    // Get the StreamOperator factory of the StreamTask
    StreamOperatorFactory<OUT> operatorFactory =
            configuration.getStreamOperatorFactory(userCodeClassloader);

    // we read the chained configs, and the order of record writer registrations by output name
    // Get the StreamConfig of every StreamOperator in the chain; the map key is the vertex ID
    Map<Integer, StreamConfig> chainedConfigs =
            configuration.getTransitiveChainedTaskConfigsWithSelf(userCodeClassloader);

    // create the final output stream writers
    // we iterate through all the out edges from this job vertex and create a stream output
    // Get the out StreamEdges in data-flow order
    List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(userCodeClassloader);
    Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap =
            new HashMap<>(outEdgesInOrder.size());
    this.streamOutputs = new RecordWriterOutput<?>[outEdgesInOrder.size()];

    // from here on, we need to make sure that the output writers are shut down again on failure
    boolean success = false;
    try {
        // Create the chain's network outputs and fill streamOutputMap,
        // which maps each StreamEdge to its output
        createChainOutputs(
                outEdgesInOrder,
                recordWriterDelegate,
                chainedConfigs,
                containingTask,
                streamOutputMap);

        // we create the chain of operators and grab the collector that leads into the chain
        // Collection that holds every operator wrapper
        List<StreamOperatorWrapper<?, ?>> allOpWrappers =
                new ArrayList<>(chainedConfigs.size());
        // Create the output of mainOperator, the entry operator of the OperatorChain.
        // Through ChainingOutputs, this output links all operators of the chain
        // in data-flow order
        this.mainOperatorOutput =
                createOutputCollector(
                        containingTask,
                        configuration,
                        chainedConfigs,
                        userCodeClassloader,
                        streamOutputMap,
                        allOpWrappers,
                        containingTask.getMailboxExecutorFactory());

        if (operatorFactory != null) {
            // Create mainOperator and its processing time service
            Tuple2<OP, Optional<ProcessingTimeService>> mainOperatorAndTimeService =
                    StreamOperatorFactoryUtil.createOperator(
                            operatorFactory,
                            containingTask,
                            configuration,
                            mainOperatorOutput,
                            operatorEventDispatcher);

            OP mainOperator = mainOperatorAndTimeService.f0;
            // Register the output watermark gauge
            mainOperator
                    .getMetricGroup()
                    .gauge(
                            MetricNames.IO_CURRENT_OUTPUT_WATERMARK,
                            mainOperatorOutput.getWatermarkGauge());
            // Create mainOperatorWrapper
            this.mainOperatorWrapper =
                    createOperatorWrapper(
                            mainOperator,
                            containingTask,
                            configuration,
                            mainOperatorAndTimeService.f1,
                            true);

            // add main operator to end of chain
            allOpWrappers.add(mainOperatorWrapper);

            // createOutputCollector wraps each operator in an operator wrapper and adds
            // the wrappers to allOpWrappers in reverse data-flow order,
            // so the tail wrapper is the element at index 0
            this.tailOperatorWrapper = allOpWrappers.get(0);
        } else {
            // The operator factory is null, so no operator wrappers may exist
            checkState(allOpWrappers.size() == 0);
            this.mainOperatorWrapper = null;
            this.tailOperatorWrapper = null;
        }

        // Create the chained sources
        this.chainedSources =
                createChainedSources(
                        containingTask,
                        configuration.getInputs(userCodeClassloader),
                        chainedConfigs,
                        userCodeClassloader,
                        allOpWrappers);

        this.numOperators = allOpWrappers.size();

        // Link all StreamOperatorWrappers into a doubly linked list, from upstream to downstream
        firstOperatorWrapper = linkOperatorWrappers(allOpWrappers);

        success = true;
    } finally {
        // make sure we clean up after ourselves in case of a failure after acquiring
        // the first resources
        if (!success) {
            for (RecordWriterOutput<?> output : this.streamOutputs) {
                if (output != null) {
                    output.close();
                }
            }
        }
    }
}
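The constructor guards resource acquisition with a `success` flag. If anything after the outputs are created throws, the `finally` block closes every output that already exists. The following is a minimal sketch of the same pattern in plain Java. `FakeOutput`, `CleanupOnFailure` and `buildOutputs` are hypothetical stand-ins, not Flink classes.

```java
import java.util.List;

// Hypothetical stand-in for RecordWriterOutput: it only remembers whether it was closed.
class FakeOutput implements AutoCloseable {
    boolean closed = false;

    @Override
    public void close() {
        closed = true;
    }
}

class CleanupOnFailure {
    // Creates `count` outputs and fails when it reaches index `failAt` (if failAt < count).
    // On failure, every output created so far is closed, mirroring the OperatorChain constructor.
    static FakeOutput[] buildOutputs(int count, int failAt, List<FakeOutput> created) {
        FakeOutput[] outputs = new FakeOutput[count];
        boolean success = false;
        try {
            for (int i = 0; i < count; i++) {
                if (i == failAt) {
                    throw new IllegalStateException("failure while creating output " + i);
                }
                outputs[i] = new FakeOutput();
                created.add(outputs[i]);
            }
            success = true;
            return outputs;
        } finally {
            if (!success) {
                for (FakeOutput output : outputs) {
                    if (output != null) { // slots after the failure point are still null
                        output.close();
                    }
                }
            }
        }
    }
}
```

The flag-plus-`finally` form is used instead of try-with-resources because, on success, the outputs must stay open and be handed over to the chain.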

createChainOutputs

createChainOutputs creates a stream output for each StreamEdge and records which output belongs to which edge. The code is as follows:

private void createChainOutputs(
        List<StreamEdge> outEdgesInOrder,
        RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate,
        Map<Integer, StreamConfig> chainedConfigs,
        StreamTask<OUT, OP> containingTask,
        Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap) {
    // Iterate over the ordered StreamEdges
    for (int i = 0; i < outEdgesInOrder.size(); i++) {
        StreamEdge outEdge = outEdgesInOrder.get(i);

        // Create the stream output for this edge
        RecordWriterOutput<?> streamOutput =
                createStreamOutput(
                        recordWriterDelegate.getRecordWriter(i),
                        outEdge,
                        chainedConfigs.get(outEdge.getSourceId()),
                        containingTask.getEnvironment());

        // Store it in the streamOutputs array
        this.streamOutputs[i] = streamOutput;
        // Remember which output belongs to which StreamEdge
        streamOutputMap.put(outEdge, streamOutput);
    }
}
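createChainOutputs relies on position. The i-th out edge is paired with the i-th RecordWriter from `recordWriterDelegate`. The resulting output is stored both in the `streamOutputs` array (by index) and in `streamOutputMap` (by edge). The sketch below models this with plain strings. `OutputIndexing` and the edge and writer names are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OutputIndexing {
    // Pairs the i-th edge with the i-th writer and fills both an array (by position)
    // and a map (by edge), like streamOutputs and streamOutputMap in createChainOutputs.
    static Map<String, String> pair(List<String> edgesInOrder, List<String> writers, String[] outputs) {
        if (edgesInOrder.size() != writers.size() || outputs.length != edgesInOrder.size()) {
            throw new IllegalArgumentException("edges, writers and outputs must have the same size");
        }
        Map<String, String> byEdge = new HashMap<>();
        for (int i = 0; i < edgesInOrder.size(); i++) {
            String output = "output(" + edgesInOrder.get(i) + " -> " + writers.get(i) + ")";
            outputs[i] = output;
            byEdge.put(edgesInOrder.get(i), output);
        }
        return byEdge;
    }
}
```

Because the pairing is purely positional, the order of the out edges and the order in which the record writers were registered must agree.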

Next, let's look at the createStreamOutput method:

private RecordWriterOutput<OUT> createStreamOutput(
        RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
        StreamEdge edge,
        StreamConfig upStreamConfig,
        Environment taskEnvironment) {
    // Get the OutputTag; it is null unless the edge is a side output
    OutputTag sideOutputTag = edge.getOutputTag(); // OutputTag, return null if not sideOutput

    TypeSerializer outSerializer = null;

    // Pick the type serializer depending on whether this is a side output
    if (edge.getOutputTag() != null) {
        // side output
        outSerializer =
                upStreamConfig.getTypeSerializerSideOut(
                        edge.getOutputTag(),
                        taskEnvironment.getUserCodeClassLoader().asClassLoader());
    } else {
        // main output
        outSerializer =
                upStreamConfig.getTypeSerializerOut(
                        taskEnvironment.getUserCodeClassLoader().asClassLoader());
    }

    // Return the newly created RecordWriterOutput
    return new RecordWriterOutput<>(recordWriter, outSerializer, sideOutputTag, this);
}
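The serializer choice in createStreamOutput comes down to one rule. A null output tag means the edge carries the main output; otherwise the serializer registered for that side output is used. Below is a minimal sketch of that lookup. `UpstreamConfig` is hypothetical, and it uses serializer names (strings) in place of Flink's TypeSerializer and OutputTag.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified upstream config: one serializer for the main output
// and one per side-output tag.
class UpstreamConfig {
    final String mainSerializer;
    final Map<String, String> sideSerializers;

    UpstreamConfig(String mainSerializer, Map<String, String> sideSerializers) {
        this.mainSerializer = mainSerializer;
        this.sideSerializers = sideSerializers;
    }

    // A null tag means the edge carries the main output.
    String serializerFor(String outputTag) {
        if (outputTag != null) {
            return Objects.requireNonNull(
                    sideSerializers.get(outputTag), "no serializer for side output " + outputTag);
        }
        return mainSerializer;
    }
}
```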

createOutputCollector

This method contains the core chaining logic, so we analyze it in detail. createOutputCollector is shown below:

private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(
        StreamTask<?, ?> containingTask,
        StreamConfig operatorConfig,
        Map<Integer, StreamConfig> chainedConfigs,
        ClassLoader userCodeClassloader,
        Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
        List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
        MailboxExecutorFactory mailboxExecutorFactory) {
    List<Tuple2<WatermarkGaugeExposingOutput<StreamRecord<T>>, StreamEdge>> allOutputs =
            new ArrayList<>(4);

    // create collectors for the network outputs
    // Non-chained StreamEdges go over the network,
    // so their outputs are RecordWriterOutputs
    for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
        // Look up the edge's output in the streamOutputs map built by createChainOutputs
        @SuppressWarnings("unchecked")
        RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);

        // Add it to allOutputs
        allOutputs.add(new Tuple2<>(output, outputEdge));
    }

    // Create collectors for the chained outputs
    // Get all chained StreamEdges of this operator;
    // an operator with several chained downstream operators has several out edges here
    for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
        int outputId = outputEdge.getTargetId();
        // Get the StreamConfig of the edge's target operator
        StreamConfig chainedOpConfig = chainedConfigs.get(outputId);

        // Build the output for this StreamEdge as a WatermarkGaugeExposingOutput,
        // i.e. an Output plus a gauge that tracks the watermark.
        // Chainable downstream operators are handled by a recursive call,
        // which links each downstream operator to its upstream one
        WatermarkGaugeExposingOutput<StreamRecord<T>> output =
                createOperatorChain(
                        containingTask,
                        chainedOpConfig,
                        chainedConfigs,
                        userCodeClassloader,
                        streamOutputs,
                        allOperatorWrappers,
                        outputEdge.getOutputTag(),
                        mailboxExecutorFactory);
        // Add it to allOutputs
        allOutputs.add(new Tuple2<>(output, outputEdge));
    }

    // With exactly one output, return it directly
    if (allOutputs.size() == 1) {
        return allOutputs.get(0).f0;
    } else {
        // send to N outputs. Note that this includes the special case
        // of sending to zero outputs
        // With several outputs, collect them into an Output array
        @SuppressWarnings({"unchecked"})
        Output<StreamRecord<T>>[] asArray = new Output[allOutputs.size()];
        for (int i = 0; i < allOutputs.size(); i++) {
            asArray[i] = allOutputs.get(i).f0;
        }

        // This is the inverse of creating the normal ChainingOutput.
        // If the chaining output does not copy we need to copy in the broadcast output,
        // otherwise multi-chaining would not work correctly.
        // Choose the collector type depending on whether object reuse is enabled
        if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
            // Outputs receive a shallow copy of the StreamRecord (a new wrapper around
            // the same value), which avoids a costly deep copy. With object reuse enabled,
            // downstream operators must not modify the element's value, otherwise the
            // change becomes visible to the other outputs
            return new CopyingBroadcastingOutputCollector<>(asArray, this);
        } else {
            return new BroadcastingOutputCollector<>(asArray, this);
        }
    }
}
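When an operator has several outputs, createOutputCollector wraps them in a broadcasting collector. With object reuse enabled, the chained outputs do not copy, so the broadcasting side has to do it. As far as we can tell from the Flink source, CopyingBroadcastingOutputCollector hands a shallow copy (new StreamRecord, same value) to every output except the last, which receives the original record. The sketch below models that behaviour. `Rec` (a stand-in for StreamRecord) and `CopyingBroadcast` are hypothetical names.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for StreamRecord: a wrapper around a value plus a timestamp.
class Rec<T> {
    final T value;
    final long timestamp;

    Rec(T value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    // Shallow copy: a new wrapper that keeps the timestamp and holds the given value.
    Rec<T> copy(T newValue) {
        return new Rec<>(newValue, timestamp);
    }
}

// Sends one record to N outputs. All outputs but the last receive their own wrapper;
// the value object itself is still shared by everyone.
class CopyingBroadcast<T> implements Consumer<Rec<T>> {
    private final List<Consumer<Rec<T>>> outputs;

    CopyingBroadcast(List<Consumer<Rec<T>>> outputs) {
        this.outputs = outputs;
    }

    @Override
    public void accept(Rec<T> record) {
        for (int i = 0; i < outputs.size() - 1; i++) {
            outputs.get(i).accept(record.copy(record.value));
        }
        if (!outputs.isEmpty()) {
            outputs.get(outputs.size() - 1).accept(record); // last output gets the original
        }
    }
}
```

Since only the wrapper is copied, all outputs still see the same value object. That is why downstream operators must leave the value untouched when object reuse is on.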

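Next we need to analyze createOperatorChain. It wraps every operator of the OperatorChain in a StreamOperatorWrapper and stores the wrappers in allOperatorWrappers in reverse data-flow order. Following the operator order, it creates a ChainingOutput for each operator, which links the operators' data flow together. The method is as follows: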

private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createOperatorChain(
        StreamTask<OUT, ?> containingTask,
        StreamConfig operatorConfig,
        Map<Integer, StreamConfig> chainedConfigs,
        ClassLoader userCodeClassloader,
        Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
        List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
        OutputTag<IN> outputTag,
        MailboxExecutorFactory mailboxExecutorFactory) {
    // create the output that the operator writes to first. this may recursively create more
    // operators
    // operatorConfig is the chainedOpConfig from the loop in the previous method.
    // This is a recursive call: createOutputCollector is invoked again with the
    // StreamConfig of the downstream out edge, so each upstream operator's output
    // points at its downstream operator, which is what implements the chain.
    // The most downstream output returns first, and the operators' outputs are
    // wrapped as WatermarkGaugeExposingOutput from downstream to upstream
    WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainedOperatorOutput =
            createOutputCollector(
                    containingTask,
                    operatorConfig,
                    chainedConfigs,
                    userCodeClassloader,
                    streamOutputs,
                    allOperatorWrappers,
                    mailboxExecutorFactory);

    // Create the chained operator, using the output produced in the previous step
    OneInputStreamOperator<IN, OUT> chainedOperator =
            createOperator(
                    containingTask,
                    operatorConfig,
                    userCodeClassloader,
                    chainedOperatorOutput,
                    allOperatorWrappers,
                    false);

    // Wrap the operator into an output and return it (analyzed below)
    return wrapOperatorIntoOutput(
            chainedOperator, containingTask, operatorConfig, userCodeClassloader, outputTag);
}
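The mutual recursion between createOutputCollector and createOperatorChain can be modeled without Flink. An operator's output must exist before the operator itself is created, so the most downstream operator is built (and registered) first. In the sketch below, `Op`, `ChainBuilder.buildChain` and the operator names are hypothetical. Each `Op` appends its name to the record and forwards it to its output, which plays the role of a ChainingOutput.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical chained operator: tags the record with its name, then forwards it.
class Op implements Consumer<String> {
    final String name;
    final Consumer<String> output; // plays the role of the ChainingOutput to the next operator

    Op(String name, Consumer<String> output) {
        this.name = name;
        this.output = output;
    }

    @Override
    public void accept(String record) {
        output.accept(record + ">" + name);
    }
}

class ChainBuilder {
    // Builds operators names[index..] recursively. The downstream output is created
    // before the operator that writes to it, so `created` ends up in reverse data-flow order.
    static Consumer<String> buildChain(
            List<String> names, int index, Consumer<String> networkOutput, List<Op> created) {
        if (index == names.size()) {
            return networkOutput; // nothing left to chain: write to the network stand-in
        }
        Consumer<String> downstream = buildChain(names, index + 1, networkOutput, created);
        Op op = new Op(names.get(index), downstream);
        created.add(op);
        return op;
    }
}
```

For example, building the chain `map -> filter -> flatMap` registers `flatMap` first and `map` last. A record pushed into the returned head passes through all three operators in data-flow order.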

createOperator creates the StreamOperator from operatorConfig and then wraps it in a StreamOperatorWrapper:

private <OUT, OP extends StreamOperator<OUT>> OP createOperator(
        StreamTask<OUT, ?> containingTask,
        StreamConfig operatorConfig,
        ClassLoader userCodeClassloader,
        WatermarkGaugeExposingOutput<StreamRecord<OUT>> output,
        List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
        boolean isHead) {

    // now create the operator and give it the output collector to write its output to
    // Create the StreamOperator through its StreamOperatorFactory, using the given output
    // (this method is fairly involved and is not covered here)
    Tuple2<OP, Optional<ProcessingTimeService>> chainedOperatorAndTimeService =
            StreamOperatorFactoryUtil.createOperator(
                    operatorConfig.getStreamOperatorFactory(userCodeClassloader),
                    containingTask,
                    operatorConfig,
                    output,
                    operatorEventDispatcher);

    // The newly created operator
    OP chainedOperator = chainedOperatorAndTimeService.f0;
    // Wrap it in a StreamOperatorWrapper, the wrapper type used to run operators
    // in a chain (analyzed below).
    // Because of the recursion, the most downstream operator reaches this point first,
    // so allOperatorWrappers ends up in reverse data-flow order
    allOperatorWrappers.add(
            createOperatorWrapper(
                    chainedOperator,
                    containingTask,
                    operatorConfig,
                    chainedOperatorAndTimeService.f1,
                    isHead));

    // Register a gauge that monitors the output watermark
    chainedOperator
            .getMetricGroup()
            .gauge(
                    MetricNames.IO_CURRENT_OUTPUT_WATERMARK,
                    output.getWatermarkGauge()::getValue);
    return chainedOperator;
}

Let's focus on createOperatorWrapper, which wraps a StreamOperator in a StreamOperatorWrapper. You may wonder why a StreamOperatorWrapper is needed here at all. Here are some of its fields and methods:

private StreamOperatorWrapper<?, ?> previous;

private StreamOperatorWrapper<?, ?> next;

// ... (omitted)

public void close(StreamTaskActionExecutor actionExecutor, boolean isStoppingBySyncSavepoint)
        throws Exception {
    if (!isHead && !isStoppingBySyncSavepoint) {
        // NOTE: This only do for the case where the operator is one-input operator. At present,
        // any non-head operator on the operator chain is one-input operator.
        actionExecutor.runThrowing(() -> endOperatorInput(1));
    }

    quiesceTimeServiceAndCloseOperator(actionExecutor);

    // propagate the close operation to the next wrapper
    if (next != null) {
        next.close(actionExecutor, isStoppingBySyncSavepoint);
    }
}

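StreamOperatorWrapper has two pointers, previous and next, which form a doubly linked list. All operators of an OperatorChain are kept in this structure, and that is what gives the chain its semantics: the operators in an OperatorChain execute one after another, in order.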

createOperatorWrapper only wraps the StreamOperator in a StreamOperatorWrapper; it does not build the doubly linked list. The list is built by linkOperatorWrappers, which we analyze later.

In addition, StreamOperatorWrapper has its own close logic. As the close method shows, besides closing the current operator it recursively closes every operator after it in the list.
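The close propagation can be reduced to a few lines. `WrapperNode` below is a hypothetical, simplified StreamOperatorWrapper. It logs "endInput" for non-head operators (standing in for `endOperatorInput(1)`), then "close", and then recurses into `next`. The `isStoppingBySyncSavepoint` flag and the action executor are omitted for brevity.

```java
import java.util.List;

// Hypothetical simplified StreamOperatorWrapper: closes itself, then its successor.
class WrapperNode {
    final String name;
    final boolean isHead;
    WrapperNode next;

    WrapperNode(String name, boolean isHead) {
        this.name = name;
        this.isHead = isHead;
    }

    void close(List<String> log) {
        if (!isHead) {
            log.add("endInput(" + name + ")"); // stands in for endOperatorInput(1)
        }
        log.add("close(" + name + ")");
        if (next != null) {
            next.close(log); // propagate to the next (downstream) wrapper
        }
    }
}
```

Calling `close` on the head therefore closes the whole chain from upstream to downstream.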

Finally, let's analyze wrapOperatorIntoOutput. It wraps the operator into an Output of type ChainingOutput (or its subclass CopyingChainingOutput).

private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> wrapOperatorIntoOutput(
        OneInputStreamOperator<IN, OUT> operator,
        StreamTask<OUT, ?> containingTask,
        StreamConfig operatorConfig,
        ClassLoader userCodeClassloader,
        OutputTag<IN> outputTag) {

    WatermarkGaugeExposingOutput<StreamRecord<IN>> currentOperatorOutput;
    // With object reuse enabled, create a ChainingOutput
    // (ChainingOutput is analyzed in the next section)
    if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
        currentOperatorOutput = new ChainingOutput<>(operator, this, outputTag);
    } else {
        // Otherwise create a CopyingChainingOutput,
        // which deep-copies each StreamRecord it passes on
        TypeSerializer<IN> inSerializer =
                operatorConfig.getTypeSerializerIn1(userCodeClassloader);
        currentOperatorOutput =
                new CopyingChainingOutput<>(operator, inSerializer, outputTag, this);
    }

    // wrap watermark gauges since registered metrics must be unique
    // Register a gauge that monitors the input watermark
    operator.getMetricGroup()
            .gauge(
                    MetricNames.IO_CURRENT_INPUT_WATERMARK,
                    currentOperatorOutput.getWatermarkGauge()::getValue);

    return currentOperatorOutput;
}

ChainingOutput

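ChainingOutput feeds the output of the upstream operator into the next operator as its input. A ChainingOutput is created with the downstream operator, which it stores in its input field. Its constructors are: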

protected final Input<T> input;

public ChainingOutput(
        OneInputStreamOperator<T, ?> operator,
        StreamStatusProvider streamStatusProvider,
        @Nullable OutputTag<T> outputTag) {
    this(
            operator,
            (OperatorMetricGroup) operator.getMetricGroup(),
            streamStatusProvider,
            outputTag,
            operator::close);
}

public ChainingOutput(
        Input<T> input,
        OperatorMetricGroup operatorMetricGroup,
        StreamStatusProvider streamStatusProvider,
        @Nullable OutputTag<T> outputTag,
        @Nullable AutoCloseable closeable) {
    this.input = input;
    this.closeable = closeable;

    {
        Counter tmpNumRecordsIn;
        try {
            OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup.getIOMetricGroup();
            tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter();
        } catch (Exception e) {
            LOG.warn("An exception occurred during the metrics setup.", e);
            tmpNumRecordsIn = new SimpleCounter();
        }
        numRecordsIn = tmpNumRecordsIn;
    }

    this.streamStatusProvider = streamStatusProvider;
    this.outputTag = outputTag;
}

To confirm that ChainingOutput really passes data to the downstream operator, let's look at its collect method:

@Override
public void collect(StreamRecord<T> record) {
    if (this.outputTag != null) {
        // we are not responsible for emitting to the main output.
        return;
    }

    pushToOperator(record);
}

protected <X> void pushToOperator(StreamRecord<X> record) {
    try {
        // we know that the given outputTag matches our OutputTag so the record
        // must be of the type that our operator expects.
        @SuppressWarnings("unchecked")
        StreamRecord<T> castRecord = (StreamRecord<T>) record;

        numRecordsIn.inc();
        input.setKeyContextElement(castRecord);
        input.processElement(castRecord);
    } catch (Exception e) {
        throw new ExceptionInChainedOperatorException(e);
    }
}

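collect calls pushToOperator, which executes input.processElement(castRecord) and thereby hands the record directly to the next operator. If this ChainingOutput belongs to a side-output edge (outputTag is not null), collect returns without forwarding, because it is not responsible for the main output.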
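Stripped of metrics and key context, a ChainingOutput is a plain method call into the next operator in the same thread. The sketch below keeps the two details from the listing above: the side-output check in `collect` and the record counter. `SimpleInput` and `SimpleChainingOutput` are hypothetical simplifications of Flink's `Input<T>` and `ChainingOutput`, and the output tag is modeled as a string.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for Flink's Input<T>: the downstream operator's entry point.
interface SimpleInput<T> {
    void processElement(T record) throws Exception;
}

// Simplified ChainingOutput: forwards main-output records straight into the next
// operator's processElement, in the same thread and without serialization.
class SimpleChainingOutput<T> implements Consumer<T> {
    private final SimpleInput<T> input;
    private final String outputTag; // null means this output carries the main output
    long numRecordsIn = 0;

    SimpleChainingOutput(SimpleInput<T> input, String outputTag) {
        this.input = input;
        this.outputTag = outputTag;
    }

    @Override
    public void accept(T record) {
        if (outputTag != null) {
            return; // a side-output ChainingOutput ignores main-output records
        }
        numRecordsIn++;
        try {
            input.processElement(record);
        } catch (Exception e) {
            // Flink wraps this in ExceptionInChainedOperatorException
            throw new RuntimeException(e);
        }
    }
}
```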

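ChainingOutput has a subclass called CopyingChainingOutput. It overrides pushToOperator and creates a deep copy of each record before sending it to the downstream operator. If object reuse is enabled (containingTask.getExecutionConfig().isObjectReuseEnabled() returns true), ChainingOutput is used; otherwise CopyingChainingOutput is used.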

@Override
protected <X> void pushToOperator(StreamRecord<X> record) {
    try {
        // we know that the given outputTag matches our OutputTag so the record
        // must be of the type that our operator (and Serializer) expects.
        @SuppressWarnings("unchecked")
        StreamRecord<T> castRecord = (StreamRecord<T>) record;

        numRecordsIn.inc();
        // Make a deep copy of the value before sending it downstream
        StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
        input.setKeyContextElement(copy);
        input.processElement(copy);
    } catch (ClassCastException e) {
        // ...
    } catch (Exception e) {
        throw new ExceptionInChainedOperatorException(e);
    }
}
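The practical difference between the two outputs shows up when the upstream side keeps mutating an object after emitting it. Without a copy, the downstream operator sees the change. With a copy, it keeps the value it was given. The sketch below is hypothetical: `ForwardingOutput` takes an optional copy function in place of `TypeSerializer.copy`, and a mutable list plays the role of the record value.

```java
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

// Hypothetical sketch: forwards a value downstream either as-is (like ChainingOutput
// under object reuse) or after copying it (like CopyingChainingOutput).
class ForwardingOutput<T> implements Consumer<T> {
    private final Consumer<T> downstream;
    private final UnaryOperator<T> deepCopy; // null: hand over the same object

    ForwardingOutput(Consumer<T> downstream, UnaryOperator<T> deepCopy) {
        this.downstream = downstream;
        this.deepCopy = deepCopy;
    }

    @Override
    public void accept(T value) {
        downstream.accept(deepCopy == null ? value : deepCopy.apply(value));
    }
}
```

Copying costs time and garbage on every record. This is why Flink lets users opt into object reuse when they know their functions neither keep nor modify emitted objects.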

createChainedSources

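This method creates the chained sources, i.e. source operators chained directly into the inputs of a MultipleInputStreamOperator.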

@SuppressWarnings("rawtypes")
private Map<SourceInputConfig, ChainedSource> createChainedSources(
        StreamTask<OUT, OP> containingTask,
        InputConfig[] configuredInputs,
        Map<Integer, StreamConfig> chainedConfigs,
        ClassLoader userCodeClassloader,
        List<StreamOperatorWrapper<?, ?>> allOpWrappers) {
    // If none of the configuredInputs is a SourceInputConfig, return an empty map
    if (Arrays.stream(configuredInputs)
            .noneMatch(input -> input instanceof SourceInputConfig)) {
        return Collections.emptyMap();
    }
    // Chained sources are only supported for StreamOperators with multiple inputs
    checkState(
            mainOperatorWrapper.getStreamOperator() instanceof MultipleInputStreamOperator,
            "Creating chained input is only supported with MultipleInputStreamOperator and MultipleInputStreamTask");
    Map<SourceInputConfig, ChainedSource> chainedSourceInputs = new HashMap<>();
    MultipleInputStreamOperator<?> multipleInputOperator =
            (MultipleInputStreamOperator<?>) mainOperatorWrapper.getStreamOperator();
    // Get all of its Inputs
    List<Input> operatorInputs = multipleInputOperator.getInputs();

    // Index of the first chained source's input gate:
    // the largest existing InputGate index plus 1
    int sourceInputGateIndex =
            Arrays.stream(containingTask.getEnvironment().getAllInputGates())
                            .mapToInt(IndexedInputGate::getInputGateIndex)
                            .max()
                            .orElse(-1)
                    + 1;

    // Iterate over every input
    for (int inputId = 0; inputId < configuredInputs.length; inputId++) {
        // Skip inputs that are not SourceInputConfig
        if (!(configuredInputs[inputId] instanceof SourceInputConfig)) {
            continue;
        }
        SourceInputConfig sourceInput = (SourceInputConfig) configuredInputs[inputId];
        int sourceEdgeId = sourceInput.getInputEdge().getSourceId();
        // Look up the source's StreamConfig via the input edge
        StreamConfig sourceInputConfig = chainedConfigs.get(sourceEdgeId);
        OutputTag outputTag = sourceInput.getInputEdge().getOutputTag();

        // Create the output of the chained source.
        // Only object reuse is currently supported,
        // so the returned type is actually ChainingOutput
        WatermarkGaugeExposingOutput chainedSourceOutput =
                createChainedSourceOutput(
                        containingTask,
                        operatorInputs.get(inputId),
                        (OperatorMetricGroup) multipleInputOperator.getMetricGroup(),
                        outputTag);

        // Create the source operator (createOperator was covered above)
        SourceOperator<?, ?> sourceOperator =
                (SourceOperator<?, ?>)
                        createOperator(
                                containingTask,
                                sourceInputConfig,
                                userCodeClassloader,
                                (WatermarkGaugeExposingOutput<StreamRecord<OUT>>)
                                        chainedSourceOutput,
                                allOpWrappers,
                                true);
        // Put it into chainedSourceInputs
        chainedSourceInputs.put(
                sourceInput,
                new ChainedSource(
                        chainedSourceOutput,
                        new StreamTaskSourceInput<>(
                                sourceOperator, sourceInputGateIndex++, inputId)));
    }
    return chainedSourceInputs;
}
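The gate-index arithmetic in createChainedSources is compact and easy to misread. Chained sources are numbered after the task's existing network input gates. `max()` on an empty stream is empty, so `orElse(-1) + 1` yields 0 when there are no network gates. Each chained source then takes the next index via `sourceInputGateIndex++`. The sketch below is hypothetical (`GateIndex` is not a Flink class) and works on plain `int` indexes instead of IndexedInputGate objects.

```java
import java.util.Arrays;

class GateIndex {
    // First free input-gate index for chained sources: max existing index + 1,
    // or 0 when the task has no network input gates.
    static int firstSourceInputGateIndex(int[] existingGateIndexes) {
        return Arrays.stream(existingGateIndexes).max().orElse(-1) + 1;
    }

    // Assigns consecutive indexes to the chained sources, like sourceInputGateIndex++.
    static int[] assign(int[] existingGateIndexes, int numChainedSources) {
        int next = firstSourceInputGateIndex(existingGateIndexes);
        int[] result = new int[numChainedSources];
        for (int i = 0; i < numChainedSources; i++) {
            result[i] = next++;
        }
        return result;
    }
}
```

For a task whose network gates have indexes 0 and 1, two chained sources would get gate indexes 2 and 3.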

linkOperatorWrappers

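linkOperatorWrappers links the operators in the chain one by one. Note that allOperatorWrappers, built by createOutputCollector in the previous steps, holds the operators in downstream-to-upstream order, so linkOperatorWrappers has to reverse that order when linking them.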

private StreamOperatorWrapper<?, ?> linkOperatorWrappers(
        List<StreamOperatorWrapper<?, ?>> allOperatorWrappers) {
    // The operator handled in the previous iteration
    StreamOperatorWrapper<?, ?> previous = null;
    // Iterate over all operators
    for (StreamOperatorWrapper<?, ?> current : allOperatorWrappers) {
        if (previous != null) {
            // The previously handled (downstream) wrapper gets the current one as its predecessor
            previous.setPrevious(current);
        }
        // The current wrapper's next pointer is the previously handled (downstream) wrapper
        current.setNext(previous);
        // The current wrapper becomes previous for the next iteration
        previous = current;
    }
    return previous;
}
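The reversal performed by linkOperatorWrappers is easiest to check on a tiny example. `LinkedWrapper` below is a hypothetical, simplified wrapper with only the two pointers. Its `link` method runs the same loop over a downstream-first list. The loop variable is renamed `lastVisited` so that it does not clash with the `previous` field.

```java
import java.util.List;

// Hypothetical simplified StreamOperatorWrapper holding only the list pointers.
class LinkedWrapper {
    final String name;
    LinkedWrapper previous; // upstream neighbour
    LinkedWrapper next;     // downstream neighbour

    LinkedWrapper(String name) {
        this.name = name;
    }

    // Same loop as linkOperatorWrappers: the input is ordered downstream -> upstream,
    // and the returned element is the head (most upstream) of the chain.
    static LinkedWrapper link(List<LinkedWrapper> downstreamFirst) {
        LinkedWrapper lastVisited = null;
        for (LinkedWrapper current : downstreamFirst) {
            if (lastVisited != null) {
                lastVisited.previous = current; // current is upstream of the wrapper seen before it
            }
            current.next = lastVisited;
            lastVisited = current;
        }
        return lastVisited; // null for an empty list
    }
}
```

Given `[flatMap, filter, map]`, the result is the list `map <-> filter <-> flatMap` with `map` returned as the head.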

initializeStateAndOpenOperators

Before the StreamTask starts receiving data, the state of each operator must be initialized and each operator must be opened (its open method called). initializeStateAndOpenOperators does exactly this.

protected void initializeStateAndOpenOperators(
        StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
    for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators(true)) {
        StreamOperator<?> operator = operatorWrapper.getStreamOperator();
        operator.initializeState(streamTaskStateInitializer);
        operator.open();
    }
}
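The `true` argument means reverse iteration. As far as we can tell from the Flink source, getAllOperators(true) starts at tailOperatorWrapper and follows the previous pointers. Downstream operators are therefore initialized and opened before the upstream operators that feed them, which is the opposite of the head-to-tail order used by close. The sketch below models only that traversal order. `OpenOrder` and its nested `Node` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class OpenOrder {
    // Hypothetical minimal wrapper node; Flink's wrapper also carries the operator itself.
    static final class Node {
        final String name;
        Node previous;

        Node(String name) {
            this.name = name;
        }
    }

    // Walks from the tail toward the head via `previous`, i.e. downstream operators first.
    static List<String> initializeAndOpen(Node tail) {
        List<String> log = new ArrayList<>();
        for (Node n = tail; n != null; n = n.previous) {
            log.add("initializeState(" + n.name + ")");
            log.add("open(" + n.name + ")");
        }
        return log;
    }
}
```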

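This blog post is the author's original work. Discussion, criticism and corrections are welcome. Please credit the source when reposting.

Author: AlienPaul
Link: https://www.jianshu.com/p/928352c9101d
Source: Jianshu
Copyright belongs to the author. For commercial reposting, please contact the author for permission; for non-commercial reposting, please credit the source.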
