一、NETTY底层使用的是NIO的selector和epoll进行实现的,select,poll,epoll都是IO多路复用的机制。I/O多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说把数据从内核拷贝到用户空间是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。

二、测试实例

1.C++

/**\ 服务器端的源代码*/#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <fcntl.h>
#include <iostream>
#include <signal.h>
#include <sys/epoll.h>#define MAXFDS 256
#define EVENTS 100
#define PORT 8888int epfd;
bool setNonBlock(int fd)
{int flags = fcntl(fd, F_GETFL, 0);flags |= O_NONBLOCK;if(-1 == fcntl(fd, F_SETFL, flags))return false;return true;
}int main(int argc, char *argv[], char *evp[])
{int fd, nfds, confd;int on = 1;char *buffer[512];struct sockaddr_in saddr, caddr;struct epoll_event ev, events[EVENTS];if(-1 == socket(AF_INET, SOCKSTREAM), 0){std::cout << "创建套接字出错啦" << std::endl;return -1;}struct sigaction sig;sigemptyset(&sig.sa_mask);sig_handler = SIG_IGN;sigaction(SIGPIPE, &N > sig, NULL);epfd = epoll_create(MAXFDS);setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));memset(&saddr, 0, sizeof(saddr));saddr.sin_family = AF_INET;saddr.sin_port = htons((short)(PORT));saddr.sin_addr.s_addr = INADDR_ANY;if(-1 == bind(fd, (struct sockaddr *)&saddr, sizeof(saddr))){std::cout << "套接字不能绑定到服务器上" << std::endl;return -1;}if(-1 == listen(fd, 32)){std::cout << "监听套接字的时候出错了" << std::endl;return -1;}ev.data.fd = fd;ev.events = EPOLLIN;epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);while(true){nfds = epoll_wait(epfd, &events, MAXFDS, 0);for(int i = 0; i < nfds; ++ i){if(fd == events[i].data.fd){memset(&caddr, sizeof(caddr));cfd = accept(fd, (struct sockaddr *)&caddr, &sizeof(caddr));if(-1 == cfd){std::cout << "服务器接收套接字的时候出问题了" << std::endl;break;}setNonBlock(cfd);ev.data.fd = cfd;ev.events = EPOLLIN;epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ev);}else if(events[i].data.fd & EPOLLIN){bzero(&buffer, sizeof(buffer));std::cout << "服务器端要读取客户端发过来的消息" << std::endl;ret = recv(events[i].data.fd, buffer, sizeof(buffer), 0);if(ret < 0){std::cout << "服务器收到的消息出错了" << endl;return -1;}std::cout << "接收到的消息为:" << (char *) buffer << std::endl;ev.data.fd = events[i].data.fd;ev.events = EPOLLOUT;epoll_ctl(epfd, EPOLL_CTL_MOD, events[i].data.fd, &ev);}else if(events[i].data.fd & EPOLLOUT){bzero(&buffer, sizeof(buffer));bcopy("The Author@: magicminglee@Hotmail.com", buffer, sizeof("The Author@: magicminglee@Hotmail.com"));ret = send(events[i].data.fd, buffer, strlen(buffer));if(ret < 0){std::cout << "服务器发送消息给客户端的时候出错啦" << std::endl;return -1;}ev.data.fd = events[i].data.fd;epoll_ctl(epfd, EPOLL_CTL_DEL, ev.data.fd, &ev);}}}if(fd > 0){shutdown(fd, SHUT_RDWR);close(fd);}
}

2.JAVA

