bug背景:使用mysql cdc kafka 处理消息,当mysql删除一条记录时会触发此bug
错误信息:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.flex.notify.listener.NotifyListener.preOrder(java.lang.String)]
Bean [com.flex.notify.listener.NotifyListener@ce6cd62]; nested exception is org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void com.flex.notify.listener.NotifyListener.preOrder(java.lang.String): 1 error(s): [Error in object 'message': codes []; arguments []; default message [Payload value must not be empty]] , failedMessage=GenericMessage [payload=org.springframework.kafka.support.KafkaNull@4e24013e, headers={kafka_offset=1807, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@852319e, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey={"schema":{"type":"struct","fields":[{"type":"string","optional":false,"default":"","field":"id"}],"optional":false,"name":"debezium_connector.flex.pre_order.Key"},"payload":{"id":"062c68e78adfa1e5c2ef130603bad10e"}}, kafka_receivedTopic=debezium_connector.flex.pre_order, kafka_receivedTimestamp=1648020906745, kafka_groupId=notify1}]; nested exception is org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void com.flex.notify.listener.NotifyListener.preOrder(java.lang.String): 1 error(s): [Error in object 'message': codes []; arguments []; default message [Payload value must not be empty]] , failedMessage=GenericMessage [payload=org.springframework.kafka.support.KafkaNull@4e24013e, headers={kafka_offset=1807, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@852319e, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey={"schema":{"type":"struct","fields":[{"type":"string","optional":false,"default":"","field":"id"}],"optional":false,"name":"debezium_connector.flex.pre_order.Key"},"payload":{"id":"062c68e78adfa1e5c2ef130603bad10e"}}, kafka_receivedTopic=debezium_connector.flex.pre_order, kafka_receivedTimestamp=1648020906745, kafka_groupId=notify1}]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2634) [spring-kafka-2.8.3.jar:2.8.3]at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2604) [spring-kafka-2.8.3.jar:2.8.3]at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2565) [spring-kafka-2.8.3.jar:2.8.3]at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2492) [spring-kafka-2.8.3.jar:2.8.3]at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2402) [spring-kafka-2.8.3.jar:2.8.3]at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2281) [spring-kafka-2.8.3.jar:2.8.3]at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1955) [spring-kafka-2.8.3.jar:2.8.3]at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1354) [spring-kafka-2.8.3.jar:2.8.3]at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1345) [spring-kafka-2.8.3.jar:2.8.3]at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1237) [spring-kafka-2.8.3.jar:2.8.3]at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_302]at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) ~[na:1.8.0_302]at java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:1.8.0_302]at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_302]Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Traceat org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.checkAckArg(MessagingMessageListenerAdapter.java:374) ~[spring-kafka-2.8.3.jar:2.8.3]at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:355) ~[spring-kafka-2.8.3.jar:2.8.3]at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.3.jar:2.8.3]at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.3.jar:2.8.3]at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2584) [spring-kafka-2.8.3.jar:2.8.3]
Caused by: org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void com.flex.notify.listener.NotifyListener.preOrder(java.lang.String): 1 error(s): [Error in object 'message': codes []; arguments []; default message [Payload value must not be empty]] at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:122) ~[spring-messaging-5.2.8.RELEASE.jar:5.2.8.RELEASE]at org.springframework.kafka.annotation.KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaNullAwarePayloadArgumentResolver.java:46) ~[spring-kafka-2.8.3.jar:2.8.3]at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117) ~[spring-messaging-5.2.8.RELEASE.jar:5.2.8.RELEASE]at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148) ~[spring-messaging-5.2.8.RELEASE.jar:5.2.8.RELEASE]at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.2.8.RELEASE.jar:5.2.8.RELEASE]at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.3.jar:2.8.3]at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:347) ~[spring-kafka-2.8.3.jar:2.8.3]at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.3.jar:2.8.3]at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.3.jar:2.8.3]at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2584) [spring-kafka-2.8.3.jar:2.8.3]... 12 common frames omitted

解决方法:加上 @Payload(required = false)

原因:

这里发现删除一条mysql记录后 kafka会产生两个消息,第一个消息正常有内容,第二个消息为null

具体原因需要看下cdc实现,这里不做说明了

