DataFrame & DataSet

DataFrame

Background: why DataFrame

DataFrame overview

DF: A DataFrame is a Dataset organized into named columns. Each column has a name, values, and a type, so a DataFrame maps naturally onto a table. It is conceptually equivalent to a table in a relational database or a data frame in R/Python.
DS: A Dataset is a distributed collection of data.
Schema: the structural information of the table (column names and types).
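
For intuition, a minimal sketch (not part of the original notes) that builds a DataFrame from a local Seq and inspects its named columns and schema; the object name and column names below are purely illustrative:

import org.apache.spark.sql.SparkSession

object DataFrameConceptSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DataFrameConceptSketch").master("local[2]").getOrCreate()
    import spark.implicits._

    // A DataFrame = distributed rows + a schema (column name, column type, nullability)
    val df = Seq(("Michael", 29), ("Andy", 30)).toDF("name", "age")
    df.printSchema()   // root |-- name: string ... |-- age: integer ...
    df.show()

    spark.stop()
  }
}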

DataFrame vs. RDD

RDD:
  Java/Scala ==> JVM
  Python     ==> Python runtime
DataFrame:
  Java/Scala/Python ==> Logical Plan

Whatever the language, DataFrame operations are first converted into the same logical plan, so they are optimized and executed the same way; RDD code runs directly in each language's own runtime.
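
As a rough sketch (an illustration added here, not from the original notes), the same filter written against an RDD and against a DataFrame; the DataFrame version is expressed over named columns and goes through the logical plan and Catalyst optimizer, while the RDD version is an opaque lambda executed as-is:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("RddVsDfSketch").master("local[2]").getOrCreate()
import spark.implicits._

// RDD: typed records, opaque lambdas run directly on the JVM
val rdd = spark.sparkContext.parallelize(Seq(("Andy", 30), ("Justin", 19)))
rdd.filter { case (_, age) => age > 19 }.collect()

// DataFrame: named columns; the filter becomes part of a logical plan that Spark can optimize
val df = rdd.toDF("name", "age")
df.filter($"age" > 19).explain()   // prints the optimized plan
df.filter($"age" > 19).show()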

Basic DataFrame API operations

Source data (people.json)

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

1. Create a DataFrame

// Load the JSON file into a DataFrame
val peopleDF = spark.read.format("json").load("D:\\Spark\\DataSets\\people.json")

2.printSchema

Prints the schema (structure) of the table

peopleDF.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

3.show

Displays the contents of the table

peopleDF.show()  // shows the first 20 rows by default

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

4.select

Outputs selected columns

// Output a specified column; SQL: select name from table;
peopleDF.select("name").show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+
-------------------------------------------------------------
// Output specified columns and apply a computation to a column
peopleDF.select(peopleDF.col("name"), peopleDF.col("age") + 10).show()

+-------+----------+
|   name|(age + 10)|
+-------+----------+
|Michael|      null|
|   Andy|        40|
| Justin|        29|
+-------+----------+
-------------------------------------------------------------
// Output specified columns, apply a computation, and give the result an alias
peopleDF.select(peopleDF.col("name"), (peopleDF.col("age") + 10).as("new_age")).show()

+-------+-------+
|   name|new_age|
+-------+-------+
|Michael|   null|
|   Andy|     40|
| Justin|     29|
+-------+-------+

5.filter

Filters rows by a condition

// Filter by a column's value; SQL: select * from table where age>19;
peopleDF.filter(peopleDF.col("age") > 19).show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

6.groupBy

Group and aggregate

// Group by a column, then aggregate; SQL: select age,count(1) from table group by age;
peopleDF.groupBy("age").count().show()

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+
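
Beyond count(), groupBy can drive several aggregations at once through agg; a small sketch, assuming the same peopleDF and the built-in functions from org.apache.spark.sql.functions:

import org.apache.spark.sql.functions.{count, collect_list}

// SQL equivalent: select age, count(1) as cnt, collect_list(name) as names from table group by age;
peopleDF.groupBy("age")
  .agg(count("*").as("cnt"), collect_list("name").as("names"))
  .show(false)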

Complete DataFrame basic API example

package com.saddam.spark.MuKe.DataFrame_DataSet

import org.apache.spark.sql.SparkSession

object DataFrameApp {
  def main(args: Array[String]): Unit = {
    // 1. Entry point
    val spark = SparkSession.builder().appName("DataFrameApp").master("local[2]").getOrCreate()

    // 2. Processing logic
    // Load the JSON file into a DataFrame
    val peopleDF = spark.read.format("json").load("D:\\Spark\\DataSets\\people.json")

    // Print the schema of the table
    peopleDF.printSchema()

    // Show the first 20 records of the dataset
    peopleDF.show()

    // Output a specified column; SQL: select name from table;
    peopleDF.select("name").show()

    // Output specified columns and apply a computation; SQL: select age+10 from table;
    peopleDF.select(peopleDF.col("name"), peopleDF.col("age") + 10).show()

    // Output specified columns, apply a computation and alias it; SQL: select age+10 as new_age from table;
    peopleDF.select(peopleDF.col("name"), (peopleDF.col("age") + 10).as("new_age")).show()

    // Filter by a column's value; SQL: select * from table where age>19;
    peopleDF.filter(peopleDF.col("age") > 19).show()

    // Group by a column, then aggregate; SQL: select age,count(1) from table group by age;
    peopleDF.groupBy("age").count().show()

    spark.stop()
  }
}

DF and RDD interoperation, approach 1: reflection

Use when the data types are known in advance

The Scala interface for Spark SQL supports automatically converting an RDD containing case classes to a DataFrame. The case class defines the schema of the table. The names of the arguments to the case class are read using reflection and become the names of the columns. Case classes can also be nested or contain complex types such as Seqs or Arrays. This RDD can be implicitly converted to a DataFrame and then be registered as a table. Tables can be used in subsequent SQL statements.
package com.saddam.spark.MuKe.DataFrame_DataSet

import org.apache.spark.sql.SparkSession

object DF_RDD_reflectAPP {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DF_RDD_reflectAPP").master("local[2]").getOrCreate()

    // RDD ==> DataFrame
    val RDD = spark.read.textFile("D:\\Spark\\DataSets\\infos.txt")
    // RDD.show()
    /*
    +-------------+
    |        value|
    +-------------+
    |1,zhangsan,20|
    |    2,lisi,30|
    |  3,wangwu,40|
    |  4,saddam,18|
    +-------------+
    */

    // toDF is only available after importing the implicit conversions
    import spark.implicits._
    val infoDF = RDD.map(_.split(",")).map(line => Info(line(0).toInt, line(1), line(2).toInt)).toDF()
    infoDF.show()
    /*
    +---+--------+---+
    | id|    name|age|
    +---+--------+---+
    |  1|zhangsan| 20|
    |  2|    lisi| 30|
    |  3|  wangwu| 40|
    |  4|  saddam| 18|
    +---+--------+---+
    */

    infoDF.filter(infoDF.col("age") > 30).show()

    // TODO: the SQL approach also works
    // Create a local temporary view
    infoDF.createOrReplaceTempView("infos")
    spark.sql("select * from infos").show()
    /*
    +---+--------+---+
    | id|    name|age|
    +---+--------+---+
    |  1|zhangsan| 20|
    |  2|    lisi| 30|
    |  3|  wangwu| 40|
    |  4|  saddam| 18|
    +---+--------+---+
    */

    spark.stop()
  }

  case class Info(id: Int, name: String, age: Int)
}
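
The documentation quoted above also notes that case classes can be nested or contain complex types such as Seqs or Arrays. A minimal spark-shell style sketch of that (the Address/Person classes are hypothetical, not part of the course data):

case class Address(city: String, zip: String)
case class Person(id: Int, name: String, address: Address, phones: Seq[String])

import spark.implicits._
val nestedDF = Seq(
  Person(1, "zhangsan", Address("Beijing", "100000"), Seq("135-0000-0000")),
  Person(2, "lisi",     Address("Shanghai", "200000"), Seq())
).toDF()

nestedDF.printSchema()                           // address becomes struct<city,zip>, phones becomes array<string>
nestedDF.select("name", "address.city").show()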

DF and RDD interoperation, approach 2: programmatic

Use when the data types are not known in advance

When case classes cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a DataFrame can be created programmatically with three steps.
1.Create an RDD of Rows from the original RDD;
2.Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.
3.Apply the schema to the RDD of Rows via createDataFrame method provided by SparkSession.
def program(spark: SparkSession): Unit = {
  val RDD = spark.sparkContext.textFile("D:\\Spark\\DataSets\\infos.txt")
  val rowRDD = RDD.map(_.split(",")).map(line => Row(line(0).toInt, line(1), line(2).toInt))
  val structType = StructType(Array(
    StructField("id", IntegerType, true),
    StructField("name", StringType, true),
    StructField("age", IntegerType, true)))
  val infoDF = spark.createDataFrame(rowRDD, structType)
  infoDF.printSchema()
  infoDF.show()
}
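
This method assumes that Row and the StructType/StructField/IntegerType/StringType classes are already imported and that a SparkSession is passed in by the caller; a sketch of the surrounding boilerplate (the object name is illustrative only):

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object DF_RDD_ProgramApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DF_RDD_ProgramApp").master("local[2]").getOrCreate()
    program(spark)   // the method shown above
    spark.stop()
  }

  // def program(spark: SparkSession): Unit = { ... }   // as defined above
}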

Hands-on case: student information statistics

Source data (student.data)

1|Burke|1-300-746-8446|aaaaa.velit.in@qq.com
2|Kamal|2-300-746-8446|bbbbb.velit.in@qq.com
3|Olgal|3-300-746-8446|ccccc.velit.in@qq.com
4|Belle|4-300-746-8446|ddddd.velit.in@qq.com
5|Trevor|5-300-746-8446|eeeee.velit.in@qq.com
6|Laure|6-300-746-8446|fffff.velit.in@qq.com
7|Saral|7-300-746-8446|ggggg.velit.in@qq.com
8|Kaseem|8-300-746-8446|hhhhh.velit.in@qq.com
9|Lev|9-300-746-8446|iiiii.velit.in@qq.com
10|Maya|10-300-746-8446|jjjjj.velit.in@qq.com
11|Guy|11-300-745-8888|kkkkk.velit.in@qq.com
12|Malachi|12-300-854-9999|lllll.velit.in@qq.com
13|NULL|13-855-889-9966|mmmmm.velit.in@qq.com
14|Burke|1-300-746-8446|aaaaa.velit.in@qq.com
15|Kamal|2-300-746-8446|bbbbb.velit.in@qq.com
16|Olgal|3-300-746-8446|ccccc.velit.in@qq.com
17|Belle|4-300-746-8446|ddddd.velit.in@qq.com
18|Trevor|5-300-746-8446|eeeee.velit.in@qq.com
19|Laure|6-300-746-8446|fffff.velit.in@qq.com
20|Saral|7-300-746-8446|ggggg.velit.in@qq.com
21||1-711-710-6552|lectus@aliquetlibero.co.uk
22||1-711-710-6552|lectus@aliquetlibero.co.uk
23|NULL|10-300-746-8446|jjjjj.velit.in@qq.com

Code implementation

package com.saddam.spark.MuKe.DataFrame_DataSet

import org.apache.spark.sql.SparkSession

object DataFrameCase {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DataFrameCase").master("local[2]").getOrCreate()

    val rdd = spark.sparkContext.textFile("D:\\Spark\\DataSets\\student.data")

    import spark.implicits._
    val studentDF = rdd.map(_.split("\\|")).map(line => Student(line(0).toInt, line(1), line(2), line(3))).toDF()
    studentDF.show(30, false)
  }

  case class Student(id: Int, name: String, phone: String, email: String)
}

Useful test operations

The following operations are quick to try in spark-shell:

show
scala>studentDF.show
scala>studentDF.show()
scala>studentDF.show(30)
scala>studentDF.show(30,false)
take
scala>studentDF.take(10)
first
scala>studentDF.first()
head
scala>studentDF.head(3)
select

Query specific columns

scala>studentDF.select("colname").show()
filter

Filter

scala> studentDF.filter("name='' or name='NULL'").show
sort

Sort

scala>studentDF.sort(studentDF.col("name")).show(false)
as

Alias

scala>studentDF.select(studentDF("name").as("student_name")).show
join

Join two tables

scala> val studentDF2 = rdd.map(_.split("\\|")).map(line=>Student(line(0).toInt,line(1),line(2),line(3))).toDF()
// === (three equals signs) is Spark's column-equality operator
scala> studentDF.join(studentDF2, studentDF.col("id") === studentDF2.col("id")).show(30, false)
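
The same join can also be expressed in SQL over temporary views; a sketch, assuming the two student DataFrames above:

scala> studentDF.createOrReplaceTempView("s1")
scala> studentDF2.createOrReplaceTempView("s2")
scala> spark.sql("select s1.id, s1.name, s2.email from s1 join s2 on s1.id = s2.id").show(30, false)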

DataSet

1. Overview

A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL's optimized execution engine.
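
The "strong typing" benefit can be seen in a small sketch: with a Dataset the Scala compiler checks field access, while with a DataFrame a wrong column name only fails when the query is analyzed at runtime (spark, df, and the Sales case class here are the ones used in the code further below):

import spark.implicits._

// Dataset: field names are checked at compile time
val ds = df.as[Sales]
ds.map(s => s.itemId).show()       // OK
// ds.map(s => s.item_id)          // would not compile: value item_id is not a member of Sales

// DataFrame: a wrong column name is only caught at runtime
// df.select("item_id").show()     // throws AnalysisException when the query is analyzed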

2. Source data (sales.csv)

TransactionId,CustomeId,itemId,amountPaid
111,1,12,500.0
112,2,22,300.0
113,3,4,200.0
114,4,22,100.0
115,5,4,125.0
116,6,6,150.0
117,1,7,130.0
118,1,5,140.0
119,1,88,800.0
120,2,554,600.0
121,2,243,350.0
122,2,2,850.0
123,3,45,900.0
124,4,21,700.0

3. Code implementation: parsing a CSV file

package com.saddam.spark.MuKe.DataFrame_DataSet

import org.apache.spark.sql.SparkSession

object DataSetAPP {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DataSetAPP").master("local[2]").getOrCreate()

    val path = "D:\\Spark\\DataSets\\sales.csv"

    // Parse the CSV file with Spark
    // option("header","true"): treat the first line as the header
    // option("inferSchema","true"): infer column types from the data
    val df = spark.read.option("header", "true").option("inferSchema", "true").csv(path)
    df.show()
    /*
    +-------------+---------+------+----------+
    |TransactionId|CustomeId|itemId|amountPaid|
    +-------------+---------+------+----------+
    |          111|        1|    12|     500.0|
    |          112|        2|    22|     300.0|
    |          113|        3|     4|     200.0|
    |          114|        4|    22|     100.0|
    |          115|        5|     4|     125.0|
    |          116|        6|     6|     150.0|
    |          117|        1|     7|     130.0|
    |          118|        1|     5|     140.0|
    |          119|        1|    88|     800.0|
    |          120|        2|   554|     600.0|
    |          121|        2|   243|     350.0|
    |          122|        2|     2|     850.0|
    |          123|        3|    45|     900.0|
    |          124|        4|    21|     700.0|
    +-------------+---------+------+----------+
    */

    import spark.implicits._
    val ds = df.as[Sales]
    ds.map(line => line.itemId).show()
    /*
    +-----+
    |value|
    +-----+
    |   12|
    |   22|
    |    4|
    |   22|
    |    4|
    |    6|
    |    7|
    |    5|
    |   88|
    |  554|
    |  243|
    |    2|
    |   45|
    |   21|
    +-----+
    */

    // Without the header option, Spark uses default column names (_c0, _c1, ...)
    val df2 = spark.read.csv(path)
    df2.show(3)
    /*
    +-------------+---------+------+----------+
    |          _c0|      _c1|   _c2|       _c3|
    +-------------+---------+------+----------+
    |TransactionId|CustomeId|itemId|amountPaid|
    |          111|        1|    12|     500.0|
    |          112|        2|    22|     300.0|
    +-------------+---------+------+----------+
    */

    spark.stop()
  }

  case class Sales(TransactionId: Int, CustomeId: Int, itemId: Int, amountPaid: Double)
}
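
inferSchema costs an extra pass over the file; a sketch (not in the original notes) that supplies the schema explicitly with StructType instead, matching the column types shown above:

import org.apache.spark.sql.types.{DoubleType, IntegerType, StructField, StructType}

val salesSchema = StructType(Array(
  StructField("TransactionId", IntegerType, true),
  StructField("CustomeId", IntegerType, true),
  StructField("itemId", IntegerType, true),
  StructField("amountPaid", DoubleType, true)))

// The header line is still skipped, but no inference pass over the data is needed
val dfExplicit = spark.read.option("header", "true").schema(salesSchema).csv(path)
dfExplicit.printSchema()
dfExplicit.show(3)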
