Epoll 是Linux内核的高性能、可扩展的I/O事件通知机制。

在linux2.5.44首次引入epoll,它设计的目的旨在取代既有的select、poll系统函数,让需要大量操作文件描述符的程序得以发挥更优异的性能(wikipedia example: 旧有的系统函数所花费的时间复杂度为O(n), epoll的时间复杂度O(log n))。epoll实现的功能与poll类似,都是监听多个文件描述符上的事件。

epoll底层是由可配置的操作系统内核对象建构而成,并以文件描述符(file descriptor)的形式呈现于用户空间(from wikipedia: 在操作系统中,虚拟内存通常会被分成用户空间,与核心空间这两个区段。这是存储器保护机制中的一环。内核**、核心扩展(kernel extensions)、以及驱动程序,运行在核心空间**上。而其他的应用程序,则运行在用户空间上。所有运行在用户空间的应用程序,都被统称为用户级(userland))。

多说一点关于内核的

它是一个用来管理软件发出的数据I/O的一个程序,并将数据交由CPU和电脑其他电子组件处理,但是直接对硬件操作是非常复杂的,通常内核提供一种硬件抽象的方法来完成(由内核决定一个程序在什么时候对某部分硬件操作多长时间),通过这些方法来完成进程间通信和系统调用。

宏内核:

宏内核简单来说,首先定义了一个高阶的抽象接口,叫系统调用(System call))来实现操作系统的功能,例如进程管理,文件系统,和存储管理等等,这些功能由多个运行在内核态的程序来完成。

微内核:

微内核结构由硬件抽象层和系统调用组成;包括了创建一个系统必需的几个部分;如线程管理,地址空间和进程间通信等。微核的目标是将系统服务的实现系统的基本操作规则分离开来。

linux就是使用的宏内核。因为它能够在运行时将模块调入执行,使扩充内核的功能变得更简单。

epoll做了什么事?

epoll 通过使用红黑树(RB-tree)搜索被监视的文件描述符(file descriptor)。

在 epoll 实例上注册事件时,epoll 会将该事件添加到 epoll 实例的红黑树上并注册一个回调函数,当事件发生时会将事件添加到就绪链表中。

epoll的结构?

int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

①epoll_create

向内核申请空间,创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值。在最初的实现中,调用者通过 size 参数告知内核需要监听的文件描述符数量。如果监听的文件描述符数量超过 size, 则内核会自动扩容。而现在 size 已经没有这种语义了,但是调用者调用时 size 依然必须大于 0,以保证后向兼容性。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的。

epoll_ctl

向 epfd 对应的内核epoll 实例添加、修改或删除对 fd 上事件 event 的监听op 可以为 EPOLL_CTL_ADD, EPOLL_CTL_MOD, EPOLL_CTL_DEL 分别对应的是添加新的事件,修改文件描述符上监听的事件类型,从实例上删除一个事件。如果 event 的 events 属性设置了 EPOLLET flag,那么监听该事件的方式是边缘触发

events可以是以下几个宏的集合:

  • EPOLLIN:触发该事件,表示对应的文件描述符上有可读数据。(包括对端SOCKET正常关闭);
  • EPOLLOUT:触发该事件,表示对应的文件描述符上可以写数据;
  • EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
  • EPOLLERR:表示对应的文件描述符发生错误;
  • EPOLLHUP:表示对应的文件描述符被挂断;
  • EPOLLET:将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
  • EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里。

例如:

struct epoll_event ev;
//设置与要处理的事件相关的文件描述符
ev.data.fd=listenfd;
//设置要处理的事件类型
ev.events=EPOLLIN|EPOLLET;
//注册epoll事件
epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);

epoll_wait

Linux-2.6.19又引入了可以屏蔽指定信号的epoll_wait: epoll_pwait

接收发生在被侦听的描述符上的,用户感兴趣的IO事件。简单点说:通过循环,不断地监听暴露的端口,看哪一个fd可读、可写~

timeout 为 0 时,epoll_wait 永远会立即返回。而 timeout 为 -1 时,epoll_wait 会一直阻塞直到任一已注册的事件变为就绪。当 timeout 为一正整数时,epoll 会阻塞直到计时结束或已注册的事件变为就绪。因为内核调度延迟,阻塞的时间可能会略微超过 timeout (毫秒级)。

epoll文件描述符用完后,直接用close关闭,并且会自动从被侦听的文件描述符集合中删除

epoll实战

说了这么多原理,脑壳怕嗡嗡的吧,来看看实战清醒下~

如上知道:每次添加/修改/删除被侦听文件描述符都需要调用epoll_ctl,所以要尽量少地调用epoll_ctl,防止其所引来的开销抵消其带来的好处。有的时候,应用中可能存在大量的短连接(比如说Web服务器),epoll_ctl将被频繁地调用,可能成为这个系统的瓶颈。

传统的select以及poll的效率会因为在线人数的线形递增而导致呈二次乃至三次方的下降,这些直接导致了网络服务器可以支持的人数有了个比较明显的限制。这是因为他们有限的文件描述符和遍历所有的fd所带来的低效。

重点哦~