springboot消费kafka Listener method could not be invoked with the incoming message相关推荐

  1. rabbitmq报错:Listener method could not be invoked with the incoming message

    rabbitmq报错:Listener method could not be invoked with the incoming message 错误翻译:无法使用传入消息调用监听器方法 一个top ...

  2. Listener method could not be invoked with the incoming message消息队列RabbitMQ项目启动报错及监听队列报错

    Listener method could not be invoked with the incoming message 报错如图: 说是不能调用监听器的方法,问题原因是Channel依赖导错 应 ...

  3. springboot消费kafka设置topics 以及 groupId

    关于kafka的这些概念和理论请见这篇博客 https://www.jianshu.com/p/d3e963ff8b70, 在下只简单阐述一下自己遇到的问题以及解决办法  由于之前自己配置的maven ...

  4. springboot 整合kafka 实现生产,消费数据

    一 kafka集群的启动 1.1 机器说明 192.168.152.128 master 192.168.152.129 slaver01 192.168.152.130 slaver02 1.2 查 ...

  5. 解决Kafka消费端错误:o.s.kafka.listener.LoggingErrorHandler : Error while processing: null

    简单记录下使用kafka遇到的问题,万一也会有小伙伴遇到了呢. 程序端使用springboot服务消费kafka,某天出现消息大量堆积,经过定位到错误日志如下: 2020-05-12 10:22:36 ...

  6. SpringBoot 集成 kafka,基于注解批量消费设置

    网上关于SpringBoot 集成kafka的批量消费功能需要手动创建类(这篇文章不错:[弄nèng - Kafka]应用篇(三) -- Springboot整合Kafka(批量消费)_司马缸砸缸了- ...

  7. springboot集成kafka消费手动启动停止

    项目场景: 在月结,或者某些时候,我们需要停掉kafka所有的消费端,让其暂时停止消费,而后等月结完成,再从新对消费监听恢复,进行消费,此动作不需要重启服务,最后源码下载 解决分析 KafkaList ...

  8. springboot整合kafka实现批量消费

    linux安装kafka:https://blog.csdn.net/qq_37936542/article/details/109453249 kafka版本:kafka_2.12-2.6.0.tg ...

  9. SpringBoot集成Kafka低版本和高版本

    SpringBoot集成Kafka低版本和高版本 说明 地址 低版本SpringBoot集成Kafka代码 代码 kafka生产者配置 kafka消费者配置 发送消息给kafka的Controller ...

  10. SpringBoot集成Kafka

    SpringBoot集成Kafka 知识索引 SpringBoot集成Kafka工程搭建 SpringBoot集成Kafka配置 SpringBoot集成Kafka生产消息 SpringBoot集成K ...

最新文章

  1. Python程序员的“避坑”指南
  2. CTO集体怒吼:我到底要不要继续写代码(上篇)
  3. VSTO Office二次开发对PPT自定义任务窗格测试
  4. AI芯片怎么降功耗?从ISSCC2017说起
  5. python 3des加密_python – 使用3DES和CBC破坏我的加密数据的前8个字节
  6. 【报错笔记】在做struts项目时,所有项目代码没问题但就是报404错误。
  7. 已知法向量 求投影_MIT—线性代数笔记15 子空间投影
  8. sql server的密码采用自带什么密码技术存储_【技术分享】浅谈MYSQL 8.0新特性
  9. POJ 3186 Treats for the Cows dp
  10. 蚂蚁金服:超大规模分布式计算系统 + 超大规模分布式优化算法
  11. .net vue漂亮登录界面_一文弄懂前端框架Vue 的核心——数据绑定,为升职涨薪加分
  12. spring-speed-up.xml
  13. 小米手机修改ip代理服务器,小米路由器3 IP地址更改方法【图文】
  14. android fresco的底层,详解Android之图片加载框架Fresco基本使用(一)
  15. zabbix_proxy代理服务器搭建教程
  16. 人间不正经生活语录(一)
  17. 基于vi构建强大的IDE
  18. 东方博宜OJ 1043 - 【入门】行李托运价格
  19. 苹果手机无法更新系统问题
  20. union和union all 的区别

热门文章

  1. Cascading Style Sheet层叠样式表
  2. an error occurred while creating opening the c++ browsing database 解决办法
  3. android 树莓派 图片,Android Things:树莓派3上手就是这么简单
  4. Task5 | 结构方程 | “老年病”与身份的关系
  5. 安装程序无法打开注册表项 UNKNOWN\Components\…的简单解决办法
  6. New Phy: 中科院城环所朱永官等综述全球变化对叶际微生物组的影响
  7. Code.V光学设计学习(三)——公差分析
  8. R语言使用sort函数对向量数据进行排序、默认从小到大升序排序、设置decreasing为真进行降序排序
  9. win10多用户同时登陆
  10. 撤销性CP-ABE方案研究现状总结 - 2021