文章目录

  • 前言
  • 执行输出工具
    • 打印执行计划
    • 执行计划可视化工具
  • 数据传输策略:ship_strategy
  • 案例

前言

Flink 的优化器会根据诸如数据量或集群机器数等不同的参数自动地为你的程序选择执行策略。但在大多数情况下,准确地了解 Flink 会如何执行你的程序是很有帮助的。

执行输出工具

打印执行计划

  • flink table 通过explainSql api输出
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tEnv = StreamTableEnvironment.create(env, blinkStreamSettings);
...
String query = "insert into print SELECT\n" +" user_id,\n" +" product,\n" +" amount\n" +" FROM orders";
tEnv.explainSql(query);

使用EXPLAIN PLAN FOR 输出执行计划

...
String query = "EXPLAIN PLAN FOR" +" insert into print SELECT\n" +" user_id,\n" +" product,\n" +" amount\n" +" FROM orders";
tEnv.executeSql(query).print();

输出:

...
== Physical Execution Plan ==
Stage 1 : Data Sourcecontent : Source: Custom File sourceStage 2 : Operatorcontent : CsvTableSource(read fields: user_id, product, amount)ship_strategy : REBALANCEStage 3 : Operatorcontent : SourceConversion(table=[default_catalog.default_database.orders, source: [CsvTableSource(read fields: user_id, product, amount)]], fields=[user_id, product, amount])ship_strategy : FORWARDStage 4 : Data Sinkcontent : Sink: Sink(table=[default_catalog.default_database.print], fields=[user_id, product, amount])ship_strategy : FORWARD

执行计划可视化工具

以下代码展示了如何在你的程序中打印 JSON 格式的执行计划:

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
...
System.out.println(env.getExecutionPlan());

可以通过如下步骤可视化执行计划:

  1. 使用你的浏览器打开可视化工具网站,
  2. 将 JSON 字符串拷贝并粘贴到文本框中,
  3. 点击 draw 按钮。
    完成后,详细的执行计划图会在网页中呈现。
  • Web 界面

Flink 提供了用于提交和执行任务的 Web 界面。该界面是 JobManager Web 界面的一部分,起到管理监控的作用,默认情况下运行在 8081 端口。

可视化工具可以在执行 Flink 作业之前展示执行计划图,你可以据此来指定程序的参数。

数据传输策略:ship_strategy

分区策略统一实现StreamPartitioner抽象类,其接口为ChannelSelector。ChannelSelector来定义如何选择数据通道。

public interface ChannelSelector<T extends IOReadableWritable> {/*** 设置输出通道的总数*/void setup(int numberOfChannels);/*** 返回给定记录应写入的逻辑通道索引*/int selectChannel(T record);/*** 返回通道选择器是否始终选择所有输出通道**/boolean isBroadcast();
}

HASH[…]

BinaryRowData类型数据分析器

  • 实现类:BinaryHashPartitioner

BROADCAST

广播所有输出通道的分区器,将消息发送到下游每个数据输出通道

  • 实现类:BroadcastPartitioner
  • 触发情况:数据流进行DataStream:broadcast对下游算子进行广播

CUSTOM

分区器,用于在键上使用用户定义的分区器函数选择数据输出通道

  • 实现类:CustomPartitionerWrapper
  • 触发情况:对数据流进行DataStream#partitionCustom自定义数据分区

FORWARD

仅将元素转发给本地运行的下游操作的分区器。

  • 实现类:ForwardPartitioner
  • 触发情况:
    1. 上下游并行度相同
    2. 算子:flatMapfiltermap,的one-to-one模式

GLOBAL

将所有元素发送到子任务ID为0的下游运算符的分区器。可能会在应用程序中造成严重的性能瓶颈

  • 实现类:GlobalPartitioner
  • 触发情况:对数据流进行 DataStream#global 操作

HASH

分区器根据key组索引选择目标通道。

  • 实现类:KeyGroupStreamPartitioner
  • 触发情况:对数据流进行keyBy/windowkeyBy/reduce操作

REBALANCE

通过在输出通道中循环平均分配数据的分区器。

  • 实现类:RebalancePartitioner
  • 触发情况:上下游算子并行度不同,默认进行REBALANCE

RESCALE

在不同并行的情况下,通过在输出通道中循环平均分配数据的分区器。以循环方式将输出元素均匀地分布到下一个操作的实例子集。

  • 实现类:RescalePartitioner
  • 触发情况:对数据流进行 DataStream#rescale 操作

SHUFFLE

通过随机选择一个输出通道来平均分配数据的分区器。

  • 实现类:ShufflePartitioner
  • 触发情况:对数据流进行 DataStream#shuffle 操作

案例

  • 同步作业:

public class WordCountMain {public static void main(String[] args) throws Exception {//创建流运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//输入数据DataStream<String> dataStreamSource = env.fromElements("Hello my name is li", "Hello my name is li", "name is li");dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] splits = value.toLowerCase().split("\\W+");for (String split : splits) {if (split.length() > 0) {out.collect(new Tuple2<>(split, 1));}}}}).setParallelism(2).filter((Objects::nonNull)).setParallelism(2).partitionCustom((key, numPartitions) -> key == null ? 0 : Math.abs(key.hashCode()) % numPartitions,new KeySelector<Tuple2<String, Integer>,String>() {@Overridepublic String getKey(Tuple2<String, Integer> value) throws Exception {return value.f0;}}).map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {return new Tuple2<>(value.f0.toLowerCase(), value.f1);}})//根据key,将数据发送到不同subTasks.keyBy((value -> value.f0)).reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {return new Tuple2<>(value1.f0, value1.f1 + value2.f1);}}).setParallelism(2).shuffle().print().setParallelism(2);System.out.println(env.getExecutionPlan());}
}
  • 执行计划

flink 执行计划、数据传输策略相关推荐

