本文分享内容如下

  1. select()和空轮询bug解决分析
  2. EventLoop 中对selectKeys的改造
  3. wakeup分析

select()和空轮询bug解决分析

当select空轮询( selector.select(timeoutMillis); 未等待 timeoutMillis) 执行 次数 达到SELECTOR_AUTO_REBUILD_THRESHOLD(默认512)时重新创建 selector, 并注册所有的channel和关注的事件。

private void select() throws IOException {

Selector selector = this.selector;

try {

int selectCnt = 0;

long currentTimeNanos = System.nanoTime();

//delayNanos()获取即将执行的定时任务距离要执行的时间纳秒差值, 没有获取到返回 默认值1000ms

long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

for (;;) {

// 因为EventLoop 要同时 select IO事件和执行任务,不能一直阻塞 ,当超出 期限时间后,就跳出select(),执行任务。

long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;

// 假设timeoutMillis 1000ms ,经过一次或多次循环后执行时间超出1000ms,则退出select循环。(注释A)

if (timeoutMillis <= 0) {

if (selectCnt == 0) {

selector.selectNow();

selectCnt = 1;

}

break;//code B }

//如果查询到IO事件会正常跳出循环,或者按照timeoutMillis时长阻塞后 code B 跳出循环,否则就是发生了空轮询。

int selectedKeys = selector.select(timeoutMillis);

selectCnt ++;

//有IO事件,被唤醒,有需要执行的任务 都跳出循环

if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks()) {

// Selected something,

// waken up by user, or

// the task queue has a pending task.

break;

}

//解决NIO selector 空轮询的bug。注释A 中的处理,当selectCnt数量过大,一定是selector.select(timeoutMillis) 中 阻塞功能失效,发生了空轮询,当空轮询数过多时,为了防止空轮询 CPU达到100%, 重建selector

if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&

selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {

// The selector returned prematurely many times in a row.

// Rebuild the selector to work around the problem.

logger.warn(

"Selector.select() returned prematurely {} times in a row; rebuilding selector.",

selectCnt);

//重新创建 selector, 并注册所有的channel和关注的事件

rebuildSelector();

selector = this.selector;

// Select again to populate selectedKeys.

selector.selectNow();

selectCnt = 1;

break;

}

currentTimeNanos = System.nanoTime();

}

if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {

if (logger.isDebugEnabled()) {

logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);

}

}

} catch (CancelledKeyException e) {

if (logger.isDebugEnabled()) {

logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e);

}

// Harmless exception - log anyway

}

}

rebuildSelector分析

public void rebuildSelector() {

if (!inEventLoop()) {

execute(new Runnable() {

@Override

public void run() {

rebuildSelector();

}

});

return;

}

final Selector oldSelector = selector;

final Selector newSelector;

if (oldSelector == null) {

return;

}

try {

newSelector = openSelector();

} catch (Exception e) {

logger.warn("Failed to create a new Selector.", e);

return;

}

// Register all channels to the new Selector.

int nChannels = 0;

for (;;) {

try {

for (SelectionKey key: oldSelector.keys()) {

Object a = key.attachment();

try {

if (key.channel().keyFor(newSelector) != null) {

continue;

}

int interestOps = key.interestOps();

key.cancel();

key.channel().register(newSelector, interestOps, a);

nChannels ++;

} catch (Exception e) {

logger.warn("Failed to re-register a Channel to the new Selector.", e);

if (a instanceof AbstractNioChannel) {

AbstractNioChannel ch = (AbstractNioChannel) a;

ch.unsafe().close(ch.unsafe().voidPromise());

} else {

@SuppressWarnings("unchecked")

NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;

invokeChannelUnregistered(task, key, e);

}

}

}

} catch (ConcurrentModificationException e) {

// Probably due to concurrent modification of the key set.

continue;

}

break;

}

selector = newSelector;

try {

// time to close the old selector as everything else is registered to the new one

oldSelector.close();

} catch (Throwable t) {

if (logger.isWarnEnabled()) {

logger.warn("Failed to close the old Selector.", t);

}

}

logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");

}

rebuildSelector整体的逻辑比较清晰,

先创建Selector ,将原来的 channel,interestOps,attachment 注册到新的Selector 上,然后关闭旧的Selector。

EventLoop 中对selectKeys的改造

selectedKeys是一个 SelectedSelectionKeySet 类对象,

  1. 每次在轮询到nio事件的时候,netty只需要O(1)的时间复杂度就能将 SelectionKey 塞到 set中去,而jdk底层使用的hashSet需要O(lgn)的时间复杂度
  2. 优化过的 SelectedSelectionKeySet 的好处,遍历的时候遍历的是数组,相对jdk原生的HashSet效率有所提高

SelectedSelectionKeySet

当IO事件发生了 一定是调用了add方法, 这里只需要O(1)的时间复杂度。

