一、Spark生态简介

官网: http://spark.apache.org/

  Spark是一种快速、通用、可扩展的大数据分析引擎,2009年诞生于加州大学伯克利分校AMPLab,2010年开源,2013年6月成为Apache孵化项目,2014年2月成为Apache顶级项目。目前,Spark生态系统已经发展成为一个包含多个子项目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子项目,Spark是基于内存计算的大数据并行计算框架。Spark基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量廉价硬件之上,形成集群。Spark得到了众多大数据公司的支持,这些公司包括Hortonworks、IBM、Intel、Cloudera、MapR、Pivotal、百度、阿里、腾讯、京东、携程、优酷土豆。当前百度的Spark已应用于丰巢、大搜索、直达号、百度大数据等业务;阿里利用GraphX构建了大规模的图计算和图挖掘系统,实现了很多生产系统的推荐算法;腾讯Spark集群达到8000台的规模,是当前已知的世界上最大的Spark集群。

  我们将会学习Spark生态中的3部分内容:SparkCore,Spark SQL,Spark Streaming. 机器学习和图计算的子项目不涉及。

 下面会学习第2部分的内容:Spark SQL.

二、Spark SQL项目

Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。

为什么要学习Spark SQL?

我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所以Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!同时Spark SQL也支持从Hive中读取数据。所以我们类比的理解:Hive---SQL-->MapReduce,Spark SQL---SQL-->RDD。都是一种解析传统SQL到大数据运算模型的引擎,属于数据分析的范围。

三、什么是DataFrame和DataSet?

首先,最简单的理解我们可以认为DataFrame就是Spark中的数据表(类比传统数据库),DataFrame的结构如下:

DataFrame(表)= Schema(表结构) + Data(表数据)
(*)就是表,是Spark SQL对结构化数据的抽象
(*)DataFrame表现形式就是:RDD

总结:DataFrame(表)是Spark SQL对结构化数据的抽象。可以将DataFrame看做RDD。

>DataFrame

DataFrame是组织成命名列的数据集。它在概念上等同于关系数据库中的,但在底层具有更丰富的优化。DataFrames可以从各种来源构建,

例如:

  • >结构化数据文件(JSON)
  • >hive中的表
  • >外部数据库或现有RDDs

DataFrame API支持的语言有Scala,Java,Python和R。

从上图可以看出,DataFrame相比RDD多了数据的结构信息,即schema。RDD是分布式的 Java对象的集合。DataFrame是分布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化。

>DataSet

Dataset是数据的分布式集合。Dataset是在Spark 1.6中添加的一个新接口,是DataFrame之上更高一级的抽象。它提供了RDD的优点(强类型化,使用强大的lambda函数的能力)以及Spark SQL优化后的执行引擎的优点。一个Dataset 可以从JVM对象构造,然后使用函数转换(map, flatMap,filter等)去操作。 Dataset API 支持Scala和Java。 Python不支持Dataset API。

四、测试数据

我们仍然使用2个之前用过的csv文件作为部分测试数据:

dept.csv信息:

10,ACCOUNTING,NEW YORK
20,RESEARCH,DALLAS
30,SALES,CHICAGO
40,OPERATIONS,BOSTON

emp.csv信息:

7369,SMITH,CLERK,7902,1980/12/17,800,,20
7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30
7566,JONES,MANAGER,7839,1981/4/2,2975,,20
7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
7698,BLAKE,MANAGER,7839,1981/5/1,2850,,30
7782,CLARK,MANAGER,7839,1981/6/9,2450,,10
7788,SCOTT,ANALYST,7566,1987/4/19,3000,,20
7839,KING,PRESIDENT,,1981/11/17,5000,,10
7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30
7876,ADAMS,CLERK,7788,1987/5/23,1100,,20
7900,JAMES,CLERK,7698,1981/12/3,950,,30
7902,FORD,ANALYST,7566,1981/12/3,3000,,20
7934,MILLER,CLERK,7782,1982/1/23,1300,,10

将这2个csv文件put到HDFS的hdfs://bigdata111:9000/input/csvFiles/目录以便后面使用

[root@bigdata111 ~]# hdfs dfs -ls /input/csvFiles
Found 2 items
-rw-r--r--   1 root supergroup         84 2018-06-15 13:40 /input/csvFiles/dept.csv
-rw-r--r--   1 root supergroup        617 2018-06-15 13:40 /input/csvFiles/emp.csv

五、创建DataFrame

前提:在集群模式下启动spark-shell:bin/spark-shell --master spark://bigdata111:7077

方式1:使用case class定义表

