SortShuffle

1 mapTask将map(聚合算子)或array(join算子)写入内存

2 达到阀值发生溢写,溢写前根据key排序,分批写入磁盘,最终将所有临时文件合并成一个最终文件,并建立一份索引记录分区信息。一个mapTask最终形成一个文件。

3 reduceTask拉取各个task中自己的分区数据去计算。

和hadoop shuffle的区别

1 MR没有所谓的DAG划分,一次MR任务就意味着一次shuffle;spark并不是每次都会触发shuffle

2 MR要进行3次排序,map阶段2次,分区内排序,同一个map的多个输出文件合并时进行归并排序.reduce阶段的归并排序. spark未必会排序,用了sort shuffle的才会排序,用来hash shuffle的就不会.

3 MR的reduce拉取的数据直接放磁盘再读,spark则是先放内存,放不下才放磁盘

4 MR在数据拉取完毕后才开始计算,spark则是边拉边计算(reduceByKey原理)

job 提交流程(重点)

standalone
  1. driver端:通过反射获取主类执行main方法 -> 创建sparkconfsparkContext,创建通信环境、后端调度器(负责向master发送注册信息、向excutor发送task的调度器)、task调度器、DAG(根据宽窄依赖划分stage)调度器 ->封装任务信息提交给Master

  2. Master端:缓存任务信息并将其放入任务队列 -> 轮到该任务时,调用调度方法进行资源调度 ->发送调度信息给对应的worker

  3. Worker端:worker将调度信息封装成对象 -> 调用对象的start方法,启动excutor进程

  4. Excutor进程:启动后向driver端反向注册(driver端拿到信息后注册excutor,向其发送任务) -> 创建线程池,封装任务对象 -> 获取池中线程执行任务 -> 反序列化TastSet,执行给定的各种算子步骤

2.1.2.2 yarn-client
  1. 客户端向yarn的RM申请启动AM,同时在自身的sparkContext中创建DAGScheduler和TASKScheduler(创建driver)

  2. 按照正常Yarn流程,一个NM领取到AM任务作为AM与客户端的driver产生连接(在yarn-cluster中该AM直接作为driver而不是连接driver)

  3. driver根据任务信息通过AM向RM申请资源(计算容器)

  4. AM通知领取到任务的NM向driver的sparkContext反注册并申请Task

  5. driver的sparkContext分配Task给各个计算节点,并随时掌握各个任务运行状态

  6. 应用程序运行完成后,sparkContext向RM申请注销并关闭自己。

总结:与standalone区别是,AM只作为中间联系,实际作为AM的是driver的sparkContext

2.1.2.3 yarn-cluster
  1. 先将driver作为一个AM在一个NM中启动

  2. 由AM创建应用程序,走正常的yarn流程启动Executor运行Task,直到运行完成

总结:与yarn client相比只是把driver端由客户端变成了集群中的某个NodeManager节点。

2.1.3 Job 运行原理

Task 重试与本地化级别

TaskScheduler遍历taskSet,调用launchTask方法根据数据"本地化级别"发送task到指定的Executor

task在选择Executor时,会优先第一级,如果该Executor资源不足则会等待一段时间(默认3s),然后逐渐降级。

本地化级别
PROCESS_LOCAL 进程本地化NODE_LOCAL 节点本地化
NO_PREF 非本地化
RACK_LOCAL 机架本地化
ANY 任意
重试机制

taskSet监视到某个task处于失败或挣扎状态时,会进行重试机制

当某个task提交失败后,默认会重试3次,3次之后DAGScheduler会重新提交TaskSet再次尝试,总共提交4次,当12次之后判定job失败,杀死Executor

挣扎状态:当75%的Task完成之后,每隔100s计算所有剩余task已执行时间的中位数,超过这个数的1.5倍的task判定为挣扎task。

DAG原理(源码级)

