Spark Streaming No Receivers 方式的createDirectStream 方法不使用接收器,而是创建输入流直接从Kafka 集群节点拉取消息。输入流保证每个消息从Kafka 集群拉取以后只完全转换一次,保证语义一致性。但是当作业发生故障或重启时,要保障从当前的消费位点去处理数据(即Exactly Once语义),单纯的依靠SparkStreaming本身的机制是不太理想的,生产环境中通常借助手动管理offset的方式来维护kafka的消费位点。本文分享将介绍如何手动管理Kafka的Offset,希望对你有所帮助。本文主要包括以下内容:

  • 如何使用MySQL管理Kafka的Offset

  • 如何使用Redis管理Kafka的OffSet

如何使用MySQL管理Kafka的Offset

我们可以从Spark Streaming 应用程序中编写代码来手动管理Kafka偏移量,偏移量可以从每一批流处理中生成的RDDS偏移量来获取,获取方式为:

KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
// 获取偏移量
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
 ...
  }

当获取到偏移量之后,可以将将其保存到外部存储设备中(MySQL、Redis、Zookeeper、HBase等)。

使用案例代码

  • MySQL中用于保存偏移量的表

CREATE TABLE `topic_par_group_offset` (
  `topic` varchar(255) NOT NULL

SparkStreaming手动维护Kafka Offset的几种方式相关推荐

  1. sparkstreaming 读取mysql_第十篇|SparkStreaming手动维护Kafka Offset的几种方式

    Spark Streaming No Receivers 方式的createDirectStream 方法不使用接收器,而是创建输入流直接从Kafka 集群节点拉取消息.输入流保证每个消息从Kafka ...

  2. Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式

    Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式,可以从代码中简单理解成Receiver方式是通过zookeeper来连接kafka队列,Direct方 ...

  3. springboot手动提交kafka offset

    转载自 springboot手动提交kafka offset enable.auto.commit参数设置成了false 但是测试发现enable.auto.commit参数设置成了false,kaf ...

  4. java kafka设置偏移量_kafka实战宝典:手动修改消费偏移量的两种方式

    kafka实战宝典:手动修改消费偏移量的两种方式 工作中遇到过消费端报错的问题:包括数据Invalid Message和Failed_to_UNcompress等报错信息,导致消费端的iterator ...

  5. flume系列之:flume基于kafka.topics和kafka.topics.regex两种方式匹配Kafka Topic

    flume系列之:flume基于kafka.topics和kafka.topics.regex两种方式匹配Kafka Topic 一.flume基于kafka.topics匹配Kafka Topic ...

  6. kafka修改分区数_大数据技术:解析SparkStreaming和Kafka集成的两种方式

    Spark Streaming是基于微批处理的流式计算引擎,通常是利用Spark Core或者Spark Core与Spark Sql一起来处理数据.在企业实时处理架构中,通常将Spark Strea ...

  7. SparkStreaming从Kafka读取数据两种方式

    参考文章:http://www.jianshu.com/p/60344796f8a5 在结合 Spark Streaming 及 Kafka 的实时应用中,我们通常使用以下两个 API 来获取最初的 ...

  8. 指定开始_Flink-Kafka指定offset的五种方式

    默认:从topic中指定的group上次消费的位置开始消费. 所以必须配置group.id参数从消费者组提交的偏移量开始读取分区(kafka或zookeeper中).如果找不到分区的偏移量,auto. ...

  9. Spark Streaming读取Kafka数据的两种方式

    Kafka在0.8和0.10之间引入了一种新的消费者API,因此,Spark Streaming与Kafka集成,有两种包可以选择: spark-streaming-kafka-0-8与spark-s ...

最新文章

  1. python二分法求解_Python使用二分法求平方根的简单示例
  2. Require.js
  3. jaxb_JAXB –新手的观点,第1部分
  4. lintcode433 岛屿的个数
  5. 二叉树的存储结构入门(java描述)
  6. Excel直接跳过隐藏行,粘贴数据到可见单元格
  7. 关于购买域名的一些建议
  8. getinfo怎么用php,PHP的函数curl-curl_getinfo
  9. Chrome html播放器卡顿,谷歌Chrome浏览器卡顿原因及解决办法
  10. Muti-Similarity Loss:考虑了batch中整体距离分布的对比损失函数
  11. 安培龙IPO过会:年营收5亿 同创伟业与中移创新是股东
  12. Can not modify more than one base table through a join view
  13. export ‘createStore‘ (imported as ‘createStore‘) was not found in ‘./store/index.js‘ (possible expor
  14. c 语言中古括号,如何将中古调式运用在你的作品上
  15. Oracle分区之五:创建分区索引总结
  16. 夏磊2019MySQL高级学习笔记
  17. scrapy爬取——阿里招聘信息
  18. 聊聊公钥私钥的那点事儿
  19. SNA中心论的相关概念
  20. 学员故事:老男孩Linux运维班学习五个月,让我实现月薪万元+

热门文章

  1. oracle ||#039; where #039;||condition;,帝国cms后台添加字段提示#039;Row size too large. The maximum row size...
  2. vue watch 监听不到变化_关于vue中watch检测到不到对象属性的变化的解决方法
  3. 55寸鸿蒙安卓,深网|荣耀智慧屏发布:搭载鸿蒙系统 配55英寸屏3799元起
  4. php七牛持久化处理,使用七牛自定义数据处理范例
  5. 抑制过拟合的方法之Dropout(随机删除神经元)
  6. Oracle循环语句
  7. 以腾讯云IoT Suite为例 谈谈边缘计算在物联网的实践与实现
  8. 对于 Redux 的理解
  9. Openresty 学习笔记(一)opm 工具的使用
  10. eclipse集成maven