第一章:上次课回顾

第二章:Spark SQL Functions

  • 2.1 简单小应用
  • 2.2 Spark SQL自定义函数

第三章:Catalog

第四章:DataSet

第五章:窗口函数

第六章:穿插Shuffle

第一章:上次课回顾

https://blog.csdn.net/zhikanjiani/article/details/96722109

上次课主要讲了DataFrame,它是一个分布式数据集,把数据转换成一个带名字的column,好比就是一张普通数据库中的表,它底层也做了一些优化;

对比RDD,RDD中看不到具体信息,DataFrame中可以看到具体信息。API一定要掌握。
PK哥公司都是做平台的,用户在界面输入东西而在数据平台底层都是使用API拼接的。
不就是用DataFrame写一个select把那几个字段拼接,用户的操作对于数据平台底层是用API编程。

表示方式最简单的比如竖线"|",需要使用"||"转义出来;外部数据源可以去处理不同文件系统上面不同格式的数据,让我们很方便的把多种数据源的东西弄进来,parquet的时候还有分区探测,还有schema的合并,分区探测的base在哪里;ORC、JDBC外部数据源的源码定义出来,测试把普通文本作用上schema后变成了一个DataFrame。

学长10个问题,只回答出来3个问题?就拿到了17.5k的工作。
小文件的解决方案
外部数据源的实现:谁创建谁,谁干了什么事
资源的分配调度

第二章:Spark SQL Functions

所有东西都在这个类中:functions.scala,是spark sql包下的。

如果真正掌握了Hive,学习Spark SQL,Impala会更清楚哪块没有学习。

如何在IDEA中设置快捷键和eclipse相同:file --> settings --> keymap

Ctrl + 0键,functions中有一个column方法,col方法;

/*** Returns a [[Column]] based on the given column name.** @group normal_funcs* @since 1.3.0*/def col(colName: String): Column = Column(colName)/*** Returns a [[Column]] based on the given column name. Alias of [[col]].** @group normal_funcs* @since 1.3.0*/def column(colName: String): Column = Column(colName)

联想到Hive中subString和subStr,其实是相同的两个东西;

  • 继续查看方法,asc,dsc,avg平均数;问题,collect_list和collect_set的区别,

  • collect_list方法的描述:Aggregate functions:returns a list of objects with duplicate.(返回具有重复项的对象列表)

  • collect_set方法描述:Aggregate functions:returns a set of objects with duplicate elements eliminated. (返回一组消除了重复元素的对象)

countDistinct:UV
broadcast:
coalesce
abs:
floor:地板函数,往下取
datediff:两个时间差
N多函数:

2.1 简单小应用

需求一:

读入在spark-core02中写的LogApp中的片段,日志还是自己造的日志,展示前10条。

package sparksql04import org.apache.spark.sql.SparkSessionobject FunctionsApp {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[2]").appName("FunctionApp").getOrCreate()import spark.implicits._val lines = spark.sparkContext.textFile("C:///Users//Administrator//Desktop//a.log")val df = lines.map(x => {val splits = x.split("\t")val length = splits.lengthvar traffic = 0Lif (length == 5) {val domain = splits(0) // index from zero(domain, 1L)} else {("-", 0L)}}).toDF("domain", "times")df.show(10)spark.stop()}}

输出信息如下,已经转换成了一个DF

+---------------+-----+
|         domain|times|
+---------------+-----+
|  24.168.127.82|    1|
| 222.153.127.24|    1|
| 105.77.222.228|    1|
|153.127.118.192|    1|
| 228.32.222.127|    1|
| 127.168.105.77|    1|
|118.105.127.127|    1|
| 127.228.127.82|    1|
| 127.168.105.31|    1|
| 127.24.222.127|    1|
+---------------+-----+
only showing top 10 rows

需求二

.........}).toDF("domain", "times")//分组以后做一个agg,按照times字段做一个聚合函数,取别名是pvdf.groupBy("domain").agg(count("times").as("pv")).select("domain","pv").show(false)spark.stop()}输出:
+---------------+---+
|domain         |pv |
+---------------+---+
|156.127.32.222 |1  |
|127.24.222.127 |1  |
|31.127.24.153  |1  |
|118.105.156.168|1  |
|127.168.105.77 |1  |
|153.31.127.156 |1  |
|32.228.118.156 |1  |
|228.82.24.153  |1  |
|192.82.127.31  |1  |
|153.24.156.228 |1  |
|105.82.192.77  |1  |
|31.127.24.118  |1  |
|10.127.228.168 |1  |
|105.127.118.10 |1  |
|127.168.105.31 |2  |
|127.156.168.31 |1  |
|118.31.127.127 |1  |
|24.127.228.10  |1  |
|192.118.31.127 |1  |
|156.31.105.82  |1  |
+---------------+---+
only showing top 20 rows

