文章目录

  • 简介
  • ChannelPipeline
  • ChannelHandler
  • ChannelHandlerContext
  • ChannelHandler中的状态变量
  • 异步Handler
  • 总结

简介

上一节我们讲解了netty中的Channel,知道了channel是事件处理器和外部联通的桥梁。今天本文将会详细讲解netty的剩下几个非常总要的部分Event、Handler和PipeLine。

ChannelPipeline

pipeLine是连接Channel和handler的桥梁,它实际上是一个filter的实现,用于控制其中handler的处理方式。

当一个channel被创建的时候,和它对应的ChannelPipeline也会被创建。

先看下ChannelPipeline的定义:

public interface ChannelPipelineextends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable 

首先ChannelPipeline继承自Iterable,表示它是可遍历的,而遍历的结果就是其中一个个的Handler。

作为一个合格的Iterable,ChannelPipeline提供了一系列的add和remote方法,通过这些方法就可以向ChannelPipeline中添加或者移除Handler。因为ChannelPipeline是一个filter,而过滤器是需要指定对应的filter的顺序的,所以ChannelPipeline中有addFirst和addLast这种添加不同顺序的方法。

然后可以看到ChannelPipeline继承了ChannelInboundInvoker和ChannelOutboundInvoker两个接口。

先看一张channelPipeline的工作流程图:

可以看出ChannelPipeline主要有两种操作,一种是读入Inbound,一种是写出OutBound。

对于Socket.read()这样的读入操作,调用的实际上就是ChannelInboundInvoker中的方法。对于外部的IO写入的请求,调用的就是ChannelOutboundInvoker中的方法。

注意,Inbound和outbound的处理顺序是相反的,比如下面的例子:

    ChannelPipeline p = ...;p.addLast("1", new InboundHandlerA());p.addLast("2", new InboundHandlerB());p.addLast("3", new OutboundHandlerA());p.addLast("4", new OutboundHandlerB());p.addLast("5", new InboundOutboundHandlerX());

上面的代码中我们向ChannelPipeline添加了5个handler,其中2个InboundHandler,2个OutboundHandler和一个同时处理In和Out的Handler。

那么当channel遇到inbound event的时候,就会按照1,2,3,4,5的顺序进行处理,但是只有InboundHandler才能处理Inbound事件,所以,真正执行的顺序是1,2,5。

同样的当channel遇到outbound event的时候,会按照5,4,3,2,1的顺序进行执行,但是只有outboundHandler才能处理Outbound事件,所以真正执行的顺序是5,4,3.

简单的说,ChannelPipeline指定了Handler的执行顺序。

ChannelHandler

netty是一个事件驱动的框架,所有的event都是由Handler来进行处理的。ChannelHandler可以处理IO、拦截IO或者将event传递给ChannelPipeline中的下一个Handler进行处理。

ChannelHandler的结构很简单,只有三个方法,分别是:

void handlerAdded(ChannelHandlerContext ctx) throws Exception;
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;

根据inbound和outbound事件的不同,ChannelHandler可以分为两类,分别是ChannelInboundHandler 和ChannelOutboundHandler.

因为这两个都是interface,实现起来比较麻烦,所以netty为大家提供了三个默认的实现:ChannelInboundHandlerAdapter,ChannelOutboundHandlerAdapter和ChannelDuplexHandler。前面两个很好理解,分别是inbound和outbound,最后一个可以同时处理inbound和outbound。

ChannelHandler是由ChannelHandlerContext提供的,并且和ChannelPipeline的交互也是通过ChannelHandlerContext来进行的。

ChannelHandlerContext

ChannelHandlerContext可以让ChannelHandler和ChannelPipeline或者其他的Handler进行交互。它就是一个上下文环境,使得Handler和Channel可以相互作用。

如可以在ChannelHandlerContext中,调用channel()获得绑定的channel。可以通过调用handler()获得绑定的Handler。通过调用fire*方法来触发Channel的事件。

看下ChannelHandlerContext的定义:

public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker 

可以看到他是一个AttributeMap用来存储属性,还是一个ChannelInboundInvoker和ChannelOutboundInvoker用来触发和传播相应的事件。

对于Inbound来说传播事件的方法有:

ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead(Object)
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught(Throwable)
ChannelHandlerContext.fireUserEventTriggered(Object)
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()

对于Outbound来说传播事件的方法有:

ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext.write(Object, ChannelPromise)
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect(ChannelPromise)
ChannelHandlerContext.close(ChannelPromise)
ChannelHandlerContext.deregister(ChannelPromise)

这些方法,在一个Handler中调用,然后将事件传递给下一个Handler,如下所示:

   public class MyInboundHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) {System.out.println("Connected!");ctx.fireChannelActive();}}public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {@Overridepublic void close(ChannelHandlerContext ctx, ChannelPromise promise) {System.out.println("Closing ..");ctx.close(promise);}}

