1、为什么引入Backpressure

默认情况下,Spark Streaming通过Receiver以生产者生产数据的速率接收数据,计算过程中会出现batch processing time > batch interval的情况,其中batch processing time 为实际计算一个批次花费时间, batch interval为Streaming应用设置的批处理间隔。这意味着Spark Streaming的数据接收速率高于Spark从队列中移除数据的速率,也就是数据处理能力低,在设置间隔内不能完全处理当前接收速率接收的数据。如果这种情况持续过长的时间,会造成数据在内存中堆积,导致Receiver所在Executor内存溢出等问题(如果设置StorageLevel包含disk, 则内存存放不下的数据会溢写至disk, 加大延迟)。Spark 1.5以前版本,用户如果要限制Receiver的数据接收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate
”的值来实现,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。为了更好的协调数据接收速率与资源处理能力,Spark Streaming 从v1.5开始引入反压机制(back-pressure),通过动态控制数据接收速率来适配集群数据处理能力。
2、Backpressure
Spark Streaming Backpressure: 根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率。通过属性“spark.streaming.backpressure.enabled
”来控制是否启用backpressure机制,默认值false,即不启用。
2.1 Streaming架构如下图所示(详见Streaming数据接收过程文档和Streaming 源码解析)
2.2 BackPressure执行过程如下图所示:
在原架构的基础上加上一个新的组件RateController,这个组件负责监听“OnBatchCompleted”事件,然后从中抽取processingDelay 及schedulingDelay信息. Estimator依据这些信息估算出最大处理速度(rate),最后由基于Receiver的Input Stream将rate通过ReceiverTracker与ReceiverSupervisorImpl转发给BlockGenerator(继承自RateLimiter).
3、BackPressure 源码解析
3.1 RateController类体系
RatenController 继承自StreamingListener. 用于处理BatchCompleted事件。核心代码为:
**
* A StreamingListener that receives batch completion updates, and maintains
* an estimate of the speed at which this stream should ingest messages,
* given an estimate computation from a `RateEstimator`

*/

private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
extends StreamingListener with Serializable {
/**
* Compute the new rate limit and publish it asynchronously.
*/
private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
Future[Unit] {
val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)

newRate.foreach { s => rateLimit.set(s.toLong)

def getLatestRate(): Long = rateLimit.get()

publish(getLatestRate()) } }

override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
val elements = batchCompleted.batchInfo.streamIdToInputInfo
for {
processingEnd <- batchCompleted.batchInfo.processingEndTime

workDelay <- batchCompleted.batchInfo.processingDelay

elems <- elements.get(streamUID).map(_.numRecords)

waitDelay <- batchCompleted.batchInfo.schedulingDelay

}

} computeAndPublish(processingEnd, elems, workDelay, waitDelay)

}
3.2 RateController的注册
JobScheduler启动时会抽取在DStreamGraph中注册的所有InputDstream中的rateController,并向ListenerBus注册监听. 此部分代码如下:
def start(): Unit = synchronized {
if (eventLoop != null) return // scheduler has already been started
logDebug("Starting JobScheduler")
eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)

}

eventLoop.start()
// attach rate controllers of input streams to receive batch completion updates
for {
inputDStream <- ssc.graph.getInputStreams
rateController <- inputDStream.rateController
} ssc.addStreamingListener(rateController)</span>

listenerBus.start()

receiverTracker = new ReceiverTracker(ssc)
inputInfoTracker = new InputInfoTracker(ssc)
receiverTracker.start()

jobGenerator.start()

