Contents

  • (8) Aggregate in detail
  • (9) Join in detail
  • (10) Union in detail

(8) Aggregate in detail

An aggregate function merges a group of element values into a single value. Aggregations can be applied to grouped data, as below, or across an entire DataSet.

Java implementation:

package com.aikfk.flink.dataset.transform;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @author caizhengjie
 * @date 2021/3/7 8:26 PM
 */
public class AggregateJava {
    public static void main(String[] args) throws Exception {
        // set up the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> dataSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop");

        // map: convert each line to upper case
        DataSet<String> mapSource = dataSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String line) throws Exception {
                return line.toUpperCase();
            }
        });

        // flatMap: split each line into (word, 1) pairs
        DataSet<Tuple2<String, Integer>> flatmapSource =
                mapSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        });

        // aggregate: group by the word (field 0) and sum the counts (field 1)
        DataSet<Tuple2<String, Integer>> aggregateSource =
                flatmapSource.groupBy(0).aggregate(Aggregations.SUM, 1);

        aggregateSource.print();
        /*
         * (HIVE,2)
         * (HADOOP,1)
         * (JAVA,6)
         * (SPARK,2)
         */
    }
}
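Besides SUM, the built-in Aggregations include MIN and MAX, and several aggregations can be applied in one pass by chaining and(). A minimal sketch, assuming a hypothetical DataSet<Tuple3<String, Integer, Integer>> named stats holding (word, count, length) tuples:

// sum field 1 and, within the same grouping, take the minimum of field 2
DataSet<Tuple3<String, Integer, Integer>> result = stats
        .groupBy(0)
        .aggregate(Aggregations.SUM, 1)
        .and(Aggregations.MIN, 2);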

(9) Join in detail

Join associates two datasets on a given condition and forms a new dataset from selected fields. The join key can be specified by key expressions, key-selector functions (see the sketch after this list), field positions, or case-class fields.

There are several ways to write a join:

  1. Two Tuple datasets can be joined by field position: the key field of the left dataset is given with where(), the key field of the right dataset with equalTo()

dataSet_1.join(dataSet_2).where(0).equalTo(0)
where(<left key field>).equalTo(<right key field>)

  2. A custom join function can be supplied as part of the join (Scala syntax shown; the Java equivalent is with())

dataSet_1.join(dataSet_2).where(0).equalTo(0) {
  (left, right) => (left.id, right.name)
}

  3. joinWithTiny and joinWithHuge tell Flink how large the second dataset is

dataSet_1.joinWithTiny(dataSet_2).where(0).equalTo(0)   // hints that the second dataset is small
dataSet_1.joinWithHuge(dataSet_2).where(0).equalTo(0)   // hints that the second dataset is large
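To make the key-selector variant concrete, here is a minimal, self-contained Java sketch; the class name, dataset names, and inline sample data are invented for illustration:

package com.aikfk.flink.dataset.transform;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;

public class KeySelectorJoinJava {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, String>> dept = env.fromElements(
                new Tuple2<>("100", "tech"), new Tuple2<>("200", "market"));
        DataSet<Tuple2<String, Integer>> salary = env.fromElements(
                new Tuple2<>("100", 15000), new Tuple2<>("200", 5000));

        // key-selector functions extract the join key from each input
        dept.join(salary)
                .where(new KeySelector<Tuple2<String, String>, String>() {
                    @Override
                    public String getKey(Tuple2<String, String> d) throws Exception {
                        return d.f0;
                    }
                })
                .equalTo(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> s) throws Exception {
                        return s.f0;
                    }
                })
                .print();
        // prints ((100,tech),(100,15000)) and ((200,market),(200,5000)), order not guaranteed
    }
}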

Join optimization:

Flink accepts join algorithm hints that allow it to execute a join more flexibly and efficiently (a short usage sketch follows this list):

  1. Broadcast the first dataset and build a hash table from it; suited to cases where the first dataset is very small

dataSet_1.join(dataSet_2, JoinHint.BROADCAST_HASH_FIRST).where(0).equalTo(0)

  2. Broadcast the second dataset and build a hash table from it; suited to cases where the second dataset is very small

dataSet_1.join(dataSet_2, JoinHint.BROADCAST_HASH_SECOND).where(0).equalTo(0)

  3. The same as setting no hint at all: the choice of strategy is left to the optimizer

dataSet_1.join(dataSet_2, JoinHint.OPTIMIZER_CHOOSES).where(0).equalTo(0)

  4. Repartition both datasets and build a hash table from the first; suited to cases where the first dataset is smaller than the second but both are relatively large

dataSet_1.join(dataSet_2, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(0)

  5. Repartition both datasets and build a hash table from the second; suited to cases where the second dataset is smaller than the first but both are relatively large

dataSet_1.join(dataSet_2, JoinHint.REPARTITION_HASH_SECOND).where(0).equalTo(0)

  6. Repartition both datasets and sort each partition; suited to cases where both datasets are already sorted

dataSet_1.join(dataSet_2, JoinHint.REPARTITION_SORT_MERGE).where(0).equalTo(0)
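A minimal sketch of passing a hint in Java, reusing the dept and salary datasets from the key-selector sketch above; JoinHint is nested in org.apache.flink.api.common.operators.base.JoinOperatorBase:

import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;

// broadcast the (small) second input and build a hash table from it
dept.join(salary, JoinHint.BROADCAST_HASH_SECOND)
        .where(0)
        .equalTo(0)
        .print();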

Join code example

Data sources:
dept.csv

100,技术部
200,市场部
300,营销部
400,采购部

employee.csv

100,alex,15000
100,jack,34000
200,tony,5000
200,jone,6700
300,cherry,12000
100,lili,8000
400,lucy,7800

Java implementation:
POJO class:

package com.aikfk.flink.base;

/**
 * @author caizhengjie
 * @date 2021/3/8 3:23 PM
 */
public class EmployeePOJO {
    public String deptId;
    public String name;
    public int salary;

    public EmployeePOJO() {
    }

    public EmployeePOJO(String deptId, String name, int salary) {
        this.deptId = deptId;
        this.name = name;
        this.salary = salary;
    }

    @Override
    public String toString() {
        return "EmployeePOJO{" +
                "deptId='" + deptId + '\'' +
                ", name='" + name + '\'' +
                ", salary=" + salary +
                '}';
    }
}
package com.aikfk.flink.dataset.transform;

import com.aikfk.flink.base.EmployeePOJO;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

/**
 * @author caizhengjie
 * @date 2021/3/8 3:13 PM
 */
public class JoinJava {
    public static void main(String[] args) throws Exception {
        // set up the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // read csv data (option 2: map to a Tuple type with two fields)
        DataSet<Tuple2<String, String>> deptSource = env.readCsvFile(
                "/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/dept.csv")
                .types(String.class, String.class);

        deptSource.print();
        /*
         * (200,市场部)
         * (100,技术部)
         * (400,采购部)
         * (300,营销部)
         */

        // read csv data (option 1: map to a POJO class)
        DataSet<EmployeePOJO> employeeSource = env.readCsvFile(
                "/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")
                .pojoType(EmployeePOJO.class, "deptId", "name", "salary");

        employeeSource.print();
        /*
         * EmployeePOJO{deptId='100', name='alex', salary=15000}
         * EmployeePOJO{deptId='200', name='jone', salary=6700}
         * EmployeePOJO{deptId='100', name='lili', salary=8000}
         * EmployeePOJO{deptId='400', name='lucy', salary=7800}
         * EmployeePOJO{deptId='300', name='cherry', salary=12000}
         * EmployeePOJO{deptId='200', name='tony', salary=5000}
         * EmployeePOJO{deptId='100', name='jack', salary=34000}
         */

        // join() -> map(): join on the department id, then project (deptId, deptName, employeeName)
        DataSet<Tuple3<String, String, String>> joinResult = deptSource.join(employeeSource)
                .where(0)
                .equalTo("deptId")
                .map(new MapFunction<Tuple2<Tuple2<String, String>, EmployeePOJO>, Tuple3<String, String, String>>() {
                    @Override
                    public Tuple3<String, String, String> map(
                            Tuple2<Tuple2<String, String>, EmployeePOJO> tuple2) throws Exception {
                        return new Tuple3<>(tuple2.f0.f0, tuple2.f0.f1, tuple2.f1.name);
                    }
                });

        joinResult.print();
        /*
         * (100,技术部,jack)
         * (100,技术部,alex)
         * (400,采购部,lucy)
         * (100,技术部,lili)
         * (300,营销部,cherry)
         * (200,市场部,jone)
         * (200,市场部,tony)
         */
    }
}
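The map() after the join can also be folded into the join itself by supplying a JoinFunction through with(). A minimal sketch of the same projection, assuming the deptSource and employeeSource datasets from the code above (JoinFunction comes from org.apache.flink.api.common.functions):

// equivalent projection expressed inside the join via a JoinFunction
DataSet<Tuple3<String, String, String>> joinResult2 = deptSource.join(employeeSource)
        .where(0)
        .equalTo("deptId")
        .with(new JoinFunction<Tuple2<String, String>, EmployeePOJO, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> join(Tuple2<String, String> dept, EmployeePOJO emp)
                    throws Exception {
                return new Tuple3<>(dept.f0, dept.f1, emp.name);
            }
        });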

Scala implementation:

package com.aikfk.flink.dataset.transform

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object JoinScala {

  case class employee(deptId: String, name: String, salary: Int)

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // read csv data (option 2: map to a tuple with two fields)
    val deptSource = env.readCsvFile[(String, String)](
      "/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/dept.csv")

    // read csv data (option 1: map to a case class)
    val employeeSource = env.readCsvFile[employee](
      "/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")

    // join -> map: join on the department id, then project (deptId, deptName, employeeName)
    val joinResult = deptSource.join(employeeSource)
      .where(0)
      .equalTo("deptId")
      .map(tuple2 => (tuple2._1._1, tuple2._1._2, tuple2._2.name))

    joinResult.print()
    /*
     * (400,采购部,lucy)
     * (100,技术部,jack)
     * (100,技术部,alex)
     * (100,技术部,lili)
     * (300,营销部,cherry)
     * (200,市场部,tony)
     * (200,市场部,jone)
     */
  }
}

(10) Union in detail

Union merges two DataSets whose elements must have identical types; more than two datasets can be merged by chaining union calls (a minimal sketch follows).

dataSet_1.union(dataSet_2)
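Since union is a binary operator, merging three datasets simply chains two calls. A minimal sketch with invented inline data, assuming an ExecutionEnvironment named env:

DataSet<String> a = env.fromElements("a", "b");
DataSet<String> b = env.fromElements("c");
DataSet<String> c = env.fromElements("d");

// all inputs must share the same element type
DataSet<String> merged = a.union(b).union(c);
merged.print();   // a, b, c, d (order not guaranteed)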

Union code example:

Data sources:
employee.csv

100,alex,15000
100,jack,34000
200,tony,5000
200,jone,6700
300,cherry,12000
100,lili,8000
400,lucy,7800

employee2.csv

100,zhang,1400
100,li,3500
200,liu,6000
200,cai,6800
300,wang,13000
100,cao,8900
400,peng,7800

POJO classes:

EmployeePOJO is the same class shown in the Join example above.
package com.aikfk.flink.base;

/**
 * @author caizhengjie
 * @date 2021/3/8 7:40 PM
 */
public class DeptSalaryPOJO {
    public String deptId;
    public int salary;

    public DeptSalaryPOJO() {
    }

    public DeptSalaryPOJO(String deptId, int salary) {
        this.deptId = deptId;
        this.salary = salary;
    }

    @Override
    public String toString() {
        return "DeptSalaryPOJO{" +
                "deptId='" + deptId + '\'' +
                ", salary=" + salary +
                '}';
    }
}
package com.aikfk.flink.base;

/**
 * @author caizhengjie
 * @date 2021/3/8 7:57 PM
 */
public class DeptPOJO {
    public String deptId;
    public String name;

    public DeptPOJO() {
    }

    public DeptPOJO(String deptId, String name) {
        this.deptId = deptId;
        this.name = name;
    }

    @Override
    public String toString() {
        return "DeptPOJO{" +
                "deptId='" + deptId + '\'' +
                ", name='" + name + '\'' +
                '}';
    }
}

Java implementation:

package com.aikfk.flink.dataset.transform;

import com.aikfk.flink.base.DeptPOJO;
import com.aikfk.flink.base.DeptSalaryPOJO;
import com.aikfk.flink.base.EmployeePOJO;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

/**
 * @author caizhengjie
 * @date 2021/3/8 7:36 PM
 */
public class UnionJava {
    public static void main(String[] args) throws Exception {
        // set up the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // read csv data (map to a POJO class)
        DataSet<EmployeePOJO> employeeSource = env.readCsvFile(
                "/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")
                .pojoType(EmployeePOJO.class, "deptId", "name", "salary");

        DataSet<EmployeePOJO> employee2Source = env.readCsvFile(
                "/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee2.csv")
                .pojoType(EmployeePOJO.class, "deptId", "name", "salary");

        DataSet<DeptPOJO> deptSource = env.readCsvFile(
                "/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/dept.csv")
                .pojoType(DeptPOJO.class, "deptId", "name");

        // union() -> map() -> groupBy() -> reduce(): total salary per department
        DataSet<DeptSalaryPOJO> unionResult = employeeSource.union(employee2Source)
                .map(new MapFunction<EmployeePOJO, DeptSalaryPOJO>() {
                    @Override
                    public DeptSalaryPOJO map(EmployeePOJO employeePOJO) throws Exception {
                        return new DeptSalaryPOJO(employeePOJO.deptId, employeePOJO.salary);
                    }
                })
                .groupBy("deptId")
                .reduce(new ReduceFunction<DeptSalaryPOJO>() {
                    @Override
                    public DeptSalaryPOJO reduce(DeptSalaryPOJO t1, DeptSalaryPOJO t2) throws Exception {
                        return new DeptSalaryPOJO(t1.deptId, t1.salary + t2.salary);
                    }
                });

        unionResult.print();
        /*
         * DeptSalaryPOJO{deptId='100', salary=70800}
         * DeptSalaryPOJO{deptId='400', salary=15600}
         * DeptSalaryPOJO{deptId='300', salary=25000}
         * DeptSalaryPOJO{deptId='200', salary=24500}
         */

        // join() -> map(): attach the department name to each total
        DataSet<Tuple3<String, String, Integer>> joinResult = unionResult.join(deptSource)
                .where("deptId")
                .equalTo("deptId")
                .map(new MapFunction<Tuple2<DeptSalaryPOJO, DeptPOJO>, Tuple3<String, String, Integer>>() {
                    @Override
                    public Tuple3<String, String, Integer> map(
                            Tuple2<DeptSalaryPOJO, DeptPOJO> tuple2) throws Exception {
                        return new Tuple3<>(tuple2.f0.deptId, tuple2.f1.name, tuple2.f0.salary);
                    }
                });

        joinResult.print();
        /*
         * (100,技术部,70800)
         * (400,采购部,15600)
         * (300,营销部,25000)
         * (200,市场部,24500)
         */
    }
}
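If the per-department totals are kept as tuples instead of DeptSalaryPOJO, the explicit ReduceFunction can be replaced by the built-in sum() shortcut on a grouping. A minimal sketch, reusing employeeSource and employee2Source from the code above:

// tuple variant: map to (deptId, salary) and let sum(1) do the per-group reduction
DataSet<Tuple2<String, Integer>> totals = employeeSource.union(employee2Source)
        .map(new MapFunction<EmployeePOJO, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(EmployeePOJO e) throws Exception {
                return new Tuple2<>(e.deptId, e.salary);
            }
        })
        .groupBy(0)
        .sum(1);

totals.print();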

Scala implementation:

package com.aikfk.flink.dataset.transform

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object UnionScala {

  case class employee(deptId: String, name: String, salary: Int)

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // read csv data (map to a case class)
    val employeeSource = env.readCsvFile[employee](
      "/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")

    val employeeSource2 = env.readCsvFile[employee](
      "/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee2.csv")

    // read csv data (map to a tuple with two fields)
    val deptSource = env.readCsvFile[(String, String)](
      "/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/dept.csv")

    // union -> map -> groupBy -> reduce: total salary per department
    val unionResult = employeeSource.union(employeeSource2)
      .map(emp => (emp.deptId, emp.salary))
      .groupBy(0)
      .reduce((x, y) => (x._1, x._2 + y._2))

    unionResult.print()
    /*
     * (100,70800)
     * (400,15600)
     * (300,25000)
     * (200,24500)
     */

    // join -> map: attach the department name to each total
    val joinResult = unionResult.join(deptSource)
      .where(0)
      .equalTo(0)
      .map(tuple => (tuple._1._1, tuple._2._2, tuple._1._2))

    joinResult.print()
    /*
     * (400,采购部,15600)
     * (100,技术部,70800)
     * (300,营销部,25000)
     * (200,市场部,24500)
     */
  }
}

For the other DataSet transformation operators, see the official documentation:

  • https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/dev/dataset/transformations/#distinct
