2019独角兽企业重金招聘Python工程师标准>>>

引语:

本篇博客主要介绍了Spark SQL中的filter过滤数据、去重、集合等基本操作,以及一些常用日期函数,随机函数,字符串操作等函数的使用,并列编写了示例代码,同时还给出了代码当中用到的一些数据,放在最文章最后。

SparkSQL简介

Spark SQL是Spark生态系统中非常重要的组件,其前身为Shark。Shark是Spark上的数据仓库,最初设计成与Hive兼容,但是该项目于2014年开始停止开发,转向Spark SQL。Spark SQL全面继承了Shark,并进行了优化。 Spark SQL增加了SchemaRDD(即带有Schema信息的RDD),使用户可以在Spark SQL中执行SQL语句,数据既可以来自RDD,也可以来自Hive、HDFS、Cassandra等外部数据源,还可以是JSON格式的数据。Spark SQL目前支持Scala、Java、Python三种语言,支持SQL-92规范。

Spark SQL的优点

Spark SQL可以很好地支持SQL查询,一方面,可以编写Spark应用程序使用SQL语句进行数据查询,另一方面,也可以使用标准的数据库连接器(比如JDBC或ODBC)连接Spark进行SQL查询 。

Spark SQL基本操作

去重

distinct:根据每条数据进行完整去重。

dropDuplicates:根据字段去重。

package spark2x
​
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
​
/*** 类名  DistinctDemo* 作者   彭三青* 创建时间  2018-11-29 15:02* 版本  1.0* 描述: $ 去重操作:distinct、drop*/
​
object DistinctDemo {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[2]").appName("Operations").getOrCreate()import spark.implicits._
​val employeeDF: DataFrame = spark.read.json("E://temp/person.json")val employeeDS: Dataset[Employee] = employeeDF.as[Employee]
​println("--------------------distinct---------------------")// 根据每条数据进行完整的去重employeeDS.distinct().show()
​println("--------------------dropDuplicates---------------------")// 根据字段进行去重employeeDS.dropDuplicates(Seq("name")).show()}
}
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)

过滤

filter():括号里的参数可以是过滤函数、函数返回的Boolean值(为true则保留,false则过滤掉)、列名或者表达式。

except:过滤出当前DataSet中有,但在另一个DataSet中不存在的。

intersect:获取两个DataSet的交集。

提示:except和intersect使用的时候必须要是相同的实例,如果把另外一个的Employee换成一个同样的字段的Person类就会报错。

package spark2x
​
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
​
/*** 类名  FilterDemo* 作者   彭三青* 创建时间  2018-11-29 15:09* 版本  1.0* 描述: $*/
​
object FilterDemo {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[2]").appName("FilterDemo").getOrCreate()import spark.implicits._
​val employeeDF: DataFrame = spark.read.json("E://temp/employee.json")val employeeDS: Dataset[Employee] = employeeDF.as[Employee]val employee2DF: DataFrame = spark.read.json("E://temp/employee2.json")val employee2DS: Dataset[Employee] = employee2DF.as[Employee]
​println("--------------------employee--------------------")employeeDS.show()
​println("--------------------employee2--------------------")employee2DS.show()
​println("       ┏┓   ┏┓\n" +"     ┏┛┻━━━┛┻┓\n" +"   ┃       ┃\n" +"   ┃   ━   ┃\n" +"   ┃ ┳┛ ┗┳ ┃\n" +"   ┃       ┃\n" +"   ┃   ┻   ┃\n" +"   ┃       ┃\n" +"   ┗━┓   ┏━┛\n" +"     ┃   ┃\n" +"      ┃   ┃\n" +"     ┃   ┗━━━┓\n" +"     ┃       ┣┓\n" +"     ┃       ┏┛\n" +"     ┗┓┓┏━┳┓┏┛\n" +"      ┃┫┫ ┃┫┫\n" +"      ┗┻┛ ┗┻┛\n")
​println("-------------------------------------------------")
​// 如果参数返回true,就保留该元素,否则就过滤掉employeeDS.filter(employee => employee.age == 35).show()employeeDS.filter(employee => employee.age > 30).show()// 获取当前的DataSet中有,但是在另外一个DataSet中没有的元素employeeDS.except(employee2DS).show()// 获取两个DataSet的交集employeeDS.intersect(employee2DS).show()
​spark.stop()}
}
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)

集合

collect_set:将一个分组内指定字段的值都收集到一起,不去重

collect_list:讲一个分组内指定字段的值都收集到一起,会去重

