1. Selector设计

笔者下载得是openjdk8的源码, 画出类图

比较清晰得看到,openjdk中Selector的实现是SelectorImpl,然后SelectorImpl又将职责委托给了具体的平台,比如图中框出的

  • linux2.6以后才有的EpollSelectorImpl
  • Windows平台是WindowsSelectorImpl
  • MacOSX平台是KQueueSelectorImpl

从名字也可以猜到,openjdk肯定在底层还是用epoll,kqueue,iocp这些技术来实现的I/O多路复用

  1. 获取Selector

众所周知,Selector.open()可以得到一个Selector实例,怎么实现的呢?

// Selector.java
public static Selector open() throws IOException {// 首先找到provider,然后再打开Selectorreturn SelectorProvider.provider().openSelector();
}
// java.nio.channels.spi.SelectorProviderpublic static SelectorProvider provider() {synchronized (lock) {if (provider != null)return provider;return AccessController.doPrivileged(new PrivilegedAction<SelectorProvider>() {public SelectorProvider run() {if (loadProviderFromProperty())return provider;if (loadProviderAsService())return provider;// 这里就是打开Selector的真正方法provider = sun.nio.ch.DefaultSelectorProvider.create();return provider;}});}
}

在openjdk中,每个操作系统都有一个sun.nio.ch.DefaultSelectorProvider实现,以solaris为例:

/*** Returns the default SelectorProvider.*/
public static SelectorProvider create() {// 获取OS名称String osname = AccessController.doPrivileged(new GetPropertyAction("os.name"));// 根据名称来创建不同的Selctorif (osname.equals("SunOS"))return createProvider("sun.nio.ch.DevPollSelectorProvider");if (osname.equals("Linux"))return createProvider("sun.nio.ch.EPollSelectorProvider");return new sun.nio.ch.PollSelectorProvider();
}

如果系统名称是Linux的话,真正创建的是sun.nio.ch.EPollSelectorProvider。如果不是SunOS也不是Linux,就使用sun.nio.ch.PollSelectorProvider, 关于PollSelector有兴趣的读者自行了解下, 本文仅以实际常用的EpollSelector为例探讨。

打开sun.nio.ch.EPollSelectorProvider查看openSelector方法

public AbstractSelector openSelector() throws IOException {return new EPollSelectorImpl(this);
}

很直观,这样我们在Linux平台就得到了最终的Selector实现:sun.nio.ch.EPollSelectorImpl

  1. EPollSelector如何进行select
    epoll系统调用主要分为3个函数
  • epoll_create: 创建一个epollfd,并开辟epoll自己的内核高速cache区,建立红黑树,分配好想要的size的内存对象,建立一个list链表,用于存储准备就绪的事件。
  • epoll_wait: 等待内核返回IO事件
  • epoll_ctl: 对新旧事件进行新增修改或者删除
  • 3.1 Epoll fd的创建

EPollSelectorImpl的构造器代码如下:

EPollSelectorImpl(SelectorProvider sp) throws IOException {super(sp);// makePipe返回管道的2个文件描述符,编码在一个long类型的变量中// 高32位代表读 低32位代表写// 使用pipe为了实现Selector的wakeup逻辑long pipeFds = IOUtil.makePipe(false);fd0 = (int) (pipeFds >>> 32);fd1 = (int) pipeFds;// 新建一个EPollArrayWrapperpollWrapper = new EPollArrayWrapper();pollWrapper.initInterrupt(fd0, fd1);fdToKey = new HashMap<>();
}

再看EPollArrayWrapper的初始化过程

EPollArrayWrapper() throws IOException {// creates the epoll file descriptor// 创建epoll fdepfd = epollCreate();// the epoll_event array passed to epoll_waitint allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;pollArray = new AllocatedNativeObject(allocationSize, true);pollArrayAddress = pollArray.address();// eventHigh needed when using file descriptors > 64kif (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)eventsHigh = new HashMap<>();
}
private native int epollCreate();

在初始化过程中调用了epollCreate方法,这是个native方法。
打开

