Bootstrap的意思就是引导,辅助的意思,在编写服务端或客户端程序时,我们都需要先new一个bootstrap,然后基于这个bootstrap调用函数,添加eventloop和handler,可见对bootstrap进行分析还是有必要的。

1、bootstrap结构图

bootstrap的结构比较简单,涉及的类和接口很少,如下图所示,其中Bootstrap则是客户端程序用的引导类,ServerBootstrap是服务端程序用的引导类。

2、serverbootstrap分析

这部分,专门对serverbootstrap进行分析,bootstrap过程大同小异就不作详细的分析了。下面是我们编写服务端代码的一般化过程,整个分析过程将基于下面这段代码中用到的函数进行。

// Configure the bootstrap.EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new HexDumpProxyInitializer(remoteHost, remotePort)).childOption(ChannelOption.AUTO_READ, false).bind(localPort).sync().channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}

先看关键代码(注意这里面的部分函数是在AbstractBootstrap中定义的)

    private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();private volatile EventLoopGroup childGroup;private volatile ChannelHandler childHandler;/*** Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These* {@link EventLoopGroup}'s are used to handle all the events and IO for {@link SocketChannel} and* {@link Channel}'s.*/public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {super.group(parentGroup);if (childGroup == null) {throw new NullPointerException("childGroup");}if (this.childGroup != null) {throw new IllegalStateException("childGroup set already");}this.childGroup = childGroup;return this;}

属性值ChildGroup,ChildHandler,是用来处理accpt的Channel的。group函数其实就是将parentGroup和ChildGroup进行赋值,其中parentGroup用于处理accept事件,ChildGroup用于处理accpt的Channel的IO事件。

    //channel函数的实现定义在抽象父类中,其实就是通过newInstance函数生成一个具体的channel对象。
<pre name="code" class="java">    /*** The {@link Class} which is used to create {@link Channel} instances from.* You either use this or {@link #channelFactory(ChannelFactory)} if your* {@link Channel} implementation has no no-args constructor.*/public B channel(Class<? extends C> channelClass) {if (channelClass == null) {throw new NullPointerException("channelClass");}return channelFactory(new BootstrapChannelFactory<C>(channelClass));}/*** {@link ChannelFactory} which is used to create {@link Channel} instances from* when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)}* is not working for you because of some more complex needs. If your {@link Channel} implementation* has a no-args constructor, its highly recommend to just use {@link #channel(Class)} for* simplify your code.*/@SuppressWarnings("unchecked")public B channelFactory(ChannelFactory<? extends C> channelFactory) {if (channelFactory == null) {throw new NullPointerException("channelFactory");}if (this.channelFactory != null) {throw new IllegalStateException("channelFactory set already");}this.channelFactory = channelFactory;return (B) this;}<pre name="code" class="java">    private static final class BootstrapChannelFactory<T extends Channel> implements ChannelFactory<T> {private final Class<? extends T> clazz;BootstrapChannelFactory(Class<? extends T> clazz) {this.clazz = clazz;}@Overridepublic T newChannel() {try {return clazz.newInstance();} catch (Throwable t) {throw new ChannelException("Unable to create Channel from class " + clazz, t);}}@Overridepublic String toString() {return clazz.getSimpleName() + ".class";}}

Channel函数比较简单,其实就是通过newInstance函数,生成一个具体的Channel对象,例如服务端的NioServerSocketChannel。

    /*** Set the {@link ChannelHandler} which is used to serve the request for the {@link Channel}'s.*/public ServerBootstrap childHandler(ChannelHandler childHandler) {if (childHandler == null) {throw new NullPointerException("childHandler");}this.childHandler = childHandler;return this;}

上面的函数即给serverbootstrap的childHandler赋值。

    /*** Allow to specify a {@link ChannelOption} which is used for the {@link Channel} instances once they get created* (after the acceptor accepted the {@link Channel}). Use a value of {@code null} to remove a previous set* {@link ChannelOption}.*/public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {if (childOption == null) {throw new NullPointerException("childOption");}if (value == null) {synchronized (childOptions) {childOptions.remove(childOption);}} else {synchronized (childOptions) {childOptions.put(childOption, value);}}return this;}

