kafka多线程消费
1、zookeeper集群搭建:zookeeper安装以及使用_燕少༒江湖的博客-CSDN博客_zookeeper
2、kafka集群搭建:kafka集群搭建以及遇到的异常_燕少༒江湖的博客-CSDN博客
3、kafka生成消息:kafka-producer生产者案例_燕少༒江湖的博客-CSDN博客_kafkaproducer单例
4、kafka多线程消费:offset从zookeeper中得到,一个线程对应一个partition,这样消费速度很快,而且消息的顺序可控,线程数量和partition一样,多了浪费资源,少了效率很低,也可以不通过zookeeper来消费,kafka0.9以后的版本就可以将offset记录到对应消费group到对应的broker上。
5、pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.cn.dl</groupId><artifactId>kafka-consumer1</artifactId><version>1.0</version><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.6</source><target>1.6</target></configuration></plugin></plugins></build><dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.10</artifactId><version>0.8.2.2</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.43</version></dependency></dependencies></project>
6、KafkaConsumterMain
package com.dl.cn;import java.io.IOException;
import java.util.Properties;/*** Created by tiger on 2018/8/20.*/
public class KafkaConsumterMain {public static void main(String[] args) throws IOException {String topic = "user-info";int threadNum = 2;Properties properties = ReadPropertiesUtils.readConfig("config.properties");KafkaConsumterServer kafkaConsumterDemo = new KafkaConsumterServer(topic,threadNum,properties);kafkaConsumterDemo.consumer();}
}
7、KafkaConsumterServer
package com.dl.cn;import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** Created by tiger on 2018/8/20.*/
public class KafkaConsumterServer {private String topic;private Properties properties;private int threadNum;public KafkaConsumterServer(String topic,int threadNum,Properties properties) {this.topic = topic;this.threadNum = threadNum;this.properties = properties;}/*** 创建固定线程池消费消息* 线程和partition一对一* */public void consumer() {Map<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(topic, new Integer(threadNum));ConsumerConfig consumerConfig = new ConsumerConfig(properties);ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);//创建固定数量的线程池ExecutorService executor = Executors.newFixedThreadPool(threadNum);for (KafkaStream stream : streams) {executor.submit(new KafkaConsumerThread(stream));}}
}
8、KafkaConsumerThread
package com.dl.cn;import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;/*** Created by tiger on 2018/8/20.*/
public class KafkaConsumerThread implements Runnable{private KafkaStream<byte[], byte[]> stream;public KafkaConsumerThread(KafkaStream<byte[], byte[]> stream) {this.stream = stream;}@Overridepublic void run() {ConsumerIterator<byte[], byte[]> it = stream.iterator();while (it.hasNext()) {MessageAndMetadata<byte[], byte[]> mam = it.next();System.out.println(Thread.currentThread().getName() + ">>>partition[" + mam.partition() + "],"+ "offset[" + mam.offset() + "], " + new String(mam.message()));}}
}
9、ReadPropertiesUtils
package com.dl.cn;import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;/*** Created by tiger on 2018/8/18.*/
public class ReadPropertiesUtils {/*** 读取properties配置文件* @param configFileName* @exception* @return* */public static Properties readConfig(String configFileName) throws IOException {Map<String,String> config = new HashMap<String, String>();InputStream in = ReadPropertiesUtils.class.getClassLoader().getResourceAsStream(configFileName);Properties properties = new Properties();properties.load(in);return properties;}
}
10、config.properties
11、测试结果:
线程1对应partition0,线程2对应partition1,两者互不干扰
12、
auto.offset.reset=smallest,意思是从topic最早数据开始消费
auto.offset.reset=largest,是从topic最新数据开始消费
在zk中可以看到消费组
比如在代码中用到tiger7777这个消费者组
在代码中看到线程2最后消费的消息offset=1755
线程1最后消费的消息offset=2243
zookeeper中记录的offset值
生产者不断生产数据,消费者不断消费数据
将tiger7777,中partition对应的offset的值更新为200,然后重新启动
消费者,发现消息从offset=200开始重新消费,而且发现只有一个线程在继续消费
kafka多线程消费相关推荐
- 正确处理kafka多线程消费的姿势
最近项目开发过程使用kafka作为项目模块间负载转发器,实现实时接收不同产品线消息,分布式准实时消费产品线消息.通过kafka作为模块间的转换器,不仅有MQ的几大好处:异步. 解耦. 削峰等几大好处, ...
- 【Kafka笔记】5.Kafka 多线程消费消息
Kafka多线程消费理解 Kafka Java Consumer设计 Kafka Java Consumer采用的是单线程的设计.其入口类KafkaConsumer是一个双线程的设计,即用户主线程和心 ...
- java kafka 多线程消费
我们先来看下简单的kafka生产者和消费者模式代码: 生产者KafkaProducer /** * @author xiaofeng * @version V1.0 * @title: KafkaPr ...
- kafka 多线程消费
一. 1.Kafka的消费并行度依赖Topic配置的分区数,如分区数为10,那么最多10台机器来并行消费(每台机器只能开启一个线程),或者一台机器消费(10个线程并行消费).即消费并行度和分区数一致. ...
- 几种kafka多线程消费方式
kafka API https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/Kafka ...
- Kafka Consumer多线程消费
概述 OrdinaryConsumer类 ConsumerWorker.java MultiThreadedConsumer.java MultiThreadedRebalanceListener.j ...
- 【kafka】浅谈Kafka的多线程消费的设计
1.概述 转载:浅谈Kafka的多线程消费的设计 看原文去... 一.前言 跟RabbitMQ相比,Kafka的分区机制(Partition)使其支持对同一个"队列"分片并行读取, ...
- 一看就会的kafka多线程顺序消费【内附Demo哦】
Hello,这里是爱 Coding,爱 Hiphop,爱喝点小酒的 AKA 柏炎. Kafka是一个分布式的,支持多分区.多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于 ...
- kafka Java客户端之 consumer API 多线程消费消息
kafka consumer 线程设计 Kafka Java Consumer采用的是单线程的设计.其入口类KafkaConsumer是一个双线程的设计,即用户主线程和心跳线程. 用户主线程,指的是启 ...
最新文章
- QEMU — VirtIO 虚拟化
- [TJOI2018]xor BZOJ5338 可持久trie
- innodb_locks_unsafe_for_binlog分析
- 认识JSON补丁:JSON-P 1.1概述系列
- c语言EOF0x99,C语言选择题99道.doc
- 台式计算机和笔记本计算机区别,【单选题】计算机的分类方法有多种,按照计算机的性能和用途分,台式计算机和笔记本计算机属于_________。...
- 下载Office安装包,提示找不到OfficeLR.cab文件。
- 复杂背景下计算机视觉模型害虫识别的比较研究(像素语义分割网络SegNet)
- cisco 的网络地址转换技术(NAT)
- Haroopad--最好用的markdown编辑器
- 万能手机解锁工具v1.0绿色加强版
- 华为跨域bgp_跨域组播---BGP+MSDP
- 引导工业物联网变革 中国占据有利位置
- Python多线程实现 as_completed先返回的任务先处理 在 阿里云 函数式计算 优化的应用
- java 代码加壳,专家和您一同谈谈java加壳的问题[Java编程]
- pytorch-->optimizer.zero_grad()、loss.backward()、optimizer.step()和scheduler.step()
- SLUB和SLAB的区别
- 什么是优秀的用户体验:解读40个优秀界面设计
- html5 canvas模拟的爆炸效果
- Jetson 加装 USB 声卡后设置为默认声卡