Spark SQL

  • 一、Spark SQL基础知识
    • 1、Spark SQL简介
      • (1)简单介绍
      • (2)Datasets & DataFrames
      • (3)Spark SQL架构
      • (4)Spark SQL的特点
  • 二、入门案例
    • 1、案例代码
    • 2、SparkSession
    • 3、代码分析
      • (1)创建spark Session
      • (2)创建Dataset
        • 样例类创建
        • Tuple创建
        • JSON创建
        • RDD创建
      • (3)创建Dataframe
        • Json创建
        • 样例类创建
        • Tuple创建
        • RDD创建
  • 三、SQL操作(常用的操作方法)
    • printSchema()
    • show()
    • Select()
    • SelectExpr()
    • withColumn()
    • withColumnRenamed()
    • Drop()
    • DropDuplicates()
    • OrderBy()| Sort()
    • GroupBy ()
    • Agg()
    • Limit()
    • Where()
    • Pivot() 【透视】
    • na()
    • over()
    • Join()
    • cube(多维度)
    • DataSet操作(typed)
  • 四、DataFrame纯SQL操作
    • 单行查询
    • 模糊查询
    • 排序查询
    • 分组查询
    • Limit
    • having 分组后过滤
    • Case … when 语句
    • Pivot(行转列)
    • Cube(多维度分组)
    • Join表连接查询
    • 子查询
    • 窗口函数
  • 五、Spark SQL的自定义函数
    • 单行函数
    • 多行函数
  • 六、Spark SQL的数据导入导出
    • JSON
    • Paquet
    • ORC
    • CSV
    • JDBC
    • DF转RDD

spark系列
Spark运行架构(一)
Spark SQL原理及常用方法详解(二)
Spark性能优化指南——基础篇(三)
Spark性能优化指南——高级篇(四)
spark与flink的区别(五)
Spark 为什么比 Hadoop 快(六)


一、Spark SQL基础知识

1、Spark SQL简介

(1)简单介绍


Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。

http://spark.apache.org/sql/

为什么要学习Spark SQL?我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所以Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!同时Spark SQL也支持从Hive中读取数据。

SparkSQL在RDD之上抽象出来Dataset/Dataframe
这两个类提供了类似RDD的功能,也就意味用户可以使用map、flatMap、filter等高阶算子,同时也通过了基于列的命名查询,也就是说Dataset/DataFrame提供了两套操作数据的API,这些API可以给Saprk引擎要提供更多信息,系统可以根据这些信息对计算实现一定的优化。
目前Spark SQL提供了两种交互方式: 1) SQL 脚本 2) Dataset API(strong-typed类型、untyped类型操作)

官方文档

(2)Datasets & DataFrames

Dataset是一个分布式数据集,Dataset是在spark-1.6提出新的API,该API构建在RDD(strong type,使用lambda表达式)之上同时可以借助于Spark SQL对执行引擎的优点,使得使用Dateset执行一些数据的转换比直接使用RDD算子功能和性能都有所提升。另外Python不支持DataSet API。

DataFrames 是Dataset的一种特殊情况。比如Dataset中可以存储任意对象类型的数据作为Dataset的元素。但是Dataframe的元素只有一种类型Row类型,这种基于Row查询和传统数据库中ResultSet操作极其相似。因为Row类型的数据表示Dataframe的一个元素,类似数据库中的一行,这些行中的元素可以通过下标或者column name访问。由于Dateset是API的兼容或者支持度上不是多么的好,但是Dataframe在API层面支持的Scala、Java、R、Python支持比较全面。

从上图可以看出,DataFrame多了数据的结构信息,即schema。RDD是分布式的 Java对象的集合。DataFrame是分布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化。

(3)Spark SQL架构


语言API
Spark与不同的语言和Spark SQL兼容。 它也是由这些语言支持的API(python,scala,java,HiveQL)。

模式RDD
Spark Core是使用称为RDD的特殊数据结构设计的。 通常,Spark SQL适用于模式,表和记录。 因此,我们可以使用Schema RDD作为临时表。 我们可以将此Schema RDD称为数据帧。

数据源
通常spark-core的数据源是文本文件,Avro文件等。但是,Spark SQL的数据源不同。 这些是Parquet文件,JSON文档,HIVE表和Cassandra数据库。

(4)Spark SQL的特点

1.容易整合(集成)
将sql查询与spark程序无缝混合,可以使用java、scala、python、R等语言的API操作。
2.统一的数据访问方式
以相同的方式连接到任何数据源。
3.兼容Hive
4.标准的数据连接
5.可扩展性
对于交互式查询和长查询使用相同的引擎。 Spark SQL利用RDD模型来支持中查询容错,使其能够扩展到大型作业。不要担心为历史数据使用不同的引擎。

二、入门案例

运用scala编写

1、案例代码

导入开发依赖

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.4.4</version>
</dependency>
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.4.4</version>
</dependency>

程序代码

import org.apache.spark.sql.SparkSession/*** 第一个spark sql的单词计数案例**/
object SparkSQLWordCount {def main(args: Array[String]): Unit = {// 1. sparkSession是spark sql应用入口,内部封装了sparkconf和sparkContextval spark = SparkSession.builder().appName("the first spark sql example").master("local[*]").getOrCreate()// 2. 创建Datasetval rdd = spark.sparkContext.makeRDD(List("Hello Hadoop", "Hello Scala")).flatMap(_.split(" ")).map((_, 1))// 3. rdd ==> Dataset// 4. 导入隐式转换import spark.implicits._//将RDD转换成DataFramesval dataset = rdd.toDS // (Hello 1) | (Hadoop 1)// 5. 对dataset进行sql处理dataset.where("_1 !='Scala'").groupBy("_1").sum("_2").withColumnRenamed("_1", "word").withColumnRenamed("sum(_2)", "num").show()/*+------+-------+|    _1|sum(_2)|+------+-------+| Hello|      2|| Scala|      1||Hadoop|      1|+------+-------+*///------------------------------------------------------------------------------val dataFrame = dataset.toDF()dataFrame.createOrReplaceTempView("t_word")dataFrame.sqlContext.sql("select _1 as word, count(_2) as num from t_word group by _1").show()/*+------+---+|  word|num|+------+---+| Hello|  2|| Scala|  1||Hadoop|  1|+------+---+*/// 6. 关闭spark sql应用spark.stop()}
}

2、SparkSession

Apache Spark 2.0引入了SparkSession,其为用户提供了一个统一的切入点来使用Spark的各项功能,并且允许用户通过它调用DataFrame和Dataset相关API来编写Spark程序。最重要的是,它减少了用户需要了解的一些概念,使得我们可以很容易地与Spark交互。

在2.0版本之前,与Spark交互之前必须先创建SparkConf和SparkContext。然而在Spark 2.0中,我们可以通过SparkSession来实现同样的功能,而不需要显式地创建SparkConf, SparkContext 以及 SQLContext,因为这些对象已经封装在SparkSession中。

3、代码分析

(1)创建spark Session

