前面讲了常用的DataSource的用法,DataSource其实是把数据加载进来,加载进来之后就需要做Transformation操作了。

Data transformations transform one or more DataSets into a new DataSet. Programs can combine multiple transformations into sophisticated assemblies.

数据转化可以将一个或多个DataSets转化到一个新的DataSet。就是一个算法的综合使用。

Map Function

Scala

新建一个Object

object DataSetTransformationApp {def main(args: Array[String]): Unit = {val environment = ExecutionEnvironment.getExecutionEnvironment}def mapFunction(env: ExecutionEnvironment): Unit = {val data = env.fromCollection(List(1,2,3,4,5,6,7,8,9,10))}}

这里的数据源是一个1到10的list集合。Map的原理是:假设data数据集中有N个元素,将每一个元素进行转化:

data.map { x => x.toInt }

好比:y=f(x)

    // 对data中的每一个元素都去做一个+1操作data.map((x:Int) => x + 1 ).print()

然后对每一个元素都做一个+1操作。

简单写法:

如果这个里面只有一个元素,就可以直接写成下面形式:

data.map((x) => x + 1).print()

更简洁的写法:

data.map(x => x + 1).print()

更简洁的方法:

data.map(_ + 1).print()

输出结果:

2
3
4
5
6
7
8
9
10
11

Java

    public static void main(String[] args) throws Exception {ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();mapFunction(executionEnvironment);}public static void mapFunction(ExecutionEnvironment executionEnvironment) throws Exception {List<String> list = new ArrayList<>();for (int i = 1; i <= 10; i++) {list.add(i + "");}DataSource<String> data = executionEnvironment.fromCollection(list);data.map(new MapFunction<String, Integer>() {public Integer map(String input) {return Integer.parseInt(input) + 1;}}).print();}

因为我们定义的List是一个String的泛型,因此MapFunction的泛型是<String, Integer>,第一个参数表示输入的类型,第二个参数表示输出是一个Integer类型。

Filter Function

将每个元素执行+1操作,并取出大于5的元素。

Scala

  def filterFunction(env: ExecutionEnvironment): Unit = {val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))data.map(_ + 1).filter(_ > 5).print()}

filter只会返回满足条件的记录。

Java

    public static void filterFunction(ExecutionEnvironment env) throws Exception {List<Integer> list = new ArrayList<>();for (int i = 1; i <= 10; i++) {list.add(i);}DataSource<Integer> data = env.fromCollection(list);data.map(new MapFunction<Integer, Integer>() {public Integer map(Integer input) {return input + 1;}}).filter(new FilterFunction<Integer>() {@Overridepublic boolean filter(Integer input) throws Exception {return input > 5;}}).print();}

MapPartition Function

map function 与 MapPartition function有什么区别?

需求:DataSource 中有100个元素,把结果存储在数据库中

如果使用map function ,那么实现方法如下:

  // DataSource 中有100个元素,把结果存储在数据库中def mapPartitionFunction(env: ExecutionEnvironment): Unit = {val students = new ListBuffer[String]for (i <- 1 to 100) {students.append("Student" + i)}val data = env.fromCollection(students)data.map(x=>{// 每一个元素要存储到数据库中去,肯定需要先获取到connectionval connection = DBUtils.getConnection()println(connection + " ... ")// TODO .... 保存数据到DBDBUtils.returnConnection(connection)}).print()}

打印结果,将会打印100个获取DBUtils.getConnection()的请求。如果数据量增多,显然不停的获取连接是不现实的。

因此MapPartition就应运而生了,转换一个分区里面的数据,也就是说一个分区中的数据调用一次。

因此要首先设置分区:

val data = env.fromCollection(students).setParallelism(4)

设置4个分区,也就是并行度,然后使用mapPartition来处理:

data.mapPartition(x => {val connection = DBUtils.getConnection()println(connection + " ... ")// TODO .... 保存数据到DBDBUtils.returnConnection(connection)x}).print()

那么就会的到4次连接请求,每一个分区获取一个connection。

Java

public static void mapPartitionFunction(ExecutionEnvironment env) throws Exception {List<String> list = new ArrayList<>();for (int i = 1; i <= 100; i++) {list.add("student:" + i);}DataSource<String> data = env.fromCollection(list);/*data.map(new MapFunction<String, String>() {@Overridepublic String map(String input) throws Exception {String connection = DBUtils.getConnection();System.out.println("connection = [" + connection + "]");DBUtils.returnConnection(connection);return input;}}).print();*/data.mapPartition(new MapPartitionFunction<String, Object>() {@Overridepublic void mapPartition(Iterable<String> values, Collector<Object> out) throws Exception {String connection = DBUtils.getConnection();System.out.println("connection = [" + connection + "]");DBUtils.returnConnection(connection);}}).print();}

first   groupBy sortGroup

Scala

first表示获取前几个,groupBy表示分组,sortGroup表示分组内排序

def firstFunction(env:ExecutionEnvironment): Unit = {val info = ListBuffer[(Int, String)]()info.append((1, "hadoop"))info.append((1, "spark"))info.append((1, "flink"))info.append((2, "java"))info.append((2, "springboot"))info.append((3, "linux"))info.append((4, "vue"))val data = env.fromCollection(info)data.first(3).print()//输出:(1,hadoop)//(1,spark)//(1,flink)data.groupBy(0).first(2).print()//根据第一个字段分组,每个分组获取前两个数据//(3,linux)//(1,hadoop)//(1,spark)//(2,java)//(2,springboot)//(4,vue)data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print() //根据第一个字段分组,然后在分组内根据第二个字段升序排序,并取出前两个数据//输出(3,linux)//(1,flink)//(1,hadoop)//(2,java)//(2,springboot)//(4,vue)}

Java

    public static void firstFunction(ExecutionEnvironment env) throws Exception {List<Tuple2<Integer, String>> info = new ArrayList<>();info.add(new Tuple2<>(1, "hadoop"));info.add(new Tuple2<>(1, "spark"));info.add(new Tuple2<>(1, "flink"));info.add(new Tuple2<>(2, "java"));info.add(new Tuple2<>(2, "springboot"));info.add(new Tuple2<>(3, "linux"));info.add(new Tuple2<>(4, "vue"));DataSource<Tuple2<Integer, String>> data = env.fromCollection(info);data.first(3).print();data.groupBy(0).first(2).print();data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print();}

FlatMap Function

获取一个元素,然后产生0个、1个或多个元素

Scala

  def flatMapFunction(env: ExecutionEnvironment): Unit = {val info = ListBuffer[(String)]()info.append("hadoop,spark");info.append("hadoop,flink");info.append("flink,flink");val data = env.fromCollection(info)data.flatMap(_.split(",")).print()}

输出:

hadoop
spark
hadoop
flink
flink
flink

FlatMap将每个元素都用逗号分割,然后变成多个。

经典例子:

data.flatMap(_.split(",")).map((_,1)).groupBy(0).sum(1).print()

将每个元素用逗号分割,然后每个元素做map,然后根据第一个字段分组,然后根据第二个字段求和。

输出结果如下:

(hadoop,2)
(flink,3)
(spark,1)

Java

同样实现一个经典案例wordcount

public static void flatMapFunction(ExecutionEnvironment env) throws Exception {List<String> info = new ArrayList<>();info.add("hadoop,spark");info.add("hadoop,flink");info.add("flink,flink");DataSource<String> data = env.fromCollection(info);data.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String input, Collector<String> out) throws Exception {String[] splits = input.split(",");for(String split: splits) {//发送出去out.collect(split);}}}).map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return new Tuple2<>(value,1);}}).groupBy(0).sum(1).print();}

Distinct

去重操作

Scala

  def distinctFunction(env: ExecutionEnvironment): Unit = {val info = ListBuffer[(String)]()info.append("hadoop,spark");info.append("hadoop,flink");info.append("flink,flink");val data = env.fromCollection(info)data.flatMap(_.split(",")).distinct().print()}

这样就将每一个元素都做了去重操作。输出如下:

hadoop
flink
spark

Java

    public static void distinctFunction(ExecutionEnvironment env) throws Exception {List<String> info = new ArrayList<>();info.add("hadoop,spark");info.add("hadoop,flink");info.add("flink,flink");DataSource<String> data = env.fromCollection(info);data.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String input, Collector<String> out) throws Exception {String[] splits = input.split(",");for(String split: splits) {//发送出去out.collect(split);}}}).distinct().print();}

Join

Joins two data sets by creating all pairs of elements that are equal on their keys. Optionally uses a JoinFunction to turn the pair of elements into a single element, or a FlatJoinFunction to turn the pair of elements into arbitrarily many (including none) elements. See the keys section to learn how to define join keys.

result = input1.join(input2).where(0)       // key of the first input (tuple field 0).equalTo(1);    // key of the second input (tuple field 1)

表示第一个tuple input1中的第0个字段,与第二个tuple input2中的第一个字段进行join。

  def joinFunction(env: ExecutionEnvironment): Unit = {val info1 = ListBuffer[(Int, String)]() //编号 名字info1.append((1, "hadoop"))info1.append((2, "spark"))info1.append((3, "flink"))info1.append((4, "java"))val info2 = ListBuffer[(Int, String)]() //编号 城市info2.append((1, "北京"))info2.append((2, "上海"))info2.append((3, "深圳"))info2.append((5, "广州"))val data1 = env.fromCollection(info1)val data2 = env.fromCollection(info2)data1.join(data2).where(0).equalTo(0).apply((first, second)=>{(first._1, first._2, second._2)}).print()}

输出结果如下:

(3,flink,深圳)
(1,hadoop,北京)
(2,spark,上海)

Java

    public static void joinFunction(ExecutionEnvironment env) throws Exception {List<Tuple2<Integer, String>> info1 = new ArrayList<>(); //编号 名字info1.add(new Tuple2<>(1, "hadoop"));info1.add(new Tuple2<>(2, "spark"));info1.add(new Tuple2<>(3, "flink"));info1.add(new Tuple2<>(4, "java"));List<Tuple2<Integer, String>> info2 = new ArrayList<>(); //编号 城市info2.add(new Tuple2<>(1, "北京"));info2.add(new Tuple2<>(2, "上海"));info2.add(new Tuple2<>(3, "深圳"));info2.add(new Tuple2<>(5, "广州"));DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);data1.join(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {@Overridepublic Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {return new Tuple3<Integer, String, String>(first.f0, first.f1,second.f1);}}).print();}

Tuple2<Integer, String>, Tuple2<Integer, String>表示两个输入的集合,Tuple3<Integer, String, String>>表示输出的Tuple3

OuterJoin

上面讲的join是内连接,这个OuterJoin是外连接,包括左外连接,右外连接,全连接在两个数据集上。

def outJoinFunction(env: ExecutionEnvironment): Unit = {val info1 = ListBuffer[(Int, String)]() //编号 名字info1.append((1, "hadoop"))info1.append((2, "spark"))info1.append((3, "flink"))info1.append((4, "java"))val info2 = ListBuffer[(Int, String)]() //编号 城市info2.append((1, "北京"))info2.append((2, "上海"))info2.append((3, "深圳"))info2.append((5, "广州"))val data1 = env.fromCollection(info1)val data2 = env.fromCollection(info2)data1.leftOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {if (second == null) {(first._1, first._2, "-")}else {(first._1, first._2, second._2)}}).print() //左外连接 把左边的所有数据展示出来}

左外连接,当左边的数据在右边没有对应的数据时,需要进行处理,否则会出现空指针异常。输出如下:

(3,flink,深圳)
(1,hadoop,北京)
(2,spark,上海)
(4,java,-)

右外连接:

    data1.rightOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {if (first == null) {(second._1, "-", second._2)}else {(first._1, first._2, second._2)}}).print() 

右外连接,输出:

(3,flink,深圳)
(1,hadoop,北京)
(5,-,广州)
(2,spark,上海)

全连接:

    data1.fullOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {if (first == null) {(second._1, "-", second._2)}else if (second == null){(second._1, "-", second._2)} else {(first._1, first._2, second._2)}}).print()
(3,flink,深圳)
(1,hadoop,北京)
(5,-,广州)
(2,spark,上海)
(4,java,-)

Java

左外连接:

    public static void outjoinFunction(ExecutionEnvironment env) throws Exception {List<Tuple2<Integer, String>> info1 = new ArrayList<>(); //编号 名字info1.add(new Tuple2<>(1, "hadoop"));info1.add(new Tuple2<>(2, "spark"));info1.add(new Tuple2<>(3, "flink"));info1.add(new Tuple2<>(4, "java"));List<Tuple2<Integer, String>> info2 = new ArrayList<>(); //编号 城市info2.add(new Tuple2<>(1, "北京"));info2.add(new Tuple2<>(2, "上海"));info2.add(new Tuple2<>(3, "深圳"));info2.add(new Tuple2<>(5, "广州"));DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);data1.leftOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {@Overridepublic Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {if(second == null) {return new Tuple3<Integer, String, String>(first.f0, first.f1, "-");}return new Tuple3<Integer, String, String>(first.f0, first.f1,second.f1);}}).print();}

右外连接:

        data1.rightOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {@Overridepublic Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {if (first == null) {return new Tuple3<Integer, String, String>(second.f0, "-", second.f1);}return new Tuple3<Integer, String, String>(first.f0, first.f1, second.f1);}}).print();

全连接:

        data1.fullOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {@Overridepublic Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {if (first == null) {return new Tuple3<Integer, String, String>(second.f0, "-", second.f1);} else if (second == null) {return new Tuple3<Integer, String, String>(first.f0, first.f1, "-");}return new Tuple3<Integer, String, String>(first.f0, first.f1, second.f1);}}).print();

cross function

Scala

笛卡尔积,左边与右边交叉处理

  def crossFunction(env: ExecutionEnvironment): Unit = {val info1 = List("乔峰", "慕容复")val info2 = List(3,1,0)val data1 = env.fromCollection(info1)val data2 = env.fromCollection(info2)data1.cross(data2).print()}

输出:

(乔峰,3)
(乔峰,1)
(乔峰,0)
(慕容复,3)
(慕容复,1)
(慕容复,0)

Java

public static void crossFunction(ExecutionEnvironment env) throws Exception {List<String> info1 = new ArrayList<>();info1.add("乔峰");info1.add("慕容复");List<String> info2 = new ArrayList<>();info2.add("3");info2.add("1");info2.add("0");DataSource<String> data1 = env.fromCollection(info1);DataSource<String> data2 = env.fromCollection(info2);data1.cross(data2).print();}

Broadcast Variables

广播变量是一组数据,这些数据可以用来清洗数据,广播变量常驻在内存中,所以数据量一定不能太大

Apache Flink 零基础入门(十一)Flink transformation相关推荐

  1. Apache Flink 零基础入门【转】

    Apache Flink 零基础入门(一):基础概念解析 Apache Flink 零基础入门(二):DataStream API 编程 转载于:https://www.cnblogs.com/dav ...

  2. Apache Flink 零基础入门(十八)Flink Table APISQL

    什么是Flink关系型API? 虽然Flink已经支持了DataSet和DataStream API,但是有没有一种更好的方式去编程,而不用关心具体的API实现?不需要去了解Java和Scala的具体 ...

  3. Apache Flink 零基础入门(三)编写最简单的helloWorld

    实验环境 JDK 1.8 IDE Intellij idea Flink 1.8.1 实验内容 创建一个Flink简单Demo,可以从流数据中统计单词个数. 实验步骤 首先创建一个maven项目,其中 ...

  4. Apache Flink 零基础入门(二十一)Flink HistoryServer概述与配置

    之前我们做了Flink的开发,开发完成之后就需要进行监控. Flink 有一个History Server可以在相关Flink 集群关闭之后,还可以查看一些信息.也就是一些作业运行完成之后,可以用Hi ...

  5. Apache Flink 零基础入门(十六)Flink DataStream transformation

    Operators transform one or more DataStreams into a new DataStream. Operators操作转换一个或多个DataStream到一个新的 ...

  6. Apache Flink 零基础入门(十五)Flink DataStream编程(如何自定义DataSource)

    数据源可以通过StreamExecutionEnvironment.addSource(sourceFunction)方式来创建,Flink也提供了一些内置的数据源方便使用,例如readTextFil ...

  7. Apache Flink 零基础入门(一):基础概念解析

    Apache Flink 的定义.架构及原理 Apache Flink 是一个分布式大数据处理引擎,可对有限数据流和无限数据流进行有状态或无状态的计算,能够部署在各种集群环境,对各种规模大小的数据进行 ...

  8. Apache Flink 零基础入门(二十)Flink部署与作业的提交

    之前我们都是基于Idea在本地进行开发,这种方式很适合开发以及测试,但是开发完之后,如何提交到服务器中运行? Flink单机部署方式 本地开发和测试过程中非常有用,只要把代码放到服务器直接运行. 前置 ...

  9. Apache Flink 零基础入门(二十)Flink kafka connector

    内置source和sink 内置source包括从文件读取,从文件夹读取,从socket中读取.从集合或者迭代器中读取.内置的sink包括写文件.控制台输出.socket 内置connectors A ...

最新文章

  1. 赢得高薪的锦囊三秘诀
  2. oracle ocr组成员替换,Oracle RAC 迁移替换 OCR 盘
  3. jenkins运行日志时间与linux,持续集成之Jenkins结合脚本实现代码自动化部署及一键回滚至上一版本...
  4. 图论模型迪杰斯特拉算法
  5. 一周学C#第五天——命名空间
  6. 独立站运营模式怎么选择?
  7. Scala 基础 —— String(StringOps)、tuple、Range
  8. 动态sql语句基本语法
  9. java字符串模糊匹配_正则表达式实现字符的模糊匹配功能示例
  10. Axure 基础教程
  11. 表面缺陷检测的意义及现状
  12. Ecstore的微信账号绑定会员免登录
  13. 关于excel中一部分表格显示但打印时不打印呢
  14. RedHat RHEL7.2 系统安装详细步骤
  15. virtualbox E_INVALIDARG (0x80070057) 和 E_FAIL (0x80004005) SessionMachine
  16. 初识Excel的IF, IFERROR, MATCH, COUNTA公式
  17. 跑路、清退or出海?这道留给交易所的题太难
  18. 2021秋季开学必备数码产品!学生党的超实用好物清单
  19. 解决ifconfig command not found 问题
  20. Unreal资源引用(一)

热门文章

  1. Mysql 按条件排序查询一条记录 top 1 对应Mysql的LIMIT 关键字
  2. sql server 2008学习4 设计索引的建议
  3. html 图片点击查看大图_【神游千年,大美敦煌】北魏-260窟【高清大图】
  4. 编译安装日志分析平台 elk + beats(个人感觉不错1)
  5. Laravel的Class Laravel\Passport\Passport not found
  6. Linux之Nginx配置多个虚拟主机:静态转发
  7. 台安变频器n2按键说明_力扣 925. 长按键入
  8. 将一个键值对添加入一个对象_细品Redis高性能数据结构之hash对象
  9. java set中元素是数组_将HashSet中的元素转换为Java中的数组
  10. Vue 中的 v-cloak 作用及用法-vue页面加载时会闪烁