参考博客 https://cloud.tencent.com/developer/article/1738836

数据类型为左流 FlinkClick(userid=gk01, click=Pay, ctime=2020-12-14 09:55:00.000) ; 右流为 FlinkPay(userid=gk01, payway=alipy, ptime=2020-12-14 09:58:00.000)

join的这段代码如下

  clickOut.keyBy(t->t.getUserid()).intervalJoin(payOunt.keyBy(t->t.getUserid())).between(Time.minutes(1),Time.minutes(5)).lowerBoundExclusive()    //默认是闭区间,这样就变成了开区间.upperBoundExclusive().process(new ProcessJoinFunction<FlinkClick, FlinkPay, String>() {@Overridepublic void processElement(FlinkClick left, FlinkPay right, Context ctx, Collector<String> out) throws Exception {out.collect(StringUtils.join(Arrays.asList(left.getUserid(),left.getClick(),right.getPayway()),'\t'));}}).print().setParallelism(1);

一:watermark生成规则:

watermark的计算为 min(ctime,ptime)-watermark (watermark为左右流定义的乱序时间,我这里设置的0),贴出其中一个流的demo,注意watermark

env.addSource(payConsumer).map(new MapFunction<String, FlinkPay>() {@Overridepublic FlinkPay map(String pv) throws Exception {JSONObject clickObject = JSONObject.parseObject(pv);String userid = clickObject.getString("userid");String payway = clickObject.getString("payway");String ptime = clickObject.getString("ptime");FlinkPay payO = new FlinkPay(userid, payway, ptime);return payO;}}).assignTimestampsAndWatermarks(WatermarkStrategy.<FlinkPay>forBoundedOutOfOrderness(Duration.ZERO)   //watermark时间.withTimestampAssigner(new SerializableTimestampAssigner<FlinkPay>() {@Overridepublic long extractTimestamp(FlinkPay element, long recordTimestamp) {Date dateP = new Date();try {System.out.println(element);dateP = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").parse(element.getPtime());} catch (ParseException e) {e.printStackTrace();}
//                                                        System.out.println(dateP.getTime());return dateP.getTime();}}));

二:状态清理机制

贴上几段源码,均在  IntervalJoinOperator 类中

private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;@Override
public void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(LEFT_BUFFER,LongSerializer.INSTANCE,new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer))));this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(RIGHT_BUFFER,LongSerializer.INSTANCE,new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))));
}

在IntervalJoinOperator中,会利用两个MapState分别缓存左流和右流的数据。其中,Long表示时间时间戳,List<BufferEntry<T>>表示该时刻到来的数据记录,当左流和右流有数据到达时,会分别调用processElement1()和processElement2()方法,它们都调用了processElement()方法

@Overridepublic void processElement1(StreamRecord<T1> record) throws Exception {processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);} @Overridepublic void processElement2(StreamRecord<T2> record) throws Exception {processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);}private <THIS, OTHER> void processElement(final StreamRecord<THIS> record,final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,final long relativeLowerBound,final long relativeUpperBound,final boolean isLeft) throws Exception {final THIS ourValue = record.getValue();final long ourTimestamp = record.getTimestamp();if (ourTimestamp == Long.MIN_VALUE) {throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +"interval stream joins need to have timestamps meaningful timestamps.");}if (isLate(ourTimestamp)) {return;}addToBuffer(ourBuffer, ourValue, ourTimestamp);for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {final long timestamp  = bucket.getKey();if (timestamp < ourTimestamp + relativeLowerBound ||timestamp > ourTimestamp + relativeUpperBound) {continue;}for (BufferEntry<OTHER> entry: bucket.getValue()) {if (isLeft) {collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);} else {collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);}}}long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;if (isLeft) {internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);} else {internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);}}

代码最后调用TimerService.registerEventTimeTimer(),注册时间戳为timestamp+relativeUpperBound的定时器,该定时器负责在水印超过区间的上界时执行状态的清理逻辑,防止数据堆积。注意左右流的定时器所属的namespace是不同的,具体逻辑位于onEventTime()方法中

 @Overridepublic void onEventTime(InternalTimer<K, String> timer) throws Exception {long timerTimestamp = timer.getTimestamp();String namespace = timer.getNamespace();logger.trace("onEventTime @ {}", timerTimestamp);switch (namespace) {case CLEANUP_NAMESPACE_LEFT: {long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;logger.trace("Removing from left buffer @ {}", timestamp);leftBuffer.remove(timestamp);break;}case CLEANUP_NAMESPACE_RIGHT: {long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;logger.trace("Removing from right buffer @ {}", timestamp);rightBuffer.remove(timestamp);break;}default:throw new RuntimeException("Invalid namespace " + namespace);}}

先把测试数据及结果贴在这里

id 左流数据时间戳(ctime) 右流数据时间戳(ptime) 左流清理时间 右侧清理时间
1 2020-12-14 01:55:00.000 2020-12-14 02:00:00.000  
2 2020-12-14 01:55:00.000   2020-12-14 01:55:00.000

对这个结果说明一下:

我们在自己的代码里设置了:.between(Time.minutes(1),Time.minutes(5))

上述源码中有这一行

long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;

从这里我们就可以计算左右流的清理时间了:

当左流数据进来时,(lowerBound, upperBound) 为 (1 ,5) ,当右流数据进来时,(lowerBound, upperBound) 为 (-5 ,-1),其实就是   left+1min <  right  <left+5min ,反过来就是 right  -5min <  left <right -1min

