背景

最近发现ChannelOutboundHandlerAdapter的read()回调方法,在连接创建成功和读取数据后都会被回调。因此就产生了疑问“为什么建立连接和读取数据后read()方法会被调用呢?” 从网上搜索到一片文章https://my.oschina.net/lifany... 可以看出一些端倪,但是具体流程和一些疑问还是没有解开。
那我也尝试着从源码找到答案吧。

Demo演示

我们先写个小Demo,其中Test1OutboundHandlerAdapter是一个ChannelOutboundHandlerAdapter,里面的read()添加一行打印。 Test1HandlerAdapter 是一个ChannelInboundHandlerAdapter 里面的channelActive(xxx)、
channelRead(xxx)、channelReadComplete(xxx)添加打印。由于很简单,下面只贴部分代码

Test1OutboundHandlerAdapter.java

@Overridepublic void read(ChannelHandlerContext ctx) throws Exception {super.read(ctx);System.out.println("Test1OutboundHandlerAdapter------------->read");}

Test1HandlerAdapter.java

    @Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {super.channelRegistered(ctx);System.out.println("Test1HandlerAdapter-------------->channelRegistered");}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {super.channelActive(ctx);System.out.println("Test1HandlerAdapter-------------->channelActive");}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("Test1HandlerAdapter-------------->channelRead");ctx.writeAndFlush(msg);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {super.channelReadComplete(ctx);}    

然后我们建立连接,随便发一下数据,服务器收到数据,打印如下:

Test1HandlerAdapter-------------->handlerAdded
Test1HandlerAdapter-------------->channelRegistered
Test1HandlerAdapter-------------->channelActive
Test1OutboundHandlerAdapter------------->read

Test1HandlerAdapter-------------->channelRead
Test1HandlerAdapter-------------->channelReadComplete
Test1OutboundHandlerAdapter------------->read

如果把Test1OutboundHandlerAdapter的read(xxx)回调方法注释掉,会发现服务器无法接收数据了。

源码分析

1.channelRegistered回调流程分析

可以定位到在AbstractChannelHandlerContext invokeChannelRegistered()方法调用了channelRegistered(xxx)方法,然后再查找会发现是
AbstractChannelHandlerContext的fireChannelRegistered()----->
invokeChannelRegistered(final AbstractChannelHandlerContext next)----->invokeChannelRegistered()

AbstractChannelHandlerContext

@Overridepublic ChannelHandlerContext fireChannelRegistered() {invokeChannelRegistered(findContextInbound());return this;}static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeChannelRegistered();} else {executor.execute(new Runnable() {@Overridepublic void run() {next.invokeChannelRegistered();}});}}private void invokeChannelRegistered() {if (invokeHandler()) {try {/**ChannelInboundHandler的register(xxx)在这里被调用*/((ChannelInboundHandler) handler()).channelRegistered(this);} catch (Throwable t) {notifyHandlerException(t);}} else {fireChannelRegistered();}}

顺藤fireChannelRegistered()摸瓜,最终定位到AbstractChannel内部类AbstractUnsafe的
register(EventLoop eventLoop, final ChannelPromise promise)----->register0(ChannelPromise promise)

