Flink DataSet Transformations (Part 2)
Contents
- (8) Aggregate in Detail
- (9) Join in Detail
- (10) Union in Detail
(8) Aggregate in Detail
An Aggregate Function merges a group of element values into a single value. The built-in aggregations (Aggregations.SUM, Aggregations.MIN, Aggregations.MAX) operate on Tuple fields and can be applied to a grouped DataSet or to an entire DataSet.
Java implementation:
package com.aikfk.flink.dataset.transform;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @author caizhengjie
 */
public class AggregateJava {
    public static void main(String[] args) throws Exception {
        // Set up the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> dataSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop");

        // map: convert every line to upper case
        DataSet<String> mapSource = dataSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String line) throws Exception {
                return line.toUpperCase();
            }
        });

        // flatMap: split every line into (word, 1) tuples
        DataSet<Tuple2<String, Integer>> flatmapSource = mapSource.flatMap(
                new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector)
                            throws Exception {
                        for (String word : s.split(" ")) {
                            collector.collect(new Tuple2<>(word, 1));
                        }
                    }
                });

        // aggregate: group by the word (field 0) and sum the counts (field 1)
        DataSet<Tuple2<String, Integer>> aggregateSource = flatmapSource
                .groupBy(0)
                .aggregate(Aggregations.SUM, 1);

        aggregateSource.print();
        // (HIVE,2)
        // (HADOOP,1)
        // (JAVA,6)
        // (SPARK,2)
    }
}
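For readers without a Flink project set up, the semantics of `groupBy(0).aggregate(Aggregations.SUM, 1)` over `(word, 1)` tuples can be sketched in plain Java. This is a hand-rolled illustration of what the pipeline above computes, not Flink API code (class and method names here are made up for the example):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class AggregateSketch {
    // Mimics map -> flatMap -> groupBy(0).aggregate(SUM, 1):
    // upper-case each line, emit (word, 1), then sum counts per word.
    public static Map<String, Integer> wordCount(List<String> lines) {
        Map<String, Integer> sums = new TreeMap<>(); // TreeMap for deterministic order
        for (String line : lines) {
            for (String word : line.toUpperCase().split(" ")) {
                // "group by field 0, sum field 1"
                sums.merge(word, 1, Integer::sum);
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop");
        System.out.println(wordCount(lines));
        // {HADOOP=1, HIVE=2, JAVA=6, SPARK=2}
    }
}
```

The counts match the Flink program's output above; only the ordering differs, since Flink prints partitions in a nondeterministic order.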
(9) Join in Detail
Ways to express a join:
A join combines two datasets on a specified condition and builds a new dataset from the selected fields of each matching pair. The join key can be specified by a key expression, a key-selector function, field positions (for Tuples), or case class fields.
- Two Tuple datasets can be joined by field position: the field of the left dataset is given with where(), the field of the right dataset with equalTo():
dataSet_1.join(dataSet_2).where(0).equalTo(0)
where(field of the left dataset).equalTo(field of the right dataset)
- A custom Join Function can be supplied as part of the join (Scala syntax):
dataSet_1.join(dataSet_2).where(0).equalTo(0) {
    (left, right) => (left.id, right.name)
}
- joinWithTiny or joinWithHuge tell Flink the relative size of the second dataset:
dataSet_1.joinWithTiny(dataSet_2).where(0).equalTo(0)  // hints to Flink that the second dataset is very small
dataSet_1.joinWithHuge(dataSet_2).where(0).equalTo(0)  // hints to Flink that the second dataset is very large
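The key-selector option mentioned above can be illustrated in plain Java (a hand-rolled sketch, not the Flink API; all names here are invented for the example): a function extracts the join key from each element, and elements from the two datasets pair up when their keys match.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class KeySelectorJoinSketch {
    // Generic inner join: the two Function arguments play the role of key selectors.
    public static <L, R, K> List<String> join(
            List<L> left, List<R> right,
            Function<L, K> leftKey, Function<R, K> rightKey) {
        // Index the left dataset by its extracted key
        Map<K, List<L>> index = new HashMap<>();
        for (L l : left) {
            index.computeIfAbsent(leftKey.apply(l), k -> new ArrayList<>()).add(l);
        }
        // For each right element, emit one pair per matching left element
        List<String> out = new ArrayList<>();
        for (R r : right) {
            for (L l : index.getOrDefault(rightKey.apply(r), Collections.emptyList())) {
                out.add(l + "|" + r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> depts = Arrays.asList("100,tech", "200,market");
        List<String> emps = Arrays.asList("100,alex", "200,tony", "100,jack");
        // Key selectors: take the field before the comma on each side
        System.out.println(join(depts, emps,
                d -> d.split(",")[0], e -> e.split(",")[0]));
        // [100,tech|100,alex, 200,market|200,tony, 100,tech|100,jack]
    }
}
```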
Join optimization:
Flink also accepts join algorithm hints (JoinHint), which let it execute a join more flexibly and efficiently:
- Broadcast the first dataset and build it into a hash table; suitable when the first dataset is very small:
dataSet_1.join(dataSet_2, JoinHint.BROADCAST_HASH_FIRST).where(0).equalTo(0)
- Broadcast the second dataset and build it into a hash table; suitable when the second dataset is very small:
dataSet_1.join(dataSet_2, JoinHint.BROADCAST_HASH_SECOND).where(0).equalTo(0)
- Equivalent to setting no hint: leave the choice of strategy to the optimizer:
dataSet_1.join(dataSet_2, JoinHint.OPTIMIZER_CHOOSES).where(0).equalTo(0)
- Repartition both datasets and build the first into a hash table; suitable when the first dataset is smaller than the second, but both are relatively large:
dataSet_1.join(dataSet_2, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(0)
- Repartition both datasets and build the second into a hash table; suitable when the second dataset is smaller than the first, but both are relatively large:
dataSet_1.join(dataSet_2, JoinHint.REPARTITION_HASH_SECOND).where(0).equalTo(0)
- Repartition both datasets and sort each partition; suitable when both datasets are already sorted:
dataSet_1.join(dataSet_2, JoinHint.REPARTITION_SORT_MERGE).where(0).equalTo(0)
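The broadcast-hash strategies can be illustrated in plain Java (a hand-rolled sketch of the algorithm, not Flink's implementation; the names are invented for the example): the dataset named in the hint is materialized as an in-memory hash table (the build side), and the other dataset streams past it and probes for matches.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BroadcastHashJoinSketch {
    // Inner-joins (id, dept) rows with (id, name) rows on id.
    // The small dataset is hashed (build phase); the large one probes it.
    public static List<String> join(List<String[]> small, List<String[]> large) {
        // Build phase: hash table over the "broadcast" small dataset, keyed on field 0
        Map<String, String> buildSide = new HashMap<>();
        for (String[] row : small) {
            buildSide.put(row[0], row[1]);
        }
        // Probe phase: stream the large dataset and look up matches
        List<String> result = new ArrayList<>();
        for (String[] row : large) {
            String dept = buildSide.get(row[0]);
            if (dept != null) { // inner join: rows without a match are dropped
                result.add(row[0] + "," + dept + "," + row[1]);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String[]> dept = Arrays.asList(
                new String[]{"100", "tech"}, new String[]{"200", "market"});
        List<String[]> employee = Arrays.asList(
                new String[]{"100", "alex"}, new String[]{"200", "tony"},
                new String[]{"300", "cherry"});
        System.out.println(join(dept, employee));
        // [100,tech,alex, 200,market,tony]
    }
}
```

The build side costs memory proportional to the small dataset, while the probe side is a single pass, which is why broadcast-hash is only advisable when one input is genuinely small.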
Join implementation
Data sources:
dept.csv
100,技术部
200,市场部
300,营销部
400,采购部
employee.csv
100,alex,15000
100,jack,34000
200,tony,5000
200,jone,6700
300,cherry,12000
100,lili,8000
400,lucy,7800
Java implementation:
POJO class:
package com.aikfk.flink.base;

/**
 * @author caizhengjie
 */
public class EmployeePOJO {
    public String deptId;
    public String name;
    public int salary;

    public EmployeePOJO() {
    }

    public EmployeePOJO(String deptId, String name, int salary) {
        this.deptId = deptId;
        this.name = name;
        this.salary = salary;
    }

    @Override
    public String toString() {
        return "EmployeePOJO{" +
                "deptId='" + deptId + '\'' +
                ", name='" + name + '\'' +
                ", salary=" + salary +
                '}';
    }
}
package com.aikfk.flink.dataset.transform;

import com.aikfk.flink.base.EmployeePOJO;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

/**
 * @author caizhengjie
 */
public class JoinJava {
    public static void main(String[] args) throws Exception {
        // Set up the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read CSV data (option 2: map to a Tuple type with two fields)
        DataSet<Tuple2<String, String>> deptSource = env.readCsvFile(
                "/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/dept.csv")
                .types(String.class, String.class);
        deptSource.print();
        // (200,市场部)
        // (100,技术部)
        // (400,采购部)
        // (300,营销部)

        // Read CSV data (option 1: map to a POJO class)
        DataSet<EmployeePOJO> employeeSource = env.readCsvFile(
                "/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")
                .pojoType(EmployeePOJO.class, "deptId", "name", "salary");
        employeeSource.print();
        // EmployeePOJO{deptId='100', name='alex', salary=15000}
        // EmployeePOJO{deptId='200', name='jone', salary=6700}
        // EmployeePOJO{deptId='100', name='lili', salary=8000}
        // EmployeePOJO{deptId='400', name='lucy', salary=7800}
        // EmployeePOJO{deptId='300', name='cherry', salary=12000}
        // EmployeePOJO{deptId='200', name='tony', salary=5000}
        // EmployeePOJO{deptId='100', name='jack', salary=34000}

        // join() -> map()
        DataSet<Tuple3<String, String, String>> joinResult = deptSource.join(employeeSource)
                .where(0).equalTo("deptId")
                .map(new MapFunction<Tuple2<Tuple2<String, String>, EmployeePOJO>,
                        Tuple3<String, String, String>>() {
                    @Override
                    public Tuple3<String, String, String> map(
                            Tuple2<Tuple2<String, String>, EmployeePOJO> tuple2) throws Exception {
                        return new Tuple3<>(tuple2.f0.f0, tuple2.f0.f1, tuple2.f1.name);
                    }
                });
        joinResult.print();
        // (100,技术部,jack)
        // (100,技术部,alex)
        // (400,采购部,lucy)
        // (100,技术部,lili)
        // (300,营销部,cherry)
        // (200,市场部,jone)
        // (200,市场部,tony)
    }
}
Scala implementation:
package com.aikfk.flink.dataset.transform

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object JoinScala {

  case class Employee(deptId: String, name: String, salary: Int)

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Read CSV data (option 2: map to a Tuple type with two fields)
    val deptSource = env.readCsvFile[(String, String)](
      "/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/dept.csv")

    // Read CSV data (option 1: map to a case class)
    val employeeSource = env.readCsvFile[Employee](
      "/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")

    // join -> map
    val joinResult = deptSource.join(employeeSource).where(0).equalTo("deptId")
      .map(tuple2 => (tuple2._1._1, tuple2._1._2, tuple2._2.name))

    joinResult.print()
    // (400,采购部,lucy)
    // (100,技术部,jack)
    // (100,技术部,alex)
    // (100,技术部,lili)
    // (300,营销部,cherry)
    // (200,市场部,tony)
    // (200,市场部,jone)
  }
}
(10) Union in Detail
Union merges two DataSets; the element types of both datasets must be identical. More than two datasets can be merged by chaining union calls.
dataSet_1.union(dataSet_2)
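Note that, unlike SQL's UNION, Flink's union does not deduplicate: it simply concatenates the inputs (UNION ALL semantics; use distinct() afterwards if deduplication is needed). A plain-Java sketch of that behavior (illustration only, not Flink code):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class UnionSketch {
    // Concatenates two datasets, keeping duplicates, like Flink's union.
    public static <T> List<T> union(List<T> a, List<T> b) {
        List<T> out = new ArrayList<>(a);
        out.addAll(b);
        return out;
    }

    public static void main(String[] args) {
        List<String> r = union(Arrays.asList("a", "b"), Arrays.asList("b", "c"));
        System.out.println(r);
        // [a, b, b, c] -- "b" appears twice; no deduplication happens
    }
}
```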
Union implementation:
Data sources:
employee.csv
100,alex,15000
100,jack,34000
200,tony,5000
200,jone,6700
300,cherry,12000
100,lili,8000
400,lucy,7800
employee2.csv
100,zhang,1400
100,li,3500
200,liu,6000
200,cai,6800
300,wang,13000
100,cao,8900
400,peng,7800
POJO classes:
EmployeePOJO is the same class already shown in the Join section above.
package com.aikfk.flink.base;

/**
 * @author caizhengjie
 */
public class DeptSalaryPOJO {
    public String deptId;
    public int salary;

    public DeptSalaryPOJO() {
    }

    public DeptSalaryPOJO(String deptId, int salary) {
        this.deptId = deptId;
        this.salary = salary;
    }

    @Override
    public String toString() {
        return "DeptSalaryPOJO{" +
                "deptId='" + deptId + '\'' +
                ", salary=" + salary +
                '}';
    }
}
package com.aikfk.flink.base;

/**
 * @author caizhengjie
 */
public class DeptPOJO {
    public String deptId;
    public String name;

    public DeptPOJO() {
    }

    public DeptPOJO(String deptId, String name) {
        this.deptId = deptId;
        this.name = name;
    }

    @Override
    public String toString() {
        return "DeptPOJO{" +
                "deptId='" + deptId + '\'' +
                ", name='" + name + '\'' +
                '}';
    }
}
Java implementation:
package com.aikfk.flink.dataset.transform;

import com.aikfk.flink.base.DeptPOJO;
import com.aikfk.flink.base.DeptSalaryPOJO;
import com.aikfk.flink.base.EmployeePOJO;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

/**
 * @author caizhengjie
 */
public class UnionJava {
    public static void main(String[] args) throws Exception {
        // Set up the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read CSV data (map to POJO classes)
        DataSet<EmployeePOJO> employeeSource = env.readCsvFile(
                "/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")
                .pojoType(EmployeePOJO.class, "deptId", "name", "salary");

        DataSet<EmployeePOJO> employee2Source = env.readCsvFile(
                "/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee2.csv")
                .pojoType(EmployeePOJO.class, "deptId", "name", "salary");

        DataSet<DeptPOJO> deptSource = env.readCsvFile(
                "/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/dept.csv")
                .pojoType(DeptPOJO.class, "deptId", "name");

        // union() -> map() -> groupBy() -> reduce(): total salary per department
        DataSet<DeptSalaryPOJO> unionResult = employeeSource.union(employee2Source)
                .map(new MapFunction<EmployeePOJO, DeptSalaryPOJO>() {
                    @Override
                    public DeptSalaryPOJO map(EmployeePOJO employeePOJO) throws Exception {
                        return new DeptSalaryPOJO(employeePOJO.deptId, employeePOJO.salary);
                    }
                })
                .groupBy("deptId")
                .reduce(new ReduceFunction<DeptSalaryPOJO>() {
                    @Override
                    public DeptSalaryPOJO reduce(DeptSalaryPOJO t1, DeptSalaryPOJO t2)
                            throws Exception {
                        return new DeptSalaryPOJO(t1.deptId, t1.salary + t2.salary);
                    }
                });
        unionResult.print();
        // DeptSalaryPOJO{deptId='100', salary=70800}
        // DeptSalaryPOJO{deptId='400', salary=15600}
        // DeptSalaryPOJO{deptId='300', salary=25000}
        // DeptSalaryPOJO{deptId='200', salary=24500}

        // join() -> map(): attach the department name
        DataSet<Tuple3<String, String, Integer>> joinResult = unionResult.join(deptSource)
                .where("deptId").equalTo("deptId")
                .map(new MapFunction<Tuple2<DeptSalaryPOJO, DeptPOJO>,
                        Tuple3<String, String, Integer>>() {
                    @Override
                    public Tuple3<String, String, Integer> map(
                            Tuple2<DeptSalaryPOJO, DeptPOJO> tuple2) throws Exception {
                        return new Tuple3<>(tuple2.f0.deptId, tuple2.f1.name, tuple2.f0.salary);
                    }
                });
        joinResult.print();
        // (100,技术部,70800)
        // (400,采购部,15600)
        // (300,营销部,25000)
        // (200,市场部,24500)
    }
}
Scala implementation:
package com.aikfk.flink.dataset.transform

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object UnionScala {

  case class Employee(deptId: String, name: String, salary: Int)

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Read CSV data (map to a case class)
    val employeeSource = env.readCsvFile[Employee](
      "/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")

    val employeeSource2 = env.readCsvFile[Employee](
      "/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee2.csv")

    // Read CSV data (map to a Tuple type with two fields)
    val deptSource = env.readCsvFile[(String, String)](
      "/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/dept.csv")

    // union() -> map() -> groupBy() -> reduce(): total salary per department
    val unionResult = employeeSource.union(employeeSource2)
      .map(emp => (emp.deptId, emp.salary))
      .groupBy(0)
      .reduce((x, y) => (x._1, x._2 + y._2))

    unionResult.print()
    // (100,70800)
    // (400,15600)
    // (300,25000)
    // (200,24500)

    // join() -> map(): attach the department name
    val joinResult = unionResult.join(deptSource).where(0).equalTo(0)
      .map(tuple => (tuple._1._1, tuple._2._2, tuple._1._2))

    joinResult.print()
    // (400,采购部,15600)
    // (100,技术部,70800)
    // (300,营销部,25000)
    // (200,市场部,24500)
  }
}
Other DataSet transformation operators are documented on the official site:
- https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/dev/dataset/transformations/#distinct