文章目录

  • 第1章 Spark 概述
    • 1.1 Spark 是什么?
    • 1.2 Spark and Hadoop
    • 1.3 Spark or Hadoop
    • 迭代计算 (根本差异 )
  • 第2章 Spark 快速上手
    • 2.1 增加 Scala 插件
    • 2.2 Word Count 案例
  • 第三章 Spark 运行环境
    • 3.1 Local 本地模式
      • 3.1.1 启动 Local 环境
      • 3.1.2 命令行工具
      • 3.1.3 退出本地模式
    • 3.2 Standalone ( 独立部署 ) 模式
    • 3.3 Yarn 模式
      • 启动HDFS 和YARN 集群
      • 提交应用
      • 3.4 部署模式对比
      • 3.5端口号
  • 第4章 Spark 运行架构
    • 4.1 运行架构
    • 4.2 核心组件
      • 4.2.1 Driver ( 驱动器节点 )
      • 4.2.2 Executor ( 执行器节点 )
      • 4.2.3 Master & Worker
      • 4.2.4 ApplicationMaster
    • 4.3 核心概念
      • 4.3.1 Executor 与 Core
      • 4.3.2 并行度(Parallelism)
  • 第5章 Spark 核心编程
    • 5.1 RDD
      • 5.1.1 什么是 RDD
      • 5.1.2 执行原理
        • 5.1.2.1 启动 Yarn 集群环境
        • 5.1.2.2 Spark 通过申请资源创建调度节点和计算节点
        • 5.1.2.3 Spark 框架根据需求将计算逻辑根据分区划分成不同的任务
        • 5.1.2.4 调度节点将任务根据计算节点状态发送到对应的计算节点进行计算
      • 5.1.4 基础编程
        • 5.1.4.1 RDD 创建
        • 5.1.4.2 RDD 并行度与分区
        • 5.1.4.3 RDD 转换算子
          • 5.1.4.3.1转换算子-单Value类型
          • 5.1.4.3.2转换算子-双 Value 类型
          • 5.1.4.3.2转换算子-Key - Value 类型
        • 5.1.4.4 RDD 行动算子 会触发返回结果
        • 5.1.4.5 RDD 序列化
        • 5.1.4.6 RDD 依赖关系
        • 5.1.4.7 RDD 持久化
        • 5.1.4.8 RDD 分区器(案例,自定义分区器)
        • 5.1.4.9 RDD 文件读取与保存
    • 5.2 累加器
      • 5.2.1 系统累加器
      • 5.2.2 自定义累加器
    • 5.3 广播变量
      • 5.3.1 实现原理

第1章 Spark 概述

1.1 Spark 是什么?

  Spark 是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。

1.2 Spark and Hadoop

  在之前的学习中,Hadoop 的 MapReduce 是大家广为熟知的计算框架,那为什么咱们还
要学习新的计算框架 Spark 呢,这里就不得不提到 Spark 和 Hadoop 的关系。
  Hadoop 是由 java 语言编写的,在分布式服务器集群上存储海量数据并运行分布式
分析应用的开源框架.
  作为 Hadoop 分布式文件系统,HDFS 处于 Hadoop 生态圈的最下层,存储着所有
的 数 据 , 支 持 着 Hadoop 的 所 有 服 务 。 它 的 理 论 基 础 源 于 Google 的
TheGoogleFileSystem 这篇论文,它是 GFS 的开源实现。
  MapReduce 是一种编程模型,Hadoop 根据 Google 的 MapReduce 论文将其实现,
作为 Hadoop 的分布式计算模型,是 Hadoop 的核心。基于这个框架,分布式并行程序的编写变得异常简单。综合了 HDFS 的分布式存储和 MapReduce 的分布式计算,Hadoop 在处理海量数据时,性能横向扩展变得非常容易。
   HBase 是对 Google 的 Bigtable 的开源实现,但又和 Bigtable 存在许多不同之处。
HBase 是一个基于 HDFS 的分布式数据库,擅长实时地随机读/写超大规模数据集。
它也是 Hadoop 非常重要的组件。


   Spark 是一种由 Scala 语言开发的快速、通用、可扩展的大数据分析引擎
   Spark Core 中提供了 Spark 最基础与最核心的功能
   Spark SQL 是 Spark 用来操作结构化数据的组件。通过 Spark SQL,用户可以使用SQL 或者 Apache Hive 版本的 SQL 方言(HQL)来查询数据。
 &emspSpark Streaming 是 Spark 平台上针对实时数据进行流式计算的组件,提供了丰富的
处理数据流的 API。

1.3 Spark or Hadoop

  Hadoop 的 MR 框架和 Spark 框架都是数据处理框架,那么我们在使用时如何选择呢?
Hadoop 一次性数据计算

Spark 一次性数据计算

迭代计算 (根本差异 )

  Spark 和Hadoop 的根本差异是多个作业之间的数据通信问题 : Spark 多个作业之间数据
通信是基于内存,而 Hadoop 是基于磁盘。
  Spark Task 的启动时间快。Spark 采用 fork 线程的方式,而 Hadoop 采用创建新的进程
的方式。
  Spark 只有在 shuffle 的时候将数据写入磁盘,而 Hadoop 中多个 MR 作业之间的数据交
互都要依赖于磁盘交互。
  Spark 的缓存机制比 HDFS 的缓存机制高效。

  经过上面的比较,我们可以看出在绝大多数的数据计算场景中,Spark 确实会比 MapReduce更有优势。但是 Spark 是基于内存的,所以在实际的生产环境中,由于内存的限制,可能会由于内存资源不够导致 Job 执行失败,此时,MapReduce 其实是一个更好的选择,所以 Spark并不能完全替代 MR。
Spark Core
  Spark Core 中提供了 Spark 最基础与最核心的功能,Spark 其他的功能如:Spark SQL,Spark Streaming,GraphX, MLlib 都是在 Spark Core 的基础上进行扩展的

Spark SQL
   Spark SQL 是 Spark 用来操作结构化数据的组件。通过 Spark SQL,用户可以使用 SQL或者 Apache Hive 版本的 SQL 方言(HQL)来查询数据。

Spark Streaming
   Spark Streaming 是 Spark 平台上针对实时数据进行流式计算的组件,提供了丰富的处理数据流的 API。

   Spark MLlib,Spark GraphX ,此处不讲

第2章 Spark 快速上手

2.1 增加 Scala 插件



2.2 Word Count 案例

   为了能直观地感受 Spark 框架的效果,接下来我们实现一个大数据学科中最常见的教学案例 WordCount

// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
// 创建 Spark 上下文环境对象(连接对象)
val sc : SparkContext = new SparkContext(sparkConf)
// 读取文件数据
val fileRDD: RDD[String] = sc.textFile("input/word.txt")
// 将文件中的数据进行分词
val wordRDD: RDD[String] = fileRDD.flatMap( _.split(" ") )
// 转换数据结构 word => (word, 1)
val word2OneRDD: RDD[(String, Int)] = wordRDD.map((_,1))
// 将转换结构后的数据按照相同的单词进行分组聚合
val word2CountRDD: RDD[(String, Int)] = word2OneRDD.reduceByKey(_+_)
// 将数据聚合结果采集到内存中
val word2Count: Array[(String, Int)] = word2CountRDD.collect()
// 打印结果
word2Count.foreach(println)
//关闭 Spark 连接
sc.stop()

第三章 Spark 运行环境

3.1 Local 本地模式

3.1.1 启动 Local 环境

bin/spark-shell

启动成功后,可以输入网址进行 Web UI 监控页面访问

http://虚拟机地址:4040

3.1.2 命令行工具

   在解压缩文件夹下的 data 目录中,添加 word.txt 文件。在命令行工具中执行如下代码指
令(和 IDEA 中代码简化版一致)

sc.textFile("data/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect

3.1.3 退出本地模式

:quit 或者CTRL +C

3.2 Standalone ( 独立部署 ) 模式

   是一种经典 master-slave 模式。

集群规划:

   步骤
1.解压缩 文件
2.修改配置文件
   1) 进入解压缩后路径的 conf 目录,修改 slaves.template 文件名为 slaves
   2) 修改 slaves 文件,添加 work 节点
   3) 修改 spark-env.sh.template 文件名为 spark-env.sh
   4) 修改 spark-env.sh 文件,添加 JAVA_HOME 环境变量和集群对应的 master 节点

export JAVA_HOME=/opt/module/jdk1.8.0_144
SPARK_MASTER_HOST=linux1
SPARK_MASTER_PORT=7077

注意:7077 端口,相当于 hadoop3 内部通信的 8020 端口,此处的端口需要确认自己的 Hadoop配置
   5) 分发 spark-standalone 目录
   6) 启动集群
  7)查看 Master 资源监控 Web UI 界面: http://linux1:8080

3.3 Yarn 模式

   Spark客户端直接连接Yarn;不需要额外构建Spark集群。有yarn-client和yarn-cluster两种模式,主要区别在于:Driver程序的运行节点。

  • yarn-client:Driver程序运行在客户端,适用于交互、调试,希望立即看到app的输出
  • yarn-cluster:Driver程序运行在由RM(ResourceManager)启动的AP(APPMaster)适用于生产环境

启动HDFS 和YARN 集群

提交应用

bin/spark-submit \
--class org.apache.spark.examples.SparkPi \   Spark 程序中包含主函数的类
--master yarn \   Spark 程序运行的模式(环境)
--deploy-mode cluster \    模式
./examples/jars/spark-examples_2.12-3.0.0.jar \
10


  Cluster 模式将用于监控和调度的 Driver 模块启动在 Yarn 集群资源中执行。一般应用于实际生产环境。
➢ 在 YARN Cluster 模式下,任务提交后会和 ResourceManager 通讯申请启动ApplicationMaster,
➢ 随后 ResourceManager 分配 container,在合适的 NodeManager 上启动 ApplicationMaster,此时的 ApplicationMaster 就是 Driver。
➢ Driver 启动后向 ResourceManager 申请 Executor 内存,ResourceManager 接到ApplicationMaster 的资源申请后会分配 container,然后在合适的 NodeManager 上启动Executor 进程
➢ Executor 进程启动后会向 Driver 反向注册,Executor 全部注册完成后 Driver 开始执行main 函数,
➢ 之后执行到 Action 算子时,触发一个 Job,并根据宽依赖开始划分 stage,每个 stage 生成对应的 TaskSet,之后将 task 分发到各个 Executor 上执行

  Client 模式将用于监控和调度的 Driver 模块在客户端执行,而不是在 Yarn 中,所以一般用于测试。
➢ Driver 在任务提交的本地机器上运行
➢ Driver 启动后会和 ResourceManager 通讯申请启动 ApplicationMaster
➢ ResourceManager 分配 container,在合适的 NodeManager 上启动 ApplicationMaster,负责向 ResourceManager 申请 Executor 内存
➢ ResourceManager 接到 ApplicationMaster 的资源申请后会分配 container,然后ApplicationMaster 在资源分配指定的 NodeManager 上启动 Executor 进程
➢ Executor 进程启动后会向 Driver 反向注册,Executor 全部注册完成后 Driver 开始执行main 函数
➢ 之后执行到 Action 算子时,触发一个 Job,并根据宽依赖开始划分 stage,每个 stage 生成对应的 TaskSet,之后将 task 分发到各个 Executor 上执行。


  • 在YARN-Cluster模式下,Spark Driver运行在AM(Application Master)中,它负责向YARN申请资源,并监督作业的运行状况。当用户提交了作业之后,就可以关掉Client,作业会继续在YARN上运行,所以YARN-Cluster模式不适合运行交互类型的作业。
  • 在YARN-Client模式下,AM仅仅向YARN请求Executor,Client会和请求得到的Container通信来调度它们工作,也就是说Client不能离开。
      总结起来就是集群模式的Spark Driver运行在AM中,而客户端模式的Spark Driver运行在客户端。所以,YARN-Cluster适用于生产环境,而YARN-Client适用于交互和调试,也就是希望快速地看到应用的输出信息。

3.4 部署模式对比

3.5端口号

  Spark 查看当前 Spark-shell 运行任务情况端口号:4040(计算)
  Spark Master 内部通信服务端口号:7077
  Standalone 模式下,Spark Master Web 端口号:8080(资源)
  Spark 历史服务器端口号:18080
  Hadoop YARN 任务运行情况查看端口号:8088

第4章 Spark 运行架构

4.1 运行架构

  从物理部署层面上来看,Spark主要分为两种类型的节点,Master节点和Worker节点:Master节点主要运行集群管理器的中心化部分,所承载的作用是分配Application到Worker节点,维护Worker节点,Driver,Application的状态。Worker节点负责具体的业务运行。从Spark程序运行的层面来看,Spark主要分为驱动器节点和执行器节点。

4.2 核心组件

由上图可以看出,对于 Spark 框架有两个核心组件:

4.2.1 Driver ( 驱动器节点 )

  Spark 驱动器节点,用于执行 Spark 任务中的 main 方法,负责实际代码的执行工作。
Driver 在 Spark 作业执行时主要负责:
  将用户程序转化为作业(job)
  在 Executor 之间调度任务(task)
  跟踪 Executor 的执行情况
  通过 UI 展示查询运行情况
   实际上,我们无法准确地描述 Driver 的定义,因为在整个的编程过程中没有看到任何有关
Driver 的字眼。所以简单理解,所谓的 Driver 就是驱使整个应用运行起来的程序,也称之为Driver 类。

4.2.2 Executor ( 执行器节点 )

  Spark Executor 是集群中工作节点(Worker)中的一个 JVM 进程,负责在 Spark 作业中运行具体任务(Task),任务彼此之间相互独立。Spark 应用启动时,Executor 节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。如果有 Executor 节点发生了故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他 Executor 节点上继续运行。
   Executor 有两个核心功能:
  负责运行组成 Spark 应用的任务,并将结果返回给驱动器进程
  它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD 是直接缓存在Executor进   程内的,因此任务可以在运行时充分利用缓存数据加速运算。

4.2.3 Master & Worker

  Spark 集群的独立部署环境中,不需要依赖其他的资源调度框架,自身就实现了资源调度的功能,所以环境中还有其他两个核心组件:Master 和 Worker,这里的 Master 是一个进程,主要负责资源的调度和分配,并进行集群的监控等职责,类似于 Yarn 环境中的 RM, 而Worker 呢,也是进程,一个 Worker 运行在集群中的一台服务器上,由 Master 分配资源对数据进行并行的处理和计算,类似于 Yarn 环境中 NM。

4.2.4 ApplicationMaster

  Hadoop 用户向 YARN 集群提交应用程序时,提交程序中应该包含 ApplicationMaster,用于向资源调度器申请执行任务的资源容器 Container,运行用户自己的程序任务 job,监控整个任务的执行,跟踪整个任务的状态,处理任务失败等异常情况。
  说的简单点就是,ResourceManager(资源)和 Driver(计算)之间的解耦合靠的就是ApplicationMaster。

4.3 核心概念

4.3.1 Executor 与 Core

  Spark Executor 是集群中运行在工作节点(Worker)中的一个 JVM 进程,是整个集群中的专门用于计算的节点。在提交应用中,可以提供参数指定计算节点的个数,以及对应的资源。这里的资源一般指的是工作节点 Executor 的内存大小和使用的虚拟 CPU 核(Core)数量。

4.3.2 并行度(Parallelism)

  在分布式计算框架中一般都是多个任务同时执行,由于任务分布在不同的计算节点进行计算,所以能够真正地实现多任务并行执行,记住,这里是并行,而不是并发。这里我们将整个集群并行执行任务的数量称之为并行度。那么一个作业到底并行度是多少呢?这个取决于框架的默认配置。应用程序也可以在运行过程中动态修改。

第5章 Spark 核心编程

  Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:
 ➢ RDD : 弹性分布式数据集
 ➢ 累加器: 分布式共享只写变量
 ➢ 广播变量:分布式共享只读变量

5.1 RDD

5.1.1 什么是 RDD

  RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

   RDD底层逻辑 是根据装饰者设计模型每个RDD之间嵌套 拓展其功能,和javaIO一样(结合2.2 WC案例)

只有Collect被调用 ,才是真正执行业务逻辑,前面都是扩展功能

  • 弹性
       存储的弹性:内存与磁盘的自动切换;
       Spark优先把数据放到内存中,如果内存放不下,就会放到磁盘里面,程序进行自动的存储切换。
       容错的弹性:数据丢失可以自动恢复;
       在RDD进行转换和动作的时候,会形成RDD的Lineage依赖链,当某一个RDD失效的时候,可以通过重新计算上游的RDD来重新生成丢失的RDD数据。
       计算的弹性:计算出错重试机制;
        1. Task如果失败会自动进行特定次数的重试
          RDD的计算任务如果运行失败,会自动进行任务的重新计算,默认次数是4次。
       2. Stage如果失败会自动进行特定次数的重试
         如果Job的某个Stage阶段计算失败,框架也会自动进行任务的重新计算,默认次数也是4 次。
       分片的弹性:可根据需要重新分片
        可以根据业务的特征,动态调整数据分片的个数,提升整体的应用执行效率。
       Checkpoint和Persist可主动或被动触发
       RDD可以通过Persist持久化将RDD缓存到内存或者磁盘,当再次用到该RDD时直接读取就行。也可以将RDD进行检查    点,检查点会将数据存储在HDFS中,该RDD的所有父RDD依赖都会被移除。
    数据调度弹性
       Spark把整个Job执行模型抽象为通用的有向无环图DAG,可以将多Stage的任务串联或并行执行,调度引擎自动处理Stage的失败以及Task的失败。
  • 分布式:数据存储在大数据集群不同节点上
  • 数据集:RDD封装了计算逻辑,并不保存数据
  • 数据抽象:RDD是一个抽象类,需要子类具体实现
  • 不可变:RDD封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,在新的RDD里面封装计算逻辑
  • 可分区、并行计算

5.1.2 执行原理

  从计算的角度来讲,数据处理过程中需要计算资源(内存 & CPU)和计算模型(逻辑)。执行时,需要将计算资源和计算模型进行协调和整合。
  Spark 框架在执行时,先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的计算任务。然后将任务发到已经分配资源的计算节点上, 按照指定的计算模型进行数据计算。最后得到计算结果。
  RDD 是 Spark 框架中用于数据处理的核心模型,接下来我们看看,在 Yarn 环境中,RDD的工作原理:

5.1.2.1 启动 Yarn 集群环境

5.1.2.2 Spark 通过申请资源创建调度节点和计算节点

5.1.2.3 Spark 框架根据需求将计算逻辑根据分区划分成不同的任务

5.1.2.4 调度节点将任务根据计算节点状态发送到对应的计算节点进行计算


  从以上流程可以看出 RDD 在整个流程中主要用于将逻辑进行封装,并生成 Task 发送给
Executor 节点执行计算,接下来我们就一起看看 Spark 框架中 RDD 是具体是如何进行数据处理的。

5.1.4 基础编程

5.1.4.1 RDD 创建

   在 Spark 中创建 RDD 的创建方式可以分为四种:
(1) 从集合(内存)中创建 RDD

