1.Steaming 的数据处理大致流程
Receiving(数据的接收器) —> Transforming(你写的数据处理代码) --> Outputing(把处理的数据保存起来)

那么问题来了:
1.数据的发送速率突然间急剧变化,怎么处理?
2.数据转化处理的速度每天都不一样,怎么办?
3.当写外部数据库或者HDFS文件的时候突然间速度慢了下来,怎么解决?

以上三个问题,的解决办法如下:
Backpressure(压力反馈),解决的问题:1,3
Elastic Scaling(弹性伸缩),解决的问题:2

好了不要说了,下面开始直接进入主题,怎么去根据上面两个来调优

1.控制Receiver的数量
就比如,你有多个数据接收源,比如是socket的数据源,你用一个接收器去接收,就好比,你只能吃一碗饭,给你两碗~ 因此你需要找个人来一起吃
比如:

 //创建多个接收器(ReceiverInputDStream),这个接收器接收一台机器上的某个端口通过socket发送过来的数据并处理val lines1 = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER)val lines2 = ssc.socketTextStream("master", 9997, StorageLevel.MEMORY_AND_DISK_SER)// 联合起来组成一个,然后再进行处理val lines = lines1.union(lines2)

2.控制Receiver数据块的数量
更多的文字描述参考:https://blog.csdn.net/kwu_ganymede/article/details/50577920,我就不再啰嗦啦
batchInterval : 触发批处理的时间间隔
blockInterval(spark.streaming.blockInterval(默认是200ms)
) : 将接收到的数据生成Block的时间间隔
那么,BlockRDD的分区数 = batchInterval / blockInterval
即一个Block就是RDD的一个分区,就是一个task
比如,batchInterval是2秒,而blockInterval是200ms,那么task数为10
如果task的数量太少,比一个executor的core数还少的话,那么可以减少blockInterval
blockInterval最好不要小于50ms,太小的话导致task数太多,那么launch task的时间久多了
3.Receiver接受数据的速率
permits per second 每秒允许接受的数据量
Spark Streaming默认的PPS是没有限制的
可以通过参数spark.streaming.receiver.maxRate来控制,默认是Long.Maxvalue 这个根据你的实际业务来设置
4.数据处理的并行度
1.BlockRDD的分区数
通过Receiver接受数据的特点决定
也可以自己通过repartition设置
2.ShuffleRDD的分区数
默认的分区数为spark.default.parallelism(core的大小)
通过我们自己设置决定
5.数据的序列化
两种需要序列化的数据:
1.输入数据
默认是以StorageLevel.MEMORY_AND_DISK_SER_2的形式存储在executor上的内存中
MEMORY_AND_DISK 表示是先存内存,内存不够了再存入磁盘中去,SER是代表序列化的意思,2是存两份
2.Streaming操作中产生的缓存RDD
默认是以StorageLevel.MEMORY_ONLY_SER的形式存储的内存中
一般情况下我们是使用Kryo序列化机制,因为它比Java序列化机制性能好,
通过sparkConf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)去设置
6.内存调优
1.需要内存大小
和transform类型有关系
数据存储的级别
2.GC
driver端和executor端都使用CMS垃圾收集器
原因:
CMS(Concurrent Mark Sweep)收集器
是一种以获取最短回收停顿时间为目标的收集器
通过–driver-java-options和spark.executor.extraJavaOptions来设置
7.Output性能
比如我们保存数据到数据库中去的时候:
以简单的WordCount来举个例子

 // rdd 每次遍历的rdd  time 每次运行的时间wordCounts.foreachRDD { (rdd, time) =>rdd.foreachPartition { partitionRecords =>  // foreachPartition 每个分区只创建一个连接val conn = ConnectionPool.getConnection  // 连接池conn.setAutoCommit(false)  // 手动的打开事物val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")  // 这个wordcount表示你事先就已经创建好的partitionRecords.zipWithIndex.foreach { case ((word, count), index) =>statement.setLong(1, time.milliseconds)statement.setString(2, word)statement.setInt(3, count)statement.addBatch()  // 批处理if (index != 0 && index % 500 == 0) {  // 分批进行操作 每500条处理一次statement.executeBatch()conn.commit()}}// 最后不足500条还是要提交statement.executeBatch()  //数据都放一起,和数据库的通信一次就行了statement.close()conn.commit()conn.setAutoCommit(true)ConnectionPool.returnConnection(conn)}}

这种方式的保存效率,无疑是最高的
数据库的连接池代码如下:

