SparkSQL

SparkSQL概述

Spark SQL是Spark用于结构化数据(structured data)处理的Spark模块。

SparkSQL编程

DataFrame编程

sparkSession

Spark Core中,如果想要执行应用程序,需要首先构建上下文环境对象SparkContext,Spark SQL其实可以理解为对Spark Core的一种封装,不仅仅在模型上进行了封装,上下文环境对象也进行了封装。在老的版本中,SparkSQL提供两种SQL查询起始点:一个叫SQLContext,用于Spark自己提供的SQL查询;一个叫HiveContext,用于连接Hive的查询。
SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContex和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了SparkContext,所以计算实际上是由sparkContext完成的。当我们使用 spark-shell 的时候, spark框架会自动的创建一个名称叫做spark的SparkSession对象, 就像我们以前可以自动获取到一个sc来表示SparkContext对象一样

DataFrame

Spark SQL的DataFrame API 允许我们使用 DataFrame 而不用必须去注册临时表或者生成 SQL 表达式。DataFrame API 既有 transformation操作也有action操作。

创建DataFrame

在Spark SQL中SparkSession是创建DataFrame和执行SQL的入口,创建DataFrame有三种方式:通过Spark的数据源进行创建;从一个存在的RDD进行转换;还可以从Hive Table进行查询返回。

1)从Spark数据源进行创建
# 查看Spark支持创建文件的数据源格式
scala> spark.read.
csv   format   jdbc   json   load   option   options   orc   parquet   schema   table   text   textFile# 在spark的bin/data目录(windows环境)中创建user.json文件
{"username":"zhangsan","age":20}# 读取json文件创建DataFrame
scala> val df = spark.read.json("data/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]
注意:如果从内存中获取数据,spark可以知道数据类型具体是什么。如果是数字,默认作为Int处理;但是从文件中读取的数字,不能确定是什么类型,所以用bigint接收,可以和Long类型转换,但是和Int不能进行转换# 展示结果:show
+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
+---+--------+
2)从RDD进行转换
在后续章节中讨论
3)从Hive Table进行查询返回

SQL语法

SQL语法风格是指我们查询数据的时候使用SQL语句来查询,这种风格的查询必须要有临时视图或者全局视图来辅助

1)读取JSON文件创建DataFrame
scala> val df = spark.read.json("data/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]2)对DataFrame创建一个临时表(视图)
scala> df.createOrReplaceTempView("people")3)通过SQL语句实现查询全表
scala> val sqlDF = spark.sql("SELECT * FROM people")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]4)结果展示
scala> sqlDF.show
+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
| 30|     lisi|
| 40|   wangwu|
+---+--------+
注意:普通临时表是Session范围内的,如果想应用范围内有效,可以使用全局临时表。使用全局临时表时需要全路径访问,如:global_temp.people5)对于DataFrame创建一个全局表
scala> df.createGlobalTempView("people")6)通过SQL语句实现查询全表
scala> spark.sql("SELECT * FROM global_temp.people").show()
+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
| 30|     lisi|
| 40|   wangwu|
+---+--------+scala> spark.newSession().sql("SELECT * FROM global_temp.people").show()
+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
| 30|     lisi|
| 40|   wangwu|
+---+--------+

DSL语法

DataFrame提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据。可以在 Scala, Java, Python 和 R 中使用 DSL,使用 DSL 语法风格不必去创建临时视图了

1)创建一个DataFrame
scala> val df = spark.read.json("data/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]2)查看DataFrame的Schema信息
scala> df.printSchema
root|-- age: Long (nullable = true)|-- username: string (nullable = true)3)只查看"username"列数据,
scala> df.select("username").show()
+--------+
|username|
+--------+
|zhangsan|
|     lisi|
|   wangwu|
+--------+4)查看"username"列数据以及"age+1"数据
注意:涉及到运算的时候, 每列都必须使用$, 或者采用引号表达式:单引号+字段名
scala> df.select($"username",$"age" + 1).show
scala> df.select('username, 'age + 1).show()
scala> df.select('username, 'age + 1 as "newage").show()+--------+---------+
|username|(age + 1)|
+--------+---------+
|zhangsan|        21|
|     lisi|        31|
|  wangwu|         41|
+--------+---------+5)查看"age"大于"30"的数据
scala> df.filter($"age">30).show
+---+---------+
|age| username|
+---+---------+
| 40|    wangwu|
+---+---------+6)按照"age"分组,查看数据条数
scala> df.groupBy("age").count.show
+---+-----+
|age|count|
+---+-----+
| 20|    1|
| 30|    1|
| 40|    1|
+---+-----+

