先看下Future的整个继承体系,还有一个ChannelFuture不在里面;

    在并发编程中,我们通常会用到一组非阻塞的模型:Promise,Future 和 Callback。其中的 Future 表示一个可能还没有实际完成的异步任务的结果,针对这个结果可以添加 Callback 以便在任务执行成功或失败后做出对应的操作,而 Promise 交由任务执行者,任务执行者通过 Promise 可以标记任务完成或者失败。 可以说这一套模型是很多异步非阻塞架构的基础。
    这一套经典的模型在 Scala、C# 中得到了原生的支持,但 JDK 中暂时还只有无 Callback 的 Future 出现,当然也并非在 JAVA 界就没有发展了,比如 Guava 就提供了ListenableFuture 接口,而 Netty 4+ 更是提供了完整的 Promise、Future 和 Listener 机制,在 Netty 的官方文档 Using as a generic library 中也介绍了将 Netty 作为一个 lib 包依赖,并且使用 Listenable futures 的示例。在实际的项目使用中,发现 Netty 的 EventLoop 机制不一定适用其他场景,因此想去除对 EventLoop 的依赖,实现一个简化版本。

    Netty自己实现了一套并发库,Future是其中的一块,下篇文章讲下他的并发库的线程池实现。Netty的Future的特性
  • Future<V>的V为异步结果的返回类型
  • getNow 是无阻塞调用,返回异步执行结果,如果未完成那么返回null
  • await 是阻塞调用,等到异步执行完成
  • isSuccess 执行成功是否成功
  • sync  阻塞调用,等待这个future直到isDone(可能由于正常终止、异常或取消而完成)返回true; 如果该future失败,重新抛出失败的原因。 和await区别就是返回结果不同,它返回一个Future对象,通过这个Future知道任务执行结果。
  • 添加GenericFutureListener, 执行完成(future可能由于正常终止、异常或取消而完成)后调用该监听器。
如果我实现这个Future怎么实现:1:任务执行器,这样我们可以控制任务的执行了。2:执行结果,用于返回。3:监听器集合。4:执行结果状态

5:触发监听器的函数。那么怎么执行完成后自定触发监听器,应该在Future里面又启动了一个线程去执行这个触发机制。这些都是我的猜想。看下他的子类怎么实现的:
 
AbstractFuture实现了JDK中的get方法,调用netty中future的方法,一个模板模式出现了。所有的Future都会继承该类
Promise:任务执行者可以标记任务执行成功或失败,添加了setFailure(Throwable)可以知道Promise的子类需要有个成员变量来保存异常,添加了setSuccess(V) 方法,setUncancellable()方法, 和tryFailure和trySuccess方法。这样所有的操作都可以返回一个Promise,你自己检测是否执行成功,好处是啥,接口统一吗?
ScheduledFuture:一个定时任务的执行结果
ProgressiveFuture:可以跟踪任务的执行进度。
下面看下几个类的具体实现:
DefaultPromise:成员变量如下:
  1. privatefinalEventExecutor executor; //任务执行器
  2. privatevolatileObject result;//不仅仅是结果,也有可能是异常
  3. * 一个或多个监听器,可能是GenericFutureListener或者DefaultFutureListeners。如果是NULL有两种可能* 1:没有添加触发器* 2:已经出发了
  4. privateObject listeners;
  5. privateLateListeners lateListeners;
  6. privateshort waiters;
看来异常也是结果。

看一个重要方法的实现:
isDone:可能由于正常终止、异常或取消而完成
  1. privatestaticboolean isDone0(Object result){
  2. return result !=null&& result != UNCANCELLABLE;
  3. }
isSuccess: 任务执行成功

  1. publicboolean isSuccess(){
  2. Object result =this.result;
  3. if(result ==null|| result == UNCANCELLABLE){
  4. returnfalse;
  5. }
  6. return!(result instanceofCauseHolder);
  7. }
getNow:返回执行结果

  1. public V getNow(){
  2. Object result =this.result;
  3. if(result instanceofCauseHolder|| result == SUCCESS){
  4. returnnull;
  5. }
  6. return(V) result;
  7. }
sync:同步阻塞方法

  1. @Override
  2. publicPromise<V> sync()throwsInterruptedException{
  3. await();
  4. rethrowIfFailed();
  5. returnthis;
  6. }