当你拥有一个很大的socket集合,不过由于网络延时,任一时间只有部分socket是“活跃”的,但是select/poll每次调用都会线性扫描全部的集合,导致效率呈现线性下降epoll不存在这个问题,它只会对“活跃”的socket进行操作---这是因为在内核实现中epoll是根据每个fd上面的callback函数实现的。那么,只有“活跃”的socket才会主动的去调用 callback函数,其他idle(空闲)状态socket则不会,在这点上,epoll实现了一个“伪”AIO,因为这时候推动力在os内核。在一些 benchmark中,如果所有的socket基本上都是活跃的---比如一个高速LAN环境,epoll并不比select/poll有什么效率,相反,如果过多使用epoll_ctl,效率相比还有稍微的下降。但是一旦使用idle connections模拟WAN环境,epoll的效率就远在select/poll之上了。

int epfd = epoll_create(POLL_SIZE);struct epoll_event ev;struct epoll_event *events = NULL;nfds = epoll_wait(epfd, events, 20, 500);{for (n = 0; n < nfds; ++n) {if (events[n].data.fd == listener) {//如果是主socket的事件的话,则表示//有新连接进入了,进行新连接的处理。client = accept(listener, (structsockaddr *)&local, &addrlen);if (client < 0) {perror("accept");continue;}setnonblocking(client);        //将新连接置于非阻塞模式ev.events = EPOLLIN | EPOLLET; //并且将新连接也加入EPOLL的监听队列。//注意,这里的参数EPOLLIN|EPOLLET并没有设置对写socket的监听,//如果有写操作的话,这个时候epoll是不会返回事件的,如果要对写操作//也监听的话,应该是EPOLLIN|EPOLLOUT|EPOLLETev.data.fd = client;if (epoll_ctl(epfd, EPOLL_CTL_ADD, client, &ev) < 0) {//设置好event之后,将这个新的event通过epoll_ctl加入到epoll的监听队列里面,//这里用EPOLL_CTL_ADD来加一个新的epoll事件,通过EPOLL_CTL_DEL来减少一个//epoll事件,通过EPOLL_CTL_MOD来改变一个事件的监听方式。fprintf(stderr, "epollsetinsertionerror:fd=%d", client);return -1;}}else if(event[n].events & EPOLLIN){//如果是已经连接的用户,并且收到数据,//那么进行读入int sockfd_r;if ((sockfd_r = event[n].data.fd) < 0)continue;read(sockfd_r, buffer, MAXSIZE);//修改sockfd_r上要处理的事件为EPOLLOUTev.data.fd = sockfd_r;ev.events = EPOLLOUT | EPOLLET;epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd_r, &ev)}else if(event[n].events & EPOLLOUT){//如果有数据发送int sockfd_w = events[n].data.fd;write(sockfd_w, buffer, sizeof(buffer));//修改sockfd_w上要处理的事件为EPOLLINev.data.fd = sockfd_w;ev.events = EPOLLIN | EPOLLET;epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd_w, &ev)}do_use_fd(events[n].data.fd);}}

简单说下流程:

  • 监听到有新连接进入了,进行新连接的处理;
  • 如果是已经连接的用户,并且收到数据,读完之后修改sockfd_r上要处理的事件为EPOLLOUT(可写);
  • 如果有数据发送,写完之后,修改sockfd_w上要处理的事件为EPOLLIN(可读)

epoll在Java中怎么去调用的?

基础知识:

文件描述符:

  • (参考《Unix网络编程》译者的注释)
  • 文件描述符是Unix系统标识文件的int,Unix的哲学一切皆文件,所以各自资源(包括常规意义的文件、目录、管道、POSIX IPC、socket)都可以看成文件。

Java NIO的世界中,Selector是中央控制器Buffer是承载数据的容器,而Channel可以说是最基础的门面,它是本地I/O设备、网络I/O的通信桥梁

  • 网络I/O设备:

    • DatagramChannel:读写UDP通信的数据,对应DatagramSocket类
    • SocketChannel:读写TCP通信的数据,对应Socket类
    • ServerSocketChannel:监听新的TCP连接,并且会创建一个可读写的SocketChannel,对应ServerSocket类
  • 本地I/O设备:
    • FileChannel:读写本地文件的数据,不支持Selector控制,对应File类

先从最简单的ServerSocketChannel看起

ServerSocketChannelServerSocket一样是socket监听器,其主要区别前者可以运行在非阻塞模式下运行;

// 创建一个ServerSocketChannel,将会关联一个未绑定的ServerSocketpublic static ServerSocketChannel open() throws IOException {return SelectorProvider.provider().openServerSocketChannel();}

ServerSocketChannel的创建也是依赖底层操作系统实现,其实现类主要是ServerSocketChannelImpl我们来看看其构造方法

     ServerSocketChannelImpl(SelectorProvider var1) throws IOException {super(var1);// 创建一个文件操作符this.fd = Net.serverSocket(true);// 得到文件操作符是索引this.fdVal = IOUtil.fdVal(this.fd);this.state = 0;}

新建一个ServerSocketChannelImpl其本质是在底层操作系统创建了一个fd(即文件描述符),相当于建立了一个用于网络通信的通道,调用socket的bind()方法绑定,通过accept()调用操作系统获取TCP连接

public SocketChannel accept() throws IOException {// 忽略一些校验及无关代码....
​SocketChannelImpl var2 = null;// var3的作用主要是说明当前的IO状态,主要有/*** EOF = -1;* UNAVAILABLE = -2;* INTERRUPTED = -3;* UNSUPPORTED = -4;* THROWN = -5;* UNSUPPORTED_CASE = -6;*/int var3 = 0;// 这里本质也是用fd来获取连接FileDescriptor var4 = new FileDescriptor();// 用来存储TCP连接的地址信息InetSocketAddress[] var5 = new InetSocketAddress[1];
​try {// 这里设置了一个中断器,中断时会将连接关闭this.begin();// 这里当IO被中断时,会重新获取连接do {var3 = this.accept(this.fd, var4, var5);} while(var3 == -3 && this.isOpen());}finally {// 当连接被关闭且accept失败时或抛出AsynchronousCloseExceptionthis.end(var3 > 0);// 验证连接是可用的assert IOStatus.check(var3);}
​if (var3 < 1) {return null;} {// 默认连接是阻塞的IOUtil.configureBlocking(var4, true);// 创建一个SocketChannel的引用var2 = new SocketChannelImpl(this.provider(), var4, var5[0]);// 下面是是否连接成功校验,这里忽略...
​return var2;}
}
​
// 依赖底层操作系统实现的accept0方法
private int accept(FileDescriptor var1, FileDescriptor var2, InetSocketAddress[] var3) throws IOException {return this.accept0(var1, var2, var3);
}

SocketChannel

用于读写TCP通信的数据,相当于客户端

  1. 通过open方法创建SocketChannel,
  2. 然后利用connect方法来和服务端发起建立连接,还支持了一些判断连接建立情况的方法;
  3. readwrite支持最基本的读写操作

open

  public static SocketChannel open() throws IOException {    return SelectorProvider.provider().openSocketChannel();  }public SocketChannel openSocketChannel() throws IOException {return new SocketChannelImpl(this);}// State, increases monotonicallyprivate static final int ST_UNINITIALIZED = -1;private static final int ST_UNCONNECTED = 0;private static final int ST_PENDING = 1;private static final int ST_CONNECTED = 2;private static final int ST_KILLPENDING = 3;private static final int ST_KILLED = 4;private int state = ST_UNINITIALIZED;    SocketChannelImpl(SelectorProvider sp) throws IOException {super(sp);// 创建一个scoket通道,即fd(fd的作用可参考上面的描述)this.fd = Net.socket(true);// 得到该fd的索引this.fdVal = IOUtil.fdVal(fd);// 设置为未连接this.state = ST_UNCONNECTED;}

connect建立连接

    // 代码均来自JDK1.8 部分代码public boolean connect(SocketAddress var1) throws IOException {boolean var2 = false;// 读写都锁住synchronized(this.readLock) {synchronized(this.writeLock) {/****状态检查,channel和address****/// 判断channel是否openthis.ensureOpenAndUnconnected();InetSocketAddress var5 = Net.checkAddress(var1);SecurityManager var6 = System.getSecurityManager();if (var6 != null) {var6.checkConnect(var5.getAddress().getHostAddress(), var5.getPort());}
​boolean var10000;/****连接建立****/// 阻塞状态变更的锁也锁住synchronized(this.blockingLock()) {int var8 = 0;
​try {try {this.begin(); // 如果当前socket未绑定本地端口,则尝试着判断和服务端是否能建立连接synchronized(this.stateLock) {if (!this.isOpen()) {boolean var10 = false;return var10;}
​if (this.localAddress == null) {// 和远程建立连接后关闭连接NetHooks.beforeTcpConnect(this.fd, var5.getAddress(), var5.getPort());}
​this.readerThread = NativeThread.current();}
​do {InetAddress var9 = var5.getAddress();if (var9.isAnyLocalAddress()) {var9 = InetAddress.getLocalHost();}// 建立连接var8 = Net.connect(this.fd, var9, var5.getPort());} while(var8 == -3 && this.isOpen());synchronized(this.stateLock) {this.remoteAddress = var5;if (var8 <= 0) {if (!this.isBlocking()) {this.state = 1;} else {assert false;}} else {this.state = 2;// 连接成功if (this.isOpen()) {this.localAddress = Net.localAddress(this.fd);}
​var10000 = true;return var10000;}}}
​var10000 = false;return var10000;}}}

在建立在绑定地址之前,我们需要调用NetHooks.beforeTcpBind,这个方法是将fd转换为SDP(Sockets Direct Protocol,Java套接字直接协议) socket。SDP需要网卡支持InfiniBand高速网络通信技术,windows不支持该协议。

我们来看看在openjdk: srcsolarisclassessunnet下的NetHooks.java

   private static final Provider provider = new sun.net.sdp.SdpProvider();public static void beforeTcpBind(FileDescriptor fdObj, InetAddress address, int port) throws IOException{provider.implBeforeTcpBind(fdObj, address, port);}public static void beforeTcpConnect(FileDescriptor fdObj, InetAddress address, int port) throws IOException{provider.implBeforeTcpConnect(fdObj, address, port);}

可以看到实际是调用的SdpProvider里的implBeforeTcpBind

 @Overridepublic void implBeforeTcpBind(FileDescriptor fdObj,InetAddress address,int port)throws IOException{if (enabled)convertTcpToSdpIfMatch(fdObj, Action.BIND, address, port);}// converts unbound TCP socket to a SDP socket if it matches the rulesprivate void convertTcpToSdpIfMatch(FileDescriptor fdObj,Action action,InetAddress address,int port)throws IOException{boolean matched = false;// 主要是先通过规则校验器判断入参是否符合,一般有PortRangeRule校验器// 然后再执行将fd转换为socketfor (Rule rule: rules) {if (rule.match(action, address, port)) {SdpSupport.convertSocket(fdObj);matched = true;break;}}}public static void convertSocket(FileDescriptor fd) throws IOException {...//获取fd索引int fdVal = fdAccess.get(fd);convert0(fdVal);}// convert0JNIEXPORT void JNICALLJava_sun_net_sdp_SdpSupport_convert0(JNIEnv *env, jclass cls, int fd){// create方法实际是通过socket(AF_INET_SDP, SOCK_STREAM, 0);方法得到一个socketint s = create(env);if (s >= 0) {socklen_t len;int arg, res;struct linger linger;/* copy socket options that are relevant to SDP */len = sizeof(arg);// 重用TIME_WAIT的端口if (getsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&arg, &len) == 0)setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char*)&arg, len);len = sizeof(arg);// 紧急数据放入普通数据流if (getsockopt(fd, SOL_SOCKET, SO_OOBINLINE, (char*)&arg, &len) == 0)setsockopt(s, SOL_SOCKET, SO_OOBINLINE, (char*)&arg, len);len = sizeof(linger);// 延迟关闭连接if (getsockopt(fd, SOL_SOCKET, SO_LINGER, (void*)&linger, &len) == 0)setsockopt(s, SOL_SOCKET, SO_LINGER, (char*)&linger, len);// 将fd也引用到s所持有的通道RESTARTABLE(dup2(s, fd), res);if (res < 0)JNU_ThrowIOExceptionWithLastError(env, "dup2");// 执行close方法,关闭s这个引用RESTARTABLE(close(s), res);}}

read 读

public int read(ByteBuffer var1) throws IOException {// 省略一些判断synchronized(this.readLock) {this.begin();synchronized(this.stateLock) {do {// 通过IOUtil的读取fd的数据至buf// 这里的nd是SocketDispatcher,用于调用底层的read和write操作var3 = IOUtil.read(this.fd, var1, -1L, nd);} while(var3 == -3 && this.isOpen());// 这个方法主要是将UNAVAILABLE(原为-2)这个状态返回0,否则返回nvar4 = IOStatus.normalize(var3);var20 = false;break label367;}
​this.readerCleanup();assert IOStatus.check(var3);}    }}}
static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {if (var1.isReadOnly()) {throw new IllegalArgumentException("Read-only buffer");} else if (var1 instanceof DirectBuffer) {return readIntoNativeBuffer(var0, var1, var2, var4);} else {// 临时缓冲区,大小为buf的remain(limit - position),堆外内存,使用ByteBuffer.allocateDirect(size)分配// Notes:这里分配后后面有个try-finally块会释放该部分内存ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining());
​int var7;try {// 将网络中的buf读进direct bufferint var6 = readIntoNativeBuffer(var0, var5, var2, var4);var5.flip();// 待读取if (var6 > 0) {var1.put(var5);// 成功时写入}
​var7 = var6;} finally {Util.offerFirstTemporaryDirectBuffer(var5);}
​return var7;}}
private static int readIntoNativeBuffer(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {// 忽略变量initif (var2 != -1L) {// pread方法只有在同步状态下才能使用var9 = var4.pread(var0, ((DirectBuffer)var1).address() + (long)var5, var7, var2);} else {// 其调用SocketDispatcher.read方法 -> FileDispatcherImpl.read0方法var9 = var4.read(var0, ((DirectBuffer)var1).address() + (long)var5, var7);}
​if (var9 > 0) {var1.position(var5 + var9);}
​return var9;}}
// 同样找到openjdk:srcsolarisnativesunnioch
//FileDispatcherImpl.c
JNIEXPORT jint JNICALL
Java_sun_nio_ch_FileDispatcherImpl_read0(JNIEnv *env, jclass clazz,jobject fdo, jlong address, jint len)
{jint fd = fdval(env, fdo);// 获取fd索引void *buf = (void *)jlong_to_ptr(address);// 调用底层read方法return convertReturnVal(env, read(fd, buf, len), JNI_TRUE);
}