2020-12-14 01:55:00.000 的左侧数据进来,upperBound大于0,cleanupTime = 时间戳+5min 即等于2020-12-14 02:00:00.000;这是因为,当右侧流在2020-12-14 02:00:00.000需要查找左侧流的数据时间为 [2020-12-14 01:55:00.000,2020-12-14 01:59:00.000],所以watermark> 2020-12-14 02:00:00.000 时可以清除2020-12-14 01:55:00.000的数据

2020-12-14 01:55:00.000的右侧数据进来,upperBound小于0,clearnupTime = 时间戳,即等于 2020-12-14 01:55:00.000;这是因为,左侧数据流在 2020-12-14 01:55:00.000时,需要查找的右侧流时间戳范围 [2020-12-14 01:56:00.000, 2020-12-14 02:00:00.000],所以当watermark达到2020-12-14 01:55:00.000时 可以清除 2020-12-14 01:55:00.000 的数据

在 https://cloud.tencent.com/developer/article/1417447 这篇博客中,博主说watermark讲到 WaterMark是根据实际最小值减去UpperBound生成,即:Min(左,右)-upperBound,个人觉得不太对,如果有小伙伴对我这篇博客有疑问,欢迎留言,会积极改正!!

Flink1.11 intervalJoin watermark生成,状态清理机制源码理解Demo分析相关推荐

  1. Spring Security OAuth2.0 token生成与刷新机制源码阅读

    一.介绍 Spring Security Oauth2是目前市面上非常流行的实现了OAuth2.0协议的权限框架.本文会介绍其是如何获取token以及刷新token的. 二.AbstractEndPo ...

  2. java如何将数据保存为xml6_用Java实现可保存状态的数据库生成XML树,源码来了(9)...

    用Java实现可保存状态的数据库生成XML树,源码来了(9) 时间:2006/7/19 5:38:30 作者:佚名 人气:30 6.3.2.Servlet源码 1.RefreshServlet.jav ...

  3. Apache Storm 实时流处理系统通信机制源码分析

    我们今天就来仔细研究一下Apache Storm 2.0.0-SNAPSHOT的通信机制.下面我将从大致思想以及源码分析,然后我们细致分析实时流处理系统中源码通信机制研究. 1. 简介 Worker间 ...

  4. Zookeeper--Watcher机制源码剖析二

    Watcher触发 我们从实际操作时候的表现来看Watcher的触发,比如Zookeeper中NodeDataChanged时间的触发是"Watcher监听的对应数据节点的数据内容发生变更& ...

  5. Zookeeper--Watcher机制源码剖析一

    Watcher-- 数据变更通知 我们知道Zookeeper提供来分布式数据的订阅/发布功能,一个典型的发布/订阅模型系统定义了一种一对多的订阅关系,能让多个订阅者同时监听某个主题对象,当这个被监听对 ...

  6. Android View系列(二):事件分发机制源码解析

    概述 在介绍点击事件规则之前,我们需要知道我们分析的是MotionEvent,即点击事件,所谓的事件分发就是对MotionEvent事件的分发过程,即当一个MotionEvent生成以后,系统需要把这 ...

  7. Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法

    Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法 1.spreadOutApp尽量平均分配到每个executor上: 2.非spreadO ...

  8. flink设置watermark以及事件时间字段源码分析

    flink设置watermark以及事件时间字段源码分析 背景 1.1.提取时间戳字段,用于事件时间语义处理数据 1.2.设置水位线(水印)watermark TimestampAssigner 核心 ...

  9. React事件机制 - 源码概览(下)

    上篇文档 React事件机制 - 源码概览(上)说到了事件执行阶段的构造合成事件部分,本文接着继续往下分析 批处理合成事件 入口是 runEventsInBatch // runEventsInBat ...

最新文章

  1. elementui table 固定列_elementUI Table组件 如何设置合并列
  2. mysql 主从单表_MySQL主从复制单表或者多表
  3. matlab 自定义对象,面向对象:MATLAB的自定义类 [MATLAB]
  4. 最小生成树prim算法———模板
  5. 如何处理数据中心电缆管理问题?
  6. ABAP中的动态运算函数
  7. (转)IOS学习笔记-2015-03-29 int、long、long long取值范围
  8. DDoS(Distributed Denial of Service,分布式拒绝服务)
  9. python集合运算_Python 集合set()添加删除、交集、并集、集合操作详解
  10. 如何在三个月内获得三年的工作经验
  11. 【转】PHP发送邮件之PHPMailer
  12. Java连接数据库代码
  13. 组态软件哪个好_组态软件推荐
  14. 利用python爬虫程序爬取豆瓣影评
  15. 如何免费下载和翻译论文
  16. MYSQL时间函数之NOW()
  17. 解决tooltips鬼畜问题
  18. 计算机主板复位电路的组成,教你认识液晶彩电主板中的复位电路
  19. 蓝精灵协会 (The Smurfs‘ Society) 宣布与著名艺术家展开一系列的合作,打造传奇 PFP 系列
  20. 理解小球下落(Dropping Balls)

热门文章

  1. element ui 图片加载失败_element图片懒加载的问题
  2. 07-字体及文本样式
  3. 2021-7-7HTML5前端基础
  4. W801/W800-wifi-socket开发(二)-UDP蓝牙控制wifi连接
  5. 简单的视频剪辑教程:使用win10自带的工具剪切和合并视频
  6. 【20190427】【Python】MOOC学习中的小代码总结(已完结)
  7. 不要逼我……我想做乖孩子
  8. 数学小课堂:利用信息的等价性进行信息压缩
  9. react后台管理系统项目总结
  10. C++实现矩阵计算器