文章目录

  • 01 引言
  • 02 累加器
    • 2.1 相关API
    • 2.2 示例代码
  • 03 广播变量
    • 3.1 原理
    • 3.2 示例代码
  • 04 分布式缓存
    • 4.1 原理
    • 4.2 示例代码
  • 05 文末

01 引言

在前面的博客,我们已经对Flink的程序模型里的Connectors使用有了一定的了解了,有兴趣的同学可以参阅下:

  • 《Flink教程(01)- Flink知识图谱》
  • 《Flink教程(02)- Flink入门》
  • 《Flink教程(03)- Flink环境搭建》
  • 《Flink教程(04)- Flink入门案例》
  • 《Flink教程(05)- Flink原理简单分析》
  • 《Flink教程(06)- Flink批流一体API(Source示例)》
  • 《Flink教程(07)- Flink批流一体API(Transformation示例)》
  • 《Flink教程(08)- Flink批流一体API(Sink示例)》
  • 《Flink教程(09)- Flink批流一体API(Connectors示例)》

到此,我们把Flink批流一体的API大致学完了,还剩余一些其它API没有讲,本文来讲解下。

02 累加器

2.1 相关API

Flink累加器Flink中的累加器,与Mapreduce counter的应用场景类似,可以很好地观察task在运行期间的数据变化,如在Flink job任务中的算子函数中操作累加器,在任务执行结束之后才能获得累加器的最终结果。

Flink有以下内置累加器,每个累加器都实现了Accumulator接口。

  • IntCounter
  • LongCounter
  • DoubleCounter

编码步骤:

1.创建累加器

private IntCounter numLines = new IntCounter();

2.注册累加器

getRuntimeContext().addAccumulator("num-lines", this.numLines);

3.使用累加器

this.numLines.add(1);

4.获取累加器的结果

myJobExecutionResult.getAccumulatorResult("num-lines")

2.2 示例代码

/*** 累加器** @author : YangLinWei* @createTime: 2022/3/7 5:36 下午*/
public class Accumulator {public static void main(String[] args) throws Exception {//1.envExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//2.SourceDataSource<String> dataDS = env.fromElements("aaa", "bbb", "ccc", "ddd");//3.TransformationMapOperator<String, String> result = dataDS.map(new RichMapFunction<String, String>() {//-1.创建累加器private IntCounter elementCounter = new IntCounter();Integer count = 0;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);//-2注册累加器getRuntimeContext().addAccumulator("elementCounter", elementCounter);}@Overridepublic String map(String value) throws Exception {//-3.使用累加器this.elementCounter.add(1);count += 1;System.out.println("不使用累加器统计的结果:" + count);return value;}}).setParallelism(2);//4.Sinkresult.writeAsText("data/output/test", FileSystem.WriteMode.OVERWRITE);//5.execute//-4.获取加强结果JobExecutionResult jobResult = env.execute();int nums = jobResult.getAccumulatorResult("elementCounter");System.out.println("使用累加器统计的结果:" + nums);}
}

运行结果:

03 广播变量

3.1 原理

Flink支持广播:可以将数据广播到TaskManager上就可以供TaskManager中的SubTask/task去使用,数据存储到内存中,这样可以减少大量的shuffle操作,而不需要多次传递给集群节点;

比如:在数据join阶段,不可避免的就是大量的shuffle操作,我们可以把其中一个dataSet广播出去,一直加载到taskManager的内存中,可以直接在内存中拿数据,避免了大量的shuffle,导致集群性能下降;

图解:

  • 可以理解广播就是一个公共的共享变量
  • 将一个数据集广播后,不同的Task都可以在节点上获取到
  • 每个节点只存一份
  • 如果不使用广播,每一个Task都会拷贝一份数据集,造成内存资源浪费


注意:

  • 广播变量是要把dataset广播到内存中,所以广播的数据量不能太大,否则会出现OOM
  • 广播变量的值不可修改,这样才能确保每个节点获取到的值都是一致的。

编码步骤:

