Table API是内嵌在Java语言中的,很多方法需要在类中额外添加,扩展功能比较麻烦,目前支持的函数比较少,故一般情况下我们使用Flink SQL中的函数

Flink SQL中的函数主要分为两类:

  • SQL内置函数,直接通过函数名调用,能够实现一些常用的转换操作,比如之前用到的COUNT()、CHAR_LENGTH()、UPPER等
  • 用户自定义函数UDF,需要在表环境中注册使用

    文章目录

    • 系统函数
      • 标量函数
      • 聚合函数
    • 自定义函数
      • 整体调用流程
      • 标量函数
      • 表函数
      • 聚合函数
      • 表聚合函数

系统函数

系统函数又叫内置函数,是在系统中预先实现好的功能模块,可以通过固定的函数名直接调用,实现想要的转换操作。

Flink SQL中的系统函数主要可分为两大类:标量函数(Scalar Functions)、聚合函数(Aggregate Functions)

标量函数

对输入数据做转换操作、返回一个值的函数。标量函数是最常见、最简单的一类系统函数,这里对一些常见类型的标量函数做个介绍

  • 比较函数:比较表达式,判断两个值之间的关系,返回一个布尔类型。

    (1)value1 = value2 判断两个值相等

    (2)value1 <> value2 判断两个值不相等

    (3)value IS NOT NULL 判断value不为空

  • 逻辑函数:逻辑表达式,用AND、OR、NOT将布尔类型值连接起来,也可以用判断语句进行真值判断,返回一个布尔类型的值

    (1)boolean1 OR boolean2 取逻辑或

    (2)boolean IS FALSE 判断boolean是否为false

    (3)NOT boolean 取逻辑非

  • 算术函数:算术计算的函数,用算术符号连接的运算,复杂的数学运算

    (1)a + b 两数相加

    (2)POWER(a,b) 幂运算,取a的b次方

    (3)RAND() 返回(0,1)区间内的一个double类型的伪随机数

  • 字符串函数:进行字符串处理的函数

    (1)string1 || string2 两个字符串的连接

    (2)UPPER(string) 将字符串string转为全部大写

    (3)CHAR_LENGTH(string) 计算字符串string的长度

  • 时间函数:与时间相关操作的函数

    (1)DATA string 按格式"yyyy-MM-dd" 解析字符串string,返回类型为SQL Date

    (2)TIMESTAMP string 按格式"yyyy-MM-dd HH:mm:ss[.SSS]"解析,返回类型为SQL timestamp

    (3)CURRENT_TIME 返回本地时区的当前时间,类型为SQL time

    (4)INTERVAL string range 返回一个时间间隔,string表示数据,range可以是DAY,MINUTE,DAT TO HOUR等单位,也可以是YEAR TO MONTH这样的复合单位。2年10月 -> INTERVAL ‘2-10’ YEAR TO MONTH

聚合函数

以表中多个行作为输入,提取字段进行聚合操作的函数,将唯一的聚合值作为结果返回。聚合函数应用广泛,不论分组聚合、窗口聚合、开窗聚合,对数据的聚合操作都可以用相同的函数来定义

标准SQL中常见的聚合函数Flink SQL都是支持的,目前也在不断扩展,为流处理应用提供更强大的功能,例如:

  • COUNT(*):返回所有行的数量,统计个数
  • SUM([ALL | DISTINCT] expression) 对某个字段进行求和操作,默认情况下省略关键字ALL,表示对所有行求和,若指定DISTINCT,则会对数据进行去重
  • RANK() 返回当前值在一组值中的排名
  • ROW_NUMBER() 对一组值排序后,返回当前值的行号,与RANK()功能相似

自定义函数

Flink 的 Table API和SQL提供了多种自定义函数的接口,以抽象类的形式定义,当前UDF主要有以下几类:

  • 标量函数:将输入的标量值转换成一个新的标量值
  • 表函数:将标量值转换成一个或多个新的行数据,也就是扩展成一个表
  • 聚合函数:将多行数据里的标量值转换成一个新的标量值
  • 表聚合函数:将多行数据里的标量值转换成一个或多个新的行数据
整体调用流程