看来比await多了抛出异常。
await:等待任务执行完成
  1. @Override
  2. publicPromise<V> await()throwsInterruptedException{
  3. if(isDone()){
  4. returnthis;
  5. }
  6. if(Thread.interrupted()){
  7. thrownewInterruptedException(toString());
  8. }
  9. synchronized(this){
  10. while(!isDone()){
  11. checkDeadLock();//判断当前线程是否是执行线程。如果是抛出异常。
  12. incWaiters();//添加等待个数
  13. try{
  14. wait();//释放锁,等待唤醒,阻塞该线程
  15. }finally{
  16. decWaiters();
  17. }
  18. }
  19. }
  20. returnthis;
  21. }
执行器所在的线程不能调用await(),只能是调用者所在的线程才可以,waiters有什么用呢?

cancle方法使用:
  1. @Override
  2. publicboolean cancel(boolean mayInterruptIfRunning){
  3. Object result =this.result;
  4. if(isDone0(result)|| result == UNCANCELLABLE){
  5. returnfalse;
  6. }
  7. synchronized(this){
  8. // Allow only once.
  9. result =this.result;
  10. if(isDone0(result)|| result == UNCANCELLABLE){
  11. returnfalse;
  12. }
  13. this.result = CANCELLATION_CAUSE_HOLDER;
  14. if(hasWaiters()){
  15. notifyAll();
  16. }
  17. }
  18. notifyListeners();
  19. returntrue;
  20. }
当我们修改DefaultPromise的状态时,要触发监听器。

notifyListeners:
  1. /**
  2. * 该方法不需要异步,为啥呢
  3. * 1:这个方法在同步代码块里面调用,因此任何监听器列表的改变都happens-before该方法
  4. * 2:该方法只有isDone==true的时候调用,一但 isDone==true 那么监听器列表将不会改变
  5. */
  6. privatevoid notifyListeners(){
  7. Object listeners =this.listeners;
  8. if(listeners ==null){
  9. return;
  10. }
  11. EventExecutor executor = executor();
  12. if(executor.inEventLoop()){
  13. finalInternalThreadLocalMap threadLocals =InternalThreadLocalMap.get();
  14. finalint stackDepth = threadLocals.futureListenerStackDepth();
  15. if(stackDepth < MAX_LISTENER_STACK_DEPTH){
  16. threadLocals.setFutureListenerStackDepth(stackDepth +1);
  17. try{
  18. if(listeners instanceofDefaultFutureListeners){
  19. notifyListeners0(this,(DefaultFutureListeners) listeners);
  20. }else{
  21. finalGenericFutureListener<?extendsFuture<V>> l =
  22. (GenericFutureListener<?extendsFuture<V>>) listeners;
  23. notifyListener0(this, l);
  24. }
  25. }finally{
  26. this.listeners =null;
  27. threadLocals.setFutureListenerStackDepth(stackDepth);
  28. }
  29. return;
  30. }
  31. }
  32. if(listeners instanceofDefaultFutureListeners){
  33. finalDefaultFutureListeners dfl =(DefaultFutureListeners) listeners;
  34. execute(executor,newRunnable(){
  35. @Override
  36. publicvoid run(){
  37. notifyListeners0(DefaultPromise.this, dfl);
  38. DefaultPromise.this.listeners =null;
  39. }
  40. });
  41. }else{
  42. finalGenericFutureListener<?extendsFuture<V>> l =
  43. (GenericFutureListener<?extendsFuture<V>>) listeners;
  44. execute(executor,newRunnable(){
  45. @Override
  46. publicvoid run(){
  47. notifyListener0(DefaultPromise.this, l);
  48. DefaultPromise.this.listeners =null;
  49. }
  50. });
  51. }
  52. }
任务永远不能在主线程中执行,需要放到执行器所在的线程执行。DefaultFutureListeners和GenericFutureListener,一个是容器,一个是元素

