学习内容

  • 一、sparkSQL在IDEA的使用
    • 1.环境配置
    • 2.快速入门
  • 二、sparkSQL实战案例
    • 1.数据准备
    • 2.案例分析
    • 3.功能实现
    • 4.代码实现

一、sparkSQL在IDEA的使用

1.环境配置

配置pom.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>Spark3.0</artifactId><groupId>org.example</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>1.2.1</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.27</version></dependency></dependencies><artifactId>spark-core</artifactId></project>

连接hadoop集群中的hive

  1. 首先集群要启动
  2. Spark 要接管 Hive 需要把 hive-site.xml 拷贝到 conf/目录下
  3. 将 Mysql 的驱动 copy 到 jars/目录下(mysql-connector-java-5.1.27-bin.jar)
  4. 如果访问不到 hdfs,则需要把 core-site.xml 和 hdfs-site.xml 拷贝到 conf/目录下
  5. 重启集群中 spark-shell,执行语句 spark.sql(“show tables”).show 显示hive的表即可
  6. 把 hive-site.xml放到IDEA项目的target文件夹下
  7. 最后注意windows和虚拟机的IP的映射

2.快速入门

package com.bigdata.SparkSQLimport org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}/*** @author wangbo* @version 1.0*//*** 环境测试*/
object Spark02_SparkSQL_Hive_demo {def main(args: Array[String]): Unit = {// TODO 创建SparkSQL的运行环境val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")val sparkSession: SparkSession = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()//使用SparkSQL连接外置的Hive//首先 集群要启动//1.拷贝Hive-size.xml文件到classpath下//2.启用hive的支持//3.增加对应的依赖关系(包含mysql的驱动)sparkSession.sql("show tables").show()// TODO 关闭环境sparkSession.close()}
}

如果报错类似这种:
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=anonymous, access=EXECUTE, inode=“/tmp”:root:supergroup:drwx------

解决方法:

这种情况说明你hive中的数据库权限不够,直接将hdfs中存放该数据库的文件的权限修改即可 如:[root@hadoop100 ~]# hadoop dfs -chmod 777 /user/hive/warehouse/spark_demo.db

二、sparkSQL实战案例

1.数据准备

数据文件:

链接:https://pan.baidu.com/s/1t9hxa3dXF9gNRZJtxosWtQ
提取码:x523

package com.bigdata.SparkSQLimport org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession/*** @author wangbo* @version 1.0*//*** 首先在hive中创建数据库,在hdfs中把hive创建的数据库文件,给上权限 如:hadoop dfs -chmod 777 /user/hive/warehouse/spark_demo.db* 数据的准备:进入数据库,创建表,导入数据*/
object Spark02_SparkSQL_Hive_demo1 {def main(args: Array[String]): Unit = {// TODO 创建SparkSQL的运行环境val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")val sparkSession: SparkSession = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()//进入数据库sparkSession.sql("use spark_demo")//TODO 准备数据, 创建表//用户信息表sparkSession.sql("""|CREATE TABLE `user_visit_action`(| `date` string,| `user_id` bigint,| `session_id` string,| `page_id` bigint,| `action_time` string,| `search_keyword` string,| `click_category_id` bigint,| `click_product_id` bigint,| `order_category_ids` string,| `order_product_ids` string,| `pay_category_ids` string,| `pay_product_ids` string,| `city_id` bigint)|row format delimited fields terminated by '\t'""".stripMargin)sparkSession.sql("""|load data local inpath 'datas/user_visit_action1.txt' into table spark_demo.user_visit_action|""".stripMargin)//商品信息表sparkSession.sql("""|CREATE TABLE `product_info`(| `product_id` bigint,| `product_name` string,| `extend_info` string)|row format delimited fields terminated by '\t'|""".stripMargin)sparkSession.sql("""|load data local inpath 'datas/product_info.txt' into table spark_demo.product_info|""".stripMargin)//城市信息表sparkSession.sql("""|CREATE TABLE `city_info`(| `city_id` bigint,| `city_name` string,| `area` string)|row format delimited fields terminated by '\t'|""".stripMargin)sparkSession.sql("""|load data local inpath 'datas/city_info.txt' into table spark_demo.city_info|""".stripMargin)sparkSession.sql("show tables").show()// TODO 关闭环境sparkSession.close()}
}

2.案例分析

  1. 查询出来所有的点击记录,并与 city_info 表连接,得到每个城市所在的地区,与Product_info 表连接得到产品名称
  2. 按照地区和商品 id 分组,统计出每个商品在每个地区的总点击次数
  3. 每个地区内按照点击次数降序排列
  4. 只取前三名
  5. 城市备注需要自定义 UDAF 函数

3.功能实现