总结一下读取的过程

  1. 初始化一个direct buffer,如果本身的buffer就是direct的则不用初始化
  2. 调用底层read方法写入至direct buffer
  3. 最终将direct buffer写到传入的buffer对象

write 写

看完了前面的read,write整个执行流程基本一样,具体的细节参考如下

public int write(ByteBuffer var1) throws IOException {if (var1 == null) {throw new NullPointerException();} else {synchronized(this.writeLock) {this.ensureWriteOpen();this.begin();synchronized(this.stateLock) {if (!this.isOpen()) {var5 = 0;var20 = false;break label310;}this.writerThread = NativeThread.current();}do {// 通过IOUtil的读取fd的数据至buf// 这里的nd是SocketDispatcher,用于调用底层的read和write操作var3 = IOUtil.write(this.fd, var1, -1L, nd);} while(var3 == -3 && this.isOpen());
​var4 = IOStatus.normalize(var3);var20 = false;this.writerCleanup();assert IOStatus.check(var3);return var4;}}}}
static int write(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {if (var1 instanceof DirectBuffer) {return writeFromNativeBuffer(var0, var1, var2, var4);} else {​ByteBuffer var8 = Util.getTemporaryDirectBuffer(var7);
​int var10;try {// 这里的pos为buf初始的position,意思是将buf重置为最初的状态;因为目前还没有真实的写入到channel中var8.put(var1);var8.flip();var1.position(var5);// 调用int var9 = writeFromNativeBuffer(var0, var8, var2, var4);if (var9 > 0) {var1.position(var5 + var9);}
​var10 = var9;} finally {Util.offerFirstTemporaryDirectBuffer(var8);}
​return var10;}}
IOUtil.writeFromNativeBuffer(fd , buf , position , nd)
{// ... 忽略一些获取buf变量的代码    int written = 0;if (position != -1) {// pread方法只有在同步状态下才能使用written = nd.pwrite(fd ,((DirectBuffer)bb).address() + pos,rem, position);} else {// 其调用SocketDispatcher.write方法 -> FileDispatcherImpl.write0方法written = nd.write(fd, ((DirectBuffer)bb).address() + pos, rem);}//....
}
FileDispatcherImpl.write0
{// 调用底层的write方法写入return convertReturnVal(env, write(fd, buf, len), JNI_FALSE);
}
}

