目录

Transformation

官网API列表

基本操作-略

map

flatMap

keyBy

filter

sum

reduce

代码演示

合并-拆分

union和connect

split、select和Side Outputs

分区

rebalance重平衡分区

其他分区


Transformation

官网API列表

Apache Flink 1.12 Documentation: Operators

整体来说,流式数据上的操作可以分为四类。

l第一类是对于单条记录的操作,比如筛除掉不符合要求的记录(Filter 操作),或者将每条记录都做一个转换(Map 操作)

l第二类是对多条记录的操作。比如说统计一个小时内的订单总成交量,就需要将一个小时内的所有订单记录的成交量加到一起。为了支持这种类型的操作,就得通过 Window 将需要的记录关联到一起进行处理

l第三类是对多个流进行操作并转换为单个流。例如,多个流可以通过 Union、Join 或 Connect 等操作合到一起。这些操作合并的逻辑不同,但是它们最终都会产生了一个新的统一的流,从而可以进行一些跨流的操作。

l最后, DataStream 还支持与合并对称的拆分操作,即把一个流按一定规则拆分为多个流(Split 操作),每个流是之前流的一个子集,这样我们就可以对不同的流作不同的处理。

基本操作-略

map

  • API

map:将函数作用在集合中的每一个元素上,并返回作用后的结果

flatMap

  • API

flatMap:将集合中的每个元素变成一个或多个元素,并返回扁平化之后的结果

​​​​​​​keyBy

按照指定的key来对流中的数据进行分组,前面入门案例中已经演示过

注意:

流处理中没有groupBy,而是keyBy

​​​​​​​filter

  • API

filter:按照指定的条件对集合中的元素进行过滤,过滤出返回true/符合条件的元素

​​​​​​​sum

  • API

sum:按照指定的字段对集合中的元素进行求和

​​​​​​​reduce

  • API

reduce:对集合中的元素进行聚合

​​​​​​​代码演示

  • 需求:

对流数据中的单词进行统计,排除敏感词TMD

  • 代码演示
package cn.it.transformation;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** Author lanson* Desc*/
public class TransformationDemo01 {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2.sourceDataStream<String> linesDS = env.socketTextStream("node1", 9999);//3.处理数据-transformationDataStream<String> wordsDS = linesDS.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {//value就是一行行的数据String[] words = value.split(" ");for (String word : words) {out.collect(word);//将切割处理的一个个的单词收集起来并返回}}});DataStream<String> filtedDS = wordsDS.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String value) throws Exception {return !value.equals("heihei");}});DataStream<Tuple2<String, Integer>> wordAndOnesDS = filtedDS.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {//value就是进来一个个的单词return Tuple2.of(value, 1);}});//KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOnesDS.keyBy(0);KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);DataStream<Tuple2<String, Integer>> result1 = groupedDS.sum(1);DataStream<Tuple2<String, Integer>> result2 = groupedDS.reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {return Tuple2.of(value1.f0, value1.f1 + value1.f1);}});//4.输出结果-sinkresult1.print("result1");result2.print("result2");//5.触发执行-executeenv.execute();}
}

​​​​​​​合并-拆分

​​​​​​​union和connect

  • API

union:

union算子可以合并多个同类型的数据流,并生成同类型的数据流,即可以将多个DataStream[T]合并为一个新的DataStream[T]。数据将按照先进先出(First In First Out)的模式合并,且不去重。

connect:

connect提供了和union类似的功能,用来连接两个数据流,它与union的区别在于:

connect只能连接两个数据流,union可以连接多个数据流。

connect所连接的两个数据流的数据类型可以不一致,union所连接的两个数据流的数据类型必须一致。

两个DataStream经过connect之后被转化为ConnectedStreams,ConnectedStreams会对两个流的数据应用不同的处理方法,且双流之间可以共享状态。

  • 需求

将两个String类型的流进行union

将一个String类型和一个Long类型的流进行connect

  • 代码实现
package cn.it.transformation;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;/*** Author lanson* Desc*/
public class TransformationDemo02 {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2.SourceDataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink");DataStream<String> ds2 = env.fromElements("hadoop", "spark", "flink");DataStream<Long> ds3 = env.fromElements(1L, 2L, 3L);//3.TransformationDataStream<String> result1 = ds1.union(ds2);//合并但不去重 https://blog.csdn.net/valada/article/details/104367378ConnectedStreams<String, Long> tempResult = ds1.connect(ds3);//interface CoMapFunction<IN1, IN2, OUT>DataStream<String> result2 = tempResult.map(new CoMapFunction<String, Long, String>() {@Overridepublic String map1(String value) throws Exception {return "String->String:" + value;}@Overridepublic String map2(Long value) throws Exception {return "Long->String:" + value.toString();}});//4.Sinkresult1.print();result2.print();//5.executeenv.execute();}
}

​​​​​​​split、select和Side Outputs

  • API

Split就是将一个流分成多个流

Select就是获取分流后对应的数据

注意:split函数已过期并移除

Side Outputs:可以使用process方法对流中数据进行处理,并针对不同的处理结果将数据收集到不同的OutputTag中

  • 需求:

对流中的数据按照奇数和偶数进行分流,并获取分流后的数据

  • 代码实现:
package cn.it.transformation;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;/*** Author lanson* Desc*/
public class TransformationDemo03 {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2.SourceDataStreamSource<Integer> ds = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);//3.Transformation/*SplitStream<Integer> splitResult = ds.split(new OutputSelector<Integer>() {@Overridepublic Iterable<String> select(Integer value) {//value是进来的数字if (value % 2 == 0) {//偶数ArrayList<String> list = new ArrayList<>();list.add("偶数");return list;} else {//奇数ArrayList<String> list = new ArrayList<>();list.add("奇数");return list;}}});DataStream<Integer> evenResult = splitResult.select("偶数");DataStream<Integer> oddResult = splitResult.select("奇数");*///定义两个输出标签OutputTag<Integer> tag_even = new OutputTag<Integer>("偶数", TypeInformation.of(Integer.class));OutputTag<Integer> tag_odd = new OutputTag<Integer>("奇数"){};//对ds中的数据进行处理SingleOutputStreamOperator<Integer> tagResult = ds.process(new ProcessFunction<Integer, Integer>() {@Overridepublic void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {if (value % 2 == 0) {//偶数ctx.output(tag_even, value);} else {//奇数ctx.output(tag_odd, value);}}});//取出标记好的数据DataStream<Integer> evenResult = tagResult.getSideOutput(tag_even);DataStream<Integer> oddResult = tagResult.getSideOutput(tag_odd);//4.SinkevenResult.print("偶数");oddResult.print("奇数");//5.executeenv.execute();}
}

分区

rebalance重平衡分区

  • API

类似于Spark中的repartition,但是功能更强大,可以直接解决数据倾斜

Flink也有数据倾斜的时候,比如当前有数据量大概10亿条数据需要处理,在处理过程中可能会发生如图所示的状况,出现了数据倾斜,其他3台机器执行完毕也要等待机器1执行完毕后才算整体将任务完成;

所以在实际的工作中,出现这种情况比较好的解决方案就是rebalance(内部使用round robin方法将数据均匀打散)

  • 代码演示:
package cn.it.transformation;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** Author lanson* Desc*/
public class TransformationDemo04 {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC).setParallelism(3);//2.sourceDataStream<Long> longDS = env.fromSequence(0, 100);//3.Transformation//下面的操作相当于将数据随机分配一下,有可能出现数据倾斜DataStream<Long> filterDS = longDS.filter(new FilterFunction<Long>() {@Overridepublic boolean filter(Long num) throws Exception {return num > 10;}});//接下来使用map操作,将数据转为(分区编号/子任务编号, 数据)//Rich表示多功能的,比MapFunction要多一些API可以供我们使用DataStream<Tuple2<Integer, Integer>> result1 = filterDS.map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {@Overridepublic Tuple2<Integer, Integer> map(Long value) throws Exception {//获取分区编号/子任务编号int id = getRuntimeContext().getIndexOfThisSubtask();return Tuple2.of(id, 1);}}).keyBy(t -> t.f0).sum(1);DataStream<Tuple2<Integer, Integer>> result2 = filterDS.rebalance().map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {@Overridepublic Tuple2<Integer, Integer> map(Long value) throws Exception {//获取分区编号/子任务编号int id = getRuntimeContext().getIndexOfThisSubtask();return Tuple2.of(id, 1);}}).keyBy(t -> t.f0).sum(1);//4.sink//result1.print();//有可能出现数据倾斜result2.print();//在输出前进行了rebalance重分区平衡,解决了数据倾斜//5.executeenv.execute();}
}

​​​​​​​其他分区

  • API

说明:

recale分区。基于上下游Operator的并行度,将记录以循环的方式输出到下游Operator的每个实例。

举例:

上游并行度是2,下游是4,则上游一个并行度以循环的方式将记录输出到下游的两个并行度上;上游另一个并行度以循环的方式将记录输出到下游另两个并行度上。若上游并行度是4,下游并行度是2,则上游两个并行度将记录输出到下游一个并行度上;上游另两个并行度将记录输出到下游另一个并行度上。

  • 需求:

对流中的元素使用各种分区,并输出

  • 代码实现
package cn.it.transformation;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** Author lanson* Desc*/
public class TransformationDemo05 {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2.SourceDataStream<String> linesDS = env.readTextFile("data/input/words.txt");SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = value.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1));}}});//3.TransformationDataStream<Tuple2<String, Integer>> result1 = tupleDS.global();DataStream<Tuple2<String, Integer>> result2 = tupleDS.broadcast();DataStream<Tuple2<String, Integer>> result3 = tupleDS.forward();DataStream<Tuple2<String, Integer>> result4 = tupleDS.shuffle();DataStream<Tuple2<String, Integer>> result5 = tupleDS.rebalance();DataStream<Tuple2<String, Integer>> result6 = tupleDS.rescale();DataStream<Tuple2<String, Integer>> result7 = tupleDS.partitionCustom(new Partitioner<String>() {@Overridepublic int partition(String key, int numPartitions) {return key.equals("hello") ? 0 : 1;}}, t -> t.f0);//4.sink//result1.print();//result2.print();//result3.print();//result4.print();//result5.print();//result6.print();result7.print();//5.executeenv.execute();}
}

