当调用了SelectorImpl的select()方法的时候,同时会将所带的参数,也就是给select()所设置的timeout,之后会调用lockAndDoSelect(),在这个方法中,主要还是调用了doSelect()方法,参数与传进来的一致。以WindowsSelectorImpl为例子,实现的deSelect()方法。

protected int doSelect(long var1) throws IOException {if(this.channelArray == null) {throw new ClosedSelectorException();} else {this.timeout = var1;this.processDeregisterQueue();if(this.interruptTriggered) {this.resetWakeupSocket();return 0;} else {this.adjustThreadsCount();this.finishLock.reset();this.startLock.startThreads();try {this.begin();try {this.subSelector.poll();} catch (IOException var7) {this.finishLock.setException(var7);}if(this.threads.size() > 0) {this.finishLock.waitForHelperThreads();}} finally {this.end();}this.finishLock.checkForException();this.processDeregisterQueue();int var3 = this.updateSelectedKeys();this.resetWakeupSocket();return var3;}}
}

首先,会调用processDeregisterQueue()方法,来将已经准备解除注册的channel进行解除注册。

void processDeregisterQueue() throws IOException {Set var1 = this.cancelledKeys();synchronized(var1) {if(!var1.isEmpty()) {Iterator var3 = var1.iterator();while(var3.hasNext()) {SelectionKeyImpl var4 = (SelectionKeyImpl)var3.next();try {this.implDereg(var4);} catch (SocketException var12) {IOException var6 = new IOException("Error deregistering key");var6.initCause(var12);throw var6;} finally {var3.remove();}}}}
}

在这里,会取得所有需要取消注册的SelectionKey,并且依次调用implDereg()进行解除绑定。

protected void implDereg(SelectionKeyImpl var1) throws IOException {int var2 = var1.getIndex();assert var2 >= 0;Object var3 = this.closeLock;synchronized(this.closeLock) {if(var2 != this.totalChannels - 1) {SelectionKeyImpl var4 = this.channelArray[this.totalChannels - 1];this.channelArray[var2] = var4;var4.setIndex(var2);this.pollWrapper.replaceEntry(this.pollWrapper, this.totalChannels - 1, this.pollWrapper, var2);}var1.setIndex(-1);}this.channelArray[this.totalChannels - 1] = null;--this.totalChannels;if(this.totalChannels != 1 && this.totalChannels % 1024 == 1) {--this.totalChannels;--this.threadsCount;}this.fdMap.remove(var1);this.keys.remove(var1);this.selectedKeys.remove(var1);this.deregister(var1);SelectableChannel var7 = var1.channel();if(!var7.isOpen() && !var7.isRegistered()) {((SelChImpl)var7).kill();}}

如果需要解除注册的channel已经是selector当中最后一个了,那么直接从数组中移走就行,但如果没有,则需要与数组最后一个索引位置上的交换位置,保证数组中间位置的连续,再将其移除,后面的操作与注册的操作相似,但都是反向操作。

在完成取消注册的步骤后,将会调用adjustThreadCount()方法来调整线程的数量,具体看下面的方法。

private void adjustThreadsCount() {int var1;if(this.threadsCount > this.threads.size()) {for(var1 = this.threads.size(); var1 < this.threadsCount; ++var1) {WindowsSelectorImpl.SelectThread var2 = new WindowsSelectorImpl.SelectThread(var1);this.threads.add(var2);var2.setDaemon(true);var2.start();}} else if(this.threadsCount < this.threads.size()) {for(var1 = this.threads.size() - 1; var1 >= this.threadsCount; --var1) {((WindowsSelectorImpl.SelectThread)this.threads.remove(var1)).makeZombie();}}}

在Selector中,每有1024条channel,就需要重新开一个线程加入完成监听的操作,这里是从新根据当前应该有的线程数量与此时现存的线程数量进行比较,动态调整。

在这之后,调用了begin()方法,准备开始正式进行select操作。

protected final void begin() {if (interruptor == null) {interruptor = new Interruptible() {public void interrupt(Thread ignore) {AbstractSelector.this.wakeup();}};}AbstractInterruptibleChannel.blockedOn(interruptor);Thread me = Thread.currentThread();if (me.isInterrupted())interruptor.interrupt(me);
}

在这里的begin()方法判断了这里的interruptor是否为空,如果为空,则会在这里重新生成一个,这里的Interruptor保证了当线程阻塞在了Io操作上,并且被interruptor时,保证selecor能够被唤醒。

在begin()方法执行完毕之后,将会调用其subSelector的poll()方法,正式开始select操作。

private int poll() throws IOException {return this.poll0(WindowsSelectorImpl.this.pollWrapper.pollArrayAddress, Math.min(WindowsSelectorImpl.this.totalChannels, 1024), this.readFds, this.writeFds, this.exceptFds, WindowsSelectorImpl.this.timeout);
}private int poll(int var1) throws IOException {return this.poll0(WindowsSelectorImpl.this.pollWrapper.pollArrayAddress + (long)(this.pollArrayIndex * PollArrayWrapper.SIZE_POLLFD), Math.min(1024, WindowsSelectorImpl.this.totalChannels - (var1 + 1) * 1024), this.readFds, this.writeFds, this.exceptFds, WindowsSelectorImpl.this.timeout);
}private native int poll0(long var1, int var3, int[] var4, int[] var5, int[] var6, long var7);

