Netty新连接接入与NioSocketChannel分析
原文链接:https://wangwei.one/posts/net...
前面的一些章节,我们分析了Netty的三大组件 —— Channel 、EventLoop、Pipeline ,对Netty的工作原理有了深入的了解。在此基础上,我们来分析一下当Netty服务端启动后,Netty是如何处理新连接接入的。
本文内容主要分为以下四部分:
- 新连接检测
- NioSocketChannel创建
- NioSocketChannel初始化与注册
- NioSocketChannel注册READ兴趣集
新连接检测
前面,我们在讲 EventLoop的启动过程源码分析 时,解读过下面这段代码:
public final class NioEventLoop extends SingleThreadEventLoop {...private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {...try {...if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {// 读取read事件unsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}...}...}
我们还是以服务端 NioServerSocketChannel 为例,它绑定的unsafe实例为 NioMessageUnsafe 。上面的 unsafe.read()
接口,会向下调用到 NioMessageUnsafe.read() 接口,如下:
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {...private final class NioMessageUnsafe extends AbstractNioUnsafe {// 用于保存新建立的 NioSocketChannel 的集合private final List<Object> readBuf = new ArrayList<Object>();@Overridepublic void read() {// 确保在当前线程与EventLoop中的一致assert eventLoop().inEventLoop();// 获取 NioServerSocketChannel config配置final ChannelConfig config = config();// 获取 NioServerSocketChannel 绑定的 pipelinefinal ChannelPipeline pipeline = pipeline();// 获取RecvByteBuf 分配器 Handle// 当channel在接收数据时,allocHandle 会用于分配ByteBuf来保存数据// 关于allocHandle后面再去做详细介绍final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();// 重置已累积的所有计数器,并为下一个读取循环读取多少消息/字节数据提供建议allocHandle.reset(config);boolean closed = false;Throwable exception = null;try {try {do {// 调用后面的 doReadMessages 接口,读取到message则返回1int localRead = doReadMessages(readBuf);if (localRead == 0) {break;}if (localRead < 0) {closed = true;break;}// 对当前read循环所读取到的message数量计数+1allocHandle.incMessagesRead(localRead);// 判断是否继续读取message} while (allocHandle.continueReading());} catch (Throwable t) {exception = t;}int size = readBuf.size();for (int i = 0; i < size; i ++) {readPending = false;// 调用pipeline传播ChannelRead事件pipeline.fireChannelRead(readBuf.get(i));}// 清空readBufreadBuf.clear();allocHandle.readComplete();// 调用pipeline传播 ChannelReadComplete 事件pipeline.fireChannelReadComplete();if (exception != null) {closed = closeOnReadError(exception);pipeline.fireExceptionCaught(exception);}if (closed) {inputShutdown = true;if (isOpen()) {close(voidPromise());}}} finally {// Check if there is a readPending which was not processed yet.// This could be for two reasons:// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method//// See https://github.com/netty/netty/issues/2254if (!readPending && !config.isAutoRead()) {removeReadOp();}}}}...}
对于 doReadMessages(...)
的分析:
public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel {...// 读取消息@Overrideprotected int doReadMessages(List<Object> buf) throws Exception {// 获取 SocketChannel SocketChannel ch = SocketUtils.accept(javaChannel());try {if (ch != null) {// 使用SocketChannel创建NioSocketChannel,将其存入buf list中// 关于NioSocketChannel的创建请看后面的分析buf.add(new NioSocketChannel(this, ch));return 1;}} catch (Throwable t) {logger.warn("Failed to create a new channel from an accepted socket.", t);try {ch.close();} catch (Throwable t2) {logger.warn("Failed to close a socket.", t2);}}return 0;}...}
对于 continueReading()
接口的分析,至于结果为什么返回false,后面会单独分析:
public abstract class DefaultMaxMessagesRecvByteBufAllocator implements MaxMessagesRecvByteBufAllocator {private volatile int maxMessagesPerRead;private volatile boolean respectMaybeMoreData = true;...public abstract class MaxMessageHandle implements ExtendedHandle {private ChannelConfig config;// 每次读取最大的消息数private int maxMessagePerRead;private int totalMessages;private int totalBytesRead;private int attemptedBytesRead;private int lastBytesRead;private final boolean respectMaybeMoreData = DefaultMaxMessagesRecvByteBufAllocator.this.respectMaybeMoreData;private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {@Overridepublic boolean get() {return attemptedBytesRead == lastBytesRead;}};...// 判断是否继续读取message @Overridepublic boolean continueReading() {return continueReading(defaultMaybeMoreSupplier);}// 判断是否继续读取message@Overridepublic boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {// 默认情况下 config.isAutoRead() 为true// respectMaybeMoreData 默认为 true// maybeMoreDataSupplier.get() 为false// totalMessages第一次循环则为1// maxMessagePerRead为16// 结果返回falsereturn config.isAutoRead() &&(!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&totalMessages < maxMessagePerRead &&totalBytesRead > 0;}...}...}
NioSocketChannel创建
上面分析新连接接入,提到了 NioSocketChannel 的创建,我们这里来详细分析一下,NioSocketChannel的创建过程与此前我们分析 NioServerSocketChannel创建 大体类似。
构造器
先来看看 NioSocketChannel 的构造函数:
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {...public NioSocketChannel(Channel parent, SocketChannel socket) {// 调用父类构造器super(parent, socket);// 创建NioSocketChannelConfigconfig = new NioSocketChannelConfig(this, socket.socket());}...}
父类 AbstractNioByteChannel 构造器:
public abstract class AbstractNioByteChannel extends AbstractNioChannel {...protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {// 调用父类构造器,并设置兴趣集为SelectionKey.OP_READ,对read事件感兴趣super(parent, ch, SelectionKey.OP_READ);}...}
父类 AbstractNioChannel 构造器:
public abstract class AbstractNioChannel extends AbstractChannel {...protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {// 调用父类构造器super(parent);// 设置channelthis.ch = ch;// 设置兴趣集this.readInterestOp = readInterestOp;try {// 设置为非阻塞ch.configureBlocking(false);} catch (IOException e) {...}}}
父类 AbstractChannel 构造器:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {...protected AbstractChannel(Channel parent) {// 设置parentthis.parent = parent;// 创建channelIdid = newId();// 创建unsafeunsafe = newUnsafe();// 创建pipelinepipeline = newChannelPipeline();}...
}
ChannelConfig创建
接着我们看看 NioSocketChannelConfig 的创建逻辑:
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {...private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {// 调用父类构造器super(channel, javaSocket);calculateMaxBytesPerGatheringWrite();}...}
父类 DefaultSocketChannelConfig 构造器:
public class DefaultSocketChannelConfig extends DefaultChannelConfig implements SocketChannelConfig {...public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) {// 调用父类构造器,绑定socketchannel super(channel);if (javaSocket == null) {throw new NullPointerException("javaSocket");}// 绑定java socketthis.javaSocket = javaSocket;// Enable TCP_NODELAY by default if possible.// netty一般运行在服务器上,不在Android上,canEnableTcpNoDelayByDefault返回trueif (PlatformDependent.canEnableTcpNoDelayByDefault()) {try {// 开启 TCP_NODELAY ,开启TCP的nagle算法// 尽量不要等待,只要发送缓冲区中有数据,并且发送窗口是打开的,就尽量把数据发送到网络上去。setTcpNoDelay(true);} catch (Exception e) {// Ignore.}}} ... }
NioSocketChannel初始化与注册
上面小节分析了NioSocketChannel的创建逻辑,创建完成之后,我们来分析一下NioSocketChannel是如何注册到NioEventLoop上去的。
在前面小节分析新连接检测的有如下小段代码:
private final class NioMessageUnsafe extends AbstractNioUnsafe {...int size = readBuf.size();for (int i = 0; i < size; i ++) {readPending = false;// 调用pipeline传播ChannelRead事件pipeline.fireChannelRead(readBuf.get(i));}...}
调用pipeline传播ChannelRead事件,这里的Pipeline是服务端Channel,也就是NioServerSocketChannel所绑定的Pipeline,此时的Pipeline的内部结构是怎么样子的呢?
那这个 ServerBootstrapAcceptor 是从哪里来的呢?
在此前,我们分析 NioServerSocketChannel初始化 时,有过下面这段代码:
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {...// NioServerSocketChannel初始化 void init(Channel channel) throws Exception {// 获取启动器 启动时配置的option参数,主要是TCP的一些属性final Map<ChannelOption<?>, Object> options = options0();// 将获得到 options 配置到 ChannelConfig 中去synchronized (options) {setChannelOptions(channel, options, logger);}// 获取 ServerBootstrap 启动时配置的 attr 参数final Map<AttributeKey<?>, Object> attrs = attrs0();// 配置 Channel attr,主要是设置用户自定义的一些参数synchronized (attrs) {for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked")AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();channel.attr(key).set(e.getValue());}}// 获取channel中的 pipeline,这个pipeline使我们前面在channel创建过程中设置的 pipelineChannelPipeline p = channel.pipeline();// 将启动器中配置的 childGroup 保存到局部变量 currentChildGroupfinal EventLoopGroup currentChildGroup = childGroup;// 将启动器中配置的 childHandler 保存到局部变量 currentChildHandlerfinal ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;// 保存用户设置的 childOptions 到局部变量 currentChildOptionssynchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));}// 保存用户设置的 childAttrs 到局部变量 currentChildAttrssynchronized (childAttrs) {currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));}p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();// 获取启动器上配置的handlerChannelHandler handler = config.handler();if (handler != null) {// 添加 handler 到 pipeline 中pipeline.addLast(handler);}ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {// 用child相关的参数创建出一个新连接接入器ServerBootstrapAcceptor// 通过 ServerBootstrapAcceptor 可以将一个新连接绑定到一个线程上去// 每次有新的连接进来 ServerBootstrapAcceptor 都会用child相关的属性对它们进行配置,并注册到ChaildGroup上去pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});}... }
ServerBootstrapAcceptor
NioServerSocketChannel初始化时,向NioServerSocketChannel所绑定的Pipeline添加了一个InboundHandler节点 —— ServerBootstrapAcceptor ,其代码如下:
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {...private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {// 子EventLoopGroup,即为workGroupprivate final EventLoopGroup childGroup;// ServerBootstrap启动时配置的 childHandlerprivate final ChannelHandler childHandler;// ServerBootstrap启动时配置的 childOptionsprivate final Entry<ChannelOption<?>, Object>[] childOptions;// ServerBootstrap启动时配置的 childAttrsprivate final Entry<AttributeKey<?>, Object>[] childAttrs;private final Runnable enableAutoReadTask;// 构造函数ServerBootstrapAcceptor(final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {this.childGroup = childGroup;this.childHandler = childHandler;this.childOptions = childOptions;this.childAttrs = childAttrs;// Task which is scheduled to re-enable auto-read.// It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may// not be able to load the class because of the file limit it already reached.//// See https://github.com/netty/netty/issues/1328enableAutoReadTask = new Runnable() {@Overridepublic void run() {channel.config().setAutoRead(true);}};}// 处理Pipeline所传播的channelRead事件// 也就是前面新连接检测时看到的那段代码// pipeline.fireChannelRead(readBuf.get(i));// ServerBootstrapAcceptor的channelRead接口将会被调用,用于处理channelRead事件@Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {// 获取传播事件的对象数据,即为前面的readBuf.get(i)// readBuf.get(i)取出的对象为 NioSocketChannelfinal Channel child = (Channel) msg;// 向 NioSocketChannel 添加childHandler,也就是我们常看到的// ServerBootstrap在启动时配置的代码:// ServerBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {...} )// 最终的结果就是向NioSocketChannel的Pipeline添加用户自定义的ChannelHandler// 用于处理客户端的channel连接child.pipeline().addLast(childHandler);// 配置 NioSocketChannel的TCP属性setChannelOptions(child, childOptions, logger);// 配置 NioSocketChannel 一些用户自定义数据for (Entry<AttributeKey<?>, Object> e: childAttrs) {child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());}// 将NioSocketChannel注册到childGroup,也就是Netty的WorkerGroup当中去try {childGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}}...}...}
关于 ChannelInitializer 的讲解,可以看此前 Pipeline源码分析 文章。
后面的register逻辑,就与我们前面讲解 NioServerSocketChannel注册 大体类似了,这里简单介绍一下。
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {...// 注册NioSocketChannel// eventLoop为childGroup @Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {...// 绑定eventLoop到NioSocketChannel上AbstractChannel.this.eventLoop = eventLoop;// 现在分析的逻辑是在服务端的线程上,eventLoop与主线程不同,返回falseif (eventLoop.inEventLoop()) {register0(promise);} else {try {eventLoop.execute(new Runnable() {@Overridepublic void run() {// 这里来调用register0方法register0(promise);}});} catch (Throwable t) {logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",AbstractChannel.this, t);closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}}// 注册private void register0(ChannelPromise promise) {try {...boolean firstRegistration = neverRegistered;// 调用 doRegister()doRegister();neverRegistered = false;registered = true;pipeline.invokeHandlerAddedIfNeeded();safeSetSuccess(promise);pipeline.fireChannelRegistered();// 服务端的NioServerSocketChannel已经与客户端的NioSocketChannel建立了连接// 所以,NioSocketChannel是处于激活状态,isActive()返回tureif (isActive()) {// 对于新连接,是第一次注册if (firstRegistration) {// 传播ChannelActive事件pipeline.fireChannelActive();} else if (config().isAutoRead()) {beginRead();}}...} catch (Throwable t) {// Close the channel directly to avoid FD leak.closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}...}
调用到NioSocketChannel中的doRegister()方法:
public abstract class AbstractNioChannel extends AbstractChannel {...@Overrideprotected void doRegister() throws Exception {boolean selected = false;for (;;) {try {// 将selector注册到底层JDK channel上,并附加了NioSocketChannel对象// 兴趣集设置为0,表示不关心任何事件selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {...}}}...}
NioSocketChannel 注册OP_READ兴趣集
紧接着上面的分析,传播ChannelActive事件之后的逻辑,主要就是向客户端的NioSocketChannel注册一个Read兴趣集
if (isActive()) {// 对于新连接,是第一次注册if (firstRegistration) {// 传播ChannelActive事件pipeline.fireChannelActive();} else if (config().isAutoRead()) {beginRead();}
}
通过 Pipeline的传播机制 ,最终会调用到doBeginRead()接口,如下:
public abstract class AbstractNioChannel extends AbstractChannel {...protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {...@Overrideprotected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was called// 保存selectionKey到局部变量final SelectionKey selectionKey = this.selectionKey;// 判断有效性if (!selectionKey.isValid()) {return;}readPending = true;// 获取selectionKey的兴趣集// 前面小结分析doRegister()接口提到,selectionKey的兴趣集设置为0final int interestOps = selectionKey.interestOps();// 这里的 readInterestOp 是前面讲NioSocketChannel创建时设置的值// 为 SelectionKey.OP_READ,也就是1if ((interestOps & readInterestOp) == 0) {// 这样,selectionKey最终设置的兴趣集为SelectionKey.OP_READ// 表示对读事件感兴趣selectionKey.interestOps(interestOps | readInterestOp);}} ... } ...}
小结
- Netty是在哪里检测有新连接接入的?
- 新连接是怎样注册到NioEventLoop线程上的?
参考资料
- Java读源码之Netty深入剖析
Netty新连接接入与NioSocketChannel分析相关推荐
- 【Netty源码分析摘录】(八)新连接的接入
文章目录 1.问题 2.检测新连接接入 3.创建客户端 channel 4. 绑定 NioEventLoop 4.1 register0 4.1.1 doRegister() 4.1.2 pipeli ...
- AsyncHttpClient源码分析-基于Netty的连接池实现
原文地址:asynchttpclient源码分析-基于Netty的连接池实现 最近项目重构,有了个机会更多接触一个有别于HttpAsyncClient的异步网络框架AsyncHttpClient,是个 ...
- Netty消息接收类故障案例分析
<Netty 进阶之路>.<分布式服务框架原理与实践>作者李林锋深入剖析Netty消息接收类故障案例.李林锋此后还将在 InfoQ 上开设 Netty 专题持续出稿,感兴趣的同 ...
- Netty和RPC框架线程模型分析
<Netty 进阶之路>.<分布式服务框架原理与实践>作者李林锋深入剖析Netty和RPC框架线程模型.李林锋已在 InfoQ 上开设 Netty 专题持续出稿,感兴趣的同学可 ...
- Netty 和 RPC 框架线程模型分析
https://www.infoq.cn/article/9Ib3hbKSgQaALj02-90y 1. 背景 1.1 线程模型的重要性 对于 RPC 框架而言,影响其性能指标的主要有三个要素: I/ ...
- 正确地利用Netty建立连接池
为什么80%的码农都做不了架构师?>>> 一.问题描述 Netty是最近非常流行的高性能异步通讯框架,相对于Java原生的NIO接口,Netty封装后的异步通讯机制要简单很多. ...
- Java Netty长连接实现Android推送
罗嗦几句: 1.轮询(Pull)客户端定时的去询问服务器是否有新消息需要下发:确点很明显Android后台不停的访问网络费电还浪费流量.2.推送(Push)服务端有新消息立即发送给客户端,这就没有时间 ...
- 瞬发大量并发连接 造成MySQL连接不响应的分析
http://www.actionsky.com/docs/archives/252 2016年12月7日 黄炎 目录 1 现象 2 猜想 3 检查环境 4 猜想2 5 分析 5.1 TCP握手的 ...
- C#连接池的详细分析(转)
来源: http://www.25175.com 作者: onrd 使用连接池 连接到数据库服务器通常由几个需要软长时间的步骤组成.必须建立物理通道(例如套接字或命名管道),必须与服务器进行初次连 ...
最新文章
- 2022-2028年中国电容器电子薄膜行业市场研究及前瞻分析报告
- C++中的两种绑定方式(静态绑定、动态绑定)
- Datawhale组队学习周报(第032周)
- vs2019下载和更新速度非常慢的解决方案
- 多元二次方程 python_Python 二次方程
- 女生学python可以做什么_学 Python 都用来干嘛的?
- 中国人民公安大学网络对抗技术作业一
- sql server获取表的所有字段
- .Android开发在Eclipse环境中无法显示提示信息This element neither has attached
- jQuery就业课系列之.jQueryDOM
- 统计字符串中每种字符类型的个数demo
- stream模式不能接受blob文件_一文带你层层解锁文件下载的奥秘
- jdk7与jdk8环境共存与切换
- XML与HTML的作用不同
- linux集群的启动和停止,linux平台 spark standalone集群 使用 start-all,stop-all 管理集群的启动和退出...
- 在IT界取得成功应该知道的10件事(ZT)
- Optimal Step Nonrigid ICP Algorithms for surface registration
- AD域验证DirectoryEntry用法
- matlab求任意输入响应曲线,3.6 用Matlab进行动态响应分析
- [美容美发培训学校网站模板]织梦模版+响应式形象设计艺术教育学校网站dedecms模板+手机自适应