2019独角兽企业重金招聘Python工程师标准>>>

前言

因工作需要,需在系统利用Kafka监听接口,实现消息队列中,对消息的消费,首选Kafka,因为看中其超高的吞吐量。

基本概念

  • 1 Producer: 特指消息的生产者
  • 2 Consumer :特指消息的消费者
  • 3 Consumer Group :消费者组,可以并行消费Topic中partition的消息
  • 4 Broker:缓存代理,Kafa 集群中的一台或多台服务器统称为 broker。
  • 5 Topic:特指 Kafka 处理的消息源(feeds of messages)的不同分类。
  • 6 Partition:Topic 物理上的分组,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。partition 中的每条消息都会被分配一个有序的 id(offset)
  • 7 Message:消息,是通信的基本单位,每个 producer 可以向一个 topic(主题)发布一些消息
  • 8 稀疏索引:采用稀疏索引的方式,利用二分查找,定位消息。

集成Spring

  • 添加Maven依赖 
    由于项目使用Maven进行管理,引入Kafka-Spring相关Jar包,需要添加依赖,此处使用的是Kafka0.10.2
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>1.2.2.RELEASE</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 1 版本兼容性 
    配置完Maven依赖以后,还需要确认,因为Kafka与Spring有依赖关系,需要确定Spring的版本是否能和Kafka0.10.2完美兼容,查阅Spring For Apache Kafka 文档可知: 
      Compatibility

    • Apache Kafka 0.10.2.0
    • Tested with Spring Framework version dependency is 4.3.7 but it is expected that the framework will work with earlier versions of Spring.
    • Annotation-based listeners require Spring Framework 4.1 or higher, however.
    • Minimum Java version: 7. 
      Kafka 0.10.2 需要SpringFrameWork 4.3.7,但后续会逐渐兼容SpringFrameWork更早期的版本,实践发现,Kafka的生产者里面的api会受SpringFrameWork版本影响,而消费者无影响,因此,可以保持项目中原有springframework不变。
  • 2 排除重复包 
    引入Maven依赖以后,Kafka的maven依赖,自动包含了springframework相关jar包,需要排除。

<dependency><groupId>org.springframework</groupId><artifactId>spring-beans</artifactId><version>4.3.9.RELEASE</version><scope>compile</scope></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>4.3.9.RELEASE</version><scope>compile</scope></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-core</artifactId><version>4.3.9.RELEASE</version><scope>compile</scope></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-oxm</artifactId><version>4.3.9.RELEASE</version><scope>compile</scope><optional>true</optional></dependency>
  • 3 接口区别 
    Kafka消费者,实现有两种方式:client客户端和listener监听接口,这里因业务需要,采用监听接口的方式实现,Spring提供了四种接口,如下所示:
public interface MessageListener<K, V> {} 1void onMessage(ConsumerRecord<K, V> data);}public interface AcknowledgingMessageListener<K, V> {} 2void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);}public interface BatchMessageListener<K, V> {} 3void onMessage(List<ConsumerRecord<K, V>> data);}public interface BatchAcknowledgingMessageListener<K, V> {} 4void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);}

对应的解释如下 
1、Use this for processing individual ConsumerRecord s received from the kafka consumer poll() operation when using auto-commit, or one of the container-managed commit methods. 
使用MessageListener接口实现时,当消费者拉取消息之后,消费完成会自动提交offset,即enable.auto.commit为true时,适合使用此接口 
2、Use this for processing individual ConsumerRecord s received from the kafka consumer poll() operation when using one of the manual commit methods. 
使用AcknowledgeMessageListener时,当消费者消费一条消息之后,不会自动提交offset,需要手动ack,即enable.auto.commit为false时,适合使用此接口 
3、Use this for processing all ConsumerRecord s received from the kafka consumer poll() operation when using auto-commit, or one of the container-managed commit methods. AckMode.RECORD is not supported when using this interface since the listener is given the complete batch.

4、Use this for processing all ConsumerRecord s received from the kafka consumer poll() operation when using one of the manual commit methods.

BatchMessageListener和BatchAcknowledgingMessageListener接口作用与上述两个接口大体类似,只是适合批量消费消息决定是否自动提交offset

由于业务较重,且offset自动提交时,出现消费异常或者消费失败的情况,消费者容易丢失消息,所以需要采用手动提交offset的方式,因此,这里实现了AcknowledgeMessageListener接口。

Spring配置文件

