在linux2.5.44首次引入epoll,它设计的目的旨在取代既有的select、poll系统函数,让需要大量操作文件描述符的程序得以发挥更优异的性能(wikipedia example: 旧有的系统函数所花费的时间复杂度为O(n), epoll的时间复杂度O(log n))。epoll实现的功能与poll类似,都是监听多个文件描述符上的事件。
epoll底层是由可配置的操作系统内核对象建构而成,并以文件描述符(file descriptor)的形式呈现于用户空间(from wikipedia: 在操作系统中,虚拟内存通常会被分成用户空间,与核心空间这两个区段。这是存储器保护机制中的一环。内核**、核心扩展(kernel extensions)、以及驱动程序,运行在核心空间**上。而其他的应用程序,则运行在用户空间上。所有运行在用户空间的应用程序,都被统称为用户级(userland))。
多说一点关于内核的

它是一个用来管理软件发出的数据I/O的一个程序,并将数据交由CPU和电脑其他电子组件处理,但是直接对硬件操作是非常复杂的,通常内核提供一种硬件抽象的方法来完成(由内核决定一个程序在什么时候对某部分硬件操作多长时间),通过这些方法来完成进程间通信和系统调用。

宏内核:

宏内核简单来说,首先定义了一个高阶的抽象接口,叫系统调用(System call))来实现操作系统的功能,例如进程管理,文件系统,和存储管理等等,这些功能由多个运行在内核态的程序来完成。

微内核:

微内核结构由硬件抽象层和系统调用组成;包括了创建一个系统必需的几个部分;如线程管理,地址空间和进程间通信等。微核的目标是将系统服务的实现和系统的基本操作规则分离开来。

linux就是使用的宏内核。因为它能够在运行时将模块调入执行,使扩充内核的功能变得更简单。

epoll做了什么事?

epoll 通过使用红黑树(RB-tree)搜索被监视的文件描述符(file descriptor)。

在 epoll 实例上注册事件时,epoll 会将该事件添加到 epoll 实例的红黑树上并注册一个回调函数,当事件发生时会将事件添加到就绪链表中。

epoll的结构?

int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

①epoll_create

向内核申请空间,创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值。在最初的实现中,调用者通过 size 参数告知内核需要监听的文件描述符数量。如果监听的文件描述符数量超过 size, 则内核会自动扩容。而现在 size 已经没有这种语义了,但是调用者调用时 size 依然必须大于 0,以保证后向兼容性。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的。

②epoll_ctl

向 epfd 对应的内核epoll 实例添加、修改或删除对 fd 上事件 event 的监听。op 可以为 EPOLL_CTL_ADD, EPOLL_CTL_MOD, EPOLL_CTL_DEL 分别对应的是添加新的事件,修改文件描述符上监听的事件类型,从实例上删除一个事件。如果 event 的 events 属性设置了 EPOLLET flag,那么监听该事件的方式是边缘触发。

events可以是以下几个宏的集合:

  • EPOLLIN:触发该事件,表示对应的文件描述符上有可读数据。(包括对端SOCKET正常关闭);
  • EPOLLOUT:触发该事件,表示对应的文件描述符上可以写数据;
  • EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
  • EPOLLERR:表示对应的文件描述符发生错误;
  • EPOLLHUP:表示对应的文件描述符被挂断;
  • EPOLLET:将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
  • EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里。

例如:

struct epoll_event ev;
//设置与要处理的事件相关的文件描述符
ev.data.fd=listenfd;
//设置要处理的事件类型
ev.events=EPOLLIN|EPOLLET;
//注册epoll事件
epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);

③epoll_wait

Linux-2.6.19又引入了可以屏蔽指定信号的epoll_wait: epoll_pwait

接收发生在被侦听的描述符上的,用户感兴趣的IO事件。简单点说:通过循环,不断地监听暴露的端口,看哪一个fd可读、可写~

当 timeout 为 0 时,epoll_wait 永远会立即返回。而 timeout 为 -1 时,epoll_wait 会一直阻塞直到任一已注册的事件变为就绪。当 timeout 为一正整数时,epoll 会阻塞直到计时结束或已注册的事件变为就绪。因为内核调度延迟,阻塞的时间可能会略微超过 timeout (毫秒级)。

epoll文件描述符用完后,直接用close关闭,并且会自动从被侦听的文件描述符集合中删除

epoll实战

说了这么多原理,脑壳怕嗡嗡的吧,来看看实战清醒下~

如上知道:每次添加/修改/删除被侦听文件描述符都需要调用epoll_ctl,所以要尽量少地调用epoll_ctl,防止其所引来的开销抵消其带来的好处。有的时候,应用中可能存在大量的短连接(比如说Web服务器),epoll_ctl将被频繁地调用,可能成为这个系统的瓶颈。

传统的select以及poll的效率会因为在线人数的线形递增而导致呈二次乃至三次方的下降,这些直接导致了网络服务器可以支持的人数有了个比较明显的限制。这是因为他们有限的文件描述符和遍历所有的fd所带来的低效。

重点哦~