val rdd1 = sparkContext.parallelize(List(1,2,3,4)
)
val rdd2 = sparkContext.makeRDD(List(1,2,3,4)
)从底层代码实现来讲,makeRDD 方法其实就是 parallelize 方法
def makeRDD[T: ClassTag](seq: Seq[T],numSlices: Int = defaultParallelism): RDD[T] = withScope {parallelize(seq, numSlices)
}

(2)从外部存储(文件)创建 RDD
  由外部存储系统的数据集创建 RDD 包括:本地的文件系统,所有 Hadoop 支持的数据集,比如 HDFS、HBase 等。

 val fileRDD: RDD[String] = sparkContext.textFile("input")

(3) 从其他 RDD 创建
  主要是通过一个 RDD 运算完后,再产生新的 RDD。详情请参考后续章节
(4) 直接创建 RDD(new)
  使用 new 的方式直接构造 RDD,一般由 Spark 框架自身使用

5.1.4.2 RDD 并行度与分区

  默认情况下,Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能够并行计算的任务数量我们称之为并行度。这个数量可以在构建 RDD 时指定。记住,这里的并行执行的任务数量,并不是指的切分任务的数量,不要混淆了。

val dataRDD: RDD[Int] =sparkContext.makeRDD(List(1,2,3,4),  4)
val fileRDD: RDD[String] =sparkContext.textFile("input", 2)

  读取内存数据时,数据可以按照并行度的设定进行数据的分区操作,数据分区规则的
   读取文件数据时,数据是按照 Hadoop 文件读取的规则进行切片分区,而切片规则和数据读取的规则有些差异

5.1.4.3 RDD 转换算子

5.1.4.3.1转换算子-单Value类型

  (1) map

 def map[U: ClassTag](f: T => U): RDD[U]

   将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。

val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD1: RDD[Int] = dataRDD.map(num => {num * 2}
)
val dataRDD2: RDD[String] = dataRDD1.map(num => {"" + num}
)

  (2) mapPartitions

def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],preservesPartitioning: Boolean = false): RDD[U]

  将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。

val dataRDD1: RDD[Int] = dataRDD.mapPartitions(datas => {datas.filter(_==2)}
)

小功能:获取每个数据分区的最大值
 ➢ 数据处理角度
Map 算子是分区内一个数据一个数据的执行,类似于串行操作。而 mapPartitions 算子是以分区为单位进行批处理操作。
 ➢ 功能的角度
Map 算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据
 ➢ 性能的角度
Map 算子因为类似于串行操作,所以性能比较低,而是 mapPartitions 算子类似于批处理,所以性能较高。但是 mapPartitions 算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。使用 map 操作。
  完成比完美更重要

(3) mapPartitionsWithIndex

def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U],preservesPartitioning: Boolean = false): RDD[U]

  将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。

val dataRDD1 = dataRDD.mapPartitionsWithIndex((index, datas) => {datas.map(index, _)}
)

小功能:获取第二个数据分区的数据

(4) flatMap

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

  将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射
   扁平化 :
List(List(1,2),List(4,5)) -》 List(1,2,4,5)这就是扁平化

val dataRDD = sparkContext.makeRDD(List(List(1,2),List(3,4)
),1)
val dataRDD1 = dataRDD.flatMap(list => list
)

小功能:将 List(List(1,2),3,List(4,5))进行扁平化操作
模式匹配 ,3转换成list
(5) glom

def glom(): RDD[Array[T]]

  将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变

val dataRDD = sparkContext.makeRDD(List( 1,2,3,4  ),2)
val dataRDD1:RDD[Array[Int]] = dataRDD.glom()

小功能:计算所有分区最大值求和(分区内取最大值,分区间最大值求和)

  val list: List[Int] =List( 3, 4,1,2,5)val rdd1: RDD[Int] = sc.makeRDD(list,2)val glomRDD: RDD[Array[Int]] = rdd1.glom()val value: RDD[Int] = glomRDD.map(a => {a.max})println(value.collect().sum)

(6) groupBy

def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

  将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为 shuffle。极限情况下,数据可能被分在同一个分区中
   一个组的数据在一个分区中,但是并不是说一个分区中只有一个组

val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1)
val dataRDD1 = dataRDD.groupBy(_%2
)

❖ 小功能:将 List(“Hello”, “hive”, “hbase”, “Hadoop”)根据单词首写字母进行分组。
❖ 小功能:从服务器日志数据 apache.log 中获取每个时间段访问量。

(7) filter

def filter(f: T => Boolean): RDD[T]
def filter(f: T => Boolean): RDD[T]

  将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。
  当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜

val dataRDD = sparkContext.makeRDD(List(1,2,3,4 ),1)
val dataRDD1 = dataRDD.filter(_%2 == 0)

❖ 小功能:从服务器日志数据 apache.log 中获取 2015 年 5 月 17 日的请求路径

(8) sample

def sample(withReplacement: Boolean,fraction: Double,seed: Long = Utils.random.nextLong): RDD[T]

  根据指定的规则从数据集中抽取数据

val dataRDD = sparkContext.makeRDD(List(1,2,3,4
),1)
// 抽取数据不放回(伯努利算法)
// 伯努利算法:又叫 0、1 分布。例如扔硬币,要么正面,要么反面。
// 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不
要
// 第一个参数:抽取的数据是否放回,false:不放回
// 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
// 第三个参数:随机数种子
val dataRDD1 = dataRDD.sample(false, 0.5)
// 抽取数据放回(泊松算法)
// 第一个参数:抽取的数据是否放回,true:放回;false:不放回
// 第二个参数:重复数据的几率,范围大于等于 0.表示每一个元素被期望抽取到的次数
// 第三个参数:随机数种子
val dataRDD2 = dataRDD.sample(true, 2)

(9) distinct

def distinct()(implicit ord: Ordering[T] = null): RDD[T]
def distinct(numPartitions: Int)
(implicit ord: Ordering[T] = null): RDD[T]

将数据集中重复的数据去重

val dataRDD = sparkContext.makeRDD(List(1,2,3,4,1,2 ),1)
val dataRDD1 = dataRDD.distinct()
val dataRDD2 = dataRDD.distinct(2)//分区参数

(10) coalesce

def coalesce(numPartitions: Int, shuffle: Boolean = false,partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T]

  根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本

val dataRDD = sparkContext.makeRDD(List( 1,2,3,4,1,2),6)
val dataRDD1 = dataRDD.coalesce(2)
//默认不Shuffle  有可能数据倾斜,第二个参数是,是否开启Shuffle

思考一个问题:我想要扩大分区,怎么办?
Coalesce 只有开启Shuffle 才能扩大分区

