一直不明白pipe是如何唤醒selector的,所以又去看了jdk的源码(openjdk下载),整理了如下:

以Java nio自带demo : OperationServer.java   OperationClient.java(见附件)

其中server端的核心代码:

public voidinitSelector() {try{

selector=SelectorProvider.provider().openSelector();this.serverChannel1 =ServerSocketChannel.open();

serverChannel1.configureBlocking(false);

InetSocketAddress isa= new InetSocketAddress("localhost", this.port1);

serverChannel1.socket().bind(isa);

serverChannel1.register(selector, SelectionKey.OP_ACCEPT);

}catch(IOException e) {//TODO Auto-generated catch block

e.printStackTrace();

}

}

从头开始,

先看看SelectorProvider.provider()做了什么:

public staticSelectorProvider provider() {synchronized(lock) {if (provider != null)returnprovider;returnAccessController.doPrivileged(new PrivilegedAction() {publicSelectorProvider run() {if(loadProviderFromProperty())returnprovider;if(loadProviderAsService())returnprovider;

provider=sun.nio.ch.DefaultSelectorProvider.create();returnprovider;

}

});

}

}

其中provider = sun.nio.ch.DefaultSelectorProvider.create();会根据操作系统来返回不同的实现类,windows平台就返回WindowsSelectorProvider;

而if (provider != null) returnprovider;

保证了整个server程序中只有一个WindowsSelectorProvider对象;

再看看WindowsSelectorProvider. openSelector():

public AbstractSelector openSelector() throwsIOException {return new WindowsSelectorImpl(this);

}newWindowsSelectorImpl(SelectorProvider)代码:

WindowsSelectorImpl(SelectorProvider sp)throwsIOException {super(sp);

pollWrapper= newPollArrayWrapper(INIT_CAP);

wakeupPipe=Pipe.open();

wakeupSourceFd=((SelChImpl)wakeupPipe.source()).getFDVal();//Disable the Nagle algorithm so that the wakeup is more immediate

SinkChannelImpl sink =(SinkChannelImpl)wakeupPipe.sink();

(sink.sc).socket().setTcpNoDelay(true);

wakeupSinkFd=((SelChImpl)sink).getFDVal();

pollWrapper.addWakeupSocket(wakeupSourceFd,0);

}

其中Pipe.open()是关键,这个方法的调用过程是:

Java代码

public static Pipe open() throwsIOException {returnSelectorProvider.provider().openPipe();

}

SelectorProvider 中:public Pipe openPipe() throwsIOException {return new PipeImpl(this);

}

再看看怎么new PipeImpl()的:

Java代码

PipeImpl(SelectorProvider sp) {long pipeFds = IOUtil.makePipe(true);int readFd = (int) (pipeFds >>> 32);int writeFd = (int) pipeFds;

FileDescriptor sourcefd= newFileDescriptor();

IOUtil.setfdVal(sourcefd, readFd);

source= newSourceChannelImpl(sp, sourcefd);

FileDescriptor sinkfd= newFileDescriptor();

IOUtil.setfdVal(sinkfd, writeFd);

sink= newSinkChannelImpl(sp, sinkfd);

}

其中IOUtil.makePipe(true)是个native方法:

/**

* Returns two file descriptors for a pipe encoded in a long.

* The read end of the pipe is returned in the high 32 bits,

* while the write end is returned in the low 32 bits.

*/

staticnativelong makePipe(boolean blocking);

具体实现:

JNIEXPORT jlong JNICALL

Java_sun_nio_ch_IOUtil_makePipe(JNIEnv*env, jobject this, jboolean blocking)

{int fd[2];if (pipe(fd) < 0) {

JNU_ThrowIOExceptionWithLastError(env,"Pipe failed");return 0;

}if (blocking ==JNI_FALSE) {if ((configureBlocking(fd[0], JNI_FALSE) < 0)|| (configureBlocking(fd[1], JNI_FALSE) < 0)) {

JNU_ThrowIOExceptionWithLastError(env,"Configure blocking failed");

close(fd[0]);

close(fd[1]);return 0;

}

}return ((jlong) fd[0] << 32) | (jlong) fd[1];

}static intconfigureBlocking(intfd, jboolean blocking)