DataSet编程

DataSet是具有强类型的数据集合,需要提供对应的类型信息。

创建DataSet

1)使用样例类序列创建DataSet
scala> case class Person(name: String, age: Long)
defined class Personscala> val caseClassDS = Seq(Person("zhangsan",2)).toDS()caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: Long]scala> caseClassDS.show
+---------+---+
|     name|age|
+---------+---+
| zhangsan|  2|
+---------+---+
2)使用基本类型的序列创建DataSet
scala> val ds = Seq(1,2,3,4,5).toDS
ds: org.apache.spark.sql.Dataset[Int] = [value: int]scala> ds.show
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
|    5|
+-----+

RDD转换为DataSet

SparkSQL能够自动将包含有case类的RDD转换成DataSet,case类定义了table的结构,case类属性通过反射变成了表的列名。Case类可以包含诸如Seq或者Array等复杂的结构。

scala> case class User(name:String, age:Int)
defined class Userscala> sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDS
res11: org.apache.spark.sql.Dataset[User] = [name: string, age: int]

DataSet转换为RDD

DataSet其实也是对RDD的封装,所以可以直接获取内部的RDD

scala> case class User(name:String, age:Int)
defined class Userscala> sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDS
res11: org.apache.spark.sql.Dataset[User] = [name: string, age: int]scala> val rdd = res11.rdd
rdd: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[51] at rdd at <console>:25scala> rdd.collect
res12: Array[User] = Array(User(zhangsan,30), User(lisi,49))

DataFrame和 DataSet转换

DataFrame其实是DataSet的特例,所以它们之间是可以互相转换的。

# DataFrame转换为DataSet   as[T]
scala> case class User(name:String, age:Int)
defined class Userscala> val df = sc.makeRDD(List(("zhangsan",30), ("lisi",49))).toDF("name","age")
df: org.apache.spark.sql.DataFrame = [name: string, age: int]scala> val ds = df.as[User]
ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]# DataSet转换为DataFrame  toDF
scala> val ds = df.as[User]
ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]scala> val df = ds.toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: int]

IDEA开发SparkSQL

基本使用

package com.pihao.sqlimport org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}object SparkSQL02 {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL")//SparkSession//Scala中构造方法前端加上private关键字,表示构造方法私有//scala中构造方法分为主构造方法与辅助构造方法//只有主构造方法才能完成类的初始化,辅助构造方法必须显示或者间接调用主构造方法//SparkSession无法直接构建对象,需要通过Builder构建  val spark: SparkSession = SparkSession .builder().config(conf).getOrCreate()//简单操作//注意:spark读取json文件中的方式是按行读取,每一行都是jsonval frame: DataFrame = spark.read.json("data/user.json")//查询frame.show//将DF转为试图frame.createTempView("user")//采用sql查询spark.sql("select avg(age) from user").show//采用DSL语法frame.select("username").show//使用$引用// 进行操作时,必须使用隐式转换,导入对象的内容,必须使用var修饰import spark.implicits._frame.select($"username",$"age" + 1).showspark.stop()}}

rdd,df,ds相互转换

package com.pihao.sqlimport org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}object SparkSQL03 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL")val spark: SparkSession = SparkSession .builder().config(conf).getOrCreate()import spark.implicits._val rdd: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(List((1, "zhangsan", 20),(1, "list", 20),(1, "wangwu", 20)))// RDD => DataFrame   增加结构// DataFrame就是一个DataSet[Row],但是类型时固定的: Rowval df: DataFrame = rdd.toDF("id","name","age")// DataFrame => DataSet  增加类型val ds: Dataset[User] = df.as[User]//DataSet => DataFrameval frame: DataFrame = ds.toDF()//DataFrame => rddval newRdd: RDD[Row] = frame.rdd// RDD => DataSet 增加结构与类型val newDS: Dataset[User] = rdd.map {case (id, name, age) => {User(id, name, age)}}.toDS()// DataSet => RDDval rdd1: RDD[User] = newDS.rddspark.stop()}case class User(id:Int,name:String,age:Int)}