jdk/src/solaris/native/sun/nio/ch/EPollArrayWrapper.c

JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
{/** epoll_create expects a size as a hint to the kernel about how to* dimension internal structures. We can't predict the size in advance.*/// 这里的size可以不指定,从Linux2.6.8之后,改用了红黑树结构,指定了大小也没啥用int epfd = epoll_create(256);if (epfd < 0) {JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");}return epfd;
}

可以看到最后还是使用了操作系统的api: epoll_create函数

  • 3.2 Epoll wait等待内核IO事件

调用Selector.select(),最后会委托给各个实现的doSelect方法,限于篇幅不贴出太详细的,这里看下EpollSelectorImpl的doSelect方法

protected int doSelect(long timeout) throws IOException {if (closed)throw new ClosedSelectorException();processDeregisterQueue();try {begin();// 真正的实现是这行pollWrapper.poll(timeout);} finally {end();}processDeregisterQueue();int numKeysUpdated = updateSelectedKeys();// 以下基本都是异常处理if (pollWrapper.interrupted()) {// Clear the wakeup pipepollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);synchronized (interruptLock) {pollWrapper.clearInterrupted();IOUtil.drain(fd0);interruptTriggered = false;}}return numKeysUpdated;
}

然后我们去看pollWrapper.poll, 打开jdk/src/solaris/classes/sun/nio/ch/EPollArrayWrapper.java:

int poll(long timeout) throws IOException {updateRegistrations();// 这个epollWait是不是有点熟悉呢?updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);for (int i=0; i<updated; i++) {if (getDescriptor(i) == incomingInterruptFD) {interruptedIndex = i;interrupted = true;break;}}return updated;
}private native int epollWait(long pollAddress, int numfds, long timeout,int epfd) throws IOException;

epollWait也是个native方法,打开c代码一看:

JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,jlong address, jint numfds,jlong timeout, jint epfd)
{struct epoll_event *events = jlong_to_ptr(address);int res;if (timeout <= 0) {           /* Indefinite or no wait */// 发起epoll_wait系统调用等待内核事件RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);} else {                      /* Bounded wait; bounded restarts */res = iepoll(epfd, events, numfds, timeout);}if (res < 0) {JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");}return res;
}

可以看到,最后还是发起的epoll_wait系统调用.

  • 3.3 epoll control以及openjdk对事件管理的封装

JDK中对于注册到Selector上的IO事件关系是使用SelectionKey来表示,代表了Channel感兴趣的事件,如Read,Write,Connect,Accept.

调用Selector.register()时均会将事件存储到EpollArrayWrapper的成员变量eventsLow和eventsHigh中

// events for file descriptors with registration changes pending, indexed
// by file descriptor and stored as bytes for efficiency reasons. For
// file descriptors higher than MAX_UPDATE_ARRAY_SIZE (unlimited case at
// least) then the update is stored in a map.
// 使用数组保存事件变更, 数组的最大长度是MAX_UPDATE_ARRAY_SIZE, 最大64*1024
private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE];
// 超过数组长度的事件会缓存到这个map中,等待下次处理
private Map<Integer,Byte> eventsHigh;/*** Sets the pending update events for the given file descriptor. This* method has no effect if the update events is already set to KILLED,* unless {@code force} is {@code true}.*/
private void setUpdateEvents(int fd, byte events, boolean force) {// 判断fd和数组长度if (fd < MAX_UPDATE_ARRAY_SIZE) {if ((eventsLow[fd] != KILLED) || force) {eventsLow[fd] = events;}} else {Integer key = Integer.valueOf(fd);if (!isEventsHighKilled(key) || force) {eventsHigh.put(key, Byte.valueOf(events));}}
}

上面看到EpollArrayWrapper.poll()的时候, 首先会调用updateRegistrations

