文章目录

  • 01 引言
  • 02 Transformation
    • 2.1 基本操作
      • 2.1.1 API 解析
      • 2.1.2 示例代码
    • 2.2 合并
      • 2.2.1 union
      • 2.2.2 connect
      • 2.2.3 示例代码
    • 2.3 拆分
      • 2.3.1 API
      • 2.3.2 示例代码
    • 2.4 分区
      • 2.4.1 rebalance重平衡分区
      • 2.4.2 其它分区
  • 03 文末

01 引言

在前面的博客,我们已经对Flink的程序模型里的Source使用有了一定的了解了,有兴趣的同学可以参阅下:

  • 《Flink教程(01)- Flink知识图谱》
  • 《Flink教程(02)- Flink入门》
  • 《Flink教程(03)- Flink环境搭建》
  • 《Flink教程(04)- Flink入门案例》
  • 《Flink教程(05)- Flink原理简单分析》
  • 《Flink教程(06)- Flink批流一体API(Source示例)》

本文开始继续讲解Flink程序模型对里面的Transformation

02 Transformation

Transformation的官方API文档在:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/

操作概览如下图:

整体来说,流式数据上的操作可以分为四类:

  • 第一类是 “对于单条记录的操作”:比如筛除掉不符合要求的记录(Filter 操作),或者将每条记录都做一个转换(Map操作);
  • 第二类是 “对多条记录的操作”:比如说统计一个小时内的订单总成交量,就需要将一个小时内的所有订单记录的成交量加到一起。为了支持这种类型的操作,就得通过 Window将需要的记录关联到一起进行处理;
  • 第三类是 “对多个流进行操作并转换为单个流”:例如,多个流可以通过 UnionJoinConnect 等操作合到一起,这些操作合并的逻辑不同,但是它们最终都会产生了一个新的统一的流,从而可以进行一些跨流的操作;
  • 第四类是 “DataStream 还支持与合并对称的拆分操作”:即把一个流按一定规则拆分为多个流(Split操作),每个流是之前流的一个子集,这样我们就可以对不同的流作不同的处理。

2.1 基本操作

2.1.1 API 解析

