1.Spark数据倾斜问题

  • 数据倾斜的主要表现:

    • Executor丢失、OOM、shuffle过程出错
    • Driver OOM
    • 单个Executor执行时间特别长、整体任务卡在某个阶段不能结束
    • 正常运行的任务突然消失
  • Debug:

    • Spark 4040 端口的 Web UI界面,查看哪个阶段卡住了
    • 联想代码中涉及shuffle的算子groupByKey、countByKey、reduceByKey、join…
    • 如果报异常了,则在YARN的日志中可以定位到具体的代码位置
    • 或者看log,看看是执行到了第几个stage。spark代码,是怎么划分成一个一个的stage的。哪一个stage生成的task特别慢,就能够自己用肉眼去对你的spark代码进行stage的划分,就能够通过stage定位到你的代码,到底哪里发生了数据倾斜。
  • 解决方案:

    • 聚合源数据

      • Spark作业的数据来源,90%都是来源于Hive表(HDFS)。数据倾斜,直接在生成hive表的hive etl中对数据进行聚合。比如按key来分组,将key对应的所有的values全部用一种特殊的格式拼接到一个字符串里面去,比如“key=sessionid, value: action_seq=1|user_id=1|search_keyword=火锅|category_id=001;action_seq=2|user_id=1|search_keyword=涮肉|category_id=001”。

        对key进行group,在spark中,拿到key=sessionid,values。hive etl中,直接对key进行了聚合。那么也就意味着,每个key就只对应一条数据。在spark中,就不需要再去执行groupByKey+map这种操作了。直接对每个key对应的values字符串进行map操作,进行你需要的操作即可。

    • 过滤导致倾斜的key

      • 如果你能够接受某些数据在spark作业中直接就摒弃掉不使用。比如说,总共有100万个key。只有2个key是数据量达到10万的。其他所有的key,对应的数量都是几十万。这个时候,你自己可以去取舍,如果业务和需求可以理解和接受的话,在你从hive表查询源数据的时候,直接在sql中用where条件,过滤掉某几个key。那么这几个原先有大量数据,会导致数据倾斜的key,被过滤掉之后,那么在你的spark作业中,自然就不会发生数据倾斜了。
    • 提高shuffle操作reduce并行度

      • 将reduce task的数量变多,就可以让每个reduce task分配到更少的数据量。这样的话也许就可以缓解甚至是基本解决掉数据倾斜的问题。

      • 主要给我们所有的shuffle算子,比如groupByKey、countByKey、reduceByKey。在调用的时候,传入进去一个参数。那个数字,就代表了那个shuffle操作的reduce端的并行度。那么在进行shuffle操作的时候,就会对应着创建指定数量的reduce task。这样的话,就可以让每个reduce task分配到更少的数据。基本可以缓解数据倾斜的问题。比如说,原本某个task分配数据特别多,直接OOM,内存溢出了,程序没法运行,直接挂掉。按照log,找到发生数据倾斜的shuffle操作,给它传入一个并行度数字,这样的话,原先那个task分配到的数据,肯定会变少。就至少可以避免OOM的情况,程序至少是可以跑的。

      • 治标不治本的意思,因为它没有从根本上改变数据倾斜的本质和问题。不像第一个和第二个方案(直接避免了数据倾斜的发生)。原理没有改变,只是说,尽可能地去缓解和减轻shuffle reduce task的数据压力,以及数据倾斜的问题。

        实际生产环境中的经验:

        1、如果最理想的情况下,提升并行度以后,减轻了数据倾斜的问题,或者甚至可以让数据倾斜的现象忽略不计,那么就最好。就不用做其他的数据倾斜解决方案了。

        2、不太理想的情况下,比如之前某个task运行特别慢,要5个小时,现在稍微快了一点,变成了4个小时。或者是原先运行到某个task,直接OOM,现在至少不会OOM了,但是那个task运行特别慢,要5个小时才能跑完。

        那么,如果出现第二种情况的话,各位,就立即放弃第三种方案,开始去尝试和选择后面的四种方案。

    • 使用随机key实现双重聚合

      • groupByKey、reduceByKey比较适合使用这种方式。join咱们通常不会这样来做。
      • 第一轮聚合的时候,对key进行打散,将原先一样的key,变成不一样的key,相当于是将每个key分为多组。先针对多个组,进行key的局部聚合。接着,再去除掉每个key的前缀,然后对所有的key进行全局的聚合。对groupByKey、reduceByKey造成的数据倾斜,有比较好的效果。
    • reduce join 转换成 map join(将小表使用广播变量广播出去,然后再大表join操作时,使用map算子从广播变量中获取数据进行合并)

    • rdd.sample 抽样统计key的个数(抽样取出key,对这些key进行 wordcount,看那个key对应的值比较多),然后将倾斜的过滤掉,前提是不影响最终的结果。

    • 大表与大表之间join:

      • 首先给RDD进行扩容,Spark默认是起200个task,一个大表起1000个task,另一个大表中的key随机加前缀

核心思想:将key均匀分布到不同的partition中,并行的去处理这批数据,就能解决数据倾斜的问题。


2.DAG图有没有看过,看到那些信息?

DAG:有向无环图,有方向无闭环。

DAG记录的是RDD的执行流程、RDD的依赖关系。

创建RDD的时候就构建了DAG图,执行action算子的时候一个完整的DAG图就形成了。

一个应该里面有多少个DAG图,就相当于执行了多少action算子。

一个Spark应用里面可以有多个DAG,有几个DAG是取决于触发了多少次的Action

一个DAG中会有不同的阶段也就是Stage,划分Stage的依据是宽依赖

  • 窄依赖和宽依赖的区别:

    • 窄依赖:
      1个子RDD的分区对应1个夫RDD的分区(map、filter、union…)
      1个子RDD的分区对应N个夫RDD的分区(co-partitioned join…)
    • 宽依赖:
      1个夫RDD对应非全部多个子RDD分区(groupByKey、reduceByKey、sortByKey)
      1个夫RDD对应所有子RDD分区(未经协调划分的join)

一个Stage中有1个Taskset,一个分区对应一个Task


3.TaskScheduler(任务调度)的作用是什么?

Spark任务的真正的运行时,由action算子进行一个触发,最终调用 sc.runJob() 方法,在driver端回初始化2个重要的组件 DagScheduler 和 TaskScheduler。

  • TaskScheduler的主要职责:

    • 负责将DagScheduler发送过来的taskset放入到Taskset的缓冲池中
    • TaskScheduler中的实现类 TaskSchedulerImpl -> 创建TasksetManager来进行对Taskset缓冲池中的Taskset进行管理(主要管理的时Taskset的生命周期)
    • Stage也就是teakset发送给TaskScheduler,TaskScheduler 将task发送给Executor执行