上面的函数是指定accpt的channel的属性,channel有很多属性,比如SO_TIMEOUT时间,Buf长度等等。

    /*** Create a new {@link Channel} and bind it.*/public ChannelFuture bind() {validate();SocketAddress localAddress = this.localAddress;if (localAddress == null) {throw new IllegalStateException("localAddress not set");}return doBind(localAddress);}/*** Create a new {@link Channel} and bind it.*/public ChannelFuture bind(int inetPort) {return bind(new InetSocketAddress(inetPort));}/*** Create a new {@link Channel} and bind it.*/public ChannelFuture bind(String inetHost, int inetPort) {return bind(new InetSocketAddress(inetHost, inetPort));}<pre name="code" class="java">    /*** Create a new {@link Channel} and bind it.*/public ChannelFuture bind(SocketAddress localAddress) {validate();if (localAddress == null) {throw new NullPointerException("localAddress");}return doBind(localAddress);}private ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regPromise = initAndRegister();final Channel channel = regPromise.channel();final ChannelPromise promise = channel.newPromise();if (regPromise.isDone()) {doBind0(regPromise, channel, localAddress, promise);} else {regPromise.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {doBind0(future, channel, localAddress, promise);}});}return promise;}<pre name="code" class="java">    private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {// This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up// the pipeline in its channelRegistered() implementation.channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});}

Bind函数层层调用过来之后,最后就调用Channel的bind函数了,下面再看channel的bind函数是如何处理的。定义在AbstractChannel中:

    @Overridepublic ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return pipeline.bind(localAddress, promise);}

channel的bind函数,最终就是调用pipeline的bind,而pipeline的bind实际上就是调用contexthandler的bind,之个之前分析write和flush的时候说过了。所以这里直接看contexthandler的bind函数。下面是定义:

    @Overridepublic ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {if (localAddress == null) {throw new NullPointerException("localAddress");}validatePromise(promise, false);final DefaultChannelHandlerContext next = findContextOutbound();EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeBind(localAddress, promise);} else {executor.execute(new Runnable() {@Overridepublic void run() {next.invokeBind(localAddress, promise);}});}return promise;}<pre name="code" class="java">    private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {try {((ChannelOutboundHandler) handler).bind(this, localAddress, promise);} catch (Throwable t) {notifyOutboundHandlerException(t, promise);}}

最终调用Handler的bind函数,还记得之前说的outbound类型的事件吗,这类事件提供了默认的实现方法,HeadHandler的bind函数,下面是它的定义:

        @Overridepublic void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)throws Exception {unsafe.bind(localAddress, promise);}