import com.mchange.v2.c3p0.ComboPooledDataSource;  import java.sql.Connection;
import java.sql.SQLException;public class ConnectionPool {private static ComboPooledDataSource dataSource = new ComboPooledDataSource();static {dataSource.setJdbcUrl("jdbc:mysql://master:3306/test");//设置连接数据库的URLdataSource.setUser("root");//设置连接数据库的用户名dataSource.setPassword("root");//设置连接数据库的密码dataSource.setMaxPoolSize(40);//设置连接池的最大连接数dataSource.setMinPoolSize(2);//设置连接池的最小连接数dataSource.setInitialPoolSize(10);//设置连接池的初始连接数dataSource.setMaxStatements(100);//设置连接池的缓存Statement的最大数}public static Connection getConnection() {try {return dataSource.getConnection();} catch (SQLException e) {e.printStackTrace();}return null;}public static void returnConnection(Connection connection) {if (connection != null) {try {connection.close();} catch (SQLException e) {e.printStackTrace();}}}
}

8.Backpressure(压力反馈)
一个稳定的steaming程序是:规定2S计算一次,在这2S内饰可以计算完成的,如果超出了2S还没有计算完成,那么这个程序显然是不稳定的
比如:生产过剩:
Feedback Loop(反馈循环) : 动态使得Streaming app从unstable状态回到stable状态
就比如和kafka之间做个反馈,steaming在规定时间处理不完,就会限制输入的数据量,从而使得这个程序回到稳定的状态
比如:hdfs处理过慢
如果写HDFS的时候太慢,导致处理时间变长,那么Spark Streaming会限制从Kafka中接受数据的速度

如何配置压力反馈:
配置: spark.streaming.backpressure.enabled = true
这个配置是从1.5版本开始的

9.Elastic Scaling(资源动态分配)
Spark Batch Application 动态的决定这个application中需要多少个Executors
比如:
1、当一个Executor空闲的时候,将这个Executor杀掉,比如:一个2S的程序,1S处理完成了,那么steaming就会杀死一些Executor,以此来提高处理的时间,充分的利用集群的资源

2、当task太多的时候,动态的启动Executors,比如:2S程序,2S都还处理不完,那么就会再启动一些Executors,使得这个程序变成稳定的
Streaming分配Executor的原则是比对 process time / batchInterval 的比率
比如:
如果Kafka接受到的数据的速率比backpressure限制的速率还要快,那么Spark Streaming会增加Executors来增加处理的速率
Kafka接受到的数据将保存在Kafka中等待Streaming app接受速率的调节
如何配置:
spark.streaming.dynamicAllocation.enabled = true
这个是从2.0版本才有的

spark steaming的性能问题相关推荐

  1. Project Tungsten:让Spark将硬件性能压榨到极限

     Project Tungsten:让Spark将硬件性能压榨到极限 摘要:对于Spark来说,通用只是其目标之一,更好的性能同样是其赖以生存的立足之本.北京时间4月28日晚,Databricks ...

  2. 计算质数通过分区(Partition)提高Spark的运行性能(转载+自己理解)

    这篇博客是对[1]的进一步详细描述 自己的配置是台式机一台+笔记本组成spark集群 #-------------------------------------------------------- ...

  3. spark 应用程序性能优化经验

    一 常规性能调优 1 . 分配更多资源 --num-executors 3 \  配置executor的数量 --driver-memory 100m \  配置driver的内存(影响不大) --e ...

  4. 【spark】spark学习-27-Spark性能调优(2)

    文章目录 目的 继基础篇分析了开发调优与资源调优之后,本文作为拓展篇,将深入分析数据倾斜调优与shuffle调优,以解决更加棘手的性能问题. 数据倾斜调优 简述 有时候,大家可能会遇到大数据开发过程中 ...

  5. 通过分区(Partitioning)提高Spark的运行性能

    在Sortable公司,很多数据处理的工作都是使用Spark完成的.在使用Spark的过程中他们发现了一个能够提高Sparkjob性能的一个技巧,也就是修改数据的分区数,本文将举个例子并详细地介绍如何 ...

  6. Spark6:Spark Steaming

    Spark Steaming 一.流计算概述 二.Spark Streaming 三.DStream 四.文件流操作 五.套接字流 参考 一.流计算概述 静态数据.流数据 特点 实时处理.主动推送 大 ...

  7. 第一个 Spark Steaming 程序

    我的第三个发明专利也通过了内部专家的审核,我真是个创新满满的小天才亚,虽然说自己也赚到了一点小钱钱,但是和伟大的创作事业相比,那都是不值一提的小插曲.今天再次起航Spark Steaming的学习~ ...

  8. Spark Steaming快速入门

    Spark Steaming Spark Streaming 简介 什么是Spark Streaming Spark Streaming使用Spark Core的快速调度功能来执行流分析.它以小批量方 ...

  9. Spark Steaming流式日志过滤与分析

    Spark Steaming流式日志过滤与分析 这篇大概讲的是 spark steaming 监听 hdfs 的某个目录,当你在终端A使用 spark-submit 运行 Log2DB.py 文件后, ...

最新文章

  1. 新的一年,,,新的生活
  2. html css百分比效果,css百分比不起作用是什么原因?
  3. D-Link防火墙操作初步
  4. windows内存管理概述
  5. Hyperledger Fabric 1.0 实战开发系列 第三课 chaincode开发
  6. c# namespace不能和class的name 相同
  7. java耗时操作阻塞_springboot~高并发下耗时操作的实现
  8. 【Python图像特征的音乐序列生成】思路的转变
  9. .Net5 WPF快速入门系列教程
  10. 通过Main的Checkpoint Restore加快Java启动速度
  11. apk提取加密素材_从apk包中提取unity资源
  12. 将一个16进制数转化为10进制数
  13. Thrift实现C#调用Java开发步骤详解
  14. 应用id_科普贴:什么是OpenID、AppID 、用户ID等各种ID?
  15. 1小时搞懂设计模式之原型模式
  16. 电脑硬件故障维护小全
  17. 港股常见的宽基指数:恒生指数、H股指数和香港中小指数
  18. git基本命令及核心
  19. 计算机无法设置双屏显示,电脑怎么设置双屏或多屏显示?
  20. 程序员都是技术宅?他们完全刷新了我们对程序员的认知

热门文章

  1. DNS无法解析IP_DNS之基本原理
  2. 大一下学期第十一周及以前学习总结
  3. 魔术表演的核心秘密(六)——从障眼法到错误引导和案例分享
  4. 初探微信摇一摇周边与iBeacon
  5. VCC、 VDD、VSS、VEE 电压符号的解释
  6. 一个苹果成就了牛顿,一个苹果杀死了图灵。
  7. 正弦余弦编码器与增量编码器的区别
  8. 论文解读:医学影像中的注意力机制
  9. 华为公有云服务-计算类(2)
  10. 【2020年高被引学者】 杨笛一 佐治亚理工大学