This article takes a look at the ScalarFunction of the Flink Table API.

Example

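import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class HashCode extends ScalarFunction {

    private int factor = 0;

    @Override
    public void open(FunctionContext context) throws Exception {
        // access "hashcode_factor" parameter
        // "12" would be the default value if parameter does not exist
        factor = Integer.parseInt(context.getJobParameter("hashcode_factor", "12"));
    }

    public int eval(String s) {
        return s.hashCode() * factor;
    }
}

// The statements below belong in the job's main method;
// myTable is assumed to be an existing Table registered as "MyTable".
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// set job parameter
Configuration conf = new Configuration();
conf.setString("hashcode_factor", "31");
env.getConfig().setGlobalJobParameters(conf);

// register the function
tableEnv.registerFunction("hashCode", new HashCode());

// use the function in Java Table API
myTable.select("string, string.hashCode(), hashCode(string)");

// use the function in SQL
tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");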
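  • HashCode extends ScalarFunction and defines an eval method; open reads the hashcode_factor job parameter once, and eval is then called for every row.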
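To make the open/eval split concrete without a Flink cluster, here is a minimal plain-Java model of the HashCode function above. It is a sketch under one stated assumption: a `Map<String, String>` stands in for the global job parameters that `FunctionContext.getJobParameter` reads, so `HashCodeSketch` is a hypothetical class, not a Flink API. It shows the default "12" being used when the parameter is missing and "31" when it is set.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the HashCode UDF (no Flink dependency).
// Assumption: the Map stands in for the global job parameters.
class HashCodeSketch {
    private int factor = 0;

    // Mirrors open(): read "hashcode_factor", falling back to "12".
    void open(Map<String, String> jobParameters) {
        factor = Integer.parseInt(jobParameters.getOrDefault("hashcode_factor", "12"));
    }

    // Mirrors eval(String): the per-row scalar mapping.
    int eval(String s) {
        return s.hashCode() * factor;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("hashcode_factor", "31");

        HashCodeSketch configured = new HashCodeSketch();
        configured.open(conf);
        System.out.println(configured.eval("a")); // 97 * 31 = 3007

        HashCodeSketch defaulted = new HashCodeSketch();
        defaulted.open(new HashMap<>());
        System.out.println(defaulted.eval("a")); // 97 * 12 = 1164
    }
}
```

Reading the parameter in open rather than in eval keeps the per-row path to a single multiplication.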

ScalarFunction

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/functions/ScalarFunction.scala

abstract class ScalarFunction extends UserDefinedFunction {

  /**
    * Creates a call to a [[ScalarFunction]] in Scala Table API.
    *
    * @param params actual parameters of function
    * @return [[Expression]] in form of a [[ScalarFunctionCall]]
    */
  final def apply(params: Expression*): Expression = {
    ScalarFunctionCall(this, params)
  }

  // ----------------------------------------------------------------------------------------------

  /**
    * Returns the result type of the evaluation method with a given signature.
    *
    * This method needs to be overridden in case Flink's type extraction facilities are not
    * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation
    * method. Flink's type extraction facilities can handle basic types or
    * simple POJOs but might be wrong for more complex, custom, or composite types.
    *
    * @param signature signature of the method the return type needs to be determined
    * @return [[TypeInformation]] of result type or null if Flink should determine the type
    */
  def getResultType(signature: Array[Class[_]]): TypeInformation[_] = null

  /**
    * Returns [[TypeInformation]] about the operands of the evaluation method with a given
    * signature.
    *
    * In order to perform operand type inference in SQL (especially when NULL is used) it might be
    * necessary to determine the parameter [[TypeInformation]] of an evaluation method.
    * By default Flink's type extraction facilities are used for this but might be wrong for
    * more complex, custom, or composite types.
    *
    * @param signature signature of the method the operand types need to be determined
    * @return [[TypeInformation]] of operand types
    */
  def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = {
    signature.map { c =>
      try {
        TypeExtractor.getForClass(c)
      } catch {
        case ite: InvalidTypesException =>
          throw new ValidationException(
            s"Parameter types of scalar function '${this.getClass.getCanonicalName}' cannot be " +
            s"automatically determined. Please provide type information manually.")
      }
    }
  }
}
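  • ScalarFunction extends UserDefinedFunction and defines the apply, getResultType and getParameterTypes methods. A ScalarFunction maps zero, one or more scalar values to a new scalar value; the mapping itself is implemented by one or more user-defined public eval methods. It is generally recommended to use primitive types for declared parameters and result types, for example int instead of DATE/TIME and long instead of TIMESTAMP.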
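Because eval is found by name and signature rather than through an abstract method, a function can offer several eval overloads. The sketch below is a simplified, reflection-based illustration of that lookup; `EvalResolver` and `Overloaded` are hypothetical names. In rough terms Flink settles on a matching overload when the call is validated and then generates code that calls it directly, and its real rules (varargs, type widening, SQL types) are richer than this.

```java
import java.lang.reflect.Method;

class EvalResolver {

    public static void main(String[] args) {
        Overloaded f = new Overloaded();
        System.out.println(call(f, "flink")); // 5
        System.out.println(call(f, 2, 40));   // 42
    }

    // Box primitive parameter types so that int.class accepts an Integer argument.
    static Class<?> box(Class<?> c) {
        if (c == int.class) return Integer.class;
        if (c == long.class) return Long.class;
        if (c == double.class) return Double.class;
        if (c == float.class) return Float.class;
        if (c == boolean.class) return Boolean.class;
        if (c == short.class) return Short.class;
        if (c == byte.class) return Byte.class;
        if (c == char.class) return Character.class;
        return c;
    }

    // Return the first public method named "eval" whose parameters accept argClasses.
    static Method findEval(Class<?> udfClass, Class<?>... argClasses) {
        for (Method m : udfClass.getMethods()) { // getMethods() lists public methods only
            if (!m.getName().equals("eval")) continue;
            Class<?>[] params = m.getParameterTypes();
            if (params.length != argClasses.length) continue;
            boolean matches = true;
            for (int i = 0; i < params.length && matches; i++) {
                matches = box(params[i]).isAssignableFrom(box(argClasses[i]));
            }
            if (matches) return m;
        }
        throw new IllegalArgumentException("No matching public eval method in " + udfClass.getName());
    }

    // Resolve the overload from the runtime argument classes (args must be non-null here), then invoke it.
    static Object call(Object udf, Object... args) {
        Class<?>[] argClasses = new Class<?>[args.length];
        for (int i = 0; i < args.length; i++) argClasses[i] = args[i].getClass();
        try {
            return findEval(udf.getClass(), argClasses).invoke(udf, args);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }
}

// Hypothetical UDF with two eval overloads (it does not extend Flink's ScalarFunction here).
class Overloaded {
    public int eval(String s) { return s.length(); }
    public int eval(int a, int b) { return a + b; }
}
```

Doing this lookup once, before execution, is what lets the per-row path avoid reflection entirely.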

CRowProcessRunner

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/runtime/CRowProcessRunner.scala

class CRowProcessRunner(
    name: String,
    code: String,
    @transient var returnType: TypeInformation[CRow])
  extends ProcessFunction[CRow, CRow]
  with ResultTypeQueryable[CRow]
  with Compiler[ProcessFunction[Row, Row]]
  with Logging {

  private var function: ProcessFunction[Row, Row] = _

  private var cRowWrapper: CRowWrappingCollector = _

  override def open(parameters: Configuration): Unit = {
    LOG.debug(s"Compiling ProcessFunction: $name \n\n Code:\n$code")
    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
    LOG.debug("Instantiating ProcessFunction.")
    function = clazz.newInstance()
    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
    FunctionUtils.openFunction(function, parameters)

    this.cRowWrapper = new CRowWrappingCollector()
  }

  override def processElement(
      in: CRow,
      ctx: ProcessFunction[CRow, CRow]#Context,
      out: Collector[CRow]): Unit = {

    cRowWrapper.out = out
    cRowWrapper.setChange(in.change)
    function.processElement(
      in.row,
      ctx.asInstanceOf[ProcessFunction[Row, Row]#Context],
      cRowWrapper)
  }

  override def getProducedType: TypeInformation[CRow] = returnType

  override def close(): Unit = {
    FunctionUtils.closeFunction(function)
  }
}
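  • CRowProcessRunner's processElement method calls function.processElement, which in turn calls the eval method of the user-defined ScalarFunction. Here, function is a ProcessFunction whose source code is passed in as the code constructor argument of CRowProcessRunner; that code is generated by DataStreamCalc when it creates the CRowProcessRunner in its translateToPlan method. In open, CRowProcessRunner compiles the code, instantiates and opens the function; in processElement, it passes only in.row inward and routes the output through a CRowWrappingCollector that carries over the change flag of the input CRow.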
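The wrapping step can be modelled in a few lines of plain Java. This is a sketch, not Flink code: `SimpleCollector`, `CRow`, `CRowWrapping` and `CRowRunnerSketch` are hypothetical stand-ins for Flink's Collector, CRow, CRowWrappingCollector and CRowProcessRunner, and the inner "generated" function is just a lambda. It shows the inner function working on plain rows while the change flag (true for accumulate, false for retract messages) is re-attached on the way out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

class CRowRunnerSketch {
    private final BiConsumer<Object[], SimpleCollector<Object[]>> function;
    private final CRowWrapping wrapper = new CRowWrapping();

    CRowRunnerSketch(BiConsumer<Object[], SimpleCollector<Object[]>> function) {
        this.function = function;
    }

    // Same steps as CRowProcessRunner.processElement: point the wrapper at the
    // downstream collector, copy the change flag, then hand only the row inward.
    void processElement(CRow in, SimpleCollector<CRow> out) {
        wrapper.out = out;
        wrapper.change = in.change;
        function.accept(in.row, wrapper);
    }

    public static void main(String[] args) {
        // Inner "generated" function: emits the length of field 0.
        CRowRunnerSketch runner = new CRowRunnerSketch(
            (row, collector) -> collector.collect(new Object[] {((String) row[0]).length()}));
        List<CRow> results = new ArrayList<>();
        runner.processElement(new CRow(new Object[] {"flink"}, true), results::add);
        runner.processElement(new CRow(new Object[] {"flink"}, false), results::add);
        for (CRow r : results) System.out.println(r.row[0] + " change=" + r.change);
        // prints "5 change=true" then "5 change=false"
    }
}

interface SimpleCollector<T> {
    void collect(T record);
}

// A row plus its change flag: true = accumulate message, false = retract message.
final class CRow {
    final Object[] row;
    final boolean change;

    CRow(Object[] row, boolean change) {
        this.row = row;
        this.change = change;
    }
}

// Wraps every row emitted by the inner function back into a CRow.
final class CRowWrapping implements SimpleCollector<Object[]> {
    SimpleCollector<CRow> out;
    boolean change;

    @Override
    public void collect(Object[] row) {
        out.collect(new CRow(row, change));
    }
}
```

Reusing one wrapper instance and only swapping its fields per record mirrors how CRowProcessRunner creates cRowWrapper once in open.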

ProcessFunction

flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/functions/ProcessFunction.java

@PublicEvolving
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {

    private static final long serialVersionUID = 1L;

    /**
     * Process one element from the input stream.
     *
     * <p>This function can output zero or more elements using the {@link Collector} parameter
     * and also update internal state or set timers using the {@link Context} parameter.
     *
     * @param value The input value.
     * @param ctx A {@link Context} that allows querying the timestamp of the element and getting
     *            a {@link TimerService} for registering timers and querying the time. The
     *            context is only valid during the invocation of this method, do not store it.
     * @param out The collector for returning result values.
     *
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
     *                   to fail and may trigger recovery.
     */
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

    /**
     * Called when a timer set using {@link TimerService} fires.
     *
     * @param timestamp The timestamp of the firing timer.
     * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,
     *            querying the {@link TimeDomain} of the firing timer and getting a
     *            {@link TimerService} for registering timers and querying the time.
     *            The context is only valid during the invocation of this method, do not store it.
     * @param out The collector for returning result values.
     *
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
     *                   to fail and may trigger recovery.
     */
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}

    /**
     * Information available in an invocation of {@link #processElement(Object, Context, Collector)}
     * or {@link #onTimer(long, OnTimerContext, Collector)}.
     */
    public abstract class Context {

        /**
         * Timestamp of the element currently being processed or timestamp of a firing timer.
         *
         * <p>This might be {@code null}, for example if the time characteristic of your program
         * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
         */
        public abstract Long timestamp();

        /**
         * A {@link TimerService} for querying time and registering timers.
         */
        public abstract TimerService timerService();

        /**
         * Emits a record to the side output identified by the {@link OutputTag}.
         *
         * @param outputTag the {@code OutputTag} that identifies the side output to emit to.
         * @param value The record to emit.
         */
        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }

    /**
     * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
     */
    public abstract class OnTimerContext extends Context {
        /**
         * The {@link TimeDomain} of the firing timer.
         */
        public abstract TimeDomain timeDomain();
    }
}
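  • ProcessFunction extends AbstractRichFunction and declares the abstract method processElement, which may emit zero or more elements per input through the Collector.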
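The key part of the processElement contract is "zero or more outputs per input". The following sketch isolates that contract in plain Java; `MiniProcessFunction`, `Tokenizer` and `ProcessFunctionSketch` are hypothetical names, and the stand-in deliberately drops Context, timers and side outputs, using a `java.util.function.Consumer` in place of Flink's Collector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ProcessFunctionSketch {
    public static void main(String[] args) {
        Tokenizer tokenizer = new Tokenizer();
        List<String> out = new ArrayList<>();
        tokenizer.processElement("flink table api", out::add); // three outputs
        tokenizer.processElement("   ", out::add);             // zero outputs
        System.out.println(out); // prints [flink, table, api]
    }
}

// Stripped-down stand-in for ProcessFunction: no Context, timers or side outputs.
abstract class MiniProcessFunction<I, O> {
    abstract void processElement(I value, Consumer<O> out);
}

// Emits one output per whitespace-separated token, so a blank line emits nothing.
class Tokenizer extends MiniProcessFunction<String, String> {
    @Override
    void processElement(String value, Consumer<String> out) {
        for (String token : value.trim().split("\\s+")) {
            if (!token.isEmpty()) {
                out.accept(token);
            }
        }
    }
}
```

Pushing results into a collector instead of returning a value is what lets one method cover filtering (zero outputs), mapping (one) and splitting (many).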

DataStreamCalc

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala

class DataStreamCalc(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    input: RelNode,
    inputSchema: RowSchema,
    schema: RowSchema,
    calcProgram: RexProgram,
    ruleDescription: String)
  extends Calc(cluster, traitSet, input, calcProgram)
  with CommonCalc
  with DataStreamRel {

  //......

  override def translateToPlan(
      tableEnv: StreamTableEnvironment,
      queryConfig: StreamQueryConfig): DataStream[CRow] = {

    val config = tableEnv.getConfig

    val inputDataStream =
      getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)

    // materialize time attributes in condition
    val condition = if (calcProgram.getCondition != null) {
      val materializedCondition = RelTimeIndicatorConverter.convertExpression(
        calcProgram.expandLocalRef(calcProgram.getCondition),
        inputSchema.relDataType,
        cluster.getRexBuilder)
      Some(materializedCondition)
    } else {
      None
    }

    // filter out time attributes
    val projection = calcProgram.getProjectList.asScala
      .map(calcProgram.expandLocalRef)

    val generator = new FunctionCodeGenerator(config, false, inputSchema.typeInfo)

    val genFunction = generateFunction(
      generator,
      ruleDescription,
      inputSchema,
      schema,
      projection,
      condition,
      config,
      classOf[ProcessFunction[CRow, CRow]])

    val inputParallelism = inputDataStream.getParallelism

    val processFunc = new CRowProcessRunner(
      genFunction.name,
      genFunction.code,
      CRowTypeInfo(schema.typeInfo))

    inputDataStream
      .process(processFunc)
      .name(calcOpName(calcProgram, getExpressionString))
      // keep parallelism to ensure order of accumulate and retract messages
      .setParallelism(inputParallelism)
  }
}
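  • DataStreamCalc's translateToPlan calls CommonCalc's generateFunction method to produce genFunction, then creates a CRowProcessRunner from genFunction.name, genFunction.code and CRowTypeInfo. The generated code extends ProcessFunction and implements processElement, which evaluates the optional condition and the projection, and in doing so calls the eval method of the user-defined ScalarFunction.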
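The real generated code is produced by CommonCalc.generateFunction and compiled at runtime, so it is not reproduced here. As a rough, hand-written approximation of its shape, the sketch below handles a hypothetical query `SELECT string, HASHCODE(string) FROM MyTable WHERE string <> ''`: the condition is checked first and may drop the row, then the projection is built, including a direct call to the UDF's eval. `GeneratedCalcSketch` and its nested `HashCode` stand-in are hypothetical names, and rows are modelled as `Object[]`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class GeneratedCalcSketch {
    // Stand-in for the user's ScalarFunction instance held by the generated class.
    static final class HashCode {
        private final int factor;

        HashCode(int factor) { this.factor = factor; }

        int eval(String s) { return s.hashCode() * factor; }
    }

    private final HashCode hashCode;

    GeneratedCalcSketch(HashCode hashCode) { this.hashCode = hashCode; }

    // Condition first (may drop the row), then the projection, which calls eval directly.
    void processElement(Object[] in, Consumer<Object[]> out) {
        String s = (String) in[0];
        if (s == null || s.isEmpty()) {
            return; // condition not satisfied: nothing is emitted
        }
        out.accept(new Object[] {s, hashCode.eval(s)});
    }

    public static void main(String[] args) {
        GeneratedCalcSketch calc = new GeneratedCalcSketch(new HashCode(31));
        List<Object[]> results = new ArrayList<>();
        calc.processElement(new Object[] {"a"}, results::add);
        calc.processElement(new Object[] {""}, results::add);
        for (Object[] r : results) System.out.println(r[0] + "," + r[1]); // prints a,3007
    }
}
```

Fusing filter and projection into one generated method is why a single Calc node, and therefore a single CRowProcessRunner, is enough for both the WHERE and SELECT parts.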

Summary

  • ScalarFunction extends UserDefinedFunction and defines the apply, getResultType and getParameterTypes methods. A ScalarFunction maps zero, one or more scalar values to a new scalar value, with the mapping implemented by user-defined public eval methods. Primitive types are generally recommended for declared parameters and result types, for example int instead of DATE/TIME and long instead of TIMESTAMP.
  • CRowProcessRunner's processElement calls function.processElement, which in turn calls the user-defined ScalarFunction's eval method. The function is a ProcessFunction whose code is passed to the CRowProcessRunner constructor; it is generated by DataStreamCalc when translateToPlan creates the CRowProcessRunner. ProcessFunction extends AbstractRichFunction and declares the abstract method processElement.
  • DataStreamCalc's translateToPlan calls CommonCalc's generateFunction to produce genFunction, then creates a CRowProcessRunner from genFunction.name, genFunction.code and CRowTypeInfo. The generated code extends ProcessFunction and implements processElement, which calls the user-defined ScalarFunction's eval method.

doc

  • Integrating UDFs with the Runtime