  1. 连接三张表的数据,获取完整的数据(只有点击)
  2. 将数据根据地区,商品名称分组
  3. 统计商品点击次数总和,取 Top3
  4. 实现自定义聚合函数显示备注

4.代码实现

package com.bigdata.SparkSQLimport org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SparkSession, functions}
import org.apache.spark.sql.expressions.Aggregatorimport scala.collection.mutable
import scala.collection.mutable.ListBuffer/*** @author wangbo* @version 1.0*//*** 进行表的查询*/
object Spark02_SparkSQL_Hive_demo2 {def main(args: Array[String]): Unit = {// TODO 创建SparkSQL的运行环境val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")val sparkSession: SparkSession = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()//进入数据库sparkSession.sql("use spark_demo")//查询基本数据sparkSession.sql("""| select|      a.*,|       p.product_name,|        c.area,|        c.city_name| from user_visit_action a| join product_info p on a.click_product_id = p.product_id| join city_info c on a.city_id = c.city_id| where a.click_product_id > -1|""".stripMargin).createOrReplaceTempView("t1") //把上面的查询结果,放在一个临时表他t1中//根据区域,商品进行数据聚合sparkSession.udf.register("cityRemark",functions.udaf(new cityRemarkUDAF()))sparkSession.sql("""| select|        area,|      product_name,|      count(*) as clickCnt,|   cityRemark(city_name) as city_remark|  from t1 group by area,product_name|""".stripMargin).createOrReplaceTempView("t2")//区域内对点击数量进行排序sparkSession.sql("""| select|        *,|     rank() over(partition by area order by clickCnt desc) as rank| from t2|""".stripMargin).createOrReplaceTempView("t3")//取前三名sparkSession.sql("""| select|      *| from t3 where rank <=3|""".stripMargin).show(false) //这里的false为显示完整的字段名,如果不写,字段过长会被省略// TODO 关闭环境sparkSession.close()}/*自定义聚合函数:实现城市备注功能1.定义自定义类继承org.apache.spark.sql.expressions.Aggregator定义泛型IN:输入的数据类型:城市的名称BUF:缓冲区的数据类型(使用了样例类):【总点击数量,Map[ (city,cnt),(city,cnt) ]】OUT:输出的数据类型:备注信息2.重写方法*/case class Buffer(var total:Long,var cityMap:mutable.Map[String,Long])class cityRemarkUDAF extends Aggregator[String,Buffer,String]{//初始值,缓冲区初始化override def zero: Buffer = {Buffer(0,mutable.Map[String,Long]())}//根据输入的数据更新缓冲区的数据override def reduce(buff: Buffer, city: String): Buffer = {buff.total += 1val newCount = buff.cityMap.getOrElse(city,0L) + 1 //获取cityMap的value,如果能取到就+1,取不到赋值为0+1buff.cityMap.update(city,newCount)  //更新缓冲区buff}//合并缓冲区的数据override def merge(buff1: Buffer, buff2: Buffer): Buffer = {buff1.total += buff2.total  //将点击量合并val map1: mutable.Map[String, Long] = buff1.cityMapval map2: mutable.Map[String, Long] = buff2.cityMap//方式一:两个map合并操作
//      buff1.cityMap = map1.foldLeft(map2) {//        case (map, (city, count)) => { //key:city,value:count
//          val newCount = map.getOrElse(city, 0L) + count
//          map.update(city, newCount)
//          map
//        }
//      }
//      buff1//方式二:两个map合并操作map2.foreach{case (city , count) => {val newCount = map1.getOrElse(city,0L) + countmap1.update(city, newCount)}}buff1.cityMap = map1buff1}//将统计的结构生成字符串信息override def finish(buff: Buffer): String = {val remarkList: ListBuffer[String] = ListBuffer[String]()val totalCount: Long = buff.total //城市的总数量val cityMap: mutable.Map[String, Long] = buff.cityMap//数据进行降序排列,去前两个val cityCountList: List[(String, Long)] = cityMap.toList.sortWith( //因为List可以排序(left, right) => { //cityMap1 和 cityMap2 两个map进行比较left._2 > right._2}).take(2)//判断城市是否大于2val bool: Boolean = cityMap.size > 2var rsum = 0LcityCountList.foreach{case (city,count) => {  //city城市名称,count城市数量val r = count * 100 / totalCount  //求出商品在主要城市的比例 乘100是为了取整remarkList.append(s"${city} ${r}%")rsum += r}}if (bool){remarkList.append(s"其他 ${100-rsum}")}remarkList.mkString(",")}//缓冲区的编码操作,自定义的类就写Encoders.product,如果是scala存在的类,如Long 就写Encoders.scalaLongoverride def bufferEncoder: Encoder[Buffer] = Encoders.product//输出的编码操作,自定义的类就写Encoders.product,如果是scala存在的类,如Long 就写Encoders.scalaLongoverride def outputEncoder: Encoder[String] = Encoders.STRING}
}

参考:尚硅谷spark3.0教学

spark—SQL实战案例相关推荐

