Apache Flink 零基础入门(十一)Flink transformation
前面讲了常用的DataSource的用法,DataSource其实是把数据加载进来,加载进来之后就需要做Transformation操作了。
Data transformations transform one or more DataSets into a new DataSet. Programs can combine multiple transformations into sophisticated assemblies.
数据转化可以将一个或多个DataSets转化到一个新的DataSet。就是一个算法的综合使用。
Map Function
Scala
新建一个Object
object DataSetTransformationApp {def main(args: Array[String]): Unit = {val environment = ExecutionEnvironment.getExecutionEnvironment}def mapFunction(env: ExecutionEnvironment): Unit = {val data = env.fromCollection(List(1,2,3,4,5,6,7,8,9,10))}}
这里的数据源是一个1到10的list集合。Map的原理是:假设data数据集中有N个元素,将每一个元素进行转化:
data.map { x => x.toInt }
好比:y=f(x)
// 对data中的每一个元素都去做一个+1操作data.map((x:Int) => x + 1 ).print()
然后对每一个元素都做一个+1操作。
简单写法:
如果这个里面只有一个元素,就可以直接写成下面形式:
data.map((x) => x + 1).print()
更简洁的写法:
data.map(x => x + 1).print()
更简洁的方法:
data.map(_ + 1).print()
输出结果:
2
3
4
5
6
7
8
9
10
11
Java
public static void main(String[] args) throws Exception {ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();mapFunction(executionEnvironment);}public static void mapFunction(ExecutionEnvironment executionEnvironment) throws Exception {List<String> list = new ArrayList<>();for (int i = 1; i <= 10; i++) {list.add(i + "");}DataSource<String> data = executionEnvironment.fromCollection(list);data.map(new MapFunction<String, Integer>() {public Integer map(String input) {return Integer.parseInt(input) + 1;}}).print();}
因为我们定义的List是一个String的泛型,因此MapFunction的泛型是<String, Integer>,第一个参数表示输入的类型,第二个参数表示输出是一个Integer类型。
Filter Function
将每个元素执行+1操作,并取出大于5的元素。
Scala
def filterFunction(env: ExecutionEnvironment): Unit = {val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))data.map(_ + 1).filter(_ > 5).print()}
filter只会返回满足条件的记录。
Java
public static void filterFunction(ExecutionEnvironment env) throws Exception {List<Integer> list = new ArrayList<>();for (int i = 1; i <= 10; i++) {list.add(i);}DataSource<Integer> data = env.fromCollection(list);data.map(new MapFunction<Integer, Integer>() {public Integer map(Integer input) {return input + 1;}}).filter(new FilterFunction<Integer>() {@Overridepublic boolean filter(Integer input) throws Exception {return input > 5;}}).print();}
MapPartition Function
map function 与 MapPartition function有什么区别?
需求:DataSource 中有100个元素,把结果存储在数据库中
如果使用map function ,那么实现方法如下:
// DataSource 中有100个元素,把结果存储在数据库中def mapPartitionFunction(env: ExecutionEnvironment): Unit = {val students = new ListBuffer[String]for (i <- 1 to 100) {students.append("Student" + i)}val data = env.fromCollection(students)data.map(x=>{// 每一个元素要存储到数据库中去,肯定需要先获取到connectionval connection = DBUtils.getConnection()println(connection + " ... ")// TODO .... 保存数据到DBDBUtils.returnConnection(connection)}).print()}
打印结果,将会打印100个获取DBUtils.getConnection()的请求。如果数据量增多,显然不停的获取连接是不现实的。
因此MapPartition就应运而生了,转换一个分区里面的数据,也就是说一个分区中的数据调用一次。
因此要首先设置分区:
val data = env.fromCollection(students).setParallelism(4)
设置4个分区,也就是并行度,然后使用mapPartition来处理:
data.mapPartition(x => {val connection = DBUtils.getConnection()println(connection + " ... ")// TODO .... 保存数据到DBDBUtils.returnConnection(connection)x}).print()
那么就会的到4次连接请求,每一个分区获取一个connection。
Java
public static void mapPartitionFunction(ExecutionEnvironment env) throws Exception {List<String> list = new ArrayList<>();for (int i = 1; i <= 100; i++) {list.add("student:" + i);}DataSource<String> data = env.fromCollection(list);/*data.map(new MapFunction<String, String>() {@Overridepublic String map(String input) throws Exception {String connection = DBUtils.getConnection();System.out.println("connection = [" + connection + "]");DBUtils.returnConnection(connection);return input;}}).print();*/data.mapPartition(new MapPartitionFunction<String, Object>() {@Overridepublic void mapPartition(Iterable<String> values, Collector<Object> out) throws Exception {String connection = DBUtils.getConnection();System.out.println("connection = [" + connection + "]");DBUtils.returnConnection(connection);}}).print();}
first groupBy sortGroup
Scala
first表示获取前几个,groupBy表示分组,sortGroup表示分组内排序
def firstFunction(env:ExecutionEnvironment): Unit = {val info = ListBuffer[(Int, String)]()info.append((1, "hadoop"))info.append((1, "spark"))info.append((1, "flink"))info.append((2, "java"))info.append((2, "springboot"))info.append((3, "linux"))info.append((4, "vue"))val data = env.fromCollection(info)data.first(3).print()//输出:(1,hadoop)//(1,spark)//(1,flink)data.groupBy(0).first(2).print()//根据第一个字段分组,每个分组获取前两个数据//(3,linux)//(1,hadoop)//(1,spark)//(2,java)//(2,springboot)//(4,vue)data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print() //根据第一个字段分组,然后在分组内根据第二个字段升序排序,并取出前两个数据//输出(3,linux)//(1,flink)//(1,hadoop)//(2,java)//(2,springboot)//(4,vue)}
Java
public static void firstFunction(ExecutionEnvironment env) throws Exception {List<Tuple2<Integer, String>> info = new ArrayList<>();info.add(new Tuple2<>(1, "hadoop"));info.add(new Tuple2<>(1, "spark"));info.add(new Tuple2<>(1, "flink"));info.add(new Tuple2<>(2, "java"));info.add(new Tuple2<>(2, "springboot"));info.add(new Tuple2<>(3, "linux"));info.add(new Tuple2<>(4, "vue"));DataSource<Tuple2<Integer, String>> data = env.fromCollection(info);data.first(3).print();data.groupBy(0).first(2).print();data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print();}
FlatMap Function
获取一个元素,然后产生0个、1个或多个元素
Scala
def flatMapFunction(env: ExecutionEnvironment): Unit = {val info = ListBuffer[(String)]()info.append("hadoop,spark");info.append("hadoop,flink");info.append("flink,flink");val data = env.fromCollection(info)data.flatMap(_.split(",")).print()}
输出:
hadoop
spark
hadoop
flink
flink
flink
FlatMap将每个元素都用逗号分割,然后变成多个。
经典例子:
data.flatMap(_.split(",")).map((_,1)).groupBy(0).sum(1).print()
将每个元素用逗号分割,然后每个元素做map,然后根据第一个字段分组,然后根据第二个字段求和。
输出结果如下:
(hadoop,2)
(flink,3)
(spark,1)
Java
同样实现一个经典案例wordcount
public static void flatMapFunction(ExecutionEnvironment env) throws Exception {List<String> info = new ArrayList<>();info.add("hadoop,spark");info.add("hadoop,flink");info.add("flink,flink");DataSource<String> data = env.fromCollection(info);data.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String input, Collector<String> out) throws Exception {String[] splits = input.split(",");for(String split: splits) {//发送出去out.collect(split);}}}).map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return new Tuple2<>(value,1);}}).groupBy(0).sum(1).print();}
Distinct
去重操作
Scala
def distinctFunction(env: ExecutionEnvironment): Unit = {val info = ListBuffer[(String)]()info.append("hadoop,spark");info.append("hadoop,flink");info.append("flink,flink");val data = env.fromCollection(info)data.flatMap(_.split(",")).distinct().print()}
这样就将每一个元素都做了去重操作。输出如下:
hadoop
flink
spark
Java
public static void distinctFunction(ExecutionEnvironment env) throws Exception {List<String> info = new ArrayList<>();info.add("hadoop,spark");info.add("hadoop,flink");info.add("flink,flink");DataSource<String> data = env.fromCollection(info);data.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String input, Collector<String> out) throws Exception {String[] splits = input.split(",");for(String split: splits) {//发送出去out.collect(split);}}}).distinct().print();}
Join
Joins two data sets by creating all pairs of elements that are equal on their keys. Optionally uses a JoinFunction to turn the pair of elements into a single element, or a FlatJoinFunction to turn the pair of elements into arbitrarily many (including none) elements. See the keys section to learn how to define join keys.
result = input1.join(input2).where(0) // key of the first input (tuple field 0).equalTo(1); // key of the second input (tuple field 1)
表示第一个tuple input1中的第0个字段,与第二个tuple input2中的第一个字段进行join。
def joinFunction(env: ExecutionEnvironment): Unit = {val info1 = ListBuffer[(Int, String)]() //编号 名字info1.append((1, "hadoop"))info1.append((2, "spark"))info1.append((3, "flink"))info1.append((4, "java"))val info2 = ListBuffer[(Int, String)]() //编号 城市info2.append((1, "北京"))info2.append((2, "上海"))info2.append((3, "深圳"))info2.append((5, "广州"))val data1 = env.fromCollection(info1)val data2 = env.fromCollection(info2)data1.join(data2).where(0).equalTo(0).apply((first, second)=>{(first._1, first._2, second._2)}).print()}
输出结果如下:
(3,flink,深圳)
(1,hadoop,北京)
(2,spark,上海)
Java
public static void joinFunction(ExecutionEnvironment env) throws Exception {List<Tuple2<Integer, String>> info1 = new ArrayList<>(); //编号 名字info1.add(new Tuple2<>(1, "hadoop"));info1.add(new Tuple2<>(2, "spark"));info1.add(new Tuple2<>(3, "flink"));info1.add(new Tuple2<>(4, "java"));List<Tuple2<Integer, String>> info2 = new ArrayList<>(); //编号 城市info2.add(new Tuple2<>(1, "北京"));info2.add(new Tuple2<>(2, "上海"));info2.add(new Tuple2<>(3, "深圳"));info2.add(new Tuple2<>(5, "广州"));DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);data1.join(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {@Overridepublic Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {return new Tuple3<Integer, String, String>(first.f0, first.f1,second.f1);}}).print();}
Tuple2<Integer, String>, Tuple2<Integer, String>表示两个输入的集合,Tuple3<Integer, String, String>>表示输出的Tuple3
OuterJoin
上面讲的join是内连接,这个OuterJoin是外连接,包括左外连接,右外连接,全连接在两个数据集上。
def outJoinFunction(env: ExecutionEnvironment): Unit = {val info1 = ListBuffer[(Int, String)]() //编号 名字info1.append((1, "hadoop"))info1.append((2, "spark"))info1.append((3, "flink"))info1.append((4, "java"))val info2 = ListBuffer[(Int, String)]() //编号 城市info2.append((1, "北京"))info2.append((2, "上海"))info2.append((3, "深圳"))info2.append((5, "广州"))val data1 = env.fromCollection(info1)val data2 = env.fromCollection(info2)data1.leftOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {if (second == null) {(first._1, first._2, "-")}else {(first._1, first._2, second._2)}}).print() //左外连接 把左边的所有数据展示出来}
左外连接,当左边的数据在右边没有对应的数据时,需要进行处理,否则会出现空指针异常。输出如下:
(3,flink,深圳)
(1,hadoop,北京)
(2,spark,上海)
(4,java,-)
右外连接:
data1.rightOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {if (first == null) {(second._1, "-", second._2)}else {(first._1, first._2, second._2)}}).print()
右外连接,输出:
(3,flink,深圳)
(1,hadoop,北京)
(5,-,广州)
(2,spark,上海)
全连接:
data1.fullOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {if (first == null) {(second._1, "-", second._2)}else if (second == null){(second._1, "-", second._2)} else {(first._1, first._2, second._2)}}).print()
(3,flink,深圳)
(1,hadoop,北京)
(5,-,广州)
(2,spark,上海)
(4,java,-)
Java
左外连接:
public static void outjoinFunction(ExecutionEnvironment env) throws Exception {List<Tuple2<Integer, String>> info1 = new ArrayList<>(); //编号 名字info1.add(new Tuple2<>(1, "hadoop"));info1.add(new Tuple2<>(2, "spark"));info1.add(new Tuple2<>(3, "flink"));info1.add(new Tuple2<>(4, "java"));List<Tuple2<Integer, String>> info2 = new ArrayList<>(); //编号 城市info2.add(new Tuple2<>(1, "北京"));info2.add(new Tuple2<>(2, "上海"));info2.add(new Tuple2<>(3, "深圳"));info2.add(new Tuple2<>(5, "广州"));DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);data1.leftOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {@Overridepublic Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {if(second == null) {return new Tuple3<Integer, String, String>(first.f0, first.f1, "-");}return new Tuple3<Integer, String, String>(first.f0, first.f1,second.f1);}}).print();}
右外连接:
data1.rightOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {@Overridepublic Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {if (first == null) {return new Tuple3<Integer, String, String>(second.f0, "-", second.f1);}return new Tuple3<Integer, String, String>(first.f0, first.f1, second.f1);}}).print();
全连接:
data1.fullOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {@Overridepublic Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {if (first == null) {return new Tuple3<Integer, String, String>(second.f0, "-", second.f1);} else if (second == null) {return new Tuple3<Integer, String, String>(first.f0, first.f1, "-");}return new Tuple3<Integer, String, String>(first.f0, first.f1, second.f1);}}).print();
cross function
Scala
笛卡尔积,左边与右边交叉处理
def crossFunction(env: ExecutionEnvironment): Unit = {val info1 = List("乔峰", "慕容复")val info2 = List(3,1,0)val data1 = env.fromCollection(info1)val data2 = env.fromCollection(info2)data1.cross(data2).print()}
输出:
(乔峰,3)
(乔峰,1)
(乔峰,0)
(慕容复,3)
(慕容复,1)
(慕容复,0)
Java
public static void crossFunction(ExecutionEnvironment env) throws Exception {List<String> info1 = new ArrayList<>();info1.add("乔峰");info1.add("慕容复");List<String> info2 = new ArrayList<>();info2.add("3");info2.add("1");info2.add("0");DataSource<String> data1 = env.fromCollection(info1);DataSource<String> data2 = env.fromCollection(info2);data1.cross(data2).print();}
Broadcast Variables
广播变量是一组数据,这些数据可以用来清洗数据,广播变量常驻在内存中,所以数据量一定不能太大
Apache Flink 零基础入门(十一)Flink transformation相关推荐
- Apache Flink 零基础入门【转】
Apache Flink 零基础入门(一):基础概念解析 Apache Flink 零基础入门(二):DataStream API 编程 转载于:https://www.cnblogs.com/dav ...
- Apache Flink 零基础入门(十八)Flink Table APISQL
什么是Flink关系型API? 虽然Flink已经支持了DataSet和DataStream API,但是有没有一种更好的方式去编程,而不用关心具体的API实现?不需要去了解Java和Scala的具体 ...
- Apache Flink 零基础入门(三)编写最简单的helloWorld
实验环境 JDK 1.8 IDE Intellij idea Flink 1.8.1 实验内容 创建一个Flink简单Demo,可以从流数据中统计单词个数. 实验步骤 首先创建一个maven项目,其中 ...
- Apache Flink 零基础入门(二十一)Flink HistoryServer概述与配置
之前我们做了Flink的开发,开发完成之后就需要进行监控. Flink 有一个History Server可以在相关Flink 集群关闭之后,还可以查看一些信息.也就是一些作业运行完成之后,可以用Hi ...
- Apache Flink 零基础入门(十六)Flink DataStream transformation
Operators transform one or more DataStreams into a new DataStream. Operators操作转换一个或多个DataStream到一个新的 ...
- Apache Flink 零基础入门(十五)Flink DataStream编程(如何自定义DataSource)
数据源可以通过StreamExecutionEnvironment.addSource(sourceFunction)方式来创建,Flink也提供了一些内置的数据源方便使用,例如readTextFil ...
- Apache Flink 零基础入门(一):基础概念解析
Apache Flink 的定义.架构及原理 Apache Flink 是一个分布式大数据处理引擎,可对有限数据流和无限数据流进行有状态或无状态的计算,能够部署在各种集群环境,对各种规模大小的数据进行 ...
- Apache Flink 零基础入门(二十)Flink部署与作业的提交
之前我们都是基于Idea在本地进行开发,这种方式很适合开发以及测试,但是开发完之后,如何提交到服务器中运行? Flink单机部署方式 本地开发和测试过程中非常有用,只要把代码放到服务器直接运行. 前置 ...
- Apache Flink 零基础入门(二十)Flink kafka connector
内置source和sink 内置source包括从文件读取,从文件夹读取,从socket中读取.从集合或者迭代器中读取.内置的sink包括写文件.控制台输出.socket 内置connectors A ...
最新文章
- 赢得高薪的锦囊三秘诀
- oracle ocr组成员替换,Oracle RAC 迁移替换 OCR 盘
- jenkins运行日志时间与linux,持续集成之Jenkins结合脚本实现代码自动化部署及一键回滚至上一版本...
- 图论模型迪杰斯特拉算法
- 一周学C#第五天——命名空间
- 独立站运营模式怎么选择?
- Scala 基础 —— String(StringOps)、tuple、Range
- 动态sql语句基本语法
- java字符串模糊匹配_正则表达式实现字符的模糊匹配功能示例
- Axure 基础教程
- 表面缺陷检测的意义及现状
- Ecstore的微信账号绑定会员免登录
- 关于excel中一部分表格显示但打印时不打印呢
- RedHat RHEL7.2 系统安装详细步骤
- virtualbox E_INVALIDARG (0x80070057) 和 E_FAIL (0x80004005) SessionMachine
- 初识Excel的IF, IFERROR, MATCH, COUNTA公式
- 跑路、清退or出海?这道留给交易所的题太难
- 2021秋季开学必备数码产品!学生党的超实用好物清单
- 解决ifconfig command not found 问题
- Unreal资源引用(一)
热门文章
- Mysql 按条件排序查询一条记录 top 1 对应Mysql的LIMIT 关键字
- sql server 2008学习4 设计索引的建议
- html 图片点击查看大图_【神游千年,大美敦煌】北魏-260窟【高清大图】
- 编译安装日志分析平台 elk + beats(个人感觉不错1)
- Laravel的Class Laravel\Passport\Passport not found
- Linux之Nginx配置多个虚拟主机:静态转发
- 台安变频器n2按键说明_力扣 925. 长按键入
- 将一个键值对添加入一个对象_细品Redis高性能数据结构之hash对象
- java set中元素是数组_将HashSet中的元素转换为Java中的数组
- Vue 中的 v-cloak 作用及用法-vue页面加载时会闪烁