logInfo("Started JobScheduler")
}
3.3 BackPressure执行过程分析
BackPressure 执行过程分为BatchCompleted事件触发时机和事件处理两个过程
3.3.1 BatchCompleted触发过程
对BatchedCompleted的分析,应该从JobGenerator入手,因为BatchedCompleted是批次处理结束的标志,也就是JobGenerator产生的作业执行完成时触发的,因此进行作业执行分析。
Streaming 应用中JobGenerator每个Batch Interval都会为应用中的每个Output Stream建立一个Job, 该批次中的所有Job组成一个Job Set.使用JobScheduler的submitJobSet进行批量Job提交。此部分代码结构如下所示
/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
// Set the SparkEnv in this thread, so that job generation code can access the environment
// Example: BlockRDDs are created in this thread, and it needs to access BlockManager
// Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
SparkEnv.set(ssc.env)
// Checkpoint all RDDs marked for checkpointing to ensure their lineages are
// truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
Try {
jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
graph.generateJobs(time) // generate jobs using allocated block
} match {
case Success(jobs) =>
val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)

jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))

case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)

}

eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

其中,sumitJobSet会创建固定数量的后台线程(具体由“spark.streaming.concurrentJobs”指定),去处理Job Set中的Job. 具体实现逻辑为:

def submitJobSet(jobSet: JobSet) {
if (jobSet.jobs.isEmpty) {
logInfo("No jobs added for time " + jobSet.time)
} else {
listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))

jobSets.put(jobSet.time, jobSet)

jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
logInfo("Added jobs for time " + jobSet.time)

}

}
其中JobHandler用于执行Job及处理Job执行结果信息。当Job执行完成时会产生JobCompleted事件. JobHandler的具体逻辑如下面代码所示:
+ View Code

当Job执行完成时,向eventLoop发送JobCompleted事件。EventLoop事件处理器接到JobCompleted事件后将调用handleJobCompletion 来处理Job完成事件。handleJobCompletion使用Job执行信息创建StreamingListenerBatchCompleted事件并通过StreamingListenerBus向监听器发送。实现如下:

private def handleJobCompletion(job: Job, completedTime: Long) {
val jobSet = jobSets.get(job.time)

jobSet.handleJobCompletion(job)

listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))

job.setEndTime(completedTime)

logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
if (jobSet.hasCompleted) {
jobSets.remove(jobSet.time)
jobGenerator.onBatchCompletion(jobSet.time)
logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
jobSet.totalDelay / 1000.0, jobSet.time.toString,
jobSet.processingDelay / 1000.0
))
listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))

} job.result match {

case Failure(e) =>
reportError("Error running job " + job, e)
case _ =>

}

}
3.3.2、BatchCompleted事件处理过程
StreamingListenerBus将事件转交给具体的StreamingListener,因此BatchCompleted将交由RateController进行处理。RateController接到BatchCompleted事件后将调用onBatchCompleted对事件进行处理。
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
val elements = batchCompleted.batchInfo.streamIdToInputInfo
for {
processingEnd <- batchCompleted.batchInfo.processingEndTime

workDelay <- batchCompleted.batchInfo.processingDelay

elems <- elements.get(streamUID).map(_.numRecords)

waitDelay <- batchCompleted.batchInfo.schedulingDelay

} computeAndPublish(processingEnd, elems, workDelay, waitDelay)
}

onBatchCompleted会从完成的任务中抽取任务的执行延迟和调度延迟,然后用这两个参数用RateEstimator(目前存在唯一实现PIDRateEstimator,proportional-integral-derivative (PID) controller, PID控制器)估算出新的rate并发布。代码如下:

/**
* Compute the new rate limit and publish it asynchronously.
*/
private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =

Future[Unit] { val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)

newRate.foreach { s =>
rateLimit.set(s.toLong)
publish(getLatestRate())

}

}

其中publish()由RateController的子类ReceiverRateController来定义。具体逻辑如下(ReceiverInputDStream中定义):

/**
* A RateController that sends the new rate to receivers, via the receiver tracker.
*/
private[streaming] class ReceiverRateController(id: Int, estimator: RateEstimator)
extends RateController(id, estimator) {
override def publish(rate: Long): Unit =
ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
}

