Pipeline是Netty中的另一核心组件,前面在说过在Channel进行初始化的时候最后创建一系列的重要对象,其中就有Pipeline
我们看下Netty官网对于Pipeline的定义

A list of ChannelHandlers which handles or intercepts inbound events and outbound operations of a Channel

Pipeline就是由一系列处理Channel的inbound和outbound事件的ChannelHandlers组成的集合。那我们来看下其具体的实现

Pipeline初始化

AbstractChannel

protected AbstractChannel(Channel parent, ChannelId id) {this.parent = parent;this.id = id;unsafe = newUnsafe();pipeline = newChannelPipeline();}
 protected DefaultChannelPipeline newChannelPipeline() {return new DefaultChannelPipeline(this);}protected DefaultChannelPipeline(Channel channel) {//保存channel的信息到pipelinethis.channel = ObjectUtil.checkNotNull(channel, "channel");succeededFuture = new SucceededChannelFuture(channel, null);voidPromise =  new VoidChannelPromise(channel, true);tail = new TailContext(this);head = new HeadContext(this);head.next = tail;tail.prev = head;}

从Pipeline的初始化代码可以看出,Pipeline本质是一个双向链表,每个节点都是ChannelHandlerContext的对象,节点里保存了Pipeline的信息。这个链表的头是 HeadContext,链表的尾是 TailContext,并且每个ChannelHandlerContext 中又关联着一个ChannelHandler。

可以看下HeadContext和TailContext的类图


不难看出HeadContext实现了ChannelOutboundHandler接口 , TailContext实现了ChannelInboundHandler 接口,二者都实现了ChannelHandlerContext接口,可以说 head 和tail 既是一个ChannelHandler,又是一个ChannelHandlerContext (这里不太明白为什么Head实现了ChannelInboundHandler 接口,但是Inboud属性又是false ??)
再看下HeadContext的构造方法的代码

  HeadContext(DefaultChannelPipeline pipeline) {super(pipeline, null, HEAD_NAME, false, true);unsafe = pipeline.channel().unsafe();setAddComplete();}

有两个重要的参数inbound,outbound,用于标识节点的inbound和outbound属性,HeadContext传入了inbound=false,outbound=true , TailContext则相反,传入了inbound=true,outbound=false

初始化后的Pipeline结构如下所示:

Pipeline context添加

在使用Netty框架的时候,我们一般会在进行bootstrap或者serverBootstrap初始化的时候通过如下的方式向Pipeline中添加节点

    bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))//自定义协议编码器.addLast("frameEncoder", new LengthFieldPrepender(4))//对象参数类型编码器.addLast("encoder", new ObjectEncoder())// 对象参数类型解码器.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))).addLast(myHandler);}});

调用handler时传入了 ChannelInitializer对象,它提供了一个initChannel()方法让我们实现自己的初始化操作,在初始化的时候我们自己又定义了对pipeline的添加操作addLast
这里的ChannelInitializer是一个ChannelInboundHanler对象,这里的channelHandler会在客户端启动的是执行如下代码的时候加入到pipeline中

void init(Channel channel) throws Exception {ChannelPipeline p = channel.pipeline();//这里的config.handler()调用的就是bootstrap.handler(),也就是我们自定义的ChannelInitializer对象p.addLast(config.handler());
}

那么我们来看下这个pipeline的addLast方法到底做了什么?
DefaultChannelPipeline实现了多个重载的addLast方法,最后会调用如下代码