package spark2x
​
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
​
/*** 类名  CollectSetAndList* 作者   彭三青* 创建时间  2018-11-29 15:24* 版本  1.0* 描述: $ collect_list、 collect_set*/
​
object CollectSetAndList {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[2]").appName("FilterDemo").getOrCreate()import spark.implicits._import org.apache.spark.sql.functions._
​val employeeDF: DataFrame = spark.read.json("E://temp/employee.json")val employeeDS: Dataset[Employee] = employeeDF.as[Employee]
​// collect_list:将一个分组内指定字段的值都收集到一起,不去重// collect_set:同上,但唯一区别是会去重employeeDS.groupBy(employeeDS("depId")).agg(collect_set(employeeDS("name")), collect_list(employeeDS("name"))).show()}
}
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)

joinWith和sort

package spark2x
​
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
​
/*** 类名  JoinAndSort* 作者   彭三青* 创建时间  2018-11-29 15:19* 版本  1.0* 描述: $*/
​
object JoinAndSort {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[2]").appName("FilterDemo").getOrCreate()import spark.implicits._
​val employeeDF: DataFrame = spark.read.json("E://temp/employee.json")val employeeDS: Dataset[Employee] = employeeDF.as[Employee]val departmentDF: DataFrame = spark.read.json("E://temp/department.json")val departmentDS: Dataset[Department] = departmentDF.as[Department]
​println("----------------------employeeDS----------------------")employeeDS.show()println("----------------------departmentDS----------------------")departmentDS.show()println("------------------------------------------------------------")
​// 等值连接employeeDS.joinWith(departmentDS, $"depId" === $"id").show()// 按照年龄进行排序,并降序排列employeeDS.sort($"age".desc).show()}
}
case class Department(id: Long, name: String)
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)

函数的使用

日期函数:

current_time():获取当前日期。

current_timestamp():获取当前时间戳。

数学函数

rand():生成0~1之间的随机数

round(e: column,scale: Int ):column列名,scala精确到小数点的位数。

round(e: column):一个参数默认精确到小数点1位。

字符串函数

concat_ws(seq: String, exprs: column*):字符串拼接。参数seq传入的拼接的字符,column传入的需要拼接的字符,可以指定多个列,不同列之间用逗号隔开。

package spark2x
​
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
​
/*** 类名  FunctionsDemo* 作者   彭三青* 创建时间  2018-11-29 15:56* 版本  1.0* 描述: $*/
​
object FunctionsDemo {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[2]").appName("Operations").getOrCreate()import spark.implicits._import org.apache.spark.sql.functions._
​val employeeDF: DataFrame = spark.read.json("E://temp/employee.json")val employeeDS: Dataset[Employee] = employeeDF.as[Employee]
​employeeDS.select(employeeDS("name"), current_date(), current_timestamp(),rand(), round(employeeDS("salary"), 2),// 取随机数,concat(employeeDS("gender"), employeeDS("age")),concat_ws("|", employeeDS("gender"), employeeDS("age"))).show()
​spark.stop()}
}
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)

数据

employee.json

{"name": "Leo", "age": 25, "depId": 1, "gender": "male", "salary": 20000.123}
{"name": "Marry", "age": 30, "depId": 2, "gender": "female", "salary": 25000}
{"name": "Jack", "age": 35, "depId": 1, "gender": "male", "salary": 15000}
{"name": "Tom", "age": 42, "depId": 3, "gender": "male", "salary": 18000}
{"name": "Kattie", "age": 21, "depId": 3, "gender": "female", "salary": 21000}
{"name": "Jen", "age": 30, "depId": 2, "gender": "female", "salary": 28000}
{"name": "Jen", "age": 19, "depId": 2, "gender": "male", "salary": 8000}
{"name": "Tom", "age": 42, "depId": 3, "gender": "male", "salary": 18000}
{"name": "XiaoFang", "age": 18, "depId": 3, "gender": "female", "salary": 58000}

employee2.json

{"name": "Leo", "age": 25, "depId": 1, "gender": "male", "salary": 20000.123}
{"name": "Marry", "age": 30, "depId": 2, "gender": "female", "salary": 25000}
{"name": "Jack", "age": 35, "depId": 1, "gender": "male", "salary": 15000}
{"name": "Tom", "age": 42, "depId": 3, "gender": "male", "salary": 18000}
{"name": "Kattie", "age": 21, "depId": 3, "gender": "female", "salary": 21000}
{"name": "Jen", "age": 30, "depId": 2, "gender": "female", "salary": 28000}

department.json

