1.本文会主要讲三种udf:

ScalarFunction

TableFunction

AggregateFunction

用户自定义函数是非常重要的一个特征,因为他极大地扩展了查询的表达能力。本文除了介绍这三种udf之外,最后会介绍一个redis作为交互数据源的udf案例。

2.注册用户自定义函数

在大多数场景下,用户自定义函数在使用之前是必须要注册的。对于Scala的Table API,udf是不需要注册的。

调用TableEnvironmentregisterFunction()方法来实现注册。Udf注册成功之后,会被插入TableEnvironmentfunction catalog,这样table API和sql就能解析他了。

1.1 Scalar Functions 标量函数

标量函数,是指返回一个值的函数。标量函数是实现将0,1,或者多个标量值转化为一个新值。

实现一个标量函数需要继承ScalarFunction,并且实现一个或者多个evaluation方法。标量函数的行为就是通过evaluation方法来实现的。evaluation方法必须定义为public,命名为evalevaluation方法的输入参数类型和返回值类型决定着标量函数的输入参数类型和返回值类型。evaluation方法也可以被重载实现多个eval。同时evaluation方法支持变参数,例如:eval(String... strs)

下面给出一个标量函数的例子。例子实现的是一个hashcode方法。

public class HashCode extends ScalarFunction {private int factor = 12;
public HashCode(int factor) {this.factor = factor;
}
public int eval(String s) {return s.hashCode() * factor;
}
}
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// register the function
tableEnv.registerFunction("hashCode", new HashCode(10));
// use the function in Java Table API
myTable.select("string, string.hashCode(), hashCode(string)");
// use the function in SQL APItableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");

默认情况下evaluation方法的返回值类型是由flink类型抽取工具决定。对于基础类型及简单的POJOS是足够的,但是更复杂的类型,自定义类型,组合类型,会报错。这种情况下,返回值类型的TypeInformation,需要手动指定,方法是重载ScalarFunction#getResultType()

下面给一个例子,通过复写ScalarFunction#getResultType(),将long型的返回值在代码生成的时候翻译成Types.TIMESTAMP

public static class TimestampModifier extends ScalarFunction {public long eval(long t) {return t % 1000;
}
public TypeInformation<?> getResultType(signature: Class<?>[]) {return Types.TIMESTAMP;
}
}

2.Table Functions 表函数

与标量函数相似之处是输入可以0,1,或者多个参数,但是不同之处可以输出任意数目的行数。返回的行也可以包含一个或者多个列。

为了自定义表函数,需要继承TableFunction,实现一个或者多个evaluation方法。表函数的行为定义在这些evaluation方法内部,函数名为eval并且必须是public。TableFunction可以重载多个eval方法。Evaluation方法的输入参数类型,决定着表函数的输入类型。Evaluation方法也支持变参,例如:eval(String... strs)。返回表的类型取决于TableFunction的基本类型。Evaluation方法使用collect(T)发射输出rows。

在Table API中,表函数在scala语言中使用方法如下:.join(Expression) 或者 .leftOuterJoin(Expression),在java语言中使用方法如下:.join(String) 或者.leftOuterJoin(String)

  1. Join操作算子会使用表函数(操作算子右边的表)产生的所有行进行(cross) join 外部表(操作算子左边的表)的每一行。
  2. leftOuterJoin操作算子会使用表函数(操作算子右边的表)产生的所有行进行(cross) join 外部表(操作算子左边的表)的每一行,并且在表函数返回一个空表的情况下会保留所有的outer rows。

在sql语法中稍微有点区别:

  1. cross join用法是LATERAL TABLE()。
  2. LEFT JOIN用法是在join条件中加入ON TRUE。

下面的例子讲的是如何使用表值函数。

/