4.Spark的内存分布

  • 堆内内存

    • 在使用堆内内存的时候,如果设置的堆内内存是2G,读取的数据也是2G,然后又来了2G的数据,这样就会出现OOM内存溢出的情况,因为处理完2G的数据,堆内内存空间并不会马上进行GC回收。
  • 堆外内存
    • 堆外内存也就是物理内存,堆外内存可以精准的申请和释放空间,不需要GC回收,性能比较高,提升了任务执行的效率。

5.Spark处理小文件

每个小文件都会创建一个RDD,每个RDD都会有分区,分区过多,shuffle过程慢,读写开销大。所以要对小文件进行处理。

  • 处理方法

    • 重新分区默认不执行shuffle

      rdd.coalsece(numPartition: Int)
    • 重新分区底层调用 coalsece() , shuffle =true , 默认开启shuffle

      rdd.repartition(numPartition: Int)(implicit ord: Ordering[T] = null):RDD[T] = withScope{coalsece(numPartition,shuffle = true)
      }

    多分区 ——> 少分区 :可以不开启shuffle
    少分区 ——> 多分区 :必须开启shuffle

  • 总结:

    • 采用coalesce(numPartition: Int)对数据集repartition ,来合理的设置分区数。

6.Spark如何实现监控预警功能

Spark监控预警功能:可以实时监控Spark应用程序的运行状态,然后做一些预警。

  • 写代码实现
  • 使用YARNClient类获取Spark任务的状态
  • 判断任务的存活状态
  • 写代码获取Executor的个数,CPU,内存等信息
  • 拿到想要的数据之后,就可以对Spark进行一个状态监控的功能

7.reduceByKey 和 groupByKey 的区别

  • reduceByKey

    • 按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回的结果是RDD[K,V]
  • groupByKey
    • 按照key进行分组后,直接进行shuffle

建议使用 reduceByKey ,但是注意是否影响最终的业务逻辑


8.Spark的Cache缓存

Cache:Spark的缓存可以起到一个优化的作用

  • 将频繁使用的RDD进行缓存到内存中,后面的RDD用到的时候直接去内存中读取,不需要重新计算,提高了任务执行的效率
  • Cache是将数据保存在内存中
  • 程序结束后会被清除或者是手动调用 unpersist() 方法清除
  • Cache会记录RDD的血缘关系

9.什么是RDD?

RDD: 弹性分布式数据集

RDD是Spark中最基本的数据抽象

  • RDD的三个特征

    • 分区

      • 每一个 RDD 包含的数据被存储在系统的不同节点上。逻辑上我们可以将 RDD 理解成一个大的数组,数组中的每个元素就代表一个分区 (Partition) 。

      • 在物理存储中,每个分区指向一个存储在内存或者硬盘中的数据块 (Block) ,其实这个数据块就是每个 task 计算出的数据块,它们可以分布在不同的节点上。

      • 所以,RDD 只是抽象意义的数据集合,分区内部并不会存储具体的数据,只会存储它在该 RDD 中的 index,通过该 RDD 的 ID 和分区的 index 可以唯一确定对应数据块的编号,然后通过底层存储层的接口提取到数据进行处理。

      • 在集群中,各个节点上的数据块会尽可能的存储在内存中,只有当内存没有空间时才会放入硬盘存储,这样可以最大化的减少硬盘 IO 的开销。

    • 不可变

      • 不可变性是指每个 RDD 都是只读的,它所包含的分区信息是不可变的。由于已有的 RDD 是不可变的,所以我们只有对现有的 RDD 进行转化 (Transformation) 操作,才能得到新的 RDD ,一步一步的计算出我们想要的结果。
      • 这样会带来这样的好处:我们在 RDD 的计算过程中,不需要立刻去存储计算出的数据本身,我们只要记录每个 RDD 是经过哪些转化操作得来的,即:依赖关系,这样一方面可以提高计算效率,一方面是错误恢复会更加容易。如果在计算过程中,第 N 步输出的 RDD 的节点发生故障,数据丢失,那么可以根据依赖关系从第 N-1 步去重新计算出该 RDD,这也是 RDD 叫做**“弹性”**分布式数据集的一个原因。
    • 并行操作

      • 因为 RDD 的分区特性,所以其天然支持并行处理的特性。即不同节点上的数据可以分别被处理,然后生成一个新的 RDD。
  • RDD的结构

    • 每个RDD里都会包括 分区信息、依赖关系 … … 信息,如图所示:

    • Partitions

      • Partitions 就是上面所说的,代表着 RDD 中数据的逻辑结构,每个 Partion 会映射到某个节点内存或者硬盘的一个数据块。
    • SparkContext

      • SparkContext 是所有 Spark 功能的入口,代表了与 Spark 节点的连接,可以用来创建 RDD 对象以及在节点中的广播变量等等。
      • 一个线程只有一个 SparkContext。
    • SparkConf

      • SparkConf 是一些配置信息。
    • Partitioner

      • Partitioner 决定了 RDD 的分区方式,目前两种主流的分区方式:Hash partioner 和 Range partitioner。
      • Hash 就是对数据的 Key 进行散列分布。
      • Rang 是按照 Key 的排序进行的分区。
      • 也可以自定义 Partitioner。
    • Dependencies

      • Dependencies 也就是依赖关系,记录了该 RDD 的计算过程,也就是说这个 RDD 是通过哪个 RDD 经过怎么样的转化操作得到的。

      • 这里有个概念,根据每个 RDD 的分区计算后生成的新的 RDD 的分区的对应关系,可以分成窄依赖宽依赖

        • 窄依赖就是父 RDD 的分区可以一 一对应到子 RDD 的分区。
        • 宽依赖是说父 RDD 的每个分区可以被多个子 RDD 分区使用。

        由于窄依赖的特性,窄依赖允许子 RDD 的每个分区可以被并行处理产生,而且支持在同一个节点上链式执行多条指令,无需等待其它父 RDD 的分区操作。

  • Spark 区分宽窄依赖的原因主要有两点:

    • 窄依赖支持在同一节点上进行链式操作,比如在执行了 map 后,紧接着执行 filter 操作。相反,宽依赖需要所有父分区都是可用的。

    • 从失败恢复的角度考虑,窄依赖失败恢复更有效,因为只要重新计算丢失的父分区即可,而宽依赖涉及到 RDD 的各级多个父分区。

  • Checkpoint

    • 检查点机制,在计算过程中有一些比较耗时的 RDD,我们可以将它缓存到硬盘或者 HDFS 中,标记这个 RDD 有被检查点处理过,并且清空它的所有依赖关系。同时,给它新建一个依赖于 CheckpointRDD 的依赖关系,CheckpintRDD 可以用来从硬盘中读取 RDD 和生成新的分区信息。
    • 这么做之后,当某个 RDD 需要错误恢复时,回溯到该 RDD,发现它被检查点记录过,就可以直接去硬盘读取该 RDD,无需重新计算。
  • Preferred Location

    • 针对每一个分片,都会选择一个最优的位置来计算,数据不动,代码动。
  • Storage Level

    • 用来记录RDD持久化时存储的级别,常用的有:

      • MEMORY_ONLY:只存在缓存中,如果内存不够,则不缓存剩余的部分。RDD默认的存储级别。
      • MEMORY_AND_DISK:缓存在内存中,不够则缓存至磁盘。
      • DISK_ONLY:只存硬盘。
      • MEMORY_ONLY_2 和 MEMORY_AND_DISK_2 等:与上面的级别和功能相同,只不过每个分区在集群两个节点上简历副本。
  • Iterator

    • 迭代函数和计算函数是用来表示 RDD 怎样通过父 RDD 计算得到的。
    • 迭代函数首先会判断缓存中是否有想要计算的 RDD,如果有就直接读取,如果没有就查找想要计算的 RDD 是否被检查点处理过。
    • 如果有,就直接读取,如果没有,就调用计算函数向上递归,查找父 RDD 进行计算。