1. sparkContext创建DAGScheduler->创建EventProcessLoop->调用eventLoop.start()方法开启事件监听2. action调用sparkContext.runJob->eventLoop监听到事件,调用handleJobSubmitted开始划分stage3. 首先对触发job的finalRDD调用createResultStage方法,通过getOrCreateParentStages获取所有父stage列表,然后创建自己。如:父(stage1,stage2),再创建自己stage34. getOrCreateParentStages内部会调用getShuffleDependencies获取所有直接宽依赖(从后往前推,窄依赖直接跳过) 在这个图中G的直接宽依赖是A和F,B因为是窄依赖所以跳过,所以最后B和G属于同一个stage5. 接下来会循环宽依赖列表,分别调用getOrCreateShuffleMapStage:-- 如果某个RDD已经被划分过会直接返回stageID;否则就执行getMissingAncestorShuffleDependencies方法,继续寻找该RDD的父宽依赖,窄依赖老规矩直接加入:​    -- 如果返回的宽依赖列表不为空,则继续执行4,5的流程直到为空为止;​    -- 如果返回的宽依赖列表为空,则说明它没有父RDD或者没有宽依赖,此时可以直接调用createShuffleMapStage将该stage创建出来6. 因此最终的划分结果是stage3(B,G)、stage2(C,D,E,F)、stage1(A)7. 创建ResultStage,调用submitStage提交这个stage8. submitStage会首先检查这个stage的父stage是否已经提交,如果没提交就开始递归调用submitStage提交父stage,最后再提交自己。9. 每一个stage都是一个taskSet,每次提交都会提交一个taskSet给TaskScheduler

SparkContext 创建流程(源码级)

1. SparkSubmit反射调用主类的main方法
2. main方法中初始化SparkContext对象
3. SparkContext开始初始化Spark通信环境 RpcEnv
4. SparkContext创建TaskSchedulerImpl对象
5. SparkContext创建StandaloneSchedulerBackend对象
6. 最后创建DAGScheduler对象

Spark SQL 运行原理

1. SQL语句封装到`SQLContext`对象中
2. 调用分析器检查语义、调用翻译器翻译成RDD算子、调用优化器选择最佳算子
3. 打包成jar包上传集群
4. 走常规spark作业流程

Spark的内存模型

executor的内存分为4+1块:

  Execution:计算用内存,用于执行各种算子时存放临时对象的内存Storage:缓存用内存,主要存储catch到内存中的数据,广播变量也存在这里User Memory:用户用内存,存储RDD依赖关系等RDD的信息Reserved Memory:预留内存,用来存储Spark自己的对象Off-heap Memory:堆外内存,开启之后计算和缓存的内存都分别可以存在堆外内存。堆外内存不受spark GC的影响。

ExecutionStorage采用联合内存机制,可以互相借用对方的内存区域,但是Execution可以强制征收Storage的内存,反过来不行。

Task共用executor的内存区域,spark准备了一个hashMap用来记录各个task使用的内存,task申请新的内存时,如果剩余内存不够则会阻塞直到有足够的内存为止。每个task至少需要1/2N的内存才能被启动。

算子原理

foreach和foreachPartition的区别

​ 两个算子都是属于Action算子,但是适用于场景不同,foreach主要是基于输出打印使用,进行数据的显示,而foreachPartition的适用于各种的connection连接创建时候进行使用,保证每个分区内创建一个连接,提高执行效率,减少资源的消耗。

map与mapPartitions的区别

​ 两个算子都属于transformtion算子,转换算子,但是适用于场景不同,map是处理每一条数据,也就是说,执行效率稍低,而mapPartition是处理一个分区的数据,返回值是一个集合,也就是说,在效率方面后者效率更高,前者稍低,但是在执行安全性方面考虑,map更适合处理大数据量的数据,而mappartition适用于中小型数据量,如果数据量过大那么会导致程序的崩溃,或oom。

spark版本2.x 与1.6的区别

底层的执行内存模型发生改变,从之前的静态内存模型,改为动态内存模型,在spark2.X以后推出了一个全新的特性,叫DataSet,DataSet相当于整合了DF和RDD之间的关系,可以更容易的操作Spark的API。

Spark 概念

RDD

  • 弹性分布式数据集
  • 组成:Partition,分区器,compute函数,id,name等等
  • 特性:不可变,分区,并行

Job

一个action算子会触发一个job

Stage

1个job可以划分为多个satge
Stage有两个子类:ResultStage和ShuffleMapStage

ResultStage
在RDD的某些分区上应用函数来计算action操作的结果,对应DAG原理中createResultStage()创建的对象ShuffleMapStage
ShuffleMapStage 是中间的stage,为shuffle生产数据。它们在shuffle之前出现。当执行完毕之后,结果数据被保存,以便reduce 任务可以获取到。

