程序运行中,生产者可以成功生产数据,消费者却一直拿不到存储的数据,运行消费者命令:kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic saturn-importer-br-job-kafka-test --from-beginning  没有问题。

在网上查找资料,发现了一个比较关键的词条:kafka重新消费问题

后来发现是Kafka auto.offset.reset参数的问题;

总结:Kafka auto.offset.reset = earliest 时会从头开始消费,如果group消费成功后,会消费新加入的数据。如果想重新消费,修改一个没用过的group 就可以,或者删除__consumer__offsets里的数据(Kafka之重新消费数据)

参考资料:

转自:https://blog.csdn.net/qq_31617409/article/details/82317267

kafka-consumer配置参数(大部分默认值均可,但是下面这些参数对性能以及可用性影响较大)
参数名称 参数含义
fetch.min.bytes 消费者从服务器获取记录的最小字节数,broker收到消费者拉取数据的请求的时候,如果可用数据量小于设置的值,那么broker将会等待有足够可用的数据的时候才返回给消费者,这样可以降低消费者和broker的工作负载,因为当主题不是很活跃的情况下,就不需要来来回回的处理消息,如果没有很多可用数据,但消费者的CPU 使用率却很高,那么就需要把该属性的值设得比默认值大。如果消费者的数量比较多,把该属性的值设置得大一点可以降低broker 的工作负载。
fetch.max.wait.ms

fetch.min.bytes设置了broker返回给消费者最小的数据量,而fetch.max.wait.ms设置的则是broker的等待时间,两个属性只要满足了任何一条,broker都会将数据返回给消费者,也就是说举个例子,fetch.min.bytes设置成1MB,fetch.max.wait.ms设置成1000ms,那么如果在1000ms时间内,如果数据量达到了1MB,broker将会把数据返回给消费者;如果已经过了1000ms,但是数据量还没有达到1MB,那么broker仍然会把当前积累的所有数据返回给消费者。

max.partition.fetch.bytes 该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值是lMB , 也
就是说,kafkaConsumer.poll() 方法从每个分区里返回的记录最多不超max.partitions.fetch.bytes�0�2指定的字节。如果一个主题有20 个分区和5 个消费者,那么每个消费者需要至少4MB 的可用内存来接收记录。在为消费者分配内存时,可以给它们多分配一些,因为如果群组里有消费者发生崩愤,剩下的消费者需要处理更多的分区。max.partition.fetch.bytes 的值必须比broker 能够接收的最大消息的字节数(通过max.message.size 属性配置)大, 否则消费者可能无法读取这些消息,导致消费者一直挂起重试,例如,max.message.size设置为2MB,而该属性设置为1MB,那么当一个生产者可能就会生产一条大小为2MB的消息,那么就会出现问题,消费者能从分区取回的最大消息大小就只有1MB,但是数据量是2MB,所以就会导致消费者一直挂起重试。在设置该属性时,另一个需要考虑的因素是消费者处理数据的时间。消费者需要频繁调用poll()方法
来避免会话过期和发生分区再均衡,如果单次调用poll () 返回的数据太多,消费者需要更多的时间来处理,可能无怯及时进行下一个轮询来避免会话过期。如果出现这种情况, 可以把max.partitioin.fetch.bytes 值改,或者延长会话过期时间。
session.timeout.ms

该属性指定了当消费者被认为已经挂掉之前可以与服务器断开连接的时间。默认是3s,消费者在3s之内没有再次向服务器发送心跳,那么将会被认为已经死亡。此时,协调器将会出发再均衡,把它的分区分配给其他的消费者,该属性与heartbeat.interval.ms紧密相关,该参数定义了消费者发送心跳的时间间隔,也就是心跳频率,一般要同时修改这两个参数,heartbeat.interval.ms参数值必须要小于session.timeout.ms,一般是session.timeout.ms的三分之一,比如,session.timeout.ms设置成3min,那么heartbeat.interval.ms一般设置成1min,这样,可以更快的检测以及恢复崩溃的节点,不过长时间的轮询或垃圾收集可能导致非预期的再均衡(有一种情况就是网络延迟,本身消费者是没有挂掉的,但是网络延迟造成了心跳超时,这样本不该发生再均衡,但是因为网络原因造成了非预期的再均衡),把该属性的值设置得大一些,可以减少意外的再均衡,不过检测节点崩愤-需要更长的时间。

