为什么80%的码农都做不了架构师?>>>   

本文主要研究一下flink的Execution Plan Visualization

实例

代码

    @Testpublic void testExecutionPlan(){final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Tuple2<String,Integer>> dataStream = env.fromElements(WORDS).flatMap(new WordCountTest.Tokenizer()).keyBy(0).sum(1);dataStream.print();System.out.println(env.getExecutionPlan());}

json

{"nodes": [{"id": 1,"type": "Source: Collection Source","pact": "Data Source","contents": "Source: Collection Source","parallelism": 1},{"id": 2,"type": "Flat Map","pact": "Operator","contents": "Flat Map","parallelism": 4,"predecessors": [{"id": 1,"ship_strategy": "REBALANCE","side": "second"}]},{"id": 4,"type": "Keyed Aggregation","pact": "Operator","contents": "Keyed Aggregation","parallelism": 4,"predecessors": [{"id": 2,"ship_strategy": "HASH","side": "second"}]},{"id": 5,"type": "Sink: Print to Std. Out","pact": "Data Sink","contents": "Sink: Print to Std. Out","parallelism": 4,"predecessors": [{"id": 4,"ship_strategy": "FORWARD","side": "second"}]}]
}

可视化

打开flink plan visualizer将上面的json,输入到文本框,点击Draw进行可视化如下:

StreamExecutionEnvironment.getExecutionPlan

flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java

@Public
public abstract class StreamExecutionEnvironment {//....../*** Creates the plan with which the system will execute the program, and* returns it as a String using a JSON representation of the execution data* flow graph. Note that this needs to be called, before the plan is* executed.** @return The execution plan of the program, as a JSON String.*/public String getExecutionPlan() {return getStreamGraph().getStreamingPlanAsJSON();}/*** Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job.** @return The streamgraph representing the transformations*/@Internalpublic StreamGraph getStreamGraph() {if (transformations.size() <= 0) {throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");}return StreamGraphGenerator.generate(this, transformations);}//......
}
  • StreamExecutionEnvironment的getExecutionPlan方法调用了getStreamGraph方法;getStreamGraph方法使用StreamGraphGenerator.generate生成了StreamGraph;之后就是调用StreamGraph.getStreamingPlanAsJSON来获取json格式的execution plan

StreamGraph.getStreamingPlanAsJSON

flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/graph/StreamGraph.java

@Internal
public class StreamGraph extends StreamingPlan {private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class);private String jobName = StreamExecutionEnvironment.DEFAULT_JOB_NAME;private final StreamExecutionEnvironment environment;private final ExecutionConfig executionConfig;private final CheckpointConfig checkpointConfig;private boolean chaining;private Map<Integer, StreamNode> streamNodes;private Set<Integer> sources;private Set<Integer> sinks;private Map<Integer, Tuple2<Integer, List<String>>> virtualSelectNodes;private Map<Integer, Tuple2<Integer, OutputTag>> virtualSideOutputNodes;private Map<Integer, Tuple2<Integer, StreamPartitioner<?>>> virtualPartitionNodes;protected Map<Integer, String> vertexIDtoBrokerID;protected Map<Integer, Long> vertexIDtoLoopTimeout;private StateBackend stateBackend;private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;//......public String getStreamingPlanAsJSON() {try {return new JSONGenerator(this).getJSON();}catch (Exception e) {throw new RuntimeException("JSON plan creation failed", e);}}//......
}
  • StreamGraph的getStreamingPlanAsJSON方法使用JSONGenerator来序列化自己,返回json格式的execution plan

小结

  • flink提供了flink plan visualizer的在线地址,用于进行execution plan的可视化,它接收json形式的execution plan
  • StreamExecutionEnvironment的getExecutionPlan方法调用了getStreamGraph方法;getStreamGraph方法使用StreamGraphGenerator.generate生成了StreamGraph
  • StreamGraph的getStreamingPlanAsJSON方法使用JSONGenerator来序列化自己,返回json格式的execution plan

