转:http://blog.csdn.net/looklook5/article/details/42008079

之前在做Kafka 整合Storm的时候,因为对Kafka 不是很熟,考虑过这样的一个场景问题,针对一个Topic,Kafka消息日志中有个offset信息来标注消息的位置,Storm每次从kafka 消费数据,都是通过zookeeper存储的数据offset,来判断需要获取消息在消息日志里的起始位置。

那么我们想,这个Offset 是消息在日志里是一个什么样的位置,是绝对位置还是相对位置?而Kafla 有个参数log.retention.hours会根据设定的小时,来清理日志文件。这样就可能会有这样的一个问题,针对一个Topic,Kafka 生产数据后,消费者消费信息后,此时的消息的offset是一个高位,比如100,消费者在消费完会记录这个offset准备下个数据的获取。而当系统时间达到参数log.retention.hours设定的时间后,kafka会自动删除这个Topic的缓存日志,那么这个时候新加入10条消息,消息的offset 是重新开始还是从删除日志前的Offset 开始?如果是前者,这个时候消费者因为记录消费这个Topic信息的Offset 仍在高位,那么他就获取不到在这个Offset前的新加入数据,这样就比较麻烦了。而后者,offset又是怎么记录消息相对位置的从而消费者一直消费到数据,无论系统怎么处理日志。这个是Storm 消费Kafla数据的时候必须要确认的问题。

所以做了一个针对性测试。

log.retention.hours设置为1个小时,然后重启kafka,创建一个测试Topic,然后往这个Topic里生产点数据,然后消费者那边也有输出,保证程序通顺正常。观察参数log.dirs=/tmp/kafka-logs目录下对应的话题目录。

下图是Topic下面的消息日志。

然后等一个小时后,继续观察日志

我们发现之前的消息日志被打上deleted 标志,然后并生成了新的日志。且日志名称改变了。

这个时候我们再生产数据会发现Kafka消费者还是能够正常输出数据的。那么之前假设offset是消息日志的绝对位置是不成立的。

官网5.5 Log的介绍,http://kafka.apache.org/documentation.html#introduction这里有对kafka的消息日志有详细的说明,其中也说到Offset的内容。

如果英语阅读困难,可以看这篇文章http://my.oschina.net/frankwu/blog/305010

下面是我是摘录的。

[plain] view plain copy
  1. 日志
  2. 如果一个topic的名称为"my_topic",它有2个partitions,那么日志将会保存在my_topic_0和my_topic_1两个目录中;日志文件中保存了一序列"log entries"(日志条目),每个log entry格式为"4个字节的数字N表示消息的长度" + "N个字节的消息内容";每个日志都有一个offset来唯一的标记一条消息,offset的值为8个字节的数字,表示此消息在此partition中所处的起始位置..每个partition在物理存储层面,有多个log file组成(称为segment).segment file的命名为"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始消息的offset.

[plain] view plain copy
  1. 其中每个partiton中所持有的segments列表信息会存储在zookeeper中.
  2. 当segment文件尺寸达到一定阀值时(可以通过配置文件设定,默认1G),将会创建一个新的文件;当buffer中消息的条数达到阀值时将会触发日志信息flush到日志文件中,同时如果"距离最近一次flush的时间差"达到阀值时,也会触发flush到日志文件.如果broker失效,极有可能会丢失那些尚未flush到文件的消息.因为server意外失败,仍然会导致log文件格式的破坏(文件尾部),那么就要求当server启动时需要检测最后一个segment的文件结构是否合法并进行必要的修复.
  3. 获取消息时,需要指定offset和最大chunk尺寸,offset用来表示消息的起始位置,chunk size用来表示最大获取消息的总长度(间接的表示消息的条数).根据offset,可以找到此消息所在segment文件,然后根据segment的最小offset取差值,得到它在file中的相对位置,直接读取输出即可.
  4. 日志文件的删除策略非常简单:启动一个后台线程定期扫描log file列表,把保存时间超过阀值的文件直接删除(根据文件的创建时间).为了避免删除文件时仍然有read操作(consumer消费),采取copy-on-write方式.

从文章中可以看出,消息日志名是最小offset位置,消息所在位置加上文件名的offset就是消息的offset位置,而系统没生成一个新的日志后会就将最后的offset作为新日志文件的文件名。我们可以认为kafka 消息日志里的offset实际就相当于是一个增量序列索引。那样我们就不用纠结消费数据的时候会不会丢失,而可以安心关注Storm的业务问题了

