1、Interval Join 概述

在之前的Join算子中,一般使用的是coGroup算子,因为一个算子可以提供多种语义,但是也是有一些弊端的。因为coGroup只能实现在同一个窗口的两个数据流之间进行join,在实际的计算过程中,往往会遇到当req发生时,resp迟迟无法响应,这个时候,就会出现一个跨窗口的问题。也就是说经常会出现数据乱序,或者数据延迟的情况,导致两个流的数据是不同步的,也就会导致,join的过程中丢失数据问题。不在同一个窗口中的数据无法join,这个问题flink官方提供了另外一种join的方式,也就是interval join。他的核心思想就是,将两个流通过keyed分区。然后,按照key 在一个相对的时间段内进行Join。

Interval join用一个公共键连接两个流的元素(我们现在称它们为A&B),其中流B的元素的时间戳位于流A中元素的时间戳的相对时间间隔内。

这也可以更正式地表达为 b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound]ora.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

这里要表达的意思也就是说,当两个流进行join的时候,会根据左流的时间戳在右流中寻找公共键。

其中a和b是a和b中共享公共密钥的元素。只要下限始终小于或等于上限,下限和上限都可以是负值或正值。间隔联接当前仅执行内部联接。

当一对元素传递给ProcessJoinFunction时,它们将被分配有两个元素的较大时间戳(可以通过ProcessJoinFunction.context访问)。

注意:Interval Join 仅支持event time

在上面的例子中,我们将两个流“橙色”和“绿色”连接起来,下限为-2毫秒,上限为+1毫秒。默认情况下,这些边界是包含的,但是。lowerBoundExclusive()和upperBoundExclusive可用于更改是否包含上下界。

再次使用更正式的符号,这将转化为

orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound

如三角形所示。

2、代码实现

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...orangeStream.keyBy(<KeySelector>).intervalJoin(greenStream.keyBy(<KeySelector>)).between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunction<Integer, Integer, String(){@Overridepublic void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {out.collect(first + "," + second);}});

3、Interval Join源码实现原理

在Interval Join的实现当中,其中的核心实现为IntervalJoinOperator类,这个类提供了执行IntervalJoin的核心逻辑

构造方法

public IntervalJoinOperator(long lowerBound,long upperBound,boolean lowerBoundInclusive,boolean upperBoundInclusive,TypeSerializer<T1> leftTypeSerializer,TypeSerializer<T2> rightTypeSerializer,ProcessJoinFunction<T1, T2, OUT> udf) {super(Preconditions.checkNotNull(udf));Preconditions.checkArgument(lowerBound <= upperBound,"lowerBound <= upperBound must be fulfilled");// Move buffer by +1 / -1 depending on inclusiveness in order not needing// to check for inclusiveness later on// 这里根据是否包含上下界,来进行判断,是否执行 +1 或者 -1 操作this.lowerBound = (lowerBoundInclusive) ? lowerBound : lowerBound + 1L;this.upperBound = (upperBoundInclusive) ? upperBound : upperBound - 1L;}

初始化状态

 public void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);//构建 左流缓冲区,类型为keyedState的MapState 其中时间戳是key,值为BufferEntry 类型的List ArrayListthis.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(LEFT_BUFFER,LongSerializer.INSTANCE,new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer))));//构建右流缓冲区 类型为keyedState的MapState 其中时间戳是key,值为BufferEntry类型的List ArrayListthis.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(RIGHT_BUFFER,LongSerializer.INSTANCE,new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))));}

