netty4.0.x源码分析—bootstrap
Bootstrap的意思就是引导,辅助的意思,在编写服务端或客户端程序时,我们都需要先new一个bootstrap,然后基于这个bootstrap调用函数,添加eventloop和handler,可见对bootstrap进行分析还是有必要的。
1、bootstrap结构图
bootstrap的结构比较简单,涉及的类和接口很少,如下图所示,其中Bootstrap则是客户端程序用的引导类,ServerBootstrap是服务端程序用的引导类。
2、serverbootstrap分析
这部分,专门对serverbootstrap进行分析,bootstrap过程大同小异就不作详细的分析了。下面是我们编写服务端代码的一般化过程,整个分析过程将基于下面这段代码中用到的函数进行。
// Configure the bootstrap.EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new HexDumpProxyInitializer(remoteHost, remotePort)).childOption(ChannelOption.AUTO_READ, false).bind(localPort).sync().channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}
先看关键代码(注意这里面的部分函数是在AbstractBootstrap中定义的)
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();private volatile EventLoopGroup childGroup;private volatile ChannelHandler childHandler;/*** Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These* {@link EventLoopGroup}'s are used to handle all the events and IO for {@link SocketChannel} and* {@link Channel}'s.*/public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {super.group(parentGroup);if (childGroup == null) {throw new NullPointerException("childGroup");}if (this.childGroup != null) {throw new IllegalStateException("childGroup set already");}this.childGroup = childGroup;return this;}
属性值ChildGroup,ChildHandler,是用来处理accpt的Channel的。group函数其实就是将parentGroup和ChildGroup进行赋值,其中parentGroup用于处理accept事件,ChildGroup用于处理accpt的Channel的IO事件。
//channel函数的实现定义在抽象父类中,其实就是通过newInstance函数生成一个具体的channel对象。
<pre name="code" class="java"> /*** The {@link Class} which is used to create {@link Channel} instances from.* You either use this or {@link #channelFactory(ChannelFactory)} if your* {@link Channel} implementation has no no-args constructor.*/public B channel(Class<? extends C> channelClass) {if (channelClass == null) {throw new NullPointerException("channelClass");}return channelFactory(new BootstrapChannelFactory<C>(channelClass));}/*** {@link ChannelFactory} which is used to create {@link Channel} instances from* when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)}* is not working for you because of some more complex needs. If your {@link Channel} implementation* has a no-args constructor, its highly recommend to just use {@link #channel(Class)} for* simplify your code.*/@SuppressWarnings("unchecked")public B channelFactory(ChannelFactory<? extends C> channelFactory) {if (channelFactory == null) {throw new NullPointerException("channelFactory");}if (this.channelFactory != null) {throw new IllegalStateException("channelFactory set already");}this.channelFactory = channelFactory;return (B) this;}<pre name="code" class="java"> private static final class BootstrapChannelFactory<T extends Channel> implements ChannelFactory<T> {private final Class<? extends T> clazz;BootstrapChannelFactory(Class<? extends T> clazz) {this.clazz = clazz;}@Overridepublic T newChannel() {try {return clazz.newInstance();} catch (Throwable t) {throw new ChannelException("Unable to create Channel from class " + clazz, t);}}@Overridepublic String toString() {return clazz.getSimpleName() + ".class";}}
Channel函数比较简单,其实就是通过newInstance函数,生成一个具体的Channel对象,例如服务端的NioServerSocketChannel。
/*** Set the {@link ChannelHandler} which is used to serve the request for the {@link Channel}'s.*/public ServerBootstrap childHandler(ChannelHandler childHandler) {if (childHandler == null) {throw new NullPointerException("childHandler");}this.childHandler = childHandler;return this;}
上面的函数即给serverbootstrap的childHandler赋值。
/*** Allow to specify a {@link ChannelOption} which is used for the {@link Channel} instances once they get created* (after the acceptor accepted the {@link Channel}). Use a value of {@code null} to remove a previous set* {@link ChannelOption}.*/public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {if (childOption == null) {throw new NullPointerException("childOption");}if (value == null) {synchronized (childOptions) {childOptions.remove(childOption);}} else {synchronized (childOptions) {childOptions.put(childOption, value);}}return this;}
上面的函数是指定accpt的channel的属性,channel有很多属性,比如SO_TIMEOUT时间,Buf长度等等。
/*** Create a new {@link Channel} and bind it.*/public ChannelFuture bind() {validate();SocketAddress localAddress = this.localAddress;if (localAddress == null) {throw new IllegalStateException("localAddress not set");}return doBind(localAddress);}/*** Create a new {@link Channel} and bind it.*/public ChannelFuture bind(int inetPort) {return bind(new InetSocketAddress(inetPort));}/*** Create a new {@link Channel} and bind it.*/public ChannelFuture bind(String inetHost, int inetPort) {return bind(new InetSocketAddress(inetHost, inetPort));}<pre name="code" class="java"> /*** Create a new {@link Channel} and bind it.*/public ChannelFuture bind(SocketAddress localAddress) {validate();if (localAddress == null) {throw new NullPointerException("localAddress");}return doBind(localAddress);}private ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regPromise = initAndRegister();final Channel channel = regPromise.channel();final ChannelPromise promise = channel.newPromise();if (regPromise.isDone()) {doBind0(regPromise, channel, localAddress, promise);} else {regPromise.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {doBind0(future, channel, localAddress, promise);}});}return promise;}<pre name="code" class="java"> private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up// the pipeline in its channelRegistered() implementation.channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});}
Bind函数层层调用过来之后,最后就调用Channel的bind函数了,下面再看channel的bind函数是如何处理的。定义在AbstractChannel中:
@Overridepublic ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return pipeline.bind(localAddress, promise);}
channel的bind函数,最终就是调用pipeline的bind,而pipeline的bind实际上就是调用contexthandler的bind,之个之前分析write和flush的时候说过了。所以这里直接看contexthandler的bind函数。下面是定义:
@Overridepublic ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {if (localAddress == null) {throw new NullPointerException("localAddress");}validatePromise(promise, false);final DefaultChannelHandlerContext next = findContextOutbound();EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeBind(localAddress, promise);} else {executor.execute(new Runnable() {@Overridepublic void run() {next.invokeBind(localAddress, promise);}});}return promise;}<pre name="code" class="java"> private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {try {((ChannelOutboundHandler) handler).bind(this, localAddress, promise);} catch (Throwable t) {notifyOutboundHandlerException(t, promise);}}
最终调用Handler的bind函数,还记得之前说的outbound类型的事件吗,这类事件提供了默认的实现方法,HeadHandler的bind函数,下面是它的定义:
@Overridepublic void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)throws Exception {unsafe.bind(localAddress, promise);}
我们又看到了unsafe这个苦力了,最终的操作还是得由它来完成啊,赶紧去看看这个bind函数吧,
@Overridepublic final void bind(final SocketAddress localAddress, final ChannelPromise promise) {if (!ensureOpen(promise)) {return;}// See: https://github.com/netty/netty/issues/576if (!PlatformDependent.isWindows() && !PlatformDependent.isRoot() &&Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&localAddress instanceof InetSocketAddress &&!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress()) {// Warn a user about the fact that a non-root user can't receive a// broadcast packet on *nix if the socket is bound on non-wildcard address.logger.warn("A non-root user can't receive a broadcast packet if the socket " +"is not bound to a wildcard address; binding to a non-wildcard " +"address (" + localAddress + ") anyway as requested.");}boolean wasActive = isActive();try {doBind(localAddress);} catch (Throwable t) {closeIfClosed();promise.setFailure(t);return;}if (!wasActive && isActive()) {invokeLater(new Runnable() {@Overridepublic void run() {pipeline.fireChannelActive();}});}promise.setSuccess();}
上面的代码最终调用了Channel的doBind函数,这里我们的Channel是NioServerSocketChannel,所以最终就是调用它的bind函数了,代码如下
@Overrideprotected void doBind(SocketAddress localAddress) throws Exception {javaChannel().socket().bind(localAddress, config.getBacklog());}
其实它最终也是调用了JDK的Channel的socket bind函数。
看到这里,你是否会觉得有点怪异,为什么没有注册accpt事件啊,一般的我们的server socket都是要注册accpt事件到selector,用于监听连接。如果你发现了这个问题,说明你是理解socket的编程的,^_^。实际上是前面在分析bind的时候我们漏掉了一个重要的函数,initAndRegister,下面再来看看它的定义:
final ChannelFuture initAndRegister() {final Channel channel = channelFactory().newChannel();try {init(channel);} catch (Throwable t) {channel.unsafe().closeForcibly();return channel.newFailedFuture(t);}ChannelPromise regPromise = channel.newPromise();group().register(channel, regPromise);if (regPromise.cause() != null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}// If we are here and the promise is not failed, it's one of the following cases:// 1) If we attempted registration from the event loop, the registration has been completed at this point.// i.e. It's safe to attempt bind() or connect() now beause the channel has been registered.// 2) If we attempted registration from the other thread, the registration request has been successfully// added to the event loop's task queue for later execution.// i.e. It's safe to attempt bind() or connect() now:// because bind() or connect() will be executed *after* the scheduled registration task is executed// because register(), bind(), and connect() are all bound to the same thread.return regPromise;}
在这里,我们看到了我们之前介绍event时说的register函数,它就是用于将Channel注册到eventloop中去的。eventloop经过层层调用,最终调用了SingleThreadEventLoop类中的register函数,下面是它的定义:
@Overridepublic ChannelFuture register(final Channel channel, final ChannelPromise promise) {if (channel == null) {throw new NullPointerException("channel");}if (promise == null) {throw new NullPointerException("promise");}channel.unsafe().register(this, promise);return promise;}
还是逃离不了unsafe对象的调用,前面说了那么多的unsafe,这个函数猜都可以猜测出执行过程了,这里就不细细的列举代码了。
还有一个init函数,这里需要说明一下,代码如下:
@Overridevoid init(Channel channel) throws Exception {final Map<ChannelOption<?>, Object> options = options();synchronized (options) {channel.config().setOptions(options);}final Map<AttributeKey<?>, Object> attrs = attrs();synchronized (attrs) {for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked")AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();channel.attr(key).set(e.getValue());}}ChannelPipeline p = channel.pipeline();if (handler() != null) {p.addLast(handler());}final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));}synchronized (childAttrs) {currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));}p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new ServerBootstrapAcceptor(currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}
它就是用来处理channel 的pipeline,并添加一个ServerBootstrapAcceptor的handler,继续看看这个handler的定义,我们就会明白它的意图。
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {private final EventLoopGroup childGroup;private final ChannelHandler childHandler;private final Entry<ChannelOption<?>, Object>[] childOptions;private final Entry<AttributeKey<?>, Object>[] childAttrs;@SuppressWarnings("unchecked")ServerBootstrapAcceptor(EventLoopGroup childGroup, ChannelHandler childHandler,Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {this.childGroup = childGroup;this.childHandler = childHandler;this.childOptions = childOptions;this.childAttrs = childAttrs;}@Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {Channel child = (Channel) msg;child.pipeline().addLast(childHandler);for (Entry<ChannelOption<?>, Object> e: childOptions) {try {if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {logger.warn("Unknown channel option: " + e);}} catch (Throwable t) {logger.warn("Failed to set a channel option: " + child, t);}}for (Entry<AttributeKey<?>, Object> e: childAttrs) {child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());}try {childGroup.register(child);} catch (Throwable t) {child.unsafe().closeForcibly();logger.warn("Failed to register an accepted channel: " + child, t);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {final ChannelConfig config = ctx.channel().config();if (config.isAutoRead()) {// stop accept new connections for 1 second to allow the channel to recover// See https://github.com/netty/netty/issues/1328config.setAutoRead(false);ctx.channel().eventLoop().schedule(new Runnable() {@Overridepublic void run() {config.setAutoRead(true);}}, 1, TimeUnit.SECONDS);}// still let the exceptionCaught event flow through the pipeline to give the user// a chance to do something with itctx.fireExceptionCaught(cause);}}
上面就是这个handler的全部代码,它重写了ChannelRead函数,它的目的其实是想将server端accept的channel注册到ChildGroup的eventloop中,这样就可以理解,服务端代码workerGroup这个eventloop的作用了,它终于在这里体现出了它的作用了。
3、总结
这篇文章主要是分析了serverbootstrap的全过程,通过对这个的分析,我们清晰的看到了平时编写socket服务端代码时对bind,register事件,以及accept channel等的处理。
http://blog.csdn.net/pingnanlee/article/details/11973769
netty4.0.x源码分析—bootstrap相关推荐
- Spark2.4.0 SparkEnv 源码分析
Spark2.4.0 SparkEnv 源码分析 更多资源 github: https://github.com/opensourceteams/spark-scala-maven-2.4.0 时序图 ...
- 菜鸟读jQuery 2.0.3 源码分析系列(1)
原文链接在这里,作为一个菜鸟,我就一边读一边写 jQuery 2.0.3 源码分析系列 前面看着差不多了,看到下面一条(我是真菜鸟),推荐木有入门或者刚刚JS入门摸不着边的看看,大大们手下留情,想一起 ...
- Android 11.0 Settings源码分析 - 主界面加载
Android 11.0 Settings源码分析 - 主界面加载 本篇主要记录AndroidR Settings源码主界面加载流程,方便后续工作调试其流程. Settings代码路径: packag ...
- Android 8.0系统源码分析--Camera processCaptureResult结果回传源码分析
相机,从上到下概览一下,真是太大了,上面的APP->Framework->CameraServer->CameraHAL,HAL进程中Pipeline.接各种算法的Node.再往下的 ...
- photoshop-v.1.0.1源码分析第三篇–FilterInterface.p
photoshop-v.1.0.1源码分析第三篇–FilterInterface.p 总体预览 一.源码预览 二.语法解释 三.结构预览 四:语句分析 五:思维导图 六:疑留问题 一.源码预览 {Ph ...
- Spark2.0.2源码分析——RPC 通信机制(消息处理)
RPC 是一种远程过程的调用,即两台节点之间的数据传输. 每个组件都有它自己的执行环境,RPC 的执行环境就是 RPCENV,RPCENV 是 Spark 2.x.x 新增加的,用于替代之前版本的 a ...
- Pushlet 2.0.3 源码分析
转载地址:http://blog.csdn.net/yxw246/article/details/2418255 Pushlet 2.0.3 源码分析 ----服务器端 1 总体架构 Pushlet从 ...
- jQuery 2.0.3 源码分析 Deferred(最细的实现剖析,带图)
Deferred的概念请看第一篇 http://www.cnblogs.com/aaronjs/p/3348569.html ******************构建Deferred对象时候的流程图* ...
- 最细的实现剖析:jQuery 2.0.3源码分析Deferred
Deferred的概念请看第一篇 http://www.cnblogs.com/aaronjs/p/3348569.html **构建Deferred对象时候的流程图** **源码解析** 因为cal ...
最新文章
- dev c++怎么设置断点_Linux怎么挂载移动硬盘光盘U盘之案例分享
- 前端实现连连看小游戏(1)
- php如何禁用浏览器的缓存,php如何禁止浏览器使用缓存页面
- 【P1063】 能量项链
- C# 全角半角相互转换
- Java多线程学习三十:ThreadLocal 适合用在哪些实际生产的场景中
- jquery templates jQuery html模板
- 由于UPS故障,造成所有服务器断电。( 重启后,机器的IP也许会发生改变(包括服务器))
- 未来人工智能应用体现出的核心技术有哪些?
- Using Beyond Compare with Version Control Systems(ZZ)
- PDF如何编辑,怎么修改PDF中的文字
- Android蓝牙自动配对和Pin码设置
- 计算机公式除以键是,excel函数的除法公式(整数及余数)《计算机除法函数公式》...
- win7系统设置cmd窗口默认以管理员权限运行
- 计算机硬盘数据清零,彻底清除Windows电脑磁盘数据
- HPE主机根据磁盘序列号或位置确定Naa号
- Asp.Net Mvc基于Fleck开发的多人网页版即时聊天室
- 5分钟学会Python爬虫神器autoscraper——自动化爬虫必备
- Grid ++ MIME 类型配置 载入报表数据,检查此URL及其数据,错误提示 网络服务器响应不成功
- 硬盘串口和并口的区别
热门文章
- 如何使用“Hash文件信息校验” 工具
- pyinstaller打包后读不到配置文件的解决方法
- 汇编语言介绍,内存和总线的初步认识
- Vue.js 插件开发详解
- 第四讲 deque
- 如何实现Asp与Asp.Net共享Session
- c# mvc html.beginform,asp.net-mvc – 使用Html.BeginForm()与自定义路由
- ASP.NET-----Repeater数据控件的用法总结
- 学生成绩管理系统数据库设计
- const 和 #define区别