欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。


欢迎跳转到本文的原文链接:https://honeypps.com/mq/kafka-producer-interceptor/

Kafka中的拦截器(Interceptor)是0.10.x.x版本引入的一个功能,一共有两种:Kafka Producer端的拦截器和Kafka Consumer端的拦截器。本篇主要讲述的是Kafka Producer端的拦截器,它主要用来对消息进行拦截或者修改,也可以用于Producer的Callback回调之前进行相应的预处理。

使用Kafka Producer端的拦截器非常简单,主要是实现ProducerInterceptor接口,此接口包含4个方法:

  1. ProducerRecord<K, V> onSend(ProducerRecord<K, V> record):Producer在将消息序列化和分配分区之前会调用拦截器的这个方法来对消息进行相应的操作。一般来说最好不要修改消息ProducerRecord的topic、key以及partition等信息,如果要修改,也需确保对其有准确的判断,否则会与预想的效果出现偏差。比如修改key不仅会影响分区的计算,同样也会影响Broker端日志压缩(Log Compaction)的功能。
  2. void onAcknowledgement(RecordMetadata metadata, Exception exception):在消息被应答(Acknowledgement)之前或者消息发送失败时调用,优先于用户设定的Callback之前执行。这个方法运行在Producer的IO线程中,所以这个方法里实现的代码逻辑越简单越好,否则会影响消息的发送速率。
  3. void close():关闭当前的拦截器,此方法主要用于执行一些资源的清理工作。
  4. configure(Map<String, ?> configs):用来初始化此类的方法,这个是ProducerInterceptor接口的父接口Configurable中的方法。

一般情况下只需要关注并实现onSend或者onAcknowledgement方法即可。下面我们来举个案例,通过onSend方法来过滤消息体为空的消息以及通过onAcknowledgement方法来计算发送消息的成功率。

public class ProducerInterceptorDemo implements ProducerInterceptor<String,String> {private volatile long sendSuccess = 0;private volatile long sendFailure = 0;@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {if(record.value().length()<=0)return null;return record;}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {if (exception == null) {sendSuccess++;} else {sendFailure ++;}}@Overridepublic void close() {double successRatio = (double)sendSuccess / (sendFailure + sendSuccess);System.out.println("[INFO] 发送成功率="+String.format("%f", successRatio * 100)+"%");}@Overridepublic void configure(Map<String, ?> configs) {}
}

自定义的ProducerInterceptorDemo类实现之后就可以在Kafka Producer的主程序中指定,示例代码如下:

public class ProducerMain {public static final String brokerList = "localhost:9092";public static final String topic = "hidden-topic";public static void main(String[] args) throws ExecutionException, InterruptedException {Properties properties = new Properties();properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("bootstrap.servers", brokerList);properties.put("interceptor.classes", "com.hidden.producer.ProducerInterceptorDemo");Producer<String, String> producer = new KafkaProducer<String, String>(properties);for(int i=0;i<100;i++) {ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, "msg-" + i);producer.send(producerRecord).get();}producer.close();}
}

Kafka Producer不仅可以指定一个拦截器,还可以指定多个拦截器以形成拦截链,这个拦截链会按照其中的拦截器的加入顺序一一执行。比如上面的程序多添加一个拦截器,示例如下:

properties.put("interceptor.classes", "com.hidden.producer.ProducerInterceptorDemo,com.hidden.producer.ProducerInterceptorDemoPlus");

这样Kafka Producer会先执行拦截器ProducerInterceptorDemo,之后再执行ProducerInterceptorDemoPlus。

有关interceptor.classes参数,在kafka 1.0.0版本中的定义如下:

NAME DESCRIPTION TYPE DEFAULT VALID VALUES IMPORTANCE
interceptor.calssses A list of classes to use as interceptors. Implementing the org.apache.kafka.clients.producer.ProducerInterceptor interface allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. By default, there no interceptors. list null low

欢迎跳转到本文的原文链接:https://honeypps.com/mq/kafka-producer-interceptor/


欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。


