前面简单提到了下Pipeline的传播机制,这里再详细分析下
Pipeline的传播机制中有两个非常重要的属性inbound和outbound(AbstractChannelHandlerContext的属性),
inbound为true表示其对应的ChannelHandler实现了ChannelInboundHandler接口
outbound为true表示其对应的ChannelHandler实现了ChannelOutboundHandler接口

Pipeline是一个双向链表,其head实现了ChannelOutboundHandler接口,tail实现了ChannelInboundHandler接口

Netty的传播事件可以分为两种:inbound和outbound事件(我简单理解为输入和输出)


上面是Netty官网关于两个事件的说明: inbound 事件和outbound 事件的流向是不一样的, inbound 事件从socket.read()开始,其流向是自下而上的,而outbound刚好相反,自上而下,以socket.write()为结束。其中inbound 的传递方式是通过调用ChannelHandlerContext.fireIN_EVT()方法,而outbound 的传递方式是通过调用ChannelHandlerContext.OUT_EVT()方法

我们来看下Netty源码里定义的ChannelInboundHandler 和ChannelOutboundHandler


从方法命名发现,ChannelInboundHandler定义的都是类似于回调(响应事件通知)的方法,而ChannelOutboundHandler定义的都是操作(主动触发请求)的方法

Outbound事件传播方式

Outbound 事件都是请求事件(request event),即请求某件事情的发生,然后通过 Outbound 事件进行通知。按照官网的说明其传播方法应该是tail–>handler–>head
以Bootstrap的connect事件为例,分析下其传播流程

其调用链如下:
Bootstrap.connect()
–>Bootstrap.doResolveAndConnect()
–>Bootstrap.doResolveAndConnect0()
–>Bootstrap.doConnect()
–>AbstractChannel.connect(remoteAddress, promise)
–>DefaultChannelPipeline.connect(remoteAddress, promise)

 public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {return tail.connect(remoteAddress, promise);}

可以看到,这里的connect事件确实是以tail为起点开始传播的
实际会调用AbstractChannelHandlerContext的如下代码:

public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {final AbstractChannelHandlerContext next = findContextOutbound();EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeConnect(remoteAddress, localAddress, promise);} else {safeExecute(executor, new Runnable() {@Overridepublic void run() {next.invokeConnect(remoteAddress, localAddress, promise);}}, promise, null);}return promise;}

主要做两件事:
找到下一个outbound的context,然后调用其invokeConnect方法
先来看下findContextOutbound()方法

private AbstractChannelHandlerContext findContextOutbound() {AbstractChannelHandlerContext ctx = this;do {ctx = ctx.prev;} while (!ctx.outbound);return ctx;}

逻辑很简单,就是从当前的context节点(这里就是tail节点)开始向前遍历,直到找到Outbound为true的context并返回
找到一个outbound为true的context之后就会调用其invokeConnect方法,然后会获取其关联的ChannelOutboundHandler并调用connect方法

private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {if (invokeHandler()) {try {((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);} catch (Throwable t) {notifyOutboundHandlerException(t, promise);}} else {connect(remoteAddress, localAddress, promise);}}

默认调用的是ChannelOutboundHandlerAdapter的connect方法(如果我们重写了该方法就会调用我们自己的实现,此时如果想要让其继续向下传递,需要手动调用ctx.connect()),然后又调用了context的connect方法,即又回到了AbstractChannelHandlerContext的connect()方法,开始向前去找下一个满足outbound为true的context

  public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,SocketAddress localAddress, ChannelPromise promise) throws Exception {ctx.connect(remoteAddress, localAddress, promise);}

那么这样的循环什么时候会结束呢,从一开始就说明了Pipeline的head节点是HeadContext,并且其满足outbound为true这一条件,所以最后一定会走到HeadContext的handler()方法,然后调用对应的connect。
前面说过HeadContext既是ChannelHandlerContext又是ChannelHandler,所以handler()方法返回的就是HeadContext对象,其connect方法如下:

  @OverrideHeadContext(DefaultChannelPipeline pipeline) {super(pipeline, null, HEAD_NAME, false, true);unsafe = pipeline.channel().unsafe();setAddComplete();}public void connect(ChannelHandlerContext ctx,SocketAddress remoteAddress, SocketAddress localAddress,ChannelPromise promise) throws Exception {unsafe.connect(remoteAddress, localAddress, promise);}

