看的是极客时间的课,讲得很不错

零基础入门 Spark (geekbang.org)

基础知识

01 Spark:从“大数据的Hello World”开始

准备工作

  1. IDEA安装Scala插件

  2. 构建Maven项目

  3. pom.xml加入spark

    <dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.2.0</version></dependency>
    </dependencies>
    

Word Count

1. 读取内容

val rootPath: String = "xxxx" //这里是文件所在目录
val file: String = s"$rootPath\\wikiOfSpark.txt"
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]")   //本地运行
sparkConf.setAppName("wordCount")
val lineRDD: RDD[String] = new SparkContext(sparkConf).textFile(file)//文件中的每一行当作一个元素存入RDD

源码中关于SparkConf和SparkContext的说明

SparkConf:Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.

SparkContext:Main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.

我目前理解的RDD:一个分布式元素集合,里面的元素可以通过算子转换成各种类型和结构,每个RDD都被分成不同的分区,分别在集群中的不同节点上操作

2. 分词

//以行为单位做分词
val wordRDD: RDD[String] = lineRDD.flatMap(_.split(" "))
#mermaid-svg-MOIpmLbpsrEFzbEX {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-MOIpmLbpsrEFzbEX .error-icon{fill:#552222;}#mermaid-svg-MOIpmLbpsrEFzbEX .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-MOIpmLbpsrEFzbEX .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-MOIpmLbpsrEFzbEX .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-MOIpmLbpsrEFzbEX .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-MOIpmLbpsrEFzbEX .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-MOIpmLbpsrEFzbEX .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-MOIpmLbpsrEFzbEX .marker{fill:#333333;stroke:#333333;}#mermaid-svg-MOIpmLbpsrEFzbEX .marker.cross{stroke:#333333;}#mermaid-svg-MOIpmLbpsrEFzbEX svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-MOIpmLbpsrEFzbEX .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-MOIpmLbpsrEFzbEX .cluster-label text{fill:#333;}#mermaid-svg-MOIpmLbpsrEFzbEX .cluster-label span{color:#333;}#mermaid-svg-MOIpmLbpsrEFzbEX .label text,#mermaid-svg-MOIpmLbpsrEFzbEX span{fill:#333;color:#333;}#mermaid-svg-MOIpmLbpsrEFzbEX .node rect,#mermaid-svg-MOIpmLbpsrEFzbEX .node circle,#mermaid-svg-MOIpmLbpsrEFzbEX .node ellipse,#mermaid-svg-MOIpmLbpsrEFzbEX .node polygon,#mermaid-svg-MOIpmLbpsrEFzbEX .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-MOIpmLbpsrEFzbEX .node .label{text-align:center;}#mermaid-svg-MOIpmLbpsrEFzbEX .node.clickable{cursor:pointer;}#mermaid-svg-MOIpmLbpsrEFzbEX .arrowheadPath{fill:#333333;}#mermaid-svg-MOIpmLbpsrEFzbEX .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-MOIpmLbpsrEFzbEX .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-MOIpmLbpsrEFzbEX .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-MOIpmLbpsrEFzbEX .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-MOIpmLbpsrEFzbEX .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-MOIpmLbpsrEFzbEX .cluster text{fill:#333;}#mermaid-svg-MOIpmLbpsrEFzbEX .cluster span{color:#333;}#mermaid-svg-MOIpmLbpsrEFzbEX div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-MOIpmLbpsrEFzbEX :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;}

wordRDD:RDD[String]
RDD[Array[String]]
lineRDD:RDD[String]
映射
展平
第1行第1个词,第1行第2个词,...,第n行第n个词
[第1行第1个词,第1行第2个词,...]
[第2行第1个词,第2行第2个词,...]
[...]
[第n行第1个词,第n行第2个词,...]
第1行,第2行,...,第n行
// 过滤掉空字符串
val cleanWordRDD: RDD[String] = wordRDD.filter(!_.equals(""))

3. 分组计数

// 把RDD元素转换为(Key,Value)的形式
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map((_, 1)) // 按照单词做分组计数
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey(_ + _) // 打印词频最高的5个词汇
wordCounts.map{case (k, v) => (v, k)}.sortByKey(false).take(5)

