一,说明:

1,消费组的概念:多个消费者组成,相当于一个线程池,初始化线程池,创建消费线程,统一消费方法

2,多个消费者:每个消费者抽象为一个线程,实现Callable接口,可获取返回值,查看消费情况

3,消费者:需要初始化,绑定与消费组ID,top,broker关系

4,初始化线程池:核心线程池大小等于消费者数量

5,call()执行内容:消费者消费内容,从kafka拉取消息,统计调用信息作为返回值

6,定义返回消息类

二,代码

1,主线程

初始化消费组,绑定关系,调用封装执行方法consumerGroup.execute()

public class ConsumerThreadMain {private static Logger logger = LoggerFactory.getLogger(ConsumerThreadMain.class);private static String brokerList = "localhost:9094";private static String groupId = "group1";private static String topic = "test";/*** 线程数量*/private static int threadNum = 3;public static void main(String[] args) {// 创建线程池 消费线程ConsumerGroup consumerGroup = new ConsumerGroup(threadNum, groupId, topic, brokerList);// 多个消费线程执行消费任务List<Future<ConsumerFuture>> futureList = consumerGroup.execute();for (Future<ConsumerFuture> future : futureList) {try {ConsumerFuture consumerFuture = future.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}}
}

2,消费组

2.1,创建线程池

2.2,创建消费线程

2.3,循环执行,消费线程

public class ConsumerGroup {private static Logger LOGGER = LoggerFactory.getLogger(ConsumerGroup.class);/*** 线程池*/private ExecutorService threadPool;private List<ConsumerCallable> consumers;public ConsumerGroup(int threadNum, String groupId, String topic, String brokerList) {ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("consumer-pool-%d").build();threadPool = new ThreadPoolExecutor(threadNum, threadNum,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());consumers = new ArrayList<ConsumerCallable>(threadNum);for (int i = 0; i < threadNum; i++) {ConsumerCallable consumerThread = new ConsumerCallable(brokerList, groupId, topic);consumers.add(consumerThread);}}/*** 执行任务*/public List<Future<ConsumerFuture>> execute() {long startTime = System.currentTimeMillis();List<Future<ConsumerFuture>> futures = new ArrayList<Future<ConsumerFuture>>();for (ConsumerCallable runnable : consumers) {Future<ConsumerFuture> future = threadPool.submit(runnable);futures.add(future);}if (threadPool.isShutdown()) {long endTime = System.currentTimeMillis();LOGGER.info("main thread use {} Millis", endTime - startTime);}threadPool.shutdown();return futures;}
}

3,消费线程

3.1,初始化消费线程,读取配置信息绑定top

3.2,执行消费内容

public class ConsumerCallable implements Callable<ConsumerFuture> {private static Logger LOGGER = LoggerFactory.getLogger(ConsumerCallable.class);private AtomicInteger totalCount = new AtomicInteger();private AtomicLong totalTime = new AtomicLong();private AtomicInteger count = new AtomicInteger();/*** 每个线程维护KafkaConsumer实例*/private final KafkaConsumer<String, String> consumer;public ConsumerCallable(String brokerList, String groupId, String topic) {Properties props = new Properties();props.put("bootstrap.servers", brokerList);props.put("group.id", groupId);//自动提交位移props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("session.timeout.ms", "30000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");this.consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList(topic));}/*** Computes a result, or throws an exception if unable to do so.** @return computed result* @throws Exception if unable to compute a result*/@Overridepublic ConsumerFuture call() throws Exception {boolean flag = true;int failPollTimes = 0;long startTime = System.currentTimeMillis();while (flag) {// 使用200ms作为获取超时时间ConsumerRecords<String, String> records = consumer.poll(200);if (records.count() <= 0) {failPollTimes++;if (failPollTimes >= 20) {LOGGER.debug("达到{}次数,退出 ", failPollTimes);flag = false;}continue;}//获取到之后则清零failPollTimes = 0;LOGGER.debug("本次获取:" + records.count());count.addAndGet(records.count());totalCount.addAndGet(count.get());long endTime = System.currentTimeMillis();if (count.get() >= 10000) {LOGGER.info("this consumer {} record,use {} milliseconds", count, endTime - startTime);totalTime.addAndGet(endTime - startTime);startTime = System.currentTimeMillis();count = new AtomicInteger();}LOGGER.debug("end totalCount={},min={}", totalCount, totalTime);/*for (ConsumerRecord<String, String> record : records) {// 简单地打印消息LOGGER.debug(record.value() + " consumed " + record.partition() +" message with offset: " + record.offset());}*/}ConsumerFuture consumerFuture = new ConsumerFuture(totalCount.get(), totalTime.get());return consumerFuture;}
}

4,返回消息类

public class ConsumerFuture {private Integer totalCount;private Long TotalTime;public ConsumerFuture(Integer totalCount, Long totalTime) {this.totalCount = totalCount;TotalTime = totalTime;}public Integer getTotalCount() {return totalCount;}public void setTotalCount(Integer totalCount) {this.totalCount = totalCount;}public Long getTotalTime() {return TotalTime;}public void setTotalTime(Long totalTime) {TotalTime = totalTime;}
}

kafka+多线程实现案例+Callable相关推荐

