一、

1、Kafka的消费并行度依赖Topic配置的分区数,如分区数为10,那么最多10台机器来并行消费(每台机器只能开启一个线程),或者一台机器消费(10个线程并行消费)。即消费并行度和分区数一致。

2、(1)如果指定了某个分区,会只讲消息发到这个分区上

(2)如果同时指定了某个分区和key,则也会将消息发送到指定分区上,key不起作用

(3)如果没有指定分区和key,那么将会随机发送到topic的分区中

(4)如果指定了key,那么将会以hash<key>的方式发送到分区中

二、多线程消费实例

paritition 为3,broker为3,节点为3

1、生产者随机分区提交数据

这也是一个比较关键步骤,只有随机提交到不同的分区才能实现多分区消费; 
自定义随机分区:

public class MyPartition implements Partitioner{private static Logger LOG = LoggerFactory.getLogger(MyPartition.class); @Overridepublic void configure(Map<String, ?> arg0) {// TODO Auto-generated method stub}@Overridepublic void close() {// TODO Auto-generated method stub}@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value,byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();int partitionNum = 0;try {partitionNum = Integer.parseInt((String) key);} catch (Exception e) {partitionNum = key.hashCode() ;}
//        System.out.println("kafkaMessage topic:"+ topic+" |key:"+ key+" |value:"+value);return Math.abs(partitionNum  % numPartitions);}
}  

然后在初始化kafka生产者配置的时候修改如下配置

props.put("partitioner.class", properties.getProperty("com.mykafka.MyPartition"));

这样就实现了kafka生产者随机分区提交数据。

2、消费者

最后一步就是消费者,修改单线程模式为多线程,这里的多线程实现方式有很多,本例知识用了最简单的固定线程模式:

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);for (int i = 0; i < 3; i++) {fixedThreadPool.execute(new Runnable() {@Overridepublic void run() {kafkaConsumerService.getInstance();}});}

在消费方面得注意,这里得遍历所有分区,否则还是只消费了一个区:

ConsumerRecords<String, String> records = consumer.poll(1000);for (TopicPartition partition : records.partitions()) {  List<ConsumerRecord<String, String>> partitionRecords = records  .records(partition); for (ConsumerRecord<String, String> record : partitionRecords) {System.out.println("message==>key:" + record.key() + " value:" + record.value() + " offset:" + record.offset()+ " 分区:" + record.partition());if (record.value() == null || record.key() == null) {consumer.commitSync();} else {// dealMessageKafkaServer.dealMessage(record.key(),record.value(),consumer);
//              consumer.commitSync();}}}

  注意上面的线程为啥只有3个,这里得跟上面kafka的分区个数相对应起来,否则如果线程超过分区数量,那么只会浪费线程,因为即使使用3个以上的线程也只会消费三个分区,而少了则无法消费完全。所以这里必须更上面的对应起来。

转载于:https://www.cnblogs.com/liuwei6/p/6905016.html

kafka 多线程消费相关推荐

  1. 正确处理kafka多线程消费的姿势

    最近项目开发过程使用kafka作为项目模块间负载转发器,实现实时接收不同产品线消息,分布式准实时消费产品线消息.通过kafka作为模块间的转换器,不仅有MQ的几大好处:异步. 解耦. 削峰等几大好处, ...

  2. kafka多线程消费

    1.zookeeper集群搭建:zookeeper安装以及使用_燕少༒江湖的博客-CSDN博客_zookeeper 2.kafka集群搭建:kafka集群搭建以及遇到的异常_燕少༒江湖的博客-CSDN ...

  3. 【Kafka笔记】5.Kafka 多线程消费消息

    Kafka多线程消费理解 Kafka Java Consumer设计 Kafka Java Consumer采用的是单线程的设计.其入口类KafkaConsumer是一个双线程的设计,即用户主线程和心 ...

  4. java kafka 多线程消费

    我们先来看下简单的kafka生产者和消费者模式代码: 生产者KafkaProducer /** * @author xiaofeng * @version V1.0 * @title: KafkaPr ...

  5. 几种kafka多线程消费方式

    kafka API   https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/Kafka ...

  6. Kafka Consumer多线程消费

    概述 OrdinaryConsumer类 ConsumerWorker.java MultiThreadedConsumer.java MultiThreadedRebalanceListener.j ...

  7. 【kafka】浅谈Kafka的多线程消费的设计

    1.概述 转载:浅谈Kafka的多线程消费的设计 看原文去... 一.前言 跟RabbitMQ相比,Kafka的分区机制(Partition)使其支持对同一个"队列"分片并行读取, ...

  8. 一看就会的kafka多线程顺序消费【内附Demo哦】

    Hello,这里是爱 Coding,爱 Hiphop,爱喝点小酒的 AKA 柏炎. Kafka是一个分布式的,支持多分区.多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于 ...

  9. kafka Java客户端之 consumer API 多线程消费消息

    kafka consumer 线程设计 Kafka Java Consumer采用的是单线程的设计.其入口类KafkaConsumer是一个双线程的设计,即用户主线程和心跳线程. 用户主线程,指的是启 ...

最新文章

  1. 用php循环星期一到星期日,php函数获取日期范围内的所有星期一
  2. 领域驱动设计实践(一)(转)
  3. 做创业者的老婆,一定要注意股权分配的3个坑
  4. 如何安装指定版本的 SAP Spartacus
  5. css修改select选择框option被选中的背景颜色_5个容易忽视的 CSS 属性
  6. 再添一所!华中科技大学成立人工智能与自动化学院
  7. SCPPO(二十):系统统一身份认证的改造之路
  8. 异常mongodb:Invalid BSON field name XXXXXX:YYYYY.zz
  9. Helm 3 完整教程(十六):Helm 函数讲解(10)版本语义化函数、URL函数、UUID函数
  10. vue实现php传数据,vue+props传递数据怎样实现
  11. 抽象代数基本概念(一):代数系
  12. 软件测试笔记本硬件,专业工作站软件测试_惠普笔记本电脑_笔记本评测-中关村在线...
  13. 3.Python标准库—math库的使用
  14. 2019年世界500强完整榜单,出炉!
  15. 办公小技巧,批量修改文件名,手把手教你
  16. 超市微信小程序怎么做_微信小程序便利店怎么开?便利店和百货超市怎么开发小程序?...
  17. 模型评估与改进(三)// 评估指标
  18. Hibernate查询Query By Criterial
  19. arcgis,裁剪投影不一致的矢量和栅格
  20. 固本培元之三:Convert、运算符、流程控制语句、ref/out/in三种参数类型

热门文章

  1. 总结django form
  2. 2018北大计算机复试线,2018年北京大学考研复试分数线已公布
  3. linux分区par,linux基础篇(磁盘分区)
  4. linux网络子系统研究:数据收发简略流程图
  5. 最优化读书笔记R(二)
  6. java构造方法 隐含三步_Java入门总结--------类和对象关系以及构造方法
  7. foundation-datepicker只能选年份_你喝的年份酒和原浆酒都怎么来的?
  8. 【kafka】浅谈Kafka的多线程消费的设计
  9. 【clickhouse】clickhouse UTC 时间带有时区 如何写入
  10. 【elasticsearch】 elasticsearch 写一致性