Fink DataStreamAPI介绍与使用


DataSources数据输入

第一步

// 第一步:创建流处理执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

1.内置数据源

(1)文件数据源

// 直接读取文本文件
DataStreamSource<String> textStream = env.readTextFile("D:\\project\\flinkLearn\\file\\hello.txt");
// 通过指定CsvInputFormat读取CSV文件
DataStreamSource<String> csvStream = env.readFile(new CsvInputFormat<String>(new Path("D:\\project\\flinkLearn\\file\\hello.csv")) {@Overrideprotected String fillRecord(String reuse, Object[] parsedValues) {return null;}
}, "D:\\project\\flinkLearn\\file\\hello.csv");

(2)Socket数据源

DataStreamSource<String> socketDataStream = env.socketTextStream("localhost", 9999);

(3)集合数据源数据源

从集合中创建DataStream数据集

DataStreamSource<Tuple2> dataStream = env.fromElements(new Tuple2(1L, 3L), new Tuple2(1L, 5L), new Tuple2(1L, 7L));

从数组中创建DataStream数据集

String[] elements = new String[]{"hello", "flink"};
DataStreamSource<String> dataStream = env.fromCollection(Arrays.asList(elements));

将java.util.List转换成DataStream数据集

List<String> arrayList = new ArrayList<>();
arrayList.add("hello flink");
DataStreamSource<String> dataList = env.fromCollection(arrayList);

2.外部数据源

(1)数据源连接器

上文提到的基本的数据接入方式,例如从文件、Socket端口中接入数据,实质是实现了不同的SourceFunction,Flink将其封装成高级API,减少了用户的使用成本。
Flink为了京可能降低用户再使用Flink进行应用开发时的依赖复杂度,所有的第三方连接器依赖配置放置在flink基本依赖库以外,用户使用时,需要根据需要将用到的Connector依赖库引入到应用工程中。以kafka为例:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.11</artifactId><version>1.10.0</version>
</dependency>
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");DataStreamSource dataStreamSource = env.addSource(new FlinkKafkaConsumer010(args[0], new SimpleStringSchema(), properties));

用户可以通过自定义Schema将接入数据转换成定制的数据接口,主要是实现DeserializationScchema接口来完成。

(2)自定义数据源连接器

Flink中已经实现了大多数主流的数据源连接器,但是Flink的架构非常开放,用户可以根据需求自定义连接器。可以通过显示SourceFunction定义单个线程的接入的数据介入其,也可以通过实现ParallelSourceFunction接口或者继承RichParallelSourceFunction类定义并发数据源介入器。
定义完成后,就可以通过env.addSources方法添加数据源。


DataStream转换操作

1. Single-DataStream操作

(1) Map[DataStream -> DataStream]

调用用户定义的MapFunction对DataStream[T]数据进行处理,新城新的DataStream[T],其中数据格式可能会发生变化,常用作对数据集内的数据的清洗和转换。

DataStreamSource<Integer> dataStream = env.fromElements(1, 2, 3, 4);
SingleOutputStreamOperator<Integer> map = dataStream.map(new MapFunction<Integer, Integer>() {@Overridepublic Integer map(Integer value) throws Exception {return ++value;}
});
// 或使用Lambda表达四
dataStream.map(t -> ++t);
// 或使用自定义类
dataStream.map(new MyMapper());

(2) FlatMap[DataStream -> DataStream]

输入一个元素,输出0个、1个或多个元素。FlatMapFunction<T,V>中T代表输入元素数据类型(flatMap方法的第一个参数类型),V代表输出集合中元素类型(flatMap中的Collector类型参数)

// wordCount中根据空格分出单词
dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out> out) throws Exception {// 按空格分词String[] words = value.split(" ");// 便利所有words包成二元组输出for (String word : words) {out.collect(new Tuple2<>(word, 1));}
}
});

(3) Filter[DataStream -> DataStream]

过滤指定元素数据,如果返回true则该元素继续向下传递,如果为false则将该元素过滤掉。FilterFunction中T代表输入元素的数据类型。

