来源:大数据技术与架构作者:王知无

大数据技术与架构

点击右侧关注,大数据开发领域最强公众号!

暴走大数据

点击右侧关注,暴走大数据!

By 大数据技术与架构

场景描述:Kafka配合Spark Streaming是大数据领域常见的黄金搭档之一,主要是用于数据实时入库或分析。为了应对可能出现的引起Streaming程序崩溃的异常情况,我们一般都需要手动管理好Kafka的offset,而不是让它自动提交,即需要将enable.auto.commit设为false。只有管理好offset,才能使整个流式系统最大限度地接近exactly once语义。

关键词:offset Spark Streaming

Kafka+Spark Streaming主要用于实时流处理。到目前为止,在大数据领域中是一种非常常见的架构。Kafka在其中主要起着一个缓冲的作用,所有的实时数据都会经过kafka。所以对kafka offset的管理是其中至关重要的一环。

我们一般都需要手动管理好Kafka的offset,而不是让它自动提交,即需要将enable.auto.commit设为false。

一但管理不善,就会到导致数据丢失或重复消费。

offset的管理方式

一个简单的流程如下:

  • 在Kafka DirectStream初始化时,取得当前所有partition的存量offset,以让DirectStream能够从正确的位置开始读取数据。
  • 读取消息数据,处理并存储结果。
  • 提交offset,并将其持久化在可靠的外部存储中。
  • 图中的“process and store results”及“commit offsets”两项,都可以施加更强的限制,比如存储结果时保证幂等性,或者提交offset时采用原子操作。

保存offset的方式

Checkpoint:

Spark Streaming的checkpoints是最基本的存储状态信息的方式,一般是保存在HDFS中。但是最大的问题是如果streaming程序升级的话,checkpoints的数据无法使用,所以几乎没人使用。

offset的三种管理方式:

自动提交offset:

  • enable.auto.commit=true。
  • 一但consumer挂掉,就会导致数据丢失或重复消费。
  • offset不可控。

Kafka自身的offset管理:

  • (属于At-least-once语义,如果做好了幂等性,可以使用这种方式):
  • 在Kafka 0.10+版本中,offset的默认存储由ZooKeeper移动到了一个自带的topic中,名为__consumer_offsets。
  • Spark Streaming也专门提供了commitAsync() API用于提交offset。
  • 需要将参数修改为enable.auto.commit=false。
  • 在我实际测试中发现,这种offset的管理方式,不会丢失数据,但会出现重复消费。
  • 停掉streaming应用程序再次启动后,会再次消费停掉前最后的一个批次数据,应该是由于offset是异步提交的方式导致,offset更新不及时引起的。
  • 因此需要做好数据的幂等性。
  • (修改源码将异步改为同步,应该是可以做到Exactly-once语义的)

自定义offset:

  • (推荐,采用这种方式,可以做到At-least-once语义):
  • 可以将offset存放在第三方储中,包括RDBMS、Redis、ZK、ES等。
  • 若消费数据存储在带事务的组件上,则强烈推荐将offset存储在一起,借助事务实现 Exactly-once 语义。

示例

Kafka自身管理offset:

在Kafka 0.10+版本中,offset的默认存储由ZooKeeper移动到了一个自带的topic中,名为__consumer_offsets。所以我们读写offset的对象正是这个topic,Spark Streaming也专门提供了commitAsync() API用于提交offset。实际上,一切都已经封装好了,直接调用相关API即可。