配置思路: 
1、确定需要定义的beans:

  • 1 consumerProperties 消费者的基本属性,包括指定bootstrap.servers,group.id等
  • 2 consumerFactory :消费者工厂,配置完consumerProperties 后,需要将consumerProperties 作为参数,配置进consumerFactory中
  • 3 containProperties: 消费者容器属性对象的bean,这个bean会指定后续自定义的监听接口bean及ackMode(手动提交时,采取什么提交方式)
  • 4 messageListenerContainer:消费者容器,启动监听接口的bean,需要将先前定义的consumerFactory 、containProperties配置进这个bean,并定义其init-method = doStart,在启动spring时,便会自动启动监听接口,同时,此bean指定了topic
  • 5 kafkaMessageListener:监听接口,这个接口由自己定义,需要将其配置进containProperties中, 
    具体完整消费者的配置文件如下所示:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"><!--1、consumer属性配置,hashMap--><bean id="consumerProperties" class="java.util.HashMap"><constructor-arg><map><entry key="bootstrap.servers" value="${kafka.bootstrap.servers}"/><entry key="group.id" value="${kafka.group.id}"/><entry key="enable.auto.commit" value="false"/><entry key="session.timeout.ms" value="15000"/><!--<entry key="auto.offset.reset" value="earliest"/>--><entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/><entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/><entry key="value.deserializer.encoding" value="UTF8"/><entry key="value.deserializer.encoding" value="UTF8"/></map></constructor-arg></bean><!--2、Kafka消费者工厂,DefaultKafkaConsumerFactory--><bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"><constructor-arg><ref bean="consumerProperties"/></constructor-arg></bean><!--3、监听接口,AcknowledgingMessageListener--><bean id="kafkaMessageListener" class="com.lianjia.bigdata.dataarch.auth.kafka.KafkaMessageListener"><property name="threadPool" ref="kafkaWorkerThreadPool"/></bean><bean id="kafkaWorkerThreadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"><property name="corePoolSize" value="20"/><property name="maxPoolSize" value="200"/><property name="queueCapacity" value="500"/><property name="keepAliveSeconds" value="1800"/><property name="rejectedExecutionHandler"><bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/></property></bean><!--4、Kafka消费者容器,属性配置--><bean id="containProperties" class="org.springframework.kafka.listener.config.ContainerProperties"><constructor-arg value="${kafka.topic}"/><property name="ackMode" value="MANUAL_IMMEDIATE"/><property name="messageListener" ref="kafkaMessageListener"/></bean><!--5、Kafka消费者容器--><bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart" ><constructor-arg ref="consumerFactory"/><constructor-arg ref="containProperties"/></bean>
</bean>

示例代码

写了个简单的测试用例 
生产者: 
实现每秒定时向brokers发送一条消息

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.HashMap;
import java.util.Map;public class SimpleKafkaProducer implements Runnable {protected static final Logger LOGGER = LoggerFactory.getLogger(SimpleKafkaProducer.class);@Overridepublic void run() {Map<String, Object> sendProps = senderProps();Producer producer = new KafkaProducer(sendProps);Integer currentNum = 0;try {LOGGER.info("start produce message");while (true){ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>("testTopic",currentNum, currentNum);producer.send(producerRecord);LOGGER.info("send message:" + currentNum + " And value is " + producerRecord.value());currentNum++;Thread.sleep(1000);}}catch (Exception e){LOGGER.error("send message fail", e);}finally {producer.close();}}public static void main(String[] args) {SimpleKafkaProducer simpleKafkaProducer = new SimpleKafkaProducer();new Thread(simpleKafkaProducer).start();}private Map<String, Object> senderProps() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.33.106.94:9092");props.put(ProducerConfig.ACKS_CONFIG, "all");props.put(ProducerConfig.RETRIES_CONFIG, 0);props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);props.put(ProducerConfig.LINGER_MS_CONFIG, 1);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}
}

消费者

public class KafkaMessageListener implements AcknowledgingMessageListener<Integer, String> {private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class);@Overridepublic void onMessage(final ConsumerRecord<Integer, String> message, final Acknowledgment acknowledgment) {//TODO 这里具体实现个人业务逻辑// 最后 调用acknowledgment的ack方法,提交offsetacknowledgment.acknowledge();}
}

消费者使用示例:这里参考spring官方文档,简单实现了一个消费者监听接口示例

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;public class SimpleKafkaConsumer extends SpringUnitTest {protected static final Logger LOGGER = LoggerFactory.getLogger(SimpleKafkaConsumer.class);@Resource(name = "kafkaMessageListener")private  KafkaMessageListener kafkaMessageListener;@Testpublic void TestLinstener(){ContainerProperties containerProps = new ContainerProperties("testTopic");containerProps.setMessageListener(kafkaMessageListener);KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);container.setBeanName("messageListenerContainer");container.start();}private static KafkaMessageListenerContainer<Integer, String> createContainer(ContainerProperties containerProps) {Map<String, Object> props = consumerProps();DefaultKafkaConsumerFactory<Integer, String> cf =new DefaultKafkaConsumerFactory<>(props);KafkaMessageListenerContainer<Integer, String> container =new KafkaMessageListenerContainer<>(cf, containerProps);return container;}private static Map<String, Object> consumerProps() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.33.106.94:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "test3");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
//        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}}

