Spark Source Code Walkthrough 1: RDD
RDD stands for Resilient Distributed Dataset. It is the core abstraction in Spark.
An RDD is a read-only, immutable dataset with a strong fault-tolerance mechanism. It has five main properties:
- A list of partitions: the data is split into partitions so that it can be processed in parallel
- A function for computing each split: each partition is computed by this function
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned): key-value RDDs may carry a partitioner, for example one based on hashing
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file): the best locations at which to compute each partition
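To make the five properties concrete, here is a minimal single-machine sketch in Scala. The names `ToyRDD`, `ToyPartition`, `ToySeqRDD` and `ToyMappedRDD` are hypothetical. They only mirror the shape of the properties listed above and leave out everything Spark adds on top, such as scheduling, caching and fault recovery.

```scala
// Hypothetical toy model of the five RDD properties (not Spark's API).
final case class ToyPartition(index: Int)

abstract class ToyRDD[T] {
  // 1. A list of partitions
  def partitions: Seq[ToyPartition]
  // 2. A function for computing each partition
  def compute(split: ToyPartition): Iterator[T]
  // 3. Dependencies on parent RDDs (modeled simply as the parents themselves)
  def dependencies: Seq[ToyRDD[_]]
  // 4. Optional partitioner for key-value data, modeled as "key => partition index"
  def partitioner: Option[Any => Int] = None
  // 5. Optional preferred locations (host names) for each partition
  def preferredLocations(split: ToyPartition): Seq[String] = Nil
}

// A source RDD over an in-memory collection, split into numSlices partitions.
class ToySeqRDD[T](data: IndexedSeq[T], numSlices: Int) extends ToyRDD[T] {
  def partitions: Seq[ToyPartition] = (0 until numSlices).map(ToyPartition(_))
  def compute(split: ToyPartition): Iterator[T] = {
    val start = split.index * data.length / numSlices
    val end = (split.index + 1) * data.length / numSlices
    data.slice(start, end).iterator
  }
  def dependencies: Seq[ToyRDD[_]] = Nil // a source has no parents
}

// A child RDD that applies f to each element of its parent, partition by partition.
class ToyMappedRDD[T, U](parent: ToyRDD[T], f: T => U) extends ToyRDD[U] {
  def partitions: Seq[ToyPartition] = parent.partitions
  def compute(split: ToyPartition): Iterator[U] = parent.compute(split).map(f)
  def dependencies: Seq[ToyRDD[_]] = Seq(parent)
}

val source = new ToySeqRDD((1 to 10).toVector, 3)
val doubled = new ToyMappedRDD[Int, Int](source, _ * 2)
val perPartition = doubled.partitions.map(p => doubled.compute(p).toList)
// perPartition == List(List(2, 4, 6), List(8, 10, 12), List(14, 16, 18, 20))
```

Each partition is computed independently from its parent's partition with the same index. That independence is what lets the work run in parallel.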
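RDD is the foundation on which all Spark components run. It is a fault-tolerant, parallel data structure that provides a rich set of data operations and APIs.
The RDD API in Spark
An RDD can contain multiple partitions, and each partition holds a fragment of the dataset. RDDs can depend on one another:
Narrow dependency: each partition of the parent RDD is used by at most one partition of the child RDD.
Wide dependency: a one-to-many relationship, in which the same parent partition is used by multiple child partitions.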
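The difference between the two kinds of dependency can be checked mechanically. The sketch below is a hypothetical toy in which each dependency answers the question "which parent partitions does child partition i read?". It loosely follows the `getParents` method of Spark's `NarrowDependency`. `ToyOneToOne`, `ToyShuffle` and `usageCounts` are made-up names, and a real `ShuffleDependency` is not modeled this way.

```scala
// Hypothetical dependency model: for a child partition id, which parent partitions does it read?
sealed trait ToyDependency {
  def getParents(childPartition: Int): Seq[Int]
}

// Narrow, one-to-one: child partition i reads only parent partition i.
case object ToyOneToOne extends ToyDependency {
  def getParents(childPartition: Int): Seq[Int] = Seq(childPartition)
}

// Wide (shuffle): every child partition may read from every parent partition.
final case class ToyShuffle(numParentPartitions: Int) extends ToyDependency {
  def getParents(childPartition: Int): Seq[Int] = 0 until numParentPartitions
}

// For each parent partition, count how many child partitions use it.
def usageCounts(dep: ToyDependency, numChildPartitions: Int): Map[Int, Int] =
  (0 until numChildPartitions)
    .flatMap(dep.getParents)
    .groupBy(identity)
    .map { case (parent, uses) => parent -> uses.size }

// Narrow: each parent partition is used by at most one child partition.
val narrow = usageCounts(ToyOneToOne, 4)  // == Map(0 -> 1, 1 -> 1, 2 -> 1, 3 -> 1)
// Wide: each of the 3 parent partitions is used by both child partitions.
val wide = usageCounts(ToyShuffle(3), 2)  // == Map(0 -> 2, 1 -> 2, 2 -> 2)
```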
An overview of RDDs is shown in the figure below.
The source of org.apache.spark.rdd.RDD contains several important methods:
1)
```scala
/**
 * Implemented by subclasses to return the set of partitions in this RDD. This method will only
 * be called once, so it is safe to implement a time-consuming computation in it.
 */
protected def getPartitions: Array[Partition]
```
This method returns the RDD's partitions as an array.
2)
```scala
/**
 * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
 * be called once, so it is safe to implement a time-consuming computation in it.
 */
protected def getDependencies: Seq[Dependency[_]] = deps
```
It returns the RDD's dependencies as a Seq.
3)
```scala
/**
 * :: DeveloperApi ::
 * Implemented by subclasses to compute a given partition.
 */
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]
```
Every RDD has its own compute function, which produces the elements of one given partition.
4)
```scala
/**
 * Optionally overridden by subclasses to specify placement preferences.
 */
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
```
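It returns the preferred locations for computing a partition. This concerns data locality rather than partitioning. For an HDFS file, for example, it can return the hosts that store the corresponding block.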
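The comments above say that getPartitions and getDependencies are called only once. RDD's public `partitions` accessor achieves this by caching the subclass's result. The minimal sketch below shows the same memoization pattern. `CachedPartitions` and `SlowSource` are hypothetical names, partitions are plain `Int`s, and the sketch is not thread-safe.

```scala
// Hypothetical base class: the expensive getPartitions runs at most once.
abstract class CachedPartitions {
  // Filled in on first access; null means "not computed yet"
  private var cached: Array[Int] = null
  // Implemented by subclasses; may be expensive (e.g. listing input splits)
  protected def getPartitions: Array[Int]
  // Public accessor: compute on first use, then always return the same array
  final def partitions: Array[Int] = {
    if (cached == null) cached = getPartitions
    cached
  }
}

class SlowSource extends CachedPartitions {
  var calls = 0 // counts how often getPartitions actually runs
  protected def getPartitions: Array[Int] = {
    calls += 1
    Array(0, 1, 2)
  }
}

val src = new SlowSource
val first = src.partitions
val second = src.partitions
// src.calls == 1: the second access reused the cached array
```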
RDD Transformations and Actions
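RDD operations come in two kinds. Transformations lazily define a new RDD from an existing one. Actions trigger the computation and return a result to the driver:
Transformations, e.g. map(f: T ⇒ U) : RDD[T] ⇒ RDD[U]
Actions, e.g. count() : RDD[T] ⇒ Long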
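Transformations are lazy. They only describe a new dataset, and nothing runs until an action asks for a result. The hypothetical `LazyPipeline` class below is not a Spark API. It demonstrates this with a counter that records how many elements were actually evaluated.

```scala
// Hypothetical toy: a "transformation" only records a function; an "action" runs it.
final class LazyPipeline[T](source: () => Iterator[T]) {
  // Transformation: returns a new pipeline; nothing is computed yet
  def map[U](f: T => U): LazyPipeline[U] = new LazyPipeline(() => source().map(f))
  // Action: pulls every element through the pipeline and returns a value
  def count(): Long = source().size.toLong
}

var evaluated = 0
val base = new LazyPipeline(() => Iterator(1, 2, 3))
val mapped = base.map { x => evaluated += 1; x * 10 }
val before = evaluated // 0: calling map did no work
val n = mapped.count() // 3
val after = evaluated  // 3: the action evaluated every element
```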
Let's look at the transformations first:
```scala
// Transformations (return a new RDD)

/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))

/**
 * Return a new RDD by first applying a function to all elements of this
 * RDD, and then flattening the results.
 */
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] =
  new FlatMappedRDD(this, sc.clean(f))

/**
 * Return a new RDD containing only the elements that satisfy a predicate.
 */
def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))

// ......
```
Map
```scala
/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
```
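map returns a MappedRDD, which extends RDD and overrides two methods, getPartitions and compute.
getPartitions fetches the first parent RDD and returns its partition array, so the MappedRDD has the same partitions as its parent: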
```scala
override def getPartitions: Array[Partition] = firstParent[T].partitions
```
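compute gets the iterator over the corresponding partition of the parent RDD and applies the function passed to map to each element: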
```scala
override def compute(split: Partition, context: TaskContext) =
  firstParent[T].iterator(split, context).map(f)
```
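Because compute simply wraps the parent's iterator, chained narrow transformations are pipelined. Each element passes through map and then filter before the next element is read, and no intermediate collection is built. The sketch below simulates one partition with plain Scala iterators. `parentIterator`, `mappedCompute` and `filteredCompute` are hypothetical stand-ins for the parent RDD and for the compute methods of MappedRDD and FilteredRDD.

```scala
import scala.collection.mutable.ListBuffer

// Records the order in which elements are processed
val trace = ListBuffer.empty[String]

// Stand-in for firstParent[T].iterator(split, context) on one partition
def parentIterator(): Iterator[Int] = Iterator(1, 2, 3)

// Like MappedRDD.compute: the parent's iterator, then .map(f)
def mappedCompute(): Iterator[Int] =
  parentIterator().map { x => trace += s"map $x"; x * 2 }

// Like FilteredRDD.compute: the parent's (here: the mapped) iterator, then .filter(p)
def filteredCompute(): Iterator[Int] =
  mappedCompute().filter { y => trace += s"filter $y"; y > 2 }

val result = filteredCompute().toList
// result == List(4, 6)
// trace  == List("map 1", "filter 2", "map 2", "filter 4", "map 3", "filter 6")
```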
filter
```scala
/**
 * Return a new RDD containing only the elements that satisfy a predicate.
 */
def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))
```
filter keeps only the elements that satisfy the predicate, for example mapRDD.filter(_ > 1). It returns a FilteredRDD.
Union
```scala
/**
 * Return the union of this RDD and another one. Any identical elements will appear multiple
 * times (use `.distinct()` to eliminate them).
 */
def union(other: RDD[T]): RDD[T] = new UnionRDD(sc, Array(this, other))
```
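union combines RDDs into a new UnionRDD, which overrides five methods of RDD: getPartitions, getDependencies, compute, getPreferredLocations and clearDependencies.
getPartitions and getDependencies show that each partition of the union comes from exactly one partition of one parent, through a RangeDependency. RangeDependency is a kind of NarrowDependency, so these are narrow dependencies and union needs no shuffle: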
```scala
override def getDependencies: Seq[Dependency[_]] = {
  val deps = new ArrayBuffer[Dependency[_]]
  var pos = 0
  for (rdd <- rdds) {
    deps += new RangeDependency(rdd, 0, pos, rdd.partitions.size)
    pos += rdd.partitions.size
  }
  deps
}
```
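The index arithmetic behind this can be reproduced without Spark. The sketch below is a simplified standalone re-implementation of the range check that `RangeDependency.getParents` performs. It also includes a hypothetical `unionDeps` helper that builds the dependencies the same way as the loop above. It shows that every child partition of the union reads exactly one parent partition.

```scala
// Standalone version of the RangeDependency range check (simplified):
// child partitions [outStart, outStart + length) map to parent partitions from inStart.
final case class RangeDep(inStart: Int, outStart: Int, length: Int) {
  def getParents(partitionId: Int): List[Int] =
    if (partitionId >= outStart && partitionId < outStart + length)
      List(partitionId - outStart + inStart)
    else Nil
}

// Hypothetical helper mirroring UnionRDD.getDependencies:
// one RangeDep per parent, with pos advancing by that parent's partition count.
def unionDeps(parentPartitionCounts: Seq[Int]): Seq[RangeDep] = {
  var pos = 0
  parentPartitionCounts.map { size =>
    val dep = RangeDep(0, pos, size)
    pos += size
    dep
  }
}

// Two parents with 2 and 3 partitions give a union with 5 partitions.
val deps = unionDeps(Seq(2, 3))
// For each child partition: (index of the parent RDD, partition within that parent)
val mapping = (0 until 5).map { child =>
  deps.zipWithIndex.collectFirst {
    case (d, rddIdx) if d.getParents(child).nonEmpty => (rddIdx, d.getParents(child).head)
  }.get
}
// mapping == Vector((0,0), (0,1), (1,0), (1,1), (1,2))
```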
groupBy
```scala
/**
 * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
 * mapping to that key.
 *
 * Note: This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
 * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
 */
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =
  groupBy[K](f, defaultPartitioner(this))
```
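groupBy groups the elements by the key that f computes for each element, producing a new RDD of (key, Iterable[T]) pairs. All elements with the same key must end up together, so this generally requires a shuffle. That is why the scaladoc warns that it can be expensive.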
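To see why groupBy usually needs a shuffle, here is a local, hypothetical sketch. Every element gets a key, and each (key, element) pair is routed to an output partition by hashing the key. The hashing uses a non-negative modulo, similar in spirit to Spark's `HashPartitioner`. Only after routing is each output partition grouped. `localGroupBy` and `nonNegativeMod` are illustration-only names.

```scala
// Keeps the partition index in [0, mod) even for negative hash codes.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val raw = x % mod
  if (raw < 0) raw + mod else raw
}

def localGroupBy[T, K](partitions: Seq[Seq[T]], numOut: Int)(f: T => K): Seq[Map[K, Seq[T]]] = {
  // "Map side": tag each element with its key and its target output partition.
  val routed = for {
    part <- partitions
    t <- part
    k = f(t)
  } yield (nonNegativeMod(k.hashCode, numOut), k, t)
  // "Reduce side": each output partition groups only the pairs routed to it.
  (0 until numOut).map { p =>
    routed.filter(_._1 == p).groupBy(_._2).map { case (k, xs) => k -> xs.map(_._3) }
  }
}

val input = Seq(Seq(1, 2, 3), Seq(4, 5, 6))
val grouped = localGroupBy(input, 2)(x => x % 3)
// All elements with the same key end up in the same output partition.
val merged = grouped.flatten.toMap
// merged == Map(0 -> Seq(3, 6), 1 -> Seq(1, 4), 2 -> Seq(2, 5))
```

Keys 0 and 2 come from both input partitions, so each output partition needs data from every input partition. This is exactly the wide dependency described earlier.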
Action
Count
```scala
/**
 * Return the number of elements in the RDD.
 */
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
```
Following the code, runJob calls dagScheduler.runJob. The DAGScheduler submits the job to the scheduler and returns a JobWaiter object. The JobWaiter can be used to block until the job finishes, or to cancel the job.
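count() runs a small function on every partition and sums the per-partition results on the driver. The sketch below imitates that flow with Scala futures. `runJob` here is a hypothetical local function, not `SparkContext.runJob`, and `Await.result` stands in for blocking on a JobWaiter.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Counts the elements of an iterator, in the spirit of Utils.getIteratorSize.
def iteratorSize[T](it: Iterator[T]): Long = {
  var n = 0L
  while (it.hasNext) { it.next(); n += 1 }
  n
}

// Hypothetical local "job": one asynchronous task per partition, then wait for all of them.
def runJob[T, U](partitions: Seq[() => Iterator[T]])(func: Iterator[T] => U): Seq[U] = {
  val tasks = partitions.map(p => Future(func(p())))
  // Blocking here plays the role of waiting on a JobWaiter.
  Await.result(Future.sequence(tasks), 10.seconds)
}

val parts: Seq[() => Iterator[String]] = Seq(
  () => Iterator("a", "b"),
  () => Iterator("c"),
  () => Iterator.empty
)
// Like count(): per-partition sizes, summed on the "driver".
val total = runJob(parts)(it => iteratorSize(it)).sum
// total == 3
```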
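Task scheduling for RDDs
From this figure:
RDD objects build a DAG, which is then handed to the DAGScheduler:
1. DAGScheduler is the high-level, stage-oriented scheduler. It splits the DAG into stages, and each stage in the figure is a set of tasks.
2. Each shuffle starts a new stage. DAGScheduler keeps track of which RDD partitions have already been materialized (for example, cached or written to disk as shuffle output). To schedule tasks optimally, it first looks for locations where their data is local.
3. DAGScheduler also handles failures caused by lost shuffle output and resubmits earlier stages when necessary.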
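The rule "a new stage at every shuffle" can be sketched as a walk over the lineage graph. In this hypothetical toy, `Node` is a made-up lineage node whose dependencies are flagged as shuffle or narrow. Narrow parents are pulled into the current stage, and each shuffle parent starts a new parent stage. The sketch does not de-duplicate ancestors shared by several paths.

```scala
// Hypothetical lineage node; each dependency is (parent, isShuffle).
final case class Node(name: String, deps: List[(Node, Boolean)])

// Returns stages as lists of RDD names, parent stages before child stages.
def stages(finalRdd: Node): List[List[String]] = {
  def collect(start: Node): List[List[String]] = {
    var members = List.empty[String]
    var parentStages = List.empty[List[String]]
    def visit(n: Node): Unit = {
      members = n.name :: members
      n.deps.foreach { case (parent, isShuffle) =>
        if (isShuffle) parentStages = parentStages ++ collect(parent) // stage boundary
        else visit(parent)                                           // same stage
      }
    }
    visit(start)
    parentStages :+ members
  }
  collect(finalRdd)
}

// Word count lineage: textFile -> flatMap -> map -> reduceByKey (shuffle)
val hadoop  = Node("HadoopRDD", Nil)
val lines   = Node("textFile", List(hadoop -> false))
val words   = Node("flatMap", List(lines -> false))
val pairs   = Node("map", List(words -> false))
val reduced = Node("reduceByKey", List(pairs -> true))

val wordCountStages = stages(reduced)
// wordCountStages == List(List("HadoopRDD", "textFile", "flatMap", "map"), List("reduceByKey"))
```

For the word-count lineage this yields two stages. That matches the two stages in the shell log later in this article: Stage 1 ends at map, and Stage 0 runs the final collect.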
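After dividing the job into stages, DAGScheduler submits tasks to the TaskScheduler one TaskSet at a time:
1. A TaskScheduler serves only one SparkContext.
2. When it receives a TaskSet, it sends the tasks to Executors on Worker nodes to run. The TaskScheduler monitors failed tasks and retries them.
Executors run tasks in multiple threads, with each thread running one task.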
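The tasks of a TaskSet run concurrently on an executor's thread pool, and failed tasks are resubmitted. The sketch below imitates that locally with a `java.util.concurrent` fixed thread pool. `runTaskSet` is a hypothetical helper, and `maxAttempts` only loosely corresponds to Spark's `spark.task.maxFailures` setting.

```scala
import java.util.concurrent.{Callable, ExecutionException, Executors, Future}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical local "task set" runner: tasks run on a fixed pool, one per thread at a time,
// and a failed task is resubmitted until it succeeds or maxAttempts is reached.
def runTaskSet(tasks: Seq[() => Int], threads: Int, maxAttempts: Int): Seq[Int] = {
  val pool = Executors.newFixedThreadPool(threads)
  // Wrap a task as a Callable and hand it to the pool.
  def submit(task: () => Int): Future[Int] =
    pool.submit(new Callable[Int] { def call(): Int = task() })
  try {
    // Launch every task of the set before waiting on any of them.
    val launched = tasks.map(task => (task, submit(task)))
    launched.map { case (task, firstAttempt) =>
      var current = firstAttempt
      var attempt = 1
      var result: Option[Int] = None
      while (result.isEmpty) {
        try result = Some(current.get()) // blocks until this attempt finishes
        catch {
          case e: ExecutionException =>
            // Give up once the attempt limit is reached; otherwise resubmit.
            if (attempt >= maxAttempts) throw e.getCause
            attempt += 1
            current = submit(task)
        }
      }
      result.get
    }
  } finally pool.shutdown()
}

// A task that fails on its first attempt and succeeds on the second.
val failuresLeft = new AtomicInteger(1)
val flaky: () => Int = () =>
  if (failuresLeft.getAndDecrement() > 0) throw new RuntimeException("simulated failure") else 42

val results = runTaskSet(Seq(() => 1, flaky, () => 3), threads = 2, maxAttempts = 4)
// results == List(1, 42, 3)
```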
Next, let's trace the source of an example that ships with Spark, org.apache.spark.examples.SparkPi:
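```scala
package org.apache.spark.examples

import scala.math.random

import org.apache.spark._

object SparkPi {
  def main(args: Array[String]): Unit = {
    // Set the application name (shown in the Web UI)
    val conf = new SparkConf().setAppName("Spark Pi")
    // Create a SparkContext
    val spark = new SparkContext(conf)
    // Number of slices (partitions); defaults to 2
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = 100000 * slices
    val count = spark.parallelize(1 to n, slices).map { i =>
      // Sample a random point in the square [-1, 1] x [-1, 1]
      val x = random * 2 - 1
      val y = random * 2 - 1
      // Count it if it falls inside the unit circle
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    // The fraction of points inside the circle approximates Pi / 4
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}
```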
parallelize in this code distributes the collection lazily. Tracing its source:
```scala
/** Distribute a local Scala collection to form an RDD.
 *
 * @note Parallelize acts lazily. If `seq` is a mutable collection and is
 * altered after the call to parallelize and before the first action on the
 * RDD, the resultant RDD will reflect the modified collection. Pass a copy of
 * the argument to avoid this.
 */
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
```
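ParallelCollectionRDD splits the local collection into numSlices roughly equal pieces. A simplified sketch of that slicing follows. The `positions` arithmetic is modeled on `ParallelCollectionRDD.slice` in recent Spark versions. This version omits Spark's special handling of `Range` and `NumericRange`, and `slice` here is a standalone hypothetical function.

```scala
// (start, end) index pairs for numSlices roughly equal slices of a collection.
def positions(length: Long, numSlices: Int): Seq[(Int, Int)] =
  (0 until numSlices).map { i =>
    val start = ((i * length) / numSlices).toInt
    val end = (((i + 1) * length) / numSlices).toInt
    (start, end)
  }

def slice[T](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
  require(numSlices >= 1, "Positive number of slices required")
  val array = seq.toVector // materialize once for cheap indexed slicing
  positions(array.length.toLong, numSlices).map { case (start, end) => array.slice(start, end) }
}

val slices = slice(1 to 10, 3)
// slices == Vector(Vector(1, 2, 3), Vector(4, 5, 6), Vector(7, 8, 9, 10))
```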
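SparkPi then calls map on this RDD. As described above, map is a transformation that produces a new RDD. Finally, reduce, an action, combines the results and triggers the job.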
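SparkPi's logic can be tried without a cluster. This hypothetical local version splits 1 to n into slices and applies the same per-element sampling inside each slice (the map). It then combines the per-slice sums (the reduce). It uses a seeded `scala.util.Random` so runs are reproducible, whereas the original uses `scala.math.random`.

```scala
import scala.util.Random

// Hypothetical Spark-free version of SparkPi's computation.
def estimatePi(slices: Int, seed: Long): Double = {
  val n = 100000 * slices
  val rng = new Random(seed) // seeded so the estimate is reproducible
  // Split 1..n into roughly equal slices, like parallelize(1 to n, slices)
  val partitions = (1 to n).grouped(math.max(1, n / slices)).toList
  val count = partitions.map { part =>
    part.map { _ =>
      val x = rng.nextDouble() * 2 - 1
      val y = rng.nextDouble() * 2 - 1
      if (x * x + y * y < 1) 1 else 0
    }.sum // per-slice partial count
  }.reduce(_ + _) // combine partial counts, like RDD.reduce
  4.0 * count / n
}

val piEstimate = estimatePi(slices = 2, seed = 42L)
```

With 200,000 samples, the estimate's standard deviation is roughly 0.004, so the result should land very close to 3.14.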
A word count example in the Spark shell:
```
scala> val rdd = sc.textFile("hdfs://192.168.0.245:8020/test/README.md")
14/12/18 01:12:26 INFO storage.MemoryStore: ensureFreeSpace(82180) called with curMem=331133, maxMem=280248975
14/12/18 01:12:26 INFO storage.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 80.3 KB, free 266.9 MB)
rdd: org.apache.spark.rdd.RDD[String] = hdfs://192.168.0.245:8020/test/README.md MappedRDD[7] at textFile at <console>:12

scala> rdd.toDebugString
14/12/18 01:12:29 INFO mapred.FileInputFormat: Total input paths to process : 1
res3: String =
(1) hdfs://192.168.0.245:8020/test/README.md MappedRDD[7] at textFile at <console>:12
 |  hdfs://192.168.0.245:8020/test/README.md HadoopRDD[6] at textFile at <console>:12
```
sc.textFile reads the data from HDFS, so the lineage printed by toDebugString shows a HadoopRDD underneath the MappedRDD.
```
scala> val result = rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
14/12/18 01:14:51 INFO spark.SparkContext: Starting job: collect at <console>:14
14/12/18 01:14:51 INFO scheduler.DAGScheduler: Registering RDD 9 (map at <console>:14)
14/12/18 01:14:51 INFO scheduler.DAGScheduler: Got job 0 (collect at <console>:14) with 1 output partitions (allowLocal=false)
14/12/18 01:14:51 INFO scheduler.DAGScheduler: Final stage: Stage 0(collect at <console>:14)
14/12/18 01:14:51 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 1)
14/12/18 01:14:51 INFO scheduler.DAGScheduler: Missing parents: List(Stage 1)
14/12/18 01:14:51 INFO scheduler.DAGScheduler: Submitting Stage 1 (MappedRDD[9] at map at <console>:14), which has no missing parents
14/12/18 01:14:51 INFO storage.MemoryStore: ensureFreeSpace(3440) called with curMem=413313, maxMem=280248975
14/12/18 01:14:51 INFO storage.MemoryStore: Block broadcast_4 stored as values in memory (estimated size 3.4 KB, free 266.9 MB)
14/12/18 01:14:51 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 1 (MappedRDD[9] at map at <console>:14)
14/12/18 01:14:51 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
14/12/18 01:14:51 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 0, localhost, ANY, 1185 bytes)
14/12/18 01:14:51 INFO executor.Executor: Running task 0.0 in stage 1.0 (TID 0)
14/12/18 01:14:51 INFO rdd.HadoopRDD: Input split: hdfs://192.168.0.245:8020/test/README.md:0+4811
14/12/18 01:14:51 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
14/12/18 01:14:51 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
14/12/18 01:14:51 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
14/12/18 01:14:51 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
14/12/18 01:14:51 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
14/12/18 01:14:52 INFO executor.Executor: Finished task 0.0 in stage 1.0 (TID 0). 1860 bytes result sent to driver
14/12/18 01:14:53 INFO scheduler.DAGScheduler: Stage 1 (map at <console>:14) finished in 1.450 s
14/12/18 01:14:53 INFO scheduler.DAGScheduler: looking for newly runnable stages
14/12/18 01:14:53 INFO scheduler.DAGScheduler: running: Set()
14/12/18 01:14:53 INFO scheduler.DAGScheduler: waiting: Set(Stage 0)
14/12/18 01:14:53 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 0) in 1419 ms on localhost (1/1)
14/12/18 01:14:53 INFO scheduler.DAGScheduler: failed: Set()
14/12/18 01:14:53 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
14/12/18 01:14:53 INFO scheduler.DAGScheduler: Missing parents for Stage 0: List()
14/12/18 01:14:53 INFO scheduler.DAGScheduler: Submitting Stage 0 (ShuffledRDD[10] at reduceByKey at <console>:14), which is now runnable
14/12/18 01:14:53 INFO storage.MemoryStore: ensureFreeSpace(2112) called with curMem=416753, maxMem=280248975
14/12/18 01:14:53 INFO storage.MemoryStore: Block broadcast_5 stored as values in memory (estimated size 2.1 KB, free 266.9 MB)
14/12/18 01:14:53 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (ShuffledRDD[10] at reduceByKey at <console>:14)
14/12/18 01:14:53 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/12/18 01:14:53 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 1, localhost, PROCESS_LOCAL, 948 bytes)
14/12/18 01:14:53 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 1)
14/12/18 01:14:53 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/12/18 01:14:53 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
14/12/18 01:14:53 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 5 ms
14/12/18 01:14:53 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 1). 8680 bytes result sent to driver
14/12/18 01:14:53 INFO scheduler.DAGScheduler: Stage 0 (collect at <console>:14) finished in 0.108 s
14/12/18 01:14:53 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 1) in 99 ms on localhost (1/1)
14/12/18 01:14:53 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/12/18 01:14:53 INFO spark.SparkContext: Job finished: collect at <console>:14, took 1.884598939 s
result: Array[(String, Int)] = Array((For,5), (Programs,1), (gladly,1), (Because,1), (The,1), (agree,1), (cluster.,1), (webpage,1), (its,1), (-Pyarn,3), (under,2), (legal,1), (APIs,1), (1.x,,1), (computation,1), (Try,1), (MRv1,,1), (have,2), (Thrift,2), (add,2), (through,1), (several,1), (This,2), (Whether,1), ("yarn-cluster",1), (%,2), (graph,1), (storage,1), (To,2), (setting,2), (any,2), (Once,1), (application,1), (JDBC,3), (use:,1), (prefer,1), (SparkPi,2), (engine,1), (version,3), (file,1), (documentation,,1), (processing,,2), (Along,1), (the,28), (explicitly,,1), (entry,1), (author.,1), (are,2), (systems.,1), (params,1), (not,2), (different,1), (refer,1), (Interactive,2), (given.,1), (if,5), (`-Pyarn`:,1), (build,3), (when,3), (be,2), (Tests,1), (file's,1), (Apache,6), (./bin/run-e...
```
This gives the count of each word after splitting the text on spaces.
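For comparison, the same word count can be written on plain Scala collections. This is a hypothetical local equivalent, not Spark code. `groupBy` plays the role of the shuffle, and summing each group plays the role of `reduceByKey(_ + _)`.

```scala
// Local equivalent of rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect
def wordCount(lines: Seq[String]): Map[String, Int] =
  lines
    .flatMap(_.split(" ").toSeq)  // like flatMap: one line -> many words
    .map(word => (word, 1))       // like map((_, 1))
    .groupBy(_._1)                // gather pairs by key (the shuffle, in Spark)
    .map { case (word, pairs) => word -> pairs.map(_._2).sum } // like reduceByKey(_ + _)

val counts = wordCount(Seq("the quick fox", "the lazy dog", "the end"))
// counts("the") == 3, counts("fox") == 1
```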
This article is reposted from http://blog.csdn.net/huwenfeng_2011/article/details/43344361. All rights belong to the original author.