窗口Join(Window Join)

window join将共享相同key并位于同一窗口中的两个流的元素联接在一起。可以使用窗口分配器定义这些窗口,并根据两个流中的元素对其进行评估。

然后将双方的元素传递到用户定义的JoinFunctionFlatJoinFunction,在此用户可以发出满足联接条件的结果。

通用用法可总结如下:

stream.join(otherStream).where(<KeySelector>).equalTo(<KeySelector>).window(<WindowAssigner>).apply(<JoinFunction>)

有关语义的一些说明:

  • 创建两个流的元素的成对组合的行为就像一个inner-join,这意味着如果一个流中的元素没有与另一流中要连接的元素对应的元素,则不会发出该元素。
  • 那些确实加入的元素将以最大的时间戳(仍位于相应窗口中)作为时间戳。例如,以[5,10)为边界的窗口将导致连接的元素具有9作为其时间戳。

在以下部分中,我们将概述使用某些示例性场景时不同类型的window join的行为。

Tumbling Window Join

在执行Tumbling Window Join时,所有具有公共key和公共Tumbling Window Join的元素都按成对组合联接,并传递到JoinFunctionFlatJoinFunction。因为它的行为就像一个inner join,所以不会发出在tumbling window中没有来自另一个流的元素的一个流的元素!

如图所示,我们定义了一个大小为2毫秒的Tumbling Window,该窗口的形式为[0,1],[2,3],...。该图显示了所有元素的成对组合在每个窗口中,该窗口将传递给JoinFunction。请注意,在翻转窗口[6,7]中,由于在绿色流中不存在要与橙色元素⑥和joined连接的元素,因此不发出任何内容。

import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...orangeStream.join(greenStream).where(elem => /* select key */).equalTo(elem => /* select key */).window(TumblingEventTimeWindows.of(Time.milliseconds(2))).apply { (e1, e2) => e1 + "," + e2 }

Sliding Window Join

执行Sliding Window Join时,所有具有公共key和公共Sliding Window的元素都按成对组合进行连接,并传递到JoinFunctionFlatJoinFunction。在当前Sliding Window中,一个流中没有其他流元素的元素不会被发出!

请注意,某些元素可能在一个滑动窗口中连接,但不能同时在另一个滑动窗口中连接!

在此示例中,我们使用大小为2毫秒的滑动窗口滑动时间1毫秒,从而得出滑动窗口[-1,0],[0,1],[1,2],[2,3],…。x轴下方的连接元素是传递给每个滑动窗口的JoinFunction的元素。在这里您还可以看到,例如,在窗口[2,3]中橙色②与绿色③如何结合,但在窗口[1,2]中却没有与绿色③结合。

import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...orangeStream.join(greenStream).where(elem => /* select key */).equalTo(elem => /* select key */).window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */)).apply { (e1, e2) => e1 + "," + e2 }

Session Window Join

执行Session Window Join时,具有“组合”时满足会话条件的相同key的所有元素将以成对组合的方式连接在一起,并传递给JoinFunctionFlatJoinFunction。再次执行inner join,因此,如果有一个Session Window Join仅包含一个流中的元素,则不会发出任何输出!

在这里,我们定义了一个Session Window Join连接,其中每个会话之间的间隔至少为1ms。共有三个会话,在前两个会话中,两个流中的联接元素都传递给JoinFunction。在第三个会话中,绿色流中没有元素,因此⑧和⑨不连接!

import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...orangeStream.join(greenStream).where(elem => /* select key */).equalTo(elem => /* select key */).window(EventTimeSessionWindows.withGap(Time.milliseconds(1))).apply { (e1, e2) => e1 + "," + e2 }

间隔Join(Interval Join)

Interval Join使用公共key连接两个流(现在将它们分别称为A和B)的元素,并且流B的元素具有与流A的元素时间戳相对时间间隔的时间戳

这也可以更正式地表示为b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound]a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

其中a和b是A和B的元素,它们共享一个公共key。只要下限始终小于或等于上限,则下限和上限都可以为负或正。Interval Join当前仅执行inner joins。将一对元素传递给ProcessJoinFunction时,将为它们分配两个元素的较大时间戳(可通过ProcessJoinFunction.Context访问)。

注意:Interval Join当前仅支持事件时间。

在上面的示例中,我们将两个流“橙色”和“绿色”连接在一起,其下限为-2毫秒,上限为+1毫秒。默认情况下,这些边界是包含边界的,但是.lowerBoundExclusive().upperBoundExclusive可以应用于更改行为。

再次使用更正式的符号,这将转化为orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound如三角形所示。

