大数据之flink共享资源槽
算子链: 为方便执行,Flink将不同算子的子任务(subtask)链接到一个任务里,每一个任务在一个线程中执行。这是一个非常有用的优化方式,它减小了进程间数据交换和缓存的开销,而且在减少延迟同时增加了吞吐量。
一、禁用链
env.disableoperatorchaining
二、开启新链
package cn._51doit.flink.day05;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class StartNewChainDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());//并行度就是1DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] words = value.split(",");for (String word : words) {out.collect(word);}}});SingleOutputStreamOperator<String> filtered = words.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String value) throws Exception {return !value.startsWith("error");}}).startNewChain(); //从当前算子filter开始,开启一个新链SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = filtered.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return Tuple2.of(value, 1);}});SingleOutputStreamOperator<Tuple2<String, Integer>> summed = wordAndOne.keyBy(t -> t.f0).sum(1);summed.print();env.execute();}
}
三、断开链
package cn._51doit.flink.day05;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class DisableChainingDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());//并行度就是1DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] words = value.split(",");for (String word : words) {out.collect(word);}}});SingleOutputStreamOperator<String> filtered = words.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String value) throws Exception {return !value.startsWith("error");}}).disableChaining(); //将该算子前后的链都断开SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = filtered.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return Tuple2.of(value, 1);}});SingleOutputStreamOperator<Tuple2<String, Integer>> summed = wordAndOne.keyBy(t -> t.f0).sum(1);summed.print();env.execute();}
}
共享资源槽: Flink并不是将task合并,而是上游的task和下游的task可以共享一个槽位,所以Flink需要使用多少资源和task的数量没有关系,而是和节点的最大并行度有关系,因为有几个并行度就需要几个槽位。
上图是没有采用sharing slot的情况,可见2个TaskManager只能使用两个并行,但若是换成sharing slot,则结果就大不一样,如下:
由图可明显看出,同样的slot数,使用sharing slot的情况并行度由2提高到6,这使得效率大大提高。
四、设置共享资源槽
package cn._51doit.flink.day05;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class SetSharingGroupDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//并行度就是1DataStreamSource<String> lines = env.socketTextStream(args[0], 8888);SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] words = value.split(",");for (String word : words) {out.collect(word);}}});SingleOutputStreamOperator<String> filtered = words.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String value) throws Exception {return !value.startsWith("error");}}).setParallelism(2).disableChaining().slotSharingGroup("doit");//从这个算子开始,后期的task的共享资源槽的名称都是doit(就近原则)SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = filtered.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return Tuple2.of(value, 1);}}).setParallelism(2).slotSharingGroup("default");SingleOutputStreamOperator<Tuple2<String, Integer>> summed = wordAndOne.keyBy(t -> t.f0).sum(1).setParallelism(2).slotSharingGroup("default");;summed.print().setParallelism(2).slotSharingGroup("default");;env.execute();}
}
大数据之flink共享资源槽相关推荐
- 手把手教你搭建实时大数据引擎FLINK
手把手教你搭建实时大数据引擎FLINK 服务器规划 Standalone高可用HA模式 架构图 下载并上传tar包 具体安装步骤 yarm 集群环境搭建 服务器规划 服务器规划 服务名称 职能 zhe ...
- Flink - 尚硅谷- 大数据高级 Flink 技术精讲 - 2
七.Flink 时间语义与 Watermark 7.1 Flink 中的时间语义 7.2 设置 Event Time 7.3 水位线 - Watermark 7.3.1 基本概念 7.3.2 Wate ...
- 大数据与人工智能:学习资源收藏
大数据与人工智能学习资源收藏 Hadoop应用架构.pdf: https://itdocs.pipipan.com/fs/3843664-360663708 Hadoop数据分析.pdf: https ...
- 大数据之flink教程
第一章 Flink简介 1.1 初识Flink Flink起源于Stratosphere项目,Stratosphere是在2010~2014年由3所地处柏林的大学和欧洲的一些其他的大学共同进行的研究 ...
- 深度解读!新一代大数据引擎Flink厉害在哪?(附实现原理细节)
导语 | 大数据计算分为离线计算和实时计算,其中离线计算就是我们通常说的批计算,代表技术是Hadoop MapReduce.Hive等:实时计算也被称作流计算,代表技术是Storm.Spark Str ...
- 大数据入门--Flink(四)状态管理与容错机制
状态管理与容错机制 术语 状态管理 容错机制 状态一致性 检查点(checkpoint) 保存点(savepoint) 状态后端(state backend) 案例 术语 算子状态.键控状态.状态一致 ...
- 要学就学最难!附1T大数据免费学习全套资源!
大数据广泛应用于电网运行.经营管理及优质服务等各大领域,并正在改变着各行各业,也引领了大数据人才的变革.大数据就业前景怎么样?这对于在就业迷途中的我们是一个很重要的信息. 一.大数据人才需求及现状分析 ...
- 大数据时代:公共数据资源开放至关重要
进入大数据时代,大数据带来的巨大价值逐渐凸显,围绕大数据的竞争也愈演愈烈,大数据时代的竞争,将是数据资产的竞争. 数据资产怎么获得?这是大数据商用化进程中一个不可回避的问题.除了企业自身的积累乃至付费 ...
- 大数据之Flink流式计算引擎
Flink DataFlow 数据的分类 有界数据 离线的计算 效率:硬件相同的情况下:时间 无界数据 实时的计算 效率:硬件环境相同的情况下,吞吐量:数据处理的数量.延迟:结果输出的时间-数据接收的 ...
最新文章
- 用递归法计算斐波那契数列的第n项
- xshell简单命令
- java 函数内部类_java 内部类详解 转
- Faster_RCNN 4.训练模型
- Express请求处理-静态资源的处理
- hash hashcode变化_没想到 Hash 冲突还能这么玩,你的服务中招了吗?
- 浅谈Mysql 表设计规范
- python画roc曲线_使用Python画ROC曲线以及AUC值
- SVN的trunk branch tag
- log4j日志输出性能优化
- cocos2dx3.2 画图方法小修改之 C++ final学习
- Gym 100796B Wet Boxes(思维)题解
- RadASM 颜色配置
- 什么是back annotation
- 手机号登录和微信登录
- MySQL--数据库基础知识点(一)
- 谷歌浏览器 翻译无法使用的问题
- verilog对信号二分频 时钟分频信号作为时钟使能信号
- Java核心技术卷一、二读书笔记(PDF)分享
- OE链面临区块链生态新机遇
热门文章
- queryWrapper对同一字段进行两次eq() 是覆盖还是叠加查询
- 如何将webp免费转改成jpg格式?
- 深入理解:scp,rsync,sftp,xsync等命令的基本使用方法,以及cmd命令窗口下进行相关的ssh命令操作
- 阿里巴巴面试资源汇总。
- 计算机信息加工是指什么作用,什么是信息加工信息加工的方式
- 全球疫情引起 P 站访问量激增
- Mapper method ‘com.LH.mapper.EmployeeMapper.insertEmployee‘ has an unsupported return type: cla 报错
- python如何获取传感器数据_连接获取传感器数据的几大方法
- g++ : 无法将“g++”项识别为 cmdlet、函数、脚本文件或可运行程序的 名称。请检查名称的拼写,如果包括路径,请确保路径正确,然后再试一 次。
- mysql timestamp 默认_MySQL数据库TIMESTAMP怎么设置默认值 | 学步园