10.Spark的落盘场景

  • 在shuffle过程中回进行落盘操作
  • shuffle分为 shuffle write 和 shuffle read 在这期间回进行一个数据落盘的操作
  • 设置 checkpoint 也会落盘到磁盘或HDFS
  • 最后将数据写出也会落到磁盘上

11.Spark Shuffle

大多数Spark作业的性能主要是消耗在 shuffle 环节,因为shuffle包含了大量的磁盘IO、序列化、网络数据传输等操作。因此,想要让Spark作业性能更好,就必须对 shuffle 过程进行调优。

我们知道stage中是高效快速的pipline的计算模式,宽依赖之间会划分stage,而Stage之间就是Shuffle,如图中的stage0,stage1和stage3之间就会产生Shuffle。

在Spark的中,负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。

ShuffleManager随着Spark的发展有两种实现的方式,分别为HashShuffleManager和SortShuffleManager,因此spark的Shuffle有Hash Shuffle和Sort Shuffle两种

  • Spark Shuffle发展史

    • 在Spark的源码中,负责 shuffle 过程的执行、计算、处理的组件主要就是 ShuffleManager,即 shuffle 管理器。随着Spark的版本的发展,ShuffleManager也在不断迭代,变得越来越先进。
    • 在Spark 1.2 以前,默认的shuffle计算引擎是 HashShuffleManager。该ShuffleManager(HashShuffleManager)有一个非常严重的弊端,就是会产生大量的中间磁盘文件,由大量的磁盘IO操作影响了性能。
    • 因此在Spark 1.2 以后的版本中,默认的ShuffleManager改成了 SortShufflerManager。SortShuffleManager 相较于 HashShuffleManager来说,有了一定的改进。主要在于,每个Task在进行Shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在一个Stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。
  • Hash Shuffle

    • HashShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是合并的运行机制。

      合并机制主要是通过复用buffer来优化Shuffle过程中产生的小文件的数量。Hash shuffle是不具有排序的Shuffle。

    • 普通机制的Hash shuffle

      • 图解

        • 这里我们先明确一个假设前提:每个Executor只有1个CPU core,也就是说,无论这个Executor上分配多少个task线程,同一时间都只能执行一个task线程。

          图中有3个 Reducer,从Task 开始那边各自把自己进行 Hash 计算(分区器:hash/numreduce取模),分类出3个不同的类别,每个 Task 都分成3种类别的数据,想把不同的数据汇聚然后计算出最终的结果,所以Reducer 会在每个 Task 中把属于自己类别的数据收集过来,汇聚成一个同类别的大集合,每1个 Task 输出3份本地文件,这里有4个 Mapper Tasks,所以总共输出了4个 Tasks x 3个分类文件 = 12个本地小文件。

        • shuffle write阶段

          主要就是在一个stage结束计算之后,为了下一个stage可以执行shuffle类的算子(比如reduceByKey,groupByKey),而将每个task处理的数据按key进行“分区”。所谓“分区”,就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每一个磁盘文件都只属于reduce端的stage的一个task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。

          那么每个执行shuffle write的task,要为下一个stage创建多少个磁盘文件呢?很简单,下一个stage的task有多少个,当前stage的每个task就要创建多少份磁盘文件。比如下一个stage总共有100个task,那么当前stage的每个task都要创建100份磁盘文件。如果当前stage有50个task,总共有10个Executor,每个Executor执行5个Task,那么每个Executor上总共就要创建500个磁盘文件,所有Executor上会创建5000个磁盘文件。由此可见,未经优化的shuffle write操作所产生的磁盘文件的数量是极其惊人的。

        • shuffle read阶段

          shuffle read,通常就是一个stage刚开始时要做的事情。此时该stage的每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作。由于shuffle write的过程中,task给Reduce端的stage的每个task都创建了一个磁盘文件,因此shuffle read的过程中,每个task只要从上游stage的所有task所在节点上,拉取属于自己的那一个磁盘文件即可。

          shuffle read的拉取过程是一边拉取一边进行聚合的。每个shuffle read task都会有一个自己的buffer缓冲,每次都只能拉取与buffer缓冲相同大小的数据,然后通过内存中的一个Map进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操作。以此类推,直到最后将所有数据到拉取完,并得到最终的结果。

        • 注意

          buffer起到的是缓存作用,缓存能够加速写磁盘,提高计算的效率,buffer的默认大小32k。

          分区器:根据hash/numRedcue取模决定数据由几个Reduce处理,也决定了写入几个buffer中。

          block file:磁盘小文件,从图中我们可以知道磁盘小文件的个数计算公式:block file=M*R

          M为map task的数量,R为Reduce的数量,一般Reduce的数量等于buffer的数量,都是由分区器决定的。

    • 合并机制的Hash shuffle

      合并机制就是复用buffer,开启合并机制的配置是spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true即可开启优化机制。通常来说,如果我们使用HashShuffleManager,那么都建议开启这个选项。

      • 图解

        • 这里还是有4个Tasks,数据类别还是分成3种类型,因为Hash算法会根据你的 Key 进行分类,在同一个进程中,无论是有多少个Task,都会把同样的Key放在同一个Buffer里,然后把Buffer中的数据写入以Core数量为单位的本地文件中,(一个Core只有一种类型的Key的数据),每1个Task所在的进程中,分别写入共同进程中的3份本地文件,这里有4个Mapper Tasks,所以总共输出是 2个Cores x 3个分类文件 = 6个本地小文件。

        • 开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。

        • Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件。也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。

        • 假设第二个stage有100个task,第一个stage有50个task,总共还是有10个Executor,每个Executor执行5个task。那么原本使用未经优化的HashShuffleManager时,每个Executor会产生500个磁盘文件,所有Executor会产生5000个磁盘文件的。但是此时经过优化之后,每个Executor创建的磁盘文件的数量的计算公式为:CPU core的数量 * 下一个stage的task数量。也就是说,每个Executor此时只会创建100个磁盘文件,所有Executor只会创建1000个磁盘文件。

        • 注意

          • 启动HashShuffle的合并机制ConsolidatedShuffle的配置:

            spark.shuffle.consolidateFiles=true

          • block file=Core*R

            Core为CPU的核数,R为Reduce的数量

          • Hash shuffle合并机制的问题

            如果 Reducer 端的并行任务或者是数据分片过多的话则 Core * Reducer Task 依旧过大,也会产生很多小文件。

    • Sort Shuffle

      • SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。当shuffle read task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。

      • Sort shuffle的普通机制

        • 图解

          • 写入内存数据结构

            该图说明了普通的SortShuffleManager的原理。在该模式下,数据会先写入一个内存数据结构中(默认5M),此时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。

          • 注意

            • shuffle中的定时器:定时器会检查内存数据结构的大小,如果内存数据结构空间不够,那么会申请额外的内存,申请的大小满足如下公式:

              applyMemory = nowMenory * 2 - oldMemory (申请的内存 = 当前的内存情况 * 2 - 上一次的内嵌情况)

              意思就是说内存数据结构的大小的动态变化,如果存储的数据超出内存数据结构的大小,将申请内存数据结构存储的数据*2-内存数据结构的设定值的内存大小空间。申请到了,内存数据结构的大小变大,内存不够,申请不到,则发生溢写

            • 排序

              在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。

            • 溢写

              排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。写入磁盘文件是通过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。

            • merge

              一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个task就只对应一个磁盘文件,也就意味着该task为Reduce端的stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。

          SortShuffleManager由于有一个磁盘文件merge的过程,因此大大减少了文件数量。比如第一个stage有50个task,总共有10个Executor,每个Executor执行5个task,而第二个stage有100个task。由于每个task最终只有一个磁盘文件,因此此时每个Executor上只有5个磁盘文件,所有Executor只有50个磁盘文件。

        • 注意

          • block file= 2M

            一个map task会产生一个索引文件和一个数据大文件

          • m*r>2m(r>2):SortShuffle会使得磁盘小文件的个数再次的减少

      • Sort shuffle的bypass机制

      • 图解

        • bypass运行机制的触发条件如下:

          • shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。
          • 不是聚合类的shuffle算子(比如reduceByKey)。
        • 此时task会为每个reduce端的task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

          该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。

          而该机制与普通SortShuffleManager运行机制的不同在于:

          第一,磁盘写机制不同;

          第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

      • 总结

        Shuffle 过程本质上都是将 Map 端获得的数据使用分区器进行划分,并将数据发送给对应的 Reducer 的过程。

        shuffle作为处理连接map端和reduce端的枢纽,其shuffle的性能高低直接影响了整个程序的性能和吞吐量。map端的shuffle一般为shuffle的Write阶段,reduce端的shuffle一般为shuffle的read阶段。Hadoop和spark的shuffle在实现上面存在很大的不同,spark的shuffle分为两种实现,分别为HashShuffle和SortShuffle,

        HashShuffle又分为普通机制和合并机制,普通机制因为其会产生MR个数的巨量磁盘小文件而产生大量性能低下的Io操作,从而性能较低,因为其巨量的磁盘小文件还可能导致OOM,HashShuffle的合并机制通过重复利用buffer从而将磁盘小文件的数量降低到CoreR个,但是当Reducer 端的并行任务或者是数据分片过多的时候,依然会产生大量的磁盘小文件。

        SortShuffle也分为普通机制和bypass机制,普通机制在内存数据结构(默认为5M)完成排序,会产生2M个磁盘小文件。而当shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。或者算子不是聚合类的shuffle算子(比如reduceByKey)的时候会触发SortShuffle的bypass机制,SortShuffle的bypass机制不会进行排序,极大的提高了其性能

        在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager,因为HashShuffleManager会产生大量的磁盘小文件而性能低下,在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。


