先抛几个简单问题,1问, 4个topic,每个topic 5个分区,问并行度10 ,这个并行度是怎么划分这些topic 分区的。2问,topic 分区 动态更新怎么做的。3问,就1问中的tm 是怎么产生的?

省流版,先总结。Flink 中kafka 作为Source源头,首先会开始一个SourceCoordinator来与Kafka联系获取所有topic分区,同时兼顾新增tp(topic parition)的检测,在根据并行度,根据一个规则(等下用源码展示这个规则)来切分tp,然后,Flink 在并行度work中会开启SourceOperator,并向SourceCoordinator发送注册请求,要求获取split tp,用来后续的消费kafka数据。其中会创建KafkaSourceReader,该对象主要是用来创建KafkaPartitionSplitReader,以及SplitState的管理。而KafkaPartitionSplitReader就是最实际用来与Kafka建立consumer来消费数据的。

先贴上规则,见以下代码,可以明显看出,同一个topic,同属于一个startIndex,但是会根据不同的partion,又被切分到不同的地方。
那就可以回答问题1,同一个topic的5个分区要平分到并行度为10的work内,如果,该topic的startIndex=0;那么这5个分区依次分到0、1、2、3、4 的work上,进行数据获取。所以,这个并不会存在一个KafkaConsumer 消费 同一个topic的5个分区,但是有可能存在 一个KafkaConsumer 消费不同topic的不同分区,因为startIndex是不定的,partion会重叠划分到相同work上。如果,并行度<5,才会出现同一个KafkaConsumer 消费同一个topic的不同分区。这里有个前提是不同的work 代表不同的KafkaConsumer,这是肯定的,因为不同的work,就意味的并行度。