stream.foreachRDD { rdd => val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // 确保结果都已经正确且幂等地输出了 stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)}

ZooKeeper

在Spark Streaming连接Kafka应用中使用Zookeeper来存储offsets也是一种比较可靠的方式。

在这个方案中,Spark Streaming任务在启动时会去Zookeeper中读取每个分区的offsets。如果有新的分区出现,那么他的offset将会设置在最开始的位置。在每批数据处理完之后,用户需要可以选择存储已处理数据的一个offset或者最后一个offset。此外,新消费者将使用跟旧的Kafka 消费者API一样的格式将offset保存在ZooKeeper中。因此,任何追踪或监控Zookeeper中Kafka Offset的工具仍然生效的。

一个典型的工具类:

class ZkKafkaOffsetManager(zkUrl: String) { private val logger = LoggerFactory.getLogger(classOf[ZkKafkaOffsetManager]) private val zkClientAndConn = ZkUtils.createZkClientAndConnection(zkUrl, 30000, 30000); private val zkUtils = new ZkUtils(zkClientAndConn._1, zkClientAndConn._2, false) def readOffsets(topics: Seq[String], groupId: String): Map[TopicPartition, Long] = { val offsets = mutable.HashMap.empty[TopicPartition, Long] val partitionsForTopics = zkUtils.getPartitionsForTopics(topics) // /consumers//offsets// partitionsForTopics.foreach(partitions => { val topic = partitions._1 val groupTopicDirs = new ZKGroupTopicDirs(groupId, topic) partitions._2.foreach(partition => { val path = groupTopicDirs.consumerOffsetDir + "/" + partition try { val data = zkUtils.readData(path) if (data != null) { offsets.put(new TopicPartition(topic, partition), data._1.toLong) logger.info( "Read offset - topic={}, partition={}, offset={}, path={}

string 中的offset_Kafka+Spark Streaming管理offset的几种方法相关推荐

  1. string 中的offset_Kafka+Spark Streaming管理offset的两种方法

    ​Kafka配合Spark Streaming是大数据领域常见的黄金搭档之一,主要是用于数据实时入库或分析. 为了应对可能出现的引起Streaming程序崩溃的异常情况,我们一般都需要手动管理好Kaf ...

  2. 根据官网文档看Spark Streaming对接Kafka的两种方式, 以及如何实现Exactly Once语义

    注: 本文算是本人的学习记录, 中间可能有些知识点并不成熟, 不能保证正确性. 只能算是对官网文档作了个翻译和解读, 随时有可能回来更新和纠错 上一篇文章讨论了Spark Streaming的WAL( ...

  3. java7 javascript引擎_Java7中脚本引擎的一般用法,共三种方法获得JavaScript引擎:名称、文件扩展名、MIME类型 | 学步园...

    package com.sino.java7; import javax.script.ScriptEngine; import javax.script.ScriptEngineManager; i ...

  4. java整型转换为数组_基于java中byte数组与int类型的转换(两种方法)

    java中byte数组与int类型的转换,在网络编程中这个算法是最基本的算法,我们都知道,在socket传输中,发送.者接收的数据都是 byte数组,但是int类型是4个byte组成的,如何把一个整形 ...

  5. PMP 冲突管理常用的五种方法

    PMP 冲突管理常用的五种方法 一.五种常用的方法 • 撤退/回避:从实际或者潜在冲突中退出,将问题推迟到准备充分的时候,或推给其他人 • 缓和/包容:强调一致而非差异(求同存异) • 妥协/调解:为 ...

  6. Python中记住过去(模型状态)的五种方法

    在Python中记住过去(模型状态)的五种方法 从封闭函数和迭代器到状态机Python库 有人说... "那些不能记住过去的人,注定要重复它".G. Santayana, 1905 ...

  7. python中none算变量吗_在python中对变量判断是否为None的三种方法总结

    三种主要的写法有: 第一种:if X is None; 第二种:if not X: 当X为None,  False, 空字符串"", 0, 空列表[], 空字典{}, 空元组()这 ...

  8. ASP.NET中WEB上弹出消息框的N种方法(为了以后方便,转了很多网友的文章!希望不会介意)...

    ASP.NET中WEB上弹出消息框的N种方法 第一个确定之后跳转到另一页面,第二个确定之后返回前一页 Response.Write("<script langage='javascri ...

  9. 解决VMware中虚拟机(centos7)无法上网的一种方法

    解决VMware中虚拟机(centos7)无法上网的一种方法 参考文章: (1)解决VMware中虚拟机(centos7)无法上网的一种方法 (2)https://www.cnblogs.com/cu ...

最新文章

  1. Java中native关键字
  2. 洛谷 1969 积木大赛——水题
  3. linux命令重定向、、 1、 2、 1、 2、
  4. (17万浏览量) .NET Core的介绍
  5. 最幸福的事就是吃饺子
  6. C++作用域、局部变量、全局变量、传引用传值对比的一个例子
  7. 磁盘碎片整理程序的原理是什么?
  8. 虚拟机linux下git clone 报SSL connect error错误
  9. JS之RegExp的使用
  10. 颜色空间转换-从RGB到LCH-亮度饱和度色度
  11. 安装装ankhsvn
  12. cad两直线相交画圆弧_cad制图中两个圆相交于一条直线怎么画
  13. 【JY】浅析消能附加阻尼比
  14. 机器学习代码整理pLSA、BoW、DBN、DNN
  15. mysql 存储树形结构
  16. 万丈高楼平地起 功夫不负有心人
  17. python使用pyecharts绘制地图
  18. python生成的word表格设置内容居中
  19. 深度ghostxp_sp3至尊纯净版 v2013.08
  20. 高等数学复习之二重积分

热门文章

  1. linux 指定库名 登录mysql_数据库学习笔记之MySQL(01)
  2. python多层数组合成一个数组后循环打印出数组内的每一项元素的方法
  3. Python读取文本的三种方式对比
  4. Python 中引入多个模块,包的概念
  5. 三类医械计算机系统要求,三类医疗器械计算机管理系统要求
  6. C语言 socket shutdown()函数(将与 sockfd 关联的套接字上的全双工连接全部或部分关闭)
  7. pytorch torch.nn.Sequential(* args)(嘎哈用的?构建神经网络用的?)
  8. vscode运行虚拟环境virtualenv时报错:\Scripts\Activate.ps1,因为在此系统上禁止运行脚本
  9. python 切片 单冒号的作用[:](批量赋值最小数组单元)
  10. 元素class属性中的空格