总结一下write的过程:

  1. 如果buf是direct buffer则直接开始写入,否则需要初始化一个direct buffer,大小是buf的remain
  2. 将buf的内容写入到direct buffer中,并恢复buf的position
  3. 调用底层的write方法写入至channel
  4. 更新buf的position,即被direct buffer读取内容后的position

耐心一点,马上就到Epoll了

理解了前面的一些基础知识,接下来的部分就会涉及到Java是怎么样来使用epoll的。

Selector简述

Selector的作用是Java NIO中管理一组多路复用的SelectableChannel对象,并能够识别通道是否为诸如读写事件做好准备的组件 --Java doc

Selector的创建过程如下:

// 1.创建Selector
Selector selector = Selector.open();// 2.将Channel注册到选择器中
// ....... new channel的过程 ....//Notes:channel要注册到Selector上就必须是非阻塞的,所以FileChannel是不可以
//使用Selector的,因为FileChannel是阻塞的
channel.configureBlocking(false);// 第二个参数指定了我们对 Channel 的什么类型的事件感兴趣
SelectionKey key = channel.register(selector , SelectionKey.OP_READ);// 也可以使用或运算|来组合多个事件,例如
SelectionKey key = channel.register(selector , SelectionKey.OP_READ | SelectionKey.OP_WRITE);// 不过值得注意的是,一个 Channel 仅仅可以被注册到一个 Selector 一次,
// 如果将 Channel 注册到 Selector 多次, 那么其实就是相当于更新 SelectionKey
//的 interest set.