这里的poll0()还是原生方法的实现。主要是为了监听pollWrapper中所保存的fd是否有数据进出,如果没有进出,则会在在此处在timeout的时间里一直保持阻塞状态。

当完成数据监听,取得相应的数据的时候,在这之后,将会重新检验并取消一边已经被取消的channel之后调用updateSelectedKeys()方法。

private int processSelectedKeys(long var1) {byte var3 = 0;int var4 = var3 + this.processFDSet(var1, this.readFds, 1, false);var4 += this.processFDSet(var1, this.writeFds, 6, false);var4 += this.processFDSet(var1, this.exceptFds, 7, true);return var4;
}private int processFDSet(long var1, int[] var3, int var4, boolean var5) {int var6 = 0;for(int var7 = 1; var7 <= var3[0]; ++var7) {int var8 = var3[var7];if(var8 == WindowsSelectorImpl.this.wakeupSourceFd) {synchronized(WindowsSelectorImpl.this.interruptLock) {WindowsSelectorImpl.this.interruptTriggered = true;}} else {WindowsSelectorImpl.MapEntry var9 = WindowsSelectorImpl.this.fdMap.get(var8);if(var9 != null) {SelectionKeyImpl var10 = var9.ski;if(!var5 || !(var10.channel() instanceof SocketChannelImpl) || !WindowsSelectorImpl.this.discardUrgentData(var8)) {if(WindowsSelectorImpl.this.selectedKeys.contains(var10)) {if(var9.clearedCount != var1) {if(var10.channel.translateAndSetReadyOps(var4, var10) && var9.updateCount != var1) {var9.updateCount = var1;++var6;}} else if(var10.channel.translateAndUpdateReadyOps(var4, var10) && var9.updateCount != var1) {var9.updateCount = var1;++var6;}var9.clearedCount = var1;} else {if(var9.clearedCount != var1) {var10.channel.translateAndSetReadyOps(var4, var10);if((var10.nioReadyOps() & var10.nioInterestOps()) != 0) {WindowsSelectorImpl.this.selectedKeys.add(var10);var9.updateCount = var1;++var6;}} else {var10.channel.translateAndUpdateReadyOps(var4, var10);if((var10.nioReadyOps() & var10.nioInterestOps()) != 0) {WindowsSelectorImpl.this.selectedKeys.add(var10);var9.updateCount = var1;++var6;}}var9.clearedCount = var1;}}}}}return var6;
}
private int updateSelectedKeys() {++this.updateCount;byte var1 = 0;int var4 = var1 + this.subSelector.processSelectedKeys(this.updateCount);WindowsSelectorImpl.SelectThread var3;for(Iterator var2 = this.threads.iterator(); var2.hasNext(); var4 += var3.subSelector.processSelectedKeys(this.updateCount)) {var3 = (WindowsSelectorImpl.SelectThread)var2.next();}return var4;
}

这里,将会在所有线程中调用processSelectedKeys()来对所有线程在poll过程中取得的结果进行处理,并返回所有线程中处理的channel的数量。

private int processSelectedKeys(long var1) {byte var3 = 0;int var4 = var3 + this.processFDSet(var1, this.readFds, 1, false);var4 += this.processFDSet(var1, this.writeFds, 6, false);var4 += this.processFDSet(var1, this.exceptFds, 7, true);return var4;
}private int processFDSet(long var1, int[] var3, int var4, boolean var5) {int var6 = 0;for(int var7 = 1; var7 <= var3[0]; ++var7) {int var8 = var3[var7];if(var8 == WindowsSelectorImpl.this.wakeupSourceFd) {synchronized(WindowsSelectorImpl.this.interruptLock) {WindowsSelectorImpl.this.interruptTriggered = true;}} else {WindowsSelectorImpl.MapEntry var9 = WindowsSelectorImpl.this.fdMap.get(var8);if(var9 != null) {SelectionKeyImpl var10 = var9.ski;if(!var5 || !(var10.channel() instanceof SocketChannelImpl) || !WindowsSelectorImpl.this.discardUrgentData(var8)) {if(WindowsSelectorImpl.this.selectedKeys.contains(var10)) {if(var9.clearedCount != var1) {if(var10.channel.translateAndSetReadyOps(var4, var10) && var9.updateCount != var1) {var9.updateCount = var1;++var6;}} else if(var10.channel.translateAndUpdateReadyOps(var4, var10) && var9.updateCount != var1) {var9.updateCount = var1;++var6;}var9.clearedCount = var1;} else {if(var9.clearedCount != var1) {var10.channel.translateAndSetReadyOps(var4, var10);if((var10.nioReadyOps() & var10.nioInterestOps()) != 0) {WindowsSelectorImpl.this.selectedKeys.add(var10);var9.updateCount = var1;++var6;}} else {var10.channel.translateAndUpdateReadyOps(var4, var10);if((var10.nioReadyOps() & var10.nioInterestOps()) != 0) {WindowsSelectorImpl.this.selectedKeys.add(var10);var9.updateCount = var1;++var6;}}var9.clearedCount = var1;}}}}}return var6;
}

