2019独角兽企业重金招聘Python工程师标准>>>

前面 ,我们分析了NioEventLoop的创建过程,接下来我们开始分析NioEventLoop的启动和执行逻辑。

Netty版本:4.1.30

启动

在之前分析 Channel绑定 的文章中,提到过下面这段代码,先前只讲了 channel.bind() 绑定逻辑,跳过了execute() 接口,现在我们以这个为例,开始分析NioEventLoop的execute()接口,主要逻辑如下:

  • 添加任务队列
  • 绑定当前线程到EventLoop上
  • 调用EventLoop的run()方法

前面 ,我们分析了NioEventLoop的创建过程,接下来我们开始分析NioEventLoop的启动和执行逻辑。

Netty版本:4.1.30

启动

在之前分析 Channel绑定 的文章中,提到过下面这段代码,先前只讲了 channel.bind() 绑定逻辑,跳过了execute() 接口,现在我们以这个为例,开始分析NioEventLoop的execute()接口,主要逻辑如下:

  • 添加任务队列
  • 绑定当前线程到EventLoop上
  • 调用EventLoop的run()方法
private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {// 通过eventLoop来执行channel绑定的Taskchannel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {// channel绑定channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});
}

往下追踪到 SingleThreadEventExecutor 中 execute 接口,如下:

@Override
public void execute(Runnable task) {if (task == null) {throw new NullPointerException("task");}// 判断当前运行时线程是否与EventLoop中绑定的线程一致// 这里还未绑定Thread,所以先返回falseboolean inEventLoop = inEventLoop();// 将任务添加任务队列,也就是我们前面讲EventLoop创建时候提到的 MpscQueue.addTask(task);if (!inEventLoop) {// 启动线程startThread();if (isShutdown() && removeTask(task)) {reject();}}if (!addTaskWakesUp && wakesUpForTask(task)) {wakeup(inEventLoop);}
}

启动线程接口:

private void startThread() {// 状态比较,最开始时state = 1 ,为trueif (state == ST_NOT_STARTED) {// cs操作后,state状态设置为 2if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {try {// 启动接口doStartThread();} catch (Throwable cause) {STATE_UPDATER.set(this, ST_NOT_STARTED);PlatformDependent.throwException(cause);}}}
}// 执行线程启动方法
private void doStartThread() {// 断言判断 SingleThreadEventExecutor 还未绑定 Threadassert thread == null;// executor 执行任务executor.execute(new Runnable() {@Overridepublic void run() {// 将 SingleThreadEventExecutor(在我们的案例中就是NioEventLoop) 与 当前线程进行绑定thread = Thread.currentThread();if (interrupted) {thread.interrupt();}// 设置状态为 falseboolean success = false;// 更新最近一次任务的执行时间updateLastExecutionTime();try {// 往下调用 NioEventLoop 的 run 方法,执行SingleThreadEventExecutor.this.run();success = true;} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} finally {...}}});
}

执行

往下调用到 NioEventLoop 中的 run 方法,通过无限for循环,主要做以下三件事情:

  • 轮循I/O事件:select(wakenUp.getAndSet(false))
  • 处理I/O事件:processSelectedKeys
  • 运行Task任务:runAllTasks