12.Spark OOM 处理 及 优化

处理

  • map过程产生大量对象导致内存溢出

    这种溢出的原因是在单个map中产生了大量的对象导致的。

    例如:rdd.map(x=>for(i <- 1 to 10000) yield i.toString),这个操作在rdd中,每个对象都产生了10000个对象,这肯定很容易产生内存溢出的问题。

    针对这种问题,在不增加内存的情况下,可以通过减少每个Task的大小,以便达到每个Task即使产生大量的对象Executor的内存也能够装得下。

    具体做法可以在会产生大量对象的map操作之前调用repartition方法,分区成更小的块传入map。

    例如:rdd.repartition(10000).map(x=>for(i <- 1 to 10000) yield i.toString)

    面对这种问题注意,不能使用rdd.coalesce方法,coalesce这个方法只能减少分区,不能增加分区,不会有shuffle的过程。

  • 数据不平衡导致内存溢出

    数据不平衡除了有可能导致内存溢出外,也有可能导致性能的问题,解决方法和上面说的类似,就是调用repartition重新分区。

  • coalesce调用导致内存溢出

    因为hdfs中不适合存小文件问题,所以Spark计算后如果产生的文件太小,我们会调用coalesce合并文件再存入hdfs中。

    但是这会导致一个问题,例如在coalesce之前有100个文件,这也意味着能够有100个Task,现在调用coalesce(10),最后只产生10个文件,因为coalesce并不是shuffle操作,这意味着coalesce并不是按照我原本想的那样先执行100个Task,再将Task的执行结果合并成10个,而是从头到尾只有10个Task在执行,原本100个文件是分开执行的,现在每个Task同时一次读取10个文件,使用的内存是原来的10倍,这导致了OOM。

    解决这个问题的方法是令程序按照我们想的先执行100个Task再将结果合并成10个文件,这个问题同样可以通过repartition解决,调用repartition(10),因为这就有一个shuffle的过程,shuffle前后是两个Stage,一个100个分区,一个是10个分区,就能按照我们的想法执行。

  • shuffle后内存溢出

    shuffle内存溢出的情况可以说都是shuffle后,单个文件过大导致的。

    在Spark中,join,reduceByKey这一类型的过程,都会有shuffle的过程,在shuffle的使用,需要传入一个partitioner,大部分Spark中的shuffle操作,默认的partitioner都是HashPatitioner,默认值是父RDD中最大的分区数,这个参数通过spark.default.parallelism控制(在spark-sql中用spark.sql.shuffle.partitions) , spark.default.parallelism参数只对HashPartitioner有效,所以如果是别的Partitioner或者自己实现的Partitioner就不能使用spark.default.parallelism这个参数来控制shuffle的并发量了。如果是别的partitioner导致的shuffle内存溢出,就需要从partitioner的代码增加partitions的数量。

  • Stand alone 模式下资源分配不均匀导致内存溢出

    在standalone的模式下如果配置了–total-executor-cores 和 –executor-memory 这两个参数,但是没有配置–executor-cores这个参数的话,就有可能导致,每个Executor的memory是一样的,但是cores的数量不同,那么在cores数量多的Executor中,由于能够同时执行多个Task,就容易导致内存溢出的情况。这种情况的解决方法就是同时配置–executor-cores或者spark.executor.cores参数,确保Executor资源分配均匀。

  • 在 RDD 中,共用对象能减少 OOM 的情况

    这个比较特殊,这里说记录一下,遇到过一种情况,类似这样rdd.flatMap(x=>for(i <- 1 to 1000) yield (“key”,”value”))导致OOM,但是在同样的情况下,使用rdd.flatMap(x=>for(i <- 1 to 1000) yield “key”+”value”)就不会有OOM的问题,这是因为每次(“key”,”value”)都产生一个Tuple对象,而”key”+”value”,不管多少个,都只有一个对象,指向常量池。具体测试如下:

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bpbQx5qR-1597360629885)(C:\Users\etert\AppData\Roaming\Typora\typora-user-images\image-20200811082131487.png)]

