目录

RDD、DF、DS相关操作

SparkSQL初体验

SparkSession 应用入口

获取DataFrame/DataSet

使用样例类

指定类型+列名

自定义Schema

​​​​​​​RDD、DF、DS相互转换


RDD、DF、DS相关操作

SparkSQL初体验

Spark 2.0开始,SparkSQL应用程序入口为SparkSession,加载不同数据源的数据,封装到DataFrame/Dataset集合数据结构中,使得编程更加简单,程序运行更加快速高效。

SparkSession 应用入口

SparkSession:这是一个新入口,取代了原本的SQLContext与HiveContext。对于DataFrame API的用户来说,Spark常见的混乱源头来自于使用哪个“context”。现在使用SparkSession,它作为单个入口可以兼容两者,注意原本的SQLContext与HiveContext仍然保留,以支持向下兼容。

文档:http://spark.apache.org/docs/2.4.5/sql-getting-started.html#starting-point-sparksession

1)、添加MAVEN依赖

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.4.5</version></dependency>

2)、SparkSession对象实例通过建造者模式构建,代码如下:

其中①表示导入SparkSession所在的包,②表示建造者模式构建对象和设置属性,③表示导入SparkSession类中implicits对象object中隐式转换函数。

3)、范例演示:构建SparkSession实例,加载文本数据,统计条目数。