要想在代码中使用自定义的函数,首先自定义对应的UDF抽象类的实现,并在表环境中注册这个函数,然后就可以在Table API和SQL中调用

(1)注册函数

注册函数时需要表环境的createTemporarySystemFunction()方法,传入注册的函数名以及UDF类的Class对象:

// 注册函数
tableEnv.createTemporarySystemFunction("MyFunction", MyFunction.class);

createTemporarySystemFunction()创建一个临时系统函数;也可以用createTemporaryFunction()方法,注册的函数依赖于当前当前目录和数据,此时就是目录函数

(2)Table API调用函数

在Table API中,使用call()方法来调用自定义函数:

tableEnv.from("MyTable").select(call("MyFunction", $("myField")));

call()传入两个参数,一个注册好的函数名MyFunction,另一个是函数调用本身的参数,需要传入的字段

此外,在Table API中也可以不注册函数,直接用内联(inline)的方式调用UDF

tableEnv.from("MyTable").select(call(SubstringFunction.class, $("myField")));

(3)在SQL中调用函数

将函数注册为系统函数之后,再SQL中的调用就与内置系统函数完全一样

tableEnv.sqlQuery("SELECT MyFunction(myField) FROM MyTable");
标量函数

自定义标量函数可以把0、1、多个标量值转换成一个标量值,它对应的输入是一行数据中的字段,输出则是唯一的值

继承抽象类ScalarFunction,并实现eval()的求值方法

例子:实现一个自定义的哈希函数,用来求传入对象的哈希值

public static class HashFunction extends ScalarFunction {// 接受任意类型输入,返回 INT 型输出public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {return o.hashCode();}
}
// 注册函数
tableEnv.createTemporarySystemFunction("HashFunction", HashFunction.class);
// 在 SQL 里调用注册好的函数
tableEnv.sqlQuery("SELECT HashFunction(myField) FROM MyTable");

DataTypeHint(inputGroup = InputGroup.ANY)对输入参数的类型做了标注,表示 eval 的 参数可以是任意类型

表函数

输入参数是0、1、多个标量值,返回任意多行数据。多行数据事实上构成了一个表,所以表函数可以认为是返回一个表的函数,这是一对多的转换关系

继承抽象类TableFunction,通过collect()方法发送想要输出的行数据

例:实现一个分隔字符串的函数SplitFunction,将一个字符串转换成(字符串,长度)的二元组

// 注意这里的类型标注,输出是 Row 类型,Row 中包含两个字段:word 和 length。
@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
public static class SplitFunction extends TableFunction<Row> {public void eval(String str) {for (String s : str.split(" ")) {// 使用 collect()方法发送一行数据collect(Row.of(s, s.length()));}}
}
// 注册函数
tableEnv.createTemporarySystemFunction("SplitFunction", SplitFunction.class);
// 在 SQL 里调用注册好的函数
// 1. 交叉联结
tableEnv.sqlQuery("SELECT myField, word, length " +"FROM MyTable, LATERAL TABLE(SplitFunction(myField))");
// 2. 带 ON TRUE 条件的左联结
tableEnv.sqlQuery("SELECT myField, word, length " +"FROM MyTable " +"LEFT JOIN LATERAL TABLE(SplitFunction(myField)) ON TRUE");
// 重命名侧向表中的字段
tableEnv.sqlQuery("SELECT myField, newWord, newLength " +"FROM MyTable " +"LEFT JOIN LATERAL TABLE(SplitFunction(myField)) AS T(newWord, newLength) ON TRUE");

将表函数的输出类型定义成ROW,得到侧向表中的数据类型,每行数据转换后只有一行。我们分别用交叉联结和左联结等方式在SQL中进行调用

聚合函数

把一行或多行数据聚合成一个标量值,多对一转换

继承抽象类AggregateFunction<T,ACC>,T表示聚合输出的结果类型,ACC则表示聚合的中间状态类型

