本文来说下netty中的核心知识ChannelHandler 与 ChannelPipeline 详解

文章目录

  • 概述
  • ChannelHandler 家族
    • Channel生命周期
    • ChannelHandler 生命周期
    • ChannelHanler子接口
      • ChannelInboundHandler接口
      • ChannelOutboundHandler接口
    • 资源管理
  • ChannelPipeline
    • ChannelPipeline相对论
    • 修改ChannelPipeline
    • ChannelHandler的执行和阻塞
    • 触发事件
  • ChannelHandlerContext
  • 本文小结

概述

我们在上一篇文章中研究的 bytebuf 是一个容器用来“包装”数据。在本章我们将探讨这些容器如何通过应用程序来移动,传入和传出,以及他们的内容是如何处理的。

本章主要内容

  • Channel
  • ChannelHandler
  • ChannePipeline
  • ChannelHandlerContext

ChannelHandler 家族

Channel生命周期

在 Channel 的生命周期中,它的状态与 ChannelHandler 是密切相关的,下列是 Channel 组件的四个状态:

状态 描述
ChannelUnregistered Channel 没有注册到 EventLoop
ChannelRegistered Channel 被注册到了 EventLoop
ChannelActive Channel 已经连接到它的远程节点,处于活动状态,可以收发数据
ChannelInactive Channel 与远程节点断开不再处于活动状态

Channel 的生命周期如下图所示,当这些状态发生改变时,将会生成对应的事件,ChannelPipeline 中的ChannelHandler 就可以及时做出处理:


ChannelHandler 生命周期

ChannelHandler 接口定义了其生命周期中的操作,当ChanelHandler被添加到ChannelPipeline 或从ChannelPipeline中移除时,会调用这些操作,ChannelHandler的生命周期如下:

方法 描述
handlerAdded 当把 ChannelHandler 添加到 ChannelPipeline 中时调用此方法
handlerRemoved 当把 ChannelHandler 从 ChannelPipeline 中移除的时候会调用此方法
exceptionCaught 当 ChannelHandler 在处理数据的过程中发生异常时会调用此方法

ChannelHanler子接口

Netty 提供2个重要的 ChannelHandler 子接口:

  • ChannelInboundHandler - 处理进站数据,并且所有状态都更改
  • ChannelOutboundHandler - 处理出站数据,允许拦截各种操作

ChannelInboundHandler接口

ChannelInboundHandler会在接受数据或者其对应的Channel状态发生改变时调用其生命周期的方法, ChannelInboundHandler的生命周期和Channel的生命周期其实是密切相关的。 以下是ChannelInboundHandler的生命周期方法:

方法 描述
ChannelRegistered 当Channel被注册到EventLoop且能够处理IO事件时会调用此方法
ChannelUnregistered 当Channel从EventLoop注销且无法处理任何IO事件时会调用此方法
ChannelActive 当Channel已经连接到远程节点(或者已绑定本地address)且处于活动状态时会调用此方法
ChannelInactive 当Channel与远程节点断开,不再处于活动状态时调用此方法
ChannelReadComplete 当Channel的某一个读操作完成时调用此方法
ChannelRead 当Channel有数据可读时调用此方法
ChannelWritabilityChanged 当Channel的可写状态发生改变时调用此方法,可以调用Channel的isWritable方法检测Channel的可写性,还可以通过ChannelConfig来配置write操作相关的属性
userEventTriggered 当ChannelInboundHandler的fireUserEventTriggered方法被调用时才调用此方法。

这里有一个细节一定需要注意:当我们实现ChannelInboundHandler的channelRead方法时,请一定要记住 使用ReferenceCountUtil的release方法释放ByteBuf,这样可以减少内存的消耗,所以我们可以实现一个 ChannelHandler来完成对ByteBuf的释放,就像下面这样:


由于手工管理资源会很繁琐,您可以通过使用 SimpleChannelInboundHandler 简化问题,因为SimpleChannelInboundHandler已经帮我们 把与业务无关的逻辑在ChannelRead方法实现了,我们只需要实现它的channelRead0方法来完成我们的逻辑就够了:

