kafka java 多线程_20. 多线程开发者实例
# 多线程 Consumer Instance
## Kafka Java Consumer 设计原理
* Kafka Java Consumer 是单线程设计
* 从 Kafka V0.10.1.0,KafkaConsumer 是双线程:用户主线程 & 心跳线程
* 用户主线程
* 启动 Consumer 应用 main 方法的线程
* 心跳线程
* 只负责定期给对应的 Broker 发送心跳,标示 Consumer 的存活性(liveness)
* 新版本设计:单线程 + 轮询机制:
* 实现非阻塞式的消息获取
## 多线程方案
* KafkaConsumer 类不是 thread-safe
* 所有的网络 IO 处理都是发生在用户主线程中
* 不能在多个线程中共享同一个 KafkaConsumer 实例
* 可以使用 `KafkaConsumer.wakeup()` 在其他线程中唤醒 Consumer
基于非 thread-safe,两套多线程方案
* 消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer Instance,负责完整的消息获取、消息处理流程
* 消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑
* 处理消息交由特定的线程池来做
* 将消息获取与处理解耦
![](https://img.kancloud.cn/40/70/4070c15055bf275c44cb7b470fb1f850_696x326.jpeg)
## Code
### 方案 1
```
public class KafkaConsumerRunner implements Runnable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer consumer;
public void run() {
try {
consumer.subscribe(Arrays.asList("topic"));
while (!closed.get()) {
ConsumerRecords records =
consumer.poll(Duration.ofMillis(10000));
// 执行消息处理逻辑
}
} catch (WakeupException e) {
// Ignore exception if closing
if (!closed.get()) throw e;
} finally {
consumer.close();
}
}
// Shutdown hook which can be called from a separate thread
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
```
### 方案 2
```
private final KafkaConsumer consumer;
private ExecutorService executors;
...
private int workerNum = ...;
executors = new ThreadPoolExecutor(
workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy());
...
while (true) {
ConsumerRecords records =
consumer.poll(Duration.ofSeconds(1));
for (final ConsumerRecord record : records) {
executors.submit(new Worker(record));
}
}
..
```
kafka java 多线程_20. 多线程开发者实例相关推荐
- kafka Java客户端之 consumer API 多线程消费消息
kafka consumer 线程设计 Kafka Java Consumer采用的是单线程的设计.其入口类KafkaConsumer是一个双线程的设计,即用户主线程和心跳线程. 用户主线程,指的是启 ...
- java中的多线程有什么意义_Java多线程与并发面试题(小结)
1,什么是线程? 线程是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位.程序员可以通过它进行多处理器编程,你可以使用多线程对运算密集型任务提速.比如,如果一个线程完成一 ...
- JAVA高并发多线程必须懂的50个问题
http://www.importnew.com/12773.html ImportNew 首页所有文章资讯Web架构基础技术书籍教程Java小组工具资源 Java线程面试题 Top 50 2014/ ...
- 100道Java并发和多线程面试题
1.多线程有什么用? 一个可能在很多人看来很扯淡的一个问题:我会用多线程就好了,还管它有什么用?在我看来,这个回答更扯淡.所谓"知其然知其所以然","会用"只是 ...
- java基础学习-多线程笔记
说说Java中实现多线程有几种方法 创建线程的常用三种方式: 1. 继承Thread类 2. 实现Runnable接口 3. 实现Callable接口( JDK1.5>= ) 4. 线程池方式创 ...
- Java中控制多线程顺序执行
Java中控制多线程顺序执行 一.概述 二.普通示例 三.控制示例 3.1.设置线程优先级 3.2.使用线程类的join() 3.2.1.在主线程join() 3.2.2.在子线程join() 3.3 ...
- Java基础、多线程、JVM、集合八股文自述(持续更新)
Java基础.多线程.JVM.集合八股文自述 一.Java基础 1.1 object类有哪些方法? getClass().hashCode().equals().clone().toString(). ...
- Java回顾之多线程
在这篇文章里,我们关注多线程.多线程是一个复杂的话题,包含了很多内容,这篇文章主要关注线程的基本属性.如何创建线程.线程的状态切换以及线程通信,我们把线程同步的话题留到下一篇文章中. 线程是操作系统运 ...
- python3多线程异步爬虫_python3爬虫中多线程进行解锁操作实例
生活中我们为了保障房间里物品的安全,所以给门进行上锁,在我们需要进入房间的时候又会重新打开.同样的之间我们讲过多线程中的lock,作用是为了不让多个线程运行是出错所以进行锁住的指令.但是鉴于我们实际运 ...
最新文章
- 一种新方法或让AI模型拥有“联想”力,或能识别从未见过的事物
- 【采用】风控模型评估方法以及大数据风控模型概念
- 一名 40 岁“老”程序员的反思~
- TOPAS 命令详解
- android调用python框架_在Java中从Android应用程序执行Python脚本?
- 在线直接绘制倾斜45度的“火山图”
- Python机器学习:评价分类结果005precisoion-Recall的平衡
- DDR3初始化时间测试
- linux shell中获取mongodb最大连接数、内存使用情况等
- 按编译原理的思路设计的一个计算器
- SQL那些事儿(十三)--Oracle中varchar与varchar2区别
- iOS 应用的启动流程和优化详解
- 搜狐畅游一面(c++)
- 思科三层+TPAC200+TP AP实现每个SSID独立网段
- C语言练手题(52个小练习)
- SAI颈部正面的画法
- AnaConda下载安装完了只有anaconda prompt问题
- background 组合写法_css中background复合属性详解
- NOI2019滚粗记
- Python3---站在大佬肩膀写爬虫-爬取网易云音乐热歌榜歌曲热评(精彩评论)
热门文章
- Node.js v11.0 正式发布!
- mysql新增范围之外数据_mysql第二天 数据的增删改查补充及外键
- c语言中listnode是什么意思,怎么理解typedef Node * List
- 关于mysql的项目_项目中常用的MySQL 优化
- 圆括号匹配c语言代码,求救!!一道关于表达式圆括号匹配的经典题目
- 送书 | 师妹越多,团队集体智慧越高,当占比80%时,达到巅峰
- BetterZip结合自动操作工具和预设设置创建电影解压程序
- 依赖倒置原则_设计模式之SOLID原则
- tomcat配置url跳转_web和tomcat的区别
- P5713 【深基3.例5】洛谷团队系统(python3实现)