import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;...val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...orangeStream.keyBy(elem => /* select key */).intervalJoin(greenStream.keyBy(elem => /* select key */)).between(Time.milliseconds(-2), Time.milliseconds(1)).process(new ProcessJoinFunction[Integer, Integer, String] {override def processElement(left: Integer, right: Integer, ctx: ProcessJoinFunction[Integer, Integer, String]#Context, out: Collector[String]): Unit = {out.collect(left + "," + right); }});});

建议阅读:

https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/joining.html

https://github.com/perkinls/flink-local-train

https://yq.aliyun.com/users/ohyfzrwxmb3me?spm=a2c4e.11153940.0.0.763648d5EoX4bX

Flink DataStream中join相关推荐

  1. [Flink]Flink DataStream window join 和interval join

    目录 window join interval join window join 窗口连接把两个流中相同窗口通过一个键值连接起来.然后,两边的元素被传递到用户定义的JoinFunction或FlatJ ...

  2. 大数据——Flink dataStream 中窗口函数的使用

    目录 一.窗口类型 二.窗口分配程序(WindowAssigner) 2.1 基于数量的 CountWindow 2.1.1 滚动 2.1.2 滑动 2.2 基于时间的 TimeWindow 2.2. ...

  3. Flink DataStream API 介绍

    Flink DataStream API 介绍 StreamExecutionEnvironment #mermaid-svg-JKeWa22W2vWA4zBS {font-family:" ...

  4. Flink temporal table join研究

    作者:王东阳 前言 ANSI-SQL 2011 中提出了Temporal 的概念,Oracle,SQLServer,DB2等大的数据库厂商也先后实现了这个标准.Temporal Table记录了历史上 ...

  5. Apache Flink 漫谈系列 - JOIN 算子

    2019独角兽企业重金招聘Python工程师标准>>> 聊什么 在<Apache Flink 漫谈系列 - SQL概览>中我们介绍了JOIN算子的语义和基本的使用方式,介 ...

  6. flink DataStream API使用及原理

    传统的大数据处理方式一般是批处理式的,也就是说,今天所收集的数据,我们明天再把今天收集到的数据算出来,以供大家使用,但是在很多情况下,数据的时效性对于业务的成败是非常关键的. Spark 和 Flin ...

  7. Flink SQL中的函数

    Table API是内嵌在Java语言中的,很多方法需要在类中额外添加,扩展功能比较麻烦,目前支持的函数比较少,故一般情况下我们使用Flink SQL中的函数 Flink SQL中的函数主要分为两类: ...

  8. [Flink]Flink DataStream API 概览

    目录 什么是 DataStream 什么能被转化为流 流式Flink程序的开发流程 DataStream的数据源 迭代数据流 配置运行时参数 什么是 DataStream Datastream API ...

  9. 【基础】Flink -- DataStream API

    Flink -- DataStream API 执行环境 Execution Environment 创建执行环境 设置执行模式 触发程序执行 源算子 Source 从集合中读取数据 从文件读取数据 ...

最新文章

  1. iOS网络开发之:NSURLConnection
  2. shell编程开发应用指南
  3. [vim]高亮查找匹配
  4. mysql两者之间_Django和MySQL - 两者之间的数据是不同的
  5. python网络爬虫系列(十)——chrome在爬虫中的使用
  6. 使用Eclipse+PyDev创建Django项目一windows下
  7. C语言程序设计double,C语言中double类型数据占字节数为
  8. python与线性代数 矩阵
  9. python2和3通用的头文件写法 不确定是否叫头文件 这种说法 大概就是这个意思
  10. P1279 字串距离 (动态规划)
  11. PAT (Basic Level) Practice1010 一元多项式求导
  12. 华为OJ编程 动态规划类
  13. 操作系统原理第五章(资源分配与调度)
  14. vue-bilibili学习笔记
  15. 计算机网络存在的漏洞,常见的计算机网络安全漏洞有哪些
  16. 夜神模拟器和安卓连接
  17. 区块链和大数据的关系
  18. 数据库---数据库恢复技术
  19. SSM项目之电商系统-爱生鲜
  20. Java 圆锥类 圆类 面积 体积

热门文章

  1. php如何生成一年的日历表_html - PHP如何生成一个指定年份一整年的日历
  2. java使用Jodd-http发送http请求
  3. UWB定位技术融合定位革新物联发展
  4. ROS入门教程(三)—— 用C++实现Hello world
  5. window.open()和window.showModalDialog 的使用及传值操作
  6. MATLAB实战应用-【数据处理篇】数据清洗(从方法论到实战应用)
  7. 最常用的酒店IPTV系统实施方案
  8. IHE测试系列之三:测试要求
  9. 机器学习编程sklearn常用语句
  10. 「每日一题」回旋镖的数量