1.概述

  目前,Kafka 官网最新版[0.10.1.1],已默认将消费的 offset 迁入到了 Kafka 一个名为 __consumer_offsets 的Topic中。其实,早在 0.8.2.2 版本,已支持存入消费的 offset 到Topic中,只是那时候默认是将消费的 offset 存放在 Zookeeper 集群中。那现在,官方默认将消费的offset存储在 Kafka 的Topic中,同时,也保留了存储在 Zookeeper 的接口,通过 offsets.storage 属性来进行设置。

2.内容

  其实,官方这样推荐,也是有其道理的。之前版本,Kafka其实存在一个比较大的隐患,就是利用 Zookeeper 来存储记录每个消费者/组的消费进度。虽然,在使用过程当中,JVM帮助我们完成了自一些优化,但是消费者需要频繁的去与 Zookeeper 进行交互,而利用ZKClient的API操作Zookeeper频繁的Write其本身就是一个比较低效的Action,对于后期水平扩展也是一个比较头疼的问题。如果期间 Zookeeper 集群发生变化,那 Kafka 集群的吞吐量也跟着受影响。

  在此之后,官方其实很早就提出了迁移到 Kafka 的概念,只是,之前是一直默认存储在 Zookeeper集群中,需要手动的设置,如果,对 Kafka 的使用不是很熟悉的话,一般我们就接受了默认的存储(即:存在 ZK 中)。在新版 Kafka 以及之后的版本,Kafka 消费的offset都会默认存放在 Kafka 集群中的一个叫 __consumer_offsets 的topic中。

  当然,其实她实现的原理也让我们很熟悉,利用 Kafka 自身的 Topic,以消费的Group,Topic,以及Partition做为组合 Key。所有的消费offset都提交写入到上述的Topic中。因为这部分消息是非常重要,以至于是不能容忍丢数据的,所以消息的 acking 级别设置为了 -1,生产者等到所有的 ISR 都收到消息后才会得到 ack(数据安全性极好,当然,其速度会有所影响)。所以 Kafka 又在内存中维护了一个关于 Group,Topic 和 Partition 的三元组来维护最新的 offset 信息,消费者获取最新的offset的时候会直接从内存中获取。

3.实现

  那我们如何实现获取这部分消费的 offset,我们可以在内存中定义一个Map集合,来维护消费中所捕捉到 offset,如下所示:

protected static Map<GroupTopicPartition, OffsetAndMetadata> offsetMap = new ConcurrentHashMap<>();

  然后,我们通过一个监听线程来更新内存中的Map,代码如下所示:

private static synchronized void startOffsetListener(ConsumerConnector consumerConnector) {Map<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(consumerOffsetTopic, new Integer(1));KafkaStream<byte[], byte[]> offsetMsgStream = consumerConnector.createMessageStreams(topicCountMap).get(consumerOffsetTopic).get(0);ConsumerIterator<byte[], byte[]> it = offsetMsgStream.iterator();while (true) {MessageAndMetadata<byte[], byte[]> offsetMsg = it.next();if (ByteBuffer.wrap(offsetMsg.key()).getShort() < 2) {try {GroupTopicPartition commitKey = readMessageKey(ByteBuffer.wrap(offsetMsg.key()));if (offsetMsg.message() == null) {continue;}OffsetAndMetadata commitValue = readMessageValue(ByteBuffer.wrap(offsetMsg.message()));offsetMap.put(commitKey, commitValue);} catch (Exception e) {e.printStackTrace();}}}}

  在拿到这部分更新后的offset数据,我们可以通过 RPC 将这部分数据共享出去,让客户端获取这部分数据并可视化。RPC 接口如下所示:

namespace java org.smartloli.kafka.eagle.ipcservice KafkaOffsetServer{string query(1:string group,2:string topic,3:i32 partition),string getOffset(),string sql(1:string sql),string getConsumer(),string getActiverConsumer()
}

  这里,如果我们不想写接口来操作 offset,可以通过 SQL 来操作消费的 offset 数组,使用方式如下所示:

  • 引入依赖JAR
<dependency><groupId>org.smartloli</groupId><artifactId>jsql-client</artifactId><version>1.0.0</version>
</dependency>

  • 使用接口
JSqlUtils.query(tabSchema, tableName, dataSets, sql);

  tabSchema:表结构;tableName:表名;dataSets:数据集;sql:操作的SQL语句。

4.预览

  消费者预览如下图所示:

  正在消费的关系图如下所示:

  消费详细 offset 如下所示:

  消费和生产的速率图,如下所示:

5.总结

  这里,说明一下,当 offset 存入到 Kafka 的topic中后,消费线程ID信息并没有记录,不过,我们通过阅读Kafka消费线程ID的组成规则后,可以手动生成,其消费线程ID由:Group+ConsumerLocalAddress+Timespan+UUID(8bit)+PartitionId,由于消费者在其他节点,我们暂时无法确定ConsumerLocalAddress。最后,欢迎大家使用 Kafka 集群监控 ——[ Kafka Eagle ],[ 操作手册 ]。