{"id": 1, "name": "Technical Department"}
{"id": 2, "name": "Financial Department"}
{"id": 3, "name": "HR Department"}

转载于:https://my.oschina.net/u/3875806/blog/2964314

Spark SQL基本操作以及函数的使用相关推荐

  1. Spark SQL 内置函数(五)Aggregate Functions(基于 Spark 3.2.0)

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  2. Spark SQL 内置函数(一)Array Functions(基于 Spark 3.2.0)

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  3. 73、Spark SQL之开窗函数以及top3销售额统计案例实战

    开窗函数以及top3销售额统计案例实战 Spark 1.4.x版本以后,为Spark SQL和DataFrame引入了开窗函数,比如最经典,最常用的,row_number(),可以让我们实现分组取to ...

  4. 16 ,spark sql : 开窗函数 ,top3

    一 ,开窗函数 : 1 ,共几行 : sql : val sql = "select sid,sname,count(1) over() cnt from student" val ...

  5. spark sql自定义UDF函数-java语言

    背景说明 基于spark sql开发过程中,需要一些类似与官网提供的 int().from_json()等自定函数处理数据.下属将简单讲解通过java如何实现spark sql自定义函数 官方UDF接 ...

  6. Spark SQL操作之-函数汇总篇-下

    Spark SQL操作之-自定义函数篇-下 环境说明 自定义函数分类 用户自定义函数(UDF) 用户自定义聚合函数(UDAF) 环境说明 1. JDK 1.8 2. Spark 2.1 自定义函数分类 ...

  7. Spark sql之开窗函数

    目录 一.开窗函数的分类 二.聚合函数 三.排序函数 1. ROW_NUMBER 连续递增 2. RANK 跳跃排序 3. DENSE_RANK 连续排序 4. NTILE 分组排名 四.开窗函数之 ...

  8. 第70课:Spark SQL内置函数解密与实战 每天晚上20:00YY频道现场授课频道68917580

    每天晚上20:00YY频道现场授课频道68917580每天晚上20:00YY频道现场授课频道68917580 /* * *王家林老师授课http://weibo.com/ilovepains */ 源 ...

  9. spark SQL自定义函数:

    spark SQL 自定义函数: 自定义函数: 第一种:  U D F  (用户自定义函数)函数 特点:  一对一的关系,输入一个值以后输出一个值  (一进一出) 大部分的内置函数都是U D F函数 ...

最新文章

  1. 变步长龙格库塔法matlab代码,matlab 龙格库塔法 变步长龙格库塔法.doc
  2. AI强势来袭,锁上手机就真的安全了吗?
  3. 前端学习(3072):vue+element今日头条管理-删除文章失败(json-bigint)
  4. 3W+字的设计模式手册
  5. 丢失MySQL root 密码?
  6. 接入gitment为hexo添加评论功能
  7. 聊聊storm的CheckpointSpout
  8. [转]java String的经典问题(new String(), String)
  9. 每天一道面试题(2):实现strncpy
  10. Java菜鸟教程 递归算法与Scanner类
  11. 计算机图形学之阴影解读
  12. 机械电钢琴音源 Cinesamples Keyboard In Blue Kontakt
  13. Cause: org.apache.ibatis.type.TypeException: Could not resolve type alias ' star.facade.vipuser.vo.
  14. 解决搜狗输入法默认问题
  15. python文本错别字检测
  16. OpenGL 矩阵变换GLM库的使用
  17. Npm 安装提示 EUNSUPPORTEDPROTOCOL 错误
  18. What?校花居然半夜问我要“软件”
  19. VS Code编写HTML-CSS-JS等——代码格式化
  20. uniapp小程序运行正常,app运行报cid unmatched at view.umd.min.js

热门文章

  1. 人脸识别引擎SeetaFaceEngine中Identification模块使用的测试代码
  2. 【C++】C++11 STL算法(三):分隔操作(Partitioning operations)、排序操作(Sorting operations)
  3. mysql的字符串函数大全_MySQL的字符串函数大全
  4. a*算法matlab代码_导向滤波算法及其matlab代码实现
  5. 哆啦a梦简单图画python编程_[python]画哆啦A梦,Python,绘制
  6. Java项目:图书管理系统(java+swing+Gui+Mysql)
  7. linux进程下的线程数,Linux下查看进程线程数的方法
  8. 极小连通子图和极大连通子图_强连通分量与拓扑排序
  9. MySQL5.7的date类型_Mysql5.7 虚拟列数据类型为DATE时,如何存入数据?
  10. 【spring】使用spring的环境配置及从官网获得配置文件所用代码的方法