  1. kafka多线程消费

    1.zookeeper集群搭建:zookeeper安装以及使用_燕少༒江湖的博客-CSDN博客_zookeeper 2.kafka集群搭建:kafka集群搭建以及遇到的异常_燕少༒江湖的博客-CSDN ...

  2. 正确处理kafka多线程消费的姿势

    最近项目开发过程使用kafka作为项目模块间负载转发器,实现实时接收不同产品线消息,分布式准实时消费产品线消息.通过kafka作为模块间的转换器,不仅有MQ的几大好处:异步. 解耦. 削峰等几大好处, ...

  3. 一个多线程死锁案例,如何避免及解决死锁问题

    转载自 一个多线程死锁案例,如何避免及解决死锁问题 多线程死锁在java程序员笔试的时候时有遇见,死锁概念在之前的文章有介绍,大家应该也都明白它的概念,不清楚的去翻看历史文章吧. 下面是一个多线程死锁 ...

  4. python queue查询空_【Python】多线程爬虫案例

    爬取博客园文章列表 爬取博客园文章列表,假设页面的URL是https://www.cnblogs.com/loaderman 要求:使用requests获取页面信息,用XPath / re 做数据提取 ...

  5. java基础-多线程应用案例展示

    java基础-多线程应用案例展示 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 一.两只熊,100只蜜蜂,蜜蜂每次生产的蜂蜜量是1,罐子的容量是30,熊在罐子的蜂蜜量达到20的时候 ...

  6. 【Kafka笔记】5.Kafka 多线程消费消息

    Kafka多线程消费理解 Kafka Java Consumer设计 Kafka Java Consumer采用的是单线程的设计.其入口类KafkaConsumer是一个双线程的设计,即用户主线程和心 ...

  7. Kafka吞吐量测试案例

    Kafka吞吐量测试案例 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 领英公司参考连接:https://www.slideshare.net/JiangjieQin/produc ...

  8. 从入门到实战,Netty多线程篇案例集锦

    从入门到实战,Netty多线程篇案例集锦 原创 2015-09-10 李林峰 InfoQ Netty案例集锦系列文章介绍 1|Netty的特点 Netty入门比较简单,主要原因有如下几点: Netty ...

  9. 线程控制总结及多线程经典案例

    线程终止 Linux下有两种方式可以使线程终止: 通过return 从线程函数返回 通过调用函数pthread_exit()使线程退出 #include<pthread.h> void p ...

  10. Java多线程分析案例

    1. 多线程的创建方式 (1).继承 Thread类:但Thread本质上也是实现了Runnable 接口的一个实例,它代表一个线程的实例,并且,启动线程的唯一方法就是通过 Thread 类的 sta ...

最新文章

  1. pandas将dataframe中的特定数据列的内容转化为列表list数据(convert dataframe column values into a list)
  2. 《云计算:概念、技术与架构》一2.3 案例研究3:Innovartus
  3. 模型压缩千万不要选择剪枝,那就是一个坑,改造一个学生网络才是真的压缩
  4. 白色裤子为什么会沾上蓝色_什么是蓝色的,为什么它可以在Mac上运行?
  5. java 对象工厂_Java设计模式之--工厂方式
  6. JVM系列(三)— Java内存模型
  7. 最新出炉|也许你该看看这份的模型数据
  8. 在iview + vue项目中使用自定义icon图标
  9. [转载] python选择排序二元选择_选择排序:简单选择排序(Simple Selection Sort)
  10. vmstat命令 查看内存、CPU占用
  11. python设置路径变量_python – Bokeh中设置的静态路径变量在哪里(对于create_html_snippet)...
  12. c语言邻接表存储拓扑排序,拓扑排序(完整案列及C语言完整代码实现)
  13. 人工神经网络图像识别,人脸识别神经网络模型
  14. 通达信版弘历软件指标_弘历主图指标详解 通达信指标
  15. python: 集合操作符和关系符号
  16. 细胞分裂模拟(C++)
  17. The types of the interface org.apache.flink.util.OutputTag could not be inferred.
  18. NetApp Storage MetroCluster 双活解析
  19. 如何用python绘制等边三角形_python叠加等边三角形的代码编写方法
  20. 园区网络—中小型企业网络工程项目实践(思科模拟器)

热门文章

  1. Ubuntu下使用Atom将Markdown文件转换为PDF的一个异常
  2. Linux-软件包管理-rpm命令管理-安装-卸载
  3. 网络安全系列之四十 在Linux中设置SET位权限
  4. SQL Server 何时将“脏页”回写到硬盘
  5. Shell脚本应用之服务启动脚本
  6. C#中方法,方法声明,方法调用和方法重载!
  7. 婚姻是一堂需要认真学习的课程
  8. python软件是什么作用,python-dotenv的用途是什么?
  9. 北航计算机组成重修,北航计算机组成原理123希望大伙把作业上传.pdf
  10. linux 卸载pppoe,列“State”不属于表 。