ChannelHandler 与 ChannelPipeline 详解
本文来说下netty中的核心知识ChannelHandler 与 ChannelPipeline 详解
文章目录
- 概述
- ChannelHandler 家族
- Channel生命周期
- ChannelHandler 生命周期
- ChannelHanler子接口
- ChannelInboundHandler接口
- ChannelOutboundHandler接口
- 资源管理
- ChannelPipeline
- ChannelPipeline相对论
- 修改ChannelPipeline
- ChannelHandler的执行和阻塞
- 触发事件
- ChannelHandlerContext
- 本文小结
概述
我们在上一篇文章中研究的 bytebuf 是一个容器用来“包装”数据。在本章我们将探讨这些容器如何通过应用程序来移动,传入和传出,以及他们的内容是如何处理的。
本章主要内容
- Channel
- ChannelHandler
- ChannePipeline
- ChannelHandlerContext
ChannelHandler 家族
Channel生命周期
在 Channel 的生命周期中,它的状态与 ChannelHandler 是密切相关的,下列是 Channel 组件的四个状态:
状态 | 描述 |
---|---|
ChannelUnregistered | Channel 没有注册到 EventLoop |
ChannelRegistered | Channel 被注册到了 EventLoop |
ChannelActive | Channel 已经连接到它的远程节点,处于活动状态,可以收发数据 |
ChannelInactive | Channel 与远程节点断开不再处于活动状态 |
Channel 的生命周期如下图所示,当这些状态发生改变时,将会生成对应的事件,ChannelPipeline 中的ChannelHandler 就可以及时做出处理:
ChannelHandler 生命周期
ChannelHandler 接口定义了其生命周期中的操作,当ChanelHandler被添加到ChannelPipeline 或从ChannelPipeline中移除时,会调用这些操作,ChannelHandler的生命周期如下:
方法 | 描述 |
---|---|
handlerAdded | 当把 ChannelHandler 添加到 ChannelPipeline 中时调用此方法 |
handlerRemoved | 当把 ChannelHandler 从 ChannelPipeline 中移除的时候会调用此方法 |
exceptionCaught | 当 ChannelHandler 在处理数据的过程中发生异常时会调用此方法 |
ChannelHanler子接口
Netty 提供2个重要的 ChannelHandler 子接口:
- ChannelInboundHandler - 处理进站数据,并且所有状态都更改
- ChannelOutboundHandler - 处理出站数据,允许拦截各种操作
ChannelInboundHandler接口
ChannelInboundHandler会在接受数据或者其对应的Channel状态发生改变时调用其生命周期的方法, ChannelInboundHandler的生命周期和Channel的生命周期其实是密切相关的。 以下是ChannelInboundHandler的生命周期方法:
方法 | 描述 |
---|---|
ChannelRegistered | 当Channel被注册到EventLoop且能够处理IO事件时会调用此方法 |
ChannelUnregistered | 当Channel从EventLoop注销且无法处理任何IO事件时会调用此方法 |
ChannelActive | 当Channel已经连接到远程节点(或者已绑定本地address)且处于活动状态时会调用此方法 |
ChannelInactive | 当Channel与远程节点断开,不再处于活动状态时调用此方法 |
ChannelReadComplete | 当Channel的某一个读操作完成时调用此方法 |
ChannelRead | 当Channel有数据可读时调用此方法 |
ChannelWritabilityChanged | 当Channel的可写状态发生改变时调用此方法,可以调用Channel的isWritable方法检测Channel的可写性,还可以通过ChannelConfig来配置write操作相关的属性 |
userEventTriggered | 当ChannelInboundHandler的fireUserEventTriggered方法被调用时才调用此方法。 |
这里有一个细节一定需要注意:当我们实现ChannelInboundHandler的channelRead方法时,请一定要记住 使用ReferenceCountUtil的release方法释放ByteBuf,这样可以减少内存的消耗,所以我们可以实现一个 ChannelHandler来完成对ByteBuf的释放,就像下面这样:
由于手工管理资源会很繁琐,您可以通过使用 SimpleChannelInboundHandler 简化问题,因为SimpleChannelInboundHandler已经帮我们 把与业务无关的逻辑在ChannelRead方法实现了,我们只需要实现它的channelRead0方法来完成我们的逻辑就够了:
@ChannelHandler.Sharable
// 继承 SimpleChannelInboundHandler
public class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object> {@Overridepublic void channelRead0(ChannelHandlerContext ctx, Object msg) {// 不需做特别的释放资源的动作}}
ChannelInitializer
回顾一下我们在第一个 Netty 应用章节的服务端代码:
public final class HelloServer {......private void start() throws InterruptedException {// 1.bossGroup 线程用于接收连接,workerGroup 线程用于具体的处理EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {// 2.创建服务端启动引导/辅助类:ServerBootstrapServerBootstrap b = new ServerBootstrap();// 3.给引导类配置两大线程组,确定了线程模型b.group(bossGroup, workerGroup)// (非必备)打印日志.handler(new LoggingHandler(LogLevel.INFO))// 4.指定 IO 模型为 NIO.channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ChannelPipeline p = ch.pipeline();// 5.可以自定义客户端消息的业务处理逻辑p.addLast(new HelloServerHandler());}});......}}......}
其中加入 ServerBootstrap 中处理的 channel 是一个 ChannelInitializer ,这是怎么回事呢,不是应该相应的ChannelHandler 吗?我们来看一下Channelnitializer源代码:
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class);/*** 这个方法会在Channle被注册时候调用,在方法返回之后,这个实例会在Channel对应的ChannelPipeline中删除** @param ch the {@link Channel} which was registered.* @throws Exception is thrown if an error occurs. In that case the {@link Channel} will be closed.*/protected abstract void initChannel(C ch) throws Exception;@SuppressWarnings("unchecked")@Overridepublic final void channelRegistered(ChannelHandlerContext ctx)throws Exception {boolean removed = false;boolean success = false;try {initChannel((C) ctx.channel());ctx.pipeline().remove(this);removed = true;ctx.fireChannelRegistered();success = true;} catch (Throwable t) {logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);} finally {if (!removed) {ctx.pipeline().remove(this);}if (!success) {ctx.close();}}}
}
从上面可以看出ChannelInitializer其实也是一个ChannelHandler,只是ChannelInitializer的主要任务不是对IO进行处理,而更多的负责对注册到EventGroup的Channel进行 init 处理,其中大多是进行加入 Handler 的处理
ChannelOutboundHandler接口
出站数据将由ChannelOutboundHandler处理,它的方法将被Channel,ChannelPipeline以及ChannelHandlerContext调用 (Channel,ChannelPipeline,ChannelHandlerContext都拥有write操作),以下是ChannelOutboundHandler的主要方法:
状态 | 描述 |
---|---|
bind | 当Channel绑定到本地address时会调用此方法 |
connect | 当Channel连接到远程节点时会调用此方法 |
disconnect | 当Channel和远程节点断开时会调用此方法 |
close | 当关闭Channel时会调用此方法 |
deregister | 当Channel从它的EventLoop注销时会调用此方法 |
read | 当从Channel读取数据时会调用此方法 |
flush | 当Channel将数据冲刷到远程节点时调用此方法 |
write | 当通过Channel将数据写入到远程节点时调用此方法 |
几乎所有的方法都将 ChannelPromise 作为参数, 一旦请求结束要通过 ChannelPipeline 转发的时候,必须通知此参数。
ChannelPromise vs. ChannelFuture:
ChannelPromise 是 特殊的 ChannelFuture (ChannelPromise 扩展了 ChannelFuture)。任何时候调用例如 Channel.write(…) 一个新的 ChannelPromise将会创建并且通过 ChannelPipeline传递。这次写操作本身将会返回 ChannelFuture, 这样只允许你得到一次操作完成的通知。Netty 本身使用 ChannelPromise 作为返回的 ChannelFuture 的通知,事实上在大多数时候就是 ChannelPromise 自身。
.
资源管理
当我们使用ChannelInboundHandler的read或ChannelOutboundHandler的write操作时,我们都需要保证 没有任何资源泄露并尽可能的减少资源耗费。之前已经介绍过了ReferenceCount引用计数用于处理池化的 ByteBuf资源。 为了帮助我们诊断潜在的的资源泄露问题,Netty提供了ResourceLeakDetector,它将 对我们的Netty程序的已分配的缓冲区做大约 1% 的采样用以检测内存泄露,Netty 目前定义了4种泄露检测级别,如下
级别 | 描述 |
---|---|
Disabled | 禁用泄露检测。我们应当在详细测试之后才应该使用此级别。 |
SIMPLE | 使用1%的默认采样率检测并报告任何发现的泄露,这是默认的检测级别。。 |
ADVANCED | 使用默认的采样率,报告任何发现的泄露以及对应的消息的位置。 |
PARANOID | 类似于ADVANCED,但是每次都会对消息的访问进行采样,此级别可能会对程序的性能造成影响,应该用于调试阶段。 |
我们可以通过 JVM 启动参数来设置leakDetector的级别:
java -Dio.netty.leakDetectionLevel=ADVANCED
ChannelPipeline
在 Netty 组件中也介绍过了,ChannelPipeline是一系列ChannelHandler组成的拦截链,每一个新创建的Channel 都会被分配一个新的ChannelPipeline,Channel和ChannelPipeline之间的关联是持久的,无需我们干涉它们 之间的关系。
ChannelPipeline相对论
Netty 总是将ChannelPipeline的入站口作为头部,出站口作为尾部,当我们通过ChannelPipeline的add方法 将入站处理器和出站处理器混合添加到ChannelPipeline后,ChannelHandler的顺序如下:
一个入站事件将从ChannelPipeline的头部(左侧)向尾部(右侧)开始传播,出站事件的传播则是与入站的传播方向相反。当ChannelPipeline在ChannelHandler之间传播事件的时候,它会判断下一个ChannelHandler的类型是否与当前ChannelHandler的类型相同(通过 ChannelHandlerContext),如果相同则说明它们是一个方向的事件, 如果不同则跳过该ChannelHandler并前进到下一个ChannelHandler,直到它找到相同类型的ChannelHandler。
修改ChannelPipeline
ChannelPipeline可以通过添加,删除和修改ChannelHandler来修改它自身的布局,这是它最基本的能力, 一下列举了ChannelPipeline的一些修改方法:
名称 | 描述 |
---|---|
addFirst addBefore addAfter addLast | 添加 ChannelHandler 到 ChannelPipeline. |
Remove | 从 ChannelPipeline 移除 ChannelHandler… |
Replace | 在 ChannelPipeline 替换另外一个 ChannelHandler. |
ChannelHandler的执行和阻塞
通常ChannelPipeline中的每个ChannelHandler都是通过它(ChannelPipeline)的EventLoop线程来处理 传递给他的数据的,所以我们不能去阻塞这个线程,否则会对整体的 IO 操作产生负面影响。 但有时候不得已 需要使用阻塞的 API 来完成逻辑处理,对于这种情况,ChannelPipeline的某些方法支持接受一个EventLoopGroup 类型的参数,我们可以通过自定义EventLoopGroup的方式,使ChannelHandler在我们的EventLoopGroup内处理数据。 这样一来,就可以避免阻塞线程的影响了。
触发事件
ChannelPipeline的API不仅有对ChannelHandler的增删改操作,还有对入站和出站操作的附加方法,如ChannelPipeline的入站方法:
方法 | 描述 |
---|---|
fireChannelRegistered | 调用ChannelPipeline中下一个ChannelInboundHandler的channelRegistered方法 |
fireChannelUnregistered | 调用ChannelPipeline中下一个ChannelInboundHandler的channelUnregistered方法 |
fireChannelActive | 调用ChannelPipeline中下一个ChannelInboundHandler的channelActive方法 |
fireChannelInactive | 调用ChannelPipeline中下一个ChannelInboundHandler的channelInactive方法 |
fireExceptionCaught | 调用ChannelPipeline中下一个ChannelInboundHandler的exceptionCaught方法 |
fireUserEventTriggered | 调用ChannelPipeline中下一个ChannelInboundHandler的userEventTriggered方法 |
fireChannelRead | 调用ChannelPipeline中下一个ChannelInboundHandler的channelRead方法 |
fireChannelReadComplete | 调用ChannelPipeline中下一个ChannelInboundHandler的channelReadComplete方法 |
fireChannelWritabilityChanged | 调用ChannelPipeline中下一个ChannelInboundHandler的channelWritabilityChanged方法 |
ChannelPipeline的出站方法:
方法 | 描述 |
---|---|
bind | 调用ChannelPipeline中下一个ChannelOutboundHandler的bind方法,将Channel与本地地址绑定 |
connect | 调用ChannelPipeline中下一个ChannelOutboundHandler的connect方法,将Channel连接到远程节点 |
disconnect | 调用ChannelPipeline中下一个ChannelOutboundHandler的disconnect方法,将Channel与远程连接断开 |
close | 调用ChannelPipeline中下一个ChannelOutboundHandler的close方法,将Channel关闭 |
deregister | 调用ChannelPipeline中下一个ChannelOutboundHandler的deregister方法,将Channel从其对应的EventLoop注销 |
flush | 调用ChannelPipeline中下一个ChannelOutboundHandler的flush方法,将Channel的数据冲刷到远程节点 |
write | 调用ChannelPipeline中下一个ChannelOutboundHandler的write方法,将数据写入Channel |
writeAndFlush | 先调用write方法,然后调用flush方法,将数据写入并刷回远程节点 |
read | 调用ChannelPipeline中下一个ChannelOutboundHandler的read方法,从Channel中读取数据 |
ChannelHandlerContext
ChannelHandlerContext代表的是ChannelHandler和ChannelPipeline之间的关联,每当有ChannelHandler 添加到ChannelPipeline中时,都会创建ChannelHandlerContext。ChannelHandlerContext的主要功能是 管理它所关联的ChannelHandler与同一个ChannelPipeline中的其他ChannelHandler之间的交互:
ChannelHandlerContext的大部分方法和Channel和ChannelPipeline相似,但有一个重要的区别是: 调用Channel或ChannelPipeline的方法其影响是会沿着整个 ChannelPipeline 进行传播:
//使用Chanel write
Channel channel = ctx.channel();
ctx.write(xxx);//使用Pipeline write
ChannelPipeline pipeline = ctx.pipeline();
pipeline.write(xxx);
而调用 ChannelHandlerContext 的方法则是从其关联的 ChannelHandler 开始,并且只会传播给位于该ChannelPipeline中的下一个能够处理该事件的 ChannelHandler:
//使用ChannelContext write
ChannelHandlerContext ctx = context;
ctx.write(xxx);
下面是一些比较重要的方法,有些和ChannelPipeline功能相似的方法就不再罗列了,各位同学可以直接查看原API。
方法 | 描述 |
---|---|
alloc | 获取与当前ChannelHandlerContext所关联的Channel的ByteBufAllocator |
handler | 返回与当前ChannelHandlerContext绑定的ChannelHandler |
pipeline | 返回与当前ChannelHandlerContext关联的ChannelPipeline |
… … |
ChannelHandlerContext 的高级用法
有时候我们需要在多个ChannelPipeline之间共享一个ChannelHandler,以此实现跨管道处理(获取)数据 的功能,此时的ChannelHandler属于多个ChannelPipeline,且会绑定到不同的ChannelHandlerContext上。
在多个ChannelPipeline之间共享ChannelHandler我们需要使用 @Sharable 注解,这代表着它是一个共享的 ChannelHandler,如果一个ChannelHandler没有使用@Sharable注解却被用于多个ChannelPipeline,那么 将会触发异常。 还有非常重要的一点:一个ChannelHandler被用于多个ChannelPipeline肯定涉及到多线程 数据共享的问题,因此我们需要保证ChannelHandler的方法同步。 下面是一个很好的例子:
@ChannelHandler.Sharable
public class UnsafeSharableChannelHandler extends ChannelInboundHandlerAdapter{private int count;@Overridepublic void channelRead(ChannelHandlerContext ctx,Object msg){count++;System.out.println("count : " + count);ctx.fireChannelRead(msg);}
}
上面这个ChannelHandler标识了@Sharable注解,这代表它需要被用于多个ChannelPipeline之间
这段代码的问题是它持有状态: 一个实例变量 count 保持了方法调用的计数。将这个类的一个实例添加到 ChannelPipeline 并发访问通道时很可能产生错误。一个简单的解决方法就是给修改了count的变量的方法 channelRead 加synchronized关键字,确保即使在多个ChannelPipeline之间共享, ChannelHandler也能保证数据一致。
本文小结
本文详细介绍了netty中有关ChannelHandler 与 ChannelPipeline的知识与内容。
ChannelHandler 与 ChannelPipeline 详解相关推荐
- netty系列之:channelPipeline详解
文章目录 简介 ChannelPipeline 事件传递 DefaultChannelPipeline 总结 简介 我们在介绍channel的时候提到过,几乎channel中所有的实现都是通过chan ...
- java内部格式_详解java内部类的访问格式和规则
详解java内部类的访问格式和规则 1.内部类的定义 定义一个类来描述事物,但是这个事物其中可能还有事物,这时候在类中再定义类来描述. 2.内部类访问规则 ①内部类可以直接访问外部类中的成员,包括私有 ...
- Netty详解(五):Netty TCP粘包 拆包
1. 概述 无论是服务端还是客户端,我们读取或者发送消息的时候,都需要考虑TCP底层的粘包和拆包机制.下面我们来通过Netty来详解TCP底层的粘包和拆包机制. 2. TCP底层的粘包和拆包机制 TC ...
- netty系列之:netty中的Channel详解
文章目录 简介 Channel详解 异步IO和ChannelFuture Channel的层级结构 释放资源 事件处理 总结 简介 Channel是连接ByteBuf和Event的桥梁,netty中的 ...
- Transport(传输) 详解
本文来说下有关Transport(传输) 详解 文章目录 概述 传输 API - Channel 概述 直观感受 Netty 统一的传输 API Netty自带的传输方式/协议 零拷贝 什么是零拷贝 ...
- Netty网络编程(三):Channel详解
文章目录 简介 Channel详解 异步IO和ChannelFuture Channel的层级结构 释放资源 事件处理 总结 简介 Channel是连接ByteBuf和Event的桥梁,netty中的 ...
- Netty详解(持续更新中)
Netty详解 1. Netty概述 1.1 Netty简介 1.2 原生NIO问题 1.3 Netty特点 1.4 Netty应用场景 1.3 Netty版本说明 2. Java IO模型 2.1 ...
- 从源码分析RocketMQ系列-Remoting通信架构源码详解
导语 这篇博客要从官方给出的一张图开始说起,之前的分析我们都是简单的分析了一下消息传递的流程,以及消息传递流程过程中出现的一些类的封装,并且提出,所有的封装操作都是为了更加高效的服务于NameSe ...
- Buffer(缓冲/字节容器)详解
本文来说下Buffer(缓冲/字节容器)详解 文章目录 概述 Buffer API ByteBuf - Netty 字节数据的容器 ByteBuf如何工作 ByteBuf 使用模式 HEAP BUFF ...
最新文章
- 干货 :数据可视化的10个关键术语
- 日常工作,怎么结合工具设计有效的时间管理?
- python全栈开发笔记---基本数据类型--数字型魔法
- ubuntu 11.10上安装osdlyrics 歌词插件
- 在这里的周末休息也就是看看奥运
- Java黑皮书课后题第5章:**5.37(十进制转二进制)编写程序,提示用户输入一个十进制整数,然后显示对应的二进制值(不要使用Integer.toBinaryString(int)方法)
- java ipv6校验_java - IPv6地址验证和规范化[关闭] - 堆栈内存溢出
- java从数组查找指定整数_如何在Java中使用重复项查找整数数组中的K个缺失数字?...
- java el ognl_EL和OGNL表达式的区分
- python棋盘放麦粒求和递归_Python递归调用实现数字累加的代码
- Linux 中的vmlinuz
- Java:转换汉字为unicode形式的字符串和转换unicode形式字符串转换成汉字
- vue的万年历(日历)组件
- Mac SecureCRT 8.0.2破解版
- 华为云notebook在线解压压缩包问题
- Neural Voice Puppetry阅读笔记
- 从零开始成为一名合格的数据分析师:为什么必须学统计学?
- 【C语言】sizeof操作符详解
- hydrus1d使用说明_HYDRUS——1D中文说明书.pdf
- 全网最详细,手把手教你树莓派安装opencv模块
热门文章
- Design7:数据删除设计
- 大话项目管理工具之Confluence篇
- 基于8086CPU微处理器的汇编学习之JMP指令
- ListView和SlidingDrawer
- atlas 又多了几个新控件
- [原]ASP.Net常用功能整理--生成图片的缩略图
- 安装运行symfony框架编写的edusoho开源程序
- 创建war类型的maven工程时报web.xml is missing and failOnMissingWebXml is set to true
- Stanford机器学习---第五讲. 神经网络的学习 Neural Networks learning
- python服务器环境搭建(2)——安装相关软件