auto.offset.reset 该属性指定了消费者在读取一个没有偏移量后者偏移量无效(消费者长时间失效当前的偏移量已经过时并且被删除了)的分区的情况下,应该作何处理,默认值是latest,也就是从最新记录读取数据(消费者启动之后生成的记录),另一个值是earliest,意思是在偏移量无效的情况下,消费者从起始位置开始读取数据。
enable.auto.commit 指定了消费者是否自动提交偏移量,默认值是true,为了尽量避免重复数据和数据丢失,可以把它设置为false,有自己控制合适提交偏移量,如果设置为true, 可以通过设置 auto.commit.interval.ms属性来控制提交的频率
partition.assignment.strategy

分区分配策略,kafka又两个默认策略,

  1. Range:该策略会把主题的若干个连续的分区分配给消费者
  2. 该策略把主题的所有分区逐个分配给消费者

分区策略默认是:org.apache.kafka.clients.consumer.RangeAssignor=>Range策略

org.apache.kafka.clients.consumer.RoundRobinAssignor=>Robin策略

client.id 表示从客户端发来的消息
max.poll.records 控制单次调用call方法能够返回的记录数量,帮助控制在轮询里需要处理的数据量。

receive.buffer.bytes

+

send.buffer.bytes

socket 在读写数据时用到的TCP 缓冲区也可以设置大小。如果它们被设为-1 ,就使用操作系统的默认值。如果生产者或消费者与broker 处于不同的数据中心内,可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽

Kafka之重新消费数据

转自:https://blog.csdn.net/usagoole/article/details/82813091

转自:https://my.oschina.net/u/3346994/blog/1859039/

昨天在写一个java消费kafka数据的实例,明明设置auto.offset.reset为earliest,但还是不从头开始消费,官网给出的含义太抽象了。 
earliest: automatically reset the offset to the earliest offset,自动将偏移量置为最早的。难道不是topic中各分区的开始?结果还真不是,具体含义如下:

auto.offset.reset值含义解释

earliest 
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 
latest 
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 
none 
topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

以下为测试详细:

1.同分组下测试

1.1测试一

1.1.1测试环境

Topic为lsztopic7,并生产30条信息。lsztopic7详情: 

创建组为“testtopi7”的consumer,将enable.auto.commit设置为false,不提交offset。依次更改auto.offset.reset的值。此时查看offset情况为: 

1.1.2测试结果

earliest 
客户端读取30条信息,且各分区的offset从0开始消费。 
latest 
客户端读取0条信息。 
none 
抛出NoOffsetForPartitionException异常。 

1.1.3测试结论

新建一个同组名的消费者时,auto.offset.reset值含义: 
earliest 每个分区是从头开始消费的。 
none 没有为消费者组找到先前的offset值时,抛出异常

1.2测试二

1.2.1测试环境

测试场景一下latest时未接受到数据,保证该消费者在启动状态,使用生产者继续生产10条数据,总数据为40条。 

1.2.2测试结果

latest 
客户端取到了后生产的10条数据

1.2.3测试结论

当创建一个新分组的消费者时,auto.offset.reset值为latest时,表示消费新的数据(从consumer创建开始,后生产的数据),之前产生的数据不消费。

1.3测试三

1.3.1测试环境

在测试环境二,总数为40条,无消费情况下,消费一批数据。运行消费者消费程序后,取到5条数据。 
即,总数为40条,已消费5条,剩余35条。 

1.3.2测试结果

earliest 
消费35条数据,即将剩余的全部数据消费完。