用户自定义函数

用户可以通过spark.udf功能添加自定义函数,实现自定义功能。

UDF

package com.pihao.sqlimport org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}/*** 自定义函数*/
object SparkSQL04_UDF {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL")val spark: SparkSession = SparkSession .builder().config(conf).getOrCreate()import spark.implicits._val frame: DataFrame = spark.read.json("data/user.json")frame.createTempView("user")//自定义函数//可以将udf函数理解为map功能函数spark.udf.register("PrefixNmae",(name:String)=>{"用户姓名: "+name})spark.sql("select PrefixNmae(username) from user").show}
}

用户自定义聚合函数

package com.pihao.sqlimport org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{DataFrame, Encoder, Encoders, SparkSession, functions}/*** 自定义聚合函数*/
object SparkSQL05_UDAF {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL")val spark: SparkSession = SparkSession .builder().config(conf).getOrCreate()import spark.implicits._val frame: DataFrame = spark.read.json("data/user.json")frame.createTempView("user")//UDF不需要聚合数据,所以轮询的每一条数据处理完毕就结束了//UDAF需要聚合数据,所以轮询时,需要将数据处理的状态保存起来,保存在缓冲区//sql:弱类型操作,DataFrame//Aggregator:强类型操作 DataSetspark.udf.register("ageAvg",functions.udaf(new AgeAvgUDAF))spark.sql("select ageAvg(age) from user").showspark.stop()}//1.需要继承Aggregator// IN: 聚合函数的输入// BUFF: 聚合函数中用于计算的缓冲区类型// OUT: 聚合函数的输出//2.重写方法//逻辑(4)//编码(2)case class AvgBuff(var total:Long, var count:Long)class AgeAvgUDAF extends Aggregator[Long,AvgBuff,Long]{//返回初始化的缓冲区对象override def zero: AvgBuff = {AvgBuff(0,0)}//聚合:用当前的数据更新缓冲区override def reduce(buff: AvgBuff, age: Long): AvgBuff = {buff.total = buff.total + agebuff.count = buff.count + 1buff}//合并:分布式,多个缓冲区数据合并override def merge(b1: AvgBuff, b2: AvgBuff): AvgBuff = {b1.total = b1.total + b2.totalb1.count = b1.count + b2.countb1}//完成,计算聚合函数的值override def finish(buff: AvgBuff): Long = {buff.total / buff.count}override def bufferEncoder: Encoder[AvgBuff] = Encoders.productoverride def outputEncoder: Encoder[Long] = Encoders.scalaLong}}

SparkSQL数据的加载与保存

通用的加载和保存方式

SparkSQL提供了通用的保存数据和数据加载的方式。这里的通用指的是使用相同的API,根据不同的参数读取和保存不同格式的数据,SparkSQL默认读取和保存的文件格式为parquet

加载数据

# park.read.load 是加载数据的通用方法
scala> spark.read.csv   format   jdbc   json   load   option   options   orc   parquet   schema   table   text   textFile# 如果读取不同格式的数据,可以对不同的数据格式进行设定
scala> spark.read.format("…")[.option("…")].load("…")- format("…"):指定加载的数据类型,包"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。
- load("…"):在"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"格式下需要传入加载数据的路径。
- option("…"):在"jdbc"格式下需要传入JDBC相应参数,url、user、password和dbtable# 我们前面都是使用read API 先把文件加载到 DataFrame然后再查询,其实,我们也可以直接在文件上进行查询:  文件格式.`文件路径`scala>spark.sql("select * from json.`/opt/module/data/user.json`").show

保存数据

# df.write.save 是保存数据的通用方法
scala>df.write.
csv  jdbc   json  orc   parquet textFile… …# 如果保存不同格式的数据,可以对不同的数据格式进行设定
scala>df.write.format("…")[.option("…")].save("…")format("…"):指定保存的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。
save ("…"):在"csv"、"orc"、"parquet"和"textFile"格式下需要传入保存数据的路径。
option("…"):在"jdbc"格式下需要传入JDBC相应参数,url、user、password和dbtable保存操作可以使用 SaveMode, 用来指明如何处理数据,使用mode()方法来设置。
有一点很重要: 这些 SaveMode 都是没有加锁的, 也不是原子操作。
SaveMode是一个枚举类,其中的常量包括:
Scala/Java Any Language Meaning
SaveMode.ErrorIfExists(default) “error”(default) 如果文件已经存在则抛出异常
SaveMode.Append “append” 如果文件已经存在则追加
SaveMode.Overwrite “overwrite” 如果文件已经存在则覆盖
SaveMode.Ignore “ignore” 如果文件已经存在则忽略

Parqut

Spark SQL的默认数据源为Parquet格式。Parquet是一种能够有效存储嵌套数据的列式存储格式。

数据源为Parquet文件时,Spark SQL可以方便的执行所有的操作,不需要使用format。修改配置项spark.sql.sources.default,可修改默认数据源格式。

Json

Spark SQL 能够自动推测JSON数据集的结构,并将它加载为一个Dataset[Row]. 可以通过SparkSession.read.json()去加载JSON 文件。

注意:Spark读取的JSON文件不是传统的JSON文件,每一行都应该是一个JSON串。格式如下:
{"name":"Michael"}
{"name":"Andy", "age":30}
[{"name":"Justin", "age":19},{"name":"Justin", "age":19}]1)导入隐式转换
import spark.implicits._2)加载JSON文件
val path = "/opt/module/spark-local/people.json"
val peopleDF = spark.read.json(path) # 可以直接展示出来3)创建临时表
peopleDF.createOrReplaceTempView("people")4)数据查询
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
+------+
|  name|
+------+
|Justin|
+------+

