为什么80%的码农都做不了架构师?>>>   

##NioSelector和KafkaSelector有什么区别?

先说结论,KafkaSelector(org.apache.kafka.common.network.selector)是对NioSelector(java.nio.channels.Selector)的进一步封装。回想一下NioSelector,它参与了IO中的哪些过程?

1、创建一个通道,并将通道注册到NioSelector上,我们可以得到一个SelectionKey 2、轮询NioSelector中的ready集合,拿到对应的SelectionKey,并根据这个SelectionKey所关注的事件去执行对应的操作

实际上,KafkaSelector也是在调用NioSelector去执行这些操作,待补充……

##一、创建连接

KafkaSelector创建连接,和普通的NioSelector并没有什么不同,首先创建一个通道,并将其设置为非阻塞式的长连接,设置完毕后,执行连接操作。

        SocketChannel socketChannel = SocketChannel.open();socketChannel.configureBlocking(false);// 非阻塞模式Socket socket = socketChannel.socket();socket.setKeepAlive(true);// 设置为长连接if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) {socket.setSendBufferSize(sendBufferSize);// 设置SO_SNDBUF 大小}if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) {socket.setReceiveBufferSize(receiveBufferSize);// 设置 SO_RCVBUF 大小}socket.setTcpNoDelay(true);boolean connected;try {connected = socketChannel.connect(address);// 因为是非阻塞模式,所以方法可能会在连接正式连接之前返回} catch (UnresolvedAddressException e) {socketChannel.close();throw new IOException("Can't resolve address: " + address, e);} catch (IOException e) {socketChannel.close();throw e;}

创建完通道后,将其注册到NioSelector上,并关注OP_CONNECT,再以节点Id,SelectionKey来创建KafkaChannel,这里先不详细说明KafkaChannel,它是对通道的进一步封装。在创建完KafkaChannel后,将KafkaChannel与SelectionKey、节点ID做进一步绑定。

        SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);// 将当前这个socketChannel注册到nioSelector上,并关注OP_CONNECT事件KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);// 创建KafkaChannelkey.attach(channel);// 将channel绑定到key上this.channels.put(id, channel);// 将 nodeId 和 Channel绑定

这样有一个好处,首先KafkaChannel中包含了节点ID与SelectionKey,而我们也可以根据节点ID来拿到KafkaChannel,同样可以根据SelectionKey来拿到KafkaChannel,这就意味着,我们只要拿到了KafkaChannel、SelectionKey、节点ID中的任意一个,都可以通过这些引用关系拿到彼此,从而进行相关操作。

