netty系列之:Event、Handler和Pipeline
文章目录
- 简介
- ChannelPipeline
- ChannelHandler
- ChannelHandlerContext
- ChannelHandler中的状态变量
- 异步Handler
- 总结
简介
上一节我们讲解了netty中的Channel,知道了channel是事件处理器和外部联通的桥梁。今天本文将会详细讲解netty的剩下几个非常总要的部分Event、Handler和PipeLine。
ChannelPipeline
pipeLine是连接Channel和handler的桥梁,它实际上是一个filter的实现,用于控制其中handler的处理方式。
当一个channel被创建的时候,和它对应的ChannelPipeline也会被创建。
先看下ChannelPipeline的定义:
public interface ChannelPipelineextends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable
首先ChannelPipeline继承自Iterable,表示它是可遍历的,而遍历的结果就是其中一个个的Handler。
作为一个合格的Iterable,ChannelPipeline提供了一系列的add和remote方法,通过这些方法就可以向ChannelPipeline中添加或者移除Handler。因为ChannelPipeline是一个filter,而过滤器是需要指定对应的filter的顺序的,所以ChannelPipeline中有addFirst和addLast这种添加不同顺序的方法。
然后可以看到ChannelPipeline继承了ChannelInboundInvoker和ChannelOutboundInvoker两个接口。
先看一张channelPipeline的工作流程图:
可以看出ChannelPipeline主要有两种操作,一种是读入Inbound,一种是写出OutBound。
对于Socket.read()这样的读入操作,调用的实际上就是ChannelInboundInvoker中的方法。对于外部的IO写入的请求,调用的就是ChannelOutboundInvoker中的方法。
注意,Inbound和outbound的处理顺序是相反的,比如下面的例子:
ChannelPipeline p = ...;p.addLast("1", new InboundHandlerA());p.addLast("2", new InboundHandlerB());p.addLast("3", new OutboundHandlerA());p.addLast("4", new OutboundHandlerB());p.addLast("5", new InboundOutboundHandlerX());
上面的代码中我们向ChannelPipeline添加了5个handler,其中2个InboundHandler,2个OutboundHandler和一个同时处理In和Out的Handler。
那么当channel遇到inbound event的时候,就会按照1,2,3,4,5的顺序进行处理,但是只有InboundHandler才能处理Inbound事件,所以,真正执行的顺序是1,2,5。
同样的当channel遇到outbound event的时候,会按照5,4,3,2,1的顺序进行执行,但是只有outboundHandler才能处理Outbound事件,所以真正执行的顺序是5,4,3.
简单的说,ChannelPipeline指定了Handler的执行顺序。
ChannelHandler
netty是一个事件驱动的框架,所有的event都是由Handler来进行处理的。ChannelHandler可以处理IO、拦截IO或者将event传递给ChannelPipeline中的下一个Handler进行处理。
ChannelHandler的结构很简单,只有三个方法,分别是:
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
根据inbound和outbound事件的不同,ChannelHandler可以分为两类,分别是ChannelInboundHandler 和ChannelOutboundHandler.
因为这两个都是interface,实现起来比较麻烦,所以netty为大家提供了三个默认的实现:ChannelInboundHandlerAdapter,ChannelOutboundHandlerAdapter和ChannelDuplexHandler。前面两个很好理解,分别是inbound和outbound,最后一个可以同时处理inbound和outbound。
ChannelHandler是由ChannelHandlerContext提供的,并且和ChannelPipeline的交互也是通过ChannelHandlerContext来进行的。
ChannelHandlerContext
ChannelHandlerContext可以让ChannelHandler和ChannelPipeline或者其他的Handler进行交互。它就是一个上下文环境,使得Handler和Channel可以相互作用。
如可以在ChannelHandlerContext中,调用channel()获得绑定的channel。可以通过调用handler()获得绑定的Handler。通过调用fire*方法来触发Channel的事件。
看下ChannelHandlerContext的定义:
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker
可以看到他是一个AttributeMap用来存储属性,还是一个ChannelInboundInvoker和ChannelOutboundInvoker用来触发和传播相应的事件。
对于Inbound来说传播事件的方法有:
ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead(Object)
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught(Throwable)
ChannelHandlerContext.fireUserEventTriggered(Object)
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()
对于Outbound来说传播事件的方法有:
ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext.write(Object, ChannelPromise)
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect(ChannelPromise)
ChannelHandlerContext.close(ChannelPromise)
ChannelHandlerContext.deregister(ChannelPromise)
这些方法,在一个Handler中调用,然后将事件传递给下一个Handler,如下所示:
public class MyInboundHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) {System.out.println("Connected!");ctx.fireChannelActive();}}public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {@Overridepublic void close(ChannelHandlerContext ctx, ChannelPromise promise) {System.out.println("Closing ..");ctx.close(promise);}}
ChannelHandler中的状态变量
ChannelHandler是一个Handler类,一般情况下,这个类的实例是可以被多个channel共同使用的,前提是这个ChannelHandler没有共享的状态变量。
但有时候,我们必须要在ChannelHandler中保持一个状态,那么就涉及到ChannelHandler中的状态变量的问题,看下面的一个例子:
public interface Message {// your methods here}public class DataServerHandler extends SimpleChannelInboundHandler<Message> {private boolean loggedIn;@Overridepublic void channelRead0(ChannelHandlerContext ctx, Message message) {if (message instanceof LoginMessage) {authenticate((LoginMessage) message);loggedIn = true;} else (message instanceof GetDataMessage) {if (loggedIn) {ctx.writeAndFlush(fetchSecret((GetDataMessage) message));} else {fail();}}}...}
这个例子中,我们需要在收到LoginMessage之后,对消息进行认证,并保存认证状态,因为业务逻辑是这样的,所以必须要有一个状态变量。
那么这样带有状态变量的Handler就只能绑定一个channel,如果绑定多个channel就有可能出现状态不一致的问题。一个channel绑定一个Handler实例,很简单,只需要在initChannel方法中使用new关键字新建一个对象即可。
public class DataServerInitializer extends ChannelInitializer<Channel> {@Overridepublic void initChannel(Channel channel) {channel.pipeline().addLast("handler", new DataServerHandler());}}
那么除了新建handler实例之外,还有没有其他的办法呢?当然是有的,那就是 ChannelHandlerContext 中的AttributeKey属性。还是上面的例子,我们看一下使用AttributeKey应该怎么实现:
public interface Message {// your methods here}@Sharablepublic class DataServerHandler extends SimpleChannelInboundHandler<Message> {private final AttributeKey<Boolean> auth =AttributeKey.valueOf("auth");@Overridepublic void channelRead(ChannelHandlerContext ctx, Message message) {Attribute<Boolean> attr = ctx.attr(auth);if (message instanceof LoginMessage) {authenticate((LoginMessage) o);attr.set(true);} else (message instanceof GetDataMessage) {if (Boolean.TRUE.equals(attr.get())) {ctx.writeAndFlush(fetchSecret((GetDataMessage) o));} else {fail();}}}...}
上例中,首先定义了一个AttributeKey,然后使用ChannelHandlerContext的attr方法将Attribute设置到ChannelHandlerContext中,这样该Attribute绑定到这个ChannelHandlerContext中了。后续即使使用同一个Handler在不同的Channel中该属性也是不同的。
下面是使用共享Handler的例子:
public class DataServerInitializer extends ChannelInitializer<Channel> {private static final DataServerHandler SHARED = new DataServerHandler();@Overridepublic void initChannel(Channel channel) {channel.pipeline().addLast("handler", SHARED);}}
注意,在定义DataServerHandler的时候,我们加上了@Sharable注解,如果一个ChannelHandler使用了@Sharable注解,那就意味着你可以只创建一次这个Handler,但是可以将其绑定到一个或者多个ChannelPipeline中。
注意,@Sharable注解是为java文档准备的,并不会影响到实际的代码执行效果。
异步Handler
之前介绍了,可以通过调用pipeline.addLast方法将handler加入到pipeline中,因为pipeline是一个filter的结构,所以加入的handler是顺序进行处理的。
但是,我希望某些handler是在新的线程中执行该怎么办?如果我们希望这些新的线程中执行的Handler是无序的又该怎么办?
比如我们现在有3个handler分别是MyHandler1,MyHandler2和MyHandler3。
顺序执行的写法是这样的:
ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("MyHandler1", new MyHandler1());pipeline.addLast("MyHandler2", new MyHandler2());pipeline.addLast("MyHandler3", new MyHandler3());
如果要让MyHandler3在新的线程中执行,则可以加入group选项,从而让handler在新的group中运行:
static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("MyHandler1", new MyHandler1());pipeline.addLast("MyHandler2", new MyHandler2());pipeline.addLast(group,"MyHandler3", new MyHandler3());
但是上例中DefaultEventExecutorGroup加入的Handler也是会顺序执行的,如果确实不想顺序执行,那么可以尝试考虑使用UnorderedThreadPoolEventExecutor 。
总结
本文讲解了Event、Handler和PipeLine,并举例说明他们之间的关系和相互作用。后续会从netty的具体实践出发,进一步加深对netty的理解和应用,希望大家能够喜欢。
本文已收录于 http://www.flydean.com/05-netty-channelevent/
最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!
欢迎关注我的公众号:「程序那些事」,懂技术,更懂你!
netty系列之:Event、Handler和Pipeline相关推荐
- netty系列之:channelHandlerContext详解
文章目录 简介 ChannelHandlerContext和它的应用 AbstractChannelHandlerContext DefaultChannelHandlerContext 总结 简介 ...
- netty系列之:自定义编码解码器
文章目录 简介 自定义编码器 自定义解码器 添加编码解码器到pipeline 计算2的N次方 总结 简介 在之前的netty系列文章中,我们讲到了如何将对象或者String转换成为ByteBuf,通过 ...
- netty系列之:netty架构概述
文章目录 简介 netty架构图 丰富的Buffer数据机构 零拷贝 统一的API 事件驱动 其他优秀的特性 总结 简介 Netty为什么这么优秀,它在JDK本身的NIO基础上又做了什么改进呢?它的架 ...
- netty 远程主机强迫关闭了一个现有的连接_死磕netty系列《一、netty基础概念》...
1. Channel Channel代表了netty对网络连接的抽象,Channel是线程安全的,它提供了一些重要信息,比如当前网络连接的状态,远程的主机连接地址和本地的连接地址, 我们可以通过 Ch ...
- 【Netty系列_3】Netty源码分析之服务端channel
highlight: androidstudio 前言 学习源码要有十足的耐性!越是封装完美的框架,内部就越复杂,源码很深很长!不过要抓住要点分析,实在不行多看几遍,配合debug,去一窥优秀框架的精 ...
- 【读后感】Netty 系列之 Netty 高性能之道 - 相比 Mina 如何 ?
[读后感]Netty 系列之 Netty 高性能之道 - 相比 Mina 如何 ? 太阳火神的美丽人生 (http://blog.csdn.net/opengl_es) 本文遵循"署名-非商 ...
- Netty系列(2)快速入门Netty线程模型、Netty入门程序、Netty任务队列
文章目录 1 Netty线程模型 1.1 传统阻塞 I/O 服务模型 1.2 Reactor线程模型 1.2.1 单 Reactor 单线程模型 1.2.2 单Reactor多线程 1.2.3 主从 ...
- Nvidia Deepstream小细节系列:Deepstream python保存pipeline结构图
Nvidia Deepstream小细节系列:Deepstream python保存pipeline结构图 提示:此章节我们将着重阐述如何在Deepstream Python运行的情况下保存pipel ...
- 给Event handler传递动态参数
有段时间没写些东西了......参加工作大半年 感觉是收获不小啊 新年以致 在这里我想真心的祝福所有园友 所有有梦想的人 新的一年技术更牛X 身体棒棒 职业发展更上一层 好了 回过来讲主要内容. 在j ...
最新文章
- 【Cocosd2d实例教程五】Cocos2d添加虚拟摇杆控制器
- 技术解析系列 | PouchContainer 支持 LXCFS 实现高可靠容器隔离
- gtw-050090|执行拦截器时发生异常_执行流程 | 你真的了解Spring AOP的执行顺序吗?...
- 【渝粤教育】国家开放大学2018年秋季 2006T经济数学基础12 参考试题
- 实战|手把手教你训练一个基于Keras的多标签图像分类器
- 左侧固定右侧自动填充_ai怎么填充颜色?在ai里怎么填充颜色?
- 【C语言】C语言学习整理-putchar,printf,getchar,scanf定义及区别
- 《哪吒之魔童降世》电影密钥延期至9月26日,将冲击中国票房总榜前三
- 准备在北京Tech·Ed上组织博客园聚会
- 今天起改用mac的marsedit写博
- 软考初级信息处理技术员(一)
- python批量下载抖音视频_Python一键批量下载抖音无水印视频
- Python实现康威生命游戏
- Label propagation
- zk+kafka集群
- c语言校友通讯录毕业论文,校友录毕业论文(C_+sql2005).doc
- 基于python高校学生管理系统
- 《惢客创业日记》2018.11.02(周五) “追梦大叔”的回忆
- Spark - Illegal pattern component: XXX 与org.apache.commons.lang3.time.FastDateFormat incompatible
- 市场调研—全球与中国手推式扫地机市场现状及未来发展趋势