2021年大数据Flink(十二):流批一体API Transformation相关推荐

  1. 大数据架构如何做到流批一体?【对于Flink等流批一体的概念做了很好的澄清!】

    导读:大数据与现有的科技手段结合,对大多数产业而言都能产生巨大的经济及社会价值.这也是当下许多企业,在大数据上深耕的原因.大数据分析场景需要解决哪些技术挑战?目前,有哪些主流大数据架构模式及其发展?今 ...

  2. 大数据架构如何做到流批一体?

    阿里妹导读:大数据与现有的科技手段结合,对大多数产业而言都能产生巨大的经济及社会价值.这也是当下许多企业,在大数据上深耕的原因.大数据分析场景需要解决哪些技术挑战?目前,有哪些主流大数据架构模式及其发 ...

  3. pb数据窗口怎么调用视图_大数据架构如何做到流批一体?

    阿里妹导读:大数据与现有的科技手段结合,对大多数产业而言都能产生巨大的经济及社会价值.这也是当下许多企业,在大数据上深耕的原因.大数据分析场景需要解决哪些技术挑战?目前,有哪些主流大数据架构模式及其发 ...

  4. 2021年大数据Flink(二):Flink用武之地

    目录 ​​​​​​​Flink用武之地 ​​​​​​​Event-driven Applications[事件驱动] ​​​​​​​Data Analytics Applications[数据分析] ...

  5. 2021年大数据Flink(二十六):​​​​​​​State代码示例

    目录 State代码示例 Keyed State 官网代码示例 需求: 编码步骤 代码示例 Operator State 官网代码示例 需求: 编码步骤: 代码示例 State代码示例 Keyed S ...

  6. 2021年大数据Flink(二十五):Flink 状态管理

    目录 Flink-状态管理 Flink中的有状态计算 无状态计算和有状态计算 无状态计算 有状态计算 有状态计算的场景 状态的分类 Managed State & Raw State Keye ...

  7. 2021年大数据Flink(二十二):Time与Watermaker

    目录 Flink-Time与Watermaker Time分类 EventTime的重要性 示例1 示例2 ​​​​​​​示例3 ​​​​​​​示例4 ​​​​​​​总结 Watermaker水印机制 ...

  8. 2021年大数据Flink(二十九):Flink 容错机制 Savepoint

    目录 Savepoint Savepoint介绍 Savepoint VS Checkpoint Savepoint演示 Savepoint Savepoint介绍 Savepoint:保存点,类似于 ...

  9. 2021年大数据Flink(二十八):Flink 容错机制 自动重启策略和恢复

    目录 自动重启策略和恢复 重启策略配置方式 重启策略分类 代码演示 手动重启并恢复-了解 1.把程序打包 2.启动Flink集群(本地单机版,集群版都可以) 3.访问webUI 4.使用FlinkWe ...

最新文章

  1. 【Qt】编译QtCreator
  2. css如何转为html5,HTML与CSS中的3D转换模块
  3. 生态伙伴 | 飞书携手ProcessOn助力企业高效远程办公,共同“战疫”
  4. BCH生态建设逐步推进: Electron Cash钱包即将推出IOS版
  5. RunJar是什么进程?
  6. Linux编程手册读书笔记第三章(20140407)
  7. 解决 QtCreator 3.5(4.0)无法输入中文的问题
  8. 能不能用python开发qq_用Python写一个模拟qq聊天小程序的代码实例
  9. springboot入门书籍推荐,“最粉嫩
  10. ASP.NET MVC 3.0(一): MVC 3.0 的新特性 摘要
  11. IT民工金鱼哥从业8年的历程与感悟
  12. 如何快速查找综述类论文
  13. 1-十四烷基-3-甲基咪唑六氟磷酸盐([C14MIm][PF6])修饰纳米SiO2二氧化硅(mg级瓶装)
  14. 【Linux基础】文件及文件权限(图文详解)
  15. Teamwork(The first day of the team)
  16. css template download
  17. 【日常】FAB法则在产品设计的应用
  18. 如何下载网易云音乐APP里的MV和短视频?
  19. AngularJS之表格设置样式
  20. Verilog HDL错误总结

热门文章

  1. 提高班第三周周记(中秋第三天)
  2. 各bert 模型下载
  3. torch学习笔记(二) nn类结构-Linear
  4. java锁(公平锁和非公平锁、可重入锁(又名递归锁)、自旋锁、独占锁(写)/共享锁(读)/互斥锁、读写锁)
  5. LeetCode简单题之拥有最多糖果的孩‭子
  6. TVM编译机器学习到 WASM 和 WebGPU
  7. Halide应用开发
  8. 自动调度GPU的卷积层
  9. TinyML-TVM是如何驯服Tiny的(下)
  10. CVPR2020:训练多视图三维点云配准