Kafka Producer拦截器相关推荐

  1. Kafka详解与总结(七)-Kafka producer拦截器(interceptor)

    1. 拦截器原理 Producer拦截器(interceptor)是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑. 对于producer而言,interceptor ...

  2. 60-50-010-API-Kafka producer拦截器(interceptor)

    文章目录 1.视界 概述 1.视界 概述 Producer拦截器(interceptor)是个相当新的功能,它和consumer端interceptor是在Kafka 0.10版本被引入的,主要用于实 ...

  3. Flume下读取kafka数据后再打把数据输出到kafka,利用拦截器解决topic覆盖问题

    1:如果在一个Flume Agent中同时使用Kafka Source和Kafka Sink来处理events,便会遇到Kafka Topic覆盖问题,具体表现为,Kafka Source可以正常从指 ...

  4. java kafka 设置分区_Java kafka如何实现自定义分区类和拦截器

    Java kafka如何实现自定义分区类和拦截器 2.producer配置文件指定,具体的分区类 // 具体的分区类 props.put(ProducerConfig.PARTITIONER_CLAS ...

  5. java kafka 分区_Java kafka如何实现自定义分区类和拦截器

    生产者发送到对应的分区有以下几种方式: (1)指定了patition,则直接使用:(可以查阅对应的java api, 有多种参数) (2)未指定patition但指定key,通过对key的value进 ...

  6. 四十七、Kafka中的拦截器(Interceptor)

    前两篇文章我们分别介绍了Kafka生产者和消费者的API,本文我们介绍一下Kafka中拦截器的知识.关注专栏<破茧成蝶--大数据篇>,查看更多相关的内容~ 目录 一.拦截器介绍 二.拦截器 ...

  7. 【Kafka笔记】4.Kafka API详细解析 Java版本(Producer API,Consumer API,拦截器等)

    简介 Kafka的API有Producer API,Consumer API还有自定义Interceptor (自定义拦截器),以及处理的流使用的Streams API和构建连接器的Kafka Con ...

  8. 手撸kafka producer生产者的分区器(partition)API

    简介:本篇博客是对kafka produce 生产者分区器的API(Java) 包含以下内容:分区使用原则,分区器使用原则,分区器相关代码编写及pom.xml配置文件编写,到最后的运行结果. 目录标题 ...

  9. java生产者实现kafka拦截器

    [RAEDME] 本文中, java客户端作为生产者, centos中consumer线程作为消费者: [1]拦截器简述 1)拦截器是什么? 很明显,为了实现面向切面编码,即在 具体逻辑的上下文 添加 ...

最新文章

  1. 正则表达式(Regular Expression)
  2. ASP.NET Web API
  3. python爬虫基础扫盲之HTTP以及HTTPS
  4. 创新品牌体验团队_如何推动软件团队创新
  5. crc16几种标准校验算法及c语言代码
  6. 微信小程序反编译工具及方法
  7. mysql删除命令历史记录_MySQL历史命令记录清除
  8. 利用EasyPub为Kindle制作mobi格式书籍
  9. HotPower超级CRC计算器与第三方CRC计算器名词解释与对照及操作
  10. JS - 将tree(树形)数据结构格式改为一维数组对象格式(扁平化)
  11. Python多线程编程详解,文章比较长,需耐心浏览
  12. uniapp修改html样式,关于css:uniapp操作dom改变css样式
  13. android studio 出现: Design editor is unavailable until a successful build 问题
  14. mysql备份数据库某表格_mysql数据库的备份以及表格数据之间的复制
  15. 学生表/教师表/课程表/成绩表常见SQL查询
  16. 三高越来越多应该注意什么
  17. WordPress自定义设置管理员和用户头像以及批量设置评论者头像
  18. 远程办公:如何提高自制力?
  19. ArcGIS批量修改CASS标注 | 小技巧,高效率
  20. python+树莓派+方向盘打造4G遥控车——之五 MCP3008方向盘角度监测

热门文章

  1. Linux文件系统和文本编辑器
  2. 写csv文件_机器学习Python实践——数据导入(CSV)
  3. log4j2.xml 的标签 loggers 中 root 的属性 level 指的是什么
  4. 笔试编程常用函数(Java)
  5. 河南版权登记,给自己的“孩子”一个身份证
  6. 未发先侃?对比华为,高通第二代5G调制解调器如何?
  7. 431.chapter10. working with flat files
  8. 使用SpringMVC 的MultipartFile文件上传时参数获取的一个坑
  9. mybatis+dubbo+ springmvc+zookeeper分布式架构
  10. java-信息安全(一)-BASE64,MD5,SHA,HMAC,RIPEMD算法