一 通过定义Case Class,使用反射推断Schema

定义Case Class,在RDD的转换过程中使用Case Class可以隐式转换成SchemaRDD,然后再注册成表,然后就可以利用sqlContext或者SparkSession操作了。

我们给出一个电影测试数据film.txt,定一个Case Class(Film),然后将数据文件读入后隐式转换成SchemeRDD:film,并将film在SparkSession中注册,最后对表进行查询

1.1 上传测试数据

hdfs dfs -put /opt/data/film.txt /user/hadoop

1.2 定义SparkSession,静态导入其所有成员

val session = SparkSession.builder()
    .appName("Case Class To Define RDD")
    .config("spark.some.config.option", "some-value")
    .master("local[*]")
    .getOrCreate()

import session.implicits._

1.3 定义Film类,读入数据并创建视图

val filmRdd = session.sparkContext.textFile("hdfs://hdfs-cluster/user/hadoop/film.txt")
val filmDF = filmRdd.map(_.split(",")).map(fields => Film(fields(0),fields(1),fields(2),fields(3),fields(4).trim.toInt,fields(5),fields(6).trim.toFloat)).toDF()
filmDF.createOrReplaceTempView("film")

1.4 查询分数大于5.0的电影

val results =session.sql("SELECT name,director,style,score FROM film WHERE score > 5.0")

1.5 对获取到的Dataset进行映射,因为不知道数据的schema,所以我们需要getAs方法获取对应的列,并将每一行结果返回,最后打印结果

val filmDS = results.map(film => {val name = film.getAs[String]("name")val director = film.getAs[String]("director")val style = film.getAs[String]("style")val score = film.getAs[Float]("score")(name,director,style,score)
})
filmDS.show(10)

二 通过编程接口,定义Schema,并应用到RDD上

通过使用createDataFrame定义RDD,通常有三个步骤

# 创建初始RDD

# 构建Row类型的RDD

# 构建该RDD对应的schema

然后调用createDataFrame方法

2.1 创建SparkSession,静态导入成员

val session = SparkSession.builder().appName("Create DataFrame API To Define RDD").config("spark.some.config.option", "some-value").master("local[*]").getOrCreate()import session.implicits._

2.2HDFS 读取数据,构建初始RDD

val filmRdd = session.sparkContext.textFile("hdfs://hdfs-cluster/user/hadoop/film.txt")

2.3构建Row类型的RDD

val rowRdd = filmRdd.map(_.split(",")).map(fields =>Row(fields(0),fields(1),fields(2),fields(3),fields(4).trim.toInt,fields(5),fields(6).trim.toFloat))

2.4 构建该RDD对应的schema

// 这里的数据类型必须和数据源所有类型对应
val schema:StructType = StructType(Array(
    StructField("filmid",StringType),
    StructField("director",StringType),
    StructField("name",StringType),
    StructField("release_time",StringType),
    StructField("box_office",IntegerType),
    StructField("style",StringType),
    StructField("score",FloatType)
))

2.5 创建DataFrame,并创建汇或者替换视图,然后查询查询分数大于5.0的电影

val filmDF = session.createDataFrame(rowRdd,schema)
filmDF.createOrReplaceTempView("film")
val results = session.sql("SELECT name,director,style,score FROM film WHERE score > 5.0")

2.6 获取结果,进行展示

val filmDS = results.map(film => {val name = film.getAs[String]("name")val director = film.getAs[String]("director")val style = film.getAs[String]("style")val score = film.getAs[Float]("score")(name,director,style,score)
})
filmDS.show(10)