dataStream.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String line) throws Exception {if(line.contains("flink"))return true;elsereturn false;}
});

(4) KeyBy[DataStream -> KeyedStream ]

逻辑上将数据流元素进行分区,具有相同key的记录将被划分到同一分区。返回类型KeyedStream<T,KEY>中T代表KeyedStream中元素数据类型,KEY代表虚拟KEY的数据类型。

KeyedStream<String,Tuple> keyedStream = dataStream.keyBy(0);

以下情况的元素不能作为key使用:

  1. POJO类型,但没有重写hashCode(),而是依赖Object.hashCode()。 该元素是数组类型。
  2. 任何数据类型的数组结构

(5) Reduce[KeyedStream-> DataStream]

主要目的是将输入的KeyedStream通过传入的用户自定义的ReduceFunction滚动地进行数据聚合处理,其中定义的ReduceFunction必须满足运算结合律和交换律。

DataStreamSource<Tuple2<String, Integer>> dataStream = env.fromElements(new Tuple2("a", 3), new Tuple2("d", 4), new Tuple2("c", 2), new Tuple2("a", 2));
// keyBy
KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = dataStream.keyBy(0);
// reduce
SingleOutputStreamOperator<Tuple2<String, Integer>> reduce = keyedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {return new Tuple2<>(value1.f0, value1.f1 + value2.f1);}
});

(6) Aggregations[KeyedStream-> DataStream]

滚动聚合具有相同key的数据流元素,我们可以指定需要聚合的字段(field)。DataStream中的T为聚合之后的结果。

//对KeyedStream中元素的第一个Filed求和
DataStream<String> dataStream1 = keyedStream.sum(0);
//对KeyedStream中元素的“count”字段求和
keyedStream.sum("count");
//获取keyedStream中第一个字段的最小值
keyedStream.min(0);
//获取keyedStream中count字段的最小值的元素
keyedStream.minBy("count");
keyedStream.max("count");
keyedStream.maxBy(0);

min和minBy的区别是:min返回指定字段的最小值,而minBy返回最小值所在的元素。

2. Multi-DataStream操作

(1) Union [DataStream* -> DataStream]

将两个或多个DataStream,所有DataStream中的元素都会组合成一个新的DataStream。如果联合自身,则每个元素出现两次在新的DataStream。

dataStream.union(dataStream1);

(2) Connect [DataStream,DataStream -> ConnectedDataStreams]

连接(connect)两个流,并且保留其类型。两个数据流之间可以共享状态。ConnectedStreams<IN1,IN2>中IN1代表第一个数据流中的数据类型,IN2代表第二个数据流中的数据类型。

ConnectedStreams<String,String> connectedStreams = dataStream1.connect(dataStream2);

对于ConnectedStreams类型的数据不能直接进行类似print()的操作,需要再转换成DataStream类型数据集

(3) CoFlatMap/CoMap [ConnectedDataStreams->DataStream]

可以对连接流执行类似map和flatMap操作。

// CoMap
connectedStreams.map(new CoMapFunction<String, String, String>() {@Overridepublic String map1(String value) throws Exception {return value.toUpperCase();}@Overridepublic String map2(String value) throws Exception {return value.toLowerCase();}
});// CoFlatMap同理,区别时,同事可以在两个函数之间共享变量,完成两个数据集的数据整合

通常情况下,CoMapFunction 和 CoFlatMapFunction函数并不能有效地解决数据集关联问题,产生的结构可能也不是用户想使用的,因为用户可能想通过指定的条件对两个数据集进行关联,然后产生相关性比较强的结果数据集。这个时候就需要借助keyBy函数或broadcast广播变量实现。
dataStream1.connect(dataStream2).keyBy(1, 0);
dataStream1.connect(dataStream2.broadcast());

(3) Split [DataStream -> SplitStream] (已经标记为废弃)

我们可以根据某些规则将数据流切分成两个或多个数据流。

