Spark SQL 中UDF的讲解
Spark SQL 中UDF的讲解
User Define Function, 用户自定义函数,简称UDF,存在与很多组件中。
在使用Sparksql的人都遇到了Sparksql所支持的函数太少了的难处,除了最基本的函数,Sparksql所能支撑的函数很少,肯定不能满足正常的项目使用,UDF可以解决问题。
SparkSQL中的UDF相当于是1进1出,UDAF相当于是多进一出,类似于聚合函数。
开窗函数一般分组取topn时常用。
package com.bynear.Scalaimport org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.{SparkConf, SparkContext}object UDF {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("SparkSQL_UDF").setMaster("local")val sc = new SparkContext(conf)val sqlContext = new SQLContext(sc)val names = Array("刘亦菲", "张柏芝", "冯提模","陈一发儿")val nameRDD = sc.parallelize(names, 5)val nameRowRDD = nameRDD.map(name => Row(name))val structType = StructType(Array(StructField("name", StringType, true)))val namesDF = sqlContext.createDataFrame(nameRowRDD, structType)namesDF.registerTempTable("names")sqlContext.udf.register("strLen", (str: String) => str.length)sqlContext.sql("select name,strLen(name) as length from names").show()sqlContext.sql("select name,strLen(name) as length from names").collect().foreach(println)} }
运行结果:
|name|length|
+----+------+
| 刘亦菲| 3|
| 张柏芝| 3|
| 冯提模| 3|
|陈一发儿| 4|
+----+------+
[张柏芝,3]
[冯提模,3]
[陈一发儿,4]
package com.bynear.spark_sql; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.api.java.UDF1; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import java.util.ArrayList; public class JavaUDF {public static void main(String[] args) {SparkConf conf = new SparkConf().setAppName("JavaUDF").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc.sc()); ArrayList<String> names = new ArrayList<String>(); names.add("刘亦菲"); names.add("张柏芝"); names.add("冯提模"); names.add("陈一发儿"); JavaRDD<String> nameRDD = sc.parallelize(names); JavaRDD<Row> nameRowRDD = nameRDD.map(new Function<String, Row>() {@Override public Row call(String line) throws Exception {return RowFactory.create(String.valueOf(line)); }}); /** * 使用动态编程方式,将RDD转换为Dataframe */ ArrayList<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField("name", DataTypes.StringType, true)); StructType structType = DataTypes.createStructType(fields); DataFrame nameDF = sqlContext.createDataFrame(nameRowRDD, structType); /** * 注册临时表 */ nameDF.registerTempTable("user"); /** * 根据UDF函数参数的个数来决定是实现哪一个UDF UDF1,UDF2, 表明包含几个参数传入 * UDF1<String, Integer> 表示 传入参数 String 输出参数为 Integer * call方法为 自定义的函数! * DataTypes.IntegerType 必须与输出参数的类型一致即 Integer */ sqlContext.udf().register("StrLen", new UDF1<String, Integer>() {@Override public Integer call(String s) throws Exception {return s.length(); }}, DataTypes.IntegerType); /** * select name ,StrLen(name) as length from user * 在临时表user中 查找name StrLen(name) == name的长度 * StrLen(name) as length 表示将获取到的name的长度 例如15 15作为一列 as length 列名为 length */ sqlContext.sql("select name ,StrLen(name) as length from user").show(); Row[] rows = sqlContext.sql("select name ,StrLen(name) as length from user").collect(); for (Row row : rows) {System.out.println(row); }sc.close(); } }
输出结果:同上!
Spark SQL 中UDF的讲解相关推荐
- Spark SQL中出现 CROSS JOIN 问题解决
Spark SQL中出现 CROSS JOIN 问题解决 参考文章: (1)Spark SQL中出现 CROSS JOIN 问题解决 (2)https://www.cnblogs.com/yjd_hy ...
- SQL中触发器实例讲解(转)
SQL中触发器实例讲解 定义: 何为触发器?在SQL Server里面也就是对某一个表的一定的操作,触发某种条件,从而执行的一段程序.触发器是一个特殊的存储过程. 常见的触发器有三种:分别应 ...
- Spark SQL中的DataFrame
在2014年7月1日的 Spark Summit 上,Databricks 宣布终止对 Shark 的开发,将重点放到 Spark SQL 上.在会议上,Databricks 表示,Shark 更多是 ...
- 解决数据倾斜一:RDD执行reduceByKey或则Spark SQL中使用group by语句导致的数据倾斜
一:概述 有的时候,我们可能会遇到大数据计算中一个最棘手的问题--数据倾斜,此时Spark作业的性能会比期望差很多.数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证Spark作业的 ...
- spark sql自定义UDF函数-java语言
背景说明 基于spark sql开发过程中,需要一些类似与官网提供的 int().from_json()等自定函数处理数据.下属将简单讲解通过java如何实现spark sql自定义函数 官方UDF接 ...
- Spark SQL中 RDD 转换到 DataFrame (方法二)
强调它与方法一的区别:当DataFrame的数据结构不能够被提前定义.例如:(1)记录结构已经被编码成字符串 (2) 结构在文本文件中,可能需要为不同场景分别设计属性等以上情况出现适用于以下方法.1. ...
- 如何查询spark版本_掌握Spark SQL中的查询执行
了解您的查询计划 自从Spark 2.x以来,由于SQL和声明性DataFrame API,在Spark中查询数据已成为一种奢侈. 仅使用几行高级代码就可以表达非常复杂的逻辑并执行复杂的转换. API ...
- spark sql中的窗口函数
2019独角兽企业重金招聘Python工程师标准>>> databricks博客给出的窗口函数概述 Spark SQL supports three kinds of window ...
- spark sql中的first函数在多个字段使用实例
1.建立hive表如下: CREATE EXTERNAL TABLE `newsapp.test_first`(`userkey` string, `publish_id` string, `data ...
最新文章
- J2EE学习中一些值得研究的开源项目
- 怎样进入服务器修改东西,进入服务器修改数据库
- 第二个Spring冲刺周期团队进展报告
- BAL数据集与BA优化
- python开发学习记录
- 四.激光SLAM框架学习之A-LOAM框架---项目工程代码介绍---2.scanRegistration.cpp--前端雷达处理和特征提取
- linux查看ip访问日志文件,linux分析apache日志获取最多访问的前10个IP
- Oh~Yeah,刘翔!
- 一款免费好用的英文润色软件(1Checker没错,这是免费的)
- gp3688 uhf2扩频_摩托罗拉GP3688_GP3188写频软件
- JeecgBoot商业版源码下载
- AArch64架构内存布局及线性地址转换
- WIN 10 FTP 不好用问题总结
- RTKLIB专题学习(七)---精密单点定位实现初识(三)
- 模电(二)半导体二极管
- Nginx 启动报错 directive is not allowed here in
- STM32汇编指令(一)WFI 和 WFE
- 大文件切片上传、视频切片上传转m3u8播放
- 2022年PMP最新报名流程来了! PMP考生看过来!
- 数据库 for update的作用
热门文章
- ubuntu 文件管理器死机
- Spring boot 启动过程
- 2020牛客暑期多校训练营(第六场)H.Harmony Pairs 数位dp
- 【NOI2019】弹跳【二维线段树】【dijkstra】
- Educational Codeforces Round 94 (Rated for Div. 2) D(思维)
- HDU1269 迷宫城堡(模板题)
- 「ROI 2017 Day 2」反物质(单调队列优化dp)
- jzoj3910-Idiot的间谍网络【倍增,dfs】
- 转圈游戏(luogu 1965)
- 纪中B组模拟赛总结(2020.2.1)