netty源码分析7-NioEventLoop-run方法疑难点
本文分享内容如下
- select()和空轮询bug解决分析
- EventLoop 中对selectKeys的改造
- wakeup分析
select()和空轮询bug解决分析
当select空轮询( selector.select(timeoutMillis); 未等待 timeoutMillis) 执行 次数 达到SELECTOR_AUTO_REBUILD_THRESHOLD(默认512)时重新创建 selector, 并注册所有的channel和关注的事件。
private void select() throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
//delayNanos()获取即将执行的定时任务距离要执行的时间纳秒差值, 没有获取到返回 默认值1000ms
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
// 因为EventLoop 要同时 select IO事件和执行任务,不能一直阻塞 ,当超出 期限时间后,就跳出select(),执行任务。
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
// 假设timeoutMillis 1000ms ,经过一次或多次循环后执行时间超出1000ms,则退出select循环。(注释A)
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;//code B }
//如果查询到IO事件会正常跳出循环,或者按照timeoutMillis时长阻塞后 code B 跳出循环,否则就是发生了空轮询。
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
//有IO事件,被唤醒,有需要执行的任务 都跳出循环
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks()) {
// Selected something,
// waken up by user, or
// the task queue has a pending task.
break;
}
//解决NIO selector 空轮询的bug。注释A 中的处理,当selectCnt数量过大,一定是selector.select(timeoutMillis) 中 阻塞功能失效,发生了空轮询,当空轮询数过多时,为了防止空轮询 CPU达到100%, 重建selector
if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding selector.",
selectCnt);
//重新创建 selector, 并注册所有的channel和关注的事件
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = System.nanoTime();
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e);
}
// Harmless exception - log anyway
}
}
rebuildSelector分析
public void rebuildSelector() {
if (!inEventLoop()) {
execute(new Runnable() {
@Override
public void run() {
rebuildSelector();
}
});
return;
}
final Selector oldSelector = selector;
final Selector newSelector;
if (oldSelector == null) {
return;
}
try {
newSelector = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
for (;;) {
try {
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
try {
if (key.channel().keyFor(newSelector) != null) {
continue;
}
int interestOps = key.interestOps();
key.cancel();
key.channel().register(newSelector, interestOps, a);
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
}
}
}
} catch (ConcurrentModificationException e) {
// Probably due to concurrent modification of the key set.
continue;
}
break;
}
selector = newSelector;
try {
// time to close the old selector as everything else is registered to the new one
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}
logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
rebuildSelector整体的逻辑比较清晰,
先创建Selector ,将原来的 channel,interestOps,attachment 注册到新的Selector 上,然后关闭旧的Selector。
EventLoop 中对selectKeys的改造
selectedKeys是一个 SelectedSelectionKeySet 类对象,
- 每次在轮询到nio事件的时候,netty只需要O(1)的时间复杂度就能将 SelectionKey 塞到 set中去,而jdk底层使用的hashSet需要O(lgn)的时间复杂度
- 优化过的 SelectedSelectionKeySet 的好处,遍历的时候遍历的是数组,相对jdk原生的HashSet效率有所提高
SelectedSelectionKeySet
当IO事件发生了 一定是调用了add方法, 这里只需要O(1)的时间复杂度。
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}
if (isA) {
int size = keysASize;
keysA[size ++] = o;
keysASize = size;
if (size == keysA.length) {
doubleCapacityA();
}
} else {
int size = keysBSize;
keysB[size ++] = o;
keysBSize = size;
if (size == keysB.length) {
doubleCapacityB();
}
}
return true;
}
add 根据isA 判断使用哪个数组,实际上 keysA,keysB 这个两个数组是轮流使用的。
SelectionKey[] flip() {
if (isA) {
isA = false;
keysA[keysASize] = null;//因为数组存在复用,按照add的逻辑 keysASize位置应该是无效的
keysBSize = 0;//翻转前 将另一个数组 的添加位置赋值为0
return keysA;
} else {
isA = true;
keysB[keysBSize] = null;
keysASize = 0;
return keysB;
}
}
isA ,filp() 都是为使用2个数组而设计的。
filp这样设计原本是处于 高并发,一致性的考虑,在高并发的情况下 如果只有一个数组存储SelectKey, 这个数组会一直增长,假设数组没有并发问题,线程会一直处理IO事件,IO任务就一直得不到处理,而数组的修改 是有并发问题的,添加进来的SelectKey有可能不会被及时的处理而跳过,而使用两个数组,一个用于添加SelectKey,一个用于SelectKey的分发执行。这样做是巧妙的办法,而新的版本中已经改为一个数组了,作者描述:一个数组虽然有一致性的问题,但是分发执行的时候小心使用可以解决这个问,如传递一个定长的size。
该问题官方描述:https://github.com/netty/netty/issues/6058#
wakeup分析
NioEventLoop run方法负责轮询IO事件和执行IO任务,这里简称为IO轮询方法.
IO轮询方法中有wakeup 的处理,还有wakeup好多的注释,花了我3个多小时,终于研究明白
通过分析原文注释和实验分析得知 使用selelct.wakeup()效果如下:
先执行selelct的还没返回的操作立即返回。
如果没有执行selelct,则下一次阻塞的 select() select(long timeout) 会立即返回
selectNow(), select() select(long timeout)都会清除 wakeup状态,不会影响下次 select() select(long timeout)的阻塞。
NioEventLoop向外暴露的wakeup方法
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
这里根据inEventLoop进行判断,也就是说只有初次启动,或非EventLoop线程的才有可能修改wakenUp,并执行selector.wakeup();
调用场景
SingleThreadEventExecutor-execute()
public void execute(Runnable task) {
//...
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp) {//addTaskWakesUp 默认是false
wakeup(inEventLoop);
}
}
熟悉吧?就是在启动EventLoop或提交IO任务时候会调用wakeup()。为啥要就这样搞这里先留个疑问 设为 问题1
结合IO轮询方法分析,如下
protected void run() {
for (;;) {
oldWakenUp = wakenUp.getAndSet(false);
try {
if (hasTasks()) {
selectNow();
} else {
select();
//源代码有很多注释,难以读懂 设为 问题2
if (wakenUp.get()) {
selector.wakeup();
}
}
//....
}
注意:selector的唤醒都是调用 NioEventLoop.wakeup()
问题1 IO轮询方法中 IO事件IO任务循环顺序执行,如果用户线程提交IO任务,而IO轮询方法所在线程由于没有IO事件,一直阻塞在select(long timeout)中,就影响了用户线程IO任务的执行, 所以需要执行selector.wakeup来停止阻塞,执行用户线程。
问题2 既然要执行selector.wakeup,那么 IO轮询过程中处于阻塞状态中执行是最有用的。分析IO轮询方法会出现2种不理想情况
- selector.wakeup在 wakenUp.getAndSet(false) 和 select(long timeout)之间执行
- selector.wakeup在 select(long timeout)和 if (wakenUp.get()){。。。}之间执行
情况2 下次执行select(long timeout)不会阻塞,算是尽量满足减少阻塞时间的需求。
情况1 由于执行了select(long timeout)后立即返回,导致selector 的wakeup状态复原,在这个期间,wakenUp.get()=false,后续执行 NioEventLoop.wakeup()不会调用成功,希望减少阻塞的目标没有达成,因此需要尽可能的完成目标。
if (wakenUp.get()) {
selector.wakeup();
}
这个尽可能的减少阻塞事件的处理有问题,如果情况2没有发生,会多执行了一次selector.wakeup();,猜测netty作者是经过权衡,才这么做的。
IO轮询方法中 IO事件IO任务按照配置好的时间比例执行,默认 50比50。selector.wakeup的运用是对此的优化。深究无用,理解到此就可以了。
netty源码分析7-NioEventLoop-run方法疑难点相关推荐
- Netty源码分析第1章(Netty启动流程)----第4节: 注册多路复用
Netty源码分析第1章(Netty启动流程)---->第4节: 注册多路复用 Netty源码分析第一章:Netty启动流程 第四节:注册多路复用 回顾下以上的小节, 我们知道了channe ...
- 【Netty源码分析摘录】(八)新连接的接入
文章目录 1.问题 2.检测新连接接入 3.创建客户端 channel 4. 绑定 NioEventLoop 4.1 register0 4.1.1 doRegister() 4.1.2 pipeli ...
- Netty源码分析系列之服务端Channel的端口绑定
扫描下方二维码或者微信搜索公众号菜鸟飞呀飞,即可关注微信公众号,Spring源码分析和Java并发编程文章. 微信公众号 问题 本文内容是接着前两篇文章写的,有兴趣的朋友可以先去阅读下两篇文章: Ne ...
- Netty源码分析系列之常用解码器(下)——LengthFieldBasedFrameDecoder
扫描下方二维码或者微信搜索公众号菜鸟飞呀飞,即可关注微信公众号,Spring源码分析和Java并发编程文章. 前言 在上一篇文章中分析了三个比较简单的解码器,今天接着分析最后一个常用的解码器:Leng ...
- Netty源码分析(六)—Future和Promis分析
Netty源码分析(六)-Future和Promis分析 Future用来在异步执行中获取提前执行的结果 个人主页:tuzhenyu's page 原文地址:Netty源码分析(六)-Future和P ...
- Netty源码分析第6章(解码器)----第4节: 分隔符解码器
Netty源码分析第6章(解码器)---->第4节: 分隔符解码器 Netty源码分析第六章: 解码器 第四节: 分隔符解码器 基于分隔符解码器DelimiterBasedFrameDecode ...
- Netty源码分析第7章(编码器和写数据)----第2节: MessageToByteEncoder
Netty源码分析第7章(编码器和写数据)---->第2节: MessageToByteEncoder Netty源码分析第七章: Netty源码分析 第二节: MessageToByteEnc ...
- Netty源码分析第5章(ByteBuf)----第5节: directArena分配缓冲区概述
Netty源码分析第5章(ByteBuf)---->第5节: directArena分配缓冲区概述 Netty源码分析第五章: ByteBuf 第五节: directArena分配缓冲区概述 上 ...
- netty源码分析系列——EventLoop
2019独角兽企业重金招聘Python工程师标准>>> 前言 EventLoop也是netty作为一个事件驱动架构的网络框架的重要组成部分,netty主要通过它来实现异步编程,从前面 ...
最新文章
- 计算机行业越来越卷,AI都会刷LeetCode了,网友:比我强
- 绍兴袍江计算机培训,绍兴春华电脑基础培训班
- html5 静态网页 线程,HTML5 Web Workers之网站也能多线程的实现
- B--Bookshelf 2
- php实现隐藏字符串的功能
- 如何在Jsp上传图片
- python中的PEP是什么?怎么理解?(转)
- docker 时区_centos7.X上部署docker并运行常用的应用
- 衔着树枝飞跃太平洋的傻鸟!(童话版)
- 系统架构师 项目经理 哪个更有前景_OLED和QLED电视有什么区别?哪个更好更有前景?...
- js src 变量_人人都能看懂的鸿蒙 “JS 小程序” 数据绑定原理
- C#: 数字经纬度和度分秒经纬度间的转换
- oracle 存储过程body,【随手记】Oracle存储过程报错 Compilation errors for PACKAGE BODY
- Access-Control-Allow- 跨域CORS 的使用
- 初探MYD-AM335x开发板
- 计算机组装后要干什么,电脑组装完后还有哪些事需要干?
- oracle 毫秒时间换mysql_Mysql与Oracle常用时间格式的转换
- JS中关于a+aa+aaa+aaaa的简便计算方法
- 四国外交官变身天猫双11“首席惊喜官”,给剁手党们送快递
- 岛屿类问题通用解法与DFS框架
热门文章
- 电机学他励直流发电机matlab,基于Matlab并励直流发电机的自励过程分析
- simulink实现他励直流电动机串电阻调速仿真
- 立冬、小雪、凛冬将至:如何理解美国科技企业裁员潮?
- 百度传课-php2小时超音速入门
- ae制h5文字动画_html5酷炫的文字打字动画特效
- Linux中FTP设置登录欢迎词,怎么为FTP登陆用户设置欢迎语(servu)
- HDR视频色调映射算法(之三:Block matching TMO)
- 计算机广告设计发展前景,2018广告设计与制作专业就业前景和就业方向分析
- STP生成树协议切割网络环路
- 中国交通标志牌数据集TT1OOK中的类别ID及其图标罗列以及含义详细介绍