/*** Returns the pending update events for the given file descriptor.*/
private byte getUpdateEvents(int fd) {if (fd < MAX_UPDATE_ARRAY_SIZE) {return eventsLow[fd];} else {Byte result = eventsHigh.get(Integer.valueOf(fd));// result should never be nullreturn result.byteValue();}
}/*** Update the pending registrations.*/
private void updateRegistrations() {synchronized (updateLock) {int j = 0;while (j < updateCount) {int fd = updateDescriptors[j];// 从保存的eventsLow和eventsHigh里取出事件short events = getUpdateEvents(fd);boolean isRegistered = registered.get(fd);int opcode = 0;if (events != KILLED) {// 判断操作类型以传给epoll_ctl// 没有指定EPOLLET事件类型if (isRegistered) {opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;} else {opcode = (events != 0) ? EPOLL_CTL_ADD : 0;}if (opcode != 0) {// 熟悉的epoll_ctlepollCtl(epfd, opcode, fd, events);if (opcode == EPOLL_CTL_ADD) {registered.set(fd);} else if (opcode == EPOLL_CTL_DEL) {registered.clear(fd);}}}j++;}updateCount = 0;}
}
private native void epollCtl(int epfd, int opcode, int fd, int events);

在获取到事件之后将操作委托给了epollCtl,这又是个native方法,打开相应的c代码一看:

JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,jint opcode, jint fd, jint events)
{struct epoll_event event;int res;event.events = events;event.data.fd = fd;// 发起epoll_ctl调用来进行IO事件的管理RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);/** A channel may be registered with several Selectors. When each Selector* is polled a EPOLL_CTL_DEL op will be inserted into its pending update* list to remove the file descriptor from epoll. The "last" Selector will* close the file descriptor which automatically unregisters it from each* epoll descriptor. To avoid costly synchronization between Selectors we* allow pending updates to be processed, ignoring errors. The errors are* harmless as the last update for the file descriptor is guaranteed to* be EPOLL_CTL_DEL.*/if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");}
}

原来还是我们的老朋友epoll_ctl.
有个小细节是jdk没有指定ET(边缘触发)还是LT(水平触发),所以默认会用LT:)

AbstractSelectorImpl中有3个set保存事件

// Public views of the key sets
// 注册的所有事件
private Set<SelectionKey> publicKeys;             // Immutable
// 内核返回的IO事件封装,表示哪些fd有数据可读可写
private Set<SelectionKey> publicSelectedKeys;     // Removal allowed, but not addition// 取消的事件
private final Set<SelectionKey> cancelledKeys = new HashSet<SelectionKey>();

EpollArrayWrapper.poll调用完成之后, 会调用updateSelectedKeys来更新上面的仨set

private int updateSelectedKeys() {int entries = pollWrapper.updated;int numKeysUpdated = 0;for (int i=0; i<entries; i++) {int nextFD = pollWrapper.getDescriptor(i);SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));// ski is null in the case of an interruptif (ski != null) {int rOps = pollWrapper.getEventOps(i);if (selectedKeys.contains(ski)) {if (ski.channel.translateAndSetReadyOps(rOps, ski)) {numKeysUpdated++;}} else {ski.channel.translateAndSetReadyOps(rOps, ski);if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {selectedKeys.add(ski);numKeysUpdated++;}}}}return numKeysUpdated;

}
代码很直白,拿出事件对set比对操作。

  1. Selector类的相关方法

重点注意四个方法

  • select(): 这是一个阻塞方法,调用该方法,会阻塞,直到返回一个有事件发生的selectionKey集合
  • selectNow() :非阻塞方法,获取不到有事件发生的selectionKey集合,也会立即返回
  • select(long):阻塞方法,如果没有获取到有事件发生的selectionKey集合,阻塞指定的long时间
  • selectedKeys(): 返回全部selectionKey集合,不管是否有事件发生

可以理解:selector一直在监听select()

  1. Selector、SelectionKey、ServerScoketChannel、ScoketChannel的关系
  • Server代码:
public class NIOServer {public static void main(String[] args) throws Exception{//创建ServerSocketChannel -> ServerSocketServerSocketChannel serverSocketChannel = ServerSocketChannel.open();//得到一个Selecor对象Selector selector = Selector.open();//绑定一个端口6666, 在服务器端监听serverSocketChannel.socket().bind(new InetSocketAddress(6666));//设置为非阻塞serverSocketChannel.configureBlocking(false);//把 serverSocketChannel 注册到  selector 关心 事件为 OP_ACCEPTserverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);System.out.println("注册后的selectionkey 数量=" + selector.keys().size()); // 1//循环等待客户端连接while (true) {//这里我们等待1秒,如果没有事件发生, 返回if(selector.select(1000) == 0) { //没有事件发生System.out.println("服务器等待了1秒,无连接");continue;}//如果返回的>0, 就获取到相关的 selectionKey集合//1.如果返回的>0, 表示已经获取到关注的事件//2. selector.selectedKeys() 返回关注事件的集合//   通过 selectionKeys 反向获取通道Set<SelectionKey> selectionKeys = selector.selectedKeys();System.out.println("selectionKeys 数量 = " + selectionKeys.size());//遍历 Set<SelectionKey>, 使用迭代器遍历Iterator<SelectionKey> keyIterator = selectionKeys.iterator();while (keyIterator.hasNext()) {//获取到SelectionKeySelectionKey key = keyIterator.next();//根据key 对应的通道发生的事件做相应处理if(key.isAcceptable()) { //如果是 OP_ACCEPT, 有新的客户端连接//该该客户端生成一个 SocketChannelSocketChannel socketChannel = serverSocketChannel.accept();System.out.println("客户端连接成功 生成了一个 socketChannel " + socketChannel.hashCode());//将  SocketChannel 设置为非阻塞socketChannel.configureBlocking(false);//将socketChannel 注册到selector, 关注事件为 OP_READ, 同时给socketChannel//关联一个BuffersocketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));System.out.println("客户端连接后 ,注册的selectionkey 数量=" + selector.keys().size()); //2,3,4..}if(key.isReadable()) {  //发生 OP_READ//通过key 反向获取到对应channelSocketChannel channel = (SocketChannel)key.channel();//获取到该channel关联的bufferByteBuffer buffer = (ByteBuffer)key.attachment();channel.read(buffer);System.out.println("form 客户端 " + new String(buffer.array()));}//手动从集合中移动当前的selectionKey, 防止重复操作keyIterator.remove();}}}
}
  • Client代码
public class NIOClient {public static void main(String[] args) throws Exception{//得到一个网络通道SocketChannel socketChannel = SocketChannel.open();//设置非阻塞socketChannel.configureBlocking(false);//提供服务器端的ip 和 端口InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);//连接服务器if (!socketChannel.connect(inetSocketAddress)) {while (!socketChannel.finishConnect()) {System.out.println("因为连接需要时间,客户端不会阻塞,可以做其它工作..");}}//...如果连接成功,就发送数据String str = "hello, 尚硅谷~";//Wraps a byte array into a bufferByteBuffer buffer = ByteBuffer.wrap(str.getBytes());//发送数据,将 buffer 数据写入 channelsocketChannel.write(buffer);System.in.read();}
}
  1. 总结


jdk中Selector是对操作系统的IO多路复用调用的一个封装,在Linux中就是对epoll的封装。epoll实质上是将event loop交给了内核,因为网络数据都是首先到内核的,直接内核处理可以避免无谓的系统调用和数据拷贝, 性能是最好的。jdk中对IO事件的封装是SelectionKey, 保存Channel关心的事件。

本文转自

高并发核心Selector详解相关推荐

  1. 千万级PV规模高性能高并发网站架构详解

    1.缓存(expires) 2.deflate压缩 3.Apache/nginx静态服务器提供html页面内容 4.CDN/cache缓存静态内容如:html.jpg.gif.js等 5.MYSQL数 ...

  2. Nginx10m+高并发内核优化详解

    何为高并发 默认的Linux内核参数考虑的是最通用场景,不符合用于支持高并发访问的Web服务器,所以需要修改Linux内核参数,这样可以让Nginx拥有更高的性能: 在优化内核时,可以做的事情很多,不 ...

