Spark SQL

Spark SQL is an ETL (Extract, Transform, Load) tool built on top of Spark RDDs (much like Hive 1.x is built on top of MapReduce). What sets it apart from the Spark RDD API is that the Spark SQL API gives the Spark engine more information (the structure of the data and of the transformations being applied), and the engine can use that information to optimize the underlying computation. Spark SQL currently offers two styles of interaction: the Dataset API and SQL scripts.

Dataset API: an enhanced version of the RDD operators. It supports map, flatMap, filter, etc., and additionally select, where, groupBy, cube, and so on. The user must know the type of the data being operated on, so this style is usually called strongly typed (strong-type) operation.

val wordPairRDD = spark.sparkContext.textFile("file:///D:/demo/words")
  .flatMap(_.split("\\s+"))
  .map((_, 1))
// turn the RDD into a Dataset and aggregate with typed operators
wordPairRDD.toDS()
  .groupByKey(pair => pair._1)
  .agg(typed.sum[(String,Int)](t => t._2).name("total"))
  .show()

SQL script: the user does not need to touch the underlying Dataset API; every operation works on named columns, so the user does not have to care about the concrete type of the data. This style is therefore usually called untyped operation.

word num
a    1
b    1
a    1
select sum(num) from t_words group by word

Reference: http://spark.apache.org/docs/latest/sql-programming-guide.html

The main objects Spark SQL works with are Dataset and DataFrame (enhanced versions of the RDD). A Dataset is a distributed collection of data and a new interface added in Spark 1.6; it combines the benefits of RDDs (strong typing, the ability to use powerful lambda functions) with the benefits of Spark SQL's optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated with functional transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java; Python support for the Dataset API is still incomplete.

A DataFrame is a Dataset organized into named columns, a special kind of Dataset that is conceptually equivalent to a table in a relational database. DataFrames can be constructed from many sources, such as structured data files (JSON, CSV, etc.), Hive tables (for integrating with legacy systems), or external databases. A DataFrame is a dataset of Rows, i.e. a DataFrame is simply a Dataset[Row].

Quick Start

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.4.3</version>
</dependency>

SQL Script

// 1. Create the SparkSession
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("word count")
  .master("local[6]")
  .getOrCreate()
// 2. Import the implicit conversions that turn RDDs into DataFrames/Datasets
import spark.implicits._
spark.sparkContext.setLogLevel("FATAL") // silence log output
// Create the RDD
val wordPairRDD = spark.sparkContext.textFile("file:///D:/demo/words")
  .flatMap(_.split("\\s+"))
  .map((_, 1))
// Convert the RDD to a DataFrame
val wordDataframe = wordPairRDD.toDF("word", "num")
// Register wordDataframe as the table t_words
wordDataframe.createOrReplaceTempView("t_words")
// Query t_words with a SQL script
spark.sql("select word,sum(num) total from t_words group by word order by total desc")
  .show()
// 3. Close the SparkSession
spark.stop() // close

Named-Column API

// 1. Create the SparkSession
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("word count")
  .master("local[6]")
  .getOrCreate()
// 2. Import the implicit conversions that turn RDDs into DataFrames
import spark.implicits._
spark.sparkContext.setLogLevel("FATAL") // silence log output
// Create the RDD
val wordPairRDD = spark.sparkContext.textFile("file:///D:/demo/words")
  .flatMap(_.split("\\s+"))
  .map((_, 1))
// Turn the RDD into a DataFrame and aggregate on named columns
import org.apache.spark.sql.functions._
wordPairRDD.toDF("word","num")
  .groupBy($"word")
  .agg(sum($"num") as "total")
  .orderBy($"total" desc)
  .show()
// 3. Close the SparkSession
spark.stop() // close

Strongly Typed (typed) API - Not Recommended

// 1. Create the SparkSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.scalalang.typed
val spark = SparkSession.builder().appName("word count")
  .master("local[6]")
  .getOrCreate()
// 2. Import the implicit conversions that turn RDDs into Datasets
import spark.implicits._
spark.sparkContext.setLogLevel("FATAL") // silence log output
// Create the RDD
val wordPairRDD = spark.sparkContext.textFile("file:///D:/demo/words")
  .flatMap(_.split("\\s+"))
  .map((_, 1))
// Turn the RDD into a Dataset and aggregate with typed operators
wordPairRDD.toDS()
  .groupByKey(pair => pair._1)
  .agg(typed.sum[(String,Int)](t => t._2).name("total"))
  .rdd
  .sortBy(_._2, false, 3)
  .toDF("word","total")
  .show()
// 3. Close the SparkSession
spark.stop() // close

Building Datasets and DataFrames

Dataset

A Dataset is a special, enhanced kind of RDD. Unlike plain RDDs, Spark SQL maintains its own serialization/deserialization scheme, which avoids the repeated serialization cost that would otherwise hit the compute nodes during a job. Because Spark SQL favors untyped operations, users do not have to care about element types and only supply operators over named columns, which further improves the performance of the compute nodes.

rdd

> sc.textFile("hdfs://...").flatMap(_.split("\\s+")).map((_,1)).reduceByKey(_+_)

dataset

> val wordPairRDD = spark.sparkContext.textFile("file:///D:/demo/words").flatMap(_.split("\\s+")).map((_, 1))
> wordPairRDD.toDF("word","num") // no need to know element types, just supply column names
    .groupBy($"word")
    .agg(sum($"num") as "total")
    .orderBy($"total" desc)
    .show()
  • case-class
case class Person(id:Int,name:String,age:Int,dept:Int)
List(Person(1,"zs",18,1),Person(2,"ls",18,1)).toDS().show()
+---+----+---+----+
| id|name|age|dept|
+---+----+---+----+
|  1|  zs| 18|   1|
|  2|  ls| 18|   1|
+---+----+---+----+
  • Tuple
List((1,"zs",18,1),(2,"ls",18,1)).toDS().show()
+---+---+---+---+
| _1| _2| _3| _4|
+---+---+---+---+
|  1| zs| 18|  1|
|  2| ls| 18|  1|
+---+---+---+---+
  • JSON data
case class User(id:BigInt,name:String,age:Long,dept:Long)
spark.read.json("file:///D:/demo/json").as[User].show()
  • RDD (omitted here; see the DataFrame section below)

Only RDDs whose elements are tuples or case classes can be converted directly to a Dataset with toDS, as sketched below.
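A minimal sketch of this rule (assuming the SparkSession spark and import spark.implicits._ from the Quick Start; the sample rows are illustrative):

case class Person(id: Int, name: String, age: Int, dept: Int)

// RDD of tuples -> Dataset[(Int, String, Int, Int)] with columns _1 .. _4
val tupleDS = spark.sparkContext
  .makeRDD(List((1, "zs", 18, 1), (2, "ls", 18, 1)))
  .toDS()

// RDD of case-class instances -> Dataset[Person] with columns id, name, age, dept
val personDS = spark.sparkContext
  .makeRDD(List(Person(1, "zs", 18, 1), Person(2, "ls", 18, 1)))
  .toDS()

tupleDS.show()
personDS.show()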

DataFrame √

A DataFrame is a Dataset organized into named columns, conceptually equivalent to a table in a relational database. DataFrames can be constructed from many sources, such as structured data files, Hive tables, or external databases. A DataFrame is a dataset of Rows, i.e. a DataFrame is simply a Dataset[Row].

  • JSON file
spark.read.json("file:///D:/demo/json").show()
  • case-class
List(Person(1,"zs",18,1),Person(2,"ls",18,1)).toDF().show()
  • Tuple
List((1,"zs",18,1),(2,"ls",18,1)).toDF("id","name","age","dept").show()
  • Creating from an RDD √

    • RDD[Row]
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
    val rowRDD:RDD[Row] = spark.sparkContext.makeRDD(List((1, "zs", 18, 1), (2, "ls", 18, 1)))
      .map(t => Row(t._1, t._2, t._3, t._4))
    val schema = new StructType()
      .add("id",IntegerType)
      .add("name",StringType)
      .add("age",IntegerType)
      .add("dept",IntegerType)
    spark.createDataFrame(rowRDD,schema)
      .show()
    
    • RDD[JavaBean]
    public class Student implements Serializable {
        private Integer id;
        private String name;
        private Integer age;
        private Integer dept;
        public Student() {}
        public Student(Integer id, String name, Integer age, Integer dept) {
            this.id = id;
            this.name = name;
            this.age = age;
            this.dept = dept;
        }
        // getters/setters omitted
    }
    val userRDD:RDD[Student] = spark.sparkContext.makeRDD(List((1, "zs", 18, 1), (2, "ls", 18, 1)))
      .map(t => new Student(t._1, t._2, t._3, t._4))
    spark.createDataFrame(userRDD,classOf[Student])
      .show()
    
    • RDD[case-class]
    val userRDD:RDD[User] = spark.sparkContext.makeRDD(List((1, "zs", 18, 1), (2, "ls", 18, 1)))
    .map(t => User(t._1, t._2, t._3, t._4))
    spark.createDataFrame(userRDD)
    .show()
    
    +---+----+---+----+
    | id|name|age|dept|
    +---+----+---+----+
    |  1|  zs| 18|   1|
    |  2|  ls| 18|   1|
    +---+----+---+----+

    You can call toDF directly without specifying column aliases.

    • RDD[Tuple]
    val userRDD:RDD[(Int,String,Int,Int)] = spark.sparkContext.makeRDD(List((1, "zs", 18, 1), (2, "ls", 18, 1)))
    spark.createDataFrame(userRDD).toDF("")
    .show()
    
    +---+---+---+---+
    | _1| _2| _3| _4|
    +---+---+---+---+
    |  1| zs| 18|  1|
    |  2| ls| 18|  1|
    +---+---+---+---+
    

    You can call toDF directly, but you generally need to specify column aliases.

Summary

RDD[tuple / case-class] -- toDS/toDF --> Dataset / DataFrame

Dataset[U] -- rdd --> yields the corresponding RDD[U]

DataFrame -- rdd --> yields an RDD[Row]

val userRDD:RDD[Student] = spark.sparkContext.makeRDD(List((1, "zs", 18, 1), (2, "ls", 18, 1)))
  .map(t => new Student(t._1, t._2, t._3, t._4))
spark.createDataFrame(userRDD,classOf[Student])
  .rdd
  .map(row => (row.getAs[Int]("id"), row.getAs[String]("name"))) // work with Row values
  .collect()
  .foreach(println)

Dataset[U] -- toDF --> DataFrame, where U must be a case class or a tuple

DataFrame -- as[U] --> Dataset[U], where U must again be a case class or a tuple. A round-trip sketch follows.
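A minimal round-trip sketch of these conversions (assuming spark and import spark.implicits._ are in scope; the User case class and its sample values are only illustrative):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row}

case class User(id: Int, name: String, age: Int, dept: Int)

val ds: Dataset[User]  = List(User(1, "zs", 18, 1), User(2, "ls", 18, 1)).toDS()
val df: DataFrame      = ds.toDF()    // Dataset[User] -> DataFrame (i.e. Dataset[Row])
val ds2: Dataset[User] = df.as[User]  // DataFrame -> Dataset[User]
val rowRDD: RDD[Row]   = df.rdd       // DataFrame -> RDD[Row]
val userRDD: RDD[User] = ds.rdd       // Dataset[User] -> RDD[User]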

Untyped Dataset Operations (aka DataFrame Operations)

id name sex age salary dept
---------------------------
1,Michael,false,29,2000,001
2,Andy,true,30,5000,001
3,Justin,true,19,1000,002
4,Kaine,false,20,5000,003
5,Lisa,false,19,1000,002
case class Employee(id:Int,name:String,sex:Boolean,age:Int,salary:Double,dept:String)

val employeeRDD:RDD[Employee] = spark.sparkContext.textFile("file:///D:/demo/users")
  .map(line => line.split(","))
  .map(ts => Employee(ts(0).toInt, ts(1), ts(2).toBoolean, ts(3).toInt, ts(4).toDouble, ts(5)))
val employeeDF = employeeRDD.toDF()

printSchema()

Prints the schema of the current table.

employeeDF.printSchema()
root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- sex: boolean (nullable = false)
 |-- age: integer (nullable = false)
 |-- salary: double (nullable = false)
 |-- dept: string (nullable = true)

select

Projection query: select only the listed columns.

employeeDF.select("id","name","salary")
.show()
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|Michael| 29|
|  2|   Andy| 30|
|  3| Justin| 19|
|  4|  Kaine| 20|
|  5|   Lisa| 19|
+---+-------+---+

If you want to compute on a column:

employeeDF.select($"id",$"name",$"salary",$"salary" * 12 as "annual_salary")
.show()
+---+-------+------+-------------+
| id|   name|salary|annual_salary|
+---+-------+------+-------------+
|  1|Michael|2000.0|      24000.0|
|  2|   Andy|5000.0|      60000.0|
|  3| Justin|1000.0|      12000.0|
|  4|  Kaine|5000.0|      60000.0|
|  5|   Lisa|1000.0|      12000.0|
+---+-------+------+-------------+

selectExpr

Lets you use SQL expressions/scripts directly in the column list.

employeeDF.selectExpr("id as uid","name","salary","salary * 12 as annual_salary")
.show()
+---+-------+------+-------------+
|uid|   name|salary|annual_salary|
+---+-------+------+-------------+
|  1|Michael|2000.0|      24000.0|
|  2|   Andy|5000.0|      60000.0|
|  3| Justin|1000.0|      12000.0|
|  4|  Kaine|5000.0|      60000.0|
|  5|   Lisa|1000.0|      12000.0|
+---+-------+------+-------------+

Note: in Spark, column aliases should not contain Chinese characters.

where

Equivalent to the filter operator: keeps the records that satisfy the condition.

employeeDF.selectExpr("id as uid","age","name","salary","salary * 12 as annual_salary")
.where("(name = 'Andy' or age >= 20) and (annual_salary > 24000) ")
.show()

Besides expression strings, where can also filter with Column expressions:

 employeeDF.selectExpr("id as uid","age","name","salary","salary * 12 as annual_salary")
//.where("(name = 'Andy' or age >= 20) and (annual_salary > 24000) ")
.where(($"name" ==="Andy" or $"age" >= 20) and($"annual_salary" > 24000) )
.show()
+---+---+-----+------+-------------+
|uid|age| name|salary|annual_salary|
+---+---+-----+------+-------------+
|  2| 30| Andy|5000.0|      60000.0|
|  4| 20|Kaine|5000.0|      60000.0|
+---+---+-----+------+-------------+

√withColumn

Adds a new column to the result.

 employeeDF.select($"id" ,$"age",$"name",$"salary",$"dept").withColumn("annual_salary",$"salary" * 12).where(($"name" ==="Andy" or $"age" >= 20) and($"annual_salary" > 24000) ).withColumn("dept_as",$"dept").show()
+---+---+-----+------+----+-------------+-------+
| id|age| name|salary|dept|annual_salary|dept_as|
+---+---+-----+------+----+-------------+-------+
|  2| 30| Andy|5000.0| 001|      60000.0|    001|
|  4| 20|Kaine|5000.0| 003|      60000.0|    003|
+---+---+-----+------+----+-------------+-------+

withColumnRenamed

Renames a column.

employeeDF.select($"id" ,$"age",$"name",$"salary",$"dept")
.withColumnRenamed("id","uid")
.show()
+---+---+-------+------+----+
|uid|age|   name|salary|dept|
+---+---+-------+------+----+
|  1| 29|Michael|2000.0| 001|
|  2| 30|   Andy|5000.0| 001|
|  3| 19| Justin|1000.0| 002|
|  4| 20|  Kaine|5000.0| 003|
|  5| 19|   Lisa|1000.0| 002|
+---+---+-------+------+----+

groupBy

Similar to the GROUP BY clause in SQL; it must be used together with an aggregate function.

employeeDF.select($"id" ,$"age",$"name",$"salary",$"dept")
.groupBy("dept")
.sum("salary")
.show()
+----+-----------+
|dept|sum(salary)|
+----+-----------+
| 003|     5000.0|
| 001|     7000.0|
| 002|     2000.0|
+----+-----------+

√agg

Combines several aggregate operators (count, max, min, avg, etc.) to build compound aggregations.

import org.apache.spark.sql.functions._
employeeDF.select($"id" ,$"age",$"name",$"salary",$"dept").groupBy("dept").agg(sum("salary") as "total",avg("age") as "avg_age",max("salary") as "max_salary").show()
employeeDF.select($"id" ,$"age",$"name",$"salary",$"dept").groupBy("dept").agg(("salary","max"),("age","avg"),("salary","sum"))//.agg("salary"->"max","age"->"avg","salary"->"sum").show()
employeeDF.select($"id" ,$"age",$"name",$"salary",$"dept").groupBy("dept").agg(Map("age"->"avg","salary"->"sum")) //注意key出现重复会导致覆盖.show()

√pivot

Pivots rows into columns.

stuno name course score
-----------------------
001 zhangsan math 80
001 zhangsan chinese 70
001 zhangsan english 70
001 zhangsan history 90
002 wangw math 90
002 wangw chinese 80
case class CourseScore(stuno:String,name:String,course:String,score:Double)

val courseRDD:RDD[CourseScore] = spark.sparkContext.textFile("file:///D:/demo/students")
  .map(line => line.split("\\s+"))
  .map(ts => CourseScore(ts(0), ts(1), ts(2), ts(3).toDouble))
// collect the distinct course names used as the pivot values (the values must not contain duplicates)
val courses = spark.sparkContext.textFile("file:///D:/demo/students")
  .map(line => line.split("\\s+")(2))
  .distinct()
  .collect()
val courseDF = courseRDD.toDF()
import org.apache.spark.sql.functions._
courseDF.groupBy("stuno","name")
  .pivot($"course", courses) // the column to pivot on plus its candidate values
  .agg(sum($"score"))
  .show()
+-----+--------+----+-------+-------+-------+
|stuno|    name|math|history|english|chinese|
+-----+--------+----+-------+-------+-------+
|  001|zhangsan|80.0|   90.0|   70.0|   70.0|
|  002|   wangw|90.0|   null|   null|   80.0|
+-----+--------+----+-------+-------+-------+

na

Handles null values; they can either be dropped or filled.

courseDF.groupBy("stuno","name").pivot($"course",courses).agg(sum($"score")).na.drop(5)//保留至少5个字段不为null的记录.show()
+-----+--------+----+-------+-------+-------+
|stuno|    name|math|history|english|chinese|
+-----+--------+----+-------+-------+-------+
|  001|zhangsan|80.0|   90.0|   70.0|   70.0|
+-----+--------+----+-------+-------+-------+
courseDF.groupBy("stuno","name").pivot($"course",courses).agg(sum($"score")).na.fill(Map("history"-> -1,"english" -> 0))//将一些null值填充默认值.show()
+-----+--------+----+-------+-------+-------+
|stuno|    name|math|history|english|chinese|
+-----+--------+----+-------+-------+-------+
|  001|zhangsan|80.0|   90.0|   70.0|   70.0|
|  002|   wangw|90.0|   -1.0|    0.0|   80.0|
+-----+--------+----+-------+-------+-------+

√cube

A multi-dimensional analysis tool, commonly used for data analysis; more flexible than groupBy.

height weight eq iq
80 23 72 85
80 23 70 85
80 25 70 85
82 30 80 70
case class KindScore(height:Int,weight:Int,eq:Double,iq:Double)

val kindScoreRDD:RDD[KindScore] = spark.sparkContext.textFile("file:///D:/demo/kinds")
  .map(line => line.split("\\s+"))
  .map(ts => KindScore(ts(0).toInt, ts(1).toInt, ts(2).toDouble, ts(3).toDouble))
import org.apache.spark.sql.functions._
kindScoreRDD.toDF()
  .cube("height","weight")
  .agg(avg($"eq") as "EQ", avg($"iq") as "IQ")
  .show()
+------+------+-----------------+----+
|height|weight|               EQ|  IQ|
+------+------+-----------------+----+
|  null|    23|             70.0|85.0|
|  null|  null|73.33333333333333|80.0|
|  null|    30|             80.0|70.0|
|    80|  null|             70.0|85.0|
|  null|    25|             70.0|85.0|
|    80|    25|             70.0|85.0|
|    82|    30|             80.0|70.0|
|    82|  null|             80.0|70.0|
|    80|    23|             70.0|85.0|
+------+------+-----------------+----+

√over

Used together with aggregate operators to perform windowed (per-partition) analysis.

id name job salary dept
------------------------
1 zhangsan sale 8000.00 1
2 wangw sale 8500.00    1
3 lis clicker 6000.00   2
4 zhaol manager 10000.00 1
val empRDD:RDD[(Int,String,String,Double,Int)] = spark.sparkContext.textFile("file:///D:/demo/employee")
  .map(_.split("\\s+"))
  .map(ts => (ts(0).toInt, ts(1), ts(2), ts(3).toDouble, ts(4).toInt))
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy("dept") // all records in the same dept as the current record
  .orderBy($"salary" desc)         // sort by salary descending; negative offsets <-- currentRow(0) --> positive offsets
  .rowsBetween(Window.unboundedPreceding, Window.currentRow) // frame: from the start of the partition to the current row
val empDF = empRDD.toDF("id","name","job","salary","dept")
empDF.select("id","name","salary","dept")
  .withColumn("rank", count($"salary") over(w))
  .withColumn("avg_salary", avg($"salary") over(Window.partitionBy("dept")))
  .withColumn("max_salary", max($"salary") over(Window.partitionBy("dept")))
  .withColumn("min_salary", min($"salary") over(Window.partitionBy("dept")))
  .show()
spark.stop()
+---+--------+-------+----+----+-----------------+----------+----------+
| id|    name| salary|dept|rank|       avg_salary|max_salary|min_salary|
+---+--------+-------+----+----+-----------------+----------+----------+
|  4|   zhaol|10000.0|   1|   1|8833.333333333334|   10000.0|    8000.0|
|  2|   wangw| 8500.0|   1|   2|8833.333333333334|   10000.0|    8000.0|
|  1|zhangsan| 8000.0|   1|   3|8833.333333333334|   10000.0|    8000.0|
|  3|     lis| 6000.0|   2|   1|           6000.0|    6000.0|    6000.0|
+---+--------+-------+----+----+-----------------+----------+----------+

join

Joins tables.

val empDF=  empRDD.toDF("id","name","job","salary","dept").select("id","name","salary","dept")
val deptDF=List((1,"销售部"),(2,"运营部")).toDF("deptid","deptname")
empDF.join(deptDF,$"dept" === $"deptid","left_outer")
//.na.drop(List("deptname"))
.show()
+---+--------+-------+----+------+--------+
| id|    name| salary|dept|deptid|deptname|
+---+--------+-------+----+------+--------+
|  1|zhangsan| 8000.0|   1|     1|  销售部|
|  2|   wangw| 8500.0|   1|     1|  销售部|
|  3|     lis| 6000.0|   2|     2|  运营部|
|  4|   zhaol|10000.0|   1|     1|  销售部|
|  5|    win7|10000.0|   3|  null|    null|
+---+--------+-------+----+------+--------+

Alternatively, if the join column has the same name in both tables, you can write:

val empDF = empRDD.toDF("id","name","job","salary","dept").select("id","name","salary","dept")
val deptDF = List((1,"销售部"),(2,"运营部")).toDF("dept","deptname")
empDF.join(deptDF,"dept")
//.na.drop(List("deptname"))
.show()
+----+---+--------+-------+--------+
|dept| id|    name| salary|deptname|
+----+---+--------+-------+--------+
|   1|  1|zhangsan| 8000.0|  销售部|
|   1|  2|   wangw| 8500.0|  销售部|
|   2|  3|     lis| 6000.0|  运营部|
|   1|  4|   zhaol|10000.0|  销售部|
+----+---+--------+-------+--------+

drop

Removes the specified columns.

val empDF=  empRDD.toDF("id","name","job","salary","dept").select("id","name","salary","dept")
val deptDF=List((1,"销售部"),(2,"运营部")).toDF("deptid","deptname")
empDF.join(deptDF,$"dept" === $"deptid","left_outer").drop("deptid").show()
+---+--------+-------+----+--------+
| id|    name| salary|dept|deptname|
+---+--------+-------+----+--------+
|  1|zhangsan| 8000.0|   1|  销售部|
|  2|   wangw| 8500.0|   1|  销售部|
|  3|     lis| 6000.0|   2|  运营部|
|  4|   zhaol|10000.0|   1|  销售部|
|  5|    win7|10000.0|   3|    null|
+---+--------+-------+----+--------+

√dropDuplicates

Removes duplicate records.

val empDF=  empRDD.toDF("id","name","job","salary","dept").select("id","name","salary","dept")
val deptDF=List((1,"销售部"),(2,"运营部")).toDF("deptid","deptname")
empDF.join(deptDF,$"dept" === $"deptid","left_outer")
.drop("deptid")
.dropDuplicates("deptname")
.show()
+---+--------+-------+----+--------+
| id|    name| salary|dept|deptname|
+---+--------+-------+----+--------+
|  1|zhangsan| 8000.0|   1|  销售部|
|  5|    win7|10000.0|   3|    null|
|  3|     lis| 6000.0|   2|  运营部|
+---+--------+-------+----+--------+

orderBy

Sorts the result.

val empDF=  empRDD.toDF("id","name","job","salary","dept").select("id","name","salary","dept")
val deptDF=List((1,"销售部"),(2,"运营部")).toDF("deptid","deptname")
empDF.join(deptDF,$"dept" === $"deptid","left_outer").orderBy($"dept" desc,$"salary" desc).show()
+---+--------+-------+----+------+--------+
| id|    name| salary|dept|deptid|deptname|
+---+--------+-------+----+------+--------+
|  5|    win7|10000.0|   3|  null|    null|
|  3|     lis| 6000.0|   2|     2|  运营部|
|  4|   zhaol|10000.0|   1|     1|  销售部|
|  2|   wangw| 8500.0|   1|     1|  销售部|
|  1|zhangsan| 8000.0|   1|     1|  销售部|
+---+--------+-------+----+------+--------+

limit

Limits the result; equivalent to take(n), returning the first n records.

val empDF=  empRDD.toDF("id","name","job","salary","dept").select("id","name","salary","dept")
val deptDF=List((1,"销售部"),(2,"运营部")).toDF("deptid","deptname")
empDF.join(deptDF,$"dept" === $"deptid","left_outer").orderBy($"dept" desc,$"salary" desc).limit(3) //获取前3.show()
+---+-----+-------+----+------+--------+
| id| name| salary|dept|deptid|deptname|
+---+-----+-------+----+------+--------+
|  5| win7|10000.0|   3|  null|    null|
|  3|  lis| 6000.0|   2|     2|  运营部|
|  4|zhaol|10000.0|   1|     1|  销售部|
+---+-----+-------+----+------+--------+

Dataset/DataFrame SQL

+---+-------+-----+---+------+----+
| id|   name|  sex|age|salary|dept|
+---+-------+-----+---+------+----+
|  1|Michael|false| 29|2000.0| 001|
|  2|   Andy| true| 30|5000.0| 001|
|  3| Justin| true| 19|1000.0| 002|
|  4|  Kaine|false| 20|5000.0| 003|
|  5|   Lisa|false| 19|1000.0| 002|
+---+-------+-----+---+------+----+

+------+--------+
|deptid|deptname|
+------+--------+
|   001|  销售部|
|   002|  运营部|
|   003|  行政部|
+------+--------+

+-----+--------+-------+-----+
|stuno|    name| course|score|
+-----+--------+-------+-----+
|  001|zhangsan|   math| 80.0|
|  001|zhangsan|chinese| 70.0|
|  001|zhangsan|english| 70.0|
|  001|zhangsan|history| 90.0|
|  002|   wangw|   math| 90.0|
|  002|   wangw|chinese| 80.0|
+-----+--------+-------+-----+
val spark = SparkSession.builder().appName("sparksql").master("local[6]").getOrCreate()import spark.implicits._val employeeRDD:RDD[Employee] = spark.sparkContext.textFile("file:///D:/demo/users")
.map(line => line.split(","))
.map(ts => Employee(ts(0).toInt, ts(1), ts(2).toBoolean, ts(3).toInt, ts(4).toDouble, ts(5)))
val courseRDD:RDD[(String,String,String,Double)] = spark.sparkContext.textFile("file:///D:/demo/students").map(line => line.split("\\s+")).map(ts => (ts(0), ts(1), ts(2), ts(3).toDouble))val employeeDF = employeeRDD.toDF()
val courseDF= courseRDD.toDF("stuno" ,"name" ,"course" ,"score")
val deptDF=List(("001","销售部"),("002","运营部"),("003","行政部")).toDF("deptid","deptname")employeeDF.createOrReplaceTempView("t_employee")
deptDF.createOrReplaceTempView("t_dept")
courseDF.createOrReplaceTempView("t_course")val sql="" //研究重点
spark.sql(sql).show()spark.stop()

Single-Table Query

select * from t_employee
+---+-------+-----+---+------+----+
| id|   name|  sex|age|salary|dept|
+---+-------+-----+---+------+----+
|  1|Michael|false| 29|2000.0| 001|
|  2|   Andy| true| 30|5000.0| 001|
|  3| Justin| true| 19|1000.0| 002|
|  4|  Kaine|false| 20|5000.0| 003|
|  5|   Lisa|false| 19|1000.0| 002|
+---+-------+-----+---+------+----+

Filtering with a WHERE Clause

select * from t_employee where name = 'Michael' or age >=29
+---+-------+-----+---+------+----+
| id|   name|  sex|age|salary|dept|
+---+-------+-----+---+------+----+
|  1|Michael|false| 29|2000.0| 001|
|  2|   Andy| true| 30|5000.0| 001|
+---+-------+-----+---+------+----+

Fuzzy (LIKE) Query

select * from t_employee where name like '%ich%'
+---+-------+-----+---+------+----+
| id|   name|  sex|age|salary|dept|
+---+-------+-----+---+------+----+
|  1|Michael|false| 29|2000.0| 001|
+---+-------+-----+---+------+----+

Grouped Aggregation

select dept,avg(salary) as avg_salary from t_employee group by dept
+----+----------+
|dept|avg_salary|
+----+----------+
| 003|    5000.0|
| 001|    3500.0|
| 002|    1000.0|
+----+----------+

Group Filtering (HAVING)

select dept,avg(salary) as avg_salary from t_employee group by dept having avg_salary > 3500
+----+----------+
|dept|avg_salary|
+----+----------+
| 003|    5000.0|
+----+----------+

case when

select stuno,name,
       max(case course when 'math' then score else 0 end) math,
       max(case course when 'english' then score else 0 end) english,
       max(case course when 'chinese' then score else 0 end) chinese,
       max(case course when 'history' then score else 0 end) history
from t_course
group by stuno,name
+-----+--------+----+-------+-------+-------+
|stuno|    name|math|english|chinese|history|
+-----+--------+----+-------+-------+-------+
|  001|zhangsan|80.0|   70.0|   70.0|   90.0|
|  002|   wangw|90.0|    0.0|   80.0|    0.0|
+-----+--------+----+-------+-------+-------+

pivot

select * from t_course pivot(max(score) for course in('math','chinese','english','history'))
+-----+--------+----+-------+-------+-------+
|stuno|    name|math|chinese|english|history|
+-----+--------+----+-------+-------+-------+
|  001|zhangsan|80.0|   70.0|   70.0|   90.0|
|  002|   wangw|90.0|   80.0|   null|   null|
+-----+--------+----+-------+-------+-------+

cube

select height,weight,avg(eq),avg(iq) from t_kindscore  group by height,weight with cube

or

select height,weight,avg(eq),avg(iq) from t_kindscore group by cube(height,weight)
+------+------+-----------------+-------+
|height|weight|          avg(eq)|avg(iq)|
+------+------+-----------------+-------+
|  null|    23|             70.0|   85.0|
|  null|  null|73.33333333333333|   80.0|
|  null|    30|             80.0|   70.0|
|    80|  null|             70.0|   85.0|
|  null|    25|             70.0|   85.0|
|    80|    25|             70.0|   85.0|
|    82|    30|             80.0|   70.0|
|    82|  null|             80.0|   70.0|
|    80|    23|             70.0|   85.0|
+------+------+-----------------+-------+

Table Join

select e.*,d.* from t_employee e left join t_dept d on d.deptid = e.dept
+---+-------+-----+---+------+----+------+--------+
| id|   name|  sex|age|salary|dept|deptid|deptname|
+---+-------+-----+---+------+----+------+--------+
|  1|Michael|false| 29|2000.0| 001|   001|  销售部|
|  2|   Andy| true| 30|5000.0| 001|   001|  销售部|
|  3| Justin| true| 19|1000.0| 002|   002|  运营部|
|  4|  Kaine|false| 20|5000.0| 003|   003|  行政部|
|  5|   Lisa|false| 19|1000.0| 002|   002|  运营部|
+---+-------+-----+---+------+----+------+--------+

Subquery

Query each employee's information together with the number of employees in that employee's department.

select id,name,salary,dept,(select count(*) from t_employee e2 where e1.dept=e2.dept) total from t_employee e1
+---+-------+------+----+-----+
| id|   name|salary|dept|total|
+---+-------+------+----+-----+
|  4|  Kaine|5000.0| 003|    1|
|  1|Michael|2000.0| 001|    2|
|  2|   Andy|5000.0| 001|    2|
|  3| Justin|1000.0| 002|    2|
|  5|   Lisa|1000.0| 002|    2|
+---+-------+------+----+-----+

Window Functions

  • Rank employees' salaries within each department
select id,name,salary,dept,
count(*) over(partition by dept  order by salary desc rows between unbounded preceding and current row) rank
from t_employee
order by dept desc
+---+-------+------+----+-----+
| id|   name|salary|dept| rank|
+---+-------+------+----+-----+
|  4|  Kaine|5000.0| 003|    1|
|  3| Justin|1000.0| 002|    1|
|  5|   Lisa|1000.0| 002|    2|
|  2|   Andy|5000.0| 001|    1|
|  1|Michael|2000.0| 001|    2|
|  6|   Lily|2000.0| 001|    3|
|  7|   Lucy|1000.0| 001|    4|
+---+-------+------+----+-----+

If several employees share the same salary, this row-count based ranking is incorrect.

  • Rank employees' salaries within each department with rank(): ties share a rank and the ranking is not continuous (gaps appear)
select id,name,salary,dept,
rank() over(partition by dept  order by salary desc) count
from t_employee
order by dept desc
+---+-------+------+----+-----+
| id|   name|salary|dept|count|
+---+-------+------+----+-----+
|  4|  Kaine|5000.0| 003|    1|
|  3| Justin|1000.0| 002|    1|
|  5|   Lisa|1000.0| 002|    1|
|  2|   Andy|5000.0| 001|    1|
|  6|   Lily|2000.0| 001|    2|
|  1|Michael|2000.0| 001|    2|
|  7|   Lucy|1000.0| 001|    4|
+---+-------+------+----+-----+
  • Rank employees' salaries within each department with dense_rank(): ties share a rank and the ranking stays continuous (no gaps)
select id,name,salary,dept,
dense_rank() over(partition by dept order by salary desc) count
from t_employee
order by dept desc
+---+-------+------+----+-----+
| id|   name|salary|dept|count|
+---+-------+------+----+-----+
|  4|  Kaine|5000.0| 003|    1|
|  3| Justin|1000.0| 002|    1|
|  5|   Lisa|1000.0| 002|    1|
|  2|   Andy|5000.0| 001|    1|
|  6|   Lily|2000.0| 001|    2|
|  1|Michael|2000.0| 001|    2|
|  7|   Lucy|1000.0| 001|    3|
+---+-------+------+----+-----+

Note: the equivalent DataFrame API version

import org.apache.spark.sql.functions._
val w=Window.partitionBy("dept")
.orderBy($"salary" desc)employeeDF.select("id","name","salary","dept")
.withColumn("rank",dense_rank() over(w))
.orderBy($"dept" desc)
.show()

Paged Query

select id,name,salary,dept,
dense_rank() over(partition by dept  order by salary desc) count
from t_employee
order by dept desc  limit 3
+---+------+------+----+-----+
| id|  name|salary|dept|count|
+---+------+------+----+-----+
|  4| Kaine|5000.0| 003|    1|
|  3|Justin|1000.0| 002|    1|
|  5|  Lisa|1000.0| 002|    1|
+---+------+------+----+-----+

User-Defined Functions

√ Single-Row Functions

spark.udf.register("annual_salary",(salary:Double,age:Int)=>{if(age>25){salary*12+5000}else{salary*12}
})
select id,name,salary,annual_salary(salary,age) annual_salary,age from t_employee
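The registered function can also be called from the DataFrame API through callUDF (a sketch, assuming the employeeDF built in the DataFrame section above):

import org.apache.spark.sql.functions.callUDF

employeeDF
  .select($"id", $"name", $"salary", callUDF("annual_salary", $"salary", $"age") as "annual_salary", $"age")
  .show()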

Untyped User-Defined Aggregate Functions (for reference)

  • Extend the UserDefinedAggregateFunction abstract class; for example, a custom aggregate that computes an average:
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, StructType}

object MyAverage extends UserDefinedAggregateFunction {
  // schema of the input values; the field name does not matter, but the type must be correct
  override def inputSchema: StructType = {
    new StructType().add("salary", DoubleType)
  }
  // schema of the intermediate buffer: running total and counter
  override def bufferSchema: StructType = {
    new StructType().add("total", DoubleType).add("count", IntegerType)
  }
  // type of the final aggregated value
  override def dataType: DataType = DoubleType
  // whether the function always returns the same result for the same input; usually true
  override def deterministic: Boolean = true
  // initialize the buffer
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0.0 // total amount
    buffer(1) = 0   // count
  }
  // fold one input record into the buffer
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      val salary = input.getAs[Double](0)
      buffer(0) = buffer.getDouble(0) + salary
      buffer(1) = buffer.getInt(1) + 1
    }
  }
  // merge two buffers; note the result must be written back into buffer1
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0) // total amount
    buffer1(1) = buffer1.getInt(1) + buffer2.getInt(1)       // total count
  }
  // compute the final result
  override def evaluate(buffer: Row): Any = {
    buffer.getDouble(0) / buffer.getInt(1)
  }
}
  • Register the user-defined aggregate function
spark.udf.register("myavg",MyAverage)
  • Use the aggregate
select dept,myavg(salary) from t_employee group by dept
+----+------------------+
|dept|myaverage$(salary)|
+----+------------------+
| 003|            5000.0|
| 001|            2500.0|
| 002|            1000.0|
+----+------------------+
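The same aggregate can also be used from the DataFrame API, since a UserDefinedAggregateFunction can be applied to a Column directly (a sketch, assuming the employeeDF from the sections above):

employeeDF.groupBy("dept")
  .agg(MyAverage($"salary") as "avg_salary")
  .show()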

Data Import/Export

File Systems

  • JSON

    • Write
    spark.sql(sql).write.json("file:///D:/write/json")
    spark.sql(sql)
    .write
    .format("json")
    .save("file:///D:/write/json1")
    
    • Read
    spark.read.format("json").load("file:///D:\\demo\\json")
    .show()
    
  • CSV

    • Write
     spark.sql(sql).write.option("sep", ",") //指定分隔符.option("header", "true") //是否添加表头.format("csv").save("file:///D:/write/csv1")
    
    • Read
    spark.read.option("sep", ",").option("header", "true").format("csv").load("file:///D:/demo/csv").show()
    

Relational Databases (RDBMS)

  • JDBC

    • Write √
    spark.sql(sql).write.format("jdbc").option("url", "jdbc:mysql://localhost:3306/test").option("dbtable", "t_emp").option("user", "root").option("password", "root").mode(SaveMode.Overwrite).save()
    
    • Read (for reference)
    spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/test")//.option("dbtable", "t_emp").option("query","select id,name from t_emp") //不可以和dbtable同时出现.option("user", "root").option("password", "root").load().show()
    

    To avoid tying up database connections, when the data to be processed lives in a database it is usually better to export it to CSV with an external tool, upload the files to HDFS, and then process them with Spark, as sketched below.
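    A sketch of that workflow: read the exported CSV back from HDFS and register it for SQL (the HDFS path, delimiter, and header option below are assumptions about how the export was produced):

    val empFromHdfs = spark.read
      .option("sep", ",")        // assumed delimiter of the exported file
      .option("header", "true")  // assumed: the export includes a header row
      .format("csv")
      .load("hdfs:///data/t_emp_export") // hypothetical HDFS path

    empFromHdfs.createOrReplaceTempView("t_emp_export")
    spark.sql("select count(*) from t_emp_export").show()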

Arbitrary External Systems

If you need to write data to a third-party system, write it out with foreachPartition.

spark.read
.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/test")
.option("dbtable", "t_emp")
//.option("query","select id,name from t_emp") //不可以和dbtable同时出现
.option("user", "root")
.option("password", "root")
.load()
.foreachPartition(rows => {
  rows.foreach(row => {
    val id = row.getAs[Int]("id")
    val name = row.getAs[String]("name")
    val salary = row.getAs[Double]("salary")
    println(id + " " + name + " " + salary) // custom sink: write the results to the external system here
  })
})
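The point of foreachPartition is that a connection to the external system can be opened once per partition rather than once per row. A sketch under that assumption (MyCustomSink and its open/write/close methods are hypothetical placeholders for your own client; df stands for the DataFrame loaded above):

df.foreachPartition(rows => {
  val sink = MyCustomSink.open() // hypothetical: one connection per partition
  try {
    rows.foreach(row => {
      sink.write(row.getAs[Int]("id"), row.getAs[String]("name")) // hypothetical write call
    })
  } finally {
    sink.close() // release the connection once the partition is done
  }
})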