@ChannelHandler.Sharable
// 继承 SimpleChannelInboundHandler
public class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object> {@Overridepublic void channelRead0(ChannelHandlerContext ctx, Object msg) {// 不需做特别的释放资源的动作}}

ChannelInitializer

回顾一下我们在第一个 Netty 应用章节的服务端代码:

public final class HelloServer {......private void start() throws InterruptedException {// 1.bossGroup 线程用于接收连接,workerGroup 线程用于具体的处理EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {// 2.创建服务端启动引导/辅助类:ServerBootstrapServerBootstrap b = new ServerBootstrap();// 3.给引导类配置两大线程组,确定了线程模型b.group(bossGroup, workerGroup)// (非必备)打印日志.handler(new LoggingHandler(LogLevel.INFO))// 4.指定 IO 模型为 NIO.channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ChannelPipeline p = ch.pipeline();// 5.可以自定义客户端消息的业务处理逻辑p.addLast(new HelloServerHandler());}});......}}......}

其中加入 ServerBootstrap 中处理的 channel 是一个 ChannelInitializer ,这是怎么回事呢,不是应该相应的ChannelHandler 吗?我们来看一下Channelnitializer源代码:

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class);/*** 这个方法会在Channle被注册时候调用,在方法返回之后,这个实例会在Channel对应的ChannelPipeline中删除** @param ch            the {@link Channel} which was registered.* @throws Exception    is thrown if an error occurs. In that case the {@link Channel} will be closed.*/protected abstract void initChannel(C ch) throws Exception;@SuppressWarnings("unchecked")@Overridepublic final void channelRegistered(ChannelHandlerContext ctx)throws Exception {boolean removed = false;boolean success = false;try {initChannel((C) ctx.channel());ctx.pipeline().remove(this);removed = true;ctx.fireChannelRegistered();success = true;} catch (Throwable t) {logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);} finally {if (!removed) {ctx.pipeline().remove(this);}if (!success) {ctx.close();}}}
}

从上面可以看出ChannelInitializer其实也是一个ChannelHandler,只是ChannelInitializer的主要任务不是对IO进行处理,而更多的负责对注册到EventGroup的Channel进行 init 处理,其中大多是进行加入 Handler 的处理


ChannelOutboundHandler接口

出站数据将由ChannelOutboundHandler处理,它的方法将被Channel,ChannelPipeline以及ChannelHandlerContext调用 (Channel,ChannelPipeline,ChannelHandlerContext都拥有write操作),以下是ChannelOutboundHandler的主要方法:

状态 描述
bind 当Channel绑定到本地address时会调用此方法
connect 当Channel连接到远程节点时会调用此方法
disconnect 当Channel和远程节点断开时会调用此方法
close 当关闭Channel时会调用此方法
deregister 当Channel从它的EventLoop注销时会调用此方法
read 当从Channel读取数据时会调用此方法
flush 当Channel将数据冲刷到远程节点时调用此方法
write 当通过Channel将数据写入到远程节点时调用此方法

几乎所有的方法都将 ChannelPromise 作为参数, 一旦请求结束要通过 ChannelPipeline 转发的时候,必须通知此参数。

ChannelPromise vs. ChannelFuture:

ChannelPromise 是 特殊的 ChannelFuture (ChannelPromise 扩展了 ChannelFuture)。任何时候调用例如 Channel.write(…) 一个新的 ChannelPromise将会创建并且通过 ChannelPipeline传递。这次写操作本身将会返回 ChannelFuture, 这样只允许你得到一次操作完成的通知。Netty 本身使用 ChannelPromise 作为返回的 ChannelFuture 的通知,事实上在大多数时候就是 ChannelPromise 自身。


.

资源管理

当我们使用ChannelInboundHandler的read或ChannelOutboundHandler的write操作时,我们都需要保证 没有任何资源泄露并尽可能的减少资源耗费。之前已经介绍过了ReferenceCount引用计数用于处理池化的 ByteBuf资源。 为了帮助我们诊断潜在的的资源泄露问题,Netty提供了ResourceLeakDetector,它将 对我们的Netty程序的已分配的缓冲区做大约 1% 的采样用以检测内存泄露,Netty 目前定义了4种泄露检测级别,如下

级别 描述
Disabled 禁用泄露检测。我们应当在详细测试之后才应该使用此级别。
SIMPLE 使用1%的默认采样率检测并报告任何发现的泄露,这是默认的检测级别。。
ADVANCED 使用默认的采样率,报告任何发现的泄露以及对应的消息的位置。
PARANOID 类似于ADVANCED,但是每次都会对消息的访问进行采样,此级别可能会对程序的性能造成影响,应该用于调试阶段。

我们可以通过 JVM 启动参数来设置leakDetector的级别:

java -Dio.netty.leakDetectionLevel=ADVANCED

ChannelPipeline

在 Netty 组件中也介绍过了,ChannelPipeline是一系列ChannelHandler组成的拦截链,每一个新创建的Channel 都会被分配一个新的ChannelPipeline,Channel和ChannelPipeline之间的关联是持久的,无需我们干涉它们 之间的关系。

ChannelPipeline相对论

Netty 总是将ChannelPipeline的入站口作为头部,出站口作为尾部,当我们通过ChannelPipeline的add方法 将入站处理器和出站处理器混合添加到ChannelPipeline后,ChannelHandler的顺序如下:


一个入站事件将从ChannelPipeline的头部(左侧)向尾部(右侧)开始传播,出站事件的传播则是与入站的传播方向相反。当ChannelPipeline在ChannelHandler之间传播事件的时候,它会判断下一个ChannelHandler的类型是否与当前ChannelHandler的类型相同(通过 ChannelHandlerContext),如果相同则说明它们是一个方向的事件, 如果不同则跳过该ChannelHandler并前进到下一个ChannelHandler,直到它找到相同类型的ChannelHandler。


修改ChannelPipeline

ChannelPipeline可以通过添加,删除和修改ChannelHandler来修改它自身的布局,这是它最基本的能力, 一下列举了ChannelPipeline的一些修改方法:

名称 描述
addFirst addBefore addAfter addLast 添加 ChannelHandler 到 ChannelPipeline.
Remove 从 ChannelPipeline 移除 ChannelHandler…
Replace 在 ChannelPipeline 替换另外一个 ChannelHandler.

ChannelHandler的执行和阻塞

通常ChannelPipeline中的每个ChannelHandler都是通过它(ChannelPipeline)的EventLoop线程来处理 传递给他的数据的,所以我们不能去阻塞这个线程,否则会对整体的 IO 操作产生负面影响。 但有时候不得已 需要使用阻塞的 API 来完成逻辑处理,对于这种情况,ChannelPipeline的某些方法支持接受一个EventLoopGroup 类型的参数,我们可以通过自定义EventLoopGroup的方式,使ChannelHandler在我们的EventLoopGroup内处理数据。 这样一来,就可以避免阻塞线程的影响了。


触发事件

ChannelPipeline的API不仅有对ChannelHandler的增删改操作,还有对入站和出站操作的附加方法,如ChannelPipeline的入站方法:

方法 描述
fireChannelRegistered 调用ChannelPipeline中下一个ChannelInboundHandler的channelRegistered方法
fireChannelUnregistered 调用ChannelPipeline中下一个ChannelInboundHandler的channelUnregistered方法
fireChannelActive 调用ChannelPipeline中下一个ChannelInboundHandler的channelActive方法
fireChannelInactive 调用ChannelPipeline中下一个ChannelInboundHandler的channelInactive方法
fireExceptionCaught 调用ChannelPipeline中下一个ChannelInboundHandler的exceptionCaught方法
fireUserEventTriggered 调用ChannelPipeline中下一个ChannelInboundHandler的userEventTriggered方法
fireChannelRead 调用ChannelPipeline中下一个ChannelInboundHandler的channelRead方法
fireChannelReadComplete 调用ChannelPipeline中下一个ChannelInboundHandler的channelReadComplete方法
fireChannelWritabilityChanged 调用ChannelPipeline中下一个ChannelInboundHandler的channelWritabilityChanged方法

ChannelPipeline的出站方法:

方法 描述
bind 调用ChannelPipeline中下一个ChannelOutboundHandler的bind方法,将Channel与本地地址绑定
connect 调用ChannelPipeline中下一个ChannelOutboundHandler的connect方法,将Channel连接到远程节点
disconnect 调用ChannelPipeline中下一个ChannelOutboundHandler的disconnect方法,将Channel与远程连接断开
close 调用ChannelPipeline中下一个ChannelOutboundHandler的close方法,将Channel关闭
deregister 调用ChannelPipeline中下一个ChannelOutboundHandler的deregister方法,将Channel从其对应的EventLoop注销
flush 调用ChannelPipeline中下一个ChannelOutboundHandler的flush方法,将Channel的数据冲刷到远程节点
write 调用ChannelPipeline中下一个ChannelOutboundHandler的write方法,将数据写入Channel
writeAndFlush 先调用write方法,然后调用flush方法,将数据写入并刷回远程节点
read 调用ChannelPipeline中下一个ChannelOutboundHandler的read方法,从Channel中读取数据

ChannelHandlerContext

ChannelHandlerContext代表的是ChannelHandler和ChannelPipeline之间的关联,每当有ChannelHandler 添加到ChannelPipeline中时,都会创建ChannelHandlerContext。ChannelHandlerContext的主要功能是 管理它所关联的ChannelHandler与同一个ChannelPipeline中的其他ChannelHandler之间的交互:


ChannelHandlerContext的大部分方法和Channel和ChannelPipeline相似,但有一个重要的区别是: 调用Channel或ChannelPipeline的方法其影响是会沿着整个 ChannelPipeline 进行传播:

//使用Chanel write
Channel channel = ctx.channel();
ctx.write(xxx);//使用Pipeline write
ChannelPipeline pipeline = ctx.pipeline();
pipeline.write(xxx);


而调用 ChannelHandlerContext 的方法则是从其关联的 ChannelHandler 开始,并且只会传播给位于该ChannelPipeline中的下一个能够处理该事件的 ChannelHandler:

//使用ChannelContext write
ChannelHandlerContext ctx = context;
ctx.write(xxx);


下面是一些比较重要的方法,有些和ChannelPipeline功能相似的方法就不再罗列了,各位同学可以直接查看原API。

方法 描述
alloc 获取与当前ChannelHandlerContext所关联的Channel的ByteBufAllocator
handler 返回与当前ChannelHandlerContext绑定的ChannelHandler
pipeline 返回与当前ChannelHandlerContext关联的ChannelPipeline
… …

ChannelHandlerContext 的高级用法

有时候我们需要在多个ChannelPipeline之间共享一个ChannelHandler,以此实现跨管道处理(获取)数据 的功能,此时的ChannelHandler属于多个ChannelPipeline,且会绑定到不同的ChannelHandlerContext上。

在多个ChannelPipeline之间共享ChannelHandler我们需要使用 @Sharable 注解,这代表着它是一个共享的 ChannelHandler,如果一个ChannelHandler没有使用@Sharable注解却被用于多个ChannelPipeline,那么 将会触发异常。 还有非常重要的一点:一个ChannelHandler被用于多个ChannelPipeline肯定涉及到多线程 数据共享的问题,因此我们需要保证ChannelHandler的方法同步。 下面是一个很好的例子:

@ChannelHandler.Sharable
public class UnsafeSharableChannelHandler extends ChannelInboundHandlerAdapter{private int count;@Overridepublic void channelRead(ChannelHandlerContext ctx,Object msg){count++;System.out.println("count : " + count);ctx.fireChannelRead(msg);}
}

上面这个ChannelHandler标识了@Sharable注解,这代表它需要被用于多个ChannelPipeline之间

这段代码的问题是它持有状态: 一个实例变量 count 保持了方法调用的计数。将这个类的一个实例添加到 ChannelPipeline 并发访问通道时很可能产生错误。一个简单的解决方法就是给修改了count的变量的方法 channelRead 加synchronized关键字,确保即使在多个ChannelPipeline之间共享, ChannelHandler也能保证数据一致。


本文小结

本文详细介绍了netty中有关ChannelHandler 与 ChannelPipeline的知识与内容。

ChannelHandler 与 ChannelPipeline 详解相关推荐

