flink 执行计划、数据传输策略
文章目录
- 前言
- 执行输出工具
- 打印执行计划
- 执行计划可视化工具
- 数据传输策略:ship_strategy
- 案例
前言
Flink 的优化器会根据诸如数据量或集群机器数等不同的参数自动地为你的程序选择执行策略。但在大多数情况下,准确地了解 Flink 会如何执行你的程序是很有帮助的。
执行输出工具
打印执行计划
- flink table 通过
explainSql
api输出
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tEnv = StreamTableEnvironment.create(env, blinkStreamSettings);
...
String query = "insert into print SELECT\n" +" user_id,\n" +" product,\n" +" amount\n" +" FROM orders";
tEnv.explainSql(query);
使用
EXPLAIN PLAN FOR
输出执行计划
...
String query = "EXPLAIN PLAN FOR" +" insert into print SELECT\n" +" user_id,\n" +" product,\n" +" amount\n" +" FROM orders";
tEnv.executeSql(query).print();
输出:
...
== Physical Execution Plan ==
Stage 1 : Data Sourcecontent : Source: Custom File sourceStage 2 : Operatorcontent : CsvTableSource(read fields: user_id, product, amount)ship_strategy : REBALANCEStage 3 : Operatorcontent : SourceConversion(table=[default_catalog.default_database.orders, source: [CsvTableSource(read fields: user_id, product, amount)]], fields=[user_id, product, amount])ship_strategy : FORWARDStage 4 : Data Sinkcontent : Sink: Sink(table=[default_catalog.default_database.print], fields=[user_id, product, amount])ship_strategy : FORWARD
执行计划可视化工具
以下代码展示了如何在你的程序中打印 JSON 格式的执行计划:
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
...
System.out.println(env.getExecutionPlan());
可以通过如下步骤可视化执行计划:
- 使用你的浏览器打开可视化工具网站,
- 将 JSON 字符串拷贝并粘贴到文本框中,
- 点击 draw 按钮。
完成后,详细的执行计划图会在网页中呈现。
- Web 界面
Flink 提供了用于提交和执行任务的 Web 界面。该界面是 JobManager Web 界面的一部分,起到管理监控的作用,默认情况下运行在 8081 端口。
可视化工具可以在执行 Flink 作业之前展示执行计划图,你可以据此来指定程序的参数。
数据传输策略:ship_strategy
分区策略统一实现StreamPartitioner
抽象类,其接口为ChannelSelector
。ChannelSelector来定义如何选择数据通道。
public interface ChannelSelector<T extends IOReadableWritable> {/*** 设置输出通道的总数*/void setup(int numberOfChannels);/*** 返回给定记录应写入的逻辑通道索引*/int selectChannel(T record);/*** 返回通道选择器是否始终选择所有输出通道**/boolean isBroadcast();
}
HASH[…]
BinaryRowData类型数据分析器
- 实现类:
BinaryHashPartitioner
BROADCAST
广播所有输出通道的分区器,将消息发送到下游每个数据输出通道
- 实现类:
BroadcastPartitioner
- 触发情况:数据流进行
DataStream:broadcast
对下游算子进行广播
CUSTOM
分区器,用于在键上使用用户定义的分区器函数选择数据输出通道
- 实现类:
CustomPartitionerWrapper
- 触发情况:对数据流进行
DataStream#partitionCustom
自定义数据分区
FORWARD
仅将元素转发给本地运行的下游操作的分区器。
- 实现类:
ForwardPartitioner
- 触发情况:
- 上下游并行度相同
- 算子:
flatMap
、filter
、map
,的one-to-one模式
GLOBAL
将所有元素发送到子任务ID为0的下游运算符的分区器。可能会在应用程序中造成严重的性能瓶颈
- 实现类:
GlobalPartitioner
- 触发情况:对数据流进行
DataStream#global
操作
HASH
分区器根据key组索引选择目标通道。
- 实现类:
KeyGroupStreamPartitioner
- 触发情况:对数据流进行
keyBy/window
、keyBy/reduce
操作
REBALANCE
通过在输出通道中循环平均分配数据的分区器。
- 实现类:
RebalancePartitioner
- 触发情况:上下游算子并行度不同,默认进行
REBALANCE
RESCALE
在不同并行的情况下,通过在输出通道中循环平均分配数据的分区器。以循环方式将输出元素均匀地分布到下一个操作的实例子集。
- 实现类:
RescalePartitioner
- 触发情况:对数据流进行
DataStream#rescale
操作
SHUFFLE
通过随机选择一个输出通道来平均分配数据的分区器。
- 实现类:
ShufflePartitioner
- 触发情况:对数据流进行
DataStream#shuffle
操作
案例
- 同步作业:
public class WordCountMain {public static void main(String[] args) throws Exception {//创建流运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//输入数据DataStream<String> dataStreamSource = env.fromElements("Hello my name is li", "Hello my name is li", "name is li");dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] splits = value.toLowerCase().split("\\W+");for (String split : splits) {if (split.length() > 0) {out.collect(new Tuple2<>(split, 1));}}}}).setParallelism(2).filter((Objects::nonNull)).setParallelism(2).partitionCustom((key, numPartitions) -> key == null ? 0 : Math.abs(key.hashCode()) % numPartitions,new KeySelector<Tuple2<String, Integer>,String>() {@Overridepublic String getKey(Tuple2<String, Integer> value) throws Exception {return value.f0;}}).map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {return new Tuple2<>(value.f0.toLowerCase(), value.f1);}})//根据key,将数据发送到不同subTasks.keyBy((value -> value.f0)).reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {return new Tuple2<>(value1.f0, value1.f1 + value2.f1);}}).setParallelism(2).shuffle().print().setParallelism(2);System.out.println(env.getExecutionPlan());}
}
- 执行计划
flink 执行计划、数据传输策略相关推荐
- oracle稳定执行计划1
稳定执行计划 1 策略: Oracle的sql 执行计划在一些场景下会发生变化,导致系统会发生不可知的情况,影响系统的稳定性,特别是关键业务的sql. 比如下面的场景: 统计信息过老,重新收集了统计信 ...
- PostgreSQL查询当前执行中SQL的执行计划——pg_show_plans
点击上方"蓝字" 关注我们,享更多干货! 执行计划存储 如果同样的SQL要执行很多遍,且每次都是同样的执行计划.每次都发生硬解析,则会消耗大量时间.类似于Oracle存放执行计划的 ...
- Flink 流处理概念:Dataflow编程、执行图、并行度、数据传输策略、任务链
文章目录 Dataflow编程 执行图 并行度 数据传输策略 任务链 Dataflow编程 顾名思义,Dataflow程序描述了数据如何在不同操作之间流动.Dataflow程序通常表现为有向无环图(D ...
- oracle分区表执行计划分区合并,利用ORACLE分区技术提高管理和性能_PART2
接PART1:http://blog.chinaunix.net/uid/7655508.html 11g interval分区: 1)11g之前创建日期范围分区,经常是预先创建一部分,等即将用完重新 ...
- oracle更新统计信息执行计划,为准确生成执行计划更新统计信息-analyze与dbms_stats...
如果我们想让CBO利用合理利用数据的统计信息,正确判断执行任何SQL查询时的最快途径,需要及时的使用analyze命令或者dbms_stats重新统计数据的统计信息. 例如索引跳跃式扫描(INDEX ...
- 1.18.2.10 解释表:Table.explain、物理执行计划等
1.18.2.10.解释表 Table API 提供了一种机制来解释计算 Table 的逻辑和优化查询计划. 这是通过 Table.explain() 方法或者 StatementSet.explai ...
- mysql 执行计划不对_关于mysql主从查询执行计划不一致问题的分析
最近面试过程中被面试官抛了一个问题,说曾经有一个线上出现的奇怪的问题,主库和从库各种配置是一致的,当数据量比较大的时候,某些时候同样的查询,在从库里的执行计划执行成功了,而主库里没有执行这个执行计划, ...
- MySQL调优(八):查缺补漏(mysql的锁机制,读写分离,执行计划详解,主从复制原理)
mysql的锁机制 1.MySQL锁的基本介绍 锁是计算机协调多个进程或线程并发访问某一资源的机制.在数据库中,除传统的 计算资源(如CPU.RAM.I/O等)的争用以外,数据也是一种供许多用户共 ...
- DRDS分布式SQL引擎—执行计划介绍
摘要: 本文着重介绍 DRDS 执行计划中各个操作符的含义,以便用户通过查询计划了解 SQL 执行流程,从而有针对性的调优 SQL. DRDS分布式SQL引擎 - 执行计划介绍 前言 数据库系统中,执 ...
最新文章
- 经验 | 秋招总结(拼多多,腾讯,百度,字节)
- (一)检测浏览器是否支持websocket
- android中完全退出当前应用程序的四种方法
- Spring+XFire WS-Security安全认证开发感悟
- python字符串类型_python字符串类型介绍
- Oracle-物化视图
- c++数字金字塔_“资金管理是投资最大的秘密”(超级干货),一生死记“金字塔加仓减仓法”,最安全稳健的操盘法方式!...
- centos树莓派安装mysql_树莓派3B+安装CentOS7
- linux安装配置java,Linux 安装配置 java 环境
- 修改 “嗨加游-Prefix.pch” 或者 “嗨加游-Info.plist ” 方法
- 让电脑提速的小方法-----QoS数据计划程序
- C#实现超长位整数运算
- eclipse android 服务端,Eclipse搭建服务器,实现与Android的简单通信
- 解决安装vc2005运行库时提示Command line option syntax error.Type Command/?for Help
- Windows AD域管理软件详解
- win10摄像头可以用计算机里不显示,win10打开计算机如何显示摄像头
- 阿里云ACP云计算认证有用吗?
- 强化学习过程中对产生的无效动作应该如何进行屏蔽处理?(强化学习中可变的动作空间怎么处理)
- 计算机测试 原理是什么,rtk的测量原理和工作步骤是什么?
- flutter - 使用 SingleChildScrollView() 解决键盘遮挡输入框的问题