{int flags =fcntl(fd, F_GETFL);int newflags = blocking ? (flags & ~O_NONBLOCK) : (flags |O_NONBLOCK);return (flags == newflags) ? 0: fcntl(fd, F_SETFL, newflags);

}

正如这段注释:

/**

* Returns two file descriptors for a pipe encoded in a long.

* The read end of the pipe is returned in the high 32 bits,

* while the write end is returned in the low 32 bits.

*/

High32位存放的是通道read端的文件描述符FD(file descriptor),low 32 bits存放的是write端的文件描述符。所以取到makepipe()返回值后要做移位处理。

pollWrapper.addWakeupSocket(wakeupSourceFd, 0);

这行代码把返回的pipe的write端的FD放在了pollWrapper中(后面会发现,这么做是为了实现selector的wakeup())

ServerSocketChannel.open()的实现:

public static ServerSocketChannel open() throwsIOException {returnSelectorProvider.provider().openServerSocketChannel();

}

SelectorProvider:public ServerSocketChannel openServerSocketChannel() throwsIOException {return new ServerSocketChannelImpl(this);

}

可见创建的ServerSocketChannelImpl也有WindowsSelectorImpl的引用。

ServerSocketChannelImpl(SelectorProvider sp) throwsIOException {super(sp);this.fd = Net.serverSocket(true); //打开一个socket,返回FD

this.fdVal =IOUtil.fdVal(fd);this.state =ST_INUSE;

}

然后通过serverChannel1.register(selector, SelectionKey.OP_ACCEPT);把selector和channel绑定在一起,也就是把new ServerSocketChannel时创建的FD与selector绑定在了一起。

到此,server端已启动完成了,主要创建了以下对象:

WindowsSelectorProvider:单例

WindowsSelectorImpl中包含:

pollWrapper:保存selector上注册的FD,包括pipe的write端FD和ServerSocketChannel所用的FD

wakeupPipe:通道(其实就是两个FD,一个read,一个write)

再到Server 中的run():

selector.select();主要调用了WindowsSelectorImpl中的这个方法:

protected int doSelect(long timeout) throwsIOException {if (channelArray == null)throw newClosedSelectorException();this.timeout = timeout; //set selector timeout

processDeregisterQueue();if(interruptTriggered) {

resetWakeupSocket();return 0;

}//Calculate number of helper threads needed for poll. If necessary//threads are created here and start waiting on startLock

adjustThreadsCount();

finishLock.reset();//reset finishLock//Wakeup helper threads, waiting on startLock, so they start polling.//Redundant threads will exit here after wakeup.

startLock.startThreads();//do polling in the main thread. Main thread is responsible for//first MAX_SELECTABLE_FDS entries in pollArray.

try{

begin();try{

subSelector.poll();

}catch(IOException e) {

finishLock.setException(e);//Save this exception

}//Main thread is out of poll(). Wakeup others and wait for them

if (threads.size() > 0)

finishLock.waitForHelperThreads();

}finally{

end();

}//Done with poll(). Set wakeupSocket to nonsignaled for the next run.

finishLock.checkForException();

processDeregisterQueue();int updated =updateSelectedKeys();//Done with poll(). Set wakeupSocket to nonsignaled for the next run.

resetWakeupSocket();returnupdated;

}

其中subSelector.poll()是核心,也就是轮训pollWrapper中保存的FD;具体实现是调用native方法poll0:

private int poll() throws IOException{ //poll for the main thread

returnpoll0(pollWrapper.pollArrayAddress,

Math.min(totalChannels, MAX_SELECTABLE_FDS),

readFds, writeFds, exceptFds, timeout);

}private native int poll0(long pollAddress, intnumfds,int[] readFds, int[] writeFds, int[] exceptFds, longtimeout);//These arrays will hold result of native select().//The first element of each array is the number of selected sockets.//Other elements are file descriptors of selected sockets.

private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];//保存发生read的FD

