一、ServerBootstrap的启动示例代码

      EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup(new NamedThreadFactory("bossThread",false));EventLoopGroup workEventLoopGroup = new NioEventLoopGroup(new NamedThreadFactory("workThread",false));ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossEventLoopGroup,workEventLoopGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG,128).handler(new LoggingHandler(LogLevel.INFO)).childHandler( new PojoServerIntitlizer());try {log.debug(" will start server");ChannelFuture closeFuture =  bootstrap.bind(port).sync();log.debug("  server is closing");closeFuture.channel().closeFuture().sync();log.debug("  server is closed");} catch (InterruptedException e) {e.printStackTrace();}finally {bossEventLoopGroup.shutdownGracefully();workEventLoopGroup.shutdownGracefully();log.debug("  release event loop group");}

二、初始化流程

1.ServerBootstrap和bootStrap继承于AbstractBootStrap,都使用模板方式。AbstractBootStrap类有几个比较重要的成员变量。

group:ServerSocketChannel的EventLoopGroup,即reactor-accept的BOSS事件循环线程池组。

channelFactory:创建channel的工厂类,如创建ServerSocketChannel,SocketChannel,

handler:reactor-accept的事件处理类。

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {volatile EventLoopGroup group;@SuppressWarnings("deprecation")private volatile ChannelFactory<? extends C> channelFactory;private volatile SocketAddress localAddress;private volatile ChannelHandler handler;
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);private volatile EventLoopGroup childGroup;private volatile ChannelHandler childHandler;

2.设置参数流程。

bootstrap.group(bossEventLoopGroup,workEventLoopGroup) bossEventLoopGroup设置AbstractBootStrap的group,workEventLoopGroup设置为ServerBootstrap的childGroup,

    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {super.group(parentGroup);if (this.childGroup != null) {throw new IllegalStateException("childGroup set already");}this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");return this;}
channel(NioServerSocketChannel.class) 生成一个根据指定SocketChannel类的反射CHANNEL工厂类,就是根据传入的类反射生成对象。这是设置AbstractBootStrap的类的channelFactory
    public B channel(Class<? extends C> channelClass) {return channelFactory(new ReflectiveChannelFactory<C>(ObjectUtil.checkNotNull(channelClass, "channelClass")));}
.handler(new LoggingHandler(LogLevel.INFO)) 这是设置AbstractBootStrap的类的handler
    public B handler(ChannelHandler handler) {this.handler = ObjectUtil.checkNotNull(handler, "handler");return self();}
.childHandler( new PojoServerIntitlizer());这是设置ServerBootstrap子类的childHandler
    public ServerBootstrap childHandler(ChannelHandler childHandler) {this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");return this;}

三、绑定流程

1.ChannelFuture closeFuture = bootstrap.bind(port).sync();这为起点,会调用AbstractBootStrap的doBind方法

private ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regFuture = initAndRegister();final Channel channel = regFuture.channel();ChannelPromise promise = channel.newPromise();doBind0(regFuture, channel, localAddress, promise);return promise;}

2.这里面首先调用initAndRegister,会先创建ServerSocketChannel,并且将channel注册到NioEventLoop的selector中,也就是前文的channel.register(selector,0,eventLoop)

  final ChannelFuture initAndRegister() {Channel channel = null;try {channel = channelFactory.newChannel();init(channel);} catch (Throwable t) {}ChannelFuture regFuture = config().group().register(channel);return regFuture;}

3.newChannel就是通过前面生成的ReflectChannelFactory调用反射生成一个NioServerSocketChannel对象,这里看一下类的继承图,

AbstractChannel 有两个成员变量unsafe和pipline,unsafe为操作底层网络IO的接口。pipline也就是我们的管道事件流。也就是说每个channel会自带一个pipline

    protected AbstractChannel(Channel parent) {this.parent = parent;id = newId();unsafe = newUnsafe();pipeline = newChannelPipeline();}protected DefaultChannelPipeline newChannelPipeline() {return new DefaultChannelPipeline(this);}

3.创建完channel后,我们来看下初始化流程,这个会调用到子类(ServerBootStrap)的init方法,

这个会做两个事件,

(1).设置channel的连接选项和属性。

(2).添加新的handler

这里会为ServerSocketChannel的管道中新增一个handler,用来处理accept-reactor的IO流读取事件,也就是新连接事件。ServerBootstrapAcceptor这个类就是来处理新连接,并注册accept的socketChannel并注册到childGroup的work事件循环以及内部eventLoop的selector中,并且设置子socketChannel的handler为childHandler.这个我们留在接收新连接的流程中讲解。

    void init(Channel channel) {setChannelOptions(channel, this.newOptionsArray(), logger);setAttributes(channel, (Entry[])this.attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));ChannelPipeline p = channel.pipeline();final EventLoopGroup currentChildGroup = this.childGroup;final ChannelHandler currentChildHandler = this.childHandler;p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {public void initChannel(final Channel ch) {final ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = ServerBootstrap.this.config.handler();if (handler != null) {pipeline.addLast(new ChannelHandler[]{handler});}ch.eventLoop().execute(new Runnable() {public void run() {pipeline.addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});}});}}});}

