Time类型

在Flink中常用的Time类型:

  • 处理时间
  • 摄取时间
  • 事件时间

处理时间

是上图中,最后一步的处理时间,表示服务器中执行相关操作的处理时间。例如一些算子操作时间,在服务器上面的时间。

如果你以处理时间作为流处理的时间处理方式,那么所有的基于时间的操作都会使用服务器的时间,来运行相关的操作。例如:一个小时的处理时间窗口,将会包含一个小时内的到达服务器内的所有数据。例如应用程序9:15am开始执行,第一个小时的时间处理窗口会包含所有的9:15到10:15内的事件数据,下一个时间窗口是10:15到11:15内的所有数据。

处理时间是最简单的事件处理方式,并不需要流和机器的时间协调。因此提供了高性能和低延迟。然而在分布式环境中或者异步环境中处理时间并不能够提供准确性(也就是说在处理数据时,由于网络的抖动在一个处理时间窗口中例如9:15到10:15,很大可能包括9:00的事件数据)。

事件时间

事件时间是每一个设备上每一个单独事件发生的时间例如手机登录APP的日志时间。这个时间就是这条数据记录的时间。每一条数据都有一个时间戳表示这条数据的事件发生时间。这个时间取决于每条数据,而并不会依赖于机器的时间。事件时间处理时必须指定如何获得Event Time watermarks(用来描述Event Time如何处理)。

按照事件时间处理数据,处理结果应该是完全一致,也就是说无论处理多少次结果都是一样的,这就是所谓的大数据处理的幂等性。 不管事件到达时间和事件是不是有序到达(在生产环境中,数据往往进入到服务器中的时间和顺序是不一定的,有可能先产生的数据后到达服务器,这取决于很多网络因素)

摄取时间

摄取时间表示某个事件数据进入到Flink的时间。在source操作中,每条记录都会得到source的当前时间戳,也就是接收到的数据自动会有一个摄取时间,也就是例如时间窗都是基于这个时间来处理的。

摄取时间是处于事件时间和处理时间之间。如上图所示。摄取时间是有成本的,但是却是结果可预测的。因为摄取时间使用了稳定的时间戳(在source端只会分配一次),每一条数据的时间戳都是固定的。并且同一摄取时间的数据有可能被分配到不同的处理时间窗口中。

Windows

Windows使我们处理无限数据流(源源不断的进来)的核心部件。Windows把我们的数据流拆成一个个的buckets。我们需要把算子作用到buckets上面去。

第一件事情就是需要指定我们的流数据是不是有key,有key和没有key对应的算子是完全不一样的。

Keyed windows

带keyby,会结合windows一起使用。输入的数据内容中的任意属性都可以作为一个key。在这个流上可以允许窗口多任务并行计算,每一个逻辑key都可以被独立计算,相同的key的数据会被发送到相同的并行任务中去处理。

Non-Keyed windows

通过使用windowAll来指定。原始的数据流不会被拆分成多个逻辑任务,所有窗口逻辑都是一个窗口任务来执行,所以并行度是1。

windows 生命周期

简而言之,当第一个元素到达对应的窗口时,一个windows就会被开始创建。当时间(不管是event时间还是processing时间)达到时间戳范围,就会移除窗口。另外,每一个窗口都有一个Trigger和window Functions,当数据到达窗口后,执行的函数就是window Functions,这个函数包含了对这个窗口内容的所有计算,当Trigger达到一定条件之后,就会触发。

Windows Assigners

在指定流数据是否带key之后,下一步就是定义窗口的分配器(windows assigner),windows assigner的职责是定义每一个传入的元素如何分配到窗口内。对于keyby使用window()方法,对于non-keyby使用windowAll()方法。

WindowAssigner is responsible for assigning each incoming element to one or more windows.

每个传入的数据分配给一个或多个窗口。

Flink内置的window assigner对于大多数场景来讲基本上是够用的(tumbling windows滚动窗口, sliding windows滑动窗口, session windows会话窗口 and global windows全局窗口)。也可以通过继承WindowAssigner来自定义一个window assigner。所有的内置window assigner(除了全局窗口)都是基于时间(处理时间或事件时间)来分配数据的。

