接上文:
1.【Kafka分区分配策略(1)——RangeAssignor】
2.【Kafka分区分配策略(2)——RoundRobinAssignor和StickyAssignor】


欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。

欢迎跳转到本文的原文链接:https://honeypps.com/mq/kafka-partitions-allocation-strategy-3-self-definition/


自定义分区分配策略

读者不仅可以任意选用Kafka所提供的3种分配策略,还可以自定义分配策略来实现更多可选的功能。自定义的分配策略必须要实现org.apache.kafka.clients.consumer.internals.PartitionAssignor接口。PartitionAssignor接口的定义如下:

Subscription subscription(Set<String> topics);
String name();
Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions);
void onAssignment(Assignment assignment);
class Subscription {private final List<String> topics;private final ByteBuffer userData;
(省略若干方法……)
}
class Assignment {private final List<TopicPartition> partitions;private final ByteBuffer userData;
(省略若干方法……)
}

PartitionAssignor接口中定义了两个内部类:Subscription和Assignment。

Subscription类用来表示消费者的订阅信息,类中有两个属性:topics和userData,分别表示消费者所订阅topic列表和用户自定义信息。PartitionAssignor接口通过subscription()方法来设置消费者自身相关的Subscription信息,注意到此方法中只有一个参数topics,与Subscription类中的topics的相互呼应,但是并没有有关userData的参数体现。为了增强用户对分配结果的控制,可以在subscription()方法内部添加一些影响分配的用户自定义信息赋予userData,比如:权重、ip地址、host或者机架(rack)等等。

举例,在subscription()这个方法中提供机架信息,标识此消费者所部署的机架位置,在分区分配时可以根据分区的leader副本所在的机架位置来实施具体的分配,这样可以让消费者与所需拉取消息的broker节点处于同一机架。参考下图,消费者consumer1和broker1都部署在机架rack1上,消费者consumer2和broker2都部署在机架rack2上。如果分区的分配不是机架感知的,那么有可能与图(上部分)中的分配结果一样,consumer1消费broker2中的分区,而consumer2消费broker1中的分区;如果分区的分配是机架感知的,那么就会出现图(下部分)的分配结果,consumer1消费broker1中的分区,而consumer2消费broker2中的分区,这样相比于前一种情形而言,既可以减少消费延迟又可以减少跨机架带宽的占用。

再来说一下Assignment类,它是用来表示分配结果信息的,类中也有两个属性:partitions和userData,分别表示所分配到的分区集合和用户自定义的数据。可以通过PartitionAssignor接口中的onAssignment()方法是在每个消费者收到消费组leader分配结果时的回调函数,例如在StickyAssignor策略中就是通过这个方法保存当前的分配方案,以备在下次消费组再平衡(rebalance)时可以提供分配参考依据。

接口中的name()方法用来提供分配策略的名称,对于Kafka提供的3种分配策略而言,RangeAssignor对应的protocol_name为“range”,RoundRobinAssignor对应的protocol_name为“roundrobin”,StickyAssignor对应的protocol_name为“sticky”,所以自定义的分配策略中要注意命名的时候不要与已存在的分配策略发生冲突。这个命名用来标识分配策略的名称,在后面所描述的加入消费组以及选举消费组leader的时候会有涉及。

真正的分区分配方案的实现是在assign()方法中,方法中的参数metadata表示集群的元数据信息,而subscriptions表示消费组内各个消费者成员的订阅信息,最终方法返回各个消费者的分配信息。

Kafka中还提供了一个抽象类org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor,它可以简化PartitionAssignor接口的实现,对assign()方法进行了实现,其中会将Subscription中的userData信息去掉后,在进行分配。Kafka提供的3种分配策略都是继承自这个抽象类。如果开发人员在自定义分区分配策略时需要使用userData信息来控制分区分配的结果,那么就不能直接继承AbstractPartitionAssignor这个抽象类,而需要直接实现PartitionAssignor接口。

下面笔者参考Kafka中的RangeAssignor策略来自定义一个随机的分配策略,这里笔者称之为RandomAssignor,具体代码实现如下:

package org.apache.kafka.clients.consumer;import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.TopicPartition;
import java.util.*;/*** Created by 朱小厮 on 2018/7/12. * 欢迎关注笔者的微信公众号:朱小厮的博客.*/
public class RandomAssignor extends AbstractPartitionAssignor {@Overridepublic String name() {return "random";}@Overridepublic Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,Map<String, Subscription> subscriptions) {Map<String, List<String>> consumersPerTopic =
consumersPerTopic(subscriptions);Map<String, List<TopicPartition>> assignment = new HashMap<>();for (String memberId : subscriptions.keySet()) {assignment.put(memberId, new ArrayList<>());}// 针对每一个topic进行分区分配for (Map.Entry<String, List<String>> topicEntry :
consumersPerTopic.entrySet()) {String topic = topicEntry.getKey();List<String> consumersForTopic = topicEntry.getValue();int consumerSize = consumersForTopic.size();Integer numPartitionsForTopic = partitionsPerTopic.get(topic);if (numPartitionsForTopic == null) {continue;}// 当前topic下的所有分区List<TopicPartition> partitions =
AbstractPartitionAssignor.partitions(topic,
numPartitionsForTopic);// 将每个分区随机分配给一个消费者for (TopicPartition partition : partitions) {int rand = new Random().nextInt(consumerSize);String randomConsumer = consumersForTopic.get(rand);assignment.get(randomConsumer).add(partition);}}return assignment;}// 获取每个topic所对应的消费者列表,即:[topic, List[consumer]]private Map<String, List<String>> consumersPerTopic(
Map<String, Subscription> consumerMetadata) {Map<String, List<String>> res = new HashMap<>();for (Map.Entry<String, Subscription> subscriptionEntry :
consumerMetadata.entrySet()) {String consumerId = subscriptionEntry.getKey();for (String topic : subscriptionEntry.getValue().topics())put(res, topic, consumerId);}return res;}
}

