多线程开发消费者实例

  • Kafka Java Consumer设计原理
  • 多线程方案
    • 1. 消费者程序启动多个线程
    • 2. 消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。
  • 方案一的优势
  • 方案一的不足
  • 方案二的优势
  • 方案二的不足
  • 方案一主体实现
  • 方案二主体实现

Kafka Java Consumer设计原理

Kafka0.10.1.0版本开锁后,KafkaConsumer就变成了双线程设计,即用户主线程心跳线程。但是在消费这个层面上看都还是单线程的设计。

而在老版本kafka中Scala Consumer 的API是多线程的,并且是阻塞机制,为了更好的打造上下游生态,Kafka将更好实现的单线程API推出,并且是否使用多线程来交给用户的选择。

多线程方案

1. 消费者程序启动多个线程

每个线程维护专属的 KafkaConsumer 实例负责完整的消息获取、消息处理流程。如下图所示:

2. 消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。

获取消息的线程可以是一个,也可以是多个,每个线程维护专属的 KafkaConsumer 实例,处理消息则交由特定的线程池来做,从而实现消息获取与消息处理的真正解耦。具体架构如下图所示:

方案一的优势

  1. 实现起来简单,因为它比较符合目前我们使用 Consumer API 的习惯。我们在写代码的
    时候,使用多个线程并在每个线程中创建专属的 KafkaConsumer 实例就可以了。
  2. 多个线程之间彼此没有任何交互,省去了很多保障线程安全方面的开销。
  3. 由于每个线程使用专属的 KafkaConsumer 实例来执行消息获取和消息处理逻辑,因
    此,Kafka 主题中的每个分区都能保证只被一个线程处理,这样就很容易实现分区内的
    消息消费顺序。这对在乎事件先后顺序的应用场景来说,是非常重要的优势。

方案一的不足

  1. 每个线程都维护自己的 KafkaConsumer 实例,必然会占用更多的系统资源,比如内
    存、TCP 连接等。在资源紧张的系统环境中,方案 1 的这个劣势会表现得更加明显。
  2. 这个方案能使用的线程数受限于 Consumer 订阅主题的总分区数。我们知道,在一个消
    费者组中,每个订阅分区都只能被组内的一个消费者实例所消费。假设一个消费者组订
    阅了 100 个分区,那么方案 1 最多只能扩展到 100 个线程,多余的线程无法分配到任
    何分区,只会白白消耗系统资源。当然了,这种扩展性方面的局限可以被多机架构所缓
    解。除了在一台机器上启用 100 个线程消费数据,我们也可以选择在 100 台机器上分别
    创建 1 个线程,效果是一样的。因此,如果你的机器资源很丰富,这个劣势就不足为虑
    了。
  3. 每个线程完整地执行消息获取和消息处理逻辑。一旦消息处理逻辑很重,造成消息处理
    速度慢,就很容易出现不必要的 Rebalance,从而引发整个消费者组的消费停滞。这个
    劣势你一定要注意。我们之前讨论过如何避免 Rebalance。

方案二的优势

方案 2 将任务切分成了消息获取和消息处理两个部分,分别由不
同的线程处理它们。比起方案 1,方案 2 的最大优势就在于它的高伸缩性,就是说我们可
以独立地调节消息获取的线程数,以及消息处理的线程数,而不必考虑两者之间是否相互影
响。如果你的消费获取速度慢,那么增加消费获取的线程数即可;如果是消息的处理速度
慢,那么增加 Worker 线程池线程数即可。

方案二的不足

  1. 它的实现难度要比方案 1 大得多,毕竟它有两组线程,你需要分别管理它们。
  2. 因为该方案将消息获取和消息处理分开了,也就是说获取某条消息的线程不是处理该消
    息的线程,因此无法保证分区内的消费顺序。举个例子,比如在某个分区中,消息 1 在
    消息 2 之前被保存,那么 Consumer 获取消息的顺序必然是消息 1 在前,消息 2 在
    后,但是,后面的 Worker 线程却有可能先处理消息 2,再处理消息 1,这就破坏了消
    息在分区中的顺序。还是那句话,如果你在意 Kafka 中消息的先后顺序,方案 2 的这个
    劣势是致命的。
  3. 方案 2 引入了多组线程,使得整个消息消费链路被拉长,最终导致正确位移提交会变得
    异常困难,结果就是可能会出现消息的重复消费。如果你在意这一点,那么我不推荐你
    使用方案 2。

方案一主体实现

public class KafkaConsumerRunner implements Runnable {private final AtomicBoolean closed = new AtomicBoolean(false);private final KafkaConsumer consumer;public void run() {try {consumer.subscribe(Arrays.asList("topic"));while (!closed.get()) {ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));// 执行消息处理逻辑}} catch (WakeupException e) {// Ignore exception if closingif (!closed.get()) throw e;} finally {consumer.close();}}// Shutdown hook which can be called from a separate threadpublic void shutdown() {closed.set(true);consumer.wakeup();}...
}

这段代码创建了一个 Runnable 类,表示执行消费获取和消费处理的逻辑。每个KafkaConsumerRunner 类都会创建一个专属的 KafkaConsumer 实例。在实际应用中,你可以创建多个 KafkaConsumerRunner 实例,并依次执行启动它们,以实现方案 1 的多线程架构。

方案二主体实现

private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
...
private int workerNum = ...;
executors = new ThreadPoolExecutor(
workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy());
...
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for (final ConsumerRecord record : records) {executors.submit(new Worker(record));}
}

