文章目录

  • 概述
  • 使用场景
  • 实战
    • 配置文件
    • 配置类
    • 自定义ConSumerInterceptor
    • 使用


概述

ConsumerInterceptor是Kafka中的一个重要组件,它允许开发人员在Kafka消费者端拦截和修改消息的处理过程。ConsumerInterceptor可以用于实现各种功能,从消息监控到数据转换和错误处理,为开发人员提供了更大的灵活性和可定制性。

ConsumerInterceptor的主要作用是在消息被消费之前和之后对其进行拦截和处理。它可以用于以下几个方面:

  1. 监控:通过ConsumerInterceptor,可以在消息被消费之前和之后记录和监控消息的元数据,例如消息的偏移量、主题、分区等信息。这对于跟踪和分析消息流的健康状况以及性能优化非常有用。

  2. 转换:ConsumerInterceptor还可以用于对消息进行转换和修改。通过拦截消息并对其进行操作,可以在消费者端对消息进行格式转换、数据解析或者其他自定义处理。例如,你可以将消息从一种格式转换为另一种格式,或者对消息进行特定的业务处理。

  3. 错误处理:当消费者在处理消息时发生错误或异常情况时,ConsumerInterceptor可以捕获这些错误并采取适当的措施。你可以在拦截器中实现自定义的错误处理逻辑,例如记录错误日志、发送告警通知或者进行重试操作,从而提高应用程序的可靠性和容错性。


使用场景

使用场景方面,ConsumerInterceptor可以在多种情况下发挥作用,例如:

  1. 监控和统计:你可以使用ConsumerInterceptor来收集和记录消费者端的统计信息,例如消费速率、处理延迟等。这样可以帮助你监控应用程序的性能并进行性能优化。

  2. 数据转换:如果你需要将消息从一种格式转换为另一种格式,例如将JSON消息转换为Avro格式,你可以使用ConsumerInterceptor来实现这个转换过程。

  3. 数据验证:ConsumerInterceptor可以用于验证消息的有效性和完整性。你可以在拦截器中实现验证逻辑,例如检查消息的签名或者校验消息的结构,以确保只有符合要求的消息被消费。

  4. 错误处理和重试:当消费者在处理消息时遇到错误,例如数据库连接失败或者网络故障,你可以使用ConsumerInterceptor来捕获这些错误并采取适当的措施。你可以在拦截器中实现自定义的错误处理逻辑,例如记录错误日志、发送告警通知或者进行消息重试。

总之,ConsumerInterceptor为开发人员提供了在消费者端对消息进行拦截、处理和定制的能力。通过使用ConsumerInterceptor,你可以实现一系列功能,包括监控、数据转换和错误处理,从而更好地控制和管理Kafka消费者端的消息处理过程。


实战

配置文件

spring:kafka:bootstrap-servers: 20.10.110.137:9888 # Kafka服务的地址producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer # key的序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer # value的序列化器acks: 1 # acks=0:表示producer不需要等待任何确认收到的信息。副本将立即加到socket缓冲区并认为已经发送。如果使用此选项,则存在丢失数据的风险,因为服务器在数据到达副本之前可能会崩溃。retries: 0 # 失败重试次数,0表示不启用重试机制batch-size: 16384 # 发送缓冲区大小,按照字节计算linger-ms: 1 # 发送延时,单位毫秒buffer-memory: 33554432 # 发送缓存区的大小,按照字节计算compression-type: gzip # 压缩类型,默认是none,可选snappy、gzip、lz4consumer:#Kafka中没有初始偏移或如果当前偏移在服务器上不再存在时,默认区最新 ,有三个选项 【latest, earliest, none】auto-offset-reset: earliest#是否开启自动提交enable-auto-commit: false#key的解码方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializer#value的解码方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializer#消费者组groupidgroup-id: process-group#消费者最大拉取的消息数量max-poll-records: 2000#消费者最大等待时间max-poll-interval-ms: 2000listener:type: batchack-mode: manual # 手动提交concurrency: 12 # 并发数

配置类

