spark基础之RDD和DataFrame的转换方式
一 通过定义Case Class,使用反射推断Schema
定义Case Class,在RDD的转换过程中使用Case Class可以隐式转换成SchemaRDD,然后再注册成表,然后就可以利用sqlContext或者SparkSession操作了。
我们给出一个电影测试数据film.txt,定一个Case Class(Film),然后将数据文件读入后隐式转换成SchemeRDD:film,并将film在SparkSession中注册,最后对表进行查询
1.1 上传测试数据
hdfs dfs -put /opt/data/film.txt /user/hadoop
1.2 定义SparkSession,静态导入其所有成员
val session = SparkSession.builder()
.appName("Case Class To Define RDD")
.config("spark.some.config.option", "some-value")
.master("local[*]")
.getOrCreate()
import session.implicits._
1.3 定义Film类,读入数据并创建视图
val filmRdd = session.sparkContext.textFile("hdfs://hdfs-cluster/user/hadoop/film.txt") val filmDF = filmRdd.map(_.split(",")).map(fields => Film(fields(0),fields(1),fields(2),fields(3),fields(4).trim.toInt,fields(5),fields(6).trim.toFloat)).toDF() filmDF.createOrReplaceTempView("film")
1.4 查询分数大于5.0的电影
val results =session.sql("SELECT name,director,style,score FROM film WHERE score > 5.0")
1.5 对获取到的Dataset进行映射,因为不知道数据的schema,所以我们需要getAs方法获取对应的列,并将每一行结果返回,最后打印结果
val filmDS = results.map(film => {val name = film.getAs[String]("name")val director = film.getAs[String]("director")val style = film.getAs[String]("style")val score = film.getAs[Float]("score")(name,director,style,score) }) filmDS.show(10)
二 通过编程接口,定义Schema,并应用到RDD上
通过使用createDataFrame定义RDD,通常有三个步骤
# 创建初始RDD
# 构建Row类型的RDD
# 构建该RDD对应的schema
然后调用createDataFrame方法
2.1 创建SparkSession,静态导入成员
val session = SparkSession.builder().appName("Create DataFrame API To Define RDD").config("spark.some.config.option", "some-value").master("local[*]").getOrCreate()import session.implicits._
2.2HDFS 读取数据,构建初始RDD
val filmRdd = session.sparkContext.textFile("hdfs://hdfs-cluster/user/hadoop/film.txt")
2.3构建Row类型的RDD
val rowRdd = filmRdd.map(_.split(",")).map(fields =>Row(fields(0),fields(1),fields(2),fields(3),fields(4).trim.toInt,fields(5),fields(6).trim.toFloat))
2.4 构建该RDD对应的schema
// 这里的数据类型必须和数据源所有类型对应
val schema:StructType = StructType(Array(
StructField("filmid",StringType),
StructField("director",StringType),
StructField("name",StringType),
StructField("release_time",StringType),
StructField("box_office",IntegerType),
StructField("style",StringType),
StructField("score",FloatType)
))
2.5 创建DataFrame,并创建汇或者替换视图,然后查询查询分数大于5.0的电影
val filmDF = session.createDataFrame(rowRdd,schema) filmDF.createOrReplaceTempView("film") val results = session.sql("SELECT name,director,style,score FROM film WHERE score > 5.0")
2.6 获取结果,进行展示
val filmDS = results.map(film => {val name = film.getAs[String]("name")val director = film.getAs[String]("director")val style = film.getAs[String]("style")val score = film.getAs[Float]("score")(name,director,style,score) }) filmDS.show(10)
spark基础之RDD和DataFrame的转换方式相关推荐
- spark基础之RDD和DataFrame和Dataset比较
一 SparkSQL简介 Spark SQL是一个能够利用Spark进行结构化数据的存储和操作的组件,结构化数据可以来自外部结构化数据源也可以通过RDD获取. 外部的结构化数据源包括Hive,JSON ...
- spark sql定义RDD、DataFrame与DataSet
RDD 优点: 编译时类型安全 编译时就能检查出类型错误 面向对象的编程风格 直接通过类名点的方式来操作数据 缺点: 序列化和反序列化的性能开销 无论是集群间的通信, 还是IO操作都需要对对象的结构和 ...
- Spark SQL之RDD转DataFrame
准备文件 首先准备好测试文件info.txt,内容如下: 1,vincent,20 2,sarah,19 3,sofia,29 4,monica,26 将RDD转成DataFrame 方式一:反射 可 ...
- spark基础之RDD详解
一 什么是RDD,有什么特点? RDD: Resilient Distributed Dataset,弹性分布式数据集. 特点: # 它是一种数据的集合 # 它可以被分区,每一个分区分布在不同的集群中 ...
- Spark SQL将rdd转换为数据集-以编程方式指定模式(Programmatically Specifying the Schema)
一:解释 官网:https://spark.apache.org/docs/latest/sql-getting-started.html 这种场景是生活中的常态 When case classes ...
- Spark RDD、DataFrame原理及操作详解
RDD是什么? RDD (resilientdistributed dataset),指的是一个只读的,可分区的分布式数据集,这个数据集的全部或部分可以缓存在内存中,在多次计算间重用. RDD内部可以 ...
- Spark中RDD、DataFrame和DataSet的区别与联系
一.RDD.DataFrame和DataSet的定义 在开始Spark RDD与DataFrame与Dataset之间的比较之前,先让我们看一下Spark中的RDD,DataFrame和Dataset ...
- spark与python关系_spark: RDD与DataFrame之间的相互转换方法
DataFrame是一个组织成命名列的数据集.它在概念上等同于关系数据库中的表或R/Python中的数据框架,但其经过了优化.DataFrames可以从各种各样的源构建,例如:结构化数据文件,Hive ...
- [Spark]PySpark入门学习教程---例子RDD与DataFrame
一 例子说明 用spark的RDD与DataFrame两种方式实现如下功能 1.合并主特征与单特征 2.对标签进行过滤 3.标签与特征进行合并 4.输出指定格式最后的数据 二 数据说明 包括三个文件: ...
最新文章
- Svchost.exe占用CPU100%全面解析与进程说明
- 用verilog表示两个4x4矩阵的乘法运算?及单个矩阵的求逆
- 事件绑定、事件监听、事件委托
- 51nod 1693 水群
- js添加事件 attachEvent 和addEventListener的用法
- 使用android日志工具
- 基于 Sharding Sphere,实现数据 “一键脱敏”!
- java数据库查询类
- 电线电缆行业MES解决方案
- 人工智能在智能制造中的应用
- linux mysql 挂马_解决数据库被挂马最快方法
- 儿童讲堂 - 量词的解释
- 实验五 八段数码管显示(红绿灯)
- 汽车电子行业英文缩写大全(不断更新)
- Navicat Premium 12 中文版v12.1.19
- POI获取单元格颜色与设置单元格颜色
- 基于numpy实现矩阵计算器
- Java入门(四):进阶
- C#基础知识点个人整理【菜鸟教程】
- getDate方法的妙用(js判断闰年)
热门文章
- 预览ExtJS 4.0的新功能(四):焕然一新的Store/Proxy
- java转json数组对象_java对象转json、json数组 、xml | 学步园
- linux+tar+man,Linux常用命令
- linux跟踪查看实时追加文件的结尾(常用于日志文件)
- pandas dataframe 使用多进程apply(原生、pandarallel多进程、swifter多进程)
- Linux/Mac 配置安装scala
- 新型冠状病毒肺炎国内分省分日期从1.16起的全部数据爬取与整理代码(附下载)
- html签到插件,GitHub - inu1255/soulsign-chrome: 魂签,一款用于自动签到的chrome插件
- c语言输入日期返回星期几,C语言程序设计: 输入年月日 然后输出是星期几
- WebsSocket