[spark] SparkSQL知识点全集整理
SparkSQL是一个用于处理结构化数据的spark组件,主要强调的是“结构化”,让开发者少写代码、降低开发成本、提升数据分析执行效率、shark是SparkSQl的前身。
目录
简介
DataFrame
对比RDD:
DataFrame常见创建方式:
SparkSQL读写数据
1、与RDD交互
2、读写本地文件
3、读写parquet
4、读写json
5、读写mysql
spark on hive
sparkSQL运行原理
通用SQL执行原理
SparkSQL运行架构
重要角色介绍
Antlr介绍
sparkSQL CLI
使用ThriftServer
UDF/UDAF函数
简介
作为Spark生态系统最活跃的组件之一,他能够利用Spark进行结构化的数据的存储和操作、结构化数据既可以来自外部数据源(hive、json、parquet等),也可以通过向RDD添加Schema得到。同时因为是spark生态系统组件,所以与其他组件无缝集成,与其他组件(spark straming、GraphX、MLib)可以在技术堆栈中对数据进行批处理、流处理和交互式查询等多种业务处理、下面开始介绍SparkSQL的核心及使用
DataFrame
在Spark中、DataFrame是一种以RDD为基础的分布式数据集、类似于传统数据库中的二维表格。不过比RDD多了个Schema元数据,意思就是DataFrame所表示的数据有列名和类型
对比RDD:
很明显左边RDD虽然知道有Person类型,但是并不知道内部属性与类型;而DataFrame有列名和类型,方便查询优化器优化
说明:
RDD与DataFrame都是分布式的 可以并行处理的 一个集合
DataFrame更像是一个二维表格,有列名和类型(Schema)
而对于DataFrame来说,它不仅可以知道里面的数据,而且它还可以知道里面的schema信息
因此能做的优化肯定也是更多的,举个例子:每一列的数据类型是一样的,因此可以采用更好的压缩,这样的话整个DF存储所占用的东西必然是比RDD要少很多的(这也是DF的优点)
想要优化的更好,所要暴露的信息就需要更多,这样系统才能更好大的进行优化
RDD的类型可以是Person,但是这个Person里面,我们是不知道它的Name,Age,Height的,因此相比DF而言更难进行优化
DataFrame常见创建方式:
a、原始方法createDataFrame
//获取SparkSessionval session: SparkSession = SparkSession.builder().appName("sparkSQL").master("local").getOrCreate()//SparkSession中的sparkContext读取本地数据获取RDDval persons: RDD[String] = session.sparkContext.textFile("person.txt")//数据格式:zhangsan 33val psnArr: RDD[Array[(String, Int)]] = persons.map(_.split(" ")).map(_.zipWithIndex)//转换后格式:(zhangsan,0),(33,1)//定义schema模型var columnDesc = Array[String]("name string", "age int")//获取类型var typeDesc = columnDesc.map(_.split(" ")(1))//自定义匹配类型返回列类型def getSchemaType(x: String) = {x match {case "string" => DataTypes.StringTypecase "int" => DataTypes.IntegerType}}//调用方法var rowRDD = psnArr.map(x => x.map(y => getSchemaType(y))).map((x: Array[Any]) => {Row.fromSeq(x)})val arr: Array[StructField] = columnDesc.map(_.split(" ")).map(x => StructField.apply(x(0), getSchemaType(x(1)), true))val schema: StructType = StructType(arr)//创建DataFrameval df: DataFrame = session.createDataFrame(rowRDD, schema)df.show()df.printSchema()
b、bean类型createDataFrame
class Person extends Serializable {@BeanPropertyvar name= ""@BeanPropertyvar age=0
}
....
val persons: RDD[String] = ss.sparkContext.textFile("person.txt")val value: RDD[(String, String)] = persons.map(_.split(" ")).map(t=>(t(0),t(1)))val beanRDD: RDD[Person] = value.map(p => {var psn = new Person()psn.setName(p._1)psn.setAge(p._2.toInt)psn})val PDF: DataFrame = ss.createDataFrame(beanRDD,classOf[Person])PDF.printSchema()PDF.show()
c、DataSet.toDF()
val ds: Dataset[String] = ss.read.textFile("person.txt")
val person: Dataset[(String, String)] = ds.map(_.split(" ")).map(arr=>(arr(0),arr(1)))
person.printSchema()
person.toDF("name","age").show()
创建DataFrame的方式还有很多,暂只简单介绍,注意版本我使用的spark2.3.1
SparkSQL读写数据
1、与RDD交互
前面创建DataFrame时已有代码,就是定义Schema的bean类型,转换RDD为DataFrame
val PDF: DataFrame = ss.createDataFrame(beanRDD,classOf[Person])
PDF.show()
val rdd: RDD[String] = PDF.toJSON.rdd//转为RDD
2、读写本地文件
val ds: Dataset[String] = ss.read.textFile("person.txt")
person.toDF("name","age").show()
3、读写parquet
println("***************数据存储为parquet********************")val parquet: DataFrame = ss.read.textFile("person.txt").map(_.split(" ")).map(p=>(p(0),p(1))).toDF("name","age")parquet.printSchema()parquet.write.mode(SaveMode.Overwrite).parquet("data/parquet")println("***************读取parquet数据********************")val readP: DataFrame = ss.read.parquet("data/parquet")readP.printSchema()readP.show()
4、读写json
println("***************存储数据为json********************")val sdf = ss.read.textFile("person.txt").map(_.split(" ")).map(p=>(p(0),p(1))).toDF("name","age")sdf.write.mode(SaveMode.Overwrite).json("data/json")println("***************读取json文件********************")val json = ss.read.json("data/person.json")
5、读写mysql
println("***************读取mysql数据********************")val url = "jdbc:mysql://node211:3306/spark"val property = new Properties()//设置连接属性property.setProperty("user","root")property.setProperty("password","manzi")property.setProperty("driver","com.mysql.jdbc.Driver")val empdf: DataFrame = ss.read.jdbc(url,"emp",property)val salarydf: DataFrame = ss.read.jdbc(url,"salary",property)empdf.createTempView("empView")//创建临时表salarydf.createTempView("salaryView")val resultdf: DataFrame = ss.sql("select empView.id,empView.name,empView.age,salaryView.salary from empView join salaryView on empView.id = salaryView.id")resultdf.printSchema()resultdf.show()println("***************存储数据到mysql********************")resultdf.write.mode(SaveMode.Overwrite).jdbc(url,"temp",property)
读写hive单说,不在这里列举
spark on hive
spark on hive只是一种模式、sparkSQL依赖hive语法解析器和使用hive存储,hive语法解析器生成语法树(Syntax Tree)后由SparkSQL接管、执行计划生成和优化都由Catalyst完成,下面是代码示例
val conf: SparkConf = new SparkConf()// conf.set("spark.sql.warehouse.dir","E:/data")conf.set("spark.sql.warehouse.dir", "hdfs://manzi/spark")//保证hive主机已开启metastore服务conf.set("hive.metastore.uris", "thrift://node212:9083")val spa: SparkSession = SparkSession.builder().config(conf).appName("hiveApp").master("local").enableHiveSupport()//支持hive.getOrCreate()import spa._sql("use spark")//使用名为spark的数据库//需要注意的是,如果表存在数据,hdfs目录下的数据也会被一起删除sql("drop table if exists emp")//删除表重新创建sql("create table emp(name string,age int) row format delimited fields terminated by ' ' ")sql("desc formatted emp").show(false)//查看表信息//load数据到empsql("load data inpath '/spark/emp.txt' into table emp")sql("drop table if exists salary")sql("create table salary(name string,salary float) row format delimited fields terminated by ' ' ")sql("desc formatted salary").show(false)sql("load data inpath '/spark/salary.txt' into table salary")val resultData: DataFrame = sql("select emp.name,emp.age,salary.salary from emp join salary on emp.name = salary.name")//存储查询数据到表resultData.write.saveAsTable("emp_salary")
sparkSQL运行原理
通用SQL执行原理
通用数据库中最基本的查询语句通常有Perjection、DataSource、Filter 三部分组成,对应SQL查询过程中的Result--> DataSource-->Operation
1、词法语法解析 parse 分辨出SQL语句关键词(select from where)、区分表达式、DataSource、Projection,判断SQL是否规范、并形成逻辑计划
2、绑定 Bind SQL与数据库的数据字典(列、表、视图)绑定,如果相关的DataSource和Projection都存在,表示SQL可执行
3、优化 Optimize 一般的数据库会提供几个执行计划、这些计划一般都有运行统计数据、数据库会在这些计划中选择一个最优计划
4、执行 Execute 执行最优计划,返回数据集
注:一般数据库运行都会缓存优化过的SQL,每次查询如果能在缓冲池命中缓存的SQL,直接返回优化后的结果
SparkSQL运行架构
SparkSQL对SQL的处理和关系型数据库采用了类似的方法,主要分为两大部:1、parse解析SQL为一个抽象语法书tree;2、Rule对tree进行绑定、优化,优化器是Catalyst(解析、绑定、优化、生成物理计划)
SparkSQL组成:
Core:处理数据的输入输出、从不同数据源获取数据返回DataFrame
Catalyst:处理SQL,解析、绑定、优化、物理计划
Hive:对hive数据的处理
Hive-ThriftServer:提供jdbc、cli接口
重要角色介绍
Tree
tree的具体操作是通过treeNode来实现的,tree是Catalyst执行计划表示的数据结构
SQL生成的语法树在内存维护、不会持久化磁盘、且树的修改都是以替换已有节点方式进行的
Tree --> Children:Seq(BaseType)方法,可以返回一系列孩子节点
遍历: 借助各个tree之间的关系、使用transformDown(默认前序遍历)、transformUp将rule应用到给定的树段并匹配节点实施变化的方法;也可以使用transformChildrenDown、transformChildrenUp对一个给定的节点进行操作、通过迭代将Rule应用到该节点以及子节点
Rule
Rule[TreeType<:TreeNode[]]是一个抽象类、子类需要复写apply(plan:TreeType)方法来制定处理逻辑
具体实现类是RuleExecutor,凡是需要处理执行计划树进行实施规则匹配和节点处理,都需要继承RuleExecutor[TreeType]抽象类
整个结构图如下
执行流程:
1、将SQL通过词法和语法解析生成未绑定的逻辑计划(包含Unresolved Relation、Unresolved Function和Unresolved Attribute)、然后再后续的步骤中使用不同的Rule应用到该逻辑计划上
2、Analyzer使用AnalysisRules配合元数据(如SessionCatalog或者HiveMetastore)完善未绑定的逻辑计划的属性而转换成绑定的逻辑计划
先实例化一个SimpleAnalyzer,然后遍历预先定义好的Batch、通过父类Rule Executor的执行方法运行Batch里面的Rules、每个Rule会对未绑定的逻辑计划进行处理、有些可以通过一次解析处理、有些需要多次迭代、迭代直到达到FixedPoint次数迭代或达到前后两次树结构没有变化才停止操作
3、Optimizer使用Optimization Rules、将绑定的逻辑计划进行合并、列裁剪、过滤器下推等优化工作后生成优化的逻辑计划
4、Planner使用Planning Strategies、对优化的逻辑计划进行转换(Transformer)生成可执行的物理计划、根据过去的性能统计数据、选择最佳的物理计划执行CostModel、最后可以执行的物理计划执行树、即得到SparkPlan
5、在真正执行物理计划执行之前、还要进行preparations规则处理,最后调用SparkPlan的execute方法计算RDD
Antlr介绍
Antlr(Another Tool for Language Recognition识别)、为JAVA、C++、C#等语言提供一个通用语法描述来自动构造语言的识别器(Recognizer)、编译器(Parser)、解释器(Translator)的框架
生成未绑定的逻辑计划分为两部分:
词法分析(Lexical Analysis):负责将符号(Token)分组成符号类(Token class or Token type)
解析(Parser):真正的解析、默认会构建出一颗语法树(Syntax Tree)
sparkSQL CLI
1、创建并配置hive-site.xml
<configuration><property><name>hive.metastore.warehouse.dir</name><value>/user/hive/warehouse</value></property><property><name>hive.metastore.uris</name><value>thrift://node212:9083</value></property></configuration>
2、启动hdfs和hive、hive --service metastore
3、启动spark集群和sparkSQL CLI
注:默认情况下、SparkSQL shuffle的时候是200个partition,使用下面命令修改:
set spark.sql.shuffle.partitions=20
[root@node211 sbin]# ./start-all.sh
[root@node211 bin]# ./spark-sql --master yarn> set spark.sql.shuffle.partitions=20;
使用ThriftServer
配置启动ThriftServer
1、创建并配置hive-site.xml到conf文件夹(同上)
2、启动spark和ThriftServer
[root@node211 sbin]# ./start-all.sh
[root@node211 sbin]# ./start-thriftserver.sh --master yarn
3、远程客户端连接
[root@node211 bin]# ./beeline
Beeline version 1.2.1.spark2 by Apache Hive
beeline> !connect jdbc:hive2://node211:10000
Connecting to jdbc:hive2://node211:10000
Enter username for jdbc:hive2://node211:10000: root
Enter password for jdbc:hive2://node211:10000: *****0: jdbc:hive2://node211:10000> show tables;
+-----------+------------+--------------+--+
| database | tableName | isTemporary |
+-----------+------------+--------------+--+
| default | manzi72 | false |
| default | person | false |
+-----------+------------+--------------+--+
2 rows selected (3.059 seconds)
UDF/UDAF函数
UDF:一进一出
val session: SparkSession = SparkSession.builder().config(conf).appName("udf").master("local").getOrCreate()
import session.implicits._
//建立测试数据
val names: RDD[String] = session.sparkContext.parallelize(List[String]("manzi","mayun","laowang","wangwu"))val dataFrame: DataFrame = names.toDF("name")
dataFrame.createTempView("temp")
//注册简单自定义UDF函数
session.udf.register("addPrefix",(name:String,prefix:String)=>{(prefix+"_"+name)})
session.sql("select addPrefix(name,'0422') as name from temp").show()+------------+
| name|
+------------+
| 0422_manzi|
| 0422_mayun|
|0422_laowang|
| 0422_wangwu|
+------------+
UDAF:多进一出 (联想Sum函数、count)、先看一下这个抽象类源码
abstract class UserDefinedAggregateFunction extends Serializable {//StructType代表的是该聚合函数输入参数的类型。例如,一个UDAF实现需要两个输入参数,类型分别是DoubleType和LongType//new StructType().add("doubleInput",DoubleType).add("LongType",LongType)//那么该udaf就只会识别,这种类型的输入的数据。def inputSchema: StructType//该StructType代表aggregation buffer的类型参数。//例如,一个udaf的buffer有两个值,类型分别是DoubleType和LongType,那么其格式将会如下:// new StructType().add("doubleInput", DoubleType).add("longInput", LongType)// 也只会适用于类型格式如上的数据def bufferSchema: StructType//dataTypeda代表该UDAF的返回值类型def dataType: DataType//如果该函数是确定性的,那么将会返回true,例如,给相同的输入,就会有相同的输出def deterministic: Boolean//初始化聚合buffer,例如,给聚合buffer以0值//在两个初始buffer调用聚合函数,其返回值应该是初始函数自身,例如//merge(initialBuffer,initialBuffer)应该等于initialBuffer。def initialize(buffer: MutableAggregationBuffer): Unit//利用输入去更新给定的聚合buffer,每个输入行都会调用一次该函数def update(buffer: MutableAggregationBuffer, input: Row): Unit//合并两个聚合buffer,并且将更新的buffer返回给buffer1//该函数在聚合并两个部分聚合数据集的时候调用def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit//计算该udaf在给定聚合buffer上的最终结果def evaluate(buffer: Row): Any//使用给定的Column作为输入参数,来为当前UDAF创建一个Column@scala.annotation.varargsdef apply(exprs: Column*): Column = {val aggregateExpression =AggregateExpression(ScalaUDAF(exprs.map(_.expr), this),Complete,isDistinct = false)Column(aggregateExpression)}//使用给定Column去重后的值作为参数来生成一个Column@scala.annotation.varargsdef distinct(exprs: Column*): Column = {val aggregateExpression =AggregateExpression(ScalaUDAF(exprs.map(_.expr), this),Complete,isDistinct = true)Column(aggregateExpression)}
}
自定义UDAF继承接口,求平均值
/*** 按姓名group求salary平均值*/
class MyUDAF extends UserDefinedAggregateFunction {//输入数据的类型def inputSchema: StructType = {DataTypes.createStructType(Array(DataTypes.createStructField("input", DoubleType, true)))}// 聚合操作时,所处理的数据的类型def bufferSchema: StructType = {StructType(Array(DataTypes.createStructField("avg", DoubleType, true),DataTypes.createStructField("count", LongType, true)))}// 为每个分组的数据执行初始化值def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0) = 0.0buffer(1) = 0l}// 每个组,有新的值进来的时候,进行分组对应的聚合值的计算def update(buffer: MutableAggregationBuffer, input: Row): Unit = {buffer(0) = buffer.getAs[Double](0) + input.getAs[Double](0)//总数buffer(1) = buffer.getAs[Long](1) + 1}// 最后merger的时候,在各个节点上的聚合值,要进行merge,也就是合并//buffer1是机器hadoop1上的结果 buffer2是机器Hadoop2上的结果def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {buffer1(0) = buffer1.getAs[Double](0) + buffer2.getAs[Double](0)buffer1(1) = buffer1.getAs[Long](1) + buffer2.getAs[Long](1)}// 最终函数返回值的类型def dataType: DataType = {DataTypes.DoubleType}//如果该函数是确定性的,那么将会返回true,例如,给相同的输入,就会有相同的输出def deterministic: Boolean = {true}// 最后返回一个最终的聚合值 要和dataType的类型一一对应def evaluate(buffer: Row): Any = {buffer.getDouble(0) / buffer.getLong(1)}}
调用输出
val df: DataFrame = List(("zhangsan",5521.6),("lisi",4497.2),("wangwu",1569.99),("zhangsan",2574.45),("lisi",3368.45)).toDF("name","salary")
df.createTempView("myTable")
session.udf.register("avgSalaryByName",new MyUDAF)
session.sql("select name,avgSalaryByName(salary) as salary from myTable group by name").show()+--------+--------+
| name| salary|
+--------+--------+
| wangwu| 1569.99|
|zhangsan|4048.025|
| lisi|3932.825|
+--------+--------+
开窗函数 rou_number() over
开窗函数格式:分组取topN
【 rou_number() over (partitin by 分组字段 order by 排序字段) 】
举个栗子:
原数据:
小米,手机,250
华为,手机,450
苹果,手机,1000
三星,手机,2500
小米Pro,笔记本,1500
苹果Pro,笔记本,2000
三星Pro,笔记本,4100
华为ProX,笔记本,200
华硕,笔记本,10000
苹果,平板电脑,100
三星,平板电脑,200
华为,平板电脑,300
中兴,平板电脑,400
需求分析 : 根据文本中的类别进行分组,取月销售额的前三名
object WindowFunc {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("windowfun")//本地测试设置shuffle分区数、不然很慢conf.set("spark.sql.shuffle.partitions","10")val session: SparkSession = SparkSession.builder().config(conf).master("local").config("hive.metastore.uris", "thrift://node213:9083").config("spark.sql.warehouse.dir", "/user/hive/warehouse/").enableHiveSupport().getOrCreate()//建表session.sql("drop table if exists sales")session.sql("create external table if not exists sales (brand string,product string,price Int) " +"row format delimited fields terminated by ',' " +"stored as textfile location 'hdfs://manzi/data'")//加载数据//外部表方便数据加载//session.sql("load data inpath '/data/data.txt' into table sales")session.sql("select * from sales").show()// 开窗函数格式:* 【 rou_number() over (partitin by XXX order by XXX) 】//窗口函数 row_number即为分组取topN//需求分析 : 根据文本中的类别进行分组,取月销售额的前三名:val result = session.sql("select brand,product,price from " +"( select brand,product,price,row_number() over (partition by product order by price desc) " +"as rank from sales) t where t.rank<=3")result.show()}}
[spark] SparkSQL知识点全集整理相关推荐
- 数据结构知识点总结整理
数据结构知识点总结整理 0.常考基础必知必会 A. 排序:排序有几种,各种排序的比较,哪些排序是稳定的,快排的算法: B. 查找:哈希查找.二叉树查找.折半查找的对比,哈希映射和哈希表的区别? C. ...
- 前端面试知识点目录整理
前端面试知识点目录整理 基本功考察 1.关于Html 1.html语义化标签的理解.结构化的理解:能否写出简洁的html结构:SEO优化. 2.h5中新增的属性,如自定义属性data.类名classN ...
- matplotlib一些常用知识点的整理,
本文作为学习过程中对matplotlib一些常用知识点的整理,方便查找. 强烈推荐ipython 无论你工作在什么项目上,IPython都是值得推荐的.利用ipython --pylab,可以进入Py ...
- PHP框架编写和应用知识点,php框架知识点的整理和补充
我们对于比较常见的php框架,已经基本上有所认识,不过一些比较冷门的框架也可以做一个了解,以便日后的特殊使用.本篇整理了4种php框架,在不同的使用方法上都独具特色,其中很多的框架不被大家熟知,下面我 ...
- spark集群搭建整理之解决亿级人群标签问题
spark集群搭建整理之解决亿级人群标签问题 参考文章: (1)spark集群搭建整理之解决亿级人群标签问题 (2)https://www.cnblogs.com/huangxincheng/p/91 ...
- 机器学习需要掌握的数学知识点---详细整理
机器学习需要掌握的数学知识点---详细整理 第一篇 线性代数篇 第一章 排队!排队!什么是向量 第二章 矩阵 第三章 距离 第二篇 概率偏 由暗恋引发的思考 机器学习中的概率研究 贝叶斯问题 第三篇 ...
- 【体系结构】有关Oracle SCN知识点的整理--补充内容
[体系结构]有关Oracle SCN知识点的整理--补充内容 小麦苗自己整理的内容参考:[体系结构]有关Oracle SCN知识点的整理 http://blog.itpub.net/26736162 ...
- 安卓开发入门教程!终于有人把安卓程序员必学知识点全整理出来了,附答案
如何提升学习? 提升学习很大程度上就是为了跳槽涨薪,当然运气不好的情况可能是被劝退重新找工作,无论是哪种情况,自主学习的能力是要有的,而且得有一个方向,得有重点. 对于Android移动开发来说,热修 ...
- 六年级上册计算机知识点总结,六年级数学上册知识点的整理
第1篇:六年级上册数学圆的知识点整理 一.认识圆 1.圆的定义:圆是由曲线围成的一种平面图形. 2.圆心:将一张圆形纸片对折两次,折痕相交于圆中心的一点,这一点叫做圆心. 一般用字母o表示.它到圆上任 ...
- 河海大学数据库知识点归纳整理
河海大学数据库知识点归纳整理 前言 该文档主要包含了对河海大学数据库这一门课程进行的知识点归纳,并且提供了ppt等其他复习资料. 河海大学许卓明老师数据库期末考点! 1 CH01 CH02 数据模型与 ...
最新文章
- 组策略 从入门到精通 (七) 组策略的继承
- VS Debug和Release版本的区别
- 【AC Saber】双指针
- python的for循环累加_在python中将6 for循环累计和矢量化
- winCVS 使用方法
- 线程同步以及yield()、wait()、Notify()、Notifyall()
- 二叉树的建立与三种遍历
- springMVCs下载
- eclipse下安装Tomcat
- 笨办法学 Python · 续 练习 44:使用 Python 的数据库 API
- 从绝望中寻找希望,人生必将辉煌
- 传奇开服架设之地图索引编辑器以及安装问题排查教程
- 如何在没有原工程的情况下,利用vivado将bit文件转化成bin文件
- 单片机学习方法总结资料分享
- java apdu读取社保卡_读取社保卡信息
- LANP+KEEPALIVED集群(一)
- 升级工作环境并支持C++17
- 苹果vs剪辑下载_Vlog教程 | 如何在手机剪辑app中添加自己的音乐?
- Your actions speak louder
- android系统是什么意思