Selector是kafka自己实现的一个NIO 异步非阻塞的网络IO操作。使用一条单独的线程管理多条网络连接上的连接、读、写操作

一 核心字段

java.nio.channels.Selector nioSelector: 用来监听网络I/O事件

Map<String, KafkaChannel> channels: 维护了NodeId和KafkaChannel之间的映射关系,KafkaChannel是针对SocketChannel的进一步封装

List<Send> completedSends: 保存哪些请求已经完全发送出去

List<String> failedSends:  保存哪些请求发送失败

List<NetworkReceive> completedReceives: 保存已经完全接收到的请求

Map<KafkaChannel,Deque<NetworkReceive>> stagedReceives: 暂存一次OP_READ事件处理过程中读取到的全部请求,当一次OP_READ事件处理完成之后,会将stagedReceives集合中的请求保存到completedReceives集合中

List<String> disconnected: 记录poll过程中断开的连接

List<String> connected:记录poll过程中新建的连接

ChannelBuilder channelBuilder: 用于创建KafkaChannel的工具,根据不同配置创建不同的TransportLayer的子类,然后创建KafkaChannel

int maxReceiveSize: 能够接受请求的最大字节是多大

二 重要方法

2.1 connect

public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
    if (this.channels.containsKey(id))
        throw new IllegalStateException("There isalready a connection for id " + id);
    // 创建SocketChannel
    SocketChannel
socketChannel = SocketChannel.open();
    // 设置成非阻塞模式
   
socketChannel.configureBlocking(false);
    // 获取Socket对象
   
Socket socket = socketChannel.socket();
    socket.setKeepAlive(true);// 设置为长连接
    // 设置发送buffer的大小
   
if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socket.setSendBufferSize(sendBufferSize);
    // 设置接收buffe的大小
   
if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socket.setReceiveBufferSize(receiveBufferSize);
    socket.setTcpNoDelay(true);
    boolean connected;
    try {
        // 因为是非阻塞式的,所以SocketChannel.connect方法是发起一个连接,connect方法在连接正式建立
        // 之前就可能返回,在后面会通过Selector.finishConnect方法确认连接是否真正的建立
       
connected = socketChannel.connect(address);
    } catch (UnresolvedAddressException e) {
        socketChannel.close();
        throw new IOException("Can't resolve address:" + address, e);
    } catch (IOException e) {
        socketChannel.close();
        throw e;
    }
    // 将这个SocketChannel注册到nioSelector上,并关注OP_CONNECT事件
    SelectionKey
key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
    // 创建KafkaChannel
   
KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
    // 将kafkachannel注册到key
   
key.attach(channel);
    // 将NodeId和KafkaChannel绑定,放到channels中管理
   
this.channels.put(id, channel);

if (connected) {
        // OP_CONNECTwon't trigger for immediately connected channels
        log
.debug("Immediately connected tonode {}", channel.id());
        immediatelyConnectedKeys.add(key);
        key.interestOps(0);
    }
}

2.2 poll 真正执行网络IO,它会调用nioSelector.select方法等待I/O事件发生

public void poll(long timeout) throws IOException {if (timeout < 0)throw new IllegalArgumentException("timeout should be >= 0");clear(); // 将上一次poll的结果清除掉if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())timeout = 0;/* check ready keys */long startSelect = time.nanoseconds();// 调用nioSelect.select方法等待I/O事件发生int readyKeys = select(timeout);long endSelect = time.nanoseconds();this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());// 处理I/O事件if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);}// 将stagedReceives复制到completedReceivesaddToCompletedReceives();long endIo = time.nanoseconds();this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());// 关闭长期空闲连接maybeCloseOldestConnection(endSelect);
}

