kafka自定义生产者分区器、自定义消费者分区器
目录
1 默认分区
1.1 键key的作用
1.2 键的分区
2 生产者自定义分区
2.1 使用场景分析
2.2 自定义分区器要实现Partitioner接口
2.3 生产者使用分区器
3 消费者自定义分区
3.1 默认的分区策略
3.2 自定义分区策略
3.3 消费者使用自定义策略
1 默认分区
1.1 键key的作用
- 决定消息在主题的哪个分区
- 作为消息的附加信息
1.2 键的分区
如果key=null,并且采用默认分区器,就会轮询均匀分布在各个分区
如果key不为null,使用默认分区,会计算散列值,所以同一个key每次都会落到同一个分区上;如果增加了分区,就无法保证落到同一个分区上了
2 生产者自定义分区
2.1 使用场景分析
比如电商服务,大城市的业务量明显比中小城市高,可以单独为大城市自定义分区处理
2.2 自定义分区器要实现Partitioner接口
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;import java.util.List;
import java.util.Map;/*** 自定义分区器** @author honry.guan* @date 2021-05-07 9:21*/
public class MyPartitioner implements Partitioner {/*** 自定义分区方法*/@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitionInfos = cluster.availablePartitionsForTopic(topic);//分区数量int num = partitionInfos.size();//根据value与分区数求余的方式得到分区IDreturn value.hashCode() % num;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}
2.3 生产者使用分区器
package cn.enjoyedu.selfpartition;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;/*** MyPartitionerProducer** @author honry.guan* @date 2021-05-07 9:51*/
public class MyPartitionerProducer {public static void main(String[] args) {Properties properties = new Properties();//配置连接ip和地址properties.put("bootstrap.servers", "127.0.0.1:9092");//kafka自带序列化器,可以不用谢全类路径StringSerializer.class也可以,这里作为演示properties.put("key.serializer", StringSerializer.class);properties.put("value.serializer", StringSerializer.class);properties.put("partitioner.class", MyPartitioner.class);KafkaProducer<String, String> producer = new KafkaProducer<>(properties);try {//使用自定义分区器ProducerRecord<String, String> producerRecord = new ProducerRecord<>("my-hello", "name", "tom");Future<RecordMetadata> send = producer.send(producerRecord);//这里会阻塞,直到发送成功RecordMetadata recordMetadata = send.get();if (recordMetadata != null) {System.out.println("偏移量:" + recordMetadata.offset() + "-" + "分区:" + recordMetadata.partition());}} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();} finally {//关闭连接producer.close();}}
}
3 消费者自定义分区
3.1 默认的分区策略
- Range 把主题的连续分区分配给消费者。(如果分区数量无法被消费者整除、第一个消费者会分到更多分区) :a管分区1,2,b管分区3,4
- RoundRobin 把主题的分区循环分配给消费者:a管分区1,3,b管分区2,4
3.2 自定义分区策略
以下是复制的RoundRobinAssignor对象中的实现方法
import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.CircularIterator;
import org.apache.kafka.common.utils.Utils;import java.util.*;/*** @author: honry.guan* @create: 2021-05-07 21:52**/
public class MyCustomerPartitioner extends AbstractPartitionAssignor {/**** @param partitionsPerTopic 所订阅的每个 topic 与其 partition 数的对应关系* @param subscriptions 每个 consumerId 与其所订阅的 topic 列表的关系。* @return*/@Overridepublic Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, Subscription> subscriptions) {Map<String, List<TopicPartition>> assignment = new HashMap<>();for (String memberId : subscriptions.keySet())assignment.put(memberId, new ArrayList<>());CircularIterator<String> assigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet()));for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {final String topic = partition.topic();while (!subscriptions.get(assigner.peek()).topics().contains(topic))assigner.next();assignment.get(assigner.next()).add(partition);}return assignment;}public List<TopicPartition> allPartitionsSorted(Map<String, Integer> partitionsPerTopic,Map<String, Subscription> subscriptions) {SortedSet<String> topics = new TreeSet<>();for (Subscription subscription : subscriptions.values())topics.addAll(subscription.topics());List<TopicPartition> allPartitions = new ArrayList<>();for (String topic : topics) {Integer numPartitionsForTopic = partitionsPerTopic.get(topic);if (numPartitionsForTopic != null)allPartitions.addAll(AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic));}return allPartitions;}@Overridepublic String name() {return null;}
}
3.3 消费者使用自定义策略
public class HelloKafkaConsumer {public static void main(String[] args) {Properties properties = new Properties();properties.put("bootstrap.servers","127.0.0.1:9092");properties.put("key.deserializer", StringDeserializer.class);properties.put("value.deserializer", StringDeserializer.class);//使用自定义分区策略properties.put("partition.assignment.strategy", MyCustomerPartitioner.class);//群组properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);try {//消费者订阅主题(可以多个)consumer.subscribe(Collections.singletonList(BusiConst.HELLO_TOPIC));while(true){//TODO 拉取(新版本)ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));for(ConsumerRecord<String, String> record:records){System.out.println(String.format("topic:%s,分区:%d,偏移量:%d," + "key:%s,value:%s",record.topic(),record.partition(),record.offset(),record.key(),record.value()));//do my work//打包任务投入线程池}}} finally {consumer.close();}}}
kafka自定义生产者分区器、自定义消费者分区器相关推荐
- Kafka生产者和消费者分区策略部分源码解析
之前我在看其他的博客时,发现对于kafka consumer的RoundRobin的缺点分析中,有两种观点,一种认为缺点在于如果消费者组中消费者消费的主题不同,或者消费者线程数不同,那么会造成消费者消 ...
- 手撸kafka producer生产者的分区器(partition)API
简介:本篇博客是对kafka produce 生产者分区器的API(Java) 包含以下内容:分区使用原则,分区器使用原则,分区器相关代码编写及pom.xml配置文件编写,到最后的运行结果. 目录标题 ...
- kafka中生产者是如何把消息投递到哪个分区的?消费者又是怎么选择分区的?...
作者 | 废物大师兄 来源 | https://www.cnblogs.com/cjsblog/p/9664536.html 1. 前言 我们知道,生产者发送消息到主题,消费者订阅主题(以消费者组的名 ...
- kafka中生产者和消费者的分区问题
本文来书说下kafka中生产者和消费者的分区问题 文章目录 概述 主题的分区数设置 分区与生产者 分区与消费者 range roundrobin(轮询) 本文参考 本文小结 概述 我们知道,生产者发送 ...
- kafka内置分区及自定义分区
内置分区策略 1.如果指定的partition,那么直接进入该partition 2.如果没有指定partition,但是指定了key,使用key的hash选择partition 3.如果既没有指定p ...
- flink keyby、shuffle、 rebalance、rescale、 broadcast、global、自定义分区算子以及各分区器源码
文章目录 前言 1. 随机分区 2. 轮询分区 3. 重缩放分区 4. 广播 5. 全局分区 6. 自定义分区 前言 flink中keyBy是一种按照键的哈希值来进行重新分区的操作,至于分区是否均 ...
- kafka消费者分区消费策略
前言 在上一篇,我们谈到了从生产者一端,kafka是基于何种策略,将消息推送到集群下topic的不同分区中,可以使用kafka自带的分区策略,也可以根据自身的业务定制消息推送的分区策略 而从消费者一端 ...
- 【转载】CodeWarrior IDE使用tips之prm链接文件详解(自定义存储器分区以及自定义RAM数据初始化与在RAM中运行函数)...
CodeWarrior IDE使用tips之prm链接文件详解(自定义存储器分区以及自定义RAM数据初始化与在RAM中运行函数) 2017-08-19 胡恩伟 汽车电子expert成长之路 内容提要 ...
- 深入分析Kafka架构(三):消费者消费方式、三种分区分配策略、offset维护
本文目录 一.前言 二.消费者消费方式 三.分区分配策略 3.1.分配分区的前提条件 3.2.Range分配策略 3.3.RoundRobin分配策略 3.4.Sticky分配策略 四.offset维 ...
最新文章
- php面积计算html代码,计算PHP页面中的所有HTML标记
- Nature Methods:宏基因组物种组成分析工具MetaPhlAn2
- python open文件安全隐患_python的其他安全隐患
- gym 101858
- 前端小知识点(1):undefined和null区别
- node、npm、vue安装 -- VUE 项目 demo 实例
- Moment.js常见用法总结 1
- Linux学习笔记十三——文件压缩、解压缩和归档
- 翻译|How to Export a Connected Component
- stl之map 排序
- 【中秋福利】Linux系统从入门到精通推荐的书籍——中秋限时送书活动
- 视频怎么加水印上去,视频加水印怎么加?
- CPU GPU 扫盲帖
- yolov3原理+训练损失
- 影驰gtx960显卡怎么样_影驰gtx960 2g_影驰gtx960 2g跑分
- ExtJS中Grid分页
- 最新法定假日修改及2008年法定假日安排
- AJAX技术(第一篇博客)
- 基于Babel对JS代码进行混淆与还原操作
- Servlet生命周期与Web容器架构及处理请求详解
热门文章
- jQuery判断email地址 邮箱地址 email regex
- 51自学网java壁虎_我要自学网JAVA基础4-26日历补充壁虎老师的完整代码
- 【游戏建模全流程】ZBrush生物模型雕刻教程:豹纹壁虎
- 求解三维空间中两向量之间的夹角
- Java多线程(超详解)
- 《Python 源码剖析》一些理解以及勘误笔记(3)
- 用户注册后是如何进行激活的,为什么需要激活
- 一个典型的微型计算机绘图系统,机械制图考试理论知识练习题
- Html5浪漫结婚请柬婚礼网站模板❤_爱她就给她最美的H5婚礼请柬_(婚庆电子邀请函)含背景音乐...
- go语言-空结构体/ chan struct{}