前面一篇netty中的future和promise源码分析(一)中对future进行了重点分析,接下来讲一讲promise.
promise是可写的future,从future的分析中可以发现在其中没有写操作的接口,netty特意使promise扩展了future,可以对异步操作结果进行设置。

(一)defaultpromise

包含的字段

//原子保存异步操作结果
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
//异步操作结果
private volatile Object result;
private final EventExecutor executor;

操作结果通知

为了能够支持异步地获取操作结果,netty中用通知的方式来对后续的listener中的操作,操作结果等进行控制。通知的前提包括success,fail,cancel三种状态。

  1. success状态:setsuccess()方法
  • 第一步:异步操作结束调用setSuccess(V result)或trySuccess(V result)
    方法,将操作结果当做参数传入,来通知可以对结果进行使用。

      public Promise<V> setSuccess(V result) {if (setSuccess0(result)) {//触发listener中的operationcomplete()方法notifyListeners();return this;}throw new IllegalStateException("complete already: " + this);}
    
  • 第二步:首先调用setsuccess0()方法对result变量进行保存,如果保存成功则通过notifyListeners()触发listener中的operationcomplete()方法.(此处先不对notifylisteners()方法进行分析,见下文)

      #setsuccess0()方法:private boolean setSuccess0(V result) {return setValue0(result == null ? SUCCESS : result);}#setvalue0()方法:private boolean setValue0(Object objResult) {if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {checkNotifyWaiters();return true;}return false;}

    注意:在success状态下保存结果时,如果result(异步操作结果)为null,则将promise内部的result设置为常量SUCCESS。再者,在promise的result中,只允许保存一次,所以netty采用cas保证结果只保存一遍,若结果保存出错返回false.

      #SUCCESS常量private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class.getName() + ".SUCCESS");
  • 第三步:保存完结果后,通知所有同步等待异步操作结果的线程。

      private synchronized void checkNotifyWaiters() {if (waiters > 0) {notifyAll();}} 

    2.success状态:trysuccess()方法

    trysuccess()方法与setsuccess()方法大同小异,只不过在保存结果出错的时候,返回false,而setsuccess()抛出一个异常信息。

    public boolean trySuccess(V result) {if (setSuccess0(result)) {notifyListeners();return true;}return false;
    }
    

    3.fail状态:

    fail状态下通知机制和success几乎相同,区别在于保存异步操作结果的时候,fail状态保存的是使用CauseHolder进行封装的异常信息对象。

    private boolean setFailure0(Throwable cause) {return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
    }
    

    4.cancel状态:
    cancel状态,表示异步操作的时候,对promise对象进行了cancel操作。

    public boolean cancel(boolean mayInterruptIfRunning) {if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {checkNotifyWaiters();notifyListeners();return true;}return false;
    }

    同样的,cancel后也和success和fail一样,对result进行了设置。在success的时候,允许初始值为null和UNCANCELLABLE常量(表示不允许cancel),在cancel状态只允许为null.

       # CANCELLATION_CAUSE_HOLDER//封装了CancellationException异常。private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
    new CancellationException(), DefaultPromise.class, "cancel(...)"));
    

    5.操作结果通知总结:

    首先,通过调用setsuccess()等方法,启动通知机制;
    然后,将异步操作结果进行保存,仅允许保存一次,否则会返回false.
    保存好信息后,触发listener中的操作,还会通知所有同步等待异步操作结果的线程。

添加监听者

接下来分析promise如何添加监听者。

(一). 首先来看一下用来保存监听者对象的字段。

/*** One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}.* If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified.** Threading - synchronized(this). We must support adding listeners when there is no EventExecutor.*/private Object listeners;

从注释可以总结出那么几个信息:

 1.这个object类型的listeners字段,可以是GenericFutureListener类型,也可以是link DefaultFutureListeners(用来保存GenericFutureListener的数组)。ps:这样设计的好处,多数情况下,listener只有一个,用集合或者数组会造成浪费,只有真正需要多个监听者的时候,才使用数组2.如果listeners为null,表示还未添加监听者或者已经触发过了(一旦触发就会将listeners清空)3.可以在外部添加监听者,所以使用加锁的形式(synchronized(this))添加监听者。

(二). 添加监听器的过程

@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {checkNotNull(listener, "listener");synchronized (this) {addListener0(listener);}if (isDone()) {notifyListeners();}return this;
}

用加锁的方式添加监听器,添加完成后,如果promise的状态为isdone,就会立即触发Listener.接下来看看addlistener0()是如何添加的。

private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {if (listeners == null) {listeners = listener;} else if (listeners instanceof DefaultFutureListeners) {((DefaultFutureListeners) listeners).add(listener);} else {listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);}
}