  1. netty系列之:channelPipeline详解

    文章目录 简介 ChannelPipeline 事件传递 DefaultChannelPipeline 总结 简介 我们在介绍channel的时候提到过,几乎channel中所有的实现都是通过chan ...

  2. java内部格式_详解java内部类的访问格式和规则

    详解java内部类的访问格式和规则 1.内部类的定义 定义一个类来描述事物,但是这个事物其中可能还有事物,这时候在类中再定义类来描述. 2.内部类访问规则 ①内部类可以直接访问外部类中的成员,包括私有 ...

  3. Netty详解(五):Netty TCP粘包 拆包

    1. 概述 无论是服务端还是客户端,我们读取或者发送消息的时候,都需要考虑TCP底层的粘包和拆包机制.下面我们来通过Netty来详解TCP底层的粘包和拆包机制. 2. TCP底层的粘包和拆包机制 TC ...

  4. netty系列之:netty中的Channel详解

    文章目录 简介 Channel详解 异步IO和ChannelFuture Channel的层级结构 释放资源 事件处理 总结 简介 Channel是连接ByteBuf和Event的桥梁,netty中的 ...

  5. Transport(传输) 详解

    本文来说下有关Transport(传输) 详解 文章目录 概述 传输 API - Channel 概述 直观感受 Netty 统一的传输 API Netty自带的传输方式/协议 零拷贝 什么是零拷贝 ...