(11) repartition = coa;esce +true

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

➢ 函数说明
  该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true。无论是将分区数多的RDD 转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD,repartition操作都可以完成,因为无论如何都会经shuffle 过程。

val dataRDD = sparkContext.makeRDD(List( 1,2,3,4,1,2),2)
val dataRDD1 = dataRDD.repartition(4)

思考一个问题:coalesce 和 repartition 区别?
(12) sortBy

def sortBy[K](f: (T) => K,ascending: Boolean = true,numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

➢ 函数说明
  该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程

val dataRDD = sparkContext.makeRDD(List( 1,2,3,4,1,2),2)
val dataRDD1 = dataRDD.sortBy(num=>num, false, 4)
//第二个参数是true升序
5.1.4.3.2转换算子-双 Value 类型

(13) intersection

def intersection(other: RDD[T]): RDD[T]

  对源 RDD 和参数 RDD 求交集后返回一个新的 RDD

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.intersection(dataRDD2)

(14) union

def union(other: RDD[T]): RDD[T]

  对源 RDD 和参数 RDD 求并集后返回一个新的 RDD

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.union(dataRDD2)

(15) subtract

def subtract(other: RDD[T]): RDD[T]

  以一个 RDD 元素为主,去除两个 RDD 中重复元素,将其他元素保留下来。求差集

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.subtract(dataRDD2)

  交并差 都要 数据类型一致
(16) zip

 def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]

  将两个 RDD 中的元素,以键值对的形式进行合并。其中,键值对中的 Key 为第 1 个 RDD中的元素,Value 为第 2 个 RDD 中的相同位置的元素。

 val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.zip(dataRDD2)

拉链分区得一致,长度也得一致

5.1.4.3.2转换算子-Key - Value 类型

(17) partitionBy

def partitionBy(partitioner: Partitioner): RDD[(K, V)]

  将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner

val rdd: RDD[(Int, String)] =sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3)
import org.apache.spark.HashPartitioner
val rdd2: RDD[(Int, String)] =
rdd.partitionBy(new HashPartitioner(2))

(18) reduceByKey

def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

  可以将数据按照相同的 Key 对 Value 进行聚合

 val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.reduceByKey(_+_)
val dataRDD3 = dataRDD1.reduceByKey(_+_, 2)

(19) groupByKey

def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

  将数据源的数据根据 key 对 value 进行分组

val dataRDD1 =sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.groupByKey()
val dataRDD3 = dataRDD1.groupByKey(2)
val dataRDD4 = dataRDD1.groupByKey(new HashPartitioner(2))

  从 shuffle 的角度:reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。
  从功能的角度:reduceByKey 其实包含分组和聚合的功能.GroupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用groupByKey
(20) aggregateByKey

def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)]

  将数据根据不同的规则进行分区内计算和分区间计算
❖ 取出每个分区内相同 key 的最大值然后分区间相加

// TODO : 取出每个分区内相同 key 的最大值然后分区间相加
// aggregateByKey 算子是函数柯里化,存在两个参数列表
// 1. 第一个参数列表中的参数表示初始值
// 2. 第二个参数列表中含有两个参数
// 2.1 第一个参数表示分区内的计算规则
// 2.2 第二个参数表示分区间的计算规则
val rdd =sc.makeRDD(List(("a",1),("a",2),("c",3),("b",4),("c",5),("c",6)),2)
// 0:("a",1),("a",2),("c",3) => (a,10)(c,10)
// => (a,10)(b,10)(c,20)
// 1:("b",4),("c",5),("c",6) => (b,10)(c,10)
val resultRDD =rdd.aggregateByKey(10)((x, y) => math.max(x,y),(x, y) => x + y)
resultRDD.collect().foreach(println)

  小练习:将数据 List((“a”, 88), (“b”, 95), (“a”, 91), (“b”, 93), (“a”, 95), (“b”, 98))求每个 key 的平均值

val list: List[(String, Int)] = List(("a", 88), ("b", 95), ("a", 91), ("b", 93),("a", 95), ("b", 98))
val input: RDD[(String, Int)] = sc.makeRDD(list, 2)val value: RDD[(String, (Int, Int))] = input.aggregateByKey((0, 0))((t, v) => {(t._1 + v, t._2 + 1) //t._1 是 分区内总和 ,t._2是次数},(t1, t2) => {(t1._1 + t2._1, t1._2 + t2._2)})
)

(21) foldByKey

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

  当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.foldByKey(0)(_+_)

(22) combineByKey

def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C): RDD[(K, C)]

  最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。
  小练习:将数据 List((“a”, 88), (“b”, 95), (“a”, 91), (“b”, 93), (“a”, 95), (“b”, 98))求每个 key 的平均值

val list: List[(String, Int)] = List(("a", 88), ("b", 95), ("a", 91), ("b", 93),("a", 95), ("b", 98))
val input: RDD[(String, Int)] = sc.makeRDD(list, 2)
val combineRdd: RDD[(String, (Int, Int))] = input.combineByKey((_, 1),(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)

  思考一个问题:reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别?
  reduceByKey: 相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同
  FoldByKey: 相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同
  AggregateByKey:相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同.
  CombineByKey:当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区内和分区间计算规则不相同。
(23) sortByKey

 def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]

  在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序的

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(true)
val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(false)

小功能:设置 key 为自定义类 User.
(24) join

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的(K,(V,W))的 RDD
笛卡尔乘积增长

val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (1, "a"), (3, "c")))
val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (1, 5), (3, 6)))
rdd.join(rdd1).collect().foreach(println)
rdd1 两个1 下面两个1 会出现乘积

(25) leftOuterJoin

def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V,Option[W]))]

  类似于 SQL 语句的左外连接

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val rdd: RDD[(String, (Int, Option[Int]))] = dataRDD1.leftOuterJoin(dataRDD2)

(26) cogroup = connect + group

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

  在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("a",2),("c",3)))
val dataRDD2 = sparkContext.makeRDD(List(("a",1),("c",2),("c",3)))
val value: RDD[(String, (Iterable[Int], Iterable[Int]))] =
dataRDD1.cogroup(dataRDD2)

5.1.4.4 RDD 行动算子 会触发返回结果

(1) reduce

def reduce(f: (T, T) => T): T

  聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 聚合数据
val reduceResult: Int = rdd.reduce(_+_)

(2) collect

def collect(): Array[T]

   在驱动程序中,以数组 Array 的形式返回数据集的所有元素
将不同分区数据按照分区顺序采集数据到Driver端内存 ,形成数组

def collect(): Array[T]

