Netty服务器启动源码剖析

文章目录

  • Netty服务器启动源码剖析
    • 1、Netty服务器启动源码剖析
      • 1.1、执行new NioEventLoopGroup()时发生了什么
        • 1.1.1、NioEventLoopGroup 成分
        • 1.1.2、追踪 new NioEventLoopGroup()
        • 1.1.3、总结
      • 1.2、引导类ServerBootstrap的创建与配置
        • 1.2.1、ServerBootstrap 的成分
        • 1.2.2、ServerBootstrap 的配置
        • 1.2.3、总结
      • 1.3、执行ServerBootstrap.bind(PORT)时发生了什么
        • 1.3.1、doBind 源码
        • 1.3.2、run 死循环
        • 1.3.3、总结

1、Netty服务器启动源码剖析

1.1、执行new NioEventLoopGroup()时发生了什么

本次分析创建workerGroup的过程(创建 bossGroup 的过程同理):

/**
* EventLoopGroup 是一个线程组,其中的每一个线程都在循环执行着三件事情:
* select:轮询注册在其中的 Selector 上的 Channel 的 IO 事件
* processSelectedKeys:在对应的 Channel 上处理 IO 事件
* runAllTasks:再去以此循环处理任务队列中的其他任务
*/
EventLoopGroup workGroup = new NioEventLoopGroup();
1.1.1、NioEventLoopGroup 成分
  • Debug查看 workerGroup 的“成分”,可以看到它包含 8(cpu核数*2) 个 NioEventLoop,每个 NioEventLoop 里面有选择器、任务队列、执行器等等:

  • NioEventLoop继承关系图:

1.1.2、追踪 new NioEventLoopGroup()
  • 追踪 new NioEventLoopGroup() 的底层调用:

    • (1)new NioEventLoopGroup()

      • 红色框圈住的构造方法的源码为:

            /*** Create a new instance.** @param nThreads          the number of threads that will be used by this instance.* @param executor          the Executor to use, or {@code null} if the default should be used.* @param chooserFactory    the {@link EventExecutorChooserFactory} to use.* @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call*/protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {if (nThreads <= 0) {throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));}// 这里的 ThreadPerTaskExecutor 实例是下文用于创建 EventExecutor 实例的参数if (executor == null) {executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());// ThreadPerTaskExecutor 的源代码如下,它的功能是从线程工厂中获取线程来执行 command//public final class ThreadPerTaskExecutor implements Executor {//    private final ThreadFactory threadFactory;////    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {//        if (threadFactory == null) {//            throw new NullPointerException("threadFactory");//        }//        this.threadFactory = threadFactory;//    }////    @Override//    public void execute(Runnable command) {//        threadFactory.newThread(command).start();//    }//}}// 这里定义了一个容量为 nThreads 的 EventExecutor 的数组children = new EventExecutor[nThreads];for (int i = 0; i < nThreads; i ++) {boolean success = false;try {// 往 EventExecutor 数组中添加元素children[i] = newChild(executor, args);success = true;} catch (Exception e) {// TODO: Think about if this is a good exception typethrow new IllegalStateException("failed to create a child event loop", e);} finally {if (!success) {// 添加元素失败,则 shutdown 每一个 EventExecutorfor (int j = 0; j < i; j ++) {children[j].shutdownGracefully();}for (int j = 0; j < i; j ++) {EventExecutor e = children[j];try {while (!e.isTerminated()) {e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);}} catch (InterruptedException interrupted) {// Let the caller handle the interruption.Thread.currentThread().interrupt();break;}}}}}// chooser 的作用是为了实现 next()方法,即从 group 中挑选一个 NioEventLoop 来处理连接上 IO 事件的方法chooser = chooserFactory.newChooser(children);final FutureListener<Object> terminationListener = new FutureListener<Object>() {// EventExecutor 的终止事件回调方法@Overridepublic void operationComplete(Future<Object> future) throws Exception {if (terminatedChildren.incrementAndGet() == children.length) {// 通过本类中定义的 Promise 属性的.setSuccess()方法设置结果,所有的监听者可以拿到该结果terminationFuture.setSuccess(null);}}};for (EventExecutor e: children) {// 为每一个 EventExecutor 添加终止事件监听器e.terminationFuture().addListener(terminationListener);}Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);Collections.addAll(childrenSet, children);readonlyChildren = Collections.unmodifiableSet(childrenSet);}
    • (2):newChild(executor, args) 方法,主要关注 NioEventLoop 中的选择器、任务队列、执行器等成分是从哪来的。

      这里的 newChild() 方法包含了构建每一个 NioEventLoop 的细节,可以看到,newChild()调用了 NioEventLoop 的构造函数来构建每一个 NioEventLoop 实例。

      • 执行器(executor)

        • 调用 NioEventLoop 的构造函数的时候,传入的参数 parent 为上一层调用者,executor 为 ThreadPerTaskExecutor 的实例。上文的代码注释已经讲明了其来源和功能,如下:

          // 这里的 ThreadPerTaskExecutor 实例是下文用于创建 EventExecutor 实例的参数
          if (executor == null) {executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());// ThreadPerTaskExecutor 的源代码如下,它的功能是从线程工厂中获取线程来执行 command//public final class ThreadPerTaskExecutor implements Executor {//    private final ThreadFactory threadFactory;////    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {//        if (threadFactory == null) {//            throw new NullPointerException("threadFactory");//        }//        this.threadFactory = threadFactory;//    }////    @Override//    public void execute(Runnable command) {//        threadFactory.newThread(command).start();//    }//}
          }
          
      • 选择器(selector)

        • NioEventLoop 的构造方法中有一个 openSelector(),它完成了选择器(多路复用器)的初始化。其中 provider(源码追踪图1的step-3)、selectStrategy(源码追踪图1的step-4) 由上一层传入。

        • openSelector() 源码分析

          private SelectorTuple openSelector() {final Selector unwrappedSelector;try {// 通过往下追踪发现 provider.openSelector()最终调用了 WindowsSelectorImpl 类的构造方法构造出一个 Selector,因此 unwrappedSelector 是 WindowsSelectorImpl 的实例unwrappedSelector = provider.openSelector();//public class WindowsSelectorProvider extends SelectorProviderImpl {//    public WindowsSelectorProvider() {//    }////    public AbstractSelector openSelector() throws IOException {//        return new WindowsSelectorImpl(this);//    }//}} catch (IOException e) {throw new ChannelException("failed to open a new selector", e);}// Netty 对 NIO 的 Selector 的 selectedKeys 进行了优化(默认设置),用户可以通过 io.netty.noKeySetOptimization 开关决定是否启用该优化项。// 常量 DISABLE_KEY_SET_OPTIMIZATION = SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);if (DISABLE_KEY_SET_OPTIMIZATION) {// 若没有开启 selectedKeys 优化,直接返回return new SelectorTuple(unwrappedSelector);//    SelectorTuple(Selector unwrappedSelector) {//        this.unwrappedSelector = unwrappedSelector;//        this.selector = unwrappedSelector;//    }}// 若开启 selectedKeys 优化,需要通过反射的方式从 Selector 实例中获取 selectedKeys 和 publicSelectedKeys,将上述两个成员变量置为可写,然后通过反射的方式使用 Netty 构造的 selectedKeys 包装类selectedKeySet 将原 JDK 的 selectedKeys 替换掉。Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {@Overridepublic Object run() {try {return Class.forName("sun.nio.ch.SelectorImpl",false,PlatformDependent.getSystemClassLoader());} catch (Throwable cause) {return cause;}}});if (!(maybeSelectorImplClass instanceof Class) ||// ensure the current selector implementation is what we can instrument.!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {if (maybeSelectorImplClass instanceof Throwable) {Throwable t = (Throwable) maybeSelectorImplClass;logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);}return new SelectorTuple(unwrappedSelector);}final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {@Overridepublic Object run() {try {Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {// Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.// This allows us to also do this in Java9+ without any extra flags.long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);long publicSelectedKeysFieldOffset =PlatformDependent.objectFieldOffset(publicSelectedKeysField);if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {PlatformDependent.putObject(unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);PlatformDependent.putObject(unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);return null;}// We could not retrieve the offset, lets try reflection as last-resort.}Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);if (cause != null) {return cause;}cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);if (cause != null) {return cause;}selectedKeysField.set(unwrappedSelector, selectedKeySet);publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);return null;} catch (NoSuchFieldException e) {return e;} catch (IllegalAccessException e) {return e;}}});if (maybeException instanceof Exception) {selectedKeys = null;Exception e = (Exception) maybeException;logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);return new SelectorTuple(unwrappedSelector);}selectedKeys = selectedKeySet;logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);return new SelectorTuple(unwrappedSelector,new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
          }
          
      • 任务队列(taskQueue)

        • 在 NioEventLoop 的构造器中,通过 rejectedExecutionHandlerqueueFactory 构造任务队列,newTaskQueue()根据参数 queueFactory 产生 Queue的实例。其中 rejectedExecutionHandler (源码追踪图1的step-5)由上一层传入,queueFactory可以自定义传入,否则为空。

        • 追踪 super 方法

          • 看到 newTaskQueue()根据参数 queueFactory 产生的 Queue实例最终被赋值给了 SingleThreadEventExecutor 的 taskQueue 属性,taskQueue 是 SingleThreadEventExecutor 中的任务队列,而 NioEventLoop 又继承于 SingleThreadEventExecutor,因此 NioEventLoop 也就具有这个任务队列了。
          • 同理,NioEventLoop 中的定时任务队列 scheduledTaskQueue 也是这么得到的:AbstractScheduledEventExecutor 包含 scheduledTaskQueue 属性,NioEventLoop 又继承于 AbstractScheduledEventExecutor,构造 NioEventLoop 的时候初始化这个 scheduledTaskQueue,因此 NioEventLoop 就有了定时任务队列。
1.1.3、总结
  • (1)NioEventLoopGroup 的无参数构造函数会调用 NioEventLoopGroup 的有参数构造函数,最终把下面的参数传递给父类 MultithreadEventLoopGroup 的有参数构造函数。

    nThreads=cpu核数*2
    executor=null
    chooserFactory=DefaultEventExecutorChooserFactory.INSTANCE
    selectorProvider=SelectorProvider.provider()
    selectStrategyFactory=DefaultSelectStrategyFactory.INSTANCE
    rejectedExecutionHandler=RejectedExecutionHandlers.reject()
    
  • (2)父类 MultithreadEventLoopGroup 的有参数构造函数创建一个 NioEventLoop 的容器 children = new EventExecutor[nThreads],并构建出 若干个 NioEventLoop 的实例放入其中。

  • (3)构建每一个 NioEventLoop 调用的是 children[i] = newChild(executor, args)。

  • (4)newChild()方法最终调用了 NioEventLoop 的构造函数,初始化其中的选择器、任务队列、执行器等成分。

  • (5)本节只详述了 NioEventLoop 中选择器、任务队列、执行器三个成分的用途和由来,对于其他成分,可按照本节的代码追踪思路继续探究。

1.2、引导类ServerBootstrap的创建与配置

本节一起看下服务端启动类 ServerBootstrap 的创建与配置代码背后的逻辑。

ServerBootstrap serverBootstrap = new ServerBootstrap();
// 设置线程组
serverBootstrap.group(bossGroup, workerGroup)
// 说明服务器端通道的实现类(便于 Netty 做反射处理)
.channel(NioServerSocketChannel.class)
// 临时存放已完成三次握手的请求的队列的最大长度。
.option(ChannelOption.SO_BACKLOG, 100)
// 对服务端的 NioServerSocketChannel 添加 Handler
// LoggingHandler 是 netty 内置的一种 ChannelDuplexHandler,既可以处理出站事件,又可以处理入站事件,即 LoggingHandler 既记录出站日志又记录入站日志。
.handler(new LoggingHandler(LogLevel.INFO))
// 对服务端接收到的、与客户端之间建立的 SocketChannel 添加 Handler
.childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();if (sslCtx != null) {// sslCtx.newHandler(ch.alloc())对传输的内容做安全加密处理p.addLast(sslCtx.newHandler(ch.alloc()));}// 如果需要的话,可以用 LoggingHandler 记录与客户端之间的通信日志// p.addLast(new LoggingHandler(LogLevel.INFO));// 业务 serverHandlerp.addLast(serverHandler);}
});