6.结束语

  这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

联系方式: 
邮箱:smartloli.org@gmail.com 
Twitter:https://twitter.com/smartloli 
QQ群(Hadoop - 交流社区1):424769183 
温馨提示:请大家加群的时候写上加群理由(姓名+公司/学校),方便管理员审核,谢谢!

热爱生活,享受编程,与君共勉!

本文转自哥不是小萝莉博客园博客,原文链接:http://www.cnblogs.com/smartloli/,如需转载请自行联系原作者




Kafka Offset Storage相关推荐

  1. How Kafka’s Storage Internals Work

    In this post I'm going to help you understand how Kafka stores its data. I've found understanding th ...

  2. springboot手动提交kafka offset

    转载自 springboot手动提交kafka offset enable.auto.commit参数设置成了false 但是测试发现enable.auto.commit参数设置成了false,kaf ...

  3. 【kafka】kafka offset 的存储 (存储zookeeper 与 存储 kafka)

    1.概述 offset即消费消息的偏移值,记录了kafka每个consumergroup的下一个需要读取消费位置,保障其消息的消费可靠性. 2.旧版本offset保存 kafka0.8.1.1以前,o ...

  4. python kafka offset自动提交_Spring-Kafka —— 实现批量消费和手动提交offset

    spring-kafka的官方文档介绍,可以知道自1.1版本之后, @KafkaListener开始支持批量消费,只需要设置batchListener参数为true 把application.yml中 ...

  5. sparkstreaming 读取mysql_第十篇|SparkStreaming手动维护Kafka Offset的几种方式

    Spark Streaming No Receivers 方式的createDirectStream 方法不使用接收器,而是创建输入流直接从Kafka 集群节点拉取消息.输入流保证每个消息从Kafka ...

  6. 【kafka】kafka Offset commit failed on partition The coordinator is not aware of this member

    文章目录 1.背景 1.2 参考 2. 场景2 本文为博主九师兄(QQ:541711153 欢迎来探讨技术)原创文章,未经允许博主不允许转载. 1.背景 Offset commit failed on ...

  7. SparkStreaming手动维护Kafka Offset的几种方式

    Spark Streaming No Receivers 方式的createDirectStream 方法不使用接收器,而是创建输入流直接从Kafka 集群节点拉取消息.输入流保证每个消息从Kafka ...

  8. Spark createDirectStream 维护 Kafka offset(Scala)

    createDirectStream方式需要自己维护offset,使程序可以实现中断后从中断处继续消费数据. KafkaManager.scala import kafka.common.TopicA ...

  9. kafka offset入门理解

    offset是什么? 对于每一个topic, Kafka集群都会维持一个分区日志 每个分区都是有序且顺序不可变的记录集,并且不断地追加到结构化的log文件.分区中的每一个记录都会分配一个id号来表示顺 ...

最新文章

  1. Python之父重回决策层,未来如何发展?
  2. 最短路径-Dijkstra算法与Floyd算法
  3. sql 优化之关于null 和数据类型
  4. 【KVM】Ubuntu14.04 安装KVM
  5. ssm_maven idea分模块开发
  6. JMeter场景设置叙述
  7. 优雅地关闭资源,try-with-resource语法和lombok@Cleanup
  8. K-means聚类分析算法(二)
  9. 第四章 Python 外壳 :代码结构
  10. Codeforces Round #288 (Div. 2)E. Arthur and Brackets
  11. Wireshark实战分析之IP协议(四)
  12. winsock类型病毒后遗症处理
  13. 【QT】QCustomPlot图表控件
  14. 揪出手机耗电元凶——高德地图缓存数据
  15. WIN10系统进入BIOS的方法(无需开机时按快捷键)
  16. 人脑与计算机之间有什么联系,再谈人脑与电脑的关系
  17. 阿里云大数据助理工程师ACA认证最新笔记(2021)
  18. 体系结构 | 五段流水线 | 流水线技术
  19. python实现酷狗音乐下载,以及利用tk界面可视化
  20. 数据结构课程设计之项目三---算术表达式求解

热门文章

  1. [Java] 蓝桥杯BASIC-18 基础练习 矩形面积交
  2. 蓝桥杯 ALGO-95 算法训练 2的次幂表示
  3. 浅谈管理系统操作日志设计(附操作日志类)
  4. shell 参数,shell与Java 交互参数
  5. package.json 入门
  6. android开发学习 ------- json数据与实体类之间的相互转换
  7. Nginx关于日志记录实例应用
  8. Windows XP SP3细节官方详解
  9. 开机SystemServer到ActivityManagerService启动过程分析
  10. OSPF系列小实验之6:网络类型对邻居关系及路由学习的影响