课程大纲

课程内容

学习效果

掌握目标

SparkSQL简介

SparkSQL简介

了解

SparkSQL特点

SparkSQL编程

编程模型

掌握

API操作

掌握

SparkSQL函数

SparkSQL函数

掌握

SparkSQL运行架构

SparkSQL运行架构

掌握

一、SparkSQL简介

(一)SparkSQL简介

SparkSQL,顾名思义,就是Spark生态体系中的构建在SparkCore基础之上的一个基于SQL的计算模块。SparkSQL的前身不叫SparkSQL,而叫Shark,最开始的时候底层代码优化,sql的解析、执行引擎等等完全基于Hive,总之Shark的执行速度要比hive高出一个数量级,但是hive的发展制约了Shark,所以在15年中旬的时候,shark负责人,将shark项目结束掉,重新独立出来的一个项目,就是sparksql,不再依赖hive,做了独立的发展,逐渐的形成两条互相独立的业务:SparkSQL和Hive-On-Spark。在SparkSQL发展过程中,同时也吸收了Shark有些的特点:基于内存的列存储,动态字节码优化技术。

  • SparkSQL特点

Standard Connectivity:
    SparkSQL强大的功能的同时,为了方便一些BI组件的调用数据,也提供了支持JDBC/ODBC,使得对数据访问变得多元化,功能完整化,如下图1-4所示:

(三)总结

SparkSQL就是Spark生态体系中用于处理结构化数据的一个模块。结构化数据是什么?存储在关系型数据库中的数据,就是结构化数据;半结构化数据是什么?类似xml、json等的格式的数据被称之为半结构化数据;非结构化数据是什么?音频、视频、图片等为非结构化数据。

换句话说,SparkSQL处理的就是二维表数据。

二、SparkSQL编程入口和模型

(一)SparkSQL编程模型

1、编程模型简介

主要通过两种方式操作SparkSQL,一种就是SQL,另一种为DataFrame和Dataset。

SQL

SQL不用多说,就和Hive操作一样,但是需要清楚一点的是,SQL操作的是表,所以要想用SQL进行操作,就需要将SparkSQL对应的编程模型转化成为一张表才可以。

同时支持,通用sql和hivesql。

DSL(DataFrame&DataSet)

在支持SQL编程的同时,方便大家使用函数式编程的思想,类似sparkcore的编程模式,sparksql也支持DSL(Domain Specified Language,领域专用语言,或者特定领域语言),即通过DataFrame和Dataset来支持类似RDD的编程。

​DataFrame和Dataset是SparkSQL中的编程模型。DataFrame和Dataset我们都可以理解为是一张mysql中的二维表,表有什么?表头,表名,字段,字段类型。RDD其实说白了也是一张二维表,但是这张二维表相比较于DataFrame和Dataset却少了很多东西,比如表头,表名,字段,字段类型,只有数据。

Dataset是在spark1.6.2开始出现的api,DataFrame是1.3的时候出现的,早期的时候DataFrame叫SchemaRDD,SchemaRDD和SparkCore中的RDD相比较,就多了Schema,所谓约束信息,元数据信息。

​一般的,将RDD称之为Spark体系中的第一代编程模型;DataFrame比RDD多了一个Schema元数据信息,被称之为Spark体系中的第二代编程模型;Dataset吸收了RDD的优点(强类型推断和强大的函数式编程)和DataFrame中的优化(SQL优化引擎,内存列存储),成为Spark的最新一代的编程模型。

2、RDD V.S. DataFrame V.S. Dataset

(1)RDD

弹性分布式数据集,是Spark对数据进行的一种抽象,可以理解为Spark对数据的一种组织方式,更简单些说,RDD就是一种数据结构,里面包含了数据和操作数据的方法

从字面上就能看出的几个特点:

  • 弹性

数据可完全放内存或完全放磁盘,也可部分存放在内存,部分存放在磁盘,并可以自动切换

RDD出错后可自动重新计算(通过血缘自动容错)

可checkpoint(设置检查点,用于容错),可persist或cache(缓存)里面的数据是分片的(也叫分区,partition),分片的大小可自由设置和细粒度调整

  • 分布式:

RDD中的数据可存放在多个节点上

  • 数据集:

数据的集合,没啥好说的