一个Channel在Selector注册其代表的是一个SelectionKey事件,SelectionKey的类型包括:

  • OP_READ:可读事件;值为:1<<0
  • OP_WRITE:可写事件;值为:1<<2
  • OP_CONNECT:客户端连接服务端的事件(tcp连接),一般为创建SocketChannel客户端channel;值为:1<<3
  • OP_ACCEPT:服务端接收客户端连接的事件,一般为创建ServerSocketChannel服务端channel;值为:1<<4

一个Selector内部维护了三组keys:

  1. key set:当前channel注册在Selector上所有的key;可调用keys()获取
  2. selected-key set:当前channel就绪的事件;可调用selectedKeys()获取
  3. cancelled-key:主动触发SelectionKey#cancel()方法会放在该集合,前提条件是该channel没有被取消注册;不可通过外部方法调用

Selector类中总共包含以下10个方法:

  • open():创建一个Selector对象
  • isOpen():是否是open状态,如果调用了close()方法则会返回false
  • provider():获取当前Selector的Provider
  • keys():如上文所述,获取当前channel注册在Selector上所有的key
  • selectedKeys():获取当前channel就绪的事件列表
  • selectNow():获取当前是否有事件就绪,该方法立即返回结果,不会阻塞;如果返回值>0,则代表存在一个或多个
  • select(long timeout):selectNow的阻塞超时方法,超时时间内,有事件就绪时才会返回;否则超过时间也会返回
  • select():selectNow的阻塞方法,直到有事件就绪时才会返回
  • wakeup():调用该方法会时,阻塞在select()处的线程会立马返回;(ps:下面一句划重点)即使当前不存在线程阻塞在select()处,那么下一个执行select()方法的线程也会立即返回结果,相当于执行了一次selectNow()方法
  • close(): 用完Selector后调用其close()方法会关闭该Selector,且使注册到该Selector上的所有SelectionKey实例无效。channel本身并不会关闭。

关于SelectionKey

谈到Selector就不得不提SelectionKey,两者是紧密关联,配合使用的;如上文所示,往Channel注册Selector会返回一个SelectionKey对象, 这个对象包含了如下内容:

  • interest set,当前Channel感兴趣的事件集,即在调用register方法设置的interes set
  • ready set
  • channel
  • selector
  • attached object,可选的附加对象

interest set 可以通过SelectionKey类中的方法来获取和设置interes set

// 返回当前感兴趣的事件列表
int interestSet = key.interestOps();// 也可通过interestSet判断其中包含的事件
boolean isInterestedInAccept  = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE;    // 可以通过interestOps(int ops)方法修改事件列表
key.interestOps(interestSet | SelectionKey.OP_WRITE);

ready set 当前Channel就绪的事件列表

