java kafka 多线程消费
我们先来看下简单的kafka生产者和消费者模式代码:
生产者KafkaProducer
/** * @author xiaofeng * @version V1.0 * @title: KafkaProducer.java * @package: com.yingda.xsignal.app.test * @description: kafka生产者demo * @date 2018/4/4 0004 上午 11:20 */ public class KafkaProducer extends Thread {private String topic; public KafkaProducer(String topic) {super(); this.topic = topic; }@Override public void run() {Producer producer = createProducer(); int i = 0; while (true) {String msg = "message"; producer.send(new KeyedMessage<Integer, String>(topic, msg + (i++))); try {TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) {e.printStackTrace(); }}}private Producer createProducer() {Properties properties = new Properties(); //声明zk properties.put("zookeeper.connect", "10.0.2.22:2181"); properties.put("serializer.class", StringEncoder.class.getName()); properties.put("metadata.broker.list", "10.0.2.22:9092"); properties.put("batch.size", 4096); return new Producer<Integer, String>(new ProducerConfig(properties)); }public static void main(String[] args) {new KafkaProducer("TEST_TOPIC").start(); }}
消费者KafkaConsumer
/** * @author xiaofeng * @version V1.0 * @title: KafkaConsumer.java * @package: com.yingda.xsignal.app.test * @description: 单线程消费模式 * @date 2018/4/4 0004 上午 11:18 */ public class KafkaConsumer extends Thread {private String topic; public KafkaConsumer(String topic) {super(); this.topic = topic; }@Override public void run() {ConsumerConnector consumer = createConsumer(); Map<String, Integer> topicCountMap = Maps.newHashMap(); // 一次从主题中获取一个数据 topicCountMap.put(topic, 1); Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap); // 获取每次接收到的这个数据 KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0); ConsumerIterator<byte[], byte[]> iterator = stream.iterator(); while (iterator.hasNext()) {String message = new String(iterator.next().message()); System.out.println("接收到: " + message); }}private ConsumerConnector createConsumer() {Properties properties = new Properties(); // //声明zk properties.put("zookeeper.connect", "10.0.2.22:2181"); // // 消费组 properties.put("group.id", "test-group"); properties.put("key.deserializer", StringDeserializer.class); properties.put("value.deserializer", StringDeserializer.class); return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties)); }public static void main(String[] args) {new KafkaConsumer("TEST_TOPIC").start(); }
分别启动producer 和consumer
查看消费者控制台:
虽然已成功生产和消费,但是这种消费模式很明显是单个topic和单线程的形式,那么如果我一次性要订阅多个topic 而且需要多线程消费该怎样做呢?接下来让我们一探究竟吧!
构建多线程消费KafkaConsumer
/** * @author xiaofeng * @version V1.0 * @title: OrderBackConsumer.java * @package: com.yingda.xsignal.app.consumer * @description: 订单备份消费者 * @date 2018/3/16 0016 下午 4:46 */ public class OrderBackConsumer extends BaseSpringApp {protected static final Logger logger = LoggerFactory.getLogger(OrderBackConsumer.class); private final ConsumerConnector consumer; private final String signalTopic = "SIGNAL_ORDERINFO"; private final String followTopic = "FOLLOW_ORDERINFO"; private final String signalHisTopic = "HIS_ORDERINFO"; private final String followHisTopic = "FOLLOW_HIS_ORDERINFO"; private ConsumerConfig consumerConfig; private static int threadNum = 6; /** * Set the ThreadPoolExecutor's core pool size. */ private int corePoolSize = 6; /** * Set the ThreadPoolExecutor's maximum pool size. */ private int maxPoolSize = 200; /** * Set the capacity for the ThreadPoolExecutor's BlockingQueue. */ private int queueCapacity = 1024; /** * thread prefix name */ private String ThreadNamePrefix = "kafka-consumer-pool-%d"; ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat(ThreadNamePrefix).build(); /** * Common Thread Pool */ ExecutorService pool = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(queueCapacity), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); public OrderBackConsumer(String[] args) {super(args, "classpath:app-KafkaConsumer.xml"); Properties properties = new Properties(); //开发环境:10.0.2.22:2181 properties.put("zookeeper.connect", "10.0.2.22:2181"); // 组名称 properties.put("group.id", "back_consumer_group"); properties.put("key.deserializer", StringDeserializer.class); properties.put("value.deserializer", StringDeserializer.class); consumerConfig = new ConsumerConfig(properties); consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig); }@Override public void shutdown() {if (consumer != null) {consumer.shutdown(); }if (pool != null) {pool.shutdown(); }try {if (!pool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS)) {System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly"); }} catch (InterruptedException e) {System.out.println("Interrupted during shutdown, exiting uncleanly"); }}public void run(int numThreads) {Map<String, Integer> topicCountMap = Maps.newHashMap(); topicCountMap.put(signalTopic, new Integer(numThreads)); topicCountMap.put(followTopic, new Integer(numThreads)); topicCountMap.put(signalHisTopic, new Integer(numThreads)); topicCountMap.put(followHisTopic, new Integer(numThreads)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); consumerMap.values().stream().forEach(value -> {List<KafkaStream<byte[], byte[]>> streams = value; int threadNumber = 0; /** * 可以为每隔topic创建一个线程池,因为每个topic我设置的partition=6 * (kafka consumer通过增加线程数来增加消费能力,但是需要足够的分区,如目前我设置的partition=6,那么并发可以启动6个线程同时消费) * ExecutorService pool = createThreadPool(); */ for (final KafkaStream stream : streams) {pool.submit(new KafkaOrderConsumer(stream, threadNumber)); threadNumber++; }}); }/** * 创建线程池 * * @return */ private ExecutorService createThreadPool() {ExecutorService pool = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(queueCapacity), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); return pool; }public static void main(String[] args) {int threads = 1; if (args.length < 1) {threads = threadNum; } else {threads = Integer.parseInt(args[0]); }OrderBackConsumer example = new OrderBackConsumer(args); example.run(threads); try {Thread.sleep(Long.MAX_VALUE); } catch (InterruptedException ie) {}example.shutdown(); }
/** * @author xiaofeng * @version V1.0 * @title: KafkaOrderConsumer.java * @package: com.yingda.xsignal.app.service.impl * @description: kafka消费服务 * @date 2018/3/20 0020 下午 8:03 */ public class KafkaOrderConsumer implements Runnable {protected static final Logger logger = LoggerFactory.getLogger(KafkaOrderConsumer.class); private KafkaStream stream; private int threadNumber; public KafkaOrderConsumer(KafkaStream stream, int threadNumber) {this.stream = stream; this.threadNumber = threadNumber; }@Override public void run() {ConsumerIterator<byte[], byte[]> it = stream.iterator(); while (it.hasNext()) {//消费队列内容 final MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next(); try {final byte[] messageBytes = (byte[]) messageAndMetadata.message(); if (messageBytes != null && messageBytes.length > 0) {String content = new String(messageBytes); logger.info("message:'" + content + "'"); /** * @// TODO: 2018/3/20 0020 消费入库 */ }} catch (Exception e) {logger.error("kafka back order consumer error", e); }}logger.info("Shutting down Thread: " + threadNumber); } }
以上代码中,我们创建了一个线程池,线程数为6,因为我设置的partition=6,而且一次性订阅了4个topic(当然这些topic要真实存在哦),测试的时候随便往哪个topic中写数据都可以收到相应的消费数据哦。
java kafka 多线程消费相关推荐
- kafka多线程消费
1.zookeeper集群搭建:zookeeper安装以及使用_燕少༒江湖的博客-CSDN博客_zookeeper 2.kafka集群搭建:kafka集群搭建以及遇到的异常_燕少༒江湖的博客-CSDN ...
- 【Kafka笔记】5.Kafka 多线程消费消息
Kafka多线程消费理解 Kafka Java Consumer设计 Kafka Java Consumer采用的是单线程的设计.其入口类KafkaConsumer是一个双线程的设计,即用户主线程和心 ...
- 正确处理kafka多线程消费的姿势
最近项目开发过程使用kafka作为项目模块间负载转发器,实现实时接收不同产品线消息,分布式准实时消费产品线消息.通过kafka作为模块间的转换器,不仅有MQ的几大好处:异步. 解耦. 削峰等几大好处, ...
- kafka 多线程消费
一. 1.Kafka的消费并行度依赖Topic配置的分区数,如分区数为10,那么最多10台机器来并行消费(每台机器只能开启一个线程),或者一台机器消费(10个线程并行消费).即消费并行度和分区数一致. ...
- 几种kafka多线程消费方式
kafka API https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/Kafka ...
- kafka Java客户端之 consumer API 多线程消费消息
kafka consumer 线程设计 Kafka Java Consumer采用的是单线程的设计.其入口类KafkaConsumer是一个双线程的设计,即用户主线程和心跳线程. 用户主线程,指的是启 ...
- Kafka Consumer多线程消费
概述 OrdinaryConsumer类 ConsumerWorker.java MultiThreadedConsumer.java MultiThreadedRebalanceListener.j ...
- kafka消费者(Consumer)端多线程消费的实现方案
kafka消费者(Consumer)端多线程消费的实现方案 kafka Java consumer设计原理 设计原理 为什么用单线程设计 多线程方案: 方案一: 方案二: 两个方案的优缺点: kafk ...
- 【kafka】浅谈Kafka的多线程消费的设计
1.概述 转载:浅谈Kafka的多线程消费的设计 看原文去... 一.前言 跟RabbitMQ相比,Kafka的分区机制(Partition)使其支持对同一个"队列"分片并行读取, ...
最新文章
- Kali Linux 安全渗透教程第五更1.4 安装Kali Linux
- 【floyd存字典序路径】【HDU1385】【Minimum Transport Cost】
- nginx for discuz 伪静态规则
- Java序列化接口Serializable接口的作用总结
- 2015级C++第2周实践项目
- Eclipse中的控制台
- # 异运算_人教版六年级数学下册第29课数的运算(P7680)图文视频辅导
- [数据恢复答疑]删除了WINDOWS桌面上的文件,该如何恢复数据
- 海康威视mp4html播放器,videoJS 网页视频播放器支持MP4
- python UI自动化无界面运行
- 最新Apple苹果开发者账号AppleID注册流程
- EXCEL长数字显示和转化为文本
- 程序员简历模版【A4纸正反两面】(20220511)
- 调试python程序---pdb
- poj 2152 树形dp(建立消防站)
- 为视图或函数指定的列名比其定义中的列多
- keystone的详细功能
- 初识二维码 第二十讲 二维码解码程序的组件之一 摄像头拍照功能
- INS/GPS 制导的 SDB 炸弹投放域计算与分析
- 常用坐标系及SuperMap投影转换
热门文章
- mysql怎么备份和恢复_如何优雅的备份和恢复Mysql数据库?
- 2 image pil 转_pdf转图片,php语言和Java语言的两种方法
- 使用MailMessage.AlternateViews时遇到的小问题
- 硬链接与软链接的区别
- LeetCode每日一题: 缺失数字(No.268)
- (效果一)js实现上拉加载
- 某些书籍翻译的太屎了,误导人!
- iOS 日期格式的转换
- int 小数_[LeetCode] 166. 分数到小数
- 用计算机画关于科技的画,用计算机鉴识画作