Kafka 作为老牌消息中间件, 高吞吐是它的拿手好戏. Spring 的生态中也有官方提供的 spring-kafka.jar. 本文主要学习 Spring 官方提供的 spring-kafka.jar 是怎么实现的.

环境

下面将本文用到的一些环境版本贴出来, Spring Kafka 在 1.X 版本和 2.X 源码上区别还是挺大的. 本文拉取的是当时的最新版.

  • linux kafka server: kafka_2.13-2.7.0(注: 2.13 是 scala version, 2.7.0 才是 kafka version)
  • spring-kafka: 2.7.0
  • spring: 5.3.6

网上较多的对 Spring Kafka 1.X 版本的解读, 本文选用的是 2.X 请甄别后阅读

SpringKafka的依赖

下图不难看出, spring-kafka 的实现实际上是对 kafka-clients 这个包的又一次封装.

kafka-clients 是由 Apache 支持的开源项目

核心结构

先来看一下 spring-kafka 核心图.
我们在 Spring 中注册一个 Listener, 框架就会为我们自动生成一个对应的 ConcurrentMessageListenerContainer 容器来管理.
再根据你配置的并发度来创建多个 KafkaMessageListenerContainer 容器. 每个 KafkaMessageListenerContainer 相当于一个线程, 这个线程会不断向 server 端发起 poll 请求来实现监听

核心方法调用关系

先看一下核心方法调用关系图. 笔者简化了方法栈中一些核心方法的执行过程.

实际的执行过程应该是:

  • Spring 启动
  • Spring 生命周期为 finishRefresh() 时, 调用 KafkaListenerEndpointRegistry 中复写的钩子函数 start()
  • 根据 Listener 创建对应数量的 ConcurrentMessageListenerContainer
  • 根据并发配置 concurrency 创建对应数量的 KafkaMessageListenerContainer
  • 在每个 KafkaMessageListenerContainer 中创建一个 SimpleAsyncTaskExecutor. 值得注意的是 SimpleAsyncTaskExecutor 的作用是创建一条新的线程, 并在线程停止时执行钩子 stop()
  • 创建一个 ListenerConsumer 注册到 SimpleAsyncTaskExecutor 中
  • ListenerConsumer 的 run() 方法被调用
  • run 方法中开启自旋
  • 不断调用 kafka-client 提供的 poll API 拉取新的消息
  • 收到新的消息就执行, 执行完了就继续自旋
  • 收不新消息, 重启下一轮自旋

源码解析

Spring

观察 Spring 线程方法栈, 我们可以知道

  • Spring Kafka 其实是在 Spring 上下文的 finishRefresh() 这个生命周期中被唤醒的
  • 最后调用调用 bean.start() 方法正式启动

DefaultLifecycleProcessor