kafka的offset是个什么鬼。。相关推荐

  1. Kafka auto.offset.reset

    要从头消费kafka的数据,可以通过以下参数: Kafka auto.offset.reset = earliest 转载于:https://www.cnblogs.com/drjava/p/1045 ...

  2. flink 写kafka_flink消费kafka的offset与checkpoint

    生产环境有个作业,逻辑很简单,读取kafka的数据,然后使用hive catalog,实时写入hbase,hive,redis.使用的flink版本为1.11.1. 为了防止写入hive的文件数量过多 ...

  3. 【kafka】kafka consumer offset lag获取的三者方式

    1.概述 本文主要讨论一下kafka consumer offset lag的监控 方案 利用官方的类库 ConsumerOffsetChecker ConsumerGroupCommand 利用官方 ...

  4. 【Kafka】kafka Current offset xxx for partition xxx out range

    文章目录 1.背景 1.背景 kafka报错 kafka Current offset xxx for partition xxx out range 该问题和以下2个问题有所关系 [Kafka]ka ...

  5. 一次kafka的offset回退事件及相关知识点

    一次kafka的offset回退事件及相关知识点 原文链接:https://blog.csdn.net/lkforce/article/details/83384747

  6. SpringBoot 重置 kafka 偏移量offset(kafka-0.10.1.0)

    如果使用的自动提交偏移量的模式,偏移量会给到kafka或者zk进行管理,其中kafka的偏移量重置给了重新消费kafka内未过期的数据提供了机会,当消费者出错,比如消费了数据,但是中途处理失败,导致数 ...

  7. 聊聊kafka consumer offset lag increase异常

    序 本文主要解析一下遇到的一个kafka consumer offset lag不断增大的异常. 查看consumer消费情况 Group Topic Pid Offset logSize Lag O ...

  8. java 获取kafka lag,聊聊kafka consumer offset lag的监控

    序 本文主要讨论一下kafka consumer offset lag的监控 方案 利用官方的类库 ConsumerOffsetChecker ConsumerGroupCommand 利用官方的JM ...

  9. kafka查询offset生产者offset计算消费offset计算

    本文目录 kafka查询offset&生产者offset计算&消费offset计算 1.简介 2.需求背景 3.前期准备 4.获取kafka生产者的offset以及消费者的offset ...

最新文章

  1. [Python3网络爬虫开发实战] 7-动态渲染页面爬取-4-使用Selenium爬取淘宝商品
  2. oracle 清空表数据的2种方式及速度比较
  3. Bezier(贝塞尔曲线)
  4. 算法的优缺点_机器学习算法优缺点 amp; 如何选择
  5. DOM-14 【实战】解决事件代理和鼠标移动事件的窘态
  6. NXP(I.MX6uLL)DDR3实验——DDR3重要时间参数、时钟配置与原理图简析
  7. 工业以太网交换机选机攻略
  8. Win11系统使用Excel表格的时候很卡怎么办
  9. python求100以内的素数和(只作新手参考)
  10. Linux 进程间通信(IPC)---大总结
  11. oracle 频繁 tm tx,oracle频繁出现TX/TM锁问题
  12. LINUX查询版本情况
  13. vmware workstation server 服务无法启动
  14. python考拉兹猜想_Python练习题 042:Project Euler 014:最长的考拉兹序列
  15. 微软MSDN Web cast系列视频教程集锦
  16. WEKA( OneR,过拟合)
  17. mysql源码安装详解
  18. 关于UI使用ContentSizeFitter组件同步立即响应
  19. text/css什麼意思
  20. Flex 布局教程:语法篇

热门文章

  1. 2.2 多线程:concurrent.futures实现线程池
  2. 【OpenGL】glm库的配置
  3. 首涂第八套苹果CMSv10自适应视频模板原创4种颜色风格一键切换
  4. DNS测试bat脚本分析
  5. java中黑点是什么意思_[Java教程]input输入密码变黑点密文
  6. ospfdr选举规则_OSPF如何选举DR/BDR规则
  7. 炫“库”行动-人大金仓有奖征文-KingbaseES V8R6 手工创建主备流复制集群案例
  8. 我该如何向我的朋友解释“01背包”问题?
  9. linux如何切换到设备,如何编写Linux设备驱动程序(转)
  10. (原创)CRC计算流程分析(RefIn,Init,RefOut,XorOut)