客户端channel在建立连接之后会关注read事件,那么read事件在哪触发的呢?
NioEventLoop中

            /*** 读事件和 accept事件都会经过这里,但是拿到的unsafe对象不同  所以后续执行的read操作也不一样* NioServerChannel进行accept操作* NioChannel进行read操作*/if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();

io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read

        @Overridepublic final void read() {final ChannelConfig config = config();//若 inputClosedSeenErrorOnRead = true ,移除对 SelectionKey.OP_READ 事件的感兴趣。if (shouldBreakReadReady(config)) {//清楚读事件clearReadPending();return;}final ChannelPipeline pipeline = pipeline();final ByteBufAllocator allocator = config.getAllocator();//获取并重置allocHandle对象final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();allocHandle.reset(config);ByteBuf byteBuf = null;boolean close = false;//是否关闭try {do {//申请bytebufbyteBuf = allocHandle.allocate(allocator);//读取数据,设置最后读取字节数allocHandle.lastBytesRead(doReadBytes(byteBuf));//没读到数据if (allocHandle.lastBytesRead() <= 0) {// 梅毒到数据  释放bufbyteBuf.release();byteBuf = null;//如果最后读取的字节为小于 0 ,说明对端已经关闭close = allocHandle.lastBytesRead() < 0;if (close) {// There is nothing left to read as we received an EOF.readPending = false;}break;}//读到了数据allocHandle.incMessagesRead(1);readPending = false;//通知pipline read事件pipeline.fireChannelRead(byteBuf);byteBuf = null;} while (allocHandle.continueReading());//循环判断是否继续读取//读取完成allocHandle.readComplete();//通知pipline读取完成pipeline.fireChannelReadComplete();if (close) {//关闭连接closeOnRead(pipeline);}} catch (Throwable t) {handleReadException(pipeline, byteBuf, t, close, allocHandle);} finally {.....if (!readPending && !config.isAutoRead()) {removeReadOp();}}}}
  1. 若 inputClosedSeenErrorOnRead = true ,移除对 SelectionKey.OP_READ 事件的感兴趣。
  2. 获取并重置allocHandle对象(代理alloctor的一些功能)
  3. 读取数据,设置最后读取字节数
  4. 通知pipline read事件
  5. 通知pipline读取完成
    这次先跟着主线走,然后再回头看细节,直接定位到事件通知
    io.netty.channel.DefaultChannelPipeline#fireChannelRead
    /*** 有数据读入的时候会调用(InBound)  也可以手动调用* @param msg 客户端连接的channel* @return*/@Overridepublic final ChannelPipeline fireChannelRead(Object msg) {//pipline节点类的静态方法 穿进去的headAbstractChannelHandlerContext.invokeChannelRead(head, msg);return this;}

最后调用到io.netty.channel.DefaultChannelPipeline.HeadContext#channelRead

        /*** 被调用read  可以向下传递* @param ctx* @param msg* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ctx.fireChannelRead(msg);}

ctx.fireChannelRead(msg);向下个节点传递,如果自定义了handler来处理就可以拦截channelRead的bytebuf数据来进行处理,负责一直向下传递到TailContext节点处理
io.netty.channel.DefaultChannelPipeline.TailContext#channelRead

        @Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//如果不去自定义handler处理byteBuf  最终会到TailContext 来处理onUnhandledInboundMessage(msg);}protected void onUnhandledInboundMessage(Object msg) {try {//日志提醒没有处理logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. " +"Please check your pipeline configuration.", msg);} finally {//释放byteBuf内存ReferenceCountUtil.release(msg);}}

数据到达TailContext节点之后,再onUnhandledInboundMessage方法中打印数据未处理日志,然后释放bytebuf内存
io.netty.channel.nio.AbstractNioByteChannel#doReadBytes读取操作的方法

    @Override/*** 读取数据*/protected int doReadBytes(ByteBuf byteBuf) throws Exception {//获取handle对象final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();//设置读索引=写索引allocHandle.attemptedBytesRead(byteBuf.writableBytes());//读取无数据到bufreturn byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());}
  1. allocHandle.lastBytesRead()小于0证明没有读取到数据,要释放bytebuf
  2. 读取到数据,allocHandle.incMessagesRead(1);
  3. allocHandle.continueReading() 循环判断是否继续读取

        @Overridepublic boolean continueReading() {return continueReading(defaultMaybeMoreSupplier);}@Overridepublic boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {// Keep reading if we are allowed to read more bytes, and our last read filled up the buffer we provided.//最后读取的字节数大于0  并且等于最大可写入的字节数return bytesToRead > 0 && maybeMoreDataSupplier.get();}private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {@Overridepublic boolean get() {//最后读取的字节数 是否等于最大可写入字节数return attemptedBytesRead == lastBytesRead;}};

最后读取的字节数大于0,并且最后读取的数据==尝试可写入的大小,即证明可以继续读取
出现异常时候调用io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#handleReadException

        private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,RecvByteBufAllocator.Handle allocHandle) {if (byteBuf != null) {if (byteBuf.isReadable()) {readPending = false;//把已经读取到的数据  通知到pipline中pipeline.fireChannelRead(byteBuf);} else {//释放bytebufbyteBuf.release();}}//读取完成allocHandle.readComplete();//通知pipline读取完成pipeline.fireChannelReadComplete();//通知pipline异常pipeline.fireExceptionCaught(cause);if (close || cause instanceof IOException) {closeOnRead(pipeline);}}

io.netty.channel.DefaultChannelPipeline#fireExceptionCaught

    @Overridepublic final ChannelPipeline fireExceptionCaught(Throwable cause) {AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);return this;}

最终调用到TailContext的io.netty.channel.DefaultChannelPipeline.TailContext#exceptionCaught

        @Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {onUnhandledInboundException(cause);}protected void onUnhandledInboundException(Throwable cause) {try {logger.warn("An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +"It usually means the last handler in the pipeline did not handle the exception.",cause);} finally {ReferenceCountUtil.release(cause);}}

和读取操作一样,最终也是要再发生异常的时候释放buf的内存

Netty源码解析(八) —— channel的read操作相关推荐

  1. Netty 源码解析系列-服务端启动流程解析

    netty源码解析系列 Netty 源码解析系列-服务端启动流程解析 Netty 源码解析系列-客户端连接接入及读I/O解析 五分钟就能看懂pipeline模型 -Netty 源码解析 1.服务端启动 ...

  2. Netty源码解析之内存管理-PooledByteBufAllocator-PoolArena

      PooledByteBufAllocator是Netty中比较复杂的一种ByteBufAllocator , 因为他涉及到对内存的缓存,分配和释放策略,PooledByteBufAllocator ...

  3. netty源码分析系列——Channel

    2019独角兽企业重金招聘Python工程师标准>>> 前言 Channel是netty中作为核心的一个概念,我们从启动器(Bootstrap)中了解到最终启动器的两个关键操作con ...

  4. Flume-ng源码解析之Channel组件

    如果还没看过Flume-ng源码解析之启动流程,可以点击Flume-ng源码解析之启动流程 查看 1 接口介绍 组件的分析顺序是按照上一篇中启动顺序来分析的,首先是Channel,然后是Sink,最后 ...

  5. Android Glide 3.7.0 源码解析(八) , RecyclableBufferedInputStream 的 mark/reset 实现

    个人博客传送门 一.mark / reset 的作用 Android Glide 3.7.0 源码解析(七) , 细说图形变换和解码有提到过RecyclableBufferedInputStream ...

  6. Netty源码解析-Netty内存泄露检测

    前言: 在前一篇文章中,我们介绍了ByteBuf的引用计数器的使用,基本所有的ByteBuf都有相关计数的功能,那么这个计数有什么用呢. 实际主要就是做内存泄露检测用的.本文就其如何做检测来进行说明. ...

  7. 【Netty源码解析】Netty核心源码和高并发、高性能架构设计精髓

    Netty线程模型图 Netty线程模型源码剖析图 图链接:https://www.processon.com/view/link/5dee0943e4b079080a26c2ac Netty高并发高 ...

  8. Netty源码解析1-Buffer

    大数据成神之路系列: 请戳GitHub原文: github.com/wangzhiwubi- 更多文章关注:多线程/集合/分布式/Netty/NIO/RPC Java高级特性增强-集合 Java高级特 ...

  9. python 解包_【源码解析】python解包操作一文完全理解

    解包是如何操作? >>> a, b = [1, 2] # 以下为此解包操作的字节码 0 LOAD_CONST 1 (1) 2 LOAD_CONST 2 (2) 4 BUILD_LIS ...

最新文章

  1. 动易Ajax登陆调用
  2. Eclipse启动无响应 停留在Loading workbench状态的解决办法
  3. python 保存bin文件,python bin文件处理
  4. python怎么把数据导入excel_如何把python中的数据导入excel
  5. 核磁共振波谱仪基础知识及常见问题
  6. CAD给标注尺寸加上下公差的方法
  7. Qt调节电脑屏幕亮度
  8. xilinx platform cable usb驱动_小白入门多路高速(8 x 8bits x 100Msps)AD驱动设计专栏启动预告...
  9. 大鹅模拟器 for Mac休闲模拟游戏
  10. [软考]系统架构设计师 备考经验分享(二) - 知识点学习+综合知识篇
  11. SAS_9.3_64位_安装方法(验证可用)
  12. 安装完Oracle客户端出现ORA-12560:TNS:协议适配器错误
  13. utf8与unicode转换
  14. 一. 实战——PageHelper的使用
  15. MultiDex 相关问题解决记录
  16. day12-后羿采集器
  17. 高版本自动接听电话方法
  18. linux64位系统兼容32位程序(不下载兼容包)
  19. python打包apk_使用Python多渠道打包apk
  20. 小白车载网络测试 - 总纲

热门文章

  1. 杰理之充电仓决定左右耳【篇】
  2. 谨慎使用subList
  3. 怎么用python发弹幕_[python]bilibili弹幕发送者查询器软件
  4. Python \033[95m print打印设置字体颜色
  5. Springboot毕设项目乐器培训管理系统172z1java+VUE+Mybatis+Maven+Mysql+sprnig)
  6. html css 实现星星海,JS实现星星海特效
  7. el-upload 组件在 on-success 文件上传成功的钩子中传递更多参数
  8. 2019icpc银川打铁站 赛后总结
  9. 项目运营必备☞推荐的甘特图软件
  10. 8月英语总结--细水长流