这个例子说明(“key”,”value”)和(“key”,”value”)在内存中是存在不同位置的,也就是存了两份,但是”key”+”value”虽然出现了两次,但是只存了一份,在同一个地址,这用到了JVM常量池的知识.于是乎,如果RDD中有大量的重复数据,或者Array中需要存大量重复数据的时候我们都可以将重复数据转化为String,能够有效的减少内存使用.

优化:

  • 使用 mapPartitions 代替大部分 map 操作,或者连续使用的 map 操作

    这里需要稍微讲一下RDD和DataFrame的区别。RDD强调的是不可变对象,每个RDD都是不可变的,当调用RDD的map类型操作的时候,都是产生一个新的对象,这就导致了一个问题,如果对一个RDD调用大量的map类型操作的话,每个map操作会产生一个到多个RDD对象,这虽然不一定会导致内存溢出,但是会产生大量的中间数据,增加了GC操作。另外RDD在调用action操作的时候,会触发Stage的划分,但是在每个Stage内部可优化的部分是不会进行优化的,例如rdd.map(+1).map(+1),这个操作在数值型RDD中是等价于rdd.map(_+2)的,但是RDD内部不会对这个过程进行优化。DataFrame则不同,DataFrame由于有类型信息所以是可变的,并且在可以使用sql的程序中,都有除了解释器外,都会有一个sql优化器,DataFrame也不例外,有一个优化器Catalyst,具体介绍看后面参考的文章。

    上面说到的这些RDD的弊端,有一部分就可以使用mapPartitions进行优化,mapPartitions可以同时替代rdd.map,rdd.filter,rdd.flatMap的作用,所以在长操作中,可以在mapPartitons中将RDD大量的操作写在一起,避免产生大量的中间rdd对象,另外是mapPartitions在一个partition中可以复用可变类型,这也能够避免频繁的创建新对象。使用mapPartitions的弊端就是牺牲了代码的易读性。

  • broadcast join 和普通 join

    在大数据分布式系统中,大量数据的移动对性能的影响也是巨大的。基于这个思想,在两个RDD进行join操作的时候,如果其中一个RDD相对小很多,可以将小的RDD进行collect操作然后设置为broadcast变量,这样做之后,另一个RDD就可以使用map操作进行join,这样能够有效的减少相对大很多的那个RDD的数据移动。

  • 先 filter 再 join

    这个就是谓词下推,这个很显然,filter之后再join,shuffle的数据量会减少,这里提一点是spark-sql的优化器已经对这部分有优化了,不需要用户显示的操作,个人实现rdd的计算的时候需要注意这个。

  • partitionBy 优化

    《spark partitioner使用技巧》

  • combineByKey的使用

    这个操作在Map-Reduce中也有,这里举个例子:rdd.groupByKey().mapValue(_.sum) 比 rdd.reduceByKey的效率低,原因如下两幅图所示:

上下两幅图的区别就是上面那幅有combineByKey的过程减少了shuffle的数据量,下面的没有。combineByKey是key-value型rdd自带的API,可以直接使用。

  • 内存不足时的优化

    在内存不足的使用,使用 rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) 代替 rdd.cache():
    rdd.cache() 和 rdd.persist(Storage.MEMORY_ONLY)是等价的,在内存不足的时候rdd.cache()的数据会丢失,再次使用的时候会重算,而rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)在内存不足的时候会存储在磁盘,避免重算,只是消耗点IO时间。

参数优化:

  • spark.driver.memory(default 1g)

    这个参数用来设置Driver的内存。在Spark程序中,SparkContext,DAGScheduler都是运行在Driver端的。对应rdd的Stage切分也是在Driver端运行,如果用户自己写的程序有过多的步骤,切分出过多的Stage,这部分信息消耗的是Driver的内存,这个时候就需要调大Driver的内存。

  • spark.rdd.compress(default false)

    这个参数在内存吃紧的时候,又需要persist数据有良好的性能,就可以设置这个参数为true,这样在使用persist(StorageLevel.MEMORY_ONLY_SER)的时候,就能够压缩内存中的rdd数据。减少内存消耗,就是在使用的时候会占用CPU的解压时间。

  • spark.serializer(default org.apache.spark.serializer.JavaSerializer)

    建议设置为 org.apache.spark.serializer.KryoSerializer,因为KryoSerializer比JavaSerializer快,但是有可能会有些Object会序列化失败,这个时候就需要显示的对序列化失败的类进行KryoSerializer的注册,这个时候要配置spark.kryo.registrator参数或者使用参照如下代码:

    valconf=newSparkConf().setMaster(…).setAppName(…)
    conf.registerKryoClasses(Array(classOf[MyClass1],classOf[MyClass2]))
    valsc =newSparkContext(conf)
    
  • spark.memory.storageFraction(default 0.5)

    这个参数设置内存表示 Executor内存中 storage/(storage+execution),虽然spark-1.6.0+的版本内存storage和execution的内存已经是可以互相借用的了,但是借用和赎回也是需要消耗性能的,所以如果明知道程序中storage是多是少就可以调节一下这个参数。

  • spark.locality.wait(default 3s)

    spark中有4种本地化执行level,PROCESS_LOCAL->NODE_LOCAL->RACK_LOCAL->ANY,一个task执行完,等待spark.locality.wait时间如果,第一次等待PROCESS的Task到达,如果没有,等待任务的等级下调到NODE再等待spark.locality.wait时间,依次类推,直到ANY。分布式系统是否能够很好的执行本地文件对性能的影响也是很大的。如果RDD的每个分区数据比较多,每个分区处理时间过长,就应该把 spark.locality.wait 适当调大一点,让Task能够有更多的时间等待本地数据。特别是在使用persist或者cache后,这两个操作过后,在本地机器调用内存中保存的数据效率会很高,但是如果需要跨机器传输内存中的数据,效率就会很低。

  • spark.speculation(default false)

    一个大的集群中,每个节点的性能会有差异,spark.speculation这个参数表示空闲的资源节点会不会尝试执行还在运行,并且运行时间过长的Task,避免单个节点运行速度过慢导致整个任务卡在一个节点上。这个参数最好设置为true。与之相配合可以一起设置的参数有spark.speculation.×开头的参数。参考中有文章详细说明这个参数。