##二、预发送 实际上就是将要发送的ByteBuffer扔进KafkaChannel,此时并未进行IO操作,这里的Send对象,实际上就是对ByteBuffer的进一步封装,它主要包含了将要发往的节点ID、ByteBuffer大小、是否发送完毕等信息。我们这里根据节点ID,从我们刚才的channels中,取出KafkaChannel。

 public void send(Send send) {// 看看send要发的这个nodeId在不在KafkaChannel channel = channelOrFail(send.destination());try {// 把数据扔进KafkaChannel中(只能放一个,放多个会报错),并关注write事件channel.setSend(send);} catch (CancelledKeyException e) {// 失败了加一条node_id的失败记录this.failedSends.add(send.destination());close(channel);}}

##三、进行IO操作 来到了我们比较熟悉的轮询环节,从NioSelector中取出所有SelectionKey进行轮询。

if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {pollSelectionKeys(this.nioSelector.selectedKeys(), false);// 处理I/O的核心方法pollSelectionKeys(immediatelyConnectedKeys, true);
}private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) {Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();// 创建连接时(connect)将kafkaChannel注册到key上,就是为了在这里获取KafkaChannel channel = channel(key);……………………

#####1、判断一下key 连接好了没有,因为我们用的是非阻塞连接,所以到了轮询阶段,还没有完成连接是正常的。

              if (isImmediatelyConnected || key.isConnectable()) {// finishConnect方法会先检测socketChannel是否建立完成,建立后,会取消对OP_CONNECT事件关注,//TODO 并开始关注OP_READ事件if (channel.finishConnect()) {this.connected.add(channel.id());// 将当前channel id 添加到已连接的集合中this.sensors.connectionCreated.record();} else {continue;// 代表连接未完成,则跳过对此Channel的后续处理}}

#####2、身份验证(略过) #####3、判断KafkaChannel有没有准备好,有没有关注OP_READ,能不能读之类的,并进行读操作。 这里有一个判断,就是判断当前的KafkaChannel是不是在StagedReceives里。我们往后看看,在从网络上读取数据时,我们会将KafkaChannel扔进StagedReceives里,也就是说,如果这个KafkaChannel已经在StagedReceives里了,那么代表它已经在读数据了。

              if (channel.ready() // 连接的三次握手完成,并且 todo 权限验证通过&& key.isReadable() // key是否关注了read事件&& !hasStagedReceive(channel)) {// todo 这个通道不能是正在读数据的,因为在读的时候,会把这个channel扔进stagedReceives里面NetworkReceive networkReceive;/*** 实际上这里就是分多次去一个channel取数据,直到取完,并将其保存在key:channel  value:new ArrayDeque<NetworkReceive> 中*/while ((networkReceive = channel.read()) != null) {addToStagedReceives(channel, networkReceive);}}

#####4、判断KafkaChannel有没有准备好,有没有关注OP_WRITE,并进行写操作

             if (channel.ready() && key.isWritable()) {Send send = channel.write();// 这里会将KafkaChannel的send字段发送出去,// 如果未完成发送,或者没发完,则返回null// 发送成功则返回send对象if (send != null) {this.completedSends.add(send);// 添加到completedSends集合this.sensors.recordBytesSent(channel.id(), send.size());}}

##四、关闭空闲连接 在每一次IO操作完毕后,KafkaSelector都会调用一个方法,去关闭掉那些没怎么用的连接,实际上它就是一个基于时间戳的断连机制。 KafkaSelector中维护了一个哈希表,

LinkedHashMap<String, Long> lruConnections (new LinkedHashMap<>(16, .75F, true);

在每次进行IO操作时,将Key:节点ID,Value:当前时间戳扔进哈希表里面,在IO操作进行完毕时,检查一下,最大的那个节点,它的最后一次IO时间+connectionsMaxIdleNanos(创建KafkaSelector时指定),是否超过了当前的时间。 如果是,这个连接就会被关掉。

比如说connectionsMaxIdleNanos被指定成了1分钟,那么如果这个有序哈希表的最后一个节点的时间是一分钟之前,那么这个节点ID的通道将会被关掉。

 private void maybeCloseOldestConnection() {if (currentTimeNanos > nextIdleCloseCheckTime) {if (lruConnections.isEmpty()) {nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos;} else {Map.Entry<String, Long> oldestConnectionEntry = lruConnections.entrySet().iterator().next();Long connectionLastActiveTime = oldestConnectionEntry.getValue();nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos;if (currentTimeNanos > nextIdleCloseCheckTime) {String connectionId = oldestConnectionEntry.getKey();if (log.isTraceEnabled()) {log.trace("About to close the idle connection from " + connectionId+ " due to being idle for " + (currentTimeNanos - connectionLastActiveTime) / 1000 / 1000 + " millis");}disconnected.add(connectionId);close(connectionId);}}}}

参考: 《Apache Kafka 源码剖析》 - 徐郡明著 Apache Kafka 源码 0.10.0.1

转载于:https://my.oschina.net/anur/blog/2051220

Kafka源码剖析 —— 网络I/O篇 —— 浅析KafkaSelector相关推荐

  1. 4.2.10 Kafka源码剖析, 阅读环境搭建, broker启动流程, topic创建流程, Producer生产者流程, Consumer消费者流程,

    目录 4.1 Kafka源码剖析之源码阅读环境搭建 4.1.1 安装配置Gradle 4.1.2 Scala的安装和配置 4.1.3 Idea配置 4.1.4 源码操作 4.2 Kafka源码剖析之B ...

  2. Vue 3源码剖析,看这篇就够了

    大家好,我是若川.源码的重要性相信不用再多说什么了吧,特别是用Vue 框架的,一般在面试的时候面试官多多少少都会考察源码层面的内容,比如: 如何理解虚拟Dom? Vue 3为什么这么快? Vue 3的 ...

  3. Apache Kafka源码剖析:第1篇 网络引擎漫谈(类比法)

    2019独角兽企业重金招聘Python工程师标准>>> 从这一篇开始,我们来研究kafka的网络引擎的源码. 可能很多读者有疑问,说好的Kafka讲解,怎么变成Thrift了? 答案 ...

  4. muduo源码剖析——以三个切片浅析muduo库代码设计的严谨性、高效性与灵活性

    0 前言 陈硕大佬的muduo网络库的源码我已经看了好久了,奈何本人实力有限,每每看到其代码设计的精巧之处只能内心称赞,无法用言语表达出来.实在令人汗颜.最近在看到网络设计部分时有了一些体会,结合自己 ...

  5. Apache Kafka源码剖析:第5篇 业务API处理

    2019独角兽企业重金招聘Python工程师标准>>> 之前说过了,请求到达业务线程池后,会被处理,但是如何被处理呢?这就是接下来要说的. --------------------- ...

  6. OSChinaiOS客户端源码剖析001(架构篇)

    1.架构概要 程序启动就先来到这里,创建左侧控制器SideMenuViewController和OSCTabBarController 2.OSCTabBarController 综合标签和动弹标签的 ...

  7. 源码系列第1弹 | 带你快速攻略Kafka源码之旅入门篇

    大家过年好,我是 华仔, 又跟大家见面了. 从今天开始我将为大家奉上 Kafka 源码剖析系列文章,正式开启 「Kafka的源码之旅」,跟我一起来掌握 Kafka 源码核心架构设计思想吧. 今天这篇我 ...

  8. Zookeeper--Watcher机制源码剖析二

    Watcher触发 我们从实际操作时候的表现来看Watcher的触发,比如Zookeeper中NodeDataChanged时间的触发是"Watcher监听的对应数据节点的数据内容发生变更& ...

  9. 菜鸟nginx源码剖析

    菜鸟nginx源码剖析 配置与部署篇(一) 手把手配置nginx "I love you"  TCMalloc 对MYSQL 性能 优化的分析 菜鸟nginx源码剖析系列文章解读 ...

最新文章

  1. centos7下kubernetes(6。运行应用)
  2. H5之 Canvas图形实现
  3. 计算机工程与应用单像素成像,2011计算机工程与应用基于压缩感知理论的单像素成像系统研究_白凌云.pdf...
  4. HDU 4422 The Little Girl who Picks Mushrooms(简单题)
  5. c++十进制转二进制_二进制与十进制相互转换的原理
  6. linux 解压rar密码,linux下rar包的压缩与解压方案
  7. redis缓存数据的流程
  8. 仿站小技巧20190409
  9. python之类与对象(2)
  10. mysql group日期_MySQL GROUP BY使用datetime时的日期?
  11. Java 匹配域名正则表达式
  12. c语言程序图片截取,C++实现屏幕截图功能
  13. Linux学习笔记之 Btrfs文件系统简介及使用
  14. scara机器人动荷载_揭密SCARA机器人
  15. H5 查看大图。缩放图片
  16. C++ STL(第十三篇:RB-tree)
  17. 递归中的引用传递和常引用传递
  18. ZXing改横屏识别为竖屏识别
  19. invalid python sd,Fatal Python error: init_fs_encoding: failed to get the Python cod如何解决
  20. STM8读取AD值偶尔跳变出错的问题

热门文章

  1. SAP IDoc 报错- Function module not allowed SPEIDOC_INPUT_DESADV1 –
  2. 强化学习、联邦学习、图神经网络,飞桨全新工具组件详解
  3. 心系AI的百度,这次能翻身吗?
  4. 快速了解Alias method/别名采样方法
  5. 深入理解Pytorch之register_buffer
  6. 多项式概率分布(Multinomial probability distribution)和分类分布(categorical distribution)
  7. __MACOSX文件是什么
  8. 受小动物大脑结构启发,研究人员开发出新的深度学习模型:更少神经元,更多智能...
  9. 【周末阅读】5G时代新型基础设施建设白皮书
  10. 谷歌无人车离奇车祸曝光:人类安全员睡着后,误触关闭了自动驾驶