首先确保Topic的元数据可用,否则消息根本没法往外发。如果以前从没加载过Topic元数据,就会在doSend发送消息时调用waitOnMetadata方法在此同步阻塞住,等待连接Broker成功后拉取元数据。

max.block.ms决定了调用send()方法时,最多会被阻塞多长时间。send在一些异常情况下:

  • 拉取Topic元数据,连接不到Broker
  • 数据放到内存缓冲区,但内存缓冲区已经满了
    经历这段时间后,就必须得返回了。

首先会判断目标Topic的元数据是否已经被加载过了。如果没有,就会把该Topic添加到Set中去。

        //目标Topic是否已经被缓存了元数据,如果没有就会被放到Set<String>中if (!this.metadata.containsTopic(topic))this.metadata.add(topic);

因为KafkaProducer初始化并没有真正拉取,现在又是send的开始,所以这个Set必然是空的。add是说明这个Topic我拉取、缓存过

判断目标Topic下属的分区Map是否为null,来判断是否已经缓存过元数据了:

        //得到的目标Topic对应的所有分区 --> Map<String, List<PartitionInfo>>//说明目标Topic下有分区,即该Topic的元数据信息已经缓存过了if (metadata.fetch().partitionsForTopic(topic) != null)//return 0表示无需等待return 0;

很明显由于未拉取过元数据,所以Topic下属的分区Map是空的。这就进入到while了。首先会获取当前集群元数据的版本号:

//当前集群元数据的版本号
int version = metadata.requestUpdate();

然后唤醒Sender线程,解除NetworkClient的阻塞状态:

//唤醒Sender线程,即解除NetworkClient的阻塞状态,待会要异步通信拉取了
sender.wakeup();

接下来我们必须等待awaitUpdate()方法执行完毕,即集群元数据的版本号已经“+1”了,才能认为元数据已经加载成功了。

    public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {if (maxWaitMs < 0) {throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");}long begin = System.currentTimeMillis();long remainingWaitMs = maxWaitMs;//当前版本号 < 上一次的version,就进入循环//如果版本号已经累加了,就会跳出while循环while (this.version <= lastVersion) {if (remainingWaitMs != 0)//客户端释放锁,尝试等待60s,同步阻塞wait(remainingWaitMs);//wait被唤醒后,能计算出阻塞了多长时间    long elapsed = System.currentTimeMillis() - begin;//如果实际阻塞时间超过了60s,就会抛出超时的异常if (elapsed >= maxWaitMs)throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");//如果阻塞没超时,就算出剩余时间remainingWaitMs = maxWaitMs - elapsed;}}

真正的拉取元数据,是在这里完成的。唤醒Sender线程后,进入while循环,直接去wait()释放锁,并让客户端最多等待60s。Sender线程的拉取,是异步方式。但是对拉取结果,却是同步阻塞。如果拉取成功了,version版本号必定会+1.所以此时只需要判断version是否累加,就能判定元数据是否被异步拉取成功。客户端主线程最多wait阻塞等待60s。

拉取结果分两种情况:

  • Sender线程在60s内就把Topic元数据加载到了,也已经缓存到Metadata中了、更新了version了。此时一定会尝试将当初wait阻塞等待的线程唤醒,让主线程直接返回阻塞等待的时长
  • wait(60s)都超时了,Sender线程还没把元数据拉取回来。人家会在60s后自动醒来,且抛出一个超时异常
            long elapsed = time.milliseconds() - begin;//判断阻塞等待时间是否超过了规定设置的60sif (elapsed >= maxWaitMs)throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");if (metadata.fetch().unauthorizedTopics().contains(topic))throw new TopicAuthorizationException(topic);//60s - 拉取元数据的耗时remainingWaitMs = maxWaitMs - elapsed;

拉取成功后,如果没超时,就要返回阻塞等待的时间。

//返回阻塞的时间
return time.milliseconds() - begin;

然后waitOnMetadata方法执行完毕,得到的就是为了获取元数据而花费的阻塞等待时间:

//等待获取元数据
long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);

如果拉取元数据超时了,awaitUpdate方法就抛出超时异常,自然就终结掉本方法的while循环。上层的waitOnMetadata方法接收到超时异常后,就会中断掉它的while。异常通过InterruptedException抛出,上层的doSend方法的try catch就会捕获到,我们就能得知。