CSV

Spark SQL可以配置CSV文件的列表信息,读取CSV文件,CSV文件的第一行设置为数据列

spark.read.format("csv").option("sep", ";").option("inferSchema", "true").option("header", "true").load("data/user.csv")

MySQL

Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame,通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中。如果使用spark-shell操作,可在启动shell时指定相关的数据库驱动路径或者将相关的数据库驱动放到spark的类路径下。

读数据

spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop102:3306/gmall").option("driver", "com.mysql.jdbc.Driver").option("user", "root").option("password", "123123").option("dbtable", "sku_info").load().show

写数据

//ds就是dataSet,已经存在了数据
ds.write.format("jdbc").option("url","jdbc:mysql://hadoop102:3306/gmall").option("user":"root").option("password":"123123").option("dbtable":"user").mode("append") // 指定模式.save

Hive

Apache Hive 是 Hadoop 上的 SQL 引擎,Spark SQL编译时可以包含 Hive 支持,也可以不包含。包含 Hive 支持的 Spark SQL 可以支持 Hive 表访问、UDF (用户自定义函数)以及 Hive 查询语言(HiveQL/HQL)等。需要强调的一点是,如果要在 Spark SQL 中包含Hive 的库,并不需要事先安装 Hive。一般来说,最好还是在编译Spark SQL时引入Hive支持,这样就可以使用这些特性了。如果你下载的是二进制版本的 Spark,它应该已经在编译时添加了 Hive 支持。若要把 Spark SQL 连接到一个部署好的 Hive 上,你必须把 hive-site.xml 复制到 Spark的配置文件目录中($SPARK_HOME/conf)。即使没有部署好 Hive,Spark SQL 也可以运行。 需要注意的是,如果你没有部署好Hive,Spark SQL 会在当前的工作目录中创建出自己的 Hive 元数据仓库,叫作 metastore_db。此外,如果你尝试使用 HiveQL 中的 CREATE TABLE (并非 CREATE EXTERNAL TABLE)语句来创建表,这些表会被放在你默认的文件系统中的 /user/hive/warehouse 目录中(如果你的 classpath 中有配好的 hdfs-site.xml,默认的文件系统就是 HDFS,否则就是本地文件系统)。spark-shell默认是Hive支持的;代码中是默认不支持的,需要手动指定(加一个参数即可)。