这里实则是对在之前的监听到发生了io时间需要处理的fd与对应的channel进行操作,根据读到的fd取得selector下注册了的相应的channel,根据监听到其所发生的时间类型(读,写,异常)更新channel应有的状态,这是其主要功能,在完成这些操作之后,相应的slector的select也相应完成。

jdk的Selector(3)select的过程相关推荐

  1. 【Java】NIO中Selector的select方法源码分析

    该篇博客的有些内容和在之前介绍过了,在这里再次涉及到的就不详细说了,如果有不理解请看[Java]NIO中Channel的注册源码分析, [Java]NIO中Selector的创建源码分析 Select ...

  2. html中select标签乱码,select 的过程中中文乱码有关问题求教

    select 的过程中中文乱码问题求教 select dates,concat(concat(replace(replace(TRUNCATE(part * 0.5 – 0.5 * 1.0,1),'. ...

  3. SELECT执行过程,MySQL聚合函数,多行分组函数,GROUP BY HAVING,详细完整可收藏

    文章目录 1.聚合函数介绍 2.五个常用聚合函数 3.GROUP BY 4.HAVING 5.SELECT的执行过程 1.聚合函数介绍 聚合函数作用于一组数据,并对一组数据返回一个值.聚合函数不能嵌套 ...

  4. Spring : Spring Aop JDK和CGLIB动态代理调用过程

    1.美图 2.概述 3.Spring Aop JDK动态代理调用过程 参考:Spring Aop JDK动态代理调用过程 4. Spring Aop CGLIB动态代理调用过程 参考:

  5. eclipse安装和jdk环境配置(新手详细过程)

    eclipse安装和jdk环境配置,整个过程分为以下几步: 1.jdk的下载安装以及环境变量设置:(jdk安装). 注:目前jdk里面已包含jre,所以不需要进行jre的安装. 2.Eclipse的下 ...

  6. jdk的selector(2)channel的注册

    Selector中的channel注册需要由SelectabelChannel调用其register()方法开始注册流程.具体的register()方法实现在了AbstractSelectableCh ...

  7. jdk的selector(1)

    在java中如过需要用到Selector来处理Nio的情况下,需要先使用SelectorProvider的provider()方法来取得相应的SelectorProvider. public stat ...

  8. 图说Netty服务端启动过程

    来源:逐码 我们知道Netty是一个基于JDK的nio实现的网络编程框架,那Netty的服务端是怎么启动的呢,包括他是何时register 的,何时 bind 端口的,以及何时开始读取网络中的数据的? ...

  9. 深入剖析通信层和RPC调用的异步化(上)

    <Netty 进阶之路>.<分布式服务框架原理与实践>作者李林锋深入剖析通信层和 RPC 调用的异步化.李林锋此后还将在 InfoQ 上开设 Netty 专题持续出稿,感兴趣的 ...

最新文章

  1. Swift 3.0 预告:将 Objc 库转换成更符合 Swift 语法风格的形式
  2. win7怎么跳过硬盘自检_win10系统改装win7步骤教程
  3. php面向对象编程快速入门,PHP面向对象编程的快速入门
  4. Python打包程序
  5. maven初学者(一)
  6. 从业务在线到互联互通,钉钉宜搭进入低代码3.0阶段新模式
  7. cout输出数组_让程序从1开始一直执行++操作,10秒钟能输出最大的数是多少
  8. LeetCode 1814. 统计一个数组中好对子的数目(哈希)
  9. 专题导读:科学数据治理
  10. window.onload 函数不执行处理
  11. LeetCode(561)——数组拆分 I(JavaScript)
  12. Stack Overflow 遭黑客入侵;中国首条 5G 覆盖地铁诞生;VS Code 1.34 发布!| 极客头条...
  13. HBase MemStore和Compaction剖析
  14. Android必知必会-使用Intent打开第三方应用及验证可用性
  15. Android ndk开发C调用C++
  16. html鼠标悬停多个效果,33个jQuery与CSS3实现的绚丽鼠标悬停效果
  17. 蓝桥杯练习 杨辉三角形
  18. 随笔 - 记录下当前的生活
  19. 文末有福利 | 停不下来!程序员在GitHub上开源了一个自制表情包项目
  20. html字体及颜色设置

热门文章

  1. 今年四月份,发现我的文章被人全部复制了,抄到博客园了,连原文出处都没有,就算你写个参考文章也行呀
  2. 解决IDEA不能编译XML文件
  3. Java8新特性总结 - 2.Optional类
  4. Spring注解之@Import用法解析
  5. java setr()_Java RPr.setRFonts方法代码示例
  6. idea解决activiti(*.bpmn)文件乱码问题。
  7. java判断经纬度是否在扇形内_地理坐标是用经度
  8. Cannot forward after response has been committed
  9. 一次httpserver优化的经验和教训(silverlight游戏 - 金庸群侠传X0.5上线记)
  10. HDU 2045 不容易系列之(3)―― LELE的RPG难题(递推)