package com.tpw.summaryday.nio;import lombok.Getter;
import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;/*** <h3>summaryday</h3>* <p></p>** @author : lipengyao* @date : 2021-09-16 16:50:54**/
@Slf4j
public class ServerReactor3 {public  static final  int workReactorNums = Runtime.getRuntime().availableProcessors() * 2;public static class WorkReactor {public static ExecutorService executorService = Executors.newFixedThreadPool(ServerReactor3.workReactorNums * 2);private Selector selector;private int reactorIndex;private int channelCnt;private Map<SocketChannel, ArrayDeque<String>> unWriteDataMap = new ConcurrentHashMap<>();private List<SocketChannel> waitRegisterChannels = new ArrayList<>();private Lock lock = new ReentrantLock();private int maxItemKeyCnt = 0;public WorkReactor(int reactorIndex) throws IOException {this.selector = Selector.open();this.reactorIndex = reactorIndex;this.channelCnt = 0;select();log.debug("register init channelCnt:{},reactorIndex:{},selectionKey:{}", this.channelCnt, this.reactorIndex,this.selector);}public void register(SocketChannel socketChannel) {lock.lock();try{waitRegisterChannels.add(socketChannel);log.debug("register add socket channel waitRegisterChannels:{}",waitRegisterChannels.size());}finally {lock.unlock();}}public void registerAllWaitChannels(){lock.lock();try{if (!this.waitRegisterChannels.isEmpty()){log.debug("registerAllWaitChannels will register begin waitRegisterChannels:{}",waitRegisterChannels.size());for (int i = 0; i < this.waitRegisterChannels.size(); i++) {this.interRegister(this.waitRegisterChannels.get(i));}this.waitRegisterChannels.clear();log.debug("registerAllWaitChannels  register end waitRegisterChannels:{}",waitRegisterChannels.size());}}finally {lock.unlock();}}public void interRegister(SocketChannel socketChannel){try {selector.wakeup();SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ /*| SelectionKey.OP_WRITE*/);int read = SelectionKey.OP_READ;this.channelCnt++;unWriteDataMap.put(socketChannel, new ArrayDeque<String>());log.debug("register channelCnt:{},reactorIndex:{},selectionKey:{}", this.channelCnt, this.reactorIndex, selectionKey);}catch (  Exception e){e.printStackTrace();}}public void select() throws ClosedChannelException {executorService.submit(() -> {log.debug("begin channelCnt:{},reactorIndex:{}", this.channelCnt, this.reactorIndex);while (true) {try {if (selector.select(10) <= 0) {
//                            TimeUnit.MILLISECONDS.sleep(10);
//                            log.debug(" has no event,continue,reactorIndex:{}", this.reactorIndex);this.registerAllWaitChannels();continue;}log.debug(" get some io event,will handle,reactorIndex:{}", this.reactorIndex);Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> selectionKeyIterator = selectionKeys.iterator();Set<SelectionKey> allKeys = selector.keys();if (allKeys.size() > maxItemKeyCnt){maxItemKeyCnt = allKeys.size();}log.debug(" get select key--> selectionKeys:{},channelCnt:{},reactorIndex:{},allKeys:{},maxItemKeyCnt:{}",selectionKeys.size(), this.channelCnt, this.reactorIndex,allKeys.size(),maxItemKeyCnt);while (selectionKeyIterator.hasNext()) {SelectionKey t = selectionKeyIterator.next();selectionKeyIterator.remove();if (t.isReadable() && t.isValid()) {SocketChannel socketChannel = (SocketChannel) t.channel();ByteBuffer byteBuffer = ByteBuffer.allocate(1024);int count = socketChannel.read(byteBuffer);log.debug(" read socketHandler:{},count:{}", socketChannel.getRemoteAddress(),count);if (count < 0) {log.debug(" read error ,will close channel remote Address:{}", socketChannel.getRemoteAddress());t.cancel();socketChannel.close();this.channelCnt--;continue;}byteBuffer.flip();byte[] inputData = new byte[byteBuffer.limit()];byteBuffer.get(inputData);String body = new String(inputData);log.debug(" read ok , ,reactorIndex:{},data:{}", reactorIndex, body);if (socketChannel.isConnected() && socketChannel.isOpen()) {byteBuffer.clear();String response = "server rsp:" + body + "\r\n";log.debug(" read ok , ,response:{}", response);
//                                unWriteDataMap.get(socketChannel).addLast(response);byte[] writeByets = response.getBytes("utf-8");byteBuffer.put(writeByets);byteBuffer.flip();socketChannel.write(byteBuffer);}} else if (t.isWritable()) {SocketChannel socketChannel = (SocketChannel) t.channel();while (!unWriteDataMap.get(socketChannel).isEmpty()) {String response = unWriteDataMap.get(socketChannel).pollFirst();log.debug(" write ok , ,response:{}", response);byte[] writeByets = response.getBytes("utf-8");ByteBuffer byteBuffer = ByteBuffer.allocate(1024);byteBuffer.put(writeByets);byteBuffer.flip();socketChannel.write(byteBuffer);}}selectionKeys.remove(t);}} catch (IOException e) {e.printStackTrace();} catch (Exception e) {e.printStackTrace();}}});}}@Getterpublic static class SocketHandler {private String name;private Object channel;public SocketHandler(String name, Object channel) {this.name = name;this.channel = channel;}}public void start() throws IOException {ExecutorService executorService = Executors.newFixedThreadPool(100);Selector selector = Selector.open();ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);int port = 7020;serverSocketChannel.bind(new InetSocketAddress(port));serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, new SocketHandler("acceptHandler", serverSocketChannel));log.debug(" start bind success port:{}", port);int clientIndex = 0;WorkReactor[] workReactors = new WorkReactor[workReactorNums];for (int i = 0; i < workReactorNums; i++) {workReactors[i] = new WorkReactor(i);}while (selector.select() > 0) {Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> selectionKeyIterator = selectionKeys.iterator();while (selectionKeyIterator.hasNext()) {SelectionKey t = selectionKeyIterator.next();selectionKeys.remove(t);if (t.isAcceptable()) {ServerSocketChannel serverSocketChannel1 = (ServerSocketChannel) t.channel();SocketHandler socketHandler = (SocketHandler) t.attachment();
//                    log.debug(" accept socketHandler:{}", socketHandler.getName());SocketChannel socketChannel = serverSocketChannel1.accept();socketChannel.configureBlocking(false);int reactorNumIndex = clientIndex++ % workReactorNums;log.debug(" accept new channel remote Address:{},reactorNumIndex:{}", socketChannel.getRemoteAddress(), reactorNumIndex);workReactors[reactorNumIndex].register(socketChannel);}}}}public static void main(String[] args) {ServerReactor3 serverReactor = new ServerReactor3();try {serverReactor.start();} catch (IOException e) {e.printStackTrace();}}
}

1.这里面的accept-reactor 一个selector,处理所有的接入连接。就是我们主类的start函数,会不断接收连接,然后将新获取到的socketChannel注册到work-reactor的处理类中。

2.work-reactor CPU核数selector 和CPU核心*2 的线程。所有的客户端连接采用roundRobin方式均匀分配到这些线程上。这里面的work-reactor为WorkReactor类,每个类会创建一个selector,并处理部分的socketChannel连接,分担任务。

3.注意:对一个selector的register和select等所有操作要放到同一个线程处理,否则会出现死锁。像accept-reactor在注册新连接到的socketChannel到work-reactor的selector时,不能直接注册,需放在和work-reactor的select方法同一个线程处理,否则会出现竞争死锁。

三、Selector.open初始化流程分析

1.首先会调用Selector.open()创建一个selector对象。注意,一般nio程序会部署在linux环境,所以我们查看linux下的JDK实现源码(一般下载open-jdk的linux源码进行分析)。

public static Selector open() throws IOException {return SelectorProvider.provider().openSelector();
}

2.provider由sun.nio.ch.DefaultSelectorProvider.create()创建。create方法内部通过系统的名称来创建创建SelectorProvider,在这里由于linux环境,创建了sun.nio.ch.EPollSelectorProvider

 public static SelectorProvider create() {String osName = (String)AccessController.doPrivileged(new GetPropertyAction("os.name"));if (osName .equals("SunOS")) {return createProvider("sun.nio.ch.DevPollSelectorProvider");} else {return (SelectorProvider)(osName .equals("Linux") ? createProvider("sun.nio.ch.EPollSelectorProvider") : new PollSelectorProvider());}
}

3.继续回到Selector的open()中,获取到SelecProvider实例之后,继续调用openSelector(),很自然进入EPollSelectorProvider的openSelector()方法

    public AbstractSelector openSelector() throws IOException {return new EPollSelectorImpl(this);}

4.EPollSelectorImpl为linux下的实现,继承图如下

5.我们来看EPollSelectorImpl初始化做的事情,EPollArrayWrapper为对linux的epoll操作的三个接口epoll_create,epoll_ctl,epoll_wait进行逻辑封装。

    EPollSelectorImpl(SelectorProvider sp) throws IOException {super(sp);long pipeFds = IOUtil.makePipe(false);fd0 = (int) (pipeFds >>> 32);fd1 = (int) pipeFds;pollWrapper = new EPollArrayWrapper();pollWrapper.initInterrupt(fd0, fd1);fdToKey = new HashMap<>();}

创建fd0,fd1,并且调用pollWrapper.initInterrupt是将这两个本地描述符加到epoll的监控事件中,以便调用wakeUp函数时能跳出select函数,可以快速唤醒,结束等待或者阻塞,进而跳出循环或者注册新的事件。

    void initInterrupt(int fd0, int fd1) {outgoingInterruptFD = fd1;incomingInterruptFD = fd0;epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);}

6.父类SelectorImp的构造方法。JDK中对于注册到Selector上的IO事件关系是使用SelectionKey来表示,代表了Channel感兴趣的事件,如Read,Write,Connect,Accept。内部初始化publicKeys和publicSelectedKeys,用到的容器是HashSet,前者用来保存所有的感兴趣的事件,后者准备好的事件。publicKeys 和publicSelectedKeys 引用这包装了一层权限控制,内部还是指向前面的HashSet。实际的堆内存中只有两个HashSet对象。下面是父类SelectorImpl的属性和构造方法:

    protected SelectorImpl(SelectorProvider sp) {super(sp);keys = ConcurrentHashMap.newKeySet();selectedKeys = new HashSet<>();publicKeys = Collections.unmodifiableSet(keys);publicSelectedKeys = Util.ungrowableSet(selectedKeys);}

7.EPollArrayWrapper完成了对epoll文件描述符的构建,以及对linux系统的epoll指令操纵的封装。维护每次selection操作的结果,即epoll_wait结果的epoll_event数组。这里面创建了epoll_create返回的句柄,pollArray就是int epoll_wait ( int epfd, struct epoll_event* events, int maxevents, int timeout );这个方法中的epoll_event数组对象,用来接收IO改变的事件,由内核 进行修改此本地对象。

    EPollArrayWrapper() throws IOException {// creates the epoll file descriptorepfd = epollCreate();// the epoll_event array passed to epoll_waitint allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;pollArray = new AllocatedNativeObject(allocationSize, true);pollArrayAddress = pollArray.address();// eventHigh needed when using file descriptors > 64kif (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)eventsHigh = new HashMap<>();}

8.eventsLow,eventsHigh都是存放socket的FD所注册关注的事件列表。 eventsLow以FD为数组下标,值为关注事件。eventsHigh则是以FD做为KEY,关注事件作为VALUEL,小于64K的FD放在低端事件数组,否则放在高端事件数组。

private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE];
private Map<Integer,Byte> eventsHigh;
9.这里面还有几个参数,updateCount表示已注册事件的数目,updateDescriptors为更新事件的FD列表。registered表示这个FD是否已经注册到EPOLL句柄中,主要是用来在epll_wait中注册事件时作判断使用,首先注册则是要调用epoll_ctl(ctl_add),其它则是要调用epoll_ctl(ctl_modify),如果注销了epoll_ctl(ctl_del)
// number of file descriptors with registration changes pending
private int updateCount;// file descriptors with registration changes pending
private int[] updateDescriptors = new int[INITIAL_PENDING_UPDATE_SIZE];
// Used by release and updateRegistrations to track whether a file
// descriptor is registered with epoll.
private final BitSet registered = new BitSet();

四、channel.register注册流程分析

1.channel.register首先会调用基类的java.nio.channels.spi.AbstractSelectableChannel.register方法,

  1. 如果该channel和selector已经注册过,则直接添加事件和附件
  2. 否则通过selector实现注册过程。继续调用SelectorImp的register方法
  3. addKey(k)是将注册的key添加到socketchannel的成员变量keys[]中
public abstract class AbstractSelectableChannel extends SelectableChannel {private final SelectorProvider provider;private SelectionKey[] keys = null;public final SelectionKey register(Selector var1, int var2, Object var3) throws ClosedChannelException {synchronized(this.regLock) {if (!this.isOpen()) {throw new ClosedChannelException();}  else {SelectionKey var5 = this.findKey(var1);if (var5 != null) {var5.interestOps(var2);var5.attach(var3);}if (var5 == null) {synchronized(this.keyLock) {var5 = ((AbstractSelector)var1).register(this, var2, var3);this.addKey(var5);}}return var5;}}}

2.接着会调用selector.register方法,这个会调用selectorImpl这个基类的注册方法。这里会创建一个SelectionKeyImpl,里面包装了socketChannel,和当前的selector作为成员变量,并可以添加一个任意对象为附加数据。

   protected final SelectionKey register(AbstractSelectableChannel var1, int var2, Object var3) {if (!(var1 instanceof SelChImpl)) {throw new IllegalSelectorException();} else {SelectionKeyImpl var4 = new SelectionKeyImpl((SelChImpl)var1, this);var4.attach(var3);synchronized(this.publicKeys) {this.implRegister(var4);}var4.interestOps(var2);return var4;}}

3.EPollSelectorImpl.implRegister,将channel对应的fd(文件描述符)和对应的SelectionKey放到fdToKey映射表中。fdToKey是一个map类型的结构,用来保存fd和key的映射关系。
将channel对应的fd(文件描述符)添加到EPollArrayWrapper中,并强制初始化fd的事件为0 ( 强制初始更新事件为0,因为该事件可能存在于之前被取消过的注册中。)
将selectionKey放入到keys集合中。

    protected void implRegister(SelectionKeyImpl ski) {if (closed)throw new ClosedSelectorException();SelChImpl ch = ski.channel;int fd = Integer.valueOf(ch.getFDVal());fdToKey.put(fd, ski);pollWrapper.add(fd);keys.add(ski);}
ch.getFDVal()就是socketChannel中的FD.keys就是我们前面讲的全量注册到selector的selectKey对象。

4.EpollWrapper的add方法内部调用了setUpdateEvents方法,并且把第二个参数事件类型(events)设置为0,即为初始值。在setUpdateEvents中,把fd作为数组的下表,值为事件类型。如果fd大于64*1024,则把fd和事件类型存入eventsHigh中,就是上面讲的EpollWrapper中的成员变量,两个事件数组和HASHMAP.

    void add(int fd) {// force the initial update events to 0 as it may be KILLED by a// previous registration.synchronized (updateLock) {assert !registered.get(fd);setUpdateEvents(fd, (byte)0, true);}}private void setUpdateEvents(int fd, byte events, boolean force) {if (fd < MAX_UPDATE_ARRAY_SIZE) {if ((eventsLow[fd] != KILLED) || force) {eventsLow[fd] = events;}} else {Integer key = Integer.valueOf(fd);if (!isEventsHighKilled(key) || force) {eventsHigh.put(key, Byte.valueOf(events));}}}

注意,这里add时首先会判断registered中是否已经有此FD,有则报错,不能重复加入。

5.我们再回到第二步selectorImpl.register方法中,会调用SelectionKeyImpl.interestOps(var2);进行事件添加,接着调用到SelectionKeyImpl.nioInterestOps。

    public SelectionKey nioInterestOps(int var1) {if ((var1 & ~this.channel().validOps()) != 0) {throw new IllegalArgumentException();} else {this.channel.translateAndSetInterestOps(var1, this);this.interestOps = var1;return this;}}

6.上面的channel就是socketChannel,var1为关注的事件。sun.nio.ch.SocketChannelImpl.translateAndSetInterestOps 进行关注事件转换和设置。

    public void translateAndSetInterestOps(int var1, SelectionKeyImpl var2) {int var3 = 0;if ((var1 & 1) != 0) {var3 |= Net.POLLIN;}if ((var1 & 4) != 0) {var3 |= Net.POLLOUT;}if ((var1 & 8) != 0) {var3 |= Net.POLLCONN;}var2.selector.putEventOps(var2, var3);}

这里面的1,4,8分别为SelectionKey.OP_READ,OP_WRITE,OP_CONNECT,把这三个转换为Net的定义。

7.接着又调用了EpollSelectorImpl.putEventOps配置事件。

    public void putEventOps(SelectionKeyImpl ski, int ops) {if (closed)throw new ClosedSelectorException();SelChImpl ch = ski.channel;pollWrapper.setInterest(ch.getFDVal(), ops);}

8.接着调用了EpollWrapper的setInterest对FD设置关注事件。首先会判断存放fd的数组updateDescriptors是否已满,如果满了,则进行扩容,在原来的基础上加64,然后将fd保存到数组中。然后调用setUpdateEvents,设置fd的事件,在前面已经分析过一次。

    void setInterest(int fd, int mask) {synchronized (updateLock) {// record the file descriptor and eventsint oldCapacity = updateDescriptors.length;if (updateCount == oldCapacity) {int newCapacity = oldCapacity + INITIAL_PENDING_UPDATE_SIZE;int[] newDescriptors = new int[newCapacity];System.arraycopy(updateDescriptors, 0, newDescriptors, 0, oldCapacity);updateDescriptors = newDescriptors;}updateDescriptors[updateCount++] = fd;// events are stored as bytes for efficiency reasonsbyte b = (byte)mask;assert (b == mask) && (b != KILLED);setUpdateEvents(fd, b, false);}}

setUpdateEvent方法会将注册的感兴趣的事件和其对应的文件描述存储到EPollArrayWrapper对象的eventsLow或eventsHigh中,这是给底层实现epoll_wait时使用的。同时selectorImpl.register还会将设置SelectionKey的interestOps字段,这是给我们程序员获取使用的

这里会把新关注的事件的FD保存到updateDescriptors中。

五、selector.select注册流程分析

1.首先触发selectorImpl.select方法,为抽象基类的方法,这里面都使用了模板的设计模式,将具体的业务实现放在继承类中。抽象基类实现框架的流程。

    public int select(long var1) throws IOException {if (var1 < 0L) {throw new IllegalArgumentException("Negative timeout");} else {return this.lockAndDoSelect(var1 == 0L ? -1L : var1);}}private int lockAndDoSelect(long var1) throws IOException {synchronized(this) {synchronized(this.publicKeys) {synchronized(this.publicSelectedKeys) {var10000 = this.doSelect(var1);}}return var10000;}}protected abstract int doSelect(long var1) throws IOException;

2.该方法会一直阻塞直到至少一个channel被选择(即,该channel注册的事件发生了为止,除非当前线程发生中断或者selector的wakeup方法被调用。Selector.select方法最终调用的是EPollSelectorImpl的doSelect方法

 protected int doSelect(long timeout) throws IOException {if (closed)throw new ClosedSelectorException();processDeregisterQueue();try {begin();pollWrapper.poll(timeout);} finally {end();}processDeregisterQueue();int numKeysUpdated = updateSelectedKeys();if (pollWrapper.interrupted()) {// Clear the wakeup pipepollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);synchronized (interruptLock) {pollWrapper.clearInterrupted();IOUtil.drain(fd0);interruptTriggered = false;}}return numKeysUpdated;}

调用processDeregisterQueue方法,将cancel的selectionKey从selector中删除,底层会调用epoll_ctl方法移除被epoll所监听的文件描述符;
begin和end方法主要是为了处理线程中断,将线程的中断转化为Selector的wakeup方法,避免线程堵塞在IO操作上;
通过fdToKey查找文件描述符对应的SelectionKey,并更新之。

3.继续看poll的实现

 int poll(long timeout) throws IOException {updateRegistrations();updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);for (int i=0; i<updated; i++) {if (getDescriptor(i) == incomingInterruptFD) {interruptedIndex = i;interrupted = true;break;}}return updated;}

4.updateRegistrations方法 updateRegistrations()方法会将已经注册到该selector的fd和事件(eventsLow或eventsHigh)通过调用epollCtl(epfd, opcode, fd, events),注册到linux系统中。具体的实现如下:

 private void updateRegistrations() {synchronized (updateLock) {int j = 0;while (j < updateCount) {int fd = updateDescriptors[j];short events = getUpdateEvents(fd);boolean isRegistered = registered.get(fd);int opcode = 0;if (events != KILLED) {if (isRegistered) {opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;} else {opcode = (events != 0) ? EPOLL_CTL_ADD : 0;}if (opcode != 0) {epollCtl(epfd, opcode, fd, events);if (opcode == EPOLL_CTL_ADD) {registered.set(fd);} else if (opcode == EPOLL_CTL_DEL) {registered.clear(fd);}}}j++;}updateCount = 0;}}private byte getUpdateEvents(int fd) {if (fd < MAX_UPDATE_ARRAY_SIZE) {return eventsLow[fd];} else {Byte result = eventsHigh.get(Integer.valueOf(fd));// result should never be nullreturn result.byteValue();}}

这里首先从updateDescriptors中获取已注册的FD,然后通过fd从event_low,event_high中获取关注事件。再根据是否已注册来判断是调用ctl_add,还是ctl_modify,并配置到epoll事件模型中。

5.然后调用updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);进行IO事件等待,updated返回已更新的EPOLLEVENTS个数,并且内核会将新的IO事件放以pollArrayAddress(pollArray)。

6.继承回到第二步doSelect,有IO事件返回后,会调用updateSelectedKeys将接收到的IO事件更新到EpollSelectorImpl的keys,selectkeys集合中。

 private int updateSelectedKeys() {int entries = pollWrapper.updated;int numKeysUpdated = 0;for (int i=0; i<entries; i++) {int nextFD = pollWrapper.getDescriptor(i);SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));// ski is null in the case of an interruptif (ski != null) {int rOps = pollWrapper.getEventOps(i);if (selectedKeys.contains(ski)) {if (ski.channel.translateAndSetReadyOps(rOps, ski)) {numKeysUpdated++;}} else {ski.channel.translateAndSetReadyOps(rOps, ski);if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {selectedKeys.add(ski);numKeysUpdated++;}}}}return numKeysUpdated;}

7.pollWrapper.getDescriptor从pollArray成员变量中根据索引和偏移位置获取EPOLLEVENT对象数组中的索引对象的描述符数据,pollWrapper.getEventOps则是获取新的事件。由于pollArray为本地对象,所以都是通过偏移位置去获取值的。

    int getEventOps(int i) {int offset = SIZE_EPOLLEVENT * i + EVENT_OFFSET;return pollArray.getInt(offset);}int getDescriptor(int i) {int offset = SIZE_EPOLLEVENT * i + FD_OFFSET;return pollArray.getInt(offset);}

8.通过fdToKey的MAP对象获取FD所对应的selectKey,后面根据EpollSelectorImpl的成员变量selectKeys集合对象是否包含此selectKey,包含则直接将关注事件转换为应用层事件并设置到selectKeyImpl的interKeys关注事件成员变量中就可以,没有包含,则先要加入到集合中。

9.接着调用selectkey.channel就是socketChannel的translateAndSetReadyOps

sun.nio.ch.SocketChannelImplpublic boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) {return translateReadyOps(ops, 0, ski);}public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) {int intOps = ski.nioInterestOps();int oldOps = ski.nioReadyOps();int newOps = initialOps;if ((ops & Net.POLLNVAL) != 0) {// This should only happen if this channel is pre-closed while a// selection operation is in progress// ## Throw an error if this channel has not been pre-closedreturn false;}if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {newOps = intOps;ski.nioReadyOps(newOps);return (newOps & ~oldOps) != 0;}boolean connected = isConnected();if (((ops & Net.POLLIN) != 0) &&((intOps & SelectionKey.OP_READ) != 0) && connected)newOps |= SelectionKey.OP_READ;if (((ops & Net.POLLCONN) != 0) &&((intOps & SelectionKey.OP_CONNECT) != 0) && isConnectionPending())newOps |= SelectionKey.OP_CONNECT;if (((ops & Net.POLLOUT) != 0) &&((intOps & SelectionKey.OP_WRITE) != 0) && connected)newOps |= SelectionKey.OP_WRITE;ski.nioReadyOps(newOps);return (newOps & ~oldOps) != 0;}

这里面把Net(就是EPOLL底层)的IO事件转换为SelectionKey的事件。然后又调用SelectKeyImpl的nioReadyOps再设置到关注事件成员变量readOps中。

sun.nio.ch.SelectionKeyImpl
public void nioReadyOps(int ops) {readyOps = ops;}

六、selector.selectedKeys()流程分析

1.这个方法比较简单,直接返回SelectorImpl的publicSelectedKeys,这个就是上面EpollSelectorImpl中的selectedKeys的引用,就不用重复讲了,在上面方法已经更新进去了。

    public Set<SelectionKey> selectedKeys() {if (!this.isOpen() && !Util.atBugLevel("1.4")) {throw new ClosedSelectorException();} else {return this.publicSelectedKeys;}}

部分摘自下面文章
原文链接:https://blog.csdn.net/TheLudlows/article/details/82931478

nio的epoll和selector实现流程分析相关推荐

  1. NIO - Selector源码分析

    1. 背景 SelectableChannel对象的多路复用器. 可以通过调用Selector.open()方法创建Selector对象.Selector.open()方法会利用系统默认的Select ...

  2. HDFS2.x之RPC流程分析

    HDFS2.x之RPC流程分析 1 概述 Hadoop提供了一个统一的RPC机制来处理client-namenode, namenode-dataname,client-dataname之间的通信.R ...

  3. java基础流程分析,及原理解析,因为bu满,而qian行

    基本功 =>同样的流程 别人的解释*=>基础二 面向对象的特征 四个基本特征:抽象,继承, 封装, 多态 抽象: 就好比用程序描述一个人,肯定得抽象的通过(身高,体重,年龄 , 胖瘦)这些 ...

  4. 面试必会系列 - 5.1 网络BIO、NIO、epoll,同步/异步模型、阻塞/非阻塞模型,你能分清吗?

    本文已收录至 Github(MD-Notes),若博客中图片模糊或打不开,可以来我的 Github 仓库,包含了完整图文:https://github.com/HanquanHq/MD-Notes,涵 ...

  5. netty 关闭chnnal_Netty 源码学习——服务端流程分析

    在上一篇我们已经介绍了客户端的流程分析,我们已经对启动已经大体上有了一定的认识,现在我们继续看对服务端的流程来看一看到底有什么区别. 服务端代码 public class NioServer { pr ...

  6. 【操作系统三】图解网络IO(bio\nio\slect\epoll)

    [操作系统三]图解网络IO+实战 一.计算机组成 二.系统中断 三.晶振(时间中断.分时复用) 四.事件中断 1.DMA 2.事件中断 3.网卡也会产生中断? 五.linux系统知识 1.linux下 ...

  7. muduo网络库学习总结:基本架构及流程分析

    muduo网络库学习:基本架构及流程分析 基本架构 Basic Reactor Mutiple Reactor + ThreadPool muduo库的基本使用 基本结构介绍 EventLoop类 P ...

  8. VLC架构及流程分析

    0x00 前置信息 VLC是一个非常庞大的工程,我从它的架构及流程入手进行分析,涉及到一些很细的概念先搁置一边,日后详细分析. 0x01 源码结构(Android Java相关的暂未分析) # bui ...

  9. 动态执行流程分析和性能瓶颈分析的利器——gperftools的Cpu Profiler

    在<动态执行流程分析和性能瓶颈分析的利器--valgrind的callgrind>中,我们领略了valgrind对流程和性能瓶颈分析的强大能力.本文将介绍拥有相似能力的gperftools ...

最新文章

  1. c++Primer再学习-练习Todo
  2. 简述mysql半同步复制—semisync
  3. LaTex文章中插入Visio及Matlab矢量图
  4. 用曼哈顿距离来巧解---输出菱形的问题
  5. 计算机动漫游戏制作 巴中,四川省哪些中专学校有计算机动漫与游戏制作专业...
  6. 年假.........
  7. Qt工作笔记-html做界面时,QFileInfo小技巧,获取前端页面
  8. Mac homebrew报错Error: homebrew-core is a shallow clone.
  9. 括号配对问题----栈模拟
  10. UVa 808 (建坐标系、找规律) Bee Breeding
  11. 正确学习JavaScript知识和教程
  12. 【自动驾驶】模型预测控制(MPC)实现轨迹跟踪
  13. 火狐浏览器不能看网页视频了的解决方法
  14. Oracle 函数编写
  15. 结合可变形注意力的视觉Transformer
  16. 通信领域的专有名词释义
  17. IMX6ULL与IMX6UL异同(主要是优化了其成本:安全功能减低,优化功耗EMC SIM模块删除等与但和I.MX6UltraLite芯片是PIN-2-PIN兼容的)
  18. 国际日期书写标准格式
  19. 数学美 之 判断线段相交的最简方法
  20. 艺赛旗RPA--经验分享:Python 之深浅拷贝

热门文章

  1. SAP直接踢人下线 SM04
  2. SAP概念之Client(集团)
  3. python中从键盘输入的代码_Python读取键盘输入的2种方法
  4. svpwm仿真_案例12:三相三线PWM整流仿真建模
  5. curl封装php,PHP封装curl的简单方法
  6. vue 计算属性和data_vue之watch和计算属性computed
  7. php 不存在给默认值,当属性不存在时,创建一个属性并给它一个默认值
  8. Python面向对象中:__init__() 构造方法
  9. Python的__getattr__方法学习
  10. python中如何将字符串连接在一起,多倍的字符串如何输出