publish的功能为新生成的rate 借助ReceiverTracker进行转发。ReceiverTracker将rate包装成UpdateReceiverRateLimit事交ReceiverTrackerEndpoint

/** Update a receiver's maximum ingestion rate */
def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {
if (isTrackerStarted) {
endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))

}

}
ReceiverTrackerEndpoint接到消息后,其将会从receiverTrackingInfos列表中获取Receiver注册时使用的endpoint(实为ReceiverSupervisorImpl),再将rate包装成UpdateLimit发送至endpoint.其接到信息后,使用updateRate更新BlockGenerators(RateLimiter子类),来计算出一个固定的令牌间隔。
+ View Code

其中RateLimiter的updateRate实现如下:

/**
* Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by

* {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that. *

* @param newRate A new rate in events per second. It has no effect if it's 0 or negative.
*/
private[receiver] def updateRate(newRate: Long): Unit =
if (newRate > 0) {
if (maxRateLimit > 0) {
rateLimiter.setRate(newRate.min(maxRateLimit))
} else {
rateLimiter.setRate(newRate)

}

}

setRate的实现 如下:

public final void setRate(double permitsPerSecond) {
Preconditions.checkArgument(permitsPerSecond > 0.0
&& !Double.isNaN(permitsPerSecond), "rate must be positive");
synchronized (mutex) {
resync(readSafeMicros());
double stableIntervalMicros = TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond; //固定间隔
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);

}

}

到此,backpressure反压机制调整rate结束。

4.流量控制点
当Receiver开始接收数据时,会通过supervisor.pushSingle()方法将接收的数据存入currentBuffer等待BlockGenerator定时将数据取走,包装成block. 在将数据存放入currentBuffer之时,要获取许可(令牌)。如果获取到许可就可以将数据存入buffer, 否则将被阻塞,进而阻塞Receiver从数据源拉取数据。
/**
* Push a single data item into the buffer.
*/
def addData(data: Any): Unit = {
if (state == Active) {
waitToPush() //获取令牌
synchronized {
if (state == Active) {
currentBuffer += data
} else {
throw new SparkException(
"Cannot add data as BlockGenerator has not been started or has been stopped")
}

}

} else {
throw new SparkException(
"Cannot add data as BlockGenerator has not been started or has been stopped")
}

}

其令牌投放采用令牌桶机制进行, 原理如下图所示:

令牌桶机制: 大小固定的令牌桶可自行以恒定的速率源源不断地产生令牌。如果令牌不被消耗,或者被消耗的速度小于产生的速度,令牌就会不断地增多,直到把桶填满。后面再产生的令牌就会从桶中溢出。最后桶中可以保存的最大令牌数永远不会超过桶的大小。当进行某操作时需要令牌时会从令牌桶中取出相应的令牌数,如果获取到则继续操作,否则阻塞。用完之后不用放回。
Streaming 数据流被Receiver接收后,按行解析后存入iterator中。然后逐个存入Buffer,在存入buffer时会先获取token,如果没有token存在,则阻塞;如果获取到则将数据存入buffer. 然后等价后续生成block操作。

转自 http://www.cnblogs.com/barrenlake/p/5349949.html

作者:曹振华
链接:https://www.jianshu.com/p/87e2d66d92bb
來源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

