A Look at Flink's Execution Plan Visualization
Preface
This article looks at Flink's Execution Plan Visualization.
Example
Code
@Test
public void testExecutionPlan() {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(WORDS)
            .flatMap(new WordCountTest.Tokenizer())
            .keyBy(0)
            .sum(1);
    dataStream.print();
    System.out.println(env.getExecutionPlan());
}
JSON
{
  "nodes": [
    {
      "id": 1,
      "type": "Source: Collection Source",
      "pact": "Data Source",
      "contents": "Source: Collection Source",
      "parallelism": 1
    },
    {
      "id": 2,
      "type": "Flat Map",
      "pact": "Operator",
      "contents": "Flat Map",
      "parallelism": 4,
      "predecessors": [
        {"id": 1, "ship_strategy": "REBALANCE", "side": "second"}
      ]
    },
    {
      "id": 4,
      "type": "Keyed Aggregation",
      "pact": "Operator",
      "contents": "Keyed Aggregation",
      "parallelism": 4,
      "predecessors": [
        {"id": 2, "ship_strategy": "HASH", "side": "second"}
      ]
    },
    {
      "id": 5,
      "type": "Sink: Print to Std. Out",
      "pact": "Data Sink",
      "contents": "Sink: Print to Std. Out",
      "parallelism": 4,
      "predecessors": [
        {"id": 4, "ship_strategy": "FORWARD", "side": "second"}
      ]
    }
  ]
}
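The plan can also be read without the visualizer: each entry in a node's predecessors array is an incoming edge, and ship_strategy says how records travel along it. The following is a minimal sketch of that reading, using only the JDK. PlanEdges and its edges method are hypothetical names, not part of Flink, and the regular expression assumes the key order shown in the plan above; a real JSON parser would be more robust.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper, not part of Flink: lists the edges encoded in a plan JSON string. */
final class PlanEdges {

    // Group 1: a node's own id (the "id" key directly followed by "type").
    // Groups 2 and 3: a predecessor's id and its ship_strategy.
    // Assumption: keys appear in the order shown in the plan above.
    private static final Pattern TOKEN = Pattern.compile(
            "\"id\"\\s*:\\s*(\\d+)\\s*,\\s*\"type\""
            + "|\"id\"\\s*:\\s*(\\d+)\\s*,\\s*\"ship_strategy\"\\s*:\\s*\"([^\"]+)\"");

    static List<String> edges(String planJson) {
        List<String> result = new ArrayList<>();
        String currentNode = null;
        Matcher m = TOKEN.matcher(planJson);
        while (m.find()) {
            if (m.group(1) != null) {
                // A new node object starts here; later predecessors belong to it.
                currentNode = m.group(1);
            } else if (currentNode != null) {
                // Edge from the predecessor to the current node, labelled with the ship strategy.
                result.add(m.group(2) + " -[" + m.group(3) + "]-> " + currentNode);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Shortened two-node plan, for illustration only.
        String plan = "{\"nodes\": ["
                + "{\"id\": 1,\"type\": \"Source: Collection Source\",\"parallelism\": 1},"
                + "{\"id\": 2,\"type\": \"Flat Map\",\"parallelism\": 4,"
                + "\"predecessors\": [{\"id\": 1,\"ship_strategy\": \"REBALANCE\",\"side\": \"second\"}]}"
                + "]}";
        edges(plan).forEach(System.out::println);
    }
}
```

Applied to the full plan above, the same logic yields three edges: source to Flat Map via REBALANCE, Flat Map to Keyed Aggregation via HASH, and Keyed Aggregation to the sink via FORWARD.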
Visualization
Open the flink plan visualizer, paste the JSON above into the text box, and click Draw to render the plan as a graph.
StreamExecutionEnvironment.getExecutionPlan
flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@Public
public abstract class StreamExecutionEnvironment {

    //......

    /**
     * Creates the plan with which the system will execute the program, and
     * returns it as a String using a JSON representation of the execution data
     * flow graph. Note that this needs to be called, before the plan is
     * executed.
     *
     * @return The execution plan of the program, as a JSON String.
     */
    public String getExecutionPlan() {
        return getStreamGraph().getStreamingPlanAsJSON();
    }

    /**
     * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job.
     *
     * @return The streamgraph representing the transformations
     */
    @Internal
    public StreamGraph getStreamGraph() {
        if (transformations.size() <= 0) {
            throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
        }
        return StreamGraphGenerator.generate(this, transformations);
    }

    //......
}
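- StreamExecutionEnvironment.getExecutionPlan calls getStreamGraph, which uses StreamGraphGenerator.generate to build the StreamGraph; it then calls StreamGraph.getStreamingPlanAsJSON to obtain the execution plan in JSON form. Because getStreamGraph throws an IllegalStateException when no transformations are registered, getExecutionPlan only works after at least one operator has been defined.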
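The delegation chain and the empty-topology guard can be sketched with plain JDK classes. MiniEnvironment and MiniStreamGraph are hypothetical stand-ins whose method names mirror the Flink source quoted above; they are not Flink classes, transformations are reduced to plain strings, and the JSON output is deliberately simplified (no escaping, no edges).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for StreamExecutionEnvironment; not Flink code. */
final class MiniEnvironment {
    // Assumption: a transformation is represented by its name only.
    private final List<String> transformations = new ArrayList<>();

    MiniEnvironment addTransformation(String name) {
        transformations.add(name);
        return this;
    }

    /** Mirrors getStreamGraph(): refuses to build a graph for an empty topology. */
    MiniStreamGraph getStreamGraph() {
        if (transformations.isEmpty()) {
            throw new IllegalStateException(
                    "No operators defined in streaming topology. Cannot execute.");
        }
        return MiniStreamGraph.generate(transformations);
    }

    /** Mirrors getExecutionPlan(): pure delegation to the graph. */
    String getExecutionPlan() {
        return getStreamGraph().getStreamingPlanAsJSON();
    }

    public static void main(String[] args) {
        MiniEnvironment env = new MiniEnvironment();
        try {
            env.getExecutionPlan();
        } catch (IllegalStateException e) {
            System.out.println("Empty topology: " + e.getMessage());
        }
        env.addTransformation("Flat Map");
        System.out.println(env.getExecutionPlan());
    }
}

/** Hypothetical stand-in for StreamGraph; not Flink code. */
final class MiniStreamGraph {
    private final List<String> nodes;

    private MiniStreamGraph(List<String> nodes) {
        this.nodes = nodes;
    }

    /** Plays the role of StreamGraphGenerator.generate: copies the transformations into nodes. */
    static MiniStreamGraph generate(List<String> transformations) {
        return new MiniStreamGraph(new ArrayList<>(transformations));
    }

    /** Renders the nodes with 1-based ids; node names are written without escaping. */
    String getStreamingPlanAsJSON() {
        StringBuilder json = new StringBuilder("{\"nodes\": [");
        for (int i = 0; i < nodes.size(); i++) {
            if (i > 0) {
                json.append(',');
            }
            json.append("{\"id\": ").append(i + 1)
                .append(",\"type\": \"").append(nodes.get(i)).append("\"}");
        }
        return json.append("]}").toString();
    }
}
```

The point of the sketch is the ordering constraint: asking for the plan before any operator is registered fails fast, while the same call succeeds once the topology is non-empty.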
StreamGraph.getStreamingPlanAsJSON
flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/graph/StreamGraph.java
@Internal
public class StreamGraph extends StreamingPlan {

    private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class);

    private String jobName = StreamExecutionEnvironment.DEFAULT_JOB_NAME;

    private final StreamExecutionEnvironment environment;
    private final ExecutionConfig executionConfig;
    private final CheckpointConfig checkpointConfig;

    private boolean chaining;

    private Map<Integer, StreamNode> streamNodes;
    private Set<Integer> sources;
    private Set<Integer> sinks;
    private Map<Integer, Tuple2<Integer, List<String>>> virtualSelectNodes;
    private Map<Integer, Tuple2<Integer, OutputTag>> virtualSideOutputNodes;
    private Map<Integer, Tuple2<Integer, StreamPartitioner<?>>> virtualPartitionNodes;

    protected Map<Integer, String> vertexIDtoBrokerID;
    protected Map<Integer, Long> vertexIDtoLoopTimeout;
    private StateBackend stateBackend;
    private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;

    //......

    public String getStreamingPlanAsJSON() {
        try {
            return new JSONGenerator(this).getJSON();
        }
        catch (Exception e) {
            throw new RuntimeException("JSON plan creation failed", e);
        }
    }

    //......
}
- StreamGraph.getStreamingPlanAsJSON passes the graph itself to JSONGenerator for serialization and returns the execution plan as JSON; any failure is rethrown as a RuntimeException.
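To make the shape of that output concrete, here is a hand-rolled sketch that serializes a tiny operator graph into a structure resembling the plan shown earlier. It does not reproduce Flink's JSONGenerator, whose internals are not covered in this article. PlanJsonSketch, Node, and from are hypothetical names, and only a subset of the fields (id, type, parallelism, predecessors) is emitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative stand-in, not Flink's JSONGenerator: serializes a tiny operator graph by hand. */
final class PlanJsonSketch {

    /** One operator plus its incoming edges (hypothetical model class). */
    static final class Node {
        final int id;
        final String type;
        final int parallelism;
        // Incoming edges, stored as already-rendered JSON fragments.
        final List<String> predecessors = new ArrayList<>();

        Node(int id, String type, int parallelism) {
            this.id = id;
            this.type = type;
            this.parallelism = parallelism;
        }

        /** Records an incoming edge from the given source node with the given ship strategy. */
        Node from(Node source, String shipStrategy) {
            predecessors.add("{\"id\": " + source.id
                    + ",\"ship_strategy\": \"" + shipStrategy + "\"}");
            return this;
        }
    }

    /** Renders the nodes in list order; predecessors are emitted only when present. */
    static String toJson(List<Node> nodes) {
        StringBuilder json = new StringBuilder("{\"nodes\": [");
        for (int i = 0; i < nodes.size(); i++) {
            Node n = nodes.get(i);
            if (i > 0) {
                json.append(',');
            }
            json.append("{\"id\": ").append(n.id)
                .append(",\"type\": \"").append(escape(n.type)).append('"')
                .append(",\"parallelism\": ").append(n.parallelism);
            if (!n.predecessors.isEmpty()) {
                json.append(",\"predecessors\": [")
                    .append(String.join(",", n.predecessors))
                    .append(']');
            }
            json.append('}');
        }
        return json.append("]}").toString();
    }

    /** Escapes backslashes and double quotes only, which is enough for this sketch. */
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        Node source = new Node(1, "Source: Collection Source", 1);
        Node flatMap = new Node(2, "Flat Map", 4).from(source, "REBALANCE");
        System.out.println(toJson(Arrays.asList(source, flatMap)));
    }
}
```

Storing edges on the downstream node matches the plan format, where each node lists its predecessors rather than its successors.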
Summary
- Flink provides an online flink plan visualizer for visualizing execution plans; it accepts the execution plan in JSON form.
- StreamExecutionEnvironment.getExecutionPlan calls getStreamGraph, which uses StreamGraphGenerator.generate to build the StreamGraph.
- StreamGraph.getStreamingPlanAsJSON uses JSONGenerator to serialize the graph and returns the execution plan as JSON.
doc
- Execution Plans
- flink plan visualizer
Reposted from: https://my.oschina.net/go4it/blog/3009595