Kafka详解与总结(七)-Kafka producer拦截器(interceptor)
1. 拦截器原理
Producer拦截器(interceptor)是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。
对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:
(1)configure(configs)
获取配置信息和初始化数据时调用。
(2)onSend(ProducerRecord):
该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中。Producer确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算
(3)onAcknowledgement(RecordMetadata, Exception):
该方法会在消息被应答或消息发送失败时调用,并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率
(4)close:
关闭interceptor,主要用于执行一些资源清理工作
如前所述,interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。
2. 拦截器举例
(1)增加时间戳拦截器
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;public class TimeInterceptor implements ProducerInterceptor<String, String> {@Overridepublic void configure(Map<String, ?> configs) {}@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {// 创建一个新的record,把时间戳写入消息体的最前部return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),System.currentTimeMillis() + "," + record.value().toString());}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Overridepublic void close() {}
}
(2)统计发送消息成功和发送失败消息数,并在producer关闭时打印这两个计数器
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;public class CounterInterceptor implements ProducerInterceptor<String, String>{private int errorCounter = 0;private int successCounter = 0;@Overridepublic void configure(Map<String, ?> configs) {}@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {return record;}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {// 统计成功和失败的次数if (exception == null) {successCounter++;} else {errorCounter++;}}@Overridepublic void close() {// 保存结果System.out.println("Successful sent: " + successCounter);System.out.println("Failed sent: " + errorCounter);}
}
(3)producer主程序
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;public class InterceptorProducer {public static void main(String[] args) throws Exception {// 1 设置配置信息Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("acks", "all");props.put("retries", 0);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 2 构建拦截链List<String> interceptors = new ArrayList<>();interceptors.add("com.atguigu.kafka.interceptor.TimeInterceptor"); interceptors.add("com.atguigu.kafka.interceptor.CounterInterceptor"); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);String topic = "first";Producer<String, String> producer = new KafkaProducer<>(props);// 3 发送消息for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message" + i);producer.send(record);}// 4 一定要关闭producer,这样才会调用interceptor的close方法producer.close();}
}
3)测试
(1)在kafka上启动消费者,然后运行客户端java程序。
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic first 1501904047034,message01501904047225,message11501904047230,message21501904047234,message31501904047236,message41501904047240,message51501904047243,message61501904047246,message71501904047249,message81501904047252,message9
(2)观察java平台控制台输出数据如下:
Successful sent: 10
Failed sent: 0
转载于:https://www.cnblogs.com/htkj/p/10941479.html
Kafka详解与总结(七)-Kafka producer拦截器(interceptor)相关推荐
- 60-50-010-API-Kafka producer拦截器(interceptor)
文章目录 1.视界 概述 1.视界 概述 Producer拦截器(interceptor)是个相当新的功能,它和consumer端interceptor是在Kafka 0.10版本被引入的,主要用于实 ...
- kafka详解三:开发Kafka应用
一.整体看一下Kafka 我们知道,Kafka系统有三大组件:Producer.Consumer.broker . producers 生产(produce)消息(message)并推(push)送给 ...
- Kafka详解(五)Kafka副本机制
所谓的副本机制(Replication),也可以称之为备份机制,通常是指分布式系统在多台网络互联的机器上保存有相同的数据拷贝.它的作用主要有以下几点: 提供数据冗余.即使系统部分组件失效,系统依然能够 ...
- kafka详解及集群环境搭建
一.kafka详解 安装包下载地址:https://download.csdn.net/download/weixin_45894220/87020758 1.1Kafka是什么? 1.Kafka是一 ...
- python怎么设置七牛云_详解Python在七牛云平台的应用(一)
七牛云七牛云是国内领先的企业级云服务商.专注于以数据为核心的云计算业务,围绕富媒体场景推出了对象存储.融合CDN.容器云.大数据.深度学习平台等产品,并提供一站式视频云解决方案,同时打造简单,可信赖的 ...
- Tensorflow 2.x(keras)源码详解之第七章:keras中的tf.keras.layers
大家好,我是爱编程的喵喵.双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中.从事机器学习以及相关的前后端开发工作.曾在阿里云.科大讯飞.CCF等比赛获得多次Top名次.现 ...
- 《视频直播技术详解》系列之七:现代播放器原理
七牛云于 6 月底发布了一个针对视频直播的实时流网络 LiveNet 和完整的直播云解决方案,很多开发者对这个网络和解决方案的细节和使用场景非常感兴趣. 结合七牛实时流网络 LiveNet 和直播云解 ...
- 「视频直播技术详解」系列之六:现代播放器原理
关于直播的技术文章不少,成体系的不多.我们将用七篇文章,更系统化地介绍当下大热的视频直播各环节的关键技术,帮助视频直播创业者们更全面.深入地了解视频直播技术,更好地技术选型. 本系列文章大纲如下: ...
- 桌面widget详解(四)——桌面音乐播放器(实战)
前言:这将是这个系列的最后一篇了,我写这几篇文章也是累的快不行了,再写就真的要吐了,言归正转,前面三篇已经把widget中涉及到的基本知识基本上讲完了,今天我们就做一个小例子,看看桌面音乐播放器wid ...
最新文章
- office 2007打字慢问题
- startsWith(),endsWith()的作用,用法,判断字符串a 是不是以字符串b开头或结尾
- java中构造器快捷方式_java 构造器 (构造方法)
- 微软将人工智能嵌入Windows 10更新
- 【转载】App.config/Web.config 中特殊字符的处理
- 暑期训练日志----2018.8.13
- TM4C123核心板焊接须知
- 操作系统系列题型分析(更新中~)
- apipost如何设置断言
- iis反向代理tomcat
- QImage与QPixmap区别
- 计算机常见软件故障有哪几种,计算机常见故障可分为硬件和软件故障,具体介绍...
- 文献翻译——基于关联规则挖掘识别的鸡源大肠杆菌共有多重耐药模式(下)
- shell打开wifi命令_shell WIFI
- TCP/UDP 区别
- 腐败团--刘一手火锅
- 习题 8.21 用指向指针的指针的方法对n个整数排序并输出。要求将排序单独写成一个函数。n个整数在主函数中输入,最后在主函数中输出。
- 让Fedora 19支持ThinkPad鼠标中键和小红点实现滚轮效果
- Matlab怎么从table变成matrix
- 快来围观,又一个大厂首席架构师真正财务了