/ The generic type "Tuple2<String, Integer>" determines the schema of the returned table as (String, Integer).public class Split extends TableFunction<Tuple2<String, Integer>> {private String separator = " ";public Split(String separator) {this.separator = separator;}public void eval(String str) {for (String s : str.split(separator)) {// use collect(...) to emit a rowcollect(new Tuple2<String, Integer>(s, s.length()));}}
}
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
Table myTable = ...         // table schema: [a: String]
// Register the function.
tableEnv.registerFunction("split", new Split("#"));
// Use the table function in the Java Table API. "as" specifies the field names of the table.
myTable.join("split(a) as (word, length)").select("a, word, length");myTable.leftOuterJoin("split(a) as (word, length)").select("a, word, length");// Use the table function in SQL with LATERAL and TABLE keywords.
// CROSS JOIN a table function (equivalent to "join" in Table API).
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");

需要注意的是PROJO类型不需要一个确定的字段顺序。意味着你不能使用as修改表函数返回的pojo的字段的名字。

默认情况下TableFunction返回值类型是由flink类型抽取工具决定。对于基础类型及简单的POJOS是足够的,但是更复杂的类型,自定义类型,组合类型,会报错。这种情况下,返回值类型的TypeInformation,需要手动指定,方法是重载TableFunction#getResultType()。

下面的例子,我们通过复写TableFunction#getResultType()方法使得表返回类型是RowTypeInfo(String, Integer)。

p

ublic class CustomTypeSplit extends TableFunction<Row> {public void eval(String str) {for (String s : str.split(" ")) {Row row = new Row(2);row.setField(0, s);row.setField(1, s.length);collect(row);}}@Overridepublic TypeInformation<Row> getResultType() {return Types.ROW(Types.STRING(), Types.INT());}
}

3.Aggregation Functions 聚合函数

用户自定义聚合函数聚合一张表(一行或者多行,一行有一个或者多个属性)为一个标量的值。
[图片上传失败…(image-f5e972-1542542047386)]
上图中是讲的一张饮料的表这个表有是那个字段五行数据,现在要做的是求出所有饮料的最高价。

聚合函数需要继承AggregateFunction。聚合函数工作方式如下:

  1. 首先,需要一个accumulator,这个是保存聚合中间结果的数据结构。调用AggregateFunction函数的createAccumulator()方法来创建一个空accumulator.

  2. 随后,每个输入行都会调用accumulate()方法来更新accumulator。一旦所有的行被处理了,getValue()方法就会被调用,计算和返回最终的结果。

对于每个AggregateFunction,下面三个方法都是比不可少的:

createAccumulator()

accumulate()

getValue()
flink的类型抽取机制不能识别复杂的数据类型,比如,数据类型不是基础类型或者简单的pojos类型。所以,类似于ScalarFunction 和TableFunction,AggregateFunction提供了方法去指定返回结果类型的TypeInformation,用的是AggregateFunction#getResultType()。Accumulator类型用的是AggregateFunction#getAccumulatorType()。

除了上面的方法,还有一些可选的方法。有些方法是让系统更加高效的执行查询,另外的一些在特定的场景下是必须的。例如,merge()方法在会话组窗口(session group window)上下文中是必须的。当一行数据是被视为跟两个回话窗口相关的时候,两个会话窗口的accumulators需要被join。

AggregateFunction的下面几个方法,根据使用场景的不同需要被实现:

retract():在bounded OVER窗口的聚合方法中是需要实现的。
merge():在很多batch 聚合和会话窗口聚合是必须的。
resetAccumulator(): 在大多数batch聚合是必须的。
AggregateFunction的所有方法都是需要被声明为public,而不是static。定义聚合函数需要实现org.apache.flink.table.functions.AggregateFunction同时需要实现一个或者多个accumulate方法。该方法可以被重载为不同的数据类型,并且支持变参。

为了计算加权平均值,累加器需要存储已累积的所有数据的加权和及计数。在栗子中定义一个WeightedAvgAccum类作为accumulator。尽管,retract(), merge(), 和resetAccumulator()方法在很多聚合类型是不需要的,这里也给出了栗子。

/**
* Accumulator for WeightedAvg.
*/
public static class WeightedAvgAccum {public long sum = 0;public int count = 0;
}
/**
* Weighted Average user-defined aggregate function.
*/
public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {@Overridepublic WeightedAvgAccum createAccumulator() {return new WeightedAvgAccum();}@Overridepublic Long getValue(WeightedAvgAccum acc) {if (acc.count == 0) {return null;} else {return acc.sum / acc.count;}}public void accumulate(WeightedAvgAccum acc, long iValue, int iWeight) {acc.sum += iValue * iWeight;acc.count += iWeight;}public void retract(WeightedAvgAccum acc, long iValue, int iWeight) {acc.sum -= iValue * iWeight;acc.count -= iWeight;}public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {Iterator<WeightedAvgAccum> iter = it.iterator();while (iter.hasNext()) {WeightedAvgAccum a = iter.next();acc.count += a.count;acc.sum += a.sum;}}public void resetAccumulator(WeightedAvgAccum acc) {acc.count = 0;acc.sum = 0L;}
}
// register function
StreamTableEnvironment tEnv = ...
tEnv.registerFunction("wAvg", new WeightedAvg());
// use functiontEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user");

4.udf的最佳实践经验

4.1 Table API和SQL

代码生成器内部会尽可能多的尝试使用原生值。用户定义的函数可能通过对象创建、强制转换(casting)和拆装箱((un)boxing)引入大量开销。因此,强烈推荐参数和返回值的类型定义为原生类型而不是他们包装类型(boxing class)。Types.DATE 和Types.TIME可以用int代替。Types.TIMESTAMP可以用long代替。

建议用户自定义函数使用java编写而不是scala编写,因为scala的类型可能会有不被flink类型抽取器兼容。

4.2 用Runtime集成UDFs

有时候udf需要获取全局runtime信息或者在进行实际工作之前做一些设置和清除工作,比如,打开数据库链接和关闭数据库链接。Udf提供了open()和close()方法,可以被复写,功能类似Dataset和DataStream API的RichFunction方法。

Open()方法是在evaluation方法调用前调用一次。Close()是在evaluation方法最后一次调用后调用。Open()方法提共一个FunctionContext,FunctionContext包含了udf执行环境的上下文,比如,metric group,分布式缓存文件,全局的job参数。

通过调用FunctionContext的相关方法,可以获取到相关的信息:

getMetricGroup()并行子任务的指标组;
getCachedFile(name)分布式缓存文件的本地副本;
getJobParameter(name, defaultValue)给定key全局job参数;
给出的例子就是通过FunctionContext在一个标量函数中获取全局job的参数。主要是实现获取redis的配置,然后简历redis链接,实现redis的交互的过程。

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
import redis.clients.jedis.Jedis;
public class HashCode extends ScalarFunction {private int factor = 12;Jedis jedis = null;public HashCode() {super();}@Overridepublic void open(FunctionContext context) throws Exception {super.open(context);String redisHost = context.getJobParameter("redis.host","localhost");int redisPort = Integer.valueOf(context.getJobParameter("redis.port","6379"));jedis = new Jedis(redisHost,redisPort);}@Overridepublic void close() throws Exception {super.close();jedis.close();}public HashCode(int factor) {this.factor = factor;}public int eval(int s) {s = s % 3;if(s == 2)return Integer.valueOf(jedis.get(String.valueOf(s)));elsereturn 0;}
}ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// set job parameter
Map<String,String> hashmap = new HashMap<>();hashmap.put("redis.host","localhost");hashmap.put("redis.port","6379");ParameterTool parameter = ParameterTool.fromMap(hashmap);exeEnv.getConfig().setGlobalJobParameters(parameter);
// register the function
tableEnv.registerFunction("hashCode", new HashCode());
// use the function in Java Table API
myTable.select("string, string.hashCode(), hashCode(string)");
// use the function in SQL
tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");

【Flink】Flink各种UDF简介相关推荐

  1. 凌波微步Flink——Flink的技术逻辑与编程步骤剖析

    转载请注明出处:http://blog.csdn.net/dongdong9223/article/details/95459606 本文出自[我是干勾鱼的博客] Ingredients: Java: ...

  2. 大数据计算引擎之Flink Flink CEP复杂事件编程

    原文地址:大数据计算引擎之Flink Flink CEP复杂事件编程 复杂事件编程(CEP)是一种基于流处理的技术,将系统数据看作不同类型的事件,通过分析事件之间的关系,建立不同的时事件系序列库,并利 ...

  3. 凌波微步Flink——Flink API中的一些基础概念

    转载请注明出处:http://blog.csdn.net/dongdong9223/article/details/95355619 本文出自[我是干勾鱼的博客] Ingredients: Java: ...

  4. Flink(五):watermark简介

    一.简介 我们基于特定时间段进行聚合时,可以引用不同的时间类型,Flink 最新版本提供了Event Time.Processing Time 两种时间类型.数据在Flink 流转时,有时因为网络.资 ...

  5. 95-847-040-源码-Netty-netty在Flink运行时的简介

    文章目录 1.视界 2.拓扑图 3.概述 4.NettyConnectionManager 5. NettyBufferPool 5.1 拓扑图 6. NettyClient 7.NettyServe ...

  6. 【Flink】Flink Flink 1.14 新特性预览

    1.概述 转载:Flink 1.14 新特性预览 简介: 一文了解 Flink 1.14 版本新特性及最新进展 本文由社区志愿者陈政羽整理,内容源自阿里巴巴技术专家宋辛童 (五藏) 在 8 月 7 日 ...

  7. 大数据框架对比:Hadoop、Storm、Samza、Spark和Flink——flink支持SQL,待看

    简介 大数据是收集.整理.处理大容量数据集,并从中获得见解所需的非传统战略和技术的总称.虽然处理数据所需的计算能力或存储容量早已超过一台计算机的上限,但这种计算类型的普遍性.规模,以及价值在最近几年才 ...

  8. [基础架构] [Flink] Flink/Flink-CDC的部署和配置

    简介 下载官方Flink依赖包 (笔者所用版本为1.13.6) 下载下面列出的依赖包,并将它们放到目录 flink-1.13.6/lib/ 下: 下载elasticsearch连接器flink-sql ...

  9. flink 不设置水印_从0到1学习Flink—— Flink parallelism 和 Slot 介绍

    前言 之所以写这个是因为前段时间自己的项目出现过这样的一个问题: 1Caused by: akka.pattern.AskTimeoutException: 2Ask timed out on [Ac ...

最新文章

  1. 联想服务器 重装系统u盘启动,联想_Lenovo BIOS Setup Utility 设置U盘启动教程
  2. pytorch CenterLoss
  3. php实现设计模式之 适配器模式
  4. 开发技巧: 简述iOS应用间的互相跳转
  5. Python 网络编程(Socket)
  6. 右键我的电脑,没有属性,解决方案
  7. angular 拼接html 事件无效
  8. c++11之std::move()
  9. K-SVD字典学习算法
  10. 2013 Multi-University Training Contest 2 Balls Rearrangement
  11. GDAL使用DEM数据计算山体阴影(Hillshade)
  12. 【python】80行代码实现压缩包密码破解软件,支持zip和rar
  13. 计算机音乐模式怎么设置,电脑开机时自启QQ音乐APP播放歌曲的功能在哪里设置...
  14. 各个小组对于“我爱淘”的评价
  15. 将image对象转成BufferedImage
  16. web.xml.jsf_JSF 2.0 Ajax世界中的GMaps4JSF
  17. 率土之滨服务器维修,率土之滨征服赛季合服与转服功能详解
  18. 山东科技大学第二届ACM校赛解题报告
  19. 计算图替代——一种DNN框架计算图优化方法
  20. CTR/推荐系统中多任务/多目标学习应用概述文章汇总

热门文章

  1. 来了!iPhone 12今晚天猫首销:12期分期免息,还送5G流量包
  2. 台积电对世界最大创新贡献是什么?总裁魏哲家这样说...
  3. 国际电信联盟:3GPP系标准成为唯一被认可的5G标准
  4. 拼多多回应“二次上市”:公司现金储备充裕 暂无任何计划
  5. 孙宇晨回顾区块链历程:不走热点走心
  6. 华为屏下前置摄像头专利曝光:消灭刘海
  7. 2999元!联想Z6 Pro开启预售:搭载骁龙855+后置高清四摄
  8. 外媒称苹果挖走特斯拉高管 可能将重启电动汽车开发
  9. 受“社保掌上通”APP影响 麦达数字遭深交所问询
  10. 拳王虚拟项目公社:流量如何截流?各类流量截流技巧分享