最终调用了unsafe的connect方法,之类的unsafe其实是Pipeline里保存的channel里的unsafe,我们在Channel的初始化的时候看到过
继续跟下去发现会调用AbstractNioChannel的内部类AbstractNioUnsafe的如下方法

  @Overridepublic final void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {try {     boolean wasActive = isActive();if (doConnect(remoteAddress, localAddress)) {fulfillConnectPromise(promise, wasActive);} else {connectPromise = promise;requestedRemoteAddress = remoteAddress;...promise.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {}});}} catch (Throwable t) {promise.tryFailure(annotateConnectException(t, remoteAddress));closeIfClosed();}}

这里的doConnect()方法主要做了两件事;
1.调用jdk底层的bind方法
2.调用jdk底层的connect方法
然后我们看下这里的fulfillConnectPromise方法

private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {boolean active = isActive();    boolean promiseSet = promise.trySuccess();if (!wasActive && active) {pipeline().fireChannelActive();}if (!promiseSet) {close(voidPromise());}}

如果在调用doConnect方法之前channel不是active激活状态,调用后变为激活状态,那么就会调用pipeline的fireChannelActive方法,将这一事件–激活成功通知下去 (注意下这里的fireXX方法应该是inbound的类型事件)
接下来我们看看这里的inbound事件会怎么传播

Inbound事件传播方式

DefaultChannelPipeline.fireChannelActive–>AbstractChannelHandlerContext.invokeChannelActive

public final ChannelPipeline fireChannelActive() {AbstractChannelHandlerContext.invokeChannelActive(head);return this;}
static void invokeChannelActive(final AbstractChannelHandlerContext next) {//最开始传入的next对象是head节点,说明Inbound事件确实是从Head开始传递的EventExecutor executor = next.executor();if (executor.inEventLoop()) {//调用HeadContext的invokeChannelActive()方法(实际是执行了AbstractChannelHandlerContext里的方法)next.invokeChannelActive();} else {executor.execute(new Runnable() {@Overridepublic void run() {next.invokeChannelActive();}});}}
 private void invokeChannelActive() {if (invokeHandler()) {try {((ChannelInboundHandler) handler()).channelActive(this);} catch (Throwable t) {notifyHandlerException(t);}} else {fireChannelActive();}}

先执行对应的handler的channelActive方法,这里就是HeadContext的channelActive方法,然后调用context的fireChannelActive继续向下传播

@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelActive();readIfIsAutoRead();}

这里的fireChannelActive做的事情和前面Outbound事件的类似,向后遍历找到满足inbound为true的context,再调用invokeChannelActive(next),又回到了开始,是不是和outbound的传播很类似

不过需要注意的一点,在传播的过程中会调用对应的ChannelInboundHandler的channelActive(this)方法,如果想要让事件继续往下传播,那么在我们对应的channelActive都需要调用ctx.fireChannelActive向下传播(就像HeadContext做的那样);如果我们没有重写channelActive方法,默认会执行ChannelInboundHandlerAdapter的channelActive方法,它会帮我们调用fireChannelActive()

public ChannelHandlerContext fireChannelActive() {final AbstractChannelHandlerContext next = findContextInbound();invokeChannelActive(next);return this;}

这样不断传播下去,最后会找到TailContext节点,前面说过tail是Pipeline的尾结点并且其inbound属性为true,那么就会执行TailContext的channelActive方法,如下:

 @Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception { }

这里就是一个空实现,其实TailContext对于ChannelInboundHandler接口的实现大部分都是空方法,除了下面三个函数

        @Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {ReferenceCountUtil.release(evt);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {onUnhandledInboundException(cause);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {onUnhandledInboundMessage(msg);}

说明对于Inbound事件,如果用户没有添加自定义的处理器,那么默认都是不处理的

注意到这里的HeadContext在执行fireChannelActive()向下传递之外,还执行了一个方法readIfIsAutoRead(),

@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelActive();readIfIsAutoRead();}private void readIfIsAutoRead() {//channelconfig的默认isAutoRead是1也就是开启自动读取if (channel.config().isAutoRead()) {//这里的read()会调用 tail.read();从tail向read开始传递channel.read();}}

我们看到在如下回调方法里也调用了readIfIsAutoRead()

 @Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelReadComplete();readIfIsAutoRead();}

这里的channelReadComplete顾名思义就是完成了channel的数据读取的回调(是在NioEventLoop的processSelectedKey(SelectionKey k, AbstractNioChannel ch)里调用了unsafe.read()方法时触发回调的)。
说明在默认情况下,Channel会开启自动读取模式的,只要Channel是active的,读完一波数据之后就继续向selector注册读事件,这样就可以连续不断得读取数据。
关于processSelectedKey方法的调用流程可以参考我的EventLoopGroup的学习笔记
Netty学习笔记(三)EventLoopGroup开篇
Netty学习笔记(四)EventLoopGroup续篇

前面说到TailContext对于ChannelInboundHandler接口的实现大部分都是空方法,我们来看下其中比较重要的两个方法体不为空的实现exceptionCaught(),channelRead()

@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {onUnhandledInboundMessage(msg);}/*** Called once a message hit the end of the {@link ChannelPipeline} without been handled by the user* in {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is responsible* to call {@link ReferenceCountUtil#release(Object)} on the given msg at some point.*/protected void onUnhandledInboundMessage(Object msg) {try {logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. " +"Please check your pipeline configuration.", msg);} finally {ReferenceCountUtil.release(msg);}}

如果Inbound事件没有被用户自定义的ContextHandler处理,那么就会一直向下传播,head->tail,最后tail节点会接收到在Pipeline传播过程中没有被处理的消息,tail节点就会给我们发出一个警告,告诉我们,它已经将我们未处理的数据给丢掉了

对于未处理的流转到tail的异常也是这样处理的,这里的注释提示说异常流转到Tail节点是因为Pipeline的最后一个handler没有处理异常。换句话来说就是如果我们想处理异常,就需要在Pipeline的最后一个非tail节点进行处理,即该handler需要加在自定义节点的最末尾
那么这样是如何保证我们的异常最终都会进入到这个handler的呢?后面分析一下

 /*** Called once a {@link Throwable} hit the end of the {@link ChannelPipeline} without been handled by the user* in {@link ChannelHandler#exceptionCaught(ChannelHandlerContext, Throwable)}.*/protected void onUnhandledInboundException(Throwable cause) {try {logger.warn("An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +"It usually means the last handler in the pipeline did not handle the exception.",cause);} finally {ReferenceCountUtil.release(cause);}}

Pipeline中的异常传播和处理

如果要在业务代码中加入异常处理器,统一处理pipeline过程中的所有的异常,那么该异常处理器需要加载自定义节点的最末尾,如下图所示

分别以Outbound和Inbound事件来看看异常是怎么在最后一个Handler里被捕捉到并处理的

outBound异常的处理

以 ctx.channel().read()为例

  @Overridepublic Channel read() {pipeline.read();return this;}@Overridepublic final ChannelPipeline read() {tail.read();return this;}@Overridepublic ChannelHandlerContext read() {final AbstractChannelHandlerContext next = findContextOutbound();EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeRead();} else {...}return this;}

其调用链如下:channel.read()–>pipeline.read()–>tail.read()–>tail.invokeRead()
进入Pipeline之后会从tail传播到head,最后调用HeadContext的read()方法

@Overridepublic final void beginRead() {try {doBeginRead();} catch (final Exception e) {invokeLater(new Runnable() {@Overridepublic void run() {pipeline.fireExceptionCaught(e);}});}}

可以看到对于捕捉到的异常,最后都会调用fireExceptionCaught进行处理,我们看下它的实现

@Overridepublic final ChannelPipeline fireExceptionCaught(Throwable cause) {AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);return this;}

下面的流程就是Pipeline的传播了
调用链如下:
DefaultChannelPipeline.fireExceptionCaught()
–>AbstractChannelHandlerContext.invokeExceptionCaught(head,cause)
–>AbstractChannelHandlerContext.invokeExceptionCaught(cause) (此时节点为HeadContext)
–>HeadContext.exceptionCaught(ctx,cause)
–>AbstractChannelHandlerContext.fireExceptionCaught(cause) (向下传播)
–>AbstractChannelHandlerContext.invokeExceptionCaught(next,cause) (这里的节点就是当前节点的下一个节点)

会一直传递直到tail节点,如果我们在最后一个自定义Handler里处理了异常,那么就不会传播到TailContext,否则TailContext就会执行到如下方法,提示我们该异常未被处理,将被抛弃

 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {onUnhandledInboundException(cause);}
inBound异常的处理

以前面提到的DefaultChannelPipeline.fireChannelActive()方法为例
以head为起点,会调用Inbound属性为true节点的invokeChannelActive()方法,在调用context的handler的channelActive方法时会进行 try { … } catch ( Throwable t ) { }

private void invokeChannelActive() {if (invokeHandler()) {try {((ChannelInboundHandler) handler()).channelActive(this);} catch (Throwable t) {notifyHandlerException(t);}} else {fireChannelActive();}}private void notifyHandlerException(Throwable cause) {if (inExceptionCaught(cause)) {...return;}invokeExceptionCaught(cause);}

下面的流程就很简单了,执行InboundContext handler的exceptionCaught()方法,ChannelInboundHandlerAdapter帮我们实现了该接口方法,如果我们没有重写对应的方法,会继续向下传播,这里的invokeExceptionCaught和上面的Outbound中的异常在传播过程中执行到的方法是同一个,最终也会向Tail传播

 public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {invokeExceptionCaught(next, cause);return this;}

所以,我们就可以定义这样一个ExceptionCaughtHandler 来处理Inbound和Outbound的异常

public class ExceptionCaughtHandler extends ChannelInboundHandlerAdapter {@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {       //TODO 异常处理System.out.println("打印异常通知");}
}

我觉得这里ExceptionCaughtHandler可以是InboundHandler或者是OutboundHandler,因为在异常传播的时候并没有像其他Inbound和OutBound事件那样对context的 inbound和outbound属性有限制

总结

Outbound事件
  1. Outbound 事件是请求事件
  2. Outbound 事件的发起者是Channel。
  3. Outbound 事件最终是交给channel的unsafe,调用jdk底层的NIO API
  4. Outbound 事件在Pipeline 中的传输方向是tail -> 自定义handler–>head。
  5. Outbound事件在传播过程中,需要调用 ctx.OUT_EVT() 方法传播,否则传播会停止
Inbound事件
  1. Inbound 事件是通知回调事件,当某个操作完成后,通知上层
  2. Inbound 事件发起者是 unsafe。
  3. Inbound 事件的处理者是 Channel,如果用户没有实现自定义的处理方法,那么 Inbound 事件默认 的处理者是TailContext,并且其处理方法是空实现。
  4. Inbound 事件在Pipeline 中的传输方向是head-> 自定义handler–>tail。
  5. Inbound 事件在传播过程中,需要调用 ctx.fireIN_EVT() 方法传播,否则传播会停止

Netty学习笔记(六)Pipeline的传播机制相关推荐

  1. Netty学习笔记(六) 简单的聊天室功能之WebSocket客户端开发实例

    在之前的Netty相关学习笔记中,学习了如何去实现聊天室的服务段,这里我们来实现聊天室的客户端,聊天室的客户端使用的是Html5和WebSocket实现,下面我们继续学习. 创建客户端 接着第五个笔记 ...

  2. Netty学习笔记(五)Pipeline

    Pipeline是Netty中的另一核心组件,前面在说过在Channel进行初始化的时候最后创建一系列的重要对象,其中就有Pipeline 我们看下Netty官网对于Pipeline的定义 A lis ...

  3. Netty学习笔记(二)Netty服务端流程启动分析

    先贴下在NIO和Netty里启动服务端的代码 public class NioServer { /*** 指定端口号启动服务* */public boolean startServer(int por ...

  4. Netty学习笔记(二) 实现服务端和客户端

    在Netty学习笔记(一) 实现DISCARD服务中,我们使用Netty和Python实现了简单的丢弃DISCARD服务,这篇,我们使用Netty实现服务端和客户端交互的需求. 前置工作 开发环境 J ...

  5. Apache Nutch 1.3 学习笔记十一(页面评分机制 OPIC)

    1. Nutch 1.3 的页面评分机制 Nutch1.3目前默认还是使用OPIC作为其网页分数算法,但其之后,已经引入了PageRank-like算法,以弥补OPIC算法的不足,目前OPIC算法还是 ...

  6. Apache Nutch 1.3 学习笔记十一(页面评分机制 LinkRank 介绍)

    下面是Google翻译的http://wiki.apache.org/nutch/NewScoring内容,是关于Nutch 新的链接分数算法的说明,有点类似于Google的PageRank,这里有其 ...

  7. Netty学习笔记:二、NIO网络应用实例-群聊系统

    实例要求: 编写一个NIO群聊系统,实现服务器端和多个客户端之间的数据简单通讯(非阻塞): 实现多人群聊: 服务器端:可以监测用户上线.离线,并实现消息转发功能: 客户端:通过channel可以无阻塞 ...

  8. Java学习笔记 六、面向对象编程中级部分

    Java学习笔记 六.面向对象编程中级部分 包 包的注意事项和使用细节 访问修饰符 访问修饰符的注意事项和使用细节 面向对象编程三大特征 封装 封装的实现步骤(三步) 继承 继承的细节问题 继承的本质 ...

  9. Ethernet/IP 学习笔记六

    Ethernet/IP 学习笔记六 EtherNet/IP defines two primary types of communications: explicit and implicit (Ta ...

最新文章

  1. 实验四 数据库SQL语言基础编程
  2. Using breakpad in cocos2d-x 3.2,dump信息收集
  3. linux samba 空目录,linux Samba搭建
  4. LINUX安装REDIS集群
  5. Kali渗透(二)之被动信息收集
  6. 赢在 CSDN:我在 CSDN 的成长,“长风破浪会有时”,如何保证自己有持续写作的动力?
  7. 用 Maven 做项目构建
  8. Python自定义类调用方法
  9. (0.2.2)如何下载mysql数据库(二进制、RPM、源码、YUM源)
  10. 实时数据库与时序数据库
  11. SetTimer函数总结
  12. Java微信表情包字符处理,数据库存储以及转义解决方式
  13. 证明三角形中cosA+cosB+cosC=1+4sin(A/2)sin(B/2)sin(C/2)
  14. echarts柱状图颜色渐变样式
  15. 【VR】Unity3d VR学习笔记——Unity烘焙
  16. 关于CND与真实IP
  17. 蒲公英 · JELLY技术周刊 Vol.12 尤雨溪新作 Vite, 你会支持么?
  18. html5隐藏%3ctextarea,html text隐藏,html textarea
  19. 为什么大学毕业生工作难找?
  20. 猫和老鼠汤姆看java_以前看《猫和老鼠》觉得汤姆太笨太好笑,现在看才明白其中的道理...

热门文章

  1. SpringBoot 模版渲染
  2. 全国计算机一级d类考试内容,全国计算机一级考试WPS office复习题及答案2017
  3. 抽屉效果_宜家靠边,好用不贵的全格收纳抽屉使用感受
  4. pythonfor循环range_python之for循环与range()函数
  5. varnish缓存服务器构建疑问
  6. 礼赞 Wordpress,蝉知可直接使用 Wordpress 模板
  7. LinuxUSB驱动程序调试--009:编写应用程序---验证协议【转】
  8. android升级SDK后,XML graphical layout无法预览的解决
  9. Web Service学习笔记(4)
  10. 每天学一点flash(16) as3.0 与asp通信(3) 错误探究