背景

由于项目上Flink在设置parallel多于1的情况下,job没法正确地获取watermark,所以周末来研究一下一部分,大概已经锁定了原因:
虽然我们的topic只设置了1的partition,但是Kafka的Comsumer还是起了好几个subtask去读索引是2、3的partition,然后这几个subtask的watermark一直不更新,导致我们job整体的watermark一直是Long.MIN_VALUE。现在需要去了解一下subtask获取partition的流程,等上班的时候debug一遍应该就可以知道原因。

翻源码的过程

通过log找到分配partition的大概位置

从图中可以看到,在org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase这个类中可以找到一些关键信息。

跟踪源码

往上翻翻,看有没有有用信息

关键源码,附上注释

 public void open(Configuration configuration) throws Exception {// determine the offset commit modethis.offsetCommitMode = OffsetCommitModes.fromConfiguration(getIsAutoCommitEnabled(),enableCommitOnCheckpoints,((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());// create the partition discovererthis.partitionDiscoverer = createPartitionDiscoverer(topicsDescriptor,getRuntimeContext().getIndexOfThisSubtask(),getRuntimeContext().getNumberOfParallelSubtasks());this.partitionDiscoverer.open();subscribedPartitionsToStartOffsets = new HashMap<>();// 重点函数,这个函数或获取到subtask的所有partition。final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();if (restoredState != null) {...} else {// use the partition discoverer to fetch the initial seed partitions,// and set their initial offsets depending on the startup mode.// for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;// for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily determined// when the partition is actually read.switch (startupMode) {...default:for (KafkaTopicPartition seedPartition : allPartitions) {subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());}}if (!subscribedPartitionsToStartOffsets.isEmpty()) {switch (startupMode) {...case GROUP_OFFSETS:LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",getRuntimeContext().getIndexOfThisSubtask(),subscribedPartitionsToStartOffsets.size(),subscribedPartitionsToStartOffsets.keySet());}} else {LOG.info("Consumer subtask {} initially has no partitions to read from.",getRuntimeContext().getIndexOfThisSubtask());}}public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {if (!closed && !wakeup) {try {List<KafkaTopicPartition> newDiscoveredPartitions;// (1) get all possible partitions, based on whether we are subscribed to fixed topics or a topic patternif (topicsDescriptor.isFixedTopics()) {// 对于没有使用通配符的topic,直接获取topic的所有partitionnewDiscoveredPartitions = getAllPartitionsForTopics(topicsDescriptor.getFixedTopics());} else {// 对于使用了通配符的topic, 先找到所有topic,再一一matchList<String> matchedTopics = getAllTopics();// retain topics that match the patternIterator<String> iter = matchedTopics.iterator();while (iter.hasNext()) {if (!topicsDescriptor.isMatchingTopic(iter.next())) {iter.remove();}}if (matchedTopics.size() != 0) {// get partitions only for matched topicsnewDiscoveredPartitions = getAllPartitionsForTopics(matchedTopics);} else {newDiscoveredPartitions = null;}}// (2) eliminate partition that are old partitions or should not be subscribed by this subtaskif (newDiscoveredPartitions == null || newDiscoveredPartitions.isEmpty()) {throw new RuntimeException("Unable to retrieve any partitions with KafkaTopicsDescriptor: " + topicsDescriptor);} else {Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();KafkaTopicPartition nextPartition;while (iter.hasNext()) {nextPartition = iter.next();// 只保留符合要求的partition,这就是我们要找的函数if (!setAndCheckDiscoveredPartition(nextPartition)) {iter.remove();}}}return newDiscoveredPartitions;}...}...}public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) {if (isUndiscoveredPartition(partition)) {discoveredPartitions.add(partition);// 在这return KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks) == indexOfThisSubtask;}return false;}public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {// 先算出此topic的hash(partition.getTopic().hashCode() * 31),这里不知道为什么不直接用hash,还要再*31,然后取正数(& 0x7FFFFFFF),最后获取到此topic的起始位置。int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;// here, the assumption is that the id of Kafka partitions are always ascending// starting from 0, and therefore can be used directly as the offset clockwise from the start index// 计算当前的partition应该属于哪个subtask。例如:一共有20个subtask,算出来的起始位置是5,partition是5,那么最后就是// (5 + 5) % 20 = 10, 这个partition应该分给10号subtask。return (startIndex + partition.getPartition()) % numParallelSubtasks;}

思考

某topic的每个partition会分给哪个subtask其实是确定的

topic名字是确定的 -> topic的hashCode是确定的 && subtask的数量是确定的 -> startIndex是确定的 -> 某partition会分给哪个subtask其实是确定的

为什么要算startIndex

大概是为了平均分配不同的topic,如果topic很多,每个topic都只从0开始,那么subtask 0,1,2之类的靠前subtask就需要读大量的partition。

Kafka源码研究--Comsumer获取partition下标相关推荐

  1. org.reflections 接口通过反射获取实现类源码研究

    org.reflections 接口通过反射获取实现类源码研究 版本 org.reflections reflections 0.9.12 Reflections通过扫描classpath,索引元数据 ...

  2. 4.2.10 Kafka源码剖析, 阅读环境搭建, broker启动流程, topic创建流程, Producer生产者流程, Consumer消费者流程,

    目录 4.1 Kafka源码剖析之源码阅读环境搭建 4.1.1 安装配置Gradle 4.1.2 Scala的安装和配置 4.1.3 Idea配置 4.1.4 源码操作 4.2 Kafka源码剖析之B ...

  3. 源码系列第1弹 | 带你快速攻略Kafka源码之旅入门篇

    大家过年好,我是 华仔, 又跟大家见面了. 从今天开始我将为大家奉上 Kafka 源码剖析系列文章,正式开启 「Kafka的源码之旅」,跟我一起来掌握 Kafka 源码核心架构设计思想吧. 今天这篇我 ...

  4. 跟我学Kafka源码Producer分析

    2019独角兽企业重金招聘Python工程师标准>>> 跟我学Kafka源码Producer分析 博客分类: MQ 本章主要讲解分析Kafka的Producer的业务逻辑,分发逻辑和 ...

  5. kafka源码分析之一server启动分析

    0. 关键概念 关键概念 Concepts Function Topic 用于划分Message的逻辑概念,一个Topic可以分布在多个Broker上. Partition 是Kafka中横向扩展和一 ...

  6. 【kafka】Kafka 源码解析:Group 协调管理机制

    1.概述 转载:Kafka 源码解析:Group 协调管理机制 在 Kafka 的设计中,消费者一般都有一个 group 的概念(当然,也存在不属于任何 group 的消费者),将多个消费者组织成一个 ...

  7. Kafka源码分析10:副本状态机ReplicaStateMachine详解 (图解+秒懂+史上最全)

    文章很长,建议收藏起来,慢慢读! Java 高并发 发烧友社群:疯狂创客圈 奉上以下珍贵的学习资源: 免费赠送 经典图书:<Java高并发核心编程(卷1)> 面试必备 + 大厂必备 +涨薪 ...

  8. 聊聊 Kafka:编译 Kafka 源码并搭建源码环境

    一.前言 老周这里编译 Kafka 的版本是 2.7,为啥采用这个版本来搭建源码的阅读环境呢?因为该版本相对来说比较新.而我为啥不用 2.7 后的版本呢?比如 2.8,这是因为去掉了 ZooKeepe ...

  9. kafka源码分析-consumer的分区策略

    kafka源码分析-consumer的分区策略 1.AbstractPartitionAssignor 2.RangeAssignor 3.RoundRobinAssignor 4.StickyAss ...

最新文章

  1. 2020-10-29Ubuntu20.04将软件添加至桌面
  2. Js控制样式的诸多方法
  3. MediaSession框架介绍
  4. 《研磨设计模式》chap20 享元模式 Flyweight (3)重写应用场景
  5. Spring boot错误处理原理
  6. 人人可以理解的区块链100问
  7. 怎么判断前轮左右的位置_新手开车技巧,确定前轮位置,准确判断与障碍物距离...
  8. bci测试如何整改_基于fNIRS技术的脑机接口(BCI)
  9. PostgreSQL高可用性、负载均衡、复制与集群方案介绍
  10. C++--第26课 - 异常处理 - 下
  11. mac废纸篓清空的心得、mac设置不睡眠不待机不锁屏、如何快速锁屏待机睡眠、mac重启、mac学习的必备软件-城...
  12. VS2019 配色_OPPO Enco M31颜值太顶了,斩获 A'设计大奖赛金奖,引领时尚潮流|oppo|大奖赛|无线耳机|配色|时尚|卡特...
  13. python编程招生海报_怎么用ps做招生海报
  14. 奥克兰大学计算机专业世界排名,奥克兰大学,15学科排名世界前50!
  15. python12306抢票_GitHub - versionzhang/python_12306: python 12306 抢票工具
  16. html新浪短域名api,新浪短网址API接口
  17. 27岁,30岁,37岁...... 你是否已经把世界拱手让人?
  18. java工程license机制_使用truelicense实现用于JAVA工程license机制(包括license生成和验证)...
  19. SPSS Modeler KNN分类器(第十七章)
  20. 基于VMware 的 hive安装与启动

热门文章

  1. kaggle 注册无法激活的问题解决
  2. The word is not correctly spelled
  3. 利用python+zabbix查询服务器利用率
  4. leetcode——第322题——零钱兑换
  5. 《UE4蓝图完全学习》笔记
  6. android 自动更换壁纸,安卓壁纸如何设置自动更换壁纸-手机天堂
  7. 魔兽最多人的服务器,魔兽世界9.0人口最多的服务器_魔兽世界
  8. ! Some Android licenses not accepted. To resolve this, run: flutter doctor --android-licenses
  9. 实验三+161+张丽霞
  10. 今天凌晨十二点和明天凌晨十二点的时间戳