我试图使用< KStream> .process()与Time

Windows.of(“name”,30000)批量处理一些KTable值并发送它们.似乎30秒超过了消费者超时间隔,之后Kafka认为该消费者已经解散并释放分区.

我已经尝试提高轮询频率和提交间隔以避免这种情况:

config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000");

config.put(StreamsConfig.POLL_MS_CONFIG, "5000");

不幸的是,这些错误仍在发生:

(很多这些)

ERROR o.a.k.s.p.internals.RecordCollector - Error sending record to topic kafka_test1-write_aggregate2-changelog

org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for kafka_test1-write_aggregate2-changelog-0

其次是:

INFO o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator 12.34.56.7:9092 (id: 2147483547 rack: null) dead for group kafka_test1

WARN o.a.k.s.p.internals.StreamThread - Failed to commit StreamTask #0_0 in thread [StreamThread-1]:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)

显然,我需要更频繁地将心跳发送回服务器.怎么样?

我的拓扑结构是:

KStreamBuilder kStreamBuilder = new KStreamBuilder();

KStream lines = kStreamBuilder.stream(TOPIC);

KTable, String> kt = lines.aggregateByKey(

new DBAggregateInit(),

new DBAggregate(),

TimeWindows.of("write_aggregate2", 30000));

DBProcessorSupplier dbProcessorSupplier = new DBProcessorSupplier();

kt.toStream().process(dbProcessorSupplier);

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);

kafkaStreams.start();

KTable每隔30秒按键对值进行分组.在Processor.init()中,我调用context.schedule(30000).

DBProcessorSupplier提供DBProcessor的实例.这是AbstractProcessor的一个实现,其中提供了所有覆盖.他们只做LOG,所以我知道每个人都被击中.

这是一个非常简单的拓扑结构,但很明显我在某个地方错过了一个步骤.

编辑:

我知道我可以在服务器端进行调整,但我希望有一个客户端解决方案.我喜欢在客户端退出/死亡时很快就可以使用分区的概念.

编辑:

为了简化问题,我从图中删除了聚合步骤.它现在只是消费者 – >处理器(). (如果我将消费者直接发送到.print(),它会很快工作,所以我知道没关系). (类似地,如果我通过.print()输出聚合(KTable),它似乎也可以.

我发现.process() – 应该每隔30秒调用一次.punctuate()实际上阻塞了可变长度的时间并且随机输出(如果有的话).

进一步:

我将调试级别设置为’debug’并重新启动.我看到很多消息:

DEBUG o.a.k.s.p.internals.StreamTask - Start processing one record [ConsumerRecord

但是.punctuate()函数中的断点没有被击中.所以它做了很多工作,但没有让我有机会使用它.

java 连接kafka超时_java – Kafka KStreams – 处理超时相关推荐

  1. java 连接redis失败_java 连接Redis问题及demo

    java连接linux Redis遇到的问题 昨天在Linux搭建了Redis服务,今天使用java连接测试了一下.要想使用java连接redis服务,就离不开jedis-2.6.1.jar.使用je ...

  2. java连接access驱动_Java 连接Access

    Java 连接Access 第一次使用连接Access数据库, 记录一下遇到的坑 Access驱动下载地址 http://pan.baidu.com/s/1o8ltTfc 不使用WINDOW的建立数据 ...

  3. java 连接多实例_Java如何连接多实例SQL Server?

    异常产生 Java连接SQL Server一般字符串:jdbc:sqlserver://127.0.0.1:1433;DatabaseName=数据库名; 大家应该知道:如果机器上安装的是SQLSer ...

  4. java连接sqlserver使用_java连接sqlserver

    用Java连接SQL Server2000数据库有多种方法,下面介绍其中最常用的两种(通过JDBC驱动连接数据库). 1. 通过Microsoft的JDBC驱动连接.此JDBC驱动共有三个文件,分别是 ...

  5. java连接mysql乱码_java链接mysql 中文乱码

    {转!} 背景: 由于最近在开发一个APP的后台程序,需要Java连接远程的MySQL数据库进行数据的更新和查询操作,并且插入的数据里有中文,在插入到数据库后发现中文都是乱码.网上查了很多教程,最后都 ...

  6. modbus调试时间超时_Java调试器和超时

    modbus调试时间超时 在代码中存在超时的情况下如何使用调试器. 我的调试器王国! 因此,您一直忙于编写一个项目,一切顺利,直到出现错误为止. 您可以进入开发人员的工具箱,然后拔出调试器. 很棒–您 ...

  7. java kafka 分区_Java kafka如何实现自定义分区类和拦截器

    生产者发送到对应的分区有以下几种方式: (1)指定了patition,则直接使用:(可以查阅对应的java api, 有多种参数) (2)未指定patition但指定key,通过对key的value进 ...

  8. java连接redis不稳定_java相关:jedispool连redis高并发卡死的问题

    java相关:jedispool连redis高并发卡死的问题 发布于 2020-6-30| 复制链接 本篇文章主要介绍了jedispool连redis高并发卡死的问题,小妖觉得挺不错的,现在分享给大家 ...

  9. java 连接池技术_java数据库连接池技术原理(浅析)

    在执行数据库SQL语句时,我们先要进行数据连接:而每次创建新的数据库的连接要消耗大量的资源,这样,大家就想出了数据库连接池技术.它的原理是,在运行过程中,同时打开着一定数量的数据库连接,形成数据连接池 ...

最新文章

  1. 最近面试了一位4年的Java,什么都不知道!还自认为很牛逼...
  2. pandas 排序 给excel_懂Excel轻松入门Python数据分析包pandas(二十五):循环序列分组...
  3. 摩尔定律行将就木?AI灵丹助其返老还童!(附论文)
  4. Android之View的绘制流程解析
  5. Markdown会干掉Html吗?
  6. Java:选择排序法
  7. python作业表达式求值_用Python3实现表达式求值
  8. HTML表格基础学习
  9. 在哪里编写写php,php扩展编写
  10. 业务gis 怎么让别的开发人员不需要懂gis就可以搞开发? (五)
  11. HappyAA服务器部署笔记1(nginx+tomcat的安装与配置)
  12. [洛谷P1501][国家集训队]Tree II(LCT)
  13. python 使用 config 文件
  14. ElasticSearch 多字段分组求和
  15. oracle 软件证书错误,IE上ORACLE OEM 证书错误 , 导航阻止,无法”继续浏览此网站”...
  16. 用ClickHouse在GitHub上数星星
  17. OpenCV:03图像的算数运算
  18. 给定出生年月日及现在年月日,计算天数
  19. 视频大小与码率计算及像素及分辨率问题
  20. mongdb权限问题

热门文章

  1. realarm Android系统编译后内核无法启动的解决方法
  2. s4-介质访问控制子层-1 MAC子层
  3. linux解决windows应用程序,关于Linux下使用Windows应用程序的尝试总结
  4. Apache 的管理及优化web
  5. 地壳中元素含量排名记忆口诀_【中考化学】初中化学记忆性知识点03-生活中的化学-生活常识...
  6. Gartner魔力象限到底有何“魔力”?
  7. 人工智能们再也不用担心撞上玻璃橱窗了
  8. 揭秘 SIGCOMM 20‘ 论文:阿里云网络洛神 VTrace 系统
  9. 万万没想到,枯燥的“机器学习”还可以这样学!
  10. dbms_metadata遇到ORA-31603的解决方案