问题导读:

1.streaming application 如何兼容众多数据源?

2.receivers 是如何分发并启动的?
3.receiver 接收到的数据是如何流转的?

Spark Streaming 在数据接收与导入方面需要满足有以下三个特点:

  • 兼容众多输入源,包括HDFS, Flume, Kafka, Twitter and ZeroMQ。还可以自定义数据源
  • 要能为每个 batch 的 RDD 提供相应的输入数据
  • 为适应 7*24h 不间断运行,要有接收数据挂掉的容错机制


有容乃大,兼容众多数据源
在文章DStreamGraph 与 DStream DAG中,我们提到

InputDStream是所有 input streams(数据输入流) 的虚基类。该类提供了 start() 和 stop()方法供 streaming 系统来开始和停止接收数据。那些只需要在 driver 端接收数据并转成 RDD 的 input streams 可以直接继承 InputDStream,例如 FileInputDStream是 InputDStream 的子类,它监控一个 HDFS 目录并将新文件转成RDDs。而那些需要在 workers 上运行receiver 来接收数据的 Input DStream,需要继承 ReceiverInputDStream,比如 KafkaReceiver

只需在 driver 端接收数据的 input stream 一般比较简单且在生产环境中使用的比较少,本文不作分析,只分析继承了 ReceiverInputDStream 的 input stream 是如何导入数据的。

ReceiverInputDStream有一个def getReceiver(): Receiver[T]方法,每个继承了ReceiverInputDStream的 input stream 都必须实现这个方法。该方法用来获取将要分发到各个 worker 节点上用来接收数据的 receiver(接收器)。不同的 ReceiverInputDStream 子类都有它们对应的不同的 receiver,如KafkaInputDStream对应KafkaReceiver,FlumeInputDStream对应FlumeReceiver,TwitterInputDStream对应TwitterReceiver,如果你要实现自己的数据源,也需要定义相应的 receiver。

继承 ReceiverInputDStream 并定义相应的 receiver,就是 Spark Streaming 能兼容众多数据源的原因。

为每个 batch 的 RDD 提供输入数据
在 StreamingContext 中,有一个重要的组件叫做 ReceiverTracker,它是 Spark Streaming 作业调度器 JobScheduler 的成员,负责启动、管理各个 receiver 及管理各个 receiver 接收到的数据。

确定 receiver 要分发到哪些 executors 上执行
创建 ReceiverTracker 实例

我们来看 StreamingContext#start() 方法部分调用实现,如下:

204749-bf9b38d23925a091.jpg (17.82 KB, 下载次数: 0)

下载附件  保存到相册

昨天 15:30 上传

可以看到,StreamingContext#start() 会调用 JobScheduler#start() 方法,在 JobScheduler#start() 中,会创建一个新的 ReceiverTracker 实例 receiverTracker,并调用其 start() 方法。

ReceiverTracker#start()

继续跟进 ReceiverTracker#start(),如下图,它主要做了两件事:

  • 初始化一个 endpoint: ReceiverTrackerEndpoint,用来接收和处理来自 ReceiverTracker 和 receivers 发送的消息
  • 调用 launchTasks 来自将各个 receivers 分发到 executors 上

204749-355a9beff1de7903.jpg (23.76 KB, 下载次数: 0)

下载附件  保存到相册

昨天 15:31 上传

ReceiverTracker#launchTasks()

继续跟进 launchTasks,它也主要干了两件事:

  • 获取 DStreamGraph.inputStreams 中继承了 ReceiverInputDStream 的 input streams 的 receivers。也就是数据接收器
  • 给消息接收处理器 endpoint 发送 StartAllReceivers(receivers)消息。直接返回,不等待消息被处理

204749-ced9c860d8d7c02a.jpg (64.94 KB, 下载次数: 0)

下载附件  保存到相册

昨天 15:32 上传

处理StartAllReceivers消息

endpoint 在接收到消息后,会先判断消息类型,对不同的消息做不同处理。对于StartAllReceivers消息,处理流程如下:

  • 计算每个 receiver 要分发的目的 executors。遵循两条原则:

    • 将 receiver 分布的尽量均匀
    • 如果 receiver 的preferredLocation本身不均匀,以preferredLocation为准
  • 遍历每个 receiver,根据第1步中得到的目的 executors 调用 startReceiver 方法

204749-933b30645f821f62.jpg (41.93 KB, 下载次数: 0)

下载附件  保存到相册

昨天 15:32 上传

到这里,已经确定了每个 receiver 要分发到哪些 executors 上

启动 receivers

接上,通过 ReceiverTracker#startReceiver(receiver: Receiver[_], scheduledExecutors: Seq[String]) 来启动 receivers,我们来看具体流程:

204749-cd51158e2d2877d1.jpg (55.8 KB, 下载次数: 0)

下载附件  保存到相册

昨天 15:33 上传

如上流程图所述,分发和启动 receiver 的方式不可谓不精彩。其中,startReceiverFunc 函数主要实现如下:

[Python] 纯文本查看 复制代码
?
1
2
3
4
val supervisor = new ReceiverSupervisorImpl(
  receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
supervisor.start()
supervisor.awaitTermination()

supervisor.start() 中会调用 receiver#onStart 后立即返回。receiver#onStart 一般自行新建线程或线程池来接收数据,比如在 KafkaReceiver 中,就新建了线程池,在线程池中接收 topics 的数据。

supervisor.start() 返回后,由 supervisor.awaitTermination() 阻塞住线程,以让这个 task 一直不退出,从而可以源源不断接收数据。

数据流转

204749-372c9a75a4b76f9b.jpg (79.13 KB, 下载次数: 0)

下载附件  保存到相册

昨天 15:34 上传

上图为 receiver 接收到的数据的流转过程,让我们来逐一分析

Step1: Receiver -> ReceiverSupervisor

这一步中,Receiver 将接收到的数据源源不断地传给 ReceiverSupervisor。Receiver 调用其 store(...) 方法,store 方法中继续调用 supervisor.pushSingle 或 supervisor.pushArrayBuffer 等方法来传递数据。Receiver#store 有多重形式, ReceiverSupervisor 也有 pushSingle、pushArrayBuffer、pushIterator、pushBytes 方法与不同的 store 对应。

  • pushSingle: 对应单条小数据
  • pushArrayBuffer: 对应数组形式的数据
  • pushIterator: 对应 iterator 形式数据
  • pushBytes: 对应 ByteBuffer 形式的块数据

对于细小的数据,存储时需要 BlockGenerator 聚集多条数据成一块,然后再成块存储;反之就不用聚集,直接成块存储。当然,存储操作并不在 Step1 中执行,只为说明之后不同的操作逻辑。

Step2.1: ReceiverSupervisor -> BlockManager -> disk/memory

在这一步中,主要将从 receiver 收到的数据以 block(数据块)的形式存储
存储 block 的是receivedBlockHandler: ReceivedBlockHandler,根据参数spark.streaming.receiver.writeAheadLog.enable配置的不同,默认为 false,receivedBlockHandler对象对应的类也不同,如下:

[Python] 纯文本查看 复制代码
?
01
02
03
04
05
06
07
08
09
10
private val receivedBlockHandler: ReceivedBlockHandler = {
  if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
    //< 先写 WAL,再存储到 executor 的内存或硬盘
    new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
      receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
  } else {
    //< 直接存到 executor 的内存或硬盘
    new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
  }
}

启动 WAL 的好处就是在application 挂掉之后,可以恢复数据。

[Python] 纯文本查看 复制代码
?
1
2
3
4
5
6
//< 调用 receivedBlockHandler.storeBlock 方法存储 block,并得到一个 blockStoreResult
val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
//< 使用blockStoreResult初始化一个ReceivedBlockInfo实例
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
//< 发送消息通知 ReceiverTracker 新增并存储了 block
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))

不管是 WriteAheadLogBasedBlockHandler 还是 BlockManagerBasedBlockHandler 最终都是通过 BlockManager 将 block 数据存储 execuor 内存或磁盘或还有 WAL 方式存入。

这里需要说明的是 streamId,每个 InputDStream 都有它自己唯一的 id,即 streamId,blockInfo包含 streamId 是为了区分block 是哪个 InputDStream 的数据。之后为 batch 分配 blocks 时,需要知道每个 InputDStream 都有哪些未分配的 blocks。

Step2.2: ReceiverSupervisor -> ReceiverTracker

将 block 存储之后,获得 block 描述信息 blockInfo: ReceivedBlockInfo,这里面包含:streamId、数据位置、数据条数、数据 size 等信息。

之后,封装以 block 作为参数的 AddBlock(blockInfo) 消息并发送给 ReceiverTracker 以通知其有新增 block 数据块。

Step3: ReceiverTracker -> ReceivedBlockTracker

ReceiverTracker 收到 ReceiverSupervisor 发来的 AddBlock(blockInfo) 消息后,直接调用以下代码将 block 信息传给 ReceivedBlockTracker:

[Python] 纯文本查看 复制代码
?
1
2
3
private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
    receivedBlockTracker.addBlock(receivedBlockInfo)
  }

receivedBlockTracker.addBlock中,如果启用了 WAL,会将新增的 block 信息以 WAL 方式保存。

无论 WAL 是否启用,都会将新增的 block 信息保存到 streamIdToUnallocatedBlockQueues: mutable.HashMap[Int, ReceivedBlockQueue]中,该变量 key 为 InputDStream 的唯一 id,value 为已存储未分配的 block 信息。之后为 batch 分配blocks,会访问该结构来获取每个 InputDStream 对应的未消费的 blocks。

相关文章

揭开Spark Streaming神秘面纱① - DStreamGraph 与 DStream DAG
http://www.aboutyun.com/forum.php?mod=viewthread&tid=17807

揭开Spark Streaming神秘面纱② - ReceiverTracker 与数据导入
http://www.aboutyun.com/forum.php?mod=viewthread&tid=17825

揭开Spark Streaming神秘面纱③ - 动态生成 job