  1. 广播数据:.withBroadcastSet(DataSet, "name");
  2. 获取广播的数据:Collection<> broadcastSet = getRuntimeContext().getBroadcastVariable("name");
  3. 使用广播数据

3.2 示例代码

需求:将studentDS(学号,姓名)集合广播出去(广播到各个TaskManager内存中)
然后使用scoreDS(学号,学科,成绩)和广播数据(学号,姓名)进行关联,得到这样格式的数据:(姓名,学科,成绩)

/*** Broadcast** @author : YangLinWei* @createTime: 2022/3/7 5:43 下午*/
public class Broadcast {public static void main(String[] args) throws Exception {//1.envExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//2.Source//学生数据集(学号,姓名)DataSource<Tuple2<Integer, String>> studentDS = env.fromCollection(Arrays.asList(Tuple2.of(1, "张三"), Tuple2.of(2, "李四"), Tuple2.of(3, "王五")));//成绩数据集(学号,学科,成绩)DataSource<Tuple3<Integer, String, Integer>> scoreDS = env.fromCollection(Arrays.asList(Tuple3.of(1, "语文", 50), Tuple3.of(2, "数学", 70), Tuple3.of(3, "英文", 86)));//3.Transformation//将studentDS(学号,姓名)集合广播出去(广播到各个TaskManager内存中)//然后使用scoreDS(学号,学科,成绩)和广播数据(学号,姓名)进行关联,得到这样格式的数据:(姓名,学科,成绩)MapOperator<Tuple3<Integer, String, Integer>, Tuple3<String, String, Integer>> result = scoreDS.map(new RichMapFunction<Tuple3<Integer, String, Integer>, Tuple3<String, String, Integer>>() {//定义一集合用来存储(学号,姓名)Map<Integer, String> studentMap = new HashMap<>();//open方法一般用来初始化资源,每个subtask任务只被调用一次@Overridepublic void open(Configuration parameters) throws Exception {//-2.获取广播数据List<Tuple2<Integer, String>> studentList = getRuntimeContext().getBroadcastVariable("studentInfo");for (Tuple2<Integer, String> tuple : studentList) {studentMap.put(tuple.f0, tuple.f1);}//studentMap = studentList.stream().collect(Collectors.toMap(t -> t.f0, t -> t.f1));}@Overridepublic Tuple3<String, String, Integer> map(Tuple3<Integer, String, Integer> value) throws Exception {//-3.使用广播数据Integer stuID = value.f0;String stuName = studentMap.getOrDefault(stuID, "");//返回(姓名,学科,成绩)return Tuple3.of(stuName, value.f1, value.f2);}//-1.广播数据到各个TaskManager}).withBroadcastSet(studentDS, "studentInfo");//4.Sinkresult.print();}
}

运行结果:

04 分布式缓存

4.1 原理

Flink提供了一个类似于Hadoop的分布式缓存,让并行运行实例的函数可以在本地访问,这个功能可以被使用来分享外部静态的数据,例如:机器学习的逻辑回归模型等。

注意 :广播变量是将变量分发到各个TaskManager节点的内存上,分布式缓存是将文件缓存到各个TaskManager节点上;

编码步骤:

  1. 注册一个分布式缓存文件:`env.registerCachedFile(“hdfs:///path/file”, “cachefilename”)
  2. 访问分布式缓存文件中的数据:File myFile = getRuntimeContext().getDistributedCache().getFile("cachefilename");
  3. 使用

4.2 示例代码

需求:将scoreDS(学号, 学科, 成绩)中的数据和分布式缓存中的数据(学号,姓名)关联,得到这样格式的数据: (学生姓名,学科,成绩)

/*** DistributedCache** @author : YangLinWei* @createTime: 2022/3/7 5:49 下午*/
public class DistributedCache {public static void main(String[] args) throws Exception {//1.envExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//2.Source//注意:先将本地资料中的distribute_cache_student文件上传到HDFS//-1.注册分布式缓存文件//env.registerCachedFile("hdfs://node01:8020/distribute_cache_student", "studentFile");env.registerCachedFile("data/input/distribute_cache_student", "studentFile");//成绩数据集(学号,学科,成绩)DataSource<Tuple3<Integer, String, Integer>> scoreDS = env.fromCollection(Arrays.asList(Tuple3.of(1, "语文", 50), Tuple3.of(2, "数学", 70), Tuple3.of(3, "英文", 86)));//3.Transformation//将scoreDS(学号, 学科, 成绩)中的数据和分布式缓存中的数据(学号,姓名)关联,得到这样格式的数据: (学生姓名,学科,成绩)MapOperator<Tuple3<Integer, String, Integer>, Tuple3<String, String, Integer>> result = scoreDS.map(new RichMapFunction<Tuple3<Integer, String, Integer>, Tuple3<String, String, Integer>>() {//定义一集合用来存储(学号,姓名)Map<Integer, String> studentMap = new HashMap<>();@Overridepublic void open(Configuration parameters) throws Exception {//-2.加载分布式缓存文件File file = getRuntimeContext().getDistributedCache().getFile("studentFile");List<String> studentList = FileUtils.readLines(file);for (String str : studentList) {String[] arr = str.split(",");studentMap.put(Integer.parseInt(arr[0]), arr[1]);}}@Overridepublic Tuple3<String, String, Integer> map(Tuple3<Integer, String, Integer> value) throws Exception {//-3.使用分布式缓存文件中的数据Integer stuID = value.f0;String stuName = studentMap.getOrDefault(stuID, "");//返回(姓名,学科,成绩)return Tuple3.of(stuName, value.f1, value.f2);}});//4.Sinkresult.print();}
}

05 文末

本文主要讲解了Flink批流一体的其它API,即累加器、广播和分布式缓存,谢谢大家的阅读,本文完!

Flink教程(10)- Flink批流一体API(其它)相关推荐

  1. Flink教程(06)- Flink批流一体API(Source示例)

    文章目录 01 引言 02 Source 2.1 基于集合的Source 2.2 基于文件的Source 2.3 基于Socket的Source 2.4 自定义Source 2.4.1 案例 - 随机 ...

  2. Flink教程(07)- Flink批流一体API(Transformation示例)

    文章目录 01 引言 02 Transformation 2.1 基本操作 2.1.1 API 解析 2.1.2 示例代码 2.2 合并 2.2.1 union 2.2.2 connect 2.2.3 ...

  3. Flink教程(09)- Flink批流一体API(Connectors示例)

    文章目录 01 引言 02 Connectors 2.1 Flink目前支持的Connectors 2.2 JDBC案例 2.3 Kafa案例 2.3.1 Kafa相关命令 2.3.2 Kafka C ...

  4. Flink 和 Pulsar 的批流融合

    简介:如何通过 Apache Pulsar 原生的存储计算分离的架构提供批流融合的基础,以及 Apache Pulsar 如何与 Flink 结合,实现批流一体的计算. 简介:StreamNative ...

  5. Flink 1.11 与 Hive 批流一体数仓实践

    导读:Flink 从 1.9.0 开始提供与 Hive 集成的功能,随着几个版本的迭代,在最新的 Flink 1.11 中,与 Hive 集成的功能进一步深化,并且开始尝试将流计算场景与Hive 进行 ...

  6. [数据湖] 基于flink hudi的批流一体实践

    1.业务背景介绍 广告主和代理商通过广告投放平台来进行广告投放,由多个媒介进行广告展示 ,从而触达到潜在用户.整个过程中会产生各种各样的数据,比如展现数据.点击数据.其中非常重要的数据是计费数据,以计 ...

  7. hive表ddl导出_Flink 1.11 与 Hive 批流一体数仓实践

    简介:Flink 从 1.9.0 开始提供与 Hive 集成的功能,随着几个版本的迭代,在最新的 Flink 1.11 中,与 Hive 集成的功能进一步深化,并且开始尝试将流计算场景与Hive 进行 ...

  8. 开源 | 全球首个批流一体机器学习平台 Alink

    背景 随着大数据时代的到来和人工智能的崛起,机器学习所能处理的场景更加广泛和多样.构建的模型需要对批量数据进行处理,为了达到实时性的要求还需要直接对流式数据进行实时预测,还要具备将模型应用在企业应用和 ...

  9. 袋鼠云批流一体分布式同步引擎ChunJun(原FlinkX)的前世今生

      一.前言 ChunJun(原FlinkX)是一个基于Flink提供易用.稳定.高效的批流统一的数据集成工具,是袋鼠云一站式大数据开发平台-数栈DTinsight的核心计算引擎,其技术架构基于实时计 ...

最新文章

  1. BBI:Eran Elinav组综述在微生物组研究中使用宏转录组
  2. 同一Linux下起两台Mysql Server
  3. 论文阅读:Uncertainty-aware Joint Salient Object and Camouflaged Object Detection
  4. mysql 记录操作_MySQL 记录操作
  5. MySQL数据库事务实例(模拟银行转账)
  6. arduino控制步进电机和舵机
  7. Linux驱动里的wmb函数
  8. Ajax 异步显示订单详情总结
  9. 深度linux iso镜像,深度 Deepin 15 正式版 ISO 镜像下载 - 精美易用适合国人学习的国产 Linux 发行版......
  10. 工厂模式及在项目中的应用
  11. retina屏 适配问题
  12. 郑职院官计算机网络,2020年陕西省青年职业技能大赛计算机网络管理员决赛开幕式在汉中职院举行...
  13. 计算机控制字如何使用,字由怎么用?一款软件管理1594种字体 很多设计大神都在用...
  14. JavaScript数组内置方法-知识
  15. android obb在哪,obb是什么文件 obb文件怎么用
  16. h264基础知识梳理
  17. 不得不说,这是青铜才需要了解的,绝地求生刺激战场段位保护解析
  18. flask之人工智能
  19. 如何开展数据安全风险评估(参考)
  20. SpringBoot 实现邮件发送功能

热门文章

  1. iHRM 人力资源管理系统_第8章POI报表高级应用
  2. 性能工具之Taurus
  3. 员工福利待遇包括哪些方面
  4. 计算机路由器工作原理,路由器工作原理
  5. 怎么在桌面上显示计算机和控制面板图标,W7系统怎么在桌面上显示控制面板图标...
  6. 使用鼠标从Matplotlib显示的图像中取点,画框
  7. geoip是什么 linux_GeoIP安装使用
  8. heidiSQL怎么连接JAVA_Heidisql连接sql server后如何使用-Heidisql使用教程 - 河东软件园...
  9. 吾爱破解crackme 065-070
  10. 曲线拟合问题与L2正则