实现acknowledgeMessageListener接口之前,查阅了网上现有的文档,结果不尽如人意,只能试着自己去参考spring官方文档,慢慢摸索,最终实现手动提交offset的监听接口,当然,Kafka的知识点,远不止这些,后续还将继续学习。

转载于:https://my.oschina.net/xiaominmin/blog/1810338

Kafka集成Spring-AcknowledgeMessageListener接口实现相关推荐

  1. kafka与Spring的集成

    准备工作 kafka版本:kafka_2.10-0.10.1.0 spring版本:spring4.3 配置文件 pom文件配置(也可以直接下载jar包) Kafka和spring集成的支持类库,sp ...

  2. Kafka——使用spring进行集成

    生产者: <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://w ...

  3. 一. spring cloud gateway集成 spring cloud stream binder kafka,实现“动态路由“刷新与加载之采坑记录

    一.前言 Spring Cloud Stream是用于构建消息驱动的微服务应用程序的框架. 本文主要介绍如何集成 Spring Cloud Stream,以 Kafka发布订阅模式(topic),实现 ...

  4. springboot集成swagger2测试接口

    springboot集成swagger2测试接口 1.需要的依赖 2.开始编写一个swagger2 3.演示效果图片 1.需要的依赖 <dependency><groupId> ...

  5. Spring Aware接口

    Spring中有很多继承于aware中的接口,这些接口到底是做什么用到的. public interface Aware {} Aware是一个具有标识作用的超级接口,实现该接口的bean是具有被sp ...

  6. 集成spring mvc_向Spring MVC Web应用程序添加社交登录:集成测试

    集成spring mvc 我已经写了关于为使用Spring Social 1.1.0的应用程序编写单元测试的挑战,并为此提供了一种解决方案 . 尽管单元测试很有价值,但它并不能真正告诉我们我们的应用程 ...

  7. 项目集成Spring Security

    前言 之前写的 涂涂影院管理系统 这个 demo 是基于 shiro 来鉴权的,项目前后端分离后,显然集成 Spring Security 更加方便一些,毕竟,都用 Spring 了,权限管理当然 S ...

  8. hibernate mysql 读写分离_SpringBoot集成Spring Data JPA及读写分离

    JPA是什么 JPA(Java Persistence API)是Sun官方提出的Java持久化规范,它为Java开发人员提供了一种对象/关联映射工具 来管理Java应用中的关系数据.它包括以下几方面 ...

  9. Memcached集成Spring缓存环境构建

    2019独角兽企业重金招聘Python工程师标准>>> Memcached简要说明: Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载.它 ...

最新文章

  1. Python趣味打怪:60秒学会一个例子,147段简单代码助你从入门到大师 | 中文资源...
  2. 139. Word Break
  3. 循环神经网络 递归神经网络_了解递归神经网络中的注意力
  4. 网络操作系统P12页答案
  5. 引用头文件报错 .pch引用不了其他的.h文件
  6. 跳转点算法_跳转搜索算法介绍
  7. 关于MVC与三层架构
  8. 直播、线上办公、IoT需求井喷,Wi-Fi 6如何防止网络“塞车”?
  9. 026 模块3-random库的使用
  10. CarMaker快速入门
  11. android软件游戏显示fps测试工具,别被跑分骗了!能看安卓游戏帧数的小工具
  12. SOUI::SStatic 动态设置属性的值
  13. 自顶向下(top down)简介
  14. Dr.com校园网客户端故障解决方法
  15. 测试之smart原则
  16. 阿里云盘来了,百度网盘VS阿里云盘,你更看好谁!
  17. HTML5对网络营销的影响,什么是互联网营销思维,简述互联网思维对网络营销的影响...
  18. h5 vr效果_浅谈html5在vr中的应用
  19. VMWare16Pro 调整中文
  20. 设置echarts 的网格样式颜色

热门文章

  1. boost::gregorian模块实现打印一个月中的所有日期的测试程序
  2. DCMTK:演示状态查看器-网络发送组件(存储SCU)
  3. VTK:图片之ImageRange3D
  4. OpenCV自动跟踪移动目标DaSiamRPN的实例(附完整代码)
  5. OpenCV创建小部件Creating Widgets
  6. OpenGL coordinate systems坐标系统的实例
  7. OpenGL次表面散射
  8. OpenGL渲染水water
  9. OpenGL 点光源阴影Point Shadows
  10. C++ 暴力搜索String pattern search字符串模式的实现算法(附完整源码)