Kafka consumer多线程下not safe for multi-threaded access问题

默认配置下kafka consumer的offset的commit是自动的,如需改成手动提交可以修改参数:enable.auto.commit = false
在手动提交offset的模式下,只需要手动执行kafkaConsumer.commitSync()即可提交本次拉取消息的所有分区的offset信息,伪代码片段如下:

Properties kafkaProperties = new Properties();
......
kafkaProperties.put("enable.auto.commit", false);
kafkaProperties.put("auto.offset.reset", "latest");
kafkaProperties.put("isolation.level", "read_committed");
......
while(running) {Consumer<String, String> kafkaConsumer = new KafkaConsumer<>(kafkaProperties);ConsumerRecords<String, String> records = kafkaConsumer.poll(500L);......kafkaConsumer.commitSync();
}

有时候我们可能会使用到这样的方式:在kafkaConsumer poll到消息以后不会马上做处理,而是把消息先缓存到一个阻塞队列或者RingBuffer,再由其它线程去消费这个队列然后处理对应的消息。这样做的好处是可以将message的拉取逻辑和处理逻辑解耦;还有就是这种异步的方式可以提高效率,即在处理message的同时还是能正常拉取message,不会因为message的处理耗时影响message的拉取,只有当队列满了message的拉取才会被阻塞。伪代码片段如下:

private ArrayBlockingQueue<ConsumerRecords<String, String>> queue = new ArrayBlockingQueue<>(100);/*** 初始化方法*/
public void init() {Properties properties = new Properties();properties.put("topic", "mytest");......properties.put("enable.auto.commit", false);properties.put("auto.offset.reset", "latest");properties.put("isolation.level", "read_committed");this.kafkaConsumer = new KafkaConsumer<>(properties);this.kafkaConsumer.subscribe(Collections.singletonList(topic));this.running = true;new Thread(() -> {try {testPrintAndCommit();} catch (InterruptedException e) {e.printStackTrace();}}).start();
}/*** 拉取消息并缓存在阻塞队列*/
public void consumeMessage() {while (running) {           ConsumerRecords<String, String> records = this.kafkaConsumer.getMessage(200L, TimeUnit.MILLISECONDS);if (records != null && records.count() > 0) {queue.put(records);}            }
}/*** 打印消息并提交offset*/
private void testPrintAndCommit() {while (running) {ConsumerRecords<String, String> records= queue.poll(200, TimeUnit.MILLISECONDS);for (ConsumerRecord<String, String> record : records) {System.out.println(record.value());}this.kafkaConsumer.commitSync();}
}

但是如果直接运行该代码会抛异常:KafkaConsumer is not safe for multi-threaded access ,大概意思就是KafkaConsumer类不是线程安全的无法在多线程中同时对其访问。由于上面示例代码中kafkaConsumer.poll()和kafkaConsumer.commitSync()方法在两个不同的线程中同时进行操作,所以导致抛出这个异常。那么如何解决这个问题呢?常见的一些解决方法都是建议将kafkaConsumer的操作放在同一线程中或者使用自动提交的方式,但是这些方法都无法满足上述的需求。

其实根据上述的异常信息:KafkaConsumer is not safe for multi-threaded access,说明该对象在多线程同时访问时不是线程安全,那么是不是意味着如果能保证这个对象在多线程访问下的线程安全是否就不会抛这个异常?于是在kafkaConsumer的poll()方法和commitSync()方法上面加上读写锁进行测试:

private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();public void consumeMessage() {while (running) {ConsumerRecords<String, String> records;final Lock lock = readWriteLock.readLock();lock.lock();try {records = records = this.kafkaConsumer.getMessage(200L, TimeUnit.MILLISECONDS);} finally {lock.unlock();}           if (records != null && records.count() > 0) {queue.put(records);}            }
}private void testPrintAndCommit() {while (running) {ConsumerRecords<String, String> records= queue.poll(200, TimeUnit.MILLISECONDS);for (ConsumerRecord<String, String> record : records) {System.out.println(record.value());}final Lock lock = readWriteLock.writeLock();lock.lock();try {this.kafkaConsumer.commitSync();} finally {lock.unlock();}       }
}

在poll的时候加上读锁,在commit的时候加上写锁,以确保poll和commit的同步串行执行。再次测试执行代码就不会再抛 KafkaConsumer is not safe for multi-threaded access 异常。

下面看一下Kafka是如何实现这个功能的:
找到KafkaConsumer的 poll方法和commitSync方法(这里使用的是kafka-client 1.1.1 版本):

public ConsumerRecords<K, V> poll(long timeout) {acquireAndEnsureOpen();try {......} finally {release();}
}public void commitSync() {acquireAndEnsureOpen();try {coordinator.commitOffsetsSync(subscriptions.allConsumed(), Long.MAX_VALUE);} finally {release();}
}

可以看到在两个方法的开始和结束都调用了acquireAndEnsureOpen()和release()方法,查看这两个方法的内容:

private void acquireAndEnsureOpen() {acquire();if (this.closed) {release();throw new IllegalStateException("This consumer has already been closed.");}
}
private void acquire() {long threadId = Thread.currentThread().getId();if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");refcount.incrementAndGet();
}private void release() {if (refcount.decrementAndGet() == 0)currentThread.set(NO_CURRENT_THREAD);
}

