kafka消息处理类:MessageAndOffset

case class MessageAndOffset(message: Message, offset: Long) {/*** Compute the offset of the next message in the log*/def nextOffset: Long = offset + 1
}

其主构造方法的参数offset:是这条message上一条的offset  =》 lastOffset = messageAndOffset.offset();

其主构造方法的参数message:是需要处理的消息,其offset=lastOffset+1     =    messageAndOffset.nextOffset()

lastOffset (即MessageAndOffset.offset)是已经消费过的offset数据(可能是无效message的offset)

kafka  consumer 消费数据是通过lastOffset来处理消息message (message.offset=lastOffset+1)

通过lastOffset来判断是否有未处理的数据,若有消息pull下来,封装在MessageAndOffset(message,lastOffset)中

kafka offset是从1开始计数,0是没有数据

创建topic时,创建partition文件,topic没有数据,定义其lastOffset=0,文件名是0

partition的文件名称是以存入数据的第一条message的lastOffset做为文件名

//从zookeeper中获取,获得的是订阅者消费过的offset(即是当前message的lastOffset);
//message对应的真实offset是(lastOffset+1)
Long offset = kafkaOffset.getOffset(leadBroker, i);
//获得是最旧有效message的lastOffset;可能是通过文件名成来获取的(待源码校验)
Long et=      kafkaOffset.getEarliestOffset(consumer,i);
//topic的最新offset,是有效值。
Long ft=      kafkaOffset.getLatestOffset(consumer,i);

代码实现:

for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {long lastOffset = messageAndOffset.offset();if (lastOffset < readOffset) {log.warn("Topic:" + topic + " and Partition:" + partition + "Found an  old offset: " + lastOffset + " Expecting: " + readOffset);continue;}//lastOffset是上次已经消费过的message.offset//每个message的真实offset=(lastOffset+1)readOffset = messageAndOffset.nextOffset();ByteBuffer payload = messageAndOffset.message().payload();byte[] bytes = new byte[payload.limit()];payload.get(bytes);JSONObject value = JSONObject.parseObject(new String(bytes, "utf-8"));list.add(value);
}

kafka offset 机制相关推荐

  1. Kafka设计解析(八)- Kafka事务机制与Exactly Once语义实现原理

    http://www.infoq.com/cn/articles/kafka-analysis-part-8# 写在前面的话 本文所有Kafka原理性的描述除特殊说明外均基于Kafka 1.0.0版本 ...

  2. springboot手动提交kafka offset

    转载自 springboot手动提交kafka offset enable.auto.commit参数设置成了false 但是测试发现enable.auto.commit参数设置成了false,kaf ...

  3. kafka分区机制详解

    本文来说下SpringBoot整合kafka之kafka分区实战 文章目录 kafka分区机制 分区个数选择 分区写入策略 轮询策略 随机策略 按键保存策略 本文小结 kafka分区机制 分区机制是k ...

  4. kafka 同步提交 异步_详解Kafka设计架构核心——Kafka副本机制详解

    所谓的副本机制(Replication),也可以称之为备份机制,通常是指分布式系统在多台网络互联的机器上保存有相同的数据拷贝.副本机制有什么好处呢? 1. 提供数据冗余.即使系统部分组件失效,系统依然 ...

  5. kafka 消费机制

    一.本文介绍了kafka的基础概念:topic.partition.broker.consumer.consumer group和producer. Topic 一个Topic代表了一类资源,一种事件 ...

  6. Kafka 心跳机制 重复消费

    kafka 心跳机制 Kafka是通过心跳机制来控制消费超时,心跳机制对于消费者客户端来说是无感的,它是一个异步线程,当我们启动一个消费者实例时,心跳线程就开始工作了.心跳超时会导致消息重复消费. 在 ...

  7. Kafka详解(五)Kafka副本机制

    所谓的副本机制(Replication),也可以称之为备份机制,通常是指分布式系统在多台网络互联的机器上保存有相同的数据拷贝.它的作用主要有以下几点: 提供数据冗余.即使系统部分组件失效,系统依然能够 ...

  8. 6张图阐述Kafka心跳机制(时间轮算法的具体运用)

    Broker端与客户端的心跳在Kafka中非常的重要,因为一旦在一个心跳过期周期内(默认10s),Broker端的消费组组协调器(GroupCoordinator)会把消费者从消费组中移除,从而触发重 ...

  9. Kafka | Kafka副本机制详解

    今天我要和你分享的主题是:Apache Kafka 的副本机制. 所谓的副本机制(Replication),也可以称之为备份机制,通常是指分布式系统在多台网络互联的机器上保存有相同的数据拷贝.副本机制 ...

  10. 大数据Hadoop之——Kafka安全机制(Kafka SSL认证实现)

    文章目录 一.概述 1)SASL认证概述 2)Delegation Token认证概述 3)SSL认证概述(本章实现) 二.各种安全认证机制对比和使用场景 三.Kafka SSL认证实现 1)创建ss ...

最新文章

  1. Android 画虚线 DashPathEffect 使用详解
  2. Hashcat从入门到入土(一)
  3. redis集群添加master节点
  4. 3月11日Linux课程笔记
  5. mysql indexkey提取,MySQL元数据获取基础笔记day06
  6. 华为oj题目c语言,【华为OJ平台习题】
  7. 在服务器 和 虚拟机中 查看代码 samba source insight
  8. discuz x2.5后台界面html版本
  9. 压缩文件服务器返回出错,解压缩多部分文件 - 错误的zipfile偏移量(本地头信号):4...
  10. PMP : PMP备考心得 (8)
  11. 嵌入式系统工程师的职位要求,你还差多少?
  12. 阳关林场的前世今生:结合锁眼卫星(Keyhole[KH])发掘“阳关林场”55年来的变化
  13. 微医网爬虫(一) java实现
  14. 给el-input type=“number“的文本框设置默认值
  15. 北京市公安局“人工智能安全研究中心”和“关键信息基础设施保护中心”2022年公开招聘工作人员公告
  16. 计算机毕业设计Java医院药品管理系统(系统+源码+mysql数据库+Lw文档)
  17. asp.net夜话之八:数据绑定控件
  18. 如何搭建一个Vue项目和配置环境
  19. Shiro集成Spring框架并且多Realms报错No realms have been configured! One or more realms must be ……解决办法
  20. firebird优化笔记

热门文章

  1. 计算机安全群,大开眼界||斯坦福大学信息安全课程群
  2. 【以太网交换安全】---端口安全及MAC地址飘移防止与检测
  3. python根据生日自动批量产生中奖双色球
  4. Audio Effect
  5. bootmgr is missing
  6. SDCC和Keil之stc89c52资料(纪念51单片机40周年)
  7. android CircleIndicator 实现引导页
  8. 杨强教授漫谈《西部世界》、生成式对抗网络及迁移学习
  9. andriod studio 开发
  10. 基于天地图热力图及区域划分