http://www.aboutyun.com/forum.php?mod=viewthread&tid=17826

文/牛肉圆粉不加葱(简书作者)
原文链接:http://www.jianshu.com/p/3195fb3c4191

揭开Spark Streaming神秘面纱② - ReceiverTracker 与数据导入相关推荐

  1. Flink难点解析:揭开Watermark的神秘面纱

    目录 一.时间 1.1 时间语义 1.1.1 Event Time 1.1.2 Ingestion Time 1.1.3 Processing Time 1.2 设置时间语义 二.Watermark ...

  2. 揭开木马的神秘面纱 1

    揭开木马的神秘面纱 1 前言 在网上,大家最关心的事情之一就是木马:最近出了新的木马吗?木马究竟能实现 哪些功能?木马如何防治?木马究竟是如何工作的?本文试图以我国最著名的木马之  - 冰河为例,向大 ...

  3. 未来已来?揭开量子计算机的神秘面纱

    从第一台现代计算机ENIAC的诞生到个人PC时代的降临,从互联网概念的提出到移动互联的疾跑,在这个信息年代里,变革正以前所未有的速度改变着我们熟悉的世界.熟悉的生活. 作为个人,我们早已习惯于智能计算 ...

  4. ASP.NET 运行时详解 揭开请求过程神秘面纱

    对于ASP.NET开发,排在前五的话题离不开请求生命周期.像什么Cache.身份认证.Role管理.Routing映射,微软到底在请求过程中干了哪些隐秘的事,现在是时候揭晓了.抛开乌云见晴天,接下来就 ...

  5. 冰河浅析 - 揭开木马的神秘面纱(下)

    冰河浅析   -   揭开木马的神秘面纱(下)     作者:·   shotgun·yesky 四.破解篇(魔高一尺.道高一丈)         本文主要是探讨木马的基本原理,   木马的破解并非是 ...

  6. 揭开木马的神秘面纱 2

    揭开木马的神秘面纱zz 2 离冰河二的问世已经快一年了,大家对于木马这种远程控制软件也有了一定的认 识,比如:他会改注册表,他会监听端口等等,和一年前几乎没有人懂得木马是什么东   西相比,这是一个质 ...

  7. 【翻译】揭开HTML5的神秘面纱

    写在前面的话: 这篇文章摘自Mozilla官网,主要针对HTML5和本地应用发表了一些.没有设计到技术,所以基本是逐字翻译,但愿我蹩脚的英语水平能把大师的 Chris Heilmann的思想整理明白. ...

  8. Azure Stack技术深入浅出系列6:Azure Stack一体机探究 — 揭开黑盒子的神秘面纱

    Azure Stack是微软公有云平台的延伸,为客户环境里提供接口和相关的功能.微软的Azure Stack安装在指定的合作伙伴的一体机中,并以一体机的形式部署到客户的混合云应用环境里的.2017年7 ...

  9. 揭开POJO的神秘面纱

    众所周知,Struts2中的Action编写有三种方式,分别是:(1)POJO(Plain Old Java Object):(2)实现Action接口,重写execute()方法:(3)继承Acti ...

最新文章

  1. Codeforces 396A 数论,组合数学
  2. 训练数据也外包?这家公司“承包”了不少注释训练数据,原来是这样做的……...
  3. 【CSS】CSS前期回顾(2)
  4. IDEA运行第一个Spring Boot应用程序
  5. 市场定位和硬件设计的错误-浅谈GM8126的封装
  6. Zune 3.0与XNA GS 3.0 Beta
  7. kodi刮削器 中文_教你PLEX插件播放4K不能使用KODI解码导致卡顿的解决办法
  8. 《UNIX网络编程卷1》第一例及问题
  9. Avalanche发布AvalancheGo v1.2.3版本
  10. mdb新版本打不开_救命!!! 我打不开她给我的Pr工程……
  11. MySQL-5.6.14-winx64的免安装配置方法
  12. hashmap hash冲突怎么解决_让我再撸一次HashMap
  13. C#中跨工程跨项目注释的显示
  14. 中华优秀传统文化教育的有效渗透
  15. 局域网传输文件_局域网共享软件,详细教您局域网共享软件如何使用
  16. MicroMsg.SDK.WXMsgImplComm: ignore wechat app signature validation
  17. 基于VEH调试寄存器实现无痕HOOK(5)
  18. java 解析 键值_JAVA:解析单个字符串键值对
  19. matlab solve 矩阵,在Matlab中求解矩阵DAE系统
  20. 解决win10系统点击飞行模式后找不到WiFi连接问题

热门文章

  1. lua使用table例子
  2. JS----DOM节点操作:创建 ,插入,删除,复制,查找节点
  3. Z01282Visual C#从入门到精通 第八版PDF
  4. 太强了,Python还可以计算农历
  5. Axure高大上低保真组件库
  6. ssh用户身份验证不能选择password
  7. 【知网研学】使用方法
  8. JSD-2204-API-进制-Day19
  9. 数据可视化ECharts:中国地图模拟飞行模块
  10. Mulliken电荷,来自哪里?又归于何方?