Spark Streaming实例
Spark Streaming实例分析
转载地址:http://www.aboutyun.com/thread-8901-1-1.html
这一章要讲Spark Streaming,讲之前首先回顾下它的用法,具体用法请参照《Spark Streaming编程讲解 》。
Example代码分析
- val ssc = new StreamingContext(sparkConf, Seconds(1));
- // 获得一个DStream负责连接 监听端口:地址
- val lines = ssc.socketTextStream(serverIP, serverPort);
- // 对每一行数据执行Split操作
- val words = lines.flatMap(_.split(" "));
- // 统计word的数量
- val pairs = words.map(word => (word, 1));
- val wordCounts = pairs.reduceByKey(_ + _);
- // 输出结果
- wordCounts.print();
- ssc.start(); // 开始
- ssc.awaitTermination(); // 计算完毕退出
复制代码
1、首先实例化一个StreamingContext
2、调用StreamingContext的socketTextStream
3、对获得的DStream进行处理
4、调用StreamingContext是start方法,然后等待
我们看StreamingContext的socketTextStream方法吧。
- def socketTextStream(
- hostname: String,
- port: Int,
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): ReceiverInputDStream[String] = {
- socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
- }
复制代码
1、StoageLevel是StorageLevel.MEMORY_AND_DISK_SER_2
2、使用SocketReceiver的bytesToLines把输入流转换成可遍历的数据
继续看socketStream方法,它直接new了一个
- new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
复制代码
继续深入挖掘SocketInputDStream,追述一下它的继承关系,SocketInputDStream>>ReceiverInputDStream>>InputDStream>>DStream。
具体实现ReceiverInputDStream的类有好几个,基本上都是从网络端来数据的。
它实现了ReceiverInputDStream的getReceiver方法,实例化了一个SocketReceiver来接收数据。
SocketReceiver的onStart方法里面调用了receive方法,处理代码如下:
- socket = new Socket(host, port)
- val iterator = bytesToObjects(socket.getInputStream())
- while(!isStopped && iterator.hasNext) {
- store(iterator.next)
- }
复制代码
1、new了一个Socket来结束数据,用bytesToLines方法把InputStream转换成一行一行的字符串。
2、把每一行数据用store方法保存起来,store方法是从SocketReceiver的父类Receiver继承而来,内部实现是:
- def store(dataItem: T) {
- executor.pushSingle(dataItem)
- }
复制代码
executor是ReceiverSupervisor类型,Receiver的操作都是由它来处理。这里先不深纠,后面我们再说这个pushSingle的实现。
到这里我们知道lines的类型是SocketInputDStream,然后对它是一顿的转换,flatMap、map、reduceByKey、print,这些方法都不是RDD的那种方法,而是DStream独有的。
讲到上面这几个方法,我们开始转入DStream了,flatMap、map、reduceByKey、print方法都涉及到DStream的转换,这和RDD的转换是类似的。我们讲一下reduceByKey和print。
reduceByKey方法和RDD一样,调用的combineByKey方法实现的,不一样的是它直接new了一个ShuffledDStream了,我们接着看一下它的实现吧。
- override def compute(validTime: Time): Option[RDD[(K,C)]] = {
- parent.getOrCompute(validTime) match {
- case Some(rdd) => Some(rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
- case None => None
- }
- }
复制代码
在compute阶段,对通过Time获得的rdd进行reduceByKey操作。接下来的print方法也是一个转换:
- new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
复制代码
打印前十个,超过10个打印"..."。需要注意register方法。
- ssc.graph.addOutputStream(this)
复制代码
启动过程分析
- def start(): Unit = synchronized {
- // 接受到JobSchedulerEvent就处理事件
- eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
- def receive = {
- case event: JobSchedulerEvent => processEvent(event)
- }
- }), "JobScheduler")
- listenerBus.start()
- receiverTracker = new ReceiverTracker(ssc)
- receiverTracker.start()
- jobGenerator.start()
- }
复制代码
1、启动了一个Actor来处理JobScheduler的JobStarted、JobCompleted、ErrorReported事件。
2、启动StreamingListenerBus作为监听器。
3、启动ReceiverTracker。
4、启动JobGenerator。
我们接下来看看ReceiverTracker的start方法。
- def start() = synchronized {if (!receiverInputStreams.isEmpty) {
- actor = ssc.env.actorSystem.actorOf(Props(new ReceiverTrackerActor), "ReceiverTracker")
- receiverExecutor.start()
- }
- }
复制代码
1、首先判断了一下receiverInputStreams不能为空,那receiverInputStreams是怎么时候写入值的呢?答案在SocketInputDStream的父类InputDStream当中,当实例化InputDStream的时候会在DStreamGraph里面添加InputStream。
- abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext) extends DStream[T](ssc_) {
- ssc.graph.addInputStream(this)
- //....
- }
复制代码
2、实例化ReceiverTrackerActor,它负责RegisterReceiver(注册Receiver)、AddBlock、ReportError(报告错误)、DeregisterReceiver(注销Receiver)等事件的处理。
3、启动receiverExecutor(实际类是ReceiverLauncher,这名字起得。。),它主要负责启动Receiver,start方法里面调用了startReceivers方法吧。
- private def startReceivers() {
- // 对应着上面的那个例子,getReceiver方法获得是SocketReceiver
- val receivers = receiverInputStreams.map(nis => {
- val rcvr = nis.getReceiver()
- rcvr.setReceiverId(nis.id)
- rcvr
- })
- // 查看是否所有的receivers都有优先选择机器,这个需要重写Receiver的preferredLocation方法,目前只有FlumeReceiver重写了
- val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)
- // 创建一个并行receiver集合的RDD, 把它们分散到各个worker节点上
- val tempRDD =
- if (hasLocationPreferences) {
- val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
- ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
- } else {
- ssc.sc.makeRDD(receivers, receivers.size)
- }
- // 在worker节点上启动Receiver的方法,遍历所有Receiver,然后启动
- val startReceiver = (iterator: Iterator[Receiver[_]]) => {
- if (!iterator.hasNext) {
- throw new SparkException("Could not start receiver as object not found.")
- }
- val receiver = iterator.next()
- val executor = new ReceiverSupervisorImpl(receiver, SparkEnv.get)
- executor.start()
- executor.awaitTermination()
- }
- // 运行这个重复的作业来确保所有的slave都已经注册了,避免所有的receivers都到一个节点上
- if (!ssc.sparkContext.isLocal) {
- ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
- }
- // 把receivers分发出去,启动
- ssc.sparkContext.runJob(tempRDD, startReceiver)
- }
复制代码
保存接收到的数据
- // 这是ReceiverSupervisorImpl的方法
- def pushSingle(data: Any) {
- blockGenerator += (data)
- }
- // 这是BlockGenerator的方法
- def += (data: Any): Unit = synchronized {
- currentBuffer += data
- }
复制代码
我们看一下它的start方法吧。
- def start() {
- blockIntervalTimer.start()
- blockPushingThread.start()
- }
复制代码
- while (!stopped) {
- clock.waitTillTime(nextTime)
- callback(nextTime)
- prevTime = nextTime
- nextTime += period
- }
复制代码
每隔一段时间就执行callback函数,callback函数是new的时候传进来的,是BlockGenerator的updateCurrentBuffer方法。
- private def updateCurrentBuffer(time: Long): Unit = synchronized {
- try {
- val newBlockBuffer = currentBuffer
- currentBuffer = new ArrayBuffer[Any]
- if (newBlockBuffer.size > 0) {
- val blockId = StreamBlockId(receiverId, time - blockInterval)
- val newBlock = new Block(blockId, newBlockBuffer)
- blocksForPushing.put(newBlock)
- }
- } catch {case t: Throwable =>
- reportError("Error in block updating thread", t)
- }
- }
复制代码
- spark.streaming.blockInterval 默认值是200
- spark.streaming.blockQueueSize 默认值是10
复制代码
这是前面提到的间隔时间和队列的长度,间隔时间默认是200毫秒,队列是最多能容纳10个Block,多了就要阻塞了。
我们接下来看一下BlockGenerator另外启动的那个线程执行的keepPushingBlocks方法到底在干什么?
- private def keepPushingBlocks() {
- while(!stopped) {
- Option(blocksForPushing.poll(100, TimeUnit.MILLISECONDS)) match {
- case Some(block) => pushBlock(block)
- case None =>
- }
- }
- // ...退出之前把剩下的也输出去了
- }
复制代码
它在把blocksForPushing中的block不停的拿出来,调用pushBlock方法,这个方法属于在实例化BlockGenerator的时候,从ReceiverSupervisorImpl传进来的BlockGeneratorListener的。
- private val blockGenerator = new BlockGenerator(new BlockGeneratorListener {
- def onError(message: String, throwable: Throwable) {
- reportError(message, throwable)
- }
- def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
- pushArrayBuffer(arrayBuffer, None, Some(blockId))
- }
- }, streamId, env.conf)
复制代码
- def pushArrayBuffer(arrayBuffer: ArrayBuffer[_], optionalMetadata: Option[Any], optionalBlockId: Option[StreamBlockId]
- ) {
- val blockId = optionalBlockId.getOrElse(nextBlockId)
- val time = System.currentTimeMillis
- blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], storageLevel, tellMaster = true)
- reportPushedBlock(blockId, arrayBuffer.size, optionalMetadata)
- }
复制代码
处理接收到的数据
- def start(): Unit = synchronized {
- eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
- def receive = {
- case event: JobGeneratorEvent => processEvent(event)
- }
- }), "JobGenerator")
- if (ssc.isCheckpointPresent) {
- restart()
- } else {
- startFirstTime()
- }
- }
复制代码
1、启动一个actor处理JobGeneratorEvent事件。
2、如果是已经有CheckPoint了,就接着上次的记录进行处理,否则就是第一次启动。
我们先看startFirstTime吧,CheckPoint以后再说吧,有点儿小复杂。
- private def startFirstTime() {
- val startTime = new Time(timer.getStartTime())
- graph.start(startTime - graph.batchDuration)
- timer.start(startTime.milliseconds)
- }
复制代码
- private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
- longTime => eventActor ! GenerateJobs(new Time(longTime)), "JobGenerator")
复制代码
- private def processEvent(event: JobGeneratorEvent) {
- event match {
- case GenerateJobs(time) => generateJobs(time)
- case ClearMetadata(time) => clearMetadata(time)
- case DoCheckpoint(time) => doCheckpoint(time)
- case ClearCheckpointData(time) => clearCheckpointData(time)
- }
- }
复制代码
下面是generateJobs方法。
- private def generateJobs(time: Time) {
- SparkEnv.set(ssc.env)
- Try(graph.generateJobs(time)) match {
- case Success(jobs) =>
- val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>
- val streamId = stream.id
- val receivedBlockInfo = stream.getReceivedBlockInfo(time)
- (streamId, receivedBlockInfo)
- }.toMap
- jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
- case Failure(e) =>
- jobScheduler.reportError("Error generating jobs for time " + time, e)
- }
- eventActor ! DoCheckpoint(time)
- }
复制代码
- def generateJobs(time: Time): Seq[Job] = {
- val jobs = this.synchronized {
- outputStreams.flatMap(outputStream => outputStream.generateJob(time))
- }
- jobs
- }
复制代码
outputStreams在这个例子里面是print这个方法里面添加的,这个在前面说了,我们继续看DStream的generateJob。
- private[streaming] def generateJob(time: Time): Option[Job] = {
- getOrCompute(time) match {
- case Some(rdd) => {
- val jobFunc = () => {
- val emptyFunc = { (iterator: Iterator[T]) => {} }
- context.sparkContext.runJob(rdd, emptyFunc)
- }
- Some(new Job(time, jobFunc))
- }
- case None => None
- }
- }
复制代码
- override def generateJob(time: Time): Option[Job] = {
- parent.getOrCompute(time) match {
- case Some(rdd) =>
- val jobFunc = () => {
- foreachFunc(rdd, time)
- }
- Some(new Job(time, jobFunc))
- case None => None
- }
- }
复制代码
- private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
- // If this DStream was not initialized (i.e., zeroTime not set), then do it
- // If RDD was already generated, then retrieve it from HashMap
- generatedRDDs.get(time) match {
- // 这个RDD已经被生成过了,直接用就是了
- case Some(oldRDD) => Some(oldRDD)
- // 还没生成过,就调用compte函数生成一个
- case None => {
- if (isTimeValid(time)) {
- compute(time) match {
- case Some(newRDD) =>
- // 设置保存的级别
- if (storageLevel != StorageLevel.NONE) {
- newRDD.persist(storageLevel)
- }
- // 如果现在需要,就做CheckPoint
- if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
- newRDD.checkpoint()
- }
- // 添加到generatedRDDs里面去,可以再次利用
- generatedRDDs.put(time, newRDD)
- Some(newRDD)
- case None =>
- None
- }
- } else {
- None
- }
- }
- }
- }
复制代码
从上面的方法可以看出来它是通过每个DStream自己实现的compute函数得出来的RDD。我们找到SocketInputDStream,没有compute函数,在父类ReceiverInputDStream里面找到了。
- override def compute(validTime: Time): Option[RDD[T]] = {
- // 如果出现了时间比startTime早的话,就返回一个空的RDD,因为这个很可能是master挂了之后的错误恢复
- if (validTime >= graph.startTime) {
- val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
- receivedBlockInfo(validTime) = blockInfo
- val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
- Some(new BlockRDD[T](ssc.sc, blockIds))
- } else {
- Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
- }
- }
复制代码
- jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
复制代码
- def submitJobSet(jobSet: JobSet) {
- if (jobSet.jobs.isEmpty) {
- } else {
- jobSets.put(jobSet.time, jobSet)
- jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
- }
- }
复制代码
遍历jobSet里面的所有jobs,通过jobExecutor这个线程池提交。我们看一下JobHandler就知道了。
- private class JobHandler(job: Job) extends Runnable {
- def run() {
- eventActor ! JobStarted(job)
- job.run()
- eventActor ! JobCompleted(job)
- }
- }
复制代码
- def run() {
- result = Try(func())
- }
复制代码
- 顶
- 0
- 踩
转载于:https://www.cnblogs.com/think90/p/6024490.html
Spark Streaming实例相关推荐
- Spark Streaming实例分析
这一章要讲Spark Streaming,讲之前首先回顾下它的用法,具体用法请参照< Spark Streaming编程讲解 >. Example代码分析 val ssc = new St ...
- Spark Streaming处理Socket流简单实例
在本文中我将在IDEA工具中开发一个SparkStream程序用于监听本机9999端口所接收的数据 首先,我们将Spark Streaming类的名称以及从StreamingContext进行的一些隐 ...
- spark streaming python实例_kafka+spark streaming代码实例(pyspark+python)
一.系统准备 1.启动zookeeper:bin/zkServer.cmd start 2.启动kafka:bin/kafka-server-start.sh -daemon config/serve ...
- 使用Spark Streaming从kafka中读取数据把数据写入到mysql 实例
文章目录 一. 题目 题目和数据 二. pom依赖 三.建表语句 四. 连接kafka配置类 五. 自定义分区类 六. 读取数据并发送数据 七. 消费数据,把数据存储到mysql 一. 题目 题目和数 ...
- 基于大数据的Uber数据实时监控(Part 2:Kafka和Spark Streaming)
导言 本文是系列文章的第二篇,我们将建立一个分析和监控Uber汽车GPS旅行数据的实时示例.在第一篇文章中讨论了使用Apache Spark的K-means算法创建机器学习模型,以根据位置聚类Uber ...
- Spark Streaming与Kafka Streaming对比
♚ 叙述 对流处理的需求每天都在增加.原因是,处理大量数据通常是不够的. 必须快速处理数据,以便公司能够对不断变化的业务条件作出实时反应. 流处理是对数据进行连续.并行的实时处理. 流式处理是处理数据 ...
- Spark Streaming学习笔记
特点: Spark Streaming能够实现对实时数据流的流式处理,并具有很好的可扩展性.高吞吐量和容错性. Spark Streaming支持从多种数据源提取数据,如:Kafka.Flume.Tw ...
- 通过案例对 spark streaming 透彻理解三板斧之一: spark streaming 另类实验
本期内容 : spark streaming另类在线实验 瞬间理解spark streaming本质 一. 我们最开始将从Spark Streaming入手 为何从Spark Streaming切入 ...
- Spark详解(十三):Spark Streaming 运行架构分析
1. 运行架构 SparkStreaming的主要功能包括流处理引擎的流数据接收与存储以及批处理作业的生成与管理,而Spark核心负责处理Spark Streaming发送过来的作用.Spark St ...
- Spark详解(十二):Spark Streaming原理和实现
1 简介 SparkStreaming是Spark核心API的一个扩展,具有高吞吐量和容错能力的实时流数据处理系统,可以对多种数据源(如Kdfka.Flume.Twitter.Zero和TCP 套接字 ...
最新文章
- python使用pandas通过聚合获取时序数据的最后一个指标数据(例如长度指标、时间指标)生成标签并与原表连接(join)进行不同标签特征的可视化分析
- Python生成随机五位数——模仿手机验证码
- 利用MATLAB帮助求解作业中的Laplace变换和Z变换
- RoIPooling
- 系列文章--jQuery教程
- python split(), os.path.split()和os.path.splitext()函数的区别
- python bottle web框架上传静态文件与加载静态文件
- hyperopt中文文档:Installation-Notes安装说明
- 浅析HTML、CSS、JavaScript之间的联系与区别
- 阿里巴巴证实全资收购协作软件平台 Teambition
- 在B/S系统中引入定时器的功能
- 【TWVRP】基于matlab遗传算法求解带时间窗的车辆路径问题【含Matlab源码 002期】
- 支持向量回归(多核函数)
- Windows批处理添加注释
- SIF协议(一线通)
- 编码通信与魔术初步(六)——经典魔术《傅氏幻术》赏析和《我的心灵感应》...
- 10.13 写一个用矩形法求定积分的通用函数,分别求∫_0^1▒sinxdx 、∫_0^1▒cosxdx、∫_0^1▒〖e^x dx〗的值。
- mysql存储过程算四分位
- 阿里百川淘宝联盟私域会员对接
- 什么级别的企业可以进行数字化转型?
热门文章
- C++ set 排序 修改元素之后不会改变原来的排序
- 【Django 2021年最新版教程25】模板语言 前端for循环怎么用 实例
- 【Django 2021年最新版教程13】Cookie是什么 如何使用
- centos 6.5 编译php mysql5.6_CentOS6.5 编译安装PHP5.6(apache模块)
- php点击表格单元格链接,详解PhpSpreadsheet单元格设置样式、图片、超链接等
- Unity MRTK 制作按钮调整大小
- java ctr v,解决 ctrol c ctrol v 复制 粘贴 不好用的问题 只复制一次的问题
- 二十三、Oracle学习笔记:综合案例
- 新xp系统如何链接网络连接服务器地址,xp系统如何设置宽带连接
- arduino 读取串口信息hex_进阶教程1:Arduino串口通信与电脑控制LED