Task

一个stage可以划分为多个task,一个分区就对应一个task.
task是执行spark job 的逻辑单元,运行在executorCpu Core

 ShuffleMapTask
​shuffle操作发生后,那么Task会划分两种Task,其中,上游数据叫做ShuffleMapTask,主要是用于将数据进行上游处理,为下游数据拉取做准备。ResultTask下游数据,主要是基于上游数据的结果集,其实ResultTask会根据元数据索引进行拉取数据文件,然后根据Key聚合内部所有的value值。

Spark里kyro序列化了解多少?

Spark的序列化 默认为org.apache.spark.serializer.JavaSerializer,可选org.apache.spark.serializer.KryoSerializer

序列化对于spark应用的性能来说,还是有很大影响的,在特定的数据格式的情况下,KryoSerializer的性能可以达到JavaSerializer的10倍以上,当然放到整个Spark程序中来考量,比重就没有那么大了,但是以Wordcount为例,通常也很容易达到30%以上的性能提升。而对于一些Int之类的基本类型数据,性能的提升就几乎可以忽略了。KryoSerializer依赖Twitter的Chill库来实现,相对于JavaSerializer,主要的问题在于不是所有的Java Serializable对象都能支持。

需要注意的是,这里可配的Serializer针对的对象是Shuffle数据,以及RDD Cache等场合,而Spark Task的序列化是通过spark.closure.serializer来配置,但是目前只支持JavaSerializer

持久化

  • spark通过cachepersist进行持久化
  • persist方法有12个缓存级别
  • cache默认用的是Memory_Only,就是只有内存持久化
  • 实际应用根据需求来选择持久化策略
  • 持久化策略也会影响spark运行效率

检查点

  • spark有检查点机制,checkpoint
  • 执行checkpoint方法后不再保存依赖链(如何实现?)
  • checkpoint的数据要手动清理, persis和cache的程序运行结束后就会自动删除
    spark通过checkPoint方法将RDD状态保存在高可用存储中,与持久化不同的是,它是对RDD状态的一个复制持久化,执行checkPoint后不再保存依赖链。此外,持久化存储的缓存当程序运行结束后就会被自动删除,检查点保存的RDD状态只能手动清理。

广播变量

正常情况下spark为每个Task都复制了一份它需要的数据,如果有大量Task都需要用到一份相同的数据,这种做法就会导致一个节点Excutor(内含多个Task)从driver端拉取大量重复数据,占用网络IO和内存资源。
使用广播变量后,Task会惰性加载数据,加载时,先在本地ExcutorBlockManager中寻找,如果找不到再到最近节点的BlockManager中查找,直到找到数据后将数据传输到本地存储起来,同一节点的多个Task就可以复用这份数据,大幅减少内存占用和IO时间。

#### 2.3.9 累加器

spark提供了一个累加器用于在整个流程中额外执行一个MR任务,它可以在driver端被初始化发送给各个Task,然后在每个Task中为它添加数据,最终经过reduce将结果聚合后返回driver端。
可以自定义累加器的类型,通过实现一个聚合方法来创建自定义累加器。
除此之外spark2还支持特殊的累加器-收集器,它不需要执行reduce,会将数据原原本本存放在集合中返回。
注意:如果累加操作在transform算子并且action算子有多个时,需要catch该转换算子,否则可能造成重复累加。

分区

概念
x: org.apache.spark.rdd.HadoopPartition@3c1 x.index: 0 x.hashCode: 961
x: org.apache.spark.rdd.HadoopPartition@3c2 x.index: 1 x.hashCode: 962
x: org.apache.spark.rdd.HadoopPartition@3c3 x.index: 2 x.hashCode: 963

分区是RDD内部并行计算的一个计算单元,是RDD数据集的逻辑分片,分区的格式决定并行计算的粒度,分区的个数决定任务的个数。

作用

通过将相同的key放在相同的节点,避免不同节点聚合key时进行shuffle操作产生的网络IO;此外,事先分区好的数据在join时就可以只由另一张表shuffle,自身不shuffle,这常常用在大表join小表上。

默认分区器

HashPartitioner:将key的哈希值/分区数量进行分区
可选分区器RangePartitioner:范围分区器,按照字典顺序或数字大小排序后/分区数量来分区

自定义分区器

