# 多线程 Consumer Instance

## Kafka Java Consumer 设计原理

* Kafka Java Consumer 是单线程设计

* 从 Kafka V0.10.1.0,KafkaConsumer 是双线程:用户主线程 & 心跳线程

* 用户主线程

* 启动 Consumer 应用 main 方法的线程

* 心跳线程

* 只负责定期给对应的 Broker 发送心跳,标示 Consumer 的存活性(liveness)

* 新版本设计:单线程 + 轮询机制:

* 实现非阻塞式的消息获取

## 多线程方案

* KafkaConsumer 类不是 thread-safe

* 所有的网络 IO 处理都是发生在用户主线程中

* 不能在多个线程中共享同一个 KafkaConsumer 实例

* 可以使用 `KafkaConsumer.wakeup()` 在其他线程中唤醒 Consumer

基于非 thread-safe,两套多线程方案

* 消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer Instance,负责完整的消息获取、消息处理流程

* 消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑

* 处理消息交由特定的线程池来做

* 将消息获取与处理解耦

![](https://img.kancloud.cn/40/70/4070c15055bf275c44cb7b470fb1f850_696x326.jpeg)

## Code

### 方案 1

```

public class KafkaConsumerRunner implements Runnable {

private final AtomicBoolean closed = new AtomicBoolean(false);

private final KafkaConsumer consumer;

public void run() {

try {

consumer.subscribe(Arrays.asList("topic"));

while (!closed.get()) {

ConsumerRecords records =

consumer.poll(Duration.ofMillis(10000));

// 执行消息处理逻辑

}

} catch (WakeupException e) {

// Ignore exception if closing

if (!closed.get()) throw e;

} finally {

consumer.close();

}

}

// Shutdown hook which can be called from a separate thread

public void shutdown() {

closed.set(true);

consumer.wakeup();

}

```

### 方案 2

```

private final KafkaConsumer consumer;

private ExecutorService executors;

...

private int workerNum = ...;

executors = new ThreadPoolExecutor(

workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,

new ArrayBlockingQueue<>(1000),

new ThreadPoolExecutor.CallerRunsPolicy());

...

while (true) {

ConsumerRecords records =

consumer.poll(Duration.ofSeconds(1));

for (final ConsumerRecord record : records) {

executors.submit(new Worker(record));

}

}

..

```

kafka java 多线程_20. 多线程开发者实例相关推荐

  1. kafka Java客户端之 consumer API 多线程消费消息

    kafka consumer 线程设计 Kafka Java Consumer采用的是单线程的设计.其入口类KafkaConsumer是一个双线程的设计,即用户主线程和心跳线程. 用户主线程,指的是启 ...

  2. java中的多线程有什么意义_Java多线程与并发面试题(小结)

    1,什么是线程? 线程是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位.程序员可以通过它进行多处理器编程,你可以使用多线程对运算密集型任务提速.比如,如果一个线程完成一 ...

  3. JAVA高并发多线程必须懂的50个问题

    http://www.importnew.com/12773.html ImportNew 首页所有文章资讯Web架构基础技术书籍教程Java小组工具资源 Java线程面试题 Top 50 2014/ ...

  4. 100道Java并发和多线程面试题

    1.多线程有什么用? 一个可能在很多人看来很扯淡的一个问题:我会用多线程就好了,还管它有什么用?在我看来,这个回答更扯淡.所谓"知其然知其所以然","会用"只是 ...

  5. java基础学习-多线程笔记

    说说Java中实现多线程有几种方法 创建线程的常用三种方式: 1. 继承Thread类 2. 实现Runnable接口 3. 实现Callable接口( JDK1.5>= ) 4. 线程池方式创 ...

  6. Java中控制多线程顺序执行

    Java中控制多线程顺序执行 一.概述 二.普通示例 三.控制示例 3.1.设置线程优先级 3.2.使用线程类的join() 3.2.1.在主线程join() 3.2.2.在子线程join() 3.3 ...

  7. Java基础、多线程、JVM、集合八股文自述(持续更新)

    Java基础.多线程.JVM.集合八股文自述 一.Java基础 1.1 object类有哪些方法? getClass().hashCode().equals().clone().toString(). ...

  8. Java回顾之多线程

    在这篇文章里,我们关注多线程.多线程是一个复杂的话题,包含了很多内容,这篇文章主要关注线程的基本属性.如何创建线程.线程的状态切换以及线程通信,我们把线程同步的话题留到下一篇文章中. 线程是操作系统运 ...

  9. python3多线程异步爬虫_python3爬虫中多线程进行解锁操作实例

    生活中我们为了保障房间里物品的安全,所以给门进行上锁,在我们需要进入房间的时候又会重新打开.同样的之间我们讲过多线程中的lock,作用是为了不让多个线程运行是出错所以进行锁住的指令.但是鉴于我们实际运 ...

最新文章

  1. 一种新方法或让AI模型拥有“联想”力,或能识别从未见过的事物
  2. 【采用】风控模型评估方法以及大数据风控模型概念
  3. 一名 40 岁“老”程序员的反思~
  4. TOPAS 命令详解
  5. android调用python框架_在Java中从Android应用程序执行Python脚本?
  6. 在线直接绘制倾斜45度的“火山图”
  7. Python机器学习:评价分类结果005precisoion-Recall的平衡
  8. DDR3初始化时间测试
  9. linux shell中获取mongodb最大连接数、内存使用情况等
  10. 按编译原理的思路设计的一个计算器
  11. SQL那些事儿(十三)--Oracle中varchar与varchar2区别
  12. iOS 应用的启动流程和优化详解
  13. 搜狐畅游一面(c++)
  14. 思科三层+TPAC200+TP AP实现每个SSID独立网段
  15. C语言练手题(52个小练习)
  16. SAI颈部正面的画法
  17. AnaConda下载安装完了只有anaconda prompt问题
  18. background 组合写法_css中background复合属性详解
  19. NOI2019滚粗记
  20. Python3---站在大佬肩膀写爬虫-爬取网易云音乐热歌榜歌曲热评(精彩评论)

热门文章

  1. Node.js v11.0 正式发布!
  2. mysql新增范围之外数据_mysql第二天 数据的增删改查补充及外键
  3. c语言中listnode是什么意思,怎么理解typedef Node * List
  4. 关于mysql的项目_项目中常用的MySQL 优化
  5. 圆括号匹配c语言代码,求救!!一道关于表达式圆括号匹配的经典题目
  6. 送书 | 师妹越多,团队集体智慧越高,当占比80%时,达到巅峰
  7. BetterZip结合自动操作工具和预设设置创建电影解压程序
  8. 依赖倒置原则_设计模式之SOLID原则
  9. tomcat配置url跳转_web和tomcat的区别
  10. P5713 【深基3.例5】洛谷团队系统(python3实现)