【RAEDME】

本文中, java客户端作为生产者, centos中consumer线程作为消费者;

【1】拦截器简述

1)拦截器是什么? 很明显,为了实现面向切面编码,即在 具体逻辑的上下文 添加一些逻辑;如

逻辑1
具体逻辑
逻辑2

2)什么时候调用拦截器?这就要从 kafka生产者发送数据说起了;

kafka生产者使用了2个线程来发送数据:
step1)生产者中的main线程把数据经过 拦截器-》序列化器-》分区器 处理;然后再把数据写到 RecordAccumulator;
step2)send 线程从 RecordAccumulator 中取出数据写入到broker list;

【2】拦截器实现

/* 添加拦截器 */
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList(TimeInterceptor.class.getName(), CounterInterceptor.class.getName()));

1)需求

第1个拦截器, 在消息发送前将时间戳加到消息value的 最前面;
第2个拦截器,在消息发送后更新成功发送消息数或失败发送消息数; 

2)代码实现

-- 添加拦截器

/* 添加拦截器 */
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList(TimeInterceptor.class.getName(), CounterInterceptor.class.getName()));

-- 带有拦截器的生产者

/*** 带有拦截器的生产者*/
public class InterceptorProducer {public static void main(String[] args) {/* 1.创建kafka生产者的配置信息 */Properties props = new Properties();/*2.指定连接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");  /*3.ack应答级别*/props.put(ProducerConfig.ACKS_CONFIG, "all");/*4.重试次数*/ props.put(ProducerConfig.RETRIES_CONFIG, 3); /*5.批次大小,一次发送多少数据,当数据大于16k,生产者会发送数据到 kafka集群 */props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * KfkNumConst._1K);  /*6.等待时间, 等待时间超过1毫秒,即便数据没有大于16k, 也会写数据到kafka集群 */props.put(ProducerConfig.LINGER_MS_CONFIG, 1); /*7. RecordAccumulator 缓冲区大小*/ props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M);  /*8. key, value 的序列化类 */ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());/* 添加拦截器 */props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList(TimeInterceptor.class.getName(), CounterInterceptor.class.getName())); System.out.println(props); /* 9.创建生产者对象 */KafkaProducer<String, String> producer = new KafkaProducer<>(props);  /* 10.发送数据 */ for (int i = 0; i < 10; i++) { producer.send(new ProducerRecord<>("first", "key"+i, "value-first-20210101--J" + i));}/* 11.关闭资源 */  producer.close(); // 间接调用了拦截器的close 方法  System.out.println("kafka生产者写入数据完成"); }
}
/*** 时间拦截器-在消息前添加时间戳 */
public class TimeInterceptor implements ProducerInterceptor<String, String>{@Overridepublic void configure(Map<String, ?> configs) {}/*生产者确保在 消息被序列化以及计算分区前调用该方法。*/@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {// 获取消息值 String value = record.value();return new ProducerRecord<>(record.topic(), record.partition(), record.key(), System.currentTimeMillis() + "," + record.value()); }/*该方法会在消息从 RecordAccumulator 成功发送到Kafka Broker之后,或者在发送过程中失败时调用; 且通常在 生产者回调逻辑触发之前调用*/@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Overridepublic void close() {}
}
/*** 计数拦截器*/
public class CounterInterceptor implements ProducerInterceptor<String, String>{int sucCounter = 0;int errCounter = 0;@Overridepublic void configure(Map<String, ?> configs) {}/*生产者确保在 消息被序列化以及计算分区前调用该方法。*/@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {return record; }/*该方法会在消息从 RecordAccumulator 成功发送到Kafka Broker之后,或者在发送过程中失败时调用; 且通常在 生产者回调逻辑触发之前调用*/@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {if(metadata !=null ) {sucCounter++;} else {errCounter++; }}@Overridepublic void close() {System.out.println("sucCounter =" + sucCounter + ", errCounter=" + errCounter);  }
}

-- java生产者打印日志

sucCounter =10, errCounter=0
kafka生产者写入数据完成

3)centos消费者消费结果

[root@centos201 ~]# kafka-console-consumer.sh --topic first --zookeeper centos201:2181
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
1609599630884,value-first-20210102--A0
1609599631203,value-first-20210102--A1
1609599631204,value-first-20210102--A6
1609599631203,value-first-20210102--A2
1609599631203,value-first-20210102--A3
1609599631204,value-first-20210102--A4
1609599631204,value-first-20210102--A5
1609599631204,value-first-20210102--A7
1609599631204,value-first-20210102--A8
1609599631204,value-first-20210102--A9

