SparkStreaming从Kafka读取数据两种方式
参考文章:http://www.jianshu.com/p/60344796f8a5
在结合 Spark Streaming 及 Kafka 的实时应用中,我们通常使用以下两个 API 来获取最初的 DStream(这里不关心这两个 API 的重载):
KafkaUtils#createDirectStream
及
KafkaUtils#createStream
这两个 API 除了要传入的参数不同外,接收 kafka 数据的节点、拉取数据的时机也完全不同。
本文将分别就两者进行详细分析。
一、KafkaUtils#createStream
先来分析 createStream
,在该函数中,会新建一个 KafkaInputDStream
对象,KafkaInputDStream
继承于 ReceiverInputDStream
。我们在文章揭开Spark Streaming神秘面纱② - ReceiverTracker 与数据导入分析过
- 继承ReceiverInputDStream的类需要重载 getReceiver 函数以提供用于接收数据的 receiver
- recever 会调度到某个 executor 上并启动,不间断的接收数据并将收到的数据交由 ReceiverSupervisor 存成 block 作为 RDD 输入数据
KafkaInputDStream当然也实现了getReceiver方法,如下:
def getReceiver(): Receiver[(K, V)] = {if (!useReliableReceiver) {//< 不启用 WALnew KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel) } else { //< 启用 WAL new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel) } }
根据是否启用 WAL,receiver 分为 KafkaReceiver 和 ReliableKafkaReceiver。揭开Spark Streaming神秘面纱②-ReceiverTracker 与数据导入一文中详细地介绍了
- receiver 是如何被分发启动的
- receiver 接受数据后数据的流转过程
并在 揭开Spark Streaming神秘面纱③ - 动态生成 job 一文中详细介绍了 - receiver 接受的数据存储为 block 后,如何将 blocks 作为 RDD 的输入数据
- 动态生成 job
以上两篇文章并没有具体介绍 receiver 是如何接收数据的,当然每个重载了 ReceiverInputDStream 的类的 receiver 接收数据方式都不相同。下图描述了 KafkaReceiver 接收数据的具体流程:
KafkaUtils#createDirectStream
在揭开Spark Streaming神秘面纱③ - 动态生成 job中,介绍了在生成每个 batch 的过程中,会去取这个 batch 对应的 RDD,若未生成该 RDD,则会取该 RDD 对应的 blocks 数据来生成 RDD,最终会调用到DStream#compute(validTime: Time)
函数,在KafkaUtils#createDirectStream
调用中,会新建DirectKafkaInputDStream
,DirectKafkaInputDStream#compute(validTime: Time)
会从 kafka 拉取数据并生成 RDD,流程如下:
如上图所示,该函数主要做了以下三个事情:
- 确定要接收的 partitions 的 offsetRange,以作为第2步创建的 RDD 的数据来源
- 创建 RDD 并执行 count 操作,使 RDD 真实具有数据
- 以 streamId、数据条数,offsetRanges 信息初始化 inputInfo 并添加到 JobScheduler 中
进一步看 KafkaRDD 的 getPartitions 实现:
override def getPartitions: Array[Partition] = { offsetRanges.zipWithIndex.map { case (o, i) => val (host, port) = leaders(TopicAndPartition(o.topic, o.partition)) new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port) }.toArray }
从上面的代码可以很明显看到,KafkaRDD 的 partition 数据与 Kafka topic 的某个 partition 的 o.fromOffset 至 o.untilOffset 数据是相对应的,也就是说 KafkaRDD 的 partition 与 Kafka partition 是一一对应的
通过以上分析,我们可以对这两种方式的区别做一个总结:
- createStream会使用 Receiver;而createDirectStream不会
- createStream使用的 Receiver 会分发到某个 executor 上去启动并接受数据;而createDirectStream直接在 driver 上接收数据
- createStream使用 Receiver 源源不断的接收数据并把数据交给 ReceiverSupervisor 处理最终存储为 blocks 作为 RDD 的输入,从 kafka 拉取数据与计算消费数据相互独立;而createDirectStream会在每个 batch 拉取数据并就地消费,到下个 batch 再次拉取消费,周而复始,从 kafka 拉取数据与计算消费数据是连续的,没有独立开
- createStream中创建的KafkaInputDStream 每个 batch 所对应的 RDD 的 partition 不与 Kafka partition 一一对应;而createDirectStream中创建的 DirectKafkaInputDStream 每个 batch 所对应的 RDD 的 partition 与 Kafka partition 一一对应
转载于:https://www.cnblogs.com/liugh/articles/6817553.html
SparkStreaming从Kafka读取数据两种方式相关推荐
- kafka修改分区数_大数据技术:解析SparkStreaming和Kafka集成的两种方式
Spark Streaming是基于微批处理的流式计算引擎,通常是利用Spark Core或者Spark Core与Spark Sql一起来处理数据.在企业实时处理架构中,通常将Spark Strea ...
- JAVA与PLC通讯读取数据(两种方式)
第一种方式(s7connector) S7官网:S7Connector - Documentation,有简单的读写操作参考. 1.创建maven工程引入依赖 <dependency>&l ...
- flume系列之:flume基于kafka.topics和kafka.topics.regex两种方式匹配Kafka Topic
flume系列之:flume基于kafka.topics和kafka.topics.regex两种方式匹配Kafka Topic 一.flume基于kafka.topics匹配Kafka Topic ...
- ireport参数传递json_Json传递数据两种方式(json大全)
1.Json传递数据两种方式(json大全) ----------------------------字符串 var list1 = ["number","name&qu ...
- java 读取css文件_java文件读取的两种方式
JAVA中读取文件(二进制,字符)内容的几种方 JAVA中读取文件内容的方法有很多,比如按字节读取文件内容,按字符读取文件内容,按行读取文件内容,随机读取文件内容等方法,本文就以上方法的具体实现给出代 ...
- Hive元数据的读取的两种方式
1.直连模式 使用JDBC的方式直接去mysql中读取元数据,称为直连模式 需要的条件: 连接Mysql的驱动,已经放入到$HIVE_HOME/lib下 创建连接时,需要有url,username,p ...
- 前端项目模拟数据两种方式
文章目录 1. Mock.js 1.1 安装依赖包mockjs 1.2 在 src 目录下创建 mock 文件夹 1.3 准备模拟的数据 1.4 创建 mockServer.js 1.5 引入mock ...
- Excel文件读取的两种方式
1.Pandas库的读取操作 from pandas import read_excel dr=read_excel(filename,header) dr#dataframe数据 dw=DataFr ...
- python 爬虫The One的 数据两种方式
系统:windows 语言:python 工具:pycharm 需要的包:beautifulsoup.requests 安装以上的包,方法类似, 手动方式 任务:爬300条.采用多线程.非多线程 单线 ...
最新文章
- flash动画制作成品_flash动画制作
- 利用返回引用来操作结构体
- 一张图片学Python
- RxJs SwitchMapTo 操作符之移花接木
- Betty's Sales team BP SQL
- Java接口的防御性API演进
- Stimulsoft reports .net中创建变量
- robocopy的退出返回代码
- 《C++游戏编程入门(第4版)》——1.9 本章小结
- android自定义播放器按钮,android – 使用exo播放器添加全屏视频按钮
- 万象:庸人容易因欠缺自知之明而自我膨胀
- Mac Xcode 更改编辑器文本字体大小
- 1.7 什么是软件生命周期模型?试比较瀑布模型,快速原型模型,增量模型和螺旋模型的优缺点,说明每种模型的适用范围
- Python opencv:人眼/人脸识别并实时打码处理
- vue项目中使用swiper 实现无缝滚动
- Android N 应用内更新
- 标准IO fgets的使用
- Saturn Console部署踩坑总结
- vs不一致的行尾对话框怎么调出_爱喝白兰地的福建人,跟你讲讲“VS、VSOP、XO”...
- 与开发斗智斗勇的日子
热门文章
- 浅谈Laravel中的设计模式(四) Contract 契约模式
- setState 是异步的
- windows下php7.1安装redis扩展以及redis测试使用全过程
- mysql 密码 You must reset your password using ALTER USER statement before executing this statement....
- 求两个数的最大公因数
- Ansible详解(二)
- 音乐播放器的实现-音乐文件的获取(1)
- 2010 PDC Party @深圳 免费技术交流活动公告
- 转载:写给计算机专业的朋友们
- Android投票列表设计,AndroidCustomView一个简单的投票排名对比图