    val spark = SparkSession.builder()//创建SparkSession,给该进程起个名字.appName("the first spark sql example")//设定进程使用计算机核数,*代表当前所有的空隙cpu.master("local[*]")//有就获取,没有就创建.getOrCreate()

(2)创建Dataset

val rdd = spark.sparkContext.makeRDD(List("Hello Hadoop", "Hello Scala")).flatMap(_.split(" ")).map((_, 1))

可以自己创建要用到的数据,也可以通过其他方式来获取数据

样例类创建

// 通过样例类创建dataset
val userList = List(User(1, "zs", true), User(2, "ls", false), User(3, "ww", true))
import spark.implicits._
val dataset = userList.toDS // 通过样例类构建的dataset字段名是样例类属性名
dataset.show()//-----------------------------------------------
+---+----+-----+
| id|name|  sex|
+---+----+-----+
|  1|  zs| true|
|  2|  ls|false|
|  3|  ww| true|
+---+----+-----+

Tuple创建

// 通过元组创建dataset
val tupleList = List((1, "zs", true), (2, "ls", false), (3, "ww", true))
import spark.implicits._
val dataset = tupleList.toDS
dataset.show()
//-----------------------------------------------
+---+---+-----+
| _1| _2|   _3|
+---+---+-----+
|  1| zs| true|
|  2| ls|false|
|  3| ww| true|
+---+---+-----+

JSON创建

// 通过JSON创建
val dataset = spark.read.json("file:///G:\\IDEA_WorkSpace\\scala-workspace\\spark-day7\\src\\main\\resources").as("user")
//-----------------------------------------------
+---+----+-----+
| id|name|  sex|
+---+----+-----+
|  4| zs2| true|
|  5| ls2|false|
|  6| ww2| true|
|  1|  zs| true|
|  2|  ls|false|
|  3|  ww| true|
+---+----+-----+

RDD创建

// 通过RDD【元组集合】创建
val rdd = spark.sparkContext.makeRDD(List((1, "zs", true), (2, "ls", false), (3, "ww", true)))
import spark.implicits._
val dataset = rdd.toDS
dataset.show()// 通过RDD【样例类集合】创建
val rdd = spark.sparkContext.makeRDD(List(User(1, "zs", true), User(2, "ls", false), User(3, "ww", true)))
import spark.implicits._
val dataset = rdd.toDS
dataset.show()

(3)创建Dataframe

DataFrame是一个命名列的数据集,用户可以直接操作column 因此几乎所有Dataframe推荐操作都是 无类型操作 。用户也可以把一个Dataframe看做是 Dataset[Row] 类型的数据集。

Json创建

// 通过JSON创建
val df = spark.read.json("file:///G:\\IDEA_WorkSpace\\scala-workspace\\spark-day7\\src\\main\\resources")
df.printSchema()  // df结构
df.show()
//-------------------------------------------------------------------
root|-- id: long (nullable = true)|-- name: string (nullable = true)|-- sex: boolean (nullable = true)+---+----+-----+
| id|name|  sex|
+---+----+-----+
|  4| zs2| true|
|  5| ls2|false|
|  6| ww2| true|
|  1|  zs| true|
|  2|  ls|false|
|  3|  ww| true|
+---+----+-----+

样例类创建

val userDF=List(User2(1,"zs",true)).toDF()
userDF.show()
//--------------------------------------------------------------------
+---+----+----+
| id|name| sex|
+---+----+----+
|  1|  zs|true|
+---+----+----+

Tuple创建

var userDF=List((1,"zs",true)).toDF("id","name","sex")
userDF.show()
//--------------------------------------------------------------------
+---+----+----+
| id|name| sex|
+---+----+----+
| 1| zs|true|
+---+----+----+

RDD创建

// RDD【元组】转换创建
val rdd = spark.sparkContext.makeRDD(List((1,"zs"),(2,"ls")))
val df = rdd.toDF("id","name")
df.printSchema()
df.show()
//--------------------------------------------------------------------
root|-- id: integer (nullable = false)|-- name: string (nullable = true)+---+----+
| id|name|
+---+----+
|  1|  zs|
|  2|  ls|
+---+----+// RDD【样例类】转换创建
var userDF= spark.sparkContext.parallelize(List(User(1,"zs",true))).toDF("id","uname","sex")
userDF.show()
//--------------------------------------------------------------------
+---+-----+----+
| id|uname| sex|
+---+-----+----+
| 1 | zs  |true|
+---+-----+----+// 通过RDD[Row]创建DF
val rdd = spark.sparkContext.parallelize(List((1, "zs", true), (2, "ls", false))).map(t => Row(t._1, t._2, t._3))
val schema = new StructType().add("id", IntegerType).add("name", StringType).add("sex", BooleanType)
val df = spark.createDataFrame(rdd, schema)
df.show()
spark.stop()
//--------------------------------------------------------------------
+---+----+-----+
| id|name|  sex|
+---+----+-----+
|  1|  zs| true|
|  2|  ls|false|
+---+----+-----+// 通过JAVA Bean创建DF
val personList = List(new Person(1, "zs"), new Person(2, "ls"))
val rdd = spark.sparkContext.makeRDD(personList)
val df = spark.createDataFrame(rdd, classOf[Person])
df.show()
//--------------------------------------------------------------------
+---+----+
| id|name|
+---+----+
|  1|  zs|
|  2|  ls|
+---+----+// 通过RDD[case class]创建DF
case class ScalaUser(id:Int,name:String,sex:Boolean)
var userRDD:RDD[ScalaUser]=spark.sparkContext.makeRDD(List(ScalaUser(1,"zs",true))) var userDF=spark.createDataFrame(userRDD) userDF.show()
//--------------------------------------------------------------------
+---+----+----+
| id|name| sex|
+---+----+----+
| 1 | zs |true|
+---+----+----+// 通过RDD[Tuple] 创建DF
var userRDD:RDD[(Int,String,Boolean)]=spark.sparkContext.makeRDD(List((1,"zs",true)))
var userDF=spark.createDataFrame(userRDD)
userDF.show()
//--------------------------------------------------------------------
+---+---+----+
| _1| _2| _3|
+---+---+----+
| 1| zs|true|
+---+---+----+

三、SQL操作(常用的操作方法)

DataFrame操作(untyped)

printSchema()

object DataframeOperationTest {def main(args: Array[String]): Unit = {val sparkSql = SparkSession.builder().appName("df operation").master("local[*]").getOrCreate()import sparkSql.implicits._val rdd = sparkSql.sparkContext.makeRDD(List((1,"zs",1000.0,true),(2,"ls",2000.0,false),(3,"ww",3000.0,false)))val df = rdd.toDF("id","name","salary","sex")// 打印df的结构df.printSchema()sparkSql.stop()}
}//-----------------------------------------------------------------------------
root|-- id: integer (nullable = false)|-- name: string (nullable = true)|-- salary: double (nullable = false)|-- sex: boolean (nullable = false)

show()

// 默认输出df中前20行数据
df.show()
//-----------------------------------------------------------------------------
+---+----+------+-----+
| id|name|salary|  sex|
+---+----+------+-----+
|  1|  zs|1000.0| true|
|  2|  ls|2000.0|false|
|  3|  ww|3000.0|false|
+---+----+------+-----+

Select()

// 查询指定的字段
// df.select("id","name","sex").show()
// $是另外一种写法[隐式转换] 字符串列名==>Column对象
df.select($"id",$"name",$"sex").show()
//-----------------------------------------------------------------------------
+---+----+-----+
| id|name|  sex|
+---+----+-----+
|  1|  zs| true|
|  2|  ls|false|
|  3|  ww|false|
+---+----+-----+

SelectExpr()

// 查询指定字段【表达式】
// df.selectExpr("name as username").show()
// df.selectExpr("sum(salary)").show()
df.selectExpr("id","name as username","salary","salary*12").show()//-----------------------------------------------------------------------------
+---+--------+------+-------------+
| id|username|salary|(salary * 12)|
+---+--------+------+-------------+
|  1|      zs|1000.0|      12000.0|
|  2|      ls|2000.0|      24000.0|
|  3|      ww|3000.0|      36000.0|
+---+--------+------+-------------+

withColumn()

// 添加或者替换【列名相同】字段
df.select($"id",$"name",$"salary")// .withColumn("year_salary",$"salary"*12) // 添加列.withColumn("salary",$"salary"*12) // 替换已存在的列.show()//-----------------------------------------------------------------------------
+---+----+-------+
| id|name| salary|
+---+----+-------+
|  1|  zs|12000.0|
|  2|  ls|24000.0|
|  3|  ww|36000.0|
+---+----+-------+

withColumnRenamed()

df.select($"id", $"name", $"salary")// .withColumn("year_salary",$"salary"*12) // 添加列.withColumn("salary", $"salary" * 12) // 替换已存在的列.withColumnRenamed("name","username").withColumnRenamed("id","user_id").show()//-----------------------------------------------------------------------------
+-------+--------+-------+
|user_id|username| salary|
+-------+--------+-------+
|      1|      zs|12000.0|
|      2|      ls|24000.0|
|      3|      ww|36000.0|
+-------+--------+-------+

Drop()

df.select($"id", $"name", $"salary")// .withColumn("year_salary",$"salary"*12) // 添加列.withColumn("salary", $"salary" * 12) // 替换已存在的列.withColumnRenamed("name", "username").withColumnRenamed("id", "user_id").drop($"username").show()
//-----------------------------------------------------------------------------+-------+-------+
|user_id| salary|
+-------+-------+
|      1|12000.0|
|      2|24000.0|
|      3|36000.0|
+-------+-------+

DropDuplicates()

// 删除重复数据 DropDuplicates  类似于数据库中distinct【重复数据只保留一个】
val df2 = sparkSql.sparkContext.makeRDD(List((1, "zs", 1000.0, true), (2, "ls", 2000.0, false), (3, "ww", 2000.0, false),(4, "zl", 2000.0, false))).toDF("id","name","salary","sex")
df2.select($"id", $"name", $"salary",$"sex")
// .withColumn("year_salary",$"salary"*12) // 添加列.withColumn("salary", $"salary" * 12) // 替换已存在的列.withColumnRenamed("name", "username").withColumnRenamed("id", "user_id").dropDuplicates("salary","sex").show()//-----------------------------------------------------------------------------
+-------+--------+-------+-----+
|user_id|username| salary|  sex|
+-------+--------+-------+-----+
|      2|      ls|24000.0|false|
|      1|      zs|12000.0| true|
+-------+--------+-------+-----+

OrderBy()| Sort()

// 排序OrderBy()| Sort()
df.select($"id", $"name", $"salary", $"sex")//.orderBy($"salary" desc)//.orderBy($"salary" asc)//.orderBy($"salary" asc,$"id" asc).sort($"salary" desc)  // 等价于OrderBy.show()//-----------------------------------------------------------------------------
+---+----+------+-----+
| id|name|salary|  sex|
+---+----+------+-----+
|  3|  ww|3000.0|false|
|  2|  ls|2000.0|false|
|  1|  zs|1000.0| true|
+---+----+------+-----+

GroupBy ()

// 分组groupBy()
df.groupBy($"sex").sum("salary").show()
//-----------------------------------------------------------------------------
+-----+-----------+
|  sex|sum(salary)|
+-----+-----------+
| true|     1000.0|
|false|     5000.0|
+-----+-----------+

Agg()

// agg 聚合操作
var df3 = List((1, "zs", true, 1, 15000),(2, "ls", false, 2, 18000),(3, "ww", false, 2, 14000),(4, "zl", false, 1, 18000),(4, "zl", false, 1, 16000))
.toDF("id", "name", "sex", "dept", "salary")
import org.apache.spark.sql.functions._
df3.groupBy("sex")// .agg(max("salary"), min("salary"), avg("salary"), sum("salary"), count("salary")).agg(Map(("salary", "max"))) // 另外的一种写法【局限性 只支持单个字段的聚合查询】.show()
//-----------------------------------------------------------------------------
+-----+-------------+
|  sex|count(salary)|
+-----+-------------+
| true|            1|
|false|            4|
+-----+-------------+

Limit()

// limit 限制返回的结果条数
df.limit(2).show()
//-----------------------------------------------------------------------------
+---+----+------+-----+
| id|name|salary|  sex|
+---+----+------+-----+
|  1|  zs|1000.0| true|
|  2|  ls|2000.0|false|
+---+----+------+-----+

Where()

val df4=List((1,"zs",true,1,15000),(2,"ls",false,2,18000),(3,"ww",false,2,14000),(4,"zl",false,1,18000),(5,"win7",false,1,16000)).toDF("id","name","sex","dept","salary")df4.select($"id",$"name",$"sex",$"dept",$"salary")//where("(name like '%s%' and salary > 15000) or name = 'win7'").where(($"name" like "%s%" and $"salary" > 15000) or $"name" ==="win7" ).show()
//--------------------------------------------------------------------------------------
+---+----+-----+----+------+
| id|name|  sex|dept|salary|
+---+----+-----+----+------+
|  2|  ls|false|   2| 18000|
|  5|win7|false|   1| 16000|
+---+----+-----+----+------+

Pivot() 【透视】

var scoreDF=List((1,"math",85),(1,"chinese",80),(1,"english",90), (2,"math",90), (2,"chinese",80)).toDF("id","course","score")scoreDF.groupBy($"id").pivot($"course")  // 行转列【重点】.max("score").show()//--------------------------------------------------------------------------------------
+---+-------+-------+----+
| id|chinese|english|math|
+---+-------+-------+----+
|  1|     80|     90|  85|
|  2|     80|   null|  90|
+---+-------+-------+----+

na()

对空值的一种处理方式
na().fill 填充 null赋予默认值
na().drop 删除为null的一行内容

scoreDF.groupBy($"id").pivot($"course") // 行转列【重点】.max("score")//.na.fill(Map("english" -> 59))  // 为空值赋予一个默认值.na.drop()  // 删除包含空值的一行记录.show()
//--------------------------------------------------------------------------------------
+---+-------+-------+----+
| id|chinese|english|math|
+---+-------+-------+----+
|  1|     80|     90|  85|
+---+-------+-------+----+

over()

窗口函数:聚合函数
排名函数
分析函数
作用: 窗口函数使用over,对一组数据进行操作,返回普通列和聚合列val w1 = Window​ .partitionBy(“分区规则”)​ .orderBy($“列” asc| desc)​ .rangeBetween | rowsBetween窗口函数名 over(w1)
t_userid    name   salary   sex   dept1     zs     1000     true   12     ls  2000     false  23     ww   2000     false  1// 查询用户信息(id,name,salary,用户所在部门的平均工资)SQL: select id,name,salary,(select avg(salary) from t_user group by dept) as avg_salary from t_userid   name salaray  avg_salary1    zs   1000     15002    ls   2000     20003    ww   2000     1500spark sql 窗口函数 简化如上查询语义: select id,name,salary,avg(salary) over(partition by dept order by ...) from t_user具体使用方法:
count(...) over(partition by ... order by ...) --求分组后的总数。
sum(...) over(partition by ... order by ...)   --求分组后的和。
max(...) over(partition by ... order by ...)--求分组后的最大值。
min(...) over(partition by ... order by ...)--求分组后的最小值。
avg(...) over(partition by ... order by ...)--求分组后的平均值。
rank() over(partition by ... order by ...)--rank值可能是不连续的。
dense_rank() over(partition by ... order by ...)--rank值是连续的。
first_value(...) over(partition by ... order by ...)--求分组内的第一个值。
last_value(...) over(partition by ... order by ...)--求分组内的最后一个值。
lag() over(partition by ... order by ...)--取出前n行数据。  
lead() over(partition by ... order by ...)--取出后n行数据。
ratio_to_report() over(partition by ... order by ...)--Ratio_to_report() 括号中就是分子,over() 括号中就是分母。
percent_rank() over(partition by ... order by ...)

Join()

val userInfoDF= sparkSql.sparkContext.makeRDD(List((1,"zs"),(2,"ls"),(3,"ww"))).toDF("id","name")
val orderInfoDF= sparkSql.sparkContext.makeRDD(List((1,"iphone",1000,1),(2,"mi9",999,1),(3,"连衣裙",99,2))).toDF("oid","product","price","uid")// join DF连接操作
userInfoDF.join(orderInfoDF,$"id"===$"uid","inner").show()
//-----------------------------------------------------------------
+---+----+---+-------+-----+---+
| id|name|oid|product|price|uid|
+---+----+---+-------+-----+---+
|  1|  zs|  1| iphone| 1000|  1|
|  1|  zs|  2|    mi9|  999|  1|
|  2|  ls|  3| 连衣裙|   99|  2|
+---+----+---+-------+-----+---+

cube(多维度)

cube多维度查询 尝试根据多个分组可能继续数据查询cube(A,B)​ group by A null​ group by null B​ group by null null​ group by AB
import org.apache.spark.sql.functions._
List((110,50,80,80),(120,60,95,75),(120,50,96,70))
.toDF("height","weight","IQ","EQ")
.cube($"height",$"weight")  // spark sql尝试根据元组第一个和第二个值 进行各种可能分组操作,这种操作的好处,如果以后有任何第一个和第二个值的分区操作,都将出现在cube的结果表中
.agg(avg("IQ"),avg("EQ")) .show()

DataSet操作(typed)

在实际开发中,我们通常使用的是DataFrame API,这种Dataset强类型的操作几乎不使用

package com.baizhi.sql.operation.typedimport org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.scalalang.typed/*** ds 强类型操作** spark context :rdd* spark streaming context : streaming* spark session : sql*/
object DatasetWithTypedOpt {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[*]").appName("typed opt").getOrCreate()spark.sparkContext.setLogLevel("ERROR")import spark.implicits._val ds = spark.sparkContext.makeRDD(List("Hello Hadoop", "Hello Scala")).flatMap(_.split(" ")).map((_, 1)).toDSds.groupByKey(t => t._1) // 根据单词进行分组操作.agg(typed.sum[(String, Int)](t => t._2)) // 对单词初始值进行聚合.withColumnRenamed("TypedSumDouble(scala.Tuple2)", "num").show()spark.stop()}
}

四、DataFrame纯SQL操作

df.createGlobalTempView()  //  对DF创建全局的临时视图,它产生的表,可以多个spark session共享,它的生命周期和spark application绑定
df.createTempView()  // 对DF创建局部的临时视图,它产生的表,仅供创建spark session使用,其它的spark session无法获取

单行查询

package com.baizhi.sqlimport org.apache.spark.sql.SparkSessionobject DataFrameSqlOpt {def main(args: Array[String]): Unit = {// 1. sparkSession是spark sql应用入口,内部封装了sparkconf和sparkContextval spark = SparkSession.builder().appName("the first spark sql example").master("local[*]").getOrCreate()// 2. 创建Datasetval rdd = spark.sparkContext.makeRDD(List("Hello Hadoop", "Hello Scala")).flatMap(_.split(" ")).map((_, 1))import spark.implicits._val df = rdd.toDF("word", "num")// 给df起了表名 如果是全局表的话,在访问的时候需要加数据库名【】// df.createGlobalTempView("t_user") //  对DF创建全局的临时视图,它产生的表,可以多个spark session共享,它的生命周期和spark application绑定df.createTempView("t_user") // 对DF创建局部的临时视图,它产生的表,仅供创建spark session使用,其它的spark session无法获取// 再创建一个session,请问是否能够使用全局表? 正确//    val newSparkSession = spark.newSession()//    spark.sql("select * from global_temp.t_user").show()//    newSparkSession.sql("select * from global_temp.t_user").show()// 再创建一个session,请问是否能够使用局部临时表? 错误val newSparkSession = spark.newSession()spark.sql("select * from t_user").show()newSparkSession.sql("select * from t_user").show()spark.stop()}
}

模糊查询

import spark.implicits._
val userDF = List((1, "zs", true, 18, 15000, 1), (2, "ls", false, 19, 15000, 1)).toDF("id", "name", "sex", "age", "salary", "dept")userDF.createTempView("t_user")spark.sql("select * from t_user where name like '%z%' and age > 18").show()

排序查询

// 排序查询
spark.sql(// 自动将"""引起的内容 进行字符串拼接"""select*from t_userorder by id desc""").show()

分组查询

spark.sql("""selectsex,avg(salary)as avg_salaryfromt_usergroupby sex""").show()
//---------------------------------------------------------------------------+-----+----------+
|  sex|avg_salary|
+-----+----------+
| true|   15000.0|
|false|   15000.0|
+-----+----------+

Limit

// 分组查询  统计男和女的平均工资
spark.sql("""selectsex,avg(salary)as avg_salaryfromt_usergroupby sexlimit 1""").show()
//---------------------------------------------------------------------------
+----+----------+
| sex|avg_salary|
+----+----------+
|true|   15000.0|
+----+----------+

having 分组后过滤

spark.sql("""selectsex,avg(salary)as avg_salaryfromt_usergroupby sexhavingsex = true""").show()
//---------------------------------------------------------------------------
+----+----------+
| sex|avg_salary|
+----+----------+
|true|   15000.0|
+----+----------+

Case … when 语句

spark.sql("""| select|   id,name,salary,age,|     case sex|       when true|         then '男'|       when false|         then '女'|       else|         '中性'|     end|   as newSex| from|   t_user""".stripMargin).show()//----------------------------------------------------------------------------
+---+----+------+---+------+
| id|name|salary|age|newSex|
+---+----+------+---+------+
|  1|  zs| 15000| 18|    男|
|  2|  ls| 15000| 19|    女|
|  3|  ww| 18000| 19|    女|
+---+----+------+---+------+

Pivot(行转列)

// pivot
val scoreDF=List((1, "语文", 100),(1, "数学", 100),(1, "英语", 100),(2, "数学", 79),(2, "语文", 80),(2, "英语", 100),(2, "英语", 120))
.toDF("id","course","score")
scoreDF.createOrReplaceTempView("t_course")// 注意: 缺省的列会作为分组的依据
spark.sql("""| select|   *| from|   t_course| pivot(max(score) for course in('语文','数学','英语'))|""".stripMargin).show()//----------------------------------------------------------------------------
+---+----+----+----+
| id|语文|数学|英语|
+---+----+----+----+
|  1| 100| 100| 100|
|  2|  80|  79| 120|
+---+----+----+----+

Cube(多维度分组)

// cube (A,B)//    A null//    null B//    A Bval df2 = List((110, 50, 80, 80),(120, 60, 95, 75),(120, 50, 96, 70)).toDF("height", "weight", "uiq", "ueq")df2.createTempView("tt_user")spark.sql("""| select|   height,uiq,avg(uiq)| from|   tt_user| group by|   cube(height,uiq)""".stripMargin).show()//-----------------------------------------------------------------
+------+----+-----------------+
|height| uiq|         avg(uiq)|
+------+----+-----------------+
|   120|null|             95.5|
|  null|  80|             80.0|
|  null|null|90.33333333333333|
|  null|  95|             95.0|
|   120|  95|             95.0|
|   110|null|             80.0|
|   110|  80|             80.0|
|   120|  96|             96.0|
|  null|  96|             96.0|
+------+----+-----------------+

Join表连接查询

// join
val userInfoDF = spark.sparkContext.makeRDD(List((1, "zs"), (2, "ls"), (3, "ww"))).toDF("id", "name")
val orderInfoDF = spark.sparkContext.makeRDD(List((1, "iphone", 1000, 1), (2, "mi9", 999, 1), (3, "连衣裙", 99, 2))).toDF("oid", "product", "price", "uid")userInfoDF.createTempView("ttt_user")
orderInfoDF.createTempView("t_order")// inner  left_outer  right_outer full  cross
spark.sql("""| select|   *| from|   ttt_user t1| inner join|   t_order t2| on|   t1.id = t2.uid""".stripMargin).show()

子查询

// 子查询val df =List((1, "zs", true, 1, 15000),(2, "ls", false, 2, 18000),(3, "ww", false, 2, 14000),(4, "zl", false, 1, 18000),(5, "win7", false, 1, 16000)).toDF("id", "name", "sex", "dept", "salary")df.createTempView("t_employee")spark.sql("""selectid,name,sex,deptfrom (select * from t_employee)""".stripMargin).show()//-----------------------------------------------------
+---+----+-----+----+
| id|name|  sex|dept|
+---+----+-----+----+
|  1|  zs| true|   1|
|  2|  ls|false|   2|
|  3|  ww|false|   2|
|  4|  zl|false|   1|
|  5|win7|false|   1|
+---+----+-----+----+

窗口函数

在正常的统计分析中 ,通常使用聚合函数作为分析,聚合分析函数的特点是将n行记录合并成一行,在数据库的统计当中
还有一种统计称为开窗统计,开窗函数可以实现将一行变成多行。可以将数据库查询的每一条记录比作是一幢高楼的一层, 开窗函数就是在每一层开一扇窗,
让每一层能看到整装楼的全貌或一部分。

语法:

​ 窗口函数名() over([partition by 分区字段] order by 字段 asc | desc [range |
rows between unbounded preceding and unbounded following] )

// 创建DFval df = List((1, "zs", true, 1, 15000),(2, "ls", false, 2, 18000),(3, "ww", false, 2, 14000),(4, "zl", false, 1, 18000),(5, "win7", false, 1, 16000)).toDF("id", "name", "sex", "dept", "salary")df.createTempView("t_employee")//    窗口函数名() over([partition by 分区字段] order by 字段 asc | desc [range | rows between unbounded preceding and unbounded following] )spark.sql("""| select|   id,name,sex,dept,salary,|   sum(id) over(partition by dept order by salary rows between unbounded preceding and unbounded following) as sum_id,|   sum(id) over(partition by dept order by salary) as sum_id2,|   sum(id) over() as sum_id3,|   sum(id) over(partition by dept order by salary rows between 1 preceding and 1 following) as sum_id4,|   sum(id) over(partition by dept order by salary range between 1000 preceding and 2000 following) as sum_id5,|   row_number() over(partition by dept order by salary) as rn,|   rank(salary) over(partition by dept order by salary) as salary_rank,|   dense_rank(salary) over(partition by dept order by salary asc) as salary_rank2,|   lag(salary,2) over(partition by dept order by salary asc) as lag2| from|   t_employee""".stripMargin).show()// 第一个表示 以任意的一行数据为基准都可以看到窗口的所有数据// 第二个表示 没有加任何的数据可视范围,使用默认的数据可视范围 rowsBetween[Long.min_value,0]// 第三个表示 over没有声明任何的窗口函数内容,则在每行显示整张表的聚合结果  // agg = 15// 第四个表示 以当前行为基准 上一行和下一行 rowsBetween[-1,1]// 第五个表示 数据可视范围区间 range between[当前数据排序字段为基准-下界,当前数据排序字段为基准+上界]// 第六个表示 排序窗口函数 row_number() 给窗口的数据添加一个序号 类似与Oracle伪列rownum// 第七个表示 对窗口函数 rank(排名字段) 给窗口的数据按照排名字段信息排名  注意: 非密集或者非连续的排名// 第八个表示 对窗口函数 dense_rank(连续密集排名字段) 给窗口的数据按照排名字段信息排名  注意: 密集或者连续的排名// 第九个表示 获取往上两行的slary的值 作为当前行窗口的值spark.stop()//--------------------------------------------------------------------------------
+---+----+-----+----+------+------+-------+-------+-------+-------+---+-----------+------------+-----+
| id|name|  sex|dept|salary|sum_id|sum_id2|sum_id3|sum_id4|sum_id5| rn|salary_rank|salary_rank2| lag2|
+---+----+-----+----+------+------+-------+-------+-------+-------+---+-----------+------------+-----+
|  1|  zs| true|   1| 15000|    23|      1|     28|      6|      6|  1|          1|           1| null|
|  5|win7|false|   1| 16000|    23|      6|     28|     10|     16|  2|          2|           2| null|
|  4|  zl|false|   1| 18000|    23|     16|     28|     15|     17|  3|          3|           3|15000|
|  6|  wb|false|   1| 18000|    23|     16|     28|     17|     17|  4|          3|           3|16000|
|  7| wb2|false|   1| 20000|    23|     23|     28|     13|      7|  5|          5|           4|18000|
|  3|  ww|false|   2| 14000|     5|      3|     28|      5|      3|  1|          1|           1| null|
|  2|  ls|false|   2| 18000|     5|      5|     28|      5|      2|  2|          2|           2| null|
+---+----+-----+----+------+------+-------+-------+-------+-------+---+-----------+------------+-----+
unbounded preceding 等价于 Long.min_valueunbounded following 等价于 Long.max_valuecurrent row: 当前行current row - 1 : 当前行的上一行current row +1 : 当前行的下一行

五、Spark SQL的自定义函数

单行函数

对每一行数据应用函数内容,如:Upper() Lower() Length()

package com.baizhiimport org.apache.spark.sql.SparkSession/****/
object CustomUserSingleFunction1 {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("window function").master("local[*]").getOrCreate()spark.sparkContext.setLogLevel("ERROR")import spark.implicits._// 创建DFval df = List((1, "zs", true, 1, 15000),(2, "ls", false, 2, 18000),(3, "ww", false, 2, 14000),(4, "zl", false, 1, 18000),(5, "win7", false, 1, 16000),(6, "wb", false, 1, 18000),(7, "wb2", false, 1, 20000)).toDF("id", "name", "sex", "dept", "salary")df.createTempView("t_employee")// 自定义单行函数spark.udf.register("sex_converter", (sex: Boolean) => {sex match {case true => "男"case false => "女"case _ => "不男不女"}})spark.sql("select id,upper(name),sex_converter(sex) from t_employee").show()spark.stop()}
}
//----------------------------------------------------------------
+---+-----------+----------------------+
| id|upper(name)|UDF:sex_converter(sex)|
+---+-----------+----------------------+
|  1|         ZS|                    男|
|  2|         LS|                    女|
|  3|         WW|                    女|
|  4|         ZL|                    女|
|  5|       WIN7|                    女|
|  6|         WB|                    女|
|  7|        WB2|                    女|
+---+-----------+----------------------+

多行函数

指对多行数据应用函数内容,返回单个结果. 如:聚合函数 sum() avg() min()…
需求:自定义 整数求和的多行函数

package com.baizhiimport org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, IntegerType, StructType}/****/
object CustomUserSingleFunction2 {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("window function").master("local[*]").getOrCreate()spark.sparkContext.setLogLevel("ERROR")import spark.implicits._// 创建DFval df = List((1, "zs", true, 1, 15000),(2, "ls", false, 2, 18000),(3, "ww", false, 2, 14000),(4, "zl", false, 1, 18000),(5, "win7", false, 1, 16000),(6, "wb", false, 1, 18000),(7, "wb2", false, 1, 20000)).toDF("id", "name", "sex", "dept", "salary")df.createTempView("t_employee")// 自定义多行函数spark.udf.register("my_sum", new UserDefinedAggregateFunction {/*** 输入数据的结构类型** @return*/override def inputSchema: StructType = new StructType().add("salary", IntegerType)/*** 缓冲区【用来存放聚合产生的临时结果】的结构类型** @return*/override def bufferSchema: StructType = new StructType().add("total", IntegerType)/*** 聚合操作结束后的返回值类型** @return*/override def dataType: DataType = IntegerType/*** 聚合操作时,输入类型和聚合结果的返回类型是否匹配** @return*/override def deterministic: Boolean = true/*** 初始化方法** @param buffer*/override def initialize(buffer: MutableAggregationBuffer): Unit = {// buffer缓冲区的第一个位置 存放了一个初始值0buffer.update(0, 0)}/*** 修改方法** @param buffer* @param input*/override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {// 获取多行函数的第一个参数的值val rowValue = input.getInt(0)val currentValue = buffer.getInt(0)buffer.update(0, rowValue + currentValue)}/*** 合并* 将两个buffer中的数据合并 并将最终结果保存到第一个buffer中** @param buffer1* @param buffer2*/override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {val b1CurrentValue = buffer1.getInt(0)val b2CurrentValue = buffer2.getInt(0)buffer1.update(0, b1CurrentValue + b2CurrentValue)}/*** 评估方法  返回聚合结果** @param buffer* @return*/override def evaluate(buffer: Row): Any = buffer.getInt(0)})spark.sql("select my_sum(salary) from t_employee").show()spark.stop()}
}
//------------------------------------------------------------------------------------------
+--------------+
|anon$1(salary)|
+--------------+
|        119000|
+--------------+

六、Spark SQL的数据导入导出

JSON

val df = spark.read.json("file:///G:\\IDEA_WorkSpace\\scala-workspace\\spark-day9\\src\\main\\resources")df.createTempView("t_user")spark.sql("select id,name from t_user").write.format("json").save("file:///D://result")

Paquet

基于列式存储的文件格式,底层会将数据编码成二进制数据【默认】

// 读parquet文件内容 创建df
val df = spark.read.parquet("file:///D://result2")// 写出parquet文件格式
spark.sql("select id,name from t_user").write.save("file:///D://result2")

ORC

矢量化文件格式,比较节省磁盘空间

// 写出orc文件格式
spark.sql("select * from t_user").write.orc("file:///d://result3")// 读orc文件内容 创建df
val df = spark.read.orc("file:///D://result3")

CSV

 val spark = SparkSession.builder().master("local[2]").appName("count").getOrCreate()import spark.implicits._//读取文件。然后用元祖通过toDF方法创建val customes = spark.sparkContext.textFile("file:///D:\\927kt\\customers.csv").map(line=>{val st = line.replaceAll("\"", "")var strs = st.split(",")(strs(0),strs(1),strs(2),strs(3),strs(4),strs(5),strs(6),strs(7),strs(8))}).toDF("userid","name","fname","pwd1","pwd2","addr","city","country","zip").show(3)
+------+-------+---------+---------+---------+--------------------+-----------+-------+-----+
|userid|   name|    fname|     pwd1|     pwd2|                addr|       city|country|  zip|
+------+-------+---------+---------+---------+--------------------+-----------+-------+-----+
|     1|Richard|Hernandez|XXXXXXXXX|XXXXXXXXX|  6303 Heather Plaza|Brownsville|     TX|78521|
|     2|   Mary|  Barrett|XXXXXXXXX|XXXXXXXXX|9526 Noble Embers...|  Littleton|     CO|80126|
|     3|    Ann|    Smith|XXXXXXXXX|XXXXXXXXX|3422 Blue Pioneer...|     Caguas|     PR|00725|
+------+-------+---------+---------+---------+--------------------+-----------+-------+-----+//按CSV文件的表格格式也可以直接创建val ordes = spark.read.format("csv").load("file:///D:\\927kt\\orders.csv").toDF("order_id","order_date","userid","order_status").show(3)
+--------+-------------------+------+---------------+
|order_id|         order_date|userid|   order_status|
+--------+-------------------+------+---------------+
|       1|2013-07-25 00:00:00| 11599|         CLOSED|
|       2|2013-07-25 00:00:00|   256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:00| 12111|       COMPLETE|
+--------+-------------------+------+---------------+

JDBC

// 写出JDBC Mysql数据库中
spark.sql("SELECT * FROM t_user").write.format("jdbc").mode(SaveMode.Overwrite) // 覆盖.option("user", "root").option("password", "1234").option("url", "jdbc:mysql://hadoopnode00:3306/test").option("dbtable", "t_user").save()// 读JDBC Mysql中数据,创建DFval df = spark.read.format("jdbc").option("user", "root").option("password", "1234").option("url", "jdbc:mysql://hadoopnode00:3306/test").option("dbtable", "t_user").load()

DF转RDD

// 写出JDBC Mysql数据库中
spark.sql("SELECT * FROM t_user").write.format("jdbc").mode(SaveMode.Overwrite) // 覆盖.option("user", "root").option("password", "1234").option("url", "jdbc:mysql://hadoopnode00:3306/test").option("dbtable", "t_user").save()// 读JDBC Mysql中数据,创建DFval df = spark.read.format("jdbc").option("user", "root").option("password", "1234").option("url", "jdbc:mysql://hadoopnode00:3306/test").option("dbtable", "t_user").load()

参考文章

Spark SQL原理及常用方法详解(二)相关推荐

  1. Pytorch|YOWO原理及代码详解(二)

    Pytorch|YOWO原理及代码详解(二) 本博客上接,Pytorch|YOWO原理及代码详解(一),阅前可看. 1.正式训练 if opt.evaluate:logging('evaluating ...

  2. Nginx(二):反向代理原理 与 配置文件详解

    相关阅读: Nginx(一):Nginx原理概述 与 安装步骤详解 Nginx(二):反向代理原理 与 配置文件详解 Nginx(三):负载均衡策略 与 Nginx静态服务器 Nginx(四):Ngi ...

  3. Android面试Hash原理详解二

    Hash系列目录 Android面试Hash原理详解一 Android面试Hash原理详解二 Android面试Hash常见算法 Android面试Hash算法案例 Android面试Hash原理详解 ...

  4. DPC密度峰值聚类算法原理详解二

    DPC密度峰值聚类算法原理详解二 1.计算数据点两两之间的距离 1.使用 Numpy 模块查找两点之间的欧几里得距离: 2.使用 distance.euclidean() 函数查找两点之间的欧式距离: ...

  5. Pytorch | yolov3原理及代码详解(二)

    阅前可看: Pytorch | yolov3原理及代码详解(一) https://blog.csdn.net/qq_24739717/article/details/92399359 分析代码: ht ...

  6. DeepLearning tutorial(1)Softmax回归原理简介+代码详解

    FROM: http://blog.csdn.net/u012162613/article/details/43157801 DeepLearning tutorial(1)Softmax回归原理简介 ...

  7. DeepLearning tutorial(3)MLP多层感知机原理简介+代码详解

    FROM:http://blog.csdn.net/u012162613/article/details/43221829 @author:wepon @blog:http://blog.csdn.n ...

  8. DeepLearning tutorial(4)CNN卷积神经网络原理简介+代码详解

    FROM: http://blog.csdn.net/u012162613/article/details/43225445 DeepLearning tutorial(4)CNN卷积神经网络原理简介 ...

  9. from mysql partition select_爬虫(九十九)mysql详解二

    **mysql中字段的常见类型: ** 二进制位 bit(长度) tinyint[(长度)] [有无符号unsigned] [位数低于长度时候是否填充零zerofill] 有符号表示范围-128 ~ ...

最新文章

  1. 嵌入式Linux设备驱动程序:在运行时读取驱动程序状态
  2. Python面试必备—分布式爬虫scrapy+redis解析
  3. Visual Studio 2005 Web Application Projects 正式推出
  4. 3年完成2款云端AI芯片研发量产,百度造芯为什么这么快?
  5. LeetCode算法题-Minimum Depth of Binary Tree(Java实现)
  6. tomcat源码分析(一)初始化---Debug方式
  7. haproxy 参数说明
  8. web之表单form
  9. Linux 命令(49)—— export 命令(builtin)
  10. Nginx+Keepalived(双机热备)搭建高可用负载均衡环境(HA)
  11. 太实用了 Python 合成多张图片到PDF格式
  12. 通过vba代码将excel转换为PDF
  13. 记一次windows下安装部署运维监控系统WGCOUD的步骤
  14. Java中的“无限循环”结构
  15. EfficientNet与EfficientDet的详解
  16. E1. Divisible Numbers (easy version)(数学)
  17. 旅游网-去哪儿网景点评论爬取
  18. php在线备忘录,PHP设计模式 - 备忘录模式
  19. 移动端App弱网测试
  20. IDEA phpstorm插件CamelCase 【大小写,下划线,驼峰,首字母大写等转化工具】

热门文章

  1. java 类型参数推断
  2. 做ctf题目的时候运行程序就会显示ImportError: cannot import name ‘flag‘ from ‘secret‘ 求大佬解答
  3. WPS广告投放的优势!WPS广告投放的展现形式!
  4. PAT 甲级1116 1117 1118 1119 解题报告
  5. 德乐生 java_【Senior Java Developer怎么样】德乐生软件2021年Senior Java Developer前景怎么样-看准网...
  6. java.lang.IllegalAccessException 没有访问权限
  7. 推荐视频:神奇的大脑 之 三个错觉演示
  8. 帝国cms php循环,帝国CMS listshowclass循环栏目标签
  9. JAVA练习165-复数乘法
  10. 用python画星空-【Python】手把手教你绘制星空旅游线路图