通过实现get分区总数方法和get分区数方法,指定自定义规则的key分区方式;
使用自定义分区器创建的RDD进行复杂的聚合或join操作效率更高。

并行度

spark作业的最大并行度=excutor个数*每个excutor的cpu core数
但spark的当前并行度取决于task数,而task数=分区数
分区数可以通过spark.default.parallelism设置默认分区数,也可以在使用算子时显示地指定分区器和分区数量。
spark官方推荐设置分区数为最大并行度的2-3倍,这样可以保证提前计算的线程立刻被后面的task使用,并且每个task处理的数据量会更少。

2.4 Spark 调优

  1. spark1.6以下的版本中executionstorage的内存是各自固定的,执行内存负责transform算子和shuffle算子,storage负责cache分和广播变量的存储。通过以下两个参数改变它们的大小。
 spark.shuffle.memoryFraction=0.2spark.storage.memoryFraction=0.6
  1. spark1.6以上的版本时使用联合内存机制,两者可以互相借用内存,但是如果执行内存不够时会强制回收storage借走的内存。因此如果要进行大缓存任务时建议手动设置固定内存机制。
  2. 1.6以上的版本还额外增加了堆外内存,调用persist方法时指定StorageLevel.OFF_HEAP参数,配合分布式内存文件系统Tachyon将需要缓存很久的数据存放到堆外内存,大幅降低full GC的发生频率。
    . 使用repartition增加分区数量,降低每个task的大小
  3. 当合并分区的数据量过大时,可以使用repartition并手动指定使用shuffle来进行带shuffle的合并操作,可以在合并前先进行一次聚合。
  4. 使用shuffle算子时指定分区数量或指定自定义分区器避免数据倾斜
  5. 当某个数据重复很多时,尽量用一个对象来代表这些数据,可以是string,也可以是带计数器的map。
  6. 使用mapPartition代替map可以提升效率,但要注意内存紧缺时不能使用。
  7. map端join:当join小表时,可以先用collect将数据收集到driver端,然后用广播变量的方式发送到各个节点上,避免大数据的迁移。
  8. 可以使用map端reduce的方式进一步减少网络IO。调用combineByKey算子。
  9. 内存不足时使用rdd.persist(StorageLevel.MEMORY_AND_DISK_SER),直接缓存到磁盘。
  10. spark集群节点应该覆盖hbase,因为spark读取hbase时是按region读取,在同一个节点上可以避免大量数据迁移
  11. 参数设置:
spark.driver.memory (default:1G) 设置driver端内存
spark.rdd.compress 设置压缩内存的rdd数据,减少内存的占用,但是增加CPU负担
spark.serializer 设置默认kyro
spark.memory.storageFraction 设置storage在内存中的比例,根据缓存的大小决定
spark.locality.wait 设置等待任务的等待时间,如果某个任务等待数据到达的时间超过该时间,就会被下调优先级
spark.speculation 设置空闲节点是否执行某个长时间未结束的task,有点类似hive的预测执行,建议开启。
总结
减少GC:增加计算用的内存;把频繁使用的大缓存缓存到堆外内存;使用计数器存储重复的数据增加并行度:shuffle时指定分区数量、repartition增加分区(可用线程的2-3倍)、减小分区可以指定带shuffle的repartition进行局部聚合减少shuffle:使用指定的分区器进行分区,使得相同的key都处于同一分区中(主要用在数据清洗时按自定义分区器存储数据)map端join:先读取小表到driver端存成广播变量,再读取大表使用广播变量进行joinmap端reduce:使用combineByKey算子指定persist : 内存不足时用persisit指定缓存磁盘来代替catch参数调优:设置压缩RDD(节约内存,加重CPU)、设置kyro序列化、延长下调本地化级别的等待时间、开启预测执行等等

