一文搞懂 FlinkSQL函数 LAST_VALUE 的原理
背景
刚开始接触 FlinkSQL 时,对 LAST_VALUE 特别好奇,虽然工作当中有在用到,但还是特别的想知道它是怎么实现的,今天终于可以总结一下
原理
当我们写入如下类似的 sql 时,就会用到 LAST_VALUE 函数
select LAST_VALUE(status) from temp;
LAST_VALUE 函数对应的具体类为 LastValueWithRetractAggFunction。
LAST_VALUE函数之所以能够起作用最关键的是
/** Accumulator for LAST_VALUE with retraction. */public static class LastValueWithRetractAccumulator<T> {public T lastValue = null;public Long lastOrder = null;// value timestamppublic MapView<T, List<Long>> valueToOrderMap = new MapView<>();// timestamp valuepublic MapView<Long, List<T>> orderToValueMap = new MapView<>();......}@SuppressWarnings("unchecked")public void accumulate(LastValueWithRetractAccumulator<T> acc, Object value) throws Exception {if (value != null) {//传进来的是 null 不做任何操作T v = (T) value;Long order = System.currentTimeMillis();List<Long> orderList = acc.valueToOrderMap.get(v);if (orderList == null) {orderList = new ArrayList<>();}orderList.add(order);acc.valueToOrderMap.put(v, orderList);accumulate(acc, value, order);}}@SuppressWarnings("unchecked")public void accumulate(LastValueWithRetractAccumulator<T> acc, Object value, Long order)throws Exception {if (value != null) {T v = (T) value;Long prevOrder = acc.lastOrder;// 默认是 nullif (prevOrder == null || prevOrder <= order) {//类似链表头插法acc.lastValue = v;acc.lastOrder = order;}List<T> valueList = acc.orderToValueMap.get(order);if (valueList == null) {valueList = new ArrayList<>();}valueList.add(v);acc.orderToValueMap.put(order, valueList);}}@SuppressWarnings("unchecked")public void retract(LastValueWithRetractAccumulator<T> acc, Object value) throws Exception {if (value != null) {T v = (T) value;List<Long> orderList = acc.valueToOrderMap.get(v);// 查出所有的 timestampif (orderList != null && orderList.size() > 0) {// 说明之前已经发出过了.此刻该 retractLong order = orderList.get(0);orderList.remove(0);//最早进入的那个 value 对应的 timestamp removeif (orderList.isEmpty()) {//说明该 value 有且仅进入了一次acc.valueToOrderMap.remove(v);} else {acc.valueToOrderMap.put(v, orderList);}retract(acc, value, order);}}}@SuppressWarnings("unchecked")public void retract(LastValueWithRetractAccumulator<T> acc, Object value, Long order)throws Exception {if (value != null) {T v = (T) value;List<T> valueList = acc.orderToValueMap.get(order);//取出相同 timestamp 对应的所有 valueif (valueList == null) {return;}int index = valueList.indexOf(v);// 找到对应的 value 并将其删除if (index >= 0) {valueList.remove(index);if (valueList.isEmpty()) {acc.orderToValueMap.remove(order);} else {acc.orderToValueMap.put(order, valueList);}}if (v.equals(acc.lastValue)) {Long startKey = acc.lastOrder;Iterator<Long> iter = acc.orderToValueMap.keys().iterator();// find the maximal order which is less than or equal to `startKey`//找到小于要删除值对应时间戳的最大值Long nextKey = Long.MIN_VALUE;while (iter.hasNext()) {Long key = iter.next();if (key <= startKey && key > nextKey) {nextKey = key;}}if (nextKey != Long.MIN_VALUE) {List<T> values = acc.orderToValueMap.get(nextKey);acc.lastValue = values.get(values.size() - 1);acc.lastOrder = nextKey;} else {acc.lastValue = null;acc.lastOrder = null;}}}}
首先呢是两个 MapView valueToOrderMap、orderToValueMap
valueToOrderMap 值( 此刻最终的结果 )---->消息进入accumulate 方法的系统时间戳
orderToValueMap 消息进入accumulate 方法的系统时间戳 ----->值( 此刻最终的结果 )
当 RowData( 内部使用 )对应的 rowKind 为 insert 或者 update_after 时,会进入 accumulate(LastValueWithRetractAccumulator acc, Object value) 方法。accumulate 方法相对比较简单其实就是分别对 valueToOrderMap、orderToValueMap 进行赋值。
当 RowData( 内部使用 )对应的 rowKind 为 delete 或者 update_before 时,会进入 retract(LastValueWithRetractAccumulator acc, Object value) 方法,主要是操作 valueToOrderMap 删除之前已经发出去的消息记录,然后进入 retract(LastValueWithRetractAccumulator acc, Object value, Long order),主要就是操作 orderToValueMap 删除对应时间戳的值,然后找出 不大于要删除数据对应时间戳的最大时间戳,下一步要 retract 就该它了
总结
其实就是通过 时间戳 来进行判断的
一文搞懂 FlinkSQL函数 LAST_VALUE 的原理相关推荐
- 一文搞懂property函数
一文搞懂property函数 接下来我带大家了解一个函数的作用以及使用技巧,希望对大家都有帮助,话不多说,接下来就开始我的表演 特性 通过存取方法定义的属性通常称为特性(property) 让所有的属 ...
- 一文搞懂 Node入门框架 Koa2 原理,学它!
点击上方 前端Q,关注公众号 回复加群,加入前端Q技术交流群 作者:林不渡 (已获转载授权) 链接:https://juejin.cn/post/6844904071934001160 写在前面 看了 ...
- 一文搞懂go并发编程设计原理
前言 主要学习其设计原则,大体流程,权衡利弊 不要纠结于部分难懂的实现细节,因为不同的人对相同接口的实现细节不一样,就算是相同的人实现两次也可能不一样 context context的作用主要有两个: ...
- 一文搞懂C语言回调函数
转载自:https://segmentfault.com/a/1190000008293902?utm_source=tag-newest 博主:Rdou Typing 来源:segmentfault ...
- 一文搞懂RNN(循环神经网络)
基础篇|一文搞懂RNN(循环神经网络) https://mp.weixin.qq.com/s/va1gmavl2ZESgnM7biORQg 神经网络基础 神经网络可以当做是能够拟合任意函数的黑盒子,只 ...
- 一文搞懂 Python 的 import 机制
一.前言 希望能够让读者一文搞懂 Python 的 import 机制 1.什么是 import 机制? 通常来讲,在一段 Python 代码中去执行引用另一个模块中的代码,就需要使用 Python ...
- 一文搞懂如何使用Node.js进行TCP网络通信
摘要: 网络是通信互联的基础,Node.js提供了net.http.dgram等模块,分别用来实现TCP.HTTP.UDP的通信,本文主要对使用Node.js的TCP通信部份进行实践记录. 本文分享自 ...
- 【UE·蓝图底层篇】一文搞懂NativeClass、GeneratedClass、BlueprintClass、ParentClass
本文将对蓝图类UBlueprint的几个UClass成员变量NativeClass.GeneratedClass.BlueprintClass.ParentClass进行比较深入的讲解,看完之后对蓝图 ...
- 一文搞懂指标采集利器 Telegraf
作者| 姜闻名 来源|尔达 Erda 公众号 导读:为了让大家更好的了解 MSP 中 APM 系统的设计实现,我们决定编写一个<详聊微服务观测>系列文章,深入 APM 系统的产品.架构 ...
- 《一文搞懂NMS发展历程》Soft-NMS、Weighted NMS、IoU-Net、Softer-NMS、Adaptive NMS、DIoU-NMS
<一文搞懂NMS发展历程>Soft-NMS.Weighted NMS.IoU-Net.Softer-NMS.Adaptive NMS.DIoU-NMS 文章目录 <一文搞懂NMS发展 ...
最新文章
- mysql更改数据库数据存储目录_MySQL更改数据库数据存储目录
- android 布局 站位符,基于android布局中的常用占位符介绍
- [Linux] PHP程序员玩转Linux系列-Linux和Windows安装nginx
- java Proxy.newProxyInstance 动态代理 简介
- 本地搭建WordPress (XAMPP环境)
- CCNA-第八篇-OSPF-上
- 面试精讲之面试考点及大厂真题 - 分布式专栏 09 缓存必问:Reids持久化,高可用集群
- Atitit.程序包装exe启动器 打包 发布 设计 -生成exe java
- word如何快速找到自己需要的符号/特殊符号?(干货满满)
- visio2013里画带箭头的折线连接线
- 基于微信小程序评选投票系统 开题报告
- power apps -- Game Demo page 制作游戏步骤详解
- 第三方支付的概述 第三方支付通道对接
- 如何实现emoji文本字数计算?以及输入框限制指定字符数?
- Python数据分析第八课:初识Matplotlib
- 【自动驾驶汽车技术 | 车载雷达系统】
- pyserial 全双工通信
- 在VMware Player中安装启用Aero的操作系统
- MC9S12单片机(学生实验用)PIT编程实例
- Spring Boot快速入门(尚硅谷笔记)
热门文章
- 职场解释系:张三丰的接班人计划
- Linux里怎么打开pt文件,在 Linux 上安装 transmission 进行远程 PT 下载
- 有关微型计算机控制系统的论文,计算机控制系统 徐文尚
- 发送邮件被退回,提示: Helo command rejected: Invalid name 错误
- windows在cmd中用cl、link工具编译生成dll与lib
- win远程桌面连接服务器,远程桌面连接windows服务器
- Unity-之-物体旋转-跳跃-2021.5.13
- javascript文字特效
- mysql 多表左连接查询
- 大学生数学竞赛试题荟萃