Flink DataStream中join
窗口Join(Window Join)
window join
将共享相同key
并位于同一窗口中的两个流的元素联接在一起。可以使用窗口分配器定义这些窗口,并根据两个流中的元素对其进行评估。
然后将双方的元素传递到用户定义的JoinFunction
或FlatJoinFunction
,在此用户可以发出满足联接条件的结果。
通用用法可总结如下:
stream.join(otherStream).where(<KeySelector>).equalTo(<KeySelector>).window(<WindowAssigner>).apply(<JoinFunction>)
有关语义的一些说明:
- 创建两个流的元素的成对组合的行为就像一个
inner-join
,这意味着如果一个流中的元素没有与另一流中要连接的元素对应的元素,则不会发出该元素。 - 那些确实加入的元素将以最大的时间戳(仍位于相应窗口中)作为时间戳。例如,以[5,10)为边界的窗口将导致连接的元素具有9作为其时间戳。
在以下部分中,我们将概述使用某些示例性场景时不同类型的window join
的行为。
Tumbling Window Join
在执行Tumbling Window Join
时,所有具有公共key
和公共Tumbling Window Join
的元素都按成对组合联接,并传递到JoinFunction
或FlatJoinFunction
。因为它的行为就像一个inner join
,所以不会发出在tumbling window
中没有来自另一个流的元素的一个流的元素!
如图所示,我们定义了一个大小为2毫秒的Tumbling Window
,该窗口的形式为[0,1],[2,3],...。该图显示了所有元素的成对组合在每个窗口中,该窗口将传递给JoinFunction。请注意,在翻转窗口[6,7]中,由于在绿色流中不存在要与橙色元素⑥和joined连接的元素,因此不发出任何内容。
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...orangeStream.join(greenStream).where(elem => /* select key */).equalTo(elem => /* select key */).window(TumblingEventTimeWindows.of(Time.milliseconds(2))).apply { (e1, e2) => e1 + "," + e2 }
Sliding Window Join
执行Sliding Window Join
时,所有具有公共key
和公共Sliding Window
的元素都按成对组合进行连接,并传递到JoinFunction
或FlatJoinFunction
。在当前Sliding Window
中,一个流中没有其他流元素的元素不会被发出!
请注意,某些元素可能在一个滑动窗口中连接,但不能同时在另一个滑动窗口中连接!
在此示例中,我们使用大小为2毫秒的滑动窗口滑动时间1毫秒,从而得出滑动窗口[-1,0],[0,1],[1,2],[2,3],…。x轴下方的连接元素是传递给每个滑动窗口的JoinFunction的元素。在这里您还可以看到,例如,在窗口[2,3]中橙色②与绿色③如何结合,但在窗口[1,2]中却没有与绿色③结合。
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...orangeStream.join(greenStream).where(elem => /* select key */).equalTo(elem => /* select key */).window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */)).apply { (e1, e2) => e1 + "," + e2 }
Session Window Join
执行Session Window Join
时,具有“组合”时满足会话条件的相同key
的所有元素将以成对组合的方式连接在一起,并传递给JoinFunction
或FlatJoinFunction
。再次执行inner join
,因此,如果有一个Session Window Join
仅包含一个流中的元素,则不会发出任何输出!
在这里,我们定义了一个Session Window Join
连接,其中每个会话之间的间隔至少为1ms。共有三个会话,在前两个会话中,两个流中的联接元素都传递给JoinFunction。在第三个会话中,绿色流中没有元素,因此⑧和⑨不连接!
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...orangeStream.join(greenStream).where(elem => /* select key */).equalTo(elem => /* select key */).window(EventTimeSessionWindows.withGap(Time.milliseconds(1))).apply { (e1, e2) => e1 + "," + e2 }
间隔Join(Interval Join)
Interval Join
使用公共key
连接两个流(现在将它们分别称为A和B)的元素,并且流B的元素具有与流A的元素时间戳相对时间间隔的时间戳。
这也可以更正式地表示为b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound]
或a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound
其中a和b是A和B的元素,它们共享一个公共key
。只要下限始终小于或等于上限,则下限和上限都可以为负或正。Interval Join
当前仅执行inner joins
。将一对元素传递给ProcessJoinFunction
时,将为它们分配两个元素的较大时间戳(可通过ProcessJoinFunction.Context
访问)。
注意:Interval Join
当前仅支持事件时间。
在上面的示例中,我们将两个流“橙色”和“绿色”连接在一起,其下限为-2毫秒,上限为+1毫秒。默认情况下,这些边界是包含边界的,但是.lowerBoundExclusive()
和.upperBoundExclusive
可以应用于更改行为。
再次使用更正式的符号,这将转化为orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound
如三角形所示。
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;...val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...orangeStream.keyBy(elem => /* select key */).intervalJoin(greenStream.keyBy(elem => /* select key */)).between(Time.milliseconds(-2), Time.milliseconds(1)).process(new ProcessJoinFunction[Integer, Integer, String] {override def processElement(left: Integer, right: Integer, ctx: ProcessJoinFunction[Integer, Integer, String]#Context, out: Collector[String]): Unit = {out.collect(left + "," + right); }});});
建议阅读:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/joining.html
https://github.com/perkinls/flink-local-train
https://yq.aliyun.com/users/ohyfzrwxmb3me?spm=a2c4e.11153940.0.0.763648d5EoX4bX
Flink DataStream中join相关推荐
- [Flink]Flink DataStream window join 和interval join
目录 window join interval join window join 窗口连接把两个流中相同窗口通过一个键值连接起来.然后,两边的元素被传递到用户定义的JoinFunction或FlatJ ...
- 大数据——Flink dataStream 中窗口函数的使用
目录 一.窗口类型 二.窗口分配程序(WindowAssigner) 2.1 基于数量的 CountWindow 2.1.1 滚动 2.1.2 滑动 2.2 基于时间的 TimeWindow 2.2. ...
- Flink DataStream API 介绍
Flink DataStream API 介绍 StreamExecutionEnvironment #mermaid-svg-JKeWa22W2vWA4zBS {font-family:" ...
- Flink temporal table join研究
作者:王东阳 前言 ANSI-SQL 2011 中提出了Temporal 的概念,Oracle,SQLServer,DB2等大的数据库厂商也先后实现了这个标准.Temporal Table记录了历史上 ...
- Apache Flink 漫谈系列 - JOIN 算子
2019独角兽企业重金招聘Python工程师标准>>> 聊什么 在<Apache Flink 漫谈系列 - SQL概览>中我们介绍了JOIN算子的语义和基本的使用方式,介 ...
- flink DataStream API使用及原理
传统的大数据处理方式一般是批处理式的,也就是说,今天所收集的数据,我们明天再把今天收集到的数据算出来,以供大家使用,但是在很多情况下,数据的时效性对于业务的成败是非常关键的. Spark 和 Flin ...
- Flink SQL中的函数
Table API是内嵌在Java语言中的,很多方法需要在类中额外添加,扩展功能比较麻烦,目前支持的函数比较少,故一般情况下我们使用Flink SQL中的函数 Flink SQL中的函数主要分为两类: ...
- [Flink]Flink DataStream API 概览
目录 什么是 DataStream 什么能被转化为流 流式Flink程序的开发流程 DataStream的数据源 迭代数据流 配置运行时参数 什么是 DataStream Datastream API ...
- 【基础】Flink -- DataStream API
Flink -- DataStream API 执行环境 Execution Environment 创建执行环境 设置执行模式 触发程序执行 源算子 Source 从集合中读取数据 从文件读取数据 ...
最新文章
- iOS网络开发之:NSURLConnection
- shell编程开发应用指南
- [vim]高亮查找匹配
- mysql两者之间_Django和MySQL - 两者之间的数据是不同的
- python网络爬虫系列(十)——chrome在爬虫中的使用
- 使用Eclipse+PyDev创建Django项目一windows下
- C语言程序设计double,C语言中double类型数据占字节数为
- python与线性代数 矩阵
- python2和3通用的头文件写法 不确定是否叫头文件 这种说法 大概就是这个意思
- P1279 字串距离 (动态规划)
- PAT (Basic Level) Practice1010 一元多项式求导
- 华为OJ编程 动态规划类
- 操作系统原理第五章(资源分配与调度)
- vue-bilibili学习笔记
- 计算机网络存在的漏洞,常见的计算机网络安全漏洞有哪些
- 夜神模拟器和安卓连接
- 区块链和大数据的关系
- 数据库---数据库恢复技术
- SSM项目之电商系统-爱生鲜
- Java 圆锥类 圆类 面积 体积