文章目录

  • 1. 说明
    • 1.1 configure(configs)
    • 1.2 onSend(ProducerRecord)
    • 1.3 onAcknowledgement(RecordMetadata, Exception)
    • 1.4 close
  • 2. 案例

1. 说明

  producer生成消息时,interceptor使得用户在消息发送前以及producer回调逻辑触发前对消息做定制化处理。producer允许用户通过配置interceptor.classes参数指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor

1.1 configure(configs)

  可获取配置信息,初始化调用一次

1.2 onSend(ProducerRecord)

  方法会运行在用户主线程中,封装进org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord, Callback),确保在消息被序列化以及计算分区前调用。(最好不要修改消息所属的topic和分区

1.3 onAcknowledgement(RecordMetadata, Exception)

  方法会在消息从RecordAccumulator成功发送到Kafka Broker之后,或者在发送过程中失败时调用,即在producer回调逻辑触发之前调用。(不要添加复杂逻辑,否则会拖慢producer的消息发送效率

1.4 close

  关闭拦截器,执行资源清理工作

注:producer将按照指定顺序调用拦截器,并仅捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递


2. 案例

  生成拦截链。第一个interceptor会在消息发送前将时间戳信息加到消息value的最前部;第二个interceptor会在消息发送后更新成功发送消息数或失败发送消息数到控制台

拦截器一:

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;public class TimeInterceptor implements ProducerInterceptor<String, String> {@Overridepublic void configure(Map<String, ?> configs) {}@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {return new ProducerRecord<String, String>(record.topic(), record.partition(), record.key(), System.currentTimeMillis() + "----" + record.value());}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Overridepublic void close() {}}

拦截器二:

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;public class CounterInterceptor implements ProducerInterceptor<String, String> {private int success;private int error;@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {return record;}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {if (metadata != null) {success++;} else {error++;}}@Overridepublic void close() {System.out.println("success:" + success);System.out.println("error:" + error);}@Overridepublic void configure(Map<String, ?> configs) {}
}

生产者:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.ArrayList;
import java.util.Properties;public class InterceptorProducer {public static void main(String[] args) throws InterruptedException {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.23:29092,192.168.0.114:29092,192.168.0.110:29092");//ack应答级别props.put(ProducerConfig.ACKS_CONFIG, "all");//重试次数props.put("retries", 1);//批次大小props.put("batch.size", 16384);//等待时间props.put("linger.ms", 1);//RecordAccumulator缓冲区大小props.put("buffer.memory", 33554432);// key value 序列化props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");/*添加拦截器链*/ArrayList<String> classes = new ArrayList<>();classes.add("com.cz.kafka.interceptor.TimeInterceptor");classes.add("com.cz.kafka.interceptor.CounterInterceptor");props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, classes);//创建生产者KafkaProducer<String, String> producer = new KafkaProducer<>(props);//发送数据for (int i = 0; i < 10; i++) {producer.send(new ProducerRecord<>("charges", i + ",female,25.84,0,no,northwest,28923.13692"));}//关闭producer.close();}
}

kafka 自定义Interceptor(通过拦截器对消息进行定制化处理)相关推荐

  1. 好用的自定义Okhttp日志拦截器

    Okhttp中自带的日志拦截器 HttpLoggingInterceptor 实在是不好用,日志太多太乱,所以想要有好看.简洁的日志打印就要靠自定义了,下面分享我参照 HttpLoggingInter ...

  2. AOP(面向切面编程)、Filter(过虑器)、Interceptor(拦截器)

    AOP(面向切面编程) 面向切面编程(AOP是Aspect Oriented Program的首字母缩写) ,我们知道,面向对象的特点是继承.多态和封装.而封装就要求将功能分散到不同的对象中去,这在软 ...

  3. spring boot集成swagger,自定义注解,拦截器,xss过滤,异步调用,定时任务案例...

    本文介绍spring boot集成swagger,自定义注解,拦截器,xss过滤,异步调用,定时任务案例 集成swagger--对于做前后端分离的项目,后端只需要提供接口访问,swagger提供了接口 ...

  4. Filter(过滤器) 和 interceptor(拦截器)的区别

    Filter(过滤器) 和 interceptor(拦截器)的区别 1.拦截器是基于java反射机制的,而过滤器是基于函数回调的. 2.过滤器依赖于Servlet容器,而拦截器不依赖于Servlet容 ...

  5. 自定义注解在拦截器中为空_如何在Android中为特定联系人设置自定义铃声

    自定义注解在拦截器中为空 Everyone likes to know who's calling before they actually pick up the phone, and the ea ...

  6. BOS项目 第2天(BaseDao、BaseAction、用户登录、自定义strust登录拦截器)

    BOS项目 第2天 今天内容安排: 1.根据提供的pdm文件生成sql 2.持久层和表现层设计---BaseDao.BaseAction 3.实现用户登录功能 4.jQuery EasyUI 消息提示 ...

  7. webservice拦截器 查看消息包(soap)

    服务端: 1.获取EndpointImpl对象 2.调用EndpointImpl对象中的方法获取In拦截器 3.调用EndpointImpl对象中的方法获取out拦截器 4.添加自己的In拦截器与Ou ...

  8. 自定义注解和拦截器,实现接口限流防刷

    我们的目的是在指定时间内,每个用户只能进行秒杀请求指定次数. 首先,定义一个注解 写一个拦截器.就是当执行某个方法之前,将请求截获: (这里实现的只是一个思路,由于StringRedisTemplat ...

  9. Spring Boot——自定义多个拦截器(HandlerInterceptor)配置方法与执行顺序

    执行顺序 源代码 拦截器类 package com.jd.m.tg.interceptor;import org.slf4j.Logger; import org.slf4j.LoggerFactor ...

最新文章

  1. PHP Storm Built In Server Doesn't Recognize mod_rewrite
  2. mysql charindex_mysql中替代charindex的函数substring_index、find_in_set | 学步园
  3. 类属性的特征java_java定义类、属性、方法
  4. python canvas画移动物体_Python GUI编程入门(25)-移动Canvas对象
  5. 中文python笔记_python 中文编码笔记
  6. Qt DLL总结【一】-链接库预备知识
  7. Selenium自动化测试-7.获取元素属性信息
  8. 双层pdf软件free_这款软件神器,让你读文献的效率翻一倍!(文末有福利哦)...
  9. 阿里巴巴矢量图标网使用的小方法
  10. win10如何录制内部声音(非麦克风录音)
  11. 「题解」NOIP模拟测试题解乱写II(36)
  12. 短说 3.7.1正式版更新【新增悬赏问答、打赏、付费看帖、IP属地】
  13. JAVA语言编程练习--图形界面+文件输入输出流--实现简单的用户注册登录系统
  14. 频响测试低12dB问题
  15. 阿里点赞立法惩治刷单炒信:坚决拥护、全力支持
  16. 【论文笔记】Regional Differential Information Entropy for Super-Resolution ImageQuality Assessment
  17. python 递增递减数列
  18. 解决win10 windows mobile 设备中心无法打开问题,MC3200无法连上win10问题
  19. 解决错误:Unsatisfied dependency expressed through field ‘XXXService‘
  20. android 魅族动画效果,魅族 Flyme 9 知意动效:动画自然,提供 Flyme 1-8 经典系统主题...

热门文章

  1. 软考中级——计算机网络与信息安全基础概要
  2. 在没SQL Server数据库情况下怎么打开.MDF文件?
  3. 技术的发展与互联网的发展
  4. 高德地图搜索以后生成的marker的点击事件
  5. 深圳软件测试培训:SVN与Git的差异
  6. iOS APP上线流程规范
  7. 144-小珂的苦恼(nyoj)
  8. Day【10】相交链表
  9. 7.3 cas与流量风暴
  10. h5 换脸 php,HTML5/WebGL变脸(换脸)动画