ServerBootstrap 提供了一系列的链式配置方法,具体而言就是 ServerBootstrap 对象的一个配置方法(比如.group())处理完配置参数之后,会将当前 ServerBootstrap 对象返回,这样就能紧随其后继续调用该对象的其他配置方法(比如.channel())。这是面向对象语言中常见的一种编程模式。

1.2.1、ServerBootstrap 的成分

  • 此时 ServerBootstrap 刚刚被创建,且未进行设置。它包含一个 ServerBootstrapConfig 对象,而这个对象又引用了 ServerBootstrap 对象,因此两个是互相引用、互相包含的关系。此外还包含了 group、handler、childGroup、childHandler 等成分,目前这些成分都为 null,后面进行的各种设置就是为这些成分赋值。

  • ServerBootstrap 和 Bootstrap 一样,都继承于抽象类 AbstractBootstrap。因此两者具备很多相同的属性和 API,例如 group、channelFactory、localAddress、options、attrs、handler、channel()、channelFactory()、register()、bind()等等。

1.2.2、ServerBootstrap 的配置
  • .group(bossGroup, workerGroup):作用是把 bossGroup 和 workerGroup 两个参数赋值给 ServerBootstrap 的成员变量 group(从父类 AbstractBootstrap 继承而来)和 childGroup。

  • .channel(NioServerSocketChannel.class):作用是通过反射机制给当前 ServerBootstrap 中的 channelFactory 属性(从父类 AbstractBootstrap 继承而来)赋值。

    • 服务端的 NioServerSocketChannel 实例就是通过这个 channelFactory 创建的,不过现在还没有开始创建,要等到后面调用.bind()的时候才会创建。
  • .option(ChannelOption.XXX, YYY):作用是将可选项放入一个 options 集合中(给 NioServerSocketChannel 使用)。

  • .childOption(ChannelOption.XXX, YYY):作用是将可选项放入一个 childOptions 集合中(给 NioSocketChannel 使用)。

  • .handler(ChannelHandler handler):作用是将某个 Handler 赋值给 ServerBootstrap 实例的 handler 属性(从父类 AbstractBootstrap 继承而来)。

    • 这个 handler 最终在.bind() 的时候,在 ServerBootstrap.init() 方法中被放入 NioServerSocketChannel 实例的 pipeline 中。
  • .childHandler(ChannelHandler handler):作用是为接收客户端连接请求产生的 NioSocketChannel 实例的 pipeline 添加 Handler。

    • ChannelInitializer 本质是 ChannelHandler,通过重写 initChannel(SocketChannel ch) 方法,为接收客户端连接请求产生的 NioSocketChannel 实例的 pipeline 添加 Handler。

       .childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();if (sslCtx != null) {// sslCtx.newHandler(ch.alloc())对传输的内容做安全加密处理p.addLast(sslCtx.newHandler(ch.alloc()));}// 如果需要的话,可以用 LoggingHandler 记录与客户端之间的通信日志// p.addLast(new LoggingHandler(LogLevel.INFO));// 业务 serverHandlerp.addLast(serverHandler);}})
      
      • p.addLast(serverHandler):调用了 Pipeline 的 addLast 方法向 Pipeline 中的双向链表添加 ChannelHandlerContext 元素:

1.2.3、总结
  • (1).group(bossGroup, workerGroup)把 bossGroup 和 workerGroup 两个参数赋值给 ServerBootstrap 的成员变量 group(从父类 AbstractBootstrap 继承而来)和 childGroup。
  • (2).channel(NioServerSocketChannel.class)通过反射机制给当前 ServerBootstrap 中的 channelFactory 属性(从父类 AbstractBootstrap 继承而来)赋值。在调用.bind()的时候 channelFactory 会创建 NioServerSocketChannel 的实例。
  • (3).option(ChannelOption.XXX, YYY) 将可选项放入一个 options 集合中(给 NioServerSocketChannel 使用)。
  • (4).childOption(ChannelOption.XXX, YYY) 将可选项放入一个 childOptions 集合中(给 NioSocketChannel 使用)。
  • (5).handler(ChannelHandler handler) 将某个 Handler 赋值给 ServerBootstrap 实例的 handler 属性(从父类 AbstractBootstrap 继承而来)。这个 handler 最终在.bind() 的时候,在 ServerBootstrap.init() 方法中被放入 NioServerSocketChannel 实例的 pipeline 中。
  • (6).childHandler(ChannelHandler handler) 参数通常使用ChannelInitializer,其本质是 ChannelHandler,通过重写 initChannel(SocketChannel ch) 方法,为接收客户端连接请求产生的 NioSocketChannel 实例的 pipeline 添加 Handler。

1.3、执行ServerBootstrap.bind(PORT)时发生了什么

本节介绍的 bind(PORT) ,实质是调用 AbstractBootstrap 的 doBind(final SocketAddress localAddress) 方法。

1.3.1、doBind 源码
  • private ChannelFuture doBind(final SocketAddress localAddress) 源码:

    PS: 主要关注 initAndRegister() 和 doBind0(regFuture, channel, localAddress, promise)。

    private ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regFuture = initAndRegister();// (1) 初始化 NioServerSocketChannel 的实例,并且将其注册到 bossGroup 中的 EvenLoop 中的 Selector 中final Channel channel = regFuture.channel();if (regFuture.cause() != null) {return regFuture;}if (regFuture.isDone()) {// 若异步过程 initAndRegister()已经执行完毕,则进入该分支// At this point we know that the registration was complete and successful.ChannelPromise promise = channel.newPromise();doBind0(regFuture, channel, localAddress, promise);//(2) 调用底层 JDK 接口完成端口绑定和监听return promise;} else {// 若异步过程 initAndRegister()还未执行完毕,则进入该分支// Registration future is almost always fulfilled already, but just in case it's not.final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel);// 监听 regFuture 的完成事件,完成之后再调用 doBind0(regFuture, channel, localAddress, promise);regFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause();if (cause != null) {// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an// IllegalStateException once we try to access the EventLoop of the Channel.promise.setFailure(cause);} else {// Registration was successful, so set the correct executor to use.// See https://github.com/netty/netty/issues/2586promise.registered();doBind0(regFuture, channel, localAddress, promise);}}});return promise;}
    }
    
    • (1)initAndRegister() 源码:

      PS: 主要关注 newChannel() 、init(channel)、register(channel) 方法。

      final ChannelFuture initAndRegister() {Channel channel = null;try {channel = channelFactory.newChannel();//(1-1) 创建 NioServerSocketChannel 实例init(channel);//(1-2) 对该 NioServerSocketChannel 进行初始化} catch (Throwable t) {if (channel != null) {// channel can be null if newChannel crashed (eg SocketException("too many open files"))channel.unsafe().closeForcibly();// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);}// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);}ChannelFuture regFuture = config().group().register(channel);//(1-3) 最终把 NioServerSocketChannel 实例注册到 bossGroup 中 EventLoop 中的 Selector 上。if (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}// If we are here and the promise is not failed, it's one of the following cases:// 1) If we attempted registration from the event loop, the registration has been completed at this point.//    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.// 2) If we attempted registration from the other thread, the registration request has been successfully//    added to the event loop's task queue for later execution.//    i.e. It's safe to attempt bind() or connect() now://         because bind() or connect() will be executed *after* the scheduled registration task is executed//         because register(), bind(), and connect() are all bound to the same thread.return regFuture;
      }
      
      • (1-1) newChannel() 源码追踪:

        • 前面介绍 ServerBootstrap 的配置.channel(NioServerSocketChannel.class) 已经说明了 channelFactory 的作用。
        • newChannel() 实质是 ReflectiveChannelFactory 通过反射创建 NioServerSocketChannel 实例。
        • 在 NioServerSocketChannel 的空构造方法往下追踪源码,会发现传递了 SelectionKey.OP_ACCEPT 参数,并且赋予给 readInterestOp 属性,作用是标识该 Channel 感兴趣的事件。
        • 继续追踪下去会在 AbstractChannel(Channel parent) 中的 newChannelPipeline() -> DefaultChannelPipeline(Channel channel) 创建了 ChannelPipeline。里面的 head、tail 实质是 ChannelHandlerContext 类型的双向链表。
      • (1-2)init(channel) 源码:

        PS: init(channel) 方法在 AbstractBootstrap 中是抽象方法,在 ServerBootstrap 中进行了实现。注意这里添加了 ServerBootstrapAcceptor ,而且这是一个 ChannelInboundHandler。

        /**
        * ServerBootstrap.init()方法,它在 channel = channelFactory.newChannel() 之后被执行,用于初始化这个 channel
        */
        @Override
        void init(Channel channel) throws Exception {final Map<ChannelOption<?>, Object> options = options0();synchronized (options) {setChannelOptions(channel, options, logger);// 通过.option()设置的 TCP 参数就在这里应用//static void setChannelOptions(//        Channel channel, Map<ChannelOption<?>, Object> options, InternalLogger logger) {//    for (Map.Entry<ChannelOption<?>, Object> e: options.entrySet()) {//        setChannelOption(channel, e.getKey(), e.getValue(), logger);//    }//}}final Map<AttributeKey<?>, Object> attrs = attrs0();synchronized (attrs) {// 通过.attr()设置的附加属性就在这里应用for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked")AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();channel.attr(key).set(e.getValue());}}ChannelPipeline p = channel.pipeline();final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));}synchronized (childAttrs) {currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));}p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) throws Exception {// 获取 NioServerSocketChannel 实例的 pipelinefinal ChannelPipeline pipeline = ch.pipeline();// 这里的 config.handler()就是前面通过 .handler(ChannelHandler handler) 设置的 handlerChannelHandler handler = config.handler();if (handler != null) {// 将这个 handler 添加到 NioServerSocketChannel 实例的 pipeline 中pipeline.addLast(handler);}// 异步执行向 pipeline 添加 ServerBootstrapAcceptor 的步骤ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {// ServerBootstrapAcceptor 是一个 ChannelInboundHandlerpipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});
        }
        
      • (1-3)register(channel) 源码追踪:

        • 在上图的最后一个红色框圈住的代码处,NioServerSocketChannel 的实例被注册到 bossGroup 中 EventLoop 中的 Selector 上(ops: 0 在这里猜测是ready的意思,因为后面会在 AbstractNioChannel.doBeginRead() 方法中真正设置key感兴趣的ops)

          • doBeginRead() 方法会在channel首次注册激活或者每次readComplete之后发生(如果开启了isAutoRead,默认是开启的)。需要注意的是,即使读事件发生的时候,readyOps是0,同样可以进行read。

    • (2)doBind0(regFuture, channel, localAddress, promise) 源码追踪:

      • 在 NioServerSocketChannel 中的 javaChannel().bind(localAddress, config.getBacklog()) 调用底层 JDK 接口完成端口绑定和监听。
      • 追踪下去会发现最终调用了一个 Native 方法把.bind(PORT)最终托管给了 JVM,然后 JVM 进行系统调用。
1.3.2、run 死循环
  • 在调用register0、doBind0等方法的时候,会委托给 EventLoop 去执行,如果是当前 EventLoop,直接执行 register0 方法,否则会交给 EventLoop.execute(Runnable task)(一般情况下都是会这样异步执行)。

    // SingleThreadEventExecutor.execute(Runnable task):
    @Override
    public void execute(Runnable task) {if (task == null) {throw new NullPointerException("task");}boolean inEventLoop = inEventLoop();// 添加任务到队列addTask(task);if (!inEventLoop) {// 如果当前线程不属于该 EventLoop,EventLoop 需要启动新线程。最终会执行 SingleThreadEventExecutor.this.run() 方法,进入到了 NioEventLoop 中 run 方法的死循环里。startThread();if (isShutdown()) {boolean reject = false;try {if (removeTask(task)) {reject = true;}} catch (UnsupportedOperationException e) {// The task queue does not support removal so the best thing we can do is to just move on and// hope we will be able to pick-up the task before its completely terminated.// In worst case we will log on termination.}if (reject) {reject();}}}if (!addTaskWakesUp && wakesUpForTask(task)) {wakeup(inEventLoop);}
    }
    
  • 异步添加任务到队列之后,会在 run 循环里面,通过 runAllTasks() 方法执行队列里面的任务。即register0、doBind0等方法会在这时候被处理。

  • NioEventLoop 中的死循环,不断执行以下三个过程:

    • select:轮训注册在其中的 Selector 上的 Channel 的 IO 事件。
    • processSelectedKeys:在对应的 Channel 上处理 IO 事件。
    • runAllTasks:再去以此循环处理任务队列中的其他任务。

    // NioEventLoop.run():
    @Override
    protected void run() {for (;;) {try {try {switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.BUSY_WAIT:// fall-through to SELECT since the busy-wait is not supported with NIOcase SelectStrategy.SELECT:select(wakenUp.getAndSet(false));// 'wakenUp.compareAndSet(false, true)' is always evaluated// before calling 'selector.wakeup()' to reduce the wake-up// overhead. (Selector.wakeup() is an expensive operation.)//// However, there is a race condition in this approach.// The race condition is triggered when 'wakenUp' is set to// true too early.//// 'wakenUp' is set to true too early if:// 1) Selector is waken up between 'wakenUp.set(false)' and//    'selector.select(...)'. (BAD)// 2) Selector is waken up between 'selector.select(...)' and//    'if (wakenUp.get()) { ... }'. (OK)//// In the first case, 'wakenUp' is set to true and the// following 'selector.select(...)' will wake up immediately.// Until 'wakenUp' is set to false again in the next round,// 'wakenUp.compareAndSet(false, true)' will fail, and therefore// any attempt to wake up the Selector will fail, too, causing// the following 'selector.select(...)' call to block// unnecessarily.//// To fix this problem, we wake up the selector again if wakenUp// is true immediately after selector.select(...).// It is inefficient in that it wakes up the selector for both// the first case (BAD - wake-up required) and the second case// (OK - no wake-up required).if (wakenUp.get()) {selector.wakeup();}// fall throughdefault:}} catch (IOException e) {// If we receive an IOException here its because the Selector is messed up. Let's rebuild// the selector and retry. https://github.com/netty/netty/issues/8566rebuildSelector0();handleLoopException(e);continue;}cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;if (ioRatio == 100) {try {processSelectedKeys();} finally {// Ensure we always run tasks.runAllTasks();}} else {final long ioStartTime = System.nanoTime();try {processSelectedKeys();} finally {// Ensure we always run tasks.final long ioTime = System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}} catch (Throwable t) {handleLoopException(t);}// Always handle shutdown even if the loop processing threw an exception.try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {handleLoopException(t);}}
    }
    
1.3.3、总结
  • (1)首先调用 AbstractBootstrap 中的 initAndRegister() 方法完成 NioServerSocketChannel 实例的初始化和注册。
  • (2)然后调用 NioServerSocketChannel 实例的 doBind() 方法,最终调用 sun.nio.ch.Net 中的 bind()和 listen() 完成端口绑定和客户端连接监听。
  • (3)在真正 register0(注册)和 bind0 (绑定)之前,会委托当前 eventLoop 的 executor 去执行,实质上是在死循环run方法中通过 runAllTasks() 方法执行 eventLoop 的队列里面的任务。

Netty服务器启动源码剖析相关推荐

  1. 【Netty之旅四】你一定看得懂的Netty客户端启动源码分析!

    前言 前面小飞已经讲解了NIO和Netty服务端启动,这一讲是Client的启动过程. 源码系列的文章依旧还是遵循大白话+画图的风格来讲解,本文Netty源码及以后的文章版本都基于:4.1.22.Fi ...

  2. 源码剖析 Netty 服务启动 NIO

    如果这个文章看不懂的话 , 建议反复阅读 Netty 与 Reactor 开篇立意.引用网友好的建议.看源码要针对性的看,最佳实践就是,带着明确的目的去看源码.抓主要问题,放弃小问题.主要的逻辑理解了 ...

  3. 老李推荐: 第8章4节《MonkeyRunner源码剖析》MonkeyRunner启动运行过程-启动AndroidDebugBridge 1...

    老李推荐: 第8章4节<MonkeyRunner源码剖析>MonkeyRunner启动运行过程-启动AndroidDebugBridge 上一节我们看到在启动AndroidDebugBri ...

  4. 老李推荐:第8章2节《MonkeyRunner源码剖析》MonkeyRunner启动运行过程-解析处理命令行参数...

    老李推荐:第8章2节<MonkeyRunner源码剖析>MonkeyRunner启动运行过程-解析处理命令行参数 MonkeyRunnerStarter是MonkeyRunner启动时的入 ...

  5. React Native 启动流程 源码剖析

    开始之前   开始分析之前,新建一个名为 RnDemo 的空项目,RN 版本选择 0.58.1,查看项目自动为我们生成 MainActivity.java 和 MainApplication.java ...

  6. 4.2.10 Kafka源码剖析, 阅读环境搭建, broker启动流程, topic创建流程, Producer生产者流程, Consumer消费者流程,

    目录 4.1 Kafka源码剖析之源码阅读环境搭建 4.1.1 安装配置Gradle 4.1.2 Scala的安装和配置 4.1.3 Idea配置 4.1.4 源码操作 4.2 Kafka源码剖析之B ...

  7. 老李推荐:第5章5节《MonkeyRunner源码剖析》Monkey原理分析-启动运行: 获取系统服务引用 1...

    老李推荐:第5章5节<MonkeyRunner源码剖析>Monkey原理分析-启动运行: 获取系统服务引用 上一节我们描述了monkey的命令处理入口函数run是如何调用optionPro ...

  8. Mongoose源码剖析:外篇之web服务器

    引言 在深入Mongoose源码剖析之前,我们应该清楚web服务器是什么?它提供什么服务?怎样提供服务?使用什么协议?客户端如何唯一标识web服务器的资源?下面我们抛开Mongoose,来介绍一个we ...

  9. Spring源码剖析——Bean的配置与启动

    IOC介绍   相信大多数人在学习Spring时 IOC 和 Bean 算得上是最常听到的两个名词,IOC在学习Spring当中出现频率如此之高必然有其原因.如果我们做一个比喻的话,把Bean说成Sp ...

最新文章

  1. 如何利用 C# 爬取带 Token 验证的网站数据?
  2. 非常量引用的初始值必须是左值_C++核心编程--引用
  3. 中国互联网的抑郁:抄与被抄都很痛
  4. MySQL数据库同步小工具(Java实现)
  5. yii 加载php文件,Yii2框架加载css和js文件的方法分析
  6. 引导类加载器 Bootstrap ClassLoader
  7. 揭开不一样的世界,这5部纪录片绝对不能错过!
  8. 工作228:小程序学习2开始布局页面2
  9. 用SSE加速CPU蒙皮计算
  10. 九章基础算法02:栈和队列
  11. JavaScript基础学习(一)—JavaScript简介
  12. 三星智能电视将用户语音隐私泄露给第三方?
  13. 无聊时分析了下目前国内和国外汽车消费市场的区域性分布
  14. [Poi2000]公共串 hustoj2797
  15. 阿里巴巴矢量图标 iconfont 下载图标分辨率小一点、并占得内存小一点呢
  16. 深度神经网络主要模型,深度神经网络预测模型
  17. 逆向学习第二天如何手动脱UPX、Aspack壳
  18. 解决制作FAT32格式的重装U盘中文件过大问题
  19. Linux 日志切割神器 Logrotate 原理和配置详解(附多生产实例)
  20. 上海高二物理公式整理

热门文章

  1. 2022/3/8——手把手配置一台崭新的华为交换机配置堆叠(附使用console线配置交换机方法)
  2. [USACO4.4]追查坏牛奶Pollutant Control
  3. Twitter蓝伟纶:推特现在进中国也打不过微博微信了丨CES 中国之夜
  4. wap2app(五)-- 微信授权登录以及踩过的坑
  5. 全套恒压供水(3托3)图纸程序 功能: 三拖三(3台变频3台水泵),3台水泵循环软启
  6. Cesium结合Echarts的使用
  7. xilinx SDK MSS文件
  8. 微信小程序毕业设计开题报告家教信息管理系统|招聘求职兼职+后台管理系统|前后分离VUE.js
  9. 解读DO-254(机载电子硬件设计保证指南)
  10. EOF的意义及用法(while(scanf(%d,n) != EOF))