背景

刚开始接触 FlinkSQL 时,对 LAST_VALUE 特别好奇,虽然工作当中有在用到,但还是特别的想知道它是怎么实现的,今天终于可以总结一下

原理

当我们写入如下类似的 sql 时,就会用到 LAST_VALUE 函数

select LAST_VALUE(status) from temp;

LAST_VALUE 函数对应的具体类为 LastValueWithRetractAggFunction。
LAST_VALUE函数之所以能够起作用最关键的是

 /** Accumulator for LAST_VALUE with retraction. */public static class LastValueWithRetractAccumulator<T> {public T lastValue = null;public Long lastOrder = null;// value timestamppublic MapView<T, List<Long>> valueToOrderMap = new MapView<>();// timestamp valuepublic MapView<Long, List<T>> orderToValueMap = new MapView<>();......}@SuppressWarnings("unchecked")public void accumulate(LastValueWithRetractAccumulator<T> acc, Object value) throws Exception {if (value != null) {//传进来的是 null 不做任何操作T v = (T) value;Long order = System.currentTimeMillis();List<Long> orderList = acc.valueToOrderMap.get(v);if (orderList == null) {orderList = new ArrayList<>();}orderList.add(order);acc.valueToOrderMap.put(v, orderList);accumulate(acc, value, order);}}@SuppressWarnings("unchecked")public void accumulate(LastValueWithRetractAccumulator<T> acc, Object value, Long order)throws Exception {if (value != null) {T v = (T) value;Long prevOrder = acc.lastOrder;// 默认是 nullif (prevOrder == null || prevOrder <= order) {//类似链表头插法acc.lastValue = v;acc.lastOrder = order;}List<T> valueList = acc.orderToValueMap.get(order);if (valueList == null) {valueList = new ArrayList<>();}valueList.add(v);acc.orderToValueMap.put(order, valueList);}}@SuppressWarnings("unchecked")public void retract(LastValueWithRetractAccumulator<T> acc, Object value) throws Exception {if (value != null) {T v = (T) value;List<Long> orderList = acc.valueToOrderMap.get(v);// 查出所有的 timestampif (orderList != null && orderList.size() > 0) {// 说明之前已经发出过了.此刻该 retractLong order = orderList.get(0);orderList.remove(0);//最早进入的那个 value 对应的 timestamp removeif (orderList.isEmpty()) {//说明该 value 有且仅进入了一次acc.valueToOrderMap.remove(v);} else {acc.valueToOrderMap.put(v, orderList);}retract(acc, value, order);}}}@SuppressWarnings("unchecked")public void retract(LastValueWithRetractAccumulator<T> acc, Object value, Long order)throws Exception {if (value != null) {T v = (T) value;List<T> valueList = acc.orderToValueMap.get(order);//取出相同 timestamp 对应的所有 valueif (valueList == null) {return;}int index = valueList.indexOf(v);// 找到对应的 value 并将其删除if (index >= 0) {valueList.remove(index);if (valueList.isEmpty()) {acc.orderToValueMap.remove(order);} else {acc.orderToValueMap.put(order, valueList);}}if (v.equals(acc.lastValue)) {Long startKey = acc.lastOrder;Iterator<Long> iter = acc.orderToValueMap.keys().iterator();// find the maximal order which is less than or equal to `startKey`//找到小于要删除值对应时间戳的最大值Long nextKey = Long.MIN_VALUE;while (iter.hasNext()) {Long key = iter.next();if (key <= startKey && key > nextKey) {nextKey = key;}}if (nextKey != Long.MIN_VALUE) {List<T> values = acc.orderToValueMap.get(nextKey);acc.lastValue = values.get(values.size() - 1);acc.lastOrder = nextKey;} else {acc.lastValue = null;acc.lastOrder = null;}}}}

首先呢是两个 MapView valueToOrderMap、orderToValueMap

valueToOrderMap 值( 此刻最终的结果 )---->消息进入accumulate 方法的系统时间戳
orderToValueMap 消息进入accumulate 方法的系统时间戳 ----->值( 此刻最终的结果 )

当 RowData( 内部使用 )对应的 rowKind 为 insert 或者 update_after 时,会进入 accumulate(LastValueWithRetractAccumulator acc, Object value) 方法。accumulate 方法相对比较简单其实就是分别对 valueToOrderMap、orderToValueMap 进行赋值。

当 RowData( 内部使用 )对应的 rowKind 为 delete 或者 update_before 时,会进入 retract(LastValueWithRetractAccumulator acc, Object value) 方法,主要是操作 valueToOrderMap 删除之前已经发出去的消息记录,然后进入 retract(LastValueWithRetractAccumulator acc, Object value, Long order),主要就是操作 orderToValueMap 删除对应时间戳的值,然后找出 不大于要删除数据对应时间戳的最大时间戳,下一步要 retract 就该它了

总结

其实就是通过 时间戳 来进行判断的

一文搞懂 FlinkSQL函数 LAST_VALUE 的原理相关推荐

  1. 一文搞懂property函数

    一文搞懂property函数 接下来我带大家了解一个函数的作用以及使用技巧,希望对大家都有帮助,话不多说,接下来就开始我的表演 特性 通过存取方法定义的属性通常称为特性(property) 让所有的属 ...

  2. 一文搞懂 Node入门框架 Koa2 原理,学它!

    点击上方 前端Q,关注公众号 回复加群,加入前端Q技术交流群 作者:林不渡 (已获转载授权) 链接:https://juejin.cn/post/6844904071934001160 写在前面 看了 ...

  3. 一文搞懂go并发编程设计原理

    前言 主要学习其设计原则,大体流程,权衡利弊 不要纠结于部分难懂的实现细节,因为不同的人对相同接口的实现细节不一样,就算是相同的人实现两次也可能不一样 context context的作用主要有两个: ...

  4. 一文搞懂C语言回调函数

    转载自:https://segmentfault.com/a/1190000008293902?utm_source=tag-newest 博主:Rdou Typing 来源:segmentfault ...

  5. 一文搞懂RNN(循环神经网络)

    基础篇|一文搞懂RNN(循环神经网络) https://mp.weixin.qq.com/s/va1gmavl2ZESgnM7biORQg 神经网络基础 神经网络可以当做是能够拟合任意函数的黑盒子,只 ...

  6. 一文搞懂 Python 的 import 机制

    一.前言 希望能够让读者一文搞懂 Python 的 import 机制 1.什么是 import 机制? 通常来讲,在一段 Python 代码中去执行引用另一个模块中的代码,就需要使用 Python ...

  7. 一文搞懂如何使用Node.js进行TCP网络通信

    摘要: 网络是通信互联的基础,Node.js提供了net.http.dgram等模块,分别用来实现TCP.HTTP.UDP的通信,本文主要对使用Node.js的TCP通信部份进行实践记录. 本文分享自 ...

  8. 【UE·蓝图底层篇】一文搞懂NativeClass、GeneratedClass、BlueprintClass、ParentClass

    本文将对蓝图类UBlueprint的几个UClass成员变量NativeClass.GeneratedClass.BlueprintClass.ParentClass进行比较深入的讲解,看完之后对蓝图 ...

  9. 一文搞懂指标采集利器 Telegraf

    作者| 姜闻名 来源|尔达 Erda 公众号 ​ 导读:为了让大家更好的了解 MSP 中 APM 系统的设计实现,我们决定编写一个<详聊微服务观测>系列文章,深入 APM 系统的产品.架构 ...

  10. 《一文搞懂NMS发展历程》Soft-NMS、Weighted NMS、IoU-Net、Softer-NMS、Adaptive NMS、DIoU-NMS

    <一文搞懂NMS发展历程>Soft-NMS.Weighted NMS.IoU-Net.Softer-NMS.Adaptive NMS.DIoU-NMS 文章目录 <一文搞懂NMS发展 ...

最新文章

  1. mysql更改数据库数据存储目录_MySQL更改数据库数据存储目录
  2. android 布局 站位符,基于android布局中的常用占位符介绍
  3. [Linux] PHP程序员玩转Linux系列-Linux和Windows安装nginx
  4. java Proxy.newProxyInstance 动态代理 简介
  5. 本地搭建WordPress (XAMPP环境)
  6. CCNA-第八篇-OSPF-上
  7. 面试精讲之面试考点及大厂真题 - 分布式专栏 09 缓存必问:Reids持久化,高可用集群
  8. Atitit.程序包装exe启动器 打包 发布 设计 -生成exe java
  9. word如何快速找到自己需要的符号/特殊符号?(干货满满)
  10. visio2013里画带箭头的折线连接线
  11. 基于微信小程序评选投票系统 开题报告
  12. power apps -- Game Demo page 制作游戏步骤详解
  13. 第三方支付的概述 第三方支付通道对接
  14. 如何实现emoji文本字数计算?以及输入框限制指定字符数?
  15. Python数据分析第八课:初识Matplotlib
  16. 【自动驾驶汽车技术 | 车载雷达系统】
  17. pyserial 全双工通信
  18. 在VMware Player中安装启用Aero的操作系统
  19. MC9S12单片机(学生实验用)PIT编程实例
  20. Spring Boot快速入门(尚硅谷笔记)

热门文章

  1. 职场解释系:张三丰的接班人计划
  2. Linux里怎么打开pt文件,在 Linux 上安装 transmission 进行远程 PT 下载
  3. 有关微型计算机控制系统的论文,计算机控制系统 徐文尚
  4. 发送邮件被退回,提示: Helo command rejected: Invalid name 错误
  5. windows在cmd中用cl、link工具编译生成dll与lib
  6. win远程桌面连接服务器,远程桌面连接windows服务器
  7. Unity-之-物体旋转-跳跃-2021.5.13
  8. javascript文字特效
  9. mysql 多表左连接查询
  10. 大学生数学竞赛试题荟萃