前面已经分析过,listeners字段只可能是GenericFutureListener类型,或者DefaultFutureListeners类型。所以如果为Null,直接保存;如果已经是DefaultFutureListeners(数组形式),就让其再添加一个listener;如果是GenericFutureListener类型,就创建一个数组。


(三)DefaultFutureListeners类分析:

首先看看它的构造方法:

DefaultFutureListeners(GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) {listeners = new GenericFutureListener[2];listeners[0] = first;listeners[1] = second;size = 2;if (first instanceof GenericProgressiveFutureListener) {progressiveSize ++;}if (second instanceof GenericProgressiveFutureListener) {progressiveSize ++;}
}

可以看出,从构造之初,他就是size为2的数组。

再来看看它是如何在后续过程中添加元素的:

  public void add(GenericFutureListener<? extends Future<?>> l) {GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;final int size = this.size;if (size == listeners.length) {this.listeners = listeners = Arrays.copyOf(listeners, size << 1);}listeners[size] = l;this.size = size + 1;if (l instanceof GenericProgressiveFutureListener) {progressiveSize ++;}}

如果数组容量未满,就继续添加元素;如果数组容量已满,就将容量翻倍,将原数组内容复制拷贝到新数组中。


(四)添加监听者总结:

1.添加完监听者,就会尝试去触发listener中的操作。
2.promise内部用来保存监听者的listeners只会是两种类型,GenericFutureListener类型和link DefaultFutureListeners。

触发监听者

在前面的setsuccess()和addlistener()等方法中都可以看到notifylisteners()方法,这就是触发监听者的起点。

private void notifyListeners() {EventExecutor executor = executor();if (executor.inEventLoop()) {……notifyListenersNow();……}safeExecute(executor, new Runnable() {@Overridepublic void run() {notifyListenersNow();}});
}private static void safeExecute(EventExecutor executor, Runnable task) {executor.execute(task);……
}

在notifylisteners()方法中,可以看到,listener中触发的异步操作要求是在线程组中执行的,如果是在线程组外部提交的任务,会将任务封装成runnable提交到任务队列中等待执行。
接下来看看notifynow()方法中做了什么。

private void notifyListenersNow() {Object listeners;synchronized (this) {// Only proceed if there are listeners to notify and we are not already notifying listeners.if (notifyingListeners || this.listeners == null) {return;}notifyingListeners = true;listeners = this.listeners;this.listeners = null;}for (;;) {if (listeners instanceof DefaultFutureListeners) {notifyListeners0((DefaultFutureListeners) listeners);} else {notifyListener0(this, (GenericFutureListener<? extends Future<V>>) listeners);}synchronized (this) {if (this.listeners == null) {// Nothing can throw from within this method, so setting notifyingListeners back to false does not// need to be in a finally block.notifyingListeners = false;return;}listeners = this.listeners;this.listeners = null;}}
}

在该方法中,首先将Listeners取出来,然后将其清空(每次触发完listeners都会将原来的listeners清空),然后执行listener中具体的操作,执行完操作,会再次检查是否又有listeners添加进来,确保无误后,从方法中退出。

private static void notifyListener0(Future future, GenericFutureListener l) {try {l.operationComplete(future);} catch (Throwable t) {logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);}
}

触发监听者总结:
1.触发的listeners中具体的操作是在线程池中进行
2.触发完毕的listeners会将其清空。


同步等待

netty还提供了接口可以同步等待异步操作结果,使用到的是await()和sync()方法。

 public Promise<V> await() throws InterruptedException {if (isDone()) {return this;}if (Thread.interrupted()) {throw new InterruptedException(toString());}checkDeadLock();synchronized (this) {while (!isDone()) {incWaiters();try {wait();} finally {decWaiters();}}}return this;}

原理很简单,就是让线程在promise对象上等待通知。如果是isdone状态,就直接返回。
sync()方法是在await()方法的基础上添加了额外的功能,区别只是sync()调用,如果异步操作失败,则会抛出异常。

  public Promise<V> sync() throws InterruptedException {await();rethrowIfFailed();return this;
}

(二)defaultchannelpromise

以上分析得出一个疑惑?
从defaultpromise的分析可以得知,listener中的操作是由线程池来执行。但注意到defaultpromise的其中一个权限为protected的构造方法不需要传入eventexecutor,这可能导致出现nullpoint异常。
所以出现了另一个扩展类,defaultchannelpromise.

 public DefaultChannelPromise(Channel channel) {this.channel = checkNotNull(channel, "channel");
}public DefaultChannelPromise(Channel channel, EventExecutor executor) {super(executor);this.channel = checkNotNull(channel, "channel");
}