完整代码

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDDobject WordCount {def main(args: Array[String]): Unit = {val rootPath: String = "xxxx"val file: String = s"$rootPath\\wikiOfSpark.txt"val sparkConf = new SparkConf()sparkConf.setMaster("local[*]")   //本地运行sparkConf.setAppName("wordCount")val lineRDD: RDD[String] = new SparkContext(sparkConf).textFile(file)val wordRDD: RDD[String] = lineRDD.flatMap(_.split(" "))val cleanRDD: RDD[String] = wordRDD.filter(!_.equals(""))val kvRDD: RDD[(String,Int)] = cleanRDD.map((_, 1))val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey(_ + _)println(wordCounts.map { case (k, v) => (v, k) }.sortByKey(false).take(5).mkString("Array(", ", ", ")"))}
}

自测题:

  1. 独立完成WordCount代码的编写
  2. flatMap流程

02 RDD与编程模型:延迟计算是怎么回事?

RDD与数组的区别

RDD是一种抽象,是Spark对于分布式数据集的抽象,它用于囊括所有内存中和磁盘中的分布式数据实体

对比项 数组 RDD
概念类型 数据结构实体 数据模型抽象
数据跨度 单机进程内 跨进程、跨计算节点
数据构成 数组元素 数据分片(Partitions)
数据定位 数组下标、索引 数据分片索引

RDD的四大属性

  • partitions:数据分片,不同节点上的数据属于不同分片
  • partitioner:分片切割规则,根据规则将数据发往不同分区
  • dependencies:RDD依赖,RDD每种数据形态都依赖上一种形态
  • compute:转换函数,RDD的转换方法

编程模型与延迟计算

每个RDD都代表着一种分布式数据形态

RDD 到 RDD 之间的转换,本质上是数据形态上的转换(Transformations)

RDD算子的一个共性:RDD转换

在 RDD 的编程模型中,一共有两种算子,Transformations 类算子Actions 类算子。开发者需要使用 Transformations 类算子,定义并描述数据形态的转换过程,然后调用 Actions 类算子,将计算结果收集起来、或是物化到磁盘。

在这样的编程模型下,Spark 在运行时的计算被划分为两个环节。

  1. 基于不同数据形态之间的转换,构建计算流图(DAG,Directed Acyclic Graph)
  2. 通过 Actions 类算子,以回溯的方式去触发执行这个计算流图。

调用的各类 Transformations 算子,并不立即执行计算,调用 Actions 算子时,之前调用的Transformations算子才会执行,这就叫作“延迟计算”(Lazy Evaluation)。

所以构建计算流图的过程不会耗费很多时间,Actions算子触发执行计算流程的过程才最耗时,这也是延迟计算的一个特点

自测题:

  1. 讲一下RDD
  2. 延迟计算是什么意思

03 RDD常用算子(一):RDD内部的数据转换

创建RDD

在Spark中,创建RDD的典型方式有两种

  • 通过SparkContext.parallelize在内部数据之上创建RDD

    import org.apache.spark.rdd.RDD
    val words: Array[String] = Array("Spark", "is", "cool")
    val rdd: RDD[String] = sc.parallelize(words)
    
  • 通过SparkContext.textFile等API从外部数据创建RDD

这里的内部、外部是相对应用程序来说的。开发者在 Spark 应用中自定义的各类数据结构,如数组、列表、映射等,都属于“内部数据”;而“外部数据”指代的,是 Spark 系统之外的所有数据形式,如本地文件系统或是分布式文件系统中的数据,再比如来自其他大数据组件(Hive、Hbase、RDBMS 等)的数据。

map:以元素为粒度的数据转换

map算子的用法:**给定映射函数 f,map(f) 以元素为粒度对 RDD 做数据转换。**其中 f 可以是带有明确签名的带名函数,也可以是匿名函数,它的形参类型必须与 RDD 的元素类型保持一致,而输出类型则任由开发者自行决定。

正因为map是以元素为粒度做数据转换的,在某些计算场景下,这个特点会严重影响执行效率

例:对每个元素的哈希值计数

import java.security.MessageDigestval kvRDD: RDD[(String, Int)] = cleanWordRDD.map{ word =>// 获取MD5对象实例val md5 = MessageDigest.getInstance("MD5")// 使用MD5计算哈希值val hash = md5.digest(word.getBytes).mkString// 返回哈希值与数字1的Pair(hash, 1)
}

要对每个元素创建一次MD5对象实例,严重影响效率

这种情况就要用到mapPartitions

mapPartitions:以数据分区为粒度的数据转换

还是同样的例子:对每个元素的哈希值计数

import java.security.MessageDigest val kvRDD: RDD[(String, Int)] = cleanWordRDD.mapPartitions( partition => {// 注意!这里是以数据分区为粒度,获取MD5对象实例val md5 = MessageDigest.getInstance("MD5")val newPartition = partition.map( word => {// 在处理每一条数据记录的时候,可以复用同一个Partition内的MD5对象(md5.digest(word.getBytes()).mkString,1)})newPartition
})

同一个分区的数据,可以共享同一个MD5对象

flatMap:从元素到集合、再从集合到元素

flatMap的映射过程在逻辑上分为两步:

  1. 以元素为单位,创建集合
  2. 去掉集合“外包装”,提取集合元素

可以结合01 wordcount中的分词部分理解

例子:统计相邻单词共同出现的次数

如:Spark is cool --> (Spark is, 1) (is cool, 1)

import org.apache.spark.{SparkConf, SparkContext}object AdjacentWordsCount {def main(args: Array[String]): Unit = {val rootPath = "xxxx"val file = s"$rootPath\\wikiOfSpark.txt"val conf = new SparkConf().setMaster("local[*]").setAppName("AdjacentWordsCount")val lineRDD = new SparkContext(conf).textFile(file)val adjacentRDD = lineRDD.flatMap(line => {val words = line.split(" ")for(i <- 0 until words.length - 1) yield words(i) + "-" + words(i + 1)})val kvRDD = adjacentRDD.map((_,1))val resRDD = kvRDD.reduceByKey(_ + _)println(resRDD.map { case (k, v) => (v, k) }.sortByKey(false).take(5).mkString("Array(", ", ", ")"))}
}

可以用一些正则表达式

//按空格、标点符号、数学符号、数字分割
val wordRDD: RDD[String] = lineRDD.flatMap(_.split("[ ]|\\pP|\\pS|\\pN"))

filter:过滤RDD

filter,顾名思义,这个算子的作用,是对 RDD 进行过滤。就像是 map 算子依赖其映射函数一样,filter 算子也需要借助一个判定函数 f,才能实现对 RDD 的过滤转换。

所谓判定函数,它指的是类型为(RDD 元素类型) => (Boolean)的函数。可以看到,判定函数 f 的形参类型,必须与 RDD 的元素类型保持一致,而 f 的返回结果,只能是 True 或者 False。在任何一个 RDD 之上调用 filter(f),其作用是保留 RDD 中满足 f(也就是 f 返回 True)的数据元素,而过滤掉不满足 f(也就是 f 返回 False)的数据元素。

还是上面的例子,这次要把像“Spark-&”之类的词对过滤掉

import org.apache.spark.{SparkConf, SparkContext}object AdjacentWordsCount {def main(args: Array[String]): Unit = {val rootPath = "xxxx"val file = s"$rootPath\\wikiOfSpark.txt"val conf = new SparkConf().setMaster("local[*]").setAppName("AdjacentWordsCount")val lineRDD = new SparkContext(conf).textFile(file)val adjacentRDD = lineRDD.flatMap(line => {val words = line.split(" ")for(i <- 0 until words.length - 1) yield words(i) + "-" + words(i + 1)})//================filter====================val list = List("","!","@","#","$","%","^","&","*")def f(s: String) = {val words = s.split("-")val b1 = list.contains(words(0))val b2 = list.contains(words(1))!b1 && !b2}val kvRDD = adjacentRDD.filter(f).map((_,1))//==========================================val resRDD = kvRDD.reduceByKey(_ + _)println(resRDD.map { case (k, v) => (v, k) }.sortByKey(false).take(5).mkString("Array(", ", ", ")"))}
}

自测题:

  1. SparkContext.parallelize和SparkContext.textFile的区别
  2. map和mapPartitions的区别
  3. 四个算子的传入的形参各有什么特点

04 进程模型与分布式部署:分布式计算是怎么回事?

分布式计算的精髓,在于如何把抽象的计算流图,转化为实实在在的分布式计算任务,然后以并行计算的方式交付执行

Spark实现分布式计算的两个关键要素:进程模型分布式部署

进程模型

Dirver与Executor

任何一个Spark应用程序的入口,都是带有SparkSession的main函数,在Spark分布式计算环境中,有且仅有一个JVM进程运行这样的main函数,称为Dirver

Dirver构建计算流图,然后将计算流图转化为分布式任务,并把分布式任务分发给集群中的执行进程Executor交付运行

Driver 除了分发任务之外,还需要定期与每个 Executor 进行沟通,及时获取他们的工作进展,从而协调整体的执行进度。

在Spark的Dirver进程中,DAGSchedulerTaskSchedulerSchedulerBackend依次完成分布式任务调度的三个核心步骤:

  1. DAGScheduler:根据用户代码构建计算流图
  2. TaskScheduler:根据计算流图拆解出分布式任务
  3. SchedulerBackend:将分布式任务分发到Executors中去

接收到任务之后,Executors 调用内部线程池,结合事先分配好的数据分片,并发地执行任务代码。对于一个完整的 RDD,每个 Executors 负责处理这个 RDD 的一个数据分片子集。

master

Spark shell命令中的master用于指定部署模式

spark-shell --master local[*]

分布式任务(以word count为例)

步骤:

  1. word count中用到了textFile、flatMap、filter、map、reduceByKey等算子,其中textFile、flatMap、filter和map都可以在单个Executor中独立完成,所以Dirver会先将这几个算子捏合成一个任务,打包发给每个Executor

  2. 每个Executor收到这个任务后,再将任务拆解成原本的四个步骤,分别对自己负责的数据分片按顺序执行这些步骤

  3. 每个Executor执行完任务后会向Dirver汇报自己的工作进展

  4. 在执行reduceByKey这个任务之前,会进行shuffle操作

    因为reduceByKey需要按照Key值将Value值进行统计计数,而相同的Key值很可能分布在不同的数据分片中,shuffle的过程就是把相同的Key聚合到同一个数据分片的过程

  5. 执行完shuffle操作,Driver会分发reduceByKey的任务,Executors会把最终的计算结果统一返回给Driver

分布式环境部署

Spark的两种部署模式:本地部署 和 分布式部署

Spark支持多种分布式部署模式:Standalone或YARN等

Standalone模式

Standalone 在资源调度层面,采用了一主多从的主从架构,把计算节点的角色分为 Master 和 Worker。其中,Master 有且只有一个,而 Worker 可以有一到多个。所有 Worker 节点周期性地向 Master 汇报本节点可用资源状态,Master 负责汇总、变更、管理集群中的可用资源,并对 Spark 应用程序中 Driver 的资源请求作出响应。

Standalone 在计算层面,就是用上述的Driver和Executors进程模型进行任务的执行

摘自评论区:

提问:

老师好!讲解很精彩! 为了帮助大家理解,还是要说说 standalone 模式下的 主从选举过程,三个节点怎么互相找到并选出主从。另外,standalone 模式下的 master 和 worker,与前面进程模型里说的 Driver 和 executor,二组之间的对应关系,也要讲讲。只要能简单串起来就可以了。让大家获得一个即便简单、但却完成的理解模型。

作者回复:

感谢老弟,问题提得很好~

先说说选主,这个其实比较简单,Standalone部署模式下,Master与Worker角色,这个是我们通过配置文件,事先配置好的,所以说,哪台是Master,哪台是Worker,这个配置文件里面都有。在Standalone部署下,先启动Master,然后启动Worker,由于配置中有Master的连接地址,所以Worker启动的时候,会自动去连接Master,然后双方建立心跳机制,随后集群进入ready状态。

接下来说Master、Worker与Driver、Executors的关系。首先,这4个“家伙”,都是JVM进程。不过呢,他们的定位和角色,是完全不一样的。Master、Worker用来做资源的调度与分配,你可以这样理解,这两个家伙,只负责维护集群中可用硬件资源的状态。换句话说,Worker记录着每个计算节点可用CPU cores、可用内存,等等。而Master从Worker收集并汇总所有集群中节点的可用计算资源。

Driver和Executors的角色,那就纯是Spark应用级别的进程了。这个咱们课程有介绍,就不赘述了。Driver、Executors的计算资源,全部来自于Master的调度。一般来说,Driver会占用Master所在节点的资源;而Executors一般占用Worker所在节点的计算资源。一旦Driver、Executors从Master、Worker那里申请到资源之后,Driver、Executors就不再“鸟”Master和Worker了,因为资源已经到手了,后续就是任务调度的范畴。任务调度课程中也有详细的介绍,老弟可以关注下~ 大概其就是这么些关系,不知道对老弟是否有所帮助~

Standalone分布式部署快速入门

  1. 配置ssh免密登陆

    将master的公钥追加到workers的authorized_keys文件

  2. JAVA和Spark环境搭建

    安装JAVA和Spark,配置环境变量

  3. 配置ip地址与域名对

    vim /etc/hosts<node1 ip地址> node1
    <node2 ip地址> node2
    <node3 ip地址> node3
    
  4. 启动Master和Workers

    修改Master的spark-defaults.conf配置文件,设置Master URL

    spark.master  spark://node1:7077
    

    启动Master和Workers

    #master
    sbin/start-master.sh
    #workers
    sbin/start-worker.sh node1:7077
    
  5. 运行spark自带的demo

    MASTER=spark://node1:7077 $SPARK_HOME/bin/run-example org.apache.spark.examples.SparkPi
    

自测题:

  1. 分布式任务执行的步骤
  2. Worker和Master、Driver和Executor的联系与区别

05 调度系统:如何把握分布式计算的精髓

分布式计算的精髓,在于如何把抽象的计算图,转化为实实在在的分布式计算任务,然后以并行计算的方式交付执行

步骤序号 调度系统关键步骤 所在进程 核心组件
1 将DAG(计算流图)拆分为不同的运行阶段,即Stages;根据Stages创建分布式任务Tasks和任务组TaskSets Driver DAGScheduler
2 获取集群内可用计算资源 Driver SchedulerBackend
3 按照调度规则决定任务优先级,完成任务调度 Driver TaskScheduler
4 依序将分布式任务分发到Executors Driver SchedulerBackend
5 并发执行接收到的分布式计算任务 Executors ExecutorBackend

SchedulerBackend可以理解为资源管理器(Standalone、YARN等),用于分配资源,分发任务

简而言之,DAGScheduler 手里有“活儿”,SchedulerBackend 手里有“人力”,TaskScheduler 的核心职能,就是把合适的“活儿”派发到合适的“人”的手里。由此可见,TaskScheduler 承担的是承上启下、上通下达的关键角色

Spark 调度系统的核心思想,是“数据不动、代码动”

任务调度分为如下 5 个步骤:

  1. DAGScheduler 以 Shuffle 为边界,将开发者设计的计算图 DAG 拆分为多个执行阶段 Stages,然后为每个 Stage 创建任务集 TaskSet。
  2. SchedulerBackend 通过与 Executors 中的 ExecutorBackend 的交互来实时地获取集群中可用的计算资源,并将这些信息记录到 ExecutorDataMap 数据结构。
  3. 与此同时,SchedulerBackend 根据 ExecutorDataMap 中可用资源创建 WorkerOffer,以 WorkerOffer 为粒度提供计算资源。
  4. 对于给定 WorkerOffer,TaskScheduler 结合 TaskSet 中任务的本地性倾向,按照 PROCESS_LOCAL、NODE_LOCAL、RACK_LOCAL 和 ANY 的顺序,依次对 TaskSet 中的任务进行遍历,优先调度本地性倾向要求苛刻的 Task。
  5. 被选中的 Task 由 TaskScheduler 传递给 SchedulerBackend,再由 SchedulerBackend 分发到 Executors 中的 ExecutorBackend。Executors 接收到 Task 之后,即调用本地线程池来执行分布式任务。

自测题:

  1. 说一下DAGScheduler、SchedulerBackend、TaskScheduler、ExecutorBackend
  2. 任务调度的步骤

06 Shuffle管理:为什么shuffle是性能瓶颈

Shuffle 的本意是扑克的“洗牌”,在分布式计算场景中,它被引申为集群范围内跨节点、跨进程的数据分发

Shuffle 的过程中,分布式数据集在集群内的分发,会引入大量的磁盘 I/O 与网络 I/O。在 DAG 的计算链条中,Shuffle 环节的执行性能是最差的。

#mermaid-svg-v0fd0kxNjOvEXIUn {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-v0fd0kxNjOvEXIUn .error-icon{fill:#552222;}#mermaid-svg-v0fd0kxNjOvEXIUn .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-v0fd0kxNjOvEXIUn .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-v0fd0kxNjOvEXIUn .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-v0fd0kxNjOvEXIUn .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-v0fd0kxNjOvEXIUn .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-v0fd0kxNjOvEXIUn .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-v0fd0kxNjOvEXIUn .marker{fill:#333333;stroke:#333333;}#mermaid-svg-v0fd0kxNjOvEXIUn .marker.cross{stroke:#333333;}#mermaid-svg-v0fd0kxNjOvEXIUn svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-v0fd0kxNjOvEXIUn .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-v0fd0kxNjOvEXIUn .cluster-label text{fill:#333;}#mermaid-svg-v0fd0kxNjOvEXIUn .cluster-label span{color:#333;}#mermaid-svg-v0fd0kxNjOvEXIUn .label text,#mermaid-svg-v0fd0kxNjOvEXIUn span{fill:#333;color:#333;}#mermaid-svg-v0fd0kxNjOvEXIUn .node rect,#mermaid-svg-v0fd0kxNjOvEXIUn .node circle,#mermaid-svg-v0fd0kxNjOvEXIUn .node ellipse,#mermaid-svg-v0fd0kxNjOvEXIUn .node polygon,#mermaid-svg-v0fd0kxNjOvEXIUn .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-v0fd0kxNjOvEXIUn .node .label{text-align:center;}#mermaid-svg-v0fd0kxNjOvEXIUn .node.clickable{cursor:pointer;}#mermaid-svg-v0fd0kxNjOvEXIUn .arrowheadPath{fill:#333333;}#mermaid-svg-v0fd0kxNjOvEXIUn .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-v0fd0kxNjOvEXIUn .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-v0fd0kxNjOvEXIUn .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-v0fd0kxNjOvEXIUn .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-v0fd0kxNjOvEXIUn .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-v0fd0kxNjOvEXIUn .cluster text{fill:#333;}#mermaid-svg-v0fd0kxNjOvEXIUn .cluster span{color:#333;}#mermaid-svg-v0fd0kxNjOvEXIUn div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-v0fd0kxNjOvEXIUn :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;}

map
shuffle
reduce

shuffle是map和reduce的中间阶段,将不同的数据分发到不同的节点

shuffle中间文件

Map 阶段与 Reduce 阶段,通过生产消费 Shuffle 中间文件的方式,来完成集群范围内的数据交换

Shuffle 中间文件是统称,它包含两类文件,一个是记录(Key,Value)键值对的 data 文件,另一个是记录键值对所属 Reduce Task 的 index 文件。计算图 DAG 中的 Map 阶段与 Reduce 阶段,正是通过中间文件来完成数据的交换。

Shuffle 中间文件的生成过程,分为如下几个步骤:

  1. 对于数据分区中的数据记录,逐一计算(哈希取模)其目标分区,然后填充内存数据结构;
  2. 当数据结构填满后,如果分区中还有未处理的数据记录,就对结构中的数据记录按(目标分区 ID,Key)排序,将所有数据溢出到临时文件,同时清空数据结构;
  3. 重复前 2 个步骤,直到分区中所有的数据记录都被处理为止;
  4. 对所有临时文件和内存数据结构中剩余的数据记录做归并排序,生成数据文件(data)和索引文件(index)。

最后,在 Reduce 阶段,Reduce Task 通过 index 文件来“定位”属于自己的数据内容,并通过网络从不同节点的 data 文件中下载属于自己的数据记录。

自测题:

  1. 什么是shuffle中间文件
  2. shuffle中间文件的生成过程

07 RDD常用算子(二):Spark如何实现数据聚合

  • 算子类型:Transformations
  • 适用范围:Paired RDD(kvRDD)
  • 算子用途:RDD内数据聚合
  • 算子集合:groupByKey、sortByKey、reduceByKey、aggregateByKey
  • 特点:会引入繁重的Shuffle计算

groupByKey:分组收集

RDD[(Key, Value)] —> RDD[(Key, Value集合)]

使用场景较少

reduceByKey:分组聚合

reduceByKey函数需要传入一个聚合函数f

需要强调的是,给定 RDD[(Key 类型,Value 类型)],聚合函数 f 的类型,必须是(Value 类型,Value 类型) => (Value 类型)。换句话说,函数 f 的形参,必须是两个数值,且数值的类型必须与 Value 的类型相同,而 f 的返回值,也必须是 Value 类型的数值。

某个kvRDD.reduceByKey((Value 类型,Value 类型) => (Value 类型))

练习:把 Word Count 的计算逻辑,改为随机赋值、提取同一个 Key 的最大值。也就是在 kvRDD 的生成过程中,我们不再使用映射函数 word => (word, 1),而是改为 word => (word, 随机数),然后再使用 reduceByKey 算子来计算同一个 word 当中最大的那个随机数。

import scala.util.Random._val kvRDD = cleanWordRDD.map((_, nextInt(100)))val wordCounts = kvRDD.reduceByKey(math.max)
  • Map端聚合:在每个节点内部先聚合
  • Reduce端聚合:数据经由网络分发之后,在 Reduce 阶段完成的聚合

**reduceByKey 算子的局限性,在于其 Map 阶段与 Reduce 阶段的计算逻辑必须保持一致,这个计算逻辑统一由聚合函数 f 定义。**当一种计算场景需要在两个阶段执行不同计算逻辑的时候,reduceByKey 就爱莫能助了。

aggregateByKey:更加灵活的聚合算子

aggregateByKey算子需要三个参数,分别对应初始值、Map端聚合函数和Reduce端聚合函数,可以实现在两个阶段执行不同的计算逻辑

就这 3 个参数来说,比较伤脑筋的,是它们之间的类型需要保持一致,具体来说:

  • 初始值类型,必须与 f2 的结果类型保持一致;
  • f1 的形参类型,必须与 Paired RDD 的 Value 类型保持一致;
  • f2 的形参类型,必须与 f1 的结果类型保持一致。

练习:map端求和,reduce端求最值

val wordCounts = kvRDD.aggregateByKey(0)(_ + _, math.max)

与 reduceByKey 一样,aggregateByKey 也可以通过 Map 端的初步聚合来大幅削减数据量,在降低磁盘与网络开销的同时,提升 Shuffle 环节的执行性能。

摘自评论区:

reduceByKey和aggregateByKey的联系和区别:

reduceByKey和aggregateByKey底层实现完全相同,都是combineByKeyWithClassTag,只不过reduceByKey调用 combineByKeyWithClassTag的入参mergeValue和mergeCombiners是相等的,aggregateByKey是用户指定可以不等的,也就是说reduceByKey是一种特殊的aggregateByKey。

sortByKey:排序

//按照Key升序排序
rdd.sortByKey()
rdd.sortByKey(true)
//降序
rdd.sortByKey(false)

08 内存管理:Spark如何使用内存

Spark内存区域划分
Execution Memory <—相互转化—>Storage Memory
User Memory
Reserved Memory (300MB)
  • Reserved Memory:固定300MB,是Spark预留的、用来存储各种Spark内部对象的内存区域
  • User Memory:用于存储开发者自定义的数据结构
  • Execution Memory:用来执行分布式任务
  • Storage Memory:缓存分布式数据集

RDD Cache

当同一个 RDD 被引用多次时,就可以考虑对其进行 Cache,从而提升作业的执行效率。

例如:调用两次wordCounts(RDD)

// 按照单词做分组计数
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)wordCounts.cache// 使用cache算子告知Spark对wordCounts加缓存
wordCounts.count// 触发wordCounts的计算,并将wordCounts缓存到内存// 打印词频最高的5个词汇
wordCounts.map{case (k, v) => (v, k)}.sortByKey(false).take(5)// 将分组计数结果落盘到文件
val targetPath: String = _
wordCounts.saveAsTextFile(targetPath)

下面两句等价

wordCounts.cache
wordCounts.persist(MEMORY_ONLY)

persist()括号里填写存储级别

Spark支持丰富的存储级别,每一种存储级别都包含3个最基本的要素

  • 存储介质:数据缓存到内存还是磁盘,或是两者都有
  • 存储形式:数据内容是对象值还是字节数组,带SER字样的表示以序列化方式存储,不带SER则表示采用对象值
  • 副本数量:存储级别名字最后的数字代表拷贝数量,没有数字默认为1份副本

<极客时间:零基础入门Spark> 学习笔记(持续更新中...)相关推荐

  1. Python学习(1)——小甲鱼零基础入门python学习笔记(更新-ing)

    001讲 愉快的开始 1. Python的安装 此电脑-->右击-->检查电脑是32位还是64位 第二步:访问官网:http://www.python.org-->Downloads ...

  2. AI Studio 飞桨 零基础入门深度学习笔记6.3-手写数字识别之数据处理

    AI Studio 飞桨 零基础入门深度学习笔记6.3-手写数字识别之数据处理) 概述 前提条件 读入数据并划分数据集 扩展阅读:为什么学术界的模型总在不断精进呢? 训练样本乱序.生成批次数据 校验数 ...

  3. AI Studio 飞桨 零基础入门深度学习笔记1-深度学习的定义

    AI Studio 飞桨 零基础入门深度学习-笔记 人工智能.机器学习.深度学习的关系 机器学习 机器学习的实现 机器学习的方法论 案例:牛顿第二定律 确定模型参数 模型结构介绍 深度学习 神经网络的 ...

  4. AI Studio 飞桨 零基础入门深度学习笔记2-基于Python编写完成房价预测任务的神经网络模型

    AI Studio 飞桨 零基础入门深度学习笔记2-基于Python编写完成房价预测任务的神经网络模型 波士顿房价预测任务 线性回归模型 线性回归模型的神经网络结构 构建波士顿房价预测任务的神经网络模 ...

  5. AI Studio 飞桨 零基础入门深度学习笔记4-飞桨开源深度学习平台介绍

    AI Studio 飞桨 零基础入门深度学习笔记4-飞桨开源深度学习平台介绍 深度学习框架 深度学习框架优势 深度学习框架设计思路 飞桨开源深度学习平台 飞桨开源深度学习平台全景 框架和全流程工具 模 ...

  6. JS逆向学习笔记 - 持续更新中

    JS逆向学习笔记 寻找深圳爬虫工作,微信:cjh-18888 文章目录 JS逆向学习笔记 一. JS Hook 1. JS HOOK 原理和作用 原理:替换原来的方法. (好像写了句废话) 作用: 可 ...

  7. typescript-----javascript的超集,typescript学习笔记持续更新中......

    Typescript,冲! Typescript 不是一门全新的语言,Typescript是 JavaScript 的超集,它对 JavaScript进行了一些规范和补充.使代码更加严谨. 一个特别好 ...

  8. 专升本 计算机 公共课学习笔记(持续更新中...)

    计算机公共课学习笔记 第一章 计算机基础知识(30分) 1.计算机概述 计算机(Computer)的起源与发展 计算机(Computer)也称"电脑",是一种具有计算功能.记忆功能 ...

  9. 极客时间零基础学python

    文章目录 01 Python语言的特点 02 Python的发展历程 03 Python的安装 04 Python的程序书写规则 05 基础数据类型 06 变量的定义和常用操作 07 序列的概念 08 ...

  10. 极客时间MySQL实战45讲学习笔记

    零:基础 第一讲:基础架构:一条SQL查询语句是如何执行的? MySQL的基本架构示意图 1.MySQL基础架构 大体来说,MySQL可以分为Server层和存储引擎层两部分. Server层包括连接 ...