package net.zf.module.system.config;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.listener.ContainerProperties;import java.util.HashMap;
import java.util.Map;/*** @author artisan*/
@Slf4j
@Configuration
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServer;@Value("${spring.kafka.consumer.auto-offset-reset}")private String autoOffsetReset;@Value("${spring.kafka.consumer.enable-auto-commit}")private String enableAutoCommit;@Value("${spring.kafka.consumer.key-deserializer}")private String keyDeserializer;@Value("${spring.kafka.consumer.value-deserializer}")private String valueDeserializer;@Value("${spring.kafka.consumer.group-id}")private String group_id;@Value("${spring.kafka.consumer.max-poll-records}")private String maxPollRecords;@Value("${spring.kafka.consumer.max-poll-interval-ms}")private String maxPollIntervalMs;@Value("${spring.kafka.listener.concurrency}")private Integer concurrency;private final String consumerInterceptor = "net.zf.module.system.kafka.interceptor.FailureRateInterceptor";/*** 消费者配置信息*/@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>(32);props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServer);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,enableAutoCommit);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,keyDeserializer);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,valueDeserializer);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,maxPollRecords);props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,maxPollIntervalMs);props.put(ConsumerConfig.GROUP_ID_CONFIG,group_id);props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,consumerInterceptor );return props;}/*** 消费者批量工厂*/@Beanpublic KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> batchFactory() {ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);factory.setBatchListener(true);factory.setConcurrency(concurrency);return factory;}/*** 异常处理器** @return*/@Beanpublic ConsumerAwareListenerErrorHandler consumerAwareListenerErrorHandler() {return (message, exception, consumer) -> {//            log.error("消息{} , 异常原因{}", message, exception.getMessage());log.error("consumerAwareListenerErrorHandler called");return null;};}}

这段代码是一个用于配置Kafka消费者的Spring配置类。它使用了Spring Kafka库来设置Kafka的消费者配置和相关的监听器。

以下是代码的主要部分的解释:

  1. 通过@Configuration注解将该类标记为一个Spring配置类。
  2. 使用@Value注解注入配置属性,这些属性来自于应用的配置文件(比如application.properties)。
  3. consumerConfigs()方法创建了一个包含Kafka消费者配置信息的props对象,并将其返回。这些配置包括Kafka服务器地址、消费者组ID、序列化/反序列化类等。
  4. batchFactory()方法创建了一个ConcurrentKafkaListenerContainerFactory对象,并设置了相关的属性。它使用了前面定义的消费者配置,并设置了批量消费和并发处理的参数。
  5. consumerAwareListenerErrorHandler()方法创建了一个ConsumerAwareListenerErrorHandler对象,用于处理消费过程中出现的异常。在这个例子中,它只是打印了错误日志。

总体而言,这段代码的目的是配置Kafka消费者的相关属性,包括连接到Kafka服务器的配置、消费者组ID、序列化/反序列化类等。它还定义了一个批量消费的监听器工厂和一个异常处理器。这些配置可以通过注入KafkaListenerContainerFactoryConsumerAwareListenerErrorHandler来在应用中使用。


自定义ConSumerInterceptor

package net.zf.module.system.kafka.interceptor;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.stereotype.Component;import java.util.Map;/*** @author artisan*/@Slf4j
@Component
public class FailureRateInterceptor implements ConsumerInterceptor<Object, Object> {/*** 消息消费前的拦截处理** @param consumerRecords* @return*/@Overridepublic ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> consumerRecords) {// TODOlog.info("FailureRateInterceptor#onConsume");// 根据设定的规则计算失败率,并进行判断是否跳过消息的消费// 返回ConsumerRecords对象, 继续执行下游的消费逻辑或者直接返回空的ConsumerRecords对象 (ConsumerRecords.EMPTY)return consumerRecords;}/*** 消息提交前进行拦截处理** @param map*/@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {log.info("FailureRateInterceptor#onCommit");}/*** 拦截器关闭前进行拦截处理(如果有的话)*/@Overridepublic void close() {log.info("FailureRateInterceptor#close");}/*** 初始化配置(如果有的话)** @param map*/@Overridepublic void configure(Map<String, ?> map) {log.info("FailureRateInterceptor#configure");}
}

onConsume 可以控制 ConsumerRecords, 通过返回null ,可以暂停消费。

这段代码是一个自定义的Kafka消费者拦截器,实现了ConsumerInterceptor接口。拦截器可以在消息消费和提交的过程中插入自定义的逻辑,用于处理消息或拦截操作。

以下是代码的主要部分的解释:

  1. @Slf4j注解用于自动生成日志记录器。
  2. @Component注解将该类标记为Spring组件,使得它可以被自动扫描和注入到应用中。
  3. 实现了ConsumerInterceptor接口,并重写了其中的方法。
    • onConsume()方法在消费者消费消息之前被调用。在这个例子中,它只是打印了日志信息,表示拦截器的执行。
    • onCommit()方法在消息提交之前被调用。在这个例子中,它只是打印了日志信息,表示拦截器的执行。
    • close()方法在拦截器关闭之前被调用。在这个例子中,它只是打印了日志信息,表示拦截器的执行。
    • configure()方法在拦截器初始化配置时被调用。在这个例子中,它只是打印了日志信息,表示拦截器的执行。
  4. 拦截器的具体逻辑还没有实现,而是用// TODO标记了需要填充的部分。根据注释的描述,它可能会根据设定的规则计算消费失败率,并根据判断跳过或继续消费消息。

总体而言,这段代码定义了一个自定义的Kafka消费者拦截器。拦截器可以在消息消费和提交的过程中执行自定义的逻辑。在这个例子中,拦截器的逻辑还没有实现,只是打印了日志信息以表示拦截器的执行。你需要根据需求实现onConsume()方法中的拦截逻辑,以便根据设定的规则处理消息消费的失败率。


使用

package net.zf.module.system.kafka.consumer;import lombok.extern.slf4j.Slf4j;
import net.zf.module.system.entity.AttackMessage;
import net.zf.module.system.executors.factory.MessageExecutorFactory;
import net.zf.module.system.service.es.AttackMessageESService;
import net.zf.module.system.util.constants.KafkaTopicConstant;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;import java.util.ArrayList;
import java.util.List;/*** @author artisan*/
@Component
@Slf4j
public class AttackKafkaConsumer {@Autowiredprivate MessageExecutorFactory messageExecutorFactory;@Autowiredprivate AttackMessageESService attackMessageESService;@KafkaListener(topicPattern = KafkaTopicConstant.ATTACK_MESSAGE + ".*",containerFactory = "batchFactory",errorHandler = "consumerAwareListenerErrorHandler")public void processMessage(List<String> records, Acknowledgment ack)  {log.info("AttackKafkaConsumer 当前线程 {} , 本次拉取的数据总量:{} ", Thread.currentThread().getId(), records.size());try {List<AttackMessage> attackMessages = new ArrayList();records.stream().forEach(record -> {messageExecutorFactory.process(KafkaTopicConstant.ATTACK_MESSAGE).execute(record, attackMessages);});if (!attackMessages.isEmpty()) {String response = attackMessageESService.addDocuments(attackMessages, false);log.info("AttackKafkaConsumer本次处理的数据总量:{}, 响应结果: {}", attackMessages.size(), response);}} finally {ack.acknowledge();}}
}

这段代码定义了一个名为AttackKafkaConsumer的类,它是一个Kafka消费者。它使用了Spring Kafka提供的@KafkaListener注解来指定消费者的相关配置。

以下是代码的主要部分的解释:

  1. @Component注解将该类标记为Spring组件,使得它可以被自动扫描和注入到应用中。
  2. @Slf4j注解用于自动生成日志记录器。
  3. AttackKafkaConsumer类中注入了MessageExecutorFactoryAttackMessageESService两个依赖,通过@Autowired注解实现自动注入。
  4. @KafkaListener注解标记了processMessage()方法作为Kafka消费者的消息处理方法。
    • topicPattern属性指定了要监听的Kafka主题的模式,使用了常量KafkaTopicConstant.ATTACK_MESSAGE并结合通配符.*
    • containerFactory属性指定了用于创建Kafka监听容器的工厂bean的名称,使用了名为batchFactory的工厂。
    • errorHandler属性指定了用于处理消费者异常的错误处理器的bean的名称,使用了名为consumerAwareListenerErrorHandler的错误处理器。
  5. processMessage()方法是消息的实际处理逻辑。它接收一个List<String>类型的消息记录和一个Acknowledgment对象作为参数。
    • 首先,它记录了当前线程ID和本次拉取的数据总量的日志信息。
    • 然后,它创建了一个空的AttackMessage列表,用于存储处理后的消息。
    • 使用records.stream().forEach()遍历每条消息记录,并通过messageExecutorFactory调用process()方法来处理每条记录,同时将处理结果添加到attackMessages列表中。
    • 在处理完所有消息后,如果attackMessages列表不为空,将调用attackMessageESServiceaddDocuments()方法将消息添加到Elasticsearch中,并记录处理的数据总量和响应结果的日志信息。
    • 最后,在finally块中调用ack.acknowledge()手动确认消费完成。

总体而言,这段代码定义了一个Kafka消费者类AttackKafkaConsumer,并使用@KafkaListener注解指定了监听的主题、容器工厂和错误处理器。processMessage()方法是处理消息的具体逻辑,它遍历消息记录并调用适当的执行器进行处理,最后将处理结果添加到列表中,并通过Elasticsearch服务将消息存储到数据库中。消费完成后,手动确认消息的消费。

Apache Kafka - ConsumerInterceptor 实战 (1)相关推荐

  1. DataPipeline |《Apache Kafka实战》作者胡夕:Apache Kafka监控与调优

    胡夕,<Apache Kafka实战>作者,北航计算机硕士毕业,现任某互金公司计算平台总监,曾就职于IBM.搜狗.微博等公司.国内活跃的Kafka代码贡献者. 前言 虽然目前Apache ...

  2. 《Apache Kafka实战》读书笔记-调优Kafka集群

    <Apache Kafka实战>读书笔记-调优Kafka集群 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 一.确定调优目标 1>.常见的非功能性要求 一.性能( ...

  3. 《Apache Kafka 实战》读书笔记-认识Apache Kafka

    <Apache Kafka 实战>读书笔记-认识Apache Kafka 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 一.kafka概要设计 kafka在设计初衷就是 ...

  4. Apache Kafka实战读书笔记(推荐指数:☆☆☆☆☆)

    Apache Kafka实战读书笔记(推荐指数:☆☆☆☆☆) 认识AK 快速入门 安装和启动 小案例 消息引擎系统 消息引擎范型 AK的概要设计 吞吐量/延时 消息持久化 负载均衡和故障转移: 伸缩性 ...

  5. 【Flink实战系列】Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/consumer/ConsumerRecord;)Ljava/

    java.lang.AbstractMethodError: Method flink/stream/deserialization/PoJoDeserializationSchema.deseria ...

  6. Apache Kafka 在 vivo 的实战

    作者:vivo互联网服务器团队-Yang Yijun 一.Kafka应用 本文主要总结当Kafka集群流量达到 万亿级记录/天或者十万亿级记录/天  甚至更高后,我们需要具备哪些能力才能保障集群高可用 ...

  7. Apache Kafka:下一代分布式消息系统

    简介 Apache Kafka是分布式发布-订阅消息系统.它最初由LinkedIn公司开发,之后成为Apache项目的一部分.Kafka是一种快速.可扩展的.设计内在就是分布式的,分区的和可复制的提交 ...

  8. 大数据开发hadoop核心的分布式消息系统:Apache Kafka 你知道吗

    简介 Apache Kafka是分布式发布-订阅消息系统.它最初由LinkedIn公司开发,之后成为Apache项目的一部分.Kafka是一种快速.可扩展的.设计内在就是分布式的,分区的和可复制的提交 ...

  9. SpringBoot整合kafka之kafka分区实战

    本文来说下SpringBoot整合kafka之kafka分区实战 文章目录 准备工作 程序代码 程序测试 本文小结 准备工作 当然我们也可以不手动创建topic,在执行代码kafkaTemplate. ...

最新文章

  1. 【LeetCode 剑指offer刷题】字符串题6:67 把字符串转成整数
  2. springboot和kafka集成
  3. cxf 服务端soap报文_使用Apache CXF开发SOAP Web服务
  4. LeetCode MySQL 1398. 购买了产品A和产品B却没有购买产品C的顾客
  5. linux网站465端口是什么端口,发送端口25,465,587端口疑问解答
  6. 栈的复习(加减乘除表达式求值)
  7. matlab光束,matlab仿真光束的传输特性
  8. 汇编 内存段划分和寄存器
  9. 百度地图经纬度获取标点与城市编码
  10. php apache mpm,Apache的三种MPM模式比较:prefork,worker,event
  11. 坚果云 我的电脑图标_坚果云使用教程
  12. 如何用windows xp自带的画图工具画箭头
  13. 国外教育邮箱购买?国外邮箱哪个好?
  14. 【C++】代码实现:数据线性平滑算法:3点线性平滑、5点(1次、2次、3次)线性平滑、7点(1次、2次)线性平滑
  15. android模拟器+文件传输,夜神安卓模拟器怎么和电脑互传文件_夜神模拟器和电脑互传文件的教程-系统城...
  16. windows系统用cmd命令开启WiFi共享功能
  17. 获取b站某个up的视频aid、cid
  18. macOS 入门指南
  19. 【MacM1+PyCharm+PyQt5】记录一次Pycharm有Pyqt5环境但无代码提示的解决流程
  20. VC无负担实现XP风格界面 [转]

热门文章

  1. Fakeapp2.2.0安装图文实录-见坑填坑
  2. Chia官方矿池测试版正式上线!?
  3. 安卓手机屏幕共享给电脑操作的几款软件
  4. 这种情况,你会不会离职?
  5. Web —— 单页面和多页面模式
  6. 使用NODEJS+REDIS开发一个消息队列以及定时任务处理
  7. C语言中的signal函数
  8. 典型可编程接口芯片及应用
  9. 操作系统——存储器管理
  10. ambari全攻略流程,开发ambari(四)