







@Overrideprotected void run() {    int selectCnt = 0;    for (;;) {        try {            int strategy;            try {                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());                switch (strategy) {                case SelectStrategy.CONTINUE:                    continue;                case SelectStrategy.BUSY_WAIT:                    // fall-through to SELECT since the busy-wait is not supported with NIO                case SelectStrategy.SELECT:                    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();                    if (curDeadlineNanos == -1L) {                        curDeadlineNanos = NONE; // nothing on the calendar                    }                    nextWakeupNanos.set(curDeadlineNanos);                    try {                        if (!hasTasks()) {                            strategy = select(curDeadlineNanos);                        }                    } finally {                        // This update is just to help block unnecessary selector wakeups                        // so use of lazySet is ok (no race condition)                        nextWakeupNanos.lazySet(AWAKE);                    }                    // fall through                default:                }            } catch (IOException e) {                // If we receive an IOException here its because the Selector is messed up. Let's rebuild                // the selector and retry. 
https://github.com/netty/netty/issues/8566                rebuildSelector0();                selectCnt = 0;                handleLoopException(e);                continue;            }            selectCnt++;            cancelledKeys = 0;            needsToSelectAgain = false;            final int ioRatio = this.ioRatio;            boolean ranTasks;            if (ioRatio == 100) {                try {                    if (strategy > 0) {                        processSelectedKeys();                    }                } finally {                    // Ensure we always run tasks.                    ranTasks = runAllTasks();                }            } else if (strategy > 0) {                final long ioStartTime = System.nanoTime();                try {                    processSelectedKeys();                } finally {                    // Ensure we always run tasks.                    final long ioTime = System.nanoTime() - ioStartTime;                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);                }            } else {                ranTasks = runAllTasks(0); // This will run the minimum number of tasks            }            if (ranTasks || strategy > 0) {                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",                            selectCnt - 1, selector);                }                selectCnt = 0;            } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)                selectCnt = 0;            }        } catch (CancelledKeyException e) {            // Harmless exception - log anyway            if (logger.isDebugEnabled()) {                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",                        selector, e);            }        } catch (Error e) {            throw 
(Error) e;        } catch (Throwable t) {            handleLoopException(t);        } finally {            // Always handle shutdown even if the loop processing threw an exception.            try {                if (isShuttingDown()) {                    closeAll();                    if (confirmShutdown()) {                        return;                    }                }            } catch (Error e) {                throw (Error) e;            } catch (Throwable t) {                handleLoopException(t);            }        }    }}


public final class EchoServer {    static final boolean SSL = System.getProperty("ssl") != null;    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));    public static void main(String[] args) throws Exception {        // Configure SSL.        final SslContext sslCtx;        if (SSL) {            SelfSignedCertificate ssc = new SelfSignedCertificate();            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();        } else {            sslCtx = null;        }        // Configure the server.        EventLoopGroup bossGroup = new NioEventLoopGroup(1);        EventLoopGroup workerGroup = new NioEventLoopGroup();        final EchoServerHandler serverHandler = new EchoServerHandler();        try {            ServerBootstrap b = new ServerBootstrap();            b.group(bossGroup, workerGroup)             .channel(NioServerSocketChannel.class)             .option(ChannelOption.SO_BACKLOG, 100)             .handler(new LoggingHandler(LogLevel.INFO))             .childHandler(new ChannelInitializer() {                 @Override                 public void initChannel(SocketChannel ch) throws Exception {                     ChannelPipeline p = ch.pipeline();                     if (sslCtx != null) {                         p.addLast(sslCtx.newHandler(ch.alloc()));                     }                     //p.addLast(new LoggingHandler(LogLevel.INFO));                     p.addLast(serverHandler);                 }             });            // Start the server.            ChannelFuture f = b.bind(PORT).sync();            // Wait until the server socket is closed.            f.channel().closeFuture().sync();        } finally {            // Shut down all event loops to terminate all threads.            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }}


/** * Handler implementation for the echo server. */@Sharablepublic class EchoServerHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) {        ctx.write(msg);    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) {        ctx.flush();    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {        // Close the connection when an exception is raised.        cause.printStackTrace();        ctx.close();    }}


/** * Sends one message when a connection is open and echoes back any received * data to the server.  Simply put, the echo client initiates the ping-pong * traffic between the echo client and server by sending the first message to * the server. */public final class EchoClient {    static final boolean SSL = System.getProperty("ssl") != null;    static final String HOST = System.getProperty("host", "");    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));    static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));    public static void main(String[] args) throws Exception {        // Configure SSL.git        final SslContext sslCtx;        if (SSL) {            sslCtx = SslContextBuilder.forClient()                .trustManager(InsecureTrustManagerFactory.INSTANCE).build();        } else {            sslCtx = null;        }        // Configure the client.        EventLoopGroup group = new NioEventLoopGroup();        try {            Bootstrap b = new Bootstrap();            b.group(group)             .channel(NioSocketChannel.class)             .option(ChannelOption.TCP_NODELAY, true)             .handler(new ChannelInitializer() {                 @Override                 public void initChannel(SocketChannel ch) throws Exception {                     ChannelPipeline p = ch.pipeline();                     if (sslCtx != null) {                         p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));                     }                     //p.addLast(new LoggingHandler(LogLevel.INFO));                     p.addLast(new EchoClientHandler());                 }             });            // Start the client.            ChannelFuture f = b.connect(HOST, PORT).sync();            // Wait until the connection is closed.            f.channel().closeFuture().sync();        } finally {            // Shut down the event loop to terminate all threads.            group.shutdownGracefully();        }    }}


/** * Handler implementation for the echo client.  It initiates the ping-pong * traffic between the echo client and server by sending the first message to * the server. */public class EchoClientHandler extends ChannelInboundHandlerAdapter {    private final ByteBuf firstMessage;    /**     * Creates a client-side handler.     */    public EchoClientHandler() {        firstMessage = Unpooled.buffer(EchoClient.SIZE);        for (int i = 0; i < firstMessage.capacity(); i ++) {            firstMessage.writeByte((byte) i);        }    }    @Override    public void channelActive(ChannelHandlerContext ctx) {        ctx.writeAndFlush(firstMessage);    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) {        ctx.write(msg);    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) {       ctx.flush();    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {        // Close the connection when an exception is raised.        cause.printStackTrace();        ctx.close();    }}

下面我们就来针对这个官方提供的源代码例子来分析Netty服务器启动的过程,先来看服务端EchoServer 的代码




// Configure the server.EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();

① 这两个对象是整个Netty的核心对象,可以说,整个Netty的运作都依赖于它们。bossGroup用于接收TCP连接请求,它会将请求交给workerGroup;workerGroup会获取真正的连接,然后和连接进行通讯,比如读、写、编码、解码等操作。

② EventLoopGroup是事件循环组(线程组)含有多个EventLoop,可以注册channel,用于在事件循环中去进行选择(和选择器相关);


③ new NioEventLoopGroup(1) 中的1表示bossGroup事件循环组只有1个线程,这个数值可以自行指定;如果使用无参的new NioEventLoopGroup(),则默认创建的线程数=CPU核数*2,这样可以充分利用多核的优势;

#第一步:EventLoopGroup workerGroup = new NioEventLoopGroup();#第二步,点击上面的 new NioEventLoopGroup()public NioEventLoopGroup() {    this(0);}#第三步,点击上面的this(0);public NioEventLoopGroup(int nThreads) {    this(nThreads, (Executor) null);}#第四步,点击上面的this(nThreads, (Executor) null);public NioEventLoopGroup(int nThreads, Executor executor) {    this(nThreads, executor, SelectorProvider.provider());}#第五步,点击上面的this(nThreads, executor, SelectorProvider.provider());public NioEventLoopGroup(        int nThreads, Executor executor, final SelectorProvider selectorProvider) {    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);}#第六步,点击上面的 this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,                         final SelectStrategyFactory selectStrategyFactory) {    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());}#第七步,点击上面的superprotected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);}#第八步:点击 上面的DEFAULT_EVENT_LOOP_THREADS 定位到下面private static final int DEFAULT_EVENT_LOOP_THREADS;static {    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(            "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));    if (logger.isDebugEnabled()) {        logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);    }}

默认会创建一个长度为CPU核数*2的EventExecutor数组,用于存放NioEventLoop:this.children = new EventExecutor[nThreads];






/**
 * Configures both event loop groups of this bootstrap: {@code parentGroup} for the acceptor
 * and {@code childGroup} for the accepted (client) channels. These {@link EventLoopGroup}s
 * handle all the events and IO for {@link ServerChannel} and {@link Channel}s.
 *
 * @param parentGroup the group for the acceptor; forwarded to {@code super.group(...)}
 * @param childGroup  the group for the accepted channels; must not be {@code null}
 * @return this bootstrap, for call chaining
 * @throws IllegalStateException if a child group was already set
 */
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup);

    final boolean childAlreadyConfigured = this.childGroup != null;
    if (childAlreadyConfigured) {
        throw new IllegalStateException("childGroup set already");
    }

    final EventLoopGroup validatedChild = ObjectUtil.checkNotNull(childGroup, "childGroup");
    this.childGroup = validatedChild;
    return this;
}

④ 然后添加了一个channel,它的参数是一个Class对象,引导类将通过这个Class对象反射创建ChannelFactory,然后添加一些TCP的参数。【说明:Channel的创建在bind方法,可以Debug下bind,会找到channel=channelFactory.newChannel();】

⑤ 再添加了一个服务器专属的日志处理器handler;

⑥ 再添加一个SocketChannel(不是ServerSocketChannel)的handler;

⑦ 然后绑定端口并阻塞至绑定成功;

⑧ 最后main线程阻塞等待关闭;



/** * Create a new instance. * @param nThreads 使用的线程数,默认为cpu core*2 * @param executor 执行器,如果传入null,则采用Netty默认的线程工厂和默认的执行器ThreadPerTaskExecutor  * @param chooserFactory    单例new DefaultEventExecutorChooserFactory() * @param args   在擦黄金执行器的时候传入固定参数   * /protected MultithreadEventExecutorGroup(int nThreads, Executor executor,                                        EventExecutorChooserFactory chooserFactory, Object... args) {    if (nThreads <= 0) {        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));    }    //如果传入的执行器是null,则采用默认的线程工厂和默认的执行器    if (executor == null) {        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());    }    //创建指定线程数的执行器数组    children = new EventExecutor[nThreads];    //初始化线程数组    for (int i = 0; i < nThreads; i ++) {        boolean success = false;        try {            //创建 new NioEventLoop            children[i] = newChild(executor, args);            success = true;        } catch (Exception e) {            // TODO: Think about if this is a good exception type            throw new IllegalStateException("failed to create a child event loop", e);        } finally {            //如果创建失败,则优雅关闭            if (!success) {                for (int j = 0; j < i; j ++) {                    children[j].shutdownGracefully();                }                for (int j = 0; j < i; j ++) {                    EventExecutor e = children[j];                    try {                        while (!e.isTerminated()) {                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);                        }                    } catch (InterruptedException interrupted) {                        // Let the caller handle the interruption.                        
Thread.currentThread().interrupt();                        break;                    }                }            }        }    }    chooser = chooserFactory.newChooser(children);    final FutureListener terminationListener = new FutureListener() {        @Override        public void operationComplete(Future future) throws Exception {            if (terminatedChildren.incrementAndGet() == children.length) {                terminationFuture.setSuccess(null);            }        }    };    //为每一个单例线程池添加一个关闭监听器    for (EventExecutor e: children) {        e.terminationFuture().addListener(terminationListener);    }    //将所有的单例线程池添加到一个HashSet中    Set childrenSet = new LinkedHashSet(children.length);    Collections.addAll(childrenSet, children);    readonlyChildren = Collections.unmodifiableSet(childrenSet);}


① 如果executor是null,则创建一个默认的ThreadPerTaskExecutor,使用Netty默认的线程工厂;

② 根据传入的线程数(CPU*2)创建一个线程池(单例线程池)数组;

③ 循环填充数组中的元素,如果异常,则关闭所有的单例线程池;

④ 根据线程选择工厂创建一个线程选择器;

⑤ 为每一个单例线程池添加一个关闭监听器;

⑥ 将所有的单例线程池添加到一个LinkedHashSet中,并包装成只读集合;



// The order in which child ChannelOptions are applied is important they may depend on each other for validation// purposes.private final Map, Object> childOptions = new LinkedHashMap, Object>();private final Map, Object> childAttrs = new ConcurrentHashMap, Object>();private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);private volatile EventLoopGroup childGroup;private volatile ChannelHandler childHandler;


ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .option(ChannelOption.SO_BACKLOG, 100)
 .handler(new LoggingHandler(LogLevel.INFO))
 // The type argument is required: with a raw ChannelInitializer the method below
 // would not override initChannel and the @Override would not compile.
 .childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         if (sslCtx != null) {
             p.addLast(sslCtx.newHandler(ch.alloc()));
         }
         //p.addLast(new LoggingHandler(LogLevel.INFO));
         p.addLast(serverHandler);
     }
 });


① 链式调用group方法,将boss和worker传入,boss赋值给parentGroup属性,worker赋值给childGroup属性;

② channel方法传入NioServerSocketChannel.class对象,会根据这个class创建channel对象;

③ option方法传入TCP参数,放在一个LinkedHashMap中;

/** * Allow to specify a {@link ChannelOption} which is used for the {@link Channel} instances once they got * created. Use a value of {@code null} to remove a previous set {@link ChannelOption}. */public  B option(ChannelOption option, T value) {    ObjectUtil.checkNotNull(option, "option");    synchronized (options) {        if (value == null) {            options.remove(option);        } else {            options.put(option, value);        }    }    return self();}#其中options是private final Map, Object> options = new LinkedHashMap, Object>();

④ handler方法传入一个handler,这个handler只专属于ServerSocketChannel而不是SocketChannel;

⑤ childHandler传入一个handler,这个handler将会在每个客户端连接的时候调用,供SocketChannel使用



// Start the server.ChannelFuture f = b.bind(PORT).sync();


private ChannelFuture doBind(final SocketAddress localAddress) {    final ChannelFuture regFuture = initAndRegister();    final Channel channel = regFuture.channel();    if (regFuture.cause() != null) {        return regFuture;    }    if (regFuture.isDone()) {        // At this point we know that the registration was complete and successful.        ChannelPromise promise = channel.newPromise();        doBind0(regFuture, channel, localAddress, promise);        return promise;    } else {        // Registration future is almost always fulfilled already, but just in case it's not.        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);        regFuture.addListener(new ChannelFutureListener() {            @Override            public void operationComplete(ChannelFuture future) throws Exception {                Throwable cause = future.cause();                if (cause != null) {                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an                    // IllegalStateException once we try to access the EventLoop of the Channel.                    promise.setFailure(cause);                } else {                    // Registration was successful, so set the correct executor to use.                    // See https://github.com/netty/netty/issues/2586                    promise.registered();                    doBind0(regFuture, channel, localAddress, promise);                }            }        });        return promise;    }}


final ChannelFuture initAndRegister() {    Channel channel = null;    try {        /**        说明:channelFactory.newChannel()方法的作用,通过ServerBootstrap的        通道工厂反射创建一个NioServerSocketChannel,具体追踪源码可以得到下面结论        ① 通过NIO的SelectorProvider的openServerSocketChannel方法得到JDK的channel.        目的是让Netty包装JDK的channel;        ② 创建了一个唯一的ChannelId,创建了一个NioMessageUnsafe,用于操作消息,创建了一个        DefaultChannelPipeline管道,是个双向链表结构,用于过滤所有的进出消息。        ③ 创建了一个NioServerSocketChannelConfig对象,用于对外展示一些配置;                **/        channel = channelFactory.newChannel();//NioServerSocketChannel        /**        说明:init初始化这个NioServerSocketChannel,具体追踪源码可以得到如下结论        ① init方法,这是个抽象方法(AbstractBootstrap类的),由ServerBootstrap实现(可以追踪一下源码)         setChannelOptions(channel, newOptionsArray(), logger);        ② 设置NioServerSocketChannel的TCP属性;        ③ 由于LinkedHash.Map 是非线程安全的,使用同步进行处理;        ④ 对NioServerSocketChannel的ChannelPipeline添加ChannelInitializer处理器;        ⑤ 可以看出,init的方法的核心作用在和ChannelPipeline相关;        ⑥ 从NioServerSocketChannel的初始化过程中,我们知道,pipeline是一个双向链表,并且,它本身就初始化了        head和tail,这里调用了它的addLast方法,也就是将整个handler插入到tail的前面,因为tail永远会在后面,        需要做一些系统的固定工作;        **/        init(channel);    } catch (Throwable t) {        if (channel != null) {            // channel can be null if newChannel crashed (eg SocketException("too many open files"))            channel.unsafe().closeForcibly();            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);        }        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);    }    ChannelFuture regFuture = config().group().register(channel);    if (regFuture.cause() != null) {        if (channel.isRegistered()) {            channel.close();        } else {            
channel.unsafe().closeForcibly();        }    }    // If we are here and the promise is not failed, it's one of the following cases:    // 1) If we attempted registration from the event loop, the registration has been completed at this point.    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.    // 2) If we attempted registration from the other thread, the registration request has been successfully    //    added to the event loop's task queue for later execution.    //    i.e. It's safe to attempt bind() or connect() now:    //         because bind() or connect() will be executed *after* the scheduled registration task is executed    //         because register(), bind(), and connect() are all bound to the same thread.    return regFuture;}








@Overridepublic final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {    final AbstractChannelHandlerContext newCtx;    synchronized (this) {        checkMultiplicity(handler);        newCtx = newContext(group, filterName(name, handler), handler);        addLast0(newCtx);        // If the registered is false it means that the channel was not registered on an eventLoop yet.        // In this case we add the context to the pipeline and add a task that will call        // ChannelHandler.handlerAdded(...) once the channel is registered.        if (!registered) {            newCtx.setAddPending();            callHandlerCallbackLater(newCtx, true);            return this;        }        EventExecutor executor = newCtx.executor();        if (!executor.inEventLoop()) {            callHandlerAddedInEventLoop(newCtx, executor);            return this;        }    }    callHandlerAdded0(newCtx);    return this;}





4、创建一个AbstractChannelHandlerContext 对象,这里说一下,ChannelHandlerContext对象是ChannelHandler和ChannelPipeline之间的关联,每当有ChannelHandler添加到Pipeline中时,都会创建Context,Context的主要功能是管理它所关联的Handler和同一个Pipeline中的其他Handler之间的交互;


/**
 * Links {@code newCtx} into the context list immediately before {@code tail}.
 * The four pointer updates are kept in their original order.
 *
 * @param newCtx the context to insert
 */
private void addLast0(AbstractChannelHandlerContext newCtx) {
    // The node that currently sits right before tail becomes newCtx's predecessor.
    final AbstractChannelHandlerContext formerLast = tail.prev;

    newCtx.prev = formerLast;
    newCtx.next = tail;

    formerLast.next = newCtx;
    tail.prev = newCtx;
}



private static void doBind0(        final ChannelFuture regFuture, final Channel channel,        final SocketAddress localAddress, final ChannelPromise promise) {    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up    // the pipeline in its channelRegistered() implementation.    channel.eventLoop().execute(new Runnable() {        @Override        public void run() {            if (regFuture.isSuccess()) {                //bind方法这里打断点来看看                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);            } else {                promise.setFailure(regFuture.cause());            }        }    });}




/**
 * Delegates the bind to {@code unsafe.bind(localAddress, promise)}; {@code ctx} is not used.
 *
 * @param ctx          the handler context (unused)
 * @param localAddress the address to bind to
 * @param promise      the promise passed through to the unsafe
 */
@Override
public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
    unsafe.bind(localAddress, promise);
}


@Overridepublic final void bind(final SocketAddress localAddress, final ChannelPromise promise) {    assertEventLoop();    if (!promise.setUncancellable() || !ensureOpen(promise)) {        return;    }    // See: https://github.com/netty/netty/issues/576    if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&        localAddress instanceof InetSocketAddress &&        !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&        !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {        // Warn a user about the fact that a non-root user can't receive a        // broadcast packet on *nix if the socket is bound on non-wildcard address.        logger.warn(                "A non-root user can't receive a broadcast packet if the socket " +                "is not bound to a wildcard address; binding to a non-wildcard " +                "address (" + localAddress + ") anyway as requested.");    }    boolean wasActive = isActive();    try {        //可以看到,这里最终的方法就是doBind方法,执行成功后,执行通道的        //fireChannelActive方法,告诉所有的handler已经成功绑定        doBind(localAddress);    } catch (Throwable t) {        safeSetFailure(promise, t);        closeIfClosed();        return;    }    if (!wasActive && isActive()) {        invokeLater(new Runnable() {            @Override            public void run() {                pipeline.fireChannelActive();            }        });    }    safeSetSuccess(promise);}


/**
 * Binds the underlying JDK channel to {@code localAddress} with the configured backlog.
 * On Java 7 and later the channel's own {@code bind} is called; on older versions the bind
 * goes through the channel's socket.
 *
 * @param localAddress the address to bind to
 * @throws Exception if the underlying bind fails
 */
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}



protected void run() {
    while(true) {
        while(true) {
            while(true) {