最新文章

  1. GaussianView5对分子结构.gjf文件的解读
  2. 江苏省高中计算机课程标准,教育部普通高中信息技术课程标准
  3. python oracle数据库开发_python连接Oracle数据库
  4. go get 命令提示没有权限问题解决
  5. 【CyberSecurityLearning 18】ACL及实验演示
  6. 《剑指offer》第四十三题(从1到n整数中1出现的次数)
  7. python 百度ai批量识别_Python基于百度AI的文字识别的示例
  8. objectC 数据类型转换
  9. 学习android 画板源代码,Android实现画画板案例
  10. HCIE-RS面试--MAC地址漂移及应对
  11. 【图像压缩】基于余弦变换及霍夫曼编码实现jpeg压缩和解压附matlab代码
  12. 数据可视化-制作交易收盘价
  13. python定义变量不赋值_python定义变量
  14. java 中介模式_java设计模式-中介者模式
  15. 【互联网大厂机试真题 - 华为】九宫格
  16. 如何搭建一个自己图床网站
  17. Android activity进出动画,类似于左右拉窗帘效果
  18. OpenStack Trove1
  19. 主机连接服务器的过程
  20. 【Matlab路径规划】蚁群算法机器人大规模栅格地图最短路径规划【含源码 1860期】

热门文章

  1. 计算机JAVA相关说课稿_七年级信息技术Windows资源管理器说课稿
  2. Python爬虫一键下载yy全站短视频详细步骤(附源码)
  3. 传播智客J2EE学习线路图
  4. lvgl 显示图片示例
  5. html简单网页编写3
  6. 测试Hpa自动扩缩容
  7. 英国Pickering公司加入了OpenTAP 自动化测试开源软件合作伙伴计划
  8. f803配置_中兴 console口 调试线 配置线 串口线【产品介绍】
  9. LED显示屏上的毛毛虫的形成原因
  10. poj_1974,最长回文字串manacher