Spark SQL 理论学习:

简介

Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。

特点

1)易整合

2) 统一的数据访问方式

3)兼容Hive

4)标准的数据连接

SparkSQL可以看做是一个转换层,向下对接各种不同的结构化数据源,向上提供不同的数据访问方式。

在SparkSQL中Spark为我们提供了两个新的抽象,分别是DataFrame和DataSet

RDD (Spark1.0) —>Dataframe(Spark1.3) —>Dataset(Spark1.6)

同样的数据都给到这三个数据结构有相同的结果。不同是的他们的执行效率和执行方式。在后期的Spark版本中,DataSet会逐步取代RDD和DataFrame成为唯一的API接口。

RDD

RDD是一个懒执行的不可变的可以支持Lambda表达式的并行数据集合。简单,API设计友好。但它是一个JVM驻内存对象,受GC的限制和数据增加时Java序列化成本的升高。

1
val rdd = sc.textFile("file:///opt/module/spark/README.md")

1
rdd.collect

Dataframe

在Spark中DataFrame与RDD类似,也是一个分布式数据容器。但是DataFrame更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息(schema),与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。DataFrame API提供的是一套高层的关系操作,比函数式的RDD API要更加友好,门槛更低。与R和Pandas的DataFrame类似。

RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解Person类的内部结构。右侧的DataFrame却提供了详细的结构信息,使得Spark SQL可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。DataFrame多了数据的结构信息,即schema。RDD是分布式的Java对象的集合。DataFrame是分布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化,比如filter下推、裁剪等。

性能上比RDD要高,主要有两方面原因:

DataFrame是为数据提供了Schema的视图。可以把它当做数据库中的一张表来对待是同时DataFrame也是懒执行的。

定制化内存管理数据以二进制的方式存在于非堆内存,节省了大量空间之外,还摆脱了GC的限制。

优化的执行计划查询计划通过Spark catalyst optimiser进行优化.

举个例子

上图展示的人口数据分析的示例,构造了两个DataFrame,join之后做了filter操作。直接地执行这个执行计划,执行效率很差。join是代价较大的操作,如果能将filter下推到 join下方,先对DataFrame进行过滤,再join过滤后的较小的结果集,可以缩短执行时间。Spark SQL的查询优化器是这样做的:逻辑查询计划优化就是一个利用基于关系代数的等价变换,将高成本的操作替换为低成本操作的过程。

得到的优化执行计划在转换成物理执行计划的过程中,还可以根据具体的数据源的特性将过滤条件下推至数据源内。最右侧的物理执行计划中Filter之所以消失不见,就是因为溶入了用于执行最终的读取操作的表扫描节点内。

对于普通开发者而言:即便是经验并不丰富的程序员写出的次优的查询,也可以被尽量转换为高效的形式予以执行。但是由于在编译期缺少类型安全检查,导致运行时容易出错。

Dataset

1)是Dataframe API的一个扩展,是Spark最新的数据抽象

2)用户友好的API风格,既具有类型安全检查也具有Dataframe的查询优化特性。

3)Dataset支持编解码器,当需要访问非堆上的数据时可以避免反序列化整个对象,提高了效率。

4)样例类被用来在Dataset中定义数据的结构信息,样例类中每个属性的名称直接映射到DataSet中的字段名称。

5) Dataframe是Dataset的特列,DataFrame=Dataset[Row] ,所以可以通过as方法将Dataframe转换为Dataset。Row是一个类型,跟Car、Person这些的类型一样,所有的表结构信息我都用Row来表示。

6)DataSet是强类型的。比如可以有Dataset[Car],Dataset[Person].

7)DataFrame只是知道字段,但是不知道字段的类型,所以在执行这些操作的时候是没办法在编译的时候检查是否类型失败的,比如你可以对一个String进行减法操作,在执行的时候才报错,而DataSet不仅仅知道字段,而且知道字段类型,所以有更严格的错误检查。就跟JSON对象和类对象之间的类比。

数据准备

123
{"name":"Hadoop"}{"name":"Spark", "Year":2015}{"name":"Flink", "Year":2018}
123
case class  Bigdata(name:String,Year:Int) val ds = spark.sqlContext.read.json("file:///opt/module/spark/json/bigdata.json")ds.show()

RDD让我们能够决定怎么做,而DataFrame和DataSet让我们决定做什么,控制的粒度不一样。

三者共性

1)都是spark平台下的分布式弹性数据集。

2)三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action如foreach时,三者才会开始遍历运算,极端情况下,如果代码里面有创建、转换,但是后面没有在Action中使用对应的结果,在执行时会被直接跳过