latest 
消费9条数据,都是分区3的值。 
offset:0 partition:3 
offset:1 partition:3 
offset:2 partition:3 
offset:3 partition:3 
offset:4 partition:3 
offset:5 partition:3 
offset:6 partition:3 
offset:7 partition:3 
offset:8 partition:3

none 
抛出NoOffsetForPartitionException异常。 

1.3.3测试结论

earliest 当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费。 
latest 当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据。 
none 当该topic下所有分区中存在未提交的offset时,抛出异常。

1.4测试四

1.4.1测试环境

再测试三的基础上,将数据消费完,再生产10条数据,确保每个分区上都有已提交的offset。 
此时,总数为50,已消费40,剩余10条 

1.4.2测试结果

none 
消费10条信息,且各分区都是从offset开始消费 
offset:9 partition:3 
offset:10 partition:3 
offset:11 partition:3 
offset:15 partition:0 
offset:16 partition:0 
offset:17 partition:0 
offset:18 partition:0 
offset:19 partition:0 
offset:20 partition:0 
offset:5 partition:2

1.4.3测试结论

值为none时,topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常。

2.不同分组下测试

2.1测试五

2.1.1测试环境

在测试四环境的基础上:总数为50,已消费40,剩余10条,创建不同组的消费者,组名为testother7 

2.1.2 测试结果

earliest 
消费50条数据,即将全部数据消费完。

latest 
消费0条数据。

none 
抛出异常 

2.1.3测试结论

组与组间的消费者是没有关系的。 
topic中已有分组消费数据,新建其他分组ID的消费者时,之前分组提交的offset对新建的分组消费不起作用。

转自:https://blog.csdn.net/matrix_google/article/details/88658234

设置消费者properties的两个参数

consumer.group.id