(3) count

def count(): Long

  返回 RDD 中元素的个数

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 返回 RDD 中元素的个数
val countResult: Long = rdd.count()

(4) first

def first(): T

  返回 RDD 中的第一个元素

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 返回 RDD 中元素的个数
val firstResult: Int = rdd.first()
println(firstResult)

(5) take

def take(num: Int): Array[T]

  返回一个由 RDD 的前 n 个元素组成的数组

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 返回 RDD 中元素的个数
val takeResult: Array[Int] = rdd.take(2)
println(takeResult.mkString(","))

(6) takeOrdered

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

  返回该 RDD 排序后的前 n 个元素组成的数组

val rdd: RDD[Int] = sc.makeRDD(List(1,3,2,4))
// 返回 RDD 中元素的个数
val result: Array[Int] = rdd.takeOrdered(2)

(7) aggregate

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

  分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 8)
// 将该 RDD 所有元素相加得到结果
//val result: Int = rdd.aggregate(0)(_ + _, _ + _)
val result: Int = rdd.aggregate(10)(_ + _, _ + _)
输出100 初始值参与9次 8个区间内+一个区间间+1+2+3+4 =100

(8) fold

def fold(zeroValue: T)(op: (T, T) => T): T

  折叠操作,aggregate 的简化版操作

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val foldResult: Int = rdd.fold(0)(_+_)

(9) countByKey

def countByKey(): Map[K, Long]

  统计每种 key 的个数

val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (1, "a"), (1, "a"), (2,
"b"), (3, "c"), (3, "c")))
// 统计每种 key 的个数
val result: collection.Map[Int, Long] = rdd.countByKey()

(10) save 相关算子

def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile(path: String,codec: Option[Class[_ <: CompressionCodec]] = None): Unit

  将数据保存到不同格式的文件中

// 保存成 Text 文件
rdd.saveAsTextFile("output")
// 序列化成对象保存到文件
rdd.saveAsObjectFile("output1")
// 保存成 Sequencefile 文件
rdd.map((_,1)).saveAsSequenceFile("output2")

(11) foreach

def foreach(f: T => Unit): Unit = withScope {val cleanF = sc.clean(f)sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}

  分布式遍历 RDD 中的每一个元素,调用指定函数

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 收集后打印  Driver端打印
rdd.map(num=>num).collect().foreach(println)
println("****************")
// 分布式打印  Executor端打印 (分布式节点)
rdd.foreach(println)

5.1.4.5 RDD 序列化

(1) 闭包检查
  **从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor端执行。**那么在 scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给 Executor端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。
(2) 序列化方法和属性
  从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor端执行

5.1.4.6 RDD 依赖关系

(1) RDD 血缘关系
RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建 RDD 的一系列 Lineage(血统)记录下来,以便恢复丢失的分区。RDD 的 Lineage 会记录 RDD 的元数据信息和转换行为,当该 RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。

val fileRDD: RDD[String] = sc.textFile("input/1.txt")
println(fileRDD.toDebugString)//查看血缘关系
println("----------------------")
val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))
println(wordRDD.toDebugString)
println("----------------------")
val mapRDD: RDD[(String, Int)] = wordRDD.map((_,1))
println(mapRDD.toDebugString)
println("----------------------")
val resultRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_+_)
println(resultRDD.toDebugString)
resultRDD.collect()

(2)依赖关系

  这里所谓的依赖关系,其实就是RDD之间的关系

(3)窄依赖 OneToOne =NarrowDependency
  窄依赖表示每一个父RDD的Partition最多被子RDD的一个Partition使用,窄依赖我们形象的比喻为独生子女。

class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd)

(4)宽依赖
  宽依赖表示同一个父RDD的Partition被多个子RDD的Partition依赖,会引起Shuffle,总结:宽依赖我们形象的比喻为超生。

class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
@transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Serializer = SparkEnv.get.serializer,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false)
extends Dependency[Product2[K, V]]

(5)RDD的运行机制

  输入可能以多个文件的形式存储在HDFS上,每个File都包含了很多块,称为Block。当Spark读取这些文件作为输入时,会根据具体数据格式对应的InputFormat进行解析,一般是将若干个Block合并成一个输入分片,称为InputSplit,注意InputSplit不能跨越文件。随后将为这些输入分片生成具体的Task。InputSplit与Task是一一对应的关系。随后这些具体的Task每个都会被分配到集群上的某个节点的某个Executor去执行。

  1. 每个节点可以起一个或多个Executor。
  2. 每个Executor由若干core组成,每个Executor的每个core一次只能执行一个Task。
  3. 每个Task执行的结果就是生成了目标RDD的一个partiton。
    注意: 这里的core是虚拟的core而不是机器的物理CPU核,可以理解为就是Executor的一个工作线程。
      而 Task被执行的并发度 = Executor数目 * 每个Executor核数。至于partition的数目:
  4. 对于数据读入阶段,例如sc.textFile,输入文件被划分为多少InputSplit就会需要多少初始Task。
  5. 在Map阶段partition数目保持不变。
  6. 在Reduce阶段,RDD的聚合会触发shuffle操作,聚合后的RDD的partition数目跟具体操作有关,例如repartition操作会聚合成指定分区数,还有一些算子是可配置的。
      RDD在计算的时候,每个分区都会起一个task,所以rdd的分区数目决定了总的的task数目。申请的计算节点(Executor)数目和每个计算节点核数,决定了你同一时刻可以并行执行的task。

  比如有的RDD有100个分区,那么计算的时候就会生成100个task,你的资源配置为10个计算节点,每个两2个核,同一时刻可以并行的task数目为20,计算这个RDD就需要5个轮次。如果计算资源不变,你有101个task的话,就需要6个轮次,在最后一轮中,只有一个task在执行,其余核都在空转。如果资源不变,你的RDD只有2个分区,那么同一时刻只有2个task运行,其余18个核空转,造成资源浪费。这就是在spark调优中,增大RDD分区数目,增大任务并行度的做法。
在这里插入图片描述

(6)阶段划分
DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。
例如,DAG记录了RDD的转换过程和任务的阶段。

(7)RDD任务划分
RDD任务切分中间分为:Application、Job、Stage和Task
Application:初始化一个SparkContext即生成一个Application;

  • Job:一个Action算子就会生成一个Job;
  • Stage:Stage等于宽依赖(ShuffleDependency)的个数加1;
  • Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数。

注意:Application->Job->Stage->Task每一层都是1对n的关系。

