@Overrideprotected void run() {for (;;) {try {           switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.SELECT://select轮询, 设置wakenUp为false并返回之前的wakenUp值select(wakenUp.getAndSet(false));if (wakenUp.get()) {selector.wakeup();}default:// fallthrough}//去除了无关紧要的代码processSelectedKeys();runAllTasks();                } catch (Throwable t) {handleLoopException(t);}// Always handle shutdown even if the loop processing threw an exception....}}

前面讲到Reactor的核心是执行了NioEventLoop的run方法,主要做了上面三件事:

  • 轮询注册到reactor线程上的对应的selector的所有channel的IO事件
  • 根据不同的SelectKeys进行处理  processSelectedKeys();
  • 处理任务队列 runAllTasks();

接下来再详细看下processSelectedKeys()和runAllTasks();  方法做了什么

processSelectedKeys

  private void processSelectedKeys() {if (selectedKeys != null) {processSelectedKeysOptimized(selectedKeys.flip());} else {processSelectedKeysPlain(selector.selectedKeys());}}

这里的processSelectedkeys()方法会根据selectedKeys是否为空,判断执行优化后的processSelectedKeysOptimized()还是普通的processSelectedKeysPlain()方法

这里的selectedKeys Netty在调用openSelector时对其进行了优化

 private SelectedSelectionKeySet selectedKeys;private Selector openSelector() {final Selector selector;selector = provider.openSelector();final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {@Overridepublic Object run() {  Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");selectedKeysField.setAccessible(true);publicSelectedKeysField.setAccessible(true);selectedKeysField.set(selector, selectedKeySet);publicSelectedKeysField.set(selector, selectedKeySet);return null;         }});selectedKeys = selectedKeySet;return selector;}

先创建一个空的SelectedSelectionKeySet对象,然后通过反射获取jdk 底层的Selector 的class 对象的 selectedKeys和publicSelectedKeys字段,并将Netty的SelectedSelectionKeySet通过反射赋值,这样在底层调用jdk的api存储注册事件时,最后都会把事件保存到Netty的SelectedSelectionKeySet 对象里

可以看下替换前后有什么区别,jdk底层的SelectImpl对象的selectedKeys和publicSelectedKeys字段都是Set<SelectionKey>类型,而Netty里的SelectedSelectionKeySet对象是这样的一个结构:

final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {private SelectionKey[] keysA;private int keysASize;private SelectionKey[] keysB;private int keysBSize;private boolean isA = true;@Overridepublic boolean add(SelectionKey o) {if (o == null) {return false;}//添加元素到数组的最后,如果数组满了,就进行扩容(*2)if (isA) {int size = keysASize;keysA[size ++] = o;keysASize = size;if (size == keysA.length) {doubleCapacityA();}} else {...}return true;}//移除对应的SelectionKey数组的最后一个元素SelectionKey[] flip() {if (isA) {isA = false;keysA[keysASize] = null;keysBSize = 0;return keysA;} else {...}}@Overridepublic boolean remove(Object o) {return false;}@Overridepublic boolean contains(Object o) {return false;}@Overridepublic Iterator<SelectionKey> iterator() {throw new UnsupportedOperationException();}
}

SelectedSelectionKeySet是AbstractSet的一个子类,底层通过SelectionKey[]数组方法实现,并且将一些不需要的方法remove,contains方法进行重写,Netty里轮询事件的时候对操作进行了简化,不需要通过集合的Iterator进行移除,而直接通过flip方法去掉集合的最后一个SelectionKey就可以了(这样的操作的时间复杂度更低,可以直接定位到具体的下标),而我们在使用NIO的API的时候都需要进行remove操作
4.1.6.Final中的源码,这里的SelectionKey是两个数组交替遍历的,在4.1.9.Final 版本中,netty已经将SelectedSelectionKeySet底层使用一个数组了:SelectedSelectionKeySet

接着来看下 processSelectedKeysOptimized(selectedKeys.flip());方法

 private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {for (int i = 0;; i ++) {final SelectionKey k = selectedKeys[i];if (k == null) {break;}selectedKeys[i] = null;//拿到SelectionKey的attachment,并根据其类型做不同处理final Object a = k.attachment();if (a instanceof AbstractNioChannel) {processSelectedKey(k, (AbstractNioChannel) a);} else {@SuppressWarnings("unchecked")NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;processSelectedKey(k, task);}if (needsToSelectAgain) {//如果需要重新select,就将selectedKeys的元素都置为null恢复初始的状态for (;;) {i++;if (selectedKeys[i] == null) {break;}selectedKeys[i] = null;}selectAgain();// Need to flip the optimized selectedKeys to get the right reference to the array// and reset the index to -1 which will then set to 0 on the for loop// to start over again. selectedKeys = this.selectedKeys.flip();i = -1;}}}

上述过程可以分为三步:

  • 取出SelectionKey(包含channel,attachment等信息)
  • 这里看到SelectionKey的attachment类型可能是AbstractNioChannel,猜测是不是在注册事件的时间添加的,根据ServerBootstrap的启动流程,最后会调用AbstractNioChannel的如下方法:

    selectionKey = javaChannel().register(eventLoop().selector, 0, this);
    

    这里的最后一个参数也就是attachment,当前对象不就是AbstractNioChannel的子类

  • 处理SelectionKey
  • private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();if (!k.isValid()) {final EventLoop eventLoop;eventLoop = ch.eventLoop();if (eventLoop != this || eventLoop == null) {return;}unsafe.close(unsafe.voidPromise());return;}int readyOps = k.readyOps();//连接建立事件if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blockingint ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;//1.将连接事件从interestOps中移除k.interestOps(ops);//2.调用pipeline().fireChannelActive()将连接建立完成通知给pipeline中的各个handler unsafe.finishConnect();}//可写事件if ((readyOps & SelectionKey.OP_WRITE) != 0) {ch.unsafe().forceFlush();}   //可读事件         if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();if (!ch.isOpen()) {return;}}}

可以看出这里就是一系列NIO的操作,分别对OP_READ, 可读事件, OP_WRITE, 可写事件, OP_CONNECT, 连接事件进行处理

以OP_READ事件为例

public final void read() {final ChannelConfig config = config();final ChannelPipeline pipeline = pipeline();final ByteBufAllocator allocator = config.getAllocator();final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();allocHandle.reset(config);ByteBuf byteBuf = null;boolean close = false;do {//1.分配ByteBufbyteBuf = allocHandle.allocate(allocator);//2.从Channel读取数据allocHandle.lastBytesRead(doReadBytes(byteBuf));if (allocHandle.lastBytesRead() <= 0) {// nothing was read. release the buffer.byteBuf.release();byteBuf = null;close = allocHandle.lastBytesRead() < 0;break;}allocHandle.incMessagesRead(1);readPending = false;//3.通过pipeline.fireChannelRead事件通知给pipeline里的各个handlerpipeline.fireChannelRead(byteBuf);byteBuf = null;} while (allocHandle.continueReading());allocHandle.readComplete();pipeline.fireChannelReadComplete();if (close) {closeOnRead(pipeline);}}}
  • 判断是否需要重新Select并重置
void cancel(SelectionKey key) {key.cancel();cancelledKeys ++;if (cancelledKeys >= CLEANUP_INTERVAL) {cancelledKeys = 0;needsToSelectAgain = true;}}

这里的cancelledKeys会在调用cancel(SelectionKey)删除注册事件的时候计数,当他大于CLEANUP_INTERVAL(256)的时候,就会将needsToSelectAgain设置为true,进入对应的分支判断,先将原来的selectedKeys都置为Null,然后重新调用selectNow(),重新填充selectedKeys

总结:
netty的NioEventLoop线程第二步做的事情就是处理SelectionKey,netty使用数组替换掉jdk原生的HashSet来优化查询和更新SelectionKey的效率,每个SelectionKey上绑定了netty类AbstractNioChanne的具体实现子类对象作为attachment,在处理每个SelectionKey的时候,就可以找到对应的AbstractNioChannel,最后通过pipeline来处理通知给其他Handler

任务执行runAllTasks

任务添加

添加普通任务

前面的分析说过NioEventLoop 是Netty的核心线程,其添加任务是通过执行父类SingleThreadEventExecutor的execute方法,
通过addTask方法,将Runnable(即task)添加到对应的任务队列 Queue<Runnable> taskQueue;里

public void execute(Runnable task) {boolean inEventLoop = inEventLoop();if (inEventLoop) {addTask(task);} else {startThread();addTask(task);           }       }

Netty的源码里的bind()流程中有通过如下方法添加对应的task到SingleThreadEventExecutor的任务队列里,如下:

private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});}

用户也可以通过如下方式自己添加task到TaskQueue

EventLoop eventLoop = channel.eventLoop();
eventLoop.execute(new Runnable() {@Overridepublic void run() {//TODO }
});

添加定时任务

除了上述方式,我们还可以通过如下方法添加定时任务到对应的任务队列

EventLoop eventLoop = channel.eventLoop();
eventLoop.schedule(new Runnable() {@Overridepublic void run() {//TODO }
}, 30, TimeUnit.SECONDS);

具体的实现是在父类AbstractScheduledEventExecutor里,看下对应的源码

 public  ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {...if (delay < 0) {throw new IllegalArgumentException(String.format("delay: %d (expected: >= 0)", delay));}return schedule(new ScheduledFutureTask<Void>(this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));}<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {if (inEventLoop()) {scheduledTaskQueue().add(task);} else {execute(new Runnable() {@Overridepublic void run() {scheduledTaskQueue().add(task);}});}return task;}

会将对应的Runnable和延迟时间封装成一个新的ScheduledFutureTask,然后调用重载的schedule方法,将对应的task添加到PriorityQueue<ScheduledFutureTask<?>>的优先队列里

这里对添加定时任务的Thread进行了判断,如果调用的发起方是reactor线程,那么就直接将Task添加到优先队列中;如果是外部线程调用的schedule,会将"添加定时任务到优先队列"封装成一个Runnable也就是新的task,然后调用上面的execute方法去添加任务,这样会访问PriorityQueue的就只有reactor线程了,变成了单线程

接下来我们来详细看下这个特殊的优先队列PriorityQueue<ScheduledFutureTask<?>>,所谓的优先队列与普通队列的区别在于每个元素都被赋予了优先级。当访问元素时,会将具有最高优先级的元素最先弹出。即优先队列具有最高级先出的特征

看下这个优先队列里的元素ScheduledFutureTask,它实现了Comparable接口,定义了自己的compareTo方法,先比较deadlineNanos(也就是截止时间)的大小,如果一样则比较id,如果也相同就抛出异常

final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {private static final AtomicLong nextTaskId = new AtomicLong();private final long id = nextTaskId.getAndIncrement();@Overridepublic int compareTo(Delayed o) {if (this == o) {return 0;}ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;long d = deadlineNanos() - that.deadlineNanos();if (d < 0) {return -1;} else if (d > 0) {return 1;} else if (id < that.id) {return -1;} else if (id == that.id) {throw new Error();} else {return 1;}}
}

既然ScheduledFutureTask本质也是一个Runnable,那么就看下它的run方法吧
这里对于不同的类型任务进行了不同的处理,periodNanos=0表示是只执行一次的任务,>0 表示是按照指定频率定期执行的任务,<0表示是每次执行完成后,延迟一段时间再次执行的任务(二者的区别在于一个是根据上次任务开始执行的时间计算间隔,一个是按照上次任务执行结束的时间计算间隔)

 /* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */private final long periodNanos;    @Overridepublic void run() {if (periodNanos == 0) {if (setUncancellableInternal()) {V result = task.call();setSuccessInternal(result);}} else {if (!isCancelled()) {task.call();if (!executor().isShutdown()) {long p = periodNanos;if (p > 0) {//设置该任务的下一次截止时间为本次的截止时间加上间隔时间periodNanosdeadlineNanos += p;} else {//设置下一次截止时间为当前时间加上延迟(因为p<0,所以要减去) 此时的当前时间就是本次任务之间结束的时间 task.call()是一个阻塞的方法deadlineNanos = nanoTime() - p;}if (!isCancelled()) {//将新的ScheduledFutureTask添加到任务队列等待下次执行Queue<ScheduledFutureTask<?>> scheduledTaskQueue =((AbstractScheduledEventExecutor) executor()).scheduledTaskQueue;assert scheduledTaskQueue != null;scheduledTaskQueue.add(this);}}}}}

Task任务的执行

有两个重载的runAllTasks方法,一个无参,一个带有long timeoutNanos参数,先来看下无参的方法

 protected boolean runAllTasks() {assert inEventLoop();boolean fetchedAll;boolean ranAtLeastOne = false;do {fetchedAll = fetchFromScheduledTaskQueue();if (runAllTasksFrom(taskQueue)) {ranAtLeastOne = true;}} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.if (ranAtLeastOne) {lastExecutionTime = ScheduledFutureTask.nanoTime();}afterRunningAllTasks();return ranAtLeastOne;}

主要做下面三件事情:

1.将优先队列里的ScheduledFutureTask取出放到taskQueue里
2.从taskQueue里取出task并执行
3.task任务执行完毕后执行后置处理逻辑

将任务从优先队列移动到taskQueue

  private boolean fetchFromScheduledTaskQueue() {long nanoTime = AbstractScheduledEventExecutor.nanoTime();Runnable scheduledTask  = pollScheduledTask(nanoTime);while (scheduledTask != null) {if (!taskQueue.offer(scheduledTask)) {// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);return false;}scheduledTask  = pollScheduledTask(nanoTime);}return true;}protected final Runnable pollScheduledTask(long nanoTime) {assert inEventLoop();Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();if (scheduledTask == null) {return null;}if (scheduledTask.deadlineNanos() <= nanoTime) {scheduledTaskQueue.remove();return scheduledTask;}return null;}

先从scheduledTaskQueue优先队列里拿到对应优先级最高的task(截止时间最近的Task),判断当前是否已到达其截止时间,是的话就将其从优先队列中取出并删除元素,然后将其加入到taskQueue中,如果加入失败就重新加入到scheduledTaskQueue中,一直到所有的优先队列里的task都迁移成功

简单来说就是把已经到期的定时任务从PriorityQueue转移到taskQueue

执行task

   protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {Runnable task = pollTaskFrom(taskQueue);if (task == null) {return false;}for (;;) {safeExecute(task);task = pollTaskFrom(taskQueue);if (task == null) {return true;}}}protected final Runnable pollTaskFrom(Queue<Runnable> taskQueue) {for (;;) {Runnable task = taskQueue.poll();if (task == WAKEUP_TASK) {continue;}return task;}}protected static void safeExecute(Runnable task) {try {task.run();} catch (Throwable t) {logger.warn("A task raised an exception. Task: {}", task, t);}}

从taskQueue中取出非WAKEUP_TASK的任务,然后调用safeExecute() --内部之间调用task.run()来安全执行所有的task,一直到所有的task都执行完毕

后置处理

 @Overrideprotected void afterRunningAllTasks() {runAllTasksFrom(tailTasks);}

当所有的task执行完毕之后,我们还可以执行一些自己的task,通过afterRunningAllTasks方法来执行在tailTasks队列里的所有任务,我们可以通过SingleThreadEventLoop的executeAfterEventLoopIteration向tailTasks里添加自己想要执行的业务逻辑

task的执行还有一个带有超时时间的重载方法,如下:

 protected boolean runAllTasks(long timeoutNanos) {fetchFromScheduledTaskQueue();//从taskQueue poll获取任务Runnable task = pollTask();if (task == null) {afterRunningAllTasks();return false;}//计算当前方法超时的截止时间final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;long runTasks = 0;long lastExecutionTime;for (;;) {safeExecute(task);runTasks ++;//位运算,说明runTasks是64的倍数 0x3F=0011 1111 (63)if ((runTasks & 0x3F) == 0) {lastExecutionTime = ScheduledFutureTask.nanoTime();if (lastExecutionTime >= deadline) {break;}}task = pollTask();if (task == null) {lastExecutionTime = ScheduledFutureTask.nanoTime();break;}}afterRunningAllTasks();this.lastExecutionTime = lastExecutionTime;return true;}

基本思路和不带参数的runAllTasks一样,区别在于会考虑所有任务执行的超时时间,为了提高执行效率,每执行64个任务都会比较下当前时间是否大于runAllTasks的截止时间,是的话就退出

从上面可以看出,我们的EventLoopGroup 既需要执行 IO 操作, 又需要执行 很多的task, 因此在调用对应execute 方法添加任务的时候, 不要提交耗时任务, 更不能提交一些会造成阻塞的任务, 不然会导致我们的 IO 线程得不到调度, 影响整个程序的并发量

总结一下:

  • netty内的任务可分为普通任务和定时任务,分别保存在LinkedBlockingQueue和PriorityQueue
  • netty执行任务之前,会将已经到期的定时任务从PriorityQueue转移到LinkedBlockingQueue
  • 如果执行任务有超时时间,那么会每执行64个任务校验下是否达到截止时间

参考:
netty源码分析之揭开reactor线程的面纱(二)
netty源码分析之揭开reactor线程的面纱(三)
Netty 源码分析-EventLoop​​​​​​​

Netty学习笔记(四)EventLoopGroup续篇相关推荐

  1. Netty学习笔记(三)EventLoopGroup开篇

    使用Netty都需要定义EventLoopGroup,也就是线程池 前面讲过在客户端只需要一个EventLoopGroup就够了,而在服务端就需要两个Group--bossGroup和workerGr ...

  2. Netty学习笔记(六)Pipeline的传播机制

    前面简单提到了下Pipeline的传播机制,这里再详细分析下 Pipeline的传播机制中有两个非常重要的属性inbound和outbound(AbstractChannelHandlerContex ...

  3. Netty学习笔记(二)Netty服务端流程启动分析

    先贴下在NIO和Netty里启动服务端的代码 public class NioServer { /*** 指定端口号启动服务* */public boolean startServer(int por ...

  4. Netty学习笔记(二) 实现服务端和客户端

    在Netty学习笔记(一) 实现DISCARD服务中,我们使用Netty和Python实现了简单的丢弃DISCARD服务,这篇,我们使用Netty实现服务端和客户端交互的需求. 前置工作 开发环境 J ...

  5. Netty学习笔记二网络编程

    Netty学习笔记二 二. 网络编程 1. 阻塞模式 阻塞主要表现为: 连接时阻塞 读取数据时阻塞 缺点: 阻塞单线程在没有连接时会阻塞等待连接的到达,连接到了以后,要进行读取数据,如果没有数据,还要 ...

  6. Netty学习笔记一NIO基础

    Netty学习笔记一 一. NIO 基础 non-blocking io 非阻塞IO (也可称为new IO, 因为是JDK1.4加入的) 1. 三大组件 1.1 Channel 通道:数据的传输通道 ...

  7. C#可扩展编程之MEF学习笔记(四):见证奇迹的时刻

    前面三篇讲了MEF的基础和基本到导入导出方法,下面就是见证MEF真正魅力所在的时刻.如果没有看过前面的文章,请到我的博客首页查看. 前面我们都是在一个项目中写了一个类来测试的,但实际开发中,我们往往要 ...

  8. IOS学习笔记(四)之UITextField和UITextView控件学习

    IOS学习笔记(四)之UITextField和UITextView控件学习(博客地址:http://blog.csdn.net/developer_jiangqq) Author:hmjiangqq ...

  9. RabbitMQ学习笔记四:RabbitMQ命令(附疑难问题解决)

    RabbitMQ学习笔记四:RabbitMQ命令(附疑难问题解决) 参考文章: (1)RabbitMQ学习笔记四:RabbitMQ命令(附疑难问题解决) (2)https://www.cnblogs. ...

最新文章

  1. To handle Unhandled Exception
  2. torch max 判断与筛选
  3. 奥西300工程机服务器装系统,奥西工程机ftp服务器登录
  4. 二、1、怎么做都好做,没flag就抓包
  5. 国网浙江电力组建网络安全分析室
  6. 软考-信息系统项目管理师-项目成本管理
  7. easy ui 使用总结
  8. 我的世界服务器传送系统,我的世界多人服务器任何人都没权限了 不能回主城 不能传送 等等...
  9. 在css中使用hover来控制其他元素的样式,该两个元素必须是父子元素
  10. [我的成长:1004期]春节快乐
  11. 汽车行业准则:自信地采用 AIAG-VDA FMEA 方法
  12. 微信小程序生成二维码接口调用
  13. [渝粤教育] 西北农林科技大学 土壤学 参考 资料
  14. ppi 各代iphone_各代iPhone逻辑分辨率与物理分辨率
  15. 宝塔面板FTP存储空间无法连接的问题
  16. 卡内基梅隆大学计算机专业介绍,卡内基梅隆大学计算机专业介绍 全美大学计算机专业榜首...
  17. Linux CGroup之freezer分析与应用
  18. 计算机网络丢包排查,ping命令图文教程,电脑测试网络丢包延迟,检测网络故障通不通...
  19. 2. 企业发放的奖金根据利润提成. 利润(I)低于或等于10万元时, 奖金可提10%;利润高于10万元, 低于20万元时, 低于10万元的部分按10%提成,...
  20. SDWebImage详细解析

热门文章

  1. spring项目搭建-导包对象准备
  2. 原型模式codeing
  3. python加密反编译_对Python源码进行加密及反编译前后对比
  4. python的md5
  5. JavaScript实现省市二级联动
  6. 数据挖掘算法学习(四)PCA算法
  7. 持续集成接口自动化-jmeter+ant+jenkins(一)
  8. Openfire使用上的一些技巧
  9. cobbler自动化安装详解
  10. 超经典解释什么叫网关