AbstractUnsafe

        @Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {if (eventLoop == null) {throw new NullPointerException("eventLoop");}if (isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));return;}if (!isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));return;}AbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {register0(promise);} else {try {eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",AbstractChannel.this, t);closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}}private void register0(ChannelPromise promise) {try {// check if the channel is still open as it could be closed in the mean time when the register// call was outside of the eventLoopif (!promise.setUncancellable() || !ensureOpen(promise)) {return;}boolean firstRegistration = neverRegistered;doRegister();neverRegistered = false;registered = true;// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the// user may already fire events through the pipeline in the ChannelFutureListener.pipeline.invokeHandlerAddedIfNeeded();safeSetSuccess(promise);pipeline.fireChannelRegistered();// Only fire a channelActive if the channel has never been registered. This prevents firing// multiple channel actives if the channel is deregistered and re-registered.if (isActive()) {if (firstRegistration) {pipeline.fireChannelActive();} else if (config().isAutoRead()) {// This channel was registered before and autoRead() is set. This means we need to begin read// again so that we process inbound data.//// See https://github.com/netty/netty/issues/4805beginRead();}}} catch (Throwable t) {// Close the channel directly to avoid FD leak.closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}

在继续找的会找到在EventLooop层面的调用了,我们可以先不用管了。在register0的方法中又调用了pipeline.fireChannelRegistered()和pipeline.fireChannelActive();,这正是我们要找的,也符合打印顺序先channelRegistered后channelActive。为了验证我们可以加上断点调试,就是这儿了。

至此我们可以总结一下:
channelRegistered流程:

说明

  1. DefaultChannelPipeline 的fireChannelRegistered()
    @Overridepublic final ChannelPipeline fireChannelUnregistered() {AbstractChannelHandlerContext.invokeChannelUnregistered(head);return this;}

AbstractChannelHandlerContext.invokeChannelUnregistered(head);传递的参数是DefaultChannelPipeline的head,这样保证了register事件沿着pipeline从头流向尾,其对应DefaultChannelPipeline内部类HeadContext。 HeadContext多重身份即是ChannelHandlerContext又是ChannelInboundHandler和ChannelOutboundhandler

DefaultChannelPipeline

 final AbstractChannelHandlerContext head;final AbstractChannelHandlerContext tail;...省略...protected DefaultChannelPipeline(Channel channel) {this.channel = ObjectUtil.checkNotNull(channel, "channel");succeededFuture = new SucceededChannelFuture(channel, null);voidPromise =  new VoidChannelPromise(channel, true);tail = new TailContext(this);head = new HeadContext(this);head.next = tail;tail.prev = head;}

DefaultChannelPipeline的内部类HeadContext

final class HeadContext extends AbstractChannelHandlerContextimplements ChannelOutboundHandler, ChannelInboundHandler {private final Unsafe unsafe;HeadContext(DefaultChannelPipeline pipeline) {super(pipeline, null, HEAD_NAME, false, true);unsafe = pipeline.channel().unsafe();setAddComplete();}@Overridepublic ChannelHandler handler() {return this;}
省略后边的代码

2.上图黄色的部分都是调用的HeadContext中的方法
static void invokeChannelRegistered(final AbstractChannelHandlerContext next)接收的参数是DefaultChannelPipeline传递的head即HeadContext,那么也就是head.invokeChannelRegistered()
invokeChannelRegistered()方法中会调用
((ChannelInboundHandler) handler()).channelRegistered(this);
HeadContext类中该方法返回的就是自己(可查看上面的代码),因为HeadContext本身也是ChannelInboundHandler。 同时又将自己作为参数,调用自己的channelRegistered方法

3.HeadContext的ChannelRegister方法中调用AbstractChannelHandlerContext的fireChannelRegistered();
(还是调用的自己)该方法中调用了invokeChannelRegistered(findContextInbound()); findContextInbound()所实现的功能就是查找到下一个ChanelInboundHandler即HeadContext(本身是ChannelInboundHandler)下一个ChanelInboundHandler
上面的步骤不断重复,自此registered事件可以沿着pipeline在不同的InboundHandler里流动了。

2.channelActive回调流程分析

channelActive的回调流程和channelRegister流程没有什么区别,可参考上文分析。 但是在HeadContext的channelActive方法中会调用readIfIsAutoRead(); 这个是读数据的关键

3.netty读数据分析

读数据分析

Netty channelRegistered\ChannelActive---源码分析相关推荐

  1. netty 5 alph1源码分析(服务端创建过程)

    研究了netty的服务端创建过程.至于netty的优势,可以参照网络其他文章.<Netty系列之Netty 服务端创建>是 李林锋撰写的netty源码分析的一篇好文,绝对是技术干货.但抛开 ...

  2. 【Netty之旅四】你一定看得懂的Netty客户端启动源码分析!

    前言 前面小飞已经讲解了NIO和Netty服务端启动,这一讲是Client的启动过程. 源码系列的文章依旧还是遵循大白话+画图的风格来讲解,本文Netty源码及以后的文章版本都基于:4.1.22.Fi ...

  3. netty 5.0 源码分析(1)-----ButeBuf

    1 功能说明: 1.1工作原理 转载于:https://blog.51cto.com/chpn208/1631131

  4. 【Netty系列_3】Netty源码分析之服务端channel

    highlight: androidstudio 前言 学习源码要有十足的耐性!越是封装完美的框架,内部就越复杂,源码很深很长!不过要抓住要点分析,实在不行多看几遍,配合debug,去一窥优秀框架的精 ...

  5. 【Netty源码分析摘录】(八)新连接的接入

    文章目录 1.问题 2.检测新连接接入 3.创建客户端 channel 4. 绑定 NioEventLoop 4.1 register0 4.1.1 doRegister() 4.1.2 pipeli ...

  6. Netty Pipeline源码分析(2)

    原文链接:https://wangwei.one/posts/net... 前面 ,我们分析了Netty Pipeline的初始化及节点添加与删除逻辑.接下来,我们将来分析Pipeline的事件传播机 ...

  7. netty源码分析系列——Channel

    2019独角兽企业重金招聘Python工程师标准>>> 前言 Channel是netty中作为核心的一个概念,我们从启动器(Bootstrap)中了解到最终启动器的两个关键操作con ...

  8. Netty网络框架学习笔记-16(心跳(heartbeat)服务源码分析)

    Netty网络框架学习笔记-16(心跳(heartbeat)服务源码分析_2020.06.25) 前言: Netty 作为一个网络框架,提供了诸多功能,比如编码解码等,Netty 还提供了非常重要的一 ...

  9. Netty源码分析系列之服务端Channel的端口绑定

    扫描下方二维码或者微信搜索公众号菜鸟飞呀飞,即可关注微信公众号,Spring源码分析和Java并发编程文章. 微信公众号 问题 本文内容是接着前两篇文章写的,有兴趣的朋友可以先去阅读下两篇文章: Ne ...

  10. netty源码分析系列——EventLoop

    2019独角兽企业重金招聘Python工程师标准>>> 前言 EventLoop也是netty作为一个事件驱动架构的网络框架的重要组成部分,netty主要通过它来实现异步编程,从前面 ...

最新文章

  1. 研发流程在敏捷开发中的详解
  2. python gil_Python GIL(Global Interpreter Lock)
  3. linux下=号与==号
  4. buu [BJDCTF 2nd]rsa0
  5. 判断字符串的长度,中文占两个字符
  6. 安装Mongodb并解决用户授权问题
  7. .NET 6 全新指标 System.Diagnostics.Metrics 介绍
  8. 在线生成文本图片 CFC函数计算版
  9. 让NUnit轻松支持.NET 2.0
  10. python中的axis=0和1代表什么
  11. wps中的相交_如何在wps中添加交叉引用 - 卡饭网
  12. 加密与启示录:Crypto是流着奶与蜜的“应许之地”
  13. 浅谈OpenGOP与ClosedGOP
  14. 网络营销的15大形式
  15. BitBucket介绍以及基础使用
  16. 【富文本】亿图思维导图MindMaster Pro限时赠送正版
  17. Allegro添加中文字体的简单有效方法
  18. 企业微信获取客户群里用户的unionid;企业微信获取客户详情
  19. 拯救流浪猫 | 「喵先锋」系列数字版权盲盒明日开抢
  20. latex中插入图片以及固定图片位置

热门文章

  1. 接收后台数据并向后台发送数据
  2. mysql收货地址表_收货地址表结构 以及创建修改流程
  3. !!!全球最流行开源硬件平台!不知道就OUT了!
  4. Validform_v5.3.2 自定义规则
  5. 霍夫丁不等式(Hoeffding‘s inequality)-集成学习拓展-西瓜书式8.3
  6. ftp文件盘服务器回档,企业网盘和FTP服务器对比
  7. 自然语言处理——实现美国总统就职演说词汇分布图
  8. 面向对象语法1—基础
  9. 组件Element的入门学习
  10. 一份致敬所有通信行业的老炮儿的信。