总结:

  • 想让Sender线程加载哪个Topic的元数据,就将它add到Metadata的Set中
  • 唤醒Sender线程前,会获取元数据的version、并更新标志位(this.needUpdate = true)。就是为了让Sender直接按照标志位,拉取对应Topic的元数据
  • 元数据的拉取是异步的,但是客户端对拉取结果的等待却是同步阻塞的。
  • 拉取元数据的两种情况,不管超不超时,当初wait的主线程都会被唤醒。未超时就返回阻塞时长,超时就抛出异常

Kafka如何对Topic元数据进行细粒度的懒加载、同步等待?相关推荐

  1. hbase遇到元数据缺失,hbase:namespace加载错误,hbase启动不了

    ####背景#### hbase-2.0.0-alpha4 jdk1.8.0_151 zookeeper-3.4.9 ####问题描述#### 因为某种原因,已经部署好的环境,突然报错 2017-12 ...

  2. kafka监听topic消费_分布式专题|最近一直死磕kafka设计原理,都肝吐了

    kafka架构图 kafka核心控制器 定义 在kafka集群中,会选举出一个broker作为控制器(controller),负责管理集群中所有的分区和副本的状态: 职责 监听broker变化,通过监 ...

  3. kafka监听topic消费_大白话 + 13 张图解 Kafka

    前言 一.Kafka基础 消息系统的作用 1.Topic 主题 2.Partition 分区 3.Producer - 生产者 4.Consumer - 消费者 5.Message - 消息 二.ka ...

  4. 使用Java API创建(create),查看(describe),列举(list),删除(delete)Kafka主题(Topic)--转载...

    原文:http://blog.csdn.net/changong28/article/details/39325079 使用Kafka的同学都知道,我们每次创建Kafka主题(Topic)的时候可以指 ...

  5. Kafka解析之topic创建(3)——合法性验证

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

  6. Kafka解析之topic创建(2)

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

  7. kafka彻底删除topic(cleanup policy)、清除特定topic数据

    一.彻底不需要该topic了 假设topic为test-0921 步骤如下: ①$KAFKA/bin/kafka-topics.sh --delete --zookeeper Desktop:2181 ...

  8. kafka练习:创建topic以及生产与消费

    1.java和scala版本的kafka adminclient去创建topic主题? scala版本 import java.util import java.util.Properties imp ...

  9. 8.解析Kafka中的 Topic 和 Partition

    目录 1.什么是Topic 2.什么是Partition 3.Consumer Group 消费者组 4.Topic 和 Partition 的存储 5.producer消息分发策略 6.消费者如何消 ...

最新文章

  1. 基于OpenCV的实战:轮廓检测(附代码解析)
  2. 访谈Brad Fitzpatrick——《编程人生》精彩样章,抢先看
  3. python输出n阶矩阵_python-递归计算矩阵(nxn)的行列式
  4. Android开发者必备的42个链接
  5. 课程设计完成之后要考虑的问题
  6. ubuntu 虚拟机 串口 socket_上篇 | 虚拟机Ubuntu向开发板AMR传送文件
  7. linux下查看硬盘信息、硬盘分区、格式化、挂载、及swap分区
  8. Leecode刷题热题HOT100(13)——罗马数字转整数
  9. PL/SQL中的RSA加密
  10. 每日记载内容总结14
  11. 性能测试中的jvm监控
  12. 指定的可执行文件不是此操作系统平台的有效应用程序_.NET Core 应用程序发布概述
  13. Eclipse中修改项目的文本字符集编码
  14. django mysql 登陆界面_django 简单实现登录验证给你 Django用户登录验证跳转问题
  15. 打开visio后屏幕会不停的抖动是怎么回事
  16. 大学计算机应用基础教学设计,大学计算机应用基础电子教案设计.doc
  17. DM6437 GPIO模拟I2C
  18. 计算机管理格式化硬盘,细说电脑怎么格式化硬盘
  19. Java 获取主机ip地址(ipv4)
  20. 异次元发卡系统源码荔枝发卡V3.0

热门文章

  1. python中.whl文件下载,pandas
  2. linux 命令之 kill
  3. 防火墙 之 iptables 匹配条件讲解
  4. python之初接触
  5. Differentiation 导数和变化率
  6. Android CursorAdapter
  7. Java获取当前路径和读取文件
  8. AD恢复(3)使用AD回收站
  9. GroovyQ | 关注Groovy社区动态,分享Groovy开发经验。
  10. MOSS2007 webcast系列(二)