private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1]; //保存发生write的FD

private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1]; //保存发生except的FD

这个poll0()会监听pollWrapper中的FD有没有数据进出,这会造成IO阻塞,直到有数据读写事件发生。比如,由于pollWrapper中保存的也有ServerSocketChannel的FD,所以只要ClientSocket发一份数据到ServerSocket,那么poll0()就会返回;又由于pollWrapper中保存的也有pipe的write端的FD,所以只要pipe的write端向FD发一份数据,也会造成poll0()返回;如果这两种情况都没有发生,那么poll0()就一直阻塞,也就是selector.select()会一直阻塞;如果有任何一种情况发生,那么selector.select()就会返回,所有在OperationServer的run()里要用while (true) {,这样就可以保证在selector接收到数据并处理完后继续监听poll();

这时再来看看WindowsSelectorImpl. Wakeup():

publicSelector wakeup() {synchronized(interruptLock) {if (!interruptTriggered) {

setWakeupSocket();

interruptTriggered= true;

}

}return this;

}//Sets Windows wakeup socket to a signaled state.

private voidsetWakeupSocket() {

setWakeupSocket0(wakeupSinkFd);

}private native void setWakeupSocket0(intwakeupSinkFd);

JNIEXPORTvoidJNICALL

Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv*env, jclass this,

jint scoutFd)

{/*Write one byte into the pipe*/

const char byte = 1;

send(scoutFd,&byte, 1, 0);

}

可见wakeup()是通过pipe的write 端send(scoutFd, &byte, 1, 0),发生一个字节1,来唤醒poll()。所以在需要的时候就可以调用selector.wakeup()来唤醒selector。

原文:http://goon.iteye.com/blog/1775421

补充linux操作系统下的DefaultSelectorProvider的实现,可以看到,如果内核版本>=2.6则,具体的SelectorProvider为EPollSelectorProvider,否则为默认的PollSelectorProvider

//sun.nio.ch.DefaultSelectorProvider

public staticSelectorProvider create() {

PrivilegedAction pa= new GetPropertyAction("os.name");

String osname=(String) AccessController.doPrivileged(pa);if ("SunOS".equals(osname)) {return newsun.nio.ch.DevPollSelectorProvider();

}//use EPollSelectorProvider for Linux kernels >= 2.6

if ("Linux".equals(osname)) {

pa= new GetPropertyAction("os.version");

String osversion=(String) AccessController.doPrivileged(pa);

String[] vers= osversion.split("\\.", 0);if (vers.length >= 2) {try{int major = Integer.parseInt(vers[0]);int minor = Integer.parseInt(vers[1]);if (major > 2 || (major == 2 && minor >= 6)) {return newsun.nio.ch.EPollSelectorProvider();

}

}catch(NumberFormatException x) {//format not recognized

}

}

}return newsun.nio.ch.PollSelectorProvider();

}

