问题导读
1、如何减少批数据的执行时间?
2、Spark有哪些方面的性能优化?
3、有哪些错误我们需要关心?

(一)减少批数据的执行时间
在Spark中有几个优化可以减少批处理的时间。这些可以在优化指南中作了讨论。这节重点讨论几个重要的。

数据接收的并行水平

通过网络(如kafka,flume,socket等)接收数据需要这些数据反序列化并被保存到Spark中。如果数据接收成为系统的瓶颈,就要考虑并行地接收数据。注意,每个输入DStream创建一个receiver(运行在worker机器上) 接收单个数据流。创建多个输入DStream并配置它们可以从源中接收不同分区的数据流,从而实现多数据流接收。例如,接收两个topic数据的单个输入DStream可以被切分为两个kafka输入流,每个接收一个topic。这将 在两个worker上运行两个receiver,因此允许数据并行接收,提高整体的吞吐量。多个DStream可以被合并生成单个DStream,这样运用在单个输入DStream的transformation操作可以运用在合并的DStream上。

  1. val numStreams = 5
  2. val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
  3. val unifiedStream = streamingContext.union(kafkaStreams)
  4. unifiedStream.print()

复制代码

另外一个需要考虑的参数是receiver的阻塞时间。对于大部分的receiver,在存入Spark内存之前,接收的数据都被合并成了一个大数据块。每批数据中块的个数决定了任务的个数。这些任务是用类 似map的transformation操作接收的数据。阻塞间隔由配置参数spark.streaming.blockInterval决定,默认的值是200毫秒。

多输入流或者多receiver的可选的方法是明确地重新分配输入数据流(利用inputStream.repartition(<number of partitions>)),在进一步操作之前,通过集群的机器数分配接收的批数据。

数据处理的并行水平