@Override
public void onRefresh() {// 入口startBeans(true);this.running = true;
}private void doStart(Map<String, ? extends Lifecycle> lifecycleBeans, String beanName, boolean autoStartupOnly) {Lifecycle bean = lifecycleBeans.remove(beanName);if (bean != null && bean != this) {String[] dependenciesForBean = getBeanFactory().getDependenciesForBean(beanName);for (String dependency : dependenciesForBean) {doStart(lifecycleBeans, dependency, autoStartupOnly);}if (!bean.isRunning() &&(!autoStartupOnly || !(bean instanceof SmartLifecycle) || ((SmartLifecycle) bean).isAutoStartup())) {if (logger.isTraceEnabled()) {logger.trace("Starting bean '" + beanName + "' of type [" + bean.getClass().getName() + "]");}try {// =================================// 在这里被启动真正启动// =================================bean.start();}catch (Throwable ex) {throw new ApplicationContextException("Failed to start bean '" + beanName + "'", ex);}if (logger.isDebugEnabled()) {logger.debug("Successfully started bean '" + beanName + "'");}}}
}

Spring Kafka 启动

下文开始都是 Spring Kafka 启动运行时的相关源码

KafkaListenerEndpointRegistry

这个入口类主要用来管理侦听器容器的生命周期.
对于本文针对学习的启动时的入口则是 startIfNecessary()

public class KafkaListenerEndpointRegistry implements ListenerContainerRegistry, DisposableBean, SmartLifecycle,ApplicationContextAware, ApplicationListener<ContextRefreshedEvent> {...ignore any codes.../*** 这个方法让容器校验是否在正确的时机(refreshed)或者是自动启动的情况* 然后再去启动 spring-kafka*/private void startIfNecessary(MessageListenerContainer listenerContainer) {if (this.contextRefreshed || listenerContainer.isAutoStartup()) {listenerContainer.start();}}...ignore any codes...
}

ConcurrentMessageListenerContainer

这是个容器, 用于创建至少一个 KafkaMessageListenerContainer 容器.

@Override
protected void doStart() {if (!isRunning()) {checkTopics();ContainerProperties containerProperties = getContainerProperties();TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();if (topicPartitions != null && this.concurrency > topicPartitions.length) {this.logger.warn(() -> "When specific partitions are provided, the concurrency must be less than or "+ "equal to the number of partitions; reduced from " + this.concurrency + " to "+ topicPartitions.length);this.concurrency = topicPartitions.length;}setRunning(true);// 根据并发参数 concurrency, 来确定最终会创建多少个 KafkaMessageListenerContainer// concurrency 缺省为 1for (int i = 0; i < this.concurrency; i++) {KafkaMessageListenerContainer<K, V> container =constructContainer(containerProperties, topicPartitions, i);configureChildContainer(i, container);if (isPaused()) {container.pause();}container.start();this.containers.add(container);}}
}

KafkaMessageListenerContainer

这个类主要作用如下:

  • 通过配置文件生成 consumerExecutor, 用户如果没有配置, 则使用 SimpleAsyncTaskExecutor(后文会提到类似于线程池) 来创建
  • 然后根据用户配置的 Listener 参数生成 ListenerConsumer(实现了Runnable接口)
  • 最后让 consumerExecutor 去执行 ListenerConsumer

对 SimpleAsyncTaskExecutor 感兴趣的同学可以看这里: https://blog.csdn.net/weixin_30430169/article/details/99490928

@Override
protected void doStart() {if (isRunning()) {return;}if (this.clientIdSuffix == null) { // stand-alone containercheckTopics();}ContainerProperties containerProperties = getContainerProperties();checkAckMode(containerProperties);Object messageListener = containerProperties.getMessageListener();// 试图从配置文件中获取 consumerExecutorAsyncListenableTaskExecutor consumerExecutor = containerProperties.getConsumerTaskExecutor();if (consumerExecutor == null) {// 如果用户没有自定义配置, 则提供 SimpleAsyncTaskExecutor 为缺省实现consumerExecutor = new SimpleAsyncTaskExecutor((getBeanName() == null ? "" : getBeanName()) + "-C-");containerProperties.setConsumerTaskExecutor(consumerExecutor);}GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;ListenerType listenerType = determineListenerType(listener);// 生成 listenerConsumer 对象this.listenerConsumer = new ListenerConsumer(listener, listenerType);setRunning(true);this.startLatch = new CountDownLatch(1);// 让 consumerExecutor 去执行 ListenerConsumerthis.listenerConsumerFuture = consumerExecutor.submitListenable(this.listenerConsumer);try {if (!this.startLatch.await(containerProperties.getConsumerStartTimeout().toMillis(), TimeUnit.MILLISECONDS)) {this.logger.error("Consumer thread failed to start - does the configured task executor "+ "have enough threads to support all containers and concurrency?");publishConsumerFailedToStart();}}catch (@SuppressWarnings(UNUSED) InterruptedException e) {Thread.currentThread().interrupt();}
}

SimpleAsyncTaskExecutor

简单介绍一下 SimpleAsyncTaskExecutor, 虽然这个不是 spring-kafka 提供的, 但是在这里引用到了. 为了方便理解, 就提一下.
这个类主要是创建一个新的线程, 并在这个线程中执行 其他线程 submit(task) 到 SimpleAsyncTaskExecutor 中的 task 方法. 这些 task 需要实现 Runnable (Callable) 接口, SimpleAsyncTaskExecutor 会去消费他们.
这个类另一个作用呢, 其实是在关闭线程的时候, 提供一个钩子回去, 方便上文 KafkaListenerEndpointRegistry 进行管理

// 跟线程池一样都实现了 Executor 接口
public void execute(Runnable task, long startTimeout) {Assert.notNull(task, "Runnable must not be null");Runnable taskToUse = this.taskDecorator != null ? this.taskDecorator.decorate(task) : task;if (this.isThrottleActive() && startTimeout > 0L) {this.concurrencyThrottle.beforeAccess();// 创建新线程this.doExecute(new SimpleAsyncTaskExecutor.ConcurrencyThrottlingRunnable(taskToUse));} else {// 创建新线程this.doExecute(taskToUse);}}protected void doExecute(Runnable task) {Thread thread = this.threadFactory != null ? this.threadFactory.newThread(task) : this.createThread(task);thread.start();
}

ListenerConsumer.run

这个方法中总共做了三件事

  • 初始化一些操作(对本文中电来说, 不太要紧)
  • 开启自旋
  • 在自旋中不断拉取 kafka 消息
  • 拉取不到消息也有心跳机制保证连接
  • 拉取到了则通过反射执行 Java 中对应业务处理的代码
public class KafkaMessageListenerContainer<K, V> // NOSONAR line countextends AbstractMessageListenerContainer<K, V> {...ignore any codes...@Override // NOSONAR complexitypublic void run() {ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());publishConsumerStartingEvent();this.consumerThread = Thread.currentThread();setupSeeks();KafkaUtils.setConsumerGroupId(this.consumerGroupId);this.count = 0;this.last = System.currentTimeMillis();initAssignedPartitions();publishConsumerStartedEvent();Throwable exitThrowable = null;// 开启自旋, 即失败重试机制while (isRunning()) {try {// 见名知意, 向 kafka-server 发起 poll 请求// 收到后, 执行对应的方法pollAndInvoke();}// ===============// 这里为了方便阅读省略了一大段类似下文的异常处理// ===============catch (Exception e) {handleConsumerException(e);}}wrapUp(exitThrowable);}...ignore any codes...
}

ListenerConsumer.pollAndInvoke

这是调用 kafka-clients 并执行业务代码的入口方法
调用本类的 doPoll() 方法, 拉取 kafka 消息
调用 invokeIfHaveRecords() 对消息进行处理

protected void pollAndInvoke() {if (!this.autoCommit && !this.isRecordAck) {processCommits();}fixTxOffsetsIfNeeded();idleBetweenPollIfNecessary();if (this.seeks.size() > 0) {processSeeks();}pauseConsumerIfNecessary();pausePartitionsIfNecessary();this.lastPoll = System.currentTimeMillis();if (!isRunning()) {return;}this.polling.set(true);// 这个方法真正封装了 kafka-client 的 APIConsumerRecords<K, V> records = doPoll();if (!this.polling.compareAndSet(true, false) && records != null) {/** There is a small race condition where wakeIfNecessary was called between* exiting the poll and before we reset the boolean.*/if (records.count() > 0) {this.logger.debug(() -> "Discarding polled records, container stopped: " + records.count());}return;}resumeConsumerIfNeccessary();resumePartitionsIfNecessary();debugRecords(records);// 不管有没有值, 都会交给这个方法进行处理// 有值则会反射调用 java handler method// 没有值的话, 则直接返回invokeIfHaveRecords(records);// 该方法结束后进入下一次自旋
}

ListenerConsumer.doPoll

通过调用 this.consumer.poll(this.pollTimeout) 方法, 我们可以知道, this.consumer 其实是 kafka-client 提供的消费者实例

@Nullable
private ConsumerRecords<K, V> doPoll() {ConsumerRecords<K, V> records;if (this.isBatchListener && this.subBatchPerPartition) {if (this.batchIterator == null) {// 底层都是调用 kafka-client 提供的 poll apithis.lastBatch = this.consumer.poll(this.pollTimeout);if (this.lastBatch.count() == 0) {return this.lastBatch;}else {this.batchIterator = this.lastBatch.partitions().iterator();}}TopicPartition next = this.batchIterator.next();List<ConsumerRecord<K, V>> subBatch = this.lastBatch.records(next);records = new ConsumerRecords<>(Collections.singletonMap(next, subBatch));if (!this.batchIterator.hasNext()) {this.batchIterator = null;}}else {// 底层都是调用 kafka-client 提供的 poll apirecords = this.consumer.poll(this.pollTimeout);checkRebalanceCommits();}return records;
}

ListenerConsumer.invokeIfHaveRecords

看了堆栈树之后可以知道, invokeIfHaveRecords () 方法其实底层是调用了 Java 反射实现的. 那么这个方法就不具体展开了(一层一层剥离源码调用的逻辑), 因为太多次调用到了别处实现, 这里展示一下在 spring-kafka 源码层面最底层的调用
其实在 KafkaMessageListenerContainer 生成时, 就会在全局内存中存储一个对应的 RecordMessagingMessageListenerAdapter 当新消息来了之后, RecordMessagingMessageListenerAdapter 会自动适合的方法去执行(配合 @KafkaHandler 注解使用).

RecordMessagingMessageListenerAdapter 见名知意, 适配器模式, 寻找合适的 Handler 方法来适配这次返回的参数结构

public Object invoke(Message<?> message, Object... providedArgs) throws Exception { //NOSONARif (this.invokerHandlerMethod != null) {// 这是执行的入口return this.invokerHandlerMethod.invoke(message, providedArgs); // NOSONAR}else if (this.delegatingHandler.hasDefaultHandler()) {// Needed to avoid returning raw Message which matches ObjectObject[] args = new Object[providedArgs.length + 1];args[0] = message.getPayload();System.arraycopy(providedArgs, 0, args, 1, providedArgs.length);return this.delegatingHandler.invoke(message, args);}else {return this.delegatingHandler.invoke(message, providedArgs);}
}

SpringKafka原理解析及源码学习-Spring生态(一)相关推荐

  1. 【Unity】 Spine渲染原理解析与源码解读

    Spine渲染原理解析与源码解读 安装环境 从Spine编辑器导出 将资源导入Unity 基础概念 其他相关概念 Spine架构 Spine运行时的各个模块 有状态(Stateful) 和 无状态(S ...

  2. springboot启动过程_spring5/springboot2源码学习 -- spring boot 应用的启动过程

    推荐阅读: Spring全家桶笔记:Spring+Spring Boot+Spring Cloud+Spring MVC 疫情期间"闭关修炼",吃透这本Java核心知识,跳槽面试不 ...

  3. Android悬浮窗原理解析(Window)[源码]

    悬浮窗,在大多数应用中还是很少见的,目前我们接触到的悬浮窗,差不多都是一些系统级的应用软件,例如:360安全卫士,腾讯手机管家等:在某些服务行业如金融,餐饮等,也会在应用中添加悬浮窗,例如:美团的偷红 ...

  4. 浏览器解析jsx_preact 源码学习系列之一:JSX解析与DOM渲染

    前言 一直以来我都想研究 React 的源码,但总是看不懂.即便是去翻看最早的源码,代码量也有1万多行,研究起来难度太高了,这个问题困扰了我很久. 直到前两天我跟同事讨论起这个问题,忽然发现一个可行的 ...

  5. cvMorphology形态学原理解析及源码分析

    ⑴ 图像形态学处理的概念...1 ⑵ 二值图像的逻辑运算...3 ⑶ 膨胀和腐蚀...4 (4) 高级形态学变换...8 (5) 细化...10 ⑴ 图像形态学处理的概念 数字图像处理中的形态学处理是 ...

  6. mysql 一致性读_MySQL半一致性读原理解析-从源码角度解析

    1.什么是半一致性读 A type of read operation used for UPDATE statements, that is a combination of read commit ...

  7. 第十四课 k8s源码学习和二次开发原理篇-调度器原理

    第十四课 k8s源码学习和二次开发原理篇-调度器原理 tags: k8s 源码学习 categories: 源码学习 二次开发 文章目录 第十四课 k8s源码学习和二次开发原理篇-调度器原理 第一节 ...

  8. 第八课 k8s源码学习和二次开发原理篇-KubeBuilder使用和Controller-runtime原理

    第八课 k8s源码学习和二次开发原理篇-KubeBuilder使用和Controller-runtime原理 tags: k8s 源码学习 categories: 源码学习 二次开发 文章目录 第八课 ...

  9. VUE源码学习第一篇--前言

    一.目的 前端技术的发展,现在以vue,react,angular为代表的MVVM模式以成为主流,这三个框架大有三分天下之势.react和angular有facebook与谷歌背书,而vue是以一己之 ...

最新文章

  1. 深度学习:梯度下降算法改进
  2. Linux常用软件和安装方法,Linux软件安装与卸载常用方法(转)
  3. Spring Boot WebFlux-02——WebFlux Web CRUD 实践
  4. 成长型公司的存储架构
  5. 数据结构的一些命名英文全称
  6. linux中设备文件和磁盘文件的区别,Linux下磁盘设备文件有哪些变化问题?
  7. 图的深度优先搜索遍历
  8. 在线短视频缩略图剪切工具
  9. python自动化办公入门书籍-用Python自动办公,做职场高手 | 「讲文兄博客」
  10. 解决报错: The name tf.app.run is deprecated. Please use tf.compat.v1.app.run instead.
  11. teemo使用手册(内附脚本一键提取,邮箱、IP、domain、whatweb筛选可打开网址)
  12. 华为云发布桌面IDE-CodeArts
  13. 最近面试SAR ADC总是被问到DAC 单位电容的取法
  14. git 国外镜像下载慢的解决方案
  15. FFmpeg视频剪辑拼接
  16. JavaScript语法糖写法--JS代码优化
  17. 【笔记】openwrt - 单线复用(VLAN):拨号上网、局域网、IPTV
  18. 小分子药物logp预测/计算
  19. 为什么要用花焊盘?全连接和花连接有什么区别?梅花焊盘和泪滴焊盘各有什么作用?
  20. 科普一下IP路由基础

热门文章

  1. 详解JSON.parse 和 JSON.stringify
  2. JavaScript强制类型转换(一)String字符串型
  3. Android联系人全部删除技巧
  4. FPGA - Zynq - 加载 - FSBL源码解析1
  5. 数据结构算法——滑动窗口问题(以LeetCode滑动窗口题为例)
  6. 30分钟带你入门Oracle
  7. 携程个性化推荐算法实践
  8. 国内大模型领域进入乱战时代
  9. 重启计算机网络连接巨慢,电脑重启后很久才能打开宽带连接怎么样解决
  10. 一文弄懂Jupyter的配置与使用(呕心沥血版)