dataStream.split(new OutputSelector<String>() {@Overridepublic Iterable<String> select(String value) {List<String> outList = new ArrayList<>();if(value.contains("flink"))outList.add("flink");else outList.add("other");return outList;}
});

(4) Select [SplitStream-> DataStream]

我们可以对SplitStream分开的流进行选择,可以将其转换成一个或多个DataStream。

splitStream.select("flink");
splitStream.select("other");

(5) Iterate [DataStream -> IterativeStream]

可以使用iterate方法来获取IterativeStream。

IterativeStream<String> iterativeStream = dataStream.iterate();

(6) Project [DataStream -> DataStream]

对元组类型的DataStream可以使用Project选取子元组。

DataStream<Tuple2<String,Integer>> dataStream2 = dataStream.project(0,2);

3. 实现UDF函数——更细粒度的控制流

1) 函数类(Function Classes)

Flink暴露了所有UDF函数的接口(实现方式为接口或者抽象类)。例如MapFunction, FilterFunction, ProcessFunction等等。

​ 下面例子实现了FilterFunction接口:

DataStream<String> flinkTweets = tweets.filter(new FlinkFilter());
public static class FlinkFilter implements FilterFunction<String> { @Override public boolean filter(String value) throws Exception { return value.contains("flink");}
}

​ 还可以将函数实现成匿名类

DataStream<String> flinkTweets = tweets.filter(new FilterFunction<String>() { @Override public boolean filter(String value) throws Exception { return value.contains("flink"); }}
);

​ 我们filter的字符串"flink"还可以当作参数传进去。

DataStream<String> tweets = env.readTextFile("INPUT_FILE ");
DataStream<String> flinkTweets = tweets.filter(new KeyWordFilter("flink"));
public static class KeyWordFilter implements FilterFunction<String> { private String keyWord; KeyWordFilter(String keyWord) { this.keyWord = keyWord; } @Override public boolean filter(String value) throws Exception { return value.contains(this.keyWord); }
}

2) 匿名函数(Lambda Functions)

DataStream<String> tweets = env.readTextFile("INPUT_FILE");
DataStream<String> flinkTweets = tweets.filter( tweet -> tweet.contains("flink") );

3) 富函数(Rich Functions)

“富函数”是DataStream API提供的一个函数类的接口,所有Flink函数类都有其Rich版本。

​ 它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。

  • RichMapFunction
  • RichFlatMapFunction
  • RichFilterFunction

​ Rich Function有一个生命周期的概念。典型的生命周期方法有:

  • open()方法是rich function的初始化方法,当一个算子例如map或者filter被调用之前open()会被调用。
  • close()方法是生命周期中的最后一个调用的方法,做一些清理工作。
  • getRuntimeContext()方法提供了函数的RuntimeContext的一些信息,例如函数执行的并行度,任务的名字,以及state状态
