Flink Interval Join使用以及源码解析
1、Interval Join 概述
在之前的Join算子中,一般使用的是coGroup算子,因为一个算子可以提供多种语义,但是也是有一些弊端的。因为coGroup只能实现在同一个窗口的两个数据流之间进行join,在实际的计算过程中,往往会遇到当req发生时,resp迟迟无法响应,这个时候,就会出现一个跨窗口的问题。也就是说经常会出现数据乱序,或者数据延迟的情况,导致两个流的数据是不同步的,也就会导致,join的过程中丢失数据问题。不在同一个窗口中的数据无法join,这个问题flink官方提供了另外一种join的方式,也就是interval join。他的核心思想就是,将两个流通过keyed分区。然后,按照key 在一个相对的时间段内进行Join。
Interval join用一个公共键连接两个流的元素(我们现在称它们为A&B),其中流B的元素的时间戳位于流A中元素的时间戳的相对时间间隔内。
这也可以更正式地表达为 b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound]or
a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound
这里要表达的意思也就是说,当两个流进行join的时候,会根据左流的时间戳在右流中寻找公共键。
其中a和b是a和b中共享公共密钥的元素。只要下限始终小于或等于上限,下限和上限都可以是负值或正值。间隔联接当前仅执行内部联接。
当一对元素传递给ProcessJoinFunction时,它们将被分配有两个元素的较大时间戳(可以通过ProcessJoinFunction.context访问)。
注意:Interval Join 仅支持event time
在上面的例子中,我们将两个流“橙色”和“绿色”连接起来,下限为-2毫秒,上限为+1毫秒。默认情况下,这些边界是包含的,但是。lowerBoundExclusive()和upperBoundExclusive可用于更改是否包含上下界。
再次使用更正式的符号,这将转化为
orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound
如三角形所示。
2、代码实现
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...orangeStream.keyBy(<KeySelector>).intervalJoin(greenStream.keyBy(<KeySelector>)).between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunction<Integer, Integer, String(){@Overridepublic void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {out.collect(first + "," + second);}});
3、Interval Join源码实现原理
在Interval Join的实现当中,其中的核心实现为IntervalJoinOperator类,这个类提供了执行IntervalJoin的核心逻辑
构造方法
public IntervalJoinOperator(long lowerBound,long upperBound,boolean lowerBoundInclusive,boolean upperBoundInclusive,TypeSerializer<T1> leftTypeSerializer,TypeSerializer<T2> rightTypeSerializer,ProcessJoinFunction<T1, T2, OUT> udf) {super(Preconditions.checkNotNull(udf));Preconditions.checkArgument(lowerBound <= upperBound,"lowerBound <= upperBound must be fulfilled");// Move buffer by +1 / -1 depending on inclusiveness in order not needing// to check for inclusiveness later on// 这里根据是否包含上下界,来进行判断,是否执行 +1 或者 -1 操作this.lowerBound = (lowerBoundInclusive) ? lowerBound : lowerBound + 1L;this.upperBound = (upperBoundInclusive) ? upperBound : upperBound - 1L;}
初始化状态
public void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);//构建 左流缓冲区,类型为keyedState的MapState 其中时间戳是key,值为BufferEntry 类型的List ArrayListthis.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(LEFT_BUFFER,LongSerializer.INSTANCE,new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer))));//构建右流缓冲区 类型为keyedState的MapState 其中时间戳是key,值为BufferEntry类型的List ArrayListthis.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(RIGHT_BUFFER,LongSerializer.INSTANCE,new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))));}
处理左流和右流的数据
@Overridepublic void processElement1(StreamRecord<T1> record) throws Exception {//左流processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);}@Overridepublic void processElement2(StreamRecord<T2> record) throws Exception {//右流processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);}
我们可以明显的看出来,左流和右流的处理,都是依靠于processElement方法。
private <THIS, OTHER> void processElement(final StreamRecord<THIS> record,final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,final long relativeLowerBound,final long relativeUpperBound,final boolean isLeft) throws Exception {//获取当前流的值,可以是左流也可以是右流final THIS ourValue = record.getValue();//获取当前元素的时间戳,左流 or 右流final long ourTimestamp = record.getTimestamp();//是否迟到,是否小于 当前的 watermarkif (isLate(ourTimestamp)) {return;}//将该方法的实现写到下方了,这里的意思是将当前的元素写入,当前key 所属的 state中,也就是左流keyedstate 或者右流keyed state //addToBuffer(ourBuffer, ourValue, ourTimestamp);List<BufferEntry<THIS>> elemsInBucket = ourBuffer.get(ourTimestamp);if (elemsInBucket == null) {elemsInBucket = new ArrayList<>();}elemsInBucket.add(new BufferEntry<>(ourValue, false));ourBuffer.put(ourTimestamp, elemsInBucket);//遍历 其他流的state 。for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {final long timestamp = bucket.getKey();//如果时间不在范围内 则看一下保存的元素if (timestamp < ourTimestamp + relativeLowerBound ||timestamp > ourTimestamp + relativeUpperBound) {continue;}//如果在说明有值啊,当前值对应 other 多个元素的时候,会执行for循环,也就是 1 x n 的输出for (BufferEntry<OTHER> entry: bucket.getValue()) {if (isLeft) {collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);} else {collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);}}}//计算清理时间是否到了,到了的话就清理long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;if (isLeft) {internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);} else {internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);}}
经过我们上述的分析,可以分析出来,Interval Join 是一种依赖于EventTime的一种join方式,它将左流和右流相同的key的数据按照时间戳来进行存储在不同的缓存里面,leftBuffer 和rightBuffer。
运行思路是这样的。
首先进来一个元素,这个元素可能是左流也可能是右流的元素,然后校验是否过期,过期就丢弃,不过期继续处理。
然后将他放入到左流或右流单独所属的cache中,时间戳为key,然后判断时间段,和他所对应的缓存 里面是否有值,如果有则返回,没有则等待右流将他唤醒。
然后会判断时间戳到达什么位置了。是否到了该清理的时候,如果到了,则会按照时间戳来进行清理。
public void onEventTime(InternalTimer<K, String> timer) throws Exception {long timerTimestamp = timer.getTimestamp();String namespace = timer.getNamespace();logger.trace("onEventTime @ {}", timerTimestamp);switch (namespace) {case CLEANUP_NAMESPACE_LEFT: {long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;logger.trace("Removing from left buffer @ {}", timestamp);leftBuffer.remove(timestamp);break;}case CLEANUP_NAMESPACE_RIGHT: {long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;logger.trace("Removing from right buffer @ {}", timestamp);rightBuffer.remove(timestamp);break;}default:throw new RuntimeException("Invalid namespace " + namespace);}}
到这里,我想我们对Interval Join 有了一些深入的理解了。
1、根据时间段来进行join,可以处于边界,也可以不处于边界
2、根据双cache来进行存储数据,以及根据keyed来进行join逻辑的实现
3、他是内连接的,目前不支持左外链接,想做的话,可以手动指定清理策略(改源码重新打包,或者基于双亲委派机制的在项目中添加对应的类,来进行改造)。
Flink Interval Join使用以及源码解析相关推荐
- Seatunnel提交任务到Flink集群源码解析
一:首先查看seatunnel提交任务到flink集群的时候的shell脚本start-seatunnel-flink-13-connector-v2.sh,查看最后会调用一个类FlinkStarte ...
- Flink 全网最全资源(视频、博客、PPT、入门、原理、实战、性能调优、源码解析、问答等持续更新)
Flink 学习 https://github.com/zhisheng17/flink-learning 麻烦路过的各位亲给这个项目点个 star,太不易了,写了这么多,算是对我坚持下来的一种鼓励吧 ...
- [源码解析] 从TimeoutException看Flink的心跳机制
[源码解析] 从TimeoutException看Flink的心跳机制 文章目录 [源码解析] 从TimeoutException看Flink的心跳机制 0x00 摘要 0x01 缘由 0x02 背景 ...
- Flink 全网最全资源(视频、博客、PPT、入门、实战、源码解析、问答等持续更新)...
Flink 学习 github.com/zhisheng17/- 麻烦路过的各位亲给这个项目点个 star,太不易了,写了这么多,算是对我坚持下来的一种鼓励吧! 本项目结构 博客 1.Flink 从0 ...
- Flink 源码解析 —— 源码编译运行
更新一篇知识星球里面的源码分析文章,去年写的,周末自己录了个视频,大家看下效果好吗?如果好的话,后面补录发在知识星球里面的其他源码解析文章. 前言 之前自己本地 clone 了 Flink 的源码,编 ...
- [源码解析] 当 Java Stream 遇见 Flink
[源码解析] 当 Java Stream 遇见 Flink 文章目录 [源码解析] 当 Java Stream 遇见 Flink 0x00 摘要 0x01 领域 1.1 Flink 1.2 Java ...
- Flink源码解析 | 从Example出发:理解Flink启动流程
从<Apache Flink本地部署>这篇文章中可以看到,我们启动集群都是通过脚本start-cluster.sh开始执行. 我们的源码解析之路就从flink的bash脚本入手. star ...
- Flink SQL JSON Format 源码解析
用 Flink SQL 解析 JSON 格式的数据是非常简单的,只需要在 DDL 语句中设置 Format 为 json 即可,像下面这样: CREATE TABLE kafka_source (fu ...
- Apache Sedona(GeoSpark) spatial join 源码解析
文章目录 Apache Sedona(GeoSpark) Spatial Join Range join Distance join 源码解析 SedonSQLRegistrator.register ...
- StarRocks Join Reorder 源码解析
导读:欢迎来到 StarRocks 源码解析系列文章,我们将为你全方位揭晓 StarRocks 背后的技术原理和实践细节,助你逐步了解这款明星开源数据库产品. 本期 StarRocks 技术内幕将介绍 ...
最新文章
- lamp介绍,wordpress,phpmyadmin,discuzz安装
- 基于分代的垃圾回收算法
- The security settings could not be applied to the database because the connection has failed安装Mysql
- 【Linux】Linux查看机器负载-IO负载
- 删除xenserver的iso库
- springMVC框架下JQuery传递并解析Json数据
- 20191112每日一句
- cscd论坛_高压电器第九届电工技术前沿问题学术论坛“先进电磁技术”分论坛及专题征稿...
- python-提取特征 特征选择
- Causal Representation Learning for Out-of-Distribution Recommendation
- android 8.0 耳机线控,最新资讯 | 安卓8.0这大变化没法忍!要弃耳机孔
- SIPM模拟器 MIPS汇编语言实现读取文件
- 如何搭建网站?第二步:购买服务器域名
- 【MySQL】作业一
- 了解HTT1/HTT2/HTT3 ?
- matplotlib 减少subplot空白
- 研究生计算机论文怎么写,研究生计算机论文摘要怎么写 研究生计算机论文摘要范文参考...
- elementUIel-input和el-select宽度不一样
- Python进行office操作 - 用Python读写Word文档入门
- Photoshop合并多个图片为PDF格式文件的(PDF文件编辑删除页面及合并的操作方法)解决方案
热门文章
- vs 的 tfs 账号更改
- csdner: china_jeffery, C++默认构造函数; csdner: thief thief, 什么情况下C++编译器会生成默认的构造函数
- LSD_SLAM编译运行
- 天津大学仁爱学院c语言期末考试题,天津大学仁爱学院2014-2015学年第1学期期末C语言复习...
- 天天向上的力量python代码解释_天天向上的力量 B
- 计算机实践游戏报告范文,关于在校学生玩电脑游戏的调查报告
- 【CF643D】Bearish Fanpages(set)(模拟)
- 上周热点回顾(4.1-4.7)
- justinmind夜话:数据母板系列视频教程之原型设计二十一条军规
- jquery AJAX清除IE缓存问题