目录

1 默认分区

1.1 键key的作用

1.2 键的分区

2 生产者自定义分区

2.1 使用场景分析

2.2 自定义分区器要实现Partitioner接口

2.3 生产者使用分区器

3 消费者自定义分区

3.1 默认的分区策略

3.2 自定义分区策略

3.3 消费者使用自定义策略


1 默认分区

1.1 键key的作用

  1. 决定消息在主题的哪个分区
  2. 作为消息的附加信息

1.2 键的分区

如果key=null,并且采用默认分区器,就会轮询均匀分布在各个分区

如果key不为null,使用默认分区,会计算散列值,所以同一个key每次都会落到同一个分区上;如果增加了分区,就无法保证落到同一个分区上了

2 生产者自定义分区

2.1 使用场景分析

比如电商服务,大城市的业务量明显比中小城市高,可以单独为大城市自定义分区处理

2.2 自定义分区器要实现Partitioner接口

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;import java.util.List;
import java.util.Map;/*** 自定义分区器** @author honry.guan* @date 2021-05-07 9:21*/
public class MyPartitioner implements Partitioner {/*** 自定义分区方法*/@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitionInfos = cluster.availablePartitionsForTopic(topic);//分区数量int num = partitionInfos.size();//根据value与分区数求余的方式得到分区IDreturn value.hashCode() % num;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}

2.3 生产者使用分区器

package cn.enjoyedu.selfpartition;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;/*** MyPartitionerProducer** @author honry.guan* @date 2021-05-07 9:51*/
public class MyPartitionerProducer {public static void main(String[] args) {Properties properties = new Properties();//配置连接ip和地址properties.put("bootstrap.servers", "127.0.0.1:9092");//kafka自带序列化器,可以不用谢全类路径StringSerializer.class也可以,这里作为演示properties.put("key.serializer", StringSerializer.class);properties.put("value.serializer", StringSerializer.class);properties.put("partitioner.class", MyPartitioner.class);KafkaProducer<String, String> producer = new KafkaProducer<>(properties);try {//使用自定义分区器ProducerRecord<String, String> producerRecord = new ProducerRecord<>("my-hello", "name", "tom");Future<RecordMetadata> send = producer.send(producerRecord);//这里会阻塞,直到发送成功RecordMetadata recordMetadata = send.get();if (recordMetadata != null) {System.out.println("偏移量:" + recordMetadata.offset() + "-" + "分区:" + recordMetadata.partition());}} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();} finally {//关闭连接producer.close();}}
}

3 消费者自定义分区

3.1 默认的分区策略

partition.assignment.strategy 分区分配给消费者的策略。系统提供两种策略。默认为 Range。允许自定义策略。
分区有1,2,3,4,消费者有a,b
  1. Range 把主题的连续分区分配给消费者。(如果分区数量无法被消费者整除、第一个消费者会分到更多分区) :a管分区1,2,b管分区3,4
  2. RoundRobin 把主题的分区循环分配给消费者:a管分区1,3,b管分区2,4

3.2 自定义分区策略

以下是复制的RoundRobinAssignor对象中的实现方法

import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.CircularIterator;
import org.apache.kafka.common.utils.Utils;import java.util.*;/*** @author: honry.guan* @create: 2021-05-07 21:52**/
public class MyCustomerPartitioner extends AbstractPartitionAssignor {/**** @param partitionsPerTopic 所订阅的每个 topic 与其 partition 数的对应关系* @param subscriptions 每个 consumerId 与其所订阅的 topic 列表的关系。* @return*/@Overridepublic Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, Subscription> subscriptions) {Map<String, List<TopicPartition>> assignment = new HashMap<>();for (String memberId : subscriptions.keySet())assignment.put(memberId, new ArrayList<>());CircularIterator<String> assigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet()));for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {final String topic = partition.topic();while (!subscriptions.get(assigner.peek()).topics().contains(topic))assigner.next();assignment.get(assigner.next()).add(partition);}return assignment;}public List<TopicPartition> allPartitionsSorted(Map<String, Integer> partitionsPerTopic,Map<String, Subscription> subscriptions) {SortedSet<String> topics = new TreeSet<>();for (Subscription subscription : subscriptions.values())topics.addAll(subscription.topics());List<TopicPartition> allPartitions = new ArrayList<>();for (String topic : topics) {Integer numPartitionsForTopic = partitionsPerTopic.get(topic);if (numPartitionsForTopic != null)allPartitions.addAll(AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic));}return allPartitions;}@Overridepublic String name() {return null;}
}

3.3 消费者使用自定义策略

public class HelloKafkaConsumer {public static void main(String[] args) {Properties properties = new Properties();properties.put("bootstrap.servers","127.0.0.1:9092");properties.put("key.deserializer", StringDeserializer.class);properties.put("value.deserializer", StringDeserializer.class);//使用自定义分区策略properties.put("partition.assignment.strategy", MyCustomerPartitioner.class);//群组properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);try {//消费者订阅主题(可以多个)consumer.subscribe(Collections.singletonList(BusiConst.HELLO_TOPIC));while(true){//TODO 拉取(新版本)ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));for(ConsumerRecord<String, String> record:records){System.out.println(String.format("topic:%s,分区:%d,偏移量:%d," + "key:%s,value:%s",record.topic(),record.partition(),record.offset(),record.key(),record.value()));//do my work//打包任务投入线程池}}} finally {consumer.close();}}}

kafka自定义生产者分区器、自定义消费者分区器相关推荐

