一 数据接收并行度调优

通过网络接收数据的时候,比如kafka或者flume,会将数据反序列化,并存储在在Spark内存中。如果数据接收成为系统的瓶颈,那么可以考虑并行化接收数据。

1.1除了创建更多输入DStream和Receiver

每一个InputDStream都会在某个Worker上的Executor上启动一个Receiver,该Receiver接收一个数据流。因此可以通过创建多个InputDStream,并且配置他们接收数据源不同的分区数据,达到接收多个数据流的效果。比如说,一个接收两个kafka topic的InputDStream,可以拆分成2个InputDStream,每一个分别接收一个topic的数据。这样就会创建2个Receiver,从而并行的接收数据,进而提升吞吐量。多个DStream可以使用union算子进行联合,从而形成一个新的DStream,后续的操作都可以基于联合的DStream.

intnumStreams = 5;
List<JavaPairDStream<String,String>> kafkaStreams= new ArrayList<JavaPairDStream<String,String>>(numStreams);
for (int i= 0; i < numStreams; i++) {
    kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String,String> unifiedStream= streamingContext.union(kafkaStreams.get(0),kafkaStreams.subList(1,kafkaStreams.size()));
unifiedStream.print();

1.2 调整block 时间间隔

我们可以通过调整spark.streaming.blockInterval参数来设置产生一个block的时间间隔,默认是200ms。对于大多数Receiver来说,在将接收到的数据保存到Spark的BlockManager之前,都会将数据根据设置的时间间隔分成构造成不同的block,然后推送给BlockManager存储。每一个批次中block的数量决定了该批次对应的RDD的partition数量,以及针对该RDD执行转换操作的时候创建的task数量,每一个batch对应的task数量大约是 = (batch 时间间隔)/(block时间间隔) ,即batch 时间间隔为1s,block时间间隔为200ms,相当于一个batch会包括5个block,即batch对应的RDD就有5个分区,也就决定task的数量是5.

如何认定batch对应的task数量太少呢?

如果每个batch的task数量低于每台机器的CPU Core数量,那么就说明batch的task数量是不够的,因为所有的CPU资源无法完全被利用起来。

要为batch增加block的数量,那么就减小block interval。然而,推荐的block interval最小值是50ms,如果低于这个数值,那么大量task的启动时间,可能会变成一个性能开销点。

1.3 调大task数量

如果并行的task数量不是很多,那么集群资源无法充分利用,举例来说对于分布式的reduce操作,比如reduceByKeyAndWindow或者reduceByKey等操作,默认的并行度是由spark.default. parallelism参数决定的,你可以在reduceByKey操作中传入第二个参数,手动指定并行度,也可以调整全局的spark.default.parallelism参数

如果输入的数据流,如果你觉得分区数目太少,也可以对输入的数据流重新分区,显示对输入数据流执行repartition操作,这样调大分区数目,那么task的数量也就大了

二 任务启动优化

如果每一秒钟启动的task数量太多,假设50个,即1s的batch时间间隔和50ms的block时间间隔。如果发送这些task去worker上的executor,那么性能开销会比较大,这是很难到到毫秒级的延迟了。

2.1task序列化

使用Kryo序列化类库来序列化task,可以减小task的大小从而减少driver发送这些task到各个executor的发送时间,即节省网络资源

2.2 执行模式

在standalone模式下运行spark,可以达到更少的task启动时间

三 数据序列化优化

数据序列化造成的系统开销可以由序列化格式的优化来减小。在流式场景下,有两种类型的数据需要序列化:

1 输入数据:默认情况下,接收到的输入数据,是存储在Executor内存中,使用的持久化级别是MEMORY_AND_DISK_2.这就意味着数据被序列化字节从而减小GC开销,并且还会复制到其他节点已进行容错。因此Receiver必须反序列化接收到的数据,然后在使用Spark的序列化格式序列化数据

2 流式计算操作生成持久化的RDD: 流式计算生成持久化RDD,可能会持久化到内存,这里默认持久化级别就是MEMORY_ONLY_SER,默认就会减小GC开销。

四 batch时间间隔优化

batch应该在生成之后就尽可能块的处理掉,对于一个应用来说,可以通过观察Spark UI上batch的处理时间来定。batch的处理时间必须小于batch 时间间隔,假设batch 时间间隔1s, 那么这个批次的处理时间不应超过1s

为应用计算正确batch比较好的办法:

给定一个很保守的batch interval,比如5s-10s,以很慢的数据接受速率进行测试,要检查应用是否你跟的上这个数据接收速率,可以检查每一个batch的处理时间的延迟,如果处理时间与batch interval基本吻合,那么应用就是稳定的,否则如果batch调度延迟持续增长,那么就意味着应用无法跟得上这个速率,也就是不稳定的。记住,由于临时性的数据增长导致的暂时的延迟增长,可以合理的,只要延迟情况可以在短时间内恢复即可。

五 内存调优

如果想要使用一个窗口长度为10分钟的window操作,那么集群就必须有足够的内存来保存10分钟内的数据。如果想要使用updateStateByKey来维护许多key的state,那么你的内存资源就必须足够大。反过来说,如果想要做一个简单的map-filter-store操作,那么需要使用的内存就很少。

通常来说,通过Receiver接收到的数据,会使用MEMORY_AND_DISK_SER_2持久化级别来进行存储,因此无法保存在内存中的数据会溢写到磁盘上。而溢写到磁盘上,是会降低应用的性能的。因此,通常是建议为应用提供它需要的足够的内存资源。建议在一个小规模的场景下测试内存的使用量,并进行评估。

内存调优的另外一个方面是垃圾回收。对于流式应用来说,如果要获得低延迟,肯定不想要有因为JVM垃圾回收导致的长时间延迟。有很多参数可以帮助降低内存使用和GC开销:

5.1DStream的持久化

默认持久化的时候会序列化为字节,与非序列化的方式相比,这会降低内存和GC开销。使用Kryo序列化机制可以进一步减少内存使用和GC开销。如果还要进一步降低内库存是用了,可以进行数据压缩,spark.rdd.compress参数控制(默认false)。

但是CPU资源的消耗可能就大了。

5.2 清理旧数据

默认情况下,所有输入数据和通过DStreamtransformation操作生成的持久化RDD,会自动被清理。Spark Streaming会决定何时清理这些数据,取决于transformation操作类型。

例如,你在使用窗口长度为10分钟内的window操作,Spark会保持10分钟以内的数据,时间过了以后就会清理旧数据。但是在某些特殊场景下,比如Spark SQL和Spark Streaming整合使用时,在异步开启的线程中,使用Spark SQL针对batch RDD进行执行查询。那么就需要让Spark保存更长时间的数据,直到Spark SQL查询结束。可以使用streamingContext.remember()方法来实现。

5.3CMS垃圾回收器

使用并行的mark-sweep垃圾回收机制,被推荐使用,用来保持GC低开销。虽然并行的GC会降低吞吐量,但是还是建议使用它,来减少batch的处理时间(降低处理过程中的GC开销)。如果要使用,那么要在driver端和executor端都开启。

在spark-submit中使用--driver-java-options设置使spark.executor.extra

JavaOptions参数设置-XX:+UseConcMarkSweepGC。

spark streaming性能优化相关推荐

  1. Spark Streaming性能优化: 如何在生成环境下应对流数据峰值巨变

    1.为什么引入Backpressure 默认情况下,Spark Streaming通过Receiver以生产者生产数据的速率接收数据,计算过程中会出现batch processing time > ...

  2. Spark的性能优化案例分析(下)

    前言 Spark的性能优化案例分析(上),介绍了软件性能优化必须经过进行性能测试,并在了解软件架构和技术的基础上进行.今天,我们通过几个 Spark 性能优化的案例,看一看所讲的性能优化原则如何落地. ...

  3. Spark之性能优化(重点:并行流数据接收)

    问题导读 1.如何减少批数据的执行时间? 2.Spark有哪些方面的性能优化? 3.有哪些错误我们需要关心? (一)减少批数据的执行时间 在Spark中有几个优化可以减少批处理的时间.这些可以在优化指 ...

  4. Spark SQL性能优化

    性能优化参数 针对Spark SQL 性能调优参数如下: 代码示例 import java.util.List;import org.apache.spark.SparkConf; import or ...

  5. spark sql 性能优化

    一 设置shuffle的并行度 我们可以通过属性spark.sql.shuffle.partitions设置shuffle并行度 二 Hive数据仓库建设的时候,合理设置数据类型,比如你设置成INT的 ...

  6. Spark程序性能优化之persist()

    Spark的RDD Persistence,是一个重要的能力,可以将中间结果保存,提供复用能力,加速基于中间结果的后续计算,经常可以提高10x以上的性能.在PySpark的DataFrame中同样适用 ...

  7. Spark Streaming 实时计算在甜橙金融监控系统中的应用、性能优化、任务监控

    1 写在前面 目前公司对实时性计算的需要及应用越来越多,本文选取了其中之一的 Spark Streaming 来介绍如何实现高吞吐量并具备容错机制的实时流应用.在甜橙金融监控系统项目中,需要对每天亿万 ...

  8. Spark SQL运行流程及性能优化:RBO和CBO

    1 Spark SQL运行流程 1.1 Spark SQL核心--Catalyst Spark SQL的核心是Catalyst查询编译器,它将用户程序中的SQL/Dataset/DataFrame经过 ...

  9. Spark Streaming实践和优化

    2019独角兽企业重金招聘Python工程师标准>>> Spark Streaming实践和优化 博客分类: spark 在流式计算领域,Spark Streaming和Storm时 ...

最新文章

  1. php 任意文件上传,任意文件上传漏洞
  2. python为什么那么难学_Python入门很难吗? 为什么越来越多的人都学Python?
  3. webView loadData 中文乱码问题
  4. 数据库事务的介绍和使用
  5. java 事务处理 是不是aop思想_理解原生JAVA AOP思想
  6. 吹爆google colab
  7. Unity 2017 Game Optimization 读书笔记 Dynamic Graphics (6)
  8. mysql 查询 带数据库实例_数据库查询实例(包含所有where条件例子)
  9. centos 6.6 mysql5.7_centos6.6 下安装mysql5.7
  10. 为什么 Python 不用设计模式?
  11. php 正则断言里面使用*+
  12. 利用python在word文档中查找关键字(支持多个文档和多个关键字)
  13. CFA一级学习笔记--衍生品(二)--定价与估值
  14. 数学符号、希腊、拉丁字母、单位、标点的中英文读法
  15. 在微信里接收文件后如何指着文件用咱编的APP打开?
  16. 北京国际学校ib成绩排名如何?
  17. JAVA-多线程 三 {多线程状态}JAVA从基础开始 -- 3
  18. 计算机科学 期刊怎么样,《计算机科学》杂志怎么样?提交的好吗?
  19. 汉字在计算机上的表达方式
  20. (开源)arduino和ESP8266-01制作数据监测系统+手机App实时显示

热门文章

  1. mysql 多对多_mysql多对一、多对多查询实践
  2. 稳定性之重试,如何优雅地重试,防止系统雪崩
  3. 【系统架构设计师】软考高级职称,一次通过,倾尽所有,2016年下半年系统架构设计师考试论文真题(论述软件设计模式技术及应用)
  4. curl -s http://192.168.232.191/openapi/v2 | jq 不显示JSON格式的文档说明
  5. Deep Learning of Binary Hash Codes for Fast Image Retrieval(2015)
  6. git配置中文乱码_解决git中文乱码问题
  7. 北京计算机组织专家对,全球顶级专家齐聚北京 探讨计算机产业“大挑战”
  8. 解决新版DBUtils使用连接池from DBUtils.PooledDB import PooledDB报错
  9. simpy练习案例(二):小车运行与充电
  10. Hurst exponent(赫斯特指数)代码与R/S值计算——python