3)都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action如foreach时,三者才会开始遍历运算,极端情况下,如果代码里面有创建、转换,但是后面没有在Action中使用对应的结果,在执行时会被直接跳过。

1234567
val rdd=spark.sparkContext.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))// map不运行// map不运行rdd.map{line=>  println("运行")  line._1}.collect

+

4)都会根据spark的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出

5)都有partition的概念

6)都有partition的概念

7)对DataFrame和Dataset进行操作许多操作都需要这个包进行支持 import spark.implicits._

8)DataFrame和Dataset均可使用模式匹配获取各个字段的值和类型

三者区别

1) RDD一般和spark mlib同时使用,但是不支持sparksql操作

2) DataFrame:与RDD和Dataset不同,DataFrame每一行的类型固定为Row,只有通过解析才能获取各个字段的值,每一列的值没法直接访问

123456
val testDF = spark.read.json("file:///opt/module/spark/json/bigdata.json")testDF.foreach{  line =>    val col1=line.getAs[String]("name")    val col2=line.getAs[Long]("Year")}

3) DataFrame与Dataset一般不与spark ml同时使用

4) DataFrame与Dataset均支持sparksql的操作,比如select,groupby之类,还能注册临时表/视窗,进行sql语句操作:

123
val testDF = spark.read.json("file:///opt/module/spark/json/bigdata.json")testDF.createOrReplaceTempView("tmp")spark.sql("select * from tmp").show(100,false)

5) DataFrame与Dataset支持一些特别方便的保存方式,比如保存成csv,可以带上表头,这样每一列的字段名一目了然

1234567891011
"","Sepal.Length","Sepal.Width","Petal.Length","Petal.Width","Species""1",5.1,3.5,1.4,0.2,"setosa""2",4.9,3,1.4,0.2,"setosa""3",4.7,3.2,1.3,0.2,"setosa""4",4.6,3.1,1.5,0.2,"setosa""5",5,3.6,1.4,0.2,"setosa""6",5.4,3.9,1.7,0.4,"setosa""7",4.6,3.4,1.4,0.3,"setosa""8",5,3.4,1.5,0.2,"setosa""9",4.4,2.9,1.4,0.2,"setosa""10",4.9,3.1,1.5,0.1,"setosa"
1234567
//读取val options = Map("header" -> "true", "delimiter" -> "\t", "path" -> "file:///opt/module/spark/csv/iris.csv")val datarDF= spark.read.options(options).format("csv").load()datarDF.show()//保存val saveoptions = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://datanode1:9000/test/saveToCSV")datarDF.write.format("csv").mode(org.apache.spark.sql.SaveMode.Overwrite).options(saveoptions).save()

三者转换

RDD => DataFrame

手动确定

123456
val peopleRDD = sc.textFile("/input/sparksql/people.txt")val name2AgeRDD = peopleRDD.map{x => val para = x.split(",");(para(0).trim, para(1).trim.toInt) }name2AgeRDD.collectimport spark.implicits._val df = name2AgeRDD.toDF("name","age")df.show

反射确定

利用case class

1234
val peopleRDD = sc.textFile("/input/sparksql/people.txt")class classPeople(name:String,age:Int)val df = peopleRDD.map{x => val para = x.split(",");People(para(0).trim, para(1).trim.toInt) }.toDSdf.show()

编程方式

1)准备Scheam

12
import org.apache.spark.sql.types._val schema = StructType( StructField("name",StringType)::StructField("age",org.apache.spark.sql.types.IntegerType)::Nil)

2)准备Data 【需要Row类型】

123
val peopleRDD = sc.textFile("/input/sparksql/people.txt")import org.apache.spark.sql._val data = peopleRDD.map{ x => val para = x.split(",");Row(para(0),para(1).trim.toInt)}

3)生成DataFrame

1
val dataFrame = spark.createDataFrame(data, schema)

DataFrame => RDD

直接DataFrame.rdd即可

1
dataFrame.rdd

RDD <==>DataSet

RDD -》 DataSet

1234
val peopleRDD = sc.textFile("/input/sparksql/people.txt")case class People(name:String, age:Int)  //case class 确定schemaval ds = peopleRDD.map{x => val para = x.split(",");People(para(0), para(1).trim.toInt)}.toDSds.show()

DataSet -》 RDD

12
val dsRDD = ds.rdddsRDD.collect

DataFrame <==> DataSet

DataSet => DataFrame

12
val ds2df = ds.totoDFds2df.show

DataFrame =>DataSet

123
case class People(name:String, age:Int)val df2ds = ds2df.as[People]df2ds.show

参考资料

关于SparkSQL原理深入的学习可以参考《图解Spark》