13.Spark Shuffle 默认并行度

Spark Shuffle 默认并行度,由 spark.sql.shuffle.partitions 决定,默认并行度 200


14.Coalesce 和 Reparation 的关系与区别

关系:

​ 两者都是用来改变RDD的partition数量的,repartition底层调用的就是coalesce方法:

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {coalesce(numPartitions, shuffle = true)
}

区别:

repartition一定会发生shuffle,coalesce根据传入的参数来判断是否发生shufuule。

一般情况下增大RDD的partition数量使用repartition,减少partition数量时使用coalesce。

在Spark中减少文件个数,会使用 coalesce来减少分区来达到目的。

但是如果数据量过大,分区数过多会出现OOM。所以 coalesce 缩小分区个数也需合理。


15.如何使用Spark实现TopN的获取

  • 方法1:

    1.按照key对数据进行聚合(groupByKey)

    2.将value转换为数组,利用scala的sortBy或者sortWith进行排序(mapValues)数据量太大,会OOM。

  • 方法2:

    1.自定义分区器,按照key进行分区,使用不同的key进到不同的分区

    2.对每个分区用Spark的排序算子进行排序


16.Spark中共享变量

Spark两种共享变量:广播变量(broadcast variable),累加器(accumulator)

broadcast variable:用来高效分发较大对象

accumulate:用来对信息进行聚合

通常在向 Spark 传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序(Driver)中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器(Driver)中的对应变量。Spark 的两个共享变量,累加器与广播变量,分别为结果聚合与广播这两种常见的通信模式突破了这一限制。

用一段代码来更直观的解释:

object BroadcastTest{def main(args:Array[String]):Unit={val conf = new SparkConf().setMaster("local").setAppName("broadcast")val sc = new SparkContext(conf)val list = List("hello java")val linesRDD = sc.textFile("./word")linesRDD.filter(line => {list.contains(line)}).foreach(println)}
}

算子的部分实在Executor端执行,其他在Driver端执行。

list是在Driver端创建的,但是因为需要在Executor端使用,所以Driver会把list以task的形式发送到Executor端,如果有很多个task,就会有很多给Executor端携带很多个list,如果这个list非常大的时候,就会造成内存溢出(如下图所示)。这个时候就引出了广播变量。

使用广播变量后:

使用广播变量的过程:

  1. 通过对一个类型 T 的对象调用 SparkContext.broadcast 创建出一个 Broadcast[T] 对象。任何可序列化的类型都可以这么实现。

  2. 通过 value 属性访问该对象的值(在 Java 中为 value() 方法)。

  3. 变量只会被发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)。

案例如下:

object BroadcastTest {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local").setAppName("broadcast")val sc = new SparkContext(conf)val list = List("hello java")val broadcast = sc.broadcast(list)val linesRDD = sc.textFile("./word")linesRDD.filter(line => {broadcast.value.contains(line)}).foreach(println)sc.stop()}
}

注意事项:

  • 能不能将一个RDD使用广播变量广播出去?

    不能,因为RDD是不存储数据的。可以将RDD的结果广播出去。

  • 广播变量只能在Driver端定义,不能在Executor端定义。

  • 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。


累加器 Accumulator

object AccumulatorTest{def main(args:Array[String]): Unit = {val conf = new SparkConf().setMaster("local").setAppName("accumulator")val sc = new SparkContext(conf)val linesRDD = sc.textFile("./word")// 定义一个统计行数的变量var i = 0val result = linesRDD.map(s => {i += 1s})result.collect()println("word lines is :" + i)sc.stop()}
}

打印结果为:

word lines is : 0

依然是Driver端和Executor端的数据不能共享的问题。Executor端修改了变量,根本不会让Driver端跟着修改,这个就是累加器出现的原因。

累加器的作用:

提供了将Executor节点中的值聚合到Driver程序中的简单语法

常用场景:

调试时对作业执行过程中的时间进行计数。

累加器的用法:

  1. 通过在Driver中调用 SparkContext.accumulator(initialValue) 方法,创建出存有初始值的累加器。返回值为 org.apache.spark.Accumulator[T] 对象,其中 T 是初始值initialValue 的类型。
  2. Spark闭包(函数序列化)里的Executor代码可以使用累加器的 += 方法(在Java中是 add )增加累加器的值。
  3. Driver程序可以调用累加器的 value 属性(在 Java 中使用 value() 或 setValue() )来访问累加器的值。

案例如下:

object AccumulatorTest {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local").setAppName("accumulator")val sc = new SparkContext(conf)//创建accumulator并初始化为0val accumulator = sc.accumulator(0); val linesRDD = sc.textFile("./word")val result = linesRDD.map(s => {//有一条数据就增加1accumulator.add(1) s})result.collect();println("words lines is :" + accumulator.value)sc.stop()}
}

输出结果为:

word lines is :6

注意事项:

累加器在Driver端定义赋初始值,累加器只能在Driver端读取,在Executor端更新


17.YARN-Client 提交流程

  1. 脚本启动执行
  2. SparkSubmit里面调用Driver内部的main方法
  3. 申请启动ExecutorLauncher(发射器)
  4. ResourceManager选择一台NodeManager启动ExecutorLauncher
  5. ExecutorLauncher执行ApplicationMaster的main方法
  6. ExecutorLauncher向ResourceManager申请资源
  7. 启动ExecutorBackend(后端)
  8. 在ExecutorBackend内部创建Executor对象
  9. 分配任务给Executor

18.YARN-Cluster 提交流程

  1. 脚本启动执行
  2. SparkSubmit里面调用Client内部的main方法
  3. 封装并向ApplicationMaster发送指令
  4. ResourceManager选择一台NodeManager启动ApplicationMaster
  5. ApplicationMaster启动Driver线程执行作业
  6. Driver向ResourceManager申请资源
  7. 启动ExecutorBackend
  8. 在ExecutorBackend内部创建Executor对象
  9. 分配任务给Executor

20.Spark调优原则