处理左流和右流的数据

 @Overridepublic void processElement1(StreamRecord<T1> record) throws Exception {//左流processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);}@Overridepublic void processElement2(StreamRecord<T2> record) throws Exception {//右流processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);}

我们可以明显的看出来,左流和右流的处理,都是依靠于processElement方法。

private <THIS, OTHER> void processElement(final StreamRecord<THIS> record,final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,final long relativeLowerBound,final long relativeUpperBound,final boolean isLeft) throws Exception {//获取当前流的值,可以是左流也可以是右流final THIS ourValue = record.getValue();//获取当前元素的时间戳,左流 or 右流final long ourTimestamp = record.getTimestamp();//是否迟到,是否小于 当前的 watermarkif (isLate(ourTimestamp)) {return;}//将该方法的实现写到下方了,这里的意思是将当前的元素写入,当前key 所属的 state中,也就是左流keyedstate 或者右流keyed state //addToBuffer(ourBuffer, ourValue, ourTimestamp);List<BufferEntry<THIS>> elemsInBucket = ourBuffer.get(ourTimestamp);if (elemsInBucket == null) {elemsInBucket = new ArrayList<>();}elemsInBucket.add(new BufferEntry<>(ourValue, false));ourBuffer.put(ourTimestamp, elemsInBucket);//遍历 其他流的state 。for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {final long timestamp  = bucket.getKey();//如果时间不在范围内 则看一下保存的元素if (timestamp < ourTimestamp + relativeLowerBound ||timestamp > ourTimestamp + relativeUpperBound) {continue;}//如果在说明有值啊,当前值对应 other 多个元素的时候,会执行for循环,也就是 1 x n 的输出for (BufferEntry<OTHER> entry: bucket.getValue()) {if (isLeft) {collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);} else {collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);}}}//计算清理时间是否到了,到了的话就清理long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;if (isLeft) {internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);} else {internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);}}

经过我们上述的分析,可以分析出来,Interval Join 是一种依赖于EventTime的一种join方式,它将左流和右流相同的key的数据按照时间戳来进行存储在不同的缓存里面,leftBuffer 和rightBuffer。

运行思路是这样的。

首先进来一个元素,这个元素可能是左流也可能是右流的元素,然后校验是否过期,过期就丢弃,不过期继续处理。

然后将他放入到左流或右流单独所属的cache中,时间戳为key,然后判断时间段,和他所对应的缓存 里面是否有值,如果有则返回,没有则等待右流将他唤醒。

然后会判断时间戳到达什么位置了。是否到了该清理的时候,如果到了,则会按照时间戳来进行清理。

 public void onEventTime(InternalTimer<K, String> timer) throws Exception {long timerTimestamp = timer.getTimestamp();String namespace = timer.getNamespace();logger.trace("onEventTime @ {}", timerTimestamp);switch (namespace) {case CLEANUP_NAMESPACE_LEFT: {long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;logger.trace("Removing from left buffer @ {}", timestamp);leftBuffer.remove(timestamp);break;}case CLEANUP_NAMESPACE_RIGHT: {long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;logger.trace("Removing from right buffer @ {}", timestamp);rightBuffer.remove(timestamp);break;}default:throw new RuntimeException("Invalid namespace " + namespace);}}

到这里,我想我们对Interval Join 有了一些深入的理解了。

1、根据时间段来进行join,可以处于边界,也可以不处于边界

2、根据双cache来进行存储数据,以及根据keyed来进行join逻辑的实现

3、他是内连接的,目前不支持左外链接,想做的话,可以手动指定清理策略(改源码重新打包,或者基于双亲委派机制的在项目中添加对应的类,来进行改造)。

Flink Interval Join使用以及源码解析相关推荐

  1. Seatunnel提交任务到Flink集群源码解析

    一:首先查看seatunnel提交任务到flink集群的时候的shell脚本start-seatunnel-flink-13-connector-v2.sh,查看最后会调用一个类FlinkStarte ...

  2. Flink 全网最全资源(视频、博客、PPT、入门、原理、实战、性能调优、源码解析、问答等持续更新)

    Flink 学习 https://github.com/zhisheng17/flink-learning 麻烦路过的各位亲给这个项目点个 star,太不易了,写了这么多,算是对我坚持下来的一种鼓励吧 ...

  3. [源码解析] 从TimeoutException看Flink的心跳机制

    [源码解析] 从TimeoutException看Flink的心跳机制 文章目录 [源码解析] 从TimeoutException看Flink的心跳机制 0x00 摘要 0x01 缘由 0x02 背景 ...

  4. Flink 全网最全资源(视频、博客、PPT、入门、实战、源码解析、问答等持续更新)...

    Flink 学习 github.com/zhisheng17/- 麻烦路过的各位亲给这个项目点个 star,太不易了,写了这么多,算是对我坚持下来的一种鼓励吧! 本项目结构 博客 1.Flink 从0 ...

  5. Flink 源码解析 —— 源码编译运行

    更新一篇知识星球里面的源码分析文章,去年写的,周末自己录了个视频,大家看下效果好吗?如果好的话,后面补录发在知识星球里面的其他源码解析文章. 前言 之前自己本地 clone 了 Flink 的源码,编 ...

  6. [源码解析] 当 Java Stream 遇见 Flink

    [源码解析] 当 Java Stream 遇见 Flink 文章目录 [源码解析] 当 Java Stream 遇见 Flink 0x00 摘要 0x01 领域 1.1 Flink 1.2 Java ...

  7. Flink源码解析 | 从Example出发:理解Flink启动流程

    从<Apache Flink本地部署>这篇文章中可以看到,我们启动集群都是通过脚本start-cluster.sh开始执行. 我们的源码解析之路就从flink的bash脚本入手. star ...

  8. Flink SQL JSON Format 源码解析

    用 Flink SQL 解析 JSON 格式的数据是非常简单的,只需要在 DDL 语句中设置 Format 为 json 即可,像下面这样: CREATE TABLE kafka_source (fu ...

  9. Apache Sedona(GeoSpark) spatial join 源码解析

    文章目录 Apache Sedona(GeoSpark) Spatial Join Range join Distance join 源码解析 SedonSQLRegistrator.register ...

  10. StarRocks Join Reorder 源码解析

    导读:欢迎来到 StarRocks 源码解析系列文章,我们将为你全方位揭晓 StarRocks 背后的技术原理和实践细节,助你逐步了解这款明星开源数据库产品. 本期 StarRocks 技术内幕将介绍 ...

最新文章

  1. lamp介绍,wordpress,phpmyadmin,discuzz安装
  2. 基于分代的垃圾回收算法
  3. The security settings could not be applied to the database because the connection has failed安装Mysql
  4. 【Linux】Linux查看机器负载-IO负载
  5. 删除xenserver的iso库
  6. springMVC框架下JQuery传递并解析Json数据
  7. 20191112每日一句
  8. cscd论坛_高压电器第九届电工技术前沿问题学术论坛“先进电磁技术”分论坛及专题征稿...
  9. python-提取特征 特征选择
  10. Causal Representation Learning for Out-of-Distribution Recommendation
  11. android 8.0 耳机线控,最新资讯 | 安卓8.0这大变化没法忍!要弃耳机孔
  12. SIPM模拟器 MIPS汇编语言实现读取文件
  13. 如何搭建网站?第二步:购买服务器域名
  14. 【MySQL】作业一
  15. 了解HTT1/HTT2/HTT3 ?
  16. matplotlib 减少subplot空白
  17. 研究生计算机论文怎么写,研究生计算机论文摘要怎么写 研究生计算机论文摘要范文参考...
  18. elementUIel-input和el-select宽度不一样
  19. Python进行office操作 - 用Python读写Word文档入门
  20. Photoshop合并多个图片为PDF格式文件的(PDF文件编辑删除页面及合并的操作方法)解决方案

热门文章

  1. vs 的 tfs 账号更改
  2. csdner: china_jeffery, C++默认构造函数; csdner: thief thief, 什么情况下C++编译器会生成默认的构造函数
  3. LSD_SLAM编译运行
  4. 天津大学仁爱学院c语言期末考试题,天津大学仁爱学院2014-2015学年第1学期期末C语言复习...
  5. 天天向上的力量python代码解释_天天向上的力量 B
  6. 计算机实践游戏报告范文,关于在校学生玩电脑游戏的调查报告
  7. 【CF643D】Bearish Fanpages(set)(模拟)
  8. 上周热点回顾(4.1-4.7)
  9. justinmind夜话:数据母板系列视频教程之原型设计二十一条军规
  10. jquery AJAX清除IE缓存问题