算子链: 为方便执行,Flink将不同算子的子任务(subtask)链接到一个任务里,每一个任务在一个线程中执行。这是一个非常有用的优化方式,它减小了进程间数据交换和缓存的开销,而且在减少延迟同时增加了吞吐量。

一、禁用链

env.disableoperatorchaining

二、开启新链

package cn._51doit.flink.day05;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class StartNewChainDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());//并行度就是1DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] words = value.split(",");for (String word : words) {out.collect(word);}}});SingleOutputStreamOperator<String> filtered = words.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String value) throws Exception {return !value.startsWith("error");}}).startNewChain(); //从当前算子filter开始,开启一个新链SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = filtered.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return Tuple2.of(value, 1);}});SingleOutputStreamOperator<Tuple2<String, Integer>> summed = wordAndOne.keyBy(t -> t.f0).sum(1);summed.print();env.execute();}
}

三、断开链

package cn._51doit.flink.day05;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class DisableChainingDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());//并行度就是1DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] words = value.split(",");for (String word : words) {out.collect(word);}}});SingleOutputStreamOperator<String> filtered = words.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String value) throws Exception {return !value.startsWith("error");}}).disableChaining(); //将该算子前后的链都断开SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = filtered.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return Tuple2.of(value, 1);}});SingleOutputStreamOperator<Tuple2<String, Integer>> summed = wordAndOne.keyBy(t -> t.f0).sum(1);summed.print();env.execute();}
}

共享资源槽: Flink并不是将task合并,而是上游的task和下游的task可以共享一个槽位,所以Flink需要使用多少资源和task的数量没有关系,而是和节点的最大并行度有关系,因为有几个并行度就需要几个槽位。

上图是没有采用sharing slot的情况,可见2个TaskManager只能使用两个并行,但若是换成sharing slot,则结果就大不一样,如下:

由图可明显看出,同样的slot数,使用sharing slot的情况并行度由2提高到6,这使得效率大大提高。

四、设置共享资源槽

package cn._51doit.flink.day05;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class SetSharingGroupDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//并行度就是1DataStreamSource<String> lines = env.socketTextStream(args[0], 8888);SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] words = value.split(",");for (String word : words) {out.collect(word);}}});SingleOutputStreamOperator<String> filtered = words.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String value) throws Exception {return !value.startsWith("error");}}).setParallelism(2).disableChaining().slotSharingGroup("doit");//从这个算子开始,后期的task的共享资源槽的名称都是doit(就近原则)SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = filtered.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return Tuple2.of(value, 1);}}).setParallelism(2).slotSharingGroup("default");SingleOutputStreamOperator<Tuple2<String, Integer>> summed = wordAndOne.keyBy(t -> t.f0).sum(1).setParallelism(2).slotSharingGroup("default");;summed.print().setParallelism(2).slotSharingGroup("default");;env.execute();}
}

大数据之flink共享资源槽相关推荐

  1. 手把手教你搭建实时大数据引擎FLINK

    手把手教你搭建实时大数据引擎FLINK 服务器规划 Standalone高可用HA模式 架构图 下载并上传tar包 具体安装步骤 yarm 集群环境搭建 服务器规划 服务器规划 服务名称 职能 zhe ...

  2. Flink - 尚硅谷- 大数据高级 Flink 技术精讲 - 2

    七.Flink 时间语义与 Watermark 7.1 Flink 中的时间语义 7.2 设置 Event Time 7.3 水位线 - Watermark 7.3.1 基本概念 7.3.2 Wate ...

  3. 大数据与人工智能:学习资源收藏

    大数据与人工智能学习资源收藏 Hadoop应用架构.pdf: https://itdocs.pipipan.com/fs/3843664-360663708 Hadoop数据分析.pdf: https ...

  4. 大数据之flink教程

    第一章 Flink简介 1.1  初识Flink Flink起源于Stratosphere项目,Stratosphere是在2010~2014年由3所地处柏林的大学和欧洲的一些其他的大学共同进行的研究 ...

  5. 深度解读!新一代大数据引擎Flink厉害在哪?(附实现原理细节)

    导语 | 大数据计算分为离线计算和实时计算,其中离线计算就是我们通常说的批计算,代表技术是Hadoop MapReduce.Hive等:实时计算也被称作流计算,代表技术是Storm.Spark Str ...

  6. 大数据入门--Flink(四)状态管理与容错机制

    状态管理与容错机制 术语 状态管理 容错机制 状态一致性 检查点(checkpoint) 保存点(savepoint) 状态后端(state backend) 案例 术语 算子状态.键控状态.状态一致 ...

  7. 要学就学最难!附1T大数据免费学习全套资源!

    大数据广泛应用于电网运行.经营管理及优质服务等各大领域,并正在改变着各行各业,也引领了大数据人才的变革.大数据就业前景怎么样?这对于在就业迷途中的我们是一个很重要的信息. 一.大数据人才需求及现状分析 ...

  8. 大数据时代:公共数据资源开放至关重要

    进入大数据时代,大数据带来的巨大价值逐渐凸显,围绕大数据的竞争也愈演愈烈,大数据时代的竞争,将是数据资产的竞争. 数据资产怎么获得?这是大数据商用化进程中一个不可回避的问题.除了企业自身的积累乃至付费 ...

  9. 大数据之Flink流式计算引擎

    Flink DataFlow 数据的分类 有界数据 离线的计算 效率:硬件相同的情况下:时间 无界数据 实时的计算 效率:硬件环境相同的情况下,吞吐量:数据处理的数量.延迟:结果输出的时间-数据接收的 ...

最新文章

  1. 用递归法计算斐波那契数列的第n项
  2. xshell简单命令
  3. java 函数内部类_java 内部类详解 转
  4. Faster_RCNN 4.训练模型
  5. Express请求处理-静态资源的处理
  6. hash hashcode变化_没想到 Hash 冲突还能这么玩,你的服务中招了吗?
  7. 浅谈Mysql 表设计规范
  8. python画roc曲线_使用Python画ROC曲线以及AUC值
  9. SVN的trunk branch tag
  10. log4j日志输出性能优化
  11. cocos2dx3.2 画图方法小修改之 C++ final学习
  12. Gym 100796B Wet Boxes(思维)题解
  13. RadASM 颜色配置
  14. 什么是back annotation
  15. 手机号登录和微信登录
  16. MySQL--数据库基础知识点(一)
  17. 谷歌浏览器 翻译无法使用的问题
  18. verilog对信号二分频 时钟分频信号作为时钟使能信号
  19. Java核心技术卷一、二读书笔记(PDF)分享
  20. OE链面临区块链生态新机遇

热门文章

  1. queryWrapper对同一字段进行两次eq() 是覆盖还是叠加查询
  2. 如何将webp免费转改成jpg格式?
  3. 深入理解:scp,rsync,sftp,xsync等命令的基本使用方法,以及cmd命令窗口下进行相关的ssh命令操作
  4. 阿里巴巴面试资源汇总。
  5. 计算机信息加工是指什么作用,什么是信息加工信息加工的方式
  6. 全球疫情引起 P 站访问量激增
  7. Mapper method ‘com.LH.mapper.EmployeeMapper.insertEmployee‘ has an unsupported return type: cla 报错
  8. python如何获取传感器数据_连接获取传感器数据的几大方法
  9. g++ : 无法将“g++”项识别为 cmdlet、函数、脚本文件或可运行程序的 名称。请检查名称的拼写,如果包括路径,请确保路径正确,然后再试一 次。
  10. mysql timestamp 默认_MySQL数据库TIMESTAMP怎么设置默认值 | 学步园