本案例根据某电网公司的真实业务需求,通过Blink SQL+UDAF实现实时流上的差值聚合计算,通过本案例,让读者熟悉UDAF编写,并理解UDAF中的方法调用关系和顺序。
感谢@军长在实现过程中的指导。笔者水平有限,若有纰漏,请批评指出。

一、客户需求

电网公司每天采集各个用户的电表数据(格式如下表),其中data_date为电表数据上报时间,cons_id为电表id,r1为电表度数,其他字段与计算逻辑无关,可忽略。为了后续演示方便,仅输入cons_id=100000002的数据。

no(string) data_date(string) cons_id(string) org_no(string) r1(double)
101 20190716 100000002 35401 13.76
101 20190717 100000002 35401 14.12
101 20190718 100000002 35401 16.59
101 20190719 100000002 35401 18.89

表1:输入数据
电网公司希望通过实时计算(Blink)对电表数据处理后,每天得到每个电表最近两天(当天和前一天)的差值数据,结果类似如下表:

cons_id(string) data_date(string) subDegreeR1(double)
100000002 20190717 0.36
100000002 20190718 2.47
100000002 20190719 2.3

表2:期望的输出数据

二、需求分析

根据客户的需求,比较容易得到两种解决方案:1、通过over窗口(2 rows over window)开窗进行差值聚合;2、通过hop窗口(sliding=1天,size=2天)进行差值聚合。
over窗口和hop窗口均是Blink支持的标准窗口,使用起来非常简单。本需求的最大难点在于差值聚合,Blink支持SUM、MAX、MIN、AVG等内置的聚合函数,但没有满足业务需求的差值聚合函数,因此需要通过自定义聚合函数(UDAF)来实现。

三、UDAF开发

实时计算自定义函数开发搭建环境请参考UDX概述,在此不再赘述。本案例使用Blink2.2.7版本,下面简要描述关键代码的编写。
完整代码(为了方便上传,使用了txt格式):SubtractionUdaf.txt
1、在com.alibaba.blink.sql.udx.SubtractionUdaf包中创建一个继承AggregateFunction类的SubtractionUdaf类。

public class SubtractionUdaf extends AggregateFunction<Double, SubtractionUdaf.Accum> 