我们又看到了unsafe这个苦力了,最终的操作还是得由它来完成啊,赶紧去看看这个bind函数吧,

        @Overridepublic final void bind(final SocketAddress localAddress, final ChannelPromise promise) {if (!ensureOpen(promise)) {return;}// See: https://github.com/netty/netty/issues/576if (!PlatformDependent.isWindows() && !PlatformDependent.isRoot() &&Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&localAddress instanceof InetSocketAddress &&!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress()) {// Warn a user about the fact that a non-root user can't receive a// broadcast packet on *nix if the socket is bound on non-wildcard address.logger.warn("A non-root user can't receive a broadcast packet if the socket " +"is not bound to a wildcard address; binding to a non-wildcard " +"address (" + localAddress + ") anyway as requested.");}boolean wasActive = isActive();try {doBind(localAddress);} catch (Throwable t) {closeIfClosed();promise.setFailure(t);return;}if (!wasActive && isActive()) {invokeLater(new Runnable() {@Overridepublic void run() {pipeline.fireChannelActive();}});}promise.setSuccess();}

上面的代码最终调用了Channel的doBind函数,这里我们的Channel是NioServerSocketChannel,所以最终就是调用它的bind函数了,代码如下

    @Overrideprotected void doBind(SocketAddress localAddress) throws Exception {javaChannel().socket().bind(localAddress, config.getBacklog());}

其实它最终也是调用了JDK的Channel的socket bind函数。

看到这里,你是否会觉得有点怪异,为什么没有注册accpt事件啊,一般的我们的server socket都是要注册accpt事件到selector,用于监听连接。如果你发现了这个问题,说明你是理解socket的编程的,^_^。实际上是前面在分析bind的时候我们漏掉了一个重要的函数,initAndRegister,下面再来看看它的定义:

    final ChannelFuture initAndRegister() {final Channel channel = channelFactory().newChannel();try {init(channel);} catch (Throwable t) {channel.unsafe().closeForcibly();return channel.newFailedFuture(t);}ChannelPromise regPromise = channel.newPromise();group().register(channel, regPromise);if (regPromise.cause() != null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}// If we are here and the promise is not failed, it's one of the following cases:// 1) If we attempted registration from the event loop, the registration has been completed at this point.//    i.e. It's safe to attempt bind() or connect() now beause the channel has been registered.// 2) If we attempted registration from the other thread, the registration request has been successfully//    added to the event loop's task queue for later execution.//    i.e. It's safe to attempt bind() or connect() now://         because bind() or connect() will be executed *after* the scheduled registration task is executed//         because register(), bind(), and connect() are all bound to the same thread.return regPromise;}

在这里,我们看到了我们之前介绍event时说的register函数,它就是用于将Channel注册到eventloop中去的。eventloop经过层层调用,最终调用了SingleThreadEventLoop类中的register函数,下面是它的定义:

    @Overridepublic ChannelFuture register(final Channel channel, final ChannelPromise promise) {if (channel == null) {throw new NullPointerException("channel");}if (promise == null) {throw new NullPointerException("promise");}channel.unsafe().register(this, promise);return promise;}

还是逃离不了unsafe对象的调用,前面说了那么多的unsafe,这个函数猜都可以猜测出执行过程了,这里就不细细的列举代码了。

还有一个init函数,这里需要说明一下,代码如下:

    @Overridevoid init(Channel channel) throws Exception {final Map<ChannelOption<?>, Object> options = options();synchronized (options) {channel.config().setOptions(options);}final Map<AttributeKey<?>, Object> attrs = attrs();synchronized (attrs) {for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked")AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();channel.attr(key).set(e.getValue());}}ChannelPipeline p = channel.pipeline();if (handler() != null) {p.addLast(handler());}final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));}synchronized (childAttrs) {currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));}p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new ServerBootstrapAcceptor(currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}

它就是用来处理channel 的pipeline,并添加一个ServerBootstrapAcceptor的handler,继续看看这个handler的定义,我们就会明白它的意图。

    private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {private final EventLoopGroup childGroup;private final ChannelHandler childHandler;private final Entry<ChannelOption<?>, Object>[] childOptions;private final Entry<AttributeKey<?>, Object>[] childAttrs;@SuppressWarnings("unchecked")ServerBootstrapAcceptor(EventLoopGroup childGroup, ChannelHandler childHandler,Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {this.childGroup = childGroup;this.childHandler = childHandler;this.childOptions = childOptions;this.childAttrs = childAttrs;}@Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {Channel child = (Channel) msg;child.pipeline().addLast(childHandler);for (Entry<ChannelOption<?>, Object> e: childOptions) {try {if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {logger.warn("Unknown channel option: " + e);}} catch (Throwable t) {logger.warn("Failed to set a channel option: " + child, t);}}for (Entry<AttributeKey<?>, Object> e: childAttrs) {child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());}try {childGroup.register(child);} catch (Throwable t) {child.unsafe().closeForcibly();logger.warn("Failed to register an accepted channel: " + child, t);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {final ChannelConfig config = ctx.channel().config();if (config.isAutoRead()) {// stop accept new connections for 1 second to allow the channel to recover// See https://github.com/netty/netty/issues/1328config.setAutoRead(false);ctx.channel().eventLoop().schedule(new Runnable() {@Overridepublic void run() {config.setAutoRead(true);}}, 1, TimeUnit.SECONDS);}// still let the exceptionCaught event flow through the pipeline to give the user// a chance to do something with itctx.fireExceptionCaught(cause);}}

上面就是这个handler的全部代码,它重写了ChannelRead函数,它的目的其实是想将server端accept的channel注册到ChildGroup的eventloop中,这样就可以理解,服务端代码workerGroup这个eventloop的作用了,它终于在这里体现出了它的作用了。

3、总结

这篇文章主要是分析了serverbootstrap的全过程,通过对这个的分析,我们清晰的看到了平时编写socket服务端代码时对bind,register事件,以及accept channel等的处理。

http://blog.csdn.net/pingnanlee/article/details/11973769

netty4.0.x源码分析—bootstrap相关推荐

  1. Spark2.4.0 SparkEnv 源码分析

    Spark2.4.0 SparkEnv 源码分析 更多资源 github: https://github.com/opensourceteams/spark-scala-maven-2.4.0 时序图 ...

  2. 菜鸟读jQuery 2.0.3 源码分析系列(1)

    原文链接在这里,作为一个菜鸟,我就一边读一边写 jQuery 2.0.3 源码分析系列 前面看着差不多了,看到下面一条(我是真菜鸟),推荐木有入门或者刚刚JS入门摸不着边的看看,大大们手下留情,想一起 ...

  3. Android 11.0 Settings源码分析 - 主界面加载

    Android 11.0 Settings源码分析 - 主界面加载 本篇主要记录AndroidR Settings源码主界面加载流程,方便后续工作调试其流程. Settings代码路径: packag ...

  4. Android 8.0系统源码分析--Camera processCaptureResult结果回传源码分析

    相机,从上到下概览一下,真是太大了,上面的APP->Framework->CameraServer->CameraHAL,HAL进程中Pipeline.接各种算法的Node.再往下的 ...

  5. photoshop-v.1.0.1源码分析第三篇–FilterInterface.p

    photoshop-v.1.0.1源码分析第三篇–FilterInterface.p 总体预览 一.源码预览 二.语法解释 三.结构预览 四:语句分析 五:思维导图 六:疑留问题 一.源码预览 {Ph ...

  6. Spark2.0.2源码分析——RPC 通信机制(消息处理)

    RPC 是一种远程过程的调用,即两台节点之间的数据传输. 每个组件都有它自己的执行环境,RPC 的执行环境就是 RPCENV,RPCENV 是 Spark 2.x.x 新增加的,用于替代之前版本的 a ...

  7. Pushlet 2.0.3 源码分析

    转载地址:http://blog.csdn.net/yxw246/article/details/2418255 Pushlet 2.0.3 源码分析 ----服务器端 1 总体架构 Pushlet从 ...

  8. jQuery 2.0.3 源码分析 Deferred(最细的实现剖析,带图)

    Deferred的概念请看第一篇 http://www.cnblogs.com/aaronjs/p/3348569.html ******************构建Deferred对象时候的流程图* ...

  9. 最细的实现剖析:jQuery 2.0.3源码分析Deferred

    Deferred的概念请看第一篇 http://www.cnblogs.com/aaronjs/p/3348569.html **构建Deferred对象时候的流程图** **源码解析** 因为cal ...

最新文章

  1. dev c++怎么设置断点_Linux怎么挂载移动硬盘光盘U盘之案例分享
  2. 前端实现连连看小游戏(1)
  3. php如何禁用浏览器的缓存,php如何禁止浏览器使用缓存页面
  4. 【P1063】 能量项链
  5. C# 全角半角相互转换
  6. Java多线程学习三十:ThreadLocal 适合用在哪些实际生产的场景中
  7. jquery templates jQuery html模板
  8. 由于UPS故障,造成所有服务器断电。( 重启后,机器的IP也许会发生改变(包括服务器))
  9. 未来人工智能应用体现出的核心技术有哪些?
  10. Using Beyond Compare with Version Control Systems(ZZ)
  11. PDF如何编辑,怎么修改PDF中的文字
  12. Android蓝牙自动配对和Pin码设置
  13. 计算机公式除以键是,excel函数的除法公式(整数及余数)《计算机除法函数公式》...
  14. win7系统设置cmd窗口默认以管理员权限运行
  15. 计算机硬盘数据清零,彻底清除Windows电脑磁盘数据
  16. HPE主机根据磁盘序列号或位置确定Naa号
  17. Asp.Net Mvc基于Fleck开发的多人网页版即时聊天室
  18. 5分钟学会Python爬虫神器autoscraper——自动化爬虫必备
  19. Grid ++ MIME 类型配置 载入报表数据,检查此URL及其数据,错误提示 网络服务器响应不成功
  20. 硬盘串口和并口的区别

热门文章

  1. 如何使用“Hash文件信息校验” 工具
  2. pyinstaller打包后读不到配置文件的解决方法
  3. 汇编语言介绍,内存和总线的初步认识
  4. Vue.js 插件开发详解
  5. 第四讲 deque
  6. 如何实现Asp与Asp.Net共享Session
  7. c# mvc html.beginform,asp.net-mvc – 使用Html.BeginForm()与自定义路由
  8. ASP.NET-----Repeater数据控件的用法总结
  9. 学生成绩管理系统数据库设计
  10. const 和 #define区别