Flink 1.12.0 Study Notes, Part 2: Unified Stream and Batch API

Flink 1.12.0 Study Notes, Part 1: Deployment and Getting Started
Flink 1.12.0 Study Notes, Part 2: Unified Stream and Batch API
Flink 1.12.0 Study Notes, Part 3: Advanced API
Flink 1.12.0 Study Notes, Part 4: Table and SQL
Flink 1.12.0 Study Notes, Part 5: Business Case Studies
Flink 1.12.0 Study Notes, Part 6: Advanced and New Features
Flink 1.12.0 Study Notes, Part 7: Monitoring and Optimization

2-1. Stream Processing Concepts

Data Timeliness

In day-to-day work we usually store data in tables first and then process and analyze it. Because the data is stored before it is used, timeliness becomes a concern.

For data processed at a yearly or monthly granularity, say for statistics or personalized recommendation, it does not matter much if the latest available data lags the present by days or even a month. But when we process data at a daily, hourly, or even finer granularity, the timeliness requirement is much stricter. For example, real-time website monitoring and error-log monitoring require staff to respond immediately; in such scenarios the traditional approach of collecting data, storing it in a database, and later pulling it out for analysis can no longer meet the timeliness requirement.

Stream Processing and Batch Processing

  • Batch Analytics: batch computation. Collect the data, store it in a database, then process it in bulk. This is the traditional approach of using MapReduce, Hive, Spark Batch, and similar tools to run jobs for analysis and offline reports.
  • Streaming Analytics: stream computation. As the name suggests, the data stream is processed as it arrives, using streaming engines such as Storm or Flink to analyze data in real time, typically for real-time dashboards and real-time reports.

Stream Computation vs. Batch Computation

  • Data timeliness
    • Stream computation is real-time with low latency; batch computation is non-real-time with high latency.
  • Data characteristics
    • Data in stream computation is generally dynamic and unbounded, while data in batch processing is generally static.
  • Application scenarios
    • Stream computation is used in real-time scenarios with strict timeliness requirements, such as real-time recommendation and business monitoring; batch processing is used where real-time requirements are relaxed and computation is offline, such as data analysis and offline reporting.
  • Execution
    • A streaming job runs continuously, while a batch job runs once and finishes.

Unified Stream and Batch API

  • The DataStream API supports batch execution mode
    Flink's core APIs were originally designed for specific scenarios. Although the Table API / SQL already provides a unified API for stream and batch processing, users working at the lower level still had to choose between two different APIs: batch (DataSet API) and streaming (DataStream API). Since batch processing is a special case of stream processing, merging the two into a single unified API brings some obvious benefits:

    • Reusability: a job can switch freely between streaming and batch execution modes without rewriting any code, so the same job can be reused to process both real-time and historical data.
    • Easier maintenance: a unified API means streaming and batch share the same set of connectors and the same code base, and mixed stream/batch execution such as backfilling becomes easy to implement.

Given these benefits, the community has taken the first step towards a unified stream/batch DataStream API: support for efficient batch execution (FLIP-134). In the long run this means the DataSet API will be deprecated (FLIP-131) and its functionality absorbed into the DataStream API and the Table API / SQL.
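To make this concrete, here is a minimal sketch (not from the original notes; the class name and file path are illustrative) of running a DataStream program in the batch execution mode that FLIP-134 adds, assuming a bounded file source:

package cn.wangting;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExecutionModeDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The same DataStream API supports three execution modes:
        //   STREAMING - classic unbounded execution (the default)
        //   BATCH     - batch-style execution, only valid when all sources are bounded
        //   AUTOMATIC - let Flink pick a mode based on whether the sources are bounded
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // A file is a bounded source, so BATCH mode is allowed here
        DataStreamSource<String> ds = env.readTextFile("data/input/wlk.txt");
        ds.print();

        env.execute();
    }
}

The Flink documentation suggests setting the mode at submission time (e.g. bin/flink run -Dexecution.runtime-mode=BATCH ...) rather than hard-coding it in the program, so the same jar can run in either mode.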

FLIP-134: Batch Execution for the DataStream API

  • Allow configuring the time attribute on KeyedStream.intervalJoin(). Before Flink 1.12, the KeyedStream.intervalJoin() operator depended on the globally configured time characteristic. In Flink 1.12 you can call inProcessingTime() or inEventTime() after intervalJoin(), so the join no longer depends on the global setting.
  • Flink 1.12 deprecates the DataStream API's timeWindow() methods. Use window(WindowAssigner) with TumblingEventTimeWindows, SlidingEventTimeWindows, TumblingProcessingTimeWindows, or SlidingProcessingTimeWindows instead (see the sketch after this list).
  • The StreamExecutionEnvironment.setStreamTimeCharacteristic() and TimeCharacteristic methods are deprecated. In Flink 1.12 the default time characteristic is EventTime, so you no longer need this method to enable event time; processing-time windows and timers still work under it. To disable watermarks, use ExecutionConfig.setAutoWatermarkInterval(long). To use IngestionTime, set an appropriate WatermarkStrategy manually. If you relied on the generic 'time window' operators whose behavior changed with the time characteristic (e.g. KeyedStream.timeWindow()), switch to the equivalent operations that explicitly specify processing time or event time.
  • Allow explicitly configuring the time attribute on CEP PatternStream. Before Flink 1.12 the time used by the CEP operators depended on the globally configured time characteristic; since 1.12 you can call inProcessingTime() or inEventTime() on the PatternStream.
  • API
    Flink offers several levels of API: the higher the level, the more abstract and convenient it is; the lower the level, the more control you get but the harder it is to use.

  • Programming model
    A Flink application consists of three main parts: Source, Transformation, and Sink.
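A minimal sketch (not from the original notes; it reuses the ops01:6666 socket source from the examples below) of the 1.12-style window API referenced above: an explicit WindowAssigner instead of the deprecated timeWindow(), and no call to setStreamTimeCharacteristic():

package cn.wangting;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowApiDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // No setStreamTimeCharacteristic() needed: event time is the default in 1.12,
        // and processing-time windows still work under it.

        DataStream<String> lines = env.socketTextStream("ops01", 6666);
        DataStream<Tuple2<String, Integer>> wordAndOne = lines.flatMap(
                new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                        for (String word : value.split(" ")) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                });

        wordAndOne.keyBy(t -> t.f0)
                  // old style: .timeWindow(Time.seconds(5))  -- deprecated in 1.12
                  .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                  .sum(1)
                  .print();

        env.execute();
    }
}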

2-2.Source

Collection-based Source

  1. env.fromElements(varargs);
  2. env.fromCollection(a Collection);
  3. env.generateSequence(start, end);
  4. env.fromSequence(start, end);
package cn.wangting;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

public class SourceDemo01 {
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // source
        DataStreamSource<String> ds1 = env.fromElements("wang", "ting", "flink", "spark");
        DataStreamSource<String> ds2 = env.fromCollection(Arrays.asList("wang", "ting", "flink", "spark"));
        DataStreamSource<Long> ds3 = env.generateSequence(1, 10);
        DataStreamSource<Long> ds4 = env.fromSequence(1, 10);
        ds1.print();
        ds2.print();
        ds3.print();
        ds4.print();
        // transformation
        // sink
        // execute
        env.execute();
    }
}

File-based Source

env.readTextFile(local or HDFS file/directory); // compressed files are also supported

Create a data directory in the project root, with input and output directories under it.

package cn.wangting;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceDemo02 {
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // source
        DataStreamSource<String> ds1 = env.readTextFile("data/input/wlk.txt");
        DataStreamSource<String> ds2 = env.readTextFile("data/input/20220924");
        DataStreamSource<String> ds3 = env.readTextFile("data/input/words.tar.gz");
        // transformation
        // sink
        ds1.print();
        ds2.print();
        ds3.print();
        // execute
        env.execute();
    }
}

Socket-based Source

[root@ops01 ~]# yum install -y nc
[root@ops01 ~]# nc -lk 6666
package cn.wangting;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SourceDemo03 {
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // source
        DataStream<String> linesDS = env.socketTextStream("ops01", 6666);
        // transformation
        DataStream<String> wordsDS = linesDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(word);
                }
            }
        });
        DataStream<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });
        KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);
        DataStream<Tuple2<String, Integer>> result = groupedDS.sum(1);
        // sink
        result.print();
        // execute
        env.execute();
    }
}

Verification:

# Server command line output:
[root@ops01 lib]# nc -lk 6666
wangting 666
today 20220924
wlk is comming

# IDE console output:
2> (wangting,1)
3> (666,1)
4> (today,1)
4> (20220924,1)
1> (comming,1)
4> (wlk,1)
4> (is,1)
Process finished with exit code -1

Custom Source

In real development you often receive data in real time and need to match it against rules stored in MySQL. In that case you can use a custom Flink source to read the data from MySQL.

The data is loaded from MySQL continuously, so changes made in MySQL are also picked up in near real time.

MariaDB [(none)]> CREATE DATABASE `wow`;
Query OK, 1 row affected (0.000 sec)

MariaDB [(none)]> use wow;
Database changed
MariaDB [wow]> CREATE TABLE `wow_info` (
    ->     `id` int(11) NOT NULL AUTO_INCREMENT,
    ->     `role` varchar(255) DEFAULT NULL,
    ->     `pinyin` varchar(255) DEFAULT NULL,
    ->     PRIMARY KEY (`id`)
    -> ) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;
Query OK, 0 rows affected (0.014 sec)

MariaDB [wow]> INSERT INTO `wow_info` VALUES ('1', 'fs', 'fashi');
Query OK, 1 row affected (0.002 sec)
MariaDB [wow]> INSERT INTO `wow_info` VALUES ('2', 'ms', 'mushi');
Query OK, 1 row affected (0.002 sec)
MariaDB [wow]> INSERT INTO `wow_info` VALUES ('3', 'ss', 'shushi');
Query OK, 1 row affected (0.005 sec)
MariaDB [wow]> INSERT INTO `wow_info` VALUES ('4', 'dz', 'daozei');
Query OK, 1 row affected (0.004 sec)
MariaDB [wow]> INSERT INTO `wow_info` VALUES ('5', 'ws', 'wuseng');
Query OK, 1 row affected (0.002 sec)
MariaDB [wow]> INSERT INTO `wow_info` VALUES ('6', 'xd', 'xiaode');
Query OK, 1 row affected (0.002 sec)
MariaDB [wow]>
package cn.wangting;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class SourceDemo04 {
    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //TODO 1.source
        DataStream<wow_info> wow_infoDS = env.addSource(new MySQLSource()).setParallelism(1);
        //TODO 2.transformation
        //TODO 3.sink
        wow_infoDS.print();
        //TODO 4.execute
        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class wow_info {
        private Integer id;
        private String role;
        private String pinyin;
    }

    public static class MySQLSource extends RichParallelSourceFunction<wow_info> {
        private boolean flag = true;
        private Connection conn = null;
        private PreparedStatement ps = null;
        private ResultSet rs = null;

        // open() runs only once per instance, suitable for acquiring resources
        @Override
        public void open(Configuration parameters) throws Exception {
            conn = DriverManager.getConnection("jdbc:mysql://ops01:3306/wow", "root", "123456");
            String sql = "select id,role,pinyin from wow_info";
            ps = conn.prepareStatement(sql);
        }

        @Override
        public void run(SourceContext<wow_info> ctx) throws Exception {
            while (flag) {
                rs = ps.executeQuery();
                while (rs.next()) {
                    int id = rs.getInt("id");
                    String role = rs.getString("role");
                    String pinyin = rs.getString("pinyin");
                    ctx.collect(new wow_info(id, role, pinyin));
                }
                Thread.sleep(5000);
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }

        @Override
        public void close() throws Exception {
            if (conn != null) conn.close();
            if (ps != null) ps.close();
            if (rs != null) rs.close();
        }
    }
}

Console output after running:

1> SourceDemo04.wow_info(id=3, role=ss, pinyin=shushi)
2> SourceDemo04.wow_info(id=4, role=dz, pinyin=daozei)
3> SourceDemo04.wow_info(id=1, role=fs, pinyin=fashi)
3> SourceDemo04.wow_info(id=5, role=ws, pinyin=wuseng)
4> SourceDemo04.wow_info(id=2, role=ms, pinyin=mushi)
4> SourceDemo04.wow_info(id=6, role=xd, pinyin=xiaode)
2> SourceDemo04.wow_info(id=2, role=ms, pinyin=mushi)
2> SourceDemo04.wow_info(id=6, role=xd, pinyin=xiaode)
4> SourceDemo04.wow_info(id=4, role=dz, pinyin=daozei)
3> SourceDemo04.wow_info(id=3, role=ss, pinyin=shushi)
1> SourceDemo04.wow_info(id=1, role=fs, pinyin=fashi)
1> SourceDemo04.wow_info(id=5, role=ws, pinyin=wuseng)

Insert new rows into MySQL to verify that the IDE console picks up the new data in real time:

INSERT INTO `wow_info` VALUES ('7', 'sq', 'shengqi');
INSERT INTO `wow_info` VALUES ('8', 'zs', 'zhanshi');
INSERT INTO `wow_info` VALUES ('9', 'dk', 'siwangqishi');

Console output after inserting the new rows:

1> SourceDemo04.wow_info(id=5, role=ws, pinyin=wuseng)
1> SourceDemo04.wow_info(id=9, role=dk, pinyin=siwangqishi)
3> SourceDemo04.wow_info(id=3, role=ss, pinyin=shushi)
3> SourceDemo04.wow_info(id=7, role=sq, pinyin=shengqi)
2> SourceDemo04.wow_info(id=2, role=ms, pinyin=mushi)
2> SourceDemo04.wow_info(id=6, role=xd, pinyin=xiaode)
1> SourceDemo04.wow_info(id=4, role=dz, pinyin=daozei)
1> SourceDemo04.wow_info(id=8, role=zs, pinyin=zhanshi)
2> SourceDemo04.wow_info(id=1, role=fs, pinyin=fashi)
2> SourceDemo04.wow_info(id=5, role=ws, pinyin=wuseng)
2> SourceDemo04.wow_info(id=9, role=dk, pinyin=siwangqishi)
4> SourceDemo04.wow_info(id=3, role=ss, pinyin=shushi)
4> SourceDemo04.wow_info(id=7, role=sq, pinyin=shengqi)
3> SourceDemo04.wow_info(id=2, role=ms, pinyin=mushi)
3> SourceDemo04.wow_info(id=6, role=xd, pinyin=xiaode)

2-3.Transformation

map

  • Applies a function to every element in the collection and returns the results.

flatMap

  • Turns each element of the collection into zero, one, or more elements and returns the flattened result.

keyBy

  • Partitions (groups) the records in the stream by the specified key.

Note:

Stream processing has no groupBy; use keyBy instead.

filter

  • Filters the elements of the collection by the given condition, keeping the elements for which the condition returns true.

sum

  • Sums the elements of the collection over the specified field.

reduce

  • Aggregates the elements of the collection (see the sketch below for these operators chained together).
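Before the full socket example below, here is a minimal lambda-style sketch of these operators on a small in-memory collection (not from the original notes; the input strings are illustrative, and the returns() hints are needed because lambdas lose generic type information):

package cn.wangting;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class TransformationSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("flink spark", "flink TMD flink")
           // flatMap: one line -> many words (returns() supplies the erased output type)
           .flatMap((String line, Collector<String> out) -> {
               for (String w : line.split(" ")) {
                   out.collect(w);
               }
           }).returns(Types.STRING)
           // filter: drop the sensitive word
           .filter(w -> !w.equals("TMD"))
           // map: word -> (word, 1)
           .map(w -> Tuple2.of(w, 1))
           .returns(Types.TUPLE(Types.STRING, Types.INT))
           // keyBy + sum: group by word and count
           .keyBy(t -> t.f0)
           .sum(1)
           .print();

        env.execute();
    }
}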

Example: count the words sent over the socket, filtering out the sensitive word "TMD".

[root@ops01 lib]# nc -lk 6666
package cn.wangting;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class TransformationDemo01 {
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // source
        DataStream<String> lines = env.socketTextStream("ops01", 6666);
        // transformation
        DataStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] arr = value.split(" ");
                for (String word : arr) {
                    out.collect(word);
                }
            }
        });
        DataStream<String> filted = words.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return !value.equals("TMD");
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = filted.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });
        KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(t -> t.f0);
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = grouped.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                return Tuple2.of(value1.f0, value1.f1 + value2.f1);
            }
        });
        // sink
        result.print();
        // execute
        env.execute();
    }
}

Messages sent from the server on port 6666:

ni hao a laotie
zou yiqi dafuben!
TMD buqu
TMD ni bie lao TMD

Console output in the IDE:

4> (ni,1)
3> (hao,1)
3> (a,1)
3> (laotie,1)
3> (dafuben!,1)
4> (zou,1)
4> (yiqi,1)
1> (buqu,1)
3> (bie,1)
4> (ni,2)
4> (lao,1)
Process finished with exit code -1

Merging and Splitting

union

The union operator merges multiple data streams of the same type into one stream of that type, i.e. it combines several DataStream[T] into a new DataStream[T]. Records are merged in first-in-first-out order and are not deduplicated.

connect

connect provides functionality similar to union in that it connects two data streams, but it differs from union:

  • connect can only connect two data streams, while union can combine more than two.

  • The two streams joined by connect may have different data types, while the streams combined by union must have the same data type.

After connect, the two DataStreams become a ConnectedStreams, which can apply different processing functions to the two streams, and the two streams can share state.

Code example:

package cn.wangting;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

public class TransformationDemo02 {
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // source
        DataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink");
        DataStream<String> ds2 = env.fromElements("hadoop", "spark", "flink");
        DataStream<Long> ds3 = env.fromElements(1L, 2L, 3L);
        // transformation
        DataStream<String> result1 = ds1.union(ds2);
        ConnectedStreams<String, String> result2 = ds1.connect(ds2);
        ConnectedStreams<String, Long> result3 = ds1.connect(ds3);
        SingleOutputStreamOperator<String> result = result3.map(new CoMapFunction<String, Long, String>() {
            @Override
            public String map1(String value) throws Exception {
                return "String:" + value;
            }

            @Override
            public String map2(Long value) throws Exception {
                return "Long:" + value;
            }
        });
        // sink
        result1.print();
        result.print();
        // execute
        env.execute();
    }
}

Console output:

1> hadoop
4> String:flink
1> Long:2
4> Long:1
3> flink
3> hadoop
3> String:spark
2> spark
2> String:hadoop
2> Long:3
4> spark
1> flink

Process finished with exit code 0

split, select, and Side Outputs

  • split splits one stream into several streams
  • select retrieves the sub-stream(s) produced by the split
  • Side Outputs: a process function inspects each record and, depending on the processing result, emits it to a different OutputTag

Code example:

package cn.wangting;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class TransformationDemo03 {
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // source
        DataStreamSource<Integer> ds = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // transformation
        OutputTag<Integer> oddTag = new OutputTag<>("奇数", TypeInformation.of(Integer.class));
        OutputTag<Integer> evenTag = new OutputTag<>("偶数", TypeInformation.of(Integer.class));
        SingleOutputStreamOperator<Integer> result = ds.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                if (value % 2 == 0) {
                    ctx.output(evenTag, value);
                } else {
                    ctx.output(oddTag, value);
                }
            }
        });
        DataStream<Integer> oddResult = result.getSideOutput(oddTag);
        DataStream<Integer> evenResult = result.getSideOutput(evenTag);
        // sink
        System.out.println(oddTag);   // OutputTag(Integer, 奇数)
        System.out.println(evenTag);  // OutputTag(Integer, 偶数)
        oddResult.print("奇数:");
        evenResult.print("偶数:");
        // execute
        env.execute();
    }
}

Console output:

OutputTag(Integer, 奇数)
OutputTag(Integer, 偶数)
偶数::1> 2
偶数::1> 6
偶数::1> 10
奇数::2> 3
奇数::2> 7
偶数::3> 4
偶数::3> 8
奇数::4> 1
奇数::4> 5
奇数::4> 9
Process finished with exit code 0

Partitioning

rebalance (repartitioning)

Similar to repartition in Spark, but more powerful: it can directly fix data skew.

Flink can also suffer from data skew. For example, with roughly one billion records to process, the load may end up unevenly distributed across subtasks; the other three machines finish but still have to wait for machine 1 to finish before the whole job is complete.

In practice a good fix for this situation is rebalance, which internally uses round-robin to spread the data evenly.

Code example:

package cn.wangting;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformationDemo04 {
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // source
        DataStream<Long> longDS = env.fromSequence(0, 100);
        DataStream<Long> filterDS = longDS.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long num) throws Exception {
                return num > 10;
            }
        });
        // transformation
        SingleOutputStreamOperator<Tuple2<Integer, Integer>> result1 = filterDS
                .map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> map(Long value) throws Exception {
                        int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
                        return Tuple2.of(subTaskId, 1);
                    }
                }).keyBy(t -> t.f0).sum(1);
        SingleOutputStreamOperator<Tuple2<Integer, Integer>> result2 = filterDS
                .rebalance()
                .map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> map(Long value) throws Exception {
                        int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
                        return Tuple2.of(subTaskId, 1);
                    }
                }).keyBy(t -> t.f0).sum(1);
        // sink
        result1.print("result1");
        result2.print("result2");
        // execute
        env.execute();
    }
}

Console output:

result1:3> (0,25)
result1:3> (1,15)
result1:4> (2,25)
result1:4> (3,25)
result2:4> (2,23)
result2:4> (3,23)
result2:3> (0,21)
result2:3> (1,23)
Process finished with exit code 0

Other partitioning strategies

Description:

rescale partitioning: based on the parallelism of the upstream and downstream operators, records are distributed round-robin to a fixed subset of the downstream operator's instances.

Example:

If the upstream parallelism is 2 and the downstream parallelism is 4, one upstream subtask distributes its records round-robin to two of the downstream subtasks and the other upstream subtask distributes to the other two. Conversely, if the upstream parallelism is 4 and the downstream parallelism is 2, two upstream subtasks send their records to one downstream subtask and the other two upstream subtasks send to the other.
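A minimal sketch of rescale() contrasted with rebalance() (not from the original notes; the parallelism values are illustrative):

package cn.wangting;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RescaleDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // upstream with parallelism 2
        DataStream<Long> source = env.fromSequence(0, 100).setParallelism(2);

        // rescale(): round-robin only within a local group of downstream subtasks
        // (upstream parallelism 2 -> downstream parallelism 4: each upstream subtask
        // feeds two fixed downstream subtasks)
        source.rescale()
              .map(new RichMapFunction<Long, String>() {
                  @Override
                  public String map(Long value) {
                      return "subtask " + getRuntimeContext().getIndexOfThisSubtask() + " -> " + value;
                  }
              })
              .setParallelism(4)
              .print("rescale");

        // rebalance(): round-robin across ALL downstream subtasks
        source.rebalance().print("rebalance").setParallelism(4);

        env.execute();
    }
}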

2-4.Sink

Predefined sinks: console and file

  1. ds.print() prints to the console
  2. ds.printToErr() prints to the console in red
  3. ds.writeAsText("local/HDFS path", WriteMode.OVERWRITE).setParallelism(1)

When writing to a path, the parallelism set beforehand determines what the path means:

if parallelism > 1, the path is a directory;

if parallelism = 1, the path is a single file.

Code example:

wlk.txt

wlk is comming!
date is 20220927
tbc is goodbye!
package cn.wangting;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinkDemo01 {
    public static void main(String[] args) throws Exception {
        //TODO env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //TODO source
        DataStreamSource<String> ds = env.readTextFile("data/input/wlk.txt");
        //TODO transformation
        //TODO sink
        ds.print();
        ds.print("输出标识");
        ds.printToErr();            // printed in red
        ds.printToErr("输出标识");
        ds.writeAsText("data/output/wlkoutput1").setParallelism(1);
        ds.writeAsText("data/output/wlkoutput2").setParallelism(2);
        //TODO execute
        env.execute();
    }
}

wlkoutput1 (parallelism 1, a single file):

date is 20220927
wlk is comming!
tbc is goodbye!

wlkoutput2/1 (parallelism 2, first file):

tbc is goodbye!
wlk is comming!

wlkoutput2/2 (parallelism 2, second file):

date is 20220927

Custom sink: MySQL

Save data from a Flink collection to MySQL through a custom sink.

MariaDB [(none)]> use wow;
Database changed
MariaDB [wow]> show tables;
+---------------+
| Tables_in_wow |
+---------------+
| wow_info      |
+---------------+
1 row in set (0.000 sec)

MariaDB [wow]> select * from wow_info;
+----+------+-------------+
| id | role | pinyin      |
+----+------+-------------+
|  1 | fs   | fashi       |
|  2 | ms   | mushi       |
|  3 | ss   | shushi      |
|  4 | dz   | daozei      |
|  5 | ws   | wuseng      |
|  6 | xd   | xiaode      |
|  7 | sq   | shengqi     |
|  8 | zs   | zhanshi     |
|  9 | dk   | siwangqishi |
+----+------+-------------+
9 rows in set (0.000 sec)
package cn.wangting;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class SinkDemo02 {
    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //TODO 1.source
        DataStream<wow_info> wow_infoDS = env.fromElements(new wow_info(null, "dh", "emolieshou"));
        //TODO 2.transformation
        //TODO 3.sink
        wow_infoDS.addSink(new MySQLSink());
        //TODO 4.execute
        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class wow_info {
        private Integer id;
        private String role;
        private String pinyin;
    }

    public static class MySQLSink extends RichSinkFunction<wow_info> {
        private Connection conn = null;
        private PreparedStatement ps = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            conn = DriverManager.getConnection("jdbc:mysql://ops01:3306/wow", "root", "123456");
            String sql = "INSERT INTO `wow_info` (`id`, `role`, `pinyin`) VALUES (null, ?, ?);";
            ps = conn.prepareStatement(sql);
        }

        @Override
        public void invoke(wow_info value, Context context) throws Exception {
            // bind the ? placeholders
            ps.setString(1, value.role);
            ps.setString(2, value.pinyin);
            // execute the SQL
            ps.executeUpdate();
        }

        @Override
        public void close() throws Exception {
            if (conn != null) conn.close();
            if (ps != null) ps.close();
        }
    }
}

Result after running:

MariaDB [wow]> select * from wow_info;
+----+------+-------------+
| id | role | pinyin      |
+----+------+-------------+
|  1 | fs   | fashi       |
|  2 | ms   | mushi       |
|  3 | ss   | shushi      |
|  4 | dz   | daozei      |
|  5 | ws   | wuseng      |
|  6 | xd   | xiaode      |
|  7 | sq   | shengqi     |
|  8 | zs   | zhanshi     |
|  9 | dk   | siwangqishi |
| 10 | dh   | emolieshou  |
+----+------+-------------+
10 rows in set (0.000 sec)

2-5.Connectors

JDBC connector

MariaDB [wow]> create database bigdata;

MariaDB [wow]> CREATE TABLE `t_student` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
`age` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;
INSERT INTO `t_student` VALUES ('1', 'jack', '18');
INSERT INTO `t_student` VALUES ('2', 'tom', '19');
INSERT INTO `t_student` VALUES ('3', 'rose', '20');
INSERT INTO `t_student` VALUES ('4', 'tom', '19');
INSERT INTO `t_student` VALUES ('5', 'jack', '18');
INSERT INTO `t_student` VALUES ('6', 'rose', '20');
MariaDB [bigdata]> select * from t_student;
+----+----------+------+
| id | name     | age  |
+----+----------+------+
|  1 | jack     |   18 |
|  2 | tom      |   19 |
|  3 | rose     |   20 |
|  4 | tom      |   19 |
|  5 | jack     |   18 |
|  6 | rose     |   20 |
+----+----------+------+
6 rows in set (0.000 sec)

Code example:

package cn.wangting;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JDBCDemo {
    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //TODO 1.source
        DataStream<Student> studentDS = env.fromElements(new Student(null, "wangting", 666));
        //TODO 2.transformation
        //TODO 3.sink
        studentDS.addSink(JdbcSink.sink(
                "INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (null, ?, ?)",
                (ps, value) -> {
                    ps.setString(1, value.getName());
                    ps.setInt(2, value.getAge());
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://ops01:3306/bigdata")
                        .withUsername("root")
                        .withPassword("123456")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .build()));
        //TODO 4.execute
        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
}

After running, the new row appears in the table:

MariaDB [bigdata]> select * from t_student;
+----+----------+------+
| id | name     | age  |
+----+----------+------+
|  1 | jack     |   18 |
|  2 | tom      |   19 |
|  3 | rose     |   20 |
|  4 | tom      |   19 |
|  5 | jack     |   18 |
|  6 | rose     |   20 |
|  7 | wangting |  666 |
+----+----------+------+
7 rows in set (0.000 sec)

Kafka

Flink ships with a number of bundled connectors, e.g. Kafka source and sink, Elasticsearch sink, and so on. To read or write Kafka, ES, or RabbitMQ you can use the corresponding connector API directly. Although these connectors live in the Flink source tree, they are not strictly part of the Flink engine and are not packaged in the binary distribution. So when submitting a job, make sure to package the relevant connector classes into the job jar; otherwise submission fails with class-not-found or class-initialization errors.

The following settings are required or strongly recommended:

1. the topic to subscribe to

2. the deserialization schema

3. consumer property: the cluster address (bootstrap.servers)

4. consumer property: the consumer group id (if not set, a default is used, which is hard to manage)

5. consumer property: the offset reset policy, e.g. earliest/latest

6. dynamic partition discovery (so Flink notices when the number of Kafka partitions changes or grows)

7. if checkpointing is not enabled, enable automatic offset commit; once checkpointing is covered later, offsets are committed along with the checkpoint and back to the default offsets topic

Kafka consumer code example

package cn.wangting;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class ConnectorsDemo_KafkaConsumer {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "ops01:9092");
        props.setProperty("group.id", "flink");
        props.setProperty("auto.offset.reset", "latest");
        props.setProperty("flink.partition-discovery.interval-millis", "5000");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "2000");
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("flink_kafka", new SimpleStringSchema(), props);
        kafkaSource.setStartFromGroupOffsets();
        DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOneDS.keyBy(0);
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1);
        //4.Sink
        result.print();
        //5.execute
        env.execute();
    }
}

Kafka producer code example

package cn.wangting;

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class ConnectorsDemo_KafkaProducer {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        DataStreamSource<Student> studentDS = env.fromElements(new Student(1, "tonyma", 18));
        SingleOutputStreamOperator<String> jsonDS = studentDS.map(new MapFunction<Student, String>() {
            @Override
            public String map(Student value) throws Exception {
                String jsonStr = JSON.toJSONString(value);
                return jsonStr;
            }
        });
        //4.Sink
        jsonDS.print();
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "ops01:9092");
        FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("flink_kafka", new SimpleStringSchema(), props);
        jsonDS.addSink(kafkaSink);
        //5.execute
        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
}

Redis

The core of RedisSink is the RedisMapper interface. To use it, write your own Redis operation class that implements its three methods:

  1. getCommandDescription():
  • choose the Redis data structure and the key name; the data structure is selected through RedisCommand
  2. String getKeyFromData(T data):
  • the key of the key/value pair stored inside the value
  3. String getValueFromData(T data):
  • the value of the key/value pair stored inside the value

Save data from a Flink collection to Redis through this sink.

package cn.wangting;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;

public class RedisDemo {
    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //TODO 1.source
        DataStream<String> lines = env.socketTextStream("ops01", 6666);
        //TODO 2.transformation
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] arr = value.split(" ");
                for (String word : arr) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        }).keyBy(t -> t.f0).sum(1);
        //TODO 3.sink
        result.print();
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("ops01").build();
        RedisSink<Tuple2<String, Integer>> redisSink = new RedisSink<Tuple2<String, Integer>>(conf, new MyRedisMapper());
        result.addSink(redisSink);
        //TODO 4.execute
        env.execute();
    }

    public static class MyRedisMapper implements RedisMapper<Tuple2<String, Integer>> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            // key: String("wcresult"), value: Hash(word -> count), so the command is HSET
            return new RedisCommandDescription(RedisCommand.HSET, "wcresult");
        }

        @Override
        public String getKeyFromData(Tuple2<String, Integer> t) {
            return t.f0;
        }

        @Override
        public String getValueFromData(Tuple2<String, Integer> t) {
            return t.f1.toString();
        }
    }
}