5.1.4.7 RDD 持久化

  1. RDD Cache 缓存
      RDD 通过 Cache 或者 Persist(持久化) 方法将前面的计算结果缓存,默认情况下会把数据以缓存在 JVM 的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的 action(行动)算子时,该 RDD 将会被缓存在计算节点的内存中,并供后面重用。
    为啥需要持久化呢 ,因为RDD不缓存数据 ,如果中间有大量相同操作 ,会重新来一遍
// cache 操作会增加血缘关系,不改变原有的血缘关系
println(wordToOneRdd.toDebugString)
// 数据缓存。
wordToOneRdd.cache()
// 可以更改存储级别
//mapRdd.persist(StorageLevel.MEMORY_AND_DISK_2)

  缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD 的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于 RDD 的一系列转换,丢失的数据会被重算,由于 RDD 的各个 Partition 是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部 Partition。Spark 会自动对一些 Shuffle 操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点 Shuffle 失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用 persist 或 cache。
2. RDD CheckPoint 检查点
  所谓的检查点其实就是通过将 RDD 中间结果写入磁盘由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。
对 RDD 进行 checkpoint 操作并不会马上被执行,必须执行 Action 操作才能触发。

// 设置检查点路径
sc.setCheckpointDir("./checkpoint1")
// 创建一个 RDD,读取指定位置文件:hello atguigu atguigu
val lineRdd: RDD[String] = sc.textFile("input/1.txt")
// 业务逻辑
val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))
val wordToOneRdd: RDD[(String, Long)] = wordRdd.map {word => {(word, System.currentTimeMillis())}
}
// 增加缓存,避免再重新跑一个 job 做 checkpoint
wordToOneRdd.cache()
// 数据检查点:针对 wordToOneRdd 做检查点计算
wordToOneRdd.checkpoint()
// 触发执行逻辑
wordToOneRdd.collect().foreach(println)

3.缓存和检查点区别
 1)Cache 缓存只是将数据保存起来,不切断血缘依赖。Checkpoint 检查点切断血缘依赖。
 2)Cache 缓存的数据通常存储在磁盘,内存等地方.可靠性低.Checkpoint 的数据通常存储在 HDFS 等容错、高可用的文件系统,可靠性高。
 3)建议对 checkpoint()的 RDD 使用 Cache 缓存,这样 checkpoint 的 job 只需从 Cache 缓存中读取数据即可,否则需要再从头计算一次 RDD。
  具体如下图所示:

5.1.4.8 RDD 分区器(案例,自定义分区器)

   Spark 目前支持 Hash 分区和 Range 分区,和用户自定义分区。Hash 分区为当前的默认分区。分区器直接决定了 RDD 中分区的个数、RDD 中每条数据经过 Shuffle 后进入哪个分区,进而决定了 Reduce 的个数。
 ➢ 只有 Key-Value 类型的 RDD 才有分区器,非 Key-Value 类型的 RDD 分区的值是 None
 ➢ 每个 RDD 的分区 ID 范围:0 ~ (numPartitions - 1),决定这个值是属于那个分区的。
   1) Hash 分区:对于给定的 key,计算其 hashCode,并除以分区个数取余
   2) Range 分区:将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序

object ParitionBy {def main(args: Array[String]): Unit = {val conf :SparkConf = new SparkConf().setMaster("local[*]").setAppName("wordCount")val sc = new SparkContext(conf)val kvRDD: RDD[(String, Int)] = sc.makeRDD(List(("hadoop", 3), ("spark", 5), ("hive", 3), ("scala", 66)), 2)val partitionByRDD = kvRDD.partitionBy(new HashPartitioner(2))// partitionByRDD.saveAsTextFile("spark_demo/output/kvdemo2")val mypartitionByrdd: RDD[(String, Int)] = kvRDD.partitionBy(new MyPartitioner(2))mypartitionByrdd.saveAsTextFile("spark_demo/output/out21")sc.stop()}
}
class MyPartitioner(partition : Int) extends Partitioner{override def numPartitions: Int = partitionoverride def getPartition(key: Any): Int = {key.asInstanceOf[String].charAt(0)}
}

5.1.4.9 RDD 文件读取与保存

  Spark 的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。
  文件格式分为:text 文件、csv 文件、sequence 文件以及Object 文件;
  文件系统分为:本地文件系统、HDFS、HBASE 以及数据库。
➢ text 文件

// 读取输入文件
val inputRDD: RDD[String] = sc.textFile("input/1.txt")
// 保存数据
inputRDD.saveAsTextFile("output")

➢ sequence 文件
SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 对而设计的一种平面文件(FlatFile)。在 SparkContext 中,可以调用sequenceFile[keyClass, valueClass] (path)。

// 保存数据为 SequenceFile
dataRDD.saveAsSequenceFile("output")
// 读取 SequenceFile 文件
sc.sequenceFile[Int,Int]("output").collect().foreach(println)

➢ object 对象文件
  对象文件是将对象序列化后保存的文件,采用 Java 的序列化机制。可以通过 objectFile[T:ClassTag] (path) 函数接收一个路径,读取对象文件,返回对应的 RDD,也可以通过调用

saveAsObjectFile()实现对对象文件的输出。因为是序列化所以要指定类型。
// 保存数据
dataRDD.saveAsObjectFile("output")
// 读取数据
sc.objectFile[Int]("output").collect().foreach(println)

5.2 累加器

5.2.1 系统累加器

val rdd = sc.makeRDD(List(1,2,3,4,5))
// 声明累加器
var sum = sc.longAccumulator("sum");
// 就是Driver进入分别的每个Execute,每个Execute会返回值给 Driver端rdd.foreach(num => {// 使用累加器sum.add(num)}
)
// 获取累加器的值
println("sum = " + sum.value)

5.2.2 自定义累加器

// 自定义累加器
// 1. 继承 AccumulatorV2,并设定泛型
// 2. 重写累加器的抽象方法
class WordCountAccumulator extends AccumulatorV2[String, mutable.Map[String,
Long]]{var map : mutable.Map[String, Long] = mutable.Map()
// 累加器是否为初始状态
override def isZero: Boolean = {map.isEmpty
}
// 复制累加器
override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {new WordCountAccumulator
}
// 重置累加器
override def reset(): Unit = {map.clear()
}
// 向累加器中增加数据 (In)
override def add(word: String): Unit = {// 查询 map 中是否存在相同的单词// 如果有相同的单词,那么单词的数量加 1// 如果没有相同的单词,那么在 map 中增加这个单词map(word) = map.getOrElse(word, 0L) + 1L
}
// 合并累加器
override def merge(other: AccumulatorV2[String,mutable.Map[String, Long]]):
Unit = {val map1 = mapval map2 = other.value// 两个 Map 的合并map = map1.foldLeft(map2)(( innerMap, kv ) => {innerMap(kv._1) = innerMap.getOrElse(kv._1, 0L) + kv._2innerMap})
}
// 返回累加器的结果 (Out)
override def value: mutable.Map[String, Long] = map
}