spark基础之RDD和DataFrame的转换方式相关推荐

  1. spark基础之RDD和DataFrame和Dataset比较

    一 SparkSQL简介 Spark SQL是一个能够利用Spark进行结构化数据的存储和操作的组件,结构化数据可以来自外部结构化数据源也可以通过RDD获取. 外部的结构化数据源包括Hive,JSON ...

  2. spark sql定义RDD、DataFrame与DataSet

    RDD 优点: 编译时类型安全 编译时就能检查出类型错误 面向对象的编程风格 直接通过类名点的方式来操作数据 缺点: 序列化和反序列化的性能开销 无论是集群间的通信, 还是IO操作都需要对对象的结构和 ...

  3. Spark SQL之RDD转DataFrame

    准备文件 首先准备好测试文件info.txt,内容如下: 1,vincent,20 2,sarah,19 3,sofia,29 4,monica,26 将RDD转成DataFrame 方式一:反射 可 ...

  4. spark基础之RDD详解

    一 什么是RDD,有什么特点? RDD: Resilient Distributed Dataset,弹性分布式数据集. 特点: # 它是一种数据的集合 # 它可以被分区,每一个分区分布在不同的集群中 ...

  5. Spark SQL将rdd转换为数据集-以编程方式指定模式(Programmatically Specifying the Schema)

    一:解释 官网:https://spark.apache.org/docs/latest/sql-getting-started.html 这种场景是生活中的常态 When case classes ...

  6. Spark RDD、DataFrame原理及操作详解

    RDD是什么? RDD (resilientdistributed dataset),指的是一个只读的,可分区的分布式数据集,这个数据集的全部或部分可以缓存在内存中,在多次计算间重用. RDD内部可以 ...

  7. Spark中RDD、DataFrame和DataSet的区别与联系

    一.RDD.DataFrame和DataSet的定义 在开始Spark RDD与DataFrame与Dataset之间的比较之前,先让我们看一下Spark中的RDD,DataFrame和Dataset ...

  8. spark与python关系_spark: RDD与DataFrame之间的相互转换方法

    DataFrame是一个组织成命名列的数据集.它在概念上等同于关系数据库中的表或R/Python中的数据框架,但其经过了优化.DataFrames可以从各种各样的源构建,例如:结构化数据文件,Hive ...

  9. [Spark]PySpark入门学习教程---例子RDD与DataFrame

    一 例子说明 用spark的RDD与DataFrame两种方式实现如下功能 1.合并主特征与单特征 2.对标签进行过滤 3.标签与特征进行合并 4.输出指定格式最后的数据 二 数据说明 包括三个文件: ...

最新文章

  1. Svchost.exe占用CPU100%全面解析与进程说明
  2. 用verilog表示两个4x4矩阵的乘法运算?及单个矩阵的求逆
  3. 事件绑定、事件监听、事件委托
  4. 51nod 1693 水群
  5. js添加事件 attachEvent 和addEventListener的用法
  6. 使用android日志工具
  7. 基于 Sharding Sphere,实现数据 “一键脱敏”!
  8. java数据库查询类
  9. 电线电缆行业MES解决方案
  10. 人工智能在智能制造中的应用
  11. linux mysql 挂马_解决数据库被挂马最快方法
  12. 儿童讲堂 - 量词的解释
  13. 实验五 八段数码管显示(红绿灯)
  14. 汽车电子行业英文缩写大全(不断更新)
  15. Navicat Premium 12 中文版v12.1.19
  16. POI获取单元格颜色与设置单元格颜色
  17. 基于numpy实现矩阵计算器
  18. Java入门(四):进阶
  19. C#基础知识点个人整理【菜鸟教程】
  20. getDate方法的妙用(js判断闰年)

热门文章

  1. 预览ExtJS 4.0的新功能(四):焕然一新的Store/Proxy
  2. java转json数组对象_java对象转json、json数组 、xml | 学步园
  3. linux+tar+man,Linux常用命令
  4. linux跟踪查看实时追加文件的结尾(常用于日志文件)
  5. pandas dataframe 使用多进程apply(原生、pandarallel多进程、swifter多进程)
  6. Linux/Mac 配置安装scala
  7. 新型冠状病毒肺炎国内分省分日期从1.16起的全部数据爬取与整理代码(附下载)
  8. html签到插件,GitHub - inu1255/soulsign-chrome: 魂签,一款用于自动签到的chrome插件
  9. c语言输入日期返回星期几,C语言程序设计: 输入年月日 然后输出是星期几
  10. WebsSocket