基于时间的窗口有一个开始的timestamp(inclusive)和结束timestamp(exclusive)表示窗口的大小。

Flink中对于窗口的划分有两大类,第一大类是基于time(用的最多),第二大类是基于count。

Tumbling Windows 滚动窗口

滚动窗口分配器将分配每一个元素到一个指定大小的窗口,这种类型的窗口有一个固定的大小而且不会有重叠的。上面这张图就是随着时间流按照指定的时间间隔拆开。

简单实例代码:

Scala

object WindosApp {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval text = env.socketTextStream("192.168.227.128", 9999)text.flatMap(_.split(",")).map((_,1)).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1)env.execute("WindosApp")}}

上面的代码表示监听socket数据流,每隔5秒获取一次数据。timeWindow表示根据时间来划分窗口,(此外还有countWindow根据数量来划分窗口)。默认时间是processTime处理时间。

Java

public class JavaWindowApp {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> text = env.socketTextStream("192.168.227.128", 9999);text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] tokens = value.toLowerCase().split(",");for(String token: tokens) {if(token.length()>0){out.collect(new Tuple2<String, Integer>(token, 1));}}}}).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1);env.execute("JavaWindowApp");}
}

Sliding Windows滑动窗口

滑动窗口分配器分配每一个元素到一个固定大小的窗口,类似于滚动窗口,窗口大小可以通过配置进行修改,但是滑动窗口还有另外一个附加滑动参数控制滑动窗口什么时候启动,所以这个窗口是有可能重叠的。

上面图的意思是window1的窗口大小是10分钟,滑动大小是5分钟,也就是每隔5分钟产生一个窗口,这个窗口的大小是10分钟,这个窗口就是window2,然后window2又过5分钟产生一个窗口,窗口的大小是10分钟 window3,以此类推。所以滑动窗口处理的数据可能会有重叠。一个数据元素可能会在多个窗口中进行处理。

使用场景:每个半个小时统计前一个小时的TopN。

object WindosApp {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval text = env.socketTextStream("192.168.227.128", 9999)text.flatMap(_.split(",")).map((_,1)).keyBy(0)//.timeWindow(Time.seconds(5)) # 滚动窗口.timeWindow(Time.seconds(10),Time.seconds(5)).sum(1).print().setParallelism(1)env.execute("WindosApp")}}

每隔5秒统计近10秒的数据。所以当服务器端输入:

a,a,a,b,b,b
a,a,a,b,b,b
a,b,a,b,a,a

时,控制台会打印两遍结果:

(a,10)
(b,8)
(b,8)
(a,10)

Window Functions

在定义窗口分配器之后,就需要指定基于每一个窗口的计算方法了(在上面的例子中我们做了一个keyby sum操作)。window function会处理窗口中的每一个元素。window function包括如下几个:

  • ReduceFunction
  • AggregationFunction
  • FoldFunction
  • ProcessWindowFunction

ReduceFunction和AggregationFunction的执行效率更高,因为Flink会在数据到达每一个窗口时首先做一个增量聚合操作。ProcessWindowFunction拿到的是包含在窗口中的所有的元素以及附加信息一个Iterable,是一个全量聚合。因此ProcessWindowFunction的执行效率不高,因为Flink会缓存窗口中的所有数据。

ReduceFunction

input中的两个元素进行结合产生一个同样类型的输出。这里我们举例,通过传入的数据类型是数值类型来演示增量效果。

Scala

object WindowReduceApp {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval text = env.socketTextStream("192.168.227.128", 9999)text.flatMap(_.split(",")).map(x=>(1,x.toInt)) // 1,2,3,4,5 => (1,1) (1,2) (1,3) (1,4) (1,5).keyBy(0) //因为key都是1, 所以所有的元素都到一个task去执行.timeWindow(Time.seconds(5)) // 滚动窗口.reduce((v1, v2) => {  reduce函数作用在窗口之上,就可以完成窗口中的增量操作,不用等所有的数据到达之后进行一次性处理,而是数据两两处理println(v1 + "....." + v2)(v1._1, v1._2 + v2._2)}).print().setParallelism(1)env.execute("WindowReduceApp")}
}

服务器端输入:

1,2,3,4,5

控制台中输出如下:

(1,1).....(1,2)
(1,3).....(1,3)
(1,6).....(1,4)
(1,10).....(1,5)
(1,15)

reduce函数作用在窗口之上,就可以完成窗口中的增量操作,不用等所有的数据到达之后进行一次性处理,而是数据两两处理。

Java

public class JavaWindowReduceApp {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> text = env.socketTextStream("192.168.227.128", 9999);text.flatMap(new FlatMapFunction<String, Tuple2<Integer, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) throws Exception {String[] tokens = value.toLowerCase().split(",");for(String token: tokens) {if(token.length()>0){out.collect(new Tuple2<Integer, Integer>(1, Integer.parseInt(token)));}}}}).keyBy(0).timeWindow(Time.seconds(5)).reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {@Overridepublic Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {System.out.println("value1 = [" + value1 + "], value2 = [" + value2 + "]");return new Tuple2<>(value1.f0,value1.f1 + value2.f1);}}).print().setParallelism(1);env.execute("JavaWindowApp");}
}