  1. Kafka生产者和消费者分区策略部分源码解析

    之前我在看其他的博客时,发现对于kafka consumer的RoundRobin的缺点分析中,有两种观点,一种认为缺点在于如果消费者组中消费者消费的主题不同,或者消费者线程数不同,那么会造成消费者消 ...

  2. 手撸kafka producer生产者的分区器(partition)API

    简介:本篇博客是对kafka produce 生产者分区器的API(Java) 包含以下内容:分区使用原则,分区器使用原则,分区器相关代码编写及pom.xml配置文件编写,到最后的运行结果. 目录标题 ...

  3. kafka中生产者是如何把消息投递到哪个分区的?消费者又是怎么选择分区的?...

    作者 | 废物大师兄 来源 | https://www.cnblogs.com/cjsblog/p/9664536.html 1. 前言 我们知道,生产者发送消息到主题,消费者订阅主题(以消费者组的名 ...

  4. kafka中生产者和消费者的分区问题

    本文来书说下kafka中生产者和消费者的分区问题 文章目录 概述 主题的分区数设置 分区与生产者 分区与消费者 range roundrobin(轮询) 本文参考 本文小结 概述 我们知道,生产者发送 ...

  5. kafka内置分区及自定义分区

    内置分区策略 1.如果指定的partition,那么直接进入该partition 2.如果没有指定partition,但是指定了key,使用key的hash选择partition 3.如果既没有指定p ...

  6. flink keyby、shuffle、 rebalance、rescale、 broadcast、global、自定义分区算子以及各分区器源码

    文章目录 前言 1. 随机分区 2. 轮询分区 3. 重缩放分区 4. 广播 5. 全局分区 6. 自定义分区 前言   flink中keyBy是一种按照键的哈希值来进行重新分区的操作,至于分区是否均 ...

  7. kafka消费者分区消费策略

    前言 在上一篇,我们谈到了从生产者一端,kafka是基于何种策略,将消息推送到集群下topic的不同分区中,可以使用kafka自带的分区策略,也可以根据自身的业务定制消息推送的分区策略 而从消费者一端 ...

  8. 【转载】CodeWarrior IDE使用tips之prm链接文件详解(自定义存储器分区以及自定义RAM数据初始化与在RAM中运行函数)...

    CodeWarrior IDE使用tips之prm链接文件详解(自定义存储器分区以及自定义RAM数据初始化与在RAM中运行函数) 2017-08-19 胡恩伟 汽车电子expert成长之路 内容提要 ...

  9. 深入分析Kafka架构(三):消费者消费方式、三种分区分配策略、offset维护

    本文目录 一.前言 二.消费者消费方式 三.分区分配策略 3.1.分配分区的前提条件 3.2.Range分配策略 3.3.RoundRobin分配策略 3.4.Sticky分配策略 四.offset维 ...

最新文章

  1. php面积计算html代码,计算PHP页面中的所有HTML标记
  2. Nature Methods:宏基因组物种组成分析工具MetaPhlAn2
  3. python open文件安全隐患_python的其他安全隐患
  4. gym 101858
  5. 前端小知识点(1):undefined和null区别
  6. node、npm、vue安装 -- VUE 项目 demo 实例
  7. Moment.js常见用法总结 1
  8. Linux学习笔记十三——文件压缩、解压缩和归档
  9. 翻译|How to Export a Connected Component
  10. stl之map 排序
  11. 【中秋福利】Linux系统从入门到精通推荐的书籍——中秋限时送书活动
  12. 视频怎么加水印上去,视频加水印怎么加?
  13. CPU GPU 扫盲帖
  14. yolov3原理+训练损失
  15. 影驰gtx960显卡怎么样_影驰gtx960 2g_影驰gtx960 2g跑分
  16. ExtJS中Grid分页
  17. 最新法定假日修改及2008年法定假日安排
  18. AJAX技术(第一篇博客)
  19. 基于Babel对JS代码进行混淆与还原操作
  20. Servlet生命周期与Web容器架构及处理请求详解

热门文章

  1. jQuery判断email地址 邮箱地址 email regex
  2. 51自学网java壁虎_我要自学网JAVA基础4-26日历补充壁虎老师的完整代码
  3. 【游戏建模全流程】ZBrush生物模型雕刻教程:豹纹壁虎
  4. 求解三维空间中两向量之间的夹角
  5. Java多线程(超详解)
  6. 《Python 源码剖析》一些理解以及勘误笔记(3)
  7. 用户注册后是如何进行激活的,为什么需要激活
  8. 一个典型的微型计算机绘图系统,机械制图考试理论知识练习题
  9. Html5浪漫结婚请柬婚礼网站模板❤_爱她就给她最美的H5婚礼请柬_(婚庆电子邀请函)含背景音乐...
  10. go语言-空结构体/ chan struct{}