kafka auto.offset.reset参数解析
kafka auto.offset.reset参数解析
- 1.latest和earliest区别
- 2.创建topic
- 3.生产数据和接收生产数据
- 4.测试代码
auto.offset.reset关乎kafka数据的读取。常用的二个值是latest和earliest,默认是latest。
如果kafka只接收数据,从来没来消费过,程序一开始不要用latest,不然以前的数据就接收不到了。应当先earliest,然后二都都可以。
earliest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
latest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
none
topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
1.latest和earliest区别
- earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
- latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
提交过offset,latest和earliest没有区别,但是在没有提交offset情况下,用latest直接会导致无法读取旧数据。
2.创建topic
# bin/kafka-topics.sh --create --zookeeper bigserver1:2181,bigserver2:2181,testing:2181 --replication-factor 2 --partitions 3 --topic tank
Created topic "tank". # bin/kafka-topics.sh --describe --zookeeper bigserver1:2181,bigserver2:2181,testing:2181 --topic tank
Topic:tank PartitionCount:3 ReplicationFactor:2 Configs: Topic: tank Partition: 0 Leader: 0 Replicas: 0,2 Isr: 0,2 Topic: tank Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0 Topic: tank Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
3.生产数据和接收生产数据
[root@bigserver1 kafka]# bin/kafka-console-producer.sh --broker-list bigserver1:9092,bigserver2:9092,testing:9092 --topic tank
>1
>2
>3
。。。。。。。。。省略。。。。。。。。。
[root@bigserver1 kafka]# bin/kafka-console-consumer.sh --bootstrap-server bigserver1:9092,bigserver2:9092,testing:9092 --topic tank --from-beginning
1
2
3
4.测试代码
object tank { def main(args: Array[String]): Unit = { val pros: Properties = new Properties pros.put("bootstrap.servers", "bigserver1:9092,bigserver2:9092,testing:9092") /*分组由消费者决定,完全自定义,没有要求*/ pros.put("group.id", "tank") //设置为true 表示offset自动托管到kafka内部的一个特定名称为__consumer_offsets的topic pros.put("enable.auto.commit", "false") pros.put("auto.commit.interval.ms", "1000") pros.put("max.poll.records", "5") pros.put("session.timeout.ms", "30000") //只有当offset不存在的时候,才用latest或者earliest pros.put("auto.offset.reset", "latest") pros.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") pros.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](pros) /*这里填写主题名称*/ consumer.subscribe(util.Arrays.asList("tank")) val system = akka.actor.ActorSystem("system") system.scheduler.schedule(0 seconds, 30 seconds)(tankTest.saveData(args,consumer)) } object tankTest { def saveData(args: Array[String],consumer: KafkaConsumer[String,String]): Unit = { val records: ConsumerRecords[String, String] = consumer.poll(Duration.ofSeconds(3)) if (!records.isEmpty) { for (record <- records) { if (record.value != null && !record.value.equals("")) { myLog.syncLog(record.value + "\t准备开启消费者出列数据", "kafka", "get") } } consumer.commitSync() } } }
}
kafka auto.offset.reset参数解析相关推荐
- Kafka auto.offset.reset
要从头消费kafka的数据,可以通过以下参数: Kafka auto.offset.reset = earliest 转载于:https://www.cnblogs.com/drjava/p/1045 ...
- Kafka之auto.offset.reset值解析
今日在使用kafka时,发现将 auto.offset.reset 设置为earliest.latest.none 都没有达到自己预期的效果. earliest: 当各分区下有已提交的offset时, ...
- kafka auto.offset.reset设置earliest从头开始消费
auto.offset.reset设置为earliest spring:kafka:bootstrap-servers: 192.168.?.x:9092 consumer:auto-offset-r ...
- 【kafka】kafka 消费速度 小于 日志清理速度 (kafka数据被清理了)会发生什么 auto.offset.reset 参数
文章目录 1.概述 2.segment 本文为博主九师兄(QQ:541711153 欢迎来探讨技术)原创文章,未经允许博主不允许转载. 1.概述 因为遇到了这个问题[Kafka]Kafka Recor ...
- kafka的auto.offset.reset详解与测试
取值及定义# auto.offset.reset有以下三个可选值: latest (默认) earliest none 三者均有共同定义: 对于同一个消费者组,若已有提交的offset,则从提交的of ...
- kafka_2.11-0.10.2.1中的auto.offset.reset
在使用spark连接kafka消费topic时,发现无论怎么设置,也无法从头开始消费. 查看配置得出auto.offset.reset的以下3种设置及含义: earliest 当各分区下有已提交的of ...
- SpringBoot 重置 kafka 偏移量offset(kafka-0.10.1.0)
如果使用的自动提交偏移量的模式,偏移量会给到kafka或者zk进行管理,其中kafka的偏移量重置给了重新消费kafka内未过期的数据提供了机会,当消费者出错,比如消费了数据,但是中途处理失败,导致数 ...
- SparkStreaming整合Kafka(Offset保存在zookeeper上,Spark2.X + kafka0.10.X)
先来一段到处都有的原理(出处到处都有,就不注明了) Streaming和Kafka整合有两种方式--Receiver和Direct,简单理解为:Receiver方式是通过zookeeper来连接kaf ...
- kafka偏移量保存到mysql里_SparkStreaming+kafka保存offset的偏移量到mysql案例
MySQL创建存储offset的表格 mysql> use test mysql> create table hlw_offset( topic varchar(32), groupid ...
- golang 将kafka的offset置为最新
需要解决: 当需要用同一个group_id去消费kafka的partition时,如果程序down掉,可能存在已经消费的数据尚未提交的可能,此时会造成重复消费的问题,且在重启这段时间会产生新的数据,重 ...
最新文章
- jsp实现简单的分页
- Linux network source code
- arp 不同网段 相同vlan_H3C交换机配置VLAN
- SuperEdge — Overview
- windows 系统监视器 以及建议阀值
- Java 异常处理的误区和经验总结--转载
- jquery对Select的操作
- Linux服务-Samba文件服务器部署
- 工作57:element格式化内容
- 使用MFC开发ActiveX控件
- git删除本地tag和远程tag
- Android反编译工具总结
- 职称计算机 将计算机broad_1下的e盘映射为k盘网络驱动器,计算机职称考试题目(网络基础答案)...
- android 锁的使用教程,Android中对象锁
- java人才市场需求分析_人才招聘需求及分析报告.doc
- mysql pga_PGA概述
- java 使用*打印图形(菱形、平行四边形、三角形)
- 烽火通信科技股份有限公司
- 解决客户之间的矛盾-生米煮成熟饭
- 计算机研究与发展 杂志,计算机研究与发展杂志