输出结果如下:

value1 = [(1,1)], value2 = [(1,2)]
value1 = [(1,3)], value2 = [(1,3)]
value1 = [(1,6)], value2 = [(1,4)]
value1 = [(1,10)], value2 = [(1,5)]
(1,15)

ProcessWindowFunction

ProcessWindowFunction可以拿到一个Iterable,可以拿到窗口中的所有元素,并且有一个上下文对象可以访问时间和状态信息,比reducefunction可以提供更多的功能。但这样却可以带来资源和性能的开销,因为元素并不能通过增量的方式去聚合,相反,它需要把所有的数据都放在一个buffer中。

public class JavaWindowProcessApp {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> text = env.socketTextStream("192.168.227.128", 9999);text.flatMap(new FlatMapFunction<String, Tuple2<Integer, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) throws Exception {String[] tokens = value.toLowerCase().split(",");for(String token: tokens) {if(token.length()>0){out.collect(new Tuple2<Integer, Integer>(1, Integer.parseInt(token)));}}}}).keyBy(0).timeWindow(Time.seconds(5)).process(new ProcessWindowFunction<Tuple2<Integer, Integer>, Object, Tuple, TimeWindow>() {@Overridepublic void process(Tuple tuple, Context context, Iterable<Tuple2<Integer, Integer>> elements, Collector<Object> out) throws Exception {System.out.println("tuple = [" + tuple + "], context = [" + context + "], elements = [" + elements + "], out = [" + out + "]");long count = 0;for(Tuple2<Integer, Integer> in:elements) {count++;}out.collect("window:" + context.window() + "count:" + count);}}).print().setParallelism(1);env.execute("JavaWindowApp");}
}

服务器输入:

1,2,3,4,5

控制台输出:

tuple = [(1)], context = [org.apache.flink.streaming.runtime.operators.windowing.functions.InternalProcessWindowContext@40e09d6c], elements = [[(1,1), (1,2), (1,3), (1,4), (1,5)]], out = [org.apache.flink.streaming.api.operators.TimestampedCollector@4e277b00]
window:TimeWindow{start=1568542160000, end=1568542165000}count:5

只输出一次,说明是等待所有数据都拿到之后才进行处理。

使用场景:窗口内的数据进行排序。在Reduce中是无法进行排序的。