当你拥有一个很大的socket集合,不过由于网络延时,任一时间只有部分的socket是“活跃”的,但是select/poll每次调用都会线性扫描全部的集合,导致效率呈现线性下降。epoll不存在这个问题,它只会对“活跃”的socket进行操作—这是因为在内核实现中epoll是根据每个fd上面的callback函数实现的。那么,只有“活跃”的socket才会主动的去调用 callback函数,其他idle(空闲)状态socket则不会,在这点上,epoll实现了一个“伪”AIO,因为这时候推动力在os内核。在一些 benchmark中,如果所有的socket基本上都是活跃的—比如一个高速LAN环境,epoll并不比select/poll有什么效率,相反,如果过多使用epoll_ctl,效率相比还有稍微的下降。但是一旦使用idle connections模拟WAN环境,epoll的效率就远在select/poll之上了。

int epfd = epoll_create(POLL_SIZE);struct epoll_event ev;struct epoll_event *events = NULL;nfds = epoll_wait(epfd, events, 20, 500);{for (n = 0; n < nfds; ++n) {if (events[n].data.fd == listener) {//如果是主socket的事件的话,则表示//有新连接进入了,进行新连接的处理。client = accept(listener, (structsockaddr *)&local, &addrlen);if (client < 0) {perror("accept");continue;}setnonblocking(client);        //将新连接置于非阻塞模式ev.events = EPOLLIN | EPOLLET; //并且将新连接也加入EPOLL的监听队列。//注意,这里的参数EPOLLIN|EPOLLET并没有设置对写socket的监听,//如果有写操作的话,这个时候epoll是不会返回事件的,如果要对写操作//也监听的话,应该是EPOLLIN|EPOLLOUT|EPOLLETev.data.fd = client;if (epoll_ctl(epfd, EPOLL_CTL_ADD, client, &ev) < 0) {//设置好event之后,将这个新的event通过epoll_ctl加入到epoll的监听队列里面,//这里用EPOLL_CTL_ADD来加一个新的epoll事件,通过EPOLL_CTL_DEL来减少一个//epoll事件,通过EPOLL_CTL_MOD来改变一个事件的监听方式。fprintf(stderr, "epollsetinsertionerror:fd=%d", client);return -1;}}else if(event[n].events & EPOLLIN){//如果是已经连接的用户,并且收到数据,//那么进行读入int sockfd_r;if ((sockfd_r = event[n].data.fd) < 0)continue;read(sockfd_r, buffer, MAXSIZE);//修改sockfd_r上要处理的事件为EPOLLOUTev.data.fd = sockfd_r;ev.events = EPOLLOUT | EPOLLET;epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd_r, &ev)}else if(event[n].events & EPOLLOUT){//如果有数据发送int sockfd_w = events[n].data.fd;write(sockfd_w, buffer, sizeof(buffer));//修改sockfd_w上要处理的事件为EPOLLINev.data.fd = sockfd_w;ev.events = EPOLLIN | EPOLLET;epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd_w, &ev)}do_use_fd(events[n].data.fd);}}

简单说下流程:

  • 监听到有新连接进入了,进行新连接的处理;
  • 如果是已经连接的用户,并且收到数据,读完之后修改sockfd_r上要处理的事件为EPOLLOUT(可写);
  • 如果有数据发送,写完之后,修改sockfd_w上要处理的事件为EPOLLIN(可读)

epoll在Java中怎么去调用的?

基础知识:

文件描述符:

  • (参考《Unix网络编程》译者的注释)
  • 文件描述符是Unix系统标识文件的int,Unix的哲学一切皆文件,所以各自资源(包括常规意义的文件、目录、管道、POSIX IPC、socket)都可以看成文件。

Java NIO的世界中,Selector是中央控制器,Buffer是承载数据的容器,而Channel可以说是最基础的门面,它是本地I/O设备、网络I/O的通信桥梁。

  • 网络I/O设备:
  • DatagramChannel:读写UDP通信的数据,对应DatagramSocket类
  • SocketChannel:读写TCP通信的数据,对应Socket类
  • ServerSocketChannel:监听新的TCP连接,并且会创建一个可读写的SocketChannel,对应ServerSocket类
  • 本地I/O设备:
  • FileChannel:读写本地文件的数据,不支持Selector控制,对应File类

①先从最简单的ServerSocketChannel看起

ServerSocketChannel与ServerSocket一样是socket监听器,其主要区别前者可以运行在非阻塞模式下运行;

// 创建一个ServerSocketChannel,将会关联一个未绑定的ServerSocketpublic static ServerSocketChannel open() throws IOException {return SelectorProvider.provider().openServerSocketChannel();}

ServerSocketChannel的创建也是依赖底层操作系统实现,其实现类主要是ServerSocketChannelImpl,我们来看看其构造方法

    ServerSocketChannelImpl(SelectorProvider var1) throws IOException {super(var1);// 创建一个文件操作符this.fd = Net.serverSocket(true);// 得到文件操作符是索引this.fdVal = IOUtil.fdVal(this.fd);this.state = 0;}

新建一个ServerSocketChannelImpl其本质是在底层操作系统创建了一个fd(即文件描述符),相当于建立了一个用于网络通信的通道,调用socket的bind()方法绑定,通过accept()调用操作系统获取TCP连接

public SocketChannel accept() throws IOException {// 忽略一些校验及无关代码....SocketChannelImpl var2 = null;// var3的作用主要是说明当前的IO状态,主要有/*** EOF = -1;* UNAVAILABLE = -2;* INTERRUPTED = -3;* UNSUPPORTED = -4;* THROWN = -5;* UNSUPPORTED_CASE = -6;*/int var3 = 0;// 这里本质也是用fd来获取连接FileDescriptor var4 = new FileDescriptor();// 用来存储TCP连接的地址信息InetSocketAddress[] var5 = new InetSocketAddress[1];try {// 这里设置了一个中断器,中断时会将连接关闭this.begin();// 这里当IO被中断时,会重新获取连接do {var3 = this.accept(this.fd, var4, var5);} while(var3 == -3 && this.isOpen());}finally {// 当连接被关闭且accept失败时或抛出AsynchronousCloseExceptionthis.end(var3 > 0);// 验证连接是可用的assert IOStatus.check(var3);}if (var3 < 1) {return null;} {// 默认连接是阻塞的IOUtil.configureBlocking(var4, true);// 创建一个SocketChannel的引用var2 = new SocketChannelImpl(this.provider(), var4, var5[0]);// 下面是是否连接成功校验,这里忽略...return var2;}
}// 依赖底层操作系统实现的accept0方法
private int accept(FileDescriptor var1, FileDescriptor var2, InetSocketAddress[] var3) throws IOException {return this.accept0(var1, var2, var3);
}

②SocketChannel

用于读写TCP通信的数据,相当于客户端

  1. 通过open方法创建SocketChannel,
  2. 然后利用connect方法来和服务端发起建立连接,还支持了一些判断连接建立情况的方法;
  3. read和write支持最基本的读写操作

open

 public static SocketChannel open() throws IOException {return SelectorProvider.provider().openSocketChannel();}public SocketChannel openSocketChannel() throws IOException {return new SocketChannelImpl(this);}// State, increases monotonicallyprivate static final int ST_UNINITIALIZED = -1;private static final int ST_UNCONNECTED = 0;private static final int ST_PENDING = 1;private static final int ST_CONNECTED = 2;private static final int ST_KILLPENDING = 3;private static final int ST_KILLED = 4;private int state = ST_UNINITIALIZED;    SocketChannelImpl(SelectorProvider sp) throws IOException {super(sp);// 创建一个scoket通道,即fd(fd的作用可参考上面的描述)this.fd = Net.socket(true);// 得到该fd的索引this.fdVal = IOUtil.fdVal(fd);// 设置为未连接this.state = ST_UNCONNECTED;}

connect建立连接

// 代码均来自JDK1.8 部分代码public boolean connect(SocketAddress var1) throws IOException {boolean var2 = false;// 读写都锁住synchronized(this.readLock) {synchronized(this.writeLock) {/****状态检查,channel和address****/// 判断channel是否openthis.ensureOpenAndUnconnected();InetSocketAddress var5 = Net.checkAddress(var1);SecurityManager var6 = System.getSecurityManager();if (var6 != null) {var6.checkConnect(var5.getAddress().getHostAddress(), var5.getPort());}boolean var10000;/****连接建立****/// 阻塞状态变更的锁也锁住synchronized(this.blockingLock()) {int var8 = 0;try {try {this.begin(); // 如果当前socket未绑定本地端口,则尝试着判断和服务端是否能建立连接synchronized(this.stateLock) {if (!this.isOpen()) {boolean var10 = false;return var10;}if (this.localAddress == null) {// 和远程建立连接后关闭连接NetHooks.beforeTcpConnect(this.fd, var5.getAddress(), var5.getPort());}this.readerThread = NativeThread.current();}do {InetAddress var9 = var5.getAddress();if (var9.isAnyLocalAddress()) {var9 = InetAddress.getLocalHost();}// 建立连接var8 = Net.connect(this.fd, var9, var5.getPort());} while(var8 == -3 && this.isOpen());synchronized(this.stateLock) {this.remoteAddress = var5;if (var8 <= 0) {if (!this.isBlocking()) {this.state = 1;} else {assert false;}} else {this.state = 2;// 连接成功if (this.isOpen()) {this.localAddress = Net.localAddress(this.fd);}var10000 = true;return var10000;}}}var10000 = false;return var10000;}}}

在建立在绑定地址之前,我们需要调用NetHooks.beforeTcpBind,这个方法是将fd转换为SDP(Sockets Direct Protocol,Java套接字直接协议) socket。SDP需要网卡支持InfiniBand高速网络通信技术,windows不支持该协议。

我们来看看在openjdk: src\solaris\classes\sun\net下的NetHooks.java

private static final Provider provider = new sun.net.sdp.SdpProvider();public static void beforeTcpBind(FileDescriptor fdObj, InetAddress address, int port) throws IOException{provider.implBeforeTcpBind(fdObj, address, port);}public static void beforeTcpConnect(FileDescriptor fdObj, InetAddress address, int port) throws IOException{provider.implBeforeTcpConnect(fdObj, address, port);}

可以看到实际是调用的SdpProvider里的implBeforeTcpBind

@Overridepublic void implBeforeTcpBind(FileDescriptor fdObj,InetAddress address,int port)throws IOException{if (enabled)convertTcpToSdpIfMatch(fdObj, Action.BIND, address, port);}// converts unbound TCP socket to a SDP socket if it matches the rulesprivate void convertTcpToSdpIfMatch(FileDescriptor fdObj,Action action,InetAddress address,int port)throws IOException{boolean matched = false;// 主要是先通过规则校验器判断入参是否符合,一般有PortRangeRule校验器// 然后再执行将fd转换为socketfor (Rule rule: rules) {if (rule.match(action, address, port)) {SdpSupport.convertSocket(fdObj);matched = true;break;}}}public static void convertSocket(FileDescriptor fd) throws IOException {...//获取fd索引int fdVal = fdAccess.get(fd);convert0(fdVal);}// convert0JNIEXPORT void JNICALLJava_sun_net_sdp_SdpSupport_convert0(JNIEnv *env, jclass cls, int fd){// create方法实际是通过socket(AF_INET_SDP, SOCK_STREAM, 0);方法得到一个socketint s = create(env);if (s >= 0) {socklen_t len;int arg, res;struct linger linger;/* copy socket options that are relevant to SDP */len = sizeof(arg);// 重用TIME_WAIT的端口if (getsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&arg, &len) == 0)setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char*)&arg, len);len = sizeof(arg);// 紧急数据放入普通数据流if (getsockopt(fd, SOL_SOCKET, SO_OOBINLINE, (char*)&arg, &len) == 0)setsockopt(s, SOL_SOCKET, SO_OOBINLINE, (char*)&arg, len);len = sizeof(linger);// 延迟关闭连接if (getsockopt(fd, SOL_SOCKET, SO_LINGER, (void*)&linger, &len) == 0)setsockopt(s, SOL_SOCKET, SO_LINGER, (char*)&linger, len);// 将fd也引用到s所持有的通道RESTARTABLE(dup2(s, fd), res);if (res < 0)JNU_ThrowIOExceptionWithLastError(env, "dup2");// 执行close方法,关闭s这个引用RESTARTABLE(close(s), res);}}

read 读

public int read(ByteBuffer var1) throws IOException {// 省略一些判断synchronized(this.readLock) {this.begin();synchronized(this.stateLock) {do {// 通过IOUtil的读取fd的数据至buf// 这里的nd是SocketDispatcher,用于调用底层的read和write操作var3 = IOUtil.read(this.fd, var1, -1L, nd);} while(var3 == -3 && this.isOpen());// 这个方法主要是将UNAVAILABLE(原为-2)这个状态返回0,否则返回nvar4 = IOStatus.normalize(var3);var20 = false;break label367;}this.readerCleanup();assert IOStatus.check(var3);}    }}}
static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {if (var1.isReadOnly()) {throw new IllegalArgumentException("Read-only buffer");} else if (var1 instanceof DirectBuffer) {return readIntoNativeBuffer(var0, var1, var2, var4);} else {// 临时缓冲区,大小为buf的remain(limit - position),堆外内存,使用ByteBuffer.allocateDirect(size)分配// Notes:这里分配后后面有个try-finally块会释放该部分内存ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining());int var7;try {// 将网络中的buf读进direct bufferint var6 = readIntoNativeBuffer(var0, var5, var2, var4);var5.flip();// 待读取if (var6 > 0) {var1.put(var5);// 成功时写入}var7 = var6;} finally {Util.offerFirstTemporaryDirectBuffer(var5);}return var7;}}
private static int readIntoNativeBuffer(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {// 忽略变量initif (var2 != -1L) {// pread方法只有在同步状态下才能使用var9 = var4.pread(var0, ((DirectBuffer)var1).address() + (long)var5, var7, var2);} else {// 其调用SocketDispatcher.read方法 -> FileDispatcherImpl.read0方法var9 = var4.read(var0, ((DirectBuffer)var1).address() + (long)var5, var7);}if (var9 > 0) {var1.position(var5 + var9);}return var9;}}
// 同样找到openjdk:src\solaris\native\sun\nio\ch
//FileDispatcherImpl.c
JNIEXPORT jint JNICALL
Java_sun_nio_ch_FileDispatcherImpl_read0(JNIEnv *env, jclass clazz,jobject fdo, jlong address, jint len)
{jint fd = fdval(env, fdo);// 获取fd索引void *buf = (void *)jlong_to_ptr(address);// 调用底层read方法return convertReturnVal(env, read(fd, buf, len), JNI_TRUE);
}

总结一下读取的过程

  1. 初始化一个direct buffer,如果本身的buffer就是direct的则不用初始化
  2. 调用底层read方法写入至direct buffer
  3. 最终将direct buffer写到传入的buffer对象

write 写

看完了前面的read,write整个执行流程基本一样,具体的细节参考如下

public int write(ByteBuffer var1) throws IOException {if (var1 == null) {throw new NullPointerException();} else {synchronized(this.writeLock) {this.ensureWriteOpen();this.begin();synchronized(this.stateLock) {if (!this.isOpen()) {var5 = 0;var20 = false;break label310;}this.writerThread = NativeThread.current();}do {// 通过IOUtil的读取fd的数据至buf// 这里的nd是SocketDispatcher,用于调用底层的read和write操作var3 = IOUtil.write(this.fd, var1, -1L, nd);} while(var3 == -3 && this.isOpen());var4 = IOStatus.normalize(var3);var20 = false;this.writerCleanup();assert IOStatus.check(var3);return var4;}}}}
static int write(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {if (var1 instanceof DirectBuffer) {return writeFromNativeBuffer(var0, var1, var2, var4);} else {ByteBuffer var8 = Util.getTemporaryDirectBuffer(var7);int var10;try {// 这里的pos为buf初始的position,意思是将buf重置为最初的状态;因为目前还没有真实的写入到channel中var8.put(var1);var8.flip();var1.position(var5);// 调用int var9 = writeFromNativeBuffer(var0, var8, var2, var4);if (var9 > 0) {var1.position(var5 + var9);}var10 = var9;} finally {Util.offerFirstTemporaryDirectBuffer(var8);}return var10;}}
IOUtil.writeFromNativeBuffer(fd , buf , position , nd)
{// ... 忽略一些获取buf变量的代码    int written = 0;if (position != -1) {// pread方法只有在同步状态下才能使用written = nd.pwrite(fd ,((DirectBuffer)bb).address() + pos,rem, position);} else {// 其调用SocketDispatcher.write方法 -> FileDispatcherImpl.write0方法written = nd.write(fd, ((DirectBuffer)bb).address() + pos, rem);}//....
}
FileDispatcherImpl.write0
{// 调用底层的write方法写入return convertReturnVal(env, write(fd, buf, len), JNI_FALSE);
}
}

总结一下write的过程:

  1. 如果buf是direct buffer则直接开始写入,否则需要初始化一个direct buffer,大小是buf的remain
  2. 将buf的内容写入到direct buffer中,并恢复buf的position
  3. 调用底层的write方法写入至channel
  4. 更新buf的position,即被direct buffer读取内容后的position

耐心一点,马上就到Epoll了

理解了前面的一些基础知识,接下来的部分就会涉及到Java是怎么样来使用epoll的。

Selector简述

Selector的作用是Java NIO中管理一组多路复用的SelectableChannel对象,并能够识别通道是否为诸如读写事件做好准备的组件 --Java doc

Selector的创建过程如下:

// 1.创建Selector
Selector selector = Selector.open();// 2.将Channel注册到选择器中
// ....... new channel的过程 ....//Notes:channel要注册到Selector上就必须是非阻塞的,所以FileChannel是不可以
//使用Selector的,因为FileChannel是阻塞的
channel.configureBlocking(false);// 第二个参数指定了我们对 Channel 的什么类型的事件感兴趣
SelectionKey key = channel.register(selector , SelectionKey.OP_READ);// 也可以使用或运算|来组合多个事件,例如
SelectionKey key = channel.register(selector , SelectionKey.OP_READ | SelectionKey.OP_WRITE);// 不过值得注意的是,一个 Channel 仅仅可以被注册到一个 Selector 一次,
// 如果将 Channel 注册到 Selector 多次, 那么其实就是相当于更新 SelectionKey
//的 interest set.

①一个Channel在Selector注册其代表的是一个SelectionKey事件,SelectionKey的类型包括:

  • OP_READ:可读事件;值为:1<<0
  • OP_WRITE:可写事件;值为:1<<2
  • OP_CONNECT:客户端连接服务端的事件(tcp连接),一般为创建SocketChannel客户端channel;值为:1<<3
  • OP_ACCEPT:服务端接收客户端连接的事件,一般为创建ServerSocketChannel服务端channel;值为:1<<4

②一个Selector内部维护了三组keys:

  1. key set:当前channel注册在Selector上所有的key;可调用keys()获取
  2. selected-key set:当前channel就绪的事件;可调用selectedKeys()获取
  3. cancelled-key:主动触发SelectionKey#cancel()方法会放在该集合,前提条件是该channel没有被取消注册;不可通过外部方法调用

③Selector类中总共包含以下10个方法:

  • open():创建一个Selector对象
  • isOpen():是否是open状态,如果调用了close()方法则会返回false
  • provider():获取当前Selector的Provider
  • keys():如上文所述,获取当前channel注册在Selector上所有的key
  • selectedKeys():获取当前channel就绪的事件列表
  • selectNow():获取当前是否有事件就绪,该方法立即返回结果,不会阻塞;如果返回值>0,则代表存在一个或多个
  • select(long timeout):selectNow的阻塞超时方法,超时时间内,有事件就绪时才会返回;否则超过时间也会返回
  • select():selectNow的阻塞方法,直到有事件就绪时才会返回
  • wakeup():调用该方法会时,阻塞在select()处的线程会立马返回;(ps:下面一句划重点)即使当前不存在线程阻塞在select()处,那么下一个执行select()方法的线程也会立即返回结果,相当于执行了一次selectNow()方法
  • close(): 用完Selector后调用其close()方法会关闭该Selector,且使注册到该Selector上的所有SelectionKey实例无效。channel本身并不会关闭。

关于SelectionKey

谈到Selector就不得不提SelectionKey,两者是紧密关联,配合使用的;如上文所示,往Channel注册Selector会返回一个SelectionKey对象, 这个对象包含了如下内容:

  • interest set,当前Channel感兴趣的事件集,即在调用register方法设置的interes set
  • ready set
  • channel
  • selector
  • attached object,可选的附加对象

①interest set 可以通过SelectionKey类中的方法来获取和设置interes set

// 返回当前感兴趣的事件列表
int interestSet = key.interestOps();// 也可通过interestSet判断其中包含的事件
boolean isInterestedInAccept  = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE;    // 可以通过interestOps(int ops)方法修改事件列表
key.interestOps(interestSet | SelectionKey.OP_WRITE);

②ready set 当前Channel就绪的事件列表

int readySet = key.readyOps();// 也可通过四个方法来分别判断不同事件是否就绪
key.isReadable();    //读事件是否就绪
key.isWritable();    //写事件是否就绪
key.isConnectable(); //客户端连接事件是否就绪
key.isAcceptable();  //服务端连接事件是否就绪

③channel和selector 我们可以通过SelectionKey来获取当前的channel和selector

// 返回当前事件关联的通道,可转换的选项包括:`ServerSocketChannel`和`SocketChannel`
Channel channel = key.channel();//返回当前事件所关联的Selector对象
Selector selector = key.selector();

attached object 我们可以在selectionKey中附加一个对象,或者在注册时直接附加:

key.attach(theObject);
Object attachedObj = key.attachment();
// 在注册时直接附加
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);

万丈高楼平地起,基础知识差不多了,了解了这些,可以找一些nio demo或者netty demo练练手。接下来讲解本节比较重要的~epoll

前面多次提到了openjdk,seletor的具体实现肯定是跟操作系统有关的,我们一起来看看。

可以看到Selector的实现是SelectorImpl, 然后SelectorImpl又将职责委托给了具体的平台,比如图中的linux2.6 EpollSelectorImpl,windows是WindowsSelectorImpl,MacOSX是KQueueSelectorImpl

根据前面我们知道,Selector.open()可以得到一个Selector实例,怎么实现的呢?

// Selector.java
public static Selector open() throws IOException {// 首先找到provider,然后再打开Selectorreturn SelectorProvider.provider().openSelector();
}// java.nio.channels.spi.SelectorProviderpublic static SelectorProvider provider() {synchronized (lock) {if (provider != null)return provider;return AccessController.doPrivileged(new PrivilegedAction<SelectorProvider>() {public SelectorProvider run() {if (loadProviderFromProperty())return provider;if (loadProviderAsService())return provider;// 这里就是打开Selector的真正方法provider = sun.nio.ch.DefaultSelectorProvider.create();return provider;}});}
}

在openjdk中,每个操作系统都有一个sun.nio.ch.DefaultSelectorProvider实现,以srcsolaris\classes\sun\nio\ch下的DefaultSelectorProvider为例:

/*** Returns the default SelectorProvider.*/
public static SelectorProvider create() {// 获取OS名称String osname = AccessController.doPrivileged(new GetPropertyAction("os.name"));// 根据名称来创建不同的Selctorif (osname.equals("SunOS"))return createProvider("sun.nio.ch.DevPollSelectorProvider");if (osname.equals("Linux"))return createProvider("sun.nio.ch.EPollSelectorProvider");return new sun.nio.ch.PollSelectorProvider();
}

打开srcsolaris\classes\sun\nio\ch下的EPollSelectorProvider.java

public class EPollSelectorProviderextends SelectorProviderImpl
{public AbstractSelector openSelector() throws IOException {return new EPollSelectorImpl(this);}public Channel inheritedChannel() throws IOException {return InheritedChannel.getChannel();}
}

Linux平台就得到了最终的Selector实现:srcsolaris\classes\sun\nio\ch下的EPollSelectorImpl.java

来看看它实现的构造器:

 EPollSelectorImpl(SelectorProvider sp) throws IOException {super(sp);// makePipe返回管道的2个文件描述符,编码在一个long类型的变量中// 高32位代表读 低32位代表写// 使用pipe为了实现Selector的wakeup逻辑long pipeFds = IOUtil.makePipe(false);fd0 = (int) (pipeFds >>> 32);fd1 = (int) pipeFds;// 新建一个EPollArrayWrapperpollWrapper = new EPollArrayWrapper();pollWrapper.initInterrupt(fd0, fd1);fdToKey = new HashMap<>();}

\src\solaris\native\sun\nio\ch下的EPollArrayWrapper.c

JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
{/** epoll_create expects a size as a hint to the kernel about how to* dimension internal structures. We can't predict the size in advance.*/int epfd = epoll_create(256);if (epfd < 0) {JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");}return epfd;
}

①epoll_create在前面已经讲过了,这里就不再赘述了。

②epoll wait 等待内核IO事件

调用Selector.select(返回键的数量,可能是零)最后会委托给各个实现的doSelect方法,限于篇幅不贴出太详细的,这里看下EpollSelectorImpl的doSelect方法

protected int doSelect(long timeout) throws IOException {if (closed)throw new ClosedSelectorException();processDeregisterQueue();try {begin();//EPollArrayWrapper pollWrapperpollWrapper.poll(timeout);//重点在这里} finally {end();}processDeregisterQueue();int numKeysUpdated = updateSelectedKeys();// 后面会讲到if (pollWrapper.interrupted()) {// Clear the wakeup pipepollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);synchronized (interruptLock) {pollWrapper.clearInterrupted();IOUtil.drain(fd0);interruptTriggered = false;}}return numKeysUpdated;}
int poll(long timeout) throws IOException {updateRegistrations();// 这个代码在下面讲,涉及到epoo_ctl// 这个epollWait是不是有点熟悉呢?updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);for (int i=0; i<updated; i++) {if (getDescriptor(i) == incomingInterruptFD) {interruptedIndex = i;interrupted = true;break;}}return updated;

看下EPollArrayWrapper.c

JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,jlong address, jint numfds,jlong timeout, jint epfd)
{struct epoll_event *events = jlong_to_ptr(address);int res;if (timeout <= 0) {           /* Indefinite or no wait *///系统调用等待内核事件RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);} else {                      /* Bounded wait; bounded restarts */res = iepoll(epfd, events, numfds, timeout);}if (res < 0) {JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");}return res;
}

可以看到在linux中Selector.select()其实是调用了epoll_wait

③epoll control以及openjdk对事件管理的封装

JDK中对于注册到Selector上的IO事件关系是使用SelectionKey来表示,代表了Channel感兴趣的事件,如Read,Write,Connect,Accept.

调用Selector.register()时均会将事件存储到EpollArrayWrapper.java的成员变量eventsLow和eventsHigh中

// events for file descriptors with registration changes pending, indexed
// by file descriptor and stored as bytes for efficiency reasons. For
// file descriptors higher than MAX_UPDATE_ARRAY_SIZE (unlimited case at
// least) then the update is stored in a map.
// 使用数组保存事件变更, 数组的最大长度是MAX_UPDATE_ARRAY_SIZE, 最大64*1024
private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE];
// 超过数组长度的事件会缓存到这个map中,等待下次处理
private Map<Integer,Byte> eventsHigh;/*** Sets the pending update events for the given file descriptor. This* method has no effect if the update events is already set to KILLED,* unless {@code force} is {@code true}.*/
private void setUpdateEvents(int fd, byte events, boolean force) {// 判断fd和数组长度if (fd < MAX_UPDATE_ARRAY_SIZE) {if ((eventsLow[fd] != KILLED) || force) {eventsLow[fd] = events;}} else {Integer key = Integer.valueOf(fd);if (!isEventsHighKilled(key) || force) {eventsHigh.put(key, Byte.valueOf(events));}}
}/*** Returns the pending update events for the given file descriptor.*/private byte getUpdateEvents(int fd) {if (fd < MAX_UPDATE_ARRAY_SIZE) {return eventsLow[fd];} else {Byte result = eventsHigh.get(Integer.valueOf(fd));// result should never be nullreturn result.byteValue();}

在上面poll代码中涉及到

 int poll(long timeout) throws IOException {updateRegistrations();//*** Update the pending registrations.*/private void updateRegistrations() {synchronized (updateLock) {int j = 0;while (j < updateCount) {int fd = updateDescriptors[j];// 从保存的eventsLow和eventsHigh里取出事件short events = getUpdateEvents(fd);boolean isRegistered = registered.get(fd);int opcode = 0;if (events != KILLED) {if (isRegistered) {// 判断操作类型以传给epoll_ctl// 没有指定EPOLLET事件类型opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;} else {opcode = (events != 0) ? EPOLL_CTL_ADD : 0;}if (opcode != 0) {// 熟悉的epoll_ctlepollCtl(epfd, opcode, fd, events);if (opcode == EPOLL_CTL_ADD) {registered.set(fd);} else if (opcode == EPOLL_CTL_DEL) {registered.clear(fd);}}}j++;}updateCount = 0;}private native void epollCtl(int epfd, int opcode, int fd, int events);

可以看到epollCtl调用的native方法,我们进入EpollArrayWrapper.c

JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,jint opcode, jint fd, jint events)
{struct epoll_event event;int res;event.events = events;event.data.fd = fd;// epoll_ctl这里就不用多说了吧RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);/** A channel may be registered with several Selectors. When each Selector* is polled a EPOLL_CTL_DEL op will be inserted into its pending update* list to remove the file descriptor from epoll. The "last" Selector will* close the file descriptor which automatically unregisters it from each* epoll descriptor. To avoid costly synchronization between Selectors we* allow pending updates to be processed, ignoring errors. The errors are* harmless as the last update for the file descriptor is guaranteed to* be EPOLL_CTL_DEL.*/if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");}
}

在doSelect方法poll执行后,会更新EpollSelectorImpl.java里的 updateSelectedKeys,就是Selector里的三个set集合,具体可看前面。

/***更新已被epoll选择fd的键。*将就绪兴趣集添加到就绪队列。*/
private int updateSelectedKeys() {int entries = pollWrapper.updated;int numKeysUpdated = 0;for (int i=0; i<entries; i++) {int nextFD = pollWrapper.getDescriptor(i);SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));// ski is null in the case of an interruptif (ski != null) {int rOps = pollWrapper.getEventOps(i);if (selectedKeys.contains(ski)) {if (ski.channel.translateAndSetReadyOps(rOps, ski)) {numKeysUpdated++;}} else {ski.channel.translateAndSetReadyOps(rOps, ski);if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {selectedKeys.add(ski);numKeysUpdated++;}}}}return numKeysUpdated;}

POLL原理分析与java实战相关推荐

  1. 【SemiDrive源码分析】【MailBox核间通信】51 - DCF_IPCC_Property实现原理分析 及 代码实战

    [SemiDrive源码分析][MailBox核间通信]51 - DCF_IPCC_Property实现原理分析 及 代码实战 一.RTOS 侧 Property Service 初始化流程 1.1 ...

  2. java hashset 实现_HashSet实现原理分析(Java源码剖析)

    本文将深入讨论HashSet实现原理的源码细节.在分析源码之前,首先我们需要对HashSet有一个基本的理解. HashSet只存储不同的值,set中是不会出现重复值的. HashSet和HashMa ...

  3. 单点登录原理分析及CAS实战

    前端需要了解的 SSO 与 CAS 知识

  4. java holder详解,Java基础系列18:Holder技术的实现原理分析

    一 简介 (1)Java中的Holder是什么? 我这里说的Holder即这个类:javax.xml.ws.Holder 这个类属于JAX-WS 2.0规范中的一个类.它的作用是为不可变的对象引用提供 ...

  5. log4j漏洞原理分析复现检测复盘

    凡事要自发,自然而为,即要顺从一切处于自然状态的事物,允许它们自发地转变.这样,道即达到了一种"无为而无不为"的状态.在日常生活中,道表现为"不自傲"或&quo ...

  6. java进阶Kafka集群实战之原理分析及优化教程全在这里

    我不去想是否能够成功 既然选择了Java 便只顾风雨兼程 我不去想能否征服Kafka集群 既然钟情于Java 就勇敢地追随千锋 我不去想Kafka集群有多么晦涩难懂 既然目标是远方 留给世界的只能是努 ...

  7. Java 重入锁 ReentrantLock 原理分析

    1.简介 可重入锁ReentrantLock自 JDK 1.5 被引入,功能上与synchronized关键字类似.所谓的可重入是指,线程可对同一把锁进行重复加锁,而不会被阻塞住,这样可避免死锁的产生 ...

  8. Java阻塞队列ArrayBlockingQueue和LinkedBlockingQueue实现原理分析

    转载自  Java阻塞队列ArrayBlockingQueue和LinkedBlockingQueue实现原理分析 Java中的阻塞队列接口BlockingQueue继承自Queue接口. Block ...

  9. java并发包线程池原理分析锁的深度化

    java并发包&线程池原理分析&锁的深度化 并发包 同步容器类 Vector与ArrayList区别 1.ArrayList是最常用的List实现类,内部是通过数组实现的,它允许对元素 ...

最新文章

  1. 台式电脑一般价钱多少_让你少走弯路,看我怎么花低价钱配出高配置的台式电脑的...
  2. Web前端技术分享:全栈工程师常用的开发工具
  3. 踩自行车来进行人力发电,真的能驱动旋转木马吗?
  4. ADF任务流:页面片段的托管bean范围
  5. 6.824 Raft lesson4 2020(一)
  6. mysql中的boolean tinyint
  7. java httpclient发送json 请求 ,go服务端接收
  8. 这就是为什么您的开源项目失败
  9. 多语言网站开发 不完全技术分析收录
  10. .net常用面试核心技术点(持续更新中)
  11. MYSQL5.6服务启动不起来
  12. 多项目同时进行 如何高效协作?
  13. matlab对5个矩阵循环求均值,MATLAB循环求数组的平均值 每隔几个数据求一下平均值...
  14. lumion无法隐藏活动层_lumion是什么软件?想快速掌握 Lumion?快来拿走这份正确的入门教程!...
  15. VS code SSH 反复提示输入密码
  16. pandoc提取word中的图片
  17. 零售企业如何快速开上千家门店,揭秘名创优品的低价超级产品战略
  18. 根据指令判断寄存器状态
  19. pci总线原理(转)
  20. Java反射invoke报错wrong number of arguments

热门文章

  1. Web Application Security 网络应用程序安全 - (二)2010年网络安全威胁排行榜TOP 10...
  2. 【转载】(Git)用动图展示10大Git命令
  3. Ranger知识地图
  4. SpringMVC防止XSS攻击
  5. mos管电路_三极管和MOS管原来这样用,混用代价高,电路设计中需谨慎
  6. 写文件 追加到开始_文件和流
  7. 澳网:公茂鑫/张择创历史 中国男网夺大满贯首胜
  8. 我的小白同事接触白鹭引擎4天,成功做了一款足球小游戏
  9. 使用git将code同时提交github,gitee,coding
  10. 15大统计数据描绘网络安全行业市场蓝图