(1) 定义case class代表表的结构schema
scala>case class Emp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptno:Int)(2) 导入emp.csv文件(导入数据)
scala>val lines = sc.textFile("/root/temp/csv/emp.csv").map(_.split(","))//读取Linux本地数据或者
scala>val lines = sc.textFile("hdfs://10.30.30.146:9000/input/csvFiles/emp.csv").map(_.split(","))//读取HDFS数据

(3) 生成表: DataFrame
scala>val allEmp = lines.map(x=>Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt))
(4)由allEmp直接生成表
scala>val empDF = allEmp.toDF(4) 操作: DSL语句
scala>empDF.show         ---->  select * from emp
scala>empDF.printSchema  ---->  desc emp

操作结果:

方式2:使用SparkSession对象创建DataFrame

什么是SparkSession?

Apache Spark 2.0引入了SparkSession,其为用户提供了一个统一的切入点来使用Spark的各项功能,并且允许用户通过它调用DataFrame和Dataset相关API来编写Spark程序。最重要的是,它减少了用户需要了解的一些概念,使得我们可以很容易地与Spark交互。
在2.0版本之前,与Spark交互之前必须先创建SparkConf和SparkContext。然而在Spark 2.0中,我们可以通过SparkSession来实现同样的功能,而不需要显式地创建SparkConf, SparkContext 以及 SQLContext,因为这些对象已经封装在SparkSession中。

通过SparkSession可以访问Spark所有的模块!

使用Sparksession创建DataFrame过程:

(2)加载结构化数据
scala>val lines = sc.textFile("/root/temp/csv/emp.csv").map(_.split(","))//读取Linux数据
或者
scala>val lines = sc.textFile("hdfs://10.30.30.146:9000/input/emp.csv").map(_.split(","))//读取HDFS数据

(3) 定义schema:StructType
scala>import org.apache.spark.sql._
scala>import org.apache.spark.sql.types._
scala>val myschema = StructType(List(StructField("empno", DataTypes.IntegerType), StructField("ename", DataTypes.StringType),StructField("job", DataTypes.StringType),StructField("mgr", DataTypes.StringType),StructField("hiredate", DataTypes.StringType),StructField("sal", DataTypes.IntegerType),StructField("comm", DataTypes.StringType),StructField("deptno", DataTypes.IntegerType)))(4)把读入的每一行数据映射成一个个Row
scala>val rowRDD = lines.map(x=>Row(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt))(5) 使用SparkSession.createDataFrame创建表
scala>val df = spark.createDataFrame(rowRDD,myschema)
可以看到df支持的函数很多,其实就是RDD的算子。这里也可以看出DF很像一个RDD。

方式3:直接读取格式化的文件(json,csv)等-最简单

前提:数据文件本身一定具有格式,这里我们选取json格式的数据,json文件可以使用spark例子中提供的people.json。你也可以使用任意json文件进行操作。测试数据如下:
[root@bigdata111 resources]# pwd
/root/training/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources
[root@bigdata111 resources]# ls
full_user.avsc  kv1.txt  people.json  people.txt  user.avsc  users.avro  users.parquet
[root@bigdata111 resources]# more people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}使用SparkSession对象直接读取Json文件
spark>val peopleDF = spark.read.json("hdfs://bigdata111:9000/input/people.json")创建完毕DF之后就可以直接查看表的信息,十分的简单:

六、操作DataFrame(DSL+SQL)

DataFrame操作也称为无类型的Dataset操作.操作的DataFrame是方法1创建的empDF.

>1.DSL操作DataFrame

1.查看所有的员工信息===selec * from empDF;
scala>empDF.show
2.查询所有的员工姓名 ($符号添加不添加功能一样)===select ename,deptno from empDF;
scala>empDF.select("ename","deptno").show
scala>empDF.select($"ename",$"deptno").show
3.查询所有的员工姓名和薪水,并给薪水加100块钱===select ename,sal,sal+100 from empDF;
scala>empDF.select($"ename",$"sal",$"sal"+100).show
4.查询工资大于2000的员工===select * from empDF where sal>2000;
scala>empDF.filter($"sal" > 2000).show
5.分组===select deptno,count(*) from empDF group by deptno;
scala>empDF.groupBy($"deptno").count.show
scala>empDF.groupBy($"deptno").avg().show
scala>empDF.groupBy($"deptno").max().show

>2.SQL操作DataFrame

(1)前提条件:需要把DataFrame注册成是一个Table或者View
scala>empDF.createOrReplaceTempView("emp")(2)使用SparkSession执行从查询
scala>spark.sql("select * from emp").show
scala>spark.sql("select * from emp where deptno=10").show
(3)求每个部门的工资总额
scala>spark.sql("select deptno,sum(sal) from emp group by deptno").show
此时可以通过普通的SQL进行查询操作,也可以将DSL操作中对应的SQL语句重新验证。