  3. 高并发服务器开源项目,高并发服务器框架详解 - osc_qgfjs4a5的个人空间 - OSCHINA - 中文开源技术交流社区...

    1)如何设计如何扩展 2)什么是高并发 (1)任务:完成某个功能的一个一个目标任务,服务器程序也是不例外的. (2)CPU核心:完成具体任务的,是CPU核心 + 周围的外设(读写磁盘IO.网络IO) ...

  4. 哪个服务器支持高并发,IIS Web服务器支持高并发设置方法详解

    这篇文章主要介绍了IIS Web服务器如何支持高并发,详细设置方法在下面,大家参考使用吧 适用的IIS版本:IIS 7.0, IIS 7.5, IIS 8.0 适用的Windows版本:Windows ...

  5. PHP中利用redis实现消息队列处理高并发请求思路详解

    在电商活动中,常常会出现高并发的情况,例如很多人同时点击购买按钮,以至于购买人数超出了库存量,这是一种非常不理想的状况,因此,我们在PHP开发中就会引入消息队列来解决这种高并发的问题. 当用户点击按钮 ...

  6. 高并发高流量网站架构详解

    (推荐)高并发高流量网站架构详解 Web2.0的兴起,掀起了互联网新一轮的网络创业大潮.以用户为导 向的新网站建设概念,细分了网站功能和用户群,不仅成功的造就了一大批新生的网站,也极大的方便了上网的人 ...

  7. 《Java高并发核心编程.卷2,多线程、锁、JMM、JUC、高并发设计模式》

    <Java高并发核心编程.卷2,多线程.锁.JMM.JUC.高并发设计模式> 目录 第1章 多线程原理与实战 1.2 无处不在的进程和线程 1.2.1 进程的基本原理 1.2.2 线程的基 ...

  8. 大数据是什么和大数据技术十大核心原理详解

     一.数据核心原理   从"流程"核心转变为"数据"核心   大数据时代,计算模式也发生了转变,从"流程"核心转变为"数据&quo ...

  9. ext核心API详解

    http://hi.baidu.com/j2me/profile 1 EXT核心API详解(一)-Ext 1 EXT核心API详解(二)-Array/Date/Function/Number/Stri ...

最新文章

  1. Android学习----自适应国际化语言
  2. java设计模式:简单工厂模式
  3. 嵌入式开发基础环境搭建
  4. HTML5 API详解(5):Page Visibility API帮您省流量,提高体验
  5. LDA的Gibbs抽样详细推理与理解
  6. asp.net使用 csla 序列化错误 有关于wcf的错误.
  7. 全自动光电整纬机安装和功能分析
  8. linux c 创建新线程,Linux C Phread 入门1---线程创建
  9. web SQL注入漏洞
  10. 【控制】《最优控制理论与系统》-胡寿松老师-第4章-动态规划
  11. Java程序员怎么优雅迈过30K+这道坎?附超全教程文档
  12. 福建省小学四年级上册计算机知识点总结,小学四年级上册数学知识点大全【1-6单元】...
  13. [转]应该被记住的天才,写在图灵诞辰100周年
  14. python3抓取aqi
  15. 再肝一个R包!一行代码绘制精美火山图!
  16. 正则表达式在线测试一
  17. 钉钉企业内部机器人python开发(公网部署版本)
  18. 金大侠眼光果然犀利:《笑傲江湖》只值一块钱
  19. python如何定义带有可选参数的函数_python中如何正确调用带可选参数的函数
  20. Directx11教程七之2D渲染

热门文章

  1. java中的Calendar
  2. ubuntu最基本的软件
  3. WinForm 之 程序启动不显示主窗体
  4. ie bug(如果不足,留言大家一起分享)
  5. [动规] hihocoder 1149 回文字符序列
  6. android — JNI注册方法说明
  7. jQuery(UI)常用插件
  8. 优化应用不可不知道的知识
  9. Python学习笔记:匿名函数
  10. [云炬python3玩转机器学习笔记] 3-3Numpy数据基础