Spark's components are described here from two perspectives: a macro view and a micro view.

1. Spark components

To analyze Spark's source code, we first need to understand how Spark works, starting with its components.

Understanding how it works begins with the basic concepts.

The official documentation lists the following terms:

  • Application: User program built on Spark. Consists of a driver program and executors on the cluster.
  • Application jar: A jar containing the user's Spark application. In some cases users will want to create an "uber jar" containing their application along with its dependencies. The user's jar should never include Hadoop or Spark libraries, however, these will be added at runtime.
  • Driver program: The process running the main() function of the application and creating the SparkContext.
  • Cluster manager: An external service for acquiring resources on the cluster (e.g. standalone manager, Mesos, YARN).
  • Deploy mode: Distinguishes where the driver process runs. In "cluster" mode, the framework launches the driver inside of the cluster. In "client" mode, the submitter launches the driver outside of the cluster.
  • Worker node: Any node that can run application code in the cluster.
  • Executor: A process launched for an application on a worker node, that runs tasks and keeps data in memory or disk storage across them. Each application has its own executors.
  • Task: A unit of work that will be sent to one executor.
  • Job: A parallel computation consisting of multiple tasks that gets spawned in response to a Spark action (e.g. save, collect); you'll see this term used in the driver's logs.
  • Stage: Each job gets divided into smaller sets of tasks called stages that depend on each other (similar to the map and reduce stages in MapReduce); you'll see this term used in the driver's logs.

1.1 SparkContext

SparkContext: Main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster. Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before creating a new one. This limitation may eventually be removed; see SPARK-2243 for more details.
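
As a minimal sketch of this lifecycle (the master URL, the application name, and the small RDD below are placeholder values chosen for illustration):

import org.apache.spark.{SparkConf, SparkContext}

// Build a configuration and create the single active SparkContext for this JVM.
val conf = new SparkConf().setAppName("basic-concepts-demo").setMaster("local[2]")
val sc = new SparkContext(conf)

// The context is the handle for creating RDDs, broadcast variables and accumulators.
val numbers = sc.parallelize(1 to 10)
val factor = sc.broadcast(2)
val processed = sc.accumulator(0)

// Only one SparkContext may be active per JVM, so stop it before creating another.
sc.stop()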

1.2 Task

/**
 * A unit of execution. We have two kinds of Task's in Spark:
 *
 *  - [[org.apache.spark.scheduler.ShuffleMapTask]]
 *  - [[org.apache.spark.scheduler.ResultTask]]
 *
 * A Spark job consists of one or more stages. The very last stage in a job consists of multiple
 * ResultTasks, while earlier stages consist of ShuffleMapTasks. A ResultTask executes the task
 * and sends the task output back to the driver application. A ShuffleMapTask executes the task
 * and divides the task output to multiple buckets (based on the task's partitioner).
 *
 * @param stageId id of the stage this task belongs to
 * @param partitionId index of the number in the RDD
 */

1.3 ActiveJob

/**
 * A running job in the DAGScheduler. Jobs can be of two types: a result job, which computes a
 * ResultStage to execute an action, or a map-stage job, which computes the map outputs for a
 * ShuffleMapStage before any downstream stages are submitted. The latter is used for adaptive
 * query planning, to look at map output statistics before submitting later stages. We distinguish
 * between these two types of jobs using the finalStage field of this class.
 *
 * Jobs are only tracked for "leaf" stages that clients directly submitted, through DAGScheduler's
 * submitJob or submitMapStage methods. However, either type of job may cause the execution of
 * other earlier stages (for RDDs in the DAG it depends on), and multiple jobs may share some of
 * these previous stages. These dependencies are managed inside DAGScheduler.
 *
 * @param jobId A unique ID for this job.
 * @param finalStage The stage that this job computes (either a ResultStage for an action or a
 *   ShuffleMapStage for submitMapStage).
 * @param callSite Where this job was initiated in the user's program (shown on UI).
 * @param listener A listener to notify if tasks in this job finish or the job fails.
 * @param properties Scheduling properties attached to the job, such as fair scheduler pool name.
 */

1.4 Stage

/**
 * A stage is a set of parallel tasks all computing the same function that need to run as part
 * of a Spark job, where all the tasks have the same shuffle dependencies. Each DAG of tasks run
 * by the scheduler is split up into stages at the boundaries where shuffle occurs, and then the
 * DAGScheduler runs these stages in topological order.
 *
 * Each Stage can either be a shuffle map stage, in which case its tasks' results are input for
 * other stage(s), or a result stage, in which case its tasks directly compute a Spark action
 * (e.g. count(), save(), etc) by running a function on an RDD. For shuffle map stages, we also
 * track the nodes that each output partition is on.
 *
 * Each Stage also has a firstJobId, identifying the job that first submitted the stage. When FIFO
 * scheduling is used, this allows Stages from earlier jobs to be computed first or recovered
 * faster on failure.
 *
 * Finally, a single stage can be re-executed in multiple attempts due to fault recovery. In that
 * case, the Stage object will track multiple StageInfo objects to pass to listeners or the web UI.
 * The latest one will be accessible through latestInfo.
 *
 * @param id Unique stage ID
 * @param rdd RDD that this stage runs on: for a shuffle map stage, it's the RDD we run map tasks
 *   on, while for a result stage, it's the target RDD that we ran an action on
 * @param numTasks Total number of tasks in stage; result stages in particular may not need to
 *   compute all partitions, e.g. for first(), lookup(), and take().
 * @param parents List of stages that this stage depends on (through shuffle dependencies).
 * @param firstJobId ID of the first job this stage was part of, for FIFO scheduling.
 * @param callSite Location in the user program associated with this stage: either where the target
 *   RDD was created, for a shuffle map stage, or where the action for a result stage was called.
 */

1.5 Executor

/**
 * Spark executor, backed by a threadpool to run tasks.
 *
 * This can be used with Mesos, YARN, and the standalone scheduler.
 * An internal RPC interface (at the moment Akka) is used for communication with the driver,
 * except in the case of Mesos fine-grained mode.
 */

2. The Spark core

Spark is built on the unified abstraction of the RDD, which lets it handle different big-data processing scenarios, including MapReduce, Streaming, SQL, Machine Learning, and Graph workloads, in an essentially consistent way.

To understand Spark, you need to understand the RDD.

2.1 What is an RDD?

Its characteristics can be summarized as follows:

  • It is an immutable data structure.
  • It is a distributed data structure that spans a cluster.
  • The structure can be partitioned by the key of the data records.
  • It provides coarse-grained operations, and these operations all work on partitions.
  • It keeps data in memory, providing low latency.

The official definition:

/**
 * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
 * partitioned collection of elements that can be operated on in parallel. This class contains the
 * basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
 * [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value
 * pairs, such as `groupByKey` and `join`;
 * [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of
 * Doubles; and
 * [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
 * can be saved as SequenceFiles.
 * All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)])
 * through implicit conversions.
 *
 * Internally, each RDD is characterized by five main properties:
 *
 *  - A list of partitions
 *  - A function for computing each split
 *  - A list of dependencies on other RDDs
 *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
 *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
 *    an HDFS file)
 *
 * All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
 * to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
 * reading data from a new storage system) by overriding these functions. Please refer to the
 * [[http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf Spark paper]] for more details
 * on RDD internals.
 */

From this description we can see:

1. An RDD is an immutable, partitioned collection of elements that can be operated on in parallel.

2. The RDD class defines the operations available on all RDDs, such as `map`, `filter`, and `persist`. Additional operations live in PairRDDFunctions, DoubleRDDFunctions, and SequenceFileRDDFunctions.

3. An RDD is mainly made up of five parts (a sketch of how these surface in a custom RDD follows this list):

  • a set of partitions;
  • a function for computing each partition;
  • a list of dependencies on other RDDs;
  • an optional partitioner (for key-value RDDs);
  • an optional list of preferred locations for computing each partition; for an HDFS file, these are the locations of the blocks that hold each partition.
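
As a hedged sketch of how these five parts surface when subclassing RDD (the names InMemoryRDD and InMemoryPartition are hypothetical; the class simply wraps an in-memory sequence for illustration):

import org.apache.spark.{Partition, Partitioner, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition: only needs to report its index.
class InMemoryPartition(override val index: Int) extends Partition

// Hypothetical RDD wrapping a local Seq[Int], overriding the five properties listed above.
class InMemoryRDD(sc: SparkContext, data: Seq[Int], numSlices: Int)
  extends RDD[Int](sc, Nil) {                          // Nil: no dependencies on other RDDs

  // 1. A list of partitions.
  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](numSlices)(i => new InMemoryPartition(i))

  // 2. A function for computing each split: each partition gets a contiguous slice of the data.
  override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
    val start = (split.index * data.size) / numSlices
    val end = ((split.index + 1) * data.size) / numSlices
    data.slice(start, end).iterator
  }

  // 3. The dependency list was passed to the RDD constructor (Nil here).
  // 4. Optionally, a Partitioner for key-value RDDs (none for a plain RDD[Int]).
  override val partitioner: Option[Partitioner] = None

  // 5. Optionally, preferred locations for each split (none for in-memory data;
  //    HadoopRDD, for example, returns HDFS block locations here).
  override protected def getPreferredLocations(split: Partition): Seq[String] = Nil
}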

Summary:

  RDD, short for Resilient Distributed Dataset, is a fault-tolerant, parallel data structure that lets users explicitly persist data in memory or on disk and control how the data is partitioned. RDDs also provide a rich set of operations for working with that data. Among them, transformations such as map, flatMap, and filter implement the monad pattern and fit naturally with Scala's collection operations. Beyond these, RDDs offer more convenient operations such as join, groupBy, and reduceByKey (note that reduceByKey is a transformation, not an action, since it returns another RDD) to support common data computations.
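
A small sketch of these operations, assuming an active SparkContext named `sc` (on older Spark releases the pair-RDD operations also need `import org.apache.spark.SparkContext._`):

// Transformations are lazy: they only describe the computation and return new RDDs.
val words = sc.parallelize(Seq("spark", "rdd", "spark", "stage"))
val pairs = words.map(w => (w, 1))                      // map: transformation
val counts = pairs.reduceByKey(_ + _)                   // reduceByKey: transformation (returns an RDD)
val frequent = counts.filter { case (_, n) => n > 1 }   // filter: transformation

// Actions trigger a job and bring a result back to the driver.
frequent.collect().foreach(println)                     // collect: action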

  Generally speaking, there are several common models for data processing: Iterative Algorithms, Relational Queries, MapReduce, and Stream Processing. Hadoop MapReduce, for example, adopts the MapReduce model, while Storm adopts the Stream Processing model. The RDD blends these four models, which is what allows Spark to be applied to all kinds of big-data processing scenarios.

  As a data structure, an RDD is essentially a read-only, partitioned collection of records. An RDD can contain multiple partitions, each of which is a fragment of the dataset, and RDDs can depend on one another. If each partition of a parent RDD is used by at most one partition of a single child RDD, the dependency is called a narrow dependency; if partitions of multiple child RDDs can depend on it, it is called a wide dependency. Different operations produce different kinds of dependencies: map, for example, produces a narrow dependency, while join produces a wide dependency.

  Spark distinguishes narrow from wide dependencies for two reasons.

  First, narrow dependencies allow multiple operations to be executed as a pipeline on a single cluster node, for example running filter right after map. Wide dependencies, by contrast, require all parent partitions to be available and may need a MapReduce-like shuffle to move data across nodes.

  Second, there is failure recovery. Recovery is more efficient for narrow dependencies, because only the lost parent partitions need to be recomputed, and that recomputation can proceed in parallel on different nodes. A wide dependency, on the other hand, involves multiple parent partitions across the RDD lineage. The figure below illustrates the difference between narrow and wide dependencies:

  The figure comes from Matei Zaharia's paper An Architecture for Fast and General Data Processing on Large Clusters. In the figure, each box represents an RDD and each shaded rectangle represents a partition.

2.2 How RDDs support fault tolerance

  Fault tolerance is usually provided in one of two ways: data replication or logging. For a data-centric system both approaches are expensive, because they require copying large volumes of data across the cluster network, and network bandwidth is far lower than memory bandwidth.

  RDDs are fault tolerant by design. First, an RDD is itself an immutable dataset; second, it remembers the graph of operations used to build it, so if a worker running a task fails, the lost data can be recomputed by replaying that graph of operations. Since no replication is needed for fault tolerance, the cost of shipping data across the network stays low.
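
That graph of operations (the lineage) can be printed from user code; a quick sketch, reusing the `counts` RDD from the sketch above:

// toDebugString renders the recursive chain of RDDs and their dependencies,
// which is what Spark replays to recompute lost partitions.
println(counts.toDebugString)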

  In some scenarios, however, Spark does need to rely on logging for fault tolerance. In Spark Streaming, for example, when performing update operations on the data or using Streaming's window operations, the intermediate state of the computation must be recoverable. This requires Spark's checkpoint mechanism, so that the operation can be restored from a checkpoint.

  For an RDD's wide dependencies, the most effective fault-tolerance approach is likewise checkpointing. That said, the latest Spark releases still do not seem to include an automatic checkpointing mechanism.
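
A hedged sketch of manual checkpointing (the checkpoint directory is a placeholder path):

// Checkpointing writes the RDD to reliable storage and truncates its lineage.
sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")    // placeholder path
counts.checkpoint()                                     // mark the RDD for checkpointing
counts.count()                                          // the checkpoint is materialized after a job runs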

2.3 Partition

A Partition is the identifier of one slice of an RDD.
/**
 * An identifier for a partition in an RDD.
 */
trait Partition extends Serializable {
  /**
   * Get the partition's index within its parent RDD
   */
  def index: Int

  // A better default implementation of HashCode
  override def hashCode(): Int = index
}
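
From user code, the Partition objects of an RDD are available through `rdd.partitions`; a quick sketch, assuming the SparkContext `sc` from earlier:

val data = sc.parallelize(1 to 100, numSlices = 4)
// Each element is a Partition; its index runs from 0 to numPartitions - 1.
data.partitions.foreach(p => println(s"partition index = ${p.index}"))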

2.4 Partitioner

Partitioner: An object that defines how the elements in a key-value pair RDD are partitioned by key. Maps each key to a partition ID, from 0 to `numPartitions - 1`.

Kinds of partitioners

The default partitioner, defaultPartitioner:

/**
 * Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
 *
 * If any of the RDDs already has a partitioner, choose that one.
 *
 * Otherwise, we use a default HashPartitioner. For the number of partitions, if
 * spark.default.parallelism is set, then we'll use the value from SparkContext
 * defaultParallelism, otherwise we'll use the max number of upstream partitions.
 *
 * Unless spark.default.parallelism is set, the number of partitions will be the
 * same as the number of partitions in the largest upstream RDD, as this should
 * be least likely to cause out-of-memory errors.
 *
 * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
 */
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
  val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
  for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
    return r.partitioner.get
  }
  if (rdd.context.conf.contains("spark.default.parallelism")) {
    new HashPartitioner(rdd.context.defaultParallelism)
  } else {
    new HashPartitioner(bySize.head.partitions.size)
  }
}

The hash partitioner, HashPartitioner: A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using Java's `Object.hashCode`. Java arrays have hashCodes that are based on the arrays' identities rather than their contents, so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will produce an unexpected or incorrect result.

The range partitioner, RangePartitioner: A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly equal ranges. The ranges are determined by sampling the content of the RDD passed in. Note that the actual number of partitions created by the RangePartitioner might not be the same as the `partitions` parameter, in the case where the number of sampled records is less than the value of `partitions`.
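
A hedged sketch comparing the two partitioners (reusing the `pairs` RDD from the earlier sketch; the partition count 4 is arbitrary):

import org.apache.spark.{HashPartitioner, RangePartitioner}

// Hash partitioning: partition = key.hashCode modulo numPartitions.
val hashed = pairs.partitionBy(new HashPartitioner(4))

// Range partitioning: samples the keys and gives each partition a contiguous key range,
// so the keys must have an Ordering (String keys here).
val ranged = pairs.partitionBy(new RangePartitioner(4, pairs))

println(hashed.partitioner)   // prints the Some(...) wrapping each RDD's partitioner
println(ranged.partitioner)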

2.5 Dependency

Kinds of dependencies

Narrow dependency, NarrowDependency: Base class for dependencies where each partition of the child RDD depends on a small number of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.

Shuffle dependency, ShuffleDependency: Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle, the RDD is transient since we don't need it on the executor side.

One-to-one dependency, OneToOneDependency: Represents a one-to-one dependency between partitions of the parent and child RDDs.

Range dependency, RangeDependency: Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
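
The dependency class produced by a transformation can be inspected through `rdd.dependencies`; a small sketch, reusing `words` and `counts` from the earlier examples:

// map yields a narrow OneToOneDependency, reduceByKey a ShuffleDependency,
// and union a RangeDependency per parent RDD.
println(words.map(_.length).dependencies)
println(counts.dependencies)
println(words.union(words).dependencies)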

References:

[1] http://spark.apache.org/docs/latest/cluster-overview.html

[2] http://www.infoq.com/cn/articles/spark-core-rdd/

Reposted from: https://www.cnblogs.com/davidwang456/p/5135884.html