  6. Netty网络编程(三):Channel详解

    文章目录 简介 Channel详解 异步IO和ChannelFuture Channel的层级结构 释放资源 事件处理 总结 简介 Channel是连接ByteBuf和Event的桥梁,netty中的 ...

  7. Netty详解(持续更新中)

    Netty详解 1. Netty概述 1.1 Netty简介 1.2 原生NIO问题 1.3 Netty特点 1.4 Netty应用场景 1.3 Netty版本说明 2. Java IO模型 2.1 ...

  8. 从源码分析RocketMQ系列-Remoting通信架构源码详解

    导语   这篇博客要从官方给出的一张图开始说起,之前的分析我们都是简单的分析了一下消息传递的流程,以及消息传递流程过程中出现的一些类的封装,并且提出,所有的封装操作都是为了更加高效的服务于NameSe ...

  9. Buffer(缓冲/字节容器)详解

    本文来说下Buffer(缓冲/字节容器)详解 文章目录 概述 Buffer API ByteBuf - Netty 字节数据的容器 ByteBuf如何工作 ByteBuf 使用模式 HEAP BUFF ...

最新文章

  1. 干货 :数据可视化的10个关键术语
  2. 日常工作,怎么结合工具设计有效的时间管理?
  3. python全栈开发笔记---基本数据类型--数字型魔法
  4. ubuntu 11.10上安装osdlyrics 歌词插件
  5. 在这里的周末休息也就是看看奥运
  6. Java黑皮书课后题第5章:**5.37(十进制转二进制)编写程序,提示用户输入一个十进制整数,然后显示对应的二进制值(不要使用Integer.toBinaryString(int)方法)
  7. java ipv6校验_java - IPv6地址验证和规范化[关闭] - 堆栈内存溢出
  8. java从数组查找指定整数_如何在Java中使用重复项查找整数数组中的K个缺失数字?...
  9. java el ognl_EL和OGNL表达式的区分
  10. python棋盘放麦粒求和递归_Python递归调用实现数字累加的代码
  11. Linux 中的vmlinuz
  12. Java:转换汉字为unicode形式的字符串和转换unicode形式字符串转换成汉字
  13. vue的万年历(日历)组件
  14. Mac SecureCRT 8.0.2破解版
  15. 华为云notebook在线解压压缩包问题
  16. Neural Voice Puppetry阅读笔记
  17. 从零开始成为一名合格的数据分析师:为什么必须学统计学?
  18. 【C语言】sizeof操作符详解
  19. hydrus1d使用说明_HYDRUS——1D中文说明书.pdf
  20. 全网最详细,手把手教你树莓派安装opencv模块

热门文章

  1. Design7:数据删除设计
  2. 大话项目管理工具之Confluence篇
  3. 基于8086CPU微处理器的汇编学习之JMP指令
  4. ListView和SlidingDrawer
  5. atlas 又多了几个新控件
  6. [原]ASP.Net常用功能整理--生成图片的缩略图
  7. 安装运行symfony框架编写的edusoho开源程序
  8. 创建war类型的maven工程时报web.xml is missing and failOnMissingWebXml is set to true
  9. Stanford机器学习---第五讲. 神经网络的学习 Neural Networks learning
  10. python服务器环境搭建(2)——安装相关软件