参考文章:http://www.jianshu.com/p/60344796f8a5

在结合 Spark Streaming 及 Kafka 的实时应用中,我们通常使用以下两个 API 来获取最初的 DStream(这里不关心这两个 API 的重载):

KafkaUtils#createDirectStream

KafkaUtils#createStream

这两个 API 除了要传入的参数不同外,接收 kafka 数据的节点、拉取数据的时机也完全不同。

本文将分别就两者进行详细分析。

一、KafkaUtils#createStream

先来分析 createStream,在该函数中,会新建一个 KafkaInputDStream对象,KafkaInputDStream继承于 ReceiverInputDStream。我们在文章揭开Spark Streaming神秘面纱② - ReceiverTracker 与数据导入分析过

  1. 继承ReceiverInputDStream的类需要重载 getReceiver 函数以提供用于接收数据的 receiver
  2. recever 会调度到某个 executor 上并启动,不间断的接收数据并将收到的数据交由 ReceiverSupervisor 存成 block 作为 RDD 输入数据

KafkaInputDStream当然也实现了getReceiver方法,如下:

  def getReceiver(): Receiver[(K, V)] = {if (!useReliableReceiver) {//< 不启用 WALnew KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel) } else { //< 启用 WAL new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel) } }

根据是否启用 WAL,receiver 分为 KafkaReceiver 和 ReliableKafkaReceiver。揭开Spark Streaming神秘面纱②-ReceiverTracker 与数据导入一文中详细地介绍了

  1. receiver 是如何被分发启动的
  2. receiver 接受数据后数据的流转过程
    并在 揭开Spark Streaming神秘面纱③ - 动态生成 job 一文中详细介绍了
  3. receiver 接受的数据存储为 block 后,如何将 blocks 作为 RDD 的输入数据
  4. 动态生成 job

以上两篇文章并没有具体介绍 receiver 是如何接收数据的,当然每个重载了 ReceiverInputDStream 的类的 receiver 接收数据方式都不相同。下图描述了 KafkaReceiver 接收数据的具体流程:

KafkaUtils#createDirectStream

在揭开Spark Streaming神秘面纱③ - 动态生成 job中,介绍了在生成每个 batch 的过程中,会去取这个 batch 对应的 RDD,若未生成该 RDD,则会取该 RDD 对应的 blocks 数据来生成 RDD,最终会调用到DStream#compute(validTime: Time)函数,在KafkaUtils#createDirectStream调用中,会新建DirectKafkaInputDStreamDirectKafkaInputDStream#compute(validTime: Time)会从 kafka 拉取数据并生成 RDD,流程如下:

如上图所示,该函数主要做了以下三个事情:

  1. 确定要接收的 partitions 的 offsetRange,以作为第2步创建的 RDD 的数据来源
  2. 创建 RDD 并执行 count 操作,使 RDD 真实具有数据
  3. 以 streamId、数据条数,offsetRanges 信息初始化 inputInfo 并添加到 JobScheduler 中

进一步看 KafkaRDD 的 getPartitions 实现:

  override def getPartitions: Array[Partition] = { offsetRanges.zipWithIndex.map { case (o, i) => val (host, port) = leaders(TopicAndPartition(o.topic, o.partition)) new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port) }.toArray }

从上面的代码可以很明显看到,KafkaRDD 的 partition 数据与 Kafka topic 的某个 partition 的 o.fromOffset 至 o.untilOffset 数据是相对应的,也就是说 KafkaRDD 的 partition 与 Kafka partition 是一一对应的


通过以上分析,我们可以对这两种方式的区别做一个总结:

  1. createStream会使用 Receiver;而createDirectStream不会
  2. createStream使用的 Receiver 会分发到某个 executor 上去启动并接受数据;而createDirectStream直接在 driver 上接收数据
  3. createStream使用 Receiver 源源不断的接收数据并把数据交给 ReceiverSupervisor 处理最终存储为 blocks 作为 RDD 的输入,从 kafka 拉取数据与计算消费数据相互独立;而createDirectStream会在每个 batch 拉取数据并就地消费,到下个 batch 再次拉取消费,周而复始,从 kafka 拉取数据与计算消费数据是连续的,没有独立开
  4. createStream中创建的KafkaInputDStream 每个 batch 所对应的 RDD 的 partition 不与 Kafka partition 一一对应;而createDirectStream中创建的 DirectKafkaInputDStream 每个 batch 所对应的 RDD 的 partition 与 Kafka partition 一一对应

转载于:https://www.cnblogs.com/liugh/articles/6817553.html

SparkStreaming从Kafka读取数据两种方式相关推荐

  1. kafka修改分区数_大数据技术:解析SparkStreaming和Kafka集成的两种方式

    Spark Streaming是基于微批处理的流式计算引擎,通常是利用Spark Core或者Spark Core与Spark Sql一起来处理数据.在企业实时处理架构中,通常将Spark Strea ...

  2. JAVA与PLC通讯读取数据(两种方式)

    第一种方式(s7connector) S7官网:S7Connector - Documentation,有简单的读写操作参考. 1.创建maven工程引入依赖 <dependency>&l ...

  3. flume系列之:flume基于kafka.topics和kafka.topics.regex两种方式匹配Kafka Topic

    flume系列之:flume基于kafka.topics和kafka.topics.regex两种方式匹配Kafka Topic 一.flume基于kafka.topics匹配Kafka Topic ...

  4. ireport参数传递json_Json传递数据两种方式(json大全)

    1.Json传递数据两种方式(json大全) ----------------------------字符串 var list1 = ["number","name&qu ...

  5. java 读取css文件_java文件读取的两种方式

    JAVA中读取文件(二进制,字符)内容的几种方 JAVA中读取文件内容的方法有很多,比如按字节读取文件内容,按字符读取文件内容,按行读取文件内容,随机读取文件内容等方法,本文就以上方法的具体实现给出代 ...

  6. Hive元数据的读取的两种方式

    1.直连模式 使用JDBC的方式直接去mysql中读取元数据,称为直连模式 需要的条件: 连接Mysql的驱动,已经放入到$HIVE_HOME/lib下 创建连接时,需要有url,username,p ...

  7. 前端项目模拟数据两种方式

    文章目录 1. Mock.js 1.1 安装依赖包mockjs 1.2 在 src 目录下创建 mock 文件夹 1.3 准备模拟的数据 1.4 创建 mockServer.js 1.5 引入mock ...

  8. Excel文件读取的两种方式

    1.Pandas库的读取操作 from pandas import read_excel dr=read_excel(filename,header) dr#dataframe数据 dw=DataFr ...

  9. python 爬虫The One的 数据两种方式

    系统:windows 语言:python 工具:pycharm 需要的包:beautifulsoup.requests 安装以上的包,方法类似, 手动方式 任务:爬300条.采用多线程.非多线程 单线 ...

最新文章

  1. flash动画制作成品_flash动画制作
  2. 利用返回引用来操作结构体
  3. 一张图片学Python
  4. RxJs SwitchMapTo 操作符之移花接木
  5. Betty's Sales team BP SQL
  6. Java接口的防御性API演进
  7. Stimulsoft reports .net中创建变量
  8. robocopy的退出返回代码
  9. 《C++游戏编程入门(第4版)》——1.9 本章小结
  10. android自定义播放器按钮,android – 使用exo播放器添加全屏视频按钮
  11. 万象:庸人容易因欠缺自知之明而自我膨胀
  12. Mac Xcode 更改编辑器文本字体大小
  13. 1.7 什么是软件生命周期模型?试比较瀑布模型,快速原型模型,增量模型和螺旋模型的优缺点,说明每种模型的适用范围
  14. Python opencv:人眼/人脸识别并实时打码处理
  15. vue项目中使用swiper 实现无缝滚动
  16. Android N 应用内更新
  17. 标准IO fgets的使用
  18. Saturn Console部署踩坑总结
  19. vs不一致的行尾对话框怎么调出_爱喝白兰地的福建人,跟你讲讲“VS、VSOP、XO”...
  20. 与开发斗智斗勇的日子

热门文章

  1. 浅谈Laravel中的设计模式(四) Contract 契约模式
  2. setState 是异步的
  3. windows下php7.1安装redis扩展以及redis测试使用全过程
  4. mysql 密码 You must reset your password using ALTER USER statement before executing this statement....
  5. 求两个数的最大公因数
  6. Ansible详解(二)
  7. 音乐播放器的实现-音乐文件的获取(1)
  8. 2010 PDC Party @深圳 免费技术交流活动公告
  9. 转载:写给计算机专业的朋友们
  10. Android投票列表设计,AndroidCustomView一个简单的投票排名对比图