public boolean add(SelectionKey o) {

if (o == null) {

return false;

}

if (isA) {

int size = keysASize;

keysA[size ++] = o;

keysASize = size;

if (size == keysA.length) {

doubleCapacityA();

}

} else {

int size = keysBSize;

keysB[size ++] = o;

keysBSize = size;

if (size == keysB.length) {

doubleCapacityB();

}

}

return true;

}

add 根据isA 判断使用哪个数组,实际上 keysA,keysB 这个两个数组是轮流使用的。

SelectionKey[] flip() {

if (isA) {

isA = false;

keysA[keysASize] = null;//因为数组存在复用,按照add的逻辑 keysASize位置应该是无效的

keysBSize = 0;//翻转前 将另一个数组 的添加位置赋值为0

return keysA;

} else {

isA = true;

keysB[keysBSize] = null;

keysASize = 0;

return keysB;

}

}

isA ,filp() 都是为使用2个数组而设计的。

filp这样设计原本是处于 高并发,一致性的考虑,在高并发的情况下 如果只有一个数组存储SelectKey, 这个数组会一直增长,假设数组没有并发问题,线程会一直处理IO事件,IO任务就一直得不到处理,而数组的修改 是有并发问题的,添加进来的SelectKey有可能不会被及时的处理而跳过,而使用两个数组,一个用于添加SelectKey,一个用于SelectKey的分发执行。这样做是巧妙的办法,而新的版本中已经改为一个数组了,作者描述:一个数组虽然有一致性的问题,但是分发执行的时候小心使用可以解决这个问,如传递一个定长的size。

该问题官方描述:https://github.com/netty/netty/issues/6058#

wakeup分析

NioEventLoop run方法负责轮询IO事件和执行IO任务,这里简称为IO轮询方法.

IO轮询方法中有wakeup 的处理,还有wakeup好多的注释,花了我3个多小时,终于研究明白

通过分析原文注释和实验分析得知 使用selelct.wakeup()效果如下:

先执行selelct的还没返回的操作立即返回。

如果没有执行selelct,则下一次阻塞的 select() select(long timeout) 会立即返回

selectNow(), select() select(long timeout)都会清除 wakeup状态,不会影响下次 select() select(long timeout)的阻塞。

NioEventLoop向外暴露的wakeup方法

protected void wakeup(boolean inEventLoop) {

if (!inEventLoop && wakenUp.compareAndSet(false, true)) {

selector.wakeup();

}

}

这里根据inEventLoop进行判断,也就是说只有初次启动,或非EventLoop线程的才有可能修改wakenUp,并执行selector.wakeup();

调用场景

SingleThreadEventExecutor-execute()

public void execute(Runnable task) {

//...

boolean inEventLoop = inEventLoop();

if (inEventLoop) {

addTask(task);

} else {

startThread();

addTask(task);

if (isShutdown() && removeTask(task)) {

reject();

}

}

if (!addTaskWakesUp) {//addTaskWakesUp 默认是false

wakeup(inEventLoop);

}

}

熟悉吧?就是在启动EventLoop或提交IO任务时候会调用wakeup()。为啥要就这样搞这里先留个疑问 设为 问题1

结合IO轮询方法分析,如下