当 Consumer 的 poll 方法返回消息后,由专门的线程池来负责处理具体的消息。调用 poll 方法的主线程不负责消息处理逻辑,这样就实现了方案 2 的多线程架构。

如有错误欢迎指正

多线程开发Kafka消费者的方案和优劣相关推荐

  1. kafka消费者(Consumer)端多线程消费的实现方案

    kafka消费者(Consumer)端多线程消费的实现方案 kafka Java consumer设计原理 设计原理 为什么用单线程设计 多线程方案: 方案一: 方案二: 两个方案的优缺点: kafk ...

  2. 第十章 进程间的通信 之 Java/Android多线程开发(二)

    文章目录 (一)Java 多线程开发 1.1)线程状态 1.2)线程控制方法 (1.2.1)Synchronized (1.2.2)Volatile (1.2.3)ReentrantLock 1.3) ...

  3. 我是如何将一个老系统的kafka消费者服务的性能提升近百倍的

    大家好,又见面了~ kafka作为一种高吞吐量的分布式发布订阅消息系统,在业务系统中被广泛的使用. 如果问你,如何提高kafka队列中的消息消费速度呢? 答案很简单,topic多分几个分片,然后使用消 ...

  4. Linux下实现多线程的生产者消费者问题

    Linux下实现多线程的生产者消费者问题 一.原理的理解 生产者-消费者问题是一个经典的线程同步问题,该问题最早由Dijkstra提出,用以演示他提出的信号量机制.在同一个线程地址空间内执行的两个线程 ...

  5. Python 多进程开发与多线程开发

    我们先来了解什么是进程? 程序并不能单独运行,只有将程序装载到内存中,系统为它分配资源才能运行,而这种执行的程序就称之为进程.程序和进程的区别就在于:程序是指令的集合,它是进程运行的静态描述文本:进程 ...

  6. iOS 多线程的四种技术方案

    iOS 多线程的四种技术方案 image pthread 实现多线程操作 代码实现: void * run(void *param) {for (NSInteger i = 0; i < 100 ...

  7. Kafka消费者详解

    一.基本概念 1.消费者和消费组 Kafka消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者会收到不同分区的消息.假设有一个T1主题,该主题有4个分区:同时我们有一个消费组G ...

  8. Kafka消费者APi

    Kafka客户端从集群中消费消息,并透明地处理kafka集群中出现故障服务器,透明地调节适应集群中变化的数据分区.也和服务器交互,平衡均衡消费者. public class KafkaConsumer ...

  9. kafka消费者开发方式小结

    [README] 1, 本文总结了 kafka消费者开发方式: 2, 本文使用的是最新的kafka版本 3.0.0: [1] kafka消费则 [1.1]消费者与消费者组 1)消费者: 应用程序需要创 ...

最新文章

  1. 安装fiddler做代理,本地开发手机端看效果
  2. 文件菜单服务器属性,服务器属性(“数据库设置”页) - SQL Server | Microsoft Docs...
  3. 计算机错误2 找不到指定文件,无法执行目录中的文件 错误2系统找不到指定文件怎么办?...
  4. Elasticsearch7.15.2 基础概念和基础语法
  5. 百度云盘云知梦php_教你搭建私有云盘,简单快速,完全傻瓜式!不限速,永久有效!...
  6. EXTJS4自学手册——EXT基本方法、属性(mixins多继承、statics、require)
  7. 如何处理Ibatis结合MySQL数据库使用时的事务操作
  8. 编译原理第三版课后习题
  9. LM358电压跟随器
  10. 01-简单的基于SVM的SAR海冰分离-Arcgis制作数据集标签
  11. 网络电视机顶盒的工作原理
  12. Excel 2010 SQL应用048 比较两列数据是否相同
  13. c语言数组转换树存储结构,c语言, 一棵具有n个结点的完全二叉树以数组存储,试写一个非递归 算法实现对 该树的前序遍历。...
  14. AR技术从无到有浅谈AR市的未来场发展
  15. Android查看源代码网站
  16. vue与微信小程序的写法对比
  17. Ubuntu-pyqt5+qtDesigner安装手顺
  18. 写字机结构---coreXY结构
  19. Java POI 导出Excel,设置单元格无法编辑, 开启工作表保护后,依然可以筛选, 冻结行列不移动
  20. java win10窗口启动假死_win10 任务栏假死

热门文章

  1. vlc播放器或者web实现rtmp拉流
  2. 第一章计算机系统基础知识,计算机系统基础知识 第一章(未完待续)
  3. python Subprocess的使用
  4. 阿里云ECS CentOS6.5搭建iRedMail邮件服务器
  5. 电脑无线共享手机wify上网
  6. 收音机磁棒天线4根接法_有关收音机磁性天线和电路的小知识
  7. KENALLRYLLDKDD|359821-54-8
  8. windows7的快速启动栏
  9. 简单的JS实现口风琴设计
  10. L1-001 ~ L1-005