回顾上文 Netty 服务 如何 接收新的连接

总结规律:
上一节,我们一起学习了服务接收新连接过程的源码剖析,发现一个很有趣的现象,其实, Netty 底层还是使用的Java 原生的 NIO 来操作的。那么,接收新数据也是一样吗?如果是,那么数据如何转化成 Netty 的 ByteBuf 呢?

Java 原生 NIO 从 SocketChannel 中取出来的数据是存储在 ByteBuffer 里面的,也就是下面这样的代码:

if (selectionKey.isReadable()) {// 强制转换为SocketChannelSocketChannel socketChannel = (SocketChannel) selectionKey.channel();// 创建Buffer用于读取数据ByteBuffer buffer = ByteBuffer.allocate(1024);// 将数据读入到buffer中(第二阶段阻塞)int length = socketChannel.read(buffer);// 处理数据。。。
}

那么,今天的问题是:
1 如果 Netty 底层使用的是 Java 原生的 SocketChannel,那么她是如何处理数据的?
2 数据又是如何在 ChannelPipeline 中传递的?
3 如何在控制台中打印出客户端传递过来的数据?

上一节,我们说接收新连接的处理主要是在 NioEventLoop 的 run () 方法中,里面有个方法叫做 processSelectedKey() ,它里面对于四种事件都有相应的判断并交给 Channel 的 Unsafe 属性来处理,代码如下:

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();//  省略异常处理等其他代码
try {int readyOps = k.readyOps();//  如果是 Connect 事件if ((readyOps & SelectionKey.OP_CONNECT) != 0) {int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}//  如果是 Write 事件if ((readyOps & SelectionKey.OP_WRITE) != 0) {ch.unsafe().forceFlush();}//  如果是 Read 事件或者 Accept 事件if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 ||   readyOps == 0) {unsafe.read();}
} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}
}

