Netty服务器启动源码剖析
Netty服务器启动源码剖析
文章目录
- Netty服务器启动源码剖析
- 1、Netty服务器启动源码剖析
- 1.1、执行new NioEventLoopGroup()时发生了什么
- 1.1.1、NioEventLoopGroup 成分
- 1.1.2、追踪 new NioEventLoopGroup()
- 1.1.3、总结
- 1.2、引导类ServerBootstrap的创建与配置
- 1.2.1、ServerBootstrap 的成分
- 1.2.2、ServerBootstrap 的配置
- 1.2.3、总结
- 1.3、执行ServerBootstrap.bind(PORT)时发生了什么
- 1.3.1、doBind 源码
- 1.3.2、run 死循环
- 1.3.3、总结
1、Netty服务器启动源码剖析
1.1、执行new NioEventLoopGroup()时发生了什么
本次分析创建workerGroup的过程(创建 bossGroup 的过程同理):
/** * EventLoopGroup 是一个线程组,其中的每一个线程都在循环执行着三件事情: * select:轮询注册在其中的 Selector 上的 Channel 的 IO 事件 * processSelectedKeys:在对应的 Channel 上处理 IO 事件 * runAllTasks:再去以此循环处理任务队列中的其他任务 */ EventLoopGroup workGroup = new NioEventLoopGroup();
1.1.1、NioEventLoopGroup 成分
Debug查看 workerGroup 的“成分”,可以看到它包含 8(cpu核数*2) 个 NioEventLoop,每个 NioEventLoop 里面有选择器、任务队列、执行器等等:
NioEventLoop继承关系图:
1.1.2、追踪 new NioEventLoopGroup()
追踪 new NioEventLoopGroup() 的底层调用:
(1)new NioEventLoopGroup()
红色框圈住的构造方法的源码为:
/*** Create a new instance.** @param nThreads the number of threads that will be used by this instance.* @param executor the Executor to use, or {@code null} if the default should be used.* @param chooserFactory the {@link EventExecutorChooserFactory} to use.* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call*/protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {if (nThreads <= 0) {throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));}// 这里的 ThreadPerTaskExecutor 实例是下文用于创建 EventExecutor 实例的参数if (executor == null) {executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());// ThreadPerTaskExecutor 的源代码如下,它的功能是从线程工厂中获取线程来执行 command//public final class ThreadPerTaskExecutor implements Executor {// private final ThreadFactory threadFactory;//// public ThreadPerTaskExecutor(ThreadFactory threadFactory) {// if (threadFactory == null) {// throw new NullPointerException("threadFactory");// }// this.threadFactory = threadFactory;// }//// @Override// public void execute(Runnable command) {// threadFactory.newThread(command).start();// }//}}// 这里定义了一个容量为 nThreads 的 EventExecutor 的数组children = new EventExecutor[nThreads];for (int i = 0; i < nThreads; i ++) {boolean success = false;try {// 往 EventExecutor 数组中添加元素children[i] = newChild(executor, args);success = true;} catch (Exception e) {// TODO: Think about if this is a good exception typethrow new IllegalStateException("failed to create a child event loop", e);} finally {if (!success) {// 添加元素失败,则 shutdown 每一个 EventExecutorfor (int j = 0; j < i; j ++) {children[j].shutdownGracefully();}for (int j = 0; j < i; j ++) {EventExecutor e = children[j];try {while (!e.isTerminated()) {e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);}} catch (InterruptedException interrupted) {// Let the caller handle the interruption.Thread.currentThread().interrupt();break;}}}}}// chooser 的作用是为了实现 next()方法,即从 group 中挑选一个 NioEventLoop 来处理连接上 IO 事件的方法chooser = chooserFactory.newChooser(children);final FutureListener<Object> terminationListener = new FutureListener<Object>() {// EventExecutor 的终止事件回调方法@Overridepublic void operationComplete(Future<Object> future) throws Exception {if (terminatedChildren.incrementAndGet() == children.length) {// 通过本类中定义的 Promise 属性的.setSuccess()方法设置结果,所有的监听者可以拿到该结果terminationFuture.setSuccess(null);}}};for (EventExecutor e: children) {// 为每一个 EventExecutor 添加终止事件监听器e.terminationFuture().addListener(terminationListener);}Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);Collections.addAll(childrenSet, children);readonlyChildren = Collections.unmodifiableSet(childrenSet);}
(2):newChild(executor, args) 方法,主要关注 NioEventLoop 中的选择器、任务队列、执行器等成分是从哪来的。
这里的 newChild() 方法包含了构建每一个 NioEventLoop 的细节,可以看到,newChild()调用了 NioEventLoop 的构造函数来构建每一个 NioEventLoop 实例。
执行器(executor)
调用 NioEventLoop 的构造函数的时候,传入的参数 parent 为上一层调用者,executor 为 ThreadPerTaskExecutor 的实例。上文的代码注释已经讲明了其来源和功能,如下:
// 这里的 ThreadPerTaskExecutor 实例是下文用于创建 EventExecutor 实例的参数 if (executor == null) {executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());// ThreadPerTaskExecutor 的源代码如下,它的功能是从线程工厂中获取线程来执行 command//public final class ThreadPerTaskExecutor implements Executor {// private final ThreadFactory threadFactory;//// public ThreadPerTaskExecutor(ThreadFactory threadFactory) {// if (threadFactory == null) {// throw new NullPointerException("threadFactory");// }// this.threadFactory = threadFactory;// }//// @Override// public void execute(Runnable command) {// threadFactory.newThread(command).start();// }//} }
选择器(selector)
NioEventLoop 的构造方法中有一个 openSelector(),它完成了选择器(多路复用器)的初始化。其中 provider(源码追踪图1的step-3)、selectStrategy(源码追踪图1的step-4) 由上一层传入。
openSelector() 源码分析
private SelectorTuple openSelector() {final Selector unwrappedSelector;try {// 通过往下追踪发现 provider.openSelector()最终调用了 WindowsSelectorImpl 类的构造方法构造出一个 Selector,因此 unwrappedSelector 是 WindowsSelectorImpl 的实例unwrappedSelector = provider.openSelector();//public class WindowsSelectorProvider extends SelectorProviderImpl {// public WindowsSelectorProvider() {// }//// public AbstractSelector openSelector() throws IOException {// return new WindowsSelectorImpl(this);// }//}} catch (IOException e) {throw new ChannelException("failed to open a new selector", e);}// Netty 对 NIO 的 Selector 的 selectedKeys 进行了优化(默认设置),用户可以通过 io.netty.noKeySetOptimization 开关决定是否启用该优化项。// 常量 DISABLE_KEY_SET_OPTIMIZATION = SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);if (DISABLE_KEY_SET_OPTIMIZATION) {// 若没有开启 selectedKeys 优化,直接返回return new SelectorTuple(unwrappedSelector);// SelectorTuple(Selector unwrappedSelector) {// this.unwrappedSelector = unwrappedSelector;// this.selector = unwrappedSelector;// }}// 若开启 selectedKeys 优化,需要通过反射的方式从 Selector 实例中获取 selectedKeys 和 publicSelectedKeys,将上述两个成员变量置为可写,然后通过反射的方式使用 Netty 构造的 selectedKeys 包装类selectedKeySet 将原 JDK 的 selectedKeys 替换掉。Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {@Overridepublic Object run() {try {return Class.forName("sun.nio.ch.SelectorImpl",false,PlatformDependent.getSystemClassLoader());} catch (Throwable cause) {return cause;}}});if (!(maybeSelectorImplClass instanceof Class) ||// ensure the current selector implementation is what we can instrument.!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {if (maybeSelectorImplClass instanceof Throwable) {Throwable t = (Throwable) maybeSelectorImplClass;logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);}return new SelectorTuple(unwrappedSelector);}final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {@Overridepublic Object run() {try {Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {// Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.// This allows us to also do this in Java9+ without any extra flags.long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);long publicSelectedKeysFieldOffset =PlatformDependent.objectFieldOffset(publicSelectedKeysField);if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {PlatformDependent.putObject(unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);PlatformDependent.putObject(unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);return null;}// We could not retrieve the offset, lets try reflection as last-resort.}Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);if (cause != null) {return cause;}cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);if (cause != null) {return cause;}selectedKeysField.set(unwrappedSelector, selectedKeySet);publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);return null;} catch (NoSuchFieldException e) {return e;} catch (IllegalAccessException e) {return e;}}});if (maybeException instanceof Exception) {selectedKeys = null;Exception e = (Exception) maybeException;logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);return new SelectorTuple(unwrappedSelector);}selectedKeys = selectedKeySet;logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);return new SelectorTuple(unwrappedSelector,new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet)); }
任务队列(taskQueue)
在 NioEventLoop 的构造器中,通过 rejectedExecutionHandler、queueFactory 构造任务队列,newTaskQueue()根据参数 queueFactory 产生 Queue的实例。其中 rejectedExecutionHandler (源码追踪图1的step-5)由上一层传入,queueFactory可以自定义传入,否则为空。
追踪 super 方法
- 看到 newTaskQueue()根据参数 queueFactory 产生的 Queue实例最终被赋值给了 SingleThreadEventExecutor 的 taskQueue 属性,taskQueue 是 SingleThreadEventExecutor 中的任务队列,而 NioEventLoop 又继承于 SingleThreadEventExecutor,因此 NioEventLoop 也就具有这个任务队列了。
- 同理,NioEventLoop 中的定时任务队列 scheduledTaskQueue 也是这么得到的:AbstractScheduledEventExecutor 包含 scheduledTaskQueue 属性,NioEventLoop 又继承于 AbstractScheduledEventExecutor,构造 NioEventLoop 的时候初始化这个 scheduledTaskQueue,因此 NioEventLoop 就有了定时任务队列。
1.1.3、总结
(1)NioEventLoopGroup 的无参数构造函数会调用 NioEventLoopGroup 的有参数构造函数,最终把下面的参数传递给父类 MultithreadEventLoopGroup 的有参数构造函数。
nThreads=cpu核数*2 executor=null chooserFactory=DefaultEventExecutorChooserFactory.INSTANCE selectorProvider=SelectorProvider.provider() selectStrategyFactory=DefaultSelectStrategyFactory.INSTANCE rejectedExecutionHandler=RejectedExecutionHandlers.reject()
(2)父类 MultithreadEventLoopGroup 的有参数构造函数创建一个 NioEventLoop 的容器 children = new EventExecutor[nThreads],并构建出 若干个 NioEventLoop 的实例放入其中。
(3)构建每一个 NioEventLoop 调用的是 children[i] = newChild(executor, args)。
(4)newChild()方法最终调用了 NioEventLoop 的构造函数,初始化其中的选择器、任务队列、执行器等成分。
(5)本节只详述了 NioEventLoop 中选择器、任务队列、执行器三个成分的用途和由来,对于其他成分,可按照本节的代码追踪思路继续探究。
1.2、引导类ServerBootstrap的创建与配置
本节一起看下服务端启动类 ServerBootstrap 的创建与配置代码背后的逻辑。
ServerBootstrap serverBootstrap = new ServerBootstrap(); // 设置线程组 serverBootstrap.group(bossGroup, workerGroup) // 说明服务器端通道的实现类(便于 Netty 做反射处理) .channel(NioServerSocketChannel.class) // 临时存放已完成三次握手的请求的队列的最大长度。 .option(ChannelOption.SO_BACKLOG, 100) // 对服务端的 NioServerSocketChannel 添加 Handler // LoggingHandler 是 netty 内置的一种 ChannelDuplexHandler,既可以处理出站事件,又可以处理入站事件,即 LoggingHandler 既记录出站日志又记录入站日志。 .handler(new LoggingHandler(LogLevel.INFO)) // 对服务端接收到的、与客户端之间建立的 SocketChannel 添加 Handler .childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();if (sslCtx != null) {// sslCtx.newHandler(ch.alloc())对传输的内容做安全加密处理p.addLast(sslCtx.newHandler(ch.alloc()));}// 如果需要的话,可以用 LoggingHandler 记录与客户端之间的通信日志// p.addLast(new LoggingHandler(LogLevel.INFO));// 业务 serverHandlerp.addLast(serverHandler);} });
ServerBootstrap 提供了一系列的链式配置方法,具体而言就是 ServerBootstrap 对象的一个配置方法(比如.group())处理完配置参数之后,会将当前 ServerBootstrap 对象返回,这样就能紧随其后继续调用该对象的其他配置方法(比如.channel())。这是面向对象语言中常见的一种编程模式。
1.2.1、ServerBootstrap 的成分
此时 ServerBootstrap 刚刚被创建,且未进行设置。它包含一个 ServerBootstrapConfig 对象,而这个对象又引用了 ServerBootstrap 对象,因此两个是互相引用、互相包含的关系。此外还包含了 group、handler、childGroup、childHandler 等成分,目前这些成分都为 null,后面进行的各种设置就是为这些成分赋值。
ServerBootstrap 和 Bootstrap 一样,都继承于抽象类 AbstractBootstrap。因此两者具备很多相同的属性和 API,例如 group、channelFactory、localAddress、options、attrs、handler、channel()、channelFactory()、register()、bind()等等。
1.2.2、ServerBootstrap 的配置
.group(bossGroup, workerGroup):作用是把 bossGroup 和 workerGroup 两个参数赋值给 ServerBootstrap 的成员变量 group(从父类 AbstractBootstrap 继承而来)和 childGroup。
.channel(NioServerSocketChannel.class):作用是通过反射机制给当前 ServerBootstrap 中的 channelFactory 属性(从父类 AbstractBootstrap 继承而来)赋值。
- 服务端的 NioServerSocketChannel 实例就是通过这个 channelFactory 创建的,不过现在还没有开始创建,要等到后面调用.bind()的时候才会创建。
.option(ChannelOption.XXX, YYY):作用是将可选项放入一个 options 集合中(给 NioServerSocketChannel 使用)。
.childOption(ChannelOption.XXX, YYY):作用是将可选项放入一个 childOptions 集合中(给 NioSocketChannel 使用)。
.handler(ChannelHandler handler):作用是将某个 Handler 赋值给 ServerBootstrap 实例的 handler 属性(从父类 AbstractBootstrap 继承而来)。
- 这个 handler 最终在.bind() 的时候,在 ServerBootstrap.init() 方法中被放入 NioServerSocketChannel 实例的 pipeline 中。
.childHandler(ChannelHandler handler):作用是为接收客户端连接请求产生的 NioSocketChannel 实例的 pipeline 添加 Handler。
ChannelInitializer 本质是 ChannelHandler,通过重写 initChannel(SocketChannel ch) 方法,为接收客户端连接请求产生的 NioSocketChannel 实例的 pipeline 添加 Handler。
.childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();if (sslCtx != null) {// sslCtx.newHandler(ch.alloc())对传输的内容做安全加密处理p.addLast(sslCtx.newHandler(ch.alloc()));}// 如果需要的话,可以用 LoggingHandler 记录与客户端之间的通信日志// p.addLast(new LoggingHandler(LogLevel.INFO));// 业务 serverHandlerp.addLast(serverHandler);}})
p.addLast(serverHandler):调用了 Pipeline 的 addLast 方法向 Pipeline 中的双向链表添加 ChannelHandlerContext 元素:
1.2.3、总结
- (1).group(bossGroup, workerGroup)把 bossGroup 和 workerGroup 两个参数赋值给 ServerBootstrap 的成员变量 group(从父类 AbstractBootstrap 继承而来)和 childGroup。
- (2).channel(NioServerSocketChannel.class)通过反射机制给当前 ServerBootstrap 中的 channelFactory 属性(从父类 AbstractBootstrap 继承而来)赋值。在调用.bind()的时候 channelFactory 会创建 NioServerSocketChannel 的实例。
- (3).option(ChannelOption.XXX, YYY) 将可选项放入一个 options 集合中(给 NioServerSocketChannel 使用)。
- (4).childOption(ChannelOption.XXX, YYY) 将可选项放入一个 childOptions 集合中(给 NioSocketChannel 使用)。
- (5).handler(ChannelHandler handler) 将某个 Handler 赋值给 ServerBootstrap 实例的 handler 属性(从父类 AbstractBootstrap 继承而来)。这个 handler 最终在.bind() 的时候,在 ServerBootstrap.init() 方法中被放入 NioServerSocketChannel 实例的 pipeline 中。
- (6).childHandler(ChannelHandler handler) 参数通常使用ChannelInitializer,其本质是 ChannelHandler,通过重写 initChannel(SocketChannel ch) 方法,为接收客户端连接请求产生的 NioSocketChannel 实例的 pipeline 添加 Handler。
1.3、执行ServerBootstrap.bind(PORT)时发生了什么
本节介绍的 bind(PORT) ,实质是调用 AbstractBootstrap 的 doBind(final SocketAddress localAddress) 方法。
1.3.1、doBind 源码
private ChannelFuture doBind(final SocketAddress localAddress) 源码:
PS: 主要关注 initAndRegister() 和 doBind0(regFuture, channel, localAddress, promise)。
private ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regFuture = initAndRegister();// (1) 初始化 NioServerSocketChannel 的实例,并且将其注册到 bossGroup 中的 EvenLoop 中的 Selector 中final Channel channel = regFuture.channel();if (regFuture.cause() != null) {return regFuture;}if (regFuture.isDone()) {// 若异步过程 initAndRegister()已经执行完毕,则进入该分支// At this point we know that the registration was complete and successful.ChannelPromise promise = channel.newPromise();doBind0(regFuture, channel, localAddress, promise);//(2) 调用底层 JDK 接口完成端口绑定和监听return promise;} else {// 若异步过程 initAndRegister()还未执行完毕,则进入该分支// Registration future is almost always fulfilled already, but just in case it's not.final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel);// 监听 regFuture 的完成事件,完成之后再调用 doBind0(regFuture, channel, localAddress, promise);regFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause();if (cause != null) {// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an// IllegalStateException once we try to access the EventLoop of the Channel.promise.setFailure(cause);} else {// Registration was successful, so set the correct executor to use.// See https://github.com/netty/netty/issues/2586promise.registered();doBind0(regFuture, channel, localAddress, promise);}}});return promise;} }
(1)initAndRegister() 源码:
PS: 主要关注 newChannel() 、init(channel)、register(channel) 方法。
final ChannelFuture initAndRegister() {Channel channel = null;try {channel = channelFactory.newChannel();//(1-1) 创建 NioServerSocketChannel 实例init(channel);//(1-2) 对该 NioServerSocketChannel 进行初始化} catch (Throwable t) {if (channel != null) {// channel can be null if newChannel crashed (eg SocketException("too many open files"))channel.unsafe().closeForcibly();// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);}// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);}ChannelFuture regFuture = config().group().register(channel);//(1-3) 最终把 NioServerSocketChannel 实例注册到 bossGroup 中 EventLoop 中的 Selector 上。if (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}// If we are here and the promise is not failed, it's one of the following cases:// 1) If we attempted registration from the event loop, the registration has been completed at this point.// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.// 2) If we attempted registration from the other thread, the registration request has been successfully// added to the event loop's task queue for later execution.// i.e. It's safe to attempt bind() or connect() now:// because bind() or connect() will be executed *after* the scheduled registration task is executed// because register(), bind(), and connect() are all bound to the same thread.return regFuture; }
(1-1) newChannel() 源码追踪:
- 前面介绍 ServerBootstrap 的配置.channel(NioServerSocketChannel.class) 已经说明了 channelFactory 的作用。
- newChannel() 实质是 ReflectiveChannelFactory 通过反射创建 NioServerSocketChannel 实例。
- 在 NioServerSocketChannel 的空构造方法往下追踪源码,会发现传递了 SelectionKey.OP_ACCEPT 参数,并且赋予给 readInterestOp 属性,作用是标识该 Channel 感兴趣的事件。
- 继续追踪下去会在 AbstractChannel(Channel parent) 中的 newChannelPipeline() -> DefaultChannelPipeline(Channel channel) 创建了 ChannelPipeline。里面的 head、tail 实质是 ChannelHandlerContext 类型的双向链表。
(1-2)init(channel) 源码:
PS: init(channel) 方法在 AbstractBootstrap 中是抽象方法,在 ServerBootstrap 中进行了实现。注意这里添加了 ServerBootstrapAcceptor ,而且这是一个 ChannelInboundHandler。
/** * ServerBootstrap.init()方法,它在 channel = channelFactory.newChannel() 之后被执行,用于初始化这个 channel */ @Override void init(Channel channel) throws Exception {final Map<ChannelOption<?>, Object> options = options0();synchronized (options) {setChannelOptions(channel, options, logger);// 通过.option()设置的 TCP 参数就在这里应用//static void setChannelOptions(// Channel channel, Map<ChannelOption<?>, Object> options, InternalLogger logger) {// for (Map.Entry<ChannelOption<?>, Object> e: options.entrySet()) {// setChannelOption(channel, e.getKey(), e.getValue(), logger);// }//}}final Map<AttributeKey<?>, Object> attrs = attrs0();synchronized (attrs) {// 通过.attr()设置的附加属性就在这里应用for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked")AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();channel.attr(key).set(e.getValue());}}ChannelPipeline p = channel.pipeline();final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));}synchronized (childAttrs) {currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));}p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) throws Exception {// 获取 NioServerSocketChannel 实例的 pipelinefinal ChannelPipeline pipeline = ch.pipeline();// 这里的 config.handler()就是前面通过 .handler(ChannelHandler handler) 设置的 handlerChannelHandler handler = config.handler();if (handler != null) {// 将这个 handler 添加到 NioServerSocketChannel 实例的 pipeline 中pipeline.addLast(handler);}// 异步执行向 pipeline 添加 ServerBootstrapAcceptor 的步骤ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {// ServerBootstrapAcceptor 是一个 ChannelInboundHandlerpipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}}); }
(1-3)register(channel) 源码追踪:
在上图的最后一个红色框圈住的代码处,NioServerSocketChannel 的实例被注册到 bossGroup 中 EventLoop 中的 Selector 上(ops: 0 在这里猜测是ready的意思,因为后面会在 AbstractNioChannel.doBeginRead() 方法中真正设置key感兴趣的ops)
doBeginRead() 方法会在channel首次注册激活或者每次readComplete之后发生(如果开启了isAutoRead,默认是开启的)。需要注意的是,即使读事件发生的时候,readyOps是0,同样可以进行read。
(2)doBind0(regFuture, channel, localAddress, promise) 源码追踪:
- 在 NioServerSocketChannel 中的 javaChannel().bind(localAddress, config.getBacklog()) 调用底层 JDK 接口完成端口绑定和监听。
- 追踪下去会发现最终调用了一个 Native 方法把.bind(PORT)最终托管给了 JVM,然后 JVM 进行系统调用。
1.3.2、run 死循环
在调用register0、doBind0等方法的时候,会委托给 EventLoop 去执行,如果是当前 EventLoop,直接执行 register0 方法,否则会交给 EventLoop.execute(Runnable task)(一般情况下都是会这样异步执行)。
// SingleThreadEventExecutor.execute(Runnable task): @Override public void execute(Runnable task) {if (task == null) {throw new NullPointerException("task");}boolean inEventLoop = inEventLoop();// 添加任务到队列addTask(task);if (!inEventLoop) {// 如果当前线程不属于该 EventLoop,EventLoop 需要启动新线程。最终会执行 SingleThreadEventExecutor.this.run() 方法,进入到了 NioEventLoop 中 run 方法的死循环里。startThread();if (isShutdown()) {boolean reject = false;try {if (removeTask(task)) {reject = true;}} catch (UnsupportedOperationException e) {// The task queue does not support removal so the best thing we can do is to just move on and// hope we will be able to pick-up the task before its completely terminated.// In worst case we will log on termination.}if (reject) {reject();}}}if (!addTaskWakesUp && wakesUpForTask(task)) {wakeup(inEventLoop);} }
异步添加任务到队列之后,会在 run 循环里面,通过 runAllTasks() 方法执行队列里面的任务。即register0、doBind0等方法会在这时候被处理。
NioEventLoop 中的死循环,不断执行以下三个过程:
- select:轮训注册在其中的 Selector 上的 Channel 的 IO 事件。
- processSelectedKeys:在对应的 Channel 上处理 IO 事件。
- runAllTasks:再去以此循环处理任务队列中的其他任务。
// NioEventLoop.run(): @Override protected void run() {for (;;) {try {try {switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.BUSY_WAIT:// fall-through to SELECT since the busy-wait is not supported with NIOcase SelectStrategy.SELECT:select(wakenUp.getAndSet(false));// 'wakenUp.compareAndSet(false, true)' is always evaluated// before calling 'selector.wakeup()' to reduce the wake-up// overhead. (Selector.wakeup() is an expensive operation.)//// However, there is a race condition in this approach.// The race condition is triggered when 'wakenUp' is set to// true too early.//// 'wakenUp' is set to true too early if:// 1) Selector is waken up between 'wakenUp.set(false)' and// 'selector.select(...)'. (BAD)// 2) Selector is waken up between 'selector.select(...)' and// 'if (wakenUp.get()) { ... }'. (OK)//// In the first case, 'wakenUp' is set to true and the// following 'selector.select(...)' will wake up immediately.// Until 'wakenUp' is set to false again in the next round,// 'wakenUp.compareAndSet(false, true)' will fail, and therefore// any attempt to wake up the Selector will fail, too, causing// the following 'selector.select(...)' call to block// unnecessarily.//// To fix this problem, we wake up the selector again if wakenUp// is true immediately after selector.select(...).// It is inefficient in that it wakes up the selector for both// the first case (BAD - wake-up required) and the second case// (OK - no wake-up required).if (wakenUp.get()) {selector.wakeup();}// fall throughdefault:}} catch (IOException e) {// If we receive an IOException here its because the Selector is messed up. Let's rebuild// the selector and retry. https://github.com/netty/netty/issues/8566rebuildSelector0();handleLoopException(e);continue;}cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;if (ioRatio == 100) {try {processSelectedKeys();} finally {// Ensure we always run tasks.runAllTasks();}} else {final long ioStartTime = System.nanoTime();try {processSelectedKeys();} finally {// Ensure we always run tasks.final long ioTime = System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}} catch (Throwable t) {handleLoopException(t);}// Always handle shutdown even if the loop processing threw an exception.try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {handleLoopException(t);}} }
1.3.3、总结
- (1)首先调用 AbstractBootstrap 中的 initAndRegister() 方法完成 NioServerSocketChannel 实例的初始化和注册。
- (2)然后调用 NioServerSocketChannel 实例的 doBind() 方法,最终调用 sun.nio.ch.Net 中的 bind()和 listen() 完成端口绑定和客户端连接监听。
- (3)在真正 register0(注册)和 bind0 (绑定)之前,会委托当前 eventLoop 的 executor 去执行,实质上是在死循环run方法中通过 runAllTasks() 方法执行 eventLoop 的队列里面的任务。
Netty服务器启动源码剖析相关推荐
- 【Netty之旅四】你一定看得懂的Netty客户端启动源码分析!
前言 前面小飞已经讲解了NIO和Netty服务端启动,这一讲是Client的启动过程. 源码系列的文章依旧还是遵循大白话+画图的风格来讲解,本文Netty源码及以后的文章版本都基于:4.1.22.Fi ...
- 源码剖析 Netty 服务启动 NIO
如果这个文章看不懂的话 , 建议反复阅读 Netty 与 Reactor 开篇立意.引用网友好的建议.看源码要针对性的看,最佳实践就是,带着明确的目的去看源码.抓主要问题,放弃小问题.主要的逻辑理解了 ...
- 老李推荐: 第8章4节《MonkeyRunner源码剖析》MonkeyRunner启动运行过程-启动AndroidDebugBridge 1...
老李推荐: 第8章4节<MonkeyRunner源码剖析>MonkeyRunner启动运行过程-启动AndroidDebugBridge 上一节我们看到在启动AndroidDebugBri ...
- 老李推荐:第8章2节《MonkeyRunner源码剖析》MonkeyRunner启动运行过程-解析处理命令行参数...
老李推荐:第8章2节<MonkeyRunner源码剖析>MonkeyRunner启动运行过程-解析处理命令行参数 MonkeyRunnerStarter是MonkeyRunner启动时的入 ...
- React Native 启动流程 源码剖析
开始之前 开始分析之前,新建一个名为 RnDemo 的空项目,RN 版本选择 0.58.1,查看项目自动为我们生成 MainActivity.java 和 MainApplication.java ...
- 4.2.10 Kafka源码剖析, 阅读环境搭建, broker启动流程, topic创建流程, Producer生产者流程, Consumer消费者流程,
目录 4.1 Kafka源码剖析之源码阅读环境搭建 4.1.1 安装配置Gradle 4.1.2 Scala的安装和配置 4.1.3 Idea配置 4.1.4 源码操作 4.2 Kafka源码剖析之B ...
- 老李推荐:第5章5节《MonkeyRunner源码剖析》Monkey原理分析-启动运行: 获取系统服务引用 1...
老李推荐:第5章5节<MonkeyRunner源码剖析>Monkey原理分析-启动运行: 获取系统服务引用 上一节我们描述了monkey的命令处理入口函数run是如何调用optionPro ...
- Mongoose源码剖析:外篇之web服务器
引言 在深入Mongoose源码剖析之前,我们应该清楚web服务器是什么?它提供什么服务?怎样提供服务?使用什么协议?客户端如何唯一标识web服务器的资源?下面我们抛开Mongoose,来介绍一个we ...
- Spring源码剖析——Bean的配置与启动
IOC介绍 相信大多数人在学习Spring时 IOC 和 Bean 算得上是最常听到的两个名词,IOC在学习Spring当中出现频率如此之高必然有其原因.如果我们做一个比喻的话,把Bean说成Sp ...
最新文章
- 如何利用 C# 爬取带 Token 验证的网站数据?
- 非常量引用的初始值必须是左值_C++核心编程--引用
- 中国互联网的抑郁:抄与被抄都很痛
- MySQL数据库同步小工具(Java实现)
- yii 加载php文件,Yii2框架加载css和js文件的方法分析
- 引导类加载器 Bootstrap ClassLoader
- 揭开不一样的世界,这5部纪录片绝对不能错过!
- 工作228:小程序学习2开始布局页面2
- 用SSE加速CPU蒙皮计算
- 九章基础算法02:栈和队列
- JavaScript基础学习(一)—JavaScript简介
- 三星智能电视将用户语音隐私泄露给第三方?
- 无聊时分析了下目前国内和国外汽车消费市场的区域性分布
- [Poi2000]公共串 hustoj2797
- 阿里巴巴矢量图标 iconfont 下载图标分辨率小一点、并占得内存小一点呢
- 深度神经网络主要模型,深度神经网络预测模型
- 逆向学习第二天如何手动脱UPX、Aspack壳
- 解决制作FAT32格式的重装U盘中文件过大问题
- Linux 日志切割神器 Logrotate 原理和配置详解(附多生产实例)
- 上海高二物理公式整理
热门文章
- 2022/3/8——手把手配置一台崭新的华为交换机配置堆叠(附使用console线配置交换机方法)
- [USACO4.4]追查坏牛奶Pollutant Control
- Twitter蓝伟纶:推特现在进中国也打不过微博微信了丨CES 中国之夜
- wap2app(五)-- 微信授权登录以及踩过的坑
- 全套恒压供水(3托3)图纸程序 功能: 三拖三(3台变频3台水泵),3台水泵循环软启
- Cesium结合Echarts的使用
- xilinx SDK MSS文件
- 微信小程序毕业设计开题报告家教信息管理系统|招聘求职兼职+后台管理系统|前后分离VUE.js
- 解读DO-254(机载电子硬件设计保证指南)
- EOF的意义及用法(while(scanf(%d,n) != EOF))