  1. oracle稳定执行计划1

    稳定执行计划 1 策略: Oracle的sql 执行计划在一些场景下会发生变化,导致系统会发生不可知的情况,影响系统的稳定性,特别是关键业务的sql. 比如下面的场景: 统计信息过老,重新收集了统计信 ...

  2. PostgreSQL查询当前执行中SQL的执行计划——pg_show_plans

    点击上方"蓝字" 关注我们,享更多干货! 执行计划存储 如果同样的SQL要执行很多遍,且每次都是同样的执行计划.每次都发生硬解析,则会消耗大量时间.类似于Oracle存放执行计划的 ...

  3. Flink 流处理概念:Dataflow编程、执行图、并行度、数据传输策略、任务链

    文章目录 Dataflow编程 执行图 并行度 数据传输策略 任务链 Dataflow编程 顾名思义,Dataflow程序描述了数据如何在不同操作之间流动.Dataflow程序通常表现为有向无环图(D ...

  4. oracle分区表执行计划分区合并,利用ORACLE分区技术提高管理和性能_PART2

    接PART1:http://blog.chinaunix.net/uid/7655508.html 11g interval分区: 1)11g之前创建日期范围分区,经常是预先创建一部分,等即将用完重新 ...

  5. oracle更新统计信息执行计划,为准确生成执行计划更新统计信息-analyze与dbms_stats...

    如果我们想让CBO利用合理利用数据的统计信息,正确判断执行任何SQL查询时的最快途径,需要及时的使用analyze命令或者dbms_stats重新统计数据的统计信息. 例如索引跳跃式扫描(INDEX ...

  6. 1.18.2.10 解释表:Table.explain、物理执行计划等

    1.18.2.10.解释表 Table API 提供了一种机制来解释计算 Table 的逻辑和优化查询计划. 这是通过 Table.explain() 方法或者 StatementSet.explai ...

  7. mysql 执行计划不对_关于mysql主从查询执行计划不一致问题的分析

    最近面试过程中被面试官抛了一个问题,说曾经有一个线上出现的奇怪的问题,主库和从库各种配置是一致的,当数据量比较大的时候,某些时候同样的查询,在从库里的执行计划执行成功了,而主库里没有执行这个执行计划, ...

  8. MySQL调优(八):查缺补漏(mysql的锁机制,读写分离,执行计划详解,主从复制原理)

    mysql的锁机制 1.MySQL锁的基本介绍 ​ 锁是计算机协调多个进程或线程并发访问某一资源的机制.在数据库中,除传统的 计算资源(如CPU.RAM.I/O等)的争用以外,数据也是一种供许多用户共 ...

  9. DRDS分布式SQL引擎—执行计划介绍

    摘要: 本文着重介绍 DRDS 执行计划中各个操作符的含义,以便用户通过查询计划了解 SQL 执行流程,从而有针对性的调优 SQL. DRDS分布式SQL引擎 - 执行计划介绍 前言 数据库系统中,执 ...

最新文章

  1. 经验 | 秋招总结(拼多多,腾讯,百度,字节)
  2. (一)检测浏览器是否支持websocket
  3. android中完全退出当前应用程序的四种方法
  4. Spring+XFire WS-Security安全认证开发感悟
  5. python字符串类型_python字符串类型介绍
  6. Oracle-物化视图
  7. c++数字金字塔_“资金管理是投资最大的秘密”(超级干货),一生死记“金字塔加仓减仓法”,最安全稳健的操盘法方式!...
  8. centos树莓派安装mysql_树莓派3B+安装CentOS7
  9. linux安装配置java,Linux 安装配置 java 环境
  10. 修改 “嗨加游-Prefix.pch” 或者 “嗨加游-Info.plist ” 方法
  11. 让电脑提速的小方法-----QoS数据计划程序
  12. C#实现超长位整数运算
  13. eclipse android 服务端,Eclipse搭建服务器,实现与Android的简单通信
  14. 解决安装vc2005运行库时提示Command line option syntax error.Type Command/?for Help
  15. Windows AD域管理软件详解
  16. win10摄像头可以用计算机里不显示,win10打开计算机如何显示摄像头
  17. 阿里云ACP云计算认证有用吗?
  18. 强化学习过程中对产生的无效动作应该如何进行屏蔽处理?(强化学习中可变的动作空间怎么处理)
  19. 计算机测试 原理是什么,rtk的测量原理和工作步骤是什么?
  20. flutter - 使用 SingleChildScrollView() 解决键盘遮挡输入框的问题

热门文章

  1. 类似国外多语言wikipedia百度百科网站源码
  2. 小熊猫SEO外链发布工具
  3. MATLAB 函数 判断一个数是否为素数
  4. swift学习——点点滴滴——3~著名算法
  5. K3S - 轻量级Kubernetes集群
  6. 实时最新中国省市区县geoJSON格式地图行政边界数据Echarts地图数据(可精确到街道级)
  7. 求职真的是欲哭无泪,520,521还要继续找工作
  8. 【转载】《IT经理世界》:腾讯的瓶颈
  9. 常用单片机编程思想及例程2——串口接收断帧
  10. eNSP模拟器路由无法启动,命令控制界面一直冒 #问题的解决方法