自定义函数的编写:

  1. 定义函数
  2. 注册函数
  3. 使用函数

前提:数据准备:sparksql04下new一个file:

名字和爱好之间以tab键分割,求每个人喜欢的人的个数??

老二  17er,jeff,小朋友2滴
血狼  蝶舞,大树,老梁
若老  17er

测试数据能不能读进来:

object FunctionsApp{def main(args: Array[String]) : Unit = {val spark = SparkSession.builder().master("local[2]").appName("FunctionsApp")val info = spark.sparkContext.textFile("file:///G:/ruozedata_workspace/g7-spark/src/main/scala/sparksql04/functions.data")info.foreach(println)spark.stop()}}输出:发现是能够完整打印的
若老  17er
老二  17er,jeff,小朋友2滴
血狼  蝶舞,大树,老梁

开始进行处理数据:

  • 需要使用到的是trim 翻译:为指定的字符串列从两端修剪空格。
package sparksql04import org.apache.spark.sql.SparkSessionobject FunctionsApp {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[2]").appName("FunctionsApp").getOrCreate()import spark.implicits._val info = spark.sparkContext.textFile("file:///C:/Users/Administrator/Desktop/testInfo.txt")val df = info.map(_.split("\t")).map(x => Info(x(0).trim, x(1).trim)).toDFdf.show(false)spark.stop()}case class Info(name:String,likes:String)
}输出如下:
+----+-------------------+
|name|likes              |
+----+-------------------+
|老二|17er,jeff,小朋友2滴|
|血狼|蝶舞,大树,老梁     |
|若老|17er               |
+----+-------------------+

坑:在sparksql04下面创建的file,copy file到spark.sparkContext.textFile中去的时候,idea运行的时候运行不出来,还是更换路径为本地路径。

2.2 Spark SQL自定义函数

需求:统计每个人喜欢的人数是几个?

package sparksql04import org.apache.spark.sql.SparkSessionobject FunctionsApp {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[2]").appName("FunctionsApp").getOrCreate()import spark.implicits._val info = spark.sparkContext.textFile("file:///C:/Users/Administrator/Desktop/testInfo.txt")val df = info.map(_.split("\t")).map(x => Info(x(0).trim, x(1).trim)).toDFspark.udf.register("likes_num",(str:String) => {str.split(",").size})df.createOrReplaceTempView("info")spark.sql("select name,likes,likes_num(likes) as cnt from info").show(false)spark.stop()}case class Info(name:String,likes:String)
}输出:
+----+-------------------+---+
|name|likes              |cnt|
+----+-------------------+---+
|老二|17er,jeff,小朋友2滴|3  |
|血狼|蝶舞,大树,老梁     |3  |
|若老|17er               |1  |
+----+-------------------+---+

小结: spark.udf.register(“likes_num”,(str:String) => {
str.split(",").size
})
不用SQL,用API的方式给它整出来。

第三章:Catalog

源代码包:catalog.scala 是在spark2.0以后才有的

回顾在Hive中,元数据是在mysql中,我们可以通过外部数据源去查询;
启动spark-shell,在2.0中,不需要使用jdbc的代码去查,可以直接使用catalog来访问。

  1. Catalog interface for Spark. To access this, use SparkSession.catalog.

  2. Returns the current default database in this session.

How to Use??

//select a,b,c from XXX
spark.catalog
写一个SQL语句,这只是一个字符串,一个SQL进来变成抽象语法树,在catalog这一步的时候就能直接判断