还有几个修改状态的方法:
  1. @Override
  2. publicboolean setUncancellable(){
  3. Object result =this.result;
  4. if(isDone0(result)){
  5. return!isCancelled0(result);
  6. }
  7. synchronized(this){
  8. // Allow only once.
  9. result =this.result;
  10. if(isDone0(result)){
  11. return!isCancelled0(result);
  12. }
  13. this.result = UNCANCELLABLE;
  14. }
  15. returntrue;
  16. }
  17. privateboolean setFailure0(Throwable cause){
  18. if(cause ==null){
  19. thrownewNullPointerException("cause");
  20. }
  21. if(isDone()){
  22. returnfalse;
  23. }
  24. synchronized(this){
  25. // Allow only once.
  26. if(isDone()){
  27. returnfalse;
  28. }
  29. result =newCauseHolder(cause);
  30. if(hasWaiters()){
  31. notifyAll();
  32. }
  33. }
  34. returntrue;
  35. }
  36. privateboolean setSuccess0(V result){
  37. if(isDone()){
  38. returnfalse;
  39. }
  40. synchronized(this){
  41. // Allow only once.
  42. if(isDone()){
  43. returnfalse;
  44. }
  45. if(result ==null){
  46. this.result = SUCCESS;
  47. }else{
  48. this.result = result;
  49. }
  50. if(hasWaiters()){
  51. notifyAll();
  52. }
  53. }
  54. returntrue;
  55. }
CompleteFuture的几个子类是状态Promise
 
PromiseTask:该类继承了RunnableFuture接口,该类表示异步操作的结果也可以异步获得,类似JDK中的FutureTask,实例化该对象时候需要传一个Callable的对象,如果没有该对象可以传递一个Runnable和一个Result构造一个Callable对象。
  1. privatestaticfinalclassRunnableAdapter<T>implementsCallable<T>{
  2. finalRunnable task;
  3. final T result;
  4. RunnableAdapter(Runnable task, T result){
  5. this.task = task;
  6. this.result = result;
  7. }
  8. @Override
  9. public T call(){
  10. task.run();
  11. return result;
  12. }
  13. @Override
  14. publicString toString(){
  15. return"Callable(task: "+ task +", result: "+ result +')';
  16. }
  17. }
