Flink之DataSet迭代计算
目录
- (1)迭代计算分类与原理
- (2)全量迭代计算详解
- (2.1)案例分析
- (2.2)案例实战
- (3)增量迭代计算详解
(1)迭代计算分类与原理
迭代计算在批量数据处理过程中的应用非常广泛,如常用的机器学习算法Kmeans、逻辑回归,以及图形计算等,都会用到迭代计算。DataSet API对迭代计算功能的支持相对比较完善,在性能上比较其他分布式计算框架也具有非常高的优势。目前Flink中的迭代计算种类有两种模式,分别是Bulk Iteration (全量迭代计算)和 Delt Iteration(增量迭代计算)
什么是迭代运算?
所谓迭代运算,就是给定一个初值,用所给的算法公式计算初值得到一个中间结果,然后将中间结果作为输入参数进行反复计算,在满足一定条件的时候得到计算结果。
(2)全量迭代计算详解
Bulk Iteration
这种迭代方式称为全量迭代,它会将整个数据输入,经过一定的迭代次数
全量迭代计算,一共有几个步骤:
- 首先初始化数据,可以通过从DataSource算子中获取,也可以从其他转化Operators中接入
- 其次定义Step Function,并在每一步迭代过程使用Step Function,结合数据集以及上一次迭代计算的Solution数据集,进行本次迭代计算。
- 每一次迭代过程中Step Function输出的结果,被称为Next Partital Solution数据集,该结果会作为下一次迭代计算的输入数据集。
- 最后一次迭代计算的结果输出,可以通过DataSink输出,或者接入到下一个0perators中。
迭代终止的条件有两种,分别为达到最大迭代次数或者符合自定义聚合器收敛条件:
- 最大迭代次数:指定迭代的最大次数,当计算次数超过该设定值是,终止迭代
- 自定义收敛条件:用户自定义的聚合器和收敛条件,例如终止条件设定为当Sum统计结果小于零则终止,否则继续迭代。
(2.1)案例分析
(2.2)案例实战
全量迭代计算通过使用DataSet的iterate()方法调用:
package com.aikfk.flink.dataset.iteration;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;/*** @author :caizhengjie* @description:TODO* @date :2021/3/9 5:59 下午*/
public class PiIterator {public static void main(String[] args) throws Exception {// 准备环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 创建初始IterativeDataSetIterativeDataSet<Integer> initial = env.fromElements(0).iterate(10000);DataSet<Integer> iteration = initial.map(new MapFunction<Integer, Integer>() {@Overridepublic Integer map(Integer integer) throws Exception {double x = Math.random();double y = Math.random();return integer + ((x * x + y * y <= 1 ) ? 1 : 0);}});// 计算出点的距离小于一的个数DataSet<Integer> count = initial.closeWith(iteration);// 求出PIDataSet<Double> result = count.map(new MapFunction<Integer, Double>() {@Overridepublic Double map(Integer count) throws Exception {return count / (double) 10000 * 4;}});result.print();/*** 3.146*/}
}
(3)增量迭代计算详解
增量迭代是通过部分计算取代全量计算,在计算过程中会将数据集分为热点数据和非热点数据集,每次迭代计算会针对热点数据展开,这种模式适合用于数据量比较大的计算场景,不需要对全部的数据集进行计算,所以在性能和速度上都会有很大的提升。
以上内容仅供参考学习,如有侵权请联系我删除!
如果这篇文章对您有帮助,左下角的大拇指就是对博主最大的鼓励。
您的鼓励就是博主最大的动力!
Flink之DataSet迭代计算相关推荐
- Flink入门——DataSet Api编程指南
简介: Flink入门--DataSet Api编程指南 Apache Flink 是一个兼顾高吞吐.低延迟.高性能的分布式处理框架.在实时计算崛起的今天,Flink正在飞速发展.由于性能的优势和兼顾 ...
- 【08】Flink 之 DataSet API(二):Transformation 操作
1.DataSet Transformation 部分详解 Map:输入一个元素,然后返回一个元素,中间可以做一些清洗转换等操作 FlatMap:输入一个元素,可以返回零个,一个或者多个元素 MapP ...
- flink的dataset/stream/sql三套API的选择以及是否应该阅读源码
我常常在钉钉群群里面请教,群里也有阿里P7/P8的专家. 但是每当我请教dataset/datastream相关问题的时候,即使是专家也没有响应. 钉钉群里面P7的是云邪,擅长使用的也是flink s ...
- Flink实战(五) Batch(DataSet) 运算的相关概念
文章目录 1. 示例程序 2. 转换函数 3. 数据源 配置CSV解析规则 递归读取输入路径目录 读取压缩文件 4. 数据输出 (Sink) 5. 迭代操作 Bulk Iterations(**批量迭 ...
- flink dataset api使用及原理
随着大数据技术在各行各业的广泛应用,要求能对海量数据进行实时处理的需求越来越多,同时数据处理的业务逻辑也越来越复杂,传统的批处理方式和早期的流式处理框架也越来越难以在延迟性.吞吐量.容错能力以及使用便 ...
- Flink 1.7.2 dataset transformation 示例
Flink 1.7.2 dataset transformation 示例 源码 https://github.com/opensourceteams/flink-maven-scala 概述 Fli ...
- 五万字 | Flink知识体系保姆级总结
本文目录: 一.Flink简介 二.Flink 部署及启动 三.Flink 运行架构 四.Flink 算子大全 五.流处理中的 Time 与 Window 六.Flink 状态管理 七.Flink 容 ...
- flink与spark的区别----阅读笔记1
Flink简介 spark基本架构 flink基本架构 Spark提出的最主要抽象概念是弹性分布式数据集(RDD) flink支持增量迭代计算.基于流执行引擎,Flink提供了诸多更高抽象层的API以 ...
- Apache 流框架 Flink,Spark Streaming,Storm
1.Flink架构及特性分析 Flink是个相当早的项目,开始于2008年,但只在最近才得到注意.Flink是原生的流处理系统,提供high level的API.Flink也提供 API来像Spark ...
最新文章
- 播放失败246106异常代码_web前端面试题:您能读懂的Promise源码实现(手写代码)...
- RocketMQ各种集群模式介绍
- ONENET读取与控制麒麟座MINI开发板LED状态
- 判断程序是否已经运行
- Net4.0---对HTML净化的处理
- 面试官问我:如何减少客户对交付成果的质疑
- ReactNative入门之android与rn初始化参数的传递
- String.GetEnumerator 方法的C#例子
- linux maven安装与入门
- [论文总结] 智慧农业论文摘要阅读概览
- 清华大学计算机杜瑜皓,立足改革、多元择优为清华选拔有理想有才能的优秀学子-清华大学新闻网...
- Photoshop CS6安装教程
- ssms远程服务器地址,SSMS无法连接到远程服务器,崩溃
- 读《富爸爸,穷爸爸》后感(三)
- wps表格怎么按特定的顺序对数据进行排序
- 计算机网络量化噪音是怎么消除的,数字图像噪声消除算法研究(可编辑).doc
- python获取路由器数据包pppoe_利用PPPOE获取路由器中宽带账号密码
- 浅谈《数学之美》①——自然语言处理
- 什么是垃圾回收机制(GC)
- 怎么修改文件创建时间?