properties.setProperty("auto.offset.reset", "earliest”) // latest

注意:

只要不更改group.id,每次重新消费kafka,都是从上次消费结束的地方继续开始,不论"auto.offset.reset”属性设置的是什么

场景一:Kafka上在实时被灌入数据,但kafka上已经积累了两天的数据,如何从最新的offset开始消费?

(最新指相对于当前系统时间最新)

1.将group.id换成新的名字(相当于加入新的消费组)

2.网上文章写还要设置 properties.setProperty("auto.offset.reset", "latest”)

实验发现即使不设置这个,只要group.id是全新的,就会从最新的的offset开始消费

场景二:kafka在实时在灌入数据,kafka上已经积累了两天的数据,如何从两天前最开始的位置消费?

1.将group.id换成新的名字

2.properties.setProperty("auto.offset.reset", "earliest”)

场景三:不更改group.id,只是添加了properties.setProperty("auto.offset.reset", "earliest”),consumer会从两天前最开始的位置消费吗?

不会,只要不更改消费组,只会从上次消费结束的地方继续消费

场景四:不更改group.id,只是添加了properties.setProperty("auto.offset.reset", "latest”),consumer会从距离现在最近的位置消费吗?

不会,只要不更改消费组,只会从上次消费结束的地方继续消费

应用:

正式打包上线前应该使用新的group.id,以便于从kafka最新的位置开始消费

只要将group.id换成全新的,不论"auto.offset.reset”是否设置,设置成什么,都会从最新的位置开始消费

kafka消费者参数详解 java读取不到消费者数据相关推荐

  1. java访问本地文件_详解Java读取本地文件并显示在JSP文件中

    详解Java读取本地文件并显示在JSP文件中 当我们初学IMG标签时,我们知道通过设置img标签的src属性,能够在页面中显示想要展示的图片.其中src的值,可以是磁盘目录上的绝对,也可以是项目下的相 ...

  2. kafka配置参数详解

    https://www.cnblogs.com/gxc2015/p/9835837.html

  3. Android 系统(95)---Android build.prop参数详解

    Android build.prop参数详解 前言 build.prop是Android系统中的一个重要的属性文件,它记录了Android系统运行的很多配置信息,当程序运行时需要某种系统状态时,会到该 ...

  4. Android build.prop参数详解

    前言 build.prop是Android系统中的一个重要的属性文件,它记录了Android系统运行的很多配置信息,当程序运行时需要某种系统状态时,会到该模块中进行读取,类似Window中的注册表对少 ...

  5. java 配置文件的路径_详解java配置文件的路径问题

    详解java配置文件的路径问题 详解java配置文件的路径问题 各种语言都有自己所支持的配置文件,配置文件中有很多变量是经常改变的.不将程序中的各种变量写死,这样能更方便地脱离程序本身去修改相关变量设 ...

  6. oracle spool 分隔符_sqlplus--spool命令参数详解

    sqlplus--SPOOL参数详解 Spool是Oracle快速导出数据的工具,是sqlplus的指令,不是sql语法里的东西 一.Spool常用的设置 set arraysize 5000;  / ...

  7. pytorch中DataLoader的num_workers参数详解与设置大小建议

    Q:在给Dataloader设置worker数量(num_worker)时,到底设置多少合适?这个worker到底怎么工作的? train_loader = torch.utils.data.Data ...

  8. java spring启动和终止_springBoot jar启动以停止脚本参数详解

    一.启动脚本 Springboot 项目打成jar包后,在Linux环境上一般有如下几种启动方式: 1. "java -jar XXX.jar " 命令结尾没有 "&am ...

  9. Java线程池七个参数详解

    java多线程开发时,常常用到线程池技术,这篇文章是对创建java线程池时的七个参数的详细解释. 从源码中可以看出,线程池的构造函数有7个参数,分别是corePoolSize.maximumPoolS ...

最新文章

  1. 梦有感 2009-10
  2. python读取文件_python这么受欢迎,你知道如何以正确的方式来读取文件内容吗
  3. C#总结(四)调用C++动态库
  4. 蓝桥杯 ADV-145 算法提高 铺地毯
  5. 案例 自动办公_国浩分享 | 非诉讼律师办公神器盘点
  6. SpringBoot设置外置tomcat
  7. windows mobile进程查看器开发(二)—— 停止进程
  8. 发那科机器人圆弧指令怎么用_发那科机器人PR指令
  9. openid是什么意思?token是什么意思?
  10. HiC-Pro | HiC数据处理工具
  11. java swing 聊天表情功能的实现(带完整代码)
  12. mysql pt 慢日志_MySQL优化之慢日志分析(Anemometer+Pt-query-digest)
  13. 托管调试助手“LoaderLock”在XXX中检测到故障。其他信息:正尝试在OS加载程序锁内执行托管代码。不要尝试在DllMain或映像初始化函数内运行托管代码,这样做会导致应用程序挂起。
  14. Python猴子摘香蕉问题
  15. 心理学的166个现象---之五
  16. 我对测绘工程专业所学相关软件的看法
  17. java窗体显示字体_文字字体设计窗体 实验!求大神
  18. 从头开始学算法:考研机试题练习(C/C++)–STL使用
  19. 天轰穿典型多层架构留言本项目实战免费下载
  20. 解决浏览器主页被t999.cn劫持

热门文章

  1. 班尼机器人维修方法_OTC机器人示教器维修和常见故障分析
  2. 云服务器1M带宽是什么意思,购买服务器时带宽怎么选择?
  3. CS0619号错误是什么
  4. Linux中动态内存的分配与回收(heap, buddy system, stab)
  5. 耳机对耳朵影响有多大?骨传导耳机对耳朵的影响是最小的吗?
  6. Java校招面试汇总
  7. Wifi P2p连接步骤整理
  8. iphone邮箱加密发送_为什么从iPhone发送的视频质量如此之高?
  9. 关于QQ总提示已被损坏或部分文件丢失解决办法
  10. 10.Unity3D商业游戏源码研究-变身吧主公-PanelPlayInfo