【Netty之旅四】你一定看得懂的Netty客户端启动源码分析!
前言
前面小飞已经讲解了NIO
和Netty
服务端启动,这一讲是Client
的启动过程。
源码系列的文章依旧还是遵循大白话+画图的风格来讲解,本文Netty
源码及以后的文章版本都基于:4.1.22.Final
本篇是以NettyClient
启动为切入点,带大家一步步进入Netty
源码的世界。
Client启动流程揭秘
1、探秘的入口:netty-client demo
这里用netty-exmaple
中的EchoClient
来作为例子:
public final class EchoClient {public static void main(String[] args) throws Exception {EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();p.addLast(new EchoClientHandler());}});ChannelFuture f = b.connect(HOST, PORT).sync();f.channel().closeFuture().sync();} finally {group.shutdownGracefully();}}
}
代码没有什么独特的地方,我们上一篇文章时也梳理过Netty
网络编程的一些套路,这里就不再赘述了。
(忘记的小朋友可以查看Netty
系列文章中查找~)
上面的客户端代码虽然简单, 但是却展示了Netty
客户端初始化时所需的所有内容:
EventLoopGroup
:Netty
服务端或者客户端,都必须指定EventLoopGroup
,客户端指定的是NioEventLoopGroup
Bootstrap
:Netty
客户端启动类,负责客户端的启动和初始化过程channel()
类型:指定Channel
的类型,因为这里是客户端,所以使用的是NioSocketChannel
,服务端会使用NioServerSocketChannel
Handler
:设置数据的处理器bootstrap.connect()
: 客户端连接netty
服务的方法
2、NioEventLoopGroup 流程解析
我们先从NioEventLoopGroup
开始,一行行代码解析,先看看其类结构:
上面是大致的类结构,而 EventLoop
又继承自EventLoopGroup
,所以类的大致结构我们可想而知。这里一些核心逻辑会在MultithreadEventExecutorGroup
中,包含EventLoopGroup
的创建和初始化操作等。
接着从NioEventLoopGroup
构造方法开始看起,一步步往下跟(代码都只展示重点的部分,省去很多暂时不需要关心的代码,以下代码都遵循这个原则):
EventLoopGroup group = new NioEventLoopGroup();public NioEventLoopGroup() {this(0);
}public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
这里通过调用this()
和super()
方法一路往下传递,期间会构造一些默认属性,一直传递到MultithreadEventExecutorGroup
类中,接着往西看。
2.1、MultithreadEventExecutorGroup
上面构造函数有一个重要的参数传递:DEFAULT_EVENT_LOOP_THREADS
,这个值默认是CPU核数 * 2
。
为什么要传递这个参数呢?我们之前说过EventLoopGroup
可以理解成一个线程池,MultithreadEventExecutorGroup
有一个线程数组EventExecutor[] children
属性,而传递过来的DEFAULT_EVENT_LOOP_THREADS
就是数组的长度。
先看下MultithreadEventExecutorGroup
中的构造方法:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {if (executor == null) {executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());}children = new EventExecutor[nThreads];for (int i = 0; i < nThreads; i ++) {children[i] = newChild(executor, args);}// ... 省略
}
这段代码执行逻辑可以理解为:
- 通过
ThreadPerTaskExecutor
构造一个Executor
执行器,后面会细说,里面包含了线程执行的execute()
方法 - 接着创建一个
EventExecutor
数组对象,大小为传递进来的threads
数量,这个所谓的EventExecutor
可以理解为我们的EventLoop
,在这个demo中就是NioEventLoop
对象 - 最后调用
newChild
方法逐个初始化EventLoopGroup
中的EventLoop
对象
上面只是大概说了下MultithreadEventExecutorGroup
中的构造方法做的事情,后面还会一个个详细展开,先不用着急,我们先有个整体的认知就好。
再回到MultithreadEventExecutorGroup
中的构造方法入参中,有个EventExecutorChooserFactory
对象,这里面是有个很亮眼的细节设计,通过它我们来洞悉Netty
的良苦用心。
2.1、亮点设计:DefaultEventExecutorChooserFactory
EventExecutorChooserFactory
这个类的作用是用来选择EventLoop
执行器的,我们知道EventLoopGroup
是一个包含了CPU * 2
个数量的EventLoop
数组对象,那每次选择EventLoop
来执行任务是选择数组中的哪一个呢?
我们看一下这个类的具体实现,红框中
都是需要重点查看的地方:
DefaultEventExecutorChooserFactory
是一个选择器工厂类,调用里面的next()
方法达到一个轮询选择的目的。
数组的长度是length,执行第n次,取数组中的哪个元素就是对length取余
继续回到代码的实现,这里的优化就是在于先通过isPowerOfTwo()
方法判断数组的长度是否为2的n次幂,判断的方式很巧妙,使用val & -val == val
,这里我不做过多的解释,网上还有很多判断2的n次幂的优秀解法,我就不班门弄斧了。(可参考:https://leetcode-cn.com/problems/power-of-two/solution/2de-mi-by-leetcode/)
当然我认为这里还有更容易理解的一个算法:x & (x - 1) == 0
大家可以看下面的图就懂了,这里就不延展了:
BUT!!! 这里为什么要去煞费苦心的判断数组的长度是2的n次幂?
不知道小伙伴们是否还记得大明湖畔的HashMap
?一般我们要求HashMap
数组的长度需要是2的n次幂,因为在key
值寻找数组位置的方法:(n - 1) & hash
n是数组长度,这里如果数组长度是2的n次幂就可以通过位运算来提升性能,当length
为2的n次幂时下面公式是等价的:
n & (length - 1) <=> n % length
还记得上面说过,数组的长度默认都是CPU * 2
,而一般服务器CPU核心数都是2、4、8、16等等,所以这一个小优化就很实用了,再仔细想想,原来数组长度的初始化也是很讲究的。
这里位运算的好处就是效率远远高于与运算,Netty
针对于这个小细节都做了优化,真是太棒了。
2.3、线程执行器:ThreadPerTaskExecutor
接着看下ThreadPerTaskExecutor
线程执行器,每次执行任务都会通过它来创建一个线程实体。
public final class ThreadPerTaskExecutor implements Executor {private final ThreadFactory threadFactory;public ThreadPerTaskExecutor(ThreadFactory threadFactory) {if (threadFactory == null) {throw new NullPointerException("threadFactory");}this.threadFactory = threadFactory;}@Overridepublic void execute(Runnable command) {threadFactory.newThread(command).start();}
}
传递进来的threadFactory
为DefaultThreadFactory
,这里面会构造NioEventLoop
线程命名规则为nioEventLoop-1-xxx
,我们就不细看这个了。当线程执行的时候会调用execute()
方法,这里会创建一个FastThreadLocalThread
线程,具体看代码:
public class DefaultThreadFactory implements ThreadFactory {@Overridepublic Thread newThread(Runnable r) {Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());return t;}protected Thread newThread(Runnable r, String name) {return new FastThreadLocalThread(threadGroup, r, name);}
}
这里通过newThread()
来创建一个线程,然后初始化线程对象数据,最终会调用到Thread.init()
中。
2.4、EventLoop初始化
接着继续看MultithreadEventExecutorGroup
构造方法:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {children = new EventExecutor[nThreads];for (int i = 0; i < nThreads; i ++) {children[i] = newChild(executor, args);// .... 省略部分代码}
}
上面代码的最后一部分是 newChild
方法, 这个是一个抽象方法, 它的任务是实例化 EventLoop
对象. 我们跟踪一下它的代码, 可以发现, 这个方法在 NioEventLoopGroup
类中实现了, 其内容很简单:
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {return new NioEventLoop(this, executor, (SelectorProvider) args[0],((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);if (selectorProvider == null) {throw new NullPointerException("selectorProvider");}if (strategy == null) {throw new NullPointerException("selectStrategy");}provider = selectorProvider;final SelectorTuple selectorTuple = openSelector();selector = selectorTuple.selector;unwrappedSelector = selectorTuple.unwrappedSelector;selectStrategy = strategy;
}
其实就是实例化一个 NioEventLoop
对象, 然后返回。NioEventLoop
构造函数中会保存provider
和事件轮询器selector
,在其父类中还会创建一个MpscQueue队列
,然后保存线程执行器executor
。
再回过头来想一想,MultithreadEventExecutorGroup
内部维护了一个 EventExecutor[] children
数组, Netty
的 EventLoopGroup
的实现机制其实就建立在 MultithreadEventExecutorGroup
之上。
每当 Netty
需要一个 EventLoop
时, 会调用 next()
方法从EventLoopGroup
数组中获取一个可用的 EventLoop
对象。其中next
方法的实现是通过NioEventLoopGroup.next()
来完成的,就是用的上面有过讲解的通过轮询算法来计算得出的。
最后总结一下整个 EventLoopGroup
的初始化过程:
EventLoopGroup
(其实是MultithreadEventExecutorGroup
) 内部维护一个类型为EventExecutor children
数组,数组长度是nThreads
- 如果我们在实例化
NioEventLoopGroup
时, 如果指定线程池大小, 则nThreads
就是指定的值, 反之是处理器核心数 * 2
MultithreadEventExecutorGroup
中会调用newChild
抽象方法来初始化children
数组- 抽象方法
newChild
是在NioEventLoopGroup
中实现的, 它返回一个NioEventLoop
实例. NioEventLoop
属性:SelectorProvider provider
属性:NioEventLoopGroup
构造器中通过SelectorProvider.provider()
获取一个SelectorProvider
Selector selector
属性:NioEventLoop
构造器中通过调用通过selector = provider.openSelector()
获取一个selector
对象.
2.5、NioSocketChannel
在Netty
中,Channel
是对Socket
的抽象,每当Netty
建立一个连接后,都会有一个与其对应的Channel
实例。
我们在开头的Demo
中,设置了channel(NioSocketChannel.class)
,NioSocketChannel
的类结构如下:
接着分析代码,当我们调用b.channel()
时实际上会进入AbstractBootstrap.channel()
逻辑,接着看AbstractBootstrap
中代码:
public B channel(Class<? extends C> channelClass) {if (channelClass == null) {throw new NullPointerException("channelClass");}return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}public ReflectiveChannelFactory(Class<? extends T> clazz) {if (clazz == null) {throw new NullPointerException("clazz");}this.clazz = clazz;
}public B channelFactory(ChannelFactory<? extends C> channelFactory) {if (channelFactory == null) {throw new NullPointerException("channelFactory");}if (this.channelFactory != null) {throw new IllegalStateException("channelFactory set already");}this.channelFactory = channelFactory;return self();
}
可以看到,这里ReflectiveChannelFactory
其实就是返回我们指定的channelClass:NioSocketChannel
, 然后指定AbstractBootstrap
中的channelFactory = new ReflectiveChannelFactory()
。
2.6、Channel初始化流程
到了这一步,我们已经知道NioEventLoopGroup
和channel()
的流程,接着来看看Channel
的 初始化流程,这也是Netty
客户端启动的的核心流程之一:
ChannelFuture f = b.connect(HOST, PORT).sync();
接着就开始从b.connect()
为入口一步步往后跟,先看下NioSocketChannel
构造的整体流程:
从connet
往后梳理下整体流程:
Bootstrap.connect -> Bootstrap.doResolveAndConnect -> AbstractBootstrap.initAndRegister
final ChannelFuture initAndRegister() {Channel channel = channelFactory.newChannel();init(channel);ChannelFuture regFuture = config().group().register(channel);return regFuture;
}
为了更易读,这里代码都做了简化,只保留了一些重要的代码。
紧接着我们看看channelFactory.newChannel()
做了什么,这里channelFactory
是ReflectiveChannelFactory
,我们在上面的章节分析过:
@Override
public T newChannel() {try {return clazz.getConstructor().newInstance();} catch (Throwable t) {throw new ChannelException("Unable to create Channel from class " + clazz, t);}
}
这里的clazz
是NioSocketChannel
,同样是在上面章节讲到过,这里是调用NioSocketChannel
的构造函数然后初始化一个Channel
实例。
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {public NioSocketChannel() {this(DEFAULT_SELECTOR_PROVIDER);}public NioSocketChannel(SelectorProvider provider) {this(newSocket(provider));}private static SocketChannel newSocket(SelectorProvider provider) {try {return provider.openSocketChannel();} catch (IOException e) {throw new ChannelException("Failed to open a socket.", e);}}
}
这里其实也很简单,就是创建一个Java NIO SocketChannel
而已,接着看看NioSocketChannel
的父类还做了哪些事情,这里梳理下类的关系:
NioSocketChannel -> extends AbstractNioByteChannel -> exntends AbstractNioChannel
public abstract class AbstractNioChannel extends AbstractChannel {protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {super(parent, ch, SelectionKey.OP_READ);}protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);ch.configureBlocking(false);}
}
这里会调用父类的构造参数,并且传递readInterestOp = SelectionKey.OP_READ:
,这里还有一个很重要的点,配置 Java NIO SocketChannel
为非阻塞的,我们之前在NIO
章节的时候讲解过,这里也不再赘述。
接着继续看AbstractChannel
的构造函数:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {protected AbstractChannel(Channel parent) {this.parent = parent;id = newId();unsafe = newUnsafe();pipeline = newChannelPipeline();}
}
这里创建一个ChannelId
,创建一个Unsafe
对象,这里的Unsafe
并不是Java中的Unsafe,后面也会讲到。然后创建一个ChannelPipeline
,后面也会讲到,到了这里,一个完整的NioSocketChannel
就初始化完成了,我们再来总结一下:
Netty
的SocketChannel
会与Java
原生的SocketChannel
绑定在一起;- 会注册
Read
事件; - 会为每一个
Channel
分配一个channelId
; - 会为每一个
Channel
创建一个Unsafe
对象; - 会为每一个
Channel
分配一个ChannelPipeline
;
2.7、Channel 注册流程
还是回到最上面initAndRegister
方法,我们上面都是在分析里面newChannel
的操作,这个方法是NioSocketChannel
创建的一个流程,接着我们在继续跟init()
和register()
的过程:
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {final ChannelFuture initAndRegister() {Channel channel = channelFactory.newChannel();init(channel);ChannelFuture regFuture = config().group().register(channel);}
}
init()
就是将一些参数options
和attrs
设置到channel
中,我们重点需要看的是register
方法,其调用链为:
AbstractBootstrap.initAndRegister -> MultithreadEventLoopGroup.register -> SingleThreadEventLoop.register -> AbstractUnsafe.register
这里最后到了unsafe
的register()
方法,最终调用到AbstractNioChannel.doRegister()
:
@Override
protected void doRegister() throws Exception {boolean selected = false;for (;;) {selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;}
}
javaChannel()
就是Java NIO
中的SocketChannel
,这里是将SocketChannel
注册到与eventLoop
相关联的selector
上。
最后我们整理一下服务启动的整体流程:
initAndRegister()
初始化并注册什么呢?
channelFactory.newChannel()
- 通过反射创建一个
NioSocketChannel
- 将
Java
原生Channel
绑定到NettyChannel
中 - 注册
Read
事件 - 为
Channel
分配id
- 为
Channel
创建unsafe
对象 - 为
Channel
创建ChannelPipeline
(默认是head<=>tail
的双向链表)
- `init(channel)``
- 把
Bootstrap
中的配置设置到Channel
中
register(channel)
- 把
Channel
绑定到一个EventLoop
上 - 把
Java
原生Channel、Netty
的Channel、Selector
绑定到SelectionKey
中 - 触发
Register
相关的事件
2.8 unsafe初始化
上面有提到过在初始化Channel
的过程中会创建一个Unsafe
的对象,然后绑定到Channel
上:
protected AbstractChannel(Channel parent) {this.parent = parent;id = newId();unsafe = newUnsafe();pipeline = newChannelPipeline();
}
newUnsafe
直接调用到了NioSocketChannel
中的方法:
@Override
protected AbstractNioUnsafe newUnsafe() {return new NioSocketChannelUnsafe();
}
NioSocketChannelUnsafe
是NioSocketChannel
中的一个内部类,然后向上还有几个父类继承,这里主要是对应到相关Java
底层的Socket
操作。
2.9 pipeline初始化
我们还是回到pipeline
初始化的过程,来看一下newChannelPipeline()
的具体实现:
protected DefaultChannelPipeline newChannelPipeline() {return new DefaultChannelPipeline(this);
}protected DefaultChannelPipeline(Channel channel) {this.channel = ObjectUtil.checkNotNull(channel, "channel");succeededFuture = new SucceededChannelFuture(channel, null);voidPromise = new VoidChannelPromise(channel, true);tail = new TailContext(this);head = new HeadContext(this);head.next = tail;tail.prev = head;
}
我们调用 DefaultChannelPipeline
的构造器, 传入了一个 channel
, 而这个 channel
其实就是我们实例化的 NioSocketChannel
。
DefaultChannelPipeline
会将这个 NioSocketChannel
对象保存在channel
字段中. DefaultChannelPipeline
中, 还有两个特殊的字段, 即 head
和 tail
, 而这两个字段是一个双向链表的头和尾. 其实在 DefaultChannelPipeline
中, 维护了一个以 AbstractChannelHandlerContext
为节点的双向链表, 这个链表是 Netty
实现 Pipeline
机制的关键.
关于 DefaultChannelPipeline
中的双向链表以及它所起的作用, 我们会在后续章节详细讲解。这里只是对pipeline
做个初步的认识。
HeadContext
的继承层次结构如下所示:
TailContext
的继承层次结构如下所示:
我们可以看到, 链表中 head
是一个 ChannelOutboundHandler
, 而 tail
则是一个 ChannelInboundHandler
.
3.0、客户端connect过程
客户端连接的入口方法还是在Bootstrap.connect()
中,上面也分析过一部分内容,请求的具体流程是:
Bootstrap.connect() -> AbstractChannel.coonnect() -> NioSocketChannel.doConnect()
public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress)throws IOException {try {return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {@Overridepublic Boolean run() throws IOException {return socketChannel.connect(remoteAddress);}});} catch (PrivilegedActionException e) {throw (IOException) e.getCause();}
}
看到这里,还是用Java NIO SocketChannel
发送的connect
请求进行客户端连接请求。
总结
本篇文章以一个Netty Client demo
为入口,然后解析了NioEventLoopGroup
创建的流程、Channel
的创建和注册的流程,以及客户端发起connect
的具体流程,这里对于很多细节并没有很深的深入下去,这些会放到后续的源码分析文章,敬请期待~
【Netty之旅四】你一定看得懂的Netty客户端启动源码分析!相关推荐
- 一看就懂的vue简版源码概述
本文不会拆步骤对源码进行实现,只介绍vue原理及相关核心实现思想.在之前的四篇文章中已对vue进行响应实现.需要可阅读相关文章: Vue源码探索之知识小储备 --Object.defineProper ...
- 《游戏系统设计十》从零复刻王者荣耀活动系统,策划都能看得懂的活动系统,源码奉送
目录 1.活动类型 2.需求 3.文件下载 4.文件解压 5.json的读取 6.模块组织方式
- TreeMap源码分析,看了都说好
一.简介 TreeMap最早出现在JDK 1.2中,是 Java 集合框架中比较重要一个的实现.TreeMap 底层基于红黑树实现,可保证在log(n)时间复杂度内完成 containsKey.get ...
- Netty源码分析第6章(解码器)----第4节: 分隔符解码器
Netty源码分析第6章(解码器)---->第4节: 分隔符解码器 Netty源码分析第六章: 解码器 第四节: 分隔符解码器 基于分隔符解码器DelimiterBasedFrameDecode ...
- Netty源码分析第1章(Netty启动流程)----第4节: 注册多路复用
Netty源码分析第1章(Netty启动流程)---->第4节: 注册多路复用 Netty源码分析第一章:Netty启动流程 第四节:注册多路复用 回顾下以上的小节, 我们知道了channe ...
- 音视频开发之旅(67) - 变速不变调之sonic源码分析
目录 基音周期.浊音的概念 Sonic源码分析 资料 收获 上一篇我们学习了音频变速不变调的原理以及WSOLA波形相似叠加算法进行时域压扩处理.其中在寻找相似帧方面,Sonic采用AMDF(平均幅度差 ...
- 【Netty系列_3】Netty源码分析之服务端channel
highlight: androidstudio 前言 学习源码要有十足的耐性!越是封装完美的框架,内部就越复杂,源码很深很长!不过要抓住要点分析,实在不行多看几遍,配合debug,去一窥优秀框架的精 ...
- 【Netty源码分析摘录】(八)新连接的接入
文章目录 1.问题 2.检测新连接接入 3.创建客户端 channel 4. 绑定 NioEventLoop 4.1 register0 4.1.1 doRegister() 4.1.2 pipeli ...
- Netty源码分析系列之服务端Channel的端口绑定
扫描下方二维码或者微信搜索公众号菜鸟飞呀飞,即可关注微信公众号,Spring源码分析和Java并发编程文章. 微信公众号 问题 本文内容是接着前两篇文章写的,有兴趣的朋友可以先去阅读下两篇文章: Ne ...
最新文章
- Python 常用Web框架的比较
- iPad,耳机,手机,电脑,都能用typec
- 【机器学习基础】(六):通俗易懂无监督学习K-Means聚类算法及代码实践
- 【转】匈牙利算法理解
- Android——本地服务基础(一)
- phpcms_v9推送到其他栏目后再在其他栏目删除导致数据库出错
- 安装VSCode作为常用的文本编辑器
- Spring mvc4 + ActiveMQ 整合
- java代码读取dbsequence的值_JDBC读取新插入Oracle数据库Sequence值的5种方法
- mybatis13--2级缓存
- HTML——H5前端框架
- kettle使用httpClient获取ES索引数据
- 微信公众号文章中插入的图片如何实现滑动效果
- Mesos | 1.3.2 webui static 界面代码分析 ——app.js/relative-date.js
- 将“闲置资源”重新利用,这家公司重组闲置市场
- 网站优化排名的5个方法
- 京东618自动浏览叠蛋糕app
- 使用yagmail模块群发工资条
- Oracle之数据排序
- iPhone4 SIM失败?无效SIM?有效解决
热门文章
- fedora 16 安装后的基本配置
- [apifox学习笔记]在所有接口中添加登录后获取的token(有图示)
- Python实现自动控制登录网页
- iOS OC10_Block
- 4.4OC10-内存管理2-set方法的内存管理
- EAS融资租赁系统(财务业务一体化)
- android文件管理器listview,浅析Android文件管理器(项目一)
- 如何将wps中的表格转为图片,并设置较高的dpi
- [源码]Meepo路由
- vba批量合并指定的sheet_Excel VBA 多个工作表合并方法