int readySet = key.readyOps();// 也可通过四个方法来分别判断不同事件是否就绪
key.isReadable();    //读事件是否就绪
key.isWritable();    //写事件是否就绪
key.isConnectable(); //客户端连接事件是否就绪
key.isAcceptable();  //服务端连接事件是否就绪

channel和selector 我们可以通过SelectionKey来获取当前的channel和selector

// 返回当前事件关联的通道,可转换的选项包括:`ServerSocketChannel`和`SocketChannel`
Channel channel = key.channel();
​
//返回当前事件所关联的Selector对象
Selector selector = key.selector();

attached object 我们可以在selectionKey中附加一个对象,或者在注册时直接附加:

key.attach(theObject);
Object attachedObj = key.attachment();
// 在注册时直接附加
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);

万丈高楼平地起,基础知识差不多了,了解了这些,可以找一些nio demo或者netty demo练练手。接下来讲解本节比较重要的~epoll

前面多次提到了openjdk,seletor的具体实现肯定是跟操作系统有关的,我们一起来看看。

可以看到Selector的实现是SelectorImpl, 然后SelectorImpl又将职责委托给了具体的平台,比如图中的linux2.6 EpollSelectorImpl,windows是WindowsSelectorImpl,MacOSX是KQueueSelectorImpl

根据前面我们知道,Selector.open()可以得到一个Selector实例,怎么实现的呢?

// Selector.java
public static Selector open() throws IOException {// 首先找到provider,然后再打开Selectorreturn SelectorProvider.provider().openSelector();
}// java.nio.channels.spi.SelectorProviderpublic static SelectorProvider provider() {synchronized (lock) {if (provider != null)return provider;return AccessController.doPrivileged(new PrivilegedAction<SelectorProvider>() {public SelectorProvider run() {if (loadProviderFromProperty())return provider;if (loadProviderAsService())return provider;// 这里就是打开Selector的真正方法provider = sun.nio.ch.DefaultSelectorProvider.create();return provider;}});}
}

在openjdk中,每个操作系统都有一个sun.nio.ch.DefaultSelectorProvider实现,以src**solaris**classessunnioch下的DefaultSelectorProvider为例:

/*** Returns the default SelectorProvider.*/
public static SelectorProvider create() {// 获取OS名称String osname = AccessController.doPrivileged(new GetPropertyAction("os.name"));// 根据名称来创建不同的Selctorif (osname.equals("SunOS"))return createProvider("sun.nio.ch.DevPollSelectorProvider");if (osname.equals("Linux"))return createProvider("sun.nio.ch.EPollSelectorProvider");return new sun.nio.ch.PollSelectorProvider();
}

打开src**solaris**classessunnioch下的EPollSelectorProvider.java

public class EPollSelectorProviderextends SelectorProviderImpl
{public AbstractSelector openSelector() throws IOException {return new EPollSelectorImpl(this);}public Channel inheritedChannel() throws IOException {return InheritedChannel.getChannel();}
}

Linux平台就得到了最终的Selector实现:src**solaris**classessunnioch下的EPollSelectorImpl.java

