Netty中的Future
先看下Future的整个继承体系,还有一个ChannelFuture不在里面;
- Future<V>的V为异步结果的返回类型
- getNow 是无阻塞调用,返回异步执行结果,如果未完成那么返回null
- await 是阻塞调用,等到异步执行完成
- isSuccess 执行成功是否成功
- sync 阻塞调用,等待这个future直到isDone(可能由于正常终止、异常或取消而完成)返回true; 如果该future失败,重新抛出失败的原因。 和await区别就是返回结果不同,它返回一个Future对象,通过这个Future知道任务执行结果。
- 添加GenericFutureListener, 执行完成(future可能由于正常终止、异常或取消而完成)后调用该监听器。
privatefinalEventExecutor executor; //任务执行器
privatevolatileObject result;//不仅仅是结果,也有可能是异常
* 一个或多个监听器,可能是GenericFutureListener或者DefaultFutureListeners。如果是NULL有两种可能* 1:没有添加触发器* 2:已经出发了
privateObject listeners;
privateLateListeners lateListeners;
privateshort waiters;
privatestaticboolean isDone0(Object result){
return result !=null&& result != UNCANCELLABLE;
}
publicboolean isSuccess(){
Object result =this.result;
if(result ==null|| result == UNCANCELLABLE){
returnfalse;
}
return!(result instanceofCauseHolder);
}
public V getNow(){
Object result =this.result;
if(result instanceofCauseHolder|| result == SUCCESS){
returnnull;
}
return(V) result;
}
@Override
publicPromise<V> sync()throwsInterruptedException{
await();
rethrowIfFailed();
returnthis;
}
@Override
publicPromise<V> await()throwsInterruptedException{
if(isDone()){
returnthis;
}
if(Thread.interrupted()){
thrownewInterruptedException(toString());
}
synchronized(this){
while(!isDone()){
checkDeadLock();//判断当前线程是否是执行线程。如果是抛出异常。
incWaiters();//添加等待个数
try{
wait();//释放锁,等待唤醒,阻塞该线程
}finally{
decWaiters();
}
}
}
returnthis;
}
@Override
publicboolean cancel(boolean mayInterruptIfRunning){
Object result =this.result;
if(isDone0(result)|| result == UNCANCELLABLE){
returnfalse;
}
synchronized(this){
// Allow only once.
result =this.result;
if(isDone0(result)|| result == UNCANCELLABLE){
returnfalse;
}
this.result = CANCELLATION_CAUSE_HOLDER;
if(hasWaiters()){
notifyAll();
}
}
notifyListeners();
returntrue;
}
/**
* 该方法不需要异步,为啥呢
* 1:这个方法在同步代码块里面调用,因此任何监听器列表的改变都happens-before该方法
* 2:该方法只有isDone==true的时候调用,一但 isDone==true 那么监听器列表将不会改变
*/
privatevoid notifyListeners(){
Object listeners =this.listeners;
if(listeners ==null){
return;
}
EventExecutor executor = executor();
if(executor.inEventLoop()){
finalInternalThreadLocalMap threadLocals =InternalThreadLocalMap.get();
finalint stackDepth = threadLocals.futureListenerStackDepth();
if(stackDepth < MAX_LISTENER_STACK_DEPTH){
threadLocals.setFutureListenerStackDepth(stackDepth +1);
try{
if(listeners instanceofDefaultFutureListeners){
notifyListeners0(this,(DefaultFutureListeners) listeners);
}else{
finalGenericFutureListener<?extendsFuture<V>> l =
(GenericFutureListener<?extendsFuture<V>>) listeners;
notifyListener0(this, l);
}
}finally{
this.listeners =null;
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
if(listeners instanceofDefaultFutureListeners){
finalDefaultFutureListeners dfl =(DefaultFutureListeners) listeners;
execute(executor,newRunnable(){
@Override
publicvoid run(){
notifyListeners0(DefaultPromise.this, dfl);
DefaultPromise.this.listeners =null;
}
});
}else{
finalGenericFutureListener<?extendsFuture<V>> l =
(GenericFutureListener<?extendsFuture<V>>) listeners;
execute(executor,newRunnable(){
@Override
publicvoid run(){
notifyListener0(DefaultPromise.this, l);
DefaultPromise.this.listeners =null;
}
});
}
}
@Override
publicboolean setUncancellable(){
Object result =this.result;
if(isDone0(result)){
return!isCancelled0(result);
}
synchronized(this){
// Allow only once.
result =this.result;
if(isDone0(result)){
return!isCancelled0(result);
}
this.result = UNCANCELLABLE;
}
returntrue;
}
privateboolean setFailure0(Throwable cause){
if(cause ==null){
thrownewNullPointerException("cause");
}
if(isDone()){
returnfalse;
}
synchronized(this){
// Allow only once.
if(isDone()){
returnfalse;
}
result =newCauseHolder(cause);
if(hasWaiters()){
notifyAll();
}
}
returntrue;
}
privateboolean setSuccess0(V result){
if(isDone()){
returnfalse;
}
synchronized(this){
// Allow only once.
if(isDone()){
returnfalse;
}
if(result ==null){
this.result = SUCCESS;
}else{
this.result = result;
}
if(hasWaiters()){
notifyAll();
}
}
returntrue;
}
CompleteFuture的几个子类是状态Promise
PromiseTask:该类继承了RunnableFuture接口,该类表示异步操作的结果也可以异步获得,类似JDK中的FutureTask,实例化该对象时候需要传一个Callable的对象,如果没有该对象可以传递一个Runnable和一个Result构造一个Callable对象。
privatestaticfinalclassRunnableAdapter<T>implementsCallable<T>{
finalRunnable task;
final T result;
RunnableAdapter(Runnable task, T result){
this.task = task;
this.result = result;
}
@Override
public T call(){
task.run();
return result;
}
@Override
publicString toString(){
return"Callable(task: "+ task +", result: "+ result +')';
}
}
@Override
publicvoid run(){
try{
if(setUncancellableInternal()){
V result = task.call();
setSuccessInternal(result);
}
}catch(Throwable e){
setFailureInternal(e);
}
IO调用会返回一个ChannelFuture的实例,通过该实例可以查看IO操作的结果和状态,
ChannelFuture有完成和未完成两种状态,当IO操作开始,就会创建一个ChannelFuture的实例,该实例初始是未完成状态,它不是成功,失败,或者取消,因为IO操作还没有完成,如果IO操作完成了那么将会有成功,失败,和取消状态,
* +---------------------------+* | Completed successfully |* +---------------------------+* +----> isDone() = <b>true</b> |* +--------------------------+ | | isSuccess() = <b>true</b> |* | Uncompleted | | +===========================+* +--------------------------+ | | Completed with failure |* | isDone() = <b>false</b> | | +---------------------------+* | isSuccess() = false |----+----> isDone() = <b>true</b> |* | isCancelled() = false | | | cause() = <b>non-null</b> |* | cause() = null | | +===========================+* +--------------------------+ | | Completed by cancellation |* | +---------------------------+* +----> isDone() = <b>true</b> |* | isCancelled() = <b>true</b> |* +---------------------------+
该类提供了很多方法用来检查IO操作是否完成,等待完成,和接受IO操作的结果。还可以添加ChannelFutureListener的监听器,这样IO操作完成时就可以得到提醒* 强烈建议使用addListener而不是await。* addListener是非阻塞的,它简单的添加指定的ChannelFutureListener到ChannelFuture中,* IO线程将在当绑定在这个future的IO操作完成时,触发这个触发器,优点是提高效率和资源的利用率* await()是一个阻塞方法,一旦调用,调用线程将会阻塞直到IO操作完成。优点是容易实现顺序逻辑
转载于:https://www.cnblogs.com/gaoxing/p/4401794.html
Netty中的Future相关推荐
- netty中的future和promise源码分析(二)
前面一篇netty中的future和promise源码分析(一)中对future进行了重点分析,接下来讲一讲promise. promise是可写的future,从future的分析中可以发现在其中没 ...
- 03-Netty中的Future接口
文章目录 Netty中的Future体系 一.Future 1.1 Future接口 1.1.1 JDK Future 1.1.2 Netty Future 1.2 Future派生子接口 1.2.1 ...
- Netty中的那些坑
Netty中的那些坑(上篇) 最近开发了一个纯异步的redis客户端,算是比较深入的使用了一把netty.在使用过程中一边优化,一边解决各种坑.儿这些坑大部分基本上是Netty4对Netty3的改进部 ...
- 中的listeners_Netty源码学习(6)-- Netty中的异步处理
Java中的Future:对于一个异步操作,可以暂时返回一个Future对象,然后去做别的事情.最后通过get方法拿到结果.如果get时异步操作还没有完成,则进行阻塞状态. Netty对Future类 ...
- netty系列之:在netty中使用protobuf协议
文章目录 简介 定义protobuf 定义handler 设置ChannelPipeline 构建client和server端并运行 总结 简介 netty中有很多适配不同协议的编码工具,对于流行的g ...
- netty系列之:netty中的Channel详解
文章目录 简介 Channel详解 异步IO和ChannelFuture Channel的层级结构 释放资源 事件处理 总结 简介 Channel是连接ByteBuf和Event的桥梁,netty中的 ...
- 这样讲 Netty 中的心跳机制,还有谁不会?
作者:永顺 segmentfault.com/a/1190000006931568 基础 何为心跳 顾名思义, 所谓 心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, ...
- netty中的websocket
使用WebSocket 协议来实现一个基于浏览器的聊天室应用程序,图12-1 说明了该应用程序的逻辑: (1)客户端发送一个消息: (2)该消息将被广播到所有其他连接的客户端. WebSocket 在 ...
- Netty 中的粘包和拆包详解
Netty 底层是基于 TCP 协议来处理网络数据传输.我们知道 TCP 协议是面向字节流的协议,数据像流水一样在网络中传输那何来 "包" 的概念呢? TCP是四层协议不负责数据逻 ...
最新文章
- Web.xml 文件与server.xml 文件使用总结
- 本地文件夹如何断开svn连接
- DSP:6678开发板NDK网口通信完整实现(附源码)
- 算法练习day9——190327(“之” 字形打印矩阵、在行列都排好序的矩阵中找数、打印两个有序链表的公共部分、判断一个链表是否为回文结构)
- 语音识别 | GMM-HMM、DNN-HMM等主流算法及前沿技术
- java自学语法_java 基础语法学习
- Blazor 组件库开发指南
- iOS中常见的6种传值方式,UIPageViewController
- windows系统如何进入环境变量
- C#窗体在任务栏对窗体放大或缩小
- 【ElasticSearch】Es 源码之 IndicesModule 源码解读
- C++头文件中定义全局变量在多次引用时出现变量多次定义符号冲突的解决办法...
- 随想录(udp经验总结)
- this.scrollheight获取textarea的高度是0_53小米电子时钟/v1.0 介绍
- echarts雷达图
- [线性相关] 皮尔森相关系数的计算及假设检验
- Verilog自学:关于门级,数据流级,行为级建模
- 解决explorer.exe 应用程序错误,内存不能为 read或written的解决方法小结
- 阿里巴巴使用的Rax源码
- 奇文共欣赏 疑义相与析