Spark shell中测试:1、val catalog = spark.catalog         已经连到meta信息了2、catalog.listDatabases().show       显示元数据中的所有数据库3、catalog.listTables("default").show
使用catalog。listTables("ruoze_d6").show         //同hive-site.xml中的配置信息4、catalog.listTables("default").select("name").show
list返回的就是DataSet5、查看里面所有的函数信息
catalog.listFunctions().show()

第四章:DataSet

SchemaRDD ==> DataFrame ⇒ DataSet(1.6版本出来的)
compile-time type safety further optimization(编译时的类型安全优化)

走一个例子:

数据准备:cd /home/hadoop/data下面准备一份sales.csv的文件:

transformationsId,cunstomerId,itemId,amountId

在IDEA中写代码:

   spark.read.format("csv").option("header","true").option("inferSchema","true").load("file:///home/hadoop/data/sales.csv").show()

重点:

1、val df = spark.read.format(“csv”).option(“header”,“true”).option(“inferSchema”,“true”)
.load(“file:///home/hadoop/data/sales.csv”).show

会出来一份数据。

2、此时定义一个case class:
case class Sales(transactionId:Int,customerId:Int,itemId:Int,amountPaid:Double)

DataFrame怎么转换为DataSet?

val ds = spark.read.format(“csv”).option(“header”,“true”).option(“inferSchema”,“true”)
.load(“file:///home/hadoop/data/sales.csv”).as[Sales]

去到sparkshell中测试:测试df的时候打印出结构就是DataFrame:
把ds拷贝进去,

如何看DataFrame和DataSet的区别:

df.select("transactionId").show()
ds.select("transactionId").show()两者的值是一样的,看不出区别:

概述:

  • A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (Strong typing, ability to use powerful lambda functions)

  • 解释:Strong typing(强类型),Datasets明确知道是什么类型,DataFrames不知道

    小结:对于上图:Datasets是第一个反映过来是否语法错误,编译错误的;而SQL是最后反应过来的,会去yarn上申请资源,SQL过去,语法错误,咣当挂了。

第五章:窗口函数

零基础班中:窗口函数:

  • 求得product_id,product_name,product_status,area,click_count,rank,day

在Spark SQL中是如何实现的呢?

原先的操作
两 MySQL ==导入到Sqoop ===> Hive join hive
sqoop是关系型数据库到hadoop之间的导入导出

现在的操作
直接通过:
val cityDF = spark.read.format(“jdbs”).option…load()
val productDF = spark.read.format(“jdbs”).option…load()
val userClickDF = spark.table("")

如何做join:
cityDF join productDF join userClickDF

有一个比较麻烦的事情:
{“product_status”:1} ==> 自定义一个UDF函数把1取出来,怎么定义udf呢?

另一种udf的写法:
val xxxUdf = udf(() => {})

窗口又是怎么回事情呢?

  • row_number().over(Window.partitionBy("").orderBy("")).as("")

  • 最终的结果写到MySQL中去,外部数据源JDBC的写不要用,有问题,自己测试跑两次跑三次

DF/DS的数据输出到DataBase的最佳实践:

那我们应该如何写出去,使用foreach,df.foreachPartition( x =>{
scalike
})

零基础班针对hadoop来上的,高级班针对spark的整套环境学习

引出概念:

幂等性: 先删后加:每跑一次作业都需要把前一次的作业删除

第六章:穿插Shuffle

问题:map的输出非常多;map和reduce的缓冲区都需要加大。

社区提出的解决方案:
spark.shuffle.consolidateFiles = true 这个参数做一些合并操作

一个core上连续执行在图中表示那几个??

好处:在map输出端合并,省去很多磁盘IO.

极端问题:Reduce个数非常多怎么办,也是一样会崩溃的。

进入到spark.apache.org/docs/1.5.1/configuraton.html

找到参数:
spark.shuffle.consolidateFiles,这个参数设置为true可以提升很多性能

后来官方更改为sort了,后期就使用sort了

大数据实战二十四课 - Spark SQL04相关推荐

  1. 客快物流大数据项目(二十四):OGG安装部署

    目录 OGG安装部署 一.配置Oracle11gR2数据库 1.Oracle11gR2打开归档模式 2.Oracle开启辅助日志和补充日志

  2. 2021年大数据HBase(十四):HBase的原理及其相关的工作机制

    全网最详细的大数据HBase文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 HBase的原理及其相关的工作机制 一.HBase的flus ...

  3. 2021年大数据Hadoop(十四):HDFS的高可用机制

    全网最详细的Hadoop文章系列,强烈建议收藏加关注! 后面更新文章都会列出历史文章目录,帮助大家回顾知识重点. 目录 本系列历史文章 前言 HDFS的高可用机制 HDFS高可用介绍 组件介绍 Nam ...

  4. 【若泽大数据实战第十六天】Hive的安装部署 + 课程一个月总结

    前言: Hadoop的课程已经告一段落,基本上在零基础课程里不会再出现了,接下来的课程由若总来上,第一讲Have. 复习前一个月的课程内容: 1.Linux Linux: 文件(*****) 权限(* ...

  5. 客快物流大数据项目(二十八):大数据服务器环境准备

    目录 大数据服务器环境准备 一.服务器规划 二.Linux虚拟机环境搭建

  6. 【若泽大数据实战第十九天】Hive 函数UDF开发以及永久注册udf函数

    前言: 回顾上期课程,上次课我们讲了聚合函数,多进一出, 分组函数  group by,出现在select里面的字段除了分组函数之外,其他都要出现在group by里面,分组函数的过滤必须使用hivi ...

  7. 客快物流大数据项目(五十四):初始化Spark流式计算程序

    目录 初始化Spark流式计算程序 一.SparkSql参数调优设置 1.设置会话时区

  8. vtk实战(二十四)——读入vtu数据

    主要采用vtkXMLUnstructuredGridReader()类读取非结构化网格结构的vtk XML格式的文件. #include <vtkXMLUnstructuredGridReade ...

  9. 客快物流大数据项目(二十):物流管理系统服务器的数据路径配置和软件下载存放位置

    目录 物流管理系统服务器 一.虚拟机数据路径配置 二.软件下载和存放位置

最新文章

  1. 单目3D物体级SLAM | CubeSLAM: Monocular 3D Object SLAM
  2. php file size,PHP filesize() 函数
  3. 转:在RHEL5系统中搭建iSCSI存储服务器
  4. Windows下socket编程(console非MFC)
  5. 面试问题背后的“猫腻”
  6. 把握今生 不要期待来世
  7. 关于RAID与SCSI的一些基本概念(一)
  8. 测试驱动的项目管理概念文档
  9. 微软在Windows 8之后将放弃Windows品牌
  10. 互补滤波系数_四元数+互补滤波 - osc_5aksh307的个人空间 - OSCHINA - 中文开源技术交流社区...
  11. 人脸服务器如何与门禁系统对接,人脸识别门禁系统功能介绍
  12. 记录一下中移物联网的一面(方向:前端开发)
  13. 【已注册】充QQ业务软件
  14. 【ESAPI】WEB安全ESAPI使用
  15. SAP MM02主数据维护多语言长文本,丢失空格的解决办法
  16. 常说的LDO电路是啥?(简单版)
  17. 2021年十佳优惠券返利APP排名榜 2021年用户喜欢的优惠券返利APP前10排行榜
  18. 非正版Windows用户安装windows media player 11
  19. 联想yoga710风扇声音过大解决方法
  20. 如何将svg转换为xaml

热门文章

  1. Docker概述与基本使用
  2. 简单说说 Servlet
  3. 解决Eclipse打开某个workspace报错:The project description file (.project) for (项目名)
  4. GIONEE A1 金立A1 root 刷机包 GIONEE SWW1609_0201 mt6755
  5. CMD中的用户名和自己账户名不一致
  6. Juniper设备标准配置
  7. 监控系统看这一篇就够了!zabbix、Prometheus等常见监控教程
  8. html高度塌陷问题
  9. oracle授权同义词权限,Oracle授权 同义词
  10. 初学者必会的100个编程代码