Apache Flink 零基础入门(十九)Flink windows和Time操作相关推荐

  1. Apache Flink 零基础入门(九)Flink支持哪些数据类型

    Flink有7种数据类型分别是: Java Tuples and Scala Case Classes Java POJOs Primitive Types Regular Classes Value ...

  2. Apache Flink 零基础入门【转】

    Apache Flink 零基础入门(一):基础概念解析 Apache Flink 零基础入门(二):DataStream API 编程 转载于:https://www.cnblogs.com/dav ...

  3. Apache Flink 零基础入门(二十)Flink部署与作业的提交

    之前我们都是基于Idea在本地进行开发,这种方式很适合开发以及测试,但是开发完之后,如何提交到服务器中运行? Flink单机部署方式 本地开发和测试过程中非常有用,只要把代码放到服务器直接运行. 前置 ...

  4. Apache Flink 零基础入门(一):基础概念解析

    Apache Flink 的定义.架构及原理 Apache Flink 是一个分布式大数据处理引擎,可对有限数据流和无限数据流进行有状态或无状态的计算,能够部署在各种集群环境,对各种规模大小的数据进行 ...

  5. Apache Flink 零基础入门(三)编写最简单的helloWorld

    实验环境 JDK 1.8 IDE Intellij idea Flink 1.8.1 实验内容 创建一个Flink简单Demo,可以从流数据中统计单词个数. 实验步骤 首先创建一个maven项目,其中 ...

  6. Apache Flink 零基础入门(十五)Flink DataStream编程(如何自定义DataSource)

    数据源可以通过StreamExecutionEnvironment.addSource(sourceFunction)方式来创建,Flink也提供了一些内置的数据源方便使用,例如readTextFil ...

  7. Apache Flink 零基础入门(二十)Flink kafka connector

    内置source和sink 内置source包括从文件读取,从文件夹读取,从socket中读取.从集合或者迭代器中读取.内置的sink包括写文件.控制台输出.socket 内置connectors A ...

  8. Apache Flink 零基础入门(十八)Flink Table APISQL

    什么是Flink关系型API? 虽然Flink已经支持了DataSet和DataStream API,但是有没有一种更好的方式去编程,而不用关心具体的API实现?不需要去了解Java和Scala的具体 ...

  9. Apache Flink 零基础入门(十四)Flink 分布式缓存

    Apache Flink 提供了一个分布式缓存,类似于Hadoop,用户可以并行获取数据. 通过注册一个文件或者文件夹到本地或者远程HDFS等,在getExecutionEnvironment中指定一 ...

最新文章

  1. 重磅!GitHub 日收 7000 星, Windows 计算器项目开源即爆红!
  2. 查看服务器硬盘负载——判断硬盘是否为瓶颈
  3. [好书推荐].计算机原理与设计——Verilog HDL版等;待续
  4. 暴富、投机,金钱、欲望、人性,一个叫 Fomo3D的游戏正在刷新你的世界观
  5. 8086汇编-实验6(微调)-小写转换大写
  6. iframe的src怎么携带参数_三种传递gRPC动态参数方式的使用体验
  7. java密码学原型算法_java密码学原型算法实现——双线性对.pdf
  8. macos php无法访问,Mac上,Apache启动正常,却无法访问localhost和127.0.0.1
  9. Azure Sentinel -- 云原生企业安全信息和事件管理平台(SIEM)初探系列一
  10. CSP - J 2020 T1 优秀的拆分
  11. Android Studio 常用快捷键 for mac
  12. 吴恩达|机器学习作业1.1多变量线性回归
  13. vs2008软件测试实战 3 web test(1)
  14. 如何创建带有.的文件夹
  15. TINA-TI仿真软件使用教程
  16. python模拟微信登录公众号_python通过手机抓取微信公众号
  17. 协方差与相关系数介绍
  18. a标签href的几种写法
  19. guid主分区表损坏如何处理_什么是GPT或GUID分区表
  20. 计算机d盘hlddz是啥,hlddzsdk.exe是什么进程?

热门文章

  1. 门户网站运营的几个方法
  2. Dojo API略解续
  3. elk中kibana中如何显示写入elasticsearch的数据
  4. 获取一亿数据获取前100个最大值
  5. ELK+kafka日志系统搭建-实战
  6. Laravel经常使用的语法总结
  7. Linux报错./configure: error: C compiler cc is not found
  8. PHP随机数:mt_rand更快
  9. mysql oracle mvcc_PostgreSQL、Oracle/MySQL和SQL Server的MVCC实现原理方式
  10. 阿里 框架 原声Android,阿里P8大佬亲自教你!你所不知道的Android原生开发的现状,含泪整理面经...