@Override
protected void run() {for (;;) {try {switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.SELECT:// 轮训检测I/O事件// wakenUp为了标记selector是否是唤醒状态,每次select操作,都设置为false,也就是未唤醒状态。select(wakenUp.getAndSet(false));// 'wakenUp.compareAndSet(false, true)' 总是在调用 'selector.wakeup()' 之前进行评估,以减少唤醒的开销// (Selector.wakeup() 是非常耗性能的操作.)// 但是,这种方法存在竞争条件。当「wakeup」太早设置为true时触发竞争条件// 在下面两种情况下,「wakenUp」会过早设置为true:// 1)Selector 在 'wakenUp.set(false)' 与 'selector.select(...)' 之间被唤醒。(BAD)// 2)Selector 在 'selector.select(...)' 与 'if (wakenUp.get()) { ... }' 之间被唤醒。(OK)// 在第一种情况下,'wakenUp'设置为true,后面的'selector.select(...)'将立即唤醒。 直到'wakenUp'在下一轮中再次设置为false,'wakenUp.compareAndSet(false,true)'将失败,因此任何唤醒选择器的尝试也将失败,从而导致以下'selector.select(。 ..)'呼吁阻止不必要的。// 要解决这个问题,如果在selector.select(...)操作之后wakenUp立即为true,我们会再次唤醒selector。 它是低效率的,因为它唤醒了第一种情况(BAD - 需要唤醒)和第二种情况(OK - 不需要唤醒)的选择器。if (wakenUp.get()) {selector.wakeup();}// fall throughdefault:}cancelledKeys = 0;needsToSelectAgain = false;// ioRatio 表示处理I/O事件与执行具体任务事件之间所耗时间的比值。// ioRatio 默认为50final int ioRatio = this.ioRatio;if (ioRatio == 100) {try {// 处理I/O事件processSelectedKeys();} finally {// 处理任务队列runAllTasks();}} else {// 处理IO事件的开始时间final long ioStartTime = System.nanoTime();try {// 处理I/O事件processSelectedKeys();} finally {// 记录io所耗时间final long ioTime = System.nanoTime() - ioStartTime;// 处理任务队列,设置最大的超时时间runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}} catch (Throwable t) {handleLoopException(t);}// Always handle shutdown even if the loop processing threw an exception.try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {handleLoopException(t);}}
}

轮循检测I/O事件

private void select(boolean oldWakenUp) throws IOException {Selector selector = this.selector;try {// select操作计数int selectCnt = 0;// 记录当前系统时间long currentTimeNanos = System.nanoTime();// delayNanos方法用于计算定时任务队列,最近一个任务的截止时间// selectDeadLineNanos 表示当前select操作所不能超过的最大截止时间long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);for (;;) {// 计算超时时间,判断是否超时long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;// 如果 timeoutMillis <= 0, 表示超时,进行一个非阻塞的 select 操作。设置 selectCnt 为 1. 并终止本次循环。if (timeoutMillis <= 0) {if (selectCnt == 0) {selector.selectNow();selectCnt = 1;}break;}// 当wakenUp为ture时,恰好有task被提交,这个task将无法获得调用的机会// Selector#wakeup. 因此,在执行select操作之前,需要再次检查任务队列// 如果不这么做,这个Task将一直挂起,直到select操作超时// 如果 pipeline 中存在 IdleStateHandler ,那么Task将一直挂起直到 空闲超时。if (hasTasks() && wakenUp.compareAndSet(false, true)) {// 调用非阻塞方法selector.selectNow();selectCnt = 1;break;}// 如果当前任务队列为空,并且超时时间未到,则进行一个阻塞式的selector操作。timeoutMillis 为最大的select时间int selectedKeys = selector.select(timeoutMillis);// 操作计数 +1selectCnt ++;// 存在以下情况,本次selector则终止if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {// - 轮训到了事件(Selected something,)// - 被用户唤醒(waken up by user,)// - 已有任务队列(the task queue has a pending task.)// - 已有定时任务(a scheduled task is ready for processing)break;}if (Thread.interrupted()) {// Thread was interrupted so reset selected keys and break so we not run into a busy loop.// As this is most likely a bug in the handler of the user or it's client library we will// also log it.//// See https://github.com/netty/netty/issues/2426if (logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely because " +"Thread.currentThread().interrupt() was called. Use " +"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");}selectCnt = 1;break;}// 记录当前时间long time = System.nanoTime();// 如果time > currentTimeNanos + timeoutMillis(超时时间),则表明已经执行过一次select操作if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {// timeoutMillis elapsed without anything selected.selectCnt = 1;} // 如果 time <= currentTimeNanos + timeoutMillis,表示触发了空轮训// 如果空轮训的次数超过 SELECTOR_AUTO_REBUILD_THRESHOLD (512),则重建一个新的selctor,避免空轮训else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {// The selector returned prematurely many times in a row.// Rebuild the selector to work around the problem.logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",selectCnt, selector);// 重建创建一个新的selectorrebuildSelector();selector = this.selector;// Select again to populate selectedKeys.// 对重建后的selector进行一次非阻塞调用,用于获取最新的selectedKeysselector.selectNow();// 设置select计数selectCnt = 1;break;}currentTimeNanos = time;}if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {if (logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",selectCnt - 1, selector);}}} catch (CancelledKeyException e) {if (logger.isDebugEnabled()) {logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",selector, e);}// Harmless exception - log anyway}
}

重新创建一个新的Selector

该方法的主要逻辑就是:

  • 创建一个新的selector
  • 将老的selector上的 selectKey注册到新的 selector 上
public void rebuildSelector() {if (!inEventLoop()) {execute(new Runnable() {@Overridepublic void run() {rebuildSelector0();}});return;}rebuildSelector0();
}// 重新创建selector
private void rebuildSelector0() {// 暂存老的selectorfinal Selector oldSelector = selector;final SelectorTuple newSelectorTuple;if (oldSelector == null) {return;}try {// 创建一个新的 SelectorTuple// openSelector()在之前分析过了newSelectorTuple = openSelector();} catch (Exception e) {logger.warn("Failed to create a new Selector.", e);return;}// Register all channels to the new Selector.// 记录select上注册的channel数量int nChannels = 0;// 遍历老的 selector 上的 SelectionKey for (SelectionKey key: oldSelector.keys()) {// 获取 attachment,这里的attachment就是我们前面在讲 Netty Channel注册时,select会将channel赋值到 attachment 变量上。// 获取老的selector上注册的channel Object a = key.attachment();try {if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {continue;}// 获取兴趣集int interestOps = key.interestOps();// 取消 SelectionKeykey.cancel();// 将老的兴趣集重新注册到前面新创建的selector上SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);if (a instanceof AbstractNioChannel) {// Update SelectionKey((AbstractNioChannel) a).selectionKey = newKey;}// nChannels计数 + 1nChannels ++;} catch (Exception e) {logger.warn("Failed to re-register a Channel to the new Selector.", e);if (a instanceof AbstractNioChannel) {AbstractNioChannel ch = (AbstractNioChannel) a;ch.unsafe().close(ch.unsafe().voidPromise());} else {@SuppressWarnings("unchecked")NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;invokeChannelUnregistered(task, key, e);}}}// 设置新的 selectorselector = newSelectorTuple.selector;// 设置新的 unwrappedSelectorunwrappedSelector = newSelectorTuple.unwrappedSelector;try {// time to close the old selector as everything else is registered to the new one// 关闭老的selecloroldSelector.close();} catch (Throwable t) {if (logger.isWarnEnabled()) {logger.warn("Failed to close the old Selector.", t);}}if (logger.isInfoEnabled()) {logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");}
}

处理I/O事件


private void processSelectedKeysOptimized() {for (int i = 0; i < selectedKeys.size; ++i) {final SelectionKey k = selectedKeys.keys[i];// null out entry in the array to allow to have it GC'ed once the Channel close// See https://github.com/netty/netty/issues/2363// 设置为null,有利于GC回收selectedKeys.keys[i] = null;// 获取 SelectionKey 中的 attachment, 我们这里就是 NioChannelfinal Object a = k.attachment();if (a instanceof AbstractNioChannel) {// 处理 SelectedKeyprocessSelectedKey(k, (AbstractNioChannel) a);} else {@SuppressWarnings("unchecked")NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;processSelectedKey(k, task);}if (needsToSelectAgain) {// null out entries in the array to allow to have it GC'ed once the Channel close// See https://github.com/netty/netty/issues/2363selectedKeys.reset(i + 1);selectAgain();i = -1;}}
}// 处理 SelectedKey
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {// 获取Netty Channel中的 NioUnsafe 对象,用于后面的IO操作final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();// 判断 SelectedKey 的有效性,如果无效,则直接返回并关闭channelif (!k.isValid()) {final EventLoop eventLoop;try {eventLoop = ch.eventLoop();} catch (Throwable ignored) {// If the channel implementation throws an exception because there is no event loop, we ignore this// because we are only trying to determine if ch is registered to this event loop and thus has authority// to close ch.return;}// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is// still healthy and should not be closed.// See https://github.com/netty/netty/issues/5125if (eventLoop != this || eventLoop == null) {return;}// close the channel if the key is not valid anymore// 关闭channelunsafe.close(unsafe.voidPromise());return;}try {// 获取 SelectionKey 中所有准备就绪的操作集int readyOps = k.readyOps();// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise// the NIO JDK channel implementation may throw a NotYetConnectedException.// 在调用处理READ与WRITE事件之间,先调用finishConnect()接口,避免异常 NotYetConnectedException 发生。if ((readyOps & SelectionKey.OP_CONNECT) != 0) {// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking// See https://github.com/netty/netty/issues/924int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.// 处理 WRITE 事件if ((readyOps & SelectionKey.OP_WRITE) != 0) {// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to writech.unsafe().forceFlush();}// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead// to a spin loop// 处理 ACCEPT 与 READ 事件// 如果当前的EventLoop是WorkGroup,则表示有 READ 事件// 如果当前的EventLoop是BossGroup,则表示有 ACCEPT 事件,有新连接进来了if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {// 读取数据unsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}
}

关于 unsafe.read() 的分析,请看 后文

执行所有任务

接下来,我们了解一下执行具体Task任务的接口:runAllTasks。在EventLoop中,待执行的任务队列分为两种:一种是普通任务队列,一种是定时任务队列。

前面 我们讲 EventLoop 创建时提到过NioEventLoop中 taskQueue 的创建,是一个MpscQueue,关于高效率的MpscQueue 后面单独写文章进行介绍:

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {...// 存放普通任务的队列private final Queue<Runnable> taskQueue;...protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,boolean addTaskWakesUp, int maxPendingTasks,RejectedExecutionHandler rejectedHandler) {super(parent);this.addTaskWakesUp = addTaskWakesUp;this.maxPendingTasks = Math.max(16, maxPendingTasks);this.executor = ObjectUtil.checkNotNull(executor, "executor");// 创建TaskQueuetaskQueue = newTaskQueue(this.maxPendingTasks);rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");}  ...}public final class NioEventLoop extends SingleThreadEventLoop {...// NioEventLoop 创建TaskQueue队列@Overrideprotected Queue<Runnable> newTaskQueue(int maxPendingTasks) {// This event loop never calls takeTask()return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue(): PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);}...}

存放定时任务的队列在 AbstractScheduledEventExecutor 中,成员变量为 scheduledTaskQueue,代码如下:


public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {// 优先级队列的比较器private static final Comparator<ScheduledFutureTask<?>> SCHEDULED_FUTURE_TASK_COMPARATOR =new Comparator<ScheduledFutureTask<?>>() {@Overridepublic int compare(ScheduledFutureTask<?> o1, ScheduledFutureTask<?> o2) {return o1.compareTo(o2);}};// 存放定时任务的优先级队列PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;// 创建定时任务队列    PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {if (scheduledTaskQueue == null) {scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(SCHEDULED_FUTURE_TASK_COMPARATOR,// Use same initial capacity as java.util.PriorityQueue11);}return scheduledTaskQueue;}// 保存定时任务@Overridepublic ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {ObjectUtil.checkNotNull(command, "command");ObjectUtil.checkNotNull(unit, "unit");if (delay < 0) {delay = 0;}validateScheduled0(delay, unit);return schedule(new ScheduledFutureTask<Void>(this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));}// 保存定时任务@Overridepublic <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {ObjectUtil.checkNotNull(callable, "callable");ObjectUtil.checkNotNull(unit, "unit");if (delay < 0) {delay = 0;}validateScheduled0(delay, unit);return schedule(new ScheduledFutureTask<V>(this, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));}// 保存定时任务<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {// 判断是否为当前线程if (inEventLoop()) {// 添加定时任务队列scheduledTaskQueue().add(task);} else {execute(new Runnable() {@Overridepublic void run() {// 添加定时任务队列scheduledTaskQueue().add(task);}});}return task;}
}

Netty存放定时任务队列为 DefaultPriorityQueue ,定时任务的封装对象为 ScheduledFutureTask ,在队列中的优先按照它们的截止时间进行排序,其次在按照id进行排序。

final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode {...// 比较 ScheduledFutureTask 之间的排序@Overridepublic int compareTo(Delayed o) {if (this == o) {return 0;}ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;long d = deadlineNanos() - that.deadlineNanos();if (d < 0) {return -1;} else if (d > 0) {return 1;} else if (id < that.id) {return -1;} else if (id == that.id) {throw new Error();} else {return 1;}}    ...}

再来看看任务的执行逻辑,首先将定时任务取出,聚合到普通任务队列中,再去for循环运行每个Task。

protected boolean runAllTasks(long timeoutNanos) {// 将定时任务从定时队列中取出,放入普通队列中fetchFromScheduledTaskQueue();// 从队列中取出任务Runnable task = pollTask();if (task == null) {afterRunningAllTasks();return false;}// 计算任务执行的最大超时时间final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;// 任务计数long runTasks = 0;// 最近一次任务执行的时间long lastExecutionTime;for (;;) {// 执行任务safeExecute(task);// 任务计数 +1runTasks ++;// Check timeout every 64 tasks because nanoTime() is relatively expensive.// XXX: Hard-coded value - will make it configurable if it is really a problem.// 由于nanoTime() 是非常好性能的操作,因此每64次就对比一下 定时任务的执行时间与 deadline,// 如果 lastExecutionTime >= deadline,则表示任务超时了,需要中断退出if ((runTasks & 0x3F) == 0) {lastExecutionTime = ScheduledFutureTask.nanoTime();if (lastExecutionTime >= deadline) {break;}}// 获取任务task = pollTask();if (task == null) {lastExecutionTime = ScheduledFutureTask.nanoTime();break;}}afterRunningAllTasks();// 记录最后一次的执行时间this.lastExecutionTime = lastExecutionTime;return true;
}// 取出任务
protected Runnable pollTask() {assert inEventLoop();return pollTaskFrom(taskQueue);
}// 从队列中取出任务
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {for (;;) {Runnable task = taskQueue.poll();if (task == WAKEUP_TASK) {continue;}return task;}
}// 将定时任务从定时队列中取出,聚合到普通队列中:
private boolean fetchFromScheduledTaskQueue() {// 得到nanoTime = 当前时间 - ScheduledFutureTask的START_TIME(开始时间)long nanoTime = AbstractScheduledEventExecutor.nanoTime();// 获得截止时间小于nanoTime的定时任务Runnable scheduledTask  = pollScheduledTask(nanoTime);while (scheduledTask != null) {// 将定时任务放入普通队列中,以备运行if (!taskQueue.offer(scheduledTask)) {// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.// 如果 taskQueue 没有足够的空间,导致添加失败,则将其返回定时任务队列中scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);return false;}scheduledTask  = pollScheduledTask(nanoTime);}return true;
}// 获得截止时间小于nanoTime的定时任务
protected final Runnable pollScheduledTask(long nanoTime) {assert inEventLoop();// 获取定时任务队列Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;// 获取第一个定时任务ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();if (scheduledTask == null) {return null;}// 如果该定时任务的截止时间 <= nanoTime ,则返回if (scheduledTask.deadlineNanos() <= nanoTime) {scheduledTaskQueue.remove();return scheduledTask;}return null;
}

小结

好了,NioEventLoop的原理以及它的 创建 与 启动执行 流程到这里就分析完毕了。启动流程主要流程如下:

  • 将待执行的任务添加到任务队列中
  • 将当前线程绑定到EventLoop上
  • 轮循I/O事件,在轮循selector过程中,会对JDK的空轮循Bug做一个处理。
  • 处理I/O事件。
  • 运行Task任务。将定时任务聚合到普通任务队列中,然后在依次执行队列中的任务。

问题:

  • 默认情况下,netty服务端启动多少个线程?何时启动?
  • netty是如何解决空轮训Bug的?
  • netty是如何保证串行无锁化的?
private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {// 通过eventLoop来执行channel绑定的Taskchannel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {// channel绑定channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});
}

往下追踪到 SingleThreadEventExecutor 中 execute 接口,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override
public void execute(Runnable task) {if (task == null) {throw new NullPointerException("task");}// 判断当前运行时线程是否与EventLoop中绑定的线程一致// 这里还未绑定Thread,所以先返回falseboolean inEventLoop = inEventLoop();// 将任务添加任务队列,也就是我们前面讲EventLoop创建时候提到的 MpscQueue.addTask(task);if (!inEventLoop) {// 启动线程startThread();if (isShutdown() && removeTask(task)) {reject();}}if (!addTaskWakesUp && wakesUpForTask(task)) {wakeup(inEventLoop);}
}

启动线程接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
private void startThread() {// 状态比较,最开始时state = 1 ,为trueif (state == ST_NOT_STARTED) {// cs操作后,state状态设置为 2if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {try {// 启动接口doStartThread();} catch (Throwable cause) {STATE_UPDATER.set(this, ST_NOT_STARTED);PlatformDependent.throwException(cause);}}}
}// 执行线程启动方法
private void doStartThread() {// 断言判断 SingleThreadEventExecutor 还未绑定 Threadassert thread == null;// executor 执行任务executor.execute(new Runnable() {@Overridepublic void run() {// 将 SingleThreadEventExecutor(在我们的案例中就是NioEventLoop) 与 当前线程进行绑定thread = Thread.currentThread();if (interrupted) {thread.interrupt();}// 设置状态为 falseboolean success = false;// 更新最近一次任务的执行时间updateLastExecutionTime();try {// 往下调用 NioEventLoop 的 run 方法,执行SingleThreadEventExecutor.this.run();success = true;} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} finally {...}}});
}

执行

往下调用到 NioEventLoop 中的 run 方法,通过无限for循环,主要做以下三件事情:

  • 轮循I/O事件:select(wakenUp.getAndSet(false))
  • 处理I/O事件:processSelectedKeys
  • 运行Task任务:runAllTasks
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
@Override
protected void run() {for (;;) {try {switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.SELECT:// 轮训检测I/O事件// wakenUp为了标记selector是否是唤醒状态,每次select操作,都设置为false,也就是未唤醒状态。select(wakenUp.getAndSet(false));// 'wakenUp.compareAndSet(false, true)' 总是在调用 'selector.wakeup()' 之前进行评估,以减少唤醒的开销// (Selector.wakeup() 是非常耗性能的操作.)// 但是,这种方法存在竞争条件。当「wakeup」太早设置为true时触发竞争条件// 在下面两种情况下,「wakenUp」会过早设置为true:// 1)Selector 在 'wakenUp.set(false)' 与 'selector.select(...)' 之间被唤醒。(BAD)// 2)Selector 在 'selector.select(...)' 与 'if (wakenUp.get()) { ... }' 之间被唤醒。(OK)// 在第一种情况下,'wakenUp'设置为true,后面的'selector.select(...)'将立即唤醒。 直到'wakenUp'在下一轮中再次设置为false,'wakenUp.compareAndSet(false,true)'将失败,因此任何唤醒选择器的尝试也将失败,从而导致以下'selector.select(。 ..)'呼吁阻止不必要的。// 要解决这个问题,如果在selector.select(...)操作之后wakenUp立即为true,我们会再次唤醒selector。 它是低效率的,因为它唤醒了第一种情况(BAD - 需要唤醒)和第二种情况(OK - 不需要唤醒)的选择器。if (wakenUp.get()) {selector.wakeup();}// fall throughdefault:}cancelledKeys = 0;needsToSelectAgain = false;// ioRatio 表示处理I/O事件与执行具体任务事件之间所耗时间的比值。// ioRatio 默认为50final int ioRatio = this.ioRatio;if (ioRatio == 100) {try {// 处理I/O事件processSelectedKeys();} finally {// 处理任务队列runAllTasks();}} else {// 处理IO事件的开始时间final long ioStartTime = System.nanoTime();try {// 处理I/O事件processSelectedKeys();} finally {// 记录io所耗时间final long ioTime = System.nanoTime() - ioStartTime;// 处理任务队列,设置最大的超时时间runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}} catch (Throwable t) {handleLoopException(t);}// Always handle shutdown even if the loop processing threw an exception.try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {handleLoopException(t);}}
}

轮循检测I/O事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
private void select(boolean oldWakenUp) throws IOException {Selector selector = this.selector;try {// select操作计数int selectCnt = 0;// 记录当前系统时间long currentTimeNanos = System.nanoTime();// delayNanos方法用于计算定时任务队列,最近一个任务的截止时间// selectDeadLineNanos 表示当前select操作所不能超过的最大截止时间long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);for (;;) {// 计算超时时间,判断是否超时long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;// 如果 timeoutMillis <= 0, 表示超时,进行一个非阻塞的 select 操作。设置 selectCnt 为 1. 并终止本次循环。if (timeoutMillis <= 0) {if (selectCnt == 0) {selector.selectNow();selectCnt = 1;}break;}// 当wakenUp为ture时,恰好有task被提交,这个task将无法获得调用的机会// Selector#wakeup. 因此,在执行select操作之前,需要再次检查任务队列// 如果不这么做,这个Task将一直挂起,直到select操作超时// 如果 pipeline 中存在 IdleStateHandler ,那么Task将一直挂起直到 空闲超时。if (hasTasks() && wakenUp.compareAndSet(false, true)) {// 调用非阻塞方法selector.selectNow();selectCnt = 1;break;}// 如果当前任务队列为空,并且超时时间未到,则进行一个阻塞式的selector操作。timeoutMillis 为最大的select时间int selectedKeys = selector.select(timeoutMillis);// 操作计数 +1selectCnt ++;// 存在以下情况,本次selector则终止if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {// - 轮训到了事件(Selected something,)// - 被用户唤醒(waken up by user,)// - 已有任务队列(the task queue has a pending task.)// - 已有定时任务(a scheduled task is ready for processing)break;}if (Thread.interrupted()) {// Thread was interrupted so reset selected keys and break so we not run into a busy loop.// As this is most likely a bug in the handler of the user or it's client library we will// also log it.//// See https://github.com/netty/netty/issues/2426if (logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely because " +"Thread.currentThread().interrupt() was called. Use " +"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");}selectCnt = 1;break;}// 记录当前时间long time = System.nanoTime();// 如果time > currentTimeNanos + timeoutMillis(超时时间),则表明已经执行过一次select操作if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {// timeoutMillis elapsed without anything selected.selectCnt = 1;} // 如果 time <= currentTimeNanos + timeoutMillis,表示触发了空轮训// 如果空轮训的次数超过 SELECTOR_AUTO_REBUILD_THRESHOLD (512),则重建一个新的selctor,避免空轮训else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {// The selector returned prematurely many times in a row.// Rebuild the selector to work around the problem.logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",selectCnt, selector);// 重建创建一个新的selectorrebuildSelector();selector = this.selector;// Select again to populate selectedKeys.// 对重建后的selector进行一次非阻塞调用,用于获取最新的selectedKeysselector.selectNow();// 设置select计数selectCnt = 1;break;}currentTimeNanos = time;}if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {if (logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",selectCnt - 1, selector);}}} catch (CancelledKeyException e) {if (logger.isDebugEnabled()) {logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",selector, e);}// Harmless exception - log anyway}
}

重新创建一个新的Selector

该方法的主要逻辑就是:

  • 创建一个新的selector
  • 将老的selector上的 selectKey注册到新的 selector 上
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
public void rebuildSelector() {if (!inEventLoop()) {execute(new Runnable() {@Overridepublic void run() {rebuildSelector0();}});return;}rebuildSelector0();
}// 重新创建selector
private void rebuildSelector0() {// 暂存老的selectorfinal Selector oldSelector = selector;final SelectorTuple newSelectorTuple;if (oldSelector == null) {return;}try {// 创建一个新的 SelectorTuple// openSelector()在之前分析过了newSelectorTuple = openSelector();} catch (Exception e) {logger.warn("Failed to create a new Selector.", e);return;}// Register all channels to the new Selector.// 记录select上注册的channel数量int nChannels = 0;// 遍历老的 selector 上的 SelectionKey for (SelectionKey key: oldSelector.keys()) {// 获取 attachment,这里的attachment就是我们前面在讲 Netty Channel注册时,select会将channel赋值到 attachment 变量上。// 获取老的selector上注册的channel Object a = key.attachment();try {if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {continue;}// 获取兴趣集int interestOps = key.interestOps();// 取消 SelectionKeykey.cancel();// 将老的兴趣集重新注册到前面新创建的selector上SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);if (a instanceof AbstractNioChannel) {// Update SelectionKey((AbstractNioChannel) a).selectionKey = newKey;}// nChannels计数 + 1nChannels ++;} catch (Exception e) {logger.warn("Failed to re-register a Channel to the new Selector.", e);if (a instanceof AbstractNioChannel) {AbstractNioChannel ch = (AbstractNioChannel) a;ch.unsafe().close(ch.unsafe().voidPromise());} else {@SuppressWarnings("unchecked")NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;invokeChannelUnregistered(task, key, e);}}}// 设置新的 selectorselector = newSelectorTuple.selector;// 设置新的 unwrappedSelectorunwrappedSelector = newSelectorTuple.unwrappedSelector;try {// time to close the old selector as everything else is registered to the new one// 关闭老的selecloroldSelector.close();} catch (Throwable t) {if (logger.isWarnEnabled()) {logger.warn("Failed to close the old Selector.", t);}}if (logger.isInfoEnabled()) {logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");}
}

处理I/O事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
private void processSelectedKeysOptimized() {for (int i = 0; i < selectedKeys.size; ++i) {final SelectionKey k = selectedKeys.keys[i];// null out entry in the array to allow to have it GC'ed once the Channel close// See https://github.com/netty/netty/issues/2363// 设置为null,有利于GC回收selectedKeys.keys[i] = null;// 获取 SelectionKey 中的 attachment, 我们这里就是 NioChannelfinal Object a = k.attachment();if (a instanceof AbstractNioChannel) {// 处理 SelectedKeyprocessSelectedKey(k, (AbstractNioChannel) a);} else {@SuppressWarnings("unchecked")NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;processSelectedKey(k, task);}if (needsToSelectAgain) {// null out entries in the array to allow to have it GC'ed once the Channel close// See https://github.com/netty/netty/issues/2363selectedKeys.reset(i + 1);selectAgain();i = -1;}}
}// 处理 SelectedKey
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {// 获取Netty Channel中的 NioUnsafe 对象,用于后面的IO操作final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();// 判断 SelectedKey 的有效性,如果无效,则直接返回并关闭channelif (!k.isValid()) {final EventLoop eventLoop;try {eventLoop = ch.eventLoop();} catch (Throwable ignored) {// If the channel implementation throws an exception because there is no event loop, we ignore this// because we are only trying to determine if ch is registered to this event loop and thus has authority// to close ch.return;}// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is// still healthy and should not be closed.// See https://github.com/netty/netty/issues/5125if (eventLoop != this || eventLoop == null) {return;}// close the channel if the key is not valid anymore// 关闭channelunsafe.close(unsafe.voidPromise());return;}try {// 获取 SelectionKey 中所有准备就绪的操作集int readyOps = k.readyOps();// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise// the NIO JDK channel implementation may throw a NotYetConnectedException.// 在调用处理READ与WRITE事件之间,先调用finishConnect()接口,避免异常 NotYetConnectedException 发生。if ((readyOps & SelectionKey.OP_CONNECT) != 0) {// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking// See https://github.com/netty/netty/issues/924int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.// 处理 WRITE 事件if ((readyOps & SelectionKey.OP_WRITE) != 0) {// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to writech.unsafe().forceFlush();}// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead// to a spin loop// 处理 ACCEPT 与 READ 事件// 如果当前的EventLoop是WorkGroup,则表示有 READ 事件// 如果当前的EventLoop是BossGroup,则表示有 ACCEPT 事件,有新连接进来了if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {// 读取数据unsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}
}

关于 unsafe.read() 的分析,请看 后文

执行所有任务

接下来,我们了解一下执行具体Task任务的接口:runAllTasks。在EventLoop中,待执行的任务队列分为两种:一种是普通任务队列,一种是定时任务队列。

前面 我们讲 EventLoop 创建时提到过NioEventLoop中 taskQueue 的创建,是一个MpscQueue,关于高效率的MpscQueue 后面单独写文章进行介绍:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {...// 存放普通任务的队列private final Queue<Runnable> taskQueue;...protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,boolean addTaskWakesUp, int maxPendingTasks,RejectedExecutionHandler rejectedHandler) {super(parent);this.addTaskWakesUp = addTaskWakesUp;this.maxPendingTasks = Math.max(16, maxPendingTasks);this.executor = ObjectUtil.checkNotNull(executor, "executor");// 创建TaskQueuetaskQueue = newTaskQueue(this.maxPendingTasks);rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");}  ...}public final class NioEventLoop extends SingleThreadEventLoop {...// NioEventLoop 创建TaskQueue队列@Overrideprotected Queue<Runnable> newTaskQueue(int maxPendingTasks) {// This event loop never calls takeTask()return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue(): PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);}...}

存放定时任务的队列在 AbstractScheduledEventExecutor 中,成员变量为 scheduledTaskQueue,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {// 优先级队列的比较器private static final Comparator<ScheduledFutureTask<?>> SCHEDULED_FUTURE_TASK_COMPARATOR =new Comparator<ScheduledFutureTask<?>>() {@Overridepublic int compare(ScheduledFutureTask<?> o1, ScheduledFutureTask<?> o2) {return o1.compareTo(o2);}};// 存放定时任务的优先级队列PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;// 创建定时任务队列    PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {if (scheduledTaskQueue == null) {scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(SCHEDULED_FUTURE_TASK_COMPARATOR,// Use same initial capacity as java.util.PriorityQueue11);}return scheduledTaskQueue;}// 保存定时任务@Overridepublic ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {ObjectUtil.checkNotNull(command, "command");ObjectUtil.checkNotNull(unit, "unit");if (delay < 0) {delay = 0;}validateScheduled0(delay, unit);return schedule(new ScheduledFutureTask<Void>(this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));}// 保存定时任务@Overridepublic <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {ObjectUtil.checkNotNull(callable, "callable");ObjectUtil.checkNotNull(unit, "unit");if (delay < 0) {delay = 0;}validateScheduled0(delay, unit);return schedule(new ScheduledFutureTask<V>(this, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));}// 保存定时任务<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {// 判断是否为当前线程if (inEventLoop()) {// 添加定时任务队列scheduledTaskQueue().add(task);} else {execute(new Runnable() {@Overridepublic void run() {// 添加定时任务队列scheduledTaskQueue().add(task);}});}return task;}
}

Netty存放定时任务队列为 DefaultPriorityQueue ,定时任务的封装对象为 ScheduledFutureTask ,在队列中的优先按照它们的截止时间进行排序,其次在按照id进行排序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode {...// 比较 ScheduledFutureTask 之间的排序@Overridepublic int compareTo(Delayed o) {if (this == o) {return 0;}ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;long d = deadlineNanos() - that.deadlineNanos();if (d < 0) {return -1;} else if (d > 0) {return 1;} else if (id < that.id) {return -1;} else if (id == that.id) {throw new Error();} else {return 1;}}    ...}

再来看看任务的执行逻辑,首先将定时任务取出,聚合到普通任务队列中,再去for循环运行每个Task。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
protected boolean runAllTasks(long timeoutNanos) {// 将定时任务从定时队列中取出,放入普通队列中fetchFromScheduledTaskQueue();// 从队列中取出任务Runnable task = pollTask();if (task == null) {afterRunningAllTasks();return false;}// 计算任务执行的最大超时时间final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;// 任务计数long runTasks = 0;// 最近一次任务执行的时间long lastExecutionTime;for (;;) {// 执行任务safeExecute(task);// 任务计数 +1runTasks ++;// Check timeout every 64 tasks because nanoTime() is relatively expensive.// XXX: Hard-coded value - will make it configurable if it is really a problem.// 由于nanoTime() 是非常好性能的操作,因此每64次就对比一下 定时任务的执行时间与 deadline,// 如果 lastExecutionTime >= deadline,则表示任务超时了,需要中断退出if ((runTasks & 0x3F) == 0) {lastExecutionTime = ScheduledFutureTask.nanoTime();if (lastExecutionTime >= deadline) {break;}}// 获取任务task = pollTask();if (task == null) {lastExecutionTime = ScheduledFutureTask.nanoTime();break;}}afterRunningAllTasks();// 记录最后一次的执行时间this.lastExecutionTime = lastExecutionTime;return true;
}// 取出任务
protected Runnable pollTask() {assert inEventLoop();return pollTaskFrom(taskQueue);
}// 从队列中取出任务
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {for (;;) {Runnable task = taskQueue.poll();if (task == WAKEUP_TASK) {continue;}return task;}
}// 将定时任务从定时队列中取出,聚合到普通队列中:
private boolean fetchFromScheduledTaskQueue() {// 得到nanoTime = 当前时间 - ScheduledFutureTask的START_TIME(开始时间)long nanoTime = AbstractScheduledEventExecutor.nanoTime();// 获得截止时间小于nanoTime的定时任务Runnable scheduledTask  = pollScheduledTask(nanoTime);while (scheduledTask != null) {// 将定时任务放入普通队列中,以备运行if (!taskQueue.offer(scheduledTask)) {// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.// 如果 taskQueue 没有足够的空间,导致添加失败,则将其返回定时任务队列中scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);return false;}scheduledTask  = pollScheduledTask(nanoTime);}return true;
}// 获得截止时间小于nanoTime的定时任务
protected final Runnable pollScheduledTask(long nanoTime) {assert inEventLoop();// 获取定时任务队列Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;// 获取第一个定时任务ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();if (scheduledTask == null) {return null;}// 如果该定时任务的截止时间 <= nanoTime ,则返回if (scheduledTask.deadlineNanos() <= nanoTime) {scheduledTaskQueue.remove();return scheduledTask;}return null;
}

小结

好了,NioEventLoop的原理以及它的 创建 与 启动执行 流程到这里就分析完毕了。启动流程主要流程如下:

  • 将待执行的任务添加到任务队列中
  • 将当前线程绑定到EventLoop上
  • 轮循I/O事件,在轮循selector过程中,会对JDK的空轮循Bug做一个处理。
  • 处理I/O事件。
  • 运行Task任务。将定时任务聚合到普通任务队列中,然后在依次执行队列中的任务。

问题:

  • 默认情况下,netty服务端启动多少个线程?何时启动?
  • netty是如何解决空轮训Bug的?
  • netty是如何保证串行无锁化的?

转载于:https://my.oschina.net/u/3837147/blog/2413894

Netty NioEventLoop 启动过程源码分析相关推荐

  1. Android系统默认Home应用程序(Launcher)的启动过程源码分析

    在前面一篇文章中,我们分析了Android系统在启动时安装应用程序的过程,这些应用程序安装好之后,还须要有一个Home应用程序来负责把它们在桌面上展示出来,在Android系统中,这个默认的Home应 ...

  2. Activity启动流程源码分析-浅析生命周期函数

    源码分析 接着上一篇 Activity启动流程源码分析-setContentView源码阅读 的讲解,本节介绍一下Activity的生命周期函数何时被调用 要看Activity的生命周期函数何时被调用 ...

  3. SpringBoot2 | SpringBoot启动流程源码分析(一)

    首页 博客 专栏·视频 下载 论坛 问答 代码 直播 能力认证 高校 会员中心 收藏 动态 消息 创作中心 SpringBoot2 | SpringBoot启动流程源码分析(一) 置顶 张书康 201 ...

  4. Activity启动流程源码分析(基于Android N)

    Activity启动流程源码分析 一个Activity启动分为两种启动方式,一种是从Launcher界面上的图标点击启动,另一种是从一个Activity中设置按钮点击启动另外一个Activity.这里 ...

  5. NioEventLoop启动流程源码解析

    NioEventLoop的启动时机是在服务端的NioServerSocketChannel中的ServerSocketChannel初始化完成,且注册在NioEventLoop后执行的, 下一步就是去 ...

  6. Activity启动过程源码分析

    老罗的Android系统源码分析讲的很不错,网上有很不同层面多源码分析.了解细节,还是自己看源码最直接.个人并没有透彻的研究过Android系统,这一系列的博客就当是读Android源码笔记了.有不对 ...

  7. DataNode启动流程源码分析

    我们都知道在Hadoop hdfs文件系统中,Datanode是负责hdfs文件对应的数据块存储管理的组件,其会在启动时向NameNode汇报其上拥有的数据块,以及周期性心跳并接收来自NameNode ...

  8. Tomcat启动过程源码分析六

    前言 上一篇文章中我们讨论了Catalina类中start方法中一部分,今天这篇文章我们把Catalina类的start方法剩余部分讲解完毕,在讲解代码之前我们先看之前的一篇关于ShutdownHoo ...

  9. Spring Boot启动过程源码分析--转

    https://blog.csdn.net/dm_vincent/article/details/76735888 关于Spring Boot,已经有很多介绍其如何使用的文章了,本文从源代码(基于Sp ...

  10. 【Spring】SpringIOC容器启动过程源码分析 以及 循环依赖问题

    1.Spring是什么 Spring是一款轻量级的开发框架 . 简而言之 Spring提高了开发效率 两个核心 IOC 和 AOP 1.1 IOC ( Inversion of Control ) 是 ...

最新文章

  1. CNN回应中方谴责 否认冒犯中国人
  2. 温故知新 javascript 正则表达式
  3. 81.游戏项目-物体任意角度飞行和停止
  4. 给你的主机防火墙添加l7-filter
  5. 最简单的Asp.Net 2.0 TreeView的Checkbox级联操作
  6. Android仿人人客户端(v5.7.1)——Auth授权认证(整理流程,重构代码)
  7. 牛客 黑龙江大学程序设计竞赛重现 19-4-25 D
  8. eplan连接定义点不显示_CAD和EPLAN!电气制图你会选择哪个?
  9. Python 高斯列主元消去法求增广矩阵/方程组的解 Numpy模块
  10. 图片转icon图标并在项目中引用
  11. hmmlearn源代码
  12. 洪水攻击程序c语言,洪水攻击原理及代码实现全攻略(附源代码)
  13. [javascript]替换所有带/的字符串
  14. 【手机投影】安卓手机投影到WIN10
  15. Java HashMap的put方法
  16. 针对大众点评网上商铺评论字体替换反爬的反反爬
  17. 奥斯卡 | hulu拿到小金人!迪士尼共斩获六项
  18. Python的函数返回值和参数
  19. [源码分享] HIVE表数据量统计邮件
  20. 英语四级+六级词汇大全(全部带“音标”)

热门文章

  1. 1-5Badboy添加检查点和参数化
  2. caffe数据集——LMDB
  3. 九度OJ 1068:球的半径和体积 (基础题)
  4. CakePHP 2.x CookBook 中文版 第七章 模型 之 数据校验
  5. china-pub春季教材展,给力优惠,买二赠一
  6. R中的 url编码 和 解码
  7. hdu-5754 Life Winner Bo(博弈)
  8. 簡單的爬蟲 二 ಥ_ಥ 爬一爬 一個博客的每篇文件的標題
  9. Web分页打印 细线表格+分页打印之终极攻略(转载)
  10. leetcode(90)子集 2