static int getSplitOwner(TopicPartition tp, int numReaders) {int startIndex = ((tp.topic().hashCode() * 31) & 0x7FFFFFFF) % numReaders;// here, the assumption is that the id of Kafka partitions are always ascending// starting from 0, and therefore can be used directly as the offset clockwise from the// start indexreturn (startIndex + tp.partition()) % numReaders;}

接着回答问题二,新的分区等 动态更新是从哪里来,从SourceCoordinator来,这才是大脑。当然也是需要work 做协同配合的工作。
从代码中就可以看出partitionDiscoveryIntervalMs,这个参数,就是如果其>0,那就定时和kafka 联系看看是否有新增partition 等。

 public void start() {consumer = getKafkaConsumer();adminClient = getKafkaAdminClient();if (partitionDiscoveryIntervalMs > 0) {LOG.info("Starting the KafkaSourceEnumerator for consumer group {} "+ "with partition discovery interval of {} ms.",consumerGroupId,partitionDiscoveryIntervalMs);//关键在这里,如果partitionDiscoveryIntervalMs>0,这里就是一个定时服务了。context.callAsync(// 这里是发现多少个分区,并且进行划分this::discoverAndInitializePartitionSplit,// 这里就是要操作了this::handlePartitionSplitChanges,0,partitionDiscoveryIntervalMs);} else {LOG.info("Starting the KafkaSourceEnumerator for consumer group {} "+ "without periodic partition discovery.",consumerGroupId);context.callAsync(() -> {try {return discoverAndInitializePartitionSplit();} finally {// Close the admin client early because we won't use it anymore.adminClient.close();}},this::handlePartitionSplitChanges);}}

问题三,watermark的产生,如果一个work上,有不同分区存在,那么该watermark 怎么产生,从源码揭示,根据分区来,再从分区中选出最小的watermark,作为这个work上的watermark,然后broadcast 到下游。

接下来就是源码部分了,这部分针对喜欢看源码的读者。前提,这里也只会拎主线,不会全贴代码。

第一个SourceCoordinator
SourceCoordinator 运行在JobMaster 上,可以和其它work进行通信,同时开启KafkaSourceEnumerator

再来细看KafkaSourceEnumerator重点功能方法,首先要获取所有的新增topic partitions ( 简写:tp ),然后在将tp根据 上述代码规划,进行切分,最后就是根据注册过来的work(work个数对应并行度个数),将切分信息rpc到对应的work上。

// 发现并初始化分区分割private PartitionSplitChange discoverAndInitializePartitionSplit() {// Make a copy of the partitions to owners// 通过订阅获取所有tpKafkaSubscriber.PartitionChange partitionChange =subscriber.getPartitionChanges(adminClient, Collections.unmodifiableSet(discoveredPartitions));//分别获取分区、offsetSet<TopicPartition> newPartitions =   Collections.unmodifiableSet(partitionChange.getNewPartitions());OffsetsInitializer.PartitionOffsetsRetriever offsetsRetriever = getOffsetsRetriever();Map<TopicPartition, Long> startingOffsets =startingOffsetInitializer.getPartitionOffsets(newPartitions, offsetsRetriever);Map<TopicPartition, Long> stoppingOffsets =stoppingOffsetInitializer.getPartitionOffsets(newPartitions, offsetsRetriever);Set<KafkaPartitionSplit> partitionSplits = new HashSet<>(newPartitions.size());for (TopicPartition tp : newPartitions) {Long startingOffset = startingOffsets.get(tp);long stoppingOffset =stoppingOffsets.getOrDefault(tp, KafkaPartitionSplit.NO_STOPPING_OFFSET);partitionSplits.add(new KafkaPartitionSplit(tp, startingOffset, stoppingOffset));}discoveredPartitions.addAll(newPartitions);return new PartitionSplitChange(partitionSplits, partitionChange.getRemovedPartitions());}// This method should only be invoked in the coordinator executor thread.private void handlePartitionSplitChanges(PartitionSplitChange partitionSplitChange, Throwable t) {if (t != null) {throw new FlinkRuntimeException("Failed to handle partition splits change due to ", t);}if (partitionDiscoveryIntervalMs < 0) {LOG.debug("Partition discovery is disabled.");noMoreNewPartitionSplits = true;}// TODO: Handle removed partitions.addPartitionSplitChangeToPendingAssignments(partitionSplitChange.newPartitionSplits);assignPendingPartitionSplits(context.registeredReaders().keySet());}// This method should only be invoked in the coordinator executor thread.private void addPartitionSplitChangeToPendingAssignments(Collection<KafkaPartitionSplit> newPartitionSplits) {int numReaders = context.currentParallelism();for (KafkaPartitionSplit split : newPartitionSplits) {int ownerReader = getSplitOwner(split.getTopicPartition(), numReaders);//这部分就是根据注册的work,来切分tp,pendingPartitionSplitAssignment.computeIfAbsent(ownerReader, r -> new HashSet<>()).add(split);}LOG.debug("Assigned {} to {} readers of consumer group {}.",newPartitionSplits,numReaders,consumerGroupId);}// This method should only be invoked in the coordinator executor thread.private void assignPendingPartitionSplits(Set<Integer> pendingReaders) {Map<Integer, List<KafkaPartitionSplit>> incrementalAssignment = new HashMap<>();// Check if there's any pending splits for given readersfor (int pendingReader : pendingReaders) {checkReaderRegistered(pendingReader);// Remove pending assignment for the readerfinal Set<KafkaPartitionSplit> pendingAssignmentForReader =pendingPartitionSplitAssignment.remove(pendingReader);if (pendingAssignmentForReader != null && !pendingAssignmentForReader.isEmpty()) {// Put pending assignment into incremental assignmentincrementalAssignment.computeIfAbsent(pendingReader, (ignored) -> new ArrayList<>()).addAll(pendingAssignmentForReader);// Make pending partitions as already assignedpendingAssignmentForReader.forEach(split -> assignedPartitions.add(split.getTopicPartition()));}}// Assign pending splits to readersif (!incrementalAssignment.isEmpty()) {LOG.info("Assigning splits to readers {}", incrementalAssignment);//这里是关键,这里代表着将切分好的分区rpc到对应的work上context.assignSplits(new SplitsAssignment<>(incrementalAssignment));}// If periodically partition discovery is disabled and the initializing discovery has done,// signal NoMoreSplitsEvent to pending readersif (noMoreNewPartitionSplits) {LOG.debug("No more KafkaPartitionSplits to assign. Sending NoMoreSplitsEvent to reader {}"+ " in consumer group {}.",pendingReaders,consumerGroupId);pendingReaders.forEach(context::signalNoMoreSplits);}}
//context.assignSplits(...)调用关键代码。assignment.assignment().forEach((id, splits) -> {final OperatorCoordinator.SubtaskGateway gateway =getGatewayAndCheckReady(id);final AddSplitEvent<SplitT> addSplitEvent;try {addSplitEvent =new AddSplitEvent<>(splits, splitSerializer);} catch (IOException e) {throw new FlinkRuntimeException("Failed to serialize splits.", e);}gateway.sendEvent(addSplitEvent);});return null;},String.format("Failed to assign splits %s due to ", assignment));

好了,今天先到这,下次再些work 部分。

Flink kafka connectors 源码详解---<1>相关推荐

  1. 【Live555】live555源码详解(九):ServerMediaSession、ServerMediaSubsession、live555MediaServer

    [Live555]live555源码详解系列笔记 继承协作关系图 下面红色表示本博客将要介绍的三个类所在的位置: ServerMediaSession.ServerMediaSubsession.Dy ...

  2. 【Live555】live555源码详解系列笔记

    [Live555]liveMedia下载.配置.编译.安装.基本概念 [Live555]live555源码详解(一):BasicUsageEnvironment.UsageEnvironment [L ...

  3. 【Live555】live555源码详解(八):testRTSPClient

    [Live555]live555源码详解系列笔记 继承协作关系图 下面红色表示本博客将要介绍的testRTSPClient实现的三个类所在的位置: ourRTSPClient.StreamClient ...

  4. 【Live555】live555源码详解(七):GenericMediaServer、RTSPServer、RTSPClient

    [Live555]live555源码详解系列笔记 继承协作关系图 下面红色表示本博客将要介绍的三个类所在的位置: GenericMediaServer.RTSPServer.RTSPClient 14 ...

  5. 【Live555】live555源码详解(六):FramedSource、RTPSource、RTPSink

    [Live555]live555源码详解系列笔记 继承协作关系图 下面红色表示本博客将要介绍的三个类所在的位置: FramedSource.RTPSource.RTPSink 11.FramedSou ...

  6. 【Live555】live555源码详解(五):MediaSource、MediaSink、MediaSession、MediaSubsession

    [Live555]live555源码详解系列笔记 继承协作关系图 下面红色表示本博客将要介绍的四个类所在的位置: MediaSource.MediaSink.MediaSession.MediaSub ...

  7. 【Live555】live555源码详解(四):Medium媒体基础类

    [Live555]live555源码详解系列笔记 7.Media Medai所依赖关系图 依赖Medai关系图 Media和UsageEnvironment关联图

  8. 【Live555】live555源码详解(二):BasicHashTable、DelayQueue、HandlerSet

    [Live555]live555源码详解系列笔记 3.BasicHashTable 哈希表 协作图: 3.1 BasicHashTable BasicHashTable 继承自 HashTable 重 ...

  9. 【Live555】live555源码详解(一):BasicUsageEnvironment、UsageEnvironment

    [Live555]live555源码详解系列笔记 类关系图 1.UsageEnvironment 详解 1.1 BasicUsageEnvironment BasicUsageEnvironment ...

最新文章

  1. 【动态规划】最长公共子序列与最长公共子串
  2. XSS学习-初出茅庐
  3. arm云教室服务器_成都凌点科技告诉你ARM集群服务器适合的应用场景有哪些
  4. CentOS 5升级Python版本(2.42.7)
  5. div自动滚动_实现图片自动和手动切换的编程技巧
  6. 导出测试点的信号名_小程序导出数据到excel表,借助云开发云函数实现excel数据的保存...
  7. 第一百一十二期:96秒100亿!如何抗住双11高并发流量?
  8. php 提取二维数组的key,PHP 获取二维数组中某个key的集合
  9. c++中的构造函数和析构函数
  10. 【转】设计模式六大原则(1):单一职责原则
  11. Acme CAD ConverterDWG文件查看器 2021
  12. ARINC 429总线协议解析
  13. Linux 计划任务crontab详解,含笔试题讲解
  14. MacOS解压rar文件
  15. 【Vue 4 笔记 】(一)
  16. Java技术进阶推荐书单
  17. Android Studio开发工具的设置
  18. 水文气象学数据可视化——Panoply软件的下载
  19. R语言计算logistic回归C指数,最详细的基于R语言的Logistic Regression(Logistic回归)源码,包括拟合优度,Recall,Precision的计算...
  20. outlook邮件搜索方法与技巧

热门文章

  1. 读《被讨厌的勇气》有感
  2. 学习笔记STM32F429使用编码器测速HAL库版本
  3. 两种网页转Markdown的简便方法
  4. 全国省市数据 sql语句+json格式数据
  5. HTML绘制交互图,基于 HTML5 Canvas 的交互式地铁线路图
  6. 大数据开发工程师学习路线分享
  7. 计算机文化节闭幕式祝福语,快讯 | 第十三届计算机文化节闭幕式暨专家讲座圆满落幕...
  8. 数字电路硬件设计系列(五)之AT89C51/C52最小系统设计
  9. Android Matrix基本原理方法
  10. 一套效果图适配(Android和IOS)全尺寸和标注规范-(三)(360x640)