通过Spark listener实现Direct模式读取Kafaka数据
参考文章:
http://coolplayer.net/2016/11/30/spark-streaming-从kafka-拉数据如何保证数据不丢失/
https://github.com/jacksu/utils4s/blob/master/spark-knowledge/md/spark_streaming使用kafka保证数据零丢失.md?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io
为什么使用 direct 方式
Direct Approach VS Receiver-based Approach
- 因为按需拉数据,所以不存在缓冲区,就不用担心缓冲区把内存撑爆了。这个在Receiver-based Approach 就比较麻烦,你需要通过spark.streaming.blockInterval等参数来调整。
- 数据默认就被分布到了多个Executor上。Receiver-based Approach 你需要做特定的处理,才能让 Receiver分不到多个Executor上。
Receiver-based Approach 的方式,一旦你的Batch Processing 被delay了,或者被delay了很多个batch,那估计你的Spark Streaming程序离奔溃也就不远了。 Direct Approach (No Receivers) 则完全不会存在类似问题。就算你delay了很多个batch time,你内存中的数据只有这次处理的。
能保证exact once 语意, 图解参考
spark streaming 里面应该怎么做
- spark 可以使用 Direct 和 Receiver-based 两种方式从kafka中取数据,显然我们应该选用Direct 方式
- 在处理过程中应该考虑如何 recovery, 这样我们需要把每个batch中的分区消费位置持久化存储在hdfs上, 为了重启的时候可以从上次断掉的位置继续消费
实现思路如下
我们可以 override StreamingListener 的onBatchCompleted函数, 在每个 batch 处理完的时候保存一下当前处理的kafka 的 offset, offset 数组信息可以从InputInfoTracker中获取,
重启的时候可以从持久化目录里面获取最后消费的分区消费位置数组, 然后设置一下 DirectKafkaInputDStream 的 currentOffsets 字段, 就可以做到从上次断掉的位置继续消费
关键代码如下
streamingContext.addStreamingListener(new BatchStreamingListener(this))
class BatchStreamingListener(runtime: SparkStreamingRuntime) extends StreamingListener {override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {operator.directKafkaRecoverSource.saveJobSate(time)} }
val info = ssc.scheduler.inputInfoTracker.getInfo(time) info(jobName).metadata("offsets")
这里确保每一批数据消费完了之后,就持久化kafka中的分区offset 数组, 可以从 inputInfoTracker 获取每个batch 中处理的offset数组,写成一个文件,下面就是根据参数中设置的hdfs持久化路径上传到hdfs
val field = classOf[DirectKafkaInputDStream[_, _, _, _, _]].getDeclaredField("currentOffsets") field.setAccessible(true) field.set(dkid, state)
测试下面就简单了,在下次重启spark streaming 的时候遍历hdfs持久化路径中最末尾的文件, 找到其中保存的kafka offset 数组, 然后设置 DirectKafkaInputDStream 中的currentOffsets 字段, 然后就可以做到从上次断掉的位置继续消费
下面模拟一个spark streaming 从kafka中消费数据的场景, 由于spark streming 中是direct 的方式, offset的数组没有打入 zookeeper , 所以kafka 中自带的监控工具是失效的, 这里有一个比较 trick的方法, 你可以使用 bin/kafka-console-consumer.sh 工具消费同一个topic, 这样的话你就可以从kafka 的监控工具中看到消息写到哪个位置了, 然后再观察你spark 里面持久化的位置数组, 然后就可以确认是从上次断掉的位置继续消费还是从最新的位置消费
转载于:https://www.cnblogs.com/liugh/articles/6991993.html
通过Spark listener实现Direct模式读取Kafaka数据相关推荐
- Unity读取串口数据
读取串口数据的过程其实就跟你读取文件操作IO时的过程差不多: 首先要使用using System.IO.Ports;时需要先将 改为 具体操作: Edit -> Project Settings ...
- Spark Streaming读取Kafka数据的两种方式
Kafka在0.8和0.10之间引入了一种新的消费者API,因此,Spark Streaming与Kafka集成,有两种包可以选择: spark-streaming-kafka-0-8与spark-s ...
- Spark Standalone -- 独立集群模式、Spark 提交任务的两种模式、spark在yarn上运行的环境搭建、自己写的spark代码如何提交到yarn上并运行...
目录 Spark Standalone -- 独立集群模式 Standalone 架构图 Standalone 的搭建 1.上传.解压.重命名 2.配置环境变量 3.修改配置文件 conf 4.同步到 ...
- Spark读取Hive数据的两种方式与保存数据到HDFS
Spark读取Hive数据的两种方式与保存数据到HDFS Spark读取Hive数据的方式主要有两种 1. 通过访问hive metastore的方式,这种方式通过访问hive的metastore元数 ...
- 第8课:彻底实战详解使用IDE开发Spark程序--集群模式运行
第8课:彻底实战详解使用IDE开发Spark程序--集群模式运行 拷贝WordCount.scala生成WordCountCluster.scala. 1. 将object WordCount改为ob ...
- spark shell的运行模式汇总
scala语言 模式 启动命令 local模式 spark-shell --master local yarn模式 spark-shell --master yarn standalone ...
- skip与direct模式区别 ,他们与CBP的关系
1 CBP表示残差的编码状态,CBP一共6bit,低4位表示4个亮度8x8块,第4位表示U,第五位表示V,如果相应的位为"1", 表示此块有残差系数,反之没有残差,此宏 ...
- 本地提交spark_spark快速入门(三)-------spark部署及运行模式
spark支持多种部署方案,包括spark自带的standalone资源调度模式(StandAlone):运行在hadoop的yarn资源调度框架中(SparkOnYARN):local本地模式:可以 ...
- python 读取文件到字典读取顺序_Python用list或dict字段模式读取文件的方法
前言 Python用于处理文本数据绝对是个利器,极为简单的读取.分割.过滤.转换支持,使得开发者不需要考虑繁杂的流文件处理过程(相对于JAVA来说的,嘻嘻).博主自己工作中,一些复杂的文本数据处理计算 ...
最新文章
- WDS部署服务之四镜像捕获(1)
- session的工作原理[择]
- 【Android 应用开发】Activity 状态保存 OnSaveInstanceState參数解析
- Simulink之负载换流式无源逆变电路
- oracle的序列为什么会出错,Oracle系列:(24)序列
- 图像形状特征(二)--Hu距
- 用Python将一个文件夹下多个子文件夹中相同文件拷贝到同一个文件夹中并重新命名
- Linux常用软件包
- Gtest之TEST宏的用法
- SQL WITH TIES解释与用法
- Catagory添加属性、扩展方法
- 看名言后的心得体会学会融会贯通
- 群晖存储空间不足-处理方案一
- 48V LDO三端稳压IC 60v 100V 300V电源降压芯片系统解决方案
- Rainbow Brackets彩虹括号插件(简明安装)
- win7怎样打开无线服务器,Win7怎么设置tplink路由器_Win7安装tplink路由器方法-192路由网...
- EWSTM8系列教程03_主窗口、工具栏的概述
- 【转载】不死族资深玩家的三年心得
- 分享自己使用python+pyserial+pyQT5写的串口调试助手
- django、vue如何实现websock通信,如何实现多人群聊
热门文章
- Unity3D笔记 GUI 二 、实现选项卡一窗口
- Github拉取远端的时候提示“ssh: connect to host github.com port 22: Connection timed out”错误...
- sql根据年月日查询注册数或者和值
- linkin大话面向对象--多态
- 《Ext JS权威指南》节选:在Visual Studio中实现Ext JS智能提示
- Windows内核情景分析 笔记
- EMC NetWorker备份oracle安装配置指南
- ie在线邮件html编辑器,IE中HTML编辑器的修改与使用.doc
- 在路由器上设置虚拟ftp服务器,怎么在路由器上开启ftp服务器配置
- FPGA实现多个数的加法运算