  1. Hive/Spark SQL使用案例

    Hive/Spark SQL使用案例 求 TOPN:开窗函数 求天数:datediff() 函数 求每个学生的成绩都大于...系列:开窗 / 分组 表转置/行转列系列一:concat_ws 函数 表转 ...

  2. flink sql实战案例

    目录 一.背景 二.流程 三.案例 1.flink sql读取 Kafka 并写入 MySQL source sink insert 2.flinksql读kafka写入kudu source sin ...

  3. Spark Streaming 实战案例(一)

    本节主要内容 本节部分内容来自官方文档:http://spark.apache.org/docs/latest/streaming-programming-guide.html#mllib-opera ...

  4. Spark SQL实战(08)-整合Hive

    1 整合原理及使用 Apache Spark 是一个快速.可扩展的分布式计算引擎,而 Hive 则是一个数据仓库工具,它提供了数据存储和查询功能.在 Spark 中使用 Hive 可以提高数据处理和查 ...

  5. Spark SQL实战

    一.程序 1 package sparklearning 2 3 import org.apache.log4j.Logger 4 import org.apache.spark.SparkConf ...

  6. Spark SQL之案例实战(四)

    1. 获取数据 本文通过将github上的Spark项目Git日志作为数据,对SparkSQL的内容进行详细介绍 数据获取命令如下: [root@master spark]# git log --pr ...

  7. Spark Streaming 实战案例(四) Spark Streaming 缓存、Checkpoint机制

    主要内容 Spark Stream 缓存 Checkpoint 案例 1. Spark Stream 缓存 通过前面一系列的课程介绍,我们知道DStream是由一系列的RDD构成的,它同一般的RDD一 ...

  8. Spark Streaming 实战案例(五) Spark Streaming与Kafka

    主要内容 Spark Streaming与Kafka版的WordCount示例(一) Spark Streaming与Kafka版的WordCount示例(二) 1. Spark Streaming与 ...

  9. Spark Streaming 实战案例(二) Transformation操作

    本节主要内容 本节部分内容来自官方文档:http://spark.apache.org/docs/latest/streaming-programming-guide.html DStream Tra ...

最新文章

  1. Python:模拟登录、点击和执行 JavaScript 语句案例
  2. [转]Asp.NET生成静态页面并分页
  3. 越过网络层看威胁:为什么全攻击界面才是最重要的
  4. 1134. Vertex Cover (25)
  5. JavaScript 逆向 ( 一 ) --- JavaScript 语法基础
  6. node path html模块,深入理解node.js之path模块
  7. Django 配置出错模板
  8. 软件设计师的成长之路
  9. 穿了好几个马甲,差点没认出来是二分查找
  10. 中标麒麟系统u盘安装_中标麒麟u盘安装系统教程
  11. ADS EM MODEL 问题
  12. ISO 3166-1 国家编码
  13. python桌面快捷方式不见了怎么办_桌面快捷方式不见了怎么办?桌面快捷方式不见了解决方法...
  14. 2021蓝桥杯预选赛题解
  15. 2021Unity教程:Unity官方中文版免费下载方法(黑皮肤可选)无需破解!
  16. Python入门学习笔记17(sqlalchemyd的使用)
  17. ✿2021NEWCTF6.1萌新赛✿MISC-all-WP
  18. 论文《基于结构光和双目视觉的三维重建系统研究》摘要
  19. php 漫画小偷程序,php简单小偷程序
  20. ZOJ 3717 二分+2-sat判定。

热门文章

  1. 【牛客网C++服务器项目学习】-Day09-网络模型个人总结
  2. IP TCP分组内容的解析
  3. “芯片荒”危机致汽车销量持续下滑,加快推进智慧供应链深化应用迫在眉睫
  4. ADSL技术原理介绍(看看哪款ADSL虚拟拨号软件适合你)
  5. 小程序端JS加密,传输PHP端解密--微信小程序联盟
  6. 计算机弹奏月亮之上,二胡独奏曲谱牧歌_速求草原牧歌二胡曲谱
  7. 激光安全和光生物安全的深入对比
  8. 一年对于程序员来说有多长?
  9. 计算机网络osi参考模型分为几层,计算机网络的七层OSI参考模型
  10. css 居中代码 加粗代码,css居中代码是什么