flink streamGraph生成
当完成DataStream的配置后,调用其中环境上下文StreamExecutionEnvironment的getStreamGrahph()方法即可生成关于该DataStream的streamGraph。
@Internal
public StreamGraph getStreamGraph() {if (transformations.size() <= 0) {throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");}return StreamGraphGenerator.generate(this, transformations);
}public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) {return new StreamGraphGenerator(env).generateInternal(transformations);
}
生成streamGraph实则是在StreamGraphGenerator的generate()方法中,参数除了调用的env本身之外还需要env当中表示被转换的DataStream中所有操作streamTransformation的数组。
之后会在generate()方法中会生成一个StreamGraphGenerator并调用generateInternal()方法根据DataStream中的所有transformation开始生成streamGraph。
private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {for (StreamTransformation<?> transformation: transformations) {transform(transformation);}return streamGraph;
}
此处会遍历所有的streamTransformation,依次调用transform()方法来转换至streamGraph。
在transform()方法中,会根据所有类型的transform分别处理,具体的代码如下。
if (transform instanceof OneInputTransformation<?, ?>) {transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
} else if (transform instanceof TwoInputTransformation<?, ?, ?>) {transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
} else if (transform instanceof SourceTransformation<?>) {transformedIds = transformSource((SourceTransformation<?>) transform);
} else if (transform instanceof SinkTransformation<?>) {transformedIds = transformSink((SinkTransformation<?>) transform);
} else if (transform instanceof UnionTransformation<?>) {transformedIds = transformUnion((UnionTransformation<?>) transform);
} else if (transform instanceof SplitTransformation<?>) {transformedIds = transformSplit((SplitTransformation<?>) transform);
} else if (transform instanceof SelectTransformation<?>) {transformedIds = transformSelect((SelectTransformation<?>) transform);
} else if (transform instanceof FeedbackTransformation<?>) {transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
} else if (transform instanceof CoFeedbackTransformation<?>) {transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
} else if (transform instanceof PartitionTransformation<?>) {transformedIds = transformPartition((PartitionTransformation<?>) transform);
} else if (transform instanceof SideOutputTransformation<?>) {transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
} else {throw new IllegalStateException("Unknown transformation: " + transform);
}
以一个map操作节点为例子,如果是map类型的transformation,那么将会在此处通过transformOneInputTransform()方法构造成streamGraph中的streamNode。
private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {Collection<Integer> inputIds = transform(transform.getInput());// the recursive call might have already transformed thisif (alreadyTransformed.containsKey(transform)) {return alreadyTransformed.get(transform);}String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);streamGraph.addOperator(transform.getId(),slotSharingGroup,transform.getCoLocationGroupKey(),transform.getOperator(),transform.getInputType(),transform.getOutputType(),transform.getName());if (transform.getStateKeySelector() != null) {TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);}streamGraph.setParallelism(transform.getId(), transform.getParallelism());streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());for (Integer inputId: inputIds) {streamGraph.addEdge(inputId, transform.getId(), 0);}return Collections.singleton(transform.getId());
}
其中,在一开始,会得到该transformation的输入,尝试先于当前节点之前先转换其输入节点,由此,会一路往上回溯到source节点开始转换。
当其上游节点全部转换为streamGraph中的节点之后,就会开始当前节点的转换,而后,会根据determineSlotSharingGroup()方法确定该节点的slot共享名。
private String determineSlotSharingGroup(String specifiedGroup, Collection<Integer> inputIds) {if (specifiedGroup != null) {return specifiedGroup;} else {String inputGroup = null;for (int id: inputIds) {String inputGroupCandidate = streamGraph.getSlotSharingGroup(id);if (inputGroup == null) {inputGroup = inputGroupCandidate;} else if (!inputGroup.equals(inputGroupCandidate)) {return "default";}}return inputGroup == null ? "default" : inputGroup;}
}
在这里,如果没有专门制定相应的共享名,那么则会根据其上游输入节点进行确定,如果上游不一致,则会采用默认的default来指定。
在确定完共享名之后,streamGraph就会通过addOperaor()方法来将该节点加入到streamGraph中。在该方法中首先根据节点类型加入到stramGraph的streanNodes数组中,之后根据输入输出的类型得到相应的类型序列化对象设置到相应的节点中。
之后,会遍历当前节点的所有输入节点,调用addEdge()方法依次生成节点的边streamEdge。
public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {addEdgeInternal(upStreamVertexID,downStreamVertexID,typeNumber,null,new ArrayList<String>(),null);}private void addEdgeInternal(Integer upStreamVertexID,Integer downStreamVertexID,int typeNumber,StreamPartitioner<?> partitioner,List<String> outputNames,OutputTag outputTag) {if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {int virtualId = upStreamVertexID;upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;if (outputTag == null) {outputTag = virtualSideOutputNodes.get(virtualId).f1;}addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag);} else if (virtualSelectNodes.containsKey(upStreamVertexID)) {int virtualId = upStreamVertexID;upStreamVertexID = virtualSelectNodes.get(virtualId).f0;if (outputNames.isEmpty()) {// selections that happen downstream override earlier selectionsoutputNames = virtualSelectNodes.get(virtualId).f1;}addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);} else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {int virtualId = upStreamVertexID;upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;if (partitioner == null) {partitioner = virtualPartitionNodes.get(virtualId).f1;}addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);} else {StreamNode upstreamNode = getStreamNode(upStreamVertexID);StreamNode downstreamNode = getStreamNode(downStreamVertexID);// If no partitioner was specified and the parallelism of upstream and downstream// operator matches use forward partitioning, use rebalance otherwise.if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {partitioner = new ForwardPartitioner<Object>();} else if (partitioner == null) {partitioner = new RebalancePartitioner<Object>();}if (partitioner instanceof ForwardPartitioner) {if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {throw new UnsupportedOperationException("Forward partitioning does not allow " +"change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +" You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");}}StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag);getStreamNode(edge.getSourceId()).addOutEdge(edge);getStreamNode(edge.getTargetId()).addInEdge(edge);}
}
此处,可以看到一条StreamEdge的组成主要有以下几个要素,起始节点,终点节点,输出分区类型,output选择器选择的字段名。
在生成了相应的streamEdge会分别加在起始节点的 out边和终点节点的input边上。
至此,一个节点被加入到streamGraph的流程结束。
之后以spilt,select类型的transform为例子。
private <T> Collection<Integer> transformSplit(SplitTransformation<T> split) {StreamTransformation<T> input = split.getInput();Collection<Integer> resultIds = transform(input);// the recursive transform call might have transformed this alreadyif (alreadyTransformed.containsKey(split)) {return alreadyTransformed.get(split);}for (int inputId : resultIds) {streamGraph.addOutputSelector(inputId, split.getOutputSelector());}return resultIds;
}
spilt类型的transformation不会加入到streamGraph中,而是会将split中的outputSelector加入到上游节点中。
private <T> Collection<Integer> transformSelect(SelectTransformation<T> select) {StreamTransformation<T> input = select.getInput();Collection<Integer> resultIds = transform(input);// the recursive transform might have already transformed thisif (alreadyTransformed.containsKey(select)) {return alreadyTransformed.get(select);}List<Integer> virtualResultIds = new ArrayList<>();for (int inputId : resultIds) {int virtualId = StreamTransformation.getNewNodeId();streamGraph.addVirtualSelectNode(inputId, virtualId, select.getSelectedNames());virtualResultIds.add(virtualId);}return virtualResultIds;
}
而select类型的节点则会加入到streamGraph中的虚拟节点中,在之前的edge构造中,就将根据实际节点的id得到虚拟select节点,将相应的outputName加入到edge之中。
分区策略也将类似select的形式生成虚拟节点。
private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {StreamTransformation<T> input = partition.getInput();List<Integer> resultIds = new ArrayList<>();Collection<Integer> transformedIds = transform(input);for (Integer transformedId: transformedIds) {int virtualId = StreamTransformation.getNewNodeId();streamGraph.addVirtualPartitionNode(transformedId, virtualId, partition.getPartitioner());resultIds.add(virtualId);}return resultIds;
}
类似的union也不会引起新的节点生成,只是会统计所有输入节点id,交由统一在下一个实际节点中一次获取。
最后生成的streamGraph可以转化为下方类似的json。
{"nodes":[{"id":1,"type":"Source: Collection Source","pact":"Data Source","contents":"Source: Collection Source","parallelism":1},{"id":3,"type":"Map","pact":"Operator","contents":"Map","parallelism":8,"predecessors":[{"id":1,"ship_strategy":"REBALANCE","side":"second"}]},{"id":7,"type":"Map","pact":"Operator","contents":"Map","parallelism":8,"predecessors":[{"id":3,"ship_strategy":"BROADCAST","side":"second"}]},{"id":9,"type":"Map","pact":"Operator","contents":"Map","parallelism":8,"predecessors":[{"id":3,"ship_strategy":"FORWARD","side":"second"}]},{"id":13,"type":"Map","pact":"Operator","contents":"Map","parallelism":8,"predecessors":[{"id":3,"ship_strategy":"FORWARD","side":"second"}]},{"id":17,"type":"Map","pact":"Operator","contents":"Map","parallelism":8,"predecessors":[{"id":3,"ship_strategy":"FORWARD","side":"second"}]},{"id":24,"type":"Map","pact":"Operator","contents":"Map","parallelism":8,"predecessors":[{"id":9,"ship_strategy":"BROADCAST","side":"second"},{"id":13,"ship_strategy":"GLOBAL","side":"second"},{"id":17,"ship_strategy":"SHUFFLE","side":"second"}]},{"id":8,"type":"Sink: Unnamed","pact":"Data Sink","contents":"Sink: Unnamed","parallelism":8,"predecessors":[{"id":7,"ship_strategy":"FORWARD","side":"second"}]},{"id":25,"type":"Sink: Unnamed","pact":"Data Sink","contents":"Sink: Unnamed","parallelism":8,"predecessors":[{"id":24,"ship_strategy":"FORWARD","side":"second"}]}]}
在flink给出的https://flink.apache.org/visualizer/ 上可以转化为可视化图。
flink streamGraph生成相关推荐
- flink streamGraph生成jobGraph
当需要通过streamGraph生成jobGraph的时候,通过StreamingJobGraphGenerator的createJobGraph()方法来生成. public static JobG ...
- flink java生成流式数据
写法比较套路,整体思路是: 定义一个需要生成的数据类型 实现SourceFunction接口的两个功能 直接使用env.addSource()传入即可 import org.apache.flink. ...
- Flink自定义生成 Watermark
Watermark 策略简介 # 为了使用事件时间语义,Flink 应用程序需要知道事件时间戳对应的字段,意味着数据流中的每个元素都需要拥有可分配的事件时间戳.其通常通过使用 TimestampAss ...
- Flink 源码解析1--StreamGraph的生成
1. 先来简单看一下入门的WordCount程序 1.首先数据源会产生随机的数字数据流(0-10内的数字)形式,然后通过flink的transformation将数据进行单词计数,再print输出 / ...
- 追源索骥:透过源码看懂Flink核心框架的执行流程
https://www.cnblogs.com/bethunebtj/p/9168274.html 追源索骥:透过源码看懂Flink核心框架的执行流程 前言 1.从 Hello,World WordC ...
- 【Flink】Flink 源码之OperatorChain
1.概述 转载:Flink 源码之OperatorChain 前言 OperatorChain是Flink中一个很重要的优化措施,能够将尽可能多的满足条件的数据处理操作在一个slot中串联完成,从而最 ...
- Flink 网络流控和反压剖析详解
传送门:Flink 系统性学习笔记 前言: 本文根据 Apache Flink 系列直播整理而成,由 Apache Flink Contributor.OPPO 大数据平台研发负责人张俊老师分享,社区 ...
- 一文弄懂Flink网络流控及反压
一文弄懂Flink网络流控及反压 1. 为什么需要网络流控? 2. 网络流控的实现:静态限速 3. 网络流控的实现:动态反馈/自动反压 3.1 案例一:Storm 反压实现 3.2 案例二:Spark ...
- Flink 1.12.2 源码浅析 : JobGraph
. 一 .前言 二 .数据结构 2.1. JobVertex 2.2. JobEdge 2.3. 数据实例 三 .代码浅析 3.1. 入口 3.2. StreamingJobGraphGenerato ...
最新文章
- 安装 Homestead 可能会出现的一些 Problems
- C++实现质因数分解
- 3. nginx的请求转发算法,如何配置根据权重转发
- openshift命令_使用命令行工具创建WildFly OpenShift应用程序
- 互联网把农业推向“科技仙境”
- 安卓应用安全指南 4.7 使用可浏览的意图
- kafka ConsumerConfig: The configuration max.poll.records = 1 was supplied but isn't a known config
- Vscode合并develop代码分支到master开发分支
- android 组件生命周期,Android组件化开发实践(五):组件生命周期管理
- 回顾︱时间序列预测与分解有哪些模型?(一)
- visio 模板_Mac软件推荐:免费的流程图软件,完美替代Visio
- vscode彻底卸载记录/使用经验
- 保护您的眼睛:电脑背景色设置(XP WIN 7)
- APK反编译教程新手第一课:安卓基础知识
- Shell中的感叹号
- 整理一些个人常用的windows软件
- java怎么查看jdk版本_java版本和jdk版本必须一样
- 手机充电总要充到100%吗?充电时先插手机还是充电器
- stm32正常运行流程图_STM32单片机学习笔记(超详细整理143个问题,学习必看)...
- npm ERR code ERR_SOCKET_TIMEOUT npm ERR 出现错误改正方法