public static class MyMapFunction extends RichMapFunction<SensorReading, Tuple2<Integer, String>> { @Override public Tuple2<Integer, String> map(SensorReading value) throws Exception {return new Tuple2<>(getRuntimeContext().getIndexOfThisSubtask(), value.getId()); } @Override public void open(Configuration parameters) throws Exception { System.out.println("my map open"); // 以下可以做一些初始化工作,例如建立一个和HDFS的连接 } @Override public void close() throws Exception { System.out.println("my map close"); // 以下做一些清理工作,例如断开和HDFS的连接 }
}

4. 物理分区操作

重分区操作,在DataStream类中可以看到很多Partitioner字眼的类。

其中partitionCustom(...)方法用于自定义重分区。

SingleOutputStreamOperator多并行度默认就rebalance,轮询方式分配

Flink内部提供的常见数据重分区策略如下所示:

(1)随机分区(Random Partitioning)[DataStream -> DataStream]

通过随机的方式将数据分配在下游算子的每个分区中,分区相对均匀,但是较容易市区原有数据的分区结构

// 1. shuffle (并非批处理中的获取一批后才打乱,这里每次获取到直接打乱且分区)
DataStream<String> shuffleStream = dataStream.shuffle();
shuffleStream.print("shuffle");

(2)Roundrobin Partitioning [DataStream -> DataStream]

通过循环的方式对数据集中的数据进行重分区,能够尽可能包整每个分区的数据平衡,当数据集倾斜的时候使用这种策略就是比较有效的优化方法

DataStream<String> rebalanceStream = dataStream.rebalance();

(3)Rescaling Partitioning [DataStream -> DataStream]

Rescaling Partitioning 也是一种通过循环的方式进行数据重平衡的分区策略。不同的是,Roundrobin Partitioning 数据会全局性的通过网络介质传输到其他的节点完成数据的重新平衡,而Rescaling Partitioning 仅仅会对上下游的算子数据进行重平衡,具体的分区主要根据上下游算子的并行度决定。例如上游算子的并发度为2,下游算子的并发度为4,就会发生上有算子中一个分区的数据按照同等比例将数据路由在下游的固定的两个分区中,另外一个分区同理。

DataStream<String> rebalanceStream = dataStream.rescale();

(4)广播操作(Broadcasting) [DataStream -> DataStream]

将输入的数据集复制到下游算子的并行的Task实例中,下游算子中的Tasks可以直接从本地内存中获取广播数据集,不再依赖于网络传输。这种分区策略适合于小数据集。

DataStream<String> broadcastStream = dataStream.broadcast();

(5)自定义分区(Custom Partitioning) [DataStream -> DataStream]

调用partitionCustom(...)方法用于自定义重分区。

// 通过数据集字段索引指定分区字段
dataStream.partitionCustom(new CustomPartitioner(), 0);
// 通过数据集字段名称指定分区字段
dataStream.partitionCustom(new CustomPartitioner(), "filed_name");// 通过实现 Partitioner 来自定义重分区规则
class CustomPartitioner implements Partitioner<Tuple2<String, Integer>> {Random random = new Random(8);// 这里可以改为并行的slot数量@Overridepublic int partition(Tuple2<String, Integer> key, int numPartitions) {return "flink".equals(key.f0) ? 0 : random.nextInt(numPartitions);}
}

(6)其他分区

1)keyBy (Hash,然后取模)

dataStream.keyBy(SensorReading::getId).print("keyBy");

2)global (直接发送给第一个分区,少数特殊情况才用)

dataStream.global().print("global");

DataSinks数据输出

(1) 基本数据输出

dataStream.writeAsText("file:///path/to/demo.txt");//标记为过时
dataStream.writeAsCsv("file:///path/to/demo.csv", FileSystem.WriteMode.OVERWRITE);//标记为过时
dataStream.writeToSocket(hostName, port, new SimpleStringSchema());

(2) 第三方数据输出

所有的数据输出都可以基于实现SinkFunction完成定义。比如将DataStream中的数据写入到Kafka的topic中:

FlinkKafkaProducer011 kafkaProducer = new FlinkKafkaProducer011("localhost:9092", // 指定Broker list 参数"kafka-topic", // 指定目标Kafka Topic 名称new SimpleStringSchema() // 设定序列化Schema
);
// 通过addSink添加kafkaProducer到算子拓扑中
dataStream.addSink(kafkaProducer);

时间 与 Watermark

可以参考:[白话解析] Flink的Watermark机制