defaultchannelpromise类有两个构造方法,一个为父类传入eventexecutor,一个调用的是上面提到的父类中protected的构造方法,那它是如何解决eventexecutor空指向的异常的?

protected EventExecutor executor() {EventExecutor e = super.executor();if (e == null) {return channel().eventLoop();} else {return e;}
}

可以看到,当eventexecutor为Null时,保存的是channel中的eventexecutor.

netty中的future和promise源码分析(二)相关推荐

  1. WebRTC[1]-WebRTC中h264解码过程的源码分析

    目录 前言 正文 <WebRTC工作原理精讲>系列-总览_liuzhen007的专栏-CSDN博客_webrtc 原理前言欢迎大家订阅Data-Mining 的<WebRTC工作原理 ...

  2. ENS最新合约源码分析二

    ENS(以太坊域名服务)智能合约源码分析二 0.简介 ​ 本次分享直接使用线上实际注册流程来分析最新注册以太坊域名的相关代码.本次主要分析最新的关于普通域名注册合约和普通域名迁移合约,短域名竞拍合约不 ...

  3. 【Android 事件分发】ItemTouchHelper 源码分析 ( OnItemTouchListener 事件监听器源码分析 二 )

    Android 事件分发 系列文章目录 [Android 事件分发]事件分发源码分析 ( 驱动层通过中断传递事件 | WindowManagerService 向 View 层传递事件 ) [Andr ...

  4. 【转】ABP源码分析二十四:Notification

    NotificationDefinition: 用于封装Notification Definnition 的信息.注意和Notification 的区别,如果把Notification看成是具体的消息 ...

  5. 【转】ABP源码分析二十三:Authorization

    Permission:用于定义一个Permission,一个permission可以包含多个子Permission. PermissionDictionary:继承自Dictionary<str ...

  6. SpringBoot源码分析(二)之自动装配demo

    SpringBoot源码分析(二)之自动装配demo 文章目录 SpringBoot源码分析(二)之自动装配demo 前言 一.创建RedissonTemplate的Maven服务 二.创建测试服务 ...

  7. gSOAP 源码分析(二)

    gSOAP 源码分析(二) 2012-5-24 flyfish 一 gSOAP XML介绍 Xml的全称是EXtensible Markup Language.可扩展标记语言.仅仅是一个纯文本.适合用 ...

  8. Android Q 10.1 KeyMaster源码分析(二) - 各家方案的实现

    写在之前 这两篇文章是我2021年3月初看KeyMaster的笔记,本来打算等分析完KeyMaster和KeyStore以后再一起做成一系列贴出来,后来KeyStore的分析中断了,这一系列的文章就变 ...

  9. 【投屏】Scrcpy源码分析二(Client篇-连接阶段)

    Scrcpy源码分析系列 [投屏]Scrcpy源码分析一(编译篇) [投屏]Scrcpy源码分析二(Client篇-连接阶段) [投屏]Scrcpy源码分析三(Client篇-投屏阶段) [投屏]Sc ...

最新文章

  1. python 简易 http server
  2. [HTTP]Etag的工作流程
  3. web页面刷不出来 白色_今日头条连接超时刷不出来解决方案
  4. Apache NIO 框架 Mina 使用中出现 too many open files 有关...
  5. ES6 Proxy和Reflect (上)
  6. C++ 如何释放std::function中绑定的对象
  7. SpringAOP原理解析
  8. Python使用TCP通讯例子
  9. resin设置权限_如何配置resin 3.1.9
  10. 设计一款内容阅读app,要求与市面上的产品有差异化,列举其核心功能,画出页面设计原型图(2-3个),说明其产品价值
  11. 【Alpha】事后诸葛亮
  12. python表示差值_python差值函数
  13. 云旗OS助手火了!可一站式体验统信UOS
  14. (R语言)R的统计模型
  15. ps后期处理实用技巧2
  16. Graphite详解
  17. 手淘Android容器架构——Atlas的前世今生
  18. Pandas的MultiIndex多层索引使用
  19. ionic页面footer按钮右下方的小三角实现
  20. 面试题之二:中断服务函数

热门文章

  1. 如何让引擎蜘蛛天天光临你的网站
  2. sql优化之:深入浅出理解索引(系列二)(讲解非常透彻)
  3. [导入]ZT笑到内伤:史上最雷,最爆寒的电影字幕
  4. java实现简单窗体小游戏----球球大作战
  5. 同样的代码,conda无法运行,命令行却可以运行
  6. 随笔:项目感想、知识总结、未来展望
  7. CF643E Bear and Destroying Subtrees
  8. 使用 JMeter 进行压力测试
  9. ACM-ICPC 2018 焦作赛区网络预赛 J(二分+JAVA高精)
  10. 一起学Hadoop——Hadoop的前世今生