1、zookeeper集群搭建:zookeeper安装以及使用_燕少༒江湖的博客-CSDN博客_zookeeper

2、kafka集群搭建:kafka集群搭建以及遇到的异常_燕少༒江湖的博客-CSDN博客

3、kafka生成消息:kafka-producer生产者案例_燕少༒江湖的博客-CSDN博客_kafkaproducer单例

4、kafka多线程消费:offset从zookeeper中得到,一个线程对应一个partition,这样消费速度很快,而且消息的顺序可控,线程数量和partition一样,多了浪费资源,少了效率很低,也可以不通过zookeeper来消费,kafka0.9以后的版本就可以将offset记录到对应消费group到对应的broker上。

5、pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.cn.dl</groupId><artifactId>kafka-consumer1</artifactId><version>1.0</version><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.6</source><target>1.6</target></configuration></plugin></plugins></build><dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.10</artifactId><version>0.8.2.2</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.43</version></dependency></dependencies></project>

6、KafkaConsumterMain

package com.dl.cn;import java.io.IOException;
import java.util.Properties;/*** Created by tiger on 2018/8/20.*/
public class KafkaConsumterMain {public static void main(String[] args) throws IOException {String topic = "user-info";int threadNum = 2;Properties properties = ReadPropertiesUtils.readConfig("config.properties");KafkaConsumterServer kafkaConsumterDemo = new KafkaConsumterServer(topic,threadNum,properties);kafkaConsumterDemo.consumer();}
}

7、KafkaConsumterServer

package com.dl.cn;import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** Created by tiger on 2018/8/20.*/
public class KafkaConsumterServer {private String topic;private Properties properties;private int threadNum;public KafkaConsumterServer(String topic,int threadNum,Properties properties) {this.topic = topic;this.threadNum = threadNum;this.properties = properties;}/*** 创建固定线程池消费消息* 线程和partition一对一* */public void consumer() {Map<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(topic, new Integer(threadNum));ConsumerConfig consumerConfig = new ConsumerConfig(properties);ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);//创建固定数量的线程池ExecutorService executor = Executors.newFixedThreadPool(threadNum);for (KafkaStream stream : streams) {executor.submit(new KafkaConsumerThread(stream));}}
}

8、KafkaConsumerThread

package com.dl.cn;import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;/*** Created by tiger on 2018/8/20.*/
public class KafkaConsumerThread implements Runnable{private KafkaStream<byte[], byte[]> stream;public KafkaConsumerThread(KafkaStream<byte[], byte[]> stream) {this.stream = stream;}@Overridepublic void run() {ConsumerIterator<byte[], byte[]> it = stream.iterator();while (it.hasNext()) {MessageAndMetadata<byte[], byte[]> mam = it.next();System.out.println(Thread.currentThread().getName() + ">>>partition[" + mam.partition() + "],"+ "offset[" + mam.offset() + "], " + new String(mam.message()));}}
}

9、ReadPropertiesUtils

package com.dl.cn;import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;/*** Created by tiger on 2018/8/18.*/
public class ReadPropertiesUtils {/*** 读取properties配置文件* @param configFileName* @exception* @return* */public static Properties readConfig(String configFileName) throws IOException {Map<String,String> config = new HashMap<String, String>();InputStream in = ReadPropertiesUtils.class.getClassLoader().getResourceAsStream(configFileName);Properties properties = new Properties();properties.load(in);return properties;}
}

10、config.properties

11、测试结果:

线程1对应partition0,线程2对应partition1,两者互不干扰

12、

auto.offset.reset=smallest,意思是从topic最早数据开始消费
auto.offset.reset=largest,是从topic最新数据开始消费

在zk中可以看到消费组

比如在代码中用到tiger7777这个消费者组

在代码中看到线程2最后消费的消息offset=1755

线程1最后消费的消息offset=2243

zookeeper中记录的offset值

生产者不断生产数据,消费者不断消费数据

将tiger7777,中partition对应的offset的值更新为200,然后重新启动

消费者,发现消息从offset=200开始重新消费,而且发现只有一个线程在继续消费