如果运行在计算stage上的并发任务数不足够大,就不会充分利用集群的资源。例如,对于分布式reduce操作如reduceByKey和reduceByKeyAndWindow,默认的并发任务数通过配置属性来确定(configuration.html#spark-properties) spark.default.parallelism。你可以通过参数(PairDStreamFunctions (api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions))传递并行度,或者设置参数 spark.default.parallelism修改默认值。

数据序列化

数据序列化的总开销是平常大的,特别是当sub-second级的批数据被接收时。下面有两个相关点:

  • Spark中RDD数据的序列化。关于数据序列化请参照Spark优化指南。注意,与Spark不同的是,默认的RDD会被持久化为序列化的字节数组,以减少与垃圾回收相关的暂停。
  • 输入数据的序列化。从外部获取数据存到Spark中,获取的byte数据需要从byte反序列化,然后再按照Spark的序列化格式重新序列化到Spark中。因此,输入数据的反序列化花费可能是一个瓶颈。

任务的启动开支

每秒钟启动的任务数是非常大的(50或者更多)。发送任务到slave的花费明显,这使请求很难获得亚秒(sub-second)级别的反应。通过下面的改变可以减小开支

  • 任务序列化。运行kyro序列化任何可以减小任务的大小,从而减小任务发送到slave的时间。
  • 执行模式。在Standalone模式下或者粗粒度的Mesos模式下运行Spark可以在比细粒度Mesos模式下运行Spark获得更短的任务启动时间。可以在在Mesos下运行Spark中获取更多信息。

These changes may reduce batch processing time by 100s of milliseconds, thus allowing sub-second batch size to be viable.

(二)设置正确的批容量
为了Spark Streaming应用程序能够在集群中稳定运行,系统应该能够以足够的速度处理接收的数据(即处理速度应该大于或等于接收数据的速度)。这可以通过流的网络UI观察得到。批处理时间应该小于批间隔时间。

根据流计算的性质,批间隔时间可能显著的影响数据处理速率,这个速率可以通过应用程序维持。可以考虑WordCountNetwork这个例子,对于一个特定的数据处理速率,系统可能可以每2秒打印一次单词计数 (批间隔时间为2秒),但无法每500毫秒打印一次单词计数。所以,为了在生产环境中维持期望的数据处理速率,就应该设置合适的批间隔时间(即批数据的容量)。

找出正确的批容量的一个好的办法是用一个保守的批间隔时间(5-10,秒)和低数据速率来测试你的应用程序。为了验证你的系统是否能满足数据处理速率,你可以通过检查端到端的延迟值来判断(可以在 Spark驱动程序的log4j日志中查看"Total delay"或者利用StreamingListener接口)。如果延迟维持稳定,那么系统是稳定的。如果延迟持续增长,那么系统无法跟上数据处理速率,是不稳定的。 你能够尝试着增加数据处理速率或者减少批容量来作进一步的测试。注意,因为瞬间的数据处理速度增加导致延迟瞬间的增长可能是正常的,只要延迟能重新回到了低值(小于批容量)。

(三)内存调优
调整内存的使用以及Spark应用程序的垃圾回收行为已经在Spark优化指南中详细介绍。在这一节,我们重点介绍几个强烈推荐的自定义选项,它们可以 减少Spark Streaming应用程序垃圾回收的相关暂停,获得更稳定的批处理时间。

  • Default persistence level of DStreams:和RDDs不同的是,默认的持久化级别是序列化数据到内存中(DStream是StorageLevel.MEMORY_ONLY_SER,RDD是StorageLevel.MEMORY_ONLY)。 即使保存数据为序列化形态会增加序列化/反序列化的开销,但是可以明显的减少垃圾回收的暂停。
  • Clearing persistent RDDs:默认情况下,通过Spark内置策略(LUR),Spark Streaming生成的持久化RDD将会从内存中清理掉。如果spark.cleaner.ttl已经设置了,比这个时间存在更老的持久化 RDD将会被定时的清理掉。正如前面提到的那样,这个值需要根据Spark Streaming应用程序的操作小心设置。然而,可以设置配置选项spark.streaming.unpersist为true来更智能的去持久化(unpersist)RDD。这个 配置使系统找出那些不需要经常保有的RDD,然后去持久化它们。这可以减少Spark RDD的内存使用,也可能改善垃圾回收的行为。
  • Concurrent garbage collector:使用并发的标记-清除垃圾回收可以进一步减少垃圾回收的暂停时间。尽管并发的垃圾回收会减少系统的整体吞吐量,但是仍然推荐使用它以获得更稳定的批处理时间。

(四)容错语义
这一节,我们将讨论在节点错误事件时Spark Streaming的行为。为了理解这些,让我们先记住一些Spark RDD的基本容错语义。

  • 一个RDD是不可变的、确定可重复计算的、分布式数据集。每个RDD记住一个确定性操作的谱系(lineage),这个谱系用在容错的输入数据集上来创建该RDD。
  • 如果任何一个RDD的分区因为节点故障而丢失,这个分区可以通过操作谱系从源容错的数据集中重新计算得到。
  • 假定所有的RDD transformations是确定的,那么最终转换的数据是一样的,不论Spark机器中发生何种错误。

Spark运行在像HDFS或S3等容错系统的数据上。因此,任何从容错数据而来的RDD都是容错的。然而,这不是在Spark Streaming的情况下,因为Spark Streaming的数据大部分情况下是从 网络中得到的。为了获得生成的RDD相同的容错属性,接收的数据需要重复保存在worker node的多个Spark executor上(默认的复制因子是2),这导致了当出现错误事件时,有两类数据需要被恢复

  • Data received and replicated :在单个worker节点的故障中,这个数据会幸存下来,因为有另外一个节点保存有这个数据的副本。
  • Data received but buffered for replication:因为没有重复保存,所以为了恢复数据,唯一的办法是从源中重新读取数据。

有两种错误我们需要关心

  • worker节点故障:任何运行executor的worker节点都有可能出故障,那样在这个节点中的所有内存数据都会丢失。如果有任何receiver运行在错误节点,它们的缓存数据将会丢失
  • Driver节点故障:如果运行Spark Streaming应用程序的Driver节点出现故障,很明显SparkContext将会丢失,所有执行在其上的executors也会丢失。

作为输入源的文件语义(Semantics with files as input source)
如果所有的输入数据都存在于一个容错的文件系统如HDFS,Spark Streaming总可以从任何错误中恢复并且执行所有数据。这给出了一个恰好一次(exactly-once)语义,即无论发生什么故障, 所有的数据都将会恰好处理一次。

基于receiver的输入源语义

对于基于receiver的输入源,容错的语义既依赖于故障的情形也依赖于receiver的类型。正如之前讨论的,有两种类型的receiver

Reliable Receiver:这些receivers只有在确保数据复制之后才会告知可靠源。如果这样一个receiver失败了,缓冲(非复制)数据不会被源所承认。如果receiver重启,源会重发数 据,因此不会丢失数据。
Unreliable Receiver:当worker或者driver节点故障,这种receiver会丢失数据
选择哪种类型的receiver依赖于这些语义。如果一个worker节点出现故障,Reliable Receiver不会丢失数据,Unreliable Receiver会丢失接收了但是没有复制的数据。如果driver节点 出现故障,除了以上情况下的数据丢失,所有过去接收并复制到内存中的数据都会丢失,这会影响有状态transformation的结果。

为了避免丢失过去接收的数据,Spark 1.2引入了一个实验性的特征write ahead logs,它保存接收的数据到容错存储系统中。有了write ahead logs和Reliable Receiver,我们可以 做到零数据丢失以及exactly-once语义。

下面的表格总结了错误语义:

输出操作的语义

根据其确定操作的谱系,所有数据都被建模成了RDD,所有的重新计算都会产生同样的结果。所有的DStream transformation都有exactly-once语义。那就是说,即使某个worker节点出现 故障,最终的转换结果都是一样。然而,输出操作(如foreachRDD)具有at-least once语义,那就是说,在有worker事件故障的情况下,变换后的数据可能被写入到一个外部实体不止一次。 利用saveAs***Files将数据保存到HDFS中的情况下,以上写多次是能够被接受的(因为文件会被相同的数据覆盖)。

Spark之性能优化(重点:并行流数据接收)相关推荐

  1. Spark的性能优化案例分析(下)

    前言 Spark的性能优化案例分析(上),介绍了软件性能优化必须经过进行性能测试,并在了解软件架构和技术的基础上进行.今天,我们通过几个 Spark 性能优化的案例,看一看所讲的性能优化原则如何落地. ...

  2. Spark Streaming性能优化: 如何在生成环境下应对流数据峰值巨变

    1.为什么引入Backpressure 默认情况下,Spark Streaming通过Receiver以生产者生产数据的速率接收数据,计算过程中会出现batch processing time > ...

  3. spark streaming性能优化

    一 数据接收并行度调优 通过网络接收数据的时候,比如kafka或者flume,会将数据反序列化,并存储在在Spark内存中.如果数据接收成为系统的瓶颈,那么可以考虑并行化接收数据. 1.1除了创建更多 ...

  4. spark sql 性能优化

    一 设置shuffle的并行度 我们可以通过属性spark.sql.shuffle.partitions设置shuffle并行度 二 Hive数据仓库建设的时候,合理设置数据类型,比如你设置成INT的 ...

  5. rdd数据存内存 数据量_Spark 性能优化(二)——数据倾斜优化

    1.2 数据倾斜优化 1.2.1 为何要处理数据倾斜(Data Skew) 什么是数据倾斜?对 Spark/Hadoop 这样的大数据系统来讲,数据量大并不可怕,可怕的是数据倾斜. 何谓数据倾斜?数据 ...

  6. Spark SQL性能优化

    性能优化参数 针对Spark SQL 性能调优参数如下: 代码示例 import java.util.List;import org.apache.spark.SparkConf; import or ...

  7. 第10课:Spark Streaming源码解读之流数据不断接收全生命周期彻底研究和思考

    特别说明:  在上一遍文章中有详细的叙述Receiver启动的过程,如果不清楚的朋友,请您查看上一篇博客,这里我们就基于上篇的结论,继续往下说. 博文的目标是:  Spark Streaming在接收 ...

  8. 【MySQL性能优化系列】百万数据limit分页优化

    背景 众所周知,在使用limit分页过程中,随着前端传过来的PageSize越来越大,查询速度会越来越慢.那有什么优化的办法呢? 本文将通过百万数据表进行演示和优化, 欲知详情,请看下文分解. lim ...

  9. 鲲鹏服务器php性能,对鲲鹏服务器的内存进行性能优化后的前后数据对比

    测试的鲲鹏服务器是96核2numa节点,内存测试用的是Imbench里面的stream. 优化说明,bios和绑核优化后性能提升明显,编译参数优化后性能提升不明显可以根据情况是否进行编译优化 1.Bi ...

最新文章

  1. openstack页面自定义插件使用详解(django、ajax、post)(zTree为例)
  2. 7 Papers Radios | 机器人「造孩子」;谷歌裸眼3D全息视频聊天技术公开
  3. python工程师-Python工程师学习之旅
  4. 基于CentOS 6.8平台最新源代码包编译安装企业版MariaDB数据库
  5. minikube报错:This computer doesn’t have VT-X/AMD-v enabled. Enabling it in the BIOS is mandatory.
  6. java -jar debug_java – 如何在运行时调试jar?
  7. 音频放大电路_详细分析:电容器的四个典型应用电路图
  8. 多节点 devstack 部署
  9. C#读取数据库返回泛型集合(DataSetToList)
  10. 【毕设狗】【单片机毕业设计】基于单片机的智能垃圾桶设计-实物设计
  11. Linux内核加载f2fs,固态硬盘使用f2fs文件系统作为deepin引导分区经验
  12. cesium 经纬度绘制点_Cesium搜索经纬度并标点
  13. 加拿大渥太华民众寒冬享受运河滑冰道乐趣
  14. 爬虫实战 爬取谷歌图片 Google images
  15. mysql微信昵称存储_mysql保存微信昵称特殊字符的方法
  16. 哈希函数(散列函数)详解
  17. c程序设计总结(红皮书+真题)
  18. Cesium地表透明(地下模式)
  19. win10的IE闪退及“启用或关闭windows功能”里没有IE选项
  20. 想做好流程管理,你一定要知道这些

热门文章

  1. 校外分散实习(14)
  2. 在线代码片段管理工具gistbox + github
  3. 盛夏光年——14年暑期总结
  4. 一个字符串排列的小算法
  5. tlplayer for ios V1.0
  6. 《童梦奇缘-梦幻般的羁绊》第一章-朦胧
  7. 全局缓存管理工具-安装部署时提供小小的方便
  8. zabbix mysql安装配置_ZABBIX4.4 安装及配置
  9. 計算機二級-java04
  10. 基础知识—函数-默然参数