其中Double是UDAF输出的类型,在本案例中为相邻两天的电表差值度数。SubtractionUdaf.Accum是内部自定义的accumulator数据结构。
2、定义accumulator数据结构,用户保存UDAF的状态。

    public static class Accum {private long currentTime;//最新度数的上报时间private double oldDegree;//前一次度数private double newDegree;//当前最新度数private long num;   //accumulator中已经计算的record数量,主要用于mergeprivate List<Tuple2<Double, Long>> listInput;//缓存所有的输入,主要用于retract}

3、实现createAccumulator方法,初始化UDAF的accumulator

    //初始化udaf的accumulatorpublic SubtractionUdaf.Accum createAccumulator() {SubtractionUdaf.Accum acc = new SubtractionUdaf.Accum();acc.currentTime = 0;acc.oldDegree = 0.0;acc.newDegree = 0.0;acc.num = 0;acc.listInput = new ArrayList<Tuple2<Double, Long>>();return acc;}

4、实现getValue方法,用于通过存放状态的accumulator计算UDAF的结果,本案例需求是计算新旧数据两者的差值。

    public Double getValue(SubtractionUdaf.Accum accumulator) {return accumulator.newDegree - accumulator.oldDegree;}

5、实现accumulate方法,用于根据输入数据更新UDAF存放状态的accumulator。考虑到数据可能乱序以及可能的retract,数据数据包括了对应的度数iValue,还包括上报度数的时间(构造的事件时间ts)。

    public void accumulate(SubtractionUdaf.Accum accumulator, double iValue, long ts) {System.out.println("method : accumulate" );accumulator.listInput.add(Tuple2.of(Double.valueOf(iValue),Long.valueOf(ts)));Collections.sort(accumulator.listInput,this.comparator);//按照时间排序accumulator.num ++;if(accumulator.listInput.size() == 1){accumulator.newDegree = iValue;accumulator.oldDegree = 0.0;accumulator.currentTime = ts;}else {//处理可能存在的数据乱序问题accumulator.newDegree = accumulator.listInput.get(0).f0;accumulator.currentTime = accumulator.listInput.get(0).f1;accumulator.oldDegree = accumulator.listInput.get(1).f0;}}

其中accumulator为UDAF的状态,iValue和ts为实际的输入数据。
注意需要处理可能存在的输入数据乱序问题。
6、实现retract方法,用于在某些优化场景下(如使用over窗口)对retract的数据进行处理。

    public void retract(SubtractionUdaf.Accum accumulator, double iValue, long ts) throws Exception{if(accumulator.listInput.contains(Tuple2.of(iValue, ts))){if(accumulator.listInput.indexOf(Tuple2.of(iValue, ts)) == 0){//retract的是最新值accumulator.listInput.remove(0);accumulator.num--;if(accumulator.listInput.isEmpty()){accumulator.currentTime = 0;accumulator.oldDegree = 0.0;accumulator.newDegree = 0.0;}else if(accumulator.listInput.size() == 1) {accumulator.currentTime = accumulator.listInput.get(0).f1;accumulator.newDegree = accumulator.listInput.get(0).f0;accumulator.oldDegree = 0.0;}else{accumulator.currentTime = accumulator.listInput.get(0).f1;accumulator.newDegree = accumulator.listInput.get(0).f0;accumulator.oldDegree = accumulator.listInput.get(1).f0;}} else if(accumulator.listInput.indexOf(Tuple2.of(iValue, ts)) == 1){//retract的是次新值accumulator.listInput.remove(1);accumulator.num--;if(accumulator.listInput.size() == 1){accumulator.oldDegree = 0.0;}else {accumulator.oldDegree = accumulator.listInput.get(1).f0;}}else {//retract的是其他值accumulator.listInput.remove(Tuple2.of(iValue, ts));accumulator.num--;}}else {throw new Exception("Cannot retract a unexist record : iValue = "+ iValue + "timestamp = "+ ts);}}

需要考虑retract的是最新的数据还是次新的数据,需要不同的逻辑处理。
7、实现merge方法,用于某些优化场景(如使用hop窗口)。

    public void merge(SubtractionUdaf.Accum accumulator, Iterable<SubtractionUdaf.Accum> its) {int i = 0;System.out.println("method : merge" );System.out.println("accumulator : "+ accumulator.newDegree);System.out.println("accumulator : "+ accumulator.currentTime);for (SubtractionUdaf.Accum entry : its) {if(accumulator.currentTime < entry.currentTime){if(entry.num > 1){accumulator.currentTime = entry.currentTime;accumulator.oldDegree = entry.oldDegree;accumulator.newDegree = entry.newDegree;accumulator.num += entry.num;accumulator.listInput.addAll(entry.listInput);}else if(entry.num == 1){accumulator.currentTime = entry.currentTime;accumulator.oldDegree = accumulator.newDegree;accumulator.newDegree = entry.newDegree;accumulator.num ++;accumulator.listInput.addAll(entry.listInput);}}else{if(accumulator.num > 1){accumulator.num += entry.num;accumulator.listInput.addAll(entry.listInput);}else if(accumulator.num == 1){accumulator.oldDegree = entry.newDegree;accumulator.num += entry.num;accumulator.listInput.addAll(entry.listInput);}else if(accumulator.num == 0){accumulator.currentTime = entry.currentTime;accumulator.oldDegree = entry.oldDegree;accumulator.newDegree = entry.newDegree;accumulator.num = entry.num;accumulator.listInput.addAll(entry.listInput);}}Collections.sort(accumulator.listInput,this.comparator);System.out.println("merge : "+i);System.out.println("newDegree : "+entry.newDegree);System.out.println("oldDegree = "+entry.oldDegree);System.out.println("currentTime : "+entry.currentTime);}}

需要考虑merge的是否是比当前新的数据,需要不同的处理逻辑。
8、其他方面,考虑到需要对输入度数按照事件时间排序,在open方法中实例化了自定义的Comparator类,对accumulator数据结构中的inputList按事件时间的降序排序。

    public void open(FunctionContext context) throws Exception {//定义record的先后顺序,用于listInput的排序,时间越新的record在list中越前面this.comparator = new Comparator<Tuple2<Double, Long>>() {public int compare( Tuple2<Double, Long> o1, Tuple2<Double, Long> o2) {if (Long.valueOf(o1.f1) < Long.valueOf(o2.f1)) {return 1;} else if (Long.valueOf(o1.f1) > Long.valueOf(o2.f1)) {return -1;}else {return 0;}}};}

请参考[使用IntelliJ IDEA开发自定义函数]()完成UDAF编译、打包,并参考UDX概述完成资源的上传和引用。

四、SQL开发及测试结果

(一)over窗口

SQL代码如下,语法检查、上线、启动作业(选择当前启动位点)。并将表1数据上传至datahub。

CREATE FUNCTION OverWindowSubtractionUdaf as 'com.alibaba.blink.sql.udx.SubtractionUdaf';CREATE TABLE input_dh_e_mp_read_curve (`no`                  VARCHAR,data_date             VARCHAR,cons_id               VARCHAR,org_no                VARCHAR,r1                    DOUBLE,ts as TO_TIMESTAMP(concat(data_date,'000000'),'yyyyMMddHHmmss'),WATERMARK wk FOR ts as withOffset(ts, 2000)
) WITH (type = 'datahub',endPoint = 'http://dh-cn-shanghai.aliyun-inc.com',roleArn='acs:ram::XXX:role/aliyunstreamdefaultrole',project = 'jszc_datahub',topic = 'input_dh_e_mp_read_curve'
);
CREATE TABLE data_out(cons_id varchar,data_date varchar,subDegreeR1 DOUBLE
)with(type = 'print'
);INSERT into data_out
SELECTcons_id,last_value(data_date) OVER (PARTITION BY cons_id ORDER BY ts ROWS BETWEEN 1 preceding AND CURRENT ROW) as data_date,OverWindowSubtractionUdaf(r1,unix_timestamp(ts)) OVER (PARTITION BY cons_id ORDER BY ts ROWS BETWEEN 1 preceding AND CURRENT ROW) as data_date
FROM input_dh_e_mp_read_curve

由于使用了print connector,从对应的sink的taskmanager.out日志中可以查看到输出如下(已忽略其他debug日志):

task-1> (+)100000002,20190716,13.76
task-1> (+)100000002,20190717,0.35999999999999943
task-1> (+)100000002,20190718,2.4700000000000006

对比期望输出(表2),20190717和20190718两个窗口的数据均正确,表明业务逻辑正确,但此输出与期望输出有少许差异:
(1)20190716输出为13.76,这是因为第一个over窗口只有一条数据导致的,这种数据可以在业务层过滤掉;
(2)20190719的数据没有输出,这是因为我们设置了watermark,测试环境下20190719之后没有数据进来触发20190719对应的窗口的结束。

(二)hop窗口

SQL代码如下:语法检查、上线、启动作业(选择当前启动位点)。并将表1数据上传至datahub。

CREATE FUNCTION HopWindowSubtractionUdaf as 'com.alibaba.blink.sql.udx.SubtractionUdaf';CREATE TABLE input_dh_e_mp_read_curve (`no`                  VARCHAR,data_date             VARCHAR,cons_id               VARCHAR,org_no                VARCHAR,r1                    DOUBLE,ts as TO_TIMESTAMP(concat(data_date,'000000'),'yyyyMMddHHmmss'),WATERMARK wk FOR ts as withOffset(ts, 2000)
) WITH (type = 'datahub',endPoint = 'http://dh-cn-shanghai.aliyun-inc.com',roleArn='acs:ram::XXX:role/aliyunstreamdefaultrole',project = 'jszc_datahub',topic = 'input_dh_e_mp_read_curve'
);
CREATE TABLE data_out(cons_id varchar,data_date varchar,subDegreeR1 DOUBLE
)with(type = 'print'
);
INSERT into data_out
SELECTcons_id,DATE_FORMAT(HOP_end(ts, INTERVAL '1' day,INTERVAL '2' day), 'yyyyMMdd'),HopWindowSubtractionUdaf(r1,unix_timestamp(ts))
FROM input_dh_e_mp_read_curve
group by hop(ts, INTERVAL '1' day,INTERVAL '2' day),cons_id;

由于使用了print connector,从对应的sink的taskmanager.out日志中可以查看到输出如下(已忽略其他debug日志):

task-1> (+)100000002,20190716,13.76
task-1> (+)100000002,20190717,0.35999999999999943
task-1> (+)100000002,20190718,2.4700000000000006

对比期望输出(表2),20190717和20190718两个窗口的数据均正确,表明业务逻辑正确,但此输出与期望输出有少许差异:
(1)20190716输出为13.76,这是因为第一个hop窗口只有一条数据导致的,这种数据可以在业务层过滤掉;
(2)20190719的数据没有输出,这是因为我们设置了watermark,测试环境下20190719之后没有数据进来触发20190719对应的窗口的结束。

五、几点思考

1、关于UDAF内部方法的调用关系和顺序

UDAF中主要有createAccumulator、getValue、accumulate、retract和merge方法,其调用关系和顺序并不是完全确定,而是与Blink底层优化、Blink版本、开窗类型(如hop还是over窗口)等相关。
比较确定的是一次正常(没有failover)的作业,createAccumulator方法只在作业启动时调用一次,accumulate方法在每条数据输入时调用一次,在触发数据输出时会调用一次getValue(并不代表只调用一次)。
而retract方法和merge方法则跟具体的优化方式或开窗类型有关,本案例中over窗口调用retract方法而不调用merge方法,hop窗口调用merge方法而不调用retract方法。
大家可以增加日志,观察这几个方法的调用顺序,还是蛮有意思的。

2、如何知道需要实现UDAF中的哪些方法

UDAF中必须实现createAccumulator、getValue、accumulate方法,可选择实现retract和merge方法。
一般情况下,可先实现createAccumulator、getValue、accumulate三个方法,然后编写SQL后进行语法检查,SQL编译器会提示是否需要retract或merge方法。
比如,如果没有实现retract方法,在使用over窗口时,语法检查会报类似如下错误:

org.apache.flink.table.api.ValidationException: Function class 'com.alibaba.blink.sql.udx.SubtractionUdaf' does not implement at least one method named 'retract' which is public, not abstract and (in case of table functions) not static.

比如,如果没有实现merge方法,在使用over窗口时,语法检查会报类似如下错误:

org.apache.flink.table.api.ValidationException: Function class 'com.alibaba.blink.sql.udx.SubtractionUdaf' does not implement at least one method named 'merge' which is public, not abstract and (in case of table functions) not static.

3、本案例存在优化空间的地方

(1)本案例没有考虑数据缺失的问题,比如因为某种原因(网络问题、数据采集问题等)缺少20190717的数据。这种情况下会是什么样的结果?大家可以自行测试下;
(2)本案例使用了一个List,然后通过Collections.sort方法进行排序,这不是很优的方法,如果用优先级队列(priority queue)性能应该会更好;

原文链接
本文为云栖社区原创内容,未经允许不得转载。

使用Blink SQL+UDAF实现差值聚合计算相关推荐

  1. sql 差值_使用Blink CEP实现差值聚合计算

    简介: 本文介绍通过CEP实现实时流上的差值聚合计算. 使用Blink SQL+UDAF实现差值聚合计算介绍了如何使用Blink SQL+UDAF实现实时流上的差值聚合计算,后来在与@付典就业务需求和 ...

  2. 使用Blink CEP实现差值聚合计算

    使用Blink SQL+UDAF实现差值聚合计算介绍了如何使用Blink SQL+UDAF实现实时流上的差值聚合计算,后来在与@付典就业务需求和具体实现方式进行探讨时,付典提出通过CEP实现的思路和方 ...

  3. SQL SERVER 获取差值最小的数据

    分享一个小技巧,获取和自己差值最小的数据,测试数据如下: --测试数据 if not object_id(N'Tempdb..#T1') is nulldrop table #T1 Go Create ...

  4. mysql 取差值_MySQL计算相邻两行某列差值的方法

    简述 博主最近因工作任务缠身,都无暇顾及到我的这片自留地了.前段时间稍有空闲,花了较多的精力学习<啊哈算法>,从中学习到很多之前没有太注重的内容,收益颇丰.但是这些算法题目还没有看完,等后 ...

  5. mysql timestamp 差值_SQL计算timestamp的差值实例分享

    本文主要介绍了SQL计算timestamp的差值的方法的相关资料,需要的朋友可以参考下,希望能帮助到大家. SQL计算timestamp的差值的方法 概述 有时候我们需要按照时间找出某些记录,比如说: ...

  6. php 计算数组的差值,数组计算差值及项的小计,该如何处理

    本帖最后由 lazygc520 于 2014-04-14 16:19:41 编辑 $s = array ( 0 => array ( 0 => array ( 0 => '2014- ...

  7. php求两个数组的差值,数组计算差值及项的小计,该如何处理

    数组计算差值及项的小计 本帖最后由 lazygc520 于 2014-04-14 16:19:41 编辑 $s = array ( 0 => array ( 0 => array ( 0  ...

  8. JS-计算日期差值;计算日期之间的月数

    计算两天之间的日期差值 // 输入格式:yyyy-MM-DD function daysBetween(sDate1, sDate2) {//Date.parse() 解析一个日期时间字符串,并返回1 ...

  9. python计算日期间的差值,python 计算时间、日期差值类

    环境:win10+python3x V:1.0 简单实现了一下功能本地测试可用,记录下次接着二次开发用 import datetime import re class TimeDifferenceC( ...

最新文章

  1. FineReport中Domino数据库连接方法
  2. 注解RequestMapping中的URI路径最前面到底需不需要加斜线?
  3. AT COMMAND的命令集
  4. datagrip mysql乱码_DataGrip 2019.1.2 x64 连接MySQL出错解决
  5. 解决ubuntu无法修改分辨率为1920*1080问题
  6. 路由器上的usb接口有什么用_路由器的USB接口,非常强大的功能,教您轻轻松松玩转,太实用了...
  7. 【ffmpeg】基本使用方法总结
  8. php设置session 生命周期,php会话(session)生命周期概念介绍及设置更改和回收
  9. Selenium的一些技巧与错误处理
  10. java中使用tika_Tika基本使用
  11. jQuery给页面弹出层添加半透明背景
  12. 使用ROS提取udacity .bag文件中的压缩图片
  13. HTML+CSS简单的淘宝首页框架布局小练(三)
  14. HyperLedger-Fabric v0.6环境搭建详细教程
  15. TR1 tuple的用法
  16. 现在Web前端工程师年薪区间是多少?
  17. HTML学生个人网站作业设计:电影网站设计——叮当电影(5页) HTML+CSS+JavaScript 简单DIV布局个人介绍网页模板代码 DW学生个人网站制作成品下载
  18. python总结(全面讲解)
  19. python可以开发手机AAP吗?kivy说可以
  20. Laravel 安装多国语言包后,phpstorm 还是报错

热门文章

  1. bat 连续读取两行_Redis底层数据结构解析(BAT大厂必问)
  2. python中exec是什么意思_Python中的进程分支fork和exec详解
  3. dataframe 一列的不同值_python数据分析包|Pandas-02之缺失值(NA)处理
  4. python mysql 保存csv_使用Python将csv文件快速转存到Mysql
  5. linux openoffice centos,centos8 openoffice安装
  6. cdn需要备案吗_车子贴改色膜需要到车管所备案吗?
  7. 字节跳动测试开发4轮面试_字节跳动测试开发工程师一面总结
  8. 【LeetCode笔记】581. 最短无序连续子数组(Java、数组)
  9. 【LeetCode笔记】15.三数之和(JAVA、双指针)
  10. google 浏览器默认打开控制台_chrome浏览器使用 Console(控制台)