ServerBootstrap的启动流程
一、ServerBootstrap的启动示例代码
EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup(new NamedThreadFactory("bossThread",false));EventLoopGroup workEventLoopGroup = new NioEventLoopGroup(new NamedThreadFactory("workThread",false));ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossEventLoopGroup,workEventLoopGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG,128).handler(new LoggingHandler(LogLevel.INFO)).childHandler( new PojoServerIntitlizer());try {log.debug(" will start server");ChannelFuture closeFuture = bootstrap.bind(port).sync();log.debug(" server is closing");closeFuture.channel().closeFuture().sync();log.debug(" server is closed");} catch (InterruptedException e) {e.printStackTrace();}finally {bossEventLoopGroup.shutdownGracefully();workEventLoopGroup.shutdownGracefully();log.debug(" release event loop group");}
二、初始化流程
1.ServerBootstrap和bootStrap继承于AbstractBootStrap,都使用模板方式。AbstractBootStrap类有几个比较重要的成员变量。
group:ServerSocketChannel的EventLoopGroup,即reactor-accept的BOSS事件循环线程池组。
channelFactory:创建channel的工厂类,如创建ServerSocketChannel,SocketChannel,
handler:reactor-accept的事件处理类。
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {volatile EventLoopGroup group;@SuppressWarnings("deprecation")private volatile ChannelFactory<? extends C> channelFactory;private volatile SocketAddress localAddress;private volatile ChannelHandler handler;
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);private volatile EventLoopGroup childGroup;private volatile ChannelHandler childHandler;
2.设置参数流程。
bootstrap.group(bossEventLoopGroup,workEventLoopGroup) bossEventLoopGroup设置AbstractBootStrap的group,workEventLoopGroup设置为ServerBootstrap的childGroup,
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {super.group(parentGroup);if (this.childGroup != null) {throw new IllegalStateException("childGroup set already");}this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");return this;}
channel(NioServerSocketChannel.class) 生成一个根据指定SocketChannel类的反射CHANNEL工厂类,就是根据传入的类反射生成对象。这是设置AbstractBootStrap的类的channelFactory
public B channel(Class<? extends C> channelClass) {return channelFactory(new ReflectiveChannelFactory<C>(ObjectUtil.checkNotNull(channelClass, "channelClass")));}
.handler(new LoggingHandler(LogLevel.INFO)) 这是设置AbstractBootStrap的类的handler
public B handler(ChannelHandler handler) {this.handler = ObjectUtil.checkNotNull(handler, "handler");return self();}
.childHandler( new PojoServerIntitlizer());这是设置ServerBootstrap子类的childHandler
public ServerBootstrap childHandler(ChannelHandler childHandler) {this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");return this;}
三、绑定流程
1.ChannelFuture closeFuture = bootstrap.bind(port).sync();这为起点,会调用AbstractBootStrap的doBind方法
private ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regFuture = initAndRegister();final Channel channel = regFuture.channel();ChannelPromise promise = channel.newPromise();doBind0(regFuture, channel, localAddress, promise);return promise;}
2.这里面首先调用initAndRegister,会先创建ServerSocketChannel,并且将channel注册到NioEventLoop的selector中,也就是前文的channel.register(selector,0,eventLoop)
final ChannelFuture initAndRegister() {Channel channel = null;try {channel = channelFactory.newChannel();init(channel);} catch (Throwable t) {}ChannelFuture regFuture = config().group().register(channel);return regFuture;}
3.newChannel就是通过前面生成的ReflectChannelFactory调用反射生成一个NioServerSocketChannel对象,这里看一下类的继承图,
AbstractChannel 有两个成员变量unsafe和pipline,unsafe为操作底层网络IO的接口。pipline也就是我们的管道事件流。也就是说每个channel会自带一个pipline
protected AbstractChannel(Channel parent) {this.parent = parent;id = newId();unsafe = newUnsafe();pipeline = newChannelPipeline();}protected DefaultChannelPipeline newChannelPipeline() {return new DefaultChannelPipeline(this);}
3.创建完channel后,我们来看下初始化流程,这个会调用到子类(ServerBootStrap)的init方法,
这个会做两个事件,
(1).设置channel的连接选项和属性。
(2).添加新的handler
这里会为ServerSocketChannel的管道中新增一个handler,用来处理accept-reactor的IO流读取事件,也就是新连接事件。ServerBootstrapAcceptor这个类就是来处理新连接,并注册accept的socketChannel并注册到childGroup的work事件循环以及内部eventLoop的selector中,并且设置子socketChannel的handler为childHandler.这个我们留在接收新连接的流程中讲解。
void init(Channel channel) {setChannelOptions(channel, this.newOptionsArray(), logger);setAttributes(channel, (Entry[])this.attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));ChannelPipeline p = channel.pipeline();final EventLoopGroup currentChildGroup = this.childGroup;final ChannelHandler currentChildHandler = this.childHandler;p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {public void initChannel(final Channel ch) {final ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = ServerBootstrap.this.config.handler();if (handler != null) {pipeline.addLast(new ChannelHandler[]{handler});}ch.eventLoop().execute(new Runnable() {public void run() {pipeline.addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});}});}}});}
4.现在回到第2步,初始化完成了,现在开始将serverSocketChannel注册到bossEventLoop的selector中。ChannelFuture regFuture = config().group().register(channel);这个会调用MultithreadEventLoopGroup.register,也就是从事件循环组的子eventLoop中取下一个NioEventLoop进行注册分配。也就是work-reactor的channel注册机制。
@Overridepublic EventLoop next() {return (EventLoop) super.next();}@Overridepublic ChannelFuture register(Channel channel) {return next().register(channel);}
5.我们的NioEventLoop为SingleThreadEventLoop,这个会将channel,当前的eventLoop对象封装成一个promise,进行注册。
public ChannelFuture register(Channel channel) {return this.register((ChannelPromise)(new DefaultChannelPromise(channel, this)));
}
6.接着找到 ServerSocketChannel的内部的unsafe对象进行注册。这个unsafe就是NioMessageUnsafe
public ChannelFuture register(ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");promise.channel().unsafe().register(this, promise);return promise;}public abstract class AbstractNioMessageChannel extends AbstractNioChannel {@Overrideprotected AbstractNioUnsafe newUnsafe() {return new NioMessageUnsafe();}
}
7.由于NioMessageUnsafe继承于AbstractUnsafe,所以调用此类的register,这个方法就是首先调用Channel.register,然后触发管道的handlerAdd,channelRegister,channelActive方法。
public final void register(EventLoop eventLoop, final ChannelPromise promise) {ObjectUtil.checkNotNull(eventLoop, "eventLoop");AbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {this.register0(promise);} }private void register0(ChannelPromise promise) {try {boolean firstRegistration = this.neverRegistered;AbstractChannel.this.doRegister();this.neverRegistered = false;AbstractChannel.this.registered = true;AbstractChannel.this.pipeline.invokeHandlerAddedIfNeeded();this.safeSetSuccess(promise);AbstractChannel.this.pipeline.fireChannelRegistered();if (AbstractChannel.this.isActive()) {if (firstRegistration) {AbstractChannel.this.pipeline.fireChannelActive();} else if (AbstractChannel.this.config().isAutoRead()) {this.beginRead();}}}}
8.AbstractChannel.this.doRegister();这个就是把ServerSocketChannel注册到NioEventLoop的selector中去。这时还没有监听事件。
AbstractNioChannelprotected void doRegister() throws Exception {boolean selected = false;while(true) {try {this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);return;} }}
生成的selectKey如下:
9.invokeHandlerAddedIfNeeded会调用到父类CHANNEL管道的channelHandler,也就是在我们初始化时第三步加入的handler
在这里才会真正触发handler的initChannel方法,生成第三步的ServerBootstrapAcceptor,初始化处理新的子连接的channelReader的处理器。
10.现在初始化和注册完成了,我们再回到第一步,进行doBind0(regFuture, channel, localAddress, promise)
AbstractBootstrap
private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});}
11.channel.bind,会调用内部管道的bind,然后到tail的BIND,再到AbstractChannelHandlerContext的bind,因为tail就是继承于AbstractChannelHandlerContext,这里面的BIND就是从尾部向前找,找一个带有bind标签的HANDLER进行处理。最终找到了header.ChannelHandlerContext(DefaultChannelPipeline$HeadContext#0, [id: 0x159b9902])
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return pipeline.bind(localAddress, promise);}public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return tail.bind(localAddress, promise);}public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {ObjectUtil.checkNotNull(localAddress, "localAddress");if (isNotValidPromise(promise, false)) {// cancelledreturn promise;}final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);EventExecutor executor = next.executor();next.invokeBind(localAddress, promise);return promise;}
12.最终到了head的BIND,这里面调用了unsafe本地IO类的bind.
final class HeadContext extends AbstractChannelHandlerContextimplements ChannelOutboundHandler, ChannelInboundHandler {@Overridepublic void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {unsafe.bind(localAddress, promise);}
13.最终调用到AbstractChannel的内部类AbstractUnsafe的bind,
@Overridepublic final void bind(final SocketAddress localAddress, final ChannelPromise promise) {boolean wasActive = isActive();try {doBind(localAddress);} catch (Throwable t) {safeSetFailure(promise, t);closeIfClosed();return;}if (!wasActive && isActive()) {invokeLater(new Runnable() {@Overridepublic void run() {pipeline.fireChannelActive();}});}safeSetSuccess(promise);}
14.接着调用到了AbstractUnsafe的外部类的继承类的doBind方法,也就是NioServerSOcketChannel.doBind,这个就是调用原生的ServeSocketChannel进行bind.
protected void doBind(SocketAddress localAddress) throws Exception {if (PlatformDependent.javaVersion() >= 7) {javaChannel().bind(localAddress, config.getBacklog());} else {javaChannel().socket().bind(localAddress, config.getBacklog());}}
15.绑定成功后会调用pipeline.fireChannelActive();这个是从头节点一直向后传递事件。在头节点的
channelActive方法中会触发readIfIsAutoRead
@Overridepublic final ChannelPipeline fireChannelActive() {AbstractChannelHandlerContext.invokeChannelActive(head);return this;}static void invokeChannelActive(final AbstractChannelHandlerContext next) {EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeChannelActive();} else {executor.execute(new Runnable() {@Overridepublic void run() {next.invokeChannelActive();}});}}final class HeadContext extends AbstractChannelHandlerContextimplements ChannelOutboundHandler, ChannelInboundHandler {@Overridepublic void channelActive(ChannelHandlerContext ctx) {ctx.fireChannelActive();readIfIsAutoRead();}
16.readIfIsAutoRead就是channel如果开启了自动读,则开始设置读关注事件到eventLoop的selector中。这个读会从管道的读-》tail的读->一直向前找支持读的handler进行处理。这个就是找到了head的handler
private void readIfIsAutoRead() {if (channel.config().isAutoRead()) {channel.read();}}final class HeadContext extends AbstractChannelHandlerContextimplements ChannelOutboundHandler, ChannelInboundHandler {public void read(ChannelHandlerContext ctx) {unsafe.beginRead();}
17.headHandler的beginRead就调用了unsafe.beginRead方法,这个会调用外部channel.
doBeginRead--》AbstractNioChannel
AbstractNioChannel protected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was calledfinal SelectionKey selectionKey = this.selectionKey;if (!selectionKey.isValid()) {return;}readPending = true;final int interestOps = selectionKey.interestOps();if ((interestOps & readInterestOp) == 0) {selectionKey.interestOps(interestOps | readInterestOp);}}
这个就是将读事件加到channel的selectKey的关注事件中。
四、接收新连接流程
1.我们回到上一节的3,9步,为父类NioServerSocketChannel的管道中新增了一个handler,
ServerBootstrapAcceptor,用来处理父类的接收数据,也就是新连接请求。当有新连接进来时,会触发到ServerBootstrapAcceptor.channelRead
2.ServerBootstrapAcceptor.channelRead 方法是怎么被调用的呢? 其实当一个 client 连接到 server 时, Java 底层的 NIO ServerSocketChannel 会有一个 SelectionKey.OP_ACCEPT 就绪事件, 接着就会调用到 NioServerSocketChannel.doReadMessages:
NioServerSocketChannel
protected int doReadMessages(List<Object> buf) throws Exception {SocketChannel ch = SocketUtils.accept(javaChannel());try {if (ch != null) {buf.add(new NioSocketChannel(this, ch));return 1;}} catch (Throwable t) {logger.warn("Failed to create a new channel from an accepted socket.", t);try {ch.close();} catch (Throwable t2) {logger.warn("Failed to close a socket.", t2);}}return 0;}
在 doReadMessages 中, 通过 javaChannel().accept() 获取到客户端新连接的 SocketChannel, 接着就实例化一个 NioSocketChannel, 并且传入 NioServerSocketChannel 对象(即 this), 由此可知, 我们创建的这个 NioSocketChannel 的父 Channel 就是 NioServerSocketChannel 实例 .
3.ServerBootstrapAcceptor.channelRead方法,这个就是把子channel的管道添加子handler,并且把子channel注册到子group的eventLoop的selector中。流程跟父类channel注册一样。
public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;child.pipeline().addLast(childHandler);setChannelOptions(child, childOptions, logger);setAttributes(child, childAttrs);try {childGroup.register(child)}
4.还有个读完成事件,里面跟上一节的第16步一样,都是将当前channel的selectKey中加入读关注事件,以便接收读数据。
@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.fireChannelReadComplete();readIfIsAutoRead();}
当然这个事件会反复调用,然后判断加过读事件,就不重复加了。
ServerBootstrap的启动流程相关推荐
- Netty源码分析第1章(Netty启动流程)----第4节: 注册多路复用
Netty源码分析第1章(Netty启动流程)---->第4节: 注册多路复用 Netty源码分析第一章:Netty启动流程 第四节:注册多路复用 回顾下以上的小节, 我们知道了channe ...
- Netty实战 IM即时通讯系统(五)客户端启动流程
## Netty实战 IM即时通讯系统(五)客户端启动流程 零. 目录 IM系统简介 Netty 简介 Netty 环境配置 服务端启动流程 实战: 客户端和服务端双向通信 数据传输载体ByteBuf ...
- Netty实战 IM即时通讯系统(四)服务端启动流程
## Netty实战 IM即时通讯系统(四)服务端启动流程 零. 目录 IM系统简介 Netty 简介 Netty 环境配置 服务端启动流程 实战: 客户端和服务端双向通信 数据传输载体ByteBuf ...
- 《netty入门与实战》笔记-02:服务端启动流程
为什么80%的码农都做不了架构师?>>> 1.服务端启动流程 这一小节,我们来学习一下如何使用 Netty 来启动一个服务端应用程序,以下是服务端启动的一个非常精简的 Demo ...
- 源码分析Dubbo服务提供者启动流程-下篇
本文继续上文Dubbo服务提供者启动流程,在上篇文章中详细梳理了从dubbo spring文件开始,Dubbo是如何加载配置文件,服务提供者dubbo:service标签服务暴露全流程,本节重点关注R ...
- Netty 源码解析系列-服务端启动流程解析
netty源码解析系列 Netty 源码解析系列-服务端启动流程解析 Netty 源码解析系列-客户端连接接入及读I/O解析 五分钟就能看懂pipeline模型 -Netty 源码解析 1.服务端启动 ...
- BookKeeper源码解析之Bookie启动流程(一)
BookKeeper(BK)启动流程 文章目录 BookKeeper(BK)启动流程 解析命令行参数 构建bookie所需的服务 构建状态(指标)服务 构建BookieService 构造内存分配器 ...
- 源码分析-Activity的启动流程
以android 6.0源码为参考,其他版本api会稍有不同 在Activity中,启动一个Activity的方法 @Override public void startActivity(Intent ...
- Centos 6启动流程详解
author:JevonWei 版权声明:原创作品 Centos6 启动流程 POST开机自检 当按下电源键后,会启动ROM芯片中的CMOS程序检查CPU.内存等硬件设备是否正常运行,CMOS中的程序 ...
最新文章
- linux编译安装jpeg,Linux下JPEG库安装脚本(转)
- 聊聊《柒个我》这部剧
- python 远程控制win10界面切换_Python3如何实现Win10桌面自动切换
- [trustzone]-TZC400学习总结
- BZOJ 1588: [HNOI2002]营业额统计
- 2021牛客暑期多校训练营1 I-Increasing Subsequence(期望dp+优化)
- 计算机网络第三章知识网络,计算机基础教案第三章计算机网络基础知识教案
- 一行代码为UITextField添加收键盘功能
- ubuntu修改用户的默认目录
- 机器学习特征与类型概述
- 仿探探交友小程序V7.0.2 完整安装包+小程序前端
- 卡诺模型(用户需求分析模型)
- 没有事业的女孩子很悲惨 - - - 一位老总的话!
- 新买的华为Matebook,Office没激活,激活方法在这里!!!
- 冶金工程在计算机应用,冶金工程专业计算机应用能力分析
- yocto sysroot说明
- C基础的ObjectiveC学习
- 根据主机名查询本机的ip地址
- 分布式框架DSF的搭建
- 中国象棋将帅问题java_编程之美:中国象棋将帅问题
热门文章
- SAP 4.6C升级ECC6.0 WS_QUERY 的改法
- 最早做无糖茶的统一茶里王,是怎样错过年轻人的?
- 寺库等奢侈品电商补贴下的奢侈品市场,会是怎样的未来
- 计算机系统最大的加速能力,系统加速我用Windows系统四大自带工具 -电脑资料
- 怎么更进一步学python_【百尺竿头,更进一步学Python】Python进阶课程——进程,线程和协程的区别...
- div+css的布局方式进行设计成品作业_原创响应式php企业成品网站,清晰风格版
- Python基础教程:一个单列split转换为多行的练习题
- 【Python教程】两种方法教你拆分含有多种分隔符的字符串
- python基础中apply()函数的正确用法
- python基础教程:类和对象