package cn.itcast.sqlimport org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}/*** Author itcast* Desc 演示SparkSQL*/
object SparkSQLDemo00_hello {def main(args: Array[String]): Unit = {//1.准备SparkSQL开发环境println(this.getClass.getSimpleName)println(this.getClass.getSimpleName.stripSuffix("$"))val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")val df1: DataFrame = spark.read.text("data/input/text")val df2: DataFrame = spark.read.json("data/input/json")val df3: DataFrame = spark.read.csv("data/input/csv")val df4: DataFrame = spark.read.parquet("data/input/parquet")df1.printSchema()df1.show(false)df2.printSchema()df2.show(false)df3.printSchema()df3.show(false)df4.printSchema()df4.show(false)df1.coalesce(1).write.mode(SaveMode.Overwrite).text("data/output/text")df2.coalesce(1).write.mode(SaveMode.Overwrite).json("data/output/json")df3.coalesce(1).write.mode(SaveMode.Overwrite).csv("data/output/csv")df4.coalesce(1).write.mode(SaveMode.Overwrite).parquet("data/output/parquet")//关闭资源sc.stop()spark.stop()}
}

使用SparkSession加载数据源数据,将其封装到DataFrame或Dataset中,直接使用show函数就可以显示样本数据(默认显示前20条)。

Spark2.0使用全新的SparkSession接口替代Spark1.6中的SQLContext及HiveContext接口来实现其对数据加载、转换、处理等功能。SparkSession实现了SQLContext及HiveContext所有功能。 SparkSession支持从不同的数据源加载数据,并把数据转换成DataFrame,并且支持把DataFrame转换成SQLContext自身中的表,然后使用SQL语句来操作数据。SparkSession亦提供了HiveQL以及其他依赖于Hive的功能的支持。

获取DataFrame/DataSet

实际项目开发中,往往需要将RDD数据集转换为DataFrame,本质上就是给RDD加上Schema信息,官方提供两种方式:类型推断和自定义Schema。

官方文档:http://spark.apache.org/docs/2.4.5/sql-getting-started.html#interoperating-with-rdds

​​​​​​​使用样例类

当RDD中数据类型CaseClass样例类时,通过反射Reflecttion获取属性名称和类型,构建Schema,应用到RDD数据集,将其转换为DataFrame。

package cn.itcast.sqlimport org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}/*** Author itcast* Desc 演示基于RDD创建DataFrame--使用样例类*/
object CreateDataFrameDemo1 {case class Person(id:Int,name:String,age:Int)def main(args: Array[String]): Unit = {//1.准备环境-SparkSessionval spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")//2.加载数据val lines: RDD[String] = sc.textFile("data/input/person.txt")//3.切割//val value: RDD[String] = lines.flatMap(_.split(" "))//错误的val linesArrayRDD: RDD[Array[String]] = lines.map(_.split(" "))//4.将每一行(每一个Array)转为样例类(相当于添加了Schema)val personRDD: RDD[Person] = linesArrayRDD.map(arr=>Person(arr(0).toInt,arr(1),arr(2).toInt))//5.将RDD转为DataFrame(DF)//注意:RDD的API中没有toDF方法,需要导入隐式转换!import spark.implicits._val personDF: DataFrame = personRDD.toDF//6.查看约束personDF.printSchema()//7.查看分布式表中的数据集personDF.show(6,false)//false表示不截断列名,也就是列名很长的时候不会用...代替}
}

此种方式要求RDD数据类型必须为CaseClass,转换的DataFrame中字段名称就是CaseClass中属性名称。

​​​​​​​指定类型+列名

除了上述两种方式将RDD转换为DataFrame以外,SparkSQL中提供一个函数:toDF,通过指定列名称,将数据类型为元组的RDD或Seq转换为DataFrame,实际开发中也常常使用。

package cn.itcast.sqlimport org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}/*** Author itcast* Desc 演示基于RDD创建DataFrame--使用类型加列名*/
object CreateDataFrameDemo2 {def main(args: Array[String]): Unit = {//1.准备环境-SparkSessionval spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")//2.加载数据val lines: RDD[String] = sc.textFile("data/input/person.txt")//3.切割//val value: RDD[String] = lines.flatMap(_.split(" "))//错误的val linesArrayRDD: RDD[Array[String]] = lines.map(_.split(" "))//4.将每一行(每一个Array)转为三元组(相当于有了类型!)val personWithColumnsTypeRDD: RDD[(Int, String, Int)] = linesArrayRDD.map(arr=>(arr(0).toInt,arr(1),arr(2).toInt))//5.将RDD转为DataFrame(DF)并指定列名//注意:RDD的API中没有toDF方法,需要导入隐式转换!import spark.implicits._val personDF: DataFrame = personWithColumnsTypeRDD.toDF("id","name","age")//6.查看约束personDF.printSchema()//7.查看分布式表中的数据集personDF.show(6,false)//false表示不截断列名,也就是列名很长的时候不会用...代替}
}

​​​​​​​自定义Schema

依据RDD中数据自定义Schema,类型为StructType,每个字段的约束使用StructField定义,具体步骤如下:

第一步、RDD中数据类型为Row:RDD[Row]

第二步、针对Row中数据定义Schema:StructType

第三步、使用SparkSession中方法将定义的Schema应用到RDD[Row]上;

package cn.itcast.sqlimport org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}/*** Author itcast* Desc 演示基于RDD创建DataFrame--使用StructType*/
object CreateDataFrameDemo3 {def main(args: Array[String]): Unit = {//1.准备环境-SparkSessionval spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")//2.加载数据val lines: RDD[String] = sc.textFile("data/input/person.txt")//3.切割//val value: RDD[String] = lines.flatMap(_.split(" "))//错误的val linesArrayRDD: RDD[Array[String]] = lines.map(_.split(" "))//4.将每一行(每一个Array)转为Rowval rowRDD: RDD[Row] = linesArrayRDD.map(arr=>Row(arr(0).toInt,arr(1),arr(2).toInt))//5.将RDD转为DataFrame(DF)并指定列名//注意:RDD的API中没有toDF方法,需要导入隐式转换!import spark.implicits._/*val schema: StructType = StructType(StructField("id", IntegerType, false) ::StructField("name", StringType, false) ::StructField("age", IntegerType, false) :: Nil)*/val schema: StructType = StructType(List(StructField("id", IntegerType, false),StructField("name", StringType, false),StructField("age", IntegerType, false)))val personDF: DataFrame = spark.createDataFrame(rowRDD,schema)//6.查看约束personDF.printSchema()//7.查看分布式表中的数据集personDF.show(6,false)//false表示不截断列名,也就是列名很长的时候不会用...代替}}

此种方式可以更加体会到DataFrame = RDD[Row] + Schema组成,在实际项目开发中灵活的选择方式将RDD转换为DataFrame。

​​​​​​​RDD、DF、DS相互转换

实际项目开发中,常常需要对RDD、DataFrame及Dataset之间相互转换,其中要点就是Schema约束结构信息。

1)、RDD转换DataFrame或者Dataset

转换DataFrame时,定义Schema信息,两种方式

转换为Dataset时,不仅需要Schema信息,还需要RDD数据类型为CaseClass类型

2)、Dataset或DataFrame转换RDD