让我们跟踪到这个 unsafe 内部
// io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read

      @Overridepublic final void read() {final ChannelConfig config = config();if (shouldBreakReadReady(config)) {clearReadPending();return;}final ChannelPipeline pipeline = pipeline();// ByteBuf的分配器final ByteBufAllocator allocator = config.getAllocator();final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();allocHandle.reset(config);ByteBuf byteBuf = null;boolean close = false;try {do {// key1,通过allocator创建一个ByteBuf,如何创建的byteBuf = allocHandle.allocate(allocator);// key2,读取数据到ByteBuf中,如何读取?allocHandle.lastBytesRead(doReadBytes(byteBuf));if (allocHandle.lastBytesRead() <= 0) {// nothing was read. release the buffer.byteBuf.release();byteBuf = null;close = allocHandle.lastBytesRead() < 0;if (close) {// There is nothing left to read as we received an EOF.readPending = false;}break;}// key3,触发channelRead()allocHandle.incMessagesRead(1);readPending = false;pipeline.fireChannelRead(byteBuf);byteBuf = null;} while (allocHandle.continueReading());// key4,触发channelReadComplete()allocHandle.readComplete();pipeline.fireChannelReadComplete();if (close) {closeOnRead(pipeline);}} catch (Throwable t) {handleReadException(pipeline, byteBuf, t, close, allocHandle);} finally {// Check if there is a readPending which was not processed yet.// This could be for two reasons:// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method//// See https://github.com/netty/netty/issues/2254if (!readPending && !config.isAutoRead()) {removeReadOp();}}}}

这个方法中有四个关键的信息

1 通过 allocator 创建一个 ByteBuf,如何创建的呢?默认的又是什么呢?这个比较简单,默认地,通过各种参数判断当前操作系统是否允许池化、Unsafe、堆外这三个指标,当然,这些参数也可以通过启动参数来控制。我的系统默认创建的是 PooledUnsafeDirectByteBuf,你也可以看看你的是哪个。

2 读取数据到 ByteBuf 中,我们知道,Netty 底层包装了 Java 原生的 SocketChannel,那这里是如何读取的呢?

3 触发 ChannelPipeline 中的 ChannelHandler 的 channelRead () 方法。

4 触发 ChannelPipeline 中的 ChannelHandler 的 channelReadComplete () 方法,与上述同理,本文不详细展开。

首先,让我们看看 doReadBytes(byteBuf) 这个方法内部

// io.netty.channel.socket.nio.NioSocketChannel#doReadBytes
@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();// 设置可读取的长度allocHandle.attemptedBytesRead(byteBuf.writableBytes());// key,调用ByteBuf的writeBytes()方法// 第一个参数是Java原生的SocketChannel,第二个参数是可读取的长度return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}// io.netty.buffer.AbstractByteBuf#writeBytes(java.nio.channels.ScatteringByteChannel, int)
@Override
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {// 保证容量足够,里面有扩容的逻辑ensureWritable(length);// key,调用setBytes()方法// 第一个参数是写入的位置,第二参数是SocketChannel,第三个参数可写入的长度int writtenBytes = setBytes(writerIndex, in, length);if (writtenBytes > 0) {writerIndex += writtenBytes;}return writtenBytes;
}
// io.netty.buffer.PooledByteBuf#setBytes(int, java.nio.channels.ScatteringByteChannel, int)
@Override
public final int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {try {// key,调用Java原生SocketChannel的read()方法// read()方法的参数是Java原生的ByteBufferreturn in.read(internalNioBuffer(index, length));} catch (ClosedChannelException ignored) {return -1;}
}

果然不出我们所料, Netty 底层在读取数据的时候依然是调用的 Java 原生的 SocketChannel 的 read() 方法,将数据读取到了 ByteBuffer 中,也就是这里的 internalNioBuffer(index, length) 返回的是一个 Java 原生的ByteBuffer ,所以,此时,我们是不是可以大胆猜测, Netty 的 ByteBuf 是对 Java 原生 ByteBuffer 的包装呢?

为了验证我们的猜想,让我们再前进一步,深入到 internalNioBuffer(index, length) 内部。

// io.netty.buffer.PooledByteBuf#internalNioBuffer(int, int)
@Override
public final ByteBuffer internalNioBuffer(int index, int length) {checkIndex(index, length);return _internalNioBuffer(index, length, false);
}
final ByteBuffer _internalNioBuffer(int index, int length, boolean duplicate) {index = idx(index);// duplicate 为 false ,所以使用的是第二个ByteBuffer buffer = duplicate ? newInternalNioBuffer(memory) :         internalNioBuffer();buffer.limit(index + length).position(index);return buffer;
}
protected final ByteBuffer internalNioBuffer() {// this.tmpNioBufByteBuffer tmpNioBuf = this.tmpNioBuf;if (tmpNioBuf == null) {this.tmpNioBuf = tmpNioBuf = newInternalNioBuffer(memory);}return tmpNioBuf;
}

可以看到,最后返回的实际上是 this.tmpNioBuf,它的类型的是 ByteBuffer,果然,Netty 的 ByteBuf 底层果然是包装了 Java 原生的 ByteBuffer。

好了,让我们再来看看第二个问题,数据如何在 ChannelPipeline 中传递的?

// 1. io.netty.channel.DefaultChannelPipeline#fireChannelRead
@Override
public final ChannelPipeline fireChannelRead(Object msg) {// 从HeadContext开始调用AbstractChannelHandlerContext.invokeChannelRead(head, msg);return this;
}
// 2. io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);EventExecutor executor = next.executor();// 判断当前线程是不是在EventLoop中if (executor.inEventLoop()) {next.invokeChannelRead(m);} else {executor.execute(new Runnable() {@Overridepublic void run() {next.invokeChannelRead(m);}});}
}
// 3. io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead
private void invokeChannelRead(Object msg) {if (invokeHandler()) {try {// 调用里面的ChannelHandler的channelRead()方法((ChannelInboundHandler) handler()).channelRead(this, msg);} catch (Throwable t) {notifyHandlerException(t);}} else {fireChannelRead(msg);}
}
// 4. io.netty.channel.DefaultChannelPipeline.HeadContext#channelRead
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {// 调用ChannelHandlerContext的fireChannelRead()方法// 触发下一个Context中Handler的调用ctx.fireChannelRead(msg);
}
// 5. io.netty.channel.AbstractChannelHandlerContext#fireChannelRead
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {// 寻找下一个可用的ChannelHandlerContextinvokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);return this;
}
// 5.1 io.netty.channel.AbstractChannelHandlerContext#findContextInbound
private AbstractChannelHandlerContext findContextInbound(int mask) {AbstractChannelHandlerContext ctx = this;do {ctx = ctx.next;} while ((ctx.executionMask & mask) == 0);return ctx;
}
// 6. io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead
// 又回到了上面第2步
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeChannelRead(m);} else {executor.execute(new Runnable() {@Overridepublic void run() {next.invokeChannelRead(m);}});}
}

从这段代码中我们可以得出以下几点重要内容(对于入站消息):
1 ChannelPipeline 是从 HeadContext 开始执行的;
2 同一个 Channel 的所有 ChannelHandler 的执行都会放在 EventLoop 中执行,所以它们是线程安全的;
3 调用 ctx.fireChannelRead(msg); 即可触发下一个 ChannelHandler 的执行;
对于我们的 EchoServer ,它里面有四个 Handler ,即head<=>LoggingHandler<=>EchoServerHandler<=>tail , EchoServerHandler 中 channelRead () 方法为:

public class EchoServerHandler extends ChannelInboundHandlerAdapter {@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {//  读取数据后写回客户端ctx.write(msg);}
}

HeadContext ,不仅是一个 ChannelHandlerContext ,也是一个 ChannelInboundHandler ,同时也是一个ChannelOutboundHandler 。
TailContext ,不仅是一 ChannelHandlerContext ,同时也是一个 ChannelInboundHandler 。

本节,我们一起学习了 Netty 接收新数据过程的源码剖析,通过不断的探寻底层的源码和原理,我们发现,揭开了 Netty 的神秘面纱之后,其实,她也很简单,底层都是对 Java 原生 NIO 的包装和优化,不断地提升用户体验,创造更美好的东西供大家使用。

好了,对于如何读取新数据以及数据如何在 ChannelPipeline 中传递,我们都弄明白了,让我们再看看第三个问题,如何在控制台中打印出客户端传递过来的消息呢?

netty服务接收新数据源码剖析总结

如何接收:
1 NioEventLoop的processSelectedKey()方法
2 Allocator创建ByteBuf – 累加器累加
3 读取数据 3.1 ByteBuf 实际封装了 ByteBuffer 3.2 通过该java原生SocketChannel读取数据
如何传递:
1 pipeline.fireChannelRead() 2.ctx.invokeChannelRead() 3.handler.channelRead() 4 ctx.fireChannelRead()

这里有一些比较有意思的故事,稍后分享:

TODO。。。。。。

Netty 服务 接收新数据相关推荐

  1. Netty 服务 如何 接收新的连接

    源码调试技巧:从基本概念出手,寻找突破点:观察线程栈,寻找关键字眼 简单回顾.netty 是如何启动的 上一节回顾 netty 是如何启动的 服务启动的时候会创建ServerSocketChannel ...

  2. Netty服务如何写出数据

    上一节,我们一起学习了 Netty 接收新数据过程的源码剖析,我们又发现了一个有趣的现象,Netty 的 ByteBuf 竟然也是对 Java 原生 ByteBuffer 的包装. 经过前面的学习,我 ...

  3. 中国Azure新数据中心(区域)正式商用

    由世纪互联运营的 Microsoft Azure 两个新数据中心(分别在两个新区域:中国东部 2 和 中国北部 2)现已正式商用.我们的新老客户都可以自由使用新数据中心资源.作为最早入华商用的国际公有 ...

  4. FPGA+CPU助力数据中心实现图像处理应用体验与服务成本新平衡

    图片逐渐成为互联网主要的内容构成,相应的图片处理需求也在高速成长,移动应用与用户生产内容(UGC)正在驱动数据中心图像处理的业务负载快速增加.本文深维科技联合创始人兼CEO樊平详细剖析了图片加速的必要 ...

  5. java netty modbus协议接收iot数据

    IoTserver 源代码开源在gitee上 : IoT netty java gitee server sample c++ libuv 的IoT tcp server IoT c++ libuv ...

  6. netty udp接收超过2048字节数据

    默认netty udp接收DatagramPacket字节数最大是2048,如果数据大,超过这个限制,就会报错,抛出异常,虽然这个包解析失败,不会影响其他包的解析,但是总得来说,这种失败是不利于数据收 ...

  7. 【客户下单】后台提供webservice服务接收数据

    [客户下单]后台提供webservice服务接收数据 在bos_fore系统中,添加OrderAction封装订单数据. 接下来调用webservice,将数据传递给bos_management系统. ...

  8. #榜样的力量#航班管家全球大交通出行疫情追踪服务系统丨数据猿新冠战“疫”公益策划...

    "该项目案例由航班管家提交申报,参与数据猿推出的<寻找新冠战"疫",中国数据智能产业先锋力量>的公益主题策划活动. 大数据产业创新服务媒体 --聚焦数据 · ...

  9. #榜样的力量#内蒙古自治区互联网医疗服务系统丨数据猿新冠战“疫”公益策划...

    "该项目案例由浪潮健康提交申报,参与数据猿推出的<寻找新冠战"疫",中国数据智能产业先锋力量>的公益主题策划活动. 大数据产业创新服务媒体 --聚焦数据 · ...

最新文章

  1. option:contains后面加变量_什么是配置环境变量,配置以后有什么作用呢?
  2. Linux / OpenWRT / 目录功能说明
  3. C#的TreeView标记
  4. 白裤子变粉裤子怎么办_使用裤子构建构建数据科学的monorepo
  5. Windows系统进程介绍
  6. 施一公:带好学生,是特别要紧的事
  7. python异常值处理箱型图_如何利用python处理异常值?
  8. 【机器人】激光测距传感器的数据处理步骤
  9. 容器技术Docker K8s 2 云原生容器技术概述
  10. Tomcat介绍和MyEclipse搭建DRP系统
  11. 5. 倒车入库+侧方位停车(仿真)
  12. display: flex自我理解
  13. “中国如果有五个丘成桐,数学肯定世界一流”
  14. go语言学习-多重返回和匿名变量
  15. 2144775-48-2,D-Biotin-PEG6-Thalidomide可用于cereblon(CRBN)结合和置换分析
  16. 米莱虾_三年之期_创作纪念
  17. element UI dialog点击dialog区域外会关闭dialog
  18. Apache Spark与Apache Drill
  19. 武夷岩茶PK铁观音(南北乌龙之我见--清风云雨)
  20. iphone自带计算机删除,如何清理iPhone上的垃圾文件

热门文章

  1. Junit4集成到Maven工程
  2. RESTful测试工具RESTClient
  3. Hadoop Jobhistory配置启用
  4. 记录一次svn报错:[Previous operation has not finished; run 'cleanup' if it was interrupted] 的排错过程
  5. 为什么python不需要编译_为什么我用Go写机器学习部署平台,而偏偏不用Python?...
  6. php gif 切成一帧,GIF动画帧提取器 如何截取gif的每一帧图片
  7. Apache中限制和允许特定IP访问
  8. Android 使用webview遇到的问题及解决办法
  9. 设计模式之禅之设计模式-组合模式
  10. Tengine(Nginx)动静分离简要配置