java selector 源码_Java NIO——Selector机制源码分析---转相关推荐

  1. java 32位授权码_Java实现OAuth2.0授权码方式

    Java实现OAuth2.0授权码方式 前面介绍了OAuth2.0和授权方式,可以参考以下文章: 今天就用Java来验证OAuth2.0授权方式的授权码式,我们Spring Cloud的OAuth来实 ...

  2. 分析java 线程占用内存_Java线程:保留的内存分析

    分析java 线程占用内存 本文将为您提供一个教程,使您可以确定活动应用程序Java线程中保留了多少Java堆空间 . 将提供来自Oracle Weblogic 10.0生产环境的真实案例研究,以使您 ...

  3. java selector 源码_Java NIO核心组件-Selector和Channel

    昨天我们介绍了一下SelectorProvider和IO multiplexing.特别是IO multiplexing中的epoll系统调用,是Linux版本的Java的NIO的核心实现. 那今天我 ...

  4. java channel源码_java nio ServerSocketChannel源码分析

    /** * 创建新实例 * * @param provider * The provider that created this channel */ protected ServerSocketCh ...

  5. android java代码打印系统日志_Java快速开发平台源码

    Java快速开发平台源码 用户权限管理系统源码 一个轻量级的Java快速开发平台,能快速开发项目并交付[接私活利器] 友好的代码结构及注释,便于阅读及二次开发 实现前后端分离,通过token进行数据交 ...

  6. java futuretask 源码_java并发编程——FutureTask源码分析

    FutureTask的简单示例: FutureTask的应用场景,如果在当前线程中需要执行比较耗时的操作,但又不想阻塞当前线程时,可以把这些作业交给FutureTask,另开一个线程在后台完成,当当前 ...

  7. java list addall源码_Java集合:ArrayList源码分析

    其实我看到已有很多大佬写过此类文章,并且写的也比较清晰明了,那我为何要再写一遍呢?其实也是为了加深本身的印象,巩固本身的基础html (主要是不少文章没有写出来我想知道的东西!!!​!!!!)java ...

  8. java程序阅读技巧_Java程序员阅读源码的小技巧,原来大牛都是这样读的,赶紧看看!...

    1.Quick Type Hierarchy 快速查看类继承体系. 快捷键:Ctrl + T 查看类很多人可能都知道,可源码阅读的时候更多用来查看方法体系更重要,可以方便快速的定位到方法的实现类.如: ...

  9. java linkedlist源码_Java集合之LinkedList源码分析

    一.LinkedList简介 LinkedList是一种可以在任何位置进行高效地插入和移除操作的有序序列,它是基于双向链表实现的. ps:这里有一个问题,就是关于实现LinkedList的数据结构是否 ...

最新文章

  1. java.lang.RuntimeException: Handler (com.***.behavior.BEvent$1) {421bca80} sending message to a Hand
  2. java 获取上下文_如何获得spring上下文的方法总结
  3. IOS 浏览器端overflow:scroll overflow:auto元素无法滑动bug解决方法整理
  4. 慕课-北京理工大学 机器学习 大学生上网时间 聚类,小白学习
  5. MVC Web.Config 配置错误
  6. 【jzoj】2018.2.7NOIP普及组——某【BC】组模拟赛
  7. 2020年度中国生命科学十大进展揭晓
  8. 网易《社会心理学》笔记(不定时更新)
  9. 视觉平台搭建——光源选型介绍
  10. 微机计算机原理及应用ppt,微型计算机原理及应用PPT课件
  11. 问道虚拟机服务器地址,问道1.60.0905虚拟机手工启动服务端+配套客户端+启动教程+充值注册软件+配套工具...
  12. 近远场转换算法matlab,计算电磁学之FDTD算法的MATLAB语言实现解析.doc
  13. linux qq传文件怎么安装,在Ubuntu Linux下怎样安装QQ
  14. 金盾播放器android安卓,(金盾高级视频加密系统跨平台播放器Android安卓安装步骤.doc...
  15. 【FinalIK】Full Body Biped IK
  16. 美国阿肯色州闪电彩虹共享一片天空(组图)
  17. python源文件改写_Python源文件改写.编写一个程序,读取一个Python源程序,将文件中所有除保留字外的小写字母换成大写字母...
  18. 新媒体广告投放的知识要点解析
  19. 【无人驾驶 | 国内篇】主要玩家介绍
  20. 响应式织梦模板旅游公司类网站

热门文章

  1. 从Eclipse中导入项目到AndroidStudio中
  2. 每日学口语(持续更新)
  3. 苹果三代耳机_P130 【AirPods 3代】绝对性的福利!“地表最强真无线耳机” 苹果第三代AirPods Pro蓝牙耳机、妙不可言!...
  4. msdn和系统下载位置
  5. 实用收藏,外贸常用工具大全分享-跨境知道
  6. Selenium+PhantomJS QQ空间模拟登陆
  7. 淘宝新自动化测试框架AutoRobot简要介绍
  8. linux自动刷新桌面,Ubuntu下实现用Python开机自动更新壁纸为bing壁纸
  9. 用原生JS写一个网页版的2048小游戏(兼容移动端)
  10. HTML5 权威指南第 10 章 文档分节 学习笔记