2.3 pollSelectionKeys 处理OP_CONNECT、OP_READ、OP_WRITE事件,并且会检测连接状态

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,boolean isImmediatelyConnected, long currentTimeNanos) {Iterator<SelectionKey> iterator = selectionKeys.iterator();// 遍历SelectionKeywhile (iterator.hasNext()) {// 获取每一个SelectionKeySelectionKey key = iterator.next();// 从Iterator中移除iterator.remove();// 之前创建连接时,将kafkachannel注册到key上,就是为了在这里获取KafkaChannel channel = channel(key);sensors.maybeRegisterConnectionMetrics(channel.id());if (idleExpiryManager != null)idleExpiryManager.update(channel.id(), currentTimeNanos);try {// 对connect方法返回true或者OP_CONNECTION事件进行处理if (isImmediatelyConnected || key.isConnectable()) {// 会检测SocketChannel是否建立完成,建立后会取消对OP_CONNECT事件的关注,开始// 关注OP_READ事件if (channel.finishConnect()) {// 添加到已连接的集合中this.connected.add(channel.id());this.sensors.connectionCreated.record();SocketChannel socketChannel = (SocketChannel) key.channel();log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",socketChannel.socket().getReceiveBufferSize(),socketChannel.socket().getSendBufferSize(),socketChannel.socket().getSoTimeout(),channel.id());} elsecontinue;}// 调用KafkaChannel的prepare方法进行身份验证if (channel.isConnected() && !channel.ready())channel.prepare();// 处理OP_READ事件if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {NetworkReceive networkReceive;while ((networkReceive = channel.read()) != null)// read方法读到一个完整的NetworkReceive,则将其添加到stagedReceives中保存// 如读取不到一个完整的NetworkReceive,则返回null,下次处理OP_READ事件时,继续读取// 直到读取到一个完整的NetworkReceiveaddToStagedReceives(channel, networkReceive);}// 处理OP_WRITE事件if (channel.ready() && key.isWritable()) {Send send = channel.write();// 上面的write方法将KafkaChannel的send字段发送出去,如果发送未完成,则返回null// 如果发送完成则返回Send,并添加到completedSends集合中带后续处理if (send != null) {this.completedSends.add(send);this.sensors.recordBytesSent(channel.id(), send.size());}}// 如果key无效。则关闭KafkaChannel.并且添加这个channel到断开的连接的集合中if (!key.isValid()) {close(channel);this.disconnected.add(channel.id());}} catch (Exception e) {String desc = channel.socketDescription();if (e instanceof IOException)log.debug("Connection with {} disconnected", desc, e);elselog.warn("Unexpected error from {}; closing connection", desc, e);close(channel);this.disconnected.add(channel.id());}}
}

2.4 具体的读写操作交给了KafkaChannel

private boolean send(Send send) throws IOException {// 如果send在一次write调用时没有发送完,SelectionKey的OP_WRITE事件还没有取消,还会继续监听此channel的op_write事件// 直到整个send请求发送完毕才取消send.writeTo(transportLayer);// 判断发送是否完成是通过ByteBuffer中是否还有剩余的字节来判断的if (send.completed())transportLayer.removeInterestOps(SelectionKey.OP_WRITE);return send.completed();
}
public NetworkReceive read() throws IOException {NetworkReceive result = null;// 初始化NetworkReceiveif (receive == null) {receive = new NetworkReceive(maxReceiveSize, id);}// 从TransportLayer中读取数据到NetworkReceive对象中,如果没有读完一个完整的NetworkReceive// 则下次触发OP_READ事件时将继续填充此NetworkReceive对象;如果读完了则将此receive置为空,下次// 触发读操作的时候,创建新的NetworkReceive对象receive(receive);if (receive.complete()) {receive.payload().rewind();result = receive;receive = null;}return result;
}

