Kafka源码研究--Comsumer获取partition下标
背景
由于项目上Flink在设置parallel多于1的情况下,job没法正确地获取watermark,所以周末来研究一下一部分,大概已经锁定了原因:
虽然我们的topic只设置了1的partition,但是Kafka的Comsumer还是起了好几个subtask去读索引是2、3的partition,然后这几个subtask的watermark一直不更新,导致我们job整体的watermark一直是Long.MIN_VALUE。现在需要去了解一下subtask获取partition的流程,等上班的时候debug一遍应该就可以知道原因。
翻源码的过程
通过log找到分配partition的大概位置
从图中可以看到,在org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
这个类中可以找到一些关键信息。
跟踪源码
往上翻翻,看有没有有用信息
关键源码,附上注释
public void open(Configuration configuration) throws Exception {// determine the offset commit modethis.offsetCommitMode = OffsetCommitModes.fromConfiguration(getIsAutoCommitEnabled(),enableCommitOnCheckpoints,((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());// create the partition discovererthis.partitionDiscoverer = createPartitionDiscoverer(topicsDescriptor,getRuntimeContext().getIndexOfThisSubtask(),getRuntimeContext().getNumberOfParallelSubtasks());this.partitionDiscoverer.open();subscribedPartitionsToStartOffsets = new HashMap<>();// 重点函数,这个函数或获取到subtask的所有partition。final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();if (restoredState != null) {...} else {// use the partition discoverer to fetch the initial seed partitions,// and set their initial offsets depending on the startup mode.// for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;// for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily determined// when the partition is actually read.switch (startupMode) {...default:for (KafkaTopicPartition seedPartition : allPartitions) {subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());}}if (!subscribedPartitionsToStartOffsets.isEmpty()) {switch (startupMode) {...case GROUP_OFFSETS:LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",getRuntimeContext().getIndexOfThisSubtask(),subscribedPartitionsToStartOffsets.size(),subscribedPartitionsToStartOffsets.keySet());}} else {LOG.info("Consumer subtask {} initially has no partitions to read from.",getRuntimeContext().getIndexOfThisSubtask());}}public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {if (!closed && !wakeup) {try {List<KafkaTopicPartition> newDiscoveredPartitions;// (1) get all possible partitions, based on whether we are subscribed to fixed topics or a topic patternif (topicsDescriptor.isFixedTopics()) {// 对于没有使用通配符的topic,直接获取topic的所有partitionnewDiscoveredPartitions = getAllPartitionsForTopics(topicsDescriptor.getFixedTopics());} else {// 对于使用了通配符的topic, 先找到所有topic,再一一matchList<String> matchedTopics = getAllTopics();// retain topics that match the patternIterator<String> iter = matchedTopics.iterator();while (iter.hasNext()) {if (!topicsDescriptor.isMatchingTopic(iter.next())) {iter.remove();}}if (matchedTopics.size() != 0) {// get partitions only for matched topicsnewDiscoveredPartitions = getAllPartitionsForTopics(matchedTopics);} else {newDiscoveredPartitions = null;}}// (2) eliminate partition that are old partitions or should not be subscribed by this subtaskif (newDiscoveredPartitions == null || newDiscoveredPartitions.isEmpty()) {throw new RuntimeException("Unable to retrieve any partitions with KafkaTopicsDescriptor: " + topicsDescriptor);} else {Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();KafkaTopicPartition nextPartition;while (iter.hasNext()) {nextPartition = iter.next();// 只保留符合要求的partition,这就是我们要找的函数if (!setAndCheckDiscoveredPartition(nextPartition)) {iter.remove();}}}return newDiscoveredPartitions;}...}...}public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) {if (isUndiscoveredPartition(partition)) {discoveredPartitions.add(partition);// 在这return KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks) == indexOfThisSubtask;}return false;}public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {// 先算出此topic的hash(partition.getTopic().hashCode() * 31),这里不知道为什么不直接用hash,还要再*31,然后取正数(& 0x7FFFFFFF),最后获取到此topic的起始位置。int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;// here, the assumption is that the id of Kafka partitions are always ascending// starting from 0, and therefore can be used directly as the offset clockwise from the start index// 计算当前的partition应该属于哪个subtask。例如:一共有20个subtask,算出来的起始位置是5,partition是5,那么最后就是// (5 + 5) % 20 = 10, 这个partition应该分给10号subtask。return (startIndex + partition.getPartition()) % numParallelSubtasks;}
思考
某topic的每个partition会分给哪个subtask其实是确定的
topic名字是确定的 -> topic的hashCode是确定的 && subtask的数量是确定的 -> startIndex是确定的 -> 某partition会分给哪个subtask其实是确定的
为什么要算startIndex
大概是为了平均分配不同的topic,如果topic很多,每个topic都只从0开始,那么subtask 0,1,2之类的靠前subtask就需要读大量的partition。
Kafka源码研究--Comsumer获取partition下标相关推荐
- org.reflections 接口通过反射获取实现类源码研究
org.reflections 接口通过反射获取实现类源码研究 版本 org.reflections reflections 0.9.12 Reflections通过扫描classpath,索引元数据 ...
- 4.2.10 Kafka源码剖析, 阅读环境搭建, broker启动流程, topic创建流程, Producer生产者流程, Consumer消费者流程,
目录 4.1 Kafka源码剖析之源码阅读环境搭建 4.1.1 安装配置Gradle 4.1.2 Scala的安装和配置 4.1.3 Idea配置 4.1.4 源码操作 4.2 Kafka源码剖析之B ...
- 源码系列第1弹 | 带你快速攻略Kafka源码之旅入门篇
大家过年好,我是 华仔, 又跟大家见面了. 从今天开始我将为大家奉上 Kafka 源码剖析系列文章,正式开启 「Kafka的源码之旅」,跟我一起来掌握 Kafka 源码核心架构设计思想吧. 今天这篇我 ...
- 跟我学Kafka源码Producer分析
2019独角兽企业重金招聘Python工程师标准>>> 跟我学Kafka源码Producer分析 博客分类: MQ 本章主要讲解分析Kafka的Producer的业务逻辑,分发逻辑和 ...
- kafka源码分析之一server启动分析
0. 关键概念 关键概念 Concepts Function Topic 用于划分Message的逻辑概念,一个Topic可以分布在多个Broker上. Partition 是Kafka中横向扩展和一 ...
- 【kafka】Kafka 源码解析:Group 协调管理机制
1.概述 转载:Kafka 源码解析:Group 协调管理机制 在 Kafka 的设计中,消费者一般都有一个 group 的概念(当然,也存在不属于任何 group 的消费者),将多个消费者组织成一个 ...
- Kafka源码分析10:副本状态机ReplicaStateMachine详解 (图解+秒懂+史上最全)
文章很长,建议收藏起来,慢慢读! Java 高并发 发烧友社群:疯狂创客圈 奉上以下珍贵的学习资源: 免费赠送 经典图书:<Java高并发核心编程(卷1)> 面试必备 + 大厂必备 +涨薪 ...
- 聊聊 Kafka:编译 Kafka 源码并搭建源码环境
一.前言 老周这里编译 Kafka 的版本是 2.7,为啥采用这个版本来搭建源码的阅读环境呢?因为该版本相对来说比较新.而我为啥不用 2.7 后的版本呢?比如 2.8,这是因为去掉了 ZooKeepe ...
- kafka源码分析-consumer的分区策略
kafka源码分析-consumer的分区策略 1.AbstractPartitionAssignor 2.RangeAssignor 3.RoundRobinAssignor 4.StickyAss ...
最新文章
- 2020-10-29Ubuntu20.04将软件添加至桌面
- Js控制样式的诸多方法
- MediaSession框架介绍
- 《研磨设计模式》chap20 享元模式 Flyweight (3)重写应用场景
- Spring boot错误处理原理
- 人人可以理解的区块链100问
- 怎么判断前轮左右的位置_新手开车技巧,确定前轮位置,准确判断与障碍物距离...
- bci测试如何整改_基于fNIRS技术的脑机接口(BCI)
- PostgreSQL高可用性、负载均衡、复制与集群方案介绍
- C++--第26课 - 异常处理 - 下
- mac废纸篓清空的心得、mac设置不睡眠不待机不锁屏、如何快速锁屏待机睡眠、mac重启、mac学习的必备软件-城...
- VS2019 配色_OPPO Enco M31颜值太顶了,斩获 A'设计大奖赛金奖,引领时尚潮流|oppo|大奖赛|无线耳机|配色|时尚|卡特...
- python编程招生海报_怎么用ps做招生海报
- 奥克兰大学计算机专业世界排名,奥克兰大学,15学科排名世界前50!
- python12306抢票_GitHub - versionzhang/python_12306: python 12306 抢票工具
- html新浪短域名api,新浪短网址API接口
- 27岁,30岁,37岁...... 你是否已经把世界拱手让人?
- java工程license机制_使用truelicense实现用于JAVA工程license机制(包括license生成和验证)...
- SPSS Modeler KNN分类器(第十七章)
- 基于VMware 的 hive安装与启动
热门文章
- kaggle 注册无法激活的问题解决
- The word is not correctly spelled
- 利用python+zabbix查询服务器利用率
- leetcode——第322题——零钱兑换
- 《UE4蓝图完全学习》笔记
- android 自动更换壁纸,安卓壁纸如何设置自动更换壁纸-手机天堂
- 魔兽最多人的服务器,魔兽世界9.0人口最多的服务器_魔兽世界
- ! Some Android licenses not accepted. To resolve this, run: flutter doctor --android-licenses
- 实验三+161+张丽霞
- 今天凌晨十二点和明天凌晨十二点的时间戳