在使用时,消费者客户端需要添加相应的Properties参数,示例如下:

properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
RandomAssignor.class.getName());

这里只是演示如何自定义实现一个分区分配策略,RandomAssignor的实现并不是特别的理想,并不见得会比Kafka自身所提供的RangeAssignor策略之类的要好。

欢迎跳转到本文的原文链接:https://honeypps.com/mq/kafka-partitions-allocation-strategy-3-self-definition/


欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。


Kafka分区分配策略(3)——自定义分区分配策略相关推荐

  1. kafka模拟生产-消费者以及自定义分区

    2019独角兽企业重金招聘Python工程师标准>>> 基本概念 kafka中的重要角色   broker:一台kafka服务器就是一个broker,一个集群可有多个broker,一 ...

  2. 21,spark sql 测试 : 1.4G 文件实战,测试耗时多少,先分区,再在分区内计算,用列内容分区( 这是一个很魔幻的问题 ),自定义分区

    一 ,常规问题 : 1 ,表关联,数据过滤 : sql select stock.area,goods.smallLei,goods.typeColorId, weekofyear(to_date(s ...

  3. Kafka分区分配策略(4)——分配的实施

    接上文: 1.[Kafka分区分配策略(1)--RangeAssignor] 2.[Kafka分区分配策略(2)--RoundRobinAssignor和StickyAssignor] 3.[Kafk ...

  4. kafka 重新分配节点_Kafka控制器-分区重分配

    分区重分配指的是将分区的副本重新分配到不同的代理节点上.如果ZK节点中分区的副本的新副本集合和当前分区副本集合相同,这个分区就不需要重新分配了. 分区重分配是通过监听ZK的 /admin/reassi ...

  5. kafka分区及副本在broker的分配

    kafka分区及副本在broker的分配 @(KAFKA)[kafka, 大数据] 部分内容参考自:http://blog.csdn.net/lizhitao/article/details/4177 ...

  6. kafka查看broker上主副本_kafka分区及副本在broker的分配

    kafka分区及副本在broker的分配 以下以一个Kafka集群中4个Broker举例,创建1个topic包括4个Partition,2 Replication:数据Producer流动如图所看到的 ...

  7. kafka源码分析-consumer的分区策略

    kafka源码分析-consumer的分区策略 1.AbstractPartitionAssignor 2.RangeAssignor 3.RoundRobinAssignor 4.StickyAss ...

  8. 分区起始位置参数溢出_Kafka分区副本分配解析

    重新梳理了 assignReplicasToBrokersRackUnaware() 和 assignReplicasToBrokersRackAware() 两个方法的思路.如有错误或不当欢迎指出. ...

  9. kafka自定义分区实战

    本文来说下kafka自定义分区相关的知识与内容,同时说下springboot整合kafka如何来实现自定义分区 文章目录 Kafka如何实现分区 Kafka集群是如何知道投递到哪个broker中 默认 ...

最新文章

  1. labview实现简单的图片显示
  2. addslashes 及 其他 清除空格的方法是不安全的
  3. 网络:HTTP状态码
  4. 三、前端开发-CSS
  5. 面试中遇到过的闭包~
  6. Extjs checkbox 多删除
  7. art-template入门(一)之介绍
  8. mysql数据表数据丢失6_MYSQL数据表损坏的原因分析和修复方法小结
  9. magic_quotes_gpc合magic_quotes_runtime的区别!
  10. requestAnimationFrame 优化Web动画
  11. Mysql慢查询定位和优化实践分享
  12. clion连接mysql,使用 CLion 调试 mysql 源码
  13. drupal简体中文语言包安装方法
  14. 京瓷打印机m5521cdn_京瓷m5521cdn驱动
  15. 数据结构之图(九)——拓补排序
  16. adjoint-io bulletpoofs 性能测试结果
  17. 机器学习-推荐系统中基于深度学习的混合协同过滤模型
  18. udp接受_电脑网络基础知识:用户数据报协议(UDP)的学习
  19. Ubuntu20.04英文系统无法安装中文智能拼音输入法
  20. 认同和确定性矩阵(Ralph Stacey's Agreement and Certainty Matrix)-译

热门文章

  1. Spring WebApplicationContext
  2. 【报错笔记】使用MultipartFile 出现异常:java.lang.ClassNotFoundException: org.apache.commons.fileupload...
  3. 【报错笔记】做struts项目建立jsp文件老是报错
  4. React 和 Vue的特点
  5. 使用python实现人脸检测转载
  6. 从零开始撸一个Fresco之内存缓存
  7. iOS App 启动性能优化
  8. 关于web爬虫的tips
  9. 使用Struts 2框架实现文件下载
  10. structs2多文件上传