sender分析之Selector相关推荐

  1. sender分析之创建请求

    一 Sender run方法调用流程 # 从Metadata获取集群元数据 # 调用RecordAccumulator.ready方法,根据RecordAccumulator的缓存情况,选出可以向哪些 ...

  2. Kafka Producer 实现源码分析

    前言 拥抱变化接手了 Kafka 平台,遂学习 0.10.0 线上版本的设计与实现.限于篇幅,本文不会逐行解析源码,而是从逻辑流程.设计模式.并发安全等方面学习各组件,笔记仅供个人 Review 一: ...

  3. 【Python数据分析】波士顿房价分析小例子

    %matplotlib inline             #将生成的图片嵌入网页中     import matplotlib.pyplot as plt     from sklearn imp ...

  4. 使用NSCondition实现多线程同步

    iOS中实现多线程技术有非常多方法. 这里说说使用NSCondition实现多线程同步的问题,也就是解决生产者消费者问题(如收发同步等等). 问题流程例如以下: 消费者取得锁,取产品,假设没有,则wa ...

  5. 【安全开发】IOS安全编码规范

    申明:本文非笔者原创,原文转载自:https://github.com/SecurityPaper/SecurityPaper-web/blob/master/_posts/2.SDL%E8%A7%8 ...

  6. iphone开发笔记和技巧总结

    在iphone程序中实现截屏的一种方法: //导入头文件   #importQuartzCore/QuartzCore.h //将整个self.view大小的图层形式创建一张图片imageUIGrap ...

  7. C#体育彩票选号器案例

    案例分析: 对象:选号器 属性:1.随机数   2.选号器集合(String数组的泛型集合) 要在LIstView中展示 方法:1.单组选号:随机生成7个随机数  2.随机单组:根据输入的组数,生成相 ...

  8. UI学习第二篇 (控件)

    UIbutton 也是一个控件,它属于UIControl 用的最多的就是事件响应 1. //创建按钮对象 UIButton * _botton = [UIButton buttonWithType:U ...

  9. UIMenuController的简单使用

    2019独角兽企业重金招聘Python工程师标准>>> // // ZsyTextField.m // UIMenuController使用简介 // // Created by Z ...

最新文章

  1. 2022-2028年中国乙酸钴行业发展现状调研及市场前景规划报告
  2. 通过聚合数据API获取微信精选文章
  3. 栈的输出_栈和队列--十进制转化为二进制
  4. python代码块-python一些常用代码块
  5. gcc交叉编译的实现
  6. iOS 文字样式处理总结(字体、前背景色、斜体、加粗、对齐、行间距、段间距、动态获取字符串label宽高等)...
  7. malloc和free
  8. 李宏毅自然语言处理——多语言BERT
  9. Android10(Q,API-29)以上版本无法在存储卡目录创建文件夹的问题
  10. QT绘制同心扇形(Paintevent实现)
  11. IntelliJ IDEA 更换背景图和背景颜色
  12. google code 代码托管 用git创建仓库
  13. Latex beamer 常用操作记录
  14. C++游戏game | 井字棋游戏坤坤版(配资源+视频)【赋源码,双人对战】
  15. android 游戏语言设置在哪里设置中文版,使命召唤手游语言变更方法 怎么设置中文...
  16. 激荡20年,芯片产能从零起步到反超美国,中国制造的又一大成就
  17. html好友页面,好友列表.html
  18. Jetpack MVVM 七宗罪之四: 使用 LiveData/StateFlow 发送 Events
  19. 漫话:什么是平衡(AVL)树?这应该是把AVL树讲的最好的文章了
  20. java判断两个数互质_AcWing 458. 比例简化-java(无需判断互质)

热门文章

  1. android日历编程,设置日历并添加 1天_android-calendar_开发99编程知识库
  2. JavaScript常用工具Date对象和Math介绍介绍
  3. Python进制转换(利用栈)
  4. 云服务器cpu系列,云服务器cpu系列
  5. java提升权限运行_提升代码的运行权限,实现模拟管理员身份的功能
  6. java 序列化 写入mysql_java 序列化到mysql数据库中
  7. linux安装mysql默认的配置文件_[转]关于Linux安装mysql默认配置文件位置
  8. 小学三年级计算机导学案,小学三年级学科导学案.doc
  9. python matplotlib使用ax绘图
  10. jmeter java 关联_使用Jmeter进行数据关联和并发用户