doc

  • Execution Plans
  • flink plan visualizer

转载于:https://my.oschina.net/go4it/blog/3009595

聊聊flink的Execution Plan Visualization相关推荐

  1. Execution Plan 执行计划介绍

    后面的练习中需要下载 Demo 数据库, 有很多不同的版本, 可以根据个人需要下载.  下载地址 - http://msftdbprodsamples.codeplex.com/ 1. 什么是执行计划 ...

  2. SAP ABAP SQL的execution plan和cache

    我在SE38里执行这段open SQL: 因为我在OPEN SQL里没有使用IGNORE_PLAN_CACHE这个hint, 所以execution plan会存储在表M_SQL_PLAN_CACHE ...

  3. 聊聊flink的FsStateBackend

    序 本文主要研究一下flink的FsStateBackend StateBackend flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/r ...

  4. 聊聊flink的TimeCharacteristic

    为什么80%的码农都做不了架构师?>>>    序 本文主要研究一下flink的TimeCharacteristic TimeCharacteristic flink-streami ...

  5. 聊聊flink的CheckpointScheduler

    序 本文主要研究一下flink的CheckpointScheduler CheckpointCoordinatorDeActivator flink-runtime_2.11-1.7.0-source ...

  6. 聊聊flink Table的groupBy操作

    序 本文主要研究一下flink Table的groupBy操作 Table.groupBy flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/t ...

  7. 聊聊flink的OperatorStateBackend

    序 本文主要研究一下flink的OperatorStateBackend OperatorStateBackend flink-runtime_2.11-1.7.0-sources.jar!/org/ ...

  8. 聊聊flink的CheckpointScheduler 1

    序 本文主要研究一下flink的CheckpointScheduler CheckpointCoordinatorDeActivator flink-runtime_2.11-1.7.0-source ...

  9. 聊聊flink Table的ScalarFunction

    序 本文主要研究一下flink Table的ScalarFunction 实例 public class HashCode extends ScalarFunction {private int fa ...

最新文章

  1. 声智科技完成2亿元B轮融资,将持续拓展语音交互产品的规模化落地
  2. poj1236(强连通缩点)
  3. 性能领先,即训即用,快速部署,飞桨首次揭秘服务器端推理库
  4. vs2017开发php,C#编写的可供PHP调用的com dll(Visual studio 2017)
  5. Hadoop yarn容量调度器capacity-scheduler.xml配置示例
  6. html无损转换pdf,Pdf2html :高保真PDF至HTML转换
  7. WEB安全基础-SQL相关
  8. java中常用的类——Math类
  9. 11-实战模拟DRBD项目案例环境准备
  10. 计算机度分秒在线,度分秒换算器(度分秒换算器在线)
  11. Junit 5 实现testsuite
  12. C语言中 malloc函数介绍
  13. 51nod 1912 咖啡馆
  14. WLC开机卡在launching....(变砖)
  15. 学习opencv:PS滤镜—浮雕
  16. 虚拟机Linux - HTTP request sent, awaiting response... 404 Not Found
  17. 《Python股票量化交易从入门到实践》随书赠送“回测框架”的使用帮助
  18. 【LeetCode刷题笔记-39 714.买卖股票的最佳时机(含手续费)】
  19. win10定时关机c语言,win10定时关机在哪?win10设置定时关机的三种方法
  20. 安装mingw-w64失败解决方法

热门文章

  1. linux vi 命令大全
  2. C++ vector的初始化、添加、遍历、插入、删除、查找、排序、释放操作
  3. ios::ate ios::app ios::out ios::in ios::trunc ios::binary(组合总结)
  4. css宋体代码_前端开发必备的CSS命名规范与常用CSS代码集合
  5. AbstractListView源码分析5
  6. php mdecrypt generic,mdecrypt_generic
  7. 怎么查看atcoder的数据_多表数据联动查看,怎么做?
  8. 构建iOS持续集成平台(三)——CI服务器与自动化部署
  9. spring+quartz 完整例子
  10. shell脚本执行返回的状态码