Spark 中 RDD 的详细介绍
RDD ---弹性分布式数据集
RDD概述
RDD论文
中文版 : http://spark.apachecn.org/paper/zh/spark-rdd.html
RDD产生背景
为了解决开发人员能在大规模的集群中以一种容错的方式进行内存计算,提出了 RDD 的概念,而当前的很多框架对迭代式算法场景与交互性数据挖掘场景的处理性能非常差, 这个是RDDs 的提出的动机。
什么是 RDD
RDD 是 Spark 的计算模型。RDD(Resilient Distributed Dataset)叫做弹性的分布式数据集合,是 Spark 中最基本的数据抽象,它代表一个不可变、只读的,被分区的数据集。操作 RDD 就像操作本地集合一样,有很多的方法可以调用,使用方便,而无需关心底层的调度细节。
创建RDD
1 . 集合并行化创建 (通过scala集合创建) scala中的本地集合 -->Spark RDD
spark-shell --master spark://hadoop01:7077
scala> val arr = Array(1,2,3,4,5)
scala> val rdd = sc.parallelize(arr)
scala> val rdd = sc.makeRDD(arr)
scala> rdd.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5)
通过集合并行化方式创建RDD,适用于本地测试,做实验
2 外部文件系统 , 比如 HDFS
读取HDFS文件系统
val rdd2 = sc.textFile("hdfs://hadoop01:9000/words.txt")
读取本地文件
val rdd2 = sc.textFile(“file:///root/words.txt”)
scala> val rdd2 = sc.textFile("file:root/word.txt")
scala> rdd2.collect
res2: Array[String] = Array(hadoop hbase java, hbase java spark, java, hadoop hive hive, hive hbase)
3 从父RDD转换成新的子RDD
调用 Transformation 类的方法,生成新的 RDD
只要调用transformation类的算子,都会生成一个新的RDD。RDD中的数据类型,由传入给算子的函数的返回值类型决定
注意:action类的算子,不会生成新的 RDD
scala> rdd.collect
res3: Array[Int] = Array(1, 2, 3, 4, 5)
scala> val rdd = sc.parallelize(arr)
scala> val rdd2 = rdd.map(_*100)
scala> rdd2.collect
res4: Array[Int] = Array(100, 200, 300, 400, 500)
Spark上的所有的方法 , 有一个专有的名词 , 叫做算子
RDD分区
说对RDD进行操作 , 实际上是操作的RDD上的每一个分区 , 分区的数量决定了并行的数量 .
使用 rdd.partitions.size 查看分区数量
scala> rdd.partitions.size
res7: Int = 4
scala> rdd2.partitions.size
res8: Int = 4
如果从外部创建RDD,比如从hdfs中读取数据,正常情况下,分区的数量和我们读取的文件的block块数是一致的,但是如果只有一个block块,那么分区数量是2.也就是说最低的分区数量是2
如果是集合并行化创建得到的RDD,分区的数量,默认的和最大可用的cores数量相等。
(--total-executor-cores > 可用的 cores? 可用的 cores:--total-executor-cores)
集合并行化得到的RDD的分区 :
默认情况下,一个application使用多少个cores,就有多少个分区
分区的数量 = 运行任务的可用的cores(默认一个cores,能处理一个任务)
可以指定分区的数量:
通过集合并行化创建的RDD是可以任意修改分区的数量的
val rdd = sc.makeRDD(arr,分区的数值)
scala> val arr = Array(List(1,3),List(4,6))
scala> val rdd3 = sc.parallelize(arr,3)
scala> rdd3.partitions.size
res1: Int = 3
这种方式,多用于测试
读取外部文件RDD的分区
正常情况下,读取HDFS中的文件,默认情况下,读到的文件有几个block块,得到的RDD就有几个分区。
当读取一个文件,不足一个block块的时候,会是2个分区
默认情况下,分区的数量 = 读取的文件的block块的数量,但是至少是2个
scala> val rdd1 = sc.textFile("hdfs://hadoop01:9000/hbase-1.2.6-bin.tar.gz")
scala> rdd1.partitions.size
res2: Int = 1
scala> val rdd2 = sc.textFile("hdfs://hadoop01:9000/hadoop-2.8.3.tar.gz")
scala> rdd2.partitions.size
res3: Int = 1
scala> val rdd3 = sc.textFile("hdfs://hadoop01:9000/ideaIU-2017.2.2.exe")
scala> rdd3.partitions.size
res4: Int = 4
hadoop-2.8.3文件200多M,有俩个块,按说有俩个分区,hbase不到100M,有一个块,按说应该有2个分区,结果这俩个都是一个分区,是不正常的,不知道问题在哪里,希望知道的大佬指点一下
idea文件500多M有4个块,有四个分区,是正常的
textFile自身提供了修改分区的API
sc.textFile(path,分区数量)
1 这里的分区数量,不能少于读取的数据的block块的数量
2 当设置的分区的数量大于block的数量的时候,读取数据的API会根据我们的数据进行优化
scala> val rdd3 = sc.textFile("hdfs://hadoop01:9000/ideaIU-2017.2.2.exe",5)
scala> rdd3.partitions.size
res7: Int = 5
真正的想要改变分区的数量:用算子
repartition,coalesce,专用于修改分区数量
读取HDFS上的数据,写入到HDFS中的数据,使用的API都是hadoop的API
总结:
默认情况下,分区的数量 = 读取文件的block块的数量
分区的数量至少是2个
通过转换类的算子
默认情况下,分区的数量是不变的。map flatMap filter
groupByKey,reduceByKey 默认是不变的,但是可以通过参数来改变
repartition(分区数量),coalesce(分区数量),根据指定的分区数量重新分区
union:分区数量会增加
scala> val rdd2 = rdd1.flatMap(_.split(" ")).map((_,1))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[14] at map at <console>:26scala> val rdd3 = rdd2.reduceByKey(_+_)
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[15] at reduceByKey at <console>:28scala> rdd2.partitions.size
res7: Int = 3scala> rdd3.partitions.size
res8: Int = 3scala> val rdd3 = rdd2.reduceByKey(_+_,6)
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[16] at reduceByKey at <console>:28scala> rdd3.partitions.size
res9: Int = 6
总结:
集合并行化:
val arr = Array[Int](1,4,5,6) –》 sc.makeRDD(arr) RDD[Int]
默认情况下, 分区数量 = application使用的 cores
sc.makeRDD(data,分区数量)
读取HDFS数据:
默认情况下, 分区数量 = 读取的数据的block块的数量
至少是2个
通过转换类的算子获取的RDD :
默认情况下,分区的数量是不变的。
简单来说,rdd分区数量就决定了任务的并行的数量。
Spark 中 RDD 的详细介绍相关推荐
- pythonexcel介绍_Python 中pandas.read_excel详细介绍
Python 中pandas.read_excel详细介绍 #coding:utf-8 import pandas as pd import numpy as np filefullpath = r& ...
- 2020-12-09 深度学习 卷积神经网络中感受野的详细介绍
卷积神经网络中感受野的详细介绍 1. 感受野的概念 在卷积神经网络中,感受野(Receptive Field)的定义是卷积神经网络每一层输出的特征图(feature map)上的像素点在输入图片上映射 ...
- Spark中 RDD之coalesce与repartition区别
Spark中 RDD之coalesce与repartition区别 coalesce def coalesce(numPartitions: Int, shuffle: Boolean = false ...
- Android manifest文件中的标签详细介绍
Android manifest文件中的标签详细介绍 概要 每一个Android应用都应该包含一个manifest文件,即AndroidManifest.xml.它包含了程序运行的一些必备信息,比如: ...
- python中soup_python中BeautifulSoup的详细介绍(附代码)
本篇文章给大家带来的内容是关于python中BeautifulSoup的详细介绍(附代码),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助. Beautiful Soup提供一些简单的. ...
- 位在c语言中用什么定义,C语言中位段的详细介绍
C语言中位段的详细介绍 位段(bit-field)是以位为单位来定义结构体(或联合体)中的成员变量所占的空间.含有位段的结构体(联合体)称为位段结构.采用位段结构既能够节省空间,又方便于操作.以下是百 ...
- linux的run目录,Linux 中run 目录详细介绍
原标题:Linux 中run 目录详细介绍 /run 是各种各样数据的家园.例如,如果你查看 /run/user,你会注意到一组带有数字名称的目录. $ ls /run/user 1000 1002 ...
- csp怎么给线条描边_CSP优动漫PAINT中基本功能详细介绍
今天我们来看一下CSP优动漫PAINT中基本功能详细介绍,有不清楚的小伙伴,一起来了解一下吧. CSP优动漫PAINT中基本功能详细介绍 颜色丰富多彩.笔感自然真实 通过高级笔压感应功能,展现自然真实 ...
- Cesium中图元Primitive详细介绍及案例
Cesium从入门到项目实战总目录: 点击 文章目录 Cesium中图元Primitive详细介绍 Cesium中Primitive案例 Cesium中图元Primitive详细介绍 在Cesium中 ...
最新文章
- ML之xgboost:利用xgboost算法对breast_cancer数据集实现二分类预测并进行graphviz二叉树节点图可视化
- flash和html5
- 数据结构实验之排序三:bucket sort
- Hadoop框架:NameNode工作机制详解
- 创建用户的种类与区分
- oracle12c常用新特性,开发者必读:Oracle12c新特性再总结
- stm32f4串口烧录[flymcu]
- 如何用GraphPad Prism 进行pearson相关性分析
- 关于APP 内涉及用户个人敏感信息/权限的进一步整改
- blender玻璃材质
- 哪些是GraphPad Prism 的当前版本?如何更新或升级?
- 阿里云香港服务器被攻击进黑洞了怎么办
- 【官方文档】Fluent Bit 数据管道之过滤插件(Kubernetes)
- 智慧养鸡场环境监管系统方案
- 效率倍增,5 个提高生产力的 Jupyter notebook插件
- ai是个什么软件,和PS一样么
- 关于客户机服务器与微内核结构操作系统,第1章 操作系统概述1
- matlab共阳极二极管,三引脚SOT-323封装的共阳极稳压二极管详情
- STM32之数据采集和心率检测仪(原理图、PCB、程序源码等)超详细!!!
- 我谈网络扫描 -- 之三