Kafka集成Spring-AcknowledgeMessageListener接口实现
2019独角兽企业重金招聘Python工程师标准>>>
前言
因工作需要,需在系统利用Kafka监听接口,实现消息队列中,对消息的消费,首选Kafka,因为看中其超高的吞吐量。
基本概念
- 1 Producer: 特指消息的生产者
- 2 Consumer :特指消息的消费者
- 3 Consumer Group :消费者组,可以并行消费Topic中partition的消息
- 4 Broker:缓存代理,Kafa 集群中的一台或多台服务器统称为 broker。
- 5 Topic:特指 Kafka 处理的消息源(feeds of messages)的不同分类。
- 6 Partition:Topic 物理上的分组,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。partition 中的每条消息都会被分配一个有序的 id(offset)
- 7 Message:消息,是通信的基本单位,每个 producer 可以向一个 topic(主题)发布一些消息
- 8 稀疏索引:采用稀疏索引的方式,利用二分查找,定位消息。
集成Spring
- 添加Maven依赖
由于项目使用Maven进行管理,引入Kafka-Spring相关Jar包,需要添加依赖,此处使用的是Kafka0.10.2
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>1.2.2.RELEASE</version>
</dependency>
- 1
- 2
- 3
- 4
- 5
1 版本兼容性
配置完Maven依赖以后,还需要确认,因为Kafka与Spring有依赖关系,需要确定Spring的版本是否能和Kafka0.10.2完美兼容,查阅Spring For Apache Kafka 文档可知:
Compatibility- Apache Kafka 0.10.2.0
- Tested with Spring Framework version dependency is 4.3.7 but it is expected that the framework will work with earlier versions of Spring.
- Annotation-based listeners require Spring Framework 4.1 or higher, however.
- Minimum Java version: 7.
Kafka 0.10.2 需要SpringFrameWork 4.3.7,但后续会逐渐兼容SpringFrameWork更早期的版本,实践发现,Kafka的生产者里面的api会受SpringFrameWork版本影响,而消费者无影响,因此,可以保持项目中原有springframework不变。
2 排除重复包
引入Maven依赖以后,Kafka的maven依赖,自动包含了springframework相关jar包,需要排除。
<dependency><groupId>org.springframework</groupId><artifactId>spring-beans</artifactId><version>4.3.9.RELEASE</version><scope>compile</scope></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>4.3.9.RELEASE</version><scope>compile</scope></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-core</artifactId><version>4.3.9.RELEASE</version><scope>compile</scope></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-oxm</artifactId><version>4.3.9.RELEASE</version><scope>compile</scope><optional>true</optional></dependency>
- 3 接口区别
Kafka消费者,实现有两种方式:client客户端和listener监听接口,这里因业务需要,采用监听接口的方式实现,Spring提供了四种接口,如下所示:
public interface MessageListener<K, V> {} 1void onMessage(ConsumerRecord<K, V> data);}public interface AcknowledgingMessageListener<K, V> {} 2void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);}public interface BatchMessageListener<K, V> {} 3void onMessage(List<ConsumerRecord<K, V>> data);}public interface BatchAcknowledgingMessageListener<K, V> {} 4void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);}
对应的解释如下
1、Use this for processing individual ConsumerRecord s received from the kafka consumer poll() operation when using auto-commit, or one of the container-managed commit methods.
使用MessageListener接口实现时,当消费者拉取消息之后,消费完成会自动提交offset,即enable.auto.commit为true时,适合使用此接口
2、Use this for processing individual ConsumerRecord s received from the kafka consumer poll() operation when using one of the manual commit methods.
使用AcknowledgeMessageListener时,当消费者消费一条消息之后,不会自动提交offset,需要手动ack,即enable.auto.commit为false时,适合使用此接口
3、Use this for processing all ConsumerRecord s received from the kafka consumer poll() operation when using auto-commit, or one of the container-managed commit methods. AckMode.RECORD is not supported when using this interface since the listener is given the complete batch.
4、Use this for processing all ConsumerRecord s received from the kafka consumer poll() operation when using one of the manual commit methods.
BatchMessageListener和BatchAcknowledgingMessageListener接口作用与上述两个接口大体类似,只是适合批量消费消息决定是否自动提交offset
由于业务较重,且offset自动提交时,出现消费异常或者消费失败的情况,消费者容易丢失消息,所以需要采用手动提交offset的方式,因此,这里实现了AcknowledgeMessageListener接口。
Spring配置文件
配置思路:
1、确定需要定义的beans:
- 1 consumerProperties 消费者的基本属性,包括指定bootstrap.servers,group.id等
- 2 consumerFactory :消费者工厂,配置完consumerProperties 后,需要将consumerProperties 作为参数,配置进consumerFactory中
- 3 containProperties: 消费者容器属性对象的bean,这个bean会指定后续自定义的监听接口bean及ackMode(手动提交时,采取什么提交方式)
- 4 messageListenerContainer:消费者容器,启动监听接口的bean,需要将先前定义的consumerFactory 、containProperties配置进这个bean,并定义其init-method = doStart,在启动spring时,便会自动启动监听接口,同时,此bean指定了topic
- 5 kafkaMessageListener:监听接口,这个接口由自己定义,需要将其配置进containProperties中,
具体完整消费者的配置文件如下所示:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"><!--1、consumer属性配置,hashMap--><bean id="consumerProperties" class="java.util.HashMap"><constructor-arg><map><entry key="bootstrap.servers" value="${kafka.bootstrap.servers}"/><entry key="group.id" value="${kafka.group.id}"/><entry key="enable.auto.commit" value="false"/><entry key="session.timeout.ms" value="15000"/><!--<entry key="auto.offset.reset" value="earliest"/>--><entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/><entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/><entry key="value.deserializer.encoding" value="UTF8"/><entry key="value.deserializer.encoding" value="UTF8"/></map></constructor-arg></bean><!--2、Kafka消费者工厂,DefaultKafkaConsumerFactory--><bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"><constructor-arg><ref bean="consumerProperties"/></constructor-arg></bean><!--3、监听接口,AcknowledgingMessageListener--><bean id="kafkaMessageListener" class="com.lianjia.bigdata.dataarch.auth.kafka.KafkaMessageListener"><property name="threadPool" ref="kafkaWorkerThreadPool"/></bean><bean id="kafkaWorkerThreadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"><property name="corePoolSize" value="20"/><property name="maxPoolSize" value="200"/><property name="queueCapacity" value="500"/><property name="keepAliveSeconds" value="1800"/><property name="rejectedExecutionHandler"><bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/></property></bean><!--4、Kafka消费者容器,属性配置--><bean id="containProperties" class="org.springframework.kafka.listener.config.ContainerProperties"><constructor-arg value="${kafka.topic}"/><property name="ackMode" value="MANUAL_IMMEDIATE"/><property name="messageListener" ref="kafkaMessageListener"/></bean><!--5、Kafka消费者容器--><bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart" ><constructor-arg ref="consumerFactory"/><constructor-arg ref="containProperties"/></bean>
</bean>
示例代码
写了个简单的测试用例
生产者:
实现每秒定时向brokers发送一条消息
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.HashMap;
import java.util.Map;public class SimpleKafkaProducer implements Runnable {protected static final Logger LOGGER = LoggerFactory.getLogger(SimpleKafkaProducer.class);@Overridepublic void run() {Map<String, Object> sendProps = senderProps();Producer producer = new KafkaProducer(sendProps);Integer currentNum = 0;try {LOGGER.info("start produce message");while (true){ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>("testTopic",currentNum, currentNum);producer.send(producerRecord);LOGGER.info("send message:" + currentNum + " And value is " + producerRecord.value());currentNum++;Thread.sleep(1000);}}catch (Exception e){LOGGER.error("send message fail", e);}finally {producer.close();}}public static void main(String[] args) {SimpleKafkaProducer simpleKafkaProducer = new SimpleKafkaProducer();new Thread(simpleKafkaProducer).start();}private Map<String, Object> senderProps() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.33.106.94:9092");props.put(ProducerConfig.ACKS_CONFIG, "all");props.put(ProducerConfig.RETRIES_CONFIG, 0);props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);props.put(ProducerConfig.LINGER_MS_CONFIG, 1);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}
}
消费者
public class KafkaMessageListener implements AcknowledgingMessageListener<Integer, String> {private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class);@Overridepublic void onMessage(final ConsumerRecord<Integer, String> message, final Acknowledgment acknowledgment) {//TODO 这里具体实现个人业务逻辑// 最后 调用acknowledgment的ack方法,提交offsetacknowledgment.acknowledge();}
}
消费者使用示例:这里参考spring官方文档,简单实现了一个消费者监听接口示例
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;public class SimpleKafkaConsumer extends SpringUnitTest {protected static final Logger LOGGER = LoggerFactory.getLogger(SimpleKafkaConsumer.class);@Resource(name = "kafkaMessageListener")private KafkaMessageListener kafkaMessageListener;@Testpublic void TestLinstener(){ContainerProperties containerProps = new ContainerProperties("testTopic");containerProps.setMessageListener(kafkaMessageListener);KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);container.setBeanName("messageListenerContainer");container.start();}private static KafkaMessageListenerContainer<Integer, String> createContainer(ContainerProperties containerProps) {Map<String, Object> props = consumerProps();DefaultKafkaConsumerFactory<Integer, String> cf =new DefaultKafkaConsumerFactory<>(props);KafkaMessageListenerContainer<Integer, String> container =new KafkaMessageListenerContainer<>(cf, containerProps);return container;}private static Map<String, Object> consumerProps() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.33.106.94:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "test3");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}}
实现acknowledgeMessageListener接口之前,查阅了网上现有的文档,结果不尽如人意,只能试着自己去参考spring官方文档,慢慢摸索,最终实现手动提交offset的监听接口,当然,Kafka的知识点,远不止这些,后续还将继续学习。
转载于:https://my.oschina.net/xiaominmin/blog/1810338
Kafka集成Spring-AcknowledgeMessageListener接口实现相关推荐
- kafka与Spring的集成
准备工作 kafka版本:kafka_2.10-0.10.1.0 spring版本:spring4.3 配置文件 pom文件配置(也可以直接下载jar包) Kafka和spring集成的支持类库,sp ...
- Kafka——使用spring进行集成
生产者: <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://w ...
- 一. spring cloud gateway集成 spring cloud stream binder kafka,实现“动态路由“刷新与加载之采坑记录
一.前言 Spring Cloud Stream是用于构建消息驱动的微服务应用程序的框架. 本文主要介绍如何集成 Spring Cloud Stream,以 Kafka发布订阅模式(topic),实现 ...
- springboot集成swagger2测试接口
springboot集成swagger2测试接口 1.需要的依赖 2.开始编写一个swagger2 3.演示效果图片 1.需要的依赖 <dependency><groupId> ...
- Spring Aware接口
Spring中有很多继承于aware中的接口,这些接口到底是做什么用到的. public interface Aware {} Aware是一个具有标识作用的超级接口,实现该接口的bean是具有被sp ...
- 集成spring mvc_向Spring MVC Web应用程序添加社交登录:集成测试
集成spring mvc 我已经写了关于为使用Spring Social 1.1.0的应用程序编写单元测试的挑战,并为此提供了一种解决方案 . 尽管单元测试很有价值,但它并不能真正告诉我们我们的应用程 ...
- 项目集成Spring Security
前言 之前写的 涂涂影院管理系统 这个 demo 是基于 shiro 来鉴权的,项目前后端分离后,显然集成 Spring Security 更加方便一些,毕竟,都用 Spring 了,权限管理当然 S ...
- hibernate mysql 读写分离_SpringBoot集成Spring Data JPA及读写分离
JPA是什么 JPA(Java Persistence API)是Sun官方提出的Java持久化规范,它为Java开发人员提供了一种对象/关联映射工具 来管理Java应用中的关系数据.它包括以下几方面 ...
- Memcached集成Spring缓存环境构建
2019独角兽企业重金招聘Python工程师标准>>> Memcached简要说明: Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载.它 ...
最新文章
- Python趣味打怪:60秒学会一个例子,147段简单代码助你从入门到大师 | 中文资源...
- 139. Word Break
- 循环神经网络 递归神经网络_了解递归神经网络中的注意力
- 网络操作系统P12页答案
- 引用头文件报错 .pch引用不了其他的.h文件
- 跳转点算法_跳转搜索算法介绍
- 关于MVC与三层架构
- 直播、线上办公、IoT需求井喷,Wi-Fi 6如何防止网络“塞车”?
- 026 模块3-random库的使用
- CarMaker快速入门
- android软件游戏显示fps测试工具,别被跑分骗了!能看安卓游戏帧数的小工具
- SOUI::SStatic 动态设置属性的值
- 自顶向下(top down)简介
- Dr.com校园网客户端故障解决方法
- 测试之smart原则
- 阿里云盘来了,百度网盘VS阿里云盘,你更看好谁!
- HTML5对网络营销的影响,什么是互联网营销思维,简述互联网思维对网络营销的影响...
- h5 vr效果_浅谈html5在vr中的应用
- VMWare16Pro 调整中文
- 设置echarts 的网格样式颜色
热门文章
- boost::gregorian模块实现打印一个月中的所有日期的测试程序
- DCMTK:演示状态查看器-网络发送组件(存储SCU)
- VTK:图片之ImageRange3D
- OpenCV自动跟踪移动目标DaSiamRPN的实例(附完整代码)
- OpenCV创建小部件Creating Widgets
- OpenGL coordinate systems坐标系统的实例
- OpenGL次表面散射
- OpenGL渲染水water
- OpenGL 点光源阴影Point Shadows
- C++ 暴力搜索String pattern search字符串模式的实现算法(附完整源码)