分类 描述 示意图
map 将函数作用在集合中的每一个元素上,并返回作用后的结果
flatMap 集合中的每个元素变成一个或多个元素,并返回扁平化之后的结果
keyBy 按照指定的key来对流中的数据进行分组,前面入门案例中已经演示过(注意: 流处理中没有groupBy,而是keyBy
filter 按照指定的条件对集合中的元素进行过滤,过滤出返回true/符合条件的元素
sum 按照指定的字段对集合中的元素进行求和
reduce 对集合中的元素进行聚合

2.1.2 示例代码

示例代码如下:

/*** Transformation-基本操作** @author : YangLinWei* @createTime: 2022/3/7 3:36 下午*/
public class TransformationDemo01 {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2.sourceDataStream<String> linesDS = env.fromElements("ylw hadoop spark", "ylw hadoop spark", "ylw hadoop", "ylw");//3.处理数据-transformationDataStream<String> wordsDS = linesDS.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {//value就是一行行的数据String[] words = value.split(" ");for (String word : words) {out.collect(word);//将切割处理的一个个的单词收集起来并返回}}});DataStream<String> filtedDS = wordsDS.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String value) throws Exception {return !value.equals("ylw");}});DataStream<Tuple2<String, Integer>> wordAndOnesDS = filtedDS.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {//value就是进来一个个的单词return Tuple2.of(value, 1);}});//KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOnesDS.keyBy(0);KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);DataStream<Tuple2<String, Integer>> result1 = groupedDS.sum(1);DataStream<Tuple2<String, Integer>> result2 = groupedDS.reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {return Tuple2.of(value1.f0, value1.f1 + value1.f1);}});//4.输出结果-sinkresult1.print("result1");result2.print("result2");//5.触发执行-executeenv.execute();}
}

运行结果:

2.2 合并

2.2.1 union

union:union算子可以合并多个同类型的数据流,并生成同类型的数据流,即可以将多个DataStream[T]合并为一个新的DataStream[T]。数据将按照先进先出(First In First Out)的模式合并,且不去重。

2.2.2 connect

connect:connect提供了和union类似的功能,用来连接两个数据流,它与union的区别在于:

  • connect只能连接两个数据流,union可以连接多个数据流。
  • connect所连接的两个数据流的数据类型可以不一致,union所连接的两个数据流的数据类型必须一致。
  • 两个DataStream经过connect之后被转化为ConnectedStreamsConnectedStreams会对两个流的数据应用不同的处理方法,且双流之间可以共享状态。

2.2.3 示例代码

现在有一个需求:将两个String类型的流进行union,将一个String类型和一个Long类型的流进行connect

示例代码:

/*** Transformation- union和connect** @author : YangLinWei* @createTime: 2022/3/7 3:44 下午*/
public class TransformationDemo02 {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2.SourceDataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink");DataStream<String> ds2 = env.fromElements("hadoop", "spark", "flink");DataStream<Long> ds3 = env.fromElements(1L, 2L, 3L);//3.TransformationDataStream<String> result1 = ds1.union(ds2);//合并但不去重 https://blog.csdn.net/valada/article/details/104367378ConnectedStreams<String, Long> tempResult = ds1.connect(ds3);//interface CoMapFunction<IN1, IN2, OUT>DataStream<String> result2 = tempResult.map(new CoMapFunction<String, Long, String>() {@Overridepublic String map1(String value) throws Exception {return "String->String:" + value;}@Overridepublic String map2(Long value) throws Exception {return "Long->String:" + value.toString();}});//4.Sinkresult1.print();result2.print();//5.executeenv.execute();}
}

运行结果:

2.3 拆分

2.3.1 API

拆分用到的API:

  • Split就是将一个流分成多个流(注意:split函数已过期并移除);
  • Select就是获取分流后对应的数据;
  • Side Outputs:可以使用process方法对流中数据进行处理,并针对不同的处理结果将数据收集到不同的OutputTag中。

2.3.2 示例代码

需求:对流中的数据按照奇数和偶数进行分流,并获取分流后的数据。

示例代码如下:

/*** Transformation -拆分** @author : YangLinWei* @createTime: 2022/3/7 3:50 下午*/
public class TransformationDemo03 {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2.SourceDataStreamSource<Integer> ds = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);//3.Transformation/*SplitStream<Integer> splitResult = ds.split(new OutputSelector<Integer>() {@Overridepublic Iterable<String> select(Integer value) {//value是进来的数字if (value % 2 == 0) {//偶数ArrayList<String> list = new ArrayList<>();list.add("偶数");return list;} else {//奇数ArrayList<String> list = new ArrayList<>();list.add("奇数");return list;}}});DataStream<Integer> evenResult = splitResult.select("偶数");DataStream<Integer> oddResult = splitResult.select("奇数");*///定义两个输出标签OutputTag<Integer> tag_even = new OutputTag<Integer>("偶数", TypeInformation.of(Integer.class));OutputTag<Integer> tag_odd = new OutputTag<Integer>("奇数") {};//对ds中的数据进行处理SingleOutputStreamOperator<Integer> tagResult = ds.process(new ProcessFunction<Integer, Integer>() {@Overridepublic void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {if (value % 2 == 0) {//偶数ctx.output(tag_even, value);} else {//奇数ctx.output(tag_odd, value);}}});//取出标记好的数据DataStream<Integer> evenResult = tagResult.getSideOutput(tag_even);DataStream<Integer> oddResult = tagResult.getSideOutput(tag_odd);//4.SinkevenResult.print("偶数");oddResult.print("奇数");//5.executeenv.execute();}
}

运行结果:

2.4 分区

2.4.1 rebalance重平衡分区

类似于Spark中的repartition,但是功能更强大,可以直接解决数据倾斜。

Flink也有数据倾斜的时候,比如当前有数据量大概10亿条数据需要处理,在处理过程中可能会发生如图所示的状况,出现了数据倾斜,其他3台机器执行完毕也要等待机器1执行完毕后才算整体将任务完成:

所以在实际的工作中,出现这种情况比较好的解决方案就是rebalance(内部使用round robin方法将数据均匀打散):

示例代码如下:

/*** Transformation -rebalance* @author : YangLinWei* @createTime: 2022/3/7 4:05 下午*/
public class TransformationDemo04 {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC).setParallelism(3);//2.sourceDataStream<Long> longDS = env.fromSequence(0, 100);//3.Transformation//下面的操作相当于将数据随机分配一下,有可能出现数据倾斜DataStream<Long> filterDS = longDS.filter(new FilterFunction<Long>() {@Overridepublic boolean filter(Long num) throws Exception {return num > 10;}});//接下来使用map操作,将数据转为(分区编号/子任务编号, 数据)//Rich表示多功能的,比MapFunction要多一些API可以供我们使用DataStream<Tuple2<Integer, Integer>> result1 = filterDS.map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {@Overridepublic Tuple2<Integer, Integer> map(Long value) throws Exception {//获取分区编号/子任务编号int id = getRuntimeContext().getIndexOfThisSubtask();return Tuple2.of(id, 1);}}).keyBy(t -> t.f0).sum(1);DataStream<Tuple2<Integer, Integer>> result2 = filterDS.rebalance().map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {@Overridepublic Tuple2<Integer, Integer> map(Long value) throws Exception {//获取分区编号/子任务编号int id = getRuntimeContext().getIndexOfThisSubtask();return Tuple2.of(id, 1);}}).keyBy(t -> t.f0).sum(1);//4.sink//result1.print();//有可能出现数据倾斜result2.print();//在输出前进行了rebalance重分区平衡,解决了数据倾斜//5.executeenv.execute();}
}

运行结果如下:

  • 发生数据倾斜:

  • 使用rebalance:

2.4.2 其它分区

类型 描述
dataStream.global(); 全部发往第一个task
dataStream.broadcast(); 广播
dataStream.forward(); 上下游并发度一样时,一对一发送
dataStream.shuffle(); 随机均匀分配
dataStream.reblance(); Round-Robin(轮流分配)
dataStream.recale(); Local Round-Robin(本地轮流分配)
dataStream.partitionCustom(); 自定义单播

说明:

  • recale分区:基于上下游 Operator的并行度,将记录以循环的方式输出到下游Operator的每个实例。

举例:

  • 上游并行度是2,下游是4,则上游一个并行度以循环的方式将记录输出到下游的两个并行度上;上游另一个并行度以循环的方式将记录输出到下游另两个并行度上。
  • 若上游并行度是4,下游并行度是2,则上游两个并行度将记录输出到下游一个并行度上;上游另两个并行度将记录输出到下游另一个并行度上。

示例代码如下:

/*** Transformation -partiton* * @author : YangLinWei* @createTime: 2022/3/7 4:17 下午*/
public class TransformationDemo05 {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2.SourceDataStream<String> linesDS = env.fromElements("hello me you her", "hello me you", "hello me", "hello");SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = value.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1));}}});//3.TransformationDataStream<Tuple2<String, Integer>> result1 = tupleDS.global();DataStream<Tuple2<String, Integer>> result2 = tupleDS.broadcast();DataStream<Tuple2<String, Integer>> result3 = tupleDS.forward();DataStream<Tuple2<String, Integer>> result4 = tupleDS.shuffle();DataStream<Tuple2<String, Integer>> result5 = tupleDS.rebalance();DataStream<Tuple2<String, Integer>> result6 = tupleDS.rescale();DataStream<Tuple2<String, Integer>> result7 = tupleDS.partitionCustom(new Partitioner<String>() {@Overridepublic int partition(String key, int numPartitions) {return key.equals("hello") ? 0 : 1;}}, t -> t.f0);//4.sink//result1.print();//result2.print();//result3.print();//result4.print();//result5.print();//result6.print();result7.print();//5.executeenv.execute();}
}

运行结果如下:

03 文末

本文主要讲解Flink批流一体API中的Transformation用法,谢谢大家的阅读,本文完!