java生产者实现kafka拦截器相关推荐

  1. java web 过滤器跟拦截器的区别和使用

    2019独角兽企业重金招聘Python工程师标准>>> 1.首先要明确什么是拦截器.什么是过滤器 1.1 什么是拦截器: 拦截器,在AOP(Aspect-Oriented Progr ...

  2. 拦截器 java_在Java后端如何添加拦截器

    (1)InterceptorConfig.java文件内容如下: import org.springframework.web.servlet.config.annotation.WebMvcConf ...

  3. 谈谈Java编程中的拦截器与过滤器的区别

    拦截器是基于 Java 反射机制的,而过滤器是基于函数回调的. 过滤器依赖于 Servlet 容器,而拦截器不依赖于 Servlet 容器. 拦截器只能对 Action 请求起作用,而过滤器则可以对几 ...

  4. Java笔记-CXF增加拦截器与自定义拦截器

    如下搭建的Webservice: 在服务端填写如下代码: 这里就是添加进和出的拦截器 public class Main {public static void main(String[] args) ...

  5. java Struts2 过滤器和拦截器的区别

    来源http://zhidao.baidu.com/link?url=xr9D15NBd0ZkjDV7M7l6MuLsOg6ksehFXMO7ueZPzHRcv6HRd8f7DM0lg0Tk919-V ...

  6. java对过滤器或者拦截器中Request.parameter中参数进行添加或修改

    在讲解这个问题之前,我先讲讲我的需求. 我的需求就是处理前台传来的请求,在过滤器里面给表达式的值赋值为系统的值.然后传到具体方法中. 过滤器是这么写的: if(StringUtils.isNotEmp ...

  7. 四十七、Kafka中的拦截器(Interceptor)

    前两篇文章我们分别介绍了Kafka生产者和消费者的API,本文我们介绍一下Kafka中拦截器的知识.关注专栏<破茧成蝶--大数据篇>,查看更多相关的内容~ 目录 一.拦截器介绍 二.拦截器 ...

  8. java面试 拦截器问题_面试必问:给我说一下Spring MVC拦截器的原理?

    拦截器是每个Web框架必备的功能,也是个老生常谈的主题了.本文将分析SpringMVC的拦截器功能是如何设计的,让读者了解该功能设计的原理. 重要接口及类介绍1. HandlerExecutionCh ...

  9. java 微信请求_Java web微信请求拦截器(微信公众号开发)

    Java web微信请求拦截器(微信公众号开发),获取微信用户信息. package com.mvc.interceptor; import com.alibaba.fastjson.JSONObje ...

最新文章

  1. 【常规的01背包 POJ3624 UVA562 HDU2546 HDU3466 poj1745】
  2. Kubernetes学习笔记(一)
  3. istringstream字符串流,实现类似字符串截取的功能,字符串流中的put,str()将流转换成为字符串string
  4. 最新抗灾诗作:生死不离
  5. 事件处理介绍(简要学习笔记十七)
  6. Ubuntu安装UFW防火墙
  7. ask调制matlab实验,ASK调制的matlab代码
  8. C# 基础(二十五)WPF/WinForm 控件的句柄是什么意思?
  9. JAVA 类和对象的实例
  10. Comsol多孔介质内的粒子流动案例,可以追踪粒子运动轨迹
  11. java 一年有多少周_用java怎么计算当前年有多少周
  12. 2D图像像素点操作——平移,旋转,缩放 tcy
  13. 常用的60招电脑操作
  14. 数据结构-循环双链表
  15. python 判断是否是元音字母
  16. 怎么做网站?网站用什么服务器好?
  17. 想要成为一个游戏美术设计师,需要学习什么?游戏建模教程
  18. THead USB适配方案
  19. ROS py文件编译错误“ catkin_install_python() called with non-existing file”
  20. 【NLP】讯飞英文学术论文分类挑战赛Top10开源多方案–4 机器学习LGB 方案

热门文章

  1. P2764 最小路径覆盖问题(网络流)
  2. Codeforces Round #507 (Div. 1) D. You Are Given a Tree 根号分治 + dp
  3. 【NOI2013】快餐店【基环树】【树的直径】【set】
  4. HTTP状态码的类别
  5. P1064 [NOIP2006 提高组] 金明的预算方案
  6. CF39C-Moon Craters【dp】
  7. P5163-WD与地图【tarjan,整体二分,线段树合并】
  8. P5516-[MtOI2019]小铃的烦恼【期望dp,线性消元】
  9. P3168-[CQOI2015]任务查询系统【主席树】
  10. 动态规划训练11 [String painter HDU - 2476]