5.3 广播变量

5.3.1 实现原理

  广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个 Spark 操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark 会为每个任务分别发送。

val rdd1 = sc.makeRDD(List( ("a",1), ("b", 2), ("c", 3), ("d", 4) ),4)
val list = List( ("a",4), ("b", 5), ("c", 6), ("d", 7) )
// 声明广播变量
val broadcast: Broadcast[List[(String, Int)]] = sc.broadcast(list)
val resultRDD: RDD[(String, (Int, Int))] = rdd1.map {case (key, num) => {var num2 = 0// 使用广播变量for ((k, v) <- broadcast.value) {if (k == key) {num2 = v}}(key, (num, num2))}
}

Spark Core是相当多的知识点,希望读者细品,细细品,到这也就结束了,下篇再见

大数据技术之Spark(一)Spark Core相关推荐

  1. 【大数据】企业级大数据技术体系概述

    目录 产生背景 常见应用场景 企业级大数据技术框架 数据收集层 数据存储层 资源管理与服务协调层 计算引擎层 数据分析层 数据可视层 企业级大数据技术实现方案 Google 大数据技术栈 Hadoop ...

  2. 大表与大表join数据倾斜_技术分享|大数据技术初探之Spark数据倾斜调优

    侯亚南 数据技术处 支宸啸 数据技术处 在大数据计算中,我们可能会遇到一个很棘手的问题--数据倾斜,此时spark任务的性能会比预期要差很多:绝大多数task都很快执行完成,但个别task执行极慢或者 ...

  3. 大数据技术之Spark(一)——Spark概述

    大数据技术之Spark(一)--Spark概述 文章目录 前言 一.Spark基础 1.1 Spark是什么 1.2 Spark VS Hadoop 1.3 Spark优势及特点 1.3.1 优秀的数 ...

  4. 大数据技术原理与应用(第十章 Spark)

    目录 10.1 Spark简介 Spark的主要特点 Scala简介 Scala的特性 Spark与Hadoop的对比 Hadoop与Spark的执行流程对比 10.2 Spark生态系统 BDAS架 ...

  5. 大数据技术之_19_Spark学习_07_Spark 性能调优 + 数据倾斜调优 + 运行资源调优 + 程序开发调优 + Shuffle 调优 + GC 调优 + Spark 企业应用案例

    大数据技术之_19_Spark学习_07 第1章 Spark 性能优化 1.1 调优基本原则 1.1.1 基本概念和原则 1.1.2 性能监控方式 1.1.3 调优要点 1.2 数据倾斜优化 1.2. ...

  6. Spark大数据技术与应用 第一章Spark简介与运行原理

    Spark大数据技术与应用 第一章Spark简介与运行原理 1.Spark是2009年由马泰·扎哈里亚在美国加州大学伯克利分校的AMPLab实验室开发的子项目,经过开源后捐赠给Aspache软件基金会 ...

  7. 尚硅谷大数据技术Spark教程-笔记02【SparkCore(核心编程,map、mapPartitions、mapPartitionsWithIndex、flatMap、glom、groupBy)】

    视频地址:尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据技术Spark教程-笔记01[Spark(概述.快速上手.运行环境.运行架构)] 尚硅谷大数据技术Spark教 ...

  8. 尚硅谷大数据技术Spark教程-笔记01【Spark(概述、快速上手、运行环境、运行架构)】

    视频地址:尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据技术Spark教程-笔记01[Spark(概述.快速上手.运行环境.运行架构)] 尚硅谷大数据技术Spark教 ...

  9. 大数据技术原理与应用 第三篇 大数据处理与分析(三)Spark

    一. Spark简介 Spark最初由美国加州伯克利大学(UCBerkeley)的AMP实验室于2009年开发,是基于内存计算的大数据并行计算框架,可用于构建大型的.低延迟的数据分析应用程序 1.1 ...

  10. 《Spark大数据分析:核心概念、技术及实践》大数据技术一览

    本节书摘来自华章出版社<Spark大数据分析:核心概念.技术及实践>一书中的第1章,第1节,作者穆罕默德·古勒(Mohammed Guller)更多章节内容可以访问云栖社区"华章 ...

最新文章

  1. 【OpenCV 4开发详解】图像卷积
  2. 经验分享:微信小程序外包接单常见问题及流程
  3. php常量 变量,php语言的变量和常量
  4. 关于若干数据库数据插入性能的对比
  5. 10分钟理解CSS3 FlexBox
  6. python将姓王的都改成老王_Python老王视频习题答案
  7. C#实现局域网UDP广播
  8. db2数据库日期减一天_DB2 日期时间函数
  9. 连接Oracle时报错ORA-28547
  10. sudo chmod 755 ....指令分析
  11. 【数学】等差乘等比数列-差比数列求和公式
  12. java雅虎邮件发送
  13. 几种常见的十进制代码(笔记)
  14. 木兰编程语言,当事人最新回复来了
  15. 如何使用mtPaint制作像素艺术和GIF动画
  16. 2019年全国高校计算机能力挑战赛C++组初赛
  17. 太卷了,年薪40W的软件测试大D佬工作经验分享,原来我存在这么多问题......
  18. 锐捷Vlan基础实验
  19. BiLSTM, CRF,BiLSTM+CRF原理讲解以及viterbi算法python实现
  20. UE4玻璃材质不受sequencer焦距影响的解决办法

热门文章

  1. 技术交流:springboot配置阿里云日志服务与log4j2 lookup
  2. 机械革命笔记本开关键盘亮度
  3. (OK) MIMP - 18 ( 5 nodes) - 抓包-缺少 MPTCP-JION - 分析 mptcp-kmsg-client-5-nodes-no-ping.txt
  4. %3c %3e是什么编程语言,第1章 网站数据分析与网站统计工具基础.ppt
  5. RT_Thread中rtconfig.h解析
  6. UE4镜头抖动CameraShake
  7. 初学者如何选择适合自己的服务器
  8. 编程计算1至50中是7倍数的数值之和
  9. 【PAT】L1-050. 倒数第N个字符串【C语言实现】
  10. EXCEL——提取身份证中的出生年月日