由于Dataset或DataFrame底层就是RDD,所以直接调用rdd函数即可转换

dataframe.rdd 或者dataset.rdd

3)、DataFrame与Dataset之间转换

由于DataFrame为Dataset特例,所以Dataset直接调用toDF函数转换为DataFrame

当将DataFrame转换为Dataset时,使用函数as[Type],指定CaseClass类型即可。

RDD、DataFrame和DataSet之间的转换如下,假设有个样例类:case class Emp(name: String),相互转换

RDD转换到DataFrame:rdd.toDF(“name”)RDD转换到Dataset:rdd.map(x => Emp(x)).toDSDataFrame转换到Dataset:df.as[Emp]DataFrame转换到RDD:df.rddDataset转换到DataFrame:ds.toDFDataset转换到RDD:ds.rdd

注意:

RDD与DataFrame或者DataSet进行操作,都需要引入隐式转换import spark.implicits._,其中的spark是SparkSession对象的名称!


package cn.itcast.sqlimport org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}/*** Author itcast* Desc 演示基于RDD/DataFrame/DataSet三者之间的相互转换*/
object TransformationDemo {case class Person(id:Int,name:String,age:Int)def main(args: Array[String]): Unit = {//1.准备环境-SparkSessionval spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")//2.加载数据val lines: RDD[String] = sc.textFile("data/input/person.txt")//3.切割//val value: RDD[String] = lines.flatMap(_.split(" "))//错误的val linesArrayRDD: RDD[Array[String]] = lines.map(_.split(" "))//4.将每一行(每一个Array)转为样例类(相当于添加了Schema)val personRDD: RDD[Person] = linesArrayRDD.map(arr=>Person(arr(0).toInt,arr(1),arr(2).toInt))//5.将RDD转为DataFrame(DF)//注意:RDD的API中没有toDF方法,需要导入隐式转换!import spark.implicits._//转换1:rdd-->dfval personDF: DataFrame = personRDD.toDF //注意:DataFrame没有泛型//转换2:rdd-->dsval personDS: Dataset[Person] = personRDD.toDS() //注意:Dataset具有泛型//转换3:df-->rddval rdd: RDD[Row] = personDF.rdd //注意:DataFrame没有泛型,也就是不知道里面是Person,所以转为rdd之后统一的使用Row表示里面是很多行//转换4:ds-->rddval rdd1: RDD[Person] = personDS.rdd //注意:Dataset具有泛型,所以转为rdd之后还有原来泛型!//转换5:ds-->dfval dataFrame: DataFrame = personDS.toDF()//转换5:df-->dsval personDS2: Dataset[Person] = personDF.as[Person]//目前DataFrame和DataSet使用类似,如:也有show/createOrReplaceTempView/selectpersonDS.show()personDS.createOrReplaceTempView("t_person")personDS.select("name").show()}
}

2021年大数据Spark(二十五):SparkSQL的RDD、DF、DS相关操作相关推荐

  1. 2021年大数据Spark(十五):Spark Core的RDD常用算子