相对于与DataFrame和Dataset,RDD是Spark最底层的抽象,目前是开发者用的最多的,但逐步会转向DataFrame和Dataset(当然,这是Spark的发展趋势)调整。

(2)DataFrame

DataFrame:理解了RDD,DataFrame就容易理解些,DataFrame的思想来源于Python的pandas库,RDD是一个数据集,DataFrame在RDD的基础上加了Schema(描述数据的信息,可以认为是元数据,DataFrame曾经就有个名字叫SchemaRDD)

假设RDD中的两行数据长这样,如图1-5所示。

从上面两个图可以看出,DataFrame比RDD多了一个表头信息(Schema),像一张表了,DataFrame还配套了新的操作数据的方法,DataFrame API(如df.select())和SQL(select id, name from xx_table where ...)。

有了DataFrame这个高一层的抽象后,我们处理数据更加简单了,甚至可以用SQL来处理数据了,对开发者来说,易用性有了很大的提升。

不仅如此,通过DataFrame API或SQL处理数据,会自动经过Spark 优化器(Catalyst)的优化,即使你写的程序或SQL不高效,也可以运行的很快。

(3)Dataset

相对于RDD,Dataset提供了强类型支持,也是在RDD的每行数据加了类型约束,下图1-7是官网对于dataset的表述。

使用Dataset API的程序,会经过Spark SQL的优化器进行优化(优化器叫什么还记得吗?)

目前仅支持Scala、Java API,尚未提供Python的API(所以一定要学习Scala),相比DataFrame,Dataset提供了编译时类型检查,对于分布式程序来讲,提交一次作业太费劲了(要编译、打包、上传、运行),到提交到集群运行时才发现错误,实在是不方便,这也是引入Dataset的一个重要原因。

(二)SparkSession

在SparkSQL中的编程模型,不再是SparkContext,但是创建需要依赖SparkContext。SparkSQL中的编程模型,在spark2.0以前的版本中为SQLContext和HiveContext,HiveContext是SQLContext的一个子类,提供Hive中特有的一些功能,比如row_number开窗函数等等,这是SQLContext所不具备的,在Spark2.0之后将这两个进行了合并——SparkSession。SparkSession的构建需要依赖SparkConf或者SparkContext。使用工厂构建器(Builder方式)模式创建SparkSession。

三、SparkSQL基本编程

(一)、SparkSQL编程初体验

  1. SparkSession的构建
val spark = SparkSession.builder().appName("SparkSQLOps").master("local[*]")//.enableHiveSupport()//支持hive的相关操作.getOrCreate()
object SparkSQLOps {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("SparkSQLOps").master("local[*]")
//                .enableHiveSupport()//支持hive的相关操作.getOrCreate()//加载数据val pdf:DataFrame = spark.read.json("file:///E:/data/spark/sql/people.json")//二维表结构pdf.printSchema()//数据内容 select * from tblpdf.show()//具体的查询 select name, age from tblpdf.select("name", "age").show()import spark.implicits._//导入sparksession中的隐式转换操作,增强sql的功能pdf.select($"name",$"age").show()//列的运算,给每个人的年龄+10 select name, age+10,height-1 from tblpdf.select($"name",$"height" - 1, new Column("age").+(10)).show()//起别名  select name, age+10 as age,height-1  as height from tblpdf.select($"name",($"height" - 1).as("height"), new Column("age").+(10).as("age")).show()//做聚合统计 统计不同年龄的人数 select age, count(1) counts from tbl group by agepdf.select($"age").groupBy($"age").count().show()//条件查询 获取年龄超过18的用户  select * from tbl where age > 18
//        pdf.select("name", "age", "height").where($"age".>(18)).show()pdf.select("name", "age", "height").where("age > 18").show()//sql
//        pdf.registerTempTable()//在spark2.0之后处于维护状态,使用createOrReplaceTempView/*从使用范围上说,分为global和非globalglobal是当前SparkApplication中可用,非global只在当前SparkSession中可用从创建的角度上说,分为createOrReplace和不ReplacecreateOrReplace会覆盖之前的数据create不Replace,如果视图存在,会报错*/pdf.createOrReplaceTempView("people")spark.sql("""|select| age,| count(1) as countz|from people|group by age""".stripMargin).showspark.stop()}
}