Spark Streaming性能优化: 如何在生成环境下应对流数据峰值巨变相关推荐

  1. spark streaming性能优化

    一 数据接收并行度调优 通过网络接收数据的时候,比如kafka或者flume,会将数据反序列化,并存储在在Spark内存中.如果数据接收成为系统的瓶颈,那么可以考虑并行化接收数据. 1.1除了创建更多 ...

  2. Spark的性能优化案例分析(下)

    前言 Spark的性能优化案例分析(上),介绍了软件性能优化必须经过进行性能测试,并在了解软件架构和技术的基础上进行.今天,我们通过几个 Spark 性能优化的案例,看一看所讲的性能优化原则如何落地. ...

  3. 电动汽车动力系统匹配计算模型:输入整车参数及性能要求,一键生成驱动系统的扭矩功率峰值转速等参数

    1.电动汽车动力系统匹配计算模型:输入整车参数及性能要求,一键生成驱动系统的扭矩功率峰值转速等参数. 2.整车动力经济性计算模型:包含NEDC WLTC CLTC工况,输入整车参数可生成工况电耗.百公 ...

  4. python处理mysql数据结构_python环境下使用mysql数据及数据结构和二叉树算法(图)...

    python环境下使用mysql数据及数据结构和二叉树算法(图): 1 python环境下使用mysql 2使用的是 pymysql库 3 开始-->创建connection-->获取cu ...

  5. python实现mysql二叉树_python环境下使用mysql数据及数据结构和二叉树算法(图)...

    python环境下使用mysql数据及数据结构和二叉树算法(图): 1 python环境下使用mysql 2使用的是 pymysql库 3 开始-->创建connection-->获取cu ...

  6. mysql 控制台环境下查询中文数据乱码,插入、更新中文数据不成功

    mysql 控制台环境下查询中文数据乱码,插入.更新中文数据不成功 登录mysql密码是加入编码参数--default-character-set,中文用gbk mysql -uroot -pabct ...

  7. Spark之性能优化(重点:并行流数据接收)

    问题导读 1.如何减少批数据的执行时间? 2.Spark有哪些方面的性能优化? 3.有哪些错误我们需要关心? (一)减少批数据的执行时间 在Spark中有几个优化可以减少批处理的时间.这些可以在优化指 ...

  8. Spark SQL性能优化

    性能优化参数 针对Spark SQL 性能调优参数如下: 代码示例 import java.util.List;import org.apache.spark.SparkConf; import or ...

  9. Fluid — 云原生环境下的高效“数据物流系统”

    作者 | 顾荣  南京大学 PASALab (注:本文基于作者公开演讲报告内容整理完成) _来源 | _阿里巴巴云原生公众号 得益于计算成本低.易于扩展.部署便捷.运维高效等多方面的优势,云计算平台吸 ...

最新文章

  1. 【alibaba-cloud】Gateway网关
  2. ImportError: No module named babel.dates
  3. boost avplayer
  4. Docker 搜索镜像
  5. python中的魔法属性和方法
  6. 获取socket对应的接收缓冲区中的可读数据量
  7. 移动端使用页尾文字使用绝对定位遇到input框会飘起来的处理方案
  8. Android-Note
  9. Node.js 应用故障排查手册 —— 综合性 GC 问题和优化
  10. stm32关定时器_STM32F103ZET6的基本定时器
  11. PocketLibs(3)—— 进度条 NProgress
  12. 2020年日历电子版(打印版)_2020年日历电子版(打印版)79451
  13. 【机器学习】无监督学习--(聚类)K-Means
  14. python教程七牛云_通过Python来使用七牛云存储的方法详解
  15. Ubuntu 18.04下搭建单机Hadoop和Spark集群环境
  16. luarocks安装以及lfs安装
  17. IPS(入侵防御系统)技术
  18. Computer Networking——transport layer QA
  19. HDU 6287 口算训练(分解质因子 + 二分下标)
  20. 魔方(7)五魔方、二阶五魔方

热门文章

  1. python实现单纯形法迭代形式(待更新人工法+对偶)
  2. 【Kaggle微课程】Natural Language Processing - 1. Intro to NLP
  3. 我对程序员做副业的看法
  4. 复杂度控制与“道法自然”
  5. SugarCRM商业客户关系管理软件介绍
  6. linux中wait与waitpid
  7. OCAD应用:平行光路中打入型变焦系统设计
  8. 森林防火巡查采集总体质量控制
  9. 序列流_SequenceInputStream类
  10. Vue3应用API——use解析