Spark面试题梳理相关推荐

  1. spark算子大全glom_(七)Spark Streaming 算子梳理 — repartition算子

    目录 天小天:(一)Spark Streaming 算子梳理 - 简单介绍streaming运行逻辑 天小天:(二)Spark Streaming 算子梳理 - flatMap和mapPartitio ...

  2. spark on yarn 完全分部署_大数据Spark面试题(一)

    1.spark的有几种部署模式,每种模式特点?(☆☆☆☆☆) 1)本地模式 Spark不一定非要跑在hadoop集群,可以在本地,起多个线程的方式来指定.将Spark应用以多线程的方式直接运行在本地, ...

  3. 2019最新spark面试题,看了它,你还怕找不到工作吗?

    [导读]本篇文章为大家带来spark面试指南,文内会有两种题型,问答题和代码题,题目大部分来自于网络上,有小部分是来自于工作中的总结,每个题目会给出一个参考答案. 为什么考察Spark? Spark作 ...

  4. spark 面试题汇总

    大数据面试题汇总 大数据面试题汇总 - 简书 spark 资源调优 1.列式存储和行式存储的区别 行存储,数据行存储,一个文件可表达一个二维表.适用于一般的业务场景如CSV文件,文本文件 因为这里的行 ...

  5. Spark面试题汇总及答案(推荐收藏)

    一.面试题 Spark 通常来说,Spark与MapReduce相比,Spark运行效率更高.请说明效率更高来源于Spark内置的哪些机制? hadoop和spark使用场景? spark如何保证宕机 ...

  6. spark mapreduce术语梳理

    计算层.存储层.资源管理. 往哪存,资源够不够. hive不存数据,也不算数据,只是产生mr的工具.hive最值钱的是metastore,存储到存储层. mr的并行计算流程梳理: 1个app对应1个j ...

  7. 【面试妥了】史上最全Spark面试题

    戳蓝字"CSDN云计算"关注我们哦! 作者 | 游骑小兵 责编 | 阿秃 Spark问题精华 Q:什么是Spark? A:简单理解,Spark是在Hadoop基础上的改进,是UC ...

  8. Spark面试题(二)

    一.spark streaming和storm有何区别? 一个实时毫秒,一个准实时亚秒,不过storm的吞吐率比较低. 二.spark有哪些组件? Master:管理集群和节点,不参与计算. Work ...

  9. Spark面试,Spark面试题,Spark面试汇总

    Table of Contents 1.你觉得spark 可以完全替代hadoop 么? 2.Spark消费 Kafka,分布式的情况下,如何保证消息的顺序? 3.对于 Spark 中的数据倾斜问题你 ...

最新文章

  1. select chosen-jquery.js下拉列表的层被覆盖
  2. Java时间对比------实际时间和设置时间对比
  3. python速成一小时_Python学习|一小时快速入门python(一)
  4. Android客户端实现七牛云存储文件上传
  5. oracle or索引失效_oracle数据库中索引会失效的几种情况
  6. java arm 编译器下载_最全盘点:18款在线C/C++/Py/Java编译器,一个比一个强大(附地址)...
  7. 2019年第十届蓝桥杯 - 省赛 - Java研究生组 - A. 立方和
  8. 11input/output
  9. 冯乐乐 unity_Unity常用矩阵运算的推导补遗——切线空间
  10. 在C# 中 如何限制在文本框(textBox)中输入的类型为正整数
  11. [论文评析] ArXiv-2021,Pyramid Vision Transformer A Versatile Backbone for Dense Prediction without Convo
  12. 一个黑客都要学习什么语言
  13. 在Cocos2d-x中使用CocosBuilder
  14. 阿里云默认80端口无法访问
  15. AppScan-Authentication Bypass Using HTTP Verb Tampering
  16. 2022-09-29:在第 1 天,有一个人发现了一个秘密。 给你一个整数 delay ,表示每个人会在发现秘密后的 delay 天之后, 每天 给一个新的人 分享 秘密。 同时给你一个整数 forg
  17. 常用的27个Stata命令
  18. Zoomit的用法总结
  19. netflix app for android 使用记录
  20. stm32f4xx-外部中断

热门文章

  1. echart多个柱状图 设置y轴显示_Origin做多因子柱状图
  2. 访问量100万的网站服务器,100万访问量 服务器配置
  3. wxpython4下载_Linux 安装 wxPython4.0.4
  4. 服务器做raid bios界面做raid配置
  5. springmvc01
  6. rstudio 修改代码间距_如何在RStudio里修改R脚本的编码方式
  7. IDEA插件推荐:Material Theme UI(把IDEA变得更加美观)
  8. ios html转json,iOS 中 Model 和 JSON 互相转换
  9. css未生效,css文件引入后出现某些标签生效某些不生效
  10. 台达plc读取变频器电流案例_PLC模拟量信号如何转换,编程必懂!