来看看它实现的构造器:

    EPollSelectorImpl(SelectorProvider sp) throws IOException {super(sp);// makePipe返回管道的2个文件描述符,编码在一个long类型的变量中// 高32位代表读 低32位代表写// 使用pipe为了实现Selector的wakeup逻辑long pipeFds = IOUtil.makePipe(false);fd0 = (int) (pipeFds >>> 32);fd1 = (int) pipeFds;// 新建一个EPollArrayWrapperpollWrapper = new EPollArrayWrapper();pollWrapper.initInterrupt(fd0, fd1);fdToKey = new HashMap<>();}

srcsolarisnativesunnioch下的EPollArrayWrapper.c

JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
{/** epoll_create expects a size as a hint to the kernel about how to* dimension internal structures. We can't predict the size in advance.*/int epfd = epoll_create(256);if (epfd < 0) {JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");}return epfd;
}

epoll_create在前面已经讲过了,这里就不再赘述了。

②epoll wait 等待内核IO事件

调用Selector.select(返回键的数量,可能是零)最后会委托给各个实现的doSelect方法,限于篇幅不贴出太详细的,这里看下EpollSelectorImpl的doSelect方法

protected int doSelect(long timeout) throws IOException {    if (closed)      throw new ClosedSelectorException();    processDeregisterQueue();    try {      begin();      //EPollArrayWrapper pollWrapper      pollWrapper.poll(timeout);//重点在这里    } finally {      end();    }    processDeregisterQueue();    int numKeysUpdated = updateSelectedKeys();// 后面会讲到    if (pollWrapper.interrupted()) {      // Clear the wakeup pipe      pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);      synchronized (interruptLock) {        pollWrapper.clearInterrupted();        IOUtil.drain(fd0);        interruptTriggered = false;      }    }    return numKeysUpdated;  }
int poll(long timeout) throws IOException {updateRegistrations();// 这个代码在下面讲,涉及到epoo_ctl// 这个epollWait是不是有点熟悉呢?updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);for (int i=0; i<updated; i++) {if (getDescriptor(i) == incomingInterruptFD) {interruptedIndex = i;interrupted = true;break;}}return updated;

看下EPollArrayWrapper.c

JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,jlong address, jint numfds,jlong timeout, jint epfd)
{struct epoll_event *events = jlong_to_ptr(address);int res;if (timeout <= 0) {           /* Indefinite or no wait *///系统调用等待内核事件RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);} else {                      /* Bounded wait; bounded restarts */res = iepoll(epfd, events, numfds, timeout);}if (res < 0) {JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");}return res;
}

可以看到在linux中Selector.select()其实是调用了epoll_wait

③epoll control以及openjdk对事件管理的封装

JDK中对于注册到Selector上的IO事件关系是使用SelectionKey来表示,代表了Channel感兴趣的事件,如Read,Write,Connect,Accept.

调用Selector.register()时均会将事件存储到EpollArrayWrapper.java的成员变量eventsLow和eventsHigh中

// events for file descriptors with registration changes pending, indexed
// by file descriptor and stored as bytes for efficiency reasons. For
// file descriptors higher than MAX_UPDATE_ARRAY_SIZE (unlimited case at
// least) then the update is stored in a map.
// 使用数组保存事件变更, 数组的最大长度是MAX_UPDATE_ARRAY_SIZE, 最大64*1024
private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE];
// 超过数组长度的事件会缓存到这个map中,等待下次处理
private Map<Integer,Byte> eventsHigh;
​
​
/*** Sets the pending update events for the given file descriptor. This* method has no effect if the update events is already set to KILLED,* unless {@code force} is {@code true}.*/
private void setUpdateEvents(int fd, byte events, boolean force) {// 判断fd和数组长度if (fd < MAX_UPDATE_ARRAY_SIZE) {if ((eventsLow[fd] != KILLED) || force) {eventsLow[fd] = events;}} else {Integer key = Integer.valueOf(fd);if (!isEventsHighKilled(key) || force) {eventsHigh.put(key, Byte.valueOf(events));}}
}/*** Returns the pending update events for the given file descriptor.*/private byte getUpdateEvents(int fd) {if (fd < MAX_UPDATE_ARRAY_SIZE) {return eventsLow[fd];} else {Byte result = eventsHigh.get(Integer.valueOf(fd));// result should never be nullreturn result.byteValue();}

在上面poll代码中涉及到

    int poll(long timeout) throws IOException {updateRegistrations();/
​/*** Update the pending registrations.*/private void updateRegistrations() {synchronized (updateLock) {int j = 0;while (j < updateCount) {int fd = updateDescriptors[j];// 从保存的eventsLow和eventsHigh里取出事件short events = getUpdateEvents(fd);boolean isRegistered = registered.get(fd);int opcode = 0;
​if (events != KILLED) {if (isRegistered) {// 判断操作类型以传给epoll_ctl// 没有指定EPOLLET事件类型opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;} else {opcode = (events != 0) ? EPOLL_CTL_ADD : 0;}if (opcode != 0) {// 熟悉的epoll_ctlepollCtl(epfd, opcode, fd, events);if (opcode == EPOLL_CTL_ADD) {registered.set(fd);} else if (opcode == EPOLL_CTL_DEL) {registered.clear(fd);}}}j++;}updateCount = 0;}private native void epollCtl(int epfd, int opcode, int fd, int events);

可以看到epollCtl调用的native方法,我们进入EpollArrayWrapper.c

JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,jint opcode, jint fd, jint events)
{struct epoll_event event;int res;event.events = events;event.data.fd = fd;// epoll_ctl这里就不用多说了吧RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);/** A channel may be registered with several Selectors. When each Selector* is polled a EPOLL_CTL_DEL op will be inserted into its pending update* list to remove the file descriptor from epoll. The "last" Selector will* close the file descriptor which automatically unregisters it from each* epoll descriptor. To avoid costly synchronization between Selectors we* allow pending updates to be processed, ignoring errors. The errors are* harmless as the last update for the file descriptor is guaranteed to* be EPOLL_CTL_DEL.*/if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");}
}

在doSelect方法poll执行后,会更新EpollSelectorImpl.java里的 updateSelectedKeys,就是Selector里的三个set集合,具体可看前面。

/**
​
*更新已被epoll选择fd的键。
​
*将就绪兴趣集添加到就绪队列。
​
*/
private int updateSelectedKeys() {int entries = pollWrapper.updated;int numKeysUpdated = 0;for (int i=0; i<entries; i++) {int nextFD = pollWrapper.getDescriptor(i);SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));// ski is null in the case of an interruptif (ski != null) {int rOps = pollWrapper.getEventOps(i);if (selectedKeys.contains(ski)) {if (ski.channel.translateAndSetReadyOps(rOps, ski)) {numKeysUpdated++;}} else {ski.channel.translateAndSetReadyOps(rOps, ski);if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {selectedKeys.add(ski);numKeysUpdated++;}}}}return numKeysUpdated;}

总结

通过本文,你应该知道Channel、Selector基本原理和在Java中怎么使用Epoll的。 (包括更细节的fd与channel和socket之间的转换关系)掌握这些基础知识,再去看NIO、netty网络框架的源码可能就没有那么吃力了。在接下来的文章里我会跟进关于Netty的文章,毕竟这已成为分布式网络通信框架的主流了!

感谢

https://zh.wikipedia.org/wiki/Epoll 维基百科

https://baike.baidu.com/item/epoll/10738144?fr=aladdin

https://juejin.im/entry/5b51546df265da0f70070b93

https://www.jianshu.com/p/f26f1eaa7c8e

来自:微信公众号(作者:汀雨笔记),著作权属于:本文和汀雨

epoll监听文件_【原创】万字长文浅析:Epoll与Java Nio的那些事儿相关推荐

  1. epoll监听文件_介绍一下 Android Handler 中的 epoll 机制?

    介绍一下 Android Handler 中的 epoll 机制? 目录: IO 多路复用 select.poll.epoll 对比 epoll API epoll 使用示例 Handler 中的 e ...

  2. epoll监听文件_怎么理解把标准输入以ET模式加入epoll,监听EPOLLOUT事件时,epoll_wait多次返回?...

    确实是后面的printf导致的,去掉后面的printf,增加计数器的判断, 可确认epoll_wait只返回了一次.但是为什么printf会影响到标准输入stdin?还是不理解. 修改后符合预期代码如 ...

  3. epoll监听文件_epoll详解——从功能到内核

    首先我们了解一下什么是I/O复用.I/O就是指网络中的I/O(即输入输出),多路是指多个TCP连接,复用是指一个或少量线程被重复使用.连起来理解就是,用少量的线程来处理网络上大量的TCP连接中的I/O ...

  4. epoll监听文件_epoll使用详解

    epoll介绍 epoll的行为与poll(2)相似,监视多个有IO事件的文件描述符.epoll除了提供select/poll那种IO事件的水平触发(Level Triggered)外,还提供了边缘触 ...

  5. 11无监听程序_腾讯开心鼠英语 小程序实践与总结

    腾讯开心鼠英语 团队中有很多小程序的项目,且后续还会很多小程序的开发和迭代规划,因此我们团队是小程序的重度使用者.在小程序的开发中,团队积累了一些技术和经验,也遇到了一些困难和挑战,还踩了很多坑,因此 ...

  6. plsql developer无监听程序_腾讯开心鼠英语 小程序实践与总结

    腾讯开心鼠英语 团队中有很多小程序的项目,且后续还会很多小程序的开发和迭代规划,因此我们团队是小程序的重度使用者.在小程序的开发中,团队积累了一些技术和经验,也遇到了一些困难和挑战,还踩了很多坑,因此 ...

  7. Android 中关于 FileObserver类监听文件状态的实践

    文章目录 需求背景 走进源码 实现示例 参考 需求背景 当某一个目录的文件发生变化(创建.修改.删除.移动)时,需要给一个回调事件给其他端调用. 其他场景:阅后即焚等等. 比如在 Android 的 ...

  8. Handler ,MessageQueue 的Looper中epoll监听的fd

    hi,同学们大家好! 这些天有学员再群里问起了Handler中有个数据监听相关问题,学员有的认为Handler数据传递是靠流传递,误认为是epoll中监听的fd进行传递的,这个其实有必要更正这个学员的 ...

  9. python监听文件最后修改人_Python持续监听文件变化代码实例

    在日常的工作中,有时候会有这样的需求,需要一个常驻任务,持续的监听一个目录下文件的变化,对此作出回应. pyinotify就是这样的一个python包,使用方式如下: 一旦src.txt有新的内容,程 ...

最新文章

  1. 结合案例深入解析:抽象工厂模式
  2. SQL Server 2000安装指南及数据创建
  3. Adobe Flash Builder 4.6 开发环境详解
  4. geth 以太坊钱包_以太坊Geth节点RPC API中文文档
  5. MySQL 性能方案
  6. python统计一个字符串中连在一起数字的个数.(如12asd25asfd,输出结果为2)
  7. 第8章 中医证型关联规则挖掘
  8. Kubernetes部署(一):K8s 二进制方式安装
  9. USB输入单节锂电池0.5A充电管理IC,防高压40V保护电路-7号电路板
  10. SRGAN超分辨率网络
  11. 企业非法集资风险预测
  12. WOFOST模型Matlab,一种WOFOST-PAR耦合模型建立方法与流程
  13. 【数据库MySQL】2021最新官网下载及查看MySQL版本步骤教程
  14. 谷歌浏览器提示密码泄露弹窗关闭
  15. vue新框架nuxt通过文件目录自动生成路由
  16. 2023最新SSM计算机毕业设计选题大全(附源码+LW)之java创梦宝大学生创业众筹平台cds88
  17. OsChina开发地图工具一览
  18. 敏捷软件开发模型--SCRUM
  19. BZOJ 1499 [NOI2005]瑰丽华尔兹 动态规划(+单调队列)
  20. 01筑基期——Java入门(Hello Word)+ 废话

热门文章

  1. 骑手困在系统里,网友困在回应里,而王兴正在刷饭否
  2. joomla 3.4.5 php版本,Joomla3.4.6漏洞最强总结
  3. java自定义标签 map_基于Spring MVC的自定义标签Tag
  4. echart php mysql简书_echarts-自定义构建
  5. php mysql全能权威指南 pdf_《PHP+MySQL全能权威指南(配光盘)》怎么样_目录_pdf在线阅读 - 课课家教育...
  6. linux跑r语言代码,R语言快捷键(示例代码)
  7. termux怎么安装python库_Python termux-apt-repo包_程序模块 - PyPI - Python中文网
  8. 关于解决安装pwndbg问题sys.stderr.write(f“ERROR: {exc}“) /usr/bin/python3.5: No module named ensurepip
  9. 教你控制Python多线程中线程数量
  10. Django之session验证的三种姿势