(二)、SparkSQL编程模型的操作

1、DataFrame的构建方式

在Spark SQL中SparkSession是创建DataFrames和执行SQL的入口,创建DataFrames有三种方式,一种是可以从一个存在的RDD进行转换,还可以从Hive Table进行查询返回,或者通过Spark的数据源进行创建。

从Spark数据源进行创建:

package chapter1import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}object Create_DataFrame {def main(args: Array[String]): Unit = {//创建程序入口val spark: SparkSession = SparkSession.builder().appName("createDF").master("local[*]").getOrCreate()//调用sparkContextval sc: SparkContext = spark.sparkContext//设置控制台日志输出级别sc.setLogLevel("WARN")//从数据源创建DataFrameval personDF: DataFrame = spark.read.json("examples/src/main/resources/people.json")//展示数据personDF.show()}
}

从RDD进行转换:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}object Create_DataFrame1 {def main(args: Array[String]): Unit = {//创建程序入口val spark: SparkSession = SparkSession.builder().appName("createDF").master("local[*]").getOrCreate()//调用sparkContextval sc: SparkContext = spark.sparkContext//设置控制台日志输出级别sc.setLogLevel("WARN")//导包import spark.implicits._//加载数据val file: RDD[String] = sc.textFile("E:\\offcn\\Spark\\SparkDay01\\资料\\data\\person.txt")//按照分隔符进行切分val spliFile: RDD[Array[String]] = file.map(line=>line.split(" "))//指定字段类型val personRDD: RDD[(Int, String, Int)] = spliFile.map(line=>(line(0).toInt,line(1),line(2).toInt))//调用toDF方法指定列名val personDF: DataFrame = personRDD.toDF("id","name","age")//展示数据personDF.show()//释放资源spark.stop()sc.stop()}
}

通过反射创建DataFrame:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}case class person(id:Int,name:String,age:Int)
object createDataFrame2 {def main(args: Array[String]): Unit = {//创建程序入口val spark: SparkSession = SparkSession.builder().appName("createDF").master("local[*]").getOrCreate()//调用sparkContextval sc: SparkContext = spark.sparkContext//设置控制台日志输出级别sc.setLogLevel("WARN")//导包import spark.implicits._//加载数据val file: RDD[String] = sc.textFile("E:\\offcn\\Spark\\SparkDay01\\资料\\data\\person.txt")//按照分隔符进行切分val spliFile: RDD[Array[String]] = file.map(line=>line.split(" "))//指定字段类型val personRDD: RDD[person] = spliFile.map(line=>person(line(0).toInt,line(1),line(2).toInt))//调用toDF方法指定列名val personDF: DataFrame = personRDD.toDF()//展示数据personDF.show()//释放资源spark.stop()sc.stop()}
}

动态编程

/*使用动态编程的方式构建DataFrameRow-->行,就代表了二维表中的一行记录,jdbc中的resultset,就是java中的一个对象*/
val row:RDD[Row] = spark.sparkContext.parallelize(List(Row(1, "李伟", 1, 180.0),Row(2, "汪松伟", 2, 179.0),Row(3, "常洪浩", 1, 183.0),Row(4, "麻宁娜", 0, 168.0)
))
//表对应的元数据信息
val schema = StructType(List(StructField("id", DataTypes.IntegerType, false),StructField("name", DataTypes.StringType, false),StructField("gender", DataTypes.IntegerType, false),StructField("height", DataTypes.DoubleType, false)
))
val df = spark.createDataFrame(row, schema)
df.printSchema()
df.show()

说明,这里学习三个新的类:

  • Row:代表的是二维表中的一行记录,或者就是一个Java对象
  • StructType:是该二维表的元数据信息,是StructField的集合
  • StructField:是该二维表中某一个字段/列的元数据信息(主要包括,列名,类型,是否可以为null)
  • 总结:

这两种方式,都是非常常用,但是动态编程更加的灵活,因为javabean的方式的话,提前要确定好数据格式类型,后期无法做改动。

2、Dataset的构建方式

//dataset的构建
object SparkSQLDatasetOps {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("SparkSQLDataset").master("local[*]").getOrCreate()//dataset的构val list = List(new Student(1, "王盛芃", 1, 19),new Student(2, "李金宝", 1, 49),new Student(3, "张海波", 1, 39),new Student(4, "张文悦", 0, 29))import spark.implicits._val ds = spark.createDataset[Student](list)ds.printSchema()ds.show()spark.stop()}
}
case class Student(id:Int, name:String, gender:Int, age:Int)

而抽取出对应的元数据信息,否则编译无法通过。

3.RDD和DataFrame以及DataSet的互相转换

RDD--->DataFrame

def beanRDD2DataFrame(spark:SparkSession): Unit = {val stuRDD:RDD[Student] = spark.sparkContext.parallelize(List(new Student(1, "王盛芃", 1, 19),new Student(2, "李金宝", 1, 49),new Student(3, "张海波", 1, 39),new Student(4, "张文悦", 0, 29)))val sdf =spark.createDataFrame(stuRDD, classOf[Student])sdf.printSchema()sdf.show()
}

RDD--->Dataset

Def rdd2Dataset(spark:SparkSession): Unit = {val stuRDD = spark.sparkContext.parallelize(List(Student(1, "王盛芃", 1, 19),Student(2, "李金宝", 1, 49),Student(3, "张海波", 1, 39),Student(4, "张文悦", 0, 29)))import spark.implicits._val ds:Dataset[Student] = spark.createDataset[Student](stuRDD)ds.show()
}
case class Student(id:Int, name:String, gender:Int, age:Int)

在RDD转换为DataFrame和Dataset的时候可以有更加简单的方式

import spark.implicits._
rdd.toDF()
rdd.toDS()

DataFrame--->RDD

val rdd:RDD[Row] = df.rdd
rdd.foreach(row => {val id = row.getInt(0)val name = row.getString(1)val gender = row.getInt(2)val height = row.getAs[Double]("height")println(s"id=${id},name=$name,gender=$gender,height=$height")
})

Dataset --->RDD

val stuDS: Dataset[Student] = list2Dataset(spark)
val stuRDD:RDD[Student] = stuDS.rdd
stuRDD.foreach(println)

Dataset--->DataFrame

val stuDS: Dataset[Student] = list2Dataset(spark)
//dataset --->dataframe
val df:DataFrame = stuDS.toDF()
df.show()

DataFrame--->Dataset

无法直接将DataFrame转化为Dataset,需要通过as方法添加泛型。

四、SparkSQLAPI

(一)、SparkSQL统一数据加载与落地

sparksql和外部数据集进行交互,使用统一的api入口。

1、数据加载

spark.read.format(数据文件格式).load(path)

这个方式有更加清晰的简写方式,比如要加载json格式的文件

spark.read.json(path)

默认加载的文件格式为parquet

def main(args: Array[String]): Unit = {//创建程序入口val spark: SparkSession = SparkSession.builder().appName("createDF").master("local[*]").getOrCreate()//调用sparkContextval sc: SparkContext = spark.sparkContext//设置控制台日志输出级别sc.setLogLevel("WARN")//导包import spark.implicits._
//第一种方式
//加载json文件
val personDF: DataFrame = spark.read.format("json").load("E:\\data\\people.json")
//加载parquet文件
val personDF1: DataFrame = spark.read.format("parquet").load("E:\\data\\people.parquet")
//加载csv文件,csv文件有些特殊,如果想要带上表头,必须调用option方法
val person2: DataFrame = spark.read.format("csv").option("header","true").load("E:\\data\\people.csv")
//加载数据库当中的表
val personDF3: DataFrame = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/bigdata").option("user", "root").option("password", "root").option("dbtable", "person").load()
//第二种方式
//加载json文件
val personDF4: DataFrame = spark.read.json("E:\\data\\people.json")
//加载parquet文件
val personDF5: DataFrame = spark.read.parquet("E:\\data\\people.parquet")
//加载csv文件,csv文件有些特殊,如果想要带上表头,必须调用option方法
val person6: DataFrame = spark.read.option("header","true").csv("E:\\data\\people.csv")
//加载数据库当中的表
val properties = new Properties()
properties.put("user", "root")
properties.put("password", "root")
val personDF7: DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/bigdata", "person", properties)

2、数据落地

SparkSQL对数据的落地保存使用api为:spark.write.save(),需要指定数据的落地格式,因为和read的默认格式一样,save的默认格式也是parquet,需要在write和save之间指定具体的格式format(format)

同样也有简写方式:spark.write.json/parquet等等

def main(args: Array[String]): Unit = {//创建sparksql程序入口val spark: SparkSession = SparkSession.builder().appName("demo").master("local[*]").getOrCreate()//调用sparkContextval sc: SparkContext = spark.sparkContext//设置日志级别sc.setLogLevel("WARN")//导包import spark.implicits._//加载文件val personDF: DataFrame = spark.read.json("E:\\data\\people.json")//第一种方式//保存为json文件personDF.write.format("json").save("E:\\data\\json")//保存为parquet文件personDF.write.format("parquet").save("E:\\data\\parquet")//保存为csv文件,想要带上表头,调用option方法personDF.write.format("csv").option("header","true").save("E:\\data\\csv")//保存为数据库当中的表personDF.write.format("jdbc").option("url", "jdbc:mysql://localhost:3306/bigdata").option("user", "root").option("password", "root").option("dbtable", "person").save()}
}
 //第二种方式
//保存为parque文件
personDF.write.parquet("E:\\data\\parquet")
//保存为csv文件
personDF.write.option("header", "true").csv("E:\\data\\csv")
//保存为json文件
personDF.write.format("json").save("E:\\data\\json")
//保存为数据库的表
val props = new Properties()
props.put("user","root")
props.put("password","root")
personDF.write.jdbc("jdbc:mysql://localhost:3306/bigdata","person",props)

3、文件保存选项

可以采用SaveMode执行存储操作,SaveMode定义了对数据的处理模式。需要注意的是,这些保存模式不使用任何锁定,不是原子操作。此外,当使用Overwrite方式执行时,在输出新数据之前原数据就已经被删除。SaveMode详细介绍如下表:

Scala/Java

Any Language

Meaning

SaveMode.ErrorIfExists(default)

"error"(default)

如果文件存在,则报错

SaveMode.Append

"append"

追加

SaveMode.Overwrite

"overwrite"

覆写

SaveMode.Ignore

"ignore"

数据存在,则忽略保存操作

Day69_SparkSQL(一)相关推荐

最新文章

  1. @bean注解和@component注解的区别_阿里面试题一:spring里面使用xml配置和注解配置区别...
  2. git stash的用法
  3. 计算机软件 教案,计算机软件系统教案
  4. 更轻易地实现 Jwt Token
  5. php getdefaultvalue,PHP ReflectionParameter getDefaultValueConstantName()用法及代码示例
  6. java线程wait()使一个线程一直运行,一直提供服务
  7. 一双木棋chess[九省联考2018]
  8. 荣耀总裁赵明:如果开机很快 也就没有必要给消费者看任何广告了
  9. 用for循环打印出大写字母的ASCII码对照表
  10. OA办公系统如何实现最佳界面效果
  11. 精简win服务器系统,My Server之管理Win Server 2012精简版
  12. SSD、Retinanet、RefineDet、CornerNet、ExtremeNet、CenterNet、FSAF、FCOS、FoveaBox相对于yolo的区别
  13. 韦恩图——你学会了吗?
  14. 张小娴说男人不如一条狗,现在连市场分析师也说男人不如一条狗
  15. 四川省内江市启动公共视频监控三期建设
  16. 数字化的下一个目标,就是产业链|数字思考者50人
  17. Java桌面程序打包全过程
  18. Android 获取当前系统语言和切换系统语言
  19. set,setenv和export
  20. 无法运行Hi3516CV-DEMB-uboot-DDR...-BUS_266M.xsl宏。可能是因为该宏在此工作簿中不可用,或者所有的宏都被禁用。

热门文章

  1. 回顾历史,见证精彩|PostgresConf.CN2019大会三大分论坛
  2. 微信小程序携带token请求
  3. c语言拆字程序2000h单元,《C51运算符》PPT课件.ppt
  4. OneNav简约PHP导航源码
  5. citespace教程
  6. oracle ogg重启mgr,OGG的常见运维任务指南
  7. 优化/提高modelsim的仿真速度
  8. 墨者学院-phpMyAdmin后台文件包含分析溯源
  9. 2022-05-18 牛客网每日选择题--前端
  10. 富友eERP打造服装企业电子商务快鱼时代