七、视图(临时和全局视图)

在使用SQL操作DataFrame的时候,有一个前提就是必须通过DF创建一个表或者视图:empDF.createOrReplaceTempView("emp")

在SparkSQL中,如果你想拥有一个临时的view,并想在不同的Session中共享,而且在application的运行周期内可用,那么就需要创建一个全局的临时view。并记得使用的时候加上global_temp作为前缀来引用它,因为全局的临时view是绑定到系统保留的数据库global_temp上。

① 创建一个普通的view和一个全局的view
scala>empDF.createOrReplaceTempView("emp1")
scala>empDF.createGlobalTempView("emp2")
② 在当前会话中执行查询,均可查询出结果。
scala>spark.sql("select * from emp1").show
scala>spark.sql("select * from global_temp.emp2").show
③ 开启一个新的会话,执行同样的查询
scala>spark.newSession.sql("select * from emp1").show     (运行出错)
scala>spark.newSession.sql("select * from global_temp.emp2").show

八、使用数据源

在介绍parquet(地板)文件的时候我们使用的是Spark例子文件夹中提供的users.parquet文件:

[root@bigdata111 resources]# pwd
/root/training/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources
[root@bigdata111 resources]# ls
full_user.avsc kv1.txt people.json people.txt temp user.avsc users.avro users.parquet

1、通用的Load/Save函数

(*)什么是parquet文件?
Parquet是列式存储格式的一种文件类型,列式存储有以下的核心:

  • 可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量。
  • 压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,可以使用更高效的压缩编码(例如Run Length Encoding和Delta Encoding)进一步节约存储空间。
  • 只读取需要的列,支持向量运算,能够获取更好的扫描性能。

Parquet格式是Spark SQL的默认数据源,可通过spark.sql.sources.default配置

(*)通用的Load/Save函数

  • load函数读取Parquet文件:scala>val userDF = spark.read.load("hdfs://bigdata111:9000/input/users.parquet")

对比如下语句:

scala>val peopleDF = spark.read.json("hdfs://bigdata111:9000/input/people.json")
scala>val peopleDF = spark.read.format("json").load("hdfs://bigdata111:9000/input/people.json")

查询Schema和数据:scala>userDF.show

  • save函数保存数据,默认的文件格式:Parquet文件(列式存储文件)

scala>userDF.select($"name",$"favorite_color").write.save("/root/temp/result1")
scala>userDF.select($"name",$"favorite_color").write.format("csv").save("/root/temp/result2")
scala>userDF.select($"name",$"favorite_color").write.csv("/root/temp/result3")

(*)显式指定文件格式:加载json格式
直接加载:val usersDF = spark.read.load("/root/resources/people.json")
会出错
val usersDF = spark.read.format("json").load("/root/resources/people.json")

(*)存储模式(Save Modes)
可以采用SaveMode执行存储操作,SaveMode定义了对数据的处理模式。需要注意的是,这些保存模式不使用任何锁定,不是原子操作。此外,当使用Overwrite方式执行时,在输出新数据之前原数据就已经被删除。SaveMode详细介绍如下表:
Demo:
usersDF.select($"name").write.save("/root/result/parquet1")
--> 出错:因为/root/result/parquet1已经存在

usersDF.select($"name").write.mode("overwrite").save("/root/result/parquet1")

(*)将结果保存为表
usersDF.select($"name").write.saveAsTable("table1")

也可以进行分区、分桶等操作:partitionBy、bucketBy

转载于:https://www.cnblogs.com/forfreewill/articles/9187519.html

