原文链接:https://wangwei.one/posts/net...

前面的一些章节,我们分析了Netty的三大组件 —— Channel 、EventLoop、Pipeline ,对Netty的工作原理有了深入的了解。在此基础上,我们来分析一下当Netty服务端启动后,Netty是如何处理新连接接入的。

本文内容主要分为以下四部分:

  • 新连接检测
  • NioSocketChannel创建
  • NioSocketChannel初始化与注册
  • NioSocketChannel注册READ兴趣集

新连接检测

前面,我们在讲 EventLoop的启动过程源码分析 时,解读过下面这段代码:

public final class NioEventLoop extends SingleThreadEventLoop {...private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {...try {...if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {// 读取read事件unsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}...}...}    

我们还是以服务端 NioServerSocketChannel 为例,它绑定的unsafe实例为 NioMessageUnsafe 。上面的 unsafe.read() 接口,会向下调用到 NioMessageUnsafe.read() 接口,如下:

public abstract class AbstractNioMessageChannel extends AbstractNioChannel {...private final class NioMessageUnsafe extends AbstractNioUnsafe {// 用于保存新建立的 NioSocketChannel 的集合private final List<Object> readBuf = new ArrayList<Object>();@Overridepublic void read() {// 确保在当前线程与EventLoop中的一致assert eventLoop().inEventLoop();// 获取 NioServerSocketChannel config配置final ChannelConfig config = config();// 获取 NioServerSocketChannel 绑定的 pipelinefinal ChannelPipeline pipeline = pipeline();// 获取RecvByteBuf 分配器 Handle// 当channel在接收数据时,allocHandle 会用于分配ByteBuf来保存数据// 关于allocHandle后面再去做详细介绍final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();// 重置已累积的所有计数器,并为下一个读取循环读取多少消息/字节数据提供建议allocHandle.reset(config);boolean closed = false;Throwable exception = null;try {try {do {// 调用后面的 doReadMessages 接口,读取到message则返回1int localRead = doReadMessages(readBuf);if (localRead == 0) {break;}if (localRead < 0) {closed = true;break;}// 对当前read循环所读取到的message数量计数+1allocHandle.incMessagesRead(localRead);// 判断是否继续读取message} while (allocHandle.continueReading());} catch (Throwable t) {exception = t;}int size = readBuf.size();for (int i = 0; i < size; i ++) {readPending = false;// 调用pipeline传播ChannelRead事件pipeline.fireChannelRead(readBuf.get(i));}// 清空readBufreadBuf.clear();allocHandle.readComplete();// 调用pipeline传播 ChannelReadComplete 事件pipeline.fireChannelReadComplete();if (exception != null) {closed = closeOnReadError(exception);pipeline.fireExceptionCaught(exception);}if (closed) {inputShutdown = true;if (isOpen()) {close(voidPromise());}}} finally {// Check if there is a readPending which was not processed yet.// This could be for two reasons:// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method//// See https://github.com/netty/netty/issues/2254if (!readPending && !config.isAutoRead()) {removeReadOp();}}}}...}    

对于 doReadMessages(...) 的分析:

public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel {...// 读取消息@Overrideprotected int doReadMessages(List<Object> buf) throws Exception {// 获取 SocketChannel SocketChannel ch = SocketUtils.accept(javaChannel());try {if (ch != null) {// 使用SocketChannel创建NioSocketChannel,将其存入buf list中// 关于NioSocketChannel的创建请看后面的分析buf.add(new NioSocketChannel(this, ch));return 1;}} catch (Throwable t) {logger.warn("Failed to create a new channel from an accepted socket.", t);try {ch.close();} catch (Throwable t2) {logger.warn("Failed to close a socket.", t2);}}return 0;}...}

对于 continueReading() 接口的分析,至于结果为什么返回false,后面会单独分析:

public abstract class DefaultMaxMessagesRecvByteBufAllocator implements MaxMessagesRecvByteBufAllocator {private volatile int maxMessagesPerRead;private volatile boolean respectMaybeMoreData = true;...public abstract class MaxMessageHandle implements ExtendedHandle {private ChannelConfig config;// 每次读取最大的消息数private int maxMessagePerRead;private int totalMessages;private int totalBytesRead;private int attemptedBytesRead;private int lastBytesRead;private final boolean respectMaybeMoreData = DefaultMaxMessagesRecvByteBufAllocator.this.respectMaybeMoreData;private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {@Overridepublic boolean get() {return attemptedBytesRead == lastBytesRead;}};...// 判断是否继续读取message    @Overridepublic boolean continueReading() {return continueReading(defaultMaybeMoreSupplier);}// 判断是否继续读取message@Overridepublic boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {// 默认情况下 config.isAutoRead() 为true// respectMaybeMoreData 默认为 true// maybeMoreDataSupplier.get() 为false// totalMessages第一次循环则为1// maxMessagePerRead为16// 结果返回falsereturn config.isAutoRead() &&(!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&totalMessages < maxMessagePerRead &&totalBytesRead > 0;}...}...}

NioSocketChannel创建

上面分析新连接接入,提到了 NioSocketChannel 的创建,我们这里来详细分析一下,NioSocketChannel的创建过程与此前我们分析 NioServerSocketChannel创建 大体类似。

构造器

先来看看 NioSocketChannel 的构造函数:

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {...public NioSocketChannel(Channel parent, SocketChannel socket) {// 调用父类构造器super(parent, socket);// 创建NioSocketChannelConfigconfig = new NioSocketChannelConfig(this, socket.socket());}...}

父类 AbstractNioByteChannel 构造器:

public abstract class AbstractNioByteChannel extends AbstractNioChannel {...protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {// 调用父类构造器,并设置兴趣集为SelectionKey.OP_READ,对read事件感兴趣super(parent, ch, SelectionKey.OP_READ);}...}

父类 AbstractNioChannel 构造器:

public abstract class AbstractNioChannel extends AbstractChannel {...protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {// 调用父类构造器super(parent);// 设置channelthis.ch = ch;// 设置兴趣集this.readInterestOp = readInterestOp;try {// 设置为非阻塞ch.configureBlocking(false);} catch (IOException e) {...}}}

父类 AbstractChannel 构造器:

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {...protected AbstractChannel(Channel parent) {// 设置parentthis.parent = parent;// 创建channelIdid = newId();// 创建unsafeunsafe = newUnsafe();// 创建pipelinepipeline = newChannelPipeline();}...
}

ChannelConfig创建

接着我们看看 NioSocketChannelConfig 的创建逻辑:

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {...private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {// 调用父类构造器super(channel, javaSocket);calculateMaxBytesPerGatheringWrite();}...}

父类 DefaultSocketChannelConfig 构造器:

public class DefaultSocketChannelConfig extends DefaultChannelConfig implements SocketChannelConfig {...public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) {// 调用父类构造器,绑定socketchannel super(channel);if (javaSocket == null) {throw new NullPointerException("javaSocket");}// 绑定java socketthis.javaSocket = javaSocket;// Enable TCP_NODELAY by default if possible.// netty一般运行在服务器上,不在Android上,canEnableTcpNoDelayByDefault返回trueif (PlatformDependent.canEnableTcpNoDelayByDefault()) {try {// 开启 TCP_NODELAY ,开启TCP的nagle算法// 尽量不要等待,只要发送缓冲区中有数据,并且发送窗口是打开的,就尽量把数据发送到网络上去。setTcpNoDelay(true);} catch (Exception e) {// Ignore.}}}                                  ... }                                        

NioSocketChannel初始化与注册

上面小节分析了NioSocketChannel的创建逻辑,创建完成之后,我们来分析一下NioSocketChannel是如何注册到NioEventLoop上去的。

在前面小节分析新连接检测的有如下小段代码:

private final class NioMessageUnsafe extends AbstractNioUnsafe {...int size = readBuf.size();for (int i = 0; i < size; i ++) {readPending = false;// 调用pipeline传播ChannelRead事件pipeline.fireChannelRead(readBuf.get(i));}...}    

调用pipeline传播ChannelRead事件,这里的Pipeline是服务端Channel,也就是NioServerSocketChannel所绑定的Pipeline,此时的Pipeline的内部结构是怎么样子的呢?

那这个 ServerBootstrapAcceptor 是从哪里来的呢?

在此前,我们分析 NioServerSocketChannel初始化 时,有过下面这段代码:

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {...// NioServerSocketChannel初始化    void init(Channel channel) throws Exception {// 获取启动器 启动时配置的option参数,主要是TCP的一些属性final Map<ChannelOption<?>, Object> options = options0();// 将获得到 options 配置到 ChannelConfig 中去synchronized (options) {setChannelOptions(channel, options, logger);}// 获取 ServerBootstrap 启动时配置的 attr 参数final Map<AttributeKey<?>, Object> attrs = attrs0();// 配置 Channel attr,主要是设置用户自定义的一些参数synchronized (attrs) {for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked")AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();channel.attr(key).set(e.getValue());}}// 获取channel中的 pipeline,这个pipeline使我们前面在channel创建过程中设置的 pipelineChannelPipeline p = channel.pipeline();// 将启动器中配置的 childGroup 保存到局部变量 currentChildGroupfinal EventLoopGroup currentChildGroup = childGroup;// 将启动器中配置的 childHandler 保存到局部变量 currentChildHandlerfinal ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;// 保存用户设置的 childOptions 到局部变量 currentChildOptionssynchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));}// 保存用户设置的 childAttrs 到局部变量 currentChildAttrssynchronized (childAttrs) {currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));}p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();// 获取启动器上配置的handlerChannelHandler handler = config.handler();if (handler != null) {// 添加 handler 到 pipeline 中pipeline.addLast(handler);}ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {// 用child相关的参数创建出一个新连接接入器ServerBootstrapAcceptor// 通过 ServerBootstrapAcceptor 可以将一个新连接绑定到一个线程上去// 每次有新的连接进来 ServerBootstrapAcceptor 都会用child相关的属性对它们进行配置,并注册到ChaildGroup上去pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});}...    }

ServerBootstrapAcceptor

NioServerSocketChannel初始化时,向NioServerSocketChannel所绑定的Pipeline添加了一个InboundHandler节点 —— ServerBootstrapAcceptor ,其代码如下:

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {...private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {// 子EventLoopGroup,即为workGroupprivate final EventLoopGroup childGroup;// ServerBootstrap启动时配置的 childHandlerprivate final ChannelHandler childHandler;// ServerBootstrap启动时配置的 childOptionsprivate final Entry<ChannelOption<?>, Object>[] childOptions;// ServerBootstrap启动时配置的 childAttrsprivate final Entry<AttributeKey<?>, Object>[] childAttrs;private final Runnable enableAutoReadTask;// 构造函数ServerBootstrapAcceptor(final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {this.childGroup = childGroup;this.childHandler = childHandler;this.childOptions = childOptions;this.childAttrs = childAttrs;// Task which is scheduled to re-enable auto-read.// It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may// not be able to load the class because of the file limit it already reached.//// See https://github.com/netty/netty/issues/1328enableAutoReadTask = new Runnable() {@Overridepublic void run() {channel.config().setAutoRead(true);}};}// 处理Pipeline所传播的channelRead事件// 也就是前面新连接检测时看到的那段代码// pipeline.fireChannelRead(readBuf.get(i));// ServerBootstrapAcceptor的channelRead接口将会被调用,用于处理channelRead事件@Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {// 获取传播事件的对象数据,即为前面的readBuf.get(i)// readBuf.get(i)取出的对象为 NioSocketChannelfinal Channel child = (Channel) msg;// 向 NioSocketChannel 添加childHandler,也就是我们常看到的// ServerBootstrap在启动时配置的代码:// ServerBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {...} )// 最终的结果就是向NioSocketChannel的Pipeline添加用户自定义的ChannelHandler// 用于处理客户端的channel连接child.pipeline().addLast(childHandler);// 配置 NioSocketChannel的TCP属性setChannelOptions(child, childOptions, logger);// 配置 NioSocketChannel 一些用户自定义数据for (Entry<AttributeKey<?>, Object> e: childAttrs) {child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());}// 将NioSocketChannel注册到childGroup,也就是Netty的WorkerGroup当中去try {childGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}}...}...}

关于 ChannelInitializer 的讲解,可以看此前 Pipeline源码分析 文章。

后面的register逻辑,就与我们前面讲解 NioServerSocketChannel注册 大体类似了,这里简单介绍一下。

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {...// 注册NioSocketChannel// eventLoop为childGroup    @Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {...// 绑定eventLoop到NioSocketChannel上AbstractChannel.this.eventLoop = eventLoop;// 现在分析的逻辑是在服务端的线程上,eventLoop与主线程不同,返回falseif (eventLoop.inEventLoop()) {register0(promise);} else {try {eventLoop.execute(new Runnable() {@Overridepublic void run() {// 这里来调用register0方法register0(promise);}});} catch (Throwable t) {logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",AbstractChannel.this, t);closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}}// 注册private void register0(ChannelPromise promise) {try {...boolean firstRegistration = neverRegistered;// 调用 doRegister()doRegister();neverRegistered = false;registered = true;pipeline.invokeHandlerAddedIfNeeded();safeSetSuccess(promise);pipeline.fireChannelRegistered();// 服务端的NioServerSocketChannel已经与客户端的NioSocketChannel建立了连接// 所以,NioSocketChannel是处于激活状态,isActive()返回tureif (isActive()) {// 对于新连接,是第一次注册if (firstRegistration) {// 传播ChannelActive事件pipeline.fireChannelActive();} else if (config().isAutoRead()) {beginRead();}}...} catch (Throwable t) {// Close the channel directly to avoid FD leak.closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}...}        

调用到NioSocketChannel中的doRegister()方法:

public abstract class AbstractNioChannel extends AbstractChannel {...@Overrideprotected void doRegister() throws Exception {boolean selected = false;for (;;) {try {// 将selector注册到底层JDK channel上,并附加了NioSocketChannel对象// 兴趣集设置为0,表示不关心任何事件selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {...}}}...}    

NioSocketChannel 注册OP_READ兴趣集

紧接着上面的分析,传播ChannelActive事件之后的逻辑,主要就是向客户端的NioSocketChannel注册一个Read兴趣集

if (isActive()) {// 对于新连接,是第一次注册if (firstRegistration) {// 传播ChannelActive事件pipeline.fireChannelActive();} else if (config().isAutoRead()) {beginRead();}
}

通过 Pipeline的传播机制 ,最终会调用到doBeginRead()接口,如下:

public abstract class AbstractNioChannel extends AbstractChannel {...protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {...@Overrideprotected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was called// 保存selectionKey到局部变量final SelectionKey selectionKey = this.selectionKey;// 判断有效性if (!selectionKey.isValid()) {return;}readPending = true;// 获取selectionKey的兴趣集// 前面小结分析doRegister()接口提到,selectionKey的兴趣集设置为0final int interestOps = selectionKey.interestOps();// 这里的 readInterestOp 是前面讲NioSocketChannel创建时设置的值// 为 SelectionKey.OP_READ,也就是1if ((interestOps & readInterestOp) == 0) {// 这样,selectionKey最终设置的兴趣集为SelectionKey.OP_READ// 表示对读事件感兴趣selectionKey.interestOps(interestOps | readInterestOp);}}    ...    }    ...}        

小结

  • Netty是在哪里检测有新连接接入的?
  • 新连接是怎样注册到NioEventLoop线程上的?

参考资料

  • Java读源码之Netty深入剖析

Netty新连接接入与NioSocketChannel分析相关推荐

  1. 【Netty源码分析摘录】(八)新连接的接入

    文章目录 1.问题 2.检测新连接接入 3.创建客户端 channel 4. 绑定 NioEventLoop 4.1 register0 4.1.1 doRegister() 4.1.2 pipeli ...

  2. AsyncHttpClient源码分析-基于Netty的连接池实现

    原文地址:asynchttpclient源码分析-基于Netty的连接池实现 最近项目重构,有了个机会更多接触一个有别于HttpAsyncClient的异步网络框架AsyncHttpClient,是个 ...

  3. Netty消息接收类故障案例分析

    <Netty 进阶之路>.<分布式服务框架原理与实践>作者李林锋深入剖析Netty消息接收类故障案例.李林锋此后还将在 InfoQ 上开设 Netty 专题持续出稿,感兴趣的同 ...

  4. Netty和RPC框架线程模型分析

    <Netty 进阶之路>.<分布式服务框架原理与实践>作者李林锋深入剖析Netty和RPC框架线程模型.李林锋已在 InfoQ 上开设 Netty 专题持续出稿,感兴趣的同学可 ...

  5. Netty 和 RPC 框架线程模型分析

    https://www.infoq.cn/article/9Ib3hbKSgQaALj02-90y 1. 背景 1.1 线程模型的重要性 对于 RPC 框架而言,影响其性能指标的主要有三个要素: I/ ...

  6. 正确地利用Netty建立连接池

    为什么80%的码农都做不了架构师?>>>    一.问题描述 Netty是最近非常流行的高性能异步通讯框架,相对于Java原生的NIO接口,Netty封装后的异步通讯机制要简单很多. ...

  7. Java Netty长连接实现Android推送

    罗嗦几句: 1.轮询(Pull)客户端定时的去询问服务器是否有新消息需要下发:确点很明显Android后台不停的访问网络费电还浪费流量.2.推送(Push)服务端有新消息立即发送给客户端,这就没有时间 ...

  8. 瞬发大量并发连接 造成MySQL连接不响应的分析

    http://www.actionsky.com/docs/archives/252  2016年12月7日  黄炎 目录 1 现象 2 猜想 3 检查环境 4 猜想2 5 分析 5.1 TCP握手的 ...

  9. C#连接池的详细分析(转)

    来源: http://www.25175.com   作者: onrd 使用连接池 连接到数据库服务器通常由几个需要软长时间的步骤组成.必须建立物理通道(例如套接字或命名管道),必须与服务器进行初次连 ...

最新文章

  1. 2022-2028年中国电容器电子薄膜行业市场研究及前瞻分析报告
  2. C++中的两种绑定方式(静态绑定、动态绑定)
  3. Datawhale组队学习周报(第032周)
  4. vs2019下载和更新速度非常慢的解决方案
  5. 多元二次方程 python_Python 二次方程
  6. 女生学python可以做什么_学 Python 都用来干嘛的?
  7. 中国人民公安大学网络对抗技术作业一
  8. sql server获取表的所有字段
  9. .Android开发在Eclipse环境中无法显示提示信息This element neither has attached
  10. jQuery就业课系列之.jQueryDOM
  11. 统计字符串中每种字符类型的个数demo
  12. stream模式不能接受blob文件_一文带你层层解锁文件下载的奥秘
  13. jdk7与jdk8环境共存与切换
  14. XML与HTML的作用不同
  15. linux集群的启动和停止,linux平台 spark standalone集群 使用 start-all,stop-all 管理集群的启动和退出...
  16. 在IT界取得成功应该知道的10件事(ZT)
  17. Optimal Step Nonrigid ICP Algorithms for surface registration
  18. AD域验证DirectoryEntry用法
  19. matlab求任意输入响应曲线,3.6 用Matlab进行动态响应分析
  20. [美容美发培训学校网站模板]织梦模版+响应式形象设计艺术教育学校网站dedecms模板+手机自适应

热门文章

  1. 打造数据中心的软实力
  2. 用 TigerVNC 实现 Linux 远程桌面
  3. 一步一步学Entity FrameWork 4(1)
  4. [转]Android Service Test——简单测试例子
  5. Spring MVC 基础笔记
  6. svn服务器安装与配置
  7. CentOS下调整VolGroup-lv_root分区大小
  8. codevs——1742 爬楼梯
  9. Apache Beam的API设计
  10. 无惧杀入红海市场 ZUK手机底气在哪?