Fink DataStreamAPI介绍与使用相关推荐

  1. 第七课 大数据技术之Fink1.13的实战学习-Fink CEP

    第七课 大数据技术之Fink1.13的实战学习-Fink CEP 文章目录 第七课 大数据技术之Fink1.13的实战学习-Fink CEP 第一节 Fink CEP介绍 1.1 Flink CEP背 ...

  2. 架构设计参考项目系列主题:智能风控决策引擎系统可落地实现方案:风控监控大盘实现

    本文转自: 技术岁月 Author 贺鹏Kavin 目录 I.前文提要 II.完整决策流 III.风控结果数据分析监控 IV.引入 Fink I.前文提要 通过之前五篇文章,分别介绍了决策引擎的主要功 ...

  3. 第一章-Flink介绍-《Fink原理、实战与性能优化》读书笔记

    Flink介绍-<Fink原理.实战与性能优化>读书笔记 1.1 Apache Flink是什么? 在当代数据量激增的时代,各种业务场景都有大量的业务数据产生,对于这些不断产生的数据应该如 ...

  4. 【fink】dataStreamAPI开发

    一.Time与Window (一)Time Event Time:是事件创建的时间.它通常由事件中的时间戳描述,例如采集的日志数据中, 每一条日志都会记录自己的生成时间,Flink 通过时间戳分配器访 ...

  5. 07/08_flink shell,基本原理及应用场景、特点、架构图、集群解剖、JobManager、TaskManagers、tasks和操作链、Session/job集群、组件介绍等、应用场景

    1.7.Flink scala shell代码调试 1.7.1.Flink scala shell代码调试语法 1.8.Flink基本原理及应用场景 1.8.1.Flink特点 1.8.2.Flink ...

  6. puppet成长日记二 Package资源详细介绍及案例分析

    puppet成长日记二 Package资源详细介绍及案例分析 一.系统环境 1.puppet服务端 Release:RHEL6.4 HOSTNAME: puppetserver.rsyslog.org ...

  7. 数据流—DataStreamAPI

    Hello Flink 1:构建一个典型的Flink流式应用需要一下几步: 1:设置执行环境. 2:从数据源中读取一条或多条流 3:通过一系列流式转换来实现应用逻辑. 4:选择性的将结果输出到一个或多 ...

  8. 新闻推荐系统-项目介绍(PRD)

    新闻推荐系统 b站链接 制作不易,6月后开源全部代码以及数据,记得一键三联哦! 开源代码: github代码库 mysql: 链接:https://pan.baidu.com/s/1jLzfYbpsf ...

  9. Apache Flink介绍、架构、原理以及实现

    文章目录 一 Flink简介 1.1 什么是flink 1.2 flink的特点 1.3 编程API 二 Flink架构 2.1 架构图 2.2 运行组件 2.3 关键词含义 三 Flink原理 3. ...

最新文章

  1. leetcode算法题--删除链表的节点
  2. try catch finally语句详解
  3. Github上36893颗星!这个被称为下一代企业级应用首选技术你学了么?
  4. ASP.NET:页面保存为WORD出现的问题!
  5. 获取GridView中RowCommand的当前索引行
  6. ZigBee开发(15)--组网实验点播
  7. java中如何实例化一个接口_「实例化」java之接口实例化 - seo实验室
  8. Windows NT操作系统
  9. 固体发动机内弹道matlab,固体火箭发动机内弹道性能的仿真研究
  10. 手把手教你智能硬件开发(一) 我选Arduino
  11. win7家庭版和旗舰版区别_Windows系统的家庭版、专业版、旗舰版,都有什么区别?...
  12. Android 使用图片缓存,避免OOM(实现照片墙)
  13. CF 1715 D. 2+ doors 位运算 1900
  14. 思源笔记局域网内访问【使用教程】
  15. 蓝桥杯刷题冲刺 | 倒计时14天
  16. Mac平台安卓模拟器:网易MuMu mac中文免费版(支持12系统)
  17. 生成化学表达式下标、上标数字
  18. js 中的this指针
  19. 基于STM32的开源简易示波器项目
  20. Web初学-2022.10.28-11.5

热门文章

  1. [sig12][PBS]Farcry3的物理光照
  2. C语言while循环和do while循环详解
  3. 计算机编程情话,[程序员的爱情表白代码]献给程序员们的爱情表白书
  4. sqlite 打开扩展名为.DB 文件
  5. springboot输出json格式日志
  6. C/C++之switch范围判断
  7. 用c语言编写 输入一个月份,判断此月份所在的季节
  8. 区块链产业园拔地而起,多方面亟待问题解决?
  9. 华为WLAN产品介绍与组网(包括capwap隧道,ap上线,STA上线,组网方式,转发方式)
  10. 哪家的收银系统服务器做得好,收银系统哪个好,商家该如何选择?