RDD 与 DataFrame原理-区别-操作详解
1. RDD原理及操作
RDD (resilientdistributed dataset),指的是一个只读的,可分区的分布式数据集,这个数据集的全部或部分可以缓存在内存中,在多次计算间重用。RDD内部可以有许多分区(partitions),每个分区又拥有大量的记录(records)。RDD具有五大特征:
- dependencies:建立RDD的依赖关系,主要RDD之间是宽窄依赖的关系,具有窄依赖的可以在同一个stage中进行计
- partition:每个RDD会有若干个分区,分区的大小决定RDD计算粒度,每个RDD的分区的计算都在单独的任务中进行
- preferedlocations:按照“移动数据不如移动计算”原则,在spark进行任务调度的时候,优先将任务分配到数据块存储的位置
- compute:spark中的计算都是以分区为基本单位的,compute函数只是对迭代器进行复合,并不保存单次计算的结果
- partitioner:只存在于(K,V)类型的RDD中,非(K,V)类型的partitioner的值就是None
RDD的算子action会触发真正的作业提交,而transformation算子是不会立即触发作业提交的。在Spark中,所有RDD的转换都是是惰性求值的。RDD的转换操作transformation会生成新的RDD,新的RDD的数据依赖于原来的RDD的数据,每个RDD又包含多个分区。那么一段程序实际上就构造了一个由相互依赖的多个RDD组成的有向无环图(DAG)。并通过在RDD上执行action动作将这个有向无环图作为一个Job提交给Spark执行。在DAG中又进行stage的划分,划分的依据是依赖算子是否是shuffle(如reduceByKey,Join等)的,每个stage又可以划分成若干task。接下来的事情就是driver发送task到executor,executor自己的线程池去执行这些task,完成之后将结果返回给driver。action算子是划分不同job的依据。Spark对于有向无环图Job进行调度,确定阶段(Stage),分区(Partition),流水线(Pipeline),任务(Task)和缓存(Cache),进行优化,并在Spark集群上运行Job。RDD之间的依赖分为宽依赖(依赖多个分区)和窄依赖(只依赖一个分区),在确定阶段时,需要根据宽依赖shuffle划分阶段。根据分区划分任务。
Spark支持故障恢复的方式也不同,提供两种方式:
- Linage:通过数据的血缘关系,再执行一遍前面的处理
- Checkpoint:将数据集存储到持久存储中。每次迭代的数据可以保存在内存中,而不是写入文件
2. 窄依赖与宽依赖
shuffle 是划分 DAG 中 stage 的标识,同时影响 Spark 执行速度的关键步骤。RDD 的 Transformation 函数中,又分为窄依赖(narrow dependency)和宽依赖(wide dependency)的操作。窄依赖跟宽依赖的区别是是否发生 shuffle(洗牌) 操作。宽依赖会发生 shuffle 操作。窄依赖是子 RDD的各个分片(partition)不依赖于其他分片,能够独立计算得到结果;宽依赖指子 RDD 的各个分片会依赖于父RDD 的多个分片,所以会造成父 RDD 的各个分片在集群中重新分片。如下图所示:map就是一种窄依赖,而join则会导致宽依赖:
如上面的map,filter,union属于第一类窄依赖,而join with inputs co-partitioned(对输入进行协同划分的join操作,也就是说先按照key分组然后shuffle write的时候一个父分区对应一个子分区)则为第二类窄依赖 groupByKey和对输入未协同划分的join操作就是宽依赖,这是shuffle类操作。
首先,窄依赖允许在单个集群节点上流水线式执行,这个节点可以计算所有父级分区。例如,可以逐个元素地依次执行filter操作和map操作。相反,宽依赖需要所有的父RDD数据可用并且数据已经通过类MapReduce的操作shuffle完成。 其次,在窄依赖中,节点失败后的恢复更加高效。因为只有丢失的父级分区需要重新计算,并且这些丢失的父级分区可以并行地在不同节点上重新计算。与此相反,在宽依赖的继承关系中,单个失败的节点可能导致一个RDD的所有先祖RDD中的一些分区丢失,导致计算的重新执行。
下面看一段代码段:
// Map: "cat" -> c, cat
val rdd1 = rdd.Map(x => (x.charAt(0), x))
// groupby same key and count
val rdd2 = rdd1.groupBy(x => x._1).Map(x => (x._1, x._2.toList.length))
第一个 Map 操作将 RDD 里的各个元素进行映射, RDD 的各个数据元素之间不存在依赖,可以在集群的各个内存中独立计算,也就是并行化;第二个 groupby 之后的 Map 操作,为了计算相同 key 下的元素个数,需要把相同 key 的元素聚集到同一个 partition 下,所以造成了数据在内存中的重新分布,即 shuffle 操作。shuffle 操作是 spark 中最耗时的操作,应尽量避免不必要的 shuffle(join 需要针对同一个 key 合并,所以需要 shuffle)
。根据是否发生 shuffle 操作能够将其分成如下的 stage 类型:
运行到每个 stage 的边界时,数据在父 stage 中按照 Task 写到磁盘上,而在子 stage 中通过网络从上一个 Task 中去读取数据。这些操作会导致很严重的网络传输以及磁盘的I/O,所以 stage 的边界是非常占资源的,在编写 Spark 程序的时候需要尽量避免的 。父 stage 中 partition 个数与子 stage 的 partition 个数可能不同,所以那些产生 stage 边界的 Transformation 常常需要接受一个 numPartition 的参数来觉得子 stage 中的数据将被切分为多少个 partition。 PS:shuffle 操作的时候可以用 combiner 压缩数据,减少 IO 的消耗。
3. 为什么我们还需要Data Frame
在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。
3.1 DataFrame创建
SparkSQL可以以其他RDD对象、parquet文件、json文件、hive表,以及通过JDBC连接到其他关系型数据库作为数据源来生成DataFrame对象。
JDBC
// Read
postgresUrl="jdbc:postgresql://127.0.0.1:5432/testdb"
dimDF = sqlContext.read.format('jdbc'). options(url=postgresUrl,dbtable=tableName,user="root",password="root").load()
dimDF.registerTempTable(tmpTableName)// Write
self.postgresURL = str(self.postgresIP) + ":" + str(self.postgresPort) + "/" + str(self.postgresDB)
self.postgresqlDatasource = {"url" : "jdbc:postgresql://" + self.postgresURL,"user" : self.postgresUser,"password" : self.postgresPwd
}
resultDF.coalesce(int(partitionNum)).write.jdbc(url=postgresqlDatasource["url"]
table=reportTable, mode='append', properties=postgresqlDatasource)
- parquet
// Read
telematicFilePath = "/user/spark/test/telematic.parquet/key=" + handleRecordDateStr
if( common.fileExist(telematicFilePath, self.sc) ):df = self.sqlContext.read.schema(TELEMATIC_PARQUET_SCHEMA).parquet(telematicFilePath).coalesce(int(self.partitionNum))
# schema for /user/spark/test/telematic.parquet
TELEMATIC_PARQUET_SCHEMA = SQLType.StructType([SQLType.StructField('dm_transct_date_hr_key', SQLType.LongType(), True),SQLType.StructField('dm_vehicle_dim_key', SQLType.IntegerType(), True),SQLType.StructField('dm_driver_dim_key', SQLType.IntegerType(), True),SQLType.StructField('dm_company_dim_key', SQLType.IntegerType(), True),SQLType.StructField('deviceId', SQLType.StringType(), True),SQLType.StructField('companyId', SQLType.StringType(), True)])// Write
df.write.parquet(parquetPath, mode="overwrite")
- JSON
df = sqlContext.read.json(path)
- List
dataList = resultDF.collect()
resultDF = self.sqlContext.createDataFrame(dataList)
- RDD
if rddSchema is None:df = sqlContext.createDataFrame(rdd)
else:df = sqlContext.createDataFrame(rdd, rddSchema)// OR
rdd = sc.parallelize(resultList)
df = self.sqlContext.createDataFrame(rdd)
RDD 与 DataFrame原理-区别-操作详解相关推荐
- Spark RDD、DataFrame原理及操作详解
RDD是什么? RDD (resilientdistributed dataset),指的是一个只读的,可分区的分布式数据集,这个数据集的全部或部分可以缓存在内存中,在多次计算间重用. RDD内部可以 ...
- SSH远程连接原理及操作详解
参考资料SSH登录原理_藏红的博客-CSDN博客 SSH全称是Secure Shell,SSH协议是基于应用层的协议,为远程登录会话和其他网络服务提供安全性的协议.SSH使用最多的是远程登录和传输文件 ...
- MySQL 传统复制与 GTID 复制原理及操作详解
MySQL 复制在业界里有叫:mysql 同步,ab 复制等.专业名称就是叫:复制. 复制是单向的,只能从 master 复制到 slave 上,延时基本上是毫秒级别的. 一组复制结构中可以有多个 s ...
- python使用kafka原理详解_Python操作Kafka原理及使用详解
Python操作Kafka原理及使用详解 一.什么是Kafka Kafka是一个分布式流处理系统,流处理系统使它可以像消息队列一样publish或者subscribe消息,分布式提供了容错性,并发处理 ...
- Spark SQL原理及常用方法详解(二)
Spark SQL 一.Spark SQL基础知识 1.Spark SQL简介 (1)简单介绍 (2)Datasets & DataFrames (3)Spark SQL架构 (4)Spark ...
- python时间函数报错_python3中datetime库,time库以及pandas中的时间函数区别与详解...
1介绍datetime库之前 我们先比较下time库和datetime库的区别 先说下time 在 Python 文档里,time是归类在Generic Operating System Servic ...
- python获取系统时间函数_python3中datetime库,time库以及pandas中的时间函数区别与详解...
1介绍datetime库之前 我们先比较下time库和datetime库的区别 先说下time 在 Python 文档里,time是归类在Generic Operating System Servic ...
- python的编程模式-Python设计模式之状态模式原理与用法详解
本文实例讲述了Python设计模式之状态模式原理与用法.分享给大家供大家参考,具体如下: 状态模式(State Pattern):当一个对象的内在状态改变时允许改变其行为,这个对象看起来像是改变了其类 ...
- DeepLearning tutorial(1)Softmax回归原理简介+代码详解
FROM: http://blog.csdn.net/u012162613/article/details/43157801 DeepLearning tutorial(1)Softmax回归原理简介 ...
最新文章
- NOIP模拟 蛋糕(DP+Dilworth定理)
- 百度景鲲:AI交互正在吃掉旧产品边界,触达移动互联网盲区用户 | MEET2020
- 第2周项目1c++语言中函数参数传递的三种方式
- JavaScript实现离散傅立叶变换DFT算法(附完整源码)
- ELK(ElasticSearch+Logstash+ Kibana)搭建实时日志分析平台
- Linux RedHat7.0 上vsftp配置
- 思科网络基础之访问控制列表
- [mybatis]动态sql_set_与if结合的动态更新
- Apache Camel 2.20发布–新增功能
- 什么是 NoSQL 数据库、NoSQL 与 SQL 的区别
- 手把手教你搭建数据库服务器平台 | DBA VS 自动化运维,究竟谁与争锋?
- windows 使用 tricks
- HTML的form表单标签
- 关于如何将DB2中的非空约束删除
- vs2017远程编译linux教程,Visual Studio 2017 远程编译调试 Linux 上已存在的通过 Samba 共享的 CMake 工程...
- 论如何用cmd命令做出数字雨特效
- spss——主成分分析详解
- C语言 输出乘法口诀表
- 解决linux(centos7)重新安装mysql systemctl start mysqld.service时报错
- 用python制作动态二维码_用Python制作动态二维码