SparkStreaming手动维护Kafka Offset的几种方式
Spark Streaming No Receivers 方式的createDirectStream 方法不使用接收器,而是创建输入流直接从Kafka 集群节点拉取消息。输入流保证每个消息从Kafka 集群拉取以后只完全转换一次,保证语义一致性。但是当作业发生故障或重启时,要保障从当前的消费位点去处理数据(即Exactly Once语义),单纯的依靠SparkStreaming本身的机制是不太理想的,生产环境中通常借助手动管理offset的方式来维护kafka的消费位点。本文分享将介绍如何手动管理Kafka的Offset,希望对你有所帮助。本文主要包括以下内容:
如何使用MySQL管理Kafka的Offset
如何使用Redis管理Kafka的OffSet
如何使用MySQL管理Kafka的Offset
我们可以从Spark Streaming 应用程序中编写代码来手动管理Kafka偏移量,偏移量可以从每一批流处理中生成的RDDS偏移量来获取,获取方式为:
KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
// 获取偏移量
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
...
}
当获取到偏移量之后,可以将将其保存到外部存储设备中(MySQL、Redis、Zookeeper、HBase等)。
使用案例代码
MySQL中用于保存偏移量的表
CREATE TABLE `topic_par_group_offset` (
`topic` varchar(255) NOT NULL
SparkStreaming手动维护Kafka Offset的几种方式相关推荐
- sparkstreaming 读取mysql_第十篇|SparkStreaming手动维护Kafka Offset的几种方式
Spark Streaming No Receivers 方式的createDirectStream 方法不使用接收器,而是创建输入流直接从Kafka 集群节点拉取消息.输入流保证每个消息从Kafka ...
- Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式
Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式,可以从代码中简单理解成Receiver方式是通过zookeeper来连接kafka队列,Direct方 ...
- springboot手动提交kafka offset
转载自 springboot手动提交kafka offset enable.auto.commit参数设置成了false 但是测试发现enable.auto.commit参数设置成了false,kaf ...
- java kafka设置偏移量_kafka实战宝典:手动修改消费偏移量的两种方式
kafka实战宝典:手动修改消费偏移量的两种方式 工作中遇到过消费端报错的问题:包括数据Invalid Message和Failed_to_UNcompress等报错信息,导致消费端的iterator ...
- flume系列之:flume基于kafka.topics和kafka.topics.regex两种方式匹配Kafka Topic
flume系列之:flume基于kafka.topics和kafka.topics.regex两种方式匹配Kafka Topic 一.flume基于kafka.topics匹配Kafka Topic ...
- kafka修改分区数_大数据技术:解析SparkStreaming和Kafka集成的两种方式
Spark Streaming是基于微批处理的流式计算引擎,通常是利用Spark Core或者Spark Core与Spark Sql一起来处理数据.在企业实时处理架构中,通常将Spark Strea ...
- SparkStreaming从Kafka读取数据两种方式
参考文章:http://www.jianshu.com/p/60344796f8a5 在结合 Spark Streaming 及 Kafka 的实时应用中,我们通常使用以下两个 API 来获取最初的 ...
- 指定开始_Flink-Kafka指定offset的五种方式
默认:从topic中指定的group上次消费的位置开始消费. 所以必须配置group.id参数从消费者组提交的偏移量开始读取分区(kafka或zookeeper中).如果找不到分区的偏移量,auto. ...
- Spark Streaming读取Kafka数据的两种方式
Kafka在0.8和0.10之间引入了一种新的消费者API,因此,Spark Streaming与Kafka集成,有两种包可以选择: spark-streaming-kafka-0-8与spark-s ...
最新文章
- python二分法求解_Python使用二分法求平方根的简单示例
- Require.js
- jaxb_JAXB –新手的观点,第1部分
- lintcode433 岛屿的个数
- 二叉树的存储结构入门(java描述)
- Excel直接跳过隐藏行,粘贴数据到可见单元格
- 关于购买域名的一些建议
- getinfo怎么用php,PHP的函数curl-curl_getinfo
- Chrome html播放器卡顿,谷歌Chrome浏览器卡顿原因及解决办法
- Muti-Similarity Loss:考虑了batch中整体距离分布的对比损失函数
- 安培龙IPO过会:年营收5亿 同创伟业与中移创新是股东
- Can not modify more than one base table through a join view
- export ‘createStore‘ (imported as ‘createStore‘) was not found in ‘./store/index.js‘ (possible expor
- c 语言中古括号,如何将中古调式运用在你的作品上
- Oracle分区之五:创建分区索引总结
- 夏磊2019MySQL高级学习笔记
- scrapy爬取——阿里招聘信息
- 聊聊公钥私钥的那点事儿
- SNA中心论的相关概念
- 学员故事:老男孩Linux运维班学习五个月,让我实现月薪万元+
热门文章
- oracle ||#039; where #039;||condition;,帝国cms后台添加字段提示#039;Row size too large. The maximum row size...
- vue watch 监听不到变化_关于vue中watch检测到不到对象属性的变化的解决方法
- 55寸鸿蒙安卓,深网|荣耀智慧屏发布:搭载鸿蒙系统 配55英寸屏3799元起
- php七牛持久化处理,使用七牛自定义数据处理范例
- 抑制过拟合的方法之Dropout(随机删除神经元)
- Oracle循环语句
- 以腾讯云IoT Suite为例 谈谈边缘计算在物联网的实践与实现
- 对于 Redux 的理解
- Openresty 学习笔记(一)opm 工具的使用
- eclipse集成maven