    目录 常用算子 基本算子 分区操作函数算子 重分区函数算子 1).增加分区函数 2).减少分区函数 3).调整分区函数 ​​​​​​​聚合函数算子 ​​​​​​​Scala集合中的聚合函数 ​​​​​ ...

  2. 2021年大数据HBase(十五):HBase的Bulk Load批量加载操作

    全网最详细的大数据HBase文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 HBase的Bulk Load批量加载操作 一.Bulk L ...

  3. 2021年大数据Hadoop(十五):Hadoop的联邦机制 Federation

    全网最详细的Hadoop文章系列,强烈建议收藏加关注! 后面更新文章都会列出历史文章目录,帮助大家回顾知识重点. 目录 本系列历史文章 前言 Hadoop的联邦机制 Federation 背景概述 F ...

  4. 2021年大数据HBase(十):Apache Phoenix的基本入门操作

    全网最详细的大数据HBase文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 前言 Apache Phoenix的基本入门操作 一.Pho ...

  5. 2021年大数据Spark(十二):Spark Core的RDD详解

    目录 RDD详解 为什么需要RDD? 什么是RDD? RDD的5大特性 第一个:A list of partitions 第二个:A function for computing each split ...

  6. 2021年大数据ELK(十五):Elasticsearch SQL简单介绍

    全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 Elasticsearch SQL简单介绍 一.SQL与Elasticsear ...

  7. 2021年大数据Flink(十五):流批一体API Connectors ​​​​​​​Kafka

    目录 Kafka pom依赖 参数设置 参数说明 Kafka命令 代码实现-Kafka Consumer 代码实现-Kafka Producer 代码实现-实时ETL Kafka pom依赖 Flin ...

  8. 2021年大数据Spark(十九):Spark Core的​​​​​​​共享变量

    目录 共享变量 广播变量 累加器 ​​​​​​​案例演示 共享变量 在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副 ...

  9. 2021年大数据Spark(十四):Spark Core的RDD操作

    目录 RDD的操作 函数(算子)分类 Transformation函数 ​​​​​​​Action函数 RDD的操作 有一定开发经验的读者应该都使用过多线程,利用多核 CPU 的并行能力来加快运算速率 ...

  10. 2021年大数据Spark(十八):Spark Core的RDD Checkpoint

    目录 RDD Checkpoint 引入 API 代码演示 总结:持久化和Checkpoint的区别 问题: 答案: 区别: RDD Checkpoint 引入 RDD 数据可以持久化,但是持久化/缓 ...

最新文章

  1. 公有云 --- 华为云的基本运用
  2. 【开机自启】属于你的个性化!八步完成喜欢的软件开机自启!
  3. action标签的使用
  4. .NET MasterPage技术
  5. [maven] springboot将jar包打包到指定目录
  6. 【剑指offer】面试题57 - II:和为s的连续正数序列(Java)
  7. jquery插入第一个元素? [问题点数:20分,结帖人zsw19909001]
  8. 最全NFC芯片技术厂商介绍及应用介绍
  9. 百度成立互联网医院;钉钉招小学生产品体验师;iOS 13.4 上线 | 极客头条
  10. Win7 64bit IIS无法访问ACCESS数据库解决方案
  11. 小白自定义bat文件一键启动电脑应用
  12. 2020低压电工模拟考试及低压电工复审模拟考试
  13. Linux 内核md5sum使用,linux命令详解:md5sum命令(示例代码)
  14. 重心法求中心matlab,两个中转站选址问题(重心法,metlab,spss)程序.ppt
  15. 进行小红书营销,怎样拉入更多的客户?
  16. inspect的使用
  17. 银河麒麟V10业务系统适配记录 处理器:FT2000+ 中国信创服务社区
  18. 分布式搜索引擎ElasticSearch之高级运用(三)
  19. google dapper论文
  20. 股票量化分析工具QTYX使用攻略——箱体形态突破选股v2.5.3

热门文章

  1. 2022-2028年中国喷涂速凝橡胶行业市场调研分析及未来前景分析报告
  2. 2022-2028年中国PET基膜行业市场发展规模及市场分析预测报告
  3. 2022-2028年中国塑料绳的制造行业市场现状调查及投资商机预测报告
  4. 【AJAX】JavaScript的面向对象
  5. 【微服务架构】SpringCloud之Eureka入门篇
  6. 奇异值分解与最小二乘问题 线性回归
  7. 深度学习——Xavier初始化方法
  8. 模拟Servlet本质
  9. php将图片链接转换为base64编码文件流
  10. Docker核心技术之Docker Compose