内置hive

一般用于测试

如果使用 Spark 内嵌的 Hive, 则什么都不用做, 直接使用即可.
Hive 的元数据存储在 derby 中, 默认仓库地址:$SPARK_HOME/spark-warehouse
scala> spark.sql("show tables").show
。。。
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+scala> spark.sql("create table aa(id int)")。。。scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default|       aa|      false|
+--------+---------+-----------+
向表加载本地数据
scala> spark.sql("load data local inpath 'input/ids.txt' into table aa")。。。scala> spark.sql("select * from aa").show
+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
+---+
在实际使用中, 几乎没有任何人会使用内置的 Hive

外置hive

如果想连接外部已经部署好的Hive,需要通过以下几个步骤:

Ø Spark要接管Hive需要把hive-site.xml拷贝到conf/目录下

Ø 把Mysql的驱动copy到jars/目录下

Ø 如果访问不到hdfs,则需要把core-site.xml和hdfs-site.xml拷贝到conf/目录下

Ø 重启spark-shell

scala> spark.sql("show tables").show
+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| default|                 emp|      false|
| default|hive_hbase_emp_table|      false|
| default| relevance_hbase_emp|      false|
+--------+--------------------+-----------+scala>

IDEA连接hive

添加依赖

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.12</artifactId><version>3.0.0</version>
</dependency><dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>1.2.1</version>
</dependency>
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.27</version>
</dependency>

代码示例

package com.pihao.sqlimport org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSessionobject SparkSQL06_HIVE {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("Hive")val spark: SparkSession = SparkSession .builder().enableHiveSupport().config(conf).getOrCreate()import spark.implicits._//TODO idea开发HIVE操作,需要遵循以下几个步骤//添加依赖关系//在创建环境对象时,启用Hive知此恨//将hive的配置文件放置在classpath中spark.sql("show tables").showspark.stop()}
}

spark-sql/spark-beeline

spark-sql

Spark SQL CLI可以很方便的在本地运行Hive元数据服务以及从命令行执行查询任务。在Spark目录下执行如下命令启动Spark SQL CLI,直接执行SQL语句,类似一Hive窗口

# 可以直接写sql了
bin/spark-sql

spark-beeline

Spark Thrift Server是Spark社区基于HiveServer2实现的一个Thrift服务。旨在无缝兼容HiveServer2。因为Spark Thrift Server的接口和协议都和HiveServer2完全一致,因此我们部署好Spark Thrift Server后,可以直接使用hive的beeline访问Spark Thrift Server执行相关语句。Spark Thrift Server的目的也只是取代HiveServer2,因此它依旧可以和Hive Metastore进行交互,获取到hive的元数据。

# 如果想连接Thrift Server,需要通过以下几个步骤:
- Spark要接管Hive需要把hive-site.xml拷贝到conf/目录下
- 把Mysql的驱动copy到jars/目录下
- 如果访问不到hdfs,则需要把core-site.xml和hdfs-site.xml拷贝到conf/目录下
- 启动Thrift Server
sbin/start-thriftserver.sh# 使用beeline连接Thrift Server
bin/beeline -u jdbc:hive2://hadoop102:10000 -n root

SparkSQL项目实战

数据准备

我们这次 Spark-sql 操作中所有的数据均来自 Hive,首先在 Hive 中创建表,,并导入数据。

一共有3张表: 1张用户行为表,1张城市表,1 张产品表