  • 1、创建累加器(accumulator),用来存储聚合的中间结果。调用createAccumulator()方法可以创建一个空的累加器
  • 2、对于输入的每一行数据,都会调用accumulate()方法来更新累加器
  • 3、当所有数据处理完毕,调用getValue()计算返回最终的结果

例子:实现某个字段的加权平均值

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;public class UdfTest_AggregateFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//1.创建表String creatDDL = "CREATE TABLE clickTable (" +" `user` STRING, " +" url STRING, " +" ts BIGINT" +") WITH (" +" 'connector' = 'filesystem'," +" 'path' = 'input/clicks.txt'," +" 'format' = 'csv'" +")";tableEnv.executeSql(creatDDL);//2.注册自定义聚合函数tableEnv.createTemporarySystemFunction("WeightedAverage", WeightedAverage.class);//3.调用UDF进行查询转换Table resultTable = tableEnv.sqlQuery("select user,WeightedAverage(ts,1) as w_avg " +"from clickTable group by user");//4.转换成流打印输出tableEnv.toChangelogStream(resultTable).print();env.execute();}//单独定义一个累加器类型public static class WeightedAvgAccumulator {public long sum = 0;public int count = 0;}//实现自定义的聚合函数,计算加权平均值public static class WeightedAverage extends AggregateFunction<Long, WeightedAvgAccumulator> {@Overridepublic Long getValue(WeightedAvgAccumulator accumulator) {if (accumulator.count == 0) return null;else return accumulator.sum / accumulator.count;}@Overridepublic WeightedAvgAccumulator createAccumulator() {return new WeightedAvgAccumulator();}//累加计算方法public void accumulate(WeightedAvgAccumulator accmulator, Long iValue, Integer iWeight) {accmulator.sum += iValue * iWeight;accmulator.count += iWeight;}}
}
表聚合函数

把一行或多行数据聚合成另一张表,多对多的转换

继承抽象类TableAggregateFunction<T,ACC>,用一个ACC类型的累加器来存储聚合的中间结果