@Overridepublic final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {final AbstractChannelHandlerContext newCtx;synchronized (this) {checkMultiplicity(handler);newCtx = newContext(group, filterName(name, handler), handler);addLast0(newCtx);// If the registered is false it means that the channel was not registered on an eventloop yet.// In this case we add the context to the pipeline and add a task that will call// ChannelHandler.handlerAdded(...) once the channel is registered.if (!registered) {newCtx.setAddPending();callHandlerCallbackLater(newCtx, true);return this;}EventExecutor executor = newCtx.executor();if (!executor.inEventLoop()) {newCtx.setAddPending();executor.execute(new Runnable() {@Overridepublic void run() {callHandlerAdded0(newCtx);}});return this;}}callHandlerAdded0(newCtx);return this;}

主要分为四步:

  • 校验handler是否重复
  • 创建context节点
  • 添加context节点到Pipeline尾部
  • 执行回调通知
    注意下,这里有一个callHandlerCallbackLater方法,当channel还没有被注册的时候,会创建一个PendingHandlerAddedTask对象,它包含一个新增的context对象,后面会用到,先mark一下
判断handler是否重复
private static void checkMultiplicity(ChannelHandler handler) {if (handler instanceof ChannelHandlerAdapter) {ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;if (!h.isSharable() && h.added) {throw new ChannelPipelineException(h.getClass().getName() +" is not a @Sharable handler, so can't be added or removed multiple times.");}h.added = true;}}

根据handler的added标识判断该handler是否已经添加过,如果handler不是sharable共享的且已经添加过就报错,否则将added置为true
这里的isSharable()就是通过该类上是否有Sharable注解来实现的,为了提高效率,Netty对其做了缓存

public boolean isSharable() {Class<?> clazz = getClass();Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();Boolean sharable = cache.get(clazz);if (sharable == null) {sharable = clazz.isAnnotationPresent(Sharable.class);cache.put(clazz, sharable);}return sharable;}
创建Context节点
newCtx = newContext(group, filterName(name, handler), handler);

先根据name和handler为这个context创建一个唯一的name

    private String filterName(String name, ChannelHandler handler) {if (name == null) {return generateName(handler);}checkDuplicateName(name);return name;}

如果name为空,就根据handler来自动生成一个name,默认是handler的类名+"#0",生成后直接校验该name是否重复;如果name是用户指定的,就直接校验name是否重复,重复的话就报错给用户

private String generateName(ChannelHandler handler) {Map<Class<?>, String> cache = nameCaches.get();Class<?> handlerType = handler.getClass();String name = cache.get(handlerType);if (name == null) {//根据handler类名生成默认的name handlerClassName+"#0"name = generateName0(handlerType);cache.put(handlerType, name);}//这里的context0就是用来检测pipeline里是否已有同名的context//如果同名则将最后的数字递增,一直到没有重名为止if (context0(name) != null) {String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.for (int i = 1;; i ++) {String newName = baseName + i;if (context0(newName) == null) {name = newName;break;}}}return name;}
//从head节点开始一直向下遍历到tail,判断Pipeline中是否已有同名的context
private AbstractChannelHandlerContext context0(String name) {AbstractChannelHandlerContext context = head.next;while (context != tail) {if (context.name().equals(name)) {return context;}context = context.next;}return null;}

接下来就是根据校验后的name生成对应的context节点,然后保存对应的handler,pipeline,executor,inbound,outbound属性字段。这里的inbound,outbound就是根据其实现的接口类型来判断的

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {//这里的group=nullreturn new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);}DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {super(pipeline, executor, name, isInbound(handler), isOutbound(handler));if (handler == null) {throw new NullPointerException("handler");}this.handler = handler;}AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,boolean inbound, boolean outbound) {this.name = ObjectUtil.checkNotNull(name, "name");this.pipeline = pipeline;this.executor = executor;this.inbound = inbound;this.outbound = outbound;// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.ordered = executor == null || executor instanceof OrderedEventExecutor;}
添加context到Pipeline尾部
 private void addLast0(AbstractChannelHandlerContext newCtx) {AbstractChannelHandlerContext prev = tail.prev;newCtx.prev = prev;newCtx.next = tail;prev.next = newCtx;tail.prev = newCtx;}

就是一个简单的双向链表的操作,把context节点加入到tail节点前的最后一个节点
自此,我们可以得到一个如下的Pipeline

执行回调通知

调用的是如下方法: callHandlerAdded0(newCtx);

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {try {ctx.handler().handlerAdded(ctx);ctx.setAddComplete();} catch (Throwable t) {}}final void setAddComplete() {for (;;) {int oldState = handlerState;if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {return;}}}

节点添加之后,调用handler的handlerAdded通知ChannelHandler,并尝试设置context的状态为ADD_COMPLETE
setAddComplete的退出只有两种可能:
1.handler已经被移除 此时状态为REMOVE_COMPLETE
2.成功设置Handler状态为ADD_COMPLETE

initChannel的执行

现在我们已经添加了ChannelInitializer这个自定义的handler到Pipeline,但是我们真正想要添加到Pipeline的应该是定义在initChannel里的操作,那么这个方法又是在哪里执行的呢?
搜索一下ChannelInitializer代码,我们可以发现initChannel()方法在channelRegistered和handlerAdded方法里都有调用,那么实际是在哪个方法回调里执行的呢?我们从客户端的启动开始再次分析下流程

调用链如下:
Bootstrap.doResolveAndConnect()
–>AbstractBootstrap.initAndRegister()
–>SingleThreadEventLoop.register(channel)
–>SingleThreadEventLoop.register(promise)
–>AbstractChannel.register(eventLoop, promise)
–>AbstractChannel.register0(promise)
–>AbstractNioChannel.doRegister()

我们看下这里的register0方法

 private void register0(ChannelPromise promise) {try {if (!promise.setUncancellable() || !ensureOpen(promise)) {return;}boolean firstRegistration = neverRegistered;doRegister();neverRegistered = false;registered = true;// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the// user may already fire events through the pipeline in the ChannelFutureListener.pipeline.invokeHandlerAddedIfNeeded();safeSetSuccess(promise);pipeline.fireChannelRegistered();if (isActive()) {if (firstRegistration) {pipeline.fireChannelActive();} else if (config().isAutoRead()) {beginRead();}}} catch (Throwable t) {}}

在注册成功之后会先调用pipeline.invokeHandlerAddedIfNeeded,再调用pipeline.fireChannelRegistered()方法
先来看下invokeHandlerAddedIfNeeded方法:

final void invokeHandlerAddedIfNeeded() {assert channel.eventLoop().inEventLoop();if (firstRegistration) {firstRegistration = false;// We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,// that were added before the registration was done.callHandlerAddedForAllHandlers();}}

这里的firstRegistration=true,所以会执行对应的分支,说明当注册channel到eventLoop成功后,就可以通过handlerAdded回调方法去添加的我们定义channelHandler , 接下去看看这个方法是怎么做的

  private void callHandlerAddedForAllHandlers() {final PendingHandlerCallback pendingHandlerCallbackHead;synchronized (this) {assert !registered;// This Channel itself was registered.registered = true;pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;// Null out so it can be GC'ed.this.pendingHandlerCallbackHead = null;}// This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while// holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside// the EventLoop.PendingHandlerCallback task = pendingHandlerCallbackHead;while (task != null) {task.execute();task = task.next;}}

先会拿到pendingHandlerCallbackHead(实现了runnable接口)对象,它就是前面在执行addLast(ChannelInitializer)时创建的,后面就是遍历这个链表,调用节点的execute()执行

 @Overridevoid execute() {EventExecutor executor = ctx.executor();if (executor.inEventLoop()) {callHandlerAdded0(ctx);} else {executor.execute(this);     }}private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {try {ctx.callHandlerAdded();} catch (Throwable t) {}}final void callHandlerAdded() throws Exception {// We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates// any pipeline events ctx.handler() will miss them because the state will not allow it.if (setAddComplete()) {handler().handlerAdded(this);}}

后面的逻辑就非常清晰了,获取ChannelInitinal的handler并调用其handlerAdded方法,从而执行我们实现的initChannel方法,尝试将我们自己定义的channelHandler添加到Pipeline,也就是会执行如下代码:

 socketChannel.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))//自定义协议编码器.addLast("frameEncoder", new LengthFieldPrepender(4))//对象参数类型编码器.addLast("encoder", new ObjectEncoder())// 对象参数类型解码器.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))).addLast(myHandler);}

前面说到执行完invokeHandlerAddedIfNeeded,后面还有一个fireChannelRegistered的方法方法回调,我们来简单看下,代码如下:

public final ChannelPipeline fireChannelRegistered() {AbstractChannelHandlerContext.invokeChannelRegistered(head);return this;}static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeChannelRegistered();} else {executor.execute(new Runnable() {@Overridepublic void run() {next.invokeChannelRegistered();}});}}private void invokeChannelRegistered() {if (invokeHandler()) {try {((ChannelInboundHandler) handler()).channelRegistered(this);} catch (Throwable t) {notifyHandlerException(t);}} else {//对于Head节点,这里会执行这个分支代码fireChannelRegistered();}}

也就是说会从Head节点开始一直向下遍历,找到每个Inbound属性为true的节点,然后调用其channelRegistered方法做回调通知

此时我们就会得到如下的一个Pipeline(这里只画最后的自定义handler)

 private boolean initChannel(ChannelHandlerContext ctx) throws Exception {if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.try {initChannel((C) ctx.channel());} catch (Throwable cause) {exceptionCaught(ctx, cause);} finally {//移除当前的ChannelIntinalizer节点remove(ctx);}return true;}return false;}

添加完上述的channelHandler之后,还有最后一步remove(ctx),也就是说会将当前的context(ChannelInitializer从Pipeline移除),就剩下如下的Pipeline双向链表了

移除的逻辑和添加类似,就不再讲了

总结

1.Pipeline底层是一个双向链表结构,添加和删除节点就是维护这个双向链表
2.Pipeline每个节点context都会有一个唯一name,默认是HandlerClassName+"#0"
3.客户端初始化添加用户自定义Handler到Pipeline,会先添加ChannelInitializer到Pipeline,在调用jdk底层NIO API完成注册后会添加自定义的Handler到Pipeline,然后从Pipeline移除ChannelInitializer

Netty学习笔记(五)Pipeline相关推荐

  1. Netty学习笔记(六)Pipeline的传播机制

    前面简单提到了下Pipeline的传播机制,这里再详细分析下 Pipeline的传播机制中有两个非常重要的属性inbound和outbound(AbstractChannelHandlerContex ...

  2. Netty学习笔记(五) 使用Netty构建静态网页服务器

    昨天在继续完善基于Netty构建的聊天室系统的过程中,发现了一个有意思的知识点,特此拿来做一个简单的静态网页服务器,好好的玩一玩Netty. 但是不管怎么说利用netty实现各种功能的流程都是类似的 ...

  3. Netty学习笔记(二)Netty服务端流程启动分析

    先贴下在NIO和Netty里启动服务端的代码 public class NioServer { /*** 指定端口号启动服务* */public boolean startServer(int por ...

  4. Netty学习笔记(二) 实现服务端和客户端

    在Netty学习笔记(一) 实现DISCARD服务中,我们使用Netty和Python实现了简单的丢弃DISCARD服务,这篇,我们使用Netty实现服务端和客户端交互的需求. 前置工作 开发环境 J ...

  5. python函数是一段具有特定功能的语句组_Python学习笔记(五)函数和代码复用

    本文将为您描述Python学习笔记(五)函数和代码复用,具体完成步骤: 函数能提高应用的模块性,和代码的重复利用率.在很多高级语言中,都可以使用函数实现多种功能.在之前的学习中,相信你已经知道Pyth ...

  6. Ethernet/IP 学习笔记五

    Ethernet/IP 学习笔记五 Accessing data within a device using a non-time critical message (an explicit mess ...

  7. StackExchange.Redis学习笔记(五) 发布和订阅

    StackExchange.Redis学习笔记(五) 发布和订阅 原文:StackExchange.Redis学习笔记(五) 发布和订阅 Redis命令中的Pub/Sub Redis在 2.0之后的版 ...

  8. 吴恩达《机器学习》学习笔记五——逻辑回归

    吴恩达<机器学习>学习笔记五--逻辑回归 一. 分类(classification) 1.定义 2.阈值 二. 逻辑(logistic)回归假设函数 1.假设的表达式 2.假设表达式的意义 ...

  9. 好程序员教程分析Vue学习笔记五

    好程序员教程分析Vue学习笔记五,上次我们学习了Vue的组件,这次我们来学习一下路由的使用.在Vue中,所谓的路由其实跟其他的框架中的路由的概念差不多,即指跳转的路径. 注意:在Vue中,要使用路由, ...

最新文章

  1. SQLiteOpenHelper的实现
  2. PAT甲级1028 List Sorting:[C++题解]排序,cin和cout会超时
  3. linux的mysql主主_Linux下指定mysql数据库数据配置主主同步的实例
  4. Android 中 Activity 的生命周期
  5. 我是怎么用机器学习技术找到女票的
  6. 微信WeixinJSBridge API
  7. JAVA蓝桥杯:高精度算法
  8. dubbo ---- 入门
  9. js动态产生对象push进数组,发现数组所有元素(element or object)一样
  10. IoTDB常用的SQL语句大全
  11. matlab 符号函数是什么意思,matlab符号函数定义
  12. 固态硬盘分为哪几种_通俗易懂 SSD固态硬盘接口有哪几种类型的图解
  13. PhotoShop 橡皮擦工具组、渐变工具
  14. TIA protal与SCL从入门到精通(6)——函数循环处理
  15. flutter取消动态字体大小
  16. DIV display与visibility
  17. 成都最最最牛逼的IT公司全在这了,来成都一起造富。。。
  18. Rocket.chat快速安装部署
  19. 第7篇:MS12-020蓝屏漏洞在实战中的巧用
  20. 发邮件礼仪汇总 让优秀成为职场达人习惯

热门文章

  1. Nginx_负载均衡配置讲解
  2. python 功能键ord_ord()函数以及Python中的示例
  3. Java获得时间 String与Timestamp互转
  4. 微信小程序出现【需要进行身份验证】弹框解决方法
  5. 今日题解------uvalive 2689
  6. 解决针对ubuntu11.04安装中文包后不能正常查看或使用pdf和Archiver的问题
  7. [转]HTTP协议及其请求头分析
  8. 【C语言位运算的应用】如何按bit位翻转一个无符号整型
  9. struts2重定向
  10. 设计模式(创建型)之原型模式