Kafka如何对Topic元数据进行细粒度的懒加载、同步等待?
首先确保Topic的元数据可用,否则消息根本没法往外发。如果以前从没加载过Topic元数据,就会在doSend发送消息时调用waitOnMetadata方法在此同步阻塞住,等待连接Broker成功后拉取元数据。
max.block.ms决定了调用send()方法时,最多会被阻塞多长时间。send在一些异常情况下:
- 拉取Topic元数据,连接不到Broker
- 数据放到内存缓冲区,但内存缓冲区已经满了
经历这段时间后,就必须得返回了。
首先会判断目标Topic的元数据是否已经被加载过了。如果没有,就会把该Topic添加到Set中去。
//目标Topic是否已经被缓存了元数据,如果没有就会被放到Set<String>中if (!this.metadata.containsTopic(topic))this.metadata.add(topic);
因为KafkaProducer初始化并没有真正拉取,现在又是send的开始,所以这个Set必然是空的。add是说明这个Topic我拉取、缓存过
判断目标Topic下属的分区Map是否为null,来判断是否已经缓存过元数据了:
//得到的目标Topic对应的所有分区 --> Map<String, List<PartitionInfo>>//说明目标Topic下有分区,即该Topic的元数据信息已经缓存过了if (metadata.fetch().partitionsForTopic(topic) != null)//return 0表示无需等待return 0;
很明显由于未拉取过元数据,所以Topic下属的分区Map是空的。这就进入到while了。首先会获取当前集群元数据的版本号:
//当前集群元数据的版本号
int version = metadata.requestUpdate();
然后唤醒Sender线程,解除NetworkClient的阻塞状态:
//唤醒Sender线程,即解除NetworkClient的阻塞状态,待会要异步通信拉取了
sender.wakeup();
接下来我们必须等待awaitUpdate()方法执行完毕,即集群元数据的版本号已经“+1”了,才能认为元数据已经加载成功了。
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {if (maxWaitMs < 0) {throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");}long begin = System.currentTimeMillis();long remainingWaitMs = maxWaitMs;//当前版本号 < 上一次的version,就进入循环//如果版本号已经累加了,就会跳出while循环while (this.version <= lastVersion) {if (remainingWaitMs != 0)//客户端释放锁,尝试等待60s,同步阻塞wait(remainingWaitMs);//wait被唤醒后,能计算出阻塞了多长时间 long elapsed = System.currentTimeMillis() - begin;//如果实际阻塞时间超过了60s,就会抛出超时的异常if (elapsed >= maxWaitMs)throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");//如果阻塞没超时,就算出剩余时间remainingWaitMs = maxWaitMs - elapsed;}}
真正的拉取元数据,是在这里完成的。唤醒Sender线程后,进入while循环,直接去wait()释放锁,并让客户端最多等待60s。Sender线程的拉取,是异步方式。但是对拉取结果,却是同步阻塞。如果拉取成功了,version版本号必定会+1.所以此时只需要判断version是否累加,就能判定元数据是否被异步拉取成功。客户端主线程最多wait阻塞等待60s。
拉取结果分两种情况:
- Sender线程在60s内就把Topic元数据加载到了,也已经缓存到Metadata中了、更新了version了。此时一定会尝试将当初wait阻塞等待的线程唤醒,让主线程直接返回阻塞等待的时长
- wait(60s)都超时了,Sender线程还没把元数据拉取回来。人家会在60s后自动醒来,且抛出一个超时异常
long elapsed = time.milliseconds() - begin;//判断阻塞等待时间是否超过了规定设置的60sif (elapsed >= maxWaitMs)throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");if (metadata.fetch().unauthorizedTopics().contains(topic))throw new TopicAuthorizationException(topic);//60s - 拉取元数据的耗时remainingWaitMs = maxWaitMs - elapsed;
拉取成功后,如果没超时,就要返回阻塞等待的时间。
//返回阻塞的时间
return time.milliseconds() - begin;
然后waitOnMetadata方法执行完毕,得到的就是为了获取元数据而花费的阻塞等待时间:
//等待获取元数据
long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
如果拉取元数据超时了,awaitUpdate方法就抛出超时异常,自然就终结掉本方法的while循环。上层的waitOnMetadata方法接收到超时异常后,就会中断掉它的while。异常通过InterruptedException抛出,上层的doSend方法的try catch就会捕获到,我们就能得知。
总结:
- 想让Sender线程加载哪个Topic的元数据,就将它add到Metadata的Set中
- 唤醒Sender线程前,会获取元数据的version、并更新标志位(this.needUpdate = true)。就是为了让Sender直接按照标志位,拉取对应Topic的元数据
- 元数据的拉取是异步的,但是客户端对拉取结果的等待却是同步阻塞的。
- 拉取元数据的两种情况,不管超不超时,当初wait的主线程都会被唤醒。未超时就返回阻塞时长,超时就抛出异常
Kafka如何对Topic元数据进行细粒度的懒加载、同步等待?相关推荐
- hbase遇到元数据缺失,hbase:namespace加载错误,hbase启动不了
####背景#### hbase-2.0.0-alpha4 jdk1.8.0_151 zookeeper-3.4.9 ####问题描述#### 因为某种原因,已经部署好的环境,突然报错 2017-12 ...
- kafka监听topic消费_分布式专题|最近一直死磕kafka设计原理,都肝吐了
kafka架构图 kafka核心控制器 定义 在kafka集群中,会选举出一个broker作为控制器(controller),负责管理集群中所有的分区和副本的状态: 职责 监听broker变化,通过监 ...
- kafka监听topic消费_大白话 + 13 张图解 Kafka
前言 一.Kafka基础 消息系统的作用 1.Topic 主题 2.Partition 分区 3.Producer - 生产者 4.Consumer - 消费者 5.Message - 消息 二.ka ...
- 使用Java API创建(create),查看(describe),列举(list),删除(delete)Kafka主题(Topic)--转载...
原文:http://blog.csdn.net/changong28/article/details/39325079 使用Kafka的同学都知道,我们每次创建Kafka主题(Topic)的时候可以指 ...
- Kafka解析之topic创建(3)——合法性验证
欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...
- Kafka解析之topic创建(2)
欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...
- kafka彻底删除topic(cleanup policy)、清除特定topic数据
一.彻底不需要该topic了 假设topic为test-0921 步骤如下: ①$KAFKA/bin/kafka-topics.sh --delete --zookeeper Desktop:2181 ...
- kafka练习:创建topic以及生产与消费
1.java和scala版本的kafka adminclient去创建topic主题? scala版本 import java.util import java.util.Properties imp ...
- 8.解析Kafka中的 Topic 和 Partition
目录 1.什么是Topic 2.什么是Partition 3.Consumer Group 消费者组 4.Topic 和 Partition 的存储 5.producer消息分发策略 6.消费者如何消 ...
最新文章
- 基于OpenCV的实战:轮廓检测(附代码解析)
- 访谈Brad Fitzpatrick——《编程人生》精彩样章,抢先看
- python输出n阶矩阵_python-递归计算矩阵(nxn)的行列式
- Android开发者必备的42个链接
- 课程设计完成之后要考虑的问题
- ubuntu 虚拟机 串口 socket_上篇 | 虚拟机Ubuntu向开发板AMR传送文件
- linux下查看硬盘信息、硬盘分区、格式化、挂载、及swap分区
- Leecode刷题热题HOT100(13)——罗马数字转整数
- PL/SQL中的RSA加密
- 每日记载内容总结14
- 性能测试中的jvm监控
- 指定的可执行文件不是此操作系统平台的有效应用程序_.NET Core 应用程序发布概述
- Eclipse中修改项目的文本字符集编码
- django mysql 登陆界面_django 简单实现登录验证给你 Django用户登录验证跳转问题
- 打开visio后屏幕会不停的抖动是怎么回事
- 大学计算机应用基础教学设计,大学计算机应用基础电子教案设计.doc
- DM6437 GPIO模拟I2C
- 计算机管理格式化硬盘,细说电脑怎么格式化硬盘
- Java 获取主机ip地址(ipv4)
- 异次元发卡系统源码荔枝发卡V3.0