4.现在回到第2步,初始化完成了,现在开始将serverSocketChannel注册到bossEventLoop的selector中。ChannelFuture regFuture = config().group().register(channel);这个会调用MultithreadEventLoopGroup.register,也就是从事件循环组的子eventLoop中取下一个NioEventLoop进行注册分配。也就是work-reactor的channel注册机制。

    @Overridepublic EventLoop next() {return (EventLoop) super.next();}@Overridepublic ChannelFuture register(Channel channel) {return next().register(channel);}

5.我们的NioEventLoop为SingleThreadEventLoop,这个会将channel,当前的eventLoop对象封装成一个promise,进行注册。

public ChannelFuture register(Channel channel) {return this.register((ChannelPromise)(new DefaultChannelPromise(channel, this)));
}

6.接着找到 ServerSocketChannel的内部的unsafe对象进行注册。这个unsafe就是NioMessageUnsafe

    public ChannelFuture register(ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");promise.channel().unsafe().register(this, promise);return promise;}public abstract class AbstractNioMessageChannel extends AbstractNioChannel {@Overrideprotected AbstractNioUnsafe newUnsafe() {return new NioMessageUnsafe();}
}

7.由于NioMessageUnsafe继承于AbstractUnsafe,所以调用此类的register,这个方法就是首先调用Channel.register,然后触发管道的handlerAdd,channelRegister,channelActive方法。

   public final void register(EventLoop eventLoop, final ChannelPromise promise) {ObjectUtil.checkNotNull(eventLoop, "eventLoop");AbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {this.register0(promise);} }private void register0(ChannelPromise promise) {try {boolean firstRegistration = this.neverRegistered;AbstractChannel.this.doRegister();this.neverRegistered = false;AbstractChannel.this.registered = true;AbstractChannel.this.pipeline.invokeHandlerAddedIfNeeded();this.safeSetSuccess(promise);AbstractChannel.this.pipeline.fireChannelRegistered();if (AbstractChannel.this.isActive()) {if (firstRegistration) {AbstractChannel.this.pipeline.fireChannelActive();} else if (AbstractChannel.this.config().isAutoRead()) {this.beginRead();}}}}

8.AbstractChannel.this.doRegister();这个就是把ServerSocketChannel注册到NioEventLoop的selector中去。这时还没有监听事件。

  AbstractNioChannelprotected void doRegister() throws Exception {boolean selected = false;while(true) {try {this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);return;} }}

生成的selectKey如下: 

9.invokeHandlerAddedIfNeeded会调用到父类CHANNEL管道的channelHandler,也就是在我们初始化时第三步加入的handler

在这里才会真正触发handler的initChannel方法,生成第三步的ServerBootstrapAcceptor,初始化处理新的子连接的channelReader的处理器。

10.现在初始化和注册完成了,我们再回到第一步,进行doBind0(regFuture, channel, localAddress, promise)

AbstractBootstrap
private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});}

