kafka auto.offset.reset参数解析

  • 1.latest和earliest区别
  • 2.创建topic
  • 3.生产数据和接收生产数据
  • 4.测试代码

auto.offset.reset关乎kafka数据的读取。常用的二个值是latest和earliest,默认是latest。

如果kafka只接收数据,从来没来消费过,程序一开始不要用latest,不然以前的数据就接收不到了。应当先earliest,然后二都都可以。

earliest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
latest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
none
topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

1.latest和earliest区别

  1. earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
  2. latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

提交过offset,latest和earliest没有区别,但是在没有提交offset情况下,用latest直接会导致无法读取旧数据。

2.创建topic

# bin/kafka-topics.sh --create --zookeeper bigserver1:2181,bigserver2:2181,testing:2181 --replication-factor 2 --partitions 3 --topic tank
Created topic "tank".  # bin/kafka-topics.sh --describe --zookeeper bigserver1:2181,bigserver2:2181,testing:2181 --topic tank
Topic:tank PartitionCount:3 ReplicationFactor:2 Configs:  Topic: tank Partition: 0 Leader: 0 Replicas: 0,2 Isr: 0,2  Topic: tank Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0  Topic: tank Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1

3.生产数据和接收生产数据

[root@bigserver1 kafka]# bin/kafka-console-producer.sh --broker-list bigserver1:9092,bigserver2:9092,testing:9092 --topic tank
>1
>2
>3
。。。。。。。。。省略。。。。。。。。。
[root@bigserver1 kafka]# bin/kafka-console-consumer.sh --bootstrap-server bigserver1:9092,bigserver2:9092,testing:9092 --topic tank --from-beginning
1
2
3

4.测试代码

object tank {  def main(args: Array[String]): Unit = {  val pros: Properties = new Properties  pros.put("bootstrap.servers", "bigserver1:9092,bigserver2:9092,testing:9092")  /*分组由消费者决定,完全自定义,没有要求*/  pros.put("group.id", "tank")  //设置为true 表示offset自动托管到kafka内部的一个特定名称为__consumer_offsets的topic  pros.put("enable.auto.commit", "false")  pros.put("auto.commit.interval.ms", "1000")  pros.put("max.poll.records", "5")  pros.put("session.timeout.ms", "30000")  //只有当offset不存在的时候,才用latest或者earliest  pros.put("auto.offset.reset", "latest")  pros.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")  pros.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")  val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](pros)  /*这里填写主题名称*/  consumer.subscribe(util.Arrays.asList("tank"))  val system = akka.actor.ActorSystem("system")  system.scheduler.schedule(0 seconds, 30 seconds)(tankTest.saveData(args,consumer))  }  object tankTest {  def saveData(args: Array[String],consumer: KafkaConsumer[String,String]): Unit = {  val records: ConsumerRecords[String, String] = consumer.poll(Duration.ofSeconds(3))  if (!records.isEmpty) {  for (record <- records) {  if (record.value != null && !record.value.equals("")) {  myLog.syncLog(record.value + "\t准备开启消费者出列数据", "kafka", "get")  }  }  consumer.commitSync()  }  }  }
}

kafka auto.offset.reset参数解析相关推荐

  1. Kafka auto.offset.reset

    要从头消费kafka的数据,可以通过以下参数: Kafka auto.offset.reset = earliest 转载于:https://www.cnblogs.com/drjava/p/1045 ...

  2. Kafka之auto.offset.reset值解析

    今日在使用kafka时,发现将 auto.offset.reset 设置为earliest.latest.none 都没有达到自己预期的效果. earliest: 当各分区下有已提交的offset时, ...

  3. kafka auto.offset.reset设置earliest从头开始消费

    auto.offset.reset设置为earliest spring:kafka:bootstrap-servers: 192.168.?.x:9092 consumer:auto-offset-r ...

  4. 【kafka】kafka 消费速度 小于 日志清理速度 (kafka数据被清理了)会发生什么 auto.offset.reset 参数

    文章目录 1.概述 2.segment 本文为博主九师兄(QQ:541711153 欢迎来探讨技术)原创文章,未经允许博主不允许转载. 1.概述 因为遇到了这个问题[Kafka]Kafka Recor ...

  5. kafka的auto.offset.reset详解与测试

    取值及定义# auto.offset.reset有以下三个可选值: latest (默认) earliest none 三者均有共同定义: 对于同一个消费者组,若已有提交的offset,则从提交的of ...

  6. kafka_2.11-0.10.2.1中的auto.offset.reset

    在使用spark连接kafka消费topic时,发现无论怎么设置,也无法从头开始消费. 查看配置得出auto.offset.reset的以下3种设置及含义: earliest 当各分区下有已提交的of ...

  7. SpringBoot 重置 kafka 偏移量offset(kafka-0.10.1.0)

    如果使用的自动提交偏移量的模式,偏移量会给到kafka或者zk进行管理,其中kafka的偏移量重置给了重新消费kafka内未过期的数据提供了机会,当消费者出错,比如消费了数据,但是中途处理失败,导致数 ...

  8. SparkStreaming整合Kafka(Offset保存在zookeeper上,Spark2.X + kafka0.10.X)

    先来一段到处都有的原理(出处到处都有,就不注明了) Streaming和Kafka整合有两种方式--Receiver和Direct,简单理解为:Receiver方式是通过zookeeper来连接kaf ...

  9. kafka偏移量保存到mysql里_SparkStreaming+kafka保存offset的偏移量到mysql案例

    MySQL创建存储offset的表格 mysql> use test mysql> create table hlw_offset( topic varchar(32), groupid ...

  10. golang 将kafka的offset置为最新

    需要解决: 当需要用同一个group_id去消费kafka的partition时,如果程序down掉,可能存在已经消费的数据尚未提交的可能,此时会造成重复消费的问题,且在重启这段时间会产生新的数据,重 ...

最新文章

  1. jsp实现简单的分页
  2. Linux network source code
  3. arp 不同网段 相同vlan_H3C交换机配置VLAN
  4. SuperEdge — Overview
  5. windows 系统监视器 以及建议阀值
  6. Java 异常处理的误区和经验总结--转载
  7. jquery对Select的操作
  8. Linux服务-Samba文件服务器部署
  9. 工作57:element格式化内容
  10. 使用MFC开发ActiveX控件
  11. git删除本地tag和远程tag
  12. Android反编译工具总结
  13. 职称计算机 将计算机broad_1下的e盘映射为k盘网络驱动器,计算机职称考试题目(网络基础答案)...
  14. android 锁的使用教程,Android中对象锁
  15. java人才市场需求分析_人才招聘需求及分析报告.doc
  16. mysql pga_PGA概述
  17. java 使用*打印图形(菱形、平行四边形、三角形)
  18. 烽火通信科技股份有限公司
  19. 解决客户之间的矛盾-生米煮成熟饭
  20. 计算机研究与发展 杂志,计算机研究与发展杂志

热门文章

  1. 阿里云国际版跨境加速,全球加速和Squid 缓存代理解决方案
  2. mac可装云服务器_Mac 下阿里云服务器的配置方法
  3. 【智能手环APP for Android 】01 百度地图展示行动轨迹
  4. 关于mysql的时区(下):如何设置mysql的时区
  5. 两种无密码解锁iPhone锁屏密码的方法
  6. 医院挂号系统代码_人脸识别+身份绑定!高科技精准打击医院号贩子
  7. 3乘3魔方第四步_三阶魔方第四步
  8. 计算机重装系统后黑屏,电脑重装系统后黑屏怎么办
  9. Math.abs()方法
  10. PS调出怀旧雨中特写的非主流照片