看下他的run方法。
  1. @Override
  2. publicvoid run(){
  3. try{
  4. if(setUncancellableInternal()){
  5. V result = task.call();
  6. setSuccessInternal(result);
  7. }
  8. }catch(Throwable e){
  9. setFailureInternal(e);
  10. }
该类的setFailure,setSuccess等方法都会抛出一异常,而加了internal的方法会成功执行,他们是protected,子类或者同一个package中可以调用。

ScheduledFutureTask:该类是定时任务返回的ChannelFuture是这个结构中最重要的类,从名称可以知道是通道异步执行的结果:在netty中所有的IO操作都是异步的。这意味这所有的IO调用都会立即返回,且不保证IO操作完成。

IO调用会返回一个ChannelFuture的实例,通过该实例可以查看IO操作的结果和状态,

ChannelFuture有完成和未完成两种状态,当IO操作开始,就会创建一个ChannelFuture的实例,该实例初始是未完成状态,它不是成功,失败,或者取消,因为IO操作还没有完成,如果IO操作完成了那么将会有成功,失败,和取消状态,

*                                      +---------------------------+*                                      | Completed successfully    |*                                      +---------------------------+*                                 +---->      isDone() = <b>true</b>      |* +--------------------------+    |    |   isSuccess() = <b>true</b>      |* |        Uncompleted       |    |    +===========================+* +--------------------------+    |    | Completed with failure    |* |      isDone() = <b>false</b>    |    |    +---------------------------+* |   isSuccess() = false    |----+---->   isDone() = <b>true</b>         |* | isCancelled() = false    |    |    | cause() = <b>non-null</b>     |* |    cause() = null     |    |    +===========================+* +--------------------------+    |    | Completed by cancellation |*                                 |    +---------------------------+*                                 +---->      isDone() = <b>true</b>      |*                                      | isCancelled() = <b>true</b>      |*                                      +---------------------------+
 该类提供了很多方法用来检查IO操作是否完成,等待完成,和接受IO操作的结果。还可以添加ChannelFutureListener的监听器,这样IO操作完成时就可以得到提醒* 强烈建议使用addListener而不是await。* addListener是非阻塞的,它简单的添加指定的ChannelFutureListener到ChannelFuture中,* IO线程将在当绑定在这个future的IO操作完成时,触发这个触发器,优点是提高效率和资源的利用率* await()是一个阻塞方法,一旦调用,调用线程将会阻塞直到IO操作完成。优点是容易实现顺序逻辑

 
 

 
 
来自为知笔记(Wiz)

转载于:https://www.cnblogs.com/gaoxing/p/4401794.html

Netty中的Future相关推荐

  1. netty中的future和promise源码分析(二)

    前面一篇netty中的future和promise源码分析(一)中对future进行了重点分析,接下来讲一讲promise. promise是可写的future,从future的分析中可以发现在其中没 ...

  2. 03-Netty中的Future接口

    文章目录 Netty中的Future体系 一.Future 1.1 Future接口 1.1.1 JDK Future 1.1.2 Netty Future 1.2 Future派生子接口 1.2.1 ...

  3. Netty中的那些坑

    Netty中的那些坑(上篇) 最近开发了一个纯异步的redis客户端,算是比较深入的使用了一把netty.在使用过程中一边优化,一边解决各种坑.儿这些坑大部分基本上是Netty4对Netty3的改进部 ...

  4. 中的listeners_Netty源码学习(6)-- Netty中的异步处理

    Java中的Future:对于一个异步操作,可以暂时返回一个Future对象,然后去做别的事情.最后通过get方法拿到结果.如果get时异步操作还没有完成,则进行阻塞状态. Netty对Future类 ...

  5. netty系列之:在netty中使用protobuf协议

    文章目录 简介 定义protobuf 定义handler 设置ChannelPipeline 构建client和server端并运行 总结 简介 netty中有很多适配不同协议的编码工具,对于流行的g ...

  6. netty系列之:netty中的Channel详解

    文章目录 简介 Channel详解 异步IO和ChannelFuture Channel的层级结构 释放资源 事件处理 总结 简介 Channel是连接ByteBuf和Event的桥梁,netty中的 ...

  7. 这样讲 Netty 中的心跳机制,还有谁不会?

    作者:永顺 segmentfault.com/a/1190000006931568 基础 何为心跳 顾名思义, 所谓 心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, ...

  8. netty中的websocket

    使用WebSocket 协议来实现一个基于浏览器的聊天室应用程序,图12-1 说明了该应用程序的逻辑: (1)客户端发送一个消息: (2)该消息将被广播到所有其他连接的客户端. WebSocket 在 ...

  9. Netty 中的粘包和拆包详解

    Netty 底层是基于 TCP 协议来处理网络数据传输.我们知道 TCP 协议是面向字节流的协议,数据像流水一样在网络中传输那何来 "包" 的概念呢? TCP是四层协议不负责数据逻 ...

最新文章

  1. Web.xml 文件与server.xml 文件使用总结
  2. 本地文件夹如何断开svn连接
  3. DSP:6678开发板NDK网口通信完整实现(附源码)
  4. 算法练习day9——190327(“之” 字形打印矩阵、在行列都排好序的矩阵中找数、打印两个有序链表的公共部分、判断一个链表是否为回文结构)
  5. 语音识别 | GMM-HMM、DNN-HMM等主流算法及前沿技术
  6. java自学语法_java 基础语法学习
  7. Blazor 组件库开发指南
  8. iOS中常见的6种传值方式,UIPageViewController
  9. windows系统如何进入环境变量
  10. C#窗体在任务栏对窗体放大或缩小
  11. 【ElasticSearch】Es 源码之 IndicesModule 源码解读
  12. C++头文件中定义全局变量在多次引用时出现变量多次定义符号冲突的解决办法...
  13. 随想录(udp经验总结)
  14. this.scrollheight获取textarea的高度是0_53小米电子时钟/v1.0 介绍
  15. echarts雷达图
  16. [线性相关] 皮尔森相关系数的计算及假设检验
  17. Verilog自学:关于门级,数据流级,行为级建模
  18. 解决explorer.exe 应用程序错误,内存不能为 read或written的解决方法小结
  19. 阿里巴巴使用的Rax源码
  20. 奇文共欣赏 疑义相与析

热门文章

  1. 研发工程师如何转型项目经理
  2. oracle rac war配置,Oracle RAC安装配置流程
  3. devops .net_DevOps vs. Agile:它们有什么共同点吗?
  4. 教师新学年工作愿景_新学年的3个创新开源项目
  5. 您不会相信Buzzfeed如何处理变更管理
  6. fpga 开源264编码_更好的开源安全性,学习编码,开放式家庭设计等
  7. Bootstrap导航中禁用导航链接
  8. Bootstrap 默认栅格系统
  9. CSS 制作垂直导航
  10. CSS UI状态伪类选择器