可以看到在方法开始的时候会进入acquire()方法,里面会对当前线程id进行判断:threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId),其中currentThread是KafkaConsumer的AtomicLong类型的成员变量:private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD); NO_CURRENT_THREAD常量值为-1。

那么if判断的前半部分:threadId != currentThread.get() 在第一个线程进入的时候肯定返回true;后半部分:!currentThread.compareAndSet(NO_CURRENT_THREAD, threadId) 会先判断currentThread当前值是否为NO_CURRENT_THREAD,是则设置为当前线程id,此处返回false那么不会进入if条件。

在方法最后执行release()方法,会判断refcount变量,该变量会记录当前线程的重入次数,当重入次数清零以后会把currentThread变量再次设置为NO_CURRENT_THREAD。那么就不难理解,第一个线程在执行了acquire()方法之后未执行release()之前,其它线程如果进来的执行了acquire()方法的话:判断threadId != currentThread.get() 返回ture,!currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)返回true满足if条件抛出KafkaConsumer is not safe for multi-threaded access异常

由此可知在加了同步锁以后每次poll和commitSync都是串行的,能够保证acquire()和release()的原子性从而实现了该功能。

Kafka consumer多线程下not safe for multi-threaded access问题相关推荐

  1. Kafka Consumer多线程消费

    概述 OrdinaryConsumer类 ConsumerWorker.java MultiThreadedConsumer.java MultiThreadedRebalanceListener.j ...

  2. Kafka Consumer多线程实例

    Kafka 0.9版本开始推出了Java版本的consumer,优化了coordinator的设计以及摆脱了对zookeeper的依赖.社区最近也在探讨正式用这套consumer API替换Scala ...

  3. 读Kafka Consumer源码

    最近一直在关注阿里的一个开源项目:OpenMessaging OpenMessaging, which includes the establishment of industry guideline ...

  4. 【kafka】浅谈Kafka的多线程消费的设计

    1.概述 转载:浅谈Kafka的多线程消费的设计 看原文去... 一.前言 跟RabbitMQ相比,Kafka的分区机制(Partition)使其支持对同一个"队列"分片并行读取, ...

  5. Kafka Consumer的两种接口(高阶低阶)

    Kafka Consumer接口 http://www.cnblogs.com/fxjwind/p/3794255.html 对于kafka的consumer接口,提供两种版本, high-level ...

  6. Kafka: Consumer

    2019独角兽企业重金招聘Python工程师标准>>> Kafka Consumer 通过之前的架构介绍,对Consumer有了一个初步的了解.这里再深入一点来了解一下Consume ...

  7. kafka consumer消费者 offset groupID详解

    kafka consumer:消费者可以从多个broker中读取数据.消费者可以消费多个topic中的数据. 因为Kafka的broker是无状态的,所以consumer必须使用partition o ...

  8. Apache Kafka Consumer 消费者集

    1.目标 在我们的上一篇文章中,我们讨论了Kafka Producer.今天,我们将讨论Kafka Consumer.首先,我们将看到什么是Kafka Consumer和Kafka Consumer的 ...

  9. Kafka consumer

    Kafka consumer consumer概览 消费者组 消费者组定义:消费者使用一个消费者组名(即group.id)来标记自己,topic的每条消息都只会被发送到每个订阅它的消费者组的一个消费者 ...

最新文章

  1. SQL Server 查询性能优化——堆表、碎片与索引(一)
  2. mysql储存过程编程,MySQL 5.0存储过程编程入门
  3. centos7公司内网环境搭建集群性能测试环境(ip+域名部署)
  4. 在windows下查看SQLite数据库
  5. 60秒完成病毒基因对比 阿里云向社会免费开放基因计算服务
  6. 信息倒流php,PHP向客户端广播信息
  7. Linux+apache+svn
  8. 第五章:配置使用FastJson返回Json视图
  9. kettle组件-应用
  10. 吴恩达《机器学习》第八章:逻辑回归
  11. 第二阶段第八天站立会议
  12. 云计算介绍-1.1,IaaS\PaaS\SaaS辨析
  13. 嵌入式Linux驱动笔记--转自风筝丶
  14. C语言 - 数组作为参数传递给函数(按值传递和按引用传递)
  15. mysql 分页_mysql大表分页查询翻页优化方案
  16. 如何从网页上下载页面嵌入的PDF文件
  17. matlab3维傅里叶变换,MATLAB 分数傅里叶变换三维图像
  18. SOC安全运营中心产品
  19. 基于EasyX使用Wu反走样算法画线
  20. java后端判断图片尺寸(GB,MB,KB形式),图片色彩(黑白照或彩色照片),图片构图(横图竖图方图)

热门文章

  1. cmake CMAKE_CXX_COMPILER_VERSION 检查失败
  2. R语言 回归结果中有NA
  3. Python数据分析入门笔记4——数据预处理之重复值
  4. arctanx麦克劳林公式推导过程_徒手搭建三角函数公式推导体系
  5. ROS 2 Eloquent Elusor安装和使用汇总
  6. 拉着你的手 - 谢东 (zt)
  7. 职能与职位的区别_使安全职能与战略,目标和使命保持一致
  8. 数字图像处理 拜耳过滤器简介
  9. 新版标准日本语高级_第18课
  10. 小程序textarea显示混乱