protected void run() {

for (;;) {

oldWakenUp = wakenUp.getAndSet(false);

try {

if (hasTasks()) {

selectNow();

} else {

select();

//源代码有很多注释,难以读懂 设为 问题2

if (wakenUp.get()) {

selector.wakeup();

}

}

//....

}

注意:selector的唤醒都是调用 NioEventLoop.wakeup()

问题1 IO轮询方法中 IO事件IO任务循环顺序执行,如果用户线程提交IO任务,而IO轮询方法所在线程由于没有IO事件,一直阻塞在select(long timeout)中,就影响了用户线程IO任务的执行, 所以需要执行selector.wakeup来停止阻塞,执行用户线程。

问题2 既然要执行selector.wakeup,那么 IO轮询过程中处于阻塞状态中执行是最有用的。分析IO轮询方法会出现2种不理想情况

  1. selector.wakeup在 wakenUp.getAndSet(false) 和 select(long timeout)之间执行
  2. selector.wakeup在 select(long timeout)和 if (wakenUp.get()){。。。}之间执行

情况2 下次执行select(long timeout)不会阻塞,算是尽量满足减少阻塞时间的需求。

情况1 由于执行了select(long timeout)后立即返回,导致selector 的wakeup状态复原,在这个期间,wakenUp.get()=false,后续执行 NioEventLoop.wakeup()不会调用成功,希望减少阻塞的目标没有达成,因此需要尽可能的完成目标。

if (wakenUp.get()) {

selector.wakeup();

}

这个尽可能的减少阻塞事件的处理有问题,如果情况2没有发生,会多执行了一次selector.wakeup();,猜测netty作者是经过权衡,才这么做的。

IO轮询方法中 IO事件IO任务按照配置好的时间比例执行,默认 50比50。selector.wakeup的运用是对此的优化。深究无用,理解到此就可以了。

netty源码分析7-NioEventLoop-run方法疑难点相关推荐

  1. Netty源码分析第1章(Netty启动流程)----第4节: 注册多路复用

    Netty源码分析第1章(Netty启动流程)---->第4节: 注册多路复用 Netty源码分析第一章:Netty启动流程   第四节:注册多路复用 回顾下以上的小节, 我们知道了channe ...

  2. 【Netty源码分析摘录】(八)新连接的接入

    文章目录 1.问题 2.检测新连接接入 3.创建客户端 channel 4. 绑定 NioEventLoop 4.1 register0 4.1.1 doRegister() 4.1.2 pipeli ...

  3. Netty源码分析系列之服务端Channel的端口绑定

    扫描下方二维码或者微信搜索公众号菜鸟飞呀飞,即可关注微信公众号,Spring源码分析和Java并发编程文章. 微信公众号 问题 本文内容是接着前两篇文章写的,有兴趣的朋友可以先去阅读下两篇文章: Ne ...

  4. Netty源码分析系列之常用解码器(下)——LengthFieldBasedFrameDecoder

    扫描下方二维码或者微信搜索公众号菜鸟飞呀飞,即可关注微信公众号,Spring源码分析和Java并发编程文章. 前言 在上一篇文章中分析了三个比较简单的解码器,今天接着分析最后一个常用的解码器:Leng ...

  5. Netty源码分析(六)—Future和Promis分析

    Netty源码分析(六)-Future和Promis分析 Future用来在异步执行中获取提前执行的结果 个人主页:tuzhenyu's page 原文地址:Netty源码分析(六)-Future和P ...

  6. Netty源码分析第6章(解码器)----第4节: 分隔符解码器

    Netty源码分析第6章(解码器)---->第4节: 分隔符解码器 Netty源码分析第六章: 解码器 第四节: 分隔符解码器 基于分隔符解码器DelimiterBasedFrameDecode ...

  7. Netty源码分析第7章(编码器和写数据)----第2节: MessageToByteEncoder

    Netty源码分析第7章(编码器和写数据)---->第2节: MessageToByteEncoder Netty源码分析第七章: Netty源码分析 第二节: MessageToByteEnc ...

  8. Netty源码分析第5章(ByteBuf)----第5节: directArena分配缓冲区概述

    Netty源码分析第5章(ByteBuf)---->第5节: directArena分配缓冲区概述 Netty源码分析第五章: ByteBuf 第五节: directArena分配缓冲区概述 上 ...

  9. netty源码分析系列——EventLoop

    2019独角兽企业重金招聘Python工程师标准>>> 前言 EventLoop也是netty作为一个事件驱动架构的网络框架的重要组成部分,netty主要通过它来实现异步编程,从前面 ...

最新文章

  1. 计算机行业越来越卷,AI都会刷LeetCode了,网友:比我强
  2. 绍兴袍江计算机培训,绍兴春华电脑基础培训班
  3. html5 静态网页 线程,HTML5 Web Workers之网站也能多线程的实现
  4. B--Bookshelf 2
  5. php实现隐藏字符串的功能
  6. 如何在Jsp上传图片
  7. python中的PEP是什么?怎么理解?(转)
  8. docker 时区_centos7.X上部署docker并运行常用的应用
  9. 衔着树枝飞跃太平洋的傻鸟!(童话版)
  10. 系统架构师 项目经理 哪个更有前景_OLED和QLED电视有什么区别?哪个更好更有前景?...
  11. js src 变量_人人都能看懂的鸿蒙 “JS 小程序” 数据绑定原理
  12. C#: 数字经纬度和度分秒经纬度间的转换
  13. oracle 存储过程body,【随手记】Oracle存储过程报错 Compilation errors for PACKAGE BODY
  14. Access-Control-Allow- 跨域CORS 的使用
  15. 初探MYD-AM335x开发板
  16. 计算机组装后要干什么,电脑组装完后还有哪些事需要干?
  17. oracle 毫秒时间换mysql_Mysql与Oracle常用时间格式的转换
  18. JS中关于a+aa+aaa+aaaa的简便计算方法
  19. 四国外交官变身天猫双11“首席惊喜官”,给剁手党们送快递
  20. 岛屿类问题通用解法与DFS框架

热门文章

  1. 电机学他励直流发电机matlab,基于Matlab并励直流发电机的自励过程分析
  2. simulink实现他励直流电动机串电阻调速仿真
  3. 立冬、小雪、凛冬将至:如何理解美国科技企业裁员潮?
  4. 百度传课-php2小时超音速入门
  5. ae制h5文字动画_html5酷炫的文字打字动画特效
  6. Linux中FTP设置登录欢迎词,怎么为FTP登陆用户设置欢迎语(servu)
  7. HDR视频色调映射算法(之三:Block matching TMO)
  8. 计算机广告设计发展前景,2018广告设计与制作专业就业前景和就业方向分析
  9. STP生成树协议切割网络环路
  10. 中国交通标志牌数据集TT1OOK中的类别ID及其图标罗列以及含义详细介绍