  • 避免创建重复的RDD
  • 尽可能复用同一个RDD
  • 对多次使用的RDD进行持久化
  • 尽量避免使用shuffle类算子
  • 使用map-side预聚合的shuffle操作
  • 使用高性能的算子
  • 广播大变量
  • 使用Kyro优化序列化性能
    • kryo序列化比java序列化更紧凑,但Spark默认的序列化时java序列化而不是Spark序列化
    • 因为Spark序列化并不支持所有序列化类型,而且每次使用都必须进行注册
    • 注册只针对于RDD,在DataFrames和DataSet当中自动实现kryo序列化
  • 优化数据结构
    • 在java中有三种类型比较耗费内存

      • 对象(有对象头,引用等额外信息)
      • 字符串(每个字符串都有一个字符数组及长度等额外信息)
      • 集合类型,比如hashMap,linkedList等,因为集合类型内部会用一些内部类来封装集合元素
    • 尽量少使用上述的三种数据类型可以起到优化,但是后期代码维护会比较麻烦

21.提交Spark任务到yarn设置了8个G,Executor内存为6个G,结果消耗了7个G的资源,为什么?

  • 会超过设置的6个G + 1024的归整化因子内存(默认是1G) = 7G内存
  • 合理的去根据源码调整申请内存的大小,申请多了会造成一个资源的了浪费,比如我就申请的6个G抛出了近7个G的资源消耗,管理集群的哥们又该找上门来了

22.Spark SQL 中RDD与DataFream及DataSet之间的关系

  • 宏观

    • RDD:弹性分布式数据集
    • DataFream:在RDD之上多了一层schema
    • DataSet:在DataFream之上多了一个数据类型
  • 微观

    • RDD

      • 优点

        • 编译时安全

          编译的时候可以检查类型是否安全

        • 面向对象的编程风格

          可以通过直接点方法对数据进行操作

      • 缺点

        • 序列化和反序列化消耗资源太大

          反序列化时会将数据结构和数据内容都反序列化

        • GC操作频繁

          RDD要频繁的创建和销毁,务必会产生很多的GC操作

    • DataFream

      • 在RDD之上引入了schema和off-heap
      • 多个RDD的每一行的数据结构都一致,Spark就可以通过schema来识别数据结构,在反序列化的时候可以只反序列化数据而结构就可以忽略掉了
    • DataSet

      • DataSet综合了RDD和DataFream的优点,并引入了Encodeing
      • 数据在进行序列化时.encodeing出来的字节码和off-heap互通,这样就可以做到按需读取数据,而不需要将所有的数据反序列化
  • 三者之间的转换方式

    • RDD - DF = toDF
    • RDD - DS = toDS
    • DF - DS = as[类型]
    • DS - DF = toDF
    • DS - RDD = .rdd
    • DF - RDD = .rdd

23.append 和 overwrite 的区别

  • append

    在原有分区上进行追加

  • overwrite

    在原有分区上进行全量刷新


24.通过Kafka连接Flume去落盘HDFS,为什么不去用SparkStreaming去落盘呢?

如果做一些优化的操作时,使用SparkStreaming就会非常复杂,而使用Flume只需要配置参数就行了。

比如优化小文件这些等等。


25.SparkStreaming消费Kafka数据的两种方式及之间的区别

  • receiver

    • receiver 是通过调用 Kafka 的高阶 API 来获取 Kafka 中的数据,读取到 Kafka 数据后,会先加载到 Spark Executor 中的内存里,(如果一时间数据保证可能会出现OOM)然后 SparkStreaming 启动的 job 就会处理这批数据
    • 这种方式默认是无法保证数据的高可靠的,可能会因为底层节点的一些失败而丢失数据,如果要想保证数据的高可靠,则要开启WAL预写日志机制,WAL机制会将消费到的数据往分布式文件系统中也写一份,这样就可以保证底层节点的失败,可以通过预写日志中的数据进行恢复
  • direct
    • direct 是spark1.3版本以后引入的,可以更加确保健壮的一个机制,他会定时的去读取 kafka 中 topic+partition 中最新的offset,从而定义每个 batch 的offset 的范围,通过调用 kafka 的低阶API指定 offset 来获取数据
    • 优点
      • 拥有消费一次而且仅消费一次的事务
      • 简化并行处理
        • 如果读取多个 partition 中的数据,不需要创建多个 Dstream 然后再做 union 的操作
          Spark在创建 RDD 的时候,会创建相应数量的分区,并行的从 kafka 中读取数据,kafka 的 Partition 与 RDD 中的 Partition 是一对一的映射关系
      • 高性能
        • receiver 如果要想保证高可靠的前提下,他就要开启WAL机制,但是这个机制的效率是十分的低的,因为kafka中本来就有副本,你在读取的时候再往分布式文件系统中写一份,就会造成一个数据的冗余,,而direct方式只要是kafka中开启了副本机制,他就可以通过kafka中的副本进行恢复数据,来完成数据的高可靠
  • 区别
    • receiver是使用kafka的高阶API在Zookeeper中保存offset,而这是消费kafka数据的默认方式,保证数据高可靠是通过WAL机制,但是他无法保障数据消费一次而且仅被消费一次,因为zookeeper中保存的offset和SparkStreaming中保存可能不同步
    • direct方式是通过调用consumer的低阶API来自己维护offset,并保存到checkpoint中,由于是sparkStreaming自己维护offset,他就可以保证数据消费一次且仅消费一次

26.实时有哪些指标,怎么实现的

  • 一小时内日活实时统计
  • 一小时内订单数实时统计
  • 一小时内交易额实时统计
  • 一小时内光谷点击实时统计
    • 数据打到kafka中,SparkStreaming消费kafka中的数据,使用设备id作为key保存到redis中,判断可以存在吗?存在累加,不存在+1,
      并将key对应的数量存入到redis中,如果点击超过100次我们会将此设备拉入到广告黑名单中

27.如何监控SparkStreaming任务的状态

  • 自定义 StreamingListener

    • 监控批次处理时间,若超过阀值则警告

28.简述 SparkStreaming 窗口函数的原理

窗口函数就是在原来定义的SparkStreaming计算批次大小的基础上再次进行封装,

每次计算多个批次的数据,同时还需要传递一个滑动步长的参数,用来设置当次计算任务完成之后下一次从什么地方开始计算。


29.Spark Streaming 默认分区个数

Spark Streaming默认分区个数与所对接的kafka topic分区个数一致,

Spark Streaming里一般不会使用repartition算子增大分区,因为repartition会进行shuffle增加耗时。


30.Spark Streaming 精准一次消费