CREATE TABLE `user_visit_action`(`date` string,  `user_id` bigint,`session_id` string,`page_id` bigint,`action_time` string,`search_keyword` string,`click_category_id` bigint,`click_product_id` bigint,`order_category_ids` string,`order_product_ids` string,`pay_category_ids` string,`pay_product_ids` string,`city_id` bigint)
row format delimited fields terminated by '\t';
load data local inpath 'input/user_visit_action.txt' into table user_visit_action;CREATE TABLE `product_info`(`product_id` bigint,`product_name` string,`extend_info` string)
row format delimited fields terminated by '\t';
load data local inpath 'input/product_info.txt' into table product_info;CREATE TABLE `city_info`(`city_id` bigint,`city_name` string,`area` string)
row format delimited fields terminated by '\t';
load data local inpath 'input/city_info.txt' into table city_info;
// 在IDEA中创建数据,其中sprakSqlReq数据库已在hive的命令行窗口中创建完毕
package com.pihao.sqlimport org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSessionobject SparkSQL07_Req {def main(args: Array[String]): Unit = {System.setProperty("HADOOP_USER_NAME", "atguigu") //放在代码最前面val conf = new SparkConf().setMaster("local[*]").setAppName("Hive")val spark: SparkSession = SparkSession .builder().enableHiveSupport().config(conf).getOrCreate()import spark.implicits._//生成模拟数据spark.sql("use sprakSqlReq")//JSON SQL一般采用多行字符串spark.sql("""|CREATE TABLE `user_visit_action`(|  `date` string,|  `user_id` bigint,|  `session_id` string,|  `page_id` bigint,|  `action_time` string,|  `search_keyword` string,|  `click_category_id` bigint,|  `click_product_id` bigint,|  `order_category_ids` string,|  `order_product_ids` string,|  `pay_category_ids` string,|  `pay_product_ids` string,|  `city_id` bigint)|row format delimited fields terminated by '\t'|""".stripMargin)spark.sql("""|load data local inpath 'data/user_visit_action.txt' into table sprakSqlReq.user_visit_action|""".stripMargin);spark.sql("""|CREATE TABLE `product_info`(|  `product_id` bigint,|  `product_name` string,|  `extend_info` string)|row format delimited fields terminated by '\t'|""".stripMargin)spark.sql("""|load data local inpath 'data/product_info.txt' into table sprakSqlReq.product_info|""".stripMargin);spark.sql("""|CREATE TABLE `city_info`(|  `city_id` bigint,|  `city_name` string,|  `area` string)|row format delimited fields terminated by '\t'|""".stripMargin)spark.sql("""|load data local inpath 'data/city_info.txt' into table sprakSqlReq.city_info|""".stripMargin);spark.sql("""|select * from city_info|""".stripMargin).show(20) //只显示20行}}

需求一,各区域热门商品 Top3

这里的热门商品是从点击量的维度来看的,计算各个区域前三大热门商品,并备注上每个商品在主要城市中的分布比例,超过两个城市用其他显示。

地区 商品名称 点击次数 城市备注
华北 商品A 100000 北京21.2%,天津13.2%,其他65.6%
华北 商品P 80200 北京63.0%,太原10%,其他27.0%
华北 商品M 40000 北京63.0%,太原10%,其他27.0%
东北 商品J 92000 大连28%,辽宁17.0%,其他 55.0%
SELECT*
FROM(SELECT*,rank() over ( PARTITION BY area ORDER BY clickCount DESC ) AS rank FROM(SELECTarea,product_name,count(*) AS clickCount FROM(SELECTa.*,c.area,p.product_name FROMuser_visit_action aJOIN city_info c ON a.city_id = c.city_idJOIN product_info p ON a.click_product_id = p.product_id WHEREa.click_product_id != - 1 ) t1 GROUP BYarea,product_name ) t2 ) t3
WHERErank <= 3

ideal代码

package com.pihao.sqlimport org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SparkSession, functions}
import org.apache.spark.sql.expressions.Aggregatorimport scala.collection.mutable
import scala.collection.mutable.ListBufferobject SparkSQL08_Req {def main(args: Array[String]): Unit = {System.setProperty("HADOOP_USER_NAME", "atguigu") //放在代码最前面val conf = new SparkConf().setMaster("local[*]").setAppName("Hive")val spark: SparkSession = SparkSession .builder().enableHiveSupport().config(conf).getOrCreate()import spark.implicits._spark.sql("use sprakSqlReq")spark.sql("""|SELECT|             a.*,|               c.area,|       c.city_name,|                p.product_name|         FROM|               user_visit_action a|                JOIN city_info c ON a.city_id = c.city_id|             JOIN product_info p ON a.click_product_id = p.product_id|          WHERE|              a.click_product_id != -1|""".stripMargin).createOrReplaceTempView("t1")//聚合函数:同一个区域中多个城市名称 => 城市备注spark.udf.register("cityRemark",functions.udaf(new CityRemarkUDAF))spark.sql("""|SELECT|           area,|          product_name,|          count(*) AS clickCount,|     cityRemark(city_name) as cityRemark|       FROM|            t1|        GROUP BY|           area,|          product_name|""".stripMargin).createOrReplaceTempView("t2")//    spark.sql("select * from t2").show()spark.sql("""|SELECT|        *,|     rank() over ( PARTITION BY area ORDER BY clickCount DESC ) AS rank| FROM|        t2|""".stripMargin).createOrReplaceTempView("t3")spark.sql("""|SELECT| *|FROM|  t3|WHERE|  rank <= 3|""".stripMargin).show()spark.stop()}case class CityBuffer(var total:Long,var cityMap:mutable.Map[String,Long])/*** 1.城市备注聚合函数*  IN:String*  BUF: CityBuffer*  OUT:String* 2.重写方法*  6*/class CityRemarkUDAF extends Aggregator[String,CityBuffer,String]{//初始化override def zero: CityBuffer = {CityBuffer(0L,mutable.Map[String,Long]())}//聚合override def reduce(buff: CityBuffer, cityName: String): CityBuffer = {buff.total += 1val newVal = buff.cityMap.getOrElse(cityName,0L) +1buff.cityMap.update(cityName,newVal)buff}//缓冲区的合并override def merge(b1: CityBuffer, b2: CityBuffer): CityBuffer = {b1.total += b2.totalb2.cityMap.foreach {case (cityName, cnt) => {val newCount = b1.cityMap.getOrElse(cityName,0L) + cntb1.cityMap.update(cityName,newCount)}}b1}//怎么生成remarkoverride def finish(buff: CityBuffer): String = {val cityClickList: ListBuffer[String] = ListBuffer[String]()val cityDataList: List[(String, Long)] = buff.cityMap.toList.sortBy(_._2)(Ordering.Long.reverse).take(2)val totalCnt: Long = buff.totalval hasMoreCity =  buff.cityMap.size > 2var p = 100L //总的比率 100%cityDataList.foreach{case (city,cnt) => {var r = cnt * 100/totalCnt //单个城市的点击占比if (hasMoreCity){p = p -r}val s = city + " " + r + "%"cityClickList.append(s)}}if (hasMoreCity){cityClickList.append("其他 "+ p + "%")}cityClickList.mkString(",")}override def bufferEncoder: Encoder[CityBuffer] = Encoders.productoverride def outputEncoder: Encoder[String] = Encoders.STRING}
}//效果
+----+------------+----------+---------------------------+----+
|area|product_name|clickCount|                 cityRemark|rank|
+----+------------+----------+---------------------------+----+
|华南|     商品_23|       224| 厦门 29%,福州 24%,其他 47%|   1|
|华南|     商品_65|       222| 深圳 27%,厦门 26%,其他 47%|   2|
|华南|     商品_50|       212| 福州 27%,深圳 25%,其他 48%|   3|
|华北|     商品_42|       264| 郑州 25%,保定 25%,其他 50%|   1|
|华北|     商品_99|       264| 北京 24%,郑州 23%,其他 53%|   1|
|华北|     商品_19|       260| 郑州 23%,保定 20%,其他 57%|   3|
+----+------------+----------+---------------------------+----+

发现框架的最后都变成写SQL了

spark学习之SparkSQL相关推荐

  1. spark学习 Java版SparkSQL程序读取Hbase表注册成表SQL查询

    参考: spark学习-SparkSQL–11-scala版写的SparkSQL程序读取Hbase表注册成表SQL查询 http://blog.csdn.net/qq_21383435/article ...

  2. spark学习-Spark的Core理解

    1.为什么理解它? 有一次我要跑一个任务,spark-submit提交的任务,但是它总是处于ACCEPED等待接受的状态,以前遇到这个问题,这个是内存不够引起的 Spark学习-SparkSQL–05 ...

  3. Spark学习痛点和路线图

    Spark学习的痛点 对初学者(特别是自学者)学习来说,Spark学习有以下两大痛点. 1.头绪太多,不知道从哪学 从Spark的技术栈可以看到,涉及的技术从操作系统到外部组件.Spark框架.交互工 ...

  4. spark 学习笔记

    spark 学习笔记 spark介绍 Spark是是一种快速通用的集群计算系统,它的主要特点是能够在内存中进行计算.它包含了 spark 核心组件 spark-core,用于 SQL 和结构化处理数据 ...

  5. Apache Spark学习:利用Eclipse构建Spark集成开发环境

    介绍了如何使用Maven编译生成可直接运行在Hadoop 2.2.0上的Spark jar包,而本文则在此基础上, 介绍如何利用Eclipse构建Spark集成开发环境 . 不建议大家使用eclips ...

  6. Apache Spark学习:利用Scala语言开发Spark应用程序

    Spark内核是由Scala语言开发的,因此使用Scala语言开发Spark应用程序是自然而然的事情.如果你对Scala语言还不太熟悉,可以阅读网络教程 A Scala Tutorial for Ja ...

  7. Spark学习之Spark调优与调试(7)

    Spark学习之Spark调优与调试(7) 1. 对Spark进行调优与调试通常需要修改Spark应用运行时配置的选项. 当创建一个SparkContext时就会创建一个SparkConf实例. 2. ...

  8. 用Spark学习FP Tree算法和PrefixSpan算法

    在FP Tree算法原理总结和PrefixSpan算法原理总结中,我们对FP Tree和PrefixSpan这两种关联算法的原理做了总结,这里就从实践的角度介绍如何使用这两个算法.由于scikit-l ...

  9. Spark学习(一) -- Spark安装及简介

    标签(空格分隔): Spark 学习中的知识点:函数式编程.泛型编程.面向对象.并行编程. 任何工具的产生都会涉及这几个问题: 现实问题是什么? 理论模型的提出. 工程实现. 思考: 数据规模达到一台 ...

  10. spark学习-58-Spark的EventLoggingListener

    1.本次调试查看源代码采用 spark学习-57-Spark下Scala版HBase下的根据权重获取最真实数据http://blog.csdn.net/qq_21383435/article/deta ...

最新文章

  1. Blender创作你自己的动画短片学习教程
  2. RecyclerView ItemTouchHelper
  3. 为别人做嫁衣——代理模式
  4. win7上修改MySQL数据库密码
  5. Linux如何清除系统密码,如何消除LINUX系统密码
  6. .net中如何同步获取数据方式增加一样数据自动刷新列表_知客CRM如何对接微信公众号...
  7. 吝啬的国度 ---用vector 来构图
  8. .NET多线程编程(2)——Thread类
  9. idea 页面改了 网页没_如何做出高大上的PPT?试试美得令人窒息的网页风格!
  10. 03.09 随手记(Mock数据生成器,Easy Mock基本使用)
  11. 从DWG导入SKP后的封面问题
  12. 《念奴娇·赤壁怀古》古词鉴赏
  13. 小程序 翻转, 左侧滑入, 缩小变大消失等等特效
  14. win10c盘扩容_【网赚教程】2020最新百度网盘扩容技术,适合做虚拟资源项目和业务!...
  15. Deepin20.4系统中wine优化设置
  16. 《蛙》杂记------莫言
  17. FPGA 之 时序分析
  18. php后台跨域token,JSON Web Token(JWT)目前最流行的跨域身份验证解决方案(PHP)类...
  19. 物联网平台的结构组成有哪些
  20. 闭关修炼30天,“啃透”这658页PDF,成功定级阿里P7

热门文章

  1. java cobar_Cobar源码解析(二)
  2. mmsi是代表船舶什么_船舶常见的一些缩写
  3. java使用xmlWorkerHelper将html转pdf
  4. 汽车用众大牌大屏幕导航仪凯立德地图升级方法
  5. Unity发布游戏在iOS设备上出现的字体问题
  6. 如何搭建点燃式发动机仿真模型
  7. JAVA 获取文件指纹
  8. 开启Apache一直出现443端口被占用
  9. Python 处理视频文件
  10. 塑料颗粒行业调研报告 - 市场现状分析与发展前景预测