Flink教程(07)- Flink批流一体API(Transformation示例)相关推荐

  1. Flink教程(10)- Flink批流一体API(其它)

    文章目录 01 引言 02 累加器 2.1 相关API 2.2 示例代码 03 广播变量 3.1 原理 3.2 示例代码 04 分布式缓存 4.1 原理 4.2 示例代码 05 文末 01 引言 在前 ...

  2. Flink教程(09)- Flink批流一体API(Connectors示例)

    文章目录 01 引言 02 Connectors 2.1 Flink目前支持的Connectors 2.2 JDBC案例 2.3 Kafa案例 2.3.1 Kafa相关命令 2.3.2 Kafka C ...

  3. Flink教程(06)- Flink批流一体API(Source示例)

    文章目录 01 引言 02 Source 2.1 基于集合的Source 2.2 基于文件的Source 2.3 基于Socket的Source 2.4 自定义Source 2.4.1 案例 - 随机 ...

  4. Flink 和 Pulsar 的批流融合

    简介:如何通过 Apache Pulsar 原生的存储计算分离的架构提供批流融合的基础,以及 Apache Pulsar 如何与 Flink 结合,实现批流一体的计算. 简介:StreamNative ...

  5. Flink 1.11 与 Hive 批流一体数仓实践

    导读:Flink 从 1.9.0 开始提供与 Hive 集成的功能,随着几个版本的迭代,在最新的 Flink 1.11 中,与 Hive 集成的功能进一步深化,并且开始尝试将流计算场景与Hive 进行 ...

  6. [数据湖] 基于flink hudi的批流一体实践

    1.业务背景介绍 广告主和代理商通过广告投放平台来进行广告投放,由多个媒介进行广告展示 ,从而触达到潜在用户.整个过程中会产生各种各样的数据,比如展现数据.点击数据.其中非常重要的数据是计费数据,以计 ...

  7. hive表ddl导出_Flink 1.11 与 Hive 批流一体数仓实践

    简介:Flink 从 1.9.0 开始提供与 Hive 集成的功能,随着几个版本的迭代,在最新的 Flink 1.11 中,与 Hive 集成的功能进一步深化,并且开始尝试将流计算场景与Hive 进行 ...

  8. 开源 | 全球首个批流一体机器学习平台 Alink

    背景 随着大数据时代的到来和人工智能的崛起,机器学习所能处理的场景更加广泛和多样.构建的模型需要对批量数据进行处理,为了达到实时性的要求还需要直接对流式数据进行实时预测,还要具备将模型应用在企业应用和 ...

  9. 袋鼠云批流一体分布式同步引擎ChunJun(原FlinkX)的前世今生

      一.前言 ChunJun(原FlinkX)是一个基于Flink提供易用.稳定.高效的批流统一的数据集成工具,是袋鼠云一站式大数据开发平台-数栈DTinsight的核心计算引擎,其技术架构基于实时计 ...

最新文章

  1. springmvc前台String转后台Date
  2. Cisco呼吁Arista停止在美国销售产品
  3. 解决mysql获取不到连接的问题
  4. BZOJ.4888.[TJOI2017]异或和(树状数组)
  5. ITK:使用二项式内核模糊图像
  6. 上传一批动画教程(链接已失效)
  7. 阅读nopcommerce startup源码
  8. 一分钟学会在Mac中如何将文件批量重命名
  9. bfv同态加密_lattigo: 基于Lattice代数结构的Go同态加密库
  10. apt-get install
  11. spss和python什么区别_资深大牛:Python、R语言、SAS、SPSS优缺点比较
  12. html5时间插件mui,移动端时间控件 HTML5+ 的 pickDate 和 MUI 的 dtpicker 的使用与对比...
  13. Android Wi-Fi源码分析之wpa_supplicant初始化(三):wpa_supplicant_add_iface函数分析
  14. 【第二周】吴恩达团队AI for Medical Diagnosis课程笔记
  15. 扬州大学研究生计算机专业分数线,扬州大学考研历年分数线汇总
  16. Glide源码分析以及三级缓存原理
  17. Marlin固件的欢迎界面
  18. 前方高能!公司来了一个low逼程序员
  19. column xxx is of type integer but expression is of type character varying。String的成员变量存到int的数据库列 报错
  20. 在百度地图中叠加CAD图及GIS数据展示踩坑记

热门文章

  1. 麒麟810处理器_配置7nm处理器麒麟810,重磅新机荣耀9X开箱图赏
  2. 总裁内部信号召战全球,荣耀是颠覆者还是开创者?
  3. OpenGL 绘制螺旋线
  4. 华为哪款手机是鸿蒙系统_华为官宣旗下手机可以直接更新鸿蒙系统,其它手机只能通过刷机...
  5. msf win10系统攻击
  6. Digit(湘潭大学比赛)
  7. MySQL这些拿来就能用!!!
  8. 优秀课程案例:图形化编程画圆方法汇总
  9. ios 旋转屏幕试图切换_iOS指定页面屏幕旋转,手动旋转(某app实现功能全过程)...
  10. FlatList 优化