kafka多线程消费相关推荐

  1. 正确处理kafka多线程消费的姿势

    最近项目开发过程使用kafka作为项目模块间负载转发器,实现实时接收不同产品线消息,分布式准实时消费产品线消息.通过kafka作为模块间的转换器,不仅有MQ的几大好处:异步. 解耦. 削峰等几大好处, ...

  2. 【Kafka笔记】5.Kafka 多线程消费消息

    Kafka多线程消费理解 Kafka Java Consumer设计 Kafka Java Consumer采用的是单线程的设计.其入口类KafkaConsumer是一个双线程的设计,即用户主线程和心 ...

  3. java kafka 多线程消费

    我们先来看下简单的kafka生产者和消费者模式代码: 生产者KafkaProducer /** * @author xiaofeng * @version V1.0 * @title: KafkaPr ...

  4. kafka 多线程消费

    一. 1.Kafka的消费并行度依赖Topic配置的分区数,如分区数为10,那么最多10台机器来并行消费(每台机器只能开启一个线程),或者一台机器消费(10个线程并行消费).即消费并行度和分区数一致. ...

  5. 几种kafka多线程消费方式

    kafka API   https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/Kafka ...

  6. Kafka Consumer多线程消费

    概述 OrdinaryConsumer类 ConsumerWorker.java MultiThreadedConsumer.java MultiThreadedRebalanceListener.j ...

  7. 【kafka】浅谈Kafka的多线程消费的设计

    1.概述 转载:浅谈Kafka的多线程消费的设计 看原文去... 一.前言 跟RabbitMQ相比,Kafka的分区机制(Partition)使其支持对同一个"队列"分片并行读取, ...

  8. 一看就会的kafka多线程顺序消费【内附Demo哦】

    Hello,这里是爱 Coding,爱 Hiphop,爱喝点小酒的 AKA 柏炎. Kafka是一个分布式的,支持多分区.多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于 ...

  9. kafka Java客户端之 consumer API 多线程消费消息

    kafka consumer 线程设计 Kafka Java Consumer采用的是单线程的设计.其入口类KafkaConsumer是一个双线程的设计,即用户主线程和心跳线程. 用户主线程,指的是启 ...

最新文章

  1. QEMU — VirtIO 虚拟化
  2. [TJOI2018]xor BZOJ5338 可持久trie
  3. innodb_locks_unsafe_for_binlog分析
  4. 认识JSON补丁:JSON-P 1.1概述系列
  5. c语言EOF0x99,C语言选择题99道.doc
  6. 台式计算机和笔记本计算机区别,【单选题】计算机的分类方法有多种,按照计算机的性能和用途分,台式计算机和笔记本计算机属于_________。...
  7. 下载Office安装包,提示找不到OfficeLR.cab文件。
  8. 复杂背景下计算机视觉模型害虫识别的比较研究(像素语义分割网络SegNet)
  9. cisco 的网络地址转换技术(NAT)
  10. Haroopad--最好用的markdown编辑器
  11. 万能手机解锁工具v1.0绿色加强版
  12. 华为跨域bgp_跨域组播---BGP+MSDP
  13. 引导工业物联网变革 中国占据有利位置
  14. Python多线程实现 as_completed先返回的任务先处理 在 阿里云 函数式计算 优化的应用
  15. java 代码加壳,专家和您一同谈谈java加壳的问题[Java编程]
  16. pytorch-->optimizer.zero_grad()、loss.backward()、optimizer.step()和scheduler.step()
  17. SLUB和SLAB的区别
  18. 什么是优秀的用户体验:解读40个优秀界面设计
  19. html5 canvas模拟的爆炸效果
  20. Jetson 加装 USB 声卡后设置为默认声卡

热门文章

  1. iTween.MoveTo用法
  2. Zookeeper客户端Curator详解
  3. 【python入门小知识】实现人名全大写,全小写,首字母大写
  4. Opencv批量处理图片的两种方法
  5. SQL注入之布尔型盲注
  6. 可调背光板(二)__DW01FA锂电池保护IC电路
  7. LIEF:修改安卓.so后报 dlopen failed:has invalid shdr offset/size
  8. MAC使用SSH连接IPhone
  9. 1000句最常用英语口语 (1~500)
  10. python3.5 爬取bing搜索结果页面标题、链接