  • 手动维护偏移量
  • 处理完业务数据后,再进行提交偏移量操作
  • 极端情况下,如在提交偏移量时断网或停电会造成spark程序第二次启动时重复消费问题,所以在涉及到金额或精确性非常高的场景会使用事务保证精准一次消费

a中开启了副本机制,他就可以通过kafka中的副本进行恢复数据,来完成数据的高可靠

  • 区别

    • receiver是使用kafka的高阶API在Zookeeper中保存offset,而这是消费kafka数据的默认方式,保证数据高可靠是通过WAL机制,但是他无法保障数据消费一次而且仅被消费一次,因为zookeeper中保存的offset和SparkStreaming中保存可能不同步
    • direct方式是通过调用consumer的低阶API来自己维护offset,并保存到checkpoint中,由于是sparkStreaming自己维护offset,他就可以保证数据消费一次且仅消费一次

26.实时有哪些指标,怎么实现的

  • 一小时内日活实时统计
  • 一小时内订单数实时统计
  • 一小时内交易额实时统计
  • 一小时内光谷点击实时统计
    • 数据打到kafka中,SparkStreaming消费kafka中的数据,使用设备id作为key保存到redis中,判断可以存在吗?存在累加,不存在+1,
      并将key对应的数量存入到redis中,如果点击超过100次我们会将此设备拉入到广告黑名单中

27.如何监控SparkStreaming任务的状态

  • 自定义 StreamingListener

    • 监控批次处理时间,若超过阀值则警告

28.简述 SparkStreaming 窗口函数的原理

窗口函数就是在原来定义的SparkStreaming计算批次大小的基础上再次进行封装,

每次计算多个批次的数据,同时还需要传递一个滑动步长的参数,用来设置当次计算任务完成之后下一次从什么地方开始计算。


29.Spark Streaming 默认分区个数

Spark Streaming默认分区个数与所对接的kafka topic分区个数一致,

Spark Streaming里一般不会使用repartition算子增大分区,因为repartition会进行shuffle增加耗时。


30.Spark Streaming 精准一次消费

  • 手动维护偏移量
  • 处理完业务数据后,再进行提交偏移量操作
  • 极端情况下,如在提交偏移量时断网或停电会造成spark程序第二次启动时重复消费问题,所以在涉及到金额或精确性非常高的场景会使用事务保证精准一次消费

Spark面试题——常问相关推荐

  1. 2022Java春招面试经历,mybatis面试题常问

    前言 在网络技术中基于浏览器的B/S结构无论在PC端还是手机端都充当着至关重要的角色. PC端自不必说,手机中很多应用虽然是以APP的形式存在,但它采用的还是B/S结构.如今日头条.微信的朋友圈等,这 ...

  2. 最常问的MySQL面试题集合

    除了基础题部分,本文还收集整理的MySQL面试题还包括如下知识点或题型: MySQL高性能索引 SQL语句 MySQL查询优化 MySQL高扩展高可用 MySQL安全性 问题1:char.varcha ...

  3. 2022最新Spring相关大厂常问技术面试题大全 —— 金三银四好时机

    Spring相关大厂常问面试题 1. 什么是 Spring 框架? 2. 列举一些重要的Spring模块? 3. @RestController 与 @Controller 的区别 4. 谈谈自己对于 ...

  4. Java继承_30道最常问的Java基础面试题

    30道最常问的Java基础面试题 1. 面向对象和面向过程的区别面向过程优点: 性能比面向对象高,因为类调用时需要实例化,开销比较大,比较消耗资源;比如单片机.嵌入式开发.Linux/Unix等一般采 ...

  5. 企业面试题|最常问的MySQL面试题集合(二)

    MySQL的关联查询语句 六种关联查询 交叉连接(CROSS JOIN) 内连接(INNER JOIN) 外连接(LEFT JOIN/RIGHT JOIN) 联合查询(UNION与UNION ALL) ...

  6. 2020年Java常问面试题--聂

    2020年Java常问面试题 打算这几天每天更新15~20题.(这样有助于你们阅读和理解!)我们先从简单的开始 加qq群:568680919,免费赠送1000道java面试题和简历模板 JDK 和 J ...

  7. 企业面试题|最常问的MySQL面试题集合(一)

    问题1:char.varchar的区别是什么? varchar是变长而char的长度是固定的.如果你的内容是固定大小的,你会得到更好的性能. 问题2: TRUNCATE和DELETE的区别是什么? D ...

  8. 软件测试常问100道面试题(含答案以及案例解析),全网最全最新

    软件测试常问100道面试题,找工作.招人必备之良品.后期不断完善中-- 面试完整版答案文末直接查看 1.您所熟悉的测试用例设计方法都有哪些?请分别以具体的例子来说明这些方法在测试用例设计工作中的应用. ...

  9. HTTP协议常问的面试题(吐血整理)

    HTTP协议常问的面试题(吐血整理) 1.http协议请求方式 : HTTP1.0定义了三种请求方法: GET, POST 和 HEAD方法 HTTP1.1新增了五种请求方法:OPTIONS, PUT ...

最新文章

  1. Dockerfile构建实践
  2. 深度学习的五个能力级别
  3. php post收不到值,php 取不到POST 值
  4. github(入门),不入门找卢姥爷
  5. day18 15.自定义连接池
  6. 字符串操作之字符串拷贝功能实现
  7. 揭开Vmware 虚拟软件层的秘密
  8. python解析器是什么_如何用python写一个简单的词法分析器
  9. Linux下不停止服务,清空nohup.out文件
  10. java毕业设计开题报告论文基于JavaWeb项目实现的高校学生在线选课系统
  11. vue css下载字体并引入使用
  12. 【Wifi密码破解】安卓手机、电脑如何破解Wifi密码
  13. 八人抢答器讲解_八人竞赛抢答器需求分析
  14. Receiver Operating Characteristic(ROC)
  15. 震撼心灵、洗礼灵魂--【经典的大师参禅的禅语】
  16. 去除IDEA报黄色/灰色的重复代码的下划波浪线
  17. Android Watchdog框架解析、应用与改造(上)
  18. c++文件操作案例-----创建文本文件
  19. 微信小程序--自定义组件(超详细 从新建到使用)
  20. Objective-c:内存管理

热门文章

  1. 短小精悍的视频拍摄利器-Winavi Video Capture 绿色版_我是亲民_新浪博客
  2. IOS快捷指令-高德地图一键回家
  3. flask markdown 报错解决
  4. linux下c语言修改时间和时区,Linux 修改时间和时区为上海时区
  5. 婚礼拍得像MV一样,好感动
  6. Android恶意软件的行为总结
  7. 瘦死的骆驼比马大,通用破产,悍马卖给了四川一家公司
  8. 京东双十一销售额突破2000亿
  9. 深拷贝和浅拷贝开发常用方法总结
  10. 贪吃蛇游戏代码--初篇