  • createAccumulator() :创建累加器的方法
  • accumulate():聚合计算的核心方法
  • emitValue:所有输入后处理完成,输出最终计算结果的方法,调用collect收集

例子:Top N查询,选取一组数据排序的前两名,每来一条新数据就在accumulate()方法中进行更新,最终在emitValue中调用将前两名数据输出

// 聚合累加器的类型定义,包含最大的第一和第二两个数据
public static class Top2Accumulator {public Integer first;public Integer second;
}
// 自定义表聚合函数,查询一组数中最大的两个,返回值为(数值,排名)的二元组
public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accumulator> {@Overridepublic Top2Accumulator createAccumulator() {Top2Accumulator acc = new Top2Accumulator();acc.first = Integer.MIN_VALUE; // 为方便比较,初始值给最小值acc.second = Integer.MIN_VALUE;return acc;}// 每来一个数据调用一次,判断是否更新累加器public void accumulate(Top2Accumulator acc, Integer value) {if (value > acc.first) {acc.second = acc.first;acc.first = value;} else if (value > acc.second) {acc.second = value;}}// 输出(数值,排名)的二元组,输出两行数据public void emitValue(Top2Accumulator acc, Collector<Tuple2<Integer, Integer>> out) {if (acc.first != Integer.MIN_VALUE) {out.collect(Tuple2.of(acc.first, 1));}if (acc.second != Integer.MIN_VALUE) {out.collect(Tuple2.of(acc.second, 2));}}

目前SQL中没有直接使用表聚合函数的方式,故需要使用Table API的方式来调用

// 注册表聚合函数函数
tableEnv.createTemporarySystemFunction("Top2", Top2.class);
// 在 Table API 中调用函数
tableEnv.from("MyTable").groupBy($("myField")).flatAggregate(call("Top2", $("value")).as("value", "rank")).select($("myField"), $("value"), $("rank"));

使用flatAggregate()方法,专业用来调用表聚合函数的接口。对MyTable中数据按myField字段进行分组聚合,统计value值最大的两个,并将聚合结果的两个字段重命名为value合rank,之后就可以使用select()将它们提取出来

Flink SQL中的函数相关推荐

  1. 简单介绍SQL中ISNULL函数使用方法

    这篇文章介绍了SQL Server.MySQL.Oracle三种数据库中ISNULL函数的使用方法,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧 SQL Ser ...

  2. SQL中object_id函数的用法

    SQL中object_id函数的用法 收藏  int object_id('objectname'); 此方法返回数据库对象标识号. 其中,参数objectname 表示要使用的对象,其数据类型为nc ...

  3. SQL中STR()函数功能

    SQL 2010-03-14 21:10:02 阅读1029 评论0   字号:大中小 订阅 SQL中STR()函数功能 declare @number smallint set @number=2 ...

  4. sql中聚合函数和分组函数_SQL选择计数聚合函数-语法示例解释

    sql中聚合函数和分组函数 The COUNT operator is usually used in combination with a GROUP BY clause. It is one of ...

  5. 【Flink】Flink 源码阅读笔记(18)- Flink SQL 中的流和动态表

    1.概述 转载:Flink 源码阅读笔记(18)- Flink SQL 中的流和动态表

  6. 在sql中使用函数,遇到net.sf.jsqlparser.parser.ParseException异常

    在sql中使用函数,遇到net.sf.jsqlparser.parser.ParseException异常 参考文章: (1)在sql中使用函数,遇到net.sf.jsqlparser.parser. ...

  7. SQL中 拆解函数 之 strsplit()

    SQL中 拆解函数 之 strsplit() 在前面提到过拆解函数 今天查数据,发现了另外一个函数,实验一下,还挺好用,记录一下 原始数据如下: 拆解完成以后如下图: 使用的函数是 strsplit( ...

  8. SQL中 ROW_NUMBER 函数的用法

    SQL中 ROW_NUMBER 函数的用法 ROW_NUMBER()函数将针对SELECT语句返回的每一行,从1开始编号,赋予其连续的编号.在查询时应用了一个排序标准后,只有通过编号才能够保证其顺序是 ...

  9. SQL中IF函数的使用

    SQL中IF函数的使用 if(a,b,c) if判断,如果a满足条件,返回b,否则返回c 举个例子 查询SC表中及格的学生 创建表SC SC(SId,CId,score) –SId 学生编号,CId ...

最新文章

  1. JForum 的 SSO集成
  2. lvs中dr模式配置脚本
  3. java得出两个日期之间所有日期
  4. java 打印对象属性 工具类_关于java实现任意对象输出字符串的工具类ObjectUtils用户打印日志、接口调试及监控等...
  5. 2012年度IT博客大赛50强报道:贾小平
  6. Django中使用ajax技术概述
  7. 投篮机投篮有技巧吗_「技巧干货」高手练习投篮的几个技巧,让投篮变得更实用...
  8. maven ssm框架 mysql_SSM框架(IDEA+Spring+SpringMVC+Maven+Mybatis+MySQL)
  9. 信息学奥赛C++语言: 密码翻译
  10. python的实例类方法、修饰器类方法、修饰器保护方法、修饰器静态方法中私有属性的区别和自定义property的读写方法
  11. php拆分excel,PHP_PHPExcel合并与拆分单元格的方法,本文实例讲述了PHPExcel合并与 - phpStudy...
  12. 说说微信聊天记录收费这件事
  13. Java设计模式------单例模式
  14. epoch,batch_size,iteration,batch_idx什么意思
  15. WARN: Establishing SSL connection without server‘s identity verification is not recommended 的解决方法
  16. iOS企业签名过程中APP频繁出现闪退是什么原因?
  17. 如果使用Vue3.0实现一个 Modal,你会怎么进行设计?
  18. 计算机和人脑在线阅读,人脑与电脑(原文)
  19. 配色三部曲-创建自己的调色板
  20. 基于PLC音乐喷泉控制系统设计音乐喷泉组态设计音乐喷泉

热门文章

  1. 解决tomcat 的端口被占用问题
  2. phalapi-进阶篇3(自动加载和拦截器)
  3. JavaScript提高:003:easy UI实现tab页面自适应问题
  4. kvm虚拟化学习笔记(二)之linux kvm虚拟机安装
  5. hdu 1856 并查集 求最大的子树含有元素的个数
  6. 【贪心+双指针】LeetCode 11. Container With Most Water
  7. 《编程之美》1.3一摞烙饼的排序
  8. tensorflow : 队列管理 FIFOQueue amp;amp; RandomShuffleQueue
  9. Spring Boot 整合 docker
  10. Django学习笔记---第一天