Spark之SparkSQL理论篇相关推荐

  1. Spark之SparkStreaming理论篇

    SparkStreaming的相关理论学习: 简介 Spark Streaming用于流式数据的处理.Spark Streaming有高吞吐量和容错能力强等特点.Spark Streaming支持的数 ...

  2. Spark之RDD理论篇

    Spark的基石RDD: RDD与MapReduce Spark的编程模型是弹性分布式数据集(Resilient Distributed Dataset,RDD),它是MapReduce的扩展和延申, ...

  3. 艾伟_转载:学习 ASP.NET MVC (第五回)理论篇

    本系列文章导航 学习 ASP.NET MVC (第一回)理论篇 学习 ASP.NET MVC (第二回)实战篇 学习 ASP.NET MVC (第三回)实战篇 学习 ASP.NET MVC (第四回) ...

  4. 一步步教你轻松学朴素贝叶斯模型算法理论篇1

    一步步教你轻松学朴素贝叶斯模型理论篇1 (白宁超2018年9月3日17:51:32) 导读:朴素贝叶斯模型是机器学习常用的模型算法之一,其在文本分类方面简单易行,且取得不错的分类效果.所以很受欢迎,对 ...

  5. RabbitMQ学习总结 第一篇:理论篇

    目录 RabbitMQ学习总结 第一篇:理论篇 RabbitMQ学习总结 第二篇:快速入门HelloWorld RabbitMQ学习总结 第三篇:工作队列Work Queue RabbitMQ学习总结 ...

  6. 解密回声消除技术之一(理论篇)

    http://hulong988.blog.51cto.com 解密回声消除技术之一(理论篇) 2009-06-11 22:24:58 标签:语音 职场 休闲 通讯 原创作品,允许转载,转载时请务必以 ...

  7. 【机器学习】Logistic Regression 的前世今生(理论篇)

    Logistic Regression 的前世今生(理论篇) 本博客仅为作者记录笔记之用,不免有很多细节不对之处. 还望各位看官能够见谅,欢迎批评指正. 博客虽水,然亦博主之苦劳也. 如需转载,请附上 ...

  8. php switch 函数,PHP丨PHP基础知识之条件语SWITCH判断「理论篇」

    Switch在一些计算机语言中是保留字,其作用大多情况下是进行判断选择.以PHP来说,switch(开关语句)常和case break default一起使用 典型结构 switch($control ...

  9. dma访问主存时_DMA导致Cache数据一致性问题的原因及其解决方式(理论篇)

    点击上方公众号名称关注,获得更多内容 ✎ 编 者 悟 语 唉!嫉妒之心真的能俘获它所接触到的任何目标啊--"欲加之罪,何患无辞"呢?"觉悟高的"更有" ...

最新文章

  1. Cassandra 2016/00
  2. Unity3D基础API之Vector3
  3. 论文笔记:Group Equivariant Convolutional Networks
  4. python idle撤回上一条命令_找回Python IDLE Shell里的历史命令(用上下键翻历史命令怎么不好用了呢?)...
  5. dotNET Core WebAPI 统一处理(返回值、参数验证、异常)
  6. php获取域名方法,PHP实现获取域名的方法小结
  7. 成功入职阿里P7后 一个技术老哥总结了这几句话
  8. 时尚达人必备的潮流壁纸桌面!
  9. springcache使用笔记003_注释驱动的 Spring cache 基本原理,注意和限制,@CacheEvict 的可靠性问题
  10. 机器学习基础 | 有监督学习篇
  11. Ubuntu 汉化及kate汉化和使用自带终端的解决方式
  12. #浪潮之巅#苹果公司和乔布斯神话----有感
  13. access mysql odbc驱动程序_access odbc驱动下载|
  14. java库的使用--Failsafe
  15. word毕设论文制作——封面(一)
  16. Android平台png转jpg的实现
  17. Mask R-CNN学习笔记
  18. php+laravel框架七牛云存储+图片审核+文字审核
  19. linux centos ubuntu 网络图标消失的解决办法
  20. 倍福scaling factor参数的配置

热门文章

  1. pb调用键盘钩子的例子_搞不动Vue3.0的源码,先做个API调用师也行(新人踩坑初试)...
  2. 如何修改php的网页文件,php如何修改php文件内容
  3. Delphi 生日提醒的方法《LceMeaning》
  4. 对上一篇博客问题的回应
  5. 使用c++为node.js扩展模块
  6. Linux下能访问Nginx,本地无法访问
  7. linux 引导管理器,linux系统引导管理器GRUB
  8. Java面向对象——基础3 其他关键字
  9. 动态规划——最长回文子串(Leetcode 5)
  10. 【重要通知】数据技术嘉年华改为线上举办!拥抱元宇宙,我们这么玩儿!