【大数据Spark_SparkSQL系列_1】Spark SQL基础(五星重要)相关推荐

  1. 大数据进阶之路——Spark SQL 之 DataFrameDataset

    文章目录 dataframe 和 rdd API常用操作 DataFrame和RDD 案例 DataSet DataFrame它不是Spark SQL提出的,而是早起在R.Pandas语言就已经有了的 ...

  2. 大数据学习系列----基于Spark Streaming流式计算

    2019独角兽企业重金招聘Python工程师标准>>> 个性化的需求 随着互联网知识信息指数级膨胀,个性化的需求对于用户来说越来越重要,通过推荐算法和用户点击行为的流式计算可以很简单 ...

  3. dataframe记录数_大数据系列之Spark SQL、DataFrame和RDD数据统计与可视化

    Spark大数据分析中涉及到RDD.Data Frame和SparkSQL的操作,本文简要介绍三种方式在数据统计中的算子使用. 1.在IPython Notebook运行Python Spark程序 ...

  4. 大数据学习系列之七 ----- Hadoop+Spark+Zookeeper+HBase+Hive集群搭建 图文详解

    引言 在之前的大数据学习系列中,搭建了Hadoop+Spark+HBase+Hive 环境以及一些测试.其实要说的话,我开始学习大数据的时候,搭建的就是集群,并不是单机模式和伪分布式.至于为什么先写单 ...

  5. Spark大数据开发学习:Spark基础入门

    在诸多的大数据技术框架当中,Spark可以说是占据了非常重要的地位,继Hadoop之后,基于实时数据处理需求的不断上升,Spark开始占据越来越大的市场份额,而Spark,也成为大数据的必学知识点.今 ...

  6. 2018年又传喜报!热烈祝贺王家林大师大数据经典著作《Spark SQL大数据实例开发教程》 畅销书籍 出版上市!

    2018年又传喜报!热烈祝贺王家林大师大数据经典著作<Spark SQL大数据实例开发教程> 畅销书籍 出版上市! 作者: 王家林 段智华  条码书号:9787111591979 出版日期 ...

  7. 大数据学习系列之八----- Hadoop、Spark、HBase、Hive搭建环境遇到的错误以及解决方法

    大数据学习系列之八----- Hadoop.Spark.HBase.Hive搭建环境遇到的错误以及解决方法 参考文章: (1)大数据学习系列之八----- Hadoop.Spark.HBase.Hiv ...

  8. “卜算子·大数据”学习系列原创文章、源码——从入门到精通

    大数据 big-data :white_check_mark: 转载请注明出处与作者信息(如下) 原创作者:王小雷 作品出自:https://github.com/wangxiaoleiAI/big- ...

  9. 大数据小白系列——HDFS(1)

    [注1:结尾有大福利!] [注2:想写一个大数据小白系列,介绍大数据生态系统中的主要成员,理解其原理,明白其用途,万一有用呢,对不对.] 大数据是什么?抛开那些高大上但笼统的说法,其实大数据说的是两件 ...

  10. 大数据项目(基于spark)--新冠疫情防控指挥作战平台项目

    大数据项目(基于spark)–新冠疫情防控指挥作战平台项目 文章目录 第一章 项目介绍 1.1 项目背景 1.2 项目架构 1.3 项目截图 1.4 功能模块 第二章 数据爬取 2.1 数据清单 2. ...

最新文章

  1. 【swjtu】数据结构实验4_基于改进KMP算法的子串查找与替换
  2. contain_of宏定义
  3. 如何加快Simulink模型的仿真速度
  4. Android自定义属性、控件三步法
  5. php cdi_CDI和EJB:在事务成功时发送异步邮件
  6. linux 4.1.16 ftrace 进程调度,Linux内核进程调度overview(1)
  7. 【覆盖安装】通用测试点
  8. 前端学习(1880)vue之电商管理系统电商系统之获取左侧菜单数据
  9. 新加坡计算机金融专业,22Fall香港、新加坡名校多个专业申请时间已公布!最快本月截止!...
  10. python 使用lxml中的xpath 和 scrpay中的xpath的区别
  11. 图书管理系统(大一C语言大作业 包含主要结构体,文件操作, 如数据的修改 查询 删除等)
  12. EI收录的中国(中文)期刊(2021版)
  13. nodejs api 设计
  14. 服务器草稿位置在c盘可以吗,T+增加凭证的时候保存草稿,保存到那里去了
  15. Ubuntu16.04下网易云音乐点击图标打不开——已解决
  16. hbuilderx升级3.6.5版本后运行到手机端同步资源失败,未得到同步资源的授权,请停止运行后重新运行,并注意手机上的授权提示
  17. 打字母案例完整版(C#)
  18. uboat-slitaz 中文 美化 uboat定制版 基于xorg
  19. shell编程之特殊符号
  20. 初创企业如何做高效持续交付

热门文章

  1. Unity 5.3 官方VR教程(—)VR综述
  2. c语言小兔子原来有1个萝卜,体能《小兔子运萝卜》.doc
  3. char在计算机中是什么作用,C语言中char的用法
  4. win10没有indexed文件_不止用来切程序,Win10任务栏还能这么玩
  5. yd什么意思_YD是什么意思
  6. 圆的内接正n边形的周长
  7. 白岩松:40岁左右的男人,比你想象中更难
  8. 旋转跳跃加后空翻,波士顿动力机器人Atlas的“变态程度”又升级了
  9. 2016.12.10
  10. flex:1代表什么