1.视界

关于Java consumer如何动态修改topic订阅的问题。仔细一想才发现这的确是个好问题,因为如果简单地在另一个线程中直接持有consumer实例然后调用subscribe进行修改,consumer端必然会抛出异常ConcurrentModificationException:KafkaConsumer is not safe for multi-threaded access

和KafkaProducer不同的是,KafkaConsumer不是线程安全的,所以我们不能直接在没有同步保护的机制下直接启用另一个线程调用consumer的任何方法(除了wakeup)。因此,实现这个需求有两种途径:

使用重量级的synchorinzed机制来实现线程安全
借助Java类库已有的线程安全数据结构来实现
  如果是第一种方式,则无论哪个线程访问consumer都必须要配备必要的同步保护机制,代价相当大且极易出错。本文选取第二种方式,我们可以借助Java提供的ConcurrentLinkedQueue来帮助我们实现。具体的步骤为:

构建ConcurrentLinkedQueue对象分别给两个线程使用(这里并不限定于两个线程,但这个需求最可能的实际场景是consumer主线程和一个后台管理类的用户线程,而后者负责触发“动态修改订阅”逻辑)
调用KafkaCons

60-60-020-API-Kafka Java consumer动态修改topic订阅相关推荐

  1. Kafka Java consumer动态修改topic订阅

    前段时间在Kafka QQ群中有人问及此事--关于Java consumer如何动态修改topic订阅的问题.仔细一想才发现这的确是个好问题,因为如果简单地在另一个线程中直接持有consumer实例然 ...

  2. java 反射动态修改数据类型_[Java-基础]反射_Class对象_动态操作

    动态性 动态语言 在程序运行时,可以改变程序结构或变量类型,典型的语言: Python,ruby,javascript 如: function test(){ var s = "var a= ...

  3. kafka Java客户端之 consumer API 消费消息

    背景:我使用docker-compose 搭建的kafka服务 kafka的简单介绍以及docker-compose部署单主机Kafka集群 使用consumer API消费指定Topic里面的消息 ...

  4. kafka Java客户端之 consumer API 多线程消费消息

    kafka consumer 线程设计 Kafka Java Consumer采用的是单线程的设计.其入口类KafkaConsumer是一个双线程的设计,即用户主线程和心跳线程. 用户主线程,指的是启 ...

  5. kafka java 多线程_20. 多线程开发者实例

    # 多线程 Consumer Instance ## Kafka Java Consumer 设计原理 * Kafka Java Consumer 是单线程设计 * 从 Kafka V0.10.1.0 ...

  6. kafka java api 生产者 producer 与消费者consumer

    c踩坑 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic HelloWorld 将localhost必须和 ...

  7. kafka java api 删除_Kafka入门系列—6. Kafka 常用命令及Java API使用

    常用命令 启动Zookeeper ./zkServer.sh start-foreground 可选参数: ./zkServer.sh {start|start-foreground|stop|res ...

  8. kafka文档(3)----0.8.2-kafka API(java版本)

    原文地址: http://kafka.apache.org/documentation.html#api Apache Kafka包含新的java客户端,这些新的的客户端将取代现存的Scala客户端, ...

  9. java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明. 本文链接: Flink入门程序异常,记录一下跟大家分享. SLF4J: Failed to l ...

最新文章

  1. 网关 0.0.0.0_久违的升级——全新米家智能多模网关
  2. Spring bean 实现生命周期的三种解决方案
  3. python在线课程-《Python程序设计与应用》在线课程使用说明
  4. 从零开始学 Java - Spring 使用 Quartz 任务调度定时器
  5. Mysql清理binlog日志
  6. 深度学习时代的目标检测算法综述
  7. PrintArea打印,@media screen解决移动web开发的多分辨率问题,@media print设置打印的样式...
  8. centos7.5 源码安装mysql5.7.25
  9. NG RouteReuseStrategy(路由复用策略)
  10. Linux磁盘分区/格式化/挂载目录
  11. 微信小程序之 Classify(商品属性分类)
  12. RouterOS全局限制速度和限制线程
  13. 《Node应用程序构建——使用MongoDB和Backbone》一2.3 事件
  14. 学软件测试必看的10本书推荐给你
  15. TCP/IP四层模型
  16. 接触了阿里云国际版香港服务器,让我学会了飞一般的乐趣
  17. 为什么excel中取消隐藏行后仍然有隐藏的行
  18. unity 3d 仿真_在Unity3D中构建3D仿真入门,第1部分
  19. WgpSec(狼组安全) CTF PHPCode题目记录
  20. net core WebApi——文件分片上传与跨域请求处理

热门文章

  1. 抖音电商发布创作者管理总则 近八万名带货达人因违规被罚
  2. 黄光裕回应与京东、拼多多竞争:谁也灭不了谁 不排除合作的可能性
  3. 阿里影业、IMAX股价飙涨!2021年中国电影总票房已突破100亿元人民币
  4. 《2020饿了么蓝骑士报告》:贫困县骑手月入5800元 成脱贫新兴力量
  5. 北京出台快递业价格行为规则 不得收取未予标明的费用
  6. iQOO Neo 855竞速版来了:今年最后一款骁龙855 Plus手机
  7. 王思聪连收3条限制消费令后,债主回应:对于他是小钱,对于我们可是巨款
  8. 京东数科公布11.11一小时战报:白条交易额10秒破亿
  9. 微信版花呗“分付”要来了!花呗,白条你们怎么看?
  10. 中国宽带最新速率状况报告 你家达标了吗?