11.channel.bind,会调用内部管道的bind,然后到tail的BIND,再到AbstractChannelHandlerContext的bind,因为tail就是继承于AbstractChannelHandlerContext,这里面的BIND就是从尾部向前找,找一个带有bind标签的HANDLER进行处理。最终找到了header.ChannelHandlerContext(DefaultChannelPipeline$HeadContext#0, [id: 0x159b9902])

    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return pipeline.bind(localAddress, promise);}public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return tail.bind(localAddress, promise);}public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {ObjectUtil.checkNotNull(localAddress, "localAddress");if (isNotValidPromise(promise, false)) {// cancelledreturn promise;}final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);EventExecutor executor = next.executor();next.invokeBind(localAddress, promise);return promise;}

12.最终到了head的BIND,这里面调用了unsafe本地IO类的bind.

   final class HeadContext extends AbstractChannelHandlerContextimplements ChannelOutboundHandler, ChannelInboundHandler {@Overridepublic void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {unsafe.bind(localAddress, promise);}

13.最终调用到AbstractChannel的内部类AbstractUnsafe的bind,

   @Overridepublic final void bind(final SocketAddress localAddress, final ChannelPromise promise) {boolean wasActive = isActive();try {doBind(localAddress);} catch (Throwable t) {safeSetFailure(promise, t);closeIfClosed();return;}if (!wasActive && isActive()) {invokeLater(new Runnable() {@Overridepublic void run() {pipeline.fireChannelActive();}});}safeSetSuccess(promise);}

14.接着调用到了AbstractUnsafe的外部类的继承类的doBind方法,也就是NioServerSOcketChannel.doBind,这个就是调用原生的ServeSocketChannel进行bind.

protected void doBind(SocketAddress localAddress) throws Exception {if (PlatformDependent.javaVersion() >= 7) {javaChannel().bind(localAddress, config.getBacklog());} else {javaChannel().socket().bind(localAddress, config.getBacklog());}}

15.绑定成功后会调用pipeline.fireChannelActive();这个是从头节点一直向后传递事件。在头节点的

channelActive方法中会触发readIfIsAutoRead
      @Overridepublic final ChannelPipeline fireChannelActive() {AbstractChannelHandlerContext.invokeChannelActive(head);return this;}static void invokeChannelActive(final AbstractChannelHandlerContext next) {EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeChannelActive();} else {executor.execute(new Runnable() {@Overridepublic void run() {next.invokeChannelActive();}});}}final class HeadContext extends AbstractChannelHandlerContextimplements ChannelOutboundHandler, ChannelInboundHandler {@Overridepublic void channelActive(ChannelHandlerContext ctx) {ctx.fireChannelActive();readIfIsAutoRead();}

16.readIfIsAutoRead就是channel如果开启了自动读,则开始设置读关注事件到eventLoop的selector中。这个读会从管道的读-》tail的读->一直向前找支持读的handler进行处理。这个就是找到了head的handler

       private void readIfIsAutoRead() {if (channel.config().isAutoRead()) {channel.read();}}final class HeadContext extends AbstractChannelHandlerContextimplements ChannelOutboundHandler, ChannelInboundHandler {public void read(ChannelHandlerContext ctx) {unsafe.beginRead();}

17.headHandler的beginRead就调用了unsafe.beginRead方法,这个会调用外部channel.

doBeginRead--》AbstractNioChannel
AbstractNioChannel  protected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was calledfinal SelectionKey selectionKey = this.selectionKey;if (!selectionKey.isValid()) {return;}readPending = true;final int interestOps = selectionKey.interestOps();if ((interestOps & readInterestOp) == 0) {selectionKey.interestOps(interestOps | readInterestOp);}}

这个就是将读事件加到channel的selectKey的关注事件中。

四、接收新连接流程

1.我们回到上一节的3,9步,为父类NioServerSocketChannel的管道中新增了一个handler,

ServerBootstrapAcceptor,用来处理父类的接收数据,也就是新连接请求。当有新连接进来时,会触发到ServerBootstrapAcceptor.channelRead

2.ServerBootstrapAcceptor.channelRead 方法是怎么被调用的呢? 其实当一个 client 连接到 server 时, Java 底层的 NIO ServerSocketChannel 会有一个 SelectionKey.OP_ACCEPT 就绪事件, 接着就会调用到 NioServerSocketChannel.doReadMessages:

NioServerSocketChannel
protected int doReadMessages(List<Object> buf) throws Exception {SocketChannel ch = SocketUtils.accept(javaChannel());try {if (ch != null) {buf.add(new NioSocketChannel(this, ch));return 1;}} catch (Throwable t) {logger.warn("Failed to create a new channel from an accepted socket.", t);try {ch.close();} catch (Throwable t2) {logger.warn("Failed to close a socket.", t2);}}return 0;}

在 doReadMessages 中, 通过 javaChannel().accept() 获取到客户端新连接的 SocketChannel, 接着就实例化一个 NioSocketChannel, 并且传入 NioServerSocketChannel 对象(即 this), 由此可知, 我们创建的这个 NioSocketChannel 的父 Channel 就是 NioServerSocketChannel 实例 .

3.ServerBootstrapAcceptor.channelRead方法,这个就是把子channel的管道添加子handler,并且把子channel注册到子group的eventLoop的selector中。流程跟父类channel注册一样。

  public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;child.pipeline().addLast(childHandler);setChannelOptions(child, childOptions, logger);setAttributes(child, childAttrs);try {childGroup.register(child)}

4.还有个读完成事件,里面跟上一节的第16步一样,都是将当前channel的selectKey中加入读关注事件,以便接收读数据。

        @Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.fireChannelReadComplete();readIfIsAutoRead();}

当然这个事件会反复调用,然后判断加过读事件,就不重复加了。

ServerBootstrap的启动流程相关推荐

  1. Netty源码分析第1章(Netty启动流程)----第4节: 注册多路复用

    Netty源码分析第1章(Netty启动流程)---->第4节: 注册多路复用 Netty源码分析第一章:Netty启动流程   第四节:注册多路复用 回顾下以上的小节, 我们知道了channe ...

  2. Netty实战 IM即时通讯系统(五)客户端启动流程

    ## Netty实战 IM即时通讯系统(五)客户端启动流程 零. 目录 IM系统简介 Netty 简介 Netty 环境配置 服务端启动流程 实战: 客户端和服务端双向通信 数据传输载体ByteBuf ...

  3. Netty实战 IM即时通讯系统(四)服务端启动流程

    ## Netty实战 IM即时通讯系统(四)服务端启动流程 零. 目录 IM系统简介 Netty 简介 Netty 环境配置 服务端启动流程 实战: 客户端和服务端双向通信 数据传输载体ByteBuf ...

  4. 《netty入门与实战》笔记-02:服务端启动流程

    为什么80%的码农都做不了架构师?>>>    1.服务端启动流程 这一小节,我们来学习一下如何使用 Netty 来启动一个服务端应用程序,以下是服务端启动的一个非常精简的 Demo ...

  5. 源码分析Dubbo服务提供者启动流程-下篇

    本文继续上文Dubbo服务提供者启动流程,在上篇文章中详细梳理了从dubbo spring文件开始,Dubbo是如何加载配置文件,服务提供者dubbo:service标签服务暴露全流程,本节重点关注R ...

  6. Netty 源码解析系列-服务端启动流程解析

    netty源码解析系列 Netty 源码解析系列-服务端启动流程解析 Netty 源码解析系列-客户端连接接入及读I/O解析 五分钟就能看懂pipeline模型 -Netty 源码解析 1.服务端启动 ...

  7. BookKeeper源码解析之Bookie启动流程(一)

    BookKeeper(BK)启动流程 文章目录 BookKeeper(BK)启动流程 解析命令行参数 构建bookie所需的服务 构建状态(指标)服务 构建BookieService 构造内存分配器 ...

  8. 源码分析-Activity的启动流程

    以android 6.0源码为参考,其他版本api会稍有不同 在Activity中,启动一个Activity的方法 @Override public void startActivity(Intent ...

  9. Centos 6启动流程详解

    author:JevonWei 版权声明:原创作品 Centos6 启动流程 POST开机自检 当按下电源键后,会启动ROM芯片中的CMOS程序检查CPU.内存等硬件设备是否正常运行,CMOS中的程序 ...

最新文章

  1. linux编译安装jpeg,Linux下JPEG库安装脚本(转)
  2. 聊聊《柒个我》这部剧
  3. python 远程控制win10界面切换_Python3如何实现Win10桌面自动切换
  4. [trustzone]-TZC400学习总结
  5. BZOJ 1588: [HNOI2002]营业额统计
  6. 2021牛客暑期多校训练营1 I-Increasing Subsequence(期望dp+优化)
  7. 计算机网络第三章知识网络,计算机基础教案第三章计算机网络基础知识教案
  8. 一行代码为UITextField添加收键盘功能
  9. ubuntu修改用户的默认目录
  10. 机器学习特征与类型概述
  11. 仿探探交友小程序V7.0.2 完整安装包+小程序前端
  12. 卡诺模型(用户需求分析模型)
  13. 没有事业的女孩子很悲惨 - - - 一位老总的话!
  14. 新买的华为Matebook,Office没激活,激活方法在这里!!!
  15. 冶金工程在计算机应用,冶金工程专业计算机应用能力分析
  16. yocto sysroot说明
  17. C基础的ObjectiveC学习
  18. 根据主机名查询本机的ip地址
  19. 分布式框架DSF的搭建
  20. 中国象棋将帅问题java_编程之美:中国象棋将帅问题

热门文章

  1. SAP 4.6C升级ECC6.0 WS_QUERY 的改法
  2. 最早做无糖茶的统一茶里王,是怎样错过年轻人的?
  3. 寺库等奢侈品电商补贴下的奢侈品市场,会是怎样的未来
  4. 计算机系统最大的加速能力,系统加速我用Windows系统四大自带工具 -电脑资料
  5. 怎么更进一步学python_【百尺竿头,更进一步学Python】Python进阶课程——进程,线程和协程的区别...
  6. div+css的布局方式进行设计成品作业_原创响应式php企业成品网站,清晰风格版
  7. Python基础教程:一个单列split转换为多行的练习题
  8. 【Python教程】两种方法教你拆分含有多种分隔符的字符串
  9. python基础中apply()函数的正确用法
  10. python基础教程:类和对象