ChannelHandler中的状态变量

ChannelHandler是一个Handler类,一般情况下,这个类的实例是可以被多个channel共同使用的,前提是这个ChannelHandler没有共享的状态变量。

但有时候,我们必须要在ChannelHandler中保持一个状态,那么就涉及到ChannelHandler中的状态变量的问题,看下面的一个例子:

  public interface Message {// your methods here}public class DataServerHandler extends SimpleChannelInboundHandler<Message> {private boolean loggedIn;@Overridepublic void channelRead0(ChannelHandlerContext ctx, Message message) {if (message instanceof LoginMessage) {authenticate((LoginMessage) message);loggedIn = true;} else (message instanceof GetDataMessage) {if (loggedIn) {ctx.writeAndFlush(fetchSecret((GetDataMessage) message));} else {fail();}}}...}

这个例子中,我们需要在收到LoginMessage之后,对消息进行认证,并保存认证状态,因为业务逻辑是这样的,所以必须要有一个状态变量。

那么这样带有状态变量的Handler就只能绑定一个channel,如果绑定多个channel就有可能出现状态不一致的问题。一个channel绑定一个Handler实例,很简单,只需要在initChannel方法中使用new关键字新建一个对象即可。

   public class DataServerInitializer extends ChannelInitializer<Channel> {@Overridepublic void initChannel(Channel channel) {channel.pipeline().addLast("handler", new DataServerHandler());}}

那么除了新建handler实例之外,还有没有其他的办法呢?当然是有的,那就是 ChannelHandlerContext 中的AttributeKey属性。还是上面的例子,我们看一下使用AttributeKey应该怎么实现:

   public interface Message {// your methods here}@Sharablepublic class DataServerHandler extends SimpleChannelInboundHandler<Message> {private final AttributeKey<Boolean> auth =AttributeKey.valueOf("auth");@Overridepublic void channelRead(ChannelHandlerContext ctx, Message message) {Attribute<Boolean> attr = ctx.attr(auth);if (message instanceof LoginMessage) {authenticate((LoginMessage) o);attr.set(true);} else (message instanceof GetDataMessage) {if (Boolean.TRUE.equals(attr.get())) {ctx.writeAndFlush(fetchSecret((GetDataMessage) o));} else {fail();}}}...}

上例中,首先定义了一个AttributeKey,然后使用ChannelHandlerContext的attr方法将Attribute设置到ChannelHandlerContext中,这样该Attribute绑定到这个ChannelHandlerContext中了。后续即使使用同一个Handler在不同的Channel中该属性也是不同的。

下面是使用共享Handler的例子:

   public class DataServerInitializer extends ChannelInitializer<Channel> {private static final DataServerHandler SHARED = new DataServerHandler();@Overridepublic void initChannel(Channel channel) {channel.pipeline().addLast("handler", SHARED);}}

注意,在定义DataServerHandler的时候,我们加上了@Sharable注解,如果一个ChannelHandler使用了@Sharable注解,那就意味着你可以只创建一次这个Handler,但是可以将其绑定到一个或者多个ChannelPipeline中。

注意,@Sharable注解是为java文档准备的,并不会影响到实际的代码执行效果。

异步Handler

之前介绍了,可以通过调用pipeline.addLast方法将handler加入到pipeline中,因为pipeline是一个filter的结构,所以加入的handler是顺序进行处理的。

但是,我希望某些handler是在新的线程中执行该怎么办?如果我们希望这些新的线程中执行的Handler是无序的又该怎么办?

比如我们现在有3个handler分别是MyHandler1,MyHandler2和MyHandler3。

顺序执行的写法是这样的:

ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("MyHandler1", new MyHandler1());pipeline.addLast("MyHandler2", new MyHandler2());pipeline.addLast("MyHandler3", new MyHandler3());

如果要让MyHandler3在新的线程中执行,则可以加入group选项,从而让handler在新的group中运行:

static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("MyHandler1", new MyHandler1());pipeline.addLast("MyHandler2", new MyHandler2());pipeline.addLast(group,"MyHandler3", new MyHandler3());

但是上例中DefaultEventExecutorGroup加入的Handler也是会顺序执行的,如果确实不想顺序执行,那么可以尝试考虑使用UnorderedThreadPoolEventExecutor 。

总结

本文讲解了Event、Handler和PipeLine,并举例说明他们之间的关系和相互作用。后续会从netty的具体实践出发,进一步加深对netty的理解和应用,希望大家能够喜欢。

本文已收录于 http://www.flydean.com/05-netty-channelevent/

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!

欢迎关注我的公众号:「程序那些事」,懂技术,更懂你!

netty系列之:Event、Handler和Pipeline相关推荐

  1. netty系列之:channelHandlerContext详解

    文章目录 简介 ChannelHandlerContext和它的应用 AbstractChannelHandlerContext DefaultChannelHandlerContext 总结 简介 ...

  2. netty系列之:自定义编码解码器

    文章目录 简介 自定义编码器 自定义解码器 添加编码解码器到pipeline 计算2的N次方 总结 简介 在之前的netty系列文章中,我们讲到了如何将对象或者String转换成为ByteBuf,通过 ...

  3. netty系列之:netty架构概述

    文章目录 简介 netty架构图 丰富的Buffer数据机构 零拷贝 统一的API 事件驱动 其他优秀的特性 总结 简介 Netty为什么这么优秀,它在JDK本身的NIO基础上又做了什么改进呢?它的架 ...

  4. netty 远程主机强迫关闭了一个现有的连接_死磕netty系列《一、netty基础概念》...

    1. Channel Channel代表了netty对网络连接的抽象,Channel是线程安全的,它提供了一些重要信息,比如当前网络连接的状态,远程的主机连接地址和本地的连接地址, 我们可以通过 Ch ...

  5. 【Netty系列_3】Netty源码分析之服务端channel

    highlight: androidstudio 前言 学习源码要有十足的耐性!越是封装完美的框架,内部就越复杂,源码很深很长!不过要抓住要点分析,实在不行多看几遍,配合debug,去一窥优秀框架的精 ...

  6. 【读后感】Netty 系列之 Netty 高性能之道 - 相比 Mina 如何 ?

    [读后感]Netty 系列之 Netty 高性能之道 - 相比 Mina 如何 ? 太阳火神的美丽人生 (http://blog.csdn.net/opengl_es) 本文遵循"署名-非商 ...

  7. Netty系列(2)快速入门Netty线程模型、Netty入门程序、Netty任务队列

    文章目录 1 Netty线程模型 1.1 传统阻塞 I/O 服务模型 1.2 Reactor线程模型 1.2.1 单 Reactor 单线程模型 1.2.2 单Reactor多线程 1.2.3 主从 ...

  8. Nvidia Deepstream小细节系列:Deepstream python保存pipeline结构图

    Nvidia Deepstream小细节系列:Deepstream python保存pipeline结构图 提示:此章节我们将着重阐述如何在Deepstream Python运行的情况下保存pipel ...

  9. 给Event handler传递动态参数

    有段时间没写些东西了......参加工作大半年 感觉是收获不小啊 新年以致 在这里我想真心的祝福所有园友 所有有梦想的人 新的一年技术更牛X 身体棒棒 职业发展更上一层 好了 回过来讲主要内容. 在j ...

最新文章

  1. 【Cocosd2d实例教程五】Cocos2d添加虚拟摇杆控制器
  2. 技术解析系列 | PouchContainer 支持 LXCFS 实现高可靠容器隔离
  3. gtw-050090|执行拦截器时发生异常_执行流程 | 你真的了解Spring AOP的执行顺序吗?...
  4. 【渝粤教育】国家开放大学2018年秋季 2006T经济数学基础12 参考试题
  5. 实战|手把手教你训练一个基于Keras的多标签图像分类器
  6. 左侧固定右侧自动填充_ai怎么填充颜色?在ai里怎么填充颜色?
  7. 【C语言】C语言学习整理-putchar,printf,getchar,scanf定义及区别
  8. 《哪吒之魔童降世》电影密钥延期至9月26日,将冲击中国票房总榜前三
  9. 准备在北京Tech·Ed上组织博客园聚会
  10. 今天起改用mac的marsedit写博
  11. 软考初级信息处理技术员(一)
  12. python批量下载抖音视频_Python一键批量下载抖音无水印视频
  13. Python实现康威生命游戏
  14. Label propagation
  15. zk+kafka集群
  16. c语言校友通讯录毕业论文,校友录毕业论文(C_+sql2005).doc
  17. 基于python高校学生管理系统
  18. 《惢客创业日记》2018.11.02(周五) “追梦大叔”的回忆
  19. Spark - Illegal pattern component: XXX 与org.apache.commons.lang3.time.FastDateFormat incompatible
  20. 市场调研—全球与中国手推式扫地机市场现状及未来发展趋势

热门文章

  1. 交叉编译openwrt php,构建 openwrt 交叉编译工具链
  2. Delphi之virtual,dynamic,abstract
  3. 【网络编程】之十一、重叠IO Overlapped IO 完成例程
  4. OpenCV 中的绘制功能
  5. Chromium Android编译指南
  6. 如何优雅的设计和使用缓存?
  7. 从Golang调度器的作者视角探究其设计之道!
  8. 使用iPhone相机和OpenCV来完成3D重建(第三部分)
  9. 数据分析利器:XGBoost算法最佳解析
  10. 2019 WAIC | 腾讯张正友:人工智能的热与酷